"""
Methods for dealing with residuals.
"""
# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------
from itertools import product
from math import fmod
from typing import Dict, List, Tuple, Union
from astropy.modeling import models, fitting
from photutils.centroids import centroid_com
from polarTransform import convertToPolarImage
from skimage.feature import blob_log, match_template
from skimage.filters import gaussian
import h5py
import numpy as np
from hsr4hci.coordinates import get_center
from hsr4hci.data import get_field_rotation
from hsr4hci.general import shift_image, crop_or_pad
from hsr4hci.masking import mask_frame_around_position
# -----------------------------------------------------------------------------
# FUNCTION DEFINITIONS
# -----------------------------------------------------------------------------
[docs]def assemble_residual_stack_from_hypotheses(
hypotheses: np.ndarray,
selection_mask: np.ndarray,
residuals: Union[h5py.File, Dict[str, np.ndarray]],
) -> np.ndarray:
"""
Assemble the residual stack based on the ``selection_mask`` and the
given ``hypotheses``:
For each spatial pixel where the ``selection_mask`` is `True`, use
the residual from the model given by the respective entry in
``hypotheses``; for all other pixels, use the "default" residual.
Args:
hypotheses: A 2D numpy array of shape `(x_size, y_size)`. Each
position contains an integers which represents the signal
time which appears to be the best hypothesis for this pixel
(i.e., "if there ever is a planet in this pixel, it should
be at this time").
selection_mask: A 2D numpy array of shape `(x_size, y_size)`
which contains a mask that determines for which pixels the
default residual is used and for which pixel the residual
based on signal fitting / masking is used.
residuals: The dictionary, or an open HDF file, which contains
the full results from training both the "default" (vanilla)
and the models based on signal fitting / masking.
Returns:
A 3D numpy array (whose shape matches the stack) containing the
"best" residuals based on the given the ``hypotheses``.
"""
# Initialize the result as the default residuals
result = np.array(residuals['default'])
_, x_size, y_size = result.shape
# Loop over all spatial positions and pick the signal masking-based
# residual based on the respective hypothesis for the pixel
for x, y in product(np.arange(x_size), np.arange(y_size)):
# If we do not have a hypothesis, or the selection mask did not select
# this pixel, do nothing (i.e., keep the "default" residual)
if np.isnan(hypotheses[x, y]) or (not selection_mask[x, y]):
continue
# Otherwise (i.e., if the selection mask is True), replace the default
# residual with the one that matches the hypothesis for this pixel
signal_time = str(int(hypotheses[x, y]))
result[:, x, y] = np.array(residuals[signal_time][:, x, y])
return result
def _get_expected_signal(
frame_size: Tuple[int, int],
field_rotation: float,
psf_template: np.ndarray,
grid_size: int,
relative_rho: float = 0.500,
relative_phi: float = 0.275,
) -> np.ndarray:
"""
Auxiliary function to compute the expected signal (or: expected
signature of a planet) in the polar match fraction map.
"""
# Define shortcuts
center = get_center(frame_size)
rho = min(center[0], center[1]) * relative_rho
phi = np.pi * (1 + relative_phi)
# Resize the PSF template
psf_resized = crop_or_pad(psf_template, frame_size)
psf_resized /= np.max(psf_resized)
psf_resized = np.asarray(np.clip(psf_resized, 0.2, None)) - 0.2
psf_resized /= np.max(psf_resized)
# Create the expected signal in Cartesian coordinates
expected_signal_cartesian = np.zeros(frame_size)
alpha = np.deg2rad(field_rotation / 2)
for offset in np.linspace(-alpha, alpha, 2 * int(field_rotation)):
x = rho * np.cos(phi + offset)
y = rho * np.sin(phi + offset)
shifted = shift_image(psf_resized, (x, y))
expected_signal_cartesian += shifted
# Convert the expected signal to polar coordinates
expected_signal_polar, _ = convertToPolarImage(
image=np.nan_to_num(expected_signal_cartesian),
radiusSize=grid_size,
angleSize=grid_size,
initialRadius=0,
finalRadius=min(center[0], center[1]),
)
expected_signal_polar = expected_signal_polar.T
expected_signal_polar /= np.max(expected_signal_polar)
# Make sure that the expected signal is properly centered
polar_center = get_center(expected_signal_polar.shape)
com = centroid_com(expected_signal_polar)
d_rho = polar_center[0] - com[0]
d_phi = polar_center[1] - com[1]
expected_signal_polar = shift_image(expected_signal_polar, (d_rho, d_phi))
return np.asarray(expected_signal_polar)
def _prune_blobs(blobs: List[Tuple[float, float, float]]) -> np.ndarray:
"""
Prune a list of blobs: if there are two blobs at (approximately) the
same separation, we only keep the brighter blob.
Note: This is an extremely simple version of "pruning", and it does
*not* cover all corner cases. However, it seems sufficient for most
cases that will cause problems in practice.
Args:
blobs: A list of 3-tuples describing the blobs we want to prune,
where each tuple has the form `(separation, polar angle,
brightness)`.
Returns:
A 2D numpy array containing only the positions (separation and
polar angle) of the pruned list of blobs.
"""
# Store the pruned blobs that we will return
pruned = []
# Loop over all blobs to check which of them we will keep
for i, blob_1 in enumerate(blobs):
# Unpack the reference blob; this is the blob for which we
# decide if we want to keep it or not
rho_1, phi_1, brightness_1 = blob_1
# Loop (again) over all blobs
for j, blob_2 in enumerate(blobs):
# We do not compare a blob with itself
if i == j:
continue
# Unpack the blob with which we will compare
rho_2, _, brightness_2 = blob_2
# If the blob that we are comparing with is radially close and
# brighter than the reference blob, we break the inner loop
if abs(rho_1 - rho_2) <= 8 and brightness_2 > brightness_1:
break
# Only if the inner for-loop terminated normally, that is, not via the
# break command, do we add the reference blob
else:
pruned.append((rho_1, phi_1))
return np.array(pruned)
def _refit_blob(
frame: np.ndarray,
position: Tuple[float, float],
) -> Tuple[float, float, float]:
"""
Auxiliary function to fit a blob at the given `position` to refine
its position and get its brightness / amplitude for pruning.
"""
# Define the grid for the fit
x = np.arange(frame.shape[0])
y = np.arange(frame.shape[1])
x, y = np.meshgrid(x, y)
# Create a new Gaussian2D object
gaussian_model = models.Gaussian2D(x_mean=position[0], y_mean=position[1])
# Define "search area" by setting minimum and maximum values for the mean
gaussian_model.x_mean.min = position[0] - 2
gaussian_model.x_mean.max = position[0] + 2
gaussian_model.y_mean.min = position[1] - 2
gaussian_model.y_mean.max = position[1] + 2
# Mask the frame (set everything to zero that is too far from position)
masked_frame = mask_frame_around_position(
frame=np.nan_to_num(frame),
position=position,
radius=8,
)
# Fit the model to the data
fit_p = fitting.LevMarLSQFitter()
gaussian_model = fit_p(gaussian_model, x, y, masked_frame)
# Get the final position and amplitude of the Gaussian after the fit
rho = float(gaussian_model.x_mean.value)
phi = float(gaussian_model.y_mean.value)
amplitude = float(gaussian_model.amplitude.value)
return rho, phi, amplitude
[docs]def get_gradient_mask(
grid_size: int,
frame_size: Tuple[int, int],
zero_radius: int = 4,
) -> np.ndarray:
"""
Compute a gradient mask to re-weight the match fraction radially.
Rationale: The number of "affected pixels" that we use to compute
a match fraction scales linearly with the separation from the star.
For pixels very close to the center, only very few pixels contribute
while for pixels at large separations, the MF is computed as the
average of many pixels. To reduce both the number of false positives
at small separations and false negative at large separations, it is
useful to apply a "gradient mask" to the match fraction to re-weigh
the MF values based on the separation from the center.
Args:
grid_size: Frame size of the polar representation.
frame_size: Frame size of the Cartesian representation.
zero_radius: Radius (in pixels) around the center where the
mask is set to zero; typically 1 FWHM. Even if there is a
planet this close to the star, we should not be able to
detect it.
Returns:
A gradient mask that can be used to re-weigh the polar MF map.
"""
# Re-scale the radius between the Cartesian and the polar coordinates
radius = int(2 * zero_radius / min(frame_size) * grid_size)
# Construct the gradient mask
gradient = np.concatenate(
(np.full(radius, 1e-8), np.linspace(1e-8, 1, grid_size - radius))
)
gradient = np.tile(gradient.reshape(-1, 1), reps=(1, grid_size))
return np.asarray(gradient)
[docs]def get_residual_selection_mask(
match_fraction: np.ndarray,
parang: np.ndarray,
psf_template: np.ndarray,
grid_size: int = 128,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
r"""
Based on the ``match_fraction``, determine the ``selection_mask``,
that is, the mask that decides for which pixels we use the default
model and for which we use the model based on signal fitting or
signal masking.
Ideally, it would be sufficient to simply threshold the match
fraction to obtain the selection mask. In practice, however, this
does not always work well.
Therefore, this function uses the following, more complex heuristic:
1. Convert the match fraction from Cartesian to polar
coordinates $(\rho, \phi)$.
2. In polar coordinates, the planet signal is translation
invariant; and even more importantly, we know exactly how
it should look like. We can, therefore, compute a cross-
correlation with the expected signal.
3. In the result of this template matching, we can find peaks,
which correspond to the $(\rho, \phi)$ position of the planet
signals in the match fraction.
4. Based on these peaks, we explicitly construct the selection
mask. These masks are more interpretable and will not contain
any "random" pixels or shapes, unlike the masks that can be
obtained using the simple thresholding approach.
To further improve the result, some additional tricks (e.g., re-
weighting of match fraction using a radial gradient) are used which
have proven useful in preliminary experiments.
Args:
match_fraction: 2D numpy array containing the match fraction.
parang: 1D numpy array containing the parallactic angles.
psf_template: 2D numpy array containing the PSF template.
grid_size: The size (width and height) of the match fraction
when projecting to polar coordinates. This value should
usually be chosen to be larger than the size of the MF in
Cartesian coordinates (i.e, the usual frame size). Larger
values can give better results, but slow things down.
Returns:
A 5-tuple of numpy array, consisting of
1. The final ``selection_mask``, that is, a 2D numpy array
containing a binary mask that can be used to select the
pixels for which the residuals based on signal fitting or
signal masking should be used.
2. The ``polar`` projection, that is, a 2D numpy array of shape
`(grid_size, grid_size)` containing the polar projection
of the match fraction (mostly for debugging purposes).
3. The output of the template matching ("heatmap"), which has
the same shape as (2). This is the cross-correlation of
the polar projection with the expected signal.
4. The template / expected signal, that is, a 2D numpy array of
shape `(grid_size, grid_size)` that contains the approximate
expected planet signal that we would hope to find in the
polar projections.
5. The positions of the (centers) of the planet trace arcs,
as found by the cross-correlation procedure, that is, a 2D
numpy array of shape `(N, 2)` where `N` is the number of
found planet traces. Each tuple consists of the separation
and the polar angle (in radian).
"""
# -------------------------------------------------------------------------
# Preparations
# -------------------------------------------------------------------------
# Prepare the Cartesian match fraction: Make sure all entries are valid
# numbers with values between 0 and 1.
cartesian = np.copy(match_fraction)
cartesian = np.nan_to_num(cartesian)
cartesian = np.clip(cartesian, a_min=0, a_max=1)
# Define shortcuts
x_size, y_size = cartesian.shape
frame_size = (x_size, y_size)
center = get_center((cartesian.shape[0], cartesian.shape[1]))
# Compute field rotation
field_rotation = get_field_rotation(parang=parang)
# -------------------------------------------------------------------------
# Prepare the template of the expected signal for the cross-correlation
# -------------------------------------------------------------------------
# Compute the signal that we expect to see in polar coordinates
expected_signal = _get_expected_signal(
frame_size=frame_size,
field_rotation=field_rotation,
psf_template=psf_template,
grid_size=grid_size,
)
# Add a small negative "glow" around the expected signal. This is needed
# because otherwise, the template matching (which is really just a cross-
# correlation) does not pay enough attention to the signal *shape*.
# Note: The exact parameters here may require some additional fine-tuning.
negative = np.clip(gaussian(expected_signal, sigma=5), 0, 0.05)
expected_signal -= negative
# -------------------------------------------------------------------------
# Project to polar coordinates
# -------------------------------------------------------------------------
# Project the match fraction from Cartesian to polar coordinates
polar, _ = convertToPolarImage(
image=np.nan_to_num(cartesian),
radiusSize=grid_size,
angleSize=grid_size,
initialRadius=0,
finalRadius=min(center[0], center[1]),
initialAngle=-np.pi,
finalAngle=np.pi,
)
polar = polar.T
# Compute and apply gradient mask to re-weight the match fraction based on
# the separation: The number of "affected pixels" that are used to compute
# the match fraction scales linearly with the separation meaning that high
# values at large separations are less likely to be just a fluke.
gradient_mask = get_gradient_mask(
grid_size=grid_size, frame_size=frame_size, zero_radius=4
)
polar *= gradient_mask
# -------------------------------------------------------------------------
# Cross-correlate with expected signal
# -------------------------------------------------------------------------
# Cross-correlate the polar match fraction with the expected signal
matched = match_template(
image=polar,
template=expected_signal,
pad_input=True,
mode='wrap',
)
# noinspection PyTypeChecker
matched = np.array(matched)
# -------------------------------------------------------------------------
# Run blob finder, and prune blobs
# -------------------------------------------------------------------------
# Loop over two offset for the global phase: The choice of the phase for
# the polar representation is arbitrary, and if we are unlucky, we might
# have a planet signal that is "split into two", that, half the signal is
# on the left-hand edge of the polar representation, and the other half is
# on the right-hand edge. Therefore, we run with two different phases.
blobs: List[Tuple[float, float, float]] = []
for global_phase_offset in (0, np.pi):
# ---------------------------------------------------------------------
# Run blob finder and construct signal mask
# ---------------------------------------------------------------------
# Apply the global phase shift to the outputs of the template matching
# and pad it with zeros. The reason for the latter is that the blob
# finder below sometimes gives wrong results if a blob is too close to
# the border of the image, and padding with zeros seems like a simple
# solution for this problem.
ratio = global_phase_offset / (2 * np.pi)
phase_shifted_matched = np.roll(
matched, shift=int(grid_size * ratio), axis=1
)
padded_phase_shifted_matched = crop_or_pad(
phase_shifted_matched, (3 * grid_size, 3 * grid_size)
)
# Apply the blob finding algorithm to the matched filter result
tmp_blobs = blob_log(
image=padded_phase_shifted_matched,
min_sigma=0.1,
max_sigma=grid_size / 4,
overlap=0.0,
)
# Slightly post-process the blobs that we have found:
# (1) Drop all blobs that are too close to the image border (they will
# be found by the other `global_phase_offset` value).
# (2) Replace the last entry (by default the radius of the blob) by the
# brightness of the blob. (Needed for pruning blobs.)
# (3) Convert the first two entries (the coordinates of the blob in the
# polar match fraction map) to values for the radius and separation
# in the original (Cartesian) match fraction map.
for blob in tmp_blobs:
# Unpack blob coordinates
rho, phi, _ = blob
# Undo the zero-padding that we needed for the blob finder
rho -= grid_size
phi -= grid_size
# Ignore blobs that are too close to the border (these will be
# caught by the respective other `global_phase_offset`)
if phi < grid_size / 4 or phi > 3 * grid_size / 4:
continue
# Re-fit the exact blob position after un-doing the gradient mask
# and measure the brightness / amplitude for pruning
phi, rho, brightness = _refit_blob(
frame=phase_shifted_matched / gradient_mask,
position=(phi, rho),
)
# Adjust for coordinate system conventions / scaling
rho *= min(center[0], center[1]) / grid_size
phi = (2 * np.pi * phi / grid_size) + global_phase_offset + np.pi
phi = fmod(phi, 2 * np.pi)
# Store the final position and brightness of the blob
blobs.append((rho, phi, brightness))
# Prune the list of blobs to get the positions: If there are two planet
# candidates at the same separation, we should only keep the brighter one.
# This step is, in principle, optional. However, it helps to reduce false
# positives because we *know* that it is unlikely to observe two planets
# at the same separation.
positions = _prune_blobs(blobs)
# -------------------------------------------------------------------------
# Finally, create the selection mask
# -------------------------------------------------------------------------
# Resize and normalize the PSF template
psf_resized = crop_or_pad(psf_template, frame_size)
psf_resized /= np.max(psf_resized)
psf_resized = np.asarray(np.clip(psf_resized, 0.2, None)) - 0.2
psf_resized /= np.max(psf_resized)
# Initialize the selection mask
selection_mask = np.zeros(frame_size)
# Loop over blobs and create an arc at the position defined by (rho, phi):
# place many shifted copies of the PSF template at the positions of the arc
for rho, phi in positions:
alpha = np.deg2rad(field_rotation / 2)
for offset in np.linspace(-alpha, alpha, 2 * int(field_rotation)):
x = rho * np.cos(phi + offset)
y = rho * np.sin(phi + offset)
shifted = shift_image(psf_resized, (x, y))
selection_mask += shifted
# Finally, multiply the selection mask with the (normalized) original match
# fraction to drop any "bad pixels", and threshold to binarize it to a mask
selection_mask *= np.nan_to_num(match_fraction) / np.nanmax(match_fraction)
selection_mask = np.asarray(selection_mask > 0.1)
return selection_mask, polar, matched, expected_signal, positions