Source code for biothings.utils.sqlite3

import os
import sqlite3
import json

from biothings.utils.hub_db import IDatabase
from biothings.utils.dotfield import parse_dot_fields
from biothings.utils.dataload import update_dict_recur
from biothings.utils.common import json_serial

def get_hub_db_conn():
    return Database()


def get_src_dump():
    db = Database()
    return db[getattr(db.CONFIG, "DATA_SRC_DUMP_COLLECTION", "src_dump")]


def get_src_master():
    db = Database()
    return db[db.CONFIG.DATA_SRC_MASTER_COLLECTION]


def get_src_build():
    db = Database()
    return db[db.CONFIG.DATA_SRC_BUILD_COLLECTION]


def get_src_build_config():
    db = Database()
    return db[db.CONFIG.DATA_SRC_BUILD_CONFIG_COLLECTION]


def get_data_plugin():
    db = Database()
    return db[db.CONFIG.DATA_PLUGIN_COLLECTION]


def get_api():
    db = Database()
    return db[db.CONFIG.API_COLLECTION]


def get_cmd():
    db = Database()
    return db[db.CONFIG.CMD_COLLECTION]


def get_event():
    db = Database()
    return db[getattr(db.CONFIG, "EVENT_COLLECTION", "event")]


def get_hub_config():
    db = Database()
    return db[getattr(db.CONFIG, "HUB_CONFIG_COLLECTION", "hub_config")]

def get_last_command():
    try:
        db = Database()
        res = db.get_conn().execute("SELECT MAX(_id) FROM cmd").fetchall()
        assert res[0][0], "No command ID found, bootstrap ?"
        return {"_id": res[0][0]}
    except Exception:
        return {"_id": 1}

def get_source_fullname(col_name):
    """
    Assuming col_name is a collection created from an upload process,
    find the main source & sub_source associated.
    """
    src_dump = get_src_dump()
    info = None
    for doc in src_dump.find():
        if col_name in doc.get("upload", {}).get("jobs", {}).keys():
            info = doc
    if info:
        name = info["_id"]
        if name != col_name:
            # col_name was a sub-source name
            return "%s.%s" % (name, col_name)
        else:
            return name
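
# Illustrative sketch, not part of the module: typical use of the accessor
# helpers and get_source_fullname(). The "clinvar" source and its
# "clinvar_hg19" sub-source are hypothetical names chosen for the example.
#
#   src_dump = get_src_dump()                      # Collection over the "src_dump" table
#   doc = src_dump.find_one({"_id": "clinvar"})    # dump/upload metadata for that source
#   get_source_fullname("clinvar_hg19")            # -> "clinvar.clinvar_hg19" (sub-source)
#   get_source_fullname("clinvar")                 # -> "clinvar" (main source itself)
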

class Database(IDatabase):

    def __init__(self):
        super(Database, self).__init__()
        self.name = getattr(self.CONFIG, "DATA_HUB_DB_DATABASE", "hubdb")
        if not os.path.exists(self.CONFIG.HUB_DB_BACKEND["sqlite_db_folder"]):
            os.makedirs(self.CONFIG.HUB_DB_BACKEND["sqlite_db_folder"])
        self.dbfile = os.path.join(self.CONFIG.HUB_DB_BACKEND["sqlite_db_folder"], self.name)
        self.cols = {}

    @property
    def address(self):
        return self.dbfile

    def get_conn(self):
        return sqlite3.connect(self.dbfile)

    def collection_names(self):
        tables = self.get_conn().execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
        return [name[0] for name in tables]

    def create_collection(self, colname):
        return self[colname]

    def create_if_needed(self, table):
        existings = [tname[0] for tname in self.get_conn().execute(
            "SELECT name FROM sqlite_master WHERE type='table' and name = ?", (table,)).fetchall()]
        if table not in existings:
            # TODO: injection...
            self.get_conn().execute("CREATE TABLE %s (_id TEXT PRIMARY KEY, document TEXT)" % table).fetchone()

    def __getitem__(self, colname):
        if colname not in self.cols:
            self.create_if_needed(colname)
            self.cols[colname] = Collection(colname, self)
        return self.cols[colname]
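
# Illustrative sketch, not part of the module: Database behaves like a minimal
# dict of collections, with one SQLite table per collection. Assumes the hub
# config (CONFIG / HUB_DB_BACKEND) is already set up via biothings.utils.hub_db;
# the collection name below is hypothetical.
#
#   db = get_hub_db_conn()                 # Database instance backed by a .sqlite file
#   db.create_collection("my_collection")  # creates the table if missing
#   col = db["my_collection"]              # same Collection object, cached in db.cols
#   db.collection_names()                  # -> ["my_collection", ...]
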

class Collection(object):

    def __init__(self, colname, db):
        self.colname = colname
        self.db = db

    def get_conn(self):
        return sqlite3.connect(self.db.dbfile)

    @property
    def name(self):
        return self.colname

    @property
    def database(self):
        return self.db

    def find_one(self, *args, **kwargs):
        if args and len(args) == 1 and isinstance(args[0], dict):
            if len(args[0]) == 1 and "_id" in args[0]:
                strdoc = self.get_conn().execute(
                    "SELECT document FROM %s WHERE _id = ?" % self.colname,
                    (args[0]["_id"],)).fetchone()
                if strdoc:
                    return json.loads(strdoc[0])
                else:
                    return None
            else:
                return self.find(*args, find_one=True)
        elif args or kwargs:
            raise NotImplementedError("find(): %s %s" % (repr(args), repr(kwargs)))
        else:
            return self.find(find_one=True)

    def find(self, *args, **kwargs):
        results = []
        if args and len(args) == 1 and isinstance(args[0], dict) and len(args[0]) > 0:
            # it's a key/value search, let's iterate
            for doc in self.get_conn().execute("SELECT document FROM %s" % self.colname).fetchall():
                found = False
                doc = json.loads(doc[0])
                for k, v in args[0].items():
                    if k in doc:
                        if doc[k] == v:
                            found = True
                    else:
                        found = False
                        break
                if found:
                    if "find_one" in kwargs:
                        return doc
                    else:
                        results.append(doc)
            return results
        elif not args or len(args) == 1 and len(args[0]) == 0:
            # nothing or empty dict
            return [json.loads(doc[0]) for doc in
                    self.get_conn().execute("SELECT document FROM %s" % self.colname).fetchall()]
        else:
            raise NotImplementedError("find: args=%s kwargs=%s" % (repr(args), repr(kwargs)))
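
# Illustrative sketch, not part of the module: the query dict supports only
# exact, top-level key/value matches (no operators, no dotted fields), and a
# document missing one of the queried keys is skipped. Names are hypothetical.
#
#   col = get_src_dump()                         # any Collection works the same way
#   col.find()                                   # all documents in the table
#   col.find({"status": "success"})              # docs whose "status" equals "success"
#   col.find_one({"_id": "clinvar"})             # direct lookup by primary key
#   col.find_one({"status": "failed"})           # first match, or [] if nothing matches
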

    def insert_one(self, doc):
        assert "_id" in doc
        with self.get_conn() as conn:
            conn.execute(
                "INSERT INTO %s (_id,document) VALUES (?,?)" % self.colname,
                (doc["_id"], json.dumps(doc, default=json_serial))).fetchone()
            conn.commit()

    def update_one(self, query, what, upsert=False):
        assert len(what) == 1 and ("$set" in what or "$unset" in what or "$push" in what), \
            "$set/$unset/$push operators not found"
        doc = self.find_one(query)
        if doc:
            if "$set" in what:
                # parse_dot_fields uses json.dumps internally, so we make sure
                # everything is serializable first
                what = json.loads(json.dumps(what, default=json_serial))
                what = parse_dot_fields(what["$set"])
                doc = update_dict_recur(doc, what)
            elif "$unset" in what:
                for keytounset in what["$unset"].keys():
                    doc.pop(keytounset, None)
            elif "$push" in what:
                for listkey, elem in what["$push"].items():
                    assert "." not in listkey, "$push not supported for nested keys: %s" % listkey
                    doc.setdefault(listkey, []).append(elem)
            self.save(doc)
        elif upsert:
            assert "$set" in what
            query.update(what["$set"])
            self.save(query)
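
# Illustrative sketch, not part of the module: update_one() accepts exactly one
# MongoDB-style operator per call, applied to the first matching document.
# Collection and field names below are hypothetical.
#
#   col = get_src_dump()
#   col.update_one({"_id": "clinvar"}, {"$set": {"download.status": "success"}})
#   col.update_one({"_id": "clinvar"}, {"$unset": {"pending": None}})
#   col.update_one({"_id": "clinvar"}, {"$push": {"notes": "re-run requested"}})
#   # with upsert=True and $set, a missing document is created from query + $set values
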

    def update(self, query, what):
        docs = self.find(query)
        for doc in docs:
            self.update_one({"_id": doc["_id"]}, what)

    def save(self, doc):
        if self.find_one({"_id": doc["_id"]}):
            with self.get_conn() as conn:
                conn.execute(
                    "UPDATE %s SET document = ? WHERE _id = ?" % self.colname,
                    (json.dumps(doc, default=json_serial), doc["_id"]))
                conn.commit()
        else:
            self.insert_one(doc)

    def replace_one(self, query, doc, upsert=False):
        assert "_id" in query
        orig = self.find_one(query)
        if orig:
            orig["_id"] = query["_id"]
            with self.get_conn() as conn:
                conn.execute(
                    "UPDATE %s SET document = ? WHERE _id = ?" % self.colname,
                    (json.dumps(doc, default=json_serial), orig["_id"]))
                conn.commit()
        elif upsert:
            doc["_id"] = query["_id"]
            self.save(doc)

    def remove(self, query):
        docs = self.find(query)
        with self.get_conn() as conn:
            for doc in docs:
                conn.execute("DELETE FROM %s WHERE _id = ?" % self.colname, (doc["_id"],)).fetchone()
            conn.commit()

    def count(self):
        return self.get_conn().execute("SELECT count(_id) FROM %s" % self.colname).fetchone()[0]

    def drop(self):
        self.get_conn().execute("DROP TABLE %s" % self.colname).fetchall()

    def __getitem__(self, _id):
        return self.find_one({"_id": _id})

    def __getstate__(self):
        self.__dict__.pop("db", None)
        return self.__dict__
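
# Illustrative sketch, not part of the module: a round trip through a Collection.
# Documents are stored as JSON text in the "document" column, keyed by "_id".
# All names below are hypothetical.
#
#   col = get_hub_db_conn()["my_collection"]
#   col.insert_one({"_id": "doc1", "status": "pending"})
#   col.save({"_id": "doc1", "status": "success"})    # update-or-insert by _id
#   col.count()                                       # -> 1
#   col.remove({"_id": "doc1"})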