import asyncio
import contextlib
import importlib
import logging
import os
import socket
import sys
import types
from datetime import datetime
from functools import partial
import pytest
from biothings import config as btconfig
from biothings.hub import APITESTER_CATEGORY
from biothings.hub.manager import BaseManager
from biothings.utils.hub_db import get_api
from biothings.utils.loggers import get_logger
from biothings.web.launcher import BiothingsAPILauncher
[docs]
class LoggerFile:
"""
File-like object that writes to a logger at a specific level.
This object is used to redirect stdout/stderr to a logger.
"""
def __init__(self, logger, level):
self.logger = logger
self.level = level
[docs]
def write(self, message):
if message.rstrip() != "":
self.logger.log(self.level, message)
[docs]
def isatty(self):
return False
[docs]
class APIManagerException(Exception):
pass
[docs]
class APIManager(BaseManager):
def __init__(self, log_folder=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.api = get_api()
self.register = {}
self.timestamp = datetime.now()
self.log_folder = log_folder or btconfig.LOG_FOLDER
self.setup()
self.restore_running_apis()
[docs]
def setup(self):
self.setup_log()
[docs]
def setup_log(self):
self.logger, _ = get_logger("apimanager")
[docs]
def get_predicates(self):
return []
[docs]
def get_pinfo(self):
"""
Return dict containing information about the current process
(used to report in the hub)
"""
pinfo = {
"category": APITESTER_CATEGORY,
"source": "",
"description": "",
"step": "test_api",
}
preds = self.get_predicates()
if preds:
pinfo["__predicates__"] = preds
return pinfo
[docs]
def restore_running_apis(self):
"""
If some APIs were running but the hub stopped, re-start APIs
as hub restarts
"""
apis = self.get_apis()
# these were running but had to stop when hub stopped
running_apis = [api for api in apis if api.get("status") == "running"]
for api in running_apis:
self.logger.info("Restart API '%s'" % api["_id"])
self.start_api(api["_id"])
[docs]
def register_status(self, api_id, status, **extra):
apidoc = self.api.find_one({"_id": api_id})
apidoc.update(extra)
# clean according to status
if status == "running":
apidoc.pop("err", None)
else:
apidoc.pop("url", None)
apidoc["status"] = status
self.api.save(apidoc)
[docs]
def get_apis(self):
return [d for d in self.api.find()]
[docs]
def import_config_web(self):
try:
self.logger.info("Original sys.path: %s", sys.path)
if btconfig.APITEST_CONFIG_ROOT not in sys.path:
sys.path.append(btconfig.APITEST_CONFIG_ROOT)
self.logger.info("Updated sys.path: %s", sys.path)
config_mod = importlib.import_module(btconfig.APITEST_CONFIG)
self.logger.info("Imported %s as config_mod.", btconfig.APITEST_CONFIG)
except (AttributeError, ImportError):
self.logger.info("Cannot import APITEST_CONFIG variable from btconfig, creating a new module")
config_mod = types.ModuleType("config_mod")
finally:
return config_mod
[docs]
def log_pytests(self, pytest_path, host):
"""
Run the pytests for the given pytest path and host. We create a LoggerFile object to redirect stdout and stderr to the logger.
"""
stdout = LoggerFile(self.logger, logging.INFO)
stderr = LoggerFile(self.logger, logging.ERROR)
with contextlib.redirect_stdout(stdout), contextlib.redirect_stderr(stderr):
pytest.main(["-vv", pytest_path, "-m", "not production", "--host", host, "--scheme", "http"])
[docs]
def test_api(self, api_id):
"""
Run pytests for the given api id. If no pytest path is found in the config_web_local.py then log an error.
APITEST_CONFIG_ROOT: root directory containing the config_web
APITEST_CONFIG: name of the config file containing the api web configuration
APITEST_ROOT: root directory containing the conftest.py file for the pytests
APITEST_PATH: path to the directory containing the pytests
"""
assert self.job_manager
has_pytests = False
try:
if (
btconfig.APITEST_CONFIG_ROOT
and btconfig.APITEST_CONFIG
and btconfig.APITEST_ROOT
and btconfig.APITEST_PATH
):
has_pytests = True
except AttributeError:
self.logger.error(
"Missing APIRTEST_CONFIG_ROOT or API_CONFIG or APITEST_ROOT or APITEST_PATH. Skipping pytests for '%s'",
api_id,
)
try:
old_dir = os.getcwd()
# if has_pytest is true then run the pytests
if has_pytests:
# have to change directory otherwise pytest will not find the conftest.py file
self.logger.info("Changing directory from %s to %s", old_dir, btconfig.APITEST_ROOT)
os.chdir(btconfig.APITEST_ROOT)
apidoc = self.api.find_one({"_id": api_id})
port = int(apidoc["config"]["port"])
APITEST_PATH = os.path.join(btconfig.APITEST_ROOT, btconfig.APITEST_PATH)
self.logger.info("APITEST_PATH found in config. Running pytests from %s.", APITEST_PATH)
async def run_pytests(path, port):
pinfo = self.get_pinfo()
pinfo["description"] = "Running API tests"
# defer_to_process leaves the websocket open for unknown reasons when trying to stop the api so we use defer_to_thread
job = await self.job_manager.defer_to_thread(
pinfo, partial(self.log_pytests, path, "localhost:" + str(port))
)
got_error = None
def updated(f):
try:
_ = f.result()
self.logger.info("Finished running pytests for '%s'" % api_id)
self.register_status(api_id, "running", job={"step": "test_api"})
except Exception as e:
nonlocal got_error
self.logger.error("Failed to run pytests for '%s': %s" % (api_id, e))
self.register_status(api_id, "running", job={"err": repr(e)})
got_error = e
job.add_done_callback(updated)
await job
if isinstance(got_error, Exception):
raise got_error
job = asyncio.ensure_future(run_pytests(APITEST_PATH, port))
return job
except Exception as e:
self.logger.error("Failed to run pytests for '%s': %s" % (api_id, e))
raise
finally:
self.logger.info("Changing directory back to %s", old_dir)
os.chdir(old_dir)
[docs]
def start_api(self, api_id):
apidoc = self.api.find_one({"_id": api_id})
if not apidoc:
raise APIManagerException("No such API with ID '%s'" % api_id)
if "entry_point" in apidoc:
raise NotImplementedError(
"Custom entry point not implemented yet, " + "only basic generated APIs are currently supported"
)
config_mod = self.import_config_web()
config_mod.ES_HOST = apidoc["config"]["es_host"]
config_mod.ES_DOC_TYPE = apidoc["config"]["doc_type"]
try:
if config_mod.ES_INDICES:
for key in config_mod.ES_INDICES:
config_mod.ES_INDICES[key] = apidoc["config"]["index"]
except AttributeError:
config_mod.ES_INDEX = apidoc["config"]["index"]
launcher = BiothingsAPILauncher(config_mod)
port = int(apidoc["config"]["port"])
try:
server = launcher.get_server()
server.listen(port)
self.register[api_id] = server
self.logger.info("Running API '%s' on port %s" % (api_id, port))
url = "http://%s:%s" % (socket.gethostname(), port)
self.register_status(api_id, "running", url=url)
except Exception as e:
self.logger.exception("Failed to start API '%s'" % api_id)
self.register_status(api_id, "failed", err=str(e))
raise
[docs]
def stop_api(self, api_id):
try:
assert api_id in self.register, "API '%s' is not running" % api_id
server = self.register.pop(api_id)
server.stop()
if server._stopped:
self.register_status(api_id, "stopped")
except Exception as e:
self.logger.exception("Failed to stop API '%s'" % api_id)
self.register_status(api_id, "failed", err=str(e))
raise
[docs]
def delete_api(self, api_id):
try:
self.stop_api(api_id)
except Exception as e:
self.logger.warning("While trying to stop API '%s': %s" % (api_id, e))
finally:
self.api.remove({"_id": api_id})
[docs]
def create_api(
self,
api_id,
es_host,
index,
doc_type,
port,
description=None,
**kwargs,
):
apidoc = {
"_id": api_id,
"config": {
"port": port,
"es_host": es_host,
"index": index,
"doc_type": doc_type,
},
"description": description,
}
apidoc.update(kwargs)
self.api.save(apidoc)
return apidoc