import numpy as np
import os
import sys
import matplotlib.pyplot as plt
import matplotlib as mpl
from tqdm import tqdm
import copy
from scipy.interpolate import interp1d
from scipy.signal import convolve2d
from scipy import sparse
from astropy.table import Table
from spectractor.tools import (rescale_x_to_legendre, plot_image_simple, compute_fwhm, cholesky_solve)
from spectractor.extractor.background import extract_spectrogram_background_sextractor
from spectractor.extractor.psf import PSF, PSFFitWorkspace, MoffatGauss, Moffat
from spectractor import parameters
from spectractor.config import set_logger
from spectractor.fit.fitter import (FitParameters, FitWorkspace, run_minimisation, run_minimisation_sigma_clipping,
RegFitWorkspace)
try:
import sparse_dot_mkl
except ModuleNotFoundError:
sparse_dot_mkl = None
[docs]
class ChromaticPSF:
"""Class to store a PSF evolving with wavelength.
The wavelength evolution is stored in an Astropy table instance. Whatever the PSF model, the common keywords are:
* lambdas: the wavelength [nm]
* Dx: the distance along X axis to order 0 position of the PSF model centroid [pixels]
* Dy: the distance along Y axis to order 0 position of the PSF model centroid [pixels]
* Dy_disp_axis: the distance along Y axis to order 0 position of the mean dispersion axis [pixels]
* flux_sum: the transverse sum of the data flux [spectrogram units]
* flux_integral: the integral of the best fitting PSF model to the data (should be equal to the amplitude parameter
of the PSF model if the model is correclty normalized to one) [spectrogram units]
* flux_err: the uncertainty on flux_sum [spectrogram units]
* fwhm: the FWHM of the best fitting PSF model [pixels]
* Dy_fwhm_sup: the distance along Y axis to order 0 position of the upper FWHM edge [pixels]
* Dy_fwhm_inf: the distance along Y axis to order 0 position of the lower FWHM edge [pixels]
Then all the specific parameter of the PSF model are stored in other columns with their wavelength evolution
(read from PSF.param_names attribute).
A saturation level should be specified in data units.
"""
def __init__(self, psf, Nx, Ny, x0=0, y0=None, deg=4, saturation=None, file_name=''):
"""Initialize a ChromaticPSF instance.
Parameters
----------
psf: PSF
The PSF class model to build the PSF.
Nx: int
The number of pixels along the dispersion axis.
Ny: int
The number of pixels transverse to the dispersion axis.
x0: float
Relative position to pixel (0, 0) of the order 0 position along x (default: 0).
y0: float
Relative position to pixel (0, 0) of the order 0 position along y.
If None, y0 is set at Ny/2 (default: None).
deg: int, optional
The degree of the polynomial functions to model the chromatic evolution
of the PSF shape parameters (default: 4).
saturation: float, optional
The image saturation level (default: None).
file_name: str, optional
A path to a CSV file containing a ChromaticPSF table and load it (default: "").
"""
self.my_logger = set_logger(self.__class__.__name__)
self.psf = psf
self.deg = -1
self.degrees = {}
self.set_polynomial_degrees(deg)
self.Nx = Nx
self.Ny = Ny
self.x0 = x0
if y0 is None:
y0 = Ny / 2
self.y0 = y0
self.profile_params = np.zeros((Nx, len(self.psf.params.labels)))
yy, xx = np.mgrid[:self.Ny, :self.Nx]
self.pixels = np.asarray([xx, yy])
self.saturation = 1e20
self.psf_param_start_index = 0
self.n_poly_params = 0
self.fitted_pixels = 0
if file_name == '':
arr = np.zeros((self.Nx, len(self.psf.params.labels) + 10))
self.table = Table(arr, names=['lambdas', 'Dx', 'Dy', 'Dy_disp_axis', 'flux_sum', 'flux_integral', 'flux_err',
'fwhm', 'Dy_fwhm_sup', 'Dy_fwhm_inf'] + list(self.psf.params.labels))
else:
self.table = Table.read(file_name)
self.params = None
self.init_from_table(self.table, saturation=saturation)
self.opt_reg = parameters.PSF_FIT_REG_PARAM
self.cov_matrix = np.zeros((Nx, Nx))
if file_name != "":
self.params.values = self.from_table_to_poly_params()
[docs]
def init_from_table(self, table, saturation=None):
self.table = table
self.n_poly_params = len(self.table)
self.fitted_pixels = np.arange(len(self.table)).astype(int)
self.saturation = saturation
if saturation is None:
self.saturation = 1e20
self.my_logger.warning(f"\n\tSaturation level should be given to instanciate the ChromaticPSF "
f"object. self.saturation is set arbitrarily to 1e20. Good luck.")
for name in self.psf.params.labels:
if "amplitude" in name:
continue
self.n_poly_params += self.degrees[name] + 1
labels = [] # [f"a{k}" for k in range(self.poly_params.size)]
axis_names = [] # "$a_{" + str(k) + "}$" for k in range(self.poly_params.size)]
for ip, p in enumerate(self.psf.params.labels):
if ip == 0:
labels += [f"{p}_{k}" for k in range(len(self.table))]
axis_names += ["$" + self.psf.params.axis_names[ip].replace("$", "") + "_{" + str(k) + "}$" for k in range(len(self.table))]
else:
for k in range(self.degrees[p] + 1):
labels.append(f"{p}_{k}")
axis_names.append("$" + self.psf.params.axis_names[ip].replace("$", "") + "^{(" + str(k) + ")}$")
self.params = FitParameters(values=np.zeros(self.n_poly_params), labels=labels, axis_names=axis_names)
[docs]
def resize_table(self, new_Nx):
"""Resize the table and interpolate existing values to a new length size.
Parameters
----------
new_Nx: int
New length of the ChromaticPSF on X axis.
Examples
--------
>>> psf = Moffat()
>>> s = ChromaticPSF(psf, Nx=20, Ny=5, deg=1, saturation=20000)
>>> params = s.generate_test_poly_params()
>>> s.fill_table_with_profile_params(s.from_poly_params_to_profile_params(params))
>>> print(np.sum(s.table["gamma"]))
100.0
>>> print(s.table["gamma"].size)
20
>>> s.resize_table(10)
>>> print(np.sum(s.table["gamma"]))
50.0
>>> print(s.table["gamma"].size)
10
"""
new_chromatic_psf = ChromaticPSF(psf=self.psf, Nx=new_Nx, Ny=self.Ny, x0=self.x0, y0=self.y0,
deg=self.deg, saturation=self.saturation)
old_x = np.linspace(0, 1, self.Nx)
new_x = np.linspace(0, 1, new_Nx)
for colname in self.table.colnames:
new_chromatic_psf.table[colname] = np.interp(new_x, old_x, np.array(self.table[colname]))
self.init_from_table(table=new_chromatic_psf.table, saturation=self.saturation)
self.__dict__.update(new_chromatic_psf.__dict__)
[docs]
def crop_table(self, new_Nx):
"""Crop the table to a new length size.
Parameters
----------
new_Nx: int
New length of the ChromaticPSF on X axis.
Examples
--------
>>> psf = Moffat()
>>> s = ChromaticPSF(psf, Nx=20, Ny=5, deg=1, saturation=20000)
>>> params = s.generate_test_poly_params()
>>> s.fill_table_with_profile_params(s.from_poly_params_to_profile_params(params))
>>> print(np.sum(s.table["gamma"]))
100.0
>>> print(s.table["gamma"].size)
20
>>> s.crop_table(10)
>>> print(np.sum(s.table["gamma"]))
50.0
>>> print(s.table["gamma"].size)
10
"""
new_chromatic_psf = ChromaticPSF(psf=self.psf, Nx=new_Nx, Ny=self.Ny, x0=self.x0, y0=self.y0,
deg=self.deg, saturation=self.saturation)
for colname in self.table.colnames:
new_chromatic_psf.table[colname] = self.table[colname][:new_Nx]
self.init_from_table(table=new_chromatic_psf.table, saturation=self.saturation)
self.__dict__.update(new_chromatic_psf.__dict__)
[docs]
def set_polynomial_degrees(self, deg):
self.deg = deg
self.degrees = {key: deg for key in self.psf.params.labels}
self.degrees["x_c"] = max(self.degrees["x_c"], 1)
self.degrees['saturation'] = 0
[docs]
def generate_test_poly_params(self):
"""
A set of parameters to define a test spectrogram. PSF function must be MoffatGauss
for this test example.
Returns
-------
profile_params: array
The list of the test parameters
Examples
--------
>>> psf = MoffatGauss()
>>> s = ChromaticPSF(psf, Nx=5, Ny=4, deg=1, saturation=20000)
>>> params = s.generate_test_poly_params()
.. doctest::
:hide:
>>> assert(np.all(np.isclose(params,[10, 50, 100, 150, 200, 0, 0, 0, 0, 5, 0, 2, 0, -0.4, 0, 1, 0, 20000])))
"""
if not isinstance(self.psf, MoffatGauss) and not isinstance(self.psf, Moffat):
raise TypeError(f"In this test function, PSF model must be MoffatGauss or Moffat. Gave {type(self.psf)}.")
params = [50 * i for i in range(self.Nx)]
params[0] = 10
# add absorption lines
if self.Nx > 80:
params = list(np.array(params)
- 3000 * np.exp(-((np.arange(self.Nx) - 70) / 2) ** 2)
- 2000 * np.exp(-((np.arange(self.Nx) - 50) / 2) ** 2))
params += [0.] * (self.degrees['x_c'] - 1) + [0, 0] # x mean
params += [0.] * (self.degrees['y_c'] - 1) + [0, 0] # y mean
params += [0.] * (self.degrees['gamma'] - 1) + [0, 5] # gamma
params += [0.] * (self.degrees['alpha'] - 1) + [0, 2] # alpha
if isinstance(self.psf, MoffatGauss):
params += [0.] * (self.degrees['eta_gauss'] - 1) + [0, -0.4] # eta_gauss
params += [0.] * (self.degrees['stddev'] - 1) + [0, 1] # stddev
params += [self.saturation] # saturation
poly_params = np.zeros_like(params)
poly_params[:self.Nx] = params[:self.Nx]
index = self.Nx - 1
for name in self.psf.params.labels:
if name == 'amplitude':
continue
else:
shift = self.degrees[name] + 1
if parameters.PSF_POLY_TYPE == "legendre":
c = np.polynomial.legendre.poly2leg(params[index + shift:index:-1])
elif parameters.PSF_POLY_TYPE == "polynomial":
c = np.asarray(params[index + shift:index:-1])
else:
raise ValueError(f"Unknown polynomial type {parameters.PSF_POLY_TYPE=}.")
coeffs = np.zeros(shift)
coeffs[:c.size] = c
poly_params[index + 1:index + shift + 1] = coeffs
index = index + shift
return poly_params
[docs]
def set_pixels(self, mode):
"""Return the pixels array to evaluate ChromaticPSF.
If mode='1D', one 1D array of pixels along y axis is returned.
If mode='2D', two 2D meshgrid arrays of pixels are returned.
Parameters
----------
mode, str
Must be '1D' or '2D'.
Returns
-------
pixels: array_like
The pixel array.
Examples
--------
>>> psf = MoffatGauss()
>>> s = ChromaticPSF(psf, Nx=5, Ny=4, deg=1, saturation=20000)
>>> pixels = s.set_pixels(mode='1D')
>>> pixels.shape
(4,)
>>> pixels = s.set_pixels(mode='2D')
>>> pixels.shape
(2, 4, 5)
"""
if mode == "2D":
yy, xx = np.mgrid[:self.Ny, :self.Nx]
pixels = np.asarray([xx, yy])
elif mode == "1D":
pixels = np.arange(self.Ny)
else:
raise ValueError(f"Unknown evaluation mode={mode}. Must be '1D' or '2D'.")
return pixels
[docs]
def evaluate(self, pixels, poly_params, fwhmx_clip=parameters.PSF_FWHM_CLIP,
fwhmy_clip=parameters.PSF_FWHM_CLIP, dtype="float64", mask=None, boundaries=None):
"""Simulate a 2D spectrogram of size Nx times Ny.
Given a set of polynomial parameters defining the chromatic PSF model, a 2D spectrogram
is produced either summing transverse 1D PSF profiles along the dispersion axis, or
full 2D PSF profiles.
Parameters
----------
pixels: array_like
The pixel array. If `pixels.ndim==1`, ChromaticPSF is evaluated using 1D PSF slices. Otherwise, `pixels`
must have a shape like (2, Nx, Ny).
poly_params: array_like
Parameter array of the model, in the form:
- Nx first parameters are amplitudes for the Moffat transverse profiles
- next parameters are polynomial coefficients for all the PSF parameters in the same order
as in PSF definition, except amplitude.
fwhmx_clip: int, optional
Clip PSF evaluation outside fwhmx*FWHM along x axis (default: parameters.PSF_FWHM_CLIP).
fwhmy_clip: int, optional
Clip PSF evaluation outside fwhmy*FWHM along y axis (default: parameters.PSF_FWHM_CLIP).
dtype: str, optional
Type of the output array (default: 'float64').
mask: array_like, optional
Cube of booleans where values are masked (default: None).
boundaries: dict, optional
Dictionary of boundaries for fast evaluation with keys ymin, ymax, xmin, xmax (default: None).
Returns
-------
output: array
A 2D array with the model.
Examples
--------
>>> psf = MoffatGauss()
>>> s = ChromaticPSF(psf, Nx=100, Ny=20, deg=4, saturation=20000)
>>> poly_params = s.generate_test_poly_params()
1D evaluation:
.. doctest::
>>> output = s.evaluate(s.set_pixels(mode="1D"), poly_params)
>>> im = plt.imshow(output, origin='lower') # doctest: +ELLIPSIS
>>> plt.colorbar(im) # doctest: +ELLIPSIS
<matplotlib.colorbar.Colorbar object at 0x...>
>>> plt.show()
.. plot ::
from spectractor.extractor.chromaticpsf import ChromaticPSF
from spectractor.extractor.psf import MoffatGauss
psf = MoffatGauss()
s = ChromaticPSF(psf, Nx=100, Ny=20, deg=4, saturation=20000)
poly_params = s.generate_test_poly_params()
output = s.evaluate(s.set_pixels(mode="1D"), poly_params)
im = plt.imshow(output, origin='lower')
plt.colorbar(im)
plt.show()
2D evaluation:
.. doctest::
>>> output = s.evaluate(s.set_pixels(mode="2D"), poly_params)
>>> im = plt.imshow(output, origin='lower') # doctest: +ELLIPSIS
>>> plt.colorbar(im) # doctest: +ELLIPSIS
<matplotlib.colorbar.Colorbar object at 0x...>
>>> plt.show()
.. plot ::
from spectractor.extractor.chromaticpsf import ChromaticPSF
from spectractor.extractor.psf import MoffatGauss
psf = MoffatGauss()
s = ChromaticPSF(psf, Nx=100, Ny=20, deg=4, saturation=20000)
poly_params = s.generate_test_poly_params()
output = s.evaluate(s.set_pixels(mode="2D"), poly_params)
im = plt.imshow(output, origin='lower')
plt.colorbar(im)
plt.show()
"""
profile_params = self.from_poly_params_to_profile_params(poly_params, apply_bounds=True)
if pixels.ndim == 3:
mode = "2D"
Ny, Nx = pixels[0].shape
elif pixels.ndim == 1:
mode = "1D"
Ny, Nx = pixels.size, profile_params.shape[0]
else:
raise ValueError(f"pixels argument must have shape (2, Nx, Ny) or (Ny). Got {pixels.shape=}.")
self.params.values = np.asarray(poly_params).astype(float)
self.psf.apply_max_width_to_bounds(max_half_width=Ny)
profile_params[:, 1] = np.arange(Nx) # replace x_c
output = np.zeros((Ny, Nx), dtype=dtype)
if mode == "1D":
for x in range(Nx):
output[:, x] = self.psf.evaluate(pixels, values=profile_params[x, :])
elif mode == "2D":
self.from_profile_params_to_shape_params(profile_params)
psf_cube = self.build_psf_cube(pixels, profile_params, fwhmx_clip=fwhmx_clip, fwhmy_clip=fwhmy_clip,
dtype=dtype, mask=mask, boundaries=boundaries)
output = np.sum(psf_cube, axis=0)
return np.clip(output, 0, self.saturation)
[docs]
def build_psf_cube(self, pixels, profile_params, fwhmx_clip=parameters.PSF_FWHM_CLIP,
fwhmy_clip=parameters.PSF_FWHM_CLIP, dtype="float64", mask=None, boundaries=None):
"""Build a cube, with one slice per wavelength evaluation which contains the PSF evaluation.
Parameters
----------
pixels: np.ndarray
Array of pixels to evaluate ChromaticPSF.
profile_params: array_like
ChromaticPSF profile parameters.
fwhmx_clip: int, optional
Clip PSF evaluation outside fwhmx*FWHM along x axis (default: parameters.PSF_FWHM_CLIP).
fwhmy_clip: int, optional
Clip PSF evaluation outside fwhmy*FWHM along y axis (default: parameters.PSF_FWHM_CLIP).
dtype: str, optional
Type of the output array (default: 'float64').
mask: array_like, optional
Cube of booleans where values are masked (default: None).
boundaries: dict, optional
Dictionary of boundaries for fast evaluation with keys ymin, ymax, xmin, xmax (default: None).
Returns
-------
psf_cube: np.ndarray
Cube of chromatic PSF evaluations, each slice being a PSF for a given wavelength.
Examples
--------
>>> s = ChromaticPSF(Moffat(), Nx=100, Ny=20, deg=2, saturation=20000)
>>> profile_params = s.from_poly_params_to_profile_params(s.generate_test_poly_params(), apply_bounds=True)
>>> profile_params[:, 1] = np.arange(s.Nx)
>>> psf_cube = s.build_psf_cube(s.set_pixels(mode="2D"), profile_params)
>>> psf_cube.shape
(100, 20, 100)
>>> plt.imshow(psf_cube[20], origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
>>> plt.imshow(psf_cube[80], origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
.. doctest::
:hide:
>>> assert psf_cube.shape == (s.Nx, s.Ny, s.Nx)
>>> np.argmax(np.sum(psf_cube[20], axis=0))
20
>>> np.argmax(np.sum(psf_cube[80], axis=0))
80
"""
if pixels.ndim == 3:
mode = "2D"
Ny, Nx = pixels[0].shape
elif pixels.ndim == 1:
mode = "1D"
Ny, Nx = pixels.size, profile_params.shape[0]
else:
raise ValueError(f"pixels argument must have shape (2, Nx, Ny) or (Ny). Got {pixels.shape=}.")
psf_cube = np.zeros((len(profile_params), Ny, Nx), dtype=dtype)
fwhms = self.table["fwhm"]
for x in range(len(profile_params)):
xc, yc = profile_params[x, 1:3]
if xc < - fwhmx_clip * fwhms[x]:
continue
if xc > Nx + fwhmx_clip * fwhms[x]:
break
if mask is None and not boundaries:
xmin = max(0, int(xc - max(1*parameters.PIXWIDTH_SIGNAL, fwhmx_clip * fwhms[x])))
xmax = min(Nx, int(xc + max(1*parameters.PIXWIDTH_SIGNAL, fwhmx_clip * fwhms[x])))
ymin = max(0, int(yc - max(1*parameters.PIXWIDTH_SIGNAL, fwhmy_clip * fwhms[x])))
ymax = min(Ny, int(yc + max(1*parameters.PIXWIDTH_SIGNAL, fwhmy_clip * fwhms[x])))
elif boundaries:
xmin = boundaries["xmin"][x]
xmax = boundaries["xmax"][x]
ymin = boundaries["ymin"][x]
ymax = boundaries["ymax"][x]
else:
maskx = np.any(mask[x], axis=0)
masky = np.any(mask[x], axis=1)
xmin = np.argmax(maskx)
ymin = np.argmax(masky)
xmax = len(maskx) - np.argmax(maskx[::-1])
ymax = len(masky) - np.argmax(masky[::-1])
if mode == "2D":
psf_cube[x, ymin:ymax, xmin:xmax] = self.psf.evaluate(pixels[:, ymin:ymax, xmin:xmax], values=profile_params[x, :])
else:
psf_cube[x, ymin:ymax, x] = self.psf.evaluate(pixels[ymin:ymax], values=profile_params[x, :])
return psf_cube
[docs]
def build_psf_cube_masked(self, pixels, profile_params, fwhmx_clip=parameters.PSF_FWHM_CLIP,
fwhmy_clip=parameters.PSF_FWHM_CLIP):
"""Build a boolean cube, with one slice per wavelength evaluation which contains booleans where PSF evaluation is non zero.
Parameters
----------
pixels: np.ndarray
Array of pixels to evaluate ChromaticPSF.
profile_params: array_like
ChromaticPSF profile parameters.
fwhmx_clip: int, optional
Clip PSF evaluation outside fwhmx*FWHM along x axis (default: parameters.PSF_FWHM_CLIP).
fwhmy_clip: int, optional
Clip PSF evaluation outside fwhmy*FWHM along y axis (default: parameters.PSF_FWHM_CLIP).
Returns
-------
psf_cube_masked: np.ndarray
Cube of chromatic masked PSF evaluations, each slice being a PSF for a given wavelength.
Examples
--------
>>> s = ChromaticPSF(Moffat(), Nx=100, Ny=20, deg=2, saturation=20000)
>>> profile_params = s.from_poly_params_to_profile_params(s.generate_test_poly_params(), apply_bounds=True)
>>> profile_params[:, 1] = np.arange(s.Nx)
>>> psf_cube_masked = s.build_psf_cube_masked(s.set_pixels(mode="2D"), profile_params)
>>> psf_cube_masked.shape
(100, 20, 100)
>>> plt.imshow(psf_cube_masked[20], origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
>>> plt.imshow(psf_cube_masked[80], origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
.. doctest::
:hide:
>>> assert psf_cube_masked.shape == (s.Nx, s.Ny, s.Nx)
>>> np.argmax(np.sum(psf_cube_masked[20], axis=0))
10
>>> np.argmax(np.sum(psf_cube_masked[80], axis=0))
70
"""
if pixels.ndim == 3:
mode = "2D"
Ny, Nx = pixels[0].shape
elif pixels.ndim == 1:
mode = "1D"
Ny, Nx = pixels.size, profile_params.shape[0]
else:
raise ValueError(f"pixels argument must have shape (2, Nx, Ny) or (Ny). Got {pixels.shape=}.")
psf_cube_masked = np.zeros((len(profile_params), Ny, Nx), dtype=bool)
fwhms = self.table["fwhm"]
for x in range(len(profile_params)):
xc, yc = profile_params[x, 1:3]
if xc < - fwhmx_clip * fwhms[x]:
continue
if xc > Nx + fwhmx_clip * fwhms[x]:
break
xmin = max(0, int(xc - max(1*parameters.PIXWIDTH_SIGNAL, fwhmx_clip * fwhms[x])))
xmax = min(Nx, int(xc + max(1*parameters.PIXWIDTH_SIGNAL, fwhmx_clip * fwhms[x])))
ymin = max(0, int(yc - max(1*parameters.PIXWIDTH_SIGNAL, fwhmy_clip * fwhms[x])))
ymax = min(Ny, int(yc + max(1*parameters.PIXWIDTH_SIGNAL, fwhmy_clip * fwhms[x])))
if mode == "2D":
psf_cube_masked[x, ymin:ymax, xmin:xmax] = self.psf.evaluate(pixels[:, ymin:ymax, xmin:xmax], values=profile_params[x, :]) > 0
else:
psf_cube_masked[x, ymin:ymax, x] = self.psf.evaluate(pixels[ymin:ymax], values=profile_params[x, :]) > 0
return psf_cube_masked
[docs]
@staticmethod
def convolve_psf_cube_masked(psf_cube_masked):
"""Convolve the ChromaticPSF cube of boolean values to enlarge a bit the mask.
Parameters
----------
psf_cube_masked: np.ndarray
A ChromaticPSF cube.
Returns
-------
psf_cube_masked: np.ndarray
Cube of boolean values where `psf_cube` cube is positive, eventually convolved.
Examples
--------
>>> s = ChromaticPSF(Moffat(), Nx=100, Ny=20, deg=2, saturation=20000)
>>> profile_params = s.from_poly_params_to_profile_params(s.generate_test_poly_params(), apply_bounds=True)
>>> profile_params[:, 1] = np.arange(s.Nx)
>>> psf_cube_masked = s.build_psf_cube_masked(s.set_pixels(mode="2D"), profile_params)
>>> psf_cube_masked = s.convolve_psf_cube_masked(psf_cube_masked)
>>> psf_cube_masked.dtype
dtype('bool')
>>> psf_cube_masked.shape
(100, 20, 100)
>>> plt.imshow(psf_cube_masked[20], origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
>>> plt.imshow(psf_cube_masked[80], origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
.. doctest::
:hide:
>>> assert psf_cube_masked.shape == (s.Nx, s.Ny, s.Nx)
>>> assert np.sum(psf_cube_masked[20], axis=0)[20]
>>> assert np.sum(psf_cube_masked[80], axis=0)[80]
"""
wl_size = psf_cube_masked.shape[0]
flat_spectrogram = np.sum(psf_cube_masked.reshape(wl_size, psf_cube_masked[0].size), axis=0)
mask = flat_spectrogram == 0 # < 1e-2 * np.max(flat_spectrogram)
mask = mask.reshape(psf_cube_masked[0].shape)
kernel = np.ones((3, psf_cube_masked.shape[2]//10)) # enlarge a bit more the edges of the mask
mask = convolve2d(mask, kernel, 'same').astype(bool)
for k in range(wl_size):
psf_cube_masked[k] *= ~mask
return psf_cube_masked
[docs]
@staticmethod
def set_rectangular_boundaries(psf_cube_masked):
"""Compute the ChromaticPSF computation boundaries, as a dictionnary of integers giving
the `"xmin"`, `"xmax"`, `"ymin"` and `"ymax"` edges where to compute the PSF for each wavelength.
True regions are rectangular after this operation. The `psf_cube_masked` cube is updated accordingly and returned.
Parameters
----------
psf_cube_masked: np.ndarray
Cube of boolean values where `psf_cube` cube is positive, eventually convolved.
Returns
-------
boundaries: dict
The dictionnary of PSF edges per wavelength.
psf_cube_masked: np.ndarray
Updated cube of boolean values where `psf_cube` cube is positive, eventually convolved.
Examples
--------
>>> s = ChromaticPSF(Moffat(), Nx=100, Ny=20, deg=2, saturation=20000)
>>> profile_params = s.from_poly_params_to_profile_params(s.generate_test_poly_params(), apply_bounds=True)
>>> profile_params[:, 1] = np.arange(s.Nx)
>>> psf_cube_masked = s.build_psf_cube_masked(s.set_pixels(mode="2D"), profile_params)
>>> psf_cube_masked = s.convolve_psf_cube_masked(psf_cube_masked)
>>> boundaries, psf_cube_masked = s.set_rectangular_boundaries(psf_cube_masked)
>>> boundaries["xmin"].shape
(100,)
>>> psf_cube_masked.shape
(100, 20, 100)
>>> plt.imshow(psf_cube_masked[20], origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
>>> plt.imshow(psf_cube_masked[80], origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
.. doctest::
:hide:
>>> assert psf_cube_masked.shape == (s.Nx, s.Ny, s.Nx)
>>> assert np.sum(psf_cube_masked[20], axis=0)[20]
>>> assert np.sum(psf_cube_masked[80], axis=0)[80]
>>> assert np.all(boundaries["xmin"] < boundaries["xmax"])
>>> assert np.all(boundaries["ymin"] < boundaries["ymax"])
"""
wl_size = psf_cube_masked.shape[0]
boundaries = {"xmin": np.zeros(wl_size, dtype=int), "xmax": np.zeros(wl_size, dtype=int),
"ymin": np.zeros(wl_size, dtype=int), "ymax": np.zeros(wl_size, dtype=int)}
for k in range(wl_size):
maskx = np.any(psf_cube_masked[k], axis=0)
masky = np.any(psf_cube_masked[k], axis=1)
if np.sum(maskx) > 0 and np.sum(masky) > 0:
xmin, xmax = int(np.argmax(maskx)), int(len(maskx) - np.argmax(maskx[::-1]))
ymin, ymax = int(np.argmax(masky)), int(len(masky) - np.argmax(masky[::-1]))
else:
xmin, xmax = -1, -1
ymin, ymax = -1, -1
boundaries["xmin"][k] = xmin
boundaries["xmax"][k] = xmax
boundaries["ymin"][k] = ymin
boundaries["ymax"][k] = ymax
psf_cube_masked[k, ymin:ymax, xmin:xmax] = True
return boundaries, psf_cube_masked
[docs]
def get_sparse_indices(self, boundaries):
"""Methods that returns the indices to build sparse matrices from rectangular `boundaries`.
Parameters
----------
boundaries: dict
The dictionnary of PSF edges per wavelength.
Returns
-------
psf_cube_sparse_indices: list
List of sparse indices per wavelength.
M_sparse_indices: np.ndarray
Sparse indices for the integrated matrix model :math:`mathbf{M}`.
Examples
--------
>>> s = ChromaticPSF(Moffat(), Nx=100, Ny=20, deg=2, saturation=20000)
>>> profile_params = s.from_poly_params_to_profile_params(s.generate_test_poly_params(), apply_bounds=True)
>>> profile_params[:, 1] = np.arange(s.Nx)
>>> psf_cube_masked = s.build_psf_cube_masked(s.set_pixels(mode="2D"), profile_params)
>>> psf_cube_masked = s.convolve_psf_cube_masked(psf_cube_masked)
>>> boundaries, psf_cube_masked = s.set_rectangular_boundaries(psf_cube_masked)
>>> psf_cube_sparse_indices, M_sparse_indices = s.get_sparse_indices(boundaries)
>>> assert M_sparse_indices.shape == np.sum(psf_cube_masked)
>>> assert len(psf_cube_sparse_indices) == s.Nx
"""
wl_size = self.Nx # assuming that the number of cube layers is the number of pixel columns
psf_cube_sparse_indices = []
for k in range(wl_size):
xmin, xmax = boundaries["xmin"][k], boundaries["xmax"][k]
if xmin == -1:
psf_cube_sparse_indices.append([])
else:
ymin, ymax = boundaries["ymin"][k], boundaries["ymax"][k]
psf_cube_sparse_indices.append(np.concatenate([np.arange(xmin,xmax) + k * wl_size for k in range(ymin, ymax)]))
M_sparse_indices = np.concatenate(psf_cube_sparse_indices)
return psf_cube_sparse_indices, M_sparse_indices
[docs]
def build_sparse_M(self, pixels, profile_params, M_sparse_indices, boundaries, dtype="float32"):
r"""
Compute the sparse model matrix :math:`\mathbf{M}`.
Given a vector of amplitudes :math:`\mathbf{A}`, spectrogram model is:
.. math::
\mathbf{I} = \mathbf{M} \cdot \mathbf{A}.
Parameters
----------
pixels: np.ndarray
Array of pixels to evaluate ChromaticPSF.
profile_params: array_like
ChromaticPSF profile parameters.
M_sparse_indices: array_like
Array of indices where each element gives the sparse indices for a slice of the ChromaticPSF cube.
boundaries: dict
Dictionary of boundaries for fast evaluation with keys ymin, ymax, xmin, xmax .
dtype: str, optional
Type of the output array (default: 'float32').
Returns
-------
M: np.ndarray
The model matrix :math:`\mathbf{M}`.
Examples
--------
>>> s = ChromaticPSF(Moffat(), Nx=100, Ny=20, deg=2, saturation=20000)
>>> profile_params = s.from_poly_params_to_profile_params(s.generate_test_poly_params(), apply_bounds=True)
>>> profile_params[:, 0] = np.ones(s.Nx) # normalized PSF
>>> profile_params[:, 1] = np.arange(s.Nx) # PSF x_c positions
2D case
>>> psf_cube_masked = s.build_psf_cube_masked(s.set_pixels(mode="2D"), profile_params)
>>> psf_cube_masked = s.convolve_psf_cube_masked(psf_cube_masked)
>>> boundaries, psf_cube_masked = s.set_rectangular_boundaries(psf_cube_masked)
>>> psf_cube_sparse_indices, M_sparse_indices = s.get_sparse_indices(boundaries)
>>> M = s.build_sparse_M(s.set_pixels(mode="2D"), profile_params, M_sparse_indices, boundaries, dtype="float32")
>>> M.shape
(2000, 100)
>>> M.dtype
dtype('float32')
>>> plt.imshow((M @ np.ones(s.Nx)).reshape((s.Ny, s.Nx)), origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
.. doctest::
:hide:
>>> assert M.shape == (s.Ny * s.Nx, s.Nx)
1D case
>>> psf_cube_masked = s.build_psf_cube_masked(s.set_pixels(mode="1D"), profile_params)
>>> psf_cube_masked = s.convolve_psf_cube_masked(psf_cube_masked)
>>> boundaries, psf_cube_masked = s.set_rectangular_boundaries(psf_cube_masked)
>>> psf_cube_sparse_indices, M_sparse_indices = s.get_sparse_indices(boundaries)
>>> M = s.build_sparse_M(s.set_pixels(mode="1D"), profile_params, M_sparse_indices, boundaries, dtype="float32")
>>> M.shape
(2000, 100)
>>> M.dtype
dtype('float32')
>>> plt.imshow((M @ np.ones(s.Nx)).reshape((s.Ny, s.Nx)), origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
.. doctest::
:hide:
>>> assert M.shape == (s.Ny * s.Nx, s.Nx)
"""
if pixels.ndim == 3:
mode = "2D"
Ny, Nx = pixels[0].shape
elif pixels.ndim == 1:
mode = "1D"
Ny, Nx = pixels.size, profile_params.shape[0]
else:
raise ValueError(f"pixels argument must have shape (2, Nx, Ny) or (Ny). Got {pixels.shape=}.")
if Nx != profile_params.shape[0]:
raise ValueError(f"Number of pixels along x axis must be same as profile_params table length. "
f"Got {Nx=} and {profile_params.shape}.")
sparse_psf_cube = np.zeros(M_sparse_indices.size, dtype=dtype)
indptr = np.zeros(len(profile_params)+1, dtype=int)
for x in range(len(profile_params)):
if mode == "2D":
indptr[x + 1] = (boundaries["xmax"][x] - boundaries["xmin"][x]) * (boundaries["ymax"][x] - boundaries["ymin"][x]) + indptr[x]
if boundaries["xmin"][x] < 0:
continue
sparse_psf_cube[indptr[x]:indptr[x+1]] = self.psf.evaluate(pixels[:, boundaries["ymin"][x]:boundaries["ymax"][x],
boundaries["xmin"][x]:boundaries["xmax"][x]],
values=profile_params[x, :]).ravel()
else:
indptr[x + 1] = boundaries["ymax"][x] - boundaries["ymin"][x] + indptr[x]
sparse_psf_cube[indptr[x]:indptr[x+1]] = self.psf.evaluate(pixels[boundaries["ymin"][x]:boundaries["ymax"][x]],
values=profile_params[x, :])
return sparse.csr_matrix((sparse_psf_cube, M_sparse_indices, indptr), shape=(len(profile_params), Ny*Nx), dtype=dtype).T
[docs]
def build_psf_jacobian(self, pixels, profile_params, psf_cube_sparse_indices, boundaries, dtype="float32"):
r"""
Compute the Jacobian matrix :math:`\mathbf{J}` of a ChromaticPSF model, with analytical derivatives.
Amplitude parameters :math:`\mathbf{A}` are excluded, only PSF shape and position parameters :math:`\theta` are included.
.. math::
\mathbf{J} = \frac{\partial \mathbf{M}}{\partial \theta} \cdot \mathbf{A}.
Parameters
----------
pixels: np.ndarray
Array of pixels to evaluate ChromaticPSF.
profile_params: array_like
ChromaticPSF profile parameters.
psf_cube_sparse_indices: array_like
Array of indices where each element gives the sparse indices for a slice of the ChromaticPSF cube.
boundaries: dict
Dictionary of boundaries for fast evaluation with keys ymin, ymax, xmin, xmax .
dtype: str, optional
Type of the output array (default: 'float32').
Returns
-------
J: np.ndarray
The Jacobian matrix math:`\mathbf{J}`.
Examples
--------
>>> s = ChromaticPSF(Moffat(), Nx=100, Ny=20, deg=2, saturation=20000)
>>> profile_params = s.from_poly_params_to_profile_params(s.generate_test_poly_params(), apply_bounds=True)
>>> profile_params[:, 0] = np.ones(s.Nx) # normalized PSF
>>> profile_params[:, 1] = np.arange(s.Nx) # PSF x_c positions
>>> psf_cube_masked = s.build_psf_cube_masked(s.set_pixels(mode="2D"), profile_params)
>>> psf_cube_masked = s.convolve_psf_cube_masked(psf_cube_masked)
>>> boundaries, psf_cube_masked = s.set_rectangular_boundaries(psf_cube_masked)
>>> psf_cube_sparse_indices, M_sparse_indices = s.get_sparse_indices(boundaries)
>>> s.params.fixed[s.Nx:s.Nx+s.deg+1] = [True] * (s.deg+1) # fix all x_c parameters
>>> J = s.build_psf_jacobian(s.set_pixels(mode="2D"), profile_params, psf_cube_sparse_indices, boundaries, dtype="float32")
>>> J.shape
(13, 2000)
>>> J.dtype
dtype('float32')
>>> plt.imshow(J[s.params.get_index("y_c_0")-s.Nx].reshape((s.Ny, s.Nx)), origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
.. doctest::
:hide:
>>> assert J.shape == (len(s.params) - s.Nx, s.Ny * s.Nx)
"""
if pixels.ndim == 3:
mode = "2D"
Ny, Nx = pixels[0].shape
J = np.zeros((self.n_poly_params - self.Nx, Ny * Nx), dtype=dtype)
elif pixels.ndim == 1:
mode = "1D"
Ny, Nx = pixels.size, profile_params.shape[0]
J = np.zeros((self.n_poly_params - self.Nx, Ny * Nx), dtype=dtype)
else:
raise ValueError(f"pixels argument must have shape (2, Nx, Ny) or (Ny). Got {pixels.shape=}.")
if Nx != profile_params.shape[0]:
raise ValueError(f"Number of pixels along x axis must be same as profile_params table length. "
f"Got {Nx=} and {profile_params.shape}.")
poly_x = np.linspace(-1, 1, Nx)
polys = np.zeros((self.n_poly_params-Nx, Nx), dtype=dtype)
ip = 0
repeats = []
for ipsf, label in enumerate(self.psf.params.labels):
if label == "amplitude": continue
nparams = self.degrees[label]+1
repeats.append(nparams)
for k in range(nparams):
coeffs = np.eye(1, nparams, k)[0]
if parameters.PSF_POLY_TYPE == "legendre":
polys[ip] = np.polynomial.legendre.legval(poly_x, coeffs).astype(dtype)
elif parameters.PSF_POLY_TYPE == "polynomial":
polys[ip] = np.polynomial.polynomial.polyval(poly_x, coeffs).astype(dtype)
ip += 1
for ip, label in enumerate(self.psf.params.labels):
if "amplitude" in label: # skip computation of ChromaticPSF jacobian for amplitude parameters
self.psf.params.fixed[ip] = True
else: # check if all ChromaticPSF parameters related to PSF parameter ip are fixed
indices = np.array([k for k in range(len(self.params)) if label in self.params.labels[k]])
self.psf.params.fixed[ip] = np.all(np.array(self.params.fixed)[indices])
for x in range(Nx):
if mode == "2D":
Jpsf = self.psf.jacobian(pixels[:, boundaries["ymin"][x]:boundaries["ymax"][x],
boundaries["xmin"][x]:boundaries["xmax"][x]],
profile_params[x, :], analytical=True)
else:
Jpsf = self.psf.jacobian(pixels[boundaries["ymin"][x]:boundaries["ymax"][x]], profile_params[x, :], analytical=True)
J[:, psf_cube_sparse_indices[x]] += np.repeat(Jpsf[1:], repeats, axis=0) * polys[:, x, None] # Jpsf[1:] excludes amplitude
return J
[docs]
def build_sparse_dM(self, pixels, profile_params, M_sparse_indices, boundaries, dtype="float32"):
r"""
Compute the partial derivatives of the model matrix :math:`\mathbf{M}`, with analytical derivatives.
Amplitude parameters :math:`\mathbf{A}` are excluded, only PSF shape and position parameters :math:`\theta` are included.
Parameters
----------
pixels: np.ndarray
Array of pixels to evaluate ChromaticPSF.
profile_params: array_like
ChromaticPSF profile parameters.
M_sparse_indices: array_like
Sparse indices of the model matrix :math:`\mathbf{M}`.
boundaries: dict
Dictionary of boundaries for fast evaluation with keys ymin, ymax, xmin, xmax .
dtype: str, optional
Type of the output array (default: 'float32').
Returns
-------
dM: list
List of sparse matrices :math:`\partial \mathbf{M}/\partial \theta`
Examples
--------
>>> s = ChromaticPSF(Moffat(), Nx=100, Ny=20, deg=2, saturation=20000)
>>> profile_params = s.from_poly_params_to_profile_params(s.generate_test_poly_params(), apply_bounds=True)
>>> profile_params[:, 0] = np.ones(s.Nx) # normalized PSF
>>> profile_params[:, 1] = np.arange(s.Nx) # PSF x_c positions
>>> psf_cube_masked = s.build_psf_cube_masked(s.set_pixels(mode="2D"), profile_params)
>>> psf_cube_masked = s.convolve_psf_cube_masked(psf_cube_masked)
>>> boundaries, psf_cube_masked = s.set_rectangular_boundaries(psf_cube_masked)
>>> psf_cube_sparse_indices, M_sparse_indices = s.get_sparse_indices(boundaries)
>>> s.params.fixed[s.Nx:s.Nx+s.deg+1] = [True] * (s.deg+1) # fix all x_c parameters
>>> dM = s.build_sparse_dM(s.set_pixels(mode="2D"), profile_params, M_sparse_indices, boundaries, dtype="float32")
>>> len(dM), dM[0].shape
(13, (2000, 100))
>>> dM[0].dtype
dtype('float32')
>>> plt.imshow((dM[s.params.get_index("y_c_0")-s.Nx] @ np.ones(s.Nx)).reshape((s.Ny, s.Nx)), origin="lower"); plt.show() # doctest: +ELLIPSIS
<matplotlib.image.AxesImage object at ...>
.. doctest::
:hide:
>>> assert len(dM) == len(s.params) - s.Nx
>>> assert dM[0].shape == (s.Ny * s.Nx, s.Nx)
>>> J = s.build_psf_jacobian(s.set_pixels(mode="2D"), profile_params, psf_cube_sparse_indices, boundaries, dtype="float32")
>>> assert np.allclose(dM[s.params.get_index("y_c_0")-s.Nx] @ np.ones(s.Nx), J[s.params.get_index("y_c_0")-s.Nx])
"""
if pixels.ndim == 3:
mode = "2D"
Ny, Nx = pixels[0].shape
elif pixels.ndim == 1:
mode = "1D"
Ny, Nx = pixels.size, profile_params.shape[0]
else:
raise ValueError(f"pixels argument must have shape (2, Nx, Ny) or (Ny). Got {pixels.shape=}.")
if Nx != profile_params.shape[0]:
raise ValueError(f"Number of pixels along x axis must be same as profile_params table length. "
f"Got {Nx=} and {profile_params.shape}.")
poly_x = np.linspace(-1, 1, Nx)
polys = np.zeros((self.n_poly_params-Nx, Nx), dtype=dtype)
ip = 0
repeats = []
for ipsf, label in enumerate(self.psf.params.labels):
if label == "amplitude": continue
nparams = self.degrees[label]+1
repeats.append(nparams)
for k in range(nparams):
# psf_index.append(ipsf)
coeffs = np.eye(1, nparams, k)[0]
if parameters.PSF_POLY_TYPE == "legendre":
polys[ip] = np.polynomial.legendre.legval(poly_x, coeffs).astype(dtype)
elif parameters.PSF_POLY_TYPE == "polynomial":
polys[ip] = np.polynomial.polynomial.polyval(poly_x, coeffs).astype(dtype)
ip += 1
sparse_J = np.zeros((self.n_poly_params - self.Nx, M_sparse_indices.size), dtype=dtype)
indptr = np.zeros(Nx+1, dtype=int)
for x in range(Nx):
if mode == "2D":
Jpsf = self.psf.jacobian(pixels[:, boundaries["ymin"][x]:boundaries["ymax"][x],
boundaries["xmin"][x]:boundaries["xmax"][x]],
profile_params[x, :], analytical=True)
else:
Jpsf = self.psf.jacobian(pixels[boundaries["ymin"][x]:boundaries["ymax"][x]], profile_params[x, :], analytical=True)
indptr[x+1] = (boundaries["xmax"][x]-boundaries["xmin"][x])*(boundaries["ymax"][x]-boundaries["ymin"][x]) + indptr[x]
if boundaries["xmin"][x] < 0:
continue
sparse_J[:, indptr[x]:indptr[x + 1]] += np.repeat(Jpsf[1:], repeats, axis=0) * polys[:, x, None]
dM = [sparse.csr_matrix((sparse_J[ip], M_sparse_indices, indptr), shape=(len(profile_params), pixels[0].size), dtype=dtype).T for ip in range(sparse_J.shape[0])]
return dM
[docs]
def fill_table_with_profile_params(self, profile_params):
"""
Fill the table with the profile parameters.
Parameters
----------
profile_params: np.ndarray
a Nx * len(self.psf.param_names) numpy array containing the PSF parameters as a function of pixels.
Examples
--------
>>> psf = MoffatGauss()
>>> s = ChromaticPSF(psf, Nx=100, Ny=100, deg=4, saturation=8000)
>>> poly_params_test = s.generate_test_poly_params()
>>> profile_params = s.from_poly_params_to_profile_params(poly_params_test)
>>> s.fill_table_with_profile_params(profile_params)
.. doctest::
:hide:
>>> assert(np.all(np.isclose(s.table['stddev'], 1*np.ones(100))))
"""
for k, name in enumerate(self.psf.params.labels):
self.table[name] = profile_params[:, k]
[docs]
def rotate_table(self, angle_degree):
"""
In self.table, rotate the columns Dx, Dy, Dy_fwhm_inf and Dy_fwhm_sup by an angle
given in degree. The results overwrite the previous columns in self.table.
Parameters
----------
angle_degree: float
Rotation angle in degree
Examples
--------
>>> psf = MoffatGauss()
>>> s = ChromaticPSF(psf, Nx=100, Ny=100, deg=4, saturation=8000)
>>> s.table['Dx'] = np.arange(100)
>>> s.rotate_table(45)
.. doctest::
:hide:
>>> assert(np.all(np.isclose(s.table['Dy'], -np.arange(100)/np.sqrt(2))))
>>> assert(np.all(np.isclose(s.table['Dx'], np.arange(100)/np.sqrt(2))))
>>> assert(np.all(np.isclose(s.table['Dy_fwhm_inf'], -np.arange(100)/np.sqrt(2))))
>>> assert(np.all(np.isclose(s.table['Dy_fwhm_sup'], -np.arange(100)/np.sqrt(2))))
"""
angle = angle_degree * np.pi / 180.
rotmat = np.array([[np.cos(angle), np.sin(angle)], [-np.sin(angle), np.cos(angle)]])
# finish with Dy_c to get correct Dx
for name in ['Dy', 'Dy_fwhm_inf', 'Dy_fwhm_sup', 'Dy_disp_axis']:
vec = list(np.array([self.table['Dx'], self.table[name]]).T)
rot_vec = np.array([np.dot(rotmat, v) for v in vec])
self.table[name] = rot_vec.T[1]
self.table['Dx'] = rot_vec.T[0]
[docs]
def from_profile_params_to_poly_params(self, profile_params, indices=None):
"""
Transform the profile_params array into a set of parameters for the chromatic PSF parameterisation.
Fit polynomial functions across the pixels for each PSF parameters.
Type of the polynomial function is set by parameters.PSF_POLY_TYPE.
The order of the polynomial functions is given by the self.degrees array.
Parameters
----------
profile_params: array
a Nx * len(self.psf.param_names) numpy array containing the PSF parameters as a function of pixels.
indices: array_like, optional
Array of integer indices or boolean values that selects values in profile_params for the polynomial fits.
If None every profile_params rows are used (default: None)
Returns
-------
profile_params: array_like
A set of parameters that can be evaluated by the chromatic PSF class evaluate function.
Examples
--------
Build a mock spectrogram with random Poisson noise:
>>> psf = MoffatGauss()
>>> s = ChromaticPSF(psf, Nx=100, Ny=100, deg=4, saturation=8000)
>>> poly_params_test = s.generate_test_poly_params()
>>> data = s.evaluate(s.set_pixels(mode="1D"), poly_params_test)
>>> data = np.random.poisson(data)
>>> data_errors = np.sqrt(data+1)
From the polynomial parameters to the profile parameters:
>>> profile_params = s.from_poly_params_to_profile_params(poly_params_test)
.. doctest::
:hide:
>>> assert(np.all(np.isclose(profile_params[0], [10, 0, 50, 5, 2, -0.4, 1, 8e3])))
From the profile parameters to the polynomial parameters:
>>> profile_params = s.from_profile_params_to_poly_params(profile_params)
.. doctest::
:hide:
>>> assert(np.all(np.isclose(profile_params, poly_params_test)))
"""
if indices is None:
indices = np.full(len(self.table), True)
poly_params = np.array([])
amplitude = None
for k, name in enumerate(self.psf.params.labels):
if name == 'amplitude':
amplitude = profile_params[:, k]
poly_params = np.concatenate([poly_params, amplitude])
if amplitude is None:
self.my_logger.warning('\n\tAmplitude array not initialized. '
'Polynomial fit for shape parameters will be unweighted.')
poly_x = np.linspace(-1, 1, len(self.table))[indices]
for k, name in enumerate(self.psf.params.labels):
delta = 0
if name != 'amplitude':
weights = np.copy(amplitude)[indices]
if name == 'x_c':
delta = self.x0
if name == 'y_c':
delta = self.y0
if parameters.PSF_POLY_TYPE == "legendre":
fit = np.polynomial.legendre.legfit(poly_x, profile_params[indices, k] - delta,
deg=self.degrees[name], w=weights)
elif parameters.PSF_POLY_TYPE == "polynomial":
fit = np.polynomial.polynomial.polyfit(poly_x, profile_params[indices, k] - delta,
deg=self.degrees[name], w=weights)
poly_params = np.concatenate([poly_params, fit])
return poly_params
[docs]
def from_table_to_profile_params(self): # pragma: nocover
"""
Extract the profile parameters from self.table and fill an array of profile parameters.
Parameters
----------
Returns
-------
profile_params: array
Nx * len(self.psf.param_names) numpy array containing the PSF parameters as a function of pixels.
Examples
--------
>>> from spectractor.extractor.spectrum import Spectrum
>>> s = Spectrum('./tests/data/reduc_20170530_134_spectrum.fits')
>>> profile_params = s.chromatic_psf.from_table_to_profile_params()
.. doctest::
:hide:
>>> assert(profile_params.shape == (s.chromatic_psf.Nx, len(s.chromatic_psf.psf.params.labels)))
>>> assert not np.all(np.isclose(profile_params, np.zeros_like(profile_params)))
"""
profile_params = np.zeros((len(self.table), len(self.psf.params.labels)))
for k, name in enumerate(self.psf.params.labels):
profile_params[:, k] = self.table[name]
return profile_params
[docs]
def from_table_to_poly_params(self): # pragma: nocover
"""
Extract the polynomial parameters from self.table and fill an array with polynomial parameters.
Parameters
----------
Returns
-------
poly_params: array_like
A set of polynomial parameters that can be evaluated by the chromatic PSF class evaluate function.
Examples
--------
>>> from spectractor.extractor.spectrum import Spectrum
>>> s = Spectrum('./tests/data/reduc_20170530_134_spectrum.fits')
>>> poly_params = s.chromatic_psf.from_table_to_poly_params()
.. doctest::
:hide:
>>> assert(poly_params.size > s.chromatic_psf.Nx)
>>> assert(len(poly_params.shape)==1)
>>> assert not np.all(np.isclose(poly_params, np.zeros_like(poly_params)))
"""
profile_params = self.from_table_to_profile_params()
poly_params = self.from_profile_params_to_poly_params(profile_params)
return poly_params
[docs]
def from_poly_params_to_profile_params(self, poly_params, apply_bounds=False):
"""
Evaluate the PSF profile parameters from the polynomial coefficients. If poly_params length is smaller
than self.Nx, it is assumed that the amplitude parameters are not included and set to arbitrarily to 1.
Parameters
----------
poly_params: array_like
Parameter array of the model, in the form:
- Nx first parameters are amplitudes for the Moffat transverse profiles
- next parameters are polynomial coefficients for all the PSF parameters in the same order
as in PSF definition, except amplitude
apply_bounds: bool, optional
Force profile parameters to respect their boundary conditions if they lie outside (default: False)
Returns
-------
profile_params: array
Nx * len(self.psf.param_names) numpy array containing the PSF parameters as a function of pixels.
Examples
--------
Build a mock spectrogram with random Poisson noise:
>>> psf = MoffatGauss()
>>> s = ChromaticPSF(psf, Nx=100, Ny=100, deg=1, saturation=8000)
>>> poly_params_test = s.generate_test_poly_params()
>>> data = s.evaluate(s.set_pixels(mode="1D"), poly_params_test)
>>> data = np.random.poisson(data)
>>> data_errors = np.sqrt(data+1)
From the polynomial parameters to the profile parameters:
>>> profile_params = s.from_poly_params_to_profile_params(poly_params_test, apply_bounds=True)
.. doctest::
:hide:
>>> assert np.allclose(profile_params[0], [10, 0, 50, 5, 2, -0.4, 1, 8e3], rtol=1e-3, atol=1e-3)
From the profile parameters to the polynomial parameters:
>>> profile_params = s.from_profile_params_to_poly_params(profile_params)
.. doctest::
:hide:
>>> assert np.allclose(profile_params, poly_params_test)
From the polynomial parameters to the profile parameters without Moffat amplitudes:
>>> profile_params = s.from_poly_params_to_profile_params(poly_params_test[100:])
.. doctest::
:hide:
>>> assert np.allclose(profile_params[0], [1, 0, 50, 5, 2, -0.4, 1, 8e3])
"""
length = len(self.table)
poly_x = np.linspace(-1, 1, length)
profile_params = np.zeros((length, len(self.psf.params.labels)))
shift = 0
for k, name in enumerate(self.psf.params.labels):
if name == 'amplitude':
if len(poly_params) > length:
profile_params[:, k] = poly_params[:length]
else:
profile_params[:, k] = np.ones(length)
else:
if len(poly_params) > length:
if parameters.PSF_POLY_TYPE == "legendre":
profile_params[:, k] = np.polynomial.legendre.legval(poly_x, poly_params[length + shift:length + shift + self.degrees[name] + 1])
elif parameters.PSF_POLY_TYPE == "polynomial":
profile_params[:, k] = np.polynomial.polynomial.polyval(poly_x, poly_params[length + shift:length + shift + self.degrees[name] + 1])
else:
p = poly_params[shift:shift + self.degrees[name] + 1]
if len(p) > 0: # to avoid saturation parameters in case not set
if parameters.PSF_POLY_TYPE == "legendre":
profile_params[:, k] = np.polynomial.legendre.legval(poly_x, p)
elif parameters.PSF_POLY_TYPE == "polynomial":
profile_params[:, k] = np.polynomial.polynomial.polyval(poly_x, p)
shift += self.degrees[name] + 1
if name == 'x_c':
profile_params[:, k] += self.x0
if name == 'y_c':
profile_params[:, k] += self.y0
if apply_bounds:
for k, name in enumerate(self.psf.params.labels):
profile_params[profile_params[:, k] <= self.psf.params.bounds[k][0], k] = self.psf.params.bounds[k][0]
profile_params[profile_params[:, k] > self.psf.params.bounds[k][1], k] = self.psf.params.bounds[k][1]
return profile_params
[docs]
def from_profile_params_to_shape_params(self, profile_params):
"""
Compute the PSF integrals and FWHMS given the profile_params array and fill the table.
Parameters
----------
profile_params: array
a Nx * len(self.psf.param_names) numpy array containing the PSF parameters as a function of pixels.
Examples
--------
>>> psf = MoffatGauss()
>>> s = ChromaticPSF(psf, Nx=100, Ny=100, deg=4, saturation=8000)
>>> poly_params_test = s.generate_test_poly_params()
>>> profile_params = s.from_poly_params_to_profile_params(poly_params_test)
>>> s.from_profile_params_to_shape_params(profile_params)
.. doctest::
:hide:
>>> assert s.table['fwhm'][-1] > 0
"""
self.fill_table_with_profile_params(profile_params)
pixel_x = np.arange(0, self.Nx, parameters.PSF_PIXEL_STEP_TRANSVERSE_FIT, dtype=int)
fwhms = np.zeros_like(pixel_x, dtype=float)
# oversampling for precise computation of the PSF
# pixels.shape = (2, Ny, Nx): self.pixels[1<-y, :, 0<-first pixel value column]
# TODO: account for rotation ad projection effects is PSF is not round
pixel_eval = np.arange(self.pixels[1, 0, 0], self.pixels[1, -1, 0], 0.5, dtype=np.float32)
for ix, x in enumerate(pixel_x):
p = profile_params[x, :]
# compute FWHM transverse to dispersion axis (assuming revolution symmetry of the PSF)
out = self.psf.evaluate(pixel_eval, values=p)
fwhms[ix] = compute_fwhm(pixel_eval, out, center=p[2], minimum=0, epsilon=1e-2)
# clean fwhm bad points
mask = np.logical_and(fwhms > 1, fwhms < self.Ny // 2) # more than 1 pixel or less than window
self.table['fwhm'] = interp1d(pixel_x[mask], fwhms[mask], kind="linear",
bounds_error=False, fill_value="extrapolate")(np.arange(self.Nx))
self.table['flux_integral'] = profile_params[:, 0] # if MoffatGauss1D normalized
[docs]
def set_bounds(self):
"""
This function returns an array of bounds for PSF polynomial parameters (no amplitude ones).
Returns
-------
bounds: list
2D array containing the pair of bounds for each polynomial parameters.
Examples
________
>>> psf = MoffatGauss()
>>> s = ChromaticPSF(psf, Nx=100, Ny=100, deg=4, saturation=8000)
>>> s.set_bounds() # doctest: +ELLIPSIS
[array([-inf, inf]), array([-inf, inf]), ...
"""
bounds = [[], []]
for k, name in enumerate(self.psf.params.labels):
if parameters.PSF_POLY_TYPE == "legendre":
tmp_bounds = [[-np.inf] * (1 + self.degrees[name]), [np.inf] * (1 + self.degrees[name])]
elif parameters.PSF_POLY_TYPE == "polynomial":
tmp_bounds = [[self.psf.params.bounds[k][0]] + [-np.inf] * (self.degrees[name]),
[self.psf.params.bounds[k][1]] + [np.inf] * (self.degrees[name])]
# tmp_bounds = [[self.psf.params.bounds[k][0]] + [-0.5 * 2 * (self.psf.params.bounds[k][1] - self.psf.params.bounds[k][0]) for deg in range(1, self.degrees[name] + 1)],
# [self.psf.params.bounds[k][1]] + [0.5 * 2 * (self.psf.params.bounds[k][1] - self.psf.params.bounds[k][0]) for deg in range(1, self.degrees[name] + 1)]]
else:
raise ValueError(f"Unknown polynomial type {parameters.PSF_POLY_TYPE=}.")
if name == "saturation":
tmp_bounds = [[0], [2 * self.saturation]]
elif name == "amplitude":
continue
bounds[0] += tmp_bounds[0]
bounds[1] += tmp_bounds[1]
return list(np.array(bounds).T)
[docs]
def set_bounds_for_minuit(self, data=None): # pragma: nocover
"""
This function returns an array of bounds for iminuit. It is very touchy, change the values with caution !
Parameters
----------
data: array_like, optional
The data array, to set the bounds for the amplitude using its maximum.
If None is provided, no bounds are provided for the amplitude parameters.
Returns
-------
bounds: array_like
2D array containing the pair of bounds for each polynomial parameters.
"""
if self.saturation is None:
self.saturation = 2 * np.max(data)
if data is not None:
Ny, Nx = data.shape
bounds = [[0.1 * np.max(data[:, x]) for x in range(Nx)], [100.0 * np.max(data[:, x]) for x in range(Nx)]]
else:
bounds = [[], []]
for k, name in enumerate(self.psf.params.labels):
tmp_bounds = [[-np.inf] * (1 + self.degrees[name]), [np.inf] * (1 + self.degrees[name])]
if name == "saturation":
if data is not None:
tmp_bounds = [[0.1 * np.max(data)], [2 * self.saturation]]
else:
tmp_bounds = [[0], [2 * self.saturation]]
elif name == "amplitude":
continue
bounds[0] += tmp_bounds[0]
bounds[1] += tmp_bounds[1]
return np.array(bounds).T
[docs]
def get_algebraic_distance_along_dispersion_axis(self, shift_x=0, shift_y=0):
return np.asarray(np.sign(self.table['Dx']) *
np.sqrt((self.table['Dx'] - shift_x) ** 2 + (self.table['Dy_disp_axis'] - shift_y) ** 2))
[docs]
def update(self, psf_poly_params, x0, y0, angle, plot=False, apply_bounds=True):
profile_params = self.from_poly_params_to_profile_params(psf_poly_params, apply_bounds=apply_bounds)
self.fill_table_with_profile_params(profile_params)
Dx = np.arange(self.Nx) - x0 # distance in (x,y) spectrogram frame for column x
self.table["Dx"] = Dx
self.table['Dy_disp_axis'] = np.tan(angle * np.pi / 180) * self.table['Dx']
self.table['Dy'] = np.copy(self.table['y_c']) - y0
if plot:
self.plot_summary()
return profile_params
[docs]
def plot_summary(self, truth=None):
fig, ax = plt.subplots(1, 1, sharex='all', figsize=(7, 4))
PSF_models = []
PSF_truth = []
if truth is not None:
truth.psf.apply_max_width_to_bounds(max_half_width=self.Ny)
PSF_truth = truth.from_poly_params_to_profile_params(truth.params.values, apply_bounds=True)
PSF_truth[:, 1] = np.arange(self.Nx) # replace x_c
all_pixels = np.arange(self.profile_params.shape[0])
for i, name in enumerate(self.psf.params.labels):
coeffs = [self.params.values[k] for k in range(self.params.ndim) if name in self.params.labels[k]]
delta = 0
if name == 'x_c':
delta = self.x0
if name == 'y_c':
delta = self.y0
if parameters.PSF_POLY_TYPE == "legendre":
PSF_models.append(np.polynomial.polynomial.legval(rescale_x_to_legendre(all_pixels), coeffs) + delta)
elif parameters.PSF_POLY_TYPE == "polynomial":
PSF_models.append(np.polynomial.polynomial.polyval(rescale_x_to_legendre(all_pixels), coeffs) + delta)
for i, name in enumerate(self.psf.params.labels):
p = ax.plot(all_pixels, self.profile_params[:, i], marker='+', linestyle='none')
ax.plot(all_pixels[self.fitted_pixels], self.profile_params[self.fitted_pixels, i], label=name,
marker='o', linestyle='none', color=p[0].get_color())
if i > 0:
ax.plot(all_pixels, PSF_models[i], color=p[0].get_color())
if truth is not None:
ax.plot(all_pixels, PSF_truth[:, i], color=p[0].get_color(), linestyle='--')
ax.set_xlabel('X pixels')
ax.set_ylabel('PSF parameters')
ax.grid()
ax.set_yscale('symlog', linthresh=10)
ax.legend()
fig.tight_layout()
# fig.subplots_adjust(hspace=0)
if parameters.DISPLAY: # pragma: no cover
plt.show()
[docs]
def fit_transverse_PSF1D_profile(self, data, err, w, ws, pixel_step=1, bgd_model_func=None, saturation=None,
live_fit=False, sigma_clip=5):
"""
Fit the transverse profile of a 2D data image with a PSF profile.
Loop is done on the x-axis direction.
An order 1 polynomial function is fitted to subtract the background for each pixel
with a 3*sigma outlier removal procedure to remove background stars.
Parameters
----------
data: array
The 2D array image. The transverse profile is fitted on the y direction
for all pixels along the x direction.
err: array
The uncertainties related to the data array.
w: int
Half width of central region where the spectrum is extracted and summed (default: 10)
ws: list
up/down region extension where the sky background is estimated with format [int, int] (default: [20,30])
pixel_step: int, optional
The step in pixels between the slices to be fitted (default: 1).
The values for the skipped pixels are interpolated with splines from the fitted parameters.
bgd_model_func: callable, optional
A 2D function to model the extracted background (default: None -> null background)
saturation: float, optional
The saturation level of the image. Default is set to twice the maximum of the data array and has no effect.
live_fit: bool, optional
If True, the transverse profile fit is plotted in live accross the loop (default: False).
sigma_clip: int
Sigma for outlier rejection (default: 5).
Examples
--------
Build a mock spectrogram with random Poisson noise:
>>> psf = MoffatGauss()
>>> s0 = ChromaticPSF(psf, Nx=100, Ny=100, saturation=1000)
>>> s0.params.values = s0.generate_test_poly_params()
>>> saturation = s0.params.values[-1]
>>> data = s0.evaluate(s0.set_pixels(mode="1D"), s0.params.values)
>>> bgd = 10*np.ones_like(data)
>>> xx, yy = np.meshgrid(np.arange(s0.Nx), np.arange(s0.Ny))
>>> bgd += 1000*np.exp(-((xx-20)**2+(yy-10)**2)/(2*2))
>>> data += bgd
>>> data_errors = np.sqrt(data+1)
Extract the background:
>>> bgd_model_func, _, _ = extract_spectrogram_background_sextractor(data, data_errors, ws=[30,50])
Fit the transverse profile:
>>> s = ChromaticPSF(psf, Nx=100, Ny=100, deg=4, saturation=saturation)
>>> s.fit_transverse_PSF1D_profile(data, data_errors, w=20, ws=[30,50], pixel_step=5,
... bgd_model_func=bgd_model_func, saturation=saturation, live_fit=False, sigma_clip=5)
>>> s.plot_summary(truth=s0)
.. doctest::
:hide:
>>> truth_profile_params = s0.from_poly_params_to_profile_params(s0.params.values)
>>> truth_profile_params[:, 1] = np.arange(s0.Nx) # replace x_c
>>> for i in range(1, s.profile_params.shape[1]):
... print(s.psf.params.labels[i], np.isclose(np.mean(s.profile_params[1:-1, i]), np.mean(truth_profile_params[:, i]), rtol=5e-2))
x_c True
y_c True
gamma True
alpha True
eta_gauss True
stddev True
saturation True
>>> assert(not np.any(np.isclose(s.table['flux_sum'][3:6], np.zeros(s.Nx)[3:6], rtol=1e-3)))
"""
if saturation is None:
saturation = 2 * np.max(data)
Ny, Nx = data.shape
middle = Ny // 2
index = np.arange(Ny)
# Prepare the fit: start with the maximum of the spectrum
xmax_index = int(np.unravel_index(np.argmax(data[middle - w:middle + w, :]), data.shape)[1])
bgd_index = np.concatenate((np.arange(0, middle - ws[0]), np.arange(middle + ws[0], Ny))).astype(int)
y = data[:, xmax_index]
# first fit with moffat only to initialize the guess
# hypothesis that max of spectrum if well describe by a focused PSF
if bgd_model_func is not None:
signal = y - bgd_model_func(xmax_index, index)[:, 0]
else:
signal = y
# fwhm = compute_fwhm(index, signal, minimum=0)
# Initialize PSF
psf = self.psf
guess = np.copy(psf.values_default).astype(float)
# guess = [2 * np.nanmax(signal), middle, 0.5 * fwhm, 2, 0, 0.1 * fwhm, saturation]
signal_sum = np.nanmax(signal)
guess[0] = signal_sum
guess[1] = xmax_index
guess[2] = middle
guess[-1] = saturation
# bounds = [(0.1 * maxi, 10 * maxi), (middle - w, middle + w), (0.1, min(fwhm, Ny // 2)), (0.1, self.alpha_max),
# (-1, 0),
# (0.1, min(Ny // 2, fwhm)),
# (0, 2 * saturation)]
psf.apply_max_width_to_bounds(max_half_width=Ny)
initial_bounds = np.copy(psf.params.bounds)
bounds = np.copy(psf.params.bounds)
bounds[0] = (0.1 * signal_sum, 2 * signal_sum)
bounds[2] = (middle - w, middle + w)
bounds[-1] = (0, 2 * saturation)
# moffat_guess = [2 * np.nanmax(signal), middle, 0.5 * fwhm, 2]
# moffat_bounds = [(0.1 * maxi, 10 * maxi), (middle - w, middle + w), (0.1, min(fwhm, Ny // 2)), (0.1, 10)]
# fit = fit_moffat1d_outlier_removal(index, signal, sigma=sigma, niter=2,
# guess=moffat_guess, bounds=np.array(moffat_bounds).T)
# moffat_guess = [getattr(fit, p).value for p in fit.param_names]
# signal_width_guess = moffat_guess[2]
# bounds[2] = (0.1, min(Ny // 2, 5 * signal_width_guess))
# bounds[5] = (0.1, min(Ny // 2, 5 * signal_width_guess))
# guess[:4] = moffat_guess
init_guess = np.copy(guess)
# Go from max to right, then from max to left
# includes the boundaries to avoid Runge phenomenum in chromatic_fit
pixel_range = list(np.arange(xmax_index, Nx, pixel_step).astype(int))
if Nx - 1 not in pixel_range:
pixel_range.append(Nx - 1)
pixel_range += list(np.arange(xmax_index, -1, -pixel_step).astype(int))
if 0 not in pixel_range:
pixel_range.append(0)
pixel_range = np.array(pixel_range)
self.fitted_pixels = np.full(Nx, False)
for x in tqdm(pixel_range, disable=not parameters.VERBOSE):
guess = np.copy(guess)
if x == xmax_index:
guess = np.copy(init_guess)
# fit the background with a polynomial function
y = data[:, x]
if bgd_model_func is not None:
# x_array = [x] * index.size
signal = y - bgd_model_func(x, index)[:, 0]
else:
signal = y
if np.mean(signal[bgd_index]) < 0:
signal -= np.mean(signal[bgd_index])
# in case guess amplitude is too low
# pdf = np.abs(signal)
signal_sum = np.nansum(signal[middle - ws[0]:middle + ws[0]])
if signal_sum < 3 * np.nanstd(signal[bgd_index]):
continue
# if signal_sum > 0:
# pdf /= signal_sum
# mean = np.nansum(pdf * index)
# bounds[0] = (0.1 * np.nanstd(bgd), 2 * np.nanmax(y[middle - ws[0]:middle + ws[0]]))
bounds[0] = (0.1 * signal_sum, 1.5 * signal_sum)
guess[0] = signal_sum
guess[1] = x
# if guess[4] > -1:
# guess[0] = np.max(signal) / (1 + guess[4])
# std = np.sqrt(np.nansum(pdf * (index - mean) ** 2))
# if guess[0] < 3 * np.nanstd(bgd):
# guess[0] = float(0.9 * np.abs(np.nanmax(signal)))
# if guess[0] * (1 + 0*guess[4]) > 1.2 * maxi:
# guess[0] = 0.9 * maxi
psf.params.values = guess
psf.params.bounds = bounds
w = PSFFitWorkspace(psf, signal, data_errors=err[:, x], bgd_model_func=None,
live_fit=False, verbose=False, jacobian_analytical=True)
try:
run_minimisation_sigma_clipping(w, method="newton", sigma_clip=sigma_clip, niter_clip=1, verbose=False)
except:
pass
best_fit = w.params.values
# It is better not to propagate the guess to further pixel columns
# otherwise fit_chromatic_psf1D is more likely to get trapped in a local minimum
# Randomness of the slice fit is better :
# guess = best_fit
self.profile_params[x, :] = best_fit
# TODO: propagate amplitude uncertainties from Newton fit
self.table['flux_err'][x] = np.sqrt(np.sum(err[:, x] ** 2))
self.table['flux_sum'][x] = np.sum(signal)
if live_fit and parameters.DISPLAY: # pragma: no cover
if not np.any(np.isnan(best_fit[0])):
w.live_fit = True
w.plot_fit()
if not np.any(np.isnan(best_fit)):
self.fitted_pixels[x] = True
# interpolate the skipped pixels with splines
all_pixels = np.arange(Nx)
# xp = np.array(sorted(set(list(pixel_range))))
xp = all_pixels[self.fitted_pixels]
# self.fitted_pixels = xp
for i in range(len(self.psf.params.labels)):
yp = self.profile_params[xp, i]
self.profile_params[:, i] = interp1d(xp, yp, kind='cubic', fill_value='extrapolate', bounds_error=False)(all_pixels)
for x in all_pixels:
y = data[:, x]
if bgd_model_func is not None:
signal = y - bgd_model_func(x, index)[:, 0]
else:
signal = y
if np.mean(signal[bgd_index]) < 0:
signal -= np.mean(signal[bgd_index])
self.table['flux_err'][x] = np.sqrt(np.sum(err[:, x] ** 2))
self.table['flux_sum'][x] = np.sum(signal)
# keep only brightest transverse profiles
selected_pixels = self.fitted_pixels & (self.profile_params[:, 0] > 0.1 * np.median(self.profile_params[:, 0]))
# then keep only profiles with first shape parameter (index=3) are not too deviant from its median value
for k in range(3, self.profile_params.shape[1]):
keep = selected_pixels & (np.abs(self.profile_params[:, k]) < 5 * np.median(self.profile_params[:, k]))
# keep at least 20% of the fitted pixels or twice the number of data points needed for a polynomial fit
if np.sum(keep) > 0.2 * len(selected_pixels) or np.sum(keep) > 2 * (self.deg + 1):
selected_pixels = keep
self.params.values = self.from_profile_params_to_poly_params(self.profile_params, indices=selected_pixels)
self.from_profile_params_to_shape_params(self.profile_params)
self.cov_matrix = np.diag(1 / np.array(self.table['flux_err']) ** 2)
psf.params.bounds = initial_bounds
[docs]
def fit_chromatic_psf(self, data, mask=None, bgd_model_func=None, data_errors=None, mode="1D", analytical=True,
amplitude_priors_method="noprior", verbose=False, live_fit=False):
"""
Fit a chromatic PSF model on 2D data.
Parameters
----------
data: np.array
2D array containing the image data.
mask: np.array, optional
2D array containing the masked pixels.
bgd_model_func: callable, optional
A 2D function to model the extracted background (default: None -> null background)
data_errors: np.array
the 2D array uncertainties.
mode: str, optional
Set the fitting mode: either transverse 1D PSF profile (mode="1D") or full 2D PSF profile (mode="2D").
amplitude_priors_method: str, optional
Prior method to use to constrain the amplitude parameters of the PSF (default: "noprior").
verbose: bool, optional
Set the verbosity of the fitting process (default: False).
Returns
-------
w: ChromaticPSFFitWorkspace
The ChromaticPSFFitWorkspace containing info abut the fitting process.
Examples
--------
Set the parameters:
>>> parameters.PIXDIST_BACKGROUND = 40
>>> parameters.PIXWIDTH_BACKGROUND = 10
>>> parameters.PIXWIDTH_SIGNAL = 30
>>> parameters.DEBUG = True
Build a mock spectrogram with random Poisson noise using the full 2D PSF model:
>>> psf = Moffat(clip=False)
>>> s0 = ChromaticPSF(psf, Nx=120, Ny=100, deg=2, saturation=10000000)
>>> params = s0.generate_test_poly_params()
>>> params[:s0.Nx] *= 1
>>> s0.params.values = params
>>> saturation = params[-1]
>>> data = s0.evaluate(s0.set_pixels(mode="2D"), params)
>>> bgd = 10*np.ones_like(data)
>>> data += bgd
>>> data = np.random.poisson(data)
>>> data_errors = np.sqrt(np.abs(data+1))
>>> mask = np.zeros_like(data).astype(bool)
>>> mask[10:30,20:22] = True
Extract the background:
>>> bgd_model_func, _, _ = extract_spectrogram_background_sextractor(data, data_errors, ws=[30,50])
Propagate background uncertainties:
>>> data_errors = np.sqrt(data_errors**2 + bgd_model_func(np.arange(s0.Nx), np.arange(s0.Ny)))
Estimate the first guess values:
>>> s = ChromaticPSF(psf, Nx=120, Ny=100, deg=2, saturation=saturation)
>>> s.fit_transverse_PSF1D_profile(data, data_errors, w=20, ws=[30,50],
... pixel_step=1, bgd_model_func=bgd_model_func, saturation=saturation, live_fit=False)
>>> s.plot_summary(truth=s0)
>>> amplitude_residuals = [ [s0.params.values[:s0.Nx], np.array(s.table["amplitude"])-s0.params.values[:s0.Nx],
... np.array(s.table['amplitude'] * s.table['flux_err'] / s.table['flux_sum'])] ]
Fit the data using the transverse 1D PSF model only:
>>> w = s.fit_chromatic_psf(data, mode="1D", data_errors=data_errors, bgd_model_func=bgd_model_func,
... amplitude_priors_method="noprior", verbose=True, mask=mask)
>>> s.plot_summary(truth=s0)
>>> amplitude_residuals.append([s0.params.values[:s0.Nx], w.amplitude_params-s0.params.values[:s0.Nx],
... w.amplitude_params_err])
.. doctest::
:hide:
>>> residuals = [(w.data[x]-w.model[x])/w.err[x] for x in range(w.Nx)]
>>> assert w.costs[-1] /(w.Nx*w.Ny) < 1.5
>>> assert np.abs(np.mean(residuals)) < 0.15
>>> assert np.std(residuals) < 1.2
Fit the data using the full 2D PSF model
>>> parameters.PSF_FIT_REG_PARAM = 0.002
>>> w = s.fit_chromatic_psf(data, mode="2D", data_errors=data_errors, bgd_model_func=bgd_model_func,
... amplitude_priors_method="psf1d", verbose=True, analytical=True, mask=mask)
>>> s.plot_summary(truth=s0)
>>> amplitude_residuals.append([s0.params.values[:s0.Nx], w.amplitude_params-s0.params.values[:s0.Nx],
... w.amplitude_params_err])
>>> for k, label in enumerate(["Transverse", "PSF1D", "PSF2D"]):
... plt.errorbar(np.arange(s0.Nx), amplitude_residuals[k][1]/amplitude_residuals[k][2],
... yerr=amplitude_residuals[k][2]/amplitude_residuals[k][2],
... fmt="+", label=label) # doctest: +ELLIPSIS
<ErrorbarContainer ...>
>>> plt.grid()
>>> plt.legend() # doctest: +ELLIPSIS
<matplotlib.legend.Legend object at ...>
>>> plt.show()
.. doctest::
:hide:
>>> residuals = (w.data.flatten()-w.model.flatten())/w.err.flatten()
>>> assert w.costs[-1] /(w.Nx*w.Ny) < 1.2
>>> assert np.abs(np.mean(residuals)) < 0.15
>>> assert np.std(residuals) < 1.2
>>> assert np.abs(np.mean((w.amplitude_params - s0.params.values[:s0.Nx])/w.amplitude_params_err)) < 0.5
"""
if mode == "1D":
w = ChromaticPSFFitWorkspace(self, data, mask=mask, data_errors=data_errors, mode=mode, bgd_model_func=bgd_model_func,
amplitude_priors_method=amplitude_priors_method, verbose=verbose,
live_fit=live_fit, analytical=analytical)
# first fit order 0 terms
w.my_logger.info("\n\tFit order 0 parameters...")
fixed_default = np.copy(w.params.fixed)
# fix higher order coefficients of polynomes
for k in range(w.params.ndim):
if "_0" not in w.params.labels[k] and not w.params.fixed[k]:
w.params.fixed[k] = True # _k parameters that are yet fixed
w.params.values[k] = 0.0 # set them to zero
# if zero order coefficient is closer than 5% to its bound, change the value
for k, name in enumerate(self.psf.params.labels):
if name == 'amplitude':
continue
ind = w.params.get_index(name+"_0")
val = w.params.values[ind]
if (val-self.psf.params.bounds[k][0]) < 0.05 * val:
w.params.values[ind] = min(val * 1.05, self.psf.params.bounds[k][1])
if (val-self.psf.params.bounds[k][1]) < 0.05 * val:
w.params.values[ind] = max(val * 0.95, self.psf.params.bounds[k][0])
run_minimisation(w, method="newton", ftol=1 / (w.Nx * w.Ny), xtol=1e-6, niter=20, verbose=verbose, with_line_search=False)
# then fit all parameters together
w.my_logger.info("\n\tFit all ChromaticPSF parameters...")
w.params.fixed = fixed_default
run_minimisation(w, method="newton", ftol=1 / (w.Nx * w.Ny), xtol=1e-6, niter=50, with_line_search=False)
elif mode == "2D":
# first shot to set the mask
w = ChromaticPSFFitWorkspace(self, data, mask=mask, data_errors=data_errors, mode=mode, bgd_model_func=bgd_model_func,
amplitude_priors_method=amplitude_priors_method, verbose=verbose,
live_fit=live_fit, analytical=analytical)
# first, fit the transverse position
w.my_logger.info("\n\tFit y_c parameters...")
fixed_default = np.copy(w.params.fixed)
w.params.fixed = [True] * w.params.ndim
for k in range(w.params.ndim):
if "y_c" in w.params.labels[k]:
w.params.fixed[k] = False # y_c_k
run_minimisation(w, method="newton", ftol=100 / (w.Nx * w.Ny), xtol=1e-3, niter=10, verbose=verbose, with_line_search=False)
# then fit all parameters together
w.my_logger.info("\n\tFit all ChromaticPSF parameters...")
w.params.fixed = fixed_default
run_minimisation(w, method="newton", ftol=10 / (w.Nx * w.Ny), xtol=1e-4, niter=50, verbose=verbose, with_line_search=False)
else:
raise ValueError(f"mode argument must be '1D' or '2D'. Got {mode=}.")
if amplitude_priors_method == "psf1d":
w_reg = RegFitWorkspace(w, opt_reg=parameters.PSF_FIT_REG_PARAM, verbose=verbose)
w_reg.run_regularisation(Ndof=w.trace_r)
w.reg = np.copy(w_reg.opt_reg)
w.trace_r = np.trace(w_reg.resolution)
self.opt_reg = w_reg.opt_reg
w.simulate(*w.params.values)
if np.trace(w.amplitude_cov_matrix) < np.trace(w.amplitude_priors_cov_matrix):
self.my_logger.warning(
f"\n\tTrace of final covariance matrix ({np.trace(w.amplitude_cov_matrix)}) is "
f"below the trace of the prior covariance matrix "
f"({np.trace(w.amplitude_priors_cov_matrix)}). This is probably due to a very "
f"high regularisation parameter in case of a bad fit. Therefore the final "
f"covariance matrix is multiplied by the ratio of the traces and "
f"the amplitude parameters are very close the amplitude priors.")
r = np.trace(w.amplitude_priors_cov_matrix) / np.trace(w.amplitude_cov_matrix)
w.amplitude_cov_matrix *= r
w.amplitude_params_err = np.array([np.sqrt(w.amplitude_cov_matrix[x, x]) for x in range(self.Nx)])
w.set_mask(poly_params=w.poly_params)
# precise fit with sigma clipping
run_minimisation_sigma_clipping(w, method="newton", ftol=1 / (w.Nx * w.Ny), xtol=1e-6, niter=50,
sigma_clip=parameters.SPECTRACTOR_DECONVOLUTION_SIGMA_CLIP,
niter_clip=3, verbose=verbose, with_line_search=False)
# recompute and save params in class attributes
w.simulate(*w.params.values)
self.params.values = w.poly_params
self.cov_matrix = np.copy(w.amplitude_cov_matrix)
# add background crop to y_c
self.params.values[w.Nx + w.y_c_0_index] += w.bgd_width
# fill results
self.psf.apply_max_width_to_bounds(max_half_width=w.Ny + 2 * w.bgd_width)
self.profile_params = self.from_poly_params_to_profile_params(self.params.values, apply_bounds=True)
self.fill_table_with_profile_params(self.profile_params)
self.profile_params[:self.Nx, 0] = w.amplitude_params
self.profile_params[:self.Nx, 1] = np.arange(self.Nx)
self.from_profile_params_to_shape_params(self.profile_params)
# save plots
if parameters.LSST_SAVEFIGPATH: # pragma: no cover
w.plot_fit()
return w
[docs]
class ChromaticPSFFitWorkspace(FitWorkspace):
def __init__(self, chromatic_psf, data, data_errors, mode, bgd_model_func=None, mask=None, file_name="", analytical=True,
amplitude_priors_method="noprior", verbose=False, plot=False, live_fit=False, truth=None):
if mode not in ["1D", "2D"]:
raise ValueError(f"mode argument must be '1D' or '2D'. Got {mode=}.")
length = len(chromatic_psf.table)
params = FitParameters(np.copy(chromatic_psf.params.values[length:]),
labels=list(np.copy(chromatic_psf.params.labels[length:])),
axis_names=list(np.copy(chromatic_psf.params.axis_names[length:])), fixed=None,
truth=truth, filename=file_name)
for k, par in enumerate(params.labels):
if "x_c" in par or "saturation" in par:
params.fixed[k] = True
FitWorkspace.__init__(self, params, epsilon=1e-4, data=data, err=data_errors,
file_name=file_name, verbose=verbose, plot=plot, live_fit=live_fit)
self.my_logger = set_logger(self.__class__.__name__)
self.chromatic_psf = chromatic_psf
self.bgd_model_func = bgd_model_func
self.analytical = analytical
self.poly_params = np.copy(self.chromatic_psf.params.values)
self.y_c_0_index = -1
for k, par in enumerate(params.labels):
if par == "y_c_0":
self.y_c_0_index = k
break
# prepare the fit
self.Ny, self.Nx = self.data.shape
if self.Ny != self.chromatic_psf.Ny:
raise AttributeError(f"Data y shape {self.Ny} different from "
f"ChromaticPSF input Ny {self.chromatic_psf.Ny}.")
if self.Nx != self.chromatic_psf.Nx:
raise AttributeError(f"Data x shape {self.Nx} different from "
f"ChromaticPSF input Nx {self.chromatic_psf.Nx}.")
# prepare the background, data and errors
self.bgd = np.zeros_like(self.data)
if self.bgd_model_func is not None:
self.bgd = self.bgd_model_func(np.arange(self.Nx), np.arange(self.Ny))
self.data = self.data - self.bgd
self.bgd_std = float(np.std(np.random.poisson(np.abs(self.bgd))))
# crop spectrogram to fit faster
self.bgd_width = parameters.PIXWIDTH_BACKGROUND + parameters.PIXDIST_BACKGROUND - parameters.PIXWIDTH_SIGNAL
self.data = self.data[self.bgd_width:-self.bgd_width, :]
self.err = self.err[self.bgd_width:-self.bgd_width, :]
self.Ny, self.Nx = self.data.shape
self.data = self.data.astype("float32").ravel()
self.err = self.err.astype("float32").ravel()
self.pixels = np.arange(self.data.shape[0])
if mask is not None:
self.mask = list(np.where(mask[self.bgd_width:-self.bgd_width, :].astype(bool).ravel())[0])
else:
self.mask = []
if mode == "1D":
self.pixels = np.arange(self.Ny)
else:
yy, xx = np.mgrid[:self.Ny, :self.Nx]
self.pixels = np.asarray([xx, yy])
self.poly_params[self.Nx + self.y_c_0_index] -= self.bgd_width
self.profile_params = self.chromatic_psf.from_poly_params_to_profile_params(self.poly_params)
self.data_before_mask = np.copy(self.data)
self.mask_before_mask = list(np.copy(self.mask))
self.boundaries = None
self.psf_cube_sparse_indices = None
self.psf_cube_masked = None
self.M_sparse_indices = None
# update the bounds
self.chromatic_psf.psf.apply_max_width_to_bounds(max_half_width=self.Ny)
self.params.bounds = self.chromatic_psf.set_bounds()
# error matrix
# here image uncertainties are assumed to be uncorrelated
# (which is not exactly true in rotated images)
self.data_cov = sparse.diags(self.err * self.err, dtype="float32", format="dia")
self.W = sparse.diags(1 / (self.err * self.err), dtype="float32", format="dia")
self.sqrtW = self.W.sqrt()
self.W_before_mask = self.W.copy()
# design matrix
self.M = sparse.csr_matrix((self.Nx, self.data.size), dtype="float32")
self.M_dot_W_dot_M = sparse.csr_matrix((self.Nx, self.Nx), dtype="float32")
# prepare results
self.amplitude_params = np.zeros(self.Nx)
self.amplitude_params_err = np.zeros(self.Nx)
self.amplitude_cov_matrix = np.zeros((self.Nx, self.Nx))
# priors on amplitude parameters
self.amplitude_priors_list = ['noprior', 'positive', 'smooth', 'psf1d', 'fixed', 'keep']
self.amplitude_priors_method = amplitude_priors_method
self.fwhm_priors = np.copy(self.chromatic_psf.table['fwhm'])
self.reg = parameters.PSF_FIT_REG_PARAM
self.trace_r = self.Nx / np.median(self.fwhm_priors) # spectrophotometric uncertainty principle
self.Q = np.zeros((self.Nx, self.Nx))
self.Q_dot_A0 = np.zeros(self.Nx)
if amplitude_priors_method not in self.amplitude_priors_list:
raise ValueError(f"Unknown prior method for the amplitude fitting: {self.amplitude_priors_method}. "
f"Must be either {self.amplitude_priors_list}.")
# regularisation matrices
self.reg = parameters.PSF_FIT_REG_PARAM
if self.amplitude_priors_method == "psf1d":
self.amplitude_priors = np.copy(self.chromatic_psf.params.values[:self.Nx])
self.amplitude_priors_cov_matrix = np.copy(self.chromatic_psf.cov_matrix)
self.U = np.diag([1 / np.sqrt(self.amplitude_priors_cov_matrix[x, x]) for x in range(self.Nx)])
L = np.diag(-2 * np.ones(self.Nx)) + np.diag(np.ones(self.Nx), -1)[:-1, :-1] + np.diag(np.ones(self.Nx), 1)[:-1, :-1]
L.astype(float)
L[0, 0] = -1
L[-1, -1] = -1
self.L = L
self.Q = L.T @ self.U.T @ self.U @ L
self.Q_dot_A0 = self.Q @ self.amplitude_priors
if self.amplitude_priors_method == "fixed":
self.amplitude_priors = np.copy(self.chromatic_psf.params.values[:self.Nx])
self.set_mask()
[docs]
def plot_fit(self):
cmap_bwr = copy.copy(mpl.colormaps["bwr"])
cmap_bwr.set_bad(color='lightgrey')
cmap_viridis = copy.copy(mpl.colormaps["viridis"])
cmap_viridis.set_bad(color='lightgrey')
data = np.copy(self.data_before_mask)
model = np.copy(self.model)
err = np.copy(self.err)
if len(self.outliers) > 0 or len(self.mask) > 0:
bad_indices = np.array(list(self.get_bad_indices()) + list(self.mask)).astype(int)
data[bad_indices] = np.nan
data = data.reshape((self.Ny, self.Nx))
model = model.reshape((self.Ny, self.Nx))
err = err.reshape((self.Ny, self.Nx))
gs_kw = dict(width_ratios=[3, 0.15], height_ratios=[1, 1, 1, 1])
fig, ax = plt.subplots(nrows=4, ncols=2, figsize=(7, 7), gridspec_kw=gs_kw)
norm = np.nanmax(data)
plot_image_simple(ax[1, 0], data=model / norm, aspect='auto', cax=ax[1, 1], vmin=0, vmax=1,
units='1/max(data)', cmap=cmap_viridis)
ax[1, 0].set_title("Model", fontsize=10, loc='center', color='white', y=0.8)
plot_image_simple(ax[0, 0], data=data / norm, title='Data', aspect='auto',
cax=ax[0, 1], vmin=0, vmax=1, units='1/max(data)', cmap=cmap_viridis)
ax[0, 0].set_title('Data', fontsize=10, loc='center', color='white', y=0.8)
residuals = (data - model)
# residuals_err = self.spectrum.spectrogram_err / self.model
norm = err
residuals /= norm
std = float(np.nanstd(residuals))
plot_image_simple(ax[2, 0], data=residuals, vmin=-5 * std, vmax=5 * std, title='(Data-Model)/Err',
aspect='auto', cax=ax[2, 1], units='', cmap=cmap_bwr)
ax[2, 0].set_title('(Data-Model)/Err', fontsize=10, loc='center', color='black', y=0.8)
ax[2, 0].text(0.05, 0.05, f'mean={np.nanmean(residuals):.3f}\nstd={np.nanstd(residuals):.3f}',
horizontalalignment='left', verticalalignment='bottom',
color='black', transform=ax[2, 0].transAxes)
ax[0, 0].set_xticks(ax[2, 0].get_xticks()[1:-1])
ax[0, 1].get_yaxis().set_label_coords(3.5, 0.5)
ax[1, 1].get_yaxis().set_label_coords(3.5, 0.5)
ax[2, 1].get_yaxis().set_label_coords(3.5, 0.5)
ax[3, 1].remove()
ax[3, 0].errorbar(np.arange(self.Nx), np.nansum(data, axis=0), yerr=np.sqrt(np.nansum(err ** 2, axis=0)),
label='Data', fmt='k.', markersize=0.1)
model[np.isnan(data)] = np.nan # mask background values outside fitted region
ax[3, 0].plot(np.arange(self.Nx), np.nansum(model, axis=0), label='Model')
ax[3, 0].set_ylabel('Transverse sum')
ax[3, 0].set_xlabel(parameters.PLOT_XLABEL)
ax[3, 0].legend(fontsize=7)
ax[3, 0].set_xlim((0, data.shape[1]))
ax[3, 0].grid(True)
fig.tight_layout()
if self.live_fit: # pragma: no cover
plt.draw()
plt.pause(1e-8)
plt.close()
else: # pragma: no cover
if parameters.DISPLAY and self.verbose:
plt.show()
if parameters.LSST_SAVEFIGPATH: # pragma: no cover
fig.savefig(os.path.join(parameters.LSST_SAVEFIGPATH,
f'fit_chromatic_psf_best_fit_{self.amplitude_priors_method}.pdf'),
dpi=100, bbox_inches='tight')
[docs]
def set_mask(self, poly_params=None):
if poly_params is None:
poly_params = self.poly_params
profile_params = self.chromatic_psf.from_poly_params_to_profile_params(poly_params, apply_bounds=True)
self.chromatic_psf.from_profile_params_to_shape_params(profile_params)
if self.pixels.ndim == 1:
psf_cube_masked = np.zeros((len(profile_params), self.Ny, self.Nx), dtype=bool)
for x in range(psf_cube_masked.shape[0]):
psf_cube_masked[x, :, x] = 1
else:
psf_cube_masked = self.chromatic_psf.build_psf_cube_masked(self.pixels, profile_params,
fwhmx_clip=3 * parameters.PSF_FWHM_CLIP,
fwhmy_clip=parameters.PSF_FWHM_CLIP)
self.psf_cube_masked = self.chromatic_psf.convolve_psf_cube_masked(psf_cube_masked)
self.boundaries, self.psf_cube_masked = self.chromatic_psf.set_rectangular_boundaries(self.psf_cube_masked)
self.psf_cube_sparse_indices, self.M_sparse_indices = self.chromatic_psf.get_sparse_indices(self.boundaries)
# mask = np.sum(self.psf_cube_masked.reshape(psf_cube_masked.shape[0], psf_cube_masked[0].size), axis=0) == 0
# cumulate the boolean values as int
weight_mask = np.sum(self.psf_cube_masked, axis=0)
# look for indices with maximum weight per column (all sheets of the psf cube have contributed)
res = np.max(weight_mask, axis=0)[np.newaxis,:] * np.ones((weight_mask.shape[0],1))
# keep only the pixels where all psf_cube sheets have contributed per column
mask = (weight_mask != res).ravel()
self.mask = list(self.mask_before_mask) + list(np.where(mask)[0])
self.mask = list(set(self.mask))
W = np.copy(self.W_before_mask.data.ravel())
W[self.mask] = 0
self.W = sparse.diags(W, dtype="float32", format="dia")
self.sqrtW = self.W.sqrt()
[docs]
def simulate(self, *shape_params):
r"""
Compute a ChromaticPSF2D model given PSF shape parameters and minimizing
amplitude parameters using a spectrogram data array.
The ChromaticPSF2D model :math:`\vec{m}(\vec{x},\vec{p})` can be written as
.. math ::
:label: chromaticpsf2d
\vec{m}(\vec{x},\vec{p}) = \sum_{i=0}^{N_x} A_i \phi\left(\vec{x},\vec{p}_i\right)
with :math:`\vec{x}` the 2D array of the pixel coordinates, :math:`\vec{A}` the amplitude parameter array
along the x axis of the spectrogram, :math:`\phi\left(\vec{x},\vec{p}_i\right)` the 2D PSF kernel whose integral
is normalised to one parametrized with the :math:`\vec{p}_i` non-linear parameter array. If the :math:`\vec{x}`
2D array is flatten in 1D, equation :eq:`chromaticpsf2d` is
.. math ::
:label: chromaticpsf2d_matrix
:nowrap:
\begin{align}
\vec{m}(\vec{x},\vec{p}) & = \mathbf{M}\left(\vec{x},\vec{p}\right) \mathbf{A} \\
\mathbf{M}\left(\vec{x},\vec{p}\right) & = \left(\begin{array}{cccc}
\phi\left(\vec{x}_1,\vec{p}_1\right) & \phi\left(\vec{x}_2,\vec{p}_1\right) & ...
& \phi\left(\vec{x}_{N_x},\vec{p}_1\right) \\
... & ... & ... & ...\\
\phi\left(\vec{x}_1,\vec{p}_{N_x}\right) & \phi\left(\vec{x}_2,\vec{p}_{N_x}\right) & ...
& \phi\left(\vec{x}_{N_x},\vec{p}_{N_x}\right) \\
\end{array}\right)
\end{align}
with :math:`\mathbf{M}` the design matrix.
The goal of this function is to perform a minimisation of the amplitude vector :math:`\mathbf{A}` given
a set of non-linear parameters :math:`\mathbf{p}` and a spectrogram data array :math:`mathbf{y}` modelise as
.. math:: \mathbf{y} = \mathbf{m}(\vec{x},\vec{p}) + \vec{\epsilon}
with :math:`\vec{\epsilon}` a random noise vector. The :math:`\chi^2` function to minimise is
.. math::
:label: chromaticspsf2d_chi2
\chi^2(\mathbf{A})= \left(\mathbf{y} - \mathbf{M}\left(\vec{x},\vec{p}\right) \mathbf{A}\right)^T \mathbf{W}
\left(\mathbf{y} - \mathbf{M}\left(\vec{x},\vec{p}\right) \mathbf{A} \right)
with :math:`\mathbf{W}` the weight matrix, inverse of the covariance matrix. In our case this matrix is diagonal
as the pixels are considered all independent. The minimum of equation :eq:`chromaticspsf2d_chi2` is reached for
the set of amplitude parameters :math:`\hat{\mathbf{A}}` given by
.. math::
\hat{\mathbf{A}} = (\mathbf{M}^T \mathbf{W} \mathbf{M})^{-1} \mathbf{M}^T \mathbf{W} \mathbf{y}
The error matrix on the :math:`\hat{\mathbf{A}}` coefficient is simply
:math:`(\mathbf{M}^T \mathbf{W} \mathbf{M})^{-1}`.
Parameters
----------
shape_params: array_like
PSF shape polynomial parameter array.
Examples
--------
Set the parameters:
>>> parameters.PIXDIST_BACKGROUND = 40
>>> parameters.PIXWIDTH_BACKGROUND = 10
>>> parameters.PIXWIDTH_SIGNAL = 30
Build a mock spectrogram without random Poisson noise:
>>> psf = Moffat(clip=False)
>>> s0 = ChromaticPSF(psf, Nx=120, Ny=100, deg=2, saturation=100000)
>>> params = s0.generate_test_poly_params()
>>> params[:s0.Nx] *= 10
>>> s0.params.values = params
>>> saturation = params[-1]
>>> data = s0.evaluate(s0.set_pixels(mode="2D"), params)
>>> bgd = 10*np.ones_like(data)
>>> data += bgd
>>> data_errors = np.sqrt(data+1)
Extract the background:
>>> bgd_model_func, _, _ = extract_spectrogram_background_sextractor(data, data_errors, ws=[30,50])
Estimate the first guess values:
>>> s = ChromaticPSF(psf, Nx=120, Ny=100, deg=2, saturation=saturation)
>>> s.fit_transverse_PSF1D_profile(data, data_errors, w=20, ws=[30,50],
... pixel_step=1, bgd_model_func=bgd_model_func, saturation=saturation, live_fit=False)
>>> s.plot_summary(truth=s0)
1D case.
Simulate the data with fixed amplitude priors:
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "1D", bgd_model_func=bgd_model_func,
... amplitude_priors_method="fixed", verbose=True)
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> w.plot_fit()
.. doctest::
:hide:
>>> assert mod is not None
Fit the amplitude of data without any prior:
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "1D", bgd_model_func=bgd_model_func, verbose=True,
... amplitude_priors_method="noprior")
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> w.plot_fit()
.. doctest::
:hide:
>>> assert mod is not None
>>> assert np.mean(np.abs(mod-w.data)/w.err) < 1
Fit the amplitude of data smoothing the result with a window of size 10 pixels:
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "1D", bgd_model_func=bgd_model_func, verbose=True,
... amplitude_priors_method="smooth")
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> w.plot_fit()
.. doctest::
:hide:
>>> assert mod is not None
>>> assert np.mean(np.abs(mod-w.data)/w.err) < 1
Fit the amplitude of data using the transverse PSF1D fit as a prior and with a
Tikhonov regularisation parameter set by parameters.PSF_FIT_REG_PARAM:
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "1D", bgd_model_func=bgd_model_func, verbose=True,
... amplitude_priors_method="psf1d")
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> w.plot_fit()
.. doctest::
:hide:
>>> assert mod is not None
>>> assert np.mean(np.abs(mod-w.data)/w.err) < 1
2D case
Simulate the data with fixed amplitude priors:
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "2D", bgd_model_func=bgd_model_func,
... amplitude_priors_method="fixed", verbose=True)
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> w.plot_fit()
.. doctest::
:hide:
>>> assert mod is not None
Simulate the data with a Tikhonov prior on amplitude parameters:
>>> parameters.PSF_FIT_REG_PARAM = 0.002
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "2D", bgd_model_func=bgd_model_func,
... amplitude_priors_method="psf1d", verbose=True)
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> w.plot_fit()
.. doctest::
:hide:
>>> assert mod is not None
"""
# linear regression for the amplitude parameters
# prepare the vectors
poly_params = np.concatenate([np.ones(self.Nx), shape_params])
poly_params[self.Nx + self.y_c_0_index] -= self.bgd_width
profile_params = self.chromatic_psf.from_poly_params_to_profile_params(poly_params, apply_bounds=True)
profile_params[:self.Nx, 0] = 1
profile_params[:self.Nx, 1] = np.arange(self.Nx)
# profile_params[:self.Nx, 2] -= self.bgd_width
if self.amplitude_priors_method != "fixed":
M = self.chromatic_psf.build_sparse_M(self.pixels, profile_params, dtype="float32",
M_sparse_indices=self.M_sparse_indices, boundaries=self.boundaries)
M_dot_W = M.T @ self.sqrtW
W_dot_data = self.W @ self.data
# Compute the minimizing amplitudes
if sparse_dot_mkl is None:
M_dot_W_dot_M = M_dot_W @ M_dot_W.T
else:
tri = sparse_dot_mkl.gram_matrix_mkl(M_dot_W, transpose=True)
dia = sparse.csr_matrix((tri.diagonal(), (np.arange(tri.shape[0]), np.arange(tri.shape[0]))),
shape=tri.shape, dtype="float32")
M_dot_W_dot_M = tri + tri.T - dia
if self.amplitude_priors_method != "psf1d":
if self.amplitude_priors_method == "keep":
amplitude_params = np.copy(self.amplitude_params)
else:
amplitude_params = cholesky_solve(M_dot_W_dot_M.toarray(), M.T @ W_dot_data)
if self.amplitude_priors_method == "positive":
amplitude_params[amplitude_params < 0] = 0
elif self.amplitude_priors_method == "smooth":
null_indices = np.where(amplitude_params < 0)[0]
for index in null_indices:
right = amplitude_params[index]
for i in range(index, min(index + 10, self.Nx)):
right = amplitude_params[i]
if i not in null_indices:
break
left = amplitude_params[index]
for i in range(index, max(0, index - 10), -1):
left = amplitude_params[i]
if i not in null_indices:
break
amplitude_params[index] = 0.5 * (right + left)
elif self.amplitude_priors_method == "noprior":
pass
else:
M_dot_W_dot_M_plus_Q = M_dot_W_dot_M + self.reg * self.Q
amplitude_params = cholesky_solve(M_dot_W_dot_M_plus_Q, M.T @ W_dot_data + np.float32(self.reg) * self.Q_dot_A0)
amplitude_params = np.asarray(amplitude_params).reshape(-1)
self.M = M
self.M_dot_W_dot_M = M_dot_W_dot_M
self.model = M @ amplitude_params
else:
amplitude_params = np.copy(self.amplitude_priors)
self.amplitude_params = np.copy(amplitude_params)
poly_params[:self.Nx] = amplitude_params
if self.amplitude_priors_method == "fixed":
self.model = self.chromatic_psf.evaluate(self.pixels, poly_params)
# self.chromatic_psf.params.values is updated in evaluate(): reset to original values
self.chromatic_psf.params.values[self.Nx + self.y_c_0_index] += self.bgd_width
self.poly_params = np.copy(poly_params)
self.profile_params = np.copy(profile_params)
self.model_err = np.zeros_like(self.model)
return self.pixels, self.model, self.model_err
[docs]
def amplitude_covariance(self):
r"""
Compute the covariance matrix for the amplitude parameters.
The error matrix on the :math:`\hat{\mathbf{A}}` coefficient is simply
:math:`(\mathbf{M}^T \mathbf{W} \mathbf{M})^{-1}`.
Examples
--------
Set the parameters:
>>> from spectractor.tools import plot_covariance_matrix
>>> parameters.PIXDIST_BACKGROUND = 40
>>> parameters.PIXWIDTH_BACKGROUND = 10
>>> parameters.PIXWIDTH_SIGNAL = 30
Build a mock spectrogram without random Poisson noise:
>>> psf = Moffat(clip=False)
>>> s0 = ChromaticPSF(psf, Nx=120, Ny=100, deg=2, saturation=100000)
>>> params = s0.generate_test_poly_params()
>>> params[:s0.Nx] *= 10
>>> s0.params.values = params
>>> saturation = params[-1]
>>> data = s0.evaluate(s0.set_pixels(mode="2D"), params)
>>> bgd = 10*np.ones_like(data)
>>> data += bgd
>>> data_errors = np.sqrt(data+1)
Extract the background:
>>> bgd_model_func, _, _ = extract_spectrogram_background_sextractor(data, data_errors, ws=[30,50])
Estimate the first guess values:
>>> s = ChromaticPSF(psf, Nx=120, Ny=100, deg=2, saturation=saturation)
>>> s.fit_transverse_PSF1D_profile(data, data_errors, w=20, ws=[30,50],
... pixel_step=1, bgd_model_func=bgd_model_func, saturation=saturation, live_fit=False)
>>> s.plot_summary(truth=s0)
1D case.
Simulate the data with fixed amplitude priors:
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "1D", bgd_model_func=bgd_model_func,
... amplitude_priors_method="fixed", verbose=True)
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> cov = w.amplitude_covariance()
>>> plot_covariance_matrix(cov)
.. doctest::
:hide:
>>> assert mod is not None
Fit the amplitude of data without any prior:
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "1D", bgd_model_func=bgd_model_func, verbose=True,
... amplitude_priors_method="noprior")
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> cov = w.amplitude_covariance()
>>> plot_covariance_matrix(cov)
.. doctest::
:hide:
>>> assert mod is not None
>>> assert np.mean(np.abs(mod-w.data)/w.err) < 1
Fit the amplitude of data smoothing the result with a window of size 10 pixels:
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "1D", bgd_model_func=bgd_model_func, verbose=True,
... amplitude_priors_method="smooth")
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> cov = w.amplitude_covariance()
>>> plot_covariance_matrix(cov)
.. doctest::
:hide:
>>> assert mod is not None
>>> assert np.mean(np.abs(mod-w.data)/w.err) < 1
Fit the amplitude of data using the transverse PSF1D fit as a prior and with a
Tikhonov regularisation parameter set by parameters.PSF_FIT_REG_PARAM:
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "1D", bgd_model_func=bgd_model_func, verbose=True,
... amplitude_priors_method="psf1d")
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> cov = w.amplitude_covariance()
>>> plot_covariance_matrix(cov)
.. doctest::
:hide:
>>> assert mod is not None
>>> assert np.mean(np.abs(mod-w.data)/w.err) < 1
2D case
Simulate the data with fixed amplitude priors:
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "2D", bgd_model_func=bgd_model_func,
... amplitude_priors_method="fixed", verbose=True)
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> cov = w.amplitude_covariance()
>>> plot_covariance_matrix(cov)
.. doctest::
:hide:
>>> assert mod is not None
Simulate the data with a Tikhonov prior on amplitude parameters:
>>> parameters.PSF_FIT_REG_PARAM = 0.002
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "2D", bgd_model_func=bgd_model_func,
... amplitude_priors_method="psf1d", verbose=True)
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> cov = w.amplitude_covariance()
>>> plot_covariance_matrix(cov)
.. doctest::
:hide:
>>> assert mod is not None
"""
if self.amplitude_priors_method != "fixed":
if self.amplitude_priors_method != "psf1d":
if self.amplitude_priors_method == "keep":
cov_matrix = np.copy(self.amplitude_cov_matrix)
else:
cov_matrix = np.linalg.inv(self.M_dot_W_dot_M.toarray())
else:
M_dot_W_dot_M_plus_Q = self.M_dot_W_dot_M + self.reg * self.Q
cov_matrix = np.linalg.inv(M_dot_W_dot_M_plus_Q)
else:
err2 = np.copy(self.amplitude_priors)
err2[err2 <= 0] = np.min(np.abs(err2[err2 > 0]))
cov_matrix = np.diag(err2)
# TODO: propagate and marginalize over the shape parameter uncertainties ?
self.amplitude_params_err = np.array([np.sqrt(cov_matrix[x, x]) for x in range(self.Nx)])
self.amplitude_cov_matrix = np.copy(cov_matrix)
return self.amplitude_cov_matrix
[docs]
def amplitude_derivatives(self):
r"""
Compute analytically the amplitude vector \hat{\mathbf{A}} derivatives with respect to the PSF parameters.
With
.. math::
\hat{\mathbf{A}} = \hat{\mathbf{C}} \cdot \mathbf{M}^T \mathbf{W} \mathbf{y}
\hat{\mathbf{C}} = (\mathbf{M}^T \mathbf{W} \mathbf{M})^{-1}
derivatives are
.. math::
\frac{\partial \hat{\mathbf{A}}}{\partial \theta} = \frac{\partial \hat{\mathbf{C}}}{\partial \theta} \cdot \mathbf{M}^T \mathbf{W} \mathbf{y} + \hat{\mathbf{C}} \cdot \frac{\partial \mathbf{M}^T \mathbf{W} \mathbf{y}}{\partial \theta}
\frac{\partial \hat{\mathbf{C}}}{\partial \theta} = - \hat{\mathbf{C}} \cdot \frac{\partial \mathbf{M}^T \mathbf{W} \mathbf{M}}{\partial \theta} \cdot \hat{\mathbf{C}}
\frac{\partial \mathbf{M}^T \mathbf{W} \mathbf{M}}{\partial \theta} = 2 \frac{\partial \mathbf{M}^T}{\partial \theta} \mathbf{W} \mathbf{M}
\frac{\partial \mathbf{M}^T \mathbf{W} \mathbf{y}}{\partial \theta} = \frac{\partial \mathbf{M}^T}{\partial \theta} \mathbf{W} \mathbf{y}
If amplitude vector is regularized via Tikhonov regularisation, regularisation term is added appropriately.
Returns
-------
dA_dtheta: list
List of amplitude vector derivatives.
Examples
--------
Set the parameters:
>>> parameters.PIXDIST_BACKGROUND = 40
>>> parameters.PIXWIDTH_BACKGROUND = 10
>>> parameters.PIXWIDTH_SIGNAL = 30
Build a mock spectrogram without random Poisson noise:
>>> psf = Moffat(clip=False)
>>> s0 = ChromaticPSF(psf, Nx=120, Ny=100, deg=2, saturation=100000)
>>> params = s0.generate_test_poly_params()
>>> params[:s0.Nx] *= 10
>>> s0.params.values = params
>>> saturation = params[-1]
>>> data = s0.evaluate(s0.set_pixels(mode="2D"), params)
>>> bgd = 10*np.ones_like(data)
>>> data += bgd
>>> data_errors = np.sqrt(data+1)
Extract the background:
>>> bgd_model_func, _, _ = extract_spectrogram_background_sextractor(data, data_errors, ws=[30,50])
Estimate the first guess values:
>>> s = ChromaticPSF(psf, Nx=120, Ny=100, deg=2, saturation=saturation)
>>> s.fit_transverse_PSF1D_profile(data, data_errors, w=20, ws=[30,50],
... pixel_step=1, bgd_model_func=bgd_model_func, saturation=saturation, live_fit=False)
>>> s.plot_summary(truth=s0)
Simulate the data with a Tikhonov prior on amplitude parameters:
>>> parameters.PSF_FIT_REG_PARAM = 0.002
>>> s.params.values = s.from_table_to_poly_params()
>>> w = ChromaticPSFFitWorkspace(s, data, data_errors, "2D", bgd_model_func=bgd_model_func,
... amplitude_priors_method="psf1d", verbose=True)
>>> y, mod, mod_err = w.simulate(*s.params.values[s.Nx:])
>>> w.plot_fit()
.. doctest::
:hide:
>>> assert mod is not None
Get the derivatives:
>>> dA_dtheta = w.amplitude_derivatives()
>>> print(np.array(dA_dtheta).shape, w.amplitude_params.shape)
(13, 120) (120,)
"""
# compute matrices without derivatives
WM = self.W @ self.M
WD = self.W @ self.data
MWD = self.M.T @ WD
if self.amplitude_priors_method == "psf1d":
MWD += np.float32(self.reg) * self.Q_dot_A0
# compute partial derivatives of model matrix M
dM_dtheta = self.chromatic_psf.build_sparse_dM(self.pixels, profile_params=self.profile_params,
M_sparse_indices=self.M_sparse_indices,
boundaries=self.boundaries, dtype="float32")
# compute partial derivatives of amplitude vector A
nparams = len(dM_dtheta)
dMWD_dtheta = [dM_dtheta[ip].T @ WD for ip in range(nparams)]
dMWM_dtheta = [2 * dM_dtheta[ip].T @ WM for ip in range(nparams)]
dcov_dtheta = [-self.amplitude_cov_matrix @ (dMWM_dtheta[ip] @ self.amplitude_cov_matrix) for ip in range(nparams)]
dA_dtheta = [self.amplitude_cov_matrix @ dMWD_dtheta[ip] + dcov_dtheta[ip] @ MWD for ip in range(nparams)]
return dA_dtheta
[docs]
def jacobian(self, params, model_input=None):
r"""Generic function to compute the Jacobian matrix of a model, linear parameters being fixed (see Notes),
with analytical or numerical derivatives. Analytical derivatives are performed if `self.analytical` is True.
Let's write :math:`\theta` the non-linear model parameters. If the model is written as:
.. math::
\mathbf{I} = \mathbf{M}(\theta) \cdot \hat{\mathbf{A}}(\theta),
this jacobian function returns:
.. math::
\frac{\partial \mathbf{I}}{\partial \theta} = \frac{\partial \mathbf{M}}{\partial \theta} \cdot \hat{\mathbf{A}}.
Notes
-----
The gradient descent is performed on the non-linear parameters :math:`\theta` (PSF shape and position). Linear parameters :math:`\mathbf{A}` (amplitudes) are computed on the fly.
Therefore, :math:`\chi^2` is a function of :math:`\theta` only
.. math ::
\chi^2(\theta) = \chi'^2(\theta, \hat{\mathbf{A}}
whose partial derivatives on :math:`\theta` for gradient descent are:
.. math ::
\frac{\partial \chi^2}{\partial \theta} = \left.\left(\frac{\partial \chi'^2}{\partial \theta} + \frac{\partial \chi'^2}{\partial \mathbf{A}} \frac{\partial \mathbf{A}}{\partial \theta}\right)\right\vert_{\mathbf{A} = \hat{\mathbf{A}}}
By definition, :math:`\left.\partial \chi'^2/\partial \mathbf{A}\right\vert_{\mathbf{A} = \hat{\mathbf{A}}}=0` then :math:`\chi^2` partial derivatives must be performed with fixed :math:`\mathbf{A} = \hat{\mathbf{A}}`
for gradient descent. `self.amplitude_priors_method` is temporarily switched to "keep" in `self.jacobian()` to use previously computed :math:`\hat{\mathbf{A}}` solution.
Parameters
----------
params: array_like
The array of model parameters.
model_input: array_like, optional
A model input as a list with (x, model, model_err) to avoid an additional call to simulate().
Returns
-------
J: np.array
The Jacobian matrix.
"""
method = copy.copy(self.amplitude_priors_method)
self.amplitude_priors_method = "keep"
if not self.analytical:
J = super().jacobian(params, epsilon=self.epsilon, model_input=model_input)
else:
profile_params = np.copy(self.profile_params)
amplitude_params = np.copy(self.amplitude_params)
profile_params[:, 0] *= amplitude_params # np.ones_like(amplitude_params)
J = self.chromatic_psf.build_psf_jacobian(self.pixels, profile_params=profile_params,
psf_cube_sparse_indices=self.psf_cube_sparse_indices,
boundaries=self.boundaries, dtype="float32")
self.amplitude_priors_method = method
return J
def __post_fit__(self):
self.amplitude_covariance()
if __name__ == "__main__":
import doctest
doctest.testmod()