# -*- coding: utf-8 -*-
"""
提供模拟退火求解器
"""
import os
import math
import time
import logging
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from kaiwu.license import verify_license
from kaiwu.core import IsingSolver, QuboSolver
from kaiwu.common import JsonSerializableMixin
from kaiwu.common import HeapUniquePool
from kaiwu.license import track_data
logger = logging.getLogger(__name__)
[文档]
class SimulatedAnnealingOptimizer(IsingSolver, QuboSolver, JsonSerializableMixin):
"""求解Ising模型的模拟退火求解器(输出结果具有随机性).
Args:
initial_temperature (float): 初始温度.
alpha (float): 降温系数.
cutoff_temperature (float): 截止温度.
iterations_per_t (int): 每个温度迭代深度.
size_limit (int): 输出解的个数,默认输出100个解
flag_evolution_history (bool): 是否输出哈密顿量演化历史,默认False,当值为True时,通过get_ha_history方法获取演化历史
verbose (bool): 是否在控制台输出计算进度,默认False
rand_seed (int, optional): numpy随机数生成器的随机种子
process_num (int, optional): 并行进程数 (-1为自动调用所有可用核心,1为单进程). Defaults to 1.
Examples:
>>> import numpy as np
>>> import kaiwu as kw
>>> matrix = -np.array([[ 0. , 1. , 0. , 1. , 1. ],
... [ 1. , 0. , 0. , 1., 1. ],
... [ 0. , 0. , 0. , 1., 1. ],
... [ 1. , 1., 1. , 0. , 1. ],
... [ 1. , 1., 1. , 1. , 0. ]])
>>> worker = kw.classical.SimulatedAnnealingOptimizer(initial_temperature=100,
... alpha=0.99,
... cutoff_temperature=0.001,
... iterations_per_t=10,
... size_limit=10)
>>> # This value is random and cannot be predicted
>>> worker.solve(matrix) # doctest: +SKIP
array([[-1, 1, -1, 1, 1],
[-1, 1, 1, -1, 1],
[-1, 1, 1, -1, -1],
[ 1, -1, -1, 1, 1],
[ 1, -1, 1, -1, 1],
[ 1, -1, 1, -1, -1],
[ 1, -1, -1, -1, 1],
[ 1, -1, 1, 1, -1],
[ 1, 1, 1, -1, -1],
[-1, -1, -1, 1, 1]])
"""
def __init__(
self,
initial_temperature=100,
alpha=0.99,
cutoff_temperature=0.001,
iterations_per_t=10,
size_limit=100,
flag_evolution_history=False,
verbose=False,
rand_seed=None,
process_num=1,
):
super().__init__() # Ising 矩阵.
self._initial_temperature = initial_temperature # 初始温度.
self._alpha = alpha # 降温系数.
self._cutoff_temperature = cutoff_temperature # 截止温度.
self._iterations_per_t = iterations_per_t # 每个温度迭代深度.
self._size_limit = size_limit # 输出解的个数的限制
self._verbose = verbose
self._ha_history = {}
self._flag_evolution_history = flag_evolution_history
self._solution_pool = None
if rand_seed is None:
rand_seed = np.random.randint(2**30)
self._rand_seed = rand_seed
logger.debug("The selected rand_seed=%s", self._rand_seed)
self._process_num = (
os.cpu_count()
if process_num == -1 or process_num > os.cpu_count()
else process_num
)
def _dlt_h_single_flip(self, solution, idx):
"""TODO 一些向量运算"""
return 4 * solution[idx] * np.inner(solution, self.matrix[idx, :])
[文档]
def on_matrix_change(self):
"""更新矩阵相关信息"""
self._solution_pool = HeapUniquePool(
self.matrix, self.matrix.shape[0], self._size_limit
)
[文档]
def get_ha_history(self):
"""获取哈密顿量演化历史
Returns:
dict: 哈密顿量随时间演化历史,key是时间,单位为秒,value是 hamilton
"""
return self._ha_history
[文档]
@track_data(alg_name="sa", enabled=True)
def single_process_solve(
self, ising_matrix=None, init_solution=None, rand_seed=None
):
"""单进程求解Ising矩阵
Args:
ising_matrix (np.ndarray, optional): Ising矩阵. Defaults to None.
init_solution (np.ndarray, optional): 初始解向量. Defaults to None.
rand_seed (int, optional): numpy随机数生成器的随机种子
Returns:
np.ndarray: 解向量
"""
curr_rng = np.random.default_rng(rand_seed)
self.set_matrix(ising_matrix)
verify_license(self.matrix.shape[0])
if init_solution is None:
init_solution = (
2 * (curr_rng.normal(loc=0.0, scale=1.0, size=len(self.matrix)) > 0) - 1
)
temperature = self._initial_temperature
hamilton = -(init_solution.dot(self.matrix)).dot(init_solution)
best_solution = init_solution.copy()
best_hamilton = hamilton
# 哈密顿量演化历史
start_time = time.perf_counter()
if self._flag_evolution_history:
self._ha_history.update({0: hamilton})
# 预计迭代次数
iterations = int(
np.ceil(
np.log(self._cutoff_temperature / self._initial_temperature)
/ np.log(self._alpha)
)
)
# 预计完成时间
total_time = None
count = 0
while temperature > self._cutoff_temperature:
count += 1
flag = False # 用来标识该温度下是否有新值被接受
for _ in range(self._iterations_per_t):
randidx = curr_rng.integers(len(init_solution))
dlt_h = self._dlt_h_single_flip(init_solution, randidx)
if dlt_h < 0 or math.exp(-dlt_h / temperature) > curr_rng.random():
flag = True
init_solution[randidx] = -init_solution[randidx]
hamilton += dlt_h
if self._flag_evolution_history:
current_time = time.perf_counter()
self._ha_history[current_time - start_time] = hamilton
if hamilton < best_hamilton:
best_solution = init_solution.copy()
best_hamilton = hamilton
self._solution_pool.push(init_solution, hamilton)
if flag:
init_solution = best_solution.copy()
hamilton = best_hamilton
temperature *= self._alpha
iteration_time = time.perf_counter() - start_time
if total_time is None:
total_time = iteration_time * iterations
logger.debug(
"Expected execution completion time is %s seconds", total_time
)
if self._verbose:
# 输出当前进度和当前温度
logger.debug(
"Current progress %s/%s; Current temperature: %s",
count,
iterations,
temperature,
)
solutions = self._solution_pool.get_solutions()
return solutions
def _solve(self, ising_matrix=None, init_solution=None):
"""SA求解Ising矩阵solve接口
Args:
ising_matrix (np.ndarray, optional): Ising矩阵. Defaults to None.
init_solution (np.ndarray, optional): 初始解向量. Defaults to None.
Returns:
np.ndarray: 多个进程合并去重之后的解向量.
"""
if self._process_num == 1:
# 单进程SA求解,class random seed就是solve random seed
solutions = self.single_process_solve(
ising_matrix, init_solution, self._rand_seed
)
else:
# 多进程SA求解,class random seed用于给每个进程的随机数生成器设定random seed
np.random.seed(self._rand_seed)
seeds_list = np.random.randint(low=2**30, size=self._process_num)
pool = ProcessPoolExecutor(max_workers=self._process_num)
# 创建多个SA进程求解
args = [
(ising_matrix, init_solution, seeds_list[_])
for _ in range(self._process_num)
]
solutions = list(pool.map(self.single_process_solve, *zip(*args)))
# 合并SA的解并去重
solutions = np.concatenate(solutions, axis=0)
solutions = np.unique(solutions, axis=0)
return solutions
if __name__ == "__main__":
import doctest
doctest.testmod()