"""
Elasticsearch Handlers
biothings.web.handlers.BaseESRequestHandler
Supports: (all features above and)
- access to biothing_type attribute
- access to ES query pipeline stages
- pretty print elasticsearch exceptions
- common control option out_format
Subclasses:
- biothings.web.handlers.MetadataSourceHandler
- biothings.web.handlers.MetadataFieldHandler
- myvariant.web.beacon.BeaconHandler
biothings.web.handlers.ESRequestHandler
Supports: (all features above and)
- common control options (raw, rawquery)
- common transform options (dotfield, always_list...)
- query pipeline customization hooks
- single query through GET
- multiple queries through POST
Subclasses:
- biothings.web.handlers.BiothingHandler
- biothings.web.handlers.QueryHandler
"""
import logging
from collections import Counter
from functools import wraps
from inspect import isawaitable, iscoroutinefunction
from types import CoroutineType

from tornado.web import Finish, HTTPError

from biothings.utils import serializer
from biothings.web.analytics.events import GAEvent
from biothings.web.handlers.base import BaseAPIHandler
from biothings.web.query.pipeline import QueryPipelineException, QueryPipelineInterrupt
# Names exported by a star-import of this module.
# NOTE(review): MetadataSourceHandler and MetadataFieldHandler are not defined
# in the portion of the file reviewed here -- confirm they exist further down.
__all__ = [
"BaseQueryHandler",
"MetadataSourceHandler",
"MetadataFieldHandler",
"BiothingHandler",
"QueryHandler",
]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
[docs]
class BaseQueryHandler(BaseAPIHandler):
    """
    Base handler for endpoints served by the biothings query pipeline.

    Keeps the configured ``biothing_type`` together with shortcuts to the
    query ``pipeline`` and ``metadata`` objects, builds an analytics event
    for every request, and renders the response through the ``api.html``
    template when the requested output format is ``html``.
    """

    def initialize(self, biothing_type=None, *args, **kwargs):
        """
        Store the biothing type and pick up pipeline/metadata shortcuts.

        Remaining positional and keyword arguments are forwarded to the
        parent ``initialize``.
        """
        super().initialize(*args, **kwargs)
        self.biothing_type = biothing_type
        namespace = self.biothings
        self.pipeline = namespace.pipeline
        self.metadata = namespace.metadata

    def prepare(self):
        """
        Run the parent ``prepare`` and set up the analytics event.

        The primary event identifies the API version and the endpoint/method
        pair. Secondary events track which root-level fields were requested
        through ``_source``, or a single "all" event when no field filter
        was supplied.
        """
        super().prepare()

        # make the biothing type available to the later stages
        self.args.biothing_type = self.biothing_type

        api_category = f"{self.biothings.config.APP_VERSION}_api"  # eg.'v1_api'
        api_action = "_".join((self.name, self.request.method.lower()))  # eg.'query_get'
        self.event = GAEvent(
            {
                # secondary analytical objective: field tracking
                "__secondary__": [],
                "category": api_category,
                "action": api_action,
                # 'label': 'fetch_all', etc.
                # 'value': 100, # number of queries
            }
        )

        requested = self.args._source
        if not requested:
            # no field filter: record that all fields were returned
            self.event["__secondary__"].append(
                GAEvent(
                    {
                        "category": "parameter_tracking",
                        "action": "field_filter",
                        "label": "all",
                    }
                )
            )
            return

        # str() guards against non-string input; only root keys are counted
        root_counts = Counter(str(field).split(".", 1)[0] for field in requested)
        for root, hits in root_counts.items():
            self.event["__secondary__"].append(
                GAEvent(
                    {
                        "category": "parameter_tracking",
                        "action": "field_filter",
                        "label": root,
                        "value": hits,
                    }
                )
            )

    def write(self, chunk):
        """
        Write ``chunk``, wrapping it in an HTML page for ``format == "html"``.

        The HTML page adds a header image, a title-like section and a link
        to the documentation around the JSON output. Relevant settings in
        config:

        - HTML_OUT_TITLE
        - HTML_OUT_HEADER_IMG
        - HTML_OUT_<ENDPOINT>_DOCS

        Any exception raised while producing the HTML output is logged as a
        warning and the chunk is handed to the parent ``write`` instead.
        """
        fallback_title = "<p>Biothings API</p>"
        fallback_img = "https://biothings.io/static/favicon.ico"

        try:
            if self.format == "html":
                settings = self.biothings.config
                payload = serializer.to_json(chunk)
                plain_link = serializer.URL(self.request.full_url()).remove("format")
                title = getattr(settings, "HTML_OUT_TITLE", "") or fallback_title
                image = getattr(settings, "HTML_OUT_HEADER_IMG", "") or fallback_img
                docs = getattr(settings, f"HTML_OUT_{self.name.upper()}_DOCS", "")
                # chunk is rebound on purpose: the fallback below writes
                # whatever chunk holds at the time of the failure
                chunk = self.render_string(
                    template_name="api.html",
                    data=payload,
                    link=plain_link,
                    title_div=title,
                    header_img=image,
                    learn_more=docs,
                )
                self.set_header("Content-Type", "text/html; charset=utf-8")
                # skip BaseAPIHandler.write and use the next write in the MRO
                return super(BaseAPIHandler, self).write(chunk)
        except Exception as exc:
            logger.warning(exc)

        super().write(chunk)
async def ensure_awaitable(obj):
    """
    Resolve ``obj`` to a concrete value.

    Lets callers treat synchronous and asynchronous pipeline results the
    same way: ``result = await ensure_awaitable(maybe_async_call())``.

    Args:
        obj: either a plain value or an awaitable (a coroutine, a
            Future/Task, or any object implementing ``__await__``).

    Returns:
        The awaited result when ``obj`` is awaitable, otherwise ``obj``
        itself, unchanged.
    """
    # isawaitable() also covers Futures, Tasks and custom __await__ objects,
    # which a plain coroutine type check would hand back un-awaited.
    if isawaitable(obj):
        return await obj
    return obj
def capture_exceptions(coro):
    """
    Decorate a coroutine function to translate query pipeline errors.

    Args:
        coro: the coroutine function (typically a handler ``get``/``post``)
            to wrap.

    Returns:
        A coroutine function with the same name and docstring as ``coro``
        that awaits it and returns its result.

    Raises:
        Finish: built from ``details`` of a ``QueryPipelineInterrupt``.
        HTTPError: built from ``code``, ``details`` and ``summary`` of a
            ``QueryPipelineException``.
    """

    # wraps() keeps the wrapped method's __name__/__doc__ so introspection
    # and generated documentation still see "get"/"post", not "_method".
    @wraps(coro)
    async def _method(*args, **kwargs):
        try:
            return await coro(*args, **kwargs)
        except QueryPipelineInterrupt as itr:
            raise Finish(itr.details)
        except QueryPipelineException as exc:
            raise HTTPError(exc.code, None, exc.details, reason=exc.summary)

    return _method
[docs]
class BiothingHandler(BaseQueryHandler):
    r"""
    Biothings Annotation Endpoint

    URL pattern examples:

        /{pre}/{ver}/{typ}/?
        /{pre}/{ver}/{typ}/([^\/]+)/?

    Looks a term up in a pre-determined field that represents the id
    of a document, like _id and dbsnp.rsid

    GET -> {...} or [{...}, ...]
    POST -> [{...}, ...]
    """

    name = "annotation"

    @capture_exceptions
    async def post(self, *args, **kwargs):
        """Fetch a batch of ids; the event value is the number of ids."""
        self.event["value"] = len(self.args["id"])
        documents = self.pipeline.fetch(**self.args)
        # the pipeline may hand back either a value or a coroutine
        self.finish(await ensure_awaitable(documents))

    @capture_exceptions
    async def get(self, *args, **kwargs):
        """Fetch a single lookup; the event value is always 1."""
        self.event["value"] = 1
        documents = self.pipeline.fetch(**self.args)
        # the pipeline may hand back either a value or a coroutine
        self.finish(await ensure_awaitable(documents))
[docs]
class QueryHandler(BaseQueryHandler):
    """
    Biothings Query Endpoint

    URL pattern examples:

        /{pre}/{ver}/{typ}/query/?
        /{pre}/{ver}//query/?

    GET -> {...}
    POST -> [{...}, ...]
    """

    name = "query"

    @capture_exceptions
    async def post(self, *args, **kwargs):
        """Run a batch search; the event value is the number of query terms."""
        self.event["value"] = len(self.args["q"])
        hits = self.pipeline.search(**self.args)
        # the pipeline may hand back either a value or a coroutine
        self.finish(await ensure_awaitable(hits))

    @capture_exceptions
    async def get(self, *args, **kwargs):
        """
        Run a single search; the event value is always 1.

        The ``Cache-Control`` header is cleared for ``fetch_all`` requests,
        requests carrying a ``scroll_id`` and ``q == "__any__"`` queries.
        """
        fetch_all = self.args.get("fetch_all")

        self.event["value"] = 1
        if fetch_all:
            self.event["label"] = "fetch_all"

        drop_cache_header = (
            fetch_all
            or self.args.get("scroll_id")
            or self.args.get("q") == "__any__"
        )
        if drop_cache_header:
            self.clear_header("Cache-Control")

        hits = self.pipeline.search(**self.args)
        # the pipeline may hand back either a value or a coroutine
        self.finish(await ensure_awaitable(hits))