kaiwu.common._loop_controller 源代码

# -*- coding: utf-8 -*-
"""
循环控制器等一些辅助求解工具
"""
import time
import math
from kaiwu.common._json_serializable_mixin import JsonSerializableMixin


class BaseTimer:
    """计时器"""

    def __init__(self):
        self.prev_time = time.time()
        self.cpu_time = 0
        self.subqubo_time = 0

    def add_to_cpu_time(self):
        """把计时加到cpu时间并更新prev_time"""
        cur_time = time.time()
        self.cpu_time += cur_time - self.prev_time
        self.prev_time = cur_time

    def reset(self):
        """重置时间"""
        self.prev_time = time.time()
        self.cpu_time = 0
        self.subqubo_time = 0


[文档] class BaseLoopController(JsonSerializableMixin): """循环控制器,并计算时间。用于调试和测试算法 Args: max_repeat_step: 最大步数,默认值为math.inf target_objective:目标优化函数,达到即停止,默认值为-math.inf no_improve_limit:收敛条件,指定更新次数没有改进则停止,默认值为math.inf iterate_per_update:每次更新哈密顿量前运行的次数,默认值为5 """ def __init__( self, max_repeat_step=math.inf, target_objective=-math.inf, no_improve_limit=math.inf, iterate_per_update=5, ): # 判断步数 self.repeat_step = 0 self.max_repeat_step = max_repeat_step # 判断目标函数 self.prev_objective = math.inf self.target_objective = target_objective # 判断收敛 self.pass_count = 0 self.no_improve_limit = no_improve_limit self.iterate_step = 0 self.iterate_per_update = iterate_per_update self.unsatisfied_constraints_count = None if all( v == math.inf for v in ( self.max_repeat_step, self.target_objective, self.no_improve_limit, ) ): raise ValueError( "At least one termination condition must be set " "(max_repeat_step, target_objective, or no_improve_limit)" ) self.timer = BaseTimer()
[文档] def update_status(self, objective, unsatisfied_constraints_count=None): """在计算子问题后更新状态 Args: objective (float): 目标函数值 unsatisfied_constraints_count (int): 未满足约束项的数量 """ self.unsatisfied_constraints_count = unsatisfied_constraints_count if objective is not None and objective < self.prev_objective: self.prev_objective = objective self.pass_count = 0 else: self.pass_count += 1 self.repeat_step += 1
[文档] def is_finished(self): """判断是否应该停止 Returns: bool: 是否应该停止 """ finish = False if self.prev_objective <= self.target_objective and ( self.unsatisfied_constraints_count is None or self.unsatisfied_constraints_count == 0 ): finish = True if self.pass_count >= self.no_improve_limit: finish = True if self.repeat_step >= self.max_repeat_step: finish = True return finish
[文档] def restart(self): """重新初始化计数""" self.repeat_step = 0 self.prev_objective = math.inf self.pass_count = 0 self.timer.reset()
[文档] def to_json_dict(self, exclude_fields=("timer",)): """转化为json字典 Returns: dict: json字典 """ return super().to_json_dict(exclude_fields=exclude_fields)
[文档] class OptimizerLoopController(BaseLoopController): """Optimizer循环控制器,并计算时间。用于调试和测试算法 Args: max_repeat_step: 最大步数,默认值为math.inf target_objective:目标优化函数,达到即停止,默认值为-math.inf no_improve_limit:收敛条件,指定更新次数没有改进则停止,默认值为20000 iterate_per_update:每次更新哈密顿量前运行的次数,默认值为5 """ def __init__( self, max_repeat_step=math.inf, target_objective=-math.inf, no_improve_limit=20000, iterate_per_update=5, ): # 判断步数 super().__init__( max_repeat_step, target_objective, no_improve_limit, iterate_per_update )
[文档] class SolverLoopController(BaseLoopController, JsonSerializableMixin): """Solver循环控制器,并计算时间。用于调试和测试算法 Args: max_repeat_step (int): 最大步数,默认值为math.inf target_objective (float): 目标优化函数,达到即停止,默认值为-math.inf no_improve_limit (int): 收敛条件,指定更新次数没有改进则停止,默认值为20000 iterate_per_update (int): 每次更新哈密顿量前运行的次数,默认值为5 stop_after_feasible_count (int): 找到指定数量的可行解后停止 """ def __init__( self, max_repeat_step=math.inf, target_objective=-math.inf, no_improve_limit=math.inf, iterate_per_update=5, stop_after_feasible_count=None, ): # 判断步数 super().__init__( max_repeat_step, target_objective, no_improve_limit, iterate_per_update ) self.stop_after_feasible_count = stop_after_feasible_count self.feasible_count = 0
[文档] def update_status(self, objective, unsatisfied_constraints_count=None): """在计算子问题后更新状态 Args: objective (float): 目标函数值 unsatisfied_constraints_count (int): 未满足约束项的数量 """ super().update_status(objective, unsatisfied_constraints_count) if unsatisfied_constraints_count == 0: self.feasible_count += 1
[文档] def is_finished(self): """判断是否应该停止 Returns: bool: 是否应该停止 """ return super().is_finished() or ( self.stop_after_feasible_count is not None and self.feasible_count == self.stop_after_feasible_count )