.. _optimizer-py: ############ optimizer.py ############ Super class for optimization algorithms ************************ class Optimizer(object): ************************ Parent class for all heuristic optimization algorithms. Parameters: - ... def __init__(self, logger, logname, optlogger, taskdef, simdbin, opdbin, simdbout, opdbout, \*\*keywords): ========================================================================================================== Initialize optimizer:: >>> epsilon = 0.00001 >>> from simo.optimization.optimizer import Optimizer >>> execfile('optimization/test/mocks4optimizer.py') >>> chart_path = 'optimization/test' >>> opt = Optimizer(logger, logname, taskdef, simdbin, simdbout, ... False, False, False, chart_path, True, 'test', True, ... keyword1='test-kw') def _add_error(self, msg): ========================== Log error message:: >>> opt._add_error('Test error message') # doctest: +NORMALIZE_WHITESPACE Called Logger.log_message('optimization-test', 'ERROR', 'Test error message') def _add_info(self, msg): ========================= Log info message :: >>> opt._add_info('Test info message') # doctest: +NORMALIZE_WHITESPACE Called Logger.log_message('optimization-test', 'INFO', 'Test info message') def set_data(self, iteration): ============================== Parameters :: iteration -- current simulation iteration indice as int Set optimization data: collect values from data and operation result databases into data structure:: >>> opt._data = omatrix1 # replace data handler with a mock >>> opt.set_data(0) # doctest: +ELLIPSIS Called OMatrix.construct_data(0) Called Logger.log_message( 'optimization-test', 'ERROR', 'Could not construct optimization data matrix') False >>> opt._data = omatrix2 # replace data handler with another mock >>> opt.set_data(0) # doctest: +ELLIPSIS Called OMatrix.construct_data(0) True >>> opt._global_best_U >>> opt._global_best_sol array([0, 0, 0, 0, 0]) def _lambda_opt_move(self, solution, mylambda): =============================================== Do a random lambda opt move, ie. change the treatment of lambda units randomly in the current solution Parameters:: solution -- current solution as numpy array mylambda -- the number of units to be changed as int :: >>> import numpy >>> S = numpy.zeros(5, dtype=int) >>> S_new = numpy.copy(S) >>> opt._lambda_opt_move(S_new, 1) True >>> sum(S != S_new) 1 >>> S_new = numpy.copy(S) >>> opt._lambda_opt_move(S_new, 2) True >>> sum(S != S_new) 2 >>> S_new = numpy.copy(S) >>> opt._lambda_opt_move(S_new, 3) True >>> sum(S != S_new) 3 >>> S_new = numpy.copy(S) >>> opt._lambda_opt_move(S_new, 4) True >>> sum(S != S_new) 4 Try doing N-opt move when number of units equals N and some of the units have only a single branch (impossible lambda-opt move):: >>> S_new = numpy.copy(S) >>> opt._lambda_opt_move(S_new, 5) Called Logger.log_message( 'optimization-test', 'ERROR', 'Could not make 5-opt move!') False def _generate_random_solution(self, num_of_solutions=100): ========================================================== Choose a feasible random solution by randomly choosing a branch for each unit in the data. Parameters:: num_of_solutions -- number of initial solutions as int :: >>> opt._generate_random_solution(2) # doctest: +ELLIPSIS Called Logger.log_message( 'optimization-test', 'INFO', ' finding random initial solution') Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called Logger.log_message( 'optimization-test', 'INFO', ' feasible initial solution found in ... secs') True >>> opt._curr_sol array([0, 0, 0, 0, 0]) >>> opt._best_sol array([0, 0, 0, 0, 0]) def _store_solution(self, limit=None): ====================================== Store the optimal solution of a single iteration Parameters:: limit -- maximum unit number as int :: >>> opt._best_U = 0.8 >>> opt._best_sol[:] = [2,9,5,3,7] >>> opt._store_solution() #doctest: +ELLIPSIS Called OMatrix.compare_utilities(None, 0.8...) Called OMatrix.get_status(array([2, 9, 5, 3, 7])) Called Logger.log_message( 'optimization-test', 'INFO', ' utility: 0.800\tbest so far: 0.800') Called Logger.log_message( 'optimization-test', 'INFO', ' 1. subobjective: 12345.679') Called Logger.log_message( 'optimization-test', 'INFO', ' 2. subobjective: 234.568') >>> opt._best_sol[:] = [3,4,6,0,1] >>> opt._store_solution() #doctest: +ELLIPSIS Called OMatrix.compare_utilities(0.8..., 0.8...) Called OMatrix.get_status(array([3, 4, 6, 0, 1])) Called Logger.log_message( 'optimization-test', 'INFO', ' utility: 0.800\tbest so far: 0.800') Called Logger.log_message( 'optimization-test', 'INFO', ' 1. subobjective: 12345.679') Called Logger.log_message( 'optimization-test', 'INFO', ' 2. subobjective: 234.568') >>> opt._global_best_sol array([3, 4, 6, 0, 1]) def _store_results(self, repeats, iteration, log_util=True, skip_weight_splits=None): ===================================================================================== Parameters:: iteration -- current simulated iteration indice as int Store the optimum treatment schedules to result databases :: >>> opt._store_results(3, 0) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE Called OMatrix.get_status(array([3, 4, 6, 0, 1])) Called Logger.log_message( 'optimization-test', 'INFO', 'Best utility out of 3 repeats: 0.800') Called Logger.log_message( 'optimization-test', 'INFO', ' 1. subobjective: 12345.679') Called Logger.log_message( 'optimization-test', 'INFO', ' 2. subobjective: 234.568') Called Logger.log_message( 'optimization-test', 'INFO', 'Writing optimized treatment schedule to result database(s)...') Called SimInputDB.store_solution( [...], 'test', False) Called SimInputDB.copy_to_db( [...], 'comp_unit', , 0, skip_weight_splits=None) Called Logger.log_message( 'optimization-test', 'INFO', 'Writing completed in ... min') def _run_algorithm(self, runf, repeats, initial_solutions): =========================================================== Run an optimization algorithm Parameters: runf -- algorithm specific runner method repeats -- number of repeats as int initial_solutions -- number of initial random solutions as int :: >>> from minimock import Mock >>> runf = Mock('run_my_algorithm') >>> opt._stat_logger = Mock('StatLogger') >>> opt._run_algorithm(runf, 3, 2) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE Called OMatrix.analyze_data( , >) Called OMatrix.construct_data(0) Called StatLogger.start( 0, {0: [0, 1], 1: [0, 2, 3, 4, 5], 2: [0, 1, 2], 3: [0, 1, 2, 3], 4: [0]}, 'max') Called Logger.log_message( 'optimization-test', 'INFO', ' finding random initial solution') Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called Logger.log_message( 'optimization-test', 'INFO', ' feasible initial solution found in ... secs') Called run_my_algorithm() Called StatLogger.add_repeat() Called OMatrix.compare_utilities(None, 1.0) Called OMatrix.get_status(array([0, 0, 0, 0, 0])) Called Logger.log_message( 'optimization-test', 'INFO', ' utility: 1.000\tbest so far: 1.000') Called Logger.log_message( 'optimization-test', 'INFO', ' 1. subobjective: 12345.679') Called Logger.log_message( 'optimization-test', 'INFO', ' 2. subobjective: 234.568') Called Logger.log_message( 'optimization-test', 'INFO', ' finding random initial solution') Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called Logger.log_message( 'optimization-test', 'INFO', ' feasible initial solution found in ... secs') Called run_my_algorithm() Called StatLogger.add_repeat() Called OMatrix.compare_utilities(1.0, 1.0) Called OMatrix.get_status(array([0, 0, 0, 0, 0])) Called Logger.log_message( 'optimization-test', 'INFO', ' utility: 1.000\tbest so far: 1.000') Called Logger.log_message( 'optimization-test', 'INFO', ' 1. subobjective: 12345.679') Called Logger.log_message( 'optimization-test', 'INFO', ' 2. subobjective: 234.568') Called Logger.log_message( 'optimization-test', 'INFO', ' finding random initial solution') Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called Logger.log_message( 'optimization-test', 'INFO', ' feasible initial solution found in ... secs') Called run_my_algorithm() Called StatLogger.add_repeat() Called OMatrix.compare_utilities(1.0, 1.0) Called OMatrix.get_status(array([0, 0, 0, 0, 0])) Called Logger.log_message( 'optimization-test', 'INFO', ' utility: 1.000\tbest so far: 1.000') Called Logger.log_message( 'optimization-test', 'INFO', ' 1. subobjective: 12345.679') Called Logger.log_message( 'optimization-test', 'INFO', ' 2. subobjective: 234.568') Called StatLogger.stop(0) Called OMatrix.get_status(array([0, 0, 0, 0, 0])) Called Logger.log_message( 'optimization-test', 'INFO', 'Best utility out of 3 repeats: 1.000') Called Logger.log_message( 'optimization-test', 'INFO', ' 1. subobjective: 12345.679') Called Logger.log_message( 'optimization-test', 'INFO', ' 2. subobjective: 234.568') Called Logger.log_message( 'optimization-test', 'INFO', 'Writing optimized treatment schedule to result database(s)...') Called SimInputDB.store_solution( [(0, 'UNIT1', 'UNIT1', 0, 1.0, False), (0, 'UNIT2', 'UNIT2', 0, 1.0, False), (0, 'UNIT3', 'UNIT3', 0, 1.0, False), (0, 'UNIT4', 'UNIT4', 0, 1.0, False), (0, 'UNIT5', 'UNIT5', 0, 1.0, False)], 'test', False) Called SimInputDB.copy_to_db( [(0, 'UNIT1', 'UNIT1', 0, 1.0, False), (0, 'UNIT2', 'UNIT2', 0, 1.0, False), (0, 'UNIT3', 'UNIT3', 0, 1.0, False), (0, 'UNIT4', 'UNIT4', 0, 1.0, False), (0, 'UNIT5', 'UNIT5', 0, 1.0, False)], 'comp_unit', , 0, skip_weight_splits=None) Called Logger.log_message( 'optimization-test', 'INFO', 'Writing completed in ... min') Called OMatrix.construct_data(1) Called StatLogger.start( 1, {0: [0, 1], 1: [0, 2, 3, 4, 5], 2: [0, 1, 2], 3: [0, 1, 2, 3], 4: [0]}, 'max') Called Logger.log_message( 'optimization-test', 'INFO', ' finding random initial solution') Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called Logger.log_message( 'optimization-test', 'INFO', ' feasible initial solution found in ... secs') Called run_my_algorithm() Called StatLogger.add_repeat() Called OMatrix.compare_utilities(None, 1.0) Called OMatrix.get_status(array([0, 0, 0, 0, 0])) Called Logger.log_message( 'optimization-test', 'INFO', ' utility: 1.000\tbest so far: 1.000') Called Logger.log_message( 'optimization-test', 'INFO', ' 1. subobjective: 12345.679') Called Logger.log_message( 'optimization-test', 'INFO', ' 2. subobjective: 234.568') Called Logger.log_message( 'optimization-test', 'INFO', ' finding random initial solution') Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called Logger.log_message( 'optimization-test', 'INFO', ' feasible initial solution found in ... secs') Called run_my_algorithm() Called StatLogger.add_repeat() Called OMatrix.compare_utilities(1.0, 1.0) Called OMatrix.get_status(array([0, 0, 0, 0, 0])) Called Logger.log_message( 'optimization-test', 'INFO', ' utility: 1.000\tbest so far: 1.000') Called Logger.log_message( 'optimization-test', 'INFO', ' 1. subobjective: 12345.679') Called Logger.log_message( 'optimization-test', 'INFO', ' 2. subobjective: 234.568') Called Logger.log_message( 'optimization-test', 'INFO', ' finding random initial solution') Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called OMatrix.get_random_solution() Called OMatrix.solution_feasibility(array([0, 0, 0, 0, 0]), first_seek=True) Called OMatrix.solution_utility(array([0, 0, 0, 0, 0])) Called OMatrix.compare_utilities(1.0, 1.0) Called Logger.log_message( 'optimization-test', 'INFO', ' feasible initial solution found in ... secs') Called run_my_algorithm() Called StatLogger.add_repeat() Called OMatrix.compare_utilities(1.0, 1.0) Called OMatrix.get_status(array([0, 0, 0, 0, 0])) Called Logger.log_message( 'optimization-test', 'INFO', ' utility: 1.000\tbest so far: 1.000') Called Logger.log_message( 'optimization-test', 'INFO', ' 1. subobjective: 12345.679') Called Logger.log_message( 'optimization-test', 'INFO', ' 2. subobjective: 234.568') Called StatLogger.stop(1) Called OMatrix.get_status(array([0, 0, 0, 0, 0])) Called Logger.log_message( 'optimization-test', 'INFO', 'Best utility out of 3 repeats: 1.000') Called Logger.log_message( 'optimization-test', 'INFO', ' 1. subobjective: 12345.679') Called Logger.log_message( 'optimization-test', 'INFO', ' 2. subobjective: 234.568') Called Logger.log_message( 'optimization-test', 'INFO', 'Writing optimized treatment schedule to result database(s)...') Called SimInputDB.store_solution( [(1, 'UNIT1', 'UNIT1', 0, 1.0, False), (1, 'UNIT2', 'UNIT2', 0, 1.0, False), (1, 'UNIT3', 'UNIT3', 0, 1.0, False), (1, 'UNIT4', 'UNIT4', 0, 1.0, False), (1, 'UNIT5', 'UNIT5', 0, 1.0, False)], 'test', False) Called SimInputDB.copy_to_db( [(1, 'UNIT1', 'UNIT1', 0, 1.0, False), (1, 'UNIT2', 'UNIT2', 0, 1.0, False), (1, 'UNIT3', 'UNIT3', 0, 1.0, False), (1, 'UNIT4', 'UNIT4', 0, 1.0, False), (1, 'UNIT5', 'UNIT5', 0, 1.0, False)], 'comp_unit', , 1, skip_weight_splits=None) Called Logger.log_message( 'optimization-test', 'INFO', 'Writing completed in ... min')