# encoding=utf8
"""
PenaltyMethodOptimizer
"""
import logging
from kaiwu.common import JsonSerializableMixin, CheckpointManager as ckpt
from kaiwu.core import get_val, QuboSolver, PenaltyMethodConstraint
logger = logging.getLogger(__name__)
class SolutionResult(JsonSerializableMixin):
    """A single solution together with its objective value.

    Args:
        solution_dict (dict): Solution mapping, ``{variable name: variable value}``.
        objective (float): Objective function value of the solution.
    """

    def __init__(self, solution_dict: dict, objective: float):
        self.solution_dict = solution_dict
        self.objective = objective

    def to_json_dict(self, exclude_fields=()):
        """Convert the result into a JSON-style dictionary.

        Args:
            exclude_fields (Iterable[str]): Names of fields to leave out of the
                output. Defaults to an empty tuple, in which case both
                ``"solution_dict"`` and ``"objective"`` are emitted.

        Returns:
            dict: The fields of this result that were not excluded.
        """
        # Tolerate an explicit None so callers can pass "nothing to exclude" either way.
        excluded = exclude_fields or ()
        fields = {"solution_dict": self.solution_dict, "objective": self.objective}
        return {name: value for name, value in fields.items() if name not in excluded}
def _update_model(sol_dict, qubo_model):
    """Count the constraints violated by a solution and adjust penalty coefficients.

    Every violated hard constraint is penalized more and its ``current_value``
    is accumulated. A violated soft constraint is penalized more only when its
    evaluated expression exceeds the constraint's ``pre_constr_val``. When a
    constraint family has no violations at all, every constraint of that
    family is penalized less. Whenever a coefficient is changed,
    ``qubo_model.made`` is reset to ``False``.

    NOTE(review): the block nesting was reconstructed from a listing that had
    lost its indentation. The soft-constraint relaxation is treated here as
    independent of the hard-constraint check -- confirm against the upstream
    source.

    Args:
        sol_dict (dict): Solution mapping, ``{variable name: variable value}``.
        qubo_model: Model exposing ``hard_constraints_made`` and
            ``soft_constraints_made`` mappings and a ``made`` attribute.

    Returns:
        tuple: ``(hard violation count, soft violation count, summed
        current_value of the violated hard constraints)``.
    """
    violated_hard_value = 0
    hard_fail_count = 0
    soft_fail_count = 0

    for hard in qubo_model.hard_constraints_made.values():
        if hard.is_satisfied(sol_dict):
            continue
        # An unsatisfied hard constraint gets a larger penalty coefficient.
        qubo_model.made = False
        hard.penalize_more()
        violated_hard_value += hard.current_value
        hard_fail_count += 1

    for soft in qubo_model.soft_constraints_made.values():
        soft_value = float(get_val(soft.constraint_expr, sol_dict))
        if soft.is_satisfied(sol_dict):
            continue
        soft_fail_count += 1
        # Only raise the penalty when the value exceeds the stored previous one.
        if soft.pre_constr_val < soft_value:
            qubo_model.made = False
            soft.penalize_more()

    # Try lowering the penalty coefficients once a constraint family is fully satisfied.
    if hard_fail_count == 0:
        for hard in qubo_model.hard_constraints_made.values():
            qubo_model.made = False
            hard.penalize_less()

    if soft_fail_count == 0:
        for soft in qubo_model.soft_constraints_made.values():
            qubo_model.made = False
            soft.penalize_less()

    return hard_fail_count, soft_fail_count, violated_hard_value
# (stray "[docs]" link marker from the HTML source listing removed)
class PenaltyMethodOptimizer(QuboSolver, JsonSerializableMixin):
    """Penalty method solver.

    Setting ``kw.common.CheckpointManager.save_dir = '/tmp'`` switches on
    checkpointing: the current state and the penalty-coefficient combinations
    that satisfy the hard constraints are cached. With checkpointing enabled,
    PenaltyMethodOptimizer can resume iterating from where the previous run
    stopped, and the feasible solutions found during the run are stored under
    ``results`` in the corresponding JSON file.

    Args:
        optimizer (Optimizer): Optimizer used to solve the QUBO in each iteration.
        controller (SolverLoopController): Loop controller deciding when to stop.

    Examples:
        >>> import kaiwu as kw
        >>> import numpy as np
        >>> taskNums = 20
        >>> machineNums = 5
        >>> duration = np.array(range(1, taskNums + 1))
        >>> machine_start_time = np.array(range(1, machineNums + 1))
        >>> # Building a Qubo Model
        >>> qubo_model = kw.core.QuboModel()
        >>> X = kw.core.ndarray([taskNums, machineNums], "X", kw.core.Binary)
        >>> J_mean = (np.sum(duration) + np.sum(machine_start_time)) / machineNums
        >>> J = [machine_start_time[i] + duration.dot(X[:, i]) for i in range(machineNums)]
        >>> # Set objective function
        >>> qubo_model.set_objective(kw.core.quicksum([(J[i] - J_mean) ** 2 for i in range(machineNums)]) / machineNums)
        >>> # Set constraint
        >>> for j in range(taskNums):
        ...     qubo_model.add_constraint(kw.core.quicksum([X[j][i] for i in range(machineNums)]) == 1,
        ...                               f"c{j}", penalty=45)
        >>> # Loop control
        >>> controller = kw.common.SolverLoopController(max_repeat_step=5)
        >>> # optimizer
        >>> optimizer = kw.classical.SimulatedAnnealingOptimizer(initial_temperature=1e8,
        ...                                                      alpha=0.9,
        ...                                                      cutoff_temperature=0.01,
        ...                                                      iterations_per_t=200,
        ...                                                      size_limit=100)
        >>> solver = kw.hybrid.PenaltyMethodOptimizer(optimizer, controller)
        >>> sol_dict, hmt = solver.solve_qubo(qubo_model)
        >>> J_end = np.zeros(machineNums)
        >>> for i in range(machineNums):
        ...     ifturn_temp = kw.core.get_val(kw.core.quicksum(X[:, i].tolist()), sol_dict)
        ...     ifturn = 1 if ifturn_temp > 0 else 0
        ...     J_temp = duration.dot(X[:, i]) + machine_start_time[i] * ifturn
        ...     J_end[i] = kw.core.get_val(J_temp, sol_dict)
        >>> print("duration array: ", duration) # doctest: +SKIP
        duration array: [ 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20]
        >>> print("Machine startup time: ", machine_start_time) # doctest: +SKIP
        Machine startup time: [1 2 3 4 5]
        >>> print("End time of all machines:", J_end) # doctest: +SKIP
        End time of all machines: [43. 43. 48. 45. 46.]
        >>> print('final result:{}'.format(np.max(J_end))) # doctest: +SKIP
        final result:48.0
        >>> print('variance:', np.var(J_end)) # doctest: +SKIP
        variance: 3.6
        >>> kw.common.CheckpointManager.save_dir = None
    """

    def __init__(self, optimizer, controller):
        super().__init__()
        self._optimizer = optimizer
        self._controller = controller
        # Feasible solutions found so far, kept sorted by ascending objective.
        self._results = []
        self._pre_penalty_dict = {}
        self._pre_constr_val_dict = {}

    def _save_obj(self):
        """Dump the current instance through the checkpoint manager."""
        ckpt.dump(self)

    def solve_qubo(self, qubo_model):
        """Solve the QUBO model and return the solution with the lowest objective.

        Because the penalty coefficients change between iterations, the
        solution with the lowest objective is not necessarily the optimum.

        Args:
            qubo_model (QuboModel): QUBO model to solve.

        Returns:
            tuple: ``(solution dict, objective)``, or ``(None, None)`` when no
            feasible solution is available.
        """
        ans = self.solve_qubo_multi_results(qubo_model)
        if not ans or ans[0] is None:
            return None, None
        return ans[0].solution_dict, ans[0].objective

    def solve_qubo_multi_results(self, qubo_model, size_limit=10):
        """Solve the QUBO model and return several solutions.

        Args:
            qubo_model (QuboModel): QUBO model to solve.
            size_limit (int): Maximum number of solutions to keep and return.

        Returns:
            list: ``SolutionResult`` objects sorted by ascending objective, or
            ``None`` if the optimizer returned no objective value (the
            optimizer was interrupted).
        """
        # Restore previously saved state; load_json_dict ignores a None payload.
        self.load_json_dict(ckpt.load(self))
        qubo_model.set_constraint_handler(PenaltyMethodConstraint)
        qubo_model.compile_constraints()
        qubo_model.initialize_penalties()

        while not self._controller.is_finished():
            _sol_dict, _qubo_value = self._optimizer.solve_qubo(qubo_model)
            if _qubo_value is None:  # CIMOptimizer was interrupted
                return None

            (
                hard_constraints_violations,
                soft_constraints_violations,
                all_hard_constraints_value,
            ) = _update_model(_sol_dict, qubo_model)
            # Remove the hard-constraint contribution reported by _update_model
            # from the value returned by the optimizer.
            _objective = _qubo_value - all_hard_constraints_value
            self._controller.update_status(_objective, hard_constraints_violations)

            # Log the QUBO model status.
            logger.debug(
                "Number of hard constraint violations: %s, Number of soft constraint violations %s, "
                "Current objective: %s",
                hard_constraints_violations,
                soft_constraints_violations,
                _objective,
            )
            # Log the controller status.
            logger.debug(
                "Current iteration count: %s, Current number of non convergence attempts: %s",
                self._controller.repeat_step,
                self._controller.pass_count,
            )

            # Only solutions that violate no hard constraint are recorded.
            if hard_constraints_violations == 0:
                self._results.append(SolutionResult(_sol_dict, _objective))
                self._results.sort(key=lambda x: x.objective)
                self._results = self._results[:size_limit]

            # Cache the current object at the end of every iteration.
            self._save_obj()

        return self._results

    def to_json_dict(self, exclude_fields=()):
        """Convert the solver state into a JSON-style dictionary.

        ``_optimizer`` is always left out and ``_results`` is serialized
        separately through ``SolutionResult.to_json_dict``.

        Args:
            exclude_fields (Iterable[str]): Additional field names to leave out.

        Returns:
            dict: JSON-style dictionary of the solver state.
        """
        requested = tuple(exclude_fields or ())
        json_dict = super().to_json_dict(exclude_fields=("_optimizer", "_results") + requested)
        # Honour an explicit request to drop the results as well.
        if "_results" not in requested:
            json_dict["_results"] = [result.to_json_dict() for result in self._results]
        return json_dict

    def load_json_dict(self, json_dict):
        """Restore the solver state from a JSON-style dictionary.

        Args:
            json_dict (dict | None): State produced by ``to_json_dict``.
                ``None`` leaves the instance untouched.
        """
        if json_dict is None:
            return
        super().load_json_dict(json_dict)
        # Entries restored from JSON are plain dicts; entries that are already
        # SolutionResult objects are kept as they are instead of being indexed.
        self._results = [
            result
            if isinstance(result, SolutionResult)
            else SolutionResult(result["solution_dict"], result["objective"])
            for result in self._results
        ]
if __name__ == "__main__":
    # Run the examples embedded in this module's docstrings when executed as a script.
    from doctest import testmod

    testmod()