graphxplore.DataMapping package

This subpackage can be used to clean artifacts from your data or to conduct more complex ETL processes. The workflows of this package are independent of the graphxplore.GraphTranslation package and the graphxplore.GraphDataScience package and can be used without requiring a graph-based data representation. All ETL processes can be stored as JSON files for reusability.

The central class is DataMapping. It contains the MetaData objects of the source dataset and the target data structure, as well as the mappings between source and target tables as TableMapping objects. These can describe one-to-one, one-to-many, many-to-one, or many-to-many relationships. Mapping rules on the variable level are stored as VariableMapping objects. They must be defined for each variable of the target data structure (with the exception of primary keys and some foreign keys). Each VariableMapping contains one or multiple MappingCase objects, each of which contains a LogicOperator and a Conclusion. The MappingCase objects are checked in input order. If the function valid() of the LogicOperator returns True on the given unit of source data, get_return() of the Conclusion is triggered. If valid() returns False, the next MappingCase is checked. If no conditional is met, None is returned. Code for a single VariableMapping could look like:

>>> from graphxplore.MetaDataHandling import DataType
>>> from graphxplore.DataMapping import VariableMapping, MappingCase, SourceDataLine
>>> from graphxplore.DataMapping.Conditionals import (StringOperator, StringOperatorType, MetricOperator,
>>>                                                   MetricOperatorType, AndOperator, AlwaysTrueOperator)
>>> from graphxplore.DataMapping.Conclusions import CopyConclusion, FixedReturnConclusion
# value for variable 'var' is decimal, larger than 0 and value for variable 'another_var' is a string and contains 'nana'
# then copy value for variable 'var'
>>> first_case = MappingCase(conditional=AndOperator(sub_operators=[
>>>         MetricOperator(table='FirstSourceTable', variable='var', value=0, data_type=DataType.Decimal,
>>>                        compare=MetricOperatorType.Larger),
>>>         StringOperator(table='SecondSourceTable', variable='another_var', value='nana',
>>>                        compare=StringOperatorType.Contains)
>>>                 ]), conclusion=CopyConclusion(target_data_type=DataType.Decimal,
>>>                                               origin_table='FirstSourceTable', var_to_copy='var'))
# always return 0.0
>>> second_case = MappingCase(conditional=AlwaysTrueOperator(), conclusion=FixedReturnConclusion(DataType.Decimal,
>>>                           return_val=0.0))
>>> var_mapping = VariableMapping(target_table='TargetTable', target_variable='target_var',
>>>                               cases=[first_case, second_case])
>>> source_line = SourceDataLine({'FirstSourceTable' : {'var' : 1.5}, 'SecondSourceTable' : {'another_var' : 'banana'}})
>>> var_mapping[source_line]
# first case is met
1.5
>>> source_line = SourceDataLine({'FirstSourceTable' : {'var' : -7.8}, 'SecondSourceTable' : {'another_var' : 'banana'}})
>>> var_mapping[source_line]
# first case not met because 'var' is negative -> second case is executed
0.0
>>> source_line = SourceDataLine({'FirstSourceTable' : {'var' : 1.5}, 'SecondSourceTable' : {'another_var' : None}})
>>> var_mapping[source_line]
# first case not met because 'another_var' has a missing value -> second case is executed
0.0
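
The evaluation order described above (cases are checked in input order, the first matching case wins, and None is returned if no conditional is met) can be sketched in plain Python without graphxplore. The names Case, evaluate_cases and first_condition are hypothetical stand-ins and not part of the package; this is only an illustration of the control flow, not the actual implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

# Hypothetical stand-in for a source data line: table -> variable -> value
Line = Dict[str, Dict[str, Any]]


@dataclass
class Case:
    """Hypothetical stand-in for MappingCase: a condition plus a conclusion."""
    conditional: Callable[[Line], bool]
    conclusion: Callable[[Line], Any]


def evaluate_cases(cases: List[Case], line: Line) -> Optional[Any]:
    """Check the cases in input order; the first case whose condition holds wins."""
    for case in cases:
        if case.conditional(line):
            return case.conclusion(line)
    # no conditional was met
    return None


def first_condition(line: Line) -> bool:
    """'var' is a decimal larger than 0 and 'another_var' is a string containing 'nana'."""
    var = line['FirstSourceTable']['var']
    other = line['SecondSourceTable']['another_var']
    return (isinstance(var, float) and var > 0
            and isinstance(other, str) and 'nana' in other)


cases = [
    # copy the value of 'var' if the first condition holds
    Case(first_condition, lambda line: line['FirstSourceTable']['var']),
    # fallback that always applies and returns 0.0
    Case(lambda line: True, lambda line: 0.0),
]
```

Dropping the fallback case from the list would make evaluate_cases return None for every line that fails the first condition.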

You can see from this code snippet that data from different source tables can be combined. This is achieved by gathering all source data that is associated with one primary key pk in table t with value x into a single SourceDataLine, using the foreign key relations within the source dataset, which are captured in a MetaLattice object. Data from other tables that appear as foreign tables in t (or are related across multiple tables) can be seen as a property of pk and is directly added to the SourceDataLine for x. Data from tables that themselves reference pk as a foreign key can be aggregated with AggregatorOperator or AggregateConclusion objects. Here, all data from lines where x is a foreign key value for pk (or is related across multiple tables) is gathered and some aggregate is calculated (e.g. count, minimal value, etc.). This can be useful, e.g., for the aggregation of time series.
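
The two directions of gathering data can be illustrated with a small, self-contained sketch. The toy tables (clinic, patient, visit), the function build_line and the key 'visit_aggregates' are all hypothetical; graphxplore itself resolves these relations through MetaLattice objects, which this sketch does not use.

```python
from typing import Any, Dict, List

# Hypothetical toy dataset: 'visit' references 'patient' via the foreign key
# 'patient_id', and 'patient' references 'clinic' via 'clinic_id'
clinic: Dict[int, Dict[str, Any]] = {10: {'clinic_id': 10, 'city': 'Berlin'}}
patient: Dict[int, Dict[str, Any]] = {1: {'patient_id': 1, 'clinic_id': 10, 'age': 42}}
visit: List[Dict[str, Any]] = [
    {'visit_id': 100, 'patient_id': 1, 'weight': 80.5},
    {'visit_id': 101, 'patient_id': 1, 'weight': 79.0},
]


def build_line(pk_value: int) -> Dict[str, Dict[str, Any]]:
    """Gather all data associated with one primary key value of 'patient'."""
    row = patient[pk_value]
    line: Dict[str, Dict[str, Any]] = {'patient': row}
    # foreign table: exactly one related row, added directly as a property
    line['clinic'] = clinic[row['clinic_id']]
    # inverted relation: several rows reference the primary key, so aggregate them
    weights = [v['weight'] for v in visit if v['patient_id'] == pk_value]
    line['visit_aggregates'] = {
        'weight_count': len(weights),
        'weight_min': min(weights) if weights else None,
    }
    return line


line = build_line(1)
```

Here the clinic row is a plain property of the patient, while the two visit rows can only enter the line as aggregates such as a count or a minimum.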

This automation strategy is enabled by the TableMapping which must be defined for each target table. Examples might look like:

>>> from graphxplore.MetaDataHandling import MetaData, DataType
>>> from graphxplore.DataMapping import TableMapping, MappingCase, TableMappingType, DataMapping
>>> from graphxplore.DataMapping.Conditionals import MetricOperator, MetricOperatorType, AlwaysTrueOperator
>>>
>>> source_meta = MetaData.load_from_json(filepath='/source_meta.json')
>>> target_meta = MetaData.load_from_json(filepath='/target_meta.json')
>>> data_mapping = DataMapping(source=source_meta, target=target_meta)
# one-to-one mapping between 'SourceTable' and 'TargetTable'
# each source data line will contain a row of 'SourceTable' and one row of each foreign table (potentially across
# foreign table chains) of the corresponding foreign key value
>>> one_to_one = TableMapping(type=TableMappingType.OneToOne, source_tables=['SourceTable'])
>>> data_mapping.assign_table_mapping(table='TargetTable', table_mapping=one_to_one)
# again a one-to-one mapping, but with an added condition. If this condition is not met for a source data line, the
# whole line will be skipped. Adding a condition is possible for all table mapping types except inherited table mappings
>>> added_condition = MetricOperator(table='ForeignSourceTable', variable='var', value=0,
>>>                                  data_type=DataType.Integer, compare=MetricOperatorType.Equals)
>>> filtered_one_to_one = TableMapping(type=TableMappingType.OneToOne, source_tables=['SourceTable'],
>>>                                    condition=added_condition)
>>> data_mapping.assign_table_mapping(table='TargetTable', table_mapping=filtered_one_to_one)
# merge the data of two source tables 'FirstSourceTable' and 'SecondSourceTable' into a single
# target table 'TargetTable' (many-to-one). SourceDataLine objects from 'FirstSourceTable' and 'SecondSourceTable'
# with the same primary key value are merged. This way, data rows from multiple source tables can be combined into
# one target data row
>>> merge = TableMapping(TableMappingType.Merge, source_tables=['FirstSourceTable', 'SecondSourceTable'])
>>> data_mapping.assign_table_mapping(table='TargetTable', table_mapping=merge)
# data from two source tables 'FirstSourceTable' and 'SecondSourceTable' is processed one after another into
# SourceDataLine objects and concatenated into a single target table 'TargetTable' (many-to-one). No merging of
# source data rows is conducted
>>> concatenate = TableMapping(TableMappingType.Concatenate, source_tables=['FirstSourceTable', 'SecondSourceTable'])
>>> data_mapping.assign_table_mapping(table='TargetTable', table_mapping=concatenate)
# If 'ForeignTargetTable' is a foreign table (or foreign table of foreign table ...) of 'TargetTable', it can
# inherit the mapping type of 'TargetTable'. The rows of both tables will be created together and thus the
# result data will be split. This can be useful to make the target dataset more manageable. If the relation of
# 'TargetTable' is 'one-to-one', this will become 'one-to-many'. If its relation is 'many-to-one', it will become
# 'many-to-many'.
>>> inherited = TableMapping(TableMappingType.Inherited, to_inherit='TargetTable')
>>> data_mapping.assign_table_mapping(table='ForeignTargetTable', table_mapping=inherited)
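
The difference between the two many-to-one types, merging and concatenating, can be sketched with plain dictionaries. The helper functions merge_lines and concatenate_lines and the toy data are hypothetical. In particular, keeping primary key values that occur in only one source table is an assumption of this sketch, since the behaviour of graphxplore for such rows is not stated here.

```python
from typing import Any, Dict, List

# Hypothetical rows of two source tables, keyed by primary key value
first_source: Dict[int, Dict[str, Any]] = {1: {'a': 'x'}, 2: {'a': 'y'}}
second_source: Dict[int, Dict[str, Any]] = {1: {'b': 'u'}, 3: {'b': 'w'}}

Line = Dict[str, Dict[str, Any]]


def merge_lines(first: Dict[int, Dict[str, Any]],
                second: Dict[int, Dict[str, Any]]) -> List[Line]:
    """Merging: one line per primary key value, combining rows of both tables.
    Assumption: keys present in only one table are kept."""
    lines: List[Line] = []
    for pk in sorted(set(first) | set(second)):
        line: Line = {}
        if pk in first:
            line['FirstSourceTable'] = first[pk]
        if pk in second:
            line['SecondSourceTable'] = second[pk]
        lines.append(line)
    return lines


def concatenate_lines(first: Dict[int, Dict[str, Any]],
                      second: Dict[int, Dict[str, Any]]) -> List[Line]:
    """Concatenating: the tables are processed one after another, no merging."""
    lines: List[Line] = [{'FirstSourceTable': row} for row in first.values()]
    lines += [{'SecondSourceTable': row} for row in second.values()]
    return lines


merged = merge_lines(first_source, second_source)
concatenated = concatenate_lines(first_source, second_source)
```

With this toy data, merging yields three lines (one per distinct primary key value), whereas concatenating yields four lines (one per source row).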

Data mapping can be quite complex, and this subpackage offers many functionalities. DataMappingUtils can be used to ease some common workflows. For further examples, check out test/DataMapping/test_data_mapping.py in the graphxplore GitHub repository.

Submodules

Module contents

class graphxplore.DataMapping.AggregatedData[source]

Bases: object

This class stores all aggregated data (of other variables) for a fixed primary key value.

add_variable_aggregation(table: str, variable: str, data_type: DataType, agg_type: AggregatorType, value: str | int | float | Set[str] | None) None[source]

Adds a data aggregation value for a specific variable, data type and aggregation type for this specific primary key value.

Parameters:
  • table (str) – The table of the aggregated variable

  • variable (str) – The name of the aggregated variable

  • data_type (DataType) – The data type of values that were aggregated

  • agg_type (AggregatorType) – The type of aggregation

  • value (str | int | float | Set[str] | None) – The aggregated value or None if no data was aggregated

Return type:

None

exists(table: str, variable: str, data_type: DataType, agg_type: AggregatorType) bool[source]

Checks if some value or None exists for this table, variable, data type and aggregation type in the data structure.

Parameters:
  • table (str) – The table of the aggregated variable

  • variable (str) – The name of the aggregated variable

  • data_type (DataType) – The data type of values that were aggregated

  • agg_type (AggregatorType) – The type of aggregation

Returns:

Returns True if some value or None exists in the data structure for the specified parameters

Return type:

bool

get_variable_aggregation(table: str, variable: str, data_type: DataType, agg_type: AggregatorType) str | int | float | Set[str] | None[source]

Returns data aggregation value for a specific variable, data type and aggregation type.

Parameters:
  • table (str) – The table of the aggregated variable

  • variable (str) – The name of the aggregated variable

  • data_type (DataType) – The data type of values that were aggregated

  • agg_type (AggregatorType) – The type of aggregation

Returns:

Returns the aggregated value or None if no data was aggregated

Return type:

str | int | float | Set[str] | None

merge(other: AggregatedData) AggregatedData[source]

Merges two data structures. Raises an exception if different aggregation values were assigned to the same configuration of table, variable, data type and aggregation type.

Parameters:

other (AggregatedData) – The other data structure that should be merged with this one

Returns:

Returns a new merged data structure

Return type:

AggregatedData
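
The merge semantics described above (a new structure is returned, and conflicting values for the same configuration raise an exception) can be sketched with a plain dictionary keyed by table, variable, data type and aggregation type. The function merge_aggregations, the string keys and the choice of ValueError are assumptions made for illustration and do not reflect the internals of AggregatedData.

```python
from typing import Dict, Optional, Set, Tuple, Union

Value = Optional[Union[str, int, float, Set[str]]]
# (table, variable, data type, aggregation type), all as plain strings here
Key = Tuple[str, str, str, str]


def merge_aggregations(first: Dict[Key, Value], second: Dict[Key, Value]) -> Dict[Key, Value]:
    """Return a new dictionary; raise if the same key holds different values."""
    merged = dict(first)
    for key, value in second.items():
        # a stored None also counts as an existing entry
        if key in merged and merged[key] != value:
            raise ValueError(f'Conflicting aggregation values for {key}')
        merged[key] = value
    return merged


a: Dict[Key, Value] = {('visit', 'weight', 'Decimal', 'MIN'): 79.0}
b: Dict[Key, Value] = {('visit', 'weight', 'Decimal', 'COUNT'): 2}
merged = merge_aggregations(a, b)
```

Neither input dictionary is modified, which mirrors the documented return of a new merged data structure.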

class graphxplore.DataMapping.AggregatorParser[source]

Bases: object

This class contains functionality for parsing AggregatorOperator and AggregateConclusion objects from and to string.

static check_compatibility(table: str, var: str, data_type: DataType, aggregator: AggregatorType, list_aggregation_allowed: bool = True) None[source]

Checks if data type and aggregation type match. String values can only be counted or concatenated. For AggregatorOperator the AggregatorType.List type is also valid for all data types.

Parameters:
  • table (str) – The table of variable to aggregate

  • var (str) – The name of the variable to aggregate

  • data_type (DataType) – The data type of values that should be aggregated

  • aggregator (AggregatorType) – The type of aggregation

  • list_aggregation_allowed (bool) – If True the AggregatorType.List type is also valid

Return type:

None

static from_string(input_str: str) Tuple[str, str, DataType, AggregatorType] | None[source]

Parses a table, variable, data type and aggregator type from a string. If the string is invalid None is returned.

Parameters:

input_str (str) – The string to parse

Returns:

Returns a tuple with the parsed data, or None if the string could not be parsed

Return type:

Tuple[str, str, DataType, AggregatorType] | None

static get_aggregated_data_type(aggregator: AggregatorType) DataType | None[source]

Returns the data type of the aggregation (not the type of cell values that should be aggregated).

Parameters:

aggregator (AggregatorType) – The type of aggregation

Returns:

Returns the data type, or None if the type is AggregatorType.List (a list has no basic data type)

Return type:

DataType | None

static to_str(table: str, var: str, data_type: DataType, aggregator: AggregatorType) str[source]

Converts data of AggregatorOperator and AggregateConclusion objects to string.

Parameters:
  • table (str) – The table of variable to aggregate

  • var (str) – The name of the variable to aggregate

  • data_type (DataType) – The data type of values that should be aggregated

  • aggregator (AggregatorType) – The type of aggregation

Returns:

Returns the generated string

Return type:

str

class graphxplore.DataMapping.AggregatorType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

The type of variable data aggregator.

Amplitude = 'AMPLITUDE'
Concatenate = 'CONCATENATE'
Count = 'COUNT'
List = 'LIST'
Max = 'MAX'
Mean = 'MEAN'
Median = 'MEDIAN'
Min = 'MIN'
Std = 'STDEV'
Sum = 'SUM'
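
As a rough illustration of what some of these aggregation types compute, the following sketch maps a subset of the enum values to functions from the Python standard library. The dictionary AGGREGATORS and the function aggregate are hypothetical. Amplitude, Std, Concatenate and List are left out on purpose because their exact definitions are not given here, and returning None for an empty input is an assumption based on the parameter descriptions of AggregatedData.

```python
import statistics
from typing import Callable, Dict, List, Optional, Union

Number = Union[int, float]

# Keys reuse the enum values listed above; the mapped functions are assumptions
AGGREGATORS: Dict[str, Callable[[List[float]], Number]] = {
    'COUNT': len,
    'MIN': min,
    'MAX': max,
    'SUM': sum,
    'MEAN': statistics.mean,
    'MEDIAN': statistics.median,
}


def aggregate(values: List[float], agg_type: str) -> Optional[Number]:
    """Aggregate the values, or return None if no data was aggregated."""
    if not values:
        return None
    return AGGREGATORS[agg_type](values)


weights = [1.0, 2.0, 6.0]
summary = {agg_type: aggregate(weights, agg_type) for agg_type in AGGREGATORS}
```
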
class graphxplore.DataMapping.CSVDataAggregator(data_source: str | Dict[str, List[Dict[str, str]]], meta: MetaData, lattice: MetaLattice, required_vars: Mapping[str, Mapping[str, Iterable[Tuple[AggregatorType, DataType]]]], file_encoding: str | None = None)[source]

Bases: DataAggregator

This class inherits from DataAggregator and implements the process by reading data directly from the CSV files.

Parameters:
  • data_source (str | Dict[str, List[Dict[str, str]]]) – The path to a directory with CSV files or a dictionary containing the source dataset

  • meta (MetaData) – The metadata of the whole dataset

  • lattice (MetaLattice) – The lattice that will be traversed in inverse order

  • required_vars (Mapping[str, Mapping[str, Iterable[Tuple[AggregatorType, DataType]]]]) – The variables required for data aggregation per table

  • file_encoding (str | None) – Specifies the file encoding of all read CSV tables. Will be detected if not specified, defaults to None

aggregate_data() None[source]

Starts the data aggregation and stores the results.

Return type:

None

class graphxplore.DataMapping.CSVDataFlattener(meta: MetaData, data_source: str | Dict[str, List[Dict[str, str]]], mapping_type: TableMappingType, lattice_config: FlattenerLatticeConfig, file_encoding: str | None = None)[source]

Bases: DataFlattener

This class reads data from CSV files and “flattens” all primary/foreign key relations to single rows given a MetaLattice representing a subset of the source dataset.

Parameters:
  • meta (MetaData) – The metadata of the source data

  • data_source (str | Dict[str, List[Dict[str, str]]]) – The path to a directory with CSV files or a dictionary containing the source dataset

  • mapping_type (TableMappingType) – The mapping type of the currently considered minimal target table

  • lattice_config (FlattenerLatticeConfig) – The lattices and required variables for singular and optionally aggregated source data retrieval

  • file_encoding (str | None) – Specifies the file encoding of all read CSV tables. Will be detected if not specified, defaults to None

class graphxplore.DataMapping.DataAggregator(meta: MetaData, lattice: MetaLattice, required_vars: Mapping[str, Mapping[str, Iterable[Tuple[AggregatorType, DataType]]]])[source]

Bases: object

This class gathers data of time series, events or other data associated with the same primary key value. To achieve this, a MetaLattice object is traversed in inverse order (starting from its maximal elements), and the table data is loaded and assigned to each unique primary key value. Data for variables in required_vars is aggregated with the specified AggregatorType and DataType.

Parameters:
  • meta (MetaData) – The metadata of the whole dataset

  • lattice (MetaLattice) – The lattice that will be traversed in inverse order.

  • required_vars (Mapping[str, Mapping[str, Iterable[Tuple[AggregatorType, DataType]]]]) – The variables required for data aggregation per table

aggregate_data() None[source]

Starts the data aggregation and stores the results.

Return type:

None

class graphxplore.DataMapping.DataFlattener(source_type: SourceDataType, meta: MetaData, mapping_type: TableMappingType, lattice_config: FlattenerLatticeConfig)[source]

Bases: object

This class is the parent of all classes reading data from a source dataset and resolving all foreign key relations based on one or multiple minimal tables. As a result, the source data is “flattened” to single rows of data instead of being spread across multiple tables. The class and all its children function as iterable context managers.

Parameters:
  • source_type (SourceDataType) – The type of the source data

  • meta (MetaData) – The metadata of the source data

  • mapping_type (TableMappingType) – The mapping type of the currently considered minimal target table

  • lattice_config (FlattenerLatticeConfig) – The lattices and required variables for singular and optionally aggregated source data retrieval

class graphxplore.DataMapping.DataMapping(source: MetaData, target: MetaData, table_mappings: Mapping[str, TableMapping] | None = None, variable_mappings: Mapping[str, Mapping[str, VariableMapping]] | None = None)[source]

Bases: object

This class summarizes all individual VariableMapping objects for a whole dataset via a dictionary of table -> variable -> VariableMapping

Parameters:
  • source (MetaData) – The MetaData of the source dataset

  • target (MetaData) – The MetaData of the target structure

  • table_mappings (Mapping[str, TableMapping] | None) – The table mapping for each table. Can be filled later, defaults to None.

  • variable_mappings (Mapping[str, Mapping[str, VariableMapping]] | None) – The dictionary of all variable mappings for all tables. Can be filled later, defaults to None.

assign_table_mapping(table: str, table_mapping: TableMapping)[source]

Assign the table mapping for table. This overwrites any existing table mapping

Parameters:
  • table (str) – The table the mapping gets assigned to

  • table_mapping (TableMapping) – The table mapping that gets assigned

assign_variable_mapping(var_mapping: VariableMapping) None[source]

Adds a VariableMapping object to the collection. If a mapping exists already for the target table and variable, it will be overwritten

Parameters:

var_mapping (VariableMapping) – The variable mapping to add

Return type:

None

complete() bool[source]

Checks if all variables of all tables are mapped, meaning they have at least one MappingCase

Returns:

Returns True, if all variables of all tables are mapped

Return type:

bool

foreign_key_is_for_inheritance(table: str, foreign_key: str) bool[source]

Checks if foreign_key is marked for inheritance, i.e. its foreign table inherits the table mapping from table

Parameters:
  • table (str) – The target table to check the foreign key for

  • foreign_key (str) – The foreign key, an exception will be raised if this is not a foreign key of table table

Returns:

Returns True if the foreign table of foreign_key is inheriting from table

Return type:

bool

static from_dict(input_dict: dict, source: MetaData, target: MetaData) DataMapping[source]

Reads VariableMapping and TableMapping objects from a dictionary and combines them with the specified source and target MetaData

Parameters:
  • input_dict (dict) – The input dictionary

  • source (MetaData) – The metadata of the source dataset

  • target (MetaData) – The metadata of the target dataset

Returns:

Returns a DataMapping object containing all mappings

Return type:

DataMapping

static from_json(json_path: str, source: MetaData, target: MetaData, file_encoding: str | None = None) DataMapping[source]

Reads VariableMapping and TableMapping objects from a JSON and combines them with the specified source and target MetaData

Parameters:
  • json_path (str) – Path to the JSON

  • source (MetaData) – The metadata of the source dataset

  • target (MetaData) – The metadata of the target dataset

  • file_encoding (str | None) – file encoding of the JSON

Returns:

Returns a DataMapping object with all mapping data

Return type:

DataMapping

get_source_tables_for_var_mappings(target_table: str, mapping_to_set: TableMapping | None = None) Tuple[List[str], List[str]][source]

Based on the table mapping of target_table, find all source tables that can be used for variable mappings. Two cases are possible: source tables for single-value conditionals/conclusions (related source tables and their foreign tables, foreign tables of foreign tables, etc.), and source tables that can be used for aggregation (inverted foreign tables of the related source tables, inverted foreign tables of inverted foreign tables, etc.)

Parameters:
  • target_table (str) – The target table for which available source tables should be retrieved

  • mapping_to_set (TableMapping | None) – If the table mapping of target_table is not yet set, you can specify the future mapping here. If this parameter is None, the assigned table mapping will be used. Defaults to None

Returns:

Returns two lists of source tables, one for single value and one for aggregation conditionals/conclusions

Return type:

Tuple[List[str], List[str]]

get_table_mapping(table: str) TableMapping[source]

Returns the table mapping for table if it exists

Parameters:

table (str) – The table to retrieve the mapping for

Returns:

Returns the retrieved mapping or raises an exception if it does not exist

Return type:

TableMapping

get_variable_mapping(table: str, variable: str) VariableMapping[source]

Retrieves the VariableMapping for the given table and variable. Raises an exception if the table or variable does not exist in the collection

Parameters:
  • table (str) – The target table of the variable to map

  • variable (str) – The name of the variable

Returns:

Returns the retrieved variable mapping

Return type:

VariableMapping

table_fully_mapped(table: str) bool[source]

Checks, if all variables of a table are mapped, meaning they have at least one MappingCase

Parameters:

table (str) – The table to check all variables for

Returns:

Returns True, if all variables are mapped

Return type:

bool

to_dict() Dict[str, Dict[str, Dict[str, str | List[Dict[str, str]]]]][source]

Converts the object to a dictionary containing only strings

Returns:

Returns a dictionary containing all mappings

Return type:

Dict[str, Dict[str, Dict[str, str | List[Dict[str, str]]]]]

to_json(json_path: str, file_encoding: str | None = None) None[source]

Stores all variable mappings in a JSON

Parameters:
  • json_path (str) – Path to the JSON

  • file_encoding (str | None) – file encoding that should be used for writing the JSON

Return type:

None

variable_mapped(table: str, variable: str) bool[source]

Checks, if at least one MappingCase is defined for the table and variable. Raises an exception, if the variable and/or table is not present in the mapping

Parameters:
  • table (str) – The table of the variable to check for

  • variable (str) – The variable name to check for

Returns:

Returns True, if the table and variable exist in the mapping and at least one MappingCase was defined

Return type:

bool

variable_should_get_mapped(table: str, variable: str) bool[source]

Checks if a variable mapping should be defined for the variable. All variables should be mapped except primary keys and foreign keys of foreign tables which inherit the table mapping of table

Parameters:
  • table (str) – The table of the variable to check

  • variable (str) – The name of the variable to check

Returns:

Returns True if the variable should have a variable mapping

Return type:

bool

class graphxplore.DataMapping.DataMappingUtils[source]

Bases: object

This class contains static utility methods for data cleaning or adding primary keys.

static add_primary_key(data_source: str | Dict[str, List[Dict[str, str]]], source_table: str, data_target: str | Dict[str, List[Dict[str, str]]], target_table: str, primary_key: str, start_idx: int = 0, file_encoding: str | None = None) int[source]

Adds an integer primary key to each row of the source table and stores the result in a data target.

Parameters:
  • data_source (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where the CSV file is read from or a data dictionary where data is retrieved

  • source_table (str) – The name of the source table

  • data_target (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where the resulting CSV file is written to or a data dictionary where data is inserted

  • target_table (str) – The name of the resulting target table

  • primary_key (str) – The name of the primary key

  • start_idx (int) – The start index for the primary key, defaults to 0

  • file_encoding (str | None) – The file encoding of the CSV file (ascii, utf-8,…) in chardet definition. Is guessed if not specified, defaults to None

Returns:

Returns the largest assigned primary key value

Return type:

int
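
The effect of adding an integer primary key can be sketched on an in-memory table. The function add_integer_key is hypothetical and works on a plain list of rows instead of a CSV directory or data dictionary. Storing the key as a string, placing it as the first column, and returning start_idx - 1 for an empty table are assumptions of this sketch.

```python
from typing import Dict, List, Tuple


def add_integer_key(rows: List[Dict[str, str]], primary_key: str,
                    start_idx: int = 0) -> Tuple[List[Dict[str, str]], int]:
    """Return copies of the rows with a consecutive integer key and the largest key."""
    keyed_rows: List[Dict[str, str]] = []
    largest = start_idx - 1  # assumption: value returned if the table is empty
    for offset, row in enumerate(rows):
        largest = start_idx + offset
        # cell values are strings, matching the Dict[str, List[Dict[str, str]]] layout
        keyed_rows.append({primary_key: str(largest), **row})
    return keyed_rows, largest


rows = [{'name': 'a'}, {'name': 'b'}]
keyed, largest_key = add_integer_key(rows, primary_key='id', start_idx=5)
```

Returning the largest assigned value makes it easy to continue the numbering for a further table, e.g. by passing largest_key + 1 as the next start index.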

static copy_dataset(source_meta: MetaData, data_source: str | Dict[str, List[Dict[str, str]]], data_target: str | Dict[str, List[Dict[str, str]]], delete_artifacts: bool = False, source_file_encoding: str | None = None) None[source]

Copies a whole dataset while optionally deleting artifacts.

Parameters:
  • source_meta (MetaData) – The source metadata

  • data_source (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where the CSV files are read from or a data dictionary where data is retrieved

  • data_target (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where the resulting CSV files are written to or a data dictionary where data is inserted

  • delete_artifacts (bool) – If True artifacts are removed while copying

  • source_file_encoding (str | None) – Specifies the file encoding of all source tables, if read from a CSV. Will be detected if not specified, defaults to None

Return type:

None

static copy_variable(source_meta: MetaData, source_table: str, source_var: str, target_meta: MetaData, target_table: str, target_var: str, delete_artifacts: bool = False) VariableMapping[source]

Generates a variable mapping for copying a source variable to a target variable, optionally deleting artifacts. The target variable is added to target_meta if it doesn’t exist yet.

Parameters:
  • source_meta (MetaData) – The source metadata

  • source_table (str) – The source table

  • source_var (str) – The name of the source variable

  • target_meta (MetaData) – The target metadata, target_table must already exist as a table

  • target_table (str) – The target table

  • target_var (str) – The name of the target variable

  • delete_artifacts (bool) – If True artifacts are removed while copying

Returns:

Returns the variable mapping

Return type:

VariableMapping

static get_copy_mapping(source_meta: MetaData, target_meta: MetaData, delete_artifacts: bool = False) DataMapping[source]

Generates mappings for copying all data from a source dataset, optionally deleting artifacts. target_meta is filled with all variables from the source dataset, but tables have to exist already. Foreign key relations are inferred from source_meta if they do not exist already.

Parameters:
  • source_meta (MetaData) – The source metadata

  • target_meta (MetaData) – The target metadata, tables must exist and be identical with the source metadata

  • delete_artifacts (bool) – If True, artifacts are removed while copying

Returns:

Returns the data mapping

Return type:

DataMapping

static pivot_table(source_table: List[Dict[str, str]], index_column: str, value_column: str, to_index: Dict[str, str] | None = None, columns_to_keep: List[str] | None = None) List[Dict[str, str]][source]
Parameters:
  • source_table (List[Dict[str, str]])

  • index_column (str)

  • value_column (str)

  • to_index (Dict[str, str] | None)

  • columns_to_keep (List[str] | None)

Return type:

List[Dict[str, str]]

class graphxplore.DataMapping.DataSegmentor(meta: MetaData, lattice: MetaLattice, inheriting_tables: Dict[str, str], data_target: str | Dict[str, List[Dict[str, str]]], global_unique_keys: bool = False)[source]

Bases: object

This abstract class and all its children are the counterpart of the DataFlattener. They take a line of data and distribute it among the various foreign tables.

Parameters:
  • meta (MetaData) – The metadata of the target dataset

  • lattice (MetaLattice) – The lattice of the whole target dataset

  • inheriting_tables (Dict[str, str]) – The tables (keys of dictionary) for which the primary key should be automatically generated via a uniqueness check, because they inherit the relation from other target tables (value of dictionary)

  • data_target (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where CSV files are written or a data dictionary where data is inserted

  • global_unique_keys (bool) – If True the automatically generated primary key values will be unique across the dataset, defaults to False

write_row(sub_lattice: MetaLattice, row: Dict[str, Dict[str, str | int | float | None]]) None[source]

Takes a single line of data and distributes it among the target dataset

Parameters:
  • sub_lattice (MetaLattice) – The sub-lattice starting at the currently considered target table and containing all inheriting related tables

  • row (Dict[str, Dict[str, str | int | float | None]]) – The line of data to be distributed

Return type:

None

class graphxplore.DataMapping.DataTransformation(data_mapping: DataMapping)[source]

Bases: object

This class conducts the ETL process of transforming the given source dataset to the specified target dataset using the given DataMapping

Parameters:

data_mapping (DataMapping) – The variable mappings

transform_to_target(source_type: SourceDataType, source_specifier: str | Dict[str, List[Dict[str, str]]], data_target: str | Dict[str, List[Dict[str, str]]], global_unique_target_keys: bool = False, source_file_encoding: str | None = None) None[source]

Reads the source data from a directory with CSV files or from a Neo4J database. Transforms the data and writes it to a target directory as CSV files.

Parameters:
  • source_type (SourceDataType) – The type of source data

  • source_specifier (str | Dict[str, List[Dict[str, str]]]) – Either a source directory path, the name of the Neo4J database or a dictionary containing the source data set

  • data_target (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where CSV files are written or a data dictionary where data is inserted

  • global_unique_target_keys (bool) – If True, the generated IDs are unique across all automatically generated primary keys, defaults to False

  • source_file_encoding (str | None) – Specifies the file encoding of all source tables, if read from a CSV. Will be detected if not specified, defaults to None

Return type:

None

class graphxplore.DataMapping.MappingCase(conditional: LogicOperator, conclusion: Conclusion)[source]

Bases: object

This class contains a conditional clause that is checked against the source data and a conclusion generating the target data. It resembles the atomic part of a data mapping process. If the condition is met, the conclusion is processed.

Parameters:
  • conditional (LogicOperator) – The condition that evaluates to True or False

  • conclusion (Conclusion) – The conclusion returning the target data, if conditional evaluates to True on the input data

conclusion: Conclusion
conditional: LogicOperator
static from_dict(input_dict: dict) MappingCase[source]

Generates a MappingCase object from a dictionary.

Parameters:

input_dict (dict) – The input dictionary

Returns:

Returns the generated MappingCase object

Return type:

MappingCase

to_dict() Dict[str, str][source]

Returns a dictionary containing the data of the object

Returns:

Returns the dictionary

Return type:

Dict[str, str]

class graphxplore.DataMapping.MetaLattice(children: Dict[str, List[str]])[source]

Bases: object

This class captures the partial ordering of primary/foreign key relations of tables in a lattice. Here, table y is a child of table x (and x is a parent of y) if x contains the primary key of y as a foreign key. This structure is used to efficiently traverse the relationships of tables within a dataset.

Parameters:

children (Dict[str, List[str]]) – Direct foreign tables for each table
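
As a rough illustration of the children dictionary, the sketch below derives it from a hypothetical table description. It assumes that a foreign key column carries the same name as the primary key it references. That naming rule, the tables dictionary, and build_children are illustrative assumptions and not how graphxplore reads its MetaData.

```python
from typing import Dict, List

# Hypothetical table descriptions: primary key name and all column names.
tables = {
    "measurement": {"primary_key": "measurement_id",
                    "columns": ["measurement_id", "visit_id", "value"]},
    "visit": {"primary_key": "visit_id",
              "columns": ["visit_id", "patient_id", "date"]},
    "patient": {"primary_key": "patient_id",
                "columns": ["patient_id", "age"]},
}


def build_children(table_specs: Dict[str, dict]) -> Dict[str, List[str]]:
    """Map each table x to the tables y whose primary key appears in x."""
    pk_to_table = {spec["primary_key"]: name for name, spec in table_specs.items()}
    children: Dict[str, List[str]] = {}
    for name, spec in table_specs.items():
        children[name] = [
            pk_to_table[col]
            for col in spec["columns"]
            if col in pk_to_table and pk_to_table[col] != name
        ]
    return children


print(build_children(tables))
```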

static from_meta_data(meta: MetaData) MetaLattice[source]

Generates a lattice from the primary/foreign key relations specified in a metadata object.

Parameters:

meta (MetaData) – The metadata

Returns:

Returns the generated lattice object

Return type:

MetaLattice

get_ancestor_lattice(start_tables: Iterable[str], required_tables: Iterable[str]) MetaLattice[source]

Generates a sub-lattice, starting from start_tables and traversing the lattice in reverse order until all required_tables were found. As a result, tables are added to the sub-lattice if they reference members of start_tables as foreign tables or reference foreign tables with that behaviour. All non-related tables of the overall lattice are removed.

Parameters:
  • start_tables (Iterable[str]) – The tables from which the reverse traversal is started

  • required_tables (Iterable[str]) – All tables that must be contained in the sub-lattice

Returns:

Returns the sub-lattice

Return type:

MetaLattice

get_relatives(start_table: str, upward: bool = True) List[str][source]

Finds all upward or downward relatives of start_table in the lattice (excluding start_table itself).

Parameters:
  • start_table (str) – The table for which the relatives should be found

  • upward (bool) – If True, upward foreign table relations are considered, otherwise downward (inverted) relations. Defaults to True

Returns:

Returns the list of relative tables

Return type:

List[str]
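
A traversal of this kind can be sketched on a plain children dictionary. In the sketch below, upward=True follows the foreign table relations stored in the dictionary and upward=False follows the inverted relations. The function relatives and the example tables are hypothetical, and the order of the returned list is an artifact of this sketch.

```python
from typing import Dict, List


def relatives(children: Dict[str, List[str]], start_table: str,
              upward: bool = True) -> List[str]:
    """Collect all tables reachable from start_table, excluding start_table."""
    if upward:
        edges = children
    else:
        # Invert the relations: each foreign table points back to its referrers.
        edges = {table: [] for table in children}
        for parent, kids in children.items():
            for kid in kids:
                edges.setdefault(kid, []).append(parent)

    found: List[str] = []
    seen = {start_table}
    stack = list(edges.get(start_table, []))
    while stack:
        table = stack.pop()
        if table in seen:
            continue
        seen.add(table)
        found.append(table)
        stack.extend(edges.get(table, []))
    return found


example_children = {"measurement": ["visit"], "visit": ["patient"], "patient": []}
print(relatives(example_children, "measurement"))
print(relatives(example_children, "patient", upward=False))
```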

get_shortest_paths_to_required(start_table: str, required_tables: Iterable[str]) Dict[str, List[str]][source]

Detects the shortest path from start_table through the lattice to all tables in required_tables individually. A BFS strategy with parent storage is applied.

Parameters:
  • start_table (str) – The starting table of the paths

  • required_tables (Iterable[str]) – The tables to which the shortest paths from start_table should be calculated

Returns:

Returns a dictionary containing the shortest path as list starting from start_table for each table in required_tables

Return type:

Dict[str, List[str]]
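
Breadth-first search with parent storage can be illustrated on a plain children dictionary. The sketch below assumes that paths only follow foreign table relations in the stored direction and that unreachable tables are simply omitted from the result. Both are assumptions, and shortest_paths is a hypothetical helper rather than the library method.

```python
from collections import deque
from typing import Dict, Iterable, List, Optional


def shortest_paths(children: Dict[str, List[str]], start_table: str,
                   required_tables: Iterable[str]) -> Dict[str, List[str]]:
    """BFS from start_table, remembering for each table the table it was reached from."""
    parent: Dict[str, Optional[str]] = {start_table: None}
    queue = deque([start_table])
    while queue:
        table = queue.popleft()
        for nxt in children.get(table, []):
            if nxt not in parent:  # first visit is the shortest one in BFS
                parent[nxt] = table
                queue.append(nxt)

    paths: Dict[str, List[str]] = {}
    for target in required_tables:
        if target not in parent:
            continue  # not reachable from start_table
        path: List[str] = []
        node: Optional[str] = target
        while node is not None:  # walk the stored parents back to the start
            path.append(node)
            node = parent[node]
        paths[target] = path[::-1]
    return paths


example_children = {
    "measurement": ["visit", "device"],
    "visit": ["patient"],
    "device": [],
    "patient": [],
}
print(shortest_paths(example_children, "measurement", ["patient", "device"]))
```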

get_sub_lattice_blacklist(min_tables: Iterable[str], exclude_tables: Iterable[str]) MetaLattice[source]

Finds the sub-lattice with the specified minimal tables, recursively adding children and stopping at the specified exclusion tables.

Parameters:
  • min_tables (Iterable[str]) – The minimal tables of the sub-lattice

  • exclude_tables (Iterable[str]) – Tables that should not be included in the sub-lattice

Returns:

Returns the sub-lattice

Return type:

MetaLattice
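
One way to picture the blacklist variant is shown below on a plain children dictionary. The sketch assumes that excluded tables are dropped entirely, together with everything only reachable through them. The function sub_lattice_blacklist is hypothetical and returns a dictionary instead of a MetaLattice.

```python
from typing import Dict, Iterable, List


def sub_lattice_blacklist(children: Dict[str, List[str]],
                          min_tables: Iterable[str],
                          exclude_tables: Iterable[str]) -> Dict[str, List[str]]:
    """Collect min_tables and their descendants, never entering excluded tables."""
    excluded = set(exclude_tables)
    sub: Dict[str, List[str]] = {}
    stack = [table for table in min_tables if table not in excluded]
    while stack:
        table = stack.pop()
        if table in sub:
            continue
        kept = [child for child in children.get(table, []) if child not in excluded]
        sub[table] = kept
        stack.extend(kept)
    return sub


example_children = {
    "measurement": ["visit", "device"],
    "visit": ["patient"],
    "device": [],
    "patient": [],
}
print(sub_lattice_blacklist(example_children, ["measurement"], ["visit"]))
```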

get_sub_lattice_from_inheritance(start_table: str, inheriting_tables: Dict[str, str]) MetaLattice[source]

Gets the sub-lattice of all tables directly or indirectly inheriting the relation to the source dataset from start_table. If no table inherits from start_table, it will be the only table in the sub-lattice.

Parameters:
  • start_table (str) – The table from which all others of the sub-lattice inherit

  • inheriting_tables (Dict[str, str]) – Dictionary of all inheriting tables and the table they directly inherit from

Returns:

Returns the generated sub-lattice

Return type:

MetaLattice
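
The notion of direct and indirect inheritance can be sketched using only the inheriting_tables dictionary. The helper inheriting_closure below is hypothetical. It returns the member tables as a list and does not build a MetaLattice.

```python
from typing import Dict, List


def inheriting_closure(start_table: str, inheriting_tables: Dict[str, str]) -> List[str]:
    """Return start_table plus all tables that inherit from it, directly or indirectly."""
    result = [start_table]
    frontier = [start_table]
    while frontier:
        current = frontier.pop()
        for table, inherits_from in inheriting_tables.items():
            if inherits_from == current and table not in result:
                result.append(table)
                frontier.append(table)
    return result


# Hypothetical target tables: 'city' inherits from 'address', which inherits from 'patient'.
example_inheriting = {"address": "patient", "city": "address", "device": "lab"}
print(inheriting_closure("patient", example_inheriting))
print(inheriting_closure("visit", example_inheriting))
```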

get_sub_lattice_whitelist(min_tables: Iterable[str], required_tables: Iterable[str]) MetaLattice[source]

Finds the sub-lattice with the specified minimal tables containing all required tables. All non-related tables of the overall lattice are removed.

Parameters:
  • min_tables (Iterable[str]) – The minimal tables of the sub-lattice

  • required_tables (Iterable[str]) – All tables that must be contained in the sub-lattice

Returns:

Returns the sub-lattice

Return type:

MetaLattice

has_multi_reference_relative(start_table: str, upward: bool = True) bool[source]

Generates the tree of tables related to start_table by foreign key relation and checks if a table is referenced multiple times. This prevents the flattening of the data to the start_table using the DataFlattener.

Parameters:
  • start_table (str) – The start table for the tree

  • upward (bool) – If True, descendants (referenced by start_table) are checked. Otherwise, ancestors (referencing start_table) are checked. Defaults to True

Returns:

Returns True if a multi reference was found, False otherwise

Return type:

bool
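
The multi-reference check can be pictured as expanding the tree of related tables and reporting the first table that is reached a second time. The sketch below covers only the direction stored in a plain children dictionary, and has_multi_reference is a hypothetical helper, not the library method.

```python
from typing import Dict, List


def has_multi_reference(children: Dict[str, List[str]], start_table: str) -> bool:
    """Return True if some table is reached via more than one reference."""
    seen = {start_table}
    stack = list(children.get(start_table, []))
    while stack:
        table = stack.pop()
        if table in seen:
            return True  # reached a second time, so it is referenced more than once
        seen.add(table)
        stack.extend(children.get(table, []))
    return False


# 'patient' is referenced by both 'visit' and 'device'.
diamond = {
    "measurement": ["visit", "device"],
    "visit": ["patient"],
    "device": ["patient"],
    "patient": [],
}
chain = {"measurement": ["visit"], "visit": ["patient"], "patient": []}

print(has_multi_reference(diamond, "measurement"))
print(has_multi_reference(chain, "measurement"))
```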

class graphxplore.DataMapping.SourceDataLine(singular_data: Dict[str, Dict[str, str | int | float | None]], aggregated_data: AggregatedData | None = None)[source]

Bases: object

One flattened line of source data optionally containing aggregated data as well.

Parameters:
  • singular_data (Dict[str, Dict[str, str | int | float | None]]) – The flattened line of data

  • aggregated_data (AggregatedData | None) – The data that was aggregated for the root primary key of this line, defaults to None

aggregated_data: AggregatedData | None = None
get_singular_value(table: str, variable: str) str | int | float | None[source]

Retrieves the value of variable contained in this source data line, if variable was not aggregated

Parameters:
  • table (str) – The table of variable

  • variable (str) – The variable name

Returns:

Returns the value of variable in table

Return type:

str | int | float | None
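
The type of singular_data suggests a two-level lookup, first by table and then by variable. The following sketch mirrors that on a plain dictionary. The function singular_value and the example values are hypothetical, and how the real method treats unknown tables or variables is not covered here.

```python
from typing import Dict, Union

Value = Union[str, int, float, None]

# Hypothetical flattened line: table name -> variable name -> value.
singular_data: Dict[str, Dict[str, Value]] = {
    "patient": {"patient_id": 1, "age": 42, "name": None},
    "visit": {"visit_id": 7, "ward": "A"},
}


def singular_value(data: Dict[str, Dict[str, Value]], table: str, variable: str) -> Value:
    """Look up the single value of variable within table."""
    return data[table][variable]


print(singular_value(singular_data, "patient", "age"))
print(singular_value(singular_data, "visit", "ward"))
```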

singular_data: Dict[str, Dict[str, str | int | float | None]]
class graphxplore.DataMapping.SourceDataType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

The type of source data: A directory with CSV files, or a Neo4J database.

CSV = 'CSV'
Database = 'Database'
class graphxplore.DataMapping.TableMapping(type: TableMappingType | None = None, source_tables: List[str] = <factory>, to_inherit: str | None = None, condition: LogicOperator = <graphxplore.DataMapping.Conditionals.logic_operators.AlwaysTrueOperator object>)[source]

Bases: object

Each target table x must have some relationship to one or multiple source tables. Using this relationship, single units of source data are formed. Variable mappings are applied to these units to form a single output row of x. Variables of the related source tables and their foreign tables (and their foreign tables, and so on…) will have a single value (might be a missing value) in this unit of source data. These variables are called singular variables. Variables of inverted foreign tables (a is an inverted foreign table of b, if b is a foreign table of a), might have multiple values in a unit of source data (e.g. timeseries, or multiple blood measurements for a single patient). They are called aggregate variables. For a table mapping you have the following options:

  • x has a one-to-one relationship with a single source table y. Primary key values are copied from y to x. A unit of source data is formed by a single row of y and rows from foreign tables and/or inverted foreign tables of y. (Most common option)

  • x has a one-to-many relationship with multiple source tables. The data of the source tables (and foreign tables or inverted foreign tables) will be combined to form a single unit of source data. This can be done in two ways:

    • The data of the source tables can be merged. Here, data rows from different source tables are combined into a single unit if the rows’ primary key values are identical. If a primary key value of a source table has no analog in another source table, its row is taken independently.

    • The data of the source tables can be concatenated. Here, the source tables are processed independently one after the other to form units of source data together with their foreign tables or inverted foreign tables. The primary key values of x will be 0-indexed integers.

  • If x is a foreign table of another target table x’, the relationship to source tables can be inherited from x’. If x’ itself inherits the relationship of another target table x’’, this inheritance is propagated to x. The primary key values of x will be 0-indexed integers and all its rows will be de-duplicated. The primary key values of x will be used as foreign key values in x’.

Optionally, you can define a condition to filter out units of source data that should not be considered in the mapping. If the condition evaluates to False for a unit of source data, it is fully removed from the transformation process of this target table. By default, the AlwaysTrueOperator is used and all source data is taken into the transformation.
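
The difference between merging and concatenating source tables can be sketched on plain dictionaries. In the sketch below, each hypothetical source table maps primary key values to rows. Keying the merged units by the shared primary key value is an assumption of this sketch, while the 0-indexed integer keys for concatenation follow the description above. Neither function is part of graphxplore.

```python
from typing import Dict

Row = Dict[str, str]
Table = Dict[str, Row]  # primary key value -> row


def merge_units(tables: Dict[str, Table]) -> Dict[str, Dict[str, Row]]:
    """Combine rows from different tables that share a primary key value."""
    units: Dict[str, Dict[str, Row]] = {}
    for table_name, rows in tables.items():
        for pk, row in rows.items():
            units.setdefault(pk, {})[table_name] = row
    return units


def concatenate_units(tables: Dict[str, Table]) -> Dict[int, Dict[str, Row]]:
    """Process tables one after the other, numbering the units from 0."""
    units: Dict[int, Dict[str, Row]] = {}
    for table_name, rows in tables.items():
        for row in rows.values():
            units[len(units)] = {table_name: row}
    return units


# Hypothetical source tables with partly overlapping primary key values.
source_tables = {
    "lab_a": {"p1": {"hb": "13.1"}, "p2": {"hb": "12.4"}},
    "lab_b": {"p1": {"crp": "3.0"}, "p3": {"crp": "8.2"}},
}

print(merge_units(source_tables))
print(concatenate_units(source_tables))
```

In this example, merging yields three units because p1 occurs in both tables, whereas concatenation yields four units, one per source row.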

condition: LogicOperator = <graphxplore.DataMapping.Conditionals.logic_operators.AlwaysTrueOperator object>
static from_dict(input_dict: Dict[str, str | List[str] | None]) TableMapping[source]
Parameters:

input_dict (Dict[str, str | List[str] | None])

Return type:

TableMapping

source_tables: List[str]
to_dict() Dict[str, str | List[str] | None][source]
Return type:

Dict[str, str | List[str] | None]

to_inherit: str | None = None
type: TableMappingType | None = None
class graphxplore.DataMapping.TableMappingType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

The type of mapping for a target table: One-to-one relation to a single source table, or one-to-many relation to multiple source tables by merging (combination of rows with the same primary key value) or concatenation (processing each table independently). Lastly, the mapping relation can be inherited from an ancestor target table (inverted foreign table chain).

Concatenate = 'Concatenate'
Inherited = 'Inherited'
Merge = 'Merge'
OneToOne = 'OneToOne'
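
Because the enum derives from both str and Enum, its members behave like plain strings, which is convenient when mappings are stored as JSON. The sketch below uses a local copy named SketchTableMappingType with the member values listed above, so it runs without graphxplore. The dictionary key "type" is a hypothetical choice and not necessarily the key used by to_dict().

```python
import json
from enum import Enum


class SketchTableMappingType(str, Enum):
    """Local stand-in with the same member values as listed above."""
    Concatenate = 'Concatenate'
    Inherited = 'Inherited'
    Merge = 'Merge'
    OneToOne = 'OneToOne'


# Members compare equal to their string value and serialize as plain strings.
serialized = json.dumps({"type": SketchTableMappingType.Merge})

# The member can be recovered from the stored string.
restored = SketchTableMappingType(json.loads(serialized)["type"])

print(serialized)
print(restored is SketchTableMappingType.Merge)
```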
class graphxplore.DataMapping.VariableMapping(target_table: str, target_variable: str, cases: List[MappingCase])[source]

Bases: object

This class contains all data required for the data mapping of one target variable.

Parameters:
  • target_table (str) – The table of the target variable

  • target_variable (str) – The name of the target variable

  • cases (List[MappingCase]) – The mapping cases (input order sensitive)

add_case(case: MappingCase) None[source]

Adds a mapping case to the mapping at the last position. The required tables and variables are added to self.sources.

Parameters:

case (MappingCase) – The mapping case to add

Return type:

None

static from_dict(input_dict: Dict[str, str | List[Dict[str, str]]]) VariableMapping[source]

Generates a VariableMapping object from a dictionary.

Parameters:

input_dict (Dict[str, str | List[Dict[str, str]]])

Returns:

Returns the generated VariableMapping object

Return type:

VariableMapping

remove_case(case_idx: int) None[source]

Removes a mapping case by index. self.sources is updated as well.

Parameters:

case_idx (int) – The index of the case to remove

Return type:

None

to_dict() Dict[str, str | List[Dict[str, str]]][source]

Converts the object to a dictionary.

Returns:

Returns the object’s dictionary

Return type:

Dict[str, str | List[Dict[str, str]]]