# -*- coding: utf-8 -*-
"""
提供模拟退火求解器
"""
import os
import math
import time
import logging
from abc import ABCMeta, abstractmethod
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from kaiwu.core import IsingSolver, QuboSolver
logger = logging.getLogger(__name__)
def split_near_uniform(m: int, n: int) -> list[int]:
"""
将整数 m 拆成 n 份,每份为两个相邻整数中的一个(最接近平均值),
且拆分后的 n 个数之和仍为 m。
返回长度为 n 的列表,包含 r 个 (q+1) 和 n-r 个 q,其中 q, r = divmod(m, n).
"""
q, r = divmod(m, n)
# r 个 (q+1),n-r 个 q
return [q + 1] * r + [q] * (n - r)
class SimulatedAnnealingSamplerBase(IsingSolver, QuboSolver, metaclass=ABCMeta):
"""求解Ising模型的模拟退火求解器(输出结果具有随机性).
Args:
initial_temperature (float): 初始温度.
alpha (float): 降温系数.
cutoff_temperature (float): 截止温度.
iterations_per_t (int): 每个温度迭代深度.
size_limit (int): 输出解的个数,默认输出100个解
rand_seed (int, optional): numpy随机数生成器的随机种子
process_num (int, optional): 并行进程数 (-1为自动调用所有可用核心,1为单进程). Defaults to 1.
"""
def __init__(
self,
initial_temperature=None,
alpha=0.99,
cutoff_temperature=None,
iterations_per_t=10,
size_limit=1,
rand_seed=None,
process_num=1,
):
super().__init__()
self.initial_temperature = initial_temperature # 初始温度.
self.alpha = alpha # 降温系数.
self.cutoff_temperature = cutoff_temperature # 截止温度.
self.iterations_per_t = iterations_per_t
self.size_limit = size_limit
if rand_seed is None:
self.rand_seed = np.random.randint(2**31)
else:
self.rand_seed = rand_seed
self.alpha_schedule = None
self.process_num = (
os.cpu_count()
if process_num == -1 or process_num > os.cpu_count()
else process_num
)
def _solve(self, ising_matrix=None, init_solution=None, size_limit=None):
"""SA求解Ising矩阵solve接口
Args:
ising_matrix (np.ndarray, optional): Ising矩阵. Defaults to None.
init_solution (np.ndarray, optional): 初始解向量. Defaults to None.
Returns:
np.ndarray: 多个进程合并去重之后的解向量.
"""
size_limit = size_limit if size_limit is not None else self.size_limit
if self.process_num == 1:
# 单进程SA求解,class random seed就是solve random seed
solutions = self.single_process_solve(
ising_matrix, init_solution, self.rand_seed
)
else:
# 多进程SA求解,class random seed用于给每个进程的随机数生成器设定random seed
np.random.seed(self.rand_seed)
seeds_list = np.random.randint(low=2**30, size=self.process_num)
pool = ProcessPoolExecutor(max_workers=self.process_num)
# 创建多个SA进程求解
size_limits = split_near_uniform(size_limit, self.process_num)
args = [
(ising_matrix, init_solution, seeds_list[i], size_limits[i])
for i in range(self.process_num)
]
solutions = list(pool.map(self.single_process_solve, *zip(*args)))
# 合并SA的解并去重
solutions = np.concatenate(solutions, axis=0)
return solutions
@abstractmethod
def single_process_solve(
self, ising_matrix, init_solution, rand_seed, size_limit=None
):
"""SA求解Ising矩阵solve接口
Args:
ising_matrix (np.ndarray, optional): Ising矩阵. Defaults to None.
init_solution (np.ndarray, optional): 初始解向量. Defaults to None.
Returns:
np.ndarray: 多个进程合并去重之后的解向量.
"""
return
[文档]
class SimulatedAnnealingSampler(SimulatedAnnealingSamplerBase):
"""求解Ising模型的模拟退火采样器。每个解都独立求解
Args:
initial_temperature (float): 初始温度.
alpha (float): 降温系数.
cutoff_temperature (float): 截止温度.
iterations_per_t (int): 每个温度迭代深度.
size_limit (int): 输出解的个数,默认输出100个解
flag_evolution_history (bool): 是否输出哈密顿量演化历史,默认False,当值为True时,通过get_ha_history方法获取演化历史
verbose (bool): 是否在控制台输出计算进度,默认False
rand_seed (int, optional): numpy随机数生成器的随机种子
process_num (int, optional): 并行进程数 (-1为自动调用所有可用核心,1为单进程). Defaults to 1.
Examples:
>>> import numpy as np
>>> import kaiwu as kw
>>> matrix = -np.array([[ 0. , 1. , 0. , 1. , 1. ],
... [ 1. , 0. , 0. , 1., 1. ],
... [ 0. , 0. , 0. , 1., 1. ],
... [ 1. , 1., 1. , 0. , 1. ],
... [ 1. , 1., 1. , 1. , 0. ]])
>>> worker = kw.classical.SimulatedAnnealingOptimizer(initial_temperature=100,
... alpha=0.99,
... cutoff_temperature=0.001,
... iterations_per_t=10,
... size_limit=10)
>>> # This value is random and cannot be predicted
>>> worker.solve(matrix) # doctest: +SKIP
array([[-1, 1, -1, 1, 1],
[-1, 1, 1, -1, 1],
[-1, 1, 1, -1, -1],
[ 1, -1, -1, 1, 1],
[ 1, -1, 1, -1, 1],
[ 1, -1, 1, -1, -1],
[ 1, -1, -1, -1, 1],
[ 1, -1, 1, 1, -1],
[ 1, 1, 1, -1, -1],
[-1, -1, -1, 1, 1]])
"""
def __init__(
self,
initial_temperature=100,
alpha=0.99,
cutoff_temperature=0.001,
iterations_per_t=10,
size_limit=100,
flag_evolution_history=False,
verbose=False,
rand_seed=None,
process_num=1,
):
super().__init__() # Ising 矩阵.
self.initial_temperature = initial_temperature # 初始温度.
self.alpha = alpha # 降温系数.
self.cutoff_temperature = cutoff_temperature # 截止温度.
self.iterations_per_t = iterations_per_t # 每个温度迭代深度.
self.size_limit = size_limit # 输出解的个数的限制
self.verbose = verbose
self.ha_history = {}
self.flag_evolution_history = flag_evolution_history
self.solution_pool = None
if rand_seed is None:
rand_seed = np.random.randint(2**30)
self.rand_seed = rand_seed
self.process_num = (
os.cpu_count()
if process_num == -1 or process_num > os.cpu_count()
else process_num
)
def _dlt_h_single_flip(self, solution, idx):
"""TODO 一些向量运算"""
return 4 * solution[idx] * np.inner(solution, self.matrix[idx, :])
[文档]
def on_matrix_change(self):
"""更新矩阵相关信息"""
self.solution_pool = np.zeros(
(self.size_limit, self.matrix.shape[0]), dtype=int
)
[文档]
def get_ha_history(self):
"""获取哈密顿量演化历史
Returns:
dict: 哈密顿量随时间演化历史,key是时间,单位为秒,value是 hamilton
"""
return self.ha_history
[文档]
def single_process_solve(
self, ising_matrix=None, init_solution=None, rand_seed=None, size_limit=None
):
"""单进程求解Ising矩阵
Args:
ising_matrix (np.ndarray, optional): Ising矩阵. Defaults to None.
init_solution (np.ndarray, optional): 初始解向量. Defaults to None.
rand_seed (int, optional): numpy随机数生成器的随机种子
Returns:
np.ndarray: 解向量
"""
curr_rng = np.random.default_rng(rand_seed)
self.set_matrix(ising_matrix)
if size_limit is not None and size_limit != self.size_limit:
self.solution_pool = np.zeros((size_limit, self.matrix.shape[0]), dtype=int)
size_limit = size_limit if size_limit is not None else self.size_limit
for i in range(size_limit):
if init_solution is None:
init_solution = (
2 * (curr_rng.normal(loc=0.0, scale=1.0, size=len(self.matrix)) > 0)
- 1
)
temperature = self.initial_temperature
hamilton = -(init_solution.dot(self.matrix)).dot(init_solution)
best_solution = init_solution.copy()
best_hamilton = hamilton
# 哈密顿量演化历史
start_time = time.perf_counter()
if self.flag_evolution_history:
self.ha_history.update({0: hamilton})
# 预计迭代次数
iterations = int(
np.ceil(
np.log(self.cutoff_temperature / self.initial_temperature)
/ np.log(self.alpha)
)
)
# 预计完成时间
total_time = None
count = 0
while temperature > self.cutoff_temperature:
count += 1
flag = False # 用来标识该温度下是否有新值被接受
for _ in range(self.iterations_per_t):
dlt_h = self._dlt_h_single_flip(init_solution, randidx)
randidx = curr_rng.integers(len(init_solution))
if dlt_h < 0 or math.exp(-dlt_h / temperature) > curr_rng.random():
flag = True
init_solution[randidx] = -init_solution[randidx]
hamilton += dlt_h
if self.flag_evolution_history:
current_time = time.perf_counter()
self.ha_history[current_time - start_time] = hamilton
if hamilton < best_hamilton:
best_solution = init_solution.copy()
best_hamilton = hamilton
if flag:
init_solution = best_solution.copy()
hamilton = best_hamilton
temperature *= self.alpha
iteration_time = time.perf_counter() - start_time
if total_time is None:
total_time = iteration_time * iterations
logger.info(
"Expected execution completion time is %s seconds", total_time
)
if self.verbose:
# 输出当前进度和当前温度
logger.info(
"Current progress %s/%s; Current temperature: %s",
count,
iterations,
temperature,
)
self.solution_pool[i, :] = best_solution
return self.solution_pool * self.solution_pool[:, -1:]
def auto_temperature(ising_mat):
"""自动确定模拟退火的起始和结束温度
Args:
ising_mat (numpy.array): Ising矩阵
Returns:
tuple:
float: 起始温度
float: 截止温度
"""
sum_abs_bias_vec = np.sum(np.abs(ising_mat), axis=1)
max_abs_bias = np.max(sum_abs_bias_vec)
min_abs_bias_vec = np.min(np.abs(ising_mat[ising_mat != 0]))
min_abs_bias = np.min(min_abs_bias_vec)
number_min_gaps = np.sum(min_abs_bias_vec == min_abs_bias)
hot_temp = max_abs_bias / np.log(2)
max_single_qubit_excitation_rate = 0.01
cold_temp = min_abs_bias / np.log(
number_min_gaps / max_single_qubit_excitation_rate
)
return hot_temp, cold_temp
if __name__ == "__main__":
import doctest
doctest.testmod()