Source code for biothings.hub.dataindex.snapshooter

import asyncio
import json
import os
import time
from collections import UserDict, UserString
from dataclasses import dataclass
from datetime import datetime
from functools import partial

import boto3
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError, TransportError

from biothings import config as btconfig
from biothings.hub import SNAPSHOOTER_CATEGORY
from biothings.hub.databuild.buildconfig import AutoBuildConfig
from biothings.hub.datarelease import set_pending_to_release_note
from biothings.hub.manager import BaseManager
from biothings.utils.common import merge
from biothings.utils.exceptions import RepositoryVerificationFailed
from biothings.utils.hub import template_out
from biothings.utils.hub_db import get_src_build
from biothings.utils.loggers import get_logger
from config import logger as logging

from . import snapshot_cleanup as cleaner, snapshot_registrar as registrar
from .snapshot_repo import Repository
from .snapshot_task import Snapshot


class ProcessInfo:
    """
    Process information for snapshot jobs, as reported to the JobManager
    and shown in Biothings Studio.
    """

    def __init__(self, env):
        # Snapshot environment name; reported as the job "source".
        self.env_name = env

    def get_predicates(self):
        """Return the job predicates. Snapshot jobs declare none."""
        return []

    def get_pinfo(self, step, snapshot, description=""):
        """
        Build the process-info mapping handed to the job manager.

        :param step: snapshot step name (this module uses "pre", "snapshot", "post")
        :param snapshot: snapshot name, combined with ``step`` as "<step>:<snapshot>"
        :param description: optional free-text description
        :return: dict with predicates, category, step, description and source
        """
        return dict(
            __predicates__=self.get_predicates(),
            category=SNAPSHOOTER_CATEGORY,
            step="{}:{}".format(step, snapshot),
            description=description,
            source=self.env_name,
        )
@dataclass
class CloudStorage:
    """Credentials and location of the cloud storage behind a snapshot repository."""

    type: str  # storage provider; only "aws" is supported by get()
    access_key: str
    secret_key: str
    region: str = "us-west-2"

    def get(self):
        """
        Return a storage handle for this configuration.

        For ``type == "aws"`` this is a boto3 S3 service resource built from
        the configured credentials and region.

        :raises ValueError: for any other storage type (the type is the argument).
        """
        if self.type != "aws":
            raise ValueError(self.type)
        session = boto3.Session(
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            region_name=self.region,
        )
        return session.resource("s3")
class Bucket:
    """
    Thin wrapper around a single S3 bucket backing a snapshot repository.
    """

    def __init__(self, client, bucket, region=None):
        # ``client`` is a boto3 S3 *service resource* (CloudStorage.get() returns
        # ``session.resource("s3")``); it must expose ``.Bucket`` and ``.create_bucket``.
        self.client = client
        self.bucket = bucket  # bucket name
        self.region = region  # region to create the bucket in, if known

    def exists(self):
        """Return True when the bucket reports a creation date."""
        bucket = self.client.Bucket(self.bucket)
        return bool(bucket.creation_date)

    def create(self, acl="private"):
        """
        Create the bucket.

        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket

        :param acl: canned ACL to apply. ``None`` (e.g. a repository config without
            an "acl" key) omits the parameter instead of sending ``ACL=None``,
            which boto3 rejects as an invalid parameter type.
        :return: whatever ``client.create_bucket`` returns.
        """
        params = {"Bucket": self.bucket}
        if acl is not None:
            params["ACL"] = acl
        # S3 rejects an explicit "us-east-1" LocationConstraint (that region is the
        # default when none is given) and a null one fails validation, so only send
        # the configuration for other, known regions.
        if self.region and self.region != "us-east-1":
            params["CreateBucketConfiguration"] = {"LocationConstraint": self.region}
        return self.client.create_bucket(**params)

    def __str__(self):
        # NOTE: calls exists(), i.e. performs a request against the storage.
        return f"<Bucket {'READY' if self.exists() else 'MISSING'} name='{self.bucket}' client={self.client}>"
class _UserString(UserString): def __str__(self): return f"{type(self).__name__}({self.data})"
class TemplateStr(_UserString):
    """JSON-serialized repository config before template substitution."""
class RenderedStr(_UserString):
    """JSON-serialized repository config after template substitution."""
class RepositoryConfig(UserDict):
    """
    Elasticsearch snapshot repository configuration, for example::

        {
            "type": "s3",
            "name": "s3-$(Y)",
            "settings": {
                "bucket": "<SNAPSHOT_BUCKET_NAME>",
                "base_path": "mynews.info/$(Y)",  # per year
                "region": "<AWS_REGION>",  # read by the ``region`` property
            }
        }
    """

    @property
    def repo(self):
        """Repository name (the "name" key)."""
        return self["name"]

    @property
    def bucket(self):
        """Bucket name, from ``settings.bucket``."""
        return self["settings"]["bucket"]

    @property
    def region(self):
        """Bucket region, from ``settings.region`` (KeyError when absent)."""
        return self["settings"]["region"]

    def format(self, doc=None):
        """
        Return a new RepositoryConfig with template values substituted.

        For example::

            {
                "bucket": "backup-$(Y)",
                "base_path" : "snapshots/%(_meta.build_version)s"
            }

        "_meta.build_version" is looked up in ``doc`` using dot-field
        notation, and "$(Y)" becomes the current year.

        :param doc: document supplying "%(...)s" values (an empty dict if omitted)
        :raises ValueError: if a "%" remains after rendering (unresolved template).
        """
        template = TemplateStr(json.dumps(self.data))
        rendered = RenderedStr(template_out(template.data, doc or {}))

        if "%" in rendered:
            logging.error(template)
            logging.error(rendered)
            raise ValueError("Failed to template.")

        if template != rendered:
            logging.debug(template)
            logging.debug(rendered)

        return RepositoryConfig(json.loads(rendered.data))
class _SnapshotResult(UserDict): def __str__(self): return f"{type(self).__name__}({str(self.data)})"
class CumulativeResult(_SnapshotResult):
    """Results of all snapshot steps run so far, merged together."""
class StepResult(_SnapshotResult):
    """Result returned by one snapshot step (pre / snapshot / post)."""
class SnapshotEnv:
    """
    A snapshot environment.

    It combines a cloud storage account, a (templated) Elasticsearch snapshot
    repository configuration, and the Elasticsearch cluster of an indexer
    environment whose indices get snapshotted.
    """

    def __init__(self, job_manager, cloud, repository, indexer, **kwargs):
        self.job_manager = job_manager
        self.cloud = CloudStorage(**cloud).get()  # storage handle (boto3 S3 resource for "aws")
        self.repcfg = RepositoryConfig(repository)  # still contains template markers
        self.client = Elasticsearch(**indexer["args"])
        self.name = kwargs["name"]  # snapshot env
        self.idxenv = indexer["name"]  # indexer env
        self.pinfo = ProcessInfo(self.name)
        self.wtime = kwargs.get("monitor_delay", 15)  # seconds between snapshot state polls

    def _doc(self, index):
        """
        Return the src_build document that references ``index`` in this indexer env.

        :raises ValueError: if no build document references the index.
        """
        doc = get_src_build().find_one({f"index.{index}.environment": self.idxenv})
        if not doc:  # not asso. with a build
            raise ValueError("Not a hub-managed index.")
        return doc  # TODO UNIQUENESS (find_one returns only one of possibly several matches)

    def setup_log(self, index):
        """Create ``self.logger`` for snapshots of ``index``, under the build's log folder."""
        build_doc = self._doc(index)
        log_name = build_doc.get("target_name") or build_doc["_id"]
        log_folder = os.path.join(btconfig.LOG_FOLDER, "build", log_name, "snapshot") if btconfig.LOG_FOLDER else None
        self.logger, _ = get_logger(index, log_folder=log_folder, force=True)

    def snapshot(self, index, snapshot=None, recreate_repo=False):
        """
        Snapshot ``index`` by running the "pre", "snapshot" and "post" steps.

        Each step runs in a job-manager thread, and its state is recorded
        through the registrar.

        :param index: name of the index to snapshot
        :param snapshot: snapshot name; defaults to the index name
        :param recreate_repo: delete and recreate the repository first
        :return: an asyncio future resolving to the CumulativeResult of all steps
        """
        self.setup_log(index)

        async def _snapshot(snapshot):
            x = CumulativeResult()
            build_doc = self._doc(index)
            cfg = self.repcfg.format(build_doc)

            for step in ("pre", "snapshot", "post"):
                state = registrar.dispatch(step)  # _TaskState Class
                state = state(get_src_build(), build_doc.get("_id"))
                self.logger.info(state)
                state.started()

                # state.func names the method implementing this step.
                job = await self.job_manager.defer_to_thread(
                    self.pinfo.get_pinfo(step, snapshot),
                    partial(getattr(self, state.func), cfg, index, snapshot, recreate_repo=recreate_repo),
                )
                try:
                    dx = await job
                    dx = StepResult(dx)
                except RepositoryVerificationFailed as ex:
                    self.logger.exception(ex)
                    state.failed(snapshot, detail=ex.args)
                    raise
                except Exception as exc:
                    self.logger.exception(exc)
                    state.failed({}, exc)
                    raise
                else:
                    merge(x.data, dx.data)
                    self.logger.info(dx)
                    self.logger.info(x)
                    state.succeed({snapshot: x.data}, res=dx.data)
            return x

        future = asyncio.ensure_future(_snapshot(snapshot or index))
        future.add_done_callback(self.logger.debug)
        return future

    @staticmethod
    def _verification_failure_detail(tex):
        """
        Summarize a TransportError raised by repository verification.

        ``tex.info`` is expected to be the decoded response body, a dict with
        an "error" entry. Any other shape (e.g. raw text) is passed through
        unchanged. This avoids raising a TypeError/KeyError that would hide
        the real verification failure.
        """
        info = getattr(tex, "info", None)
        if isinstance(info, dict) and "error" in info:
            detail = info["error"]
        else:
            detail = info
        return {"error": getattr(tex, "error", None), "detail": detail}

    def pre_snapshot(self, cfg, index, snapshot, **kwargs):
        """
        Make sure the repository (and its backing bucket) exist and verify it.

        :raises RepositoryVerificationFailed: if Elasticsearch fails to verify the repository.
        :return: step result recording the repository config and environments.
        """
        bucket = Bucket(self.cloud, cfg.bucket, region=cfg.region)
        repo = Repository(self.client, cfg.repo)
        self.logger.info(bucket)
        self.logger.info(repo)

        if kwargs.get("recreate_repo"):
            self.logger.info("Delete old repository")
            repo.delete()

        if not repo.exists():
            if not bucket.exists():
                # Without an "acl" key cfg.get() returns None, which S3 rejects as an
                # ACL value; fall back to Bucket.create's own default instead.
                bucket.create(cfg.get("acl") or "private")
                self.logger.info(bucket)
            repo.create(**cfg)
            self.logger.info(repo)

        try:
            repo.verify(config=cfg)
        except TransportError as tex:
            raise RepositoryVerificationFailed(self._verification_failure_detail(tex)) from tex

        return {
            "__REPLACE__": True,
            "conf": {"repository": cfg.data},
            "indexer_env": self.idxenv,
            "environment": self.name,
        }

    def _snapshot(self, cfg, index, snapshot, **kwargs):
        """
        Create the snapshot, replacing an existing one with the same name,
        and block until it reaches a terminal state.

        :raises ValueError: when the final state is anything other than "SUCCESS".
        """
        snapshot = Snapshot(self.client, cfg.repo, snapshot)
        self.logger.info(snapshot)

        _replace = False
        if snapshot.exists():
            snapshot.delete()
            self.logger.info(snapshot)
            _replace = True

        # ------------------ #
        snapshot.create(index)
        # ------------------ #

        while True:
            self.logger.info(snapshot)
            state = snapshot.state()
            if state == "FAILED":
                raise ValueError(state)
            elif state == "IN_PROGRESS":
                time.sleep(self.wtime)
            elif state == "SUCCESS":
                break
            else:  # PARTIAL/MISSING/N/A
                raise ValueError(state)

        return {
            "index_name": index,
            "replaced": _replace,
            "created_at": datetime.now().astimezone(),
        }

    def post_snapshot(self, cfg, index, snapshot, **kwargs):
        """Mark the owning build as pending for release notes."""
        build_id = self._doc(index)["_id"]
        set_pending_to_release_note(build_id)
        return {}

    def snapshot_exists(self, snapshot_name, build_doc):
        """
        Return whether ``snapshot_name`` exists in this environment's repository.

        The repository name is rendered from ``build_doc``. A NotFoundError
        counts as "does not exist"; any other error is logged and re-raised.
        """
        cfg = self.repcfg.format(build_doc)
        snapshot = Snapshot(self.client, cfg.repo, snapshot_name)
        try:
            return snapshot.exists()
        except NotFoundError:
            return False
        except Exception as e:
            logging.exception(f"Error checking if snapshot '{snapshot_name}' exists: {e}")
            raise
class SnapshotManager(BaseManager):
    """
    Hub ES Snapshot Management

    Config Ex:

        # env.<name>:
        {
            "cloud": {
                "type": "aws",  # default, only one supported.
                "access_key": <------------------>,
                "secret_key": <------------------>,
                "region": "us-west-2"
            },
            "repository": {
                "name": "s3-$(Y)",
                "type": "s3",
                "settings": {
                    "bucket": "<SNAPSHOT_BUCKET_NAME>",
                    "base_path": "mygene.info/$(Y)",  # year
                    "region": "us-west-2",  # required: read when preparing the bucket
                },
                "acl": "private",
            },
            "indexer": {
                "name": "local",
                "args": {
                    "request_timeout": 100,
                    "max_retries": 5
                }
            },
            "monitor_delay": 15,
        }
    """

    def __init__(self, index_manager, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.index_manager = index_manager
        self.snapshot_config = {}

    @staticmethod
    def pending_snapshot(build_name):
        """Add "snapshot" to the pending actions of build ``build_name``."""
        src_build = get_src_build()
        # update_one (as in delete_snapshot_from_db) rather than the legacy
        # ``update``, which pymongo 4 removed; both touch a single document.
        src_build.update_one(
            {"_id": build_name},
            {"$addToSet": {"pending": "snapshot"}},
        )

    # Object Lifecycle Calls
    # --------------------------
    # manager = SnapshotManager(index_manager, job_manager)
    # manager.clean_stale_status()  # in __init__
    # manager.configure(config)

    def clean_stale_status(self):
        registrar.audit(get_src_build(), logging)

    def configure(self, conf):
        """
        Register one SnapshotEnv per entry of ``conf["env"]``.

        Each env's "indexer" entry may be an indexer name or a dict. Either
        way it is merged over a copy of the matching IndexManager
        configuration. NOTE: ``conf`` is modified in place and kept as
        ``self.snapshot_config``.
        """
        self.snapshot_config = conf
        for name, envdict in conf.get("env", {}).items():
            # Merge Indexer Config
            # ----------------------------------------
            dx = envdict["indexer"]

            if isinstance(dx, str):  # {"indexer": "prod"}
                dx = dict(name=dx)  # .          ↓
            if not isinstance(dx, dict):  # {"indexer": {"name": "prod"}}
                raise TypeError(dx)

            # compatibility with previous hubs.
            dx.setdefault("name", dx.pop("env", None))

            x = self.index_manager[dx["name"]]
            x = dict(x)  # merge into a copy
            merge(x, dx)  # <-

            envdict["indexer"] = x
            # ------------------------------------------
            envdict["name"] = name

            self.register[name] = SnapshotEnv(self.job_manager, **envdict)

    def poll(self, state, func):
        super().poll(state, func, col=get_src_build())

    # Features
    # -----------

    def snapshot(self, snapshot_env, index, snapshot=None, recreate_repo=False):
        """
        Create a snapshot named "snapshot" (or, by default, same name as the index)
        from "index" according to environment definition (repository, etc...) "env".
        """
        env = self.register[snapshot_env]
        return env.snapshot(index, snapshot, recreate_repo=recreate_repo)

    def snapshot_a_build(self, build_doc):
        """
        Create a snapshot basing on the autobuild settings in the build config.
        If the build config associated with this build has:
        {
            "autobuild": {
                "type": "snapshot", // implied when env is set. env must be set.
                "env": "local" // which es env to make the snapshot.
            },
            ...
        }
        Attempt to make a snapshot for this build on the specified es env "local".
        If the build has no index yet, one is created first on the indexer env.
        """

        async def _():
            autoconf = AutoBuildConfig(build_doc["build_config"])
            env = autoconf.auto_build.get("env")
            assert env, "Unknown autobuild env."

            if isinstance(env, str):
                indexer_env = env
                snapshot_env = env
            else:  # assume env is an (x,y) pair
                indexer_env, snapshot_env = env

            indices = build_doc.get("index") or {}
            if indices:
                # find the index (latest) to snapshot
                latest_index = list(indices)[-1]
            else:
                # no existing indices, need to create one
                await self.index_manager.index(indexer_env, build_doc["_id"])
                latest_index = build_doc["_id"]  # index_name is build name

            return self.snapshot(snapshot_env, latest_index)

        return asyncio.ensure_future(_())

    def snapshot_info(self, env=None, remote=False):
        """Return the snapshot configuration given to configure() (``env``/``remote`` are unused)."""
        return self.snapshot_config

    def list_snapshots(self, env=None, return_db_cols=True, **filters):
        return cleaner.find(  # filters support dotfield
            get_src_build(),
            env=env,
            group_by="build_config",
            return_db_cols=return_db_cols,
            **filters,
        )

    def cleanup(
        self,
        env=None,  # a snapshot environment describing a repository
        keep=3,  # the number of most recent snapshots to keep in one group
        group_by="build_config",  # the attr of which its values form groups
        dryrun=True,  # display the snapshots to be deleted without deleting them
        ignoreErrors=False,  # continue deleting snapshots even if an error occurs
        **filters,  # a set of criteria to limit which snapshots are to be cleaned
    ):
        """Delete past snapshots and keep only the most recent ones.

        Examples:
            >>> snapshot_cleanup()
            >>> snapshot_cleanup("s3_outbreak")
            >>> snapshot_cleanup("s3_outbreak", keep=0)
        """

        # filters support dotfield.
        snapshots = cleaner.find(get_src_build(), env=env, keep=keep, group_by=group_by, **filters)

        if dryrun:
            return "\n".join(
                (
                    "-" * 75,
                    cleaner.plain_text(snapshots),
                    "-" * 75,
                    "DRYRUN ONLY - APPLY THE ACTIONS WITH:",
                    "   > snapshot_cleanup(..., dryrun=False)",
                )
            )

        # return the number of snapshots successfully deleted
        return cleaner.delete(get_src_build(), snapshots, self, ignoreErrors)

    def delete_snapshots(self, snapshots_data, ignoreErrors=False):
        """
        Delete snapshots grouped by environment.

        :param snapshots_data: mapping of environment name (or "__no_env__")
            to the list of snapshot names to delete
        :return: the list of submitted jobs, one per environment
        """

        async def delete(environment, snapshot_names):
            if environment == "__no_env__":
                environment = None
            return self.cleanup(
                env=environment, keep=0, dryrun=False, ignoreErrors=ignoreErrors, _id={"$in": snapshot_names}
            )

        def done(f):
            try:
                # just consume the result to raise exception
                # if there were an error... (what an api...)
                f.result()
                logging.info("success", extra={"notify": True})
            except Exception as e:
                logging.exception("failed: %s", e, extra={"notify": True})

        jobs = []
        try:
            for environment, snapshot_names in snapshots_data.items():
                job = self.job_manager.submit(partial(delete, environment, snapshot_names))
                jobs.append(job)
            tasks = asyncio.gather(*jobs)
            tasks.add_done_callback(done)
        except Exception as ex:
            logging.exception("Error while deleting snapshots. error: %s", ex, extra={"notify": True})
        return jobs

    def delete_snapshot_from_db(self, build_name, snapshot_name):
        """
        Delete a snapshot entry from the MongoDB database.

        This method removes the specified snapshot from the build document in the
        source build collection. Called when a snapshot is found to be missing in
        the snapshot environment during validation.

        Parameters:
            build_name (str): The name of the build from which to delete the snapshot.
            snapshot_name (str): The name of the snapshot to delete.

        Returns:
            None
        """
        collection = get_src_build()
        collection.update_one(
            {"_id": build_name},
            {"$unset": {f"snapshot.{snapshot_name}": 1}},
        )
        logging.info("Snapshot '%s' deleted from build '%s' in MongoDB", snapshot_name, build_name)

    @staticmethod
    def _snapshot_environment(snapshot_data):
        """
        Return the environment name recorded for a snapshot entry, or None.

        The top-level "environment" key is preferred, with ``conf.indexer.env``
        as the fallback. Missing keys yield None instead of raising, so the
        caller can report the entry and skip it.
        """
        environment = snapshot_data.get("environment")
        if environment:
            return environment
        conf = snapshot_data.get("conf") or {}
        indexer = conf.get("indexer") or {}
        return indexer.get("env")

    def validate_snapshots(self):
        """
        Validate the snapshots stored in the database.

        This method checks each snapshot in the source build collection to verify
        whether it exists in the corresponding snapshot environment. If a snapshot
        does not exist in the environment, it is removed from the database. This
        helps keep the database in sync with the actual snapshots present in the
        environments.

        The validation process is executed asynchronously and any errors encountered
        during validation are logged.

        Returns:
            job: the job returned by ``job_manager.submit`` for the validation,
            or None if it could not be submitted (the error is logged).
        """

        async def validate():
            logging.info("Starting validation of snapshots...")
            collection = get_src_build()
            snapshots = self.list_snapshots(return_db_cols=True)
            errors = []
            snapshots_deleted = 0

            for group in snapshots:
                for snapshot_data in group["items"]:
                    snapshot_name = snapshot_data["_id"]
                    build_name = snapshot_data["build_name"]
                    environment = self._snapshot_environment(snapshot_data)

                    if not environment:
                        msg = (
                            f"[{snapshot_name}] Snapshot '{snapshot_name}' does not have an environment "
                            "associated with it. Skipping validation."
                        )
                        logging.warning(msg)
                        errors.append(msg)
                        continue

                    try:
                        env = self.register[environment]
                    except KeyError:
                        msg = (
                            f"[{snapshot_name}] Environment '{environment}' is not registered and "
                            "connection details are unavailable. Consider adding it to the hub "
                            "configuration otherwise manual deletion is required."
                        )
                        logging.error(msg)
                        errors.append(msg)
                        continue

                    build_doc = collection.find_one({"_id": build_name})
                    try:
                        exists = env.snapshot_exists(snapshot_name, build_doc)
                        if not exists:
                            logging.info("Deleting snapshot '%s' from MongoDB", snapshot_name)
                            self.delete_snapshot_from_db(build_name, snapshot_name)
                            snapshots_deleted += 1
                    except Exception as e:
                        msg = f"Error checking snapshot '{snapshot_name}': {str(e)}"
                        logging.exception(msg)
                        errors.append(msg)

            logging.info("Validation of snapshots completed.")
            return {"snapshots_deleted": snapshots_deleted, "errors": errors}

        def done(f):
            try:
                result = f.result()
                snapshots_deleted = result.get("snapshots_deleted", 0)
                errors = result.get("errors", [])
                if errors:
                    error_message = "\n".join(errors)
                    logging.error("Validation completed with errors:\n%s", error_message, extra={"notify": True})
                else:
                    logging.info(
                        "Validation successful, %d snapshots deleted.", snapshots_deleted, extra={"notify": True}
                    )
            except Exception as e:
                logging.exception("Validation failed: %s", e, extra={"notify": True})

        # Bound before the try so a failing submit returns None instead of
        # raising UnboundLocalError.
        job = None
        try:
            job = self.job_manager.submit(validate)
            job.add_done_callback(done)
        except Exception as ex:
            logging.exception("Error while submitting validation job: %s", ex, extra={"notify": True})
        return job