Source code for clawdia.lib
"""Collection of auxiliary functions.
This module provides a temporary collection of various utility functions,
including mathematical operations, signal normalization, optimization routines,
and signal patching utilities.
Notes
-----
As the CLAWDIA pipeline grows, these functions will be organized into more
semantically appropriate modules.
"""
import warnings
import numpy as np
from numpy.typing import NDArray
# Default values same as Scipy's `optimize._zeros_py.py` module.
# Ref: https://github.com/scipy/scipy/blob/v1.15.0/scipy/optimize/_zeros_py.py
_xtol = 2e-12
_rtol = 4 * np.finfo(float).eps
[docs]
class BoundaryError(ValueError):
"""Exception raised when a boundary condition is violated.
This exception is a subclass of `ValueError` and is used to signal
issues with input values exceeding or falling below defined boundaries.
"""
pass
[docs]
def abs_normalize(array, axis=-1):
"""TODO
Normalitza inplace un array ignorant els errors de divissió entre 0 i
canviant els nan a 0.
"""
with np.errstate(divide='ignore', invalid='ignore'):
array /= np.max(np.abs(array), axis=axis, keepdims=True)
np.nan_to_num(array, copy=False)
[docs]
def l2_normalize(array, axis=-1):
"""TODO
Normalitza inplace un array amb la norma L2 ignorant els errors de divissió
entre 0 i canviant els nan a 0.
"""
with np.errstate(divide='ignore', invalid='ignore'):
array /= np.linalg.norm(array, axis=axis, keepdims=True)
np.nan_to_num(array, copy=False)
[docs]
def semibool_bisect(f, a, b, args=(), xtol=_xtol, rtol=_rtol, maxiter=100, verbose=False):
"""Semi-boolean bisection method for solving ``f(x)``.
Find the boundary point ``x0`` in the interval [a, b] using a modified
bisection method such that:
- ``f(x) > 0 for x <= x0``
- ``f(x) == 0 for x > x0``
or vice versa. One of the two endpoints of the interval [a, b] must
satisfy ``f(x) = 0``.
The method iteratively narrows down the interval [a, b] until the
solution is found to within the specified tolerances `xtol` and `rtol`, or
the maximum number of iterations is reached.
This algorithm is based on the `scipy.optimize.bisect` method, but includes
modifications for this specific boundary behavior.
Parameters
----------
f : callable
A continuous function of a single variable, ``f(x)``. The function must
accept a single positional argument and any additional arguments via
`args`.
a : float
Lower bound of the interval [a, b].
b : float
Upper bound of the interval [a, b].
args : tuple, optional
Additional arguments to pass to the function `f`.
xtol : float, optional
Absolute tolerance for the solution. Default is ``2e-12``.
rtol : float, optional
Relative tolerance for the solution. Default is `_rtol`.
maxiter : int, optional
Maximum number of iterations to perform. Default is 100.
verbose : bool, optional
If `True`, prints the progress of each iteration, including the
evaluation of ``f(x)`` at the midpoint. Default is `False`.
Returns
-------
solver_stats : dict
A dictionary containing information about the solution:
- `x` : float
The last point where ``f(x) != 0``. Approximates the boundary point
`x0`.
- `f` : float
The value of ``f(x)`` at the solution.
- `converged` : bool
Indicates whether the algorithm converged to a solution.
- `niters` : int
The number of iterations performed.
- `funcalls` : int
The total number of function evaluations.
Raises
------
BoundaryError
If the initial interval [a, b] does not satisfy the condition that
one endpoint evaluates to ``f(x) = 0``.
ValueError
If the function is not continuous or if `a` and `b` are not proper
bounds.
Notes
-----
- The `xtol` parameter controls the absolute precision of the solution,
while the `rtol` parameter ensures relative precision with respect to the
value of `x`.
- This method assumes that ``f(x)`` contains exactly one boundary point
within [a, b].
Examples
--------
>>> def f(x):
... return 0 if x < 2 else x
>>> result = semibool_bisect(f, 0, 4)
>>> print(result['x'])
2.0
>>> print(result['converged'])
True
"""
fa = f(a, *args)
fb = f(b, *args)
solver_stats = {'funcalls': 2}
if (fa*fb != 0) or (fa == fb == 0):
raise BoundaryError("There isn't a boundary point in the 0 zone")
if fa == 0:
a, b = b, a
fa, fb = fb, fa
dm = b - a
for i in range(maxiter):
dm *= 0.5
xm = a + dm
if verbose:
print(f" iteration {i}, evaluating f({xm}) ...")
fm = f(xm, *args)
solver_stats['funcalls'] += 1
if fm != 0:
a = xm
fa = fm
if abs(dm) < xtol + rtol*abs(xm):
solver_stats['converged'] = True
solver_stats['niters'] = i+1
solver_stats['x'] = a # Last point where f(x) != 0
solver_stats['f'] = fa
return solver_stats
# Not converged
solver_stats['converged'] = False
solver_stats['niters'] = i+1
solver_stats['x'] = a
solver_stats['f'] = fa
return solver_stats
[docs]
def extract_patches(
signals, *, patch_size, n_patches=None, random_state=None,
step=1, limits=None, patch_min=1, l2_normed=False,
return_norm_coefs=False, allow_allzeros=True
) -> tuple[NDArray, NDArray] | NDArray:
"""Extract patches from 'signals'.
TODO
Note that if randomly selected, it is not prevented to repeat patches.
PARAMETERS
----------
signals: ndarray
If 2d-array, must be in C-contiguous order with shape
(n_signals, l_signal).
patch_size: int
Length of the patches to extract.
limits: ndarray, optional
Limit(s) in 'signals' such that:
```
i0, i1 = limits[i_signal]
signal = signals[i_signal, i0:i1]
```
patch_min: int, optional
When 'limits' are given, patch_min is the minimum samples inside the
limits to be included in the extracted patches.
Defaults to 1.
n_patches: int, optional
Total number of patches to extract. If None (default) extract the
maximum amount.
random_state: {None, int, array_like[ints], SeedSequence, BitGenerator, Generator}, optional
Given to 'numpy.random.PCG64(random_state)'.
step: int, optional
Minimum windowing step (separation between patches).
l2_normed: bool, optional
If True will norm each patch to its L2 norm. False by default.
return_norm_coefs: bool, optional
If True, return also the coefficients used to normalize the signals
(useful for when 'signals' come from a windowed signal that will be
reassembled afterwards).
False by default.
allow_allzeros: bool, optional
When extracting random patches, if False and `l2_normed == True`,
generate another random window position until the l2 norm is != 0.
"""
if signals.ndim > 2:
raise ValueError("'signals' must be 2d-array at most")
if limits is not None:
if limits.shape[1] != 2:
raise ValueError(
f"'limits' has a wrong shape: {limits.shape}"
)
if patch_min > np.min(np.diff(limits, axis=1)):
raise ValueError(
"there is at least one signal according to its 'limits' shorter"
" than 'patch_min'"
)
if not allow_allzeros and not signals.any():
raise ValueError(
"'allow_allzeros' is False, but 'signals' contains only zeros. "
"Random patch extraction would result in an infinite loop."
)
if signals.ndim == 1:
signals = signals[np.newaxis,:]
# Compute the maximum patches per signal that can be obtained with the
# given 'step' ignoring the limits.
rng = np.random.Generator(np.random.PCG64(random_state))
n_signals, l_signals = signals.shape
max_pps = (l_signals - patch_size) / step + 1
if not max_pps.is_integer() and limits is None and n_signals == 1:
warnings.warn(
"'signals' cannot be fully divided into patches, the last"
f" {(max_pps-1)*step % step:.0f} bins of each signal will be left out",
RuntimeWarning
)
max_pps = int(max_pps)
# Compute the maximum TOTAL number of patches and the limits from where to
# extract patches for each signal.
if limits is None:
window_limits = [(0, l_signals-patch_size+1)] * n_signals
max_patches = max_pps * n_signals
else:
window_limits = []
max_patches = 0
for p0, p1 in limits:
p0 += patch_min - patch_size
p1 -= patch_min
if p0 < 0:
p0 = 0
if p1 + patch_size >= l_signals:
p1 = l_signals - patch_size
window_limits.append((p0, p1))
max_patches += int(np.ceil((p1-p0)/step))
if n_patches is None:
n_patches = max_patches
elif n_patches > max_patches:
raise ValueError(
f"the keyword argument 'n_patches' ({n_patches}) exceeds"
f" the maximum number of patches that can be extracted ({max_patches})."
)
patches = np.empty((n_patches, patch_size), dtype=signals.dtype)
# Extract all possible patches.
if n_patches == max_patches:
if allow_allzeros:
k = 0
for i in range(n_signals):
p0, p1 = window_limits[i]
for j in range(p0, p1, step):
patches[k] = signals[i, j:j+patch_size]
k += 1
else:
# Discard all-zero patches; shrink output accordingly.
collected = []
for i in range(n_signals):
p0, p1 = window_limits[i]
for j in range(p0, p1, step):
patch = signals[i, j:j+patch_size]
if patch.any():
collected.append(patch)
if collected:
patches = np.vstack(collected).astype(signals.dtype, copy=False)
else:
raise ValueError(
"no non-zero patches could be extracted with the given "
"signals, limits and parameters (all patches are all-zero)"
)
n_patches = patches.shape[0]
# Extract a limited number of patches randomly selected.
else:
for k in range(n_patches):
i = rng.integers(0, n_signals)
j = rng.integers(*window_limits[i])
signal_aux = signals[i, j:j+patch_size]
if not allow_allzeros:
while not signal_aux.any():
j = rng.integers(*window_limits[i])
signal_aux = signals[i, j:j+patch_size]
patches[k] = signal_aux
# Normalize each patch to its L2 norm
if l2_normed:
coefs = np.linalg.norm(patches, axis=1, keepdims=True)
# Ignore x/0 and 0/0 cases
with np.errstate(divide='ignore', invalid='ignore'):
patches /= coefs
patches = np.nan_to_num(patches)
coefs = np.ravel(coefs)
elif return_norm_coefs:
coefs = np.ones(n_patches)
return (patches, coefs) if return_norm_coefs else patches
[docs]
def reconstruct_from_patches_1d(patches, step):
n_patches, l_patches = patches.shape
total_len = (n_patches - 1) * step + l_patches
reconstructed = np.zeros(total_len, dtype=patches.dtype)
normalizer = np.zeros_like(reconstructed)
for i in range(n_patches):
reconstructed[i*step:i*step+l_patches] += patches[i]
normalizer[i*step:i*step+l_patches] += 1
normalizer[i*step+l_patches:] = 1
reconstructed /= normalizer
return reconstructed
FINISHED = "FINISHED " + f"\n\n{''.join(chr(x) for x in (9995,128524,128076))}"