Hub component

The purpose of the BioThings hub component is to allow you to easily automate the parsing and uploading of your data to an Elasticsearch backend.

biothings.hub

class biothings.hub.HubCommands[source]
class biothings.hub.HubSSHServer[source]
begin_auth(username)[source]

Authentication has been requested by the client

This method will be called when authentication is attempted for the specified user. Applications should use this method to prepare whatever state they need to complete the authentication, such as loading in the set of authorized keys for that user. If no authentication is required for this user, this method should return False to cause the authentication to immediately succeed. Otherwise, it should return True to indicate that authentication should proceed.

Parameters

username (str) – The name of the user being authenticated

Returns

A bool indicating whether authentication is required

connection_lost(exc)[source]

Called when a connection is lost or closed

This method is called when a connection is closed. If the connection is shut down cleanly, exc will be None. Otherwise, it will be an exception explaining the reason for the disconnect.

connection_made(connection)[source]

Called when a connection is made

This method is called when a new TCP connection is accepted. The connection parameter should be stored if needed for later use.

password_auth_supported()[source]

Return whether or not password authentication is supported

This method should return True if password authentication is supported. Applications wishing to support it must have this method return True and implement validate_password() to return whether or not the password provided by the client is valid for the user being authenticated.

By default, this method returns False indicating that password authentication is not supported.

Returns

A bool indicating if password authentication is supported or not

session_requested()[source]

Handle an incoming session request

This method is called when a session open request is received from the client, indicating it wishes to open a channel to be used for running a shell, executing a command, or connecting to a subsystem. If the application wishes to accept the session, it must override this method to return either an SSHServerSession object to use to process the data received on the channel or a tuple consisting of an SSHServerChannel object created with create_server_channel and an SSHServerSession, if the application wishes to pass non-default arguments when creating the channel.

If blocking operations need to be performed before the session can be created, a coroutine which returns an SSHServerSession object can be returned instead of the session iself. This can be either returned directly or as a part of a tuple with an SSHServerChannel object.

To reject this request, this method should return False to send back a “Session refused” response or raise a ChannelOpenError exception with the reason for the failure.

The details of what type of session the client wants to start will be delivered to methods on the SSHServerSession object which is returned, along with other information such as environment variables, terminal type, size, and modes.

By default, all session requests are rejected.

Returns

One of the following:

  • An SSHServerSession object or a coroutine which returns an SSHServerSession

  • A tuple consisting of an SSHServerChannel and the above

  • A callable or coroutine handler function which takes AsyncSSH stream objects for stdin, stdout, and stderr as arguments

  • A tuple consisting of an SSHServerChannel and the above

  • False to refuse the request

Raises

ChannelOpenError if the session shouldn’t be accepted

validate_password(username, password)[source]

Return whether password is valid for this user

This method should return True if the specified password is a valid password for the user being authenticated. It must be overridden by applications wishing to support password authentication.

If the password provided is valid but expired, this method may raise PasswordChangeRequired to request that the client provide a new password before authentication is allowed to complete. In this case, the application must override change_password() to handle the password change request.

This method may be called multiple times with different passwords provided by the client. Applications may wish to limit the number of attempts which are allowed. This can be done by having password_auth_supported() begin returning False after the maximum number of attempts is exceeded.

If blocking operations need to be performed to determine the validity of the password, this method may be defined as a coroutine.

By default, this method returns False for all passwords.

Parameters
  • username (str) – The user being authenticated

  • password (str) – The password sent by the client

Returns

A bool indicating if the specified password is valid for the user being authenticated

Raises

PasswordChangeRequired if the password provided is expired and needs to be changed
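
A minimal sketch, assuming the server runs inside a configured hub application; the subclass name and the PASSWORDS dict below are hypothetical and only illustrate how password_auth_supported() and validate_password() work together:

from biothings.hub import HubSSHServer

class PasswordHubSSHServer(HubSSHServer):
    PASSWORDS = {"guest": "changeme"}  # hypothetical in-memory credential store

    def password_auth_supported(self):
        # advertise password authentication to clients
        return True

    def validate_password(self, username, password):
        # accept only if the provided password matches the stored one
        return self.PASSWORDS.get(username) == password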

class biothings.hub.HubSSHServerSession(name, shell)[source]
break_received(msec)[source]

The client has sent a break

This method is called when the client requests that the server perform a break operation on the terminal. If the break is performed, this method should return True. Otherwise, it should return False.

By default, this method returns False indicating that no break was performed.

Parameters

msec (int) – The duration of the break in milliseconds

Returns

A bool to indicate if the break operation was performed or not

connection_made(chan)[source]

Called when a channel is opened successfully

This method is called when a channel is opened successfully. The channel parameter should be stored if needed for later use.

Parameters

chan (SSHClientChannel) – The channel which was successfully opened.

data_received(data, datatype)[source]

Called when data is received on the channel

This method is called when data is received on the channel. If an encoding was specified when the channel was created, the data will be delivered as a string after decoding with the requested encoding. Otherwise, the data will be delivered as bytes.

Parameters
  • data (str or bytes) – The data received on the channel

  • datatype – The extended data type of the data, from extended data types

eof_received()[source]

Called when EOF is received on the channel

This method is called when an end-of-file indication is received on the channel, after which no more data will be received. If this method returns True, the channel remains half open and data may still be sent. Otherwise, the channel is automatically closed after this method returns. This is the default behavior for classes derived directly from SSHSession, but not when using the higher-level streams API. Because input is buffered in that case, streaming sessions enable half-open channels to allow applications to respond to input read after an end-of-file indication is received.

exec_requested(command)[source]

The client has requested to execute a command

This method should be implemented by the application to perform whatever processing is required when a client makes a request to execute a command. It should return True to accept the request, or False to reject it.

If the application returns True, the session_started() method will be called once the channel is fully open. No output should be sent until this method is called.

By default this method returns False to reject all requests.

Parameters

command (str) – The command the client has requested to execute

Returns

A bool indicating if the exec request was allowed or not

session_started()[source]

Called when the session is started

This method is called when a session has started up. For client and server sessions, this will be called once a shell, exec, or subsystem request has been successfully completed. For TCP and UNIX domain socket sessions, it will be called immediately after the connection is opened.

shell_requested()[source]

The client has requested a shell

This method should be implemented by the application to perform whatever processing is required when a client makes a request to open an interactive shell. It should return True to accept the request, or False to reject it.

If the application returns True, the session_started() method will be called once the channel is fully open. No output should be sent until this method is called.

By default this method returns False to reject all requests.

Returns

A bool indicating if the shell request was allowed or not

biothings.hub.get_schedule(loop)[source]

Try to render jobs in a human-readable way…

biothings.hub.status(managers)[source]

Return a global hub status (number of sources, documents, etc…) according to available managers

api

class biothings.hub.api.EndpointDefinition[source]

manager

exception biothings.hub.api.manager.APIManagerException[source]

handlers

base
class biothings.hub.api.handlers.base.BaseHandler(application, request, **kwargs)[source]
initialize(managers, **kwargs)[source]

Hook for subclass initialization. Called for each request.

A dictionary passed as the third argument of a url spec will be supplied as keyword arguments to initialize().

Example:

class ProfileHandler(RequestHandler):
    def initialize(self, database):
        self.database = database

    def get(self, username):
        ...

app = Application([
    (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])
class biothings.hub.api.handlers.base.DefaultHandler(application, request, **kwargs)[source]
set_default_headers()[source]

Override this to set HTTP headers at the beginning of the request.

For example, this is the place to set a custom Server header. Note that setting such headers in the normal flow of request processing may not do what you want, since headers may be reset during error handling.

write(result)[source]

Writes the given chunk to the output buffer.

To write the output to the network, use the flush() method below.

If the given chunk is a dictionary, we write it as JSON and set the Content-Type of the response to be application/json. (if you want to send JSON as a different Content-Type, call set_header after calling write()).

Note that lists are not converted to JSON because of a potential cross-site security vulnerability. All JSON output should be wrapped in a dictionary. More details at http://haacked.com/archive/2009/06/25/json-hijacking.aspx/ and https://github.com/facebook/tornado/issues/1009

write_error(status_code, **kwargs)[source]

Override to implement custom error pages.

write_error may call write, render, set_header, etc to produce output as usual.

If this error was caused by an uncaught exception (including HTTPError), an exc_info triple will be available as kwargs["exc_info"]. Note that this exception may not be the “current” exception for purposes of methods like sys.exc_info() or traceback.format_exc.

class biothings.hub.api.handlers.base.GenericHandler(application, request, **kwargs)[source]
initialize(shell, **kwargs)[source]

Hook for subclass initialization. Called for each request.

A dictionary passed as the third argument of a url spec will be supplied as keyword arguments to initialize().

Example:

class ProfileHandler(RequestHandler):
    def initialize(self, database):
        self.database = database

    def get(self, username):
        ...

app = Application([
    (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])
class biothings.hub.api.handlers.base.RootHandler(application, request, **kwargs)[source]
initialize(features, hub_name=None, **kwargs)[source]

Hook for subclass initialization. Called for each request.

A dictionary passed as the third argument of a url spec will be supplied as keyword arguments to initialize().

Example:

class ProfileHandler(RequestHandler):
    def initialize(self, database):
        self.database = database

    def get(self, username):
        ...

app = Application([
    (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])
shell
class biothings.hub.api.handlers.shell.ShellHandler(application, request, **kwargs)[source]
initialize(shell, shellog, **kwargs)[source]

Hook for subclass initialization. Called for each request.

A dictionary passed as the third argument of a url spec will be supplied as keyword arguments to initialize().

Example:

class ProfileHandler(RequestHandler):
    def initialize(self, database):
        self.database = database

    def get(self, username):
        ...

app = Application([
    (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])
upload
class biothings.hub.api.handlers.upload.UploadHandler(application, request, **kwargs)[source]
data_received(chunk)[source]

Implement this method to handle streamed request data.

Requires the .stream_request_body decorator.

initialize(upload_root, **kwargs)[source]

Hook for subclass initialization. Called for each request.

A dictionary passed as the third argument of a url spec will be supplied as keyword arguments to initialize().

Example:

class ProfileHandler(RequestHandler):
    def initialize(self, database):
        self.database = database

    def get(self, username):
        ...

app = Application([
    (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])
prepare()[source]

Called at the beginning of a request before get/post/etc.

Override this method to perform common initialization regardless of the request method.

Asynchronous support: Decorate this method with .gen.coroutine or use async def to make it asynchronous (the asynchronous decorator cannot be used on prepare). If this method returns a .Future execution will not proceed until the .Future is done.

New in version 3.1: Asynchronous support.

ws
class biothings.hub.api.handlers.ws.HubDBListener[source]

Get events from Hub DB and propagate them through the websocket instance

class biothings.hub.api.handlers.ws.WebSocketConnection(session, listeners)[source]

Listen to Hub DB through a listener object, and publish events to any client connected

SockJSConnection.__init__() takes only a session as argument, and there’s no way to pass custom settings. In order to use that class, we need to use partial to partially init the instance with ‘listeners’ and let the rest use the ‘session’ parameter:

pconn = partial(WebSocketConnection, listeners=listeners)
ws_router = sockjs.tornado.SockJSRouter(pconn, "/path")

on_close()[source]

Default on_close handler.

on_message(message)[source]

Default on_message handler. Must be overridden in your application

on_open(info)[source]

Default on_open() handler.

Override when you need to do some initialization or request validation. If you return False, connection will be rejected.

You can also throw Tornado HTTPError to close connection.

request

ConnectionInfo object which contains caller IP address, query string parameters and cookies associated with this request (if any).

dataload

dumper

class biothings.hub.dataload.dumper.DummyDumper(*args, **kwargs)[source]

DummyDumper will do nothing… (useful for datasources that can’t be downloaded anymore but still need to be integrated, ie. fill src_dump, etc…)

dump(force=False, job_manager=None, *args, **kwargs)[source]

Dump (ie. download) the resource as needed. This should be called after instance creation. The ‘force’ argument will force the dump; it is passed to the create_todump_list() method.

prepare_client()[source]

Do the initialization needed to make the client ready to dump files

exception biothings.hub.dataload.dumper.DumperException[source]
class biothings.hub.dataload.dumper.DumperManager(job_manager, datasource_path='dataload.sources', *args, **kwargs)[source]
call(src, method_name, *args, **kwargs)[source]

Create a dumper for datasource “src” and call method “method_name” on it, with the given arguments. Used to create arbitrary calls on a dumper. “method_name” within the dumper definition must be a coroutine.

clean_stale_status()[source]

During startup, search for actions in progress which would have been interrupted and change their state to “canceled”. Ex: some downloading processes could have been interrupted; at startup, their “downloading” status should be changed to “canceled” so as to reflect the actual state of these datasources. This must be overridden in subclasses.

dump_all(force=False, **kwargs)[source]

Run all dumpers, except manual ones

get_source_ids()[source]

Return displayable list of registered source names (not private)

register_classes(klasses)[source]

Register each class in the self.register dict. The key will be used to retrieve the source class, create an instance and run methods from it. It must be implemented in subclasses as each manager may need to access its sources differently, based on different keys.

schedule_all(raise_on_error=False, **kwargs)[source]

Run all dumpers, except manual ones

class biothings.hub.dataload.dumper.FilesystemDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

This dumper works locally and copies (or moves) files to the datasource folder

download(remotefile, localfile)[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about the remote file (depends on the actual client).

need_prepare()[source]

Check whether some prepare step should be executed before running the dump

prepare_client()[source]

Check that the ‘cp’ and ‘mv’ executables exist…

release_client()[source]

Do whatever is necessary (like closing the network connection) to “release” the client

remote_is_better(remotefile, localfile)[source]

Compared to the local file, check whether the remote file is worth downloading (e.g. because it’s bigger or newer).

class biothings.hub.dataload.dumper.GitDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Git dumper gets data from a git repo. Repo is stored in SRC_ROOT_FOLDER (without versioning) and then versions/releases are fetched in SRC_ROOT_FOLDER/<release>

download(remotefile, localfile)[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about the remote file (depends on the actual client).

dump(release='HEAD', force=False, job_manager=None, **kwargs)[source]

Dump (ie. download) the resource as needed. This should be called after instance creation. The ‘force’ argument will force the dump; it is passed to the create_todump_list() method.

need_prepare()[source]

Check whether some prepare step should be executed before running the dump

property new_data_folder

Generate a new data folder path using src_root_folder and the specified suffix attribute. Also sync the current (aka previous) data folder previously registered in the database. This method typically has to be called in create_todump_list() when the dumper actually knows some information about the resource, like the actual release.

prepare_client()[source]

Check that the ‘git’ executable exists

release_client()[source]

Do whatever is necessary (like closing the network connection) to “release” the client

remote_is_better(remotefile, localfile)[source]

Compared to the local file, check whether the remote file is worth downloading (e.g. because it’s bigger or newer).

class biothings.hub.dataload.dumper.GoogleDriveDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]
download(remoteurl, localfile)[source]
remoteurl is a Google Drive link containing a document ID (e.g. a sharing URL). It can also be just a document ID.

prepare_client()[source]

Do the initialization needed to make the client ready to dump files

remote_is_better(remotefile, localfile)[source]

Compared to the local file, check whether the remote file is worth downloading (e.g. because it’s bigger or newer).

class biothings.hub.dataload.dumper.HTTPDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Dumper using HTTP protocol and “requests” library

download(remoteurl, localfile, headers={})[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about the remote file (depends on the actual client).

need_prepare()[source]

Check whether some prepare step should be executed before running the dump

prepare_client()[source]

Do the initialization needed to make the client ready to dump files

release_client()[source]

Do whatever is necessary (like closing the network connection) to “release” the client

remote_is_better(remotefile, localfile)[source]

Compared to the local file, check whether the remote file is worth downloading (e.g. because it’s bigger or newer).

class biothings.hub.dataload.dumper.LastModifiedBaseDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Use SRC_URLS as a list of URLs to download, and implement create_todump_list() according to that list. Should be used in parallel with a dumper speaking the actual underlying protocol

create_todump_list(force=False)[source]

Fill the self.to_dump list with dict(“remote”: remote_path, “local”: local_path) elements. This is the to-do list for the dumper. It’s a good place to check whether a file needs to be downloaded. If ‘force’ is True though, all files will be considered for download

set_release()[source]

Set the self.release attribute as the last-modified datetime found in the last SRC_URLS element (so the release is the datetime of the last file to download)

class biothings.hub.dataload.dumper.LastModifiedFTPDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

SRC_URLS contains a list of URLs pointing to files to download; FTP’s MDTM command is used to check whether files should be downloaded. The release is generated from the last file’s MDTM in SRC_URLS and formatted according to RELEASE_FORMAT. See also LastModifiedHTTPDumper, which works the same way but for the HTTP protocol. Note: this dumper is a wrapper over FTPDumper; one URL will give one FTPDumper instance.

download(urlremotefile, localfile, headers={})[source]

Download ‘remotefile’ to the local location defined by ‘localfile’. Return relevant information about the remote file (depends on the actual client).

prepare_client()[source]

Do the initialization needed to make the client ready to dump files

release_client()[source]

Do whatever is necessary (like closing the network connection) to “release” the client

remote_is_better(urlremotefile, localfile)[source]

Compared to the local file, check whether the remote file is worth downloading (e.g. because it’s bigger or newer).

set_release()[source]

Set the self.release attribute as the last-modified datetime found in the last SRC_URLS element (so the release is the datetime of the last file to download)

class biothings.hub.dataload.dumper.LastModifiedHTTPDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]

Given a list of URLs, check the Last-Modified header to see whether the file should be downloaded. A sub-class should only have to declare SRC_URLS. Optionally, another field name can be used instead of Last-Modified, but the date format must follow RFC 2616. If that header doesn’t exist, the data will always be downloaded (bypass). The release is generated from the last file’s Last-Modified in SRC_URLS, and formatted according to RELEASE_FORMAT.

remote_is_better(remotefile, localfile)[source]

Compared to the local file, check whether the remote file is worth downloading (e.g. because it’s bigger or newer).

set_release()[source]

Set the self.release attribute as the last-modified datetime found in the last SRC_URLS element (so the release is the datetime of the last file to download)
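
As a quick illustration, a minimal dumper sketch assuming a configured hub; the source name, folder, URL and schedule below are hypothetical:

from biothings.hub.dataload.dumper import LastModifiedHTTPDumper

class MyDataSourceDumper(LastModifiedHTTPDumper):
    SRC_NAME = "mydatasource"                        # hypothetical source name
    SRC_ROOT_FOLDER = "/data/mydatasource"           # downloaded files land here
    SRC_URLS = ["https://example.org/data/annotations.tsv.gz"]  # hypothetical URL
    SCHEDULE = "0 6 * * *"                           # optional: daily dump, cron-like syntax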

class biothings.hub.dataload.dumper.ManualDumper(*args, **kwargs)[source]

This dumper assists the user in dumping a resource. It usually expects the files to be downloaded first (sometimes there’s no easy way to automate this process). Once downloaded, a call to dump() will make sure everything is fine in terms of files and metadata

dump(path, release=None, force=False, job_manager=None, **kwargs)[source]

Dump (ie. download) the resource as needed. This should be called after instance creation. The ‘force’ argument will force the dump; it is passed to the create_todump_list() method.

property new_data_folder

Generate a new data folder path using src_root_folder and the specified suffix attribute. Also sync the current (aka previous) data folder previously registered in the database. This method typically has to be called in create_todump_list() when the dumper actually knows some information about the resource, like the actual release.

prepare_client()[source]

Do the initialization needed to make the client ready to dump files

uploader

class biothings.hub.dataload.uploader.BaseSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

Default datasource uploader. Database storage can be done in batch or line by line. Duplicated records aren’t allowed

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

classmethod create(db_conn_info, *args, **kwargs)[source]

Factory-like method; just returns an instance of this uploader (used by SourceManager). May be overridden in sub-classes to generate more than one instance per class, like a true factory. This is useful when a resource is split across different collections but the data structure doesn’t change (it’s really just data split across multiple collections, usually for parallelization purposes). Instead of having an actual class for each split collection, the factory will generate them on-the-fly.

classmethod get_mapping()[source]

Return ES mapping

get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

get_predicates()[source]

Return a list of predicates (functions returning true/false, as in math logic) which instruct/dictate whether the job manager should start a job (process/thread)

load(steps=['data', 'post', 'master', 'clean'], force=False, batch_size=10000, job_manager=None, **kwargs)[source]

Main resource load process; reads data from doc_c using chunks sized by batch_size. steps defines the different processes used to load the resource:
  • “data”: will store actual data into single collections

  • “post”: will perform post data load operations

  • “master”: will register the master document in src_master

load_data(data_folder)[source]

Parse data inside data_folder and return structure ready to be inserted in database

make_temp_collection()[source]

Create a temp collection for dataloading, e.g., entrez_geneinfo_INEMO.

post_update_data(steps, force, batch_size, job_manager, **kwargs)[source]

Override as needed to perform operations after data has been uploaded

prepare(state={})[source]

Sync uploader information with database (or given state dict)

prepare_src_dump()[source]

Sync with the src_dump collection and the collection information (src_doc). Return the src_dump collection

register_status(status, subkey='upload', **extra)[source]

Register step status, ie. status for a sub-resource

switch_collection()[source]

After a successful load, rename temp_collection to the regular collection name, and rename the existing collection to a temp name for archiving purposes.

unprepare()[source]

Reset anything that’s not pickleable (so self can be pickled). Return what’s been reset as a dict, so self can be restored once unpickled.

update_data(batch_size, job_manager)[source]

Iterate over load_data() to pull data and store it
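
A minimal uploader sketch; the source name, file layout and mapping below are assumptions, not part of the library:

import json
import os

from biothings.hub.dataload.uploader import BaseSourceUploader

class MyDataSourceUploader(BaseSourceUploader):
    name = "mydatasource"  # uploader (and target collection) name

    def load_data(self, data_folder):
        # parse files found in data_folder and yield documents ready for storage
        with open(os.path.join(data_folder, "annotations.json")) as f:  # hypothetical file
            for line in f:
                yield json.loads(line)

    @classmethod
    def get_mapping(cls):
        # ES mapping for the documents produced above (illustrative)
        return {"annotation": {"type": "keyword"}}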

class biothings.hub.dataload.uploader.DummySourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

Dummy uploader; won’t upload any data, assuming the data is already there, but makes sure every other bit of information is there for the overall process (useful when online data isn’t available anymore)

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

prepare_src_dump()[source]

Sync with the src_dump collection and the collection information (src_doc). Return the src_dump collection

update_data(batch_size, job_manager=None, release=None)[source]

Iterate over load_data() to pull data and store it

class biothings.hub.dataload.uploader.IgnoreDuplicatedSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

Same as the default uploader, but will store records and ignore any duplicated-record errors that occur (use with caution…). Storage is done using batched, unordered bulk operations.

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

class biothings.hub.dataload.uploader.MergerSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

storage_class

alias of biothings.hub.dataload.storage.MergerStorage

class biothings.hub.dataload.uploader.NoBatchIgnoreDuplicatedSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

Same as the default uploader, but will store records and ignore any duplicated-record errors that occur (use with caution…). Storage is done line by line (slow, not using a batch) but preserves the order of data in the input file.

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

storage_class

alias of biothings.hub.dataload.storage.NoBatchIgnoreDuplicatedStorage

class biothings.hub.dataload.uploader.NoDataSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

This uploader won’t upload any data and won’t even assume there’s actual data (different from DummySourceUploader on this point). It’s useful for instance when a mapping needs to be stored (get_mapping()) but the data doesn’t come from an actual upload (ie. it’s generated)

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

storage_class

alias of biothings.hub.dataload.storage.NoStorage

update_data(batch_size, job_manager=None)[source]

Iterate over load_data() to pull data and store it

class biothings.hub.dataload.uploader.ParallelizedSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]

db_conn_info is a database connection info tuple (host,port) to fetch/store information about the datasource’s state.

jobs()[source]

Return a list of (*arguments) passed to self.load_data, in order, one per parallelized job. Ex: [(x,1),(y,2),(z,3)]. If only one argument is required, it still must be passed as a 1-element tuple

update_data(batch_size, job_manager=None)[source]

Iterate over load_data() to pull data and store it
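
A sketch of a parallelized uploader, where jobs() returns one tuple of load_data() arguments per parallel job; the file layout and names are assumptions:

import glob
import os

from biothings.hub.dataload.uploader import ParallelizedSourceUploader

class MyChromosomeUploader(ParallelizedSourceUploader):
    name = "mydatasource"

    def jobs(self):
        # one job per chromosome file; each tuple is passed to load_data(*args)
        # (self.data_folder is expected to be set by the hub before the upload starts)
        files = glob.glob(os.path.join(self.data_folder, "chr*.tsv"))
        return [(f,) for f in files]

    def load_data(self, input_file):
        with open(input_file) as f:
            for line in f:
                _id, value = line.strip().split("\t")[:2]
                yield {"_id": _id, "value": value}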

exception biothings.hub.dataload.uploader.ResourceError[source]
exception biothings.hub.dataload.uploader.ResourceNotReady[source]
class biothings.hub.dataload.uploader.UploaderManager(poll_schedule=None, *args, **kwargs)[source]

After registering datasources, manager will orchestrate source uploading.

SOURCE_CLASS

alias of BaseSourceUploader

clean_stale_status()[source]

During startup, search for actions in progress which would have been interrupted and change their state to “canceled”. Ex: some downloading processes could have been interrupted; at startup, their “downloading” status should be changed to “canceled” so as to reflect the actual state of these datasources. This must be overridden in subclasses.

filter_class(klass)[source]

Gives the subclass an opportunity to check a given class and decide whether to keep it in the discovery process. Returning None means “skip it”.

get_source_ids()[source]

Return displayable list of registered source names (not private)

poll(state, func)[source]

Search for sources in collection ‘col’ with a pending flag list containing ‘state’ and call ‘func’ for each document found (with the doc as the only param)

register_classes(klasses)[source]

Register each class in the self.register dict. The key will be used to retrieve the source class, create an instance and run methods from it. It must be implemented in subclasses as each manager may need to access its sources differently, based on different keys.

upload_all(raise_on_error=False, **kwargs)[source]

Trigger upload processes for all registered resources. **kwargs are passed to upload_src() method

upload_src(src, *args, **kwargs)[source]

Trigger upload for registered resource named ‘src’. Other args are passed to uploader’s load() method

biothings.hub.dataload.uploader.upload_worker(name, storage_class, loaddata_func, col_name, batch_size, batch_num, *args)[source]

Pickleable job launcher, typically run from multiprocessing. storage_class will be instantiated with col_name, the destination collection name. loaddata_func is the parsing/loading function, called with *args.

storage

class biothings.hub.dataload.storage.MergerStorage(db, dest_col_name, logger=<module 'logging'>)[source]

This storage will try to merge documents when encountering duplicated-record errors. It’s useful when data is parsed using an iterator. A record can be stored in the database, then later another record with the same ID is sent to the db, raising a duplicate error. These two documents would have been merged beforehand by a ‘put all in memory’ parser. Since data is read line by line here, the merge is done while storing.

process(doc_d, batch_size)[source]

Process iterable to store data. Must return the number of inserted records (even 0 if none)

class biothings.hub.dataload.storage.NoBatchIgnoreDuplicatedStorage(db, dest_col_name, logger=<module 'logging'>)[source]

You should use IgnoreDuplicatedStorage, which works using batches and is thus way faster…

process(doc_d, batch_size)[source]

Process iterable to store data. Must return the number of inserted records (even 0 if none)

class biothings.hub.dataload.storage.NoStorage(db_info, dest_col_name, logger)[source]

This is a kind of place-holder: this storage will just store nothing… (but it will respect the storage interface)

class biothings.hub.dataload.storage.RootKeyMergerStorage(db, dest_col_name, logger=<module 'logging'>)[source]

Just like MergerStorage, this storage deals with duplicate errors by appending the key’s content to the existing document. Keys in the existing document will be converted to a list as needed.

Note

  • root keys must have the same type in each document

  • inner structures aren’t merged together; the merge happens at the root key level

exception biothings.hub.dataload.storage.StorageException[source]
class biothings.hub.dataload.storage.UpsertStorage(db, dest_col_name, logger=<module 'logging'>)[source]

Insert or update documents, based on _id

process(iterable, batch_size)[source]

Process iterable to store data. Must return the number of inserted records (even 0 if none)
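
A sketch showing how an uploader can pick one of these storage classes so that duplicated _ids get merged instead of raising errors; the uploader name and documents are hypothetical:

from biothings.hub.dataload import storage, uploader

class MyMergingUploader(uploader.BaseSourceUploader):
    name = "mydatasource"
    storage_class = storage.MergerStorage  # merge documents sharing the same _id

    def load_data(self, data_folder):
        # records with identical _ids will be merged at storage time
        yield {"_id": "doc1", "tags": ["a"]}
        yield {"_id": "doc1", "other": "b"}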

source

class biothings.hub.dataload.source.SourceManager(source_list, dump_manager, upload_manager, data_plugin_manager)[source]

Helper class to get information about a datasource, whether it has a dumper and/or uploaders associated.

reset(name, key='upload', subkey=None)[source]

Reset, ie. delete, internal data (a src_dump document) for the given source name, key and subkey. This method is useful to clean outdated information in the Hub’s internal database.

Ex: key=upload, name=mysource, subkey=mysubsource will delete the entry in the corresponding src_dump doc (_id=mysource), under key “upload”, for the sub-source named “mysubsource”.

“key” can be either ‘download’, ‘upload’ or ‘inspect’. Because there’s no such notion of subkey for dumpers (ie. ‘download’), subkey is optional.

sumup_source(src, detailed=False)[source]

Return minimal info about src

datainspect

inspector

exception biothings.hub.datainspect.inspector.InspectorError[source]

databuild

builder

exception biothings.hub.databuild.builder.BuilderException[source]
class biothings.hub.databuild.builder.DataBuilder(build_name, source_backend, target_backend, log_folder, doc_root_key='root', mappers=[], default_mapper_class=<class 'biothings.hub.databuild.mapper.TransparentMapper'>, sources=None, target_name=None, **kwargs)[source]

Generic data builder.

document_cleaner(src_name, *args, **kwargs)[source]

Return a function taking a document as argument, cleaning the doc as needed, and returning that doc. If no cleaning is needed, return None. Note: the returned function must be pickleable; be careful with lambdas and closures.

get_build_version()[source]

Generate an arbitrary major build version. The default uses a timestamp (YYMMDD). The ‘.’ char isn’t allowed in a build version as it’s reserved for minor versions

get_custom_metadata(sources, job_manager)[source]

If more metadata is required, this method can be overridden and should return a dict. The existing metadata dict will be updated with that one before storage.
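
A small sketch of overriding this hook in a builder subclass; the keys and values are purely illustrative:

from biothings.hub.databuild.builder import DataBuilder

class MyDataBuilder(DataBuilder):
    def get_custom_metadata(self, sources, job_manager):
        # this dict is merged into the existing build metadata before storage
        return {"assembly": "hg38", "note": "nightly build"}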

get_mapping(sources)[source]

Merge mappings from src_master

get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

get_predicates()[source]

Return a list of predicates (functions returning true/false, as in math logic) which instruct/dictate whether the job manager should start a job (process/thread)

get_stats(sources, job_manager)[source]

Return a dictionary of metadata for this build. It’s usually app-specific and this method may be overridden as needed. By default though, the total number of documents in the merged collection is stored (key “total”).

The returned dictionary will be merged with any existing metadata in the src_build collection. This behavior can be changed by setting a special key within the metadata dict: {“__REPLACE__” : True} will… replace existing metadata with the one returned here.

“job_manager” is passed in case parallelization is needed. Be aware that this method already runs in a dedicated thread; in order to use job_manager, the following code must be used at the very beginning of its implementation: asyncio.set_event_loop(job_manager.loop)

merge(sources=None, target_name=None, force=False, ids=None, steps=['merge', 'post', 'metadata'], job_manager=None, *args, **kwargs)[source]

Merge the given sources into a collection named target_name. If the sources argument is omitted, all sources defined for this merger will be merged together, according to what is defined in src_build_config. If target_name is not defined, a unique name will be generated.

Optional parameters:
  • force=True will bypass any safety check

  • ids: list of _ids to merge, specifically. If None, all documents are merged.

  • steps:
    • merge: actual merge step, create merged documents and store them

    • post: once merge, run optional post-merge process

    • metadata: generate and store metadata (depends on merger; usually specifies the amount of merged data, source versions, etc…)

merge_sources(source_names, steps=['merge', 'post'], batch_size=100000, ids=None, job_manager=None)[source]

Merge resources from the given source_names or from the build config. Identify root document sources from the list so they are processed first. ids can be a list of documents (_ids) to be merged in particular.

register_status(status, transient=False, init=False, **extra)[source]

Register the current build status. A build status is a record in src_build; the key used in this dict is the target_name. Then, any operation acting on this target_name is registered in a “jobs” list.

resolve_sources(sources)[source]

A source can be a string that may contain regex chars. It’s useful when you have plenty of sub-collections prefixed with a source name. For instance, given a source named “blah” stored in as many collections as chromosomes, instead of passing each name as “blah_1”, “blah_2”, etc…, “blah_.*” can be specified in build_config. This method resolves potential regexed source names into real, existing collection names

unprepare()[source]

Reset anything that’s not pickleable (so self can be pickled). Return what’s been reset as a dict, so self can be restored once unpickled.

class biothings.hub.databuild.builder.LinkDataBuilder(build_name, source_backend, target_backend, *args, **kwargs)[source]

LinkDataBuilder creates a link to the original datasource to be merged, without actually copying the data (the merged collection remains empty). This builder is only valid when a single datasource (thus no real merge) is declared in the list of sources to be merged, and is useful to prevent data duplication between the datasource itself and the resulting merged collection.

exception biothings.hub.databuild.builder.ResumeException[source]
biothings.hub.databuild.builder.fix_batch_duplicates(docs, fail_if_struct_is_different=False)[source]

Remove duplicates from docs based on _id. If the _id is the same but the structure is different (not real “duplicates”, but different documents with the same _id), merge the docs together (dict.update), or raise an error if fail_if_struct_is_different is set.

differ

exception biothings.hub.databuild.differ.DifferException[source]

mapper

class biothings.hub.databuild.mapper.BaseMapper(name=None, *args, **kwargs)[source]

Basic mapper used to convert documents. If the mapper’s name matches the source’s metadata mapper, the mapper.convert(docs) call will be used to process/convert the passed documents

load()[source]

Do whatever is required to fill the mapper with mapping data. Can be called multiple times; only the first call will load data

process(docs)[source]

Convert given docs into other docs.

class biothings.hub.databuild.mapper.IDBaseMapper(name=None, convert_func=None, *args, **kwargs)[source]

Provide mapping between different sources

‘name’ may match a “mapper” metadata field (see uploaders). If None, the mapper will be applied to any document from a resource without a “mapper” argument

process(docs, key_to_convert='_id', transparent=True)[source]

Process the ‘key_to_convert’ document key using the mapping. If transparent and there’s no match, the original key will be used (so there’s no change). Else, if there’s no match, the document will be discarded (default). Warning: the key to be translated must not be None (it’s considered a non-match)

translate(_id, transparent=False)[source]

Return _id translated through the mapper, or _id itself if not part of the mapper. If ‘transparent’ and there’s no match, the original _id will be returned

class biothings.hub.databuild.mapper.TransparentMapper(name=None, *args, **kwargs)[source]
load(*args, **kwargs)[source]

Do whatever is required to fill the mapper with mapping data. Can be called multiple times; only the first call will load data

process(docs, *args, **kwargs)[source]

Convert given docs into other docs.

prebuilder

syncer

exception biothings.hub.databuild.syncer.SyncerException[source]
biothings.hub.databuild.syncer.sync_es_jsondiff_worker(diff_file, es_config, new_db_col_names, batch_size, cnt, force=False, selfcontained=False, metadata={}, debug=False)[source]

Worker to sync data between a new mongo collection and an elasticsearch index

biothings.hub.databuild.syncer.sync_mongo_jsondiff_worker(diff_file, old_db_col_names, new_db_col_names, batch_size, cnt, force=False, selfcontained=False, metadata={}, debug=False)[source]

Worker to sync data between a new and an old mongo collection

dataindex

indexer

class biothings.hub.dataindex.indexer.ColdHotIndexer(*args, **kwargs)[source]

This indexer works with 2 mongo collections to create a single index: one premerge collection contains “cold” data, which never changes (it’s not updated); another collection contains “hot” data, which is regularly updated. The index is created by fetching the premerge documents. Then, documents from the hot collection are merged in by fetching docs from the index, updating them, and putting them back in the index.

get_mapping()[source]

collect mapping data from data sources.

index(hot_name, index_name, job_manager, steps=['index', 'post'], batch_size=10000, ids=None, mode='index')[source]

Same as the Indexer.index method but works with a cold/hot collection strategy: first index the cold collection, then complete the index with the hot collection (adding docs or merging them into existing docs within the index)

load_build()[source]

Load cold and hot build documents. Index settings are the ones declared in the hot build doc.

class biothings.hub.dataindex.indexer.DynamicIndexerFactory(urls, es_host, suffix='_current')[source]

In the context of autohub/standalone instances, create indexers with parameters taken from a versions.json URL. A list of URLs is provided so the factory knows how to create these indexers for each URL. There’s no way to “guess” an ES host from a URL, so this parameter must be specified as well, common to all URLs. The “suffix” param is added at the end of index names.

class biothings.hub.dataindex.indexer.Indexer(es_host, target_name=None, **kwargs)[source]

Basic indexer, reading documents from a mongo collection (target_name) and sending documents to ES.

enrich_final_mapping(final_mapping)[source]

final_mapping is the ES mapping ready to be sent (with “dynamic” and “all” at its root, for instance). This method gives an opportunity to add more mapping definitions not directly related to datasources, such as other root keys

get_index_creation_settings()[source]

Override to return a dict containing some extra settings for index creation. The dict will be merged with mandatory settings; see biothings.utils.es.ESIndexer.create_index for more.
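
A sketch of such an override, assuming the base implementation returns a dict; the setting shown is illustrative:

from biothings.hub.dataindex.indexer import Indexer

class MyIndexer(Indexer):
    def get_index_creation_settings(self):
        settings = super().get_index_creation_settings() or {}
        settings.update({"number_of_replicas": 0})  # extra ES index setting
        return settings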

get_mapping()[source]

collect mapping data from data sources.

get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

index(target_name, index_name, job_manager, steps=['index', 'post'], batch_size=10000, ids=None, mode='index', worker=None)[source]

Build an index named “index_name” with data from collection “target_collection”.

“ids” can be passed to selectively index documents.

“mode” can have the following values:
  • ‘purge’: will delete index if it exists

  • ‘resume’: will use the existing index and add documents. “ids” can be passed as a list of missing IDs, or, if not passed, ES will be queried to identify which IDs are missing for each batch in order to complete the index.

  • ‘merge’: will merge data with the existing index’s documents; used when the index is populated several distinct times (cold/hot merge for instance)

  • None (default): will create a new index, assuming it doesn’t already exist

load_build(target_name=None)[source]

Load build info from src_build collection.

post_index(target_name, index_name, job_manager, steps=['index', 'post'], batch_size=10000, ids=None, mode=None)[source]

Override in sub-class to add a post-index process. Method’s signature is the same as index() to get the full context. This method will run in a thread (using job_manager.defer_to_thread())

exception biothings.hub.dataindex.indexer.IndexerException[source]

snapshooter

dataexport

ids

biothings.hub.dataexport.ids.export_ids(col_name)[source]

Export all _ids from the collection named col_name. If col_name refers to a build where a cold_collection is defined, also extract its _ids and sort/uniq them to get the full list of _ids of the actual merged (cold+hot) collection. The output file is stored in DATA_EXPORT_FOLDER/ids, defaulting to <DATA_ARCHIVE_ROOT>/export/ids. The output filename is returned at the end, if successful.

biothings.hub.dataexport.ids.upload_ids(ids_file, redirect_from, s3_bucket, aws_key, aws_secret)[source]

Upload the file ids_file into s3_bucket and modify the redirect_from key’s metadata so the redirect_from link now points to ids_file. The redirect_from s3 key must exist.

dataplugin

assistant

exception biothings.hub.dataplugin.assistant.AssistantException[source]
class biothings.hub.dataplugin.assistant.AssistantManager(data_plugin_manager, dumper_manager, uploader_manager, keylookup=None, default_export_folder='hub/dataload/sources', *args, **kwargs)[source]
export(plugin_name, folder=None, what=['dumper', 'uploader', 'mapping'], purge=False)[source]

Export generated code for a given plugin name, in the given folder (or use DEFAULT_EXPORT_FOLDER if None). Exported information can be:
  • dumper: dumper class generated from the manifest

  • uploader: uploader class generated from the manifest

  • mapping: mapping generated from inspection or from the manifest

If “purge” is true, any existing folder/code will be deleted first; otherwise an error will be raised if some folder/files already exist.
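
A usage sketch, assuming an AssistantManager instance is available as assistant_manager and a plugin named "mydatasource_plugin" is registered (both names are hypothetical):

assistant_manager.export(
    "mydatasource_plugin",
    folder="hub/dataload/sources",           # default export folder
    what=["dumper", "uploader", "mapping"],
    purge=True,                              # overwrite any previously exported code
)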

load(autodiscover=True)[source]

Load plugins registered in internal Hub database and generate/register dumpers & uploaders accordingly. If autodiscover is True, also search DATA_PLUGIN_FOLDER for existing plugin directories not registered yet in the database, and register them automatically.

register_classes(klasses)[source]

Register each class in the self.register dict. The key will be used to retrieve the source class, create an instance and run methods from it. It must be implemented in subclasses as each manager may need to access its sources differently, based on different keys.

setup_log()[source]

Setup and return a logger instance

exception biothings.hub.dataplugin.assistant.LoaderException[source]

manager

class biothings.hub.dataplugin.manager.DataPluginManager(job_manager, datasource_path='dataload.sources', *args, **kwargs)[source]
class biothings.hub.dataplugin.manager.GitDataPlugin(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]
class biothings.hub.dataplugin.manager.ManualDataPlugin(*args, **kwargs)[source]
dump(*args, **kwargs)[source]

Dump (ie. download) resource as needed this should be called after instance creation ‘force’ argument will force dump, passing this to create_todump_list() method.

datarelease

publisher

exception biothings.hub.datarelease.publisher.PublisherException[source]

releasenote

datatransform

ciidstruct

CIIDStruct - case-insensitive id matching data structure

class biothings.hub.datatransform.ciidstruct.CIIDStruct(field=None, doc_lst=None)[source]

CIIDStruct - id structure for use with the DataTransform classes. The basic idea is to provide a structure that provides a list of (original_id, current_id) pairs.

This is a case-insensitive version of IDStruct.

Initialize the structure.

Parameters
  • field – field for documents to use as an initial id (optional)

  • doc_lst – list of documents to use when building an initial list (optional)

add(left, right)[source]

Add an (original_id, current_id) pair to the list. All string values are typecast to lowercase

find(where, ids)[source]

Case insensitive lookup of ids

datatransform_api

DataTransformAPI - classes around API-based key lookup.

class biothings.hub.datatransform.datatransform_api.BiothingsAPIEdge(lookup, fields, weight=1, label=None, url=None)[source]

APIEdge - IDLookupEdge object for API calls

Initialize the class.

Parameters

label – A label can be used for debugging purposes.

property client

property getter for client

edge_lookup(keylookup_obj, id_strct, debug=False)[source]

Follow an edge given a key.

This method uses the data in the edge object to convert one key to another key using an API.

init_state()[source]

Initialize state (pickleable member variables)

prepare_client()[source]

Load the biothings_client for the class

class biothings.hub.datatransform.datatransform_api.DataTransformAPI(input_types, output_types, *args, **kwargs)[source]

Perform key lookup or key conversion from one key type to another using an API endpoint as a data source.

This class uses BioThings APIs to convert from one key type to another. Base classes are used with the decorator syntax shown below:

@IDLookupMyChemInfo(input_types, output_types)
def load_document(doc_lst):
    for d in doc_lst:
        yield d

Lookup fields are configured in the ‘lookup_fields’ object, examples of which can be found in ‘IDLookupMyGeneInfo’ and ‘IDLookupMyChemInfo’.

Required Options:
  • input_types
    • ‘type’

    • (‘type’, ‘nested_source_field’)

    • [(‘type1’, ‘nested.source_field1’), (‘type2’, ‘nested.source_field2’), …]

  • output_types:
    • ‘type’

    • [‘type1’, ‘type2’]

Additional Options: see DataTransform class

Initialize the IDLookupAPI object.

key_lookup_batch(batchiter)[source]

Look up all keys for ids given in the batch iterator (1 block).

Parameters

batchiter – 1 block of records to look up keys for

class biothings.hub.datatransform.datatransform_api.DataTransformMyChemInfo(input_types, output_types=None, skip_on_failure=False, skip_w_regex=None)[source]

Single key lookup for MyChemInfo

Initialize the class by setting up the client object.

class biothings.hub.datatransform.datatransform_api.DataTransformMyGeneInfo(input_types, output_types=['entrezgene'], skip_on_failure=False, skip_w_regex=None)[source]

deprecated

Initialize the class by setting up the client object.

class biothings.hub.datatransform.datatransform_api.MyChemInfoEdge(lookup, field, weight=1, label=None, url=None)[source]

The MyChemInfoEdge uses the MyChem.info API to convert identifiers.

Parameters
  • lookup (str) – The field in the API to search with the input identifier.

  • field (str) – The field in the API to convert to.

  • weight (int) – Weights are used to prefer one path over another. The path with the lowest weight is preferred. The default weight is 1.
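
A sketch of declaring such an API-based edge in a conversion graph; the node names and MyChem.info field paths below are assumptions:

import networkx as nx

from biothings.hub.datatransform.datatransform_api import MyChemInfoEdge

graph = nx.DiGraph()
graph.add_node("pubchem")
graph.add_node("inchikey")
# query MyChem.info by pubchem.cid and read pubchem.inchikey from the hits
graph.add_edge("pubchem", "inchikey",
               object=MyChemInfoEdge("pubchem.cid", "pubchem.inchikey"))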

class biothings.hub.datatransform.datatransform_api.MyGeneInfoEdge(lookup, field, weight=1, label=None, url=None)[source]

The MyGeneInfoEdge uses the MyGene.info API to convert identifiers.

Parameters
  • lookup (str) – The field in the API to search with the input identifier.

  • field (str) – The field in the API to convert to.

  • weight (int) – Weights are used to prefer one path over another. The path with the lowest weight is preferred. The default weight is 1.

datatransform_mdb

DataTransform MDB module - class for performing key lookup using conversions described in a networkx graph.

class biothings.hub.datatransform.datatransform_mdb.CIMongoDBEdge(collection_name, lookup, field, weight=1, label=None)[source]

Case-insensitive MongoDBEdge

Parameters
  • collection_name (str) – The name of the MongoDB collection.

  • lookup (str) – The field that will match the input identifier in the collection.

  • field (str) – The output identifier field that will be read out of matching documents.

  • weight (int) – Weights are used to prefer one path over another. The path with the lowest weight is preferred. The default weight is 1.

collection_find(id_lst, lookup, field)[source]

Abstract out (as one line) the call to collection.find and use a case-insensitive collation

class biothings.hub.datatransform.datatransform_mdb.DataTransformMDB(graph, *args, **kwargs)[source]

Convert document identifiers from one type to another.

The DataTransformNetworkX module was written as a decorator class which should be applied to the load_data function of a BioThings uploader. The load_data function yields documents, which are then post-processed by __call__, and the ‘id’ key conversion is performed. A wiring sketch follows the parameter list below.

Parameters
  • graph – nx.DiGraph (networkx 2.1) configuration graph

  • input_types – A list of input types of the form (identifier, field) where identifier matches a node and field is an optional dotstring field for where the identifier should be read from (the default is ‘_id’).

  • output_types (list(str)) – A priority list of identifiers to convert to. These identifiers should match nodes in the graph.

  • id_priority_list (list(str)) – A priority list of identifiers to sort input and output types by.

  • skip_on_failure (bool) – If True, documents where identifier conversion fails will be skipped in the final document list.

  • skip_w_regex (bool) – Do not perform conversion if the identifier matches the regular expression provided to this argument. By default, this option is disabled.

  • skip_on_success (bool) – If True, documents where identifier conversion succeeds will be skipped in the final document list.

  • idstruct_class (class) – Override an internal data structure used by this module (advanced usage)

  • copy_from_doc (bool) – If true then an identifier is copied from the input source document regardless of whether it matches an edge or not (advanced usage).
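
A wiring sketch for DataTransformMDB used as a decorator; the collection name, field paths and the sample document are assumptions, and in a real hub the graph usually lives in the build or plugin configuration:

import networkx as nx

from biothings.hub.datatransform.datatransform_mdb import DataTransformMDB, MongoDBEdge

graph = nx.DiGraph()
graph.add_node("drugbank")
graph.add_node("inchikey")
# convert DrugBank ids to InChIKeys using documents from a "drugbank" collection
graph.add_edge("drugbank", "inchikey",
               object=MongoDBEdge("drugbank", "drugbank.id", "drugbank.inchi_key"))

@DataTransformMDB(graph, input_types=["drugbank"], output_types=["inchikey"])
def load_data(data_folder):
    # yield documents whose _id is a DrugBank id; the decorator rewrites the _id
    yield {"_id": "DB00945", "name": "aspirin"}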

key_lookup_batch(batchiter)[source]

Look up all keys for ids given in the batch iterator (1 block).

Parameters

batchiter – 1 block of records to look up keys for

travel(input_type, target, doc_lst)[source]

Traverse a graph from a start key type to a target key type using precomputed paths.

Parameters
  • start – key type to start from

  • target – key type to end at

  • key – key value of type ‘start’

class biothings.hub.datatransform.datatransform_mdb.MongoDBEdge(collection_name, lookup, field, weight=1, label=None, check_index=True)[source]

The MongoDBEdge uses data within a MongoDB collection to convert one identifier to another. The input identifier is used to search a collection. The output identifier values are read out of that collection:

Parameters
  • collection_name (str) – The name of the MongoDB collection.

  • lookup (str) – The field that will match the input identifier in the collection.

  • field (str) – The output identifier field that will be read out of matching documents.

  • weight (int) – Weights are used to prefer one path over another. The path with the lowest weight is preferred. The default weight is 1.

property collection

Getter for the collection member variable

collection_find(id_lst, lookup, field)[source]

Abstract out (as one line) the call to collection.find

edge_lookup(keylookup_obj, id_strct, debug=False)[source]

Follow an edge given a key.

An edge represents a document, and this method uses the data in the edge object to convert one key to another key using exactly one MongoDB lookup.

init_state()[source]

initialize the state of pickleable objects

prepare_collection()[source]

Load the MongoDB collection specified by collection_name.

histogram

DataTransform Histogram class - track keylookup statistics

class biothings.hub.datatransform.histogram.Histogram[source]

Histogram - track keylookup statistics

update_edge(vert1, vert2, size)[source]

Update the edge histogram

update_io(input_type, output_type, size)[source]

Update the input/output histogram