# Source code for biothings.hub.dataindex.snapshot_cleanup

import logging
import xml.dom.minidom
from typing import NamedTuple
from xml.etree import ElementTree

from elasticsearch import Elasticsearch
from pymongo.collection import Collection


class _Ele(NamedTuple):  # Cleanup Element
    """Immutable tree node used to describe a snapshot cleanup plan.

    A node is a ``(tag, attrs, elems)`` triple that can be rendered as an
    ``xml.etree`` element (``to_xml``) or as pretty-printed XML (``str()``).
    """

    tag: str  # element name, e.g. "CleanUps", "Group", "Remove", "Keep", "Snapshot"
    attrs: dict  # attributes attached to this node
    elems: list  # child nodes, each one an ``_Ele``

    @classmethod
    def ment(cls, tag, attrs, content):  # _Ele.ment(..) :)
        """Recursively build a node from nested ``(tag, attrs, content)`` tuples.

        Args:
            tag: name of the node.
            attrs: attribute dict stored on the node as-is.
            content: iterable of ``(tag, attrs, content)`` tuples, one per child.

        Returns:
            The node with every child converted as well.
        """
        return cls(tag, attrs, [cls.ment(*child) for child in content])

    def to_xml(self):
        """Convert this node and its descendants to an ``ElementTree.Element``.

        "CleanUps", "Remove" and "Keep" nodes gain a ``size`` attribute with
        their number of children.  "Snapshot" nodes are reduced to ``_id``,
        ``build_name``, ``created_at`` and ``env`` (the ``environment``
        attribute, or "N/A" when missing or empty).

        Raises:
            KeyError: if a "Snapshot" node lacks ``_id``, ``build_name`` or
                ``created_at``.
        """
        # Work on a copy so the node's own attrs are never modified.
        attrs = self.attrs.copy()

        if self.tag in ("CleanUps", "Remove", "Keep"):
            attrs["size"] = str(len(self.elems))

        if self.tag == "Snapshot":
            attrs = {
                "_id": attrs["_id"],
                "build_name": attrs["build_name"],
                "created_at": attrs["created_at"],
                "env": attrs.get("environment") or "N/A",
            }

        # ElementTree refuses to serialize non-string attribute names/values
        # (TypeError in tostring), so normalize everything to str here.
        attrs = {str(key): str(value) for key, value in attrs.items()}

        root = ElementTree.Element(self.tag, attrs)
        for elem in self.elems:
            root.append(elem.to_xml())
        return root

    def __str__(self):
        """Return the node as pretty-printed XML, indented by two spaces."""
        ets = ElementTree.tostring(self.to_xml())
        dom = xml.dom.minidom.parseString(ets)
        return dom.toprettyxml(indent=" " * 2)


def find(collection, env=None, keep=3, group_by=None, return_db_cols=False, **filters):
    """Group the snapshots recorded in a build collection into a cleanup plan.

    Every entry of each document's ``snapshot`` mapping is flattened into its
    own record (annotated with ``build_config``, ``build_name`` and ``_id``),
    filtered, sorted by ``created_at`` ascending and grouped.  Within each
    group the last ``keep`` records are listed under "Keep" and the others
    under "Remove".

    Args:
        collection: a pymongo ``Collection``.
        env: when truthy, only snapshots whose ``environment`` equals it.
        keep: number of trailing (latest ``created_at``) snapshots to keep
            per group.
        group_by: ``None`` groups by "build_config"; a string groups by that
            field; a list/tuple groups by several fields (dots in the key
            names become underscores).  Any other value is handed to
            ``$group`` unchanged.
        return_db_cols: when true, return the raw aggregation groups instead
            of the element tree.
        **filters: extra conditions merged into the ``$match`` stage.

    Returns:
        A "CleanUps" ``_Ele`` tree, or the list of raw groups when
        ``return_db_cols`` is true.

    Raises:
        NotImplementedError: if ``collection`` is not a pymongo ``Collection``.
    """
    if not isinstance(collection, Collection):
        raise NotImplementedError("Require MongoDB Hubdb.")

    if group_by is None or isinstance(group_by, str):
        group_by = "$" + (group_by or "build_config")
    elif isinstance(group_by, (list, tuple)):
        group_by = {field.replace(".", "_"): "$" + field for field in group_by}

    # Explicit filters win over ``env`` when both name "environment".
    match_stage = {"environment": env, **filters} if env else filters

    pipeline = [
        {"$project": {"build_config": "$build_config._id", "snapshot": {"$objectToArray": "$snapshot"}}},
        {"$unwind": {"path": "$snapshot"}},
        {
            "$addFields": {
                "snapshot.v.build_config": "$build_config",
                "snapshot.v.build_name": "$_id",
                "snapshot.v._id": "$snapshot.k",
            }
        },
        {"$replaceRoot": {"newRoot": "$snapshot.v"}},
        {"$match": match_stage},
        {"$sort": {"created_at": 1}},
        {"$group": {"_id": group_by, "items": {"$push": "$$ROOT"}}},
    ]
    groups = list(collection.aggregate(pipeline))

    if return_db_cols:
        return groups

    group_nodes = []
    for group in groups:
        group_attrs = _expand(group["_id"], group_by)
        to_remove = [("Snapshot", doc, []) for doc in _remove(group, keep)]
        to_keep = [("Snapshot", doc, []) for doc in _keep(group, keep)]
        group_nodes.append(("Group", group_attrs, [("Remove", {}, to_remove), ("Keep", {}, to_keep)]))

    return _Ele.ment("CleanUps", {}, group_nodes)
def _expand(group_id, group_by):
    """Turn an aggregation group ``_id`` into the attribute dict of a Group.

    Args:
        group_id: the ``_id`` of one ``$group`` result; a string when grouping
            by a single field, a dict when grouping by several.
        group_by: the ``$group`` key expression used; for a string
            ``group_id`` this is expected to be ``"$<field>"``.

    Returns:
        ``{<field>: group_id}`` for a string id, or the id itself for a dict.

    Raises:
        TypeError: if ``group_id`` is neither a string nor a dict.
    """
    if isinstance(group_id, str):
        # Drop only the single leading "$" of the field reference; a plain
        # strip("$") would also eat "$" characters that belong to the name.
        field = group_by[1:] if group_by.startswith("$") else group_by
        return {field: group_id}
    if isinstance(group_id, dict):
        return group_id
    raise TypeError(
        f"Unsupported group id {group_id!r} (type {type(group_id).__name__}) "
        f"for group_by={group_by!r}; expected str or dict."
    )


def _keep(doc, keep):
    """Return the last ``keep`` items of a group (an empty list when 0)."""
    items = doc["items"]
    # -0 == 0 would slice from the start, so fall back to len(items),
    # which yields an empty tail.
    return items[-keep or len(items):]


def _remove(doc, keep):
    """Return every item of a group except the last ``keep`` ones."""
    items = doc["items"]
    # With keep == 0 the slice end becomes len(items): everything is removed.
    return items[: -keep or len(items)]


# the operations below are not made async
# because SnapshotEnv.client is not async
def delete(collection, element, envs):
    """Delete every snapshot listed under a "Remove" node of a cleanup plan.

    Args:
        collection: passed through to ``_delete`` for each snapshot.
        element: the root node of the plan; its tag must be "CleanUps".
        envs: passed through to ``_delete`` for each snapshot.

    Returns:
        The number of snapshots handed to ``_delete``.
    """
    assert element.tag == "CleanUps"

    deleted = 0
    for group in element.elems:
        for category in group.elems:
            # Only "Remove" nodes are acted upon; everything else is skipped.
            if category.tag != "Remove":
                continue
            for snapshot in category.elems:
                _delete(collection, snapshot, envs)
                deleted += 1
    return deleted
def _delete(collection, snapshot, envs):
    """Delete one snapshot from its repository and unset it in the build doc.

    Args:
        collection: object exposing ``update_one``; the document whose ``_id``
            equals the snapshot's ``build_name`` gets its
            ``snapshot.<snapshot _id>`` field unset.
        snapshot: a "Snapshot" node whose ``attrs`` hold ``_id``,
            ``build_name``, ``conf`` and optionally ``environment``.
        envs: for snapshots with an ``environment`` attribute,
            ``envs[environment].client`` is used; otherwise (legacy format)
            ``envs.index_manager[conf.indexer.env]["args"]`` is used to build
            an ``Elasticsearch`` client.
    """
    assert snapshot.tag == "Snapshot"
    attrs = snapshot.attrs

    if "environment" in attrs:
        client = envs[attrs["environment"]].client
    else:  # legacy format
        env_name = attrs["conf"]["indexer"]["env"]
        env_conf = envs.index_manager[env_name]
        client = Elasticsearch(**env_conf["args"])

    # Keyword arguments are accepted by elasticsearch-py 7.x and are
    # mandatory in 8.x, where API parameters became keyword-only.
    client.snapshot.delete(
        repository=attrs["conf"]["repository"]["name"],
        snapshot=attrs["_id"],
    )
    collection.update_one(
        {"_id": attrs["build_name"]},
        {"$unset": {f"snapshot.{attrs['_id']}": 1}},
    )
def plain_text(element):
    """Render a cleanup plan as a human-readable, newline-joined report.

    For every "Group" child of the "CleanUps" root the report lists the
    group's attributes, then the snapshots of its first child (to remove)
    and of its second child (to keep), each formatted by ``_plain_text``.

    Args:
        element: the "CleanUps" root node.

    Returns:
        The report as a single string (empty when there are no groups).
    """
    # NOTE(review): the leading whitespace inside the literals below may have
    # been collapsed when this source was extracted -- confirm the intended
    # indentation widths against the upstream module.
    assert element.tag == "CleanUps"

    lines = []
    for group in element.elems:
        assert group.tag == "Group"

        lines.append("Snapshots filtered by:")
        lines.extend(f" {key}={value!r}" for key, value in group.attrs.items())
        lines.append("")

        # Children are positional: index 0 is removed, index 1 is kept.
        sections = (
            (0, " Found {} snapshots to remove:"),
            (1, " Found {} snapshots to keep:"),
        )
        for position, heading in sections:
            snapshots = group.elems[position].elems
            lines.append(heading.format(len(snapshots)))
            lines.extend(" " * 8 + _plain_text(snapshot) for snapshot in snapshots)

        lines.append("")

    return "\n".join(lines)
def _plain_text(snapshot):
    """Format one "Snapshot" node as ``<_id> (env=<env>, created_at=<ts>)``.

    ``<env>`` is the ``environment`` attribute, or "N/A" when it is missing
    or empty.
    """
    assert snapshot.tag == "Snapshot"

    attrs = snapshot.attrs
    snapshot_id = attrs["_id"]
    env_part = f'env={attrs.get("environment") or "N/A"}'
    #
    # "build_name" generally agrees with the snapshot _id, although
    # technically snapshots can be named anything.  Since in most use cases
    # the snapshot name at least indicates which build it was created from,
    # build_name is left out here to keep the output concise.  If that
    # assumption stops holding, add this segment (followed by ", ") before
    # the created_at part:
    #
    # f'build_name={repr(snapshot.attrs["build_name"])}'
    #
    created_part = f'created_at={str(attrs["created_at"])}'

    return "".join([snapshot_id, " (", env_part, ", ", created_part, ")"])


# Feature Specification ↑
# https://suwulab.slack.com/archives/CC19LHAF2/p1631126588023700?thread_ts=1631063247.003700&cid=CC19LHAF2
# Snapshots filtered by:
# build_config="demo_allspecies"
# ...
# Found 8 snapshots to remove:
# ...
# Found 3 snapshots to keep:
# ...
def test_find():
    """Manual check: print the cleanup plan for a live hub database.

    Connects to the MongoDB host "su06" and reports on the "src_build"
    collection of "outbreak_hubdb", so it only works where that host is
    reachable.
    """
    from pymongo import MongoClient

    logging.basicConfig(level="DEBUG")

    # mychem
    # -------
    # "su04"
    # "mychem_hubdb", "src_build"

    hubdb = MongoClient("su06")["outbreak_hubdb"]
    plan = find(hubdb["src_build"])
    print(plain_text(plan))
def test_print():
    """Print a small two-child element tree as pretty-printed XML."""
    children = [
        ("AA", {}, []),
        ("AB", {"ABC": "D"}, []),
    ]
    sample = _Ele.ment("A", {}, children)
    print(sample)
# Script entry point: running this module directly invokes test_find().
if __name__ == "__main__": test_find()