Source code for dr_data.transplant
import sys
import logging
import psycopg2
from progress.bar import Bar
from dr_data.static_strings import *
from dr_data.utilities.file import FileUtility
# Module metadata. AUTHOR, COPYRIGHT and LICENSE are not defined in this file;
# presumably they come from the star import of dr_data.static_strings -- confirm.
__author__ = AUTHOR
__copyright__ = COPYRIGHT
__license__ = LICENSE
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
# Configures the root logger at import time: message-only format, written to
# stdout, INFO level and above.
logging.basicConfig(format='%(message)s', stream=sys.stdout, level=logging.INFO)
class Transplant:
    """
    Insert one or all CSV files to table

    A single psycopg2 connection and cursor are opened in the constructor and
    reused by every import performed through this object.
    """

    def __init__(self, configuration):
        """
        Constructor of Transplant

        Opens the database connection described by ``configuration['db']`` and
        creates the cursor used by the import commands.

        :param configuration: configuration file content; its ``'db'`` entry is
            expanded as keyword arguments to ``psycopg2.connect`` and must
            contain a ``'database'`` key
        :type configuration: dict (parsed JSON)
        """
        self.configuration = configuration
        conn_info = self.configuration['db']
        self.connection = psycopg2.connect(**conn_info)
        self.database = conn_info['database']
        self.cursor = self.connection.cursor()

    def execute_file_cmd(self, source, destination):
        """
        Executes the transplant command, main entry point for file processing

        Streams the file at ``source`` into the table ``destination`` through a
        ``COPY ... FROM stdin WITH CSV HEADER`` command and commits.

        On any failure the query and the error are logged, the transaction is
        rolled back, the cursor is closed and the process exits with status 1.

        :param source: path of the CSV file to import
        :type source: str
        :param destination: destination table
        :type destination: str
        :return: None
        :rtype: None
        :raises SystemExit: with exit status 1 when the import fails
        """
        # The table name is placed inside a double-quoted identifier, so an
        # embedded double quote has to be doubled to stay inside the quotes.
        table_name = str(destination).replace('"', '""')
        copy_sql = """COPY "{table_name}" FROM stdin WITH CSV HEADER DELIMITER as ','""".format(table_name=table_name)
        with open(source, 'r') as csv_file:
            try:
                self.cursor.copy_expert(sql=copy_sql, file=csv_file)
                self.connection.commit()
            except Exception as error:
                # cursor.query may not be populated; fall back to the SQL text
                # built above so that reporting the failure cannot itself raise
                # and skip the rollback below.
                sent_query = self.cursor.query
                if isinstance(sent_query, bytes):
                    query_text = sent_query.decode("utf-8")
                else:
                    query_text = copy_sql
                logging.error("- FAILED query execution for:: \"{query}\" ".format(query=query_text))
                logging.error("\n")
                logging.error("ERROR: {error}".format(error=error))
                self.connection.rollback()
                self.cursor.close()
                # Non-zero status so that the calling shell sees the failure.
                sys.exit(1)

    def execute_directory_cmd(self, source, schema_data):
        """
        Executes the transplant command, main entry point for directory processing

        Walks the insertion-order schema and imports, for each table that has a
        matching file, that file through :meth:`execute_file_cmd`. Tables
        without a matching file are collected and logged at the end.

        :param source: source directory to import, passed unchanged to
            ``FileUtility.get_directory_files``
        :type source: str or list
        :param schema_data: indexable whose element ``1`` is the
            insertion-order schema: a mapping whose values are mappings, the
            first key of each being a table name
        :type schema_data: list or tuple
        :return: None
        :rtype: None
        """
        # Indexed by table name below; each value is handed to
        # execute_file_cmd as the file to open.
        files = FileUtility.get_directory_files(source)
        logging.info('- Total of {count} file'.format(count=len(files)))
        insertion_order_schema = schema_data[1]
        skipped_tables = []
        progress_bar = Bar(
            '- Importing CSVs to {database}...'.format(database=self.database),
            max=len(insertion_order_schema),
        )
        for table_entry in insertion_order_schema.values():
            table_name = list(table_entry)[0]
            if table_name not in files:
                # Skipped tables do not advance the progress bar.
                skipped_tables.append(table_name)
                continue
            self.execute_file_cmd(files[table_name], table_name)
            progress_bar.next()
        progress_bar.finish()
        logging.info('- Skipped tables {tables}'.format(tables=skipped_tables))