Source code for biothings.web.query.builder

"""
    Biothings Query Builder

    Turn the biothings query language to that of the database.
    The interface contains a query term (q) and query options.

    Depending on the underlying database choice, the data type
    of the query term and query options vary. At a minimum,
    a query builder should support:

    q: str, a query term,
        when not provided, always perform a match all query.
        when provided as an empty string, always match none.

    options: dotdict, optional query options.

        scopes: list[str], the fields to look for the query term.
                the meaning of scopes being an empty list or a
                None object/not provided is controlled by specific
                class implementations or not defined.

        _source: list[str], fields to return in the result.
        size: int, maximum number of hits to return.
        from_: int, starting index of result to return.
        sort: str, customized sort keys for result list

        aggs: str, customized aggregation string.
        post_filter: str, when provided, the search hits are filtered after the aggregations are calculated.
        facet_size: int, maximum number of agg results.

"""
import logging
import os
import re
from collections import UserString, namedtuple
from copy import deepcopy
from random import randrange

import orjson
from elasticsearch_dsl import MultiSearch, Q, Search
from elasticsearch_dsl.exceptions import IllegalOperation

from biothings.utils.common import dotdict


[docs] class RawQueryInterrupt(Exception): def __init__(self, data): super().__init__() self.data = data
Query = namedtuple("Query", ("term", "scopes")) Group = namedtuple("Group", ("term", "scopes"))
[docs] class QStringParser: def __init__( self, default_scopes=("_id",), patterns=((r"(?P<scope>\w+):(?P<term>[^:]+)", ()),), gpnames=("term", "scope"), ): assert isinstance(default_scopes, (tuple, list)) assert all(isinstance(field, str) for field in default_scopes) self.default = default_scopes # ["_id", "entrezgene", "ensembl.gene"] self.patterns = [] # [(re.compile(r'^\d+$'), ['entrezgene', 'retired'])] self.gpname = Group(*gpnames) # symbolic group name for term substitution for pattern, fields in patterns: fields = [fields] if isinstance(fields, str) else fields assert all(isinstance(field, str) for field in fields) pattern = re.compile(pattern) if isinstance(pattern, str) else pattern if hasattr(re, "Pattern"): # TODO remove for python>3.7 assert isinstance(pattern, re.Pattern) self.patterns.append((pattern, fields))
[docs] def parse(self, q): assert isinstance(q, str) for regex, fields in self.patterns: match = re.fullmatch(regex, q) if match: named_groups = match.groupdict() q = named_groups.get(self.gpname.term) or q _fields = named_groups.get(self.gpname.scopes) fields = [_fields] if _fields else fields or self.default return Query(q, fields) return Query(q, self.default)
[docs] class ESScrollID(UserString): def __init__(self, seq: object): super().__init__(seq) # scroll id cannot be empty assert self.data
# # ES Query Builder Architecture # ------------------------------------------------------- # build # (support multisearch) # --------------------------↓↓↓-------------------------- # _build_one # (dispatch basing on scopes, then apply_extras(..)) # ------------↓↓↓------------------------↓↓↓------------- # _build_string_query | _build_match_query # (__all__, userquery,..) | (compound match query) # ------------↓↓↓------------------------↓↓↓------------- # default_string_query | default_match_query # (map to ES query string) | (map to ES match query) # -------------------------------------------------------
[docs] class ESQueryBuilder: """ Build an Elasticsearch query with elasticsearch-dsl. """ # Different from other query pipelines, elasticsearch # supports querystring query, which means we can directly # dispatch queires without fields to querystring query, # and those with fields specified to typical match queries. def __init__( self, user_query=None, # like a prepared statement in SQL scopes_regexs=(), # inference used when encountering empty scopes scopes_default=("_id",), # fallback used when scope inference fails allow_random_query=True, # used for data exploration, can be expensive allow_nested_query=False, # nested aggregation can be expensive metadata=None, # access to data like total number of documents ): # for autoscope feature, to infer scope from q when enabled self.parser = QStringParser(scopes_default, scopes_regexs) # all settings below affect only query string queries self.user_query = user_query or ESUserQuery("userquery") self.allow_random_query = allow_random_query self.allow_nested_query = allow_nested_query # for aggregations # currently metadata is only used for __any__ query self.metadata = metadata
[docs] def build(self, q=None, **options): """ Build a query according to q and options. This is the public method called by API handlers. Regarding scopes: scopes: [str] nonempty, match query. scopes: NoneType, or [], no scope, so query string query. Additionally support these options: explain: include es scoring information userquery: customized function to interpret q * additional keywords are passed through as es keywords for example: 'explain', 'version' ... * multi-search is supported when q is a list. all queries are built individually and then sent in one request. """ options = dotdict(options) if options.scroll_id: # bypass all query building stages return ESScrollID(options.scroll_id) if options.fetch_all: # clean up conflicting parameters options.pop("sort", None) options.pop("size", None) try: # process single q vs list of q(s). # dispatch 'val' vs 'key:val' to corresponding functions. if isinstance(q, list): search = MultiSearch() for _q in q: _search = self._build_one(_q, options) search = search.add(_search) else: # str, int ... search = self._build_one(q, options) except IllegalOperation as exc: raise ValueError(str(exc)) # ex. sorting by -_score if options.get("rawquery"): raise RawQueryInterrupt(search.to_dict()) return search
def _build_one(self, q, options): # a single query, possibly included in a multi-search # later but it itself is a single query unit. if options.scopes: search = self._build_match_query(q, options.scopes, options) elif not isinstance(q, (list, tuple)) and options.autoscope: q, scopes = self.parser.parse(str(q)) search = self._build_match_query(q, scopes, options) else: # no scope provided and cannot derive from q search = self._build_string_query(q, options) # pass through es query options. (from, size ...) search = self.apply_extras(search, options) return search def _build_string_query(self, q, options): """q + options -> query object options: userquery """ search = Search() userquery = options.userquery or "" if q == "": # same empty q behavior as that of ES. search = search.query("match_none") elif q == "__all__" or q is None: search = search.query() if options.aggs and not options.size: options.size = 0 elif q == "__any__": if self.allow_random_query: search = search.query("function_score", random_score={}) else: # pseudo random by overriding 'from' value search = search.query() try: # limit 'from' parameter to a valid result window metadata = self.metadata[options.biothing_type] total = metadata["stats"]["total"] fmax = total - options.get("size", 0) from_ = randrange(fmax if fmax < 10000 else 10000) options["from"] = from_ if from_ >= 0 else 0 except Exception: raise ValueError("random query not available.") elif self.user_query.has_query(userquery): userquery_ = self.user_query.get_query(userquery, q=q) search = search.query(userquery_) else: # customization here search = self.default_string_query(str(q), options) if self.user_query.has_filter(userquery): userfilter = self.user_query.get_filter(userquery) search = search.filter(userfilter) return search def _build_match_query(self, q, scopes, options): """q + scopes + options -> query object case 1: # single match query q = "1017" scopes = ["_id"] or "_id" case 2: # compound match query q = ["1017", "CDK2"] scopes = [["_id", "entrezgene"], "symbol"] """ if not isinstance(q, (list, tuple)): q, scopes = [q], [scopes] # considering the complexity of data types, # for example, q can take the type of int, bool, and float, # maybe it's better to let elasticsearch or its python package # handle the type checking. the checks below represent a # typical case but is inconclusive. # if not all(( # isinstance(q, (list, tuple)), # all(isinstance(_q, str) for _q in q))): # raise TypeError("Expect q: Union[list[str], str].", q) # if not all(( # isinstance(scopes, (list, tuple)), # all(isinstance(_s, (list, tuple, str)) for _s in scopes))): # raise TypeError("Expect scopes: list[Union[list[str], str]].", scopes) if not len(q) == len(scopes): raise ValueError("Expect len(q) == len(scopes).") # additional uncommon type errors # will be raised in elasticsearch search = Search() for _q, _scopes in zip(q, scopes): if not (_q and _scopes): raise ValueError("No search terms or scopes.") _search = self.default_match_query(_q, _scopes, options) search = search.query(_search.query) return search
[docs] def default_string_query(self, q, options): """ Override this to customize default string query. By default it implements a query string query. """ assert isinstance(q, str) and q assert not options.scopes return Search().query("query_string", query=q, default_operator="AND", lenient=True)
[docs] def default_match_query(self, q, scopes, options): """ Override this to customize default match query. By default it implements a multi_match query. """ assert isinstance(q, (str, int, float, bool)) assert isinstance(scopes, (list, tuple, str)) and scopes _params = dict(query=q, fields=scopes, operator="AND", lenient=True) if options.analyzer: _params["analyzer"] = options.analyzer return Search().query("multi_match", **_params)
[docs] def apply_extras(self, search, options): """ Process non-query options and customize their behaviors. Customized aggregation syntax string is translated here. """ # add aggregations facet_size = options.facet_size or 10 for agg in options.aggs or []: term, bucket = agg, search.aggs while term: if self.allow_nested_query and "(" in term and term.endswith(")"): _term, term = term[:-1].split("(", 1) else: _term, term = term, "" bucket = bucket.bucket(_term, "terms", field=_term, size=facet_size) # add es params if isinstance(options.sort, list): # accept '-' prefixed field names search = search.sort(*options.sort) if isinstance(options._source, list): if "all" not in options._source: fields_with_minus = [field.lstrip("-") for field in options._source if field.startswith("-")] fields_without_minus = [field for field in options._source if not field.startswith("-")] search = search.source(includes=fields_without_minus, excludes=fields_with_minus) for key in ("from", "size", "explain", "version"): if key in options: search = search.extra(**{key: options[key]}) # the valid values for from and size depend on the # index.max_result_window elasticsearch setting. # more about this constraint on: # https://www.elastic.co/guide/en/elasticsearch/ # reference/current/index-modules.html # Feature: filter # apply extra filter (as query_string query) to filter results # Ref: https://www.elastic.co/guide/en/elasticsearch/reference/8.10/query-dsl-bool-query.html if options.filter: search = search.filter("query_string", query=options.filter) # Feature: post_filter # -- implementation using query string matching # Ref: https://www.elastic.co/guide/en/elasticsearch/reference/8.10/filter-search-results.html#post-filter if options.post_filter: search = search.post_filter("query_string", query=options["post_filter"]) return search
[docs] class MongoQueryBuilder: def __init__(self, default_scopes=("_id",)): self.parser = QStringParser(default_scopes)
[docs] def build(self, q, **options): fields = options.get("scopes", ()) if not fields and q: q, fields = self.parser.parse(q) assert isinstance(fields, (list, tuple)) assert q is None and not fields or q and isinstance(q, str) assert all((isinstance(field, str) for field in fields)) filter_ = {field: 1 for field in options.get("_source", ())} or None # project fields to return query = {"$or": [{field: q} for field in fields]} if fields else {} if options.get("rawquery"): raise RawQueryInterrupt((query, filter_)) return (query, filter_)
[docs] class SQLQueryBuilder: # PROOF OF CONCEPT # INPUT NOT SANITIZED # INTERNAL USE ONLY def __init__(self, tables, default_scopes=("id",), default_limit=10): assert default_scopes assert isinstance(default_limit, int) assert tables and isinstance(tables, dict) self.tables = tables self.default_limit = default_limit self.parser = QStringParser(default_scopes) if None not in self.tables: # set default table self.tables[None] = next(iter(self.tables.values()))
[docs] def build(self, q, **options): statements = [ "SELECT {}".format(", ".join(options.get("_source", ())) or "*"), "FROM {}".format(self.tables[options.get("biothing_type")]), ] scopes = options.get("scopes") if not scopes: q, scopes = self.parser.parse(q) if scopes and q: assert isinstance(q, str) selections = ['{} = "{}"'.format(field, q) for field in scopes] statements.append("WHERE") statements.append(" OR ".join(selections)) # limit result window statements.append("LIMIT {}".format(options.get("size", self.default_limit))) if "from_" in options: statements.append("OFFSET {}".format(options["from_"])) if options.get("rawquery"): raise RawQueryInterrupt(statements) return " ".join(statements)
[docs] class ESUserQuery: def __init__(self, path): self._queries = {} self._filters = {} try: for dirpath, dirnames, filenames in os.walk(path): if dirnames: self.logger.info("User query folders: %s.", dirnames) continue for filename in filenames: with open(os.path.join(dirpath, filename)) as text_file: if "query" in filename: ## alternative implementation # noqa: E266 # self._queries[os.path.basename(dirpath)] = text_file.read() ## self._queries[os.path.basename(dirpath)] = orjson.loads(text_file.read()) elif "filter" in filename: self._filters[os.path.basename(dirpath)] = orjson.loads(text_file.read()) except Exception: self.logger.exception("Error loading user queries.")
[docs] def has_query(self, named_query): return named_query in self._queries
[docs] def has_filter(self, named_query): return named_query in self._filters
[docs] def get_query(self, named_query, **kwargs): def in_place_sub(dic, kwargs): for key in dic: if isinstance(dic[key], dict): in_place_sub(dic[key], kwargs) elif isinstance(dic[key], list): for item in dic[key]: in_place_sub(item, kwargs) elif isinstance(dic[key], str): dic[key] = dic[key].format(**kwargs).format(**kwargs) # {{q}} dic = deepcopy(self._queries.get(named_query)) in_place_sub(dic, kwargs) key, val = next(iter(dic.items())) return Q(key, **val)
## alternative implementation # noqa: E266 # string = self._queries.get(named_query) # string1 = re.sub(r"\}", "}}", string) # string2 = re.sub(r"\{", "{{", string1) # string3 = re.sub(r'\{\{\{\{(?P<var>.*?)\}\}\}\}', r'{\g<var>}', string2) # return string3 ##
[docs] def get_filter(self, named_query): dic = self._filters.get(named_query) key, val = next(iter(dic.items())) return Q(key, **val)
@property def logger(self): return logging.getLogger(__name__)