>>> import numpy
>>> from datetime import date
>>> from minimock import Mock
>>> model = Mock('Model')
>>> model.name = 'dummymodel'
>>> data = Mock('Data')
>>> data.get_tind.mock_returns = numpy.array([[0,0,1,0,0],
... [0,0,2,0,0]], dtype=int)
>>> data.get_date.mock_returns = numpy.array(\
... [date(2005,1,1) for i in range(4)])
>>> timespan = Mock('Timespan')
>>> timespan.time_step = 5
>>> sim = Mock('Simulator')
>>> sim.chain_index = 5
>>> sim.timespan = timespan
>>> sim.level = 0
>>> sim.data = data
>>> arg = Mock('Arg')
Class for handling prediction model memory
Attributes:
Initialize model memory
>>> from simo.simulation.caller.predictionmemory import PredictionModelMemory
>>> m = PredictionModelMemory()
Check that the given key is mapped to a memory index; create the mapping if it is not, and return the memory index:
>>> m._key_check(('mod1',0), 1)
0
>>> m._key_check(('mod1',0), 1)
0
>>> m._key_check(('mod1',5), 2)
1
>>> m._key_check(('mod2',0), 1)
2
>>> m._key_check(('mod2',5), 2)
3
>>> m._key_check(('mod3',10), 3)
4
>>> keys = m._map.keys()
>>> keys.sort()
>>> print keys
[('mod1', 0), ('mod1', 5), ('mod2', 0), ('mod2', 5), ('mod3', 10)]
>>> vals = m._map.values()
>>> vals.sort()
>>> print vals
[0, 1, 2, 3, 4]
>>> m._level_map[1]
set([0, 2])
>>> m._level_map[2]
set([1, 3])
>>> m._level_map[3]
set([4])
Check that the model memory structure is large enough in all dimensions; resize the memory if it is not:
>>> m._shp
(1, 1, 0, 0)
>>> obj_row = (0,2,2)
>>> m._size_check(obj_row, 2)
>>> m._shp
(1, 3, 3, 3)
>>> obj_row = (0,4,2)
>>> m._size_check(obj_row, 2)
>>> m._shp
(1, 5, 3, 3)
>>> obj_row = (0,4,5)
>>> m._size_check(obj_row, 2)
>>> m._shp
(1, 5, 6, 3)
>>> m._size_check(obj_row, 6)
>>> m._shp
(1, 5, 6, 7)
Check if model memory should be used:
>>> m = PredictionModelMemory()
>>> m.use(model, sim, 0, None)
(False, None)
Update prediction model memory. Divide model results for the defined time period and set the ‘last usage’ for the model result values for given objects.
>>> arg.result_vars = [Mock('ResultVariable'),
... Mock('ResultVariable'),
... Mock('ResultVariable'),
... Mock('ResultVariable')]
>>> arg.result_vars[0].cumulation = 'once'
>>> arg.result_vars[1].cumulation = 'total'
>>> arg.result_vars[2].cumulation = 'annual'
>>> arg.result_vars[3].cumulation = 'annual%'
>>> arg.result_vars[0].time_span = 10
>>> arg.result_vars[1].time_span = 10
>>> arg.result_vars[2].time_span = 10
>>> arg.result_vars[3].time_span = 10
>>> arg.result_vars[0].time_unit = 'year'
>>> arg.result_vars[1].time_unit = 'year'
>>> arg.result_vars[2].time_unit = 'year'
>>> arg.result_vars[3].time_unit = 'year'
>>> arg.targetindex = numpy.array([[0,0,1,0,0,],
... [0,0,2,0,0,],
... [0,1,0,0,0,],
... [0,1,1,0,0,]], dtype=int)
>>> arg.mem = numpy.ones((4, 4), dtype=float) * 5.0
>>> arg.num_of_res_vars = 4
>>> arg.num_of_res_objs = numpy.ones(4, dtype=int)
Update model memory for the first time:
>>> arg.target_index = numpy.array([[0,0,0,0,0,],
... [0,0,3,0,0,],
... [0,2,0,0,0,],
... [0,2,3,0,0,]], dtype=int)
>>> m.update(arg, model, sim, 0)
Called Data.get_date(
array([[0, 0, 0, 0, 0],
[0, 0, 3, 0, 0],
[0, 2, 0, 0, 0],
[0, 2, 3, 0, 0]]))
True
>>> arg.mem
array([[ 5. , 5. , 5. , 5. ],
[ 2.5 , 2.5 , 2.5 , 2.5 ],
[ 25. , 25. , 25. , 25. ],
[ 27.62815625, 27.62815625, 27.62815625, 27.62815625]])
>>> m._map
{('dummymodel', 5): 0}
>>> m._shp
(1, 3, 4, 1)
>>> m._mem[0,0,:,0]
array([2014-01-01, 0001-01-01, 0001-01-01, 2014-01-01], dtype=object)
Update already existing model memory:
>>> m.update(arg, model, sim, 0)
Called Data.get_date(
array([[0, 0, 0, 0, 0],
[0, 0, 3, 0, 0],
[0, 2, 0, 0, 0],
[0, 2, 3, 0, 0]]))
True
>>> m._map
{('dummymodel', 5): 0}
>>> m._mem[0,0,:,0]
array([2014-01-01, 0001-01-01, 0001-01-01, 2014-01-01], dtype=object)
Update model memory with new chain index:
>>> sim.chain_index = 0
>>> m.update(arg, model, sim, 0)
Called Data.get_date(
array([[0, 0, 0, 0, 0],
[0, 0, 3, 0, 0],
[0, 2, 0, 0, 0],
[0, 2, 3, 0, 0]]))
True
>>> m._map
{('dummymodel', 5): 0, ('dummymodel', 0): 1}
>>> m._mem[0,0,:,1]
array([2014-01-01, 0001-01-01, 0001-01-01, 2014-01-01], dtype=object)
Test use of existing model memory:
>>> sim.chain_index = 5
>>> tind = numpy.array([[0,0,0,0,0],
... [0,0,1,0,0],
... [0,0,2,0,0],
... [0,0,3,0,0]], dtype=int)
>>> blocks = []
>>> use, tind = m.use(model, sim, 0, tind)
Called Data.get_date(
array([[0, 0, 0, 0, 0],
[0, 0, 1, 0, 0],
[0, 0, 2, 0, 0],
[0, 0, 3, 0, 0]]))
Called Data.block(0, 0, array([[0, 0, 0, 0, 0],
[0, 0, 3, 0, 0]]))
Called Data.get_tind(0, 0)
>>> use
False
>>> tind
array([0, 0, 1, 0, 0])
Release objects that were blocked from evaluation because the model had active results stored in the memory:
>>> m.release_blocked_objects(sim, 2)
Called Data.release(0, 2, array([[0, 0, 0, 0, 0],
[0, 0, 3, 0, 0]]))
Delete objects from the prediction model memory (reset objects on the given level):
>>> m.del_objects(0, 0, [0,1,2], 0)
>>> m._mem[0,0,:,:]
array([[0001-01-01, 0001-01-01],
[0001-01-01, 0001-01-01],
[0001-01-01, 0001-01-01],
[2014-01-01, 2014-01-01]], dtype=object)
Update prediction memory after adding a new branch to simulation data matrix
>>> m._level_map = {2: set([0, 1])}
>>> par_obj = (0,0,3)
>>> new_obj = (0,1,3)
>>> m.add_branch(par_obj, new_obj, 2)
>>> m._mem[0,(0,1),3,:]
array([[2014-01-01, 2014-01-01],
[2014-01-01, 2014-01-01]], dtype=object)
Reset prediction memory
>>> m.reset()
>>> m._mem[0,0,:,:]
array([[0001-01-01, 0001-01-01],
[0001-01-01, 0001-01-01],
[0001-01-01, 0001-01-01],
[0001-01-01, 0001-01-01]], dtype=object)
Calculate total percentage value:
>>> m._totalpercentage(5.0, 5.0, sim)
0.9805797673485328
Calculate total removal probability over a period of time:
>>> m._totalprob(5.0, 5.0)
1.3797296614612149
Divide the result values between the time steps in the time span. Results can be cumulated over multiple years in alternative ways, which is controlled with the cumulation type attribute. The cumulation types are:
- once: the result is calculated once, do nothing
- total: the value is divided by (result timespan / timestep)
- total%: see _totalpercentage method
- annual: the result is multiplied by the simulation timestep
- annual%: the same as above, but for percentages
- totalprob: ...
Divide the results with alternative cumulation types
>>> arg = Mock('Arg')
>>> arg.num_of_res_vars = 5
>>> arg.result_vars = [Mock('RV'),Mock('RV'),Mock('RV'),Mock('RV'),
... Mock('RV')]
>>> arg.result_vars[0].cumulation = 'total'
>>> arg.result_vars[1].cumulation = 'total%'
>>> arg.result_vars[2].cumulation = 'annual'
>>> arg.result_vars[3].cumulation = 'annual%'
>>> arg.result_vars[4].cumulation = 'totalprob'
>>> arg.num_of_res_objs = numpy.array([1, 1, 1], dtype=int)
>>> arg.mem = numpy.ones((5, 3), dtype=float)
>>> m._divide_results(arg, model, sim, 5.0)
True
>>> arg.mem
array([[ 0.2 , 0.2 , 0.2 ],
[ 0.19920477, 0.19920477, 0.19920477],
[ 5. , 5. , 5. ],
[ 5.10100501, 5.10100501, 5.10100501],
[ 1. , 1. , 1. ]])