Source code for hsr4hci.observing_conditions

"""
Methods for dealing with observing conditions.
"""

# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------

from datetime import datetime, timezone
from typing import Dict, List, Tuple, Union

import io
import math

from scipy.interpolate import CubicSpline

import numpy as np
import pandas as pd
import requests

from hsr4hci.time_conversion import (
    timestamp_to_date_string,
    timestamp_to_datetime,
)


# -----------------------------------------------------------------------------
# CLASS DEFINITIONS
# -----------------------------------------------------------------------------

[docs]class ObservingConditions: """ This class provides a wrapper around different representations of the observing conditions (dictionary, numpy array and pandas data frame), and provides the option to select a subset of them. Args: observing_conditions: The observing conditions in the form of a dictionary where each key maps onto a 1D numpy array that contains, for example, the wind speed. """
[docs] def __init__(self, observing_conditions: Dict[str, np.ndarray]): # Ensure that no observing condition is name 'all' (this is a protected # keyword that is used to select all available observing conditions) if 'all' in observing_conditions.keys(): raise KeyError('Illegal name: "all"!') # Ensure that all observing conditions are numpy arrays if not all( isinstance(_, np.ndarray) for _ in observing_conditions.values() ): raise ValueError('All observing conditions must be numpy arrays!') # Ensure that all observing conditions have the same length n_time_steps = len(list(observing_conditions.values())[0]) if not all( len(_) == n_time_steps for _ in observing_conditions.values() ): raise ValueError('All arrays must have the same length!') self.observing_conditions = observing_conditions
def _verify_selected_keys( self, selected_keys: Union[List[str], str, None], ) -> bool: """ Check if the ``selected_keys`` constitute a valid subset of the available observing conditions. Return `True` if yes, otherwise raise a ``KeyError``. """ # Check if `selected_keys` contains a valid subset selection if (not selected_keys) or (selected_keys is None): return True if selected_keys == 'all': return True if isinstance(selected_keys, list) and all( _ in self.available_keys for _ in selected_keys ): return True # Otherwise, raise a KeyError raise KeyError( f'{selected_keys} is not a valid value for selected_keys!' ) @property def available_keys(self) -> List[str]: """ Return a sorted list of the available observing conditions. Returns: A sorted list of the available observing conditions. """ return sorted(list(self.observing_conditions.keys())) @property def n_frames(self) -> int: """ Get the number of frames from the observing conditions. Returns: An integer containing the number of frames. """ return len(next(iter(self.observing_conditions.values())))
[docs] def as_dict( self, selected_keys: Union[List[str], str, None] = 'all', ) -> Dict[str, np.ndarray]: """ Return the subset of observing conditions selected by ``selected_keys`` as a dictionary. Args: selected_keys: A valid specification of a subset of observing conditions. Either `None` (to select no OC at all), or a list of keys, or `"all"` (to select all available observing conditions). Returns: A dictionary containing the selected observing conditions. """ # Make sure are selecting a valid subset self._verify_selected_keys(selected_keys) # If we do not select any keys, return an empty dictionary if (not selected_keys) or (selected_keys is None): return {} # Resolve 'all' into a list of all available observing conditions if selected_keys == 'all': selected_keys = sorted(list(self.observing_conditions.keys())) # Otherwise, return the selected subset of self.observing_conditions return { k: v for k, v in self.observing_conditions.items() if k in selected_keys }
[docs] def as_array( self, selected_keys: Union[List[str], str, None] = 'all', ) -> np.ndarray: """ Return the subset of observing conditions selected by ``selected_keys`` as a numpy array. Args: selected_keys: A valid specification of a subset of observing conditions. Either `None` (to select no OC at all), or a list of keys, or `"all"` (to select all available observing conditions). Returns: A 2D numpy array of shape `(n_frames, n_obscon)` containing the selected observing conditions. """ # Make sure are selecting a valid subset self._verify_selected_keys(selected_keys) # Get the selected observing conditions as a dictionary observing_conditions = self.as_dict(selected_keys) # If no keys were selected (i.e., selected_keys was either None or an # empty list), we return an empty 2D array of shape `(n_frames, 0)`. # In this form, it can still be concatenated to the predictors of an # HSR model, so we do not need to take special care of it later. if not observing_conditions: return np.empty((self.n_frames, 0)) # Otherwise, we can convert the dictionary to a 2D numpy array return np.hstack( [_.reshape(-1, 1) for _ in observing_conditions.values()] )
[docs] def as_dataframe( self, selected_keys: Union[List[str], str, None] = 'all', ) -> pd.DataFrame: """ Return the subset of observing conditions selected by ``selected_keys`` as a pandas data frame. Args: selected_keys: A valid specification of a subset of observing conditions. Either `None` (to select no OC at all), or a list of keys, or `"all"` (to select all available observing conditions). Returns: A pandas data frame containing the selected observing conditions. """ # Make sure are selecting a valid subset self._verify_selected_keys(selected_keys) # Get the selected observing conditions as a dictionary observing_conditions = self.as_dict(selected_keys) # Convert the observing conditions to a data frame and return them return pd.DataFrame(observing_conditions)
# ----------------------------------------------------------------------------- # FUNCTION DEFINITIONS # -----------------------------------------------------------------------------
[docs]def query_archive( start_date: str, end_date: str, archive: str, parameter_key: str, ) -> pd.DataFrame: """ Send a request to one of ESO's ambient condition query forms [1] to retrieve the values of a particular observing condition. [1]: https://archive.eso.org/cms/eso-data/ambient-conditions/paranal-ambient-query-forms.html Args: start_date: The start datetime (in UTC) as a string in ISO 8061 format. Example: `"2012-12-20T20:00:00.0000"`. end_date: The end datetime (in UTC) as a string in ISO 8061 format. Example: `"2012-12-21T10:00:00.0000"`. archive: The name of the archive to which to send the query. Currently, the following archives are supported: * `"meteo"` * `"dimm_old"` * `"dimm_new"` * `"mass"` * `"lhatpro"` * `"lhatpro_irt"` See [1] for more information about these archives. parameter_key: The key under which a parameter is available from the respective archive. These keys can be reverse-engineered from the source code of the respective archive. For example, `"press"` will get you the air pressure, or more precisely, the "temporal (1 minute) mean of observatory site ambient barometric air pressure measured at ground during measurement period [hPa]." For the default parameters, :func:`resolve_parameter_name()` will resolve "intuitive" parameter names (such as, e.g., `"air_pressure"`) to the correct, archive-specific keys. Returns: A data frame with the datetime and timestamp, the integration time (in seconds) and the value of the requested parameter averaged over the integration time. """ # Define the payload for the query data = { 'wdbo': 'csv/download', 'max_rows_returned': 10_000_000, 'start_date': f'{start_date}..{end_date}', 'tab_integration': 'on', f'tab_{parameter_key}': 'on', 'order': 'start_date', } # Define names of columns that we expect to retrieve names = ['datetime', 'integration_time', parameter_key] # Define the URL for query based on the archive if archive == 'meteo': url = 'https://archive.eso.org/wdb/wdb/asm/meteo_paranal/query' elif archive == 'dimm_old': url = ( 'https://archive.eso.org/wdb/wdb/asm/' 'historical_ambient_paranal/query' ) elif archive == 'dimm_new': url = 'https://archive.eso.org/wdb/wdb/asm/dimm_paranal/query' elif archive == 'mass': url = 'https://archive.eso.org/wdb/wdb/asm/mass_paranal/query' elif archive == 'lhatpro': url = 'https://archive.eso.org/wdb/wdb/asm/lhatpro_paranal/query' names = ['platform'] + names elif archive == 'lhatpro_irt': url = 'https://archive.eso.org/wdb/wdb/asm/lhatpro_irt_paranal/query' names = ['platform'] + names else: raise ValueError(f'Invalid archive: "{archive}"!') # Send a POST request to the "Meteorology Query Form"; raise error # if the return code is 4XX or 5XX. response = requests.post(url=url, data=data) response.raise_for_status() # Parse response (which is a CSV) to a pandas data frame df = pd.read_csv( filepath_or_buffer=io.StringIO(response.content.decode('utf-8')), names=names, header=0, parse_dates=['datetime'], date_parser=pd.to_datetime, comment='#', ) # Add a column to the data frame for the timestamp df['timestamp'] = df.datetime.values.astype(np.int64) // 10 ** 9 df.sort_values(by='timestamp', inplace=True) return df
[docs]def interpolate_observing_conditions( timestamps: np.ndarray, df: pd.DataFrame, parameter_key: str, ) -> np.ndarray: """ Take the values of the observing conditions in the data frame ``df`` and interpolate them temporally so that we get values for the timestamp of each frame. The interpolation procedure is based on Cubic splines. See https://stats.stackexchange.com/a/511394 for the original idea. Args: timestamps: A 1D numpy array of floats, containing the UTC timestamps of the frames in the stack. df: A data frame containing the result from querying one of the ESO archives (e.g., :func:`query_meteo`). parameter_key: The key under which a parameter is available from the respective archive. (See also :func:`query_archive`). Returns: A 1D numpy array with shape `(n_frames, )` which contains an interpolated value of the target parameter for every frame. """ # Define shortcuts avg = df[parameter_key].values x = df['timestamp'].values dx = x[1] - x[0] # Remove NaNs, because they break the spline interpolation nan_idx = np.isnan(avg) avg = avg[~nan_idx] x = x[~nan_idx] # Compute y, which is essentially the cumulative sum of the parameter y = np.zeros(len(x)) for i in range(1, len(x)): y[i] = y[i - 1] + avg[i - 1] * (x[i] - x[i - 1]) # Set up an interpolation using splines. We use the first derivative # here, because the cumulative sum above is basically an integral. interpolator = CubicSpline(x, y).derivative(1) # Evaluate the interpolator at the time of each frame. The additional # "+ dx" seems necessary based on visual comparison of the original # and the interpolated time series? return np.asarray(interpolator(timestamps + dx))
[docs]def get_observing_conditions( parameter_name: str, timestamps: np.ndarray, ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: """ This is a convenience wrapper to query the ESO ambient condition archives for a given parameter, interpolate the results, and evaluate them at the requested ``timestamps``. Args: parameter_name: Name of the parameter to retrieve from the archive. This needs to be resolvable into a parameter key and archive by :func:`resolve_parameter_name()`. timestamps: A 1D numpy array of floats, containing the UTC timestamps of the frames in the stack. Returns: A 1D numpy array with shape `(n_frames, )` which contains an interpolated value of the target parameter for every frame. """ # Get the start and end date for the query (as strings). We need an # offset of the observation duration both before and after the first # and last frame for interpolation purposes (see below). start_timestamp = math.floor(min(timestamps) - 120) end_timestamp = math.ceil(max(timestamps) + 120) # Resolve the parameter name archive, key, _ = resolve_parameter_name( parameter_name=parameter_name, obs_date=timestamp_to_datetime(start_timestamp), ) # Query the archive to get data frame with parameter of interest df = query_archive( start_date=timestamp_to_date_string(start_timestamp), end_date=timestamp_to_date_string(end_timestamp), parameter_key=key, archive=archive, ) # Convert the data frame to a dictionary (for storing them in the HDF file) query_results = dict( timestamp=df['timestamp'].values, integration_time=df['integration_time'].values, parameter=df[key].values, ) # Interpolate the observing conditions for each frame interpolated = interpolate_observing_conditions(timestamps, df, key) return interpolated, query_results
[docs]def resolve_parameter_name( parameter_name: str, obs_date: datetime, ) -> Tuple[str, str, str]: """ Resolves a ``parameter_name`` into a dictionary that contains information about which archive the parameter can be obtained from, using which parameter_key. Args: parameter_name: Name of a parameter (e.g., `"air_pressure"`). obs_date: Date at which the data set was observed. Returns: A 3-tuple `(archive, parameter_key, description)` which tells us from which ESO ambient server we can retrieve the parameter and which key we have to use for the query. """ # ------------------------------------------------------------------------- # Preliminaries # ------------------------------------------------------------------------- # For non-meteorological parameters, the archive depends on the observation # date, as the Astronomical Site Monitoring (ASM) was updated in April 2016 if obs_date > datetime(2016, 4, 4, 12, 0, 0, 0, timezone.utc): archive_version = 'new' else: archive_version = 'old' # Initialize default values archive = '' parameter_key = '' description = '' # ------------------------------------------------------------------------- # Resolve the parameter name # ------------------------------------------------------------------------- # Air pressure if parameter_name == 'air_pressure': archive = 'meteo' parameter_key = 'press' description = ( 'Temporal (1 minute) mean of observatory site ambient barometric ' 'air pressure measured at ground during measurement period [hPa].' ) # Coherence time (tau_0) elif parameter_name == 'coherence_time': if archive_version == 'old': archive = 'dimm_old' parameter_key = 'tau' description = 'Coherence time [s].' if archive_version == 'new': archive = 'mass' parameter_key = 'tau' description = ( 'Coherence time (weights method) from MASS stand-alone [s].' ) # Isoplanatic angle (theta_0) elif parameter_name == 'isoplanatic_angle': if archive_version == 'old': archive = 'dimm_old' parameter_key = 'tet' description = 'Isoplanatic angle [arcsec].' if archive_version == 'new': archive = 'mass' parameter_key = 'tet' description = ( 'Isoplanatic angle from MASS-DIMM integrated profile [J1:J6] ' '[arcsec].' ) # Observatory temperature elif parameter_name == 'observatory_temperature': archive = 'meteo' parameter_key = 'temp2' description = ( 'Temporal (1 minute) mean of site ambient temperature measured ' 'at 2m [deg Celsius].' ) # Relative humidity elif parameter_name == 'relative_humidity': archive = 'meteo' parameter_key = 'rhum1' description = ( 'Temporal (1 minute) mean of observatory site ambient relative ' 'humidity measured at sensor position 30m above ground during ' 'measurement period [%].' ) # Seeing elif parameter_name == 'seeing': if archive_version == 'old': archive = 'dimm_old' parameter_key = 'fwhm' description = ( 'Reference observatory site seeing measured by the ASM-DIMM ' 'telescope, Full Width Half Maximum at 500nm [arcsec].' ) if archive_version == 'new': archive = 'dimm_new' parameter_key = 'fwhm' description = ( 'The total seeing calculated with DIMM telescope [arcsec]. ' 'The value is calculated using the following formula: ' 'FWHM = 2E(+7) Cn2**(0.6).' ) # Wind speed (U component) elif parameter_name == 'wind_speed_u': archive = 'meteo' parameter_key = 'wind_speedu' description = ( 'Temporal (1 minute) mean of observatory site ambient wind ' 'speed U vector component, where U is horizontal and points ' 'to 330 degree measured at sensor position 20m during ' 'measurement period [m/s].' ) # Wind speed (V component) elif parameter_name == 'wind_speed_v': archive = 'meteo' parameter_key = 'wind_speedv' description = ( 'Temporal (1 minute) mean of observatory site ambient wind ' 'speed V vector component, where V is horizontal and points ' 'to 240 degree measured at sensor position 20m during ' 'measurement period [m/s].' ) # Wind speed (W component) elif parameter_name == 'wind_speed_w': archive = 'meteo' parameter_key = 'wind_speedw' description = ( 'Temporal (1 minute) mean of observatory site ambient wind ' 'speed W vector component, where W is vertically pointing ' 'upwards, measured at sensor position 20m during measurement ' 'period [m/s]. ' ) # For all other parameter names, raise an error else: raise ValueError(f'Invalid parameter_name: "{parameter_name}"!') return archive, parameter_key, description