.. _db-py_data: ##### db.py ##### ***************************** Writing fast queries/indexes: ***************************** First rule of writing indexes is: If it doesn't make a big impact, don't do it. Every index will slow down every insert into the table it's made for so the more inserts you have to a given table, the more careful you need to be here. If you do make a new index, bear in mind that you need to put the variable key (there should be only one) to the last (f.e. iteration, branch, date, if you have a lot of queries with "WHERE iteration=? and branch=? and date, etc)). If there are no other variables, put the date column for last (if you have it), as its handling appears to be a bit different to text and numbers. When writing queries, you should have the parameters in the same order that they are in the index and again, it's preferrable to have the variable key(s) (including date) for last. *********************** class SQLiteDB(object): *********************** This object is mainly to be used as a base class. It will connect to the given database file (and wipe it if need be). It also houses a close method for when you're done (and an open if you decide to continue later, in which case a new connection to the original file is formed) def __init__(self, db_path='default.db', wipe=False, memory=False): =================================================================== Connects to the given database file. Destroys the old one first if wipe = True. Stores to db in memory only if memory is set to True:: >>> from datetime import date >>> from simo.db.datadb import db >>> from pysqlite2 import dbapi2 as sqlite3 >>> import hashlib >>> test = db.SQLiteDB('test_mock.db', wipe=True) >>> sql = 'CREATE TABLE "test" (id INTEGER PRIMARY KEY, message TEXT)' >>> c = test.execute(sql) >>> sql = 'INSERT INTO "test" (message) VALUES ("Test text")' >>> c = test.execute(sql) >>> iter = test.get('SELECT "test".* FROM "test"') >>> for row in iter: ... print row (1, u'Test text') >>> test.close() >>> test = db.SQLiteDB(memory=True) >>> sql = "CREATE TABLE test (id INTEGER PRIMARY KEY, message TEXT)" >>> c = test.execute(sql) >>> c = test.execute('INSERT INTO test (message) VALUES ("Test text")') >>> iter = test.get('SELECT * FROM test') >>> for row in iter: ... print row (1, u'Test text') >>> test.close() >>> test.open() >>> try: ... iter = test.get('SELECT * FROM test') ... except sqlite3.OperationalError, e: ... print e no such table: test >>> test.close() def open(self): =============== Re-opens a connection to the database file given in init. :: >>> test = db.SQLiteDB(db_path='test_mock.db', wipe=False) >>> test.close() >>> try: ... cursor = test.conn.cursor() ... except sqlite3.ProgrammingError, e: ... print 'Database closed;', e.args[0] Database closed; Cannot operate on a closed database. >>> test.open() >>> iter = test.get('SELECT * FROM test') >>> for row in iter: ... print row (1, u'Test text') def close(self): ================ Commits any lingering changes and closes the database connection. :: >>> test = db.SQLiteDB(db_path='test_mock.db', wipe=False) >>> c = test.execute('CREATE TABLE fun (id INTEGER PRIMARY KEY)') >>> c = test.execute('INSERT INTO fun VALUES (1)') >>> test.close() >>> test.open() >>> iter = test.get('SELECT * FROM fun') >>> for row in iter: ... print row (1,) >>> test.close() *********************** class DataDB(SQLiteDB): *********************** The object will connect to a given database file on init or create one if needed, using the provided Lexicon for the table structure. It will also create a log table on initialization. If the make_new_db parameter (default True) is not set to False, an existing database, if any, will be destroyed. def __init__(self, db_type, content_def, hierarchy, level_meta, opres_def, cf_cfiers, logger, db_path='default.db', make_new_db=False, memory=False, spatial=False, constraints=None, db_host='localhost', db_port='5432', user='test', pw='test', db_class='SQLITE'): ===================================================================================================================================================================================================================================================================== Connects to the given database file. Destroys the old file and sets up the new one with the given lexicon, if make_new_db==True and sets up initial values (self.content_def, self.hierarchy), will complain of changed schema if db_type is 'write' and make_new_db is False and the schema (content_def) has changed. If using PostgreSQL (db_class='POSTGRESQL'), host, port, username and password must be defined:: >>> from simo.db.datadb.test import mock_db >>> from pprint import pprint >>> hierarchy = mock_db.hierarchy >>> content_def = mock_db.base_cd >>> levels = mock_db.base_levels >>> opres_def = mock_db.opres_def >>> cf_cfiers = mock_db.cf_cfiers >>> test = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy, levels, ... opres_def, ... cf_cfiers, mock_db.MockLogger(), 'test_mock.db', ... make_new_db=True, memory=False, spatial=(3067, None), ... create_branch_desc=True, track_solution=True) def _create_meta_tables(self): ============================== Sets up tables for storing the numerical and categorical attribute definitions def _set_meta_tables(self, level_meta): ======================================= Writes numerical and categorical variable definitions to the meta tables:: >>> res = test.db.get('SELECT * from META_NUM') >>> pprint(res) [(1, u'tree', u'd', None, None, None, u'Diameter...'), (2, u'sample_esplot', u'BA', None, None, None, u'Basal area...'), (3, u'simulation', u'DIAM_CLASS_WIDTH', None, None, None, u'Width of a single class...'), (4, u'sample_tree', u'h', None, None, None, u'Height...'), (5, u'sample_plot', u'BA', None, None, None, u'Basal area...'), (6, u'stratum', u'N', None, None, None, u'Number of stems...')] >>> res = test.db.get('SELECT * from META_CAT') >>> pprint(res) [(1, u'stratum', u'SP', None, None, u'Tree species...'), (2, u'comp_unit', u'SC', None, None, u'Site class...')] >>> sql = 'SELECT * from META_CAT_VAL' >>> res = test.db.get(sql) >>> pprint(res) [(1, 1, 1.0, u'Pine'), (2, 1, 2.0, u'Spruce'), (3, 2, 1.0, u'Very good')] def _create_tabledef(self, defkey, content_def=None, levels=None): ================================================================== Creates table definitions for each data level in content definition. Table definition is stored keyed with the level name and stores the level index, parent level name, attribute names and corresponding matrix indices, optional geometry type of the level, and a set of attributes names that are of type TEXT:: >>> metadata = test._get_table_columns('comp_unit') >>> metadata #doctest: +NORMALIZE_WHITESPACE [u'data_id', u'id', u'oid', u'pid', u'SC', u'StandLabel', u'geom'] >>> cu = test.table_def['comp_unit'] >>> assert cu['attr ind'] == [0, 1] >>> assert cu['attr name'] == [u'SC', u'StandLabel'] >>> assert cu['columns'] == set([u'data_id', u'oid', u'pid', ... u'StandLabel', u'SC', u'geom', u'id']) >>> assert cu['create attr'] == u' "SC" DOUBLE PRECISION,\n "StandLabel" TEXT,\n' >>> assert cu['float attr ind'] == [0] >>> assert cu['float attr name'] == [u'SC'] >>> assert cu['float insert sql'] == u'INSERT INTO comp_unit '\ ... '(data_id, id, oid, pid, "SC") '\ ... 'VALUES (?, ?, ?, ?, ?)' >>> assert cu['geom type'] == 'MULTIPOLYGON' >>> assert cu['geom update sql'] == 'UPDATE comp_unit '\ ... 'SET geom=GeomFromText(?, ?) WHERE '\ ... 'data_id=? AND id=?' >>> assert cu['insert sql'] == u'INSERT INTO comp_unit (data_id, id, '\ ... 'oid, pid, '\ ... '"SC", "StandLabel") '\ ... 'VALUES (?, ?, ?, ?, ?, ?)' >>> assert cu['level'] == 2 >>> assert cu['other attr name'] == [u'StandLabel'] >>> assert cu['parent name'] == 'estate' >>> assert cu['text vars'] == set(['StandLabel']) >>> assert cu['to remove'] == [1] def _get_parent_level(self, my_hierarchy): =============================================== Will get a parent_level value from a given hierarchy item (from self.lexicon.hierarchy) or None, if the item is root (level 0):: >>> test._get_parent_level(test.hierarchy[2]) 1 >>> test._get_parent_level(test.hierarchy[0]) def _create_table(self, lname): ================================== Based on the table definitions, creates new tables in the db. For existing tables adds new columns if the content definition has changed since the table was created. If the schema, i.e. the lexicon on which the database is based, is changed between runs and make_new_db is set to False, an error is triggered:: >>> lname = 'comp_unit' >>> metadata = test._get_table_columns(lname) >>> print metadata # doctest: +NORMALIZE_WHITESPACE [u'data_id', u'id', u'oid', u'pid', u'SC', u'StandLabel', u'geom'] >>> test.db.execute("INSERT INTO simulation "\ ... "(id, oid) "\ ... "VALUES (?, ?)", ... ('simulation', 'simulation')) >>> test.db.execute("INSERT INTO estate "\ ... "(id, oid, pid) "\ ... "VALUES (?, ?, ?)", ... ('estate', 'estate', ... 'simulation')) >>> lind, id, iterr, branch, ddate = 2, 'id', 0, 0, date(2011, 11, 11) >>> data_hash = str(lind) + id + str(iterr) + str(branch) + str(ddate) >>> data_id = hashlib.md5(data_hash).hexdigest() >>> test.db.execute("INSERT INTO data_link VALUES (?, ?, ?, ?, ?, ?)", ... ('comp_unit', 'id', 0, 0, date(2011, 11, 11), data_id)) >>> test.db.execute("INSERT INTO comp_unit "\ ... "VALUES (?, ?, ?, ?, ?, ?, MPolyFromText(?, ?))", ... (data_id, 'id', 'oid', 'estate', 5.7, 8.9, ... 'MULTIPOLYGON(((1 1,5 1,5 5,1 5,1 1),'\ ... '(2 2,2 3,3 3,3 2,2 2)),'\ ... '((6 3,9 2,9 4,6 3)))', db.SRID)) >>> test.db.get('SELECT * FROM comp_unit') ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE [(u'dfbf3e2bfab1b484e80268a83af829ab', u'id', u'oid', u'estate', 5.7..., u'8.9...', ...)] >>> test.close() >>> content_def = mock_db.aug_cd >>> levels = mock_db.aug_levels >>> try: ... test = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy, levels, ... opres_def, cf_cfiers, mock_db.MockLogger(), 'test_mock.db', ... make_new_db=False, memory=False, spatial=(3067, None), ... create_branch_desc=True, track_solution=True) ... except ValueError, e: ... print e[0] # doctest: +NORMALIZE_WHITESPACE ... except Exception, e: ... print str(e) simo.db.datadb.db, warning, Lexicon has changed! Updating comp_unit. simo.db.datadb.db, warning, We can't alter column- or geometry types in this fashion. If you need those changed, wipe the existing database. >>> backup = content_def[('comp_unit', 2)].pop() >>> try: ... test = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy, levels, ... opres_def, cf_cfiers, mock_db.MockLogger(), 'test_mock.db', ... make_new_db=False, memory=False, spatial=(3067, None), ... create_branch_desc=True, track_solution=True) ... except ValueError, e: ... print e[0] # doctest: +NORMALIZE_WHITESPACE simo.db.datadb.db, warning, Lexicon has changed! Updating comp_unit. simo.db.datadb.db, warning, We can't alter column- or geometry types in this fashion. If you need those changed, wipe the existing database. simo.db.datadb.db, warning, Lexicon change resulted to a drop request for column(s) (AnotherTextVar). Columns can't be dropped in SQLite! This should only cause increased database size, however. >>> content_def[('comp_unit', 2)].append(backup) >>> try: ... test = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy, levels, ... opres_def, cf_cfiers, mock_db.MockLogger(), 'test_mock.db', ... make_new_db=False, memory=False, spatial=(3067, None), ... create_branch_desc=True, track_solution=True) ... except ValueError, e: ... print e[0] # doctest: +NORMALIZE_WHITESPACE >>> metadata = test._get_table_columns(lname) >>> print metadata # doctest: +NORMALIZE_WHITESPACE [u'data_id', u'id', u'oid', u'pid', u'SC', u'StandLabel', u'geom', u'AnotherTextVar', u'BA', u'AnotherCatVar'] >>> test.db.get('SELECT * FROM comp_unit') ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE [(u'dfbf3e2bfab1b484e80268a83af829ab', u'id', u'oid', u'estate', 5.7..., u'8.9', , None, None, None)] def _create_timber_price_table(self, cf_cfiers): ================================================ Build a datatable for storing timber prices. The cf_cfiers are the additional classifiers required. >>> set(test._get_table_columns('timber_price')) == \ ... set([u'id', u'iteration', u'branch', u'data_date', u'assortment', ... u'SP', u'price_table', u'price']) True >>> test._create_timber_price_table(cf_cfiers) >>> test._create_timber_price_table(cf_cfiers|set(['some', 'more'])) ... # doctest: +NORMALIZE_WHITESPACE simo.db.datadb.db, warning, Timber price classifiers have changed! >>> set(test._get_table_columns('timber_price')) == \ ... set([u'id', u'iteration', u'branch', u'data_date', u'assortment', ... u'SP', u'price_table', u'price', u'some', u'more']) True >>> test._create_timber_price_table(cf_cfiers-set(['some', 'more'])) ... # doctest: +NORMALIZE_WHITESPACE simo.db.datadb.db, warning, Timber price classifiers have changed! simo.db.datadb.db, warning, Timber price classifier change resulted to a drop request for column(s) (some, more). Columns can't be dropped in SQLite! This should only cause increased database size, however. >>> set(test._get_table_columns('timber_price')) == \ ... set([u'id', u'iteration', u'branch', u'data_date', u'assortment', ... u'SP', u'price_table', u'price', u'some', u'more']) True >>> test.close() >>> test = db.DataDB('write', (2, 'comp_unit'), content_def, ... hierarchy, levels, opres_def, ... cf_cfiers, mock_db.MockLogger(), 'test_mock.db', ... make_new_db=True, memory=False, spatial=(3067, None), ... create_branch_desc=True, track_solution=True) def add_data_from_dictionary(self, data, iter, branch): ======================================================= Adds new data from a directory {levelname: [(ddate, {'id':id, 'oid': original_id, 'parent id': pid, 'values': [('key', value), ('key', value)]}), ...], ...}:: >>> ddate = date(2009, 2, 2) >>> for i_b, data in mock_db.data_dict.items(): ... test.add_data_from_dictionary(data, i_b[0], i_b[1]) >>> sql = 'SELECT data_id, id, oid, pid, BA, SC, "StandLabel" '\ ... 'FROM comp_unit ORDER BY '\ ... 'data_id' >>> res = test.db.get(sql) >>> for row in res: ... print row #doctest: +NORMALIZE_WHITESPACE (u'6362e8cae83710615afcf6421cc03d20', u'stand2', u'o-stand2', u'estate1', 91.0, 9.0, u'six#$!"\'"\xe4\xf6') (u'76e7844f84e2a0f5cc1ad63994e9916f', u'stand1', u'o-stand1', u'estate1', 90.0, 9.0, u"five'") (u'77268ff29f302f9723bc88214a7c0dd6', u'stand2', u'o-stand2', u'estate1', 11.0, 1.0, u'two') (u'b5ad2a1b333491a3298b39c48596122e', u'stand2', u'o-stand2', u'estate1', 21.0, 2.0, u'four') (u'be062d5143706801364cfd2d7296661a', u'stand1', u'o-stand1', u'estate1', 20.0, 2.0, u'three') (u'f74874ac2247f3cf504fe398e491c2ae', u'stand1', u'o-stand1', u'estate1', 10.0, 1.0, u'one') >>> sql = 'SELECT data_id, id, d FROM tree ORDER BY data_id' >>> res = test.db.get(sql) >>> for row in res: ... print row #doctest: +NORMALIZE_WHITESPACE (u'6362e8cae83710615afcf6421cc03d20', u'tree2-1-1', 93.0) (u'6362e8cae83710615afcf6421cc03d20', u'tree2-2-1', 94.0) (u'6362e8cae83710615afcf6421cc03d20', u'tree2-2-2', 95.0) (u'76e7844f84e2a0f5cc1ad63994e9916f', u'tree1-1-1', 91.0) (u'76e7844f84e2a0f5cc1ad63994e9916f', u'tree1-2-1', 92.0) (u'77268ff29f302f9723bc88214a7c0dd6', u'tree2-1-1', 13.0) (u'77268ff29f302f9723bc88214a7c0dd6', u'tree2-2-1', 14.0) (u'77268ff29f302f9723bc88214a7c0dd6', u'tree2-2-2', 15.0) (u'be062d5143706801364cfd2d7296661a', u'tree1-1-1', 21.0) (u'be062d5143706801364cfd2d7296661a', u'tree1-2-1', 22.0) (u'f74874ac2247f3cf504fe398e491c2ae', u'tree1-1-1', 11.0) (u'f74874ac2247f3cf504fe398e491c2ae', u'tree1-2-1', 12.0) >>> sql = 'SELECT data_id, id, h FROM sample_tree ORDER BY data_id' >>> res = test.db.get(sql) >>> for row in res: ... print row #doctest: +NORMALIZE_WHITESPACE (u'76e7844f84e2a0f5cc1ad63994e9916f', u'sample_tree1-1', 91.0) (u'76e7844f84e2a0f5cc1ad63994e9916f', u'sample_tree1-2', 92.0) (u'be062d5143706801364cfd2d7296661a', u'sample_tree1-1', 41.0) (u'be062d5143706801364cfd2d7296661a', u'sample_tree1-2', 42.0) (u'f74874ac2247f3cf504fe398e491c2ae', u'sample_tree1-1', 31.0) (u'f74874ac2247f3cf504fe398e491c2ae', u'sample_tree1-2', 32.0) >>> sql = 'SELECT DISTINCT data_level FROM data_link' >>> res = test.db.get(sql) >>> len(res[0]) 1 >>> res[0][0] u'comp_unit' def fill_matrix(self, handler, lind, full_ids, iter=0, branch=0): ================================================================= Fills a data matrix with data for the objects identified by the level and a set of object ids. The matrix is filled for all data levels and objects linked to from the given level and ids in top down order; i.e., simulation level first. All lineages are filled as well:: >>> sql = """UPDATE comp_unit SET StandLabel=?""" >>> test.db.execute(sql, [u'Kuvionumero #"!$€\'']) >>> obj_ids = [('stand1', 'o-stand1'), ('stand2', 'o-stand2')] >>> test.fill_matrix(mock_db.handler, 2, obj_ids) ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE >>> pprint(mock_db.handler.added_ids) [[(u'sim1', u'o-sim1')], [(u'estate1', u'o-estate1')], [('stand1', 'o-stand1')], [(u'stratum1-1', u'o-stratum1-1'), (u'stratum1-2', u'o-stratum1-2')], [(u'tree1-1-1', u'o-tree1-1-1')], [(u'tree1-2-1', u'o-tree1-2-1')], [(u'plot1', u'o-plot1')], [(u'sample_tree1-1', u'o-sample_tree1-1'), (u'sample_tree1-2', u'o-sample_tree1-2')], [('stand2', 'o-stand2')], [(u'stratum2-1', u'o-stratum2-1'), (u'stratum2-2', u'o-stratum2-2')], [(u'tree2-1-1', u'o-tree2-1-1')], [(u'tree2-2-1', u'o-tree2-2-1'), (u'tree2-2-2', u'o-tree2-2-2')]] def add_data_from_matrix(self, ddate, matrix, ind2id, links, blocked, terminated, main_level, text_data, update=False, start_iter=0): ========================================================================================================================== From the base level down, writes the data from the matrix using the matrix index mapping stored in the table def. Starting iteration is added to the original iteration index. NB! in update mode the geometry and text attribute content is lost for children of the base level. When not updating, they are lost from all levels:: >>> test.close() >>> test = db.DataDB('write', (2, 'comp_unit'), content_def, ... hierarchy, levels, opres_def, ... cf_cfiers, mock_db.MockLogger(), 'test_mock.db', ... make_new_db=True, memory=False, spatial=(3067, None), ... create_branch_desc=True, track_solution=True) >>> test.add_data_from_matrix(mock_db.matrixdates, mock_db.datamatrix, ... mock_db.Ind2Id(), mock_db.links, set([]), ... set([]), mock_db.main_level, mock_db.text_data) >>> iter = test.db.get("SELECT * FROM simulation") >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (None, u'sim1', u'o-sim1', None, 1.0) >>> sql = 'SELECT * FROM estate' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (None, u'estate1', u'o-estate1', u'sim1', None) >>> sql = 'SELECT * FROM comp_unit ORDER BY data_id' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (u'6362e8cae83710615afcf6421cc03d20', u'stand2', u'o-stand2', u'estate1', 91.0, 9.0, 8.0, None, None, None) (u'76e7844f84e2a0f5cc1ad63994e9916f', u'stand1', u'o-stand1', u'estate1', 90.0, 9.0, 8.0, u'Kuvionumero #"!$\xe2\x82\xac\'', None, None) (u'77268ff29f302f9723bc88214a7c0dd6', u'stand2', u'o-stand2', u'estate1', 11.0, 1.0, 2.0, None, None, None) (u'b5ad2a1b333491a3298b39c48596122e', u'stand2', u'o-stand2', u'estate1', 21.0, 2.0, 3.0, None, None, None) (u'be062d5143706801364cfd2d7296661a', u'stand1', u'o-stand1', u'estate1', 20.0, 2.0, 3.0, u'Kuvionumero #"!$\xe2\x82\xac\'', None, None) (u'f74874ac2247f3cf504fe398e491c2ae', u'stand1', u'o-stand1', u'estate1', 10.0, 1.0, 2.0, u'Kuvionumero #"!$\xe2\x82\xac\'', None, None) NB! Above stand2 should really have Standlabel as u'Kuvionumero' but due to incompleteness of the mock handler, it gets overridden as None. :: >>> sql = 'SELECT * FROM stratum ORDER BY data_id' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (u'6362e8cae83710615afcf6421cc03d20', u'stratum2-1', u'o-stratum2-1', u'stand2', 920.0, 9.0) (u'6362e8cae83710615afcf6421cc03d20', u'stratum2-2', u'o-stratum2-2', u'stand2', 930.0, 9.0) (u'76e7844f84e2a0f5cc1ad63994e9916f', u'stratum1-1', u'o-stratum1-1', u'stand1', 900.0, 9.0) (u'76e7844f84e2a0f5cc1ad63994e9916f', u'stratum1-2', u'o-stratum1-2', u'stand1', 910.0, 9.0) (u'77268ff29f302f9723bc88214a7c0dd6', u'stratum2-1', u'o-stratum2-1', u'stand2', 120.0, 1.0) (u'77268ff29f302f9723bc88214a7c0dd6', u'stratum2-2', u'o-stratum2-2', u'stand2', 130.0, 1.0) (u'be062d5143706801364cfd2d7296661a', u'stratum1-1', u'o-stratum1-1', u'stand1', 200.0, 2.0) (u'be062d5143706801364cfd2d7296661a', u'stratum1-2', u'o-stratum1-2', u'stand1', 210.0, 2.0) (u'f74874ac2247f3cf504fe398e491c2ae', u'stratum1-1', u'o-stratum1-1', u'stand1', 100.0, 1.0) (u'f74874ac2247f3cf504fe398e491c2ae', u'stratum1-2', u'o-stratum1-2', u'stand1', 110.0, 1.0) >>> sql = 'SELECT * FROM tree ORDER BY data_id' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (u'6362e8cae83710615afcf6421cc03d20', u'tree2-1-1', u'o-tree2-1-1', u'stratum2-1', 93.0, None) (u'6362e8cae83710615afcf6421cc03d20', u'tree2-2-1', u'o-tree2-2-1', u'stratum2-2', 94.0, None) (u'6362e8cae83710615afcf6421cc03d20', u'tree2-2-2', u'o-tree2-2-2', u'stratum2-2', 95.0, None) (u'76e7844f84e2a0f5cc1ad63994e9916f', u'tree1-1-1', u'o-tree1-1-1', u'stratum1-1', 91.0, None) (u'76e7844f84e2a0f5cc1ad63994e9916f', u'tree1-2-1', u'o-tree1-2-1', u'stratum1-2', 92.0, None) (u'77268ff29f302f9723bc88214a7c0dd6', u'tree2-1-1', u'o-tree2-1-1', u'stratum2-1', 13.0, None) (u'77268ff29f302f9723bc88214a7c0dd6', u'tree2-2-1', u'o-tree2-2-1', u'stratum2-2', 14.0, None) (u'77268ff29f302f9723bc88214a7c0dd6', u'tree2-2-2', u'o-tree2-2-2', u'stratum2-2', 15.0, None) (u'be062d5143706801364cfd2d7296661a', u'tree1-1-1', u'o-tree1-1-1', u'stratum1-1', 21.0, None) (u'be062d5143706801364cfd2d7296661a', u'tree1-2-1', u'o-tree1-2-1', u'stratum1-2', 22.0, None) (u'f74874ac2247f3cf504fe398e491c2ae', u'tree1-1-1', u'o-tree1-1-1', u'stratum1-1', 11.0, None) (u'f74874ac2247f3cf504fe398e491c2ae', u'tree1-2-1', u'o-tree1-2-1', u'stratum1-2', 12.0, None) >>> sql = 'SELECT * FROM sample_plot ORDER BY data_id' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (u'76e7844f84e2a0f5cc1ad63994e9916f', u'plot1', u'o-plot1', u'stand1', 91.0, None) (u'be062d5143706801364cfd2d7296661a', u'plot1', u'o-plot1', u'stand1', 31.0, None) (u'f74874ac2247f3cf504fe398e491c2ae', u'plot1', u'o-plot1', u'stand1', 21.0, None) >>> sql = 'SELECT * FROM sample_tree ORDER BY data_id' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (u'76e7844f84e2a0f5cc1ad63994e9916f', u'sample_tree1-1', u'o-sample_tree1-1', u'plot1', 91.0) (u'76e7844f84e2a0f5cc1ad63994e9916f', u'sample_tree1-2', u'o-sample_tree1-2', u'plot1', 92.0) (u'be062d5143706801364cfd2d7296661a', u'sample_tree1-1', u'o-sample_tree1-1', u'plot1', 41.0) (u'be062d5143706801364cfd2d7296661a', u'sample_tree1-2', u'o-sample_tree1-2', u'plot1', 42.0) (u'f74874ac2247f3cf504fe398e491c2ae', u'sample_tree1-1', u'o-sample_tree1-1', u'plot1', 31.0) (u'f74874ac2247f3cf504fe398e491c2ae', u'sample_tree1-2', u'o-sample_tree1-2', u'plot1', 32.0) >>> sql = 'SELECT * FROM sample_estate' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE >>> sql = 'SELECT * FROM sample_esplot ORDER BY data_id' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE Updating causes the levels from simulation level onwards to to be updated to new dates. Levels above the simulation level will be untouched. Text and geometry attributes survive the update on the simulation level, below base level not:: >>> from datetime import date >>> sql = """UPDATE comp_unit SET StandLabel='my text' WHERE id='stand1'""" >>> c = test.db.execute(sql) >>> #TODO: Why doesn't nosetests work with these queries? >>> #sql = 'UPDATE comp_unit SET geom=GeomFromText(?, ?) WHERE id=?' >>> #data = ('POLYGON((1,1),(2,2),(3,1),(1.1))', 3067, 'stand1') >>> #c = test.db.get(sql, data) >>> #sql = 'UPDATE tree SET geom=GeomFromText(?, ?) WHERE id=?' >>> #data = ('POINT(1,1)', 3067, 'tree1-1-1') >>> #c = test.db.get(sql, data) >>> sql = 'SELECT * FROM data_link order by id, data_date' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (u'comp_unit', u'stand1', 0, 1, datetime.date(1, 1, 1), u'be062d5143706801364cfd2d7296661a') (u'comp_unit', u'stand1', 1, 0, datetime.date(1, 1, 1), u'76e7844f84e2a0f5cc1ad63994e9916f') (u'comp_unit', u'stand1', 0, 0, datetime.date(1, 1, 1), u'f74874ac2247f3cf504fe398e491c2ae') (u'comp_unit', u'stand2', 0, 1, datetime.date(1, 2, 2), u'b5ad2a1b333491a3298b39c48596122e') (u'comp_unit', u'stand2', 1, 0, datetime.date(1, 2, 2), u'6362e8cae83710615afcf6421cc03d20') (u'comp_unit', u'stand2', 0, 0, datetime.date(1, 2, 2), u'77268ff29f302f9723bc88214a7c0dd6') >>> test.add_data_from_matrix(mock_db.up_dates, mock_db.datamatrix, ... mock_db.Ind2Id(), mock_db.links, set([]), set([]), ... mock_db.main_level, mock_db.text_data, update=True) >>> sql = 'SELECT * FROM data_link order by id, data_date' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (u'comp_unit', u'stand1', 0, 1, datetime.date(2009, 1, 1), u'6fb751ef39fd73331608b29660235a42') (u'comp_unit', u'stand1', 1, 0, datetime.date(2009, 1, 1), u'c18de6ee8b8ce1c936900f23bf27c8ca') (u'comp_unit', u'stand1', 0, 0, datetime.date(2009, 1, 1), u'349c83e6e20a3071535ca97e6f0f7a8f') (u'comp_unit', u'stand2', 0, 1, datetime.date(2009, 2, 2), u'4b88c3e28e7309341be79354f07550a1') (u'comp_unit', u'stand2', 1, 0, datetime.date(2009, 2, 2), u'a587df930d07a47d8f2402c41cd1e67c') (u'comp_unit', u'stand2', 0, 0, datetime.date(2009, 2, 2), u'7f1a4986f68fa131fc45f3b688b191ce') >>> sql = 'SELECT * FROM simulation' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (None, u'sim1', u'o-sim1', None, 1.0) >>> sql = 'SELECT * FROM estate' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (None, u'estate1', u'o-estate1', u'sim1', None) >>> sql = 'SELECT * FROM comp_unit ORDER BY data_id' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (u'349c83e6e20a3071535ca97e6f0f7a8f', u'stand1', u'o-stand1', u'estate1', 10.0, 1.0, 2.0, u'my text', None, None) (u'4b88c3e28e7309341be79354f07550a1', u'stand2', u'o-stand2', u'estate1', 21.0, 2.0, 3.0, None, None, None) (u'6fb751ef39fd73331608b29660235a42', u'stand1', u'o-stand1', u'estate1', 20.0, 2.0, 3.0, u'my text', None, None) (u'7f1a4986f68fa131fc45f3b688b191ce', u'stand2', u'o-stand2', u'estate1', 11.0, 1.0, 2.0, None, None, None) (u'a587df930d07a47d8f2402c41cd1e67c', u'stand2', u'o-stand2', u'estate1', 91.0, 9.0, 8.0, None, None, None) (u'c18de6ee8b8ce1c936900f23bf27c8ca', u'stand1', u'o-stand1', u'estate1', 90.0, 9.0, 8.0, u'my text', None, None) >>> sql = 'SELECT id, BA, SC, AnotherCatVar, StandLabel '\ ... 'FROM comp_unit ORDER BY data_id' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (u'stand1', 10.0, 1.0, 2.0, u'my text') (u'stand2', 21.0, 2.0, 3.0, None) (u'stand1', 20.0, 2.0, 3.0, u'my text') (u'stand2', 11.0, 1.0, 2.0, None) (u'stand2', 91.0, 9.0, 8.0, None) (u'stand1', 90.0, 9.0, 8.0, u'my text') >>> sql = 'SELECT id, data_id, geom FROM tree '\ ... 'ORDER BY id, data_id' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (u'tree1-1-1', u'349c83e6e20a3071535ca97e6f0f7a8f', None) (u'tree1-1-1', u'6fb751ef39fd73331608b29660235a42', None) (u'tree1-1-1', u'c18de6ee8b8ce1c936900f23bf27c8ca', None) (u'tree1-2-1', u'349c83e6e20a3071535ca97e6f0f7a8f', None) (u'tree1-2-1', u'6fb751ef39fd73331608b29660235a42', None) (u'tree1-2-1', u'c18de6ee8b8ce1c936900f23bf27c8ca', None) (u'tree2-1-1', u'7f1a4986f68fa131fc45f3b688b191ce', None) (u'tree2-1-1', u'a587df930d07a47d8f2402c41cd1e67c', None) (u'tree2-2-1', u'7f1a4986f68fa131fc45f3b688b191ce', None) (u'tree2-2-1', u'a587df930d07a47d8f2402c41cd1e67c', None) (u'tree2-2-2', u'7f1a4986f68fa131fc45f3b688b191ce', None) (u'tree2-2-2', u'a587df930d07a47d8f2402c41cd1e67c', None) >>> sql = 'SELECT id, data_id FROM sample_tree '\ ... 'ORDER BY id, data_id' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (u'sample_tree1-1', u'349c83e6e20a3071535ca97e6f0f7a8f') (u'sample_tree1-1', u'6fb751ef39fd73331608b29660235a42') (u'sample_tree1-1', u'c18de6ee8b8ce1c936900f23bf27c8ca') (u'sample_tree1-2', u'349c83e6e20a3071535ca97e6f0f7a8f') (u'sample_tree1-2', u'6fb751ef39fd73331608b29660235a42') (u'sample_tree1-2', u'c18de6ee8b8ce1c936900f23bf27c8ca') >>> sql = 'SELECT * FROM sample_estate' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE >>> sql = 'SELECT * FROM sample_esplot' >>> iter = test.db.get(sql) >>> for item in iter: ... print item # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE def add_branching_info(self, lind, binfo, ind2id, start_iter=0): ================================================================= Adds a new item to the BRANCHING_INFO table. binfo is a dictionary consisting of key and data tuples as (iteration index, child branch index, object index):(parent branch index, branching operation group, operation, branch name, date) :: >>> bdate = date(2009, 7, 8) >>> test.add_branching_info(2, {(0,2,0):(1,'tgrp','top', 'name2', ... 'bgroup2', bdate)}, ... mock_db.ind2id) >>> test.add_branching_info(2, {(0,3,0):(1,'tgrp','top', 'name3', ... 'bgroup3', bdate)}, ... mock_db.ind2id) >>> test.add_branching_info(2, ... {(0,1,0):(0,'tgrp','top', 'name', 'bgroup', bdate)}, ... mock_db.ind2id) ... #doctest: +ELLIPSIS db, error, ... >>> e = test.logger.log >>> 'violates unique' in e or 'not unique' in e True >>> sql = 'SELECT * FROM BRANCHING_INFO ORDER BY from_branch, to_branch' >>> iter = test.db.get(sql) >>> for row in iter: print row # doctest: +NORMALIZE_WHITESPACE (u'stand1', 0, 0, 1, datetime.date(2009, 7, 8), u'tgrp', u'top', u'name', u'bgroup') (u'stand1', 0, 1, 2, datetime.date(2009, 7, 8), u'tgrp', u'top', u'name2', u'bgroup2') (u'stand1', 0, 1, 3, datetime.date(2009, 7, 8), u'tgrp', u'top', u'name3', u'bgroup3') def add_opres(self, levelname, data): ===================================== This adds new rows to the database. data is a list of dictionaries, each containing date, id, iteration, branch, values, op_name and op_group. values is a list of dictionaries containing var-val pairs :: >>> ddate = date(2009, 1, 1) >>> data = [{'date': ddate, ... 'op_id':1, ... 'id':'stand1', ... 'iteration': 0, ... 'branch': 1, ... 'values': [([{'cash_flow': 3.0}, ... {'Volume': 2.0, 'SP': 1., 'assortment': 2.}, ... {'Volume': 6.0, 'SP': 1., 'assortment': 1.}], ... ddate)], ... 'op_name': 'thinning', ... 'op_group': 'cutting', ... 'op_type': 'simulated', ... 'notes': 'These here are the notes!', ... 'materials': ['one material', 'two material']}] >>> test.add_opres('comp_unit', data) [] >>> iter = test.db.get('SELECT * FROM op_link') >>> for row in iter: ... print row # doctest: +NORMALIZE_WHITESPACE (u'stand1', 0, 1, datetime.date(2009, 1, 1), u'4d66c66840df6fdf9c32a51224c72701', u'1', u'comp_unit', u'thinning', u'cutting', u'simulated') >>> iter = test.db.get('SELECT * FROM op_res') >>> for row in iter: ... print row # doctest: +NORMALIZE_WHITESPACE (u'4d66c66840df6fdf9c32a51224c72701', 3.0, None, None, None) (u'4d66c66840df6fdf9c32a51224c72701', None, 1.0, 2.0, 2.0) (u'4d66c66840df6fdf9c32a51224c72701', None, 1.0, 1.0, 6.0) >>> iter = test.db.get('SELECT * FROM op_note') >>> for row in iter: ... print row # doctest: +NORMALIZE_WHITESPACE (u'4d66c66840df6fdf9c32a51224c72701', u'These here are the notes!') >>> iter = test.db.get('SELECT * FROM op_material') >>> for row in iter: ... print row # doctest: +NORMALIZE_WHITESPACE (u'4d66c66840df6fdf9c32a51224c72701', u'one material') (u'4d66c66840df6fdf9c32a51224c72701', u'two material') def fill_dictionary(self, level, oid): ====================================== Returns a dictionary in the format that add_data_from_dictionary uses, containing all the iterations, branches and dates of the given level and oid:: >>> res = test.fill_dictionary('comp_unit', 'stand2') >>> for item in res['comp_unit']: ... ddict = item[1] ... print item[0], ddict['iteration'], ddict['branch'], ddict['values'] ... #doctest: +NORMALIZE_WHITESPACE 2009-02-02 0 0 [(u'BA', 11.0), (u'SC', 1.0), (u'AnotherCatVar', 2.0)] 2009-02-02 0 1 [(u'BA', 21.0), (u'SC', 2.0), (u'AnotherCatVar', 3.0)] 2009-02-02 1 0 [(u'BA', 91.0), (u'SC', 9.0), (u'AnotherCatVar', 8.0)] def get_level_var_headers(self, level): ======================================= Will return the variable headers of the table of the given level :: >>> test.get_level_var_headers(0) ['DIAM_CLASS_WIDTH'] >>> test.get_level_var_headers(1) ['EstateName'] >>> test.get_level_var_headers(3) ['N', 'SP'] def get_data_from_level(self, from_data, headers, level, constraints={}, dates=None, rsort=None, required=None): ================================================================================================================ Will return an iterator for the data (as requested by headers) from the given level with the given constraints based on the given dictionary ({col: [op, val(, val)]}, op one of ('gt', 'ge', 'eq', 'ue', 'le', 'lt', 'in') and multiple vals only for 'in') dates is a tuple with the start and end date from between which the data is retrieved. rsort is the column by which the returned data should be sorted and required is a list of columns that must be non-null for the rows to be included in the results. :: >>> headers = test.get_level_var_headers(4) >>> headers = ['id', 'iteration', 'branch'] + headers >>> constraints = [('id', 'in', ('tree1-1-1', 'tree1-2-1'), 'and'), ... ('branch', 'in', (0, 1))] >>> iter = test.get_data_from_level('data', headers, 4, constraints, ... rsort=('id', 'iteration', 'branch')) >>> for item in iter: ... print item (u'tree1-1-1', 0, 0, 11.0) (u'tree1-1-1', 0, 1, 21.0) (u'tree1-1-1', 1, 0, 91.0) (u'tree1-2-1', 0, 0, 12.0) (u'tree1-2-1', 0, 1, 22.0) (u'tree1-2-1', 1, 0, 92.0) >>> headers = ['id', 'branch', 'op_name', 'cash_flow', 'Volume', ... 'assortment', 'op_date'] >>> iter = test.get_data_from_level('op_res', headers, 0) >>> for item in iter: ... print item # doctest: +NORMALIZE_WHITESPACE (u'stand1', 1, u'thinning', 3.0, 8.0, 3.0, datetime.date(2009, 1, 1)) >>> iter = test.get_data_from_level('op_res', headers, 0, ... required=['Volume']) >>> for item in iter: ... print item # doctest: +NORMALIZE_WHITESPACE (u'stand1', 1, u'thinning', None, 8.0, 3.0, datetime.date(2009, 1, 1)) >>> headers = [('data', 'id'), ('data', 'iteration'), ('data', 'branch')] >>> headers += [('data', h) for h in test.get_level_var_headers(4)] >>> headers += [('op', 'id'), ('op', 'branch'), ('op', 'op_name'), ... ('op', 'cash_flow'), ('op', 'Volume'), ('op', 'op_date')] >>> constraints = [('data', 'id', 'in', ('tree1-1-1', 'tree1-2-1'), 'and'), ... ('op', 'branch', 'in', (0, 1))] >>> iter = test.get_data_from_level('combined', headers, 4, constraints, ... dates=(date(2008, 10, 10), date(2010, 10, 10)), ... rsort=(('op', 'id'), ('data', 'iteration'), ... ('data', 'branch'), ('op', 'Assortment')), ... required=(('op', 'Volume'), ('data', 'id')), ... distinct=True, ... group_by_headers=['Assortment']) >>> for item in iter: ... print item # doctest: +NORMALIZE_WHITESPACE (u'tree1-2-1', 0, 1, 22.0, u'stand1', 1, u'thinning', None, 12.0, datetime.date(2009, 1, 1), 1.0) (u'tree1-2-1', 0, 1, 22.0, u'stand1', 1, u'thinning', None, 4.0, datetime.date(2009, 1, 1), 2.0) def get_ids(self, level): ========================= Will return all ids from the level :: >>> idlist = test.get_ids('data', 2) >>> set(idlist)==set([u'stand1', u'stand2']) True >>> idset = set(test.get_ids('data', 3)) >>> resset = set([u'stratum1-1', u'stratum1-2', u'stratum2-1', ... u'stratum2-2']) >>> resset==idset True def get_max_iter(self, from_data): ================================== Will return the max iteration in data:: >>> test.get_max_iter('data') 1 def get_max_branch(self, from_data): ==================================== Will return the max branch in data:: >>> test.get_max_branch('data') 3 def get_dates(self, from_data, dates): ====================================== Will return the unique dates on a given level, between the given tuple of dates (lower limit, higher limit) :: >>> date_lim = (date(2000, 1, 1), date(2010, 1, 1)) >>> dset = set(test.get_dates('data', date_lim)) >>> dset==set([date(2009, 1, 1), date(2009, 2, 2)]) True def get_dates_by_id_and_iter(self, level, dates): ================================================= Will return a dictionary with (id, iter) pairs as keys and unique dates for a given id as values :: >>> rd = test.get_dates_by_id_and_iter(2, (date(2000, 1, 1), ... date(2010, 1, 1))) >>> rd[(u'stand1', 0)] [datetime.date(2009, 1, 1)] >>> rd[(u'stand1', 1)] [datetime.date(2009, 1, 1)] >>> rd[(u'stand2', 1)] [datetime.date(2009, 2, 2)] >>> rd[(u'stand2', 0)] [datetime.date(2009, 2, 2)] >>> rd = test.get_dates_by_id_and_iter(3, (date(2009, 2, 2), ... date(2010, 1, 1))) >>> len(rd) 0 def copy_to_db(self, key_list, lname, db, iteration): ===================================================== For the given keys of (iteration, object id, orig id, branch, weight, copy_only_ops) copies the data to a new db from the given level. Level is taken as base level, below which all the children from different levels are copied as well:: >>> keys = [(0, 'stand1', 'stand1', 1, 1., False), ... (0, 'stand2', 'stand2', 1, 1., False), ... (0, 'stand1', 'stand1', 0, 1., False), ... (1, 'stand1', 'stand1', 0, 1., False), ... (1, 'stand2', 'stand2', 0, 1., False)] >>> copydb = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy, ... levels, opres_def, ... cf_cfiers, mock_db.MockLogger(), 'test_copy.db', ... True, False, (3067, None), create_branch_desc=True, ... track_solution=True) >>> test.copy_to_db(keys, 'comp_unit', copydb, 0) >>> sql = 'SELECT DISTINCT id, iteration, branch FROM data_link '\ ... 'ORDER BY data_id' >>> copydb.db.get(sql) # doctest: +NORMALIZE_WHITESPACE [(u'stand1', 0, 0), (u'stand2', 0, 1), (u'stand2', 1, 0), (u'stand1', 1, 0), (u'stand1', 0, 1)] >>> sql = 'SELECT DISTINCT id FROM tree ORDER BY id' >>> copydb.db.get(sql) # doctest: +NORMALIZE_WHITESPACE [(u'tree1-1-1',), (u'tree1-2-1',), (u'tree2-1-1',), (u'tree2-2-1',), (u'tree2-2-2',)] >>> headers = ['id', 'iteration', 'branch', 'op_name', 'op_group', ... 'cash_flow', 'Volume', 'assortment', 'op_date'] >>> opres1 = test.get_data_from_level('op_res', headers, 0) >>> opres2 = copydb.get_data_from_level('op_res', headers, 0) >>> opres1 = [i for i in opres1] >>> opres1.sort() >>> opres2 = [i for i in opres2] >>> opres2.sort() >>> opres1==opres2 True >>> copydb.close() >>> keys = [(0, 'stand1', 'stand1', 1, 0.25, False), ... (0, 'stand2', 'stand2', 1, 1., False), ... (0, 'stand1', 'stand1', 0, 0.75, False), ... (1, 'stand1', 'stand1', 0, 1., False), ... (1, 'stand2', 'stand2', 0, 1., False)] >>> copydb = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy, ... levels, opres_def, ... cf_cfiers, mock_db.MockLogger(), 'test_copy.db', ... True, False, (3067, None), create_branch_desc=True, ... track_solution=True) >>> test.copy_to_db(keys, 'comp_unit', copydb, 0) >>> sql = 'SELECT DISTINCT id, iteration, branch FROM data_link '\ ... 'ORDER BY data_id' >>> copydb.db.get(sql) # doctest: +NORMALIZE_WHITESPACE [(u'stand1', 0, 0), (u'stand2', 0, 1), (u'stand2', 1, 0), (u'stand1', 1, 0), (u'stand1', 0, 1)] >>> sql = 'SELECT DISTINCT id FROM tree ORDER BY id' >>> copydb.db.get(sql) # doctest: +NORMALIZE_WHITESPACE [(u'tree1-1-1',), (u'tree1-2-1',), (u'tree2-1-1',), (u'tree2-2-1',), (u'tree2-2-2',)] >>> headers = ['id', 'iteration', 'branch', 'op_name', 'op_group', ... 'cash_flow', 'Volume', 'assortment', 'op_date'] >>> opres1 = test.get_data_from_level('op_res', headers, 0) >>> opres2 = copydb.get_data_from_level('op_res', headers, 0) >>> opres1 = [i for i in opres1] >>> opres1.sort() >>> opres2 = [i for i in opres2] >>> opres2.sort() >>> opres1==opres2 True >>> copydb.close() def _create_constraints(self, constraints): =========================================== Will return a SQLite string of constraints based on the given dictionary ({col: [val, val, val]}) :: >>> const = [('id', 'in', ('tree1-1-1', 'tree1-2-1'), 'and'), ... ('branch', 'in', (0, 1), 'or'), ('iteration', 'eq', 0)] >>> test._create_constraints('data', const) #doctest: +NORMALIZE_WHITESPACE ('l.id IN (?,?) AND d.branch IN (?,?) OR d.iteration = ? ', ['tree1-1-1', 'tree1-2-1', 0, 1, 0]) >>> const = [('id', 'fail', 'this', 'and'), ... ('test', 'eq', ('fail', 'fail')), ... ('should not come here',)] >>> test._create_constraints('data', const) #doctest: +NORMALIZE_WHITESPACE db, error, Bad operation (fail) not in (le, lt, ge, gt, in, ue, eq) db, error, missing concatenation operand from [('id', 'fail', 'this', 'and'), ('test', 'eq', ('fail', 'fail')), ('should not come here',)] index 1 ('', []) >>> const = [('arr', 'eq', ('fail', 'fail'), 'and'), ('rar', 'fail', 1)] >>> test._create_constraints('data', const) #doctest: +NORMALIZE_WHITESPACE db, error, Too many values for operation type (eq) db, error, Bad operation (fail) not in (le, lt, ge, gt, in, ue, eq) ('', []) def get_child_ids(self, lind, oid, iter=0, branch=0, ddate=None): ================================================================= Recursively gets child ids for the given object and optionally for the given date:: >>> dd = test.get_child_ids(2, 'stand1') >>> set(dd[3]) == set([u'stratum1-1', u'stratum1-2']) True >>> set(dd[4]) == set([u'tree1-1-1', u'tree1-2-1']) True >>> set(dd[5]) == set([u'plot1']) True >>> set(dd[6]) == set([u'sample_tree1-1', u'sample_tree1-2']) True def _get_branch_desc(self, id, iteration, to_branch): ===================================================== Get a merged description for the given id, iteration, branch combo:: >>> test._get_branch_desc('stand1', 0, 0) '' >>> test._get_branch_desc('stand1', 0, 1) 'name' >>> test._get_branch_desc('stand1', 0, 2) 'name|name2' def fill_branch_desc_table(self): ================================= Fills the branch description table:: >>> test.db.get('SELECT * FROM branch_desc') [] >>> test.fill_branch_desc_table(('stand1', 'stand2')) >>> test.db.get('SELECT * FROM branch_desc ORDER BY branch') ... #doctest:+NORMALIZE_WHITESPACE [(0, u'stand1', 1, u'name', u'bgroup'), (0, u'stand1', 2, u'name|name2', u'bgroup2'), (0, u'stand1', 3, u'name|name3', u'bgroup3')] def store_solution(self, keys, run_id): ======================================= This function will store a solution to the optimal table in the database, removing any and all old data under the given run_id. keys is an array of iteration, id, branch comboes:: >>> test.db.get('SELECT * FROM optimal') [] >>> test.store_solution([(0, 'test', 'test', 0, 1., False), ... (1, 'test2', 'test2', 1, 1., False)], 'test') >>> test.db.get('SELECT * FROM optimal') [(u'test', 0, u'test', 0, 1.0), (u'test', 1, u'test2', 1, 1.0)] >>> test.store_solution([(2, 'test3', 'test3', 2, 1., False)], 'test') >>> test.db.get('SELECT * FROM optimal') [(u'test', 2, u'test3', 2, 1.0)] >>> test.store_solution([(3, 'test4', 'test4', 3, 1., False)], 'test2') >>> test.db.get('SELECT * FROM optimal') [(u'test', 2, u'test3', 2, 1.0), (u'test2', 3, u'test4', 3, 1.0)] def drop_id(self, id): ====================== This function will drop all data from all the tables that refer to the given id:: >>> test.db.get('SELECT count(id) FROM comp_unit WHERE id=\'stand1\'') [(3,)] >>> test.db.get('SELECT count(id) FROM stratum WHERE pid=\'stand1\'') [(6,)] >>> test.db.get('SELECT count(id) FROM BRANCHING_INFO WHERE id=\'stand1\'') [(3,)] >>> test.db.get('SELECT count(id) FROM op_link WHERE id=\'stand1\'') [(1,)] >>> op_ids = test.db.get('SELECT op_id FROM op_link WHERE id=\'stand1\'') >>> op_ids = [op_id[0] for op_id in op_ids] >>> test.db.get('SELECT count(*) FROM op_res WHERE op_id IN (%s)' \ ... % ', '.join('?'*len(op_ids)), op_ids) [(3,)] >>> test.drop_id('stand1') >>> test.db.get('SELECT count(*) FROM comp_unit WHERE id=\'stand1\'') [(0,)] >>> test.db.get('SELECT count(*) FROM stratum WHERE pid=\'stand1\'') [(0,)] >>> test.db.get('SELECT count(*) FROM BRANCHING_INFO WHERE id=\'stand1\'') [(0,)] >>> test.db.get('SELECT count(*) FROM op_link WHERE id=\'stand1\'') [(0,)] >>> test.db.get('SELECT count(*) FROM op_res WHERE op_id IN (%s)' \ ... % ', '.join('?'*len(op_ids)), op_ids) [(0,)] def drop_ids(self, ids): ======================== This function will drop all data from all the tables that refer to the given ids:: >>> test.db.get('SELECT count(id) FROM optimal WHERE id=\'test3\'') [(1,)] >>> test.drop_ids(('stand1', 'test3')) >>> test.db.get('SELECT count(id) FROM optimal WHERE id=\'test3\'') [(0,)] def _get_data_hash(self, lind, uid, iterr, branch, ddate): ========================================================== Construct a hash string from data object key Parameters :: lind -- level ind as integer uid -- unique id as string iterr -- iteration as integer branch -- branch as integer ddate -- data date as datetime object :: >>> from datetime import date >>> test._get_data_hash(1, u'STAND1', 11, 0, date(2011,1,1)) u'1STAND1i11b02011-01-01' >>> test._get_data_hash(1, u'STAND1', 1, 10, date(2011,1,1)) u'1STAND1i1b102011-01-01' def _get_op_hash(self, uid, iterr, branch, ddate, op_name, counter=0): ====================================================================== Construct a hash string from operation key Parameters :: uid -- unique id as string iterr -- iteration as integer branch -- branch as integer ddate -- data date as datetime object op_name -- operation name as string counter -- optional counter argument :: >>> test._get_op_hash(u'STAND1', 11, 0, date(2011,1,1), u'thinning', 1) u'STAND1i11b02011-01-01thinning1' >>> test._get_op_hash(u'STAND1', 1, 10, date(2011,1,1), u'thinning', 1) u'STAND1i1b102011-01-01thinning1' def _encode_hash(self, hash): ============================= Compute the md5 hash of a string Parameters :: hash -- hash string :: >>> test._encode_hash(u'1STAND1i11b02011-01-01') '954457c3b270b4b3b04c3da96559f2eb' >>> test._encode_hash(u'1STAND1i1b102011-01-01') '3c6e97a3468d9082a875681b227fd1d6' def clear_data(self): ===================== Clears data from level tables:: >>> test.clear_data() >>> keys = test.content_def.keys() >>> keys.sort() >>> for cdef_key in keys: ... level_name = cdef_key[0] ... sql = 'SELECT * FROM %s' % level_name ... res = test.db.get(sql) ... print level_name, len(res) comp_unit 0 estate 0 sample_esplot 0 sample_estate 0 sample_plot 0 sample_tree 0 simulation 0 stratum 0 tree 0 >>> test.close() **************************** class OperationDB(SQLiteDB): **************************** def __init__(self, db_name='default.db', wipe=False, memory=False, db_host='localhost', db_port=5432, user='test', pw='test', db_class='SQLITE', db_database='SIMO', db_schema='', extra_columns=[]): ================================================================== :: >>> opdb = db.OperationDB('test_mock.db', True, ... extra_columns=[("comp_unit", "EXTRA_COLUMN")]) >>> opdb.close() The init will create/open a connection to the given database file (wiping the old one if necessary) and will then create the tables it needs. def _create_tables(self): ========================= Creates the tables that OperationDB needs. :: >>> opdb.open() >>> sql = "select sql from sqlite_master where type = 'table'" >>> iter = opdb.db.get(sql) >>> for item in iter: ... print item[0] # doctest: +NORMALIZE_WHITESPACE CREATE TABLE forced_operation (op_id INTEGER NOT NULL, unit_id TEXT NOT NULL, op_level TEXT NOT NULL, iteration INTEGER NOT NULL, branch INTEGER NOT NULL, name TEXT NOT NULL, timingtype TEXT NOT NULL, timestep INTEGER, op_date DATE, stepunit TEXT, "EXTRA_COLUMN" DOUBLE PRECISION, CONSTRAINT pk_forced_operation PRIMARY KEY (op_id, unit_id)) CREATE TABLE operation_chain (id INTEGER PRIMARY KEY, op_id INTEGER NOT NULL, pid TEXT NOT NULL, chain_ordinal INTEGER NOT NULL, chain TEXT NOT NULL, CONSTRAINT fk_pid FOREIGN KEY (op_id, pid) REFERENCES forced_operation (op_id, unit_id) ON DELETE CASCADE) def add_operation(unit_id, level, iteration, branch, name, timingtype, timestep, date, stepunit, chains, op_id=None, ext_col_vals=None): ======================================================================================================================================== This adds new rows to the database. :: >>> odate = date(2009, 3, 9) >>> opdb.add_operation('stand1', 'comp_unit', 0, 0, 'mounding', 'step', ... 2, None, 'year', ['Forced mounding']) >>> opdb.add_operation('stand2', 'comp_unit', 0, 0, 'clearcut', 'step', 2, ... None, 'year', ['Forced clearcut', ... 'Update comp_unit after forced clearcut or strip cut', ... 'Calculate productive value']) >>> opdb.add_operation('stand3', 'comp_unit', 0,0,'clearcut', 'date', None, ... odate, 'year', ['Forced clearcut', ... 'Update comp_unit after forced clearcut or strip cut', ... 'Calculate productive value']) >>> opdb.add_operation('stand4', 'comp_unit', 0,0,'clearcut', 'date', None, ... odate, 'year', ['Forced clearcut', ... 'Update comp_unit after forced clearcut or strip cut', ... 'Calculate productive value'], ext_col_vals={"EXTRA_COLUMN": 1000.}) >>> iter = opdb.db.get('select * from forced_operation') >>> expected_results = [ ... [1, u'stand1', u'comp_unit', 0, 0, u'mounding', u'step', 2, None, ... u'year'], ... [1, u'stand2', u'comp_unit', 0, 0, u'clearcut', u'step', 2, None, ... u'year'], ... [1, u'stand3', u'comp_unit', 0, 0, u'clearcut', u'date', None, ... odate, u'year'], ... [1, u'stand4', u'comp_unit', 0, 0, u'clearcut', u'date', None, ... odate, u'year']] >>> i = 0 >>> for row in iter: ... res = expected_results[i] ... i+=1 ... for val, exp in zip(row, res): ... if type(val) != type(exp): ... print 'Type mismatch:', type(val), type(exp) ... if val != exp: ... print 'Value mismatch: got', val, 'expected', exp ... >>> iter = opdb.db.get('select * from operation_chain') >>> expected_results = [ ... [1, 1, u'stand1', 0, u'Forced mounding'], ... [2, 1, u'stand2', 0, u'Forced clearcut'], ... [3, 1, u'stand2', 1, u'Update comp_unit after forced clearcut or strip cut'], ... [4, 1, u'stand2', 2, u'Calculate productive value'], ... [5, 1, u'stand3', 0, u'Forced clearcut'], ... [6, 1, u'stand3', 1, u'Update comp_unit after forced clearcut or strip cut'], ... [7, 1, u'stand3', 2, u'Calculate productive value'], ... [8, 1, u'stand4', 0, u'Forced clearcut'], ... [9, 1, u'stand4', 1, u'Update comp_unit after forced clearcut or strip cut'], ... [10, 1, u'stand4', 2, u'Calculate productive value']] >>> i = 0 >>> for row in iter: ... res = expected_results[i] ... i+=1 ... for val, exp in zip(row, res): ... if type(val) != type(exp): ... print 'Type mismatch:', type(val), type(exp) ... if val != exp: ... print 'Value mismatch: got', val, 'expected', exp def fill_dictionary(self, levelname, ids): ========================================== Fills a dictionary with object ids as keys (ids) and ForcedOperation instances as data :: >>> obj_ids = [('stand1', 'o-stand1'), ('stand2', 'stand2')] >>> fcd_op_dict = opdb.fill_dictionary('comp_unit', obj_ids) >>> for key, vals in fcd_op_dict.items(): # doctest: +NORMALIZE_WHITESPACE ... for val in vals: ... print key, '-', val.id, val.level, val.timing_type, \ ... val.time_step, ... print val.date, val.step_unit, ... print val.chain_names, ... print val.chain_indices (0, 0, u'stand1') - stand1 comp_unit step 2 None year [u'Forced mounding'] None (0, 0, u'stand2') - stand2 comp_unit step 2 None year [u'Forced clearcut', u'Update comp_unit after forced clearcut or strip cut', u'Calculate productive value'] None def clear_data(self): ===================== Clears data from data tables:: >>> opdb.clear_data() >>> sql = 'SELECT * FROM forced_operation' >>> res = opdb.db.get(sql) >>> res [] >>> sql = 'SELECT * FROM operation_chain' >>> res = opdb.db.get(sql) >>> res [] >>> opdb.close() >>> opdb = None ************************* class LoggerDB(SQLiteDB): ************************* def __init__(self, db_path='default.db', wipe=False): ===================================================== Connects to the given database file (wiping the existing one if requested) and runs _create_log_table():: >>> test = db.LoggerDB('test_mock.db', True) def _create_log_table(self): ============================ Creates the log table into the database:: >>> sql = "select sql from sqlite_master where type='table' and "\ ... "name='log'" >>> iter = test.db.get(sql) >>> for item in iter: ... print item[0] CREATE TABLE log( run_id TEXT, log_level TEXT, module TEXT, time TIMESTAMP, sim_id TEXT, message TEXT) def add_log_message(self, level, module, timestamp, sim_id, message): ===================================================================== This will add a new log message into the database:: >>> from datetime import datetime >>> date = datetime(2009, 2, 2, 15, 17, 56, 626329) >>> date2 = datetime(2010, 4, 9, 10, 25) >>> test.add_log_message('test', 'one', 'two', date, 'stand1', 'Ni!') >>> test.add_log_message('test2', 'three', 'four', date2, 'stand2', ... 'Shrubbery!') >>> iter = test.db.get('select * from log') >>> for item in iter: ... print item # doctest: +NORMALIZE_WHITESPACE (u'test', u'one', u'two', datetime.datetime(2009, 2, 2, 15, 17, 56, 626329), u'stand1', u'Ni!') (u'test2', u'three', u'four', datetime.datetime(2010, 4, 9, 10, 25), u'stand2', u'Shrubbery!') def get_log_messages(self, level=None, module=None, start_time=None, end_time=None): ==================================================================================== This method retrieves log data with the given constraints. level and module are tuples containing wanted levels and modules respectively. start_time and end_time are timestamps (datetime instances) to constrain the output. :: >>> pprint(test.get_log_messages()) [(datetime.datetime(2009, 2, 2, 15, 17, 56, 626329), u'two', u'one', u'test', u'stand1', u'Ni!'), (datetime.datetime(2010, 4, 9, 10, 25), u'four', u'three', u'test2', u'stand2', u'Shrubbery!')] >>> pprint(test.get_log_messages(level=('one','six','ten'))) [(datetime.datetime(2009, 2, 2, 15, 17, 56, 626329), u'two', u'one', u'test', u'stand1', u'Ni!')] >>> pprint(test.get_log_messages(module=('four', 'three', 'seven'))) [(datetime.datetime(2010, 4, 9, 10, 25), u'four', u'three', u'test2', u'stand2', u'Shrubbery!')] >>> pprint(test.get_log_messages(run_id=('test2', 'tset3', 'nonexistant'))) [(datetime.datetime(2010, 4, 9, 10, 25), u'four', u'three', u'test2', u'stand2', u'Shrubbery!')] >>> pprint(test.get_log_messages(start_time=date2)) [(datetime.datetime(2010, 4, 9, 10, 25), u'four', u'three', u'test2', u'stand2', u'Shrubbery!')] >>> pprint(test.get_log_messages(end_time=date)) [(datetime.datetime(2009, 2, 2, 15, 17, 56, 626329), u'two', u'one', u'test', u'stand1', u'Ni!')] >>> pprint(test.get_log_messages(start_time=date, end_time=date2)) [(datetime.datetime(2009, 2, 2, 15, 17, 56, 626329), u'two', u'one', u'test', u'stand1', u'Ni!'), (datetime.datetime(2010, 4, 9, 10, 25), u'four', u'three', u'test2', u'stand2', u'Shrubbery!')] >>> pprint(test.get_log_messages(run_id=('test', 'test2'), ... level=('one', 'three'), ... module=('two', 'four'), ... start_time=date, end_time=date2)) [(datetime.datetime(2009, 2, 2, 15, 17, 56, 626329), u'two', u'one', u'test', u'stand1', u'Ni!'), (datetime.datetime(2010, 4, 9, 10, 25), u'four', u'three', u'test2', u'stand2', u'Shrubbery!')] >>> test.get_log_messages(level=('fail')) [] >>> test.get_log_messages(module=('fail')) [] >>> test.get_log_messages(start_time=datetime(1800, 1, 1), ... end_time=datetime(1805, 1, 1)) [] >>> test.get_log_messages(run_id=('test',), empty_log=True) >>> pprint(test.get_log_messages()) [(datetime.datetime(2010, 4, 9, 10, 25), u'four', u'three', u'test2', u'stand2', u'Shrubbery!')] >>> test.close() >>> test = None