"""
Biothings Query Builder
Turn the biothings query language to that of the database.
The interface contains a query term (q) and query options.
Depending on the underlying database choice, the data type
of the query term and query options vary. At a minimum,
a query builder should support:
q: str, a query term,
when not provided, always perform a match all query.
when provided as an empty string, always match none.
options: dotdict, optional query options.
scopes: list[str], the fields to look for the query term.
the meaning of scopes being an empty list or a
None object/not provided is controlled by specific
class implementations or not defined.
_source: list[str], fields to return in the result.
size: int, maximum number of hits to return.
from_: int, starting index of result to return.
sort: str, customized sort keys for result list
aggs: str, customized aggregation string.
post_filter: str, when provided, the search hits are filtered after the aggregations are calculated.
facet_size: int, maximum number of agg results.
"""
import logging
import os
import re
from collections import UserString, namedtuple
from copy import deepcopy
from random import randrange
import orjson
from elasticsearch_dsl import MultiSearch, Q, Search
from elasticsearch_dsl.exceptions import IllegalOperation
from biothings.utils.common import dotdict
class RawQueryInterrupt(Exception):
    """Raised to short-circuit the query pipeline when the caller asked
    for the raw backend query (the ``rawquery`` option) instead of its
    execution result.  The built query payload is carried in ``data``."""

    def __init__(self, data):
        # intentionally no message passed upward -- handlers read .data directly
        super().__init__()
        self.data = data
Query = namedtuple("Query", ("term", "scopes"))
Group = namedtuple("Group", ("term", "scopes"))


class QStringParser:
    """Interpret a raw query string as a (term, scopes) pair.

    Patterns are tried in order with a full match; the first hit wins.
    A pattern may name its capture groups (see ``gpnames``) so that the
    scope and the effective term are lifted out of the string itself,
    e.g. "entrezgene:1017" -> Query("1017", ["entrezgene"]).
    """

    def __init__(
        self,
        default_scopes=("_id",),
        patterns=((r"(?P<scope>\w+):(?P<term>[^:]+)", ()),),
        gpnames=("term", "scope"),
    ):
        assert isinstance(default_scopes, (tuple, list))
        assert all(isinstance(field, str) for field in default_scopes)
        # fallback scopes when no pattern matches, e.g. ["_id", "entrezgene"]
        self.default = default_scopes
        # symbolic regex group names used for term/scope extraction
        self.gpname = Group(*gpnames)
        # normalized to [(compiled_pattern, [field, ...]), ...]
        self.patterns = []
        for pattern, fields in patterns:
            if isinstance(fields, str):
                fields = [fields]
            assert all(isinstance(field, str) for field in fields)
            if isinstance(pattern, str):
                pattern = re.compile(pattern)
            if hasattr(re, "Pattern"):  # TODO remove for python>3.7
                assert isinstance(pattern, re.Pattern)
            self.patterns.append((pattern, fields))

    def parse(self, q):
        """Return a Query(term, scopes) for the string *q*."""
        assert isinstance(q, str)
        for regex, fields in self.patterns:
            match = re.fullmatch(regex, q)
            if match is None:
                continue
            groups = match.groupdict()
            # prefer the captured term; keep the whole string otherwise
            term = groups.get(self.gpname.term) or q
            captured_scope = groups.get(self.gpname.scopes)
            if captured_scope:
                return Query(term, [captured_scope])
            return Query(term, fields or self.default)
        return Query(q, self.default)
#
# ES Query Builder Architecture
# -------------------------------------------------------
# build
# (support multisearch)
# --------------------------↓↓↓--------------------------
# _build_one
# (dispatch basing on scopes, then apply_extras(..))
# ------------↓↓↓------------------------↓↓↓-------------
# _build_string_query | _build_match_query
# (__all__, userquery,..) | (compound match query)
# ------------↓↓↓------------------------↓↓↓-------------
# default_string_query | default_match_query
# (map to ES query string) | (map to ES match query)
# -------------------------------------------------------
class ESQueryBuilder:
    """
    Build an Elasticsearch query with elasticsearch-dsl.
    """

    # Different from other query pipelines, elasticsearch
    # supports querystring query, which means we can directly
    # dispatch queries without fields to querystring query,
    # and those with fields specified to typical match queries.

    def __init__(
        self,
        user_query=None,  # like a prepared statement in SQL
        scopes_regexs=(),  # inference used when encountering empty scopes
        scopes_default=("_id",),  # fallback used when scope inference fails
        allow_random_query=True,  # used for data exploration, can be expensive
        allow_nested_query=False,  # nested aggregation can be expensive
        metadata=None,  # access to data like total number of documents
    ):
        # for autoscope feature, to infer scope from q when enabled
        self.parser = QStringParser(scopes_default, scopes_regexs)
        # all settings below affect only query string queries
        self.user_query = user_query or ESUserQuery("userquery")
        self.allow_random_query = allow_random_query
        self.allow_nested_query = allow_nested_query  # for aggregations
        # currently metadata is only used for __any__ query
        self.metadata = metadata

    def build(self, q=None, **options):
        """
        Build a query according to q and options.
        This is the public method called by API handlers.

        Regarding scopes:
            scopes: [str] nonempty, match query.
            scopes: NoneType, or [], no scope, so query string query.

        Additionally support these options:
            explain: include es scoring information
            userquery: customized function to interpret q

        * additional keywords are passed through as es keywords
          for example: 'explain', 'version' ...
        * multi-search is supported when q is a list. all queries
          are built individually and then sent in one request.
        """
        options = dotdict(options)

        if options.scroll_id:
            # bypass all query building stages
            # NOTE(review): ESScrollID is not defined in this file view --
            # presumably defined/imported elsewhere in the module; confirm.
            return ESScrollID(options.scroll_id)

        if options.fetch_all:
            # clean up conflicting parameters:
            # scrolling controls pagination itself
            options.pop("sort", None)
            options.pop("size", None)

        try:
            # process single q vs list of q(s).
            # dispatch 'val' vs 'key:val' to corresponding functions.
            if isinstance(q, list):
                search = MultiSearch()
                for _q in q:
                    _search = self._build_one(_q, options)
                    search = search.add(_search)
            else:  # str, int ...
                search = self._build_one(q, options)

        except IllegalOperation as exc:
            raise ValueError(str(exc))  # ex. sorting by -_score

        if options.get("rawquery"):
            # short-circuit: hand the dict form of the query back to the caller
            raise RawQueryInterrupt(search.to_dict())

        return search

    def _build_one(self, q, options):
        """Build one query unit, dispatching on the presence of scopes."""
        # a single query, possibly included in a multi-search
        # later but it itself is a single query unit.
        if options.scopes:
            search = self._build_match_query(q, options.scopes, options)
        elif not isinstance(q, (list, tuple)) and options.autoscope:
            # infer scopes from the shape of q, e.g. "symbol:CDK2"
            q, scopes = self.parser.parse(str(q))
            search = self._build_match_query(q, scopes, options)
        else:  # no scope provided and cannot derive from q
            search = self._build_string_query(q, options)

        # pass through es query options. (from, size ...)
        # NOTE(review): apply_extras is not visible in this chunk --
        # assumed to be defined elsewhere on this class; confirm.
        search = self.apply_extras(search, options)
        return search

    def _build_string_query(self, q, options):
        """q + options -> query object

        Handles the reserved terms "", "__all__", "__any__", and named
        user queries; falls back to default_string_query otherwise.

        options:
            userquery
        """
        search = Search()
        userquery = options.userquery or ""

        if q == "":  # same empty q behavior as that of ES.
            search = search.query("match_none")

        elif q == "__all__" or q is None:
            search = search.query()  # no-arg query() defaults to match_all
            if options.aggs and not options.size:
                # aggregation-only request: hits themselves are not needed
                options.size = 0

        elif q == "__any__":
            if self.allow_random_query:
                search = search.query("function_score", random_score={})
            else:  # pseudo random by overriding 'from' value
                search = search.query()
                try:  # limit 'from' parameter to a valid result window
                    metadata = self.metadata[options.biothing_type]
                    total = metadata["stats"]["total"]
                    fmax = total - options.get("size", 0)
                    # 10000 presumably mirrors ES's default
                    # index.max_result_window -- confirm against cluster config
                    from_ = randrange(fmax if fmax < 10000 else 10000)
                    options["from"] = from_ if from_ >= 0 else 0
                except Exception:
                    # metadata missing/malformed -- surface a uniform error
                    raise ValueError("random query not available.")

        elif self.user_query.has_query(userquery):
            userquery_ = self.user_query.get_query(userquery, q=q)
            search = search.query(userquery_)

        else:  # customization here
            search = self.default_string_query(str(q), options)

        if self.user_query.has_filter(userquery):
            userfilter = self.user_query.get_filter(userquery)
            search = search.filter(userfilter)

        return search

    def _build_match_query(self, q, scopes, options):
        """q + scopes + options -> query object

        case 1:
            # single match query
            q = "1017"
            scopes = ["_id"] or "_id"

        case 2:
            # compound match query
            q = ["1017", "CDK2"]
            scopes = [["_id", "entrezgene"], "symbol"]
        """
        if not isinstance(q, (list, tuple)):
            # normalize case 1 to the compound form of case 2
            q, scopes = [q], [scopes]

        # considering the complexity of data types,
        # for example, q can take the type of int, bool, and float,
        # maybe it's better to let elasticsearch or its python package
        # handle the type checking. the checks below represent a
        # typical case but is inconclusive.

        # if not all((
        #         isinstance(q, (list, tuple)),
        #         all(isinstance(_q, str) for _q in q))):
        #     raise TypeError("Expect q: Union[list[str], str].", q)

        # if not all((
        #         isinstance(scopes, (list, tuple)),
        #         all(isinstance(_s, (list, tuple, str)) for _s in scopes))):
        #     raise TypeError("Expect scopes: list[Union[list[str], str]].", scopes)

        if not len(q) == len(scopes):
            raise ValueError("Expect len(q) == len(scopes).")

        # additional uncommon type errors
        # will be raised in elasticsearch

        search = Search()
        for _q, _scopes in zip(q, scopes):
            if not (_q and _scopes):
                raise ValueError("No search terms or scopes.")
            _search = self.default_match_query(_q, _scopes, options)
            # chain each individual match into one combined query
            search = search.query(_search.query)

        return search

    def default_string_query(self, q, options):
        """
        Override this to customize default string query.
        By default it implements a query string query.
        """
        assert isinstance(q, str) and q
        assert not options.scopes
        return Search().query("query_string", query=q, default_operator="AND", lenient=True)

    def default_match_query(self, q, scopes, options):
        """
        Override this to customize default match query.
        By default it implements a multi_match query.
        """
        assert isinstance(q, (str, int, float, bool))
        assert isinstance(scopes, (list, tuple, str)) and scopes
        _params = dict(query=q, fields=scopes, operator="AND", lenient=True)
        if options.analyzer:
            _params["analyzer"] = options.analyzer
        return Search().query("multi_match", **_params)
class MongoQueryBuilder:
    """Translate q + options into a MongoDB (query, projection) pair.

    The query is an ``$or`` over every scope field; the projection maps
    each requested ``_source`` field to 1, or is None when unspecified.
    """

    def __init__(self, default_scopes=("_id",)):
        self.parser = QStringParser(default_scopes)

    def build(self, q, **options):
        """Return ``(query, projection)``; raise RawQueryInterrupt when
        the ``rawquery`` option is set."""
        fields = options.get("scopes", ())
        if q and not fields:
            # no explicit scopes: let the parser derive them from q
            q, fields = self.parser.parse(q)

        assert isinstance(fields, (list, tuple))
        assert (q is None and not fields) or (q and isinstance(q, str))
        assert all(isinstance(field, str) for field in fields)

        # fields to include in the returned documents
        projection = {field: 1 for field in options.get("_source", ())} or None
        if fields:
            query = {"$or": [{field: q} for field in fields]}
        else:
            query = {}

        if options.get("rawquery"):
            raise RawQueryInterrupt((query, projection))
        return (query, projection)
class SQLQueryBuilder:
    # PROOF OF CONCEPT
    # INPUT NOT SANITIZED
    # INTERNAL USE ONLY
    """Render a SQL SELECT statement from q + options.

    ``tables`` maps biothing_type -> table name; the first table also
    serves as the default when no type is given.
    """

    def __init__(self, tables, default_scopes=("id",), default_limit=10):
        assert default_scopes
        assert isinstance(default_limit, int)
        assert tables and isinstance(tables, dict)
        self.tables = tables
        self.default_limit = default_limit
        self.parser = QStringParser(default_scopes)
        if None not in self.tables:  # set default table
            self.tables[None] = next(iter(self.tables.values()))

    def build(self, q, **options):
        """Return the SQL string; raise RawQueryInterrupt when the
        ``rawquery`` option is set (carrying the clause list)."""
        columns = ", ".join(options.get("_source", ())) or "*"
        table = self.tables[options.get("biothing_type")]
        clauses = ["SELECT {}".format(columns), "FROM {}".format(table)]

        scopes = options.get("scopes")
        if not scopes:
            # derive scopes from the query string itself
            q, scopes = self.parser.parse(q)
        if scopes and q:
            assert isinstance(q, str)
            conditions = " OR ".join('{} = "{}"'.format(field, q) for field in scopes)
            clauses.append("WHERE")
            clauses.append(conditions)

        # limit result window
        clauses.append("LIMIT {}".format(options.get("size", self.default_limit)))
        if "from_" in options:
            clauses.append("OFFSET {}".format(options["from_"]))

        if options.get("rawquery"):
            raise RawQueryInterrupt(clauses)
        return " ".join(clauses)
class ESUserQuery:
    """Registry of user-defined Elasticsearch queries and filters
    loaded from disk.

    Walks *path* expecting one folder per named query; within each
    folder, files whose names contain "query" (resp. "filter") are
    parsed as JSON query (resp. filter) bodies.
    """

    def __init__(self, path):
        # named query -> parsed JSON query body (dict)
        self._queries = {}
        # named query -> parsed JSON filter body (dict)
        self._filters = {}
        try:
            for dirpath, dirnames, filenames in os.walk(path):
                if dirnames:
                    # intermediate level: subfolder names are the query names
                    self.logger.info("User query folders: %s.", dirnames)
                    continue
                for filename in filenames:
                    with open(os.path.join(dirpath, filename)) as text_file:
                        if "query" in filename:
                            ## alternative implementation # noqa: E266
                            # self._queries[os.path.basename(dirpath)] = text_file.read()
                            ##
                            self._queries[os.path.basename(dirpath)] = orjson.loads(text_file.read())
                        elif "filter" in filename:
                            self._filters[os.path.basename(dirpath)] = orjson.loads(text_file.read())
        except Exception:
            # best-effort loading: a bad folder or malformed JSON disables
            # user queries but must not break builder construction
            self.logger.exception("Error loading user queries.")

    def has_query(self, named_query):
        """Return True when a query body was loaded under *named_query*."""
        return named_query in self._queries

    def has_filter(self, named_query):
        """Return True when a filter body was loaded under *named_query*."""
        return named_query in self._filters

    def get_query(self, named_query, **kwargs):
        """Build a dsl Q object from the named query body, substituting
        **kwargs placeholders (e.g. q=...) into its string values."""
        def in_place_sub(dic, kwargs):
            # recursively substitute placeholders in every string value
            for key in dic:
                if isinstance(dic[key], dict):
                    in_place_sub(dic[key], kwargs)
                elif isinstance(dic[key], list):
                    # NOTE(review): assumes list items are dicts; a list of
                    # plain strings would raise here -- confirm file formats
                    for item in dic[key]:
                        in_place_sub(item, kwargs)
                elif isinstance(dic[key], str):
                    # double-format: "{{q}}" survives the first pass as "{q}"
                    # and is substituted by the second
                    dic[key] = dic[key].format(**kwargs).format(**kwargs)  # {{q}}

        # deepcopy so repeated calls never see prior substitutions
        dic = deepcopy(self._queries.get(named_query))
        in_place_sub(dic, kwargs)
        # body is expected to have a single top-level key: the query type
        key, val = next(iter(dic.items()))
        return Q(key, **val)

        ## alternative implementation # noqa: E266
        # string = self._queries.get(named_query)
        # string1 = re.sub(r"\}", "}}", string)
        # string2 = re.sub(r"\{", "{{", string1)
        # string3 = re.sub(r'\{\{\{\{(?P<var>.*?)\}\}\}\}', r'{\g<var>}', string2)
        # return string3
        ##

    def get_filter(self, named_query):
        """Build a dsl Q object from the named filter body (no substitution)."""
        dic = self._filters.get(named_query)
        key, val = next(iter(dic.items()))
        return Q(key, **val)

    @property
    def logger(self):
        # resolved lazily so later logging configuration still applies
        return logging.getLogger(__name__)