Source code for biothings.cli.dataplugin

from typing import Optional

import typer
from typing_extensions import Annotated

from biothings.cli import utils

logger = utils.get_logger("dataplugin")

short_help = (
    "[green]Test an individual data plugin locally and make simple queries to inspect your parsed data objects.[/green]"
long_help = (
    + "\n\n[magenta]   :sparkles: Go to your existing data plugin folder.[/magenta]"
    + "\n[magenta]   :sparkles: Dumping, uploading and inspecting your data plugin.[/magenta]"
    + "\n[magenta]   :sparkles: Serving your data as a web service for making simple queries[/magenta]"
    + "\n\n[green]   :point_right: Always run this command inside of your data plugin folder.[/green]"
    + "\n[green]   :point_right: Default traceback errors are kept minimal, but you can set [bold]BTCLI_RICH_TRACEBACK=1[/bold][/green]"
    + "\n[green]      ENV variable to enable full and pretty-formatted tracebacks, [/green]"
    + "\n[green]      or set [bold]BTCLI_DEBUG=1[/bold] to enable even more debug logs for debugging purpose.[/green]"
    + "\n[green]   :point_right: You can include a at the working directly to override the default biothings.config settings.[/green]"
    + "\n   :rocket::boom::sparkling_heart:"

app = typer.Typer(

[docs] @app.command( name="create", help="Create a new data plugin from the tempplate", ) def create_data_plugin( name: Annotated[ str, typer.Option("--name", "-n", help="Provide a data plugin name", prompt="What's your data plugin name?"), ] = "", multi_uploaders: Annotated[ Optional[bool], typer.Option("--multi-uploaders", help="If provided, the data plugin includes multiple uploaders"), ] = False, # parallelizer: bool = typer.Option(False, "--parallelizer", help="Using parallelizer or not? Default: No"), parallelizer: Annotated[ Optional[bool], typer.Option("--parallelizer", help="If provided, the data plugin's upload step will run in parallel"), ] = False, ): """*create* command for creating a new data plugin from the template""" utils.do_create(name, multi_uploaders, parallelizer, logger=logger)
[docs] @app.command( name="dump", help="Download source data files to local", ) def dump_data(): """*dump* command for downloading source data files to local""" utils.do_dump(plugin_name=None, logger=logger)
[docs] @app.command( name="upload", help="Convert downloaded data from dump step into JSON documents and upload the to the source database", ) def upload_source( batch_limit: Annotated[ Optional[int], typer.Option( "--batch-limit", help="The maximum number of batches that should be uploaded. Batch size is 1000 docs", ), ] = None, ): """*upload* command for converting downloaded data from dump step into JSON documents and upload the to the source database. A local sqlite database used to store the uploaded data""" utils.do_upload(plugin_name=None, logger=logger)
[docs] @app.command( "dump_and_upload", help="Download data source to local folder then convert to Json document and upload to the source database", ) def dump_and_upload( # multi_uploaders: bool = typer.Option( # False, "--multi-uploaders", help="Add this option if you want to create multiple uploaders" # ), # parallelizer: bool = typer.Option( # False, "--parallelizer", help="Using parallelizer or not? Default: No" # ), ): """*dump_and_upload* command for downloading source data files to local, then converting them into JSON documents and uploading them to the source database. Two steps in one command.""" utils.do_dump_and_upload(plugin_name=None, logger=logger)
[docs] @app.command( name="list", help="Listing dumped files or uploaded sources", ) def listing( dump: Annotated[Optional[bool], typer.Option("--dump", help="Listing dumped files")] = False, upload: Annotated[Optional[bool], typer.Option("--upload", help="Listing uploaded sources")] = False, hubdb: Annotated[Optional[bool], typer.Option("--hubdb", help="Listing internal hubdb content")] = False, ): """*list* command for listing dumped files and/or uploaded sources""" utils.do_list(plugin_name=None, dump=dump, upload=upload, hubdb=hubdb, logger=logger)
[docs] @app.command( name="inspect", help="Giving detailed information about the structure of documents coming from the parser", ) def inspect_source( sub_source_name: Annotated[ Optional[str], typer.Option("--sub-source-name", "-s", help="Your sub source name") ] = "", mode: Annotated[ Optional[str], typer.Option( "--mode", "-m", help=""" The inspect mode or list of modes (comma separated), e.g. "type,mapping".\n Possible values are:\n - "type": explore documents and report strict data structure\n - "mapping": same as type but also perform test on data so guess best mapping\n (eg. check if a string is splitable, etc...). Implies merge=True\n - "stats": explore documents and compute basic stats (count,min,max,sum)\n """, ), ] = "type,stats", limit: Annotated[ Optional[int], typer.Option( "--limit", "-l", help=""" can limit the inspection to the x first docs (None = no limit, inspects all) """, ), ] = None, # merge: Annotated[ # Optional[bool], # typer.Option( # "--merge", # "-m", # help="""Merge scalar into list when both exist (eg. {"val":..} and [{"val":...}])""", # ), # ] = False, output: Annotated[ Optional[str], typer.Option( "--output", "-o", help="The local JSON file path for storing mapping info if you run with mode 'mapping' (absolute path or relative path)", ), ] = None, ): """*inspect* command for giving detailed information about the structure of documents coming from the parser after the upload step""" utils.do_inspect( plugin_name=None, sub_source_name=sub_source_name, mode=mode, limit=limit, merge=False, output=output, logger=logger, )
[docs] @app.command(name="serve") def serve( host: Annotated[ Optional[str], typer.Option( "--host", help="The host name to run the test API server", ), ] = "localhost", port: Annotated[ Optional[int], typer.Option( "--port", "-p", help="The port number to tun the test API server", ), ] = 9999, ): """ *serve* command runs a simple API server for serving documents from the source database. For example, after run 'dump_and_upload', we have a source_name = "test" with a document structure like this: doc = {"_id": "123", "key": {"a":{"b": "1"},"x":[{"y": "3", "z": "4"}, "5"]}}. An API server will run at http://host:port/<your source name>/, like http://localhost:9999/test/: - You can see all available sources on the index page: http://localhost:9999/ - You can list all docs: http://localhost:9999/test/ (default is to return the first 10 docs) - You can paginate doc list: http://localhost:9999/test/?start=10&limit=10 - You can retrieve a doc by id: http://localhost:9999/test/123 - You can filter out docs with one or multiple fielded terms: - http://localhost:9999/test/?q=key.a.b:1 (query by any field with dot notation like key.a.b=1) - http://localhost:9999/test/?q=key.a.b:1%20AND%20key.x.y=3 (find all docs that match two fields) - http://localhost:9999/test/?q=key.x.z:4* (field value can contain wildcard * or ?) - http://localhost:9999/test/?q=key.x:5&start=10&limit=10 (pagination also works) """ utils.do_serve(plugin_name=None, host=host, port=port, logger=logger)
[docs] @app.command( name="clean", help="Delete all dumped files and drop uploaded sources tables", no_args_is_help=True, ) def clean_data( dump: Annotated[Optional[bool], typer.Option("--dump", help="Delete all dumped files")] = False, upload: Annotated[Optional[bool], typer.Option("--upload", help="Drop uploaded sources tables")] = False, clean_all: Annotated[ Optional[bool], typer.Option( "--all", help="Delete all dumped files and drop uploaded sources tables", ), ] = False, ): """*clean* command for deleting all dumped files and/or drop uploaded sources tables""" utils.do_clean(plugin_name=None, dump=dump, upload=upload, clean_all=clean_all, logger=logger)