import pathlib
from typing import Callable, Generator, Iterable, Optional
from urllib.parse import parse_qsl, urlparse
import orjson
[docs]def ndjson_parser(
patterns: Optional[Iterable[str]] = None,
) -> Callable[[str], Generator[dict, None, None]]:
"""
Create NDJSON Parser given filename patterns
For use with manifest.json based plugins.
Caveat: Only handles valid NDJSON (no extra newlines, UTF8, etc.)
Args:
patterns: glob-compatible patterns for filenames, like *.ndjson, data*.ndjson
Returns:
parser_func: Generator that takes in a data_folder and returns documents from
NDJSON files that matches the filename patterns
"""
if patterns is None:
raise TypeError("Must provide keyword argument patterns to" "match files for NDJSON Parser")
def ndjson_parser_func(data_folder):
work_dir = pathlib.Path(data_folder)
for pattern in patterns:
for filename in work_dir.glob(pattern):
with open(filename, "rb") as f:
for line in f:
doc = orjson.loads(line)
yield doc
return ndjson_parser_func
[docs]def json_array_parser(
patterns: Optional[Iterable[str]] = None,
) -> Callable[[str], Generator[dict, None, None]]:
"""
Create JSON Array Parser given filename patterns
For use with manifest.json based plugins. The data comes in a JSON that is
an JSON array, containing multiple documents.
Args:
patterns: glob-compatible patterns for filenames, like *.json, data*.json
Returns:
parser_func
"""
if patterns is None:
raise TypeError("Must provide keyword argument patterns to" "match files for JSON Array Parser")
def json_array_parser(data_folder):
work_dir = pathlib.Path(data_folder)
for pattern in patterns:
for filename in work_dir.glob(pattern):
with open(filename, "r") as f:
data = orjson.loads(f.read())
try:
iterator = iter(data)
except TypeError:
raise RuntimeError(f"{filename} does not contain a valid" "JSON Array")
for doc in iterator:
yield doc
return json_array_parser
[docs]def docker_source_info_parser(url):
"""
:param url: file url include docker connection string
format: docker://CONNECTION_NAME?image=DOCKER_IMAGE&tag=TAG&dump_command="python run.py"&path=/path/to/file
the CONNECTION_NAME must be defined in the biothings Hub config.
example:
docker://CONNECTION_NAME?image=docker_image&tag=docker_tag&dump_command="python run.py"&path=/path/to/file
docker://CONNECTION_NAME?image=docker_image&tag=docker_tag&dump_command="python run.py"&path=/path/to/file
docker://CONNECTION_NAME?image=docker_image&tag=docker_tag&dump_command="python run.py"&path=/path/to/file
docker"//CONNECTION_NAME?image=docker_image&tag=docker_tag&dump_command="python run.py"&path=/path/to/file
:return:
"""
parsed = urlparse(url)
query = dict(parse_qsl(parsed.query))
image = query.get("image")
image_tag = query.get("tag")
dump_command = query.get("dump_command")
keep_container = query.get("keep_container")
container_name = query.get("container_name")
get_version_cmd = query.get("get_version_cmd")
if keep_container:
keep_container = keep_container.lower() in {"true", "yes", "1", "y"}
if dump_command:
dump_command = dump_command.strip('"')
if get_version_cmd:
get_version_cmd = get_version_cmd.strip('"')
if not image_tag:
image_tag = "latest"
docker_image = image and f"{image}:{image_tag}" or None
source_config = {
"docker_image": docker_image,
"path": query.get("path"),
"dump_command": dump_command,
"connection_name": parsed.netloc,
"container_name": container_name,
"keep_container": keep_container,
"get_version_cmd": get_version_cmd,
}
if keep_container is None:
# remove keep_container if not set, so that later we can check its value from image/container metadata
source_config.pop("keep_container")
return source_config