Source code for epsearch._cycle

from collections.abc import Sequence
from typing import Generic, TypeVar

import attrs
import networkx as nx
import numpy as np
from numpy.ma import MaskedArray

TNumber = TypeVar("TNumber", bound=np.number)


[docs] @attrs.frozen(kw_only=True) class Cycles(Generic[TNumber]): """Cycles of the eigenvalues.""" cycles: list[np.ndarray[tuple[int, int], np.dtype[TNumber]]] """Cycles of the eigenvalues of length num_cycles. Ordered by length in descending order of shape (cycle_length, num_points).""" graph: nx.DiGraph """Graph of the cycles. If eigvals[i, -1] ~ eigvals[j, 0], then there is an edge from i to j.""" eigvals: np.ndarray[tuple[int, int], np.dtype[TNumber]] """Continuous eigenvalues of shape (num_complete_eigenvalues, num_points).""" incomplete_eigvals: MaskedArray[tuple[int, int], np.dtype[TNumber]] """Continuous eigenvalues of shape (num_eigenvalues, num_points) with some NaN values.""" @property def connected_cycles(self) -> list[np.ndarray[tuple[int], np.dtype[TNumber]]]: """ List of connected sequences of eigenvalues of length num_cycles. Prdered by length in descending order same as self.cycles of shape (cycle_length, num_points). """ return [ np.stack( [np.concatenate(np.roll(cycle, -i, axis=0)) for i in range(cycle.shape[0])], axis=0, ) for cycle in self.cycles ] @property def cycle_lengths(self) -> list[int]: """Lengths of the cycles in descending order, same as self.cycles.""" return [cycle.shape[0] for cycle in self.cycles] @property def max_cycle_length(self) -> int: """Maximum length of the cycles.""" if len(self.cycle_lengths) == 0: return 0 return np.max(self.cycle_lengths)
def argmatch_from_closest( x: np.ndarray[tuple[int], np.dtype[TNumber]], y: np.ndarray[tuple[int], np.dtype[TNumber]], /, ) -> MaskedArray[tuple[int], np.dtype[TNumber]]: """ Match the elements of x and y from the closest without duplicates. Parameters ---------- x : np.ndarray[tuple[int], np.dtype[TNumber]] First array of shape (n,). y : np.ndarray[tuple[int], np.dtype[TNumber]] Second array of shape (m,). Returns ------- MaskedArray[tuple[int], np.dtype[TNumber]] Array of shape (n,). (x[i], y[result[i]]) is the pair. """ if x.ndim != 1: raise ValueError(f"{x.ndim=} should be 1") if y.ndim != 1: raise ValueError(f"{y.ndim=} should be 1") dist = np.ma.masked_array(np.abs(x[:, None] - y[None, :]), mask=False) result = np.full_like(x, -1, dtype=int) for _ in range(min(len(x), len(y))): idx = np.unravel_index(dist.argmin(), dist.shape) result[idx[0]] = idx[1] dist.mask[idx[0], :] = np.inf dist.mask[:, idx[1]] = np.inf result = np.ma.masked_array(result, mask=result == -1) return result def argmatch_from_closest_masked( x: MaskedArray[tuple[int], np.dtype[TNumber]], y: np.ndarray[tuple[int], np.dtype[TNumber]], ) -> MaskedArray[tuple[int], np.dtype[TNumber]]: """ Match the elements of x and y from the closest without duplicates. Parameters ---------- x : MaskedArray[tuple[int], np.dtype[TNumber]] The first array of shape (n,). y : np.ndarray[tuple[int], np.dtype[TNumber]], The second array. must be smaller than x. Returns ------- MaskedArray[tuple[int], np.dtype[TNumber]] (x[i], y[result[i]]) is the pair. If x.mask.sum() < len(y), the indices of the remaining y elements are added to the extra indices. """ if x.ndim != 1: raise ValueError(f"{x.ndim=} should be 1") if y.ndim != 1: raise ValueError(f"{y.ndim=} should be 1") if x.shape[0] < y.shape[0]: raise ValueError(f"{x.shape[0]=} < {y.shape[0]=}") arg = np.ma.masked_array(np.full_like(x, -1, dtype=int), mask=True) arg[~x.mask] = argmatch_from_closest(x[~x.mask], y) if y.shape[0] > x.mask.sum(): arg_new = np.asarray(list(set(np.arange(len(y))) - set(arg[~arg.mask]))) arg[x.mask.nonzero()[0][: len(arg_new)]] = np.asarray(arg_new) return arg
[docs] def get_cycles( eigvals: Sequence[Sequence[TNumber]] | np.ndarray[tuple[int, int], np.dtype[TNumber]], /, *, decompose_threshold_diff_factor: float = 2, decompose_threshold_rate: float = 0.5, ) -> Cycles[TNumber]: """ Get cycles from the eigenvalues for each point. Parameters ---------- eigvals : Sequence[Sequence[TNumber]] | np.ndarray[tuple[int, int], np.dtype[TNumber]] A (ordered) sequence which contains the eigenvalues for each point. decompose_threshold_diff_factor : float, optional If the rate of elements which are closer than max(mean difference) * decompose_threshold_diff_factor is higher than decompose_threshold_rate, the cycle is decomposed, by default 0.5. decompose_threshold_rate : float, optional If the rate of elements which are closer than max(mean difference) * decompose_threshold_diff_factor is higher than decompose_threshold_rate, the cycle is decomposed, by default 0.5. Returns ------- bool Whether there is a branching inside the boundary. """ # cycle the eigenvalues so that # forall i. len(eigenvalues[0]) <= len(eigenvalues[i]) # eigv is a list which contains the eigenvalues for each point eig_list: list[np.ndarray[tuple[int], np.dtype[TNumber]]] = [ np.asarray(eigs) for eigs in eigvals ] del eigvals eig_max_count = np.max([len(eigs) for eigs in eig_list]) # reorder eigenvalues so that the distances between # eigenvalues_sorted[i] and eigenvalues_sorted[i+1] are minimized # [B, eigvals] eigvals_c_incomp = np.ma.masked_array( np.full( (len(eig_list), eig_max_count), np.nan, dtype=np.common_type(*eig_list), device=eig_list[0].device, ), mask=True, ) for i, eigs in enumerate(eig_list): if i == 0: eigvals_c_incomp[i, : len(eigs)] = eigs else: arg = argmatch_from_closest_masked(eigvals_c_incomp[i - 1, :], eigs) eigvals_c_incomp[i, ~arg.mask] = eigs[arg[~arg.mask]] # for each continuous eigenvalues sequences, find a sequence # which start is the closest to the end eigvals_c = eigvals_c_incomp[:, ~eigvals_c_incomp.mask.any(axis=0)] arg = argmatch_from_closest(eigvals_c[-1, :], eigvals_c[0, :]) G = nx.DiGraph(list(enumerate(arg[~arg.mask]))) cycles = list(nx.simple_cycles(G)) # decompose cycles which are too close for cycle in cycles: mean_diff = np.max(np.mean(np.abs(np.diff(eigvals_c[:, cycle], axis=0)), axis=0)) if np.all( np.mean( np.abs(eigvals_c[:, cycle[0]][:, None] - eigvals_c[:, cycle]) < mean_diff * decompose_threshold_diff_factor, axis=0, ) > decompose_threshold_rate ): G.remove_edges_from( [(cycle[i], cycle[(i + 1) % len(cycle)]) for i in range(len(cycle))] ) G.add_edges_from([(cycle[i], cycle[i]) for i in range(len(cycle))]) cycles = list(nx.simple_cycles(G)) # order by length cycles = sorted(cycles, key=len, reverse=True) return Cycles( cycles=[eigvals_c[:, cycle].swapaxes(0, 1) for cycle in cycles], graph=G, eigvals=eigvals_c.T, incomplete_eigvals=eigvals_c_incomp.T, )