Source code for spectractor.fit.fit_spectrogram

import time
import os
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
from scipy.signal import convolve2d
import copy
import getCalspec

from spectractor import parameters
from spectractor.config import set_logger
from spectractor.tools import plot_image_simple, from_lambda_to_colormap
from spectractor.extractor.spectrum import Spectrum
from spectractor.simulation.simulator import SpectrogramModel
from spectractor.simulation.atmosphere import Atmosphere, AtmosphereGrid
from spectractor.fit.fitter import (FitWorkspace, FitParameters, run_minimisation, run_minimisation_sigma_clipping,
                                    write_fitparameter_json)
try:
    from gaiaspec import getGaia
except ModuleNotFoundError:
    getGaia = None

plot_counter = 0


[docs] class SpectrogramFitWorkspace(FitWorkspace): def __init__(self, spectrum, atmgrid_file_name="", fit_angstrom_exponent=False, verbose=False, plot=False, live_fit=False, truth=None): """Class to fit a spectrogram extracted with Spectractor. First the spectrogram is cropped using the parameters.PIXWIDTH_SIGNAL parameter to increase speedness. The truth parameters are loaded from the file header if provided. If provided, the atmospheric grid is used for the atmospheric transmission simulations and interpolated with splines, otherwise Libradtran is called at each step (slower). Parameters ---------- spectrum: Spectrum Spectrum object. atmgrid_file_name: str, optional Atmospheric grid file name (default: ""). fit_angstrom_exponent: bool, optional If True, fit angstrom exponent (default: False). verbose: bool, optional Verbosity level (default: False). plot: bool, optional If True, many plots are produced (default: False). live_fit: bool, optional If True, many plots along the fitting procedure are produced to see convergence in live (default: False). truth: array_like, optional Array of truth parameters to compare with the best fit result (default: None). Examples -------- >>> spec = Spectrum('tests/data/reduc_20170530_134_spectrum.fits') >>> parameters.SPECTRACTOR_ATMOSPHERE_SIM = "libradtran" >>> atmgrid_filename = spec.filename.replace('spectrum', 'atmsim') >>> w = SpectrogramFitWorkspace(spec, atmgrid_file_name=atmgrid_filename, verbose=True, plot=True, live_fit=False) >>> lambdas, model, model_err = w.simulate(*w.params.values) >>> w.plot_fit() """ if not getCalspec.is_calspec(spectrum.target.label): if getGaia is not None: is_gaiaspec = getGaia.is_gaiaspec(spectrum.target.label) is_gaia_full = False if is_gaiaspec == False: is_gaia_full = getGaia.is_gaia_full(spectrum.target.label) if not is_gaiaspec: if not is_gaia_full: raise ValueError(f"{spectrum.target.label=} must be a CALSPEC or GAIA star.") else: raise ValueError(f"{spectrum.target.label=} must be a CALSPEC star according to getCalspec package.") self.spectrum = spectrum self.filename = spectrum.filename.replace("spectrum", "spectrogram") self.diffraction_orders = np.arange(spectrum.order, spectrum.order + 3 * np.sign(spectrum.order), np.sign(spectrum.order)) if len(self.diffraction_orders) == 0: raise ValueError(f"At least one diffraction order must be given for spectrogram simulation.") length = len(self.spectrum.chromatic_psf.table) self.psf_poly_params = self.spectrum.chromatic_psf.from_table_to_poly_params()[length:] self.spectrum.chromatic_psf.psf.apply_max_width_to_bounds(max_half_width=self.spectrum.spectrogram_Ny) self.saturation = self.spectrum.spectrogram_saturation D2CCD = np.copy(spectrum.header['D2CCD']) p = np.array([1, 1, 0, 0.05, 1.2, 400, 5, 1, 1, D2CCD, self.spectrum.header['PIXSHIFT'], 0, self.spectrum.rotation_angle, self.spectrum.pressure]) # parameter indices for which we don't need to recompute the PSF cube for model evaluation # warning: they must be contiguous to preserve psf_cube in jacobian function loop self.fixed_psf_params = np.arange(0, 9, dtype=int) self.psf_params_start_index = np.array([p.size + len(self.psf_poly_params) * k for k in range(len(self.diffraction_orders))]) psf_poly_params_labels = np.copy(self.spectrum.chromatic_psf.params.labels[length:]) psf_poly_params_names = np.copy(self.spectrum.chromatic_psf.params.axis_names[length:]) psf_poly_params_bounds = self.spectrum.chromatic_psf.set_bounds() p = np.concatenate([p] + [self.psf_poly_params] * len(self.diffraction_orders)) input_labels = [f"A{order}" for order in self.diffraction_orders] input_labels += ["VAOD", "angstrom_exp", "ozone [db]", "PWV [mm]", "B", "A_star", r"D_CCD [mm]", r"shift_x [pix]", r"shift_y [pix]", r"angle [deg]", "P [hPa]"] for order in self.diffraction_orders: input_labels += [label + f"_{order}" for label in psf_poly_params_labels] axis_names = [f"$A_{order}$" for order in self.diffraction_orders] axis_names += ["VAOD", r'$\"a$', "ozone [db]", "PWV [mm]", "$B$", r"$A_{star}$", r"$D_{CCD}$ [mm]", r"$\Delta_{\mathrm{x}}$ [pix]", r"$\Delta_{\mathrm{y}}$ [pix]", r"$\theta$ [deg]", r"$P_{\mathrm{atm}}$ [hPa]"] for order in self.diffraction_orders: axis_names += [label+rf"$\!_{order}$" for label in psf_poly_params_names] bounds = [[0, 2], [0, 2], [0, 2], [0, 10], [0, 4], [100, 700], [0, 20], [0.8, 1.2], [0, np.inf], [D2CCD - 20 * parameters.DISTANCE2CCD_ERR, D2CCD + 20 * parameters.DISTANCE2CCD_ERR], [-10, 10], [-10, 10], [-90, 90], [0, np.inf]] bounds += list(psf_poly_params_bounds) * len(self.diffraction_orders) fixed = [False] * p.size for k, par in enumerate(input_labels): if "x_c" in par or "saturation" in par: fixed[k] = True for k, par in enumerate(input_labels): if "y_c" in par: fixed[k] = True p[k] = 0 for k, par in enumerate(input_labels): if k >= self.psf_params_start_index[0] and "y_c" not in par and "x_c" not in par and par[-2:] != f"_{spectrum.order}" and "_0_" not in par: fixed[k] = True p[k] = 0 if k >= self.psf_params_start_index[0] and "eta" in par and par[-2:] != f"_{spectrum.order}": fixed[k] = True p[k] = 0 # for k, par in enumerate(input_labels): # if k >= self.psf_params_start_index[0] and "y_c" not in par and "x_c" not in par and par[-2:] != f"_{spectrum.order}" and "_0_" not in par: # fixed[k] = True # p[k] = 0 params = FitParameters(p, labels=input_labels, axis_names=axis_names, bounds=bounds, fixed=fixed, truth=truth, filename=self.filename) params.fixed[params.get_index(f"A{self.diffraction_orders[0]}")] = False # A1 self.atm_params_indices = np.array([params.get_index(label) for label in ["VAOD", "angstrom_exp", "ozone [db]", "PWV [mm]"]]) # A2 is free only if spectrogram is a simulation or if the order 2/1 ratio is not known and flat params.fixed[params.get_index(f"A{self.diffraction_orders[0]}")] = True # A1 if "A2" in params.labels: params.fixed[params.get_index(f"A{self.diffraction_orders[1]}")] = True #not getCalspec.is_calspec(spectrum.target.label) #"A2_T" not in self.spectrum.header if "A3" in params.labels: params.fixed[params.get_index(f"A{self.diffraction_orders[2]}")] = "A3_T" not in self.spectrum.header params.fixed[params.get_index(r"shift_x [pix]")] = False # Delta x params.fixed[params.get_index(r"shift_y [pix]")] = False # Delta y params.fixed[params.get_index(r"angle [deg]")] = False # angle params.fixed[params.get_index("B")] = True # B params.fixed[params.get_index("P [hPa]")] = False # pressure for ADR if self.spectrum.spectrogram_Ny > 2 * parameters.PIXDIST_BACKGROUND: self.crop_spectrogram() FitWorkspace.__init__(self, params, data=self.spectrum.spectrogram_data.flatten(), err=self.spectrum.spectrogram_err.flatten(), epsilon=1e-4, verbose=verbose, plot=plot, live_fit=live_fit, file_name=self.filename) self.my_logger = set_logger(self.__class__.__name__) if atmgrid_file_name == "": self.atmosphere = Atmosphere(self.spectrum.airmass, self.spectrum.pressure, self.spectrum.temperature) if self.atmosphere.emulator is not None: self.params.bounds[self.params.get_index("ozone [db]")] = (self.atmosphere.emulator.OZMIN, self.atmosphere.emulator.OZMAX) self.params.bounds[self.params.get_index("PWV [mm]")] = (self.atmosphere.emulator.PWVMIN, self.atmosphere.emulator.PWVMAX) else: self.use_grid = True self.atmosphere = AtmosphereGrid(spectrum_filename=spectrum.filename, atmgrid_filename=atmgrid_file_name) self.params.bounds[self.params.get_index("VAOD")] = (min(self.atmosphere.AER_Points), max(self.atmosphere.AER_Points)) self.params.bounds[self.params.get_index("ozone [db]")] = (min(self.atmosphere.OZ_Points), max(self.atmosphere.OZ_Points)) self.params.bounds[self.params.get_index("PWV [mm]")] = (min(self.atmosphere.PWV_Points), max(self.atmosphere.PWV_Points)) self.params.fixed[self.params.get_index("angstrom_exp")] = True # angstrom exponent self.my_logger.info(f'\n\tUse atmospheric grid models from file {atmgrid_file_name}. ') self.lambdas = self.spectrum.lambdas self.Ny, self.Nx = self.spectrum.spectrogram_data.shape self.bgd = self.spectrum.spectrogram_bgd.flatten() if self.spectrum.spectrogram_flat is not None: self.flat = self.spectrum.spectrogram_flat.flatten() else: self.flat = None if self.spectrum.spectrogram_starfield is not None: self.starfield = self.spectrum.spectrogram_starfield.flatten() else: self.starfield = None if self.spectrum.spectrogram_mask is not None: self.mask = list(np.where(spectrum.spectrogram_mask.astype(bool).ravel())[0]) else: self.mask = [] self.fit_angstrom_exponent = fit_angstrom_exponent if not fit_angstrom_exponent: self.params.fixed[self.params.get_index("angstrom_exp")] = True # angstrom exponent self.params.values[self.params.get_index("angstrom_exp")] = self.atmosphere.angstrom_exponent_default if np.min(self.spectrum.lambdas) > 500: self.fit_angstrom_exponent = False self.params.fixed[self.params.get_index("angstrom_exp")] = True self.params.values[self.params.get_index("angstrom_exp")] = 0 self.my_logger.warning("\n\tWavelengths below 500nm detected: angstrom exponent fitting disabled and fixed to 0.") self.spectrogram_simulation = SpectrogramModel(self.spectrum, atmosphere=self.atmosphere, diffraction_orders=self.diffraction_orders, fast_sim=False, with_adr=True) self.lambdas_truth = None self.amplitude_truth = None self.get_spectrogram_truth() # error matrix # here image uncertainties are assumed to be uncorrelated # (which is not exactly true in rotated images) self.W = 1. / (self.err * self.err) self.W = self.W.flatten() # flat data for fitworkspace self.data_before_mask = np.copy(self.data) self.W_before_mask = np.copy(self.W) self.mask_before_mask = list(np.copy(self.mask)) # create mask self.set_mask()
[docs] def crop_spectrogram(self): """Crop the spectrogram in the middle, keeping a vertical width of 2*parameters.PIXWIDTH_SIGNAL around the signal region. """ bgd_width = parameters.PIXWIDTH_BACKGROUND + parameters.PIXDIST_BACKGROUND - parameters.PIXWIDTH_SIGNAL self.spectrum.spectrogram_ymax = self.spectrum.spectrogram_ymax - bgd_width self.spectrum.spectrogram_ymin += bgd_width self.spectrum.spectrogram_bgd = self.spectrum.spectrogram_bgd[bgd_width:-bgd_width, :] self.spectrum.spectrogram_data = self.spectrum.spectrogram_data[bgd_width:-bgd_width, :] self.spectrum.spectrogram_err = self.spectrum.spectrogram_err[bgd_width:-bgd_width, :] if self.spectrum.spectrogram_flat is not None: self.spectrum.spectrogram_flat = self.spectrum.spectrogram_flat[bgd_width:-bgd_width, :] if self.spectrum.spectrogram_starfield is not None: self.spectrum.spectrogram_starfield = self.spectrum.spectrogram_starfield[bgd_width:-bgd_width, :] if self.spectrum.spectrogram_mask is not None: self.spectrum.spectrogram_mask = self.spectrum.spectrogram_mask[bgd_width:-bgd_width, :] self.spectrum.spectrogram_y0 -= bgd_width self.spectrum.chromatic_psf.y0 -= bgd_width self.spectrum.spectrogram_Ny, self.spectrum.spectrogram_Nx = self.spectrum.spectrogram_data.shape self.spectrum.chromatic_psf.table["y_c"] -= bgd_width
[docs] def set_mask(self, params=None): """ Parameters ---------- params Returns ------- Examples -------- >>> spec = Spectrum('tests/data/reduc_20170530_134_spectrum.fits') >>> parameters.SPECTRACTOR_ATMOSPHERE_SIM = "libradtran" >>> w = SpectrogramFitWorkspace(spec, verbose=True) >>> _ = w.simulate(*w.params.values) >>> w.plot_fit() """ self.my_logger.info("\n\tReset spectrogram mask with current parameters.") if params is None: params = self.params.values A1, A2, A3, aerosols, angstrom_exponent, ozone, pwv, B, Astar, D, shift_x, shift_y, angle, pressure, *psf_poly_params_all = params poly_params = np.array(psf_poly_params_all).reshape((len(self.diffraction_orders), -1)) self.spectrogram_simulation.psf_cubes_masked = {} self.spectrogram_simulation.M_sparse_indices = {} self.spectrogram_simulation.psf_cube_sparse_indices = {} self.spectrum.adr_params[3] = pressure for k, order in enumerate(self.diffraction_orders): profile_params = self.spectrum.chromatic_psf.from_poly_params_to_profile_params(poly_params[k], apply_bounds=True) if order == self.diffraction_orders[0]: # only first diffraction order self.spectrum.chromatic_psf.from_profile_params_to_shape_params(profile_params) dispersion_law = self.spectrum.compute_dispersion_in_spectrogram(self.lambdas, D, shift_x, shift_y, angle, with_adr=True, order=order) profile_params[:, 0] = 1 profile_params[:, 1] = dispersion_law.real + self.spectrogram_simulation.r0.real profile_params[:, 2] += dispersion_law.imag # - self.bgd_width psf_cube_masked = self.spectrum.chromatic_psf.build_psf_cube_masked(self.spectrogram_simulation.pixels, profile_params, fwhmx_clip=3 * parameters.PSF_FWHM_CLIP, fwhmy_clip=parameters.PSF_FWHM_CLIP) psf_cube_masked = self.spectrum.chromatic_psf.convolve_psf_cube_masked(psf_cube_masked) # make rectangular mask per wavelength self.spectrogram_simulation.boundaries[order], self.spectrogram_simulation.psf_cubes_masked[order] = self.spectrum.chromatic_psf.set_rectangular_boundaries(psf_cube_masked) if k > 0: # spectrogram model must be accurate inside the k=0 order footprint: enlarge the next order footprints self.spectrogram_simulation.boundaries[order]["ymin"] = np.zeros_like(self.spectrogram_simulation.boundaries[order]["ymin"]) self.spectrogram_simulation.boundaries[order]["ymax"] = self.Ny * np.ones_like(self.spectrogram_simulation.boundaries[order]["ymax"]) self.spectrogram_simulation.psf_cube_sparse_indices[order], self.spectrogram_simulation.M_sparse_indices[order] = self.spectrum.chromatic_psf.get_sparse_indices(self.spectrogram_simulation.boundaries[order]) # mask = np.sum(self.spectrogram_simulation.psf_cubes_masked[self.diffraction_orders[0]].reshape(psf_cube_masked.shape[0], self.spectrogram_simulation.pixels[0].size), axis=0) == 0 # cumulate the boolean values as int weight_mask = np.sum(self.spectrogram_simulation.psf_cubes_masked[self.diffraction_orders[0]], axis=0) # look for indices with maximum weight per column (all sheets of the psf cube have contributed) res = np.max(weight_mask, axis=0)[np.newaxis,:] * np.ones((weight_mask.shape[0],1)) # keep only the pixels where all psf_cube sheets have contributed per column mask = (weight_mask != res).ravel() self.mask = list(self.mask_before_mask) + list(np.where(mask)[0]) self.mask = list(set(self.mask)) self.W = np.copy(self.W_before_mask) self.W[self.mask] = 0
[docs] def get_spectrogram_truth(self): """Load the truth parameters (if provided) from the file header. """ if 'A1_T' in list(self.spectrum.header.keys()): A1_truth = self.spectrum.header['A1_T'] A2_truth = self.spectrum.header['A2_T'] if 'A3_T' in self.spectrum.header: A3_truth = self.spectrum.header['A3_T'] else: A3_truth = 0 ozone_truth = self.spectrum.header['OZONE_T'] pwv_truth = self.spectrum.header['PWV_T'] aerosols_truth = self.spectrum.header['VAOD_T'] D_truth = self.spectrum.header['D2CCD_T'] shiftx_truth = 0 shifty_truth = 0 rotation_angle = self.spectrum.header['ROT_T'] B = 1 Astar = 1 pressure = self.spectrum.header["OUTPRESS"] poly_truth = np.fromstring(self.spectrum.header['PSF_P_T'][1:-1], sep=',', dtype=float) self.truth = (A1_truth, A2_truth, A3_truth, aerosols_truth, ozone_truth, pwv_truth, D_truth, shiftx_truth, shifty_truth, rotation_angle, B, Astar, pressure, *poly_truth) self.lambdas_truth = np.fromstring(self.spectrum.header['LBDAS_T'][1:-1], sep=',', dtype=float) self.amplitude_truth = np.fromstring(self.spectrum.header['AMPLIS_T'][1:-1], sep=',', dtype=float) else: self.truth = None
[docs] def plot_spectrogram_comparison_simple(self, ax, title='', extent=None, dispersion=False): """Method to plot a spectrogram issued from data and compare it with simulations. Parameters ---------- ax: Axes Axes instance of shape (3, 2). title: str, optional Title for the simulation plot (default: ''). extent: array_like, optional Extent argument for imshow to crop plots (default: None). dispersion: bool, optional If True, plot a colored bar to see the associated wavelength color along the x axis (default: False). """ cmap_bwr = copy.copy(mpl.colormaps["bwr"]) cmap_bwr.set_bad(color='lightgrey') cmap_viridis = copy.copy(mpl.colormaps["viridis"]) cmap_viridis.set_bad(color='lightgrey') data = np.copy(self.data_before_mask) if len(self.outliers) > 0 or len(self.mask) > 0: bad_indices = np.array(list(self.get_bad_indices()) + list(self.mask)).astype(int) data[bad_indices] = np.nan lambdas = self.spectrum.lambdas sub = np.where((lambdas > parameters.LAMBDA_MIN) & (lambdas < parameters.LAMBDA_MAX))[0] sub = np.where(sub < self.spectrum.spectrogram_Nx)[0] data = data.reshape((self.Ny, self.Nx)) model = self.model.reshape((self.Ny, self.Nx)) err = self.err.reshape((self.Ny, self.Nx)) if extent is not None: sub = np.where((lambdas > extent[0]) & (lambdas < extent[1]))[0] if len(sub) > 0: norm = np.nanmax(data[:, sub]) plot_image_simple(ax[0, 0], data=data[:, sub] / norm, title='Data', aspect='auto', cax=ax[0, 1], vmin=0, vmax=1, units='1/max(data)', cmap=cmap_viridis) ax[0, 0].set_title('Data', fontsize=10, loc='center', color='white', y=0.8) plot_image_simple(ax[1, 0], data=model[:, sub] / norm, aspect='auto', cax=ax[1, 1], vmin=0, vmax=1, units='1/max(data)', cmap=cmap_viridis) if dispersion: x = self.spectrum.chromatic_psf.table['Dx'][sub[5:-5]] + self.spectrum.spectrogram_x0 - sub[0] y = np.ones_like(x) ax[1, 0].scatter(x, y, cmap=from_lambda_to_colormap(self.lambdas[sub[5:-5]]), edgecolors='None', c=self.lambdas[sub[5:-5]], label='', marker='o', s=10) ax[1, 0].set_xlim(0, model[:, sub].shape[1]) ax[1, 0].set_ylim(0, model[:, sub].shape[0]) # p0 = ax.plot(lambdas, self.model(lambdas), label='model') # # ax.plot(self.lambdas, self.model_noconv, label='before conv') if title != '': ax[1, 0].set_title(title, fontsize=10, loc='center', color='white', y=0.8) residuals = (data - model) # residuals_err = self.spectrum.spectrogram_err / self.model norm = np.sqrt(err**2 + self.model_err.reshape((self.Ny, self.Nx))**2) residuals /= norm std = float(np.nanstd(residuals[:, sub])) plot_image_simple(ax[2, 0], data=residuals[:, sub], vmin=-5 * std, vmax=5 * std, title='(Data-Model)/Err', aspect='auto', cax=ax[2, 1], units='', cmap=cmap_bwr) ax[2, 0].set_title('(Data-Model)/Err', fontsize=10, loc='center', color='black', y=0.8) ax[2, 0].text(0.05, 0.05, f'mean={np.nanmean(residuals[:, sub]):.3f}\nstd={np.nanstd(residuals[:, sub]):.3f}', horizontalalignment='left', verticalalignment='bottom', color='black', transform=ax[2, 0].transAxes) ax[0, 0].set_xticks(ax[2, 0].get_xticks()[1:-1]) ax[0, 1].get_yaxis().set_label_coords(3.5, 0.5) ax[1, 1].get_yaxis().set_label_coords(3.5, 0.5) ax[2, 1].get_yaxis().set_label_coords(3.5, 0.5)
[docs] def simulate(self, *params): """Interface method to simulate a spectrogram. Parameters ---------- params: array_like Simulation parameter array. Returns ------- lambdas: array_like Array of wavelengths (1D). model: array_like Flat 1D array of the spectrogram simulation. model_err: array_like Flat 1D array of the spectrogram simulation uncertainty. Examples -------- >>> spec = Spectrum('tests/data/reduc_20170530_134_spectrum.fits') >>> parameters.SPECTRACTOR_ATMOSPHERE_SIM = "libradtran" >>> w = SpectrogramFitWorkspace(spec, verbose=True) >>> lambdas, model, model_err = w.simulate(*w.params.values) >>> w.plot_fit() """ A1, A2, A3, aerosols, angstrom_exponent, ozone, pwv, B, Astar, D, shift_x, shift_y, angle, pressure, *psf_poly_params = params self.params.values = np.asarray(params) self.spectrogram_simulation.adr_params[3] = pressure if not self.fit_angstrom_exponent: angstrom_exponent = None lambdas, model, model_err = self.spectrogram_simulation.simulate(A1, A2, A3, aerosols, angstrom_exponent, ozone, pwv, D, shift_x, shift_y, angle, psf_poly_params) self.lambdas = lambdas self.model = model.flatten() self.model_err = model_err.flatten() self.model += B * self.bgd if self.starfield is not None: self.model += Astar * self.starfield if self.flat is not None: # TODO: if flat array is a cube flat, needs to multiply directly in build_psf_cube self.model *= self.flat return self.lambdas, self.model, self.model_err
[docs] def jacobian(self, params, model_input=None): start = time.time() if model_input is not None: lambdas, model, model_err = model_input else: lambdas, model, model_err = self.simulate(*params) model = model.flatten() J = np.zeros((params.size, model.size)) strategy = copy.copy(self.spectrogram_simulation.fix_psf_cube) atmosphere = copy.copy(self.spectrogram_simulation.atmosphere_sim) for ip, p in enumerate(params): if self.params.fixed[ip]: continue if ip in self.fixed_psf_params: self.spectrogram_simulation.fix_psf_cube = True else: self.spectrogram_simulation.fix_psf_cube = False if ip in self.atm_params_indices: self.spectrogram_simulation.fix_atm_sim = False else: self.spectrogram_simulation.fix_atm_sim = True if ip >= self.psf_params_start_index[0]: continue tmp_p = np.copy(params) if tmp_p[ip] + self.epsilon[ip] < self.params.bounds[ip][0] or tmp_p[ip] + self.epsilon[ip] > self.params.bounds[ip][1]: self.epsilon[ip] = - self.epsilon[ip] tmp_p[ip] += self.epsilon[ip] tmp_lambdas, tmp_model, tmp_model_err = self.simulate(*tmp_p) if self.spectrogram_simulation.fix_atm_sim is False: self.spectrogram_simulation.atmosphere_sim = atmosphere J[ip] = (tmp_model.flatten() - model) / self.epsilon[ip] self.spectrogram_simulation.fix_atm_sim = True self.spectrogram_simulation.fix_psf_cube = False for k, order in enumerate(self.diffraction_orders): if self.spectrogram_simulation.profile_params[order] is None: continue start = self.psf_params_start_index[k] profile_params = np.copy(self.spectrogram_simulation.profile_params[order]) J[start:start+len(self.psf_poly_params)] = self.spectrogram_simulation.chromatic_psf.build_psf_jacobian(self.spectrogram_simulation.pixels, profile_params=profile_params, psf_cube_sparse_indices=self.spectrogram_simulation.psf_cube_sparse_indices[order], boundaries=self.spectrogram_simulation.boundaries[order], dtype="float32") self.spectrogram_simulation.fix_psf_cube = strategy self.spectrogram_simulation.fix_atm_sim = False self.my_logger.debug(f"\n\tJacobian time computation = {time.time() - start:.1f}s") return J
[docs] def plot_fit(self): """Plot the fit result. Examples -------- >>> spec = Spectrum('tests/data/reduc_20170530_134_spectrum.fits') >>> parameters.SPECTRACTOR_ATMOSPHERE_SIM = "libradtran" >>> w = SpectrogramFitWorkspace(spec, verbose=True) >>> lambdas, model, model_err = w.simulate(*w.params.values) >>> w.plot_fit() .. plot:: :include-source: from spectractor.fit.fit_spectrogram import SpectrogramFitWorkspace file_name = 'tests/data/reduc_20170530_134_spectrum.fits' atmgrid_file_name = file_name.replace('spectrum', 'atmsim') fit_workspace = SpectrogramFitWorkspace(file_name, atmgrid_file_name=atmgrid_file_name, verbose=True) A1, A2, aerosols, ozone, pwv, D, shift_x, shift_y, angle, *psf = fit_workspace.p lambdas, model, model_err = fit_workspace.simulation.simulate(A1, A2, aerosols, ozone, pwv, D, shift_x, shift_y, angle, psf) fit_workspace.lambdas = lambdas fit_workspace.model = model fit_workspace.model_err = model_err fit_workspace.plot_fit() """ gs_kw = dict(width_ratios=[3, 0.01, 1, 0.01, 1, 0.15], height_ratios=[1, 1, 1]) fig, ax = plt.subplots(nrows=3, ncols=6, figsize=(10, 8), gridspec_kw=gs_kw) # A1, A2, aerosols, ozone, pwv, D, shift_x, shift_y, shift_t, B, *psf = self.p # plt.suptitle(f'A1={A1:.3f}, A2={A2:.3f}, PWV={pwv:.3f}, OZ={ozone:.3g}, VAOD={aerosols:.3f}, ' # f'D={D:.2f}mm, shift_y={shift_y:.2f}pix, B={B:.3f}', y=1) # main plot self.plot_spectrogram_comparison_simple(ax[:, 0:2], title='Spectrogram model', dispersion=True) # zoom O2 if np.max(self.spectrum.lambdas) > 800 and np.min(self.spectrum.lambdas) < 730: self.plot_spectrogram_comparison_simple(ax[:, 2:4], extent=[730, 800], title='Zoom $O_2$', dispersion=True) # zoom H2O if np.max(self.spectrum.lambdas) > 1000 and np.min(self.spectrum.lambdas) < 870: self.plot_spectrogram_comparison_simple(ax[:, 4:6], extent=[870, 1000], title='Zoom $H_2 O$', dispersion=True) for i in range(3): # clear middle colorbars for j in range(2): plt.delaxes(ax[i, 2*j+1]) for i in range(3): # clear middle y axis labels for j in range(1, 3): ax[i, 2*j].set_ylabel("") fig.tight_layout() if self.live_fit: # pragma: no cover plt.draw() plt.pause(1e-8) plt.close() else: if parameters.DISPLAY and self.verbose: plt.show() if parameters.PdfPages: parameters.PdfPages.savefig() if parameters.SAVE: figname = os.path.splitext(self.filename)[0] + "_bestfit.pdf" self.my_logger.info(f"\n\tSave figure {figname}.") fig.savefig(figname, dpi=100, bbox_inches='tight', transparent=True)
[docs] def lnprob_spectrogram(p): # pragma: no cover """Logarithmic likelihood function to maximize in MCMC exploration. Parameters ---------- p: array_like Array of SpectrogramFitWorkspace parameters. Returns ------- lp: float Log of the likelihood function. """ global fit_workspace lp = fit_workspace.lnprior(p) if not np.isfinite(lp): return -1e20 return lp + fit_workspace.lnlike_spectrogram(p)
[docs] def run_spectrogram_minimisation(fit_workspace, method="newton", verbose=False): """Interface function to fit spectrogram simulation parameters to data. Parameters ---------- fit_workspace: SpectrogramFitWorkspace An instance of the SpectrogramFitWorkspace class. method: str, optional Fitting method (default: 'newton'). Examples -------- >>> spec = Spectrum('tests/data/reduc_20170530_134_spectrum.fits') >>> parameters.SPECTRACTOR_ATMOSPHERE_SIM = "libradtran" >>> w = SpectrogramFitWorkspace(spec, verbose=True, atmgrid_file_name='tests/data/reduc_20170530_134_atmsim.fits') >>> parameters.VERBOSE = True >>> run_spectrogram_minimisation(w, method="newton") """ my_logger = set_logger(__name__) guess = np.asarray(fit_workspace.params.values) fit_workspace.simulate(*guess) # fit_workspace.plot_fit() if method != "newton": run_minimisation(fit_workspace, method=method) else: # costs = np.array([fit_workspace.chisq(guess)]) # if parameters.DISPLAY and (parameters.DEBUG or fit_workspace.live_fit): # fit_workspace.plot_fit() # params_table = np.array([guess]) start = time.time() my_logger.info(f"\n\tStart guess: {guess}\n\twith {fit_workspace.params.labels}") fixed_default = np.copy(fit_workspace.params.fixed) # fit_workspace.simulation.fast_sim = True # fit_workspace.simulation.fix_psf_cube = False # fit_workspace.fixed = np.copy(fixed) # fit_workspace.fixed[:fit_workspace.psf_params_start_index] = True # params_table, costs = run_gradient_descent(fit_workspace, guess, epsilon, params_table, costs, # fix=fit_workspace.fixed, xtol=1e-3, ftol=1e-2, niter=10) # fit_workspace.simulation.fast_sim = True # fit_workspace.simulation.fix_psf_cube = False # fit_workspace.fixed = np.copy(fixed) # for ip, label in enumerate(fit_workspace.input_labels): # if "y_c_0" in label: # fit_workspace.fixed[ip] = False # else: # fit_workspace.fixed[ip] = True # run_minimisation(fit_workspace, method="newton", epsilon=epsilon, fix=fit_workspace.fixed, # xtol=1e-2, ftol=10 / fit_workspace.data.size, verbose=False) fit_workspace.spectrogram_simulation.fast_sim = False fit_workspace.spectrogram_simulation.fix_psf_cube = False fit_workspace.params.fixed = [True] * len(fit_workspace.params.values) fit_workspace.params.fixed[fit_workspace.params.get_index(r"VAOD")] = False # VAOD fit_workspace.params.fixed[fit_workspace.params.get_index(r"shift_y [pix]")] = False # shift y fit_workspace.params.fixed[fit_workspace.params.get_index(r"angle [deg]")] = False # angle run_minimisation(fit_workspace, "newton", xtol=1e-2, ftol=0.01, with_line_search=False) fit_workspace.params.fixed = fixed_default fit_workspace.spectrogram_simulation.fast_sim = False fit_workspace.spectrogram_simulation.fix_psf_cube = False fit_workspace.params.fixed = np.copy(fixed_default) # fit_workspace.params.values[fit_workspace.params.get_index(r"A1")] = 1.0 # A1 # guess = fit_workspace.p # params_table, costs = run_gradient_descent(fit_workspace, guess, epsilon, params_table, costs, # fix=fit_workspace.fixed, xtol=1e-6, ftol=1 / fit_workspace.data.size, # niter=40) run_minimisation_sigma_clipping(fit_workspace, method="newton", xtol=1e-10, ftol=1e-3 / fit_workspace.data.size, sigma_clip=100, niter_clip=3, verbose=verbose, with_line_search=True) extra = {"chi2": fit_workspace.costs[-1] / fit_workspace.data.size, "date-obs": fit_workspace.spectrum.date_obs, "outliers": len(fit_workspace.outliers)} fit_workspace.params.extra = extra my_logger.info(f"\n\tNewton: total computation time: {time.time() - start}s") if fit_workspace.filename != "": fit_workspace.params.plot_correlation_matrix() write_fitparameter_json(fit_workspace.params.json_filename, fit_workspace.params, extra=extra) # save_gradient_descent(fit_workspace, costs, params_table) fit_workspace.plot_fit()
if __name__ == "__main__": import doctest doctest.testmod()