Table Of Contents

Previous topic

messagetranslationtable.py

Next topic

db.py

This Page

db.py

Writing fast queries/indexes:

First rule of writing indexes is: If it doesn’t make a big impact, don’t do it. Every index will slow down every insert into the table it’s made for so the more inserts you have to a given table, the more careful you need to be here.

If you do make a new index, bear in mind that you need to put the variable key (there should be only one) to the last (f.e. iteration, branch, date, if you have a lot of queries with “WHERE iteration=? and branch=? and date<?” or similar (IN, OR, BETWEEN, <, >, etc)). If there are no other variables, put the date column for last (if you have it), as its handling appears to be a bit different to text and numbers.

When writing queries, you should have the parameters in the same order that they are in the index and again, it’s preferrable to have the variable key(s) (including date) for last.

class SQLiteDB(object):

This object is mainly to be used as a base class. It will connect to the given database file (and wipe it if need be). It also houses a close method for when you’re done (and an open if you decide to continue later, in which case a new connection to the original file is formed)

def __init__(self, db_path=’default.db’, wipe=False, memory=False):

Connects to the given database file. Destroys the old one first if wipe = True. Stores to db in memory only if memory is set to True:

>>> from datetime import date
>>> from simo.db.datadb import db
>>> from pysqlite2 import dbapi2 as sqlite3
>>> import hashlib
>>> test = db.SQLiteDB('test_mock.db', wipe=True)
>>> sql = 'CREATE TABLE "test" (id INTEGER PRIMARY KEY, message TEXT)'
>>> c = test.execute(sql)
>>> sql = 'INSERT INTO "test" (message) VALUES ("Test text")'
>>> c = test.execute(sql)
>>> iter = test.get('SELECT "test".* FROM "test"')
>>> for row in iter:
...     print row
(1, u'Test text')
>>> test.close()
>>> test = db.SQLiteDB(memory=True)
>>> sql = "CREATE TABLE test (id INTEGER PRIMARY KEY, message TEXT)"
>>> c = test.execute(sql)
>>> c = test.execute('INSERT INTO test (message) VALUES ("Test text")')
>>> iter = test.get('SELECT * FROM test')
>>> for row in iter:
...     print row
(1, u'Test text')
>>> test.close()
>>> test.open()
>>> try:
...     iter = test.get('SELECT * FROM test')
... except sqlite3.OperationalError, e:
...     print e
no such table: test
>>> test.close()

def open(self):

Re-opens a connection to the database file given in init.

>>> test = db.SQLiteDB(db_path='test_mock.db', wipe=False)
>>> test.close()
>>> try:
...     cursor = test.conn.cursor()
... except sqlite3.ProgrammingError, e:
...     print 'Database closed;', e.args[0]
Database closed; Cannot operate on a closed database.
>>> test.open()
>>> iter = test.get('SELECT * FROM test')
>>> for row in iter:
...     print row
(1, u'Test text')

def close(self):

Commits any lingering changes and closes the database connection.

>>> test = db.SQLiteDB(db_path='test_mock.db', wipe=False)
>>> c = test.execute('CREATE TABLE fun (id INTEGER PRIMARY KEY)')
>>> c = test.execute('INSERT INTO fun VALUES (1)')
>>> test.close()
>>> test.open()
>>> iter = test.get('SELECT * FROM fun')
>>> for row in iter:
...     print row
(1,)
>>> test.close()

class DataDB(SQLiteDB):

The object will connect to a given database file on init or create one if needed, using the provided Lexicon for the table structure. It will also create a log table on initialization. If the make_new_db parameter (default True) is not set to False, an existing database, if any, will be destroyed.

def __init__(self, db_type, content_def, hierarchy, level_meta, opres_def, cf_cfiers, logger, db_path=’default.db’, make_new_db=False, memory=False, spatial=False, constraints=None, db_host=’localhost’, db_port=‘5432’, user=’test’, pw=’test’, db_class=’SQLITE’):

Connects to the given database file. Destroys the old file and sets up the new one with the given lexicon, if make_new_db==True and sets up initial values (self.content_def, self.hierarchy), will complain of changed schema if db_type is ‘write’ and make_new_db is False and the schema (content_def) has changed. If using PostgreSQL (db_class=’POSTGRESQL’), host, port, username and password must be defined:

>>> from simo.db.datadb.test import mock_db
>>> from pprint import pprint
>>> hierarchy = mock_db.hierarchy
>>> content_def = mock_db.base_cd
>>> levels = mock_db.base_levels
>>> opres_def = mock_db.opres_def
>>> cf_cfiers = mock_db.cf_cfiers
>>> test = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy, levels,
...         opres_def,
...         cf_cfiers, mock_db.MockLogger(), 'test_mock.db',
...         make_new_db=True, memory=False, spatial=(3067, None),
...         create_branch_desc=True, track_solution=True)

def _create_meta_tables(self):

Sets up tables for storing the numerical and categorical attribute definitions

def _set_meta_tables(self, level_meta):

Writes numerical and categorical variable definitions to the meta tables:

>>> res = test.db.get('SELECT * from META_NUM')
>>> pprint(res)
[(1, u'tree', u'd', None, None, None, u'Diameter...'),
 (2, u'sample_esplot', u'BA', None, None, None, u'Basal area...'),
 (3,
  u'simulation',
  u'DIAM_CLASS_WIDTH',
  None,
  None,
  None,
  u'Width of a single class...'),
 (4, u'sample_tree', u'h', None, None, None, u'Height...'),
 (5, u'sample_plot', u'BA', None, None, None, u'Basal area...'),
 (6, u'stratum', u'N', None, None, None, u'Number of stems...')]
>>> res = test.db.get('SELECT * from META_CAT')
>>> pprint(res)
[(1, u'stratum', u'SP', None, None, u'Tree species...'),
 (2, u'comp_unit', u'SC', None, None, u'Site class...')]
>>> sql = 'SELECT * from META_CAT_VAL'
>>> res = test.db.get(sql)
>>> pprint(res)
[(1, 1, 1.0, u'Pine'), (2, 1, 2.0, u'Spruce'), (3, 2, 1.0, u'Very good')]

def _create_tabledef(self, defkey, content_def=None, levels=None):

Creates table definitions for each data level in content definition. Table definition is stored keyed with the level name and stores the level index, parent level name, attribute names and corresponding matrix indices, optional geometry type of the level, and a set of attributes names that are of type TEXT:

>>> metadata = test._get_table_columns('comp_unit')
>>> metadata 
[u'data_id', u'id', u'oid', u'pid', u'SC', u'StandLabel', u'geom']
>>> cu = test.table_def['comp_unit']
>>> assert cu['attr ind'] == [0, 1]
>>> assert cu['attr name'] == [u'SC', u'StandLabel']
>>> assert cu['columns'] == set([u'data_id', u'oid', u'pid',
...                              u'StandLabel', u'SC', u'geom', u'id'])
>>> assert cu['create attr'] == u' "SC" DOUBLE PRECISION,\n "StandLabel" TEXT,\n'
>>> assert cu['float attr ind'] == [0]
>>> assert cu['float attr name'] == [u'SC']
>>> assert cu['float insert sql'] == u'INSERT INTO comp_unit '\
...                             '(data_id, id, oid, pid, "SC") '\
...                             'VALUES (?, ?, ?, ?, ?)'
>>> assert cu['geom type'] == 'MULTIPOLYGON'
>>> assert cu['geom update sql'] == 'UPDATE comp_unit '\
...                          'SET geom=GeomFromText(?, ?) WHERE '\
...                          'data_id=? AND id=?'
>>> assert cu['insert sql'] == u'INSERT INTO comp_unit (data_id, id, '\
...                             'oid, pid, '\
...                             '"SC", "StandLabel") '\
...                             'VALUES (?, ?, ?, ?, ?, ?)'
>>> assert cu['level'] == 2
>>> assert cu['other attr name'] == [u'StandLabel']
>>> assert cu['parent name'] == 'estate'
>>> assert cu['text vars'] == set(['StandLabel'])
>>> assert cu['to remove'] == [1]

def _get_parent_level(self, my_hierarchy):

Will get a parent_level value from a given hierarchy item (from self.lexicon.hierarchy) or None, if the item is root (level 0):

>>> test._get_parent_level(test.hierarchy[2])
1
>>> test._get_parent_level(test.hierarchy[0])

def _create_table(self, lname):

Based on the table definitions, creates new tables in the db. For existing tables adds new columns if the content definition has changed since the table was created. If the schema, i.e. the lexicon on which the database is based, is changed between runs and make_new_db is set to False, an error is triggered:

>>> lname = 'comp_unit'
>>> metadata = test._get_table_columns(lname)
>>> print metadata 
[u'data_id', u'id', u'oid', u'pid', u'SC', u'StandLabel', u'geom']
>>> test.db.execute("INSERT INTO simulation "\
...                 "(id, oid) "\
...                 "VALUES (?, ?)",
...                 ('simulation', 'simulation'))
>>> test.db.execute("INSERT INTO estate "\
...                 "(id, oid, pid) "\
...                 "VALUES (?, ?, ?)",
...                 ('estate', 'estate',
...                  'simulation'))
>>> lind, id, iterr, branch, ddate = 2, 'id', 0, 0, date(2011, 11, 11)
>>> data_hash = str(lind) + id + str(iterr) + str(branch) + str(ddate)
>>> data_id = hashlib.md5(data_hash).hexdigest()
>>> test.db.execute("INSERT INTO data_link VALUES (?, ?, ?, ?, ?, ?)",
...                 ('comp_unit', 'id', 0, 0, date(2011, 11, 11), data_id))
>>> test.db.execute("INSERT INTO comp_unit "\
...                 "VALUES (?, ?, ?, ?, ?, ?, MPolyFromText(?, ?))",
...                 (data_id, 'id', 'oid', 'estate', 5.7, 8.9,
...                 'MULTIPOLYGON(((1 1,5 1,5 5,1 5,1 1),'\
...                               '(2 2,2 3,3 3,3 2,2 2)),'\
...                               '((6 3,9 2,9 4,6 3)))', db.SRID))
>>> test.db.get('SELECT * FROM comp_unit')
... 
[(u'dfbf3e2bfab1b484e80268a83af829ab', u'id', u'oid', u'estate',
  5.7..., u'8.9...', ...)]
>>> test.close()
>>> content_def = mock_db.aug_cd
>>> levels = mock_db.aug_levels
>>> try:
...     test = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy, levels,
...         opres_def, cf_cfiers, mock_db.MockLogger(), 'test_mock.db',
...         make_new_db=False, memory=False, spatial=(3067, None),
...         create_branch_desc=True, track_solution=True)
... except ValueError, e:
...     print e[0] 
... except Exception, e:
...     print str(e)
simo.db.datadb.db, warning, Lexicon has changed! Updating comp_unit.
simo.db.datadb.db, warning, We can't alter column- or geometry types in
                            this fashion. If you need those changed,
                            wipe the existing database.
>>> backup = content_def[('comp_unit', 2)].pop()
>>> try:
...     test = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy, levels,
...         opres_def, cf_cfiers, mock_db.MockLogger(), 'test_mock.db',
...         make_new_db=False, memory=False, spatial=(3067, None),
...         create_branch_desc=True, track_solution=True)
... except ValueError, e:
...     print e[0] 
simo.db.datadb.db, warning, Lexicon has changed! Updating comp_unit.
simo.db.datadb.db, warning, We can't alter column- or geometry types in
                            this fashion. If you need those changed,
                            wipe the existing database.
simo.db.datadb.db, warning, Lexicon change resulted to a drop request for
                            column(s) (AnotherTextVar). Columns can't be
                            dropped in SQLite! This should only cause
                            increased database size, however.
>>> content_def[('comp_unit', 2)].append(backup)
>>> try:
...     test = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy, levels,
...         opres_def, cf_cfiers, mock_db.MockLogger(), 'test_mock.db',
...         make_new_db=False, memory=False, spatial=(3067, None),
...         create_branch_desc=True, track_solution=True)
... except ValueError, e:
...     print e[0] 
>>> metadata = test._get_table_columns(lname)
>>> print metadata 
[u'data_id', u'id', u'oid', u'pid', u'SC', u'StandLabel', u'geom',
 u'AnotherTextVar', u'BA', u'AnotherCatVar']
>>> test.db.get('SELECT * FROM comp_unit')
... 
[(u'dfbf3e2bfab1b484e80268a83af829ab', u'id', u'oid', u'estate',
  5.7..., u'8.9', <read-write buffer ptr ..., size ... at ...>, None,
  None, None)]

def _create_timber_price_table(self, cf_cfiers):

Build a datatable for storing timber prices. The cf_cfiers are the additional classifiers required.

>>> set(test._get_table_columns('timber_price')) == \
... set([u'id', u'iteration', u'branch', u'data_date', u'assortment',
...      u'SP', u'price_table', u'price'])
True
>>> test._create_timber_price_table(cf_cfiers)
>>> test._create_timber_price_table(cf_cfiers|set(['some', 'more']))
... 
simo.db.datadb.db, warning, Timber price classifiers have changed!
>>> set(test._get_table_columns('timber_price')) == \
... set([u'id', u'iteration', u'branch', u'data_date', u'assortment',
...      u'SP', u'price_table', u'price', u'some', u'more'])
True
>>> test._create_timber_price_table(cf_cfiers-set(['some', 'more']))
... 
simo.db.datadb.db, warning, Timber price classifiers have changed!
simo.db.datadb.db, warning, Timber price classifier change resulted to a
                            drop request for column(s) (some, more).
                            Columns can't be dropped in SQLite! This
                            should only cause increased database size,
                            however.
>>> set(test._get_table_columns('timber_price')) == \
... set([u'id', u'iteration', u'branch', u'data_date', u'assortment',
...      u'SP', u'price_table', u'price', u'some', u'more'])
True
>>> test.close()
>>> test = db.DataDB('write', (2, 'comp_unit'), content_def,
...                  hierarchy, levels, opres_def,
...                  cf_cfiers, mock_db.MockLogger(), 'test_mock.db',
...                  make_new_db=True, memory=False, spatial=(3067, None),
...                  create_branch_desc=True, track_solution=True)

def add_data_from_dictionary(self, data, iter, branch):

Adds new data from a directory {levelname: [(ddate, {‘id’:id, ‘oid’: original_id, ‘parent id’: pid, ‘values’: [(‘key’, value), (‘key’, value)]}), ...], ...}:

>>> ddate = date(2009, 2, 2)
>>> for i_b, data in mock_db.data_dict.items():
...     test.add_data_from_dictionary(data, i_b[0], i_b[1])
>>> sql = 'SELECT data_id, id, oid, pid, BA, SC, "StandLabel" '\
...       'FROM comp_unit ORDER BY '\
...       'data_id'
>>> res = test.db.get(sql)
>>> for row in res:
...     print row 
(u'6362e8cae83710615afcf6421cc03d20', u'stand2', u'o-stand2', u'estate1', 91.0, 9.0, u'six#$!"\'"\xe4\xf6')
(u'76e7844f84e2a0f5cc1ad63994e9916f', u'stand1', u'o-stand1', u'estate1', 90.0, 9.0, u"five'")
(u'77268ff29f302f9723bc88214a7c0dd6', u'stand2', u'o-stand2', u'estate1', 11.0, 1.0, u'two')
(u'b5ad2a1b333491a3298b39c48596122e', u'stand2', u'o-stand2', u'estate1', 21.0, 2.0, u'four')
(u'be062d5143706801364cfd2d7296661a', u'stand1', u'o-stand1', u'estate1', 20.0, 2.0, u'three')
(u'f74874ac2247f3cf504fe398e491c2ae', u'stand1', u'o-stand1', u'estate1', 10.0, 1.0, u'one')
>>> sql = 'SELECT data_id, id, d FROM tree ORDER BY data_id'
>>> res = test.db.get(sql)
>>> for row in res:
...     print row 
(u'6362e8cae83710615afcf6421cc03d20', u'tree2-1-1', 93.0)
(u'6362e8cae83710615afcf6421cc03d20', u'tree2-2-1', 94.0)
(u'6362e8cae83710615afcf6421cc03d20', u'tree2-2-2', 95.0)
(u'76e7844f84e2a0f5cc1ad63994e9916f', u'tree1-1-1', 91.0)
(u'76e7844f84e2a0f5cc1ad63994e9916f', u'tree1-2-1', 92.0)
(u'77268ff29f302f9723bc88214a7c0dd6', u'tree2-1-1', 13.0)
(u'77268ff29f302f9723bc88214a7c0dd6', u'tree2-2-1', 14.0)
(u'77268ff29f302f9723bc88214a7c0dd6', u'tree2-2-2', 15.0)
(u'be062d5143706801364cfd2d7296661a', u'tree1-1-1', 21.0)
(u'be062d5143706801364cfd2d7296661a', u'tree1-2-1', 22.0)
(u'f74874ac2247f3cf504fe398e491c2ae', u'tree1-1-1', 11.0)
(u'f74874ac2247f3cf504fe398e491c2ae', u'tree1-2-1', 12.0)

>>> sql = 'SELECT data_id, id, h FROM sample_tree ORDER BY data_id'
>>> res = test.db.get(sql)
>>> for row in res:
...     print row 
(u'76e7844f84e2a0f5cc1ad63994e9916f', u'sample_tree1-1', 91.0)
(u'76e7844f84e2a0f5cc1ad63994e9916f', u'sample_tree1-2', 92.0)
(u'be062d5143706801364cfd2d7296661a', u'sample_tree1-1', 41.0)
(u'be062d5143706801364cfd2d7296661a', u'sample_tree1-2', 42.0)
(u'f74874ac2247f3cf504fe398e491c2ae', u'sample_tree1-1', 31.0)
(u'f74874ac2247f3cf504fe398e491c2ae', u'sample_tree1-2', 32.0)

>>> sql = 'SELECT DISTINCT data_level FROM data_link'
>>> res = test.db.get(sql)
>>> len(res[0])
1
>>> res[0][0]
u'comp_unit'

def fill_matrix(self, handler, lind, full_ids, iter=0, branch=0):

Fills a data matrix with data for the objects identified by the level and a set of object ids. The matrix is filled for all data levels and objects linked to from the given level and ids in top down order; i.e., simulation level first. All lineages are filled as well:

>>> sql = """UPDATE comp_unit SET StandLabel=?"""
>>> test.db.execute(sql, [u'Kuvionumero #"!$€\''])
>>> obj_ids = [('stand1', 'o-stand1'), ('stand2', 'o-stand2')]
>>> test.fill_matrix(mock_db.handler, 2, obj_ids)
...       
>>> pprint(mock_db.handler.added_ids)
[[(u'sim1', u'o-sim1')],
 [(u'estate1', u'o-estate1')],
 [('stand1', 'o-stand1')],
 [(u'stratum1-1', u'o-stratum1-1'), (u'stratum1-2', u'o-stratum1-2')],
 [(u'tree1-1-1', u'o-tree1-1-1')],
 [(u'tree1-2-1', u'o-tree1-2-1')],
 [(u'plot1', u'o-plot1')],
 [(u'sample_tree1-1', u'o-sample_tree1-1'),
  (u'sample_tree1-2', u'o-sample_tree1-2')],
 [('stand2', 'o-stand2')],
 [(u'stratum2-1', u'o-stratum2-1'), (u'stratum2-2', u'o-stratum2-2')],
 [(u'tree2-1-1', u'o-tree2-1-1')],
 [(u'tree2-2-1', u'o-tree2-2-1'), (u'tree2-2-2', u'o-tree2-2-2')]]

def add_branching_info(self, lind, binfo, ind2id, start_iter=0):

Adds a new item to the BRANCHING_INFO table. binfo is a dictionary consisting of key and data tuples as (iteration index, child branch index, object index):(parent branch index, branching operation group, operation, branch name, date)

>>> bdate = date(2009, 7, 8)
>>> test.add_branching_info(2, {(0,2,0):(1,'tgrp','top', 'name2',
...                             'bgroup2', bdate)},
...                         mock_db.ind2id)
>>> test.add_branching_info(2, {(0,3,0):(1,'tgrp','top', 'name3',
...                             'bgroup3', bdate)},
...                         mock_db.ind2id)
>>> test.add_branching_info(2,
...              {(0,1,0):(0,'tgrp','top', 'name', 'bgroup', bdate)},
...                         mock_db.ind2id)
... 
db, error, ...
>>> e = test.logger.log
>>> 'violates unique' in e or 'not unique' in e
True
>>> sql = 'SELECT * FROM BRANCHING_INFO ORDER BY from_branch, to_branch'
>>> iter = test.db.get(sql)
>>> for row in iter: print row 
(u'stand1', 0, 0, 1, datetime.date(2009, 7, 8), u'tgrp', u'top', u'name',
 u'bgroup')
(u'stand1', 0, 1, 2, datetime.date(2009, 7, 8), u'tgrp', u'top', u'name2',
 u'bgroup2')
(u'stand1', 0, 1, 3, datetime.date(2009, 7, 8), u'tgrp', u'top', u'name3',
 u'bgroup3')

def add_opres(self, levelname, data):

This adds new rows to the database. data is a list of dictionaries, each containing date, id, iteration, branch, values, op_name and op_group. values is a list of dictionaries containing var-val pairs

>>> ddate = date(2009, 1, 1)
>>> data = [{'date': ddate,
...          'op_id':1,
...          'id':'stand1',
...          'iteration': 0,
...          'branch': 1,
...          'values': [([{'cash_flow': 3.0},
...                       {'Volume': 2.0, 'SP': 1., 'assortment': 2.},
...                       {'Volume': 6.0, 'SP': 1., 'assortment': 1.}],
...                       ddate)],
...          'op_name': 'thinning',
...          'op_group': 'cutting',
...          'op_type': 'simulated',
...          'notes': 'These here are the notes!',
...          'materials': ['one material', 'two material']}]
>>> test.add_opres('comp_unit', data)
[]
>>> iter = test.db.get('SELECT * FROM op_link')
>>> for row in iter:
...     print row 
(u'stand1', 0, 1, datetime.date(2009, 1, 1),
 u'4d66c66840df6fdf9c32a51224c72701', u'1', u'comp_unit', u'thinning',
 u'cutting', u'simulated')
>>> iter = test.db.get('SELECT * FROM op_res')
>>> for row in iter:
...     print row 
(u'4d66c66840df6fdf9c32a51224c72701', 3.0, None, None, None)
(u'4d66c66840df6fdf9c32a51224c72701', None, 1.0, 2.0, 2.0)
(u'4d66c66840df6fdf9c32a51224c72701', None, 1.0, 1.0, 6.0)
>>> iter = test.db.get('SELECT * FROM op_note')
>>> for row in iter:
...     print row 
(u'4d66c66840df6fdf9c32a51224c72701', u'These here are the notes!')
>>> iter = test.db.get('SELECT * FROM op_material')
>>> for row in iter:
...     print row 
(u'4d66c66840df6fdf9c32a51224c72701', u'one material')
(u'4d66c66840df6fdf9c32a51224c72701', u'two material')

def fill_dictionary(self, level, oid):

Returns a dictionary in the format that add_data_from_dictionary uses, containing all the iterations, branches and dates of the given level and oid:

>>> res = test.fill_dictionary('comp_unit', 'stand2')
>>> for item in res['comp_unit']:
...     ddict = item[1]
...     print item[0], ddict['iteration'], ddict['branch'], ddict['values']
...     
2009-02-02 0 0 [(u'BA', 11.0), (u'SC', 1.0), (u'AnotherCatVar', 2.0)]
2009-02-02 0 1 [(u'BA', 21.0), (u'SC', 2.0), (u'AnotherCatVar', 3.0)]
2009-02-02 1 0 [(u'BA', 91.0), (u'SC', 9.0), (u'AnotherCatVar', 8.0)]

def get_level_var_headers(self, level):

Will return the variable headers of the table of the given level

>>> test.get_level_var_headers(0)
['DIAM_CLASS_WIDTH']
>>> test.get_level_var_headers(1)
['EstateName']
>>> test.get_level_var_headers(3)
['N', 'SP']

def get_data_from_level(self, from_data, headers, level, constraints={}, dates=None, rsort=None, required=None):

Will return an iterator for the data (as requested by headers) from the given level with the given constraints based on the given dictionary ({col: [op, val(, val)]}, op one of (‘gt’, ‘ge’, ‘eq’, ‘ue’, ‘le’, ‘lt’, ‘in’) and multiple vals only for ‘in’) dates is a tuple with the start and end date from between which the data is retrieved. rsort is the column by which the returned data should be sorted and required is a list of columns that must be non-null for the rows to be included in the results.

>>> headers = test.get_level_var_headers(4)
>>> headers = ['id', 'iteration', 'branch'] + headers
>>> constraints = [('id', 'in', ('tree1-1-1', 'tree1-2-1'), 'and'),
...                ('branch', 'in', (0, 1))]
>>> iter = test.get_data_from_level('data', headers, 4, constraints,
...                                 rsort=('id', 'iteration', 'branch'))
>>> for item in iter:
...     print item
(u'tree1-1-1', 0, 0, 11.0)
(u'tree1-1-1', 0, 1, 21.0)
(u'tree1-1-1', 1, 0, 91.0)
(u'tree1-2-1', 0, 0, 12.0)
(u'tree1-2-1', 0, 1, 22.0)
(u'tree1-2-1', 1, 0, 92.0)
>>> headers = ['id', 'branch', 'op_name', 'cash_flow', 'Volume',
...            'assortment', 'op_date']
>>> iter = test.get_data_from_level('op_res', headers, 0)
>>> for item in iter:
...     print item 
(u'stand1', 1, u'thinning', 3.0, 8.0, 3.0, datetime.date(2009, 1, 1))
>>> iter = test.get_data_from_level('op_res', headers, 0,
...                                 required=['Volume'])
>>> for item in iter:
...     print item 
(u'stand1', 1, u'thinning', None, 8.0, 3.0, datetime.date(2009, 1, 1))
>>> headers = [('data', 'id'), ('data', 'iteration'), ('data', 'branch')]
>>> headers += [('data', h) for h in test.get_level_var_headers(4)]
>>> headers += [('op', 'id'), ('op', 'branch'), ('op', 'op_name'),
...             ('op', 'cash_flow'), ('op', 'Volume'), ('op', 'op_date')]
>>> constraints = [('data', 'id', 'in', ('tree1-1-1', 'tree1-2-1'), 'and'),
...                ('op', 'branch', 'in', (0, 1))]
>>> iter = test.get_data_from_level('combined', headers, 4, constraints,
...                 dates=(date(2008, 10, 10), date(2010, 10, 10)),
...                 rsort=(('op', 'id'), ('data', 'iteration'),
...                        ('data', 'branch'), ('op', 'Assortment')),
...                 required=(('op', 'Volume'), ('data', 'id')),
...                 distinct=True,
...                 group_by_headers=['Assortment'])
>>> for item in iter:
...     print item 
(u'tree1-2-1', 0, 1, 22.0, u'stand1', 1, u'thinning', None, 12.0,
 datetime.date(2009, 1, 1), 1.0)
(u'tree1-2-1', 0, 1, 22.0, u'stand1', 1, u'thinning', None, 4.0,
 datetime.date(2009, 1, 1), 2.0)

def get_ids(self, level):

Will return all ids from the level

>>> idlist = test.get_ids('data', 2)
>>> set(idlist)==set([u'stand1', u'stand2'])
True
>>> idset = set(test.get_ids('data', 3))
>>> resset = set([u'stratum1-1', u'stratum1-2', u'stratum2-1',
...               u'stratum2-2'])
>>> resset==idset
True

def get_max_iter(self, from_data):

Will return the max iteration in data:

>>> test.get_max_iter('data')
1

def get_max_branch(self, from_data):

Will return the max branch in data:

>>> test.get_max_branch('data')
3

def get_dates(self, from_data, dates):

Will return the unique dates on a given level, between the given tuple of dates (lower limit, higher limit) ::
>>> date_lim = (date(2000, 1, 1), date(2010, 1, 1))
>>> dset = set(test.get_dates('data', date_lim))
>>> dset==set([date(2009, 1, 1), date(2009, 2, 2)])
True

def get_dates_by_id_and_iter(self, level, dates):

Will return a dictionary with (id, iter) pairs as keys and unique dates for a given id as values

>>> rd = test.get_dates_by_id_and_iter(2, (date(2000, 1, 1),
...                                    date(2010, 1, 1)))
>>> rd[(u'stand1', 0)]
[datetime.date(2009, 1, 1)]
>>> rd[(u'stand1', 1)]
[datetime.date(2009, 1, 1)]
>>> rd[(u'stand2', 1)]
[datetime.date(2009, 2, 2)]
>>> rd[(u'stand2', 0)]
[datetime.date(2009, 2, 2)]
>>> rd = test.get_dates_by_id_and_iter(3, (date(2009, 2, 2),
...                                    date(2010, 1, 1)))
>>> len(rd)
0

def copy_to_db(self, key_list, lname, db, iteration):

For the given keys of (iteration, object id, orig id, branch, weight, copy_only_ops) copies the data to a new db from the given level. Level is taken as base level, below which all the children from different levels are copied as well:

>>> keys = [(0, 'stand1', 'stand1', 1, 1., False),
...         (0, 'stand2', 'stand2', 1, 1., False),
...         (0, 'stand1', 'stand1', 0, 1., False),
...         (1, 'stand1', 'stand1', 0, 1., False),
...         (1, 'stand2', 'stand2', 0, 1., False)]
>>> copydb = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy,
...                 levels, opres_def,
...                 cf_cfiers, mock_db.MockLogger(), 'test_copy.db',
...                 True, False, (3067, None), create_branch_desc=True,
...                 track_solution=True)
>>> test.copy_to_db(keys, 'comp_unit', copydb, 0)
>>> sql = 'SELECT DISTINCT id, iteration, branch FROM data_link '\
...       'ORDER BY data_id'
>>> copydb.db.get(sql) 
[(u'stand1', 0, 0), (u'stand2', 0, 1),
 (u'stand2', 1, 0), (u'stand1', 1, 0), (u'stand1', 0, 1)]
>>> sql = 'SELECT DISTINCT id FROM tree ORDER BY id'
>>> copydb.db.get(sql) 
[(u'tree1-1-1',), (u'tree1-2-1',), (u'tree2-1-1',), (u'tree2-2-1',),
 (u'tree2-2-2',)]
>>> headers = ['id', 'iteration', 'branch', 'op_name', 'op_group',
...            'cash_flow', 'Volume', 'assortment', 'op_date']
>>> opres1 = test.get_data_from_level('op_res', headers, 0)
>>> opres2 = copydb.get_data_from_level('op_res', headers, 0)
>>> opres1 = [i for i in opres1]
>>> opres1.sort()
>>> opres2 = [i for i in opres2]
>>> opres2.sort()
>>> opres1==opres2
True
>>> copydb.close()

>>> keys = [(0, 'stand1', 'stand1', 1, 0.25, False),
...         (0, 'stand2', 'stand2', 1, 1., False),
...         (0, 'stand1', 'stand1', 0, 0.75, False),
...         (1, 'stand1', 'stand1', 0, 1., False),
...         (1, 'stand2', 'stand2', 0, 1., False)]
>>> copydb = db.DataDB('write', (2, 'comp_unit'), content_def, hierarchy,
...                 levels, opres_def,
...                 cf_cfiers, mock_db.MockLogger(), 'test_copy.db',
...                 True, False, (3067, None), create_branch_desc=True,
...                 track_solution=True)
>>> test.copy_to_db(keys, 'comp_unit', copydb, 0)
>>> sql = 'SELECT DISTINCT id, iteration, branch FROM data_link '\
...       'ORDER BY data_id'
>>> copydb.db.get(sql) 
[(u'stand1', 0, 0), (u'stand2', 0, 1),
 (u'stand2', 1, 0), (u'stand1', 1, 0), (u'stand1', 0, 1)]
>>> sql = 'SELECT DISTINCT id FROM tree ORDER BY id'
>>> copydb.db.get(sql) 
[(u'tree1-1-1',), (u'tree1-2-1',), (u'tree2-1-1',), (u'tree2-2-1',),
 (u'tree2-2-2',)]
>>> headers = ['id', 'iteration', 'branch', 'op_name', 'op_group',
...            'cash_flow', 'Volume', 'assortment', 'op_date']
>>> opres1 = test.get_data_from_level('op_res', headers, 0)
>>> opres2 = copydb.get_data_from_level('op_res', headers, 0)
>>> opres1 = [i for i in opres1]
>>> opres1.sort()
>>> opres2 = [i for i in opres2]
>>> opres2.sort()
>>> opres1==opres2
True
>>> copydb.close()

def _create_constraints(self, constraints):

Will return a SQLite string of constraints based on the given dictionary ({col: [val, val, val]})

>>> const = [('id', 'in', ('tree1-1-1', 'tree1-2-1'), 'and'),
...          ('branch', 'in', (0, 1), 'or'), ('iteration', 'eq', 0)]
>>> test._create_constraints('data', const) 
('l.id IN (?,?) AND d.branch IN (?,?) OR d.iteration = ? ',
['tree1-1-1', 'tree1-2-1', 0, 1, 0])
>>> const = [('id', 'fail', 'this', 'and'),
...          ('test', 'eq', ('fail', 'fail')),
...          ('should not come here',)]
>>> test._create_constraints('data', const) 
db, error, Bad operation (fail) not in (le, lt, ge, gt, in, ue, eq)
db, error, missing concatenation operand from [('id', 'fail', 'this', 'and'),
('test', 'eq', ('fail', 'fail')), ('should not come here',)] index 1
('', [])
>>> const = [('arr', 'eq', ('fail', 'fail'), 'and'), ('rar', 'fail', 1)]
>>> test._create_constraints('data', const) 
db, error, Too many values for operation type (eq)
db, error, Bad operation (fail) not in (le, lt, ge, gt, in, ue, eq)
('', [])

def get_child_ids(self, lind, oid, iter=0, branch=0, ddate=None):

Recursively gets child ids for the given object and optionally for the given date:

>>> dd = test.get_child_ids(2, 'stand1')
>>> set(dd[3]) == set([u'stratum1-1', u'stratum1-2'])
True
>>> set(dd[4]) == set([u'tree1-1-1', u'tree1-2-1'])
True
>>> set(dd[5]) == set([u'plot1'])
True
>>> set(dd[6]) == set([u'sample_tree1-1', u'sample_tree1-2'])
True

def _get_branch_desc(self, id, iteration, to_branch):

Get a merged description for the given id, iteration, branch combo:

>>> test._get_branch_desc('stand1', 0, 0)
''
>>> test._get_branch_desc('stand1', 0, 1)
'name'
>>> test._get_branch_desc('stand1', 0, 2)
'name|name2'

def fill_branch_desc_table(self):

Fills the branch description table:

>>> test.db.get('SELECT * FROM branch_desc')
[]
>>> test.fill_branch_desc_table(('stand1', 'stand2'))
>>> test.db.get('SELECT * FROM branch_desc ORDER BY branch')
...     
[(0, u'stand1', 1, u'name', u'bgroup'),
 (0, u'stand1', 2, u'name|name2', u'bgroup2'),
 (0, u'stand1', 3, u'name|name3', u'bgroup3')]

def store_solution(self, keys, run_id):

This function will store a solution to the optimal table in the database, removing any and all old data under the given run_id. keys is an array of iteration, id, branch comboes:

>>> test.db.get('SELECT * FROM optimal')
[]
>>> test.store_solution([(0, 'test', 'test', 0, 1., False),
...                      (1, 'test2', 'test2', 1, 1., False)], 'test')
>>> test.db.get('SELECT * FROM optimal')
[(u'test', 0, u'test', 0, 1.0), (u'test', 1, u'test2', 1, 1.0)]
>>> test.store_solution([(2, 'test3', 'test3', 2, 1., False)], 'test')
>>> test.db.get('SELECT * FROM optimal')
[(u'test', 2, u'test3', 2, 1.0)]
>>> test.store_solution([(3, 'test4', 'test4', 3, 1., False)], 'test2')
>>> test.db.get('SELECT * FROM optimal')
[(u'test', 2, u'test3', 2, 1.0), (u'test2', 3, u'test4', 3, 1.0)]

def drop_id(self, id):

This function will drop all data from all the tables that refer to the given id:

>>> test.db.get('SELECT count(id) FROM comp_unit WHERE id=\'stand1\'')
[(3,)]
>>> test.db.get('SELECT count(id) FROM stratum WHERE pid=\'stand1\'')
[(6,)]
>>> test.db.get('SELECT count(id) FROM BRANCHING_INFO WHERE id=\'stand1\'')
[(3,)]
>>> test.db.get('SELECT count(id) FROM op_link WHERE id=\'stand1\'')
[(1,)]
>>> op_ids = test.db.get('SELECT op_id FROM op_link WHERE id=\'stand1\'')
>>> op_ids = [op_id[0] for op_id in op_ids]
>>> test.db.get('SELECT count(*) FROM op_res WHERE op_id IN (%s)' \
...             % ', '.join('?'*len(op_ids)), op_ids)
[(3,)]
>>> test.drop_id('stand1')
>>> test.db.get('SELECT count(*) FROM comp_unit WHERE id=\'stand1\'')
[(0,)]
>>> test.db.get('SELECT count(*) FROM stratum WHERE pid=\'stand1\'')
[(0,)]
>>> test.db.get('SELECT count(*) FROM BRANCHING_INFO WHERE id=\'stand1\'')
[(0,)]
>>> test.db.get('SELECT count(*) FROM op_link WHERE id=\'stand1\'')
[(0,)]
>>> test.db.get('SELECT count(*) FROM op_res WHERE op_id IN (%s)' \
...             % ', '.join('?'*len(op_ids)), op_ids)
[(0,)]

def drop_ids(self, ids):

This function will drop all data from all the tables that refer to the given ids:

>>> test.db.get('SELECT count(id) FROM optimal WHERE id=\'test3\'')
[(1,)]
>>> test.drop_ids(('stand1', 'test3'))
>>> test.db.get('SELECT count(id) FROM optimal WHERE id=\'test3\'')
[(0,)]

def _get_data_hash(self, lind, uid, iterr, branch, ddate):

Construct a hash string from data object key

Parameters

lind -- level ind as integer
uid -- unique id as string
iterr -- iteration as integer
branch -- branch as integer
ddate -- data date as datetime object
>>> from datetime import date
>>> test._get_data_hash(1, u'STAND1', 11, 0, date(2011,1,1))
u'1STAND1i11b02011-01-01'
>>> test._get_data_hash(1, u'STAND1', 1, 10, date(2011,1,1))
u'1STAND1i1b102011-01-01'

def _get_op_hash(self, uid, iterr, branch, ddate, op_name, counter=0):

Construct a hash string from operation key

Parameters

uid -- unique id as string
iterr -- iteration as integer
branch -- branch as integer
ddate -- data date as datetime object
op_name -- operation name as string
counter -- optional counter argument
>>> test._get_op_hash(u'STAND1', 11, 0, date(2011,1,1), u'thinning', 1)
u'STAND1i11b02011-01-01thinning1'
>>> test._get_op_hash(u'STAND1', 1, 10, date(2011,1,1), u'thinning', 1)
u'STAND1i1b102011-01-01thinning1'

def _encode_hash(self, hash):

Compute the md5 hash of a string

Parameters

hash -- hash string
>>> test._encode_hash(u'1STAND1i11b02011-01-01')
'954457c3b270b4b3b04c3da96559f2eb'
>>> test._encode_hash(u'1STAND1i1b102011-01-01')
'3c6e97a3468d9082a875681b227fd1d6'

def clear_data(self):

Clears data from level tables:

>>> test.clear_data()
>>> keys = test.content_def.keys()
>>> keys.sort()
>>> for cdef_key in keys:
...     level_name = cdef_key[0]
...     sql = 'SELECT * FROM %s' % level_name
...     res = test.db.get(sql)
...     print level_name, len(res)
comp_unit 0
estate 0
sample_esplot 0
sample_estate 0
sample_plot 0
sample_tree 0
simulation 0
stratum 0
tree 0
>>> test.close()

class OperationDB(SQLiteDB):

def __init__(self, db_name=’default.db’, wipe=False, memory=False,
db_host=’localhost’, db_port=5432, user=’test’, pw=’test’, db_class=’SQLITE’, db_database=’SIMO’, db_schema=’‘, extra_columns=[]):

>>> opdb = db.OperationDB('test_mock.db', True,
...                       extra_columns=[("comp_unit", "EXTRA_COLUMN")])
>>> opdb.close()

The init will create/open a connection to the given database file (wiping the old one if necessary) and will then create the tables it needs.

def _create_tables(self):

Creates the tables that OperationDB needs.

>>> opdb.open()
>>> sql = "select sql from sqlite_master where type = 'table'"
>>> iter = opdb.db.get(sql)
>>> for item in iter:
...     print item[0] 
CREATE TABLE forced_operation
                (op_id INTEGER NOT NULL,
                unit_id TEXT NOT NULL,
                op_level TEXT NOT NULL,
                iteration INTEGER NOT NULL,
                branch INTEGER NOT NULL,
                name TEXT NOT NULL,
                timingtype TEXT NOT NULL,
                timestep INTEGER,
                op_date DATE,
                stepunit TEXT,
                "EXTRA_COLUMN" DOUBLE PRECISION,
                CONSTRAINT pk_forced_operation
                    PRIMARY KEY (op_id, unit_id))
CREATE TABLE operation_chain
                (id INTEGER PRIMARY KEY,
                op_id INTEGER NOT NULL,
                pid TEXT NOT NULL,
                chain_ordinal INTEGER NOT NULL,
                chain TEXT NOT NULL,
                CONSTRAINT fk_pid FOREIGN KEY (op_id, pid) REFERENCES
                forced_operation (op_id, unit_id)
                ON DELETE CASCADE)

def add_operation(unit_id, level, iteration, branch, name, timingtype, timestep, date, stepunit, chains, op_id=None, ext_col_vals=None):

This adds new rows to the database.

>>> odate = date(2009, 3, 9)
>>> opdb.add_operation('stand1', 'comp_unit', 0, 0, 'mounding', 'step',
...                     2, None, 'year', ['Forced mounding'])
>>> opdb.add_operation('stand2', 'comp_unit', 0, 0, 'clearcut', 'step', 2,
...              None, 'year', ['Forced clearcut',
...              'Update comp_unit after forced clearcut or strip cut',
...              'Calculate productive value'])
>>> opdb.add_operation('stand3', 'comp_unit', 0,0,'clearcut', 'date', None,
...      odate, 'year', ['Forced clearcut',
...      'Update comp_unit after forced clearcut or strip cut',
...      'Calculate productive value'])
>>> opdb.add_operation('stand4', 'comp_unit', 0,0,'clearcut', 'date', None,
...      odate, 'year', ['Forced clearcut',
...      'Update comp_unit after forced clearcut or strip cut',
...      'Calculate productive value'], ext_col_vals={"EXTRA_COLUMN": 1000.})
>>> iter = opdb.db.get('select * from forced_operation')
>>> expected_results = [
...     [1, u'stand1', u'comp_unit', 0, 0, u'mounding', u'step', 2, None,
...      u'year'],
...     [1, u'stand2', u'comp_unit', 0, 0, u'clearcut', u'step', 2, None,
...      u'year'],
...     [1, u'stand3', u'comp_unit', 0, 0, u'clearcut', u'date', None,
...      odate, u'year'],
...     [1, u'stand4', u'comp_unit', 0, 0, u'clearcut', u'date', None,
...      odate, u'year']]
>>> i = 0
>>> for row in iter:
...     res = expected_results[i]
...     i+=1
...     for val, exp in zip(row, res):
...         if type(val) != type(exp):
...             print 'Type mismatch:', type(val), type(exp)
...         if val != exp:
...             print 'Value mismatch: got', val, 'expected', exp
...
>>> iter = opdb.db.get('select * from operation_chain')
>>> expected_results = [
...     [1, 1, u'stand1', 0, u'Forced mounding'],
...     [2, 1, u'stand2', 0, u'Forced clearcut'],
...     [3, 1, u'stand2', 1, u'Update comp_unit after forced clearcut or strip cut'],
...     [4, 1, u'stand2', 2, u'Calculate productive value'],
...     [5, 1, u'stand3', 0, u'Forced clearcut'],
...     [6, 1, u'stand3', 1, u'Update comp_unit after forced clearcut or strip cut'],
...     [7, 1, u'stand3', 2, u'Calculate productive value'],
...     [8, 1, u'stand4', 0, u'Forced clearcut'],
...     [9, 1, u'stand4', 1, u'Update comp_unit after forced clearcut or strip cut'],
...     [10, 1, u'stand4', 2, u'Calculate productive value']]
>>> i = 0
>>> for row in iter:
...     res = expected_results[i]
...     i+=1
...     for val, exp in zip(row, res):
...         if type(val) != type(exp):
...             print 'Type mismatch:', type(val), type(exp)
...         if val != exp:
...             print 'Value mismatch: got', val, 'expected', exp

def fill_dictionary(self, levelname, ids):

Fills a dictionary with object ids as keys (ids) and ForcedOperation instances as data

>>> obj_ids = [('stand1', 'o-stand1'), ('stand2', 'stand2')]
>>> fcd_op_dict = opdb.fill_dictionary('comp_unit', obj_ids)
>>> for key, vals in fcd_op_dict.items(): 
...     for val in vals:
...         print key, '-', val.id, val.level, val.timing_type, \
...               val.time_step,
...         print val.date, val.step_unit,
...         print val.chain_names,
...         print val.chain_indices
(0, 0, u'stand1') -
    stand1 comp_unit step 2 None year
    [u'Forced mounding']
    None
(0, 0, u'stand2') -
    stand2 comp_unit step 2 None year
    [u'Forced clearcut',
     u'Update comp_unit after forced clearcut or strip cut',
     u'Calculate productive value']
    None

def clear_data(self):

Clears data from data tables:

>>> opdb.clear_data()
>>> sql = 'SELECT * FROM forced_operation'
>>> res = opdb.db.get(sql)
>>> res
[]
>>> sql = 'SELECT * FROM operation_chain'
>>> res = opdb.db.get(sql)
>>> res
[]
>>> opdb.close()
>>> opdb = None

class LoggerDB(SQLiteDB):

def __init__(self, db_path=’default.db’, wipe=False):

Connects to the given database file (wiping the existing one if requested) and runs _create_log_table():

>>> test = db.LoggerDB('test_mock.db', True)

def _create_log_table(self):

Creates the log table into the database:

>>> sql = "select sql from sqlite_master where type='table' and "\
...       "name='log'"
>>> iter = test.db.get(sql)
>>> for item in iter:
...     print item[0]
CREATE TABLE log(
                run_id TEXT,
                log_level TEXT,
                module TEXT,
                time TIMESTAMP,
                sim_id TEXT,
                message TEXT)

def add_log_message(self, level, module, timestamp, sim_id, message):

This will add a new log message into the database:

>>> from datetime import datetime
>>> date = datetime(2009, 2, 2, 15, 17, 56, 626329)
>>> date2 = datetime(2010, 4, 9, 10, 25)
>>> test.add_log_message('test', 'one', 'two', date, 'stand1', 'Ni!')
>>> test.add_log_message('test2', 'three', 'four', date2, 'stand2',
...                      'Shrubbery!')
>>> iter = test.db.get('select * from log')
>>> for item in iter:
...     print item 
(u'test', u'one', u'two',
 datetime.datetime(2009, 2, 2, 15, 17, 56, 626329), u'stand1', u'Ni!')
(u'test2', u'three', u'four',
 datetime.datetime(2010, 4, 9, 10, 25), u'stand2', u'Shrubbery!')

def get_log_messages(self, level=None, module=None, start_time=None, end_time=None):

This method retrieves log data with the given constraints. level and module are tuples containing wanted levels and modules respectively. start_time and end_time are timestamps (datetime instances) to constrain the output.

>>> pprint(test.get_log_messages())
[(datetime.datetime(2009, 2, 2, 15, 17, 56, 626329),
  u'two',
  u'one',
  u'test',
  u'stand1',
  u'Ni!'),
 (datetime.datetime(2010, 4, 9, 10, 25),
  u'four',
  u'three',
  u'test2',
  u'stand2',
  u'Shrubbery!')]

>>> pprint(test.get_log_messages(level=('one','six','ten')))
[(datetime.datetime(2009, 2, 2, 15, 17, 56, 626329),
  u'two',
  u'one',
  u'test',
  u'stand1',
  u'Ni!')]

>>> pprint(test.get_log_messages(module=('four', 'three', 'seven')))
[(datetime.datetime(2010, 4, 9, 10, 25),
  u'four',
  u'three',
  u'test2',
  u'stand2',
  u'Shrubbery!')]

>>> pprint(test.get_log_messages(run_id=('test2', 'tset3', 'nonexistant')))
[(datetime.datetime(2010, 4, 9, 10, 25),
  u'four',
  u'three',
  u'test2',
  u'stand2',
  u'Shrubbery!')]

>>> pprint(test.get_log_messages(start_time=date2))
[(datetime.datetime(2010, 4, 9, 10, 25),
  u'four',
  u'three',
  u'test2',
  u'stand2',
  u'Shrubbery!')]

>>> pprint(test.get_log_messages(end_time=date))
[(datetime.datetime(2009, 2, 2, 15, 17, 56, 626329),
  u'two',
  u'one',
  u'test',
  u'stand1',
  u'Ni!')]

>>> pprint(test.get_log_messages(start_time=date, end_time=date2))
[(datetime.datetime(2009, 2, 2, 15, 17, 56, 626329),
  u'two',
  u'one',
  u'test',
  u'stand1',
  u'Ni!'),
 (datetime.datetime(2010, 4, 9, 10, 25),
  u'four',
  u'three',
  u'test2',
  u'stand2',
  u'Shrubbery!')]

>>> pprint(test.get_log_messages(run_id=('test', 'test2'),
...                              level=('one', 'three'),
...                              module=('two', 'four'),
...                              start_time=date, end_time=date2))
[(datetime.datetime(2009, 2, 2, 15, 17, 56, 626329),
  u'two',
  u'one',
  u'test',
  u'stand1',
  u'Ni!'),
 (datetime.datetime(2010, 4, 9, 10, 25),
  u'four',
  u'three',
  u'test2',
  u'stand2',
  u'Shrubbery!')]

>>> test.get_log_messages(level=('fail'))
[]
>>> test.get_log_messages(module=('fail'))
[]
>>> test.get_log_messages(start_time=datetime(1800, 1, 1),
...                       end_time=datetime(1805, 1, 1))
[]

>>> test.get_log_messages(run_id=('test',), empty_log=True)
>>> pprint(test.get_log_messages())
[(datetime.datetime(2010, 4, 9, 10, 25),
  u'four',
  u'three',
  u'test2',
  u'stand2',
  u'Shrubbery!')]

>>> test.close()
>>> test = None