Source code for biothings.utils.diff

"""
Utils to compare two lists of gene documents; requires a BioThings Hub setup.
"""
import os
import os.path
import time

# from ..hub.databuild.backend import create_backend
from .backend import DocMongoDBBackend
from .common import dump, filter_dict, get_timestamp, timesofar
from .diff_common import full_diff_doc

# from .es import ESIndexer
from .jsondiff import make as jsondiff


def two_docs_iterator(b1, b2, id_list, step=10000, verbose=False):
    """Iterate over pairs of documents sharing the same _id, fetched from
    backends b1 and b2 in batches of ``step`` ids."""
    t0 = time.time()
    n = len(id_list)
    for i in range(0, n, step):
        t1 = time.time()
        if verbose:
            print("Processing %d-%d documents..." % (i + 1, min(i + step, n)), end="")
        _ids = id_list[i : i + step]
        # sort both batches by _id so zip() pairs matching documents
        iter1 = sorted([d for d in b1.mget_from_ids(_ids, asiter=True)], key=lambda a: a["_id"])
        iter2 = sorted([d for d in b2.mget_from_ids(_ids, asiter=True)], key=lambda a: a["_id"])
        for doc1, doc2 in zip(iter1, iter2):
            yield doc1, doc2
        if verbose:
            print("Done.[%.1f%%,%s]" % (i * 100.0 / n, timesofar(t1)))
    if verbose:
        print("=" * 20)
        print("Finished.[total time: %s]" % timesofar(t0))
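
# A minimal usage sketch (hypothetical collection names; assumes a local
# MongoDB is running):
#
#     from pymongo import MongoClient
#     client = MongoClient("mongodb://localhost:27017")
#     b1 = DocMongoDBBackend(client["targets"]["genedoc_old"])
#     b2 = DocMongoDBBackend(client["targets"]["genedoc_new"])
#     ids = b1.get_id_list()
#     for doc1, doc2 in two_docs_iterator(b1, b2, ids, step=1000):
#         assert doc1["_id"] == doc2["_id"]
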
def _diff_doc_worker(args):
    _b1, _b2, ids, _path = args
    import importlib

    import biothings.utils.diff

    # reload the module inside the subprocess so the worker runs current code
    importlib.reload(biothings.utils.diff)
    from biothings.utils.diff import _diff_doc_inner_worker, get_backend

    b1 = get_backend(*_b1)
    b2 = get_backend(*_b2)
    _updates = _diff_doc_inner_worker(b1, b2, ids)
    return _updates


def _diff_doc_inner_worker(b1, b2, ids, fastdiff=False, diff_func=full_diff_doc):
    """If fastdiff is True, only compare whole documents;
    do not traverse into individual attributes.
    """
    _updates = []
    for doc1, doc2 in two_docs_iterator(b1, b2, ids):
        assert doc1["_id"] == doc2["_id"], repr((ids, len(ids)))
        if fastdiff:
            if doc1 != doc2:
                _updates.append({"_id": doc1["_id"]})
        else:
            _diff = diff_func(doc1, doc2)
            if _diff:
                _diff["_id"] = doc1["_id"]
                _updates.append(_diff)
    return _updates
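
# A sketch of calling the inner worker directly (b1/b2 are backends as above;
# "ids" is any list of _ids present in both collections):
#
#     _updates = _diff_doc_inner_worker(b1, b2, ids, fastdiff=True)
#     # fastdiff=True returns [{"_id": ...}, ...] for changed docs only;
#     # fastdiff=False returns full per-attribute diffs from full_diff_doc().
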
def diff_docs_jsonpatch(b1, b2, ids, fastdiff=False, exclude_attrs=None):
    """If fastdiff is True, only compare whole documents;
    do not traverse into individual attributes.
    """
    exclude_attrs = exclude_attrs or []
    _updates = []
    for doc1, doc2 in two_docs_iterator(b1, b2, ids):
        assert doc1["_id"] == doc2["_id"], "Different ids: '%s' != '%s'" % (doc1["_id"], doc2["_id"])
        if exclude_attrs:
            doc1 = filter_dict(doc1, exclude_attrs)
            doc2 = filter_dict(doc2, exclude_attrs)
        if fastdiff:
            if doc1 != doc2:
                _updates.append(doc1["_id"])
        else:
            _patch = jsondiff(doc1, doc2)
            if _patch:
                _diff = {}
                _diff["patch"] = _patch
                _diff["_id"] = doc1["_id"]
                _updates.append(_diff)
    return _updates
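
# Usage sketch (backends and ids as above; excluding "taxid" is only an
# illustration of exclude_attrs):
#
#     patches = diff_docs_jsonpatch(b1, b2, ids, exclude_attrs=["taxid"])
#     # each entry looks like {"_id": "...", "patch": [<JSON-patch ops>]}
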
# TODO: move to mongodb backend class
def get_mongodb_uri(backend):
    db = backend.target_collection.database
    mongo_client = db.client
    # Credential information for the current PyMongo client lives in pool_options
    opt = mongo_client.options.pool_options._credentials
    # Keep the old implementation as well, so other features don't break.
    if hasattr(mongo_client._MongoClient__options, "credentials"):
        opt = mongo_client._MongoClient__options.credentials
    username = opt and opt.username or None
    password = opt and opt.password or None
    dbase = opt and opt.source or None
    uri = "mongodb://"
    if username:
        if password:
            uri += "%s:%s@" % (username, password)
        else:
            uri += "%s@" % username
    host, port = mongo_client.address
    uri += "%s:%s" % (host, port)
    uri += "/%s" % (dbase or db.name)
    # uri += "/%s" % backend.target_collection.name
    print("uri: %s" % uri)
    return uri
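
# For an unauthenticated local backend this would typically print and return
# a URI like "mongodb://localhost:27017/targets" (host, port, and database
# name here are illustrative):
#
#     uri = get_mongodb_uri(b1)
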
def diff_collections(b1, b2, use_parallel=True, step=10000):
    """
    b1, b2 are instances of a supported backend class from databuild.backend,
    e.g.::

        b1 = DocMongoDBBackend(c1)
        b2 = DocMongoDBBackend(c2)
    """
    id_s1 = set(b1.get_id_list())
    id_s2 = set(b2.get_id_list())
    print("Size of collection 1:\t", len(id_s1))
    print("Size of collection 2:\t", len(id_s2))
    id_in_1 = id_s1 - id_s2
    id_in_2 = id_s2 - id_s1
    id_common = id_s1 & id_s2
    print("# of docs found only in collection 1:\t", len(id_in_1))
    print("# of docs found only in collection 2:\t", len(id_in_2))
    print("# of docs found in both collections:\t", len(id_common))
    print("Comparing matching docs...")
    _updates = []
    if len(id_common) > 0:
        if not use_parallel:
            _updates = _diff_doc_inner_worker(b1, b2, list(id_common))
        else:
            from .parallel import run_jobs_on_parallel

            _path = os.path.split(os.path.split(os.path.abspath(__file__))[0])[0] + "/.."
            id_common = list(id_common)
            # serialize each backend into a (uri, db, collection, type) tuple
            # so worker processes can rebuild it via get_backend()
            _b1 = (get_mongodb_uri(b1), b1.target_collection.database.name, b1.target_name, b1.name)
            _b2 = (get_mongodb_uri(b2), b2.target_collection.database.name, b2.target_name, b2.name)
            task_li = [(_b1, _b2, id_common[i : i + step], _path) for i in range(0, len(id_common), step)]
            job_results = run_jobs_on_parallel(_diff_doc_worker, task_li)
            _updates = []
            if job_results:
                for res in job_results:
                    _updates.extend(res)
            else:
                print("Parallel jobs failed or were interrupted.")
                return None
    print("Done. [{} docs changed]".format(len(_updates)))
    _deletes = []
    if len(id_in_1) > 0:
        _deletes = sorted(id_in_1)
    _adds = []
    if len(id_in_2) > 0:
        _adds = sorted(id_in_2)
    changes = {"update": _updates, "delete": _deletes, "add": _adds}
    return changes
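
# Usage sketch (serial mode, backends as above):
#
#     changes = diff_collections(b1, b2, use_parallel=False)
#     # changes == {"update": [per-doc diffs for changed docs],
#     #             "delete": [sorted _ids found only in b1],
#     #             "add":    [sorted _ids found only in b2]}
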
def get_backend(uri, db, col, bk_type):
    if bk_type != "mongodb":
        raise NotImplementedError("Backend type '%s' not supported" % bk_type)
    from biothings.utils.mongo import MongoClient

    colobj = MongoClient(uri)[db][col]
    return DocMongoDBBackend(colobj)
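
# get_backend() is the counterpart of get_mongodb_uri(): it rebuilds a
# DocMongoDBBackend from the serializable tuple handed to parallel workers.
# Sketch (URI, database, and collection names are assumptions):
#
#     b = get_backend("mongodb://localhost:27017", "targets", "genedoc_new", "mongodb")
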
def diff_collections_batches(b1, b2, result_dir, step=10000):
    """
    b2 is the new collection, b1 is the old collection
    """
    from biothings.utils.mongo import doc_feeder

    DIFFFILE_PATH = "/home/kevinxin/diff_result/"
    DATA_FOLDER = os.path.join(DIFFFILE_PATH, result_dir)
    if not os.path.exists(DATA_FOLDER):
        os.mkdir(DATA_FOLDER)
    data_new = doc_feeder(b2.target_collection, step=step, inbatch=True, fields=[])
    data_old = doc_feeder(b1.target_collection, step=step, inbatch=True, fields=[])
    cnt = 0
    cnt_update = 0
    cnt_add = 0
    cnt_delete = 0
    # first pass over the new collection: detect added and updated docs
    for _batch in data_new:
        cnt += 1
        id_list_new = [_doc["_id"] for _doc in _batch]
        docs_common = b1.target_collection.find({"_id": {"$in": id_list_new}}, projection=[])
        ids_common = [_doc["_id"] for _doc in docs_common]
        id_in_new = list(set(id_list_new) - set(ids_common))
        _updates = []
        if len(ids_common) > 0:
            _updates = diff_docs_jsonpatch(b1, b2, list(ids_common), fastdiff=True)
        file_name = DATA_FOLDER + "/" + str(cnt) + ".pyobj"
        _result = {
            "add": id_in_new,
            "update": _updates,
            "delete": [],
            "source": b2.target_collection.name,
            "timestamp": get_timestamp(),
        }
        if len(_updates) != 0 or len(id_in_new) != 0:
            dump(_result, file_name)
            print("(Updated: {}, Added: {})".format(len(_updates), len(id_in_new)), end="")
        cnt_update += len(_updates)
        cnt_add += len(id_in_new)
    print(
        "Finished calculating diff for the new collection. Total number of docs updated: {}, added: {}".format(
            cnt_update, cnt_add
        )
    )
    print("=" * 100)
    # second pass over the old collection: detect deleted docs
    for _batch in data_old:
        cnt += 1
        id_list_old = [_doc["_id"] for _doc in _batch]
        docs_common = b2.target_collection.find({"_id": {"$in": id_list_old}}, projection=[])
        ids_common = [_doc["_id"] for _doc in docs_common]
        id_in_old = list(set(id_list_old) - set(ids_common))
        file_name = DATA_FOLDER + "/" + str(cnt) + ".pyobj"
        _result = {
            "delete": id_in_old,
            "add": [],
            "update": [],
            "source": b2.target_collection.name,
            "timestamp": get_timestamp(),
        }
        if len(id_in_old) != 0:
            dump(_result, file_name)
            print("(Deleted: {})".format(len(id_in_old)), end="")
        cnt_delete += len(id_in_old)
    print("Finished calculating diff for the old collection. Total number of docs deleted: {}".format(cnt_delete))
    print("=" * 100)
    print("Summary: (Updated: {}, Added: {}, Deleted: {})".format(cnt_update, cnt_add, cnt_delete))
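
# Usage sketch (note the hardcoded DIFFFILE_PATH above, which must exist and
# be writable; "run1" is a hypothetical result sub-directory name):
#
#     diff_collections_batches(b1, b2, "run1", step=10000)
#     # writes numbered .pyobj files, each holding an add/update/delete dict
#     # plus the source collection name and a timestamp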