import os
import sys
import copy
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import cm
from scipy import sparse
from scipy import interpolate
from scipy.signal import convolve2d
import time
from spectractor import parameters
from spectractor.config import set_logger, load_config
from spectractor.extractor.images import Image, find_target, turn_image
from spectractor.extractor.spectrum import Spectrum, calibrate_spectrum
from spectractor.extractor.background import extract_spectrogram_background_sextractor
from spectractor.extractor.chromaticpsf import ChromaticPSF
from spectractor.extractor.psf import load_PSF
from spectractor.tools import (ensure_dir, plot_image_simple, from_lambda_to_colormap, plot_spectrum_simple,
mask_cosmics, cholesky_solve)
from spectractor.fit.fitter import (run_minimisation, run_minimisation_sigma_clipping, write_fitparameter_json,
RegFitWorkspace, FitWorkspace, FitParameters)
try:
import sparse_dot_mkl
except ModuleNotFoundError:
sparse_dot_mkl = None
[docs]
def dumpParameters():
for item in dir(parameters):
if not item.startswith("__"):
print(item, getattr(parameters, item))
[docs]
class FullForwardModelFitWorkspace(FitWorkspace):
def __init__(self, spectrum, amplitude_priors_method="noprior", verbose=False, plot=False, live_fit=False, truth=None):
"""Class to fit a full forward model on data to extract a spectrum, with ADR prediction and order 2 subtraction.
Parameters
----------
spectrum: Spectrum
amplitude_priors_method
verbose
plot
live_fit
truth
Examples
--------
>>> spec = Spectrum("./tests/data/sim_20170530_134_spectrum.fits")
>>> w = FullForwardModelFitWorkspace(spectrum=spec, amplitude_priors_method="spectrum")
"""
self.my_logger = set_logger(self.__class__.__name__)
# prepare parameters to fit
length = len(spectrum.chromatic_psf.table)
self.diffraction_orders = np.arange(spectrum.order, spectrum.order + 3 * np.sign(spectrum.order), np.sign(spectrum.order))
if len(self.diffraction_orders) == 0:
raise ValueError(f"At least one diffraction order must be given for spectrogram simulation.")
self.psf_poly_params = np.copy(spectrum.chromatic_psf.from_table_to_poly_params())
self.psf_poly_params = self.psf_poly_params[length:]
psf_poly_params_labels = np.copy(spectrum.chromatic_psf.params.labels[length:])
psf_poly_params_names = np.copy(spectrum.chromatic_psf.params.axis_names[length:])
spectrum.chromatic_psf.psf.apply_max_width_to_bounds(max_half_width=spectrum.spectrogram_Ny)
psf_poly_params_bounds = spectrum.chromatic_psf.set_bounds()
D2CCD = np.copy(spectrum.header['D2CCD'])
p = np.array([1, 1, 0, D2CCD, np.copy(spectrum.header['PIXSHIFT']), 0,
np.copy(spectrum.rotation_angle), 1, 1, parameters.OBS_CAMERA_ROTATION,
np.copy(spectrum.pressure), np.copy(spectrum.temperature), np.copy(spectrum.airmass)])
self.psf_params_start_index = np.array([p.size + len(self.psf_poly_params) * k for k in range(len(self.diffraction_orders))])
self.saturation = spectrum.spectrogram_saturation
p = np.concatenate([p] + [self.psf_poly_params] * len(self.diffraction_orders))
# for order in self.diffraction_orders:
# p = np.concatenate([p] + [self.psf_poly_params * order])
input_labels = [f"A{order}" for order in self.diffraction_orders]
input_labels += [r"D_CCD [mm]", r"shift_x [pix]", r"shift_y [pix]", r"angle [deg]", "B", "A_star", "R", "P [hPa]", "T [Celsius]", "z"]
for order in self.diffraction_orders:
input_labels += [label+f"_{order}" for label in psf_poly_params_labels]
axis_names = [f"$A_{order}$" for order in self.diffraction_orders]
axis_names += [r"$D_{CCD}$ [mm]", r"$\delta_{\mathrm{x}}^{(\mathrm{fit})}$ [pix]",
r"$\delta_{\mathrm{y}}^{(\mathrm{fit})}$ [pix]", r"$\alpha$ [deg]", "$B$", r"$A_{star}$", "R",
r"$P_{\mathrm{atm}}$ [hPa]", r"$T_{\mathrm{atm}}$ [Celcius]", "$z$"]
for order in self.diffraction_orders:
axis_names += [label+rf"$\!_{order}$" for label in psf_poly_params_names]
bounds = [[0, 2], [0, 2], [0, 2],
[D2CCD - 3 * parameters.DISTANCE2CCD_ERR, D2CCD + 3 * parameters.DISTANCE2CCD_ERR],
[-parameters.PIXSHIFT_PRIOR, parameters.PIXSHIFT_PRIOR],
[-10 * parameters.PIXSHIFT_PRIOR, 10 * parameters.PIXSHIFT_PRIOR],
[-90, 90], [0.2, 5], [0.5, 2], [-360, 360], [0, np.inf], [-100, 100], [1.001, 3]]
bounds += list(psf_poly_params_bounds) * len(self.diffraction_orders)
fixed = [False] * p.size
for k, par in enumerate(input_labels):
if "x_c" in par or "saturation" in par: # or "y_c" in par:
fixed[k] = True
for k, par in enumerate(input_labels):
if "y_c" in par:
fixed[k] = True
p[k] = 0
for k, par in enumerate(input_labels):
#if "y_c" in par and (("y_c_0" not in par and "y_c_1" not in par)) and par[-2:] == f"_{spectrum.order}": # or (par[-2:] == "_2" or par[-2:] == "_3")):
# fixed[k] = False
# p[k] = 0
#if k >= self.psf_params_start_index[0] and "y_c" in par and par[-2:] != f"_{spectrum.order}": # and (("y_c_0" in par or "y_c_1" in par)): # or (par[-2:] == "_2" or par[-2:] == "_3")):
# fixed[k] = False
# p[k] = 0
if k >= self.psf_params_start_index[0] and "y_c" not in par and "x_c" not in par and par[-2:] != f"_{spectrum.order}" and "_0_" not in par:
fixed[k] = True
p[k] = 0
if k >= self.psf_params_start_index[0] and "eta" in par and par[-2:] != f"_{spectrum.order}":
fixed[k] = True
p[k] = 0
params = FitParameters(p, labels=input_labels, axis_names=axis_names, fixed=fixed, bounds=bounds,
truth=truth, filename=spectrum.filename)
# This set of fixed parameters was determined so that the reconstructed spectrum has a ZERO bias
# with respect to the true spectrum injected in the simulation
# A2 is free only if spectrogram is a simulation or if the order 2/1 ratio is not known and flat
for order in self.diffraction_orders:
params.fixed[params.get_index(f"A{order}")] = True
if "A2" in params.labels:
params.fixed[params.get_index("A2")] = (not spectrum.disperser.flat_ratio_order_2over1) and (not ("A2_T" in spectrum.header))
if spectrum.spectrogram_starfield is None:
params.fixed[params.get_index("A_star")] = True # Astar
params.fixed[params.get_index("D_CCD [mm]")] = True # D2CCD: spectrogram can not tell something on this parameter: rely on calibrate_spectrum
params.fixed[params.get_index("shift_x [pix]")] = True # delta x: if False, extracted spectrum is biased compared with truth
params.fixed[params.get_index("shift_y [pix]")] = False # delta y
params.fixed[params.get_index("angle [deg]")] = False # angle
params.fixed[params.get_index("B")] = True # B: not needed in simulations, to check with data
params.fixed[params.get_index("R")] = True # camera rot
params.fixed[params.get_index("P [hPa]")] = False # pressure
params.fixed[params.get_index("T [Celsius]")] = True # temperature
params.fixed[params.get_index("z")] = True # airmass
# crop data to fit faster
self.spectrum = spectrum
self.lambdas = self.spectrum.lambdas
self.bgd_width = parameters.PIXWIDTH_BACKGROUND + parameters.PIXDIST_BACKGROUND - parameters.PIXWIDTH_SIGNAL
if spectrum.spectrogram_data.shape[0] < 2 * self.bgd_width:
# Data has been already cropped
self.bgd_width = 0
rows = np.arange(self.bgd_width, spectrum.spectrogram_data.shape[0]-self.bgd_width)
self.Ny, self.Nx = spectrum.spectrogram_data[rows, :].shape
yy, xx = np.mgrid[:self.Ny, :self.Nx]
self.pixels = np.asarray([xx, yy], dtype=int)
FitWorkspace.__init__(self, params, epsilon=1e-4,
data=spectrum.spectrogram_data[rows, :].flatten(),
err=spectrum.spectrogram_err[rows, :].flatten(),
file_name=spectrum.filename, verbose=verbose, plot=plot, live_fit=live_fit, truth=truth)
self.bgd = spectrum.spectrogram_bgd[rows, :].flatten()
if spectrum.spectrogram_flat is not None:
self.flat = spectrum.spectrogram_flat[rows, :].flatten()
self.bgd *= self.flat
else:
self.flat = None
if spectrum.spectrogram_starfield is not None:
self.starfield = spectrum.spectrogram_starfield[rows, :].flatten()
if self.flat is not None:
self.starfield *= self.flat
else:
self.starfield = None
if spectrum.spectrogram_mask is not None:
self.mask = list(np.where(spectrum.spectrogram_mask[rows, :].astype(bool).ravel())[0])
else:
self.mask = []
# adapt the ChromaticPSF table shape
if self.Nx != self.spectrum.chromatic_psf.Nx:
self.spectrum.chromatic_psf.resize_table(new_Nx=self.Nx)
# load the disperser relative transmissions
self.tr_ratio = interpolate.interp1d(parameters.LAMBDAS, np.ones_like(parameters.LAMBDAS), bounds_error=False, fill_value=1.)
if abs(self.spectrum.order) == 1:
self.tr_ratio_next_order = self.spectrum.disperser.ratio_order_2over1
self.tr_ratio_next_next_order = self.spectrum.disperser.ratio_order_3over1
elif abs(self.spectrum.order) == 2:
self.tr_ratio_next_order = self.spectrum.disperser.ratio_order_3over2
self.tr_ratio_next_next_order = None
elif abs(self.spectrum.order) == 3:
self.tr_ratio_next_order = None
self.tr_ratio_next_next_order = None
else:
raise ValueError(f"{abs(self.spectrum.order)=}: must be 1, 2 or 3. "
f"Higher diffraction orders not implemented yet in full forward model.")
self.tr = [self.tr_ratio, self.tr_ratio_next_order, self.tr_ratio_next_next_order]
# PSF cube computation
self.psf_cubes_masked = {}
self.psf_profile_params = {}
self.boundaries = {}
for order in self.diffraction_orders:
self.psf_cubes_masked[order] = np.empty(1)
self.boundaries[order] = {}
self.psf_profile_params[order] = None
self.fix_psf_cube = False
# error matrix
# here image uncertainties are assumed to be uncorrelated
# (which is not exactly true in rotated images)
self.W = 1. / (self.err * self.err)
self.data_before_mask = np.copy(self.data)
self.mask_before_mask = list(np.copy(self.mask))
self.W_before_mask = np.copy(self.W)
self.sqrtW = sparse.diags(np.sqrt(self.W), format="dia", dtype="float32")
# design matrix
self.M = None
self.psf_cube_sparse_indices = {}
self.M_sparse_indices = {}
for order in self.diffraction_orders:
self.M_sparse_indices[order] = None
self.psf_cube_sparse_indices[order] = None
self.set_mask(fwhmx_clip=3*parameters.PSF_FWHM_CLIP, fwhmy_clip=2*parameters.PSF_FWHM_CLIP) # not a narrow mask for first fit
self.M_dot_W_dot_M = None
# prepare results
self.amplitude_params = np.zeros(self.Nx)
self.amplitude_params_err = np.zeros(self.Nx)
self.amplitude_cov_matrix = np.zeros((self.Nx, self.Nx))
# priors on amplitude parameters
self.amplitude_priors_list = ['noprior', 'positive', 'smooth', 'spectrum', 'fixed', 'keep']
self.amplitude_priors_method = amplitude_priors_method
self.fwhm_priors = np.copy(spectrum.chromatic_psf.table['fwhm'])
self.reg = spectrum.chromatic_psf.opt_reg
if 'PSF_REG' in spectrum.header and float(spectrum.header["PSF_REG"]) > 0:
self.reg = float(spectrum.header['PSF_REG'])
if self.reg < 0:
self.reg = parameters.PSF_FIT_REG_PARAM
self.trace_r = self.Nx / np.median(self.fwhm_priors) # spectrophotometric uncertainty principle
self.my_logger.info(f"\n\tFull forward model fitting with regularisation parameter r={self.reg}.")
self.Q = np.zeros((self.Nx, self.Nx), dtype="float32")
self.Q_dot_A0 = np.zeros(self.Nx, dtype="float32")
if amplitude_priors_method not in self.amplitude_priors_list:
raise ValueError(f"Unknown prior method for the amplitude fitting: {self.amplitude_priors_method}. "
f"Must be either {self.amplitude_priors_list}.")
self.spectrum.convert_from_flam_to_ADUrate()
self.amplitude_priors = np.copy(self.spectrum.data)
if self.amplitude_priors_method == "spectrum":
self.amplitude_priors_cov_matrix = np.copy(self.spectrum.cov_matrix)
else:
self.amplitude_priors_cov_matrix = np.diag(self.spectrum.err**2)
if self.spectrum.data.size != self.Nx: # must rebin the priors
old_x = np.linspace(0, 1, self.spectrum.data.size)
new_x = np.linspace(0, 1, self.Nx)
self.spectrum.lambdas = np.interp(new_x, old_x, self.spectrum.lambdas)
self.amplitude_priors = np.interp(new_x, old_x, self.amplitude_priors)
if self.amplitude_priors_method == "spectrum":
# rebin prior cov matrix with monte-carlo
niter = 10000
samples = np.random.multivariate_normal(np.zeros_like(old_x), cov=self.amplitude_priors_cov_matrix, size=niter)
new_samples = np.zeros((niter, new_x.size))
for i in range(niter):
new_samples[i] = np.interp(new_x, old_x, samples[i])
self.amplitude_priors_cov_matrix = np.cov(new_samples.T)
# regularisation matrices
if amplitude_priors_method == "spectrum":
# U = np.diag([1 / np.sqrt(np.sum(self.err[:, x]**2)) for x in range(self.Nx)])
# self.U = np.diag([1 / np.sqrt(self.amplitude_priors_cov_matrix[x, x]) for x in range(self.Nx)])
self.UTU = np.linalg.inv(self.amplitude_priors_cov_matrix)
L = np.diag(-2 * np.ones(self.Nx)) + np.diag(np.ones(self.Nx), -1)[:-1, :-1] \
+ np.diag(np.ones(self.Nx), 1)[:-1, :-1]
L.astype(float)
L[0, 0] = -1
L[-1, -1] = -1
self.L = L
self.Q = (L.T @ self.UTU @ L).astype("float32") # Q is dense do not sparsify it (leads to errors otherwise)
self.Q_dot_A0 = self.Q @ self.amplitude_priors.astype("float32")
[docs]
def set_mask(self, params=None, fwhmx_clip=3*parameters.PSF_FWHM_CLIP, fwhmy_clip=parameters.PSF_FWHM_CLIP):
"""
Parameters
----------
params
fwhmx_clip
fwhmy_clip
Returns
-------
Examples
--------
>>> spec = Spectrum("./tests/data/reduc_20170530_134_spectrum.fits")
>>> w = FullForwardModelFitWorkspace(spectrum=spec, amplitude_priors_method="fixed", verbose=True)
>>> _ = w.simulate(*w.params.values)
>>> w.plot_fit()
"""
self.my_logger.info("\n\tReset spectrogram mask with current parameters.")
if params is None:
params = self.params.values
A1, A2, A3, D2CCD, dx0, dy0, angle, B, Astar, rot, pressure, temperature, airmass, *psf_poly_params_all = params
poly_params = np.array(psf_poly_params_all).reshape((len(self.diffraction_orders), -1))
lambdas = self.spectrum.compute_lambdas_in_spectrogram(D2CCD, dx0, dy0, angle, niter=5, with_adr=True,
order=self.spectrum.order)
self.psf_cubes_masked = {}
self.M_sparse_indices = {}
self.psf_cube_sparse_indices = {}
for k, order in enumerate(self.diffraction_orders):
profile_params = self.spectrum.chromatic_psf.from_poly_params_to_profile_params(poly_params[k],
apply_bounds=True)
if order == self.diffraction_orders[0]: # only first diffraction order
self.spectrum.chromatic_psf.from_profile_params_to_shape_params(profile_params)
dispersion_law = self.spectrum.compute_dispersion_in_spectrogram(lambdas, D2CCD, dx0, dy0, angle,
with_adr=True,
order=order)
profile_params[:, 0] = 1
profile_params[:, 1] = dispersion_law.real + self.spectrum.spectrogram_x0
profile_params[:, 2] += dispersion_law.imag - self.bgd_width
psf_cube_masked = self.spectrum.chromatic_psf.build_psf_cube_masked(self.pixels, profile_params,
fwhmx_clip=fwhmx_clip,
fwhmy_clip=fwhmy_clip)
self.psf_cubes_masked[order] = self.spectrum.chromatic_psf.convolve_psf_cube_masked(psf_cube_masked)
# make rectangular mask per wavelength
self.boundaries[order], self.psf_cubes_masked[order] = self.spectrum.chromatic_psf.set_rectangular_boundaries(self.psf_cubes_masked[order])
if k > 0:
# spectrogram model must be accurate inside the k=0 order footprint: enlarge the next order footprints
self.boundaries[order]["ymin"] = np.zeros_like(self.boundaries[order]["ymin"])
self.boundaries[order]["ymax"] = self.Ny * np.ones_like(self.boundaries[order]["ymax"])
self.psf_cube_sparse_indices[order], self.M_sparse_indices[order] = self.spectrum.chromatic_psf.get_sparse_indices(self.boundaries[order])
# mask = np.sum(self.psf_cubes_masked[self.diffraction_orders[0]].reshape(psf_cube_masked.shape[0], psf_cube_masked[0].size), axis=0) == 0
# cumulate the boolean values as int
weight_mask = np.sum(self.psf_cubes_masked[self.diffraction_orders[0]], axis=0)
# look for indices with maximum weight per column (all sheets of the psf cube have contributed)
res = np.max(weight_mask, axis=0)[np.newaxis,:] * np.ones((weight_mask.shape[0],1))
# keep only the pixels where all psf_cube sheets have contributed per column
mask = (weight_mask != res).ravel()
self.mask = list(self.mask_before_mask) + list(np.where(mask)[0])
self.mask = list(set(self.mask))
self.W = np.copy(self.W_before_mask)
self.W[self.mask] = 0
self.sqrtW = sparse.diags(np.sqrt(self.W), format="dia", dtype="float32")
[docs]
def simulate(self, *params):
r"""
Compute a ChromaticPSF2D model given PSF shape parameters and minimizing
amplitude parameters using a spectrogram data array.
The full forward model of the spectrogram image :math:`\vec{I}(\vec{x},\vec{p})` can be written as
.. math ::
:label: ffm
\vec{I}(\vec{x},\vec{p}) = \sum_{i=0}^{N_x} A_i F_i \phi\left(\vec{x},\vec{p}_i\right)
+ \bar F B b\left(\vec{x}\right) + \bar F S s\left(\vec{x}\right)
with
- :math:`\vec{x}` the 2D array of the pixel coordinates,
- :math:`\vec{A}` the amplitude parameter array along the x axis of the spectrogram,
- :math:`\phi\left(\vec{x},\vec{p}_i\right)` the 2D PSF kernel whose integral is normalised to one parametrized
with the :math:`\vec{p}_i` non-linear parameter array,
- math:`B b\left(\vec{x}\right)` the background function weighted by a scalar math:`B`,
- math:`S s\left(\vec{x}\right)` the star field function weighted by a scalar math:`S`,
- :math:`F_i` a flat cube (wavelengths indexed by :math:`i`) and \bar F the mean flat.
If the :math:`\vec{x}` 2D array is flatten in 1D, equation :eq:`ffm` is
.. math ::
:label: chromaticpsf2d_matrix
:nowrap:
\begin{align}
\vec{m}(\vec{x},\vec{p}) & = \mathbf{M}\left(\vec{x},\vec{p}\right) \mathbf{A} \\
\mathbf{M}\left(\vec{x},\vec{p}\right) & = \left(\begin{array}{cccc}
F_1\phi\left(\vec{x}_1,\vec{p}_1\right) & F_2\phi\left(\vec{x}_2,\vec{p}_1\right) & ...
& F_{N_x}\phi\left(\vec{x}_{N_x},\vec{p}_1\right) \\
... & ... & ... & ...\\
F_1\phi\left(\vec{x}_1,\vec{p}_{N_x}\right) & F_2\phi\left(\vec{x}_2,\vec{p}_{N_x}\right) & ...
& F_{N_x}\phi\left(\vec{x}_{N_x},\vec{p}_{N_x}\right) \\
\end{array}\right)
\end{align}
with :math:`\mathbf{M}` the design matrix.
The goal of this function is to perform a minimisation of the amplitude vector :math:`\mathbf{A}` given
a set of non-linear parameters :math:`\mathbf{p}` and a spectrogram data array :math:`\mathbf{D}` modelised as
.. math:: \mathbf{D} = \mathbf{m}(\vec{x},\vec{p}) + \bar F B b\left(\vec{x}\right) + \bar F S s\left(\vec{x}\right) + \vec{\epsilon}
with :math:`\vec{\epsilon}` a random noise vector. The :math:`\chi^2` function to minimise is
.. math::
:label: ffm_chi2
\chi^2(\mathbf{A})= \left(\mathbf{D} - \bar F B b\left(\vec{x}\right) - \bar F S s\left(\vec{x}\right) - \mathbf{M}\left(\vec{x},\vec{p}\right) \mathbf{A}\right)^T \mathbf{W}
\left(\mathbf{D} - \bar F B b\left(\vec{x}\right) - \bar F S s\left(\vec{x}\right) -\mathbf{M}\left(\vec{x},\vec{p}\right) \mathbf{A} \right)
with :math:`\mathbf{W}` the weight matrix, inverse of the covariance matrix. In our case this matrix is diagonal
as the pixels are considered all independent. The minimum of equation :eq:`ffm_chi2` is reached for
a set of amplitude parameters :math:`\hat{\mathbf{A}}` given by
.. math::
\hat{\mathbf{A}} = (\mathbf{M}^T \mathbf{W} \mathbf{M})^{-1} \mathbf{M}^T \mathbf{W}\left( \mathbf{D} - \bar F B b\left(\vec{x}\right) - \bar F S s\left(\vec{x}\right) \right)
The error matrix on the :math:`\hat{\mathbf{A}}` coefficient is simply
:math:`(\mathbf{M}^T \mathbf{W} \mathbf{M})^{-1}`.
See Also
--------
ChromaticPSF2DFitWorkspace.simulate
Parameters
----------
params: array_like
Full forward model parameter array.
Examples
--------
Load data:
>>> spec = Spectrum("./tests/data/sim_20170530_134_spectrum.fits")
>>> spec.plot_spectrogram()
Simulate the data with fixed amplitude priors:
.. doctest::
>>> w = FullForwardModelFitWorkspace(spectrum=spec, amplitude_priors_method="fixed", verbose=True)
>>> y, mod, mod_err = w.simulate(*w.params.values)
>>> w.plot_fit()
.. doctest::
:hide:
>>> assert mod is not None
Simulate the data with a Tikhonov prior on amplitude parameters:
.. doctest::
>>> spec = Spectrum("./tests/data/sim_20170530_134_spectrum.fits")
>>> w = FullForwardModelFitWorkspace(spectrum=spec, amplitude_priors_method="spectrum", verbose=True)
>>> y, mod, mod_err = w.simulate(*w.params.values)
>>> w.plot_fit()
"""
# linear regression for the amplitude parameters
# prepare the vectors
self.params.values = np.asarray(params)
A1, A2, A3, D2CCD, dx0, dy0, angle, B, Astar, rot, pressure, temperature, airmass, *poly_params_all = params
poly_params = np.array(poly_params_all).reshape((len(self.diffraction_orders), -1))
self.spectrum.adr_params[2] = temperature
self.spectrum.adr_params[3] = pressure
self.spectrum.adr_params[-1] = airmass
parameters.OBS_CAMERA_ROTATION = rot
R = self.data - B * self.bgd
if self.starfield is not None:
R -= self.starfield
W_dot_data = (self.W * R).astype("float32")
# Evaluate ADR and compute wavelength arrays
self.lambdas = self.spectrum.compute_lambdas_in_spectrogram(D2CCD, dx0, dy0, angle, niter=5, with_adr=True,
order=self.diffraction_orders[0])
M = None
for k, order in enumerate(self.diffraction_orders):
if self.tr[k] is None or self.params[f"A{order}"] == 0: # diffraction order undefined
self.psf_profile_params[order] = None
continue
# Evaluate PSF profile
if k == 0:
self.psf_profile_params[order] = self.spectrum.chromatic_psf.update(poly_params[k], self.spectrum.spectrogram_x0 + dx0,
self.spectrum.spectrogram_y0 + dy0, angle, plot=False, apply_bounds=True)
else:
self.psf_profile_params[order] = self.spectrum.chromatic_psf.from_poly_params_to_profile_params(poly_params[k], apply_bounds=True)
# Dispersion law
dispersion_law = self.spectrum.compute_dispersion_in_spectrogram(self.lambdas, D2CCD, dx0, dy0, angle,
with_adr=True, order=order)
# Fill spectrogram trace as a function of the pixel column x
self.psf_profile_params[order][:, 0] = self.params[f"A{order}"] * self.tr[k](self.lambdas)
self.psf_profile_params[order][:, 1] = dispersion_law.real + self.spectrum.spectrogram_x0
self.psf_profile_params[order][:, 2] += dispersion_law.imag - self.bgd_width
# Matrix filling
M_order = self.spectrum.chromatic_psf.build_sparse_M(self.pixels, self.psf_profile_params[order],
dtype="float32", M_sparse_indices=self.M_sparse_indices[order],
boundaries=self.boundaries[order])
if M is None:
M = M_order
else:
M += M_order
if self.flat is not None:
# multiply each M matrix columns by the flat array (see the docstring)
# TODO: if flat array is a cube flat, needs to multiply directly in build_sparse_M
dia = sparse.dia_matrix(([self.flat.astype("float32")], [0]), shape=(self.flat.size, self.flat.size))
M = (dia @ M).tocsc()
# Algebra to compute amplitude parameters
if self.amplitude_priors_method != "fixed":
M_dot_W = M.T @ self.sqrtW
if sparse_dot_mkl is None:
M_dot_W_dot_M = M_dot_W @ M_dot_W.T
else:
tri = sparse_dot_mkl.gram_matrix_mkl(M_dot_W, transpose=True)
dia = sparse.csr_matrix((tri.diagonal(), (np.arange(tri.shape[0]), np.arange(tri.shape[0]))),
shape=tri.shape, dtype="float32")
M_dot_W_dot_M = (tri + tri.T - dia).toarray()
if self.amplitude_priors_method != "spectrum":
if self.amplitude_priors_method == "keep":
amplitude_params = np.copy(self.amplitude_params)
else:
amplitude_params = cholesky_solve(M_dot_W_dot_M.toarray(), M.T @ W_dot_data)
if self.amplitude_priors_method == "positive":
amplitude_params[amplitude_params < 0] = 0
elif self.amplitude_priors_method == "smooth":
null_indices = np.where(amplitude_params < 0)[0]
for index in null_indices:
right = amplitude_params[index]
for i in range(index, min(index + 10, self.Nx)):
right = amplitude_params[i]
if i not in null_indices:
break
left = amplitude_params[index]
for i in range(index, max(0, index - 10), -1):
left = amplitude_params[i]
if i not in null_indices:
break
amplitude_params[index] = 0.5 * (right + left)
elif self.amplitude_priors_method == "noprior":
pass
else:
M_dot_W_dot_M_plus_Q = M_dot_W_dot_M + np.float32(self.reg) * self.Q
amplitude_params = cholesky_solve(M_dot_W_dot_M_plus_Q, M.T @ W_dot_data + np.float32(self.reg) * self.Q_dot_A0)
self.M_dot_W_dot_M = M_dot_W_dot_M
amplitude_params = np.asarray(amplitude_params).reshape(-1)
else:
amplitude_params = np.copy(self.amplitude_priors)
# Save results
self.M = M
self.psf_poly_params = np.copy(poly_params[0])
self.amplitude_params = np.copy(amplitude_params)
# Compute the model
self.model = M @ amplitude_params
self.model += B * self.bgd
if self.starfield is not None:
self.model += Astar * self.starfield
self.model_err = np.zeros_like(self.model)
return self.pixels, self.model, self.model_err
[docs]
def amplitude_covariance(self):
r"""
Compute the covariance matrix for the amplitude parameters.
The error matrix on the :math:`\hat{\mathbf{A}}` coefficient is simply
:math:`(\mathbf{M}^T \mathbf{W} \mathbf{M})^{-1}`.
See Also
--------
ChromaticPSF2DFitWorkspace.simulate
Examples
--------
Load data:
>>> from spectractor.tools import plot_covariance_matrix
>>> spec = Spectrum("./tests/data/sim_20170530_134_spectrum.fits")
>>> spec.plot_spectrogram()
Simulate the data with fixed amplitude priors:
.. doctest::
>>> w = FullForwardModelFitWorkspace(spectrum=spec, amplitude_priors_method="fixed", verbose=True)
>>> y, mod, mod_err = w.simulate(*w.params.values)
>>> cov = w.amplitude_covariance()
>>> plot_covariance_matrix(cov)
.. doctest::
:hide:
>>> assert mod is not None
Simulate the data with a Tikhonov prior on amplitude parameters:
.. doctest::
>>> spec = Spectrum("./tests/data/sim_20170530_134_spectrum.fits")
>>> w = FullForwardModelFitWorkspace(spectrum=spec, amplitude_priors_method="spectrum", verbose=True)
>>> y, mod, mod_err = w.simulate(*w.params.values)
>>> cov = w.amplitude_covariance()
>>> plot_covariance_matrix(cov)
"""
if self.amplitude_priors_method != "fixed":
if self.amplitude_priors_method != "spectrum":
if self.amplitude_priors_method == "keep":
cov_matrix = np.copy(self.amplitude_cov_matrix)
else:
cov_matrix = np.linalg.inv(self.M_dot_W_dot_M.toarray())
else:
M_dot_W_dot_M_plus_Q = self.M_dot_W_dot_M + self.reg * self.Q
cov_matrix = np.linalg.inv(M_dot_W_dot_M_plus_Q)
else:
err2 = np.copy(self.amplitude_priors)
err2[err2 <= 0] = np.min(np.abs(err2[err2 > 0]))
cov_matrix = np.diag(err2)
# TODO: propagate and marginalize over the shape parameter uncertainties ?
self.amplitude_params_err = np.array([np.sqrt(np.abs(cov_matrix[x, x])) for x in range(self.Nx)])
self.amplitude_cov_matrix = np.copy(cov_matrix)
return self.amplitude_cov_matrix
[docs]
def jacobian(self, params, model_input=None):
if model_input is not None:
lambdas, model, model_err = model_input
else:
lambdas, model, model_err = self.simulate(*params)
J = np.zeros((params.size, model.size))
method = copy.copy(self.amplitude_priors_method)
self.amplitude_priors_method = "keep"
for ip, p in enumerate(params):
if self.params.fixed[ip]:
continue
if ip >= self.psf_params_start_index[0]:
continue
tmp_p = np.copy(params)
if tmp_p[ip] + self.epsilon[ip] < self.params.bounds[ip][0] or tmp_p[ip] + self.epsilon[ip] > self.params.bounds[ip][1]:
self.epsilon[ip] = - self.epsilon[ip]
tmp_p[ip] += self.epsilon[ip]
tmp_lambdas, tmp_model, tmp_model_err = self.simulate(*tmp_p)
J[ip] = (tmp_model - model) / self.epsilon[ip]
self.amplitude_priors_method = method
for k, order in enumerate(self.diffraction_orders):
if self.psf_profile_params[order] is None:
continue
start = self.psf_params_start_index[k]
profile_params = np.copy(self.psf_profile_params[order])
amplitude_params = np.copy(self.amplitude_params)
profile_params[:, 0] *= amplitude_params
J[start:start+len(self.psf_poly_params)] = self.spectrum.chromatic_psf.build_psf_jacobian(self.pixels, profile_params=profile_params,
psf_cube_sparse_indices=self.psf_cube_sparse_indices[order],
boundaries=self.boundaries[order], dtype="float32")
return J
[docs]
def amplitude_derivatives(self):
r"""
Compute analytically the amplitude vector \hat{\mathbf{A}} derivatives with respect to the PSF parameters.
With
.. math::
\hat{\mathbf{A}} = \hat{\mathbf{C}} \cdot \mathbf{M}^T \mathbf{W} \mathbf{y}
\hat{\mathbf{C}} = (\mathbf{M}^T \mathbf{W} \mathbf{M})^{-1}
derivatives are
.. math::
\frac{\partial \hat{\mathbf{A}}}{\partial \theta} = \frac{\partial \hat{\mathbf{C}}}{\partial \theta} \cdot \mathbf{M}^T \mathbf{W} \mathbf{y} + \hat{\mathbf{C}} \cdot \frac{\partial \mathbf{M}^T \mathbf{W} \mathbf{y}}{\partial \theta}
\frac{\partial \hat{\mathbf{C}}}{\partial \theta} = - \hat{\mathbf{C}} \cdot \frac{\partial \mathbf{M}^T \mathbf{W} \mathbf{M}}{\partial \theta} \cdot \hat{\mathbf{C}}
\frac{\partial \mathbf{M}^T \mathbf{W} \mathbf{M}}{\partial \theta} = 2 \frac{\partial \mathbf{M}^T}{\partial \theta} \mathbf{W} \mathbf{M}
\frac{\partial \mathbf{M}^T \mathbf{W} \mathbf{y}}{\partial \theta} = \frac{\partial \mathbf{M}^T}{\partial \theta} \mathbf{W} \mathbf{y}
If amplitude vector is regularized via Tikhonov regularisation, regularisation term is added appropriately.
Returns
-------
dA_dtheta: list
List of amplitude vector derivatives.
Examples
--------
>>> spec = Spectrum("./tests/data/sim_20170530_134_spectrum.fits")
>>> w = FullForwardModelFitWorkspace(spectrum=spec, amplitude_priors_method="spectrum", verbose=True)
>>> y, mod, mod_err = w.simulate(*w.params.values)
>>> dA_dtheta = w.amplitude_derivatives()
>>> print(np.array(dA_dtheta).shape, w.amplitude_params.shape)
(26, 669) (669,)
"""
# compute matrices without derivatives
WM = sparse.dia_matrix((self.W, 0), shape=(self.W.size, self.W.size), dtype="float32") @ self.M
WD = (self.W * (self.data + (1 - self.params.values[self.params.get_index("B")]) * self.bgd)).astype("float32")
MWD = self.M.T @ WD
if self.amplitude_priors_method == "spectrum":
MWD += np.float32(self.reg) * self.Q_dot_A0
# compute list of partial derivatives of model matrix M for all diffraction orders
dM_dtheta = []
Jpsf_indices = []
for k, order in enumerate(self.diffraction_orders):
if self.psf_profile_params[order] is None:
continue
profile_params = np.copy(self.psf_profile_params[order])
dMsparse_order = self.spectrum.chromatic_psf.build_sparse_dM(self.pixels, profile_params=profile_params,
M_sparse_indices=self.M_sparse_indices[order],
boundaries=self.boundaries[order], dtype="float32")
dM_dtheta += dMsparse_order
start = self.psf_params_start_index[k]
Jpsf_indices += list(range(start, start+len(dMsparse_order)))
# compute partial derivatives of amplitude vector A
nparams = len(dM_dtheta)
dMWD_dtheta = [dM_dtheta[ip].T @ WD for ip in range(nparams)]
dMWM_rQA_dtheta = [2 * dM_dtheta[ip].T @ WM for ip in range(nparams)]
dcov_dtheta = [-self.amplitude_cov_matrix @ (dMWM_rQA_dtheta[ip] @ self.amplitude_cov_matrix) for ip in range(nparams)]
dA_dtheta = [self.amplitude_cov_matrix @ dMWD_dtheta[ip] + dcov_dtheta[ip] @ MWD for ip in range(nparams)]
return dA_dtheta
def __post_fit__(self):
self.amplitude_covariance()
[docs]
def plot_spectrogram_comparison_simple(self, ax, title='', extent=None, dispersion=False): # pragma: no cover
"""Method to plot a spectrogram issued from data and compare it with simulations.
Parameters
----------
ax: Axes
Axes instance of shape (4, 2).
title: str, optional
Title for the simulation plot (default: '').
extent: array_like, optional
Extent argument for imshow to crop plots (default: None).
dispersion: bool, optional
If True, plot a colored bar to see the associated wavelength color along the x axis (default: False).
"""
cmap_bwr = copy.copy(cm.get_cmap('bwr'))
cmap_bwr.set_bad(color='lightgrey')
cmap_viridis = copy.copy(cm.get_cmap('viridis'))
cmap_viridis.set_bad(color='lightgrey')
data = np.copy(self.data_before_mask)
if len(self.outliers) > 0 or len(self.mask) > 0:
bad_indices = np.array(list(self.get_bad_indices()) + list(self.mask)).astype(int)
data[bad_indices] = np.nan
lambdas = self.spectrum.lambdas
sub = np.where((lambdas > parameters.LAMBDA_MIN) & (lambdas < parameters.LAMBDA_MAX))[0]
sub = np.where(sub < self.spectrum.spectrogram_data.shape[1])[0]
data = data.reshape((self.Ny, self.Nx))
err = self.err.reshape((self.Ny, self.Nx))
model = self.model.reshape((self.Ny, self.Nx))
if extent is not None:
sub = np.where((lambdas > extent[0]) & (lambdas < extent[1]))[0]
if len(sub) > 0:
norm = np.nanmax(data[:, sub])
plot_image_simple(ax[0, 0], data=data[:, sub] / norm, title='Data', aspect='auto',
cax=ax[0, 1], vmin=0, vmax=1, units='1/max(data)', cmap=cmap_viridis)
ax[0, 0].set_title('Data', fontsize=10, loc='center', color='white', y=0.8)
plot_image_simple(ax[1, 0], data=model[:, sub] / norm, aspect='auto', cax=ax[1, 1], vmin=0, vmax=1,
units='1/max(data)', cmap=cmap_viridis)
if dispersion:
x = self.spectrum.chromatic_psf.table['Dx'][sub[5:-5]] + self.spectrum.spectrogram_x0 - sub[0]
y = np.ones_like(x)
ax[1, 0].scatter(x, y, cmap=from_lambda_to_colormap(self.lambdas[sub[5:-5]]), edgecolors='None',
c=self.lambdas[sub[5:-5]],
label='', marker='o', s=10)
ax[1, 0].set_xlim(0, model[:, sub].shape[1])
ax[1, 0].set_ylim(0, model[:, sub].shape[0])
# p0 = ax.plot(lambdas, self.model(lambdas), label='model')
# # ax.plot(self.lambdas, self.model_noconv, label='before conv')
if title != '':
ax[1, 0].set_title(title, fontsize=10, loc='center', color='white', y=0.8)
residuals = (data - model)
# residuals_err = self.spectrum.spectrogram_err / self.model
norm = err
residuals /= norm
std = float(np.nanstd(residuals[:, sub]))
plot_image_simple(ax[2, 0], data=residuals[:, sub], vmin=-5 * std, vmax=5 * std, title='(Data-Model)/Err',
aspect='auto', cax=ax[2, 1], units='', cmap=cmap_bwr)
ax[2, 0].set_title('(Data-Model)/Err', fontsize=10, loc='center', color='black', y=0.8)
ax[2, 0].text(0.05, 0.05,
f'mean={np.nanmean(residuals[:, sub]):.3f}\nstd={np.nanstd(residuals[:, sub]):.3f}',
horizontalalignment='left', verticalalignment='bottom',
color='black', transform=ax[2, 0].transAxes)
ax[0, 0].set_xticks(ax[2, 0].get_xticks()[1:-1])
ax[0, 1].get_yaxis().set_label_coords(3.5, 0.5)
ax[1, 1].get_yaxis().set_label_coords(3.5, 0.5)
ax[2, 1].get_yaxis().set_label_coords(3.5, 0.5)
ax[3, 1].remove()
ax[3, 0].plot(self.lambdas[sub], np.nansum(data, axis=0)[sub], label='Data')
model[np.isnan(data)] = np.nan # mask background values outside fitted region
ax[3, 0].plot(self.lambdas[sub], np.nansum(model, axis=0)[sub], label='Model')
ax[3, 0].set_ylabel('Cross spectrum')
ax[3, 0].set_xlabel(r'$\lambda$ [nm]')
ax[3, 0].legend(fontsize=7)
ax[3, 0].grid(True)
[docs]
def plot_fit(self): # pragma: no cover
"""Plot the fit result.
Examples
--------
>>> spec = Spectrum('tests/data/reduc_20170530_134_spectrum.fits')
>>> w = FullForwardModelFitWorkspace(spec, verbose=True, plot=True, live_fit=False)
>>> lambdas, model, model_err = w.simulate(*w.params.values)
>>> w.plot_fit()
.. plot::
:include-source:
from spectractor.fit.fit_spectrogram import SpectrogramFitWorkspace
file_name = 'tests/data/reduc_20170530_134_spectrum.fits'
atmgrid_file_name = file_name.replace('spectrum', 'atmsim')
fit_workspace = SpectrogramFitWorkspace(file_name, atmgrid_file_name=atmgrid_file_name, verbose=True)
lambdas, model, model_err = fit_workspace.simulation.simulate(*w.params.values)
fit_workspace.lambdas = lambdas
fit_workspace.model = model
fit_workspace.model_err = model_err
fit_workspace.plot_fit()
"""
if np.max(self.lambdas) < 730:
gs_kw = dict(width_ratios=[1, 0.15], height_ratios=[1, 1, 1, 1])
fig, ax = plt.subplots(nrows=4, ncols=2, figsize=(8, 10), gridspec_kw=gs_kw)
elif 800 < np.max(self.lambdas) < 950:
gs_kw = dict(width_ratios=[3, 0.15, 1, 0.15], height_ratios=[1, 1, 1, 1])
fig, ax = plt.subplots(nrows=4, ncols=4, figsize=(14, 10), gridspec_kw=gs_kw)
else:
gs_kw = dict(width_ratios=[3, 0.15, 1, 0.15, 1, 0.15], height_ratios=[1, 1, 1, 1])
fig, ax = plt.subplots(nrows=4, ncols=6, figsize=(18, 10), gridspec_kw=gs_kw)
# main plot
self.plot_spectrogram_comparison_simple(ax[:, 0:2], title='Spectrogram model', dispersion=True)
# zoom O2
if np.max(self.lambdas) > 800:
self.plot_spectrogram_comparison_simple(ax[:, 2:4], extent=[730, 800], title='Zoom $O_2$', dispersion=True)
# zoom H2O
if np.max(self.lambdas) > 950:
self.plot_spectrogram_comparison_simple(ax[:, 4:6], extent=[870, min(1000, int(np.max(self.lambdas)))], title='Zoom $H_2 O$', dispersion=True)
# for i in range(ax.shape[0]-1): # clear middle colorbars
# for j in range(ax.shape[1]//2-1):
# plt.delaxes(ax[i, 2 * j + 1])
for i in range(ax.shape[0]): # clear middle y axis labels
for j in range(1, ax.shape[1]//2):
ax[i, 2 * j].set_ylabel("")
fig.tight_layout()
if parameters.LSST_SAVEFIGPATH:
figname = os.path.join(parameters.LSST_SAVEFIGPATH, f'ffm_bestfit.pdf')
self.my_logger.info(f"\n\tSave figure {figname}.")
fig.savefig(figname, dpi=100, bbox_inches='tight')
gs_kw = dict(width_ratios=[3, 0.15], height_ratios=[1, 1, 1, 1])
fig2, ax2 = plt.subplots(nrows=4, ncols=2, figsize=(10, 12), gridspec_kw=gs_kw)
self.plot_spectrogram_comparison_simple(ax2, title='Spectrogram model', dispersion=True)
# plt.delaxes(ax2[3, 1])
fig2.tight_layout()
figname = os.path.join(parameters.LSST_SAVEFIGPATH, f'ffm_bestfit_2.pdf')
self.my_logger.info(f"\n\tSave figure {figname}.")
fig2.savefig(figname, dpi=100, bbox_inches='tight')
if self.live_fit:
plt.draw()
plt.pause(1e-8)
plt.close()
else:
if parameters.DISPLAY and self.verbose:
plt.show()
[docs]
def adjust_spectrogram_position_parameters(self):
# fit the spectrogram trace
fixed_default = np.copy(self.params.fixed)
self.params.fixed = [True] * len(self.params.values)
strategy = copy.copy(self.amplitude_priors_method)
self.amplitude_priors_method = "fixed"
# let A1 free to help finding the spectrogram trace, with amplitude fixed to prior
self.params.fixed[self.params.get_index(f"A{self.diffraction_orders[0]}")] = True # A1
self.params.fixed[self.params.get_index(r"shift_y [pix]")] = False # shift y
self.params.fixed[self.params.get_index(r"angle [deg]")] = False # angle
run_minimisation(self, "newton", xtol=1e-2, ftol=0.01, with_line_search=False) # 1000 / self.data.size)
self.params.fixed = fixed_default
self.set_mask(params=self.params.values, fwhmx_clip=3 * parameters.PSF_FWHM_CLIP, fwhmy_clip=parameters.PSF_FWHM_CLIP)
# refix A1=1 and let amplitude parameters free
self.amplitude_priors_method = strategy
self.params.values[self.params.get_index(f"A{self.diffraction_orders[0]}")] = 1
[docs]
def run_ffm_minimisation(w, method="newton", niter=2):
"""Interface function to fit spectrogram simulation parameters to data.
Parameters
----------
w: FullForwardModelFitWorkspace
An instance of the SpectrogramFitWorkspace class.
method: str, optional
Fitting method (default: 'newton').
niter: int, optional
Number of FFM iterations to final result (default: 2).
Returns
-------
spectrum: Spectrum
The extracted spectrum.
Examples
--------
>>> spec = Spectrum("./tests/data/sim_20170530_134_spectrum.fits")
>>> parameters.VERBOSE = True
>>> w = FullForwardModelFitWorkspace(spec, verbose=True, plot=True, live_fit=False, amplitude_priors_method="spectrum")
>>> spec = run_ffm_minimisation(w, method="newton") # doctest: +ELLIPSIS
>>> if 'LBDAS_T' in spec.header: plot_comparison_truth(spec, w)
.. doctest:
:hide:
>>> assert w.costs[-1] / w.data.size < 1.22 # reduced chisq
>>> assert np.isclose(w.params[r"angle [deg]"], -1.56, rtol=0.05) # angle
>>> assert np.isclose(w.params["B"], 1, rtol=0.05) # B
"""
my_logger = set_logger(__name__)
my_logger.info(f"\n\tStart FFM with adjust_spectrogram_position_parameters.")
w.adjust_spectrogram_position_parameters()
if method != "newton":
run_minimisation(w, method=method)
else:
costs = np.array([w.chisq(w.params.values)])
if parameters.DISPLAY and (parameters.DEBUG or w.live_fit):
w.plot_fit()
start = time.time()
my_logger.info(f"\tStart guess:\n\t" + '\n\t'.join([f'{w.params.labels[k]}: {w.params.values[k]} (fixed={w.params.fixed[k]})' for k in range(w.params.ndim)]))
run_minimisation(w, method=method, xtol=1e-3, ftol=1e-2, with_line_search=False) # 1000 / (w.data.size - len(w.mask)))
if parameters.DEBUG and parameters.DISPLAY:
w.plot_fit()
weighted_mean_fwhm = np.average(w.spectrum.chromatic_psf.table['fwhm'], weights=w.spectrum.chromatic_psf.table['amplitude'])
my_logger.info(f"\n\tMean FWHM: {weighted_mean_fwhm} pixels (weighted with spectrum amplitude)")
if parameters.DEBUG: # pragma: no cover
fig, ax = plt.subplots(1, 1, figsize=(7, 5), sharex="all")
ax.plot(w.spectrum.lambdas, np.array(w.spectrum.chromatic_psf.table['fwhm']), label=f"weighted mean={weighted_mean_fwhm} pix")
ax.set_xlabel(r"$\lambda$ [nm]")
ax.set_ylabel("Transverse FWHM [pixels]")
ax.set_ylim((0.8 * np.min(w.spectrum.chromatic_psf.table['fwhm']), 1.2 * np.max(w.spectrum.chromatic_psf.table['fwhm']))) # [-10:])))
ax.grid()
ax.legend()
if parameters.DISPLAY:
plt.show()
if parameters.LSST_SAVEFIGPATH:
fig.savefig(os.path.join(parameters.LSST_SAVEFIGPATH, 'fwhm_2.pdf'))
# Optimize the regularisation parameter only if it was not done before
if w.amplitude_priors_method == "spectrum" and w.reg == parameters.PSF_FIT_REG_PARAM: # pragma: no cover
my_logger.info("\n\tStart regularization parameter estimation...")
w_reg = RegFitWorkspace(w, opt_reg=parameters.PSF_FIT_REG_PARAM, verbose=True)
w_reg.run_regularisation(Ndof=w.trace_r)
w.opt_reg = w_reg.opt_reg
w.reg = np.copy(w_reg.opt_reg)
w.trace_r = np.trace(w_reg.resolution)
w.simulate(*w.params.values)
if np.trace(w.amplitude_cov_matrix) < np.trace(w.amplitude_priors_cov_matrix):
w.my_logger.warning(
f"\n\tTrace of final covariance matrix ({np.trace(w.amplitude_cov_matrix)}) is "
f"below the trace of the prior covariance matrix "
f"({np.trace(w.amplitude_priors_cov_matrix)}). This is probably due to a very "
f"high regularisation parameter in case of a bad fit. Therefore the final "
f"covariance matrix is multiplied by the ratio of the traces and "
f"the amplitude parameters are very close the amplitude priors.")
r = np.trace(w.amplitude_priors_cov_matrix) / np.trace(w.amplitude_cov_matrix)
w.amplitude_cov_matrix *= r
w.amplitude_params_err = np.array(
[np.sqrt(w.amplitude_cov_matrix[x, x]) for x in range(w.Nx)])
w.spectrum.header['PSF_REG'] = w.opt_reg
w.spectrum.header['TRACE_R'] = w.trace_r
if parameters.DEBUG and parameters.DISPLAY:
w.plot_fit()
my_logger.info(f"\n\tStart run_minimisation_sigma_clipping "
f"with sigma={parameters.SPECTRACTOR_DECONVOLUTION_SIGMA_CLIP}.")
for i in range(niter):
w.set_mask(params=w.params.values, fwhmx_clip=3 * parameters.PSF_FWHM_CLIP, fwhmy_clip=parameters.PSF_FWHM_CLIP)
run_minimisation_sigma_clipping(w, "newton", xtol=1e-5,
ftol=1e-3, niter_clip=3, # ftol=100 / (w.data.size - len(w.mask))
sigma_clip=parameters.SPECTRACTOR_DECONVOLUTION_SIGMA_CLIP, verbose=True,
with_line_search=False)
my_logger.info(f"\n\t niter = {i} : Newton: total computation time: {time.time() - start}s")
if parameters.DEBUG and parameters.DISPLAY:
w.plot_fit()
w.spectrum.lambdas = np.copy(w.lambdas)
w.spectrum.chromatic_psf.table['lambdas'] = np.copy(w.lambdas)
w.spectrum.data = np.copy(w.amplitude_params)
w.spectrum.err = np.copy(w.amplitude_params_err)
w.spectrum.cov_matrix = np.copy(w.amplitude_cov_matrix)
w.spectrum.chromatic_psf.fill_table_with_profile_params(w.psf_profile_params[w.diffraction_orders[0]])
w.spectrum.chromatic_psf.table["amplitude"] = np.copy(w.amplitude_params)
w.spectrum.chromatic_psf.from_profile_params_to_shape_params(w.psf_profile_params[w.diffraction_orders[0]])
w.spectrum.chromatic_psf.params.values = w.spectrum.chromatic_psf.from_table_to_poly_params()
w.spectrum.spectrogram_fit = w.model.reshape((w.Ny, w.Nx))
w.spectrum.spectrogram_residuals = (w.data.reshape((w.Ny, w.Nx)) - w.spectrum.spectrogram_fit) / w.err.reshape((w.Ny, w.Nx))
w.spectrum.header['CHI2_FIT'] = w.costs[-1] / (w.data.size - len(w.mask))
w.spectrum.header['PIXSHIFT'] = w.params[r"shift_x [pix]"]
w.spectrum.header['D2CCD'] = w.params[r"D_CCD [mm]"]
if len(w.diffraction_orders) >= 2:
w.spectrum.header['A2_FIT'] = w.params[f"A{w.diffraction_orders[1]}"]
w.spectrum.header["ROTANGLE"] = w.params[r"angle [deg]"]
w.spectrum.header["AM_FIT"] = w.params["z"]
# Compute next order contamination
if len(w.diffraction_orders) >= 2 and w.tr[1]:
w.spectrum.data_next_order = w.params.values[w.diffraction_orders[1]] * w.amplitude_params * w.tr[1](w.lambdas)
w.spectrum.err_next_order = np.abs(w.params.values[w.diffraction_orders[1]] * w.amplitude_params_err * w.tr[1](w.lambdas))
# Calibrate the spectrum
calibrate_spectrum(w.spectrum, with_adr=(parameters.OBS_OBJECT_TYPE == "STAR"), grid_search=False)
w.params.set(r"D_CCD [mm]", w.spectrum.header['D2CCD'])
w.params.set(r"shift_x [pix]", w.spectrum.header['PIXSHIFT'])
w.spectrum.convert_from_flam_to_ADUrate()
# Mask forgotten cosmics
cr_mask = mask_cosmics(w.spectrum.spectrogram_residuals, maxiter=3, sigma_clip=5, convolve_kernel_size=0)
if np.sum(cr_mask) > 0:
my_logger.info(f"\n\t{np.sum(cr_mask)} new pixels identified and masked as cosmics.")
cr_mask_flat = cr_mask.flatten()
w.mask += [i for i in range(cr_mask_flat.size) if cr_mask_flat[i]]
w.mask = list(set(w.mask))
w.mask.sort()
if w.filename != "":
parameters.SAVE = True
w.params.plot_correlation_matrix()
write_fitparameter_json(w.params.json_filename, w.params,
extra={"chi2": costs[-1] / (w.data.size - len(w.outliers) - len(w.mask)),
"date-obs": w.spectrum.date_obs})
w.plot_fit()
parameters.SAVE = False
# Propagate parameters
w.spectrum.rotation_angle = w.params.values[w.params.get_index("angle [deg]")]
w.spectrum.spectrogram_bgd *= w.params.values[w.params.get_index("B")]
w.spectrum.spectrogram_bgd_rms *= w.params.values[w.params.get_index("B")]
w.spectrum.spectrogram_x0 += w.params.values[w.params.get_index("shift_x [pix]")]
w.spectrum.spectrogram_y0 += w.params.values[w.params.get_index("shift_y [pix]")]
w.spectrum.x0[0] += w.params.values[w.params.get_index("shift_x [pix]")]
w.spectrum.x0[1] += w.params.values[w.params.get_index("shift_y [pix]")]
w.spectrum.header["TARGETX"] = w.spectrum.x0[0]
w.spectrum.header["TARGETY"] = w.spectrum.x0[1]
w.spectrum.header['MEANFWHM'] = np.mean(np.array(w.spectrum.chromatic_psf.table['fwhm']))
# Convert to flam
w.spectrum.convert_from_ADUrate_to_flam()
# Compare with truth if available
if 'LBDAS_T' in w.spectrum.header and parameters.DEBUG:
plot_comparison_truth(w.spectrum, w)
return w.spectrum
[docs]
def SpectractorInit(file_name, target_label='', disperser_label="", config=''):
""" Spectractor initialisation: load the config parameters and build the Image instance.
Parameters
----------
file_name: str
Input file nam of the image to analyse.
target_label: str, optional
The name of the targeted object (default: "").
disperser_label: str, optional
The name of the disperser (default: "").
config: str
The config file name (default: "").
Returns
-------
image: Image
The prepared Image instance ready for spectrum extraction.
Examples
--------
Extract the spectrogram and its characteristics from the image:
.. doctest::
>>> import os
>>> from spectractor.logbook import LogBook
>>> logbook = LogBook(logbook='./tests/data/ctiofulllogbook_jun2017_v5.csv')
>>> file_names = ['./tests/data/reduc_20170530_134.fits']
>>> for file_name in file_names:
... tag = file_name.split('/')[-1]
... disperser_label, target_label, xpos, ypos = logbook.search_for_image(tag)
... if target_label is None or xpos is None or ypos is None:
... continue
... image = SpectractorInit(file_name, target_label=target_label,
... disperser_label=disperser_label, config='./config/ctio.ini')
.. doctest::
:hide:
>>> assert image is not None
"""
my_logger = set_logger(__name__)
my_logger.info('\n\tSpectractor initialisation')
# Load config file
if config != "":
load_config(config)
if parameters.LSST_SAVEFIGPATH: # pragma: no cover
ensure_dir(parameters.LSST_SAVEFIGPATH)
# Load reduced image
image = Image(file_name, target_label=target_label, disperser_label=disperser_label)
return image
[docs]
def SpectractorRun(image, output_directory, guess=None):
""" Spectractor main function to extract a spectrum from an image
Parameters
----------
image: Image
Input Image instance.
output_directory: str
Output directory.
guess: [int,int], optional
[x0,y0] list of the guessed pixel positions of the target in the image (must be integers). Mandatory if
WCS solution is absent (default: None).
Returns
-------
spectrum: Spectrum
The extracted spectrum object.
Examples
--------
Extract the spectrogram and its characteristics from the image:
.. doctest::
>>> import os
>>> from spectractor.logbook import LogBook
>>> logbook = LogBook(logbook='./tests/data/ctiofulllogbook_jun2017_v5.csv')
>>> file_names = ['./tests/data/reduc_20170530_134.fits']
>>> for file_name in file_names:
... tag = file_name.split('/')[-1]
... disperser_label, target_label, xpos, ypos = logbook.search_for_image(tag)
... if target_label is None or xpos is None or ypos is None:
... continue
... image = SpectractorInit(file_name, target_label=target_label,
... disperser_label=disperser_label, config='./config/ctio.ini')
... spectrum = SpectractorRun(image, './tests/data/', guess=[xpos, ypos])
.. doctest::
:hide:
>>> assert spectrum is not None
>>> assert os.path.isfile('tests/data/reduc_20170530_134_spectrum.fits')
"""
my_logger = set_logger(__name__)
my_logger.info('\n\tRun Spectractor')
# Guess position of order 0
if guess is not None and image.target_guess is None:
image.target_guess = np.asarray(guess)
if image.target_guess is None:
from scipy.signal import medfilt2d
data = medfilt2d(image.data.T, kernel_size=9)
image.target_guess = np.unravel_index(np.argmax(data), data.shape)
my_logger.info(f"\n\tNo guess position of order 0 has been given. Assuming the spectrum to extract comes "
f"from the brightest object, guess position is set as {image.target_guess}.")
if parameters.DEBUG:
image.plot_image(scale='symlog', title="before rebinning", target_pixcoords=image.target_guess, vmax=1e3)
# Use fast mode
if parameters.CCD_REBIN > 1:
my_logger.info('\n\t ======================= REBIN =============================')
image.rebin()
if parameters.DEBUG:
image.plot_image(scale='symlog', title="after rebinning ", target_pixcoords=image.target_guess)
# Set output path
ensure_dir(output_directory)
output_filename = os.path.basename(image.file_name)
output_filename = output_filename.replace('.fits', '_spectrum.fits')
output_filename = output_filename.replace('.fz', '_spectrum.fits')
output_filename = os.path.join(output_directory, output_filename)
# Find the exact target position in the raw cut image: several methods
my_logger.info(f'\n\tSearch for the target in the image with guess={image.target_guess}...')
find_target(image, image.target_guess, widths=(parameters.XWINDOW, parameters.YWINDOW))
# Simulate star field
if parameters.SPECTRACTOR_SIMULATE_STARFIELD:
image.starfield = image.simulate_starfield_with_gaia()
# Rotate the image
turn_image(image)
# Find the exact target position in the rotated image: several methods
my_logger.info('\n\tSearch for the target in the rotated image...')
find_target(image, image.target_guess, rotated=True, widths=(parameters.XWINDOW_ROT,
parameters.YWINDOW_ROT))
# Create Spectrum object
spectrum = Spectrum(image=image, order=parameters.SPEC_ORDER)
# First 1D spectrum extraction and background extraction
my_logger.info('\n\t ======================== PSF1D Extraction ====================================')
w_psf1d, bgd_model_func = extract_spectrum_from_image(image, spectrum, signal_width=parameters.PIXWIDTH_SIGNAL,
ws=(parameters.PIXDIST_BACKGROUND,
parameters.PIXDIST_BACKGROUND
+ parameters.PIXWIDTH_BACKGROUND))
# PSF2D deconvolution
if parameters.SPECTRACTOR_DECONVOLUTION_PSF2D:
my_logger.info('\n\t ========================== PSF2D DECONVOLUTION ===============================')
run_spectrogram_deconvolution_psf2d(spectrum, bgd_model_func=bgd_model_func)
# Calibrate the spectrum
my_logger.info(f'\n\tCalibrating order {spectrum.order:d} spectrum...')
calibrate_spectrum(spectrum, with_adr=(parameters.OBS_OBJECT_TYPE == "STAR"), grid_search=True)
spectrum.data_next_order = np.zeros_like(spectrum.lambdas)
spectrum.err_next_order = np.zeros_like(spectrum.lambdas)
# Full forward model extraction: add transverse ADR and order 2 subtraction
if parameters.SPECTRACTOR_DECONVOLUTION_FFM:
my_logger.info('\n\t ======================= FFM DECONVOLUTION =============================')
w = FullForwardModelFitWorkspace(spectrum, verbose=parameters.VERBOSE, plot=True, live_fit=False,
amplitude_priors_method="spectrum")
spectrum = run_ffm_minimisation(w, method="newton", niter=2)
# Save the spectrum
my_logger.info('\n\t ======================= SAVE SPECTRUM =============================')
spectrum.save_spectrum(output_filename, overwrite=True)
spectrum.lines.table = spectrum.lines.build_detected_line_table(amplitude_units=spectrum.units)
# Plot the spectrum
if parameters.VERBOSE and parameters.DISPLAY:
spectrum.plot_spectrum(xlim=None)
return spectrum
[docs]
def Spectractor(file_name, output_directory, target_label='', guess=None, disperser_label="", config=''):
""" Spectractor main function to extract a spectrum from a FITS file.
Parameters
----------
file_name: str
Input file nam of the image to analyse.
output_directory: str
Output directory.
target_label: str, optional
The name of the targeted object (default: "").
guess: [int,int], optional
[x0,y0] list of the guessed pixel positions of the target in the image (must be integers). Mandatory if
WCS solution is absent (default: None).
disperser_label: str, optional
The name of the disperser (default: "").
config: str
The config file name (default: "").
Returns
-------
spectrum: Spectrum
The extracted spectrum object.
Examples
--------
Extract the spectrogram and its characteristics from the image:
.. doctest::
>>> import os
>>> from spectractor.logbook import LogBook
>>> logbook = LogBook(logbook='./tests/data/ctiofulllogbook_jun2017_v5.csv')
>>> file_names = ['./tests/data/reduc_20170530_134.fits']
>>> for file_name in file_names:
... tag = file_name.split('/')[-1]
... disperser_label, target_label, xpos, ypos = logbook.search_for_image(tag)
... if target_label is None or xpos is None or ypos is None:
... continue
... spectrum = Spectractor(file_name, './tests/data/', guess=[xpos, ypos], target_label=target_label,
... disperser_label=disperser_label, config='./config/ctio.ini')
.. doctest::
:hide:
>>> assert spectrum is not None
>>> assert os.path.isfile('tests/data/reduc_20170530_134_spectrum.fits')
"""
image = SpectractorInit(file_name, target_label=target_label, disperser_label=disperser_label, config=config)
spectrum = SpectractorRun(image, guess=guess, output_directory=output_directory)
return spectrum
[docs]
def run_spectrogram_deconvolution_psf2d(spectrum, bgd_model_func):
"""Get the spectrum from a 2D PSF deconvolution of the spectrogram.
Parameters
----------
spectrum: Spectrum
bgd_model_func: callable
Returns
-------
w: ChromaticPSFFitWorkspace
"""
my_logger = set_logger(__name__)
s = spectrum.chromatic_psf
Ny, Nx = spectrum.spectrogram_data.shape
# build 1D priors
Dx_rot = np.copy(s.table['Dx'])
amplitude_priors = np.copy(s.table['amplitude'])
amplitude_priors_err = np.copy(s.table['flux_err'])
psf_poly_priors = s.from_table_to_poly_params()[s.Nx:]
Dy_disp_axis = np.copy(s.table["Dy_disp_axis"])
# initialize a new ChromaticPSF
s = ChromaticPSF(s.psf, Nx=Nx, Ny=Ny, x0=spectrum.spectrogram_x0, y0=spectrum.spectrogram_y0,
deg=s.deg, saturation=s.saturation)
# fill a first table with first guess
s.table['Dx'] = (np.arange(spectrum.spectrogram_xmin, spectrum.spectrogram_xmax, 1)
- spectrum.x0[0])[:len(s.table['Dx'])]
s.table["amplitude"] = np.interp(s.table['Dx'], Dx_rot, amplitude_priors)
s.table["flux_sum"] = np.interp(s.table['Dx'], Dx_rot, amplitude_priors)
s.table["flux_err"] = np.interp(s.table['Dx'], Dx_rot, amplitude_priors_err)
s.table['Dy_disp_axis'] = np.interp(s.table['Dx'], Dx_rot, Dy_disp_axis)
s.params.values = np.concatenate((s.table["amplitude"], psf_poly_priors))
s.cov_matrix = np.copy(spectrum.cov_matrix)
s.profile_params = s.from_poly_params_to_profile_params(s.params.values, apply_bounds=True)
s.fill_table_with_profile_params(s.profile_params)
s.from_profile_params_to_shape_params(s.profile_params)
s.table['Dy'] = s.table['y_c'] - spectrum.spectrogram_y0
# deconvolve and regularize with 1D priors
method = "psf1d"
mode = "2D"
my_logger.debug(f"\n\tTransverse fit table before PSF_2D fit:"
f"\n{s.table[['amplitude', 'x_c', 'y_c', 'Dx', 'Dy', 'Dy_disp_axis']]}")
my_logger.info(f'\n\tStart ChromaticPSF polynomial fit with '
f'mode={mode} and amplitude_priors_method={method}...')
data = np.copy(spectrum.spectrogram_data)
err = np.copy(spectrum.spectrogram_err)
# clean the data: this is truly a backward spectrum extraction to feed correctly the forward model
# if available, apply flats
if spectrum.spectrogram_flat is not None:
data /= spectrum.spectrogram_flat
# if available, subtract starfield before 1D spectrum estimate
# (important as it is used as a prior for regularisation)
if spectrum.spectrogram_starfield is not None:
data -= spectrum.spectrogram_starfield
my_logger.info('\n\t ======================= ChromaticPSF2D polynomial fit =============================')
w = s.fit_chromatic_psf(data, bgd_model_func=bgd_model_func, data_errors=err, live_fit=False, mask=spectrum.spectrogram_mask,
amplitude_priors_method=method, mode=mode, verbose=parameters.VERBOSE, analytical=True)
# save results
spectrum.spectrogram_fit = w.model.reshape((w.Ny, w.Nx)) #s.evaluate(s.set_pixels(mode=mode), poly_params=s.params.values)
spectrum.spectrogram_residuals = (w.data.reshape((w.Ny, w.Nx)) - spectrum.spectrogram_fit) / w.err.reshape((w.Ny, w.Nx))
lambdas = spectrum.disperser.grating_pixel_to_lambda(s.get_algebraic_distance_along_dispersion_axis(),
D=parameters.DISTANCE2CCD, x0=spectrum.x0,
order=spectrum.order)
s.table['lambdas'] = lambdas
spectrum.lambdas = np.array(lambdas)
spectrum.data = np.copy(w.amplitude_params)
spectrum.err = np.copy(w.amplitude_params_err)
spectrum.cov_matrix = np.copy(w.amplitude_cov_matrix)
spectrum.pixels = np.copy(s.table['Dx'])
s.table['Dy'] = s.table['y_c'] - spectrum.spectrogram_y0
s.table['Dy_fwhm_inf'] = s.table['Dy'] - 0.5 * s.table['fwhm']
s.table['Dy_fwhm_sup'] = s.table['Dy'] + 0.5 * s.table['fwhm']
spectrum.chromatic_psf = s
spectrum.header['PSF_REG'] = s.opt_reg
spectrum.header['TRACE_R'] = w.trace_r
spectrum.header['MEANFWHM'] = np.mean(np.array(s.table['fwhm']))
# Plot FHWM(lambda)
if parameters.DEBUG: # pragma: no cover
fig, ax = plt.subplots(2, 1, figsize=(10, 8), sharex="all")
ax[0].plot(spectrum.lambdas, np.array(s.table['fwhm']))
ax[0].set_xlabel(r"$\lambda$ [nm]")
ax[0].set_ylabel("Transverse FWHM [pixels]")
ax[0].set_ylim((0.8 * np.min(s.table['fwhm']), 1.2 * np.max(s.table['fwhm']))) # [-10:])))
ax[0].grid()
ax[1].plot(spectrum.lambdas, np.array(s.table['y_c']))
ax[1].set_xlabel(r"$\lambda$ [nm]")
ax[1].set_ylabel("Distance from mean dispersion axis [pixels]")
# ax[1].set_ylim((0.8*np.min(s.table['Dy']), 1.2*np.max(s.table['fwhm'][-10:])))
ax[1].grid()
if parameters.DISPLAY:
plt.show()
if parameters.LSST_SAVEFIGPATH:
fig.savefig(os.path.join(parameters.LSST_SAVEFIGPATH, 'fwhm_2.pdf'))
return w
[docs]
def plot_comparison_truth(spectrum, w): # pragma: no cover
s = spectrum.chromatic_psf
lambdas_truth = np.fromstring(spectrum.header['LBDAS_T'][1:-1], sep=',')
psf_poly_truth = np.fromstring(spectrum.header['PSF_P_T'][1:-1], sep=',', dtype=float)
deg_truth = int(spectrum.header["PSF_DEG"])
psf_poly_truth[-1] = spectrum.spectrogram_saturation
amplitude_truth = np.fromstring(spectrum.header['AMPLIS_T'][1:-1], sep=',', dtype=float)
amplitude_truth *= parameters.FLAM_TO_ADURATE * lambdas_truth * np.gradient(lambdas_truth) * parameters.CCD_REBIN
s0 = ChromaticPSF(s.psf, lambdas_truth.size, s.Ny, deg=deg_truth,
saturation=spectrum.spectrogram_saturation)
s0.params.values = np.asarray(list(amplitude_truth) + list(psf_poly_truth))
# s0.deg = (len(s0.poly_params[s0.Nx:]) - 1) // ((len(s0.psf.param_names) - 2) - 1) // 2
# s0.set_polynomial_degrees(s0.deg)
s0.profile_params = s0.from_poly_params_to_profile_params(s0.params.values)
s0.from_profile_params_to_shape_params(s0.profile_params)
gs_kw = dict(width_ratios=[2, 1], height_ratios=[2, 1])
fig, ax = plt.subplots(2, 2, figsize=(11, 5), sharex="all", gridspec_kw=gs_kw)
ax[0, 0].plot(lambdas_truth, amplitude_truth, label="truth")
amplitude_priors_err = [np.sqrt(w.amplitude_priors_cov_matrix[x, x]) for x in range(w.Nx)]
ax[0, 0].errorbar(spectrum.lambdas, w.amplitude_priors, yerr=amplitude_priors_err, label="prior")
ax[0, 0].errorbar(spectrum.lambdas, w.amplitude_params, yerr=w.amplitude_params_err, label="2D")
ax[0, 0].grid()
ax[0, 0].legend()
ax[0, 0].set_xlabel(r"$\lambda$ [nm]")
ax[0, 0].set_ylabel(f"Amplitude $A$ [ADU/s]")
ax[1, 0].plot(lambdas_truth, np.zeros_like(lambdas_truth), label="truth")
amplitude_truth_interp = np.interp(spectrum.lambdas, lambdas_truth, amplitude_truth)
res = (w.amplitude_priors - amplitude_truth_interp) / amplitude_priors_err
ax[1, 0].errorbar(spectrum.lambdas, res, yerr=np.ones_like(res),
label=f"prior: mean={np.mean(res):.2f}, std={np.std(res):.2f}")
res = (w.amplitude_params - amplitude_truth_interp) / w.amplitude_params_err
ax[1, 0].errorbar(spectrum.lambdas, res, yerr=np.ones_like(res),
label=f"2D: mean={np.mean(res):.2f}, std={np.std(res):.2f}")
ax[1, 0].grid()
ax[1, 0].legend()
ax[1, 0].set_xlabel(r"$\lambda$ [nm]")
ax[1, 0].set_ylim(-5, 5)
ax[1, 0].set_ylabel(r"$(A - A_{\rm truth})/\sigma_A$")
fwhm_truth = np.interp(spectrum.lambdas, lambdas_truth, s0.table["fwhm"]) / parameters.CCD_REBIN
# fwhm_prior = np.interp(spectrum.lambdas, np.arange(w.amplitude_priors.size), w.fwhm_priors)
# fwhm_1d = np.interp(np.arange(len(s.table)), np.arange(w.fwhm_1d.size), w.fwhm_1d)
# fwhm_2d = np.interp(np.arange(len(s.table)), np.arange(s.Nx), fwhm_1d)
ax[0, 1].plot(lambdas_truth, s0.table["fwhm"], label="truth")
ax[0, 1].plot(spectrum.lambdas, w.fwhm_priors, label="prior")
ax[0, 1].plot(spectrum.lambdas, s.table["fwhm"], label="fit")
ax[0, 1].grid()
ax[0, 1].set_ylim(0, 10)
ax[0, 1].legend()
ax[0, 1].set_xlabel(r"$\lambda$ [nm]")
ax[1, 1].set_xlabel(r"$\lambda$ [nm]")
ax[0, 1].set_ylabel(f"FWHM [pix]")
ax[1, 1].set_ylabel(r"FWHM - FWHM$_{\rm truth}$ [pix]")
ax[1, 1].plot(lambdas_truth, np.zeros_like(lambdas_truth), label="truth")
ax[1, 1].plot(spectrum.lambdas, w.fwhm_priors - fwhm_truth, label="prior")
ax[1, 1].plot(spectrum.lambdas, s.table["fwhm"] - fwhm_truth, label="2D")
ax[1, 1].grid()
ax[1, 1].set_ylim(-0.5, 0.5)
ax[1, 1].legend()
plt.subplots_adjust(hspace=0)
fig.tight_layout()
if parameters.DISPLAY:
plt.show()
if parameters.LSST_SAVEFIGPATH:
fig.savefig(os.path.join(parameters.LSST_SAVEFIGPATH, 'deconvolution_truth.pdf'), transparent=True)
plt.show()