graphxplore.DataMapping package
This subpackage can be used to remove artifacts from your data or to conduct more complex ETL processes. The workflows of this package are independent of the graphxplore.GraphTranslation and graphxplore.GraphDataScience packages and do not require a graph-based data representation. All ETL processes can be stored as JSON files for reusability.
The central class is DataMapping. It contains the MetaData objects of the source dataset and the target data structure, as well as the mappings between the source and target tables as TableMapping objects. These can describe one-to-one, one-to-many, many-to-one, or many-to-many relationships.
Mapping rules on the variable level are stored as VariableMapping objects. They must be defined for each variable of the target data structure (with the exception of primary keys and some foreign keys). Each VariableMapping contains one or more MappingCase objects, each of which contains a LogicOperator and a Conclusion. The MappingCase objects are checked in input order. If the function valid() of the LogicOperator returns True on the given unit of source data, get_return() of the Conclusion is triggered. If valid() returns False, the next MappingCase is checked. If no conditional is met, None is returned. Code for a single VariableMapping could look like:
>>> from graphxplore.MetaDataHandling import DataType
>>> from graphxplore.DataMapping import VariableMapping, MappingCase, SourceDataLine
>>> from graphxplore.DataMapping.Conditionals import (StringOperator, StringOperatorType, MetricOperator,
...                                                   MetricOperatorType, AndOperator, AlwaysTrueOperator)
>>> from graphxplore.DataMapping.Conclusions import CopyConclusion, FixedReturnConclusion
>>> # if the value of 'var' is a decimal larger than 0 and the value of 'another_var' is a string containing 'nana',
>>> # then copy the value of 'var'
>>> first_case = MappingCase(conditional=AndOperator(sub_operators=[
...     MetricOperator(table='FirstSourceTable', variable='var', value=0, data_type=DataType.Decimal,
...                    compare=MetricOperatorType.Larger),
...     StringOperator(table='SecondSourceTable', variable='another_var', value='nana',
...                    compare=StringOperatorType.Contains)
... ]), conclusion=CopyConclusion(target_data_type=DataType.Decimal,
...                               origin_table='FirstSourceTable', var_to_copy='var'))
>>> # always return 0.0
>>> second_case = MappingCase(conditional=AlwaysTrueOperator(), conclusion=FixedReturnConclusion(DataType.Decimal,
...                                                                                              return_val=0.0))
>>> var_mapping = VariableMapping(target_table='TargetTable', target_variable='target_var',
...                               cases=[first_case, second_case])
>>> source_line = SourceDataLine({'FirstSourceTable' : {'var' : 1.5}, 'SecondSourceTable' : {'another_var' : 'banana'}})
>>> # first case is met
>>> var_mapping[source_line]
1.5
>>> source_line = SourceDataLine({'FirstSourceTable' : {'var' : -7.8}, 'SecondSourceTable' : {'another_var' : 'banana'}})
>>> # first case not met because 'var' is negative -> second case is executed
>>> var_mapping[source_line]
0.0
>>> source_line = SourceDataLine({'FirstSourceTable' : {'var' : 1.5}, 'SecondSourceTable' : {'another_var' : None}})
>>> # first case not met because 'another_var' has a missing value -> second case is executed
>>> var_mapping[source_line]
0.0
You can see from this code snippet that data from different source tables can be combined. This is achieved by gathering all source data that is associated with one primary key pk in table t with value x into a single SourceDataLine, using the foreign key relations within the source dataset, which are captured in a MetaLattice object. Data from other tables that appear as foreign tables in t (or are related across multiple tables) can be seen as a property of pk and is added directly to the SourceDataLine for x. Data from tables that themselves reference pk as a foreign key can be aggregated with AggregatorOperator or AggregateConclusion objects. Here, all data from lines where x is a foreign key value for pk (or across multiple tables) is gathered and an aggregate is calculated (e.g. count, minimal value, etc.). This can be useful, for example, for the aggregation of time series.
This automation strategy is enabled by the TableMapping, which must be defined for each target table. Examples might look like:
>>> from graphxplore.MetaDataHandling import MetaData, DataType
>>> from graphxplore.DataMapping import TableMapping, MappingCase, TableMappingType, DataMapping
>>> from graphxplore.DataMapping.Conditionals import MetricOperator, MetricOperatorType, AlwaysTrueOperator
>>>
>>> source_meta = MetaData.load_from_json(filepath='/source_meta.json')
>>> target_meta = MetaData.load_from_json(filepath='/target_meta.json')
>>> data_mapping = DataMapping(source=source_meta, target=target_meta)
>>> # one-to-one mapping between 'SourceTable' and 'TargetTable'
>>> # each source data line will contain a row of 'SourceTable' and one row of each foreign table (potentially across
>>> # foreign table chains) of the corresponding foreign key value
>>> one_to_one = TableMapping(type=TableMappingType.OneToOne, source_tables=['SourceTable'])
>>> data_mapping.assign_table_mapping(table='TargetTable', table_mapping=one_to_one)
>>> # again a one-to-one mapping, but with an added condition. If this condition is not met for a source data line, the
>>> # whole line will be skipped. Adding a condition is possible for all table mapping types except inherited table
>>> # mappings
>>> added_condition = MetricOperator(table='ForeignSourceTable', variable='var', value=0,
...                                  data_type=DataType.Integer, compare=MetricOperatorType.Equals)
>>> filtered_one_to_one = TableMapping(type=TableMappingType.OneToOne, source_tables=['SourceTable'],
...                                    condition=added_condition)
>>> data_mapping.assign_table_mapping(table='TargetTable', table_mapping=filtered_one_to_one)
>>> # merge the rows of two source tables 'FirstSourceTable' and 'SecondSourceTable' into a single
>>> # target table 'TargetTable' (many-to-one). SourceDataLine objects from 'FirstSourceTable' and 'SecondSourceTable'
>>> # with the same primary key value are merged. This way, data rows from multiple source tables can be combined into
>>> # one target data row
>>> merge = TableMapping(TableMappingType.Merge, source_tables=['FirstSourceTable', 'SecondSourceTable'])
>>> data_mapping.assign_table_mapping(table='TargetTable', table_mapping=merge)
>>> # data from two source tables 'FirstSourceTable' and 'SecondSourceTable' is processed one after another into
>>> # SourceDataLine objects and concatenated into a single target table 'TargetTable' (many-to-one). No merging of
>>> # source data rows is conducted
>>> concatenate = TableMapping(TableMappingType.Concatenate, source_tables=['FirstSourceTable', 'SecondSourceTable'])
>>> data_mapping.assign_table_mapping(table='TargetTable', table_mapping=concatenate)
>>> # If 'ForeignTargetTable' is a foreign table (or foreign table of foreign table ...) of 'TargetTable', it can
>>> # inherit the mapping type of 'TargetTable'. The rows of both tables will be created together and thus the
>>> # result data will be split. This can be useful to make the target dataset more manageable. If the relation of
>>> # 'TargetTable' is 'one-to-one', this will become 'one-to-many'. If its relation is 'many-to-one', it will become
>>> # 'many-to-many'.
>>> inherited = TableMapping(TableMappingType.Inherited, to_inherit='TargetTable')
>>> data_mapping.assign_table_mapping(table='ForeignTargetTable', table_mapping=inherited)
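The difference between the Merge and Concatenate mapping types described in the comments above can be illustrated without graphxplore. The following is a minimal sketch in plain Python, not the library's implementation. The helper names merge_rows and concatenate_rows and the key column 'pk' are hypothetical, and rows are assumed to be plain dictionaries. Whether graphxplore keeps rows whose key value appears in only one source table is not stated in the text; this sketch keeps them.

```python
from typing import Dict, List

Row = Dict[str, object]

def merge_rows(first: List[Row], second: List[Row], key: str) -> List[Row]:
    """Combine rows that share the same primary key value into one row."""
    merged: Dict[object, Row] = {}
    for row in first + second:
        # Rows with an already seen key value extend the existing merged row
        merged.setdefault(row[key], {}).update(row)
    return list(merged.values())

def concatenate_rows(first: List[Row], second: List[Row]) -> List[Row]:
    """Process both tables one after another without combining any rows."""
    return [dict(row) for row in first + second]

first_table = [{'pk': 1, 'a': 'x'}, {'pk': 2, 'a': 'y'}]
second_table = [{'pk': 1, 'b': 10}, {'pk': 3, 'b': 30}]

merged = merge_rows(first_table, second_table, key='pk')
concatenated = concatenate_rows(first_table, second_table)
```

In this sketch, merging yields one row per distinct key value, while concatenation yields one row per source row.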
Data mapping can be quite complex, and this subpackage offers many functionalities. DataMappingUtils can be used to ease some common workflows. For further examples, check out test/DataMapping/test_data_mapping.py in the graphxplore GitHub repository.
Submodules
Module contents
- class graphxplore.DataMapping.AggregatedData[source]
Bases: object
This class stores all aggregated data (of other variables) for a fixed primary key value.
- add_variable_aggregation(table: str, variable: str, data_type: DataType, agg_type: AggregatorType, value: str | int | float | Set[str] | None) None[source]
Adds a data aggregation value for a specific variable, data type and aggregation type for this specific primary key value.
- Parameters:
table (str) – The table of the aggregated variable
variable (str) – The name of the aggregated variable
data_type (DataType) – The data type of values that were aggregated
agg_type (AggregatorType) – The type of aggregation
value (str | int | float | Set[str] | None) – The aggregated value or None if no data was aggregated
- Return type:
None
- exists(table: str, variable: str, data_type: DataType, agg_type: AggregatorType) bool[source]
Checks if some value or None exists for this table, variable, data type and aggregation type in the data structure.
- Parameters:
table (str) – The table of the aggregated variable
variable (str) – The name of the aggregated variable
data_type (DataType) – The data type of values that were aggregated
agg_type (AggregatorType) – The type of aggregation
- Returns:
Returns True if some value or None exists in the data structure for the specified parameters
- Return type:
bool
- get_variable_aggregation(table: str, variable: str, data_type: DataType, agg_type: AggregatorType) str | int | float | Set[str] | None[source]
Returns data aggregation value for a specific variable, data type and aggregation type.
- Parameters:
table (str) – The table of the aggregated variable
variable (str) – The name of the aggregated variable
data_type (DataType) – The data type of values that were aggregated
agg_type (AggregatorType) – The type of aggregation
- Returns:
Returns the aggregated value or None if no data was aggregated
- Return type:
str | int | float | Set[str] | None
- merge(other: AggregatedData) AggregatedData[source]
Merges two data structures. Raises an exception if different aggregation values were assigned to the same configuration of table, variable, data type and aggregation type.
- Parameters:
other (AggregatedData) – The other data structure that should be merged with this one
- Returns:
Returns a new merged data structure
- Return type:
AggregatedData
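The merge behaviour described above (a new structure is returned, and conflicting values for the same configuration raise an exception) can be sketched with plain dictionaries. This is an illustration only and not the AggregatedData implementation. The function merge_aggregations, the use of ValueError, and the plain strings standing in for DataType and AggregatorType members are assumptions.

```python
from typing import Dict, Tuple, Union

# (table, variable, data type, aggregation type); strings stand in for the enums
Key = Tuple[str, str, str, str]
Value = Union[str, int, float, None]

def merge_aggregations(left: Dict[Key, Value], right: Dict[Key, Value]) -> Dict[Key, Value]:
    """Return a new dictionary and raise if both sides disagree on a key."""
    result = dict(left)
    for key, value in right.items():
        if key in result and result[key] != value:
            raise ValueError(f'Conflicting aggregation values for {key}')
        result[key] = value
    return result

left = {('Visits', 'cost', 'Decimal', 'SUM'): 12.5}
right = {('Visits', 'cost', 'Decimal', 'COUNT'): 3}
combined = merge_aggregations(left, right)

try:
    merge_aggregations(left, {('Visits', 'cost', 'Decimal', 'SUM'): 99.0})
    conflict_detected = False
except ValueError:
    conflict_detected = True
```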
- class graphxplore.DataMapping.AggregatorParser[source]
Bases: object
This class contains functionality for parsing AggregatorOperator and AggregateConclusion objects from and to string.
- static check_compatibility(table: str, var: str, data_type: DataType, aggregator: AggregatorType, list_aggregation_allowed: bool = True) None[source]
Checks if data type and aggregation type match. String values can only be counted or concatenated. For AggregatorOperator the AggregatorType.List type is also valid for all data types.
- Parameters:
table (str) – The table of variable to aggregate
var (str) – The name of the variable to aggregate
data_type (DataType) – The data type of values that should be aggregated
aggregator (AggregatorType) – The type of aggregation
list_aggregation_allowed (bool) – If True, the AggregatorType.List type is also valid
- Return type:
None
- static from_string(input_str: str) Tuple[str, str, DataType, AggregatorType] | None[source]
Parses a table, variable, data type and aggregator type from a string. If the string is invalid, None is returned.
- Parameters:
input_str (str) – The string to parse
- Returns:
Returns a tuple with the parsed data, or None if the string could not be parsed
- Return type:
Tuple[str, str, DataType, AggregatorType] | None
- static get_aggregated_data_type(aggregator: AggregatorType) DataType | None[source]
Returns the data type of the aggregation (not the type of cell values that should be aggregated).
- Parameters:
aggregator (AggregatorType) – The type of aggregation
- Returns:
Returns the data type, or None if the type is AggregatorType.List (is a list, has no basic data type)
- Return type:
DataType | None
- static to_str(table: str, var: str, data_type: DataType, aggregator: AggregatorType) str[source]
Converts data of AggregatorOperator and AggregateConclusion objects to string.
- Parameters:
table (str) – The table of variable to aggregate
var (str) – The name of the variable to aggregate
data_type (DataType) – The data type of values that should be aggregated
aggregator (AggregatorType) – The type of aggregation
- Returns:
Returns the parsed string
- Return type:
str
- class graphxplore.DataMapping.AggregatorType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases: str, Enum
The type of variable data aggregator.
- Amplitude = 'AMPLITUDE'
- Concatenate = 'CONCATENATE'
- Count = 'COUNT'
- List = 'LIST'
- Max = 'MAX'
- Mean = 'MEAN'
- Median = 'MEDIAN'
- Min = 'MIN'
- Std = 'STDEV'
- Sum = 'SUM'
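To give a rough idea of what these aggregation types could compute, the following sketch maps the enum values to standard-library functions. It is not taken from graphxplore. In particular, treating Amplitude as maximum minus minimum, Std as the sample standard deviation, List as the set of distinct values as strings (suggested by the Set[str] value type of AggregatedData), and Concatenate as a ';'-joined string are all assumptions.

```python
import statistics
from typing import Callable, Dict, List

values: List[float] = [2.0, 4.0, 4.0, 10.0]

# Keys mirror the string values of AggregatorType; the functions are assumptions
aggregators: Dict[str, Callable[[List[float]], object]] = {
    'COUNT': len,
    'SUM': sum,
    'MIN': min,
    'MAX': max,
    'MEAN': statistics.mean,
    'MEDIAN': statistics.median,
    'STDEV': statistics.stdev,  # assumption: sample standard deviation
    'AMPLITUDE': lambda vals: max(vals) - min(vals),  # assumption: max minus min
    'LIST': lambda vals: {str(v) for v in vals},  # assumption: distinct values
    'CONCATENATE': lambda vals: ';'.join(str(v) for v in vals),  # assumption: ';' separator
}

results = {name: func(values) for name, func in aggregators.items()}
```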
- class graphxplore.DataMapping.CSVDataAggregator(data_source: str | Dict[str, List[Dict[str, str]]], meta: MetaData, lattice: MetaLattice, required_vars: Mapping[str, Mapping[str, Iterable[Tuple[AggregatorType, DataType]]]], file_encoding: str | None = None)[source]
Bases: DataAggregator
This class inherits from DataAggregator and implements the process by reading data directly from the CSV files.
- Parameters:
data_source (str | Dict[str, List[Dict[str, str]]]) – The path to a directory with CSV files or a dictionary containing the source dataset
meta (MetaData) – The metadata of the whole dataset
lattice (MetaLattice) – The lattice that will be traversed in inverse order
required_vars (Mapping[str, Mapping[str, Iterable[Tuple[AggregatorType, DataType]]]]) – The variables required for data aggregation per table
file_encoding (str | None) – Specifies the file encoding of all read CSV tables. Will be detected if not specified, defaults to None
- class graphxplore.DataMapping.CSVDataFlattener(meta: MetaData, data_source: str | Dict[str, List[Dict[str, str]]], mapping_type: TableMappingType, lattice_config: FlattenerLatticeConfig, file_encoding: str | None = None)[source]
Bases: DataFlattener
This class reads data from CSV files and “flattens” all primary/foreign key relations to single rows given a MetaLattice representing a subset of the source dataset.
- Parameters:
meta (MetaData) – The metadata of the source data
data_source (str | Dict[str, List[Dict[str, str]]]) – The path to a directory with CSV files or a dictionary containing the source dataset
mapping_type (TableMappingType) – the mapping type of the currently considered minimal target table
lattice_config (FlattenerLatticeConfig) – The lattices and required variables for singular and optionally aggregated source data retrieval
file_encoding (str | None) – Specifies the file encoding of all read CSV tables. Will be detected if not specified, defaults to None
- class graphxplore.DataMapping.DataAggregator(meta: MetaData, lattice: MetaLattice, required_vars: Mapping[str, Mapping[str, Iterable[Tuple[AggregatorType, DataType]]]])[source]
Bases: object
This class gathers data of time series, events or other data associated with the same primary key value. To achieve this, a MetaLattice object is traversed in inverse order (starting from its maximal elements), and the table data is loaded and assigned to each unique primary key value. Data for variables in required_vars is aggregated with the specified AggregatorType and DataType.
- Parameters:
meta (MetaData) – The metadata of the whole dataset
lattice (MetaLattice) – The lattice that will be traversed in inverse order.
required_vars (Mapping[str, Mapping[str, Iterable[Tuple[AggregatorType, DataType]]]]) – The variables required for data aggregation per table
- class graphxplore.DataMapping.DataFlattener(source_type: SourceDataType, meta: MetaData, mapping_type: TableMappingType, lattice_config: FlattenerLatticeConfig)[source]
Bases: object
This class is the parent of all classes reading data from a source dataset and resolving all foreign key relations based on one or multiple minimal tables. As a result, the source data is “flattened” to single rows of data instead of being spread across multiple tables. The class and all its children function as iterable context managers.
- Parameters:
source_type (SourceDataType) – The type of the source data
meta (MetaData) – The metadata of the source data
mapping_type (TableMappingType) – the mapping type of the currently considered minimal target table
lattice_config (FlattenerLatticeConfig) – The lattices and required variables for singular and optionally aggregated source data retrieval
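The idea of “flattening” can be sketched in plain Python as follows. This is a simplified illustration under stated assumptions, not the DataFlattener implementation. The example tables, the primary_keys and foreign_keys dictionaries and the flatten function are hypothetical; the real class derives the relations from MetaData and a MetaLattice.

```python
from typing import Dict, List

Row = Dict[str, str]
Dataset = Dict[str, List[Row]]

dataset: Dataset = {
    'Visit': [{'visit_id': 'v1', 'patient_id': 'p1', 'reason': 'checkup'}],
    'Patient': [{'patient_id': 'p1', 'clinic_id': 'c1', 'name': 'Ada'}],
    'Clinic': [{'clinic_id': 'c1', 'city': 'Berlin'}],
}
primary_keys = {'Visit': 'visit_id', 'Patient': 'patient_id', 'Clinic': 'clinic_id'}
# table -> {foreign key column: foreign table}
foreign_keys = {'Visit': {'patient_id': 'Patient'}, 'Patient': {'clinic_id': 'Clinic'}, 'Clinic': {}}

def flatten(table: str, row: Row) -> Dict[str, Row]:
    """Collect the row and all rows reachable via foreign keys, grouped by table."""
    flat = {table: row}
    for fk_column, foreign_table in foreign_keys[table].items():
        pk_column = primary_keys[foreign_table]
        for candidate in dataset[foreign_table]:
            if candidate[pk_column] == row[fk_column]:
                # Follow the chain of foreign tables recursively
                flat.update(flatten(foreign_table, candidate))
                break
    return flat

# 'Visit' plays the role of the minimal table here
flat_lines = [flatten('Visit', row) for row in dataset['Visit']]
```

Each resulting line has the table -> variable -> value layout that is also used for SourceDataLine in the package overview.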
- class graphxplore.DataMapping.DataMapping(source: MetaData, target: MetaData, table_mappings: Mapping[str, TableMapping] | None = None, variable_mappings: Mapping[str, Mapping[str, VariableMapping]] | None = None)[source]
Bases: object
This class summarizes all individual VariableMapping objects for a whole dataset via a dictionary of table -> variable -> VariableMapping
- Parameters:
source (MetaData) – The metadata of the source dataset
target (MetaData) – The metadata of the target data structure
table_mappings (Mapping[str, TableMapping] | None) – The table mapping for each table. Can be filled later, defaults to None.
variable_mappings (Mapping[str, Mapping[str, VariableMapping]] | None) – The dictionary of all variable mappings for all tables. Can be filled later, defaults to None.
- assign_table_mapping(table: str, table_mapping: TableMapping)[source]
Assign the table mapping for table. This overwrites any existing table mapping
- Parameters:
table (str) – The table the mapping gets assigned to
table_mapping (TableMapping) – The table mapping that gets assigned
- assign_variable_mapping(var_mapping: VariableMapping) None[source]
Adds a VariableMapping object to the collection. If a mapping already exists for the target table and variable, it will be overwritten
- Parameters:
var_mapping (VariableMapping) – The variable mapping to add
- Return type:
None
- complete() bool[source]
Checks if all variables of all tables are mapped, meaning they have at least one MappingCase
- Returns:
Returns True if all variables of all tables are mapped
- Return type:
bool
- foreign_key_is_for_inheritance(table: str, foreign_key: str) bool[source]
Checks if foreign_key is marked for inheritance, i.e. its foreign table inherits the table mapping from table
- Parameters:
table (str) – The target table to check the foreign key for
foreign_key (str) – The foreign key, an exception will be raised if this is not a foreign key of table
- Returns:
Returns True if the foreign table of foreign_key is inheriting from table
- Return type:
bool
- static from_dict(input_dict: dict, source: MetaData, target: MetaData) DataMapping[source]
Reads VariableMapping and TableMapping objects from a dictionary and combines them with the specified source and target MetaData
- Parameters:
input_dict (dict)
source (MetaData)
target (MetaData)
- Returns:
Returns a DataMapping object containing all mappings
- Return type:
DataMapping
- static from_json(json_path: str, source: MetaData, target: MetaData, file_encoding: str | None = None) DataMapping[source]
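Reads VariableMapping and TableMapping objects from a JSON file and combines them with the specified source and target MetaData
- Parameters:
json_path (str)
source (MetaData)
target (MetaData)
file_encoding (str | None)
- Returns:
Returns a DataMapping object with all mapping data
- Return type:
DataMapping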
- get_source_tables_for_var_mappings(target_table: str, mapping_to_set: TableMapping | None = None) Tuple[List[str], List[str]][source]
Based on the table mapping of target_table, find all source tables that can be used for variable mappings. Two cases are possible: single-value conditionals/conclusions (related source tables and foreign tables, foreign tables of foreign tables, etc.), and source tables that can be used for aggregation (inverted foreign tables of the related source tables, inverted foreign tables of inverted foreign tables, etc.)
- Parameters:
target_table (str) – The target table for which available source tables should be retrieved
mapping_to_set (TableMapping | None) – If the table mapping of target_table is not yet set, you can specify the future mapping here. If this parameter is None, the assigned table mapping will be used. Defaults to None
- Returns:
Returns two lists of source tables, one for single value and one for aggregation conditionals/conclusions
- Return type:
Tuple[List[str], List[str]]
- get_table_mapping(table: str) TableMapping[source]
Returns the table mapping for table if it exists
- Parameters:
table (str) – The table to retrieve the mapping for
- Returns:
Returns the retrieved mapping or raises an exception if it does not exist
- Return type:
TableMapping
- get_variable_mapping(table: str, variable: str) VariableMapping[source]
Retrieves the VariableMapping for the given table and variable. Raises an exception if the table or variable does not exist in the collection
- Parameters:
table (str) – The target table of the variable to map
variable (str) – The name of the variable
- Returns:
Returns the retrieved variable mapping
- Return type:
VariableMapping
- table_fully_mapped(table: str) bool[source]
Checks if all variables of a table are mapped, meaning they have at least one MappingCase
- Parameters:
table (str) – The table to check all variables for
- Returns:
Returns True if all variables are mapped
- Return type:
bool
- to_dict() Dict[str, Dict[str, Dict[str, str | List[Dict[str, str]]]]][source]
Converts the object to a dictionary containing only strings
- Returns:
Returns a dictionary containing all mappings
- Return type:
Dict[str, Dict[str, Dict[str, str | List[Dict[str, str]]]]]
- to_json(json_path: str, file_encoding: str | None = None) None[source]
Stores all variable mappings in a JSON
- Parameters:
json_path (str) – Path to the JSON
file_encoding (str | None) – file encoding that should be used for writing the JSON
- Return type:
None
- variable_mapped(table: str, variable: str) bool[source]
Checks if at least one MappingCase is defined for the table and variable. Raises an exception if the variable and/or table is not present in the mapping
- Parameters:
table (str) – The table of the variable to check for
variable (str) – The variable name to check for
- Returns:
Returns True if the table and variable exist in the mapping and at least one MappingCase was defined
- Return type:
bool
- variable_should_get_mapped(table: str, variable: str) bool[source]
Checks if a variable mapping should be defined for the variable. All variables should be mapped except primary keys and foreign keys of foreign tables which inherit the table mapping of table
- Parameters:
table (str) – The table of the variable to check
variable (str) – The name of the variable to check
- Returns:
Returns True if the variable should have a variable mapping
- Return type:
bool
- class graphxplore.DataMapping.DataMappingUtils[source]
Bases: object
This class contains static utility methods for data cleaning or adding primary keys.
- static add_primary_key(data_source: str | Dict[str, List[Dict[str, str]]], source_table: str, data_target: str | Dict[str, List[Dict[str, str]]], target_table: str, primary_key: str, start_idx: int = 0, file_encoding: str | None = None) int[source]
Adds an integer primary key to each row of the source table and stores the result in a data target.
- Parameters:
data_source (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where the CSV file is read from or a data dictionary where data is retrieved
source_table (str) – The name of the source table
data_target (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where the resulting CSV file is written to or a data dictionary where data is inserted
target_table (str) – The name of the resulting target table
primary_key (str) – The name of the primary key
start_idx (int) – The start index for the primary key, defaults to 0
file_encoding (str | None) – The file encoding of the CSV file (ascii, utf-8,…) in chardet definition. Is guessed if not specified, defaults to None
- Returns:
Returns the largest assigned primary key value
- Return type:
int
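The effect of adding an integer primary key can be sketched for an in-memory table as follows. This is a minimal illustration under stated assumptions and not the add_primary_key implementation. The function add_integer_key is hypothetical, key values are stored as strings to match the Dict[str, str] row type, and returning start_idx - 1 for an empty table is an assumption.

```python
from typing import Dict, List, Tuple

def add_integer_key(rows: List[Dict[str, str]], primary_key: str,
                    start_idx: int = 0) -> Tuple[List[Dict[str, str]], int]:
    """Return copies of the rows with a consecutive key and the largest key value."""
    keyed: List[Dict[str, str]] = []
    largest = start_idx - 1  # assumption for an empty table
    for offset, row in enumerate(rows):
        largest = start_idx + offset
        keyed.append({primary_key: str(largest), **row})
    return keyed, largest

rows = [{'name': 'Ada'}, {'name': 'Grace'}]
keyed_rows, largest_key = add_integer_key(rows, primary_key='id', start_idx=5)
```

Returning the largest assigned value makes it straightforward to continue the numbering for another table, for example by passing largest_key + 1 as the next start index.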
- static copy_dataset(source_meta: MetaData, data_source: str | Dict[str, List[Dict[str, str]]], data_target: str | Dict[str, List[Dict[str, str]]], delete_artifacts: bool = False, source_file_encoding: str | None = None) None[source]
Copies a whole dataset while optionally deleting artifacts.
- Parameters:
source_meta (MetaData) – The source metadata
data_source (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where the CSV files are read from or a data dictionary where data is retrieved
data_target (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where the resulting CSV files are written to or a data dictionary where data is inserted
delete_artifacts (bool) – If True, artifacts are removed while copying
source_file_encoding (str | None) – Specifies the file encoding of all source tables, if read from a CSV. Will be detected if not specified, defaults to None
- Return type:
None
- static copy_variable(source_meta: MetaData, source_table: str, source_var: str, target_meta: MetaData, target_table: str, target_var: str, delete_artifacts: bool = False) VariableMapping[source]
Generates a variable mapping for copying a source variable to a target variable, optionally deleting artifacts. The target variable is added to target_meta if it doesn’t exist yet.
- Parameters:
source_meta (MetaData) – The source metadata
source_table (str) – The source table
source_var (str) – The name of the source variable
target_meta (MetaData) – The target metadata, target_table must already exist as a table
target_table (str) – The target table
target_var (str) – The name of the target variable
delete_artifacts (bool) – If True, artifacts are removed while copying
- Returns:
Returns the variable mapping
- Return type:
VariableMapping
- static get_copy_mapping(source_meta: MetaData, target_meta: MetaData, delete_artifacts: bool = False) DataMapping[source]
Generates mappings for copying all data from a source dataset, optionally deleting artifacts. target_meta is filled with all variables from the source dataset, but tables have to exist already. Foreign key relations are inferred from source_meta if they do not exist already.
- Parameters:
source_meta (MetaData)
target_meta (MetaData)
delete_artifacts (bool)
- Returns:
Returns the data mapping
- Return type:
DataMapping
- static pivot_table(source_table: List[Dict[str, str]], index_column: str, value_column: str, to_index: Dict[str, str] | None = None, columns_to_keep: List[str] | None = None) List[Dict[str, str]][source]
- Parameters:
source_table (List[Dict[str, str]])
index_column (str)
value_column (str)
to_index (Dict[str, str] | None)
columns_to_keep (List[str] | None)
- Return type:
List[Dict[str, str]]
- class graphxplore.DataMapping.DataSegmentor(meta: MetaData, lattice: MetaLattice, inheriting_tables: Dict[str, str], data_target: str | Dict[str, List[Dict[str, str]]], global_unique_keys: bool = False)[source]
Bases: object
This abstract class and all its children are the counterpart of the DataFlattener. They take a line of data and distribute it among the various foreign tables.
- Parameters:
meta (MetaData) – The metadata of the target dataset
lattice (MetaLattice) – The lattice of the whole target dataset
inheriting_tables (Dict[str, str]) – The tables (keys of dictionary) for which the primary key should be automatically generated via a uniqueness check, because they inherit the relation from other target tables (value of dictionary)
data_target (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where CSV files are written or a data dictionary where data is inserted
global_unique_keys (bool) – If True, the automatically generated primary key values will be unique across the dataset, defaults to False
- write_row(sub_lattice: MetaLattice, row: Dict[str, Dict[str, str | int | float | None]]) None[source]
Takes a single line of data and distributes it among the target dataset
- Parameters:
sub_lattice (MetaLattice) – The sub-lattice starting at the currently considered target table and containing all inheriting related tables
row (Dict[str, Dict[str, str | int | float | None]]) – The line of data to be distributed
- Return type:
None
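As a counterpart to flattening, distributing one line of data among several target tables could look roughly like the following sketch. It is not the DataSegmentor implementation. The tables 'Person' and 'Address', the column 'address_id' and the interpretation of the “uniqueness check” (identical content of an inheriting table receives the same generated key) are assumptions made for illustration.

```python
from typing import Dict, List, Tuple, Union

Value = Union[str, int, float, None]
FlatRow = Dict[str, Dict[str, Value]]

target: Dict[str, List[Dict[str, Value]]] = {'Person': [], 'Address': []}
# Content of an 'Address' row -> generated primary key (hypothetical uniqueness check)
address_keys: Dict[Tuple, int] = {}

def write_row(row: FlatRow) -> None:
    """Distribute one flat line among 'Person' and its foreign table 'Address'."""
    address = row['Address']
    content = tuple(sorted(address.items()))
    if content not in address_keys:
        # First occurrence of this content: generate a key and store the row
        address_keys[content] = len(address_keys)
        target['Address'].append({'address_id': address_keys[content], **address})
    # The 'Person' row references the address via the generated key
    target['Person'].append({**row['Person'], 'address_id': address_keys[content]})

write_row({'Person': {'person_id': 1, 'name': 'Ada'}, 'Address': {'city': 'Berlin'}})
write_row({'Person': {'person_id': 2, 'name': 'Grace'}, 'Address': {'city': 'Berlin'}})
write_row({'Person': {'person_id': 3, 'name': 'Alan'}, 'Address': {'city': 'London'}})
```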
- class graphxplore.DataMapping.DataTransformation(data_mapping: DataMapping)[source]
Bases: object
This class conducts the ETL process of transforming the given source dataset to the specified target dataset using the given DataMapping
- Parameters:
data_mapping (DataMapping) – The variable mappings
- transform_to_target(source_type: SourceDataType, source_specifier: str | Dict[str, List[Dict[str, str]]], data_target: str | Dict[str, List[Dict[str, str]]], global_unique_target_keys: bool = False, source_file_encoding: str | None = None) None[source]
Reads the source data from a directory with CSV files or from a Neo4J database. Transforms the data and writes it to a target directory as CSV files.
- Parameters:
source_type (SourceDataType) – The type of source data
source_specifier (str | Dict[str, List[Dict[str, str]]]) – Either a source directory path, the name of the Neo4J database or a dictionary containing the source data set
data_target (str | Dict[str, List[Dict[str, str]]]) – The path to a directory where CSV files are written or a data dictionary where data is inserted
global_unique_target_keys (bool) – If True, the generated IDs are unique across all automatically generated primary keys, defaults to False
source_file_encoding (str | None) – Specifies the file encoding of all source tables, if read from a CSV. Will be detected if not specified, defaults to None
- Return type:
None
- class graphxplore.DataMapping.MappingCase(conditional: LogicOperator, conclusion: Conclusion)[source]
Bases: object
This class contains a conditional clause that is checked against the source data and a conclusion generating the target data. It represents the atomic part of a data mapping process. If the condition is met, the conclusion is processed.
- Parameters:
conditional (LogicOperator) – The condition that evaluates to True or False
conclusion (Conclusion) – The conclusion returning the target data, if conditional evaluates to True on the input data
- conclusion: Conclusion
- conditional: LogicOperator
- static from_dict(input_dict: dict) MappingCase[source]
Generates a MappingCase object from a dictionary.
- Parameters:
input_dict (dict) – The input dictionary
- Returns:
Returns the generated MappingCase object
- Return type:
MappingCase
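The way several MappingCase objects are evaluated in input order (the first conditional that holds determines the result, otherwise None) can be sketched without graphxplore. In this illustration, plain callables stand in for LogicOperator and Conclusion objects; the function evaluate_cases and the table name 'T' are hypothetical.

```python
from typing import Callable, Dict, List, Optional, Tuple

SourceLine = Dict[str, Dict[str, object]]
# (conditional, conclusion) pairs standing in for MappingCase objects
Case = Tuple[Callable[[SourceLine], bool], Callable[[SourceLine], object]]

def evaluate_cases(cases: List[Case], line: SourceLine) -> Optional[object]:
    """Return the conclusion of the first case whose conditional holds, else None."""
    for conditional, conclusion in cases:
        if conditional(line):
            return conclusion(line)
    return None

cases: List[Case] = [
    # copy 'var' if it is present and positive
    (lambda line: line['T']['var'] is not None and line['T']['var'] > 0,
     lambda line: line['T']['var']),
    # otherwise return 0.0 if 'var' is present at all
    (lambda line: line['T']['var'] is not None, lambda line: 0.0),
]

copied = evaluate_cases(cases, {'T': {'var': 1.5}})
fallback = evaluate_cases(cases, {'T': {'var': -7.8}})
unmatched = evaluate_cases(cases, {'T': {'var': None}})
```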
- class graphxplore.DataMapping.MetaLattice(children: Dict[str, List[str]])[source]
Bases: object
This class captures the partial ordering of primary/foreign key relations of tables in a lattice. Here, table y is a child of table x (and x is a parent of y) if x contains the primary key of y as a foreign key. This structure is used to efficiently traverse the relationships of tables within a dataset.
- Parameters:
children (Dict[str, List[str]]) – Direct foreign tables for each table
- static from_meta_data(meta: MetaData) MetaLattice[source]
Generate a lattice from the primary/foreign key relations specified in a metadata object
- Parameters:
meta (MetaData) – The metadata
- Returns:
Returns the generated lattice object
- Return type:
MetaLattice
- get_ancestor_lattice(start_tables: Iterable[str], required_tables: Iterable[str]) MetaLattice[source]
Generates a sub-lattice, starting from start_tables and traversing the lattice in reverse order until all required_tables were found. As a result, tables are added to the sub-lattice if they reference members of start_tables as foreign tables or reference foreign tables with that behaviour. All non-related tables of the overall lattice are removed.
- Parameters:
start_tables (Iterable[str]) – The tables from which the reverse traversal is started
required_tables (Iterable[str]) – All tables that must be contained in the sub-lattice
- Returns:
Returns the sub-lattice
- Return type:
MetaLattice
- get_relatives(start_table: str, upward: bool = True) List[str][source]
Finds all upward or downward relatives of start_table in the lattice (excluding start_table itself).
- Parameters:
start_table (str) – The table for which the relatives should be found
upward (bool) – If True, upward foreign table relations are considered, else downward (inverted) relations
- Returns:
Returns the list of relative tables
- Return type:
List[str]
- get_shortest_paths_to_required(start_table: str, required_tables: Iterable[str]) Dict[str, List[str]][source]
Detects the shortest path from
start_tablethrough the lattice to all tables inrequired_tablesindividually. A BFS strategy with parent storage is applied.- Parameters:
start_table (str) – The starting table of the paths
required_tables (Iterable[str]) – The tables for which the paths to the root should be calculated
- Returns:
Returns a dictionary containing the shortest path as list starting from start_table for each table in required_tables
- Return type:
Dict[str, List[str]]
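A BFS with parent storage can be sketched as follows. This is a standalone illustration, not the library's implementation: the function name is hypothetical, the traversal is assumed to follow the `children` edges, and unreachable tables are simply skipped here.

```python
from collections import deque
from typing import Dict, Iterable, List, Optional

def shortest_paths(children: Dict[str, List[str]], start_table: str,
                   required_tables: Iterable[str]) -> Dict[str, List[str]]:
    # BFS: the first time a table is reached, its predecessor is stored.
    parent: Dict[str, Optional[str]] = {start_table: None}
    queue = deque([start_table])
    while queue:
        table = queue.popleft()
        for nxt in children.get(table, []):
            if nxt not in parent:
                parent[nxt] = table
                queue.append(nxt)
    # Rebuild each path by walking the stored predecessors backwards.
    paths: Dict[str, List[str]] = {}
    for target in required_tables:
        if target not in parent:
            continue  # sketch choice: unreachable tables are omitted
        path: List[str] = []
        node: Optional[str] = target
        while node is not None:
            path.append(node)
            node = parent[node]
        paths[target] = path[::-1]
    return paths

# Hypothetical lattice with two routes from A to D
children = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": ["E"], "E": []}
paths = shortest_paths(children, "A", ["E", "C"])
```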
- get_sub_lattice_blacklist(min_tables: Iterable[str], exclude_tables: Iterable[str]) MetaLattice[source]
Finds the sub-lattice with the specified minimal tables, recursively adding children and stopping at the specified exclusion tables.
- Parameters:
min_tables (Iterable[str]) – The minimal tables of the sub-lattice
exclude_tables (Iterable[str]) – Tables that should not be included in the sub-lattice
- Returns:
Returns the sub-lattice
- Return type:
MetaLattice
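The recursive expansion with exclusion tables might look roughly like the sketch below. It works on a plain dictionary rather than a MetaLattice, and the function name is hypothetical.

```python
from typing import Dict, Iterable, List

def sub_lattice_blacklist(children: Dict[str, List[str]],
                          min_tables: Iterable[str],
                          exclude_tables: Iterable[str]) -> Dict[str, List[str]]:
    excluded = set(exclude_tables)
    result: Dict[str, List[str]] = {}
    stack = [table for table in min_tables if table not in excluded]
    while stack:
        table = stack.pop()
        if table in result:
            continue
        # Keep only children that are not excluded, then descend into them.
        kept = [child for child in children.get(table, []) if child not in excluded]
        result[table] = kept
        stack.extend(kept)
    return result

# Hypothetical lattice: excluding B also cuts off D, which is only reachable via B
children = {"A": ["B", "C"], "B": ["D"], "C": [], "D": []}
sub = sub_lattice_blacklist(children, ["A"], ["B"])
```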
- get_sub_lattice_from_inheritance(start_table: str, inheriting_tables: Dict[str, str]) MetaLattice[source]
Get the sub-lattice of all tables directly or indirectly inheriting the relation to the source dataset from start_table. If no table inherits from start_table, it will be the only table in the sub-lattice
- Parameters:
start_table (str) – The table from which all others of the sub-lattice inherit
inheriting_tables (Dict[str, str]) – Dictionary of all inheriting tables and the table they directly inherit from
- Returns:
Returns the generated sub-lattice
- Return type:
MetaLattice
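To illustrate direct and indirect inheritance, the sketch below collects the member tables of such a sub-lattice from an `inheriting_tables` dictionary. It returns a plain set instead of a MetaLattice, and the function name is hypothetical.

```python
from typing import Dict, Set

def inheriting_members(start_table: str,
                       inheriting_tables: Dict[str, str]) -> Set[str]:
    # inheriting_tables maps each inheriting table to the table it
    # directly inherits from.
    members = {start_table}
    changed = True
    while changed:
        changed = False
        for table, source in inheriting_tables.items():
            if source in members and table not in members:
                members.add(table)
                changed = True
    return members

# Hypothetical example: C inherits from B, which inherits from A
inheriting = {"B": "A", "C": "B", "D": "X"}
members = inheriting_members("A", inheriting)
```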
- get_sub_lattice_whitelist(min_tables: Iterable[str], required_tables: Iterable[str]) MetaLattice[source]
Finds the sub-lattice with the specified minimal tables containing all required tables. All non-related tables of the overall lattice are removed.
- Parameters:
min_tables (Iterable[str]) – The minimal tables of the sub-lattice
required_tables (Iterable[str]) – All tables that must be contained in the sub-lattice
- Returns:
Returns the sub-lattice
- Return type:
MetaLattice
- has_multi_reference_relative(start_table: str, upward: bool = True) bool[source]
Generates the tree of tables related to start_table by foreign key relation and checks if a table is referenced multiple times. This prevents the flattening of the data to the start_table using the DataFlattener.
- Parameters:
start_table (str) – The start table for the tree
upward (bool) – If True, descendants (referenced by start_table) are checked. Otherwise, ancestors (referencing start_table) are checked. Defaults to True
- Returns:
Returns True if a multi reference was found, False otherwise
- Return type:
bool
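A multi reference occurs when the tree expanded from the start table reaches the same table along more than one path. The standalone sketch below shows one way to detect this for the descendant direction only; the function name is hypothetical and the code is not taken from the library.

```python
from collections import Counter
from typing import Dict, List

def has_multi_reference(children: Dict[str, List[str]], start_table: str) -> bool:
    # Expand the tree below start_table and count how often each table appears.
    counts: Counter = Counter()
    stack = list(children.get(start_table, []))
    while stack:
        table = stack.pop()
        counts[table] += 1
        if counts[table] > 1:
            return True
        stack.extend(children.get(table, []))
    return False

# Hypothetical lattices: D is reached via both B and C in the diamond
diamond = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": []}
chain = {"A": ["B"], "B": ["C"], "C": []}
```

In the diamond case, a flattened row for `A` would need two separate copies of the columns of `D`, which is why flattening is not possible there.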
- class graphxplore.DataMapping.SourceDataLine(singular_data: Dict[str, Dict[str, str | int | float | None]], aggregated_data: AggregatedData | None = None)[source]
Bases:
object
One flattened line of source data, optionally containing aggregated data as well.
- Parameters:
singular_data (Dict[str, Dict[str, str | int | float | None]]) – The flattened line of data
aggregated_data (AggregatedData | None) – The data that was aggregated for the root primary key of this line, defaults to None
- aggregated_data: AggregatedData | None = None
- get_singular_value(table: str, variable: str) str | int | float | None[source]
Retrieves the value of variable contained in this source data line, if variable was not aggregated
- Parameters:
table (str) – The table of variable
variable (str) – The variable name
- Returns:
- Return type:
str | int | float | None
- singular_data: Dict[str, Dict[str, str | int | float | None]]
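The nested layout of `singular_data` (table name, then variable name, then value) can be pictured with a plain dictionary. The lookup helper below is a hypothetical stand-in for `get_singular_value`; returning `None` for unknown keys is a choice of this sketch, and the library may behave differently.

```python
from typing import Dict, Union

Value = Union[str, int, float, None]

# Hypothetical flattened line: one value per variable of each related table
singular_data: Dict[str, Dict[str, Value]] = {
    "Patient": {"patient_id": 7, "age": 54},
    "Hospital": {"hospital_id": 2, "city": "Hamburg"},
}

def lookup(data: Dict[str, Dict[str, Value]], table: str, variable: str) -> Value:
    # Sketch assumption: unknown tables or variables yield None.
    return data.get(table, {}).get(variable)

age = lookup(singular_data, "Patient", "age")
```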
- class graphxplore.DataMapping.SourceDataType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
str, Enum
The type of source data: a directory with CSV files, or a Neo4J database.
- CSV = 'CSV'
- Database = 'Database'
- class graphxplore.DataMapping.TableMapping(type: TableMappingType | None = None, source_tables: List[str] = <factory>, to_inherit: str | None = None, condition: LogicOperator = <graphxplore.DataMapping.Conditionals.logic_operators.AlwaysTrueOperator object>)[source]
Bases:
object
Each target table x must have some relationship to one or multiple source tables. Using this relationship, single units of source data are formed. Variable mappings are applied to these units to form a single output row of x. Variables of the related source tables and their foreign tables (and their foreign tables, and so on…) will have a single value (might be a missing value) in this unit of source data. These variables are called singular variables. Variables of inverted foreign tables (a is an inverted foreign table of b, if b is a foreign table of a) might have multiple values in a unit of source data (e.g. timeseries, or multiple blood measurements for a single patient). They are called aggregate variables. For a table mapping you have the following options:
- x has a one-to-one relationship with a single source table y. Primary key values are copied from y to x. A unit of source data is formed by a single row of y and rows from foreign tables and/or inverted foreign tables of y. (Most common option)
- x has a one-to-many relationship with multiple source tables. The data of the source tables (and foreign tables or inverted foreign tables) will be combined to form a single unit of source data. This can be done in two ways:
  - The data of the source tables can be merged. Here, data rows from different source tables are combined to a single unit if the rows’ primary key values are identical. If a primary key value of a source table has no analog in another source table, its row is taken independently.
  - The data of the source tables can be concatenated. Here, the source tables are processed independently one after the other to form units of source data together with their foreign tables or inverted foreign tables. The primary key values of x will be 0-indexed integers.
- If x is a foreign table of another target table x’, the relationship to source tables can be inherited from x’. If x’ itself inherits the relationship of another target table x’’, this inheritance is propagated to x. The primary key values of x will be 0-indexed integers and all its rows will be de-duplicated. The primary key values of x will be used as foreign key values in x’.
Optionally, you can define a condition to filter out units of source data that should not be considered in the mapping. If the condition evaluates to False for a unit of source data, it is fully removed from the transformation process of this target table. By default, the AlwaysTrueOperator is used and all source data is taken into the transformation.
- Parameters:
type (TableMappingType | None)
source_tables (List[str])
to_inherit (str | None)
condition (LogicOperator)
- condition: LogicOperator = <graphxplore.DataMapping.Conditionals.logic_operators.AlwaysTrueOperator object>
- static from_dict(input_dict: Dict[str, str | List[str] | None]) TableMapping[source]
- Parameters:
input_dict (Dict[str, str | List[str] | None])
- Return type:
TableMapping
- source_tables: List[str]
- to_inherit: str | None = None
- type: TableMappingType | None = None
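For the inherited case, the de-duplication and the 0-indexed primary keys can be pictured with the standalone sketch below. It is only an illustration under the assumption that rows are plain dictionaries with hashable values; the function name is hypothetical and the code is not the library's implementation.

```python
from typing import Dict, List, Tuple

def deduplicate(rows: List[Dict[str, str]]) -> Tuple[Dict[int, Dict[str, str]], List[int]]:
    """Assign 0-indexed keys to distinct rows and return per-row foreign keys."""
    keys: Dict[tuple, int] = {}
    foreign_keys: List[int] = []
    for row in rows:
        frozen = tuple(sorted(row.items()))
        if frozen not in keys:
            keys[frozen] = len(keys)  # next free 0-indexed primary key
        foreign_keys.append(keys[frozen])
    table = {pk: dict(frozen) for frozen, pk in keys.items()}
    return table, foreign_keys

# Hypothetical rows of x, one per output row of the referencing table x'
rows = [{"city": "Hamburg"}, {"city": "Berlin"}, {"city": "Hamburg"}]
table, foreign_keys = deduplicate(rows)
```

The list `foreign_keys` holds, for each row of x’, the primary key of the de-duplicated row of x that it references.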
- class graphxplore.DataMapping.TableMappingType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
str, Enum
The type of mapping for a target table: One-to-one relation to a single source table, or one-to-many relation to multiple source tables by merging (combination of rows with same primary key value) or concatenation (processing each table independently). Lastly, the mapping relation can be inherited from an ancestor source table (inverted foreign table chain).
- Concatenate = 'Concatenate'
- Inherited = 'Inherited'
- Merge = 'Merge'
- OneToOne = 'OneToOne'
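The difference between merging and concatenating source tables can be sketched with plain dictionaries. Both helper functions and the example tables are hypothetical; they only mirror the behaviour described for the two mapping types and ignore foreign and inverted foreign tables.

```python
from typing import Any, Dict

Tables = Dict[str, Dict[Any, Dict[str, Any]]]  # table -> primary key -> row

def merge_tables(tables: Tables) -> Dict[Any, Dict[str, Dict[str, Any]]]:
    # Rows with identical primary key values end up in the same unit.
    units: Dict[Any, Dict[str, Dict[str, Any]]] = {}
    for name, rows in tables.items():
        for pk, row in rows.items():
            units.setdefault(pk, {})[name] = row
    return units

def concatenate_tables(tables: Tables) -> Dict[int, Dict[str, Dict[str, Any]]]:
    # Tables are processed one after the other; keys are 0-indexed integers.
    units: Dict[int, Dict[str, Dict[str, Any]]] = {}
    index = 0
    for name, rows in tables.items():
        for row in rows.values():
            units[index] = {name: row}
            index += 1
    return units

tables = {
    "LabA": {1: {"hb": 13.2}, 2: {"hb": 11.0}},
    "LabB": {2: {"crp": 4.1}, 3: {"crp": 0.3}},
}
merged = merge_tables(tables)
concatenated = concatenate_tables(tables)
```

Merging yields three units here (primary keys 1, 2 and 3), while concatenation yields four units with keys 0 to 3.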
- class graphxplore.DataMapping.VariableMapping(target_table: str, target_variable: str, cases: List[MappingCase])[source]
Bases:
object
This class contains all data required for the data mapping of one target variable.
- Parameters:
target_table (str) – The table of the target variable
target_variable (str) – The name of the target variable
cases (List[MappingCase]) – The mapping cases (input order sensitive)
- add_case(case: MappingCase) None[source]
Adds a mapping case to the mapping at the last position. The required tables and variables are added to self.sources
- Parameters:
case (MappingCase) – The mapping case to add
- Return type:
None
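Because the cases are sensitive to input order, a case appended with add_case is only reached if no earlier case matched. The first-match evaluation can be sketched with plain callables; the names below are hypothetical and do not use the MappingCase, LogicOperator or Conclusion classes.

```python
from typing import Any, Callable, Dict, List, Tuple

Unit = Dict[str, Any]
Case = Tuple[Callable[[Unit], bool], Callable[[Unit], Any]]

def apply_cases(cases: List[Case], unit: Unit) -> Any:
    # Cases are checked in input order; the first matching one decides.
    for condition, conclusion in cases:
        if condition(unit):
            return conclusion(unit)
    return None  # no condition was met

cases: List[Case] = [
    # Copy 'var' if it is present and larger than 0
    (lambda u: u.get("var") is not None and u["var"] > 0, lambda u: u["var"]),
]
# Appending a catch-all case at the last position acts as a fallback
cases.append((lambda u: True, lambda u: -1))

copied = apply_cases(cases, {"var": 2.5})
fallback = apply_cases(cases, {"var": -3})
```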
- static from_dict(input_dict: Dict[str, str | List[Dict[str, str]]]) VariableMapping[source]
Generates a VariableMapping object from a dictionary.
- Parameters:
input_dict (Dict[str, str | List[Dict[str, str]]])
- Returns:
Returns the generated VariableMapping object
- Return type: