Source code for dr_data.biopsy

import psycopg2
import logging
from progress.bar import Bar
from dr_data.static_strings import *
from dr_data.sql import Sql

# Package metadata. AUTHOR, COPYRIGHT and LICENSE are not defined in this
# file; presumably they are supplied by the wildcard import from
# dr_data.static_strings above -- TODO confirm.
__author__ = AUTHOR
__copyright__ = COPYRIGHT
__license__ = LICENSE

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)


class Biopsy:
    """Inspect a database through psycopg2 and describe its tables.

    The instance opens one connection and one cursor in ``__init__`` and
    reuses that cursor for every query.  Results are accumulated on the
    instance:

    * ``schema`` -- ``{table_name: column description}`` plus an
      ``"@database_metadata"`` entry, filled by :meth:`build_schema`.
    * ``insertion_order`` -- ``{position: {table_name: column description}}``,
      filled by :meth:`build_insertion_order_schema`.
    * ``table_count`` -- running total of table names returned by
      :meth:`build_tables`.
    * ``insert_order`` -- next position to use in ``insertion_order``
      (starts at 1).

    The instance can be used as a context manager so that the cursor and the
    connection are closed when the ``with`` block ends.

    NOTE(review): every query is produced by filling a template from ``Sql``
    with ``str.format`` instead of bound parameters.  The templates are not
    visible here, so this is left as-is; only pass trusted schema, table,
    column and type names.
    """

    def __init__(self, configuration):
        """Open the database connection described by ``configuration['db']``.

        :param configuration: mapping whose ``'db'`` entry is a mapping of
            keyword arguments for ``psycopg2.connect``; it must contain a
            ``'database'`` key, which is also stored as ``self.database``.
        """
        self.configuration = configuration
        conn_info = self.configuration['db']
        self.connection = psycopg2.connect(**conn_info)
        self.database = conn_info['database']
        self.cursor = self.connection.cursor()
        self.schema = dict()
        self.insertion_order = dict()
        self.table_count = 0
        self.insert_order = 1

    def close(self):
        """Close the cursor and then the connection, if they exist."""
        cursor = getattr(self, 'cursor', None)
        connection = getattr(self, 'connection', None)
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Release the connection even if closing the cursor failed.
            if connection is not None:
                connection.close()

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the database resources; exceptions are not suppressed."""
        self.close()
        return False

    def _fetch_all(self, query):
        """Run ``query`` on the shared cursor and return every result row."""
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def execute_cmd(self):
        """Build the schema, then the insertion order.

        :return: tuple ``(schema, insertion_order)`` as returned by
            :meth:`build_schema` and :meth:`build_insertion_order_schema`.
        """
        schema = self.build_schema()
        insertion_order = self.build_insertion_order_schema()
        return schema, insertion_order

    def build_schema(self, table_schema_name='public'):
        """Describe every table of ``table_schema_name`` into ``self.schema``.

        Each table name maps to the result of :meth:`build_columns`.  An
        extra ``"@database_metadata"`` entry lists which tables have foreign
        keys and which do not.

        :param table_schema_name: schema whose tables are inspected.
        :return: ``self.schema`` (the same dict that is stored on the
            instance).
        """
        tables = self.build_tables(table_schema_name)
        no_foreign_keys = []
        has_foreign_keys = []
        progress_bar = Bar(
            '- Generating schema for {database}...'.format(database=self.database),
            max=len(tables),
        )
        for table in tables:
            # Look the columns up in the same schema the table list came
            # from, not in the default one.
            columns = self.build_columns(table, table_schema_name)
            self.schema[table] = columns
            if columns['@table_metadata']['has_foreign_keys']:
                has_foreign_keys.append(table)
            else:
                no_foreign_keys.append(table)
            progress_bar.next()
        progress_bar.finish()
        self.schema["@database_metadata"] = {
            "no_foreign_key_tables": no_foreign_keys,
            "foreign_key_tables": has_foreign_keys
        }
        return self.schema

    def build_insertion_order_schema(self):
        """Number the tables in the order given by the insertion-order query.

        Builds the schema first (with the default schema name) when
        ``self.schema`` is still empty.  Double quotes are stripped from each
        table name before it is looked up in ``self.schema``.  Positions keep
        counting from ``self.insert_order`` across calls.

        :return: ``self.insertion_order``, mapping position to
            ``{table_name: column description}``.
        :raises KeyError: if a table returned by the ordering query has no
            entry in ``self.schema``.
        """
        if not self.schema:
            self.build_schema()
        insertion_table_order = self.get_insertion_table_order()
        progress_bar = Bar(
            '- Generating insertion order for {database}....'.format(database=self.database),
            max=len(insertion_table_order),
        )
        for table in insertion_table_order:
            table = table.replace("\"", "")
            self.insertion_order[self.insert_order] = {
                table: self.schema[table]
            }
            self.insert_order += 1
            progress_bar.next()
        progress_bar.finish()
        return self.insertion_order

    def get_insertion_table_order(self):
        """Return the first field of every row of the insertion-order query.

        :return: list of values, in the order the query returned them.
        """
        query = Sql.build_insertion_table_order()
        return [row[0] for row in self._fetch_all(query)]

    def build_tables(self, table_schema_name='public'):
        """Return the table names found in ``table_schema_name``.

        The name is taken from the second field of each result row.
        ``self.table_count`` is increased by the number of rows returned, so
        it accumulates across calls.

        :param table_schema_name: schema name substituted into the query.
        :return: list of table names.
        """
        query = Sql.build_tables_query().format(name=table_schema_name)
        data = [row[1] for row in self._fetch_all(query)]
        self.table_count += len(data)
        return data

    def build_columns(self, table_name, table_schema_name='public'):
        """Describe the columns of one table.

        Row fields are read by position (2: name, 3: default,
        4: nullability, 5: data type, 6: user-defined type name); the
        positions follow whatever column order ``Sql.build_columns_query``
        selects, which is not visible in this file.

        :param table_name: table to describe.
        :param table_schema_name: schema the table lives in; also used for
            the per-column constraint lookup.
        :return: dict with a ``"columns"`` list (one dict per column) and an
            ``"@table_metadata"`` dict holding ``column_count``,
            ``has_foreign_keys``, ``has_user_defined_keys`` and
            ``foreign_constraint_tables``.
        """
        query = Sql.build_columns_query().format(
            schema_name=table_schema_name, table_name=table_name)
        # All rows are fetched up front because the helper calls in the loop
        # below run further queries on the same cursor.
        column_data = self._fetch_all(query)
        column_data_list = []
        foreign_key_tables = []
        has_foreign_keys = False
        has_user_defined_keys = False
        for column in column_data:
            # Use the caller's schema so constraints are read from the same
            # schema as the columns.
            constraint = self.get_column_constraint(
                table_name, column[2], table_schema_name)
            insert_object = {
                "name": column[2],
                "data_type": column[5],
                "column_default": column[3],
                "is_nullable": column[4],
            }
            if column[5] == 'USER-DEFINED':
                has_user_defined_keys = True
                insert_object["user_defined_type"] = {
                    "name": column[6],
                    "values": self.get_values_from_type(column[6])
                }
            if constraint:
                insert_object["constraint"] = constraint
                for value in constraint.values():
                    if value['type'] == 'FOREIGN KEY':
                        foreign_key_tables.append(value['referenced_table'])
                        has_foreign_keys = True
            column_data_list.append(insert_object)
        output = {
            "columns": column_data_list,
            "@table_metadata": {
                "column_count": len(column_data),
                "has_foreign_keys": has_foreign_keys,
                "has_user_defined_keys": has_user_defined_keys,
                "foreign_constraint_tables": foreign_key_tables
            }
        }
        return output

    def get_column_constraint(self, table_name, column_name, table_schema_name='public'):
        """Return the constraints attached to one column.

        Row fields are read by position (4: key of the returned dict,
        5: type, 8: referenced table, 9: referenced column); the positions
        follow the query built by ``Sql.build_column_constraints``, which is
        not visible in this file.

        :param table_name: table that owns the column.
        :param column_name: column to inspect.
        :param table_schema_name: schema the table lives in.
        :return: dict keyed by field 4 of each row, each value a dict with
            ``type``, ``referenced_table`` and ``referenced_column``; empty
            when the query returns no rows.
        """
        query = Sql.build_column_constraints().format(
            schema_name=table_schema_name,
            table_name=table_name,
            column_name=column_name)
        return {
            constraint[4]: {
                "type": constraint[5],
                "referenced_table": constraint[8],
                "referenced_column": constraint[9]
            }
            for constraint in self._fetch_all(query)
        }

    # The parameter is named ``type`` (shadowing the builtin) because callers
    # may pass it by keyword; it is kept for backward compatibility.
    def get_values_from_type(self, type):
        """Return the second field of every row of the type-values query.

        :param type: type name substituted into the query.
        :return: list of values, in the order the query returned them.
        """
        query = Sql.build_values_from_type().format(type=type)
        return [row[1] for row in self._fetch_all(query)]