Source code for dr_data.utilities.file
import logging
import json
import string
import csv
import os
import sys
import time
from dr_data.static_strings import *
# Module metadata. AUTHOR, COPYRIGHT and LICENSE are not defined in this file;
# presumably they come from the star import of dr_data.static_strings -- confirm.
__author__ = AUTHOR
__copyright__ = COPYRIGHT
__license__ = LICENSE
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
# Configure logging at import time: records are written to stdout as the bare
# message text, at INFO level and above.
# NOTE(review): this is an import-time side effect on the root logger.
logging.basicConfig(format='%(message)s', stream=sys.stdout, level=logging.INFO)
class FileUtility:
    """
    File utility class

    Static helpers for reading text files, writing JSON files, detecting
    CSV files and listing the files found under a directory.
    """

    @staticmethod
    def read_file(file_path):
        """
        Reads a file's contents

        :param file_path: Path of the file
        :type file_path: str
        :return: Content of file
        :rtype: str
        """
        # The context manager closes the handle even when read() raises.
        with open(file_path) as file_handle:
            return file_handle.read()

    @staticmethod
    def append_to_file(json_data, filename):
        """
        Writes JSON data to a file

        NOTE(review): despite the method name, the file is opened in ``'w'``
        mode, so any existing content is replaced rather than appended to.
        The mode is kept as-is so existing callers see no change.

        :param json_data: JSON-serializable data to write
        :type json_data: JSON
        :param filename: Name of the file
        :type filename: str
        :return: None
        :rtype: None
        """
        with open(filename, 'w') as json_file:
            json.dump(json_data, json_file, indent=4, separators=(',', ': '))

    @staticmethod
    def get_filename(name):
        """
        Get the name of the file with date

        The current local date is appended as ``YYYYMMDD``, separated from
        the name by an underscore.

        :param name: Name of file
        :type name: str
        :return: String of filename
        :rtype: str
        """
        file_date = time.strftime("%Y%m%d")
        return "{database}_{date}".format(database=name, date=file_date)

    @staticmethod
    def generate_json_file(name, path, data):
        """
        Generates a JSON file

        The file is written to ``<path>/<name>_<YYYYMMDD>.json``.

        :param name: name of file to create
        :type name: str
        :param path: Path of the directory in which the file is created
        :type path: str
        :param data: The JSON data to insert into the file
        :type data: JSON
        :return: Filename (dated, without directory and without extension)
        :rtype: str
        """
        file_name = FileUtility.get_filename(name)
        # Serialize before opening the target so that unserializable data
        # does not leave behind an empty or truncated file.
        json_schema = json.dumps(data, indent=4)
        target = '{path}/{file_name}.json'.format(path=path, file_name=file_name)
        with open(target, 'w') as outfile:
            outfile.write(json_schema)
        return file_name

    @staticmethod
    def is_csv_file(selected_file):
        """
        Check if a file is a CSV file

        Only the first 4096 characters are inspected. The file is rejected
        when it cannot be decoded as text, when the sample contains
        non-printable characters, or when ``csv.Sniffer`` cannot work out a
        dialect for the sample.

        :param selected_file: Path of the file to check
        :type selected_file: str
        :return: Boolean
        :rtype: bool
        """
        try:
            with open(selected_file, newline='') as csv_file:
                start = csv_file.read(4096)
        except UnicodeDecodeError:
            # Content that is not decodable as text cannot be a CSV file.
            return False
        if not all(c in string.printable or c.isprintable() for c in start):
            return False
        try:
            # Only success or failure of the sniff matters, not the dialect.
            csv.Sniffer().sniff(start)
        except csv.Error:
            return False
        return True

    @staticmethod
    def get_directory_files(directory):
        """
        Get the files in a directory

        The directory is walked recursively. Each file is keyed by its name
        without the last extension, so when several files share that key the
        one visited last wins.

        :param directory: Path of the directory
        :type directory: str
        :return: Dictionary mapping file name (without extension) to file path
        :rtype: dict
        """
        directory_files = dict()
        for root, _dirs, files in os.walk(os.path.abspath(directory)):
            for file in files:
                key = os.path.splitext(file)[0]
                directory_files[key] = os.path.join(root, file)
        return directory_files