gwadama.tat#
tat.py
Time analysis toolkit.
- gwadama.tat.convolve(strain, fir, window: str | tuple = 'hann')[source]#
Convolve a time series.
Convolve a time series with a FIR filter using the overlap-save method.
- Parameters:
- strainnumpy.ndarray
The input time series strain.
- firnumpy.ndarray
The FIR filter coefficients.
- windowstr | tuple, optional
The window function to apply to the boundaries (default: ‘hann’).
- Returns:
- numpy.ndarray
The convolved time series, same length as input data.
Notes
This function, based on the implementation of GWpy[R1eef69e3511f-1]_, differs in its implementation from the oaconvolve implementation of SciPy[R1eef69e3511f-2]_, which uses the overlap-add method:
Algorithm: Overlap-save discards edge artifacts after convolution, whereas overlap-add sums overlapping output segments.
Boundary Windowing: Explicitly applies a window (e.g., Hann) to the first/last
N/2samples (whereNis the filter length) to suppress spectral leakage at boundaries. SciPy implementations do not modify input boundaries.Edge Corruption: Intentionally tolerates edge corruption in a segment of length
N/2at both ends, while SciPy’s output validity depends on the selected mode (full/valid/same).Use Case Focus: Optimized for 1D time-series processing with boundary-aware windowing. SciPy’s oaconvolve is more general-purpose, focused in computational efficiency for large N-dimensional arrays and arbitrary convolution modes.
References
[1]Based on
gwpy.timeseries.TimeSeries.convolve().[2]SciPy’s more general implementation
scipy.signal.oaconvolve().
- gwadama.tat.find_merger(h: ndarray[Any, dtype[_ScalarType_co]]) int[source]#
Estimate the index of the merger in a gravitational-wave strain.
This function provides a rough estimate of the merger time index by finding the position of the maximum absolute amplitude in the time-domain strain data. It assumes that the merger approximately coincides with this peak, which is often valid for certain clean or high‑SNR simulated CBC (compact binary coalescence) signals.
- Parameters:
- hnumpy.ndarray
1D array containing the gravitational-wave strain. If the strain is split into polarisations, combine them in a complex array such as:
h = s_plus - 1j * s_cross
- Returns:
- int
Index of the estimated merger position in h.
Warning
This method is ad hoc and may give misleading results on some data, particularly low-SNR or real detector data where noise fluctuations dominate the peak amplitude.
Notes
Assumes that the peak amplitude in the time domain corresponds to the merger, which is a simplification.
This function is a temporary placeholder and will be replaced in future versions by a more formal estimator (for example, a Gaussian fit around the expected merger region).
Examples
>>> h = np.array([0.0, 0.2, -0.1, 0.5, 0.3]) >>> find_merger(h) 3
- gwadama.tat.find_time_origin(times: ndarray[Any, dtype[_ScalarType_co]]) int[source]#
Return the index of the element closest to zero.
Finds the position in the time array whose value is nearest to 0.
- Parameters:
- timesNDArray
Time array.
- Returns:
- int
Index position of the time origin (0).
- gwadama.tat.fir_from_transfer(transfer, ntaps, window: str | tuple = 'hann', ncorner=None)[source]#
Design a Type II FIR filter given an arbitrary transfer function
- Parameters:
- transfernumpy.ndarray
transfer function to start from, must have at least ten samples
- ntapsint
number of taps in the final filter, must be an even number
- windowstr, numpy.ndarray, optional
window function to truncate with, default:
'hann'seescipy.signal.get_window()for details on acceptable formats- ncornerint, optional
number of extra samples to zero off at low frequency, default: None
- Returns:
- outnumpy.ndarray
A time domain FIR filter of length ntaps
References
Based on
gwpy.signal.filter_design.fir_from_transfer().
- gwadama.tat.gen_time_array(t0: float, t1: float, *, fs: int, length: int | None = None) ndarray[Any, dtype[floating]][source]#
Generate a time array for a given time range and sampling frequency.
Generate a time array for a given time range and sampling frequency with an optional consistency check.
Floating-point precision can cause (t1 - t0) * fs to yield an unexpected number of samples, leading to off-by-one errors in
numpy.linspace(). If length is provided, the function verifies that it matches the expected length using t1 as reference. A mismatch raises will raise ValueError, ensuring consistency between the expected and actual number of samples.If length is not provided, the function simply calls
numpy.linspace()without performing the safety check.- Parameters:
- t0float
Start time (inclusive).
- t1float
Final time (exclusive).
- fsint
sampling frequency.
- lengthint, optional
If provided, must match int((t1 - t0) * fs) exactly. Acts as a safeguard against miscalculations due to floating-point precision.
- Returns:
- numpy.ndarray
A 1D array of evenly spaced time values from t0 (inclusive) to t1 (exclusive).
- Raises:
- ValueError
If length is provided and does not match the expected number of samples.
- gwadama.tat.is_arithmetic_progression(arr: ndarray[Any, dtype[_ScalarType_co]], rtol=1e-05, atol=1e-08) bool[source]#
Check if the array is an arithmetic progression.
Check if the array is a progression with a constant increment, allowing for numerical tolerances.
- Parameters:
- arrNDArray
Input array to check.
- rtolfloat, optional
Relative tolerance for comparing floating-point values. Default is 1e-5.
- atolfloat, optional
Absolute tolerance for comparing floating-point values. Default is 1e-8.
- Returns:
- bool
True if the array is an arithmetic progression (within a specified tolerance at their increments), False otherwise.
- gwadama.tat.pad_time_array(times: ndarray[Any, dtype[_ScalarType_co]], pad: int | Sequence[int]) ndarray[source]#
Extend a uniformly sampled time array by ‘pad’ number of samples.
- Parameters:
- times: numpy.ndarray
1D array of time samples. Must be uniformly sampled.
- pad: int or Sequence of two ints
Number of samples to add. * If int, the same number of samples is added on both sides. * If Sequence of length 2, interpreted as (pad_before, pad_after).
- numpy.ndarray
New time array with the specified padding, using the same time step as the input.
- Raises:
- ValueError
If times is not uniformly sampled.
Notes
The function recomputes the entire time array using
numpy.linspace().Due to floating-point round-off, intermediate values may differ slightly from the input.
Examples
>>> t = np.array([0.0, 0.1, 0.2]) >>> pad_time_array(t, 1) array([-0.1, 0.0, 0.1, 0.2, 0.3])
- gwadama.tat.planck(N, nleft=0, nright=0)[source]#
Return a Planck taper window.
- Parameters:
- Nint
Number of samples in the output window
- nleftint, optional
Number of samples to taper on the left, should be less than N/2
- nrightint, optional
Number of samples to taper on the right, should be less than N/2
- Returns:
- wndarray
The window, with the maximum value normalized to 1 and at least one end tapered smoothly to 0.
References
Based on
gwpy.signal.window.planck().[1]McKechan, D.J.A., Robinson, C., and Sathyaprakash, B.S. (April 2010). “A tapering window for time-domain templates and simulated signals in the detection of gravitational waves from coalescing compact binaries”. Classical and Quantum Gravity 27 (8). :doi:`10.1088/0264-9381/27/8/084020`
- gwadama.tat.resample(strain: ndarray[Any, dtype[_ScalarType_co]], times: ndarray[Any, dtype[_ScalarType_co]], fs: int, *, full_output: bool = True, uniform_rtol: float = 1e-06, uniform_atol: float = 1e-12, percentile: float = 90.0, f_min_factor: float = 1.25, f_cap_factor: float = 8.0, frac_den_limit: int = 8192)[source]#
Resample (possibly irregularly-sampled) ‘strain’ onto a uniform grid at target sampling rate ‘fs’ (Hz).
Logic:
Validate input.
- If times are approximately uniform:
If fs_in == fs (within 0.5 Hz): return a copy aligned to the original start.
Else: resample_poly directly from fs_in -> fs (anti-aliasing included).
- Else (times irregular):
If max instantaneous rate < fs: issue a warning and directly interpolate to fs via PCHIP (upsampling).
- Otherwise:
Pick robust f_u from the ‘percentile’ of instantaneous rates.
Enforce fs <= f_u <= min(max_inst_rate, f_cap_factor*fs), and f_u >= f_min_factor*fs.
PCHIP-uniformise at f_u, then resample_poly f_u -> fs.
- Parameters:
- strainnumpy.ndarray
One-dimensional input strain samples, shape (N,).
- timesnumpy.ndarray
One-dimensional, strictly increasing time stamps (seconds), shape (N,). No duplicates; must be sorted ascending.
- fsint
Target sampling frequency in Hz. Must be a positive integer.
- full_outputbool, default=True
If True, also return the output time grid, an input-rate estimate, and the (up, down) integers used in the final polyphase step.
- uniform_rtolfloat, default=1e-6
Relative tolerance for the uniform-spacing check.
- uniform_atolfloat, default=1e-12
Absolute tolerance for the uniform-spacing check.
- percentilefloat, default=90.0
Percentile (in [0, 100]) of instantaneous sampling rates 1/np.diff(times) used to choose the robust uniformisation rate f_u when input is irregular.
- f_min_factorfloat, default=1.25
Lower bound multiplier for f_u relative to fs (i.e. f_u ≥ f_min_factor*fs) to leave margin for the FIR anti-alias filter.
- f_cap_factorfloat, default=8.0
Upper bound multiplier for f_u relative to fs (i.e. f_u ≤ f_cap_factor*fs) to avoid excessively large intermediate grids. Must satisfy f_cap_factor ≥ f_min_factor. f_u is also capped by the maximum instantaneous rate.
- frac_den_limitint, default=2048
Maximum allowed denominator when approximating the rate ratio with fractions.Fraction(…).limit_denominator. Larger values give a closer ratio but longer polyphase FIR filters (slower); smaller values shorten the filter at the cost of a tiny ratio error.
- Returns:
- y1d-array
Resampled strain at ‘fs’.
- t1d-array (if full_output)
Time grid at ‘fs’, starting at times[0].
- fs_inint (if full_output)
Estimated original sampling frequency (rounded to nearest integer). For irregular inputs, this is the robust uniformisation rate if used; otherwise a robust estimate from the mean spacing.
- up, downint, int (if full_output)
Up/down integers used in the final polyphase step. (0, 0) if N/A.
- gwadama.tat.time_array_like(array, fs=4096, t0=0.0)[source]#
Generate a time array matching the length of the input array.
Computes a time array starting from t0 with evenly spaced values based on the sampling frequency fs, matching the length of the input 1D array.
- Parameters:
- arrayarray_like
Input 1D array whose length determines the number of time samples.
- fsfloat, optional
sampling frequency in Hz. Default is 4096.
- t0float, optional
Start time in seconds. Default is 0.0.
- Returns:
- numpy.ndarray
A 1D array of evenly spaced time values starting at t0 with spacing 1/fs and the same length as the input array.
- gwadama.tat.truncate_impulse(impulse, ntaps, window: str | tuple = 'hann')[source]#
Smoothly truncate a time domain impulse response
- Parameters:
- impulsenumpy.ndarray
the impulse response to start from
- ntapsint
number of taps in the final filter
- windowstr, numpy.ndarray, optional
window function to truncate with, default:
'hann'seescipy.signal.get_window()for details on acceptable formats
- Returns:
- outnumpy.ndarray
the smoothly truncated impulse response
References
Based on
gwpy.signal.filter_design.truncate_impulse().
- gwadama.tat.truncate_transfer(transfer, ncorner=None)[source]#
Smoothly zero the edges of a frequency domain transfer function
- Parameters:
- transfernumpy.ndarray
transfer function to start from, must have at least ten samples
- ncornerint, optional
number of extra samples to zero off at low frequency, default: None
- Returns:
- outnumpy.ndarray
the smoothly truncated transfer function
References
Based on
gwpy.signal.filter_design.truncate_transfer().
- gwadama.tat.whiten(strain: ndarray[Any, dtype[_ScalarType_co]], *, asd: ndarray[Any, dtype[_ScalarType_co]], fs: int, flength: int, window: str | tuple = 'hann', highpass: float | None = None, normed: bool = True) ndarray[source]#
Whiten a single strain signal using a FIR filter.
Whiten a strain using the input amplitude spectral density ‘asd’ to design the FIR filter.
This is a standalone implementation of GWpy’s whiten method.[Rbff3c05209a2-1]_
- Parameters:
- strainNDArray
Strain data points in time domain.
- asd2d-array
Amplitude spectral density assumed for the ‘set_strain’. Its components are: - asd[0] = frequency points - asd[1] = ASD points NOTE: It must have a linear and constant sampling frequency!
- fsint
The sampling frequency of the strain data.
- flengthint
Length (in samples) of the time-domain FIR whitening filter.
- windowstr, np.ndarray, optional
window function to apply to timeseries prior to FFT, default: ‘hann’ see
scipy.signal.get_window()for details on acceptable formats.- highpassfloat, optional
Highpass corner frequency (in Hz) of the FIR whitening filter.
- normedbool
If True, normalizes the strains to their maximum absolute amplitude.
- Returns:
- strain_wNDArray
Whitened strain (in time domain).
Notes
Due to filter settle-in, a segment of length 0.5*fduration will be corrupted at the beginning and end of the output.
References
[1]Based on
gwpy.timeseries.TimeSeries.whiten().