kaiwu.classical._simulated_annealing 源代码

# -*- coding: utf-8 -*-
"""
提供模拟退火求解器
"""

import os
import math
import time
import logging
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from kaiwu.license import verify_license
from kaiwu.core import IsingSolver, QuboSolver
from kaiwu.common import JsonSerializableMixin
from kaiwu.common import HeapUniquePool
from kaiwu.license import track_data

logger = logging.getLogger(__name__)


[文档] class SimulatedAnnealingOptimizer(IsingSolver, QuboSolver, JsonSerializableMixin): """求解Ising模型的模拟退火求解器(输出结果具有随机性). Args: initial_temperature (float): 初始温度. alpha (float): 降温系数. cutoff_temperature (float): 截止温度. iterations_per_t (int): 每个温度迭代深度. size_limit (int): 输出解的个数,默认输出100个解 flag_evolution_history (bool): 是否输出哈密顿量演化历史,默认False,当值为True时,通过get_ha_history方法获取演化历史 verbose (bool): 是否在控制台输出计算进度,默认False rand_seed (int, optional): numpy随机数生成器的随机种子 process_num (int, optional): 并行进程数 (-1为自动调用所有可用核心,1为单进程). Defaults to 1. Examples: >>> import numpy as np >>> import kaiwu as kw >>> matrix = -np.array([[ 0. , 1. , 0. , 1. , 1. ], ... [ 1. , 0. , 0. , 1., 1. ], ... [ 0. , 0. , 0. , 1., 1. ], ... [ 1. , 1., 1. , 0. , 1. ], ... [ 1. , 1., 1. , 1. , 0. ]]) >>> worker = kw.classical.SimulatedAnnealingOptimizer(initial_temperature=100, ... alpha=0.99, ... cutoff_temperature=0.001, ... iterations_per_t=10, ... size_limit=10) >>> # This value is random and cannot be predicted >>> worker.solve(matrix) # doctest: +SKIP array([[-1, 1, -1, 1, 1], [-1, 1, 1, -1, 1], [-1, 1, 1, -1, -1], [ 1, -1, -1, 1, 1], [ 1, -1, 1, -1, 1], [ 1, -1, 1, -1, -1], [ 1, -1, -1, -1, 1], [ 1, -1, 1, 1, -1], [ 1, 1, 1, -1, -1], [-1, -1, -1, 1, 1]]) """ def __init__( self, initial_temperature=100, alpha=0.99, cutoff_temperature=0.001, iterations_per_t=10, size_limit=100, flag_evolution_history=False, verbose=False, rand_seed=None, process_num=1, ): super().__init__() # Ising 矩阵. self._initial_temperature = initial_temperature # 初始温度. self._alpha = alpha # 降温系数. self._cutoff_temperature = cutoff_temperature # 截止温度. self._iterations_per_t = iterations_per_t # 每个温度迭代深度. self._size_limit = size_limit # 输出解的个数的限制 self._verbose = verbose self._ha_history = {} self._flag_evolution_history = flag_evolution_history self._solution_pool = None if rand_seed is None: rand_seed = np.random.randint(2**30) self._rand_seed = rand_seed logger.debug("The selected rand_seed=%s", self._rand_seed) self._process_num = ( os.cpu_count() if process_num == -1 or process_num > os.cpu_count() else process_num ) def _dlt_h_single_flip(self, solution, idx): """TODO 一些向量运算""" return 4 * solution[idx] * np.inner(solution, self.matrix[idx, :])
[文档] def on_matrix_change(self): """更新矩阵相关信息""" self._solution_pool = HeapUniquePool( self.matrix, self.matrix.shape[0], self._size_limit )
[文档] def get_ha_history(self): """获取哈密顿量演化历史 Returns: dict: 哈密顿量随时间演化历史,key是时间,单位为秒,value是 hamilton """ return self._ha_history
[文档] @track_data(alg_name="sa", enabled=True) def single_process_solve( self, ising_matrix=None, init_solution=None, rand_seed=None ): """单进程求解Ising矩阵 Args: ising_matrix (np.ndarray, optional): Ising矩阵. Defaults to None. init_solution (np.ndarray, optional): 初始解向量. Defaults to None. rand_seed (int, optional): numpy随机数生成器的随机种子 Returns: np.ndarray: 解向量 """ curr_rng = np.random.default_rng(rand_seed) self.set_matrix(ising_matrix) verify_license(self.matrix.shape[0]) if init_solution is None: init_solution = ( 2 * (curr_rng.normal(loc=0.0, scale=1.0, size=len(self.matrix)) > 0) - 1 ) temperature = self._initial_temperature hamilton = -(init_solution.dot(self.matrix)).dot(init_solution) best_solution = init_solution.copy() best_hamilton = hamilton # 哈密顿量演化历史 start_time = time.perf_counter() if self._flag_evolution_history: self._ha_history.update({0: hamilton}) # 预计迭代次数 iterations = int( np.ceil( np.log(self._cutoff_temperature / self._initial_temperature) / np.log(self._alpha) ) ) # 预计完成时间 total_time = None count = 0 while temperature > self._cutoff_temperature: count += 1 flag = False # 用来标识该温度下是否有新值被接受 for _ in range(self._iterations_per_t): randidx = curr_rng.integers(len(init_solution)) dlt_h = self._dlt_h_single_flip(init_solution, randidx) if dlt_h < 0 or math.exp(-dlt_h / temperature) > curr_rng.random(): flag = True init_solution[randidx] = -init_solution[randidx] hamilton += dlt_h if self._flag_evolution_history: current_time = time.perf_counter() self._ha_history[current_time - start_time] = hamilton if hamilton < best_hamilton: best_solution = init_solution.copy() best_hamilton = hamilton self._solution_pool.push(init_solution, hamilton) if flag: init_solution = best_solution.copy() hamilton = best_hamilton temperature *= self._alpha iteration_time = time.perf_counter() - start_time if total_time is None: total_time = iteration_time * iterations logger.debug( "Expected execution completion time is %s seconds", total_time ) if self._verbose: # 输出当前进度和当前温度 logger.debug( "Current progress %s/%s; Current temperature: %s", count, iterations, temperature, ) solutions = self._solution_pool.get_solutions() return solutions
def _solve(self, ising_matrix=None, init_solution=None): """SA求解Ising矩阵solve接口 Args: ising_matrix (np.ndarray, optional): Ising矩阵. Defaults to None. init_solution (np.ndarray, optional): 初始解向量. Defaults to None. Returns: np.ndarray: 多个进程合并去重之后的解向量. """ if self._process_num == 1: # 单进程SA求解,class random seed就是solve random seed solutions = self.single_process_solve( ising_matrix, init_solution, self._rand_seed ) else: # 多进程SA求解,class random seed用于给每个进程的随机数生成器设定random seed np.random.seed(self._rand_seed) seeds_list = np.random.randint(low=2**30, size=self._process_num) pool = ProcessPoolExecutor(max_workers=self._process_num) # 创建多个SA进程求解 args = [ (ising_matrix, init_solution, seeds_list[_]) for _ in range(self._process_num) ] solutions = list(pool.map(self.single_process_solve, *zip(*args))) # 合并SA的解并去重 solutions = np.concatenate(solutions, axis=0) solutions = np.unique(solutions, axis=0) return solutions
if __name__ == "__main__": import doctest doctest.testmod()