kaiwu.hybrid._penalty_method_solver 源代码

# encoding=utf8
"""
PenaltyMethodOptimizer
"""
import logging
from kaiwu.common import JsonSerializableMixin, CheckpointManager as ckpt
from kaiwu.core import get_val, QuboSolver, PenaltyMethodConstraint

logger = logging.getLogger(__name__)


class SolutionResult(JsonSerializableMixin):
    """求解结果定义

    Args:
        solution_dict (dict): 解字典,{变量名:变量值}

        objective (float): 目标函数值
    """

    def __init__(self, solution_dict: dict, objective: float):
        self.solution_dict = solution_dict
        self.objective = objective

    def to_json_dict(self, exclude_fields=()):
        """转化为json字典

        Returns:
            dict: json字典
        """
        return {"solution_dict": self.solution_dict, "objective": self.objective}


def _update_model(sol_dict, qubo_model):
    # 计算违反约束项的个数,并更新惩罚系数
    all_hard_constraints_value = 0
    hard_constraints_fails = soft_constraints_fails = 0

    for _, constr_info in qubo_model.hard_constraints_made.items():
        # 不满足约束时增加惩罚系数
        if not constr_info.is_satisfied(sol_dict):
            qubo_model.made = False
            constr_info.penalize_more()
            all_hard_constraints_value += constr_info.current_value
            hard_constraints_fails += 1

    for _, constr_info in qubo_model.soft_constraints_made.items():
        constr_val = float(get_val(constr_info.constraint_expr, sol_dict))
        # 不满足约束时增加惩罚系数
        if not constr_info.is_satisfied(sol_dict):
            soft_constraints_fails += 1
            if constr_info.pre_constr_val < constr_val:
                qubo_model.made = False
                constr_info.penalize_more()

    # 硬约束项都满足时再尝试降低惩罚系数
    if hard_constraints_fails == 0:
        for _, constr_info in qubo_model.hard_constraints_made.items():
            qubo_model.made = False
            constr_info.penalize_less()
        if soft_constraints_fails == 0:
            for _, constr_info in qubo_model.soft_constraints_made.items():
                qubo_model.made = False
                constr_info.penalize_less()

    return hard_constraints_fails, soft_constraints_fails, all_hard_constraints_value


[文档] class PenaltyMethodOptimizer(QuboSolver, JsonSerializableMixin): """罚函数法 设置 kw.common.CheckpointManager.save_dir = '/tmp' 会打开缓存开关,当前状态和符合硬约束的惩罚系数组合都会被缓存 打开缓存后 PenaltyMethodOptimizer 可以在上一次程序终止的基础上继续迭代求解, 程序运行过程中的可行解会保存对应json文件里面 results 中. Args: optimizer (Optimizer): 优化器 controller (SolverLoopController): 循环控制器 Examples: >>> import kaiwu as kw >>> import numpy as np >>> taskNums = 20 >>> machineNums = 5 >>> duration = np.array(range(1, taskNums + 1)) >>> machine_start_time = np.array(range(1, machineNums + 1)) >>> # Building a Qubo Model >>> qubo_model = kw.core.QuboModel() >>> X = kw.core.ndarray([taskNums, machineNums], "X", kw.core.Binary) >>> J_mean = (np.sum(duration) + np.sum(machine_start_time)) / machineNums >>> J = [machine_start_time[i] + duration.dot(X[:, i]) for i in range(machineNums)] >>> # Set objective function >>> qubo_model.set_objective(kw.core.quicksum([(J[i] - J_mean) ** 2 for i in range(machineNums)]) / machineNums) >>> # Set constraint >>> for j in range(taskNums): ... qubo_model.add_constraint(kw.core.quicksum([X[j][i] for i in range(machineNums)]) == 1, ... f"c{j}", penalty=45) >>> # Loop control >>> controller = kw.common.SolverLoopController(max_repeat_step=5) >>> # optimizer >>> optimizer = kw.classical.SimulatedAnnealingOptimizer(initial_temperature=1e8, ... alpha=0.9, ... cutoff_temperature=0.01, ... iterations_per_t=200, ... size_limit=100) >>> solver = kw.hybrid.PenaltyMethodOptimizer(optimizer, controller) >>> sol_dict, hmt = solver.solve_qubo(qubo_model) >>> J_end = np.zeros(machineNums) >>> for i in range(machineNums): ... ifturn_temp = kw.core.get_val(kw.core.quicksum(X[:, i].tolist()), sol_dict) ... ifturn = 1 if ifturn_temp > 0 else 0 ... J_temp = duration.dot(X[:, i]) + machine_start_time[i] * ifturn ... J_end[i] = kw.core.get_val(J_temp, sol_dict) >>> print("duration array: ", duration) # doctest: +SKIP duration array: [ 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20] >>> print("Machine startup time: ", machine_start_time) # doctest: +SKIP Machine startup time: [1 2 3 4 5] >>> print("End time of all machines:", J_end) # doctest: +SKIP End time of all machines: [43. 43. 48. 45. 46.] >>> print('final result:{}'.format(np.max(J_end))) # doctest: +SKIP final result:48.0 >>> print('variance:', np.var(J_end)) # doctest: +SKIP variance: 3.6 >>> kw.common.CheckpointManager.save_dir = None """ def __init__(self, optimizer, controller): super().__init__() self._optimizer = optimizer self._controller = controller self._results = [] self._pre_penalty_dict = {} self._pre_constr_val_dict = {} def _save_obj(self): """保存当前实例对象""" ckpt.dump(self) def solve_qubo(self, qubo_model): """求解QUBO模型,返回目标值最低的解(因惩罚系数改变目标值最低的解不一定是最优解) Args: qubo_model (QuboModel): QUBO模型 Returns: tuple: QUBO模型解的字典、目标值 """ ans = self.solve_qubo_multi_results(qubo_model) if not ans or ans[0] is None: return None, None return ans[0].solution_dict, ans[0].objective
[文档] def solve_qubo_multi_results(self, qubo_model, size_limit=10): """求解QUBO模型, 并返回多个解 Args: qubo_model (QuboModel): QUBO模型 size_limit (int): 返回解得数量 Returns: list: QUBO模型解SolutionResult的字典、目标值 """ self.load_json_dict(ckpt.load(self)) qubo_model.set_constraint_handler(PenaltyMethodConstraint) qubo_model.compile_constraints() qubo_model.initialize_penalties() while not self._controller.is_finished(): _sol_dict, _qubo_value = self._optimizer.solve_qubo(qubo_model) if _qubo_value is None: # CIMOptimizer中断 return None ( hard_constraints_violations, soft_constraints_violations, all_hard_constraints_value, ) = _update_model(_sol_dict, qubo_model) _objective = _qubo_value - all_hard_constraints_value self._controller.update_status(_objective, hard_constraints_violations) # 打印 QUBOModel 相关内容 logger.debug( "Number of hard constraint violations: %s, Number of soft constraint violations %s, " "Current objective: %s", hard_constraints_violations, soft_constraints_violations, _objective, ) # 打印 controller 相关内容 logger.debug( "Current iteration count: %s, Current number of non convergence attempts: %s", self._controller.repeat_step, self._controller.pass_count, ) if hard_constraints_violations == 0: self._results.append(SolutionResult(_sol_dict, _objective)) self._results.sort(key=lambda x: x.objective) self._results = self._results[:size_limit] # 每次迭代结束缓存当前对象 self._save_obj() return self._results
[文档] def to_json_dict(self, exclude_fields=()): json_dict = super().to_json_dict(exclude_fields=("_optimizer", "_results")) json_dict["_results"] = [result.to_json_dict() for result in self._results] return json_dict
[文档] def load_json_dict(self, json_dict): if json_dict is None: return super().load_json_dict(json_dict) new_results = [] for result in self._results: new_result = SolutionResult(result["solution_dict"], result["objective"]) new_results.append(new_result) self._results = new_results
if __name__ == "__main__": import doctest doctest.testmod()