Source code for gwadama.ioo

"""Input & Output Operations

Everything related to input data retrieval and output files.

"""
from pathlib import Path
from shutil import rmtree

import h5py
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

from .units import *


# Note: 'watpy' is an optional dependency and is imported lazily inside CoReManager.
# This allows users to use CLAWDIA without requiring 'watpy' unless they need CoReManager.




class CoReManager:
    """Easily manage common tasks with Watpy's CoRe_db instances.

    Ad-hoc manager to automate common tasks in Watpy, including metadata
    and strain downloads, and spatial projection.

    ATTRIBUTES
    ----------
    cdb : watpy.coredb.coredb.CoRe_db
        Instance of CoRe_db from which everything is managed.

    db_path : Path
        Folder where the CoRe database is or will be stored.
        Refer to 'watpy.coredb.coredb.CoRe_db' for more details.

    eos : set
        All EOS found available.

    metadata : DataFrame
        Metadata from all simulations in 'cdb' collected in a single DF.

    downloaded : dict
        3-level dictionary containing the path to existing strains (saved
        as TXT files), their eccentricity and radius of extraction.
        Tree format:
            txt_files[simkey][run] = {
                'file': 'path/to/file.txt',
                'eccentricity': ecc,
                'r_extraction': rext
            }
        NOTE: ONLY KEEPS 1 RUN PER SIMULATION FOR NOW.

    """
    fields_float = [
        'id_mass', 'id_rest_mass', 'id_mass_ratio', 'id_ADM_mass',
        'id_ADM_angularmomentum', 'id_gw_frequency_Hz',
        'id_gw_frequency_Momega22', 'id_kappa2T', 'id_Lambda',
        'id_eccentricity', 'id_mass_starA', 'id_rest_mass_starA',
        'id_mass_starB', 'id_rest_mass_starB'
    ]
    header_gw_txt = (
        "u/M:0 Reh/M:1 Imh/M:2 Redh/M:3 Imdh/M:4 Momega:5 A/M:6 phi:7 t:8"
    )

    def __init__(self, db_path):
        """Initialize the manager.

        PARAMETERS
        ----------
        db_path : str
            Folder where the CoRe database is or will be stored.
            Refer to 'watpy.coredb.coredb.CoRe_db' for more details.

        """
        try:
            # Lazy import; importing the submodule directly ensures it is
            # loaded even if watpy's top-level package does not expose it.
            from watpy.coredb.coredb import CoRe_db
        except ImportError:
            raise ImportError(
                "The 'watpy' package is required for CoReManager but is not installed."
            )
        self.db_path = Path(db_path)
        self.cdb = CoRe_db(db_path)
        self.metadata = self._gen_metadata_dataframe()
        self.eos = set(self.metadata['id_eos'])
        self.downloaded = self._look_for_existing_strains()

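    # A minimal usage sketch (the database path is hypothetical; assumes
    # 'watpy' is installed and the CoRe servers are reachable):
    #
    #     >>> cdb = CoReManager('/path/to/CoRe_db')
    #     >>> sorted(cdb.eos)    # EOS found in the metadata
    #     >>> len(cdb)           # number of indexed simulations
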
    def __repr__(self):
        return str(self.metadata)

    def __getitem__(self, i):
        return self.metadata[i]

    def __len__(self):
        return len(self.metadata)

    def show(self, key, to_float=False, to_file=None):
        """Proxy for the 'show' method of the underlying CoRe index database."""
        # Forward the arguments instead of hard-coding the defaults.
        return self.cdb.idb.show(key, to_float=to_float, to_file=to_file)

    def filter_by(self, key, value):
        """Return the metadata rows whose 'key' column equals 'value'."""
        return self.metadata[self.metadata[key] == value]

    def filter_multiple(self, filters):
        """Apply successive [key, value] filters to the metadata DataFrame."""
        md = self.metadata.copy()
        for k, v in filters:
            md = md[md[k] == v]
        return md

    def count_runs(self, filters=[]):
        """Count the total number of runs in the database.

        Parameters
        ----------
        filters : list of lists
            Format: [[key0, value0], [key1, value1], ...]

        """
        md = self.filter_multiple(filters)
        counts = 0
        for ind in md.index:
            runs = md.loc[ind].available_runs
            if runs is not None:
                counts += len(runs.split(', '))
        return counts

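    # Filtering sketch (column names and values are illustrative; any
    # metadata column can be used as a key):
    #
    #     >>> cdb.filter_by('id_eos', 'SLy')                  # single filter
    #     >>> cdb.filter_multiple([['id_eos', 'SLy'],
    #     ...                      ['id_mass_ratio', 1.0]])   # chained filters
    #     >>> cdb.count_runs(filters=[['id_eos', 'SLy']])     # runs, not sims
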
    def download_mode22(self, simkeys, keep_h5=False, overwrite=False,
                        prot='https', lfs=False, verbose=True):
        """Download ONLY the optimum strains Rh_22.

        Downloads each simulation, keeps the strain with the lowest
        eccentricity and highest extraction radius 'r' in a TXT file,
        updates the database 'self.downloaded', and (optionally) removes
        the original HDF5 file from CoRe to free up space.

        Parameters
        ----------
        simkeys : list
            List of simulation keys ('db_keys' in watpy) to download.

        keep_h5 : bool
            If False (default), removes the HDF5 file downloaded by watpy.

        overwrite : bool
            If False (default) and a simulation in 'simkeys' is already
            present in 'self.downloaded', skip it. Otherwise download
            everything again.

        verbose : bool
            If True (default), print which simulations are downloaded and
            which are skipped.

        prot, lfs :
            Refer to 'watpy.coredb.coredb.CoRe_db.sync'.

        """
        for skey in tqdm(simkeys):
            if (not overwrite) and (skey in self.downloaded):
                if verbose:
                    print(f"{skey} already downloaded, skipping.")
                continue
            self.cdb.sync(dbkeys=[skey], prot=prot, lfs=lfs, verbose=False)

            # Get the gw data with the lowest eccentricity and at the
            # highest extraction radius 'r'.
            runkey, ecc = self.get_runkey_lowest_eccentricity(skey)
            run = self.cdb.sim[skey].run[runkey]
            data = run.data.read_dset()['rh_22']
            rext_key, rext = self._get_highest_r_extraction_(data)
            gw_data = data[rext_key]

            # Save gw data as TXT.
            ofile = Path(run.path) / f'Rh_l2_m2_r{rext:05d}.txt'
            np.savetxt(ofile, gw_data, header=self.header_gw_txt)

            # Update download database.
            self.downloaded[skey] = {}  # ONLY 1 RUN PER SIM FOR NOW.
            self.downloaded[skey][runkey] = {
                'file': ofile,
                'eccentricity': ecc,
                'r_extraction': rext
            }
            if not keep_h5:
                self._clean_h5_data(skey)
            if verbose:
                print(f"{skey} downloaded.")

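    # Download sketch (the simulation keys below are hypothetical; see the
    # CoRe database for real 'db_keys'):
    #
    #     >>> cdb.download_mode22(['THC:0001', 'THC:0002'], keep_h5=False)
    #     >>> cdb.downloaded['THC:0001']   # {'R01': {'file': ..., ...}}
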
    def load_sim(self, skey):
        """Load a previously downloaded gw simulation."""
        meta_sim = self.downloaded[skey]
        file = next(iter(meta_sim.values()))['file']
        gw_data = np.loadtxt(file)
        return gw_data

    @staticmethod
    def sw_Y22(i, phi):
        """Spin-weighted spherical harmonic mode lm = 22.

        Ref: Ajith et al., 2011.

        Parameters
        ----------
        i : float
            Inclination angle from the z-axis.

        phi : float
            Phase angle.

        """
        return np.sqrt(5/(64*np.pi)) * (1 + np.cos(i))**2 * np.exp(2j*phi)

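    # Face-on sanity check, following the expression above: for i = phi = 0,
    # (1 + cos(i))**2 = 4, so sw_Y22(0, 0) = 4*sqrt(5/(64*pi))
    # = sqrt(5/(4*pi)) ~ 0.6308, purely real.
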
    def gen_strain(self, skey, distance, inclination, phi):
        """Build strain from time-domain mode 22 in mass rescaled, geom. units.

        Parameters
        ----------
        skey : str
            Key (database_key) of the simulation.

        distance : float
            Distance to the source in Mpc.

        inclination, phi : float
            Angle positions.

        Returns
        -------
        time : ndarray(float)
            Time points in seconds.

        hplus, hcross : ndarray(float)
            Polarizations of the rescaled strain.

        """
        gw_data = self.load_sim(skey)
        mass = self.metadata.id_mass.loc[skey]
        u_M = gw_data[:, 0]
        Rh = gw_data[:, 1] - 1j*gw_data[:, 2]

        # Convert time.
        time = u_M * mass * MSUN_SEC

        # Generate strain polarizations. Note the argument order of
        # 'sw_Y22' is (i, phi).
        sY22 = self.sw_Y22(inclination, phi)
        amplitude_prefactor = mass * MSUN_MET / (distance * MPC_MET)
        h = amplitude_prefactor * Rh * sY22
        hplus = h.real
        hcross = -h.imag

        return time, hplus, hcross

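    # Projection sketch (source parameters are illustrative):
    #
    #     >>> t, hp, hc = cdb.gen_strain('THC:0001', distance=100,  # Mpc
    #     ...                            inclination=0, phi=0)
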
    def get_runkey_lowest_eccentricity(self, skey):
        """Find the run with the lowest eccentricity for a given simulation.

        Return the key of the run, and the value of its eccentricity, for
        which this parameter is the lowest among all runs of the 'skey'
        simulation.

        If a simulation has multiple runs with the same eccentricity
        (typically all values set to 0 or NaN) it will pick the first run
        in the list. If one or more runs have eccentricity = NaN, the
        first one found is returned.

        Parameters
        ----------
        skey : str
            Key (database_key) of the simulation.

        Returns
        -------
        run_key : str
            Key of the run.

        ecc : float
            Eccentricity of the run.

        """
        runs = self.cdb.sim[skey].run.copy()
        run_key = 'R01'
        ecc = self.cast_to_float(runs.pop('R01').md.data['id_eccentricity'])
        if np.isnan(ecc):
            return run_key, ecc

        # Look for the minimum eccentricity, or the first one to be NaN.
        for rkey, run in runs.items():
            ecc_i = self.cast_to_float(run.md.data['id_eccentricity'])
            if ecc_i < ecc:
                run_key = rkey
                ecc = ecc_i
            elif np.isnan(ecc_i):
                run_key = rkey
                ecc = ecc_i
                break

        return run_key, ecc

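    # Behaviour sketch with hypothetical per-run eccentricities:
    #     R01=0.01, R02=0.005, R03=NaN  ->  returns ('R03', nan),
    # since the first NaN found after 'R01' stops the search immediately.
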
    @staticmethod
    def cast_to_float(string):
        """Cast a string to float, considering '' also a NaN."""
        if string in ['', None]:
            n = np.nan
        else:
            n = float(string)
        return n

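    # Doctest-style examples:
    #     >>> CoReManager.cast_to_float('0.01')
    #     0.01
    #     >>> CoReManager.cast_to_float('')    # empty strings are NaN
    #     nan
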
    def _clean_h5_data(self, skey):
        """Remove HDF5 files from a downloaded 'skey' simulation.

        Parameters
        ----------
        skey : str
            Simulation key.

        """
        root = Path(self.cdb.sim[skey].path)
        # Remove HDF5 files.
        files = root.glob('*/*.h5')
        for file in files:
            file.unlink()
        # Remove the .git folder.
        folder = root / '.git'
        rmtree(folder)

    def _gen_metadata_dataframe(self):
        """Collect the metadata of all simulations into a single DataFrame."""
        idb = self.cdb.idb
        key_list = idb.dbkeys
        metalist = [core_md.data for core_md in idb.index]
        md = pd.DataFrame(metalist, index=key_list)
        # Convert the data types of the selected columns.
        for field in self.fields_float:
            mask = (md[field] == 'NAN') | (md[field] == '')
            md[field].values[mask] = np.nan
            md[field] = md[field].astype(float)
        return md

    def _get_highest_r_extraction_(self, extractions):
        """Return the key and value of 'r' of the gw with the highest 'r'.

        It also handles the case when the value of 'r' in the data is Inf
        instead of a number.

        Parameters
        ----------
        extractions : dict
            Channel 'rh_22' returned by CoRe_h5.read_dset().

        """
        rext_key = max(extractions.keys())
        if 'Inf' in rext_key:
            rext = 99999  # Highest 5-figure number, to represent infinity.
        else:
            rext = int(rext_key[-9:-4])
        return rext_key, rext

    def _look_for_existing_strains(self):
        """Strains that were already extracted from CoRe's HDF5 files.

        Save their paths, alongside the eccentricity and radius of
        extraction, in a dictionary tree indexed by simulation key and run.

        Returns
        -------
        txt_files : dict
            3-level dictionary containing the path to existing strains
            (extracted as TXT files), their eccentricity and radius of
            extraction.
            Tree format:
                txt_files[simkey][run] = {
                    'file': 'path/to/file.txt',
                    'eccentricity': ecc,
                    'r_extraction': rext
                }

        """
        txt_files = {}
        for file in self.db_path.rglob('Rh*.txt'):
            key = file.parts[-3].replace('_', ':')
            run = file.parts[-2]
            ecc = self._read_eccentricity(file.parent/'metadata.txt')
            rext = int(file.stem[-5:])
            if key not in txt_files:
                txt_files[key] = {}
            # If there are multiple files of the same sim and run, only
            # keep the highest extraction point available.
            elif run in txt_files[key]:
                r0 = int(txt_files[key][run]['file'].stem[-5:])
                if rext < r0:
                    continue
            txt_files[key][run] = {
                'file': file,
                'eccentricity': ecc,
                'r_extraction': rext
            }
        return txt_files

    def _read_eccentricity(self, file):
        """Get the value of eccentricity from a metadata file."""
        with open(file) as f:
            for line in f:
                if 'id_eccentricity' in line:
                    break
            else:
                raise EOFError("no value of eccentricity found in the file")
        ecc_txt = line.split()[-1]
        try:
            ecc = float(ecc_txt)
        except ValueError:
            ecc = np.nan
        return ecc


def save_to_hdf5(file: str, *, data: dict, metadata: dict) -> None:
    """Save a nested dictionary of NumPy arrays to an HDF5 file with metadata.

    PARAMETERS
    ----------
    file : str
        Path to the HDF5 file.

    data : dict
        Nested dictionary containing NumPy arrays.

    metadata : dict
        Nested dictionary containing metadata for the arrays. It must
        mirror the structure of 'data'.

    EXAMPLE
    -------
    ```
    data = {
        'group1': {
            'array1': np.array([1, 2, 3]),
            'array2': np.array([4, 5, 6])
        },
        'group2': {
            'array3': np.array([7, 8, 9]),
            'array4': np.array([10, 11, 12])
        }
    }
    metadata = {
        'group1': {
            'array1': {'description': 'Example data'},
            'array2': {'description': 'Another example'}
        },
        'group2': {
            'array3': {'unit': 'm/s^2'},
            'array4': {'unit': 'kg'}
        }
    }
    save_to_hdf5('output.h5', data=data, metadata=metadata)
    ```

    """
    with h5py.File(file, 'w') as hf:
        _save_to_hdf5_recursive(h5_group=hf, data=data, metadata=metadata)


def _save_to_hdf5_recursive(*, h5_group: h5py.Group, data: dict, metadata: dict) -> None:
    """Save a nested dictionary of NumPy arrays to an HDF5 group with metadata.

    The metadata dictionary must be nested in the same way as the data
    dictionary.

    PARAMETERS
    ----------
    h5_group : h5py.Group
        H5PY group into which the data and metadata are stored.

    data : dict
        Nested dictionary containing NumPy arrays.

    metadata : dict
        Nested dictionary containing metadata for the arrays.

    """
    for key, value in data.items():
        if isinstance(value, dict):
            # Recurse into a new subgroup.
            subgroup = h5_group.create_group(key)
            _save_to_hdf5_recursive(
                h5_group=subgroup, data=value, metadata=metadata[key]
            )
        else:
            # Leaf: store the array and attach its metadata as attributes.
            dataset = h5_group.create_dataset(key, data=value)
            for metadata_key, metadata_value in metadata[key].items():
                dataset.attrs[metadata_key] = metadata_value


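# Round-trip sketch for 'save_to_hdf5' (file name and keys taken from the
# docstring example above; reading back uses plain h5py):
#
#     >>> with h5py.File('output.h5', 'r') as hf:
#     ...     arr = hf['group1/array1'][()]
#     ...     desc = hf['group1/array1'].attrs['description']

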
def save_experiment_results(file: str, *, parameters: dict, results: dict,
                            sep=';', start_index: int = 0,
                            return_dataframe=False):
    """Save (or update) experiment results to a CSV file using pandas.DataFrame.

    Parameters and results are saved as columns of a pandas DataFrame, in
    a new row of an existing CSV file, or in the first row if the file
    does not exist. Each row represents an experiment.

    Parameters' keys may change between experiments; when adding a new key
    or omitting one, the corresponding cell in the CSV file is filled with
    NaN. Results are inserted at the beginning of the DataFrame's columns,
    and their keys must not change between experiments.

    If there is a complex parameter structure, it is recommended to
    provide a flattened version with new and simpler keys.

    Parameters
    ----------
    file : str
        Path to the CSV file. If the file already exists, loads it,
        appends the experiment data to the end of the DataFrame, and
        updates the file. If the file does not exist, creates it.

    parameters : dict
        Experiment settings and hyper-parameters. They must be given as a
        flat dictionary.

    results : dict
        Experiment results, typically the overall LOSS statistics.
        For example:
        ```
        results = {
            'loss_max': 0.3,
            'loss_mean': 0.1,
            'loss_median': 0.08,
            'loss_min': 0.001
        }
        ```
        They will be inserted at the beginning of the DataFrame's columns.

    sep : str
        Column separator in the CSV file. ';' by default.

    start_index : int
        Index of the first row in the experiment (and the CSV file) when
        creating a new file. If the file already exists, this is ignored.
        0 by default.

    return_dataframe : bool
        If True, return the DataFrame instance. False by default.

    Returns
    -------
    df : pandas.DataFrame, optional
        The pandas DataFrame instance, only returned if 'return_dataframe'
        is True.

    """
    data = {**results, **parameters}
    try:
        df = pd.read_csv(file, sep=sep, index_col=0)
    except FileNotFoundError:
        df = pd.DataFrame([], columns=data.keys())
        df.index.name = 'Experiment'
        i_exp = start_index
    else:
        # Make sure the input results keys are the same as those in the file.
        columns_old = list(df.columns)
        results_keys = list(results.keys())
        i_sep = columns_old.index(results_keys[-1]) + 1
        if set(results_keys) != set(columns_old[:i_sep]):
            raise ValueError(
                "Experiment results keys must not change between experiments."
            )
        i_exp = df.index[-1] + 1

    for k, v in data.items():
        df.at[i_exp, k] = v
    df.to_csv(file, sep=sep)

    if return_dataframe:
        return df
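

# Usage sketch for 'save_experiment_results' (keys and values are
# illustrative; the 'results' keys must stay fixed across calls):
#
#     >>> save_experiment_results(
#     ...     'experiments.csv',
#     ...     parameters={'lr': 1e-3, 'batch_size': 32},
#     ...     results={'loss_mean': 0.1, 'loss_max': 0.3}
#     ... )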