import os
import subprocess
import shutil
from boto import connect_s3
from biothings.utils.aws import send_s3_file
from biothings.utils.hub_db import get_src_build
from biothings.utils.mongo import get_src_db, id_feeder, get_target_db, \
get_cache_filename
from biothings.utils.common import anyfile
from biothings import config as btconfig
logging = btconfig.logger
def export_ids(col_name):
    """
    Export all _ids from the collection named col_name.
    If col_name refers to a build where a cold_collection is defined,
    its _ids are also extracted and sorted/deduplicated so the output
    contains the full list of _ids of the actual merged (cold+hot)
    collection.
    The output file is stored in DATA_EXPORT_FOLDER/ids, defaulting to
    <DATA_ARCHIVE_ROOT>/export/ids. The output filename is returned at
    the end, if successful.
    """
# prepare output directory
DATA_EXPORT_FOLDER = getattr(btconfig, "DATA_EXPORT_FOLDER", None)
if not DATA_EXPORT_FOLDER:
DATA_EXPORT_FOLDER = os.path.join(btconfig.DATA_ARCHIVE_ROOT, "export")
ids_export_folder = os.path.join(DATA_EXPORT_FOLDER, "ids")
if not os.path.exists(ids_export_folder):
logging.debug("Creating export/ids folder: %s" % ids_export_folder)
os.makedirs(ids_export_folder)
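    # determine whether col_name refers to a merged build (stored in the target
    # db) or to a plain source collection (stored in the src db)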
build = get_src_build().find_one({"_id": col_name})
cold = None
if build:
col = get_target_db()[col_name]
if build.get("build_config", {}).get("cold_collection"):
cold_name = build["build_config"]["cold_collection"]
cold = get_target_db()[cold_name]
logging.info("Found a cold collection '%s' associated to '%s'" % (cold_name, col_name))
else:
# it's a src
col = get_src_db()[col_name]
    # first iterate over all _ids. This will update the underlying _id cache
    # if it's not valid anymore, so we're sure to work with the latest data.
    # If the cache is valid, this will be pretty fast
logging.info("Screening _ids in collection '%s'" % col.name)
for _id in id_feeder(col, validate_only=True):
pass
# now accessing cache
col_ids_cache = get_cache_filename(col.name)
assert os.path.exists(col_ids_cache)
logging.info("Now using cache file %s" % col_ids_cache)
if cold:
logging.info("Screening _ids in cold collection '%s'" % cold.name)
for _id in id_feeder(cold, validate_only=True):
pass
# now accessing cache
cold_ids_cache = get_cache_filename(cold.name)
assert os.path.exists(cold_ids_cache)
logging.info("Now using cache file %s" % cold_ids_cache)
outfn = os.path.join(ids_export_folder, "%s_ids.xz" % col_name)
    # NOTE: we can't use anyfile to open the cache files and send _ids through
    # pipes because that would load all _ids in memory (unless using hacks),
    # so use cat (or the matching decompressing variant, like gzcat/xzcat/...)
    # to run the whole pipe on the shell
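    # two cases: with a cold collection, hot and cold caches are merged through
    # a shell pipe; with a single (hot) cache, the file is simply copied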
if cold:
fout = anyfile(outfn, "wb")
colext = os.path.splitext(col_ids_cache)[1]
coldext = os.path.splitext(cold_ids_cache)[1]
        assert colext == coldext, "Hot and cold _id caches are compressed differently (%s and %s), they should be the same" % (colext, coldext)
comp = colext.replace(".", "")
supportedcomps = ["xz", "gz", ""] # no compression allowed as well
assert comp in supportedcomps, "Compression '%s' isn't supported (%s)" % (comp, supportedcomps)
        # _ids are sent to the pipe's input (cat), deduplicated (sort -u), then compressed (xz)
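        # roughly equivalent to the following shell pipeline (assuming
        # xz-compressed caches):
        #   xzcat <hot_cache> <cold_cache> | sort -u | xz -c > <outfn>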
pcat = subprocess.Popen(["%scat" % comp, col_ids_cache, cold_ids_cache], stdout=subprocess.PIPE)
psort = subprocess.Popen(["sort", "-u"], stdin=pcat.stdout, stdout=subprocess.PIPE, universal_newlines=True)
        pcat.stdout.close()  # close our copy so pcat gets SIGPIPE if sort exits early
        if comp:
            # compress the sorted output; capture stderr so errors can be detected below
            pcomp = subprocess.Popen(["xz", "-c"], stdin=psort.stdout, stdout=fout, stderr=subprocess.PIPE)
        else:
            # no compression: just pass stdin through to stdout
            pcomp = subprocess.Popen(["tee"], stdin=psort.stdout, stdout=fout, stderr=subprocess.PIPE)
psort.stdout.close()
try:
logging.info("Running pipe to compute list of unique _ids")
(out, err) = pcomp.communicate() # run the pipe! (blocking)
if err:
raise Exception(err)
except Exception as e:
logging.error("Error while running pipe to export _ids: %s" % e)
            # make sure to clean up empty or half-processed files
            try:
                os.unlink(outfn)
            except OSError:
                pass
            raise
else:
logging.info("Copying cache _id file")
try:
shutil.copyfile(col_ids_cache, outfn)
except Exception as e:
logging.error("Error while exporting _ids: %s" % e)
            # make sure to clean up empty or half-processed files
            try:
                os.unlink(outfn)
            except OSError:
                pass
            raise
logging.info("Done exporting _ids to '%s'" % outfn)
return outfn
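
# Example usage (a minimal sketch; "mybuild_20200101" is a hypothetical
# build/collection name and a working hub configuration with valid _id caches
# is assumed):
#
#     ids_file = export_ids("mybuild_20200101")
#     # -> <DATA_EXPORT_FOLDER>/ids/mybuild_20200101_ids.xz
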
def upload_ids(ids_file, redirect_from, s3_bucket, aws_key, aws_secret):
    """
    Upload file ids_file to s3_bucket and modify the redirect_from key's
    metadata so the redirect_from link now points to ids_file.
    The redirect_from s3 key must exist.
    """
# cache file should be named the same as target_name
logging.info("Uploading _ids file %s to s3 (bucket: %s)" % (repr(ids_file), s3_bucket))
s3path = os.path.basename(ids_file)
send_s3_file(ids_file, s3path, overwrite=True,
s3_bucket=s3_bucket,
aws_key=aws_key,
aws_secret=aws_secret)
# make the file public
s3 = connect_s3(aws_key, aws_secret)
bucket = s3.get_bucket(s3_bucket)
s3key = bucket.get_key(s3path)
s3key.set_acl("public-read")
# update permissions and redirect metadata
k = bucket.get_key(redirect_from)
assert k, "Can't find s3 key '%s' to set redirection" % redirect_from
k.set_redirect("/%s" % s3path)
k.set_acl("public-read")
logging.info("IDs file '%s' uploaded to s3, redirection set from '%s'" % (ids_file, redirect_from), extra={"notify": True})