Source code for biothings.hub.dataindex.indexer

import os
import time
import math
import copy
from datetime import datetime
import pickle
import asyncio
from functools import partial

from elasticsearch import Elasticsearch

from biothings.utils.hub_db import get_src_build
from biothings.utils.common import timesofar, iter_n, get_random_string, \
    get_class_from_classpath
from biothings.utils.loggers import get_logger
from biothings.utils.manager import BaseManager
from biothings.utils.es import ESIndexer
from biothings import config as btconfig
from biothings.utils.mongo import doc_feeder, id_feeder
from config import LOG_FOLDER, logger as logging
from biothings.hub.databuild.backend import create_backend, merge_src_build_metadata
from biothings.hub import INDEXER_CATEGORY, INDEXMANAGER_CATEGORY


def new_index_worker(col_name, ids, pindexer, batch_num):
    col = create_backend(col_name).target_collection
    idxer = pindexer()
    cur = doc_feeder(col,
                     step=len(ids),
                     inbatch=False,
                     query={'_id': {
                         '$in': ids
                     }})
    cnt = idxer.index_bulk(cur)
    return cnt


def merge_index_worker(col_name, ids, pindexer, batch_num):
    col = create_backend(col_name).target_collection
    idxer = pindexer()
    upd_cnt = 0
    new_cnt = 0
    cur = doc_feeder(col,
                     step=len(ids),
                     inbatch=False,
                     query={'_id': {
                         '$in': ids
                     }})
    docs = list(cur)
    for d in docs:
        d.pop("_timestamp", None)
    dids = {d["_id"]: d for d in docs}
    dexistings = {d["_id"]: d for d in idxer.get_docs(list(dids))}
    for _id in dexistings:
        d = dexistings[_id]
        # update in-place
        d.update(dids[_id])
        # mark as processed/updated
        dids.pop(_id)
    # updated docs (those existing in col *and* index)
    upd_cnt = idxer.index_bulk(dexistings.values(), len(dexistings))
    logging.debug("%s documents updated in index", repr(upd_cnt))
    # new docs (only in col, *not* in index)
    new_cnt = idxer.index_bulk(dids.values(), len(dids))
    logging.debug("%s new documents in index", repr(new_cnt))
    # need to return one: tuple(cnt,list)
    ret = (upd_cnt[0] + new_cnt[0], upd_cnt[1] + new_cnt[1])
    return ret


def indexer_worker(col_name,
                   ids,
                   pindexer,
                   batch_num,
                   mode="index",
                   worker=new_index_worker):
    try:
        if mode in ["index", "merge"]:
            return worker(col_name, ids, pindexer, batch_num)
        elif mode == "resume":
            idxr = pindexer()
            es_ids = idxr.mexists(ids)
            missing_ids = [e[0] for e in es_ids if e[1] is False]
            if missing_ids:
                return worker(col_name, missing_ids, pindexer, batch_num)
            else:
                # fake indexer results, it has to be a tuple, first elem is num of indexed docs
                return (0, None)
    except Exception as e:
        logger_name = "index_%s_%s_batch_%s" % (pindexer.keywords.get(
            "index", "index"), col_name, batch_num)
        logger, _ = get_logger(logger_name, btconfig.LOG_FOLDER)
        logger.exception("indexer_worker failed")
        exc_fn = os.path.join(btconfig.LOG_FOLDER, "%s.pick" % logger_name)
        pickle.dump({"exc": e, "ids": ids}, open(exc_fn, "wb"))
        logger.info("Exception and IDs were dumped in pickle file '%s'", exc_fn)
        raise
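
# Illustrative sketch of how the worker functions above fit together: the hub builds a
# partially-applied ESIndexer ("pindexer") and hands it to indexer_worker() along with a
# backend/collection name and a batch of _ids. The host, index and collection names
# below are hypothetical placeholders, not values from this module.
def _example_run_one_batch():
    pindexer = partial(ESIndexer,
                       index="mydocs_20230101",       # hypothetical index name
                       doc_type="doc",
                       es_host="localhost:9200",      # hypothetical ES host
                       step=10000)
    ids = ["doc1", "doc2", "doc3"]                    # hypothetical _ids
    # mode="index" uses new_index_worker; mode="merge" would use merge_index_worker
    return indexer_worker("target_mydocs_20230101",   # hypothetical backend collection
                          ids, pindexer, batch_num=1,
                          mode="index", worker=new_index_worker)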


class IndexerException(Exception):
    pass

class Indexer(object):
    """
    Basic indexer, reading documents from a mongo collection (target_name)
    and sending documents to ES.
    """

    def __init__(self, es_host, target_name=None, **kwargs):
        self.host = es_host
        self.env = None
        self.log_folder = LOG_FOLDER
        self.timestamp = datetime.now()
        self.conf_name = None
        self.build_doc = None
        self.target_name = None
        self.index_name = None
        self.doc_type = None
        self.num_shards = None
        self.num_replicas = None
        self.kwargs = kwargs
        self.ti = time.time()

    def get_predicates(self):
        return []

    def get_pinfo(self):
        """
        Return dict containing information about the current process
        (used to report in the hub)
        """
        pinfo = {
            "category": INDEXER_CATEGORY,
            "source": "%s:%s" % (self.conf_name, self.index_name),
            "step": "",
            "description": ""
        }
        preds = self.get_predicates()
        if preds:
            pinfo["__predicates__"] = preds
        return pinfo

    @asyncio.coroutine
    def index(self, target_name, index_name, job_manager,
              steps=["index", "post"], batch_size=10000, ids=None,
              mode="index", worker=None):
        """
        Build an index named "index_name" with data from collection "target_name".

        "ids" can be passed to selectively index documents.

        "mode" can have the following values:
        - 'purge': will delete the index if it exists
        - 'resume': will use the existing index and add documents. "ids" can be passed as a
          list of missing IDs or, if not passed, ES will be queried to identify which IDs are
          missing for each batch in order to complete the index.
        - 'merge': will merge data with the existing index's documents, used when the index is
          populated in several distinct passes (cold/hot merge for instance)
        - 'index' (default): will create a new index, assuming it doesn't already exist
        """
        assert job_manager
        # check what to do
        if type(steps) == str:
            steps = [steps]
        self.target_name = target_name
        self.index_name = index_name
        self.load_build()
        self.setup_log()
        # select proper index worker according to mode:
        if worker is None:  # none specified, choose correct one
            if mode == "merge":
                worker = merge_index_worker
            else:
                worker = new_index_worker

        got_error = False
        cnt = 0

        if "index" in steps:
            self.register_status("indexing", transient=True, init=True, job={"step": "index"})
            assert self.build_doc.get("backend_url")
            target_collection = create_backend(self.build_doc["backend_url"]).target_collection
            backend_url = self.build_doc["backend_url"]
            _mapping = self.get_mapping()
            _extra = self.get_index_creation_settings()
            # partially instantiated indexer instance for process workers
            partial_idxer = partial(ESIndexer,
                                    doc_type=self.doc_type,
                                    index=index_name,
                                    es_host=self.host,
                                    step=batch_size,
                                    number_of_shards=self.num_shards,
                                    number_of_replicas=self.num_replicas,
                                    **self.kwargs)
            # instantiate one here for index creation
            es_idxer = partial_idxer()
            if es_idxer.exists_index():
                if mode == "purge":
                    es_idxer.delete_index()
                elif mode not in ["resume", "merge"]:
                    msg = "Index '%s' already exists (use mode='purge' to auto-delete it " \
                          "or mode='resume' to add more documents)" % index_name
                    self.register_status("failed", job={"err": msg})
                    raise IndexerException(msg)
            if mode not in ["resume", "merge"]:
                try:
                    es_idxer.create_index({self.doc_type: _mapping}, _extra)
                except Exception as e:
                    self.logger.exception("Failed to create index")
                    self.register_status("failed", job={"err": repr(e)})
                    raise

            def clean_ids(ids):
                # can't use a generator, it's going to be pickled
                cleaned = []
                for _id in ids:
                    if type(_id) != str:
                        self.logger.warning("_id '%s' has invalid type (!str), skipped", repr(_id))
                        continue
                    if len(_id) > 512:  # this is an ES6 limitation
                        self.logger.warning("_id is too long: '%s'", _id)
                        continue
                    cleaned.append(_id)
                return cleaned

            jobs = []
            total = target_collection.count()
            btotal = math.ceil(total / batch_size)
            bnum = 1
            if ids:
                self.logger.info(
                    "Indexing from '%s' with specific list of _ids, create indexer job with batch_size=%d",
                    target_name, batch_size)
                id_provider = iter_n(ids, batch_size)
            else:
                self.logger.info(
                    "Fetch _ids from '%s', and create indexer job with batch_size=%d",
                    target_name, batch_size)
                id_provider = id_feeder(target_collection, batch_size=batch_size, logger=self.logger)
            for ids in id_provider:
                yield from asyncio.sleep(0.0)
                origcnt = len(ids)
                ids = clean_ids(ids)
                newcnt = len(ids)
                if origcnt != newcnt:
                    self.logger.warning(
                        "%d document(s) can't be indexed and will be skipped (invalid _id)",
                        origcnt - newcnt)
                # progress count
                cnt += len(ids)
                pinfo = self.get_pinfo()
                pinfo["step"] = self.target_name
                try:
                    descprogress = cnt / total * 100
                except ZeroDivisionError:
                    descprogress = 0.0
                pinfo["description"] = "#%d/%d (%.1f%%)" % (bnum, btotal, descprogress)
                self.logger.info("Creating indexer job #%d/%d, to index '%s' %d/%d (%.1f%%)",
                                 bnum, btotal, backend_url, cnt, total, descprogress)
                job = yield from job_manager.defer_to_process(
                    pinfo,
                    partial(indexer_worker, backend_url, ids, partial_idxer, bnum, mode, worker))

                def batch_indexed(f, batch_num):
                    nonlocal got_error
                    try:
                        res = f.result()
                        if type(res) != tuple or type(res[0]) != int:
                            got_error = Exception(
                                "Batch #%s failed while indexing collection '%s' [result:%s]" %
                                (batch_num, self.target_name, repr(res)))
                    except Exception as e:
                        got_error = e
                        self.logger.exception("Batch indexed error")
                        return

                job.add_done_callback(partial(batch_indexed, batch_num=bnum))
                jobs.append(job)
                bnum += 1
                # raise error as soon as we know
                if got_error:
                    self.register_status("failed", job={"err": repr(got_error)})
                    raise got_error
            self.logger.info("%d jobs created for indexing step", len(jobs))
            tasks = asyncio.gather(*jobs)

            def done(f):
                nonlocal got_error
                if None in f.result():
                    got_error = None
                    return
                # compute overall inserted/updated records
                # returned values look like [(num,[]),(num,[]),...]
                cnt = sum((val[0] for val in f.result()))
                self.register_status("success", job={"step": "index"}, index={"count": cnt})
                if total != cnt:
                    # raise error if counts don't match, but index is still created,
                    # fully registered in case we want to use it anyway
                    err = "Merged collection has %d documents but %d have been indexed (check logs for more)" % (
                        total, cnt)
                    raise IndexerException(err)
                self.logger.info("Index '%s' successfully created using merged collection %s",
                                 index_name, target_name, extra={"notify": True})

            tasks.add_done_callback(done)
            yield from tasks

        if "post" in steps:
            self.logger.info("Running post-index process for index '%s'", index_name)
            self.register_status("indexing", transient=True, init=True, job={"step": "post-index"})
            pinfo = self.get_pinfo()
            pinfo["step"] = "post_index"
            # for some reason (like maintaining object's state between pickling),
            # we can't use a process there. Need to use a thread to maintain that state
            # without building an unmaintainable monster
            job = yield from job_manager.defer_to_thread(
                pinfo,
                partial(self.post_index, target_name, index_name, job_manager,
                        steps=steps, batch_size=batch_size, ids=ids, mode=mode))

            def posted(f):
                nonlocal got_error
                try:
                    res = f.result()
                    self.logger.info("Post-index process done for index '%s': %s", index_name, res)
                    self.register_status("indexing", job={"step": "post-index"})
                except Exception as e:
                    got_error = e
                    self.logger.exception("Post-index process failed for index '%s':",
                                          index_name, extra={"notify": True})
                    return

            job.add_done_callback(posted)
            yield from job  # consume future
            if got_error:
                self.register_status("failed", job={"err": repr(got_error)})
                raise got_error
            else:
                self.register_status("success")

        return {"%s" % self.index_name: cnt}

    def register_status(self, status, transient=False, init=False, **extra):
        assert self.build_doc
        src_build = get_src_build()
        job_info = {
            'status': status,
            'step_started_at': datetime.now().astimezone(),
            'logfile': self.logfile,
        }
        index_info = {
            "index": {
                self.index_name: {
                    'host': self.host,
                    'environment': self.env,
                    'conf_name': self.conf_name,
                    'target_name': self.target_name,
                    'index_name': self.index_name,
                    'doc_type': self.doc_type,
                    'num_shards': self.num_shards,
                    'num_replicas': self.num_replicas
                }
            }
        }
        if transient:
            # record some "in-progress" information
            job_info['pid'] = os.getpid()
        else:
            # only register time when it's a final state
            job_info["time"] = timesofar(self.ti)
            t1 = round(time.time() - self.ti, 0)
            job_info["time_in_s"] = t1
            index_info["index"][self.index_name]["created_at"] = datetime.now().astimezone()
        if "index" in extra:
            index_info["index"][self.index_name].update(extra["index"])
        if "job" in extra:
            job_info.update(extra["job"])
        # since the base is the merged collection, we register info there
        build = src_build.find_one({'_id': self.target_name})
        assert build, "Can't find build document '%s'" % self.target_name
        if init:
            # init timer for this step
            self.ti = time.time()
            src_build.update({'_id': self.target_name}, {"$push": {'jobs': job_info}})
            # now refresh/sync
            build = src_build.find_one({'_id': self.target_name})
        else:
            # merge extra at root level
            # (to keep building data...) and update the last one
            # (it's been properly created before when init=True)
            build["jobs"] and build["jobs"][-1].update(job_info)

        def merge_index_info(target, d):
            if "__REPLACE__" in d.keys():
                d.pop("__REPLACE__")
                target = d
            else:
                for k, v in d.items():
                    if type(v) == dict:
                        if k in target:
                            target[k] = merge_index_info(target[k], v)
                        else:
                            v.pop("__REPLACE__", None)
                            # merge v with "nothing" just to make sure to remove any "__REPLACE__"
                            v = merge_index_info({}, v)
                            target[k] = v
                    else:
                        target[k] = v
            return target

        build = merge_index_info(build, index_info)
        src_build.replace_one({"_id": build["_id"]}, build)
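
    # A worked example of the merge_index_info() helper above (values are hypothetical).
    # Without "__REPLACE__", nested dicts are merged key by key:
    #   merge_index_info({"index": {"i1": {"count": 1, "host": "x"}}},
    #                    {"index": {"i1": {"count": 2}}})
    #   -> {"index": {"i1": {"count": 2, "host": "x"}}}
    # whereas a "__REPLACE__" marker swaps the whole subtree:
    #   merge_index_info({"index": {"i1": {"count": 1, "host": "x"}}},
    #                    {"index": {"i1": {"__REPLACE__": True, "count": 2}}})
    #   -> {"index": {"i1": {"count": 2}}}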

    def post_index(self, target_name, index_name, job_manager,
                   steps=["index", "post"], batch_size=10000, ids=None, mode=None):
        """
        Override in sub-class to add a post-index process. Method's signature is the same
        as index() to get the full context. This method will run in a thread
        (using job_manager.defer_to_thread())
        """
        pass

    def setup_log(self):
        self.logger, self.logfile = get_logger('index_%s' % self.index_name, self.log_folder)

    def get_index_creation_settings(self):
        """
        Override to return a dict containing some extra settings for index creation.
        Dict will be merged with mandatory settings,
        see biothings.utils.es.ESIndexer.create_index for more.
        """
        return {
            # as of ES6, include_in_all was removed, we need to create our own "all" field
            "query": {"default_field": "all"},
            "codec": "best_compression",
            # as of ES6, analyzers/tokenizers must be defined in index settings, during creation
            "analysis": {
                "analyzer": {
                    # soon deprecated in favor of keyword_lowercase_normalizer
                    "string_lowercase": {
                        "tokenizer": "keyword",
                        "filter": "lowercase"
                    },
                    "whitespace_lowercase": {
                        "tokenizer": "whitespace",
                        "filter": "lowercase"
                    },
                },
                "normalizer": {
                    "keyword_lowercase_normalizer": {
                        "filter": ["lowercase"],
                        "type": "custom",
                        "char_filter": []
                    },
                }
            },
        }
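
    # For reference, a mapping field can opt into the normalizer declared above using
    # standard ES keyword-field syntax, e.g. (hypothetical field name):
    #     "symbol": {"type": "keyword", "normalizer": "keyword_lowercase_normalizer"}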

    def enrich_final_mapping(self, final_mapping):
        """
        final_mapping is the ES mapping ready to be sent
        (with "dynamic" and "all" at its root for instance).
        This method gives the opportunity to add more mapping definitions
        not directly related to datasources, such as other root keys.
        """
        return final_mapping

    def get_mapping(self):
        '''collect mapping data from data sources.'''
        mapping = self.build_doc.get("mapping", {})
        # default "all" field to replace include_in_all field in older versions of ES
        mapping["all"] = {'type': 'text'}
        final_mapping = {"properties": mapping, "dynamic": "false"}
        final_mapping = self.enrich_final_mapping(final_mapping)
        final_mapping["_meta"] = self.get_metadata()
        return final_mapping
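
    # Resulting shape (illustrative, built from the steps above):
    #     {"properties": {..., "all": {"type": "text"}},
    #      "dynamic": "false",
    #      "_meta": {...}}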

    def get_metadata(self):
        return self.build_doc.get("_meta", {})

    def get_build(self, target_name=None):
        target_name = target_name or self.target_name
        assert target_name, "target_name must be defined first before searching for builds"
        builds = [b for b in self.build_config["build"] if b == target_name]
        assert len(builds) == 1, "Can't find build for config '%s' and target_name '%s'" % (
            self.conf_name, self.target_name)
        return self.build_config["build"][builds[0]]

    def get_src_versions(self):
        build = self.get_build()
        return build["src_version"]

    def get_stats(self):
        build = self.get_build()
        return build["stats"]

    def get_timestamp(self):
        build = self.get_build()
        return build["build_date"]

    def get_build_version(self):
        build = self.get_build()
        return build["build_version"]

    def load_build(self, target_name=None):
        '''Load build info from src_build collection.'''
        target_name = target_name or self.target_name
        src_build = get_src_build()
        self.build_doc = src_build.find_one({'_id': target_name})
        assert self.build_doc, "Can't find build document associated to '%s'" % target_name
        _cfg = self.build_doc.get("build_config")
        if _cfg:
            self.build_config = _cfg
            #if not "doc_type" in _cfg:
            #    raise ValueError("Missing 'doc_type' in build config")
            self.doc_type = _cfg.get("doc_type")
            self.num_shards = _cfg.get("num_shards", 10)  # optional
            self.num_shards = self.num_shards and int(self.num_shards) or self.num_shards
            self.num_replicas = _cfg.get("num_replicas", 0)  # optional
            self.num_replicas = self.num_replicas and int(self.num_replicas) or self.num_replicas
            self.conf_name = _cfg["name"]
        else:
            raise ValueError("Cannot find build config associated to '%s'" % target_name)
        return _cfg
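
# Minimal usage sketch for Indexer (assumptions: a hub job_manager instance exists and a
# build named "mybuild_20230101" is registered in src_build; the ES host is hypothetical).
# Indexer.index() is a coroutine, so it is driven through the event loop:
#
#     idx = Indexer(es_host="localhost:9200")
#     loop = asyncio.get_event_loop()
#     loop.run_until_complete(idx.index("mybuild_20230101", "mybuild_20230101",
#                                       job_manager=job_manager, mode="purge"))
#
# mode="purge" deletes any pre-existing index of that name, mode="resume" only adds
# missing documents, and mode="merge" updates documents already present in the index.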

class ColdHotIndexer(Indexer):
    """
    This indexer works with 2 mongo collections to create a single index.
    - one premerge collection contains "cold" data, which never changes (not updated)
    - another collection contains "hot" data, regularly updated
    Index is created fetching the premerge documents. Then, documents from the hot
    collection are merged by fetching docs from the index, updating them, and putting
    them back in the index.
    """

    def __init__(self, *args, **kwargs):
        super(ColdHotIndexer, self).__init__(*args, **kwargs)
        self.hot_target_name = None
        self.cold_target_name = None
        self.cold_build_doc = None
        self.hot_build_doc = None
        self.cold_cfg = None
        self.hot_cfg = None

    @asyncio.coroutine
    def index(self, hot_name, index_name, job_manager,
              steps=["index", "post"], batch_size=10000, ids=None, mode="index"):
        """
        Same as Indexer.index method but works with a cold/hot collections strategy:
        first index the cold collection then complete the index with hot collection
        (adding docs or merging them in existing docs within the index)
        """
        assert job_manager
        # check what to do
        if type(steps) == str:
            steps = [steps]
        self.hot_target_name = hot_name
        self.setup_log()
        self.load_build()
        if type(index_name) == list:
            # values are coming from target names, use the cold
            self.index_name = self.hot_target_name
        else:
            self.index_name = index_name
        got_error = False
        cnt = 0
        if "index" in steps:
            # selectively index cold then hot collections, using default index method
            # but specifically 'index' step to prevent any post-process before end of
            # index creation
            # Note: copy backend values as there are some references values between cold/hot and build_doc
            hot_backend_url = self.hot_build_doc["backend_url"]
            cold_backend_url = self.cold_build_doc["backend_url"]
            # target collection is taken from backend_url field, temporarily override
            self.build_doc["backend_url"] = cold_backend_url
            cold_task = super(ColdHotIndexer, self).index(self.cold_target_name,
                                                          self.index_name,
                                                          steps="index",
                                                          job_manager=job_manager,
                                                          batch_size=batch_size,
                                                          ids=ids,
                                                          mode=mode)
            # wait until cold is fully indexed
            yield from cold_task
            # use updating indexer worker for hot to merge in index
            # back to hot collection
            self.build_doc["backend_url"] = hot_backend_url
            hot_task = super(ColdHotIndexer, self).index(self.hot_target_name,
                                                         self.index_name,
                                                         steps="index",
                                                         job_manager=job_manager,
                                                         batch_size=batch_size,
                                                         ids=ids,
                                                         mode="merge")
            task = asyncio.ensure_future(hot_task)

            def done(f):
                nonlocal got_error
                nonlocal cnt
                try:
                    res = f.result()
                    # compute overall inserted/updated records
                    cnt = sum(res.values())
                    self.register_status("success", job={"step": "index"}, index={"count": cnt})
                    self.logger.info("index '%s' successfully created", index_name,
                                     extra={"notify": True})
                except Exception as e:
                    logging.exception("failed indexing cold/hot collections:")
                    got_error = e
                    raise

            task.add_done_callback(done)
            yield from task
            if got_error:
                raise got_error
        if "post" in steps:
            # use super index but this time only on hot collection (this is the entry point,
            # cold collection remains hidden from outside)
            hot_task = super(ColdHotIndexer, self).index(self.hot_target_name,
                                                         self.index_name,
                                                         steps="post",
                                                         job_manager=job_manager,
                                                         batch_size=batch_size,
                                                         ids=ids,
                                                         mode=mode)
            task = asyncio.ensure_future(hot_task)

            def posted(f):
                nonlocal got_error
                try:
                    _ = f.result()
                    # no need to process the return value more, it's been done in super
                except Exception as e:
                    self.logger.exception("Post-index process failed for index '%s':",
                                          self.index_name, extra={"notify": True})
                    got_error = e
                    raise

            task.add_done_callback(posted)
            yield from task
            if got_error:
                raise got_error

        return {self.index_name: cnt}

    # by default, build_doc is considered to be the hot one
    # (mainly used so we can call super methods as parent)
    @property
    def build_doc(self):
        return self.hot_build_doc

    @build_doc.setter
    def build_doc(self, val):
        self.hot_build_doc = val

    def get_mapping(self):
        final_mapping = super(ColdHotIndexer, self).get_mapping()
        cold_mapping = self.cold_build_doc.get("mapping", {})
        final_mapping["properties"].update(cold_mapping)  # mix cold & hot
        return final_mapping

    def get_metadata(self):
        meta = merge_src_build_metadata([self.cold_build_doc, self.hot_build_doc])
        return meta

    def get_src_versions(self):
        _meta = self.get_metadata()
        return _meta["src_version"]

    def get_stats(self):
        _meta = self.get_metadata()
        return _meta["stats"]

    def get_timestamp(self):
        _meta = self.get_metadata()
        return _meta["build_date"]

    def get_build_version(self):
        _meta = self.get_metadata()
        return _meta["build_version"]

    def load_build(self):
        """
        Load cold and hot build documents.
        Index settings are the ones declared in the hot build doc.
        """
        src_build = get_src_build()
        # we don't want to reload build docs if they are already loaded
        # so we can temporarily override values when dealing with cold/hot collection
        # (kind of a hack, not really clean, but...)
        if self.hot_build_doc and self.cold_build_doc and self.build_doc:
            self.logger.debug("Build documents already loaded")
            return
        self.hot_build_doc = src_build.find_one({'_id': self.hot_target_name})
        # search the cold collection definition
        assert "build_config" in self.hot_build_doc and \
            "cold_collection" in self.hot_build_doc["build_config"], \
            "Can't find cold_collection field in build_config"
        self.cold_target_name = self.hot_build_doc["build_config"]["cold_collection"]
        self.cold_build_doc = src_build.find_one({'_id': self.cold_target_name})
        # we'll register everything (status) on the hot one
        self.build_doc = self.hot_build_doc
        assert self.cold_build_doc, "Can't find build document associated to '%s'" % self.cold_target_name
        assert self.hot_build_doc, "Can't find build document associated to '%s'" % self.hot_target_name
        self.cold_cfg = self.cold_build_doc.get("build_config")
        self.hot_cfg = self.hot_build_doc.get("build_config")
        if self.hot_cfg and self.cold_cfg:
            self.build_config = self.hot_cfg
            if "doc_type" not in self.hot_cfg:
                raise ValueError("Missing 'doc_type' in build config")
            self.doc_type = self.hot_cfg["doc_type"]
            self.num_shards = self.hot_cfg.get("num_shards", 10)  # optional
            self.num_shards = self.num_shards and int(self.num_shards) or self.num_shards
            self.num_replicas = self.hot_cfg.get("num_replicas", 0)  # optional
            self.num_replicas = self.num_replicas and int(self.num_replicas) or self.num_replicas
            self.conf_name = self.hot_cfg["name"]
        else:
            raise ValueError("Cannot find build config associated to '%s' or '%s'" %
                             (self.hot_target_name, self.cold_target_name))
        return (self.cold_cfg, self.hot_cfg)
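
# Sketch of the hot build's src_build "build_config" expected by ColdHotIndexer.load_build()
# (field names taken from the code above; the collection names are hypothetical):
#
#     {
#         "name": "mybuild",                  # becomes self.conf_name
#         "doc_type": "doc",                  # required
#         "cold_collection": "mybuild_cold",  # premerge/cold target collection
#         "num_shards": 10,                   # optional
#         "num_replicas": 0                   # optional
#     }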

class IndexManager(BaseManager):

    DEFAULT_INDEXER = Indexer

    def __init__(self, *args, **kwargs):
        super(IndexManager, self).__init__(*args, **kwargs)
        self.src_build = get_src_build()
        self.indexers = {}
        self.es_config = {}
        self.t0 = time.time()
        self.prepared = False
        self.log_folder = LOG_FOLDER
        self.timestamp = datetime.now()
        self.setup()

    def clean_stale_status(self):
        src_build = get_src_build()
        for build in src_build.find():
            dirty = False
            for job in build.get("jobs", []):
                if job.get("status") == "indexing":
                    logging.warning("Found stale build '%s', marking index status as 'canceled'",
                                    build["_id"])
                    job["status"] = "canceled"
                    dirty = True
            if dirty:
                src_build.replace_one({"_id": build["_id"]}, build)

    def setup(self):
        self.setup_log()

    def setup_log(self):
        self.logger, self.logfile = get_logger('indexmanager', self.log_folder)

    def get_predicates(self):
        def no_other_indexmanager_step_running(job_manager):
            """IndexManager deals with snapshot, publishing, none of them should run more than one at a time"""
            return len([j for j in job_manager.jobs.values()
                        if j["category"] == INDEXMANAGER_CATEGORY]) == 0
        return [no_other_indexmanager_step_running]

    def get_pinfo(self):
        """
        Return dict containing information about the current process
        (used to report in the hub)
        """
        pinfo = {
            "category": INDEXMANAGER_CATEGORY,
            "source": "",
            "step": "",
            "description": ""
        }
        preds = self.get_predicates()
        if preds:
            pinfo["__predicates__"] = preds
        return pinfo

    def __getitem__(self, conf_name):
        """
        Return an instance of an indexer for the build configuration named 'conf_name'
        Note: each call returns a different instance (factory call behind the scene...)
        """
        kwargs = BaseManager.__getitem__(self, conf_name)
        return kwargs

    def configure_from_list(self, indexers_kwargs):
        for dindex in indexers_kwargs:
            assert len(dindex) == 1, "Invalid indexer registration data: %s" % dindex
            env, idxkwargs = list(dindex.items())[0]
            self.register[env] = idxkwargs

    def configure_from_dict(self, confdict):
        self.es_config = copy.deepcopy(confdict)
        self.indexers.update(confdict.get("indexer_select", {}))
        indexers_kwargs = []
        for env, conf in confdict["env"].items():
            idxkwargs = dict(**conf["indexer"]["args"])
            # propagate ES host to indexer's kwargs
            idxkwargs["es_host"] = self.es_config["env"][env]["host"]
            indexers_kwargs.append({env: idxkwargs})
        self.configure_from_list(indexers_kwargs)

    def configure(self, indexer_defs):
        """
        Register indexers with:
        - a list of dicts as: [{"indexer_type_name": partial}, {....}]
        - a dict containing all indexer definitions:
          {"env": {
              "env1": {
                  "host": "localhost:9200",
                  "timeout": ..., "retry": ...,
                  "indexer": "path.to.ClassIndexer",
              },
              ...
          }}
        Partial is used to instantiate an indexer, without args
        """
        if type(indexer_defs) == list:
            self.configure_from_list(indexer_defs)
        elif type(indexer_defs) == dict:
            self.configure_from_dict(indexer_defs)
        else:
            raise ValueError("Unknown indexer definitions type (expecting a list or a dict)")
        self.logger.info(self.indexers)
        self.logger.info(self.register)

    def find_indexer(self, target_name):
        """
        Return indexer class required to index target_name.
        Rules depend on what's inside the corresponding src_build doc
        and the indexers definitions
        """
        doc = self.src_build.find_one({"_id": target_name})
        if not self.indexers or not doc:
            return self.__class__.DEFAULT_INDEXER
        klass = None
        for path_in_doc in self.indexers:
            if klass is None and (path_in_doc is None or path_in_doc == "default" or path_in_doc == ""):
                # couldn't find a klass yet and we found a default declared, keep it
                strklass = self.indexers[path_in_doc]
                klass = get_class_from_classpath(strklass)
            else:
                try:
                    strklass = self.indexers[path_in_doc]
                    klass = get_class_from_classpath(strklass)
                    self.logger.info("Found special indexer '%s' required to index '%s'",
                                     klass, target_name)
                    # the first to match wins
                    break
                except KeyError:
                    pass
        if klass is None:
            self.logger.debug("Using default indexer")
            return self.__class__.DEFAULT_INDEXER
        else:
            # either we return a default declared in config or
            # a specific one found according to the doc
            self.logger.debug("Using custom indexer %s", klass)
            return klass

    def index(self, indexer_env, target_name=None, index_name=None, ids=None, **kwargs):
        """
        Trigger an index creation to index the collection target_name and create an
        index named index_name (or target_name if None). Optional list of IDs can be
        passed to index specific documents.
        """
        def indexed(f):
            try:
                res = f.result()
                self.logger.info("Done indexing target '%s' to index '%s': %s",
                                 target_name, index_name, res)
            except Exception:
                self.logger.exception("Error while running index job:")
                raise

        idxklass = self.find_indexer(target_name)
        idxkwargs = self[indexer_env]
        idx = idxklass(**idxkwargs)
        idx.env = indexer_env
        idx.target_name = target_name
        index_name = index_name or target_name
        job = idx.index(target_name, index_name, ids=ids, job_manager=self.job_manager, **kwargs)
        job = asyncio.ensure_future(job)
        job.add_done_callback(indexed)
        return job

    def update_metadata(self, indexer_env, index_name, build_name=None, _meta=None):
        """
        Update _meta for index_name, based on build_name (_meta directly
        taken from the src_build document) or _meta
        """
        idxkwargs = self[indexer_env]
        # 1st pass we get the doc_type (don't want to ask that on the signature...)
        indexer = create_backend((idxkwargs["es_host"], index_name, None)).target_esidxer
        m = indexer._es.indices.get_mapping(index_name)
        assert len(m[index_name]["mappings"]) == 1, "Found more than one doc_type: " + \
            "%s" % m[index_name]["mappings"].keys()
        doc_type = list(m[index_name]["mappings"].keys())[0]
        # 2nd pass to re-create correct indexer
        indexer = create_backend((idxkwargs["es_host"], index_name, doc_type)).target_esidxer
        if build_name:
            build = get_src_build().find_one({"_id": build_name})
            assert build, "No such build named '%s'" % build_name
            _meta = build.get("_meta")
        assert _meta is not None, "No _meta found"
        return indexer.update_mapping_meta({"_meta": _meta})

    def index_info(self, env=None, remote=False):
        res = copy.deepcopy(self.es_config)
        for kenv in self.es_config["env"]:
            if env and env != kenv:
                continue
            if remote:
                # list all indices, remotely
                try:
                    cl = Elasticsearch(res["env"][kenv]["host"], timeout=1, max_retries=0)
                    indices = [{
                        "index": k,
                        "doc_type": list(v["mappings"].keys())[0],
                        "aliases": list(v["aliases"].keys())
                    } for k, v in cl.indices.get("*").items()]
                    # init index key if not done in config var
                    # (a default index name can be specified in config...)
                    if "index" not in res["env"][kenv]:
                        res["env"][kenv]["index"] = []
                    # for now, we just consider
                    if type(res["env"][kenv]["index"]) == dict:
                        # we don't know where to put those indices because we don't
                        # have that information, so we just put those in a default category
                        # TODO: put that info in metadata ?
                        res["env"][kenv]["index"].setdefault(None, []).extend(indices)
                    else:
                        assert type(res["env"][kenv]["index"]) == list
                        res["env"][kenv]["index"].extend(indices)
                except Exception:
                    self.logger.exception("Can't load remote indices:")
                    continue
        return res

    def validate_mapping(self, mapping, env):
        idxkwargs = self[env]
        # just get the default indexer (target_name doesn't exist, return default one)
        idxklass = self.find_indexer(target_name="__placeholder_name__%s" % get_random_string())
        idxr_obj = idxklass(**idxkwargs)
        settings = idxr_obj.get_index_creation_settings()
        # generate a random index, it'll be deleted at the end
        index_name = ("hub_tmp_%s" % get_random_string()).lower()
        idxr = ESIndexer(index=index_name, es_host=idxr_obj.host, doc_type=None)
        self.logger.info("Testing mapping by creating index '%s' on host '%s' (settings: %s)",
                         index_name, idxr_obj.host, settings)
        try:
            res = idxr.create_index(mapping, settings)
            return res
        except Exception as e:
            self.logger.exception("create_index failed")
            raise e
        finally:
            try:
                idxr.delete_index()
            except Exception:
                pass

class DynamicIndexerFactory(object):
    """
    In the context of autohub/standalone instances, create indexers with parameters
    taken from the versions.json URL. A list of URLs is provided so the factory knows
    how to create these indexers for each URL. There's no way to "guess" an ES host
    from a URL, so this parameter must be specified as well, common to all URLs.
    The "suffix" param is added at the end of index names.
    """

    def __init__(self, urls, es_host, suffix="_current"):
        self.urls = urls
        self.es_host = es_host
        self.bynames = {}
        for url in urls:
            if isinstance(url, dict):
                name = url["name"]
                # actual_url = url["url"]
            else:
                name = os.path.basename(os.path.dirname(url))
                # actual_url = url
            self.bynames[name] = {
                "es_host": self.es_host,
                "index": name + suffix
            }

    def create(self, name):
        conf = self.bynames[name]
        pidxr = partial(ESIndexer,
                        index=conf["index"],
                        doc_type=None,
                        es_host=conf["es_host"])
        conf = {"es_host": conf["es_host"], "index": conf["index"]}
        return pidxr, conf
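
# Usage sketch for DynamicIndexerFactory (the URL and host are hypothetical): each URL's
# parent directory name becomes the index base name, with "suffix" appended.
#
#     factory = DynamicIndexerFactory(
#         ["https://example.org/releases/mygene/versions.json"],
#         es_host="localhost:9200")
#     pidxr, conf = factory.create("mygene")
#     # conf == {"es_host": "localhost:9200", "index": "mygene_current"}
#     es_idxr = pidxr()   # instantiates an ESIndexer bound to index "mygene_current"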