Source code for biothings.hub.datarelease.releasenote

# from __future__ import annotations  # for cyclic type hints
# Use forward references for compatibility with Python 3.6
# ref https://peps.python.org/pep-0484/#forward-references

import locale
from datetime import datetime

from dateutil.parser import parse as dtparse

from biothings.utils.dataload import update_dict_recur
from biothings.utils.hub_db import get_source_fullname
from biothings.utils.jsondiff import make as make_json_diff

locale.setlocale(locale.LC_ALL, "")


class ReleaseNoteSrcBuildReader:
    # TODO shall we use biothings.hub.dataindex.indexer._Build_doc here?

    def __init__(self, src_build_doc: dict):
        self.src_build_doc = src_build_doc

        # If `self` is a "hot" src_build doc reader, it can refer to a "cold" reader to access the cold build info.
        # This works like a two-node linked list.
        self.cold_src_build_reader: "ReleaseNoteSrcBuildReader" = None

    @property
    def build_id(self) -> str:
        return self.src_build_doc["_id"]

    @property
    def build_version(self) -> str:
        return self.src_build_doc.get("_meta", {}).get("build_version")

    @property
    def cold_collection_name(self) -> str:
        return self.src_build_doc.get("build_config", {}).get("cold_collection", None)

    def has_cold_collection(self) -> bool:
        return self.cold_collection_name is not None

    def attach_cold_src_build_reader(self, other: "ReleaseNoteSrcBuildReader"):
        """
        Attach a cold src_build reader.

        It's required that `self` is a hot src_build reader and `other` is cold.
        """
        if not self.has_cold_collection():
            raise ValueError(
                f"{self.build_id} is not a hot src_build doc, "
                f"thus not able to attach a cold reader of {other.build_id}."
            )

        if other.has_cold_collection():
            raise ValueError(
                f"{other.build_id} is a hot src_build doc, "
                f"thus not able to be attached to the reader of {self.build_id}"
            )

        # src_build `_id`s and collection names are interchangeable
        # See https://github.com/biothings/biothings.api/blob/master/biothings/hub/databuild/builder.py#L311
        if self.cold_collection_name != other.build_id:
            raise ValueError(
                f"{self.build_id} has cold collection {self.cold_collection_name}, "
                f"while the reader to be attached is for {other.build_id}"
            )

        self.cold_src_build_reader = other

    @property
    def build_stats(self) -> dict:
        meta = self.src_build_doc.get("_meta", {})
        return meta.get("stats", {})

    def _get_datasource_stats(self) -> dict:
        return self.src_build_doc.get("merge_stats", {})

    def _get_datasource_versions(self) -> dict:
        meta = self.src_build_doc.get("_meta", {})

        # previous version format
        if "src_version" in meta:
            return meta["src_version"]

        # current version format
        src = meta.get("src", {})
        src_version = {src_name: src_info["version"] for src_name, src_info in src.items() if "version" in src_info}
        return src_version

    def _get_datasource_mapping(self) -> dict:
        return self.src_build_doc.get("mapping", {})

    @property
    def datasource_stats(self) -> dict:
        if not self.has_cold_collection():
            return self._get_datasource_stats()

        combined_stats = {
            **self._get_datasource_stats(),
            **self.cold_src_build_reader._get_datasource_stats(),
        }
        return combined_stats

    @property
    def datasource_versions(self) -> dict:
        if not self.has_cold_collection():
            return self._get_datasource_versions()

        combined_versions = {
            **self._get_datasource_versions(),
            **self.cold_src_build_reader._get_datasource_versions(),
        }
        return combined_versions

    @property
    def datasource_mapping(self) -> dict:
        if not self.has_cold_collection():
            return self._get_datasource_mapping()

        combined_mapping = {
            **self._get_datasource_mapping(),
            **self.cold_src_build_reader._get_datasource_mapping(),
        }
        return combined_mapping
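

# Illustrative sketch, not part of the original module: a minimal example of how a
# hot src_build reader can be paired with a cold one. The two dicts below are
# hypothetical, stripped-down src_build documents; real ones come from the hub's
# src_build collection and carry many more fields.
def _example_hot_cold_readers():  # pragma: no cover
    hot_doc = {
        "_id": "mybuild_hot",
        "_meta": {"build_version": "20230101", "src": {"dbsnp": {"version": "155"}}},
        "build_config": {"cold_collection": "mybuild_cold"},
        "merge_stats": {"dbsnp": 100},
    }
    cold_doc = {
        "_id": "mybuild_cold",
        "_meta": {"build_version": "20220101", "src": {"cosmic": {"version": "68"}}},
        "merge_stats": {"cosmic": 50},
    }

    hot_reader = ReleaseNoteSrcBuildReader(hot_doc)
    cold_reader = ReleaseNoteSrcBuildReader(cold_doc)
    hot_reader.attach_cold_src_build_reader(cold_reader)

    # Stats and versions are merged across the hot/cold pair:
    # {"dbsnp": 100, "cosmic": 50} and {"dbsnp": "155", "cosmic": "68"}
    return hot_reader.datasource_stats, hot_reader.datasource_versions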


class ReleaseNoteSrcBuildReaderAdapter:
    def __init__(self, src_build_reader: ReleaseNoteSrcBuildReader):
        self.src_build_reader = src_build_reader

    @classmethod
    def _get_datasource_fullname_stats(cls, datasource_stats) -> dict:
        """
        Receive a stat dictionary of <datasource_name>:<doc_count>, fetch the full datasource name,
        and return a new stat dictionary.

        If a full datasource name is two-tier (e.g. "gnomad.gnomad_exomes_hg19", as the full datasource name
        of "gnomad_exomes_hg19"), the returned dictionary is formed as:

            {
                <main_datasource_name>: {
                    <sub_datasource_name>: {"_count": <doc_count>}
                }
            }

        e.g.

            {
                "gnomad": {
                    "gnomad_exomes_hg19": {"_count": 12345678}
                }
            }

        If a full datasource name is one-tier:

            CASE 1: the full datasource name is identical to the input datasource name,
                e.g. "cosmic" is the full name of "cosmic";
            CASE 2: the full datasource name is None,
                e.g. when the input datasource name is "observed" or "total" in MyVariant, or "total_*" in MyGene.
                In this case, the input datasource name is not a merge stat from a source but a custom field stat.

        the returned stats dictionary has the following structure:

            {<datasource_name>: {"_count": <doc_count>}}

        e.g.

            {"cosmic": {"_count": 12345678}}
            {"total": {"_count": 12345678}}
        """
        result = {}
        for datasource_name, doc_count in datasource_stats.items():
            datasource_fullname = get_source_fullname(datasource_name)

            if (datasource_fullname is None) or (datasource_fullname == datasource_name):  # one-tier fullname
                result[datasource_name] = {"_count": doc_count}
            else:  # two-tier fullname
                main_name, sub_name = datasource_fullname.split(".")
                result.setdefault(main_name, {})
                result[main_name][sub_name] = {"_count": doc_count}

        return result

    @classmethod
    def _expand_datasource_versions(cls, datasource_versions) -> dict:
        """
        Receive a version dictionary of <datasource_name>:<version> (e.g. {"dbsnp": "155"}),
        and rewrite it to {<datasource_name>: {"_version": <version>}}.
        """
        return dict((k, {"_version": v}) for k, v in datasource_versions.items())

    @property
    def datasource_info(self):
        datasource_versions = self._expand_datasource_versions(self.src_build_reader.datasource_versions)
        datasource_stats = self._get_datasource_fullname_stats(self.src_build_reader.datasource_stats)

        datasource_info = update_dict_recur(datasource_versions, datasource_stats)
        return datasource_info

    @property
    def build_stats(self):
        # TODO this is the original logic, however I don't think it's necessary to apply get_source_fullname() to
        #  `build_stats.keys()`. E.g. in MyVariant, the `build_stats` keys are "total", "vcf", "hg19", and "observed",
        #  none of which has a two-tier full name. So the only effect of _get_datasource_fullname_stats() is to add a
        #  "_count" key to each of them.
        return self._get_datasource_fullname_stats(self.src_build_reader.build_stats)
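

# Illustrative sketch, not part of the original module: the shapes produced by the
# adapter's helpers. Only _expand_datasource_versions() is exercised directly here,
# since _get_datasource_fullname_stats() calls get_source_fullname(), which needs a
# live hub source registry; the two-tier example in the comment is an assumption
# about what that registry could return.
def _example_adapter_shapes():  # pragma: no cover
    # {"dbsnp": "155"} -> {"dbsnp": {"_version": "155"}}
    versions = ReleaseNoteSrcBuildReaderAdapter._expand_datasource_versions({"dbsnp": "155"})

    # _get_datasource_fullname_stats({"cosmic": 50, "gnomad_exomes_hg19": 100}) would yield
    # something like:
    #     {"cosmic": {"_count": 50}, "gnomad": {"gnomad_exomes_hg19": {"_count": 100}}}
    # assuming the registry maps "gnomad_exomes_hg19" to "gnomad.gnomad_exomes_hg19".
    return versions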


class ReleaseNoteSource:
    def __init__(
        self,
        old_src_build_reader: ReleaseNoteSrcBuildReader,
        new_src_build_reader: ReleaseNoteSrcBuildReader,
        diff_stats_from_metadata_file: dict,
        addon_note: str,
    ):
        self.old_src_build_reader = old_src_build_reader
        self.new_src_build_reader = new_src_build_reader
        self.old_src_build_reader_adapter = ReleaseNoteSrcBuildReaderAdapter(self.old_src_build_reader)
        self.new_src_build_reader_adapter = ReleaseNoteSrcBuildReaderAdapter(self.new_src_build_reader)

        self.diff_stats_from_metadata_file = diff_stats_from_metadata_file
        self.addon_note = addon_note

    @classmethod
    def _make_stats_diff(cls, old: dict, new: dict):
        result = {
            "added": {},
            "deleted": {},
            "updated": {},
        }

        diff = make_json_diff(old, new)
        for item in diff:
            # get main source / main field
            key = item["path"].strip("/").split("/")[0]

            if item["op"] == "add":
                result["added"][key] = new[key]
            elif item["op"] == "remove":
                result["deleted"][key] = old[key]
            elif item["op"] == "replace":
                result["updated"][key] = {"new": new[key], "old": old[key]}
            else:
                raise ValueError("Unknown operation '%s' while computing changes" % item["op"])

        return result

    @classmethod
    def _make_mapping_diff(cls, old: dict, new: dict):
        def mapping_path_to_field_name(path: str) -> str:
            """
            Convert a JSON-Pointer path in a mapping JSON to a field name.
            E.g. path "/dbnsfp/properties/altai_neandertal" => field name "dbnsfp.altai_neandertal".

            Note that "properties" should not be included as part of a field name.
            The strategy here is to iterate over the path components and remove any "properties"
            found at odd indices (1, 3, 5, ...).
            """
            path_components = path.strip("/").split("/")
            path_components = [
                path_components[i]
                for i in range(len(path_components))
                if (i % 2 == 0) or (i % 2 == 1 and path_components[i] != "properties")
            ]
            return ".".join(path_components)

        fields = {}
        diff = make_json_diff(old, new)
        for item in diff:
            if item["op"] in ("add", "remove", "replace"):
                field_name = mapping_path_to_field_name(item["path"])
                fields.setdefault(item["op"], []).append(field_name)
            elif item["op"] == "move":
                add_field_name = mapping_path_to_field_name(item["path"])
                remove_field_name = mapping_path_to_field_name(item["from"])

                fields.setdefault("add", []).append(add_field_name)
                fields.setdefault("remove", []).append(remove_field_name)
            else:
                raise ValueError("Unknown operation '%s' while computing changes" % item["op"])

        return fields
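
    # Illustrative note, not part of the original module: _make_stats_diff() returns a
    # dict keyed by the top-level source/stat name, e.g.
    #     {"added": {"x": ...}, "deleted": {"y": ...}, "updated": {"z": {"new": ..., "old": ...}}}
    # while _make_mapping_diff() groups dotted field names by JSON-diff operation, e.g.
    #     {"add": ["dbnsfp.altai_neandertal"], "remove": [...], "replace": [...]}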

    def diff_build_stats(self) -> dict:
        # Read from the reader adapters, not the readers directly
        old_stats = self.old_src_build_reader_adapter.build_stats
        new_stats = self.new_src_build_reader_adapter.build_stats
        return self._make_stats_diff(old_stats, new_stats)

    def diff_datasource_info(self) -> dict:
        old_info = self.old_src_build_reader_adapter.datasource_info
        new_info = self.new_src_build_reader_adapter.datasource_info
        return self._make_stats_diff(old_info, new_info)

    def diff_datasource_mapping(self) -> dict:
        new_mapping = self.new_src_build_reader.datasource_mapping
        if not new_mapping:
            raise ValueError(f"New Mapping cannot be empty. Build id: {self.new_src_build_reader.build_id}")

        old_mapping = self.old_src_build_reader.datasource_mapping
        return self._make_mapping_diff(old_mapping, new_mapping)

    def to_dict(self) -> dict:
        result = {
            "old": {
                "_version": self.old_src_build_reader.build_version,
                "_count": self.old_src_build_reader.build_stats.get("total"),
            },
            "new": {
                "_version": self.new_src_build_reader.build_version,
                "_count": self.new_src_build_reader.build_stats.get("total"),
                "_fields": self.diff_datasource_mapping(),
                "_summary": self.diff_stats_from_metadata_file,
            },
            "stats": self.diff_build_stats(),
            "sources": self.diff_datasource_info(),
            "note": self.addon_note,
            "generated_on": str(datetime.now().astimezone()),
        }
        return result
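

# Illustrative sketch, not part of the original module: how a ReleaseNoteSource is
# typically assembled. `old_doc`, `new_doc` and `diff_summary` are hypothetical
# placeholders for the previous/new src_build documents and the summary section of a
# diff metadata file; to_dict() is not exercised here because diffing datasource info
# goes through get_source_fullname(), which needs a live hub.
#
#     old_reader = ReleaseNoteSrcBuildReader(old_doc)
#     new_reader = ReleaseNoteSrcBuildReader(new_doc)
#     source = ReleaseNoteSource(
#         old_src_build_reader=old_reader,
#         new_src_build_reader=new_reader,
#         diff_stats_from_metadata_file=diff_summary,
#         addon_note="Any extra note to show in the release note",
#     )
#     changes = source.to_dict()  # keys: "old", "new", "stats", "sources", "note", "generated_on"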


class ReleaseNoteTxt(object):
    def __init__(self, source: ReleaseNoteSource):
        self.source = source  # member kept for debugging
        self.changes = source.to_dict()

    @classmethod
    def _format_number(cls, num, sign=None):
        try:
            sign_symbol = ""
            if sign:
                if num > 0:
                    sign_symbol = "+"
                elif num < 0:
                    sign_symbol = "-"

            num_str = locale.format_string("%d", abs(num), grouping=True)
            return "%s%s" % (sign_symbol, num_str)
        except TypeError:
            # something wrong with converting, maybe we don't even have a number to format...
            return "N.A"
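
    # Illustrative note, not part of the original module: with an "en_US"-style locale,
    # _format_number(1234567) returns "1,234,567" and _format_number(-1234567, sign=True)
    # returns "-1,234,567"; non-numeric input (e.g. None) falls back to "N.A". The exact
    # grouping depends on the locale selected by locale.setlocale(locale.LC_ALL, "") above.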

    def save(self, filepath):
        try:
            import prettytable
        except ImportError:
            raise ImportError("Please install prettytable to use this renderer")

        txt = ""
        title = "Build version: '%s'" % self.changes["new"]["_version"]
        txt += title + "\n"
        txt += "".join(["="] * len(title)) + "\n"

        dt = dtparse(self.changes["generated_on"])
        txt += "Previous build version: '%s'\n" % self.changes["old"]["_version"]
        txt += "Generated on: %s\n" % dt.strftime("%Y-%m-%d at %H:%M:%S")
        txt += "\n"

        table = prettytable.PrettyTable(
            ["Updated datasource", "prev. release", "new release", "prev. # of docs", "new # of docs"]
        )
        table.align["Updated datasource"] = "l"
        table.align["prev. release"] = "c"
        table.align["new release"] = "c"
        table.align["prev. # of docs"] = "r"
        table.align["new # of docs"] = "r"

        for src, info in sorted(self.changes["sources"]["added"].items(), key=lambda e: e[0]):
            main_info = dict([(k, v) for k, v in info.items() if k.startswith("_")])
            sub_infos = dict([(k, v) for k, v in info.items() if not k.startswith("_")])
            if sub_infos:
                for sub, sub_info in sub_infos.items():
                    table.add_row(
                        ["%s.%s" % (src, sub), "-", main_info["_version"], "-", self._format_number(sub_info["_count"])]
                    )  # only _count avail there
            else:
                main_count = main_info.get("_count") and self._format_number(main_info["_count"]) or ""
                table.add_row([src, "-", main_info.get("_version", ""), "-", main_count])

        for src, info in sorted(self.changes["sources"]["deleted"].items(), key=lambda e: e[0]):
            main_info = dict([(k, v) for k, v in info.items() if k.startswith("_")])
            sub_infos = dict([(k, v) for k, v in info.items() if not k.startswith("_")])
            if sub_infos:
                for sub, sub_info in sub_infos.items():
                    table.add_row(
                        [
                            "%s.%s" % (src, sub),
                            main_info.get("_version", ""),
                            "-",
                            self._format_number(sub_info["_count"]),
                            "-",
                        ]
                    )  # only _count avail there
            else:
                main_count = main_info.get("_count") and self._format_number(main_info["_count"]) or ""
                table.add_row([src, main_info.get("_version", ""), "-", main_count, "-"])

        for src, info in sorted(self.changes["sources"]["updated"].items(), key=lambda e: e[0]):
            # extract information from main-source
            old_main_info = dict([(k, v) for k, v in info["old"].items() if k.startswith("_")])
            new_main_info = dict([(k, v) for k, v in info["new"].items() if k.startswith("_")])
            old_main_count = old_main_info.get("_count") and self._format_number(old_main_info["_count"]) or None
            new_main_count = new_main_info.get("_count") and self._format_number(new_main_info["_count"]) or None

            if old_main_count is None:
                assert new_main_count is None, (
                    "Sub-sources found for '%s', old and new count should " % src + "both be None. Info was: %s" % info
                )

                old_sub_infos = dict([(k, v) for k, v in info["old"].items() if not k.startswith("_")])
                new_sub_infos = dict([(k, v) for k, v in info["new"].items() if not k.startswith("_")])

                # old & new sub_infos should have the same structure (same existing keys)
                # so we just use one of them to explore
                if old_sub_infos:
                    assert new_sub_infos
                    for sub, sub_info in old_sub_infos.items():
                        table.add_row(
                            [
                                "%s.%s" % (src, sub),
                                old_main_info.get("_version", ""),
                                new_main_info.get("_version", ""),
                                self._format_number(sub_info["_count"]),
                                self._format_number(new_sub_infos[sub]["_count"]),
                            ]
                        )
            else:
                assert new_main_count is not None, (
                    "No sub-sources found, old and new count should NOT " + "both be None. Info was: %s" % info
                )
                table.add_row(
                    [
                        src,
                        old_main_info.get("_version", ""),
                        new_main_info.get("_version", ""),
                        old_main_count,
                        new_main_count,
                    ]
                )

        if table._rows:
            txt += table.get_string()
            txt += "\n"
        else:
            txt += "No datasource changed.\n"

        total_count = self.changes["new"].get("_count")
        if self.changes["sources"]["added"]:
            txt += "New datasource(s): %s\n" % ", ".join(sorted(list(self.changes["sources"]["added"])))
        if self.changes["sources"]["deleted"]:
            txt += "Deleted datasource(s): %s\n" % ", ".join(sorted(list(self.changes["sources"]["deleted"])))
        if self.changes["sources"]:
            txt += "\n"

        table = prettytable.PrettyTable(["Updated stats.", "previous", "new"])
        table.align["Updated stats."] = "l"
        table.align["previous"] = "r"
        table.align["new"] = "r"

        for stat_name, stat in sorted(self.changes["stats"]["added"].items(), key=lambda e: e[0]):
            table.add_row([stat_name, "-", self._format_number(stat["_count"])])
        for stat_name, stat in sorted(self.changes["stats"]["deleted"].items(), key=lambda e: e[0]):
            table.add_row([stat_name, self._format_number(stat["_count"]), "-"])
        for stat_name, stat in sorted(self.changes["stats"]["updated"].items(), key=lambda e: e[0]):
            table.add_row(
                [
                    stat_name,
                    self._format_number(stat["old"]["_count"]),
                    self._format_number(stat["new"]["_count"]),
                ]
            )

        if table._rows:
            txt += table.get_string()
            txt += "\n\n"

        if self.changes["new"]["_fields"]:
            new_fields = sorted(self.changes["new"]["_fields"].get("add", []))
            deleted_fields = sorted(self.changes["new"]["_fields"].get("remove", []))
            updated_fields = sorted(self.changes["new"]["_fields"].get("replace", []))
            if new_fields:
                txt += "New field(s): %s\n" % ", ".join(new_fields)
            if deleted_fields:
                txt += "Deleted field(s): %s\n" % ", ".join(deleted_fields)
            if updated_fields:
                txt += "Updated field(s): %s\n" % ", ".join(updated_fields)
            txt += "\n"

        if total_count is not None:
            txt += "Overall, %s documents in this release\n" % (self._format_number(total_count))

        if self.changes["new"]["_summary"]:
            sumups = []
            sumups.append("%s document(s) added" % self._format_number(self.changes["new"]["_summary"].get("add", 0)))
            sumups.append(
                "%s document(s) deleted" % self._format_number(self.changes["new"]["_summary"].get("delete", 0))
            )
            sumups.append(
                "%s document(s) updated" % self._format_number(self.changes["new"]["_summary"].get("update", 0))
            )
            txt += ", ".join(sumups) + "\n"
        else:
            txt += "No information available for added/deleted/updated documents\n"

        if self.changes.get("note"):
            txt += "\n"
            txt += "Note: %s\n" % self.changes["note"]

        with open(filepath, "w") as fout:
            fout.write(txt)

        return txt