gwadama.tat

tat.py

Time analysis toolkit.

gwadama.tat.convolve(strain, fir, window: str | tuple = 'hann')

Convolve a time series.

Convolve a time series with a FIR filter using the overlap-save method.

Parameters:
strainnumpy.ndarray

The input time series strain.

firnumpy.ndarray

The FIR filter coefficients.

windowstr | tuple, optional

The window function to apply to the boundaries (default: ‘hann’).

Returns:
numpy.ndarray

The convolved time series, same length as input data.

Notes

This function is based on the GWpy implementation [1] and differs from SciPy's oaconvolve [2], which uses the overlap-add method, in the following ways:

  • Algorithm: Overlap-save discards edge artifacts after convolution, whereas overlap-add sums overlapping output segments.

  • Boundary Windowing: Explicitly applies a window (e.g., Hann) to the first/last N/2 samples (where N is the filter length) to suppress spectral leakage at boundaries. SciPy implementations do not modify input boundaries.

  • Edge Corruption: Intentionally tolerates edge corruption in a segment of length N/2 at both ends, while SciPy’s output validity depends on the selected mode (full/valid/same).

  • Use Case Focus: Optimized for 1D time-series processing with boundary-aware windowing. SciPy’s oaconvolve is more general-purpose, focused on computational efficiency for large N-dimensional arrays and arbitrary convolution modes.

References

[2]

SciPy’s more general implementation scipy.signal.oaconvolve().
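The boundary windowing described in the Notes can be illustrated with a minimal NumPy sketch. It is not the gwadama or GWpy source: `convolve_windowed` is a hypothetical helper, the symmetric `np.hanning` window is an assumption, and a direct `np.convolve` call stands in for the overlap-save FFT step.

```python
import numpy as np

def convolve_windowed(strain, fir):
    """Hypothetical helper: taper the input edges, then convolve ('same' length)."""
    pad = int(np.ceil(fir.size / 2))          # N/2 samples at each boundary
    win = np.hanning(fir.size)                # symmetric Hann (an assumption)
    data = np.asarray(strain, dtype=float).copy()
    data[:pad] *= win[:pad]                   # rising half on the first N/2 samples
    data[-pad:] *= win[-pad:]                 # falling half on the last N/2 samples
    # Direct convolution replaces the overlap-save step in this sketch.
    return np.convolve(data, fir, mode="same")

rng = np.random.default_rng(0)
x = rng.standard_normal(256)
fir = np.hanning(16) / np.hanning(16).sum()   # toy 16-tap low-pass filter
y = convolve_windowed(x, fir)
y_plain = np.convolve(x, fir, mode="same")    # same filter, untouched boundaries
```

Away from the edges `y` and `y_plain` agree; they differ only near the boundaries, where the taper was applied.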

gwadama.tat.find_merger(h: ndarray[Any, dtype[_ScalarType_co]]) → int

Estimate the index of the merger in a gravitational-wave strain.

This function provides a rough estimate of the merger time index by finding the position of the maximum absolute amplitude in the time-domain strain data. It assumes that the merger approximately coincides with this peak, which is often valid for certain clean or high‑SNR simulated CBC (compact binary coalescence) signals.

Parameters:
hnumpy.ndarray

1D array containing the gravitational-wave strain. If the strain is split into polarisations, combine them in a complex array such as: h = s_plus - 1j * s_cross

Returns:
int

Index of the estimated merger position in h.

Warning

This method is ad hoc and may give misleading results on some data, particularly low-SNR or real detector data where noise fluctuations dominate the peak amplitude.

Notes

  • Assumes that the peak amplitude in the time domain corresponds to the merger, which is a simplification.

  • This function is a temporary placeholder and will be replaced in future versions by a more formal estimator (for example, a Gaussian fit around the expected merger region).

Examples

>>> h = np.array([0.0, 0.2, -0.1, 0.5, 0.3])
>>> find_merger(h)
3
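For the two-polarisation case mentioned under Parameters, the following sketch may help. `peak_index` is a hypothetical stand-in for the documented behaviour (position of the maximum absolute amplitude), and the toy burst is invented for illustration.

```python
import numpy as np

def peak_index(h):
    """Hypothetical stand-in: index of the maximum absolute amplitude."""
    return int(np.argmax(np.abs(h)))

t = np.linspace(-1.0, 1.0, 201)
envelope = np.exp(-(t / 0.2) ** 2)            # toy burst peaking at t = 0
s_plus = envelope * np.cos(2 * np.pi * 10 * t)
s_cross = envelope * np.sin(2 * np.pi * 10 * t)
h = s_plus - 1j * s_cross                     # complex combination of polarisations
i_merger = peak_index(h)                      # index of the envelope maximum
```

Because `abs(h)` equals the envelope here, the estimate does not depend on the phase of either polarisation.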

gwadama.tat.find_time_origin(times: ndarray[Any, dtype[_ScalarType_co]]) → int

Return the index of the element closest to zero.

Finds the position in the time array whose value is nearest to 0.

Parameters:
timesNDArray

Time array.

Returns:
int

Index position of the time origin (0).
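A minimal sketch of the same idea, assuming the nearest-to-zero search is a plain `argmin` over absolute values (`nearest_zero_index` is a hypothetical name, not part of gwadama):

```python
import numpy as np

def nearest_zero_index(times):
    """Hypothetical helper: index of the element closest to zero."""
    return int(np.argmin(np.abs(times)))

times = np.array([-0.3, -0.1, 0.05, 0.2])
i0 = nearest_zero_index(times)                # 0.05 is the value nearest to 0
```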

gwadama.tat.fir_from_transfer(transfer, ntaps, window: str | tuple = 'hann', ncorner=None)

Design a Type II FIR filter given an arbitrary transfer function.

Parameters:
transfernumpy.ndarray

transfer function to start from, must have at least ten samples

ntapsint

number of taps in the final filter, must be an even number

windowstr, numpy.ndarray, optional

window function to truncate with, default: 'hann'; see scipy.signal.get_window() for details on acceptable formats

ncornerint, optional

number of extra samples to zero off at low frequency, default: None

Returns:
outnumpy.ndarray

A time domain FIR filter of length ntaps

References

Based on gwpy.signal.filter_design.fir_from_transfer().

gwadama.tat.gen_time_array(t0: float, t1: float, *, fs: int, length: int | None = None) → ndarray[Any, dtype[floating]]

Generate a time array for a given time range and sampling frequency.

Generate a time array for a given time range and sampling frequency with an optional consistency check.

Floating-point precision can cause (t1 - t0) * fs to yield an unexpected number of samples, leading to off-by-one errors in numpy.linspace(). If length is provided, the function verifies that it matches the expected length using t1 as reference. A mismatch raises ValueError, ensuring consistency between the expected and actual number of samples.

If length is not provided, the function simply calls numpy.linspace() without performing the safety check.

Parameters:
t0float

Start time (inclusive).

t1float

Final time (exclusive).

fsint

Sampling frequency.

lengthint, optional

If provided, must match int((t1 - t0) * fs) exactly. Acts as a safeguard against miscalculations due to floating-point precision.

Returns:
numpy.ndarray

A 1D array of evenly spaced time values from t0 (inclusive) to t1 (exclusive).

Raises:
ValueError

If length is provided and does not match the expected number of samples.
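The floating-point pitfall and the length check can be sketched as follows. `time_grid` is a hypothetical reimplementation based only on the description above, not the gwadama source.

```python
import numpy as np

def time_grid(t0, t1, fs, length=None):
    """Hypothetical sketch of the documented consistency check."""
    n = int((t1 - t0) * fs)                   # expected sample count per the docs
    if length is not None and length != n:
        raise ValueError(f"expected {n} samples, got length={length}")
    return np.linspace(t0, t1, n, endpoint=False)

t = time_grid(0.0, 1.0, fs=4)                 # four samples, t1 excluded

# In double precision 0.29 * 100 evaluates to 28.999999999999996,
# so int() truncates to 28 although 29 samples were intended.
n_float = int((0.29 - 0.0) * 100)
try:
    time_grid(0.0, 0.29, fs=100, length=29)
    mismatch_caught = False
except ValueError:
    mismatch_caught = True                    # the safeguard exposes the discrepancy
```

Passing `length` turns a silent off-by-one grid into an explicit error at the call site.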

gwadama.tat.is_arithmetic_progression(arr: ndarray[Any, dtype[_ScalarType_co]], rtol=1e-05, atol=1e-08) → bool

Check if the array is an arithmetic progression.

Check if the array is a progression with a constant increment, allowing for numerical tolerances.

Parameters:
arrNDArray

Input array to check.

rtolfloat, optional

Relative tolerance for comparing floating-point values. Default is 1e-5.

atolfloat, optional

Absolute tolerance for comparing floating-point values. Default is 1e-8.

Returns:
bool

True if the array is an arithmetic progression (its increments are equal within the specified tolerances), False otherwise.
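One way such a check could be written is to compare every increment against the first one with `numpy.allclose`. This is an assumption about the approach, and `has_constant_step` is a hypothetical name.

```python
import numpy as np

def has_constant_step(arr, rtol=1e-5, atol=1e-8):
    """Hypothetical sketch: are all increments equal within tolerance?"""
    arr = np.asarray(arr)
    if arr.size < 2:
        return True                           # assumption: trivially a progression
    steps = np.diff(arr)
    return bool(np.allclose(steps, steps[0], rtol=rtol, atol=atol))

uniform = has_constant_step(np.arange(5) * 0.1)
irregular = has_constant_step(np.array([0.0, 0.1, 0.25, 0.3]))
```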

gwadama.tat.pad_time_array(times: ndarray[Any, dtype[_ScalarType_co]], pad: int | Sequence[int]) → ndarray

Extend a uniformly sampled time array by ‘pad’ number of samples.

Parameters:
times: numpy.ndarray

1D array of time samples. Must be uniformly sampled.

pad: int or Sequence of two ints

Number of samples to add.

  • If int, the same number of samples is added on both sides.

  • If Sequence of length 2, interpreted as (pad_before, pad_after).

Returns:
numpy.ndarray

New time array with the specified padding, using the same time step as the input.

Raises:
ValueError

If times is not uniformly sampled.

Notes

  • The function recomputes the entire time array using numpy.linspace().

  • Due to floating-point round-off, intermediate values may differ slightly from the input.

Examples

>>> t = np.array([0.0, 0.1, 0.2])
>>> pad_time_array(t, 1)
array([-0.1, 0.0, 0.1, 0.2, 0.3])
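The asymmetric form of `pad` can be illustrated with a short sketch that rebuilds the grid with `numpy.linspace`, as the Notes describe. `pad_times` is a hypothetical helper, and estimating the step from the end points is an assumption.

```python
import numpy as np

def pad_times(times, pad):
    """Hypothetical sketch: extend a uniform time grid on both sides."""
    before, after = (pad, pad) if isinstance(pad, int) else pad
    dt = (times[-1] - times[0]) / (len(times) - 1)   # uniform step (assumed)
    start = times[0] - before * dt
    stop = times[-1] + after * dt
    # The whole grid is recomputed, so values may differ by round-off.
    return np.linspace(start, stop, len(times) + before + after)

t = np.array([0.0, 0.1, 0.2])
t_sym = pad_times(t, 1)                       # one sample on each side
t_asym = pad_times(t, (2, 0))                 # two samples before, none after
```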

gwadama.tat.planck(N, nleft=0, nright=0)

Return a Planck taper window.

Parameters:
Nint

Number of samples in the output window

nleftint, optional

Number of samples to taper on the left, should be less than N/2

nrightint, optional

Number of samples to taper on the right, should be less than N/2

Returns:
wndarray

The window, with the maximum value normalized to 1 and at least one end tapered smoothly to 0.

References

Based on gwpy.signal.window.planck().

[1]

McKechan, D.J.A., Robinson, C., and Sathyaprakash, B.S. (April 2010). “A tapering window for time-domain templates and simulated signals in the detection of gravitational waves from coalescing compact binaries”. Classical and Quantum Gravity 27 (8). doi:10.1088/0264-9381/27/8/084020
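A minimal sketch of a Planck-style taper is given below. It assumes the ramp takes the logistic form 1 / (1 + exp(z)) with z = n (1/k + 1/(k - n)) for k = 1, ..., n - 1; `planck_sketch` is a hypothetical name and may differ in detail from the gwadama and GWpy implementations.

```python
import numpy as np

def planck_sketch(N, nleft=0, nright=0):
    """Hypothetical Planck-style taper: flat top with smooth ramps to zero."""
    def ramp(n):
        k = np.arange(1, n)
        z = n * (1.0 / k + 1.0 / (k - n))     # large positive near k=1, negative near k=n
        with np.errstate(over="ignore"):      # exp overflow just yields 0 here
            return 1.0 / (1.0 + np.exp(z))
    w = np.ones(N)
    if nleft:
        w[0] = 0.0
        w[1:nleft] = ramp(nleft)              # rising ramp on the left
    if nright:
        w[N - 1] = 0.0
        w[N - nright:N - 1] = ramp(nright)[::-1]   # mirrored ramp on the right
    return w

w = planck_sketch(32, nleft=8, nright=8)
```

With equal `nleft` and `nright` the window is symmetric, and each ramp passes through 0.5 at its midpoint.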

gwadama.tat.resample(strain: ndarray[Any, dtype[_ScalarType_co]], times: ndarray[Any, dtype[_ScalarType_co]], fs: int, *, full_output: bool = True, uniform_rtol: float = 1e-06, uniform_atol: float = 1e-12, percentile: float = 90.0, f_min_factor: float = 1.25, f_cap_factor: float = 8.0, frac_den_limit: int = 8192)

Resample (possibly irregularly-sampled) ‘strain’ onto a uniform grid at target sampling rate ‘fs’ (Hz).

Logic:

  1. Validate input.

  2. If times are approximately uniform:
    • If fs_in == fs (within 0.5 Hz): return a copy aligned to the original start.

    • Else: resample_poly directly from fs_in -> fs (anti-aliasing included).

    Else (times irregular):
    1. If max instantaneous rate < fs: issue a warning and directly interpolate to fs via PCHIP (upsampling).

    2. Otherwise:
      • Pick robust f_u from the ‘percentile’ of instantaneous rates.

      • Enforce fs <= f_u <= min(max_inst_rate, f_cap_factor*fs), and f_u >= f_min_factor*fs.

      • PCHIP-uniformise at f_u, then resample_poly f_u -> fs.
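The choice of the intermediate rate f_u in the irregular branch can be sketched as below. `pick_uniform_rate` is a hypothetical helper, and the order in which the bounds are applied is an assumption, since the list above does not state which bound wins when they conflict.

```python
import numpy as np

def pick_uniform_rate(times, fs, percentile=90.0,
                      f_min_factor=1.25, f_cap_factor=8.0):
    """Hypothetical sketch of the f_u selection for irregular sampling."""
    rates = 1.0 / np.diff(times)              # instantaneous sampling rates
    max_rate = rates.max()
    if max_rate < fs:
        return None                           # upsampling case: interpolate straight to fs
    f_u = np.percentile(rates, percentile)    # robust rate estimate
    f_u = max(f_u, f_min_factor * fs)         # margin for the anti-alias filter
    f_u = min(f_u, max_rate, f_cap_factor * fs)   # cap the intermediate grid
    return float(max(f_u, fs))                # never below the target rate

steps = np.tile([0.01, 0.02], 50)             # alternating 100 Hz and 50 Hz spacing
times = np.concatenate([[0.0], np.cumsum(steps)])
f_u_capped = pick_uniform_rate(times, fs=10)  # limited by f_cap_factor * fs
f_u_plain = pick_uniform_rate(times, fs=50)   # percentile estimate is used
f_u_none = pick_uniform_rate(times, fs=200)   # every rate is below fs
```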

Parameters:
strainnumpy.ndarray

One-dimensional input strain samples, shape (N,).

timesnumpy.ndarray

One-dimensional, strictly increasing time stamps (seconds), shape (N,). No duplicates; must be sorted ascending.

fsint

Target sampling frequency in Hz. Must be a positive integer.

full_outputbool, default=True

If True, also return the output time grid, an input-rate estimate, and the (up, down) integers used in the final polyphase step.

uniform_rtolfloat, default=1e-6

Relative tolerance for the uniform-spacing check.

uniform_atolfloat, default=1e-12

Absolute tolerance for the uniform-spacing check.

percentilefloat, default=90.0

Percentile (in [0, 100]) of instantaneous sampling rates 1/np.diff(times) used to choose the robust uniformisation rate f_u when input is irregular.

f_min_factorfloat, default=1.25

Lower bound multiplier for f_u relative to fs (i.e. f_u ≥ f_min_factor*fs) to leave margin for the FIR anti-alias filter.

f_cap_factorfloat, default=8.0

Upper bound multiplier for f_u relative to fs (i.e. f_u ≤ f_cap_factor*fs) to avoid excessively large intermediate grids. Must satisfy f_cap_factor ≥ f_min_factor. f_u is also capped by the maximum instantaneous rate.

frac_den_limitint, default=8192

Maximum allowed denominator when approximating the rate ratio with fractions.Fraction(…).limit_denominator. Larger values give a closer ratio but longer polyphase FIR filters (slower); smaller values shorten the filter at the cost of a tiny ratio error.

Returns:
y1d-array

Resampled strain at ‘fs’.

t1d-array (if full_output)

Time grid at ‘fs’, starting at times[0].

fs_inint (if full_output)

Estimated original sampling frequency (rounded to nearest integer). For irregular inputs, this is the robust uniformisation rate if used; otherwise a robust estimate from the mean spacing.

up, downint, int (if full_output)

Up/down integers used in the final polyphase step. (0, 0) if N/A.
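How frac_den_limit turns a rate ratio into the (up, down) pair can be sketched with the standard library alone. `updown` is a hypothetical helper; the resulting integers are what one would pass to scipy.signal.resample_poly(x, up, down).

```python
from fractions import Fraction

def updown(fs_in, fs_out, frac_den_limit=8192):
    """Hypothetical sketch: rational approximation of fs_out / fs_in."""
    ratio = Fraction(fs_out / fs_in).limit_denominator(frac_den_limit)
    return ratio.numerator, ratio.denominator

up1, down1 = updown(16384, 4096)              # exact ratio 1/4
up2, down2 = updown(44100, 48000)             # reduces to 160/147
```

A smaller limit forces a coarser fraction, which shortens the polyphase filter at the cost of a small error in the achieved rate.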

gwadama.tat.time_array_like(array, fs=4096, t0=0.0)

Generate a time array matching the length of the input array.

Computes a time array starting from t0 with evenly spaced values based on the sampling frequency fs, matching the length of the input 1D array.

Parameters:
arrayarray_like

Input 1D array whose length determines the number of time samples.

fsfloat, optional

Sampling frequency in Hz. Default is 4096.

t0float, optional

Start time in seconds. Default is 0.0.

Returns:
numpy.ndarray

A 1D array of evenly spaced time values starting at t0 with spacing 1/fs and the same length as the input array.
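The documented output (start at t0, spacing 1/fs, same length as the input) corresponds to the following minimal sketch; `times_like` is a hypothetical name and not the gwadama source.

```python
import numpy as np

def times_like(array, fs=4096, t0=0.0):
    """Hypothetical sketch: time stamps for each sample of `array`."""
    return t0 + np.arange(len(array)) / fs

t = times_like(np.zeros(8), fs=4, t0=1.0)     # 8 samples from 1.0 in steps of 0.25
```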

gwadama.tat.truncate_impulse(impulse, ntaps, window: str | tuple = 'hann')

Smoothly truncate a time domain impulse response.

Parameters:
impulsenumpy.ndarray

the impulse response to start from

ntapsint

number of taps in the final filter

windowstr, numpy.ndarray, optional

window function to truncate with, default: 'hann'; see scipy.signal.get_window() for details on acceptable formats

Returns:
outnumpy.ndarray

the smoothly truncated impulse response

References

Based on gwpy.signal.filter_design.truncate_impulse().
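A rough sketch of smooth truncation follows. It assumes the impulse response is stored in wrapped order (peak at index 0, acausal part at the end, as returned by an inverse real FFT) and uses a symmetric `np.hanning` window; `truncate_impulse_sketch` is a hypothetical name, and ntaps is assumed to be at least 2.

```python
import numpy as np

def truncate_impulse_sketch(impulse, ntaps):
    """Hypothetical sketch: keep ntaps/2 samples at each end, zero the middle."""
    out = np.array(impulse, dtype=float)      # work on a copy
    half = ntaps // 2
    win = np.hanning(ntaps)                   # symmetric Hann (an assumption)
    out[:half] *= win[ntaps - half:]          # falling half tapers the leading samples
    out[out.size - half:] *= win[:half]       # rising half tapers the trailing samples
    out[half:out.size - half] = 0.0           # everything in between is discarded
    return out

impulse = np.ones(64)                         # toy input so the taper is visible
out = truncate_impulse_sketch(impulse, 16)
```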

gwadama.tat.truncate_transfer(transfer, ncorner=None)

Smoothly zero the edges of a frequency domain transfer function.

Parameters:
transfernumpy.ndarray

transfer function to start from, must have at least ten samples

ncornerint, optional

number of extra samples to zero off at low frequency, default: None

Returns:
outnumpy.ndarray

the smoothly truncated transfer function

References

Based on gwpy.signal.filter_design.truncate_transfer().

gwadama.tat.whiten(strain: ndarray[Any, dtype[_ScalarType_co]], *, asd: ndarray[Any, dtype[_ScalarType_co]], fs: int, flength: int, window: str | tuple = 'hann', highpass: float | None = None, normed: bool = True) → ndarray

Whiten a single strain signal using a FIR filter.

Whiten a strain using the input amplitude spectral density ‘asd’ to design the FIR filter.

This is a standalone implementation of GWpy’s whiten method [1].

Parameters:
strainNDArray

Strain data points in time domain.

asd2d-array

Amplitude spectral density assumed for the strain. Its components are:

  • asd[0]: frequency points

  • asd[1]: ASD points

Note: the frequency points must be linearly and uniformly spaced.

fsint

The sampling frequency of the strain data.

flengthint

Length (in samples) of the time-domain FIR whitening filter.

windowstr, np.ndarray, optional

window function to apply to timeseries prior to FFT, default: ‘hann’; see scipy.signal.get_window() for details on acceptable formats.

highpassfloat, optional

Highpass corner frequency (in Hz) of the FIR whitening filter.

normedbool

If True, normalizes the whitened strain by its maximum absolute amplitude.

Returns:
strain_wNDArray

Whitened strain (in time domain).

Notes

Due to filter settle-in, a segment of length 0.5*flength samples will be corrupted at the beginning and end of the output.
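To illustrate the idea of whitening and the expected layout of `asd`, here is a simplified sketch. It divides the spectrum by the ASD directly in the frequency domain, which is a different and cruder technique than the FIR filtering this function performs; `whiten_fd` is a hypothetical name.

```python
import numpy as np

def whiten_fd(strain, asd, fs, normed=True):
    """Hypothetical sketch: frequency-domain whitening (not the FIR method)."""
    freqs = np.fft.rfftfreq(strain.size, d=1.0 / fs)
    asd_on_grid = np.interp(freqs, asd[0], asd[1])    # ASD at the FFT frequencies
    white = np.fft.irfft(np.fft.rfft(strain) / asd_on_grid, n=strain.size)
    if normed:
        white = white / np.max(np.abs(white))         # peak absolute amplitude of 1
    return white

rng = np.random.default_rng(1)
strain = rng.standard_normal(1024)
# asd[0] holds uniformly spaced frequencies, asd[1] the ASD values (flat toy ASD).
asd = np.vstack([np.linspace(0.0, 512.0, 129), np.ones(129)])
white = whiten_fd(strain, asd, fs=1024)
```

With a flat ASD the output is just the input rescaled, which makes the effect of `normed` easy to check.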

References