Source code for biothings.hub.dataindex.snapshot_cleanup

import logging
import xml.dom.minidom
from typing import NamedTuple
from xml.etree import ElementTree

from elasticsearch import Elasticsearch
from pymongo.collection import Collection
from config import logger
from elasticsearch.exceptions import NotFoundError


class _Ele(NamedTuple):  # Cleanup Element
    tag: str
    attrs: dict
    elems: list

    @classmethod
    def ment(cls, tag, attrs, content):  # _Ele.ment(..) :)
        return _Ele(tag, attrs, [_Ele.ment(*e) for e in content])

    def to_xml(self):
        attrs = self.attrs.copy()

        if self.tag in ("CleanUps", "Remove", "Keep"):
            attrs["size"] = str(len(self.elems))

        if self.tag == "Snapshot":
            attrs = {
                "_id": attrs["_id"],
                "build_name": attrs["build_name"],
                "created_at": str(attrs["created_at"]),
                "env": attrs.get("environment") or "N/A",
            }

        root = ElementTree.Element(self.tag, attrs)
        for elem in self.elems:
            root.append(elem.to_xml())
        return root

    def __str__(self):
        ets = ElementTree.tostring(self.to_xml())
        dom = xml.dom.minidom.parseString(ets)
        return dom.toprettyxml(indent=" " * 2)


[docs] def find(collection, *, env=None, keep=3, group_by=None, return_db_cols=False, **filters): """ Identify snapshots to remove or keep based on specified criteria. This function queries a MongoDB collection to find snapshots matching the given filters, groups them according to the specified grouping key(s), and determines which snapshots to keep or remove based on the 'keep' parameter. Parameters: - collection (Collection): The MongoDB collection to query. Must be an instance of `pymongo.collection.Collection`. - env (str, optional): The environment name to filter snapshots. Defaults to None. - keep (int, optional): The number of most recent snapshots to keep in each group. Defaults to 3. - group_by (str or list, optional): The key or list of keys to group snapshots by. If None, defaults to 'build_config'. - return_db_cols (bool, optional): If True, returns the raw database query results instead of the structured `_Ele` element. Defaults to False. - **filters: Additional keyword arguments to filter snapshots. Returns: - _Ele or list: An `_Ele` element representing the snapshots to be removed and kept, organized by groups, or a list of raw database query results if `return_db_cols` is True. Raises: - NotImplementedError: If 'collection' is not an instance of `pymongo.collection.Collection`. - TypeError: If 'group_by' is neither a string, list, tuple, nor None. """ if not isinstance(collection, Collection): raise NotImplementedError("Require MongoDB Hubdb.") if isinstance(group_by, (str, type(None))): group_by = "$" + (group_by or "build_config") elif isinstance(group_by, (list, tuple)): group_by = {k.replace(".", "_"): "$" + k for k in group_by} groups = list( collection.aggregate( [ { "$project": { "build_config": "$build_config._id", "snapshot": {"$objectToArray": "$snapshot"}, } }, {"$unwind": {"path": "$snapshot"}}, { "$addFields": { "snapshot.v.build_config": "$build_config", "snapshot.v.build_name": "$_id", "snapshot.v._id": "$snapshot.k", } }, {"$replaceRoot": {"newRoot": "$snapshot.v"}}, {"$match": {"environment": env, **filters} if env else filters}, # Exclude cloud credentials {"$unset": ["conf.cloud.access_key", "conf.cloud.secret_key"]}, {"$sort": {"created_at": 1}}, {"$group": {"_id": group_by, "items": {"$push": "$$ROOT"}}}, ] ) ) if return_db_cols: return groups return _Ele.ment( "CleanUps", {}, [ ( "Group", _expand(group["_id"], group_by), [ ("Remove", {}, [("Snapshot", _doc, []) for _doc in _remove(group, keep)]), ("Keep", {}, [("Snapshot", _doc, []) for _doc in _keep(group, keep)]), ], ) for group in groups ], )
def _expand(group_id, group_by): if isinstance(group_id, str): return {group_by.strip("$"): group_id} if isinstance(group_id, dict): return group_id raise TypeError() def _keep(doc, keep): return doc["items"][-keep or len(doc["items"]) :] def _remove(doc, keep): return doc["items"][: -keep or len(doc["items"])] # the operations below are not made async # because SnapshotEnv.client is not async
[docs] def delete(collection, element, envs, ignoreErrors=False): cnt = 0 assert element.tag == "CleanUps" for group in element.elems: for category in group.elems: if category.tag == "Remove": for snapshot in category.elems: _delete(collection, snapshot, envs, ignoreErrors) cnt += 1 return cnt
def _delete(collection, snapshot, envs, ignoreErrors=False): """ Delete a single snapshot from the Elasticsearch repository and update the MongoDB collection. This helper function deletes the specified snapshot from the Elasticsearch repository and removes its reference from the MongoDB 'collection'. Parameters: - collection (Collection): The MongoDB collection where snapshot metadata is stored. - snapshot (_Ele): An `_Ele` element representing the snapshot to be deleted. - envs (dict): A mapping of environment names to their respective clients or configurations. - ignoreErrors (bool, optional): If True, ignores errors during deletion and continues processing. Defaults to False. Raises: - AssertionError: If the tag of 'snapshot' is not 'Snapshot'. - ValueError: If the environment is not registered in 'envs' and 'ignoreErrors' is False, or if the snapshot does not exist in the repository. - KeyError: If required keys are missing in 'snapshot.attrs'. """ assert snapshot.tag == "Snapshot" try: if "environment" in snapshot.attrs: env = snapshot.attrs["environment"] client = envs[env].client else: # legacy format env = snapshot.attrs["conf"]["indexer"]["env"] env = envs.index_manager[env] client = Elasticsearch(**env["args"]) except KeyError as exc: message = ( f"Environment '{env}' is not registered and connection details are unavailable. " "Consider adding it to the hub configuration otherwise manual deletion is required." ) if ignoreErrors: logger.error(message) logger.info("Ignoring error and continuing to delete snapshot '%s'", snapshot.attrs["_id"]) collection.update_one( {"_id": snapshot.attrs["build_name"]}, {"$unset": {f"snapshot.{snapshot.attrs['_id']}": 1}}, ) return raise ValueError(message) from exc try: client.snapshot.delete( repository=snapshot.attrs["conf"]["repository"]["name"], snapshot=snapshot.attrs["_id"], ) except NotFoundError as exc: raise ValueError( f"Snapshot '{snapshot.attrs['_id']}' does not exist in the repository " f"'{snapshot.attrs['conf']['repository']['name']}'. " "Validate the snapshots to remove this snapshot from the database." ) from exc collection.update_one( {"_id": snapshot.attrs["build_name"]}, {"$unset": {f"snapshot.{snapshot.attrs['_id']}": 1}}, )
[docs] def plain_text(element): plain_texts = [] assert element.tag == "CleanUps" for group in element.elems: assert group.tag == "Group" plain_texts.append("Snapshots filtered by:") for k, v in group.attrs.items(): plain_texts.append(f" {k}={repr(v)}") plain_texts.append("") removes = group.elems[0].elems plain_texts.append(f" Found {len(removes)} snapshots to remove:") for snapshot in removes: plain_texts.append(" " * 8 + _plain_text(snapshot)) keeps = group.elems[1].elems plain_texts.append(f" Found {len(keeps)} snapshots to keep:") for snapshot in keeps: plain_texts.append(" " * 8 + _plain_text(snapshot)) plain_texts.append("") return "\n".join(plain_texts)
def _plain_text(snapshot): assert snapshot.tag == "Snapshot" return "".join( ( snapshot.attrs["_id"], " (", f'env={snapshot.attrs.get("environment") or "N/A"}', ", ", # "build_name" generally agrees with the snapshot _id, # although technically snapshots can be named anything. # since in most use cases, the snapshot name at least # indicates which build it is created from, even when # it is not named exactly the same as the build, for # presentation concision, build_name is not shown here. # uncomment the following line if this assumption is # no longer true in the future. # # f'build_name={repr(snapshot.attrs["build_name"])}', ", ", f'created_at={str(snapshot.attrs["created_at"])}', ")", ) ) # Feature Specification ↑ # https://suwulab.slack.com/archives/CC19LHAF2/p1631126588023700?thread_ts=1631063247.003700&cid=CC19LHAF2 # Snapshots filtered by: # build_config="demo_allspecies" # ... # Found 8 snapshots to remove: # ... # Found 3 snapshots to keep: # ...
[docs] def test_find(): from pymongo import MongoClient logging.basicConfig(level="DEBUG") # mychem # ------- # "su04" # "mychem_hubdb", "src_build" client = MongoClient("su06") collection = client["outbreak_hubdb"]["src_build"] print(plain_text(find(collection)))
[docs] def test_print(): print( _Ele.ment( "A", {}, [ ("AA", {}, []), ("AB", {"ABC": "D"}, []), ], ) )
if __name__ == "__main__": test_find()