Source code for biothings.utils.inspect

"""
This module contains util functions that may be shared by both the BioThings data-hub and web components.
In general, do not include utils depending on any third-party modules.
Note: unittests available in biothings.tests.hub
"""
import copy
import enum
import logging
import math
import random
import re
import statistics
import time
from collections.abc import Iterable
from dataclasses import dataclass, field
from datetime import datetime
from pprint import pformat
from typing import Any, Dict, List

import bson

from biothings.utils.common import inf, is_scalar, is_str, nan, splitstr, timesofar
from biothings.utils.dataload import dict_walk
from biothings.utils.docs import flatten_doc


class BaseMode(object):

    # dict storing the actual specific values the mode deals with
    template = {}
    # key under which values are stored for this mode
    key = None

    def report(self, struct, drep, orig_struct=None):
        """
        Given a data structure "struct" being inspected, report (fill) the "drep"
        dictionary with useful values for this mode, under the drep[self.key] key.
        Sometimes "struct" has already been converted to its analytical value at
        this point (inspect() may count the number of keys in a dict, and would then
        pass struct as "1" instead of the whole dict, whose number of keys could
        then be reported); in that case "orig_struct" contains the original
        structure that was to be reported, whatever the pre-conversion step did.
        """
        raise NotImplementedError("Implement in sub-class")

    def merge(self, target, tomerge):
        """
        Merge two different maps together (from tomerge into target)
        """
        raise NotImplementedError("Implement in sub-class")

    def post(self, mapt, mode, clean):
        pass


class StatsMode(BaseMode):

    template = {"_stats": {"_min": math.inf, "_max": -math.inf, "_count": 0, "_none": 0}}
    key = "_stats"

    def sumiflist(self, val):
        if isinstance(val, list):
            return sum(val)
        else:
            return val

    def maxminiflist(self, val, func):
        if isinstance(val, list):
            return func(val)
        else:
            return val

    def flatten_stats(self, stats):
        # after merge_struct, stats can end up merged together as lists (merge_struct
        # is only about data structures). Re-adjust here, considering there could be
        # lists that need to be summed, and min/max that need to be dealt with
        stats["_count"] = self.sumiflist(stats["_count"])
        stats["_max"] = self.maxminiflist(stats["_max"], max)
        stats["_min"] = self.maxminiflist(stats["_min"], min)
        return stats

    def report(self, struct, drep, orig_struct=None):
        if is_str(struct) or isinstance(struct, (dict, list)):
            val = len(struct)
        else:
            val = struct
        drep[self.key]["_count"] += 1
        if val is None:
            drep[self.key]["_none"] += 1
        else:
            if val < drep[self.key]["_min"]:
                drep[self.key]["_min"] = val
            if val > drep[self.key]["_max"]:
                drep[self.key]["_max"] = val

    def merge(self, target_stats, tomerge_stats):
        target_stats = self.flatten_stats(target_stats)
        tomerge_stats = self.flatten_stats(tomerge_stats)
        # sum the counts
        target_stats["_count"] = target_stats["_count"] + tomerge_stats["_count"]
        # adjust min and max
        if tomerge_stats["_max"] > target_stats["_max"]:
            target_stats["_max"] = tomerge_stats["_max"]
        if tomerge_stats["_min"] < target_stats["_min"]:
            target_stats["_min"] = tomerge_stats["_min"]
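
# A minimal sketch of how StatsMode accumulates running stats (the values fed
# to report() below are made up for illustration):
#
#   >>> sm = StatsMode()
#   >>> drep = copy.deepcopy(sm.template)
#   >>> for v in (3, 7, None):
#   ...     sm.report(v, drep)
#   >>> drep
#   {'_stats': {'_min': 3, '_max': 7, '_count': 3, '_none': 1}}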


class DeepStatsMode(StatsMode):

    template = {"_stats": {"_min": math.inf, "_max": -math.inf, "_count": 0, "__vals": []}}
    key = "_stats"

    def merge(self, target_stats, tomerge_stats):
        super(DeepStatsMode, self).merge(target_stats, tomerge_stats)
        # extend values
        target_stats.get("__vals", []).extend(tomerge_stats.get("__vals", []))

    def report(self, val, drep, orig_struct=None):
        super(DeepStatsMode, self).report(val, drep, orig_struct)
        # keep track of vals for now, stats are computed at the end
        drep[self.key]["__vals"].append(val)

    def post(self, mapt, mode, clean):
        if isinstance(mapt, dict):
            for k in list(mapt.keys()):
                if is_str(k) and k.startswith("__"):
                    if k == "__vals" and mode == "deepstats":
                        if len(mapt["__vals"]) > 1:
                            mapt["_stdev"] = statistics.stdev(mapt["__vals"])
                            mapt["_median"] = statistics.median(mapt["__vals"])
                            mapt["_mean"] = statistics.mean(mapt["__vals"])
                    if clean:
                        mapt.pop(k)
                else:
                    self.post(mapt[k], mode, clean)
        elif isinstance(mapt, list):
            for e in mapt:
                self.post(e, mode, clean)
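
# A minimal sketch of DeepStatsMode: report() records raw values under "__vals",
# then post() derives stdev/median/mean from them (values are made up):
#
#   >>> dsm = DeepStatsMode()
#   >>> drep = copy.deepcopy(dsm.template)
#   >>> for v in (1, 2, 3):
#   ...     dsm.report(v, drep)
#   >>> dsm.post(drep, "deepstats", clean=True)
#   >>> drep["_stats"]["_mean"], drep["_stats"]["_stdev"]
#   (2, 1.0)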


class RegexMode(BaseMode):

    # list of {"re": "...", "info": ...}; if the regex matches, then the content
    # in "info" is used in the report
    matchers = []

    def __init__(self):
        # pre-compile patterns
        for d in self.matchers:
            d["_pat"] = re.compile(d["re"])
        assert self.__class__.key, "Define class attribute 'key' in sub-class"
        self.__class__.template = {self.__class__.key: []}

    def merge(self, target, tomerge):
        # structures are lists (see template), just extend while avoiding duplicates
        for e in tomerge:
            if e not in target:
                target.append(e)

    def report(self, val, drep, orig_struct=None):
        if orig_struct is not None:
            v = orig_struct
        else:
            v = val
        if is_scalar(v):
            sval = str(v)
            for dreg in self.matchers:
                if dreg["_pat"].match(sval):
                    for oneinfo in dreg["info"]:
                        if oneinfo not in drep.get(self.key, []):
                            drep.setdefault(self.key, []).append(oneinfo)


class IdentifiersMode(RegexMode):

    key = "_ident"
    # set this to a list of dicts coming from http://identifiers.org/rest/collections
    ids = None

    matchers = None

    def __init__(self):
        if self.__class__.matchers is None:  # not initialized
            self.__class__.matchers = []
            res = {}
            for ident in self.__class__.ids:
                res.setdefault(ident["pattern"], []).append(ident)
            for pat, info in res.items():
                self.__class__.matchers.append({"re": pat, "info": info})
        super().__init__()
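
# A minimal sketch of plugging identifiers.org-style records into
# IdentifiersMode (this "ids" payload is a made-up, simplified example):
#
#   >>> class MyIdentifiersMode(IdentifiersMode):
#   ...     ids = [{"pattern": r"^CHEBI:\d+$", "prefix": "chebi"}]
#   >>> mode = MyIdentifiersMode()
#   >>> drep = {}
#   >>> mode.report("CHEBI:17234", drep)
#   >>> drep
#   {'_ident': [{'pattern': '^CHEBI:\\d+$', 'prefix': 'chebi'}]}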


############################################################################

MODES_MAP = {"stats": StatsMode, "deepstats": DeepStatsMode, "identifiers": IdentifiersMode}


def get_mode_layer(mode):
    try:
        k = MODES_MAP[mode]
        return k()  # an instance is what's used
    except KeyError:
        return None


def merge_record(target, tomerge, mode):
    mode_inst = get_mode_layer(mode)
    for k in tomerge:
        if k in target:
            if mode_inst and mode_inst.key == k:
                tgt = target[mode_inst.key]
                tom = tomerge[mode_inst.key]
                mode_inst.merge(tgt, tom)
                continue
            if not isinstance(tomerge[k], Iterable):
                continue
            for typ in tomerge[k]:
                # if it's not an actual type, we need to merge further to reach the types
                if mode_inst is None and (type(typ) != type or typ == list):
                    target[k].setdefault(typ, {})
                    target[k][typ] = merge_record(target[k][typ], tomerge[k][typ], mode)
                else:
                    if mode == "type":
                        # we can safely update and possibly overwrite
                        # target with tomerge's values: in mode "type"
                        # there's no actual information for scalar fields
                        # (eg. a string field will look like: {"myfield": {str: {}}})
                        target[k].update(tomerge[k])
                    elif mode == "mapping":
                        # keep track of splittable strings (precedence: splittable > non-splittable),
                        # so don't merge if target has a "split" type and tomerge doesn't,
                        # as we would lose that information
                        if splitstr is typ:
                            target.pop(k)
                            target[k] = tomerge[k]
                    elif mode_inst:
                        if typ in target[k]:
                            # same key, same type, need to merge
                            if mode_inst.key not in tomerge[k][typ]:
                                # we're trying to merge the record at too high a level, need to merge deeper
                                target[k] = merge_record(target[k], tomerge[k], mode)
                                continue
                            tgt = target[k][typ][mode_inst.key]
                            tom = tomerge[k][typ][mode_inst.key]
                            mode_inst.merge(tgt, tom)
                        else:
                            target[k].setdefault(typ, {}).update(tomerge[k][typ])
                    else:
                        raise ValueError("Unknown mode '%s'" % mode)
        else:
            # key doesn't exist, create it
            if mode == "type":
                target.setdefault(k, {}).update(tomerge[k])
            else:
                if mode_inst:
                    # running a special mode, we just set the keys in target
                    target.setdefault(k, tomerge[k])
                else:
                    target.setdefault(k, {}).update(tomerge[k])
    # if we already have splitstr and we want to merge str, skip it
    # as splitstr > str
    if str in target and splitstr in target:
        target.pop(str)
    return target
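
# A minimal sketch of merging two per-document type maps, as produced by
# inspect() in "type" mode (the field names are made up):
#
#   >>> t1 = {"count": {int: {}}}
#   >>> t2 = {"count": {float: {}}, "name": {str: {}}}
#   >>> merge_record(t1, t2, "type")
#   {'count': {<class 'int'>: {}, <class 'float'>: {}}, 'name': {<class 'str'>: {}}}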


def inspect(struct, key=None, mapt=None, mode="type", level=0, logger=logging):
    """
    Explore struct and report types contained in it.

    Args:
        struct: is the data structure to explore
        mapt: if not None, will complete that type map with the passed struct. This is
            useful when iterating over a dataset of similar data, trying to find a good
            type summary of that dataset.
        level: is for internal purposes, mostly debugging
        mode: see inspect_docs() documentation
    """
    mode_inst = get_mode_layer(mode)
    # init recording structure if none was passed
    if mapt is None:
        mapt = {}
    if isinstance(struct, dict):
        # was this struct already explored before? was it a list for that previous doc?
        # then we have to pretend here it's also a list even if not, because we want to
        # report the list structure
        for k in struct:
            if mapt and list in mapt:  # and key == k:
                already_explored_as_list = True
            else:
                already_explored_as_list = False  # noqa F841
            if False:  # already_explored_as_list:  # TODO: check this
                mapt[list].setdefault(k, {})
                typ = inspect(struct[k], key=k, mapt=mapt[list][k], mode=mode, level=level + 1)
                mapt[list].update({k: typ})
            else:
                mapt.setdefault(k, {})
                typ = inspect(struct[k], key=k, mapt=mapt[k], mode=mode, level=level + 1)
        if mode_inst:
            mapt.setdefault(mode_inst.key, copy.deepcopy(mode_inst.template[mode_inst.key]))
            mode_inst.report(1, mapt, struct)
    elif type(struct) == list:
        mapl = {}
        for e in struct:
            typ = inspect(e, key=key, mapt=mapl, mode=mode, level=level + 1)
            mapl.update(typ)
        if mode_inst:
            # here we just report that one document had a list
            mapl.update(copy.deepcopy(mode_inst.template))
            mode_inst.report(struct, mapl)
        # if mapt exists, it means it's been explored previously, but not as a list;
        # instead of mixing dict and list types, we want to normalize, so we merge the
        # previous struct into that current list
        if mapt and list in mapt:
            mapt[list] = merge_record(mapt[list], mapl, mode)
        else:
            mapt.setdefault(list, {})
            mapt[list].update(mapl)
    elif is_scalar(struct) or isinstance(struct, datetime):
        typ = type(struct)
        if mode == "type":
            mapt[typ] = {}
        elif mode == "mapping":
            # some type precedence processing...
            # splittable string?
            if is_str(struct) and len(re.split(" +", struct.strip())) > 1:
                mapt[splitstr] = {}
            elif typ == bson.int64.Int64:
                mapt[int] = {}
            # we know struct is a scalar. NaN and Inf can't be indexed on ES,
            # need to catch those
            elif isinstance(struct, float) and math.isnan(struct):
                mapt[nan] = {}
            elif isinstance(struct, float) and math.isinf(struct):
                mapt[inf] = {}
            else:
                mapt[typ] = {}
            # splitstr > str
            if str in mapt and splitstr in mapt:
                mapt.pop(str)
            # float > int
            # TODO: could this be moved to es.generate_es_mapping?
            if int in mapt and float in mapt:
                mapt.pop(int)
        else:
            mapt.setdefault(typ, copy.deepcopy(mode_inst.template))
            mode_inst.report(struct, mapt[typ])
    else:
        raise TypeError("Can't analyze type %s (data was: %s)" % (type(struct), struct))
    return mapt
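
# A minimal sketch of inspect() called over two hypothetical documents,
# re-passing the same "mapt" so the type map accumulates:
#
#   >>> m = inspect({"name": "alpha", "values": [1, 2]}, mode="type")
#   >>> m = inspect({"name": "beta", "values": [3]}, mapt=m, mode="type")
#   >>> m
#   {'name': {<class 'str'>: {}}, 'values': {<class 'list'>: {<class 'int'>: {}}}}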


def merge_scalar_list(mapt, mode):
    # TODO: this looks "strangely" similar to merge_record... refactoring needed?
    # if a list is found and other keys at the same level are found in that
    # list, then we need to merge. Ex: ..., {"bla": 1}, [{"bla": 2}], ...
    mode_inst = get_mode_layer(mode)
    if "stats" in mode:
        raise NotImplementedError("merging with stats is not supported (yet)")
    if is_scalar(mapt):
        return
    if list in mapt.keys():
        other_keys = [k for k in mapt if k != list]
        for e in other_keys:
            if e in mapt[list]:
                tomerge = mapt.pop(e)
                if mode_inst:
                    for typ in tomerge:
                        if not type(typ) == type:
                            continue
                        if typ not in mapt[list][e]:
                            mapt[list][e][typ] = tomerge[typ]
                        # Note: don't update [list]["_stats"] (or other modes' key), we keep
                        # the original stats, that is, what's actually been inspected on the
                        # list originally (and we can't really update those stats, as scalar
                        # stats aren't relevant in a list context)
                        elif typ == mode_inst.key:
                            mode_inst.merge(mapt[list][e][mode_inst.key], tomerge[mode_inst.key])
                        else:
                            mode_inst.merge(mapt[list][e][typ][mode_inst.key], tomerge[typ][mode_inst.key])
                elif mode == "mapping":
                    for typ in tomerge:
                        if typ is str and splitstr in mapt[list][e]:
                            # precedence splitstr > str, we keep splitstr and ignore str
                            continue
                        if typ not in mapt[list][e]:
                            # that field exists in the [list] but with a different type,
                            # just merge the type
                            mapt[list][e].update(tomerge)
                            # precedence splitstr > str
                            if splitstr is typ:
                                mapt[list][e].pop(str, None)
                                mapt[list][e].update(tomerge)
                else:
                    # assuming what's in [list] is enough, we just popped the value
                    # from mapt, that's enough
                    pass
        # explore further
        merge_scalar_list(mapt[list], mode)
    elif isinstance(mapt, dict):
        for k in mapt:
            merge_scalar_list(mapt[k], mode)
    elif isinstance(mapt, list):
        for e in mapt:
            merge_scalar_list(e, mode)
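
# A minimal sketch: a field seen both as a scalar and inside a list gets
# folded into the list entry (the field name is made up):
#
#   >>> m = {list: {"val": {int: {}}}, "val": {float: {}}}
#   >>> merge_scalar_list(m, "mapping")
#   >>> m
#   {<class 'list'>: {'val': {<class 'int'>: {}, <class 'float'>: {}}}}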


def get_converters(modes, logger=logging):
    converters = []
    # should we actually run another mode and then convert the results?
    if "jsonschema" in modes:
        from biothings.utils.jsonschema import generate_json_schema

        # first get the schema with mode="type", then convert the results.
        # Note "type" can't also be specified, as jsonschema would replace
        # the results in the _map["type"] key
        converters.append(
            {
                "output_mode": "jsonschema",
                "input_mode": "type",
                "func": generate_json_schema,
                "delete_input_mode": "type" not in modes,
            }
        )
        modes.remove("jsonschema")
        if "type" not in modes:
            modes.append("type")
    return converters, modes


def run_converters(_map, converters, logger=logging):
    # need to convert some results?
    for converter in converters:
        logger.info(
            "Finalizing result for mode '%s' using converter %s",
            converter["output_mode"],
            converter,
        )
        converted = converter["func"](_map[converter["input_mode"]])
        _map[converter["output_mode"]] = converted
        if converter["delete_input_mode"]:
            _map.pop(converter["input_mode"])


def inspect_docs(
    docs,
    mode="type",
    clean=True,
    merge=False,
    logger=logging,
    pre_mapping=False,
    limit=None,
    sample=None,
    metadata=True,
    auto_convert=True,
):
    """Inspect docs and return a summary of their structure.

    Args:
        mode: possible values are:

            - "type": (default) explore documents and report strict data structure
            - "mapping": same as "type" but also perform tests on the data to guess the
              best mapping (eg. check if a string is splittable, etc...). Implies merge=True
            - "stats": explore documents and compute basic stats (count, min, max, sum)
            - "deepstats": same as "stats" but record values and also compute mean, stdev,
              median (memory intensive...)
            - "jsonschema": same as "type" but returns a JSON schema formatted result

            `mode` can also be a list of modes, eg. ["type", "mapping"]. There's little
            overhead computing multiple modes, as most time is spent on actually getting the data.
        clean: don't delete recorded values or temporary results
        merge: merge scalars into lists when both exist (eg. {"val": ...} and [{"val": ...}])
        limit: can limit the inspection to the x first docs (None = no limit, inspects all)
        sample: in combination with limit, randomly extract a sample of 'limit' docs
            (so not necessarily the x first ones defined by limit). If random.random()
            is greater than sample, the doc is inspected, otherwise it's skipped
        metadata: compute metadata on the result
        auto_convert: run converters automatically (converters are used to convert one
            mode's output to another mode's output, eg. "type" to "jsonschema")
    """
    if isinstance(mode, str):
        modes = [mode]
    else:
        modes = mode
    if auto_convert:
        converters, modes = get_converters(modes, logger=logger)
    _map = {}
    for m in modes:
        _map[m] = {}
    cnt = 0
    errors = set()
    t0 = time.time()
    innert0 = time.time()

    if sample is not None:
        assert limit, "Parameter 'sample' requires 'limit' to be defined"
        assert sample != 1, "Sample value 1 not allowed (no documents would be inspected)"
    if limit:
        limit = int(limit)
        logger.debug("Limiting inspection to the %s first documents", limit)
    for doc in docs:
        if sample is not None:
            if random.random() <= sample:
                continue
        for m in modes:
            try:
                inspect(doc, mapt=_map[m], mode=m)
            except Exception as e:
                logging.exception(
                    "Can't inspect document (_id: %s) because: %s\ndoc: %s",
                    doc.get("_id"),
                    e,
                    pformat(doc),
                )
                errors.add(str(e))
        cnt += 1
        if cnt % 10000 == 0:
            logger.info("%d documents processed [%s]", cnt, timesofar(innert0))
            innert0 = time.time()
        if limit and cnt > limit:
            logger.debug("done")
            break
    logger.info("Done [%s]", timesofar(t0))
    logger.info("Post-processing")

    # post-process, specific to each mode
    for m in modes:
        mode_inst = get_mode_layer(m)
        if mode_inst:
            mode_inst.post(_map[m], m, clean)

    if auto_convert:
        run_converters(_map, converters, logger=logger)

    merge = "mapping" in modes and True or merge
    if merge:
        merge_scalar_list(_map["mapping"], "mapping")
    if "mapping" in modes and pre_mapping is False:
        # directly generate ES mapping
        import biothings.utils.es as es

        try:
            _map["mapping"] = es.generate_es_mapping(_map["mapping"])
            if metadata:
                # compute some extra metadata
                _map = compute_metadata(_map, "mapping")
        except es.MappingError as e:
            prem = {"pre-mapping": _map["mapping"], "errors": e.args[1]}
            _map["mapping"] = prem
    elif errors:
        _map["errors"] = errors
    return _map
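
# A minimal sketch of inspect_docs() on two hypothetical documents, using a
# single "type" mode:
#
#   >>> docs = [{"_id": "1", "score": 3}, {"_id": "2", "score": 7.5}]
#   >>> res = inspect_docs(docs, mode="type")
#   >>> res["type"]["score"]
#   {<class 'int'>: {}, <class 'float'>: {}}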


def compute_metadata(mapt, mode):
    if mode == "mapping":
        flat = flatten_doc(mapt["mapping"])
        # total fields: ES6 requires overriding the default 1000-field limit if needed
        mapt["__metadata__"] = {"total_fields": len(flat)}
    return mapt


def typify_inspect_doc(dmap):
    """
    dmap is an inspect doc that was converted to be stored in a database, namely
    the actual python types were stringified to be storable. This function does
    the opposite and restores the python types within the inspect doc.
    """

    def typify(val):
        if type(val) != type and val.startswith("__type__:"):
            typ = val.replace("__type__:", "")
            # special cases
            if typ == "NoneType":
                return None
            elif typ == "Int64":
                # bson's Int64
                return bson.int64.Int64
            else:
                return eval(val.replace("__type__:", ""))
        else:
            return val

    return dict_walk(dmap, typify)


def stringify_inspect_doc(dmap):
    def stringify(val):
        if type(val) == type:
            # prevent having dots in the field name (not storable in mongo)
            return "__type__:%s" % val.__name__
        else:
            return str(val)

    return dict_walk(dmap, stringify)
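
# A minimal sketch of the stringify/typify round-trip used when storing an
# inspect doc in a database (assuming dict_walk rewrites the dict keys):
#
#   >>> smap = stringify_inspect_doc({"score": {int: {}}})
#   >>> smap
#   {'score': {'__type__:int': {}}}
#   >>> typify_inspect_doc(smap)
#   {'score': {<class 'int'>: {}}}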


@dataclass
class FieldInspection:
    field_name: str
    field_type: str
    stats: dict = None
    warnings: list = field(default_factory=list)


@dataclass
class FieldInspectValidation:
    warnings: set = field(default_factory=set)
    types: set = field(default_factory=set)
    has_multiple_types: bool = False


def flatten_inspection_data(
    # data: dict[str, any],  # this only works for Python 3.9+
    data: Dict[str, Any],
    current_deep: int = 0,
    parent_name: str = None,
    parent_type: str = None,
) -> List[FieldInspection]:
    # for py3.9+, we could use list[FieldInspection] directly without importing List
    """This function converts the multi-depth nested inspection data into a flat list.
    Nested keys are appended to the parent key, separated with a dot.
    """
    ROOT_FIELD = "__root__"
    STATS_FIELD = "_stats"
    DICT_TYPE = "dict"
    LIST_TYPE = "list"
    TYPE_PREFIX = "__type__:"

    field_inspections = []
    if current_deep == 0:
        field_inspections.append(
            FieldInspection(
                field_name=ROOT_FIELD,
                field_type=DICT_TYPE,
                stats=data.get(STATS_FIELD),
            )
        )

    for field_name in data:
        if field_name == STATS_FIELD:
            continue
        if not field_name.startswith(TYPE_PREFIX):
            parent_type = DICT_TYPE
            new_parent_name = field_name
            if parent_name:
                new_parent_name = parent_name + "." + field_name
            sub_field_inspections = flatten_inspection_data(
                data[field_name], current_deep + 1, new_parent_name, parent_type
            )
            field_inspections += sub_field_inspections
            continue

        field_type = field_name.replace(TYPE_PREFIX, "")
        if field_type == LIST_TYPE:
            parent_type = LIST_TYPE
            sub_field_inspections = flatten_inspection_data(
                data[field_name], current_deep + 1, parent_name, parent_type
            )
            if len(sub_field_inspections) == 1 and sub_field_inspections[0].field_name == parent_name:
                field_inspections.append(
                    FieldInspection(
                        field_name=parent_name,
                        field_type=f"{parent_type} of {sub_field_inspections[0].field_type}",
                        stats=data[field_name].get(STATS_FIELD),
                    )
                )
            else:
                field_inspections.append(
                    FieldInspection(
                        field_name=parent_name,
                        field_type=parent_type,
                        stats=data[field_name].get(STATS_FIELD),
                    )
                )
                field_inspections += sub_field_inspections
        else:
            field_inspections.append(
                FieldInspection(
                    field_name=parent_name,
                    field_type=field_type,
                    stats=data[field_name].get(STATS_FIELD),
                )
            )
    return field_inspections
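
# A minimal sketch of flattening a (stringified) inspection doc; the input
# below is a made-up example:
#
#   >>> data = {"name": {"__type__:str": {"_stats": {"_count": 2}}}}
#   >>> [(f.field_name, f.field_type) for f in flatten_inspection_data(data)]
#   [('__root__', 'dict'), ('name', 'str')]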


class InspectionValidation:
    """
    This class provides a mechanism to validate and flag any field which:

    - contains whitespace
    - contains upper-cased letters or special characters
      (lower-case is recommended; in some cases upper-cased field names are acceptable,
      so we raise it as a warning and let the user confirm it's necessary)
    - has more than one type detected by the inspection
      (but a mix of a single value and an array of values of the same type is acceptable,
      and the case of mixed integer and float is acceptable too)

    Usage:
    ```
    result = InspectionValidation.validate(data)
    ```

    Adding more rules:
    - add a new code and message to the Warning enum
    - add a new staticmethod validating the new rule, named in the format: `validate_{warning_code}`
    - add the new rule to this docstring
    """

    class Warning(enum.Enum):
        W001 = "field name contains whitespace."
        W002 = "field name contains uppercase."
        W003 = "field name contains special character. Only alphanumeric, dot, or underscore are valid."
        W004 = "field name has more than one type."

        def to_dict(self):
            return {"code": self.name, "message": self.value}

    SPACE_PATTERN = " "
    INVALID_CHARACTERS_PATTERN = r"[^a-zA-Z0-9_.]"
    NUMERIC_FIELDS = ["int", "float"]

    @staticmethod
    def validate(data: List[FieldInspection]) -> Dict[str, FieldInspectValidation]:
        field_validations: Dict[str, FieldInspectValidation] = {}
        for field_inspection in data:
            field_name = field_inspection.field_name
            type = field_inspection.field_type
            if field_name not in field_validations:
                field_validations[field_name] = FieldInspectValidation()
            field_validations[field_name].types.add(type)

            for inspection_warning in InspectionValidation.Warning:
                if inspection_warning in field_validations[field_name].warnings:
                    continue
                validate_method = getattr(InspectionValidation, f"validate_{inspection_warning.name}", None)
                if not validate_method:
                    continue
                if validate_method(field_inspection, field_validations[field_name]):
                    continue
                field_validations[field_name].warnings.add(inspection_warning)
        return field_validations

    @staticmethod
    def validate_W001(
        field_inspection: FieldInspection,
        field_validation: FieldInspectValidation,
    ) -> bool:
        if re.search(InspectionValidation.SPACE_PATTERN, field_inspection.field_name):
            return False
        return True

    @staticmethod
    def validate_W002(
        field_inspection: FieldInspection,
        field_validation: FieldInspectValidation,
    ) -> bool:
        return field_inspection.field_name == field_inspection.field_name.lower()

    @staticmethod
    def validate_W003(
        field_inspection: FieldInspection,
        field_validation: FieldInspectValidation,
    ) -> bool:
        return not re.search(InspectionValidation.INVALID_CHARACTERS_PATTERN, field_inspection.field_name)

    @staticmethod
    def validate_W004(
        field_inspection: FieldInspection,
        field_validation: FieldInspectValidation,
    ) -> bool:
        is_valid = True
        for existing_type in field_validation.types:
            normalized_type = field_inspection.field_type.replace("list of ", "")
            normalized_existing_type = existing_type.replace("list of ", "")
            if normalized_type == normalized_existing_type:
                continue
            if (
                normalized_type in InspectionValidation.NUMERIC_FIELDS
                and normalized_existing_type in InspectionValidation.NUMERIC_FIELDS
            ):
                continue
            is_valid = False
            field_validation.has_multiple_types = True
            break
        return is_valid


def merge_field_inspections_validations(
    field_inspections: List[FieldInspection],
    field_validations: Dict[str, FieldInspectValidation],
):
    """Add any warnings from field_validations to the field_inspections with the corresponding field name"""
    for field_inspection in field_inspections:
        field_name = field_inspection.field_name
        # default to an empty validation so a missing field name doesn't break the lookup below
        field_validation = field_validations.get(field_name, FieldInspectValidation())
        field_inspection.warnings = sorted(
            [warning.to_dict() for warning in field_validation.warnings],
            key=lambda warning: warning["code"],
        )


def simplify_inspection_data(field_inspections: List[FieldInspection]) -> List[Dict[str, Any]]:
    return [vars(field_inspection) for field_inspection in field_inspections]


def flatten_and_validate(data, do_validate=True):
    flattened_data = flatten_inspection_data(data)
    if do_validate:
        validated_data = InspectionValidation.validate(flattened_data)
        merge_field_inspections_validations(flattened_data, validated_data)
    return simplify_inspection_data(flattened_data)
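
# A minimal sketch of the full flatten-and-validate pipeline; "Gene Name" is a
# made-up field chosen to trigger several warnings (space, uppercase, special char):
#
#   >>> out = flatten_and_validate({"Gene Name": {"__type__:str": {}}})
#   >>> out[1]["field_name"], [w["code"] for w in out[1]["warnings"]]
#   ('Gene Name', ['W001', 'W002', 'W003'])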