"""
模块: HOBO Model
功能: 方便用户定义高阶二值模型,并转化成二次二值模型
"""
import logging
import numbers
from collections import defaultdict
from kaiwu.core._binary_model import BinaryModel
from kaiwu.core._binary_expression import BinaryExpression, quicksum
from kaiwu.core._constraint import Constraint
from kaiwu.core._penalty_method_constraint import PenaltyMethodConstraint
from kaiwu.core import QuboModel
logger = logging.getLogger(__name__)
def _get_expr_tuple_sorted(binary_expression):
expr_tuple_sorted = BinaryExpression({}, binary_expression.offset)
for var_tuple, value in binary_expression.coefficient.items():
new_var_tuple = tuple(sorted(list(set(var_tuple))))
if new_var_tuple in expr_tuple_sorted.coefficient:
expr_tuple_sorted.coefficient[new_var_tuple] += value
else:
expr_tuple_sorted.coefficient[new_var_tuple] = value
if (
isinstance(expr_tuple_sorted.coefficient[new_var_tuple], numbers.Number)
and expr_tuple_sorted.coefficient[new_var_tuple] == 0
):
expr_tuple_sorted.coefficient.pop(new_var_tuple)
return expr_tuple_sorted
[文档]
class HoboModel(BinaryModel):
"""支持添加约束的HOBO模型类
Args:
objective(Expression): 目标函数
hobo_default_penalty(float): HOBO约束的默认惩罚系数, 默认值为1
"""
def __init__(self, objective=None, hobo_default_penalty=1):
super().__init__(objective)
self._pair_frequencies = defaultdict(int)
self._hobo_var_dict = {}
self._hobo_constraints = {}
self._reduction_map = {} # 存储 (var_x, var_y) -> aux_var 的映射
self.hobo_default_penalty = hobo_default_penalty
def __str__(self):
print_data = super().__str__()
if len(self._hobo_constraints) > 0:
print_data += "\nSubject to (hobo constraints):\n"
print_data += "\n".join(
f"{aux_var} := {var_x} * {var_y}"
for (var_x, var_y), aux_var in self._reduction_map.items()
)
return print_data
[文档]
def verify_hobo_constraint(self, solution_dict):
"""验证 HOBO 降阶约束是否满足。
Args:
solution_dict (dict): HOBO 降阶后的解字典。
Returns:
tuple: 约束满足情况信息
- int: 未满足的 HOBO 约束数量
- dict: 包含每个 HOBO 约束取值的字典
"""
unsatisfied_count = 0
result_dict = {}
for (var_x, var_y), aux_var in self._reduction_map.items():
constraint_value = solution_dict.get(var_x, 0.0) * solution_dict.get(
var_y, 0.0
) - solution_dict.get(aux_var, 0.0)
if abs(float(constraint_value)) >= 1e-5:
unsatisfied_count += 1
result_dict[aux_var] = constraint_value
return unsatisfied_count, result_dict
def _precompute_pair_frequencies(self, binary_expression):
"""确定最频繁出现的变量对"""
expr_tuple_sorted = _get_expr_tuple_sorted(binary_expression)
for key, _ in expr_tuple_sorted.coefficient.items():
len_key = len(key)
for i in range(len_key):
for j in range(i + 1, len_key):
pair = key[i], key[j]
self._pair_frequencies[pair] += 1
def _prepare_pair_frequencies(self):
constraint_list = self.get_constraints_expr_list()
qubo_expr_made = self.objective + quicksum(constraint_list)
self._precompute_pair_frequencies(qubo_expr_made)
return qubo_expr_made
[文档]
def reduce(self, predefined_pairs=None):
"""对Hobo Model高阶表达式进行降阶处理(降至二阶)
Args:
predefined_pairs (list): 预定义要合并的变量对列表,格式为[(var1, var2), ...]
Returns:
QuboModel: 降阶后的QuboModel。注意不要在该模型上修改约束,可能会导致错误
"""
# 处理预定义变量对
self.compile_constraints()
self._reduction_map = {
pair: self._create_auxiliary_variable(pair)
for pair in (set(predefined_pairs) if predefined_pairs else set())
}
qubo_expr_made = self._prepare_pair_frequencies()
qubo_expr_made = self._reduce_expression(qubo_expr_made)
qubo_model = QuboModel(self._reduce_expression(self.objective))
qubo_expr_made = quicksum(
[qubo_expr_made]
+ [
constraint_info.default_penalty * constraint_info.left_operand
for constraint_info in self._hobo_constraints.values()
]
)
# fill constraint dict for QUBO model
qubo_model.soft_constraints = self.soft_constraints.copy()
qubo_model.hard_constraints = self.hard_constraints.copy()
qubo_model.hard_constraints.update(self._hobo_constraints)
for name, constraint in self.hard_constraints_made.items():
qubo_model.hard_constraints_made[name] = PenaltyMethodConstraint(
self._reduce_expression(constraint.constraint_expr),
constraint.penalty,
qubo_model,
)
for name, constraint in self._hobo_constraints.items():
qubo_model.hard_constraints_made[name] = PenaltyMethodConstraint(
constraint.left_operand,
constraint.default_penalty,
qubo_model,
)
for name, constraint in self.soft_constraints_made.items():
qubo_model.soft_constraints_made[name] = PenaltyMethodConstraint(
self._reduce_expression(constraint.constraint_expr),
constraint.penalty,
qubo_model,
)
qubo_model.qubo_expr_made = qubo_expr_made
qubo_model.variables = qubo_model.qubo_expr_made.get_variables()
# qubo_model.variables = dict(zip(variables_set, range(len(variables_set))))
qubo_model.compiled = qubo_model.made = True
return qubo_model
def _reduce_expression(self, current_expr):
if current_expr is None:
return None
expr_tuple_sorted = _get_expr_tuple_sorted(current_expr)
# 创建降阶映射表和新表达式容器
quadratic_expr = BinaryExpression({}, expr_tuple_sorted.offset)
# 遍历表达式中的每个项
for current_term, coefficient in expr_tuple_sorted.coefficient.items():
# 持续降阶直到变为二阶项
while len(current_term) > 2:
# 寻找当前项中最优的合并变量对
current_term = self._reduce_one_pair(current_term)
# 将降阶后的项加入新表达式
quadratic_expr.coefficient[current_term] = (
quadratic_expr.coefficient.get(current_term, 0) + coefficient
)
return quadratic_expr
def _reduce_one_pair(self, current_term):
best_pair = self._select_optimal_pair(current_term)
# 获取或创建辅助变量
if best_pair in self._reduction_map:
aux_var = self._reduction_map[best_pair]
else:
aux_var = self._create_auxiliary_variable(best_pair)
self._reduction_map[best_pair] = aux_var
# 更新当前项(用辅助变量替换选中的变量对)
var_x, var_y = best_pair
new_term = [v for v in current_term if v not in (var_x, var_y)]
new_term.append(aux_var)
return tuple(sorted(new_term)) # 保持排序以确保一致性
def _select_optimal_pair(self, term_vars):
"""在项中选择最优变量对进行合并"""
best_pair = None
max_frequency = -1
for i, var_x in enumerate(term_vars):
for j, var_y in enumerate(term_vars):
if i >= j: # 避免重复检查相同变量对
continue
# 优先选择已存在降阶映射的对
if (var_x, var_y) in self._reduction_map:
return var_x, var_y
# 最后选择频率最高的对
pair_freq = self._pair_frequencies.get((var_x, var_y), 0)
if pair_freq > max_frequency:
max_frequency = pair_freq
best_pair = (var_x, var_y)
return best_pair
def _create_auxiliary_variable(self, pair):
"""创建辅助变量并记录约束"""
var_x, var_y = pair
return self._gen_auxiliary_with_set(var_x, var_y)
def _gen_auxiliary_with_set(self, var_x0, var_x1):
r"""生成辅助变量和相应的约束
Args:
var_x0, var_x1: 被替换的变量
Returns:
y_tag: 降阶生成变量, var_x0 * var_x1 --> y_tag
为var_x0, var_x1生成替换变量y_tag, 并更新约束多项式和变量字典
惩罚项: :math:`P = x_0*x_1 - 2*x_0*y - 2*x_1*y + 3*y`
满足 :math:`x_0 * x_1 = y => P = 0, x_0 * x_1 \neq y => P \neq 0`
"""
# 进行变量合并, 例如, _x1_x2* x1, 直接返回_x1_x2即可,因为变量是binary类型,满足$ x^n = x $
y_tag = self._get_merged_tag(var_x0, var_x1)
if y_tag in [var_x0, var_x1]:
return y_tag
hobo_coe = {
(var_x0, var_x1): 1,
tuple(sorted((var_x0, y_tag))): -2,
tuple(sorted((var_x1, y_tag))): -2,
(y_tag,): 3,
}
self._hobo_constraints[y_tag] = Constraint(
BinaryExpression(hobo_coe, 0), penalty=self.hobo_default_penalty
)
return y_tag
def _get_merged_tag(self, var_x0, var_x1):
"""根据x0,x1变量获取合并的变量y"""
if var_x0 in self._hobo_var_dict:
var_x0_set = self._hobo_var_dict[var_x0]
else:
var_x0_set = {var_x0}
if var_x1 in self._hobo_var_dict:
var_x1_set = self._hobo_var_dict[var_x1]
else:
var_x1_set = {var_x1}
if var_x1_set.issubset(var_x0_set):
return var_x0
if var_x0_set.issubset(var_x1_set):
return var_x1
merged_set = var_x1_set | var_x0_set
y_tag = "_" + "_".join(sorted(list(merged_set)))
self._hobo_var_dict[y_tag] = merged_set
return y_tag
if __name__ == "__main__":
import doctest
doctest.testmod()