# from http://asyncssh.readthedocs.io/en/latest/#id13
# To run this program, the file ``ssh_host_key`` must exist with an SSH
# private key in it to use as a server host key.
import asyncio
import copy
import datetime
import io
import json
import os
import pydoc
import re
import sys
import time
from collections import OrderedDict, UserDict
from functools import partial
from pprint import pformat
try:
from IPython import InteractiveShell
except ImportError:
# Suppress import error when we just run CLI
InteractiveShell = object
from biothings import config
from biothings.utils.common import timesofar
from biothings.utils.dataload import to_boolean
from biothings.utils.docs import flatten_doc
from biothings.utils.hub_db import backup, get_cmd, get_last_command, restore
from biothings.utils.loggers import ShellLogger
from biothings.utils.redirect_streams import RedirectStdStreams
# Use the logger exposed by the hub configuration when there is one,
# otherwise fall back to the standard library's logging module.
if hasattr(config, "logger"):
    logging = config.logger
else:
    import logging

# useful variables to bring into hub namespace
pending = "pending"
# Optional environment name taken from the config, empty when missing or falsy
# (default: prod, or "normal"). It prefixes the names below.
HUB_ENV = getattr(config, "HUB_ENV", None) or ""
VERSIONS = "%s-versions" % HUB_ENV if HUB_ENV else "versions"
LATEST = "%s-latest" % HUB_ENV if HUB_ENV else "latest"
[docs]
def jsonreadify(cmd):
    """
    Return a shallow copy of `cmd` cleaned up for storage/serialization.

    The "jobs" entry (if any) is removed from the copy and, when a "results"
    entry is present, each ``UserDict`` result is replaced by its underlying
    ``data`` dict; any other result is kept as is. `cmd` itself is not modified.
    """
    newcmd = copy.copy(cmd)
    # A command without "jobs" is still valid input: don't fail on it
    newcmd.pop("jobs", None)
    # try to make the data structure of the returned
    # results for each command hubdb compatible.
    if "results" in newcmd:
        # pop then re-insert, so "results" ends up as the last key
        raw_results = newcmd.pop("results")
        newcmd["results"] = [
            # more conversions can be added here; anything else is assumed compliant
            result.data if isinstance(result, UserDict) else result
            for result in raw_results
        ]
    return newcmd
##############
# HUB SERVER #
##############
[docs]
class AlreadyRunningException(Exception):
    """Raised by HubShell.eval() when the very same command line is still running."""
[docs]
class CommandError(Exception):
    """Raised when a command can't be registered, parsed or executed."""
[docs]
class NoSuchCommand(Exception):
    """Raised by HubShell.eval() in secure mode when the command name is unknown."""
[docs]
class CommandNotAllowed(Exception):
    """Raised by HubShell.eval() in secure mode when the input isn't an allowed command call."""
[docs]
class CommandDefinition(dict):
    """
    Dict describing a hub command.

    HubShell.set_commands() reads the keys "command" (mandatory, the callable),
    "tracked" (defaults to True) and "hidden" (defaults to False).
    """
[docs]
class HubShell(InteractiveShell):
    """
    Shell used to register, evaluate and keep track of hub commands.

    Built on IPython's InteractiveShell when available (plain ``object``
    otherwise, see the import fallback at the top of the module).
    """

    # The attributes below are defined on the class, and accessed through
    # self.__class__/cls everywhere, so they're shared by all instances.
    launched_commands = {}  # command number -> command information
    pending_outputs = {}  # command number -> status line not yet returned by eval()
    cmd_cnt = None  # number given to the next registered command
    cmd = None  # "cmd" collection
def __init__(self, job_manager):
    """
    Create the shell. `job_manager` is the object stopped by restart()/stop().
    """
    self.job_manager = job_manager
    self.managers = {}
    # command registries, all keyed by command name
    self.commands = OrderedDict()
    self.tracked = {}  # command calls kept in history or not
    self.hidden = {}  # not returned by help()
    # namespace handed over to the underlying shell (see end of this method)
    self.extra_ns = OrderedDict()
    # output handling
    self.origout = sys.stdout
    self.buf = io.StringIO()
    self.last_std_contents = None
    self.shellog = ShellLogger(name="shell")
    # there should be only one shell instance (todo: singleton)
    shell_cls = self.__class__
    shell_cls.cmd = get_cmd()
    shell_cls.set_command_counter()
    super(HubShell, self).__init__(user_ns=self.extra_ns)
[docs]
@classmethod
def set_command_counter(cls):
    """
    Initialize the class-wide command counter: one more than the ID of the
    last stored command, or 1 when no command can be found.
    """
    assert cls.cmd, "No cmd collection set"
    try:
        last = get_last_command()
        if last:
            logging.debug("Last launched command ID: %s", last["_id"])
            next_num = int(last["_id"]) + 1
        else:
            logging.info("No previously stored command found, set counter to 1")
            next_num = 1
    except StopIteration:
        logging.info("Can't find highest command number, assuming starting from scratch")
        next_num = 1
    cls.cmd_cnt = next_num
[docs]
def set_commands(self, basic_commands, *extra_ns):
    """
    Register the commands available from the shell.

    `basic_commands` maps command names to callables (or any object) or to
    CommandDefinition instances. It's updated in place with every command
    registered here, so the caller knows what's been defined.
    `extra_ns` are additional dicts of the same kind, registered as hidden
    commands (not listed by help()).

    Raises CommandError if a command is defined twice, or if a
    CommandDefinition has no "command" key.
    """

    def register(commands, hidden=False):
        for name, cmd in commands.items():
            if name in self.commands:
                raise CommandError("Command defined multiple time: %s" % name)
            if isinstance(cmd, CommandDefinition):
                try:
                    self.commands[name] = cmd["command"]
                except KeyError as e:
                    raise CommandError(
                        "Could not register command because missing '%s' in definition: %s" % (e, cmd)
                    ) from e
                self.tracked[name] = cmd.get("tracked", True)
                self.hidden[name] = cmd.get("hidden", False) or hidden
            else:
                self.commands[name] = cmd
                self.hidden[name] = hidden
            # update original passed commands so caller knows what's been done there
            if name not in basic_commands:
                basic_commands[name] = cmd

    # update with ssh server default commands
    register(basic_commands)
    # Note: register() reads the "tracked" key, any other spelling is ignored
    # and the command ends up tracked by default.
    register({"restart": CommandDefinition(command=self.restart, tracked=True)})
    register({"stop": CommandDefinition(command=self.stop, tracked=True)})
    register({"backup": CommandDefinition(command=backup, tracked=True)})
    register({"restore": CommandDefinition(command=restore, tracked=True)})
    # don't track these calls
    register({"help": CommandDefinition(command=self.help, tracked=False)})
    register({"commands": CommandDefinition(command=self.command_info, tracked=False)})
    register(
        {
            "command": CommandDefinition(
                # "id" goes first so extra positional arguments can't collide with it
                command=lambda id, *args, **kwargs: self.command_info(id, *args, **kwargs),
                tracked=False,
            )
        }
    )
    for extra in extra_ns:
        # don't expose extra commands, they're kind of private/advanced
        register(extra, hidden=True)
    # self.extra_ns["cancel"] = self.__class__.cancel
    # for boolean calls
    self.extra_ns["_and"] = _and
    self.extra_ns["partial"] = partial
    self.extra_ns["hub"] = self
    # merge official/public commands with hidden/private to
    # make the whole available in shell's namespace
    self.extra_ns.update(self.commands)
    # Note: there's no need to update shell namespace as self.extra_ns
    # has been passed by ref in __init__() so things get updated automagically
    # (self.user_ns.update(...) can be used otherwise, self.user_ns is IPython
    # internal namespace dict)
[docs]
def stop(self, force=False):
    """Stop the hub: same as restart(), except the hub process isn't launched again."""
    future = self.restart(force=force, stop=True)
    return future
[docs]
def restart(self, force=False, stop=False):
    """
    Stop the job manager, then either restart the hub (default) or just
    kill the hub process (stop=True).

    `force` is passed to the job manager's stop(). Returns the asyncio
    future running the shutdown sequence.
    """

    async def do():
        try:
            if stop:
                event, msg = "hub_stop", "Hub is stopping"
            else:
                event, msg = "hub_restart", "Hub is restarting"
            alert = json.dumps({"type": "alert", "msg": msg, "event": event})
            logging.critical(alert, extra={"event": True})
            logging.info("Stopping job manager...")
            stopping = self.job_manager.stop(force=force)

            def ok(f):
                f.result()  # consume
                logging.error("Job manager stopped")

            stopping.add_done_callback(ok)
            await stopping
        except Exception as e:
            logging.error("Error while recycling the process queue: %s", e)
            raise

    def start(f):
        f.result()  # consume future's result to potentially raise exception
        logging.debug("%s %s", [sys.executable], sys.argv)
        import subprocess

        # launch a fresh hub with the same command line, then get rid of this one
        subprocess.Popen([sys.executable] + sys.argv)
        self.job_manager.hub_process.kill()
        sys.exit(0)

    def autokill(f):
        f.result()
        self.job_manager.hub_process.kill()

    fut = asyncio.ensure_future(do())
    if stop:
        logging.warning("Stopping hub")
        on_done = autokill
    else:
        logging.warning("Restarting hub")
        on_done = start
    fut.add_done_callback(on_done)
    return fut
[docs]
def help(self, func=None):
    """
    Display help on given function/object or list all available commands
    (hidden commands are left out of that list).
    """
    if not func:
        listed = ["\t%s\n" % name for name in self.commands if not self.hidden[name]]
        return "\nAvailable commands:\n\n" + "".join(listed) + "\nType: 'help(command)' for more\n"
    if isinstance(func, partial):
        # document the wrapped function, then what the partial adds to it
        rendered = pydoc.render_doc(func.func, title="Hub documentation: %s")
        return "\n" + rendered + "\nDefined et as a partial, with:\nargs:%s\nkwargs:%s\n" % (
            repr(func.args),
            repr(func.keywords),
        )
    if isinstance(func, CompositeCommand):
        return "\nComposite command:\n\n%s\n" % func
    try:
        return "\n" + pydoc.render_doc(func, title="Hub documentation: %s")
    except ImportError:
        return "\nHelp not available for this command\n"
[docs]
def launch(self, pfunc):
    """
    Helper to run a command and register it.
    pfunc is a partial taking no argument. The command name is generated
    from the partial's func and arguments. Returns pfunc itself.
    """
    res = pfunc()
    # rebuild a command as string: name(positional args,keyword args)
    pieces = []
    if pfunc.args:
        pieces.append(",".join(map(repr, pfunc.args)))
    if pfunc.keywords:
        pieces.append(",".join("%s=%s" % (key, repr(val)) for key, val in pfunc.keywords.items()))
    strcmd = "%s(%s)" % (pfunc.func.__name__, ",".join(pieces))
    # we use force here because, very likely, the command from strcmd we generated
    # isn't part of shell's known commands (and there's a check about that when force=False)
    self.register_command(strcmd, res, force=True)
    return pfunc
[docs]
def extract_command_name(self, cmd):
    """
    Return what comes before the parenthesis in `cmd`, which must look like
    "name(...)" (word characters and dots allowed in the name).
    Raises CommandError otherwise.
    """
    try:
        # extract before () (non-callable are never tracked)
        matched = re.fullmatch(r"([\w\.]+)(\(.*\))", cmd.strip())
        name, _call = matched.groups()
    except AttributeError:
        # no match (matched is None), or cmd isn't string-like
        raise CommandError("Can't extract command name from '%s'" % repr(cmd))
    return name
[docs]
@classmethod
def save_cmd(cls, _id, cmd):
    """Store (insert or replace) `cmd`, cleaned up by jsonreadify(), under `_id` in the cmd collection."""
    document = jsonreadify(cmd)
    cls.cmd.replace_one({"_id": _id}, document, upsert=True)
[docs]
def register_managers(self, managers):
    """Keep a reference to `managers` on the shell (replaces the previous value)."""
    self.managers = managers
    return None
[docs]
def register_command(self, cmd, result, force=False):
    """
    Register a command 'cmd' inside the shell (so we can keep track of it).
    'result' is the original value that was returned when cmd was submitted.

    Returns:
    - `result` untouched when the command isn't registered (command name
      can't be extracted, or, unless `force` is set, the name isn't a callable
      of the shell namespace or the command is declared as not tracked),
    - the command information when `result` is an asyncio future/task, or a
      non-empty list starting with one (we need to wait before getting the
      actual result),
    - `result` otherwise, the command being flagged as done straight away.
    """
    # see if command should actually be registered
    try:
        cmdname = self.extract_command_name(cmd)
    except CommandError:
        # if can't extract command name, then don't even try to register
        # (could be, for instance, "pure" python code typed from the console)
        logging.debug("Can't extract command from %s, can't register", cmd)
        return result
    if not force:
        # also, never register non-callable command
        if not callable(self.extra_ns.get(cmdname)):
            return result
        if self.tracked.get(cmdname, True) is False:
            return result
    shell_cls = self.__class__
    cmdnum = shell_cls.cmd_cnt
    cmdinfo = CommandInformation(cmd=cmd, jobs=result, started_at=time.time(), id=cmdnum, is_done=False)
    assert cmdnum not in shell_cls.launched_commands
    # register
    shell_cls.launched_commands[cmdnum] = cmdinfo
    shell_cls.save_cmd(cmdnum, cmdinfo)
    shell_cls.cmd_cnt += 1
    # asyncio.Future is the common base of tasks and gathering futures, so a
    # single isinstance() check covers them all, subclasses included
    is_async = isinstance(result, asyncio.Future) or (
        isinstance(result, list) and len(result) > 0 and isinstance(result[0], asyncio.Future)
    )
    if is_async:
        # it's asyncio related: jobs are always kept as a list
        cmdinfo["jobs"] = result if isinstance(result, list) else [result]
        return cmdinfo
    # ... and it's not asyncio related, we can display it directly
    # NOTE(review): these updates come after save_cmd() above, so as far as
    # this method goes they only exist in launched_commands - confirm that's intended.
    cmdinfo["is_done"] = True
    cmdinfo["failed"] = False
    cmdinfo["started_at"] = time.time()
    cmdinfo["finished_at"] = time.time()
    cmdinfo["duration"] = "0s"
    return result
[docs]
def eval(self, line, return_cmdinfo=False, secure=False):
    """
    Evaluate `line` in the shell and return a list of outputs.

    - `secure`: when set, `line` must be a call to a registered command, with
      a restricted set of characters allowed in the arguments.
    - `return_cmdinfo`: when set, and when the command was registered as a
      running command, return its command information instead of the outputs.

    Outputs also include any pending status line (see refresh_commands()).

    Raises:
    - AlreadyRunningException if the same line is currently running,
    - NoSuchCommand / CommandNotAllowed when `secure` rejects the line,
    - CommandError if a chained ('&&') command is malformed or if the
      execution failed.
    """
    line = line.strip()
    self.shellog.input(line)
    origline = line  # keep what's been originally entered
    # poor man's singleton...
    running = [j["cmd"] for j in self.__class__.launched_commands.values() if not j.get("is_done")]
    if line in running:
        raise AlreadyRunningException("Command '%s' is already running\n" % repr(line))
    # is it a hub command, in which case, intercept and run the actual declared cmd
    # IMPORTANT !!! this is where we allow the command or not when secure=True IMPORTANT !!!
    # (the line is executed as code by the shell further down, this check is
    # the only thing restricting what gets executed)
    # the logic is following:
    # - what's before parenthesis must exactly match a command
    # - parenthesis are mandatory
    # - no '&&' operator allowed
    if secure:
        # command must be alpha only, argument with "," and "=", or no arg at all
        pat = r'^([A-Za-z_]+)\(["\'\w\s=,.-]*\)$'
    else:
        pat = r"(.*)\(.*\)"  # more permissive
    m = re.match(pat, line)
    if m:
        cmd = m.groups()[0].strip()
        if secure and cmd not in self.commands:
            # match regex rule but not a valid/existing command, discard it
            raise NoSuchCommand(cmd)
        if cmd in self.commands and isinstance(self.commands[cmd], CompositeCommand):
            line = self.commands[cmd]
    elif line != "" and secure:
        # we have something entered, it doesn't match our regex rule, discard it
        raise CommandNotAllowed(line)
    # cmdline is the actual command sent to shell, line is the one displayed
    # they can be different if there's a preprocessing
    cmdline = line
    # && cmds ? ie. chained cmds
    if "&&" in line:
        chained_cmds = [one for one in map(str.strip, line.split("&&")) if one]
        if len(chained_cmds) < 2:
            raise CommandError("Using '&&' operator required two operands\n")
        # need to build a command with _and and using partial, meaning passing original func param
        # to the partials
        strcmds = []
        for one_cmd in chained_cmds:
            parsed = re.match(r"(.*)\((.*)\)", one_cmd)
            if parsed is None:
                # operand without parenthesis: report it instead of crashing on None
                raise CommandError("Can't chain %s, expecting a call like 'command(...)'\n" % repr(one_cmd))
            func, args = parsed.groups()
            if args:
                strcmds.append("partial(%s,%s)" % (func, args))
            else:
                strcmds.append("partial(%s)" % func)
        cmdline = "_and(%s)" % ",".join(strcmds)
    outputs = []
    with RedirectStdStreams() as redirect_stream:
        r = self.run_cell(cmdline, store_history=True)
        self.last_std_contents = redirect_stream.get_std_contents()
    if not r.success:
        raise CommandError("%s\n" % repr(r.error_in_exec))
    # command was a success, now get the results:
    if r.result is None:
        # -> nothing special was returned, grab the stdout
        self.buf.seek(0)
        # from print stdout ?
        outputs.append(self.buf.read())
        # clear buffer
        self.buf.seek(0)
        self.buf.truncate()
    else:
        # -> we have something returned...
        res = self.register_command(cmd=origline, result=r.result)
        if isinstance(res, CommandInformation):
            if return_cmdinfo:
                return res
        elif isinstance(res, str):
            outputs.append(res)
        else:
            outputs.append(pformat(res))
    # Note: this will cause all outputs to go to one SSH session, ie. if multiple users
    # are logged, only one will get the results
    if self.__class__.pending_outputs:
        outputs.extend(self.__class__.pending_outputs.values())
        self.__class__.pending_outputs = {}
    return outputs
# @classmethod
# def cancel(klass,jobnum):
# return klass.launched_commands.get(jobnum)
[docs]
@classmethod
def refresh_commands(cls):
    """
    Go through launched commands which aren't flagged as done yet. When all
    the jobs of a command are done, its outcome (failed, results, timing) is
    recorded and saved, and a final status line is put in pending_outputs.
    Otherwise, a "RUN" status line is put there.
    """
    for num, info in sorted(cls.launched_commands.items()):
        # already processed, this current command is now history
        # Note: if we have millions of commands there, it could last quite a while,
        # but IRL we only have a few
        if info.get("is_done") is True:
            continue
        jobs = info["jobs"]
        if {job.done() for job in jobs} != {True}:
            # at least one job still running (or no job at all)
            cls.pending_outputs[num] = "[%s] RUN {%s} %s" % (
                num,
                timesofar(info["started_at"]),
                info["cmd"],
            )
            continue
        # all done: errors, if any, take precedence over results
        errors = [str(job.exception()) for job in jobs if job.exception()]
        has_err = bool(errors)
        localoutputs = errors or [job.result() for job in jobs] or None
        entry = cls.launched_commands[num]
        entry["is_done"] = True
        entry["failed"] = has_err
        entry["results"] = localoutputs
        entry["finished_at"] = time.time()
        entry["duration"] = timesofar(t0=entry["started_at"], t1=entry["finished_at"])
        cls.save_cmd(num, entry)
        if not has_err and localoutputs and set(map(type, localoutputs)) == {str}:
            # only strings: display them as text rather than as a list
            localoutputs = "\n" + "".join(localoutputs)
        cls.pending_outputs[num] = "[%s] %s {%s} %s: finished %s " % (
            num,
            "ERR" if has_err else "OK",
            timesofar(info["started_at"]),
            info["cmd"],
            localoutputs,
        )
[docs]
@classmethod
def command_info(cls, id=None, running=None, failed=None):
    """
    Return information about launched commands, cleaned up by jsonreadify().

    - `id`: return the information of that single command. Raises
      CommandError if it's not a valid integer or if there's no such command.
    - `running`: if set, only keep running commands (true value) or done
      commands (false value), as interpreted by to_boolean().
    - `failed`: if set, done commands are only kept when their "failed" flag
      matches. Commands which aren't done are not affected by this filter.

    Without `id`, return a dict of command information keyed by command number.
    """
    if id is not None:
        try:
            cmd_num = int(id)
        except ValueError:
            raise CommandError("Invalid ID %s" % repr(id))
        try:
            return jsonreadify(cls.launched_commands[cmd_num])
        except KeyError:
            raise CommandError("No such command with ID %s" % repr(cmd_num))
        except ValueError:
            raise CommandError("Invalid ID %s" % repr(cmd_num))
    is_done = None if running is None else not to_boolean(running)
    if failed is not None:
        failed = to_boolean(failed)
    cmds = {}
    for _id, cmd in cls.launched_commands.items():
        # running or done commands (not both), if asked
        if is_done is not None and not (cmd.get("is_done") == is_done):
            continue
        # a failed command is always done btw: for a command still running we
        # can't know whether it failed or not, so it's not filtered out there
        if failed is not None and cmd.get("is_done") is True and not (cmd.get("failed") == failed):
            continue
        cmds[_id] = jsonreadify(cmd)
    return cmds
####################
# DEFAULT HUB CMDS #
####################
# these can be used in client code to define
# commands. partial should be used to pass the
# required arguments, eg.:
# {"schedule": partial(schedule, loop)}
[docs]
def stats(src_dump):
    """Placeholder command: does nothing with `src_dump` and returns None."""
    return None
[docs]
def template_out(field, confdict):
    """
    Return field as a templated-out field. Two substitutions are applied,
    in that order:

    1. every "$(...)" part is replaced with the current local time, formatted
       as specified (see time.strftime). The leading "%" of the format can be
       omitted: "$(Y%m)" and "$(%Y%m)" give the same result.
    2. "%(...)s" parts are substituted using confdict, once flattened by
       flatten_doc (fields can follow dotfield notation).

    If field isn't a string, it's returned as is.

    Example::

        confdict = {"a":"one"}
        field = "%(a)s_two_three_$(%Y%m)"
        => "one_two_three_201908" # assuming we're in August 2019
    """
    # same timestamp for all the "$(...)" found in the field
    now = datetime.datetime.now()

    def to_timestamp(match):
        fmt = match.group(1)
        if not fmt.startswith("%"):
            # short notation, first "%" is implicit
            fmt = "%" + fmt
        return now.strftime(fmt)

    # first deal with timestamps
    try:
        field = re.sub(r"\$\((.*?)\)", to_timestamp, field)
    except TypeError:
        # not string/byte-like just skip the process
        return field
    flatdict = flatten_doc(confdict)
    # then use dict to sub keys
    return field % flatdict
[docs]
def publish_data_version(s3_bucket, s3_folder, version_info, update_latest=True, aws_key=None, aws_secret=None):
    """
    Update remote files:

    - versions.json: add version_info to the JSON list
      or replace if arg version_info is a list
    - latest.json: update redirect so it points to latest version url
      (only when version_info isn't a list and update_latest is set)

    File names are prefixed with the hub environment, if any (see VERSIONS
    and LATEST).

    "version_info" is dict such as::

        {"build_version":"...",         # version name for this release/build
         "require_version":"...",       # version required for incremental update
         "target_version": "...",       # version reached once update is applied
         "type" : "incremental|full"    # release type
         "release_date" : "...",        # ISO 8601 timestamp, release date/time
         "url": "http...."}             # url pointing to release metadata
    """
    # import utils.aws within this function to avoid boto3 import error in
    # the same like CLI (boto3 is not required)
    import biothings.utils.aws as aws

    s3_auth = {"aws_key": aws_key, "aws_secret": aws_secret, "s3_bucket": s3_bucket}
    replace_all = isinstance(version_info, list)

    # register version
    versionskey = os.path.join(s3_folder, "%s.json" % VERSIONS)
    try:
        raw = aws.get_s3_file_contents(versionskey, **s3_auth)
        versions = json.loads(raw.decode())
    except (FileNotFoundError, json.JSONDecodeError):
        # nothing usable there yet, start from scratch
        versions = {"format": "1.0", "versions": []}
    if replace_all:
        versions["versions"] = version_info
    else:
        # one entry per build_version: first one found among existing ones,
        # overridden by the passed one
        by_build = {}
        for entry in versions["versions"]:
            by_build.setdefault(entry["build_version"], entry)
        by_build[version_info["build_version"]] = version_info
        # order by build_version
        versions["versions"] = sorted(by_build.values(), key=lambda e: e["build_version"])
    aws.send_s3_file(
        None,
        versionskey,
        content=json.dumps(versions, indent=True),
        content_type="application/json",
        overwrite=True,
        **s3_auth,
    )

    # update latest
    if replace_all or not update_latest:
        return
    latestkey = os.path.join(s3_folder, "%s.json" % LATEST)
    newredir = os.path.join("/", s3_folder, "{}.json".format(version_info["build_version"]))
    # the consensus is that we will upload the data and have the
    # redirection, for record-keep purpose
    aws.send_s3_file(
        None,
        latestkey,
        content=json.dumps(version_info["build_version"], indent=True),
        content_type="application/json",
        overwrite=True,
        redirect=newredir,
        **s3_auth,
    )
def _and(*funcs):
"""
Calls passed functions, one by one. If one fails, then it stops.
Function should return a asyncio Task. List of one Task only are also permitted.
Partial can be used to pass arguments to functions.
Ex: _and(f1,f2,partial(f3,arg1,kw=arg2))
"""
all_res = []
func1 = funcs[0]
func2 = None
fut1 = func1()
# if type(fut1) == list: # TODO: remove this line
if isinstance(fut1, list):
assert len(fut1) == 1, "Can't deal with list of more than 1 task: %s" % fut1
fut1 = fut1.pop()
if not isinstance(fut1, asyncio.Future):
raise CommandError("First command didn't return a future, can't chain commands")
all_res.append(fut1)
# err = None
def do(f, cb):
f.result() # consume exception if any
if cb:
all_res.extend(_and(cb, *funcs))
if len(funcs) > 1:
func2 = funcs[1]
if len(funcs) > 2:
funcs = funcs[2:]
else:
funcs = []
fut1.add_done_callback(partial(do, cb=func2))
return all_res
[docs]
class CompositeCommand(str):
    """
    Defines a composite hub commands, that is,
    a new command made of other commands. Useful to define
    shortcuts when typing commands in hub console.

    Being a str, its value is the command line passed at creation, while
    str() returns a display form of it.
    """

    def __init__(self, cmd):
        # the string value itself is set by str.__new__, keep the original
        # argument around for display
        self.cmd = cmd

    def __str__(self):
        return "<CompositeCommand: '%s'>" % self.cmd
############
# RELOADER #
############
[docs]
def exclude_from_reloader(path):
    """Tell whether `path` must be ignored by the reloader (cached, git and hidden files)."""
    if path.endswith("__pycache__"):
        return True
    if ".git" in path:
        return True
    # hidden file or directory
    return os.path.basename(path).startswith(".")
[docs]
class BaseHubReloader:
    """
    Monitor sources' code and reload hub accordingly to update running code.

    This is an interface: every method must be implemented in a sub-class.
    """

    def __init__(self, paths, reload_func, wait=5.0):
        """
        Monitor given paths for directory deletion/creation
        and for file deletion/creation. Poll for events every 'wait' seconds.
        """
        raise NotImplementedError("Implement me in a sub-class")

    def poll(self):
        """Start monitoring changes on files and/directories"""
        raise NotImplementedError("Implement me in a sub-class")

    def watched_files(self):
        """Return a list of files/directories being watched"""
        raise NotImplementedError("Implement me in a sub-class")
[docs]
class TornadoAutoReloadHubReloader(BaseHubReloader):
    """Reloader based on tornado.autoreload module"""

    def __init__(self, paths, reload_func, wait=5):
        """
        Watch `paths` (one path, or an iterable of paths) and register
        `reload_func` as a tornado reload hook. `wait` is the delay, in
        seconds, between two checks once monitor() is called.

        tornado.autoreload must have been imported already.
        """
        self.mod = sys.modules["tornado.autoreload"]
        if isinstance(paths, str):
            paths = [paths]
        paths = set(paths)  # get rid of duplicated, just in case
        self.reload_func = reload_func
        self.mod.add_reload_hook(self.reload_func)
        self.add_watch(paths)
        self.wait = wait

    def monitor(self):
        """Start tornado's autoreload, checking for changes every `wait` seconds."""
        logging.info("Monitoring source code in, %s:\n%s", repr(self.paths), pformat(self.watched_files()))
        self.mod.start(self.wait * 1000)  # millis

    def add_watch(self, paths):
        """This method adds the input paths, and their children (recursively) to tornado autoreload for watching them.
        If any file changes, the tornado will call our hook to reload the hub.

        Each path will be forced to become an absolute path.
        If a path is matched excluding patterns, it will be ignored.

        Once done, self.paths lists the input paths actually watched.
        """
        # only the paths explicitly asked for end up there, not their children
        self.paths = []
        for path in list(paths):
            if not os.path.isabs(path):
                path = os.path.abspath(path)
            if exclude_from_reloader(path):
                continue
            self.paths.append(path)
            self.mod.watch(path)
            self._watch_tree(path)

    def _watch_tree(self, root):
        """Watch files and directories found under `root`, in one single walk."""
        # symlinked directories are followed
        for dirpath, dirnames, filenames in os.walk(root, followlinks=True):
            # NOTE(review): an excluded directory is skipped, but the walk still
            # goes through its sub-directories, each one being checked on its
            # own path - confirm whether pruning the whole tree is preferable.
            if exclude_from_reloader(dirpath):
                continue
            # Add files to watcher
            for fn in filenames:
                f_path = os.path.join(dirpath, fn)
                if not exclude_from_reloader(f_path):
                    self.mod.watch(f_path)
            # Add sub-directories themselves, their content is handled when
            # the walk gets there
            for dirname in dirnames:
                d_path = os.path.join(dirpath, dirname)
                if not exclude_from_reloader(d_path):
                    self.mod.watch(d_path)

    def watched_files(self):
        """Return the files/directories tornado.autoreload is currently watching."""
        return self.mod._watched_files
        # return [d for d in self.mod._watched_files if os.path.isdir(d)]
[docs]
def get_hub_reloader(*args, **kwargs):
    """
    Return a TornadoAutoReloadHubReloader built with the passed arguments if
    config.USE_RELOADER is set, None otherwise.

    When a "reload_func" keyword argument is passed, it's wrapped so that, if
    it returns an asyncio future, that future is run until complete.
    """
    if not getattr(config, "USE_RELOADER", False):
        logging.info("USE_RELOADER not set (or False), won't monitor for changes")
        return None

    import tornado.autoreload  # noqa

    logging.info("Using Hub reloader based on tornado.autoreload")

    def ensure_run_task(func):
        def wrapper():
            result = func()
            if isinstance(result, asyncio.Future):
                asyncio.get_event_loop().run_until_complete(result)

        return wrapper

    reload_func = kwargs.get("reload_func")
    if reload_func:
        logging.info("Decorator reload_func to ensure it can be run")
        kwargs["reload_func"] = ensure_run_task(reload_func)
    return TornadoAutoReloadHubReloader(*args, **kwargs)