Table Of Contents

Previous topic

predictioncaller.py

Next topic

aggregationarg.py

This Page

predictionmemory.py

>>> import numpy
>>> from datetime import date
>>> from minimock import Mock

>>> model = Mock('Model')
>>> model.name = 'dummymodel'
>>> data = Mock('Data')
>>> data.get_tind.mock_returns = numpy.array([[0,0,1,0,0],
...                                           [0,0,2,0,0]], dtype=int)
>>> data.get_date.mock_returns = numpy.array(\
...     [date(2005,1,1) for i in range(4)])
>>> timespan = Mock('Timespan')
>>> timespan.time_step = 5
>>> sim = Mock('Simulator')
>>> sim.chain_index = 5
>>> sim.timespan = timespan
>>> sim.level = 0
>>> sim.data = data
>>> arg = Mock('Arg')

class PredictionModelMemory(object):

Class for handling prediction model memory

Attributes:

  • _mem: four dimensional numpy array with the following dimensions (iterations, branches, objects, (modelname,chain) indices)
  • _map: dictionary mapping (modelname,chain) keys to memory array indices
  • _shp: memory array shape
  • _blocked: currently blocked objects

def __init__(self):

Initialize model memory

>>> from simo.simulation.caller.predictionmemory import PredictionModelMemory
>>> m = PredictionModelMemory()

def _key_check(self, key, level):

Check that the given key is mapped to a memory index; create the mapping if it is not, and return the memory index:

>>> m._key_check(('mod1',0), 1)
0
>>> m._key_check(('mod1',0), 1)
0
>>> m._key_check(('mod1',5), 2)
1
>>> m._key_check(('mod2',0), 1)
2
>>> m._key_check(('mod2',5), 2)
3
>>> m._key_check(('mod3',10), 3)
4
>>> keys = m._map.keys()
>>> keys.sort()
>>> print keys
[('mod1', 0), ('mod1', 5), ('mod2', 0), ('mod2', 5), ('mod3', 10)]
>>> vals = m._map.values()
>>> vals.sort()
>>> print vals
[0, 1, 2, 3, 4]
>>> m._level_map[1]
set([0, 2])
>>> m._level_map[2]
set([1, 3])
>>> m._level_map[3]
set([4])

def _size_check(self, tind, ind):

Check that model memory structure is large enough in all dimensions, resize memory if not:

>>> m._shp
(1, 1, 0, 0)
>>> obj_row = (0,2,2)
>>> m._size_check(obj_row, 2)
>>> m._shp
(1, 3, 3, 3)
>>> obj_row = (0,4,2)
>>> m._size_check(obj_row, 2)
>>> m._shp
(1, 5, 3, 3)
>>> obj_row = (0,4,5)
>>> m._size_check(obj_row, 2)
>>> m._shp
(1, 5, 6, 3)
>>> m._size_check(obj_row, 6)
>>> m._shp
(1, 5, 6, 7)

def use(self, model, sim, depthind, tind):

Check if model memory should be used:

>>> m = PredictionModelMemory()
>>> m.use(model, sim, 0, None)
(False, None)

def update(self, arg, model, sim):

Update prediction model memory. Divide model results for the defined time period and set the ‘last usage’ for the model result values for given objects.

>>> arg.result_vars = [Mock('ResultVariable'),
...                   Mock('ResultVariable'),
...                   Mock('ResultVariable'),
...                   Mock('ResultVariable')]
>>> arg.result_vars[0].cumulation = 'once'
>>> arg.result_vars[1].cumulation = 'total'
>>> arg.result_vars[2].cumulation = 'annual'
>>> arg.result_vars[3].cumulation = 'annual%'
>>> arg.result_vars[0].time_span = 10
>>> arg.result_vars[1].time_span = 10
>>> arg.result_vars[2].time_span = 10
>>> arg.result_vars[3].time_span = 10
>>> arg.result_vars[0].time_unit = 'year'
>>> arg.result_vars[1].time_unit = 'year'
>>> arg.result_vars[2].time_unit = 'year'
>>> arg.result_vars[3].time_unit = 'year'
>>> arg.targetindex = numpy.array([[0,0,1,0,0,],
...                                [0,0,2,0,0,],
...                                [0,1,0,0,0,],
...                                [0,1,1,0,0,]], dtype=int)
>>> arg.mem = numpy.ones((4, 4), dtype=float) * 5.0
>>> arg.num_of_res_vars = 4
>>> arg.num_of_res_objs = numpy.ones(4, dtype=int)

Update model memory for the first time:

>>> arg.target_index = numpy.array([[0,0,0,0,0,],
...                                 [0,0,3,0,0,],
...                                 [0,2,0,0,0,],
...                                 [0,2,3,0,0,]], dtype=int)
>>> m.update(arg, model, sim, 0)
Called Data.get_date(
    array([[0, 0, 0, 0, 0],
       [0, 0, 3, 0, 0],
       [0, 2, 0, 0, 0],
       [0, 2, 3, 0, 0]]))
True
>>> arg.mem
array([[  5.        ,   5.        ,   5.        ,   5.        ],
       [  2.5       ,   2.5       ,   2.5       ,   2.5       ],
       [ 25.        ,  25.        ,  25.        ,  25.        ],
       [ 27.62815625,  27.62815625,  27.62815625,  27.62815625]])
>>> m._map
{('dummymodel', 5): 0}
>>> m._shp
(1, 3, 4, 1)
>>> m._mem[0,0,:,0]
array([2014-01-01, 0001-01-01, 0001-01-01, 2014-01-01], dtype=object)

Update already existing model memory:

>>> m.update(arg, model, sim, 0)
Called Data.get_date(
    array([[0, 0, 0, 0, 0],
       [0, 0, 3, 0, 0],
       [0, 2, 0, 0, 0],
       [0, 2, 3, 0, 0]]))
True
>>> m._map
{('dummymodel', 5): 0}
>>> m._mem[0,0,:,0]
array([2014-01-01, 0001-01-01, 0001-01-01, 2014-01-01], dtype=object)

Update model memory with new chain index:

>>> sim.chain_index = 0
>>> m.update(arg, model, sim, 0)
Called Data.get_date(
    array([[0, 0, 0, 0, 0],
       [0, 0, 3, 0, 0],
       [0, 2, 0, 0, 0],
       [0, 2, 3, 0, 0]]))
True
>>> m._map
{('dummymodel', 5): 0, ('dummymodel', 0): 1}
>>> m._mem[0,0,:,1]
array([2014-01-01, 0001-01-01, 0001-01-01, 2014-01-01], dtype=object)

Test use of existing model memory:

>>> sim.chain_index = 5
>>> tind = numpy.array([[0,0,0,0,0],
...                     [0,0,1,0,0],
...                     [0,0,2,0,0],
...                     [0,0,3,0,0]], dtype=int)
>>> blocks = []
>>> use, tind = m.use(model, sim, 0, tind)
Called Data.get_date(
    array([[0, 0, 0, 0, 0],
       [0, 0, 1, 0, 0],
       [0, 0, 2, 0, 0],
       [0, 0, 3, 0, 0]]))
Called Data.block(0, 0, array([[0, 0, 0, 0, 0],
       [0, 0, 3, 0, 0]]))
Called Data.get_tind(0, 0)
>>> use
False
>>> tind
array([0, 0, 1, 0, 0])

def release_blocked_objects(self, sim, depthind):

Release objects that were blocked from evaluation because the model had active results stored in the memory:

>>> m.release_blocked_objects(sim, 2)
Called Data.release(0, 2, array([[0, 0, 0, 0, 0],
       [0, 0, 3, 0, 0]]))

def del_objects(self, it, br, objs, level):

Delete objects from prediction memory (reset objects on the given level)

>>> m.del_objects(0, 0, [0,1,2], 0)
>>> m._mem[0,0,:,:]
array([[0001-01-01, 0001-01-01],
       [0001-01-01, 0001-01-01],
       [0001-01-01, 0001-01-01],
       [2014-01-01, 2014-01-01]], dtype=object)

def add_branch(self, par_obj, new_obj, level):

Update prediction memory after adding a new branch to simulation data matrix

>>> m._level_map = {2: set([0, 1])}
>>> par_obj = (0,0,3)
>>> new_obj = (0,1,3)
>>> m.add_branch(par_obj, new_obj, 2)
>>> m._mem[0,(0,1),3,:]
array([[2014-01-01, 2014-01-01],
       [2014-01-01, 2014-01-01]], dtype=object)

def reset(self):

Reset prediction memory

>>> m.reset()
>>> m._mem[0,0,:,:]
array([[0001-01-01, 0001-01-01],
       [0001-01-01, 0001-01-01],
       [0001-01-01, 0001-01-01],
       [0001-01-01, 0001-01-01]], dtype=object)

def _total(self, value, preddiv):

Divide the value by the prediction divider:

>>> m._total(5.0, 5.0)
1.0

def _totalpercentage(self, value, preddiv, sim):

Calculate total percentage value:

>>> m._totalpercentage(5.0, 5.0, sim)
0.9805797673485328

def _totalprob(self, value, preddiv):

Calculate total removal probability over a period of time:

>>> m._totalprob(5.0, 5.0)
1.3797296614612149

def _divide_results(self, arg, model, sim, preddiv):

Divide the result values between timesteps in the timespan. Results can be cumulated over multiple years in alternative ways, which is controlled with the cumulation type attribute. The cumulation types are:

  • once: the result is calculated once, do nothing
  • total: the value is divided by result timespan / timestep
  • total%: see _totalpercentage method
  • annual: the result is multiplied by the simulation timestep
  • annual%: the same as above, but for percentages
  • totalprob: ...

Divide the results with alternative cumulation types

>>> arg = Mock('Arg')
>>> arg.num_of_res_vars = 5
>>> arg.result_vars = [Mock('RV'),Mock('RV'),Mock('RV'),Mock('RV'),
...                   Mock('RV')]
>>> arg.result_vars[0].cumulation = 'total'
>>> arg.result_vars[1].cumulation = 'total%'
>>> arg.result_vars[2].cumulation = 'annual'
>>> arg.result_vars[3].cumulation = 'annual%'
>>> arg.result_vars[4].cumulation = 'totalprob'
>>> arg.num_of_res_objs = numpy.array([1, 1, 1], dtype=int)
>>> arg.mem = numpy.ones((5, 3), dtype=float)
>>> m._divide_results(arg, model, sim, 5.0)
True
>>> arg.mem
array([[ 0.2       ,  0.2       ,  0.2       ],
       [ 0.19920477,  0.19920477,  0.19920477],
       [ 5.        ,  5.        ,  5.        ],
       [ 5.10100501,  5.10100501,  5.10100501],
       [ 1.        ,  1.        ,  1.        ]])