import collections
import copy
from dataclasses import asdict
from typing import Optional, Dict, Union, List
from graphxplore.Basis import RelationalDataIODevice
from graphxplore.MetaDataHandling import MetaData, VariableInfo, DataType, VariableType
from .data_mapping import DataMapping, TableMapping
from .variable_mapping import VariableMapping, MappingCase
from .data_transformation import DataTransformation
from .Conclusions import CopyConclusion
from .Conditionals import AlwaysTrueOperator, InListOperator, NegatedOperator
from .data_structure_transformer import SourceDataType, TableMappingType
# Static helpers for copying datasets, adding primary keys and pivoting tables.
class DataMappingUtils:
    """This class contains static utility methods for data cleaning, adding primary keys and pivoting tables.
    """

    @staticmethod
    def copy_variable(source_meta : MetaData, source_table : str, source_var : str, target_meta : MetaData,
                      target_table : str, target_var : str, delete_artifacts : bool = False) -> VariableMapping:
        """Generates a variable mapping for copying a source variable to a target variable,
        optionally deleting artifacts. The target variable is added to ``target_meta`` if it
        doesn't exist yet.

        Note that ``target_meta`` is modified in place: a missing target variable is inserted, and if
        ``delete_artifacts`` is ``True`` the artifacts of the target variable are reset to ``None``.

        :param source_meta: The source metadata
        :param source_table: The source table
        :param source_var: The name of the source variable
        :param target_meta: The target metadata, ``target_table`` must already exist as a table
        :param target_table: The target table
        :param target_var: The name of the target variable
        :param delete_artifacts: If ``True`` artifacts are removed while copying
        :raises AttributeError: If ``target_table`` is not a table of ``target_meta``
        :return: Returns the variable mapping
        """
        if target_table not in target_meta.get_table_names():
            raise AttributeError('Table "' + target_table + '" not in target meta data')
        if target_var not in target_meta.get_variable_names(target_table):
            # Clone the source variable's metadata under the target table and target name
            var_dict = asdict(source_meta.get_variable(source_table, source_var))
            var_dict['table'] = target_table
            target_meta.data[target_table]['variables'][target_var] = VariableInfo.from_dict(
                target_var, target_table, var_dict)
            # Fetched before a potential primary key assignment, same order as before
            target_var_info = target_meta.get_variable(target_table, target_var)
            if target_var_info.variable_type == VariableType.PrimaryKey:
                target_meta.assign_primary_key(target_table, target_var)
        else:
            target_var_info = target_meta.get_variable(target_table, target_var)

        has_artifacts = target_var_info.artifacts is not None
        if has_artifacts and not delete_artifacts:
            # Artifacts are kept, so the values are copied as strings
            # NOTE(review): presumably because artifacts may not parse as the declared data type - confirm
            data_type = DataType.String
        else:
            data_type = target_var_info.data_type
        conclusion = CopyConclusion(data_type, source_table, source_var)

        if delete_artifacts and has_artifacts:
            # Remember the artifacts for the filter below and mark the target variable as artifact-free
            to_delete = list(target_var_info.artifacts)
            target_var_info.artifacts = None
        else:
            to_delete = []

        if to_delete:
            # Only copy values that are not listed as artifacts
            conditional = NegatedOperator(InListOperator(source_table, source_var, data_type, to_delete))
        else:
            conditional = AlwaysTrueOperator()
        return VariableMapping(target_table, target_var, [MappingCase(conditional, conclusion)])

    @staticmethod
    def get_copy_mapping(source_meta : MetaData, target_meta : MetaData,
                         delete_artifacts : bool = False) -> DataMapping:
        """Generates mappings for copying all data from a source dataset, optionally deleting artifacts.
        ``target_meta`` is filled with all variables from the source dataset, but tables have to exist already.
        Foreign key relations are inferred from ``source_meta`` if they do not exist already.

        :param source_meta: The source metadata, every table needs an assigned primary key
        :param target_meta: The target metadata, tables must exist and be identical with the source metadata
        :param delete_artifacts: If ``True``, artifacts are removed while copying
        :raises AttributeError: If a source table has no assigned primary key
        :return: Returns the data mapping
        """
        var_mappings = collections.defaultdict(dict)
        for source_table in source_meta.get_table_names():
            if not source_meta.has_primary_key(source_table):
                raise AttributeError('Before copying, the source table "' + source_table
                                     + '" needs an assigned primary key')
            pk = source_meta.get_primary_key(source_table)
            if not target_meta.has_primary_key(source_table):
                if pk not in target_meta.get_variable_names(source_table):
                    # Clone the primary key variable of the source table into the target metadata
                    var_dict = asdict(source_meta.get_variable(source_table, pk))
                    target_meta.data[source_table]['variables'][pk] = VariableInfo.from_dict(
                        pk, source_table, var_dict)
                target_meta.assign_primary_key(source_table, pk)
            for source_var in source_meta.get_variable_names(source_table):
                # No variable mapping is generated for the primary key itself
                if source_var == pk:
                    continue
                var_mappings[source_table][source_var] = DataMappingUtils.copy_variable(
                    source_meta, source_table, source_var, target_meta, source_table, source_var,
                    delete_artifacts)
        table_mappings = {table : TableMapping(TableMappingType.OneToOne, [table])
                          for table in source_meta.get_table_names()}
        for table in target_meta.get_table_names():
            if len(target_meta.get_foreign_keys(table)) == 0:
                # Deep copy so that later changes of the target foreign keys cannot leak into the source
                # metadata (and vice versa) through a shared object
                target_meta.data[table]["foreign_keys"] = copy.deepcopy(source_meta.get_foreign_keys(table))
        return DataMapping(source_meta, target_meta, table_mappings, var_mappings)

    @staticmethod
    def copy_dataset(source_meta : MetaData, data_source : Union[str, Dict[str, List[Dict[str, str]]]],
                     data_target : Union[str, Dict[str, List[Dict[str, str]]]], delete_artifacts : bool = False,
                     source_file_encoding : Optional[str] = None) -> None:
        """Copies a whole dataset while optionally deleting artifacts.

        The target metadata is created from the table names of ``source_meta`` and filled by
        :func:`get_copy_mapping`.

        :param source_meta: The source metadata
        :param data_source: The path to a directory where the CSV files are read from or a data dictionary where
            data is retrieved
        :param data_target: The path to a directory where the resulting CSV files are written to or a data
            dictionary where data is inserted
        :param delete_artifacts: If ``True`` artifacts are removed while copying
        :param source_file_encoding: Specifies the file encoding of all source tables, if read from a CSV. Will be
            detected if not specified, defaults to ``None``
        """
        target_meta = MetaData(source_meta.get_table_names())
        mappings = DataMappingUtils.get_copy_mapping(source_meta, target_meta, delete_artifacts)
        data_transformation = DataTransformation(mappings)
        data_transformation.transform_to_target(SourceDataType.CSV, data_source, data_target,
                                                source_file_encoding=source_file_encoding)

    @staticmethod
    def add_primary_key(data_source : Union[str, Dict[str, List[Dict[str, str]]]], source_table : str,
                        data_target : Union[str, Dict[str, List[Dict[str, str]]]], target_table : str,
                        primary_key : str, start_idx : int = 0,
                        file_encoding : Optional[str] = None) -> int:
        """Adds an integer primary key to each row of the source table and stores the result in a data target.

        The primary key becomes the first column of the target table. Rows are numbered consecutively,
        beginning with ``start_idx``.

        :param data_source: The path to a directory where the CSV file is read from or a data dictionary where
            data is retrieved
        :param source_table: The name of the source table
        :param data_target: The path to a directory where the resulting CSV file is written to or a data
            dictionary where data is inserted
        :param target_table: The name of the resulting target table
        :param primary_key: The name of the primary key
        :param start_idx: The start index for the primary key, defaults to 0
        :param file_encoding: The file encoding of the CSV file (ascii, utf-8,...) in chardet definition.
            Is guessed if not specified, defaults to None
        :raises AttributeError: If ``source_table`` does not exist in the data source, or if ``primary_key`` is
            already a column of the source table
        :return: Returns the next unused primary key value, i.e. ``start_idx`` plus the number of written rows
            (one above the largest assigned value, or ``start_idx`` itself for an empty table)
        """
        available_tables = RelationalDataIODevice.get_available_table_names(data_source)
        if source_table not in available_tables:
            raise AttributeError('Source table "' + source_table + '" does not exist in data source')
        RelationalDataIODevice.check_data_location(data_target, write=True)
        with RelationalDataIODevice(data_source, source_table, file_encoding=file_encoding) as reader:
            header = reader.get_header()
            if primary_key in header:
                raise AttributeError('Specified attribute name "' + primary_key
                                     + '" for primary key is already contained in source table')
            header_with_pk = [primary_key] + list(header)
            with RelationalDataIODevice(data_target, target_table, write=True, header=header_with_pk) as writer:
                idx = start_idx
                for input_line in reader:
                    # A shallow copy is enough to leave the source row untouched, only a key is added
                    output_line = dict(input_line)
                    output_line[primary_key] = idx
                    writer.writerow(output_line)
                    idx += 1
                return idx

    @staticmethod
    def pivot_table(source_table: List[Dict[str, str]], index_column: str, value_column: str,
                    to_index: Optional[Dict[str, str]] = None,
                    columns_to_keep: Optional[List[str]] = None) -> List[Dict[str, str]]:
        """Spreads the values of ``value_column`` over one new column per value of ``index_column``.

        Every (retained) source row yields exactly one target row. It contains the kept columns and all
        pivot columns, where only the pivot column belonging to the row's index value is filled with the
        row's value of ``value_column``. All other pivot columns are set to the empty string. Rows are not
        merged.

        :param source_table: The rows of the source table, the header is taken from the first row
        :param index_column: The column whose values determine the pivot column of each row
        :param value_column: The column whose values are written to the pivot columns
        :param to_index: Optional dictionary of index values and the names of their pivot columns. If specified,
            rows with other index values are dropped. If ``None``, every index value becomes a pivot column
            named like the value itself, in order of first appearance
        :param columns_to_keep: Optional list of source columns to copy to the result. If ``None``, all columns
            except ``index_column`` and ``value_column`` are kept
        :raises AttributeError: If the source table is empty, a specified column does not exist, the index or
            value column is marked for keeping, a value of ``to_index`` does not appear in the index column, or
            a pivot column name collides with a kept column
        :return: Returns the rows of the pivoted table
        """
        if not source_table:
            # Without a first row there is no header to validate the columns against
            raise AttributeError('Source table is empty, cannot pivot')
        header = source_table[0].keys()
        if index_column not in header:
            raise AttributeError('Index column "' + index_column + '" not found in source table')
        if value_column not in header:
            raise AttributeError('Value column "' + value_column + '" not found in source table')
        if index_column == value_column:
            raise AttributeError('Index column and value column cannot both be "' + index_column + '"')
        if columns_to_keep is not None:
            if index_column in columns_to_keep:
                raise AttributeError('Index column "' + index_column
                                     + '" in "columns_to_keep", but it will be used for pivotization')
            if value_column in columns_to_keep:
                raise AttributeError('Value column "' + value_column
                                     + '" in "columns_to_keep", but it will be used to fill pivot columns in result'
                                       ' table')
            for column in columns_to_keep:
                if column not in header:
                    raise AttributeError('Column "' + column + '" marked for keeping, but not found in source table')
            kept_columns = list(columns_to_keep)
        else:
            kept_columns = [column for column in header if column not in (index_column, value_column)]

        # A dict instead of a set keeps the order of first appearance, so the column order of the result
        # is reproducible between runs
        index_vals = dict.fromkeys(row[index_column] for row in source_table)
        if to_index is not None:
            for index_val, target_column in to_index.items():
                if index_val not in index_vals:
                    raise AttributeError('Value to index "' + index_val + '" not found in index column "'
                                         + index_column + '"')
                if target_column in kept_columns:
                    raise AttributeError('Index target column name "' + target_column
                                         + '" already existing as column name')
            pivot_columns = list(to_index.values())
        else:
            for index_val in index_vals:
                # Same collision rule as for explicitly named pivot columns, otherwise the pivot value would
                # silently overwrite the kept column
                if index_val in kept_columns:
                    raise AttributeError(f'Index value "{index_val}" already existing as column name')
            pivot_columns = list(index_vals)

        result = []
        for source_row in source_table:
            index_row_val = source_row[index_column]
            if to_index is None:
                pivot_column = index_row_val
            elif index_row_val in to_index:
                pivot_column = to_index[index_row_val]
            else:
                # Index value was not selected for pivotization
                continue
            target_row = {column: source_row.get(column, '') for column in kept_columns}
            # Pivot columns always start empty, even if a source column happens to have the same name
            target_row.update(dict.fromkeys(pivot_columns, ''))
            target_row[pivot_column] = source_row[value_column]
            result.append(target_row)
        return result