biothings.hub.dataindex
biothings.hub.dataindex.idcache
biothings.hub.dataindex.indexer_cleanup
biothings.hub.dataindex.indexer_payload
- class biothings.hub.dataindex.indexer_payload.IndexMappings(dict=None, /, **kwargs)[source]
Bases:
_IndexPayload
biothings.hub.dataindex.indexer
- class biothings.hub.dataindex.indexer.ColdHotIndexer(build_doc, indexer_env, index_name)[source]
Bases:
object
MongoDB to Elasticsearch 2-pass Indexer:
(
    1st pass: <MongoDB Cold Collection>,  # static data
    2nd pass: <MongoDB Hot Collection>    # changing data
) =>
    <Elasticsearch Index>
- class biothings.hub.dataindex.indexer.DynamicIndexerFactory(urls, es_host, suffix='_current')[source]
Bases:
object
In the context of autohub/standalone instances, create indexers with parameters taken from the versions.json URL. A list of URLs is provided so the factory knows how to create an indexer for each URL. There's no way to "guess" an ES host from a URL, so this parameter must be specified as well, common to all URLs. The "suffix" param is appended to index names.
- class biothings.hub.dataindex.indexer.IndexManager(*args, **kwargs)[source]
Bases:
BaseManager
An example of config dict for this module:

{
    "indexer_select": {
        None: "hub.dataindex.indexer.DrugIndexer",  # default
        "build_config.cold_collection": "mv.ColdHotVariantIndexer",
    },
    "env": {
        "prod": {
            "host": "localhost:9200",
            "indexer": {
                "args": {
                    "timeout": 300,
                    "retry_on_timeout": True,
                    "max_retries": 10,
                },
                "bulk": {
                    "chunk_size": 50,
                    "raise_on_exception": False,
                },
                "concurrency": 3,
            },
            "index": [
                # for information only, only used in index_info
                {"index": "mydrugs_current", "doc_type": "drug"},
                {"index": "mygene_current", "doc_type": "gene"},
            ],
        },
        "dev": { ... },
    },
}
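The "indexer_select" mapping above pairs dot-field paths in a build document with indexer class names, with the None key acting as the default. A minimal sketch of how such a mapping could be resolved (the helper names and build docs here are illustrative, not the hub's actual implementation):

```python
def get_dotfield(doc, path):
    """Walk a dict along a dot-separated path; return None if missing."""
    for part in path.split("."):
        if not isinstance(doc, dict) or part not in doc:
            return None
        doc = doc[part]
    return doc

def select_indexer(indexer_select, build_doc):
    """Pick the indexer whose dot-field key is present in the build doc."""
    for dotfield, indexer_name in indexer_select.items():
        if dotfield is None:
            continue  # default entry, used only as a fallback
        if get_dotfield(build_doc, dotfield) is not None:
            return indexer_name
    return indexer_select.get(None)

indexer_select = {
    None: "hub.dataindex.indexer.DrugIndexer",  # default
    "build_config.cold_collection": "mv.ColdHotVariantIndexer",
}

# A build whose config defines a cold collection gets the cold/hot indexer:
coldhot_build = {"build_config": {"cold_collection": "mydrugs_cold"}}
print(select_indexer(indexer_select, coldhot_build))  # mv.ColdHotVariantIndexer

# Any other build falls back to the default:
print(select_indexer(indexer_select, {"build_config": {}}))  # hub.dataindex.indexer.DrugIndexer
```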
- clean_stale_status()[source]
During startup, search for actions in progress which would have been interrupted, and change their state to "canceled". Ex: some downloading processes could have been interrupted; at startup, the "downloading" status should be changed to "canceled" to reflect the actual state of these datasources. This must be overridden in subclasses.
- cleanup(env=None, keep=3, dryrun=True, **filters)[source]
Delete old indices except for the most recent ones.
Examples
>>> index_cleanup()
>>> index_cleanup("production")
>>> index_cleanup("local", build_config="demo")
>>> index_cleanup("local", keep=0)
>>> index_cleanup(_id="<elasticsearch_index>")
- get_indexes_by_name(index_name=None, env_name=None, limit=10)[source]
Accept an index_name and return a list of indexes from all Elasticsearch environments, or from a specific Elasticsearch environment.
If index_name is blank, all indexes are returned. limit can be used to specify how many indexes should be returned.
The list of indexes will look like:

[
    {
        "index_name": "...",
        "build_version": "...",
        "count": 1000,
        "creation_date": 1653468868933,
        "environment": {
            "name": "env name",
            "host": "localhost:9200",
        }
    },
    ...
]
- get_pinfo()[source]
Return dict containing information about the current process (used to report in the hub)
- index(indexer_env, build_name, index_name=None, ids=None, **kwargs)[source]
Trigger an index creation to index the collection build_name and create an index named index_name (or build_name if None). An optional list of IDs can be passed to index only specific documents.
- update_metadata(indexer_env, index_name, build_name=None, _meta=None)[source]
- Update the _meta field of the index mappings, based on:
the _meta value provided, including {}.
the _meta value of the build_name in src_build.
the _meta value of the build with the same name as the index.
Examples
update_metadata("local", "mynews_201228_vsdevjd")
update_metadata("local", "mynews_201228_vsdevjd", _meta={})
update_metadata("local", "mynews_201228_vsdevjd", _meta={"author": "b"})
update_metadata("local", "mynews_201228_current", "mynews_201228_vsdevjd")
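The three fallbacks listed above form a precedence chain. A hedged sketch of that chain (the function and the src_build lookups are hypothetical, for illustration only):

```python
def resolve_meta(_meta=None, build_name=None, index_name=None, src_build=None):
    """Pick the _meta value following the documented precedence."""
    src_build = src_build or {}
    if _meta is not None:
        # an explicitly provided value wins, even an empty dict {}
        return _meta
    if build_name in src_build:
        # then the _meta of the named build in src_build
        return src_build[build_name].get("_meta")
    # finally the build with the same name as the index
    return src_build.get(index_name, {}).get("_meta")

src_build = {
    "mynews_201228_vsdevjd": {"_meta": {"build_version": "201228"}},
}
print(resolve_meta(_meta={}, src_build=src_build))  # {}
print(resolve_meta(build_name="mynews_201228_vsdevjd", src_build=src_build))
# {'build_version': '201228'}
```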
- class biothings.hub.dataindex.indexer.Indexer(build_doc, indexer_env, index_name)[source]
Bases:
object
MongoDB -> Elasticsearch Indexer.
- async index(job_manager, **kwargs)[source]
Build an Elasticsearch index (self.es_index_name) with data from MongoDB collection (self.mongo_collection_name).
"ids" can be passed to selectively index documents.
- "mode" can have the following values:
'purge': will delete the index if it exists.
'resume': will use the existing index and add missing documents.
'merge': will merge data into the existing index.
'index' (default): will create a new index.
- class biothings.hub.dataindex.indexer.IndexerCumulativeResult(dict=None, /, **kwargs)[source]
Bases:
_IndexerResult
- class biothings.hub.dataindex.indexer.IndexerStepResult(dict=None, /, **kwargs)[source]
Bases:
_IndexerResult
- class biothings.hub.dataindex.indexer.MainIndexStep(indexer)[source]
Bases:
Step
- method: property(abc.abstractmethod(lambda _: ...)) = 'do_index'
- name: property(abc.abstractmethod(lambda _: ...)) = 'index'
- state
alias of
MainIndexJSR
- class biothings.hub.dataindex.indexer.PostIndexStep(indexer)[source]
Bases:
Step
- method: property(abc.abstractmethod(lambda _: ...)) = 'post_index'
- name: property(abc.abstractmethod(lambda _: ...)) = 'post'
- state
alias of
PostIndexJSR
- class biothings.hub.dataindex.indexer.PreIndexStep(indexer)[source]
Bases:
Step
- method: property(abc.abstractmethod(lambda _: ...)) = 'pre_index'
- name: property(abc.abstractmethod(lambda _: ...)) = 'pre'
- state
alias of
PreIndexJSR
- class biothings.hub.dataindex.indexer.ProcessInfo(indexer, concurrency)[source]
Bases:
object
- class biothings.hub.dataindex.indexer.Step(indexer)[source]
Bases:
ABC
- catelog = {'index': <class 'biothings.hub.dataindex.indexer.MainIndexStep'>, 'post': <class 'biothings.hub.dataindex.indexer.PostIndexStep'>, 'pre': <class 'biothings.hub.dataindex.indexer.PreIndexStep'>}
- method: property
- name: property
- state: property
biothings.hub.dataindex.indexer_registrar
- class biothings.hub.dataindex.indexer_registrar.IndexJobStateRegistrar(collection, build_name, index_name, **context)[source]
Bases:
object
- class biothings.hub.dataindex.indexer_registrar.MainIndexJSR(collection, build_name, index_name, **context)[source]
Bases:
IndexJobStateRegistrar
- class biothings.hub.dataindex.indexer_registrar.PostIndexJSR(collection, build_name, index_name, **context)[source]
Bases:
IndexJobStateRegistrar
- class biothings.hub.dataindex.indexer_registrar.PreIndexJSR(collection, build_name, index_name, **context)[source]
Bases:
IndexJobStateRegistrar
biothings.hub.dataindex.indexer_schedule
biothings.hub.dataindex.indexer_task
- class biothings.hub.dataindex.indexer_task.ESIndex(client, index_name, **bulk_index_args)[source]
Bases:
ESIndex
- mexists(ids)[source]
Return a list of tuples like:

[
    (_id_0, True),
    (_id_1, False),
    (_id_2, True),
    ...
]
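A toy sketch of what an mexists-style bulk existence check does: for each _id in a batch, report whether a document with that _id is already present in the target index. A real implementation would issue a single bulk lookup against Elasticsearch; here the index is simulated with a plain set:

```python
def mexists(indexed_ids, ids):
    """For each _id, report whether it exists in the (simulated) index."""
    found = set(indexed_ids)  # stand-in for a bulk _id lookup in ES
    return [(_id, _id in found) for _id in ids]

result = mexists({"a", "c"}, ["a", "b", "c"])
print(result)  # [('a', True), ('b', False), ('c', True)]
```

This shape is what lets the 'resume' indexing mode skip documents that already made it into the index.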
- class biothings.hub.dataindex.indexer_task.IndexingTask(es, mongo, ids, mode=None, logger=None, name='task')[source]
Bases:
object
Index one batch of documents from MongoDB to Elasticsearch. The documents to index are specified by their ids.
- class biothings.hub.dataindex.indexer_task.Mode(value)[source]
Bases:
Enum
An enumeration.
- INDEX = 'index'
- MERGE = 'merge'
- PURGE = 'purge'
- RESUME = 'resume'
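Because Mode is a standard value-backed Enum, the mode string passed to the indexer maps directly onto a member, and unknown modes fail early. A small illustration using a mirror of the enum listed above:

```python
from enum import Enum

# Mirror of the documented Mode enum, for illustration.
class Mode(Enum):
    INDEX = "index"
    MERGE = "merge"
    PURGE = "purge"
    RESUME = "resume"

# Enum lookup by value turns the user-supplied string into a member:
mode = Mode("purge")
print(mode)       # Mode.PURGE
print(mode.name)  # PURGE

# An unknown mode raises ValueError, giving input validation for free:
try:
    Mode("rebuild")
except ValueError as err:
    print(err)
```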
biothings.hub.dataindex.snapshooter
- class biothings.hub.dataindex.snapshooter.Bucket(client, bucket, region=None)[source]
Bases:
object
- class biothings.hub.dataindex.snapshooter.CloudStorage(type: str, access_key: str, secret_key: str, region: str = 'us-west-2')[source]
Bases:
object
- access_key: str
- region: str = 'us-west-2'
- secret_key: str
- type: str
- class biothings.hub.dataindex.snapshooter.CumulativeResult(dict=None, /, **kwargs)[source]
Bases:
_SnapshotResult
- class biothings.hub.dataindex.snapshooter.ProcessInfo(env)[source]
Bases:
object
JobManager Process Info. Reported in Biothings Studio.
- class biothings.hub.dataindex.snapshooter.RepositoryConfig(dict=None, /, **kwargs)[source]
Bases:
UserDict
{
    "type": "s3",
    "name": "s3-$(Y)",
    "settings": {
        "bucket": "<SNAPSHOT_BUCKET_NAME>",
        "base_path": "mynews.info/$(Y)",  # per year
    }
}
- property bucket
- format(doc=None)[source]
Template special values in this config.
For example:

{
    "bucket": "backup-$(Y)",
    "base_path": "snapshots/%(_meta.build_version)s"
}

where the "_meta.build_version" value is taken from doc in dot-field notation, and "$(Y)" is replaced by the current year.
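A sketch of the two substitutions just described, under assumed semantics (this is not the hub's actual code; the helper names are illustrative):

```python
import re
from datetime import datetime

def get_dotfield(doc, path):
    """Look up a value in a nested dict using dot-field notation."""
    for part in path.split("."):
        doc = doc[part]
    return doc

def format_config_value(value, doc=None, year=None):
    """Apply $(Y) and %(dot.field)s template substitutions to a string."""
    year = year or datetime.now().strftime("%Y")
    value = value.replace("$(Y)", year)
    # replace %(dot.field)s placeholders with values looked up in doc
    def repl(match):
        return str(get_dotfield(doc, match.group(1)))
    return re.sub(r"%\(([^)]+)\)s", repl, value)

doc = {"_meta": {"build_version": "20210101"}}
print(format_config_value("backup-$(Y)", year="2021"))
# backup-2021
print(format_config_value("snapshots/%(_meta.build_version)s", doc))
# snapshots/20210101
```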
- property region
- property repo
- class biothings.hub.dataindex.snapshooter.SnapshotEnv(job_manager, cloud, repository, indexer, **kwargs)[source]
Bases:
object
- class biothings.hub.dataindex.snapshooter.SnapshotManager(index_manager, *args, **kwargs)[source]
Bases:
BaseManager
Hub ES Snapshot Management
Config example:

# env.<name>:
{
    "cloud": {
        "type": "aws",  # default, only one supported
        "access_key": <------------------>,
        "secret_key": <------------------>,
        "region": "us-west-2"
    },
    "repository": {
        "name": "s3-$(Y)",
        "type": "s3",
        "settings": {
            "bucket": "<SNAPSHOT_BUCKET_NAME>",
            "base_path": "mygene.info/$(Y)",  # year
        },
        "acl": "private",
    },
    "indexer": {
        "name": "local",
        "args": {
            "timeout": 100,
            "max_retries": 5
        }
    },
    "monitor_delay": 15,
}
- clean_stale_status()[source]
During startup, search for actions in progress which would have been interrupted, and change their state to "canceled". Ex: some downloading processes could have been interrupted; at startup, the "downloading" status should be changed to "canceled" to reflect the actual state of these datasources. This must be overridden in subclasses.
- cleanup(env=None, keep=3, group_by='build_config', dryrun=True, **filters)[source]
Delete past snapshots and keep only the most recent ones.
Examples
>>> snapshot_cleanup()
>>> snapshot_cleanup("s3_outbreak")
>>> snapshot_cleanup("s3_outbreak", keep=0)
- poll(state, func)[source]
Search for sources in collection 'col' with a pending flag list containing 'state', and call 'func' for each document found (with the doc as the only param).
- snapshot(snapshot_env, index, snapshot=None, recreate_repo=False)[source]
Create a snapshot named "snapshot" (or, by default, with the same name as the index) from "index", according to the environment definition (repository, etc.) "env".
- snapshot_a_build(build_doc)[source]
Create a snapshot based on the autobuild settings in the build config. If the build config associated with this build has:

{
    "autobuild": {
        "type": "snapshot",  // implied when env is set. env must be set.
        "env": "local"       // which es env to make the snapshot.
    }
}

then attempt to make a snapshot for this build on the specified es env "local".
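The decision rule above can be sketched as a small helper (hypothetical function, assumed semantics: a snapshot is attempted only when "autobuild" carries an "env", with "type": "snapshot" implied):

```python
def autobuild_snapshot_env(build_config):
    """Return the ES env to snapshot to, or None if autobuild doesn't apply."""
    autobuild = build_config.get("autobuild") or {}
    env = autobuild.get("env")
    # "type": "snapshot" is implied when env is set, so default it
    if env and autobuild.get("type", "snapshot") == "snapshot":
        return env
    return None

print(autobuild_snapshot_env({"autobuild": {"env": "local"}}))  # local
print(autobuild_snapshot_env({}))  # None
```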
biothings.hub.dataindex.snapshot_cleanup
biothings.hub.dataindex.snapshot_registrar
- class biothings.hub.dataindex.snapshot_registrar.MainSnapshotState(col, _id)[source]
Bases:
_TaskState
- func = '_snapshot'
- name = 'snapshot'
- regx = True
- step = 'snapshot'
- class biothings.hub.dataindex.snapshot_registrar.PostSnapshotState(col, _id)[source]
Bases:
_TaskState
- func = 'post_snapshot'
- name = 'post'
- regx = True
- step = 'post-snapshot'