Source code for biothings.hub.datainspect.inspector

import asyncio
import math
import random
import time
from datetime import datetime
from functools import partial

import biothings.utils.es as es
import biothings.utils.inspect as btinspect
from biothings import config
from biothings.hub import INSPECTOR_CATEGORY
from biothings.hub.databuild.backend import create_backend
from biothings.utils.common import timesofar
from biothings.utils.dataload import dict_traverse
from biothings.utils.hub_db import get_source_fullname, get_src_build, get_src_dump
from biothings.utils.loggers import get_logger
from biothings.utils.manager import BaseManager
from biothings.utils.mongo import doc_feeder, id_feeder

logging = config.logger


class InspectorError(Exception):
    pass


# common function used to call the inspector
def inspect_data(backend_provider, ids, mode, pre_mapping, **kwargs):
    col = create_backend(backend_provider).target_collection
    cur = doc_feeder(col, step=len(ids), inbatch=False, query={"_id": {"$in": ids}})
    res = btinspect.inspect_docs(cur, mode=mode, pre_mapping=pre_mapping, metadata=False, auto_convert=False, **kwargs)
    return res
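

# Illustrative usage sketch (not part of the original module): inspect_data() is normally
# dispatched to a worker process by InspectorManager.inspect() below, but it can also be
# called directly with a backend definition and a batch of document ids. The backend name
# ("src", "clinvar") and the ids below are assumptions used only for illustration.
#
#   ids = ["rs12345", "rs67890"]
#   report = inspect_data(("src", "clinvar"), ids, mode=["type", "stats"], pre_mapping=False)
#   # "report" maps each requested mode to the inspection result for that batch of documents

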
class InspectorManager(BaseManager):
    def __init__(self, upload_manager, build_manager, *args, **kwargs):
        super(InspectorManager, self).__init__(*args, **kwargs)
        self.upload_manager = upload_manager
        self.build_manager = build_manager
        self.logfile = None
        self.setup_log()

    def clean_stale_status(self):
        src_dump = get_src_dump()
        srcs = src_dump.find()
        for src in srcs:
            dirty = False
            jobs = src.get("inspect", {}).get("jobs", {})
            for subsrc in jobs:
                if jobs[subsrc].get("status") == "inspecting":
                    logging.warning("Found stale datasource '%s', marking inspect status as 'canceled'" % src["_id"])
                    jobs[subsrc]["status"] = "canceled"
                    dirty = True
            if dirty:
                src_dump.replace_one({"_id": src["_id"]}, src)

    def setup_log(self):
        """Set up the "inspect" logger and logfile"""
        self.logger, self.logfile = get_logger("inspect")

    def get_backend_provider_info(self, data_provider):
        data_provider_type, registerer_obj, backend_provider, ups = (None,) * 4
        if data_provider[0] == "src":
            data_provider_type = "source"
            # find the src_dump doc: is it a full source name (dot notation)?
            fullname = get_source_fullname(data_provider[1])
            if fullname:
                # it's dot notation
                src_name = fullname.split(".")[0]
            else:
                # no subsource, the full source name is the passed name
                src_name = data_provider[1]
                fullname = src_name
            doc = get_src_dump().find_one({"_id": src_name})  # query by main source
            if not doc:
                raise InspectorError("Can't find document associated to '%s'" % src_name)
            # get an uploader instance (used to get the data if type is "uploader",
            # but also used to update the status of the datasource via register_status())
            ups = self.upload_manager[fullname]  # potentially using dot notation
            # create uploader
            registerer_obj = self.upload_manager.create_instance(ups[0])
            registerer_obj.src_doc = doc
            backend_provider = data_provider
        else:
            try:
                data_provider_type = "build"
                registerer_obj = self.build_manager.get_builder(data_provider)
                registerer_obj.src_build = get_src_build().find_one({"_id": data_provider})
                if not registerer_obj.src_build:
                    raise InspectorError("Can't find build associated to '%s'" % data_provider)
                backend_provider = data_provider
            except Exception as e:
                raise InspectorError("Unable to create backend from '%s': %s" % (repr(data_provider), e))
        return data_provider_type, registerer_obj, backend_provider, ups
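
    # Illustrative sketch (not part of the original module): the tuple returned by
    # get_backend_provider_info() depends on the kind of provider. The names below are
    # assumptions used only for illustration.
    #
    #   # source provider: "ups" is the list of matching uploader classes
    #   ptype, registerer, backend, ups = manager.get_backend_provider_info(("src", "clinvar"))
    #   # ptype == "source", registerer is an uploader instance, backend == ("src", "clinvar")
    #
    #   # build provider: the provider is a build/merged collection name, "ups" stays None
    #   ptype, registerer, backend, ups = manager.get_backend_provider_info("mybuild")
    #   # ptype == "build", registerer is a builder instance, backend == "mybuild"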

    def inspect(
        self,
        data_provider,
        mode="type",
        batch_size=10000,
        limit=None,
        sample=None,
        **kwargs,
    ):
        """
        Inspect given data provider:

        - backend definition (see bt.hub.databuild.create_backend for supported formats),
          e.g. "merged_collection" or ("src", "clinvar")
        - or a callable yielding documents

        Mode:

        - "type": inspect and report the type map found in the data (internal/non-standard format)
        - "mapping": inspect and return a map compatible with later ElasticSearch mapping generation
          (see bt.utils.es.generate_es_mapping)
        - "stats": inspect and report types + various counts found in the data, giving a detailed
          overview of the volumetry of each field and sub-field
        - "jsonschema": same as "type" but the result is formatted following the JSON Schema standard

        Other parameters:

        - limit: when set to an integer, inspect only that many documents
        - sample: combined with limit, for each document, the document is inspected only if
          random.random() <= sample (float). This allows inspecting just a sample of the data.
        """
        # /!\ attention: this piece of code is critical and not easy to understand...
        # Depending on the source of data to inspect, this method will create an
        # uploader or a builder. These objects don't behave the same way when they
        # go through pickle: an uploader needs to be unprepare()d so it can be
        # pickled (removing some db connections, sockets), while a builder must *not* be
        # unprepare()d because that would reset the underlying target_name (the actual
        # target collection). Also, the way results and statuses are registered differs
        # between uploaders and builders...
        # So, there are lots of "if"s; be careful if you want to modify this code.
        data_provider_type = None  # where to register results (if possible to do so)
        registerer_obj = None  # who should register results
        t0 = time.time()
        started_at = datetime.now().astimezone()
        self.logger.info("Inspecting data with mode %s and data_provider %s" % (repr(mode), repr(data_provider)))
        if callable(data_provider):
            raise NotImplementedError("data_provider as callable untested...")
        else:
            data_provider_type, registerer_obj, backend_provider, ups = self.get_backend_provider_info(data_provider)
            if data_provider_type == "source":
                if len(ups) > 1:
                    # recursively call inspect(), collect and return corresponding tasks
                    self.logger.debug("Multiple uploaders found, running inspector for each of them: %s" % ups)
                    res = []
                    for up in ups:
                        r = self.inspect(
                            (data_provider[0], "%s" % up.name),
                            mode=mode,
                            batch_size=batch_size,
                            limit=limit,
                            sample=sample,
                            **kwargs,
                        )
                        res.append(r)
                    return res
                assert len(ups) == 1, (
                    "More than one uploader found for '%s', "
                    "not supported (yet), use main_source.source notation" % data_provider[1]
                )

        got_error = None
        try:

            async def do():
                await asyncio.sleep(0.0)
                nonlocal mode
                pinfo = {
                    "category": INSPECTOR_CATEGORY,
                    "source": "%s" % repr(data_provider),
                    "step": "",
                    "description": "",
                }
                # register beginning of inspection (differs slightly depending on type)
                if data_provider_type == "source":
                    registerer_obj.register_status("inspecting", subkey="inspect")
                elif data_provider_type == "build":
                    registerer_obj.register_status("inspecting", transient=True, init=True, job={"step": "inspect"})
                self.logger.info(
                    "Running inspector on %s (type:%s,data_provider:%s)",
                    repr(data_provider),
                    data_provider_type,
                    backend_provider,
                )
                if sample is not None:
                    self.logger.info("Sample set to %s, inspect only a subset of data", sample)
                if limit is None:
                    self.logger.info("Inspecting all the documents")
                else:
                    nonlocal batch_size
                    # adjust batch_size so we inspect at most "limit" docs when the limit is smaller than a batch
                    if batch_size > limit:
                        batch_size = limit
                    self.logger.info("Inspecting only %s documents", limit)
                # make it pickleable
                if data_provider_type == "source":
                    # because registerer_obj is also used to fetch data, it has to be unprepare()d for pickling
                    registerer_obj.unprepare()
                else:
                    # NOTE: do not unprepare() the builder, we'd lose the target name
                    # (it would be randomly generated again) and we wouldn't be able to register results
                    pass
                cnt = 0
                doccnt = 0
                jobs = []
                # normalize mode param and prepare global results
                if type(mode) == str:
                    mode = [mode]
                converters, mode = btinspect.get_converters(mode)
                inspected = {}
                for m in mode:
                    inspected.setdefault(m, {})
                backend = create_backend(backend_provider).target_collection
                for ids in id_feeder(backend, batch_size=batch_size):
                    if sample is not None:
                        if random.random() > sample:
                            continue
                    cnt += 1
                    doccnt += batch_size
                    if limit and doccnt > limit:
                        break
                    pinfo["description"] = "batch #%s" % cnt

                    def batch_inspected(bnum, i, f):
                        nonlocal inspected
                        nonlocal got_error
                        nonlocal mode
                        try:
                            res = f.result()
                            for m in mode:
                                inspected[m] = btinspect.merge_record(inspected[m], res[m], m)
                        except Exception as e:
                            got_error = e
                            self.logger.error("Error while inspecting data from batch #%s: %s" % (bnum, e))
                            raise

                    # we want to generate an intermediate mapping so we can merge all maps
                    # later and then generate the ES mapping from there
                    pre_mapping = "mapping" in mode
                    self.logger.info("Creating inspect worker for batch #%s" % cnt)
                    job = await self.job_manager.defer_to_process(
                        pinfo,
                        partial(inspect_data, backend_provider, ids, mode=mode, pre_mapping=pre_mapping, **kwargs),
                    )
                    job.add_done_callback(partial(batch_inspected, cnt, ids))
                    jobs.append(job)

                await asyncio.gather(*jobs)

                # compute metadata (it was skipped before)
                for m in mode:
                    if m == "mapping":
                        try:
                            inspected["mapping"] = es.generate_es_mapping(inspected["mapping"])
                            # metadata for mapping only once generated
                            inspected = btinspect.compute_metadata(inspected, m)
                        except es.MappingError as e:
                            inspected["mapping"] = {"pre-mapping": inspected["mapping"], "errors": e.args[1]}
                    else:
                        inspected = btinspect.compute_metadata(inspected, m)
                # then apply potential converters
                btinspect.run_converters(inspected, converters)

                def fully_inspected(res):
                    nonlocal got_error
                    try:
                        res = btinspect.stringify_inspect_doc(res)
                        _map = {"results": res}
                        _map["data_provider"] = repr(data_provider)
                        _map["started_at"] = started_at
                        _map["duration"] = timesofar(t0)

                        # when inspecting with "stats" mode, we can get huge numbers but mongo
                        # can't store more than 2^64, so make sure to get rid of big nums there
                        def clean_big_nums(k, v):
                            # TODO: same with float/double? it seems mongo handles more there?
                            if isinstance(v, int) and v > 2**64:
                                return k, math.nan
                            else:
                                return k, v

                        dict_traverse(_map, clean_big_nums)
                        # register end of inspection (differs slightly depending on type)
                        if "mapping" in mode and "errors" in res["mapping"] and "pre-mapping" in res["mapping"]:
                            registerer_obj.register_status("failed", subkey="inspect", inspect=_map)
                            got_error = InspectorError(res["mapping"]["errors"])
                        else:
                            if data_provider_type == "source":
                                registerer_obj.register_status("success", subkey="inspect", inspect=_map)
                            elif data_provider_type == "build":
                                registerer_obj.register_status(
                                    "success", job={"step": "inspect"}, build={"inspect": _map}
                                )
                    except Exception as e:
                        self.logger.exception("Error while inspecting data: %s" % e)
                        got_error = e
                        if data_provider_type == "source":
                            registerer_obj.register_status("failed", subkey="inspect", err=repr(e))
                        elif data_provider_type == "build":
                            registerer_obj.register_status("failed", job={"err": repr(e)})

                fully_inspected(inspected)
                if data_provider_type is None:
                    return
                if got_error:
                    raise got_error

            task = asyncio.ensure_future(do())
            return task

        except Exception as e:
            self.logger.error("Error while inspecting '%s': %s" % (repr(data_provider), e))
            raise
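
    # Illustrative usage sketch (not part of the original module): inspect() schedules the
    # work on the hub's event loop and returns an asyncio task; results are not returned but
    # registered on the source (src_dump) or build (src_build) document. The way the manager
    # is built and the source name below are assumptions used only for illustration.
    #
    #   manager = InspectorManager(upload_manager, build_manager, job_manager=job_manager)
    #   task = manager.inspect(("src", "clinvar"), mode=["type", "mapping"], limit=10000)
    #   # once the task completes, the report can be read back through flatten() below, or
    #   # found under the source's "inspect.jobs.<subsource>.inspect.results" key in src_dump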

    def flatten(self, data_provider, mode=("type", "stats"), do_validate=True):
        if isinstance(mode, tuple):
            mode = list(mode)  # may not be necessary, but the previous mode default was a list, so let's be consistent
        elif isinstance(mode, str):
            mode = [mode]

        data_provider_type, registerer_obj, backend_provider, _ = self.get_backend_provider_info(data_provider)
        results = {}
        if data_provider_type == "source":
            src_sources_inspect_data = registerer_obj.src_doc.get("inspect", {}).get("jobs", {})
            for src_source_name, inspect_data in src_sources_inspect_data.items():
                results[src_source_name] = {
                    _mode: btinspect.flatten_and_validate(
                        inspect_data["inspect"]["results"].get(_mode) or {}, do_validate
                    )
                    for _mode in mode
                }
        else:
            inspect_data = registerer_obj.src_build.get("inspect", {}).get("results", {})
            results[backend_provider] = {
                _mode: btinspect.flatten_and_validate(inspect_data.get(_mode) or {}, do_validate) for _mode in mode
            }
        return results
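
# Illustrative usage sketch (not part of the original module): flatten() reads back the
# stored inspection reports and returns a nested dict keyed by (sub)source or build name,
# then by mode. The source and subsource names below are assumptions used only for illustration.
#
#   flat = manager.flatten(("src", "clinvar"), mode=("type", "stats"))
#   # e.g. {"clinvar.clinvar_hg38": {"type": {...flattened field/type map...},
#   #                                "stats": {...flattened field/stats map...}}}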