kaiwu.preprocess._precision_adaption_mutate 源代码

# -*- coding: utf-8 -*-
"""
模块: qubo

功能: QUBO模型系数动态范围,满足真机位宽限制
"""
import numpy as np
import portion as P
from kaiwu.core import KaiwuError
from kaiwu.preprocess._range_heuristic import Greedy, MaintainOrder, _MatrixOrder
from kaiwu.preprocess._bound import _compute_pre_opt_bounds
from kaiwu.preprocess._utils import _get_dynamic_range_metric
from kaiwu.license import track_data

HEURISTICS = {"greedy": Greedy, "order": MaintainOrder}


def _get_random_index_pair(matrix):
    """获取非零值的索引"""
    row_indices, column_indices = np.where(np.invert(matrix == 0))
    if row_indices.shape[0] == 0:
        return 0, 1
    random_index = np.random.randint(0, row_indices.shape[0])
    i, j = row_indices[random_index], column_indices[random_index]
    return i, j


def _check_to_next_increase(matrix_order, change, i, j):
    """在系数增大时,通过避免使最小差减小,排除会使得动态范围增加的修改"""
    current_entry = matrix_order.matrix[i, j]
    new_entry = current_entry + change
    lower_index = np.searchsorted(matrix_order.unique, new_entry, side="right")
    lower_entry = matrix_order.unique[lower_index - 1]
    min_dis = matrix_order.min_distance

    lower_interval = P.open(lower_entry - min_dis, lower_entry + min_dis)
    if lower_index < matrix_order.unique.shape[0]:
        upper_entry = matrix_order.unique[lower_index]
        upper_interval = P.open(upper_entry - min_dis, upper_entry + min_dis)
        forbidden_interval = lower_interval | upper_interval
    else:
        forbidden_interval = lower_interval
    possible_interval = P.openclosed(-P.inf, new_entry)
    difference = possible_interval.difference(forbidden_interval)
    difference = difference | P.singleton(lower_entry)  # pylint: disable=E1131
    return difference.upper - current_entry


def _check_to_next_decrease(matrix_order, change, i, j):
    """在系数减小时,通过避免使最小差减小,排除会使得动态范围增加的修改"""
    current_entry = matrix_order.matrix[i, j]
    new_entry = current_entry + change
    upper_index = np.searchsorted(matrix_order.unique, new_entry, side="left")
    upper_entry = matrix_order.unique[upper_index]
    min_dis = matrix_order.min_distance
    upper_interval = P.open(upper_entry - min_dis, upper_entry + min_dis)
    if upper_index - 1 >= 0:
        lower_entry = matrix_order.unique[upper_index - 1]
        lower_interval = P.open(lower_entry - min_dis, lower_entry + min_dis)
        forbidden_interval = lower_interval | upper_interval
    else:
        forbidden_interval = upper_interval
    possible_interval = P.openclosed(new_entry, P.inf)
    difference = possible_interval.difference(forbidden_interval)
    difference = difference | P.singleton(upper_entry)  # pylint: disable=E1131
    return difference.lower - current_entry


def _compute_index_change(matrix_order, i, j, heuristic=None, set_to_zero=True):
    """计算matrix[i,j]能够变化的量
    set_to_zero: 是否在可以将元素设为0时优先设为0
    """
    # Decide whether to increase or decrease
    increase = heuristic.decide_increase(matrix_order, i, j)
    # Bounds on changes based on reducing the dynamic range
    dyn_range_change = heuristic.compute_change(matrix_order, i, j, increase)
    # Bounds on changes based on preserving the optimum
    if increase:
        _, pre_opt_change = _compute_pre_opt_bounds(matrix_order.matrix, i, j)
        change = min(pre_opt_change, dyn_range_change)
        if change < 0:
            change = 0
        elif 0 > matrix_order.matrix[i, j] > -change and set_to_zero:
            change = -matrix_order.matrix[i, j]
        else:
            change = _check_to_next_increase(matrix_order, change, i, j)
    else:
        pre_opt_change, _ = _compute_pre_opt_bounds(matrix_order.matrix, i, j)
        change = max(pre_opt_change, dyn_range_change)
        if change > 0:
            change = 0
        elif 0 < matrix_order.matrix[i, j] < -change and set_to_zero:
            change = -matrix_order.matrix[i, j]
        else:
            change = _check_to_next_decrease(matrix_order, change, i, j)
    return change


def _dynamic_range_change(i, j, change, matrix_order):
    """计算更新matrix[i,j]后动态范围的变化量"""
    old_dynamic_range = matrix_order.dynamic_range
    mat = matrix_order.matrix
    mat[i, j] += change
    new_dynamic_range = _get_dynamic_range_metric(mat)
    mat[i, j] -= change  # 恢复原值,感觉比copy好一点
    dynamic_range_diff = old_dynamic_range - new_dynamic_range
    return dynamic_range_diff


def _compute_change(matrix_order, heuristic=None, decision="heuristic"):
    """求矩阵可以调整元素及其可以调整的量"""
    if decision == "random":
        i, j = _get_random_index_pair(matrix_order.matrix)
        change = _compute_index_change(matrix_order, i, j, heuristic=heuristic)
    elif decision == "heuristic":
        order_indices = matrix_order.dynamic_range_impact()
        mat_size = matrix_order.matrix.shape[0]
        indices = [(index // mat_size, index % mat_size) for index in order_indices]
        changes = [
            _compute_index_change(matrix_order, x[0], x[1], heuristic=heuristic)
            for x in indices
        ]
        drs = [
            _dynamic_range_change(x[0], x[1], changes[index], matrix_order)
            for index, x in enumerate(indices)
        ]
        if np.any(drs):
            index = np.argmax(drs)
            i, j = indices[index]
            change = changes[index]
        else:
            i, j = _get_random_index_pair(matrix_order.matrix)
            change = _compute_index_change(matrix_order, i, j, heuristic=heuristic)
    else:
        raise NotImplementedError
    return i, j, change


[文档] @track_data() def perform_precision_adaption_mutate( ising_matrix, iterations=100, heuristic="greedy", decision="heuristic" ): """迭代减小Ising矩阵的动态范围,每次改变一个系数,保持最优解不变。思路参考 `Mücke et al. (2023) <http://arxiv.org/abs/2307.02195>`_. 此方法会改变矩阵系数数值,但不保证一定能降低精度。可做探索性尝试 Args: ising_matrix (np.ndarray): Ising矩阵 iterations (int, optional): 迭代次数,默认为100 heuristic (str, optional): 确定系数变化量的启发式方法,包括'greedy'和'order'。默认为'greedy' decision (str, optional): 决定下一个修改位置的方法。包括'random'和'heuristic'。'heuristic'会优先选择直接影响动态范围的变量。 默认为'heuristic'。 Returns: np.ndarray: 压缩参数的Ising矩阵 Examples: >>> import numpy as np >>> mat0 = np.array([[0., -10., 0., 20., 0.55], ... [-10., 0., 6120., 0.5, 60.], ... [0., 6120., 0., 0., -5120.], ... [20., 0.5, 0., 0., 1.025], ... [0.55, 60., -5120., 1.025, 0.]]) >>> import kaiwu as kw >>> kw.preprocess.perform_precision_adaption_mutate(mat0) # doctest: +SKIP array([[ 0. , -2.05, 0. , 40. , 0. ], [-2.05, 0. , 4.1 , 0. , 0. ], [ 0. , 4.1 , 0. , 0. , -2.05], [40. , 0. , 0. , 0. , 2.05], [ 0. , 0. , -2.05, 2.05, 0. ]]) """ try: heuristic = HEURISTICS[heuristic] except KeyError as exc: raise ValueError(f'Unknown heuristic "{heuristic}"') from exc if (np.diag(ising_matrix) != 0).any(): raise KaiwuError("The diagonal of the passed Ising matrix must be zero.") ising_matrix = np.triu(ising_matrix) + np.tril(ising_matrix).T mat_copy = ising_matrix.copy() matrix_order = _MatrixOrder(mat_copy) stop_update = False matrix_order.matrix = np.round(matrix_order.matrix, decimals=8) for _ in range(iterations): if stop_update: break i, j, change = _compute_change( matrix_order, heuristic=heuristic, decision=decision ) stop_update = matrix_order.update_entry(i, j, change) reduced_matrix = matrix_order.matrix + matrix_order.matrix.T return reduced_matrix