biothings.hub.dataindex

biothings.hub.dataindex.idcache

class biothings.hub.dataindex.idcache.IDCache[source]

Bases: object

load(name, id_provider, flush=True)[source]

name: the cache name. id_provider: returns batches of ids, i.e. list(_ids). flush: if True, delete any existing cache first.

mark_done(_ids)[source]
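
The load/mark_done contract can be sketched with a hypothetical in-memory subclass (for illustration only; real implementations such as RedisIDCache persist to an external store):

```python
class MemoryIDCache:
    """Hypothetical in-memory cache illustrating the IDCache contract."""

    def __init__(self):
        self._pending = set()  # ids loaded but not yet processed
        self._done = set()     # ids already marked done

    def load(self, name, id_provider, flush=True):
        # name: the cache name; id_provider: yields batches of _ids;
        # flush: drop any existing cache content first
        self.name = name
        if flush:
            self._pending.clear()
            self._done.clear()
        for batch in id_provider:
            self._pending.update(batch)

    def mark_done(self, _ids):
        for _id in _ids:
            self._pending.discard(_id)
            self._done.add(_id)


cache = MemoryIDCache()
cache.load("mycache", [["a", "b"], ["c"]])
cache.mark_done(["a"])
```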
class biothings.hub.dataindex.idcache.RedisIDCache(name, connection_params)[source]

Bases: IDCache

load(id_provider, flush=True)[source]

name: the cache name. id_provider: returns batches of ids, i.e. list(_ids). flush: if True, delete any existing cache first.

mark_done(_ids)[source]

biothings.hub.dataindex.indexer_cleanup

class biothings.hub.dataindex.indexer_cleanup.CleanUpResult(iterable=(), /)[source]

Bases: list

class biothings.hub.dataindex.indexer_cleanup.Cleaner(collection, indexers, logger=None)[source]

Bases: object

async clean(cleanups)[source]
find(env=None, keep=3, **filters)[source]
static plain_text(cleanups)[source]
biothings.hub.dataindex.indexer_cleanup.test_clean()[source]
biothings.hub.dataindex.indexer_cleanup.test_find()[source]
biothings.hub.dataindex.indexer_cleanup.test_str()[source]

biothings.hub.dataindex.indexer_payload

class biothings.hub.dataindex.indexer_payload.IndexMappings(dict=None, /, **kwargs)[source]

Bases: _IndexPayload

async finalize(client)[source]

Generate the ES payload format of the corresponding entities originally in Hub representation. May require querying the ES client for certain metadata to determine the compatible data format.

class biothings.hub.dataindex.indexer_payload.IndexSettings(dict=None, /, **kwargs)[source]

Bases: _IndexPayload

async finalize(client)[source]

Generate the ES payload format of the corresponding entities originally in Hub representation. May require querying the ES client for certain metadata to determine the compatible data format.

biothings.hub.dataindex.indexer

class biothings.hub.dataindex.indexer.ColdHotIndexer(build_doc, indexer_env, index_name)[source]

Bases: object

MongoDB to Elasticsearch 2-pass Indexer.

(
    1st pass: <MongoDB Cold Collection>,  # static data
    2nd pass: <MongoDB Hot Collection>    # changing data
) => <Elasticsearch Index>

INDEXER

alias of Indexer

async index(job_manager, batch_size=10000, steps=('pre', 'index', 'post'), ids=None, mode=None, **kwargs)[source]
class biothings.hub.dataindex.indexer.DynamicIndexerFactory(urls, es_host, suffix='_current')[source]

Bases: object

In the context of autohub/standalone instances, create indexers with parameters taken from a versions.json URL. A list of URLs is provided so the factory knows how to create an indexer for each URL. Since there is no way to "guess" an ES host from a URL, this parameter must be specified as well, common to all URLs. The "suffix" parameter is appended to index names.

create(name)[source]
class biothings.hub.dataindex.indexer.IndexManager(*args, **kwargs)[source]

Bases: BaseManager

An example of config dict for this module:

{
    "indexer_select": {
        None: "hub.dataindex.indexer.DrugIndexer",  # default
        "build_config.cold_collection": "mv.ColdHotVariantIndexer",
    },
    "env": {
        "prod": {
            "host": "localhost:9200",
            "indexer": {
                "args": {
                    "timeout": 300,
                    "retry_on_timeout": True,
                    "max_retries": 10,
                },
                "bulk": {
                    "chunk_size": 50,
                    "raise_on_exception": False,
                },
                "concurrency": 3,
            },
            "index": [
                # for information only, only used in index_info
                {"index": "mydrugs_current", "doc_type": "drug"},
                {"index": "mygene_current", "doc_type": "gene"},
            ],
        },
        "dev": { ... },
    },
}

DEFAULT_INDEXER

alias of Indexer

clean_stale_status()[source]

During startup, search for actions in progress which would have been interrupted, and change their state to "canceled". Ex: some downloading processes could have been interrupted; at startup, their "downloading" status should be changed to "canceled" to reflect the actual state of these datasources. This must be overridden in subclasses.

cleanup(env=None, keep=3, dryrun=True, **filters)[source]

Delete old indices except for the most recent ones.

Examples

>>> index_cleanup()
>>> index_cleanup("production")
>>> index_cleanup("local", build_config="demo")
>>> index_cleanup("local", keep=0)
>>> index_cleanup(_id="<elasticsearch_index>")
configure(conf)[source]
get_indexes_by_name(index_name=None, env_name=None, limit=10)[source]

Accept an index_name and return a list of indexes, fetched from all Elasticsearch environments or from a specific one.

If index_name is blank, all indexes are returned. limit can be used to specify how many indexes should be returned.

The list of indexes looks like:

[
    {
        "index_name": "...",
        "build_version": "...",
        "count": 1000,
        "creation_date": 1653468868933,
        "environment": {
            "name": "env name",
            "host": "localhost:9200",
        }
    },
]

get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

get_predicates()[source]
index(indexer_env, build_name, index_name=None, ids=None, **kwargs)[source]

Trigger an index creation to index the collection build_name and create an index named index_name (or build_name if None). Optional list of IDs can be passed to index specific documents.

index_info(remote=False)[source]

Show index manager config with enhanced index information.

update_metadata(indexer_env, index_name, build_name=None, _meta=None)[source]
Update the _meta field of the index mappings, based on:
  1. the _meta value provided, including {}.

  2. the _meta value of the build_name in src_build.

  3. the _meta value of the build with the same name as the index.

Examples

update_metadata("local", "mynews_201228_vsdevjd")
update_metadata("local", "mynews_201228_vsdevjd", _meta={})
update_metadata("local", "mynews_201228_vsdevjd", _meta={"author": "b"})
update_metadata("local", "mynews_201228_current", "mynews_201228_vsdevjd")

validate_mapping(mapping, env)[source]
class biothings.hub.dataindex.indexer.Indexer(build_doc, indexer_env, index_name)[source]

Bases: object

MongoDB -> Elasticsearch Indexer.

async do_index(job_manager, batch_size, ids, mode, **kwargs)[source]
async index(job_manager, **kwargs)[source]

Build an Elasticsearch index (self.es_index_name) with data from MongoDB collection (self.mongo_collection_name).

“ids” can be passed to selectively index documents.

“mode” can have the following values:
  • ‘purge’: will delete an index if it exists.

  • ‘resume’: will use an existing index and add missing documents.

  • ‘merge’: will merge data to an existing index.

  • ‘index’ (default): will create a new index.
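
The effect of each mode on the target index can be sketched as a pure decision function (an illustrative reduction only; the real indexer also drives the per-document behavior described above):

```python
def decide(mode, index_exists):
    """Map a documented mode string to the action taken on the target index.
    Illustrative sketch of the four documented modes."""
    mode = mode or "index"  # default mode
    if mode == "purge":
        # delete any existing index, then build a fresh one
        return "recreate" if index_exists else "create"
    if mode in ("resume", "merge"):
        # reuse an existing index; "resume" adds only missing documents,
        # "merge" merges data into existing documents
        return "reuse" if index_exists else "create"
    if mode == "index":
        return "create"
    raise ValueError(f"unknown mode: {mode}")
```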

async post_index(*args, **kwargs)[source]
async pre_index(*args, mode, **kwargs)[source]
setup_log()[source]
class biothings.hub.dataindex.indexer.IndexerCumulativeResult(dict=None, /, **kwargs)[source]

Bases: _IndexerResult

exception biothings.hub.dataindex.indexer.IndexerException[source]

Bases: Exception

class biothings.hub.dataindex.indexer.IndexerStepResult(dict=None, /, **kwargs)[source]

Bases: _IndexerResult

class biothings.hub.dataindex.indexer.MainIndexStep(indexer)[source]

Bases: Step

method: property(abc.abstractmethod(lambda _: ...)) = 'do_index'
name: property(abc.abstractmethod(lambda _: ...)) = 'index'
state

alias of MainIndexJSR

class biothings.hub.dataindex.indexer.PostIndexStep(indexer)[source]

Bases: Step

method: property(abc.abstractmethod(lambda _: ...)) = 'post_index'
name: property(abc.abstractmethod(lambda _: ...)) = 'post'
state

alias of PostIndexJSR

class biothings.hub.dataindex.indexer.PreIndexStep(indexer)[source]

Bases: Step

method: property(abc.abstractmethod(lambda _: ...)) = 'pre_index'
name: property(abc.abstractmethod(lambda _: ...)) = 'pre'
state

alias of PreIndexJSR

class biothings.hub.dataindex.indexer.ProcessInfo(indexer, concurrency)[source]

Bases: object

get_pinfo(step='', description='')[source]

Return dict containing information about the current process (used to report in the hub)

get_predicates()[source]
class biothings.hub.dataindex.indexer.Step(indexer)[source]

Bases: ABC

catelog = {'index': <class 'biothings.hub.dataindex.indexer.MainIndexStep'>, 'post': <class 'biothings.hub.dataindex.indexer.PostIndexStep'>, 'pre': <class 'biothings.hub.dataindex.indexer.PreIndexStep'>}
classmethod dispatch(name)[source]
async execute(*args, **kwargs)[source]
method: abstract property
name: abstract property
static order(steps)[source]
state: abstract property

biothings.hub.dataindex.indexer_registrar

class biothings.hub.dataindex.indexer_registrar.IndexJobStateRegistrar(collection, build_name, index_name, **context)[source]

Bases: object

failed(error)[source]
static prune(collection)[source]
started(step='index')[source]
succeed(result)[source]
class biothings.hub.dataindex.indexer_registrar.MainIndexJSR(collection, build_name, index_name, **context)[source]

Bases: IndexJobStateRegistrar

started()[source]
class biothings.hub.dataindex.indexer_registrar.PostIndexJSR(collection, build_name, index_name, **context)[source]

Bases: IndexJobStateRegistrar

started()[source]
class biothings.hub.dataindex.indexer_registrar.PreIndexJSR(collection, build_name, index_name, **context)[source]

Bases: IndexJobStateRegistrar

started()[source]
succeed(result)[source]
class biothings.hub.dataindex.indexer_registrar.Stage(value)[source]

Bases: Enum

An enumeration.

DONE = 2
READY = 0
STARTED = 1
at(stage)[source]
biothings.hub.dataindex.indexer_registrar.test_registrar()[source]

biothings.hub.dataindex.indexer_schedule

class biothings.hub.dataindex.indexer_schedule.Schedule(total, batch_size)[source]

Bases: object

completed()[source]
suffix(string)[source]
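
A schedule over total documents presumably amounts to ceil(total / batch_size) batches with a progress counter; a minimal sketch under that assumption (class and attribute names are hypothetical):

```python
import math


class BatchSchedule:
    """Hypothetical stand-in for Schedule(total, batch_size)."""

    def __init__(self, total, batch_size):
        self.total = total
        self.batch_size = batch_size
        # number of batches needed to cover all documents
        self.total_batches = math.ceil(total / batch_size)
        self.done = 0

    def completed(self):
        # record one finished batch
        self.done += 1

    def suffix(self, string):
        # progress label, e.g. "indexing #1/3"
        return f"{string} #{self.done}/{self.total_batches}"


s = BatchSchedule(total=25000, batch_size=10000)
s.completed()
```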
biothings.hub.dataindex.indexer_schedule.test_01()[source]
biothings.hub.dataindex.indexer_schedule.test_02()[source]
biothings.hub.dataindex.indexer_schedule.test_03()[source]
biothings.hub.dataindex.indexer_schedule.test_04()[source]

biothings.hub.dataindex.indexer_task

class biothings.hub.dataindex.indexer_task.ESIndex(client, index_name, **bulk_index_args)[source]

Bases: ESIndex

mexists(ids)[source]

Return a list of tuples like:

[
    (_id_0, True),
    (_id_1, False),
    (_id_2, True),
    ...
]

mget(ids)[source]

Return a list of documents like:

[
    {"_id": "0", "a": "b"},
    {"_id": "1", "c": "d"},  # 404s are skipped
]
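
The mexists/mget semantics can be sketched against a plain dict standing in for the index (hypothetical helpers, not the actual ES client calls):

```python
def mexists(store, ids):
    """Return (_id, bool) pairs, preserving input order."""
    return [(_id, _id in store) for _id in ids]


def mget(store, ids):
    """Return found documents with their _id; missing ids (404s) are skipped."""
    return [dict(store[_id], _id=_id) for _id in ids if _id in store]


# a toy "index": _id -> document body
store = {"0": {"a": "b"}, "1": {"c": "d"}}
```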

mindex(docs)[source]

Index and return the number of docs indexed.

class biothings.hub.dataindex.indexer_task.IndexingTask(es, mongo, ids, mode=None, logger=None, name='task')[source]

Bases: object

Index one batch of documents from MongoDB to Elasticsearch. The documents to index are specified by their ids.

dispatch()[source]
index()[source]
merge()[source]
resume()[source]
class biothings.hub.dataindex.indexer_task.Mode(value)[source]

Bases: Enum

An enumeration.

INDEX = 'index'
MERGE = 'merge'
PURGE = 'purge'
RESUME = 'resume'
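
Since Mode is a plain Enum keyed by the mode string, a user-supplied mode string round-trips directly to a member (re-declared here so the snippet is self-contained):

```python
from enum import Enum


class Mode(Enum):
    INDEX = "index"
    MERGE = "merge"
    PURGE = "purge"
    RESUME = "resume"


# Enum lookup by value maps the string accepted by the indexer to a member
assert Mode("resume") is Mode.RESUME
assert Mode.RESUME.value == "resume"
```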
biothings.hub.dataindex.indexer_task.dispatch(mg_client_args, mg_dbs_name, mg_col_name, es_client_args, es_blk_args, es_idx_name, ids, mode, name)[source]
biothings.hub.dataindex.indexer_task.test0()[source]
biothings.hub.dataindex.indexer_task.test1()[source]
biothings.hub.dataindex.indexer_task.test_00()[source]
biothings.hub.dataindex.indexer_task.test_clients()[source]

biothings.hub.dataindex.snapshooter

class biothings.hub.dataindex.snapshooter.Bucket(client, bucket, region=None)[source]

Bases: object

create(acl='private')[source]
exists()[source]
class biothings.hub.dataindex.snapshooter.CloudStorage(type: str, access_key: str, secret_key: str, region: str = 'us-west-2')[source]

Bases: object

access_key: str
get()[source]
region: str = 'us-west-2'
secret_key: str
type: str
class biothings.hub.dataindex.snapshooter.CumulativeResult(dict=None, /, **kwargs)[source]

Bases: _SnapshotResult

class biothings.hub.dataindex.snapshooter.ProcessInfo(env)[source]

Bases: object

JobManager Process Info. Reported in Biothings Studio.

get_pinfo(step, snapshot, description='')[source]
get_predicates()[source]
class biothings.hub.dataindex.snapshooter.RenderedStr(seq)[source]

Bases: _UserString

class biothings.hub.dataindex.snapshooter.RepositoryConfig(dict=None, /, **kwargs)[source]

Bases: UserDict

{
    "type": "s3",
    "name": "s3-$(Y)",
    "settings": {
        "bucket": "<SNAPSHOT_BUCKET_NAME>",
        "base_path": "mynews.info/$(Y)",  # per year
    }
}

property bucket
format(doc=None)[source]

Template special values in this config.

For example:

{
    "bucket": "backup-$(Y)",
    "base_path": "snapshots/%(_meta.build_version)s",
}

where the "_meta.build_version" value is taken from doc in dot-field notation, and the current year replaces "$(Y)".
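
The two substitutions can be sketched as follows, with "$(Y)" filled from the current year and "%(...)s" fields looked up in the doc via dot-field notation (helper names here are hypothetical, not the library's API):

```python
import datetime
import re


def dotfield(doc, path):
    """Resolve a dot-notation path like '_meta.build_version' in nested dicts."""
    for key in path.split("."):
        doc = doc[key]
    return doc


def render(template, doc, year=None):
    year = year or datetime.date.today().year
    out = template.replace("$(Y)", str(year))
    # replace each %(some.dot.path)s with the value looked up in doc
    return re.sub(r"%\(([^)]+)\)s",
                  lambda m: str(dotfield(doc, m.group(1))), out)


doc = {"_meta": {"build_version": "20220525"}}
```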

property region
property repo
class biothings.hub.dataindex.snapshooter.SnapshotEnv(job_manager, cloud, repository, indexer, **kwargs)[source]

Bases: object

post_snapshot(cfg, index, snapshot, **kwargs)[source]
pre_snapshot(cfg, index, snapshot, **kwargs)[source]
setup_log(index)[source]
snapshot(index, snapshot=None, recreate_repo=False)[source]
class biothings.hub.dataindex.snapshooter.SnapshotManager(index_manager, *args, **kwargs)[source]

Bases: BaseManager

Hub ES Snapshot Management

Config example:

# env.<name>:
{
    "cloud": {
        "type": "aws",  # default, only one supported
        "access_key": <——————>,
        "secret_key": <——————>,
        "region": "us-west-2",
    },
    "repository": {
        "name": "s3-$(Y)",
        "type": "s3",
        "settings": {
            "bucket": "<SNAPSHOT_BUCKET_NAME>",
            "base_path": "mygene.info/$(Y)",  # year
        },
        "acl": "private",
    },
    "indexer": {
        "name": "local",
        "args": {
            "timeout": 100,
            "max_retries": 5,
        },
    },
    "monitor_delay": 15,
}

clean_stale_status()[source]

During startup, search for actions in progress which would have been interrupted, and change their state to "canceled". Ex: some downloading processes could have been interrupted; at startup, their "downloading" status should be changed to "canceled" to reflect the actual state of these datasources. This must be overridden in subclasses.

cleanup(env=None, keep=3, group_by='build_config', dryrun=True, **filters)[source]

Delete past snapshots and keep only the most recent ones.

Examples

>>> snapshot_cleanup()
>>> snapshot_cleanup("s3_outbreak")
>>> snapshot_cleanup("s3_outbreak", keep=0)
configure(conf)[source]
delete_snapshots(snapshots_data)[source]
list_snapshots(env=None, **filters)[source]
static pending_snapshot(build_name)[source]
poll(state, func)[source]

Search for sources in collection 'col' with a pending flag list containing 'state', and call 'func' for each document found (with the doc as the only param).

snapshot(snapshot_env, index, snapshot=None, recreate_repo=False)[source]

Create a snapshot named "snapshot" (or, by default, with the same name as the index) from "index", according to the environment definition "env" (repository, etc.).

snapshot_a_build(build_doc)[source]

Create a snapshot based on the autobuild settings in the build config. If the build config associated with this build has:

{
    "autobuild": {
        "type": "snapshot",  # implied when env is set; env must be set
        "env": "local",      # which es env to make the snapshot
    }
}

attempt to make a snapshot for this build on the specified es env "local".

snapshot_info(env=None, remote=False)[source]
class biothings.hub.dataindex.snapshooter.StepResult(dict=None, /, **kwargs)[source]

Bases: _SnapshotResult

class biothings.hub.dataindex.snapshooter.TemplateStr(seq)[source]

Bases: _UserString

biothings.hub.dataindex.snapshot_cleanup

biothings.hub.dataindex.snapshot_cleanup.delete(collection, element, envs)[source]
biothings.hub.dataindex.snapshot_cleanup.find(collection, env=None, keep=3, group_by=None, return_db_cols=False, **filters)[source]
biothings.hub.dataindex.snapshot_cleanup.plain_text(element)[source]
biothings.hub.dataindex.snapshot_cleanup.test_find()[source]
biothings.hub.dataindex.snapshot_cleanup.test_print()[source]

biothings.hub.dataindex.snapshot_registrar

class biothings.hub.dataindex.snapshot_registrar.MainSnapshotState(col, _id)[source]

Bases: _TaskState

func = '_snapshot'
name = 'snapshot'
regx = True
step = 'snapshot'
class biothings.hub.dataindex.snapshot_registrar.PostSnapshotState(col, _id)[source]

Bases: _TaskState

func = 'post_snapshot'
name = 'post'
regx = True
step = 'post-snapshot'
class biothings.hub.dataindex.snapshot_registrar.PreSnapshotState(col, _id)[source]

Bases: _TaskState

func = 'pre_snapshot'
name = 'pre'
step = 'pre-snapshot'
biothings.hub.dataindex.snapshot_registrar.audit(src_build, logger=None)[source]
biothings.hub.dataindex.snapshot_registrar.dispatch(step)[source]
biothings.hub.dataindex.snapshot_registrar.test()[source]

biothings.hub.dataindex.snapshot_repo

class biothings.hub.dataindex.snapshot_repo.Repository(client, repository)[source]

Bases: object

create(**body)[source]
delete()[source]
exists()[source]
verify(config)[source]

A repository is considered properly set up and working when it passes Elasticsearch's verification and its settings match the snapshot's config.
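
The settings-match half of that check can be sketched as a simple comparison (a hypothetical helper; assuming the expected settings must appear, string-compared, in the repository's actual settings):

```python
def settings_match(repo_settings, expected):
    """True if every expected setting is present in the repository's
    settings with the same value (compared as strings, since ES reports
    repository settings as strings)."""
    return all(str(repo_settings.get(k)) == str(v) for k, v in expected.items())


# a toy repository settings dict, as ES might report it
repo = {"bucket": "my-bucket", "base_path": "mygene.info/2022", "compress": "true"}
```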

biothings.hub.dataindex.snapshot_repo.test_01()[source]
biothings.hub.dataindex.snapshot_repo.test_02()[source]

biothings.hub.dataindex.snapshot_task

class biothings.hub.dataindex.snapshot_task.Snapshot(client, repository, snapshot)[source]

Bases: object

create(indices)[source]
delete()[source]
exists()[source]
state()[source]
biothings.hub.dataindex.snapshot_task.test_01()[source]
biothings.hub.dataindex.snapshot_task.test_02()[source]