Source code for dr_data.transplant

import sys
import logging
import psycopg2
from progress.bar import Bar
from dr_data.static_strings import *
from dr_data.utilities.file import FileUtility

# Package metadata. AUTHOR, COPYRIGHT and LICENSE are not defined in this
# module; presumably they come from the star import of
# dr_data.static_strings -- confirm there.
__author__ = AUTHOR
__copyright__ = COPYRIGHT
__license__ = LICENSE

# Module-specific logger, named after this module.
_logger = logging.getLogger(__name__)
# NOTE: this configures the *root* logger as a side effect of importing the
# module: bare messages (no level/timestamp prefix) written to stdout at INFO
# level and above.
logging.basicConfig(format='%(message)s', stream=sys.stdout, level=logging.INFO)


class Transplant:
    """
    Insert one or all CSV files to table

    Opens a PostgreSQL connection on construction and streams CSV files into
    tables with ``COPY ... FROM stdin``. The instance can be used as a context
    manager so the cursor and connection are released when the work is done.
    """

    def __init__(self, configuration):
        """
        Constructor of Transplant

        :param configuration: configuration mapping; ``configuration['db']``
            is passed as keyword arguments to ``psycopg2.connect`` and must
            contain a ``'database'`` entry
        :type configuration: dict (loaded from JSON)
        """
        self.configuration = configuration
        conn_info = self.configuration['db']
        self.connection = psycopg2.connect(**conn_info)
        self.database = conn_info['database']
        self.cursor = self.connection.cursor()

    def __enter__(self):
        """
        Enter a ``with`` block

        :return: this instance
        :rtype: Transplant
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Leave a ``with`` block, releasing the cursor and the connection

        Exceptions raised inside the block are not suppressed.
        """
        self.close()

    def close(self):
        """
        Close the cursor and the database connection

        :return: None
        :rtype: None
        """
        self.cursor.close()
        self.connection.close()

    def execute_file_cmd(self, source, destination):
        """
        Executes the transplant command, main entry point for file processing

        The file is streamed to the server with ``COPY ... FROM stdin`` (CSV
        with a header row, comma delimited) and committed. On any failure the
        error is logged, the transaction is rolled back, the cursor and
        connection are closed and the process exits with status 1.

        :param source: path of the CSV file to import
        :type source: str
        :param destination: destination table
        :type destination: str
        :return: None
        :rtype: None
        :raises SystemExit: with exit code 1 when the import fails
        """
        # Double any embedded double quote so the table name always stays a
        # single quoted identifier inside the statement.
        quoted_table = destination.replace('"', '""')
        copy_sql = """COPY "{table_name}" FROM stdin WITH CSV HEADER DELIMITER as ','""".format(
            table_name=quoted_table)

        with open(source, 'r') as csv_file:
            try:
                self.cursor.copy_expert(sql=copy_sql, file=csv_file)
                self.connection.commit()
            except Exception as error:
                # cursor.query is None when nothing reached the server; fall
                # back to the statement we attempted so the handler itself
                # cannot fail and mask the real error.
                sent_query = self.cursor.query
                if sent_query is None:
                    query_text = copy_sql
                elif isinstance(sent_query, bytes):
                    query_text = sent_query.decode("utf-8")
                else:
                    query_text = str(sent_query)
                logging.info("- FAILED query execution for:: \"{query}\" ".format(query=query_text))
                logging.info("\n")
                logging.info("ERROR: {error}".format(error=error))
                self.connection.rollback()
                self.close()
                # Non-zero status so shells and CI see the import as failed.
                sys.exit(1)

    def execute_directory_cmd(self, source, schema_data):
        """
        Executes the transplant command, main entry point for directory processing

        Walks the insertion-order schema and imports, for every listed table,
        the matching file found in ``source``. Tables without a matching file
        are skipped and reported at the end.

        :param source: directory holding the CSV files to import
        :type source: str
        :param schema_data: sequence whose second element (index 1) is the
            insertion-order mapping; each of its values is a mapping whose
            first key is used as the table name (shape inferred from how it
            is read here -- confirm against the schema producer)
        :type schema_data: list or tuple
        :return: None
        :rtype: None
        """
        # Indexed by table name below; presumably a mapping of table name to
        # file path -- confirm against FileUtility.get_directory_files.
        files = FileUtility.get_directory_files(source)
        logging.info('- Total of {count} file'.format(count=len(files)))

        insertion_order_schema = schema_data[1]
        skipped_tables = []
        progress_bar = Bar('- Importing CSVs to {database}...'.format(database=self.database),
                           max=len(insertion_order_schema))

        for table_entry in insertion_order_schema.values():
            table_name = list(table_entry)[0]
            if table_name in files:
                self.execute_file_cmd(files[table_name], table_name)
            else:
                skipped_tables.append(table_name)
            # Advance for skipped tables too, otherwise the bar can never
            # reach its configured maximum.
            progress_bar.next()

        progress_bar.finish()
        logging.info('- Skipped tables {tables}'.format(tables=skipped_tables))