import types
import copy
import time
import logging
from pymongo.errors import DuplicateKeyError, BulkWriteError
from biothings.utils.common import timesofar, iter_n
from biothings.utils.dataload import merge_struct, merge_root_keys
from biothings.utils.mongo import get_src_db, check_document_size
class StorageException(Exception):
pass
class BaseStorage(object):
def __init__(self, db, dest_col_name, logger=logging):
db = db or get_src_db()
self.temp_collection = db[dest_col_name]
self.logger = logger
def process(self, iterable, *args, **kwargs):
"""
Process iterable to store data. Must return the number
of inserted records (even 0 if none)
"""
raise NotImplementedError("implement-me in subclass")
def check_doc_func(self, doc):
"""
        Return the doc if it's alright, or False if the doc should be ignored
        for some reason.
Subclass and override as needed.
"""
return doc
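
# A minimal sketch (illustrative, not part of the original API) of how a
# subclass fulfills the BaseStorage contract: store what check_doc_func()
# lets through and return the number of inserted records.
class _ExampleStorage(BaseStorage):
    def process(self, iterable, *args, **kwargs):
        total = 0
        for doc in iterable:
            doc = self.check_doc_func(doc)
            if doc:
                self.temp_collection.insert_one(doc)  # modern pymongo call
                total += 1
        return total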
class CheckSizeStorage(BaseStorage):
def check_doc_func(self, doc):
ok = check_document_size(doc)
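        # MongoDB caps BSON documents at 16MB; check_document_size() flags
        # docs over that limit so they can be skipped here instead of
        # failing the whole batch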
        # this is typically used to skip LFQSCWFLJHTTHZ-UHFFFAOYSA-N (Ethanol)
        # because there are too many elements in its "ndc" list
if not ok:
self.logger.warning("Skip document '%s' because too large" %
doc.get("_id"))
return False
return ok
class BasicStorage(BaseStorage):
def doc_iterator(self, doc_d, batch=True, batch_size=10000):
if (isinstance(doc_d, types.GeneratorType)
or isinstance(doc_d, list)) and batch:
for doc_li in iter_n(doc_d, n=batch_size):
doc_li = [d for d in doc_li if self.check_doc_func(d)]
yield doc_li
else:
if batch:
doc_li = []
i = 0
for _id, doc in doc_d.items():
doc['_id'] = _id
_doc = {}
_doc.update(doc)
if batch:
doc_li.append(_doc)
i += 1
if i % batch_size == 0:
doc_li = [d for d in doc_li if self.check_doc_func(d)]
yield doc_li
doc_li = []
else:
yield self.check_doc_func(_doc)
if batch:
doc_li = [d for d in doc_li if self.check_doc_func(d)]
yield doc_li
def process(self, doc_d, batch_size):
self.logger.info("Uploading to the DB...")
t0 = time.time()
total = 0
for doc_li in self.doc_iterator(doc_d,
batch=True,
batch_size=batch_size):
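            # note: insert() with manipulate/check_keys (like the
            # initialize_unordered_bulk_op() calls used elsewhere in this
            # module) belongs to the legacy pymongo (<4.0) API;
            # insert_many() is the modern counterpart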
self.temp_collection.insert(doc_li,
manipulate=False,
check_keys=False)
total += len(doc_li)
self.logger.info('Done[%s]' % timesofar(t0))
return total
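
def _example_basic_storage():
    # Illustrative sketch only (hypothetical collection name): thanks to
    # doc_iterator(), process() accepts either an iterable of docs (each
    # carrying its own "_id") or a dict keyed by _id.
    storage = BasicStorage(None, "example_collection")  # None -> get_src_db()
    docs = ({"_id": str(i), "value": i} for i in range(25000))
    # 25,000 docs with batch_size=10,000 -> three insert batches
    return storage.process(docs, batch_size=10000)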
class MergerStorage(BasicStorage):
"""
This storage will try to merge documents when finding duplicated errors.
It's useful when data is parsed using iterator. A record can be stored in database,
then later, another record with the same ID is sent to the db, raising a duplicated error.
These two documents would have been merged before using a 'put all in memory' parser.
Since data is here read line by line, the merge is done while storing
"""
merge_func = merge_struct
    def process(self, doc_d, batch_size):
self.logger.info("Uploading to the DB...")
t0 = time.time()
tinner = time.time()
aslistofdict = None
total = 0
for doc_li in self.doc_iterator(doc_d,
batch=True,
batch_size=batch_size):
toinsert = len(doc_li)
nbinsert = 0
self.logger.info("Inserting %s records ... " % toinsert)
try:
bob = self.temp_collection.initialize_unordered_bulk_op()
for d in doc_li:
aslistofdict = d.pop("__aslistofdict__", None)
bob.insert(d)
res = bob.execute()
nbinsert += res["nInserted"]
self.logger.info("OK [%s]" % timesofar(tinner))
except BulkWriteError as e:
inserted = e.details["nInserted"]
nbinsert += inserted
self.logger.info("Fixing %d records " %
len(e.details["writeErrors"]))
ids = [d["op"]["_id"] for d in e.details["writeErrors"]]
# build hash of existing docs
docs = self.temp_collection.find({"_id": {"$in": ids}})
hdocs = {}
for doc in docs:
hdocs[doc["_id"]] = doc
bob2 = self.temp_collection.initialize_unordered_bulk_op()
for err in e.details["writeErrors"]:
errdoc = err["op"]
existing = hdocs[errdoc["_id"]]
if errdoc is existing:
# if the same document has been yielded twice,
# they could be the same, so we ignore it but
# count it as processed (see assert below)
nbinsert += 1
continue
assert "_id" in existing
_id = errdoc.pop("_id")
merged = self.__class__.merge_func(
errdoc, existing, aslistofdict=aslistofdict)
                        # update the previously fetched doc: if several errors
                        # concern the same doc id, we wouldn't merge things
                        # properly without an up-to-date document
assert "_id" in merged
bob2.find({"_id": _id}).update_one({"$set": merged})
hdocs[_id] = merged
nbinsert += 1
res = bob2.execute()
self.logger.info("OK [%s]" % timesofar(tinner))
assert nbinsert == toinsert, "nb %s to %s" % (nbinsert, toinsert)
            # reset at the end of the loop so it also counts time spent in doc_iterator
tinner = time.time()
total += nbinsert
self.logger.info('Done[%s]' % timesofar(t0))
return total
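
def _example_merger_storage():
    # Illustrative sketch only (hypothetical collection name): when two
    # yielded docs share the same _id, MergerStorage resolves the resulting
    # duplicate-key error by merging them with merge_struct(), so the two
    # docs below end up as a single record (merge_struct() typically
    # combines the conflicting "pathway" values into a list).
    storage = MergerStorage(None, "example_collection")
    docs = [
        {"_id": "GENE1", "pathway": "p53 signaling"},
        {"_id": "GENE1", "pathway": "apoptosis"},
    ]
    return storage.process(docs, batch_size=100)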
class RootKeyMergerStorage(MergerStorage):
"""
Just like MergerStorage, this storage deals with duplicated error
by appending key's content to existing document. Keys in existing
document will be converted to a list as needed.
Note:
- root keys must have the same type in each documents
- inner structures aren't merged together, the merge happend
at root key level
"""
@classmethod
def merge_func(klass, doc1, doc2, **kwargs):
        # the caller popped it from doc1, so take it from doc2
        _id = doc2["_id"]
        # exclude=["_id"] removes _id from doc2, which is why we saved it above.
        # Also, copy doc2 because the merged doc will be stored in a bulk op
        # object: since doc2 is modified in place, reusing it could impact the
        # bulk op and produce an "empty $set" error from mongo
doc = merge_root_keys(doc1, copy.copy(doc2), exclude=["_id"])
doc["_id"] = _id
return doc
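
def _example_root_key_merger():
    # Illustrative sketch only: per the caller contract above, doc1 arrives
    # without its _id while doc2 (the existing doc) still carries it.
    # Duplicated root keys are appended as lists rather than deep-merged,
    # so "anno" is expected to end up holding both dicts side by side, e.g.
    # {"_id": "GENE1", "anno": [{"score": 2}, {"score": 1}]}.
    doc1 = {"anno": {"score": 1}}
    doc2 = {"_id": "GENE1", "anno": {"score": 2}}
    return RootKeyMergerStorage.merge_func(doc1, doc2)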
class IgnoreDuplicatedStorage(BasicStorage):
def process(self, iterable, batch_size):
self.logger.info("Uploading to the DB...")
t0 = time.time()
tinner = time.time()
total = 0
for doc_li in self.doc_iterator(iterable,
batch=True,
batch_size=batch_size):
try:
bob = self.temp_collection.initialize_unordered_bulk_op()
for d in doc_li:
bob.insert(d)
res = bob.execute()
total += res['nInserted']
self.logger.info("Inserted %s records [%s]" %
(res['nInserted'], timesofar(tinner)))
except BulkWriteError as e:
self.logger.info(
"Inserted %s records, ignoring %d [%s]" %
(e.details['nInserted'], len(
e.details["writeErrors"]), timesofar(tinner)))
tinner = time.time()
self.logger.info('Done[%s]' % timesofar(t0))
return total
class NoBatchIgnoreDuplicatedStorage(BasicStorage):
"""
You should use IgnoreDuplicatedStorag, which works using batch
and is thus way faster...
"""
    def process(self, doc_d, batch_size):
self.logger.info("Uploading to the DB...")
t0 = time.time()
tinner = time.time()
        # force batch_size=1: insert one document at a time
cnt = 0
total = 0
dups = 0
for doc_li in self.doc_iterator(doc_d, batch=True, batch_size=1):
try:
self.temp_collection.insert(doc_li,
manipulate=False,
check_keys=False)
cnt += 1
total += 1
if (cnt + dups) % batch_size == 0:
                    # we insert one by one but display progress per "batch_size" docs
self.logger.info("Inserted %s records, ignoring %s [%s]" %
(cnt, dups, timesofar(tinner)))
cnt = 0
dups = 0
tinner = time.time()
            except DuplicateKeyError:
                dups += 1
self.logger.info('Done[%s]' % timesofar(t0))
return total
class UpsertStorage(BasicStorage):
"""Insert or update documents, based on _id"""
    def process(self, iterable, batch_size):
self.logger.info("Uploading to the DB...")
t0 = time.time()
tinner = time.time()
total = 0
for doc_li in self.doc_iterator(iterable,
batch=True,
batch_size=batch_size):
            bob = self.temp_collection.initialize_unordered_bulk_op()
            for d in doc_li:
                bob.find({"_id": d["_id"]}).upsert().replace_one(d)
            res = bob.execute()
            nb = res["nUpserted"] + res["nModified"]
            total += nb
            self.logger.info("Upserted %s records [%s]" %
                             (nb, timesofar(tinner)))
            tinner = time.time()
self.logger.info('Done[%s]' % timesofar(t0))
return total
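
def _example_upsert_storage():
    # Illustrative sketch only (hypothetical collection name): processing
    # the same _id twice first inserts the doc (counted in nUpserted), then
    # replaces it wholesale (counted in nModified). Note replace_one()
    # swaps the entire document; it does not merge fields.
    storage = UpsertStorage(None, "example_collection")
    storage.process([{"_id": "GENE1", "symbol": "TP53"}], batch_size=10)
    return storage.process([{"_id": "GENE1", "name": "tumor protein p53"}],
                           batch_size=10)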
class NoStorage(object):
"""
This a kind of a place-holder, this storage will just store nothing...
(but it will respect storage interface)
"""
def __init__(self, db_info, dest_col_name, logger):
db = get_src_db()
self.temp_collection = db[dest_col_name]
self.logger = logger
def process(self, iterable, *args, **kwargs):
self.logger.info("NoStorage stores nothing, skip...")
return 0