"""
该求解器算法参照 Fred Glover, Gary A. Kochenberger, Bahram Alidaee, (1998)
Adaptive Memory Tabu Search for Binary Quadratic Programs. Management Science 44(3):336-345. 修改
部分术语
constructive phase: 在此相位中,用基于贪心算法的禁忌搜索将-1的spin翻成+1
destructive phase: 在此相位中,用基于贪心算法的禁忌搜索将+1的spin翻成-1
critical event: 如果下一次翻转造成能量上升,则本次操作为critical event
span: 控制spin在critical event后继续翻转的参数
recency tabu: 避免最近刚翻转过的spin被再次翻转
frequency tabu: 避免搜索在critical events中出现频率最高的配置
"""
import math
import numpy as np
from kaiwu.core import IsingSolver, QuboSolver, get_sorted_solutions
from kaiwu.common import HeapUniquePool
from kaiwu.license import track_data
[文档]
class TabuSearchOptimizer(IsingSolver, QuboSolver):
"""求解Ising模型的禁忌搜索求解器.
Args:
max_iter (int): 最大迭代次数
recency_size (int): recency禁忌表的大小。如输入为空,则使用矩阵边长的1/10向上取整。
kmax (int): 模型参数变量k的最大值。默认值为3。
span_control_p1 (int): 影响span变化的参数p1.默认值为3。
span_control_p2 (int): 影响span变化的参数p2.默认值为7。
size_limit (int): 维护解集的大小
Examples:
>>> import numpy as np
>>> import kaiwu as kw
>>> matrix = -np.array([[ 0. , 1. , 0. , 1. , 1. ],
... [ 1. , 0. , 0. , 1., 1. ],
... [ 0. , 0. , 0. , 1., 1. ],
... [ 1. , 1., 1. , 0. , 1. ],
... [ 1. , 1., 1. , 1. , 0. ]])
>>> worker = kw.classical.TabuSearchOptimizer(10, size_limit=1)
>>> worker.solve(matrix) # doctest: +SKIP
array([[ 1, 1, 1, -1, -1]])
"""
def __init__(
self,
max_iter,
recency_size=None,
kmax=3,
span_control_p1=3,
span_control_p2=7,
size_limit=1,
):
super().__init__()
self.mat = None
# 原mat矩阵。H = - x mat x.T
self.nodiag_mat = None
# 化为对称矩阵,取负,在neg_mat的基础上将对角线换为0
self.size = None
# 变量个数
self.x_conf = None
# 变量初值值,若为空则随机生成
self.recency_size = recency_size
# tabu的recency记忆大小,若不填,则自动为size/10
self.penalty_recency = None
# recency tabu的系数,取mat矩阵绝对值最大的元素
self.recency = None
# recency tabu
self.frequency = None
# frequency tabu
self.best_solution = None
# 当前最好的spin配置
self.best_hamilton = None
# 当前最好的能量
self.size_limit = size_limit
self.solution_pool = None
self.span = 1
self.k = 1
# k的初始值。k为每个constructive或destructive中受到tabu影响的翻转spin个数
self.kmax = kmax # k的最大值
self.span_control_p1 = span_control_p1 # 控制span变化的参数p1
self.span_control_p2 = span_control_p2 # 控制span变化的参数p2
self.iter_span = 0 # 控制span变化的参数iter_span
self.direction = "increase" # span变化的方向
self.max_iter = max_iter
self.ct_var = 0 # 本phase翻转spin的个数
self.ct_span = 0 # 本phase最少翻转spin的个数
self.last_t = [] # 记录过去t个critical event的配置
self.tabu_value = 0
self.iterations = 0 # 已运行的大循环的次数
[文档]
def set_matrix(self, ising_matrix):
"""设置矩阵并更新相关内容"""
self.mat = ising_matrix
self.nodiag_mat = -(self.mat + self.mat.T) / 2
# 化为对称矩阵,取负,在neg_mat的基础上将对角线换为0
np.fill_diagonal(self.nodiag_mat, 0)
self.size = self.mat.shape[0] # 变量个数
self.solution_pool = HeapUniquePool(
self.mat, self.mat.shape[0], self.size_limit
)
self.recency_size = (
math.ceil(self.size / 10)
if self.recency_size is None
else self.recency_size
)
# tabu的recency记忆大小,若不填,则自动为size/10
self.penalty_recency = np.abs(self.mat).max()
# recency tabu的系数,取mat矩阵绝对值最大的元素
self.recency = np.zeros(self.size) # recency tabu
self.frequency = np.zeros(self.size) # frequency tabu
[文档]
def init_solution(self, solution):
"""初始化解向量
Args:
solution (np.ndarray): 初始解向量
"""
# 设置变量初值
self.x_conf = (
2 * (np.random.randn(self.size) > 0) - 1 if solution is None else solution
)
self.best_solution = self.x_conf.copy() # 当前最好的spin配置
self.best_hamilton = -(self.best_solution.dot(self.mat)).dot(self.best_solution)
# 当前最好的能量
def _sampler(self, solution_vector):
"""对解向量采样"""
val = -(solution_vector.dot(self.mat)).dot(solution_vector)
return val
@property
def _x_val(self):
"""对当前变量配置采样"""
return self._sampler(self.x_conf)
@property
def _penalty_frequency(self):
"""frequency tabu的系数,随iterations变化"""
return self.penalty_recency / 1000 * self.iterations
def _update_recency(self, x_last):
"""更新recency tabu。值为过去t个critical event变量配置之和"""
if len(self.last_t) <= self.recency_size - 1:
self.last_t.append(x_last)
self.recency += x_last
else:
self.recency += x_last - self.last_t.pop(0)
self.last_t.append(x_last)
def _update_frequency(self, x_last):
"""更新frequency tabu。值为所有critical event累加"""
self.frequency += x_last
def _update_tabu(self):
"""更新两种tabu"""
self._update_recency(self.x_conf)
self._update_frequency(self.x_conf)
self.tabu_value = (
self.penalty_recency * self.recency
+ self._penalty_frequency * self.frequency
)
def _net_increase(self):
"""计算当前x_conf下,独立翻转每个spin对能量造成的净增长。"""
return -4 * np.sum(np.outer(self.x_conf, self.x_conf) * self.nodiag_mat, axis=1)
def _j_star(self, val):
"""寻找从-val翻转为val后,能量评价下降最多的index。"""
evaluation = self._net_increase() # 独立翻转每个spin对能量造成的净增长
if self.ct_var <= self.k: # 如果ct_var <= k,要在净增长中添加tabu的惩罚
evaluation += val * self.tabu_value
masked_eval = np.ma.array(
evaluation, mask=self.x_conf == val
) # 将x_conf == val的项mask掉
# 找到最小的evaluation
j_eval = np.min(masked_eval)
# 如果有相等的值,随机选取index
j_star = np.random.choice(np.argwhere(masked_eval == j_eval).flatten())
# 计算j_star翻转后能量的净增长
j_inc = (
-4 * self.x_conf[j_star] * np.inner(self.x_conf, self.nodiag_mat[j_star, :])
)
return j_star, j_inc
def _count(self, val):
"""计算当前值为val的spin的个数"""
return np.count_nonzero(self.x_conf == val)
def _constructive(self):
"""constructive phase"""
self.ct_var = 0 # 初始化翻转个数计数
while self._count(-1) > 0: # 当spin不全为1时
j_star, j_inc = self._j_star(1) # 在值为-1的spin中取evaluation最小的index
if j_inc < 0: # 如果该index造成的净增加小于0,翻转为+1
self.x_conf[j_star] = 1
self.ct_var += 1 # 更新翻转数量
else: # 如果无法使能量减小了,则遇到了critical event
hamilton = self._x_val
if hamilton < self.best_hamilton: # 与当前最好能量比较,如更好,则更新
self.best_solution = self.x_conf.copy()
self.best_hamilton = hamilton
self.solution_pool.push(self.x_conf, hamilton)
self._update_tabu() # 更新此critical event到tabu
break
while (
self.ct_span <= self.span and self._count(-1) > 0
): # 如果ct_span个数没有到span,则继续翻转直到到达span为止
self.ct_span += 1
j_star, j_inc = self._j_star(1)
self.x_conf[j_star] = 1
def _destructive(self):
"""constructive phase,与constructive phase类似,只是从+1翻转为-1"""
self.ct_var = 0 # 初始化翻转个数计数
while self._count(1) > 0: # 当spin不全为-1时
j_star, j_inc = self._j_star(-1) # 在值为+1的spin中取evaluation最小的index
if j_inc < 0: # 如果该index造成的净增加小于0,翻转为-1
self.x_conf[j_star] = -1
self.ct_var += 1 # 更新翻转数量
else: # 如果无法使能量减小了,则遇到了critical event
hamilton = self._x_val
if hamilton < self.best_hamilton: # 与当前最好能量比较,如更好,则更新
self.best_solution = self.x_conf.copy()
self.best_hamilton = hamilton
self.solution_pool.push(self.x_conf, hamilton)
self._update_tabu() # 更新此critical event到tabu
break
while (
self.ct_span <= self.span and self._count(1) > 0
): # 如果ct_span个数没有到span,则继续翻转直到到达span为止
self.ct_span += 1
j_star, j_inc = self._j_star(-1)
self.x_conf[j_star] = -1
def _transfer(self):
"""用于更新span参数"""
self.iter_span += 1
if self.direction == "increase":
if self.span <= self.span_control_p1:
if self.iter_span > self.span_control_p2 * self.span:
self.span += 1
self.iter_span = 0
else:
if self.iter_span > 4:
self.span += 1
self.iter_span = 0
if self.span > self.span_control_p2:
self.span = self.span_control_p2
self.direction = "decrease"
else:
if self.span_control_p1 + 1 <= self.span <= self.span_control_p2:
if self.iter_span > 4:
self.span -= 1
self.iter_span = 0
else:
if self.iter_span > self.span_control_p2 * self.span:
self.span -= 1
self.iter_span = 0
if self.span == 0:
self.span = 1
self.direction = "increase"
@track_data()
def _solve(self, ising_matrix=None, solution=None):
"""求解Ising矩阵
Args:
ising_matrix (np.ndarray, optional): Ising矩阵. 默认为None.
solution (np.ndarray, optional): 初始解向量. 默认为None.
Returns:
np.ndarray: 解向量
"""
if ising_matrix is not None:
self.set_matrix(ising_matrix)
self.init_solution(solution)
elif solution is not None:
self.init_solution(solution)
while self.iterations < self.max_iter: # 大循环
self.iterations += 1
self._constructive()
self._transfer()
self.ct_span = 0
self._destructive()
self._transfer()
self.ct_span += 1
if (
self.iterations % (2 * self.recency_size) == 0
): # 每过2t次循环,更新一次k值
if self.k == self.kmax: # 当k到达最大值时,重置
self.k = 0
self.k += 1
# clear counter before return
self.iterations = 0
return self.solution_pool.get_solutions()
[文档]
def solve(
self, ising_matrix=None, negtail_flip=True, sort_solutions=False, solution=None
):
"""求解接口,调用_solve方法
Args:
ising_matrix (np.ndarray, optional): Ising矩阵. 默认为None.
negtail_flip (bool): 是否进行负尾翻转
sort_solutions (bool): 是否对解进行排序
solution (np.ndarray, optional): 初始解向量. 默认为None.
Returns:
np.ndarray: 解向量
"""
self.set_matrix(ising_matrix)
solutions = self._solve(self.matrix, solution=solution)
solutions, self._hamiltonian = get_sorted_solutions(
self.matrix, solutions, 0, negtail_flip, sort_solutions
)
return solutions
if __name__ == "__main__":
import doctest
doctest.testmod()