import collections
from enum import Enum
from typing import Union, Iterable, Tuple, Dict, List, Optional
from graphxplore.MetaDataHandling import DataType, VariableInfo
from ..data_structure_transformer import SourceDataLine
from ..data_aggregator import AggregatorType, AggregatorParser
class LogicOperator:
    """Abstract base class of all conditionals usable in a :class:`~graphxplore.DataMapping.MappingCase`.

    A logic operator decides for one given line of source data whether the condition it describes is fulfilled.
    Every method of this base class only raises ``NotImplementedError`` and has to be overridden by subclasses.
    """

    def __str__(self) -> str:
        # the textual form is specific to each concrete operator
        raise NotImplementedError('Never call the parent class')

    def valid(self, source_data : SourceDataLine) -> bool:
        """Evaluates the conditional on a line of source data.

        :param source_data: The line of source data
        :return: Returns ``True`` if the conditional is met, ``False`` otherwise
        """
        raise NotImplementedError('Never call the parent class')

    def get_required_data(self) -> Dict[str, List[Tuple[str, Optional[Tuple[AggregatorType, DataType]]]]]:
        """Lists the source tables and variables the conditional reads during its validity check.

        :return: Returns the required data as a dictionary keyed by table name
        """
        raise NotImplementedError('Never call the parent class')
class AndOperator(LogicOperator):
    """Conjunction of logic operators: it is valid only if every sub-operator is valid.

    :param sub_operators: The sub-operators
    """

    def __init__(self, sub_operators : Iterable[LogicOperator]):
        # copy into a list so the operands can be traversed more than once
        self.sub_operators = [*sub_operators]

    def get_required_data(self) -> Dict[str, List[Tuple[str, Optional[Tuple[AggregatorType, DataType]]]]]:
        """Merges the data requirements of all sub-operators, removing duplicates per table.

        :return: Returns a dictionary mapping each table to the list of required variable entries
        """
        merged = {}
        for child in self.sub_operators:
            for table_name, needed in child.get_required_data().items():
                merged.setdefault(table_name, set()).update(needed)
        return {table_name : list(needed) for table_name, needed in merged.items()}

    def valid(self, source_data : SourceDataLine) -> bool:
        """Checks all sub-operators against the line of source data, stopping at the first invalid one.

        :param source_data: The line of source data
        :return: Returns ``True`` if no sub-operator is invalid, ``False`` otherwise
        """
        return all(child.valid(source_data) for child in self.sub_operators)

    def __str__(self) -> str:
        joined = ' AND '.join(str(child) for child in self.sub_operators)
        return '(' + joined + ')'
class OrOperator(LogicOperator):
    """This logic operator checks if any of its sub-operators is valid.

    :param sub_operators: The sub-operators
    """

    def __init__(self, sub_operators: Iterable[LogicOperator]):
        # Materialise the iterable: a generator or other one-shot iterator would otherwise be exhausted by the
        # first call of valid(), get_required_data() or str(), and every later call would see no operands.
        self.sub_operators = list(sub_operators)

    def get_required_data(self) -> Dict[str, List[Tuple[str, Optional[Tuple[AggregatorType, DataType]]]]]:
        """Merges the data requirements of all sub-operators, removing duplicates per table.

        :return: Returns a dictionary mapping each table to the list of required variable entries
        """
        result = collections.defaultdict(set)
        for operator in self.sub_operators:
            for table, variables in operator.get_required_data().items():
                result[table].update(variables)
        return {table : list(variables) for table, variables in result.items()}

    def valid(self, source_data : SourceDataLine) -> bool:
        """Checks the sub-operators against the line of source data, stopping at the first valid one.

        :param source_data: The line of source data
        :return: Returns ``True`` if at least one sub-operator is valid, ``False`` otherwise
        """
        return any(operator.valid(source_data) for operator in self.sub_operators)

    def __str__(self) -> str:
        return '(' + ' OR '.join([str(operator) for operator in self.sub_operators]) + ')'
class NegatedOperator(LogicOperator):
    """Logical negation of another operator: valid exactly when ``pos_operator.valid()`` evaluates to ``False``.

    :param pos_operator: The operator to negate
    """

    def __init__(self, pos_operator: LogicOperator):
        self.pos_operator = pos_operator

    def __str__(self) -> str:
        return f'(NOT {self.pos_operator!s})'

    def valid(self, source_data : SourceDataLine) -> bool:
        """Evaluates the wrapped operator and inverts its outcome.

        :param source_data: The line of source data
        :return: Returns ``True`` if the wrapped operator is not valid, ``False`` otherwise
        """
        inner_holds = self.pos_operator.valid(source_data)
        return not inner_holds

    def get_required_data(self) -> Dict[str, List[Tuple[str, Optional[Tuple[AggregatorType, DataType]]]]]:
        """Forwards the data requirements of the wrapped operator unchanged.

        :return: Returns the required data of ``pos_operator``
        """
        required = self.pos_operator.get_required_data()
        return required
class AlwaysTrueOperator(LogicOperator):
    """Logic operator that evaluates to ``True`` for every line of source data.

    A :class:`~graphxplore.DataMapping.MappingCase` using it is therefore always triggered when checked, which
    makes the operator suitable as a default.
    """

    def __str__(self) -> str:
        return '(TRUE)'

    def get_required_data(self) -> Dict[str, List[Tuple[str, Optional[Tuple[AggregatorType, DataType]]]]]:
        """Reports that no source data is needed.

        :return: Returns an empty dictionary
        """
        return dict()

    def valid(self, source_data : SourceDataLine) -> bool:
        """Accepts any line of source data; the argument is not inspected.

        :param source_data: The line of source data
        :return: Always returns ``True``
        """
        return True
class AtomicOperator(LogicOperator):
    """This abstract class and all its children check the value of a single source variable in a line of source data.

    :param table: The table of the source variable
    :param variable: The name of the source variable
    :param data_type: The data type the value of the source variable should have
    """

    def __init__(self, table : str, variable : str, data_type : DataType):
        self.table = table
        self.variable = variable
        self.data_type = data_type

    def get_required_data(self) -> Dict[str, List[Tuple[str, Optional[Tuple[AggregatorType, DataType]]]]]:
        """Returns the single table/variable pair this operator reads.

        :return: Returns a dictionary with one table mapped to one entry. The second element of the entry is the
            pair of aggregator and data type for an :class:`AggregatorOperator` and ``None`` otherwise
        """
        aggregation = (self.aggregator, self.data_type) if isinstance(self, AggregatorOperator) else None
        return {self.table: [(self.variable, aggregation)]}

    def valid(self, source_data : SourceDataLine) -> bool:
        """Checks the source variable's value; must be overridden by subclasses.

        :param source_data: The line of source data
        :return: Returns ``True`` if the conditional is met, ``False`` otherwise
        """
        raise NotImplementedError('Never call the parent class')

    def common_prefix(self) -> str:
        """Generates the common prefix of all atomic logic operators containing the variable, its data type and its
        origin table.

        :return: Returns the prefix string
        """
        return 'VARIABLE ' + self.variable + ' OF TYPE ' + self.data_type + ' IN TABLE ' + self.table

    def common_atomic_str(self, inner : str) -> str:
        """Wraps an inner string with the prefix and brackets common to all atomic logic operators.

        :param inner: The inner part of the string which depends on the type of logic operator
        :return: Returns the generated str
        """
        return '(' + self.common_prefix() + ' ' + inner + ')'

    @staticmethod
    def extract_common_atomic(input_str : str) -> Optional[Tuple[str, str, DataType]]:
        """Parses the prefix written by :meth:`common_prefix` from the start of an operator string.

        The expected layout is ``VARIABLE <variable> OF TYPE <data type> IN TABLE <table>``, optionally followed by
        operator specific text. The string must not contain the surrounding parentheses.

        :param input_str: The string to parse
        :return: Returns the triple ``(table, variable, data_type)`` or ``None`` if ``input_str`` does not start
            with a valid prefix
        """
        # only the first eight whitespace separated tokens belong to the prefix
        literals = input_str.split(maxsplit=8)
        if len(literals) < 8:
            return None
        keywords = {0: 'VARIABLE', 2: 'OF', 3: 'TYPE', 5: 'IN', 6: 'TABLE'}
        if any(literals[position] != keyword for position, keyword in keywords.items()):
            return None
        # NOTE(review): assumes DataType is an Enum whose values are the strings emitted by common_prefix()
        # (the string concatenation there only works for a str based enum) - confirm against MetaDataHandling
        try:
            data_type = DataType(literals[4])
        except ValueError:
            return None
        return literals[7], literals[1], data_type

    @staticmethod
    def from_string(input_str: str) -> Optional['AtomicOperator']:
        """Parses an input string and generates the operator if the string is valid

        :param input_str: The string to parse
        :return: Returns the parsed operator or ``None`` if ``input_str`` is invalid for this type of
            operator
        """
        raise NotImplementedError('Never call the parent class')

    def __str__(self):
        raise NotImplementedError('Never call the parent class')
class InListOperator(AtomicOperator):
    """This logic operator checks if the value for a given source variable has the correct data type and is contained
    in a list of values. Can be combined with :class:`NegatedOperator` to form a "black list check".

    :param table: The source table of the variable
    :param variable: The source variable
    :param data_type: The desired data type of the variable's value
    :param white_list: The list of acceptable values, gets converted to string values
    """

    def __init__(self, table : str, variable : str, data_type : DataType, white_list : Iterable):
        super().__init__(table, variable, data_type)
        # the ordered list keeps the string representation stable, the set gives fast membership checks
        self.ordered_white_list = [str(entry) for entry in white_list]
        self.white_list = set(self.ordered_white_list)

    def valid(self, source_data : SourceDataLine) -> bool:
        """Checks that the variable has a value which can be cast to the data type and is part of the white list.

        :param source_data: The line of source data
        :return: Returns ``True`` if the value is castable and its string form is white-listed, else ``False``
        """
        raw_val = source_data.get_singular_value(self.table, self.variable)
        if raw_val is None:
            return False
        # the cast is only a type check, membership is tested on the string form of the raw value
        if VariableInfo.cast_value(raw_val, self.data_type) is None:
            return False
        return str(raw_val) in self.white_list

    @staticmethod
    def _split_entries(list_body : str) -> List[str]:
        """Splits the text between the square brackets at ``', '`` while keeping quoted entries in one piece.

        An entry starting with a double quote extends to the first double quote that is directly followed by the
        separator or the end of the text. Entries without such a closing quote are split at the next separator.

        :param list_body: The text between ``IN [`` and ``]``
        :return: Returns the raw entries, still including their quotes
        """
        entries = []
        pos = 0
        while True:
            end = -1
            if list_body.startswith('"', pos):
                closing = list_body.find('"', pos + 1)
                while closing != -1:
                    if closing + 1 == len(list_body) or list_body.startswith(', ', closing + 1):
                        end = closing + 1
                        break
                    closing = list_body.find('"', closing + 1)
            if end == -1:
                end = list_body.find(', ', pos)
            if end == -1:
                # last entry
                entries.append(list_body[pos:])
                return entries
            entries.append(list_body[pos:end])
            if end == len(list_body):
                # quoted entry reached the end of the text
                return entries
            pos = end + 2

    @staticmethod
    def from_string(input_str : str) -> Optional['InListOperator']:
        """Parses a string of the form ``<common prefix> IN [entry, "entry with spaces", ...]``.

        :param input_str: The string to parse, without surrounding parentheses
        :return: Returns the parsed operator or ``None`` if ``input_str`` is not a valid list check
        """
        outer = AtomicOperator.extract_common_atomic(input_str)
        if outer is None:
            return None
        table, variable, data_type = outer
        idx = input_str.find('IN [')
        if idx == -1:
            return None
        if input_str[-1] != ']':
            return None
        list_body = input_str[idx + len('IN ['):-1]
        # __str__ quotes entries containing spaces, so separators inside quotes must not split an entry
        white_list = [entry.strip('"') for entry in InListOperator._split_entries(list_body)]
        return InListOperator(table, variable, data_type, white_list)

    def __str__(self) -> str:
        inner = 'IN [' + ', '.join([entry if ' ' not in entry else '"' + entry + '"'
                                    for entry in self.ordered_white_list]) + ']'
        return self.common_atomic_str(inner)
class StringOperatorType(str, Enum):
    """Kinds of comparison a logic operator can perform on string variables.

    The member values are the keywords used in the string representation of the operators.
    """
    # exact match of the whole string
    Equals = 'IS'
    # substring search
    Contains = 'CONTAINS'
class StringOperator(AtomicOperator):
    """This logic operator performs string comparisons between the value of a single source variable and ``value``.

    :param table: The source table of the variable
    :param variable: The source variable
    :param value: The value for comparison
    :param compare: The type of string comparison
    """

    def __init__(self, table : str, variable : str, value : str, compare : StringOperatorType):
        super().__init__(table, variable, DataType.String)
        self.value = value
        self.compare = compare

    def valid(self, source_data : SourceDataLine) -> bool:
        """Compares the value of the source variable with ``value``.

        :param source_data: The line of source data
        :return: Returns ``False`` if the variable has no value, otherwise the result of the comparison
        """
        source_val = source_data.get_singular_value(self.table, self.variable)
        if source_val is None:
            return False
        return StringOperator.check_value(source_val, self.value, self.compare)

    @staticmethod
    def check_value(val_to_check : str, to_check_against : str, compare : StringOperatorType) -> bool:
        """Checks the validity of ``val_to_check``

        :param val_to_check: The value to check the validity for
        :param to_check_against: The base value to check against
        :param compare: The type of comparison
        :return: Returns ``True`` if ``val_to_check`` is valid, else ``False``
        :raises NotImplementedError: If ``compare`` is not a known string operator type
        """
        if compare == StringOperatorType.Equals:
            return val_to_check == to_check_against
        if compare == StringOperatorType.Contains:
            return to_check_against in val_to_check
        # NotImplemented is a constant, not an exception class; calling it would fail with a TypeError
        raise NotImplementedError('String operator type not implemented')

    @staticmethod
    def from_string(input_str: str) -> Optional['StringOperator']:
        """Parses a string of the form ``<common prefix> IS "value"`` or ``<common prefix> CONTAINS "value"``.

        :param input_str: The string to parse, without surrounding parentheses
        :return: Returns the parsed operator or ``None`` if ``input_str`` is not a valid string comparison
        """
        outer = AtomicOperator.extract_common_atomic(input_str)
        if outer is None:
            return None
        table, variable, data_type = outer
        if data_type != DataType.String:
            return None
        # eight prefix tokens, the comparison keyword and the quoted value, which may itself contain whitespace
        literals = input_str.split(maxsplit=9)
        if len(literals) != 10:
            return None
        compare = literals[-2]
        value = literals[-1]
        if compare not in StringOperatorType._value2member_map_:
            return None
        # a lone '"' starts and ends with a quote but is not a quoted value
        if len(value) < 2 or not value.startswith('"') or not value.endswith('"'):
            return None
        value = value[1:-1]
        return StringOperator(table, variable, value, StringOperatorType(compare))

    def __str__(self) -> str:
        inner = self.compare + ' ' + '"' + self.value + '"'
        return self.common_atomic_str(inner)
class MetricOperatorType(str, Enum):
    """Kinds of comparison a logic operator can perform on metric or categorical variables of numeric type.

    The member values are the symbols used in the string representation of the operators.
    """
    # equality
    Equals = '=='
    # strict inequalities
    Smaller = '<'
    Larger = '>'
    # inclusive inequalities
    SmallerOrEqual = '<='
    LargerOrEqual = '>='
class MetricOperator(AtomicOperator):
    """This logic operator performs metric comparisons between the value of a single source variable and ``value``.

    :param table: The source table of the variable
    :param variable: The source variable
    :param value: The value for comparison
    :param data_type: The data type the value of the source variable is cast to before the comparison
    :param compare: The type of metric comparison
    """

    def __init__(self, table : str, variable : str, value : Union[int, float], data_type : DataType,
                 compare : MetricOperatorType):
        super().__init__(table, variable, data_type)
        self.value = value
        self.compare = compare

    def valid(self, source_data : SourceDataLine) -> bool:
        """Casts the value of the source variable to the data type and compares it with ``value``.

        :param source_data: The line of source data
        :return: Returns ``False`` if the variable has no value or the cast yields ``None``, otherwise the result
            of the comparison
        """
        raw_val = source_data.get_singular_value(self.table, self.variable)
        if raw_val is None:
            return False
        casted_val = VariableInfo.cast_value(raw_val, self.data_type)
        if casted_val is None:
            return False
        return MetricOperator.check_value(casted_val, self.value, self.compare)

    @staticmethod
    def check_value(val_to_check: Union[int, float], to_check_against: Union[int, float],
                    compare: MetricOperatorType) -> bool:
        """Checks the validity of ``val_to_check``

        :param val_to_check: The value to check the validity for
        :param to_check_against: The base value to check against
        :param compare: The type of comparison
        :return: Returns ``True`` if ``val_to_check`` is valid, else ``False``
        :raises NotImplementedError: If ``compare`` is not a known metric operator type
        """
        if compare == MetricOperatorType.Equals:
            return val_to_check == to_check_against
        if compare == MetricOperatorType.Smaller:
            return val_to_check < to_check_against
        if compare == MetricOperatorType.SmallerOrEqual:
            return val_to_check <= to_check_against
        if compare == MetricOperatorType.Larger:
            return val_to_check > to_check_against
        if compare == MetricOperatorType.LargerOrEqual:
            return val_to_check >= to_check_against
        # NotImplemented is a constant, not an exception class; calling it would fail with a TypeError
        raise NotImplementedError('Metric operator type not implemented')

    @staticmethod
    def from_string(input_str: str) -> Optional['MetricOperator']:
        """Parses a string of the form ``<common prefix> <comparison symbol> <number>``.

        :param input_str: The string to parse, without surrounding parentheses
        :return: Returns the parsed operator or ``None`` if ``input_str`` is not a valid metric comparison
        """
        outer = AtomicOperator.extract_common_atomic(input_str)
        if outer is None:
            return None
        table, variable, data_type = outer
        if data_type != DataType.Integer and data_type != DataType.Decimal:
            return None
        # eight prefix tokens, the comparison symbol and the number
        literals = input_str.split()
        if len(literals) != 10:
            return None
        compare = literals[-2]
        value = literals[-1]
        if compare not in MetricOperatorType._value2member_map_:
            return None
        casted_val = VariableInfo.cast_value(value, data_type)
        if casted_val is None:
            return None
        return MetricOperator(table, variable, casted_val, data_type, MetricOperatorType(compare))

    def __str__(self) -> str:
        inner = self.compare + ' ' + str(self.value)
        return self.common_atomic_str(inner)
class AggregatorOperator(AtomicOperator):
    """Logic operator that inspects aggregated data of one table, variable and data type for a primary key value.

    Useful for checks on time series, e.g. whether at least one blood pressure measurement exceeded a threshold or
    whether all doctor's notes mention a certain precondition.

    :param table: The table of origin of the variable to check
    :param variable: The name of variable to check
    :param value: The value to check the aggregated data against
    :param data_type: Only values of this type will be aggregated
    :param aggregator: The type of aggregation
    :param compare: The comparison between the aggregated data and ``value``. Must match ``aggregator``: aggregations
        with a numeric result require a :class:`MetricOperatorType`, ``AggregatorType.List`` requires
        ``StringOperatorType.Contains`` and ``AggregatorType.Concatenate`` requires a :class:`StringOperatorType`
    """

    def __init__(self, table : str, variable : str, value : Union[str, int, float], data_type : DataType,
                 aggregator : AggregatorType, compare : Union[StringOperatorType, MetricOperatorType]):
        super().__init__(table, variable, data_type)

        def origin() -> str:
            # message fragment naming the variable; only built when an error is actually raised
            return '" of variable "' + variable + '" in table "' + table

        # the aggregation has to be applicable to the data type of the variable
        AggregatorParser.check_compatibility(table, variable, data_type, aggregator)

        # the comparison has to fit the result type of the aggregation
        result_type = AggregatorParser.get_aggregated_data_type(aggregator)
        if result_type in (DataType.Decimal, DataType.Integer):
            if not isinstance(compare, MetricOperatorType):
                raise AttributeError('Aggregator type "' + aggregator + origin()
                                     + '" can only be combined with a metric operator type. Possible types are: "'
                                     + '", "'.join(MetricOperatorType._value2member_map_.keys()) + '"')
        elif aggregator == AggregatorType.List:
            if compare != StringOperatorType.Contains:
                raise AttributeError('Aggregator type "' + AggregatorType.List + origin()
                                     + '" must be combined with operator type "'
                                     + StringOperatorType.Contains + '"')
        elif aggregator == AggregatorType.Concatenate:
            if not isinstance(compare, StringOperatorType):
                raise AttributeError('Aggregator type "' + AggregatorType.Concatenate + origin()
                                     + '" must be combined with a string operator type. Possible types are: "'
                                     + '", "'.join(StringOperatorType._value2member_map_.keys()) + '"')

        # string comparisons need a string value, metric comparisons a non-string value
        value_is_text = type(value) is str
        compare_is_text = isinstance(compare, StringOperatorType)
        if value_is_text != compare_is_text:
            raise AttributeError('Variable "' + variable + '" in table "' + table
                                 + '" has mismatch of operator type "' + compare
                                 + '" and value type to compare with ' + value.__class__.__name__)

        self.value = value
        self.compare = compare
        self.aggregator = aggregator

    def valid(self, source_data : SourceDataLine) -> bool:
        """Looks up the aggregation for this operator and compares it with ``value``.

        :param source_data: The line of source data carrying the aggregated data
        :return: Returns ``False`` if no aggregation is available, otherwise the result of the comparison
        """
        # the lookup yields None when nothing was aggregated
        aggregation = source_data.aggregated_data.get_variable_aggregation(self.table, self.variable,
                                                                           self.data_type, self.aggregator)
        if aggregation is None:
            return False
        if self.aggregator == AggregatorType.List:
            # membership test on the aggregated collection
            return str(self.value) in aggregation
        checker = (StringOperator.check_value if isinstance(self.compare, StringOperatorType)
                   else MetricOperator.check_value)
        return checker(aggregation, self.value, self.compare)

    def __str__(self) -> str:
        if isinstance(self.compare, MetricOperatorType):
            value_text = str(self.value)
        else:
            value_text = '"' + self.value + '"'
        aggregation_text = AggregatorParser.to_str(self.table, self.variable, self.data_type, self.aggregator)
        return '(' + aggregation_text + ' ' + self.compare + ' ' + value_text + ')'

    @staticmethod
    def from_string(input_str: str) -> Optional['AggregatorOperator']:
        """Parses an aggregation string followed by a comparison keyword or symbol and a value.

        :param input_str: The string to parse, without surrounding parentheses
        :return: Returns the parsed operator or ``None`` if ``input_str`` is not a valid aggregator check
        """
        parsed = AggregatorParser.from_string(input_str)
        if parsed is None:
            return None
        table, variable, data_type, aggregator = parsed

        table_pos = input_str.find('TABLE')
        if table_pos == -1:
            return None
        # tokens after "TABLE <table>": the comparison and the remaining text as value
        tail = input_str[table_pos:].split(maxsplit=3)[2:]
        if len(tail) != 2:
            return None
        compare_token, value_token = tail

        if compare_token in StringOperatorType._value2member_map_:
            if not (value_token.startswith('"') and value_token.endswith('"')):
                return None
            return AggregatorOperator(table, variable, value_token[1:-1], data_type, aggregator,
                                      StringOperatorType(compare_token))

        if compare_token not in MetricOperatorType._value2member_map_:
            return None
        # try an integer first and fall back to a decimal
        number = VariableInfo.cast_value(value_token, DataType.Integer)
        if number is None:
            number = VariableInfo.cast_value(value_token, DataType.Decimal)
        if number is None:
            return None
        return AggregatorOperator(table, variable, number, data_type, aggregator,
                                  MetricOperatorType(compare_token))
class LogicOperatorParser:
    """This class parses conditional strings and extracts the represented :class:`LogicOperator`
    """

    @staticmethod
    def from_string(input_str: str) -> LogicOperator:
        """Parses a string and returns the generated :class:`LogicOperator` or raises an exception if the string is
        invalid.

        :param input_str: The input string
        :return: Returns the generated operator
        :raises AttributeError: If ``input_str`` is not a valid operator string
        """
        return LogicOperatorParser.__from_string_rec(input_str, input_str)

    @staticmethod
    def __from_string_rec(current : str, input_str : str) -> LogicOperator:
        """Recursively parses parts of an input string by detecting brackets and identifying sub-strings for parsing.

        :param current: The current substring
        :param input_str: The full input string, only used for error messages
        :return: Returns the generated logic operator
        :raises AttributeError: If ``current`` is not a valid operator string
        """
        if not current.startswith('('):
            raise AttributeError('Logic sub operator string must start with opening parenthesis: ' + current
                                 + ', total string was: ' + input_str)
        if not current.endswith(')'):
            raise AttributeError('Logic sub operator string must end with closing parenthesis: ' + current
                                 + ', total string was: ' + input_str)
        if len(current) < 3:
            raise AttributeError('Logic sub operator contains "()": ' + current
                                 + ', total string was: ' + input_str)
        # outer parenthesis removed
        substring = current[1:-1]

        # found atomic operator
        if '(' not in substring:
            if ')' in substring:
                # raising a plain string is a TypeError, the message has to be wrapped in an exception
                raise AttributeError('Logic sub operator string has too many closing parenthesis: ' + current
                                     + ', total string was: ' + input_str)
            if substring == 'TRUE':
                return AlwaysTrueOperator()
            # the first operator type accepting the string wins, so the order matters
            for atomic_type in (InListOperator, StringOperator, MetricOperator, AggregatorOperator):
                operator = atomic_type.from_string(substring)
                if operator is not None:
                    return operator
            raise AttributeError('Logic atomic operator string is invalid: ' + current
                                 + ', total string was: ' + input_str)

        # found negation
        if substring.startswith('NOT '):
            pos_operator = LogicOperatorParser.__from_string_rec(substring[4:], input_str)
            return NegatedOperator(pos_operator)

        # found composition
        if not substring.startswith('('):
            raise AttributeError('Logic sub composite operator string must start with opening parenthesis: '
                                 + substring + ', total string was: ' + input_str)
        return LogicOperatorParser.resolve_composition(substring, input_str)

    @staticmethod
    def resolve_composition(substring : str, input_str : str) -> Union[AndOperator, OrOperator]:
        """Decompose and/or composition into sub operators

        :param substring: The current substring
        :param input_str: The full input string, only used for error messages
        :return: Returns the and/or logic operator. A composition without any ``AND`` yields an :class:`OrOperator`
        :raises AttributeError: If the parentheses are unbalanced, no sub operator is found, ``AND`` and ``OR`` are
            mixed or a sub operator is followed by something else than `` AND `` or `` OR ``
        """
        sub_operator_strings = []
        depth = 0
        starting_idx = None
        composition_type = None
        for idx, char in enumerate(substring):
            if char == '(':
                depth += 1
                if depth == 1:
                    # start of a top level sub operator
                    starting_idx = idx
            elif char == ')':
                depth -= 1
                if depth < 0:
                    raise AttributeError(
                        'Logic sub composite operator string does not have enough opening parenthesis: ' + substring
                        + ', total string was: ' + input_str)
                if depth == 0:
                    # found closing parenthesis of a top level sub operator
                    sub_operator_strings.append(substring[starting_idx:idx + 1])
                    # check composition type
                    if idx < len(substring) - 1:
                        comp_start = substring[idx + 1:]
                        if comp_start.startswith(' AND '):
                            found_type = 'and'
                        elif comp_start.startswith(' OR '):
                            found_type = 'or'
                        else:
                            raise AttributeError(
                                'Logic sub composite operator string must have "AND" or "OR" as composition: '
                                + substring + ', total string was: ' + input_str)
                        if composition_type is None:
                            composition_type = found_type
                        elif composition_type != found_type:
                            raise AttributeError(
                                'Logic sub composite operator string cannot have "AND" and "OR" as composition: '
                                + substring + ', total string was: ' + input_str)
        if depth != 0:
            # an unclosed sub operator would otherwise be dropped silently
            raise AttributeError(
                'Logic sub composite operator string does not have enough closing parenthesis: ' + substring
                + ', total string was: ' + input_str)
        if not sub_operator_strings:
            raise AttributeError('Logic sub composite operator string does not contain any sub operator: '
                                 + substring + ', total string was: ' + input_str)
        sub_operators = [LogicOperatorParser.__from_string_rec(composite_sub, input_str)
                         for composite_sub in sub_operator_strings]
        if composition_type == 'and':
            return AndOperator(sub_operators)
        return OrOperator(sub_operators)