Source code for biothings.hub.datatransform.datatransform_api

"""
DataTransforAPI - classes around API based key lookup.
"""
# pylint: disable=E0401, E0611
import copy

import biothings_client

from biothings.hub.datatransform.datatransform import DataTransform, DataTransformEdge, IDStruct, nested_lookup
from biothings.utils.loggers import get_logger


[docs] class DataTransformAPI(DataTransform): """ Perform key lookup or key conversion from one key type to another using an API endpoint as a data source. This class uses biothings apis to conversion from one key type to another. Base classes are used with the decorator syntax shown below:: @IDLookupMyChemInfo(input_types, output_types) def load_document(doc_lst): for d in doc_lst: yield d Lookup fields are configured in the 'lookup_fields' object, examples of which can be found in 'IDLookupMyGeneInfo' and 'IDLookupMyChemInfo'. Required Options: - input_types - 'type' - ('type', 'nested_source_field') - [('type1', 'nested.source_field1'), ('type2', 'nested.source_field2'), ...] - output_types: - 'type' - ['type1', 'type2'] Additional Options: see DataTransform class """ batch_size = 10 default_source = "_id" lookup_fields = {} def __init__(self, input_types, output_types, *args, **kwargs): """ Initialize the IDLookupAPI object. """ self._generate_return_fields() super(DataTransformAPI, self).__init__(input_types, output_types, *args, **kwargs) # default value of None for client self.client = None # Keep track of one_to_many relationships self.one_to_many_cnt = 0 self.logger, _ = get_logger("keylookup_api") def _valid_input_type(self, input_type): """ Check if the input_type is valid :param input_type: :return: """ if not isinstance(input_type, str): return False return input_type.lower() in self.lookup_fields.keys() def _valid_output_type(self, output_type): """ Check if the output_type is valid :param output_type: :return: """ if not isinstance(output_type, str): return False return output_type.lower() in self.lookup_fields.keys() def _generate_return_fields(self): """ Generate the return_fields member variable from the lookup_fields dictionary. :return: """ self.return_fields = "" for k in self.lookup_fields: for field in self._get_lookup_field(k): self.return_fields += field + "," self.logger.debug("IDLookupAPI return_fields: {}".format(self.return_fields))
[docs] def key_lookup_batch(self, batchiter): """ Look up all keys for ids given in the batch iterator (1 block) :param batchiter: 1 lock of records to look up keys for :return: """ id_lst, doc_cache = self._build_cache(batchiter) self.logger.info("key_lookup_batch num. id_lst items: {}".format(len(id_lst))) query_res = self._query_many(id_lst) qm_struct = self._parse_querymany(query_res) return self._replace_keys(qm_struct, doc_cache)
def _build_cache(self, batchiter): """ Build an id list and document cache for documents read from the batch iterator. :param batchiter: an iterator for a batch of documents. :return: """ id_lst = [] doc_cache = [] for doc in batchiter: # handle skip logic if self.skip_w_regex and self.skip_w_regex.match(doc["_id"]): pass else: for input_type in self.input_types: val = DataTransformAPI._nested_lookup(doc, input_type[1]) if val: id_lst.append('"{}"'.format(val)) # always place the document in the cache doc_cache.append(doc) return list(set(id_lst)), doc_cache def _query_many(self, id_lst): """ Call the biothings_client querymany function with a list of identifiers and output fields that will be returned. :param id_lst: list of identifiers to query :return: """ # Query MyGene.info # self.logger.debug("query_many scopes: {}".format(self.lookup_fields[self.input_type])) scopes = [] for input_type in self.input_types: for field in self._get_lookup_field(input_type[0]): scopes.append(field) client = self._get_client() return client.querymany( id_lst, scopes=scopes, fields=self.return_fields, as_generator=True, returnall=True, size=self.batch_size, ) def _parse_querymany(self, query_res): """ Parse the querymany results from the biothings_client into a structure that will later be used for document key replacement. :param query_res: querymany results :return: """ # self.logger.debug("QueryMany Structure: {}".format(query_res)) qm_struct = {} for q_out in query_res["out"]: query = q_out["query"] val = self._parse_h(q_out) if val: if query not in qm_struct.keys(): qm_struct[query] = [val] else: self.one_to_many_cnt += 1 qm_struct[query] = qm_struct[query] + [val] # self.logger.debug("parse_querymany num qm_struct keys: {}"\ # .format(len(qm_struct.keys()))) # self.logger.info("parse_querymany running one_to_many_cnt: {}"\ # .format(self.one_to_many_cnt)) # self.logger.debug("parse_querymany qm_struct: {}"\ # .format(qm_struct.keys())) return qm_struct def _parse_h(self, hit): """ Parse a single hit from the API. :param hit: :return: dictionary of keys """ for output_type in self.output_types: for doc_field in self._get_lookup_field(output_type): val = DataTransformAPI._nested_lookup(hit, doc_field) if val: return val return None def _replace_keys(self, qm_struct, doc_cache): """ Build a new list of documents to return that have their keys replaced by answers specified in the qm_structure which was built earlier. :param qm_struct: structure of keys from _parse_querymany :param doc_cache: cache of documents that will have keys replaced. :return: """ # Replace the keys and build up a new result list res_lst = [] for doc in doc_cache: new_doc = None for input_type in self.input_types: # doc[input_type[1]] must be typed to a string because # qm_struct.keys are always strings val = DataTransformAPI._nested_lookup(doc, input_type[1]) if val in qm_struct.keys(): for key in qm_struct[val]: new_doc = copy.deepcopy(doc) new_doc["_id"] = key res_lst.append(new_doc) # Break out if an input type was used. if new_doc: break if not new_doc and ( (self.skip_w_regex and self.skip_w_regex.match(doc["_id"])) or not self.skip_on_failure ): res_lst.append(doc) self.logger.info("_replace_keys: Num of documents yielded: {}".format(len(res_lst))) # Yield the results for res in res_lst: yield res def _get_lookup_field(self, field): """ Getter for lookup fields which may be either a string or a list of string fields. :param field: the name of the field to lookup :return: """ if field not in self.lookup_fields.keys(): raise KeyError(f"provided field {field} is not in self.lookup_fields") if isinstance(self.lookup_fields[field], str): return [self.lookup_fields[field]] return self.lookup_fields[field] def _get_client(self): """get biothings_client""" raise NotImplementedError("_get_client not implemented in the super class")
[docs] class DataTransformMyChemInfo(DataTransformAPI): """Single key lookup for MyChemInfo""" lookup_fields = { "unii": "unii.unii", "rxnorm": ["unii.rxcui"], "drugbank": "drugbank.drugbank_id", "chebi": "chebi.chebi_id", "chembl": "chembl.molecule_chembl_id", "pubchem": "pubchem.cid", "drugname": [ "drugbank.name", "unii.preferred_term", "chebi.chebi_name", "chembl.pref_name", ], "inchi": [ "drugbank.inchi", "chembl.inchi", "pubchem.inchi", ], "inchikey": [ "drugbank.inchi_key", "chembl.inchi_key", "pubchem.inchi_key", ], } # The order of output_types decides the priority # of the key types we used to get _id value output_types = ["inchikey", "unii", "rxnorm", "drugbank", "chebi", "chembl", "pubchem", "drugname"] def __init__(self, input_types, output_types=None, skip_on_failure=False, skip_w_regex=None): """ Initialize the class by seting up the client object. """ _output_types = output_types or self.output_types super(DataTransformMyChemInfo, self).__init__(input_types, _output_types, skip_on_failure, skip_w_regex) def _get_client(self): """ Get Client - return a client appropriate for IDLookup This method must be defined in the child class. It is an artifact of multithreading. :return: """ if not self.client: self.client = biothings_client.get_client("drug") return self.client
[docs] class BiothingsAPIEdge(DataTransformEdge): """ APIEdge - IDLookupEdge object for API calls """ # define in subclass client_name = None def __init__(self, lookup, fields, weight=1, label=None, url=None): # pylint: disable=R0913 super(BiothingsAPIEdge, self).__init__(label) self.init_state() if isinstance(lookup, str): self.scopes = [lookup] elif isinstance(lookup, list): self.scopes = lookup else: raise TypeError("scopes argument must be str or list") if isinstance(fields, str): self.fields = [fields] elif isinstance(fields, list): self.fields = fields else: raise TypeError("fields argument must be str or list") self.weight = weight self.url = url
[docs] def init_state(self): """initialize state - pickleable member variables""" self._state = {"client": None, "logger": None}
@property def client(self): """property getter for client""" if not self._state["client"]: try: self.prepare_client() except NotImplementedError: # if accessed but not ready, then just ignore and return invalid value for a client return None return self._state["client"]
[docs] def prepare_client(self): """ Load the biothings_client for the class :return: """ if not self.client_name: raise NotImplementedError("Define client_name in subclass") if self.url: self._state["client"] = biothings_client.get_client(self.client_name, url=self.url) else: self._state["client"] = biothings_client.get_client(self.client_name) self.logger.info("Registering biothings_client {}".format(self.client_name))
[docs] def edge_lookup(self, keylookup_obj, id_strct, debug=False): """ Follow an edge given a key. This method uses the data in the edge_object to find one key to another key using an api. :param edge: :param key: :return: """ # If no keys were passed return an empty idstruct_class # pylint: disable=C1801 if not len(id_strct): return keylookup_obj.idstruct_class() # query the api query_res = self._query_many(keylookup_obj, id_strct) new_id_strct = self._parse_querymany(keylookup_obj, query_res, id_strct, self.fields, debug) return new_id_strct
def _query_many(self, keylookup_obj, id_strct): """ Call the biothings_client querymany function with a list of identifiers and output fields that will be returned. :param id_lst: list of identifiers to query :return: """ if not isinstance(id_strct, IDStruct): raise TypeError("id_strct shouldb be of type IDStruct") id_lst = [] for key in id_strct.id_lst: id_lst.append('"{}"'.format(key)) return self.client.querymany( id_lst, scopes=self.scopes, fields=self.fields, as_generator=True, returnall=True, size=keylookup_obj.batch_size, verbose=False, ) def _parse_querymany(self, keylookup_obj, query_res, id_strct, fields, debug): # pylint: disable=R0913, W0613 """ Parse the querymany results from the biothings_client into a structure that will later be used for document key replacement. :param query_res: querymany results :return: """ # self.logger.debug("QueryMany Structure: {}".format(query_res)) qm_struct = IDStruct() # Keep the old debug information if debug: qm_struct.import_debug(id_strct) # pylint: disable=R1702 for q_out in query_res["out"]: query = q_out["query"] for field in fields: val = nested_lookup(q_out, field) if val: for orig_id, curr_id in id_strct: # query is always a string, so this check requires conversion if query == str(curr_id): qm_struct.add(orig_id, val) # save debug information in the option is set if debug: qm_struct.set_debug(orig_id, self.label, val) return qm_struct
[docs] class MyChemInfoEdge(BiothingsAPIEdge): """ The MyChemInfoEdge uses the MyChem.info API to convert identifiers. """ client_name = "drug" def __init__(self, lookup, field, weight=1, label=None, url=None): # pylint: disable=R0913, W0235 """ :param lookup: The field in the API to search with the input identifier. :type lookup: str :param field: The field in the API to convert to. :type field: str :param weight: Weights are used to prefer one path over another. The path with the lowest weight is preferred. The default weight is 1. :type weight: int """ super(MyChemInfoEdge, self).__init__(lookup, field, weight, label, url)
[docs] class MyGeneInfoEdge(BiothingsAPIEdge): """ The MyGeneInfoEdge uses the MyGene.info API to convert identifiers. """ client_name = "gene" def __init__(self, lookup, field, weight=1, label=None, url=None): # pylint: disable=R0913, W0235 """ :param lookup: The field in the API to search with the input identifier. :type lookup: str :param field: The field in the API to convert to. :type field: str :param weight: Weights are used to prefer one path over another. The path with the lowest weight is preferred. The default weight is 1. :type weight: int """ super(MyGeneInfoEdge, self).__init__(lookup, field, weight, label, url)
####################
[docs] class DataTransformMyGeneInfo(DataTransformAPI): """deprecated""" lookup_fields = { "ensembl": "ensembl.gene", "entrezgene": "entrezgene", "symbol": "symbol", "uniprot": "uniprot.Swiss-Prot", } def __init__( self, input_types, output_types=None, skip_on_failure=False, skip_w_regex=None, ): # pylint: disable=W0102 """ Initialize the class by seting up the client object. """ output_types = output_types or ["entrezgene"] super(DataTransformMyGeneInfo, self).__init__(input_types, output_types, skip_on_failure, skip_w_regex) def _get_client(self): """ Get Client - return a client appropriate for IDLookup This method must be defined in the child class. It is an artifact of multithreading. :return: """ if not self.client: self.client = biothings_client.get_client("gene") return self.client