Source code for biothings.hub.datatransform.datatransform
"""
DataTransform Module
- IDStruct
- DataTransform (superclass)
"""
# pylint: disable=E0401, E0611
import re
from functools import wraps
from biothings.utils.common import is_str, iter_n
from biothings.utils.loggers import get_logger
from .histogram import Histogram
[docs]
class IDStruct(object):
"""
IDStruct - id structure for use with the DataTransform classes. The basic idea
is to provide a structure that provides a list of (original_id, current_id)
pairs.
"""
def __init__(self, field=None, doc_lst=None):
"""
Initialize the structure
:param field: field for documents to use as an initial id (optional)
:param doc_lst: list of documents to use when building an initial list (optional)
"""
self.forward = {}
self.inverse = {}
self.debug = {}
if field and doc_lst:
self._init_strct(field, doc_lst)
def _init_strct(self, field, doc_lst):
"""initialze _id_tuple_lst"""
for doc in doc_lst:
value = nested_lookup(doc, field)
if value:
self.add(value, value)
def __iter__(self):
"""iterator overload function"""
for key in self.forward:
for val in self.forward[key]:
yield key, val
[docs]
def add(self, left, right):
"""add a (original_id, current_id) pair to the list"""
if not left or not right:
return # identifiers cannot be None
if self.lookup(left, right):
return # tuple already in the list
# ensure it's hashable
if not isinstance(left, (list, tuple)):
left = [left]
if not isinstance(right, (list, tuple)):
right = [right]
# These two blocks collapse duplicates in a list of keys
if isinstance(left, list):
left = set(left)
if isinstance(right, list):
right = set(right)
for val in left:
if val not in self.forward.keys():
self.forward[val] = right
else:
self.forward[val] = self.forward[val] | right
for val in right:
if val not in self.inverse.keys():
self.inverse[val] = left
else:
self.inverse[val] = self.inverse[val] | left
def __iadd__(self, other):
"""object += additional, which combines lists"""
if not isinstance(other, IDStruct):
raise TypeError("other is not of type IDStruct")
for left, right in other:
self.add(left, right)
# retain debug information
self.transfer_debug(left, other)
return self
def __len__(self):
"""Return the number of keys (forward direction)"""
return len(self.forward.keys())
def __str__(self):
"""convert to a string, useful for debugging"""
lst = []
for key in self.forward:
for val in self.forward[key]:
lst.append((key, val))
return str(lst)
@property
def id_lst(self):
"""Build up a list of current ids"""
id_set = set()
for key in self.forward:
for val in self.forward[key]:
id_set.add(val)
return list(id_set)
[docs]
def lookup(self, left, right):
"""Find if a (left, right) pair is already in the list"""
for val in self.find_left(left):
if right == val:
return True
return False
[docs]
@staticmethod
def side(_id, where):
"""Find if an _id is a key in where"""
if isinstance(_id, list):
_id = tuple(_id)
return _id in where.keys()
[docs]
def left(self, key):
"""Determine if the id (left, _) is registered"""
return self.side(key, self.forward)
[docs]
@staticmethod
def find(where, ids):
"""Find all ids in dictionary where"""
if not ids:
return
if not isinstance(ids, (set, list, tuple)):
ids = [ids]
for key in ids:
if key in where:
for i in where[key]:
yield i
[docs]
def find_left(self, ids):
"""Find left values given a list of ids"""
return self.find(self.forward, ids)
[docs]
def right(self, key):
"""Determine if the id (_, right) is registered"""
return self.side(key, self.inverse)
[docs]
def find_right(self, ids):
"""Find the first id founding by searching the (_, right) identifiers"""
return self.find(self.inverse, ids)
[docs]
def set_debug(self, left, label, right):
"""Set debug (left, right) debug values for the structure"""
# lowercase left and right keys
if is_str(left):
left = left.lower()
if is_str(right):
right = right.lower()
# remove duplicates in the debug structure
# - duplicates in the structure itself are
# - handled elsewhere
if isinstance(right, list):
right = list(set(right))
# if there is only one element in the list, collapse
if len(right) == 1:
right = right.pop()
# capture the label if it is used
if label:
right = (label, right)
try:
self.debug[left] = self.debug[left] + [right]
except KeyError:
self.debug[left] = [left, right]
[docs]
def get_debug(self, key):
"""Get debug information for a given key"""
# lowercase key if possible
if is_str(key):
key = key.lower()
# return debug information
if isinstance(key, list):
return "type(list)"
try:
return self.debug[key]
except KeyError:
return "not-available"
[docs]
def import_debug(self, other):
"""
import debug information the entire IDStruct object
"""
for key in other.debug:
self.transfer_debug(key, other)
[docs]
def transfer_debug(self, key, other):
"""
transfer debug information for one key in the IDStruct object
"""
# ensure lower case key
if is_str(key):
key = key.lower()
# transfer debug information
self.debug[key] = other.get_debug(key)
[docs]
class DataTransform(object):
"""DataTransform class. This class is the public interface for
the DataTransform module. Much of the core logic is
in the subclass."""
# pylint: disable=R0902
# Constants
batch_size = 1000
DEFAULT_WEIGHT = 1
default_source = "_id"
debug = False
def __init__(
self,
input_types,
output_types,
id_priority_list=None,
skip_on_failure=False,
skip_w_regex=None,
skip_on_success=False,
idstruct_class=IDStruct,
copy_from_doc=False,
debug=False,
):
# pylint: disable=R0913, W0102
"""
Initialize the keylookup object and precompute paths from the
start key to all target keys.
The decorator is intended to be applied to the load_data function
of an uploader. The load_data function yields documents, which
are then post processed by call and the 'id' key conversion is
performed.
:param G: nx.DiGraph (networkx 2.1) configuration graph
:param collections: list of mongodb collection names
:param input_type: key type to start key lookup from
:param output_types: list of all output types to convert to
:param id_priority_list: A priority list of identifiers to to
sort input and output types by.
:type id_priority_list: list(str)
:param id_struct_class: IDStruct used to manager/fetch IDs from docs
:param copy_from_doc: if transform failed using the graph, try to get
value from the document itself when output_type == input_type.
No check is performed, it's a straight copy. If checks are needed
(eg. check that an ID referenced in the doc actually exists in
another collection, nodes with self-loops can be used, so
ID resolution will be forced to go through these loops to ensure
data exists)
"""
self.input_types = self._parse_input_types(input_types)
self.output_types = self._parse_output_types(output_types)
self.id_priority_list = id_priority_list or []
self.skip_on_failure = skip_on_failure
self.skip_on_success = skip_on_success
if skip_w_regex and not isinstance(skip_w_regex, str):
raise ValueError("skip_w_regex must be a string")
elif not skip_w_regex:
self.skip_w_regex = None
else:
self.skip_w_regex = re.compile(skip_w_regex)
self.idstruct_class = idstruct_class
self.copy_from_doc = copy_from_doc
self.histogram = Histogram()
# Setup logger and logging level
self.logger, _ = get_logger("datatransform")
self.debug = debug
def _parse_input_types(self, input_types):
"""
Parse the input_types argument
:return:
"""
res_input_types = []
if isinstance(input_types, str):
input_types = [input_types]
if isinstance(input_types, list):
for input_type in input_types:
if isinstance(input_type, (tuple, list)):
if not self._valid_input_type(input_type[0]):
raise ValueError("input_type '%s' is not a node in the key_lookup graph" % repr(input_type[0]))
res_input_types.append((input_type[0].lower(), input_type[1]))
elif isinstance(input_type, str):
if not self._valid_input_type(input_type.lower()):
raise ValueError("input_type '%s' is not a node in the key_lookup graph" % repr(input_type))
res_input_types.append((input_type, self.default_source))
else:
raise ValueError("Provided input_types is not of the correct type")
else:
raise ValueError("Provided input_types is not of the correct type")
return res_input_types
def _valid_input_type(self, input_type):
# pylint: disable=W0613, R0201
"""In the base class, all input_types and output_types are valid."""
return True
def _parse_output_types(self, output_types):
"""
Parse through output_types
:param output_types:
:return:
"""
if not isinstance(output_types, list):
raise ValueError("output_types should be of type list")
for output_type in output_types:
if not self._valid_output_type(output_type):
raise ValueError("output_type is not a node in the key_lookup graph")
return output_types
def _valid_output_type(self, output_type):
# pylint: disable=W0613, R0201
"""In the base class, all input_types and output_types are valid."""
return True
def __call__(self, func, debug=None):
"""
Perform the data transformation on all documents on call.
:param func: function to apply to
:param debug: Enable debugging information.
:type debug: bool
:param debug: Enable debugging information. When enabled, debugging information
will be retained in the 'dt_debug' field of each document. This parameter
can be either list of original id's to retain debugging information for or
a True, which will retain debugging information for all documents.
:type debug: bool or list(str)
:return:
"""
# additional handling for the debug option
if not debug:
self.debug = False
elif debug is True:
self.debug = True
self.logger.debug("DataTransform Debug Mode Enabled for all documents.")
elif isinstance(debug, list):
self.logger.debug("DataTransform Debug Mode: {}".format(debug))
self.debug = debug
@wraps(func)
def wrapped_f(*args):
"""This is a wrapped function which will be called by the decorator method."""
input_docs = func(*args)
output_doc_cnt = 0
# split input_docs into chunks of size self.batch_size
for batchiter in iter_n(input_docs, int(self.batch_size / len(self.input_types))):
output_docs = self.key_lookup_batch(batchiter)
for odoc in output_docs:
# print debug information if the original id is the in the debug list
if "dt_debug" in odoc:
if isinstance(self.debug, list) and odoc["dt_debug"]["orig_id"] in self.debug:
self.logger.debug("DataTransform Debug doc['dt_debug']: {}".format(odoc["dt_debug"]))
output_doc_cnt += 1
yield odoc
self.logger.info("wrapped_f Num. output_docs: {}".format(output_doc_cnt))
self.logger.info("DataTransform.histogram: {}".format(self.histogram))
return wrapped_f
[docs]
def key_lookup_batch(self, batchiter):
"""
Core method for looking up all keys in batch (iterator)
:param batchiter:
:return:
"""
pass
[docs]
def lookup_one(self, doc):
"""
KeyLookup on document. This method is called as a function call instead of a
decorator on a document iterator.
"""
# special handling for the debug option
self.debug = [doc["_id"]]
output_docs = self.key_lookup_batch([doc])
for odoc in output_docs:
# print debug information if available
if self.debug and "dt_debug" in odoc:
self.logger.debug("DataTransform Debug doc['dt_debug']: {}".format(odoc["dt_debug"]))
yield odoc
self.logger.info("DataTransform.histogram: {}".format(self.histogram))
@staticmethod
def _nested_lookup(doc, field):
"""
Performs a nested lookup of doc using a period (.) delimited
list of fields. This is a nested dictionary lookup.
:param doc: document to perform lookup on
:param field: period delimited list of fields
:return:
"""
value = doc
keys = field.split(".")
try:
for k in keys:
value = value[k]
except KeyError:
return None
return str(value)
@property
def id_priority_list(self):
"""Property method for getting id_priority_list"""
return self._id_priority_list
@id_priority_list.setter
def id_priority_list(self, value):
# pylint: disable=W0201
"""Property method for setting id_priority_list and
sorting input_types and output_types."""
self._id_priority_list = value
self.input_types = self.sort_input_by_priority_list(self.input_types)
self.output_types = self.sort_output_by_priority_list(self.output_types)
[docs]
def sort_input_by_priority_list(self, input_types):
"""
Reorder the given input_types to follow a priority list. Inputs not in the
priority list should remain in their given order at the end of the list.
"""
# construct temporary id_priority_list with extra elements at the end
id_priority_list = self._expand_priority_order([x[0] for x in input_types])
input_types = sorted(input_types, key=lambda e: self._priority_order(id_priority_list, e[0]))
return input_types
[docs]
def sort_output_by_priority_list(self, output_types):
"""
Reorder the given output_types to follow a priority list. Outputs not in the
priority list should remain in their given order at the end of the list.
"""
# construct temporary id_priority_list with extra elements at the end
id_priority_list = self._expand_priority_order(output_types)
output_types = sorted(output_types, key=lambda e: self._priority_order(id_priority_list, e))
return output_types
def _expand_priority_order(self, id_list):
"""
Expand the self.id_priority_list to also include elements in id_list that are not
in the priority list. These elements are added to the priority list in the order
that they appear in the id_list.
Example:
> self.id_priority_list = ['a', 'c']
> self._expand_priority_order(['a', 'd', 'e'])
['a', 'c', 'd', 'e']
"""
res = self.id_priority_list.copy()
for key in id_list:
if key not in self.id_priority_list:
res.append(key)
return res
@staticmethod
def _priority_order(id_priority_list, elem):
"""
Determine the priority order of an input_type following a id_priority_list.
This list, first defined in DataTransformMDB is used to reorder the input_types
so that their order matches the id types listed in id_priority_list. If an id
type is not in that list then the input_type will be placed at the end of the list
in arbitrary order.
"""
assert isinstance(id_priority_list, list)
# match id types with id priority
for index, id_elem in enumerate(id_priority_list):
if elem == id_elem:
return index
# the id type is not in id_priority_list so it will be placed last
return len(id_priority_list) + 1
[docs]
class DataTransformEdge(object):
"""
DataTransformEdge. This class contains information needed to
transform one key to another.
"""
def __init__(self, label=None):
"""
Initialize the class
:param label: A label can be used for debugging purposes.
"""
self.prepared = False
self.label = label
self.init_state()
[docs]
def edge_lookup(self, keylookup_obj, id_strct, debug=False):
# pylint: disable=E1102, R0201, W0613
"""
virtual method for edge lookup. Each edge class is
responsible for its own lookup procedures given a
keylookup_obj and an id_strct
:param keylookup_obj:
:param id_strct: - list of tuples (orig_id, current_id)
:return:
"""
yield NotImplemented("This method must be overridden by the base class.")
[docs]
def init_state(self):
"""initialize the state of pickleable objects"""
self._state = {"logger": None}
@property
def logger(self):
"""getter for the logger property"""
if not self._state["logger"]:
self.prepare()
return self._state["logger"]
@logger.setter
def logger(self, value):
"""setter for the logger variable"""
self._state["logger"] = value
[docs]
def setup_log(self):
"""setup the logger member variable"""
self.logger, _ = get_logger("datatransform")
[docs]
def prepare(self, state=None):
# pylint: disable=W0102
"""Prepare class state objects (pickleable objects)"""
state = state or {}
if self.prepared:
return
if state:
# let's be explicit, _state takes what it wants
for k in self._state:
self._state[k] = state[k]
return
self.setup_log()
[docs]
def unprepare(self):
"""
reset anything that's not picklable (so self can be pickled)
return what's been reset as a dict, so self can be restored
once pickled
"""
state = {
"logger": self._state["logger"],
}
for k in state:
self._state[k] = None
self.prepared = False
return state
[docs]
class RegExEdge(DataTransformEdge):
"""
The RegExEdge allows an identifier to be transformed using a
regular expression. POSIX regular expressions are supported.
"""
def __init__(self, from_regex, to_regex, weight=1, label=None):
"""
:param from_regex: The first parameter of the regular expression substitution.
:type from_regex: str
:param to_regex: The second parameter of the regular expression substitution.
:type to_regex: str
:param weight: Weights are used to prefer one path over another. The path
with the lowest weight is preferred. The default weight is 1.
:type weight: int
"""
super(RegExEdge, self).__init__(label)
self.from_regex = from_regex
self.to_regex = to_regex
self.weight = weight
[docs]
def edge_lookup(self, keylookup_obj, id_strct, debug=False):
"""
Transform identifiers using a regular expression substitution.
"""
res_id_strct = IDStruct()
for left, right in id_strct:
res_id_strct.add(left, re.sub(self.from_regex, self.to_regex, right))
return res_id_strct
[docs]
def nested_lookup(doc, field):
"""
Performs a nested lookup of doc using a period (.) delimited
list of fields. This is a nested dictionary lookup.
:param doc: document to perform lookup on
:param field: period delimited list of fields
:return:
"""
value = doc
keys = field.split(".")
try:
for k in keys:
if isinstance(value, (list, tuple)):
# assuming we have a list of dict with k as one of the keys
stype = set([type(e) for e in value])
if not stype:
return None
assert len(stype) == 1 and stype == {dict}, "Expecting a list of dict, found types: %s" % stype
value = [e[k] for e in value if e.get(k)]
# can't go further ?
return value
else:
value = value[k]
except KeyError:
return None
return value