biothings.utils

biothings.utils.aws

biothings.utils.aws.create_bucket(name, region=None, aws_key=None, aws_secret=None, acl=None, ignore_already_exists=False)[source]

Create an S3 bucket "name" in optional "region". If aws_key and aws_secret are set, the S3 client will use them; otherwise it falls back to the default system-wide settings. "acl" defines permissions on the bucket: "private" (default), "public-read", "public-read-write" or "authenticated-read".

biothings.utils.aws.download_s3_file(s3key, localfile=None, aws_key=None, aws_secret=None, s3_bucket=None, overwrite=False)[source]
biothings.utils.aws.get_s3_file(s3key, localfile=None, return_what=False, aws_key=None, aws_secret=None, s3_bucket=None)[source]
biothings.utils.aws.get_s3_file_contents(s3key, aws_key=None, aws_secret=None, s3_bucket=None) bytes[source]
biothings.utils.aws.get_s3_folder(s3folder, basedir=None, aws_key=None, aws_secret=None, s3_bucket=None)[source]
biothings.utils.aws.get_s3_static_website_url(s3key, aws_key=None, aws_secret=None, s3_bucket=None)[source]
biothings.utils.aws.get_s3_url(s3key, aws_key=None, aws_secret=None, s3_bucket=None)[source]
biothings.utils.aws.key_exists(bucket, s3key, aws_key=None, aws_secret=None)[source]
biothings.utils.aws.send_s3_big_file(localfile, s3key, overwrite=False, acl=None, aws_key=None, aws_secret=None, s3_bucket=None, storage_class=None)[source]

Multipart upload for files bigger than 5GiB.

biothings.utils.aws.send_s3_file(localfile, s3key, overwrite=False, permissions=None, metadata=None, content=None, content_type=None, aws_key=None, aws_secret=None, s3_bucket=None, redirect=None)[source]

Save a localfile to the s3 bucket with the given key. The bucket is set via S3_BUCKET. The localfile's last-modified time is also saved in the s3 file's metadata.

Parameters

redirect (str) – if not None, set the redirect property of the object so it produces a 301 when accessed

biothings.utils.aws.send_s3_folder(folder, s3basedir=None, acl=None, overwrite=False, aws_key=None, aws_secret=None, s3_bucket=None)[source]
biothings.utils.aws.set_static_website(name, aws_key=None, aws_secret=None, index='index.html', error='error.html')[source]

biothings.utils.backend

Backend access class.

class biothings.utils.backend.DocBackendBase[source]

Bases: object

drop()[source]
finalize()[source]

if needed, for example for bulk updates, perform flush at the end of updating. Final optimization or compacting can be done here as well.

get_from_id(id)[source]
get_id_list()[source]
insert(doc_li)[source]
name = 'Undefined'
prepare()[source]

if needed, add extra preparation steps here.

property target_name
update(id, extra_doc)[source]

update only, no upsert.

property version
class biothings.utils.backend.DocBackendOptions(cls, es_index=None, es_host=None, es_doc_type=None, mongo_target_db=None, mongo_target_collection=None)[source]

Bases: object

class biothings.utils.backend.DocESBackend(esidxer=None)[source]

Bases: DocBackendBase

esidxer is an instance of utils.es.ESIndexer class.

count()[source]
classmethod create_from_options(options)[source]

Function that recreates itself from a DocBackendOptions class. Probably a needless rewrite of __init__…

drop()[source]
finalize()[source]

if needed, for example for bulk updates, perform flush at the end of updating. Final optimization or compacting can be done here as well.

get_from_id(id)[source]
get_id_list(step=None)[source]
insert(doc_li)[source]
mget_from_ids(ids, step=100000, only_source=True, asiter=True, **kwargs)[source]

ids is an id list. Always returns a generator.

name = 'es'
prepare(update_mapping=True)[source]

if needed, add extra preparation steps here.

query(query=None, verbose=False, step=10000, scroll='10m', only_source=True, **kwargs)[source]

Function that takes a query and returns an iterator to query results.

remove_from_ids(ids, step=10000)[source]
property target_alias
property target_esidxer
property target_name
update(id, extra_doc)[source]

update only, no upsert.

property version
class biothings.utils.backend.DocMemoryBackend(target_name=None)[source]

Bases: DocBackendBase

target_dict is None or a dict.

drop()[source]
finalize()[source]

dump target_dict into a file.

get_from_id(id)[source]
get_id_list()[source]
insert(doc_li)[source]
name = 'memory'
property target_name
update(id, extra_doc)[source]

update only, no upsert.

class biothings.utils.backend.DocMongoBackend(target_db, target_collection=None)[source]

Bases: DocBackendBase

target_collection is a pymongo collection object.

count()[source]
count_from_ids(ids, step=100000)[source]

Return the count of docs matching the input ids. Normally this would not need to query in batches, but MongoDB has a BSON size limit of 16MB, so passing too many ids at once would raise a pymongo.errors.DocumentTooLarge error.

drop()[source]
finalize()[source]

flush all pending writes.

get_from_id(id)[source]
get_id_list()[source]
insert(docs)[source]
mget_from_ids(ids, asiter=False)[source]

ids is an id list. The returned doc list is in the same order as the input ids; non-existing ids are ignored.

name = 'mongo'
remove_from_ids(ids, step=10000)[source]
property target_db
property target_name
update(docs, upsert=False)[source]

If id does not exist in the target_collection, the update is ignored, unless upsert is True.

update_diff(diff, extra={})[source]

Update a doc based on the diff returned from diff.diff_doc. "extra" can be passed (as a dictionary) to add common fields to the updated doc, e.g. a timestamp.

property version
biothings.utils.backend.DocMongoDBBackend

alias of DocMongoBackend

biothings.utils.common

This module contains util functions that may be shared by both the BioThings data hub and web components. In general, do not include utils depending on any third-party modules.

class biothings.utils.common.BiothingsJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: JSONEncoder

A JSONEncoder subclass that can also serialize Python datetime objects: json.dumps(data, cls=BiothingsJSONEncoder, indent=indent)

Constructor for JSONEncoder, with sensible defaults.

If skipkeys is false, then it is a TypeError to attempt encoding of keys that are not str, int, float or None. If skipkeys is True, such items are simply skipped.

If ensure_ascii is true, the output is guaranteed to be str objects with all incoming non-ASCII characters escaped. If ensure_ascii is false, the output can contain non-ASCII characters.

If check_circular is true, then lists, dicts, and custom encoded objects will be checked for circular references during encoding to prevent an infinite recursion (which would cause an RecursionError). Otherwise, no such check takes place.

If allow_nan is true, then NaN, Infinity, and -Infinity will be encoded as such. This behavior is not JSON specification compliant, but is consistent with most JavaScript based encoders and decoders. Otherwise, it will be a ValueError to encode such floats.

If sort_keys is true, then the output of dictionaries will be sorted by key; this is useful for regression tests to ensure that JSON serializations can be compared on a day-to-day basis.

If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.

If specified, separators should be an (item_separator, key_separator) tuple. The default is (', ', ': ') if indent is None and (',', ': ') otherwise. To get the most compact JSON representation, you should specify (',', ':') to eliminate whitespace.

If specified, default is a function that gets called for objects that can’t otherwise be serialized. It should return a JSON encodable version of the object or raise a TypeError.

default(o)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
class biothings.utils.common.DummyConfig(name, doc=None)[source]

Bases: module

This class allows “import config” or “from biothings import config” to work without actually creating a config.py file:

import sys
from biothings.utils.common import DummyConfig
sys.modules["config"] = DummyConfig('config')
sys.modules["biothings.config"] = DummyConfig('config')

class biothings.utils.common.LogPrint(log_f, log=1, timestamp=0)[source]

Bases: object

If this class is set to sys.stdout, output will go to both log_f and sys.__stdout__. log_f is a file handler.

close()[source]
fileno()[source]
flush()[source]
pause()[source]
resume()[source]
start()[source]
write(text)[source]
biothings.utils.common.SubStr(input_string, start_string='', end_string='', include=0)[source]

Return the substring between start_string and end_string. If start_string is '', cut from the beginning of input_string. If end_string is '', cut to the end of input_string. If either start_string or end_string cannot be found in input_string, return ''. end_pos is the first position of end_string after start_string; if there are multiple occurrences, cut at the first one. include=0 (default) does not include start_string/end_string; include=1 includes them.
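The rules above can be sketched in plain Python (a hypothetical reimplementation for illustration, not the library's own code):

```python
def substr(input_string, start_string="", end_string="", include=False):
    """Sketch of the documented SubStr behavior (assumed semantics)."""
    start = 0
    if start_string:
        start = input_string.find(start_string)
        if start == -1:          # start_string not found
            return ""
        start += len(start_string)
    if not end_string:           # cut to the end of input_string
        out = input_string[start:]
        return (start_string + out) if (include and start_string) else out
    end = input_string.find(end_string, start)  # first occurrence after start
    if end == -1:                # end_string not found
        return ""
    if include:
        return (start_string if start_string else "") + input_string[start:end] + end_string
    return input_string[start:end]

print(substr("foo[bar]baz", "[", "]"))        # bar
print(substr("foo[bar]baz", "[", "]", True))  # [bar]
```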

biothings.utils.common.addsuffix(filename, suffix, noext=False)[source]

Add a suffix in front of ".extension", keeping the same extension. If noext is True, remove the extension from the filename instead.

async biothings.utils.common.aiogunzipall(folder, pattern, job_manager, pinfo)[source]

Gunzip all files in folder matching pattern. job_manager is used for parallelisation, and pinfo is a pre-filled dict used by job_manager to report jobs in the hub (see bt.utils.manager.JobManager)

biothings.utils.common.anyfile(infile, mode='r')[source]

Return a file handler with support for gzip/zip compressed files. If infile is a two-value tuple, the first item is the compressed file and the second is the actual filename inside the compressed file, e.g. ('a.zip', 'aa.txt').

biothings.utils.common.ask(prompt, options='YN')[source]

Prompt Yes or No, return the upper case 'Y' or 'N'.

class biothings.utils.common.dotdict[source]

Bases: dict

biothings.utils.common.dump(obj, filename, protocol=4, compress='gzip')[source]

Saves a compressed object to disk. Protocol version 4 is the default for py3.8, supported since py3.4.

biothings.utils.common.dump2gridfs(obj, filename, db, protocol=2)[source]

Save a compressed (support gzip only) object to MongoDB gridfs.

biothings.utils.common.file_newer(source, target)[source]

return True if source file is newer than target file.

biothings.utils.common.filter_dict(d, keys)[source]

Remove keys from dict "d". "keys" is a list of strings; dotfield notation can be used to express nested keys. If a key to remove doesn't exist, it is silently ignored.

biothings.utils.common.find_classes_subclassing(mods, baseclass)[source]

Given a module or a list of modules, inspect and find all classes which are a subclass of the given baseclass, inside those modules

biothings.utils.common.find_doc(k, keys)[source]

Used by jsonld insertion in www.api.es._insert_jsonld

biothings.utils.common.get_class_from_classpath(class_path)[source]
biothings.utils.common.get_compressed_outfile(filename, compress='gzip')[source]

Get an output file handler with the given compress method. Currently supports gzip/bz2/lzma (lzma only available in py3).

biothings.utils.common.get_dotfield_value(dotfield, d)[source]

Explore dictionary d using dotfield notation and return value. Example:

d = {"a":{"b":1}}.
get_dotfield_value("a.b",d) => 1
biothings.utils.common.get_loop()[source]

Since Python 3.10, a Deprecation warning is emitted if there is no running event loop. In future Python releases, a RuntimeError will be raised instead.

Ref: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_event_loop
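A minimal sketch of such a 3.10+-safe helper (assumed behavior; the hub's actual implementation may differ):

```python
import asyncio

def get_loop():
    """Reuse the currently running loop if any; otherwise create
    and register a fresh one instead of calling the deprecated
    asyncio.get_event_loop() (sketch, assumed behavior)."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:  # no running event loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop

loop = get_loop()
print(loop.run_until_complete(asyncio.sleep(0, result="ok")))  # ok
```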

biothings.utils.common.get_loop_with_max_workers(max_workers=None)[source]
biothings.utils.common.get_random_string()[source]
biothings.utils.common.get_timestamp()[source]
biothings.utils.common.gunzip(f, pattern='*.gz')[source]
biothings.utils.common.gunzipall(folder, pattern='*.gz')[source]

gunzip all *.gz files in “folder”

class biothings.utils.common.inf[source]

Bases: object

Represents Inf type, but not as a float

biothings.utils.common.is_filehandle(fh)[source]

return True/False if fh is a file-like object

biothings.utils.common.is_float(f)[source]

return True if input is a float.

biothings.utils.common.is_int(s)[source]

return True or False if input string is integer or not.

biothings.utils.common.is_scalar(f)[source]
biothings.utils.common.is_seq(li)[source]

return True if input is either a list or a tuple.

biothings.utils.common.is_str(s)[source]

return True or False if input is a string or not. python3 compatible.

biothings.utils.common.iter_n(iterable, n, with_cnt=False)[source]

Iterate an iterator by chunks of n. If with_cnt is True, return (chunk, cnt) each time. Ref: http://stackoverflow.com/questions/8991506/iterate-an-iterator-by-chunks-of-n-in-python
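A minimal sketch of the chunking behavior described above (assumed semantics, using itertools.islice):

```python
from itertools import islice

def iter_n(iterable, n, with_cnt=False):
    """Yield chunks of size n (the last chunk may be shorter).
    With with_cnt=True, also yield the running item count (sketch)."""
    it = iter(iterable)
    cnt = 0
    while True:
        chunk = tuple(islice(it, n))
        if not chunk:
            return
        cnt += len(chunk)
        yield (chunk, cnt) if with_cnt else chunk

print(list(iter_n(range(5), 2)))              # [(0, 1), (2, 3), (4,)]
print(list(iter_n("abc", 2, with_cnt=True)))  # [(('a', 'b'), 2), (('c',), 3)]
```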

biothings.utils.common.json_encode(obj)[source]

Tornado-aimed json encoder, it does the same job as tornado.escape.json_encode but also deals with datetime encoding

biothings.utils.common.json_serial(obj)[source]

JSON serializer for objects not serializable by default json code

biothings.utils.common.list2dict(a_list, keyitem, alwayslist=False)[source]

Return a dictionary with specified keyitem as key, others as values. keyitem can be an index or a sequence of indexes. For example:

li = [['A','a',1],
     ['B','a',2],
     ['A','b',3]]
list2dict(li, 0)---> {'A':[('a',1),('b',3)],
                     'B':('a',2)}

If alwayslist is True, values are always a list even there is only one item in it.

list2dict(li, 0, True)---> {'A':[('a',1),('b',3)],
                            'B':[('a',2),]}
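The grouping shown above can be sketched as follows (a simplified, hypothetical reimplementation handling only a single index for keyitem):

```python
def list2dict(a_list, keyitem, alwayslist=False):
    """Group rows by the value at index keyitem; remaining fields become
    the value (sketch; the real function also accepts a sequence of indexes)."""
    out = {}
    for row in a_list:
        key = row[keyitem]
        value = tuple(v for i, v in enumerate(row) if i != keyitem)
        if len(value) == 1:
            value = value[0]
        if key not in out:
            out[key] = [value] if alwayslist else value
        elif isinstance(out[key], list):
            out[key].append(value)
        else:                      # promote a scalar entry to a list
            out[key] = [out[key], value]
    return out

li = [['A', 'a', 1], ['B', 'a', 2], ['A', 'b', 3]]
print(list2dict(li, 0))  # {'A': [('a', 1), ('b', 3)], 'B': ('a', 2)}
```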
biothings.utils.common.loadobj(filename, mode='file')[source]

Loads a compressed object from disk file (or file-like handler) or MongoDB gridfs file (mode=’gridfs’)

obj = loadobj('data.pyobj')
obj = loadobj(('data.pyobj', mongo_db), mode='gridfs')
biothings.utils.common.md5sum(fname)[source]
biothings.utils.common.merge(x, dx)[source]

Merge dictionary dx (Δx) into dictionary x. If __REPLACE__ key is present in any level z in dx, z in x is replaced, instead of merged, with z in dx.
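A minimal sketch of the __REPLACE__ semantics described above (assumed behavior, not the library's code):

```python
def merge(x, dx):
    """Recursively fold dx into x. Any subtree of dx carrying a truthy
    __REPLACE__ key replaces, rather than merges into, the matching
    subtree of x (sketch, assumed semantics)."""
    for k, v in dx.items():
        if isinstance(v, dict) and isinstance(x.get(k), dict):
            if v.get("__REPLACE__"):
                # replace the whole subtree, dropping the marker key
                x[k] = {kk: vv for kk, vv in v.items() if kk != "__REPLACE__"}
            else:
                merge(x[k], v)
        else:
            x[k] = v
    return x

print(merge({"a": {"b": 1, "c": 2}}, {"a": {"b": 10}}))
# {'a': {'b': 10, 'c': 2}}
print(merge({"a": {"b": 1, "c": 2}}, {"a": {"__REPLACE__": True, "b": 99}}))
# {'a': {'b': 99}}
```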

class biothings.utils.common.nan[source]

Bases: object

Represents NaN type, but not as a float

biothings.utils.common.newer(t0, t1, fmt='%Y%m%d')[source]

t0 and t1 are timestamp strings matching the "fmt" pattern. Return True if t1 is newer than t0.

biothings.utils.common.open_anyfile(infile, mode='r')[source]

A context manager that can be used in a "with" statement. Accepts a filehandle or anything accepted by the anyfile function.

with open_anyfile('test.txt') as in_f:
    do_something()

biothings.utils.common.open_compressed_file(filename)[source]

Get a read-only file handler for a compressed file. Currently supports gzip/bz2/lzma (lzma only available in py3).

biothings.utils.common.rmdashfr(top)[source]

Recursively delete dirs and files from “top” directory, then delete “top” dir

biothings.utils.common.run_once()[source]

should_run_task_1 = run_once()
print(should_run_task_1())  # -> True
print(should_run_task_1())  # -> False
print(should_run_task_1())  # -> False
print(should_run_task_1())  # -> False

should_run_task_2 = run_once()
print(should_run_task_2('2a'))  # -> True
print(should_run_task_2('2b'))  # -> True
print(should_run_task_2('2a'))  # -> False
print(should_run_task_2('2b'))  # -> False
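The factory above can be sketched as (assumed semantics: each returned callable yields True the first time it sees a given argument tuple, False afterwards):

```python
def run_once():
    """Return a callable remembering every argument tuple it has seen
    (sketch of the documented behavior)."""
    seen = set()
    def should_run(*args):
        if args in seen:
            return False
        seen.add(args)
        return True
    return should_run

task1 = run_once()
print(task1())      # True
print(task1())      # False
task2 = run_once()
print(task2("2a"))  # True
print(task2("2b"))  # True
print(task2("2a"))  # False
```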

biothings.utils.common.safe_unicode(s, mask='#')[source]

Replace non-decodable chars with mask ("#" by default).

biothings.utils.common.safewfile(filename, prompt=True, default='C', mode='w')[source]

Return a file handle in 'w' mode; use an alternative name if a file with the same name exists. If prompt is True, ask whether to overwrite, append or change the name; otherwise, switch to an available name automatically.

biothings.utils.common.sanitize_tarfile(tar_object, directory)[source]

Prevent user-assisted remote attackers from overwriting arbitrary files via a ".." (dot dot) sequence in filenames in a TAR archive, an issue related to CVE-2007-4559.

biothings.utils.common.setup_logfile(logfile)[source]
biothings.utils.common.sizeof_fmt(num, suffix='B')[source]
biothings.utils.common.split_ids(q)[source]

split the input query string into a list of ids, using any of " |,+" as the separator, but preserving a phrase if quoted (either single or double quoted). For more detailed rules see: http://docs.python.org/2/library/shlex.html#parsing-rules

e.g.:

>>> split_ids('CDK2 CDK3')
['CDK2', 'CDK3']
>>> split_ids('"CDK2 CDK3"\nCDK4')
['CDK2 CDK3', 'CDK4']

class biothings.utils.common.splitstr[source]

Bases: str

Type representing strings with spaces in them

biothings.utils.common.timesofar(t0, clock=0, t1=None)[source]

Return a string (e.g. '3m3.42s') for the real time/CPU time passed since the given t0 (t0 = time.time() for real time, t0 = time.clock() for CPU time).

biothings.utils.common.traverse(obj, leaf_node=False)[source]

Output path-dictionary pairs. For example, the input:

{
    'exac_nontcga': {'af': 0.00001883},
    'gnomad_exome': {'af': {'af': 0.0000119429, 'af_afr': 0.000123077}},
    'snpeff': {'ann': [{'effect': 'intron_variant', 'feature_id': 'NM_014672.3'},
                       {'effect': 'intron_variant', 'feature_id': 'NM_001256678.1'}]}
}

will be translated to a generator:

(
    ("exac_nontcga", {"af": 0.00001883}),
    ("gnomad_exome.af", {"af": 0.0000119429, "af_afr": 0.000123077}),
    ("gnomad_exome", {"af": {"af": 0.0000119429, "af_afr": 0.000123077}}),
    ("snpeff.ann", {"effect": "intron_variant", "feature_id": "NM_014672.3"}),
    ("snpeff.ann", {"effect": "intron_variant", "feature_id": "NM_001256678.1"}),
    ("snpeff.ann", [{ ... }, { ... }]),
    ("snpeff", {"ann": [{ ... }, { ... }]}),
    ('', {'exac_nontcga': {...}, 'gnomad_exome': {...}, 'snpeff': {...}})
)

or, when traversing leaf nodes (leaf_node=True):

(
    ('exac_nontcga.af', 0.00001883),
    ('gnomad_exome.af.af', 0.0000119429),
    ('gnomad_exome.af.af_afr', 0.000123077),
    ('snpeff.ann.effect', 'intron_variant'),
    ('snpeff.ann.feature_id', 'NM_014672.3'),
    ('snpeff.ann.effect', 'intron_variant'),
    ('snpeff.ann.feature_id', 'NM_001256678.1')
)

biothings.utils.common.uncompressall(folder)[source]

Try to uncompress any known archive files in folder

biothings.utils.common.untargzall(folder, pattern='*.tar.gz')[source]

gunzip and untar all *.tar.gz files in “folder”

biothings.utils.common.unxzall(folder, pattern='*.xz')[source]

unxz all *.xz files in "folder"

biothings.utils.common.unzipall(folder, pattern='*.zip')[source]

unzip all *.zip files in "folder"

biothings.utils.configuration

class biothings.utils.configuration.ConfigAttrMeta(confmod: biothings.utils.configuration.MetaField = <factory>, section: biothings.utils.configuration.Text = <factory>, description: biothings.utils.configuration.Paragraph = <factory>, readonly: biothings.utils.configuration.Flag = <factory>, hidden: biothings.utils.configuration.Flag = <factory>, invisible: biothings.utils.configuration.Flag = <factory>)[source]

Bases: object

asdict()[source]
commit()[source]
confmod: MetaField
description: Paragraph
feed(field, value)[source]
hidden: Flag
invisible: Flag
readonly: Flag
reset()[source]
section: Text
update(meta)[source]
class biothings.utils.configuration.ConfigLine(seq)[source]

Bases: UserString

PATTERNS = (('hidden', re.compile('^#-\\s*hide\\s*-#\\s*$'), <function ConfigLine.<lambda>>), ('invisible', re.compile('^#-\\s*invisible\\s*-#\\s*$'), <function ConfigLine.<lambda>>), ('readonly', re.compile('^#-\\s*readonly\\s*-#\\s*$'), <function ConfigLine.<lambda>>), ('section', re.compile('^#\\*\\s*(.*)\\s*\\*#\\s*$'), <function ConfigLine.<lambda>>), ('description', re.compile('.*\\s*#\\s+(.*)$'), <function ConfigLine.<lambda>>))
match()[source]
class biothings.utils.configuration.ConfigLines(initlist=None)[source]

Bases: UserList

parse(attrs=())[source]
class biothings.utils.configuration.ConfigurationDefault(default, desc)[source]

Bases: object

exception biothings.utils.configuration.ConfigurationError[source]

Bases: Exception

class biothings.utils.configuration.ConfigurationValue(code)[source]

Bases: object

Type to wrap a default value when it's code that needs to be interpreted later. The code is passed to eval() in the context of the whole "config" dict (so, for instance, paths declared earlier in the configuration file can be used in the code passed to eval()). The code will also be executed through exec() if eval() raised a SyntaxError, which happens when the code contains statements, not just an expression. In that case, a variable should be created in those statements (named the same as the original config variable) so the proper value can be retrieved by ConfigurationManager.

get_value(name, conf)[source]

Return value by eval’ing code in self.code, in the context of given configuration dict (namespace), for given config parameter name.
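The eval()-then-exec() fallback described above can be sketched as follows (`interpret` is a hypothetical helper for illustration, not the library's API):

```python
def interpret(code, name, namespace):
    """Evaluate `code` against `namespace`. Expressions go through eval();
    statements raise SyntaxError under eval() and are exec()'ed instead,
    in which case they must bind a variable named `name` (sketch)."""
    try:
        return eval(code, {}, dict(namespace))
    except SyntaxError:
        local = dict(namespace)
        exec(code, {}, local)
        return local[name]

ns = {"DATA_ROOT": "/tmp/data"}
# expression form
print(interpret("DATA_ROOT + '/cache'", "CACHE_DIR", ns))            # /tmp/data/cache
# statement form: the code itself binds CACHE_DIR
print(interpret("CACHE_DIR = DATA_ROOT + '/cache'", "CACHE_DIR", ns))  # /tmp/data/cache
```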

class biothings.utils.configuration.ConfigurationWrapper(default_config, conf)[source]

Bases: object

Wraps and manages configuration access and edit. A singleton instance is available throughout all hub apps using biothings.config or biothings.hub.config after calling import biothings.hub. In addition to providing config value access, either from config files or database, config manager can supersede attributes of a class with values coming from the database, allowing dynamic configuration of hub’s elements.

When constructing a ConfigurationWrapper instance, variables are first defined with default values coming from default_config, then they can be overridden by conf's values, or new variables are added if not defined in default_config. Only metadata coming from default_config will be used.

get_value_from_db(name)[source]
get_value_from_file(name)[source]
property modified
property readonly
reset(name=None)[source]
show()[source]
store_value_to_db(name, value)[source]
supersede(klass)[source]

supersede class variables with db values

class biothings.utils.configuration.Flag(value=None)[source]

Bases: MetaField

default

alias of bool

feed(value)[source]
class biothings.utils.configuration.MetaField(value=None)[source]

Bases: object

clear()[source]
default

alias of None

feed(value)[source]
property value
class biothings.utils.configuration.Paragraph(value=None)[source]

Bases: MetaField

default

alias of list

feed(value)[source]
property value
class biothings.utils.configuration.Text(value=None)[source]

Bases: MetaField

feed(value)[source]
biothings.utils.configuration.is_jsonable(x)[source]
biothings.utils.configuration.set_default_folder(data_archive_root, sub_folder)[source]

set default sub folder based on data_archive_root

biothings.utils.dataload

Utility functions for parsing flatfiles, mapping to JSON, cleaning.

class biothings.utils.dataload.MinType[source]

Bases: object

biothings.utils.dataload.alwayslist(value)[source]

If the input value is not a list/tuple type, return it as a single-value list.

biothings.utils.dataload.boolean_convert(d, convert_keys=None, level=0)[source]

Explore document d and convert the specified keys to boolean. Use dotfield notation for inner keys.

biothings.utils.dataload.dict_apply(d, key, value, sort=True)[source]

add value to d[key], append it if key exists

>>> d = {'a': 1}
>>> dict_apply(d, 'a', 2)
 {'a': [1, 2]}
>>> dict_apply(d, 'a', 3)
 {'a': [1, 2, 3]}
>>> dict_apply(d, 'b', 2)
 {'a': 1, 'b': 2}
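The append-or-set behavior shown above can be sketched as (assumed semantics; mutates d in place):

```python
def dict_apply(d, key, value):
    """Set d[key] to value, or append value if the key already exists,
    promoting a scalar entry to a list (sketch of the documented behavior)."""
    if key not in d:
        d[key] = value
    elif isinstance(d[key], list):
        d[key].append(value)
    else:
        d[key] = [d[key], value]

d = {'a': 1}
dict_apply(d, 'a', 2)
dict_apply(d, 'a', 3)
dict_apply(d, 'b', 2)
print(d)  # {'a': [1, 2, 3], 'b': 2}
```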
biothings.utils.dataload.dict_attrmerge(dict_li, removedup=True, sort=True, special_fns=None)[source]
dict_attrmerge([{'a': 1, 'b': [2,3]},
                {'a': [1,2], 'b': [3,5], 'c': 4}])

should return

{'a': [1,2], 'b': [2,3,5], 'c': 4}

special_fns is a dictionary of {attr: merge_fn}, used for some special attrs which need a special merge_fn, e.g. {'uniprot': _merge_uniprot}

biothings.utils.dataload.dict_convert(_dict, keyfn=None, valuefn=None)[source]

Return a new dict with each key converted by keyfn (if not None), and each value converted by valuefn (if not None).

biothings.utils.dataload.dict_nodup(_dict, sort=True)[source]
biothings.utils.dataload.dict_sweep(d, vals=None, remove_invalid_list=False)[source]

Remove keys whose values are ".", "-", "", "NA", "none", " ", and remove empty dictionaries

Parameters
  • d (dict) – a dictionary

  • vals (str or list) – a string or list of strings to sweep, or None to use the default values

  • remove_invalid_list (boolean) –

when True, will remove a key for which the list has only one value left, and that value is part of "vals". Ex:

    test_dict = {'gene': [None, None], 'site': ["Intron", None], 'snp_build' : 136}
    

    with remove_invalid_list == False:

    {'gene': [None], 'site': ['Intron'], 'snp_build': 136}
    

    with remove_invalid_list == True:

    {'site': ['Intron'], 'snp_build': 136}
    

biothings.utils.dataload.dict_to_list(gene_d)[source]

Return a list of genedocs from a genedoc dictionary and make sure the "_id" field exists.

biothings.utils.dataload.dict_traverse(d, func, traverse_list=False)[source]

Recursively traverse dictionary d, calling func(k,v) for each key/value found. func must return a tuple(new_key,new_value)

biothings.utils.dataload.dict_walk(dictionary, key_func)[source]

Recursively apply key_func to dict’s keys

biothings.utils.dataload.dupline_seperator(dupline, dup_sep, dup_idx=None, strip=False)[source]

for a line like this:

a   b1,b2  c1,c2

return a generator of this list (breaking out of the duplicates in each field):

[(a,b1,c1),
 (a,b2,c1),
 (a,b1,c2),
 (a,b2,c2)]

Example:

dupline_seperator(dupline=['a', 'b1,b2', 'c1,c2'],
                  dup_idx=[1,2],
                  dup_sep=',')

If dup_idx is None, try to split on every field. If strip is True, also strip out extra spaces.
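The expansion can be sketched with itertools.product (a hypothetical reimplementation; the library's own implementation and row order may differ from the listing above):

```python
from itertools import product

def dupline_separator(dupline, dup_sep, dup_idx=None, strip=False):
    """Split the fields at dup_idx on dup_sep and yield every combination
    (sketch of the documented behavior)."""
    if dup_idx is None:
        dup_idx = range(len(dupline))  # try to split every field
    split = []
    for i, field in enumerate(dupline):
        if i in dup_idx:
            parts = field.split(dup_sep)
            split.append([p.strip() for p in parts] if strip else parts)
        else:
            split.append([field])
    return product(*split)

rows = list(dupline_separator(['a', 'b1,b2', 'c1,c2'], dup_sep=',', dup_idx=[1, 2]))
print(rows)
# [('a', 'b1', 'c1'), ('a', 'b1', 'c2'), ('a', 'b2', 'c1'), ('a', 'b2', 'c2')]
```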

biothings.utils.dataload.file_merge(infiles, outfile=None, header=1, verbose=1)[source]

Merge a list of input files with the same format. If header is set, the header lines will be removed from the 2nd file onward.

biothings.utils.dataload.float_convert(d, include_keys=None, exclude_keys=None)[source]

Convert elements in a document to floats.

By default, traverse all keys. If include_keys is specified, only convert the keys listed in include_keys (dotfield notation, e.g. a.b, a.b.c). If exclude_keys is specified, convert all keys except those in exclude_keys.

Parameters
  • d – a dictionary to traverse keys on

  • include_keys – only convert these keys (optional)

  • exclude_keys – do not convert these keys (optional)

Returns

generate key, value pairs

biothings.utils.dataload.id_strip(id_list)[source]
biothings.utils.dataload.int_convert(d, include_keys=None, exclude_keys=None)[source]

Convert elements in a document to integers.

By default, traverse all keys. If include_keys is specified, only convert the keys listed in include_keys (dotfield notation, e.g. a.b, a.b.c). If exclude_keys is specified, convert all keys except those in exclude_keys.

Parameters
  • d – a dictionary to traverse keys on

  • include_keys – only convert these keys (optional)

  • exclude_keys – do not convert these keys (optional)

Returns

generate key, value pairs

biothings.utils.dataload.list2dict(a_list, keyitem, alwayslist=False)[source]

Return a dictionary with specified keyitem as key, others as values. keyitem can be an index or a sequence of indexes. For example:

li=[['A','a',1],
    ['B','a',2],
    ['A','b',3]]
list2dict(li,0)---> {'A':[('a',1),('b',3)],
                     'B':('a',2)}

If alwayslist is True, values are always a list even there is only one item in it:

list2dict(li,0,True)---> {'A':[('a',1),('b',3)],
                          'B':[('a',2),]}
biothings.utils.dataload.list_itemcnt(a_list)[source]

Return the number of occurrences for each type of item in the list.

biothings.utils.dataload.list_nondup(a_list)[source]
biothings.utils.dataload.list_split(d, sep)[source]

Split fields by sep into comma separated lists, strip.

biothings.utils.dataload.listitems(a_list, *idx)[source]

Return multiple items from list by given indexes.

biothings.utils.dataload.listsort(a_list, by, reverse=False, cmp=None, key=None)[source]

The given list is a list of sublists/tuples. Return a new list sorted by the i-th item (given by "by") of each sublist.

biothings.utils.dataload.llist(li, sep='\t')[source]

Nicely output the list, with each item on its own line.

biothings.utils.dataload.merge_dict(dict_li, attr_li, missingvalue=None)[source]

Merging multiple dictionaries into a new one. Example:

In [136]: d1 = {'id1': 100, 'id2': 200}
In [137]: d2 = {'id1': 'aaa', 'id2': 'bbb', 'id3': 'ccc'}
In [138]: merge_dict([d1,d2], ['number', 'string'])
Out[138]:
{'id1': {'number': 100, 'string': 'aaa'},
'id2': {'number': 200, 'string': 'bbb'},
'id3': {'string': 'ccc'}}
In [139]: merge_dict([d1,d2], ['number', 'string'], missingvalue='NA')
Out[139]:
{'id1': {'number': 100, 'string': 'aaa'},
'id2': {'number': 200, 'string': 'bbb'},
'id3': {'number': 'NA', 'string': 'ccc'}}
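The merge shown above can be sketched as (assumed semantics, for illustration only):

```python
def merge_dict(dict_li, attr_li, missingvalue=None):
    """Fold several dicts into one, tagging each source dict's values with
    the matching attribute name; keys missing from a source get missingvalue
    when it is not None (sketch of the documented behavior)."""
    out = {}
    all_keys = set().union(*dict_li)
    for key in all_keys:
        out[key] = {}
        for d, attr in zip(dict_li, attr_li):
            if key in d:
                out[key][attr] = d[key]
            elif missingvalue is not None:
                out[key][attr] = missingvalue
    return out

d1 = {'id1': 100, 'id2': 200}
d2 = {'id1': 'aaa', 'id2': 'bbb', 'id3': 'ccc'}
print(merge_dict([d1, d2], ['number', 'string'])['id3'])
# {'string': 'ccc'}
print(merge_dict([d1, d2], ['number', 'string'], missingvalue='NA')['id3'])
# {'number': 'NA', 'string': 'ccc'}
```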
biothings.utils.dataload.merge_duplicate_rows(rows, db)[source]

Parameters
  • rows – rows to be grouped
  • db – database name, string

biothings.utils.dataload.merge_root_keys(doc1, doc2, exclude=None)[source]
Ex:

d1 = {"_id": 1, "a": "a", "b": {"k": "b"}}
d2 = {"_id": 1, "a": "A", "b": {"k": "B"}, "c": 123}

Both documents have the same _id and 2 common root keys, "a" and "b". Using this storage, the resulting document will be:

{'_id': 1, 'a': ['A', 'a'], 'b': [{'k': 'B'}, {'k': 'b'}], 'c': 123}

biothings.utils.dataload.merge_struct(v1, v2, aslistofdict=None)[source]
biothings.utils.dataload.normalized_value(value, sort=True)[source]

Return a "normalized" value:

1. if a list, remove duplicates and sort it
2. if a list with one item, convert to that single item only
3. if a list, remove empty values
4. otherwise, return value as it is
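These rules can be sketched as (assumed semantics; the sketch handles hashable list items only, and uses repr as the sort key to tolerate mixed types):

```python
def normalized_value(value, sort=True):
    """Dedupe and optionally sort lists, drop empty values, and collapse
    single-item lists to the bare item (sketch of the documented rules)."""
    if not isinstance(value, list):
        return value
    value = [v for v in value if v not in (None, "", [], {})]  # drop empties
    value = list(dict.fromkeys(value))  # dedupe, keeping first occurrence
    if sort:
        value = sorted(value, key=repr)
    return value[0] if len(value) == 1 else value

print(normalized_value(["b", "a", "b", ""]))  # ['a', 'b']
print(normalized_value(["only"]))             # only
print(normalized_value(42))                   # 42
```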

biothings.utils.dataload.rec_handler(infile, block_end='\n', skip=0, include_block_end=False, as_list=False)[source]

A generator returning one record (block of text) at a time from infile. Records are separated by one or more empty lines by default. skip can be used to skip the top n lines. If include_block_end is True, the line matching block_end is also returned. If as_list is True, return a list of lines for each record.

biothings.utils.dataload.safe_type(f, val)[source]

Convert an input string to int/float/etc. using the passed function. If the conversion fails, None is returned. If the value is of a type other than string, the original value is returned.

biothings.utils.dataload.tab2dict(datafile, cols, key, alwayslist=False, **kwargs)[source]
biothings.utils.dataload.tab2dict_iter(datafile, cols, key, alwayslist=False, **kwargs)[source]
biothings.utils.dataload.tab2list(datafile, cols, **kwargs)[source]
biothings.utils.dataload.tabfile_feeder(datafile, header=1, sep='\t', includefn=None, coerce_unicode=True, assert_column_no=None)[source]

a generator for each row in the file.

biothings.utils.dataload.tabfile_tester(datafile, header=1, sep='\t')[source]
biothings.utils.dataload.to_boolean(val, true_str=None, false_str=None)[source]

Normalize a str value to a boolean value

biothings.utils.dataload.to_float(val)[source]

convert an input string to float

biothings.utils.dataload.to_int(val)[source]

convert an input string to int

biothings.utils.dataload.to_number(val)[source]

convert an input string to int/float.
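The three converters can be sketched as (assumed semantics: return None when the conversion fails, and prefer int over float for to_number):

```python
def to_int(val):
    """Convert an input string to int; None on failure (sketch)."""
    try:
        return int(val)
    except (ValueError, TypeError):
        return None

def to_float(val):
    """Convert an input string to float; None on failure (sketch)."""
    try:
        return float(val)
    except (ValueError, TypeError):
        return None

def to_number(val):
    """Convert an input string to int if possible, else float (sketch)."""
    i = to_int(val)
    return i if i is not None else to_float(val)

print(to_number("42"), to_number("3.14"), to_number("n/a"))  # 42 3.14 None
```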

biothings.utils.dataload.traverse_keys(d, include_keys=None, exclude_keys=None)[source]

Return all key, value pairs for a document.

By default, traverse all keys. If include_keys is specified, only traverse the keys listed in include_keys (dotfield notation, e.g. a.b, a.b.c). If exclude_keys is specified, traverse all keys except those in exclude_keys.

If a key in include_keys/exclude_keys is not found in d, it is skipped quietly.

Parameters
  • d – a dictionary to traverse keys on

  • include_keys – only traverse these keys (optional)

  • exclude_keys – exclude all other keys except these keys (optional)

Returns

generate key, value pairs
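An illustrative re-implementation of the dotted-key traversal described above (assumed semantics: filters apply to leaf paths only; the real function may handle lists and partial paths differently):

```python
def traverse_keys(d, include_keys=None, exclude_keys=None, prefix=""):
    # dotted paths are built while descending; filters apply to leaf paths
    for k, v in d.items():
        path = f"{prefix}.{k}" if prefix else k
        if isinstance(v, dict):
            yield from traverse_keys(v, include_keys, exclude_keys, path)
            continue
        if include_keys is not None and path not in include_keys:
            continue
        if exclude_keys is not None and path in exclude_keys:
            continue
        yield path, v
```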

biothings.utils.dataload.unique_ids(src_module)[source]
biothings.utils.dataload.unlist(d)[source]
biothings.utils.dataload.unlist_incexcl(d, include_keys=None, exclude_keys=None)[source]

Unlist elements in a document.

If there is 1 value in the list, set the element to that value. Otherwise, leave the list unchanged.

By default, traverse all keys. If include_keys is specified, only unlist the keys listed in include_keys (dotted notation, eg. a.b, a.b.c). If exclude_keys is specified, exclude the keys listed in exclude_keys.

Parameters
  • d – a dictionary to unlist

  • include_keys – only unlist these keys (optional)

  • exclude_keys – exclude all other keys except these keys (optional)

Returns

generate key, value pairs
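The unlisting rule above (collapse single-item lists, leave longer lists alone) can be sketched as:

```python
def unlist(d):
    # collapse single-item lists in place, recursing into nested dicts
    for k, v in d.items():
        if isinstance(v, list) and len(v) == 1:
            d[k] = v[0]
        elif isinstance(v, dict):
            unlist(v)
    return d
```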

biothings.utils.dataload.update_dict_recur(d, u)[source]

Update dict d with dict u’s values, recursively (existing values in d but not in u are kept, even if nested)
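A minimal sketch of this recursive merge (illustrative, not the biothings source):

```python
def update_dict_recur(d, u):
    # nested dicts are merged rather than replaced wholesale
    for k, v in u.items():
        if isinstance(v, dict) and isinstance(d.get(k), dict):
            update_dict_recur(d[k], v)
        else:
            d[k] = v
    return d
```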

biothings.utils.dataload.updated_dict(_dict, attrs)[source]

Same as dict.update, but return the updated dictionary.

biothings.utils.dataload.value_convert(_dict, fn, traverse_list=True)[source]

For each value in _dict, apply fn and update _dict with the returned value. If traverse_list is True and a value is a list, apply fn to each item of the list.
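The behavior described above, as an illustrative sketch (recursing into sub-documents; the real function may differ in edge cases):

```python
def value_convert(_dict, fn, traverse_list=True):
    for k, v in _dict.items():
        if isinstance(v, dict):
            value_convert(v, fn, traverse_list)   # recurse into sub-documents
        elif traverse_list and isinstance(v, list):
            _dict[k] = [fn(i) for i in v]         # convert each list item
        else:
            _dict[k] = fn(v)
    return _dict
```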

biothings.utils.dataload.value_convert_incexcl(d, fn, include_keys=None, exclude_keys=None)[source]

Convert elements in a document using a function fn.

By default, traverse all keys. If include_keys is specified, only convert the keys listed in include_keys (dotted notation, eg. a.b, a.b.c). If exclude_keys is specified, exclude the keys listed in exclude_keys.

Parameters
  • d – a dictionary to traverse keys on

  • fn – function to convert elements with

  • include_keys – only convert these keys (optional)

  • exclude_keys – exclude all other keys except these keys (optional)

Returns

generate key, value pairs

biothings.utils.dataload.value_convert_to_number(d, skipped_keys=None)[source]

convert string numbers into integers or floats; keys listed in skipped_keys are not converted

biothings.utils.diff

Utils to compare two lists of gene documents

biothings.utils.diff.diff_collections(b1, b2, use_parallel=True, step=10000)[source]

b1, b2 are one of supported backend class in databuild.backend. e.g.:

b1 = DocMongoDBBackend(c1)
b2 = DocMongoDBBackend(c2)
biothings.utils.diff.diff_collections_batches(b1, b2, result_dir, step=10000)[source]

b2 is the new collection, b1 is the old collection

biothings.utils.diff.diff_doc(doc_1, doc_2, exclude_attrs=['_timestamp'])[source]
biothings.utils.diff.diff_docs_jsonpatch(b1, b2, ids, fastdiff=False, exclude_attrs=[])[source]

if fastdiff is True, only compare the whole doc; do not traverse into each attribute.

biothings.utils.diff.full_diff_doc(doc_1, doc_2, exclude_attrs=['_timestamp'])[source]
biothings.utils.diff.get_backend(uri, db, col, bk_type)[source]
biothings.utils.diff.get_mongodb_uri(backend)[source]
biothings.utils.diff.two_docs_iterator(b1, b2, id_list, step=10000, verbose=False)[source]

biothings.utils.doc_traversal

Some utility functions that do document traversal

class biothings.utils.doc_traversal.Queue[source]

Bases: object

isempty()[source]
pop()[source]

get next obj from queue

push(obj)[source]

put obj on queue

exception biothings.utils.doc_traversal.QueueEmptyError[source]

Bases: Exception

class biothings.utils.doc_traversal.Stack[source]

Bases: object

isempty()[source]
pop()[source]
push(obj)[source]

put obj on stack

exception biothings.utils.doc_traversal.StackEmptyError[source]

Bases: Exception

biothings.utils.doc_traversal.breadth_first_recursive_traversal(doc, path=None)[source]

doesn’t exactly implement breadth first ordering it seems, not sure why…

biothings.utils.doc_traversal.breadth_first_traversal(doc)[source]

Yield a 2-element tuple for every k, v pair in the document’s items (nodes are visited in breadth-first order). k is itself a tuple of keys annotating the path from the root to this node (v); v is the node value.

biothings.utils.doc_traversal.depth_first_recursive_traversal(doc, path=None)[source]
biothings.utils.doc_traversal.depth_first_traversal(doc)[source]

Yield a 2-element tuple for every k, v pair in the document’s items (nodes are visited in depth-first order). k is itself a tuple of keys annotating the path from the root to this node (v); v is the node value.

biothings.utils.docs

biothings.utils.docs.exists_or_null(doc, field, val=None)[source]
biothings.utils.docs.flatten_doc(doc, outfield_sep='.', sort=True)[source]

This function will flatten an Elasticsearch document (really, any JSON object). outfield_sep is the separator between the fields in the returned object. sort specifies whether the output object should be sorted alphabetically before returning (otherwise output remains in traversal order).

biothings.utils.docs.flatten_doc_2(doc, outfield_sep='.', sort=True)[source]

biothings.utils.dotfield

biothings.utils.dotfield.compose_dot_fields_by_fields(genedoc, fields)[source]

reverse function of parse_dot_fields

biothings.utils.dotfield.make_object(attr, value)[source]

Create dictionary following the input dot notation and the value Example:

make_object('a.b.c', 100)       --> {'a': {'b': {'c': 100}}}
make_object(['a','b','c'], 100) --> {'a': {'b': {'c': 100}}}
biothings.utils.dotfield.merge_object(obj1, obj2)[source]
biothings.utils.dotfield.parse_dot_fields(genedoc)[source]

should return {‘a’: 1, ‘b’: {‘a’: {‘c’: 3}, ‘c’: 2}}
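A minimal sketch of make_object’s behavior (illustrative; the real function may validate inputs differently):

```python
def make_object(attr, value):
    # accept either a dot-notation string or a list of keys
    keys = attr.split(".") if isinstance(attr, str) else list(attr)
    obj = value
    for key in reversed(keys):   # wrap the value from the innermost key outwards
        obj = {key: obj}
    return obj
```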

biothings.utils.dotstring

biothings.utils.dotstring.key_value(dictionary, key)[source]
Return a generator for all values in a dictionary specified by a dotstring (key)

if key is not found in the dictionary, None is returned.

Parameters
  • dictionary – a dictionary to return values from

  • key – key that specifies a value in the dictionary

Returns

generator for values that match the given key
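An illustrative sketch of how a dotstring lookup fans out over lists (assumed semantics, not the biothings source):

```python
def key_value(dictionary, key):
    # lists encountered along the path fan out to multiple values
    def walk(node, keys):
        if not keys:
            yield node
        elif isinstance(node, dict):
            if keys[0] in node:
                yield from walk(node[keys[0]], keys[1:])
            else:
                yield None   # missing key yields None
        elif isinstance(node, list):
            for item in node:
                yield from walk(item, keys)
    yield from walk(dictionary, key.split("."))
```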

biothings.utils.dotstring.last_element(d, key_list)[source]

Return the last element and key for a document d given a dotstring.

A document d is passed with a list of keys key_list. A generator is then returned for all elements that match all keys. Note that there may be a 1-to-many relationship between keys and elements due to lists in the document.

Parameters
  • d – document d to return elements from

  • key_list – list of keys that specify elements in the document d

Returns

generator for elements that match all keys

biothings.utils.dotstring.list_length(d, field)[source]

Return the length of a list specified by field.

If field represents a list in the document, then return its length. Otherwise return 0.

Parameters
  • d – a dictionary

  • field – the dotstring field specifying a list

biothings.utils.dotstring.remove_key(dictionary, key)[source]

Remove the field specified by the dotstring key

Parameters
  • dictionary – a dictionary to remove the value from

  • key – key that specifies an element in the dictionary

Returns

dictionary after changes have been made

biothings.utils.dotstring.set_key_value(dictionary, key, value)[source]
Set all values in the dictionary matching a dotstring key to a specified value.

if key is not found in the dictionary, it is skipped quietly.

Parameters
  • dictionary – a dictionary to set values in

  • key – key that specifies an element in the dictionary

Returns

dictionary after changes have been made

biothings.utils.es

class biothings.utils.es.Collection(colname, db)[source]

Bases: object

count()[source]
find(filter=None, projection=None, *args, **kwargs)[source]
find_one(*args, **kwargs)[source]
insert_one(document, *args, **kwargs)[source]
remove(query)[source]
replace_one(filter, replacement, upsert=False, *args, **kwargs)[source]
save(doc, *args, **kwargs)[source]
update(*args, **kwargs)[source]
update_many(filter, update, upsert=False, *args, **kwargs)[source]
update_one(filter, update, upsert=False, *args, **kwargs)[source]
class biothings.utils.es.Database[source]

Bases: IDatabase

CONFIG = None
property address

Returns sufficient information so a connection to a database can be created. Information can be a dictionary, object, etc… and depends on the actual backend

create_collection(colname)[source]

Create a table/collection named colname. If the backend is using a schema-based database (ie. SQL), the backend should enforce the schema with at least the field “_id” as the primary key (as a string).

class biothings.utils.es.ESIndex(client, index_name)[source]

Bases: object

An Elasticsearch index wrapping a client. The counterpart of pymongo.collection.Collection

property doc_type
class biothings.utils.es.ESIndexer(index, doc_type='_doc', es_host='localhost:9200', step=500, step_size=10, number_of_shards=1, number_of_replicas=0, check_index=True, **kwargs)[source]

Bases: object

build_index(**kwargs)[source]
check_index()[source]

Check if index is an alias, and update self._index to point to actual index

TODO: the overall design of ESIndexer is not great. If we are exposing ES implementation details (such as the ability to create and delete indices, create and update aliases, etc.) to the user of this class, then this method doesn’t seem that out of place.

clean_field(field, dryrun=True, step=5000)[source]

remove a top-level field from the ES index; if the field is the only field of a doc, remove the doc as well. step is the size of the bulk update on ES. Try first with dryrun turned on, then perform the actual updates with dryrun off.

count(**kwargs)[source]
count_src(**kwargs)[source]
create_index(**kwargs)[source]
create_repository(repo_name, settings)[source]
delete_doc(id)[source]

delete a doc from the index based on passed id.

delete_docs(ids, step=None)[source]

delete a list of docs in bulk.

delete_index(index=None)[source]
doc_feeder(**kwargs)[source]
doc_feeder_using_helper(**kwargs)[source]
exists(**kwargs)[source]

return True/False if a biothing id exists or not.

exists_index(index: Optional[str] = None)[source]
find_biggest_doc(fields_li, min=5, return_doc=False)[source]

return the doc with the max number of fields from fields_li.

get_alias(index: Optional[str] = None, alias_name: Optional[str] = None) List[str][source]

Get indices with alias associated with given index name or alias name

Parameters
  • index – name of index

  • alias_name – name of alias

Returns

Mapping of index names with their aliases

get_biothing(**kwargs)[source]
get_docs(**kwargs)[source]

Return matching docs for the given ids iterable; if none are found, return None. A generator over the matched docs is returned. If only_source is False, the entire document is returned; otherwise only the source is returned.

get_id_list(**kwargs)[source]
get_indice_names_by_settings(index: Optional[str] = None, sort_by_creation_date=False, reverse=False) List[str][source]

Get the list of index names associated with a given index name, using the indices’ settings

Parameters
  • index – name of index

  • sort_by_creation_date – sort the result by the index’s creation_date

  • reverse – control the direction of the sorting

Returns

list of index names (str)

get_indices_from_snapshots(repo_name, snapshot_name)[source]
get_internal_number_of_replicas()[source]
get_mapping()[source]

return the current index mapping

get_mapping_meta()[source]

return the current _meta field.

get_repository(repo_name)[source]
get_restore_status(index_name=None)[source]
get_settings(index: Optional[str] = None) Mapping[str, Mapping][source]

Get indices with settings associated with given index name

Parameters

index – name of index

Returns

Mapping of index names with their settings

get_snapshot_status(repo, snapshot)[source]
get_snapshots(repo_name, snapshot_name)[source]
index(doc, id=None, action='index')[source]

add a doc to the index. If id is not None, the existing doc will be updated.

index_bulk(docs, step=None, action='index')[source]
mexists(**kwargs)[source]
optimize(**kwargs)[source]

optimize the default index.

restore(repo_name, snapshot_name, index_name=None, purge=False)[source]
set_internal_number_of_replicas(number_of_replicas=None)[source]
snapshot(repo, snapshot, mode=None, **params)[source]
update(id, extra_doc, upsert=True)[source]

update an existing doc with extra_doc. Set upsert=True to insert new docs.

update_alias(alias_name: str, index: Optional[str] = None)[source]

Create or update an ES alias pointing to an index

Creates or updates an alias in Elasticsearch, associated with the given index name or the underlying index of the ESIndexer instance.

When the alias name does not exist, it will be created. If the alias already exists, it will be updated to associate only with the index.

When the alias name already exists, an exception will be raised, UNLESS the alias name is the same as index name that the ESIndexer is initialized with. In this case, the existing index with the name collision will be deleted, and the alias will be created in its place. This feature is intended for seamless migration from an index to an alias associated with an index for zero-downtime installs.

Parameters
  • alias_name – name of the alias

  • index – name of the index to associate with alias. If None, the index of the ESIndexer instance is used.

Raises

IndexerException

update_docs(partial_docs, upsert=True, step=None, **kwargs)[source]

update a list of partial_docs in bulk. Set upsert=True to insert new docs.

update_mapping(m)[source]
update_mapping_meta(meta)[source]
exception biothings.utils.es.IndexerException[source]

Bases: Exception

exception biothings.utils.es.MappingError[source]

Bases: Exception

biothings.utils.es.generate_es_mapping(inspect_doc, init=True, level=0)[source]

Generate an ES mapping according to “inspect_doc”, which is produced by biothings.utils.inspect module

biothings.utils.es.get_api()[source]
biothings.utils.es.get_cmd()[source]
biothings.utils.es.get_data_plugin()[source]
biothings.utils.es.get_es(es_host, timeout=120, max_retries=3, retry_on_timeout=False)[source]
biothings.utils.es.get_event()[source]
biothings.utils.es.get_hub_config()[source]
biothings.utils.es.get_hub_db_conn()[source]
biothings.utils.es.get_last_command()[source]
biothings.utils.es.get_source_fullname(col_name)[source]
biothings.utils.es.get_src_build()[source]
biothings.utils.es.get_src_build_config()[source]
biothings.utils.es.get_src_conn()[source]
biothings.utils.es.get_src_dump()[source]
biothings.utils.es.get_src_master()[source]
biothings.utils.es.verify_ids(doc_iter, es_host, index, doc_type=None, step=100000)[source]

verify how many docs from the input iterator/list overlap with existing docs.

biothings.utils.es.wrapper(func)[source]

this wrapper allows passing index and doc_type to the wrapped method.

biothings.utils.exclude_ids

class biothings.utils.exclude_ids.ExcludeFieldsById(exclusion_ids, field_lst, min_list_size=1000)[source]

Bases: object

This class provides a framework to exclude fields for certain identifiers. Up to three arguments are passed to this class, an identifier list, a list of fields to remove, and minimum list size. The identifier list is a list of document identifiers to act on. The list of fields are fields that will be removed; they are specified using a dotstring notation. The minimum list size is the minimum number of elements that should be in a list in order for it to be removed. The ‘drugbank’, ‘chebi’, and ‘ndc’ data sources were manually tested with this class.

Fields to truncate are specified by field_lst. The dot-notation is accepted.

biothings.utils.hub

exception biothings.utils.hub.AlreadyRunningException[source]

Bases: Exception

class biothings.utils.hub.BaseHubReloader(paths, reload_func, wait=5.0)[source]

Bases: object

Monitor sources’ code and reload hub accordingly to update running code

Monitor given paths for directory deletion/creation and for file deletion/creation. Poll for events every ‘wait’ seconds.

poll()[source]

Start monitoring changes on files and/or directories

watched_files()[source]

Return a list of files/directories being watched

class biothings.utils.hub.CommandDefinition[source]

Bases: dict

exception biothings.utils.hub.CommandError[source]

Bases: Exception

class biothings.utils.hub.CommandInformation[source]

Bases: dict

exception biothings.utils.hub.CommandNotAllowed[source]

Bases: Exception

class biothings.utils.hub.CompositeCommand(cmd)[source]

Bases: str

Defines a composite hub command, that is, a new command made of other commands. Useful to define shortcuts when typing commands in the hub console.

class biothings.utils.hub.HubShell(**kwargs: Any)[source]

Bases: InteractiveShell

Create a configurable given a config config.

Parameters
  • config (Config) – If this is empty, default values are used. If config is a Config instance, it will be used to configure the instance.

  • parent (Configurable instance, optional) – The parent Configurable instance of this object.

Notes

Subclasses of Configurable must call the __init__() method of Configurable before doing anything else and using super():

class MyConfigurable(Configurable):
    def __init__(self, config=None):
        super(MyConfigurable, self).__init__(config=config)
        # Then any other code you need to finish initialization.

This ensures that instances will be configured properly.

cmd = None
cmd_cnt = None
classmethod command_info(id=None, running=None, failed=None)[source]
eval(line, return_cmdinfo=False, secure=False)[source]
extract_command_name(cmd)[source]
help(func=None)[source]

Display help on given function/object or list all available commands

launch(pfunc)[source]

Helper to run a command and register it. pfunc is a partial taking no argument. The command name is generated from the partial’s func and arguments.

launched_commands = {}
pending_outputs = {}
classmethod refresh_commands()[source]
register_command(cmd, result, force=False)[source]

Register a command ‘cmd’ inside the shell (so we can keep track of it). ‘result’ is the original value that was returned when cmd was submitted. Depending on the type, returns a cmd number (ie. result was an asyncio task and we need to wait before getting the result) or directly the result of ‘cmd’ execution, returning, in that case, the output.

register_managers(managers)[source]
restart(force=False, stop=False)[source]
classmethod save_cmd(_id, cmd)[source]
classmethod set_command_counter()[source]
set_commands(basic_commands, *extra_ns)[source]
stop(force=False)[source]
exception biothings.utils.hub.NoSuchCommand[source]

Bases: Exception

class biothings.utils.hub.TornadoAutoReloadHubReloader(paths, reload_func, wait=5)[source]

Bases: BaseHubReloader

Reloader based on tornado.autoreload module

Monitor given paths for directory deletion/creation and for file deletion/creation. Poll for events every ‘wait’ seconds.

add_watch(paths)[source]

This method recursively adds the input paths and their children to tornado autoreload for watching. If any file changes, tornado will call our hook to reload the hub.

Each path is converted to an absolute path. If a path matches the excluding patterns, it is ignored. Only files are added for watching; directories are passed to another add_watch call.

monitor()[source]
watched_files()[source]

Return a list of files/directories being watched

biothings.utils.hub.exclude_from_reloader(path)[source]
biothings.utils.hub.get_hub_reloader(*args, **kwargs)[source]
biothings.utils.hub.jsonreadify(cmd)[source]
biothings.utils.hub.publish_data_version(s3_bucket, s3_folder, version_info, update_latest=True, aws_key=None, aws_secret=None)[source]
Update remote files:
  • versions.json: add version_info to the JSON list

    or replace if arg version_info is a list

  • latest.json: update redirect so it points to latest version url

“version_info” is a dict such as:

{"build_version":"...",         # version name for this release/build
 "require_version":"...",       # version required for incremental update
 "target_version": "...",       # version reached once update is applied
 "type" : "incremental|full",   # release type
 "release_date" : "...",        # ISO 8601 timestamp, release date/time
 "url": "http...."}             # url pointing to release metadata
biothings.utils.hub.stats(src_dump)[source]
biothings.utils.hub.template_out(field, confdict)[source]

Return field as a templated-out field, substituting “%(…)s” parts with values from confdict. Fields can follow dotfield notation. Parts like “$(…)” are replaced with a timestamp following the specified format (see time.strftime). Example:

confdict = {"a":"one"}
field = "%(a)s_two_three_$(%Y%m)"
=> "one_two_three_201908" # assuming we're in August 2019
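A minimal sketch of this templating (handling flat confdict keys only; the real function also supports dotfield notation):

```python
import re
import time

def template_out(field, confdict):
    # replace "$(fmt)" parts with time.strftime(fmt) first, then "%(key)s"
    # parts from confdict (flat keys only in this sketch)
    field = re.sub(r"\$\(([^)]*)\)", lambda m: time.strftime(m.group(1)), field)
    return field % confdict
```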

biothings.utils.hub_db

hub_db module is a place-holder for internal hub database functions. Hub DB contains information about sources, configuration variables, etc… It’s for internal usage. When biothings.config_for_app() is called, this module will be “filled” with the actual implementations from the specified backend (specified in config.py, or defaulting to MongoDB).

Hub DB can be implemented over different backends; it was originally done using MongoDB, so the dialect is very much inspired by pymongo. Any hub db backend implementation must implement the functions and classes below. See biothings.utils.mongo and biothings.utils.sqlite3 for some examples.

class biothings.utils.hub_db.ChangeListener[source]

Bases: object

read()[source]
class biothings.utils.hub_db.ChangeWatcher[source]

Bases: object

classmethod add(listener)[source]
col_entity = {'cmd': 'command', 'hub_config': 'config', 'src_build': 'build', 'src_build_config': 'build_config', 'src_dump': 'source', 'src_master': 'master'}
do_publish = False
event_queue = <Queue maxsize=0>
listeners = {}
classmethod monitor(func, entity, op)[source]
classmethod publish()[source]
classmethod wrap(getfunc)[source]
class biothings.utils.hub_db.Collection(colname, db)[source]

Bases: object

Defines a minimal subset of MongoDB collection behavior. Note: Collection instances must be pickleable (if not, __getstate__ can be implemented to deal with those attributes for instance)

Init args can differ depending on the backend requirements. colname is the only one required.

count()[source]

Return the number of documents in the collection

database()[source]

Return the database name

find(*args, **kwargs)[source]

Return an iterable of documents matching the criteria defined in *args[0] (which will be a dict). The query dialect is a minimal one, inspired by MongoDB. The dict can contain the name of a key and the value being searched for. Ex: {“field1”:”value1”} will return all documents where field1 == “value1”. Nested keys (field1.subfield1) aren’t supported (no need to implement). Only exact matches are required.

If no query is passed, or if query is an empty dict, return all documents.
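The dialect above can be sketched as a free function over an in-memory list of docs (illustrative only; the Collection method operates on its backend storage):

```python
def find(docs, query=None):
    # exact top-level matches only; empty/missing query returns everything
    query = query or {}
    for doc in docs:
        if all(doc.get(k) == v for k, v in query.items()):
            yield doc
```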

find_one(*args, **kwargs)[source]

Return one document from the collection. *args will contain a dict with the query parameters. See also find()

insert_one(doc)[source]

Insert a document in the collection. Raise an error if already inserted

property name

Return the collection/table name

remove(query)[source]

Delete all documents matching ‘query’

replace_one(query, doc)[source]

Replace a document matching ‘query’ (or the first found one) with passed doc

save(doc)[source]

Shortcut to update_one() or insert_one(). Save the document by either inserting it if it doesn’t exist, or updating the existing one

update(query, what)[source]

Same as update_one() but operate on all documents matching ‘query’

update_one(query, what, upsert=False)[source]

Update one document (or the first matching the query). See find() for the query parameter. “what” tells how to update the document. $set/$unset/$push operators must be implemented (refer to the MongoDB documentation for more). Nested-key operations aren’t necessary.

class biothings.utils.hub_db.IDatabase[source]

Bases: object

This class declares an interface and partially implements some of it, mimicking the mongokit.Connection class. It’s used to keep the same document model. Any internal backend should implement (derive from) this interface

property address

Returns sufficient information so a connection to a database can be created. Information can be a dictionary, object, etc… and depends on the actual backend

collection_names()[source]

Return a list of all collections (or tables) found in this database

create_collection(colname)[source]

Create a table/collection named colname. If the backend is using a schema-based database (ie. SQL), the backend should enforce the schema with at least the field “_id” as the primary key (as a string).

biothings.utils.hub_db.backup(folder='.', archive=None)[source]

Dump the whole hub_db database into the given folder. “archive” can be passed to specify the target filename; otherwise it’s randomly generated

Note

this doesn’t backup source/merge data, just the internal data used by the hub

biothings.utils.hub_db.get_api()
biothings.utils.hub_db.get_cmd()
biothings.utils.hub_db.get_data_plugin()
biothings.utils.hub_db.get_event()
biothings.utils.hub_db.get_hub_config()
biothings.utils.hub_db.get_src_build()
biothings.utils.hub_db.get_src_build_config()
biothings.utils.hub_db.get_src_dump()
biothings.utils.hub_db.get_src_master()
biothings.utils.hub_db.restore(archive, drop=False)[source]

Restore database from given archive. If drop is True, then delete existing collections

biothings.utils.hub_db.setup(config)[source]

biothings.utils.info

class biothings.utils.info.DevInfo[source]

Bases: object

get()[source]
class biothings.utils.info.FieldNote(path)[source]

Bases: object

get_field_notes()[source]

Return the cached field notes associated with this instance.

biothings.utils.inspect

This module contains util functions that may be shared by both BioThings data-hub and web components. In general, do not include utils depending on any third-party modules. Note: unittests are available in biothings.tests.hub

class biothings.utils.inspect.BaseMode[source]

Bases: object

key = None
merge(target, tomerge)[source]

Merge two different maps together (from tomerge into target)

post(mapt, mode, clean)[source]
report(struct, drep, orig_struct=None)[source]

Given a data structure “struct” being inspected, report (fill) the “drep” dictionary with useful values for this mode, under the drep[self.key] key. Sometimes “struct” has already been converted to its analytical value at this point (inspect may count the number of dicts and would then pass struct as “1” instead of the whole dict, whose number of keys could then be reported); in that case, “orig_struct” contains the original structure that was to be reported, whatever the pre-conversion step did.

template = {}
class biothings.utils.inspect.DeepStatsMode[source]

Bases: StatsMode

key = '_stats'
merge(target_stats, tomerge_stats)[source]

Merge two different maps together (from tomerge into target)

post(mapt, mode, clean)[source]
report(val, drep, orig_struct=None)[source]

Given a data structure “struct” being inspected, report (fill) the “drep” dictionary with useful values for this mode, under the drep[self.key] key. Sometimes “struct” has already been converted to its analytical value at this point (inspect may count the number of dicts and would then pass struct as “1” instead of the whole dict, whose number of keys could then be reported); in that case, “orig_struct” contains the original structure that was to be reported, whatever the pre-conversion step did.

template = {'_stats': {'__vals': [], '_count': 0, '_max': -inf, '_min': inf}}
class biothings.utils.inspect.IdentifiersMode[source]

Bases: RegexMode

ids = None
key = '_ident'
matchers = None
class biothings.utils.inspect.RegexMode[source]

Bases: BaseMode

matchers = []
merge(target, tomerge)[source]

Merge two different maps together (from tomerge into target)

report(val, drep, orig_struct=None)[source]

Given a data structure “struct” being inspected, report (fill) the “drep” dictionary with useful values for this mode, under the drep[self.key] key. Sometimes “struct” has already been converted to its analytical value at this point (inspect may count the number of dicts and would then pass struct as “1” instead of the whole dict, whose number of keys could then be reported); in that case, “orig_struct” contains the original structure that was to be reported, whatever the pre-conversion step did.

class biothings.utils.inspect.StatsMode[source]

Bases: BaseMode

flatten_stats(stats)[source]
key = '_stats'
maxminiflist(val, func)[source]
merge(target_stats, tomerge_stats)[source]

Merge two different maps together (from tomerge into target)

report(struct, drep, orig_struct=None)[source]

Given a data structure “struct” being inspected, report (fill) the “drep” dictionary with useful values for this mode, under the drep[self.key] key. Sometimes “struct” has already been converted to its analytical value at this point (inspect may count the number of dicts and would then pass struct as “1” instead of the whole dict, whose number of keys could then be reported); in that case, “orig_struct” contains the original structure that was to be reported, whatever the pre-conversion step did.

sumiflist(val)[source]
template = {'_stats': {'_count': 0, '_max': -inf, '_min': inf, '_none': 0}}
biothings.utils.inspect.compute_metadata(mapt, mode)[source]
biothings.utils.inspect.get_converters(modes, logger=logging)[source]
biothings.utils.inspect.get_mode_layer(mode)[source]
biothings.utils.inspect.inspect(struct, key=None, mapt=None, mode='type', level=0, logger=logging)[source]

Explore struct and report types contained in it.

Parameters
  • struct – is the data structure to explore

  • mapt – if not None, will complete that type map with passed struct. This is useful when iterating over a dataset of similar data, trying to find a good type summary contained in that dataset.

  • level – is for internal purposes, mostly debugging

  • mode – see inspect_docs() documentation

biothings.utils.inspect.inspect_docs(docs, mode='type', clean=True, merge=False, logger=logging, pre_mapping=False, limit=None, sample=None, metadata=True, auto_convert=True)[source]

Inspect docs and return a summary of their structure:

Parameters
  • mode

    possible values are:

    • ”type”: (default) explore documents and report strict data structure

    • ”mapping”: same as “type” but also performs tests on the data to guess the best mapping (eg. check if a string is splittable, etc…). Implies merge=True

    • ”stats”: explore documents and compute basic stats (count,min,max,sum)

    • ”deepstats”: same as “stats” but records values and also computes mean, stdev and median (memory intensive…)

    • ”jsonschema”: same as “type” but returns a JSON-schema formatted result

    mode can also be a list of modes, eg. [“type”,”mapping”]. There’s little overhead in computing multiple modes, as most time is spent on actually getting the data.

  • clean – don’t delete recorded values or temporary results

  • merge – merge scalars into lists when both exist (eg. {“val”:…} and [{“val”:…}])

  • limit – can limit the inspection to the x first docs (None = no limit, inspects all)

  • sample – in combination with limit, randomly extract a sample of ‘limit’ docs (so not necessarily the first x docs defined by limit). If random.random() is greater than sample, the doc is inspected; otherwise it’s skipped.

  • metadata – compute metadata on the result

  • auto_convert – run converters automatically (converters are used to convert one mode’s output to another mode’s output, eg. type to jsonschema)
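As a toy illustration of what “type” mode reports, here is a much-simplified inspector that only records type names at top-level keys (the real function recurses, handles lists, merging, mappings, etc.):

```python
def inspect_types(docs):
    # record the set of type names observed at each top-level key
    summary = {}
    for doc in docs:
        for k, v in doc.items():
            summary.setdefault(k, set()).add(type(v).__name__)
    return summary
```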

biothings.utils.inspect.merge_record(target, tomerge, mode)[source]
biothings.utils.inspect.merge_scalar_list(mapt, mode)[source]
biothings.utils.inspect.run_converters(_map, converters, logger=<module 'logging'>)[source]
biothings.utils.inspect.stringify_inspect_doc(dmap)[source]
biothings.utils.inspect.typify_inspect_doc(dmap)[source]

dmap is an inspect document which was converted so it could be stored in a database: actual Python types were stringified to make them storable. This function does the opposite, restoring Python types within the inspect doc.
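A minimal sketch of that round-trip idea, using hypothetical helper names (the real biothings functions also handle nested structures; this only shows the flat case): Python type objects used as dict keys are replaced with strings such as "__type__:int" for storage, then resolved back via builtins.

```python
import builtins

def stringify_types(dmap):
    # store Python type keys as strings such as "__type__:int"
    return {("__type__:%s" % k.__name__ if isinstance(k, type) else k): v
            for k, v in dmap.items()}

def typify_types(dmap):
    # opposite operation: resolve "__type__:..." strings back to Python types
    restored = {}
    for k, v in dmap.items():
        if isinstance(k, str) and k.startswith("__type__:"):
            k = getattr(builtins, k.split(":", 1)[1])
        restored[k] = v
    return restored

stored = stringify_types({int: {}, str: {}})
restored = typify_types(stored)
```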

biothings.utils.jsondiff

The MIT License (MIT)

Copyright (c) 2014 Ilya Volkov

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

biothings.utils.jsondiff.make(src, dst, **kwargs)[source]

biothings.utils.jsonpatch

Apply JSON-Patches (RFC 6902)

class biothings.utils.jsonpatch.AddOperation(operation)[source]

Bases: PatchOperation

Adds an object property or an array element.

apply(obj)[source]

Abstract method that applies patch operation to specified object.

class biothings.utils.jsonpatch.CopyOperation(operation)[source]

Bases: PatchOperation

Copies an object property or an array element to a new location

apply(obj)[source]

Abstract method that applies patch operation to specified object.

exception biothings.utils.jsonpatch.InvalidJsonPatch[source]

Bases: JsonPatchException

Raised if an invalid JSON Patch is created

class biothings.utils.jsonpatch.JsonPatch(patch)[source]

Bases: object

A JSON Patch is a list of Patch Operations.

>>> patch = JsonPatch([
...     {'op': 'add', 'path': '/foo', 'value': 'bar'},
...     {'op': 'add', 'path': '/baz', 'value': [1, 2, 3]},
...     {'op': 'remove', 'path': '/baz/1'},
...     {'op': 'test', 'path': '/baz', 'value': [1, 3]},
...     {'op': 'replace', 'path': '/baz/0', 'value': 42},
...     {'op': 'remove', 'path': '/baz/1'},
... ])
>>> doc = {}
>>> result = patch.apply(doc)
>>> expected = {'foo': 'bar', 'baz': [42]}
>>> result == expected
True

A JsonPatch object is iterable, so you can easily access each patch statement in a loop:

>>> lpatch = list(patch)
>>> expected = {'op': 'add', 'path': '/foo', 'value': 'bar'}
>>> lpatch[0] == expected
True
>>> lpatch == patch.patch
True

A JsonPatch can also be converted directly to bool, indicating whether it contains any operation statements:

>>> bool(patch)
True
>>> bool(JsonPatch([]))
False

This behavior is very handy with make_patch() to write more readable code:

>>> old = {'foo': 'bar', 'numbers': [1, 3, 4, 8]}
>>> new = {'baz': 'qux', 'numbers': [1, 4, 7]}
>>> patch = make_patch(old, new)
>>> if patch:
...     # document has changed, do something useful
...     patch.apply(old)    
{...}
apply(orig_obj, in_place=False, ignore_conflicts=False, verify=False)[source]

Applies the patch to given object.

Parameters
  • orig_obj (dict) – Document object.

  • in_place (bool) – Whether to apply the patch directly to the specified object or to a copy of it.

Returns

Modified obj.

classmethod from_diff(src, dst)[source]

Creates a JsonPatch instance by comparing two document objects. The resulting patch transforms src into dst.

Parameters
  • src (dict) – Data source document object.

  • dst (dict) – Data destination document object.

Returns

JsonPatch instance.

>>> src = {'foo': 'bar', 'numbers': [1, 3, 4, 8]}
>>> dst = {'baz': 'qux', 'numbers': [1, 4, 7]}
>>> patch = JsonPatch.from_diff(src, dst)
>>> new = patch.apply(src)
>>> new == dst
True
classmethod from_string(patch_str)[source]

Creates JsonPatch instance from string source.

Parameters

patch_str (str) – JSON patch as raw string.

Returns

JsonPatch instance.

to_string()[source]

Returns patch set as JSON string.

exception biothings.utils.jsonpatch.JsonPatchConflict[source]

Bases: JsonPatchException

Raised if a patch could not be applied due to a conflict situation, such as:

  • an attempt to add an object key that already exists;

  • an attempt to operate on a nonexistent object key;

  • an attempt to insert a value into an array at a position beyond its size;

  • etc.

exception biothings.utils.jsonpatch.JsonPatchException[source]

Bases: Exception

Base Json Patch exception

exception biothings.utils.jsonpatch.JsonPatchTestFailed[source]

Bases: JsonPatchException, AssertionError

A Test operation failed

class biothings.utils.jsonpatch.MoveOperation(operation)[source]

Bases: PatchOperation

Moves an object property or an array element to new location.

apply(obj)[source]

Abstract method that applies patch operation to specified object.

class biothings.utils.jsonpatch.PatchOperation(operation)[source]

Bases: object

A single operation inside a JSON Patch.

apply(obj)[source]

Abstract method that applies patch operation to specified object.

class biothings.utils.jsonpatch.RemoveOperation(operation)[source]

Bases: PatchOperation

Removes an object property or an array element.

apply(obj)[source]

Abstract method that applies patch operation to specified object.

class biothings.utils.jsonpatch.ReplaceOperation(operation)[source]

Bases: PatchOperation

Replaces an object property or an array element by new value.

apply(obj)[source]

Abstract method that applies patch operation to specified object.

class biothings.utils.jsonpatch.TestOperation(operation)[source]

Bases: PatchOperation

Test value by specified location.

apply(obj)[source]

Abstract method that applies patch operation to specified object.

biothings.utils.jsonpatch.apply_patch(doc, patch, in_place=False, ignore_conflicts=False, verify=False)[source]

Apply a list of patches to the specified JSON document.

Parameters
  • doc (dict) – Document object.

  • patch (list or str) – JSON patch as list of dicts or raw JSON-encoded string.

  • in_place (bool) – When True, the patch modifies the target document; by default it is applied to a copy of the document.

  • ignore_conflicts (bool) – Ignore JsonPatchConflict errors

  • verify (bool) – Works with ignore_conflicts=True: if errors occur and verify is True (recommended), make sure the resulting object is the same as the original one. ignore_conflicts and verify are used to run patches multiple times and get rid of errors when operations can't be performed more than once because the object has already been patched. This forces in_place to False so the comparison can occur.

Returns

Patched document object.

Return type

dict

>>> doc = {'foo': 'bar'}
>>> patch = [{'op': 'add', 'path': '/baz', 'value': 'qux'}]
>>> other = apply_patch(doc, patch)
>>> doc is not other
True
>>> other == {'foo': 'bar', 'baz': 'qux'}
True
>>> patch = [{'op': 'add', 'path': '/baz', 'value': 'qux'}]
>>> apply_patch(doc, patch, in_place=True) == {'foo': 'bar', 'baz': 'qux'}
True
>>> doc == other
True
biothings.utils.jsonpatch.get_loadjson()[source]

Adds the object_pairs_hook parameter to json.load when possible.

The object_pairs_hook parameter is used to handle duplicate keys when loading a JSON object. This parameter does not exist in Python 2.6. This method returns an unmodified json.load for Python 2.6, and a partial function with object_pairs_hook set to multidict for Python versions that support the parameter.

biothings.utils.jsonpatch.make_patch(src, dst)[source]

Generates a patch by comparing two document objects. It is a proxy to the JsonPatch.from_diff() method.

Parameters
  • src (dict) – Data source document object.

  • dst (dict) – Data destination document object.

>>> src = {'foo': 'bar', 'numbers': [1, 3, 4, 8]}
>>> dst = {'baz': 'qux', 'numbers': [1, 4, 7]}
>>> patch = make_patch(src, dst)
>>> new = patch.apply(src)
>>> new == dst
True
biothings.utils.jsonpatch.multidict(ordered_pairs)[source]

Convert the values of duplicate keys to lists.
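The duplicate-key behavior that get_loadjson() wires in can be sketched as follows. This is a simplified stand-in for the real multidict, shown only to illustrate the object_pairs_hook pattern:

```python
import json

# Simplified stand-in for biothings.utils.jsonpatch.multidict: collect the
# values of duplicate JSON keys into lists instead of silently keeping only
# the last one (json.loads' default behavior).
def multidict(ordered_pairs):
    result = {}
    for key, value in ordered_pairs:
        if key in result:
            if isinstance(result[key], list):
                result[key].append(value)
            else:
                result[key] = [result[key], value]
        else:
            result[key] = value
    return result

doc = json.loads('{"a": 1, "a": 2, "b": 3}', object_pairs_hook=multidict)
```

Without the hook, json.loads would return {"a": 2, "b": 3}; with it, both values of "a" survive as a list.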

biothings.utils.jsonpatch.reapply_patch(doc, patch)[source]

Apply or (safely) re-apply patch to doc

biothings.utils.jsonschema

biothings.utils.jsonschema.generate_json_schema(dmap)[source]
biothings.utils.jsonschema.test()[source]

biothings.utils.loggers

class biothings.utils.loggers.Colors(value)[source]

Bases: Enum

An enumeration.

CRITICAL = '#7b0099'
DEBUG = '#a1a1a1'
ERROR = 'danger'
INFO = 'good'
NOTSET = '#d6d2d2'
WARNING = 'warning'
class biothings.utils.loggers.EventRecorder(*args, **kwargs)[source]

Bases: StreamHandler

Initialize the handler.

If stream is not specified, sys.stderr is used.

emit(record)[source]

Emit a record.

If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.

class biothings.utils.loggers.GZipRotator[source]

Bases: object
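The class is undocumented here; judging by its name it likely compresses rotated log files, following the standard library's rotator protocol. A hedged, self-contained sketch of that pattern (not the biothings code):

```python
import gzip
import os
import shutil
import tempfile

# Sketch of a gzip rotator, assuming GZipRotator follows the standard
# logging rotator protocol: called with (source, dest) when a log file
# is rotated, it compresses the rotated file and removes the original.
class GZipRotatorSketch:
    def __call__(self, source, dest):
        with open(source, "rb") as f_in, gzip.open(dest + ".gz", "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.remove(source)

logdir = tempfile.mkdtemp()
src = os.path.join(logdir, "app.log.1")
with open(src, "w") as f:
    f.write("old log contents\n")

rotator = GZipRotatorSketch()
rotator(src, src)  # leaves app.log.1.gz, removes app.log.1
```

With logging.handlers.RotatingFileHandler, such an object is typically assigned to handler.rotator (together with a matching handler.namer that appends ".gz").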

class biothings.utils.loggers.LookUpList(initlist)[source]

Bases: UserList

find(val)[source]
find_index(val)[source]
class biothings.utils.loggers.Range(start: Union[int, float] = 0, end: Union[int, float] = inf)[source]

Bases: object

end: Union[int, float] = inf
start: Union[int, float] = 0
class biothings.utils.loggers.ReceiverGroup(initlist=None)[source]

Bases: UserList

class biothings.utils.loggers.Record(range, value)[source]

Bases: NamedTuple

Create new instance of Record(range, value)

range: Range

Alias for field number 0

value: Enum

Alias for field number 1

class biothings.utils.loggers.ShellLogger(*args, **kwargs)[source]

Bases: Logger

Custom “levels” for input going to the shell and output coming from it (just for naming)

Initialize the logger with a name and an optional level.

INPUT = 1001
OUTPUT = 1000
input(msg, *args, **kwargs)[source]
output(msg, *args, **kwargs)[source]
class biothings.utils.loggers.SlackHandler(webhook, mentions)[source]

Bases: StreamHandler

Initialize the handler.

If stream is not specified, sys.stderr is used.

emit(record)[source]

Emit a record.

If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.

static send(webhook, message, level, mentions=())[source]
class biothings.utils.loggers.SlackMentionPolicy(policy)[source]

Bases: object

mentions(level)[source]
class biothings.utils.loggers.SlackMessage[source]

Bases: object

build()[source]
markdown(text, prefixes=(), suffixes=())[source]
plaintext(text, color)[source]
class biothings.utils.loggers.Squares(value)[source]

Bases: Enum

An enumeration.

CRITICAL = ':large_purple_square:'
DEBUG = ':white_large_square:'
ERROR = ':large_red_square:'
INFO = ':large_blue_square:'
NOTSET = ''
WARNING = ':large_orange_square:'
class biothings.utils.loggers.WSLogHandler(listener)[source]

Bases: StreamHandler

when listener is a bt.hub.api.handlers.ws.LogListener instance, log statements are propagated through existing websocket

Initialize the handler.

If stream is not specified, sys.stderr is used.

emit(record)[source]

Emit a record.

If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.

payload(record)[source]
class biothings.utils.loggers.WSShellHandler(listener)[source]

Bases: WSLogHandler

when listener is a bt.hub.api.handlers.ws.LogListener instance, log statements are propagated through existing websocket

Initialize the handler.

If stream is not specified, sys.stderr is used.

payload(record)[source]
biothings.utils.loggers.configurate_file_handler(logger, logfile, formater=None, force=False)[source]
biothings.utils.loggers.create_logger(log_folder, logger_name, level=10)[source]
biothings.utils.loggers.get_logger(logger_name, log_folder=None, handlers=('console', 'file', 'slack'), timestamp=None, force=False)[source]

Configure a logger object from logger_name and return (logger, logfile)
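The (logger, logfile) contract can be sketched with the standard library alone. This is an illustrative stand-in, not the biothings implementation, and the timestamped filename scheme here is an assumption:

```python
import logging
import os
import tempfile
import time

# Sketch of the get_logger contract described above (hypothetical helper,
# not the biothings code): configure a named logger with a file handler
# and return the (logger, logfile) pair.
def get_logger_sketch(logger_name, log_folder):
    logfile = os.path.join(
        log_folder, "%s_%s.log" % (logger_name, time.strftime("%Y%m%d")))
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    handler = logging.FileHandler(logfile)
    handler.setFormatter(
        logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
    logger.addHandler(handler)
    return logger, logfile

folder = tempfile.mkdtemp()
logger, logfile = get_logger_sketch("my_uploader", folder)
logger.info("upload started")
```

Returning the logfile path alongside the logger lets callers report or archive the log after a task finishes.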

biothings.utils.loggers.setup_default_log(default_logger_name, log_folder, level=10)[source]

biothings.utils.manager

class biothings.utils.manager.BaseManager(job_manager, poll_schedule=None)[source]

Bases: object

clean_stale_status()[source]

During startup, search for actions in progress which would have been interrupted, and change their state to "canceled". Ex: some downloading processes could have been interrupted; at startup, their "downloading" status should be changed to "canceled" to reflect the actual state of these datasources. This must be overridden in subclasses.

poll(state, func, col)[source]

Search for sources in collection 'col' with a pending flag list containing 'state', and call 'func' for each document found (with the doc as the only param)
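The polling contract reads naturally as: filter documents whose pending list contains the state, then invoke the callback. A minimal stand-in over a plain list of dicts (the real method queries a database collection):

```python
# Illustrative stand-in for BaseManager.poll: call func(doc) for every
# document whose "pending" flag list contains the requested state.
def poll(state, func, col):
    for doc in col:
        if state in doc.get("pending", []):
            func(doc)

col = [
    {"_id": "src1", "pending": ["upload"]},
    {"_id": "src2", "pending": []},
    {"_id": "src3", "pending": ["upload", "dump"]},
]
picked = []
poll("upload", picked.append, col)
```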

class biothings.utils.manager.BaseSourceManager(job_manager, datasource_path='dataload.sources', *args, **kwargs)[source]

Bases: BaseManager

Base class to provide source management: discovery, registration. Actual launch of tasks must be defined in subclasses.

SOURCE_CLASS = None
filter_class(klass)[source]

Gives subclasses the opportunity to check a given class and decide whether to keep it in the discovery process. Returning None means "skip it".

find_classes(src_module, fail_on_notfound=True)[source]

Given a Python module, return a list of classes in this module matching SOURCE_CLASS (they must inherit from it)

register_classes(klasses)[source]

Register each class in the self.register dict. The key will be used to retrieve the source class, create an instance and run methods from it. It must be implemented in subclasses, as each manager may need to access its sources differently, based on different keys.

register_source(src, fail_on_notfound=True)[source]

Register a new data source. src can be a module where some classes are defined. It can also be a module path as a string, or just a source name in which case it will try to find information from default path.

register_sources(sources)[source]
class biothings.utils.manager.BaseStatusRegisterer[source]

Bases: object

property collection

Return collection object used to fetch doc in which we store status

load_doc(key_name, stage)[source]

Find a document using key_name and stage, stage being a key within the document matching a specific process name. Ex: with {"_id": "123", "snapshot": "abc"},

load_doc("abc", "snapshot")

will return the document. Note key_name is first used to find the doc by its _id. Ex: with another doc {"_id": "abc", "snapshot": "somethingelse"},

load_doc("abc", "snapshot")

will return the doc with _id="abc", not the one with _id="123"
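That precedence rule (an _id match wins over a stage-field match) can be sketched over an in-memory list of documents. Illustrative only; the real method fetches from self.collection:

```python
# Illustrative stand-in for BaseStatusRegisterer.load_doc: prefer a doc
# whose _id equals key_name; otherwise fall back to a doc whose `stage`
# field holds that value.
def load_doc(docs, key_name, stage):
    for doc in docs:
        if doc.get("_id") == key_name:
            return doc
    for doc in docs:
        if doc.get(stage) == key_name:
            return doc

docs = [{"_id": "123", "snapshot": "abc"},
        {"_id": "abc", "snapshot": "somethingelse"}]
doc = load_doc(docs, "abc", "snapshot")  # _id match wins: returns _id="abc"
```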

register_status(doc, stage, status, transient=False, init=False, **extra)[source]
class biothings.utils.manager.JobManager(loop, process_queue=None, thread_queue=None, max_memory_usage=None, num_workers=None, num_threads=None, auto_recycle=True)[source]

Bases: object

COLUMNS = ['pid', 'source', 'category', 'step', 'description', 'mem', 'cpu', 'started_at', 'duration']
DATALINE = '{pid:<10}|{source:<35}|{category:<10}|{step:<20}|{description:<30}|{mem:<10}|{cpu:<6}|{started_at:<20}|{duration:<10}'
HEADER = {'category': 'CATEGORY', 'cpu': 'CPU', 'description': 'DESCRIPTION', 'duration': 'DURATION', 'mem': 'MEM', 'pid': 'PID', 'source': 'SOURCE', 'started_at': 'STARTED_AT', 'step': 'STEP'}
HEADERLINE = '{pid:^10}|{source:^35}|{category:^10}|{step:^20}|{description:^30}|{mem:^10}|{cpu:^6}|{started_at:^20}|{duration:^10}'
async check_constraints(pinfo=None)[source]
clean_staled()[source]
async defer_to_process(pinfo=None, func=None, *args, **kwargs)[source]
async defer_to_thread(pinfo=None, func=None, *args)[source]
extract_pending_info(pending)[source]
extract_worker_info(worker)[source]
get_pending_processes()[source]
get_pending_summary(getstr=False)[source]
get_pid_files(child=None)[source]
get_process_summary()[source]
get_summary(child=None)[source]
get_thread_files()[source]
get_thread_summary()[source]
property hub_memory
property hub_process
job_info()[source]
property pchildren
print_pending_info(num, info)[source]
print_workers(workers)[source]
recycle_process_queue()[source]

Replace the current process queue with a new one. When processes are used over and over again, memory tends to grow as the Python interpreter keeps some data (…). Calling this method performs a clean shutdown of the current queue, waiting for running processes to terminate, then discards the current queue and replaces it with a new one.

schedule(crontab, func, *args, **kwargs)[source]

Helper to create a cron job from a callable "func". *args and **kwargs are passed to func. "crontab" follows aiocron notation.

show_pendings(running=None)[source]
stop(force=False, recycling=False, wait=1)[source]
submit(pfunc, schedule=None)[source]

Helper to submit and run tasks. Tasks will run asynchronously. pfunc is a functools.partial. schedule is a string representing a cron schedule; the task will then be scheduled accordingly.

top(action='summary')[source]
exception biothings.utils.manager.ManagerError[source]

Bases: Exception

exception biothings.utils.manager.ResourceError[source]

Bases: Exception

exception biothings.utils.manager.ResourceNotFound[source]

Bases: Exception

exception biothings.utils.manager.UnknownResource[source]

Bases: Exception

biothings.utils.manager.do_work(job_id, ptype, pinfo=None, func=None, *args, **kwargs)[source]
biothings.utils.manager.find_process(pid)[source]
biothings.utils.manager.norm(value, maxlen)[source]
biothings.utils.manager.track(func)[source]

biothings.utils.mongo

class biothings.utils.mongo.Collection(*args, **kwargs)[source]

Bases: HandleAutoReconnectMixin, Collection

Get / create a Mongo collection.

Raises TypeError if name is not an instance of basestring (str in python 3). Raises InvalidName if name is not a valid collection name. Any additional keyword arguments will be used as options passed to the create command. See create_collection() for valid options.

If create is True, collation is specified, or any additional keyword arguments are present, a create command will be sent, using session if specified. Otherwise, a create command will not be sent and the collection will be created implicitly on first use. The optional session argument is only used for the create command, it is not associated with the collection afterward.

Parameters
  • database: the database to get a collection from

  • name: the name of the collection to get

  • create (optional): if True, force collection creation even without options being set

  • codec_options (optional): An instance of CodecOptions. If None (the default) database.codec_options is used.

  • read_preference (optional): The read preference to use. If None (the default) database.read_preference is used.

  • write_concern (optional): An instance of WriteConcern. If None (the default) database.write_concern is used.

  • read_concern (optional): An instance of ReadConcern. If None (the default) database.read_concern is used.

  • collation (optional): An instance of Collation. If a collation is provided, it will be passed to the create collection command.

  • session (optional): a ClientSession that is used with the create collection command

  • **kwargs (optional): additional keyword arguments will be passed as options for the create collection command

Changed in version 4.2: Added the clusteredIndex and encryptedFields parameters.

Changed in version 4.0: Removed the reindex, map_reduce, inline_map_reduce, parallel_scan, initialize_unordered_bulk_op, initialize_ordered_bulk_op, group, count, insert, save, update, remove, find_and_modify, and ensure_index methods. See the pymongo4-migration-guide.

Changed in version 3.6: Added session parameter.

Changed in version 3.4: Support the collation option.

Changed in version 3.2: Added the read_concern option.

Changed in version 3.0: Added the codec_options, read_preference, and write_concern options. Removed the uuid_subtype attribute. Collection no longer returns an instance of Collection for attribute names with leading underscores. You must use dict-style lookups instead::

collection['__my_collection__']

Not:

collection.__my_collection__

See also

The MongoDB documentation on collections.

count(_filter=None, **kwargs)[source]
insert(doc_or_docs, *args, **kwargs)[source]
remove(spec_or_id=None, **kwargs)[source]
save(doc, *args, **kwargs)[source]
update(spec, doc, *args, **kwargs)[source]
class biothings.utils.mongo.Database(*args, **kwargs)[source]

Bases: HandleAutoReconnectMixin, Database

Get a database by client and name.

Raises TypeError if name is not an instance of basestring (str in python 3). Raises InvalidName if name is not a valid database name.

Parameters
  • client: A MongoClient instance.

  • name: The database name.

  • codec_options (optional): An instance of CodecOptions. If None (the default) client.codec_options is used.

  • read_preference (optional): The read preference to use. If None (the default) client.read_preference is used.

  • write_concern (optional): An instance of WriteConcern. If None (the default) client.write_concern is used.

  • read_concern (optional): An instance of ReadConcern. If None (the default) client.read_concern is used.

See also

The MongoDB documentation on databases.

Changed in version 4.0: Removed the eval, system_js, error, last_status, previous_error, reset_error_history, authenticate, logout, collection_names, current_op, add_user, remove_user, profiling_level, set_profiling_level, and profiling_info methods. See the pymongo4-migration-guide.

Changed in version 3.2: Added the read_concern option.

Changed in version 3.0: Added the codec_options, read_preference, and write_concern options. Database no longer returns an instance of Collection for attribute names with leading underscores. You must use dict-style lookups instead::

db['__my_collection__']

Not:

db.__my_collection__

collection_names(include_system_collections=True, session=None)[source]
class biothings.utils.mongo.DatabaseClient(*args, **kwargs)[source]

Bases: HandleAutoReconnectMixin, MongoClient, IDatabase

Client for a MongoDB instance, a replica set, or a set of mongoses.

Warning

Starting in PyMongo 4.0, directConnection now has a default value of False instead of None. For more details, see the relevant section of the PyMongo 4.x migration guide: pymongo4-migration-direct-connection.

The client object is thread-safe and has connection-pooling built in. If an operation fails because of a network error, ConnectionFailure is raised and the client reconnects in the background. Application code should handle this exception (recognizing that the operation failed) and then continue to execute.

The host parameter can be a full mongodb URI, in addition to a simple hostname. It can also be a list of hostnames but no more than one URI. Any port specified in the host string(s) will override the port parameter. For usernames and passwords, reserved characters like ':', '/', '+' and '@' must be percent-encoded following RFC 2396:

from urllib.parse import quote_plus

uri = "mongodb://%s:%s@%s" % (
    quote_plus(user), quote_plus(password), host)
client = MongoClient(uri)

Unix domain sockets are also supported. The socket path must be percent encoded in the URI:

uri = "mongodb://%s:%s@%s" % (
    quote_plus(user), quote_plus(password), quote_plus(socket_path))
client = MongoClient(uri)

But not when passed as a simple hostname:

client = MongoClient('/tmp/mongodb-27017.sock')

Starting with version 3.6, PyMongo supports mongodb+srv:// URIs. The URI must include one, and only one, hostname. The hostname will be resolved to one or more DNS SRV records which will be used as the seed list for connecting to the MongoDB deployment. When using SRV URIs, the authSource and replicaSet configuration options can be specified using TXT records. See the Initial DNS Seedlist Discovery spec for more details. Note that the use of SRV URIs implicitly enables TLS support. Pass tls=false in the URI to override.

Note

MongoClient creation will block waiting for answers from DNS when mongodb+srv:// URIs are used.

Note

Starting with version 3.0 the MongoClient constructor no longer blocks while connecting to the server or servers, and it no longer raises ConnectionFailure if they are unavailable, nor ConfigurationError if the user’s credentials are wrong. Instead, the constructor returns immediately and launches the connection process on background threads. You can check if the server is available like this:

from pymongo.errors import ConnectionFailure
client = MongoClient()
try:
    # The ping command is cheap and does not require auth.
    client.admin.command('ping')
except ConnectionFailure:
    print("Server not available")

Warning

When using PyMongo in a multiprocessing context, please read multiprocessing first.

Note

Many of the following options can be passed using a MongoDB URI or keyword parameters. If the same option is passed in a URI and as a keyword parameter the keyword parameter takes precedence.

Parameters
  • host (optional): hostname or IP address or Unix domain socket path of a single mongod or mongos instance to connect to, or a mongodb URI, or a list of hostnames (but no more than one mongodb URI). If host is an IPv6 literal it must be enclosed in ‘[’ and ‘]’ characters following the RFC2732 URL syntax (e.g. ‘[::1]’ for localhost). Multihomed and round robin DNS addresses are not supported.

  • port (optional): port number on which to connect

  • document_class (optional): default class to use for documents returned from queries on this client

  • tz_aware (optional): if True, datetime instances returned as values in a document by this MongoClient will be timezone aware (otherwise they will be naive)

  • connect (optional): if True (the default), immediately begin connecting to MongoDB in the background. Otherwise connect on the first operation.

  • type_registry (optional): instance of TypeRegistry to enable encoding and decoding of custom types.

  • datetime_conversion: Specifies how UTC datetimes should be decoded within BSON. Valid options include ‘datetime_ms’ to return as a DatetimeMS, ‘datetime’ to return as a datetime.datetime and raising a ValueError for out-of-range values, ‘datetime_auto’ to return DatetimeMS objects when the underlying datetime is out-of-range and ‘datetime_clamp’ to clamp to the minimum and maximum possible datetimes. Defaults to ‘datetime’. See handling-out-of-range-datetimes for details.

Other optional parameters can be passed as keyword arguments:
  • directConnection (optional): if True, forces this client to

    connect directly to the specified MongoDB host as a standalone. If false, the client connects to the entire replica set of which the given MongoDB host(s) is a part. If this is True and a mongodb+srv:// URI or a URI containing multiple seeds is provided, an exception will be raised.

  • maxPoolSize (optional): The maximum allowable number of concurrent connections to each connected server. Requests to a server will block if there are maxPoolSize outstanding connections to the requested server. Defaults to 100. Can be either 0 or None, in which case there is no limit on the number of concurrent connections.

  • minPoolSize (optional): The minimum required number of concurrent connections that the pool will maintain to each connected server. Default is 0.

  • maxIdleTimeMS (optional): The maximum number of milliseconds that a connection can remain idle in the pool before being removed and replaced. Defaults to None (no limit).

  • maxConnecting (optional): The maximum number of connections that each pool can establish concurrently. Defaults to 2.

  • timeoutMS: (integer or None) Controls how long (in milliseconds) the driver will wait when executing an operation (including retry attempts) before raising a timeout error. 0 or None means no timeout.

  • socketTimeoutMS: (integer or None) Controls how long (in milliseconds) the driver will wait for a response after sending an ordinary (non-monitoring) database operation before concluding that a network error has occurred. 0 or None means no timeout. Defaults to None (no timeout).

  • connectTimeoutMS: (integer or None) Controls how long (in milliseconds) the driver will wait during server monitoring when connecting a new socket to a server before concluding the server is unavailable. 0 or None means no timeout. Defaults to 20000 (20 seconds).

  • server_selector: (callable or None) Optional, user-provided function that augments server selection rules. The function should accept as an argument a list of ServerDescription objects and return a list of server descriptions that should be considered suitable for the desired operation.

  • serverSelectionTimeoutMS: (integer) Controls how long (in milliseconds) the driver will wait to find an available, appropriate server to carry out a database operation; while it is waiting, multiple server monitoring operations may be carried out, each controlled by connectTimeoutMS. Defaults to 30000 (30 seconds).

  • waitQueueTimeoutMS: (integer or None) How long (in milliseconds) a thread will wait for a socket from the pool if the pool has no free sockets. Defaults to None (no timeout).

  • heartbeatFrequencyMS: (optional) The number of milliseconds between periodic server checks, or None to accept the default frequency of 10 seconds.

  • appname: (string or None) The name of the application that created this MongoClient instance. The server will log this value upon establishing each connection. It is also recorded in the slow query log and profile collections.

  • driver: (pair or None) A driver implemented on top of PyMongo can pass a DriverInfo to add its name, version, and platform to the message printed in the server log when establishing a connection.

  • event_listeners: a list or tuple of event listeners. See monitoring for details.

  • retryWrites: (boolean) Whether supported write operations executed within this MongoClient will be retried once after a network error. Defaults to True. The supported write operations are:

    • bulk_write(), as long as UpdateMany or DeleteMany are not included.

    • delete_one()

    • insert_one()

    • insert_many()

    • replace_one()

    • update_one()

    • find_one_and_delete()

    • find_one_and_replace()

    • find_one_and_update()

    Unsupported write operations include, but are not limited to, aggregate() using the $out pipeline operator and any operation with an unacknowledged write concern (e.g. {w: 0})). See https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst

  • retryReads: (boolean) Whether supported read operations executed within this MongoClient will be retried once after a network error. Defaults to True. The supported read operations are: find(), find_one(), aggregate() without $out, distinct(), count(), estimated_document_count(), count_documents(), pymongo.collection.Collection.watch(), list_indexes(), pymongo.database.Database.watch(), list_collections(), pymongo.mongo_client.MongoClient.watch(), and list_databases().

    Unsupported read operations include, but are not limited to, command() and any getMore operation on a cursor.

    Enabling retryable reads makes applications more resilient to transient errors such as network failures, database upgrades, and replica set failovers. For an exact definition of which errors trigger a retry, see the retryable reads specification.

  • compressors: Comma separated list of compressors for wire protocol compression. The list is used to negotiate a compressor with the server. Currently supported options are “snappy”, “zlib” and “zstd”. Support for snappy requires the python-snappy package. zlib support requires the Python standard library zlib module. zstd requires the zstandard package. By default no compression is used. Compression support must also be enabled on the server. MongoDB 3.6+ supports snappy and zlib compression. MongoDB 4.2+ adds support for zstd.

  • zlibCompressionLevel: (int) The zlib compression level to use when zlib is used as the wire protocol compressor. Supported values are -1 through 9. -1 tells the zlib library to use its default compression level (usually 6). 0 means no compression. 1 is best speed. 9 is best compression. Defaults to -1.

  • uuidRepresentation: The BSON representation to use when encoding from and decoding to instances of UUID. Valid values are the strings: “standard”, “pythonLegacy”, “javaLegacy”, “csharpLegacy”, and “unspecified” (the default). New applications should consider setting this to “standard” for cross language compatibility. See handling-uuid-data-example for details.

  • unicode_decode_error_handler: The error handler to apply when a Unicode-related error occurs during BSON decoding that would otherwise raise UnicodeDecodeError. Valid options include ‘strict’, ‘replace’, ‘backslashreplace’, ‘surrogateescape’, and ‘ignore’. Defaults to ‘strict’.

  • srvServiceName: (string) The SRV service name to use for “mongodb+srv://” URIs. Defaults to “mongodb”. Use it like so:

    MongoClient("mongodb+srv://example.com/?srvServiceName=customname")
    
Write Concern options:
(Only set if passed. No default values.)
  • w: (integer or string) If this is a replica set, write operations will block until they have been replicated to the specified number or tagged set of servers. w=<int> always includes the replica set primary (e.g. w=3 means write to the primary and wait until replicated to two secondaries). Passing w=0 disables write acknowledgement and all other write concern options.

  • wTimeoutMS: (integer) Used in conjunction with w. Specify a value in milliseconds to control how long to wait for write propagation to complete. If replication does not complete in the given timeframe, a timeout exception is raised. Passing wTimeoutMS=0 will cause write operations to wait indefinitely.

  • journal: If True block until write operations have been committed to the journal. Cannot be used in combination with fsync. Write operations will fail with an exception if this option is used when the server is running without journaling.

  • fsync: If True and the server is running without journaling, blocks until the server has synced all data files to disk. If the server is running with journaling, this acts the same as the j option, blocking until write operations have been committed to the journal. Cannot be used in combination with j.

Replica set keyword arguments for connecting with a replica set - either directly or via a mongos:
  • replicaSet: (string or None) The name of the replica set to connect to. The driver will verify that all servers it connects to match this name. Implies that the hosts specified are a seed list and the driver should attempt to find all members of the set. Defaults to None.

Read Preference:
  • readPreference: The replica set read preference for this client. One of primary, primaryPreferred, secondary, secondaryPreferred, or nearest. Defaults to primary.

  • readPreferenceTags: Specifies a tag set as a comma-separated list of colon-separated key-value pairs. For example dc:ny,rack:1. Defaults to None.

  • maxStalenessSeconds: (integer) The maximum estimated length of time a replica set secondary can fall behind the primary in replication before it will no longer be selected for operations. Defaults to -1, meaning no maximum. If maxStalenessSeconds is set, it must be a positive integer greater than or equal to 90 seconds.

See also

/examples/server_selection
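The read preference options above can also be supplied in the connection string. A minimal sketch of assembling such a URI, using only the standard library (the build_mongo_uri helper is hypothetical and not part of pymongo, which accepts these options either as keyword arguments or in the URI query string):

```python
from urllib.parse import urlencode

def build_mongo_uri(host, database="", **options):
    # Hypothetical helper: encode any MongoClient URI options
    # as a query string appended to a mongodb:// URI.
    query = urlencode(options)
    uri = f"mongodb://{host}/{database}"
    return f"{uri}?{query}" if query else uri

uri = build_mongo_uri(
    "localhost:27017",
    readPreference="secondaryPreferred",
    maxStalenessSeconds=120,
)
```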

Authentication:
  • username: A string.

  • password: A string.

    Although username and password must be percent-escaped in a MongoDB URI, they must not be percent-escaped when passed as parameters. In this example, both the space and slash special characters are passed as-is:

    MongoClient(username="user name", password="pass/word")
    
  • authSource: The database to authenticate on. Defaults to the database specified in the URI, if provided, or to “admin”.

  • authMechanism: See MECHANISMS for options. If no mechanism is specified, PyMongo automatically uses SCRAM-SHA-1 when connected to MongoDB 3.6 and negotiates the mechanism to use (SCRAM-SHA-1 or SCRAM-SHA-256) when connected to MongoDB 4.0+.

  • authMechanismProperties: Used to specify authentication mechanism specific options. To specify the service name for GSSAPI authentication pass authMechanismProperties=’SERVICE_NAME:<service name>’. To specify the session token for MONGODB-AWS authentication pass authMechanismProperties='AWS_SESSION_TOKEN:<session token>'.

See also

/examples/authentication

TLS/SSL configuration:
  • tls: (boolean) If True, create the connection to the server using transport layer security. Defaults to False.

  • tlsInsecure: (boolean) Specify whether TLS constraints should be relaxed as much as possible. Setting tlsInsecure=True implies tlsAllowInvalidCertificates=True and tlsAllowInvalidHostnames=True. Defaults to False. Think very carefully before setting this to True as it dramatically reduces the security of TLS.

  • tlsAllowInvalidCertificates: (boolean) If True, continues the TLS handshake regardless of the outcome of the certificate verification process. If this is False, and a value is not provided for tlsCAFile, PyMongo will attempt to load system provided CA certificates. If the python version in use does not support loading system CA certificates then the tlsCAFile parameter must point to a file of CA certificates. tlsAllowInvalidCertificates=False implies tls=True. Defaults to False. Think very carefully before setting this to True as that could make your application vulnerable to on-path attackers.

  • tlsAllowInvalidHostnames: (boolean) If True, disables TLS hostname verification. tlsAllowInvalidHostnames=False implies tls=True. Defaults to False. Think very carefully before setting this to True as that could make your application vulnerable to on-path attackers.

  • tlsCAFile: A file containing a single or a bundle of “certification authority” certificates, which are used to validate certificates passed from the other end of the connection. Implies tls=True. Defaults to None.

  • tlsCertificateKeyFile: A file containing the client certificate and private key. Implies tls=True. Defaults to None.

  • tlsCRLFile: A file containing a PEM or DER formatted certificate revocation list. Only supported by python 2.7.9+ (pypy 2.5.1+). Implies tls=True. Defaults to None.

  • tlsCertificateKeyFilePassword: The password or passphrase for decrypting the private key in tlsCertificateKeyFile. Only necessary if the private key is encrypted. Only supported by python 2.7.9+ (pypy 2.5.1+) and 3.3+. Defaults to None.

  • tlsDisableOCSPEndpointCheck: (boolean) If True, disables certificate revocation status checking via the OCSP responder specified on the server certificate. tlsDisableOCSPEndpointCheck=False implies tls=True. Defaults to False.

  • ssl: (boolean) Alias for tls.

Read Concern options:
(If not set explicitly, this will use the server default)
  • readConcernLevel: (string) The read concern level specifies the level of isolation for read operations. For example, a read operation using a read concern level of majority will only return data that has been written to a majority of nodes. If the level is left unspecified, the server default will be used.

Client side encryption options:
(If not set explicitly, client side encryption will not be enabled.)
  • auto_encryption_opts: An AutoEncryptionOpts which configures this client to automatically encrypt collection commands and automatically decrypt results. See automatic-client-side-encryption for an example. If a MongoClient is configured with auto_encryption_opts and a non-None maxPoolSize, a separate internal MongoClient is created if any of the following are true:

    • A key_vault_client is not passed to AutoEncryptionOpts

    • bypass_auto_encryption=False is passed to AutoEncryptionOpts

Stable API options:
(If not set explicitly, Stable API will not be enabled.)
  • server_api: A ServerApi which configures this client to use Stable API. See versioned-api-ref for details.

See also

The MongoDB documentation on connections.

Changed in version 4.2: Added the timeoutMS keyword argument.

Changed in version 4.0:

  • Removed the fsync, unlock, is_locked, database_names, and close_cursor methods. See the pymongo4-migration-guide.

  • Removed the waitQueueMultiple and socketKeepAlive keyword arguments.

  • The default for uuidRepresentation was changed from pythonLegacy to unspecified.

  • Added the srvServiceName and maxConnecting URI and keyword arguments.

Changed in version 3.12: Added the server_api keyword argument. The following keyword arguments were deprecated:

  • ssl_certfile and ssl_keyfile were deprecated in favor of tlsCertificateKeyFile.

Changed in version 3.11: Added the following keyword arguments and URI options:

  • tlsDisableOCSPEndpointCheck

  • directConnection

Changed in version 3.9: Added the retryReads keyword argument and URI option. Added the tlsInsecure keyword argument and URI option. The following keyword arguments and URI options were deprecated:

  • wTimeout was deprecated in favor of wTimeoutMS.

  • j was deprecated in favor of journal.

  • ssl_cert_reqs was deprecated in favor of tlsAllowInvalidCertificates.

  • ssl_match_hostname was deprecated in favor of tlsAllowInvalidHostnames.

  • ssl_ca_certs was deprecated in favor of tlsCAFile.

  • ssl_certfile was deprecated in favor of tlsCertificateKeyFile.

  • ssl_crlfile was deprecated in favor of tlsCRLFile.

  • ssl_pem_passphrase was deprecated in favor of tlsCertificateKeyFilePassword.

Changed in version 3.9: retryWrites now defaults to True.

Changed in version 3.8: Added the server_selector keyword argument. Added the type_registry keyword argument.

Changed in version 3.7: Added the driver keyword argument.

Changed in version 3.6: Added support for mongodb+srv:// URIs. Added the retryWrites keyword argument and URI option.

Changed in version 3.5: Add username and password options. Document the authSource, authMechanism, and authMechanismProperties options. Deprecated the socketKeepAlive keyword argument and URI option. socketKeepAlive now defaults to True.

Changed in version 3.0: MongoClient is now the one and only client class for a standalone server, mongos, or replica set. It includes the functionality that had been split into MongoReplicaSetClient: it can connect to a replica set, discover all its members, and monitor the set for stepdowns, elections, and reconfigs.

The MongoClient constructor no longer blocks while connecting to the server or servers, and it no longer raises ConnectionFailure if they are unavailable, nor ConfigurationError if the user’s credentials are wrong. Instead, the constructor returns immediately and launches the connection process on background threads.

Therefore the alive method is removed since it no longer provides meaningful information; even if the client is disconnected, it may discover a server in time to fulfill the next operation.

In PyMongo 2.x, MongoClient accepted a list of standalone MongoDB servers and used the first it could connect to:

MongoClient(['host1.com:27017', 'host2.com:27017'])

A list of multiple standalones is no longer supported; if multiple servers are listed they must be members of the same replica set, or mongoses in the same sharded cluster.

The behavior for a list of mongoses is changed from “high availability” to “load balancing”. Before, the client connected to the lowest-latency mongos in the list, and used it until a network error prompted it to re-evaluate all mongoses’ latencies and reconnect to one of them. In PyMongo 3, the client monitors its network latency to all the mongoses continuously, and distributes operations evenly among those with the lowest latency. See mongos-load-balancing for more information.

The connect option is added.

The start_request, in_request, and end_request methods are removed, as well as the auto_start_request option.

The copy_database method is removed, see the copy_database examples for alternatives.

The MongoClient.disconnect() method is removed; it was a synonym for close().

MongoClient no longer returns an instance of Database for attribute names with leading underscores. You must use dict-style lookups instead:

client['__my_database__']

Not:

client.__my_database__

class biothings.utils.mongo.DummyCollection[source]

Bases: dotdict

count()[source]
drop()[source]
class biothings.utils.mongo.DummyDatabase[source]

Bases: dotdict

collection_names()[source]
class biothings.utils.mongo.HandleAutoReconnectMixin(*args, **kwargs)[source]

Bases: object

This mixin decorates any non-hidden method with the handle_autoreconnect decorator

exception biothings.utils.mongo.MaxRetryAutoReconnectException(message: str = '', errors: Optional[Union[Mapping[str, Any], Sequence]] = None)[source]

Bases: AutoReconnect

Raised when we reach maximum retry to connect to Mongo server

biothings.utils.mongo.check_document_size(doc)[source]

Return True if doc isn't too large for MongoDB
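A rough sketch of such a size check, using the JSON-serialized length as a cheap stand-in for the BSON size (the actual implementation may measure differently):

```python
import json

MAX_DOC_SIZE = 16 * 1024 * 1024  # MongoDB's BSON document size limit (16 MiB)

def check_document_size(doc):
    # Approximation: JSON length is close enough to BSON length
    # to catch documents that would be rejected by MongoDB.
    return len(json.dumps(doc).encode("utf-8")) < MAX_DOC_SIZE
```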

biothings.utils.mongo.doc_feeder(collection, step=1000, s=None, e=None, inbatch=False, query=None, batch_callback=None, fields=None, logger=<module 'logging' from '/home/docs/.asdf/installs/python/3.10.4/lib/python3.10/logging/__init__.py'>)[source]

An iterator for returning docs in a collection, using batched queries. An additional filter query can be passed via "query", e.g., doc_feeder(collection, query={'taxid': {'$in': [9606, 10090, 10116]}}). batch_callback is a callback function fn(cnt, t), called after every batch. fields is an optional parameter passed to find() to restrict the fields to return.
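The batching behaviour can be sketched over an in-memory list instead of a MongoDB cursor (illustrative only; the batched helper below is hypothetical):

```python
import time

def batched(docs, step=1000, inbatch=False, batch_callback=None):
    # Yield docs one by one, or whole batches if inbatch=True,
    # invoking batch_callback(cnt, t) after each batch as doc_feeder does.
    for start in range(0, len(docs), step):
        batch = docs[start:start + step]
        if inbatch:
            yield batch
        else:
            yield from batch
        if batch_callback:
            batch_callback(start + len(batch), time.time())

docs = list(range(10))
flat = list(batched(docs, step=4))
batches = list(batched(docs, step=4, inbatch=True))
```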

biothings.utils.mongo.get_api(conn=None)[source]
biothings.utils.mongo.get_cache_filename(col_name)[source]
biothings.utils.mongo.get_cmd(conn=None)[source]
biothings.utils.mongo.get_conn(server, port)[source]
biothings.utils.mongo.get_data_plugin(conn=None)[source]
biothings.utils.mongo.get_event(conn=None)[source]
biothings.utils.mongo.get_hub_config(conn=None)[source]
biothings.utils.mongo.get_hub_db_conn()[source]
biothings.utils.mongo.get_last_command(conn=None)[source]
biothings.utils.mongo.get_previous_collection(new_id)[source]

Given ‘new_id’, an _id from src_build, as the “new” collection, automatically select an “old” collection. By default, src_build’s documents will be sorted according to their name (_id) and the old collection is the one just before new_id. Note: because more than one build config may be used, the actual build config name is first determined using the new_id collection name, then the find.sort is done on collections containing that build config name.
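The selection rule can be sketched as follows (get_previous_collection below is a simplified stand-in; the real version also restricts candidates to collections matching the same build config name):

```python
def get_previous_collection(new_id, all_ids):
    # Sort candidate collection names and pick the one
    # immediately preceding new_id, if any.
    older = sorted(i for i in all_ids if i < new_id)
    return older[-1] if older else None

ids = ["mybuild_20220101", "mybuild_20220201", "mybuild_20220301"]
previous = get_previous_collection("mybuild_20220301", ids)
```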

biothings.utils.mongo.get_source_fullname(col_name)[source]

Assuming col_name is a collection created from an upload process, find the main source & sub_source associated.

biothings.utils.mongo.get_source_fullnames(col_names)[source]
biothings.utils.mongo.get_src_build(conn=None)[source]
biothings.utils.mongo.get_src_build_config(conn=None)[source]
biothings.utils.mongo.get_src_conn()[source]
biothings.utils.mongo.get_src_db(conn=None)[source]
biothings.utils.mongo.get_src_dump(conn=None)[source]
biothings.utils.mongo.get_src_master(conn=None)[source]
biothings.utils.mongo.get_target_conn()[source]
biothings.utils.mongo.get_target_db(conn=None)[source]
biothings.utils.mongo.get_target_master(conn=None)[source]
biothings.utils.mongo.handle_autoreconnect(cls_instance, func)[source]

After upgrading the pymongo package from 3.12 to 4.x, the AutoReconnect "connection pool paused" problem appears quite often. It is not clear whether the problem comes from our codebase or from pymongo itself.

This function is an attempt to handle the AutoReconnect exception without modifying our codebase. When the exception is raised, we simply wait for some time, then retry. If the error still happens after MAX_RETRY attempts, it must be a real connection-related problem, so we stop retrying and raise the error.

Ref: https://github.com/newgene/biothings.api/pull/40#issuecomment-1185334545
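The retry logic described above can be sketched as a plain decorator (simplified: the real helper takes a class instance and wraps its methods, and ultimately raises MaxRetryAutoReconnectException; the AutoReconnect class below is a stand-in for pymongo.errors.AutoReconnect):

```python
import functools
import time

MAX_RETRY = 3

class AutoReconnect(Exception):
    # Stand-in for pymongo.errors.AutoReconnect
    pass

def handle_autoreconnect(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        for attempt in range(MAX_RETRY + 1):
            try:
                return func(*args, **kwargs)
            except AutoReconnect:
                if attempt == MAX_RETRY:
                    raise  # give up after MAX_RETRY retries
                time.sleep(0.01 * (attempt + 1))  # brief backoff before retrying
    return wrapper

calls = {"n": 0}

@handle_autoreconnect
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise AutoReconnect("connection pool paused")
    return "ok"

result = flaky()
```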

biothings.utils.mongo.id_feeder(col, batch_size=1000, build_cache=True, logger=<module 'logging' from '/home/docs/.asdf/installs/python/3.10.4/lib/python3.10/logging/__init__.py'>, force_use=False, force_build=False, validate_only=False)[source]

Return an iterator over all _ids in collection "col". Search for a valid cache file if available; if not, return a doc_feeder for that collection. A valid cache is a cache file newer than the collection. "db" can be "target" or "src". If "build_cache" is True, a cache file is built as _ids are fetched, when no cache file was found. If "force_use" is True, any existing cache file is used without checking whether it's valid or not. If "force_build" is True, a new cache is built even if the current one exists and is valid. If "validate_only" is True, directly return [] if the cache is valid (a convenient way to check whether the cache is valid).

biothings.utils.mongo.invalidate_cache(col_name, col_type='src')[source]
biothings.utils.mongo.requires_config(func)[source]

biothings.utils.parallel

biothings.utils.parallel_mp

class biothings.utils.parallel_mp.ErrorHandler(errpath, chunk_num)[source]

Bases: object

handle(exception)[source]
class biothings.utils.parallel_mp.ParallelResult(agg_function, agg_function_init)[source]

Bases: object

aggregate(curr)[source]
biothings.utils.parallel_mp.agg_by_append(prev, curr)[source]
biothings.utils.parallel_mp.agg_by_sum(prev, curr)[source]
biothings.utils.parallel_mp.run_parallel_on_ids_dir(fun, ids_dir, backend_options=None, agg_function=<function agg_by_append>, agg_function_init=[], outpath=None, num_workers=2, mget_chunk_size=10000, ignore_None=True, error_path=None, **query_kwargs)[source]

This function will run function fun on chunks defined by the files in ids_dir.

All parameters are fed to run_parallel_on_iterable, except:

Parameters

ids_dir – Directory containing only files with ids, one per line. The number of files defines the number of chunks.

biothings.utils.parallel_mp.run_parallel_on_ids_file(fun, ids_file, backend_options=None, agg_function=<function agg_by_append>, agg_function_init=[], chunk_size=1000000, num_workers=2, outpath=None, mget_chunk_size=10000, ignore_None=True, error_path=None, **query_kwargs)[source]

Implementation of run_parallel_on_iterable, where iterable comes from the lines of a file.

All parameters are fed to run_parallel_on_iterable, except:

Parameters

ids_file – Path to file with ids, one per line.

biothings.utils.parallel_mp.run_parallel_on_iterable(fun, iterable, backend_options=None, agg_function=<function agg_by_append>, agg_function_init=[], chunk_size=1000000, num_workers=2, outpath=None, mget_chunk_size=10000, ignore_None=True, error_path=None, **query_kwargs)[source]

This function will run a user function on all documents in a backend database in parallel using multiprocessing.Pool. The overview of the process looks like this:

Chunk (into chunks of size “chunk_size”) items in iterable, and run the following script on each chunk using a multiprocessing.Pool object with “num_workers” processes:

For each document in the list of ids in this chunk (documents retrieved in chunks of "mget_chunk_size"):

Run function “fun” with parameters (doc, chunk_num, f <file handle only passed if “outpath” is not None>), and aggregate the result with the current results using function “agg_function”.

Parameters
  • fun – The function to run on all documents. If outpath is NOT specified, fun must accept two parameters: (doc, chunk_num), where doc is the backend document, and chunk_num is essentially a unique process id. If outpath IS specified, an additional open file handle (correctly tagged with the current chunk’s chunk_num) will also be passed to fun, and thus it must accept three parameters: (doc, chunk_num, f)

  • iterable – Iterable of ids.

  • backend_options – An instance of biothings.utils.backend.DocBackendOptions. This contains the options necessary to instantiate the correct backend class (ES, mongo, etc).

  • agg_function – This function aggregates the return value of each run of function fun. It should take 2 parameters: (prev, curr), where prev is the previous aggregated result, and curr is the output of the current function run. It should return some value that represents the aggregation of the previous aggregated results with the output of the current function.

  • agg_function_init – Initialization value for the aggregated result.

  • chunk_size – Length of the ids list sent to each chunk.

  • num_workers – Number of processes that consume chunks in parallel. https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool

  • outpath – Base path for output files. Because function fun can be run many times in parallel, each chunk is sequentially numbered, and the output file name for any chunk is outpath_{chunk_num}, e.g., if outpath is out, all output files will be of the form: /path/to/cwd/out_1, /path/to/cwd/out_2, etc.

  • error_path – Base path for error files. If included, exceptions inside each chunk thread will be printed to these files.

  • mget_chunk_size – The size of each mget chunk inside each chunk thread. In each thread, the ids list is consumed by passing chunks to a mget_by_ids function. This parameter controls the size of each mget.

  • ignore_None – If set, then falsy values will not be aggregated (0, [], None, etc) in the aggregation step. Default True.

All other parameters are fed to the backend query.
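The chunk/run/aggregate flow above can be sketched sequentially (the real implementation fans chunks out to a multiprocessing.Pool; run_sequential and chunked below are hypothetical helpers, while agg_by_append and agg_by_sum mirror the documented aggregators):

```python
def chunked(iterable, chunk_size):
    # Split an iterable into lists of at most chunk_size items.
    items = list(iterable)
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

def agg_by_append(prev, curr):
    prev.append(curr)
    return prev

def agg_by_sum(prev, curr):
    return prev + curr

def run_sequential(fun, iterable, agg_function, agg_init,
                   chunk_size=3, ignore_None=True):
    # Sequential sketch: run fun(doc, chunk_num) on every doc in every
    # chunk, folding results together with agg_function.
    result = agg_init
    for chunk_num, chunk in enumerate(chunked(iterable, chunk_size)):
        for doc in chunk:
            out = fun(doc, chunk_num)
            if ignore_None and not out:
                continue  # falsy outputs are skipped in the aggregation step
            result = agg_function(result, out)
    return result

total = run_sequential(lambda doc, n: doc * 2, range(5), agg_by_sum, 0)
appended = run_sequential(lambda doc, n: doc, [1, 2, 3], agg_by_append, [])
```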

biothings.utils.parallel_mp.run_parallel_on_query(fun, backend_options=None, query=None, agg_function=<function agg_by_append>, agg_function_init=[], chunk_size=1000000, num_workers=2, outpath=None, mget_chunk_size=10000, ignore_None=True, error_path=None, full_doc=False, **query_kwargs)[source]

Implementation of run_parallel_on_iterable, where the ids iterable comes from the result of a query on the specified backend.

All parameters are fed to run_parallel_on_iterable, except:

Parameters
  • query – ids come from results of this query run on backend, default: “match_all”

  • full_doc – If True, a list of documents is passed to each subprocess, rather than ids that are looked up later. Should be faster? Unknown how this works with very large query sets…

biothings.utils.parsers

biothings.utils.parsers.json_array_parser(patterns: Optional[Iterable[str]] = None) Callable[[str], Generator[dict, None, None]][source]

Create JSON Array Parser given filename patterns

For use with manifest.json based plugins. The data comes as a single JSON array containing multiple documents.

Parameters

patterns – glob-compatible patterns for filenames, like *.json, data*.json

Returns

parser_func
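A sketch of what such a parser factory might look like (illustrative only; the real implementation may differ in pattern handling and validation):

```python
import glob
import json
import os
import tempfile

def json_array_parser(patterns=("*.json",)):
    # Return a parser that yields documents from every matching
    # file in data_folder, each file holding one JSON array.
    def parser_func(data_folder):
        for pattern in patterns:
            for path in sorted(glob.glob(os.path.join(data_folder, pattern))):
                with open(path) as f:
                    data = json.load(f)
                if not isinstance(data, list):
                    raise ValueError(f"{path} does not contain a JSON array")
                yield from data
    return parser_func

# Demo on a throwaway folder
folder = tempfile.mkdtemp()
with open(os.path.join(folder, "data.json"), "w") as f:
    json.dump([{"_id": "a"}, {"_id": "b"}], f)
docs = list(json_array_parser()(folder))
```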

biothings.utils.parsers.ndjson_parser(patterns: Optional[Iterable[str]] = None) Callable[[str], Generator[dict, None, None]][source]

Create NDJSON Parser given filename patterns

For use with manifest.json based plugins. Caveat: Only handles valid NDJSON (no extra newlines, UTF8, etc.)

Parameters

patterns – glob-compatible patterns for filenames, like *.ndjson, data*.ndjson

Returns

Generator that takes in a data_folder and returns documents from NDJSON files that match the filename patterns

Return type

parser_func
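A corresponding NDJSON sketch (again illustrative; it assumes valid NDJSON, one document per line, per the caveat above):

```python
import glob
import json
import os
import tempfile

def ndjson_parser(patterns=("*.ndjson",)):
    # Yield one document per non-empty line of each matching file.
    def parser_func(data_folder):
        for pattern in patterns:
            for path in sorted(glob.glob(os.path.join(data_folder, pattern))):
                with open(path) as f:
                    for line in f:
                        if line.strip():
                            yield json.loads(line)
    return parser_func

# Demo on a throwaway folder
folder = tempfile.mkdtemp()
with open(os.path.join(folder, "data.ndjson"), "w") as f:
    f.write('{"_id": "a"}\n{"_id": "b"}\n')
docs = list(ndjson_parser()(folder))
```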

biothings.utils.redis

class biothings.utils.redis.RedisClient(connection_params)[source]

Bases: object

check()[source]
client = None
classmethod get_client(params)[source]
get_db(db_name=None)[source]

Return a redis client instance from a database name or database number (if db_name is an integer)

initialize(deep=False)[source]

Careful: this may delete data. Prepare the Redis instance to work with the biothings hub:

  • database 0: this db is used to store a mapping between database index and database name (so a database can be accessed by name). This method will flush this db and prepare it.

  • any other database will be flushed if deep is True, making the redis server fully dedicated to the hub.

property mapdb
pick_db()[source]

Return a database number, preferably one not yet used (db doesn't exist). If no database is available (all are used), one will be picked and flushed…

exception biothings.utils.redis.RedisClientError[source]

Bases: Exception

biothings.utils.serializer

class biothings.utils.serializer.URL(seq)[source]

Bases: UserString

remove(param='format')[source]
biothings.utils.serializer.orjson_default(o)[source]

The default function passed to orjson to serialize non-serializable objects
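A sketch of what such a default hook typically does (the types covered by the actual orjson_default may differ):

```python
import json
from datetime import datetime
from uuid import UUID

def default(o):
    # Convert non-JSON-native types to serializable equivalents;
    # anything unhandled must raise TypeError, per the json/orjson contract.
    if isinstance(o, datetime):
        return o.isoformat()
    if isinstance(o, UUID):
        return str(o)
    raise TypeError(f"Type {type(o)} not serializable")

out = json.dumps({"ts": datetime(2020, 1, 1)}, default=default)
```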

biothings.utils.serializer.to_json(data, indent=False, sort_keys=False)[source]
biothings.utils.serializer.to_json_0(data)[source]

deprecated

biothings.utils.serializer.to_json_file(data, fobj, indent=False, sort_keys=False)[source]
biothings.utils.serializer.to_msgpack(data)[source]
biothings.utils.serializer.to_yaml(data, stream=None, Dumper=<class 'yaml.dumper.SafeDumper'>, default_flow_style=False)[source]

biothings.utils.shelve

biothings.utils.sqlite3

class biothings.utils.sqlite3.Collection(colname, db)[source]

Bases: object

count()[source]
property database
drop()[source]
find(*args, **kwargs)[source]
find_one(*args, **kwargs)[source]
get_conn()[source]
insert_one(doc)[source]
property name
remove(query)[source]
replace_one(query, doc, upsert=False)[source]
save(doc)[source]
update(query, what)[source]
update_one(query, what, upsert=False)[source]
class biothings.utils.sqlite3.Database[source]

Bases: IDatabase

CONFIG = <ConfigurationWrapper over <module 'config' from '/home/docs/checkouts/readthedocs.org/user_builds/biothingsapi/checkouts/0.12.x/biothings/hub/default_config.py'>>
property address

Returns sufficient information so a connection to a database can be created. Information can be a dictionary, object, etc… and depends on the actual backend

collection_names()[source]

Return a list of all collections (or tables) found in this database

create_collection(colname)[source]

Create a table/collection named colname. If the backend is using a schema-based database (i.e. SQL), the backend should enforce the schema with at least field "_id" as the primary key (as a string).

create_if_needed(table)[source]
get_conn()[source]
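The create_collection() contract described above can be sketched with the standard library sqlite3 module (the document column storing the serialized body is an assumption about the backend's layout):

```python
import sqlite3

def create_collection(conn, colname):
    # Enforce the documented schema: a string "_id" primary key,
    # plus a column holding the serialized document body (assumed).
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {colname} "
        "(_id TEXT PRIMARY KEY, document TEXT)"
    )

conn = sqlite3.connect(":memory:")
create_collection(conn, "src_dump")
conn.execute("INSERT INTO src_dump VALUES (?, ?)", ("mysource", "{}"))
rows = conn.execute("SELECT _id FROM src_dump").fetchall()
```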
biothings.utils.sqlite3.get_api()[source]
biothings.utils.sqlite3.get_cmd()[source]
biothings.utils.sqlite3.get_data_plugin()[source]
biothings.utils.sqlite3.get_event()[source]
biothings.utils.sqlite3.get_hub_config()[source]
biothings.utils.sqlite3.get_hub_db_conn()[source]
biothings.utils.sqlite3.get_last_command()[source]
biothings.utils.sqlite3.get_source_fullname(col_name)[source]

Assuming col_name is a collection created from an upload process, find the main source & sub_source associated.

biothings.utils.sqlite3.get_src_build()[source]
biothings.utils.sqlite3.get_src_build_config()[source]
biothings.utils.sqlite3.get_src_dump()[source]
biothings.utils.sqlite3.get_src_master()[source]

biothings.utils.version

Functions to return versions of things.

biothings.utils.version.check_new_version(folder, max_commits=10)[source]

Given a folder pointing to a Git repo, return a dict containing info about remote commits not yet applied to the repo, or an empty dict if nothing is new.

biothings.utils.version.get_biothings_commit()[source]

Gets the biothings commit information.

biothings.utils.version.get_python_exec_version()[source]

return Python version

biothings.utils.version.get_python_version()[source]

Get a list of python packages installed and their versions.

biothings.utils.version.get_repository_information(app_dir=None)[source]

Get the repository information for the local repository, if it exists.

biothings.utils.version.get_software_info(app_dir=None)[source]

return current application info

biothings.utils.version.get_source_code_info(src_file)[source]

Given a path to a source code file, try to find information about the repository, revision, URL pointing to that file, etc… Return None if nothing can be determined. Tricky cases:

  • src_file could refer to another repo within the current repo (namely a remote data plugin, cloned within the api's plugins folder)

  • src_file could point to a folder, for instance when a data plugin is analyzed. This is because we can't point to an uploader file since it's dynamically generated

biothings.utils.version.get_version(folder)[source]

return revision of a git folder

biothings.utils.version.set_versions(config, app_folder)[source]

Propagate versions (git branch name) in config module. Also set app and biothings folder paths (though not exposed as a config param since they are lower-cased, see biothings.__init__.py, regex PARAM_PAT)