Source code for dr_data.inject

import sys
import uuid
import psycopg2.extras as psql_extras
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import pandas as pd
import psycopg2
import logging
from progress.bar import Bar
from dr_data.static_strings import *
from dr_data.randoms import Randoms
from dr_data.sql import Sql

# Package metadata. AUTHOR, COPYRIGHT and LICENSE are not defined in this
# module; presumably they are supplied by the star-import of
# dr_data.static_strings above -- confirm against that module.
__author__ = AUTHOR
__copyright__ = COPYRIGHT
__license__ = LICENSE

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
# Import-time side effect: configures the root logger to write bare messages
# (no level/name prefix) to stdout for records at INFO level and above.
logging.basicConfig(format='%(message)s', stream=sys.stdout, level=logging.INFO)


class Inject:
    """
    Inserts one or many randomly generated rows into the tables described
    by a schema.

    The instance owns a database connection and cursor for its whole
    lifetime; call :meth:`close` (or use the instance as a context manager)
    to release them.
    """

    def __init__(self, schema, configuration):
        """
        Constructor for Inject

        :param schema: schema description; each value maps a table name to a
            dict holding that table's ``'columns'`` metadata
        :type schema: dict
        :param configuration: configuration; its ``'db'`` entry is passed as
            keyword arguments to ``psycopg2.connect`` and must contain a
            ``'database'`` key
        :type configuration: dict
        """
        self.configuration = configuration
        conn_info = self.configuration['db']
        self.connection = psycopg2.connect(**conn_info)
        self.database = conn_info['database']
        self.cursor = self.connection.cursor()
        self.insertion_schema = schema

    def __enter__(self):
        """
        Enter a ``with`` block.

        :return: this instance
        :rtype: Inject
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Leave a ``with`` block, releasing the cursor and the connection.

        :return: False, so an in-flight exception keeps propagating
        :rtype: bool
        """
        self.close()
        return False

    def close(self):
        """
        Closes the cursor and the connection opened by the constructor.

        :return: None
        :rtype: None
        """
        self.cursor.close()
        self.connection.close()

    def execute_cmd(self, how_many):
        """
        Executes the insert command, main entry point

        :param how_many: How many rows to insert into each table
        :type how_many: int
        :return: None
        :rtype: None
        """
        for value in self.insertion_schema.values():
            # Only the first table name of each schema entry is populated.
            table = list(value)[0]
            self.populate_table(how_many, table, value[table]['columns'])

    def build_dataframe(self, columns):
        """
        Creates a single-row pandas DataFrame holding one generated value per
        column.

        :param columns: column meta data dictionaries (each needs a ``'name'``
            plus whatever :meth:`set_data_by_type` reads)
        :type columns: list[dict]
        :return: DataFrame with one row, one column per entry in ``columns``
        :rtype: DataFrame
        """
        dataframe = {
            column['name']: [self.set_data_by_type(column)]
            for column in columns
        }
        return pd.DataFrame(dataframe)

    def set_data_by_type(self, column):
        """
        Generates a value for a column from its constraints or its data type.

        Nullable columns always get ``None``. Otherwise a column carrying a
        ``'constraint'`` entry is resolved from its constraints, and any other
        column from its ``'data_type'``.

        :param column: column dictionary
        :type column: dict
        :return: generated value, or None for a nullable column
        :rtype: Any
        :raises Exception: when the constraint types or the data type are not
            supported
        """
        if column['is_nullable']:
            return None
        if 'constraint' in column:
            return self._value_from_constraint(column)
        return self._value_from_data_type(column)

    def _value_from_constraint(self, column):
        """
        Generates a value for a non-nullable column that has constraints.

        A foreign key wins over a primary key: the value is read from a random
        row of the referenced table and stringified.

        :param column: column dictionary with a ``'constraint'`` entry
        :type column: dict
        :return: generated value
        :rtype: str
        :raises Exception: when no constraint is a foreign or primary key
        """
        constraints = column['constraint']
        types = [meta['type'] for meta in constraints.values()]
        for meta in constraints.values():
            if meta['type'] == 'FOREIGN KEY':
                # NOTE(review): get_random_row returns fetchone()'s result, so
                # an empty referenced table would make this subscript fail.
                row = self.get_random_row(
                    meta['referenced_column'], meta['referenced_table'])
                return str(row[0])
        if 'PRIMARY KEY' in types:
            if column['data_type'] == 'uuid':
                return str(uuid.uuid4())
            return Randoms.get_hash(25)
        raise Exception(INJECT_NEED_TO_IMPLEMENT_TYPE.format(types=types))

    def _value_from_data_type(self, column):
        """
        Generates a value for a non-nullable, unconstrained column.

        :param column: column dictionary
        :type column: dict
        :return: generated value
        :rtype: Any
        :raises Exception: when the data type is not supported
        """
        data_type = column['data_type']
        if data_type in ('character varying', 'text'):
            return Randoms.get_custom_text(column['name'])
        if data_type == 'timestamp without time zone':
            return Randoms.get_datetime(min_year=2000)
        if data_type == 'timestamp with time zone':
            return Randoms.get_datetime_with_timezone(min_year=2000)
        if data_type == 'boolean':
            return Randoms.get_boolean()
        if data_type == 'USER-DEFINED':
            user_defined_values = column['user_defined_type']['values']
            return Randoms.get_value_from_list(user_defined_values)
        if data_type == 'integer':
            return Randoms.get_number()
        raise Exception(INJECT_NEED_TO_IMPLEMENT_TYPE.format(types=data_type))

    def populate_table(self, how_many, table_name, columns_data):
        """
        Generates and inserts rows into a table, one row per statement.

        :param how_many: The amount of rows to insert
        :type how_many: int
        :param table_name: The name of the table
        :type table_name: str
        :param columns_data: The column meta data
        :type columns_data: list[dict]
        :return: None
        :rtype: None
        """
        progress_bar = Bar(' - building [{}] table '.format(table_name), max=how_many)
        try:
            for index in range(how_many):
                dataframe = self.build_dataframe(columns_data)
                columns = ', '.join(dataframe.columns.tolist())
                query = Sql.build_populate_insert(table_name, columns)
                self.insert_table_data(
                    index + 1, query, self.connection, self.cursor,
                    dataframe, how_many)
                progress_bar.next()
        finally:
            # Finish the bar even when an insert aborts the run.
            progress_bar.finish()

    @staticmethod
    def _rows_as_tuples(df):
        """
        Converts DataFrame rows into tuples of plain Python objects.

        Casting to ``object`` first boxes NumPy scalars (``numpy.int64``,
        ``numpy.bool_``, ...) into Python values; psycopg2 cannot adapt the
        NumPy types by default, which otherwise breaks frames whose columns
        all share one dtype.

        :param df: The dataframe to convert
        :type df: DataFrame
        :return: one tuple per row, in row order
        :rtype: list[tuple]
        """
        return list(df.astype(object).itertuples(index=False, name=None))

    # The psycopg2 annotations are quoted so they are not evaluated when the
    # class is defined.
    def insert_table_data(
            self,
            index: int,
            query: str,
            conn: "psycopg2.extensions.connection",
            cur: "psycopg2.extensions.cursor",
            df: pd.DataFrame,
            page_size: int
    ) -> None:
        """
        Inserts the rows of a dataframe and commits.

        On any failure the error is logged, the transaction is rolled back,
        the cursor is closed and the process exits with status 1.

        :param index: 1-based ordinal of the insert, used only in the failure
            message
        :type index: int
        :param query: The query to run
        :type query: str
        :param conn: database connection
        :type conn: psycopg2.extensions.connection
        :param cur: database cursor
        :type cur: psycopg2.extensions.cursor
        :param df: The dataframe that will be inserted
        :type df: DataFrame
        :param page_size: forwarded to ``execute_values`` as ``page_size``
        :type page_size: int
        :return: None
        :rtype: None
        :raises SystemExit: when the insert fails
        """
        data_tuples = self._rows_as_tuples(df)
        try:
            psql_extras.execute_values(cur, query, data_tuples, page_size=page_size)
        except Exception as error:
            # cur.query may be unset if the failure happened before anything
            # was sent; fall back to the query template in that case.
            executed = cur.query.decode("utf-8") if cur.query else query
            _logger.error("- %s) FAILED query execution for:: \"%s\" ", index, executed)
            _logger.error("\n")
            _logger.error("ERROR: %s", error)
            conn.rollback()
            cur.close()
            # Non-zero status so callers and shells can detect the failure.
            sys.exit(1)
        else:
            conn.commit()

    def get_random_row(self, columns, table):
        """
        Gets a random row from a table

        :param columns: The column(s) to select, handed to
            ``Sql.build_random_row``
        :type columns: str
        :param table: The name of the table
        :type table: str
        :return: the result of ``cursor.fetchone()`` for the built query
        :rtype: tuple or None
        """
        self.cursor.execute(Sql.build_random_row(columns, table))
        return self.cursor.fetchone()

    def get_random_row_where(self, columns, table, query):
        """
        Gets a random row from a table where something equals something.

        :param columns: The column(s) to select, handed to
            ``Sql.build_random_row_where``
        :type columns: str
        :param table: The name of the table
        :type table: str
        :param query: The filter handed to ``Sql.build_random_row_where``
        :type query: str
        :return: the result of ``cursor.fetchone()`` for the built query
        :rtype: tuple or None
        """
        self.cursor.execute(Sql.build_random_row_where(columns, table, query))
        return self.cursor.fetchone()