Hub component¶
The purpose of the BioThings hub component is to allow you to easily automate the parsing and uploading of your data to an Elasticsearch backend.
biothings.hub¶
class biothings.hub.HubSSHServer[source]¶

begin_auth(username)[source]¶
Authentication has been requested by the client
This method will be called when authentication is attempted for the specified user. Applications should use this method to prepare whatever state they need to complete the authentication, such as loading in the set of authorized keys for that user. If no authentication is required for this user, this method should return False to cause the authentication to immediately succeed. Otherwise, it should return True to indicate that authentication should proceed.
If blocking operations need to be performed to prepare the state needed to complete the authentication, this method may be defined as a coroutine.
- Parameters
username (str) – The name of the user being authenticated
- Returns
A bool indicating whether authentication is required
connection_lost(exc)[source]¶
Called when a connection is lost or closed
This method is called when a connection is closed. If the connection is shut down cleanly, exc will be None. Otherwise, it will be an exception explaining the reason for the disconnect.

connection_made(connection)[source]¶
Called when a connection is made
This method is called when a new TCP connection is accepted. The conn parameter should be stored if needed for later use.
- Parameters
conn (SSHServerConnection) – The connection which was successfully opened
password_auth_supported()[source]¶
Return whether or not password authentication is supported
This method should return True if password authentication is supported. Applications wishing to support it must have this method return True and implement validate_password() to return whether or not the password provided by the client is valid for the user being authenticated.
By default, this method returns False, indicating that password authentication is not supported.
- Returns
A bool indicating if password authentication is supported or not
session_requested()[source]¶
Handle an incoming session request
This method is called when a session open request is received from the client, indicating it wishes to open a channel to be used for running a shell, executing a command, or connecting to a subsystem. If the application wishes to accept the session, it must override this method to return either an SSHServerSession object to use to process the data received on the channel or a tuple consisting of an SSHServerChannel object created with create_server_channel and an SSHServerSession, if the application wishes to pass non-default arguments when creating the channel.
If blocking operations need to be performed before the session can be created, a coroutine which returns an SSHServerSession object can be returned instead of the session itself. This can be either returned directly or as a part of a tuple with an SSHServerChannel object.
To reject this request, this method should return False to send back a "Session refused" response or raise a ChannelOpenError exception with the reason for the failure.
The details of what type of session the client wants to start will be delivered to methods on the SSHServerSession object which is returned, along with other information such as environment variables, terminal type, size, and modes.
By default, all session requests are rejected.
- Returns
One of the following:
- An SSHServerSession object or a coroutine which returns an SSHServerSession
- A tuple consisting of an SSHServerChannel and the above
- A callable or coroutine handler function which takes AsyncSSH stream objects for stdin, stdout, and stderr as arguments
- A tuple consisting of an SSHServerChannel and the above
- False to refuse the request
- Raises
ChannelOpenError – if the session shouldn't be accepted
validate_password(username, password)[source]¶
Return whether password is valid for this user
This method should return True if the specified password is a valid password for the user being authenticated. It must be overridden by applications wishing to support password authentication.
If the password provided is valid but expired, this method may raise PasswordChangeRequired to request that the client provide a new password before authentication is allowed to complete. In this case, the application must override change_password() to handle the password change request.
This method may be called multiple times with different passwords provided by the client. Applications may wish to limit the number of attempts which are allowed. This can be done by having password_auth_supported() begin returning False after the maximum number of attempts is exceeded.
If blocking operations need to be performed to determine the validity of the password, this method may be defined as a coroutine.
By default, this method returns False for all passwords.
- Parameters
username (str) – The user being authenticated
password (str) – The password sent by the client
- Returns
A bool indicating if the specified password is valid for the user being authenticated
- Raises
PasswordChangeRequired – if the password provided is expired and needs to be changed
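These authentication hooks mirror the asyncssh SSHServer interface. Below is a minimal, hypothetical sketch (not the hub's actual implementation) showing how the pieces fit together; the credential store is made up for illustration:

    import asyncssh

    class MinimalSSHServer(asyncssh.SSHServer):
        # hypothetical credential store, for illustration only
        USERS = {"guest": "guest"}

        def begin_auth(self, username):
            # returning True means authentication must proceed for this user
            return True

        def password_auth_supported(self):
            return True

        def validate_password(self, username, password):
            # accept only if the password matches the stored one
            return self.USERS.get(username) == password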
class biothings.hub.HubSSHServerSession(name, shell)[source]¶

break_received(msec)[source]¶
The client has sent a break
This method is called when the client requests that the server perform a break operation on the terminal. If the break is performed, this method should return True. Otherwise, it should return False.
By default, this method returns False indicating that no break was performed.
- Parameters
msec (int) – The duration of the break in milliseconds
- Returns
A bool to indicate if the break operation was performed or not
connection_made(chan)[source]¶
Called when a channel is opened successfully
This method is called when a channel is opened successfully. The channel parameter should be stored if needed for later use.
- Parameters
chan (SSHServerChannel) – The channel which was successfully opened.
data_received(data, datatype)[source]¶
Called when data is received on the channel
This method is called when data is received on the channel. If an encoding was specified when the channel was created, the data will be delivered as a string after decoding with the requested encoding. Otherwise, the data will be delivered as bytes.
- Parameters
data (str or bytes) – The data received on the channel
datatype – The extended data type of the data, from extended data types
eof_received()[source]¶
Called when EOF is received on the channel
This method is called when an end-of-file indication is received on the channel, after which no more data will be received. If this method returns True, the channel remains half open and data may still be sent. Otherwise, the channel is automatically closed after this method returns. This is the default behavior for classes derived directly from SSHSession, but not when using the higher-level streams API. Because input is buffered in that case, streaming sessions enable half-open channels to allow applications to respond to input read after an end-of-file indication is received.
exec_requested(command)[source]¶
The client has requested to execute a command
This method should be implemented by the application to perform whatever processing is required when a client makes a request to execute a command. It should return True to accept the request, or False to reject it.
If the application returns True, the session_started() method will be called once the channel is fully open. No output should be sent until this method is called.
By default this method returns False to reject all requests.
- Parameters
command (str) – The command the client has requested to execute
- Returns
A bool indicating if the exec request was allowed or not
session_started()[source]¶
Called when the session is started
This method is called when a session has started up. For client and server sessions, this will be called once a shell, exec, or subsystem request has been successfully completed. For TCP and UNIX domain socket sessions, it will be called immediately after the connection is opened.
shell_requested()[source]¶
The client has requested a shell
This method should be implemented by the application to perform whatever processing is required when a client makes a request to open an interactive shell. It should return True to accept the request, or False to reject it.
If the application returns True, the session_started() method will be called once the channel is fully open. No output should be sent until this method is called.
By default this method returns False to reject all requests.
- Returns
A bool indicating if the shell request was allowed or not
biothings.hub.status(managers)[source]¶
Return a global hub status (number of sources, documents, etc.) according to available managers
api¶
handlers¶
base¶
class biothings.hub.api.handlers.base.BaseHandler(application, request, **kwargs)[source]¶

initialize(managers, **kwargs)[source]¶
Hook for subclass initialization. Called for each request.
A dictionary passed as the third argument of a url spec will be supplied as keyword arguments to initialize().
Example:
    class ProfileHandler(RequestHandler):
        def initialize(self, database):
            self.database = database

        def get(self, username):
            ...

    app = Application([
        (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])
class biothings.hub.api.handlers.base.DefaultHandler(application, request, **kwargs)[source]¶

set_default_headers()[source]¶
Override this to set HTTP headers at the beginning of the request.
For example, this is the place to set a custom Server header. Note that setting such headers in the normal flow of request processing may not do what you want, since headers may be reset during error handling.
write(result)[source]¶
Writes the given chunk to the output buffer.
To write the output to the network, use the flush() method below.
If the given chunk is a dictionary, we write it as JSON and set the Content-Type of the response to be application/json. (If you want to send JSON as a different Content-Type, call set_header after calling write().)
Note that lists are not converted to JSON because of a potential cross-site security vulnerability. All JSON output should be wrapped in a dictionary. More details at http://haacked.com/archive/2009/06/25/json-hijacking.aspx/ and https://github.com/facebook/tornado/issues/1009
write_error(status_code, **kwargs)[source]¶
Override to implement custom error pages.
write_error may call write, render, set_header, etc. to produce output as usual.
If this error was caused by an uncaught exception (including HTTPError), an exc_info triple will be available as kwargs["exc_info"]. Note that this exception may not be the "current" exception for purposes of methods like sys.exc_info() or traceback.format_exc.
class biothings.hub.api.handlers.base.GenericHandler(application, request, **kwargs)[source]¶

initialize(shell, **kwargs)[source]¶
Hook for subclass initialization. Called for each request.
A dictionary passed as the third argument of a url spec will be supplied as keyword arguments to initialize().
Example:
    class ProfileHandler(RequestHandler):
        def initialize(self, database):
            self.database = database

        def get(self, username):
            ...

    app = Application([
        (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])
class biothings.hub.api.handlers.base.RootHandler(application, request, **kwargs)[source]¶

initialize(features, hub_name=None, **kwargs)[source]¶
Hook for subclass initialization. Called for each request.
A dictionary passed as the third argument of a url spec will be supplied as keyword arguments to initialize().
Example:
    class ProfileHandler(RequestHandler):
        def initialize(self, database):
            self.database = database

        def get(self, username):
            ...

    app = Application([
        (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])
shell¶

class biothings.hub.api.handlers.shell.ShellHandler(application, request, **kwargs)[source]¶

initialize(shell, shellog, **kwargs)[source]¶
Hook for subclass initialization. Called for each request.
A dictionary passed as the third argument of a url spec will be supplied as keyword arguments to initialize().
Example:
    class ProfileHandler(RequestHandler):
        def initialize(self, database):
            self.database = database

        def get(self, username):
            ...

    app = Application([
        (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])
upload¶

class biothings.hub.api.handlers.upload.UploadHandler(application, request, **kwargs)[source]¶

data_received(chunk)[source]¶
Implement this method to handle streamed request data.
Requires the .stream_request_body decorator.
initialize(upload_root, **kwargs)[source]¶
Hook for subclass initialization. Called for each request.
A dictionary passed as the third argument of a url spec will be supplied as keyword arguments to initialize().
Example:
    class ProfileHandler(RequestHandler):
        def initialize(self, database):
            self.database = database

        def get(self, username):
            ...

    app = Application([
        (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])
prepare()[source]¶
Called at the beginning of a request before get/post/etc.
Override this method to perform common initialization regardless of the request method.
Asynchronous support: Decorate this method with .gen.coroutine or use async def to make it asynchronous (the asynchronous decorator cannot be used on prepare). If this method returns a .Future, execution will not proceed until the .Future is done.
New in version 3.1: Asynchronous support.
ws¶

class biothings.hub.api.handlers.ws.HubDBListener[source]¶
Get events from Hub DB and propagate them through the websocket instance
class biothings.hub.api.handlers.ws.WebSocketConnection(session, listeners)[source]¶
Listen to Hub DB through a listener object, and publish events to any connected client.
SockJSConnection.__init__() takes only a session as argument, and there's no way to pass custom settings. In order to use that class, we need to use partial to partially initialize the instance with 'listeners', letting 'session' be passed at connection time:

    pconn = partial(WebSocketConnection, listeners=listeners)
    ws_router = sockjs.tornado.SockJSRouter(pconn, "/path")
on_open(info)[source]¶
Default on_open() handler.
Override when you need to do some initialization or request validation. If you return False, connection will be rejected.
You can also throw Tornado HTTPError to close connection.
- request: ConnectionInfo object which contains caller IP address, query string parameters and cookies associated with this request (if any).
dataload¶
dumper¶
class biothings.hub.dataload.dumper.DummyDumper(*args, **kwargs)[source]¶
DummyDumper will do nothing… (useful for datasources that can't be downloaded anymore but still need to be integrated, i.e. fill src_dump, etc.)
class biothings.hub.dataload.dumper.DumperManager(job_manager, datasource_path='dataload.sources', *args, **kwargs)[source]¶

call(src, method_name, *args, **kwargs)[source]¶
Create a dumper for datasource "src" and call method "method_name" on it, with the given arguments. Used to create arbitrary calls on a dumper. "method_name" within the dumper definition must be a coroutine.

clean_stale_status()[source]¶
During startup, search for actions in progress which would have been interrupted and change their state to "canceled". For example, some downloading processes could have been interrupted; at startup, their "downloading" status should be changed to "canceled" to reflect the actual state of these datasources. This must be overridden in subclasses.
class biothings.hub.dataload.dumper.FilesystemDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]¶
This dumper works locally and copies (or moves) files to the datasource folder.

download(remotefile, localfile)[source]¶
Download 'remotefile' to the local location defined by 'localfile'. Return relevant information about remotefile (depends on the actual client).
class biothings.hub.dataload.dumper.GitDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]¶
Git dumper gets data from a git repo. The repo is stored in SRC_ROOT_FOLDER (without versioning) and then versions/releases are fetched in SRC_ROOT_FOLDER/<release>.

download(remotefile, localfile)[source]¶
Download 'remotefile' to the local location defined by 'localfile'. Return relevant information about remotefile (depends on the actual client).

dump(release='HEAD', force=False, job_manager=None, **kwargs)[source]¶
Dump (i.e. download) the resource as needed. This should be called after instance creation. The 'force' argument forces the dump and is passed to the create_todump_list() method.

property new_data_folder¶
Generate a new data folder path using src_root_folder and the specified suffix attribute. Also sync the current (aka previous) data folder previously registered in the database. This method typically has to be called in create_todump_list(), when the dumper actually knows some information about the resource, like the actual release.
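A minimal, hypothetical sketch of how such a dumper could be declared; the attribute names below (SRC_NAME, GIT_REPO_URL, SCHEDULE) follow common BioThings conventions but should be checked against your hub's configuration:

    from biothings.hub.dataload.dumper import GitDumper

    class MyGitDumper(GitDumper):
        SRC_NAME = "mysource"                                            # hypothetical source name
        GIT_REPO_URL = "https://github.com/example/mysource-data.git"    # assumed attribute name
        SCHEDULE = "0 6 * * *"                                           # optional: cron-like schedule (assumed)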
class biothings.hub.dataload.dumper.GoogleDriveDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]¶

download(remoteurl, localfile)[source]¶
remoteurl is a Google Drive link containing a document ID, such as:
- https://drive.google.com/open?id=<1234567890ABCDEF>
- https://drive.google.com/file/d/<1234567890ABCDEF>/view
It can also be just a document ID.
class biothings.hub.dataload.dumper.HTTPDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]¶
Dumper using the HTTP protocol and the "requests" library

download(remoteurl, localfile, headers={})[source]¶
Download 'remoteurl' to the local location defined by 'localfile'. Return relevant information about the remote file (depends on the actual client).
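A minimal sketch of a custom dumper built on this class, assuming the conventional pattern where create_todump_list() fills self.to_dump with remote/local pairs; the source name, URL and folder below are hypothetical:

    import os
    from biothings.hub.dataload.dumper import HTTPDumper

    class MyHTTPDumper(HTTPDumper):
        SRC_NAME = "mysource"               # hypothetical source name
        SRC_ROOT_FOLDER = "/data/mysource"  # hypothetical storage root (usually derived from config)

        def create_todump_list(self, force=False, **kwargs):
            # one entry per file to download: remote URL -> local path
            remote = "https://example.org/downloads/mysource_latest.json.gz"
            local = os.path.join(self.new_data_folder, "mysource_latest.json.gz")
            self.to_dump.append({"remote": remote, "local": local})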
class biothings.hub.dataload.dumper.LastModifiedBaseDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]¶
Use SRC_URLS as a list of URLs to download and implement create_todump_list() according to that list. Should be used in parallel with a dumper speaking the actual underlying protocol.
class biothings.hub.dataload.dumper.LastModifiedFTPDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]¶
SRC_URLS contains a list of URLs pointing to files to download. FTP's MDTM command is used to check whether files should be downloaded. The release is generated from the last file's MDTM in SRC_URLS, and formatted according to RELEASE_FORMAT. See also LastModifiedHTTPDumper, which works the same way but for the HTTP protocol. Note: this dumper is a wrapper over FTPDumper; one URL will give one FTPDumper instance.

download(urlremotefile, localfile, headers={})[source]¶
Download 'urlremotefile' to the local location defined by 'localfile'. Return relevant information about the remote file (depends on the actual client).

release_client()[source]¶
Do whatever is necessary (like closing the network connection) to "release" the client.
class biothings.hub.dataload.dumper.LastModifiedHTTPDumper(src_name=None, src_root_folder=None, log_folder=None, archive=None)[source]¶
Given a list of URLs, check the Last-Modified header to see whether the file should be downloaded. Subclasses should only have to declare SRC_URLS. Optionally, another header name can be used instead of Last-Modified, but the date format must follow RFC 2616. If that header doesn't exist, the data will always be downloaded (bypass). The release is generated from the last file's Last-Modified in SRC_URLS, and formatted according to RELEASE_FORMAT.
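A minimal sketch of such a subclass; the source name and URLs are hypothetical:

    from biothings.hub.dataload.dumper import LastModifiedHTTPDumper

    class MyLastModifiedDumper(LastModifiedHTTPDumper):
        SRC_NAME = "mysource"   # hypothetical source name
        SRC_URLS = [
            "https://example.org/downloads/genes.tsv.gz",
            "https://example.org/downloads/annotations.tsv.gz",
        ]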
class biothings.hub.dataload.dumper.ManualDumper(*args, **kwargs)[source]¶
This dumper will assist the user in dumping a resource. It will usually expect the files to be downloaded first (sometimes there's no easy way to automate this process). Once downloaded, a call to dump() will make sure everything is fine in terms of files and metadata.

dump(path, release=None, force=False, job_manager=None, **kwargs)[source]¶
Dump (i.e. download) the resource as needed. This should be called after instance creation. The 'force' argument forces the dump and is passed to the create_todump_list() method.

property new_data_folder¶
Generate a new data folder path using src_root_folder and the specified suffix attribute. Also sync the current (aka previous) data folder previously registered in the database. This method typically has to be called in create_todump_list(), when the dumper actually knows some information about the resource, like the actual release.
uploader¶

class biothings.hub.dataload.uploader.BaseSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]¶
Default datasource uploader. Database storage can be done in batch or line by line. Duplicated records aren't allowed.
db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state.
classmethod create(db_conn_info, *args, **kwargs)[source]¶
Factory-like method, just returns an instance of this uploader (used by SourceManager; may be overridden in subclasses to generate more than one instance per class, like a true factory). This is useful when a resource is split into different collections but the data structure doesn't change (it's really just data split across multiple collections, usually for parallelization purposes). Instead of having an actual class for each split collection, the factory will generate them on the fly.
get_pinfo()[source]¶
Return dict containing information about the current process (used to report in the hub)

get_predicates()[source]¶
Return a list of predicates (functions returning true/false, as in math logic) which instruct/dictate whether the job manager should start a job (process/thread)
load(steps=['data', 'post', 'master', 'clean'], force=False, batch_size=10000, job_manager=None, **kwargs)[source]¶
Main resource load process, reads data from doc_c using chunks sized as batch_size. steps defines the different processes used to load the resource:
- "data": will store actual data into single collections
- "post": will perform post data load operations
- "master": will register the master document in src_master
load_data(data_folder)[source]¶
Parse data inside data_folder and return a structure ready to be inserted in the database
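A minimal, hypothetical sketch of a custom uploader built on this class, following the usual yield-per-document convention (source name, file name and fields are made up):

    import json
    import os

    from biothings.hub.dataload.uploader import BaseSourceUploader

    class MySourceUploader(BaseSourceUploader):
        name = "mysource"   # hypothetical source/collection name

        def load_data(self, data_folder):
            # parse the dumped file and yield documents one by one
            path = os.path.join(data_folder, "mysource_latest.json")
            with open(path) as f:
                for record in json.load(f):
                    yield {"_id": record["id"], "value": record.get("value")}

        @classmethod
        def get_mapping(cls):
            # ES mapping for the fields produced above (assumed convention)
            return {"value": {"type": "keyword"}}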
make_temp_collection()[source]¶
Create a temp collection for dataloading, e.g., entrez_geneinfo_INEMO.

post_update_data(steps, force, batch_size, job_manager, **kwargs)[source]¶
Override as needed to perform operations after data has been uploaded
prepare_src_dump()[source]¶
Sync with the src_dump collection and collection information (src_doc). Return the src_dump collection.

register_status(status, subkey='upload', **extra)[source]¶
Register step status, i.e. status for a sub-resource

switch_collection()[source]¶
After a successful load, rename temp_collection to the regular collection name, and rename the existing collection to a temp name for archiving purposes.
class biothings.hub.dataload.uploader.DummySourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]¶
Dummy uploader, won't upload any data, assuming data is already there, but makes sure every other bit of information is there for the overall process (useful when online data isn't available anymore).
db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state.
class biothings.hub.dataload.uploader.IgnoreDuplicatedSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]¶
Same as the default uploader, but will store records and ignore any duplicated-record errors that occur (use with caution…). Storage is done using batch and unordered bulk operations.
db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state.
class biothings.hub.dataload.uploader.MergerSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]¶
db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state.

storage_class¶
class biothings.hub.dataload.uploader.NoBatchIgnoreDuplicatedSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]¶
Same as the default uploader, but will store records and ignore any duplicated-record errors that occur (use with caution…). Storage is done line by line (slow, not using a batch) but preserves the order of data in the input file.
db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state.

storage_class¶
alias of biothings.hub.dataload.storage.NoBatchIgnoreDuplicatedStorage
class biothings.hub.dataload.uploader.NoDataSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]¶
This uploader won't upload any data and won't even assume there's actual data (different from DummySourceUploader on this point). It's useful, for instance, when a mapping needs to be stored (get_mapping()) but the data doesn't come from an actual upload (i.e. it is generated).
db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state.

storage_class¶
class biothings.hub.dataload.uploader.ParallelizedSourceUploader(db_conn_info, collection_name=None, log_folder=None, *args, **kwargs)[source]¶
db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state.
class biothings.hub.dataload.uploader.UploaderManager(poll_schedule=None, *args, **kwargs)[source]¶
After registering datasources, the manager will orchestrate source uploading.

SOURCE_CLASS¶
clean_stale_status()[source]¶
During startup, search for actions in progress which would have been interrupted and change their state to "canceled". For example, some downloading processes could have been interrupted; at startup, their "downloading" status should be changed to "canceled" to reflect the actual state of these datasources. This must be overridden in subclasses.

filter_class(klass)[source]¶
Gives the subclass an opportunity to check a given class and decide whether to keep it in the discovery process. Returning None means "skip it".

poll(state, func)[source]¶
Search for sources in collection 'col' with a pending flag list containing 'state' and call 'func' for each document found (with the doc as the only param).

register_classes(klasses)[source]¶
Register each class in the self.register dict. The key will be used to retrieve the source class, create an instance and run methods from it. It must be implemented in subclasses, as each manager may need to access its sources differently, based on different keys.
biothings.hub.dataload.uploader.upload_worker(name, storage_class, loaddata_func, col_name, batch_size, batch_num, *args)[source]¶
Picklable job launcher, typically run from multiprocessing. storage_class will be instantiated with col_name, the destination collection name. loaddata_func is the parsing/loading function, called with *args.
storage¶
class biothings.hub.dataload.storage.MergerStorage(db, dest_col_name, logger=<module 'logging'>)[source]¶
This storage will try to merge documents when finding duplicated-record errors. It's useful when data is parsed using an iterator. A record can be stored in the database, then later another record with the same ID is sent to the db, raising a duplicate error. These two documents would have been merged beforehand by a "put all in memory" parser. Since data is here read line by line, the merge is done while storing.
class biothings.hub.dataload.storage.NoBatchIgnoreDuplicatedStorage(db, dest_col_name, logger=<module 'logging'>)[source]¶
You should use IgnoreDuplicatedStorage, which works using batches and is thus much faster…
class biothings.hub.dataload.storage.NoStorage(db_info, dest_col_name, logger)[source]¶
This is a kind of place-holder: this storage will just store nothing… (but it will respect the storage interface)
class biothings.hub.dataload.storage.RootKeyMergerStorage(db, dest_col_name, logger=<module 'logging'>)[source]¶
Just like MergerStorage, this storage deals with duplicate errors by appending a key's content to the existing document. Keys in the existing document will be converted to a list as needed.
Note
- root keys must have the same type in each document
- inner structures aren't merged together; the merge happens at root key level
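For illustration, a hypothetical sketch of what this root-key merge amounts to for two documents sharing the same _id:

    # two parsed records sharing the same _id (made-up data)
    doc1 = {"_id": "chr1:g.12345A>T", "clinvar": {"rcv": "RCV000001"}}
    doc2 = {"_id": "chr1:g.12345A>T", "clinvar": {"rcv": "RCV000002"}}

    # after the duplicate error is handled, the stored document ends up with the
    # root key's contents appended, converted to a list as needed:
    merged = {"_id": "chr1:g.12345A>T",
              "clinvar": [{"rcv": "RCV000001"}, {"rcv": "RCV000002"}]}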
source¶
class biothings.hub.dataload.source.SourceManager(source_list, dump_manager, upload_manager, data_plugin_manager)[source]¶
Helper class to get information about a datasource, whether it has a dumper and/or uploaders associated.
reset(name, key='upload', subkey=None)[source]¶
Reset, i.e. delete, internal data (src_dump document) for a given source name, key and subkey. This method is useful to clean outdated information in the Hub's internal database.
Ex: key=upload, name=mysource, subkey=mysubsource will delete the entry in the corresponding src_dump doc (_id=mysource), under key "upload", for the sub-source named "mysubsource".
"key" can be either 'download', 'upload' or 'inspect'. Because there's no such notion of subkey for dumpers (i.e. 'download'), subkey is optional.
databuild¶
builder¶

class biothings.hub.databuild.builder.DataBuilder(build_name, source_backend, target_backend, log_folder, doc_root_key='root', mappers=[], default_mapper_class=<class 'biothings.hub.databuild.mapper.TransparentMapper'>, sources=None, target_name=None, **kwargs)[source]¶
Generic data builder.
document_cleaner(src_name, *args, **kwargs)[source]¶
Return a function taking a document as argument, cleaning the doc as needed, and returning that doc. If no function is needed, return None. Note: the returned function must be pickleable; be careful with lambdas and closures.
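A hypothetical sketch of an override respecting the picklability constraint (module-level function rather than a lambda); the source and field names are made up:

    from biothings.hub.databuild.builder import DataBuilder

    def _remove_internal_fields(doc):
        # module-level function so it stays pickleable for multiprocessing
        doc.pop("_private_notes", None)   # hypothetical field to strip
        return doc

    class MyDataBuilder(DataBuilder):
        def document_cleaner(self, src_name, *args, **kwargs):
            # only clean documents coming from one particular source
            if src_name == "mysource":
                return _remove_internal_fields
            return None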
get_build_version()[source]¶
Generate an arbitrary major build version. The default is a timestamp (YYMMDD). The '.' char isn't allowed in a build version as it's reserved for minor versions.

get_custom_metadata(sources, job_manager)[source]¶
If more metadata is required, this method can be overridden and should return a dict. The existing metadata dict will be updated with that one before storage.
get_pinfo()[source]¶
Return dict containing information about the current process (used to report in the hub)

get_predicates()[source]¶
Return a list of predicates (functions returning true/false, as in math logic) which instruct/dictate whether the job manager should start a job (process/thread)
get_stats(sources, job_manager)[source]¶
Return a dictionary of metadata for this build. It's usually app-specific and this method may be overridden as needed. By default though, the total number of documents in the merged collection is stored (key "total").
The returned dictionary will be merged with any existing metadata in the src_build collection. This behavior can be changed by setting a special key within the metadata dict: {"__REPLACE__": True} will… replace existing metadata with the one returned here.
“job_manager” is passed in case parallelization is needed. Be aware that this method is already running in a dedicated thread, in order to use job_manager, the following code must be used at the very beginning of its implementation: asyncio.set_event_loop(job_manager.loop)
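A hypothetical sketch of an override adding app-specific stats and opting into full replacement (the extra key is made up):

    from biothings.hub.databuild.builder import DataBuilder

    class MyDataBuilder(DataBuilder):
        def get_stats(self, sources, job_manager):
            stats = {"sources_merged": len(sources)}   # hypothetical extra key
            stats["__REPLACE__"] = True                # replace stored metadata instead of merging
            return stats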
merge(sources=None, target_name=None, force=False, ids=None, steps=['merge', 'post', 'metadata'], job_manager=None, *args, **kwargs)[source]¶
Merge given sources into a collection named target_name. If the sources argument is omitted, all sources defined for this merger will be merged together, according to what is defined in src_build_config. If target_name is not defined, a unique name will be generated.
Optional parameters:
- force=True will bypass any safety check
- ids: list of _ids to merge specifically. If None, all documents are merged.
- steps:
  - merge: actual merge step, create merged documents and store them
  - post: once merged, run an optional post-merge process
  - metadata: generate and store metadata (depends on the merger; usually specifies the amount of merged data, source versions, etc.)
merge_sources(source_names, steps=['merge', 'post'], batch_size=100000, ids=None, job_manager=None)[source]¶
Merge resources from given source_names or from the build config. Identify root document sources from the list to process them first. ids can be a list of documents to be merged in particular.
register_status(status, transient=False, init=False, **extra)[source]¶
Register current build status. A build status is a record in src_build. The key used in this dict is the target_name. Then, any operation acting on this target_name is registered in a "jobs" list.
resolve_sources(sources)[source]¶
Source can be a string that may contain regex chars. It's useful when you have plenty of sub-collections prefixed with a source name. For instance, given a source named "blah" stored in as many collections as chromosomes, instead of passing each name as "blah_1", "blah_2", etc., "blah_.*" can be specified in build_config. This method resolves potential regexed source names into real, existing collection names.
class biothings.hub.databuild.builder.LinkDataBuilder(build_name, source_backend, target_backend, *args, **kwargs)[source]¶
LinkDataBuilder creates a link to the original datasource to be merged, without actually copying the data (the merged collection remains empty). This builder is only valid when a single datasource (thus no real merge) is declared in the list of sources to be merged, and is useful to prevent data duplication between the datasource itself and the resulting merged collection.
biothings.hub.databuild.builder.fix_batch_duplicates(docs, fail_if_struct_is_different=False)[source]¶
Remove duplicates from docs based on _id. If the _id is the same but the structure is different (not real "duplicates", but different documents with the same _ids), merge the docs all together (dict.update) or raise an error if fail_if_struct_is_different.
mapper¶
class biothings.hub.databuild.mapper.BaseMapper(name=None, *args, **kwargs)[source]¶
Basic mapper used to convert documents. If the mapper's name matches the source's metadata mapper, the mapper.convert(docs) call will be used to process/convert the passed documents.
class biothings.hub.databuild.mapper.IDBaseMapper(name=None, convert_func=None, *args, **kwargs)[source]¶
Provide mapping between different sources.
'name' may match a "mapper" metadata field (see uploaders). If None, the mapper will be applied to any document from a resource without a "mapper" argument.

process(docs, key_to_convert='_id', transparent=True)[source]¶
Process the 'key_to_convert' document key using the mapping. If transparent and no match, the original key will be used (so there's no change). Else, if no match, the document will be discarded (default). Warning: the key to be translated must not be None (it's considered a non-match).
prebuilder¶
dataindex¶
indexer¶
class biothings.hub.dataindex.indexer.ColdHotIndexer(*args, **kwargs)[source]¶
This indexer works with 2 mongo collections to create a single index:
- one premerge collection contains "cold" data, which never changes (not updated)
- another collection contains "hot" data, regularly updated
The index is created by fetching the premerge documents. Then, documents from the hot collection are merged by fetching docs from the index, updating them, and putting them back in the index.
index(hot_name, index_name, job_manager, steps=['index', 'post'], batch_size=10000, ids=None, mode='index')[source]¶
Same as the Indexer.index method but works with a cold/hot collections strategy: first index the cold collection, then complete the index with the hot collection (adding docs or merging them into existing docs within the index).
class biothings.hub.dataindex.indexer.DynamicIndexerFactory(urls, es_host, suffix='_current')[source]¶
In the context of autohub/standalone instances, create an indexer with parameters taken from a versions.json URL. A list of URLs is provided so the factory knows how to create these indexers for each URL. There's no way to "guess" an ES host from a URL, so this parameter must be specified as well, common to all URLs. The "suffix" param is added at the end of index names.
class biothings.hub.dataindex.indexer.Indexer(es_host, target_name=None, **kwargs)[source]¶
Basic indexer, reading documents from a mongo collection (target_name) and sending documents to ES.
enrich_final_mapping(final_mapping)[source]¶
final_mapping is the ES mapping ready to be sent (with "dynamic" and "all" at its root, for instance). This method gives an opportunity to add more mapping definitions not directly related to datasources, such as other root keys.
get_index_creation_settings()[source]¶
Override to return a dict containing some extra settings for index creation. The dict will be merged with mandatory settings; see biothings.utils.es.ESIndexer.create_index for more.
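A sketch of such an override; the values are ordinary Elasticsearch index settings chosen for illustration, not hub requirements:

    from biothings.hub.dataindex.indexer import Indexer

    class MyIndexer(Indexer):
        def get_index_creation_settings(self):
            # extra ES settings, merged with the mandatory ones at creation time
            return {
                "number_of_replicas": 0,    # speed up initial bulk indexing
                "refresh_interval": "-1",   # disable refresh while indexing
            }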
get_pinfo()[source]¶
Return dict containing information about the current process (used to report in the hub)
index(target_name, index_name, job_manager, steps=['index', 'post'], batch_size=10000, ids=None, mode='index', worker=None)[source]¶
Build an index named "index_name" with data from collection "target_collection".
“ids” can be passed to selectively index documents.
"mode" can have the following values:
- 'purge': will delete the index if it exists
- 'resume': will use the existing index and add documents. "ids" can be passed as a list of missing IDs, or, if not passed, ES will be queried to identify which IDs are missing for each batch in order to complete the index.
- 'merge': will merge data with existing index documents, used when populating the index several distinct times (cold/hot merge for instance)
- None (default): will create a new index, assuming it doesn't already exist
post_index(target_name, index_name, job_manager, steps=['index', 'post'], batch_size=10000, ids=None, mode=None)[source]¶
Override in a subclass to add a post-index process. The method's signature is the same as index() to get the full context. This method will run in a thread (using job_manager.defer_to_thread()).
snapshooter¶
Elasticsearch Snapshot Feature
Snapshot Config Example:

    {
        "cloud": {
            "type": "aws",  # default, only one supported by now
            "access_key": None,
            "secret_key": None,
        },
        "repository": {
            "name": "s3-$(Y)",
            "type": "s3",
            "settings": {
                "bucket": "<SNAPSHOT_BUCKET_NAME>",
                "base_path": "mynews.info/$(Y)",  # per year
                "region": "us-west-2",
            },
            "acl": "private",
        },
        # ----------------------------- inferred from build doc from now on
        "indexer": {
            # reference to INDEX_CONFIG
            "env": "local",
        },
        # -----------------------------
        # when creating a snapshot, how long should we wait before querying ES
        # to check snapshot status/completion ? (in seconds)
        "monitor_delay": 60 * 5,
    }
SnapshotManager => SnapshotEnvConfig(s)
SnapshotEnvConfig + (Build) -> SnapshotEnv
SnapshotEnv + Index + Snapshot -> SnapshotTaskEnv
class biothings.hub.dataindex.snapshooter.BuildSpecificEnv(env_conf, build_doc)[source]¶
Snapshot Env % Build
class biothings.hub.dataindex.snapshooter.SnapshotEnv(job_manager, env_config, build_doc)[source]¶
Corresponds to an ES repository for a specific build. The repository type can be any type supported by ES.
class biothings.hub.dataindex.snapshooter.SnapshotEnvConfig(name, env_class, env_config)[source]¶
Snapshot Env before Combining with Build Info.
class biothings.hub.dataindex.snapshooter.SnapshotFSEnv(job_manager, env_config, build_doc)[source]¶
class biothings.hub.dataindex.snapshooter.SnapshotS3Env(job_manager, env_config, build_doc)[source]¶
Relevant Config Entries:

    {
        "cloud": {
            "type": "aws",  # default, only one supported by now
            "access_key": None,
            "secret_key": None,
        },
        "repository": {
            "name": "s3-$(Y)",
            "type": "s3",
            "settings": {
                "bucket": "<SNAPSHOT_BUCKET_NAME>",
                "base_path": "mynews.info/$(Y)",  # per year
                "region": "us-west-2",
            },
            "acl": "private",
        }
    }
dataexport¶
ids¶
biothings.hub.dataexport.ids.export_ids(col_name)[source]¶
Export all _ids from the collection named col_name. If col_name refers to a build where a cold_collection is defined, it will also extract _ids and sort/uniq them to have the full list of _ids of the actual merged (cold+hot) collection. The output file is stored in DATA_EXPORT_FOLDER/ids, defaulting to <DATA_ARCHIVE_ROOT>/export/ids. The output filename is returned at the end, if successful.
dataplugin¶
assistant¶
class biothings.hub.dataplugin.assistant.AssistantManager(data_plugin_manager, dumper_manager, uploader_manager, keylookup=None, default_export_folder='hub/dataload/sources', *args, **kwargs)[source]¶

export(plugin_name, folder=None, what=['dumper', 'uploader', 'mapping'], purge=False)[source]¶
Export generated code for a given plugin name, in the given folder (or use DEFAULT_EXPORT_FOLDER if None). Exported information can be:
- dumper: dumper class generated from the manifest
- uploader: uploader class generated from the manifest
- mapping: mapping generated from inspection or from the manifest
If "purge" is true, any existing folder/code will be deleted first; otherwise, an error will be raised if some folders/files already exist.
load(autodiscover=True)[source]¶
Load plugins registered in the internal Hub database and generate/register dumpers & uploaders accordingly. If autodiscover is True, also search DATA_PLUGIN_FOLDER for existing plugin directories not yet registered in the database, and register them automatically.
datatransform¶
ciidstruct¶
CIIDStruct - case insensitive id matching data structure

class biothings.hub.datatransform.ciidstruct.CIIDStruct(field=None, doc_lst=None)[source]¶
CIIDStruct - id structure for use with the DataTransform classes. The basic idea is to provide a structure that provides a list of (original_id, current_id) pairs.
This is a case-insensitive version of IDStruct.
Initialize the structure
- Parameters
field – field for documents to use as an initial id (optional)
doc_lst – list of documents to use when building an initial list (optional)
datatransform_api¶
DataTransformAPI - classes around API-based key lookup.
class biothings.hub.datatransform.datatransform_api.BiothingsAPIEdge(lookup, fields, weight=1, label=None, url=None)[source]¶
APIEdge - IDLookupEdge object for API calls
Initialize the class
- Parameters
label – A label can be used for debugging purposes.

property client¶
property getter for client
class biothings.hub.datatransform.datatransform_api.DataTransformAPI(input_types, output_types, *args, **kwargs)[source]¶
Perform key lookup or key conversion from one key type to another using an API endpoint as a data source.
This class uses biothings APIs to convert from one key type to another. Base classes are used with the decorator syntax shown below:

    @IDLookupMyChemInfo(input_types, output_types)
    def load_document(doc_lst):
        for d in doc_lst:
            yield d
Lookup fields are configured in the ‘lookup_fields’ object, examples of which can be found in ‘IDLookupMyGeneInfo’ and ‘IDLookupMyChemInfo’.
Required Options:
- input_types
  - 'type'
  - ('type', 'nested_source_field')
  - [('type1', 'nested.source_field1'), ('type2', 'nested.source_field2'), …]
- output_types
  - 'type'
  - ['type1', 'type2']
Additional Options: see DataTransform class
Initialize the IDLookupAPI object.
class biothings.hub.datatransform.datatransform_api.DataTransformMyChemInfo(input_types, output_types=None, skip_on_failure=False, skip_w_regex=None)[source]¶
Single key lookup for MyChemInfo
Initialize the class by setting up the client object.
class biothings.hub.datatransform.datatransform_api.DataTransformMyGeneInfo(input_types, output_types=['entrezgene'], skip_on_failure=False, skip_w_regex=None)[source]¶
deprecated
Initialize the class by setting up the client object.
class biothings.hub.datatransform.datatransform_api.MyChemInfoEdge(lookup, field, weight=1, label=None, url=None)[source]¶
The MyChemInfoEdge uses the MyChem.info API to convert identifiers.
- Parameters
lookup (str) – The field in the API to search with the input identifier.
field (str) – The field in the API to convert to.
weight (int) – Weights are used to prefer one path over another. The path with the lowest weight is preferred. The default weight is 1.
class biothings.hub.datatransform.datatransform_api.MyGeneInfoEdge(lookup, field, weight=1, label=None, url=None)[source]¶
The MyGeneInfoEdge uses the MyGene.info API to convert identifiers.
- Parameters
lookup (str) – The field in the API to search with the input identifier.
field (str) – The field in the API to convert to.
weight (int) – Weights are used to prefer one path over another. The path with the lowest weight is preferred. The default weight is 1.
datatransform_mdb¶
DataTransform MDB module - class for performing key lookup using conversions described in a networkx graph.
class biothings.hub.datatransform.datatransform_mdb.CIMongoDBEdge(collection_name, lookup, field, weight=1, label=None)[source]¶
Case-insensitive MongoDBEdge
- Parameters
collection_name (str) – The name of the MongoDB collection.
lookup (str) – The field that will match the input identifier in the collection.
field (str) – The output identifier field that will be read out of matching documents.
weight (int) – Weights are used to prefer one path over another. The path with the lowest weight is preferred. The default weight is 1.
class biothings.hub.datatransform.datatransform_mdb.DataTransformMDB(graph, *args, **kwargs)[source]¶
Convert document identifiers from one type to another.
The DataTransformNetworkX module was written as a decorator class which should be applied to the load_data function of a BioThings uploader. The load_data function yields documents, which are then post-processed by the decorator, and the 'id' key conversion is performed. A sketch of this usage follows the parameter list below.
- Parameters
graph – nx.DiGraph (networkx 2.1) configuration graph
input_types – A list of input types of the form (identifier, field) where identifier matches a node and field is an optional dotstring field for where the identifier should be read from (the default is '_id').
output_types (list(str)) – A priority list of identifiers to convert to. These identifiers should match nodes in the graph.
id_priority_list (list(str)) – A priority list of identifiers to sort input and output types by.
skip_on_failure (bool) – If True, documents where identifier conversion fails will be skipped in the final document list.
skip_w_regex (bool) – Do not perform conversion if the identifier matches the regular expression provided to this argument. By default, this option is disabled.
skip_on_success (bool) – If True, documents where identifier conversion succeeds will be skipped in the final document list.
idstruct_class (class) – Override an internal data structure used by this module (advanced usage)
copy_from_doc (bool) – If true, then an identifier is copied from the input source document regardless of whether it matches an edge or not. (advanced usage)
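A hypothetical sketch of how a conversion graph and the decorator could be wired together; the node names, collection and fields are made up, only the general pattern is shown:

    import networkx as nx

    from biothings.hub.dataload.uploader import BaseSourceUploader
    from biothings.hub.datatransform.datatransform_mdb import DataTransformMDB, MongoDBEdge

    # conversion graph: nodes are identifier types, edges know how to convert them
    graph = nx.DiGraph()
    graph.add_node("oldid")
    graph.add_node("newid")
    graph.add_edge(
        "oldid", "newid",
        object=MongoDBEdge("idmapping_collection", "old_id", "new_id"),  # hypothetical collection/fields
    )

    class MyUploader(BaseSourceUploader):
        name = "mysource"   # hypothetical source name

        @DataTransformMDB(graph, input_types=["oldid"], output_types=["newid"],
                          skip_on_failure=False)
        def load_data(self, data_folder):
            # yield parsed documents whose _id is an "oldid"; the decorator
            # converts _id to a "newid" using the graph above
            yield {"_id": "OLD:0001", "value": 42}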
class biothings.hub.datatransform.datatransform_mdb.MongoDBEdge(collection_name, lookup, field, weight=1, label=None, check_index=True)[source]¶
The MongoDBEdge uses data within a MongoDB collection to convert one identifier to another. The input identifier is used to search a collection. The output identifier values are read out of that collection:
- Parameters
collection_name (str) – The name of the MongoDB collection.
lookup (str) – The field that will match the input identifier in the collection.
field (str) – The output identifier field that will be read out of matching documents.
weight (int) – Weights are used to prefer one path over another. The path with the lowest weight is preferred. The default weight is 1.
property collection¶
getter for the collection member variable

collection_find(id_lst, lookup, field)[source]¶
Abstract out (as one line) the call to collection.find
histogram¶
DataTransform Histogram class - track keylookup statistics