# Source code for biothings.utils.dataload

"""
Utility functions for parsing flatfiles,
mapping to JSON, cleaning.
"""
import csv

# see tabfile_feeder(coerce_unicode) if needed
# from __future__ import unicode_literals
import itertools
import json
import os
import os.path
from collections import Counter, OrderedDict
from collections.abc import Mapping
from functools import total_ordering

from .common import anyfile, is_str, open_anyfile, safewfile
from .dotstring import key_value, set_key_value

csv.field_size_limit(10000000)  # default is 131072, too small for some big files


def dict_sweep(d, vals=None, remove_invalid_list=False):
    """
    Remove keys whose values are considered empty, and drop containers left empty.

    By default the swept values are ".", "-", "", "NA", "none", " ",
    "Not Available" and "unknown". The dictionary is modified in place,
    recursing into nested dicts and into dicts found inside lists.

    Args:
        d (dict): a dictionary
        vals (str or list): a string or list of values to sweep, or None to
            use the default values. A single string is treated as a
            one-element list (it is *not* matched as a substring).
        remove_invalid_list (boolean): when true, every list item found in
            `vals` is removed, and the key is dropped if the list ends up empty.

    Returns:
        dict: the same `d` object, after sweeping.

    Note:
        With `remove_invalid_list == False`, items are removed from the list
        while it is being iterated, so one of two adjacent swept items
        survives. This historical behaviour is kept on purpose. Ex::

            test_dict = {'gene': [None, None], 'site': ["Intron", None], 'snp_build' : 136}

        with `vals=[None]` and `remove_invalid_list == False`::

            {'gene': [None], 'site': ['Intron'], 'snp_build': 136}

        with `vals=[None]` and `remove_invalid_list == True`::

            {'site': ['Intron'], 'snp_build': 136}
    """
    # set default supported vals for empty values
    vals = vals or [".", "-", "", "NA", "none", " ", "Not Available", "unknown"]
    if isinstance(vals, str):
        # a bare string would otherwise be used for substring matching and
        # raise TypeError as soon as a non-str value is tested against it
        vals = [vals]
    # iterate over a snapshot since keys are deleted along the way
    for key, val in list(d.items()):
        if val in vals:
            del d[key]
        elif isinstance(val, list):
            if remove_invalid_list:
                val = [v for v in val if v not in vals]
                for item in val:
                    if isinstance(item, dict):
                        dict_sweep(item, vals, remove_invalid_list=remove_invalid_list)
                if not val:
                    del d[key]
                else:
                    d[key] = val
            else:
                # NOTE: in-place removal while iterating is intentional (see docstring)
                for item in val:
                    if item in vals:
                        val.remove(item)
                    elif isinstance(item, dict):
                        dict_sweep(item, vals, remove_invalid_list=remove_invalid_list)
                if not val:
                    del d[key]
        elif isinstance(val, dict):
            dict_sweep(val, vals, remove_invalid_list=remove_invalid_list)
            if not val:
                del d[key]
    return d
def safe_type(f, val):
    """
    Convert an input string using the passed function `f` (int, float, ...).

    If `val` is not a string (according to `is_str`), or if `f` raises
    ValueError, the original value is returned unchanged.
    """
    if not is_str(val):
        return val
    try:
        return f(val)
    except ValueError:
        # conversion failed: hand back the untouched input
        return val
def to_float(val):
    """Convert an input string to float; see `safe_type` for the fallback rules."""
    converter = float
    return safe_type(converter, val)
def to_int(val):
    """Convert an input string to int; see `safe_type` for the fallback rules."""
    converter = int
    return safe_type(converter, val)
def to_number(val):
    """
    Convert an input string to int, or to float when int conversion fails.

    Non-string input, and strings that are neither, are returned unchanged.
    """
    if not is_str(val):
        return val
    # try the narrowest type first
    for cast in (int, float):
        try:
            return cast(val)
        except ValueError:
            continue
    return val
def boolean_convert(d, convert_keys=None, level=0):
    """
    Convert values specified by `convert_keys` in document `d` to boolean.

    Dotfield notation can be used to specify inner keys; `level` is the index
    of the dotfield segment matched against the keys of `d`.

    Note that `None` values are converted to `False` in Python. Use
    `dict_sweep()` before calling this function if such `False` values are
    not expected.
    See https://github.com/biothings/biothings.api/issues/274 for details.

    Returns `d`, modified in place.
    """
    convert_keys = convert_keys or []
    # key names expected at this depth, e.g. "b" for "a.b" when level == 1
    wanted = set()
    for dotted in convert_keys:
        segments = dotted.split(".")
        if len(segments) > level:
            wanted.add(segments[level])

    for key, val in d.items():
        if isinstance(val, dict):
            # nested dicts are always visited, restarting at level 0
            d[key] = boolean_convert(val, convert_keys)
        if key not in wanted:
            continue
        if isinstance(val, (list, tuple)):
            if val and isinstance(val[0], dict):
                d[key] = [boolean_convert(v, convert_keys, level + 1) for v in val]
            else:
                d[key] = [to_boolean(x) for x in val]
        elif isinstance(val, dict):
            d[key] = boolean_convert(val, convert_keys, level + 1)
        else:
            d[key] = to_boolean(val)
    return d
def float_convert(d, include_keys=None, exclude_keys=None):
    """Convert elements in a document to floats.

    Thin wrapper around `value_convert_incexcl` using `to_float`.

    :param d: a dictionary to traverse keys on
    :param include_keys: only convert these (dotted) keys, e.g. a.b, a.b.c (optional)
    :param exclude_keys: convert every key except these ones (optional)
    :return: whatever `value_convert_incexcl` returns (the document `d`)
    """
    return value_convert_incexcl(d, to_float, include_keys=include_keys, exclude_keys=exclude_keys)
def int_convert(d, include_keys=None, exclude_keys=None):
    """Convert elements in a document to integers.

    Thin wrapper around `value_convert_incexcl` using `to_int`.

    :param d: a dictionary to traverse keys on
    :param include_keys: only convert these (dotted) keys, e.g. a.b, a.b.c (optional)
    :param exclude_keys: convert every key except these ones (optional)
    :return: whatever `value_convert_incexcl` returns (the document `d`)
    """
    return value_convert_incexcl(d, to_int, include_keys=include_keys, exclude_keys=exclude_keys)
def to_boolean(val, true_str=None, false_str=None):
    """Normalize a str value to a boolean value.

    Non-str input is converted with `bool()`. A str found in `true_str`
    gives True, one found in `false_str` gives False, and any other str
    gives None.
    """
    # default spellings recognised as True / False
    truthy = true_str or {"true", "1", "t", "y", "yes", "Y", "Yes", "YES", 1}
    falsy = false_str or {"false", "0", "f", "n", "N", "No", "no", "NO", 0}
    if not isinstance(val, str):
        return bool(val)
    if val in truthy:
        return True
    if val in falsy:
        return False
    return None
def merge_duplicate_rows(rows, db):
    """
    Merge several rows into the first one, collecting differing field values.

    For every field found under `row[db]` in any row:

    - if the first row lacks the field, it takes the value of the first
      following row that has it;
    - if a following row has a different value, the field becomes a list
      holding each distinct value once.

    @param rows: rows to be grouped by (iterable of dicts, each with a `db` key)
    @param db: database name, string
    @return: the first row, updated in place
    @raise IndexError: if `rows` is empty
    """
    rows = list(rows)
    keys = set()
    for row in rows:
        keys.update(row[db])
    first_row = rows[0]
    merged = first_row[db]
    for row in rows[1:]:
        current = row[db]
        for k in keys:
            if k not in current:
                continue
            value = current[k]
            if k not in merged:
                merged[k] = value
                continue
            existing = merged[k]
            if value == existing:
                continue
            if isinstance(existing, list):
                # only append values not collected yet, otherwise a value seen
                # in rows 1 and 3 would show up twice
                if value not in existing:
                    existing.append(value)
            else:
                merged[k] = [existing, value]
    return first_row
def unique_ids(src_module):
    """
    Load all documents from `src_module.load_data()` and report ID statistics.

    Prints the number of documents produced and the number of distinct
    "_id" values (falsy documents are ignored for the ID count), then
    returns the documents as a list.
    """
    docs = list(src_module.load_data())
    distinct_ids = {doc["_id"] for doc in docs if doc}
    print(len(docs), "Documents produced")
    print(len(distinct_ids), "Unique IDs")
    return docs
def rec_handler(infile, block_end="\n", skip=0, include_block_end=False, as_list=False):
    """
    A generator to return a record (block of text) at once from the `infile`.

    Records are runs of consecutive lines that are not equal to `block_end`;
    with the default `block_end` they are separated by one or more empty lines.

    `skip` can be used to skip the top n lines.
    If `include_block_end` is True, the line matching `block_end` is appended
    to each record.
    If `as_list` is True, yield a list of lines for each record, otherwise
    the lines joined into a single string.
    """
    with open_anyfile(infile) as in_f:
        for _ in range(skip):
            in_f.readline()
        for is_separator, group in itertools.groupby(in_f, lambda line: line == block_end):
            if is_separator:
                continue
            # always bind the record: it used to be defined only when
            # include_block_end was True, which crashed the default call
            record = itertools.chain(group, (block_end,)) if include_block_end else group
            yield list(record) if as_list else "".join(record)
# ===============================================================================
# List Utility functions
# ===============================================================================


# if dict value is a list of length 1, unlist
def unlist(d):
    """
    Replace every single-item list value by its only item, in place.

    Nested dictionaries are processed recursively; lists are not traversed.
    Returns `d`.
    """
    for key, val in d.items():
        if isinstance(val, dict):
            unlist(val)
        elif isinstance(val, list) and len(val) == 1:
            d[key] = val[0]
    return d
def unlist_incexcl(d, include_keys=None, exclude_keys=None):
    """Unlist elements in a document.

    If there is 1 value in a list, set the element to that value.
    Otherwise, leave the list unchanged.

    By default, every key is considered.
    If include_keys is specified, only these dotted paths (a.b, a.b.c) are unlisted.
    Otherwise, if exclude_keys is specified, every path except these is unlisted.

    :param d: a dictionary to unlist
    :param include_keys: only unlist these keys (optional)
    :param exclude_keys: unlist all keys except these (optional)
    :return: `d`, modified in place
    """
    include = include_keys or []
    exclude = exclude_keys or []

    def _walk(node, prefix):
        if not isinstance(node, dict):
            return
        for key, val in node.items():
            if isinstance(val, dict):
                _walk(val, prefix + [key])
            elif isinstance(val, list) and len(val) == 1:
                path = ".".join(prefix + [key])
                # include list wins over exclude list when both are given
                selected = (path in include) if include else (path not in exclude)
                if selected:
                    node[key] = val[0]

    _walk(d, [])
    return d
def list_split(d, sep):
    """
    Split string fields containing `sep` into lists, in place.

    Trailing whitespace and trailing `sep` characters are stripped before
    splitting. Nested dictionaries are processed recursively; values without
    a `split` method are left untouched. Returns `d`.
    """
    for key, val in d.items():
        if isinstance(val, dict):
            list_split(val, sep)
        try:
            if len(val.split(sep)) <= 1:
                continue
            d[key] = val.rstrip().rstrip(sep).split(sep)
        except AttributeError:
            # not a string-like value
            continue
    return d
def id_strip(id_list):
    """Split a "|"-separated string of IDs and strip whitespace around each one."""
    return [part.strip() for part in id_list.split("|")]
def llist(li, sep="\t"):
    """Nicely output the list with each item a line.

    Items that are themselves lists or tuples are printed with their
    elements joined by `sep`; any other item is printed via `str()`.
    """
    for x in li:
        # the type check must use the builtin `list`, not the `li` argument
        if isinstance(x, (list, tuple)):
            xx = sep.join(str(i) for i in x)
        else:
            xx = str(x)
        print(xx)
def listitems(a_list, *idx):
    """Return multiple items from list by given indexes.

    A tuple input gives a tuple, anything else gives a list.
    """
    picked = [a_list[i] for i in idx]
    return tuple(picked) if isinstance(a_list, tuple) else picked
def list2dict(a_list, keyitem, alwayslist=False):  # pylint: disable=redefined-outer-name
    """
    Return a dictionary with specified `keyitem` as key, others as values.

    `keyitem` can be an index or a sequence of indexes. With a sequence of
    indexes, the key is the tuple of the items at these positions.
    For example::

        li=[['A','a',1],
            ['B','a',2],
            ['A','b',3]]
        list2dict(li,0)---> {'A':[('a',1),('b',3)],
                             'B':('a',2)}

    If `alwayslist` is True, values are always a list even there is only one item in it::

        list2dict(li,0,True)---> {'A':[('a',1),('b',3)],
                                  'B':[('a',2),]}

    A value made of a single remaining item is stored as that item instead
    of a 1-tuple.
    """
    _dict = {}
    single_key = isinstance(keyitem, int)
    for x in a_list:
        if single_key:  # single item as key
            key = x[keyitem]
            value = tuple(x[:keyitem] + x[keyitem + 1 :])
        else:
            key = tuple(x[i] for i in keyitem)
            # iterate over the row's own items; the row count of `a_list`
            # is unrelated to the number of columns
            value = tuple(item for i, item in enumerate(x) if i not in keyitem)
        if len(value) == 1:  # single value
            value = value[0]
        if key not in _dict:
            _dict[key] = [value] if alwayslist else value
        else:
            current_value = _dict[key]
            if not isinstance(current_value, list):
                current_value = [current_value]
            current_value.append(value)
            _dict[key] = current_value
    return _dict
def listsort(a_list, by, reverse=False, cmp=None, key=None):
    """
    Given `a_list` is a list of sub(list/tuple.), return a new list sorted
    by the ith (given from "by" item) item of each sublist.

    Each element is decorated as the pair `(element[by], element)` before
    sorting, so ties on the `by` item are broken by comparing whole elements.
    `key` and `cmp`, when given, receive these pairs:

    :param a_list: list of indexable elements; it is not modified
    :param by: index of the item to sort on
    :param reverse: sort in descending order when True
    :param cmp: optional old-style comparison function `cmp(a, b) -> int`
    :param key: optional key function; when combined with `cmp`, `cmp`
                compares the values produced by `key`
    :return: a new sorted list of the original elements
    """
    # `list.sort` has no `cmp` argument in Python 3: adapt it to a key function
    from functools import cmp_to_key

    decorated = [(x[by], x) for x in a_list]
    if cmp is None:
        sort_key = key
    elif key is None:
        sort_key = cmp_to_key(cmp)
    else:
        cmp_key = cmp_to_key(cmp)

        def sort_key(pair):
            return cmp_key(key(pair))

    decorated.sort(key=sort_key, reverse=reverse)
    return [pair[1] for pair in decorated]
def list_itemcnt(a_list):
    """Return number of occurrence for each item in the list, as (item, count) pairs."""
    counts = Counter(a_list)
    return [(item, n) for item, n in counts.items()]
def alwayslist(value):
    """If input value is not a list/tuple type, return it as a single value list.

    None gives an empty list; lists and tuples are returned as they are.
    """
    if value is None:
        return []
    return value if isinstance(value, (list, tuple)) else [value]
# ===============================================================================
# File Utility functions
# ===============================================================================
def tabfile_tester(datafile, header=1, sep="\t"):
    """
    Read `datafile` entirely with the csv reader to check it can be parsed.

    `header` lines are consumed first, then every remaining row. If anything
    goes wrong, the number of rows successfully read so far is printed and
    the exception is re-raised. The file handle is always closed.
    """
    in_f = anyfile(datafile)
    lineno = 0
    try:
        reader = csv.reader(in_f, delimiter=sep)
        for _ in range(header):
            next(reader)
            lineno += 1
        for _ in reader:
            lineno += 1
    except Exception:
        print("Error at line number:", lineno)
        raise
    finally:
        # the handle was previously left open
        in_f.close()
def dupline_seperator(dupline, dup_sep, dup_idx=None, strip=False):
    """
    for a line like this::

        a   b1,b2  c1,c2

    return a generator of the combinations obtained by breaking out the
    duplicates in each field (in `itertools.product` order)::

        [(a,b1,c1), (a,b1,c2), (a,b2,c1), (a,b2,c2)]

    Example::

         dupline_seperator(dupline=['a', 'b1,b2', 'c1,c2'],
                           dup_idx=[1,2],
                           dup_sep=',')

    if dup_idx is None (or empty), try to split on every field.
    if strip is True, also strip extra spaces around each split value.
    """

    def _explode(field):
        parts = field.split(dup_sep)
        return [p.strip() for p in parts] if strip else parts

    choices = []
    for idx, value in enumerate(dupline):
        if not dup_idx or idx in dup_idx:
            choices.append(_explode(value))
        else:
            # field kept whole: a single alternative
            choices.append([value])
    return itertools.product(*choices)  # itertools.product fits exactly the purpose here
def tabfile_feeder(datafile, header=1, sep="\t", includefn=None, coerce_unicode=True, assert_column_no=None):
    """a generator for each row in the file.

    :param datafile: file to read, opened with `anyfile`
    :param header: number of leading rows to skip
    :param sep: field delimiter passed to `csv.reader`
    :param includefn: optional predicate; only rows for which it returns a
                      true value are yielded
    :param coerce_unicode: when true, every field is passed through `str()`
    :param assert_column_no: when set, a row with a different number of
                             columns raises ValueError
    :raise ValueError: on unexpected column number; the current line number
                       of the csv reader is printed before re-raising

    The file handle is closed when the generator is exhausted or closed.
    """
    in_f = anyfile(datafile)
    try:
        reader = csv.reader(in_f, delimiter=sep)
        try:
            for _ in range(header):
                # a bare next() raising StopIteration inside a generator would
                # surface as RuntimeError when the file is shorter than `header`
                if next(reader, None) is None:
                    break
            for ld in reader:
                if assert_column_no and len(ld) != assert_column_no:
                    err = "Unexpected column number: got {}, should be {}".format(len(ld), assert_column_no)
                    raise ValueError(err)
                if not includefn or includefn(ld):
                    yield [str(x) for x in ld] if coerce_unicode else ld
        except ValueError:
            # reader.line_num counts every line read, including filtered rows
            print("Error at line number:", reader.line_num)
            raise
    finally:
        in_f.close()
def tab2list(datafile, cols, **kwargs):
    """
    Load columns of a tab-separated file into a list.

    :param datafile: path of the file; a tuple is also accepted, in which
                     case its first item is the path checked for existence
                     (same convention as `tab2dict`)
    :param cols: a single column index (gives a list of values) or a
                 sequence of indexes (gives a list of per-row selections)
    :param kwargs: passed through to `tabfile_feeder`
    :return: a list; empty, after printing an error, if the file is missing
    """
    _datafile = datafile[0] if isinstance(datafile, tuple) else datafile
    if not os.path.exists(_datafile):
        print('Error: missing "%s". Skipped!' % os.path.split(_datafile)[1])
        # an empty list, consistent with the normal return type
        return []
    rows = tabfile_feeder(datafile, **kwargs)
    if isinstance(cols, int):
        return [ld[cols] for ld in rows]
    return [listitems(ld, *cols) for ld in rows]
def tab2dict(datafile, cols, key, alwayslist=False, **kwargs):  # pylint: disable=redefined-outer-name
    """
    Load selected columns of a tab-separated file into a dictionary.

    Each row is reduced to the columns `cols` (via `listitems`), then the
    rows are grouped with `list2dict` using `key` as the key index within
    the reduced row. `datafile` may be a tuple, in which case its first
    item is the path checked for existence. Extra keyword arguments go to
    `tabfile_feeder`.

    Returns an empty dict, after printing an error, if the file is missing.
    """
    path = datafile[0] if isinstance(datafile, tuple) else datafile
    if not os.path.exists(path):
        print('Error: missing "%s". Skipped!' % os.path.split(path)[1])
        return {}
    rows = [listitems(ld, *cols) for ld in tabfile_feeder(datafile, **kwargs)]
    return list2dict(rows, key, alwayslist=alwayslist)
def tab2dict_iter(datafile, cols, key, alwayslist=False, **kwargs):  # pylint: disable=redefined-outer-name
    """
    Args:
        cols (array of int): an array of indices (of a list) indicating which element(s) are kept in bulk
        key (int): an index (of a list) indicating which element is treated as a bulk key

    Iterate `datafile` by row, subset each row (as a list of strings) by `cols`.
    Adjacent rows sharing the same value at the `key` index are put into one bulk.
    Each bulk is then transformed to a dict with the value at the `key` index as the dict key.

    E.g. given the following datafile, cols=[0,1,2], and key=1, two bulks are generated::

        a1  b1  c1
        a2  b1  c2    # bulk_1 => {b1: [(a1, c1), (a2, c2), (a3, c3)]}
        a3  b1  c3
        ------------
        a4  b2  c4
        a5  b2  c5    # bulk_2 => {b2: [(a4, c4), (a5, c5), (a6, c6)]}
        a6  b2  c6

    If the file is missing, an error is printed and nothing is yielded.
    """
    path = datafile[0] if isinstance(datafile, tuple) else datafile
    if not os.path.exists(path):
        print('Error: missing "%s". Skipped!' % os.path.split(path)[1])
        return {}

    bulk = []
    current_key = None
    for ld in tabfile_feeder(datafile, **kwargs):
        li = listitems(ld, *cols)
        row_key = li[key]
        if current_key is not None and row_key != current_key:
            # key changed: emit the finished bulk, then start a new one
            yield list2dict(bulk, key, alwayslist=alwayslist)
            bulk = []
        bulk.append(li)
        current_key = row_key

    # flush remaining bulk
    if bulk:
        yield list2dict(bulk, key, alwayslist=alwayslist)
def file_merge(infiles, outfile=None, header=1, verbose=1):
    """
    Merge a list of input files with the same format.

    If `header` is n then the top n lines will be discarded since reading
    the 2nd file in the list.

    :param infiles: list of input file paths
    :param outfile: output path; defaults to the first input file name with
                    "_merged" inserted before its extension
    :param header: number of header lines to drop from the 2nd file onwards
    :param verbose: when true, print progress and a final summary

    Input and output handles are closed even if an error occurs.
    """
    outfile = outfile or "_merged".join(os.path.splitext(infiles[0]))
    out_f, outfile = safewfile(outfile)
    if verbose:
        print("Merging...")
    cnt = 0
    try:
        for i, fn in enumerate(infiles):
            if verbose:
                print(os.path.split(fn)[1], "...", end="")
            line_no = 0
            in_f = anyfile(fn)
            try:
                if i > 0:
                    # header is only kept from the first file
                    for _ in range(header):
                        in_f.readline()
                for line in in_f:
                    out_f.write(line)
                    line_no += 1
            finally:
                in_f.close()
            cnt += line_no
            if verbose:
                print(line_no)
    finally:
        out_f.close()
    if verbose:
        print("=" * 20)
        print("Done![total %d lines output]" % cnt)
# ===============================================================================
# Dictionary & other structures Utility functions
# ===============================================================================


# http://stackoverflow.com/questions/12971631/sorting-list-by-an-attribute-that-can-be-none
# Used to sort lists containing None elements (Python 3 no longer allows
# comparing None with other values).
# from functools import total_ordering
@total_ordering
class MinType:
    """Sentinel type whose instances compare as lower than or equal to anything.

    Only `__le__` and `__eq__` are written out; `total_ordering` derives the
    other rich comparisons from them.
    """

    def __le__(self, other):
        # always "less than or equal", whatever `other` is
        return True

    def __eq__(self, other):
        # equal to itself only
        return other is self


# shared instance, usable as a sort key standing in for None
Min = MinType()
def traverse_keys(d, include_keys=None, exclude_keys=None):
    """Return all key, value pairs for a document.

    By default, traverse all keys.
    If include_keys is specified, only look up these dotted keys (a.b, a.b.c)
    through `key_value`; exclude_keys is then ignored.
    Otherwise, traverse everything and skip the dotted keys listed in exclude_keys.

    If a key in include_keys/exclude_keys is not found in d, it's skipped quietly.

    When traversing, list items do not add a path segment: every leaf found
    inside a list is reported under the list's own path.

    :param d: a dictionary to traverse keys on
    :param include_keys: only traverse these keys (optional)
    :param exclude_keys: traverse all keys except these (optional)
    :return: generate key, value pairs
    """
    include_keys = include_keys or []
    exclude_keys = exclude_keys or []

    def _leaves(node, trail):
        if isinstance(node, dict):
            for name, child in node.items():
                yield from _leaves(child, trail + [name])
        elif isinstance(node, list):
            for element in node:
                yield from _leaves(element, trail)
        else:
            yield trail, node

    if include_keys:
        for dotted in include_keys:
            for val in key_value(d, dotted):
                # only yield non-empty values
                # when val is None, it could be either:
                #   1. the key is not found in d
                #   2. the value of the key in d is indeed None
                # For now, we cannot tell which case, just skip it
                if val:
                    yield dotted, val
        return

    for trail, val in _leaves(d, []):
        dotted = ".".join(trail)
        if dotted not in exclude_keys:
            yield dotted, val
# from mygene, originally
def value_convert(_dict, fn, traverse_list=True):
    """
    Apply `fn` to each value of `_dict` and store the result back, in place.

    If `traverse_list` is True and a value is a list, `fn` is applied to
    each item of the list instead of the list itself. Returns `_dict`.
    """
    for k, current in _dict.items():
        if traverse_list and isinstance(current, list):
            _dict[k] = [fn(x) for x in current]
        else:
            _dict[k] = fn(current)
    return _dict
def value_convert_incexcl(d, fn, include_keys=None, exclude_keys=None):
    """Convert elements in a document using a function fn.

    The (dotted key, value) pairs come from `traverse_keys`, so the
    include/exclude rules are the ones implemented there. Each converted
    value is written back with `set_key_value`.

    :param d: a dictionary to traverse keys on
    :param fn: function to convert elements with
    :param include_keys: only convert these keys (optional)
    :param exclude_keys: convert all keys except these (optional)
    :return: the document `d`
    """
    for path, value in traverse_keys(d, include_keys, exclude_keys):
        converted = fn(value)
        set_key_value(d, path, converted)
    return d
# from biothings, originally
# close to value_convert, could be refactored except this one
# is recursive for dict typed values
def value_convert_to_number(d, skipped_keys=None):
    """
    Convert string numbers into integers or floats, in place;
    skip converting certain keys in skipped_keys list.

    Nested dictionaries are always visited, even under a skipped key; the
    skip list is matched against plain key names at every depth. Items of
    list and tuple values are converted one by one. Returns `d`.
    """
    skipped_keys = skipped_keys or []

    def _convert_item(item):
        if isinstance(item, dict):
            return value_convert_to_number(item, skipped_keys)
        return to_number(item)

    for key, val in d.items():
        if isinstance(val, dict):
            value_convert_to_number(val, skipped_keys)
        if key in skipped_keys:
            continue
        if isinstance(val, list):
            d[key] = [_convert_item(x) for x in val]
        elif isinstance(val, tuple):
            d[key] = tuple(_convert_item(x) for x in val)
        else:
            d[key] = to_number(val)
    return d
def dict_convert(_dict, keyfn=None, valuefn=None):
    """Convert the keys and/or values of a dictionary.

    - With `keyfn`, a new dict is returned with each key converted by
      `keyfn` and each value converted by `valuefn` (if given).
    - With only `valuefn`, values are converted in place and `_dict`
      itself is returned.
    - With neither, `_dict` is returned untouched.
    """
    if keyfn is None:
        if valuefn is not None:
            for k in _dict:
                _dict[k] = valuefn(_dict[k])
        return _dict

    converted = {}
    for k, v in _dict.items():
        new_value = valuefn(v) if valuefn else v
        converted[keyfn(k)] = new_value
    return converted
def updated_dict(_dict, attrs):
    """Same as `dict.update`, but work on a shallow copy and return it.

    The input `_dict` is left unchanged.
    """
    merged = _dict.copy()
    merged.update(attrs)
    return merged
def update_dict_recur(d, u):
    """
    Update dict `d` with dict `u`'s values, recursively
    (so existing values in `d` but not in `u` are kept even if nested)

    When `u[k]` is a mapping but `d[k]` is missing or is not a mapping
    (e.g. a scalar or None), `d[k]` is replaced by a new dict built from
    `u[k]`.

    :param d: dictionary to update, modified in place
    :param u: dictionary providing the new values
    :return: `d`
    """
    for k, v in u.items():
        if isinstance(v, Mapping):
            existing = d.get(k)
            if not isinstance(existing, Mapping):
                # nothing mergeable there: start from an empty dict instead
                # of trying to item-assign into a scalar
                existing = {}
            d[k] = update_dict_recur(existing, v)
        else:
            d[k] = v
    return d
def merge_dict(dict_li, attr_li, missingvalue=None):
    """
    Merging multiple dictionaries into a new one.

    Each input dictionary is paired with the name at the same position in
    `attr_li`; the result maps every key found in any input to a dict of
    `{name: value}`.

    Example::

        In [136]: d1 = {'id1': 100, 'id2': 200}
        In [137]: d2 = {'id1': 'aaa', 'id2': 'bbb', 'id3': 'ccc'}
        In [138]: merge_dict([d1,d2], ['number', 'string'])
        Out[138]:
        {'id1': {'number': 100, 'string': 'aaa'},
        'id2': {'number': 200, 'string': 'bbb'},
        'id3': {'string': 'ccc'}}
        In [139]: merge_dict([d1,d2], ['number', 'string'], missingvalue='NA')
        Out[139]:
        {'id1': {'number': 100, 'string': 'aaa'},
        'id2': {'number': 200, 'string': 'bbb'},
        'id3': {'number': 'NA', 'string': 'ccc'}}
    """
    by_attr = dict(zip(attr_li, dict_li))
    all_keys = set()
    for source in by_attr.values():
        all_keys.update(source)

    merged = {}
    for k in all_keys:
        entry = {}
        for attr, source in by_attr.items():
            if k in source:
                entry[attr] = source[k]
            elif missingvalue is not None:
                entry[attr] = missingvalue
        merged[k] = entry
    return merged
def normalized_value(value, sort=True):
    """Return a "normalized" value:

    1. if a list, remove empty (falsy) values
    2. if a list, remove duplicates and, when `sort` is true, sort it
    3. if the resulting list has one item, convert to that single item only
    4. otherwise, return value as it is.
    """
    if not isinstance(value, list):
        return value

    kept = [x for x in value if x]  # remove empty values
    try:
        _v = list(set(kept))
    except TypeError:
        # unhashable items: deduplicate on their JSON representation
        _v = [json.loads(s) for s in {json.dumps(x) for x in kept}]

    if _v and sort:
        # py3 won't sort dict anymore...
        if isinstance(_v[0], dict):
            _v = sorted(_v, key=lambda x: sorted(x.keys()))
        else:
            try:
                _v = sorted(_v)
            except TypeError:
                # probably some None values to sort, not handled anymore in py3:
                # substitute the always-smallest Min sentinel for them
                _v = sorted(_v, key=lambda x: Min if x is None or (not isinstance(x, str) and None in x) else x)

    if len(_v) == 1:
        return _v[0]
    return _v
def dict_nodup(_dict, sort=True):
    """Replace each value of `_dict` by its `normalized_value`, in place, and return `_dict`."""
    for k, current in _dict.items():
        _dict[k] = normalized_value(current, sort=sort)
    return _dict
def dict_attrmerge(dict_li, removedup=True, sort=True, special_fns=None):
    """
    Merge a list of dictionaries attribute by attribute::

        dict_attrmerge([{'a': 1, 'b':[2,3]},
                        {'a': [1,2], 'b':[3,5], 'c'=4}])

    should return::

        {'a': [1,2], 'b':[2,3,5], 'c'=4}

    Falsy values are ignored; list values are concatenated. When
    `removedup` is true the merged values go through `dict_nodup`
    (de-duplication, optional sorting).

    special_fns is a dictionary of {attr:  merge_fn}
     used for some special attr, which need special merge_fn
     e.g.,   {'uniprot': _merge_uniprot}
    """
    special_fns = special_fns or {}
    all_keys = set()
    for d in dict_li:
        all_keys.update(d.keys())

    out_dict = {}
    for k in all_keys:
        collected = []
        for d in dict_li:
            current = d.get(k, None)
            if not current:
                continue
            if isinstance(current, list):
                collected.extend(current)
            else:
                collected.append(current)
        merged = collected[0] if len(collected) == 1 else collected
        if k in special_fns:
            merged = special_fns[k](merged)
        out_dict[k] = merged

    if removedup:
        out_dict = dict_nodup(out_dict, sort=sort)
    return out_dict
def merge_root_keys(doc1, doc2, exclude=None):
    """
    Merge the root keys of `doc2` into `doc1`, turning shared keys into lists.

    Ex::

        d1 = {"_id":1,"a":"a","b":{"k":"b"}}
        d2 = {"_id":1,"a":"A","b":{"k":"B"},"c":123}

    Both documents have the same _id, and 2 root keys, "a" and "b".
    With `exclude=["_id"]`, the resulting document will be::

        {'_id': 1, 'a': ['a', 'A'], 'b': [{'k': 'b'}, {'k': 'B'}], "c":123}

    Keys found only in `doc1` are left as they are; keys found only in
    `doc2` are copied over.

    :param doc1: document receiving the merge, modified in place
    :param doc2: document merged in; its keys are consumed (popped)
    :param exclude: root keys left untouched in `doc1` and dropped from `doc2`
    :return: `doc1`
    """
    # we'll "eat" from doc2 so clean it first as needed
    exclude = exclude or []
    for k in exclude:
        doc2.pop(k, None)
    missing = object()  # distinguishes "key absent" from an explicit None value
    for k1 in doc1:
        if k1 in exclude:
            continue
        v2 = doc2.pop(k1, missing)
        if v2 is missing:
            # nothing to merge: do not append a spurious None
            continue
        if not isinstance(v2, list):
            v2 = [v2]
        if v2:
            if isinstance(doc1[k1], list):
                doc1[k1].extend(v2)
            else:
                doc1[k1] = [doc1[k1]] + v2
    # merge what's remaining in doc2 that wasn't in doc1
    doc1.update(doc2)
    return doc1
def dict_apply(d, key, value, sort=True):
    """Add value to d[key], appending it if key already exists.

    `d` is modified in place and nothing is returned. The stored value
    goes through `normalized_value`, which is in charge of removing
    duplicates and, when `sort` is true, sorting. Starting from
    ``d = {'a': 1}``:

    - ``dict_apply(d, 'a', 2)`` leaves ``d == {'a': [1, 2]}``
    - ``dict_apply(d, 'a', 3)`` then leaves ``d == {'a': [1, 2, 3]}``
    - ``dict_apply(d, 'b', 2)`` then adds ``'b': 2``
    """
    if key not in d:
        combined = value
    else:
        combined = d[key]
        if not isinstance(combined, list):
            combined = [combined]
        if isinstance(value, list):
            combined.extend(value)
        else:
            combined.append(value)
    d[key] = normalized_value(combined, sort=sort)
def dict_to_list(gene_d):
    """Return a list of genedoc from genedoc dictionary and
    make sure the "_id" field exists.

    Documents are taken in sorted key order; each one is a copy of the
    dictionary value (made by `updated_dict`) with "_id" set to the key
    as a string.
    """
    doc_li = []
    for k in sorted(gene_d.keys()):
        doc_li.append(updated_dict(gene_d[k], {"_id": str(k)}))
    return doc_li
def merge_struct(v1, v2, aslistofdict=None, include=None, exclude=None):
    """merge two structures, v1 and v2, into one.

    Merge rules, by type of `v1`:

    - list: items of `v2` (or `v2` itself if not a list) missing from `v1`
      are added;
    - dict with `v2` a list: `v1` is added to `v2` if missing, and the list
      is the result;
    - dict with `v2` a dict: keys are merged recursively, keys only in `v2`
      are copied to `v1`;
    - str/int/float: equal values are kept as is, different scalars give
      `[v1, v2]`, and a non-scalar `v2` is merged with the roles swapped.

    Inputs may be modified in place. `aslistofdict`, `include` and
    `exclude` only apply to the top-level dict merge, not to nested calls.

    :param aslistofdict: a string indicating the key name that should be treated as a list of dict
    :param include: when given a list of strings, only merge these keys (optional)
    :param exclude: when given a list of strings, exclude these keys from merging (optional)
    :return: the merged structure
    :raise AssertionError: if `v1` is a dict and `v2` is neither a dict nor a list
    :raise TypeError: if the type of `v1` is not supported
    """
    scalar_types = (str, int, float)
    if isinstance(v1, list):
        if isinstance(v2, list):
            v1 = v1 + [x for x in v2 if x not in v1]
        elif v2 not in v1:
            v1.append(v2)
    elif isinstance(v2, list) and isinstance(v1, dict):
        if v1 not in v2:
            v2.append(v1)
        # the merged result is the list, not the lone dict
        v1 = v2
    elif isinstance(v1, dict):
        assert isinstance(v2, dict), "v2 %s not a dict (v1: %s)" % (v2, v1)
        to_merge = include if include else list(v1.keys())
        for k in to_merge:
            if exclude and k in exclude:
                continue
            if k not in v1:
                # an `include` key absent from v1 has nothing to merge here;
                # if present in v2 it is copied by the loop below
                continue
            if k in v2:
                if aslistofdict == k:
                    v1elem = v1[k]
                    v2elem = v2[k]
                    if not isinstance(v1elem, list):
                        v1elem = [v1elem]
                    if not isinstance(v2elem, list):
                        v2elem = [v2elem]
                    # v1elem and v2elem may be the same, in which case wrapping
                    # them in lists was only a type change: leave v1[k] as is
                    if v1elem != v2elem:
                        v1[k] = merge_struct(v1elem, v2elem)
                else:
                    v1[k] = merge_struct(v1[k], v2[k])
            else:
                v2[k] = v1[k]
        for k in v2:
            if k not in v1:
                v1[k] = v2[k]
    elif isinstance(v1, scalar_types):
        if isinstance(v2, scalar_types):
            if v1 != v2:
                v1 = [v1, v2]
        else:
            return merge_struct(v2, v1)
    else:
        raise TypeError("dunno how to merge type %s" % type(v1))
    return v1
def dict_walk(dictionary, key_func):
    """Recursively apply key_func to dict's keys.

    A new dictionary is returned. Values that are not dicts (including
    lists) are returned as they are.
    """
    if not isinstance(dictionary, dict):
        return dictionary
    walked = {}
    for k, v in dictionary.items():
        new_key = key_func(k)
        walked[new_key] = dict_walk(v, key_func)
    return walked
def dict_traverse(d, func, traverse_list=False):
    """
    Recursively traverse dictionary d, calling func(k,v)
    for each key/value found. func must return a tuple(new_key,new_value)

    Dict values are traversed recursively. When `traverse_list` is true,
    dicts found inside list values are traversed as well. Any other value
    is replaced, along with its key, by what `func` returns. `d` is
    modified in place.
    """
    try:
        entries = sorted(d.items(), key=lambda pair: pair[0])
    except TypeError:
        # keys are not sortable
        # need to make a copy first, because d will be updated during
        # the iteration, a RuntimeError would be raised otherwise:
        # RuntimeError: dictionary keys changed during iteration
        entries = d.copy().items()

    for k, v in entries:
        if isinstance(v, dict):
            dict_traverse(v, func, traverse_list=traverse_list)
            continue
        if traverse_list and isinstance(v, list):
            for element in v:
                if isinstance(element, dict):
                    dict_traverse(element, func, traverse_list=traverse_list)
            continue
        newk, newv = func(k, v)
        d.pop(k)
        d[newk] = newv