Source code for hsr4hci.general

"""
Methods for general purpose usage (e.g., cropping arrays).
"""

# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------

from bisect import bisect_left
from functools import reduce
from typing import Any, Callable, List, Sequence, Tuple, Union, TypeVar

import operator

from scipy.ndimage import shift

import numpy as np


# -----------------------------------------------------------------------------
# TYPE VARS
# -----------------------------------------------------------------------------

T = TypeVar('T', np.ndarray, Sequence)


# -----------------------------------------------------------------------------
# FUNCTION DEFINITIONS
# -----------------------------------------------------------------------------

[docs]def crop_center( array: np.ndarray, size: Tuple[int, ...], ) -> np.ndarray: """ Crop an n-dimensional array to the given size around its center. Example: >>> crop_center(np.array([1, 2, 3, 4, 5]), (3,)) array([2, 3, 4]) Args: array: The numpy array to be cropped. size: A tuple containing the size of the cropped array. To not crop along a specific axis, you can specify the size of that axis as `-1`. Returns: The input array, cropped to the desired size around its center. """ # Ensure that the array shape and the size variable match if not array.ndim == len(size): raise RuntimeError( 'Length of size must match number of dimensions of array!' ) # Loop over the axes of the array to create slices slices = list() for old_len, new_len in zip(array.shape, size): # Compute start and end position for axis: if the new length is # greater the current one, we do not crop. if new_len > old_len: start = None end = None # Otherwise, we crop the same amount on both sides else: start = old_len // 2 - new_len // 2 if new_len != -1 else None end = start + new_len if start is not None else None # Create a slice object for axis slices.append(slice(start, end)) return np.asarray(array[tuple(slices)])
[docs]def prestack_array( array: np.ndarray, stacking_factor: int, stacking_function: Callable = np.mean, axis: int = 0, ) -> np.ndarray: """ Perform "pre-stacking" (= temporal binning) on a given ``array``: The array is split into blocks (each of size ``stacking_factor``) along the given axis, and the given ``stacking_function`` is applied to each block (again along the specified axis). The results for each block are combined and returned, resulting in a numpy array that has the same shape as the input array, except that the specified axis has been reduced by the given ``stacking_factor``. Args: array: A numpy array containing the input array. stacking_factor: An integer defining how many elements of the input go into one block and are combined for the output. stacking_function: The function to be used for combining the blocks. For most cases, this will be :func:`numpy.mean` or :func:`numpy.median`. axis: Axis along which the stacking operation is performed. By default, we stack along the time axis, which by convention is the first axis (`0`). Returns: A version of the input ``stack`` where blocks of the specified size have been merged using the given ``stacking_function``. """ # If stacking factor is 1, we do not need to stack anything if stacking_factor == 1: return array # Otherwise, we compute the number of splits and splitting indices n_splits = np.ceil(array.shape[0] / stacking_factor).astype(int) split_indices = [i * stacking_factor for i in range(1, n_splits)] # Now, split the input array into that many splits, merge each split # according to the given stacking_function, and return the result return np.stack( [ stacking_function(block, axis=axis) for block in np.split(array, split_indices, axis=axis) ], axis=axis, )
[docs]def get_from_nested_dict( nested_dict: dict, location: Sequence, ) -> Any: """ Get a value from a nested dictionary at a given ``location``, described by a sequence of keys. Example: >>> dictionary = {'a': {'b': 42}} >>> get_from_nested_dict(dictionary, ['a', 'b']) 42 Args: nested_dict: A nested dictionary. location: The location within the nested dictionary, described by a sequence (i.e., a list or tuple) of keys used to access the target value. Returns: The value of the ``nested_dict`` at the specified ``location``. """ return reduce(operator.getitem, location, nested_dict)
[docs]def set_in_nested_dict( nested_dict: dict, location: Sequence, value: Any, ) -> None: """ Set a value at a given ``location`` (described by a sequence of keys) in a nested dictionary. Example: >>> dictionary = {'a': {'b': 42}} >>> set_in_nested_dict(dictionary, ['a', 'b'], 23) >>> dictionary {'a': {'b': 23}} Args: nested_dict: A nested dictionary. location: The target location within the nested dictionary, described by a sequence (i.e., a list or tuple) of keys used to access the target value. value: The value to be written to the target location. """ get_from_nested_dict(nested_dict, location[:-1])[location[-1]] = value
[docs]def rotate_position( position: Union[Tuple[float, float], np.ndarray], center: Tuple[float, float], angle: Union[float, np.ndarray], ) -> Union[Tuple[float, float], np.ndarray]: """ Take a ``position`` and rotate it around the given ``center`` for the given ``angle``. Either the ``position`` or the ``angle`` can also be a numpy array (but not both!). Args: position: The initial position as a 2-tuple `(x, y)`, or as a numpy array of shape `(2, n_positions)`. center: The center of the rotation as a 2-tuple `(center_x, center_y)`. angle: The rotation angle *in degrees* (not radian); either as a ``float`` or as a numpy array of shape `(n_angles, )`. Returns: The rotated position, either as a 2-tuple, or as a numpy array of shape `(2, n_positions)`. """ # Make sure that not both `position` and `angle` are arrays if isinstance(position, np.ndarray) and isinstance(angle, np.ndarray): raise ValueError('position and angle cannot both be arrays!') # Convert angle from degree to radian for numpy phi = np.deg2rad(angle) # Compute x- and y-coordinate of rotated position(s) x = ( center[1] + (position[0] - center[1]) * np.cos(phi) - (position[1] - center[0]) * np.sin(phi) ) y = ( center[0] + (position[0] - center[1]) * np.sin(phi) + (position[1] - center[0]) * np.cos(phi) ) # If we called the function on an array, we have to return an array if isinstance(x, np.ndarray): return np.vstack((x, y)) return float(x), float(y)
[docs]def find_closest(sequence: Sequence, value: Any) -> Tuple[int, Any]: """ Given a sorted ``sequence``, find the entry (and its index) in it that is the closest to the given ``value``. Original source: https://stackoverflow.com/a/12141511/4100721 Args: sequence: A sequence (= a ``list``, ``tuple`` or numpy array). value: A numeric value (i.e., usually an ``int`` or ``float``). Returns: A tuple `(index, value)` where `value` is the entry in `sequence` that is the closest to `value`, and `index` is its index: ``sequence[index] == value``. """ pos = bisect_left(sequence, value) if pos == 0: return 0, sequence[0] if pos == len(sequence): return len(sequence) - 1, sequence[-1] before = sequence[pos - 1] after = sequence[pos] if after - value < value - before: return pos, after return pos - 1, before
[docs]def pad_array_to_shape( array: np.ndarray, shape: Tuple[int, ...], **kwargs: Any, ) -> np.ndarray: """ Pad a numpy array to a given target shape. (In a way, this is the complement to :func:`numpy.pad`, which adds a given amount of padding.) By default, zero-padding is used. Args: array: An n-dimensional numpy array. shape: The tuple of integers specifying the target shape to which the ``array`` is padded. The length of this tuple must match exactly the number of dimensions of `array`, i.e., this function will *not* automatically add new axes (use ``array.reshape()`` to add a new dimension first for this). Also, the new `shape` must be greater or equal to the current ``array.shape`` for every axis, i.e., this function cannot be used for negative padding (cropping). kwargs: Additional keyword arguments that are passed to :func:`numpy.pad`; for example ``constant_values`` to determine the value with which the array is padded. Returns: A copy of the given ``array`` that has been padded to the given target ``shape``. """ # Make sure that `array` and `shape` have matching dimensions if not array.ndim == len(shape): raise ValueError('Dimensions of array and shape do not align!') # Compute padding tuples pad_width = [] for i in range(array.ndim): # Compute difference between target size and current size and ensure # that it is non-negative (this function does not handle cropping!) difference = float(shape[i] - array.shape[i]) if difference < 0: raise ValueError( f'Target size {shape[i]} along axis {i} is smaller than ' f'current size {array.shape[i]}!' ) # If the difference is non-negative, compute the left and right padding padding = (int(difference // 2), int(difference // 2 + difference % 2)) pad_width.append(padding) # Finally, call np.pad() with the pad_width values that we have computed return np.asarray( np.pad(array=array, pad_width=pad_width, mode='constant', **kwargs) )
[docs]def crop_or_pad( array: np.ndarray, size: Tuple[int, ...], **kwargs: Any, ) -> np.ndarray: """ Take an ``array`` and a target ``size`` and either crop or pad the ``array`` to match the given size. Args: array: A numpy array. size: A tuple of integers specifying the target size. kwargs: Additional keyword arguments that are passed to :func:`pad_array_to_shape()`. For example, use ``constant_values`` to specify the padding value (default: `0`). Returns: The original ``array``, cropped or padded to match the ``size``. """ # If all array dimensions are larger than the target, we crop the array if all(array.shape[_] >= size[_] for _ in range(array.ndim)): return crop_center(array, size) # If all array dimensions are smaller than the target, we pad the array if all(array.shape[_] <= size[_] for _ in range(array.ndim)): return pad_array_to_shape(array, size, **kwargs) # If some dimensions are larger and some are smaller, we raise an error raise RuntimeError('Mixing of cropping and padding is not supported!')
[docs]def shift_image( image: np.ndarray, offset: Tuple[float, float], interpolation: str = 'bilinear', mode: str = 'constant', ) -> np.ndarray: """ Function to shift a 2D array (i.e., an ``image``) by a given 2D ``offset``. This function is essentially a simplified port of the PynPoint function of the same name: :func:`pynpoint.util.image.shift_image`. Args: image: A 2D numpy array containing the image to be shifted. offset: A tuple of floats `(x_shift, y_shift)` containing the amount (in pixels) how much the `image` should be shifted. interpolation: The interpolation method to be used. Must be one of the following: `"spline"`, `"bilinear"`. Default is "`bilinear"` because it is flux-preserving. mode: The mode parameter determines how the input array is extended beyond its boundaries. See :py:func:`scipy.ndimage.shift` for a full documentation. Returns: The ``image`` shifted by the amount specified in ``offset``. """ # Ensure that the image is really 2D if image.ndim != 2: raise ValueError('Input image must be 2D!') # Call the respective scipy routine with the correct arguments. # NOTE: We flip the order of `offset` here, because shift() operates on # a numpy array, which uses the matrix-like indexing convention, unlike # the rest of the code, which uses "intuitive" coordinates. if interpolation == 'spline': shifted_image = shift(image, offset[::-1], order=5, mode=mode) elif interpolation == 'bilinear': shifted_image = shift(image, offset[::-1], order=1, mode=mode) else: raise ValueError( 'The value of interpolation must be one of the following: ' '"spline", "bilinear"!' ) return np.asarray(shifted_image)
[docs]def flatten_nested_dict(d: dict, parent_key: str = '', sep: str = '_') -> dict: """ Flatten a nested dictionary into a dictionary with only 1 level. Original source: https://stackoverflow.com/a/6027615/4100721 Example: >>> flatten_nested_dict({'a': {'b': 5}, 'c': 2}) {'a_b': 5, 'c': 2}. Args: d: Dictionary to be flattened. parent_key: Key of the parent of ``d``; this is needed because the function calls itself recursively. sep: The separator to use for merging keys. Returns: A flattened version of the input dictionary. """ items: List[Tuple[str, Any]] = [] for k, v in d.items(): new_key = parent_key + sep + k if parent_key else k if isinstance(v, dict): items.extend(flatten_nested_dict(v, new_key, sep=sep).items()) else: items.append((new_key, v)) return dict(items)
[docs]def fast_corrcoef( x: np.ndarray, y: np.ndarray ) -> float: """ A fast(er) way to compute the correlation coefficient between the variables ``x`` and ``y``, based on :func:`numpy.einsum`. Args: x: A numpy array with realizations of the random variable `X`. y: A numpy array with realizations of the random variable `Y`. Returns: The correlation coefficient `Corr(X, Y)`. """ n = x.shape[0] dx = (x - (np.einsum("n->", x) / np.double(n))) dy = (y - (np.einsum("n->", y) / np.double(n))) cov = np.einsum("i,i->", dy, dx) var_x = np.einsum("n,n->", dy, dy) var_y = np.einsum("n,n->", dx, dx) tmp = var_x * var_y return float(cov / np.sqrt(tmp))