# Source code for gwadama.tat

"""tat.py

Time analysis toolkit.

"""
from collections.abc import Sequence
from fractions import Fraction
import warnings

import numpy as np
from numpy.typing import NDArray
import scipy as sp
from scipy.interpolate import PchipInterpolator
from scipy.signal import resample_poly



def _build_uniform_grid(t0: float, t1: float, fs: float) -> np.ndarray:
    """Closed interval grid [t0, t1] with step 1/fs."""
    n = int(np.floor((t1 - t0) * fs)) + 1
    return np.linspace(t0, t1, n, endpoint=True)


def resample(
    strain: NDArray,
    times: NDArray,
    fs: int,
    *,
    full_output: bool = True,
    uniform_rtol: float = 1e-6,
    uniform_atol: float = 1e-12,
    percentile: float = 90.0,     # robust cadence from the 90th percentile of instantaneous fs
    f_min_factor: float = 1.25,   # ensure f_u >= f_min_factor * fs to leave FIR headroom
    f_cap_factor: float = 8.0,    # cap f_u <= f_cap_factor * fs to avoid blow-ups
    frac_den_limit: int = 8192    # cap denominator when reducing rational ratios
):
    """Resample (possibly irregularly-sampled) 'strain' onto a uniform grid.

    Resample onto a uniform grid at target sampling rate 'fs' (Hz).

    Logic:
    1) Validate input.
    2) If times are approximately uniform:
       - If fs_in == fs (within 0.5 Hz): return a copy aligned to the
         original start.
       - Else: resample_poly directly from fs_in -> fs (anti-aliasing
         included).
       Else (times irregular):
       a) If max instantaneous rate < fs: issue a warning and directly
          interpolate to fs via PCHIP (upsampling).
       b) Otherwise:
          - Pick robust f_u from the 'percentile' of instantaneous rates.
          - Enforce fs <= f_u <= min(max_inst_rate, f_cap_factor*fs), and
            f_u >= f_min_factor*fs.
          - PCHIP-uniformise at f_u, then resample_poly f_u -> fs.

    Parameters
    ----------
    strain : numpy.ndarray
        One-dimensional input strain samples, shape (N,).

    times : numpy.ndarray
        One-dimensional, strictly increasing time stamps (seconds),
        shape (N,). No duplicates; must be sorted ascending.

    fs : int
        Target sampling frequency in Hz. Must be a positive integer.

    full_output : bool, default=True
        If True, also return the output time grid, an input-rate estimate,
        and the (up, down) integers used in the final polyphase step.

    uniform_rtol : float, default=1e-6
        Relative tolerance for the uniform-spacing check.

    uniform_atol : float, default=1e-12
        Absolute tolerance for the uniform-spacing check.

    percentile : float, default=90.0
        Percentile (in [0, 100]) of instantaneous sampling rates
        `1/np.diff(times)` used to choose the robust uniformisation rate
        `f_u` when input is irregular.

    f_min_factor : float, default=1.25
        Lower bound multiplier for `f_u` relative to `fs` (i.e.
        `f_u >= f_min_factor*fs`) to leave margin for the FIR anti-alias
        filter.

    f_cap_factor : float, default=8.0
        Upper bound multiplier for `f_u` relative to `fs` (i.e.
        `f_u <= f_cap_factor*fs`) to avoid excessively large intermediate
        grids. Must satisfy `f_cap_factor >= f_min_factor`. `f_u` is also
        capped by the maximum instantaneous rate.

    frac_den_limit : int, default=8192
        Maximum allowed denominator when approximating the rate ratio with
        `fractions.Fraction(...).limit_denominator`. Larger values give a
        closer ratio but longer polyphase FIR filters (slower); smaller
        values shorten the filter at the cost of a tiny ratio error.

    Returns
    -------
    y : 1d-array
        Resampled strain at 'fs'.

    t : 1d-array (if full_output)
        Time grid at 'fs', starting at times[0].

    fs_in : int (if full_output)
        Estimated original sampling frequency (rounded to nearest integer).
        For irregular inputs, this is the robust uniformisation rate if
        used; otherwise a robust estimate from the mean spacing.

    up, down : int, int (if full_output)
        Up/down integers used in the final polyphase step. (0, 0) if N/A.
    """
    # --- Basic validations
    if not isinstance(times, np.ndarray):
        raise TypeError("'times' must be a NumPy array.")
    if not isinstance(strain, np.ndarray):
        raise TypeError("'strain' must be a NumPy array.")
    if times.ndim != 1 or strain.ndim != 1 or len(times) != len(strain):
        raise ValueError("'times' and 'strain' must be 1D arrays of equal length.")
    if fs <= 0:
        raise ValueError("Target 'fs' must be a positive integer.")
    if len(times) < 3:
        # Need at least 3 points to sensibly infer cadence / interpolate
        raise ValueError("Need at least 3 samples to resample.")

    # Ensure strictly increasing times (and no duplicates)
    if not np.all(np.diff(times) > 0):
        raise ValueError(
            "'times' must be strictly increasing (no duplicates, sorted "
            "ascending)."
        )

    # --- Uniformity check
    is_uni = is_arithmetic_progression(times, rtol=uniform_rtol, atol=uniform_atol)
    t0, t1 = float(times[0]), float(times[-1])

    # Helper: rational up/down from two (positive) rates
    def _ratio_ud(f_out: float, f_in: float) -> tuple[int, int]:
        r = Fraction(f_out/f_in).limit_denominator(frac_den_limit)
        return r.numerator, r.denominator

    # ---- Case A: approximately uniform input ----
    if is_uni:
        fs_in = float(1 / np.mean(np.diff(times)))  # cast from np.floating to float

        # Sanity check: if input rate < target, warn (we're effectively
        # upsampling)
        if fs_in + 0.5 < fs:
            warnings.warn(
                f"Input sampling rate ({fs_in:.3f} Hz) is below target ({fs} Hz). "
                "Proceeding with upsampling; no anti-alias filtering is needed."
            )

        # If already at the target rate (within ~0.5 Hz), just align to exact
        # integer fs
        if abs(fs_in - fs) < 0.5:
            t = _build_uniform_grid(t0, t1, fs=float(fs))
            y = PchipInterpolator(times, strain, extrapolate=False)(t)
            if full_output:
                return y, t, int(round(fs_in)), 0, 0
            return y

        # Proper rate conversion with polyphase FIR
        up, down = _ratio_ud(fs, fs_in)
        # Align a uniform grid at the original cadence
        t_u = _build_uniform_grid(t0, t1, fs=fs_in)
        # Interpolate once onto that grid
        y_u = PchipInterpolator(times, strain, extrapolate=False)(t_u)
        y = resample_poly(y_u, up, down)
        # Build output time grid (anchored at t0, step 1/fs)
        t = t0 + np.arange(len(y)) / fs
        if full_output:
            return y, t, int(round(fs_in)), up, down
        return y

    # ---- Case B: irregular input ----
    dt = np.diff(times)
    fs_inst = 1.0 / dt
    fs_inst_max = float(np.max(fs_inst))  # maximum instantaneous sampling rate

    # If even the maximum instantaneous rate is below the target, we can only
    # upsample -> direct PCHIP to fs
    if fs_inst_max < fs:
        warnings.warn(
            f"Maximum instantaneous sampling rate ({fs_inst_max:.3f} Hz) is "
            f"below target ({fs} Hz). Directly interpolating to the target "
            "grid; anti-alias filtering is unnecessary."
        )
        t = _build_uniform_grid(t0, t1, fs=float(fs))
        y = PchipInterpolator(times, strain, extrapolate=False)(t)
        if full_output:
            # Use a robust estimate of the original cadence for reporting
            fs_in_report = 1 / np.mean(dt)
            return y, t, int(round(fs_in_report)), 0, 0
        return y

    # Otherwise, choose a robust uniformisation rate f_u, e.g. 90th percentile
    # of instantaneous fs. Start from this robust value.
    f_u = float(np.percentile(fs_inst, percentile))
    # Enforce lower bound: keep some headroom for the FIR low-pass (avoid
    # too-low uniformisation)
    f_u = max(f_u, f_min_factor * float(fs))
    # Enforce practical cap relative to the target to avoid huge temporary arrays
    f_u = min(f_u, f_cap_factor * float(fs))
    # Never exceed the actual max instantaneous rate
    f_u = min(f_u, fs_inst_max)

    # Edge case: numerical tie with fs (avoid zero-length filters/degenerate
    # ratios). If the rounding pushed us down, bump slightly
    if f_u < fs:
        f_u = float(fs)

    t_u = _build_uniform_grid(t0, t1, fs=f_u)
    y_u = PchipInterpolator(times, strain, extrapolate=False)(t_u)

    up, down = _ratio_ud(float(fs), f_u)
    y = resample_poly(y_u, up, down)

    # Output time grid at fs
    t = t0 + np.arange(len(y)) / fs

    if full_output:
        return y, t, int(round(f_u)), up, down
    return y
[docs] def gen_time_array( t0: float, t1: float, *, fs: int, length: int|None = None ) -> NDArray[np.floating]: """Generate a time array for a given time range and sampling frequency. Generate a time array for a given time range and sampling frequency with an optional consistency check. Floating-point precision can cause `(t1 - t0) * fs` to yield an unexpected number of samples, leading to off-by-one errors in :func:`numpy.linspace`. If `length` is provided, the function verifies that it matches the expected length using `t1` as reference. A mismatch raises will raise `ValueError`, ensuring consistency between the expected and actual number of samples. If `length` is not provided, the function simply calls :func:`numpy.linspace` without performing the safety check. Parameters ---------- t0 : float Start time (inclusive). t1 : float Final time (exclusive). fs : int sampling frequency. length : int, optional If provided, must match `int((t1 - t0) * fs)` exactly. Acts as a safeguard against miscalculations due to floating-point precision. Returns ------- numpy.ndarray A 1D array of evenly spaced time values from `t0` (inclusive) to `t1` (exclusive). Raises ------ ValueError If `length` is provided and does not match the expected number of samples. """ expected_length = int((t1 - t0) * fs) if length is not None and length != expected_length: raise ValueError( f"Inconsistent input: Expected {expected_length} samples, got {length}." ) return np.linspace(t0, t1, expected_length, endpoint=False)
def time_array_like(array, fs=4096, t0=0.0):
    """Generate a time array matching the length of the input array.

    Build a uniformly spaced time grid starting at `t0` with spacing `1/fs`
    and exactly as many samples as the input 1D array.

    Parameters
    ----------
    array : array_like
        Input 1D array whose length determines the number of time samples.
    fs : float, optional
        Sampling frequency in Hz. Default is 4096.
    t0 : float, optional
        Start time in seconds. Default is 0.0.

    Returns
    -------
    numpy.ndarray
        A 1D array of evenly spaced time values starting at `t0` with
        spacing `1/fs` and the same length as the input array.
    """
    num = len(array)
    t_end = t0 + num / fs
    return np.linspace(t0, t_end, num, endpoint=False)
def pad_time_array(times: NDArray, pad: int|Sequence[int]) -> np.ndarray:
    """Extend a uniformly sampled time array by 'pad' number of samples.

    Parameters
    ----------
    times : numpy.ndarray
        1D array of time samples. Must be uniformly sampled.
    pad : int or Sequence of two ints
        Number of samples to add.
        * If int, the same number of samples is added on both sides.
        * If Sequence of length 2, interpreted as (pad_before, pad_after).

    Returns
    -------
    numpy.ndarray
        New time array with the specified padding, using the same time
        step as the input.

    Raises
    ------
    ValueError
        If `times` is not uniformly sampled.

    Notes
    -----
    - The function recomputes the entire time array using
      :func:`numpy.linspace`.
    - Due to floating-point round-off, intermediate values may differ
      slightly from the input.

    Examples
    --------
    >>> t = np.array([0.0, 0.1, 0.2])
    >>> pad_time_array(t, 1)
    array([-0.1, 0.0, 0.1, 0.2, 0.3])
    """
    if not is_arithmetic_progression(times):
        raise ValueError("Input time array must be uniformly sampled.")

    pad_before, pad_after = (pad, pad) if isinstance(pad, int) else pad

    total = len(times) + pad_before + pad_after
    step = times[1] - times[0]
    start = times[0] - pad_before*step
    stop = start + (total-1)*step
    return np.linspace(start, stop, total)
def find_time_origin(times: NDArray) -> int:
    """Return the index of the element closest to zero.

    Finds the position in the time array whose value is nearest to 0.

    Parameters
    ----------
    times : NDArray
        Time array.

    Returns
    -------
    int
        Index position of the time origin (0).
    """
    return int(np.abs(times).argmin())
def find_merger(h: NDArray) -> int:
    """Estimate the index of the merger in a gravitational-wave strain.

    Provides a rough estimate of the merger time index by locating the
    peak of the absolute amplitude in the time-domain strain. It assumes
    the merger approximately coincides with this peak, which is often
    valid for clean or high-SNR simulated CBC (compact binary coalescence)
    signals.

    Parameters
    ----------
    h : numpy.ndarray
        1D array containing the gravitational-wave strain. If the strain
        is split into polarisations, combine them in a complex array such
        as: ``h = s_plus - 1j * s_cross``

    Returns
    -------
    int
        Index of the estimated merger position in `h`.

    Warnings
    --------
    This method is **ad hoc** and may give misleading results on some
    data, particularly low-SNR or real detector data where noise
    fluctuations dominate the peak amplitude.

    Notes
    -----
    * Assumes that the peak amplitude in the time domain corresponds to
      the merger, which is a simplification.
    * This function is a **temporary placeholder** and will be replaced in
      future versions by a more formal estimator (for example, a Gaussian
      fit around the expected merger region).

    Examples
    --------
    >>> h = np.array([0.0, 0.2, -0.1, 0.5, 0.3])
    >>> find_merger(h)
    3
    """
    return int(np.abs(h).argmax())
def planck(N, nleft=0, nright=0):
    """Return a Planck taper window.

    Parameters
    ----------
    N : `int`
        Number of samples in the output window
    nleft : `int`, optional
        Number of samples to taper on the left, should be less than `N/2`
    nright : `int`, optional
        Number of samples to taper on the right, should be less than `N/2`

    Returns
    -------
    w : `ndarray`
        The window, with the maximum value normalized to 1 and at least
        one end tapered smoothly to 0.

    References
    ----------
    Based on :func:`gwpy.signal.window.planck`.

    .. [1] McKechan, D.J.A., Robinson, C., and Sathyaprakash, B.S. (April
       2010). "A tapering window for time-domain templates and simulated
       signals in the detection of gravitational waves from coalescing
       compact binaries". Classical and Quantum Gravity 27 (8).
       :doi:`10.1088/0264-9381/27/8/084020`
    """
    taper = np.ones(N, dtype=float)
    if nleft:
        idx = np.arange(1, nleft)
        z = nleft * (1/idx + 1/(idx - nleft))
        # expit is the logistic sigmoid: smooth rise from 0 to 1
        taper[1:nleft] *= sp.special.expit(-z)
        taper[0] = 0
    if nright:
        idx = np.arange(1, nright)
        z = -nright * (1/(idx - nright) + 1/idx)
        taper[-nright:-1] *= sp.special.expit(-z)
        taper[-1] = 0
    return taper
def truncate_transfer(transfer, ncorner=None):
    """Smoothly zero the edges of a frequency domain transfer function

    Parameters
    ----------
    transfer : `numpy.ndarray`
        transfer function to start from, must have at least ten samples
    ncorner : `int`, optional
        number of extra samples to zero off at low frequency, default:
        `None`

    Returns
    -------
    out : `numpy.ndarray`
        the smoothly truncated transfer function

    References
    ----------
    Based on :func:`gwpy.signal.filter_design.truncate_transfer`.
    """
    n_zero = ncorner if ncorner else 0
    result = transfer.copy()
    # Hard-zero the low-frequency corner, then taper both ends of the
    # remaining band with a short Planck window.
    result[:n_zero] = 0
    result[n_zero:] *= planck(transfer.size - n_zero, nleft=5, nright=5)
    return result
[docs] def truncate_impulse(impulse, ntaps, window: str|tuple = 'hann'): """Smoothly truncate a time domain impulse response Parameters ---------- impulse : `numpy.ndarray` the impulse response to start from ntaps : `int` number of taps in the final filter window : `str`, `numpy.ndarray`, optional window function to truncate with, default: ``'hann'`` see :func:`scipy.signal.get_window` for details on acceptable formats Returns ------- out : `numpy.ndarray` the smoothly truncated impulse response References ---------- Based on :func:`gwpy.signal.filter_design.truncate_impulse`. """ # Set up window: if isinstance(window, np.ndarray): if window.ndim != 1: raise ValueError("`window` must be a 1‑D array") if window.size != ntaps: raise ValueError("`window` length must equal `ntaps`") else: window = sp.signal.get_window(window, ntaps) out = impulse.copy() trunc_start = ntaps // 2 trunc_stop = out.size - trunc_start out[:trunc_start] *= window[trunc_start:] out[trunc_stop:] *= window[:trunc_start] out[trunc_start:trunc_stop] = 0 return out
def fir_from_transfer(transfer, ntaps, window: str|tuple = 'hann', ncorner=None):
    """Design a Type II FIR filter given an arbitrary transfer function

    Parameters
    ----------
    transfer : `numpy.ndarray`
        transfer function to start from, must have at least ten samples
    ntaps : `int`
        number of taps in the final filter, must be an even number
    window : `str`, `numpy.ndarray`, optional
        window function to truncate with, default: ``'hann'``
        see :func:`scipy.signal.get_window` for details on acceptable
        formats
    ncorner : `int`, optional
        number of extra samples to zero off at low frequency, default:
        `None`

    Returns
    -------
    out : `numpy.ndarray`
        A time domain FIR filter of length `ntaps`

    References
    ----------
    Based on :func:`gwpy.signal.filter_design.fir_from_transfer`.
    """
    # Truncate and highpass the transfer function, then go to time domain.
    smoothed = truncate_transfer(transfer, ncorner=ncorner)
    impulse_response = np.fft.irfft(smoothed)
    # Smoothly truncate the impulse response to the requested tap count.
    impulse_response = truncate_impulse(impulse_response, ntaps=ntaps, window=window)
    # Wrap around so the filter is causal and keep the first `ntaps` taps.
    return np.roll(impulse_response, shift=ntaps//2-1)[:ntaps]
[docs] def convolve(strain, fir, window: str|tuple = 'hann'): """Convolve a time series. Convolve a time series with a FIR filter using the overlap-save method. Parameters ---------- strain : numpy.ndarray The input time series strain. fir : numpy.ndarray The FIR filter coefficients. window : str | tuple, optional The window function to apply to the boundaries (default: 'hann'). Returns ------- numpy.ndarray The convolved time series, same length as input data. Notes ----- This function, based on the implementation of GWpy[1]_, differs in its implementation from the `oaconvolve` implementation of SciPy[2]_, which uses the overlap-add method: - **Algorithm**: Overlap-save discards edge artifacts after convolution, whereas overlap-add sums overlapping output segments. - **Boundary Windowing**: Explicitly applies a window (e.g., Hann) to the first/last ``N/2`` samples (where ``N`` is the filter length) to suppress spectral leakage at boundaries. SciPy implementations do not modify input boundaries. - **Edge Corruption**: Intentionally tolerates edge corruption in a segment of length ``N/2`` at both ends, while SciPy's output validity depends on the selected mode (`full`/`valid`/`same`). - **Use Case Focus**: Optimized for 1D time-series processing with boundary-aware windowing. SciPy's `oaconvolve` is more general-purpose, focused in computational efficiency for large N-dimensional arrays and arbitrary convolution modes. References ---------- .. [1] Based on :meth:`gwpy.timeseries.TimeSeries.convolve`. .. [2] SciPy's more general implementation :func:`scipy.signal.oaconvolve`. 
""" pad = int(np.ceil(len(fir) / 2)) nfft = min(8 * len(fir), len(strain)) # Apply window to the boundaries of the input data window_arr = sp.signal.get_window(window, len(fir)) padded_data = strain.copy() padded_data[:pad] *= window_arr[:pad] padded_data[-pad:] *= window_arr[-pad:] if nfft >= len(strain) // 2: # Perform a single convolution if FFT length is sufficiently large conv = sp.signal.fftconvolve(padded_data, fir, mode='same') else: nstep = nfft - 2 * pad conv = np.zeros_like(strain) # Process the first chunk first_chunk = padded_data[:nfft] conv[:nfft-pad] = sp.signal.fftconvolve(first_chunk, fir, mode='same')[:nfft-pad] # Process middle chunks for k in range(nfft-pad, len(strain)-nfft+pad, nstep): yk = sp.signal.fftconvolve(padded_data[k-pad:k+nstep+pad], fir, mode='same') conv[k:k+yk.size-2*pad] = yk[pad:-pad] # Process the last chunk conv[-nfft+pad:] = sp.signal.fftconvolve( padded_data[-nfft:], fir, mode='same' )[-nfft+pad:] return conv
def whiten(strain: NDArray,
           *,
           asd: NDArray,
           fs: int,
           flength: int,
           window: str|tuple = 'hann',
           highpass: float|None = None,
           normed: bool = True) -> np.ndarray:
    """Whiten a single strain signal using a FIR filter.

    Whiten a strain using the input amplitude spectral density 'asd' to
    design the FIR filter. This is a standalone implementation of GWpy's
    whiten method.[1]_

    Parameters
    ----------
    strain : NDArray
        Strain data points in time domain.

    asd : 2d-array
        Amplitude spectral density assumed for the 'set_strain'. Its
        components are:
        - asd[0] = frequency points
        - asd[1] = ASD points
        NOTE: It must have a linear and constant sampling frequency!

    fs : int
        The sampling frequency of the strain data.

    flength : int
        Length (in samples) of the time-domain FIR whitening filter.

    window : str, np.ndarray, optional
        window function to apply to timeseries prior to FFT,
        default: 'hann'
        see :func:`scipy.signal.get_window` for details on acceptable
        formats.

    highpass : float, optional
        Highpass corner frequency (in Hz) of the FIR whitening filter.

    normed : bool
        If True, normalizes the strains to their maximum absolute
        amplitude.

    Returns
    -------
    strain_w : NDArray
        Whitened strain (in time domain).

    Notes
    -----
    Due to filter settle-in, a segment of length `0.5*fduration` will be
    corrupted at the beginning and end of the output.

    References
    ----------
    .. [1] Based on :meth:`gwpy.timeseries.TimeSeries.whiten`.
    """
    if asd.ndim != 2 or asd.shape[0] != 2:
        raise ValueError("'asd' must have 2 dimensions")
    if not is_arithmetic_progression(asd[0]):
        raise ValueError("frequency points in 'asd[0]' must be ascending with constant increment")
    if not isinstance(flength, int):
        raise TypeError("'flength' must be an integer")

    # Constant detrending
    strain_detrended = strain - np.mean(strain)

    asd_freq, asd_vals = asd
    dt = 1 / fs
    freq_target = np.fft.rfftfreq(len(strain), dt)

    # Assumes ASD is defined or extrapolated over full frequency range.
    # No check needed if extrapolated with zeros and signal is bandpass-filtered.
    #--------------------------------------------------------------------------
    # if asd_freq[-1] < freq_target[-1]:
    #     raise ValueError("ASD frequency range is insufficient for the strain data.")
    #--------------------------------------------------------------------------

    # Linear interpolation of the ASD onto the strain's frequency bins,
    # filling with 0 outside the ASD's range. np.interp replaces the
    # legacy scipy.interpolate.interp1d with identical linear behaviour
    # (asd_freq is guaranteed ascending by the check above).
    asd_interpolated = np.interp(freq_target, asd_freq, asd_vals,
                                 left=0.0, right=0.0)

    # Invert the ASD to build the whitening transfer function. A
    # zero-initialised `out=` buffer is required: np.reciprocal with only
    # `where=` leaves the masked entries *uninitialized* (per NumPy ufunc
    # docs), so zero-ASD bins must be written explicitly.
    transfer_function = np.zeros_like(asd_interpolated)
    with np.errstate(divide='ignore'):
        np.reciprocal(asd_interpolated, where=asd_interpolated != 0,
                      out=transfer_function)

    # Handle highpass: convert corner frequency to a bin count
    duration = len(strain) / fs
    delta_f = 1 / duration
    if highpass:
        ncorner = int(highpass / delta_f)
    else:
        ncorner = None

    # Create the causal FIR filter
    fir_filter = fir_from_transfer(transfer_function, ntaps=flength,
                                   ncorner=ncorner)

    # Convolve with filter
    strain_whitened = convolve(strain_detrended, fir_filter, window=window)
    strain_whitened *= np.sqrt(2 * dt)  # scaling factor

    # Normalize if needed
    if normed:
        max_abs = np.max(np.abs(strain_whitened))
        if max_abs != 0:
            strain_whitened /= max_abs

    return strain_whitened
def is_arithmetic_progression(arr: NDArray, rtol=1e-5, atol=1e-8) -> bool:
    """Check if the array is an arithmetic progression.

    Check if the array is a progression with a constant increment,
    allowing for numerical tolerances.

    Parameters
    ----------
    arr : NDArray
        Input array to check.
    rtol : float, optional
        Relative tolerance for comparing floating-point values.
        Default is 1e-5.
    atol : float, optional
        Absolute tolerance for comparing floating-point values.
        Default is 1e-8.

    Returns
    -------
    bool
        True if the array is an arithmetic progression (within a specified
        tolerance at their increments), False otherwise.

    Raises
    ------
    ValueError
        If `arr` has fewer than 2 elements (no increment can be defined).
    """
    if len(arr) < 2:
        # A progression needs at least one increment; previously this was a
        # bare `raise ValueError` with no diagnostic message.
        raise ValueError("'arr' must contain at least 2 elements.")

    step = arr[1] - arr[0]
    # Cheap end-point check first: catches accumulated drift without
    # computing the full diff when the array clearly fails.
    expected_last = arr[0] + step*(len(arr) - 1)
    if not np.isclose(arr[-1], expected_last, rtol=rtol, atol=atol):
        return False

    return np.allclose(np.diff(arr), step, rtol=rtol, atol=atol)