biothings.hub.databuild

biothings.hub.databuild.backend

Backend for storing merged genedoc after building. Supports MongoDB, ES and CouchDB.

class biothings.hub.databuild.backend.LinkTargetDocMongoBackend(*args, **kwargs)[source]

Bases: TargetDocBackend

This backend type acts as a dummy target backend: the data is actually stored in the source database. This means only one datasource can be linked to that target backend; as a consequence, when this backend is used in a merge, there is no actual data merge. This is useful when "merging/indexing" only one datasource, where the merge step is just a duplication of the datasource data.

drop()[source]
get_backend_url()[source]

Return backend URL (see create_backend() for formats)

name = 'link'
property target_collection
class biothings.hub.databuild.backend.ShardedTargetDocMongoBackend(*args, **kwargs)[source]

Bases: TargetDocMongoBackend

target_collection is a pymongo collection object.

prepare()[source]

if needed, add extra preparation steps here.

class biothings.hub.databuild.backend.SourceDocBackendBase(build_config, build, master, dump, sources)[source]

Bases: DocBackendBase

get_build_configuration(build_name)[source]
get_src_master_docs()[source]
get_src_metadata()[source]
validate_sources(sources=None)[source]
class biothings.hub.databuild.backend.SourceDocMongoBackend(build_config, build, master, dump, sources)[source]

Bases: SourceDocBackendBase

get_build_configuration(build_name)[source]
get_src_master_docs()[source]
get_src_metadata()[source]

Return source versions which have been previously accessed with this backend object, or all source versions if none were accessed. Accessing means going through __getitem__ (the usual way), which automatically keeps track of the sources of interest, so versions are returned only for those.

validate_sources(sources=None)[source]
class biothings.hub.databuild.backend.TargetDocBackend(*args, **kwargs)[source]

Bases: DocBackendBase

generate_target_name(build_config_name)[source]
get_backend_url()[source]

Return backend URL (see create_backend() for formats)

post_merge()[source]
set_target_name(target_name, build_name=None)[source]

Create/prepare a target backend, either strictly named "target_name" or with a name derived from "build_name" (for temporary backends)

property target_name
class biothings.hub.databuild.backend.TargetDocMongoBackend(*args, **kwargs)[source]

Bases: TargetDocBackend, DocMongoBackend

target_collection is a pymongo collection object.

set_target_name(target_name=None, build_name=None)[source]

Create/prepare a target backend, either strictly named "target_name" or with a name derived from "build_name" (for temporary backends)

biothings.hub.databuild.backend.create_backend(db_col_names, name_only=False, follow_ref=False, **kwargs)[source]

Guess what's inside 'db_col_names' and return the corresponding backend. It could be:

  • a string: an src_build doc is first checked for a backend_url field; if there is none, a mongo collection with that name is looked up in the target database

  • or a tuple ("target|src", "col_name")

  • or a ("mongodb://user:pass@host", "db", "col_name") URI

  • or a ("es_host:port", "index_name", "doc_type")

If name_only is true, just return the name uniquely identifying the collection or index URI connection.
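
A hedged usage sketch of the accepted formats (collection, database and host names below are hypothetical):

    from biothings.hub.databuild.backend import create_backend

    # plain string: resolved via src_build's backend_url, or as a collection in the target database
    backend = create_backend("mynews_20210101_abcdef")

    # explicit source vs. target database
    backend = create_backend(("src", "mynews"))

    # full MongoDB URI, database and collection
    backend = create_backend(("mongodb://user:pass@host", "dbname", "mynews_20210101_abcdef"))

    # Elasticsearch host, index and doc type
    backend = create_backend(("localhost:9200", "mynews_index", "news"))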

biothings.hub.databuild.backend.generate_folder(root_folder, old_db_col_names, new_db_col_names)[source]
biothings.hub.databuild.backend.merge_src_build_metadata(build_docs)[source]

Merge metadata from src_build documents. A list of docs should be passed; the order is important: the first element has the least precedence, the last the most. This means that, when needed, some values from documents on the "left" of the list may be overridden by documents on the right (e.g. the build_version field). Ideally, build docs shouldn't have any sources in common, to prevent unexpected conflicts…

biothings.hub.databuild.buildconfig

A build config contains autobuild configs and other information.

TODO: not all features are supported in the code yet

For example:

    {
        "_id": "mynews",
        "name": "mynews",
        "doc_type": "news",
        "sources": ["mynews"],
        "root": ["mynews"],
        "builder_class": "biothings.hub.databuild.builder.DataBuilder",
        "autobuild": { ... },
        "autopublish": { ... },
        "build_version": "%Y%m%d%H%M"
    }

Autobuild:
  • build
  • diff/snapshot

Autopublish:
  • release note
  • publish

Autorelease:
  • release

class biothings.hub.databuild.buildconfig.AutoBuildConfig(confdict)[source]

Bases: object

Parse automation configurations for each step following 'build'.

Example:

    {
        "autobuild": {
            "schedule": "0 8 * * 7",  // Make a build every 08:00 on Sunday.
            "type": "diff",           // Auto create a "diff" w/ previous version.
                                      // The other option is "snapshot".
            "env": "local",           // ES env to create an index and snapshot,
                                      // not required when type above is diff.
                                      // Setting the env also implies auto snapshot.
                                      // It could be in addition to auto diff.
                                      // Also accepts (indexer_env, snapshot_env).
        },
        "autopublish": {
            "type": "snapshot",       // Auto publish new snapshots for new builds.
                                      // The type field can also be 'diff'.
            "env": "prod",            // The release environment to publish snapshot,
                                      // or the release environment to publish diff.
                                      // This field is required for either type.
            "note": True              // Whether to publish with a release note.
                                      // TODO not implemented yet
        },
        "autorelease": {
            "schedule": "0 0 * * 1",  // Make a release every Monday at midnight
                                      // (if there's a new published version).
            "type": "full",           // Only auto install full releases.
                                      // The release type can also be 'incremental'.
        }
    }

The terms below are used interchangeably.

BUILD_TYPES = ('diff', 'snapshot')
RELEASE_TO_BUILD = {'full': 'snapshot', 'incremental': 'diff'}
RELEASE_TYPES = ('incremental', 'full')
export()[source]
should_diff_new_build()[source]
should_install_new_diff()[source]
should_install_new_release()[source]

Install the latest version regardless of update type/path.

should_install_new_snapshot()[source]
should_publish_new_diff()[source]
should_publish_new_snapshot()[source]
should_snapshot_new_build()[source]
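
A hedged usage sketch, assuming the configuration dict shown in the class docstring above:

    from biothings.hub.databuild.buildconfig import AutoBuildConfig

    conf = AutoBuildConfig({
        "autobuild": {"schedule": "0 8 * * 7", "type": "diff", "env": "local"},
        "autopublish": {"type": "snapshot", "env": "prod"},
        "autorelease": {"schedule": "0 0 * * 1", "type": "full"},
    })

    if conf.should_snapshot_new_build():
        print("new builds should be snapshotted")
    if conf.should_publish_new_snapshot():
        print("new snapshots should be published automatically")
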
exception biothings.hub.databuild.buildconfig.AutoBuildConfigError[source]

Bases: Exception

biothings.hub.databuild.buildconfig.test()[source]

biothings.hub.databuild.builder

exception biothings.hub.databuild.builder.BuilderException[source]

Bases: Exception

class biothings.hub.databuild.builder.BuilderManager(source_backend_factory=None, target_backend_factory=None, builder_class=None, poll_schedule=None, *args, **kwargs)[source]

Bases: BaseManager

BuilderManager deals with the different builders used to merge datasources. It is connected to src_build() via sync(), where it grabs build information and registers builder classes, ready to be instantiated when triggering builds. source_backend_factory can be an optional factory function (like a partial) that the builder can call without any argument to generate a SourceBackend. Same for target_backend_factory for the TargetBackend. builder_class, if given, will be used as the actual Builder class used for the merge and will be passed the same arguments as the base DataBuilder. It can also be a list of classes, in which case the first one is used as the default, when it's necessary to define multiple builders.

archive_merge(merge_name)[source]

Delete merged collections and associated metadata

build_config_info()[source]
build_info(id=None, conf_name=None, fields=None, only_archived=False, status=None)[source]

Return build information given a build _id, or all builds if _id is None. "fields" can be passed to select which fields to return or not (mongo notation for projections); if None, return everything except:

  • "mapping" (too long)

If id is None, more fields are filtered out:

  • "sources" and some of "build_config"

only_archived=True will return archived merges only. status will return only successful/failed builds; it can be "success" or "failed".

clean_stale_status()[source]

During startup, search for actions in progress which would have been interrupted, and change their state to "canceled". Ex: some downloading processes could have been interrupted; at startup, the "downloading" status should be changed to "canceled" so as to reflect the actual state of these datasources. This must be overridden in subclasses.

clean_temp_collections(build_name, date=None, prefix='')[source]

Delete all target collections created from the builder named "build_name" at the given date (or any date if none is given – careful…). Date is a string (YYYYMMDD or regex). A common collection name prefix can also be specified if needed.

configure()[source]

Sync with src_build_config and register all build configs

create_build_configuration(name, doc_type, sources, roots=None, builder_class=None, params=None, archived=False)[source]
delete_build_configuration(name)[source]
delete_merge(merge_name)[source]

Delete merged collections and associated metadata

delete_merged_data(merge_name)[source]
find_builder_classes()[source]
Find all available builder classes:
  1. classes passed during manager init (builder_class); that includes the default builder

  2. all classes subclassing DataBuilder in:

     1. biothings.hub.databuilder.*

     2. hub.databuilder.* (app-specific)

get_builder(col_name)[source]
get_builder_class(build_config_name)[source]

The builder class can be specified in different ways (in order):
  1. within the build_config document (so, per configuration)
  2. or defined in the builder manager (so, per manager)
  3. or defaults to DataBuilder

get_query_for_list_merge(only_archived, status=None)[source]
list_merge(build_config=None, only_archived=False)[source]
list_sources(build_name)[source]

List all registered sources used to trigger a build named ‘build_name’

merge(build_name, sources=None, target_name=None, **kwargs)[source]

Trigger a merge for the build named 'build_name'. An optional list of sources can be passed (a single one or a list). target_name is the target collection name used to store the merged data. If None, each call will generate a unique target_name.
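
A hedged sketch, assuming "bm" is the hub's BuilderManager instance and "mynews" is an existing build configuration:

    # merge all sources declared in the "mynews" build configuration,
    # letting the manager generate a unique target collection name
    bm.merge("mynews")

    # merge a single source into an explicitly named target collection
    bm.merge("mynews", sources="mynews", target_name="mynews_test")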

poll()[source]

Check "whatsnew()" to identify builds which could be automatically built, if {"autobuild" : {…}} is part of the build configuration. "autobuild" contains a dict with "schedule" (aiocron/crontab format), so each build configuration can have a different polling schedule.

register_builder(build_name)[source]
resolve_builder_class(klass)[source]

Resolve a class/partial definition to (obj, "type", "mod.class") so that names (class name, module, docstring, etc…) can be accessed directly, whether it's a standard class or not.

save_mapping(name, mapping=None, dest='build', mode='mapping')[source]
setup_log()[source]
property source_backend
property target_backend
trigger_merge(doc)[source]
update_build_configuration(name, doc_type, sources, roots=None, builder_class=None, params=None, archived=False)[source]
upsert_build_conf(name, doc_type, sources, roots, builder_class, params, archived)[source]
whatsnew(build_name=None, old=None)[source]

Return datasources which have changed since last time ("last time" being the datasource information from metadata, either from the given old src_build doc name, or from the latest found if old=None).

class biothings.hub.databuild.builder.DataBuilder(build_name, source_backend, target_backend, log_folder, doc_root_key='root', mappers=None, default_mapper_class=<class 'biothings.hub.databuild.mapper.TransparentMapper'>, sources=None, target_name=None, **kwargs)[source]

Bases: object

Generic data builder.

property build_config
check_ready(force=False)[source]
clean_old_collections()[source]
document_cleaner(src_name, *args, **kwargs)[source]

Return a function taking a document as argument, cleaning the doc as needed, and returning that doc. If no function is needed, return None. Note: the returned function must be pickleable; be careful with lambdas and closures.
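
Since the returned function must be pickleable, a module-level function (rather than a lambda or closure) is the typical choice. A hedged sketch, with a hypothetical cleaning rule and source name:

    def strip_private_fields(doc):
        # drop keys starting with "_", except "_id"
        return {k: v for k, v in doc.items() if k == "_id" or not k.startswith("_")}

    class MyDataBuilder(DataBuilder):
        def document_cleaner(self, src_name, *args, **kwargs):
            if src_name == "mynews":   # hypothetical source name
                return strip_private_fields
            return None                # no cleaning needed for other sources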

generate_document_query(src_name)[source]
get_build_version()[source]

Generate an arbitrary major build version. Default is a timestamp (YYMMDD). The '.' char isn't allowed in a build version as it's reserved for minor versions.

get_custom_metadata(sources, job_manager)[source]

If more metadata is required, this method can be overridden and should return a dict. The existing metadata dict will be updated with that one before storage.

get_mapper_for_source(src_name, init=True)[source]
get_mapping(sources)[source]

Merge mappings from src_master

get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

get_predicates()[source]

Return a list of predicates (functions returning true/false, as in math logic) which instruct/dictate whether the job manager should start a job (process/thread).

get_root_document_sources()[source]
get_stats(sources, job_manager)[source]

Return a dictionary of metadata for this build. It's usually app-specific and this method may be overridden as needed. By default though, the total number of documents in the merged collection is stored (key "total").

The returned dictionary will be merged with any existing metadata in the src_build collection. This behavior can be changed by setting a special key within the metadata dict: {"__REPLACE__" : True} will… replace existing metadata with the one returned here.

"job_manager" is passed in case parallelization is needed. Be aware that this method already runs in a dedicated thread; in order to use job_manager, the following code must be used at the very beginning of its implementation: asyncio.set_event_loop(job_manager.loop)
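
A hedged override sketch following the note above (the "total" key mirrors the default behavior; the count() call is an assumption about the target backend's API):

    import asyncio

    class MyDataBuilder(DataBuilder):
        def get_stats(self, sources, job_manager):
            # must come first if job_manager is used, since this runs in a dedicated thread
            asyncio.set_event_loop(job_manager.loop)
            return {"total": self.target_backend.count()}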

get_target_name()[source]
init_mapper(mapper_name)[source]
init_state()[source]
keep_archive = 10
property logger
merge(sources=None, target_name=None, force=False, ids=None, steps=('merge', 'post', 'metadata'), job_manager=None, *args, **kwargs)[source]

Merge given sources into a collection named target_name. If the sources argument is omitted, all sources defined for this merger will be merged together, according to what is defined in src_build_config. If target_name is not defined, a unique name will be generated.

Optional parameters:
  • force=True will bypass any safety check

  • ids: list of _ids to merge, specifically. If None, all documents are merged.

  • steps:
    • merge: actual merge step, create merged documents and store them

    • post: once merged, run optional post-merge process

    • metadata: generate and store metadata (depends on merger, usually specifies the amount of merged data, source versions, etc…)

merge_order(other_sources)[source]

Optionally, this method can be overridden to customize the order in which sources should be merged. Default is sorted by name.

async merge_source(src_name, batch_size=100000, ids=None, job_manager=None)[source]
async merge_sources(source_names, steps=('merge', 'post'), batch_size=100000, ids=None, job_manager=None)[source]

Merge resources from given source_names or from the build config. Root document sources are identified from the list and processed first. ids can be a list of document _ids to be merged in particular.

post_merge(source_names, batch_size, job_manager)[source]
prepare(state=None)[source]
register_status(status, transient=False, init=False, **extra)[source]

Register current build status. A build status is a record in src_build. The key used in this dict is the target_name. Then, any operation acting on this target_name is registered in a "jobs" list.

resolve_sources(sources)[source]

A source can be a string that may contain regex chars. It's useful when you have plenty of sub-collections prefixed with a source name. For instance, given a source named "blah" stored in as many collections as there are chromosomes, instead of passing each name as "blah_1", "blah_2", etc…, "blah_.*" can be specified in build_config. This method resolves potential regexed source names into real, existing collection names.
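
A hedged illustration with hypothetical collection names: if a source "blah" is split into per-chromosome collections blah_1, blah_2, …, the build configuration can list a single regexed name instead of each collection:

    {
        "name": "mybuild",
        "sources": ["blah_.*"],   # resolved at merge time to blah_1, blah_2, ...
        "root": ["blah_.*"],
    }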

setup(sources=None, target_name=None)[source]
setup_log()[source]
property source_backend
store_metadata(res, sources, job_manager)[source]
property target_backend
unprepare()[source]

Reset anything that's not pickleable (so self can be pickled) and return what's been reset as a dict, so self can be restored once pickling is done.

update_src_meta_stats()[source]
class biothings.hub.databuild.builder.LinkDataBuilder(build_name, source_backend, target_backend, *args, **kwargs)[source]

Bases: DataBuilder

LinkDataBuilder creates a link to the original datasource to be merged, without actually copying the data (the merged collection remains empty). This builder is only valid when a single datasource is declared in the list of sources to be merged (thus there's no real merge), and is useful to prevent data duplication between the datasource itself and the resulting merged collection.

async merge_source(src_name, *args, **kwargs)[source]
exception biothings.hub.databuild.builder.ResumeException[source]

Bases: Exception

biothings.hub.databuild.builder.fix_batch_duplicates(docs, fail_if_struct_is_different=False)[source]

Remove duplicates from docs based on _id. If the _ids are the same but the structure is different (not real "duplicates", but different documents with the same _id), merge the docs together (dict.update), or raise an error if fail_if_struct_is_different is set.
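
A hedged illustration of the behavior described above:

    from biothings.hub.databuild.builder import fix_batch_duplicates

    docs = [
        {"_id": "1", "value": "a"},
        {"_id": "1", "value": "a"},              # exact duplicate: removed
        {"_id": "2", "value": "b"},
        {"_id": "2", "value": "b", "extra": 1},  # same _id, different structure: merged via dict.update
    ]
    deduped = fix_batch_duplicates(docs)  # assumption: the deduplicated batch is returned
    # with fail_if_struct_is_different=True, the second pair would raise an error instead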

biothings.hub.databuild.builder.merger_worker(col_name, dest_name, ids, mapper, cleaner, upsert, merger, batch_num, merger_kwargs=None)[source]
biothings.hub.databuild.builder.pending(build_name, action_name)[source]
biothings.hub.databuild.builder.set_pending_to_build(conf_name=None)[source]

biothings.hub.databuild.differ

class biothings.hub.databuild.differ.BaseDiffer(diff_func, job_manager, log_folder)[source]

Bases: object

diff(old_db_col_names, new_db_col_names, batch_size=100000, steps=('content', 'mapping', 'reduce', 'post'), mode=None, exclude=None)[source]

Wrapper over the diff_cols() coroutine; returns a task.

async diff_cols(old_db_col_names, new_db_col_names, batch_size, steps, mode=None, exclude=None)[source]

Compare new with old collections and produce diff files. Root keys can be excluded from the comparison with the "exclude" parameter.

*_db_col_names can be:
  1. a collection name (as a string), assuming it is in the target database.

  2. a tuple with 2 elements: the first one is either "source" or "target" to respectively specify the src or target database, and the second element is the collection name.

  3. a tuple with 3 elements (URI, db, collection), looking like ("mongodb://user:pass@host", "dbname", "collection"), allowing to specify any connection on any server.

steps:
  • 'content' will perform a diff on actual content.

  • 'mapping' will perform a diff on ES mappings (if a target collection is involved)

  • 'reduce' will merge diff files, trying to avoid having many small files

  • 'post' is a hook to do stuff once everything is merged (override method post_diff_cols)

mode: 'purge' will remove any existing files for this comparison, while 'resume' will happily ignore existing data and do whatever is requested (like running steps="post" on an existing diff folder…)

diff_type = None
get_metadata()[source]
get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

get_predicates()[source]
post_diff_cols(old_db_col_names, new_db_col_names, batch_size, steps, mode=None, exclude=None)[source]

Post diff process hook. This coroutine will run in a dedicated thread.

register_status(status, transient=False, init=False, **extra)[source]
setup_log(old=None, new=None)[source]
class biothings.hub.databuild.differ.ColdHotDiffer(diff_func, job_manager, log_folder)[source]

Bases: BaseDiffer

async diff_cols(old_db_col_names, new_db_col_names, *args, **kwargs)[source]

Compare new with old collections and produce diff files. Root keys can be excluded from the comparison with the "exclude" parameter.

*_db_col_names can be:
  1. a collection name (as a string), assuming it is in the target database.

  2. a tuple with 2 elements: the first one is either "source" or "target" to respectively specify the src or target database, and the second element is the collection name.

  3. a tuple with 3 elements (URI, db, collection), looking like ("mongodb://user:pass@host", "dbname", "collection"), allowing to specify any connection on any server.

steps:
  • 'content' will perform a diff on actual content.

  • 'mapping' will perform a diff on ES mappings (if a target collection is involved)

  • 'reduce' will merge diff files, trying to avoid having many small files

  • 'post' is a hook to do stuff once everything is merged (override method post_diff_cols)

mode: 'purge' will remove any existing files for this comparison, while 'resume' will happily ignore existing data and do whatever is requested (like running steps="post" on an existing diff folder…)

get_metadata()[source]
class biothings.hub.databuild.differ.ColdHotJsonDiffer(diff_func=<function diff_docs_jsonpatch>, *args, **kwargs)[source]

Bases: ColdHotJsonDifferBase, JsonDiffer

diff_type = 'coldhot-jsondiff'
class biothings.hub.databuild.differ.ColdHotJsonDifferBase(diff_func, job_manager, log_folder)[source]

Bases: ColdHotDiffer

post_diff_cols(old_db_col_names, new_db_col_names, batch_size, steps, mode=None, exclude=None)[source]

Post-process the diff files by adjusting some jsondiff operations. Here's the process. For updated documents, some operations might be illegal in the context of cold/hot merged collections.

Case #1: "remove" op in an update

From a cold/premerge collection, we have this doc:

    coldd = {"_id": 1, "A": "123", "B": "456", "C": True}

From the previous hot merge we have this doc:

    prevd = {"_id": 1, "D": "789", "C": True, "E": "abc"}

At that point, the final document, fully merged and indexed, is:

    finald = {"_id": 1, "A": "123", "B": "456", "C": True, "D": "789", "E": "abc"}

We can notice field "C" is common to coldd and prevd.

From the new hot merge, we have:

    newd = {"_id": 1, "E": "abc"}  # C and D don't exist anymore

Diffing prevd vs. newd will give the jsondiff operations:

    [{'op': 'remove', 'path': '/C'}, {'op': 'remove', 'path': '/D'}]

The problem here is that 'C' is removed while it was already in the cold merge: it should stay because it comes with some resource involved in the premerge (dependent keys, e.g. in myvariant the "observed" key comes with certain sources) => the jsondiff operation on "C" must be discarded.

Note: if an operation involves a root key (not '/a/c' for instance) and that key is found in the premerge, then the operation is removed. (Only root keys are considered; if the deletion occurs deeper in the document, it's just a legal operation updating inner content.)

For deleted documents, the same kind of logic applies.

Case #2: "delete"

From a cold/premerge collection, we have this doc:

    coldd = {"_id": 1, "A": "123", "B": "456", "C": True}

From the previous hot merge we have this doc:

    prevd = {"_id": 1, "D": "789", "C": True}

Fully merged doc:

    finald = {"_id": 1, "A": "123", "B": "456", "C": True, "D": "789"}

From the new hot merge, we have:

    newd = {}  # document doesn't exist anymore

Diffing prevd vs. newd will mark the document with _id == 1 to be deleted. The problem is we have data for _id=1 in the premerge collection; if we deleted the whole document we'd lose too much information. => the deletion must be converted into specific "remove" jsondiff operations, for the root keys found in prevd but not in coldd (in that case: [{'op': 'remove', 'path': '/D'}], and not "C", as C is in the premerge).
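
A hedged sketch of the rule for Case #1 (illustrative only, not the actual implementation): a "remove" operation is discarded when its root key already exists in the cold/premerge document.

    def filter_remove_ops(patch_ops, cold_doc):
        # keep only 'remove' operations whose root key is absent from the premerge doc
        kept = []
        for op in patch_ops:
            if op["op"] == "remove":
                root_key = op["path"].lstrip("/").split("/")[0]
                if root_key in cold_doc:
                    continue  # key comes from premerge data, it must stay in the index
            kept.append(op)
        return kept

    coldd = {"_id": 1, "A": "123", "B": "456", "C": True}
    ops = [{"op": "remove", "path": "/C"}, {"op": "remove", "path": "/D"}]
    filter_remove_ops(ops, coldd)   # -> [{'op': 'remove', 'path': '/D'}]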

class biothings.hub.databuild.differ.ColdHotSelfContainedJsonDiffer(diff_func=<function diff_docs_jsonpatch>, *args, **kwargs)[source]

Bases: ColdHotJsonDifferBase, SelfContainedJsonDiffer

diff_type = 'coldhot-jsondiff-selfcontained'
class biothings.hub.databuild.differ.DiffReportRendererBase(max_reported_ids=None, max_randomly_picked=None, detailed=False)[source]

Bases: object

save(report, filename)[source]

Save report output (rendered) into filename

class biothings.hub.databuild.differ.DiffReportTxt(max_reported_ids=None, max_randomly_picked=None, detailed=False)[source]

Bases: DiffReportRendererBase

save(report, filename='report.txt')[source]

Save report output (rendered) into filename

exception biothings.hub.databuild.differ.DifferException[source]

Bases: Exception

class biothings.hub.databuild.differ.DifferManager(poll_schedule=None, *args, **kwargs)[source]

Bases: BaseManager

DifferManager deals with the different differ objects used to create and analyze diff between datasources.

build_diff_report(diff_folder, detailed=True, max_reported_ids=None)[source]

Analyze diff files in diff_folder and give a summary of changes. max_reported_ids is the number of IDs contained in the report for each part. detailed will trigger a deeper analysis, which takes more time.

clean_stale_status()[source]

During startup, search for actions in progress which would have been interrupted, and change their state to "canceled". Ex: some downloading processes could have been interrupted; at startup, the "downloading" status should be changed to "canceled" so as to reflect the actual state of these datasources. This must be overridden in subclasses.

configure(partial_differs=(<class 'biothings.hub.databuild.differ.JsonDiffer'>, <class 'biothings.hub.databuild.differ.SelfContainedJsonDiffer'>))[source]
diff(diff_type, old, new, batch_size=100000, steps=('content', 'mapping', 'reduce', 'post'), mode=None, exclude=('_timestamp',))[source]

Run a diff to compare old vs. new collections, using differ algorithm diff_type. Results are stored in a diff folder. Steps can be passed to choose what to do:
  • count: will count root keys in the new collection and store them as statistics.
  • content: will diff the content between old and new. The results (diff files) format depends on diff_type.
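
A hedged sketch, assuming "dm" is the hub's DifferManager instance and the collection names are hypothetical:

    dm.diff(
        "jsondiff",                   # diff_type, matching JsonDiffer.diff_type
        "mynews_20210101_abcdef",     # old target collection
        "mynews_20210201_ghijkl",     # new target collection
        steps=("content", "mapping", "reduce", "post"),
        exclude=("_timestamp",),
    )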

diff_info()[source]
diff_report(old_db_col_names, new_db_col_names, report_filename='report.txt', format='txt', detailed=True, max_reported_ids=None, max_randomly_picked=None, mode=None)[source]
get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

get_predicates()[source]
poll(state, func)[source]

Search for sources in collection 'col' with a pending flag list containing 'state' and call 'func' for each document found (with the doc as the only param).

rebuild_diff_file_list(diff_folder)[source]
register_differ(klass)[source]
setup_log()[source]
trigger_diff(diff_type, doc, **kwargs)[source]

Launch a diff given a src_build document. In order to know the first collection to diff against, get_previous_collection() method is used.

class biothings.hub.databuild.differ.JsonDiffer(diff_func=<function diff_docs_jsonpatch>, *args, **kwargs)[source]

Bases: BaseDiffer

diff_type = 'jsondiff'
class biothings.hub.databuild.differ.SelfContainedJsonDiffer(diff_func=<function diff_docs_jsonpatch>, *args, **kwargs)[source]

Bases: JsonDiffer

diff_type = 'jsondiff-selfcontained'
biothings.hub.databuild.differ.diff_worker_count(id_list, db_col_names, batch_num)[source]
biothings.hub.databuild.differ.diff_worker_new_vs_old(id_list_new, old_db_col_names, new_db_col_names, batch_num, diff_folder, diff_func, exclude=None, selfcontained=False)[source]
biothings.hub.databuild.differ.diff_worker_old_vs_new(id_list_old, new_db_col_names, batch_num, diff_folder)[source]
biothings.hub.databuild.differ.reduce_diffs(diffs, num, diff_folder, done_folder)[source]
biothings.hub.databuild.differ.set_pending_to_diff(col_name)[source]

biothings.hub.databuild.mapper

class biothings.hub.databuild.mapper.BaseMapper(name=None, *args, **kwargs)[source]

Bases: object

Basic mapper used to convert documents. If the mapper's name matches the source's metadata "mapper" field, the mapper.convert(docs) call will be used to process/convert the passed documents.

load()[source]

Do whatever is required to fill the mapper with mapping data. Can be called multiple times; only the first call will load data.

process(docs)[source]

Convert given docs into other docs.

class biothings.hub.databuild.mapper.IDBaseMapper(name=None, convert_func=None, *args, **kwargs)[source]

Bases: BaseMapper

Provide mapping between different sources

'name' may match a "mapper" metadata field (see uploaders). If None, the mapper will be applied to any document from a resource without a "mapper" argument.

need_load()[source]
process(docs, key_to_convert='_id', transparent=True)[source]

Process the 'key_to_convert' document key using the mapping. If transparent and there's no match, the original key will be used (so there's no change). Otherwise, if there's no match, the document will be discarded (default). Warning: the key to be translated must not be None (it's considered a non-match).

translate(_id, transparent=False)[source]

Return _id translated through the mapper, or _id itself if not part of the mapper. If 'transparent' and no match, the original _id will be returned.
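
A hedged sketch of a converting mapper (the ID table and names are hypothetical; real mappers usually build their mapping from a datasource):

    from biothings.hub.databuild.mapper import IDBaseMapper

    class EnsemblToEntrezMapper(IDBaseMapper):

        def __init__(self, **kwargs):
            # "entrez_mapper" would have to match the "mapper" metadata field of the uploaders using it
            super().__init__(name="entrez_mapper", **kwargs)
            self.map = None

        def load(self):
            if self.map is None:   # only load once
                self.map = {"ENSG00000141510": "7157"}

        def translate(self, _id, transparent=False):
            return self.map.get(_id, _id if transparent else None)

    mapper = EnsemblToEntrezMapper()
    mapper.load()
    mapper.translate("ENSG00000141510")                     # -> "7157"
    mapper.translate("ENSG00000000000", transparent=True)   # -> the original _id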

class biothings.hub.databuild.mapper.TransparentMapper(name=None, *args, **kwargs)[source]

Bases: BaseMapper

load(*args, **kwargs)[source]

Do whatever is required to fill the mapper with mapping data. Can be called multiple times; only the first call will load data.

process(docs, *args, **kwargs)[source]

Convert given docs into other docs.

biothings.hub.databuild.prebuilder

class biothings.hub.databuild.prebuilder.BasePreCompiledDataProvider(name)[source]

Bases: object

'name' is a way to identify this provider (usually linked to a database name behind the scenes)

get_all()[source]

Iterate over all registered _ids and return a list of collection names where they can be found.

register(_id, col_name)[source]

Tell provider that _id can be found in collection named ‘col_name’

class biothings.hub.databuild.prebuilder.MongoDBPreCompiledDataProvider(db_name, name, connection_params)[source]

Bases: BasePreCompiledDataProvider

'name' is a way to identify this provider (usually linked to a database name behind the scenes)

get_all(batch_size=100000)[source]

Iterate over all registered _ids and return a list of collection names where they can be found.

register(_id, col_name)[source]

Tell provider that _id can be found in collection named ‘col_name’

class biothings.hub.databuild.prebuilder.RedisPreCompiledDataProvider(name, connection_params)[source]

Bases: BasePreCompiledDataProvider

'name' is a way to identify this provider (usually linked to a database name behind the scenes)

get_all()[source]

Iterate over all registered _ids and return a list of collection names where they can be found.

register(_id, col_name)[source]

Tell provider that _id can be found in collection named ‘col_name’

biothings.hub.databuild.syncer

class biothings.hub.databuild.syncer.BaseSyncer(job_manager, log_folder)[source]

Bases: object

diff_type = None
get_pinfo()[source]
get_predicates()[source]
get_target_backend()[source]
load_metadata(diff_folder)[source]
post_sync_cols(diff_folder, batch_size, mode, force, target_backend, steps)[source]

Post-sync hook; can be implemented in a subclass.

register_status(status, transient=False, init=False, **extra)[source]
setup_log(build_name=None)[source]
sync(diff_folder=None, batch_size=10000, mode=None, target_backend=None, steps=('mapping', 'content', 'meta', 'post'), debug=False)[source]

Wrapper over the sync_cols() coroutine; returns a task.

async sync_cols(diff_folder, batch_size=10000, mode=None, force=False, target_backend=None, steps=('mapping', 'content', 'meta', 'post'), debug=False)[source]

Sync a collection with diff files located in diff_folder. This folder contains a metadata.json file which describes the different collections involved: "old" is the collection/index to be synced, "new" is the collection that should be obtained once all diff files are applied (not used, just informative). If target_backend is specified (using bt.databuild.backend.create_backend() notation), it will replace "old" (that is, the one being synced).
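
A hedged sketch using the manager-level wrapper (SyncerManager.sync, listed further below); paths and collection names are hypothetical:

    # assuming "sm" is the hub's SyncerManager instance
    sm.sync(
        "es",                              # backend_type, matching a registered syncer's target_backend_type
        "mynews_20210101_abcdef",          # old collection (the one being synced)
        "mynews_20210201_ghijkl",          # new collection
        diff_folder="/data/diff/mynews_20210101_abcdef-mynews_20210201_ghijkl",
        steps=("mapping", "content", "meta", "post"),
    )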

target_backend_type = None
class biothings.hub.databuild.syncer.ESColdHotJsonDiffSelfContainedSyncer(job_manager, log_folder)[source]

Bases: BaseSyncer

diff_type = 'coldhot-jsondiff-selfcontained'
target_backend_type = 'es'
class biothings.hub.databuild.syncer.ESColdHotJsonDiffSyncer(job_manager, log_folder)[source]

Bases: BaseSyncer

diff_type = 'coldhot-jsondiff'
target_backend_type = 'es'
class biothings.hub.databuild.syncer.ESJsonDiffSelfContainedSyncer(job_manager, log_folder)[source]

Bases: BaseSyncer

diff_type = 'jsondiff-selfcontained'
target_backend_type = 'es'
class biothings.hub.databuild.syncer.ESJsonDiffSyncer(job_manager, log_folder)[source]

Bases: BaseSyncer

diff_type = 'jsondiff'
target_backend_type = 'es'
class biothings.hub.databuild.syncer.MongoJsonDiffSelfContainedSyncer(job_manager, log_folder)[source]

Bases: BaseSyncer

diff_type = 'jsondiff-selfcontained'
target_backend_type = 'mongo'
class biothings.hub.databuild.syncer.MongoJsonDiffSyncer(job_manager, log_folder)[source]

Bases: BaseSyncer

diff_type = 'jsondiff'
target_backend_type = 'mongo'
exception biothings.hub.databuild.syncer.SyncerException[source]

Bases: Exception

class biothings.hub.databuild.syncer.SyncerManager(*args, **kwargs)[source]

Bases: BaseManager

SyncerManager deals with the different syncer objects used to synchronize different collections or indices using diff files

clean_stale_status()[source]

During startup, search for actions in progress which would have been interrupted, and change their state to "canceled". Ex: some downloading processes could have been interrupted; at startup, the "downloading" status should be changed to "canceled" so as to reflect the actual state of these datasources. This must be overridden in subclasses.

configure(klasses=None)[source]

Register default syncers (if klasses is None) or the given klasses. klasses is a list of classes, or a list of partially initialized classes (e.g. via functools.partial).

register_syncer(klass)[source]
setup_log()[source]
sync(backend_type, old_db_col_names, new_db_col_names, diff_folder=None, batch_size=10000, mode=None, target_backend=None, steps=('mapping', 'content', 'meta', 'post'), debug=False)[source]
class biothings.hub.databuild.syncer.ThrottledESColdHotJsonDiffSelfContainedSyncer(max_sync_workers, *args, **kwargs)[source]

Bases: ThrottlerSyncer, ESColdHotJsonDiffSelfContainedSyncer

class biothings.hub.databuild.syncer.ThrottledESColdHotJsonDiffSyncer(max_sync_workers, *args, **kwargs)[source]

Bases: ThrottlerSyncer, ESColdHotJsonDiffSyncer

class biothings.hub.databuild.syncer.ThrottledESJsonDiffSelfContainedSyncer(max_sync_workers, *args, **kwargs)[source]

Bases: ThrottlerSyncer, ESJsonDiffSelfContainedSyncer

class biothings.hub.databuild.syncer.ThrottledESJsonDiffSyncer(max_sync_workers, *args, **kwargs)[source]

Bases: ThrottlerSyncer, ESJsonDiffSyncer

class biothings.hub.databuild.syncer.ThrottlerSyncer(max_sync_workers, *args, **kwargs)[source]

Bases: BaseSyncer

get_predicates()[source]
biothings.hub.databuild.syncer.sync_es_coldhot_jsondiff_worker(diff_file, es_config, new_db_col_names, batch_size, cnt, force=False, selfcontained=False, metadata=None, debug=False)[source]
biothings.hub.databuild.syncer.sync_es_for_update(diff_file, indexer, diffupdates, batch_size, res, debug)[source]
biothings.hub.databuild.syncer.sync_es_jsondiff_worker(diff_file, es_config, new_db_col_names, batch_size, cnt, force=False, selfcontained=False, metadata=None, debug=False)[source]

Worker to sync data between a new mongo collection and an elasticsearch index

biothings.hub.databuild.syncer.sync_mongo_jsondiff_worker(diff_file, old_db_col_names, new_db_col_names, batch_size, cnt, force=False, selfcontained=False, metadata=None, debug=False)[source]

Worker to sync data between a new and an old mongo collection