Source code for hsr4hci.training

"""
Methods for training half-sibling regression models.
"""

# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------

from typing import Any, Dict, Optional, Tuple, Union

from astropy.units import Quantity
from sklearn.preprocessing import StandardScaler
from tqdm.auto import tqdm

import numpy as np

from hsr4hci.base_models import BaseModelCreator
from hsr4hci.forward_modeling import get_time_series_for_position
from hsr4hci.general import fast_corrcoef
from hsr4hci.masking import (
    get_predictor_pixel_selection_mask,
    get_positions_from_mask,
    get_partial_roi_mask,
)

from hsr4hci.splitting import AlternatingSplit
from hsr4hci.typehinting import RegressorModel
from hsr4hci.utils import check_consistent_size


# -----------------------------------------------------------------------------
# FUNCTION DEFINITIONS
# -----------------------------------------------------------------------------

[docs]def get_signal_times(n_frames: int, n_signal_times: int) -> np.ndarray: """ Generate a temporal grid of signal times. This function is just a wrapper around :func:`numpy.linspace` and exists mainly to ensure consistency everywhere. Args: n_frames: The total number of frames in the stack. n_signal_times: The number of positions on the temporal grid that we create. Returns: A 1D numpy array of shape `(n_signal_times, )` containing the temporal grid (i.e., signal times) as integers. """ # Generate `n_signal_times` different possible points in time (distributed # uniformly over the observation) at which the planet signal could be return np.linspace(0, n_frames - 1, n_signal_times).astype(int)
[docs]def add_obscon_as_predictors( predictors: np.ndarray, obscon_array: np.ndarray, expected_signal: np.ndarray, max_correlation: float = 0.5, ) -> np.ndarray: """ Merge the ``predictors`` and the ``obscon_array`` and take into account the desired maximum correlation with the expected signal. Args: predictors: A 2D numpy array with shape `(n_frames, n_pixels)` that contains the *predictors* for a target pixel. obscon_array: A 2D numpy array with shape `(n_frames, n_obscon)` that contains the (global) observing conditions. expected_signal: A 1D numpy array with shape `(n_frames,)` that contains the expected signal for a target pixel. In case we are training a default model, the ``expected_signal`` will be all `NaN`. max_correlation: Maximum value for the correlation between the expected signal and the observing conditions. OCs with a higher correlation will not be added as predictors. Returns: A 2D numpy array with shape `(n_frames, n_predictors)` that contains the (full) predictors: both pixels and admissible OC. This means: `n_predictors <= n_pixels + n_obscon`. """ # Initialize the output output = [predictors] # If the expected_signal is all-NaN, we are training a default model. In # this case, we do not need to worry about the correlation between the # observing conditions and the expected signal (because there is none). # Also, if all values of `expected_signal` are the same, we cannot compute # the correlation, because the variance of `expected_signal` is 0. (This # should only happen at extremely small separations.) if np.isnan(expected_signal).any() or len(np.unique(expected_signal)) == 1: return np.hstack((predictors, obscon_array)) # Otherwise (i.e., for signal fitting / masking), we loop over all # observing conditions, compute their correlation with the expected # signal, and only add them to the output if the correlation does not # exceed the given maximum: for obscon in obscon_array.T: correlation = fast_corrcoef( obscon.squeeze(), expected_signal.squeeze() ) if np.abs(correlation) <= max_correlation: output.append(obscon.reshape(-1, 1)) return np.hstack(output)
[docs]def train_all_models( roi_mask: np.ndarray, stack: np.ndarray, parang: np.ndarray, obscon_array: np.ndarray, selection_mask_config: Dict[str, Any], base_model_creator: BaseModelCreator, psf_template: np.ndarray, train_mode: str, max_oc_correlation: float = 0.5, n_train_splits: int = 3, n_signal_times: int = 30, n_roi_splits: int = 1, roi_split: int = 0, return_format: str = 'full', ) -> Dict[str, Union[np.ndarray, Dict[str, np.ndarray]]]: """ Loop over all positions selected by the ``roi_mask`` (or rather, the subset given by ``roi_split`` and ``n_roi_splits``), train a model for each pixel (using :func:`train_model_for_position()`) and each potential signal time from the temporal grid, and return the results formatted according to the requested ``return_format``. Args: roi_mask: A 2D numpy array of shape `(x_size, y_size)` that contains a binary mask to select the region of interest, that is, the pixels for which to train noise models. stack: A 3D numpy array of shape `(n_frames, x_size, y_size)` containing the data on which to train the noise models. parang: A 1D numpy array of shape `(n_frames, )` containing the corresponding parallactic angle for each frame. obscon_array: A 2D numpy array of shape `(n_frames, n_features)` containing the observing conditions that should be used as additional predictors. selection_mask_config: A dictionary containing two keys (namely `"radius_position"` and `"radius_opposite"`) that define the mask that is used to select the predictor pixels. The values of the dict should be tuples of the form `(value, "unit")`. base_model_creator: An instance of ``BaseModelCreator`` that can be used to instantiate new base models. psf_template: A 2D numpy array containing the unsaturated PSF template. train_mode: The mode to use for training; must be one of the following: `"default"` (for the vanilla HSR model), `"signal_fitting"` or `"signal_masking"`. max_oc_correlation: Maximum value for the correlation between the ``expected_signal`` and an observing conditions (OC) for this OC to be used as a predictor. (Basically, we do not want to use OC that are "accidentally" strongly correlated with a potential signal.) n_train_splits: The number of training / test splits to use. n_signal_times: The size of the temporal grid, that is, the number of different (temporal) signal positions that are assumed for each pixel. n_roi_splits: The (total) number of splits into which the ROI should be divided. roi_split: The index of the split for which to return the mask. return_format: The format in which the residuals are returned. If `"full"`, the residuals are 3D arrays that have the same size as the ``stack``. If `"partial"`, the residuals are 2D arrays that have the shape `(n_frames, n_pixels_in_split)`. The latter is recommended when training in parallel, because otherwise we waste a *lot* of storage for storing `NaN` in the intermediate result files. Returns: A dictionary containing three keys 1. `"stack_shape"`: the shape of the original stack; required when merging partial result files. 2. `"roi_mask"`: the *partial* ROI mask that was used for training; also required when merging partial result files. 3. `"residuals"`: a dictionary with keys `"default"`, `"0"`, ..., `"N"`, where the latter are the signal times for which we have trained models. Each key maps onto a numpy array containing the residuals for the respective model. """ # Run some basic sanity checks check_consistent_size(stack, parang, axis=0) if train_mode not in ('default', 'signal_fitting', 'signal_masking'): raise ValueError('Illegal value for train_mode!') if return_format not in ('full', 'partial'): raise ValueError('Illegal value for return_format!') if not (isinstance(n_roi_splits, int) and n_roi_splits > 0): raise ValueError('n_roi_splits must be a positive integer!') if not (isinstance(roi_split, int) and 0 <= roi_split < n_roi_splits): raise ValueError('roi_split must be an integer in [0, n_roi_splits)!') # Define shortcuts n_frames = stack.shape[0] signal_times = get_signal_times(n_frames, n_signal_times) # Get the partial ROI mask (that selects the subset of the ROI defined by # `roi_split` and `n_roi_splits`) and count the number of models to train partial_roi_mask = get_partial_roi_mask(roi_mask, roi_split, n_roi_splits) n_models = int((n_signal_times + 1) * np.sum(partial_roi_mask)) # Set up a progress bar to keep track of the training progress with tqdm(ncols=80, total=n_models) as progressbar: # Initialize dictionary in which we will collect *all* residuals residuals: Dict[str, np.ndarray] = {} # Loop over both the default model and the temporal grid, train the # respective models, compute the residuals, and store them for key in ['default'] + list(signal_times): # Initialize temporary array for residuals for the current key key_residuals = np.full_like(stack, np.nan) # Define train mode and signal time train_mode_ = 'default' if key == 'default' else train_mode signal_time = None if key == 'default' else int(key) # Loop over the pixels in the ROI split and train a model for each for x, y in get_positions_from_mask(partial_roi_mask): residual, _ = train_model_for_position( stack=stack, parang=parang, obscon_array=obscon_array, position=(x, y), train_mode=train_mode_, signal_time=signal_time, selection_mask_config=selection_mask_config, psf_template=psf_template, n_train_splits=n_train_splits, base_model_creator=base_model_creator, max_oc_correlation=max_oc_correlation, ) key_residuals[:, x, y] = residual progressbar.update() # Store the residuals either in the full or partial format if return_format == 'full': residuals[str(key)] = key_residuals elif return_format == 'partial': residuals[str(key)] = key_residuals[:, partial_roi_mask] # Return the residuals together with the stack shape and the mask for # the (subset of) the ROI that we processed here return { 'stack_shape': np.array(stack.shape), 'roi_mask': partial_roi_mask, 'residuals': residuals, }
[docs]def train_model_for_position( stack: np.ndarray, parang: np.ndarray, obscon_array: np.ndarray, position: Tuple[int, int], train_mode: str, signal_time: Optional[int], selection_mask_config: Dict[str, Any], psf_template: np.ndarray, n_train_splits: int, base_model_creator: BaseModelCreator, expected_signal: Optional[np.ndarray] = None, max_oc_correlation: float = 0.5, ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: """ Train a model for a given position (or rather: a set of models, because of the train / test splitting scheme). Args: stack: A 3D numpy array of shape `(n_frames, x_size, y_size)` containing the data on which to train the noise models. parang: A 1D numpy array of shape `(n_frames, )` containing the corresponding parallactic angle for each frame. obscon_array: A 2D numpy array of shape `(n_frames, n_features)` containing the observing conditions that should be used as additional predictors. position: A tuple `(x, y)` of integers containing the position for which to train the model(s). train_mode: The mode to use for training; must be one of the following: `"default"` (for the vanilla HSR model), `"signal_fitting"` or `"signal_masking"`. signal_time: If ``train_mode`` is `"default"`, this should be `None`. Otherwise, this should contain the time at which the planet signal is assumed to peak at the given ``position`` (we need this value to be able to compute the forward model for signal fitting / masking). selection_mask_config: A dictionary containing two keys (namely `"radius_position"` and `"radius_opposite"`) that define the mask that is used to select the predictor pixels. The values of the dict should be tuples of the form `(value, "unit")`. psf_template: A 2D numpy array containing the unsaturated PSF template. n_train_splits: The number of training / test splits to use. base_model_creator: An instance of ``BaseModelCreator`` that can be used to instantiate new base models. expected_signal: If the ``train_mode`` is `"signal_fitting"` or `"signal_masking"`, you can *optionally* also pass the expected signal explicitly to this function to avoid computing it here. This option may be useful when the HSR is used "hypothesis- based" instead of for a blind search, that is, we already have a hypothesis about the planet position from which we can compute the expected signal stack, meaning we do not need to loop over a temporal grid but only train a single model per pixel (either `"default"` or `"signal_fitting"` / `"signal_masking"`). Note that the ``expected_signal`` should be consistent with the given ``signal_time``; otherwise the mask that is used for the pixel predictor selection will be wrong. max_oc_correlation: Maximum value for the correlation between the ``expected_signal`` and an observing conditions (OC) for this OC to be used as a predictor. (Basically, we do not want to use OC that are "accidentally" strongly correlated with a potential signal.) Returns: A 2-tuple consisting of 1. the residual time series for the given `position`, 2. a dictionary containing additional debugging information about the model(s) that we have trained; for example, the values of the coefficients or regularization coefficients. """ # ------------------------------------------------------------------------- # Construct predictor pixel selection mask; select predictor pixels # ------------------------------------------------------------------------- # Define a few useful shortcuts n_frames = stack.shape[0] frame_size = (stack.shape[1], stack.shape[2]) # Construct the selection mask for the pixel predictors. # Note: `get_predictor_pixel_selection_mask()` expects the position to be # in the astropy coordinate convention, but `position` (since it is usually # produced by `get_positions_from_mask()`) is in numpy coordinates; # therefore we need to flip it. selection_mask = get_predictor_pixel_selection_mask( mask_size=frame_size, position=position[::-1], radius_position=Quantity(*selection_mask_config['radius_position']), radius_opposite=Quantity(*selection_mask_config['radius_opposite']), radius_excluded=Quantity(*selection_mask_config['radius_excluded']), ) # Compute the number of predictor *pixels* (since we might still add the # observing conditions, this is not necessarily the number of predictors) n_pred_pixels = int(np.sum(selection_mask)) # Select the full targets and predictors for the current position full_predictors = stack[:, selection_mask] full_targets = stack[:, position[0], position[1]].reshape(-1, 1) # ------------------------------------------------------------------------- # Prepare result variables # ------------------------------------------------------------------------- # Prepare array for predictions full_predictions = np.full(n_frames, np.nan) # Keep track of several model parameters alphas = np.full(n_train_splits, np.nan) pixel_coefs = np.full((n_train_splits, n_pred_pixels), np.nan) planet_coefs = np.full(n_train_splits, np.nan) # ------------------------------------------------------------------------- # Compute the expected signal (if needed) # ------------------------------------------------------------------------- # If we have already received an expected_signal (e.g., because we are # running the HSR in "hypothesis-based mode"), we do not compute it here if expected_signal is None: # Always initialize the expected signal -- for default models, the # expected_signal will simply be all NaN. expected_signal = np.full(n_frames, np.nan) # Only compute it if we are not training a default model. This happens # here, so we don't have to re-compute it in each train / test-split. if train_mode in ('signal_fitting', 'signal_masking'): # Ensure that the signal time is not None if signal_time is None: raise RuntimeError('signal_time must not be None!') # Compute expected signal based on position and signal_time. The # resulting time series is already normalized to a maximum of 1. expected_signal = get_time_series_for_position( position=position, signal_time=signal_time, frame_size=frame_size, parang=parang, psf_template=psf_template, ) # ------------------------------------------------------------------------- # Add observing conditions to the predictors # ------------------------------------------------------------------------- # When adding the observing conditions as predictors, we want to make sure # that none of the OC is too strongly correlated with the expected signal full_predictors = add_obscon_as_predictors( predictors=full_predictors, obscon_array=obscon_array, expected_signal=expected_signal, max_correlation=max_oc_correlation, ) # ------------------------------------------------------------------------- # Train model(s) # ------------------------------------------------------------------------- # To prevent overfitting, we do not simply fit a model to the full data # and then get its prediction (i.e., the fit result). Instead, we split # the data into "training" and "application" (in a cross-validation way) # and learn models on the training data before applying them (to get the # prediction) to the application data. # This ensures that the model cannot simply memorize the "correct" # prediction, which otherwise can happen even for simple models in cases # where we have more predictors than time steps. splitter = AlternatingSplit(n_splits=n_train_splits) for i, (train_idx, apply_idx) in enumerate(splitter.split(full_targets)): # --------------------------------------------------------------------- # Select and prepare (i.e., scale / whiten) the training data # --------------------------------------------------------------------- # Select predictors, targets and sample weights for training train_predictors = full_predictors[train_idx] apply_predictors = full_predictors[apply_idx] train_targets = full_targets[train_idx] # Apply a scaler to the predictors p_scaler = StandardScaler() train_predictors = p_scaler.fit_transform(train_predictors) apply_predictors = p_scaler.transform(apply_predictors) # Apply a predictor to the targets t_scaler = StandardScaler() train_targets = t_scaler.fit_transform(train_targets).ravel() # --------------------------------------------------------------------- # Train the model (which, in the end, should only predict noise!) # --------------------------------------------------------------------- # Either train a vanilla / default model... if train_mode == 'default': model = _train_default_model( base_model_creator=base_model_creator, train_predictors=train_predictors, train_targets=train_targets, ) # ... or a model with signal fitting... elif train_mode == 'signal_fitting': model, planet_coefficient = _train_signal_fitting_model( base_model_creator=base_model_creator, train_predictors=train_predictors, train_targets=train_targets, expected_signal=expected_signal[train_idx], ) planet_coefs[i] = planet_coefficient # ... or a model with signal masking elif train_mode == 'signal_masking': model = _train_signal_masking_model( base_model_creator=base_model_creator, train_predictors=train_predictors, train_targets=train_targets, expected_signal=expected_signal[train_idx], ) else: raise ValueError('Illegal value for train_mode!') # --------------------------------------------------------------------- # Apply the model to the hold-out data to get the predictions # --------------------------------------------------------------------- # If the model is None, training did not succeed for some reason (for # example, the signal masked would have excluded a too large fraction # of the data). In this case, we do not get any predictions. # Otherwise---that if the training succeeded---we can apply the model # to the data that were held out by the split to get the predictions. if model is not None: # Use the model (that we learned on the `train_predictors`) to # get a prediction on the `apply_predictors`, and undo the target # normalization predictions = model.predict(X=apply_predictors).reshape(-1, 1) predictions = t_scaler.inverse_transform(predictions).ravel() # Store the predictions for the current split full_predictions[apply_idx] = predictions # For regularized models: store the regularization strength of # this split (for debugging purposes) if hasattr(model, 'alpha_'): if np.isscalar(model.alpha_): alphas[i] = float(model.alpha_) # type: ignore # For linear models: store pixel coefficients. In the case of a # linear model, this is basically the model (up to the intercept). if hasattr(model, 'coef_'): pixel_coefs[i] = model.coef_[:n_pred_pixels] # ------------------------------------------------------------------------- # Compute residuals and return results # ------------------------------------------------------------------------- # After loop over all train/test splits is complete, compute residuals full_residuals = full_targets.ravel() - full_predictions.ravel() # Finally, return the residuals and the information about the model return ( full_residuals, dict( alphas=alphas, pixel_coefs=pixel_coefs, planet_coefs=planet_coefs, selection_mask=selection_mask, ), )
[docs]def _train_default_model( base_model_creator: BaseModelCreator, train_predictors: np.ndarray, train_targets: np.ndarray, ) -> Optional[RegressorModel]: """ Train a default model (i.e., a "vanilla" HSR model with no signal fitting or masking). .. caution:: This function should not be used directly, but rather through :func:`train_model_for_position` with ``train_mode`` set to `"default"`. Args: base_model_creator: Instance of ``BaseModelCreator`` that can be used to instantiate a new model. train_predictors: A 2D numpy array containing the (normalized) predictors. Shape: `(n_time_steps, n_features)`. train_targets: A 1D numpy array containing the (normalized) targets. Shape `(n_time_steps, )`. Returns: The trained model instance, or None, if the training failed with a :class:`numpy.linalg.LinAlgError`. """ # Instantiate a new model model = base_model_creator.get_model_instance() # Fit the model to the (full) training data try: model.fit(X=train_predictors, y=train_targets) return model except np.linalg.LinAlgError: # pragma: no cover return None
[docs]def _train_signal_fitting_model( base_model_creator: BaseModelCreator, train_predictors: np.ndarray, train_targets: np.ndarray, expected_signal: np.ndarray, ) -> Tuple[Optional[RegressorModel], float]: """ Train a model with signal fitting. .. caution:: This function should not be used directly, but rather through :func:`train_model_for_position` with ``train_mode`` set to `"signal_fitting"`. Args: base_model_creator: Instance of `BaseModelCreator` that can be used to instantiate a new model. train_predictors: A 2D numpy array containing the (normalized) predictors. Shape: `(n_time_steps, n_features)`. train_targets: A 1D numpy array containing the (normalized) targets. Shape: `(n_time_steps, )`. expected_signal: A 1D numpy array containing the expected signal for the hypothesis under which we are training the current model. Shape: `(n_time_steps, )`. Returns: A 2-tuple, consisting of 1. the trained noise model instance (i.e., with the planet coefficient removed), and 2. the value of the planet coefficient that was removed. If the training fails with a :class:`numpy.linalg.LinAlgError`, this function returns `(None, numpy.nan)`. """ # Instantiate a new model model: Any = base_model_creator.get_model_instance() # Check that we have instantiated a linear model: signal fitting is only # possible if we can remove the coefficient that corresponds to the signal # after training. This only works for linear models and neural networks # with a special network architecture; this implementation only supports # the former. if 'linear_model' not in model.__module__: raise RuntimeError('Signal fitting only works with linear models!') # In "signal fitting" mode, we add the expected signal as an additional # predictor to a (linear) model. Usually, we are using regularized models, # such as ridge regression, and choose the regularization strength via # cross-validation. In this case, we do not want the coefficient that # belongs to the planet signal (i.e., the expected signal) to affect the # choice of the regularization strength, or rather, we do not want the # model to choose a "too small" coefficient for the planet signal because # of the regularization. # A simple (but somewhat hacky...) solution is to multiply the expected # signal with a large number, meaning that the corresponding coefficient # can be small (compared to the "noise part" of the model) and will thus # have only negligible influence on the regularization term of the loss # function. # Note: the scaling factor should also not be *too large*, because in this # case you get "RuntimeWarning: invalid value encountered in multiply" for # many pixels, and the residuals are starting to look worse. expected_signal_ = np.copy(expected_signal) expected_signal_ *= 1_000 expected_signal_ = expected_signal_.reshape(-1, 1) # Add the expected signal to the train predictors. We add it as the last # column in the predictors-matrix, meaning that we know that we can access # the corresponding coefficient of the model as `model.coef_[-1]`. # Note also that we do this *after* the "regular" `train_predictors` have # already been normalized / whitened! We do *not* normalize the expected # signal again after the above re-scaling procedure! train_predictors_ = np.column_stack([train_predictors, expected_signal_]) # Fit the model to the (full) training data, including the extra predictor # in form of the expected signal try: model.fit(X=train_predictors_, y=train_targets) except np.linalg.LinAlgError: # pragma: no cover return None, np.nan # Ideally, we would constrain the coefficient of the expected signal (and # ONLY this coefficient) to be non-negative. After all, there is no such # thing as a "negative planet". However, such a constrained model does not # have an analytic solution (unlike "normal" linear models) and can only # be learned used optimization / quadratic programming, which would require # a custom model class (sklearn do not provide linear models where you can # place constraints on individual coefficients) and also increase training # time. # For these reasons, we use the following simple (and, again, somewhat # hacky...) solution: We simply check if the coefficient that belongs to # the expected signal is negative. In this case, we train the model again, # this time WITHOUT the expected signal as a predictor (effectively forcing # the coefficient to 0). # Get the coefficient that belongs to the expected signal, and undo the # scaling that we applied to the expected signal due to the regularization planet_coefficient = float(model.coef_[-1]) * 1_000 # If the planet coefficient is negative, re-train the model *without* the # expected signal as a predictor if planet_coefficient < 0: try: model.fit(X=train_predictors, y=train_targets) except np.linalg.LinAlgError: # pragma: no cover return None, np.nan # If the planet coefficient is NOT negative, we can create a "noise # only"-model by simply dropping the last coefficient from the model else: model.coef_ = model.coef_[:-1] model.n_features_in_ -= 1 # Return the model and the planet coefficient return model, planet_coefficient
[docs]def _train_signal_masking_model( base_model_creator: BaseModelCreator, train_predictors: np.ndarray, train_targets: np.ndarray, expected_signal: np.ndarray, ) -> Optional[RegressorModel]: """ Train a model with signal masking. .. caution:: This function should not be used directly, but rather through :func:`train_model_for_position` with ``train_mode`` set to `"signal_masking"`. Args: base_model_creator: Instance of ``BaseModelCreator`` that can be used to instantiate a new model. train_predictors: A 2D numpy array containing the (normalized) predictors. Shape: `(n_time_steps, n_features)`. train_targets: A 1D numpy array containing the (normalized) targets. Shape: `(n_time_steps, )`. expected_signal: A 1D numpy array containing the expected signal for the hypothesis under which we are training the current model. Shape: `(n_time_steps, )`. Returns: The trained model instance, or None, if the training failed with a :class:`numpy.linalg.LinAlgError`. """ # Threshold the expected signal to find the time steps that we must not use # for training (the threshold value of 0.2 is, of course, a bit arbitrary). # We have to use "<" because we want the mask to be True for all steps that # do NOT contain signal (= *can* be used for training). signal_mask = expected_signal < 0.2 # Check if the signal mask excludes more than a given fraction of the # training data, namely, 33% of the data (again, this threshold is rather # arbitrary). In this case, we cannot / should not train a model. if np.mean(signal_mask) < 0.33: return None # Instantiate a new model model = base_model_creator.get_model_instance() # Fit the model to the training data to which we have to apply the signal # mask in order to ignore all parts that contain too much planet signal try: model.fit( X=train_predictors[signal_mask], y=train_targets[signal_mask], ) return model except np.linalg.LinAlgError: # pragma: no cover return None