import time
import collections
import os
import csv
import contextlib
from typing import Iterable, Union, Optional, Tuple, Dict
from graphxplore.MetaDataHandling import MetaData, VariableInfo, VariableType
from graphxplore.Basis import (GraphCSVWriter, GraphType, BaseUtils, GraphDatabaseWriter, GraphOutputType,
GraphDatabaseUtils, RelationalDataIODevice)
from graphxplore.Basis.BaseGraph import BinBoundInfo, BaseLabels, BaseNode, BaseEdge, BaseEdgeType, BaseNodeType
[docs]
class GraphTranslator:
"""This class transforms relational data represented by one or multiple CSVs to a graph structure given a
:class:`~graphxplore.MetaDataHandling.MetaData` object. Each unique triplet of table, variable and cell is assigned
to a node in the graph structure. Cell nodes are connected to the node for their primary key value via an edge.
This way, multiple primary keys with some identical cell value share a neighbor and are connected if they are in a
foreign key relation. As a result, efficient data lookup can be achieved while avoiding complex joins across
different tables. The generated :class:`~graphxplore.Basis.BaseGraph.BaseGraph` forms the basis for all further
data exploration/analysis methods.
:param metadata: The metadata of the relational dataset
:param missing_vals: These cell values are skipped and not added to the generated graph. Convenient for data with
missing values, defaults to common missing value definitions
:param file_encoding: The file encoding of the CSV files (ascii, utf-8,...) in chardet definition.
Is guessed if not specified, defaults to None
"""
def __init__(self, metadata: MetaData,
missing_vals : Iterable[Union[str, None]] = (None, '', 'NaN', 'Na', 'NA', 'NAN', 'nan', 'na'),
file_encoding : Optional[str] = None):
"""Constructor method
"""
self.metadata = metadata
self.missing_vals = set(missing_vals)
self.table_look_data = collections.defaultdict(dict)
self.node_uuid = 0
self.edge_uuid = 0
self.table_names = self.metadata.get_table_names()
self.primary_key_link = dict([(table, False) for table in self.table_names])
self.line_counter = 0
self.writer = None
self.file_encoding = file_encoding
def __initialize_look_up(self) -> None:
"""Initialize data structures for storage of generated nodes. Attribute nodes are deleted, after the table was
fully processed. Primary key nodes are deleted, if the primary keys are not used as foreign keys in other
tables.
"""
for table in self.table_names:
self.table_look_data[table]['stored_keys'] = collections.defaultdict(int)
self.table_look_data[table]['stored_attributes'] = collections.defaultdict(int)
self.table_look_data[table]['attributes_to_bin'] = collections.defaultdict(lambda
: collections.defaultdict(int))
for foreign_key, foreign_table in self.metadata.get_foreign_keys(table).items():
self.primary_key_link[foreign_table] = True
def __process_row(self, row: dict, table: str) -> None:
    """Translates one CSV row into nodes and edges. A node is generated for the primary key cell, for every
    cell of a categorical or metric variable, and for every foreign key cell. Attribute nodes are connected to
    the primary key node, and foreign key nodes are connected to the primary key node. Nodes are looked up
    before insertion, so that cells with the same value share one node.

    :param row: The row of the CSV
    :param table: The name of the CSV
    :raises AttributeError: If the primary key cell of the row is empty or invalid
    """
    meta = self.metadata
    key_column = meta.get_primary_key(table)
    # fall back to the table name, if no label was specified
    own_label = meta.get_label(table)
    if own_label == '':
        own_label = table
    columns = meta.get_variable_names(table)
    references = meta.get_foreign_keys(table)
    # primary key nodes only need to be remembered, if another table refers to them
    remember_key = self.primary_key_link[table]

    # node for the data point itself
    key_info = meta.get_variable(table, key_column)
    row_node_id = self.__generate_and_insert_node(row[key_column], table, own_label, key_column, key_info,
                                                  remember_key)
    if row_node_id == -1:
        # a data point without primary key cannot be represented
        raise AttributeError(f'In table "{table}" primary key column "{key_column}" contains empty cells')

    # attribute cells (keys are handled separately)
    attribute_types = (VariableType.Categorical, VariableType.Metric)
    for column in columns:
        column_info = meta.get_variable(table, column)
        if column_info.variable_type not in attribute_types:
            continue
        value_node_id = self.__generate_and_insert_node(row[column], table, own_label, column, column_info, True)
        # -1 marks an empty or invalid cell, no edge is written then
        if value_node_id != -1:
            self.edge_uuid += 1
            self.writer.write_edge(BaseEdge(row_node_id, value_node_id, BaseEdgeType.HAS_ATTR_VAL))

    # foreign key cells, the nodes live in the lookup of the referenced table
    for ref_column, ref_table in references.items():
        ref_label = meta.get_label(ref_table)
        if ref_label == '':
            ref_label = ref_table
        ref_info = meta.get_variable(ref_table, ref_column)
        ref_node_id = self.__generate_and_insert_node(row[ref_column], ref_table, ref_label, ref_column,
                                                      ref_info, True)
        # -1 means that no foreign key is linked in this row
        if ref_node_id != -1:
            self.edge_uuid += 1
            self.writer.write_edge(BaseEdge(ref_node_id, row_node_id, BaseEdgeType.CONNECTED_TO))

    # progress output every million rows
    self.line_counter += 1
    if not self.line_counter % 1000000:
        print(f'Processed {self.line_counter} lines')
def __generate_bins(self, table: str) -> None:
    """Generates the bin nodes 'low', 'normal' and 'high' for all attributes of the table that were collected
    for binning. The bounds are taken from the reference range of the variable if one is given in the metadata,
    otherwise they are derived from the distribution of the observed values. Values below the lower bound are
    assigned to 'low', values above the upper bound to 'high' and all others to 'normal'. For each non-empty
    bin a node is written, together with an edge from every value node within the bin.

    :param table: The table for which attributes are binned
    """
    table_data = self.table_look_data[table]

    # attribute name -> (lower bound, upper bound, variable info)
    bin_specs = {}
    for attribute, values in table_data['attributes_to_bin'].items():
        var_info = self.metadata.get_variable(table, attribute)
        if var_info.binning.ref_low is None:
            # no reference range given, derive the bounds from the (value, count) distribution
            distribution = sorted(values.items())
            low = float(BaseUtils.calculate_quartile_quintile_sorted_dist(distribution, False, 1))
            high = float(BaseUtils.calculate_quartile_quintile_sorted_dist(distribution, False, 4))
        else:
            low = var_info.binning.ref_low
            high = var_info.binning.ref_high
        bin_specs[attribute] = (low, high, var_info)

    # attribute name -> bin name -> IDs of the value nodes in that bin
    members = collections.defaultdict(lambda: collections.defaultdict(list))
    for node, node_id in table_data['stored_attributes'].items():
        spec = bin_specs.get(node.name)
        if spec is None:
            # attribute is not binned
            continue
        lower, upper, info = spec
        excluded = info.binning.exclude_from_binning
        if excluded is not None and node.val in excluded:
            continue
        if node.val < lower:
            bin_key = 'low'
        elif node.val > upper:
            bin_key = 'high'
        else:
            bin_key = 'normal'
        members[node.name][bin_key].append(node_id)

    # fall back to the table name, if no label was specified
    table_label = self.metadata.get_label(table)
    if table_label == '':
        table_label = table

    # write one node per non-empty bin and connect the value nodes to it
    for attribute, bins in members.items():
        ref_lower, ref_upper, info = bin_specs[attribute]
        for bin_val, binned_nodes in bins.items():
            self.node_uuid += 1
            bin_id = self.node_uuid
            bin_labels = BaseLabels(membership_labels=tuple([table_label] + info.labels),
                                    node_type=BaseNodeType.AttributeBin)
            bin_node = BaseNode(bin_id, bin_labels, attribute, bin_val, info.description,
                                BinBoundInfo(ref_lower, ref_upper))
            self.writer.write_node(bin_node)
            # NOTE(review): edge_uuid is not increased for these edges, unlike in __process_row - confirm intended
            for member_id in binned_nodes:
                self.writer.write_edge(BaseEdge(source=member_id, target=bin_id,
                                                edge_type=BaseEdgeType.ASSIGNED_BIN))
def __generate_and_insert_node(self, value: str, table: str, table_label: str, var_name: str,
                               var_info : VariableInfo, insert_in_map: bool) -> int:
    """Generates a node for a single cell. The node is of type 'Key' for primary and foreign keys and of type
    'Attribute' otherwise, and carries the table label together with the labels of the variable. The column
    name is used as name and the cast cell value as value of the node. Attribute nodes are always checked
    against the lookup of the table, key nodes only if 'insert_in_map' is set. The node is written to the
    output only if it did not exist before.

    :param value: The cell value as string
    :param table: The CSV file
    :param table_label: The label of the CSV file
    :param var_name: The name of the column
    :param var_info: Variable information containing metadata for the variable
    :param insert_in_map: If true the node is checked for uniqueness (not necessary for primary keys)
    :return: Returns the id of the existing or newly generated node, or -1 if the cell is missing or invalid
    """
    is_missing = value in self.missing_vals
    if not is_missing:
        is_missing = var_info.artifacts is not None and value in var_info.artifacts
    if is_missing:
        # missing cells and artifacts are replaced by the default value, if there is one
        if var_info.default_value is None:
            return -1
        value = var_info.default_value

    cast_value = var_info.cast_value_to_data_type(value)
    if cast_value is None:
        # cell value does not belong to column data type
        return -1

    if var_info.variable_type in (VariableType.PrimaryKey, VariableType.ForeignKey):
        node_type = BaseNodeType.Key
    else:
        node_type = BaseNodeType.Attribute

    # the ID this node gets, if it turns out to be new
    candidate_id = self.node_uuid + 1
    description = var_info.description
    labels = BaseLabels(membership_labels = tuple([table_label] + var_info.labels), node_type = node_type)
    node = BaseNode(candidate_id, labels, var_name, cast_value, description)

    should_bin = node_type == BaseNodeType.Attribute and var_info.binning is not None and var_info.binning.should_bin
    if should_bin and var_info.binning.exclude_from_binning is not None:
        if any(entry == cast_value for entry in var_info.binning.exclude_from_binning):
            should_bin = False

    table_data = self.table_look_data[table]
    if node_type == BaseNodeType.Key:
        if insert_in_map:
            node_id = self.__insert_into_lookup(node, table_data['stored_keys'])
        else:
            # primary keys are always unique and don't have to be added to the lookup
            # unless they are used as foreign keys
            node_id = candidate_id
    else:
        node_id = self.__insert_into_lookup(node, table_data['stored_attributes'])
        if should_bin:
            # count the occurrence for the later derivation of bin bounds
            table_data['attributes_to_bin'][var_name][cast_value] += 1

    # write node if it was not generated before
    if node_id == candidate_id:
        self.node_uuid = candidate_id
        self.writer.write_node(node)
    return node_id
@staticmethod
def __insert_into_lookup(node: BaseNode, lookup: dict) -> int:
"""Checks if the specified node already exists in the lookup structure.
:param node: The generated node
:param lookup: The lookup structure
:return: Returns the existing or newly generated node ID
"""
node_id = lookup[node]
# node was not present before
if node_id == 0:
lookup[node] = node.node_id
return node.node_id
else:
return node_id