# Source code for biothings.hub.datatransform.datatransform_mdb

"""
DataTransform MDB module - class for performing key lookup
using conversions described in a networkx graph.
"""
# pylint: disable=E0401, E0611
import copy

import networkx as nx
from pymongo.collation import Collation

import biothings.utils.mongo as mongo
from biothings.hub.datatransform import DataTransform, DataTransformEdge, IDStruct, nested_lookup
from biothings.utils.loggers import get_logger

class MongoDBEdge(DataTransformEdge):
    """
    The MongoDBEdge uses data within a MongoDB collection to convert one identifier
    to another.  The input identifier is used to search a collection.  The output
    identifier values are read out of that collection.
    """

    def __init__(self, collection_name, lookup, field, weight=1, label=None, check_index=True):  # pylint: disable=R0913
        """
        :param collection_name: The name of the MongoDB collection.
        :type collection_name: str
        :param lookup: The field that will match the input identifier in the collection.
        :type lookup: str
        :param field: The output identifier field that will be read out of matching documents.
        :type field: str
        :param weight: Weights are used to prefer one path over another.  The path with
            the lowest weight is preferred.  The default weight is 1.
        :type weight: int
        :param label: Optional label, passed to DataTransformEdge and recorded in the
            debug information produced by edge_lookup.
        :type label: str
        :param check_index: If True (default) and the collection exists, verify that the
            ``lookup`` field is indexed.
        :type check_index: bool
        :raises ValueError: if ``check_index`` is set and ``lookup`` is not indexed.
        """
        super(MongoDBEdge, self).__init__(label)
        # unpickleable attributes, grouped
        self.init_state()
        self.collection_name = collection_name
        self.lookup = lookup
        self.field = field
        self.weight = weight
        if check_index:
            collection = self.collection
            # list_collection_names() replaces collection_names(), which was
            # deprecated in PyMongo 3.7 and removed in PyMongo 4.
            if self.collection_name in collection.database.list_collection_names():
                # Gather every field that appears in any index.  A composite index
                # contributes each of its keys; whether MongoDB can use a non-prefix
                # key of a composite index efficiently is not verified here (TODO).
                avail_idxs = set()
                for idx in collection.list_indexes():
                    avail_idxs.update(idx["key"])
                if self.lookup not in avail_idxs:
                    raise ValueError("Field '%s' isn't indexed, this would " % self.lookup +
                                     "result in very long datatransform process")
            else:
                self.logger.warning("Collection '%s' doesn't exist, can't check indices"
                                    % self.collection_name)

    def init_state(self):
        """Reset the unpickleable state: the collection handle and the logger."""
        self._state = {
            "collection": None,
            "logger": None
        }

    @property
    def collection(self):
        """
        Lazily loaded MongoDB collection for this edge.

        :return: the collection, or None if it could not be prepared.
        """
        # PyMongo Collection objects do not support truth-value testing (PyMongo 4
        # raises NotImplementedError), so compare explicitly against None.
        if self._state["collection"] is None:
            try:
                self.prepare_collection()
            # pylint: disable=W0703
            except Exception:
                # if accessed but not ready, then just ignore and return invalid
                # value for a client
                return None
        return self._state["collection"]

    def prepare_collection(self):
        """
        Load the MongoDB collection specified by collection_name from the source
        database and cache it in the unpickleable state.
        """
        self._state["collection"] = mongo.get_src_db()[self.collection_name]
        self.logger.info("Registering collection: {}".format(self.collection_name))

    def edge_lookup(self, keylookup_obj, id_strct, debug=False):
        """
        Follow an edge given a key.

        An edge represents a document, and this method uses the data in the edge
        object to map one key to another key using exactly one MongoDB lookup.

        :param keylookup_obj: the calling DataTransform object (unused here)
        :param id_strct: IDStruct holding the identifiers to look up
        :param debug: if True, carry over and record debug information
        :raises TypeError: if id_strct is not an IDStruct
        :return: a new IDStruct mapping original ids to the values of ``field``
        """
        if not isinstance(id_strct, IDStruct):
            raise TypeError("edge_lookup id_struct is of the wrong type")

        # Build up a new_id_strct from the results
        res_id_strct = IDStruct()

        # Keep the old debug information
        if debug:
            res_id_strct.import_debug(id_strct)

        id_lst = id_strct.id_lst
        if id_lst:
            find_lst = self.collection_find(id_lst, self.lookup, self.field)
            for doc in find_lst:
                # the output value only depends on the matched document
                value = nested_lookup(doc, self.field)
                for orig_id in id_strct.find_right(nested_lookup(doc, self.lookup)):
                    res_id_strct.add(orig_id, value)
                    if debug:
                        res_id_strct.set_debug(orig_id, self.label, value)
        return res_id_strct

    def collection_find(self, id_lst, lookup, field):
        """
        Abstract out (as one line) the call to collection.find: match documents whose
        ``lookup`` value is in id_lst, projecting only ``lookup`` and ``field``.
        """
        return self.collection.find({lookup: {"$in": id_lst}}, {lookup: 1, field: 1})
class CIMongoDBEdge(MongoDBEdge):
    """
    Case-insensitive MongoDBEdge.

    Behaves like MongoDBEdge, except that lookups use a case-insensitive
    collation (locale 'en', strength 2).
    """

    def __init__(self, collection_name, lookup, field, weight=1, label=None, check_index=True):  # pylint: disable=R0913
        """
        :param collection_name: The name of the MongoDB collection.
        :type collection_name: str
        :param lookup: The field that will match the input identifier in the collection.
        :type lookup: str
        :param field: The output identifier field that will be read out of matching documents.
        :type field: str
        :param weight: Path weight; lower weights are preferred.  Defaults to 1.
        :type weight: int
        :param label: Optional edge label used in debug information.
        :type label: str
        :param check_index: If True (default), verify that ``lookup`` is indexed.
            Previously this could not be disabled for case-insensitive edges.
        :type check_index: bool
        """
        # NOTE(review): MongoDB only uses an index for a collated string comparison
        # when the index has the same collation; the index check in MongoDBEdge
        # does not verify collation.
        super(CIMongoDBEdge, self).__init__(collection_name, lookup, field,
                                            weight=weight, label=label,
                                            check_index=check_index)

    def collection_find(self, id_lst, lookup, field):
        """
        Abstract out (as one line) the call to collection.find and use a
        case-insensitive collation.
        """
        return self.collection.find({lookup: {"$in": id_lst}}, {lookup: 1, field: 1}) \
            .collation(Collation(locale='en', strength=2))
class DataTransformMDB(DataTransform):
    """
    Convert document identifiers from one type to another.
    """
    # Constants
    batch_size = 1000
    default_source = '_id'

    def __init__(self, graph, *args, **kwargs):
        """
        The DataTransformMDB module was written as a decorator class which should be
        applied to the load_data function of a Biothings Uploader.  The load_data
        function yields documents, which are then post-processed by call and the 'id'
        key conversion is performed.

        :param graph: nx.DiGraph (networkx 2.1) configuration graph
        :param input_types: A list of input types of the form (identifier, field), where
            identifier matches a node and field is an optional dotstring field for where
            the identifier should be read from (the default is '_id').
        :param output_types: A priority list of identifiers to convert to.  These
            identifiers should match nodes in the graph.
        :type output_types: list(str)
        :param id_priority_list: A priority list of identifiers to sort input and output
            types by.
        :type id_priority_list: list(str)
        :param skip_on_failure: If True, documents where identifier conversion fails will
            be skipped in the final document list.
        :type skip_on_failure: bool
        :param skip_w_regex: Do not perform conversion if the identifier matches the
            regular expression provided to this argument.  By default, this option is
            disabled.
        :type skip_w_regex: str
        :param skip_on_success: If True, documents where identifier conversion succeeds
            will be skipped in the final document list.
        :type skip_on_success: bool
        :param idstruct_class: Override an internal data structure used by this module
            (advanced usage)
        :type idstruct_class: class
        :param copy_from_doc: If True, an identifier is copied from the input source
            document regardless of whether it matches an edge or not.  (advanced usage)
        :type copy_from_doc: bool
        :raises ValueError: if graph is not an nx.DiGraph or fails validation.
        """
        if not isinstance(graph, nx.DiGraph):
            raise ValueError("key_lookup configuration error: graph must be of type nx.DiGraph")
        self._validate_graph(graph)
        self.graph = graph
        self.logger, _ = get_logger('datatransform')
        super(DataTransformMDB, self).__init__(*args, **kwargs)
        self._precompute_paths()

    def _valid_input_type(self, input_type):
        """Return True if the lower-cased input_type is a node of the graph."""
        return input_type.lower() in self.graph.nodes()

    def _valid_output_type(self, output_type):
        """Return True if the lower-cased output_type is a node of the graph."""
        return output_type.lower() in self.graph.nodes()

    @staticmethod
    def _validate_graph(graph):
        """
        Check that the configuration graph has a valid structure: all node names are
        lowercase and every edge carries a DataTransformEdge under the 'object' key.

        :param graph: key_lookup configuration graph
        :raises ValueError: on the first violation found.
        """
        # all node names should be lowercase
        for node in graph.nodes():
            if node != node.lower():
                raise ValueError("node object {} is not lowercase".format(node))
        for (vert1, vert2) in graph.edges():
            edge_data = graph.edges[vert1, vert2]
            if 'object' not in edge_data:
                raise ValueError("edge_object for ({}, {}) is missing".format(vert1, vert2))
            if not isinstance(edge_data['object'], DataTransformEdge):
                raise ValueError("edge_object for ({}, {}) is of the wrong type".
                                 format(vert1, vert2))

    def _precompute_paths(self):
        """
        Precompute all paths from every input type to every output type provided on
        initialization, storing them in ``self.paths`` keyed by
        (input identifier, output type) and ordered by total edge weight.
        """
        self.paths = {}
        for output_type in self.output_types:
            for input_type in self.input_types:
                paths = list(nx.all_simple_paths(self.graph, input_type[0], output_type))
                if not paths:
                    try:
                        # this will try to find self-loops. all_shortest_paths() returns
                        # one element, the self-looped node, but we need a pair so the
                        # "*2"; also make sure those self-loops are actually defined
                        # in the graph
                        try:
                            # this raises a KeyError if the edge for the self-loop
                            # p-to-p isn't defined
                            # pylint: disable=W0104
                            self.graph.edges[input_type[0], output_type]
                            paths = [p * 2 for p in nx.all_shortest_paths(
                                self.graph, input_type[0], output_type)]
                        except KeyError:
                            pass
                    except nx.NetworkXNoPath:
                        pass

                # Sort by total edge weight - try the cheapest paths first
                paths = sorted(paths, key=self._compute_path_weight)
                self.paths[(input_type[0], output_type)] = paths

    def key_lookup_batch(self, batchiter):  # pylint: disable=R0912
        """
        Look up all keys for ids given in the batch iterator (1 block).

        Documents are yielded as they are resolved: documents skipped by
        ``skip_w_regex`` first, then hits for each (output_type, input_type) attempt
        in priority order (unless ``skip_on_success``), then the remaining misses
        (unless ``skip_on_failure``).

        :param batchiter: 1 block of records to look up keys for
        :return: generator of documents
        """
        doc_lst = []
        for doc in batchiter:
            # in debug mode, skip all documents not in the debug list
            if self.debug:
                # pylint: disable=C0121
                if self.debug is True or doc['_id'] in self.debug:
                    # set debug information
                    doc['dt_debug'] = {'orig_id': doc['_id']}
                    doc_lst.append(doc)
            else:
                doc_lst.append(doc)

        miss_lst = []
        for doc in doc_lst:
            if self.skip_w_regex and self.skip_w_regex.match(doc['_id']):
                yield doc
            else:
                miss_lst.append(doc)

        # Attempt to reach each destination in order...
        for output_type in self.output_types:
            # Starting with each input_type
            for input_type in self.input_types:
                # Reset on every attempt: if no branch below performs a lookup, the
                # hits from the previous attempt must not be yielded a second time.
                hit_lst = []
                if output_type == input_type[0]:
                    # the doc itself has the correct ID,
                    # so either there's a self-loop avail to check this ID is valid
                    if self.graph.has_edge(output_type, output_type):
                        (hit_lst, miss_lst) = self.travel(input_type, output_type, miss_lst)
                    # or if copy is allowed, we get the value from the doc
                    elif self.copy_from_doc:
                        (hit_lst, miss_lst) = self._copy(input_type, miss_lst)
                else:
                    (hit_lst, miss_lst) = self.travel(input_type, output_type, miss_lst)

                if not self.skip_on_success:
                    for doc in hit_lst:
                        yield doc

        # Keep the misses if we do not skip on failure
        if not self.skip_on_failure:
            for doc in miss_lst:
                yield doc

    def _copy(self, input_type, doc_lst):
        """
        Copy ids in the case where input_type == output_type.

        Documents whose ``input_type[1]`` field holds a truthy value get that value
        (as a str) as their new '_id'; the others are returned as misses.

        :return: tuple (hit_lst, miss_lst)
        """
        hit_lst = []
        miss_lst = []
        for doc in doc_lst:
            val = nested_lookup(doc, input_type[1])
            if val:
                # ensure _id is always a str
                doc['_id'] = str(val)
                hit_lst.append(doc)
                # retain debug information if available (assumed dt_debug already in place)
                if self.debug:
                    doc['dt_debug']['copy_from'] = (input_type[1], val)
            else:
                miss_lst.append(doc)

        # Keep a record of IDs copied
        self.histogram.update_io(input_type, input_type, len(hit_lst))
        return (hit_lst, miss_lst)

    def _compute_path_weight(self, path):
        """
        Helper function to compute the weight of a path.

        :param path: sequence of graph nodes
        :return: sum of the ``weight`` of every edge object along the path
        """
        return sum(self.graph.edges[vert1, vert2]['object'].weight
                   for (vert1, vert2) in nx.utils.pairwise(path))

    def travel(self, input_type, target, doc_lst):  # pylint: disable=R0914
        """
        Traverse a graph from a start key type to a target key type using
        precomputed paths.

        :param input_type: (identifier, field) tuple; identifier is the node to start
            from and field is the dotstring field holding the start key in each document
        :param target: key type (graph node) to end at
        :param doc_lst: documents whose keys should be converted
        :return: tuple (hit_lst, miss_lst) - hit_lst holds copies of the documents
            with '_id' replaced (one copy per converted id), miss_lst holds the
            unchanged documents that could not be converted
        """

        def _build_path_strct(input_type, doc_lst):
            """
            Build the initial path structure for the travel function.
            """
            return self.idstruct_class(input_type[1], doc_lst)

        def _build_hit_miss_lsts(doc_lst, id_strct, debug):
            """
            Return a list of documents that have had their identifiers replaced,
            and a list of documents that were not changed.
            """
            hit_lst = []
            miss_lst = []
            for doc in doc_lst:
                hit_flag = False
                value = nested_lookup(doc, input_type[1])
                for lookup_id in id_strct.find_left(value):
                    new_doc = copy.deepcopy(doc)
                    # ensure _id is always a str
                    new_doc['_id'] = str(lookup_id)
                    # capture debug information
                    if debug:
                        new_doc['dt_debug']['start_field'] = input_type[1]
                        new_doc['dt_debug']['debug'] = id_strct.get_debug(value)
                    hit_lst.append(new_doc)
                    hit_flag = True
                if not hit_flag:
                    miss_lst.append(doc)
            return hit_lst, miss_lst

        # Keep a running list of all saved hits
        saved_hits = IDStruct()

        # Build the path structure, which will save results
        path_strct = _build_path_strct(input_type, doc_lst)

        for path in map(nx.utils.pairwise, self.paths[(input_type[0], target)]):
            for (vert1, vert2) in path:
                edge = self.graph.edges[vert1, vert2]['object']
                num_input_ids = len(path_strct)
                path_strct = self._edge_lookup(edge, path_strct)
                num_output_ids = len(path_strct)
                if num_input_ids:
                    self.histogram.update_edge(vert1, vert2, num_output_ids)

            if path_strct:
                saved_hits += path_strct

            # reset the state to lookup misses: only keys not yet resolved are
            # carried into the next path
            path_strct = self.idstruct_class()
            for doc in doc_lst:
                val = nested_lookup(doc, input_type[1])
                if val and not saved_hits.left(val):
                    path_strct.add(val, val)

        # Return a list of documents that have had their identifiers replaced
        # also return a list of documents that were not changed
        hit_lst, miss_lst = _build_hit_miss_lsts(doc_lst, saved_hits, self.debug)
        self.histogram.update_io(input_type, target, len(hit_lst))
        return hit_lst, miss_lst

    def _edge_lookup(self, edge_obj, id_strct):
        """
        Follow an edge given a key by delegating to the edge object's own
        edge_lookup (which may use one of several types of lookup functions).
        """
        return edge_obj.edge_lookup(self, id_strct, self.debug)