"""
Utilities to compare two lists of gene documents; requires a BioThings Hub setup.
"""
import os
import os.path
import time
# from ..hub.databuild.backend import create_backend
from .backend import DocMongoDBBackend
from .common import dump, filter_dict, get_timestamp, timesofar
from .diff_common import full_diff_doc
# from .es import ESIndexer
from .jsondiff import make as jsondiff
def two_docs_iterator(b1, b2, id_list, step=10000, verbose=False):
    """Yield ``(doc1, doc2)`` pairs fetched from two backends for the same ids.

    ``id_list`` is processed in slices of ``step`` ids. For each slice the
    documents are fetched from ``b1`` and ``b2`` with
    ``mget_from_ids(ids, asiter=True)``, each result is sorted by ``_id`` and
    the two sorted lists are paired positionally with ``zip``.

    Note: pairing is positional. If one backend lacks some ids of a slice,
    mismatched documents get paired (and ``zip`` truncates to the shorter
    list); callers are expected to check ``doc1["_id"] == doc2["_id"]``.

    :param b1: first backend (must provide ``mget_from_ids``)
    :param b2: second backend, same interface as ``b1``
    :param id_list: sliceable sequence of document ids
    :param step: number of ids fetched per round trip
    :param verbose: print per-slice progress and total elapsed time
    """
    t0 = time.time()
    n = len(id_list)
    for i in range(0, n, step):
        t1 = time.time()
        stop = min(i + step, n)
        if verbose:
            print("Processing %d-%d documents..." % (i + 1, stop), end="")
        _ids = id_list[i:stop]
        docs1 = sorted(b1.mget_from_ids(_ids, asiter=True), key=lambda d: d["_id"])
        docs2 = sorted(b2.mget_from_ids(_ids, asiter=True), key=lambda d: d["_id"])
        for doc1, doc2 in zip(docs1, docs2):
            yield doc1, doc2
        if verbose:
            # progress counts the documents processed *including* this slice,
            # so the final slice reports 100%
            print("Done.[%.1f%%,%s]" % (stop * 100.0 / n, timesofar(t1)))
    if verbose:
        print("=" * 20)
        print("Finished.[total time: %s]" % timesofar(t0))
def _diff_doc_worker(args):
    """Diff one batch of ids between two backends (parallel-job entry point).

    ``args`` is the 4-tuple ``(spec1, spec2, ids, path)`` built by
    ``diff_collections``. ``spec1`` and ``spec2`` are positional argument tuples
    for ``get_backend``, ``ids`` is the batch to compare, and ``path`` is unused.
    ``biothings.utils.diff`` is reloaded before its functions are looked up
    (presumably so the worker runs the current source of this module).

    :return: the list produced by ``_diff_doc_inner_worker`` for the batch.
    """
    left_spec, right_spec, id_batch, _unused_path = args
    import importlib

    import biothings.utils.diff as diff_module

    diff_module = importlib.reload(diff_module)
    left = diff_module.get_backend(*left_spec)
    right = diff_module.get_backend(*right_spec)
    return diff_module._diff_doc_inner_worker(left, right, id_batch)
def _diff_doc_inner_worker(b1, b2, ids, fastdiff=False, diff_func=full_diff_doc):
    """Compare the documents with the given ``ids`` in backends ``b1`` and ``b2``.

    Document pairs are fetched with ``two_docs_iterator``.

    :param fastdiff: if True, only compare whole documents and record
        ``{"_id": ...}`` for each one that differs, without traversing into
        individual attributes.
    :param diff_func: callable ``(doc1, doc2)`` used when ``fastdiff`` is
        False. A falsy result means "no change"; otherwise the result is
        tagged in place with ``result["_id"] = doc1["_id"]`` and collected.
    :return: list of change records, one per differing document.
    :raises AssertionError: if a fetched pair does not share the same ``_id``.
    """
    _updates = []
    for doc1, doc2 in two_docs_iterator(b1, b2, ids):
        # Explicit raise instead of ``assert``: the check must survive
        # ``python -O``, otherwise a misaligned pair would silently produce a
        # diff attributed to the wrong document.
        if doc1["_id"] != doc2["_id"]:
            raise AssertionError("Different ids: '%s' != '%s'" % (doc1["_id"], doc2["_id"]))
        if fastdiff:
            if doc1 != doc2:
                _updates.append({"_id": doc1["_id"]})
        else:
            _diff = diff_func(doc1, doc2)
            if _diff:
                _diff["_id"] = doc1["_id"]
                _updates.append(_diff)
    return _updates
def diff_docs_jsonpatch(b1, b2, ids, fastdiff=False, exclude_attrs=None):
    """Compare documents with the given ``ids`` in ``b1`` and ``b2`` as JSON patches.

    Document pairs are fetched with ``two_docs_iterator``.

    :param fastdiff: if True, only compare whole documents and collect the
        ``_id`` of each one that differs, without traversing into attributes.
    :param exclude_attrs: keys passed to ``filter_dict`` to strip from both
        documents before they are compared.
    :return: with ``fastdiff``, a list of ids; otherwise a list of
        ``{"patch": jsondiff(doc1, doc2), "_id": ...}`` dicts for each pair
        whose patch is non-empty.
    :raises AssertionError: if a fetched pair does not share the same ``_id``.
    """
    exclude_attrs = exclude_attrs or []
    _updates = []
    for doc1, doc2 in two_docs_iterator(b1, b2, ids):
        # Read the id before filtering, in case ``exclude_attrs`` strips "_id".
        _id = doc1["_id"]
        # Explicit raise (not ``assert``) so the check survives ``python -O``.
        if _id != doc2["_id"]:
            raise AssertionError("Different ids: '%s' != '%s'" % (_id, doc2["_id"]))
        if exclude_attrs:
            doc1 = filter_dict(doc1, exclude_attrs)
            doc2 = filter_dict(doc2, exclude_attrs)
        if fastdiff:
            if doc1 != doc2:
                _updates.append(_id)
        else:
            _patch = jsondiff(doc1, doc2)
            if _patch:
                _updates.append({"patch": _patch, "_id": _id})
    return _updates
# TODO: move to mongodb backend class
def get_mongodb_uri(backend):
    """Build a ``mongodb://`` connection URI for the client behind ``backend``.

    Credentials are read from the client's name-mangled private attribute
    ``_MongoClient__options.credentials``. Host and port come from
    ``client.address``. The path component is the credentials' ``source``
    when set, otherwise the name of the backend's target database.

    The username and password are percent-escaped, so characters such as
    ``@``, ``:`` or ``/`` cannot corrupt the URI. IPv6 host literals are
    wrapped in brackets. The URI is printed to stdout with the password
    masked.

    :param backend: object exposing ``target_collection.database.client``
    :return: the connection URI string (with the real password)
    """
    from urllib.parse import quote_plus

    client = backend.target_collection.database.client
    creds = client._MongoClient__options.credentials
    username = creds.username if creds else None
    password = creds.password if creds else None
    dbase = creds.source if creds else None

    userinfo = masked_userinfo = ""
    if username:
        userinfo = masked_userinfo = quote_plus(username)
        if password:
            userinfo += ":" + quote_plus(password)
            masked_userinfo += ":***"
        userinfo += "@"
        masked_userinfo += "@"

    host, port = client.address
    host = str(host)
    if ":" in host and not host.startswith("["):
        # IPv6 literals must be bracketed inside a URI
        host = "[%s]" % host
    location = "%s:%s/%s" % (host, port, dbase or backend.target_collection.database.name)

    uri = "mongodb://" + userinfo + location
    # never echo the plaintext password to stdout
    print("uri: %s" % ("mongodb://" + masked_userinfo + location))
    return uri
def diff_collections(b1, b2, use_parallel=True, step=10000):
    """
    Compute the changes between the documents held by two backends.

    b1, b2 are one of supported backend class in databuild.backend.
    e.g.::

        b1 = DocMongoDBBackend(c1)
        b2 = DocMongoDBBackend(c2)

    Ids found only in ``b1`` are reported under "delete" and ids found only in
    ``b2`` under "add". Documents present in both are compared with
    ``_diff_doc_inner_worker``. When ``use_parallel`` is false this runs
    in-process. Otherwise batches of ``step`` ids are sent to
    ``_diff_doc_worker`` through ``run_jobs_on_ipythoncluster``. Each batch
    carries the backend specs ``(get_mongodb_uri(b), database name,
    b.target_name, b.name)``.

    :return: ``{"update": [...], "delete": sorted ids, "add": sorted ids}``,
        or ``None`` when the parallel jobs return no results.
    """
    ids_1 = set(b1.get_id_list())
    ids_2 = set(b2.get_id_list())
    print("Size of collection 1:\t", len(ids_1))
    print("Size of collection 2:\t", len(ids_2))

    only_in_1 = ids_1 - ids_2
    only_in_2 = ids_2 - ids_1
    in_both = ids_1 & ids_2
    for label, id_set in (
        ("# of docs found only in collection 1:\t", only_in_1),
        ("# of docs found only in collection 2:\t", only_in_2),
        ("# of docs found in both collections:\t", in_both),
    ):
        print(label, len(id_set))
    print("Comparing matching docs...")

    updates = []
    if in_both:
        if use_parallel:
            from .parallel import run_jobs_on_ipythoncluster

            # two directory levels above this file, plus "/.."; forwarded to each job
            pkg_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/.."
            common_ids = list(in_both)
            spec_1 = (get_mongodb_uri(b1), b1.target_collection.database.name, b1.target_name, b1.name)
            spec_2 = (get_mongodb_uri(b2), b2.target_collection.database.name, b2.target_name, b2.name)
            tasks = [
                (spec_1, spec_2, common_ids[start : start + step], pkg_path)
                for start in range(0, len(common_ids), step)
            ]
            job_results = run_jobs_on_ipythoncluster(_diff_doc_worker, tasks)
            if not job_results:
                print("Parallel jobs failed or were interrupted.")
                return None
            updates = [change for batch in job_results for change in batch]
        else:
            updates = _diff_doc_inner_worker(b1, b2, list(in_both))
        print("Done. [{} docs changed]".format(len(updates)))

    return {"update": updates, "delete": sorted(only_in_1), "add": sorted(only_in_2)}
def get_backend(uri, db, col, bk_type):
    """Open collection ``col`` of database ``db`` at ``uri`` and wrap it in a backend.

    Only ``bk_type == "mongodb"`` is supported. The collection is opened with
    ``biothings.utils.mongo.MongoClient`` and wrapped in ``DocMongoDBBackend``.

    :raises NotImplementedError: for any other ``bk_type``.
    """
    if bk_type == "mongodb":
        from biothings.utils.mongo import MongoClient

        client = MongoClient(uri)
        return DocMongoDBBackend(client[db][col])
    raise NotImplementedError("Backend type '%s' not supported" % bk_type)
def diff_collections_batches(b1, b2, result_dir, step=10000, base_dir="/home/kevinxin/diff_result/"):
    """
    Diff two collections batch by batch, dumping each batch's changes to disk.

    b2 is new collection, b1 is old collection.

    Two passes are made with ``doc_feeder`` (``step`` documents per batch):

    1. Over ``b2``: ids missing from ``b1`` are recorded under "add". Ids
       present in both are compared with ``diff_docs_jsonpatch(...,
       fastdiff=True)``, and the differing ids are recorded under "update".
    2. Over ``b1``: ids missing from ``b2`` are recorded under "delete".

    Each batch with at least one change is written with ``dump`` to
    ``<base_dir>/<result_dir>/<n>.pyobj``. ``n`` is a batch counter shared by
    both passes. Progress and totals are printed to stdout.

    :param b1: old backend (its ``target_collection`` is read)
    :param b2: new backend (its ``target_collection`` is read)
    :param result_dir: output sub-directory. If it is absolute,
        ``os.path.join`` ignores ``base_dir`` and uses it as-is.
    :param step: batch size passed to ``doc_feeder``
    :param base_dir: parent directory of ``result_dir``. The default keeps the
        historical hard-coded location for backward compatibility.
    """
    from biothings.utils.mongo import doc_feeder

    data_folder = os.path.join(base_dir, result_dir)
    # makedirs also creates missing parent directories, and exist_ok avoids
    # the race between an exists() check and mkdir()
    os.makedirs(data_folder, exist_ok=True)
    source = b2.target_collection.name
    data_new = doc_feeder(b2.target_collection, step=step, inbatch=True, fields=[])
    data_old = doc_feeder(b1.target_collection, step=step, inbatch=True, fields=[])
    cnt = 0
    cnt_update = cnt_add = cnt_delete = 0

    # Pass 1: walk the new collection -> additions and updates.
    for _batch in data_new:
        cnt += 1
        id_list_new = [_doc["_id"] for _doc in _batch]
        docs_common = b1.target_collection.find({"_id": {"$in": id_list_new}}, projection=[])
        ids_common = [_doc["_id"] for _doc in docs_common]
        id_in_new = list(set(id_list_new) - set(ids_common))
        _updates = diff_docs_jsonpatch(b1, b2, ids_common, fastdiff=True) if ids_common else []
        if _updates or id_in_new:
            _result = {
                "add": id_in_new,
                "update": _updates,
                "delete": [],
                "source": source,
                "timestamp": get_timestamp(),
            }
            dump(_result, os.path.join(data_folder, "%d.pyobj" % cnt))
            print("(Updated: {}, Added: {})".format(len(_updates), len(id_in_new)), end="")
            cnt_update += len(_updates)
            cnt_add += len(id_in_new)
    print(
        "Finished calculating diff for the new collection. Total number of docs updated: {}, added: {}".format(
            cnt_update, cnt_add
        )
    )
    print("=" * 100)

    # Pass 2: walk the old collection -> deletions.
    for _batch in data_old:
        cnt += 1
        id_list_old = [_doc["_id"] for _doc in _batch]
        docs_common = b2.target_collection.find({"_id": {"$in": id_list_old}}, projection=[])
        ids_common = [_doc["_id"] for _doc in docs_common]
        id_in_old = list(set(id_list_old) - set(ids_common))
        if id_in_old:
            _result = {
                "delete": id_in_old,
                "add": [],
                "update": [],
                "source": source,
                "timestamp": get_timestamp(),
            }
            dump(_result, os.path.join(data_folder, "%d.pyobj" % cnt))
            print("(Deleted: {})".format(len(id_in_old)), end="")
            cnt_delete += len(id_in_old)
    print("Finished calculating diff for the old collection. Total number of docs deleted: {}".format(cnt_delete))
    print("=" * 100)
    print("Summary: (Updated: {}, Added: {}, Deleted: {})".format(cnt_update, cnt_add, cnt_delete))