# Source code for biothings.hub.dataindex.indexer_cleanup

import datetime
import itertools
import logging
from collections import UserDict, UserList
from dataclasses import dataclass
from pprint import pformat

from elasticsearch import AsyncElasticsearch
from pymongo.collection import Collection

# NOTE
# Throughout this module, an XML-like serialization
# format is utilized, just like the python default:
#
# >>> object()
# <object object at 0x7f7b4882a110>
#
# This format helps preserve the hierarchical structure
# of the object it represents, is intuitive to read,
# and closely represents the underlying programming
# concepts: each tag name corresponds to one class.
#
_TAB = " " * 2


class _Index(UserDict):
    """A single index record, rendered as a self-closing ``<Index .../>`` tag."""

    def __str__(self):
        # Missing keys render as None, mirroring dict.get().
        env, name, ts = (self.get(key) for key in ("environment", "_id", "created_at"))
        return f"<Index env={env!r} name={name!r} ts={ts!r}/>"


class _Indices(UserList):
    """A list of index records, each wrapped in ``_Index``.

    Subclasses set ``group`` to the tag name used when rendering.
    """

    group = NotImplemented

    def __init__(self, initlist=None):
        super().__init__(initlist)
        # Wrap every stored record so each renders as an <Index .../> tag.
        self.data = [_Index(entry) for entry in self]

    def __str__(self):
        opening = f"<{self.group} len={len(self)}>"
        closing = f"<{self.group}/>"
        rows = [_TAB + str(entry) for entry in self]
        return "\n".join([opening, *rows, closing])


class _IndicesToKeep(_Indices):
    """Indices that survive a cleanup; rendered under a ``<Keep>`` tag."""

    group: str = "Keep"


class _IndicesToRemove(_Indices):
    """Indices scheduled for deletion; rendered under a ``<Remove>`` tag."""

    group: str = "Remove"


@dataclass
class _BuildConfig:
    """The cleanup plan for one build config: indices to remove and to keep.

    Iterating an instance yields only the indices to remove.
    """

    name: str
    remove: _IndicesToRemove
    keep: _IndicesToKeep

    def __str__(self):
        nested = []
        # Render "remove" first, then "keep", each indented one level.
        for section in (self.remove, self.keep):
            nested.extend(_TAB + row for row in str(section).split("\n"))
        return "\n".join([f"<BuildConfig {self.name!r}>", *nested, "<BuildConfig/>"])

    def __iter__(self):
        # Only the removal candidates are exposed when iterating.
        return iter(self.remove)


class _CleanUps(UserList):
    """A list of ``_BuildConfig`` plans, rendered under a ``<CleanUps>`` tag."""

    def __str__(self):
        # Indent every line of every child's rendering by one level.
        inner = [
            _TAB + line
            for cleanup in self
            for line in str(cleanup).split("\n")
        ]
        return "\n".join(["<CleanUps>", *inner, "<CleanUps/>"])


# OUTPUT EXAMPLE
# ---------------------
# >>> print(_CleanUps(...))
# <CleanUps>
#   <BuildConfig 'mygene_allspecies'>
#     <Remove len=6>
#       <Index env='prod' name='mygene_xbuo6d' ts=datetime(2019, 8, 27, ...)/>
#       <Index env='prod' name='mygene_vxia0r' ts=datetime(2020, 1, 8, ...)/>
#       <Index env='prod' name='mygene_25wlt4' ts=datetime(2020, 1, 20, ...)/>
#       <Index env='local' name='mygene_ufkw79' ts=datetime(2020, 8, 28, ...)/>
#       <Index env='local' name='mygene_uq3chc' ts=datetime(2021, 4, 6, ...)/>
#       <Index env='local' name='mygene_mnkct5' ts=datetime(2021, 4, 15, ...)/>
#     <Remove/>
#     <Keep len=3>
#       <Index env='su10' name='mygene_ibjpha' ts=datetime(2021, 6, 30, ...)/>
#       <Index env='su10' name='mygene_osyzmt' ts=datetime(2021, 8, 9, ...)/>
#       <Index env='su10' name='mygene_test' ts=datetime(2021, 8, 9, ...)/>
#     <Keep/>
#   <BuildConfig/>
#   <BuildConfig 'demo_allspecies'>
#     <Remove len=3>
#       <Index env='test' name='demo_mygene_ngupjv' ts=datetime(2018, 3, 12, ...)/>
#       <Index env='test' name='demo_mygene_rpguqe' ts=datetime(2018, 8, 6, ...)/>
#       <Index env='test' name='demo_mygene_vqpur6' ts=datetime(2019, 1, 29, ...)/>
#     <Remove/>
#     <Keep len=3>
#       <Index env='test' name='demo_mygene_irvwa0' ts=datetime(2019, 3, 25, ...)/>
#       <Index env='test' name='demo_mygene_cpoldl' ts=datetime(2020, 1, 8, ...)/>
#       <Index env='local' name='demo_mygene_qekeic' ts=datetime(2020, 4, 22, ...)/>
#     <Keep/>
#   <BuildConfig/>
# <CleanUps/>
#


class CleanUpResult(list):
    """A list of cleanup actions whose repr puts a non-empty payload on its own line."""

    def __repr__(self):
        name = type(self).__name__
        separator = "\n" if self else ""
        return f"{name}({separator}{pformat(list(self), width=150)})"
class Cleaner:
    """Find and delete old Elasticsearch indices recorded in the hub database.

    Args:
        collection: the hub's build collection; ``find`` requires a
            ``pymongo.collection.Collection``.
        indexers: mapping of environment name to a dict whose ``"args"`` entry
            is passed as keyword arguments to ``AsyncElasticsearch``
            (presumably the ``hub.dataindex.IndexManager`` registry — verify).
        logger: logger to use; defaults to this module's logger.
    """

    def __init__(self, collection, indexers, logger=None):
        self.collection = collection  # pymongo.collection.Collection
        self.indexers = indexers  # hub.dataindex.IndexManager
        self.logger = logger or logging.getLogger(__name__)

    def find(self, env=None, keep=3, **filters):
        """Group recorded indices by build config and split them into remove/keep.

        Within each build config, indices are sorted by ``created_at`` (oldest
        first). The newest ``keep`` are kept and the rest are marked for removal.

        Args:
            env: only consider indices whose ``environment`` equals this value;
                when falsy, any index that has an ``environment`` field matches.
            keep: number of most recent indices to keep per build config;
                ``0`` marks every index for removal.
            **filters: extra ``$match`` conditions applied to the per-index
                documents (e.g. on ``build_config`` or ``created_at``).

        Returns:
            A ``_CleanUps`` list with one ``_BuildConfig`` per build config.

        Raises:
            NotImplementedError: if ``self.collection`` is not a pymongo Collection.
        """
        if not isinstance(self.collection, Collection):
            raise NotImplementedError("Require MongoDB Hubdb.")

        pipeline = [
            # One document per build: its build config id, plus its `index`
            # sub-document converted to a list of {k: <index name>, v: <info>}.
            {"$project": {"build_config": "$build_config._id", "index": {"$objectToArray": "$index"}}},
            {"$unwind": {"path": "$index"}},
            {"$addFields": {"index.v.build_config": "$build_config", "index.v._id": "$index.k"}},
            {"$replaceRoot": {"newRoot": "$index.v"}},
            {"$match": {"environment": env or {"$exists": True}, **filters}},
            # Each document now describes one index, e.g.
            #   {"_id": "mynews_202012280220_vsdevjdk", "build_config": "mynews",
            #    "environment": "local", "created_at": datetime(...), ...}
            # Keep only the needed fields, then GROUP BY build_config into
            #   {"_id": "mynews", "indices": [{...}, {...}, ...]}
            {"$project": dict.fromkeys(("build_config", "environment", "created_at"), 1)},
            {"$sort": {"created_at": 1}},
            {"$group": {"_id": "$build_config", "indices": {"$push": "$$ROOT"}}},
        ]
        results = list(self.collection.aggregate(pipeline))

        return _CleanUps(
            [
                _BuildConfig(
                    doc["_id"],
                    # A slice bound of -0 would be 0, not "the end". So when
                    # keep == 0, fall back to len() to remove all and keep none.
                    _IndicesToRemove(doc["indices"][: -keep or len(doc["indices"])]),
                    _IndicesToKeep(doc["indices"][-keep or len(doc["indices"]) :]),
                )
                for doc in results
            ]
        )

    async def clean(self, cleanups):
        """Delete every index marked for removal and drop it from the hub database.

        For each index, the index is deleted on the Elasticsearch cluster
        configured for its environment, and then its ``index.<name>`` entry
        is unset in the hub collection.

        Args:
            cleanups: typically the result of ``find``. Iterating each item
                (a ``_BuildConfig``) yields only its indices to remove.

        Returns:
            A ``CleanUpResult`` of ``("DELETE", <index description>)`` tuples.
        """
        self.logger.debug(cleanups)
        actions = CleanUpResult()
        for index in itertools.chain.from_iterable(cleanups):
            args = self.indexers[index["environment"]]["args"]
            async with AsyncElasticsearch(**args) as client:
                # Pass `index` by keyword: elasticsearch-py 8 rejects it positionally.
                await client.indices.delete(index=index["_id"], ignore_unavailable=True)
            action = ("DELETE", str(index))
            actions.append(action)
            # Log through the instance logger rather than the root logger.
            self.logger.info(action)
            self.collection.update_many(
                {f"index.{index['_id']}.environment": index["environment"]},
                {"$unset": {f"index.{index['_id']}": 1}},
            )
        return actions

    @staticmethod
    def plain_text(cleanups):
        """Render cleanups as a human-readable report, one section per build config."""

        def describe(index):
            # One report line per index.
            _id = index.get("_id")
            env = repr(index.get("environment"))
            ts = str(index.get("created_at"))
            return f" {_id} (env={env}, created={ts})"

        plain_texts = []
        for build_config in cleanups:
            plain_texts.append(f"> BuildConfig {repr(build_config.name)}")
            plain_texts.append(f" Found {len(build_config.remove)} indices to remove:")
            plain_texts.extend(map(describe, build_config.remove))
            plain_texts.append(f" Found {len(build_config.keep)} indices to keep:")
            plain_texts.extend(map(describe, build_config.keep))
            plain_texts.append("")
        return "\n".join(plain_texts)
# Feature Specification ↑
# https://suwulab.slack.com/archives/CC19LHAF2/p1631119811009900?thread_ts=1631063230.003400&cid=CC19LHAF2
# >BuildConfig "mygene_allspecies":
#   Found 8 indices to remove:
#     mygene_allspecies_20180501_rpds7zzn (indexer_env='prod', created=2018-05-01T04:45:25.132000)
#     ...
#   Found 3 indices to keep:
#     mygene_allspecies_20180501_rpds7zzn (indexer_env='prod', created=2021-04-06T22:34:09.863000)
# NOTE
# The output uses "env" instead of the "indexer_env" described in the feature spec,
# to match the signature of the Cleaner.find method (its first argument).
def test_str():
    """Print a hand-built ``_CleanUps`` tree to eyeball its XML-like rendering."""

    def record(index_id, created_at):
        # Every sample index belongs to the local "mynews" build config.
        return {
            "_id": index_id,
            "build_config": "mynews",
            "created_at": created_at,
            "environment": "local",
        }

    doomed = [
        record("mynews_20210811_test", datetime.datetime(2021, 8, 11, 19, 27, 25, 141000)),
        record("mynews_202105261855_5ffxvchx", datetime.datetime(2021, 8, 16, 9, 26, 56, 221000)),
        record("mynews_202012280220_vsdevjdk", datetime.datetime(2021, 8, 17, 0, 23, 11, 374000)),
    ]
    survivors = [
        record("mynews_202009170234_fjvg7skx", datetime.datetime(2020, 9, 17, 2, 35, 12, 800000)),
        record("mynews_202009222133_6rz3vljq", datetime.datetime(2020, 9, 22, 21, 33, 40, 958000)),
        record("mynews_202010060100_ontyofuv", datetime.datetime(2020, 10, 6, 1, 0, 11, 237000)),
    ]
    plan = _BuildConfig("mynews", _IndicesToRemove(doomed), _IndicesToKeep(survivors))
    print(_CleanUps([plan]))
def test_find():
    """Manual smoke test: list cleanup candidates from a live hub database.

    Connects to the MongoDB host "su05" and prints the plain-text report.

    Returns:
        A ``(cleaner, cleanups)`` tuple for further manual use.
    """
    from pymongo import MongoClient

    logging.basicConfig(level="DEBUG")

    # Known hub databases (host -> database, collection):
    #   mychem: "su04" -> "mychem_hubdb", "src_build"
    #   mygene: "su05" -> "genedoc_src", "src_build"
    #   docker: (no host given) -> "biothings", "src_build"
    src_build = MongoClient("su05")["genedoc_src"]["src_build"]
    cleaner = Cleaner(src_build, {"local": {"args": {}}})
    obj = cleaner.find()
    print(cleaner.plain_text(obj))
    return cleaner, obj
def test_clean():
    """Manual smoke test: find cleanup candidates via ``test_find`` and delete them.

    Warning: this actually deletes the indices it finds, via ``Cleaner.clean``.
    """
    import asyncio

    cleaner, cleanups = test_find()
    # asyncio.run() creates and closes its own event loop. Calling
    # asyncio.get_event_loop() with no running loop is deprecated in
    # recent Python versions.
    print(asyncio.run(cleaner.clean(cleanups)))
if __name__ == "__main__":
    # Running the module directly performs the manual `find` smoke test.
    test_find()