Source code for graphxplore.DataMapping.data_aggregator

import collections
import math
from enum import Enum
import copy
from typing import Union, Optional, Set, Tuple, Mapping, Iterable, Dict, List
from .meta_lattice import MetaLattice
from graphxplore.MetaDataHandling import DataType, MetaData, VariableInfo
from graphxplore.Basis import BaseUtils, RelationalDataIODevice

[docs] class AggregatorType(str, Enum): """The type of variable data aggregator. """ List = 'LIST' Count = 'COUNT' Concatenate = 'CONCATENATE' Sum = 'SUM' Min = 'MIN' Max = 'MAX' Mean = 'MEAN' Std = 'STDEV' Median = 'MEDIAN' Amplitude = 'AMPLITUDE'
[docs] class DataAggregator: """This class gathers data of time series, events or other data associated with the same primary key value. To achieve this, a :class:`~graphxplore.DataMapping.MetaLattice` object is traversed in inverse order (starting from its maximal elements), and the table data is loaded and assigned to each unique primary key value. Data for variables in ``required_vars`` is aggregated with the specified :class:`AggregatorType` and :class:`~graphxplore.MetaDataHandling.DataType`. :param meta: The metadata of the whole dataset :param lattice: The lattice that will be traversed in inverse order. :param required_vars: The variables required for data aggregation per table """ def __init__(self, meta: MetaData, lattice : MetaLattice, required_vars : Mapping[str, Mapping[str, Iterable[Tuple[AggregatorType, DataType]]]]): """Constructor method """ self.meta = meta self.lattice = lattice self.required_vars = required_vars # start table -> primary key value -> AggregatedData self.aggregated_data = {table : {} for table in self.lattice.max_elements}
[docs] def aggregate_data(self) -> None: """Starts the data aggregation and stores the results. """ raise NotImplemented('Never call parent class')
[docs] class CSVDataAggregator(DataAggregator): """This class inherits from :class:`DataAggregator` and implements the process by reading data directly from the CSV files. :param data_source: The path to a directory with CSV files or a dictionary containing the source dataset :param meta: The metadata of the whole dataset :param lattice: The lattice that will be traversed in inverse order :param required_vars: The variables required for data aggregation per table :param file_encoding: Specifies the file encoding of all read CSV tables. Will be detected if not specified, defaults to None """ def __init__(self, data_source : Union[str, Dict[str, List[Dict[str, str]]]], meta: MetaData, lattice : MetaLattice, required_vars : Mapping[str, Mapping[str, Iterable[Tuple[AggregatorType, DataType]]]], file_encoding : Optional[str] = None): """Constructor method """ super().__init__(meta, lattice, required_vars) RelationalDataIODevice.check_data_location(data_source) # if not os.path.isdir(source_dir): # raise AttributeError(source_dir + ' was specified for data aggregation, but it is not a valid source ' # 'directory') self.data_source = data_source # start table -> start primary key value -> ancestor table -> ancestor primary key values self.downward_key_relations = {table : collections.defaultdict(lambda: collections.defaultdict(list)) for table in self.lattice.max_elements} # ancestor table -> ancestor primary key value -> start table -> start primary key value self.upward_key_relations = collections.defaultdict(lambda : collections.defaultdict(dict)) self.file_encoding = file_encoding
[docs] def aggregate_data(self) -> None: start_tables = self.lattice.max_elements visited = set(start_tables) queue = list(start_tables) while len(queue) > 0: current = queue.pop(0) self.__extract_data(current) for parent in self.lattice.parents[current]: if parent not in visited: visited.add(parent) queue.append(parent)
def __extract_data(self, table : str): with RelationalDataIODevice(self.data_source, table, file_encoding=self.file_encoding) as reader: print('Loading data for aggregation from table "' + table + '"') is_start_table = table in self.lattice.max_elements primary_key = self.meta.get_primary_key(table) # start table -> start primary key val -> variable -> value -> count data_to_aggregate = collections.defaultdict( lambda: collections.defaultdict( lambda: collections.defaultdict( lambda : collections.defaultdict(int)))) for line in reader: pk_value = line[primary_key] if is_start_table: self.aggregated_data[table][pk_value] = AggregatedData() continue self.__insert_key_relation(table, line) # add data for required variables if table not in self.required_vars: continue for required_var in self.required_vars[table].keys(): val = line[required_var] if val != '': for start_table, start_pk_value in self.upward_key_relations[table][pk_value].items(): data_to_aggregate[start_table][start_pk_value][required_var][val] += 1 # aggregate data if table in self.required_vars: for start_table in self.lattice.max_elements: for pk_value, aggregated in self.aggregated_data[start_table].items(): for required_var, aggregation_infos in self.required_vars[table].items(): for agg_type, data_type in aggregation_infos: if (start_table not in data_to_aggregate or pk_value not in data_to_aggregate[start_table] or required_var not in data_to_aggregate[start_table][pk_value]): agg_val = None else: val_dist = data_to_aggregate[start_table][pk_value][required_var] agg_val = self.__calculate_aggregated_value(val_dist, agg_type, data_type) aggregated.add_variable_aggregation(table, required_var, data_type, agg_type, agg_val) def __insert_key_relation(self, table : str, line : Dict[str, str]) -> None: pk_value = line[self.meta.get_primary_key(table)] for foreign_key, foreign_table in self.meta.get_foreign_keys(table).items(): fk_value = line[foreign_key] if fk_value != '': if foreign_table in self.lattice.max_elements: self.downward_key_relations[foreign_table][fk_value][table].append(pk_value) self.upward_key_relations[table][pk_value][foreign_table] = fk_value else: self.upward_key_relations[table][pk_value].update( self.upward_key_relations[foreign_table][fk_value]) for start_table, start_pk_value in self.upward_key_relations[table][pk_value].items(): self.downward_key_relations[start_table][start_pk_value][table].append(pk_value) @staticmethod def __calculate_aggregated_value(val_dist : Dict[str, int], agg_type : AggregatorType, data_type : DataType) -> Optional[Union[str, int, float]]: casted_dist = {VariableInfo.cast_value(val, data_type): count for val, count in val_dist.items() if VariableInfo.cast_value(val, data_type) is not None and not ((data_type == DataType.Integer or data_type == DataType.Decimal) and math.isnan(float(val)))} if agg_type == AggregatorType.List: agg_val = set(casted_dist.keys()) elif agg_type == AggregatorType.Count: agg_val = sum(casted_dist.values()) elif agg_type == AggregatorType.Concatenate: agg_val = ';'.join(sorted(casted_dist.keys())) else: if len(casted_dist) == 0: agg_val = None elif agg_type == AggregatorType.Sum: agg_val = sum((val * count for val, count in casted_dist.items())) elif agg_type == AggregatorType.Min: agg_val = min(casted_dist.keys()) elif agg_type == AggregatorType.Max: agg_val = max(casted_dist.keys()) elif agg_type == AggregatorType.Mean: agg_val = BaseUtils.calculate_mean(casted_dist) elif agg_type == AggregatorType.Median: agg_val = BaseUtils.calculate_median(casted_dist) elif agg_type == AggregatorType.Std: agg_val = BaseUtils.calculate_std(casted_dist) elif agg_type == AggregatorType.Amplitude: agg_val = max(casted_dist.keys()) - min(casted_dist.keys()) else: raise AttributeError('Aggregator type not implemented') if isinstance(agg_val, float): agg_val = round(agg_val, 5) return agg_val
[docs] class AggregatorParser: """This class contains functionality for parsing :class:`~graphxplore.DataMapping.Conditionals.AggregatorOperator` and :class:`~graphxplore.DataMapping.Conclusions.AggregateConclusion` objects from and to string. """
[docs] @staticmethod def from_string(input_str : str) -> Optional[Tuple[str, str, DataType, AggregatorType]]: """Parses a table, variable, data type and aggregator type from a string. If the string is invalid ``None`` is returned. :param input_str: The string to parse :return: Returns a tuple with the parsed data, or ``None`` if the string could not be parsed """ if not input_str.startswith('AGGREGATE '): return None # split into 10 parts literals = input_str.split(maxsplit=9) if len(literals) != 10: return None # cut comparisons if present if '' in literals[9]: literals[9] = literals[9].split()[0] aggregator = literals[1] if aggregator not in AggregatorType._value2member_map_: return None aggregator = AggregatorType(aggregator) if literals[2] != 'VARIABLE': return None var = literals[3] if literals[5] != 'TYPE': return None data_type = literals[6] if data_type not in DataType._value2member_map_: return None data_type = DataType[data_type] if literals[8] != 'TABLE': return None table = literals[9] return table, var, data_type, aggregator
[docs] @staticmethod def to_str(table : str, var : str, data_type : DataType, aggregator : AggregatorType) -> str: """Converts data of :class:`~graphxplore.DataMapping.Conditionals.AggregatorOperator` and :class:`~graphxplore.DataMapping.Conclusions.AggregateConclusion` objects to string. :param table: The table of variable to aggregate :param var: The name of the variable to aggregate :param data_type: The data type of values that should be aggregated :param aggregator: The type of aggregation :return: Returns the parsed string """ return 'AGGREGATE ' + aggregator + ' VARIABLE ' + var + ' OF TYPE ' + data_type + ' IN TABLE ' + table
[docs] @staticmethod def check_compatibility(table : str, var : str, data_type : DataType, aggregator : AggregatorType, list_aggregation_allowed : bool = True) -> None: """Checks if data type and aggregation type match. String values can only be counted or concatenated. For :class:`~graphxplore.DataMapping.Conditionals.AggregatorOperator` the ``AggregatorType.List`` type is also valid for all data types. :param table: The table of variable to aggregate :param var: The name of the variable to aggregate :param data_type: The data type of values that should be aggregated :param aggregator: The type of aggregation :param list_aggregation_allowed: If ``True`` the ``AggregatorType.List`` type is also valid :return: """ if data_type == DataType.String: valid_string_aggregators = [AggregatorType.Concatenate, AggregatorType.Count] if list_aggregation_allowed: valid_string_aggregators.append(AggregatorType.List) if aggregator not in valid_string_aggregators: raise AttributeError('The aggregator type "' + aggregator + '" is invalid for string value aggregation of variable "' + var + '" of table "' + table + '". Possible aggregator types are: "' + '", "'.join(valid_string_aggregators) + '"')
[docs] @staticmethod def get_aggregated_data_type(aggregator : AggregatorType) -> Optional[DataType]: """Returns the data type of the aggregation (not the type of cell values that should be aggregated). :param aggregator: The type of aggregation :return: Returns the data, or ``None`` if the type is ``AggregatorType.List`` (is a list, has to basic data type) """ # list is not a basic data type if aggregator == AggregatorType.List: return None if aggregator == AggregatorType.Count: return DataType.Integer if aggregator == AggregatorType.Concatenate: return DataType.String if aggregator in [AggregatorType.Min, AggregatorType.Max, AggregatorType.Mean, AggregatorType.Sum, AggregatorType.Std, AggregatorType.Median, AggregatorType.Amplitude]: return DataType.Decimal raise NotImplemented('Aggregator type not implemented')
[docs] class AggregatedData: """This class stores all aggregated data (of other variables) for a fixed primary key value. """ def __init__(self): """Constructor method """ # table -> aggregated variable -> (aggregated data type, aggregation type) -> aggregated value self.aggregated_data = collections.defaultdict(lambda : collections.defaultdict(dict))
[docs] def get_variable_aggregation(self, table : str, variable : str, data_type : DataType, agg_type : AggregatorType) -> Optional[Union[str, int, float, Set[str]]]: """Returns data aggregation value for a specific variable, data type and aggregation type. :param table: The table of the aggregated variable :param variable: The name of the aggregated variable :param data_type: The data type of values that were aggregated :param agg_type: The type of aggregation :return: Returns the aggregated value or ``None`` if no data was aggregated """ if table not in self.aggregated_data: raise AttributeError('Table "' + table + '" not found in aggregated source data') if variable not in self.aggregated_data[table]: raise AttributeError('Variable "' + variable + '" for table "' + table + '" not found in aggregated source data') pair = (data_type, agg_type) if pair not in self.aggregated_data[table][variable]: raise AttributeError('Aggregated data of type "' + agg_type + '" for values of data type "' + data_type + '" of variable "' + variable + '" in table "' + table + '" does not exist in aggregated source data') return self.aggregated_data[table][variable][pair]
[docs] def add_variable_aggregation(self, table : str, variable : str, data_type : DataType, agg_type : AggregatorType, value : Optional[Union[str, int, float, Set[str]]]) -> None: """Adds a data aggregation value for a specific variable, data type and aggregation type for this specific primary key value. :param table: The table of the aggregated variable :param variable: The name of the aggregated variable :param data_type: The data type of values that were aggregated :param agg_type: The type of aggregation :param value: The aggregated value or ``None`` if no data was aggregated """ pair = (data_type, agg_type) if (table in self.aggregated_data and variable in self.aggregated_data[table] and pair in self.aggregated_data[table][variable]): raise AttributeError('Aggregated data already exists for table "' + table + '", variable "' + variable + '", data type ' + data_type + ' and aggregator type ' + agg_type) self.aggregated_data[table][variable][pair] = value
[docs] def exists(self, table : str, variable : str, data_type : DataType, agg_type : AggregatorType) -> bool: """Checks if some value or ``None`` exists for this table, variable, data type and aggregation type in the data structure. :param table: The table of the aggregated variable :param variable: The name of the aggregated variable :param data_type: The data type of values that were aggregated :param agg_type: The type of aggregation :return: Returns ``True`` if some value or ``None`` exists in the data structure for the specified parameters """ return (table in self.aggregated_data and variable in self.aggregated_data[table] and (data_type, agg_type) in self.aggregated_data[table][variable])
[docs] def merge(self, other : 'AggregatedData') -> 'AggregatedData': """Merges two data structures. Raises an exception if different aggregation values were assigned to the same configuration of table, variable, data type and aggregation type. :param other: The other data structure that should be merged with this one :return: Returns a new merged data structure """ result = AggregatedData() result.aggregated_data = copy.deepcopy(self.aggregated_data) for table, table_data in other.aggregated_data.items(): if table not in result.aggregated_data: result.aggregated_data[table] = copy.deepcopy(table_data) continue for var, aggregations in table_data.items(): if var not in result.aggregated_data[table]: result.aggregated_data[table][var] = copy.deepcopy(aggregations) continue for (data_type, agg_type), agg_val in aggregations.items(): if (data_type, agg_type) not in result.aggregated_data[table][var]: result.aggregated_data[table][var][(data_type, agg_type)] = copy.deepcopy(agg_val) else: this_val = result.aggregated_data[table][var][(data_type, agg_type)] if this_val is None: result.aggregated_data[table][var][(data_type, agg_type)] = copy.deepcopy(agg_val) elif agg_val is not None and this_val != agg_val: raise AttributeError('Cannot merge aggregated data objects, because aggregated data ' 'for variable "' + var + '" of table "' + table + '", data type ' + data_type + ' and aggregation type ' + agg_type + ' is contained in both objects and values differ') return result