"""
Harmonic computation basis (Tegmark-like) for CMB Fisher matrix computation.
This module implements direct harmonic space projection where data is
transformed directly to n_modes dimensions via the harmonic operator V.
"""
from __future__ import annotations
from functools import cached_property
import numpy as np
from ..basics import (
add_diagonal,
cholesky_decomposition,
cholesky_solve,
inv,
matrix_inverse_symm,
matrix_mult,
matrix_slogdet_symm,
matrix_trace,
smw_inverse,
smw_logdet,
smw_quadratic_form,
solve_linear,
symmetrize_inplace,
)
from .base import (
_LAMBDA_INV_CAP,
_LAMBDA_ZERO_THRESHOLD,
_MATRIX_REGULARIZATION,
BasisPrepared,
ComputationBasis,
)
[docs]
class HarmonicBasis(ComputationBasis):
"""
Direct harmonic space computation basis (Tegmark-like).
Transforms directly to n_modes dimensions via V. Fast and efficient
when n_modes << n_pix. No additional eigenvalue truncation.
The key operations are:
- Data projection: d̄ = P @ d where P = V
- Projected covariance: C̄ = V N V^T + Λ
- Projected inverse: V C^{-1} V^T = M - M K^{-1} M (via SMW)
Parameters
----------
N : numpy.ndarray
Noise covariance matrix of shape (n_pix, n_pix). The basis takes
ownership of this buffer; ``_factorise_noise()`` overwrites it
in place during setup.
theta : numpy.ndarray
Colatitude angles for active pixels in radians.
phi : numpy.ndarray
Longitude angles for active pixels in radians.
lmax : int
Maximum multipole for harmonic expansion.
beam : numpy.ndarray or None, optional
Beam window function B_ℓ for ℓ=2 to lmax.
compress : bool, optional
If True, use m-block compression for K inversion.
Approximates K as block-diagonal in azimuthal quantum number |m|,
giving ~lmax^2 speedup. Default is False.
delta_m : int, optional
Bandwidth for m-block coupling. delta_m=0 means block-diagonal
(no coupling between different |m|). delta_m=lmax recovers exact
result. Default is 0.
Examples
--------
>>> import numpy as np
>>> from cosmocore.basis import HarmonicBasis
>>> hc = HarmonicBasis(N, theta, phi, lmax=100)
>>> hc.setup()
>>> fisher_element = hc.compute_fisher_element(C_ell, ell_i=10, ell_j=10)
References
----------
.. [1] Tegmark, M. "How to measure CMB power spectra without losing information"
Phys. Rev. D 55, 5895 (1997)
"""
[docs]
def __init__(
self,
N: np.ndarray,
theta: np.ndarray | tuple[np.ndarray, ...],
phi: np.ndarray | tuple[np.ndarray, ...],
lmax_signal: int,
beam: np.ndarray | None = None,
spins: list[int] | None = None,
lmin_signal: list[int] | None = None,
lmin: int | None = None,
lmax: int | None = None,
fiducial_C_ell: np.ndarray | None = None,
S_fixed: np.ndarray | None = None,
compress: bool = False,
delta_m: int = 0,
):
super().__init__(
N=N,
theta=theta,
phi=phi,
lmax_signal=lmax_signal,
beam=beam,
spins=spins,
lmin_signal=lmin_signal,
lmin=lmin,
lmax=lmax,
fiducial_C_ell=fiducial_C_ell,
S_fixed=S_fixed,
)
self._init_harmonic_internals()
self.dim = self.n_modes_total
self._compress = compress
self._delta_m = delta_m
if self._compress:
# Multi-field + compress not yet supported
if self.n_components > 1:
raise NotImplementedError(
"m-block compression is only supported for single-field "
"spin-0 configurations. Multi-field support will be added later."
)
if any(s != 0 for s in self._spins):
raise NotImplementedError(
"m-block compression is only supported for spin-0 fields."
)
@property
def method(self) -> str:
"""Computation basis name."""
return "harmonic"
@property
def projector(self) -> np.ndarray:
"""Projection matrix V (n_modes × n_pix)."""
return self._V
[docs]
def release_pixel_projector(self) -> None:
"""Drop V after SMW components are built.
QML hot paths (Fisher trace, Spectra weighted-data) read only
``_V_N_inv`` and ``_V_Ninv_VT``. The remaining V consumers —
``get_covariance``/``get_inverse``,
``to_basis``, m-block compression, and PICSLike ``do_cross``
with the harmonic basis — must not be invoked after this call.
Raises if m-block compression was requested at construction.
"""
if self._compress:
raise RuntimeError(
"release_pixel_projector incompatible with m-block compression"
)
self._harmonic_basis._V = None
[docs]
def setup(self) -> None:
"""
Build V and precompute SMW components.
Computes:
- V: harmonic operator (n_modes × n_pix)
- V N^{-1}: precomputed for SMW
- V N^{-1} V^T: SMW kernel
- V N V^T: harmonic-space noise covariance
- log|N|: for determinant calculations
- Derivative diagonals: E_ℓ for all ℓ
When switch optimization is enabled (lswitch_high < lmax):
- V is built only for ℓ in [lswitch_low, lswitch_high]
- S_fixed is computed for ℓ > lswitch_high using fiducial C_ell
- N_eff = N + S_fixed is used instead of N
This dramatically reduces the SMW dimension.
"""
import time as _time
# Build harmonic operator, ell-mode mapping, and derivative diagonals
_t0 = _time.time()
self._build_basis()
# _V_blocks is the C-order source PixelBasis re-reads during
# apply_compression. HarmonicBasis never does; release here.
self._harmonic_basis._V_blocks = None
_t_basis = _time.time() - _t0
# Compute effective noise matrix when switch optimization is enabled
_t_eff = 0.0
if self._use_switch_optimization:
_t0 = _time.time()
self._compute_effective_noise()
_t_eff = _time.time() - _t0
# Factor the (effective) noise in place; releases the symmetric N
# buffer and any caller-provided dense N_inv. After this call,
# cholesky_solve via self._N_chol is the only path to N^{-1}.
self._factorise_noise()
_t0 = _time.time()
self._compute_smw_components()
_t_smw = _time.time() - _t0
from ..logger import get_logger
logger = get_logger("basis", feedback_level=4)
logger.debug(
f"Basis setup: V={_t_basis:.2f}s, eff_noise={_t_eff:.2f}s, SMW={_t_smw:.2f}s"
)
if self._compress:
self._compute_mblock_smw_components()
def _compute_effective_noise(self) -> None:
"""
Compute effective noise N_eff = N + S_fixed for switch optimization.
S_fixed is the signal matrix contribution from multipoles outside
the switch range (ℓ > lswitch_high). This contribution is constant
across all parameter points and can be absorbed into an effective
noise matrix.
The SMW formula then applies only to the varying multipoles:
C = N_eff + S_varied
(N_eff + S_varied)^{-1} via SMW with reduced V
This significantly reduces the SMW dimension from
(lmax+1)² - 4 to (lswitch_high+1)² - 4.
S_fixed can be provided directly (preferred) or computed from
fiducial_C_ell (legacy). The direct approach ensures consistency
with the parent class's signal matrix computation.
"""
if self._S_fixed is not None:
S_fixed = np.asarray(self._S_fixed, dtype=np.float64)
elif self._fiducial_C_ell is not None:
S_fixed = self._compute_s_fixed_from_fiducial()
else:
raise ValueError(
"Either S_fixed or fiducial_C_ell must be provided when using "
"switch optimization (lswitch_high < lmax)"
)
# In-place — _compute_smw_components rebuilds T from S_fixed via the
# identity T = V_Ninv_VT − V_N_inv S_fixed V_N_inv^T, so we don't need
# an _N_original copy.
np.add(self._N, S_fixed, out=self._N)
self._S_fixed = S_fixed
def _compute_s_fixed_from_fiducial(self) -> np.ndarray:
"""
Legacy method: compute S_fixed from fiducial_C_ell using compute_00_contribution.
This is kept for backward compatibility but the preferred approach is to
compute S_fixed externally using the existing signal matrix infrastructure.
"""
from ..signal_kernels import compute_00_contribution
cl_fixed = np.zeros(self.lmax_signal + 1, dtype=np.float64)
for ell in range(self.lmax + 1, self.lmax_signal + 1):
if ell < len(self._fiducial_C_ell):
cl_fixed[ell] = self._fiducial_C_ell[ell]
point_vectors = np.column_stack(
[
np.sin(self.theta) * np.cos(self.phi),
np.sin(self.theta) * np.sin(self.phi),
np.cos(self.theta),
]
)
S_fixed = np.zeros((self.n_pix, self.n_pix), dtype=np.float64)
legendre_buffer = np.empty(self.lmax_signal + 1, dtype=np.float64)
compute_00_contribution(
cl_fixed,
S_fixed,
point_vectors,
point_vectors,
legendre_buffer,
mode=0,
)
# compute_00_contribution fills lower triangle only; symmetrise.
S_fixed = S_fixed + S_fixed.T - np.diag(np.diag(S_fixed))
return S_fixed
def _compute_smw_components(self) -> None:
"""
Precompute matrices needed for SMW formula.
Computes:
- V @ N^{-1} (used in SMW and for projecting data)
- V @ N^{-1} @ V^T (SMW kernel matrix)
- log|N| (for determinant computation)
Note: V @ N @ V^T is computed lazily when needed (get_covariance)
to avoid expensive O(n_pix³) inversion when only Fisher is needed.
"""
self._V_N_inv = cholesky_solve(self._N_chol, self._V.T).T
# Symmetrise in place: M+M.T and 0.5*(...) each allocate a fresh
# n_modes² buffer (~88 GB transient at QU/nside=64/lmax=192).
self._V_Ninv_VT = matrix_mult(self._V_N_inv, self._V.T)
symmetrize_inplace(self._V_Ninv_VT)
# T = V N_eff^{-1} N_orig N_eff^{-1} V^T, consumed by
# Spectra._compute_noise_cov_compressed. With N_orig = N_eff − S_fixed
# the algebra reduces to T = V_Ninv_VT − V_N_inv S_fixed V_N_inv^T,
# avoiding an n_pix² N_orig copy. Without switch optimisation, T = V_Ninv_VT.
if self._S_fixed is not None:
corr = matrix_mult(matrix_mult(self._V_N_inv, self._S_fixed), self._V_N_inv.T)
T = self._V_Ninv_VT - corr
self._noise_cov_T = symmetrize_inplace(T)
self._S_fixed = None
else:
self._noise_cov_T = self._V_Ninv_VT
@cached_property
def _V_N_VT(self) -> np.ndarray:
"""Lazily compute V @ N @ V^T via the Cholesky factor.
Uses N = L L^T so V N V^T = (V L)(V L)^T — cheaper than building
N from L and tolerant of in-place factor storage in commit 3.
"""
VL = matrix_mult(self._V, self._L)
return matrix_mult(VL, VL.T)
# =================================================================
# SMW helpers
# =================================================================
[docs]
def prepare_stable_inner_inv(
self, C_ell, lambda_matrix: np.ndarray | None = None
) -> np.ndarray:
"""Build (I + Lambda M)^{-1} for the stable QML algebra.
This matrix appears identically in three places of the QML
algebra and replaces the unstable subtractive forms:
- Projected inverse: V C^{-1} V^T = M @ (I + Lambda M)^{-1}
- Data weighting: V C^{-1} d = (I + Lambda M)^{-1} @ V N^{-1} d
- Noise bias matrix: A = I - M K^{-1} = (I + Lambda M)^{-1}
All three follow from the identity
``M - M K^{-1} M = M K^{-1} Lambda^{-1} = M (I + Lambda M)^{-1}``
and never subtract large nearly-equal matrices, so they remain
accurate in the cosmic-variance-limited regime where the legacy
SMW form loses precision (e.g. T at low ell with sub-uK
polarization noise).
Parameters
----------
C_ell : np.ndarray or dict
Power spectrum (single-field array or multi-field dict).
lambda_matrix : np.ndarray, optional
Pre-built full Lambda matrix. If None, built from C_ell.
Returns
-------
np.ndarray
(I + Lambda M)^{-1}, n_modes_total x n_modes_total.
"""
if lambda_matrix is None:
c_ell_arr, c_ell_dict, is_single = self._normalize_c_ell(C_ell)
if is_single:
lambda_diag = self._build_lambda_diagonal(c_ell_arr)
lambda_matrix = np.diag(lambda_diag)
else:
lambda_matrix = self._build_lambda_matrix(c_ell_dict)
inner = lambda_matrix @ self._V_Ninv_VT
inner[np.diag_indices_from(inner)] += 1.0
return inv(inner)
def _smw_projected_inverse(
self,
M: np.ndarray,
*,
lambda_inv_diag: np.ndarray | None = None,
lambda_inv_matrix: np.ndarray | None = None,
K: np.ndarray | None = None,
lambda_matrix: np.ndarray | None = None,
) -> np.ndarray:
"""Compute the projected inverse V (S+N)^{-1} V^T.
Two algebraically equivalent forms are supported:
- **Subtractive (legacy)**: ``M - M K^{-1} M`` with
``K = Lambda^{-1} + M``. Numerically unstable in the
signal-dominated regime (``Lambda >> M^{-1}``) because the two
terms become large and nearly equal — float64 cancellation can
drive entries to spuriously negative values, breaking
downstream Fisher PD-ness. Used when ``K`` or
``lambda_inv_*`` is supplied (preserves backward compatibility
for callers that already built K).
- **Stable**: ``(M^{-1} + Lambda)^{-1}``. Algebraically
identical, but never subtracts large nearly-equal matrices —
accurate in both noise- and signal-dominated regimes.
Triggered automatically when only ``lambda_matrix`` is
provided.
Parameters
----------
M : numpy.ndarray
V @ N^{-1} @ V^T (or a sub-block of it).
lambda_inv_diag, lambda_inv_matrix, K : optional
Trigger the legacy subtractive form (see above).
lambda_matrix : numpy.ndarray, optional
Full Lambda matrix (signal-space block). Triggers the stable
``(M^{-1} + Lambda)^{-1}`` form. Mutually exclusive with the
legacy keywords.
"""
if lambda_matrix is not None:
if (
K is not None
or lambda_inv_diag is not None
or lambda_inv_matrix is not None
):
raise ValueError(
"lambda_matrix is mutually exclusive with K / "
"lambda_inv_diag / lambda_inv_matrix"
)
# Stable form: V C^{-1} V^T = M (I + Lambda M)^{-1}.
# Algebraically equivalent to M - M K^{-1} M but never
# subtracts large nearly-equal matrices and never inverts
# M (which is rank-deficient on a partial sky) or Lambda
# (which can be ill-conditioned in the cosmic-variance
# limit). The matrix (I + Lambda M) has eigenvalues 1 +
# eig(Lambda^{1/2} M Lambda^{1/2}) >= 1, so it is always
# well-conditioned and invertible. Result must be symmetric;
# we enforce it to clean float-eps drift.
# We want VCVT = M @ inv(I + Lambda @ M). Compute via
# solve((I + Lambda @ M)^T, M^T)^T which is equivalent. By
# symmetry of Lambda and M, (I + Lambda M)^T = I + M Lambda;
# we use the form that lets numpy's solve do the work.
inner = lambda_matrix @ M
inner[np.diag_indices_from(inner)] += 1.0
VCVT = solve_linear(inner.T, M.T).T
symmetrize_inplace(VCVT)
return np.asfortranarray(VCVT)
if K is None:
if lambda_inv_diag is not None:
K = M.copy()
K[np.diag_indices_from(K)] += lambda_inv_diag
else:
K = lambda_inv_matrix + M
kernel_inv = matrix_inverse_symm(np.asfortranarray(K), overwrite=True)
return M - matrix_mult(matrix_mult(M, kernel_inv), M)
def _normalize_c_ell(self, C_ell):
"""Normalize C_ell input to canonical form.
Returns (c_ell_array, c_ell_dict, is_single_field).
Single-component spin-0 dicts with a single 1D entry are unwrapped.
"""
if not isinstance(C_ell, dict):
return C_ell, None, True
if self.n_components == 1 and self._spins[0] == 0:
first_val = next(iter(C_ell.values()))
if isinstance(first_val, np.ndarray) and first_val.ndim == 1:
return first_val, None, True
return None, C_ell, False
def _lambda_inv_diag_from_array(self, C_ell: np.ndarray) -> np.ndarray:
"""Build Lambda^{-1} diagonal from single-field C_ell array."""
Lambda_diag = self._build_lambda_diagonal(C_ell)
return np.where(
Lambda_diag > _LAMBDA_ZERO_THRESHOLD, 1.0 / Lambda_diag, _LAMBDA_INV_CAP
)
# =================================================================
# M-block compression
# =================================================================
def _compute_mblock_smw_components(self) -> None:
"""Compute block-wise V N^{-1} V^T for m-block compression.
Instead of storing the full V N^{-1} V^T matrix, stores blocks
indexed by (mi, mj) pairs within the delta_m bandwidth.
The full V_N_inv is still computed for data projection.
"""
m_to_modes = self._m_to_modes
max_m = max(m_to_modes.keys())
# Block-wise V_m N^{-1} V_{m'}^T
self._vninvvt_blocks = {}
for mi in sorted(m_to_modes.keys()):
V_mi_Ninv = self._V_N_inv[m_to_modes[mi], :]
for mj in range(
max(0, mi - self._delta_m), min(max_m, mi + self._delta_m) + 1
):
if mj not in m_to_modes:
continue
V_mj = self._V[m_to_modes[mj], :]
block = V_mi_Ninv @ V_mj.T
self._vninvvt_blocks[(mi, mj)] = block
# Build reverse mapping: global mode index -> (m, local_index)
self._mode_to_m_local = {}
for m, modes in m_to_modes.items():
for local_idx, global_idx in enumerate(modes):
self._mode_to_m_local[global_idx] = (m, local_idx)
# Build mapping: for each m-block, which local indices belong to each ell
self._mblock_ell_local_indices = {}
for m in sorted(m_to_modes.keys()):
modes = m_to_modes[m]
mode_set = set(modes)
mode_to_local = {g: i for i, g in enumerate(modes)}
ell_map = {}
for ell in range(self._lmin_smw, self._lmax_smw + 1):
ell_modes = self._ell_to_modes[ell]
local_indices = [mode_to_local[em] for em in ell_modes if em in mode_set]
if local_indices:
ell_map[ell] = local_indices
self._mblock_ell_local_indices[m] = ell_map
def _get_projected_inverse_mblock(self, C_ell):
"""Compute V C^{-1} V^T block by block using SMW formula.
For delta_m=0 (block-diagonal): each m-block is independent.
Returns dict mapping m -> block matrix.
Parameters
----------
C_ell : numpy.ndarray
Power spectrum values for ell = 2 to lmax.
Returns
-------
dict
Mapping from m -> block matrix (V C^{-1} V^T restricted to m).
"""
lambda_inv_diag = self._lambda_inv_diag_from_array(C_ell)
result_blocks = {}
for m in sorted(self._m_to_modes.keys()):
modes = self._m_to_modes[m]
M_m = self._vninvvt_blocks[(m, m)]
result_blocks[m] = self._smw_projected_inverse(
M_m, lambda_inv_diag=lambda_inv_diag[modes]
)
return result_blocks
def _get_projected_inverse_mblock_banded(self, C_ell):
"""Compute full V C^{-1} V^T using banded m-block K.
Groups m-values into coupled sets within delta_m bandwidth,
builds combined K for each group, inverts, and places entries
into the full n_modes x n_modes result matrix.
Parameters
----------
C_ell : numpy.ndarray
Power spectrum values for ell = 2 to lmax.
Returns
-------
numpy.ndarray
Full projected inverse matrix (n_modes, n_modes).
"""
lambda_inv_diag = self._lambda_inv_diag_from_array(C_ell)
m_values = sorted(self._m_to_modes.keys())
max_m = max(m_values)
# Build groups of coupled m-values via BFS
assigned = set()
groups = []
for m in m_values:
if m in assigned:
continue
group = []
queue = [m]
while queue:
current = queue.pop(0)
if current in assigned or current not in self._m_to_modes:
continue
assigned.add(current)
group.append(current)
for neighbor in range(
max(0, current - self._delta_m),
min(max_m, current + self._delta_m) + 1,
):
if neighbor not in assigned and neighbor in self._m_to_modes:
queue.append(neighbor)
groups.append(sorted(group))
n = self.n_modes
result = np.zeros((n, n), dtype=np.float64)
for group in groups:
# Build combined mode list
combined_modes = []
group_offsets = {}
for mg in group:
group_offsets[mg] = len(combined_modes)
combined_modes.extend(self._m_to_modes[mg])
n_combined = len(combined_modes)
# Assemble combined M matrix
M_combined = np.zeros((n_combined, n_combined), dtype=np.float64)
for mi in group:
oi = group_offsets[mi]
ni = len(self._m_to_modes[mi])
for mj in group:
if (mi, mj) not in self._vninvvt_blocks:
continue
oj = group_offsets[mj]
nj = len(self._m_to_modes[mj])
M_combined[oi : oi + ni, oj : oj + nj] = self._vninvvt_blocks[
(mi, mj)
]
result_combined = self._smw_projected_inverse(
M_combined, lambda_inv_diag=lambda_inv_diag[combined_modes]
)
# Place combined result into output matrix
ix = np.ix_(combined_modes, combined_modes)
result[ix] = result_combined
return result
def _assemble_full_from_blocks(self, blocks):
"""Assemble full matrix from m-block dict (zeros in off-block positions).
Parameters
----------
blocks : dict
Mapping from m -> block matrix.
Returns
-------
numpy.ndarray
Full (n_modes, n_modes) matrix with blocks on diagonal.
"""
n = self.n_modes
result = np.zeros((n, n), dtype=np.float64)
for m, block in blocks.items():
modes = self._m_to_modes[m]
ix = np.ix_(modes, modes)
result[ix] = block
return result
def _compute_fisher_mblock(self, C_ell, ell_min, ell_max):
"""Compute Fisher matrix using m-block projected inverse.
For delta_m=0 (block-diagonal), computes Fisher directly from
independent m-blocks. For delta_m>0, assembles the full projected
inverse from banded blocks and uses the standard Fisher path.
Parameters
----------
C_ell : numpy.ndarray
Power spectrum values for ell = 2 to lmax.
ell_min : int
Minimum multipole.
ell_max : int
Maximum multipole.
Returns
-------
numpy.ndarray
Fisher matrix of shape (n_ell, n_ell).
"""
if self._delta_m > 0:
# For banded case, cross-block contributions matter for Fisher.
# Use full matrix from banded K inversion.
V_Cinv_VT = self._get_projected_inverse_mblock_banded(C_ell)
return self._compute_fisher_from_full(V_Cinv_VT, ell_min, ell_max)
# delta_m=0: block-diagonal, each m contributes independently
proj_inv_blocks = self._get_projected_inverse_mblock(C_ell)
n_ell = ell_max - ell_min + 1
fisher = np.zeros((n_ell, n_ell))
for m, block in proj_inv_blocks.items():
ell_local = self._mblock_ell_local_indices[m]
block_size = block.shape[0]
# Precompute block * E_diag for each ell in this m-block
block_E = {}
for ell, local_indices in ell_local.items():
if ell < ell_min or ell > ell_max:
continue
E_diag_local = np.zeros(block_size)
for li in local_indices:
E_diag_local[li] = 1.0
block_E[ell] = block * E_diag_local
# F_{ij} += 0.5 * Tr[(block * E_i)(block * E_j)^T]
ells_in_block = sorted(block_E.keys())
for ii, ell_i in enumerate(ells_in_block):
idx_i = ell_i - ell_min
block_deriv_i = block_E[ell_i]
for ell_j in ells_in_block[ii:]:
idx_j = ell_j - ell_min
block_deriv_j = block_E[ell_j]
val = np.sum(block_deriv_i * block_deriv_j.T)
fisher[idx_i, idx_j] += 0.5 * val
if idx_i != idx_j:
fisher[idx_j, idx_i] += 0.5 * val
return fisher
def _compute_fisher_from_full(self, V_Cinv_VT, ell_min, ell_max):
"""Compute Fisher matrix from full projected inverse (standard path)."""
n_ell = ell_max - ell_min + 1
fisher = np.zeros((n_ell, n_ell))
VCinvVT_E = {}
for ell in range(ell_min, ell_max + 1):
E_diag = self._derivative_diagonals[ell]
VCinvVT_E[ell] = V_Cinv_VT * E_diag
for ell_i in range(ell_min, ell_max + 1):
for ell_j in range(ell_i, ell_max + 1):
idx_i = ell_i - ell_min
idx_j = ell_j - ell_min
fisher_val = 0.5 * matrix_trace(VCinvVT_E[ell_i], VCinvVT_E[ell_j])
fisher[idx_i, idx_j] = fisher_val
if idx_i != idx_j:
fisher[idx_j, idx_i] = fisher_val
return fisher
def _get_projected_inverse_field_blocks(
self, C_ell_dict: dict, field_groups: list[list[int]]
) -> np.ndarray:
"""Compute V C^{-1} V^T exploiting field block-diagonal K.
When K is block-diagonal across field groups, each group's block
can be inverted independently. The result is assembled into the
full ``n_modes_total x n_modes_total`` matrix (with zeros in
cross-group blocks).
Parameters
----------
C_ell_dict : dict
Power spectrum dictionary (multi-field).
field_groups : list of list of int
Independent field groups from ``_detect_field_blocks``.
Returns
-------
numpy.ndarray
Projected inverse of shape ``(n_modes_total, n_modes_total)``.
"""
lambda_matrix = self._build_lambda_matrix(C_ell_dict)
n_total = self.n_modes_total
result = np.zeros((n_total, n_total), dtype=np.float64)
for group in field_groups:
# Collect mode indices for this group
mode_indices = []
for comp in group:
start = self._mode_offsets[comp]
end = self._mode_offsets[comp + 1]
mode_indices.extend(range(start, end))
mode_indices = np.array(mode_indices, dtype=int)
ix = np.ix_(mode_indices, mode_indices)
M_block = self._V_Ninv_VT[ix]
Lambda_block = lambda_matrix[ix]
Lambda_reg = Lambda_block + np.eye(len(mode_indices)) * _MATRIX_REGULARIZATION
Lambda_inv_block = matrix_inverse_symm(np.asfortranarray(Lambda_reg))
result[ix] = self._smw_projected_inverse(
M_block, lambda_inv_matrix=Lambda_inv_block
)
return result
[docs]
def get_projected_inverse(self, C_ell, field_groups=None):
"""
Compute V C^{-1} V^T efficiently using SMW formula.
Parameters
----------
C_ell : numpy.ndarray or dict
Power spectrum. Can be array (single-field) or dict (multi-field).
field_groups : list of list of int or None, optional
Independent field groups from ``_detect_field_blocks``.
If provided and contains more than one group, exploits
field block-diagonal structure for faster K inversion.
Returns
-------
numpy.ndarray
Projected inverse covariance V C^{-1} V^T.
"""
c_ell_arr, c_ell_dict, is_single = self._normalize_c_ell(C_ell)
# M-block compressed path (single-field only)
if self._compress and is_single:
if self._delta_m > 0:
return self._get_projected_inverse_mblock_banded(c_ell_arr)
blocks = self._get_projected_inverse_mblock(c_ell_arr)
return self._assemble_full_from_blocks(blocks)
if is_single:
lambda_inv_diag = self._lambda_inv_diag_from_array(c_ell_arr)
return self._smw_projected_inverse(
self._V_Ninv_VT, lambda_inv_diag=lambda_inv_diag
)
# Multi-field dict path
if field_groups is not None and len(field_groups) > 1:
return self._get_projected_inverse_field_blocks(c_ell_dict, field_groups)
# Stable form: (M^{-1} + Lambda)^{-1}. Avoids the catastrophic
# cancellation of M - M K^{-1} M in the signal-dominated regime,
# which is otherwise the dominant source of numerical noise for
# multi-spectrum QML at high SNR (e.g. T at low ell with sub-uK
# polarization noise).
lambda_matrix = self._build_lambda_matrix(c_ell_dict)
return self._smw_projected_inverse(self._V_Ninv_VT, lambda_matrix=lambda_matrix)
[docs]
def get_derivative_matrix(
self,
ell: int,
key,
*,
symmetry_mode=None,
) -> np.ndarray:
"""
Get the derivative matrix ∂S/∂C_ℓ in harmonic space.
Parameters
----------
ell : int
Multipole.
key : SpectrumKey
Identifies the spectrum (component pair + spin kind).
symmetry_mode : SymmetryMode or None
``None`` keeps the SYMMETRIC default.
Returns
-------
numpy.ndarray
Derivative matrix of shape (n_modes, n_modes).
"""
from ..spectrum_key import SpectrumKind, SymmetryMode, kind_to_legacy_mode
if self.n_components == 1 and self._spins[0] == 0:
if key.kind is not SpectrumKind.SS or key.comp_i != 0 or key.comp_j != 0:
raise ValueError(
f"single spin-0 basis only supports SpectrumKey(0, 0, SS); got {key}"
)
return np.diag(self._derivative_diagonals[ell])
if symmetry_mode is None:
symmetry_mode = SymmetryMode.SYMMETRIC
is_cross = key.comp_i != key.comp_j
mode = kind_to_legacy_mode(key.kind, is_cross=is_cross)
return self._build_derivative_matrix_with_spins(
ell, key.comp_i, key.comp_j, mode, symmetry_mode=symmetry_mode
)
[docs]
def get_covariance(self, C_ell):
"""
Compute harmonic-space covariance C̄ = V N V^T + Λ.
Parameters
----------
C_ell : numpy.ndarray or dict
Power spectrum. Can be:
- numpy.ndarray: C_ell values for ell = 2 to lmax (single-field)
- dict: Multi-field with 2-tuple or 3-tuple keys
Returns
-------
numpy.ndarray
Compressed covariance matrix.
"""
c_ell_arr, c_ell_dict, is_single = self._normalize_c_ell(C_ell)
if is_single:
Lambda_diag = self._build_lambda_diagonal(c_ell_arr)
return add_diagonal(self._V_N_VT, Lambda_diag)
lambda_matrix = self._build_lambda_matrix(c_ell_dict)
return self._V_N_VT + lambda_matrix
# === Full pixel-space operations (if needed) ===
[docs]
def get_full_inverse(self, C_ell) -> np.ndarray:
"""
Compute full (N + S)^{-1} using the SMW formula.
Parameters
----------
C_ell : numpy.ndarray or dict
Power spectrum (array for single-field, dict for multi-field).
Returns
-------
numpy.ndarray
Full inverse covariance matrix of shape (n_pix, n_pix).
"""
c_ell_arr, c_ell_dict, is_single = self._normalize_c_ell(C_ell)
if is_single:
Lambda_diag = self._build_lambda_diagonal(c_ell_arr)
return smw_inverse(self.N_inv, self._V_N_inv, self._V_Ninv_VT, Lambda_diag)
# Multi-field: (N + S)^{-1} = N^{-1} - V_N_inv^T @ K^{-1} @ V_N_inv
# with K = Lambda^{-1} + V N^{-1} V^T solved via Cholesky.
K_chol, _ = self.prepare_for_basis(c_ell_dict)
K_inv_V_N_inv = cholesky_solve((K_chol, True), self._V_N_inv)
return self.N_inv - self._V_N_inv.T @ K_inv_V_N_inv
[docs]
def get_full_logdet(self, C_ell) -> float:
"""
Compute log|N + S| using SMW formula.
Parameters
----------
C_ell : numpy.ndarray or dict
Power spectrum (array for single-field, dict for multi-field).
Returns
-------
float
Log determinant of the full covariance matrix.
"""
c_ell_arr, c_ell_dict, is_single = self._normalize_c_ell(C_ell)
if not is_single:
_, logdet = self.prepare_for_basis(c_ell_dict)
return logdet
Lambda_diag = self._build_lambda_diagonal(c_ell_arr)
return smw_logdet(self._log_det_N, self._V_Ninv_VT, Lambda_diag)
[docs]
def get_noise_for_bias(self) -> np.ndarray:
"""Return the SMW noise intermediate ``T = V N_eff^{-1} N N_eff^{-1} V^T``.
See :meth:`ComputationBasis.get_noise_for_bias` for the cross-basis
contract. ``T`` is precomputed during basis setup.
"""
return self._noise_cov_T
[docs]
def get_weighted_data(
self,
data: np.ndarray,
C_ell,
C_c_inv: np.ndarray | None = None,
stable_inner_inv: np.ndarray | None = None,
) -> np.ndarray:
"""Compute ``w = V C^{-1} d`` via the stable SMW algebra.
``w = (I + Lambda M)^{-T} (V N^{-1} d)`` — algebraically equivalent to
``M K^{-1} d`` but never subtracts large nearly-equal matrices, so it
retains precision in the cosmic-variance-limited regime where the
legacy subtractive form ``y - M K^{-1} y`` cancelled.
Parameters
----------
data : numpy.ndarray
Pixel-space data, shape ``(n_pix,)`` or ``(n_pix, n_sims)``.
C_ell : numpy.ndarray or dict
Power spectrum (single-field array or multi-field dict).
C_c_inv : numpy.ndarray, optional
Unused for harmonic basis; accepted for ABC signature parity.
stable_inner_inv : numpy.ndarray, optional
Precomputed ``(I + Lambda M)^{-1}``. If supplied, avoids
rebuilding it internally. Callers that weight several data
vectors with the same ``C_ell`` should build it once via
:meth:`prepare_stable_inner_inv` and pass it in.
Returns
-------
numpy.ndarray
Weighted harmonic-space data ``w = V C^{-1} d``.
"""
if stable_inner_inv is None:
stable_inner_inv = self.prepare_stable_inner_inv(C_ell)
return stable_inner_inv.T @ (self._V_N_inv @ data)
def _build_smw_kernel(self, C_ell_dict: dict) -> tuple[np.ndarray, np.ndarray]:
"""Build K = lambda_inv + V N^{-1} V^T and return (K, lambda_matrix)."""
lambda_matrix = self._build_lambda_matrix(C_ell_dict)
lambda_regularized = (
lambda_matrix + np.eye(lambda_matrix.shape[0]) * _MATRIX_REGULARIZATION
)
lambda_inv = matrix_inverse_symm(np.asfortranarray(lambda_regularized))
K = lambda_inv + self._V_Ninv_VT
return K, lambda_matrix
[docs]
def prepare_for_basis(self, C_ell_dict: dict) -> BasisPrepared:
"""Precompute K Cholesky factor and log determinant for reuse across sims."""
K, lambda_matrix = self._build_smw_kernel(C_ell_dict)
_, log_det_Lambda = matrix_slogdet_symm(lambda_matrix)
K_chol = cholesky_decomposition(K)
log_det_K = 2.0 * np.sum(np.log(np.diag(K_chol)))
logdet = self._log_det_N + log_det_Lambda + log_det_K
return BasisPrepared(K_chol, logdet)
[docs]
def get_derivative_diagonal(self, ell: int, key) -> np.ndarray:
"""Get derivative diagonal for an auto-pair diagonal SpectrumKey.
Returns a 1D vector — the diagonal of ∂S/∂C_ℓ — for use in the
sparse-trace fast path. Valid only for ``key.comp_i == key.comp_j``
and diagonal kinds (``SS``, ``GG``, ``CC``) — i.e. TT, EE, or BB.
Cross-component keys and ``GC/CG/SG/SC/GS/CS`` raise ``ValueError``.
"""
from ..spectrum_key import SpectrumKind, kind_to_legacy_mode
if key.comp_i != key.comp_j:
raise ValueError(
"get_derivative_diagonal requires an auto-pair key; "
f"got comp_i={key.comp_i}, comp_j={key.comp_j}"
)
if key.kind not in (SpectrumKind.SS, SpectrumKind.GG, SpectrumKind.CC):
raise ValueError(
"get_derivative_diagonal requires a diagonal kind "
f"(SS/GG/CC); got {key.kind.name}"
)
return self._get_derivative_diagonal(
ell, key.comp_i, key.comp_j, kind_to_legacy_mode(key.kind)
)
def _get_derivative_diagonal(
self, ell: int, comp_i: int = 0, comp_j: int = 0, mode: int = 0
) -> np.ndarray:
"""
Get derivative diagonal for auto-spectrum (comp_i, comp_j, mode).
Only valid for auto-spectra (comp_i == comp_j) where the derivative
matrix is diagonal. For spin-2 auto-spectra, ``mode`` selects EE (0)
or BB (1); EB (mode=2) is NOT diagonal and must not use this method.
Parameters
----------
ell : int
Multipole.
comp_i : int
First component index.
comp_j : int
Second component index (must equal comp_i).
mode : int
Spin mode: 0=TT/EE, 1=BB. Only used for spin-2 components.
Returns
-------
numpy.ndarray
Diagonal of derivative matrix, shape (dim,).
"""
spin = self._spins[comp_i]
local_mode_indices = self._ell_to_modes_local[ell]
# Single-component spin-0: use precomputed diagonal directly
if self.n_components == 1 and spin == 0:
return self._derivative_diagonals[ell]
E_diag = np.zeros(self.n_modes_total, dtype=np.float64)
offset = self._mode_offsets[comp_i] if self.n_components > 1 else 0
if spin == 0:
for idx in local_mode_indices:
E_diag[offset + idx] = 1.0
elif spin == 2:
n_base = self._n_modes_base
if mode == 0: # EE
for idx in local_mode_indices:
E_diag[offset + idx] = 1.0
elif mode == 1: # BB
for idx in local_mode_indices:
E_diag[offset + n_base + idx] = 1.0
return E_diag
[docs]
def compute_fisher_matrix(
self,
C_ell,
spectra_list=None,
ell_min: int = 2,
ell_max: int | None = None,
*,
symmetry_mode=None,
) -> np.ndarray:
"""Compute Fisher matrix.
Parameters
----------
C_ell : numpy.ndarray or dict
Power spectrum. Single-field: numpy.ndarray (spectra_list=None).
Multi-field: dict keyed by SpectrumKey (spectra_list required).
spectra_list : list[SpectrumKey] or None
For multi-field, the list of SpectrumKey instances enumerating
the spectra to include. None for single-field.
ell_min : int
Minimum multipole.
ell_max : int or None
Maximum multipole.
symmetry_mode : SymmetryMode or None
Forwarded to the per-ℓ derivative builder. ``None`` keeps the
SYMMETRIC default. Required as ``DIRECTIONAL`` when
``spectra_list`` contains a ``CG`` cross-pair entry — the
builder otherwise rejects the ``mode=2`` slot used by CG.
Returns
-------
numpy.ndarray
Fisher matrix.
"""
c_ell_arr, c_ell_dict, is_single = self._normalize_c_ell(C_ell)
# Single-field path
if is_single:
if spectra_list is not None and not (len(spectra_list) == 1 and is_single):
raise ValueError(
"spectra_list should be None for single-field (array) input"
)
if ell_max is None:
ell_max = self.lmax
if self._compress:
return self._compute_fisher_mblock(c_ell_arr, ell_min, ell_max)
n_ell = ell_max - ell_min + 1
fisher = np.zeros((n_ell, n_ell))
V_Cinv_VT = self.get_projected_inverse(c_ell_arr)
# (V C⁻¹ V^T) @ diag(E) = V_Cinv_VT * E [O(n²) column scaling]
VCinvVT_E = {}
for ell in range(ell_min, ell_max + 1):
VCinvVT_E[ell] = V_Cinv_VT * self._derivative_diagonals[ell]
# F_ij = 0.5 * Tr[A_i @ A_j]
for ell_i in range(ell_min, ell_max + 1):
for ell_j in range(ell_i, ell_max + 1):
idx_i = ell_i - ell_min
idx_j = ell_j - ell_min
fisher_val = 0.5 * matrix_trace(VCinvVT_E[ell_i], VCinvVT_E[ell_j])
fisher[idx_i, idx_j] = fisher_val
if idx_i != idx_j:
fisher[idx_j, idx_i] = fisher_val
return fisher
# Multi-field dict path
if spectra_list is None:
raise ValueError("spectra_list is required for multi-field (dict) input")
if ell_max is None:
ell_max = self.lmax
n_ell = ell_max - ell_min + 1
n_spec = len(spectra_list)
fisher = np.zeros((n_spec * n_ell, n_spec * n_ell))
field_groups = self._detect_field_blocks(c_ell_dict)
V_Cinv_VT = self.get_projected_inverse(c_ell_dict, field_groups=field_groups)
VCinvVT_E = {}
for spec_idx, spec_entry in enumerate(spectra_list):
for ell in range(ell_min, ell_max + 1):
E_matrix = self.get_derivative_matrix(
ell, spec_entry, symmetry_mode=symmetry_mode
)
VCinvVT_E[(spec_idx, ell)] = matrix_mult(V_Cinv_VT, E_matrix)
for spec_a in range(n_spec):
for ell_a in range(ell_min, ell_max + 1):
idx_a = spec_a * n_ell + (ell_a - ell_min)
for spec_b in range(spec_a, n_spec):
ell_b_start = ell_a if spec_a == spec_b else ell_min
for ell_b in range(ell_b_start, ell_max + 1):
idx_b = spec_b * n_ell + (ell_b - ell_min)
fisher_val = 0.5 * matrix_trace(
VCinvVT_E[(spec_a, ell_a)],
VCinvVT_E[(spec_b, ell_b)],
)
fisher[idx_a, idx_b] = fisher_val
if idx_a != idx_b:
fisher[idx_b, idx_a] = fisher_val
return fisher