diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2ab892a..afb0f87 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -15,7 +15,7 @@ and this project adheres to `Semantic Versioning None: + """Initialize the Chebyshev convolution context. + + Parameters + ---------- + L : csc_matrix + Sparse Graph Laplacian. The spectral bound is estimated + automatically via :func:`~sgwt.util.estimate_spectral_bound`. + """ self.n_vertices = L.shape[0] self.spectrum_bound = estimate_spectral_bound(L) self.chol = CholWrapper(L) @@ -32,30 +40,43 @@ def __init__(self, L: csc_matrix) -> None: self._cached_spectrum_bound = None def __enter__(self) -> "ChebyConvolve": + """Start CHOLMOD and perform symbolic factorization. + + Returns + ------- + ChebyConvolve + This context manager instance. + """ self.chol.start() self.chol.sym_factor() return self def __exit__(self, exc_type, exc_val, exc_tb): + """Free CHOLMOD resources and cached recurrence matrix.""" if self._cached_M_ptr is not None: self.chol.free_sparse(self._cached_M_ptr) self.chol.free_factor(self.chol.fact_ptr) self.chol.finish() - def _get_recurrence_matrix(self, spectrum_bound: float): - """Get cached recurrence matrix M = (2/λ_max)L - I.""" + def _get_recurrence_matrix(self, spectrum_bound: float, min_lambda: float = 0.0): + """Get cached recurrence matrix M = alpha*L + beta*I for domain [min_lambda, spectrum_bound].""" + cache_key = (spectrum_bound, min_lambda) if self._cached_M_ptr is not None: - if self._cached_spectrum_bound == spectrum_bound: + if self._cached_spectrum_bound == cache_key: return self._cached_M_ptr self.chol.free_sparse(self._cached_M_ptr) - + + range_ = spectrum_bound - min_lambda + alpha = 2.0 / range_ + beta = -(spectrum_bound + min_lambda) / range_ + EYE = self.chol.speye(self.n_vertices, self.n_vertices) try: self._cached_M_ptr = self.chol.add( byref(self.chol.A), EYE, - alpha=2.0 / spectrum_bound, beta=-1.0 + alpha=alpha, beta=beta ) - self._cached_spectrum_bound = spectrum_bound + self._cached_spectrum_bound = cache_key return self._cached_M_ptr finally: self.chol.free_sparse(EYE) @@ -122,7 +143,7 @@ def _convolve_real(self, B: np.ndarray, C: ChebyKernel) -> np.ndarray: if n_order == 1: return W - M = self._get_recurrence_matrix(C.spectrum_bound) + M = self._get_recurrence_matrix(C.spectrum_bound, C.min_lambda) T_km1 = self.chol.allocate_dense(n_vertex, n_signals) # T_1(L)B = M @ B @@ -184,20 +205,20 @@ def convolve_multi(self, B: np.ndarray, kernels: list) -> list: n_vertex, n_signals = B.shape - # Group by spectrum_bound + # Group by (spectrum_bound, min_lambda) from collections import defaultdict groups = defaultdict(list) for idx, k in enumerate(kernels): - groups[k.spectrum_bound].append((idx, k)) - + groups[(k.spectrum_bound, k.min_lambda)].append((idx, k)) + results = [None] * len(kernels) - - for bound, group in groups.items(): + + for (bound, min_lam), group in groups.items(): max_order = max(k.C.shape[0] for _, k in group) - + # Stack all coefficients for this group for vectorized application # Compute terms and apply via tensor contraction - T_terms = self._compute_terms(B, max_order, bound) + T_terms = self._compute_terms(B, max_order, bound, min_lam) T_stack = np.stack(T_terms, axis=0) # (max_order, n_vertex, n_signals) for idx, kernel in group: @@ -208,20 +229,20 @@ def convolve_multi(self, B: np.ndarray, kernels: list) -> list: return results - def _compute_terms(self, B: np.ndarray, max_order: int, bound: float) -> list: + def _compute_terms(self, B: np.ndarray, max_order: int, bound: float, min_lambda: float = 0.0) -> list: """Compute Chebyshev terms T_k(L)B for k = 0..max_order-1.""" n_vertex, n_signals = B.shape terms = [] - + B_chol = byref(self.chol.numpy_to_chol_dense(B)) T_km2, T_km1 = None, None - + try: T_km2 = self.chol.copy_dense(B_chol) terms.append(self.chol.chol_dense_to_numpy(T_km2).copy()) - + if max_order > 1: - M = self._get_recurrence_matrix(bound) + M = self._get_recurrence_matrix(bound, min_lambda) T_km1 = self.chol.allocate_dense(n_vertex, n_signals) self.chol.sdmult(M, T_km2, T_km1, alpha=1.0, beta=0.0) diff --git a/sgwt/cholconv.py b/sgwt/cholconv.py index bb6365f..bc0ddb8 100644 --- a/sgwt/cholconv.py +++ b/sgwt/cholconv.py @@ -109,9 +109,9 @@ class Convolve: Examples -------- - >>> from sgwt import Convolve, LAPLACIAN_TEXAS_DELAY + >>> from sgwt import Convolve, DELAY_TEXAS >>> import numpy as np - >>> L = LAPLACIAN_TEXAS_DELAY + >>> L = DELAY_TEXAS >>> signal = np.random.randn(L.shape[0], 100) >>> with Convolve(L) as conv: ... lp = conv.lowpass(signal, scales=[0.1, 1.0, 10.0]) @@ -119,6 +119,14 @@ class Convolve: """ def __init__(self, L:csc_matrix) -> None: + """Initialize the static convolution context. + + Parameters + ---------- + L : csc_matrix + Sparse Graph Laplacian of shape ``(n_vertices, n_vertices)``. + Must be symmetric positive semi-definite. + """ # Store number of vertices self.n_vertices = L.shape[0] @@ -128,6 +136,16 @@ def __init__(self, L:csc_matrix) -> None: def __enter__(self) -> "Convolve": + """Start CHOLMOD and perform symbolic factorization. + + Allocates workspace pointers for solve2 operations. The symbolic + factorization is reused across all subsequent numeric factorizations. + + Returns + ------- + Convolve + This context manager instance. + """ # Start Cholmod self.chol.start() @@ -147,6 +165,11 @@ def __enter__(self) -> "Convolve": return self def __exit__(self, exc_type, exc_val, exc_tb): + """Free all CHOLMOD resources. + + Releases the factored matrix, workspace memory, and finalizes + the CHOLMOD common object. + """ # Free the factored matrix object self.chol.free_factor(self.chol.fact_ptr) @@ -156,7 +179,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.chol.free_dense(self.X2) self.chol.free_sparse(self.Xset) - # Free Y & E (workspacce for solve2) + # Free Y & E (workspace for solve2) self.chol.free_dense(self.Y) self.chol.free_dense(self.E) @@ -229,7 +252,7 @@ def _convolve_impl(self, B: np.ndarray, K: Union[VFKernel, dict]) -> np.ndarray: return W - def lowpass(self, B: np.ndarray, scales: Union[float, List[float]] = [1], Bset: Optional[csc_matrix] = None, refactor: bool = True, order = 1) -> Union[np.ndarray, List[np.ndarray]]: + def lowpass(self, B: np.ndarray, scales: Union[float, List[float]] = None, Bset: Optional[csc_matrix] = None, refactor: bool = True, order = 1) -> Union[np.ndarray, List[np.ndarray]]: """ Computes low-pass filtered scaling coefficients at specified scales. @@ -263,7 +286,7 @@ def lowpass(self, B: np.ndarray, scales: Union[float, List[float]] = [1], Bset: """ return _process_signal(self._lowpass_impl, B, scales, Bset, refactor, order) - def _lowpass_impl(self, B: np.ndarray, scales: List[float] = [1], Bset: Optional[csc_matrix] = None, refactor: bool = True, order = 1) -> List[np.ndarray]: + def _lowpass_impl(self, B: np.ndarray, scales: List[float] = None, Bset: Optional[csc_matrix] = None, refactor: bool = True, order = 1) -> List[np.ndarray]: # Using this requires the number of columns in f to be 1 if Bset is not None: # pragma: no cover @@ -281,10 +304,10 @@ def _lowpass_impl(self, B: np.ndarray, scales: List[float] = [1], Bset: Optional # Calculate Scaling Coefficients of 'f' for each scale - for i, scale in enumerate(scales): + for scale in scales: - # Step 1 -> Numeric Factorization - # In some instances it will alreayd be factord at appropriate scale, so we allow option to skip + # Step 1 -> Numeric Factorization + # In some instances it will already be factored at appropriate scale, so we allow option to skip if refactor: self.chol.num_factor(A_ptr, fact_ptr, 1/scale) @@ -308,7 +331,7 @@ def _lowpass_impl(self, B: np.ndarray, scales: List[float] = [1], Bset: Optional ) return W - def bandpass(self, B: np.ndarray, scales: Union[float, List[float]] = [1], order: int = 1) -> Union[np.ndarray, List[np.ndarray]]: + def bandpass(self, B: np.ndarray, scales: Union[float, List[float]] = None, order: int = 1) -> Union[np.ndarray, List[np.ndarray]]: """ Computes band-pass filtered wavelet coefficients at specified scales. @@ -339,8 +362,8 @@ def bandpass(self, B: np.ndarray, scales: Union[float, List[float]] = [1], order """ return _process_signal(self._bandpass_impl, B, scales, order) - def _bandpass_impl(self, B: np.ndarray, scales: List[float] = [1], order: int = 1) -> List[np.ndarray]: - + def _bandpass_impl(self, B: np.ndarray, scales: List[float] = None, order: int = 1) -> List[np.ndarray]: + # Pointer to bB (The function being convolved) B_chol_struct = self.chol.numpy_to_chol_dense(B) @@ -353,11 +376,11 @@ def _bandpass_impl(self, B: np.ndarray, scales: List[float] = [1], order: int = fact_ptr = self.chol.fact_ptr # Calculate Scaling Coefficients of 'f' for each scale - for i, scale in enumerate(scales): + for scale in scales: # Step 1 -> Numeric Factorization self.chol.num_factor(A_ptr, fact_ptr, 1/scale) - + # Solve more than once iff order > 1 in_ptr = byref(B_chol_struct) for _ in range(order): @@ -383,7 +406,7 @@ def _bandpass_impl(self, B: np.ndarray, scales: List[float] = [1], order: int = return W - def highpass(self, B: np.ndarray, scales: Union[float, List[float]] = [1]) -> Union[np.ndarray, List[np.ndarray]]: + def highpass(self, B: np.ndarray, scales: Union[float, List[float]] = None) -> Union[np.ndarray, List[np.ndarray]]: """ Computes high-pass filtered coefficients at specified scales. @@ -410,8 +433,8 @@ def highpass(self, B: np.ndarray, scales: Union[float, List[float]] = [1]) -> Un otherwise a list of arrays for each scale. """ return _process_signal(self._highpass_impl, B, scales) - - def _highpass_impl(self, B: np.ndarray, scales: List[float] = [1]) -> List[np.ndarray]: + + def _highpass_impl(self, B: np.ndarray, scales: List[float] = None) -> List[np.ndarray]: # List, malloc, numpy, etc. W = [] X1, X2 = self.X1, self.X2 @@ -466,11 +489,23 @@ class DyConvolve: L : csc_matrix Sparse Graph Laplacian of shape ``(n_vertices, n_vertices)``. poles : list[float] | VFKernel - Predetermined set of poles (equivalent to 1/scale for analytical filters). + Predetermined set of poles (``q = 1/s`` for analytical filters, + where ``s`` is the corresponding scale). """ def __init__(self, L:csc_matrix, poles: Union[List[float], VFKernel]) -> None: + """Initialize the dynamic convolution context. + + Parameters + ---------- + L : csc_matrix + Sparse Graph Laplacian of shape ``(n_vertices, n_vertices)``. + poles : list[float] | VFKernel + Predetermined set of poles. For analytical filters, these are + the inverse of the desired scales (``q = 1/s``). If a VFKernel + is passed, its poles, residues, and direct term are extracted. + """ # Store number of vertices self.n_vertices = L.shape[0] @@ -494,6 +529,18 @@ def __init__(self, L:csc_matrix, poles: Union[List[float], VFKernel]) -> None: # Context Manager for using CHOLMOD def __enter__(self) -> "DyConvolve": + """Start CHOLMOD, perform symbolic factorization, and pre-factor all poles. + + Creates copies of the symbolic factor and performs numeric + factorization for each pole ``(L + qI)``. This pre-computation + allows subsequent topology updates via rank-1 Cholesky updates + without full re-factorization. + + Returns + ------- + DyConvolve + This context manager instance. + """ # Start Cholmod self.chol.start() @@ -504,7 +551,7 @@ def __enter__(self) -> "DyConvolve": # Make copies of the symbolic factor object self.factors = [ self.chol.copy_factor(self.chol.fact_ptr) - for i in range(self.npoles) + for _ in range(self.npoles) ] # Now perform each unique numeric factorization A + qI @@ -523,6 +570,7 @@ def __enter__(self) -> "DyConvolve": return self def __exit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]) -> Optional[bool]: + """Free all CHOLMOD resources including per-pole factor copies.""" # Free the factored matrix object self.chol.free_factor(self.chol.fact_ptr) @@ -536,7 +584,7 @@ def __exit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[Ba self.chol.free_dense(self.X2) self.chol.free_sparse(self.Xset) - # Free Y & E (workspacce for solve2) + # Free Y & E (workspace for solve2) self.chol.free_dense(self.Y) self.chol.free_dense(self.E) @@ -595,13 +643,14 @@ def lowpass(self, B: np.ndarray, Bset: Optional[csc_matrix] = None, order = 1) - """ Computes low-pass filtered scaling coefficients. - Applies the spectral filter: + Applies the spectral filter at each pre-defined scale: .. math:: - \\phi_q(\\mathbf{L}) = \\left( \\frac{q\\mathbf{I}}{\\mathbf{L} + q\\mathbf{I}} \\right)^n + \\phi_s(\\mathbf{L}) = \\left( \\frac{\\mathbf{I}}{s\\mathbf{L} + \\mathbf{I}} \\right)^n - where :math:`q` is the pre-defined pole and :math:`n` is the filter order. + where :math:`s` is the scale and :math:`n` is the filter order. + Internally, the solve uses pole :math:`q = 1/s`. Parameters ---------- @@ -615,7 +664,7 @@ def lowpass(self, B: np.ndarray, Bset: Optional[csc_matrix] = None, order = 1) - Returns ------- list[np.ndarray] - Filtered signals for each pre-defined pole. + Filtered signals, one per pre-defined scale. """ return _process_signal(self._lowpass_impl, B, None, Bset, order) @@ -663,13 +712,14 @@ def bandpass(self, B: np.ndarray, order: int = 1) -> List[np.ndarray]: """ Computes band-pass filtered wavelet coefficients. - Applies the spectral wavelet kernel: + Applies the spectral wavelet kernel at each pre-defined scale: .. math:: - \\Psi_q(\\mathbf{L}) = \\left( \\frac{4q\\mathbf{L}}{(\\mathbf{L} + q\\mathbf{I})^2} \\right)^n + \\Psi_s(\\mathbf{L}) = \\left( \\frac{4\\mathbf{L}/s}{(\\mathbf{L} + \\mathbf{I}/s)^2} \\right)^n - where :math:`q` is the pre-defined pole and :math:`n` is the filter order. + where :math:`s` is the scale and :math:`n` is the filter order. + Internally, the solve uses pole :math:`q = 1/s`. Parameters ---------- @@ -681,7 +731,7 @@ def bandpass(self, B: np.ndarray, order: int = 1) -> List[np.ndarray]: Returns ------- list[np.ndarray] - Filtered signals for each pre-defined pole. + Filtered signals, one per pre-defined scale. """ return _process_signal(self._bandpass_impl, B, None, order) @@ -727,13 +777,14 @@ def highpass(self, B: np.ndarray) -> List[np.ndarray]: """ Computes high-pass filtered coefficients. - Applies the spectral filter: + Applies the spectral filter at each pre-defined scale: .. math:: - \\mu_q(\\mathbf{L}) = \\frac{\\mathbf{L}}{\\mathbf{L} + q\\mathbf{I}} + \\mu_s(\\mathbf{L}) = \\frac{s\\mathbf{L}}{s\\mathbf{L} + \\mathbf{I}} - where :math:`q` is the pre-defined pole. + where :math:`s` is the scale. Internally, the solve uses + pole :math:`q = 1/s`. Parameters ---------- @@ -743,7 +794,7 @@ def highpass(self, B: np.ndarray) -> List[np.ndarray]: Returns ------- list[np.ndarray] - Filtered signals for each pre-defined pole. + Filtered signals, one per pre-defined scale. """ return _process_signal(self._highpass_impl, B, None) @@ -823,7 +874,7 @@ def addbranch(self, i: int, j: int, w: float) -> bool: vals=data ) - # TODO we can optize performance eventually by + # TODO we can optimize performance eventually by # splitting updown into symbolic and numeric, since symbolic same for all # Update all factors diff --git a/sgwt/cholesky/__init__.py b/sgwt/cholesky/__init__.py index 8f68cde..5b8ac76 100644 --- a/sgwt/cholesky/__init__.py +++ b/sgwt/cholesky/__init__.py @@ -3,7 +3,6 @@ Sparse Spectral Graph Wavelet Transform (SGWT) ---------------------------------------------- Author: Luke Lowery (lukel@tamu.edu) -File: sgwt/cholesky/__init__.py Description: Cholesky factorization module initialization. """ diff --git a/sgwt/cholesky/structs.py b/sgwt/cholesky/structs.py index 4703368..df84222 100644 --- a/sgwt/cholesky/structs.py +++ b/sgwt/cholesky/structs.py @@ -3,7 +3,6 @@ Sparse Spectral Graph Wavelet Transform (SGWT) ---------------------------------------------- Author: Luke Lowery (lukel@tamu.edu) -File: sgwt/cholesky/structs.py Description: Ctypes definitions for CHOLMOD C structures. """ diff --git a/sgwt/cholesky/wrapper.py b/sgwt/cholesky/wrapper.py index 55cc0a6..9061001 100644 --- a/sgwt/cholesky/wrapper.py +++ b/sgwt/cholesky/wrapper.py @@ -3,7 +3,6 @@ Sparse Spectral Graph Wavelet Transform (SGWT) ---------------------------------------------- Author: Luke Lowery (lukel@tamu.edu) -File: sgwt/cholesky/wrapper.py Description: Low-level Python wrapper for the CHOLMOD C library. """ @@ -34,8 +33,8 @@ CHOLMOD_SUPERNODAL = 1 # supernodal # Factor form -CHOLMOD_L = 0 # LLᵀ -CHOLMOD_LT = 1 # LDLᵀ +CHOLMOD_FACTOR_LL = 0 # LLᵀ +CHOLMOD_FACTOR_LDL = 1 # LDLᵀ # Up/down-date CHOLMOD_UPDATE = 1 # update diff --git a/sgwt/fitting/__init__.py b/sgwt/fitting/__init__.py new file mode 100644 index 0000000..c6f18c5 --- /dev/null +++ b/sgwt/fitting/__init__.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- +""" +Fitting Subpackage +------------------ +Model fitting tools for constructing rational and polynomial +approximations from frequency response data. +""" +from .vfit import VFModel +from .chebyshev import ChebyModel + +__all__ = ["VFModel", "ChebyModel"] diff --git a/sgwt/fitting/chebyshev.py b/sgwt/fitting/chebyshev.py new file mode 100644 index 0000000..7b4b221 --- /dev/null +++ b/sgwt/fitting/chebyshev.py @@ -0,0 +1,359 @@ +# -*- coding: utf-8 -*- +"""Chebyshev Polynomial Approximation for SGWT. + +Provides :class:`ChebyModel`, the fitting model for constructing Chebyshev +polynomial approximations of spectral filters. Mirrors the :class:`VFModel` +interface for a consistent fitting API. + +Author: Luke Lowery (lukel@tamu.edu) +""" + +from dataclasses import dataclass + +import numpy as np +from numpy import ndarray +from scipy.sparse import csc_matrix +from typing import Callable, Optional + + +@dataclass +class ChebyModel: + """Chebyshev polynomial approximation model. + + Represents a spectral filter approximated by Chebyshev polynomials + of the first kind on the interval + ``[min_lambda, spectrum_bound]``. + + Parameters + ---------- + C : ndarray, shape (order + 1, n_dims) + Chebyshev coefficient matrix. + spectrum_bound : float + Upper bound of the approximation domain. + min_lambda : float + Lower bound of the approximation domain. + + See Also + -------- + sgwt.util.ChebyKernel : Lightweight kernel used by :class:`~sgwt.ChebyConvolve`. + VFModel : Rational (Vector Fitting) approximation model. + + Examples + -------- + Fit a lowpass function and evaluate: + + >>> import numpy as np + >>> model = ChebyModel.fit(lambda x: 1 / (x + 1), order=20, spectrum_bound=100.0) + >>> model(np.array([0.0, 1.0, 50.0])) + array(...) + + Fit and return a kernel ready for convolution: + + >>> from scipy.sparse import eye + >>> L = eye(10, format="csc") + >>> K = ChebyModel.kernel(L, lambda x: 1 / (x + 1), order=20) + """ + + C: ndarray + """Chebyshev coefficient matrix, shape ``(order + 1, n_dims)``.""" + + spectrum_bound: float + """Upper bound of the approximation domain.""" + + min_lambda: float = 0.0 + """Lower bound of the approximation domain.""" + + # ------------------------------------------------------------------ + # Fitting classmethods + # ------------------------------------------------------------------ + + @classmethod + def fit( + cls, + f: Callable[[np.ndarray], np.ndarray], + order: int, + spectrum_bound: float, + n_samples: Optional[int] = None, + sampling: str = 'chebyshev', + min_lambda: float = 0.0, + rtol: float = 1e-12, + adaptive: bool = False, + max_order: int = 500, + target_error: float = 1e-10, + ) -> 'ChebyModel': + """Fit a Chebyshev polynomial to a vectorized function. + + Parameters + ---------- + f : callable + Vectorized function ``f(x) -> y`` to approximate. + order : int + Polynomial order (must be >= 1). + spectrum_bound : float + Upper bound of the approximation domain. + n_samples : int, optional + Number of sample points (non-Chebyshev sampling only). + sampling : str, default 'chebyshev' + Sampling strategy: ``'chebyshev'``, ``'linear'``, + ``'quadratic'``, or ``'logarithmic'``. + min_lambda : float, default 0.0 + Lower bound of the approximation domain. + rtol : float, default 1e-12 + Relative tolerance for truncating negligible coefficients. + adaptive : bool, default False + Automatically determine optimal order to achieve + *target_error*. + max_order : int, default 500 + Maximum order for adaptive mode. + target_error : float, default 1e-10 + Target approximation error for adaptive mode. + + Returns + ------- + ChebyModel + Fitted polynomial model. + """ + if order < 1: + raise ValueError("Order must be >= 1") + + if adaptive: + return cls._adaptive_fit(f, order, spectrum_bound, min_lambda, + sampling, rtol, max_order, target_error) + + coeffs = cls._compute_coefficients(f, order, spectrum_bound, min_lambda, + n_samples, sampling) + coeffs = cls._ensure_2d(coeffs) + coeffs = cls._truncate(coeffs, rtol) + + return cls(C=coeffs, spectrum_bound=spectrum_bound, min_lambda=min_lambda) + + @classmethod + def kernel( + cls, + L: csc_matrix, + f: Callable[[np.ndarray], np.ndarray], + order: int, + **kw, + ): + """Fit a graph Chebyshev kernel and return it for convolution. + + Estimates the spectral bound from *L*, performs the fit, and + returns a :class:`~sgwt.util.ChebyKernel` ready for use with + :class:`~sgwt.ChebyConvolve`. + + Parameters + ---------- + L : csc_matrix + Graph Laplacian. + f : callable + Vectorized function ``f(x) -> y`` to approximate. + order : int + Polynomial order (must be >= 1). + **kw + Forwarded to :meth:`fit`. + + Returns + ------- + ChebyKernel + Kernel ready for graph convolution. + """ + from ..util import ChebyKernel, estimate_spectral_bound + + spectrum_bound = estimate_spectral_bound(L) + model = cls.fit(f, order, spectrum_bound, **kw) + return ChebyKernel(C=model.C, spectrum_bound=model.spectrum_bound, + min_lambda=model.min_lambda) + + @classmethod + def kernel_on_graph( + cls, + L: csc_matrix, + f: Callable[[np.ndarray], np.ndarray], + order: int, + **kw, + ): + """Backward-compatible alias for :meth:`kernel`. + + Parameters + ---------- + L : csc_matrix + Graph Laplacian. + f : callable + Vectorized function ``f(x) -> y`` to approximate. + order : int + Polynomial order (must be >= 1). + **kw + Forwarded to :meth:`kernel`. + + Returns + ------- + ChebyKernel + Kernel ready for graph convolution. + """ + return cls.kernel(L, f, order, **kw) + + # ------------------------------------------------------------------ + # Private fitting helpers + # ------------------------------------------------------------------ + + @classmethod + def _compute_coefficients(cls, f, order, spectrum_bound, min_lambda, n_samples, sampling): + """Compute Chebyshev coefficients using optimal or fallback sampling. + + Uses Chebyshev-Gauss-Lobatto nodes for the 'chebyshev' strategy + (optimal polynomial interpolation), or weighted least-squares + fitting for other strategies. + + Parameters + ---------- + f : callable + Vectorized function to approximate. + order : int + Polynomial order. + spectrum_bound : float + Upper bound of the approximation domain. + min_lambda : float + Lower bound of the approximation domain. + n_samples : int or None + Number of sample points (non-Chebyshev sampling only). + sampling : str + Sampling strategy: 'chebyshev', 'linear', 'quadratic', 'logarithmic'. + + Returns + ------- + np.ndarray + Coefficient array of shape ``(order + 1,)`` or ``(order + 1, n_dims)``. + """ + lambda_range = spectrum_bound - min_lambda + lambda_mid = (spectrum_bound + min_lambda) / 2.0 + + if sampling == 'chebyshev': + # Chebyshev-Gauss-Lobatto nodes: optimal for polynomial interpolation + n = order + 1 + k = np.arange(n) + x_cheb = np.cos(np.pi * k / order) # Nodes in [-1, 1] + sample_x = lambda_mid + (lambda_range / 2.0) * x_cheb + f_values = cls._ensure_2d(f(sample_x)) + + # Compute coefficients via discrete orthogonality (DCT-like) + coeffs = np.zeros((n, f_values.shape[1])) + w = np.ones(n); w[0] = w[-1] = 0.5 # Endpoint weights + + for j in range(n): + T_j = np.cos(j * np.pi * k / order) + scale = 2.0 / order if 0 < j < order else 1.0 / order + coeffs[j] = scale * np.sum(w[:, None] * f_values * T_j[:, None], axis=0) + + return coeffs + + # Fallback: least-squares fitting for other sampling strategies + n_samples = n_samples or max(4 * (order + 1), 1000) + t = np.linspace(0, 1, n_samples) + + if sampling == 'quadratic': + sample_x = min_lambda + lambda_range * (t ** 2) + elif sampling == 'logarithmic': + eps = max(min_lambda * 0.001, 1e-10) + sample_x = np.exp(np.log(min_lambda + eps) + t * np.log(spectrum_bound / (min_lambda + eps))) + else: # linear + sample_x = min_lambda + lambda_range * t + + x_scaled = 2.0 * (sample_x - min_lambda) / lambda_range - 1.0 + + # Chebyshev-weighted least squares + weights = 1.0 / np.sqrt(1.0 - np.clip(x_scaled ** 2, 0, 0.9999)) + return np.polynomial.chebyshev.chebfit(x_scaled, f(sample_x), order, w=weights) + + @classmethod + def _adaptive_fit(cls, f, start_order, spectrum_bound, min_lambda, + sampling, rtol, max_order, target_error): + """Adaptively determine optimal polynomial order. + + Iteratively increases the polynomial order until the relative + approximation error on a test grid falls below ``target_error`` + or ``max_order`` is reached. + + Parameters + ---------- + f : callable + Vectorized function to approximate. + start_order : int + Initial polynomial order. + spectrum_bound : float + Upper bound of the approximation domain. + min_lambda : float + Lower bound of the approximation domain. + sampling : str + Sampling strategy. + rtol : float + Relative tolerance for coefficient truncation. + max_order : int + Maximum polynomial order. + target_error : float + Target relative approximation error. + + Returns + ------- + ChebyModel + Fitted model with adaptively chosen order. + """ + test_x = np.linspace(min_lambda, spectrum_bound, 1000) + f_exact = cls._ensure_2d(f(test_x)) + order = max(start_order, 8) + + while True: + coeffs = cls._ensure_2d( + cls._compute_coefficients(f, order, spectrum_bound, min_lambda, None, sampling) + ) + + # Evaluate and compute relative error + x_scaled = 2.0 * (test_x - min_lambda) / (spectrum_bound - min_lambda) - 1.0 + f_approx = np.polynomial.chebyshev.chebval(x_scaled, coeffs) + f_approx = f_approx.T if f_approx.ndim > 1 else f_approx[:, None] + + rel_error = np.max(np.abs(f_exact - f_approx) / np.maximum(np.abs(f_exact), 1e-15)) + + if rel_error <= target_error or order >= max_order: + break + order = min(int(order * 1.5) + 1, max_order) + + return cls(C=cls._truncate(coeffs, rtol), spectrum_bound=spectrum_bound, min_lambda=min_lambda) + + @classmethod + def _ensure_2d(cls, arr): + """Ensure array is 2D with shape (n, dims).""" + arr = np.atleast_1d(arr) + return arr[:, None] if arr.ndim == 1 else arr + + @classmethod + def _truncate(cls, coeffs, rtol): + """Truncate negligible higher-order coefficients.""" + threshold = rtol * np.max(np.abs(coeffs)) + row_max = np.max(np.abs(coeffs), axis=1) + significant = np.where(row_max > threshold)[0] + last_idx = significant[-1] + 1 if significant.size > 0 else 1 + return coeffs[:last_idx] + + # ------------------------------------------------------------------ + # Evaluation + # ------------------------------------------------------------------ + + def __call__(self, x): + """Evaluate the Chebyshev approximation at given points. + + Parameters + ---------- + x : array_like + Points in ``[min_lambda, spectrum_bound]`` at which to + evaluate. + + Returns + ------- + ndarray + Approximated values. + """ + x = np.asarray(x) + x_scaled = (2.0 * (x - self.min_lambda) + / (self.spectrum_bound - self.min_lambda) - 1.0) + y = np.polynomial.chebyshev.chebval(x_scaled, self.C) + return y.T if y.ndim > 1 else y diff --git a/sgwt/fitting/vfit.py b/sgwt/fitting/vfit.py new file mode 100644 index 0000000..b92105a --- /dev/null +++ b/sgwt/fitting/vfit.py @@ -0,0 +1,398 @@ +# -*- coding: utf-8 -*- +"""Vector Fitting (Rational Approximation) for SGWT. + +Implements the Vector Fitting algorithm for constructing rational +approximations of frequency response data. Provides both general-purpose +fitting and a convenience constructor for graph spectral kernels. + +Author: Luke Lowery (lukel@tamu.edu) +""" + +from dataclasses import dataclass + +from numpy import ( + abs, append, asarray, atleast_2d, concatenate, + diag, geomspace, hstack, mean, ndarray, + ones, outer, sort_complex, sqrt, vstack, zeros, +) +from numpy.linalg import eigvals, lstsq, qr + + +# --------------------------------------------------------------------------- +# Private helpers +# --------------------------------------------------------------------------- + +def _cauchy(s, poles): + """Build the Cauchy matrix. + + Parameters + ---------- + s : ndarray, shape (K,) + Sample points. + poles : ndarray, shape (N,) + Current pole locations. + + Returns + ------- + ndarray, shape (K, N) + Matrix where element ``[i, j] = 1 / (s[i] - poles[j])``. + """ + return 1.0 / (s[:, None] - poles) + + +def _lstsq(A, b): + """Solve an overdetermined least-squares system. + + Parameters + ---------- + A : ndarray, shape (M, N) + Coefficient matrix. + b : ndarray, shape (M,) or (M, P) + Right-hand side. + + Returns + ------- + ndarray, shape (N,) or (N, P) + Least-squares solution. + """ + return lstsq(A, b, rcond=None)[0] + + +def _basis(s, poles, fit_d, fit_e): + """Construct the regression matrix ``[Cauchy | 1 | s]``. + + Parameters + ---------- + s : ndarray, shape (K,) + Sample points. + poles : ndarray, shape (N,) + Current pole locations. + fit_d : bool + Append a column of ones for the direct term. + fit_e : bool + Append a column of ``s`` for the linear term. + + Returns + ------- + ndarray, shape (K, N + fit_d + fit_e) + Augmented regression matrix. + """ + cols = [_cauchy(s, poles)] + if fit_d: + cols.append(ones((len(s), 1))) + if fit_e: + cols.append(s[:, None]) + return hstack(cols) + + +def _enforce_conjugates(poles): + """Snap complex poles to exact conjugate pairs. + + Any pole whose imaginary part is below a tolerance is forced real. + Remaining complex poles are paired with their conjugates. + + Parameters + ---------- + poles : ndarray, shape (N,) + Poles to regularize. + + Returns + ------- + ndarray, shape (N,) + Poles with exact conjugate symmetry. + """ + tol = 1e-6 * (abs(poles).max() + 1) + real = poles[abs(poles.imag) < tol].real + 0j + upper = poles[poles.imag >= tol] + return sort_complex(concatenate([real, upper, upper.conj()])) + + +def _initial_poles(s, N, real_only): + """Generate logarithmically-spaced starting poles. + + For ``real_only=True``, produces real negative poles spanning the + data range. Otherwise, produces complex conjugate pairs with small + negative real parts. + + Parameters + ---------- + s : ndarray, shape (K,) + Sample points used to determine the frequency range. + N : int + Number of poles to generate. + real_only : bool + If True, generate only real negative poles. + + Returns + ------- + ndarray, shape (N,) + Initial pole locations (complex dtype). + """ + if real_only or N < 2: + r = abs(s[s != 0]) if (s != 0).any() else abs(s) + 1 + return -geomspace(max(r.min() * 0.1, 1e-6), r.max() * 2, N).astype(complex) + w = abs(s.imag) + w = w[w > 0] + pts = geomspace(max(w.min() * 0.5, 1e-3), w.max() * 1.5, N // 2) + cc = -pts * 0.01 + 1j * pts + poles = concatenate([cc, cc.conj()]) + return append(poles, -pts[-1])[:N] if N % 2 else poles[:N] + + +def _relocate(s, f, poles, fit_d, fit_e, real_only): + """Pole relocation step of the Vector Fitting algorithm. + + Builds an augmented least-squares system whose solution yields a + weight function, then computes the zeros of that weight function as + eigenvalues of a companion matrix. These zeros become the relocated + poles, with stability enforced (negative real parts). + + Parameters + ---------- + s : ndarray, shape (K,) + Sample points. + f : ndarray, shape (K, A) + Response data (A channels). + poles : ndarray, shape (N,) + Current pole locations. + fit_d : bool + Include direct term in the basis. + fit_e : bool + Include linear term in the basis. + real_only : bool + Force relocated poles to be real. + + Returns + ------- + ndarray, shape (N,) + Relocated pole locations. + """ + K, A = f.shape + N = len(poles) + + Phi_sig = _basis(s, poles, True, False) + Q, _ = qr(_basis(s, poles, fit_d, fit_e), mode='reduced') + + # Remove the signal subspace so we only fit the weight function + weighted = f[:, :, None] * Phi_sig[:, None, :] + flat = weighted.reshape(K, -1) + rows = (flat - Q @ (Q.conj().T @ flat)).reshape(-1, N + 1) + + # Normalize so the weight function can't vanish + w = sqrt(K * A) + M = vstack([rows, Phi_sig.sum(0)[None, :] * w]) + rhs = concatenate([zeros(M.shape[0] - 1), [K * w]]) + z = _lstsq(M, rhs) + c_tilde, d_tilde = z[:N], z[N] + 1e-30 + + # Zeros of the weight function = new poles, forced stable + H = diag(poles) - outer(ones(N), c_tilde / d_tilde) + new_poles = eigvals(H) + new_poles = -abs(new_poles.real) + 1j * new_poles.imag + + if real_only: + return sort_complex(new_poles.real.astype(complex)) + return _enforce_conjugates(new_poles) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +@dataclass +class VFModel: + """Rational approximation model from Vector Fitting. + + Represents a transfer function of the form: + + .. math:: + + \\mathbf{g}(\\lambda) \\approx \\mathbf{d} + \\mathbf{e}\\lambda + + \\sum_{k=1}^{K} \\frac{\\mathbf{r}_k}{\\lambda + q_k} + + where :math:`q_k` are the poles and :math:`\\mathbf{r}_k` the residues, + both shared across all response channels. + + Parameters + ---------- + poles : ndarray, shape (N,) + Pole locations (complex or real). + residues : ndarray, shape (N, A) + Residue matrix (A = number of response channels). + d : ndarray, shape (A,) + Direct (constant) term. + e : ndarray, shape (A,) + Linear term (defaults to zeros). + rmse : float + Root-mean-square error of the fit. + iters : int + Number of pole relocation iterations performed. + + See Also + -------- + sgwt.util.VFKernel : Lightweight kernel representation for graph convolution. + + Examples + -------- + General-purpose fitting of frequency response data: + + >>> import numpy as np + >>> s = 1j * np.linspace(1, 100, 200) + >>> f = 1.0 / (s + 5) + 2.0 / (s + 20) + >>> model = VFModel.fit(s, f, n_poles=2) + >>> model + VFModel(2 poles, 1 ch, rmse=..., ... iters) + + Graph kernel fitting for use with :class:`sgwt.Convolve`: + + >>> lam = np.linspace(0.01, 100, 500) + >>> g = 1.0 / (lam + 1) + >>> K = VFModel.kernel(lam, g, n_poles=4) + """ + + poles: ndarray + """Pole locations, shape ``(N,)``.""" + + residues: ndarray + """Residue matrix, shape ``(N, A)``.""" + + d: ndarray + """Direct (constant) term, shape ``(A,)``.""" + + e: ndarray + """Linear term, shape ``(A,)``. Zero for graph kernels.""" + + rmse: float + """Root-mean-square error of the fit.""" + + iters: int + """Number of pole relocation iterations performed.""" + + def __repr__(self): + return ( + f"VFModel({len(self.poles)} poles, {self.residues.shape[1]} ch, " + f"rmse={self.rmse:.3e}, {self.iters} iters)" + ) + + def __call__(self, s): + """Evaluate the rational model at sample points. + + Parameters + ---------- + s : array_like + Points at which to evaluate the model. + + Returns + ------- + ndarray, shape (len(s), A) + Model response at each point. + """ + s = asarray(s, dtype=complex).ravel() + return _cauchy(s, self.poles) @ self.residues + self.d + s[:, None] * self.e + + # ------------------------------------------------------------------ + # Fitting classmethods + # ------------------------------------------------------------------ + + @classmethod + def fit(cls, s, f, n_poles, *, poles=None, iters=30, tol=1e-9, + fit_d=True, fit_e=False, real_only=False): + """General-purpose Vector Fitting. + + Iteratively relocates poles to minimize the least-squares error + of a partial-fraction rational approximation. + + Parameters + ---------- + s : array_like + Sample points (complex or real), shape ``(M,)``. + f : array_like + Response data, shape ``(M,)`` or ``(M, A)`` for multi-channel. + n_poles : int + Number of poles to fit. + poles : array_like, optional + Custom initial poles. If *None*, poles are auto-generated + using logarithmic spacing across the data range. + iters : int, default 30 + Maximum number of pole relocation iterations. + tol : float, default 1e-9 + Convergence tolerance on relative pole movement. + fit_d : bool, default True + Include a constant (direct) term in the model. + fit_e : bool, default False + Include a linear term in the model. + real_only : bool, default False + Force all poles to be real and negative. + + Returns + ------- + VFModel + Fitted rational model. + """ + s = asarray(s, dtype=complex).ravel() + f = atleast_2d(asarray(f, dtype=complex)) + if f.shape[0] != len(s): + f = f.T + + a = (asarray(poles, complex) + if poles is not None + else _initial_poles(s, n_poles, real_only)) + + for it in range(iters): + a_prev = a.copy() + a = _relocate(s, f, a, fit_d, fit_e, real_only) + if (abs(a - a_prev) / (abs(a_prev) + 1e-15)).max() < tol: + break + + X = _lstsq(_basis(s, a, fit_d, fit_e), f) + N, A = len(a), f.shape[1] + d = X[N] if fit_d else zeros(A, complex) + e = X[N + fit_d] if fit_e else zeros(A, complex) + + model = cls(a, X[:N], d, e, 0.0, it + 1) + model.rmse = float(sqrt(mean(abs(f - model(s)) ** 2))) + return model + + @classmethod + def kernel(cls, lam, g, n_poles, **kw): + """Fit a graph spectral kernel with real negative poles. + + Performs a constrained Vector Fitting and returns a + :class:`~sgwt.util.VFKernel` ready for use with + :class:`~sgwt.Convolve` and :class:`~sgwt.DyConvolve`. + Enforces ``real_only=True`` and ``fit_e=False`` for stability + in the CHOLMOD convolution pipeline. + + The resulting kernel satisfies: + + .. math:: + + \\mathbf{g}(\\lambda) \\approx \\mathbf{d} + + \\sum_{k=1}^{K} \\frac{\\mathbf{r}_k}{\\lambda + q_k} + + where all :math:`q_k > 0` are real. + + Parameters + ---------- + lam : array_like + Eigenvalues (real, non-negative). + g : array_like + Target filter response evaluated at *lam*, shape ``(M,)`` + or ``(M, A)``. + n_poles : int + Number of poles. + **kw + Forwarded to :meth:`fit`. ``fit_d`` defaults to True. + + Returns + ------- + VFKernel + Kernel with real negative poles, ready for graph convolution. + """ + from ..util import VFKernel + + kw.setdefault('fit_d', True) + kw['fit_e'] = False + model = cls.fit(asarray(lam, float), asarray(g, float), + n_poles, real_only=True, **kw) + return VFKernel(R=model.residues, Q=model.poles, D=model.d) \ No newline at end of file diff --git a/sgwt/library/KERNELS/MODIFIED_MORLET.json b/sgwt/library/KERNELS/MODIFIED_MORLET.json index b5b2f0e..eaca554 100644 --- a/sgwt/library/KERNELS/MODIFIED_MORLET.json +++ b/sgwt/library/KERNELS/MODIFIED_MORLET.json @@ -89,5 +89,5 @@ ] } ], - "description": "Modified Morlet Wavelet With Central Frequnecy of 2pi" + "description": "Modified Morlet Wavelet With Central Frequency of 2pi" } diff --git a/sgwt/sgma.py b/sgwt/sgma.py index bd0e851..543fc45 100644 --- a/sgwt/sgma.py +++ b/sgwt/sgma.py @@ -21,7 +21,25 @@ def _peak_local_max_fallback( image: np.ndarray, min_distance: int = 1, num_peaks: int = np.inf, exclude_border: bool = False ) -> np.ndarray: - """Fallback for skimage peak_local_max using scipy maximum_filter.""" + """Fallback for skimage peak_local_max using scipy maximum_filter. + + Parameters + ---------- + image : np.ndarray + 2D array in which to find local maxima. + min_distance : int, default: 1 + Minimum index distance between detected peaks. + num_peaks : int, default: np.inf + Maximum number of peaks to return. + exclude_border : bool, default: False + Unused; kept for API compatibility with skimage. + + Returns + ------- + np.ndarray + Array of shape ``(n_peaks, 2)`` with row, column indices of peaks, + sorted by descending magnitude. + """ min_distance = max(1, min_distance) size = 2 * min_distance + 1 local_max = (image == maximum_filter(image, size=size, mode="constant")) & (image > 0) @@ -47,13 +65,26 @@ def _peak_local_max_fallback( from .functions import gaussian_wavelet from .util import impulse -NetworkAnalysisResult = NamedTuple('NetworkAnalysisResult', [('peaks', Dict), ('clusters', Dict)]) +class NetworkAnalysisResult(NamedTuple): + """Result container for multi-bus SGMA analysis. + + Attributes + ---------- + peaks : dict + Aggregated peak data with keys: Wavelength, Frequency, Magnitude, Bus_ID. + clusters : dict + Density-based cluster data with keys: Wavelength, Frequency, Density. + """ + peaks: Dict + clusters: Dict class ModeTable: """ Container for identified oscillatory modes with tabular display. + Stores mode parameters (frequency, damping ratio, wavelength, magnitude) and provides a formatted string representation for easy inspection. + Attributes ---------- frequency : ndarray diff --git a/sgwt/tests/conftest.py b/sgwt/tests/conftest.py index e91ac7d..6137a1b 100644 --- a/sgwt/tests/conftest.py +++ b/sgwt/tests/conftest.py @@ -131,7 +131,7 @@ def simple_chebykernel(small_laplacian): """Create a simple ChebyKernel approximating exp(-x).""" import sgwt f = lambda x: np.exp(-x) - return sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=10) + return sgwt.ChebyModel.kernel(small_laplacian, f, order=10) # --------------------------------------------------------------------------- diff --git a/sgwt/tests/test_chebymodel.py b/sgwt/tests/test_chebymodel.py new file mode 100644 index 0000000..c461d8a --- /dev/null +++ b/sgwt/tests/test_chebymodel.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +""" +Tests for the Chebyshev fitting module (fitting.chebyshev). +""" +import numpy as np +import pytest + +import sgwt +from sgwt.fitting import ChebyModel + + +class TestChebyModel: + """Tests for ChebyModel — Chebyshev polynomial approximation.""" + + def test_fit_approximates_function(self): + """fit() produces a model that approximates the target function.""" + f = lambda x: 1.0 / (x + 1) + model = ChebyModel.fit(f, order=40, spectrum_bound=100.0) + x = np.linspace(0.01, 100, 200) + assert np.max(np.abs(np.ravel(model(x)) - f(x))) < 1e-3 + + def test_kernel_returns_chebykernel(self, small_laplacian): + """kernel() returns a ChebyKernel, not a ChebyModel.""" + f = lambda x: 1.0 / (x + 1) + K = ChebyModel.kernel(small_laplacian, f, order=20) + assert isinstance(K, sgwt.ChebyKernel) + assert np.isclose(K.spectrum_bound, sgwt.estimate_spectral_bound(small_laplacian)) + + def test_zero_function_keeps_constant_term(self, small_laplacian): + """Fitting a zero function keeps at least the constant term.""" + kern = ChebyModel.kernel(small_laplacian, lambda x: np.zeros_like(x), order=5) + assert kern.C.shape[0] >= 1 + + def test_multioutput_function_preserves_2d_coeffs(self, small_laplacian): + """Fitting a multi-output function preserves 2D coefficient structure.""" + def multi_func(x): + return np.column_stack([np.exp(-x), np.sin(x)]) + + kern = ChebyModel.kernel(small_laplacian, multi_func, order=5) + assert kern.C.shape[1] == 2 + x_test = np.linspace(0, kern.spectrum_bound, 10) + result = kern.evaluate(x_test) + assert result.shape == (10, 2) + + @pytest.mark.parametrize("order", [0, -5]) + def test_invalid_order_raises_valueerror(self, small_laplacian, order): + """Order < 1 raises ValueError.""" + with pytest.raises(ValueError, match="Order must be >= 1"): + ChebyModel.kernel(small_laplacian, lambda x: x, order=order) + + def test_all_negligible_coefficients(self, small_laplacian): + """Fitting where all higher-order coefficients are negligible.""" + kern = ChebyModel.kernel( + small_laplacian, lambda x: np.full_like(x, 1e-20), order=10 + ) + assert kern.C.shape[0] >= 1 + + @pytest.mark.parametrize("sampling", ['linear', 'quadratic', 'logarithmic']) + def test_sampling_strategies(self, small_laplacian, sampling): + """kernel() works with various sampling strategies.""" + kern = ChebyModel.kernel( + small_laplacian, lambda x: np.exp(-x), order=5, sampling=sampling + ) + assert kern.C.shape[0] > 0 + x_test = np.linspace(0, min(2.0, kern.spectrum_bound), 20) + result = kern.evaluate(x_test) + expected = np.exp(-x_test) + np.testing.assert_allclose(result.flatten(), expected, atol=0.1) + + def test_adaptive_fitting(self, small_laplacian): + """kernel() with adaptive order selection.""" + kern = ChebyModel.kernel( + small_laplacian, lambda x: np.exp(-x), order=5, + adaptive=True, target_error=0.01, max_order=50 + ) + assert kern.C.shape[0] > 0 + x_test = np.linspace(0, min(2.0, kern.spectrum_bound), 100) + result = kern.evaluate(x_test) + expected = np.exp(-x_test) + rel_error = np.max(np.abs(result.flatten() - expected) / np.maximum(np.abs(expected), 1e-15)) + assert rel_error < 0.1 + + def test_adaptive_reaches_max_order(self, small_laplacian): + """Adaptive fitting that hits max_order still produces valid kernel.""" + kern = ChebyModel.kernel( + small_laplacian, lambda x: np.exp(-x), order=5, + adaptive=True, target_error=1e-20, max_order=10 + ) + assert kern.C.shape[0] > 0 diff --git a/sgwt/tests/test_chebyshev.py b/sgwt/tests/test_chebyshev.py index 09c9b88..a7ca57e 100644 --- a/sgwt/tests/test_chebyshev.py +++ b/sgwt/tests/test_chebyshev.py @@ -9,20 +9,19 @@ class TestChebyKernel: - """Tests for ChebyKernel construction and evaluation.""" + """Tests for ChebyKernel construction and fitting via ChebyModel.""" - def test_from_function_approximates_linear(self): - """ChebyKernel.from_function approximates f(x)=x correctly.""" - bound = 4.0 + def test_kernel_approximates_linear(self, small_laplacian): + """ChebyModel.kernel approximates f(x)=x correctly.""" f = lambda x: x - kern = sgwt.ChebyKernel.from_function(f, order=5, spectrum_bound=bound) - x_eval = np.linspace(0, bound, 20) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=5) + x_eval = np.linspace(0, kern.spectrum_bound, 20) np.testing.assert_allclose(kern.evaluate(x_eval).flatten(), f(x_eval), atol=1e-2) - def test_from_function_on_graph_estimates_bound(self, small_laplacian): - """from_function_on_graph estimates spectral bound and creates valid kernel.""" + def test_kernel_estimates_bound(self, small_laplacian): + """kernel estimates spectral bound from L and creates valid kernel.""" f = lambda x: np.exp(-x) - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=10) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=10) assert isinstance(kern, sgwt.ChebyKernel) # Spectral bound should be positive and reasonable assert kern.spectrum_bound > 0.01, \ @@ -30,10 +29,10 @@ def test_from_function_on_graph_estimates_bound(self, small_laplacian): assert kern.C.shape[0] == 11 # order + 1 @pytest.mark.parametrize("order", [5, 10, 20]) - def test_from_function_respects_order(self, small_laplacian, order): + def test_kernel_respects_order(self, small_laplacian, order): """Kernel C matrix has at most (order+1) rows (may be truncated for efficiency).""" f = lambda x: np.exp(-x) - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=order) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=order) # Coefficients may be truncated if high-order terms are negligible assert kern.C.shape[0] <= order + 1 assert kern.C.shape[0] >= 1 @@ -61,20 +60,11 @@ def test_identity_kernel_returns_input(self, small_laplacian, identity_signal): result = conv.convolve(identity_signal, kern) np.testing.assert_allclose(result.squeeze(), identity_signal, atol=1e-10) - def test_linear_kernel_applies_laplacian(self, small_laplacian, identity_signal): - """Convolution with f(x)=x applies the Laplacian.""" - f = lambda x: x - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=10) - with sgwt.ChebyConvolve(small_laplacian) as conv: - result = conv.convolve(identity_signal, kern) - expected = small_laplacian @ identity_signal - np.testing.assert_allclose(result.squeeze(), expected, atol=1e-2) - @pytest.mark.parametrize("order", [10, 30, 50]) def test_high_order_is_stable(self, small_laplacian, identity_signal, order): """High-order polynomial remains numerically stable.""" f = lambda x: np.exp(-x) - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=order) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=order) with sgwt.ChebyConvolve(small_laplacian) as conv: result = conv.convolve(identity_signal, kern) assert not np.any(np.isnan(result)) @@ -83,7 +73,7 @@ def test_high_order_is_stable(self, small_laplacian, identity_signal, order): def test_convolve_with_random_signal(self, small_laplacian, random_signal): """Convolution works with multi-column random signal.""" f = lambda x: 1.0 / (x + 1.0) # lowpass-like - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=15) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=15) with sgwt.ChebyConvolve(small_laplacian) as conv: result = conv.convolve(random_signal, kern) assert result.shape[0] == random_signal.shape[0] @@ -92,7 +82,7 @@ def test_convolve_with_random_signal(self, small_laplacian, random_signal): def test_convolve_with_1d_input(self, small_laplacian, identity_signal): """Convolution works with 1D input signal and returns squeezed output.""" f = lambda x: np.exp(-x) - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=10) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=10) # Ensure signal is 1D signal_1d = identity_signal.flatten() assert signal_1d.ndim == 1 @@ -109,7 +99,7 @@ def test_complex_input_handling(self, small_laplacian, identity_signal): # Simple kernel f(x) = x f = lambda x: x - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=5) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=5) with sgwt.ChebyConvolve(small_laplacian) as conv: result = conv.convolve(complex_signal, kern) @@ -128,7 +118,7 @@ def test_c_contiguous_input(self, small_laplacian, random_signal): # Simple kernel f = lambda x: np.exp(-x) - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=5) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=5) with sgwt.ChebyConvolve(small_laplacian) as conv: # Should not raise error @@ -139,8 +129,8 @@ def test_convolve_multi_basic(self, small_laplacian, identity_signal): """convolve_multi applies multiple kernels efficiently.""" f1 = lambda x: np.exp(-x) f2 = lambda x: 1.0 / (x + 1.0) - kern1 = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f1, order=10) - kern2 = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f2, order=10) + kern1 = sgwt.ChebyModel.kernel(small_laplacian, f1, order=10) + kern2 = sgwt.ChebyModel.kernel(small_laplacian, f2, order=10) with sgwt.ChebyConvolve(small_laplacian) as conv: results = conv.convolve_multi(identity_signal, [kern1, kern2]) @@ -162,7 +152,7 @@ def test_convolve_multi_complex(self, small_laplacian, identity_signal): """convolve_multi handles complex inputs.""" complex_signal = identity_signal + 1j * identity_signal f = lambda x: np.exp(-x) - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=10) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=10) with sgwt.ChebyConvolve(small_laplacian) as conv: results = conv.convolve_multi(complex_signal, [kern]) @@ -173,7 +163,7 @@ def test_convolve_multi_1d_input(self, small_laplacian): """convolve_multi handles 1D input signal.""" signal_1d = np.ones(small_laplacian.shape[0]) f = lambda x: np.exp(-x) - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=10) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=10) with sgwt.ChebyConvolve(small_laplacian) as conv: results = conv.convolve_multi(signal_1d, [kern]) @@ -206,7 +196,7 @@ def test_many_dimensions_einsum_path(self, small_laplacian, identity_signal): def test_cache_hit_same_spectrum_bound(self, small_laplacian, identity_signal): """Test that recurrence matrix is cached when spectrum_bound is same.""" f = lambda x: np.exp(-x) - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=10) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=10) with sgwt.ChebyConvolve(small_laplacian) as conv: # First convolution creates cache @@ -237,7 +227,7 @@ def test_complex_f_contiguous_input(self, small_laplacian): assert signal.flags['F_CONTIGUOUS'] f = lambda x: np.exp(-x) - kern = sgwt.ChebyKernel.from_function_on_graph(small_laplacian, f, order=5) + kern = sgwt.ChebyModel.kernel(small_laplacian, f, order=5) with sgwt.ChebyConvolve(small_laplacian) as conv: result = conv.convolve(signal, kern) @@ -254,4 +244,4 @@ def test_convolve_multi_single_order_kernel(self, small_laplacian, identity_sign results = conv.convolve_multi(identity_signal, [kern]) assert len(results) == 1 # Should just scale input by 2.0 - np.testing.assert_allclose(results[0].squeeze(), 2.0 * identity_signal, atol=1e-10) \ No newline at end of file + np.testing.assert_allclose(results[0].squeeze(), 2.0 * identity_signal, atol=1e-10) diff --git a/sgwt/tests/test_cholconv.py b/sgwt/tests/test_cholconv.py index d9fafb0..baa814a 100644 --- a/sgwt/tests/test_cholconv.py +++ b/sgwt/tests/test_cholconv.py @@ -49,14 +49,6 @@ def test_lowpass_with_bset_subset(self, texas_laplacian): results = conv.lowpass(X_single, SCALES, Bset=bset) assert len(results) == len(SCALES) - def test_zero_signal_returns_zero(self, texas_laplacian, texas_signal): - """Convolving zero signal returns zero.""" - X_zero = np.zeros_like(texas_signal) - with sgwt.Convolve(texas_laplacian) as conv: - results = conv.lowpass(X_zero, SCALES) - for r in results: - assert np.allclose(r, 0) - def test_empty_signal_raises_error(self, texas_laplacian): """Empty signal (0 rows) raises ValueError.""" X_empty = np.empty((0, 1), dtype=float) @@ -95,38 +87,21 @@ def test_lowpass_refactor_false(self, texas_laplacian, texas_signal): result_without = conv.lowpass(texas_signal, [scale], refactor=False)[0] # Results should be identical when using the same scale - # This tests that refactor=False doesn't break anything np.testing.assert_allclose(result_with, result_without, atol=1e-10) - def test_multiple_scales_work_correctly(self, texas_laplacian, texas_signal): - """Convolving with multiple different scales produces different results.""" - scale1, scale2 = SCALES[0], SCALES[1] - with sgwt.Convolve(texas_laplacian) as conv: - # Convolution with scale1 - result1 = conv.lowpass(texas_signal, [scale1])[0] - - # Convolution with different scale2 - result2 = conv.lowpass(texas_signal, [scale2])[0] - - # Results should be different for different scales - diff = np.abs(result1 - result2) - min_expected_diff = 1e-6 - assert np.max(diff) > min_expected_diff, \ - f"Different scales should produce different results (max diff={np.max(diff):.2e})" - def test_complex_signal_handling(self, texas_laplacian, texas_signal): """Complex signals are processed by splitting real/imag parts.""" # Create a complex signal: x + i*x complex_signal = texas_signal + 1j * texas_signal scale = SCALES[0] - + with sgwt.Convolve(texas_laplacian) as conv: # Process complex signal results_complex = conv.lowpass(complex_signal, [scale])[0] - + # Process real part manually for verification results_real = conv.lowpass(texas_signal, [scale])[0] - + # Check linearity: L(x + iy) = L(x) + iL(y) expected = results_real + 1j * results_real np.testing.assert_allclose(results_complex, expected, atol=1e-10) @@ -135,13 +110,13 @@ def test_complex_signal_handling(self, texas_laplacian, texas_signal): def test_convolve_complex_returns_array(self, texas_laplacian, texas_signal, library_kernel): """Convolve with complex signal returns ndarray (covers else branch in _process_signal).""" complex_signal = texas_signal + 1j * texas_signal - + with sgwt.Convolve(texas_laplacian) as conv: # convolve returns np.ndarray, not list, triggering the 'else' block result = conv.convolve(complex_signal, library_kernel) assert isinstance(result, np.ndarray) assert np.iscomplexobj(result) - + # Verify linearity real_res = conv.convolve(texas_signal, library_kernel) expected = real_res + 1j * real_res @@ -169,14 +144,6 @@ def test_float32_input_converted(self, texas_laplacian, texas_signal): res_64 = conv.lowpass(texas_signal, SCALES) np.testing.assert_allclose(res[0], res_64[0], atol=1e-5) - def test_int_input_converted(self, texas_laplacian, texas_signal): - """Integer input is converted to float64 and processed correctly.""" - # Create integer signal (rounded) - X_int = np.round(texas_signal).astype(np.int32) - with sgwt.Convolve(texas_laplacian) as conv: - res = conv.lowpass(X_int, SCALES) - assert len(res) == len(SCALES) - def test_scalar_scale_returns_array(self, texas_laplacian, texas_signal): """Passing scalar scale returns single array instead of list.""" with sgwt.Convolve(texas_laplacian) as conv: @@ -184,26 +151,6 @@ def test_scalar_scale_returns_array(self, texas_laplacian, texas_signal): assert isinstance(result, np.ndarray) assert result.shape == texas_signal.shape - def test_scalar_scale_matches_list_scale(self, texas_laplacian, texas_signal): - """Scalar scale result matches first element of list scale result.""" - scale = 1.0 - with sgwt.Convolve(texas_laplacian) as conv: - scalar_result = conv.lowpass(texas_signal, scale) - list_result = conv.lowpass(texas_signal, [scale]) - np.testing.assert_allclose(scalar_result, list_result[0]) - - def test_scalar_scale_bandpass(self, texas_laplacian, texas_signal): - """Scalar scale works for bandpass filter.""" - with sgwt.Convolve(texas_laplacian) as conv: - result = conv.bandpass(texas_signal, 1.0) - assert isinstance(result, np.ndarray) - - def test_scalar_scale_highpass(self, texas_laplacian, texas_signal): - """Scalar scale works for highpass filter.""" - with sgwt.Convolve(texas_laplacian) as conv: - result = conv.highpass(texas_signal, 1.0) - assert isinstance(result, np.ndarray) - def test_1d_signal_lowpass(self, texas_laplacian): """1D signal input returns 1D output for lowpass.""" signal_1d = np.random.randn(texas_laplacian.shape[0]) @@ -212,29 +159,6 @@ def test_1d_signal_lowpass(self, texas_laplacian): assert result.ndim == 1 assert result.shape[0] == texas_laplacian.shape[0] - def test_1d_signal_matches_2d(self, texas_laplacian): - """1D signal result matches 2D signal with single column.""" - signal_1d = np.random.randn(texas_laplacian.shape[0]) - signal_2d = signal_1d.reshape(-1, 1) - with sgwt.Convolve(texas_laplacian) as conv: - result_1d = conv.lowpass(signal_1d, 1.0) - result_2d = conv.lowpass(signal_2d, 1.0) - np.testing.assert_allclose(result_1d, result_2d.squeeze()) - - def test_1d_signal_bandpass(self, texas_laplacian): - """1D signal input works for bandpass filter.""" - signal_1d = np.random.randn(texas_laplacian.shape[0]) - with sgwt.Convolve(texas_laplacian) as conv: - result = conv.bandpass(signal_1d, 1.0) - assert result.ndim == 1 - - def test_1d_signal_highpass(self, texas_laplacian): - """1D signal input works for highpass filter.""" - signal_1d = np.random.randn(texas_laplacian.shape[0]) - with sgwt.Convolve(texas_laplacian) as conv: - result = conv.highpass(signal_1d, 1.0) - assert result.ndim == 1 - def test_1d_signal_convolve(self, texas_laplacian, library_kernel): """1D signal input works for convolve.""" signal_1d = np.random.randn(texas_laplacian.shape[0]) @@ -244,16 +168,6 @@ def test_1d_signal_convolve(self, texas_laplacian, library_kernel): assert result.ndim == 2 assert result.shape[0] == texas_laplacian.shape[0] - def test_1d_signal_list_scales(self, texas_laplacian): - """1D signal with list of scales returns list of 1D arrays.""" - signal_1d = np.random.randn(texas_laplacian.shape[0]) - with sgwt.Convolve(texas_laplacian) as conv: - results = conv.lowpass(signal_1d, SCALES) - assert isinstance(results, list) - assert len(results) == len(SCALES) - for r in results: - assert r.ndim == 1 - class TestConvolveVFKernel: """Tests for VFKernel convolution in Convolve context.""" @@ -264,14 +178,6 @@ def test_vfkernel_from_dict(self, texas_laplacian, texas_signal, library_kernel) result = conv.convolve(texas_signal, library_kernel) assert result.shape[0] == texas_laplacian.shape[0] - def test_vfkernel_object_matches_dict(self, texas_laplacian, texas_signal, library_kernel): - """VFKernel object produces same result as dict.""" - vk = sgwt.VFKernel.from_dict(library_kernel) - with sgwt.Convolve(texas_laplacian) as conv: - res_dict = conv.convolve(texas_signal, library_kernel) - res_obj = conv.convolve(texas_signal, vk) - np.testing.assert_allclose(res_dict, res_obj) - def test_invalid_kernel_raises_typeerror(self, texas_laplacian, texas_signal): """Invalid kernel type raises TypeError with helpful message.""" with sgwt.Convolve(texas_laplacian) as conv: @@ -355,33 +261,6 @@ def test_vfkernel_direct_term(self, texas_laplacian, texas_signal): expected = lp[:, None] + texas_signal[:, None] * 10.0 np.testing.assert_allclose(result, expected) - def test_consistency_with_static_convolve(self, texas_laplacian, texas_signal, library_kernel): - """DyConvolve produces same results as Convolve.""" - poles = [1.0 / s for s in SCALES] - vk = sgwt.VFKernel.from_dict(library_kernel) - - with sgwt.DyConvolve(texas_laplacian, vk) as dy: - dy_vf = dy.convolve(texas_signal) - - with sgwt.DyConvolve(texas_laplacian, poles) as dy: - dy_lp = dy.lowpass(texas_signal) - dy_bp = dy.bandpass(texas_signal) - dy_hp = dy.highpass(texas_signal) - - with sgwt.Convolve(texas_laplacian) as st: - st_vf = st.convolve(texas_signal, vk) - st_lp = st.lowpass(texas_signal, SCALES) - st_bp = st.bandpass(texas_signal, SCALES) - st_hp = st.highpass(texas_signal, SCALES) - - np.testing.assert_allclose(dy_vf, st_vf, atol=1e-10) - for dy_r, st_r in zip(dy_lp, st_lp): - np.testing.assert_allclose(dy_r, st_r, atol=1e-10) - for dy_r, st_r in zip(dy_bp, st_bp): - np.testing.assert_allclose(dy_r, st_r, atol=1e-10) - for dy_r, st_r in zip(dy_hp, st_hp): - np.testing.assert_allclose(dy_r, st_r, atol=1e-10) - def test_float32_input_converted(self, texas_laplacian, texas_signal): """Float32 input is converted to float64 in DyConvolve.""" X_32 = texas_signal.astype(np.float32) @@ -401,34 +280,6 @@ def test_1d_signal_lowpass(self, texas_laplacian): assert r.ndim == 1 assert r.shape[0] == texas_laplacian.shape[0] - def test_1d_signal_matches_2d(self, texas_laplacian): - """1D signal result matches 2D signal with single column.""" - signal_1d = np.random.randn(texas_laplacian.shape[0]) - signal_2d = signal_1d.reshape(-1, 1) - poles = [1.0] - with sgwt.DyConvolve(texas_laplacian, poles) as conv: - result_1d = conv.lowpass(signal_1d)[0] - result_2d = conv.lowpass(signal_2d)[0] - np.testing.assert_allclose(result_1d, result_2d.squeeze()) - - def test_1d_signal_bandpass(self, texas_laplacian): - """1D signal input works for bandpass filter.""" - signal_1d = np.random.randn(texas_laplacian.shape[0]) - poles = [1.0] - with sgwt.DyConvolve(texas_laplacian, poles) as conv: - results = conv.bandpass(signal_1d) - for r in results: - assert r.ndim == 1 - - def test_1d_signal_highpass(self, texas_laplacian): - """1D signal input works for highpass filter.""" - signal_1d = np.random.randn(texas_laplacian.shape[0]) - poles = [1.0] - with sgwt.DyConvolve(texas_laplacian, poles) as conv: - results = conv.highpass(signal_1d) - for r in results: - assert r.ndim == 1 - def test_1d_signal_convolve(self, texas_laplacian, library_kernel): """1D signal input works for convolve.""" signal_1d = np.random.randn(texas_laplacian.shape[0]) @@ -458,15 +309,6 @@ def test_addbranch_modifies_response(self, texas_laplacian, texas_signal): assert max_diff > min_change, \ f"Topology update should significantly affect response (max diff={max_diff:.2e}, expected >{min_change:.2e})" - def test_multiple_branch_updates(self, texas_laplacian, texas_signal): - """Multiple sequential branch additions work.""" - with sgwt.DyConvolve(texas_laplacian, [1.0]) as conv: - ok1 = conv.addbranch(10, 20, 1.0) - ok2 = conv.addbranch(30, 40, 1.0) - assert ok1 and ok2 - result = conv.lowpass(texas_signal) - assert len(result) == 1 - def test_out_of_bounds_indices_fail_gracefully(self, texas_laplacian, texas_signal): """Out-of-bounds node indices return False, not crash.""" n = texas_laplacian.shape[0] diff --git a/sgwt/tests/test_functions.py b/sgwt/tests/test_functions.py index 4b6ac22..a1fb8d9 100644 --- a/sgwt/tests/test_functions.py +++ b/sgwt/tests/test_functions.py @@ -15,18 +15,6 @@ def test_dc_gain_is_one(self): """Lowpass gain at λ=0 is 1.""" assert lowpass(np.array([0.0]), scale=1.0)[0] == pytest.approx(1.0) - def test_high_frequency_approaches_zero(self): - """Lowpass gain at large λ approaches 0.""" - result = lowpass(np.array([1e6]), scale=1.0)[0] - # At λ=1e6, lowpass should be essentially zero (< 1e-5) - assert result < 1e-5, f"Expected near-zero gain at high frequency, got {result}" - - @pytest.mark.parametrize("scale", [0.1, 1.0, 10.0]) - def test_cutoff_at_one_over_scale(self, scale): - """Lowpass gain at λ=1/scale is 0.5.""" - x = np.array([1.0 / scale]) - assert lowpass(x, scale=scale)[0] == pytest.approx(0.5) - def test_monotonically_decreasing(self): """Lowpass is monotonically decreasing for λ > 0.""" x = np.linspace(0, 10, 100) @@ -41,19 +29,6 @@ def test_dc_gain_is_zero(self): """Highpass gain at λ=0 is 0.""" assert highpass(np.array([0.0]), scale=1.0)[0] == pytest.approx(0.0) - def test_high_frequency_approaches_one(self): - """Highpass gain at large λ approaches 1.""" - result = highpass(np.array([1e6]), scale=1.0)[0] - # At λ=1e6, highpass should be essentially 1.0 (within 1e-5) - min_gain = 0.99999 - assert result > min_gain, f"Expected gain >{min_gain} at high frequency, got {result}" - - @pytest.mark.parametrize("scale", [0.1, 1.0, 10.0]) - def test_cutoff_at_one_over_scale(self, scale): - """Highpass gain at λ=1/scale is 0.5.""" - x = np.array([1.0 / scale]) - assert highpass(x, scale=scale)[0] == pytest.approx(0.5) - def test_monotonically_increasing(self): """Highpass is monotonically increasing for λ > 0.""" x = np.linspace(0, 10, 100) @@ -68,12 +43,6 @@ def test_dc_gain_is_zero(self): """Bandpass gain at λ=0 is 0.""" assert bandpass(np.array([0.0]), scale=1.0)[0] == pytest.approx(0.0) - def test_high_frequency_approaches_zero(self): - """Bandpass gain at large λ approaches 0.""" - result = bandpass(np.array([1e6]), scale=1.0)[0] - # At λ=1e6, bandpass should be essentially zero (< 1e-5) - assert result < 1e-5, f"Expected near-zero gain at high frequency, got {result}" - def test_peak_at_center_frequency(self): """Bandpass has maximum near center frequency λ=1/scale.""" scale = 1.0 @@ -87,19 +56,3 @@ def test_peak_at_center_frequency(self): deviation = abs(peak_x - expected_peak) assert deviation < tolerance_fraction, \ f"Peak at λ={peak_x:.3f}, expected near λ={expected_peak:.3f} (tolerance={tolerance_fraction})" - - @pytest.mark.parametrize("order", [1, 2, 3]) - def test_order_sharpens_response(self, order): - """Higher order makes bandpass narrower.""" - scale = 1.0 - x = np.linspace(0.01, 10, 1000) - y = bandpass(x, scale=scale, order=order) - # Peak value should be <= 1 (order 1 peaks at 1.0) - assert np.max(y) <= 1.0 + 1e-10 - - def test_lowpass_highpass_sum_approximation(self): - """At moderate frequencies, lowpass + highpass ≈ 1.""" - x = np.linspace(0.1, 10, 100) - lp = lowpass(x, scale=1.0) - hp = highpass(x, scale=1.0) - np.testing.assert_allclose(lp + hp, np.ones_like(x), atol=1e-10) diff --git a/sgwt/tests/test_sgma.py b/sgwt/tests/test_sgma.py index 687d19b..94c082d 100644 --- a/sgwt/tests/test_sgma.py +++ b/sgwt/tests/test_sgma.py @@ -491,21 +491,6 @@ def test_find_modes_boundary_peaks(self, sgma_engine): # Should find both boundary peaks assert modes.n_modes == 2 - def test_find_modes_damping_finite(self, sgma_engine, random_signal): - """Test that damping values are finite (not inf/nan).""" - n_time = random_signal.shape[1] - t = np.linspace(0, 5, n_time) - time_target = 2.5 - - spectrum = sgma_engine.spectrum( - random_signal, t, bus=0, time=time_target, return_complex=True - ) - - modes = sgma_engine.find_modes(spectrum, top_n=3, min_dist=1) - - # All damping values should be finite - assert np.all(np.isfinite(modes.damping)) - class TestSGMACoverage: """Additional tests to cover remaining branches.""" diff --git a/sgwt/tests/test_util.py b/sgwt/tests/test_util.py index f67a1f7..f2a9356 100644 --- a/sgwt/tests/test_util.py +++ b/sgwt/tests/test_util.py @@ -95,11 +95,6 @@ def test_signal_is_2d_array(self, signal_name): assert S.ndim == 2 assert S.shape[1] in [2, 3] - def test_laplacian_signal_dimension_match(self): - """Laplacian and signal node counts match.""" - assert sgwt.DELAY_TEXAS.shape[0] == sgwt.COORD_TEXAS.shape[0] - assert sgwt.DELAY_USA.shape[0] == sgwt.COORD_USA.shape[0] - class TestMeshSignals: """Tests for built-in mesh signals.""" @@ -161,34 +156,6 @@ def test_mismatched_coeffs_raises(self): sgwt.ChebyKernel.from_dict(data) -class TestChebyKernelFromFunction: - """Tests for ChebyKernel.from_function edge cases.""" - - def test_zero_function_keeps_constant_term(self): - """Fitting a zero function keeps at least the constant term.""" - kern = sgwt.ChebyKernel.from_function(lambda x: np.zeros_like(x), order=5, spectrum_bound=1.0) - assert kern.C.shape[0] >= 1 - - def test_multioutput_function_preserves_2d_coeffs(self): - """Fitting a multi-output function preserves 2D coefficient structure.""" - # Function returning 2D array (multi-output) - def multi_func(x): - return np.column_stack([np.exp(-x), np.sin(x)]) - - kern = sgwt.ChebyKernel.from_function(multi_func, order=5, spectrum_bound=4.0) - # Should have 2 dimensions (one per output) - assert kern.C.shape[1] == 2 - # Verify evaluation works for both outputs - x_test = np.linspace(0, 4, 10) - result = kern.evaluate(x_test) - assert result.shape == (10, 2) - - @pytest.mark.parametrize("order", [0, -5]) - def test_invalid_order_raises_valueerror(self, order): - """Order < 1 raises ValueError with descriptive message.""" - with pytest.raises(ValueError, match="Order must be >= 1"): - sgwt.ChebyKernel.from_function(lambda x: x, order=order, spectrum_bound=1.0) - class TestMatLoader: """Tests for _mat_loader edge cases.""" @@ -276,30 +243,6 @@ def test_returns_positive_value(self, small_laplacian): assert bound > min_bound, \ f"Expected spectral bound >{min_bound}, got {bound}" - def test_bound_exceeds_max_eigenvalue(self, small_laplacian): - """Bound is >= largest eigenvalue (with small margin).""" - from scipy.sparse.linalg import eigsh - bound = sgwt.estimate_spectral_bound(small_laplacian) - # Compute actual max eigenvalue - max_eig = eigsh(small_laplacian.astype(float), k=1, which='LM', return_eigenvectors=False)[0] - assert bound >= max_eig * 0.99 # allow small numerical tolerance - - -class TestChebyKernelFromFunctionOnGraph: - """Tests for ChebyKernel.from_function_on_graph convenience method.""" - - def test_creates_kernel_from_graph(self, small_laplacian): - """from_function_on_graph estimates spectral bound and fits kernel.""" - from sgwt.util import ChebyKernel - kernel = ChebyKernel.from_function_on_graph( - small_laplacian, lambda x: np.exp(-x), order=10 - ) - # Should produce at least 1 Chebyshev coefficient - assert kernel.C.shape[0] > 0, "Kernel should have at least one Chebyshev coefficient" - # Spectral bound should be positive and reasonable - assert kernel.spectrum_bound > 0.01, \ - f"Expected reasonable spectral bound, got {kernel.spectrum_bound}" - class TestChebyKernelEvaluate: """Tests for ChebyKernel.evaluate method.""" @@ -315,16 +258,6 @@ def test_evaluate_multidimensional(self): assert result.shape == (3, 2) -class TestImpulse: - """Tests for impulse signal generator.""" - - def test_impulse_creates_correct_signal(self, small_laplacian): - """impulse creates signal with 1 at specified vertex.""" - signal = sgwt.impulse(small_laplacian, n=2, n_timesteps=5) - assert signal.shape == (small_laplacian.shape[0], 5) - assert signal[2, 0] == 1.0 - assert np.sum(signal[:, 0]) == 1.0 - class TestPlyParsing: """Tests for PLY file parsing functions.""" @@ -447,14 +380,6 @@ def test_load_ply_xyz(self, fixture_name, request): assert np.allclose(xyz[0], [0.0, 0.0, 0.0]) assert np.allclose(xyz[1], [1.0, 0.0, 0.0]) - def test_laplacian_xyz_consistency(self, ascii_ply_file): - """Laplacian and XYZ have consistent vertex counts.""" - from sgwt.util import load_ply_laplacian, load_ply_xyz - L = load_ply_laplacian(ascii_ply_file) - xyz = load_ply_xyz(ascii_ply_file) - - assert L.shape[0] == xyz.shape[0] - def test_parse_ply_with_blank_lines_in_header(self, tmp_path): """_parse_ply handles blank lines in header.""" from sgwt.util import _parse_ply @@ -576,70 +501,6 @@ def test_invalid_attribute_raises(self): _ = sgwt.util.NONEXISTENT_RESOURCE_NAME -class TestChebyKernelCoverage: - """Additional tests for ChebyKernel edge cases.""" - - def test_from_function_all_negligible_coefficients(self): - """Fitting a function where all higher-order coefficients are negligible.""" - # A constant function should result in only the constant term being kept - kern = sgwt.ChebyKernel.from_function( - lambda x: np.full_like(x, 1e-20), # Nearly zero constant - order=10, - spectrum_bound=1.0 - ) - # Should keep at least the constant term - assert kern.C.shape[0] >= 1 - - @pytest.mark.parametrize("sampling", ['linear', 'quadratic', 'logarithmic']) - def test_from_function_sampling_strategies(self, sampling): - """Test from_function with various sampling strategies.""" - kern = sgwt.ChebyKernel.from_function( - lambda x: np.exp(-x), - order=5, - spectrum_bound=2.0, - sampling=sampling - ) - assert kern.C.shape[0] > 0 - # Verify the kernel approximates reasonably well - x_test = np.linspace(0, 2.0, 20) - result = kern.evaluate(x_test) - expected = np.exp(-x_test) - np.testing.assert_allclose(result.flatten(), expected, atol=0.1) - - def test_from_function_adaptive_fitting(self): - """Test from_function with adaptive order selection.""" - kern = sgwt.ChebyKernel.from_function( - lambda x: np.exp(-x), - order=5, # Starting order - spectrum_bound=2.0, - adaptive=True, - target_error=0.01, - max_order=50 - ) - assert kern.C.shape[0] > 0 - # Adaptive fitting should find appropriate order - x_test = np.linspace(0, 2.0, 100) - result = kern.evaluate(x_test) - expected = np.exp(-x_test) - # Should achieve target error approximately - rel_error = np.max(np.abs(result.flatten() - expected) / np.maximum(np.abs(expected), 1e-15)) - assert rel_error < 0.1 # Allow some slack in convergence - - def test_from_function_adaptive_reaches_max_order(self): - """Test adaptive fitting that hits max_order.""" - # Use a simple function but with impossibly tight target - kern = sgwt.ChebyKernel.from_function( - lambda x: np.exp(-x), - order=5, - spectrum_bound=2.0, - adaptive=True, - target_error=1e-20, # Impossibly tight - will hit max_order - max_order=10 # Very low max to finish quickly - ) - # Should still produce a valid kernel even if target not met - assert kern.C.shape[0] > 0 - - class TestListGraphs: """Tests for list_graphs utility function.""" diff --git a/sgwt/tests/test_vfit.py b/sgwt/tests/test_vfit.py new file mode 100644 index 0000000..93a2200 --- /dev/null +++ b/sgwt/tests/test_vfit.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +""" +Tests for the Vector Fitting module (fitting.vfit). +""" +import numpy as np +import pytest + +import sgwt +from sgwt.fitting import VFModel + + +class TestVFModel: + """Tests for VFModel — Vector Fitting rational approximation.""" + + def test_fit_recovers_known_rational(self): + """Recovers poles, residues, d, and e from a known rational function.""" + s = 1j * np.linspace(1, 100, 300) + # f(s) = 5 + 0.1*s + 1/(s+3) + 2/(s+20) + f = 5.0 + 0.1 * s + 1.0 / (s + 3) + 2.0 / (s + 20) + model = VFModel.fit(s, f, n_poles=2, fit_d=True, fit_e=True) + + recovered = sorted(model.poles.real) + assert abs(recovered[0] + 20) < 2.0 + assert abs(recovered[1] + 3) < 2.0 + assert abs(model.d[0].real - 5.0) < 1.0 + assert abs(model.e[0].real - 0.1) < 0.05 + assert model.rmse < 1e-3 + assert '2 poles' in repr(model) + + def test_fit_multi_channel(self): + """Multi-channel response produces correct residue shape.""" + s = 1j * np.linspace(1, 50, 200) + f = np.column_stack([1.0 / (s + 3), 2.0 / (s + 10)]) + model = VFModel.fit(s, f, n_poles=2) + assert model.residues.shape[1] == 2 + assert model.d.shape == (2,) + assert model.rmse < 1e-2 + + def test_fit_without_direct_term(self): + """fit_d=False omits constant term from the model.""" + s = 1j * np.linspace(1, 50, 200) + f = 1.0 / (s + 5) + model = VFModel.fit(s, f, n_poles=1, fit_d=False) + assert np.allclose(model.d, 0) + + def test_fit_transposed_input(self): + """Handles f with shape (A, M) instead of (M, A).""" + s = 1j * np.linspace(1, 50, 200) + f = (1.0 / (s + 5)).reshape(1, -1) # (1, 200) + model = VFModel.fit(s, f, n_poles=1) + assert model.rmse < 1e-3 + + def test_kernel_returns_constrained_vfkernel(self): + """kernel() returns VFKernel with real negative poles.""" + lam = np.linspace(0.01, 100, 500) + g = 1.0 / (lam + 1) + K = VFModel.kernel(lam, g, n_poles=4) + assert isinstance(K, sgwt.VFKernel) + assert np.allclose(K.Q.imag, 0) + assert np.all(K.Q.real < 0) + assert K.R.shape[0] == 4 diff --git a/sgwt/util.py b/sgwt/util.py index 925680c..4bdece2 100644 --- a/sgwt/util.py +++ b/sgwt/util.py @@ -1,6 +1,6 @@ """General Utilities -Description: Utilities for accessing built-in data, VFKern, and impulse helper function. +Description: Utilities for accessing built-in data, VFKernel, and impulse helper function. Author: Luke Lowery (lukel@tamu.edu) """ @@ -41,6 +41,8 @@ class ChebyKernel: """Coefficient matrix of shape (order + 1, n_dims).""" spectrum_bound: float """Shared upper spectrum bound for all kernels.""" + min_lambda: float = 0.0 + """Lower bound of the approximation domain.""" @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'ChebyKernel': @@ -52,154 +54,12 @@ def from_dict(cls, data: Dict[str, Any]) -> 'ChebyKernel': coeffs = [np.asarray(a.get('coeffs', [])) for a in approxs] if any(len(c) != len(coeffs[0]) for c in coeffs): raise ValueError("All 'coeffs' arrays must have the same length.") - return cls(C=np.stack(coeffs, axis=1), spectrum_bound=bound) - - @classmethod - def from_function( - cls, - f: Callable[[np.ndarray], np.ndarray], - order: int, - spectrum_bound: float, - n_samples: int = None, - sampling: str = 'chebyshev', - min_lambda: float = 0.0, - rtol: float = 1e-12, - adaptive: bool = False, - max_order: int = 500, - target_error: float = 1e-10 - ) -> 'ChebyKernel': - """Creates a ChebyKernel by fitting a vectorized function. - - Parameters - ---------- - f : Callable[[np.ndarray], np.ndarray] - The vectorized function to approximate. - order : int - Order of the Chebyshev polynomial to fit. - spectrum_bound : float - Upper bound of the function's domain. - n_samples : int, optional - Number of sample points (only used for non-Chebyshev sampling). - sampling : str, default 'chebyshev' - Sampling strategy: 'chebyshev' (optimal), 'linear', 'quadratic', 'logarithmic'. - min_lambda : float, default 0.0 - Lower bound of the sampling range. - rtol : float, default 1e-12 - Relative tolerance for truncating negligible coefficients. - adaptive : bool, default False - If True, automatically determines optimal order to achieve target_error. - max_order : int, default 500 - Maximum order for adaptive mode. - target_error : float, default 1e-10 - Target approximation error for adaptive mode. - """ - if order < 1: - raise ValueError("Order must be >= 1") - - if adaptive: - return cls._adaptive_fit(f, order, spectrum_bound, min_lambda, - sampling, rtol, max_order, target_error) - - coeffs = cls._compute_coefficients(f, order, spectrum_bound, min_lambda, - n_samples, sampling) - coeffs = cls._ensure_2d(coeffs) - coeffs = cls._truncate(coeffs, rtol) - - return cls(C=coeffs, spectrum_bound=spectrum_bound) - - @classmethod - def _compute_coefficients(cls, f, order, spectrum_bound, min_lambda, n_samples, sampling): - """Compute Chebyshev coefficients using optimal or fallback method.""" - lambda_range = spectrum_bound - min_lambda - lambda_mid = (spectrum_bound + min_lambda) / 2.0 - - if sampling == 'chebyshev': - # Chebyshev-Gauss-Lobatto nodes: optimal for polynomial interpolation - n = order + 1 - k = np.arange(n) - x_cheb = np.cos(np.pi * k / order) # Nodes in [-1, 1] - sample_x = lambda_mid + (lambda_range / 2.0) * x_cheb - f_values = cls._ensure_2d(f(sample_x)) - - # Compute coefficients via discrete orthogonality (DCT-like) - coeffs = np.zeros((n, f_values.shape[1])) - w = np.ones(n); w[0] = w[-1] = 0.5 # Endpoint weights - - for j in range(n): - T_j = np.cos(j * np.pi * k / order) - scale = 2.0 / order if 0 < j < order else 1.0 / order - coeffs[j] = scale * np.sum(w[:, None] * f_values * T_j[:, None], axis=0) - - return coeffs - - # Fallback: least-squares fitting for other sampling strategies - n_samples = n_samples or max(4 * (order + 1), 1000) - t = np.linspace(0, 1, n_samples) - - if sampling == 'quadratic': - sample_x = min_lambda + lambda_range * (t ** 2) - elif sampling == 'logarithmic': - eps = max(min_lambda * 0.001, 1e-10) - sample_x = np.exp(np.log(min_lambda + eps) + t * np.log(spectrum_bound / (min_lambda + eps))) - else: # linear - sample_x = min_lambda + lambda_range * t - - x_scaled = 2.0 * (sample_x - min_lambda) / lambda_range - 1.0 - - # Chebyshev-weighted least squares - weights = 1.0 / np.sqrt(1.0 - np.clip(x_scaled ** 2, 0, 0.9999)) - return np.polynomial.chebyshev.chebfit(x_scaled, f(sample_x), order, w=weights) - - @classmethod - def _adaptive_fit(cls, f, start_order, spectrum_bound, min_lambda, - sampling, rtol, max_order, target_error): - """Adaptively determine optimal polynomial order.""" - test_x = np.linspace(min_lambda, spectrum_bound, 1000) - f_exact = cls._ensure_2d(f(test_x)) - order = max(start_order, 8) - - while True: - coeffs = cls._ensure_2d( - cls._compute_coefficients(f, order, spectrum_bound, min_lambda, None, sampling) - ) - - # Evaluate and compute relative error - x_scaled = 2.0 * test_x / spectrum_bound - 1.0 - f_approx = np.polynomial.chebyshev.chebval(x_scaled, coeffs) - f_approx = f_approx.T if f_approx.ndim > 1 else f_approx[:, None] - - rel_error = np.max(np.abs(f_exact - f_approx) / np.maximum(np.abs(f_exact), 1e-15)) - - if rel_error <= target_error or order >= max_order: - break - order = min(int(order * 1.5) + 1, max_order) - - return cls(C=cls._truncate(coeffs, rtol), spectrum_bound=spectrum_bound) - - @classmethod - def _ensure_2d(cls, arr): - """Ensure array is 2D with shape (n, dims).""" - arr = np.atleast_1d(arr) - return arr[:, None] if arr.ndim == 1 else arr - - @classmethod - def _truncate(cls, coeffs, rtol): - """Truncate negligible higher-order coefficients.""" - threshold = rtol * np.max(np.abs(coeffs)) - row_max = np.max(np.abs(coeffs), axis=1) - significant = np.where(row_max > threshold)[0] - last_idx = significant[-1] + 1 if significant.size > 0 else 1 - return coeffs[:last_idx] - - @classmethod - def from_function_on_graph(cls, L: csc_matrix, f: Callable[[np.ndarray], np.ndarray], - order: int, **kwargs) -> 'ChebyKernel': - """Creates a ChebyKernel fitted to a graph's spectrum.""" - return cls.from_function(f, order, estimate_spectral_bound(L), **kwargs) + min_lam = data.get('min_lambda', 0.0) + return cls(C=np.stack(coeffs, axis=1), spectrum_bound=bound, min_lambda=min_lam) def _scale_x(self, x: np.ndarray) -> np.ndarray: - """Maps points from [0, spectrum_bound] to Chebyshev domain [-1, 1].""" - return (2.0 / self.spectrum_bound) * x - 1.0 + """Maps points from [min_lambda, spectrum_bound] to Chebyshev domain [-1, 1].""" + return 2.0 * (x - self.min_lambda) / (self.spectrum_bound - self.min_lambda) - 1.0 def evaluate(self, x: np.ndarray) -> np.ndarray: """Evaluates the Chebyshev approximation at given points.""" @@ -270,13 +130,13 @@ def from_dict(cls, data: Dict[str, Any]) -> 'VFKernel': ) -def impulse(lap: csc_matrix, n: int = 0, n_timesteps: int = 1) -> np.ndarray: +def impulse(L: csc_matrix, n: int = 0, n_timesteps: int = 1) -> np.ndarray: """ Generates a Dirac impulse signal at a specified vertex. Parameters ---------- - lap : csc_matrix + L : csc_matrix Graph Laplacian defining the number of vertices. n : int Index of the vertex where the impulse is applied. @@ -290,10 +150,10 @@ def impulse(lap: csc_matrix, n: int = 0, n_timesteps: int = 1) -> np.ndarray: (n_vertices, n_timesteps) with 1.0 at index n and 0.0 elsewhere. """ if n_timesteps == 1: - b: np.ndarray = np.zeros(lap.shape[0]) + b: np.ndarray = np.zeros(L.shape[0]) b[n] = 1.0 else: - b = np.zeros((lap.shape[0], n_timesteps), order='F') + b = np.zeros((L.shape[0], n_timesteps), order='F') b[n] = 1.0 return b @@ -374,7 +234,7 @@ def _mat_loader(path: str, to_csc: bool = False) -> Union[np.ndarray, csc_matrix return res.T if (res.ndim == 2 and res.shape[0] == 1) else res def _json_kern_loader(path: str) -> Dict[str, Any]: - """Loads a VFKern from a JSON file.""" + """Loads a VFKernel dictionary from a JSON file.""" with open(path, "r") as f: return jsonload(f)