import csv
import re
import pathlib
import chardet
import os
import math
from typing import Dict, Any, Union, Optional, Tuple, Sequence, List
[docs]
class BaseUtils:
    """This class contains utility functions.

    The helpers visible in this file are all static methods, so the class acts as a plain namespace and does not
    need to be instantiated. They cover file encoding detection, loading and validating CSV data, statistics over
    value/count distributions, and line counting for text files.
    """
[docs]
@staticmethod
def detect_file_encoding(file_path : str) -> str:
    """Guesses the text encoding of a file (e.g., ASCII, UTF-8, ...) with the library chardet.

    Only the first 100k bytes of the file are inspected. The returned name can afterwards be used with
    `open(file_path, 'r', encoding=encoding)`.

    :param file_path: The path to the file
    :return: Returns the guessed encoding, a plain ``'ascii'`` guess is reported as ``'utf-8'``
    :raises AttributeError: If ``file_path`` is not an existing file
    """
    # isfile() is already False for paths that do not exist, so one check covers both conditions
    if not os.path.isfile(file_path):
        raise AttributeError('Filepath "' + file_path + '" does not exist or is not a file')
    with open(file_path, 'rb') as handle:
        sample = handle.read(100000)
    guessed = chardet.detect(sample)['encoding']
    # ascii (without special characters) is a subset of utf-8, so the wider codec is reported instead
    return 'utf-8' if guessed == 'ascii' else guessed
[docs]
@staticmethod
def load_csv_data(file_or_dir_path: str, delimiter: Optional[str] = None,
                  file_encoding : Optional[str] = None) -> Dict[str, List[Dict[str, str]]]:
    """Load table data from one CSV file or from all CSV files contained in a directory.

    For a directory, every regular file whose name ends with ``.csv`` is loaded (sub-directories are not
    searched). Each file is parsed with :class:`csv.DictReader`, i.e. the first row provides the column names.
    A progress message is printed for each table.

    :param file_or_dir_path: Path to a directory or to a single ``.csv`` file
    :param delimiter: CSV delimiter used for all files, inferred automatically if ``None`` is specified
    :param file_encoding: File encoding used for all files, inferred automatically if ``None`` is specified
    :return: Returns a dict with the filename without '.csv' extension as key and list of row dicts as table data
    :raises AttributeError: If the path does not exist, is a file without ``.csv`` extension, or is a directory
        that contains no CSV files
    """
    if not os.path.exists(file_or_dir_path):
        raise AttributeError('Filepath "' + file_or_dir_path + '" does not exist or is not a file')
    if os.path.isdir(file_or_dir_path):
        csv_files = {}
        for entry in os.listdir(file_or_dir_path):
            entry_path = os.path.join(file_or_dir_path, entry)
            # A sub-directory that merely carries a '.csv' suffix cannot be opened as a table, skip it
            if entry.endswith('.csv') and os.path.isfile(entry_path):
                csv_files[entry[:-4]] = str(pathlib.Path(entry_path).resolve())
        if not csv_files:
            raise AttributeError('No CSV files found in directory "' + file_or_dir_path + '"')
    elif os.path.isfile(file_or_dir_path):
        if not file_or_dir_path.endswith('.csv'):
            raise AttributeError('No CSV file extension found in filepath "' + file_or_dir_path + '"')
        csv_files = {pathlib.Path(file_or_dir_path).stem : str(pathlib.Path(file_or_dir_path).resolve())}
    else:
        raise AttributeError('Not a valid filename or directory: "' + file_or_dir_path + '"')
    result = {}
    for table, file_path in csv_files.items():
        print('Loading data from table "' + table + '"')
        file_enc = file_encoding if file_encoding is not None else BaseUtils.detect_file_encoding(
            file_path)
        # newline='' leaves line-ending handling to the csv module, otherwise newlines embedded in quoted
        # fields are translated by the text layer before the reader sees them
        with open(file_path, encoding=file_enc, newline='') as file:
            if delimiter is None:
                try:
                    dialect = csv.Sniffer().sniff(file.read(100000), delimiters=',;|\t ')
                except csv.Error:
                    # Sniffing failed, fall back to the default dialect of the csv module
                    dialect = None
                file.seek(0)
                reader = csv.DictReader(file) if dialect is None else csv.DictReader(file, dialect=dialect)
            else:
                reader = csv.DictReader(file, delimiter=delimiter)
            # Materialise the rows while the file is still open
            result[table] = list(reader)
    return result
[docs]
@staticmethod
def check_csv_row(row: Dict[str, str], required_data : Dict[str, Any]) -> None:
    """Checks if all required fields are present in the CSV row and have the correct data type.

    The data type check is performed by calling the type on the raw cell value, e.g. ``int('42')``.

    :param row: The CSV row to check
    :param required_data: A dictionary of required field names and entry data types (callables such as
        ``int``, ``float`` or ``str``)
    :raises AttributeError: If a required field is missing or its value cannot be converted to the given type
    """
    for field, data_type in required_data.items():
        if field not in row:
            raise AttributeError('CSV row must contain the field "' + field + '"')
        try:
            data_type(row[field])
        except (ValueError, TypeError) as err:
            # TypeError covers non-string cells such as ``None``, which csv.DictReader uses by default for
            # columns missing from a short row; both failures are reported in the same way
            type_name = getattr(data_type, '__name__', str(data_type))
            raise AttributeError('CSV row entry for field "' + field + '" must be of type '
                                 + type_name) from err
[docs]
@staticmethod
def csv_row_string_to_list(row: Dict[str, str], row_key: str) -> Union[List[int], List[float], List[str]]:
    """Splits the semicolon separated value string stored under `row_key` in a CSV row into a list.

    The target type is derived from the key itself: keys containing ``:long`` yield integers, keys containing
    ``:double`` yield floats, and all other keys yield the raw strings.

    :param row: The CSV row as dictionary
    :param row_key: The key for the property in `row`
    :return: Returns the list of cast values
    :raises AttributeError: If `row_key` is missing in `row` or an entry cannot be cast to the requested type
    """
    if row_key not in row:
        raise AttributeError('CSV row must contain the field "' + row_key + '"')
    entries = row[row_key].split(';')
    # Select the cast function together with the matching error message prefix
    if ':long' in row_key:
        cast, error_prefix = int, 'CSV row must contain integers seperated by semicolons in field "'
    elif ':double' in row_key:
        cast, error_prefix = float, 'CSV row must contain floats seperated by semicolons in field "'
    else:
        return entries
    try:
        return list(map(cast, entries))
    except ValueError:
        raise AttributeError(error_prefix
                             + row_key + '"')
[docs]
@staticmethod
def combine_group_info(groups : List[str], group_size: Dict[str, int], pos_group: Optional[str],
                       neg_group: Optional[str]) -> List[str]:
    """Builds one label string per group in the form ``'<group> (<size>)'``.

    The label of the positive group gets the suffix ``[+]``, the label of the negative group the suffix ``[-]``.

    :param groups: The group names, the labels are returned in the same order
    :param group_size: A dictionary with the size for each group name
    :param pos_group: The name of the positive group or ``None``
    :param neg_group: The name of the negative group or ``None``
    :return: Returns the list of label strings
    :raises AttributeError: If a group has no entry in `group_size` or a listed group is both the positive and
        the negative group
    """
    labels = []
    for name in groups:
        if name not in group_size:
            raise AttributeError('Group "' + name + '" not in group size dict')
        is_positive = pos_group == name
        is_negative = neg_group == name
        if is_positive and is_negative:
            raise AttributeError('Group "' + name + '" cannot be both positive and negative')
        if is_positive:
            suffix = '[+]'
        elif is_negative:
            suffix = '[-]'
        else:
            suffix = ''
        labels.append(name + ' (' + str(group_size[name]) + ')' + suffix)
    return labels
[docs]
@staticmethod
def calculate_mean(value_dist : Dict[Union[int, float], int]) -> Optional[float]:
    """Calculates the mean of a distribution dictionary with distribution values as key and counts as values of
    the dictionary. If the dictionary is empty, or all counts add up to zero, `None` is returned.

    :param value_dist: The distribution
    :return: Returns the mean or `None` for distributions without any counted value
    """
    total_vals = sum(value_dist.values())
    # Covers the empty dictionary as well as dictionaries that only hold zero counts, which would otherwise
    # end in a division by zero
    if total_vals == 0:
        return None
    weighted_sum = sum(value * count for value, count in value_dist.items())
    return weighted_sum / total_vals
[docs]
@staticmethod
def calculate_min_max(value_dist: Dict[Union[int, float], int]) -> Optional[Tuple[float, float]]:
    """Determines the smallest and the largest value of a distribution dictionary with distribution values as
    key and counts as values of the dictionary. The counts are not taken into account.

    :param value_dist: The distribution
    :return: Returns the minimum and maximum or `None` for empty distributions
    """
    if not value_dist:
        return None
    # Iterating a dictionary yields its keys, which are the distribution values
    return min(value_dist), max(value_dist)
[docs]
@staticmethod
def calculate_std(value_dist: Dict[Union[int, float], int], mean : Optional[float] = None) -> Optional[float]:
    """Calculates the standard deviation of a distribution dictionary with distribution values as key and counts
    as values of the dictionary. The squared deviations are divided by the total count (not by the total count
    minus one). If the dictionary is empty, or all counts add up to zero, `None` is returned. A precalculated
    mean can be specified to speed up the calculation.

    :param value_dist: The distribution
    :param mean: The precalculated mean, defaults to None.
    :return: Returns the standard deviation or `None` for distributions without any counted value
    """
    total_vals = sum(value_dist.values())
    # Checked before the mean is needed: without any counted value there is neither a mean nor a deviation,
    # and the division below would fail
    if total_vals == 0:
        return None
    calc_mean = mean if mean is not None else BaseUtils.calculate_mean(value_dist)
    acc_var = sum(count * (calc_mean - value) ** 2 for value, count in value_dist.items())
    return math.sqrt(acc_var / total_vals)
[docs]
@staticmethod
def calculate_quartile_quintile_sorted_dist(sorted_dist : Sequence[Tuple[Union[int, float], int]],
                                            use_quartile : bool, quantile_id : int) -> Optional[float]:
    """Calculates quartiles or quintiles from a sorted distribution. If the distribution is empty, or all counts
    add up to zero, ``None`` is returned.

    The entries are accumulated in order until the requested fraction of the total count is reached. If the
    fraction is reached exactly at the end of an entry, the result is the average of that value and the next
    value that actually occurs, otherwise it is the value of the entry itself.

    :param sorted_dist: The distribution with pairs of values and counts sorted in ascending value order
    :param use_quartile: If ``True`` quartiles are calculated, else quintiles
    :param quantile_id: The identifier for the quantile. Must be 1, 2, or 3 for quartiles, or 1, 2, 3, 4 for
        quintiles
    :return: Returns the quartile or quintile, or `None` for distributions without any counted value
    :raises AttributeError: If `quantile_id` is not valid for the selected quantile type
    """
    if use_quartile and quantile_id not in [1, 2, 3]:
        raise AttributeError('Quartile ID must be 1, 2 or 3')
    if not use_quartile and quantile_id not in [1, 2, 3, 4]:
        raise AttributeError('Quintile ID must be 1, 2, 3 or 4')
    if len(sorted_dist) == 0:
        return None
    count_sum = sum(entry[1] for entry in sorted_dist)
    # Only zero counts: no value occurs, so there is no quantile to report
    if count_sum == 0:
        return None
    # The requested quantile sits at the fraction quantile_id / parts of the total count. Comparing
    # parts * accumulated_count with quantile_id * count_sum keeps the whole calculation in exact integers.
    parts = 4 if use_quartile else 5
    target = quantile_id * count_sum
    idx = 0
    accumulated_count = sorted_dist[idx][1]
    while parts * accumulated_count < target:
        idx += 1
        accumulated_count += sorted_dist[idx][1]
    if parts * accumulated_count == target:
        # At the border of two values: average with the next value that has a non-zero count. Such an entry
        # exists because less than the total count has been accumulated so far.
        upper_idx = idx + 1
        while sorted_dist[upper_idx][1] == 0:
            upper_idx += 1
        return (sorted_dist[idx][0] + sorted_dist[upper_idx][0]) / 2
    return sorted_dist[idx][0]
[docs]
@staticmethod
def count_lines_in_file(file_path : str) -> int:
    """Count lines in a text file.

    The file is read in binary chunks and the newline characters are counted, so a last line that is not
    terminated by a newline does not add to the result.

    :param file_path: The path to the text file
    :return: Returns the number of lines
    :raises AttributeError: If ``file_path`` is not an existing file
    """
    # isfile() is already False for paths that do not exist, so one check covers both conditions
    if not os.path.isfile(file_path):
        raise AttributeError('Filepath "' + file_path + '" does not exist or is not a file')
    chunk_size = 10000000
    with open(file_path, 'rb') as handle:
        # iter() with a sentinel keeps reading chunks until read() returns empty bytes at the end of the file
        chunks = iter(lambda: handle.read(chunk_size), b'')
        return sum(chunk.count(b'\n') for chunk in chunks)
[docs]
@staticmethod
def file_has_more_lines(file_path: str, threshold : int) -> bool:
    """Checks if a text file has more than ``threshold`` lines.

    The file is read in binary chunks of 100000 bytes and the newline characters are counted. Reading stops as
    soon as the count exceeds the threshold, so large files do not have to be read completely.

    :param file_path: The path to the text file
    :param threshold: The threshold to be checked
    :return: Returns ``True`` if the file contains more lines
    :raises AttributeError: If ``file_path`` is not an existing file
    """
    if not os.path.exists(file_path) or not os.path.isfile(file_path):
        raise AttributeError('Filepath "' + file_path + '" does not exist or is not a file')
    with open(file_path, 'rb') as file:
        line_counter = 0
        # Keep reading while the count has not yet exceeded the threshold. Stopping as soon as the count is
        # equal to the threshold would miss newlines that only appear in later chunks.
        while line_counter <= threshold:
            raw_data = file.read(100000)
            if not raw_data:
                break
            line_counter += raw_data.count(b'\n')
    return line_counter > threshold