kaiwu.preprocess._precision_reducer 源代码

"""
提供CIMOptimizer降低精度的装饰类
"""

import logging
import numpy as np
from kaiwu.core._base_solver import IsingSolver, QuboSolver
from kaiwu.core._error import KaiwuError
from kaiwu.preprocess._precision import (
    calculate_ising_matrix_bit_width,
    adjust_ising_matrix_precision,
)
from kaiwu.preprocess._precision_adaption_split import (
    perform_precision_adaption_split,
    restore_split_solution,
)

logger = logging.getLogger(__name__)


class PrecisionReducer(IsingSolver, QuboSolver):
    """Decorator solver that lowers matrix precision before delegating.

    Uses precision reduction and matrix splitting to turn a large-scale or
    high-precision Ising matrix into a form that can be solved on SPQC
    devices with limited bit precision. The matrix is truncated first and
    then split; if the split matrix still has more rows than
    ``target_bits``, the original matrix is truncated directly to
    ``precision`` instead and no split is used.

    Typical use cases:

    - Coefficient precision is too high (e.g. decimals such as 1.23 or
      large values such as 10000.0) and exceeds what the device can
      represent.
    - The matrix is too large and has to be split.

    Enable debug logging to see detailed output:
    kw.common.set_log_level("DEBUG")

    Args:
        component (IsingSolver): Base solver, e.g.
            SimulatedAnnealingOptimizer.
        precision (int): Target matrix precision (number of decimal
            places); controls the precision of the matrix after splitting.
        target_bits (int): Target number of bits for the matrix. Defaults
            to None, meaning no limit is enforced.
        only_feasible_solution (bool): Whether only feasible solutions
            should be returned. Defaults to False. When True and none of
            the solutions is feasible, a KaiwuError is raised.
        truncated_precision (int): Truncation precision. The larger its
            difference from ``precision``, the more variables the matrix
            split adds.

    Example1 (basic usage: reduce matrix precision):
        >>> import numpy as np
        >>> import kaiwu as kw
        >>> matrix = -np.array([[0., 1.23, 0., 1., 1.],
        ...                     [1.23, 0., 0., 1., 1.],
        ...                     [0., 0., 0., 1., 1.],
        ...                     [1., 1., 1., 0., 1.],
        ...                     [1., 1., 1., 1., 0.]])
        >>> optimizer = kw.classical.SimulatedAnnealingOptimizer(
        ...     initial_temperature=100, alpha=0.99,
        ...     cutoff_temperature=0.001, iterations_per_t=10
        ... )
        >>> new_optimizer = kw.preprocess.PrecisionReducer(
        ...     optimizer, precision=4
        ... )
        >>> solutions = new_optimizer.solve(matrix)

    Example2 (high-precision / large-coefficient matrix):
        >>> matrix2 = -np.array([[0., 1.23, 0., 1., 10000.],
        ...                      [1.23, 0., 0., 1., 1.],
        ...                      [0., 0., 0., 1., 1.],
        ...                      [1., 1., 1., 0., 1.],
        ...                      [10000., 1., 1., 1., 0.]])
        >>> new_optimizer2 = kw.preprocess.PrecisionReducer(
        ...     optimizer, precision=4, only_feasible_solution=False
        ... )
        >>> solutions = new_optimizer2.solve(matrix2)

    Example3 (limit the target number of bits):
        >>> new_optimizer3 = kw.preprocess.PrecisionReducer(
        ...     optimizer, precision=4, target_bits=10
        ... )
    """

    def __init__(
        self,
        component: IsingSolver,
        precision=8,
        target_bits=None,
        only_feasible_solution=False,
        truncated_precision=20,
    ):
        super().__init__()
        self._component = component
        self.precision = precision
        self.target_bits = target_bits
        self.only_feasible_solution = only_feasible_solution
        self.truncated_precision = truncated_precision

    def _solve(self, ising_matrix=None):
        """Reduce the matrix precision, solve it, and map solutions back.

        Args:
            ising_matrix: Matrix to solve. When None, ``self.matrix`` is
                used instead.

        Returns:
            Whatever the wrapped component returned when that is None or
            when the direct-truncation fallback was used; otherwise the
            restored solutions stacked with ``np.vstack``.

        Raises:
            ValueError: If no matrix is available, or if ``target_bits``
                is set and the matrix already has at least that many rows.
            KaiwuError: If ``only_feasible_solution`` is set and no
                solution could be restored.
        """
        matrix = self.matrix if ising_matrix is None else ising_matrix
        if matrix is None:
            raise ValueError("ising_matrix is required")

        if self.target_bits is not None and matrix.shape[0] >= self.target_bits:
            raise ValueError(
                "The original matrix size cannot be larger than the matrix "
                "size after reducing the precision"
            )

        initial_width = calculate_ising_matrix_bit_width(
            matrix, self.truncated_precision
        )
        logger.debug(
            "Matrix size: %s, Matrix initial precision: %s",
            matrix.shape,
            initial_width.get("precision"),
        )

        # Scale by the reported multiplier unless it is infinite, in which
        # case fall back to truncating at ``truncated_precision``.
        multiplier = initial_width.get("multiplier")
        if multiplier == float("inf"):
            scaled = adjust_ising_matrix_precision(matrix, self.truncated_precision)
        else:
            scaled = matrix * multiplier
        integer_matrix = np.round(scaled).astype(int)

        reduced, last_idx, needs_restore = self._split_to_target(
            matrix, integer_matrix
        )

        reduced_width = calculate_ising_matrix_bit_width(
            reduced, self.truncated_precision
        )
        logger.debug(
            "Matrix size after reducing precision: %s, precision: %s",
            reduced.shape[0],
            reduced_width.get("precision"),
        )

        solutions = self._component.solve(reduced)
        # Nothing to map back when there is no result or no split was applied.
        if solutions is None or not needs_restore:
            return solutions
        return self._restore_all(solutions, last_idx)

    def _split_to_target(self, ising_matrix, integer_matrix):
        """Split the integer matrix, falling back to truncation if too big.

        Returns:
            A ``(matrix, last_idx, needs_restore)`` tuple. ``needs_restore``
            is False only when the split was abandoned in favour of direct
            truncation to ``self.precision``.
        """
        reduced, last_idx = perform_precision_adaption_split(
            integer_matrix, self.precision
        )

        over_budget = (
            self.target_bits is not None and reduced.shape[0] > self.target_bits
        )
        if not over_budget:
            return reduced, last_idx, True

        # Retry at an intermediate precision only when the gap is wide
        # enough; a small gap goes straight to the truncation check below.
        if self.truncated_precision - self.precision > 4:
            mid_precision = (self.precision + self.truncated_precision) // 2
            coarser = adjust_ising_matrix_precision(ising_matrix, mid_precision)
            coarser = np.round(coarser).astype(int)
            reduced, last_idx = perform_precision_adaption_split(
                coarser, self.precision
            )

        # Still above the target bit count: truncate directly, no split.
        if reduced.shape[0] > self.target_bits:
            truncated = adjust_ising_matrix_precision(ising_matrix, self.precision)
            return truncated, last_idx, False
        return reduced, last_idx, True

    def _restore_all(self, solutions, last_idx):
        """Map each split-matrix solution back and stack the results."""
        restored = []
        for raw in solutions:
            try:
                candidate = restore_split_solution(raw, last_idx)
            except KaiwuError:
                if self.only_feasible_solution:
                    continue
                # Restoration failed: index the raw solution with
                # ``last_idx`` instead of dropping it.
                candidate = raw[last_idx]
            restored.append(candidate)

        if self.only_feasible_solution and not restored:
            raise KaiwuError("No feasible solution found")
        return np.vstack(restored)