biothings.hub.dataload

biothings.hub.dataload.dumper

class biothings.hub.dataload.dumper.APIDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Bases: BaseDumper

Dump data from APIs

This will run API calls in a clean process and write its results in one or more NDJSON documents.

Populate the static methods get_document and get_release in your subclass, along with other necessary bits common to all dumpers.

For details on specific parts, read the docstring for individual methods.

An example subclass implementation can be found in the unii data source for MyGene.info.

property client
create_todump_list(force=False, **kwargs)[source]

This gets called by method dump, to populate self.to_dump

download(remotefile, localfile)[source]

Runs helper function in new process to download data

This is run in a new process by the do_dump coroutine of the parent class. It then spawns another process that does the actual work. This method mostly sets up the environment: it configures the process pool executor to use the spawn start method, uses concurrent.futures to run the task in the new process, and periodically checks the status of the task.

Explanation: because this is actually running inside a process that forked from a threaded process, the internal state is more or less corrupt/broken, see man 2 fork for details. More discussions are in Slack from some time in 2021 on why it has to be forked and why it is broken.

Caveats: the existing job manager will not know how much memory the actual worker process is using.

static get_document() Generator[Tuple[str, Any], None, None][source]

Get document from API source

Populate this method to yield documents to be stored on disk. Every time you want to save something to disk, do this:

>>> yield 'name_of_file.ndjson', {'stuff': 'you want saved'}

While the type definition says Any is accepted, the value has to be JSON serializable, so basically Python dictionaries/lists with strings and numbers as the most basic elements.

Later on in your uploader, you can treat the files as NDJSON documents, i.e. one JSON document per line.

It is recommended that you only do the minimal necessary processing in this step.

A default HTTP client is not provided so you get the flexibility of choosing your most favorite tool.

This MUST be a static method or it cannot be properly serialized to run in a separate process.

This method is expected to be blocking (synchronous). However, be sure to properly SET TIMEOUTS. You open the resources here in this function so you have to deal with properly checking/closing them. If the invoker forcefully stops this method, it will leave a mess behind, therefore we do not do that.

You can do a 5 second timeout using the popular requests package by doing something like this:

>>> import requests
>>> r = requests.get('https://example.org', timeout=5.0)

You can catch the exception or set up retries. If you cannot handle the situation, raise an exception or simply do not catch it. APIDumper will handle it properly: documents are only saved when the entire method completes successfully.
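As a rough illustration of the contract described above, the sketch below pairs a get_document-style generator with a small writer that groups documents by file name and emits one JSON document per line. It deliberately does not subclass APIDumper so that it runs without a Hub installation; ExampleSource, write_ndjson and the file name items.ndjson are hypothetical, and the real dumper's on-disk handling may differ.

```python
import json
import os
import tempfile
from collections import defaultdict
from typing import Any, Generator, Tuple


class ExampleSource:
    """Hypothetical stand-in for an APIDumper subclass."""

    @staticmethod
    def get_document() -> Generator[Tuple[str, Any], None, None]:
        # A real implementation would call the remote API here (with timeouts).
        for i in range(3):
            yield "items.ndjson", {"_id": str(i), "value": i * 10}


def write_ndjson(documents, folder):
    """Consume the whole generator first, so a failure writes nothing."""
    grouped = defaultdict(list)
    for filename, doc in documents:
        # json.dumps raises TypeError for values that are not JSON serializable.
        grouped[filename].append(json.dumps(doc))
    for filename, lines in grouped.items():
        with open(os.path.join(folder, filename), "w", encoding="utf-8") as fh:
            fh.write("\n".join(lines) + "\n")
    return sorted(grouped)


with tempfile.TemporaryDirectory() as tmp:
    written = write_ndjson(ExampleSource.get_document(), tmp)
    with open(os.path.join(tmp, "items.ndjson"), encoding="utf-8") as fh:
        docs = [json.loads(line) for line in fh]
```

Because the writer drains the generator before opening any file, an exception raised inside get_document leaves the target folder untouched, which mirrors the "only saved when the entire method completes" behaviour in spirit.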

static get_release() str[source]

Get the string for the release information.

This is run in the main process and thread so it must return quickly. This method must be populated.

Returns:

string representing the release.

need_prepare()[source]

Check whether some prepare step should be executed before running dump.

prepare_client()[source]

do initialization to make the client ready to dump files

release_client()[source]

Do whatever necessary (like closing network connection) to “release” the client

remote_is_better(remotefile, localfile)[source]

If there is a simple method to check whether remote is better

class biothings.hub.dataload.dumper.BaseDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Bases: object

ARCHIVE = True
AUTO_UPLOAD = True
MAX_PARALLEL_DUMP = None
SCHEDULE = None
SLEEP_BETWEEN_DOWNLOAD = 0.0
SRC_NAME = None
SRC_ROOT_FOLDER = None
SUFFIX_ATTR = 'release'
property client
create_todump_list(force=False, **kwargs)[source]

Fill the self.to_dump list with dict("remote": remote_path, "local": local_path) elements. This is the todo list for the dumper. It’s a good place to check whether a file needs to be downloaded. If ‘force’ is True, though, all files will be considered for download.
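The shape of the to_dump list can be sketched as follows. This is a standalone illustration, not the library's code: build_to_dump is a hypothetical helper, and skipping files that already exist locally is an assumed policy standing in for whatever check a real dumper performs.

```python
import os
import tempfile


def build_to_dump(remote_paths, local_folder, force=False):
    """Return a to_dump-style list of {"remote": ..., "local": ...} dicts."""
    to_dump = []
    for remote in remote_paths:
        local = os.path.join(local_folder, os.path.basename(remote))
        # Without force, skip files that already exist locally (assumed policy).
        if force or not os.path.exists(local):
            to_dump.append({"remote": remote, "local": local})
    return to_dump


with tempfile.TemporaryDirectory() as tmp:
    # Pretend a.txt was downloaded by an earlier run.
    open(os.path.join(tmp, "a.txt"), "w").close()
    remotes = ["pub/data/a.txt", "pub/data/b.txt"]
    pending = build_to_dump(remotes, tmp)
    everything = build_to_dump(remotes, tmp, force=True)
```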

property current_data_folder
property current_release
async do_dump(job_manager=None)[source]
download(remotefile, localfile)[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about remotefile (depends on the actual client).

async dump(steps=None, force=False, job_manager=None, check_only=False, **kwargs)[source]

Dump (i.e. download) the resource as needed. This should be called after instance creation. The ‘force’ argument forces the dump and is passed on to the create_todump_list() method.

get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

get_predicates()[source]

Return a list of predicates (functions returning true/false, as in math logic) which instructs/dictates if job manager should start a job (process/thread)

init_state()[source]
property logger
mark_success(dry_run=True)[source]

Mark the datasource as successfully dumped. This is useful when the datasource is unstable and needs to be downloaded manually.

need_prepare()[source]

Check whether some prepare step should be executed before running dump.

property new_data_folder

Generate a new data folder path using src_root_folder and the specified suffix attribute. Also sync the current (aka previous) data folder previously registered in the database. This method typically has to be called in create_todump_list(), when the dumper actually knows some information about the resource, like the actual release.

post_download(remotefile, localfile)[source]

Placeholder to add a custom process once a file is downloaded. This is a good place to check file’s integrity. Optional

post_dump(*args, **kwargs)[source]

Placeholder to add a custom process once the whole resource has been dumped. Optional.

post_dump_delete_files()[source]

Delete files after dump

Invoke this method in post_dump to synchronously delete the list of paths stored in self.to_delete, in order.

Non-recursive. If directories need to be removed, build the list such that files residing in the directory are removed first and then the directory. (Hint: see os.walk(dir, topdown=False))
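The os.walk hint above can be sketched as follows. build_to_delete and delete_in_order are hypothetical helpers that only mimic the ordering requirement; they are not the library's implementation, and absolute paths are used here for simplicity.

```python
import os
import tempfile


def build_to_delete(root):
    """List files before the directories that contain them (bottom-up walk)."""
    to_delete = []
    for dirpath, _dirnames, filenames in os.walk(root, topdown=False):
        to_delete.extend(os.path.join(dirpath, name) for name in filenames)
        to_delete.append(dirpath)
    return to_delete


def delete_in_order(paths):
    """Non-recursive removal: os.rmdir fails if a directory is not empty."""
    for path in paths:
        if os.path.isdir(path):
            os.rmdir(path)
        else:
            os.remove(path)


base = tempfile.mkdtemp()
root = os.path.join(base, "extracted")
os.makedirs(os.path.join(root, "sub"))
for relative in ("a.txt", os.path.join("sub", "b.txt")):
    open(os.path.join(root, relative), "w").close()

paths = build_to_delete(root)
delete_in_order(paths)
os.rmdir(base)  # clean up the scratch folder itself
```

With topdown=False, each directory is visited only after its subdirectories, so every directory is already empty by the time it is removed.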

prepare(state={})[source]
prepare_client()[source]

do initialization to make the client ready to dump files

prepare_local_folders(localfile)[source]
prepare_src_dump()[source]
register_status(status, transient=False, dry_run=False, **extra)[source]
release_client()[source]

Do whatever necessary (like closing network connection) to “release” the client

remote_is_better(remotefile, localfile)[source]

Compared to local file, check if remote file is worth downloading. (like either bigger or newer for instance)
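A "bigger or newer" check might look like the sketch below. The signature is a deliberate simplification and an assumption: the real method receives (remotefile, localfile) and obtains remote metadata through its client, whereas here the remote size and modification time are passed in directly.

```python
import os
import tempfile


def remote_is_better(remote_size, remote_mtime, localfile):
    """True if the local file is missing, or the remote one is bigger or newer."""
    if not os.path.exists(localfile):
        return True
    local = os.stat(localfile)
    return remote_size > local.st_size or remote_mtime > local.st_mtime


with tempfile.TemporaryDirectory() as tmp:
    local = os.path.join(tmp, "data.txt")
    with open(local, "w") as fh:
        fh.write("12345")
    os.utime(local, (1000, 1000))  # pin atime/mtime so the comparison is predictable
    same = remote_is_better(5, 1000, local)
    bigger = remote_is_better(6, 1000, local)
    newer = remote_is_better(5, 2000, local)
    missing = remote_is_better(5, 1000, os.path.join(tmp, "absent.txt"))
```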

setup_log()[source]
property src_doc
property src_dump
to_delete: List[str | bytes | PathLike]

Populate with the list of relative paths of files to delete

unprepare()[source]

Reset anything that’s not picklable (so self can be pickled). Return what’s been reset as a dict, so self can be restored once pickled.

class biothings.hub.dataload.dumper.DockerContainerDumper(*args, **kwargs)[source]

Bases: BaseDumper

Start a Docker container (typically running on a different server) to prepare the data file on the remote container, and then download this file to the local data source folder. This dumper will do the following steps:

  • Boot up a container from the provided parameters: image, tag, container_name.
    • The container entrypoint will be overridden by this long-running command: “tail -f /dev/null”

    • When container_name and image are provided together, the dumper will try to run the container_name. If the container with container_name does not exist, the dumper will start a new container from the image param and set its name to container_name.

  • Run the dump_command inside this container. This command MUST block the dumper until the data file is completely prepared. This guarantees that the remote file is ready for downloading.

  • Run the get_version_cmd inside this container, if it is provided. Set this command’s output as self.release.

  • Download the remote file via the Docker API and extract the downloaded .tar file.

  • When the download is complete:
    • if keep_container=false: remove the above container.

    • if keep_container=true: leave this container running.

  • If any exception occurs while dumping data, the remote container won’t be removed, which helps us investigate the problem.

These are supported connection types from the Hub server to the remote Docker host server:
  • ssh: Prerequisite: the SSH Key-Based Authentication is configured

  • unix: Local connection

  • http: Use an insecure HTTP connection over a TCP socket

  • https: Use a secured HTTPS connection using TLS. Prerequisite:
    • The Docker API on the remote server MUST BE secured with TLS

    • A TLS key pair is generated on the Hub server and placed inside the same data plugin folder or the data source folder

All info about the Docker client connection MUST BE defined in the config.py file, under the DOCKER_CONFIG key. Optionally, DOCKER_HOST can be used to override the Docker connection in any Docker dumper, regardless of the value of the src_url. For example, you can set DOCKER_HOST="localhost" for local testing:

    DOCKER_CONFIG = {
        "CONNECTION_NAME_1": {
            "tls_cert_path": "/path/to/cert.pem",
            "tls_key_path": "/path/to/key.pem",
            "client_url": "https://remote-docker-host:port"
        },
        "CONNECTION_NAME_2": {
            "client_url": "ssh://user@remote-docker-host"
        },
        "localhost": {
            "client_url": "unix://var/run/docker.sock"
        }
    }
    DOCKER_HOST = "localhost"
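How a connection name might be resolved against this configuration can be sketched as follows. resolve_client_url is a hypothetical helper, not part of the library; it only illustrates the stated rule that DOCKER_HOST, when set, takes precedence over the connection name in the source URL.

```python
DOCKER_CONFIG = {
    "CONNECTION_NAME_2": {"client_url": "ssh://user@remote-docker-host"},
    "localhost": {"client_url": "unix://var/run/docker.sock"},
}
DOCKER_HOST = "localhost"  # optional override; None disables it in this sketch


def resolve_client_url(connection_name, config=DOCKER_CONFIG, override=DOCKER_HOST):
    """Pick the connection entry, letting the override win when it is set."""
    name = override or connection_name
    if name not in config:
        raise KeyError(f"no Docker connection named {name!r} in DOCKER_CONFIG")
    return config[name]["client_url"]


overridden = resolve_client_url("CONNECTION_NAME_2")
direct = resolve_client_url("CONNECTION_NAME_2", override=None)
```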

The data_url should match the following format:

docker://CONNECTION_NAME?image=DOCKER_IMAGE&tag=TAG&path=/path/to/remote_file&dump_command="this is custom command"&container_name=CONTAINER_NAME&keep_container=true&get_version_cmd="cmd"

Supported params:
  • image: (Optional) the Docker image name

  • tag: (Optional) the image tag

  • container_name: (Optional) If this param is provided, the image param will be discarded when the dumper runs.

  • path: (Required) path to the remote file inside the Docker container.

  • dump_command: (Required) This command will be run inside the Docker container in order to create the remote file.

  • keep_container: (Optional) accepted values: true/false, default: false.
    • If keep_container=true, the remote container will be persisted.

    • If keep_container=false, the remote container will be removed at the end of the dump step.

  • get_version_cmd: (Optional) The custom command for checking the release version of the local and remote files. Note that:
    • This command must be runnable both on the local Hub (for checking the local file) and in the remote container (for checking the remote file).

    • "{}" MUST exist in the command; it will be replaced by the data file path when the dumper runs,

      e.g. get_version_cmd="md5sum {} | awk '{ print $1 }'" will be run as: md5sum /path/to/remote_file | awk '{ print $1 }' for the remote file, and with /path/to/local_file for the local file.
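The placeholder rule for get_version_cmd can be illustrated with plain string replacement. How the dumper performs the substitution internally is not stated here, so fill_version_cmd is a hypothetical helper; str.replace is used because str.format would trip over the braces in the awk program.

```python
def fill_version_cmd(template, data_path):
    """Replace the literal "{}" placeholder with the data file path."""
    if "{}" not in template:
        raise ValueError('get_version_cmd must contain the "{}" placeholder')
    # str.format is avoided on purpose: it would try to parse awk's braces.
    return template.replace("{}", data_path)


template = "md5sum {} | awk '{ print $1 }'"
remote_cmd = fill_version_cmd(template, "/path/to/remote_file")
local_cmd = fill_version_cmd(template, "/path/to/local_file")
```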

Ex:
  • docker://CONNECTION_NAME?image=IMAGE_NAME&tag=IMAGE_TAG&path=/path/to/remote_file(inside the container)&dump_command="run something with output is written to -O /path/to/remote_file (inside the container)"

  • docker://CONNECTION_NAME?container_name=CONTAINER_NAME&path=/path/to/remote_file(inside the container)&dump_command="run something with output is written to -O /path/to/remote_file (inside the container)"&keep_container=true&get_version_cmd="md5sum {} | awk '{ print $1 }'"

  • docker://localhost?image=dockstore_dumper&path=/data/dockstore_crawled/data.ndjson&dump_command="/home/biothings/run-dockstore.sh"&keep_container=1

  • docker://localhost?image=dockstore_dumper&tag=latest&path=/data/dockstore_crawled/data.ndjson&dump_command="/home/biothings/run-dockstore.sh"&keep_container=True

  • docker://localhost?image=praqma/network-multitool&tag=latest&path=/tmp/annotations.zip&dump_command="/usr/bin/wget https://s3.pgkb.org/data/annotations.zip -O /tmp/annotations.zip"&keep_container=false&get_version_cmd="md5sum {} | awk '{ print $1 }'"

  • docker://localhost?container_name=<YOUR CONTAINER NAME>&path=/tmp/annotations.zip&dump_command="/usr/bin/wget https://s3.pgkb.org/data/annotations.zip -O /tmp/annotations.zip"&keep_container=true&get_version_cmd="md5sum {} | awk '{ print $1 }'"
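To make the URL layout concrete, the sketch below splits one of the example URLs above with the standard library. parse_docker_url is a hypothetical helper and not the dumper's own parser; treating "true" and "1" (case-insensitively) as truthy for keep_container is an assumption based on the values seen in the examples.

```python
from urllib.parse import parse_qsl, urlsplit


def parse_docker_url(data_url):
    """Split a docker:// data_url into (connection_name, params)."""
    parts = urlsplit(data_url)
    if parts.scheme != "docker":
        raise ValueError(f"expected a docker:// URL, got {data_url!r}")
    # Strip the double quotes that wrap command values in the examples.
    params = {key: value.strip('"') for key, value in parse_qsl(parts.query)}
    return parts.netloc, params


url = (
    "docker://localhost?image=dockstore_dumper&tag=latest"
    "&path=/data/dockstore_crawled/data.ndjson"
    '&dump_command="/home/biothings/run-dockstore.sh"&keep_container=True'
)
connection, params = parse_docker_url(url)
# Assumed interpretation of the flag; the documented values are true/false.
keep_container = params.get("keep_container", "false").lower() in ("true", "1")
```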

Container metadata: all of the above params in the data_url can be preconfigured in the Dockerfile by adding LABELs. This config is used as the fallback for the data_url params:

The dumper looks for these params in both the data_url and the container metadata. If a param does not exist in the data_url, the dumper will use its value from the container metadata (if it exists there).

For example:

    … Dockerfile
    LABEL "path"="/tmp/annotations.zip"
    LABEL "dump_command"="/usr/bin/wget https://s3.pgkb.org/data/annotations.zip -O /tmp/annotations.zip"
    LABEL keep_container="true"
    LABEL desc=test
    LABEL container_name=mydocker
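The fallback rule (data_url first, container labels second) amounts to a dictionary merge. The sketch below is illustrative only: merge_params is a hypothetical helper, and the override path /data/override.zip is made up for the example.

```python
def merge_params(url_params, labels):
    """URL parameters win; container labels only fill the gaps."""
    merged = dict(labels)
    merged.update(url_params)
    return merged


labels = {
    "path": "/tmp/annotations.zip",
    "keep_container": "true",
    "container_name": "mydocker",
}
url_params = {"path": "/data/override.zip"}  # hypothetical value from a data_url
params = merge_params(url_params, labels)
```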

CONTAINER_NAME = None
DATA_PATH = None
DOCKER_CLIENT_URL = None
DOCKER_IMAGE = None
DUMP_COMMAND = None
GET_VERSION_CMD = None
KEEP_CONTAINER = False
MAX_PARALLEL_DUMP = 1
ORIGINAL_CONTAINER_STATUS = None
TIMEOUT = 300
async create_todump_list(force=False, job_manager=None, **kwargs)[source]

Create the list of files to dump, called in dump method. This method will execute dump_command to generate the remote file in docker container, so we define this method as async to make it non-blocking.

delete_or_restore_container()[source]

Delete the container if it’s created by the dumper, or restore it to its original status if it’s pre-existing.

download(remote_file, local_file)[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about remotefile (depends on the actual client).

generate_remote_file()[source]

Execute dump_command to generate the remote file, called in create_todump_list method

get_remote_file()[source]

Return the remote file path within the container. In most cases, dump_command should either generate this file or, if another automated pipeline generates it, check that it is ready.

get_remote_lastmodified(remote_file)[source]

get the last modified time of the remote file within the container using stat command

need_prepare()[source]

Check whether some prepare step should be executed before running dump.

post_dump(*args, **kwargs)[source]

Delete container or restore the container status if necessary, called in the dump method after the dump is done (during the “post” step)

prepare_client()[source]

do initialization to make the client ready to dump files

prepare_dumper_params()[source]

Read all docker dumper parameters from either the data plugin manifest or the Docker image or container metadata. Of course, at least one of docker_image or container_name parameters must be defined in the data plugin manifest first. If the parameter is not defined in the data plugin manifest, we will try to read it from the Docker image metadata.

prepare_local_folders(localfile)[source]

prepare the local folder for the localfile, called in download method

prepare_remote_container()[source]

prepare the remote container and set self.container, called in create_todump_list method

release_client()[source]

Release the docker client connection, called in dump method

remote_is_better(remote_file, local_file)[source]

Compared to local file, check if remote file is worth downloading. (like either bigger or newer for instance)

set_data_path()[source]
set_dump_command()[source]
set_get_version_cmd()[source]
set_keep_container()[source]
set_release()[source]

Call the get_version_cmd to get the release, called in get_todump_list method. If get_version_cmd is not defined, use a timestamp as the release.

This is currently a blocking method, assuming get_version_cmd is a quick command. But if necessary, we can make it async in the future.

property source_config
exception biothings.hub.dataload.dumper.DockerContainerException[source]

Bases: Exception

class biothings.hub.dataload.dumper.DummyDumper(*args, **kwargs)[source]

Bases: BaseDumper

DummyDumper will do nothing… (useful for datasources that can’t be downloaded anymore but still need to be integrated, ie. fill src_dump, etc…)

async dump(force=False, job_manager=None, *args, **kwargs)[source]

Dump (i.e. download) the resource as needed. This should be called after instance creation. The ‘force’ argument forces the dump and is passed on to the create_todump_list() method.

prepare_client()[source]

do initialization to make the client ready to dump files

exception biothings.hub.dataload.dumper.DumperException[source]

Bases: Exception

class biothings.hub.dataload.dumper.DumperManager(job_manager, datasource_path='dataload.sources', *args, **kwargs)[source]

Bases: BaseSourceManager

SOURCE_CLASS

alias of BaseDumper

call(src, method_name, *args, **kwargs)[source]

Create a dumper for datasource “src” and call method “method_name” on it, with the given arguments. Used to create arbitrary calls on a dumper. “method_name” within the dumper definition must be a coroutine.

clean_stale_status()[source]

During startup, search for actions in progress which would have been interrupted and change their state to “canceled”. Ex: some downloading processes could have been interrupted; at startup, the “downloading” status should be changed to “canceled” to reflect the actual state of these datasources. This must be overridden in subclass.

async create_and_call(klass, method_name, *args, **kwargs)[source]
async create_and_dump(klass, *args, **kwargs)[source]
create_instance(klass)[source]
dump_all(force=False, **kwargs)[source]

Run all dumpers, except manual ones

dump_info()[source]
dump_src(src, force=False, skip_manual=False, schedule=False, check_only=False, **kwargs)[source]
get_schedule(dumper_name)[source]

Return the corresponding schedule for dumper_name. Example result:

    {
        "cron": "0 9 * * *",
        "strdelta": "15h:20m:33s",
    }

get_source_ids()[source]

Return displayable list of registered source names (not private)

mark_success(src, dry_run=True)[source]
register_classes(klasses)[source]

Register each class in the self.register dict. The key will be used to retrieve the source class, create an instance and run a method from it. It must be implemented in subclass as each manager may need to access its sources differently, based on different keys.

schedule_all(raise_on_error=False, **kwargs)[source]

Run all dumpers, except manual ones

source_info(source=None)[source]
class biothings.hub.dataload.dumper.FTPDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Bases: BaseDumper

BLOCK_SIZE: int | None = None
CWD_DIR = ''
FTP_HOST = ''
FTP_PASSWD = ''
FTP_TIMEOUT = 600.0
FTP_USER = ''
download(remotefile, localfile)[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about remotefile (depends on the actual client).

need_prepare()[source]

Check whether some prepare step should be executed before running dump.

prepare_client()[source]

do initialization to make the client ready to dump files

release_client()[source]

Do whatever necessary (like closing network connection) to “release” the client

remote_is_better(remotefile, localfile)[source]

‘remotefile’ is relative path from current working dir (CWD_DIR), ‘localfile’ is absolute path

class biothings.hub.dataload.dumper.FilesystemDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Bases: BaseDumper

This dumper works locally and copies (or moves) files to the datasource folder

FS_OP = 'cp'
download(remotefile, localfile)[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about remotefile (depends on the actual client).

need_prepare()[source]

Check whether some prepare step should be executed before running dump.

prepare_client()[source]

Check if ‘cp’ and ‘mv’ executable exists…

release_client()[source]

Do whatever necessary (like closing network connection) to “release” the client

remote_is_better(remotefile, localfile)[source]

Compared to local file, check if remote file is worth downloading. (like either bigger or newer for instance)

class biothings.hub.dataload.dumper.GitDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Bases: BaseDumper

Git dumper gets data from a git repo. Repo is stored in SRC_ROOT_FOLDER (without versioning) and then versions/releases are fetched in SRC_ROOT_FOLDER/<release>

DEFAULT_BRANCH = None
GIT_REPO_URL = None
download(remotefile, localfile)[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about remotefile (depends on the actual client).

async dump(release='HEAD', force=False, job_manager=None, **kwargs)[source]

Dump (i.e. download) the resource as needed. This should be called after instance creation. The ‘force’ argument forces the dump and is passed on to the create_todump_list() method.

need_prepare()[source]

Check whether some prepare step should be executed before running dump.

property new_data_folder

Generate a new data folder path using src_root_folder and the specified suffix attribute. Also sync the current (aka previous) data folder previously registered in the database. This method typically has to be called in create_todump_list(), when the dumper actually knows some information about the resource, like the actual release.

prepare_client()[source]

Check if ‘git’ executable exists

release_client()[source]

Do whatever necessary (like closing network connection) to “release” the client

remote_is_better(remotefile, localfile)[source]

Compared to local file, check if remote file is worth downloading. (like either bigger or newer for instance)

class biothings.hub.dataload.dumper.GoogleDriveDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Bases: HTTPDumper

download(remoteurl, localfile)[source]
remoteurl is a google drive link containing a document ID, such as:

It can also be just a document ID

get_document_id(url)[source]
prepare_client()[source]

do initialization to make the client ready to dump files

remote_is_better(remotefile, localfile)[source]

Determine if remote is better

Override if necessary.

class biothings.hub.dataload.dumper.HTTPDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Bases: BaseDumper

Dumper using HTTP protocol and “requests” library

IGNORE_HTTP_CODE = []
RESOLVE_FILENAME = False
VERIFY_CERT = True
download(remoteurl, localfile, headers={})[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about remotefile (depends on the actual client).

need_prepare()[source]

Check whether some prepare step should be executed before running dump.

prepare_client()[source]

do initialization to make the client ready to dump files

release_client()[source]

Do whatever necessary (like closing network connection) to “release” the client

remote_is_better(remotefile, localfile)[source]

Determine if remote is better

Override if necessary.

class biothings.hub.dataload.dumper.LastModifiedBaseDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Bases: BaseDumper

Use SRC_URLS as a list of URLs to download and implement create_todump_list() according to that list. Should be used in parallel with a dumper talking the actual underlying protocol

SRC_URLS = []
create_todump_list(force=False)[source]

Fill the self.to_dump list with dict("remote": remote_path, "local": local_path) elements. This is the todo list for the dumper. It’s a good place to check whether a file needs to be downloaded. If ‘force’ is True, though, all files will be considered for download.

set_release()[source]

Set self.release attribute as the last-modified datetime found in the last SRC_URLs element (so release is the datetime of the last file to download)

class biothings.hub.dataload.dumper.LastModifiedFTPDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Bases: LastModifiedBaseDumper

SRC_URLS contains a list of URLs pointing to files to download; FTP’s MDTM command is used to check whether files should be downloaded. The release is generated from the last file’s MDTM in SRC_URLS, and formatted according to RELEASE_FORMAT. See also LastModifiedHTTPDumper, which works the same way but for the HTTP protocol. Note: this dumper is a wrapper over FTPDumper; one URL will give one FTPDumper instance.
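Deriving a release string from an MDTM reply can be sketched without any network access. release_from_mdtm is a hypothetical helper and the reply string in the example is made up; only the reply layout (code 213 followed by a YYYYMMDDHHMMSS timestamp) is standard FTP behaviour.

```python
from datetime import datetime

RELEASE_FORMAT = "%Y-%m-%d"


def release_from_mdtm(response, release_format=RELEASE_FORMAT):
    """Turn an FTP MDTM reply such as '213 20230115123000' into a release string."""
    code, _, stamp = response.partition(" ")
    if code != "213":
        raise ValueError(f"unexpected MDTM reply: {response!r}")
    # MDTM timestamps are YYYYMMDDHHMMSS, optionally followed by fractional seconds.
    moment = datetime.strptime(stamp.strip()[:14], "%Y%m%d%H%M%S")
    return moment.strftime(release_format)


release = release_from_mdtm("213 20230115123000")
```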

RELEASE_FORMAT = '%Y-%m-%d'
download(urlremotefile, localfile, headers={})[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about remotefile (depends on the actual client).

get_client_for_url(url)[source]
get_remote_file(url)[source]
prepare_client()[source]

do initialization to make the client ready to dump files

release_client()[source]

Do whatever necessary (like closing network connection) to “release” the client

remote_is_better(urlremotefile, localfile)[source]

Compared to local file, check if remote file is worth downloading. (like either bigger or newer for instance)

set_release()[source]

Set self.release attribute as the last-modified datetime found in the last SRC_URLs element (so release is the datetime of the last file to download)

class biothings.hub.dataload.dumper.LastModifiedHTTPDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Bases: HTTPDumper, LastModifiedBaseDumper

Given a list of URLs, check the Last-Modified header to see whether the file should be downloaded. Sub-classes should only have to declare SRC_URLS. Optionally, another field name can be used instead of Last-Modified, but the date format must follow RFC 2616. If that header doesn’t exist, the data will always be downloaded (bypass). The release is generated from the last file’s Last-Modified in SRC_URLS, and formatted according to RELEASE_FORMAT.
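Turning a Last-Modified header into a release string can be sketched with the standard library's HTTP date parser. release_from_headers is a hypothetical helper and the header value is an arbitrary example; a plain dict stands in for the response headers, so the lookup here is case-sensitive.

```python
from email.utils import parsedate_to_datetime

RELEASE_FORMAT = "%Y-%m-%d"


def release_from_headers(headers, field="Last-Modified"):
    """Return a release string, or None when the header is absent."""
    value = headers.get(field)
    if value is None:
        # No header: per the description above, the file is always downloaded.
        return None
    return parsedate_to_datetime(value).strftime(RELEASE_FORMAT)


release = release_from_headers({"Last-Modified": "Wed, 21 Oct 2015 07:28:00 GMT"})
no_header = release_from_headers({})
```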

ETAG = 'ETag'
LAST_MODIFIED = 'Last-Modified'
RELEASE_FORMAT = '%Y-%m-%d'
RESOLVE_FILENAME = True
remote_is_better(remotefile, localfile)[source]

Determine if remote is better

Override if necessary.

set_release()[source]

Set self.release attribute as the last-modified datetime found in the last SRC_URLs element (so release is the datetime of the last file to download)

class biothings.hub.dataload.dumper.ManualDumper(*args, **kwargs)[source]

Bases: BaseDumper

This dumper assists the user in dumping a resource. It usually expects the files to be downloaded first (sometimes there’s no easy way to automate this process). Once downloaded, a call to dump() will make sure everything is fine in terms of files and metadata.

async dump(path, release=None, force=False, job_manager=None, **kwargs)[source]

Dump (i.e. download) the resource as needed. This should be called after instance creation. The ‘force’ argument forces the dump and is passed on to the create_todump_list() method.

property new_data_folder

Generate a new data folder path using src_root_folder and the specified suffix attribute. Also sync the current (aka previous) data folder previously registered in the database. This method typically has to be called in create_todump_list(), when the dumper actually knows some information about the resource, like the actual release.

prepare(state={})[source]
prepare_client()[source]

do initialization to make the client ready to dump files

class biothings.hub.dataload.dumper.WgetDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Bases: BaseDumper

create_todump_list(force=False, **kwargs)[source]

Fill the self.to_dump list with dict("remote": remote_path, "local": local_path) elements. This is the todo list for the dumper. It’s a good place to check whether a file needs to be downloaded. If ‘force’ is True, though, all files will be considered for download.

download(remoteurl, localfile)[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about remotefile (depends on the actual client).

need_prepare()[source]

Check whether some prepare step should be executed before running dump.

prepare_client()[source]

Check if ‘wget’ executable exists

release_client()[source]

Do whatever necessary (like closing network connection) to “release” the client

remote_is_better(remotefile, localfile)[source]

Compared to local file, check if remote file is worth downloading. (like either bigger or newer for instance)

biothings.hub.dataload.source

class biothings.hub.dataload.source.SourceManager(source_list, dump_manager, upload_manager, data_plugin_manager)[source]

Bases: BaseSourceManager

Helper class to get information about a datasource, whether it has a dumper and/or uploaders associated.

find_sources(paths)[source]
get_source(name, debug=False)[source]
get_sources(id=None, debug=False, detailed=False)[source]
reload()[source]
reset(name, key='upload', subkey=None)[source]

Reset, i.e. delete, internal data (src_dump document) for the given source name, key and subkey. This method is useful to clean outdated information in the Hub’s internal database.

Ex: key=upload, name=mysource, subkey=mysubsource will delete the entry in the corresponding src_dump doc (_id=mysource), under key “upload”, for the sub-source named “mysubsource”.

“key” can be either ‘download’, ‘upload’ or ‘inspect’. Because there’s no notion of subkey for dumpers (i.e. ‘download’), subkey is optional.

save_mapping(name, mapping=None, dest='master', mode='mapping')[source]
set_mapping_src_meta(subsrc, mini)[source]
sumup_source(src, detailed=False)[source]

Return minimal info about src

biothings.hub.dataload.storage

biothings.hub.dataload.sync

Deprecated. This module is not used any more.

class biothings.hub.dataload.sync.MongoSync[source]

Bases: object

add_update(source, merge_collection, ids)[source]
delete(merge_collection, field, ids)[source]
main(diff_filepath, merge_collection, field)[source]

biothings.hub.dataload.uploader

class biothings.hub.dataload.uploader.BaseSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

Bases: object

Default datasource uploader. Database storage can be done in batch or line by line. Duplicated records are not allowed

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

check_ready(force=False)[source]
clean_archived_collections()[source]
config = <ConfigurationWrapper over <module 'config' from '/home/docs/checkouts/readthedocs.org/user_builds/biothingsapi/checkouts/0.12.x/biothings/hub/default_config.py'>>
classmethod create(db_conn_info, *args, **kwargs)[source]

Factory-like method, just return an instance of this uploader (used by SourceManager, may be overridden in sub-class to generate more than one instance per class, like a true factory). This is useful when a resource is split into different collections but the data structure doesn’t change (it’s really just data split across multiple collections, usually for parallelization purposes). Instead of having an actual class for each split collection, the factory will generate them on the fly.

property fullname
generate_doc_src_master()[source]
get_current_and_new_master()[source]
classmethod get_mapping()[source]

Return ES mapping

get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

get_predicates()[source]

Return a list of predicates (functions returning true/false, as in math logic) which instructs/dictates if job manager should start a job (process/thread)

init_state()[source]
keep_archive = 10
async load(steps=('data', 'post', 'master', 'clean'), force=False, batch_size=10000, job_manager=None, **kwargs)[source]

Main resource load process, reads data from doc_c using chunks sized as batch_size. steps defines the different processes used to load the resource:

  • “data”: will store actual data into single collections

  • “post”: will perform post data load operations

  • “master”: will register the master document in src_master
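Reading in chunks of batch_size can be sketched with a small generator. iter_batches is a hypothetical helper that only illustrates the batching idea; it is not how the uploader or its storage class is implemented.

```python
from itertools import islice


def iter_batches(documents, batch_size=10000):
    """Yield lists of at most batch_size documents from any iterable."""
    iterator = iter(documents)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


sizes = [len(batch) for batch in iter_batches(range(25), batch_size=10)]
```

Pulling from a single shared iterator keeps memory use bounded by one batch at a time, whatever the size of the source.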

load_data(data_path)[source]

Parse data from data_path and return a structure ready to be inserted into the database. In general, data_path is a folder path, but in parallel mode (using the parallelizer option), data_path is a file path.

Parameters:

data_path – a folder path or a file path

Returns:

structure ready to be inserted into the database
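As an illustration only, a load_data implementation that accepts either a folder or a single file might look like the sketch below. The NDJSON input format, the .ndjson extension and the generator return type are assumptions for this example; an actual uploader parses whatever format its dumper produced.

```python
import json
import os
import tempfile


def load_data(data_path):
    """Yield documents from one NDJSON file, or from every .ndjson file in a folder."""
    if os.path.isdir(data_path):
        # Folder mode: read every NDJSON file in a stable order.
        paths = sorted(
            os.path.join(data_path, filename)
            for filename in os.listdir(data_path)
            if filename.endswith(".ndjson")
        )
    else:
        # Parallel mode: data_path already points at a single file.
        paths = [data_path]
    for path in paths:
        with open(path) as handle:
            for line in handle:
                line = line.strip()
                if line:
                    yield json.loads(line)


# Small demonstration using a temporary folder with one input file.
with tempfile.TemporaryDirectory() as folder:
    file_path = os.path.join(folder, "part1.ndjson")
    with open(file_path, "w") as handle:
        handle.write(json.dumps({"_id": "a", "value": 1}) + "\n")
        handle.write(json.dumps({"_id": "b", "value": 2}) + "\n")
    docs_from_folder = list(load_data(folder))
    docs_from_file = list(load_data(file_path))
```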

main_source = None
make_temp_collection()[source]

Create a temp collection for dataloading, e.g., entrez_geneinfo_INEMO.

name = None
post_update_data(steps, force, batch_size, job_manager, **kwargs)[source]

Override as needed to perform operations after data has been uploaded

prepare(state={})[source]

Sync uploader information with database (or given state dict)

prepare_src_dump()[source]

Sync with src_dump collection, collection information (src_doc). Return src_dump collection.

regex_name = None
register_status(status, subkey='upload', **extra)[source]

Register step status, i.e. the status for a sub-resource

save_doc_src_master(_doc)[source]
setup_log()[source]
storage_class

alias of BasicStorage

switch_collection()[source]

After a successful load, rename temp_collection to the regular collection name, and rename the existing collection to a temp name for archiving purposes.
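The swap can be pictured with a plain dictionary standing in for the database. This is a sketch under stated assumptions: the function signature, the archive naming scheme and the dict-based database are hypothetical and do not reflect how biothings names archived collections.

```python
def switch_collection(database, collection_name, temp_collection_name, archive_suffix):
    """Promote the temp collection and archive the previous one (sketch)."""
    if temp_collection_name not in database:
        raise KeyError(f"temp collection {temp_collection_name!r} does not exist")
    if collection_name in database:
        # Keep the previous data under an archive name (hypothetical scheme).
        archive_name = f"{collection_name}_archive_{archive_suffix}"
        database[archive_name] = database.pop(collection_name)
    # The freshly loaded temp collection becomes the regular collection.
    database[collection_name] = database.pop(temp_collection_name)


database = {
    "entrez_geneinfo": [{"_id": "old"}],
    "entrez_geneinfo_INEMO": [{"_id": "new"}],
}
switch_collection(database, "entrez_geneinfo", "entrez_geneinfo_INEMO", "20240101")
```

After the call, the regular name points at the new data, and the old data remains available until archived collections are cleaned up.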

unprepare()[source]

Reset anything that’s not picklable (so self can be pickled). Return what’s been reset as a dict, so self can be restored once pickled.
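The prepare/unprepare pairing can be sketched as below. UploaderSketch and its lock attribute are hypothetical; a threading.Lock is used only because it is a well-known unpicklable object. To keep the example independent of how the class is imported, the instance’s attribute dictionary is pickled as a stand-in for pickling the instance itself.

```python
import pickle
import threading


class UploaderSketch:
    """Hypothetical object holding one unpicklable attribute (a lock)."""

    def __init__(self):
        self.name = "mysource"
        self.lock = threading.Lock()

    def unprepare(self):
        # Remove unpicklable attributes and hand them back to the caller.
        state = {"lock": self.lock}
        self.lock = None
        return state

    def prepare(self, state=None):
        # Restore attributes from the given state, or build fresh ones.
        state = state or {}
        lock = state.get("lock")
        self.lock = lock if lock is not None else threading.Lock()


uploader = UploaderSketch()

# With the lock attached, pickling the attributes fails.
try:
    pickle.dumps(vars(uploader))
    picklable_before = True
except TypeError:
    picklable_before = False

saved_state = uploader.unprepare()
payload = pickle.dumps(vars(uploader))  # succeeds once the lock is removed
uploader.prepare(saved_state)           # restore the original lock
restored_attributes = pickle.loads(payload)
```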

async update_data(batch_size, job_manager)[source]

Iterate over load_data() to pull data and store it

update_master()[source]
class biothings.hub.dataload.uploader.DummySourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

Bases: BaseSourceUploader

Dummy uploader. It won’t upload any data, assuming the data is already there, but makes sure every other bit of information is there for the overall process (useful when online data isn’t available anymore).

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

check_ready(force=False)[source]
prepare_src_dump()[source]

Sync with src_dump collection, collection information (src_doc). Return src_dump collection.

async update_data(batch_size, job_manager=None, release=None)[source]

Iterate over load_data() to pull data and store it

class biothings.hub.dataload.uploader.IgnoreDuplicatedSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

Bases: BaseSourceUploader

Same as the default uploader, but stores records and ignores any duplicate errors that occur (use with caution…). Storage is done using batch and unordered bulk operations.

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

storage_class

alias of IgnoreDuplicatedStorage

class biothings.hub.dataload.uploader.MergerSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

Bases: BaseSourceUploader

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

storage_class

alias of MergerStorage

class biothings.hub.dataload.uploader.NoBatchIgnoreDuplicatedSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

Bases: BaseSourceUploader

Same as the default uploader, but stores records and ignores any duplicate errors that occur (use with caution…). Storage is done line by line (slow, not using a batch) but preserves the order of data in the input file.

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

storage_class

alias of NoBatchIgnoreDuplicatedStorage

class biothings.hub.dataload.uploader.NoDataSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

Bases: BaseSourceUploader

This uploader won’t upload any data and won’t even assume there’s actual data (different from DummySourceUploader on this point). It’s useful, for instance, when a mapping needs to be stored (get_mapping()) but the data doesn’t come from an actual upload (i.e. it is generated).

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

storage_class

alias of NoStorage

async update_data(batch_size, job_manager=None)[source]

Iterate over load_data() to pull data and store it

class biothings.hub.dataload.uploader.ParallelizedSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

Bases: BaseSourceUploader

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

jobs()[source]

Return a list of (*arguments) tuples passed to self.load_data, in order, one for each parallelized job. Ex: [(x,1),(y,2),(z,3)]. If only one argument is required, it must still be passed as a 1-element tuple.
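A minimal sketch of the contract between jobs() and load_data follows. The standalone functions and file names are hypothetical; in a real subclass both would be methods, and the job manager, not a list comprehension, would dispatch the calls.

```python
def jobs(input_files):
    """Return one argument tuple per parallelized job (sketch)."""
    # A single argument must still be wrapped in a 1-element tuple.
    return [(path,) for path in sorted(input_files)]


def load_data(input_file):
    # Hypothetical parser: here it only records which file it was given.
    return {"_id": input_file}


job_arguments = jobs(["chr2.tsv", "chr1.tsv", "chr3.tsv"])
# Each tuple is unpacked into load_data's positional arguments, in order.
results = [load_data(*arguments) for arguments in job_arguments]
```

Returning `(path,)` rather than `path` matters because a bare string would be unpacked character by character when passed as `*arguments`.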

async update_data(batch_size, job_manager=None)[source]

Iterate over load_data() to pull data and store it

exception biothings.hub.dataload.uploader.ResourceError[source]

Bases: Exception

exception biothings.hub.dataload.uploader.ResourceNotReady[source]

Bases: Exception

class biothings.hub.dataload.uploader.UploaderManager(poll_schedule=None, *args, **kwargs)[source]

Bases: BaseSourceManager

After registering datasources, manager will orchestrate source uploading.

SOURCE_CLASS

alias of BaseSourceUploader

clean_stale_status()[source]

During startup, search for actions in progress that were interrupted and change their state to “canceled”. Ex: some downloading processes could have been interrupted; at startup, the “downloading” status should be changed to “canceled” to reflect the actual state of these datasources. This must be overridden in a subclass.

async create_and_load(klass, *args, **kwargs)[source]
async create_and_update_master(klass, dry=False)[source]
create_instance(klass)[source]
filter_class(klass)[source]

Gives opportunity for subclass to check given class and decide to keep it or not in the discovery process. Returning None means “skip it”.

get_source_ids()[source]

Return displayable list of registered source names (not private)

poll(state, func)[source]

Search for sources in collection ‘col’ with a pending flag list containing ‘state’ and call ‘func’ for each document found (with doc as the only param).
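The polling logic can be approximated as follows. This sketch replaces the database collection with a list of dicts and assumes the pending flags live under a "pending" key; both are assumptions for illustration, and the signature differs from the real method.

```python
def poll(documents, state, func):
    """Call func(doc) for each document whose pending list contains state (sketch)."""
    for doc in documents:
        # Documents without a pending list are simply skipped.
        if state in doc.get("pending", []):
            func(doc)


# Hypothetical source documents standing in for collection 'col'.
source_documents = [
    {"_id": "source_a", "pending": ["upload"]},
    {"_id": "source_b", "pending": []},
    {"_id": "source_c"},
    {"_id": "source_d", "pending": ["inspect", "upload"]},
]

triggered = []
poll(source_documents, "upload", lambda doc: triggered.append(doc["_id"]))
```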

register_classes(klasses)[source]

Register each class in the self.register dict. The key will be used to retrieve the source class, create an instance and run methods from it. It must be implemented in a subclass, as each manager may need to access its sources differently, based on different keys.

source_info(source=None)[source]
update_source_meta(src, dry=False)[source]

Trigger update for registered resource named ‘src’.

upload_all(raise_on_error=False, **kwargs)[source]

Trigger upload processes for all registered resources. **kwargs are passed to upload_src() method

upload_info()[source]
upload_src(src, *args, **kwargs)[source]

Trigger upload for registered resource named ‘src’. Other args are passed to uploader’s load() method

biothings.hub.dataload.uploader.set_pending_to_upload(src_name)[source]

biothings.hub.dataload.validator