Source code for gwadama.datasets

"""datasets.py

Main classes to manage GW datasets.

There are two basic type of datasets, clean and injected:

- Clean datasets' classes inherit from the Base class, extending their properties
  as needed.

- Injected datasets' classes inherit from the BaseInjected class, and
  optionally from other UserDefined(Base) classes.

"""
from copy import deepcopy
import itertools
from typing import Callable
import warnings

from gwpy.timeseries import TimeSeries
import numpy as np
import pandas as pd
import scipy as sp
from scipy.interpolate import make_interp_spline as sp_make_interp_spline
from sklearn.model_selection import train_test_split
from tqdm.auto import tqdm

from . import ioo
from . import detectors
from . import dictools
from . import fat
from . import synthetic
from . import tat
from .units import *


__all__ = ['Base', 'BaseInjected', 'SyntheticWaves', 'InjectedSyntheticWaves',
           'UnlabeledWaves', 'CoReWaves', 'InjectedCoReWaves']


[docs] class Base: """Base class for all datasets. TODO: Update docstring. Any dataset made of 'clean' (noiseless) GW must inherit this class. It is designed to store strains as nested dictionaries, with each level's key identifying a class/property of the strain. Each individual strain is a 1D NDArray containing the features. By default there are two basic levels: - Class; to group up strains in categories. - Id; An unique identifier for each strain, which must exist in the metadata DataFrame as Index. Extra depths can be added, and will be thought of as modifications of the same original strains from the upper identifier level. If splitting the dataset into train and test susbsets, only combinations of (Class, Id) will be considered. NOTE: This class shall not be called directly. Use one of its subclasses. Attributes ---------- classes : dict Dict of strings and their integer labels, one per class (category). metadata : pandas.DataFrame All parameters and data related to the strains. The order is the same as inside 'strains' if unrolled to a flat list of strains up to the second depth level (the ID). The total number of different waves must be equal to `len(metadata)`; this does not include possible variations such polarizations or multiple scallings of the same waveform when performing injections. strains : dict[dict [...]] Strains stored as a nested dictionary, with each strain in an independent array to provide more flexibility with data of a wide range of lengths. - Shape: {class: {id: strain} } - The 'class' key is the name of the class, a string which must exist in the 'classes' list. - The 'id' is a unique identifier for each strain, and must exist in the index of the 'metadata' (DataFrame) attribute. - Extra depths can be added as variations of each strain, such as polarizations. labels : dict Class label of each wave ID, with shape {id: class_label}. Each ID points to the label of its class in the 'classes' attribute. Can be automatically constructed by calling the '_gen_labels()' method. max_length : int Length of the longest strain in the dataset. Remember to update it if modifying the strains length. times : dict, optional Time samples associated with the strains, following the same structure up to the second depth level: {class: {id: time_points} } Useful when the sampling rate is variable or different between strains. If None, all strains are assumed to be constantly sampled to the sampling rate indicated by the 'sample_rate' attribute. sample_rate : int, optional If the 'times' attribute is present, this value is ignored. Otherwise it is assumed all strains are constantly sampled to this value. NOTE: If dealing with variable sampling rates, avoid setting this attribute to anything other than None. random_seed : int, optional Value passed to 'sklearn.model_selection.train_test_split' to generate the Train and Test subsets. Saved for reproducibility purposes. Xtrain, Xtest : dict, optional Train and test subsets randomly split using SKLearn train_test_split function with stratified labels. Shape: {id: strain}. The 'id' corresponds to the strain's index at 'self.metadata'. They are just another views into the same data stored at 'self.strains', so no copies are performed. Ytrain, Ytest : NDArray[int], optional 1D Array containing the labels in the same order as 'Xtrain' and 'Xtest' respectively. See the attribute 'labels' for more info. id_train, id_test : NDArray[int], optional 1D Array containing the id of the signals in the same order as 'Xtrain' and 'Xtest' respectively. Caveats ------- - The additional depths in the strains nested dictionary can't be directly tracked by the metadata Dataframe. - If working with two polarizations, they can be stored with just an extra depth layer. """
[docs] def __init__(self): """Overwrite when inheriting!""" raise NotImplementedError("Base class should not be called directly.") #---------------------------------------------------------------------- # Attributes whose values must be set up during initialization. #---------------------------------------------------------------------- self.strains: dict = None self.classes: dict[str] = None self._check_classes_dict(self.classes) self.metadata: pd.DataFrame = None self.labels: dict[int] = self._gen_labels() # Number of nested layers in strains' dictionary. Keep updated always: self._dict_depth: int = dictools.get_depth(self.strains) self.max_length = self._find_max_length() self.random_seed: int = None # SKlearn train_test_split doesn't accept a Generator yet. self._track_times = False # If True, self.times must be not None. #---------------------------------------------------------------------- # Attributes whose values can be set up or otherwise left as follows. #---------------------------------------------------------------------- # Whitening related attributes. self.whitened = False self.whiten_params = {} self.nonwhiten_strains = None # Time tracking related attributes. self.sample_rate: int = None self.times: dict = None # Train/Test subset splits (views into the same 'self.strains'). # Timeseries: self.Xtrain: np.ndarray = None self.Xtest: np.ndarray = None # Labels: self.Ytrain: np.ndarray = None self.Ytest: np.ndarray = None # Indices (sorted as in train and test splits respectively): self.id_train: np.ndarray = None self.id_test: np.ndarray = None
def __str__(self): """Return a summary of the dataset.""" # Get the name of the class class_name = self.__class__.__name__ # Basic information num_classes = len(self.classes) if self.classes else 0 num_strains = len(self) if self.strains else 0 max_length = self.max_length if hasattr(self, 'max_length') else 0 sample_rate = self.sample_rate if hasattr(self, 'sample_rate') else None whitened = self.whitened if hasattr(self, 'whitened') else False train_test_split = (self.Xtrain is not None and self.Xtest is not None) # Metadata information metadata_shape = self.metadata.shape if hasattr(self, 'metadata') and self.metadata is not None else (0, 0) # Time tracking information time_tracking = self._track_times if hasattr(self, '_track_times') else False # Whitening information whitening_info = "Whitened" if whitened else "Not whitened" # Train/Test split information split_info = "Train/Test split performed" if train_test_split else "No Train/Test split" # Construct the string summary = [ f"=== {class_name} Dataset Summary ===", f" Classes: {num_classes}", f" Strains: {num_strains}", f" Max Strain Length: {max_length} samples", f" Sample Rate: {sample_rate} Hz" if sample_rate else " Sample Rate: Not specified", f" Time Tracking: {'Enabled' if time_tracking else 'Disabled'}", f" Whitening: {whitening_info}", f" Train/Test Split: {split_info}", f" Metadata Shape: {metadata_shape}", "=" * (len(class_name) + 24) # Add a separator line matching the header length ] return "\n".join(summary) def _check_classes_dict(self, classes: dict[str]): if not isinstance(classes, dict): raise TypeError("'classes' must be a dictionary") if not all(isinstance(k, str) for k in classes.keys()): raise TypeError("'classes' keys must be strings") labels = classes.values() if not all(isinstance(label, int) for label in labels): raise TypeError("'classes' values must be integers") if len(set(labels)) != len(classes): raise ValueError("'classes' values must be unique") def __len__(self): """Return the total number of strains.""" return dictools.get_number_of_elements(self.strains) def _gen_labels(self) -> dict: """Constructs the labels' dictionary. The labels attribute maps each ID to the integer value of its class, mapped in the 'classes' attribute. Returns ------- labels : dict Shape {id: class_label} for each GW in the dataset. """ labels = {} for clas, id_ in self.keys(max_depth=2): labels[id_] = self.classes[clas] return labels def _init_strains_dict(self) -> dict: return {clas: {} for clas in self.classes} def _init_times_dict(self) -> dict: return dictools._replicate_structure_nested_dict(self.strains) def _find_max_length(self) -> int: """Return the length of the longest signal present in strains.""" max_length = 0 for *_, strain in self.items(): l = len(strain) if l > max_length: max_length = l return max_length def _gen_times(self) -> dict: """Generate the time arrays associated to the strains. Assumes a constant sampling rate. Returns ------- times : dict Nested dictionary with the same shape as 'self.strains'. """ times = self._init_times_dict() for *keys, strain in self.items(): length = len(strain) t_end = (length - 1) / self.sample_rate time = np.linspace(0, t_end, length) dictools.set_value_to_nested_dict(times, keys, time) return times
[docs] def keys(self, max_depth: int = None) -> list: """Return the unrolled combinations of all strain identifiers. Return the unrolled combinations of all keys of the nested dictionary of strains by a hierarchical recursive search. It can be thought of as the extended version of Python's 'dict().keys()', although this returns a plain list. Parameters ---------- max_depth : int, optional If specified, it is the number of layers to iterate to at most in the nested 'strains' dictionary. Returns ------- keys : list The unrolled combination in a Python list. """ keys = dictools.unroll_nested_dictionary_keys(self.strains, max_depth=max_depth) return keys
[docs] def items(self): """Return a new view of the dataset's items with unrolled indices. Each iteration consists on a tuple containing all the nested keys in 'self.strains' along with the corresponding strain, (clas, id, *, strain). It can be thought of as an extension of Python's `dict.items()`. Useful to quickly iterate over all items in the dataset. Example of usage with an arbitrary number of keys in the nested dictionary of strains: ``` for *keys, strain in self.items(): print(f"Number of identifiers: {len(keys)}") print(f"Length of the strain: {len(strain)}") do_something(strain) ``` """ for indices in self.keys(): yield (*indices, self.get_strain(*indices))
[docs] def find_class(self, id): """Find which 'class' corresponds the strain 'id'. Finds the 'class' of the strain represented by the unique identifier 'id'. Parameters ---------- id : str Unique identifier of the string, that which also appears in the `metadata.index` DataFrame. Returns ------- clas : int | str Class key associated to the strain 'id'. """ return dictools._find_level0_of_level1(self.strains, id)
[docs] def get_strain(self, *indices, normalize=False) -> np.ndarray: """Get a single strain from the complete index coordinates. This is just a shortcut to avoid having to write several squared brackets. NOTE: The returned strain is not a copy; if its contents are modified, the changes will be reflected inside the 'strains' attribute. Parameters ---------- *indices : str | int The indices of the strain to retrieve. normalize : bool If True, the returned strain will be normalized to its maximum amplitude. Returns ------- strain : np.ndarray The requested strain. """ if len(indices) != self._dict_depth: raise ValueError("indices must match the depth of 'self.strains'") strain = dictools.get_value_from_nested_dict(self.strains, indices) if normalize: strain /= np.max(np.abs(strain)) return strain
[docs] def get_strains_array(self, length: int = None) -> np.ndarray: """Get all strains stacked in a zero-padded Numpy 2d-array. Stacks all signals into an homogeneous numpy array whose length (axis=1) is determined by either 'length' or, if None, by the longest strain in the subset. The remaining space is zeroed. Parameters ---------- length : int, optional Target length of the 'strains_array'. If None, the longest signal determines the length. Returns ------- strains_array : np.ndarray train subset. lengths : list Original length of each strain, following the same order as the first axis of 'train_array'. """ strains_flat = dictools.flatten_nested_dict(self.strains) strains_array, lengths = dictools.dict_to_stacked_array(strains_flat, target_length=length) return strains_array, lengths
[docs] def get_times(self, *indices) -> np.ndarray: """Get a single time array from the complete index coordinates. This is just a shortcut to avoid having to write several squared brackets. NOTE: The returned strain is not a copy; if its contents are modified, the changes will be reflected inside the 'times' attribute. """ if len(indices) != self._dict_depth: raise ValueError("indices must match the depth of 'self.strains'") return dictools.get_value_from_nested_dict(self.times, indices)
[docs] def pad_strains(self, padding: int | tuple | dict) -> None: """ Pad strains with zeros on both sides. This function pads each strain with a specific number of samples on both sides. It also updates the 'max_length' attribute to reflect the new maximum length of the padded strains. Parameters ---------- padding : int | tuple | dict The padding to apply to each strain. If padding is an integer, it will be applied at both sides of all strains. If padding is a tuple, it must be of the form (left_pad, right_pad) in samples. If padding is a dictionary, it must be of the form {id: (left_pad, right_pad)}, where id is the identifier of each strain. Notes ----- - If time arrays are present, they are also padded accordingly. """ if isinstance(padding, int): padding_d = {id: (padding, padding) for id in self.labels} elif isinstance(padding, tuple): padding_d = {id: padding for id in self.labels} elif isinstance(padding, dict): padding_d = padding else: raise ValueError("padding must be an integer, a tuple or a dictionary") for clas, id, *keys in self.keys(): strain = self.get_strain(clas, id, *keys) left_pad, right_pad = padding_d[id] # Pad the strain strain_padded = np.pad(strain, (left_pad, right_pad), mode='constant') dictools.set_value_to_nested_dict(self.strains, [clas, id, *keys], strain_padded) # Pad the corresponding time array if time tracking is enabled if self._track_times: times = self.get_times(clas, id, *keys) time_step = (times[-1] - times[0]) / (len(times) - 1) left_time_points = np.arange(times[0] - left_pad * time_step, times[0], time_step) right_time_points = np.arange(times[-1] + time_step, times[-1] + (right_pad + 1) * time_step, time_step) times_padded = np.concatenate([left_time_points, times, right_time_points]) dictools.set_value_to_nested_dict(self.times, [clas, id, *keys], times_padded) # Update the maximum strain length attribute self.max_length = self._find_max_length()
[docs] def shrink_strains(self, limits: tuple | dict) -> None: """Shrink strains to a specific interval. Shrink strains (and their associated time arrays if present) to the interval given by 'limits'. It also updates the 'max_length' attribute. Parameters ---------- limits : tuple | dict The limits of the interval to shrink to. If limits is a tuple, it must be of the form (start, end) in samples. If limits is a dictionary, it must be of the form {id: (start, end)}, where id is the identifier of each strain. NOTE: If extra layers below ID are present, they will be shrunk accordingly. """ if isinstance(limits, tuple): limits_d = {id: limits for id in self.labels} else: limits_d = limits for clas, id, *keys in self.keys(): strain = self.get_strain(clas, id, *keys) # Same shrinking limits for all possible strains below ID layer. start, end = limits_d[id] strain = strain[start:end] dictools.set_value_to_nested_dict(self.strains, [clas,id,*keys], strain) if self._track_times: times = self.get_times(clas, id, *keys) times = times[start:end] dictools.set_value_to_nested_dict(self.times, [clas,id,*keys], times) self.max_length = self._find_max_length()
[docs] def resample(self, sample_rate, verbose=False) -> None: """Resample strain and time arrays to a constant rate. This assumes time tracking either with time arrays or with the sampling rate provided during initialization, which will be used to generate the time arrays previous to the resampling. This method updates the sample_rate and the max_length. Parameters ---------- sample_rate : int The new sampling rate in Hz. verbose : bool If True, print information about the resampling. """ # Set up the time points associated to each strain in case it is not # provided. # if self._track_times: times = self.times else: if sample_rate == self.sample_rate: raise ValueError("trying to resample to the same sampling rate") if self.sample_rate is None: raise ValueError("neither time samples nor a global sampling rate were defined") self.times = self._gen_times() self._track_times = True for *keys, strain in self.items(): time = dictools.get_value_from_nested_dict(times, keys) strain_resampled, time_resampled, sf_up, factor_down = tat.resample( strain, time, sample_rate, full_output=True ) dictools.set_value_to_nested_dict(self.strains, keys, strain_resampled) dictools.set_value_to_nested_dict(times, keys, time_resampled) if verbose: print( f"Strain {keys[0]}::{keys[1]} up. to {sf_up} Hz, down by factor {factor_down}" ) self.sample_rate = sample_rate self.max_length = self._find_max_length()
[docs] def whiten(self, asd_array: np.ndarray = None, pad: int = 0, highpass: int = None, flength: float = None, normed: bool = False, verbose=False) -> None: """Whiten the strains. Calling this method performs the whitening of all strains. Optionally, strains are first zero-padded, whitened and then shrunk to their initial size. This is useful to remove the vignetting effect. NOTE: Original (non-whitened) strains will be stored in the 'nonwhiten_strains' attribute. """ if self.whitened: raise RuntimeError("dataset already whitened") if self.strains is None: raise RuntimeError("no strains have been given or generated yet") self.nonwhiten_strains = deepcopy(self.strains) loop_aux = tqdm(self.items(), total=len(self)) if verbose else self.items() for *keys, strain in loop_aux: strain_w = fat.whiten( strain, asd=asd_array, sample_rate=self.sample_rate, flength=flength, highpass=highpass, pad=pad, normed=normed ) # Update strains attribute. dictools.set_value_to_nested_dict(self.strains, keys, strain_w) self.whitened = True self.whiten_params = { "asd_array": asd_array, "pad": pad, "highpass": highpass, "flength": flength, "normed": normed } # Update side-effect attributes. if self.Xtrain is not None: self._update_train_test_subsets()
[docs] def build_train_test_subsets(self, train_size: int | float, random_seed: int = None): """Generate a random Train and Test subsets. Only indices in the 'labels' attribute are considered independent waveforms, any extra key (layer) in the 'strains' dict is treated monolithically during the shuffle. The strain values are just new views into the 'strains' attribute. The shuffling is performed by Scikit-Learn's function 'train_test_split', with stratification enabled. Parameters ---------- train_size : int | float If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the train subset. If int, represents the absolute number of train waves. Ref: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html random_seed : int, optional Passed directly to 'sklearn.model_selection.train_test_split'. It is also saved in its homonymous attribute. """ match (self.random_seed, random_seed): case (int() as existing_seed, int() as new_seed): if new_seed != existing_seed: warnings.warn( "'random_seed' was already set as attribute in this instance," f" replacing it by the value {random_seed} passed as parameter." ) self.random_seed = random_seed case (None, int() as new_seed): self.random_seed = new_seed case (int(), None): pass case (None, None): warnings.warn( "'random_seed' is not being manually set; this could result" " in unreproducible results." ) indices = list(self.labels) self.id_train, self.id_test = train_test_split( indices, train_size=train_size, random_state=self.random_seed, shuffle=True, stratify=list(self.labels.values()) ) self.Xtrain, self.Ytrain = self._build_subset_strains(self.id_train) self.Xtest, self.Ytest = self._build_subset_strains(self.id_test)
def _build_subset_strains(self, indices): """Return a subset of strains and their labels based on their ID. Return a new view into 'self.strains' using the input indices (ID) as the first layer of the nested dictionary. This collapses the first layer, the class, leaving the unique identifier ID as first layer. Nevertheless, the rest of possible layers beneath 'ID' are monolithically preserved. Parameters ---------- indices : array-like The indices are w.r.t. 'self.labels'. Returns ------- strains : dict {id: strain} The id key is the strain's index at 'self.metadata'. labels : NDArray 1D Array containing the labels associated to 'strains'. """ strains = {} labels = np.empty(len(indices), dtype=int) for i, id_ in enumerate(indices): labels[i] = self.labels[id_] clas = self.find_class(id_) strains[id_] = self.strains[clas][id_] return strains, labels def _update_train_test_subsets(self): """Builds again the Train/Test subsets from the main strains attribute.""" id_train = list(self.Xtrain.keys()) id_test = list(self.Xtest.keys()) self.Xtrain, self.Ytrain = self._build_subset_strains(id_train) self.Xtest, self.Ytest = self._build_subset_strains(id_test)
[docs] def get_xtrain_array(self, length=None, classes='all'): """Get the train subset stacked in a zero-padded Numpy 2d-array. Stacks all signals in the train subset into an homogeneous numpy array whose length (axis=1) is determined by either 'length' or, if None, by the longest strain in the subset. The remaining space is zeroed. Optionally, classes can be filtered by specifying which to include with the `classes` parameter. Parameters ---------- length : int, optional Target length of the 'train_array'. If None, the longest signal determines the length. classes : str | List[str], optional Specify which classes to include. Include 'all' by default. Returns ------- train_array : np.ndarray train subset. lengths : list Original length of each strain, following the same order as the first axis of 'train_array'. """ train_subset = self.Xtrain.copy() if classes != 'all': class_labels = [self.classes[c] for c in classes] for class_int, id in zip(self.Ytrain, list(train_subset.keys())): if class_int not in class_labels: del train_subset[id] return dictools.dict_to_stacked_array(train_subset, target_length=length)
[docs] def get_xtest_array(self, length=None, classes='all'): """Get the test subset stacked in a zero-padded Numpy 2d-array. Stacks all signals in the test subset into an homogeneous numpy array whose length (axis=1) is determined by either 'length' or, if None, by the longest strain in the subset. The remaining space is zeroed. Optionally, classes can be filtered by specifying which to include with the `classes` parameter. Parameters ---------- length : int, optional classes : str | List[str], optional Specify which classes to include. Include 'all' by default. Returns ------- test_array : np.ndarray test subset. lengths : list Original length of each strain, following the same order as the first axis of 'test_array'. """ test_subset = self.Xtest.copy() if classes != 'all': class_labels = [self.classes[c] for c in classes] for class_int, id in zip(self.Ytest, list(test_subset.keys())): if class_int not in class_labels: del test_subset[id] return dictools.dict_to_stacked_array(test_subset, target_length=length)
[docs] def get_ytrain_array(self, classes='all', with_id=False, with_index=False): """Get the filtered training labels. Parameters ---------- classes : str | list[str] | 'all' The classes to include in the labels. All classes are included by default. with_id : bool If True, return also the list of related IDs. with_index : bool If True, return also the related GLOBAL indices; w.r.t. the stacked arrays returned by 'get_xtrain_array' WITHOUT filters. False by default. Returns ------- np.ndarray Filtered train labels. np.ndarray, optional IDs associated to the filtered train labels. np.ndarray, optional Indices associated to the filtered train labels. """ return self._filter_labels( self.Ytrain, list(self.Xtrain), classes, with_id=with_id, with_index=with_index )
[docs] def get_ytest_array(self, classes='all', with_id=False, with_index=False): """Get the filtered test labels. Parameters ---------- classes : str | list[str] | 'all' The classes to include in the labels. All classes are included by default. with_id : bool If True, return also the list of related IDs. with_index : bool If True, return also the related GLOBAL indices; w.r.t. the stacked arrays returned by 'get_xtest_array' WITHOUT filters. Returns ------- np.ndarray Filtered test labels. np.ndarray, optional IDs associated to the filtered test labels. np.ndarray, optional Indices associated to the filtered test labels. """ return self._filter_labels( self.Ytest, list(self.Xtest), classes, with_id=with_id, with_index=with_index )
def _filter_labels(self, labels, labels_id, classes, with_id=False, with_index=False): """Filter labels based on 'classes'. This is a helper function for 'get_ytrain_array' and 'get_ytest_array'. Parameters ---------- labels : np.ndarray The array containing the labels. labels_id : list IDs associated to the labels. classes : str | list[str] | 'all' The classes to include in the labels. All classes are included by default. with_id : bool If True, return also the related IDs. False by default. with_index : bool If True, return also the related indices w.r.t. the stacked array returned by '_stack_subset' given the strains related to 'labels' WITHOUT filters. False by default. Returns ------- filtered_labels : np.ndarray Filtered labels. filtered_ids : np.ndarray, optional IDs associated to the filtered labels. filtered_indices : np.ndarray, optional Indices associated to the filtered labels. """ if len(labels) != len(labels_id): raise ValueError("'labels' and 'labels_id' must have the same length.") if isinstance(classes, str): if classes == 'all': return (labels, labels_id) if with_id else labels else: classes = [classes] elif not isinstance(classes, list): raise TypeError("'classes' must be a string or list of strings.") filtered_labels = [] filtered_ids = [] filtered_indices = [] i = 0 for label, id in zip(labels, labels_id): if self.find_class(id) in classes: filtered_labels.append(label) filtered_ids.append(id) filtered_indices.append(i) i += 1 # Indices w.r.t. the FILTERED set!!! filtered_labels = np.array(filtered_labels) filtered_ids = np.array(filtered_ids) filtered_indices = np.array(filtered_indices) if with_id and with_index: return filtered_labels, filtered_ids, filtered_indices if with_id: return filtered_labels, filtered_ids if with_index: return filtered_labels, filtered_indices return filtered_labels
[docs] def stack_by_id(self, id_list: list, length: int = None): """Stack an subset of strains by their ID into a Numpy array. Stack an arbitrary selection of strains by their original ID into a zero-padded 2d-array. The resulting order is the same as the order of that in 'id_list'. Parameters ---------- id_list : list The IDs of the strains to be stacked. length : int, optional The target length of the stacked array. If None, the longest signal determines the length. Returns ------- stacked_signals : np.ndarray The array containing the stacked strains. lengths : list The original lengths of each strain, following the same order as the first axis of 'stacked_signals'. Notes ----- - Unlike in 'get_xtrain_array' and 'get_xtest_array', this method does not filter by 'classes' since it would be redundant, as IDs are unique. """ if not isinstance(id_list, list): raise TypeError("'id_list' must be a list of IDs.") # Collapse the Class layer. strains = {id: ds for sub_strains in self.strains.values() for id, ds in sub_strains.items()} # Filter out those not in the 'id_list'. strains = dictools.filter_nested_dict(strains, lambda k: k in id_list, layer=0) assert len(strains) == len(id_list) # Sort them to match the order in 'id_list'. strains = {id: strains[id] for id in id_list} strains = dictools.flatten_nested_dict(strains) stacked_signals, lengths = dictools.dict_to_stacked_array(strains, target_length=length) return stacked_signals, lengths
[docs] class BaseInjected(Base): """Manage an injected dataset with multiple SNR values. It is designed to store strains as nested dictionaries, with each level's key identifying a class/property of the strain. Each individual strain is a 1D NDArray containing the features. NOTE: Instances of this class or any other Class(BaseInjected) are initialized from an instance of any Class(Base) instance (clean dataset). By default there are THREE basic levels: - Class; to group up strains in categories. - Id; An unique identifier for each strain, which must exist in the metadata DataFrame as Index. - SNR; the signal-to-noise ratio at which has been injected w.r.t. a power spectral density of reference (e.g. the sensitivity of a GW detector). An extra depth can be added below, and will be treated as multiple injections at the same SNR value. This is usfeul for example to make injections at multiple noise realizations. Attributes ---------- classes : list[str] List of labels, one per class (category). metadata : pandas.DataFrame All parameters and data related to the original strains, inherited (copied) from a clean Class(Base) instance. The order is the same as inside 'strains' if unrolled to a flat list of strains up to the second depth level (the ID). The total number of different waves must be equal to `len(metadata)`; this does not include possible variations such polarizations or multiple scallings of the same waveform when performing injections. strains_clean : dict[dict] Strains inherited (copied) from a clean Class(Base) instance. This copy is kept in order to perform new injections. - Shape: {class: {id: strain} } - The 'class' key is the name of the class, a string which must exist in the 'classes' list. - The 'id' is a unique identifier for each strain, and must exist in the index of the 'metadata' (DataFrame) attribute. NOTE: These strains should be not modified. If new clean strains are needed, create a new clean dataset instance first, and then initialise this class with it. TODO: Accept extra layers in the clean_strains dictionary. strains : dict[dict] Injected trains stored as a nested dictionary, with each strain in an independent array to provide more flexibility with data of a wide range of lengths. - Shape: {class: {id: {snr: strain} } } - The 'class' key is the name of the class, a string which must exist in the 'classes' list. - The 'id' is a unique identifier for each strain, and must exist in the index of the 'metadata' (DataFrame) attribute. - The 'snr' key is an integer indicating the signal-to-noise ratio of the injection. - A fourth depth can be added below as additional injections per SNR. labels : dict Indices of the class of each wave ID, inherited from a clean Class(Base) instance, with shape {id: class_index}. Each ID points to the index of its class in the 'classes' attribute. units : str Flag indicating whether the data is in 'geometrized' or 'IS' units. times : dict, optional Time samples associated with the strains, following the same structure. Useful when the sampling rate is variable or different between strains. If None, all strains are assumed to be constantly sampled to the sampling rate indicated by the 'sample_rate' attribute. sample_rate : int Inherited from the parent Class(Base) instance. max_length : int Length of the longest strain in the dataset. Remember to update it if manually changing strains' length. random_seed : int Value passed to 'sklearn.model_selection.train_test_split' to generate the Train and Test subsets. Saved for reproducibility purposes. Also used to initialize Numpy's default RandomGenerator. rng : np.random.Generator Random number generator used for sampling the background noise. Initialized with `np.random.default_rng(random_seed)`. detector : str GW detector name. psd_ : NDArray Numerical representation of the Power Spectral Density (PSD) of the detector's sensitivity. asd_ : NDArray Numerical representation of the Amplitude Spectral Density (ASD) of the detector's sensitivity. noise : gwadama.synthetic.NonwhiteGaussianNoise Background noise instance from NonwhiteGaussianNoise. snr_list : list List of SNR values at which each signal has been injected. pad : dict Padding introduced at each SNR injection, used in case the strains will be whitened after, to remove the vigneting at edges. It is associated to SNR values because the only implemented way to pad the signals is during the signal injection. injections_per_snr : int Number of injections per SNR value. whitened : bool Flat indicating whether the dataset has been whitened. Initially will be set to False, and changed to True after calling the 'whiten' method. Once whitened, this flag will remain True, since the whitening is implemented to be irreversible instance-wise. whiten_params : dict TODO freq_cutoff : int | float Frequency cutoff below which no noise bins will be generated in the frequency space, and also used for the high-pass filter applied to clean signals before injection. freq_butter_order : int Butterworth filter order. See (https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.butter.html) for more information. Xtrain, Xtest : dict, optional Train and test subsets randomly split using SKLearn train_test_split function with stratified labels. Shape adds the SNR layer: {id: {snr: strain}}. The 'id' corresponds to the strain's index at 'self.metadata'. Ytrain, Ytest : NDArray[int], optional 1D Array containing the labels in the same order as 'Xtrain' and 'Xtest' respectively. NOTE: Does not include the SNR layer, therefore labels are not repeated. """
[docs] def __init__(self, clean_dataset: Base, *, psd: np.ndarray | Callable, detector: str, noise_length: int, freq_cutoff: int | float, freq_butter_order: int | float, whiten_params: dict = None, random_seed: int = None): """Base constructor for injected datasets. TODO: Update docstring. When inheriting from this class, it is recommended to run this method first in your __init__ function. Relevant attributes are inherited from the 'clean_dataset' instance, which can be any inherited from BaseDataset whose strains have not been injected yet. If train/test subsets are present, they too are updated when performing injections or changing units, but only through re-building them from the main 'strains' attribute using the already generated indices. Original train/test subsets from the clean dataset are not inherited. WARNING: Initializing this class does not perform the injections! For that use the method 'gen_injections'. Parameters ---------- clean_dataset : Base Instance of a Class(Base) with noiseless signals. psd : np.ndarray | Callable Power Spectral Density of the detector's sensitivity in the range of frequencies of interest. Can be given as a callable function whose argument is expected to be an array of frequencies, or as a 2d-array with shape (2, psd_length) so that ``` psd[0] = frequency_samples psd[1] = psd_samples ```. NOTE: It is also used to compute the 'asd' attribute (ASD). detector : str GW detector name. Not used, just for identification. noise_length : int Length of the background noise array to be generated for later use. It should be at least longer than the longest signal expected to be injected. freq_cutoff : int | float Frequency cutoff below which no noise bins will be generated in the frequency space, and also used for the high-pass filter applied to clean signals before injection. freq_butter_order : int | float Butterworth filter order. See (https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.butter.html) for more information. flength : int Length (in samples) of the time-domain FIR whitening filter. whiten_params : dict, optional Parameters of the whitening filter, with the following entries: - 'flength' : int Length (in samples) of the time-domain FIR whitening. - 'highpass' : float Frequency cutoff. - 'normed' : bool Normalization applied after the whitening filter. random_seed : int, optional Value passed to 'sklearn.model_selection.train_test_split' to generate the Train and Test subsets. Saved for reproducibility purposes, and also used to initialize Numpy's default RandomGenerator. """ # Inherit clean strain instance attributes. #---------------------------------------------------------------------- self.classes = clean_dataset.classes.copy() self._check_classes_dict(self.classes) self.labels = clean_dataset.labels.copy() self.metadata = deepcopy(clean_dataset.metadata) self.strains_clean = deepcopy(clean_dataset.strains) self._track_times = clean_dataset._track_times if self._track_times: self.times = deepcopy(clean_dataset.times) self.sample_rate = clean_dataset.sample_rate self.max_length = clean_dataset.max_length # Noise instance and related attributes. #---------------------------------------------------------------------- self.random_seed = random_seed self.rng = np.random.default_rng(random_seed) self.detector = detector # Highpass parameters applied when generating the noise array. self.freq_cutoff = freq_cutoff self.freq_butter_order = freq_butter_order self._psd, self.psd_array = self._setup_psd(psd) self._asd, self.asd_array = self._setup_asd_from_psd(psd) self.noise = self._generate_background_noise(noise_length) # Injection related: #---------------------------------------------------------------------- # TODO: ¿Implement the case when clean_dataset is already whitened? # It should mark it and use the clean copy of nonwhitened data instead. self.strains = None self._dict_depth = clean_dataset._dict_depth + 1 # Depth of the strains dict. self.snr_list = [] self.pad = {} # {snr: pad} self.injections_per_snr = 1 # Default value. self.whitened = False # Switched to True after calling self.whiten(). self.whiten_params = whiten_params # NOTE: I designed this while building the InjectedCoReWaves class, so # chances are this is not general enough. if whiten_params is not None: self.whiten_params.update({ 'asd_array': self.asd_array, # Referenced here again for consistency. 'pad': 0, # Signals are expected to be already padded. 'unpad': self.pad, # Referenced here again for consistency. 'highpass': self.freq_cutoff # Referenced here again for consistency. }) # Train/Test subset views: #---------------------------------------------------------------------- if clean_dataset.Xtrain is not None: self.Xtrain = {k: None for k in clean_dataset.Xtrain.keys()} self.Xtest = {k: None for k in clean_dataset.Xtest.keys()} self.Ytrain = clean_dataset.Ytrain self.Ytest = clean_dataset.Ytest self.id_train = clean_dataset.id_train self.id_test = clean_dataset.id_test else: self.Xtrain = None self.Xtest = None self.Ytrain = None self.Ytest = None self.id_train = None self.id_test = None
def __getstate__(self): """Avoid error when trying to pickle PSD and ASD interpolants. Turns out Pickle tries to serialize the PSD and ASD interpolants, however Pickle is not able to serialize encapsulated functions. This is solved by removing said functions and computing the interpolants from their array representations when unpickling. NOTE: The loss of accuracy over repeated (de)serialization using this method has not been studied, use at your own discretion. """ state = self.__dict__.copy() del state['_psd'] del state['_asd'] return state def __setstate__(self, state): """Avoid error when trying to unpickle PSD and ASD interpolants. Turns out Pickle tries to serialize the PSD and ASD interpolants, however Pickle is not able to serialize encapsulated functions. This is solved by removing said functions and computing the interpolants from their array representations when unpickling. NOTE: The loss of accuracy over repeated (de)serialization using this method has not been studied, use at your own discretion. """ _psd, _ = self._setup_psd(state['psd_array']) _asd, _ = self._setup_asd_from_psd(state['psd_array']) state['_psd'] = _psd state['_asd'] = _asd self.__dict__.update(state) def _setup_psd(self, psd: np.ndarray | Callable) -> tuple[Callable, np.ndarray]: """Setup the PSD function or array depending on the input. Setup the power spectral density function and array from any of those. """ if callable(psd): psd_fun = psd # Compute a realization of the PSD function with 16 bins per # integer frequency to ensure the numerical representation has # enough precision. freqs = np.linspace(0, self.sample_rate//2, self.sample_rate*8) psd_array = np.stack([freqs, psd(freqs)]) elif isinstance(psd, np.ndarray): # Build a spline quadratic interpolant for the input PSD array. psd_fun = sp_make_interp_spline(psd[0], psd[1], k=2) psd_array = np.asarray(psd) else: raise TypeError("'psd' type not recognized") return psd_fun, psd_array def _setup_asd_from_psd(self, psd): """Setup the ASD function or array depending on the input. Setup the amplitude spectral density function and array from any of those. """ if callable(psd): asd_fun = lambda f: np.sqrt(psd) # Compute a realization of the ASD function with 16 bins per # integer frequency to ensure the numerical representation has # enough precision. freqs = np.linspace(0, self.sample_rate//2, self.sample_rate*8) asd_array = np.stack([freqs, asd_fun(freqs)]) elif isinstance(psd, np.ndarray): # Build a spline quadratic interpolant for the input ASD array. asd_array = psd.copy() asd_array[1] = np.sqrt(psd[1]) asd_fun = sp_make_interp_spline(asd_array[0], asd_array[1], k=2) else: raise TypeError("'psd' type not recognized") return asd_fun, asd_array
[docs] def psd(self, frequencies: float | np.ndarray[float]) -> np.ndarray[float]: """Power spectral density (PSD) of the detector at given frequencies. Interpolates the PSD at the given frequencies from their array representation. If during initialization the PSD was given as its array representation, the interpolant is computed using SciPy's quadratic spline interpolant function. """ return self._psd(frequencies)
[docs] def asd(self, frequencies: float | np.ndarray[float]) -> np.ndarray[float]: """Amplitude spectral density (ASD) of the detector at given frequencies. Interpolates the ASD at the given frequencies from their array representation. If during initialization the ASD was given as its array representation, the interpolant is computed using SciPy's quadratic spline interpolant function. """ return self._asd(frequencies)
def _generate_background_noise(self, noise_length: int) -> synthetic.NonwhiteGaussianNoise: """The noise realization is generated by NonwhiteGaussianNoise.""" d: float = noise_length / self.sample_rate noise = synthetic.NonwhiteGaussianNoise( duration=d, psd=self.psd, sample_rate=self.sample_rate, rng=self.rng, freq_cutoff=self.freq_cutoff ) return noise def _init_strains_dict(self) -> dict[dict[dict]]: """Initializes the nested dictionary of strains. Initializes the nested dictionary of strains following the hierarchy in the clean strains attribute, and adding the SNR layer. """ strains_dict = dictools._replicate_structure_nested_dict(self.strains_clean) for indices in dictools.unroll_nested_dictionary_keys(strains_dict): dictools.set_value_to_nested_dict(strains_dict, indices, {}) return strains_dict
[docs] def get_times(self, *indices) -> np.ndarray: """Get a single time array from the complete index coordinates. This is just a shortcut to avoid having to write several squared brackets. NOTE: The returned strain is not a copy; if its contents are modified, the changes will be reflected inside the 'times' attribute. """ return dictools.get_value_from_nested_dict(self.times, indices)
[docs] def gen_injections(self, snr: int|float|list, pad: int = 0, randomize_noise: bool = False, random_seed: int = None, injections_per_snr: int = 1, verbose=False): """Inject all strains in simulated noise with the given SNR values. - The SNR is computed using a matched filter against the noise PSD. - If `pad > 0`, it also updates the time arrays. - If strain units are in geometrized, they will be converted first to IS, injected, and converted back to geometrized. - After each injection, applies a highpass filter at the low-cut frequency specified at __init__. - If the method 'whiten' has been already called, all further injections will automatically be whitened and their pad removed. Parameters ---------- snr : int | float | list pad : int Number of zeros to pad the signal at both ends before the injection. randomize_noise : bool If True, the noise segment is randomly chosen before the injection. This can be used to avoid having the same noise injected for all clean strains. False by default. NOTE: To avoid the possibility of repeating the same noise section in different injections, the noise realization must be reasonably large, e.g: `noise_length > n_clean_strains * self.max_length * len(snr)` random_seed : int, optional Random seed for the noise realization. Only used when randomize_noise is True. injections_per_snr : int Number of injections per SNR value. Defaults to 1. This is useful to minimize the statistical impact of the noise when performing injections at a sensitive (low) SNR. Notes ----- - If whitening is intended to be applied afterwards it is useful to pad the signal in order to avoid the window vignetting produced by the whitening itself. This pad will be cropped afterwards. - New injections are stored in the 'strains' atrribute, with the pad associated to all the injections performed at once. Even when whitening is also performed right after the injections. Raises ------ ValueError Once injections have been performed at a certain SNR value, there cannot be injected again at the same value. Trying it will trigger this exception. """ if isinstance(snr, (int, float)): snr_list = [snr] elif isinstance(snr, list): snr_list = snr else: raise TypeError(f"'{type(snr)}' is not a valid 'snr' type") if set(snr_list) & set(self.snr_list): raise ValueError("one or more SNR values are already present in the dataset") if self._track_times: # Replaced temporarily because when injecting for the first time # we need to keep the original time arrays. times_new = self.times # 1st time making injections. if self.strains is None: self.strains = self._init_strains_dict() if self._track_times: # Redo the dictionary structure to include the SNR layer. times_new = self._init_strains_dict() if randomize_noise: rng = np.random.default_rng(random_seed) if verbose: n_injections = ( dictools.get_number_of_elements(self.strains_clean) * len(snr_list) * injections_per_snr ) pbar = tqdm(total=n_injections) for clas, id_ in dictools.unroll_nested_dictionary_keys(self.strains_clean): gw_clean = self.strains_clean[clas][id_] strain_clean_padded = np.pad(gw_clean, pad) # NOTE: Do not update the metadata nor times with this pad in case # the whitening is applied immediately after the injections. # Highpass filter to the clean signal. # NOTE: The noise realization is already generated without # frequency components lower than the cutoff (they are set to # 0 during the random sampling). strain_clean_padded = fat.highpass_filter( strain_clean_padded, f_cut=self.freq_cutoff, f_order=self.freq_butter_order, sample_rate=self.sample_rate ) # Strain injections for snr_, rep in itertools.product(snr_list, range(injections_per_snr)): if randomize_noise: pos0 = rng.integers(0, len(self.noise) - len(strain_clean_padded)) else: pos0 = 0 # 'pad' is added to 'snr_offset' to compensate for the padding # which has not been updated in the 'metadata' yet. injected = self._inject( strain_clean_padded, snr_, id=id_, snr_offset=pad, pos=pos0 ) if self.whitened: injected = fat.whiten( injected, asd=self.asd_array, unpad=pad, sample_rate=self.sample_rate, # Parameters for GWpy's whiten() function: highpass=self.freq_cutoff, flength=self.flength ) if injections_per_snr == 1: self.strains[clas][id_][snr_] = injected else: dictools.set_value_to_nested_dict( self.strains, [clas, id_, snr_, rep], injected, add_missing_keys=True ) if verbose: pbar.update() # Time arrays: # - All SNR entries pointing to the SAME time array. # - Enlarge if the strains were padded and no whitening followed. if self._track_times: times_i = self.get_times(clas, id_) if pad > 0 and not self.whitened: times_i = tat.pad_time_array(times_i, pad) for snr_, rep in itertools.product(snr_list, range(injections_per_snr)): if injections_per_snr == 1: times_new[clas][id_][snr_] = times_i else: dictools.set_value_to_nested_dict( times_new, [clas, id_, snr_, rep], times_i, add_missing_keys=True ) if verbose: pbar.close() if self._track_times: self.times = times_new self.snr_list += snr_list for snr_ in snr_list: # NOTE: Even if whitening is applied (and hence the length unaltered) # pad values are still registered, just in case. self.pad[snr_] = pad self.injections_per_snr = injections_per_snr if injections_per_snr > 1: # Make sure the depth attribute is updated. self._dict_depth = dictools.get_depth(self.strains) # Side-effect attributes updated. self.max_length = self._find_max_length() if self.Xtrain is not None: self._update_train_test_subsets()
def _inject(self, strain: np.ndarray, snr: int | float, pos: int = 0, **_) -> np.ndarray: """Inject 'strain' at 'snr' into noise using the 'self.noise' instance. NOTE: This is writen as an independent method to allow for other classes inheriting this to modify its behaviour without having to rewrite the entire 'gen_injections' method. Parameters ---------- strain : NDArray Signal to be injected into noise. snr : int | float Signal to noise ratio. pos : int, optional Index position in the noise array where to inject the signal. 0 by default. Returns ------- injected : NDArray Injected signal. """ injected, _ = self.noise.inject(strain, snr=snr, pos=pos) return injected
[docs] def export_strains_to_gwf(self, path: str, channel: str, # Name of the channel in which to save strains in the GWFs. t0_gps: float = 0, verbose=False) -> None: """Export all strains to GWF format, one file per strain.""" from pathlib import Path for indices in self.keys(): strain = self.get_strain(*indices) times = self.get_times(*indices) ts = TimeSeries( data=strain, times=t0_gps + times, channel=channel ) key = indices[1].replace(':', '_') + '_snr' + str(indices[2]) fields = [ self.detector, key, str(int(t0_gps)), str(int(ts.duration.value * 1000)) # In milliseconds ] file = Path(path) / ('-'.join(fields) + '.gwf') ts.write(file) if verbose: print("Strain exported to", file)
[docs] def whiten(self, verbose=False): """Whiten injected strains. Calling this method performs the whitening of all injected strains. Strains are later cut to their original size before adding the pad, to remove the vigneting. NOTE: This is an irreversible action; if the original injections need to be preserved it is advised to make a copy of the instance before performing the whitening. """ if self.whitened: raise RuntimeError("dataset already whitened") if self.whiten_params is None: raise RuntimeError("missing whitening parameters") if self.strains is None: raise RuntimeError("no injections have been performed yet") flength = self.whiten_params['flength'] asd_array = self.whiten_params['asd_array'] pad = self.whiten_params['pad'] unpad = self.whiten_params['unpad'] highpass = self.whiten_params['highpass'] loop_aux = tqdm(self.items(), total=len(self)) if verbose else self.items() for *keys, strain in loop_aux: snr = keys[2] # Shape of self.strains dict-> (class, id, snr[, rep]) strain_w = fat.whiten( strain, asd=asd_array, pad=pad, unpad=unpad[snr], sample_rate=self.sample_rate, highpass=highpass, flength=flength ) # Update strains attribute. dictools.set_value_to_nested_dict(self.strains, keys, strain_w) # Shrink time arrays accordingly. if self._track_times: clas_id_snr_layers = dictools.unroll_nested_dictionary_keys( self.times, max_depth=3 ) for clas_id_snr in clas_id_snr_layers: # Since all time arrays below SNR layer are the same, # get the first one, shrink it to its corresponding SNR-unpad, # and set it to all subsequent layers. snr = clas_id_snr[2] times_sublayer = self.get_times(*clas_id_snr) time = dictools.get_next_item(times_sublayer) time = tat.shrink_time_array(time, unpad[snr]) dictools.fill(times_sublayer, time, deepcopy=False) self.whitened = True # Side-effect attributes updated. self.max_length = self._find_max_length() if self.Xtrain is not None: self._update_train_test_subsets()
[docs] def get_xtrain_array(self, length: int = None, classes: str | list = 'all', snr: int | list | str = 'all', with_metadata: bool = False): """Get the train subset stacked in a zero-padded Numpy 2d-array. Stacks all signals in the train subset into an homogeneous numpy array whose length (axis=1) is determined by either 'length' or, if None, by the longest strain in the subset. The remaining space is zeroed. Allows the possibility to filter by class and SNR. NOTE: Same signals injected at different SNR are stacked continuously. Parameters ---------- length : int, optional Target length of the 'train_array'. If None, the longest signal determines the length. classes : str | list[str] Whitelist of classes to include in the stack. All classes are included by default. snr : int | list[int] | str Whitelist of SNR injections to include in the stack. If more than one are selected, they are stacked zipped as follows: ``` eos0 id0 snr0 eos0 id0 snr1 ... ``` All injections are included by default. with_metadata : bool If True, the associated metadata is returned in addition to the train array in a Pandas DataFrame instance. This metadata is obtained from the original 'metadata' attribute, with the former index inserted as the first column, 'id', and with an additional column for the SNR values. False by default. Returns ------- train_array : np.ndarray Train subset. lengths : list Original length of each strain, following the same order as the first axis of 'train_array'. metadata : pd.DataFrame, optional If 'with_metadata' is True, the associated metadata is returned with its entries in the same order as the 'train_array'. """ return self._stack_subset(self.Xtrain, length, classes, snr, with_metadata)
[docs] def get_xtest_array(self, length: int = None, classes: str | list = 'all', snr: int | list | str = 'all', with_metadata: bool = False): """Get the test subset stacked in a zero-padded Numpy 2d-array. Stacks all signals in the test subset into an homogeneous numpy array whose length (axis=1) is determined by either 'length' or, if None, by the longest strain in the subset. The remaining space is zeroed. Allows the possibility to filter by class and SNR. NOTE: Same signals injected at different SNR are stacked continuously. Parameters ---------- length : int, optional Target length of the 'test_array'. If None, the longest signal determines the length. classes : str | list[str] Whitelist of classes to include in the stack. All classes are included by default. snr : int | list[int] | str Whitelist of SNR injections to include in the stack. If more than one are selected, they are stacked zipped as follows: ``` eos0 id0 snr0 eos0 id0 snr1 ... ``` All injections are included by default. with_metadata : bool If True, the associated metadata is returned in addition to the test array in a Pandas DataFrame instance. This metadata is obtained from the original 'metadata' attribute, with the former index inserted as the first column, 'id', and with an additional column for the SNR values. False by default. Returns ------- test_array : np.ndarray Test subset. lengths : list Original length of each strain, following the same order as the first axis of 'test_array'. metadata : pd.DataFrame, optional If 'with_metadata' is True, the associated metadata is returned with its entries in the same order as the 'test_array'. """ return self._stack_subset(self.Xtest, length=length, classes=classes, snr=snr, with_metadata=with_metadata)
def _stack_subset(self, strains: dict, length: int = None, classes: str | list = 'all', snr: int | list | str = 'all', with_metadata: bool = False): """Stack 'strains' into a zero-padded 2d-array. This is a helper function for 'get_xtrain_array' and 'get_xtest_array'. Parameters ---------- strains : dict A dictionary containing the strains to be stacked. The keys of the first layer are the IDs of the strains. length : int, optional The target length of the stacked array. If None, the longest signal determines the length. classes : str | list[str] Whitelist of classes to include in the stack. All classes are included by default. snr : int | list[int] | str Whitelist of SNR injections to include in the stack. If more than one are selected, they are stacked zipped as follows: ``` eos0 id0 snr0 eos0 id0 snr1 ... ``` All injections are included by default. with_metadata : bool If True, the associated metadata is returned in addition to the stacked array in a Pandas DataFrame instance. This metadata is obtained from the original 'metadata' attribute, with the former index inserted as the first column, 'id', and with an additional column for the SNR values. False by default. Returns ------- stacked_signals : np.ndarray The array containing the stacked strains. lengths : list The original lengths of each strain, following the same order as the first axis of 'stacked_signals'. metadata : pd.DataFrame, optional If 'with_metadata' is True, the associated metadata is returned with its entries in the same order as the 'stacked_signals'. This metadata is obtained from the original 'metadata' attribute, with the former index inserted as the first column, 'id', and with an additional column for the SNR values. Raises ------ ValueError If the value of 'classes' or 'snr' is not valid. """ if isinstance(classes, (str, list)) and classes != 'all': if isinstance(classes, str): classes = [classes] # NOTE: Here there is no 'class' layer, therefore it must be # traced back from the ID, and filtered over this same layer. def filter_class(id): clas = self.find_class(id) return clas in classes strains = dictools.filter_nested_dict(strains, filter_class, layer=0) elif classes != 'all': raise TypeError("the type of 'classes' is not valid") if isinstance(snr, (int, list)): if isinstance(snr, int): snr = [snr] # NOTE: Here SNR is in Layer 1 because the Train/Test subset # dictionaries do not have the 'class' first layer. strains = dictools.filter_nested_dict(strains, lambda k: k in snr, layer=1) # If `snr == 'all'`, no filter is applied over 'strains'. elif isinstance(snr, str): if snr != 'all': raise ValueError("the value of 'snr' is not valid") else: pass else: raise TypeError("the type of 'snr' is not valid") strains = dictools.flatten_nested_dict(strains) stacked_signals, lengths = dictools.dict_to_stacked_array(strains, target_length=length) if with_metadata: id_list = [k[0] for k in strains] snr_list = [k[1] for k in strains] rep_list = [k[2] for k in strains] metadata = self.metadata.loc[id_list] # sorts and makes all necessary copies. metadata.reset_index(inplace=True, names='id') metadata.insert(1, 'snr', snr_list) # after 'id'. metadata.insert(2, 'rep', rep_list) # after 'snr'. return stacked_signals, lengths, metadata return stacked_signals, lengths
[docs] def get_ytrain_array(self, classes='all', snr='all', with_id=False, with_index=False): """Get the filtered training labels. Parameters ---------- classes : str | list[str] | 'all' Whitelist of classes to include in the labels. All classes are included by default. snr : int | list[int] | str Whitelist of SNR injections to include in the labels. All injections are included by default. with_id : bool If True, return also the related IDs. False by default. with_index : bool If True, return also the related GLOBAL indices w.r.t. the stacked arrays returned by 'get_xtrain_array' WITHOUT filters. False by default. Returns ------- np.ndarray Filtered train labels. np.ndarray, optional IDs associated to the filtered train labels. np.ndarray, optional Indices associated to the filtered train labels. """ return self._filter_labels( self.Ytrain, list(self.Xtrain), classes, snr, with_id=with_id, with_index=with_index )
[docs] def get_ytest_array(self, classes='all', snr='all', with_id=False, with_index=False): """Get the filtered test labels. Parameters ---------- classes : str | list[str] | 'all' Whitelist of classes to include in the labels. All classes are included by default. snr : int | list[int] | str Whitelist of SNR injections to include in the labels. All injections are included by default. with_id : bool If True, return also the related IDs. False by default. with_index : bool If True, return also the related GLOBAL indices w.r.t. the stacked arrays returned by 'get_xtest_array' WITHOUT filters. Returns ------- np.ndarray Filtered test labels. np.ndarray, optional IDs associated to the filtered test labels. np.ndarray, optional Indices associated to the filtered test labels. """ return self._filter_labels( self.Ytest, list(self.Xtest), classes, snr, with_id=with_id, with_index=with_index )
def _filter_labels(self, labels, labels_id, classes, snr, with_id=False, with_index=False): """Filter 'labels' based on 'classes' and 'snr'. This is a helper function for 'get_ytrain_array' and 'get_ytest_array'. Parameters ---------- labels : np.ndarray The array containing the labels. labels_id : list IDs associated to the labels. classes : str | list[str] | 'all' Whitelist of classes to include in the labels. All classes are included by default. snr : int | list[int] | str Whitelist of SNR injections to include in the labels. All injections are included by default. with_id : bool If True, return also the related IDs. False by default. with_index : bool If True, return also the related indices w.r.t. the stacked array returned by '_stack_subset' given the strains related to 'labels' WITHOUT filters. False by default. Returns ------- filtered_labels : np.ndarray Filtered labels. filtered_ids : np.ndarray, optional IDs associated to the filtered labels. filtered_indices : np.ndarray, optional Indices associated to the filtered labels. """ # Get labels and IDs filtered by 'classes'. filtered_labels, filtered_ids, filtered_indices = super()._filter_labels( labels, labels_id, classes, with_id=True, with_index=True ) if isinstance(snr, str): if snr != 'all': raise ValueError("only the str 'all' is allowed for 'snr'.") elif isinstance(snr, int): snr = [snr] elif not isinstance(snr, list): raise TypeError("the type of 'snr' is not valid") n_snr_total = len(self.snr_list) # Repeat all by the total number of SNR values. filtered_labels = np.repeat(filtered_labels, n_snr_total) filtered_ids = np.repeat(filtered_ids, n_snr_total) filtered_indices = np.repeat(filtered_indices, n_snr_total) n_filtered = len(filtered_labels) # Convert the indices to include the TOTAL number of SNR repetitions. for i in range(0, n_filtered, n_snr_total): i_old = filtered_indices[i] i_new0 = i_old * n_snr_total i_new1 = i_new0 + n_snr_total filtered_indices[i:i+n_snr_total] = np.arange(i_new0, i_new1) # Filter out those not present in the 'snr' list. if snr != 'all': mask = np.isin(self.snr_list, snr) mask = np.tile(mask, n_filtered//n_snr_total) filtered_labels = filtered_labels[mask] filtered_ids = filtered_ids[mask] filtered_indices = filtered_indices[mask] # Repeat labels and IDs by 'injections_per_snr', and extend the indices # accordingly. if self.injections_per_snr > 1: n_reps = self.injections_per_snr filtered_labels = np.repeat(filtered_labels, n_reps) filtered_ids = np.repeat(filtered_ids, n_reps) filtered_indices = np.repeat(filtered_indices, n_reps) # Convert the indices to also include the TOTAL number of # repetitions per SNR. for i in range(0, len(filtered_indices), n_reps): i_old = filtered_indices[i] i_new0 = i_old * n_reps i_new1 = i_new0 + n_reps filtered_indices[i:i+n_reps] = np.arange(i_new0, i_new1) if with_id and with_index: return filtered_labels, filtered_ids, filtered_indices if with_id: return filtered_labels, filtered_ids if with_index: return filtered_labels, filtered_indices return filtered_labels
[docs] def stack_by_id(self, id_list: list, length: int = None, snr_included: int | list[int] | str = 'all'): """Stack a subset of strains by ID into a zero-padded 2d-array. This may allow (for example) to group up strains by their original ID without leaking differnet injections (SNR) of the same strain into different splits. Parameters ---------- id_list : array-like The IDs of the strains to be stacked. length : int, optional The target length of the stacked array. If None, the longest signal determines the length. snr_included : int | list[int] | str, optional The SNR injections to include in the stack. If more than one are selected, they are stacked zipped as follows: ``` id0 snr0 id0 snr1 ... ``` All injections are included by default. Returns ------- stacked_signals : np.ndarray The array containing the stacked strains. lengths : list The original lengths of each strain, following the same order as the first axis of 'stacked_signals'. Notes ----- - Unlike in 'get_xtrain_array' and 'get_xtest_array', this method does not filter by 'classes' since it would be redundant, as IDs are unique. Raises ------ ValueError If the value of 'snr' is not valid. """ if not isinstance(id_list, list): raise TypeError("'id_list' must be a list of IDs.") # Collapse the Class layer. strains = {id: ds for sub_strains in self.strains.values() for id, ds in sub_strains.items()} # Filter out those not in the 'id_list'. strains = dictools.filter_nested_dict(strains, lambda k: k in id_list, layer=0) # Filter out those injections whose SNR isnot in the 'snr' list. if isinstance(snr_included, (int, list)): if isinstance(snr_included, int): snr_included = [snr_included] # NOTE: Here SNR is in Layer 1 because we collapsed the Class layer. strains = dictools.filter_nested_dict(strains, lambda k: k in snr_included, layer=1) elif snr_included != 'all': raise ValueError("the value of 'snr' is not valid") strains = dictools.flatten_nested_dict(strains) # keys: "(id, snr)" stacked_signals, lengths = dictools.dict_to_stacked_array(strains, target_length=length) return stacked_signals, lengths
[docs] class SyntheticWaves(Base): """Class for building synthetically generated wavforms and background noise. Part of the datasets for the CLAWDIA main paper. The classes are hardcoded: SG: Sine Gaussian, G: Gaussian, RD: Ring-Down. Attributes ---------- classes : dict Dict of strings and their integer labels, one per class (category). strains : dict {class: {key: gw_strains} } Strains stored as a nested dictionary, with each strain in an independent array to provide more flexibility with data of a wide range of lengths. The class key is the name of the class, a string which must exist in the 'classes' attribute. The 'key' is an identifier of each strain. In this case it's just the global index ranging from 0 to 'self.n_samples'. labels : NDArray[int] Indices of the classes, one per waveform. Each one points its respective waveform inside 'strains' to its class in 'classes'. The order is that of the index of 'self.metadata', and coincides with the order of the strains inside 'self.strains' if unrolled to a flat list of arrays. metadata : pandas.DataFrame All parameters and data related to the strains. The order is the same as inside 'strains' if unrolled to a flat list of strains. train_size : int | float If int, total number of samples to include in the train dataset. If float, fraction of the total samples to include in the train dataset. For more details see 'sklearn.model_selection.train_test_split' with the flag `stratified=True`. units : str Flag indicating whether the data is in 'geometrized' or 'IS' units. Xtrain, Xtest : dict {key: strain} Train and test subsets randomly split using SKLearn train_test_split function with stratified labels. The key corresponds to the strain's index at 'self.metadata'. Ytrain, Ytest : NDArray[int] 1D Array containing the labels in the same order as 'Xtrain' and 'Xtest' respectively. """
[docs] def __init__(self, *, classes: dict, n_waves_per_class: int, wave_parameters_limits: dict, max_length: int, peak_time_max_length: float, amp_threshold: float, tukey_alpha: float, sample_rate: int, random_seed: int = None): """ Parameters ---------- n_waves_per_class : int Number of waves per class to produce. wave_parameters_limits : dict Min/Max limits of the waveforms' parameters, 9 in total. Keys: - mf0, Mf0: min/Max central frequency (SG and RD). - mQ, MQ: min/Max quality factor (SG and RD). - mhrss, Mhrss: min/Max sum squared amplitude of the wave. - mT, MT: min/Max duration (only G). max_length : int Maximum length of the waves. This parameter is used to generate the initial time array with which the waveforms are computed. peak_time_max_length : float Time of the peak of the envelope of the waves in the initial time array (built with 'max_length'). amp_threshold : float Fraction w.r.t. the maximum absolute amplitude of the wave envelope below which to end the wave by shrinking the array and applying a windowing to the edges. tukey_alpha : float Alpha parameter (width) of the Tukey window applied to each wave to make sure their values end at the exact duration determined by either the duration parameter or the amplitude threshold. sample_rate : int random_seed : int, optional. """ self._check_classes_dict(classes) self.classes = classes self.n_waves_per_class = n_waves_per_class self.sample_rate = sample_rate self.wave_parameters_limits = wave_parameters_limits self.max_length = max_length self.peak_time_max_length = peak_time_max_length self.tukey_alpha = tukey_alpha self.amp_threshold = amp_threshold self.random_seed = random_seed self.rng = np.random.default_rng(random_seed) self._gen_metadata() self._track_times = False self._gen_dataset() self.labels = self._gen_labels() self.Xtrain = None self.Xtest = None self.Ytrain = None self.Ytest = None
def _gen_metadata(self): """Generate random metadata associated with each waveform.""" classes_list = [] f0s_list = [] Q_list = [] hrss_list = [] duration_list = [] # Will be modified afterwards to take into account # the amplitude threshold. for clas in self.classes: for _ in range(self.n_waves_per_class): # Need to pass 'self' explicitely since I'm calling the methods # inside a dictionary attribute. Python doesn't seem to # recognise them as the same class methods this way. f0, Q, hrss, duration = self._gen_parameters[clas](self) classes_list.append(clas) f0s_list.append(f0) Q_list.append(Q) hrss_list.append(hrss) duration_list.append(duration) self.metadata = pd.DataFrame({ 'Class': classes_list, # strings 'f0': f0s_list, 'Q': Q_list, 'hrss': hrss_list, 'duration': duration_list }) def _gen_dataset(self): """Generate the dataset from the previously generated metadata. After generating the waveforms with the analytical expressions it shrinks them to the specified duration in the metadata. This is necessary because the analytical expressions are infinite, so we apply a window to get perfect edges. However this does not necessary align with the exact duration provided by the metadata due to the signals being sampled at discrete values. Therefore after the windowing the final duration is computed again and updated in the metadata attribute. Attributes ---------- strains : dict[dict] Creates the strains attribute with the properties stated at the class' docstring. _dict_depth : int Number of nested layers in strains' dictionary. metadata : pd.DataFrame Updates the duration of the waveforms after shrinking them. """ if self.metadata is None: raise AttributeError("'metadata' needs to be generated first!") self.strains = self._init_strains_dict() t_max = (self.max_length - 1) / self.sample_rate times = np.linspace(0, t_max, self.max_length) for id in range(len(self.metadata)): params = self.metadata.loc[id].to_dict() clas = params['Class'] match clas: case 'SG': self.strains[clas][id] = synthetic.sine_gaussian_waveform( times, t0=self.peak_time_max_length, f0=self.metadata.at[id,'f0'], Q=self.metadata.at[id,'Q'], hrss=self.metadata.at[id,'hrss'] ) case 'G': self.strains[clas][id] = synthetic.gaussian_waveform( times, t0=self.peak_time_max_length, hrss=self.metadata.at[id,'hrss'], duration=self.metadata.at[id,'duration'], amp_threshold=self.amp_threshold ) case 'RD': self.strains[clas][id] = synthetic.ring_down_waveform( times, t0=self.peak_time_max_length, f0=self.metadata.at[id,'f0'], Q=self.metadata.at[id,'Q'], hrss=self.metadata.at[id,'hrss'] ) self._dict_depth = dictools.get_depth(self.strains) self._apply_threshold_windowing() def _random_log_uniform(self, min, max): """Returns a random number between [min, max] spaced logarithmically.""" exponent = self.rng.uniform(np.log10(min), np.log10(max)) random = 10**exponent return random def _random_log_int(self, min, max): """Returns a random integer between [min, max] spaced logarithmically.""" return int(self._random_log_uniform(min, max)) def _gen_parameters_sine_gaussian(self): """Generate random parameters for a single Sine Gaussian.""" limits = self.wave_parameters_limits thres = self.amp_threshold f0 = self._random_log_int(limits['mf0'], limits['Mf0']) # Central frequency Q = self._random_log_int(limits['mQ'], limits['MQ']+1) # Quality factor hrss = self._random_log_uniform(limits['mhrss'], limits['Mhrss']) duration = 2 * Q / (np.pi * f0) * np.sqrt(-np.log(thres)) return (f0, Q, hrss, duration) def _gen_parameters_gaussian(self): """Generate random parameters for a single Gaussian.""" lims = self.wave_parameters_limits f0 = None # Casted to np.nan afterwards. Q = None #-/ hrss = self._random_log_uniform(lims['mhrss'], lims['Mhrss']) duration = self._random_log_uniform(lims['mT'], lims['MT']) # Duration return (f0, Q, hrss, duration) def _gen_parameters_ring_down(self): """Generate random parameters for a single Ring-Down.""" lims = self.wave_parameters_limits thres = self.amp_threshold f0 = self._random_log_int(lims['mf0'], lims['Mf0']) # Central frequency Q = self._random_log_int(lims['mQ'], lims['MQ']+1) # Quality factor hrss = self._random_log_uniform(lims['mhrss'], lims['Mhrss']) duration = -np.sqrt(2) * Q / (np.pi * f0) * np.log(thres) return (f0, Q, hrss, duration) _gen_parameters = { 'SG': _gen_parameters_sine_gaussian, 'G': _gen_parameters_gaussian, 'RD': _gen_parameters_ring_down } def _apply_threshold_windowing(self): """Shrink waves in the dataset and update its duration in the metadata. Shrink them according to their pre-computed duration in the metadata to avoid almost-but-not-zero edges, and correct those marginal durations longer than the window. """ for i in range(len(self)): clas = self.metadata.at[i,'Class'] duration = self.metadata.at[i,'duration'] ref_length = int(duration * self.sample_rate) if clas == 'RD': # Ring-Down waves begin at the center. However we want to # emphasize their energetic beginning, therefore we will leave # a symmetric part before their start with zeros. i0 = self.max_length // 2 - ref_length i1 = i0 + 2*ref_length else: # SG and G are both centered. i0 = (self.max_length - ref_length) // 2 i1 = self.max_length - i0 new_lenght = i1 - i0 if i0 < 0: new_lenght += i0 i0 = 0 if i1 > self.max_length: new_lenght -= i1 - self.max_length i1 = self.max_length window = sp.signal.windows.tukey(new_lenght, alpha=self.tukey_alpha) # Shrink and window self.strains[clas][i] = self.strains[clas][i][i0:i1] * window self.metadata.at[i,'duration'] = new_lenght / self.sample_rate
[docs] class InjectedSyntheticWaves(BaseInjected): """TODO """
[docs] def __init__(self, clean_dataset: SyntheticWaves, *, psd: np.ndarray | Callable, detector: str, noise_length: int, freq_cutoff: int | float, freq_butter_order: int | float, random_seed: int): super().__init__( clean_dataset, psd=psd, detector=detector, noise_length=noise_length, freq_cutoff=freq_cutoff, freq_butter_order=freq_butter_order, random_seed=random_seed ) # Initialize the Train/Test subsets inheriting the indices of the input # clean dataset instance. if clean_dataset.Xtrain is not None: self.Xtrain = dictools._replicate_structure_nested_dict(clean_dataset.Xtrain) self.Xtest = dictools._replicate_structure_nested_dict(clean_dataset.Xtest) self.Ytrain = dictools._replicate_structure_nested_dict(clean_dataset.Ytrain) self.Ytest = dictools._replicate_structure_nested_dict(clean_dataset.Ytest) else: self.Xtrain = None self.Xtest = None self.Ytrain = None self.Ytest = None
[docs] class UnlabeledWaves(Base): """Dataset class for clean gravitational wave signals without labels. This class extends `Base`, modifying its behavior to handle datasets where gravitational wave signals are provided without associated labels. Unlike `Base`, it does not require a classification structure but retains methods for loading, storing, and managing waveform data. The dataset consists of nested dictionaries, storing each waveform in an independent array to accommodate variable lengths. Attributes ---------- strains : dict Dictionary of stored waveforms, indexed by unique identifiers. max_length : int Length of the longest waveform in the dataset. sample_rate : int, optional The constant sampling rate for the waveforms, if provided. Xtrain, Xtest : dict, optional Train and test subsets randomly split using `train_test_split`, if required. These are views into `strains`, without associated labels. Notes ----- - Unlike `Base`, this class does not track class labels. - Train/Test split is still supported but is not stratified. """
[docs] def __init__(self, strains_array, strain_limits=None, sample_rate=None, random_seed=None): """Initialize an UnlabeledWaves dataset. This constructor processes a NumPy array of gravitational wave signals, storing them in a structured dictionary while optionally discarding unnecessary zero-padding. Unlike `Base`, this class does not require labeled categories or metadata but retains support for dataset splitting and signal management. Parameters ---------- strains_array : np.ndarray A 2D array containing gravitational wave signals, where each row represents a separate waveform, possibly zero-padded. strain_limits : list[tuple[int, int]] | None, optional A list of (start, end) indices defining the valid range for each waveform in `strains_array`. If None, waveforms are assumed to contain no unnecessary padding. sample_rate : int, optional The assumed constant sampling rate for the waveforms. If None, time tracking is disabled. random_seed : int, optional Seed for random operations such as dataset splitting, ensuring reproducibility. Notes ----- - A dummy class label ('unique': 1) is assigned for compatibility. - Metadata is omitted in this class. - The dataset structure supports train/test splitting, but labels are not relevant. """ self.classes = {'unique': 1} # Dummy class. self.strains = self._unpack_strains(strains_array, strain_limits) self.labels = self._gen_labels() # Dummy labels. # self.metadata: pd.DataFrame = None # OMMITED IN THIS CLASS # Number of nested layers in strains' dictionary. Keep updated always: self._dict_depth: int = dictools.get_depth(self.strains) self.max_length = self._find_max_length() self.random_seed = random_seed # SKlearn train_test_split doesn't accept a Generator yet. self._track_times = False # If True, self.times must be not None. # Whitening related attributes. self.whitened = False self.whiten_params = {} self.nonwhiten_strains = None # Time tracking related attributes. self.sample_rate = sample_rate self.times: dict = None # Train/Test subset splits (views into the same 'self.strains'). # Timeseries: self.Xtrain: np.ndarray = None self.Xtest: np.ndarray = None # Labels: self.Ytrain: np.ndarray = None self.Ytest: np.ndarray = None # Indices (sorted as in train and test splits respectively): self.id_train: np.ndarray = None self.id_test: np.ndarray = None
def _unpack_strains(self, strain_array: np.ndarray, strain_limits: np.ndarray = None) -> dict: num_signals = strain_array.shape[0] if strain_limits is None: extracted_signals = {i: strain_array[i, :] for i in range(num_signals)} elif strain_limits.shape == (2,): start, end = strain_limits extracted_signals = {i: strain_array[i, start:end] for i in range(num_signals)} elif strain_limits.shape == (num_signals, 2): extracted_signals = {i: strain_array[i, start:end] for i, (start, end) in enumerate(strain_limits)} else: raise ValueError("Invalid shape for strain_limits. Must be None, (2,), or (N,2).") # Add the outer (class) layer: class_name = next(iter(self.classes)) # Get the first class name return {class_name: extracted_signals}
[docs] class CoReWaves(Base): """Manage all operations needed to perform over a noiseless CoRe dataset. Initial strains and metadata are obtained from a CoReManager instance. NOTE: This class treats as different classes (categories) each equation of state (EOS) present in the CoReManager instance. NOTE^2: This class adds a time attribute with time samples related to each GW. Workflow: - Load the strains from a CoreWaEasy instance, discarding or cropping those indicated with their respective arguments. - Resample. - Project onto the ET detector arms. - Change units and scale from geometrized to IS and vice versa. - Export the (latest version of) dataset to a HDF5. - Export the (latest version of) dataset to a GWF. Attributes ---------- classes : dict Dict of strings and their integer labels, one per class (category). The keys are the name of the Equation of State (EOS) used to describe the physics behind the simulation which produced each strain. strains : dict {class: {id: gw_strain} } Strains stored as a nested dictionary, with each strain in an independent array to provide more flexibility with data of a wide range of lengths. The class key is the name of the class, a string which must exist in the 'classes' list. The 'id' is an unique identifier for each strain, and must exist in the `self.metadata.index` column of the metadata DataFrame. Initially, an extra depth layer is defined to store the polarizations of the CoRe GW simulated data. After the projection this layer will be collapsed to a single strain. times : dict {class: {id: gw_time_points} } Time samples associated with the strains, following the same structure. Useful when the sampling rate is variable or different between strains. metadata : pandas.DataFrame All parameters and data related to the strains. The order is the same as inside 'strains' if unrolled to a flat list of strains up to the second depth level (the id.). Example: ``` metadata[eos][key] = { 'id': str, 'mass': float, 'mass_ratio': float, 'eccentricity': float, 'mass_starA': float, 'mass_starB': float, 'spin_starA': float, 'spin_starB': float } ``` units : str Flag indicating whether the data is in 'geometrized' or 'IS' units. sample_rate : int, optional Initially this attribute is None because the initial GW from CoRe are sampled at different and non-constant sampling rates. After the resampling, this attribute will be set to the new global sampling rate. Caveat: If the 'times' attribute is present, this value is ignored. Otherwise it is assumed all strains are constantly sampled to this. """
[docs] def __init__(self, *, coredb: ioo.CoReManager, classes: dict[str], discarded: set, cropped: dict, # Source: distance: float, inclination: float, phi: float): """Initialize a CoReWaves dataset. TODO Parameters ---------- coredb : ioo.CoReManager Instance of CoReManager with the actual data. classes : dict[str] Dictionary with the Equation of State (class) name as key and the corresponding label index as value. discarded : set[str] Set of GW IDs to discard from the dataset. cropped : dict[str] Dictionary with the class name as key and the corresponding cropping range as value. The range is given as a tuple of the form (start_index, stop_index). distance : float Distance to the source in Mpc. inclination : float Inclination of the source in radians. phi : float Azimuthal angle of the source in radians. """ if not isinstance(coredb, ioo.CoReManager): raise TypeError("Expected 'coredb' to be an instance of CoReManager.") self._check_classes_dict(classes) self.classes = classes self.discarded = discarded self.cropped = cropped # Source parameters self.distance = distance self.inclination = inclination self.phi = phi self.units = 'IS' self.strains, self.times, self.metadata = self._get_strain_and_metadata(coredb) self._track_times = True self._dict_depth = dictools.get_depth(self.strains) self.labels = self._gen_labels() self.max_length = self._find_max_length() self.sample_rate = None # Set up after resampling self.random_seed = None # Set if calling the 'build_train_test_subsets' method. self.whitened = False self.whiten_params = {} self.nonwhiten_strains = None # Train/Test subset splits (views into the same 'self.strains'). # Timeseries: self.Xtrain: np.ndarray = None self.Xtest: np.ndarray = None # Labels: self.Ytrain: np.ndarray = None self.Ytest: np.ndarray = None
def _get_strain_and_metadata(self, coredb: ioo.CoReManager) -> tuple[dict, dict, pd.DataFrame]: """Obtain the strain and metadata from a CoReManager instance. The strains are the Pluss and Cross polarizations obtained from the direct output of numerical relativistic simulations. They are expected to be projected at the detector afterwards, collapsing the polarization layer to a single strain per GW. Returns ------- strains : dict{eos: {id: {pol: strain} } } times : dict{'eos': {'id': {pol: time_samples}} } Time samples associated to each GW. Since it has to follow the same nested structure as 'strains', but the time samples are the same among polarizations, for each GW both polarizations point to the same array in memory. metadata : pandas.DataFrame All parameters and data related to the strains. The order is the same as inside 'strains' if unrolled to a flat list of strains up to the second depth level (the id.). """ strains = self._init_strains_dict() times = self._init_strains_dict() # Metadata columns/keys: index: list[str] = [] mass: list[float] = [] mass_ratio: list[float] = [] eccentricity: list[float] = [] lambda_tidal: list[float] = [] # Tidal deformability mass_starA: list[float] = [] mass_starB: list[float] = [] spin_starA: list[float] = [] spin_starB: list[float] = [] merger_pos: list[int] = [] # Index position of the merger inside the array. for eos in self.classes: # Get and filter out GW simulations. ids = set(coredb.filter_by('id_eos', eos).index) try: ids -= self.discarded[eos] except KeyError: pass # No discards. ids = sorted(ids) # IMPORTANT!!! Keep order to be able to trace back simulations. for id_ in ids: # CoRe Rh data (in IS units): times_, h_plus, h_cros = coredb.gen_strain( id_, self.distance, self.inclination, self.phi ) # Crop those indicated at the parameter file, and leave whole # the rest. try: t0, t1 = self.cropped[eos][id_] except KeyError: crop = slice(None) else: crop = slice( np.argmin(np.abs(times_-t0)), np.argmin(np.abs(times_-t1)) ) strains[eos][id_] = { 'plus': h_plus[crop], 'cross': h_cros[crop] } # Both polarizations have the same sampling times, hence we # point each time polarization to the same array in memory. times[eos][id_] = {} times[eos][id_]['plus'] = times[eos][id_]['cross'] = times_[crop] # The time is centered at the merger. i_merger = tat.find_time_origin(times_[crop]) # Associated metadata: md = coredb.metadata.loc[id_] index.append(md['database_key']) mass.append(md['id_mass']) mass_ratio.append(md['id_mass_ratio']) eccentricity.append(md['id_eccentricity']) lambda_tidal.append(md['id_Lambda']) mass_starA.append(md['id_mass_starA']) mass_starB.append(md['id_mass_starB']) spin_starA.append(md['id_spin_starA']) spin_starB.append(md['id_spin_starB']) merger_pos.append(i_merger) metadata = pd.DataFrame( data=dict( mass=mass, mass_ratio=mass_ratio, eccentricity=eccentricity, lambda_tidal=lambda_tidal, mass_starA=mass_starA, mass_starB=mass_starB, spin_starA=spin_starA, spin_starB=spin_starB, merger_pos=merger_pos ), index=index ) return strains, times, metadata
[docs] def find_merger(self, strain: np.ndarray) -> int: return tat.find_merger(strain)
def _update_merger_positions(self): """Update all 'merger_pos' tags inside the metadata attribute. Time arrays are defined with the origin at the merger. When the length of the strain arrays is modified, the index position of the merger must be updated. NOTE: This method updates ALL the merger positions. """ for clas, id_ in self.keys(max_depth=2): times = self.times[clas][id_] # If more layers are present, only get the first instance of times # since all will be the same. if isinstance(times, dict): times = dictools.get_next_item(times) self.metadata.at[id_,'merger_pos'] = tat.find_time_origin(times)
[docs] def resample(self, sample_rate, verbose=False) -> None: """Resample strain and time arrays to a constant rate. Resample CoRe strains (from NR simulations) to a constant rate. This method updates the sample_rate, the max_length and the merger_pos inside the metadata attribute. Parameters ---------- sample_rate : int The new sampling rate in Hz. verbose : bool If True, print information about the resampling. """ super().resample(sample_rate, verbose) # Update side-effect attributes. self._update_merger_positions() if self.Xtrain is not None: self._update_train_test_subsets()
[docs] def project(self, *, detector: str, ra: float, dec: float, geo_time: float, psi: float): """Project strains into the chosen detector at specified coordinates. Project strains into the chosen detector at specified coordinates, using Bilby. This collapses the polarization layer in 'strains' and 'times' to a single strain. The times are rebuilt taking as a reference point the merger (t = 0). Parameters ---------- detector : str Name of the ET arm in Bilby for InterferometerList(). ra, dec : float Sky position in equatorial coordinates. geo_time : int | float Time of injection in GPS. psi : float Polarization angle. Caveats ------- - The detector's name must exist in Bilby's InterferometerList(). - Only one arm can be chosen. """ project_pars = dict(ra=ra, dec=dec, geocent_time=geo_time, psi=psi) for clas, id_ in self.keys(max_depth=2): hp = self.strains[clas][id_]['plus'] hc = self.strains[clas][id_]['cross'] # Drop the polarization layer. strain = detectors.project( hp, hc, parameters=project_pars, sf=self.sample_rate, nfft=2*self.sample_rate, detector=detector ) self.strains[clas][id_] = strain # Regenerate the time array with the merger located at the origin. duration = len(strain) / self.sample_rate t_merger = self.find_merger(strain) / self.sample_rate t0 = -t_merger t1 = duration - t_merger self.times[clas][id_] = tat.gen_time_array(t0, t1, self.sample_rate) # Update side-effect attributes self._dict_depth = dictools.get_depth(self.strains) self._update_merger_positions() self.max_length = self._find_max_length() if self.Xtrain is not None: self._update_train_test_subsets()
[docs] def shrink_to_merger(self, offset: int = 0) -> None: """Shrink strains and time arrays w.r.t. the merger. Shrink strains (and their associated time arrays) discarding the left side of the merger (inspiral), with a given offset in samples. This also updates the metadata column 'merger_pos'. NOTE: This is an irreversible action. Parameters ---------- offset : int Offset in samples relative to the merger position. """ limits = {} for clas, id, *keys in self.keys(): i_merger = self.metadata.at[id, 'merger_pos'] # Same shrinking limits for all possible strains below ID layer. limits[id] = (i_merger+offset, -1) self.shrink_strains(limits) # Update side-effect attributes. self._update_merger_positions() if self.Xtrain is not None: self._update_train_test_subsets()
[docs] def convert_to_IS_units(self) -> None: """Convert data from scaled geometrized units to IS units. Convert strains and times from geometrized units (scaled to the mass of the system and the source distance) to IS units. Will raise an error if the data is already in IS units. """ if self.units == 'IS': raise RuntimeError("data already in IS units") for keys in self.keys(): id_ = keys[1] mass = self.metadata.at[id_,'mass'] strain = self.get_strain(*keys) times = self.get_times(*keys) strain *= mass * MSUN_MET / (self.distance * MPC_MET) times *= mass * MSUN_SEC self.units = 'IS' # Update side-effect attributes. if self.Xtrain is not None: self._update_train_test_subsets()
[docs] def convert_to_scaled_geometrized_units(self) -> None: """Convert data from IS to scaled geometrized units. Convert strains and times from IS to geometrized units, and scaled to the mass of the system and the source distance. Will raise an error if the data is already in geometrized units. """ if self.units == 'geometrized': raise RuntimeError("data already in geometrized units") for keys in self.keys(): id_ = keys[1] mass = self.metadata.at[id_,'mass'] strain = self.get_strain(*keys) times = self.get_times(*keys) strain /= mass * MSUN_MET / (self.distance * MPC_MET) times /= mass * MSUN_SEC self.units = 'geometrized' # Update side-effect attributes. if self.Xtrain is not None: self._update_train_test_subsets()
[docs] class InjectedCoReWaves(BaseInjected): """Manage injections of GW data from CoRe dataset. - Tracks index position of the merger. - Computes the SNR only at the ring-down starting from the merger. - Computes also the usual SNR over the whole signal and stores it for later reference (attr. 'whole_snr_list'). Attributes ---------- snr_list : list Partial SNR values at which each signal is injected. This SNR is computed ONLY over the Ring-Down section of the waveform starting from the merger, hence the name 'partial SNR'. whole_snr : dict Nested dictionary storing for each injection the equivalent SNR value computed over the whole signal, hence the name 'whole SNR'. Structure: {id_: {partial_snr: whole_snr}} TODO """
[docs] def __init__(self, clean_dataset: Base, *, psd: np.ndarray | Callable, detector: str, noise_length: int, whiten_params: dict, freq_cutoff: int | float, freq_butter_order: int | float, random_seed: int): """ Initializes an instance of the InjectedCoReWaves class. Parameters ---------- clean_dataset : Base An instance of a BaseDataset class with noiseless signals. psd : np.ndarray | Callable Power Spectral Density of the detector's sensitivity in the range of frequencies of interest. Can be given as a callable function whose argument is expected to be an array of frequencies, or as a 2d-array with shape (2, psd_length) so that ``` psd[0] = frequency_samples psd[1] = psd_samples. ``` NOTE: It is also used to compute the 'asd' attribute (ASD). detector : str GW detector name. noise_length : int Length of the background noise array to be generated for later use. It should be at least longer than the longest signal expected to be injected. whiten_params : dict Parameters to be passed to the 'whiten' method of the 'BaseInjected' class. freq_cutoff : int | float Frequency cutoff for the filter applied to the signal. freq_butter_order : int | float Order of the Butterworth filter applied to the signal. random_seed : int Random seed for generating random numbers. """ super().__init__( clean_dataset, psd=psd, detector=detector, noise_length=noise_length, whiten_params=whiten_params, freq_cutoff=freq_cutoff, freq_butter_order=freq_butter_order, random_seed=random_seed ) self.whole_snr = {id_: {} for id_ in self.labels}
def _update_merger_positions(self): """Update all 'merger_pos' tags inside the metadata attribute. Time arrays are defined with the origin at the merger. When the length of the strain arrays is modified, the index position of the merger must be updated. NOTE: This method updates ALL the merger positions. """ for clas, id_ in self.keys(max_depth=2): # Same time array for all SNR variations. times = dictools.get_next_item(self.times[clas][id_]) self.metadata.at[id_,'merger_pos'] = tat.find_time_origin(times)
[docs] def gen_injections(self, snr: int|float|list, pad: int = 0, randomize_noise: bool = False, random_seed: int = None, injections_per_snr: int = 1, verbose=False): """Inject all strains in simulated noise with the given SNR values. See 'BaseInjected.gen_injections' for more details. Parameters ---------- snr : int | float | list pad : int Number of zeros to pad the signal at both ends before the injection. randomize_noise : bool If True, the noise segment is randomly chosen before the injection. This can be used to avoid having the same noise injected for all clean strains. False by default. NOTE: To avoid the possibility of repeating the same noise section in different injections, the noise realization must be reasonably large, e.g: `noise_length > n_clean_strains * self.max_length * len(snr)` random_seed : int, optional Random seed for the noise realization. Only used when randomize_noise is True. injections_per_snr : int Number of injections per SNR value. 1 by default. Notes ----- - If whitening is intended to be applied afterwards it is useful to pad the signal in order to avoid the window vignetting produced by the whitening itself. This pad will be cropped afterwards. - New injections are stored in the 'strains' atrribute, with the pad associated to all the injections performed at once. Even when whitening is also performed right after the injections. Raises ------ ValueError Once injections have been performed at a certain SNR value, there cannot be injected again at the same value. Trying it will trigger this exception. """ super().gen_injections( snr, pad=pad, randomize_noise=randomize_noise, random_seed=random_seed, injections_per_snr=injections_per_snr, verbose=verbose ) self._update_merger_positions()
def _inject(self, strain: np.ndarray, snr: int | float, *, id: str, snr_offset: int, pos: int = 0) -> np.ndarray: """Inject a strain at 'snr' into noise using 'self.noise' instance. Parameters ---------- strain : NDArray Signal to be injected into noise. snr : int | float Targeted signal-to-noise ratio. id : str Signal identifier (2nd layer of 'strains' dict). snr_offset : int Offset (w.r.t. the merger) added to the start of the range for computing the SNR. pos : int, optional Index position in the noise array where to inject the signal. 0 by default. Returns ------- injected : NDArray Injected signal. NOTES ----- - The SNR is computed over the Post-Merger only. - The metadata is expected to reflect the original state of the strains previous to any padding performed right before calling this function, which may be done to avoid the vignette effect. """ clas = self.find_class(id) merger_pos = self.metadata.at[id,'merger_pos'] original_length = len(self.strains_clean[clas][id]) i0 = merger_pos + snr_offset i1 = (original_length - merger_pos) + snr_offset injected, scale = self.noise.inject(strain, snr=snr, snr_lim=(i0, i1), pos=pos) # Compute the equivalent SNR over the entire waveform. self.whole_snr[id][snr] = self.noise.snr(strain*scale) return injected
[docs] def whiten(self, verbose=False): """Whiten injected strains. Calling this method performs the whitening of all injected strains. Strains are later cut to their original size before adding the pad, to remove the vigneting. NOTE: This is an irreversible action; if the original injections need to be preserved it is advised to make a copy of the instance before performing the whitening. """ super().whiten(verbose=verbose) self._update_merger_positions()