Source code for cosmocore.basis.pixel

"""
Pixel-projected compression (Gjerløw-like) for CMB Fisher matrix computation.

This module implements pixel-space compression with a harmonic projector P_h,
followed by eigenvalue decomposition to find the optimal subspace for compression.

Available compression bases (from Gjerløw et al. 2019):
- "harmonic": Pure harmonic projector P_h = V^T V
- "noise_weighted": P_h N^{-1} P_h (default) - inverse noise weighting
- "total_covariance": P_h C^{-1} P_h where C = N + S - full covariance weighting
- "snr": S^{1/2} N^{-1} S^{1/2} - signal-to-noise ratio matrix
"""

from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np

from ..basics import (
    cholesky_solve,
    eigh,
    eigvalsh,
    matrix_inverse_symm,
    matrix_mult,
    matrix_trace,
    svd,
    symmetrize_inplace,
)
from .base import BasisPrepared, ComputationBasis

if TYPE_CHECKING:
    from matplotlib.figure import Figure

# Available compression basis presets
COMPRESSION_BASES = {
    "harmonic": "P_h = V^T V (pure harmonic projector)",
    "noise_weighted": "P_h N^{-1} P_h (inverse noise weighting)",
    "total_covariance": "P_h C^{-1} P_h (full covariance weighting, requires C_ell)",
    "snr": "S^{1/2} N^{-1} S^{1/2} (signal-to-noise ratio, requires C_ell)",
}


[docs] class PixelBasis(ComputationBasis): """ Pixel-space compression with projector (Gjerløw-like). Stays in n_pix space with projector P_h, then uses eigenvalue decomposition to find optimal subspace. Compression: n_pix → dim. The key operations are: - Projector: P_h = V^T V (harmonic projector, n_pix × n_pix but rank n_modes) - Eigendecomposition of compression matrix to find optimal modes - Data compression: d_c = P @ d where P = U^T (dim × n_pix) This approach is more flexible than HarmonicBasis because it allows custom projectors to handle systematics (foreground deprojection, etc.). Available compression bases (from Gjerløw et al. 2019): - **harmonic**: Pure harmonic projector P_h = V^T V. Selects modes based purely on harmonic content, ignoring noise properties. - **noise_weighted**: P_h N^{-1} P_h (default). Weights modes by inverse noise, preferring low-noise harmonic modes. - **total_covariance**: P_h C^{-1} P_h where C = N + S. Uses full signal+noise covariance for optimal compression. - **snr**: S^{1/2} N^{-1} S^{1/2}. Signal-to-noise ratio matrix, prioritizing modes with highest SNR. Parameters ---------- N : numpy.ndarray Noise covariance matrix of shape (n_pix, n_pix). The basis takes ownership of this buffer; ``_factorise_noise()`` overwrites it in place during setup (V-based mode only; pixel-direct keeps it). theta : numpy.ndarray Colatitude angles for active pixels in radians. phi : numpy.ndarray Longitude angles for active pixels in radians. lmax : int Maximum multipole for harmonic expansion. beam : numpy.ndarray or None, optional Beam window function B_ℓ for ℓ=2 to lmax. basis : str, default "noise_weighted" Compression basis to use. Options: - "harmonic": P_h = V^T V (pure harmonic projector) - "noise_weighted": P_h N^{-1} P_h (inverse noise weighting) - "total_covariance": P_h C^{-1} P_h where C = N + S - "snr": S^{1/2} N^{-1} S^{1/2} (signal-to-noise ratio) C_ell : numpy.ndarray or None, optional Power spectrum values for ell = 2 to lmax. Required for "total_covariance" and "snr" bases. epsilon : float, optional Eigenvalue threshold relative to maximum. mode_fraction : float, optional Fraction of modes to keep. Attributes ---------- dim : int Number of modes kept after compression (initially n_pix). Examples -------- >>> import numpy as np >>> from cosmocore.basis import PixelBasis >>> N = np.diag(noise_variance) # Noise covariance matrix >>> # Compressed mode: pass epsilon at construction. >>> ppc = PixelBasis( ... N, theta, phi, lmax_signal=100, ... epsilon=1e-4, compression_target="noise_weighted", ... ) >>> ppc.setup() >>> fisher_element = ppc.compute_fisher_element(C_ell, ell_i=10, ell_j=10) References ---------- .. [1] Gjerløw, E. et al. "Component separation for the CMB with a low-resolution analysis" A&A 629, A51 (2019) """
[docs] def __init__( self, N: np.ndarray, theta: np.ndarray, phi: np.ndarray, lmax_signal: int, beam: np.ndarray | None = None, spins: list[int] | None = None, compression_target: str = "noise_weighted", C_ell: np.ndarray | None = None, epsilon: float | list[float | tuple[float, float]] | None = None, mode_fraction: float | list[float | tuple[float, float]] | None = None, lmin_signal: list[int] | None = None, lmin: int | None = None, lmax: int | None = None, S_fixed: np.ndarray | None = None, fields=None, ): super().__init__( N, theta, phi, lmax_signal, beam, spins=spins, lmin_signal=lmin_signal, lmin=lmin, lmax=lmax, S_fixed=S_fixed, ) self._fields = fields # Before compression, dim = n_pix self.dim = self.n_pix # Compression quantities self._compression_target = compression_target self._C_ell_for_basis = C_ell self._epsilon = epsilon self._mode_fraction = mode_fraction self._eigenvectors = None # Direct mode (no compression intent) skips the harmonic-builder # machinery; setup() will route to _setup_direct. Compressed mode # builds V and runs _apply_compression inside _setup_v_based. if self._is_compressed: self._init_harmonic_internals() # Parse per-field thresholds self._epsilon_per_field = self._parse_per_field_thresholds(epsilon, "epsilon") self._mode_fraction_per_field = self._parse_per_field_thresholds( mode_fraction, "mode_fraction" )
def _parse_per_field_thresholds( self, value: float | list[float | tuple[float, float]] | None, name: str, ) -> list[float | tuple[float, float]] | None: """ Normalize threshold parameter to per-field list. Parameters ---------- value : float, list, or None Threshold value(s). float broadcasts to all fields; list must match n_components; tuples in list are only valid for spin-2 fields. name : str Parameter name for error messages. Returns ------- list or None Per-field threshold list, or None if value is None. """ if value is None: return None if isinstance(value, (int, float)): return [float(value)] * self.n_components if isinstance(value, list): if len(value) != self.n_components: raise ValueError( f"{name} list length ({len(value)}) must match " f"number of components ({self.n_components})" ) for i, v in enumerate(value): if isinstance(v, tuple): if self._spins[i] != 2: raise ValueError( f"{name}[{i}] is a tuple (E/B split) but component " f"{i} has spin {self._spins[i]}, not 2" ) if len(v) != 2: raise ValueError( f"{name}[{i}] tuple must have exactly 2 elements " f"(E_threshold, B_threshold), got {len(v)}" ) return value raise TypeError( f"{name} must be float, list, or None, got {type(value).__name__}" ) def _eigendecompose_single( self, comp_matrix: np.ndarray, epsilon: float | None, mode_fraction: float | None, ) -> tuple[np.ndarray, np.ndarray | None]: """ Eigendecompose a compression matrix and apply threshold. Parameters ---------- comp_matrix : numpy.ndarray Symmetric compression matrix. epsilon : float or None Eigenvalue threshold relative to maximum. mode_fraction : float or None Fraction of modes to keep. Returns ------- U : numpy.ndarray Kept eigenvectors, shape (n, dim). eigenvalues : numpy.ndarray or None Kept eigenvalues (descending), or None. """ eigenvalues, eigenvectors = eigh(comp_matrix) # Sort descending sort_idx = np.argsort(eigenvalues)[::-1] eigenvalues = eigenvalues[sort_idx] eigenvectors = eigenvectors[:, sort_idx] if mode_fraction is not None: n_significant = np.sum(eigenvalues > 1e-10 * np.max(np.abs(eigenvalues))) n_to_keep = max(1, int(np.ceil(mode_fraction * n_significant))) mask = np.zeros(len(eigenvalues), dtype=bool) mask[:n_to_keep] = True elif epsilon is not None: max_eigenvalue = np.max(np.abs(eigenvalues)) threshold = epsilon * max_eigenvalue mask = np.abs(eigenvalues) > threshold else: # Default: keep everything above numerical noise max_eigenvalue = np.max(np.abs(eigenvalues)) mask = np.abs(eigenvalues) > 1e-10 * max_eigenvalue return eigenvectors[:, mask], eigenvalues[mask] def _build_compression_matrix_for_subfield( self, V_sub: np.ndarray, N_field: np.ndarray, N_field_inv: np.ndarray, basis: str, C_ell_sub: np.ndarray | None = None, ) -> np.ndarray: """ Build compression matrix for a sub-field (E or B rows of spin-2, or full spin-0). Parameters ---------- V_sub : numpy.ndarray V sub-block, shape (n_sub_modes, n_field_pix). N_field : numpy.ndarray Noise covariance for this field, shape (n_field_pix, n_field_pix). N_field_inv : numpy.ndarray Noise inverse for this field. basis : str Compression basis. C_ell_sub : numpy.ndarray or None Auto-spectrum diagonal for this sub-field (EE for E, BB for B). Returns ------- numpy.ndarray Compression matrix, shape (n_field_pix, n_field_pix). """ P_sub = matrix_mult(V_sub.T, V_sub) if basis == "harmonic": return P_sub elif basis == "noise_weighted": return matrix_mult(matrix_mult(P_sub, N_field_inv), P_sub) elif basis == "total_covariance": if C_ell_sub is None: raise ValueError("C_ell required for 'total_covariance' basis") V_scaled = V_sub * C_ell_sub[:, np.newaxis] S_sub = matrix_mult(V_sub.T, V_scaled) C_sub = N_field + S_sub C_sub_inv = matrix_inverse_symm(C_sub, overwrite=True) return matrix_mult(matrix_mult(P_sub, C_sub_inv), P_sub) elif basis == "snr": if C_ell_sub is None: raise ValueError("C_ell required for 'snr' basis") V_scaled = V_sub * C_ell_sub[:, np.newaxis] S_sub = matrix_mult(V_sub.T, V_scaled) eigvals_S, eigvecs_S = eigh(S_sub) sqrt_eigvals = np.sqrt(np.maximum(eigvals_S, 1e-30)) Q_scaled = eigvecs_S * sqrt_eigvals S_sqrt = matrix_mult(Q_scaled, eigvecs_S.T) return matrix_mult(matrix_mult(S_sqrt, N_field_inv), S_sqrt) else: raise ValueError(f"Unknown compression basis '{basis}'") def _eigendecompose_spin2_split( self, V_comp: np.ndarray, N_field: np.ndarray, N_field_inv: np.ndarray, basis: str, epsilon: tuple[float, float] | None, mode_fraction: tuple[float, float] | None, C_ell: np.ndarray | None = None, ) -> np.ndarray: """ Eigendecompose a spin-2 field with separate E/B thresholds. Parameters ---------- V_comp : numpy.ndarray Full V block for this spin-2 component, shape (2*n_base, 2*n_phys). N_field : numpy.ndarray Noise covariance for this field. N_field_inv : numpy.ndarray Noise inverse for this field. basis : str Compression basis. epsilon : tuple of float or None (E_epsilon, B_epsilon) thresholds. mode_fraction : tuple of float or None (E_fraction, B_fraction) mode fractions. C_ell : numpy.ndarray or None C_ell for basis that needs it. For spin-2, this should be a dict-like or we extract EE/BB diagonals. Returns ------- U_combined : numpy.ndarray Orthogonalized eigenvectors, shape (n_field_pix, dim). """ n_base = self._n_modes_base # Split V into E and B rows V_E = V_comp[:n_base, :] # E modes, all QU pixels V_B = V_comp[n_base:, :] # B modes, all QU pixels # Build per-sub-field C_ell diagonals if needed C_ell_E = None C_ell_B = None if C_ell is not None and isinstance(C_ell, dict): # Extract EE and BB diagonals C_ell_EE = C_ell.get("EE", None) C_ell_BB = C_ell.get("BB", None) if C_ell_EE is not None: C_ell_E = self._build_lambda_diagonal(C_ell_EE) if C_ell_BB is not None: C_ell_B = self._build_lambda_diagonal(C_ell_BB) # Build compression matrices for E and B separately comp_E = self._build_compression_matrix_for_subfield( V_E, N_field, N_field_inv, basis, C_ell_E ) comp_B = self._build_compression_matrix_for_subfield( V_B, N_field, N_field_inv, basis, C_ell_B ) # Eigendecompose each with separate thresholds eps_E = epsilon[0] if epsilon is not None else None eps_B = epsilon[1] if epsilon is not None else None mf_E = mode_fraction[0] if mode_fraction is not None else None mf_B = mode_fraction[1] if mode_fraction is not None else None U_E, _ = self._eigendecompose_single(comp_E, eps_E, mf_E) U_B, _ = self._eigendecompose_single(comp_B, eps_B, mf_B) # Combine and SVD-orthogonalize (E and B pixel patterns overlap on cut sky) U_combined = np.hstack([U_E, U_B]) Q, S, _ = svd(U_combined) # Keep columns where singular values are significant keep = S > 1e-10 * S[0] return Q[:, keep] def _eigendecompose_field( self, comp_idx: int, basis: str, epsilon: float | tuple[float, float] | None, mode_fraction: float | tuple[float, float] | None, C_ell: np.ndarray | dict | None = None, ) -> tuple[np.ndarray, np.ndarray | None]: """ Eigendecompose a single field component. Routes to E/B split for spin-2 with tuple thresholds, or standard eigendecomposition otherwise. Parameters ---------- comp_idx : int Component index. basis : str Compression basis. epsilon : float, tuple, or None Threshold(s). mode_fraction : float, tuple, or None Mode fraction(s). C_ell : array or dict or None Power spectrum for basis that needs it. Returns ------- U : numpy.ndarray Eigenvectors for this field, shape (n_field_pix, dim). eigenvalues : numpy.ndarray or None Eigenvalues if available (None for E/B split). """ spin = self._spins[comp_idx] pix_start = self._pix_offsets[comp_idx] pix_end = self._pix_offsets[comp_idx + 1] # Extract field noise blocks. Reads via _N_symmetric and N_inv so the # method works whether called during setup (symmetric N still alive) # or post-setup (lazy reconstruction via Cholesky factor). N_field = self._N_symmetric[pix_start:pix_end, pix_start:pix_end] N_field_inv = self.N_inv[pix_start:pix_end, pix_start:pix_end] V_comp = self._V_blocks[comp_idx] use_eb_split = spin == 2 and ( isinstance(epsilon, tuple) or isinstance(mode_fraction, tuple) ) if use_eb_split: U = self._eigendecompose_spin2_split( V_comp, N_field, N_field_inv, basis, epsilon if isinstance(epsilon, tuple) else None, mode_fraction if isinstance(mode_fraction, tuple) else None, C_ell, ) return U, None else: # Standard: build compression matrix for full field eps_scalar = epsilon if not isinstance(epsilon, tuple) else epsilon[0] mf_scalar = mode_fraction if not isinstance(mode_fraction, tuple) else None # Build P_h for this field P_sub = matrix_mult(V_comp.T, V_comp) if basis == "harmonic": comp_matrix = P_sub elif basis == "noise_weighted": comp_matrix = matrix_mult(matrix_mult(P_sub, N_field_inv), P_sub) elif basis == "total_covariance": if C_ell is None: raise ValueError("C_ell required for 'total_covariance' basis") Lambda_diag = self._build_lambda_diagonal( C_ell if not isinstance(C_ell, dict) else next(iter(C_ell.values())) ) V_scaled = V_comp * Lambda_diag[: V_comp.shape[0], np.newaxis] S = matrix_mult(V_comp.T, V_scaled) C_total = N_field + S C_inv = matrix_inverse_symm(C_total, overwrite=True) comp_matrix = matrix_mult(matrix_mult(P_sub, C_inv), P_sub) elif basis == "snr": if C_ell is None: raise ValueError("C_ell required for 'snr' basis") Lambda_diag = self._build_lambda_diagonal( C_ell if not isinstance(C_ell, dict) else next(iter(C_ell.values())) ) V_scaled = V_comp * Lambda_diag[: V_comp.shape[0], np.newaxis] S = matrix_mult(V_comp.T, V_scaled) eigvals_S, eigvecs_S = eigh(S) sqrt_eigvals = np.sqrt(np.maximum(eigvals_S, 1e-30)) Q_scaled = eigvecs_S * sqrt_eigvals S_sqrt = matrix_mult(Q_scaled, eigvecs_S.T) comp_matrix = matrix_mult(matrix_mult(S_sqrt, N_field_inv), S_sqrt) else: raise ValueError(f"Unknown compression basis '{basis}'") U, eigvals = self._eigendecompose_single(comp_matrix, eps_scalar, mf_scalar) return U, eigvals @property def method(self) -> str: """Compression method name.""" return "pixel" @property def _is_compressed(self) -> bool: """True when this PixelBasis is configured for eigenmode compression. Compression intent is fully determined by the constructor kwargs: if either ``epsilon`` or ``mode_fraction`` is non-None, compression will be applied by :meth:`setup`. The property is intentionally derived (no separate flag) so intent and configuration cannot drift. """ return self._epsilon is not None or self._mode_fraction is not None @property def projector(self) -> np.ndarray: """ Get the projection matrix U^T (dim × n_pix). Maps pixel space to the basis. In pixel-direct mode the projector is the identity; in compressed mode it is ``U^T``, the transpose of the kept eigenvectors. Raises ------ RuntimeError If compression has not been applied yet (V-based mode only). """ if not self._is_compressed: # Cache the identity projector — n_pix×n_pix dense, allocated once. cached = getattr(self, "_direct_projector", None) if cached is None: cached = np.eye(self.n_pix) self._direct_projector = cached return cached if self._eigenvectors is None: raise RuntimeError( "Compression not applied. Pass epsilon or mode_fraction " "to PixelBasis(...) at construction." ) return self._eigenvectors.T
[docs] def to_basis(self, data: np.ndarray) -> np.ndarray: """ Project pixel-space data into the basis. Pixel-direct mode short-circuits the multiplication against the identity projector (otherwise an ``n_pix × n_sims`` matmul against an ``n_pix × n_pix`` identity, allocated and run on every call). """ if not self._is_compressed: return data return super().to_basis(data)
[docs] def setup(self) -> None: """ Initialize the basis. In V-based mode: build V and P_h, optionally apply eigenvalue compression. In direct mode (n_pix small enough that V is overkill): skip V construction and rely on existing pixel-space machinery (compute_signal_matrix, do_derivative_step). """ if not self._is_compressed: self._setup_direct() else: self._setup_v_based()
def _setup_v_based(self) -> None: """V-based setup: build V, P_h, optionally apply compression. When switch optimization is active (lswitch_high < lmax), absorbs high-ℓ signal into N_eff and uses N_eff for compression matrix construction. Raw N is preserved as ``self._N_original`` for noise-bias computations downstream. """ self._build_basis() if self._use_switch_optimization: self._compute_effective_noise() self._P_h = matrix_mult(self._V.T, self._V) if self._epsilon is not None or self._mode_fraction is not None: self._apply_compression() # Precompute U^T N_raw U so _N_original can be released; the # noise-bias path reads only the compressed form thereafter. self._precompute_noise_in_basis() # Factor self._N in place. After this point the symmetric N buffer # is gone; consumers go through cholesky_solve via _N_chol or the # lazy _N_symmetric reconstruction (deferred compression-basis paths). self._factorise_noise() def _precompute_noise_in_basis(self) -> None: """Cache U^T N_orig U and release the n_pix x n_pix raw N reference. Only possible when compression is active (``_eigenvectors`` is set) and switch optimization captured a raw ``_N_original``. Otherwise ``get_noise_for_bias`` falls back to ``_N`` / ``_N_original``. """ N_original = getattr(self, "_N_original", None) if N_original is None or self._eigenvectors is None: return U = self._eigenvectors UN_raw_U = U.T @ N_original @ U self._noise_in_basis_cache = symmetrize_inplace(UN_raw_U) del self._N_original def _compute_effective_noise(self) -> None: """Absorb high-ℓ signal into N_eff (mirrors HarmonicBasis). Replaces ``self._N`` with ``N_eff = N + S_fixed``. The factorisation happens later, in ``_factorise_noise`` at the end of setup, after compression-basis selection has read symmetric N for U^T N U. """ if self._S_fixed is None: raise ValueError( "S_fixed must be provided when using switch optimization " "(lswitch_high < lmax)" ) S_fixed = np.asarray(self._S_fixed, dtype=np.float64) # Preserve raw N for noise-bias computations (still read lazily by # get_noise_for_bias, so we cannot release it here). self._N_original = self._N # Build N_eff = N + S_fixed straight into a preallocated F-order # buffer to avoid the C-order intermediate that `_N + S_fixed` # would otherwise produce (one extra n_pix^2 alloc at peak). N_eff = np.empty_like(self._N, order="F") np.add(self._N, S_fixed, out=N_eff) self._N = N_eff # S_fixed has been absorbed into N_eff and is not read again; release. self._S_fixed = None
[docs] def get_noise_for_bias(self) -> np.ndarray: """U^T N_raw U — raw noise projected into the eigenmode basis. See :meth:`ComputationBasis.get_noise_for_bias` for the cross-basis contract. Use this for noise-bias computations: ``Tr[C^{-1} N C^{-1} dS]`` requires the actual noise N, not the effective N_eff that includes the absorbed high-ℓ signal. Without switch optimization, N_eff = N and this is identical to ``get_covariance(0)``. """ cached = getattr(self, "_noise_in_basis_cache", None) if cached is not None: return cached N_raw = getattr(self, "_N_original", self._N_symmetric) if self._eigenvectors is None: return N_raw U = self._eigenvectors UN_raw_U = U.T @ N_raw @ U return symmetrize_inplace(UN_raw_U)
def _setup_direct(self) -> None: """Direct pixel-space setup. No V operator, no harmonic machinery. Direct mode reuses existing pixel-space machinery from ``cosmocore.signal_kernels`` — no Legendre or geometry recomputation. """ if self._fields is None: raise ValueError( "Direct mode requires fields (FieldCollection) to be passed to PixelBasis" ) # Build (label -> spectrum_idx) mapping for derivative dispatch self._spec_label_to_idx = { label: idx for idx, label in enumerate(self._fields.spectra_labels) } # ========================================================================= # Direct pixel-space methods (no V operator, reuse cosmocore.signal_kernels) # ========================================================================= def _spectrum_idx_from_components(self, comp_i: int, comp_j: int, mode: int) -> int: """Map (comp_i, comp_j, mode) to spectrum_idx for do_derivative_step. Mode encoding follows the QML/CosmoForge convention: - spin-0 × spin-0: mode 0 = auto (e.g. TT) - spin-2 × spin-2 same component: mode 0 = EE, 1 = BB, 2 = EB - spin-2 × spin-2 cross-component: linear over label pairs - spin-0 × spin-2: linear over label pairs (e.g. TE, TB) """ fi = self._fields.fields[comp_i] fj = self._fields.fields[comp_j] labels_i = fi.maps_label if isinstance(fi.maps_label, list) else [fi.maps_label] labels_j = fj.maps_label if isinstance(fj.maps_label, list) else [fj.maps_label] if fi.spin == 2 and fj.spin == 2 and comp_i == comp_j: # Auto-spectrum on a single spin-2 field: mode 0=EE, 1=BB, 2=EB if mode == 0: label = labels_i[0] + labels_j[0] elif mode == 1: label = labels_i[1] + labels_j[1] else: # mode == 2 label = labels_i[0] + labels_j[1] else: combined = [a + b for a in labels_i for b in labels_j] label = combined[mode] return self._spec_label_to_idx[label] @staticmethod def _symmetrize_inplace(block: np.ndarray) -> None: """Set block[i,j] = block[j,i] for i < j, in-place. Block must be square.""" # Equivalent to: block += block.T - diag(block) # but uses in-place ops to avoid temporary allocations of size n_pix^2. idx = np.triu_indices_from(block, k=1) block[idx] = block[(idx[1], idx[0])] def _direct_buffer(self) -> np.ndarray: """Return a reusable n_pix × n_pix Fortran-ordered buffer. Allocated lazily on first use. Repeatedly zeroed and refilled by the per-spectrum contribution functions to avoid 8 GB+ allocation churn during binned-derivative loops. """ if not hasattr(self, "_direct_pix_buffer"): self._direct_pix_buffer = np.asfortranarray( np.zeros((self.n_pix, self.n_pix), dtype=np.float64) ) return self._direct_pix_buffer def _get_derivative_direct( self, ell: int, comp_i: int | None = None, comp_j: int | None = None, mode: int = 0, ) -> np.ndarray: """Compute dS/dC_ell via existing pixel-space machinery. Reuses ``do_derivative_step`` from ``cosmocore.signal_kernels`` — no duplication of Legendre or dispatch logic. """ from ..signal_kernels import do_derivative_step ci = 0 if comp_i is None else comp_i cj = 0 if comp_j is None else comp_j spectrum_idx = self._spectrum_idx_from_components(ci, cj, mode) dS = self._direct_buffer() dS.fill(0.0) do_derivative_step( dS, spectrum_idx, self._fields.n_active, self._fields.spin, ell, self._fields, ) return dS
[docs] def get_binned_derivative_direct( self, bin_idx: int, bins, beam_smoothing: np.ndarray | None, key, ) -> np.ndarray: """Build the binned derivative dC^b in a single Legendre/Wigner pass. Mathematically equivalent to summing per-ℓ ``_get_derivative_direct`` results with weights ``beam²(ℓ)`` over ℓ ∈ [bins.lmins[bin_idx], bins.lmaxs[bin_idx]], but uses one ``compute_*_contribution`` call instead of N (where N = bin width). The contribution functions accumulate ``Σ_ℓ cl[ell] × kernel[ell]`` per pixel pair on ℓ-indexed arrays, so populating ``cl[ell] = beam²(ell)`` inside the bin (zero elsewhere) yields the exact binned derivative. Parameters ---------- key : SpectrumKey Identifies the spectrum (component pair + spin kind). Decomposed into ``(comp_i, comp_j, mode)`` once at entry for the Numba kernels below. """ from ..signal_kernels import ( compute_00_contribution, compute_02_contribution, compute_22_contribution, ) from ..spectrum_key import kind_to_legacy_mode is_cross = key.comp_i != key.comp_j comp_i = key.comp_i comp_j = key.comp_j mode = kind_to_legacy_mode(key.kind, is_cross=is_cross) lmin_b = bins.lmins[bin_idx] lmax_b = bins.lmaxs[bin_idx] # Build ℓ-indexed weight array w[ell] = beam²(ell) inside the bin and # 0 elsewhere. The contribution functions evaluate Σ_ℓ w[ell] × # kernel[ell] for ell=lmin..lmax, giving Σ_{ell∈bin} beam²(ell) × # dC/dC_ell. beam_smoothing (when provided) is the inference-range # slice (ell=bins.lmin..lmax, offset-from-bins.lmin; ADR 0009). weights = np.zeros(self.lmax + 1, dtype=np.float64) for ell in range(lmin_b, lmax_b + 1): if ell > self.lmax: continue w = 1.0 if beam_smoothing is not None: w = beam_smoothing[ell - bins.lmin] weights[ell] = w fi = self._fields.fields[comp_i] fj = self._fields.fields[comp_j] spin_i = fi.spin spin_j = fj.spin # Pixel layout matches do_derivative_step's convention. ri = sum( 2 * n if s == 2 else n for n, s in zip( self._fields.n_active[:comp_i], self._fields.spin[:comp_i], ) ) rj = sum( 2 * n if s == 2 else n for n, s in zip( self._fields.n_active[:comp_j], self._fields.spin[:comp_j], ) ) nrow = self._n_pix_per_component[comp_i] ncol = self._n_pix_per_component[comp_j] # Allocate a fresh output array — Fisher stores all binned derivatives # in a dict and reuses them later, so we cannot share a buffer across # binned calls without aliasing. dS = np.asfortranarray(np.zeros((self.n_pix, self.n_pix), dtype=np.float64)) block = dS[rj : rj + ncol, ri : ri + nrow] legendre = np.empty(self.lmax + 1, dtype=np.float64) zeros = np.zeros(self.lmax + 1, dtype=np.float64) if spin_i == 0 and spin_j == 0: sub_mode = 0 if comp_i == comp_j else 1 compute_00_contribution( weights, block, fi.point_vectors, fj.point_vectors, legendre, sub_mode ) if sub_mode == 0: # compute_00_contribution mode=0 fills only lower + diagonal; # derivative_step_00 mode=0 symmetrizes upper at the end. self._symmetrize_inplace(block) else: # Cross-field (mode=1): both halves filled inside, but the # transposed cross-block in dS must be set to match # do_derivative_step's outer-level mirror. dS[ri : ri + nrow, rj : rj + ncol] = block.T elif spin_i == 2 and spin_j == 2 and comp_i == comp_j: f1 = np.empty(self.lmax + 1, dtype=np.float64) f2 = np.empty(self.lmax + 1, dtype=np.float64) if mode == 0: # EE cl11, cl22, cl12 = weights, zeros, zeros elif mode == 1: # BB cl11, cl22, cl12 = zeros, weights, zeros elif mode == 2: # EB cl11, cl22, cl12 = zeros, zeros, weights else: raise ValueError(f"Invalid spin-2 mode {mode}") compute_22_contribution( cl11, cl22, cl12, block, fi.point_vectors, fj.point_vectors, legendre, f1, f2, ) # derivative_step_22 symmetrizes its full output; compute_22 does # not, so do it here. The block is the full square spin-2 derivative. self._symmetrize_inplace(block) elif (spin_i, spin_j) in ((0, 2), (2, 0)): cl12 = weights if mode == 0 else zeros cl13 = weights if mode == 1 else zeros if spin_i == 0: compute_02_contribution( cl12, cl13, block, fi.point_vectors, fj.point_vectors, legendre, ) # Mirror to the (i,j) block so the full dS is symmetric, matching # do_derivative_step's outer-level mirror (cosmocore/pixel.py # line ~781) for the spin-0×spin-2 case. dS[ri : ri + nrow, rj : rj + ncol] = block.T else: block_swapped = dS[ri : ri + nrow, rj : rj + ncol] compute_02_contribution( cl12, cl13, block_swapped, fj.point_vectors, fi.point_vectors, legendre, ) # Same mirror, opposite direction. dS[rj : rj + ncol, ri : ri + nrow] = block_swapped.T else: raise NotImplementedError( f"Direct binned derivative not implemented for spins ({spin_i}, {spin_j})" ) return dS
def _build_signal_matrix_direct(self) -> np.ndarray: """Build signal matrix S via existing pixel-space machinery. Uses the spectra currently set on the FieldCollection. Reuses ``compute_signal_matrix`` from ``cosmocore.signal_kernels``. """ from ..signal_kernels import compute_signal_matrix S = np.zeros((self.n_pix, self.n_pix), dtype=np.float64) S = np.asfortranarray(S) compute_signal_matrix(S, self.lmax_signal, self._fields) return S def _build_compression_matrix( self, basis: str, C_ell: np.ndarray | None = None, ) -> np.ndarray: """ Build the matrix for eigendecomposition based on compression basis. Parameters ---------- basis : str Compression basis: "harmonic", "noise_weighted", "total_covariance", "snr". C_ell : numpy.ndarray, optional Power spectrum values for ell = 2 to lmax. Required for "total_covariance" and "snr" bases. Returns ------- numpy.ndarray Compression matrix of shape (n_pix, n_pix). """ if basis not in COMPRESSION_BASES: raise ValueError( f"Unknown compression basis '{basis}'. " f"Available: {list(COMPRESSION_BASES.keys())}" ) if basis == "harmonic": # Pure harmonic projector P_h = V^T V return self._P_h elif basis == "noise_weighted": # P_h N^{-1} P_h return matrix_mult(matrix_mult(self._P_h, self.N_inv), self._P_h) elif basis == "total_covariance": # P_h C^{-1} P_h where C = N + S if C_ell is None: raise ValueError( "C_ell is required for 'total_covariance' basis. " "Provide power spectrum values for ell = 2 to lmax." ) # Build signal covariance S = V^T Λ V # Use broadcasting to avoid creating full diagonal matrix Lambda_diag = self._build_lambda_diagonal(C_ell) # (n_modes, n_pix) * (n_modes, 1) V_scaled = self._V * Lambda_diag[:, np.newaxis] S = matrix_mult(self._V.T, V_scaled) # Total covariance C = N + S (symmetric N via lazy reconstruction # if the basis has been factorised in place). C = self._N_symmetric + S # C^{-1} C_inv = matrix_inverse_symm(C, overwrite=True) # P_h C^{-1} P_h return matrix_mult(matrix_mult(self._P_h, C_inv), self._P_h) elif basis == "snr": # S^{1/2} N^{-1} S^{1/2} - signal-to-noise ratio matrix if C_ell is None: raise ValueError( "C_ell is required for 'snr' basis. " "Provide power spectrum values for ell = 2 to lmax." ) # Build signal covariance S = V^T Λ V # Use broadcasting to avoid creating full diagonal matrix Lambda_diag = self._build_lambda_diagonal(C_ell) V_scaled = self._V * Lambda_diag[:, np.newaxis] S = matrix_mult(self._V.T, V_scaled) eigvals_S, eigvecs_S = eigh(S) sqrt_eigvals = np.sqrt(np.maximum(eigvals_S, 1e-30)) Q_scaled = eigvecs_S * sqrt_eigvals # (n_pix, n_pix) * (n_pix,) S_sqrt = matrix_mult(Q_scaled, eigvecs_S.T) # S^{1/2} N^{-1} S^{1/2} return matrix_mult(matrix_mult(S_sqrt, self.N_inv), S_sqrt) # Should never reach here raise ValueError(f"Unhandled basis: {basis}")
[docs] def compute_eigenspectrum( self, basis: str = "noise_weighted", C_ell: np.ndarray | None = None, ) -> tuple[np.ndarray, np.ndarray]: """ Compute eigenvalue spectrum for a given compression basis. This method computes the eigendecomposition without applying compression, allowing inspection of the spectrum to choose an appropriate threshold. Parameters ---------- basis : str, default "noise_weighted" Compression basis: "harmonic", "noise_weighted", "total_covariance", "snr". C_ell : numpy.ndarray, optional Power spectrum values for ell = 2 to lmax. Required for "total_covariance" and "snr" bases. Returns ------- eigenvalues : numpy.ndarray Eigenvalues sorted in descending order. normalized_eigenvalues : numpy.ndarray Eigenvalues normalized by maximum value (for threshold selection). """ compression_matrix = self._build_compression_matrix(basis, C_ell) eigenvalues, _ = eigh(compression_matrix) # Sort in descending order eigenvalues = np.sort(eigenvalues)[::-1] # Normalize by maximum max_eigenvalue = np.max(np.abs(eigenvalues)) if max_eigenvalue > 0: normalized_eigenvalues = eigenvalues / max_eigenvalue else: normalized_eigenvalues = eigenvalues.copy() return eigenvalues, normalized_eigenvalues
[docs] def compute_eigenspectrum_per_field( self, basis: str = "noise_weighted", C_ell: np.ndarray | dict | None = None, ) -> list[dict]: """ Compute per-component eigenvalue spectra. Returns one entry per component with eigenvalues and normalized eigenvalues. For spin-2 components the result additionally contains separate E- and B-mode eigenspectra. Parameters ---------- basis : str, default "noise_weighted" Compression basis. C_ell : numpy.ndarray, dict, or None Power spectrum (required for "total_covariance" and "snr" bases). Returns ------- list of dict One dict per component with keys: ``component``, ``spin``, ``label``, ``eigenvalues``, ``normalized_eigenvalues``. Spin-2 components additionally have ``E_eigenvalues``, ``E_normalized``, ``B_eigenvalues``, ``B_normalized``. """ if basis not in COMPRESSION_BASES: raise ValueError( f"Unknown compression basis '{basis}'. " f"Available: {list(COMPRESSION_BASES.keys())}" ) results: list[dict] = [] for comp_idx in range(self.n_components): spin = self._spins[comp_idx] pix_start = self._pix_offsets[comp_idx] pix_end = self._pix_offsets[comp_idx + 1] N_field = self._N_symmetric[pix_start:pix_end, pix_start:pix_end] N_field_inv = self.N_inv[pix_start:pix_end, pix_start:pix_end] V_comp = self._V_blocks[comp_idx] # Resolve per-mode C_ell diagonals for this component cell_diag_0, cell_diag_1 = self._resolve_cell_diagonals(C_ell, comp_idx, spin) # Full-field C_ell diagonal if spin == 2 and cell_diag_0 is not None: cell_sub_full = np.concatenate( [ cell_diag_0, cell_diag_1 if cell_diag_1 is not None else np.zeros_like(cell_diag_0), ] ) else: cell_sub_full = cell_diag_0 comp_matrix = self._build_compression_matrix_for_subfield( V_comp, N_field, N_field_inv, basis, cell_sub_full ) eigenvalues = np.sort(eigvalsh(comp_matrix))[::-1] max_ev = np.max(np.abs(eigenvalues)) normalized = eigenvalues / max_ev if max_ev > 0 else eigenvalues.copy() entry: dict = { "component": comp_idx, "spin": spin, "label": f"Field {comp_idx} (spin-{spin})", "eigenvalues": eigenvalues, "normalized_eigenvalues": normalized, } if spin == 2: n_base = self._n_modes_base V_E = V_comp[:n_base, :] V_B = V_comp[n_base:, :] comp_E = self._build_compression_matrix_for_subfield( V_E, N_field, N_field_inv, basis, cell_diag_0 ) ev_E = np.sort(eigvalsh(comp_E))[::-1] max_E = np.max(np.abs(ev_E)) norm_E = ev_E / max_E if max_E > 0 else ev_E.copy() comp_B = self._build_compression_matrix_for_subfield( V_B, N_field, N_field_inv, basis, cell_diag_1 ) ev_B = np.sort(eigvalsh(comp_B))[::-1] max_B = np.max(np.abs(ev_B)) norm_B = ev_B / max_B if max_B > 0 else ev_B.copy() entry["E_eigenvalues"] = ev_E entry["E_normalized"] = norm_E entry["B_eigenvalues"] = ev_B entry["B_normalized"] = norm_B results.append(entry) return results
def _resolve_cell_diagonals( self, C_ell: np.ndarray | dict | None, comp_idx: int, spin: int, ) -> tuple[np.ndarray | None, np.ndarray | None]: """ Extract per-mode C_ell diagonals for a component. Returns ------- cell_diag_0 : numpy.ndarray or None Auto-spectrum diagonal (TT for spin-0, EE for spin-2). cell_diag_1 : numpy.ndarray or None BB diagonal (only for spin-2, None otherwise). """ if C_ell is None: return None, None if not isinstance(C_ell, dict): return self._build_lambda_diagonal(C_ell), None # Dict: try 3-tuple key (comp, comp, mode), then 2-tuple arr_0 = C_ell.get((comp_idx, comp_idx, 0)) if arr_0 is None: arr_0 = C_ell.get((comp_idx, comp_idx)) diag_0 = self._build_lambda_diagonal(arr_0) if arr_0 is not None else None diag_1 = None if spin == 2: arr_1 = C_ell.get((comp_idx, comp_idx, 1)) if arr_1 is not None: diag_1 = self._build_lambda_diagonal(arr_1) return diag_0, diag_1
[docs] def plot_eigenvalue_spectrum( self, basis: str = "noise_weighted", C_ell: np.ndarray | dict | None = None, axes: np.ndarray | None = None, log_scale: bool = True, show_threshold_lines: bool = True, threshold_values: list[float] | None = None, show_eb_split: bool = True, ) -> tuple[Figure, np.ndarray]: """ Plot eigenvalue spectrum for compression threshold selection. Creates one subplot per component. The y-axis shows eigenvalues normalized by the maximum value, so values can be directly used as the ``epsilon`` constructor kwarg of :class:`PixelBasis`. For spin-2 components the E and B sub-spectra are shown as dashed curves when ``show_eb_split`` is True. Parameters ---------- basis : str, default "noise_weighted" Compression basis. C_ell : numpy.ndarray, dict, or None Power spectrum (required for "total_covariance" and "snr" bases). axes : numpy.ndarray of Axes or None Pre-created axes array (length ``n_components``). If None, a new figure is created. log_scale : bool, default True Whether to use logarithmic y-axis. show_threshold_lines : bool, default True Whether to show reference threshold lines. threshold_values : list of float or None Custom threshold values to show. Default: [1e-2, 1e-4, 1e-6, 1e-8]. show_eb_split : bool, default True For spin-2 components, overlay E and B sub-spectra. Returns ------- fig : matplotlib.figure.Figure The figure containing the plot. axes : numpy.ndarray 1-D array of Axes (length ``n_components``). Examples -------- >>> # Pick threshold by plotting first, then construct a compressed basis. >>> # The probe construction uses epsilon=0.0 to opt into the V-based >>> # path (keeps all modes) so plot_eigenvalue_spectrum has V to read. >>> ppc_probe = PixelBasis( ... N, theta, phi, lmax_signal=100, epsilon=0.0, ... ) >>> ppc_probe.setup() >>> fig, axes = ppc_probe.plot_eigenvalue_spectrum(basis="noise_weighted") >>> # From the plot, decide threshold (e.g., 1e-4). >>> ppc = PixelBasis( ... N, theta, phi, lmax_signal=100, ... epsilon=1e-4, compression_target="noise_weighted", ... ) >>> ppc.setup() """ import matplotlib.pyplot as plt per_field = self.compute_eigenspectrum_per_field(basis, C_ell) n_comp = len(per_field) if axes is None: fig, axes_raw = plt.subplots( 1, n_comp, figsize=(6 * n_comp, 6), squeeze=False ) axes_arr: np.ndarray = axes_raw[0] else: axes_arr = np.atleast_1d(axes) fig = axes_arr[0].get_figure() if threshold_values is None: threshold_values = [1e-2, 1e-4, 1e-6, 1e-8] for idx, entry in enumerate(per_field): ax = axes_arr[idx] normalized = entry["normalized_eigenvalues"] mode_indices = np.arange(1, len(normalized) + 1) ax.plot(mode_indices, normalized, "b-", linewidth=1.5, label="Eigenvalues") # E/B overlay for spin-2 if show_eb_split and entry["spin"] == 2: norm_E = entry["E_normalized"] norm_B = entry["B_normalized"] ax.plot( np.arange(1, len(norm_E) + 1), norm_E, "r--", linewidth=1.2, label="E modes", ) ax.plot( np.arange(1, len(norm_B) + 1), norm_B, "g--", linewidth=1.2, label="B modes", ) if show_threshold_lines: colors = plt.cm.Reds(np.linspace(0.3, 0.9, len(threshold_values))) for thresh, color in zip(threshold_values, colors): dim = int(np.sum(normalized > thresh)) ax.axhline( y=thresh, color=color, linestyle="--", alpha=0.7, label=f"\u03b5={thresh:.0e} ({dim} modes)", ) if log_scale: ax.set_yscale("log") ax.set_xlabel("Mode index", fontsize=12) ax.set_ylabel("Normalized eigenvalue", fontsize=12) ax.set_title(f"{entry['label']}: {basis}", fontsize=12) ax.legend(loc="upper right", fontsize=9) ax.grid(True, alpha=0.3) ax.set_xlim(1, len(normalized)) n_total = len(normalized) n_significant = int(np.sum(normalized > 1e-10)) ax.text( 0.02, 0.02, f"Total modes: {n_total}\nSignificant (>1e-10): {n_significant}", transform=ax.transAxes, fontsize=9, verticalalignment="bottom", bbox=dict(boxstyle="round", facecolor="wheat", alpha=0.5), ) plt.tight_layout() return fig, axes_arr
[docs] def plot_eigenvalue_comparison( self, bases: list[str] | None = None, C_ell: np.ndarray | dict | None = None, axes: np.ndarray | None = None, log_scale: bool = True, ) -> tuple[Figure, np.ndarray]: """ Compare eigenvalue spectra across different compression bases. Creates one subplot per component, overlaying the different bases. Parameters ---------- bases : list of str or None Compression bases to compare. Default: all available (or just "harmonic"/"noise_weighted" if C_ell is not provided). C_ell : numpy.ndarray, dict, or None Power spectrum (required for "total_covariance" and "snr" bases). axes : numpy.ndarray of Axes or None Pre-created axes array (length ``n_components``). If None, a new figure is created. log_scale : bool, default True Whether to use logarithmic y-axis. Returns ------- fig : matplotlib.figure.Figure The figure containing the plot. axes : numpy.ndarray 1-D array of Axes (length ``n_components``). """ import matplotlib.pyplot as plt if bases is None: if C_ell is not None: bases = list(COMPRESSION_BASES.keys()) else: bases = ["harmonic", "noise_weighted"] n_comp = self.n_components if axes is None: fig, axes_raw = plt.subplots( 1, n_comp, figsize=(6 * n_comp, 6), squeeze=False ) axes_arr: np.ndarray = axes_raw[0] else: axes_arr = np.atleast_1d(axes) fig = axes_arr[0].get_figure() basis_colors = plt.cm.tab10(np.linspace(0, 1, len(bases))) for basis, color in zip(bases, basis_colors): try: per_field = self.compute_eigenspectrum_per_field(basis, C_ell) except ValueError as e: print(f"Skipping basis '{basis}': {e}") continue for idx, entry in enumerate(per_field): ax = axes_arr[idx] normalized = entry["normalized_eigenvalues"] mode_indices = np.arange(1, len(normalized) + 1) ax.plot( mode_indices, normalized, color=color, linewidth=1.5, label=basis, ) for idx in range(n_comp): ax = axes_arr[idx] spin = self._spins[idx] if log_scale: ax.set_yscale("log") ax.set_xlabel("Mode index", fontsize=12) ax.set_ylabel("Normalized eigenvalue", fontsize=12) ax.set_title(f"Field {idx} (spin-{spin}): Comparison", fontsize=12) ax.legend(loc="upper right", fontsize=10) ax.grid(True, alpha=0.3) plt.tight_layout() return fig, axes_arr
def _apply_compression(self) -> None: """ Apply eigenvalue compression to find the optimal subspace. Internal helper invoked by :meth:`setup` when the constructor was called with ``epsilon`` or ``mode_fraction``. Reads compression configuration from the instance state set at construction: ``self._epsilon``, ``self._mode_fraction``, ``self._compression_target``, ``self._C_ell_for_basis``. Computes the eigendecomposition of the compression matrix (chosen by ``self._compression_target``) and selects modes to keep based on either an eigenvalue threshold or a fraction of total modes. Supports per-field thresholds and separate E/B thresholds for spin-2: - float: broadcast to all fields - list[float]: per-field threshold - list[float | tuple[float, float]]: tuples give (E, B) split for spin-2 Raises ------ ValueError If both ``epsilon`` and ``mode_fraction`` were provided at construction. If ``mode_fraction`` is not in (0, 1]. If ``C_ell`` is required but not provided. """ epsilon = self._epsilon mode_fraction = self._mode_fraction compression_target = self._compression_target C_ell = self._C_ell_for_basis # Parse per-field thresholds eps_list = self._parse_per_field_thresholds(epsilon, "epsilon") mf_list = self._parse_per_field_thresholds(mode_fraction, "mode_fraction") # Validate mutual exclusivity (check scalar level) if eps_list is not None and mf_list is not None: raise ValueError( "epsilon and mode_fraction are mutually exclusive. " "Provide only one compression criterion." ) if mf_list is not None: # Use original error message format for scalar inputs scalar_input = isinstance(mode_fraction, (int, float)) for i, mf in enumerate(mf_list): if isinstance(mf, tuple): for v in mf: if not (0 < v <= 1): raise ValueError( f"mode_fraction[{i}] values must be in (0, 1], got {mf}" ) else: if not (0 < mf <= 1): if scalar_input: raise ValueError(f"mode_fraction must be in (0, 1], got {mf}") raise ValueError( f"mode_fraction[{i}] must be in (0, 1], got {mf}" ) # Default epsilon if neither specified if eps_list is None and mf_list is None: eps_list = [1e-6] * self.n_components # Determine if we need per-field decomposition has_any_tuple = ( eps_list is not None and any(isinstance(e, tuple) for e in eps_list) ) or (mf_list is not None and any(isinstance(m, tuple) for m in mf_list)) need_per_field = ( has_any_tuple or self.n_components > 1 or any(s == 2 for s in self._spins) ) if need_per_field: # Per-field decomposition path U_blocks = [] for comp_idx in range(self.n_components): eps_i = eps_list[comp_idx] if eps_list is not None else None mf_i = mf_list[comp_idx] if mf_list is not None else None U_i, _ = self._eigendecompose_field( comp_idx, compression_target, eps_i, mf_i, C_ell ) U_blocks.append(U_i) # Assemble block-diagonal U total_kept = sum(U_i.shape[1] for U_i in U_blocks) U_full = np.zeros((self.n_pix, total_kept), dtype=np.float64) col = 0 for comp_idx, U_i in enumerate(U_blocks): pix_start = self._pix_offsets[comp_idx] pix_end = self._pix_offsets[comp_idx + 1] n_kept_i = U_i.shape[1] U_full[pix_start:pix_end, col : col + n_kept_i] = U_i col += n_kept_i self._eigenvectors = U_full self._eigenvalues = None # Not meaningful for combined E/B self.dim = total_kept else: # Single-field spin-0 path (backward compatible) eps_scalar = eps_list[0] if eps_list is not None else None mf_scalar = mf_list[0] if mf_list is not None else None compression_matrix = self._build_compression_matrix(compression_target, C_ell) eigenvectors, eigenvalues = self._eigendecompose_single( compression_matrix, eps_scalar, mf_scalar ) self._eigenvalues = eigenvalues self._eigenvectors = eigenvectors self.dim = eigenvectors.shape[1] # Precompute compression-dependent quantities that don't depend on C_ell self._precompute_compression_products() def _precompute_compression_products(self) -> None: """ Precompute matrices that depend only on compression eigenvectors. These quantities are independent of C_ell and can be computed once after compression is applied, providing O(ℓ²) speedup for Fisher matrix computation. Precomputes: - _U_N_U: U^T @ N @ U (compressed noise covariance) - _VU: V @ U (harmonic operator applied to eigenvectors) - _V_N_inv: V @ N^{-1} (for SMW formula in QML) - _V_Ninv_VT: V @ N^{-1} @ V^T (SMW kernel matrix) - Derivative diagonals: E_ℓ for all ℓ (inherited from base class) - Buffers for intermediate computations References ---------- .. [1] Gjerløw, E., et al. (2015). Section 6.2 - "Precompute: PY" """ U = self._eigenvectors # (n_pix, dim) # U^T @ N @ U - compressed noise covariance (independent of C_ell). # Goes through _N_symmetric so the precompute can run after # _factorise_noise has overwritten self._N in place. self._U_N_U = U.T @ self._N_symmetric @ U # V @ U - used for signal covariance transformation (independent of C_ell) # VU has shape (n_modes, dim) self._VU = self._V @ U # Make U_N_U symmetric (numerical stability) — in place to avoid # the dim² broadcast temporaries. symmetrize_inplace(self._U_N_U) # SMW components for get_weighted_data # V @ N^{-1} via Cholesky solve: (N^{-1} V^T)^T = cholesky_solve(N_chol, V^T)^T self._V_N_inv = cholesky_solve(self._N_chol, self._V.T).T # V @ N^{-1} @ V^T (n_modes, n_modes) - the M matrix in SMW self._V_Ninv_VT = matrix_mult(self._V_N_inv, self._V.T) symmetrize_inplace(self._V_Ninv_VT) # Precompute derivative diagonals (from base class) self._precompute_derivative_diagonals() # Pre-allocate buffers for frequently called methods self._allocate_buffers() def _allocate_buffers(self) -> None: """ Pre-allocate reusable buffers for intermediate computations. This reduces memory allocation overhead in frequently called methods like get_derivative_matrix and get_projected_inverse. """ # Buffer for VU * diagonal scaling: (n_modes, dim) self._VU_scaled_buffer = np.empty( (self.n_modes, self.dim), dtype=np.float64, order="C" ) # Buffer for U_S_U computation: (dim, dim) self._U_S_U_buffer = np.empty((self.dim, self.dim), dtype=np.float64, order="F")
[docs] def get_projected_inverse(self, C_ell) -> np.ndarray: """ Compute the basis-space inverse covariance. In pixel-direct mode this is the exact ``(N + S)^{-1}``; on a truncated compressed basis it is ``(U^T (N + S) U)^{-1}`` — the inverse of the restricted operator on the kept subspace. Parameters ---------- C_ell : numpy.ndarray or dict Power spectrum (array for single-field, dict for multi-field). Returns ------- numpy.ndarray Basis-space inverse covariance, shape (dim, dim). """ if not self._is_compressed: C = self._build_signal_matrix_direct() + self._N return matrix_inverse_symm(np.asfortranarray(C)) if self._eigenvectors is None: raise RuntimeError( "Compression not applied. Pass epsilon or mode_fraction " "to PixelBasis(...) at construction." ) return self.get_inverse(C_ell)
[docs] def get_derivative_matrix( self, ell: int, key, *, symmetry_mode=None, ) -> np.ndarray: """ Get derivative matrix ∂C/∂C_ℓ in the basis space. Parameters ---------- ell : int Multipole. key : SpectrumKey Identifies the spectrum (component pair + spin kind). symmetry_mode : SymmetryMode or None ``None`` keeps the SYMMETRIC default. Returns ------- numpy.ndarray Derivative matrix of shape (dim, dim). """ from ..spectrum_key import SpectrumKind, SymmetryMode, kind_to_legacy_mode if symmetry_mode is None: symmetry_mode = SymmetryMode.SYMMETRIC is_cross = key.comp_i != key.comp_j mode = kind_to_legacy_mode(key.kind, is_cross=is_cross) if not self._is_compressed: return self._get_derivative_direct(ell, key.comp_i, key.comp_j, mode) if self._eigenvectors is None: raise RuntimeError( "Compression not applied. Pass epsilon or mode_fraction " "to PixelBasis(...) at construction." ) # Single spin-0 field: use precomputed diagonal. The precomputed # diagonal is the SS derivative; reject other kinds so a bogus key # doesn't silently return wrong-kind data. if self.n_components == 1 and self._spins[0] == 0: if key.kind is not SpectrumKind.SS or key.comp_i != 0 or key.comp_j != 0: raise ValueError( f"single spin-0 basis only supports SpectrumKey(0, 0, SS); got {key}" ) E_diag = self._derivative_diagonals[ell] np.multiply(self._VU, E_diag[:, np.newaxis], out=self._VU_scaled_buffer) return matrix_mult(self._VU_scaled_buffer.T, self._VU) E = self._build_derivative_matrix_with_spins( ell, key.comp_i, key.comp_j, mode, symmetry_mode=symmetry_mode ) E_VU = matrix_mult(E, self._VU) return matrix_mult(self._VU.T, E_VU)
[docs] def get_covariance(self, C_ell) -> np.ndarray: """ Compute the basis-space covariance. In pixel-direct mode this is the exact ``N + S``; on a truncated compressed basis it is ``U^T (N + S) U`` — the restriction of the full covariance to the kept subspace. Parameters ---------- C_ell : numpy.ndarray or dict Power spectrum (array for single-field, dict for multi-field). Returns ------- numpy.ndarray Basis-space covariance, shape (dim, dim). """ if not self._is_compressed: return self._build_signal_matrix_direct() + self._N if self._eigenvectors is None: raise RuntimeError( "Compression not applied. Pass epsilon or mode_fraction " "to PixelBasis(...) at construction." ) if isinstance(C_ell, dict): lambda_matrix = self._build_lambda_matrix(C_ell) VU_Lambda = matrix_mult(lambda_matrix, self._VU) U_S_U = matrix_mult(self._VU.T, VU_Lambda) return self._U_N_U + U_S_U # Single-field: use buffers for efficiency Lambda_diag = self._build_lambda_diagonal(C_ell) np.multiply(self._VU, Lambda_diag[:, np.newaxis], out=self._VU_scaled_buffer) np.matmul(self._VU.T, self._VU_scaled_buffer, out=self._U_S_U_buffer) return self._U_N_U + self._U_S_U_buffer
[docs] def get_weighted_data( self, data: np.ndarray, C_ell, C_c_inv: np.ndarray | None = None, stable_inner_inv: np.ndarray | None = None, ) -> np.ndarray: """ Compute C^{-1} @ d for QML estimation. Parameters ---------- data : numpy.ndarray Pixel-space data vector of shape (n_pix,) or (n_pix, n_sims). C_ell : numpy.ndarray or dict Power spectrum (array for single-field, dict for multi-field). C_c_inv : numpy.ndarray, optional Precomputed inverse covariance. stable_inner_inv : numpy.ndarray, optional Unused for pixel basis; accepted for ABC signature parity. Returns ------- numpy.ndarray Weighted data of shape (dim,) or (dim, n_sims). """ if not self._is_compressed: if C_c_inv is None: C_c_inv = self.get_projected_inverse(C_ell) return matrix_mult(C_c_inv, data) if self._eigenvectors is None: raise RuntimeError( "Compression not applied. Pass epsilon or mode_fraction " "to PixelBasis(...) at construction." ) d_compressed = self._eigenvectors.T @ data if C_c_inv is None: C_c_inv = self.get_inverse(C_ell) return matrix_mult(C_c_inv, d_compressed)
[docs] def quadratic_form(self, data: np.ndarray, C_ell) -> float: """ Compute ``d^T C^{-1} d`` using the basis-space inverse. In pixel-direct mode this equals the full pixel-space quadratic form ``d^T (N + S)^{-1} d``. On a truncated compressed basis it is ``(U^T d)^T (U^T (N + S) U)^{-1} (U^T d)`` — the quadratic form of the restricted operator on the kept subspace. Parameters ---------- data : numpy.ndarray Pixel-space data vector of length n_pix. C_ell : numpy.ndarray or dict Power spectrum (array for single-field, dict for multi-field). Returns ------- float Quadratic form value. """ if not self._is_compressed: C_inv = self.get_projected_inverse(C_ell) return float(data.T @ C_inv @ data) if self._eigenvectors is None: raise RuntimeError( "Compression not applied. Pass epsilon or mode_fraction " "to PixelBasis(...) at construction." ) d_basis = self._eigenvectors.T @ data C_inv = self.get_inverse(C_ell) return float(d_basis.T @ C_inv @ d_basis)
[docs] def compute_fisher_matrix( self, C_ell, spectra_list=None, ell_min: int = 2, ell_max: int | None = None, *, symmetry_mode=None, ) -> np.ndarray: """Compute Fisher matrix. Parameters ---------- C_ell : numpy.ndarray or dict Power spectrum. Single-field: numpy.ndarray. Multi-field: dict keyed by SpectrumKey. spectra_list : list[SpectrumKey] or None For multi-field, the list of SpectrumKey instances enumerating the spectra to include. None for single-field. ell_min : int Minimum multipole. ell_max : int or None Maximum multipole. symmetry_mode : SymmetryMode or None Forwarded to the per-ℓ derivative builder. ``None`` keeps the SYMMETRIC default. Required as ``DIRECTIONAL`` when ``spectra_list`` contains a ``CG`` cross-pair entry — the builder otherwise rejects the ``mode=2`` slot used by CG. Returns ------- numpy.ndarray Fisher matrix. """ if ell_max is None: ell_max = self.lmax # Single-field path if not isinstance(C_ell, dict): if spectra_list is not None: raise ValueError( "spectra_list should be None for single-field (array) input" ) n_ell = ell_max - ell_min + 1 fisher = np.zeros((n_ell, n_ell)) C_c_inv = self.get_inverse(C_ell) from ..spectrum_key import SpectrumKey, SpectrumKind # Array C_ell input is the spin-0 single-field signature; spin-2 # single fields carry multiple spectra (EE/BB/EB) and must enter # through the dict path below. if self._spins[0] != 0: raise ValueError( f"Array C_ell requires spin-0; got spin={self._spins[0]}. " "Pass a dict for spin-2 single-field cases." ) single_field_key = SpectrumKey(0, 0, SpectrumKind.SS, spins=(0,)) cinv_times_dc = {} for ell in range(ell_min, ell_max + 1): dC = self.get_derivative_matrix(ell, single_field_key) cinv_times_dc[ell] = matrix_mult(C_c_inv, dC) for ell_i in range(ell_min, ell_max + 1): for ell_j in range(ell_i, ell_max + 1): idx_i = ell_i - ell_min idx_j = ell_j - ell_min fisher_val = 0.5 * matrix_trace( cinv_times_dc[ell_i], cinv_times_dc[ell_j] ) fisher[idx_i, idx_j] = fisher_val if idx_i != idx_j: fisher[idx_j, idx_i] = fisher_val return fisher # Multi-field path if spectra_list is None: raise ValueError("spectra_list is required for multi-field (dict) input") n_ell = ell_max - ell_min + 1 n_spec = len(spectra_list) fisher = np.zeros((n_spec * n_ell, n_spec * n_ell)) C_c_inv = self.get_inverse(C_ell) cinv_times_dc = {} for spec_idx, spec_entry in enumerate(spectra_list): for ell in range(ell_min, ell_max + 1): dC = self.get_derivative_matrix( ell, spec_entry, symmetry_mode=symmetry_mode ) cinv_times_dc[(spec_idx, ell)] = matrix_mult(C_c_inv, dC) for spec_a in range(n_spec): for ell_a in range(ell_min, ell_max + 1): idx_a = spec_a * n_ell + (ell_a - ell_min) for spec_b in range(spec_a, n_spec): ell_b_start = ell_a if spec_a == spec_b else ell_min for ell_b in range(ell_b_start, ell_max + 1): idx_b = spec_b * n_ell + (ell_b - ell_min) fisher_val = 0.5 * matrix_trace( cinv_times_dc[(spec_a, ell_a)], cinv_times_dc[(spec_b, ell_b)], ) fisher[idx_a, idx_b] = fisher_val if idx_a != idx_b: fisher[idx_b, idx_a] = fisher_val return fisher
[docs] def prepare_for_basis(self, C_ell_dict: dict) -> BasisPrepared: """Pre-bake the basis-space inverse and logdet for reuse across sims.""" if self._is_compressed and self._eigenvectors is None: raise RuntimeError( "Compression not applied. Pass epsilon or mode_fraction " "to PixelBasis(...) at construction." ) from ..basics import matrix_slogdet_symm C_c = self.get_covariance(C_ell_dict) C_c_inv = matrix_inverse_symm(C_c) _, logdet = matrix_slogdet_symm(C_c) return BasisPrepared(C_c_inv, logdet)
[docs] def quadratic_form_from_prepared( self, data: np.ndarray, C_c_inv: np.ndarray ) -> float: """Compute ``d^T C^{-1} d`` using a pre-baked basis-space inverse.""" if not self._is_compressed: return float(data.T @ C_c_inv @ data) if self._eigenvectors is None: raise RuntimeError( "Compression not applied. Pass epsilon or mode_fraction " "to PixelBasis(...) at construction." ) d_c = self._eigenvectors.T @ data return float(d_c.T @ C_c_inv @ d_c)
[docs] def get_logdet(self, C_ell) -> float: """ Compute the log determinant of the basis-space covariance. In pixel-direct mode this equals the exact full ``log|N + S|``. On a truncated compressed basis it is the logdet of the restricted operator ``U^T (N + S) U`` — not the full ``log|N + S|``; see :meth:`get_full_logdet` for the ABC contract. Parameters ---------- C_ell : numpy.ndarray or dict Power spectrum (array for single-field, dict for multi-field). """ if isinstance(C_ell, dict): _, logdet = self.prepare_for_basis(C_ell) return logdet from ..basics import matrix_slogdet_symm C_c = self.get_covariance(C_ell) _, logdet = matrix_slogdet_symm(np.asfortranarray(C_c)) return logdet
[docs] def get_full_inverse(self, C_ell) -> np.ndarray: """Reconstruct full ``n_pix x n_pix`` inverse from the basis-space form. Exact in pixel-direct mode (``U`` is the identity). Lossy on a truncated compressed pixel basis — the result lives in the kept subspace and is zero on the truncated complement. """ C_basis_inv = self.get_inverse(C_ell) if not self._is_compressed or self._eigenvectors is None: return C_basis_inv U = self._eigenvectors return U @ C_basis_inv @ U.T
@property def compression_ratio(self) -> float: """ Ratio of kept modes to original pixels. Returns ------- float Compression ratio (1.0 means no compression). """ return self.dim / self.n_pix @property def eigenvalues(self) -> np.ndarray | None: """ Eigenvalues of kept modes (sorted descending). Returns ------- numpy.ndarray or None Eigenvalues if compression has been applied, None otherwise. """ return self._eigenvalues @property def compression_target(self) -> str | None: """ The selector configured at construction (``compression_target`` kwarg). Names the matrix that ``_apply_compression`` eigendecomposes: ``"harmonic"``, ``"noise_weighted"``, ``"total_covariance"``, or ``"snr"``. Returns the configured value regardless of whether compression has actually been applied yet. """ return self._compression_target
[docs] @classmethod def available_bases(cls) -> dict[str, str]: """ Get available compression bases and their descriptions. Returns ------- dict Dictionary mapping basis names to their descriptions. Examples -------- >>> PixelBasis.available_bases() {'harmonic': 'P_h = V^T V (pure harmonic projector)', 'noise_weighted': 'P_h N^{-1} P_h (inverse noise weighting)', 'total_covariance': 'P_h C^{-1} P_h (full covariance weighting, requires C_ell)', 'snr': 'S^{1/2} N^{-1} S^{1/2} (signal-to-noise ratio, requires C_ell)'} """ return COMPRESSION_BASES.copy()