Source code for biothings.hub.dataindex.snapshooter

Elasticsearch Snapshot Feature

Snapshot Config Example:
    "cloud": {
        "type": "aws",  # default, only one supported by now
        "access_key": None,
        "secret_key": None,
    "repository": {
        "name": "s3-$(Y)",
        "type": "s3",
        "settings": {
            "bucket": "<SNAPSHOT_BUCKET_NAME>",
            "base_path": "$(Y)",  # per year
            "region": "us-west-2",
        "acl": "private",
    #----------------------------- inferred from build doc from now on
    "indexer": {
        # reference to INDEX_CONFIG
        "env": "local",
    # when creating a snapshot, how long should we wait before querying ES
    # to check snapshot status/completion ? (in seconds)
    "monitor_delay": 60 * 5,

SnapshotManager => SnapshotEnvConfig(s)
SnapshotEnvConfig + (Build) -> SnapshotEnv
SnapshotEnv + Index + Snapshot -> SnapshotTaskEnv

import asyncio
import copy
import json
from functools import partial
from pprint import pformat
import time
import as aws
    from biothings import config as btconfig
    from config import logger as logging
except ImportError:
    import sys
    sys.path.insert(1, '/home/biothings/')
    import config
    from biothings import config_for_app
    from biothings import config as btconfig
    from config import logger as logging

from biothings.hub.databuild.buildconfig import AutoBuildConfig
from biothings.hub.datarelease import set_pending_to_release_note
from import ESIndexer
from import IndexerException as ESIndexerException
from biothings.utils.hub import template_out
from biothings.utils.hub_db import get_src_build
from biothings.utils.loggers import get_logger
from biothings.utils.manager import BaseManager, BaseStatusRegisterer

[docs]class BuildSpecificEnv(dict): """ Snapshot Env % Build """ def __init__(self, env_conf, build_doc): super().__init__(self._template_out_config(env_conf, build_doc)) def _template_out_config(self, env_conf, build_doc): """ Template out for special value using build_doc Templated values can look like: "base_path" : "onefolder/%(_meta.build_version)s" where "_meta.build_version" value is taken from build_doc dictionary (dot field notation). In other words, such repo config are dynamic and potentially change for each index/snapshot created. """ try: strconf = template_out(json.dumps(env_conf), build_doc) return json.loads(strconf) except Exception as exc: logging.exception("Coudn't template out configuration: %s", exc) raise
class SnapshotTaskStatusRegister(BaseStatusRegisterer): STATUS_NAME = { "pre": "pre-snapshot", "snapshot": "snapshot", "post": "post-snapshot" } def __init__(self, index_name, snapshot_name, env_conf, build_doc): self.index_name = index_name self.snapshot_name = snapshot_name self.build_doc = build_doc self.env_conf = env_conf self.logger, self.logfile = get_logger( SNAPSHOOTER_CATEGORY, btconfig.LOG_FOLDER) @property # override def collection(self): return get_src_build() def reset_repository_info(self, snapshot_name): bdoc = self.build_doc if bdoc.get("snapshot", {}).get(snapshot_name): if bdoc["snapshot"][snapshot_name].pop("repository", None): # override def register_status( self, status, job_info, snapshot_info, transient=False, init=False, **extra): super().register_status( self.build_doc, "snapshot", status, transient=transient, init=init, job=job_info, snapshot={self.snapshot_name: snapshot_info}, **extra ) def register_start(self, step): return self.register_status( status='in progress', transient=True, init=True, job_info={"step": self.STATUS_NAME[step]}, snapshot_info={} ) def register_result(self, step, success, res): params = { 'status': 'success' if success else 'failed', 'job_info': { "step": self.STATUS_NAME[step], }, 'snapshot_info': { "conf": self.env_conf } } if success: params['job_info']['result'] = res params['snapshot_info'][step] = res else: params['job_info']['err'] = res params['snapshot_info'][step] = None return self.register_status(**params)
[docs]class SnapshotTaskProcessInfo(): """ Generate information about the current process. (used to report in the hub) """ def __init__(self, task_env): self.source = task_env.index_name self.category = SNAPSHOOTER_CATEGORY, self.predicates = self.get_predicates() self.es_host = task_env.indexer.es_host def get_predicates(self): return [] def get_pinfo(self, step, description): pinfo = { "category": self.category, "source": self.source, "step": step, "description": description } if self.predicates: pinfo["__predicates__"] = self.predicates return pinfo def get_pinfo_for(self, step): step_info = { "pre": ("pre-snapshot", None), "snapshot": ("snapshot", self.es_host), "post": ("post-snapshot", None) } return self.get_pinfo(*step_info[step])
class SnapshotTaskESStatusInterpreter(): def __init__(self, res, task_env): assert "snapshots" in res,\ "Can't find snapshot '%s' in repository '%s'" % \ (task_env.snapshot_name, task_env.repo) # assuming only one index in the snapshot, # so only check first element info = res["snapshots"][0] assert "state" in info, \ "Can't find state in snapshot '%s'" \ % task_env.snapshot_name = info self.state = info["state"] self.failed_shards = info["shards_stats"]["failed"] def is_running(self): return self.state in ("INIT", "IN_PROGRESS", "STARTED") def is_succeed(self): return self.state == "SUCCESS" and self.failed_shards == 0 def is_partially_succeed(self): return self.state == "SUCCESS" and self.failed_shards != 0
[docs]class SnapshotTaskEnv(): """ SnapshotEnv + Index Name + Snapshot Name Access to status register and ES indexer. """ def __init__(self, env, index, snapshot): self.env = env self.index_name = index self.snapshot_name = snapshot self.repo = env.env_config['repository']['name'] self.status = SnapshotTaskStatusRegister( index, self.snapshot_name, env.env_config, env.build_doc ) self.indexer = ESIndexer( index=index, doc_type=env.build_doc['index'][index]['doc_type'], es_host=env.build_doc['index'][index]['host'], check_index=index is not None ) def snapshot(self): return self.indexer.snapshot(self.repo, self.snapshot_name)
[docs] def wait_for_error(self): """ Blocking, execute in a thread. """ while True: info = self.indexer.get_snapshot_status(self.repo, self.snapshot_name) status = SnapshotTaskESStatusInterpreter(info, self) if status.is_running(): time.sleep(self.env.env_config.get('monitor_delay', 60)) elif status.is_succeed(): "Snapshot '%s' successfully created (host: '%s', repository: '%s')" % (self.snapshot_name, self.indexer.es_host, self.repo), extra={"notify": True} ) return None else: # failed if status.is_partially_succeed(): e = ESIndexerException( "Snapshot '%s' partially failed: state is %s but %s shards failed" % (self.snapshot_name, status.state, status.failed_shards)) else: e = ESIndexerException( "Snapshot '%s' failed: state is %s" % (self.snapshot_name, status.state)) logging.error( "Failed creating snapshot '%s' (host: %s, repository: %s), state: %s" % (self.snapshot_name, self.indexer.es_host, self.repo, status.state), extra={"notify": True}) return e
[docs]class SnapshotEnv(): """ Corresponds to an ES repository for a specific build. The repository type can be what are supported by ES. """ def __init__(self, job_manager, env_config, build_doc): self.job_manager = job_manager self.env_config = env_config # already build specific self.build_doc = build_doc
[docs] def snapshot(self, index, snapshot=None, steps=("pre", "snapshot", "post")): """ Make a snapshot of 'index', with name 'snapshot' in this env. Typically used to make a snapshot of an index created by the hub. """ # process params if isinstance(steps, str): steps = (steps,) snapshot = snapshot or index # create a task env, so we have access to indexers, etc. task_env = SnapshotTaskEnv(self, index, snapshot) @asyncio.coroutine def do(): pinfo = SnapshotTaskProcessInfo(task_env) # we only allow one repo conf per snapshot name task_env.status.reset_repository_info(snapshot) funcs = { 'pre': partial(self.pre_snapshot, task_env), 'snapshot': partial(self._snapshot, task_env), 'post': partial(self.post_snapshot, task_env) } for step in ('pre', 'snapshot', 'post'): if step in steps: task_env.status.register_start(step) job = yield from self.job_manager.defer_to_thread( pinfo.get_pinfo_for(step), funcs[step]) try: result = yield from job except Exception as e: task_env.status.register_result(step, False, str(e)) logging.error("Error running %s.", step) raise else: task_env.status.register_result(step, True, result)"Step %s done: %s.", step, result) set_pending_to_release_note(self.build_doc['_id']) # TODO: conditional return asyncio.ensure_future(do())
def _snapshot(self, task_env): task_env.snapshot()"Snapshot successfully launched.") error = task_env.wait_for_error() if error: raise RuntimeError("Snapshot failed: %s" % error) return "created" def pre_snapshot(self, task_env): pass def post_snapshot(self, task_env): pass
[docs]class SnapshotFSEnv(SnapshotEnv): def __init__(self, job_manager, env_config, build_doc): super().__init__(job_manager, env_config, build_doc) assert env_config['repository']['type'] == 'fs' raise NotImplementedError
[docs]class SnapshotS3Env(SnapshotEnv): """ Relevent Config Entries: { "cloud": { "type": "aws", # default, only one supported by now "access_key": None, "secret_key": None, }, "repository": { "name": "s3-$(Y)", "type": "s3", "settings": { "bucket": "<SNAPSHOT_BUCKET_NAME>", "base_path": "$(Y)", # per year "region": "us-west-2", }, "acl": "private", } } """ def __init__(self, job_manager, env_config, build_doc): super().__init__(job_manager, env_config, build_doc) assert env_config['repository']['type'] == 's3'
[docs] def pre_snapshot(self, task_env): """ Ensure the destination repository is ready. Create the bucket and repository if necessary. """ cloud = dict(self.env_config.get("cloud", {})) repository = dict(self.env_config.get("repository", {})) try: # check if already created task_env.indexer.get_repository(repository["name"]) except ESIndexerException: # first make sure bucket exists aws.create_bucket( name=repository["settings"]["bucket"], region=repository["settings"]["region"], aws_key=cloud.get("access_key"), aws_secret=cloud.get("secret_key"), acl=repository.get("acl", None), # let aws.create_bucket default it ignore_already_exists=True )"Create repository:\n%s" % pformat(repository)) task_env.indexer.create_repository(repository.pop("name"), repository) return "repo_ready"
[docs]class SnapshotEnvConfig(): """ Snapshot Env before Combining with Build Info. """ def __init__(self, name, env_class, env_config): = name self.env_class = env_class self.env_config = env_config
[docs] def get_env(self, job_manager, build_doc=None): """ Get build specific snapshot environment. """ env = BuildSpecificEnv(self.env_config, build_doc or {}) return self.env_class(job_manager, env, build_doc or {})
class SnapshotManager(BaseManager): SNAPSHOT_ENV = { "s3": SnapshotS3Env, "fs": SnapshotFSEnv } def __init__(self, index_manager, *args, **kwargs): super().__init__(*args, **kwargs) self.index_manager = index_manager self.snapshot_config = {} # user specified config @staticmethod def pending_snapshot(build_name): src_build = get_src_build() src_build.update({"_id": build_name}, {"$addToSet": {"pending": "snapshot"}}) @staticmethod def get_build_doc(index_name): src_build = get_src_build() doc = src_build.find_one({"index." + index_name: {"$exists": True}}) if not doc: logging.error("No build associated with index %s.", index_name) return doc # override def clean_stale_status(self): src_build = get_src_build() for build in src_build.find(): for job in build.get("jobs", []): if job.get("status", "") == "in progress": logging.warning( "Found stale build '%s', marking snapshot status as 'canceled'" % build["_id"]) job["status"] = "canceled" src_build.replace_one({"_id": build["_id"]}, build) # override def poll(self, state, func): super().poll(state, func, col=get_src_build()) def configure(self, snapshot_confdict): """ Configure manager with snapshot config dict. See SNAPSHOT_CONFIG in for the format. """ self.snapshot_config = copy.deepcopy(snapshot_confdict) for env, envconf in self.snapshot_config.get("env", {}).items(): if envconf.get("cloud"): assert envconf["cloud"]["type"] == "aws", \ "Only Amazon AWS cloud is supported at the moment" repo_type = envconf.get("repository", {}).get("type") if not repo_type: raise ValueError("Repository type not specified.") if repo_type not in self.SNAPSHOT_ENV.keys(): raise ValueError("Unsupported repository type %s.", repo_type) try: self.register[env] = SnapshotEnvConfig( name=env, env_class=self.SNAPSHOT_ENV[repo_type], env_config=envconf ) except Exception as e: logging.exception( "Couldn't setup snapshot environment '%s' because: %s" % (env, e)) def snapshot(self, env, index, snapshot=None, steps=("pre", "snapshot", "post")): """ Create a snapshot named "snapshot" (or, by default, same name as the index) from "index" according to environment definition (repository, etc...) "env". """ if env not in self.register: raise ValueError("Unknown snapshot environment '%s'." % env) build_doc = self.get_build_doc(index) if not build_doc: logging.warning("The index is not created by the hub.") env_for_build = self[env].get_env(self.job_manager, build_doc) return env_for_build.snapshot(index, snapshot=snapshot, steps=steps) def snapshot_build(self, build_doc): """ Create a snapshot basing on the autobuild settings in the build config. If the build config associated with this build has: { "autobuild": { "type": "snapshot", // implied when env is set. env must be set. "env": "local" // which es env to make the snapshot. }, ... } Attempt to make a snapshot for this build on the specified es env "local". """ @asyncio.coroutine def _(): autoconf = AutoBuildConfig(build_doc['build_config']) env = autoconf.auto_build.get('env') assert env, "Unknown autobuild env." try: latest_index = list(build_doc['index'].keys())[-1] except Exception:"No index already created, now create one.") yield from self.index_manager.index(env, build_doc['_id']) latest_index = build_doc['_id'] return self.snapshot(env, latest_index) return asyncio.ensure_future(_()) def snapshot_info(self, env=None, remote=False): return copy.deepcopy(self.snapshot_config) def test(): from biothings.utils.manager import JobManager from biothings.hub.dataindex.indexer import IndexManager loop = asyncio.get_event_loop() job_manager = JobManager(loop) index_manager = IndexManager(job_manager=job_manager) index_manager.configure(config.INDEX_CONFIG) snapshot_manager = SnapshotManager( index_manager=index_manager, job_manager=job_manager, poll_schedule="* * * * * */10" ) snapshot_manager.configure(config.SNAPSHOT_CONFIG) # snapshot_manager.poll("snapshot",snapshot_manager.snapshot_build) async def test_code(): snapshot_manager.snapshot('prod', 'mynews_202009170234_fjvg7skx', steps="post") asyncio.ensure_future(test_code()) loop.run_forever() if __name__ == '__main__': test()