kaiwu.classical._tabu_search 源代码

"""
该求解器算法参照 Fred Glover, Gary A. Kochenberger, Bahram Alidaee, (1998)
Adaptive Memory Tabu Search for Binary Quadratic Programs. Management Science 44(3):336-345. 修改

部分术语
constructive phase: 在此相位中,用基于贪心算法的禁忌搜索将-1的spin翻成+1
destructive phase: 在此相位中,用基于贪心算法的禁忌搜索将+1的spin翻成-1
critical event: 如果下一次翻转造成能量上升,则本次操作为critical event
span: 控制spin在critical event后继续翻转的参数
recency tabu: 避免最近刚翻转过的spin被再次翻转
frequency tabu: 避免搜索在critical events中出现频率最高的配置
"""

import math
import numpy as np
from kaiwu.core import IsingSolver, QuboSolver, get_sorted_solutions
from kaiwu.common import HeapUniquePool
from kaiwu.license import track_data


[文档] class TabuSearchOptimizer(IsingSolver, QuboSolver): """求解Ising模型的禁忌搜索求解器. Args: max_iter (int): 最大迭代次数 recency_size (int): recency禁忌表的大小。如输入为空,则使用矩阵边长的1/10向上取整。 kmax (int): 模型参数变量k的最大值。默认值为3。 span_control_p1 (int): 影响span变化的参数p1.默认值为3。 span_control_p2 (int): 影响span变化的参数p2.默认值为7。 size_limit (int): 维护解集的大小 Examples: >>> import numpy as np >>> import kaiwu as kw >>> matrix = -np.array([[ 0. , 1. , 0. , 1. , 1. ], ... [ 1. , 0. , 0. , 1., 1. ], ... [ 0. , 0. , 0. , 1., 1. ], ... [ 1. , 1., 1. , 0. , 1. ], ... [ 1. , 1., 1. , 1. , 0. ]]) >>> worker = kw.classical.TabuSearchOptimizer(10, size_limit=1) >>> worker.solve(matrix) # doctest: +SKIP array([[ 1, 1, 1, -1, -1]]) """ def __init__( self, max_iter, recency_size=None, kmax=3, span_control_p1=3, span_control_p2=7, size_limit=1, ): super().__init__() self.mat = None # 原mat矩阵。H = - x mat x.T self.nodiag_mat = None # 化为对称矩阵,取负,在neg_mat的基础上将对角线换为0 self.size = None # 变量个数 self.x_conf = None # 变量初值值,若为空则随机生成 self.recency_size = recency_size # tabu的recency记忆大小,若不填,则自动为size/10 self.penalty_recency = None # recency tabu的系数,取mat矩阵绝对值最大的元素 self.recency = None # recency tabu self.frequency = None # frequency tabu self.best_solution = None # 当前最好的spin配置 self.best_hamilton = None # 当前最好的能量 self.size_limit = size_limit self.solution_pool = None self.span = 1 self.k = 1 # k的初始值。k为每个constructive或destructive中受到tabu影响的翻转spin个数 self.kmax = kmax # k的最大值 self.span_control_p1 = span_control_p1 # 控制span变化的参数p1 self.span_control_p2 = span_control_p2 # 控制span变化的参数p2 self.iter_span = 0 # 控制span变化的参数iter_span self.direction = "increase" # span变化的方向 self.max_iter = max_iter self.ct_var = 0 # 本phase翻转spin的个数 self.ct_span = 0 # 本phase最少翻转spin的个数 self.last_t = [] # 记录过去t个critical event的配置 self.tabu_value = 0 self.iterations = 0 # 已运行的大循环的次数
[文档] def set_matrix(self, ising_matrix): """设置矩阵并更新相关内容""" self.mat = ising_matrix self.nodiag_mat = -(self.mat + self.mat.T) / 2 # 化为对称矩阵,取负,在neg_mat的基础上将对角线换为0 np.fill_diagonal(self.nodiag_mat, 0) self.size = self.mat.shape[0] # 变量个数 self.solution_pool = HeapUniquePool( self.mat, self.mat.shape[0], self.size_limit ) self.recency_size = ( math.ceil(self.size / 10) if self.recency_size is None else self.recency_size ) # tabu的recency记忆大小,若不填,则自动为size/10 self.penalty_recency = np.abs(self.mat).max() # recency tabu的系数,取mat矩阵绝对值最大的元素 self.recency = np.zeros(self.size) # recency tabu self.frequency = np.zeros(self.size) # frequency tabu
[文档] def init_solution(self, solution): """初始化解向量 Args: solution (np.ndarray): 初始解向量 """ # 设置变量初值 self.x_conf = ( 2 * (np.random.randn(self.size) > 0) - 1 if solution is None else solution ) self.best_solution = self.x_conf.copy() # 当前最好的spin配置 self.best_hamilton = -(self.best_solution.dot(self.mat)).dot(self.best_solution)
# 当前最好的能量 def _sampler(self, solution_vector): """对解向量采样""" val = -(solution_vector.dot(self.mat)).dot(solution_vector) return val @property def _x_val(self): """对当前变量配置采样""" return self._sampler(self.x_conf) @property def _penalty_frequency(self): """frequency tabu的系数,随iterations变化""" return self.penalty_recency / 1000 * self.iterations def _update_recency(self, x_last): """更新recency tabu。值为过去t个critical event变量配置之和""" if len(self.last_t) <= self.recency_size - 1: self.last_t.append(x_last) self.recency += x_last else: self.recency += x_last - self.last_t.pop(0) self.last_t.append(x_last) def _update_frequency(self, x_last): """更新frequency tabu。值为所有critical event累加""" self.frequency += x_last def _update_tabu(self): """更新两种tabu""" self._update_recency(self.x_conf) self._update_frequency(self.x_conf) self.tabu_value = ( self.penalty_recency * self.recency + self._penalty_frequency * self.frequency ) def _net_increase(self): """计算当前x_conf下,独立翻转每个spin对能量造成的净增长。""" return -4 * np.sum(np.outer(self.x_conf, self.x_conf) * self.nodiag_mat, axis=1) def _j_star(self, val): """寻找从-val翻转为val后,能量评价下降最多的index。""" evaluation = self._net_increase() # 独立翻转每个spin对能量造成的净增长 if self.ct_var <= self.k: # 如果ct_var <= k,要在净增长中添加tabu的惩罚 evaluation += val * self.tabu_value masked_eval = np.ma.array( evaluation, mask=self.x_conf == val ) # 将x_conf == val的项mask掉 # 找到最小的evaluation j_eval = np.min(masked_eval) # 如果有相等的值,随机选取index j_star = np.random.choice(np.argwhere(masked_eval == j_eval).flatten()) # 计算j_star翻转后能量的净增长 j_inc = ( -4 * self.x_conf[j_star] * np.inner(self.x_conf, self.nodiag_mat[j_star, :]) ) return j_star, j_inc def _count(self, val): """计算当前值为val的spin的个数""" return np.count_nonzero(self.x_conf == val) def _constructive(self): """constructive phase""" self.ct_var = 0 # 初始化翻转个数计数 while self._count(-1) > 0: # 当spin不全为1时 j_star, j_inc = self._j_star(1) # 在值为-1的spin中取evaluation最小的index if j_inc < 0: # 如果该index造成的净增加小于0,翻转为+1 self.x_conf[j_star] = 1 self.ct_var += 1 # 更新翻转数量 else: # 如果无法使能量减小了,则遇到了critical event hamilton = self._x_val if hamilton < self.best_hamilton: # 与当前最好能量比较,如更好,则更新 self.best_solution = self.x_conf.copy() self.best_hamilton = hamilton self.solution_pool.push(self.x_conf, hamilton) self._update_tabu() # 更新此critical event到tabu break while ( self.ct_span <= self.span and self._count(-1) > 0 ): # 如果ct_span个数没有到span,则继续翻转直到到达span为止 self.ct_span += 1 j_star, j_inc = self._j_star(1) self.x_conf[j_star] = 1 def _destructive(self): """constructive phase,与constructive phase类似,只是从+1翻转为-1""" self.ct_var = 0 # 初始化翻转个数计数 while self._count(1) > 0: # 当spin不全为-1时 j_star, j_inc = self._j_star(-1) # 在值为+1的spin中取evaluation最小的index if j_inc < 0: # 如果该index造成的净增加小于0,翻转为-1 self.x_conf[j_star] = -1 self.ct_var += 1 # 更新翻转数量 else: # 如果无法使能量减小了,则遇到了critical event hamilton = self._x_val if hamilton < self.best_hamilton: # 与当前最好能量比较,如更好,则更新 self.best_solution = self.x_conf.copy() self.best_hamilton = hamilton self.solution_pool.push(self.x_conf, hamilton) self._update_tabu() # 更新此critical event到tabu break while ( self.ct_span <= self.span and self._count(1) > 0 ): # 如果ct_span个数没有到span,则继续翻转直到到达span为止 self.ct_span += 1 j_star, j_inc = self._j_star(-1) self.x_conf[j_star] = -1 def _transfer(self): """用于更新span参数""" self.iter_span += 1 if self.direction == "increase": if self.span <= self.span_control_p1: if self.iter_span > self.span_control_p2 * self.span: self.span += 1 self.iter_span = 0 else: if self.iter_span > 4: self.span += 1 self.iter_span = 0 if self.span > self.span_control_p2: self.span = self.span_control_p2 self.direction = "decrease" else: if self.span_control_p1 + 1 <= self.span <= self.span_control_p2: if self.iter_span > 4: self.span -= 1 self.iter_span = 0 else: if self.iter_span > self.span_control_p2 * self.span: self.span -= 1 self.iter_span = 0 if self.span == 0: self.span = 1 self.direction = "increase" @track_data() def _solve(self, ising_matrix=None, solution=None): """求解Ising矩阵 Args: ising_matrix (np.ndarray, optional): Ising矩阵. 默认为None. solution (np.ndarray, optional): 初始解向量. 默认为None. Returns: np.ndarray: 解向量 """ if ising_matrix is not None: self.set_matrix(ising_matrix) self.init_solution(solution) elif solution is not None: self.init_solution(solution) while self.iterations < self.max_iter: # 大循环 self.iterations += 1 self._constructive() self._transfer() self.ct_span = 0 self._destructive() self._transfer() self.ct_span += 1 if ( self.iterations % (2 * self.recency_size) == 0 ): # 每过2t次循环,更新一次k值 if self.k == self.kmax: # 当k到达最大值时,重置 self.k = 0 self.k += 1 # clear counter before return self.iterations = 0 return self.solution_pool.get_solutions()
[文档] def solve( self, ising_matrix=None, negtail_flip=True, sort_solutions=False, solution=None ): """求解接口,调用_solve方法 Args: ising_matrix (np.ndarray, optional): Ising矩阵. 默认为None. negtail_flip (bool): 是否进行负尾翻转 sort_solutions (bool): 是否对解进行排序 solution (np.ndarray, optional): 初始解向量. 默认为None. Returns: np.ndarray: 解向量 """ self.set_matrix(ising_matrix) solutions = self._solve(self.matrix, solution=solution) solutions, self._hamiltonian = get_sorted_solutions( self.matrix, solutions, 0, negtail_flip, sort_solutions ) return solutions
if __name__ == "__main__": import doctest doctest.testmod()