Source code for biothings.hub.dataindex.indexer_task

import logging
from collections import namedtuple
from enum import Enum
from functools import partial
from types import SimpleNamespace

from elasticsearch import Elasticsearch, helpers
from pymongo import MongoClient

from biothings.utils.es import ESIndex as BaseESIndex
from biothings.utils.loggers import get_logger

try:
    from biothings.utils.mongo import doc_feeder
except ImportError:
    import biothings

    biothings.config = SimpleNamespace()
    biothings.config.DATA_SRC_DATABASE = "biothings_src"
    biothings.config.DATA_TARGET_DATABASE = "biothings_build"
    from biothings.utils.mongo import doc_feeder

_IDExists = namedtuple("IDExists", ("id", "exists"))


[docs] class ESIndex(BaseESIndex): def __init__(self, client, index_name, **bulk_index_args): super().__init__(client, index_name) self.bulk_index_args = bulk_index_args # -------------------- # bulk operations (m*) # --------------------
[docs] def mget(self, ids): """Return a list of documents like [ { "_id": "0", "a": "b" }, { "_id": "1", "c": "d" }, # 404s are skipped ] """ response = self.client.mget( body={"ids": ids}, index=self.index_name, doc_type=self.doc_type, ) for doc in response["docs"]: if doc.get("found"): doc["_source"]["_id"] = doc["_id"] yield doc["_source"]
[docs] def mexists(self, ids): """Return a list of tuples like [ (_id_0, True), (_id_1, False), (_id_2, True), .... ] """ res = self.client.search( index=self.index_name, doc_type=self.doc_type, body={"query": {"ids": {"values": ids}}}, stored_fields=None, _source=None, size=len(ids), ) id_set = {doc["_id"] for doc in res["hits"]["hits"]} return [_IDExists(_id, _id in id_set) for _id in ids]
[docs] def mindex(self, docs): """Index and return the number of docs indexed.""" def _action(doc): _doc = { "_index": self.index_name, "_type": self.doc_type, "_op_type": "index", } _doc.update(doc) # with _id return _doc return helpers.bulk(self.client, map(_action, docs), **self.bulk_index_args)[0]
# NOTE # Why doesn't "mget", "mexists", "mindex" belong to the base class? # At this moment, their interfaces are too opinionated/customized # for usage in this module. Unless we find them directly useful in # another module in the future, proving their genericity, they should # stay close in this module only. # Data Collection Client def _get_es_client(es_client_args, es_blk_args, es_idx_name): return ESIndex(Elasticsearch(**es_client_args), es_idx_name, **es_blk_args) def _get_mg_client(mg_client_args, mg_dbs_name, mg_col_name): return MongoClient(**mg_client_args)[mg_dbs_name][mg_col_name] # -------------- # Entry Point # --------------
[docs] def dispatch( mg_client_args, mg_dbs_name, mg_col_name, es_client_args, es_blk_args, es_idx_name, ids, mode, name, ): return IndexingTask( partial(_get_es_client, es_client_args, es_blk_args, es_idx_name), partial(_get_mg_client, mg_client_args, mg_dbs_name, mg_col_name), ids, mode, f"index_{es_idx_name}", name, ).dispatch()
def _ensure_logger(logger): if not logger: return logging.getLogger(__name__) if isinstance(logger, str): return get_logger(logger)[0] return logger def _validate_ids(ids, logger=None): validated_ids = [] invalid_ids = [] for _id in ids: if not isinstance(_id, str): raise TypeError("_id '%s' has invalid type (!str)." % repr(_id)) if len(_id) > 512: # this is an ES limitation invalid_ids.append(_id) message = "_id is too long: '%s'" % _id if logger: logger.warning(message) else: print(message) else: validated_ids.append(_id) return validated_ids, invalid_ids
[docs] class Mode(Enum): INDEX = "index" PURGE = "purge" # same as 'index' in this module MERGE = "merge" RESUME = "resume"
[docs] class IndexingTask: """ Index one batch of documents from MongoDB to Elasticsearch. The documents to index are specified by their ids. """ def __init__(self, es, mongo, ids, mode=None, logger=None, name="task"): assert callable(es) assert callable(mongo) self.logger = _ensure_logger(logger) self.name = f"#{name}" if isinstance(name, int) else name self.ids, self.invalid_ids = _validate_ids(ids, self.logger) self.mode = Mode(mode or "index") # these are functions to create clients, # each also associated with an organizational # structure in the corresponding database, # functioning as the source or destination # of batch document manipulation. self.backend = SimpleNamespace() self.backend.es = es # wrt an index self.backend.mongo = mongo # wrt a collection def _get_clients(self): clients = SimpleNamespace() clients.es = self.backend.es() clients.mongo = self.backend.mongo() return clients
[docs] def dispatch(self): if self.mode in (Mode.INDEX, Mode.PURGE): return self.index() elif self.mode == Mode.MERGE: return self.merge() elif self.mode == Mode.RESUME: return self.resume()
[docs] def index(self): clients = self._get_clients() docs = doc_feeder( clients.mongo, step=len(self.ids), inbatch=False, query={"_id": {"$in": self.ids}}, ) self.logger.info("%s: %d documents.", self.name, len(self.ids)) count_docs = clients.es.mindex(docs) return count_docs + len(self.invalid_ids)
[docs] def merge(self): clients = self._get_clients() upd_cnt, docs_old = 0, {} new_cnt, docs_new = 0, {} # populate docs_old for doc in clients.es.mget(self.ids): docs_old[doc["_id"]] = doc # populate docs_new for doc in doc_feeder( clients.mongo, step=len(self.ids), inbatch=False, query={"_id": {"$in": self.ids}}, ): docs_new[doc["_id"]] = doc doc.pop("_timestamp", None) # merge existing ids for key in list(docs_new): if key in docs_old: docs_old[key].update(docs_new[key]) del docs_new[key] # updated docs (those existing in col *and* index) upd_cnt = clients.es.mindex(docs_old.values()) self.logger.info("%s: %d documents updated.", self.name, upd_cnt) # new docs (only in col, *not* in index) new_cnt = clients.es.mindex(docs_new.values()) self.logger.info("%s: %d new documents.", self.name, new_cnt) return upd_cnt + new_cnt
[docs] def resume(self): clients = self._get_clients() missing_ids = [x.id for x in clients.es.mexists(self.ids) if not x.exists] self.logger.info("%s: %d missing documents.", self.name, len(missing_ids)) if missing_ids: self.ids = missing_ids self.index() return len(self.ids)
[docs] def test_00(): # ES from pprint import pprint as print index = ESIndex(Elasticsearch(), "mynews_202105261855_5ffxvchx") print(index.doc_type) print( list( index.mget( [ "0999b13cb8026aba", "1111647aaf9c70b4", "________________", ] ) ) )
# print(list(index.mexists([ # "0999b13cb8026aba", # "1111647aaf9c70b4", # "________________" # ])))
[docs] def test_clients(): def _mongo(): client = MongoClient() database = client["biothings_build"] return database["mynews_202012280220_vsdevjdk"] def _es(): client = Elasticsearch() return ESIndex(client, "indexer-test") return (_es, _mongo)
[docs] def test0(): task = IndexingTask( *test_clients(), ("0999b13cb8026aba", "1111647aaf9c70b4", "1c9828073bad510c"), ) task.index()
[docs] def test1(): task = IndexingTask( *test_clients(), ( "0999b13cb8026aba", "1111647aaf9c70b4", "1c9828073bad510c", "1f447d7fc6dcc2cf", "27e81a308e4e04da", ), ) task.resume()
if __name__ == "__main__": logging.basicConfig(level="DEBUG") test_00() # test0() # test1()