import copy
import numpy as np
from scipy.interpolate import interp1d
from spectractor.extractor.spectrum import Spectrum
from spectractor.extractor.targets import Target
from spectractor.extractor.psf import load_PSF
from spectractor.tools import fftconvolve_gaussian
from spectractor.config import set_logger
from spectractor.simulation.atmosphere import Atmosphere, AtmosphereGrid
import spectractor.parameters as parameters
[docs]
class SpectrumSimulation(Spectrum):
def __init__(self, spectrum, target=None, disperser=None, throughput=None, atmosphere=None, fast_sim=True, with_adr=True):
"""Class to simulate cross spectrum.
Parameters
----------
spectrum: Spectrum
Spectrum instance.
target: Target
Target instance.
throughput: TelescopeTransmission, optional
Telescope throughput (default: None).
disperser: Grating, optional
Disperser instance (default: None).
atmosphere: Atmosphere, optional
Atmosphere or AtmosphereGrid instance to make the atmospheric simulation (default: None).
fast_sim: bool, optional
If True, do a fast simulation without integrating within the wavelength bins (default: True).
with_adr: bool, optional
If True, use ADR model to build lambda array (default: False).
Examples
--------
>>> spectrum = Spectrum("./tests/data/reduc_20170530_134_spectrum.fits")
>>> parameters.SPECTRACTOR_ATMOSPHERE_SIM='libradtran'
>>> atmosphere = Atmosphere(airmass=1.2, pressure=800, temperature=10)
>>> sim = SpectrumSimulation(spectrum, atmosphere=atmosphere, fast_sim=True)
"""
Spectrum.__init__(self)
for k, v in list(spectrum.__dict__.items()):
self.__dict__[k] = copy.copy(v)
self.my_logger = set_logger(self.__class__.__name__)
# if parameters.CCD_REBIN > 1:
# self.chromatic_psf.table['Dx'] *= parameters.CCD_REBIN
# apply_rebinning_to_parameters(reverse=True)
if target is not None:
self.target = target
if disperser is not None:
self.disperser = disperser
if throughput is not None:
self.throughput = throughput
self.atmosphere = atmosphere
self.fast_sim = fast_sim
self.with_adr = with_adr
# save original pixel distances to zero order
# self.disperser.grating_lambda_to_pixel(self.lambdas, x0=self.x0, order=1)
# now reset data
self.lambdas = None
self.lambdas_order2 = None
self.err = None
self.model = lambda x: np.zeros_like(x)
self.model_err = lambda x: np.zeros_like(x)
lbdas_sed = self.target.wavelengths[0]
sub = np.where((lbdas_sed > parameters.LAMBDA_MIN) & (lbdas_sed < parameters.LAMBDA_MAX))
self.lambdas_step = min(parameters.LAMBDA_STEP, np.min(lbdas_sed[sub]))
[docs]
def simulate_without_atmosphere(self, lambdas):
"""Compute the spectrum of an object and its uncertainties
after its transmission throught the instrument except the atmosphere.
The units remain the ones of the Target instance.
Parameters
----------
lambdas: array_like
The wavelength array in nm
Returns
-------
spectrum: array_like
The spectrum in Target units.
spectrum_err: array_like
The spectrum uncertainties in Target units.
"""
self.lambdas = lambdas
self.lambdas_binwidths = np.gradient(lambdas)
self.data = self.disperser.transmission(lambdas)
self.data *= self.throughput.transmission(lambdas)
self.data *= self.target.sed(lambdas)
self.err = np.zeros_like(self.data)
idx = (self.throughput.transmission(lambdas) > 0) & (self.target.sed(lambdas) > 0)
self.err[idx] = np.sqrt((self.throughput.transmission_err(lambdas[idx]) / self.throughput.transmission(lambdas[idx]))**2 + (self.target.sed_err(lambdas[idx])/self.target.sed(lambdas[idx]))**2)
self.err[idx] *= np.abs(self.data[idx])
idx = (self.throughput.transmission(lambdas) <= 0) | (self.target.sed(lambdas) <= 0)
self.err[idx] = 10 * np.max(self.err)
return self.data, self.err
[docs]
def simulate(self, A1=1.0, A2=0., aerosols=0.05, angstrom_exponent=None, ozone=300, pwv=5, reso=0.,
D=parameters.DISTANCE2CCD, shift_x=0.):
"""Simulate the cross spectrum of an object and its uncertainties
after its transmission throught the instrument and the atmosphere.
Parameters
----------
A1: float
Global amplitude of the spectrum (default: 1).
A2: float
Relative amplitude of the order 2 spectrum contamination (default: 0).
aerosols: float
VAOD Vertical Aerosols Optical Depth
angstrom_exponent: float, optional
Angstrom exponent for aerosols.
If None, the Atmosphere.angstrom_exponent_default value is used (default: None).
ozone: float
Ozone quantity in Dobson
pwv: float
Precipitable Water Vapor quantity in mm
reso: float
Gaussian kernel size for the convolution (default: 0).
D: float
Distance between the CCD and the disperser in mm (default: parameters.DISTANCE2CCD)
shift_x: float
Shift in pixels of the order 0 position estimate (default: 0).
Returns
-------
lambdas: array_like
The wavelength array in nm used for the interpolation.
spectrum: array_like
The spectrum interpolated function in Target units.
spectrum_err: array_like
The spectrum uncertainties interpolated function in Target units.
Examples
--------
>>> spectrum = Spectrum("./tests/data/reduc_20170530_134_spectrum.fits")
>>> parameters.SPECTRACTOR_ATMOSPHERE_SIM='libradtran'
>>> atmosphere = AtmosphereGrid(atmgrid_filename="./tests/data/reduc_20170530_134_atmsim.fits")
>>> sim = SpectrumSimulation(spectrum, atmosphere=atmosphere, fast_sim=True)
>>> lambdas, model, model_err = sim.simulate(A1=1, A2=1, ozone=300, pwv=5, aerosols=0.05, reso=0.,
... D=parameters.DISTANCE2CCD, shift_x=0.)
>>> sim.plot_spectrum()
.. doctest::
:hide:
>>> assert np.sum(lambdas) > 0
>>> assert np.sum(model) > 0
>>> assert np.sum(model) < 1e-10
>>> assert np.sum(sim.data_next_order) >= 0
>>> assert np.sum(sim.data_next_order) < 1e-11
"""
# find lambdas including ADR effect
# must add ADR to get perfect result on atmospheric fit in full chain test with SpectrumSimulation()
lambdas = self.compute_lambdas_in_spectrogram(D, shift_x=shift_x, shift_y=0, angle=self.rotation_angle,
order=1, with_adr=self.with_adr, niter=5)
lambdas_order2 = self.compute_lambdas_in_spectrogram(D, shift_x=shift_x, shift_y=0, angle=self.rotation_angle,
order=2, with_adr=self.with_adr, niter=5)
self.lambdas = lambdas
if self.atmosphere is not None:
self.atmosphere.set_lambda_range(lambdas)
atmospheric_transmission = self.atmosphere.simulate(aerosols=aerosols, ozone=ozone, pwv=pwv, angstrom_exponent=angstrom_exponent)
else:
def atmospheric_transmission(lbda):
return 1
if self.fast_sim:
self.data, self.err = self.simulate_without_atmosphere(lambdas)
self.data *= A1 * atmospheric_transmission(lambdas)
self.err *= A1 * atmospheric_transmission(lambdas)
else:
def integrand(lbda):
return self.target.sed(lbda) * self.throughput.transmission(lbda) \
* self.disperser.transmission(lbda) * atmospheric_transmission(lbda)
self.data = np.zeros_like(lambdas)
self.err = np.zeros_like(lambdas)
for i in range(len(lambdas) - 1):
lbdas = np.arange(lambdas[i], lambdas[i + 1], self.lambdas_step)
self.data[i] = A1 * np.mean(integrand(lbdas))
# self.data[i] = A1 * quad(integrand, lambdas[i], lambdas[i + 1])[0]
self.data[-1] = self.data[-2]
# self.data /= np.gradient(lambdas)
telescope_transmission = self.throughput.transmission(lambdas)
idx = self.data > 0
self.err[idx] = self.data[idx] * self.throughput.transmission_err(lambdas)[idx] / telescope_transmission[idx]
idx = self.data <= 0
self.err[idx] = 10 * np.max(self.err)
# Now add the systematics
if reso > 0.1:
self.data = fftconvolve_gaussian(self.data, reso)
# self.err = np.sqrt(np.abs(fftconvolve_gaussian(self.err ** 2, reso)))
if A2 > 0:
lambdas_binwidths_order2 = np.gradient(lambdas_order2)
sim_conv = interp1d(lambdas, self.data * lambdas, kind="linear", bounds_error=False, fill_value=(0, 0))
err_conv = interp1d(lambdas, self.err * lambdas, kind="linear", bounds_error=False, fill_value=(0, 0))
spectrum_order2 = self.disperser.ratio_order_2over1(lambdas_order2) * sim_conv(lambdas_order2) \
* lambdas_binwidths_order2 / self.lambdas_binwidths
err_order2 = err_conv(lambdas_order2) * lambdas_binwidths_order2 / self.lambdas_binwidths
self.data = (sim_conv(lambdas) + A2 * spectrum_order2) / lambdas
self.data_order2 = A2 * spectrum_order2 / lambdas
self.err = (err_conv(lambdas) + A2 * err_order2) / lambdas
if np.any(self.err <= 0) and not np.all(self.err<=0):
min_positive = np.min(self.err[self.err > 0])
self.err[np.isclose(self.err, 0., atol=0.01 * min_positive)] = min_positive
# Save the truth parameters
self.header['OZONE_T'] = ozone
self.header['ANGEXP_T'] = angstrom_exponent
self.header['PWV_T'] = pwv
self.header['VAOD_T'] = aerosols
self.header['A1_T'] = A1
self.header['A2_T'] = A2
self.header['RESO_T'] = reso
self.header['D2CCD_T'] = D
self.header['X0_T'] = shift_x
return self.lambdas, self.data, self.err
[docs]
class SpectrogramModel(Spectrum):
def __init__(self, spectrum, target=None, disperser=None, throughput=None, atmosphere=None, diffraction_orders=None,
fast_sim=True, full_image=False, with_adr=True):
"""Class to simulate a spectrogram.
Parameters
----------
spectrum: Spectrum
Spectrum instance to load main properties before simulation.
target: Target
Target instance.
throughput: TelescopeTransmission, optional
Telescope throughput (default: None).
disperser: Grating, optional
Disperser instance (default: None).
atmosphere: Atmosphere, optional
Atmosphere or AtmosphereGrid instance to make the atmospheric simulation (default: None).
diffraction_orders: array_like, optional
List of diffraction orders to simulate. If None, takes first three (default: None).
fast_sim: bool, optional
If True, perform a fast simulation of the spectrum without integrated the spectrum
in pixel bins (default: True).
full_image: bool, optional
If True, simulate the spectrogram on the full CCD size,
otherwise only the cropped spectrogram (default: False).
with_adr: bool, optional
If True, simulate the spectrogram with ADR effect (default: True).
Examples
--------
>>> spectrum = Spectrum("./tests/data/reduc_20170530_134_spectrum.fits")
>>> parameters.SPECTRACTOR_ATMOSPHERE_SIM='libradtran'
>>> atmosphere = Atmosphere(airmass=1.2, pressure=800, temperature=10)
>>> sim = SpectrogramModel(spectrum, atmosphere=atmosphere, fast_sim=True)
"""
Spectrum.__init__(self)
if diffraction_orders is None:
self.diffraction_orders = np.arange(spectrum.order, spectrum.order + 3 * np.sign(spectrum.order), np.sign(spectrum.order))
else:
self.diffraction_orders = diffraction_orders
if self.diffraction_orders[0] != 1:
raise NotImplementedError("Spectrogram simulations are only implemented for 1st diffraction order and followings.")
if len(self.diffraction_orders) == 0:
raise ValueError(f"At least one diffraction order must be given for spectrogram simulation.")
for k, v in list(spectrum.__dict__.items()):
self.__dict__[k] = copy.copy(v)
if target is not None:
self.target = target
if disperser is not None:
self.disperser = disperser
if throughput is not None:
self.throughput = throughput
self.atmosphere = atmosphere
# load the disperser relative transmissions
self.tr_ratio = interp1d(parameters.LAMBDAS, np.ones_like(parameters.LAMBDAS), bounds_error=False, fill_value=1.)
if abs(self.order) == 1:
self.tr_ratio_next_order = self.disperser.ratio_order_2over1
self.tr_ratio_next_next_order = self.disperser.ratio_order_3over1
elif abs(self.order) == 2:
self.tr_ratio_next_order = self.disperser.ratio_order_3over2
self.tr_ratio_next_next_order = None
elif abs(self.order) == 3:
self.tr_ratio_next_order = None
self.tr_ratio_next_next_order = None
else:
raise ValueError(f"{abs(self.order)=}: must be 1, 2 or 3. "
f"Higher diffraction orders not implemented yet in full forward model.")
self.tr = [self.tr_ratio, self.tr_ratio_next_order, self.tr_ratio_next_next_order]
self.true_lambdas = None
self.true_spectrum = None
self.lambdas = None
self.model = lambda x, y: np.zeros((x.size, y.size))
self.psf = load_PSF(psf_type=parameters.PSF_TYPE)
self.fix_psf_cube = False
# PSF cube computation
self.psf_cubes = {}
self.psf_cubes_masked = {}
self.boundaries = {}
self.profile_params = {}
self.M_sparse_indices = {}
self.psf_cube_sparse_indices = {}
for order in self.diffraction_orders:
self.psf_cubes[order] = None
self.psf_cubes_masked[order] = None
self.boundaries[order] = {}
self.profile_params[order] = None
self.M_sparse_indices[order] = None
self.psf_cube_sparse_indices[order] = None
self.fix_psf_cube = False
self.fix_atm_sim = False
self.atmosphere_sim = None
self.fast_sim = fast_sim
self.full_image = full_image
self.with_adr = with_adr
if self.full_image:
self.Nx = parameters.CCD_IMSIZE
self.Ny = self.spectrogram_Ny # too long if =parameters.CCD_IMSIZE
self.r0 = self.x0[0] + 1j * self.spectrogram_y0
else:
self.Nx = self.spectrogram_Nx
self.Ny = self.spectrogram_Ny
self.r0 = self.spectrogram_x0 + 1j * self.spectrogram_y0
lbdas_sed = self.target.wavelengths[0]
sub = np.where((lbdas_sed > parameters.LAMBDA_MIN) & (lbdas_sed < parameters.LAMBDA_MAX))
self.lambdas_step = min(parameters.LAMBDA_STEP, np.min(lbdas_sed[sub]))
self.yy, self.xx = np.mgrid[:self.Ny, :self.Nx]
self.pixels = np.asarray([self.xx, self.yy])
[docs]
def set_true_spectrum(self, lambdas, aerosols, ozone, pwv, shift_t=0.):
atmosphere = self.atmosphere.simulate(aerosols=aerosols, ozone=ozone, pwv=pwv)
spectrum = self.target.sed(lambdas)
spectrum *= self.disperser.transmission(lambdas - shift_t)
spectrum *= self.throughput.transmission(lambdas - shift_t)
spectrum *= atmosphere(lambdas)
self.true_spectrum = spectrum
self.true_lambdas = lambdas
return spectrum
[docs]
def simulate_spectrum(self, lambdas, atmosphere, shift_t=0.):
"""
Simulate the spectrum of the object and return the result in Target units.
Parameters
----------
lambdas: array_like
The wavelength array in nm.
atmosphere: callable
A callable function of the atmospheric transmission.
shift_t: float
Shift of the transmission in nm (default: 0).
Returns
-------
spectrum: array_like
The spectrum array in ADU/s units.
spectrum_err: array_like
The spectrum uncertainty array in ADU/s units.
"""
spectrum = np.zeros_like(lambdas)
telescope_transmission = self.throughput.transmission(lambdas - shift_t)
if self.fast_sim:
spectrum = self.target.sed(lambdas)
spectrum *= self.disperser.transmission(lambdas - shift_t)
spectrum *= telescope_transmission
spectrum *= atmosphere(lambdas)
spectrum *= parameters.FLAM_TO_ADURATE * lambdas * np.gradient(lambdas)
else:
def integrand(lbda):
return lbda * self.target.sed(lbda) * self.throughput.transmission(lbda - shift_t) \
* self.disperser.transmission(lbda - shift_t) * atmosphere(lbda)
for i in range(len(lambdas) - 1):
# spectrum[i] = parameters.FLAM_TO_ADURATE * quad(integrand, lambdas[i], lambdas[i + 1])[0]
lbdas = np.arange(lambdas[i], lambdas[i + 1], self.lambdas_step)
spectrum[i] = parameters.FLAM_TO_ADURATE * np.mean(integrand(lbdas)) * (lambdas[i + 1] - lambdas[i])
spectrum[-1] = spectrum[-2]
spectrum_err = np.zeros_like(spectrum)
idx = telescope_transmission > 0
spectrum_err[idx] = self.throughput.transmission_err(lambdas)[idx] / telescope_transmission[idx] * spectrum[idx]
idx = (telescope_transmission > 0) & (self.target.sed(lambdas) > 0)
spectrum_err[idx] = np.sqrt((self.throughput.transmission_err(lambdas[idx]) / telescope_transmission[idx])**2 + (self.target.sed_err(lambdas[idx])/self.target.sed(lambdas[idx]))**2)
spectrum_err[idx] *= np.abs(spectrum[idx])
idx = (telescope_transmission <= 0) | (self.target.sed(lambdas) <= 0) | (spectrum_err <= 0)
spectrum_err[idx] = 10 * np.max(spectrum_err)
return spectrum, spectrum_err
[docs]
def simulate(self, A1=1.0, A2=0., A3=0., aerosols=0.05, angstrom_exponent=None, ozone=300, pwv=5,
D=parameters.DISTANCE2CCD, shift_x=0., shift_y=0., angle=0., psf_poly_params=None):
"""
Parameters
----------
A1: float
Global amplitude of the spectrum (default: 1).
A2: float
Relative amplitude of the order 2 spectrum contamination (default: 0).
A3: float
Relative amplitude of the order 3 spectrum contamination (default: 0).
aerosols: float
VAOD Vertical Aerosols Optical Depth.
angstrom_exponent: float, optional
Angstrom exponent for aerosols.
If None, the Atmosphere.angstrom_exponent_default value is used (default: None).
ozone: float
Ozone quantity in Dobson.
pwv: float
Precipitable Water Vapor quantity in mm.
D: float
Distance between the CCD and the disperser in mm (default: parameters.DISTANCE2CCD)
shift_x: float
Shift in pixels along x axis of the order 0 position estimate (default: 0).
shift_y: float
Shift in pixels along y axis of the order 0 position estimate (default: 0).
angle: float
Angle of the dispersion axis in degree (default: 0).
psf_poly_params: array_like
Polynomial parameters describing the PSF dependence in wavelength (default: None).
Returns
-------
lambdas: array_like
The wavelength array in nm used for the interpolation.
spectrogram: array_like
The spectrogram array in ADU/s units.
spectrogram_err: array_like
The spectrogram uncertainty array in ADU/s units.
Example
-------
>>> spec = Spectrum("./tests/data/reduc_20170530_134_spectrum.fits")
>>> spec.disperser.ratio_ratio_order_3over2 = lambda lbda: 0.1
>>> psf_poly_params = list(spec.chromatic_psf.from_table_to_poly_params()) * 3
>>> parameters.SPECTRACTOR_ATMOSPHERE_SIM='libradtran'
>>> atmosphere = Atmosphere(airmass=1.2, pressure=800, temperature=10)
>>> sim = SpectrogramModel(spec, atmosphere=atmosphere, fast_sim=True)
>>> lambdas, model, model_err = sim.simulate(A2=1, angle=-1.5, psf_poly_params=psf_poly_params)
>>> sim.plot_spectrogram()
.. doctest::
:hide:
>>> assert np.sum(lambdas) > 0
>>> assert np.sum(model) > 20
"""
poly_params = np.array(psf_poly_params).reshape((len(self.diffraction_orders), -1))
self.rotation_angle = angle
self.lambdas = self.compute_lambdas_in_spectrogram(D, shift_x, shift_y, angle, niter=5, with_adr=True,
order=self.diffraction_orders[0])
self.lambdas_binwidths = np.gradient(self.lambdas)
if self.atmosphere_sim is None or not self.fix_atm_sim:
self.atmosphere.set_lambda_range(self.lambdas)
self.atmosphere_sim = self.atmosphere.simulate(aerosols=aerosols, ozone=ozone, pwv=pwv,
angstrom_exponent=angstrom_exponent)
spectrum, spectrum_err = self.simulate_spectrum(self.lambdas, self.atmosphere_sim)
As = [1, A2, A3]
ima = np.zeros((self.Ny, self.Nx), dtype="float32")
ima_err2 = np.zeros((self.Ny, self.Nx), dtype="float32")
for k, order in enumerate(self.diffraction_orders):
if self.tr[k] is None or As[k] == 0: # diffraction order undefined
continue
# Dispersion law
dispersion_law = self.compute_dispersion_in_spectrogram(self.lambdas, D, shift_x, shift_y, angle,
with_adr=True, order=order)
# Spectrum amplitude is in ADU/s
spec = As[k] * self.tr[k](self.lambdas) * spectrum
spec_err = As[k] * self.tr[k](self.lambdas) * spectrum_err
if np.any(np.isnan(spec)):
spec[np.isnan(spec)] = 0.
# Evaluate PSF profile
if self.profile_params[order] is None or not self.fix_psf_cube:
if k==0:
self.profile_params[order] = self.chromatic_psf.update(poly_params[k], x0=self.r0.real + shift_x,
y0=self.r0.imag + shift_y, angle=angle, plot=False, apply_bounds=True)
else:
self.profile_params[order] = self.chromatic_psf.from_poly_params_to_profile_params(poly_params[k], apply_bounds=True)
self.profile_params[order][:, 0] = 1
self.profile_params[order][:, 1] = dispersion_law.real + self.r0.real
self.profile_params[order][:, 2] += dispersion_law.imag
if k == 0:
self.chromatic_psf.table["Dx"] = self.profile_params[order][:, 1] - self.r0.real
self.chromatic_psf.table["Dy"] = self.profile_params[order][:, 2] - self.r0.imag
# Fill the PSF cube for each diffraction order
argmin = max(0, int(np.argmin(np.abs(self.profile_params[order][:, 1]))))
argmax = min(self.Nx, np.argmin(np.abs(self.profile_params[order][:, 1]-self.Nx)) + 1)
if self.psf_cubes[order] is None or not self.fix_psf_cube:
self.psf_cubes[order] = self.chromatic_psf.build_psf_cube(self.pixels, self.profile_params[order],
fwhmx_clip=3 * parameters.PSF_FWHM_CLIP,
fwhmy_clip=parameters.PSF_FWHM_CLIP,
dtype="float32",
mask=self.psf_cubes_masked[order],
boundaries=self.boundaries[order])
# Assemble all diffraction orders in simulation
for x in range(argmin, argmax):
if self.boundaries[order]:
xmin = self.boundaries[order]["xmin"][x]
xmax = self.boundaries[order]["xmax"][x]
ymin = self.boundaries[order]["ymin"][x]
ymax = self.boundaries[order]["ymax"][x]
else:
xmin, xmax = 0, self.Nx
ymin, ymax = 0, self.Ny
ima[ymin:ymax, xmin:xmax] += spec[x] * self.psf_cubes[order][x, ymin:ymax, xmin:xmax]
ima_err2[ymin:ymax, xmin:xmax] += (spec_err[x] * self.psf_cubes[order][x, ymin:ymax, xmin:xmax])**2
if np.allclose(self.profile_params[order][:, 0] , 1):
self.profile_params[order][:, 0] = spec
# self.spectrogram is in ADU/s units here
self.spectrogram_data = A1 * ima
self.spectrogram_err = A1 * np.sqrt(ima_err2)
# Save the simulation parameters
self.psf_poly_params = np.copy(poly_params[0])
self.header['OZONE_T'] = ozone
self.header['PWV_T'] = pwv
self.header['VAOD_T'] = aerosols
self.header['A1_T'] = A1
self.header['A2_T'] = A2
self.header['A3_T'] = A3
self.header['D2CCD_T'] = D
self.header['X0_T'] = shift_x
self.header['Y0_T'] = shift_y
self.header['ROTANGLE'] = angle
return self.lambdas, self.spectrogram_data, self.spectrogram_err
if __name__ == "__main__":
import doctest
doctest.testmod()