# Source code for kaiwu.common._argpartition_unique_solution_pool
"""
求解器的解集
"""
import numpy as np
from kaiwu.common._util import hamiltonian
[文档]
class ArgpartitionUniquePool:
    """Pool of distinct solutions, keeping the ``size_limit`` lowest-energy ones.

    Incoming solutions are buffered in ``candidate_solutions`` and merged into
    the pool (``opt``) in batches; ``np.argpartition`` selects the rows with
    the smallest Hamiltonian values during each merge.
    """

    def __init__(self, mat, size, size_limit):
        """Create an empty pool.

        Args:
            mat: Ising matrix, passed unchanged to ``hamiltonian``.
            size: Matrix dimension, i.e. the length of one solution vector.
            size_limit: Maximum number of solutions to keep.
        """
        self.mat = mat
        self.size = size
        self.size_limit = size_limit
        # Hamiltonian of the worst solution in a full pool; used to reject
        # incoming solutions early. Infinite while the pool is not full.
        self.threshold = float("inf")
        self.candidate_solutions = np.array([]).reshape(0, size)  # buffer
        self.opt = np.array([]).reshape(0, size)  # kept solutions
        self.hamilton = []  # Hamiltonian of each row of ``opt``

    def extend(self, solutions, final=False):
        """Add several solutions to the pool.

        Solutions are only buffered until the buffer plus the pool holds at
        least ``2 * size_limit`` rows, or until ``final`` is true; the buffer
        is then merged into the pool.

        Args:
            solutions: 2-D array with one solution per row. May be empty.
            final: Merge the buffer now, regardless of how many rows it holds.
        """
        solutions = np.asarray(solutions)
        if solutions.size > 0:
            self.candidate_solutions = np.concatenate(
                (self.candidate_solutions, solutions), axis=0
            )
        elif not final:
            return
        # An empty input with final=True must still reach the merge below:
        # get_solutions() relies on exactly that call to flush the buffer.
        buffered = len(self.candidate_solutions)
        if buffered == 0:
            return
        if final or buffered + len(self.opt) >= 2 * self.size_limit:
            self._merge_candidates()

    def _merge_candidates(self):
        """Merge the buffered solutions into the pool and empty the buffer."""
        # Presumably ``hamiltonian`` returns one energy per solution row --
        # confirm against kaiwu.common._util.
        energies = np.asarray(hamiltonian(self.mat, self.candidate_solutions))
        # Candidates that are not strictly better than the current worst
        # kept solution cannot enter a full pool.
        keep = energies < self.threshold
        merged_solutions = np.concatenate(
            (self.opt, self.candidate_solutions[keep]), axis=0
        )
        merged_energies = np.concatenate((self.hamilton, energies[keep]), axis=0)
        self.candidate_solutions = np.array([]).reshape(0, self.size)

        # De-duplicate on the solution rows only, so that the same solution
        # evaluated with a slightly different floating-point energy is still
        # recognised as a duplicate.
        if len(merged_solutions) > 0:
            merged_solutions, first_seen = np.unique(
                merged_solutions, axis=0, return_index=True
            )
            merged_energies = merged_energies[first_seen]

        keep_count = min(self.size_limit, len(merged_solutions))
        if keep_count <= 0:
            # Nothing to keep (every candidate was filtered out of an empty
            # pool, or size_limit is 0): leave an empty, unfilled pool.
            self.opt = np.array([]).reshape(0, self.size)
            self.hamilton = np.array([])
            self.threshold = float("inf")
            return

        best = np.argpartition(merged_energies, keep_count - 1)[:keep_count]
        self.opt = merged_solutions[best]
        self.hamilton = merged_energies[best]
        if len(self.opt) == self.size_limit:
            self.threshold = self.hamilton.max()
        else:
            self.threshold = float("inf")

    def get_solutions(self):
        """Return the kept solutions after merging any buffered ones.

        Returns:
            numpy.ndarray: The pool, one solution per row.
        """
        self.extend(np.array([]).reshape(0, self.size), final=True)
        return self.opt

    def clear(self):
        """Remove every kept and buffered solution and reset the threshold."""
        self.threshold = float("inf")
        self.candidate_solutions = np.array([]).reshape(0, self.size)
        self.opt = np.array([]).reshape(0, self.size)
        self.hamilton = []

    def to_json_dict(self):
        """Convert the instance attributes to a JSON-friendly dict.

        Returns:
            dict: Attribute name to value; NumPy arrays become nested lists
            and NumPy scalars become plain Python numbers.
        """
        json_dict = {}
        for attr_name, attr_value in vars(self).items():
            if isinstance(attr_value, np.ndarray):
                json_dict[attr_name] = attr_value.tolist()
            elif isinstance(attr_value, np.generic):
                # e.g. a threshold taken from a NumPy array
                json_dict[attr_name] = attr_value.item()
            else:
                json_dict[attr_name] = attr_value
        return json_dict

    @classmethod
    def from_json_dict(cls, json_dict):
        """Build an ArgpartitionUniquePool from a dict read from JSON.

        Args:
            json_dict (dict): Dict as produced by ``to_json_dict``; must
                contain ``mat``, ``candidate_solutions`` and ``opt``.

        Returns:
            ArgpartitionUniquePool: The restored instance.
        """
        param_dict = json_dict.copy()
        size = param_dict.get("size", 0)
        mat = json_dict["mat"]
        param_dict["mat"] = None if mat is None else np.array(mat)
        for key in ("candidate_solutions", "opt"):
            rows = np.array(json_dict[key])
            if rows.size == 0:
                # An empty list would otherwise come back with shape (0,),
                # which cannot be concatenated with (n, size) solutions.
                rows = rows.reshape(0, size)
            param_dict[key] = rows
        if "hamilton" in param_dict:
            param_dict["hamilton"] = np.asarray(param_dict["hamilton"], dtype=float)
        obj = cls(None, 0, 0)
        for attr_name, attr_value in param_dict.items():
            setattr(obj, attr_name, attr_value)
        return obj