# Source code for kaiwu.hobo._hobo_model

"""
模块: HOBO Model

功能: 方便用户定义高阶二值模型,并转化成二次二值模型
"""

import logging
import numbers
from collections import defaultdict
from kaiwu.core._binary_model import BinaryModel
from kaiwu.core._binary_expression import BinaryExpression, quicksum

from kaiwu.core._constraint import Constraint
from kaiwu.core._penalty_method_constraint import PenaltyMethodConstraint
from kaiwu.core import QuboModel

# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)


def _get_expr_tuple_sorted(binary_expression):
    """Return a new expression whose term keys are in canonical form.

    Every term key (a tuple of variables) is rewritten as a sorted tuple of
    its distinct variables, so repeated variables inside one term collapse
    and keys that differ only in variable order are merged by adding their
    coefficients. A merged coefficient that is a number equal to zero is
    removed. The offset of the input expression is carried over.

    Args:
        binary_expression: expression exposing ``coefficient`` (mapping from
            variable tuples to coefficients) and ``offset``.

    Returns:
        BinaryExpression: a new expression with canonical keys.
    """
    normalized = BinaryExpression({}, binary_expression.offset)
    terms = normalized.coefficient
    for variables, weight in binary_expression.coefficient.items():
        key = tuple(sorted(set(variables)))
        if key not in terms:
            terms[key] = weight
        else:
            terms[key] += weight
        merged = terms[key]
        # Only plain numbers can be tested against zero; other coefficient
        # objects are always kept.
        if isinstance(merged, numbers.Number) and merged == 0:
            terms.pop(key)
    return normalized


class HoboModel(BinaryModel):
    """HOBO (higher-order binary) model that supports adding constraints.

    The model can be reduced to a quadratic model (``QuboModel``) by
    replacing products of two variables with auxiliary variables; each
    replacement ``aux := x * y`` is enforced by a generated penalty
    constraint.

    Args:
        objective (Expression): objective function.
        hobo_default_penalty (float): default penalty coefficient of the
            constraints generated by the reduction. Defaults to 1.
    """

    def __init__(self, objective=None, hobo_default_penalty=1):
        super().__init__(objective)
        # (var_x, var_y) -> number of terms in which both variables occur.
        self._pair_frequencies = defaultdict(int)
        # auxiliary variable name -> set of variables merged into it.
        self._hobo_var_dict = {}
        # auxiliary variable name -> Constraint tying it to its product.
        self._hobo_constraints = {}
        # (var_x, var_y) -> auxiliary variable that replaces the product.
        self._reduction_map = {}
        self.hobo_default_penalty = hobo_default_penalty

    def __str__(self):
        """Return the parent description followed by the reduction mappings."""
        print_data = super().__str__()
        if self._hobo_constraints:
            print_data += "\nSubject to (hobo constraints):\n"
            print_data += "\n".join(
                f"{aux_var} := {var_x} * {var_y}"
                for (var_x, var_y), aux_var in self._reduction_map.items()
            )
        return print_data

    def verify_hobo_constraint(self, solution_dict):
        """Check whether the reduction constraints hold for a solution.

        For every recorded mapping ``aux := x * y`` the value
        ``x * y - aux`` is evaluated; variables missing from the solution
        are treated as 0.

        Args:
            solution_dict (dict): solution of the reduced model, mapping
                variable to value.

        Returns:
            tuple: ``(unsatisfied_count, result_dict)`` where
            ``unsatisfied_count`` is the number of mappings whose value
            differs from 0 by at least 1e-5, and ``result_dict`` maps each
            auxiliary variable to its evaluated value.
        """
        unsatisfied_count = 0
        result_dict = {}
        for (var_x, var_y), aux_var in self._reduction_map.items():
            constraint_value = solution_dict.get(var_x, 0.0) * solution_dict.get(
                var_y, 0.0
            ) - solution_dict.get(aux_var, 0.0)
            if abs(float(constraint_value)) >= 1e-5:
                unsatisfied_count += 1
            result_dict[aux_var] = constraint_value
        return unsatisfied_count, result_dict

    def _precompute_pair_frequencies(self, binary_expression):
        """Count, for every variable pair, the number of terms containing it."""
        expr_tuple_sorted = _get_expr_tuple_sorted(binary_expression)
        for key in expr_tuple_sorted.coefficient:
            for i, var_x in enumerate(key):
                for var_y in key[i + 1:]:
                    self._pair_frequencies[(var_x, var_y)] += 1

    def _prepare_pair_frequencies(self):
        """Build objective + constraint expressions and count its pairs.

        Returns:
            The summed expression (objective plus all constraint
            expressions) before any reduction.
        """
        constraint_list = self.get_constraints_expr_list()
        qubo_expr_made = self.objective + quicksum(constraint_list)
        self._precompute_pair_frequencies(qubo_expr_made)
        return qubo_expr_made

    def reduce(self, predefined_pairs=None):
        """Reduce the higher-order expressions of the model to degree two.

        Args:
            predefined_pairs (list): variable pairs that must be merged,
                in the form ``[(var1, var2), ...]``. The order of the two
                variables inside a pair does not matter.

        Returns:
            QuboModel: the reduced model. Do not modify constraints on the
            returned model; doing so may cause errors.
        """
        self.compile_constraints()

        # Start every reduction from clean bookkeeping so that a repeated
        # call does not inherit constraints or pair counts from an earlier
        # reduction of a (possibly different) objective.
        self._pair_frequencies = defaultdict(int)
        self._hobo_constraints = {}
        self._reduction_map = {}

        # Terms are handled with sorted variable tuples, so predefined pairs
        # are stored sorted as well; otherwise a pair given as (b, a) would
        # never be found by _select_optimal_pair.
        for pair in predefined_pairs or ():
            normalized_pair = tuple(sorted(pair))
            if normalized_pair not in self._reduction_map:
                self._reduction_map[normalized_pair] = (
                    self._create_auxiliary_variable(normalized_pair)
                )

        qubo_expr_made = self._prepare_pair_frequencies()
        qubo_expr_made = self._reduce_expression(qubo_expr_made)
        qubo_model = QuboModel(self._reduce_expression(self.objective))
        # Add the penalty terms of the generated auxiliary-variable
        # constraints to the reduced expression.
        qubo_expr_made = quicksum(
            [qubo_expr_made]
            + [
                constraint_info.default_penalty * constraint_info.left_operand
                for constraint_info in self._hobo_constraints.values()
            ]
        )

        # Fill the constraint dictionaries of the QUBO model.
        qubo_model.soft_constraints = self.soft_constraints.copy()
        qubo_model.hard_constraints = self.hard_constraints.copy()
        qubo_model.hard_constraints.update(self._hobo_constraints)
        for name, constraint in self.hard_constraints_made.items():
            qubo_model.hard_constraints_made[name] = PenaltyMethodConstraint(
                self._reduce_expression(constraint.constraint_expr),
                constraint.penalty,
                qubo_model,
            )
        for name, constraint in self._hobo_constraints.items():
            qubo_model.hard_constraints_made[name] = PenaltyMethodConstraint(
                constraint.left_operand,
                constraint.default_penalty,
                qubo_model,
            )
        for name, constraint in self.soft_constraints_made.items():
            qubo_model.soft_constraints_made[name] = PenaltyMethodConstraint(
                self._reduce_expression(constraint.constraint_expr),
                constraint.penalty,
                qubo_model,
            )

        qubo_model.qubo_expr_made = qubo_expr_made
        qubo_model.variables = qubo_model.qubo_expr_made.get_variables()
        qubo_model.compiled = qubo_model.made = True
        return qubo_model

    def _reduce_expression(self, current_expr):
        """Return a copy of the expression with every term of degree <= 2.

        Args:
            current_expr: expression to reduce, or ``None``.

        Returns:
            BinaryExpression with the same offset whose terms contain at
            most two variables, or ``None`` when ``current_expr`` is None.
        """
        if current_expr is None:
            return None
        expr_tuple_sorted = _get_expr_tuple_sorted(current_expr)
        quadratic_expr = BinaryExpression({}, expr_tuple_sorted.offset)
        for current_term, coefficient in expr_tuple_sorted.coefficient.items():
            # Merge one variable pair at a time until the term is quadratic.
            while len(current_term) > 2:
                current_term = self._reduce_one_pair(current_term)
            # Different original terms may reduce to the same key.
            quadratic_expr.coefficient[current_term] = (
                quadratic_expr.coefficient.get(current_term, 0) + coefficient
            )
        return quadratic_expr

    def _reduce_one_pair(self, current_term):
        """Replace one variable pair of the term by its auxiliary variable.

        Args:
            current_term (tuple): sorted tuple of variables.

        Returns:
            tuple: sorted tuple of distinct variables, shorter than
            ``current_term``.
        """
        best_pair = self._select_optimal_pair(current_term)
        if best_pair in self._reduction_map:
            aux_var = self._reduction_map[best_pair]
        else:
            aux_var = self._create_auxiliary_variable(best_pair)
            self._reduction_map[best_pair] = aux_var
        # A set is used because the auxiliary variable may already occur in
        # the term; for binary variables x * x = x, so it must appear once.
        new_term = {v for v in current_term if v not in best_pair}
        new_term.add(aux_var)
        return tuple(sorted(new_term))  # sorted to keep keys consistent

    def _select_optimal_pair(self, term_vars):
        """Choose the variable pair of a term that should be merged next.

        The first pair (in term order) that already has a reduction mapping
        is returned immediately. Otherwise the pair with the highest
        precomputed frequency is returned; ties go to the pair met first.

        Args:
            term_vars (tuple): variables of the term.

        Returns:
            tuple: the selected ``(var_x, var_y)``, or ``None`` when the
            term has fewer than two variables.
        """
        best_pair = None
        max_frequency = -1
        for i, var_x in enumerate(term_vars):
            for var_y in term_vars[i + 1:]:
                pair = (var_x, var_y)
                if pair in self._reduction_map:
                    return pair
                pair_freq = self._pair_frequencies.get(pair, 0)
                if pair_freq > max_frequency:
                    max_frequency = pair_freq
                    best_pair = pair
        return best_pair

    def _create_auxiliary_variable(self, pair):
        """Create the auxiliary variable for a pair and record its constraint."""
        var_x, var_y = pair
        return self._gen_auxiliary_with_set(var_x, var_y)

    def _gen_auxiliary_with_set(self, var_x0, var_x1):
        r"""Generate the auxiliary variable for ``var_x0 * var_x1``.

        Unless the merged variable is one of the two inputs, a constraint
        with the penalty expression

        :math:`P = x_0 x_1 - 2 x_0 y - 2 x_1 y + 3 y`

        is stored under the auxiliary variable name. For binary values
        :math:`P = 0` when :math:`y = x_0 x_1` and :math:`P > 0` otherwise.

        Args:
            var_x0, var_x1: the variables being replaced.

        Returns:
            The variable ``y_tag`` that replaces ``var_x0 * var_x1``.
        """
        y_tag = self._get_merged_tag(var_x0, var_x1)
        # E.g. _x1_x2 * x1 merges to _x1_x2 itself (binary x^n = x), so no
        # new variable or constraint is needed.
        if y_tag in (var_x0, var_x1):
            return y_tag
        hobo_coe = {
            tuple(sorted((var_x0, var_x1))): 1,
            tuple(sorted((var_x0, y_tag))): -2,
            tuple(sorted((var_x1, y_tag))): -2,
            (y_tag,): 3,
        }
        self._hobo_constraints[y_tag] = Constraint(
            BinaryExpression(hobo_coe, 0), penalty=self.hobo_default_penalty
        )
        return y_tag

    def _get_merged_tag(self, var_x0, var_x1):
        """Return the name of the variable representing ``var_x0 * var_x1``.

        Each variable stands for a set of variables: the set recorded in
        ``_hobo_var_dict`` for a known auxiliary variable, otherwise the
        variable itself. If one set contains the other, the variable with
        the larger set is returned. Otherwise a new name is built from the
        sorted union, prefixed and joined with ``_``, and the union is
        recorded in ``_hobo_var_dict``.
        """
        var_x0_set = self._hobo_var_dict.get(var_x0, {var_x0})
        var_x1_set = self._hobo_var_dict.get(var_x1, {var_x1})
        if var_x1_set.issubset(var_x0_set):
            return var_x0
        if var_x0_set.issubset(var_x1_set):
            return var_x1
        merged_set = var_x1_set | var_x0_set
        y_tag = "_" + "_".join(sorted(merged_set))
        self._hobo_var_dict[y_tag] = merged_set
        return y_tag
# Run the module's doctests when it is executed as a script.
if __name__ == "__main__":
    import doctest

    doctest.testmod()