Source code for malibu.database.dbmapper

# -*- coding: utf-8 -*-
import copy
import re
import sqlite3
import types
from malibu.database import dbtypeconv

__doc__ = """
malibu.database.dbmapper
------------------------

This is a small, hackish ORM for SQLite3.


Note from the author: (01 / 14 / 2016)
--------------------------------------

I've got to be honest, this is probably the worst code I have ever written and read.
At this point, this code is so difficult to maintain and keep up to date for 2/3 compat that
it is almost not worth the work.
Especially considering that there are things like Peewee, SQLAlchemy, etc, this is not worth
using or maintaining.

From this point forward, I recommend using some other, cleaner, better maintained solution
such as Peewee.
This DBMapper code will no longer be maintained and will be deprecated starting
with the 0.1.6 release.
The code will be removed as the 1.0.0 release approaches.
There may be plans to replace this with a SQLite adapter for the malibu.design.brine series
of classes that behave similar to this, just without all the cruft.
"""


[docs]class DBMapper(object): """ This is code for a relatively small ORM for SQLite built on top of the python-sqlite3 module. """ # FETCH Constants for __execute() FETCH_ONE = 'one' FETCH_MANY = 'many' FETCH_ALL = 'all' # INDEX Constants for options dictionary. INDEX_PRIMARY = 'primaryIndex' INDEX_AUTOINCR = 'autoincrIndex' INDEX_UNIQUE = 'uniqueIndices' GENERATE_FTS_VT = 'genFTSVTs' # Global variables for static database methods _options = None __default_options = { INDEX_PRIMARY: 0, INDEX_AUTOINCR: True, INDEX_UNIQUE: set(), GENERATE_FTS_VT: False # Do NOT generate FTS by default. } @staticmethod
[docs] def get_default_options(): """ DBMapper.get_default_options() Returns a deep copy of the default options dictionary for modification in subclasses. """ return copy.deepcopy(DBMapper.__default_options)
@staticmethod
[docs] def connect_database(dbpath): """ DBMapper.connect_database(dbpath) Connects to a database at 'dbpath' and installs the json type converter "middleware" into the database system. """ dbtypeconv.install_json_converter() __db = sqlite3.connect(dbpath, detect_types=sqlite3.PARSE_DECLTYPES) return __db
@classmethod
[docs] def set_db_options(cls, db, keys, ktypes, options=__default_options): """ DBMapper.set_db_options(db => database instance keys => list of keys ktypes => list of key types options => options dictionary (optional)) Sets options for a subclasses DBMapper context. """ if cls._options is None: cls._options = {} cls._options['database'] = db cls._options['keys'] = keys cls._options['keytypes'] = ktypes cls._options['options'] = options else: cls._options['database'] = db cls._options['keys'] = keys cls._options['keytypes'] = ktypes
@classmethod
[docs] def load(cls, **kw): """ DBMapper.load(**kw) Loads a *single* row from the database and populates it into the context cls this method was called under. If the database returns more than one row for the kwarg query, this method will only return the first result! If you want a list of matching rows, use find() or search(). """ if cls._options is None: raise DBMapperException( 'Static database options have not been set.') dbo = cls._options obj = cls(dbo['database']) cur = dbo['database'].cursor() keys = [] vals = [] for key, val in kw.items(): keys.append(key) vals.append(val) whc = [] for pair in zip(keys, vals): whc.append("%s=?" % (pair[0])) query = "select * from %s where (%s)" % (obj._table, ' and '.join(whc)) result = obj.__execute(cur, query, args=vals) if result is None: for key in dbo['keys']: setattr(obj, "_%s" % (key), None) return for key, dbv in zip(dbo['keys'], result): setattr(obj, "_%s" % (key), dbv) return obj
@classmethod
[docs] def new(cls, **kw): """ DBMapper.new(**kw) Creates a new contextual instance and returns the object. Only parameters defined in the kwargs will be passed in to the record creation query, as there is no support for default values yet. (06/11/15) """ if cls._options is None: raise DBMapperException( 'Static database options have not been set.') dbo = cls._options obj = cls(dbo['database']) cur = dbo['database'].cursor() keys = [] vals = [] for key, val in kw.items(): keys.append(key) vals.append(val) anonvals = [] for val in vals: anonvals.append('?') query = "insert into %s (%s) values (%s)" % ( obj._table, ','.join(keys), ','.join(anonvals)) obj.__execute(cur, query, args=vals) res = cls.find(**kw) if len(res) == 0: return None else: return res[0]
@classmethod
[docs] def find(cls, **kw): """ DBMapper.find(**kw) Searches for a set of records that match the query built by the contents of the kwargs and returns a filterable list of contextualized results that can be modified. """ if cls._options is None: raise DBMapperException( 'Static database options have not been set.') dbo = cls._options obj = cls(dbo['database']) cur = dbo['database'].cursor() primaryKey = dbo['keys'][dbo['options'][DBMapper.INDEX_PRIMARY]] keys = [] vals = [] for key, val in kw.items(): keys.append(key) vals.append(val) whc = [] for pair in zip(keys, vals): whc.append('%s=?' % (pair[0])) query = "select %s from %s where (%s)" % ( primaryKey, obj._table, ' and '.join(whc)) result = obj.__execute(cur, query, args=vals, fetch=DBMapper.FETCH_ALL) load_pairs = [] for row in result: load_pairs.append( {primaryKey: row[dbo['options'][DBMapper.INDEX_PRIMARY]]} ) return DBResultList([cls.load(**pair) for pair in load_pairs])
@classmethod
[docs] def find_all(cls): """ DBMapper.find_all() Finds all rows that belong to a table and returns a filterable list of contextualized results. Please note that the list that is returned can be empty, but it should never be none. """ if cls._options is None: raise DBMapperException( 'Static database options have not been set.') dbo = cls._options obj = cls(dbo['database']) cur = dbo['database'].cursor() primaryKey = dbo['keys'][dbo['options'][DBMapper.INDEX_PRIMARY]] query = "select %s from %s" % (primaryKey, obj._table) result = obj.__execute(cur, query, fetch=DBMapper.FETCH_ALL) load_pairs = [] for row in result: load_pairs.append( {primaryKey: row[dbo['options'][DBMapper.INDEX_PRIMARY]]} ) return DBResultList([cls.load(**pair) for pair in load_pairs])
@classmethod
[docs] def search(cls, param): """ DBMapper.search(param) This function will return a list of results that match the given param for a full text query. The search parameter should be in the form of a sqlite full text query, as defined here: http://www.sqlite.org/fts3.html#section_3 As an example, suppose your table looked like this: +----+---------+----------------+ | id | name | description | +----+---------+----------------+ | 1 | linux | some magic | | 2 | freebsd | daemonic magic | | 3 | windows | tomfoolery | +----+---------+----------------+ A full text query for "name:linux magic" would return the first row because the name is linux and the description contains "magic". A full text query just for "description:magic" would return both rows one and two because the descriptions contain the word "magic". """ if cls._options is None: raise DBMapperException( 'Static database options have not been set.') if not cls._options['options'][DBMapper.GENERATE_FTS_VT]: raise DBMapperException( 'Full-text search table not enabled on this table.') dbo = cls._options obj = cls(dbo['database']) cur = dbo['database'].cursor() query = """select docid from _search_%s where _search_%s match \"?\"""" % \ (obj._table, obj._table) result = obj.__execute( cur, query, args=[param], fetch=DBMapper.FETCH_ALL) load_pairs = [] for row in result: load_pairs.append({cls._options['keys'][0]: row[0]}) return DBResultList([cls.load(**pair) for pair in load_pairs])
@classmethod
[docs] def join(cls, cond, a, b): """ DBMapper.join(cond => other table to join on a => left column to join b => right column to join) Performs a sqlite join on two tables. Returns the join results in a filterable list. """ if cls._options is None or cond._options is None: raise DBMapperException( 'Static database options have not been set.') dba = cls._options obja = cls(dba['database']) dbb = cond._options objb = cond(dbb['database']) cur = dba['database'].cursor() primaryKeyA = dba['keys'][dba['options'][DBMapper.INDEX_PRIMARY]] primaryKeyB = dbb['keys'][dba['options'][DBMapper.INDEX_PRIMARY]] query = "select A.%s, B.%s from %s as A join %s as B on A.%s=B.%s" % ( primaryKeyA, primaryKeyB, obja._table, objb._table, a, b) result = obja.__execute(cur, query, fetch=DBMapper.FETCH_ALL) load_pair_a = [] load_pair_b = [] for row in result: load_pair_a.append({primaryKeyA: row[0]}) load_pair_b.append({primaryKeyB: row[1]}) return ( DBResultList([cls.load(**pair) for pair in load_pair_a]), DBResultList([cond.load(**pair) for pair in load_pair_b]), )
def __init__(self, db, keys, keytypes, options=__default_options): self._db = db self._options = options if 'tableName' not in self._options: self._table = self.__class__.__name__.lower() else: self._options['tableName'] self._keys = keys self._keytypes = keytypes self._primary_ind = self._options[DBMapper.INDEX_PRIMARY] self._autoincr_ind = self._options[DBMapper.INDEX_AUTOINCR] self._primary = self._keys[self._primary_ind] self._unique_keys = self._options[DBMapper.INDEX_UNIQUE] self.__generate_structure() self.__generate_getters() self.__generate_setters() self.__generate_properties() def __execute(self, cur, sql, fetch=FETCH_ONE, limit=-1, args=()): """ __execute(self, cur => pointer to database cursor sql => sql query to execute fetch => amount of results to fetch limit => query limit if not use FETCH_ONE args => query arguments to parse in) Filters, quotes, and executes the provided sql query and returns a list of database rows. """ query = sql try: if len(args) >= 1: cur.execute("select " + ", ".join(["quote(?)" for i in args]), args) quoted_values = cur.fetchone() for quoted_value in quoted_values: query = query.replace('?', str(quoted_value), 1) except: pass try: cur.execute(query) except (sqlite3.ProgrammingError): try: cur.execute(query, args) except Exception as e: raise DBMapperException( "Error while executing query [%s]" % (query), cause=e) except Exception as e: raise DBMapperException( "Error while executing query [%s]" % (query), cause=e) if fetch == DBMapper.FETCH_ONE: return cur.fetchone() elif fetch == DBMapper.FETCH_MANY: if limit == -1: limit = cur.arraysize return cur.fetchmany(size=limit) elif fetch == DBMapper.FETCH_ALL: return cur.fetchall() else: return cur.fetchall() def __get_table_info(self, table=None): """ __get_table_info(self, table) Returns pragma information for a table. """ table = self._table if table is None else table cur = self._db.cursor() query = "pragma table_info(%s)" % (table) return self.__execute(cur, query, fetch=DBMapper.FETCH_ALL) def __generate_structure(self): """ __generate_structure(self) Generates table structure for determining column updates and search information. """ # use pragma constructs to get table into tblinfo = self.__get_table_info() # create the table if the statement does not exist if len(tblinfo) == 0: ins = zip(self._keys, self._keytypes) typarr = [] for pair in ins: if pair[0] == self._primary: # identifier type primary key if self._autoincr_ind: typarr.append("%s %s primary key autoincrement" % ( pair[0], pair[1])) else: typarr.append("%s %s primary key" % (pair[0], pair[1])) elif pair[0] in self._unique_keys: typarr.append("%s %s unique" % (pair[0], pair[1])) else: # identifier type typarr.append("%s %s" % (pair[0], pair[1])) cur = self._db.cursor() # create table if not exists <table> (<typarr>) query = "create table if not exists %s (%s)" % \ (self._table, ', '.join(typarr)) self.__execute(cur, query) # make sure table columns are up to date. if len(tblinfo) > 0: # use pragma table info to build database schema schema_ids = [] schema_types = [] for col in tblinfo: schema_ids.append(col[1]) schema_types.append(col[2]) # use schema to determine / apply database updates schema_updates = [] for pair in zip(self._keys, self._keytypes): if pair[0] in schema_ids: continue else: schema_updates.append("%s %s" % (pair[0], pair[1])) for defn in schema_updates: query = "alter table %s add column %s" % (self._table, defn) cur = self._db.cursor() self.__execute(cur, query) # generate full text search table that corresponds with this dbo if self._options[DBMapper.GENERATE_FTS_VT]: if len(self.__get_table_info("_search_%s" % (self._table))) == 0: cur = self._db.cursor() # fts4 table doesn't exist, make it. query = "create virtual table _search_%s using fts4(%s, content='%s')" % \ (self._table, ','.join(self._keys), self._table) self.__execute(cur, query) # create pre/post update/delete triggers for cascading updates # XXX - [trigger warning] DO WE NEED THE TRIGGERS query = "create trigger _%s_bu before update on %s begin delete from _search_%s where docid=old.rowid; end;" % \ (self._table, self._table, self._table) self.__execute(cur, query) query = "create trigger _%s_bd before delete on %s begin delete from _search_%s where docid=old.rowid; end;" % \ (self._table, self._table, self._table) self.__execute(cur, query) search_keys = ','.join(['docid'] + self._keys[1:]) target_keys = ','.join(['new.' + vkey for vkey in self._keys]) query = "create trigger _%s_au after update on %s begin insert into _search_%s(%s) values(%s); end;" % \ (self._table, self._table, self._table, search_keys, target_keys) self.__execute(cur, query) query = "create trigger _%s_ai after insert on %s begin insert into _search_%s(%s) values(%s); end;" % \ (self._table, self._table, self._table, search_keys, target_keys) self.__execute(cur, query) self._db.commit() def __generate_getters(self): """ __generate_getters(self) Generates magical getter methods for pull data from the underlying database. """ for _key in self._keys: def getter_templ(self, __key=_key): if __key not in self._keys: return cur = self._db.cursor() # select * from table where key=<key> query = "select %s from %s where %s=?" % ( __key, self._table, self._primary) result = self.__execute( cur, query, args=(getattr(self, "_%s" % (self._primary)),)) try: return result[0] except: return result setattr(self, "get_" + _key, types.MethodType(getter_templ, self)) def __generate_setters(self): for _key in self._keys: def setter_templ(self, value, __key=_key): if __key not in self._keys: return cur = self._db.cursor() # update table set key=value where primary=id query = "update %s set %s=? where %s=?" % ( self._table, __key, self._primary) self.__execute( cur, query, args=(value, getattr(self, "_%s" % (self._primary)),)) self._db.commit() setattr(self, "_%s" % (__key), value) setattr(self, "set_" + _key, types.MethodType(setter_templ, self)) def __generate_properties(self): for _key in self._keys: setattr(self, "_%s" % (_key), None) for _key in self._keys: getf = getattr(self, "get_%s" % (_key)) setf = getattr(self, "set_%s" % (_key)) setattr(self, _key, property(getf, setf, None, "[%s] property")) def create(self): cur = self._db.cursor() vals = [] for key in self._keys: if key == self._primary and self._autoincr_ind: vals.append(None) # Put None in for the index because autoinc else: vals.append(getattr(self, "_%s" % (key))) qst = ', '.join(["?" for item in vals]) query = "insert into %s values (%s)" % (self._table, qst) self.__execute(cur, query, args=vals) setattr(self, "_%s" % (self._primary), cur.lastrowid) def delete(self): cur = self._db.cursor() qst = "%s=?" % (self._keys[self._primary_ind]) primary_val = getattr(self, "_%s" % (self._keys[self._primary_ind])) query = "delete from %s where (%s)" % (self._table, qst) self.__execute(cur, query, args=(primary_val,))
class DBResultList(list): def __init__(self, extend=None): if isinstance(extend, list): for item in extend: if isinstance(item, DBMapper): self.append(item) else: continue def filter_equals(self, key, val): """ filter_equals(key, val) -> filters database find result based on key-value equality. """ res = DBResultList() for dbo in self: try: if getattr(dbo, "_%s" % (key)) == val: res.append(dbo) else: continue except: continue return res def filter_iequals(self, key, val): """ filter_iequals(key, val) -> filters database find result based on case insensitive key-value equality. assumes that db attribute and val are strings. """ res = DBResultList() for dbo in self: try: if getattr(dbo, "_%s" % (key)).lower() == val.lower(): res.append(dbo) else: continue except: continue return res def filter_inequals(self, key, val): """ filter_inequals(key, val) -> filters database find result based on key-value inequality. """ res = DBResultList() for dbo in self: try: if getattr(dbo, "_%s" % (key)) != val: res.append(dbo) else: continue except: continue return res def filter_regex(self, key, regex): """ filter_regex(key, regex) -> filters database find result based on regex value matching. """ res = DBResultList() for dbo in self: try: if re.match(regex, getattr(dbo, "_%s" % (key))) is not None: res.append(dbo) else: continue except: continue return res class DBMapperException(Exception): def __init__(self, message, cause=None): if cause is not None: super(DBMapperException, self).__init__( message + u', caused by ' + repr(cause)) elif cause is None: super(DBMapperException, self).__init__(message) self.message = message self.cause = cause def __str__(self): return repr(self.message) + u', caused by ' + repr(self.cause)