Source code for biothings.utils.mongo

import time, logging, os, io, glob, datetime
import dateutil.parser as dtparser
from functools import wraps
from pymongo import MongoClient, DESCENDING
from pymongo.collection import Collection
from functools import partial
from collections import defaultdict
import bson

from biothings.utils.common import timesofar, get_random_string, iter_n, \
                                   open_compressed_file, get_compressed_outfile, \
                                   dotdict
from biothings.utils.backend import DocESBackend, DocMongoBackend
from biothings.utils.hub_db import IDatabase, ChangeWatcher
# stub, until set to real config module
config = None


class DummyCollection(dotdict):

    def count(self):
        return None

    def drop(self):
        pass

    def __getitem__(self, what):
        return DummyCollection()  # ???
class DummyDatabase(dotdict):

    def collection_names(self):
        return []

    def __getitem__(self, what):
        return DummyCollection()
class Database(MongoClient, IDatabase):

    def __init__(self, dbname, *args, **kwargs):
        super(Database, self).__init__(dbname)
        self.name = dbname
def requires_config(func):
    @wraps(func)
    def func_wrapper(*args, **kwargs):
        global config
        if not config:
            try:
                from biothings import config as config_mod
                config = config_mod
            except ImportError:
                raise Exception("call biothings.config_for_app() first")
        return func(*args, **kwargs)
    return func_wrapper


@requires_config
def get_conn(server, port):
    try:
        if config.DATA_SRC_SERVER_USERNAME and config.DATA_SRC_SERVER_PASSWORD:
            uri = "mongodb://{}:{}@{}:{}".format(config.DATA_SRC_SERVER_USERNAME,
                                                 config.DATA_SRC_SERVER_PASSWORD,
                                                 server, port)
        else:
            uri = "mongodb://{}:{}".format(server, port)
        conn = Database(uri)
        return conn
    except (AttributeError, ValueError):
        # Missing (or invalid) config variables: pretend it's a dummy access to mongo.
        # ("dummy" here means there really shouldn't be any call to get_conn(), but
        # mongo is too tightly tied to the code and needs more work to unlink it)
        return DummyDatabase()


@requires_config
def get_hub_db_conn():
    conn = Database(config.HUB_DB_BACKEND["uri"])
    return conn


@requires_config
def get_src_conn():
    return get_conn(config.DATA_SRC_SERVER, getattr(config, "DATA_SRC_PORT", 27017))


@requires_config
def get_src_db(conn=None):
    conn = conn or get_src_conn()
    return conn[config.DATA_SRC_DATABASE]


@requires_config
def get_src_master(conn=None):
    conn = conn or get_hub_db_conn()
    return conn[config.DATA_HUB_DB_DATABASE][config.DATA_SRC_MASTER_COLLECTION]


@requires_config
def get_src_dump(conn=None):
    conn = conn or get_hub_db_conn()
    return conn[config.DATA_HUB_DB_DATABASE][getattr(config, "DATA_SRC_DUMP_COLLECTION", "src_dump")]


@requires_config
def get_src_build(conn=None):
    conn = conn or get_hub_db_conn()
    return conn[config.DATA_HUB_DB_DATABASE][config.DATA_SRC_BUILD_COLLECTION]


@requires_config
def get_src_build_config(conn=None):
    conn = conn or get_hub_db_conn()
    return conn[config.DATA_HUB_DB_DATABASE][config.DATA_SRC_BUILD_COLLECTION + "_config"]


@requires_config
def get_data_plugin(conn=None):
    conn = conn or get_hub_db_conn()
    return conn[config.DATA_HUB_DB_DATABASE][config.DATA_PLUGIN_COLLECTION]


@requires_config
def get_api(conn=None):
    conn = conn or get_hub_db_conn()
    return conn[config.DATA_HUB_DB_DATABASE][config.API_COLLECTION]


@requires_config
def get_cmd(conn=None):
    conn = conn or get_hub_db_conn()
    return conn[config.DATA_HUB_DB_DATABASE][config.CMD_COLLECTION]


@requires_config
def get_event(conn=None):
    conn = conn or get_hub_db_conn()
    return conn[config.DATA_HUB_DB_DATABASE][getattr(config, "EVENT_COLLECTION", "event")]


@requires_config
def get_hub_config(conn=None):
    conn = conn or get_hub_db_conn()
    return conn[config.DATA_HUB_DB_DATABASE][getattr(config, "HUB_CONFIG_COLLECTION", "hub_config")]


@requires_config
def get_last_command(conn=None):
    cmd = get_cmd(conn)
    cur = cmd.find({}, {"_id": 1}).sort("_id", DESCENDING).limit(1)
    return next(cur)


@requires_config
def get_target_conn():
    if config.DATA_TARGET_SERVER_USERNAME and config.DATA_TARGET_SERVER_PASSWORD:
        uri = "mongodb://{}:{}@{}:{}".format(config.DATA_TARGET_SERVER_USERNAME,
                                             config.DATA_TARGET_SERVER_PASSWORD,
                                             config.DATA_TARGET_SERVER,
                                             config.DATA_TARGET_PORT)
    else:
        uri = "mongodb://{}:{}".format(config.DATA_TARGET_SERVER, config.DATA_TARGET_PORT)
    conn = Database(uri)
    return conn


@requires_config
def get_target_db(conn=None):
    conn = conn or get_target_conn()
    return conn[config.DATA_TARGET_DATABASE]


@requires_config
def get_target_master(conn=None):
    conn = conn or get_target_conn()
    return conn[config.DATA_TARGET_DATABASE][config.DATA_TARGET_MASTER_COLLECTION]
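# Usage sketch (hedged): all getters above require the application config to be
# registered first ("call biothings.config_for_app() first", as in the exception
# message above). The comments below only illustrate what each handle typically
# points to; collection contents depend on the configured hub.
#
#     src_db = get_src_db()        # MongoDB database holding uploaded source collections
#     src_dump = get_src_dump()    # hub collection tracking dump/upload jobs per source
#     target_db = get_target_db()  # MongoDB database holding merged ("build") collections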
@requires_config
def get_source_fullname(col_name):
    """
    Assuming col_name is a collection created from an upload process,
    find the main source & sub_source associated.
    """
    src_dump = get_src_dump()
    # "sources" in config is a list of collection names. src_dump's _id is the name
    # of the resource, but a resource can have sub-resources with different collection
    # names. We need to query the inner keys upload.jobs.*.step, which always contain
    # the collection name.
    info = src_dump.find_one({"$where": "function() {if(this.upload) {for(var index in this.upload.jobs) {if(this.upload.jobs[index].step == \"%s\") return this;}}}" % col_name})
    if info:
        name = info["_id"]
        if name != col_name:
            # col_name was a sub-source name
            return "%s.%s" % (name, col_name)
        else:
            return name
def get_source_fullnames(col_names):
    main_sources = set()
    for col_name in col_names:
        main_source = get_source_fullname(col_name)
        if main_source:
            main_sources.add(main_source)
    return list(main_sources)
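# Example (hypothetical source names): if a source "clinvar" uploads into sub-source
# collections "clinvar_hg19" and "clinvar_hg38", while a source "taxonomy" uploads
# into a single collection named after itself:
#
#     get_source_fullname("clinvar_hg19")   # -> "clinvar.clinvar_hg19"
#     get_source_fullname("taxonomy")       # -> "taxonomy"
#     get_source_fullnames(["clinvar_hg19", "taxonomy"])
#     # -> ["clinvar.clinvar_hg19", "taxonomy"] (order not guaranteed, built from a set)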
def doc_feeder(collection, step=1000, s=None, e=None, inbatch=False,
               query=None, batch_callback=None, fields=None, logger=logging):
    '''An iterator for returning docs in a collection, with batch query.
    An additional filter query can be passed via "query", e.g.,
    doc_feeder(collection, query={'taxid': {'$in': [9606, 10090, 10116]}}).
    "batch_callback" is a callback function as fn(cnt, t), called after every batch.
    "fields" is an optional parameter passed to find() to restrict the fields returned.
    '''
    if isinstance(collection, DocMongoBackend):
        collection = collection.target_collection
    cur = collection.find(query, no_cursor_timeout=True, projection=fields)
    n = cur.count()
    s = s or 0
    e = e or n
    # logger.info('Retrieving %d documents from database "%s".' % (n, collection.name))
    t0 = time.time()
    if inbatch:
        doc_li = []
    cnt = 0
    t1 = time.time()
    try:
        if s:
            cur.skip(s)
            cnt = s
            # logger.info("Skipping %d documents." % s)
        if e:
            cur.limit(e - (s or 0))
        cur.batch_size(step)
        # logger.info("Processing %d-%d documents..." % (cnt + 1, min(cnt + step, e)))
        for doc in cur:
            if inbatch:
                doc_li.append(doc)
            else:
                yield doc
            cnt += 1
            if cnt % step == 0:
                if inbatch:
                    yield doc_li
                    doc_li = []
                # logger.info('Done.[%.1f%%,%s]' % (cnt * 100. / n, timesofar(t1)))
                if batch_callback:
                    batch_callback(cnt, time.time() - t1)
                if cnt < e:
                    t1 = time.time()
                    # logger.info("Processing %d-%d documents..." % (cnt + 1, min(cnt + step, e)))
        if inbatch and doc_li:
            # Important: need to yield the last batch here
            yield doc_li
        # logger.info('Done.[%.1f%%,%s]' % (cnt * 100. / n, timesofar(t1)))
        # logger.info('Finished.[total time: %s]' % timesofar(t0))
    finally:
        cur.close()
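# Usage sketch (hedged; "variants" is a hypothetical collection name):
#
#     col = get_src_db()["variants"]
#     for doc in doc_feeder(col, step=5000, fields={"_id": 1}):   # one doc at a time
#         ...
#     for batch in doc_feeder(col, step=5000, inbatch=True):      # lists of up to 5000 docs
#         ...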
def get_cache_filename(col_name):
    cache_folder = getattr(config, "CACHE_FOLDER", None)
    if not cache_folder:
        return  # we don't even use cache, forget it
    cache_format = getattr(config, "CACHE_FORMAT", None)
    cache_file = os.path.join(config.CACHE_FOLDER, col_name)
    cache_file = cache_format and (cache_file + ".%s" % cache_format) or cache_file
    return cache_file


def invalidate_cache(col_name, col_type="src"):
    if col_type == "src":
        src_dump = get_src_dump()
        if "." not in col_name:
            fullname = get_source_fullname(col_name)
        else:
            # col_name already looks like a full "main.sub" source name
            fullname = col_name
        assert fullname, "Can't resolve source '%s' (does it exist?)" % col_name
        main, sub = fullname.split(".")
        doc = src_dump.find_one({"_id": main})
        assert doc, "No such source '%s'" % main
        assert doc.get("upload", {}).get("jobs", {}).get(sub), "No such sub-source '%s'" % sub
        # this will make the cache too old
        doc["upload"]["jobs"][sub]["started_at"] = datetime.datetime.now()
        src_dump.update_one({"_id": main},
                            {"$set": {"upload.jobs.%s.started_at" % sub: datetime.datetime.now()}})
    elif col_type == "target":
        # just delete the cache file
        cache_file = get_cache_filename(col_name)
        if cache_file:
            try:
                os.remove(cache_file)
            except FileNotFoundError:
                pass
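# Sketch (hedged): with hypothetical settings config.CACHE_FOLDER = "/data/cache"
# and config.CACHE_FORMAT = "xz", a sub-source collection "clinvar_hg19" would map to:
#
#     get_cache_filename("clinvar_hg19")   # -> "/data/cache/clinvar_hg19.xz"
#     invalidate_cache("clinvar_hg19")     # bump upload time so the cache is seen as stale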
# TODO: this function deals with different backends, it should not be in bt.utils.mongo,
# and doc_feeder should do the same as this function regarding backend support
@requires_config
def id_feeder(col, batch_size=1000, build_cache=True, logger=logging,
              force_use=False, force_build=False, validate_only=False):
    """Return an iterator over all _ids in collection "col".
    Search for a valid cache file if available, otherwise fall back to a doc_feeder
    on that collection. A valid cache is a cache file that is newer than the
    collection. The collection can live in either the source or the target database.
    "build_cache" True will build a cache file as _ids are fetched, if no cache
    file was found.
    "force_use" True will use any existing cache file and won't check whether
    it's valid or not.
    "force_build" True will build a new cache even if the current one exists
    and is valid.
    "validate_only" will directly return [] if the cache is valid (a convenient
    way to check whether the cache is valid).
    """
    src_db = get_src_db()
    ts = None
    found_meta = True

    if isinstance(col, DocMongoBackend):
        col = col.target_collection

    try:
        if col.database.name == config.DATA_TARGET_DATABASE:
            info = src_db["src_build"].find_one({"_id": col.name})
            if not info:
                logger.warning("Can't find information for target collection '%s'" % col.name)
            else:
                ts = info.get("_meta", {}).get("build_date")
                ts = ts and dtparser.parse(ts).timestamp()
        elif col.database.name == config.DATA_SRC_DATABASE:
            src_dump = get_src_dump()
            info = src_dump.find_one({"$where": "function() {if(this.upload) {for(var index in this.upload.jobs) {if(this.upload.jobs[index].step == \"%s\") return this;}}}" % col.name})
            if not info:
                logger.warning("Can't find information for source collection '%s'" % col.name)
            else:
                ts = info["upload"]["jobs"][col.name]["started_at"].timestamp()
        else:
            logging.warning("Can't find metadata for collection '%s' (not a target, not a source collection)" % col)
            found_meta = False
            build_cache = False
    except KeyError:
        logger.warning("Couldn't find timestamp in database for '%s'" % col.name)
    except Exception as e:
        logger.info("%s is not a mongo collection, _id cache won't be built (error: %s)" % (col, e))
        build_cache = False

    # try to find a cache file
    use_cache = False
    cache_file = None
    cache_format = getattr(config, "CACHE_FORMAT", None)
    if found_meta and getattr(config, "CACHE_FOLDER", None):
        cache_file = get_cache_filename(col.name)
        try:
            # size of an empty file differs depending on compression
            empty_size = {None: 0, "xz": 32, "gzip": 25, "bz2": 14}
            if force_build:
                logger.warning("Force building cache file")
                use_cache = False
            # check size, delete if invalid
            elif os.path.getsize(cache_file) <= empty_size.get(cache_format, 32):
                logger.warning("Cache file exists but is empty, delete it")
                os.remove(cache_file)
            elif force_use:
                use_cache = True
                logger.info("Force using cache file")
            else:
                mt = os.path.getmtime(cache_file)
                if ts and mt >= ts:
                    dtmt = datetime.datetime.fromtimestamp(mt).isoformat()
                    dtts = datetime.datetime.fromtimestamp(ts).isoformat()
                    logging.debug("Cache is valid, modiftime_cache:%s >= col_timestamp:%s" % (dtmt, dtts))
                    use_cache = True
                else:
                    logger.info("Cache is too old, discard it")
        except FileNotFoundError:
            pass

    if use_cache:
        logger.debug("Found valid cache file for '%s': %s" % (col.name, cache_file))
        if validate_only:
            logging.debug("Only validating cache, now return")
            return []
        with open_compressed_file(cache_file) as cache_in:
            if cache_format:
                iocache = io.TextIOWrapper(cache_in)
            else:
                iocache = cache_in
            for ids in iter_n(iocache, batch_size):
                yield [_id.strip() for _id in ids if _id.strip()]
    else:
        logger.debug("No cache file found (or invalid) for '%s', use doc_feeder" % col.name)
        cache_out = None
        cache_temp = None
        if getattr(config, "CACHE_FOLDER", None) and config.CACHE_FOLDER and build_cache:
            if not os.path.exists(config.CACHE_FOLDER):
                os.makedirs(config.CACHE_FOLDER)
            cache_temp = "%s._tmp_" % cache_file
            # clean up aborted cache file generation
            for tmpcache in glob.glob(os.path.join(config.CACHE_FOLDER, "%s*" % cache_temp)):
                logger.info("Removing aborted cache file '%s'" % tmpcache)
                os.remove(tmpcache)
            # use a temp file and rename once done
            cache_temp = "%s%s" % (cache_temp, get_random_string())
            cache_out = get_compressed_outfile(cache_temp, compress=cache_format)
            logger.info("Building cache file '%s'" % cache_temp)
        else:
            logger.info("Can't build cache, cache not allowed or no cache folder")
            build_cache = False
        if isinstance(col, Collection):
            doc_feeder_func = partial(doc_feeder, col, step=batch_size, inbatch=True, fields={"_id": 1})
        elif isinstance(col, DocMongoBackend):
            doc_feeder_func = partial(doc_feeder, col.target_collection, step=batch_size, inbatch=True, fields={"_id": 1})
        elif isinstance(col, DocESBackend):
            # get_id_list directly returns the _id; wrap it to match what the other
            # doc_feeder_func variants return, and yield ids in batches
            def wrap_id():
                ids = []
                for _id in col.get_id_list(step=batch_size):
                    ids.append({"_id": _id})
                    if len(ids) >= batch_size:
                        yield ids
                        ids = []
                if ids:
                    yield ids
            doc_feeder_func = partial(wrap_id)
        else:
            raise Exception("Unknown backend %s" % col)
        for doc_ids in doc_feeder_func():
            doc_ids = [str(_doc["_id"]) for _doc in doc_ids]
            if build_cache:
                strout = "\n".join(doc_ids) + "\n"
                if cache_format:
                    # assuming binary format (because compressed)
                    cache_out.write(strout.encode())
                else:
                    cache_out.write(strout)
            yield doc_ids
        if build_cache:
            cache_out.close()
            cache_final = os.path.splitext(cache_temp)[0]
            try:
                os.rename(cache_temp, cache_final)
            except Exception:
                logger.exception("Couldn't set final cache filename, building cache failed")
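# Usage sketch (hedged; "variants" is a hypothetical collection name):
#
#     col = get_src_db()["variants"]
#     for _ids in id_feeder(col, batch_size=10000):   # builds the _id cache when allowed
#         process(_ids)                               # _ids is a list of string _ids
#     # when a valid cache already exists, this yields nothing and returns quickly:
#     list(id_feeder(col, validate_only=True))        # -> []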
def check_document_size(doc):
    """
    Return True if doc isn't too large for MongoDB
    """
    return len(bson.BSON.encode(doc)) < 16777216  # 16*1024*1024
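# Example: MongoDB caps BSON documents at 16MB, so an oversized value fails the check.
#
#     check_document_size({"_id": "rs123", "val": 1})                        # -> True
#     check_document_size({"_id": "big", "val": "x" * (17 * 1024 * 1024)})   # -> False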
def get_previous_collection(new_id):
    """
    Given 'new_id', an _id from src_build representing the "new" collection,
    automatically select an "old" collection. By default, src_build's documents
    are sorted by their name (_id) and the old collection is the one just
    before new_id.
    Note: because more than one build config can be used, the actual build config
    name is first determined using the new_id collection name, then the find/sort
    is done on collections containing that build config name.
    """
    # TODO: this is not compatible with a generic hub_db backend
    # TODO: this should return a collection with status=success
    col = get_src_build()
    doc = col.find_one({"_id": new_id})
    assert doc, "No build document found for '%s'" % new_id
    assert "build_config" in doc, "No build configuration found for document '%s'" % new_id
    assert doc["build_config"]["name"] == doc["build_config"]["_id"]
    confname = doc["build_config"]["name"]
    docs = get_src_build().find({
        "$and": [
            {"started_at": {"$lte": doc["started_at"]}},
            {"build_config.name": confname},
            {"archived": {"$exists": 0}},
        ]},
        {"_id": 1}).sort([("started_at", -1)]).limit(2)
    _ids = [d["_id"] for d in docs]
    assert len(_ids) == 2, "Expecting 2 collection _ids, got: %s" % _ids
    assert _ids[0] == new_id, "Can't find collection _id '%s'" % new_id
    return _ids[1]
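# Example (hypothetical build names): with two non-archived builds of the same
# configuration, "mybuild_20230201" (newest) and "mybuild_20230101":
#
#     get_previous_collection("mybuild_20230201")   # -> "mybuild_20230101"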