# -*- coding: utf-8 -*-
"""
模块: qubo
功能: QUBO模型系数动态范围,满足真机位宽限制
"""
import numpy as np
import portion as P
from kaiwu.core import KaiwuError
from kaiwu.preprocess._range_heuristic import Greedy, MaintainOrder, _MatrixOrder
from kaiwu.preprocess._bound import _compute_pre_opt_bounds
from kaiwu.preprocess._utils import _get_dynamic_range_metric
from kaiwu.license import track_data
HEURISTICS = {"greedy": Greedy, "order": MaintainOrder}
def _get_random_index_pair(matrix):
"""获取非零值的索引"""
row_indices, column_indices = np.where(np.invert(matrix == 0))
if row_indices.shape[0] == 0:
return 0, 1
random_index = np.random.randint(0, row_indices.shape[0])
i, j = row_indices[random_index], column_indices[random_index]
return i, j
def _check_to_next_increase(matrix_order, change, i, j):
"""在系数增大时,通过避免使最小差减小,排除会使得动态范围增加的修改"""
current_entry = matrix_order.matrix[i, j]
new_entry = current_entry + change
lower_index = np.searchsorted(matrix_order.unique, new_entry, side="right")
lower_entry = matrix_order.unique[lower_index - 1]
min_dis = matrix_order.min_distance
lower_interval = P.open(lower_entry - min_dis, lower_entry + min_dis)
if lower_index < matrix_order.unique.shape[0]:
upper_entry = matrix_order.unique[lower_index]
upper_interval = P.open(upper_entry - min_dis, upper_entry + min_dis)
forbidden_interval = lower_interval | upper_interval
else:
forbidden_interval = lower_interval
possible_interval = P.openclosed(-P.inf, new_entry)
difference = possible_interval.difference(forbidden_interval)
difference = difference | P.singleton(lower_entry) # pylint: disable=E1131
return difference.upper - current_entry
def _check_to_next_decrease(matrix_order, change, i, j):
"""在系数减小时,通过避免使最小差减小,排除会使得动态范围增加的修改"""
current_entry = matrix_order.matrix[i, j]
new_entry = current_entry + change
upper_index = np.searchsorted(matrix_order.unique, new_entry, side="left")
upper_entry = matrix_order.unique[upper_index]
min_dis = matrix_order.min_distance
upper_interval = P.open(upper_entry - min_dis, upper_entry + min_dis)
if upper_index - 1 >= 0:
lower_entry = matrix_order.unique[upper_index - 1]
lower_interval = P.open(lower_entry - min_dis, lower_entry + min_dis)
forbidden_interval = lower_interval | upper_interval
else:
forbidden_interval = upper_interval
possible_interval = P.openclosed(new_entry, P.inf)
difference = possible_interval.difference(forbidden_interval)
difference = difference | P.singleton(upper_entry) # pylint: disable=E1131
return difference.lower - current_entry
def _compute_index_change(matrix_order, i, j, heuristic=None, set_to_zero=True):
"""计算matrix[i,j]能够变化的量
set_to_zero: 是否在可以将元素设为0时优先设为0
"""
# Decide whether to increase or decrease
increase = heuristic.decide_increase(matrix_order, i, j)
# Bounds on changes based on reducing the dynamic range
dyn_range_change = heuristic.compute_change(matrix_order, i, j, increase)
# Bounds on changes based on preserving the optimum
if increase:
_, pre_opt_change = _compute_pre_opt_bounds(matrix_order.matrix, i, j)
change = min(pre_opt_change, dyn_range_change)
if change < 0:
change = 0
elif 0 > matrix_order.matrix[i, j] > -change and set_to_zero:
change = -matrix_order.matrix[i, j]
else:
change = _check_to_next_increase(matrix_order, change, i, j)
else:
pre_opt_change, _ = _compute_pre_opt_bounds(matrix_order.matrix, i, j)
change = max(pre_opt_change, dyn_range_change)
if change > 0:
change = 0
elif 0 < matrix_order.matrix[i, j] < -change and set_to_zero:
change = -matrix_order.matrix[i, j]
else:
change = _check_to_next_decrease(matrix_order, change, i, j)
return change
def _dynamic_range_change(i, j, change, matrix_order):
"""计算更新matrix[i,j]后动态范围的变化量"""
old_dynamic_range = matrix_order.dynamic_range
mat = matrix_order.matrix
mat[i, j] += change
new_dynamic_range = _get_dynamic_range_metric(mat)
mat[i, j] -= change # 恢复原值,感觉比copy好一点
dynamic_range_diff = old_dynamic_range - new_dynamic_range
return dynamic_range_diff
def _compute_change(matrix_order, heuristic=None, decision="heuristic"):
"""求矩阵可以调整元素及其可以调整的量"""
if decision == "random":
i, j = _get_random_index_pair(matrix_order.matrix)
change = _compute_index_change(matrix_order, i, j, heuristic=heuristic)
elif decision == "heuristic":
order_indices = matrix_order.dynamic_range_impact()
mat_size = matrix_order.matrix.shape[0]
indices = [(index // mat_size, index % mat_size) for index in order_indices]
changes = [
_compute_index_change(matrix_order, x[0], x[1], heuristic=heuristic)
for x in indices
]
drs = [
_dynamic_range_change(x[0], x[1], changes[index], matrix_order)
for index, x in enumerate(indices)
]
if np.any(drs):
index = np.argmax(drs)
i, j = indices[index]
change = changes[index]
else:
i, j = _get_random_index_pair(matrix_order.matrix)
change = _compute_index_change(matrix_order, i, j, heuristic=heuristic)
else:
raise NotImplementedError
return i, j, change