from scipy.signal import argrelextrema, savgol_filter
from scipy.interpolate import interp1d
from astropy.io import fits
from astropy.table import Table
import astropy.units as u
from scipy import integrate
from scipy import optimize
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import string
import astropy
import warnings
import itertools
warnings.filterwarnings('ignore', category=astropy.io.fits.card.VerifyWarning, append=True)
from spectractor import parameters
from spectractor.config import set_logger, load_config, update_derived_parameters
from spectractor.extractor.dispersers import Hologram
from spectractor.extractor.targets import load_target
from spectractor.tools import (ensure_dir, load_fits, plot_image_simple, plot_table_in_axis,
find_nearest, plot_spectrum_simple, fit_poly1d_legendre, gauss,
rescale_x_to_legendre, fit_multigauss_and_bgd, multigauss_and_bgd, multigauss_and_bgd_jacobian)
from spectractor.extractor.psf import load_PSF
from spectractor.extractor.chromaticpsf import ChromaticPSF
from spectractor.simulation.adr import adr_calib, flip_and_rotate_adr_to_image_xy_coordinates
from spectractor.simulation.throughput import TelescopeTransmission
from spectractor.fit.fitter import FitWorkspace, FitParameters, run_minimisation
from lsst.utils.threads import disable_implicit_threading
disable_implicit_threading()
fits_mappings = {'config': 'CONFIG',
'date_obs': 'DATE-OBS',
'expo': 'EXPTIME',
'airmass': 'AIRMASS',
'disperser_label': 'GRATING',
'units': 'UNIT2',
'rotation_angle': 'ROTANGLE',
'dec': 'DEC',
'hour_angle': 'HA',
'temperature': 'OUTTEMP',
'pressure': 'OUTPRESS',
'humidity': 'OUTHUM',
'lambda_ref': 'LBDA_REF',
'parallactic_angle': 'PARANGLE',
'filter_label': 'FILTER',
'camera_angle': 'CAM_ROT',
'spectrogram_x0': 'S_X0',
'spectrogram_y0': 'S_Y0',
'spectrogram_xmin': 'S_XMIN',
'spectrogram_xmax': 'S_XMAX',
'spectrogram_ymin': 'S_YMIN',
'spectrogram_ymax': 'S_YMAX',
'spectrogram_Nx': 'S_NX',
'spectrogram_Ny': 'S_NY',
'spectrogram_deg': 'S_DEG',
'spectrogram_saturation': 'S_SAT',
'order': 'S_ORDER'
}
[docs]
class Spectrum:
""" Class used to store information and methods relative to spectra and their extraction.
Attributes
----------
my_logger: logging
Logging object
fast_load: bool
If True, only load the spectrum but not the spectrogram.
units: str
Units of the spectrum.
lambdas: array
Spectrum wavelengths in nm.
data: array
Spectrum amplitude array in self.units units.
err: array
Spectrum amplitude uncertainties in self.units units.
cov_matrix: array
Spectrum amplitude covariance matrix between wavelengths in self.units units.
lambdas_binwidths: array
Bin widths of the wavelength array in nm.
data_next_order: array
Spectrum amplitude array for next diffraction order in self.units units.
err_next_order: array
Spectrum amplitude uncertainties for next diffraction order in self.units units.
lambda_ref: float
Reference wavelength for ADR computations in nm.
order: int
Index of the diffraction order.
x0: array
Target position [x,y] in the image in pixels.
psf: PSF
PSF instance to model the spectrum PSF.
chromatic_psf: ChromaticPSF
ChromaticPSF object that contains data on the PSF shape and evolution in wavelength.
date_obs: str
Date of the observation.
airmass: float
Airmass of the current target.
expo: float
Exposure time in seconds.
disperser_label: str
Label of the disperser.
filter_label: str:
Label of the filter.
rotation_angle: float
Dispersion axis angle in the image in degrees, positive if anticlockwise.
parallactic_angle: float
Parallactic angle in degrees.
camera_angle: float
The North-West axe angle with respect to the camera horizontal axis in degrees.
lines: Lines
Lines instance that contains data on the emission or absorption lines to be searched and fitted in the spectrum.
header: Fits.Header
FITS file header.
disperser: Disperser
Disperser instance that describes the disperser.
target: Target
Target instance that describes the current exposure.
dec: float
Declination coordinate of the current exposure in degrees.
hour_angle float
Hour angle coordinate of the current exposure in degrees.
temperature: float
Outside temperature in Celsius degrees.
pressure: float
Outside pressure in hPa.
humidity: float
Outside relative humidity in fraction of one.
throughput: callable
Instrumental throughput of the telescope.
spectrogram_data: array
Spectrogram 2D image in image units.
spectrogram_bgd: array
Estimated 2D background fitted below the spectrogram in image units.
spectrogram_bgd_rms: array
Estimated 2D background RMS fitted below the spectrogram in image units.
spectrogram_err: array
Estimated 2D background uncertainty fitted below the spectrogram in image units.
spectrogram_fit: array
Best fitting model of the spectrogram in image units.
spectrogram_residuals: array
Residuals between the spectrogram data and the best fitting model of the spectrogram in image units.
spectrogram_flat: array
Flat array for the spectrogram with average=1.
spectrogram_starfield: array
Star field simulation array for the spectrogram in ADU/s.
spectrogram_mask: array
Boolean mask array to flag the defects.
spectrogram_x0: float
Relative position of the target in the spectrogram array along the x axis.
spectrogram_y0: float
Relative position of the target in the spectrogram array along the y axis.
spectrogram_xmin: int
Left index of the spectrogram crop in the image.
spectrogram_xmax: int
Right index of the spectrogram crop in the image.
spectrogram_ymin: int
Bottom index of the spectrogram crop in the image.
spectrogram_ymax: int
Top index of the spectrogram crop in the image.
spectrogram_deg: int
Degree of the polynomial functions to model wavelength evolutions of the PSF parameters.
spectrogram_saturation: float
Level of saturation in the spectrogram in image units.
spectrogram_Nx: int
Size of the spectrogram along the x axis.
spectrogram_Ny: int
Size of the spectrogram along the y axis.
"""
def __init__(self, file_name="", image=None, order=1, target=None, config="", fast_load=False,
spectrogram_file_name_override=None,
psf_file_name_override=None,):
""" Class used to store information and methods relative to spectra and their extraction.
If a file name is provided, for Spectractor software version strictly below 2.4 one must provide
a config file also, otherwise do not set a config file (default).
Config parameters are loaded from file header since version 2.4.
Parameters
----------
file_name: str, optional
Path to the spectrum file (default: "").
image: Image, optional
Image object from which to create the Spectrum object:
copy the information from the Image header (default: None).
order: int
Order of the spectrum (default: 1)
target: Target, optional
Target object if provided (default: None)
config: str, optional
A config file name to load some parameter values for a given instrument (default: "").
fast_load: bool, optional
If True, only the spectrum is loaded (not the PSF nor the spectrogram data) (default: False).
config: str, optional
If empty, load the config from the spectrum file if it exists, otherwise load the config from the given config file (deftault: '').
Examples
--------
Load a spectrum from a fits file
>>> s = Spectrum(file_name='./tests/data/reduc_20170530_134_spectrum.fits', config="")
>>> print(s.order)
1
>>> print(s.target.label)
HD111980
>>> print(s.disperser_label)
HoloAmAg
Load a spectrum from a fits image file
>>> from spectractor.extractor.images import Image
>>> image = Image('tests/data/reduc_20170605_028.fits', target_label='PNG321.0+3.9', config="./config/ctio.ini")
>>> s = Spectrum(image=image)
>>> print(s.target.label)
PNG321.0+3.9
"""
self.fast_load = fast_load
self.my_logger = set_logger(self.__class__.__name__)
self.config = config
if config != "":
load_config(config)
self.target = target
self.data = None
self.err = None
self.cov_matrix = None
self.x0 = None
self.pixels = None
self.lambdas = None
self.lambdas_binwidths = None
self.lambdas_indices = None
self.lambda_ref = 550
self.order = order
self.chromatic_psf = None
self.filter_label = ""
self.filters = None
self.units = 'ADU/s'
self.gain = parameters.CCD_GAIN
self.psf = load_PSF(psf_type="Moffat", target=self.target)
self.chromatic_psf = ChromaticPSF(self.psf, Nx=1, Ny=1, deg=1, saturation=1)
self.rotation_angle = 0
self.parallactic_angle = None
self.camera_angle = 0
self.spectrogram_data = None
self.spectrogram_bgd = None
self.spectrogram_bgd_rms = None
self.spectrogram_err = None
self.spectrogram_residuals = None
self.spectrogram_fit = None
self.spectrogram_flat = None
self.spectrogram_starfield = None
self.spectrogram_mask = None
self.spectrogram_x0 = None
self.spectrogram_y0 = None
self.spectrogram_xmin = None
self.spectrogram_xmax = None
self.spectrogram_ymin = None
self.spectrogram_ymax = None
self.spectrogram_deg = None
self.spectrogram_saturation = None
self.spectrogram_Nx = None
self.spectrogram_Ny = None
self.data_next_order = None
self.err_next_order = None
self.dec = None
self.hour_angle = None
self.temperature = None
self.pressure = None
self.humidity = None
self.parallactic_angle = None
self.filename = file_name
if file_name != "":
self.load_spectrum(file_name,
spectrogram_file_name_override=spectrogram_file_name_override,
psf_file_name_override=psf_file_name_override, fast_load=fast_load)
if image is not None:
self.header = image.header
self.date_obs = image.date_obs
self.airmass = image.airmass
self.expo = image.expo
self.filters = image.filters
self.filter_label = image.filter_label
self.disperser_label = image.disperser_label
self.disperser = image.disperser
self.target = image.target
self.lines = self.target.lines
self.x0 = image.target_pixcoords
self.target_pixcoords = image.target_pixcoords
self.target_pixcoords_rotated = image.target_pixcoords_rotated
self.units = image.units
self.gain = image.gain
self.rotation_angle = image.rotation_angle
self.camera_angle = parameters.OBS_CAMERA_ROTATION
self.my_logger.info('\n\tSpectrum info copied from image')
self.dec = image.dec
self.hour_angle = image.hour_angle
self.temperature = image.temperature
self.pressure = image.pressure
self.humidity = image.humidity
self.parallactic_angle = image.parallactic_angle
self.adr_params = [self.dec, self.hour_angle, self.temperature, self.pressure,
self.humidity, self.airmass]
self.throughput = self.load_filter()
if self.target is not None and len(self.target.spectra) > 0:
spec = self.target.spectra[0] * self.throughput.transmission(self.target.wavelengths[0])
lambda_ref = np.sum(self.target.wavelengths[0] * spec) / np.sum(spec)
self.lambda_ref = lambda_ref
self.header['LBDA_REF'] = lambda_ref
[docs]
def convert_from_ADUrate_to_flam(self):
"""Convert units from ADU/s to erg/s/cm^2/nm.
The SED is supposed to be in flam units ie erg/s/cm^2/nm
Examples
--------
>>> s = Spectrum(file_name='tests/data/reduc_20170605_028_spectrum.fits', config="./config/ctio.ini")
>>> s.convert_from_ADUrate_to_flam()
.. doctest::
:hide:
>>> assert np.max(s.data) < 1e-2
>>> assert np.max(s.err) < 1e-2
"""
if self.units == 'erg/s/cm$^2$/nm' or self.units == "flam":
self.my_logger.warning(f"You ask to convert spectrum already in {self.units}"
f" in erg/s/cm^2/nm... check your code ! Skip the instruction.")
return
ldl = parameters.FLAM_TO_ADURATE * self.lambdas * np.abs(self.lambdas_binwidths)
self.data /= ldl
if self.err is not None:
self.err /= ldl
if self.cov_matrix is not None:
ldl_mat = np.outer(ldl, ldl)
self.cov_matrix /= ldl_mat
if self.data_next_order is not None:
self.data_next_order /= ldl
self.err_next_order /= ldl
self.units = 'erg/s/cm$^2$/nm'
[docs]
def convert_from_flam_to_ADUrate(self):
"""Convert units from erg/s/cm^2/nm to ADU/s.
The SED is supposed to be in flam units ie erg/s/cm^2/nm
Examples
--------
>>> s = Spectrum(file_name='tests/data/reduc_20170605_028_spectrum.fits', config="./config/ctio.ini")
>>> s.convert_from_flam_to_ADUrate()
.. doctest::
:hide:
>>> assert np.max(s.data) > 1e-2
>>> assert np.max(s.err) > 1e-2
"""
if self.units == "ADU/s":
self.my_logger.warning(f"You ask to convert spectrum already in {self.units} in ADU/s... check your code ! "
f"Skip the instruction")
return
ldl = parameters.FLAM_TO_ADURATE * self.lambdas * np.abs(self.lambdas_binwidths)
self.data *= ldl
if self.err is not None:
self.err *= ldl
if self.cov_matrix is not None:
ldl_mat = np.outer(ldl, ldl)
self.cov_matrix *= ldl_mat
if self.data_next_order is not None:
self.data_next_order *= ldl
self.err_next_order *= ldl
self.units = 'ADU/s'
[docs]
def load_filter(self):
"""Load filter properties and set relevant LAMBDA_MIN and LAMBDA_MAX values.
Examples
--------
>>> s = Spectrum(config="./config/ctio.ini")
>>> s.filter_label = 'FGB37'
>>> _ = s.load_filter()
.. doctest::
:hide:
>>> assert np.isclose(parameters.LAMBDA_MIN, 358, atol=1)
>>> assert np.isclose(parameters.LAMBDA_MAX, 760, atol=1)
"""
t = TelescopeTransmission(filter_label=self.filter_label)
if self.filter_label != "" and "empty" not in self.filter_label.lower():
t.reset_lambda_range(transmission_threshold=1e-4)
return t
[docs]
def plot_spectrum(self, ax=None, xlim=None, live_fit=False, label='', force_lines=False, calibration_only=False):
"""Plot spectrum with emission and absorption lines.
Parameters
----------
ax: Axes, optional
Axes instance (default: None).
label: str
Label for the legend (default: '').
xlim: list, optional
List of minimum and maximum abscisses (default: None)
live_fit: bool, optional
If True the spectrum is plotted in live during the fitting procedures
(default: False).
force_lines: bool
Force the over plot of vertical lines for atomic lines if set to True (default: False).
calibration_only: bool
Plot only the lines used for calibration if True (default: False).
Examples
--------
>>> s = Spectrum(file_name='tests/data/reduc_20170530_134_spectrum.fits')
>>> s.plot_spectrum(xlim=[500,900], live_fit=False, force_lines=True)
"""
if ax is None:
doplot = True
plt.figure(figsize=[12, 6])
ax = plt.gca()
else:
doplot = False
if label == '':
label = f'Order {self.order:d} spectrum\n' \
r'$D_{\mathrm{CCD}}=' \
rf'{self.header["D2CCD"]:.2f}\,$mm'
if self.x0 is not None:
label += rf', $x_0={self.x0[0]:.2f}\,$pix'
title = self.target.label
if self.data_next_order is not None and np.sum(np.abs(self.data_next_order)) > 0.05 * np.sum(np.abs(self.data)):
distance = self.disperser.grating_lambda_to_pixel(self.lambdas, self.x0, D=self.header["D2CCD"], order=parameters.SPEC_ORDER+1)
max_index = np.argmin(np.abs(distance + self.x0[0] - parameters.CCD_IMSIZE))
plot_spectrum_simple(ax, self.lambdas[:max_index], self.data_next_order[:max_index], data_err=self.err_next_order[:max_index],
xlim=xlim, label=f'Order {parameters.SPEC_ORDER+1} spectrum', linestyle="--", lw=1, color="firebrick")
plot_spectrum_simple(ax, self.lambdas, self.data, data_err=self.err, xlim=xlim, label=label,
title=title, units=self.units, lw=1, linestyle="-")
if len(self.target.spectra) > 0:
for k in range(len(self.target.spectra)):
plot_indices = np.logical_and(self.target.wavelengths[k] > np.min(self.lambdas),
self.target.wavelengths[k] < np.max(self.lambdas))
s = self.target.spectra[k] / np.max(self.target.spectra[k][plot_indices]) * np.max(self.data)
ax.plot(self.target.wavelengths[k], s, lw=2, label=f'Tabulated spectra #{k}')
if self.lambdas is not None:
self.lines.plot_detected_lines(ax)
if self.lines is not None and len(self.lines.table) > 0:
self.my_logger.info(f"\n{self.lines.table}")
if self.lambdas is not None and self.lines is not None:
self.lines.plot_atomic_lines(ax, fontsize=12, force=force_lines, calibration_only=calibration_only)
ax.legend(loc='best')
if self.filters is not None:
ax.get_legend().set_title(self.filters)
plt.gcf().tight_layout()
if parameters.LSST_SAVEFIGPATH: # pragma: no cover
plt.gcf().savefig(os.path.join(parameters.LSST_SAVEFIGPATH, f'{self.target.label}_spectrum.pdf'))
if parameters.DISPLAY and doplot: # pragma: no cover
if live_fit:
plt.draw()
plt.pause(1e-8)
plt.close()
else:
plt.show()
[docs]
def plot_spectrogram(self, ax=None, scale="lin", title="", units="Image units", plot_stats=False,
target_pixcoords=None, vmin=None, vmax=None, figsize=[9.3, 8], aspect=None,
cmap=None, cax=None):
"""Plot spectrogram.
Parameters
----------
ax: Axes, optional
Axes instance (default: None).
scale: str
Scaling of the image (choose between: lin, log or log10) (default: lin)
title: str
Title of the image (default: "")
units: str
Units of the image to be written in the color bar label (default: "Image units")
cmap: colormap
Color map label (default: None)
target_pixcoords: array_like, optional
2D array giving the (x,y) coordinates of the targets on the image: add a scatter plot (default: None)
vmin: float
Minimum value of the image (default: None)
vmax: float
Maximum value of the image (default: None)
aspect: str
Aspect keyword to be passed to imshow (default: None)
cax: Axes, optional
Color bar axes if necessary (default: None).
figsize: tuple
Figure size (default: [9.3, 8]).
plot_stats: bool
If True, plot the uncertainty map instead of the spectrogram (default: False).
Examples
--------
>>> s = Spectrum(file_name='tests/data/reduc_20170530_134_spectrum.fits')
>>> s.plot_spectrogram()
>>> if parameters.DISPLAY: plt.show()
.. plot::
from spectractor.extractor.spectrum import Spectrum
s = Spectrum(file_name='tests/data/reduc_20170530_134_spectrum.fits')
s.plot_spectrogram()
"""
if ax is None:
plt.figure(figsize=figsize)
ax = plt.gca()
data = np.copy(self.spectrogram_data)
if plot_stats:
data = np.copy(self.spectrogram_err)
plot_image_simple(ax, data=data, scale=scale, title=title, units=units, cax=cax,
target_pixcoords=target_pixcoords, aspect=aspect, vmin=vmin, vmax=vmax, cmap=cmap)
if parameters.DISPLAY: # pragma: no cover
plt.show()
if parameters.PdfPages: # pragma: no cover
parameters.PdfPages.savefig()
[docs]
def plot_spectrum_summary(self, xlim=None, figsize=(12, 12), save_as=''):
"""Plot spectrum with emission and absorption lines.
Parameters
----------
xlim: list, optional
List of minimum and maximum abscisses (default: None).
figsize: tuple
Figure size (default: (12, 12)).
save_as : str, optional
Path to save the figure to, if specified.
Examples
--------
>>> s = Spectrum(file_name='tests/data/reduc_20170530_134_spectrum.fits')
>>> s.plot_spectrum_summary()
"""
if not np.any([line.fitted for line in self.lines.lines]):
fwhm_func = interp1d(self.chromatic_psf.table['lambdas'],
self.chromatic_psf.table['fwhm'],
fill_value=(parameters.CALIB_PEAK_WIDTH, parameters.CALIB_PEAK_WIDTH),
bounds_error=False)
detect_lines(self.lines, self.lambdas, self.data, self.err, fwhm_func=fwhm_func,
calibration_lines_only=True)
def generate_axes(fig):
tableShrink = 3
tableGap = 1
gridspec = fig.add_gridspec(nrows=23, ncols=20)
axes = {}
axes['A'] = fig.add_subplot(gridspec[0:3, 0:19])
axes['C'] = fig.add_subplot(gridspec[3:6, 0:19], sharex=axes['A'])
axes['B'] = fig.add_subplot(gridspec[6:14, 0:19])
axes['CA'] = fig.add_subplot(gridspec[0:3, 19:20])
axes['CC'] = fig.add_subplot(gridspec[3:6, 19:20])
axes['D'] = fig.add_subplot(gridspec[14:16, 0:19], sharex=axes['B'])
axes['E'] = fig.add_subplot(gridspec[16+tableGap:23, tableShrink:19-tableShrink])
return axes
fig = plt.figure(figsize=figsize)
axes = generate_axes(fig)
plt.suptitle(f"{self.target.label} {self.date_obs}", y=0.91, fontsize=16)
mainPlot = axes['B']
spectrogramPlot = axes['A']
spectrogramPlotCb = axes['CA']
residualsPlot = axes['C']
residualsPlotCb = axes['CC']
widthPlot = axes['D']
tablePlot = axes['E']
label = f'Order {self.order:d} spectrum\n' \
r'$D_{\mathrm{CCD}}=' \
rf'{self.header["D2CCD"]:.2f}\,$mm'
plot_spectrum_simple(mainPlot, self.lambdas, self.data, data_err=self.err, xlim=xlim, label=label,
title='', units=self.units, lw=1, linestyle="-")
if len(self.target.spectra) > 0:
plot_indices = np.logical_and(self.target.wavelengths[0] > np.min(self.lambdas),
self.target.wavelengths[0] < np.max(self.lambdas))
s = self.target.spectra[0] / np.max(self.target.spectra[0][plot_indices]) * np.max(self.data)
mainPlot.plot(self.target.wavelengths[0], s, lw=2, label='Normalized\nCALSPEC spectrum')
self.lines.plot_atomic_lines(mainPlot, fontsize=12, force=False, calibration_only=True)
self.lines.plot_detected_lines(mainPlot, calibration_only=True)
table = self.lines.build_detected_line_table(calibration_only=True)
plot_table_in_axis(tablePlot, table)
mainPlot.legend()
widthPlot.plot(self.lambdas, np.array(self.chromatic_psf.table['fwhm']), "r-", lw=2)
widthPlot.set_ylabel("FWHM [pix]")
widthPlot.set_xlabel(r'$\lambda$ [nm]')
widthPlot.grid()
spectrogram = np.copy(self.spectrogram_data)
res = self.spectrogram_residuals.reshape((-1, self.spectrogram_Nx))
std = np.std(res)
if spectrogram.shape[0] != res.shape[0]:
margin = (spectrogram.shape[0] - res.shape[0]) // 2
spectrogram = spectrogram[margin:-margin]
plot_image_simple(spectrogramPlot, data=spectrogram, title='Data',
aspect='auto', cax=spectrogramPlotCb, units='ADU/s', cmap='viridis')
spectrogramPlot.set_title('Data', fontsize=10, loc='center', color='white', y=0.8)
spectrogramPlot.grid(False)
plot_image_simple(residualsPlot, data=res, vmin=-5 * std, vmax=5 * std, title='(Data-Model)/Err',
aspect='auto', cax=residualsPlotCb, units=r'$\sigma$', cmap='bwr')
residualsPlot.set_title('(Data-Model)/Err', fontsize=10, loc='center', color='black', y=0.8)
residualsPlot.grid(False)
# hide the tick labels in the plots which share an x axis
for label in itertools.chain(mainPlot.get_xticklabels(), residualsPlot.get_xticklabels(), spectrogramPlot.get_xticklabels()):
label.set_visible(False)
# align y labels
for ax in [spectrogramPlot, residualsPlot, mainPlot, widthPlot]:
ax.yaxis.set_label_coords(-0.05, 0.5)
fig.subplots_adjust(hspace=0)
if save_as:
plt.savefig(save_as)
plt.show()
[docs]
def save_spectrum(self, output_file_name, overwrite=False):
"""Save the spectrum into a fits file (data, error and wavelengths).
Parameters
----------
output_file_name: str
Path of the output fits file.
overwrite: bool
If True overwrite the output file if needed (default: False).
Examples
--------
>>> import os
>>> s = Spectrum(file_name='tests/data/reduc_20170530_134_spectrum.fits')
>>> s.save_spectrum('./tests/test.fits')
.. doctest::
:hide:
>>> assert os.path.isfile('./tests/test.fits')
Overwrite previous file:
>>> s.save_spectrum('./tests/test.fits', overwrite=True)
.. doctest::
:hide:
>>> assert os.path.isfile('./tests/test.fits')
>>> os.remove('./tests/test.fits')
"""
from spectractor._version import __version__
self.header["VERSION"] = str(__version__)
self.header["CCD_REBIN"] = parameters.CCD_REBIN
self.header.comments['CCD_REBIN'] = 'original image rebinning factor to get spectrum.'
self.header['UNIT1'] = "nanometer"
self.header['UNIT2'] = self.units
self.header['COMMENTS'] = 'First column gives the wavelength in unit UNIT1, ' \
'second column gives the spectrum in unit UNIT2, ' \
'third column the corresponding errors.'
hdu1 = fits.PrimaryHDU()
hdu1.header = self.header
for attribute, header_key in fits_mappings.items():
try:
value = getattr(self, attribute)
except AttributeError:
self.my_logger.warning(f"Failed to get {attribute}")
continue
if isinstance(value, astropy.coordinates.angles.Angle):
value = value.degree
hdu1.header[header_key] = value
# print(f"Set header key {header_key} to {value} from attr {attribute}")
extnames = ["SPECTRUM", "SPEC_COV", "ORDER2"] # spectrum data
extnames += ["ORDER0"] # spectrum order0 timestamps
extnames += ["S_DATA", "S_ERR", "S_BGD", "S_BGD_ER", "S_FIT", "S_RES", "S_FLAT", "S_STAR", "S_MASK"]
extnames += ["PSF_TAB"] # PSF parameter table
extnames += ["LINES"] # spectroscopic line table
extnames += ["CONFIG"] # config parameters
hdus = {"SPECTRUM": hdu1}
for k, extname in enumerate(extnames):
if extname == "SPECTRUM":
hdus[extname].data = [self.lambdas, self.data, self.err]
continue
hdus[extname] = fits.ImageHDU()
if extname == "SPEC_COV":
hdus[extname].data = self.cov_matrix
elif extname == "ORDER2":
if self.data_next_order is None:
data_next_order = np.zeros_like(self.data)
err_next_order = np.zeros_like(self.err)
else:
data_next_order = self.data_next_order
err_next_order = self.err_next_order
hdus[extname].data = [self.lambdas, data_next_order, err_next_order]
elif extname == "ORDER0":
hdus[extname].data = self.target.image
hdus[extname].header["IM_X0"] = self.target.image_x0
hdus[extname].header["IM_Y0"] = self.target.image_y0
elif extname == "S_DATA":
hdus[extname].data = self.spectrogram_data
hdus[extname].header['UNIT1'] = self.units
elif extname == "S_ERR":
hdus[extname].data = self.spectrogram_err
elif extname == "S_BGD":
hdus[extname].data = self.spectrogram_bgd
elif extname == "S_BGD_ER":
hdus[extname].data = self.spectrogram_bgd_rms
elif extname == "S_FIT":
hdus[extname].data = self.spectrogram_fit
elif extname == "S_RES":
hdus[extname].data = self.spectrogram_residuals
elif extname == "S_FLAT":
hdus[extname].data = self.spectrogram_flat
elif extname == "S_STAR":
hdus[extname].data = self.spectrogram_starfield
elif extname == "S_MASK":
if self.spectrogram_mask is not None:
hdus[extname].data = self.spectrogram_mask.astype(int)
else:
hdus[extname].data = self.spectrogram_mask
elif extname == "PSF_TAB":
hdus[extname] = fits.table_to_hdu(self.chromatic_psf.table)
elif extname == "LINES":
u.set_enabled_aliases({'flam': u.erg / u.s / u.cm**2 / u.nm,
'reduced': u.dimensionless_unscaled})
tab = self.lines.build_detected_line_table(amplitude_units=self.units.replace("erg/s/cm$^2$/nm", "flam"))
hdus[extname] = fits.table_to_hdu(tab)
elif extname == "CONFIG":
# HIERARCH and CONTINUE not compatible together in FITS headers
# We must use short keys built by parametersToShortKeyedDict and use CONTINUE
# waiting for cfitsio upgrade
# Store the parameter translation <-> shortkeys
for item in dir(parameters):
if item.startswith("__") or item[0].islower(): # ignore the special stuff
continue
if item in parameters.STYLE_PARAMETERS: # don't save plot or verbosity parameters
continue
try:
value = getattr(parameters, item)
if isinstance(value, astropy.coordinates.angles.Angle):
value = value.degree
if isinstance(value, astropy.units.quantity.Quantity):
value = value.value
if isinstance(value, (np.ndarray, list)):
continue
if not isinstance(value, (float, int, str, np.ndarray, list)):
raise ValueError(f"Can't handle {parameters.item} type {type(parameters.item)}.")
except AttributeError:
raise KeyError(f"Failed to get parameters.{item}.")
if len(item) > 8:
fits_longkey = "HIERARCH " + item
char_set = string.ascii_uppercase + string.digits
while (shortkey := "X_" + ''.join(random.sample(char_set * 6, 6))) in hdus[extname].header.values():
pass
hdus[extname].header[fits_longkey] = shortkey
hdus[extname].header[shortkey] = value
else:
hdus[extname].header[item] = value
else:
raise ValueError(f"Unknown EXTNAME extension: {extname}.")
hdus[extname].header["EXTNAME"] = extname
hdu = fits.HDUList([hdus[extname] for extname in extnames])
ensure_dir(os.path.dirname(output_file_name))
hdu.writeto(output_file_name, overwrite=overwrite)
self.my_logger.info(f'\n\tSpectrum saved in {output_file_name}')
[docs]
def save_spectrogram(self, output_file_name, overwrite=False): # pragma: no cover
"""OBOSOLETE: save the spectrogram into a fits file (data, error and background).
Parameters
----------
output_file_name: str
Path of the output fits file.
overwrite: bool, optional
If True overwrite the output file if needed (default: False).
Examples
--------
"""
self.header['UNIT1'] = self.units
self.header['COMMENTS'] = 'First HDU gives the data in UNIT1 units, ' \
'second HDU gives the uncertainties, ' \
'third HDU the fitted background.'
self.header['S_X0'] = self.spectrogram_x0
self.header['S_Y0'] = self.spectrogram_y0
self.header['S_XMIN'] = self.spectrogram_xmin
self.header['S_XMAX'] = self.spectrogram_xmax
self.header['S_YMIN'] = self.spectrogram_ymin
self.header['S_YMAX'] = self.spectrogram_ymax
self.header['S_DEG'] = self.spectrogram_deg
self.header['S_SAT'] = self.spectrogram_saturation
hdu1 = fits.PrimaryHDU()
hdu1.header["EXTNAME"] = "S_DATA"
hdu2 = fits.ImageHDU()
hdu2.header["EXTNAME"] = "S_ERR"
hdu3 = fits.ImageHDU()
hdu3.header["EXTNAME"] = "S_BGD"
hdu4 = fits.ImageHDU()
hdu4.header["EXTNAME"] = "S_BGD_ER"
hdu5 = fits.ImageHDU()
hdu5.header["EXTNAME"] = "S_FIT"
hdu6 = fits.ImageHDU()
hdu6.header["EXTNAME"] = "S_RES"
hdu1.header = self.header
hdu1.data = self.spectrogram_data
hdu2.data = self.spectrogram_err
hdu3.data = self.spectrogram_bgd
hdu4.data = self.spectrogram_bgd_rms
hdu5.data = self.spectrogram_fit
hdu6.data = self.spectrogram_residuals
hdu = fits.HDUList([hdu1, hdu2, hdu3, hdu4, hdu5, hdu6])
output_directory = '/'.join(output_file_name.split('/')[:-1])
ensure_dir(output_directory)
hdu.writeto(output_file_name, overwrite=overwrite)
self.my_logger.info('\n\tSpectrogram saved in %s' % output_file_name)
[docs]
def load_spectrum(self, input_file_name, spectrogram_file_name_override=None,
psf_file_name_override=None, fast_load=False):
"""Load the spectrum from a fits file (data, error and wavelengths).
Parameters
----------
input_file_name: str
Path to the input fits file
spectrogram_file_name_override : str
Manually specify a path to the spectrogram file.
psf_file_name_override : str
Manually specify a path to the psf file.
fast_load: bool, optional
If True, only the spectrum is loaded (not the PSF nor the spectrogram data) (default: False).
Examples
--------
Latest Spectractor output format: do not provide a config file (parameters are loaded from file header)
>>> from spectractor import parameters
>>> s = Spectrum(config="")
>>> s.load_spectrum('tests/data/reduc_20170530_134_spectrum.fits')
.. doctest::
:hide:
>>> assert parameters.OBS_CAMERA_ROTATION == s.header["CAM_ROT"]
>>> assert parameters.CCD_REBIN == s.header["CCD_REBIN"]
>>> assert s.parallactic_angle == s.header["PARANGLE"]
Spectractor output format older than version <=2.3: must give the config file
>>> parameters.VERBOSE = False
>>> s = Spectrum(config="./config/ctio.ini")
>>> s.load_spectrum('tests/data/reduc_20170605_028_spectrum.fits')
>>> print(s.units)
erg/s/cm$^2$/nm
.. doctest::
:hide:
>>> assert parameters.OBS_CAMERA_ROTATION == s.header["CAM_ROT"]
>>> assert parameters.CCD_REBIN == s.header["REBIN"]
>>> assert s.parallactic_angle == s.header["PARANGLE"]
"""
self.fast_load = fast_load
if not os.path.isfile(input_file_name):
raise FileNotFoundError(f'\n\tSpectrum file {input_file_name} not found')
self.header, raw_data = load_fits(input_file_name)
# check the version of the file
if "VERSION" in self.header:
from spectractor._version import __version__
from packaging import version
if self.config != "":
raise AttributeError(f"With Spectractor above 2.4 do not provide a config file in Spectrum(config=...)."
f"Now config parameters are loaded from the file header. Got {self.config=}.")
if self.header["VERSION"] != str(__version__):
self.my_logger.debug(f"\n\tSpectrum file spectractor version {self.header['VERSION']} is "
f"different from current Spectractor software {__version__}.")
if version.parse(self.header["VERSION"]) < version.parse("3.0"):
self.my_logger.warning(f"\n\tSpectrum file spectractor version {self.header['VERSION']} is "
f"below Spectractor software 3.0. It may be deprecated.")
self.load_spectrum_latest(input_file_name)
else:
self.my_logger.warning("\n\tNo information about Spectractor software version is given in the header. "
"Use old load function.")
if self.config == "":
raise AttributeError("With old Spectrum files you must provide a config file in Spectrum(config=...).")
self.load_spectrum_older_24(input_file_name, spectrogram_file_name_override=spectrogram_file_name_override,
psf_file_name_override=psf_file_name_override)
[docs]
def load_spectrum_older_24(self, input_file_name, spectrogram_file_name_override=None,
psf_file_name_override=None, fast_load=False):
"""Load the spectrum from a FITS file (data, error and wavelengths) from Spectrum files generated
with Spectractor software strictly older than 2.4 version. The parameters must be loaded via the config files.
Parameters
----------
input_file_name: str
Path to the input fits file
spectrogram_file_name_override : str
Manually specify a path to the spectrogram file.
psf_file_name_override : str
Manually specify a path to the psf file.
fast_load: bool, optional
If True, only the spectrum is loaded (not the PSF nor the spectrogram data) (default: False).
Examples
--------
>>> s = Spectrum(config="./config/ctio.ini")
>>> s.load_spectrum('tests/data/reduc_20170605_028_spectrum.fits')
>>> print(s.units)
erg/s/cm$^2$/nm
"""
self.fast_load = fast_load
if not os.path.isfile(input_file_name):
raise FileNotFoundError(f'\n\tSpectrum file {input_file_name} not found')
self.header, raw_data = load_fits(input_file_name)
self.lambdas = raw_data[0]
self.lambdas_binwidths = np.gradient(self.lambdas)
self.data = raw_data[1]
if len(raw_data) > 2:
self.err = raw_data[2]
self.cov_matrix = np.diag(self.err ** 2)
# set the config parameters first
if "CAM_ROT" in self.header:
parameters.OBS_CAMERA_ROTATION = float(self.header["CAM_ROT"])
else:
self.my_logger.warning("\n\tNo information about camera rotation in Spectrum header.")
if self.header.get('CCDREBIN'):
if parameters.CCD_REBIN != self.header.get('CCDREBIN'):
raise ValueError("Different values of rebinning parameters between config file and header. Choose.")
parameters.CCD_REBIN = self.header.get('CCDREBIN')
if self.header.get('D2CCD'):
parameters.DISTANCE2CCD = float(self.header.get('D2CCD'))
# set the simple items from the mappings. More complex items, i.e.
# those needing function calls, follow
for attribute, header_key in fits_mappings.items():
if self.header.get(header_key) is not None:
setattr(self, attribute, self.header.get(header_key))
else:
self.my_logger.warning(f'\n\tFailed to set spectrum attribute {attribute} using header {header_key}')
# set the more complex items by hand here
if self.header.get('TARGET'):
self.target = load_target(self.header.get('TARGET'), verbose=parameters.VERBOSE)
self.lines = self.target.lines
if self.header.get('TARGETX') and self.header.get('TARGETY'):
self.x0 = [self.header.get('TARGETX'), self.header.get('TARGETY')] # should be a tuple not a list
self.my_logger.info(f'\n\tLoading disperser {self.disperser_label}...')
self.disperser = Hologram(self.disperser_label, data_dir=parameters.DISPERSER_DIR)
self.my_logger.info(f'\n\tSpectrum loaded from {input_file_name}')
if parameters.OBS_OBJECT_TYPE == "STAR":
self.adr_params = [self.dec, self.hour_angle, self.temperature,
self.pressure, self.humidity, self.airmass]
self.psf = load_PSF(psf_type=parameters.PSF_TYPE, target=self.target)
self.chromatic_psf = ChromaticPSF(self.psf, self.spectrogram_Nx, self.spectrogram_Ny,
x0=self.spectrogram_x0, y0=self.spectrogram_y0,
deg=self.spectrogram_deg, saturation=self.spectrogram_saturation)
if 'PSF_REG' in self.header and float(self.header["PSF_REG"]) > 0:
self.chromatic_psf.opt_reg = float(self.header["PSF_REG"])
# original, hard-coded spectrogram/table relative paths
spectrogram_file_name = input_file_name.replace('spectrum', 'spectrogram')
psf_file_name = input_file_name.replace('spectrum.fits', 'table.csv')
if spectrogram_file_name_override and psf_file_name_override:
self.fast_load = False
spectrogram_file_name = spectrogram_file_name_override
psf_file_name = psf_file_name_override
if not self.fast_load:
with fits.open(input_file_name) as hdu_list:
# load other spectrum info
if len(hdu_list) > 1:
self.cov_matrix = hdu_list["SPEC_COV"].data
if len(hdu_list) > 2:
_, self.data_next_order, self.err_next_order = hdu_list["ORDER2"].data
if len(hdu_list) > 3:
self.target.image = hdu_list["ORDER0"].data
self.target.image_x0 = float(hdu_list["ORDER0"].header["IM_X0"])
self.target.image_y0 = float(hdu_list["ORDER0"].header["IM_Y0"])
# load spectrogram info
if len(hdu_list) > 4:
self.spectrogram_data = hdu_list["S_DATA"].data
self.spectrogram_err = hdu_list["S_ERR"].data
self.spectrogram_bgd = hdu_list["S_BGD"].data
if len(hdu_list) > 7:
self.spectrogram_bgd_rms = hdu_list["S_BGD_ER"].data
self.spectrogram_fit = hdu_list["S_FIT"].data
self.spectrogram_residuals = hdu_list["S_RES"].data
elif os.path.isfile(spectrogram_file_name):
self.my_logger.info(f'\n\tLoading spectrogram from {spectrogram_file_name}...')
self.load_spectrogram(spectrogram_file_name)
else:
raise FileNotFoundError(f"\n\tNo spectrogram info in {input_file_name} "
f"and not even a spectrogram file {spectrogram_file_name}.")
if "PSF_TAB" in hdu_list:
self.chromatic_psf.init_from_table(Table.read(hdu_list["PSF_TAB"]),
saturation=self.spectrogram_saturation)
elif os.path.isfile(psf_file_name): # retro-compatibility
self.my_logger.info(f'\n\tLoading PSF from {psf_file_name}...')
self.load_chromatic_psf(psf_file_name)
else:
raise FileNotFoundError(f"\n\tNo PSF info in {input_file_name} "
f"and not even a PSF file {psf_file_name}.")
if "LINES" in hdu_list:
self.lines.table = Table.read(hdu_list["LINES"], unit_parse_strict="silent")
[docs]
def load_spectrum_latest(self, input_file_name):
"""Load the spectrum from a FITS file (data, error and wavelengths) from Spectrum files generated
with Spectractor software above or equal 2.4 version. The parameters are loaded via the FITS file header
and overwrites those loaded via the config file.
Parameters
----------
input_file_name: str
Path to the input fits file
Examples
--------
>>> s = Spectrum(config="")
>>> s.load_spectrum('tests/data/reduc_20170530_134_spectrum.fits')
>>> print(s.units)
erg/s/cm$^2$/nm
.. doctest::
:hide:
>>> assert parameters.OBS_CAMERA_ROTATION == s.header["CAM_ROT"]
>>> assert parameters.CCD_REBIN == s.header["CCD_REBIN"]
>>> assert parameters.OBS_OBJECT_TYPE == "STAR"
>>> assert s.parallactic_angle == s.header["PARANGLE"]
"""
self.header, raw_data = load_fits(input_file_name)
self.lambdas = raw_data[0]
self.lambdas_binwidths = np.gradient(self.lambdas)
self.data = raw_data[1]
if len(raw_data) > 2:
self.err = raw_data[2]
self.cov_matrix = np.diag(self.err ** 2)
# set the config parameters first
param_header, _ = load_fits(input_file_name, hdu_index="CONFIG")
for key, value in param_header.items():
if "X_" not in key and (not isinstance(param_header[key], str) or (isinstance(param_header[key], str) and "X_" not in param_header[key])):
setattr(parameters, key, value)
elif "X_" in key:
continue
elif "X_" in param_header[key]:
setattr(parameters, key, param_header[value])
else:
continue
update_derived_parameters()
# loaded parameters have already been rebinned normally
# if parameters.CCD_REBIN > 1:
# apply_rebinning_to_parameters()
# set the simple items from the mappings. More complex items, i.e.
# those needing function calls, follow
for attribute, header_key in fits_mappings.items():
if self.header.get(header_key) is not None:
setattr(self, attribute, self.header.get(header_key))
else:
self.my_logger.warning(f'\n\tFailed to set spectrum attribute {attribute} using header {header_key}')
# set the more complex items by hand here
if self.header.get('TARGET'):
self.target = load_target(self.header.get('TARGET'), verbose=parameters.VERBOSE)
self.lines = self.target.lines
if self.header.get('TARGETX') and self.header.get('TARGETY'):
self.x0 = [self.header.get('TARGETX'), self.header.get('TARGETY')] # should be a tuple not a list
self.my_logger.info(f'\n\tLoading disperser {self.disperser_label}...')
self.disperser = Hologram(self.disperser_label, data_dir=parameters.DISPERSER_DIR)
self.my_logger.info(f'\n\tSpectrum loaded from {input_file_name}')
if parameters.OBS_OBJECT_TYPE == "STAR":
self.adr_params = [self.dec, self.hour_angle, self.temperature,
self.pressure, self.humidity, self.airmass]
self.psf = load_PSF(psf_type=parameters.PSF_TYPE, target=self.target)
self.chromatic_psf = ChromaticPSF(self.psf, self.spectrogram_Nx, self.spectrogram_Ny,
x0=self.spectrogram_x0, y0=self.spectrogram_y0,
deg=self.spectrogram_deg, saturation=self.spectrogram_saturation)
if 'PSF_REG' in self.header and float(self.header["PSF_REG"]) > 0:
self.chromatic_psf.opt_reg = float(self.header["PSF_REG"])
if not self.fast_load:
with (fits.open(input_file_name) as hdu_list):
self.cov_matrix = hdu_list["SPEC_COV"].data
_, self.data_next_order, self.err_next_order = hdu_list["ORDER2"].data
# load other spectrum info
self.target.image = hdu_list["ORDER0"].data
self.target.image_x0 = float(hdu_list["ORDER0"].header["IM_X0"])
self.target.image_y0 = float(hdu_list["ORDER0"].header["IM_Y0"])
# load spectrogram info
self.spectrogram_data = hdu_list["S_DATA"].data
self.spectrogram_err = hdu_list["S_ERR"].data
self.spectrogram_bgd = hdu_list["S_BGD"].data
self.spectrogram_bgd_rms = hdu_list["S_BGD_ER"].data
self.spectrogram_fit = hdu_list["S_FIT"].data
self.spectrogram_residuals = hdu_list["S_RES"].data
if "S_FLAT" in [hdu.name for hdu in hdu_list]:
self.spectrogram_flat = hdu_list["S_FLAT"].data
if "S_STAR" in [hdu.name for hdu in hdu_list]:
self.spectrogram_starfield = hdu_list["S_STAR"].data
if "S_MASK" in [hdu.name for hdu in hdu_list]:
self.spectrogram_mask = hdu_list["S_MASK"].data
if self.spectrogram_mask is not None:
self.spectrogram_mask = self.spectrogram_mask.astype(bool)
self.chromatic_psf.init_from_table(Table.read(hdu_list["PSF_TAB"]),
saturation=self.spectrogram_saturation)
self.lines.table = Table.read(hdu_list["LINES"], unit_parse_strict="silent")
[docs]
def load_spectrogram(self, input_file_name): # pragma: no cover
"""OBSOLETE: Load the spectrum from a fits file (data, error and wavelengths).
Parameters
----------
input_file_name: str
Path to the input fits file
Examples
--------
>>> s = Spectrum(config="./config/ctio.ini")
>>> s.load_spectrum('tests/data/reduc_20170605_028_spectrum.fits')
"""
if os.path.isfile(input_file_name):
with fits.open(input_file_name) as hdu_list:
header = hdu_list[0].header
self.spectrogram_data = hdu_list[0].data
self.spectrogram_err = hdu_list[1].data
self.spectrogram_bgd = hdu_list[2].data
if len(hdu_list) > 3:
self.spectrogram_bgd_rms = hdu_list[3].data
self.spectrogram_fit = hdu_list[4].data
self.spectrogram_residuals = hdu_list[5].data
self.spectrogram_x0 = float(header['S_X0'])
self.spectrogram_y0 = float(header['S_Y0'])
self.spectrogram_xmin = int(header['S_XMIN'])
self.spectrogram_xmax = int(header['S_XMAX'])
self.spectrogram_ymin = int(header['S_YMIN'])
self.spectrogram_ymax = int(header['S_YMAX'])
self.spectrogram_deg = int(header['S_DEG'])
self.spectrogram_saturation = float(header['S_SAT'])
self.spectrogram_Nx = self.spectrogram_xmax - self.spectrogram_xmin
self.spectrogram_Ny = self.spectrogram_ymax - self.spectrogram_ymin
self.my_logger.info('\n\tSpectrogram loaded from %s' % input_file_name)
else:
self.my_logger.warning('\n\tSpectrogram file %s not found' % input_file_name)
[docs]
def load_chromatic_psf(self, input_file_name): # pragma: no cover
"""OBSOLETE: Load the spectrum from a fits file (data, error and wavelengths).
Parameters
----------
input_file_name: str
Path to the input fits file
Examples
--------
>>> s = Spectrum('./tests/data/reduc_20170530_134_spectrum.fits')
>>> print(s.chromatic_psf.table) #doctest: +ELLIPSIS
lambdas Dx ...
"""
if os.path.isfile(input_file_name):
self.psf = load_PSF(psf_type=parameters.PSF_TYPE, target=self.target)
self.chromatic_psf = ChromaticPSF(self.psf, self.spectrogram_Nx, self.spectrogram_Ny,
x0=self.spectrogram_x0, y0=self.spectrogram_y0,
deg=self.spectrogram_deg, saturation=self.spectrogram_saturation,
file_name=input_file_name)
if 'PSF_REG' in self.header and float(self.header["PSF_REG"]) > 0:
self.chromatic_psf.opt_reg = float(self.header["PSF_REG"])
self.my_logger.info(f'\n\tSpectrogram loaded from {input_file_name}')
else:
self.my_logger.warning(f'\n\tSpectrogram file {input_file_name} not found')
[docs]
def compute_disp_axis_in_spectrogram(self, shift_x, shift_y, angle):
"""Compute the dispersion axis position in a spectrogram.
Origin is the order 0 centroid.
Parameters
----------
shift_x: float
Shift in the x axis direction for order 0 position in pixel.
shift_y: float
Shift in the y axis direction for order 0 position in pixel.
angle: float
Main dispersion axis angle in degrees.
Returns
-------
Dx: array_like
Position array along x axis of spectrogram
Dy_disp_axis: array_like
Dispersion axis position along y axis of spectrogram with respect to Dx.
Examples
--------
>>> s = Spectrum("./tests/data/reduc_20170530_134_spectrum.fits")
>>> Dx, Dy_disp_axis = s.compute_disp_axis_in_spectrogram(0, 0, 0)
>>> Dx[0] == -s.spectrogram_x0
True
>>> Dy_disp_axis[:4]
array([0., 0., 0., 0.])
"""
# Distance in x and y with respect to the TRUE order 0 position at lambda_ref
Dx = np.arange(self.spectrogram_Nx) - self.spectrogram_x0 - shift_x # distance in (x,y) spectrogram frame for column x
Dy_disp_axis = np.tan(angle * np.pi / 180) * Dx - shift_y # disp axis height in spectrogram frame for x with respect to order 0
return Dx, Dy_disp_axis
[docs]
def compute_lambdas_in_spectrogram(self, D, shift_x, shift_y, angle, niter=3, with_adr=True, order=1):
"""Compute the dispersion relation in a spectrogram, using grating dispersion model and ADR,
for a given diffraction order. Origin is the order 0 centroid.
Parameters
----------
D: float
The distance between the CCD and the disperser in mm.
shift_x: float
Shift in the x axis direction for order 0 position in pixel.
shift_y: float
Shift in the y axis direction for order 0 position in pixel.
angle: float
Main dispersion axis angle in degrees.
niter: int, optional
Number of iterations to compute ADR (default: 3).
with_adr: bool, optional
If True, add ADR effect to grating dispersion model (default: True).
order: int, optional
Diffraction order (default: 1).
Returns
-------
lambdas: array_like
Wavelength array for the given diffraction order.
Examples
--------
>>> s = Spectrum("./tests/data/reduc_20170530_134_spectrum.fits")
>>> s.x0 = [743, 683]
>>> s.spectrogram_x0 = -280
>>> lambdas = s.compute_lambdas_in_spectrogram(58, 0, 0, 0)
>>> lambdas[:4] #doctest: +ELLIPSIS
array([334.874..., 336.022..., 337.170..., 338.318...])
>>> lambdas_order2 = s.compute_lambdas_in_spectrogram(58, 0, 0, 0, order=2)
>>> lambdas_order2[:4] #doctest: +ELLIPSIS
array([175.248..., 175.661..., 176.088..., 176.527...])
"""
# Distance in x and y with respect to the true order 0 position at lambda_ref
Dx, Dy_disp_axis = self.compute_disp_axis_in_spectrogram(shift_x=shift_x, shift_y=shift_y, angle=angle)
distance = np.sign(Dx) * np.sqrt(Dx * Dx + Dy_disp_axis * Dy_disp_axis) # algebraic distance along dispersion axis
# Wavelengths using the order 0 shifts (ADR has no impact as it shifts order 0 and order p equally)
new_x0 = [self.x0[0] + shift_x, self.x0[1] + shift_y]
# First guess of wavelengths
lambdas = self.disperser.grating_pixel_to_lambda(distance, new_x0, D=D, order=order)
# Evaluate ADR
if with_adr:
for k in range(niter):
adr_ra, adr_dec = adr_calib(lambdas, self.adr_params, parameters.OBS_LATITUDE,
lambda_ref=self.lambda_ref)
adr_u, adr_v = flip_and_rotate_adr_to_image_xy_coordinates(adr_ra, adr_dec, dispersion_axis_angle=angle)
# Compute lambdas at pixel column x
lambdas = self.disperser.grating_pixel_to_lambda(distance - adr_u, new_x0, D=D, order=order)
return lambdas
[docs]
def compute_dispersion_in_spectrogram(self, lambdas, D, shift_x, shift_y, angle, with_adr=True, order=1):
"""Compute the dispersion relation in a spectrogram, using grating dispersion model and ADR, for a given
diffraction order. Origin is the order 0 centroid.
Parameters
----------
lambdas: array_like
Wavelength array for the given diffraction order.
D: float
The distance between the CCD and the disperser in mm.
shift_x: float
Shift in the x axis direction for order 0 position in pixel.
shift_y: float
Shift in the y axis direction for order 0 position in pixel.
angle: float
Main dispersion axis angle in degrees.
with_adr: bool, optional
If True, add ADR effect to grating dispersion model (default: True).
order: int, optional
Diffraction order (default: 1).
Returns
-------
dispersion_law: array_like
Complex array coding the 2D dispersion relation in the spectrogram for the given diffraction order.
Examples
--------
>>> s = Spectrum("./tests/data/reduc_20170530_134_spectrum.fits")
>>> s.x0 = [743, 683]
>>> s.spectrogram_x0 = -280
>>> lambdas = s.compute_lambdas_in_spectrogram(58, 0, 0, 0)
>>> lambdas[:4] #doctest: +ELLIPSIS
array([334.874..., 336.022..., 337.170..., 338.318...])
>>> dispersion_law = s.compute_dispersion_in_spectrogram(lambdas, 58, 0, 0, 0, order=1)
>>> dispersion_law[:4] #doctest: +ELLIPSIS
array([280.0... +1.0...j, 281.0...+1.0...j,
282.0...+1.0...j, 283.0... +1.0...j])
>>> dispersion_law_order2 = s.compute_dispersion_in_spectrogram(lambdas, 58, 0, 0, 0, order=2)
>>> dispersion_law_order2[:4] #doctest: +ELLIPSIS
array([573.6...+1.0...j, 575.8...+1.0...j,
577.9...+1.0...j, 580.0...+1.0...j])
"""
new_x0 = [self.x0[0] + shift_x, self.x0[1] + shift_y]
# Distance (not position) in pixel of wavelength lambda centroid in the (x,y) spectrogram frame
# with respect to order 0 centroid
distance_along_disp_axis = self.disperser.grating_lambda_to_pixel(lambdas, x0=new_x0, D=D, order=order)
Dx = distance_along_disp_axis * np.cos(angle * np.pi / 180)
Dy_disp_axis = distance_along_disp_axis * np.sin(angle * np.pi / 180)
# Evaluate ADR
adr_x = np.zeros_like(Dx)
adr_y = np.zeros_like(Dy_disp_axis)
if with_adr:
adr_ra, adr_dec = adr_calib(lambdas, self.adr_params, parameters.OBS_LATITUDE,
lambda_ref=self.lambda_ref)
adr_x, adr_y = flip_and_rotate_adr_to_image_xy_coordinates(adr_ra, adr_dec, dispersion_axis_angle=0)
# Position (not distance) in pixel of wavelength lambda centroid in the (x,y) spectrogram frame
# with respect to order 0 initial centroid position.
dispersion_law = (Dx + shift_x + with_adr * adr_x) + 1j * (Dy_disp_axis + with_adr * adr_y + shift_y)
return dispersion_law
[docs]
class MultigaussAndBgdFitWorkspace(FitWorkspace):
def __init__(self, lines, guess, x, data, err, bounds, bgd_npar, indices, file_name="", verbose=False, plot=False,
live_fit=False, truth=None):
"""
Parameters
----------
Examples
--------
>>> from spectractor.config import load_config
>>> load_config("default.ini")
>>> x = np.arange(600.,800.,1)
>>> x_norm = rescale_x_to_legendre(x)
>>> p = [20, 0, 0, 0, 20, 650, 3, 40, 750, 5]
>>> y = multigauss_and_bgd(np.array([x_norm, x]), *p)
>>> print(f'{y[0]:.2f}')
20.00
>>> err = 0.1 * np.sqrt(y)
>>> guess = (10,0,0,0.1,10,640,2,20,750,7)
>>> bounds = ((-np.inf,-np.inf,-np.inf,-np.inf,1,600,1,1,600,1),(np.inf,np.inf,np.inf,np.inf,100,800,100,100,800,100))
>>> w = MultigaussAndBgdFitWorkspace(lines=None, guess=guess, x=x, data=y, err=err, bounds=np.array(bounds).T, bgd_npar=4, indices=x)
>>> w = run_multigaussandbgd_minimisation(w)
>>> popt = w.params.values
>>> assert np.allclose(p, w.params.values, rtol=1e-4, atol=1e-5)
>>> _ = w.plot_fit()
.. plot::
import matplotlib.pyplot as plt
import numpy as np
from spectractor.tools import multigauss_and_bgd, fit_multigauss_and_bgd
x = np.arange(600.,800.,1)
x_norm = rescale_x_to_legendre(x)
p = [20, 0, 0, 0, 20, 650, 3, 40, 750, 5]
y = multigauss_and_bgd(np.array([x_norm, x]), *p)
err = 0.1 * np.sqrt(y)
guess = (10,0,0,0.1,10,640,2,20,750,7)
bounds = ((-np.inf,-np.inf,-np.inf,-np.inf,1,600,1,1,600,1),(np.inf,np.inf,np.inf,np.inf,100,800,100,100,800,100))
w = MultigaussAndBgdFitWorkspace(guess, x, y, err, np.array(bounds).T)
run_multigaussandbgd_minimisation(w, method="newton")
w.plot_fit()
"""
self.lines = lines
self.bgd_npar = bgd_npar
self.indices = indices
labels = [f"b_{k}" for k in range(bgd_npar)]
for ngauss in range((len(guess) - bgd_npar) // 3):
labels += [f"A_{ngauss}", f"x0_{ngauss}", f"sigma_{ngauss}"]
params = FitParameters(values=guess, labels=labels, bounds=bounds, truth=truth)
FitWorkspace.__init__(self, params, file_name=file_name, verbose=verbose, plot=plot,
live_fit=live_fit, truth=truth)
self.my_logger = set_logger(self.__class__.__name__)
if err is not None and data.shape != err.shape:
raise ValueError(f"Data and uncertainty arrays must have the same shapes. "
f"Here data.shape={data.shape} and data_errors.shape={err.shape}.")
self.x = x
self.x_norm = rescale_x_to_legendre(x)
self.xs = np.array([self.x_norm, self.x])
self.data = data
self.err = err
[docs]
def reset_x(self, x):
self.x = x
self.x_norm = rescale_x_to_legendre(x)
self.xs = np.array([self.x_norm, self.x])
[docs]
def simulate(self, *p):
self.model = multigauss_and_bgd(self.xs, *p)
return self.x, self.model, np.zeros_like(self.model)
[docs]
def jacobian(self, params, model_input=None):
return multigauss_and_bgd_jacobian(self.xs, *params).T
[docs]
def run_multigaussandbgd_minimisation(w):
run_minimisation(w, method="curve_fit", ftol=1e-4, xtol=1e-6)
return w
def _init_fit_lines(lines, lambdas, spec, spec_err=None, cov_matrix=None, fwhm_func=None,
calibration_lines_only=False,
xlim=(parameters.LAMBDA_MIN, parameters.LAMBDA_MAX)):
"""Detect the lines in a spectrum. The method is to look at maxima or minima
around emission or absorption tabulated lines, and to select surrounding pixels
to fit a (positive or negative) gaussian and a polynomial background. If several regions
overlap, a multi-gaussian fit is performed above a common polynomial background.
The order of the polynomial background is set in parameters.py with CALIB_BGD_ORDER.
Parameters
----------
lines: Lines
The Lines object containing the line characteristics
lambdas: float array
The wavelength array (in nm)
spec: float array
The spectrum amplitude array
spec_err: float array, optional
The spectrum amplitude uncertainty array (default: None)
cov_matrix: float array, optional
The spectrum amplitude 2D covariance matrix array (default: None)
fwhm_func: callable, optional
The fwhm of the cross spectrum to reset CALIB_PEAK_WIDTH parameter as a function of lambda (default: None)
ax: Axes, optional
An Axes instance to over plot the result of the fit (default: None).
calibration_lines_only: bool, optional
If True, try to detect only the lines with use_for_calibration attributes set True.
xlim: array, optional
(min, max) list limiting the wavelength interval where to detect spectral lines (default:
(parameters.LAMBDA_MIN, parameters.LAMBDA_MAX))
Returns
-------
DetectedLines: DetectedLines
Class containing the detected lines before fitting
"""
# main settings
baseline_prior = 3 # *sigma gaussian prior on base line fit
peak_width = parameters.CALIB_PEAK_WIDTH
bgd_width = parameters.CALIB_BGD_WIDTH
# if lines.hydrogen_only:
# peak_width = 7
# bgd_width = 15
fwhm_to_peak_width_factor = 1.5
len_index_to_bgd_npar_factor = 0 * 0.12 / 0.024 * parameters.CCD_PIXEL2MM
# filter the noise
# plt.errorbar(lambdas,spec,yerr=spec_err)
spec = np.copy(spec)
spec_smooth = savgol_filter(spec, parameters.CALIB_SAVGOL_WINDOW, parameters.CALIB_SAVGOL_ORDER)
# plt.plot(lambdas,spec)
# plt.show()
# initialisation
lambda_shifts = []
calib_lines = []
snrs = []
index_list = []
bgd_npar_list = []
peak_index_list = []
guess_list = []
bounds_list = []
lines_list = []
fitworkspaces = []
for line in lines.lines:
# reset line fit attributes
line.fitted = False
line.fit_popt = None
line.high_snr = False
if not line.use_for_calibration and calibration_lines_only:
continue
# wavelength of the line: find the nearest pixel index
line_wavelength = line.wavelength
if fwhm_func is not None:
peak_width = max(fwhm_to_peak_width_factor * fwhm_func(line_wavelength), parameters.CALIB_PEAK_WIDTH)
if line_wavelength < xlim[0] or line_wavelength > xlim[1]:
continue
l_index, l_lambdas = find_nearest(lambdas, line_wavelength)
# reject if pixel index is too close to image bounds
if l_index < peak_width or l_index > len(lambdas) - peak_width:
continue
# search for local extrema to detect emission or absorption line
# around pixel index +/- peak_width
line_strategy = np.greater # look for emission line
bgd_strategy = np.less
if not lines.emission_spectrum or line.atmospheric:
line_strategy = np.less # look for absorption line
bgd_strategy = np.greater
index = np.arange(l_index - peak_width, l_index + peak_width, 1).astype(int)
# skip if data is masked with NaN
if np.any(np.isnan(spec_smooth[index])):
continue
extrema = argrelextrema(spec_smooth[index], line_strategy)
if len(extrema[0]) == 0:
continue
peak_index = index[0] + extrema[0][0]
# if several extrema, look for the greatest
if len(extrema[0]) > 1:
if line_strategy == np.greater:
test = -1e20
for m in extrema[0]:
idx = index[0] + m
if spec_smooth[idx] > test:
peak_index = idx
test = spec_smooth[idx]
elif line_strategy == np.less:
test = 1e20
for m in extrema[0]:
idx = index[0] + m
if spec_smooth[idx] < test:
peak_index = idx
test = spec_smooth[idx]
# search for first local minima around the local maximum
# or for first local maxima around the local minimum
# around +/- 3*peak_width
index_inf = peak_index - 1 # extrema on the left
while index_inf > max(0, peak_index - 3 * peak_width):
test_index = np.arange(index_inf, peak_index, 1).astype(int)
minm = argrelextrema(spec_smooth[test_index], bgd_strategy)
if len(minm[0]) > 0:
index_inf = index_inf + minm[0][0]
break
else:
index_inf -= 1
index_sup = peak_index + 1 # extrema on the right
while index_sup < min(len(spec_smooth) - 1, peak_index + 3 * peak_width):
test_index = np.arange(peak_index, index_sup, 1).astype(int)
minm = argrelextrema(spec_smooth[test_index], bgd_strategy)
if len(minm[0]) > 0:
index_sup = peak_index + minm[0][0]
break
else:
index_sup += 1
index_sup = max(index_sup, peak_index + peak_width)
index_inf = min(index_inf, peak_index - peak_width)
# pixel range to consider around the peak, adding bgd_width pixels
# to fit for background around the peak
index = list(np.arange(max(0, index_inf - bgd_width),
min(len(lambdas), index_sup + bgd_width), 1).astype(int))
# exclude pixels very weak compared to the median signal in this zone
# if most pixels are excluded, temove the line from the fit
mask = spec_smooth[index] > 5e-2 * np.median(spec_smooth[index])
if np.sum(mask) > peak_width:
index = list(np.array(index)[mask])
else:
continue
# skip if data is masked with NaN
if np.any(np.isnan(spec_smooth[index])):
continue
# first guess and bounds to fit the line properties and
# the background with CALIB_BGD_ORDER order polynom
# guess = [0] * bgd_npar + [0.5 * np.max(spec_smooth[index]), lambdas[peak_index],
# 0.5 * (line.width_bounds[0] + line.width_bounds[1])]
bgd_npar = max(parameters.CALIB_BGD_NPARAMS, int(len_index_to_bgd_npar_factor * (index[-1] - index[0])))
bgd_npar_list.append(bgd_npar)
guess = [0] * bgd_npar + [0.5 * np.max(spec_smooth[index]), line_wavelength,
0.5 * (line.width_bounds[0] + line.width_bounds[1])]
if line_strategy == np.less:
# noinspection PyTypeChecker
guess[bgd_npar] = -0.5 * np.max(spec_smooth[index]) # look for abosrption under bgd
# bounds = [[-np.inf] * bgd_npar + [-abs(np.max(spec[index])), lambdas[index_inf], line.width_bounds[0]],
# [np.inf] * bgd_npar + [abs(np.max(spec[index])), lambdas[index_sup], line.width_bounds[1]]]
bounds = [[-np.inf] * bgd_npar + [-abs(np.max(spec[index])), line_wavelength - peak_width,
line.width_bounds[0]],
[np.inf] * bgd_npar + [abs(np.max(spec[index])), line_wavelength + peak_width,
line.width_bounds[1]]]
# gaussian amplitude bounds depend if line is emission/absorption
if line_strategy == np.less:
bounds[1][bgd_npar] = 0 # look for absorption under bgd
else:
bounds[0][bgd_npar] = 0 # look for emission above bgd
peak_index_list.append(peak_index)
index_list.append(index)
lines_list.append(line)
guess_list.append(guess)
bounds_list.append(bounds)
# now gather lines together if pixel index ranges overlap
idx = 0
merges = [[0]]
while idx < len(index_list) - 1:
idx = merges[-1][-1]
if idx == len(index_list) - 1:
break
if index_list[idx + 1][0] > index_list[idx][0]: # increasing order
if index_list[idx][-1] > index_list[idx + 1][0]:
merges[-1].append(idx + 1)
else:
merges.append([idx + 1])
idx += 1
else: # decreasing order
if index_list[idx][0] < index_list[idx + 1][-1]:
merges[-1].append(idx + 1)
else:
merges.append([idx + 1])
idx += 1
# reorder merge list with respect to lambdas in guess list
new_merges = []
for merge in merges:
if len(guess_list) == 0:
continue
tmp_guess = [guess_list[i][-2] for i in merge]
new_merges.append([x for _, x in sorted(zip(tmp_guess, merge))])
# reorder lists with merges
new_peak_index_list = []
new_index_list = []
new_guess_list = []
new_bounds_list = []
new_lines_list = []
for merge in new_merges:
new_peak_index_list.append([])
new_index_list.append([])
new_guess_list.append([])
new_bounds_list.append([[], []])
new_lines_list.append([])
for i in merge:
# add the bgd parameters
bgd_npar = bgd_npar_list[i]
# if i == merge[0]:
# new_guess_list[-1] += guess_list[i][:bgd_npar]
# new_bounds_list[-1][0] += bounds_list[i][0][:bgd_npar]
# new_bounds_list[-1][1] += bounds_list[i][1][:bgd_npar]
# add the gauss parameters
new_peak_index_list[-1].append(peak_index_list[i])
new_index_list[-1] += index_list[i]
new_guess_list[-1] += guess_list[i][bgd_npar:]
new_bounds_list[-1][0] += bounds_list[i][0][bgd_npar:]
new_bounds_list[-1][1] += bounds_list[i][1][bgd_npar:]
new_lines_list[-1].append(lines_list[i])
# set central peak bounds exactly between two close lines
# for k in range(len(merge) - 1):
# new_bounds_list[-1][0][3 * (k + 1) + 1] = 0.5 * (
# new_guess_list[-1][3 * k + 1] + new_guess_list[-1][3 * (k + 1) + 1])
# new_bounds_list[-1][1][3 * k + 1] = 0.5 * (
# new_guess_list[-1][3 * k + 1] + new_guess_list[-1][3 * (k + 1) + 1]) + 1e-3
# last term is to avoid equalities
# between bounds in some pathological case
# sort pixel indices and remove doublons
new_index_list[-1] = sorted(list(set(new_index_list[-1])))
for k in range(len(new_index_list)):
# first guess for the base line with the lateral bands
peak_index = new_peak_index_list[k]
index = new_index_list[k]
guess = new_guess_list[k]
bounds = new_bounds_list[k]
bgd_index = []
if fwhm_func is not None:
peak_width = fwhm_to_peak_width_factor * np.mean(fwhm_func(lambdas[index]))
for i in index:
is_close_to_peak = False
for j in peak_index:
if abs(i - j) < peak_width:
is_close_to_peak = True
break
if not is_close_to_peak:
bgd_index.append(i)
# add background guess and bounds
bgd_npar = max(parameters.CALIB_BGD_ORDER + 1, int(len_index_to_bgd_npar_factor * len(bgd_index)))
parameters.CALIB_BGD_NPARAMS = bgd_npar
guess = [0] * bgd_npar + guess
bounds[0] = [-np.inf] * bgd_npar + bounds[0]
bounds[1] = [np.inf] * bgd_npar + bounds[1]
if len(bgd_index) > 0:
try:
if spec_err is not None:
w = 1. / spec_err[bgd_index]
else:
w = np.ones_like(lambdas[bgd_index])
fit, cov, model = fit_poly1d_legendre(lambdas[bgd_index], spec[bgd_index], order=bgd_npar - 1, w=w)
except:
if spec_err is not None:
w = 1. / spec_err[index]
else:
w = np.ones_like(lambdas[index])
fit, cov, model = fit_poly1d_legendre(lambdas[index], spec[index], order=bgd_npar - 1, w=w)
else:
if spec_err is not None:
w = 1. / spec_err[index]
else:
w = np.ones_like(lambdas[index])
fit, cov, model = fit_poly1d_legendre(lambdas[index], spec[index], order=bgd_npar - 1, w=w)
bgd_mean = float(np.mean(spec_smooth[bgd_index]))
bgd_std = float(np.std(spec_smooth[bgd_index]))
for n in range(bgd_npar):
guess[n] = fit[n]
b = abs(baseline_prior * guess[n])
# b = abs(baseline_prior * np.sqrt(cov[n,n]))
# Following is useful if by mistake guess is a zero vector
if np.isclose(b, 0, atol=1e-2 * bgd_mean):
b = baseline_prior * bgd_std
if np.isclose(b, 0, atol=1e-2 * bgd_mean):
b = np.inf
bounds[0][n] = -np.inf # guess[n] - b
bounds[1][n] = np.inf # guess[n] + b
for j in range(len(new_lines_list[k])):
idx = new_peak_index_list[k][j]
x_norm = rescale_x_to_legendre(lambdas[idx])
guess[bgd_npar + 3 * j] = np.sign(guess[bgd_npar + 3 * j]) * abs(
spec_smooth[idx] - np.polynomial.legendre.legval(x_norm, guess[:bgd_npar]))
# guess[bgd_npar + 3 * j] = np.sign(guess[bgd_npar + 3 * j]) * abs(spec_smooth[idx] - np.polyval(guess[:bgd_npar], lambdas[idx]))
if np.sign(guess[bgd_npar + 3 * j]) < 0: # absorption
bounds[0][bgd_npar + 3 * j] = 5 * guess[bgd_npar + 3 * j]
bounds[1][bgd_npar + 3 * j] = 0
else: # emission
bounds[0][bgd_npar + 3 * j] = 0
bounds[1][bgd_npar + 3 * j] = 5 * guess[bgd_npar + 3 * j]
# fit local extrema with a multigaussian + CALIB_BGD_ORDER polynom
# account for the spectrum uncertainties if provided
sigma = None
if spec_err is not None:
sigma = spec_err[index]
if cov_matrix is not None:
sigma = cov_matrix[index, index]
x_norm = rescale_x_to_legendre(lambdas[index])
w = MultigaussAndBgdFitWorkspace(new_lines_list[k], guess, lambdas[index], spec[index], sigma,
np.array(bounds).T, verbose=True, bgd_npar=bgd_npar, indices=index)
fitworkspaces.append(w)
return fitworkspaces
def _fit_lines(fitworkspaces, snr_minlevel=3, ax=None):
"""Fit the lines in a spectrum.
The order of the polynomial background is set in parameters.py with CALIB_BGD_ORDER.
Parameters
----------
fitworkspaces: List[FitWorkspace]
The list of FitWorkspace instances to fit the lines
snr_minlevel: float
The minimum signal over noise ratio to consider using a fitted line in the computation of the mean
shift output and to print it in the outpur table (default: 3)
ax: Axes, optional
An Axes instance to over plot the result of the fit (default: None).
Returns
-------
shift: float
The mean shift (in nm) between the detected and tabulated lines
"""
lambda_shifts = []
snrs = []
res = []
global_chisq = 0
calib_lines = []
for w in fitworkspaces:
bgd_npar = w.bgd_npar
index = w.indices
lambdas = w.x
w = run_multigaussandbgd_minimisation(w)
popt = w.params.values
pcov = w.params.cov
# noise level defined as the std of the residuals if no error
x_norm = rescale_x_to_legendre(w.x)
best_fit_model = multigauss_and_bgd(np.array([x_norm, w.x]), *popt)
noise_level = np.std(w.data - best_fit_model)
# otherwise mean of error bars of bgd lateral bands
if w.err is not None:
chisq = np.sum((best_fit_model - w.data) ** 2 / (w.err * w.err))
else:
chisq = np.sum((best_fit_model - w.data) ** 2)
chisq /= len(w.indices)
global_chisq += chisq
if w.err is not None:
noise_level = np.sqrt(np.mean(w.err ** 2))
for j in range(len(w.lines)):
line = w.lines[j]
peak_pos = popt[bgd_npar + 3 * j + 1]
# FWHM
FWHM = np.abs(popt[bgd_npar + 3 * j + 2]) * 2.355
# SNR computation
# signal_level = popt[bgd_npar+3*j]
# multigauss_and_bgd(peak_pos, *popt) - np.polyval(popt[:bgd_npar], peak_pos)
signal_level = popt[bgd_npar + 3 * j]
snr = np.abs(signal_level / noise_level)
# save fit results
if w.params.fixed[w.params.get_index(f"x0_{j}")]:
line.fitted = False
else:
line.fitted = True
line.fit_index = index
line.fit_lambdas = lambdas
x_norm = rescale_x_to_legendre(lambdas)
x_step = 0.1 # nm
x_int = np.arange(max(np.min(lambdas), peak_pos - 5 * np.abs(popt[bgd_npar + 3 * j + 2])),
min(np.max(lambdas), peak_pos + 5 * np.abs(popt[bgd_npar + 3 * j + 2])), x_step)
if len(x_int) < 2:
line.my_logger.warning(f"Not enough points to fit line {line.label} at {peak_pos:.3f}nm. "
f"Only {len(x_int)} points in the range [{lambdas[0]:.3f}, {lambdas[-1]:.3f}]")
line.fitted = False
continue
x_int_norm = rescale_x_to_legendre(x_int)
# jmin and jmax a bit larger than x_int to avoid extrapolation
jmin = max(0, int(np.argmin(np.abs(lambdas - (x_int[0] - x_step))) - 2))
jmax = min(len(lambdas), int(np.argmin(np.abs(lambdas - (x_int[-1] + x_step))) + 2))
if jmax - 2 < jmin + 2: # decreasing order
jmin, jmax = max(0, jmax - 4), min(len(lambdas), jmin + 4)
spectr_data = interp1d(lambdas[jmin:jmax], w.data[jmin:jmax],
bounds_error=False, fill_value="extrapolate")(x_int)
Continuum = np.polynomial.legendre.legval(x_int_norm, popt[:bgd_npar])
# Continuum = np.polyval(popt[:bgd_npar], x_int)
Gauss = gauss(x_int, *popt[bgd_npar + 3 * j:bgd_npar + 3 * j + 3])
Y = -Gauss / Continuum
Ydata = 1 - spectr_data / Continuum
line.fit_eqwidth_mod = integrate.simpson(Y, x=x_int) # sol1
line.fit_eqwidth_data = integrate.simpson(Ydata, x=x_int) # sol2
line.fit_popt = popt
line.fit_pcov = pcov
line.fit_popt_gaussian = popt[bgd_npar + 3 * j:bgd_npar + 3 * j + 3]
if pcov is not None:
line.fit_pcov_gaussian = pcov[bgd_npar + 3 * j:bgd_npar + 3 * j + 3, bgd_npar + 3 * j:bgd_npar + 3 * j + 3]
line.fit_gauss = gauss(lambdas, *popt[bgd_npar + 3 * j:bgd_npar + 3 * j + 3])
line.fit_bgd = np.polynomial.legendre.legval(x_norm, popt[:bgd_npar])
# line.fit_bgd = np.polyval(popt[:bgd_npar], x_int)
line.fit_snr = snr
line.fit_chisq = chisq
line.fit_fwhm = FWHM
line.fit_bgd_npar = bgd_npar
line.high_snr = True
if line.use_for_calibration:
# wavelength shift between tabulate and observed lines
if snr > snr_minlevel:
lambda_shifts.append(peak_pos - line.wavelength)
snrs.append(snr)
calib_lines.append(line)
# print(line.label, line.wavelength, peak_pos - line.wavelength)
res.append((peak_pos - line.wavelength) / max(0.1, popt[bgd_npar + 3 * j + 2])) # max(0.1, w.params.err[
#w.params.get_index(f"x0_{j}")])) # np.sqrt(pcov[bgd_npar + 3 * j + 1,bgd_npar + 3 * j + 1]))
# * np.abs(popt[bgd_npar + 3 * j + 0])**2
if len(lambda_shifts) > 0:
global_chisq /= len(lambda_shifts)
shift = np.average(np.abs(lambda_shifts) ** 2, weights=np.array(snrs) ** 2)
# if guess values on tabulated lines have not moved: penalize the chisq
global_chisq += shift
# lines.my_logger.debug(f'\n\tNumber of calibration lines detected {len(lambda_shifts):d}'
# f'\n\tTotal chisq: {global_chisq:.3f} with shift {shift:.3f}pix')
else:
global_chisq = 2 * len(parameters.LAMBDAS)
# lines.my_logger.debug(
# f'\n\tNumber of calibration lines detected {len(lambda_shifts):d}\n\tTotal chisq: {global_chisq:.3f}')
return global_chisq, np.array(res) #* np.array(snrs)
[docs]
def detect_lines(lines, lambdas, spec, spec_err=None, cov_matrix=None, fwhm_func=None, snr_minlevel=3, ax=None,
calibration_lines_only=False,
xlim=(parameters.LAMBDA_MIN, parameters.LAMBDA_MAX)):
"""Detect and fit the lines in a spectrum. The method is to look at maxima or minima
around emission or absorption tabulated lines, and to select surrounding pixels
to fit a (positive or negative) gaussian and a polynomial background. If several regions
overlap, a multi-gaussian fit is performed above a common polynomial background.
The mean global shift (in nm) between the detected and tabulated lines is returned, considering
only the lines with a signal-to-noise ratio above a threshold.
The order of the polynomial background is set in parameters.py with CALIB_BGD_ORDER.
Parameters
----------
lines: Lines
The Lines object containing the line characteristics
lambdas: float array
The wavelength array (in nm)
spec: float array
The spectrum amplitude array
spec_err: float array, optional
The spectrum amplitude uncertainty array (default: None)
cov_matrix: float array, optional
The spectrum amplitude 2D covariance matrix array (default: None)
fwhm_func: callable, optional
The fwhm of the cross spectrum to reset CALIB_PEAK_WIDTH parameter as a function of lambda (default: None)
snr_minlevel: float
The minimum signal over noise ratio to consider using a fitted line in the computation of the mean
shift output and to print it in the outpur table (default: 3)
ax: Axes, optional
An Axes instance to over plot the result of the fit (default: None).
calibration_lines_only: bool, optional
If True, try to detect only the lines with use_for_calibration attributes set True.
xlim: array, optional
(min, max) list limiting the wavelength interval where to detect spectral lines (default:
(parameters.LAMBDA_MIN, parameters.LAMBDA_MAX))
Returns
-------
shift: float
The mean shift (in nm) between the detected and tabulated lines
Examples
--------
Creation of a mock spectrum with emission and absorption lines:
>>> import numpy as np
>>> from spectractor.extractor.spectroscopy import Lines, HALPHA, HBETA, O2_1
>>> lambdas = np.arange(300,1000,1)
>>> spectrum = 1e4*np.exp(-((lambdas-600)/200)**2)
>>> spectrum += HALPHA.gaussian_model(lambdas, A=5000, sigma=3)
>>> spectrum += HBETA.gaussian_model(lambdas, A=3000, sigma=2)
>>> spectrum += O2_1.gaussian_model(lambdas, A=-3000, sigma=7)
>>> spectrum_err = np.sqrt(spectrum)
>>> cov = np.diag(spectrum_err)
>>> spectrum = np.random.poisson(spectrum)
>>> spec = Spectrum()
>>> spec.lambdas = lambdas
>>> spec.data = spectrum
>>> spec.err = spectrum_err
>>> fwhm_func = interp1d(lambdas, 0.01 * lambdas)
Detect the lines:
>>> lines = Lines([HALPHA, HBETA, O2_1], hydrogen_only=True,
... atmospheric_lines=True, redshift=0, emission_spectrum=True)
>>> global_chisq, _, _ = detect_lines(lines, lambdas, spectrum, spectrum_err, cov, fwhm_func=fwhm_func)
.. doctest::
:hide:
>>> assert(global_chisq < 2)
Plot the result:
>>> import matplotlib.pyplot as plt
>>> spec.lines = lines
>>> fig = plt.figure()
>>> plot_spectrum_simple(plt.gca(), lambdas, spec.data, data_err=spec.err)
>>> lines.plot_detected_lines(plt.gca())
>>> if parameters.DISPLAY: plt.show()
"""
fitworkspaces = _init_fit_lines(lines, lambdas, spec, spec_err=spec_err, cov_matrix=cov_matrix, fwhm_func=fwhm_func,
calibration_lines_only=calibration_lines_only, xlim=xlim)
global_chisq, res = _fit_lines(fitworkspaces, ax=ax, snr_minlevel=snr_minlevel)
for line in lines.lines:
for w in fitworkspaces:
for fitted_line in w.lines:
if line.label == fitted_line.label:
line = fitted_line
break
if ax is not None:
lines.plot_detected_lines(ax)
lines.table = lines.build_detected_line_table()
lines.my_logger.debug(f"\n{lines.table}")
return global_chisq, np.array(res), fitworkspaces
[docs]
def calibrate_spectrum(spectrum, with_adr=False, niter=5, grid_search=False):
"""Convert pixels into wavelengths given the position of the order 0,
the data for the spectrum, the properties of the disperser. Fit the absorption
(and eventually the emission) lines to perform a second calibration of the
distance between the CCD and the disperser. The number of fitting steps is
limited to 30.
Finally convert the spectrum amplitude from ADU rate to erg/s/cm2/nm.
Parameters
----------
spectrum: Spectrum
Spectrum object to calibrate
with_adr: bool, optional
If True, the ADR longitudinal shift is subtracted to distances.
Must be False if the spectrum has already been decontaminated from ADR (default: False).
niter: int, optional
Number of iterations for ADR accurate evaluation (default: 5).
Returns
-------
lambdas: array_like
The new wavelength array in nm.
Examples
--------
>>> spectrum = Spectrum('tests/data/reduc_20170530_134_spectrum.fits', config="")
>>> parameters.LAMBDA_MIN = 350
>>> parameters.LAMBDA_MAX = 800
>>> parameters.DEBUG = True
>>> parameters.PIXSHIFT_PRIOR = 2
>>> parameters.DISTANCE2CCD_ERR = 0.05
>>> lambdas = calibrate_spectrum(spectrum, with_adr=True, grid_search=True)
>>> spectrum.plot_spectrum()
"""
with_adr = int(with_adr)
if spectrum.units != "ADU/s": # go back in ADU/s to remove previous lambda*dlambda normalisation
spectrum.convert_from_flam_to_ADUrate()
distance = spectrum.chromatic_psf.get_algebraic_distance_along_dispersion_axis()
spectrum.lambdas = spectrum.disperser.grating_pixel_to_lambda(distance, spectrum.x0,
D=parameters.DISTANCE2CCD, order=spectrum.order)
# ADR is x>0 westward and y>0 northward while CTIO images are x>0 westward and y>0 southward
# Must project ADR along dispersion axis
if with_adr > 0:
for k in range(niter):
adr_ra, adr_dec = adr_calib(spectrum.lambdas, spectrum.adr_params, parameters.OBS_LATITUDE,
lambda_ref=spectrum.lambda_ref)
adr_u, _ = flip_and_rotate_adr_to_image_xy_coordinates(adr_ra, adr_dec,
dispersion_axis_angle=spectrum.rotation_angle)
spectrum.lambdas = spectrum.disperser.grating_pixel_to_lambda(distance - adr_u, spectrum.x0,
D=parameters.DISTANCE2CCD, order=spectrum.order)
else:
adr_u = np.zeros_like(distance)
x0 = spectrum.x0
if x0 is None:
x0 = spectrum.target_pixcoords
spectrum.x0 = x0
# Detect emission/absorption lines and calibrate pixel/lambda
fwhm_func = interp1d(spectrum.chromatic_psf.table['lambdas'],
spectrum.chromatic_psf.table['fwhm'],
fill_value=(parameters.CALIB_PEAK_WIDTH, parameters.CALIB_PEAK_WIDTH), bounds_error=False)
def shift_minimizer(params, full_output=False):
D, shift = params
if np.isnan(D): # reset the value in case of bad gradient descent
D = parameters.DISTANCE2CCD
if np.isnan(shift): # reset the value in case of bad gradient descent
shift = 0
dist = spectrum.chromatic_psf.get_algebraic_distance_along_dispersion_axis(shift_x=shift)
spectrum.lambdas = spectrum.disperser.grating_pixel_to_lambda(dist - with_adr * adr_u,
x0=[x0[0] + shift, x0[1]],
D=D, order=spectrum.order)
spectrum.lambdas_binwidths = np.gradient(spectrum.lambdas)
spectrum.convert_from_ADUrate_to_flam()
chisq, res, fitworkspaces = detect_lines(spectrum.lines, spectrum.lambdas, spectrum.data,
spec_err=spectrum.err,
fwhm_func=fwhm_func, ax=None, calibration_lines_only=True)
chisq += (shift / parameters.PIXSHIFT_PRIOR) ** 2
spectrum.convert_from_flam_to_ADUrate()
if full_output:
return chisq, res, fitworkspaces
else:
return chisq
# grid exploration of the parameters
# necessary because of the line detection algo
D = parameters.DISTANCE2CCD
pixel_shift = 0
if 'D2CCD' in spectrum.header:
D = spectrum.header['D2CCD']
if 'PIXSHIFT' in spectrum.header:
pixel_shift = spectrum.header['PIXSHIFT']
D_err = parameters.DISTANCE2CCD_ERR
D_step = D_err / 2
pixel_shift_step = parameters.PIXSHIFT_PRIOR / 10.
pixel_shift_prior = parameters.PIXSHIFT_PRIOR
if grid_search:
Ds = np.arange(D - 5 * D_err, D + 6 * D_err, D_step)
pixel_shifts = np.arange(-pixel_shift_prior, pixel_shift_prior + pixel_shift_step, pixel_shift_step)
chisq_grid = np.zeros((len(Ds), len(pixel_shifts)))
for i, D in enumerate(Ds):
for j, pixel_shift in enumerate(pixel_shifts):
chisq_grid[i, j] = shift_minimizer([D, pixel_shift], full_output=False)
imin, jmin = np.unravel_index(chisq_grid.argmin(), chisq_grid.shape)
D = Ds[imin]
pixel_shift = pixel_shifts[jmin]
if imin == 0 or imin == Ds.size or jmin == 0 or jmin == pixel_shifts.size:
spectrum.my_logger.warning('\n\tMinimum chisq is on the edge of the exploration grid.')
if parameters.DEBUG:
fig = plt.figure(figsize=(7, 4))
im = plt.imshow(np.log10(chisq_grid), origin='lower', aspect='auto',
extent=(
np.min(pixel_shifts) - pixel_shift_step / 2,
np.max(pixel_shifts) + pixel_shift_step / 2,
np.min(Ds) - D_step / 2, np.max(Ds) + D_step / 2))
plt.gca().scatter(pixel_shift, D, marker='o', s=100, edgecolors='k', facecolors='none',
label='Minimum', linewidth=2)
c = plt.colorbar(im)
c.set_label('Log10(chisq)')
plt.xlabel(r'Pixel shift $\delta u_0$ [pix]')
plt.ylabel(r'$D_\mathrm{CCD}$ [mm]')
plt.legend()
fig.tight_layout()
if parameters.DISPLAY: # pragma: no cover
plt.show()
if parameters.LSST_SAVEFIGPATH: # pragma: no cover
fig.savefig(os.path.join(parameters.LSST_SAVEFIGPATH, 'D2CCD_x0_fit.pdf'))
# initialize starting point and spectrum.detected_lines attribute
start = np.array([D, pixel_shift])
chisq, res, fitworkspaces = shift_minimizer(start, full_output=True)
# now minimize around the global minimum found previously
# res = optimize.minimize(shift_minimizer, start, args=(), method='L-BFGS-B',
# options={'maxiter': 200, 'ftol': 1e-3},
# bounds=((D - 5 * parameters.DISTANCE2CCD_ERR, D + 5 * parameters.DISTANCE2CCD_ERR), (-2, 2)))
# print("minimize", start, res.x)
def res_minimizer(params):
D, shift = params
if np.isnan(D): # reset the value in case of bad gradient descent
D = parameters.DISTANCE2CCD
if np.isnan(shift): # reset the value in case of bad gradient descent
shift = 0
dist = spectrum.chromatic_psf.get_algebraic_distance_along_dispersion_axis(shift_x=shift)
spectrum.lambdas = spectrum.disperser.grating_pixel_to_lambda(dist - with_adr * adr_u,
x0=[x0[0] + shift, x0[1]],
D=D, order=spectrum.order)
spectrum.lambdas_binwidths = np.gradient(spectrum.lambdas)
spectrum.convert_from_ADUrate_to_flam()
for w in fitworkspaces:
w.reset_x(spectrum.lambdas[w.indices])
chisq, res = _fit_lines(fitworkspaces, ax=None)
res = np.concatenate([res, [(shift / parameters.PIXSHIFT_PRIOR) ** 2]])
#print(params, res)
spectrum.convert_from_flam_to_ADUrate()
#print(D, shift, chisq, res)
return res
# start = np.array([D, pixel_shift])
x, cov = optimize.leastsq(res_minimizer, start, epsfcn=1e-8)
# error = [parameters.DISTANCE2CCD_ERR, pixel_shift_step, 0.1] # 1*parameters.DISTANCE2CCD_ERR
# fix = [False, False, False]
# from iminuit import Minuit
# m = Minuit(res_minizer_minuit, start)
# m.errors = error
# m.errordef = 1
# m.fixed = fix
# m.print_level = 0
# m.limits = ((D - 5 * parameters.DISTANCE2CCD_ERR, D + 5 * parameters.DISTANCE2CCD_ERR), (-parameters.PIXSHIFT_PRIOR, parameters.PIXSHIFT_PRIOR), (0, 2))
# m.migrad()
# print("minuit", np.array(m.values[:]))
# D, pixel_shift = np.array(m.values[:])
# print(res.x)
# D, pixel_shift, e = m.values[:]
D, pixel_shift = x # res.x
# spectrum.disperser.D = D
x0 = [x0[0] + pixel_shift, x0[1]]
spectrum.x0 = x0
# check success, xO or D on the edge of their priors
distance = spectrum.chromatic_psf.get_algebraic_distance_along_dispersion_axis(shift_x=pixel_shift)
lambdas = spectrum.disperser.grating_pixel_to_lambda(distance - with_adr * adr_u, x0=x0,
D=D, order=spectrum.order)
spectrum.lambdas = lambdas
spectrum.lambdas_binwidths = np.gradient(lambdas)
spectrum.convert_from_ADUrate_to_flam()
spectrum.chromatic_psf.table['Dx'] -= pixel_shift
spectrum.chromatic_psf.table['Dy_disp_axis'] = distance * np.sin(spectrum.rotation_angle * np.pi / 180)
spectrum.pixels = np.copy(spectrum.chromatic_psf.table['Dx'])
chisq, res, detected_lines = detect_lines(spectrum.lines, spectrum.lambdas, spectrum.data,
spec_err=spectrum.err,
fwhm_func=fwhm_func, ax=None, calibration_lines_only=False)
# Convert back to flam units
# spectrum.convert_from_ADUrate_to_flam()
spectrum.my_logger.info(
f"\n\tOrder0 total shift: {pixel_shift:.3f}pix"
f"\n\tD = {D:.3f} mm (default: DISTANCE2CCD = {parameters.DISTANCE2CCD:.2f} "
f"+/- {parameters.DISTANCE2CCD_ERR:.2f} mm, "
f"{(D - parameters.DISTANCE2CCD) / parameters.DISTANCE2CCD_ERR:.1f} sigma shift)")
spectrum.header['PIXSHIFT'] = pixel_shift
spectrum.header['D2CCD'] = D
return lambdas
if __name__ == "__main__":
import doctest
doctest.testmod()