kaiwu.preprocess._precision_adaption_split 源代码

# -*- coding: utf-8 -*-
"""
模块: qubo

功能: QUBO模型系数动态范围,满足真机位宽限制
"""
import numpy as np
from kaiwu.core import KaiwuError
from kaiwu.license import track_data


def get_split_num(ising_matrix, var_num):
    """
    计算每个变量需要被拆分为多少个
    """
    num_split = np.ones(var_num, dtype=int)
    # 未拆分记录原值,已经拆分记录拆分后的值
    mat = ising_matrix.copy()
    # mat_cmp: 行列都未拆分的记录开根号后的数值,行或列已经拆分的记录拆分后的值
    # 这个矩阵的最大值就是最大的num_split
    mat_cmp = np.sqrt(ising_matrix.copy())

    while True:
        max_idx = np.argmax(mat_cmp)
        max_pos_x, max_pos_y = max_idx // var_num, max_idx % var_num
        max_val = mat[max_pos_x, max_pos_y]
        if max_val <= 1:
            break

        # 若某个变量之前已经分拆过,就不再分拆
        if num_split[max_pos_x] == 1 and num_split[max_pos_y] == 1:
            num_split_x = int(np.ceil(np.sqrt(max_val)))
        else:
            num_split_x = int(np.ceil(max_val))

        # 将未分拆的变量分拆
        if num_split[max_pos_x] == 1:
            mat[max_pos_x, :] /= num_split_x
            mat[:, max_pos_x] /= num_split_x
            mat_cmp[max_pos_x, :] = mat[max_pos_x, :]
            mat_cmp[:, max_pos_x] = mat[:, max_pos_x]
            num_split[max_pos_x] = num_split_x

        if num_split[max_pos_y] == 1:
            mat[max_pos_y, :] /= num_split_x
            mat[:, max_pos_y] /= num_split_x
            mat_cmp[max_pos_y, :] = mat[max_pos_y, :]
            mat_cmp[:, max_pos_y] = mat[:, max_pos_y]
            num_split[max_pos_y] = num_split_x
    return num_split


def _get_int_ising_matrix(ising_ret, ising_mat, minimum, num_split, tail):
    """使所有系数都是minimum的整数倍,减小误差

    方法:将同一个项分拆出的系数向上取整,并将累计的误差在一个分拆项补偿回来
    举例:当精度取0.5 [3.9 3.9 3.9 3.9] 变为 [4 4 4 4], 在将累计的误差在第一项减掉
     变为[4-0.4 4 4 4]=[3.6 4 4 4] 最终结果为[3.5 4 4 4]
    """

    abs_ising_ret = np.abs(ising_ret) / minimum
    abs_ising_mat = np.abs(ising_mat) / minimum

    factor = np.expand_dims(num_split, axis=0)
    factor = factor.T.dot(factor)

    delta = np.ceil(abs_ising_mat / factor - 1e-8).astype(int) * factor - abs_ising_mat
    ceil_abs_ising_ret = np.ceil(abs_ising_ret - 1e-8)
    var_num = len(num_split)
    tail = np.concatenate([[0], tail + 1])
    for i in range(var_num):
        for j in range(i, var_num):
            finish = False
            for tl_i in range(tail[i], tail[i + 1]):
                for tl_j in range(tail[j], tail[j + 1]):
                    if delta[i][j] <= ceil_abs_ising_ret[tl_i, tl_j]:
                        ceil_abs_ising_ret[tl_i, tl_j] -= delta[i][j]
                        ceil_abs_ising_ret[tl_j, tl_i] -= delta[i][j]
                        finish = True
                        break
                    delta[i][j] -= ceil_abs_ising_ret[tl_i, tl_j]
                    ceil_abs_ising_ret[tl_i, tl_j] = ceil_abs_ising_ret[tl_j, tl_i] = 0
                if finish:
                    break
    ising_ret = ceil_abs_ising_ret * (((ising_ret) > 0).astype(int) * 2 - 1)
    return ising_ret


[文档] @track_data() def perform_precision_adaption_split( ising_matrix: np.ndarray, param_bit=8, min_increment=None, penalty=None, round_to_increment=True, ): """将变量拆分, 使得QUBO表达式的系数范围缩小, 能够使用要求的比特数表达 Args: ising_matrix (np.ndarray): Ising矩阵 param_bit (int): Ising矩阵元素可以使用的比特数,表示矩阵的参数精度。默认为8 min_increment (float): 矩阵元素的最小变化量,表示转化后矩阵的分辨率。默认值取矩阵元素间差值的最小正值 penalty (float): 惩罚项系数, 默认为min_increment * (2^(param_bit-1) - 1) round_to_increment (bool): 将矩阵的所有元素转化为min_increment的整数倍,使得其可以用不超过param_bit的比特数表达 Returns: tuple: 返回元组,包含新矩阵和变量索引 - np.ndarray: 包含缩小范围的新矩阵。新矩阵单个元素并不一定在精度范围内,但是整体除以min_increment后会在精度范围内 - np.ndarray: 变量分拆后该变量第一次出现的位置 Examples: >>> import numpy as np >>> import kaiwu as kw >>> mat = np.array([[0, 18, -12], ... [18, 0, 1], ... [-12, 1, 0]]) >>> kw.preprocess.perform_precision_adaption_split(mat, param_bit=5, min_increment=1, penalty=4, ... round_to_increment=True) (array([[ 0., 4., 3., 5., -6.], [ 4., 0., 5., 5., -6.], [ 3., 5., 0., 4., 0.], [ 5., 5., 4., 0., 1.], [-6., -6., 0., 1., 0.]]), array([1, 3, 4])) """ thresh = 2 ** (param_bit - 1) - 1 ising_matrix = (ising_matrix + ising_matrix.T) / 2 abs_ising_mat = np.abs(ising_matrix) var_num = ising_matrix.shape[0] if min_increment is None: params = np.unique(ising_matrix) min_increment = np.diff(params).min() if penalty is None: penalty = thresh * min_increment ising_norm = abs_ising_mat / thresh / min_increment num_split = get_split_num(ising_norm, var_num) sum_split = np.sum(num_split) last_var_idx = np.zeros(var_num, dtype=int) indices_split = np.zeros(sum_split, dtype=int) last_var_idx[0] = num_split[0] - 1 for i in range(1, var_num): last_var_idx[i] = last_var_idx[i - 1] + num_split[i] indices_split[last_var_idx[i - 1] + 1 : last_var_idx[i] + 1] = i # prepare coefficient to reduce the ising matrix item factor_ori = 1 / num_split factor_split = factor_ori[indices_split] factor_split = np.expand_dims(factor_split, axis=0) factor_split = factor_split.T.dot(factor_split) # prepare penalty for constraint of splited variable np.fill_diagonal(ising_matrix, penalty * num_split * num_split) # fill ising matrix and reduce it ising_split = ising_matrix[indices_split, :] ising_split = ising_split[:, indices_split] ising_ret = ising_split * factor_split np.fill_diagonal(ising_ret, 0) ising_ret = _get_int_ising_matrix( ising_ret, ising_matrix, min_increment, num_split, last_var_idx ) if round_to_increment: ising_ret = np.round(ising_ret) ising_ret *= min_increment return ising_ret, last_var_idx
[文档] @track_data() def restore_split_solution(solution, last_var_idx): """将缩小范围多项式的解转换回原来的表达式的解 Args: solution(np.ndarray): 求得的解 last_var_idx(np.ndarray): 分拆后每个变量最后一次出现的位置 Returns: np.ndarray: 原多项式的解 Examples: >>> import kaiwu as kw >>> import numpy as np >>> mat = np.array([[0, -15, 0, 30], ... [-15, 0, 0, 2], ... [0, 0, 0, 0], ... [30, 2, 0, 0]]) >>> r, f = kw.preprocess.perform_precision_adaption_split(mat, 5, min_increment=0.5, round_to_increment=True) >>> worker = kw.classical.SimulatedAnnealingOptimizer() >>> opt = worker.solve(r) >>> sol = opt[0] * opt[0, -1] >>> kw.preprocess.restore_split_solution(sol, f) # doctest: +SKIP array([ 1, -1, -1, 1], dtype=int8) """ var_num = len(last_var_idx) for i in range(var_num - 1): for j in range(last_var_idx[i] + 1, last_var_idx[i + 1]): if solution[j] != solution[j + 1]: raise KaiwuError( "constraint for expression narrowing is not satisfied. " f"The {i + 2}th variable is in conflict" ) for j in range(0, last_var_idx[0]): if solution[j] != solution[j + 1]: raise KaiwuError( "constraint for expression narrowing is not satisfied. " "The 1st variable is in conflict" ) return solution[last_var_idx]
[文档] @track_data() def construct_split_solution(solution, last_var_idx): """将原矩阵分拆变量降低参数精度后,根据原矩阵的解构造新矩阵的解 Args: solution (np.ndarray): 原矩阵的解 last_var_idx (np.ndarray): 分拆后每个变量最后一次出现的位置 Returns: np.ndarray: 新矩阵的解 Examples: >>> import numpy as np >>> import kaiwu as kw >>> mat = np.array([[0, -15, 0, 40], ... [-15, 0, 0, 2], ... [0, 0, 0, 0], ... [40, 2, 0, 0]]) >>> nmat, tail = kw.preprocess.perform_precision_adaption_split(mat, 5, min_increment=0.5, ... round_to_increment=True, ... penalty=0) >>> sol = np.array([1, 1, -1, -1]) >>> ans = np.array([1, 1, 1, 1, -1, -1, -1, -1]) >>> kw.preprocess.construct_split_solution(sol, tail) array([ 1., 1., 1., 1., -1., -1., -1., -1.]) """ if solution.shape != last_var_idx.shape: raise KaiwuError("unmatched shape for solution and last_var_idx") split_solution = np.zeros(last_var_idx[-1] + 1) var_num = len(last_var_idx) for i in range(1, var_num): for j in range(last_var_idx[i - 1] + 1, last_var_idx[i] + 1): split_solution[j] = solution[i] for j in range(0, last_var_idx[0] + 1): split_solution[j] = solution[0] return split_solution
if __name__ == "__main__": import doctest doctest.testmod()