Source code for graphxplore.MetaDataHandling.meta_data_generator
import collections
import itertools
from typing import Union, Optional, List, Dict, Iterable
from graphxplore.Basis import RelationalDataIODevice
from .variable_info import BinningInfo, VariableType, DataType, ArtifactMode
from .meta_data import MetaData
[docs]
class MetaDataGenerator:
"""This class extracts metadata information from CSV files. It detects primary keys and foreign key relations
between tables. Additionally, :class:`VariableInfo` objects are inferred for all columns of all CSV files. The
result is a :class:`MetaData` object.
:param csv_data: The input data as CSV files either as directory path containing the CSV files or as
dictionary of table name and table data as list of dictionaries per row
:param artifact_mode: Determines if artifacts should be detected and at what level. For further information check
:class:`ArtifactMode`
:param missing_vals: These strings indicate missing values, defaults to the empty string and variations of
"NaN" and "Na". Cells that are ``None`` are flagged as missing while the table data is read, even if ``None`` is
not listed here
:param nof_read_lines: Maximum number of lines read from each CSV file to gather metadata, defaults to 1 million
:param str_len_free_text: Strings with more than this number of characters, or containing a line break, are
considered free text and the containing variable is unfavored as primary key. Defaults to 300.
:param binning_threshold: Metric variables with more distinct values are marked for binning, defaults to 20
:param categorical_threshold: Variables with at most this number of distinct values are considered categorical,
defaults to 20
:param file_encoding: The file encoding of the CSV files (ascii, utf-8,...) in chardet definition.
Is guessed if not specified. Only used when CSV data is read from a directory, defaults to None
"""
def __init__(self, csv_data: Union[str, Dict[str, List[Dict[str, str]]]],
             artifact_mode : ArtifactMode = ArtifactMode.DataTypeMismatchAndOutliers,
             missing_vals : Iterable[Union[str, None]] = ('', 'NaN', 'Na', 'NA', 'NAN', 'nan', 'na'),
             nof_read_lines : int = 1000000, str_len_free_text : int = 300, binning_threshold : int = 20,
             categorical_threshold : int = 20, file_encoding : Optional[str] = None):
    """Stores the configuration and creates the result object for the tables available in ``csv_data``.

    No table content is read here apart from determining the available table names.
    """
    # the result is created for exactly those tables that the input offers
    self.result = MetaData(RelationalDataIODevice.get_available_table_names(csv_data))

    # where the data comes from and how much of it is read
    self.csv_data = csv_data
    self.file_encoding = file_encoding
    self.nof_read_lines = nof_read_lines

    # settings steering the inference of variable information
    self.missing_vals = missing_vals
    self.artifact_mode = artifact_mode
    self.str_len_free_text = str_len_free_text
    self.categorical_threshold = categorical_threshold
    self.binning_threshold = binning_threshold
[docs]
def gather_meta_data(self) -> MetaData:
    """Runs the complete metadata extraction and returns the result.

    First, the variables and primary keys of all tables are extracted, see :meth:`extract_variable_infos`.
    A primary key requires a column with unique entries and no missing cells; tables without such a column are
    reported on standard output. Afterwards, foreign keys are assigned by matching column names against the
    primary keys of the other tables, see :meth:`assign_foreign_keys`. Artifacts are detected, if specified by
    ``artifact_mode``. For more information checkout :class:`ArtifactMode`

    :return: Returns the gathered metadata
    """
    # primary keys have to be known before foreign keys can be matched against them
    self.extract_variable_infos()

    print('Assigning foreign keys')
    self.assign_foreign_keys()

    meta_data = self.result
    return meta_data
[docs]
def extract_variable_infos(self) -> None:
"""Extracts variable information for every table and detects primary keys.
For each table the column statistics are gathered and a primary key is selected, if a suitable column exists.
Every column is registered as a variable with its inferred data type and data type distribution. For numeric
columns that are not keys, the number of distinct non-missing values is compared with ``categorical_threshold``
to decide whether the variable is marked as metric, and binning information is assigned. Value distributions
and artifacts are detected for all non-key variables as specified by ``artifact_mode``. For more information
checkout :class:`ArtifactMode`. Tables without a primary key are reported on standard output.
"""
tables_without_primary = []
for table in self.result.get_table_names():
print('Extracting variable information from table ' + table)
# the table name doubles as its label
self.result.assign_label(table, label=table)
data = self.__extract_data(table)
# an empty string signals that no column qualified as primary key
primary_key = self.__get_primary_key(data)
if primary_key == "":
tables_without_primary.append(table)
for variable in data.keys():
var_info = self.result.add_variable(table, variable)
var_info.data_type = data[variable]['data_type']
var_info.data_type_distribution = data[variable]['data_type_dist']
# the primary key column is only registered; the analysis below is skipped for it
if variable == primary_key:
self.result.assign_primary_key(table, primary_key)
print('Assigned ' + primary_key + ' as primary key to table ' + table)
continue
val_count_dict = data[variable]['value_dist']
# numeric columns that are not keys: decide on metric type and binning
if (var_info.data_type in [DataType.Decimal, DataType.Integer]
and var_info.variable_type not in [VariableType.PrimaryKey, VariableType.ForeignKey]):
# NOTE(review): ``None`` keys are only filtered here if ``None`` is part of ``self.missing_vals``,
# although the data extraction flags ``None`` cells as missing regardless - confirm this is intended
non_missing_vals = {val : count for val, count in val_count_dict.items()
if val not in self.missing_vals}
# NOTE(review): despite its name, this is the total number of non-missing cells (sum of the occurrence
# counts), not the number of distinct values. The class documentation describes ``binning_threshold`` in
# terms of distinct values - confirm whether ``len(non_missing_vals)`` was intended
non_missing_unique_count = sum(non_missing_vals.values())
if len(non_missing_vals) > self.categorical_threshold:
var_info.variable_type = VariableType.Metric
if non_missing_unique_count > self.binning_threshold:
var_info.binning = BinningInfo(should_bin=True, exclude_from_binning=[])
else:
var_info.binning = BinningInfo(should_bin=False, exclude_from_binning=None)
else:
var_info.binning = BinningInfo(should_bin=False, exclude_from_binning=None)
# value distribution and artifacts are only gathered for variables that are not keys
if var_info.variable_type not in [VariableType.PrimaryKey, VariableType.ForeignKey]:
var_info.detect_artifacts_and_value_distribution(val_count_dict, artifact_mode=self.artifact_mode,
missing_vals=self.missing_vals)
# drop the gathered statistics of this table before the next table is processed
data.clear()
# report all tables for which no primary key could be determined
if len(tables_without_primary) != 0:
table_str = 'table' if len(tables_without_primary) == 1 else 'tables'
print('No primary key found for ' + table_str + ' "' + '", "'.join(tables_without_primary) + '"')
def __get_primary_key(self, data: dict) -> str:
    """Selects the primary key column of a table from the previously extracted column statistics.

    Only columns whose cell values are unique and free of missing values qualify. Among the qualifying columns,
    integer columns without free text are preferred, followed by other columns without free text. Columns
    containing free text are only used as a last resort. Within each group the first column wins.

    :param data: The variable data that was extracted before
    :return: Returns the name of the found primary key column or an empty string if none was found
    """
    preferred, regular, fallback = [], [], []
    for var_name, var_dict in data.items():
        qualifies = var_dict['values_are_unique'] and not var_dict['contains_missing_vals']
        if not qualifies:
            continue
        if var_dict['contains_freetext']:
            bucket = fallback
        elif var_dict['data_type'] == DataType.Integer:
            bucket = preferred
        else:
            bucket = regular
        bucket.append(var_name)

    # the first entry of the highest ranked non-empty group is the primary key
    for bucket in (preferred, regular, fallback):
        if bucket:
            return bucket[0]
    return ""
[docs]
def assign_foreign_keys(self) -> None:
    """Assigns foreign keys by matching column names against the primary keys of the other tables.

    A column of a table becomes a foreign key, if it is not the primary key of its own table and carries the
    same name as the primary key of another table. If a column matches the primary keys of several tables, the
    first assignment is kept and the additional match is printed. For every assigned foreign key the value
    distribution is removed and binning is switched off.
    """
    table_pairs = itertools.permutations(self.result.get_table_names(), 2)
    for table, foreign_table in table_pairs:
        for variable in self.result.get_variable_names(table):
            is_own_primary_key = variable == self.result.get_primary_key(table)
            if is_own_primary_key or self.result.get_primary_key(foreign_table) != variable:
                continue

            assigned_foreign_keys = self.result.get_foreign_keys(table)
            if variable not in assigned_foreign_keys:
                self.result.add_foreign_key(table, foreign_table, variable)
                var_info = self.result.get_variable(table, variable)
                # key columns carry neither a value distribution nor binning
                var_info.value_distribution = None
                var_info.binning = BinningInfo(should_bin=False, exclude_from_binning=None)
                continue

            # the column already references another table, keep that assignment and report the conflict
            message = ('Variable "' + variable + '" was assigned as foreign key in table "' + table
                       + '" with foreign table "' + assigned_foreign_keys[variable]
                       + '", but could also be assigned with foreign table "' + foreign_table + '"')
            print(message)
def __extract_data(self, table: str) -> dict:
    """Gathers the column statistics of one table.

    At most ``self.nof_read_lines`` lines are consumed from the reader for the table. For every column the
    occurrences of each cell value are counted, missing cells are flagged, the data type of every non-missing
    cell is inferred and counted, and it is tracked whether the non-missing values are unique and whether free
    text occurs. A string cell counts as free text if it is longer than ``self.str_len_free_text`` characters or
    contains a line break. Finally, the data type of each column is inferred from the collected counts.

    :param table: Name of the CSV table
    :return: Returns a dictionary of the columns with the generated data
    """
    with RelationalDataIODevice(self.csv_data, table, file_encoding=self.file_encoding) as reader:
        data = {
            column: {
                'column_name' : column,
                'data_type' : DataType.String,
                'contains_freetext' : False,
                'contains_missing_vals' : False,
                'value_dist' : collections.defaultdict(int),
                'nof_non_missing' : 0,
                'data_type_dist' : {DataType.String : 0, DataType.Integer : 0, DataType.Decimal : 0},
                'values_are_unique' : True}
            for column in reader.get_header()
        }
        for nof_lines, line in enumerate(reader, start=1):
            for column, val in line.items():
                stats = data[column]
                occurrences = stats['value_dist']
                occurrences[val] += 1

                if val is None or val in self.missing_vals:
                    stats['contains_missing_vals'] = True
                    continue

                stats['nof_non_missing'] += 1
                datatype = self.__infer_cell_datatype(val)
                stats['data_type_dist'][datatype] += 1

                # a repeated value rules the column out as primary key
                if occurrences[val] != 1:
                    stats['values_are_unique'] = False
                    continue

                # first occurrence of the value: check for free text until the column is flagged once
                if (not stats['contains_freetext'] and datatype == DataType.String
                        and (len(val) > self.str_len_free_text or "\n" in val)):
                    stats['contains_freetext'] = True

            # stop once the configured number of lines was processed
            if nof_lines == self.nof_read_lines:
                break
        self.__infer_table_datatypes(data)
        return data
@staticmethod
def __infer_cell_datatype(val: str) -> DataType:
    """Infers the data type of a single cell, preferring the most specific type.

    The value is tried as integer first and as decimal number second, using the conversions of the built-in
    ``int`` and ``float``. If both conversions fail, the cell is treated as string.

    :param val: The cell value
    :return: Returns the inferred cell data type
    """
    # ordered from most to least specific, the first successful conversion decides
    conversions = ((int, DataType.Integer), (float, DataType.Decimal))
    for convert, data_type in conversions:
        try:
            convert(val)
        except (ValueError, TypeError):
            continue
        return data_type
    return DataType.String
@staticmethod
def __infer_table_datatypes(table_data: dict) -> None:
"""Infers the data type for each column from the distribution of data types of its non-missing cell values.
The absolute counts stored under ``data_type_dist`` are replaced in place by their share of the non-missing
cells, rounded to four decimals, and the ``data_type`` entry of the column is updated from these shares.
:param table_data: The data dictionary for the table, it is modified in place
"""
for column, column_data in table_data.items():
nof_values = column_data['nof_non_missing']
# columns without any non-missing cell are not normalized, which avoids a division by zero
if nof_values > 0:
for data_type, count in column_data['data_type_dist'].items():
# only values of existing keys are replaced, so iterating over the dict at the same time is safe
column_data['data_type_dist'][data_type] = round(count/nof_values, 4)
column_data['data_type'] = MetaDataGenerator.__infer_column_datatype(column_data['data_type_dist'])
@staticmethod
def __infer_column_datatype(type_dict: dict) -> DataType:
    """Derives the data type of a column from the shares of its cell data types.

    The least specific type is checked first: if the share of strings exceeds 5%, the column is a 'String'
    column. Otherwise, if the share of decimal numbers exceeds 5%, it is a 'Decimal' column. In all remaining
    cases the column is assigned 'Integer', which is the most specific data type.

    :param type_dict: The data type distribution
    :return: Returns the column data type
    """
    for candidate in (DataType.String, DataType.Decimal):
        if type_dict[candidate] > 0.05:
            return candidate
    return DataType.Integer