Source code for dr_data.transplant
import sys
import logging
import psycopg2
from progress.bar import Bar
from dr_data.static_strings import *
from dr_data.utilities.file import FileUtility
__author__ = AUTHOR
__copyright__ = COPYRIGHT
__license__ = LICENSE
_logger = logging.getLogger(__name__)
logging.basicConfig(format='%(message)s', stream=sys.stdout, level=logging.INFO)
[docs]class Transplant:
def __init__(self, configuration):
self.configuration = configuration
conn_info = self.configuration['db']
self.connection = psycopg2.connect(**conn_info)
self.database = conn_info['database']
self.cursor = self.connection.cursor()
[docs] def execute_file_cmd(self, source, destination):
with open(source, 'r') as csv_file:
copy_sql = """COPY "{table_name}" FROM stdin WITH CSV HEADER DELIMITER as ','""".format(table_name=destination)
try:
self.cursor.copy_expert(sql=copy_sql, file=csv_file)
self.connection.commit()
except Exception as error:
logging.info("- FAILED query execution for:: \"{query}\" ".format(query=self.cursor.query.decode("utf-8")))
logging.info("\n")
logging.info("ERROR: {error}".format(error=error))
self.connection.rollback()
self.cursor.close()
sys.exit()
[docs] def execute_directory_cmd(self, source, schema_data):
files = FileUtility.get_directory_files(source)
insertion_order_schema = schema_data[1]
skipped_tables = []
progress_bar = Bar('- Importing CSVs to {database}...'.format(database=self.database), max=len(insertion_order_schema.items()))
for key, value in insertion_order_schema.items():
key = list(value.keys())[0]
if key not in files:
skipped_tables.append(key)
continue
source = files[key]
self.execute_file_cmd(source, key)
progress_bar.next()
progress_bar.finish()
logging.info('- Skipped tables {tables}'.format(tables=skipped_tables))