kaiwu.sampler._simulated_annealing 源代码

# -*- coding: utf-8 -*-
"""
提供模拟退火求解器
"""

import os
import math
import time
import logging
from abc import ABCMeta, abstractmethod
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from kaiwu.core import IsingSolver, QuboSolver

logger = logging.getLogger(__name__)


def split_near_uniform(m: int, n: int) -> list[int]:
    """
    将整数 m 拆成 n 份,每份为两个相邻整数中的一个(最接近平均值),
    且拆分后的 n 个数之和仍为 m。

    返回长度为 n 的列表,包含 r 个 (q+1) 和 n-r 个 q,其中 q, r = divmod(m, n).
    """
    q, r = divmod(m, n)
    # r 个 (q+1),n-r 个 q
    return [q + 1] * r + [q] * (n - r)


class SimulatedAnnealingSamplerBase(IsingSolver, QuboSolver, metaclass=ABCMeta):
    """求解Ising模型的模拟退火求解器(输出结果具有随机性).

    Args:
        initial_temperature (float): 初始温度.

        alpha (float): 降温系数.

        cutoff_temperature (float): 截止温度.

        iterations_per_t (int): 每个温度迭代深度.

        size_limit (int): 输出解的个数,默认输出100个解

        rand_seed (int, optional): numpy随机数生成器的随机种子

        process_num (int, optional): 并行进程数 (-1为自动调用所有可用核心,1为单进程). Defaults to 1.
    """

    def __init__(
        self,
        initial_temperature=None,
        alpha=0.99,
        cutoff_temperature=None,
        iterations_per_t=10,
        size_limit=1,
        rand_seed=None,
        process_num=1,
    ):
        super().__init__()
        self.initial_temperature = initial_temperature  # 初始温度.
        self.alpha = alpha  # 降温系数.
        self.cutoff_temperature = cutoff_temperature  # 截止温度.
        self.iterations_per_t = iterations_per_t
        self.size_limit = size_limit
        if rand_seed is None:
            self.rand_seed = np.random.randint(2**31)
        else:
            self.rand_seed = rand_seed
        self.alpha_schedule = None
        self.process_num = (
            os.cpu_count()
            if process_num == -1 or process_num > os.cpu_count()
            else process_num
        )

    def _solve(self, ising_matrix=None, init_solution=None, size_limit=None):
        """SA求解Ising矩阵solve接口

        Args:
            ising_matrix (np.ndarray, optional): Ising矩阵. Defaults to None.

            init_solution (np.ndarray, optional): 初始解向量. Defaults to None.

        Returns:
            np.ndarray: 多个进程合并去重之后的解向量.
        """
        size_limit = size_limit if size_limit is not None else self.size_limit
        if self.process_num == 1:
            # 单进程SA求解,class random seed就是solve random seed
            solutions = self.single_process_solve(
                ising_matrix, init_solution, self.rand_seed
            )
        else:
            # 多进程SA求解,class random seed用于给每个进程的随机数生成器设定random seed
            np.random.seed(self.rand_seed)
            seeds_list = np.random.randint(low=2**30, size=self.process_num)
            pool = ProcessPoolExecutor(max_workers=self.process_num)
            # 创建多个SA进程求解
            size_limits = split_near_uniform(size_limit, self.process_num)
            args = [
                (ising_matrix, init_solution, seeds_list[i], size_limits[i])
                for i in range(self.process_num)
            ]
            solutions = list(pool.map(self.single_process_solve, *zip(*args)))

            # 合并SA的解并去重
            solutions = np.concatenate(solutions, axis=0)
        return solutions

    @abstractmethod
    def single_process_solve(
        self, ising_matrix, init_solution, rand_seed, size_limit=None
    ):
        """SA求解Ising矩阵solve接口

        Args:
            ising_matrix (np.ndarray, optional): Ising矩阵. Defaults to None.

            init_solution (np.ndarray, optional): 初始解向量. Defaults to None.

        Returns:
            np.ndarray: 多个进程合并去重之后的解向量.
        """
        return


[文档] class SimulatedAnnealingSampler(SimulatedAnnealingSamplerBase): """求解Ising模型的模拟退火采样器。每个解都独立求解 Args: initial_temperature (float): 初始温度. alpha (float): 降温系数. cutoff_temperature (float): 截止温度. iterations_per_t (int): 每个温度迭代深度. size_limit (int): 输出解的个数,默认输出100个解 flag_evolution_history (bool): 是否输出哈密顿量演化历史,默认False,当值为True时,通过get_ha_history方法获取演化历史 verbose (bool): 是否在控制台输出计算进度,默认False rand_seed (int, optional): numpy随机数生成器的随机种子 process_num (int, optional): 并行进程数 (-1为自动调用所有可用核心,1为单进程). Defaults to 1. Examples: >>> import numpy as np >>> import kaiwu as kw >>> matrix = -np.array([[ 0. , 1. , 0. , 1. , 1. ], ... [ 1. , 0. , 0. , 1., 1. ], ... [ 0. , 0. , 0. , 1., 1. ], ... [ 1. , 1., 1. , 0. , 1. ], ... [ 1. , 1., 1. , 1. , 0. ]]) >>> worker = kw.classical.SimulatedAnnealingOptimizer(initial_temperature=100, ... alpha=0.99, ... cutoff_temperature=0.001, ... iterations_per_t=10, ... size_limit=10) >>> # This value is random and cannot be predicted >>> worker.solve(matrix) # doctest: +SKIP array([[-1, 1, -1, 1, 1], [-1, 1, 1, -1, 1], [-1, 1, 1, -1, -1], [ 1, -1, -1, 1, 1], [ 1, -1, 1, -1, 1], [ 1, -1, 1, -1, -1], [ 1, -1, -1, -1, 1], [ 1, -1, 1, 1, -1], [ 1, 1, 1, -1, -1], [-1, -1, -1, 1, 1]]) """ def __init__( self, initial_temperature=100, alpha=0.99, cutoff_temperature=0.001, iterations_per_t=10, size_limit=100, flag_evolution_history=False, verbose=False, rand_seed=None, process_num=1, ): super().__init__() # Ising 矩阵. self.initial_temperature = initial_temperature # 初始温度. self.alpha = alpha # 降温系数. self.cutoff_temperature = cutoff_temperature # 截止温度. self.iterations_per_t = iterations_per_t # 每个温度迭代深度. self.size_limit = size_limit # 输出解的个数的限制 self.verbose = verbose self.ha_history = {} self.flag_evolution_history = flag_evolution_history self.solution_pool = None if rand_seed is None: rand_seed = np.random.randint(2**30) self.rand_seed = rand_seed self.process_num = ( os.cpu_count() if process_num == -1 or process_num > os.cpu_count() else process_num ) def _dlt_h_single_flip(self, solution, idx): """TODO 一些向量运算""" return 4 * solution[idx] * np.inner(solution, self.matrix[idx, :])
[文档] def on_matrix_change(self): """更新矩阵相关信息""" self.solution_pool = np.zeros( (self.size_limit, self.matrix.shape[0]), dtype=int )
[文档] def get_ha_history(self): """获取哈密顿量演化历史 Returns: dict: 哈密顿量随时间演化历史,key是时间,单位为秒,value是 hamilton """ return self.ha_history
[文档] def single_process_solve( self, ising_matrix=None, init_solution=None, rand_seed=None, size_limit=None ): """单进程求解Ising矩阵 Args: ising_matrix (np.ndarray, optional): Ising矩阵. Defaults to None. init_solution (np.ndarray, optional): 初始解向量. Defaults to None. rand_seed (int, optional): numpy随机数生成器的随机种子 Returns: np.ndarray: 解向量 """ curr_rng = np.random.default_rng(rand_seed) self.set_matrix(ising_matrix) if size_limit is not None and size_limit != self.size_limit: self.solution_pool = np.zeros((size_limit, self.matrix.shape[0]), dtype=int) size_limit = size_limit if size_limit is not None else self.size_limit for i in range(size_limit): if init_solution is None: init_solution = ( 2 * (curr_rng.normal(loc=0.0, scale=1.0, size=len(self.matrix)) > 0) - 1 ) temperature = self.initial_temperature hamilton = -(init_solution.dot(self.matrix)).dot(init_solution) best_solution = init_solution.copy() best_hamilton = hamilton # 哈密顿量演化历史 start_time = time.perf_counter() if self.flag_evolution_history: self.ha_history.update({0: hamilton}) # 预计迭代次数 iterations = int( np.ceil( np.log(self.cutoff_temperature / self.initial_temperature) / np.log(self.alpha) ) ) # 预计完成时间 total_time = None count = 0 while temperature > self.cutoff_temperature: count += 1 flag = False # 用来标识该温度下是否有新值被接受 for _ in range(self.iterations_per_t): dlt_h = self._dlt_h_single_flip(init_solution, randidx) randidx = curr_rng.integers(len(init_solution)) if dlt_h < 0 or math.exp(-dlt_h / temperature) > curr_rng.random(): flag = True init_solution[randidx] = -init_solution[randidx] hamilton += dlt_h if self.flag_evolution_history: current_time = time.perf_counter() self.ha_history[current_time - start_time] = hamilton if hamilton < best_hamilton: best_solution = init_solution.copy() best_hamilton = hamilton if flag: init_solution = best_solution.copy() hamilton = best_hamilton temperature *= self.alpha iteration_time = time.perf_counter() - start_time if total_time is None: total_time = iteration_time * iterations logger.info( "Expected execution completion time is %s seconds", total_time ) if self.verbose: # 输出当前进度和当前温度 logger.info( "Current progress %s/%s; Current temperature: %s", count, iterations, temperature, ) self.solution_pool[i, :] = best_solution return self.solution_pool * self.solution_pool[:, -1:]
def auto_temperature(ising_mat): """自动确定模拟退火的起始和结束温度 Args: ising_mat (numpy.array): Ising矩阵 Returns: tuple: float: 起始温度 float: 截止温度 """ sum_abs_bias_vec = np.sum(np.abs(ising_mat), axis=1) max_abs_bias = np.max(sum_abs_bias_vec) min_abs_bias_vec = np.min(np.abs(ising_mat[ising_mat != 0])) min_abs_bias = np.min(min_abs_bias_vec) number_min_gaps = np.sum(min_abs_bias_vec == min_abs_bias) hot_temp = max_abs_bias / np.log(2) max_single_qubit_excitation_rate = 0.01 cold_temp = min_abs_bias / np.log( number_min_gaps / max_single_qubit_excitation_rate ) return hot_temp, cold_temp if __name__ == "__main__": import doctest doctest.testmod()