Source code for gwadama.fat

"""fat.py

Frequency analysis toolkit.

"""
import warnings

import numpy as np
import scipy as sp


[docs] def highpass_filter(signal: np.ndarray, *, f_cut: int | float, f_order: int | float, fs: int) -> np.ndarray: """Apply a forward-backward digital highpass filter. Apply a forward-backward digital highpass filter to 'signal' at frequency 'f_cut' with an order of 'f_order'. Reference --------- Design: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.butter.html Filter: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.sosfiltfilt.html """ sos = sp.signal.butter(f_order, f_cut, btype='highpass', fs=fs, output='sos') filtered = sp.signal.sosfiltfilt(sos, signal) return filtered
[docs] def bandpass_filter(signal: np.ndarray, *, f_low: int | float, f_high: int | float, f_order: int | float, fs: int) -> np.ndarray: """Apply a forward-backward digital bandpass filter. Apply a forward-backward digital bandpass filter to 'signal' between frequencies 'f_low' and 'f_high' with an order of 'f_order'. Reference --------- Design: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.butter.html Filter: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.sosfiltfilt.html """ sos = sp.signal.butter(f_order, [f_low, f_high], btype='bandpass', fs=fs, output='sos') filtered = sp.signal.sosfiltfilt(sos, signal) return filtered
[docs] def instant_frequency(signal, *, fs, phase_corrections=None): """Computes the instantaneous frequency of a time-domain signal. Computes the instantaneous frequency of a time-domain signal using the central difference method, with optional phase corrections. If negative frequencies are detected, a warning is raised. Parameters ---------- signal : ndarray The input time-domain signal. fs : float The sampling frequency of the signal (in Hz). phase_corrections : list of tuples, optional A list of phase corrections. Each tuple contains: (jump_start, jump_end, correction_factor). If None, no phase correction is applied. Returns ------- inst_freq : ndarray The instantaneous frequency of the signal (in Hz). If negative frequencies are detected, a RuntimeWarning is issued. """ # Step 1: Compute the analytic signal using the Hilbert transform analytic_signal = sp.signal.hilbert(signal) # Extract the instantaneous phase inst_phase = np.unwrap(np.angle(analytic_signal)) # Step 2: Apply multiple phase corrections if provided if phase_corrections is not None: # Get the time array corresponding to the signal length time = np.arange(len(signal)) / fs for (jump_start, jump_end, correction_factor) in phase_corrections: inst_phase = correct_phase(inst_phase, np.arange(len(signal)) / fs, jump_start, jump_end, correction_factor) # Step 3: Compute the instantaneous frequency by differentiating the phase dt = 1.0 / fs inst_phase_diff = (inst_phase[2:] - inst_phase[:-2]) / (2 * dt) # Convert phase difference to frequency inst_freq = inst_phase_diff / (2.0 * np.pi) # Pad the result to match the input length inst_freq = np.pad(inst_freq, (1, 1), mode='edge') if np.any(inst_freq < 0): warnings.warn("Non-physical negative frequencies detected in the array.", RuntimeWarning) return inst_freq
[docs] def correct_phase(phase, time, jump_start, jump_end, correction_factor=1.0): """Manually correct a phase jump. Fine-tunes the manual phase correction by adjusting the phase after a phase jump. The phase after the jump is scaled by the correction factor. Parameters ---------- phase : ndarray The unwrapped phase of the signal. time : ndarray The time array corresponding to the signal. jump_start : float The time where the phase jump starts. jump_end : float The time where the phase jump ends. correction_factor : float, optional The factor to scale the phase correction for fine-tuning. Default is 1.0. Returns ------- corrected_phase : ndarray The phase with fine-tuned manual correction applied after the specified jump. """ corrected_phase = np.copy(phase) # Identify the indices corresponding to the jump start and end start_idx = np.argmin(np.abs(time - jump_start)) end_idx = np.argmin(np.abs(time - jump_end)) # Calculate the phase difference between start and end of the jump phase_diff = corrected_phase[end_idx] - corrected_phase[start_idx] # Adjust the phase after the jump by scaling the correction factor corrected_phase[end_idx:] -= phase_diff * correction_factor return corrected_phase
[docs] def snr(strain, *, at, psd=None, window=('tukey', 0.5), fbounds=None): """Compute the signal-to-noise ratio (SNR) of a signal segment. Compute the SNR of a signal with respect to a background spectrum (`psd` provided), or assuming the signal is already whitened (`psd=None`). Parameters ---------- strain : array_like Time-domain signal segment. at : float Sampling interval (seconds per sample). psd : array_like (freq_array, psd_array), optional One-sided power spectral density of the background noise. If provided, the SNR is computed using this PSD. If None, the function assumes the signal is already whitened. window : str, tuple, or array_like, optional Window to apply before the FFT. Passed to `scipy.signal.get_window` if a string/tuple. If an array, it must match the length of `strain`. fbounds : array_like (f_min, f_max), optional Frequency range (Hz) over which to compute the SNR. If None: * with `psd`, uses the min and max of the PSD frequencies, * otherwise uses (0, Nyquist). Returns ------- snr : float The computed signal-to-noise ratio. Notes ----- - The function uses a one-sided rFFT and sums over positive frequencies. - In whitened space, PSD is assumed to be identity and the formula reduces to: ``sqrt(2 * Δt * Δf * Σ |h_w(f)|²)``. - In non-whitened space, the formula used is: ``sqrt(4 * Δt² * Δf * Σ |h(f)|² / S_n(f))``. """ strain = np.asarray(strain) ns = len(strain) # Build window if isinstance(window, (tuple, str)): window = sp.signal.windows.get_window(window, ns) else: window = np.asarray(window) if window.shape != (ns,): raise ValueError("Window length must match strain length.") # rFFT hh = np.fft.rfft(strain * window) ff = np.fft.rfftfreq(ns, d=at) af = ff[1] # frequency spacing # Frequency range if psd is None: f_min, f_max = fbounds if fbounds is not None else (0, ff[-1]) else: f_min, f_max = psd[0][[0,-1]] i_min = np.searchsorted(ff, f_min, side='left') i_max = np.searchsorted(ff, f_max, side='right') hh = hh[i_min:i_max] ff = ff[i_min:i_max] # SNR if psd is None: # Whitened case sum_ = np.sum(np.abs(hh)**2) snr = np.sqrt(2 * at * af * sum_) else: # Non-whitened case psd_interp = sp.interpolate.interp1d(*psd, bounds_error=True)(ff) sum_ = np.sum(np.abs(hh)**2 / psd_interp) snr = np.sqrt(4 * at**2 * af * sum_) return snr
[docs] def find_power_excess(strain, fs, nperseg=16, noverlap=15, return_time=False): """ Estimate the index of maximum broadband power excess in a 1D signal. This function provides a fast and simple estimate of the time region where the total broadband power in the input signal is maximised. A spectrogram is computed via Welch's method, and the time slice with the highest total power (summed over all frequencies) is identified. Formally, at each time step `t`, the total power is estimated as: P(t) = ∑_f S(f, t) where S(f, t) is the estimated power spectral density at frequency `f` and time `t`. The accuracy of this estimate is limited by the spectrogram parameters chosen by the user (`nperseg`, `noverlap`), which determine the temporal resolution and frequency leakage. Parameters ---------- strain : np.ndarray Input signal (1D array). fs : int sampling frequency in Hz. nperseg : int, optional Length of each FFT segment (in samples). noverlap : int, optional Number of overlapping samples between segments. return_time : bool, optional If True, also return the estimated time (in seconds) corresponding to the peak index. Returns ------- peak_index : int Sample index (in the input array) corresponding to the centre of the most power-concentrated region. peak_time : float, optional Time in seconds of the estimated peak, assuming the first sample occurs at t=0. Only returned if `return_time=True`. Notes ----- - In real detector data, whitening or equalisation is recommended to avoid biasing the result towards dominant background frequencies. - The returned index refers to the *centre* of the peak time bin in the spectrogram, mapped back to the corresponding sample index of the original signal. - Spectrogram time resolution is limited by `nperseg` and `noverlap`; high accuracy is not guaranteed. - This method assumes only one dominant transient is present in the input. It is not suitable for detecting or resolving multiple events. """ # Compute spectrogram: PSD estimate over time f, t, Sxx = sp.signal.spectrogram( strain, fs=fs, nperseg=nperseg, noverlap=noverlap, scaling='spectrum', mode='psd', window='hann' ) # Total power per time bin (sum over all frequencies) power_per_time = Sxx.sum(axis=0) # Identify time of maximum total power peak_idx = np.argmax(power_per_time) peak_time = t[peak_idx] # Convert peak time (seconds) to sample index peak_index = int(round(peak_time * fs)) peak_index = np.clip(peak_index, 0, len(strain) - 1) if return_time: return peak_index, peak_time else: return peak_index