Source code for biothings.hub.api.manager

import socket
import types
from datetime import datetime

from biothings import config as btconfig
from biothings.utils.hub_db import get_api
from biothings.utils.loggers import get_logger
from biothings.utils.manager import BaseManager
from biothings.web.launcher import BiothingsAPILauncher


[docs] class APIManagerException(Exception): pass
[docs] class APIManager(BaseManager): def __init__(self, log_folder=None, *args, **kwargs): self.api = get_api() self.register = {} self.timestamp = datetime.now() self.log_folder = log_folder or btconfig.LOG_FOLDER self.setup() self.restore_running_apis()
[docs] def setup(self): self.setup_log()
[docs] def setup_log(self): self.logger, _ = get_logger("apimanager")
[docs] def restore_running_apis(self): """ If some APIs were running but the hub stopped, re-start APIs as hub restarts """ apis = self.get_apis() # these were running but had to stop when hub stopped running_apis = [api for api in apis if api.get("status") == "running"] for api in running_apis: self.logger.info("Restart API '%s'" % api["_id"]) self.start_api(api["_id"])
[docs] def register_status(self, api_id, status, **extra): apidoc = self.api.find_one({"_id": api_id}) apidoc.update(extra) # clean according to status if status == "running": apidoc.pop("err", None) else: apidoc.pop("url", None) apidoc["status"] = status self.api.save(apidoc)
[docs] def get_apis(self): return [d for d in self.api.find()]
[docs] def start_api(self, api_id): apidoc = self.api.find_one({"_id": api_id}) if not apidoc: raise APIManagerException("No such API with ID '%s'" % api_id) if "entry_point" in apidoc: raise NotImplementedError( "Custom entry point not implemented yet, " + "only basic generated APIs are currently supported" ) config_mod = types.ModuleType("config_mod") config_mod.ES_HOST = apidoc["config"]["es_host"] config_mod.ES_INDEX = apidoc["config"]["index"] config_mod.ES_DOC_TYPE = apidoc["config"]["doc_type"] launcher = BiothingsAPILauncher(config_mod) port = int(apidoc["config"]["port"]) try: server = launcher.get_server() server.listen(port) self.register[api_id] = server self.logger.info("Running API '%s' on port %s" % (api_id, port)) url = "http://%s:%s" % (socket.gethostname(), port) self.register_status(api_id, "running", url=url) except Exception as e: self.logger.exception("Failed to start API '%s'" % api_id) self.register_status(api_id, "failed", err=str(e)) raise
[docs] def stop_api(self, api_id): try: assert api_id in self.register, "API '%s' is not running" % api_id server = self.register.pop(api_id) server.stop() if server._stopped: self.register_status(api_id, "stopped") except Exception as e: self.logger.exception("Failed to stop API '%s'" % api_id) self.register_status(api_id, "failed", err=str(e)) raise
[docs] def delete_api(self, api_id): try: self.stop_api(api_id) except Exception as e: self.logger.warning("While trying to stop API '%s': %s" % (api_id, e)) finally: self.api.remove({"_id": api_id})
[docs] def create_api( self, api_id, es_host, index, doc_type, port, description=None, **kwargs, ): apidoc = { "_id": api_id, "config": { "port": port, "es_host": es_host, "index": index, "doc_type": doc_type, }, "description": description, } apidoc.update(kwargs) self.api.save(apidoc) return apidoc