Source code for graphxplore.DataMapping.data_transformation

import collections
import contextlib
from typing import Union, Optional, Dict, List, Tuple
from graphxplore.MetaDataHandling import MetaData
from .meta_lattice import MetaLattice
from .data_mapping import DataMapping, VariableMapping, TableMapping
from .data_structure_transformer import (CSVDataFlattener, DataSegmentor, SourceDataType, FlattenerLatticeConfig,
                                         TableMappingType, SourceDataLine)

class PrimaryKeyGenerator:
    """Base class for generating primary key values of a target table from units of source data.

    Concrete subclasses handle exactly one :class:`TableMappingType` and override :meth:`generate_primary_key`.
    Use :meth:`from_table_mapping` to obtain the generator matching a given table mapping.

    :param table_mapping: The table mapping of the target table
    :param source_meta: The metadata of the source dataset
    :param mapping_type: The table mapping type the concrete generator handles
    :raises AttributeError: If ``mapping_type`` differs from the type of ``table_mapping``, or if a source table of
        ``table_mapping`` is not contained in ``source_meta``
    """
    def __init__(self, table_mapping: TableMapping, source_meta: MetaData, mapping_type: TableMappingType):
        if mapping_type != table_mapping.type:
            raise AttributeError('Primary key generator for table mapping type "' + str(mapping_type)
                                 + '" initialized with wrong table mapping type "' + str(table_mapping.type) + '"')
        for table in table_mapping.source_tables:
            if table not in source_meta.get_table_names():
                raise AttributeError('Source table "' + table + '" of table mapping not specified in source metadata')
        self.table_mapping = table_mapping
        self.source_meta = source_meta

    def generate_primary_key(self, source_data: SourceDataLine) -> Optional[str]:
        """Generate the primary key value of the target table for one unit of source data.

        :param source_data: The unit of source data
        :return: The primary key value, or ``None`` if the generator does not produce one
        :raises NotImplementedError: Always, the base class must not be used directly
        """
        # NotImplemented is a constant, not an exception; calling it would raise an unrelated TypeError
        raise NotImplementedError('Never call parent class')

    @staticmethod
    def from_table_mapping(table_mapping: TableMapping, source_meta: MetaData) -> 'PrimaryKeyGenerator':
        """Create the primary key generator matching the type of ``table_mapping``.

        :param table_mapping: The table mapping of the target table
        :param source_meta: The metadata of the source dataset
        :return: An instance of the subclass responsible for ``table_mapping.type``
        :raises NotImplementedError: If no generator exists for the table mapping type
        """
        # subclasses are defined further down in this module; they are resolved at call time
        if table_mapping.type == TableMappingType.OneToOne:
            return OneToOnePrimaryKeyGenerator(table_mapping, source_meta)
        elif table_mapping.type == TableMappingType.Inherited:
            return InheritedPrimaryKeyGenerator(table_mapping, source_meta)
        elif table_mapping.type == TableMappingType.Merge:
            return MergePrimaryKeyGenerator(table_mapping, source_meta)
        elif table_mapping.type == TableMappingType.Concatenate:
            return ConcatenatePrimaryKeyGenerator(table_mapping, source_meta)
        else:
            raise NotImplementedError('Table mapping type not implemented for primary key generator')

class OneToOnePrimaryKeyGenerator(PrimaryKeyGenerator):
    """Primary key generator for one-to-one table mappings.

    The target primary key value is copied from the primary key variable of the first source table of the
    table mapping.

    :param table_mapping: The table mapping of the target table, must be of type ``TableMappingType.OneToOne``
    :param source_meta: The metadata of the source dataset
    """
    def __init__(self, table_mapping: TableMapping, source_meta: MetaData):
        super().__init__(table_mapping, source_meta, TableMappingType.OneToOne)
        first_source_table = self.table_mapping.source_tables[0]
        self.table_to_copy = first_source_table
        self.pk_to_copy = self.source_meta.get_primary_key(first_source_table)

    def generate_primary_key(self, source_data: SourceDataLine) -> str:
        """Return the stringified primary key value of the copied source table.

        :param source_data: The unit of source data
        :return: The source primary key value converted with ``str``
        """
        copied_value = source_data.get_singular_value(self.table_to_copy, self.pk_to_copy)
        # NOTE(review): a missing value (None) would become the string "None" here - confirm this cannot occur
        return str(copied_value)

class InheritedPrimaryKeyGenerator(PrimaryKeyGenerator):
    """Primary key generator for inherited table mappings.

    It never produces a key value of its own: :meth:`generate_primary_key` always returns ``None``.

    :param table_mapping: The table mapping of the target table, must be of type ``TableMappingType.Inherited``
    :param source_meta: The metadata of the source dataset
    """
    def __init__(self, table_mapping: TableMapping, source_meta: MetaData):
        super().__init__(table_mapping, source_meta, mapping_type=TableMappingType.Inherited)

    def generate_primary_key(self, source_data: SourceDataLine) -> Optional[str]:
        """Return ``None`` regardless of the given unit of source data.

        :param source_data: The unit of source data (ignored)
        :return: Always ``None``
        """
        no_key: Optional[str] = None
        return no_key

class MergePrimaryKeyGenerator(PrimaryKeyGenerator):
    """Primary key generator for merge table mappings.

    The primary key values of all source tables are read from the unit of source data. All non-``None`` values
    must agree (after conversion with ``str``); that common value becomes the target primary key.

    :param table_mapping: The table mapping of the target table, must be of type ``TableMappingType.Merge``
    :param source_meta: The metadata of the source dataset
    """
    def __init__(self, table_mapping: TableMapping, source_meta: MetaData):
        super().__init__(table_mapping, source_meta, TableMappingType.Merge)
        # source table name -> name of its primary key variable
        self.table_pk_map = {table : self.source_meta.get_primary_key(table) for table in table_mapping.source_tables}

    def generate_primary_key(self, source_data: SourceDataLine) -> str:
        """Return the single primary key value shared by all source tables present in ``source_data``.

        :param source_data: The unit of source data
        :return: The common primary key value as string
        :raises AttributeError: If no source table has a primary key value, or if the values differ
        """
        # stringified primary key value -> source tables carrying that value
        merged_returns = collections.defaultdict(list)
        for table, pk in self.table_pk_map.items():
            pk_val = source_data.get_singular_value(table, pk)
            if pk_val is not None:
                merged_returns[str(pk_val)].append(table)

        if not merged_returns:
            raise AttributeError('No valid primary key value to merge found in unit of source data for source tables "'
                                 + '", "'.join(self.table_mapping.source_tables) + '"')

        if len(merged_returns) > 1:
            raise AttributeError('Multiple differing primary key values for merging found in unit of source data: '
                                 + ', '.join('"' + pk_val + '" in table(s) ("' + '", "'.join(tables) + '")'
                                             for pk_val, tables in merged_returns.items()))
        # exactly one distinct value remains
        return next(iter(merged_returns))

class ConcatenatePrimaryKeyGenerator(PrimaryKeyGenerator):
    """Primary key generator for concatenate table mappings.

    Ignores the source data and hands out consecutive integer keys as strings: "0", "1", "2", ... in call order.

    :param table_mapping: The table mapping of the target table, must be of type ``TableMappingType.Concatenate``
    :param source_meta: The metadata of the source dataset
    """
    def __init__(self, table_mapping: TableMapping, source_meta: MetaData):
        super().__init__(table_mapping, source_meta, TableMappingType.Concatenate)
        # last key handed out; starts below zero so that the first generated key is "0"
        self.uid = -1

    def generate_primary_key(self, source_data: SourceDataLine) -> str:
        """Advance the internal counter and return it as string.

        :param source_data: The unit of source data (ignored)
        :return: The next consecutive key
        """
        next_uid = self.uid + 1
        self.uid = next_uid
        return str(next_uid)


class DataTransformation:
    """This class conducts the ETL process of transforming the given source dataset to the specified target dataset
    using the given :class:`DataMapping`

    :param data_mapping: The data mapping holding source/target metadata, their lattices and the table and variable
        mappings
    """
    def __init__(self, data_mapping : DataMapping):
        """Constructor method
        """
        self.data_mapping = data_mapping
        # target table -> the value of ``to_inherit`` of its table mapping, for all inherited table mappings
        self.inheriting_tables = {table : table_mapping.to_inherit
                                  for table, table_mapping in self.data_mapping.table_mappings.items()
                                  if table_mapping.type == TableMappingType.Inherited}
        # target tables whose data is generated directly (i.e. that are not inheriting)
        self.assigned_tables = {table for table in self.data_mapping.target.get_table_names()
                                if table not in self.inheriting_tables}

    def transform_to_target(self, source_type : SourceDataType,
                            source_specifier : Union[str, Dict[str, List[Dict[str, str]]]],
                            data_target : Union[str, Dict[str, List[Dict[str, str]]]],
                            global_unique_target_keys : bool = False,
                            source_file_encoding : Optional[str] = None) -> None:
        """Reads the source data, transforms it and writes it to the data target. Each target table that is not
        inheriting is processed in turn (in no particular order, since the tables are held in a set). Currently only
        CSV sources (``SourceDataType.CSV``) are supported, other source types raise :class:`NotImplementedError`.

        :param source_type: The type of source data
        :param source_specifier: Either a source directory path, the name of the Neo4J database or a dictionary
            containing the source data set
        :param data_target: The path to a directory where CSV files are written or a data dictionary where data is
            inserted
        :param global_unique_target_keys: If ``True``, the generated IDs are unique across all automatically
            generated primary keys, defaults to ``False``
        :param source_file_encoding: Specifies the file encoding of all source tables, if read from a CSV. Will be
            detected if not specified, defaults to ``None``
        :raises NotImplementedError: If ``source_type`` is not ``SourceDataType.CSV``
        """
        with DataSegmentor(self.data_mapping.target, self.data_mapping.target_lattice, self.inheriting_tables,
                           data_target, global_unique_target_keys) as segmentor:
            for target_table in self.assigned_tables:
                print('Start generating data for target table "'+ target_table + '"')
                table_mapping = self.data_mapping.table_mappings[target_table]
                sub_target_lattice, flattener_config, lattice_mappings\
                    = self.__get_transformation_data_for_target_table(target_table)
                key_generator = PrimaryKeyGenerator.from_table_mapping(table_mapping, self.data_mapping.source)
                with contextlib.ExitStack() as inner_stack:
                    if source_type == SourceDataType.CSV:
                        flattener = inner_stack.enter_context(
                            CSVDataFlattener(self.data_mapping.source, source_specifier, table_mapping.type,
                                             flattener_config, source_file_encoding))
                    elif source_type == SourceDataType.Database:
                        # NotImplemented is a constant, not an exception; calling it would raise a TypeError
                        raise NotImplementedError('Not implemented')
                    else:
                        raise NotImplementedError('Not implemented')
                    counter = 0
                    pk = self.data_mapping.target.get_primary_key(target_table)
                    for source_line in flattener:
                        counter += 1
                        if counter % 100000 == 0:
                            print('Transformed ' + str(counter) + ' source lines to target format')
                        # skip whole source data unit, if condition of table mapping is not met
                        if not table_mapping.condition.valid(source_line):
                            continue
                        # target table -> {target variable -> value}
                        target_line = collections.defaultdict(dict)
                        target_line[target_table][pk] = key_generator.generate_primary_key(source_line)
                        for mapping in lattice_mappings:
                            target_line[mapping.target_table][mapping.target_variable] = mapping[source_line]
                        segmentor.write_row(sub_target_lattice, target_line)

    @staticmethod
    def _collect_required_source_vars(required_data, required_singular_source_vars,
                                      required_aggregated_source_vars) -> None:
        """Sort the source variables of ``required_data`` into singular and aggregated requirements (in place).

        :param required_data: Mapping of source table -> iterable of ``(source variable, aggregator info)`` pairs
        :param required_singular_source_vars: Source table -> set of variables needed without aggregation
        :param required_aggregated_source_vars: Source table -> source variable -> set of aggregator infos
        """
        for source_table, source_vars in required_data.items():
            for source_var, aggregator_info in source_vars:
                if aggregator_info is None:
                    required_singular_source_vars[source_table].add(source_var)
                else:
                    required_aggregated_source_vars[source_table][source_var].add(aggregator_info)

    def __get_transformation_data_for_target_table(self, target_table : str)\
            -> Tuple[MetaLattice, FlattenerLatticeConfig, List[VariableMapping]]:
        """Generates the :class:`MetaLattice` objects for the relevant source and target tables (and their
        primary/foreign key relations) to map all data for the specified ``target_table``. All necessary
        :class:`VariableMapping` objects and required source tables/variables are identified as well and split into
        data required for singular sources and data aggregation (if aggregation required).

        :param target_table: The name of the target table
        :return: Returns a triple of the target sub lattice, the flattener lattice configuration and the list of
            variable mappings
        :raises AttributeError: If ``target_table`` is inheriting, or if a source table has a multi referenced
            descendant (or, when aggregating, ancestor) table
        """
        if target_table not in self.assigned_tables:
            raise AttributeError('Target table "' + target_table + '" cannot be the starting point for a '
                                 'transformation, because its relation to the '
                                 'source dataset is inherited')
        # get sub lattice with other inheriting tables
        sub_target_lattice = self.data_mapping.target_lattice.get_sub_lattice_from_inheritance(
            target_table, self.inheriting_tables)
        required_singular_source_vars = collections.defaultdict(set)
        required_aggregated_source_vars = collections.defaultdict(lambda : collections.defaultdict(set))
        lattice_mappings = []
        for table in sub_target_lattice.children.keys():
            for variable in self.data_mapping.target.get_variable_names(table):
                if self.data_mapping.variable_should_get_mapped(table, variable):
                    var_mapping = self.data_mapping.get_variable_mapping(table, variable)
                    lattice_mappings.append(var_mapping)
                    self._collect_required_source_vars(var_mapping.sources, required_singular_source_vars,
                                                       required_aggregated_source_vars)
        table_mapping = self.data_mapping.table_mappings[target_table]
        # the table mapping condition may need source variables of its own
        self._collect_required_source_vars(table_mapping.condition.get_required_data(),
                                           required_singular_source_vars, required_aggregated_source_vars)
        sub_source_upward_lattice = self.data_mapping.source_lattice.get_sub_lattice_whitelist(
            table_mapping.source_tables, required_singular_source_vars.keys())
        # caveat: Each source table is checked for multi reference independently. When source tables are merged, this
        # can still lead to multi reference by different source tables. The user has to check that the same primary key
        # values from distinct tables always reference the same foreign key values
        for source_table in table_mapping.source_tables:
            if sub_source_upward_lattice.has_multi_reference_relative(source_table):
                raise AttributeError('Source table "' + source_table
                                     + '" has multi referenced descendent table. This prevents data flattening')
        lattice_config = FlattenerLatticeConfig(sub_source_upward_lattice, required_singular_source_vars)
        if len(required_aggregated_source_vars) > 0:
            sub_source_downward_lattice = self.data_mapping.source_lattice.get_ancestor_lattice(
                table_mapping.source_tables, required_aggregated_source_vars.keys())
            for source_table in table_mapping.source_tables:
                if sub_source_downward_lattice.has_multi_reference_relative(source_table, upward=False):
                    raise AttributeError('Source table "' + source_table
                                         + '" has multi referenced ancestor table. This prevents data aggregation')
            lattice_config.aggregation_lattice = sub_source_downward_lattice
            lattice_config.required_aggregation_vars = required_aggregated_source_vars
        return sub_target_lattice, lattice_config, lattice_mappings