Source code for cosmocore.bins

"""
Multipole binning for power spectrum estimation.

Provides the Bins class for defining multipole bins with configurable
widths and weights. Used by QUBE for binned QML estimation, where
per-bin derivative matrices replace per-ell derivatives.

When delta_ell=1, each bin contains a single multipole and the binned
estimator reduces to the standard per-multipole QML estimator.
"""

from __future__ import annotations

import numpy as np


[docs] class Bins: """ Multipole binning specification. Defines bins by their lower and upper multipole bounds. Both bounds are **inclusive**: a bin with lmin=2, lmax=4 contains multipoles {2, 3, 4}. Bins must not overlap and must be sorted in ascending order. Gaps between bins are allowed (those multipoles are simply excluded from the analysis). The binning operator formalism follows Bond, Jaffe & Knox (1998). The implementation is adapted from the xQML package (Vanneste et al. 2018, Phys. Rev. D 98, 103526), extended with input validation, D_ell weighting, covariance binning, and lmin zero-padding support. Parameters ---------- lmins : array_like Lower bound of each bin (inclusive). Bins with lmax < 2 are automatically discarded since CMB power spectra start at ell=2. lmaxs : array_like Upper bound of each bin (inclusive). Must satisfy lmaxs[i] >= lmins[i] for each bin. Attributes ---------- lmins : np.ndarray Lower bounds after filtering (ell >= 2). lmaxs : np.ndarray Upper bounds after filtering (ell >= 2). nbins : int Number of bins. lbin : np.ndarray Effective multipole for each bin (midpoint). dl : np.ndarray Width of each bin (lmaxs - lmins + 1). lmin : int Global minimum multipole. lmax : int Global maximum multipole. Examples -------- Uniform bins of width 3 from ell=2 to ell=10: >>> bins = Bins.fromdeltal(2, 10, 3) >>> bins.lmins array([2, 5, 8]) >>> bins.lmaxs array([4, 7, 10]) Custom non-uniform bins: >>> bins = Bins([2, 5, 10], [4, 9, 20]) >>> bins.dl array([3, 5, 11]) """
[docs] def __init__(self, lmins, lmaxs, lmin_floor=2): if len(lmins) != len(lmaxs): raise ValueError( f"lmins and lmaxs must have the same length, " f"got {len(lmins)} and {len(lmaxs)}" ) lmins = np.asarray(lmins, dtype=int) lmaxs = np.asarray(lmaxs, dtype=int) if lmin_floor < 0: raise ValueError(f"lmin_floor must be >= 0, got {lmin_floor}") # Reject bins entirely below lmin_floor (default 2 reproduces the # legacy spin-2 floor; pass lmin_floor=1 for dipole or 0 for # monopole-aware analyses). keep = np.logical_and(lmaxs >= lmin_floor, lmins >= lmin_floor) self.lmins = lmins[keep] self.lmaxs = lmaxs[keep] self.lmin_floor = int(lmin_floor) self._validate_and_derive()
[docs] @classmethod def fromdeltal(cls, lmin, lmax, delta_ell): """Create uniform bins with constant width. ``lmin`` doubles as the bin floor; values below 2 are honoured (e.g. ``Bins.fromdeltal(1, 4, 1)`` includes the dipole). """ nbins = (lmax - lmin + 1) // delta_ell lmins = lmin + np.arange(nbins) * delta_ell lmaxs = lmins + delta_ell - 1 return cls(lmins, lmaxs, lmin_floor=lmin)
def _validate_and_derive(self): if len(self.lmins) == 0: raise ValueError( f"No valid bins (all bins below lmin_floor={self.lmin_floor})" ) for i, (l1, l2) in enumerate(zip(self.lmins, self.lmaxs)): if l1 > l2: raise ValueError(f"Bin {i}: lmin={l1} > lmax={l2}") # Check for overlaps (bins must be non-overlapping and sorted) order = np.argsort(self.lmins) self.lmins = self.lmins[order] self.lmaxs = self.lmaxs[order] for i in range(len(self.lmins) - 1): if self.lmins[i + 1] <= self.lmaxs[i]: raise ValueError( f"Bins {i} and {i + 1} overlap: " f"[{self.lmins[i]}, {self.lmaxs[i]}] and " f"[{self.lmins[i + 1]}, {self.lmaxs[i + 1]}]" ) self.lmin = int(np.min(self.lmins)) self.lmax = int(np.max(self.lmaxs)) self.nbins = len(self.lmins) self.lbin = (self.lmins + self.lmaxs) / 2.0 self.dl = self.lmaxs - self.lmins + 1
[docs] def bins(self): """Return (lmins, lmaxs) tuple.""" return (self.lmins, self.lmaxs)
def _bin_operators(self, *, Dl=False, cov=False): """ Build binning (P) and unbinning (Q) operator matrices. Parameters ---------- Dl : bool If True, weight by ell*(ell+1)/(2*pi). cov : bool If True, build Q for covariance binning. Returns ------- p : np.ndarray Binning matrix, shape (nbins, lmax+1). q : np.ndarray Unbinning matrix, shape (lmax+1, nbins). """ if Dl: ell2 = np.arange(self.lmax + 1) ell2 = ell2 * (ell2 + 1) / (2 * np.pi) else: ell2 = np.ones(self.lmax + 1) p = np.zeros((self.nbins, self.lmax + 1)) q = np.zeros((self.lmax + 1, self.nbins)) for b, (a, z) in enumerate(zip(self.lmins, self.lmaxs)): dl = z - a + 1 p[b, a : z + 1] = ell2[a : z + 1] / dl if cov: q[a : z + 1, b] = 1 / ell2[a : z + 1] / dl else: q[a : z + 1, b] = 1 / ell2[a : z + 1] return p, q
[docs] def bin_spectra(self, spectra, Dl=False, lmin=0): """ Average spectra in bins. Parameters ---------- spectra : array_like Power spectra, last axis is multipole. Dl : bool If True, weight by ell*(ell+1)/(2*pi). lmin : int Starting multipole of the input spectra. If > 0, the input is zero-padded for ell < lmin before binning. Returns ------- np.ndarray Binned spectra. """ spectra = np.asarray(spectra) if lmin > 0: pad = np.zeros((*spectra.shape[:-1], lmin)) spectra = np.concatenate([pad, spectra], axis=-1) minlmax = np.min([spectra.shape[-1] - 1, self.lmax]) _p, _q = self._bin_operators(Dl=Dl) return np.dot(spectra[..., : minlmax + 1], _p.T[: minlmax + 1, ...])
[docs] def bin_covariance(self, clcov): """Bin a covariance matrix: P @ clcov @ Q.""" p, q = self._bin_operators(cov=True) return np.matmul(p, np.matmul(clcov, q))