Source code for biothings.hub.api

import asyncio
import contextlib
import inspect
import logging
import socket
import time
import types
from functools import partial

import tornado.web

from biothings.hub.api.handlers.base import GenericHandler
from biothings.utils.hub import CommandDefinition, CommandError, CommandInformation


[docs] class EndpointDefinition(dict): pass
[docs] def generate_endpoint_for_callable(name, command, method, force_bodyargs): if force_bodyargs is True: pass # assert method != "get", \ # "Can't have force_bodyargs=True with method '%s' for command '%s'" % (method,command) try: specs = inspect.getfullargspec(command) except TypeError as e: # TODO: generate a display handler raise TypeError("Can't determine arguments for command '%s': %s" % (command, e)) argfrom = 0 if ( isinstance(command, types.MethodType) or isinstance(command, partial) and isinstance(command.func, types.MethodType) ): # skip "self" arg argfrom = 1 defaultargs = {} args = specs.args[argfrom:] # defaults are listed from the end ("n latest args") if specs.defaults: for i in range(-len(specs.defaults), 0): defaultargs[args[i]] = specs.defaults[i] # ignore "self" args, assuming it's the one used when dealing with method # mandatargs = set(args).difference({"self"}).difference(defaultargs) or "" mandatargs = force_bodyargs is False and set(args).difference(defaultargs) or "" cmdargs = "{}" if mandatargs: # this is for cmd args building before submitting to the shell cmdargs = "{" + ",".join(["'''%s''':%s" % (v, v) for v in mandatargs]) + "}" # this is for signature mandatargs = "," + ",".join(["%s" % v for v in mandatargs]) # generate a wrapper over the passed command # print({"method":method,"args":args,"defaultargs":defaultargs,"name":name, # "mandatargs":mandatargs,"cmdargs":cmdargs}) strcode = """ async def %(method)s(self%(mandatargs)s): '''%(name)s => %(command)s''' cmdargs = %(cmdargs)s reqargs = {} # holds either body or query string args qkwargs = {} # holds kwargs (either from body or query string) for k in cmdargs: if cmdargs[k] is None: raise tornado.web.HTTPError(400,reason="Bad Request (Missing argument " + k + ")") if "%(method)s" != "get": # allow to have no body at all, defaulting to empty dict (no args) reqargs = tornado.escape.json_decode(self.request.body or '{}') elif %(force_bodyargs)s == True: # force_bodyargs for arg in %(args)s: qarg = self.get_query_argument(arg,None) if qarg: reqargs[arg] = qarg; else: # extract optional args for arg in self.request.arguments: if not arg in reqargs: qkwargs[arg] = self.get_argument(arg) #print("arguments:") #print(%(args)s) #print(%(defaultargs)s) #print(reqargs) #print(qkwargs) for arg in %(args)s + list(reqargs.keys()) + list(qkwargs.keys()): if arg in %(defaultargs)s or arg in qkwargs: mandatory = False else: mandatory = True if %(force_bodyargs)s or "%(method)s" != "get": try: if mandatory: # part of signature (in URL) or body args ? try: cmdargs[arg] # just check key exists except KeyError: cmdargs[arg] = reqargs[arg] else: # check if optional has been passed or if value is taken from default # (used to display/build command line with minimal info, # ie. what's been passed by user) try: val = reqargs[arg] cmdargs[arg] = val except KeyError: pass except KeyError: raise tornado.web.HTTPError(400,reason="Bad Request (Missing argument " + arg + ")") else: # if not default arg and arg not passed, this will raise a 400 (by tornado) if mandatory: cmdargs[arg] # check key else: try: val = qkwargs[arg] cmdargs[arg] = val except KeyError: pass # we don't pass though shell evaluation there # to prevent security issue (injection)... strcmd = '''%(name)s''' + "(" strcmd += ",".join([str(k) + "=" + repr(v) for k,v in cmdargs.items()]) strcmd += ")" res = command(**cmdargs) # ... but we register the command in the shell to track it cmdres = shell.register_command(strcmd,res) if type(cmdres) == CommandInformation:# or type(cmdres) == list and type(: # asyncio tasks unserializable # but keep original one cmdres = CommandInformation([(k,v) for k,v in cmdres.items() if k != 'jobs']) from inspect import isawaitable if isawaitable(cmdres): self.write(await cmdres) else: self.write(cmdres) """ % { "method": method, "args": args, "defaultargs": defaultargs, "name": name, "mandatargs": mandatargs, "cmdargs": cmdargs, "command": repr(command), "force_bodyargs": force_bodyargs, } # if name == "info" or name == "builds": # print(strcode) return strcode, mandatargs != ""
[docs] def generate_endpoint_for_composite_command(name, command, method): strcode = """ async def %(method)s(self): # composite commands never take arguments cmdres = shell.eval('''%(name)s()''',return_cmdinfo=True) if type(cmdres) == CommandInformation: # asyncio tasks unserializable # but keep original one cmdres = CommandInformation([(k,v) for k,v in cmdres.items() if k != 'jobs']) self.write(cmdres) """ % { "method": method, "name": name, } return strcode
[docs] def generate_endpoint_for_display(name, command, method): strcode = """ async def %(method)s(self): self.write(command) """ % { "method": method } return strcode
[docs] def generate_handler(shell, name, command_defs): if not type(command_defs) == list: command_defs = [command_defs] by_suffix = {} for commanddef in command_defs: confdict = {} method = commanddef["method"].lower() cmdname = commanddef["name"] try: # retrieve the actual command from the shell command = shell.commands.get(cmdname) # first try public commands if command is None: command = shell.extra_ns[cmdname] # then private/hidden commands # could be directly a callable or an encapsulating CommandDefinition if type(command) == CommandDefinition: command = command["command"] except KeyError as e: raise CommandError("Command '%s' can't be found in hub shell" % e) force_bodyargs = commanddef.get("force_bodyargs", False) suffix = commanddef.get("suffix", "") num_mandatory = 0 # if callable(command): strcode, num_mandatory = generate_endpoint_for_callable(cmdname, command, method, force_bodyargs) # elif type(command) == CompositeCommand: # strcode = generate_endpoint_for_composite_command(name,command,method) # else: # assert method == "get", "display endpoint needs a GET method for command '%s'" % name # strcode = generate_endpoint_for_display(name,command,method) # compile the code string and eval (link) to a globals() dict code = compile(strcode, "<string>", "exec") command_globals = {} endpoint_ns = { "command": command, "asyncio": asyncio, "shell": shell, "CommandInformation": CommandInformation, "tornado": tornado, } eval(code, endpoint_ns, command_globals) methodfunc = command_globals[method] confdict[method] = methodfunc by_suffix.setdefault(suffix, {}) by_suffix[suffix].setdefault(method, {})["confdict"] = confdict by_suffix[suffix][method]["num_mandatory"] = num_mandatory by_suffix[suffix][method]["strcode"] = strcode routes = [] # when suffix present, it gives a new handler # so method don't get mixed (actually overwritten) together with non-suffixed # for i, (suffix, by_method) in enumerate(by_suffix.items()): for suffix, by_method in by_suffix.items(): confdict = {} methods = [] # merge all method into same handler for method, dat in by_method.items(): confdict.update(dat["confdict"]) methods.append(method) handler_class = type("%s%s_handler" % (name, suffix), (GenericHandler,), confdict) num_mandatory = dat["num_mandatory"] if suffix and not suffix.startswith("/"): suffix = "/" + suffix if num_mandatory: # this is for more REST alike URLs (eg. /info/clinvar == /info?src=clinvar url = r"/%s%s%s" % (name, r"/([\w\.-]+)?" * num_mandatory, suffix) else: url = r"/%s%s" % (name, suffix) routes.append((url, handler_class, {"shell": shell})) logging.info("route: %s %s => %s" % (repr([m.upper() for m in methods]), url, handler_class)) return routes
[docs] def create_handlers(shell, command_defs): routes = [] for cmdname, config in command_defs.items(): if type(config) == list: # multiple endpoints per handler commands = config else: # normalized as a list commands = [ EndpointDefinition( name=config["name"], method=config.get("method", "GET"), force_bodyargs=config.get("force_bodyargs", False), ) ] try: routes += generate_handler(shell, cmdname, commands) except TypeError as e: logging.exception("Can't generate handler for '%s': %s" % (cmdname, e)) continue return routes
# def generate_api_routes(shell, commands, settings={}):
[docs] def generate_api_routes(shell, commands): routes = create_handlers(shell, commands) return routes
[docs] def start_api(app, port, check=True, wait=5, retry=5, settings=None): settings = settings or {} if check: # check if port is used def check_socket(host, port): num = 1 with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: if sock.connect_ex((host, port)) == 0: if num >= retry: raise Exception(f"Can't start API, port {port} is already used and already tried {retry} times") logging.info("Port %s is already used, sleep and retry (%s/%s)", port, num, retry) time.sleep(wait) num += 1 else: return check_socket("localhost", port) app_server = tornado.httpserver.HTTPServer(app, **settings) app_server.listen(port) app_server.start() return app_server