"""
Biothings Query Builder
Turn the biothings query language to that of the database.
The interface contains a query term (q) and query options.
Depending on the underlying database choice, the data type
of the query term and query options vary. At a minimum,
a query builder should support:
q: str, a query term,
when not provided, always perform a match all query.
when provided as an empty string, always match none.
options: dotdict, optional query options.
scopes: List[str], the fields to look for the query term.
the meaning of scopes being an empty list or a
None object/not provided is controlled by specific
class implementations or not defined.
_source: List[str], fields to return in the result.
size: int, maximum number of hits to return.
from_: int, starting index of result to return.
sort: str, customized sort keys for result list
aggs: str, customized aggregation string.
post_filter: str, when provided, the search hits are filtered after the aggregations are calculated.
facet_size: int, maximum number of agg results.
"""
from collections import UserString, namedtuple
from copy import deepcopy
from random import randrange
import logging
import os
import re
from typing import Iterable, List, Set, Tuple, Union
from elasticsearch.dsl import MultiSearch, Q, Search
from elasticsearch.dsl.exceptions import IllegalOperation
from biothings.utils import serializer
from biothings.utils.common import dotdict
from biothings.web.query.formatter import ESResultFormatter
from biothings.web.services.metadata import BiothingsMetadata
from biothings.web.settings.default import ANNOTATION_DEFAULT_REGEX_PATTERN
logger = logging.getLogger(__name__)
[docs]
class RawQueryInterrupt(Exception):
    """
    Control-flow exception carrying a built (raw) query back to the caller.

    The builders in this module raise it when the ``rawquery`` option is
    set, so the caller can return the query itself instead of executing it.

    Attributes:
        data: the raw query representation handed over by the builder.
    """

    def __init__(self, data):
        # The payload is kept as an attribute only; the base exception is
        # deliberately initialised without arguments (``args`` stays empty).
        self.data = data
        super().__init__()
# Result of parsing a query string: the term to search for and the
# fields (scopes) to search it in.
Query = namedtuple("Query", ["term", "scopes"])

# Names of the regex named-groups that hold the term and the scopes.
Group = namedtuple("Group", ["term", "scopes"])
[docs]
class QStringParser:
    """
    Translate a raw query string into a ``Query(term, scopes)`` pair.

    The parser holds an ordered list of ``(regex, fields)`` patterns. The
    first regex that fully matches the query string decides which term and
    which scopes (fields) are used; when nothing matches, the whole query
    string is searched in ``default_scopes``.
    """

    def __init__(
        self,
        default_scopes: Tuple[str] = None,
        patterns: Iterable[Tuple[Union[str, re.Pattern], Union[str, Iterable]]] = None,
        default_pattern: Tuple[Union[str, re.Pattern], Union[str, Iterable]] = ANNOTATION_DEFAULT_REGEX_PATTERN,
        gpnames: Tuple[str] = None,
        formatter: ESResultFormatter = None,
    ):
        """
        Args:
            default_scopes: fields used when no pattern decides the scopes.
                Defaults to ``("_id",)``.
            patterns: ordered ``(regex, fields)`` pairs tried before the
                default pattern.
            default_pattern: ``(regex, fields)`` pair appended as the last
                pattern; ``None`` disables it.
            gpnames: names of the regex groups holding the term and the
                scopes, in that order. Defaults to ``("term", "scope")``.
            formatter: object whose ``transform_mapping`` is used to flatten
                the field mappings; defaults to a new ``ESResultFormatter``.
        """
        if default_scopes is None:
            default_scopes = ("_id",)

        if gpnames is None:
            gpnames = ("term", "scope")
        self.gpname = Group(*gpnames)  # symbolic group name for term substitution

        assert isinstance(default_scopes, (tuple, list))
        assert all(isinstance(field, str) for field in default_scopes)
        self.default_scopes = default_scopes

        if formatter is None:
            formatter = ESResultFormatter()
        self.metadata_field_formatter = formatter

        # Order matters: the pattern collection appends self.default_pattern.
        self.default_pattern = self._verify_default_regex_pattern(default_pattern=default_pattern)
        self.patterns = self._build_regex_pattern_collection(patterns=patterns)

    def _build_endpoint_metadata_fields(self, metadata: BiothingsMetadata) -> Union[Set[str], None]:
        """
        Collect the names of the indexed fields known to ``metadata``.

        For every entry of ``metadata.biothing_metadata`` the ``_biothing``
        value is passed to ``metadata.get_mappings`` and the result is
        flattened with the formatter's ``transform_mapping``. A field is
        kept unless its mapping sets ``"index"`` to a falsy value.

        Abbreviated example of ``metadata.biothing_metadata``::

            defaultdict(<class 'dict'>, {
                None: {
                    "_biothing": "disease",
                    "_indices": ["doid_20230601_gpycp0cq"],
                    "biothing_type": "disease",
                    "build_date": "2023-06-01T18:26:14.250729-07:00",
                    "build_version": "20230601",
                    "src": {"doid": {"stats": {"doid": 11314}, ...}},
                    "stats": {"total": 11314}
                },
                "disease": { ... same structure ... }
            })

        Args:
            metadata: metadata service, or ``None`` when unavailable.

        Returns:
            The set of indexed field names, or ``None`` when no field could
            be collected (no metadata, empty metadata, or every lookup
            failed). ``None`` tells the caller to skip scope validation.
        """
        if metadata is None:
            return None

        metadata_fields = set()
        for index_metadata_mapping in list(metadata.biothing_metadata.values()):
            biothing_type = index_metadata_mapping.get("_biothing", None)
            try:
                raw_metadata_mapping = metadata.get_mappings(biothing_type)
                metadata_mapping = self.metadata_field_formatter.transform_mapping(raw_metadata_mapping)
            except Exception as gen_exc:
                # Best effort: a failing lookup must not break query parsing.
                logger.exception(gen_exc)
                logger.error("Unable to retrieve elasticsearch field mappings. biothing_type: [%s]", biothing_type)
                metadata_mapping = {}

            for field, elasticsearch_mapping in metadata_mapping.items():
                if elasticsearch_mapping.get("index", True):
                    metadata_fields.add(field)

        return metadata_fields or None

    def _verify_default_regex_pattern(
        self, default_pattern: Tuple[Union[str, re.Pattern], Union[str, Iterable]]
    ) -> Union[Tuple[re.Pattern, Iterable], None]:
        """
        Normalise the default ``(regex, fields)`` pattern.

        A warning is logged when the value differs from
        ANNOTATION_DEFAULT_REGEX_PATTERN, since overriding it may produce
        unwanted behavior. ``None`` is allowed and disables the default
        pattern. An override that cannot be normalised is logged and
        replaced with ANNOTATION_DEFAULT_REGEX_PATTERN.

        Returns:
            ``(compiled regex, fields)`` or ``None`` when disabled.
        """
        if default_pattern != ANNOTATION_DEFAULT_REGEX_PATTERN:
            logger.warning(
                "Default regex pattern changed to [%s]. "
                "Set by <ANNOTATION_DEFAULT_REGEX_PATTERN> in the configuration",
                default_pattern,
            )

        if default_pattern is None:
            return None

        try:
            default_regex_pattern = re.compile(default_pattern[0])
            raw_fields = default_pattern[1]
            if isinstance(raw_fields, str):
                # A bare string is one field name, not an iterable of characters.
                raw_fields = [raw_fields]
            default_regex_fields = [str(field_entry) for field_entry in raw_fields]
        except Exception as gen_exc:
            logger.exception(gen_exc)
            logger.error(
                "Invalid new regex pattern [%s]. Resetting to the default pattern [%s]",
                default_pattern,
                ANNOTATION_DEFAULT_REGEX_PATTERN,
            )
            # re.compile returns an already compiled pattern unchanged, so
            # the fallback always exposes ``.pattern`` to later comparisons.
            default_regex_pattern = re.compile(ANNOTATION_DEFAULT_REGEX_PATTERN[0])
            default_regex_fields = ANNOTATION_DEFAULT_REGEX_PATTERN[1]

        return (default_regex_pattern, default_regex_fields)

    def _build_regex_pattern_collection(
        self,
        patterns: Iterable[Tuple[Union[str, re.Pattern], Union[str, Iterable]]],
    ) -> List[Tuple[re.Pattern, List[str]]]:
        """
        Build the ordered list of ``(compiled regex, fields)`` patterns.

        Every provided pattern is compiled and a single string field is
        wrapped in a list. A provided entry that equals the default pattern
        (same regex text and no fields) is dropped inside the loop, and the
        default pattern (``self.default_pattern``) is appended once at the
        end, so it is always the last pattern tried. It is added here at
        runtime rather than expected in the configured list.

        Args:
            patterns: configured ``(regex, fields)`` pairs, or ``None``.

        Returns:
            The normalised pattern list; it only contains the default
            pattern when ``patterns`` is not iterable, and is empty when
            the default pattern is disabled as well.
        """
        default_regex_pattern = self.default_pattern[0] if self.default_pattern else None

        structured_patterns = []
        if isinstance(patterns, Iterable):
            for regex_pattern, regex_fields in patterns:
                regex_pattern = re.compile(regex_pattern)
                if isinstance(regex_fields, str):
                    regex_fields = [regex_fields]

                # Skip explicit copies of the default pattern; it is added
                # once, after the loop.
                if (
                    default_regex_pattern
                    and regex_pattern.pattern == default_regex_pattern.pattern
                    and len(regex_fields) == 0
                ):
                    continue
                structured_patterns.append((regex_pattern, regex_fields))

        if self.default_pattern:
            structured_patterns.append(self.default_pattern)
        return structured_patterns

    def parse(self, query: str, metadata: BiothingsMetadata) -> Query:
        """
        Parse a query string into a ``Query(term, scopes)`` tuple.

        Flow:
            1) Collect the indexed fields from ``metadata``. The result may
               be ``None`` (metadata is not guaranteed to be loaded), in
               which case step 4 is skipped.
            2) Try the patterns of ``self.patterns`` in order with
               ``re.fullmatch`` and stop at the first match, so the order
               of the configured patterns is significant.
            3) Build the query from the match with this precedence
               (highest first):
                 term:   regex group ``self.gpname.term``, then the raw query
                 scopes: regex group ``self.gpname.scopes``, then the fields
                         of the matching pattern, then ``self.default_scopes``
            4) If metadata fields are available and the chosen scopes are
               not a subset of them, fall back to the raw query searched in
               the default scopes.
            5) If no pattern matched, use the same fallback.

        Args:
            query: the query string to parse.
            metadata: metadata service used for scope validation, or ``None``.

        Returns:
            The resulting ``Query`` named tuple.
        """
        logger.debug("Attempting to parse query string %s", query)
        query_metadata = self._build_endpoint_metadata_fields(metadata)
        fallback_scope_fields = self.default_scopes

        scope_fields = []
        query_object = None
        for regex, pattern_fields in self.patterns:
            match = re.fullmatch(regex, query)
            if not match:
                continue

            logger.debug("Discovered regex-query match: regex [%s] | match [%s]", regex, match)
            named_groups = match.groupdict()
            match_term = named_groups.get(self.gpname.term, None)
            matched_fields = named_groups.get(self.gpname.scopes, None)

            term_query = match_term or query
            scope_fields = matched_fields or pattern_fields or fallback_scope_fields
            if not isinstance(scope_fields, (list, tuple)):
                # e.g. a single field name captured by the regex group
                scope_fields = [scope_fields]

            query_object = Query(term_query, scope_fields)
            logger.info("Regex match generated query object: [%s]", query_object)
            break

        if query_metadata is not None:
            logger.debug(
                "Validating the scope fields against the metadata fields. "
                "scope fields [%s] | metadata fields [%s]",
                set(scope_fields),
                query_metadata,
            )
            if not set(scope_fields) <= query_metadata:
                query_object = Query(query, fallback_scope_fields)
                logger.warning(
                    "Provided scope fields aren't a subset of the metadata elasticsearch fields. "
                    "Resetting query object instance to default [%s]",
                    query_object,
                )

        if query_object is None:
            query_object = Query(query, fallback_scope_fields)
            logger.debug("No regex pattern match found. Setting query object instance to default [%s]", query_object)

        logger.info("Generated query object: [%s]", query_object)
        return query_object
[docs]
class ESUserQuery:
    """
    Named, file-based query and filter templates ("user queries").

    ``path`` is walked with ``os.walk``. In every directory that has no
    sub-directories, a file whose name contains ``"query"`` is loaded as
    the query template and a file whose name contains ``"filter"`` as the
    filter, both registered under the directory's base name.
    """

    def __init__(self, path):
        """
        Load all templates found below ``path``.

        Loading is best effort: any error is logged and stops the loading,
        keeping whatever was registered up to that point.
        """
        self._queries = {}
        self._filters = {}
        try:
            for dirpath, dirnames, filenames in os.walk(path):
                if dirnames:
                    # Only leaf directories hold templates.
                    self.logger.info("User query folders: %s.", dirnames)
                    continue
                for filename in filenames:
                    if "query" in filename:
                        registry = self._queries
                    elif "filter" in filename:
                        registry = self._filters
                    else:
                        continue  # unrelated file, do not even open it
                    with open(os.path.join(dirpath, filename), encoding="utf-8") as text_file:
                        registry[os.path.basename(dirpath)] = serializer.load_json(text_file.read())
        except Exception:
            self.logger.exception("Error loading user queries.")

    def has_query(self, named_query):
        """Return True when a query template is registered under this name."""
        return named_query in self._queries

    def has_filter(self, named_query):
        """Return True when a filter is registered under this name."""
        return named_query in self._filters

    @classmethod
    def _in_place_sub(cls, node, kwargs):
        """
        Substitute ``kwargs`` into every string value of a nested structure.

        Dicts and lists are walked recursively and modified in place; values
        of any other type are left untouched. Strings are formatted twice so
        that a ``{{q}}`` placeholder first becomes ``{q}`` and is then
        replaced by the value.
        """
        if isinstance(node, dict):
            entries = list(node.items())
        elif isinstance(node, list):
            entries = list(enumerate(node))
        else:
            return
        for key, value in entries:
            if isinstance(value, (dict, list)):
                cls._in_place_sub(value, kwargs)
            elif isinstance(value, str):
                node[key] = value.format(**kwargs).format(**kwargs)  # {{q}}

    def get_query(self, named_query, **kwargs):
        """
        Build the query object for a registered template.

        The stored template is deep-copied, ``kwargs`` are substituted into
        its string values, and its first key/value pair is passed to ``Q``
        as ``Q(key, **value)``.

        Callers are expected to check ``has_query`` first; an unknown name
        is not handled here and results in an error.
        """
        dic = deepcopy(self._queries.get(named_query))
        self._in_place_sub(dic, kwargs)
        key, val = next(iter(dic.items()))
        return Q(key, **val)

    def get_filter(self, named_query):
        """
        Build the query object for a registered filter.

        The first key/value pair of the stored filter is passed to ``Q`` as
        ``Q(key, **value)``; no substitution is performed. Callers are
        expected to check ``has_filter`` first.
        """
        dic = self._filters.get(named_query)
        key, val = next(iter(dic.items()))
        return Q(key, **val)

    @property
    def logger(self):
        """Logger named after this module."""
        return logging.getLogger(__name__)
[docs]
class ESQueryBuilder:
    """
    Build an Elasticsearch query with elasticsearch-dsl.

    ES Query Builder Architecture
    -------------------------------------------------------
                            build
                    (support multisearch)
    --------------------------↓↓↓--------------------------
                         _build_one
     (dispatch basing on scopes, then apply_extras(..))
    ------------↓↓↓------------------------↓↓↓-------------
       _build_string_query    |    _build_match_query
     (__all__, userquery,..)  |  (compound match query)
    ------------↓↓↓------------------------↓↓↓-------------
       default_string_query   |    default_match_query
     (map to ES query string) | (map to ES match query)
    -------------------------------------------------------
    """

    # Different from other query pipelines, elasticsearch
    # supports querystring query, which means we can directly
    # dispatch queries without fields to querystring query,
    # and those with fields specified to typical match queries.

    def __init__(
        self,
        user_query: Union[str, ESUserQuery] = None,  # like a prepared statement in SQL
        scopes_regexs: Iterable[Tuple[Union[str, re.Pattern], Union[str, Iterable]]] = None,
        scopes_default: Tuple[str] = ("_id",),  # fallback used when scope inference fails
        pattern_default: Tuple[Union[str, re.Pattern], Union[str, Iterable]] = ANNOTATION_DEFAULT_REGEX_PATTERN,
        allow_random_query: bool = True,  # used for data exploration, can be expensive
        allow_nested_query: bool = False,  # nested aggregation can be expensive
        metadata: BiothingsMetadata = None,  # access to data like total number of documents
        formatter: ESResultFormatter = None,
    ):
        """
        Args:
            user_query: named query templates; defaults to
                ``ESUserQuery("userquery")``.
            scopes_regexs: ``(regex, fields)`` patterns given to the parser.
            scopes_default: parser fallback scopes.
            pattern_default: parser default pattern.
            allow_random_query: answer ``__any__`` with a random-score query
                instead of a random ``from`` offset.
            allow_nested_query: stored for aggregation handling.
            metadata: metadata service, used for ``__any__`` queries and
                passed to the parser for scope validation.
            formatter: formatter handed to the parser; defaults to a new
                ``ESResultFormatter``.
        """
        # all settings below affect only query string queries
        if user_query is None:
            user_query = ESUserQuery("userquery")
        self.user_query = user_query
        self.allow_random_query = allow_random_query
        self.allow_nested_query = allow_nested_query  # for aggregations

        self.metadata = metadata

        if formatter is None:
            formatter = ESResultFormatter()

        self.parser = QStringParser(
            default_scopes=scopes_default,
            patterns=scopes_regexs,
            default_pattern=pattern_default,
            gpnames=("term", "scope"),
            formatter=formatter,
        )

    def build(self, q=None, **options):
        """
        Build a query according to q and options.
        This is the public method called by API handlers.

        Regarding scopes:
            scopes: [str] nonempty, match query.
            scopes: NoneType, or [], no scope, so query string query.

        Additionally support these options:
            explain: include es scoring information
            userquery: customized function to interpret q

        * additional keywords are passed through as es keywords
            for example: 'explain', 'version' ...

        * multi-search is supported when q is a list. all queries
            are built individually and then sent in one request.

        Raises:
            ValueError: when elasticsearch-dsl reports an IllegalOperation.
            RawQueryInterrupt: when the ``rawquery`` option is set; it
                carries ``search.to_dict()``.
        """
        options = dotdict(options)

        if options.scroll_id:
            # bypass all query building stages
            # NOTE(review): ESScrollID is not defined in this part of the
            # file; presumably provided elsewhere in the module. Confirm.
            return ESScrollID(options.scroll_id)

        if options.fetch_all:
            # clean up conflicting parameters
            options.pop("sort", None)
            options.pop("size", None)

        try:
            # process single q vs list of q(s).
            # dispatch 'val' vs 'key:val' to corresponding functions.
            if isinstance(q, list):
                search = MultiSearch()
                for _q in q:
                    search = search.add(self._build_one(_q, options))
            else:  # str, int ...
                search = self._build_one(q, options)

        except IllegalOperation as illegal_operation_error:
            logger.exception(illegal_operation_error)
            raise ValueError from illegal_operation_error

        if options.get("rawquery"):
            raise RawQueryInterrupt(search.to_dict())

        return search

    def _build_one(self, q, options):
        """
        Build a single query, possibly included in a multi-search later,
        but in itself a single query unit.

        Dispatch: explicit ``options.scopes`` gives a match query; otherwise
        ``options.autoscope`` lets the parser derive term and scopes from
        q; otherwise a query string query is built.
        """
        if options.scopes:
            search = self._build_match_query(q, options.scopes, options)
        elif not isinstance(q, (list, tuple)) and options.autoscope:
            q, scopes = self.parser.parse(str(q), self.metadata)
            search = self._build_match_query(q, scopes, options)
        else:  # no scope provided and cannot derive from q
            search = self._build_string_query(q, options)

        # pass through es query options. (from, size ...)
        # NOTE(review): apply_extras is not defined in this part of the
        # file; presumably provided elsewhere. Confirm.
        search = self.apply_extras(search, options)
        return search

    def _build_string_query(self, q, options):
        """
        q + options -> query object

        Special values of q:
            ""                match nothing
            "__all__" / None  match everything
            "__any__"         a random document

        options:
            userquery: name of a registered user query and/or filter.

        Raises:
            ValueError: for ``__any__`` when random queries are disabled
                and the total document count cannot be read from metadata.
        """
        search = Search()
        userquery = options.userquery or ""

        if q == "":  # same empty q behavior as that of ES.
            search = search.query("match_none")

        elif q == "__all__" or q is None:
            search = search.query()
            if options.aggs and not options.size:
                options.size = 0

        elif q == "__any__":
            if self.allow_random_query:
                search = search.query("function_score", random_score={})
            else:  # pseudo random by overriding 'from' value
                search = search.query()
                try:  # limit 'from' parameter to a valid result window
                    metadata = self.metadata.biothing_metadata[options.biothing_type]
                    total = metadata["stats"]["total"]
                    window = min(total - (options.get("size") or 0), 10000)
                    # Fewer documents than the page size: start at 0.
                    options["from"] = randrange(window) if window > 0 else 0
                except Exception as exc:
                    raise ValueError("random query not available.") from exc

        elif self.user_query.has_query(userquery):
            userquery_ = self.user_query.get_query(userquery, q=q)
            search = search.query(userquery_)

        else:  # customization here
            search = self.default_string_query(str(q), options)

        if self.user_query.has_filter(userquery):
            userfilter = self.user_query.get_filter(userquery)
            search = search.filter(userfilter)

        return search

    def _build_match_query(self, q, scopes, options):
        """
        q + scopes + options -> query object

        case 1:
            # single match query
            q = "1017"
            scopes = ["_id"] or "_id"
        case 2:
            # compound match query
            q = ["1017", "CDK2"]
            scopes = [["_id", "entrezgene"], "symbol"]

        Raises:
            ValueError: when q and scopes differ in length, or when any
                term or scope is empty.
        """
        if not isinstance(q, (list, tuple)):
            q, scopes = [q], [scopes]

        # Element types are deliberately not checked here: q may be int,
        # bool or float, and uncommon type errors are left to elasticsearch
        # or its python package.
        if not len(q) == len(scopes):
            raise ValueError("Expect len(q) == len(scopes).")

        search = Search()
        for _q, _scopes in zip(q, scopes):
            if not (_q and _scopes):
                raise ValueError("No search terms or scopes.")
            _search = self.default_match_query(_q, _scopes, options)
            search = search.query(_search.query)

        return search

    def default_string_query(self, q, options):
        """
        Override this to customize default string query.
        By default it implements a query string query.
        """
        assert isinstance(q, str) and q
        assert not options.scopes
        return Search().query("query_string", query=q, default_operator="AND", lenient=True)

    def default_match_query(self, q, scopes, options):
        """
        Override this to customize default match query.
        By default it implements a multi_match query.
        """
        assert isinstance(q, (str, int, float, bool))
        assert isinstance(scopes, (list, tuple, str)) and scopes

        _params = dict(query=q, fields=scopes, operator="AND", lenient=True)
        if options.analyzer:
            _params["analyzer"] = options.analyzer
        return Search().query("multi_match", **_params)
[docs]
class MongoQueryBuilder:
    """Build ``(query, projection)`` tuples for a MongoDB-style ``find``."""

    def __init__(self, default_scopes=("_id",)):
        # The parser derives term and scopes when the caller gives no scopes.
        self.parser = QStringParser(default_scopes)

    def build(self, q, **options):
        """
        Translate q and options into a ``(query, projection)`` tuple.

        Options read here:
            scopes: fields to match q against; derived from q by the parser
                when empty and q is truthy.
            _source: fields to return; the projection is ``None`` when empty.
            rawquery: when truthy, raise ``RawQueryInterrupt`` carrying the
                tuple instead of returning it.

        The query is ``{"$or": [{field: q}, ...]}`` over all scopes, or
        ``{}`` when there are no scopes.
        """
        scopes = options.get("scopes", ())
        if q and not scopes:
            q, scopes = self.parser.parse(q, metadata=None)

        assert isinstance(scopes, (list, tuple))
        assert q is None and not scopes or q and isinstance(q, str)
        assert all(isinstance(name, str) for name in scopes)

        # project fields to return
        projection = {name: 1 for name in options.get("_source", ())} or None

        if scopes:
            query = {"$or": [{name: q} for name in scopes]}
        else:
            query = {}

        result = (query, projection)
        if options.get("rawquery"):
            raise RawQueryInterrupt(result)
        return result
[docs]
class SQLQueryBuilder:
    """
    Build a SQL ``SELECT`` statement string from q and options.

    Values are interpolated into the statement without escaping or
    parameter binding, so this must never be fed untrusted input.
    """

    # PROOF OF CONCEPT
    # INPUT NOT SANITIZED
    # INTERNAL USE ONLY

    def __init__(self, tables, default_scopes=("id",), default_limit=10):
        """
        Args:
            tables: non-empty dict mapping a biothing type to a table name.
                If it has no ``None`` key, its first table becomes the
                default table.
            default_scopes: columns searched when the caller gives no scopes.
            default_limit: ``LIMIT`` used when no ``size`` option is given.
        """
        assert default_scopes
        assert isinstance(default_limit, int)
        assert tables and isinstance(tables, dict)
        # Work on a copy so the caller's dict is not altered by the
        # default-table entry added below.
        self.tables = dict(tables)
        self.default_limit = default_limit
        self.parser = QStringParser(default_scopes)
        if None not in self.tables:  # set default table
            self.tables[None] = next(iter(self.tables.values()))

    def build(self, q, **options):
        """
        Translate q and options into a SQL statement string.

        Options read here:
            _source: columns to select, ``*`` when empty.
            biothing_type: key into the table mapping (``None`` selects the
                default table).
            scopes: columns compared with q; derived from q by the parser
                when empty and q is truthy.
            size: row limit, defaults to ``default_limit``.
            from_: row offset, only added when present.
            rawquery: when truthy, raise ``RawQueryInterrupt`` carrying the
                list of statement fragments.

        A ``WHERE`` clause is only produced when both q and scopes are
        non-empty; otherwise all rows (up to the limit) are selected.
        """
        statements = [
            "SELECT {}".format(", ".join(options.get("_source", ())) or "*"),
            "FROM {}".format(self.tables[options.get("biothing_type")]),
        ]

        scopes = options.get("scopes")
        if not scopes and q:
            # Only a real term can be parsed; the parser cannot handle None.
            q, scopes = self.parser.parse(q, metadata=None)

        if scopes and q:
            assert isinstance(q, str)
            # WARNING: plain string interpolation, see the class notes.
            selections = ['{} = "{}"'.format(field, q) for field in scopes]
            statements.append("WHERE")
            statements.append(" OR ".join(selections))

        # limit result window
        statements.append("LIMIT {}".format(options.get("size", self.default_limit)))

        if "from_" in options:
            statements.append("OFFSET {}".format(options["from_"]))

        if options.get("rawquery"):
            raise RawQueryInterrupt(statements)

        return " ".join(statements)