from scipy import optimize
import time
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import numpy as np
import scipy
import os
import json
from dataclasses import dataclass
from typing import Optional, Union
from spectractor import parameters
from spectractor.config import set_logger
from spectractor.tools import (formatting_numbers, compute_correlation_matrix, plot_correlation_matrix_simple,
NumpyArrayEncoder)
from lsst.utils.threads import disable_implicit_threading
disable_implicit_threading()
[docs]
@dataclass()
class FitParameter:
"""Container for a parameter to fit on data with FitWorkspace.
Attributes
----------
value: float
Array containing the parameter values.
label: str
Parameter labels for screen print.
axis_name: str
Parameter labels for plot print.
bounds: list
2-element list giving the (lower, upper) bounds for the parameter.
fixed: bool
Boolean indicating whether the parameter is fixed.
err: float
Uncertainty on parameter value.
truth: float, optional
Truth value.
Examples
--------
>>> from spectractor.fit.fitter import FitParameter
>>> p = FitParameter(value=1, label="x", axis_name="$x$", bounds=[0, 1], fixed=False, err=0.1, truth=1)
>>> p
x: 1.0 +/- 0.1 (truth=1)
>>> p = FitParameter(value=0.01234, label="t", axis_name="$t$", bounds=[-1, 1], fixed=False, err=0.02)
>>> p
t: 0.01 +/- 0.02
>>> p = FitParameter(value=1, label="x", axis_name="$x$", bounds=[0, 1], fixed=True, err=0)
>>> p
x: 1 (fixed)
"""
value: float
label: str
axis_name: str
bounds: list
fixed: bool
err: float
truth: Optional[float] = None
def __repr__(self):
"""Print the parameter on screen.
"""
txt = ""
if self.fixed:
txt += f"{self.label}: {self.value} (fixed)"
else:
if self.err != 0:
val, err, _ = formatting_numbers(self.value, self.err, self.err)
txt += f"{self.label}: {val} +/- {err}"
else:
txt += f"{self.label}: {self.value} +/- {self.err}"
if self.truth:
txt += f" (truth={self.truth})"
return txt
[docs]
@dataclass()
class FitParameters:
"""Container for the parameters to fit on data with FitWorkspace.
Attributes
----------
values: np.ndarray
Array containing the parameter values.
labels: list, optional
List of the parameter labels for screen print.
If None, make a default list with parameters labelled par (default: None).
axis_names: list, optional
List of the parameter labels for plot print.
If None, make a default list with parameters labelled par (default: None).
bounds: list, optional
List of 2-element list giving the (lower, upper) bounds for every parameter.
If None, make a default list with np.infinity boundaries (default: None).
fixed: list, optional
List of boolean: True to fix a parameter, False to let it free.
If None, make a default list with False values: all parameters are free (default: None).
truth: np.ndarray, optional
Array of truth parameters (default: None).
filename: str, optional
File name associated to the fitted parameters (usually _spectrum.fits file name) (default: '').
Examples
--------
>>> from spectractor.fit.fitter import FitParameters
>>> params = FitParameters(values=[1, 1, 1, 1, 1])
>>> params.ndim
5
>>> params.values
array([1., 1., 1., 1., 1.])
>>> params.labels
['par0', 'par1', 'par2', 'par3', 'par4']
>>> params.bounds
[[-inf, inf], [-inf, inf], [-inf, inf], [-inf, inf], [-inf, inf]]
"""
values: Union[np.ndarray, list]
labels: Optional[list] = None
axis_names: Optional[list] = None
bounds: Optional[list] = None
fixed: Optional[list] = None
truth: Optional[list] = None
filename: Optional[str] = ""
extra: Optional[dict] = None
def __post_init__(self):
if type(self.values) is list:
self.values = np.array(self.values, dtype=float)
self.values = np.asarray(self.values, dtype=float)
if not self.labels:
self.labels = [f"par{k}" for k in range(self.ndim)]
else:
if len(self.labels) != self.ndim:
raise ValueError("labels argument must have same size as values argument.")
if not self.axis_names:
self.axis_names = [f"$p_{k}$" for k in range(self.ndim)]
else:
if len(self.axis_names) != self.ndim:
raise ValueError("labels argument must have same size as values argument.")
if self.bounds is None:
self.bounds = [[-np.inf, np.inf]] * self.ndim
else:
if np.array(self.bounds).shape != (self.ndim, 2):
raise ValueError(f"bounds argument size {np.array(self.bounds).shape} must be same as values argument {(self.ndim, 2)}.")
if not self.fixed:
self.fixed = [False] * self.ndim
else:
if len(list(self.fixed)) != self.ndim:
raise ValueError("fixed argument must have same size as values argument.")
self.cov = np.zeros((self.nfree, self.nfree))
def __getitem__(self, label):
"""Get parameter value given its label.
Parameters
----------
label: str
The parameter label.
Returns
-------
value: float
The parameter value.
Examples
--------
>>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False])
>>> params["z"]
3.0
"""
index = self.get_index(label=label)
return self.values[index]
def __len__(self):
"""Length of the parameter array, equivalent to self.ndim.
Examples
--------
>>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False])
>>> len(params)
4
>>> len(params) == params.ndim
True
"""
return len(self.values)
def __eq__(self, other):
"""Test parameter instances equality.
Examples
--------
>>> p1 = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False])
>>> p2 = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False])
>>> p1 == p2
True
>>> p3 = FitParameters(values=[1, 2, 3, 4.1], labels=["x", "y", "z", "t"], fixed=[True, False, True, False])
>>> p1 == p3
False
>>> p4 = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[False, False, True, False])
>>> p1 == p4
False
"""
if not isinstance(other, FitParameters):
return NotImplemented
out = True
for key in self.__dict__.keys():
if isinstance(getattr(self, key), np.ndarray):
if len(getattr(self, key).flatten()) == len(getattr(other, key).flatten()):
out *= np.all(np.equal(getattr(self, key).flatten(), getattr(other, key).flatten()))
else:
# if fixed parameters are not equal, covariance matrices have not the same shape
# and multiplication above is forbidden
out = False
else:
out *= getattr(self, key) == getattr(other, key)
return out
def __repr__(self):
"""Print the best fitting parameters on screen.
Labels are from self.labels.
Examples
--------
>>> parameters.VERBOSE = True
>>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False])
>>> params.cov = np.array([[1, -0.5], [-0.5, 4]])
>>> params
x: 1.0 (fixed=True)
y: 2 +1 -1 bounds=[-inf, inf]
z: 3.0 (fixed=True)
t: 4 +2 -2 bounds=[-inf, inf]
"""
return self.print_parameters_summary()
[docs]
def get_index(self, label):
"""Get parameter index given its label.
Parameters
----------
label: str
The parameter label.
Returns
-------
index: int
The parameter index.
Examples
--------
>>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False])
>>> params.get_index("z")
2
"""
if label in self.labels:
index = self.labels.index(label)
return index
else:
raise KeyError(f"{label=} not in FitParameters.labels ({self.labels=}).")
[docs]
def set(self, label, value):
"""Set value parameter given its label.
Parameters
----------
label: str
The parameter label.
value: float
The new parameter value.
Examples
--------
>>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False])
>>> params["z"]
3.0
>>> params.set("z", 0)
>>> params["z"]
0.0
"""
key = self.get_index(label)
self.values[key] = value
@property
def rho(self):
"""Correlation matrix computed from the covariance matrix
Returns
-------
rho: np.ndarray
The correlation matrix array.
Examples
--------
>>> from spectractor.fit.fitter import FitParameters
>>> params = FitParameters(values=[1, 1, 1], axis_names=["x", "y", "z"])
>>> params.cov = np.array([[2,-0.5,0],[-0.5,2,-1],[0,-1,2]])
>>> params.rho
array([[ 1. , -0.25, 0. ],
[-0.25, 1. , -0.5 ],
[ 0. , -0.5 , 1. ]])
"""
return compute_correlation_matrix(self.cov)
@property
def err(self):
"""Uncertainties on fitted parameters, as the square root of the covariance matrix diagonal.
Returns
-------
err: np.ndarray
The uncertainty array.
Examples
-------
>>> from spectractor.fit.fitter import FitParameters
>>> import numpy as np
>>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=[False, True, False, True, False])
>>> params.cov = np.array([[1,-0.5,0],[-0.5,4,-1],[0,-1,9]])
>>> params.err
array([1., 0., 2., 0., 3.])
"""
err = np.zeros_like(self.values, dtype=float)
if np.sum(self.fixed) != len(self.fixed):
err[~np.asarray(self.fixed)] = np.sqrt(np.diag(self.cov))
return err
@property
def ndim(self):
"""Number of parameters.
Returns
-------
ndim: int
Examples
--------
>>> from spectractor.fit.fitter import FitParameters
>>> params = FitParameters(values=[1, 1, 1, 1, 1])
>>> params.ndim
5
"""
return len(self.values)
@property
def nfree(self):
"""Number of free parameters.
Returns
-------
nfree: int
Examples
--------
>>> from spectractor.fit.fitter import FitParameters
>>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=[True, False, True, False, True])
>>> params.nfree
2
"""
return len(self.get_free_parameters())
@property
def nfixed(self):
"""Number of fixed parameters.
Returns
-------
nfixed: int
Examples
--------
>>> from spectractor.fit.fitter import FitParameters
>>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=[True, False, True, False, True])
>>> params.nfixed
3
"""
return len(self.get_fixed_parameters())
[docs]
def get_free_parameters(self):
"""Return indices array of free parameters.
Examples
--------
>>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=None)
>>> params.fixed
[False, False, False, False, False]
>>> params.get_free_parameters()
array([0, 1, 2, 3, 4])
>>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=[True, False, True, False, True])
>>> params.fixed
[True, False, True, False, True]
>>> params.get_free_parameters()
array([1, 3])
"""
return np.array(np.where(np.array(self.fixed).astype(int) == 0)[0])
[docs]
def get_fixed_parameters(self):
"""Return indices array of fixed parameters.
Examples
--------
>>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=None)
>>> params.fixed
[False, False, False, False, False]
>>> params.get_fixed_parameters()
array([], dtype=int64)
>>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=[True, False, True, False, True])
>>> params.fixed
[True, False, True, False, True]
>>> params.get_fixed_parameters()
array([0, 2, 4])
"""
return np.array(np.where(np.array(self.fixed).astype(int) == 1)[0])
[docs]
def print_parameters_summary(self):
"""Build a string with the best fitting parameters to display on screen.
Labels are from self.labels.
Returns
-------
txt: str
The printed text.
Examples
--------
>>> parameters.VERBOSE = True
>>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False])
>>> params.cov = np.array([[1, -0.5], [-0.5, 4]])
>>> _ = params.print_parameters_summary()
"""
txt = ""
ifree = self.get_free_parameters()
icov = 0
for ip in range(self.ndim):
if ip in ifree and np.abs(self.cov[icov, icov]) != 0.:
txt += "%s: %s +%s -%s" % formatting_numbers(self.values[ip], np.sqrt(np.abs(self.cov[icov, icov])),
np.sqrt(np.abs(self.cov[icov, icov])),
label=self.labels[ip])
txt += f" bounds={self.bounds[ip]}"
icov += 1
else:
txt += f"{self.labels[ip]}: {self.values[ip]} (fixed={self.fixed[ip]})"
if ip != self.ndim-1:
txt += "\n"
return txt
[docs]
def get_parameter(self, key):
"""Return a FitParameter instance. key argument can be the parameter label or its index value.
Parameters
----------
key: str, int
The parameter key.
Returns
-------
param: FitParameter
A FitParameter instance containing all information about a parameter.
Examples
--------
>>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False])
>>> params.cov = np.array([[1, -0.5], [-0.5, 4]])
>>> p3 = params.get_parameter("t")
>>> p3
t: 4 +/- 2
>>> p0 = params.get_parameter(0)
>>> p0
x: 1.0 (fixed)
"""
if type(key) is str:
index = self.get_index(key)
else:
index = key
if self.truth is None:
truth = None
else:
truth = self.truth[index]
p = FitParameter(value=self.values[index], label=self.labels[index],
axis_name=self.axis_names[index], bounds=self.bounds[index],
fixed=self.fixed[index], err=self.err[index], truth=truth)
return p
[docs]
def plot_correlation_matrix(self, live_fit=False):
"""Compute and plot a correlation matrix.
Save the plot if parameters.SAVE is True. The output file name is build from self.file_name,
adding the suffix _correlation.pdf.
Parameters
----------
live_fit: bool, optional, optional
If True, model, data and residuals plots are made along the fitting procedure (default: False).
Examples
--------
>>> from spectractor.fit.fitter import FitParameters
>>> params = FitParameters(values=[1, 1, 1], axis_names=["x", "y", "z"])
>>> params.cov = np.array([[1,-0.5,0],[-0.5,1,-1],[0,-1,1]])
>>> params.plot_correlation_matrix()
"""
ipar = self.get_free_parameters()
fig = plt.figure()
plot_correlation_matrix_simple(plt.gca(), self.rho, axis_names=[self.axis_names[i] for i in ipar])
fig.tight_layout()
if (parameters.SAVE or parameters.LSST_SAVEFIGPATH) and self.filename != "": # pragma: no cover
figname = os.path.splitext(self.filename)[0] + "_correlation.pdf"
fig.savefig(figname, dpi=100, bbox_inches='tight')
if parameters.LSST_SAVEFIGPATH: # pragma: no cover
figname = os.path.join(parameters.LSST_SAVEFIGPATH, "parameters_correlation.pdf")
fig.savefig(figname, dpi=100, bbox_inches='tight')
if parameters.DISPLAY: # pragma: no cover
if live_fit:
plt.draw()
plt.pause(1e-8)
else:
plt.show()
@property
def txt_filename(self):
return os.path.splitext(self.filename)[0] + "_bestfit.txt"
@property
def json_filename(self):
return os.path.splitext(self.filename)[0] + "_bestfit.json"
[docs]
def write_text(self, header=""):
"""Save the best fitting parameter summary in a text file.
The file name is build from self.file_name, adding the suffix _bestfit.txt.
Parameters
----------
header: str, optional
A header to add to the file (default: "").
Examples
--------
>>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False], filename="test_spectrum.fits")
>>> params.cov = np.array([[1,-0.5,0],[-0.5,1,-1],[0,-1,1]])
>>> params.write_text(header="chi2: 1")
.. doctest::
:hide:
>>> assert os.path.isfile(params.txt_filename)
>>> os.remove(params.txt_filename)
"""
txt = self.filename + "\n"
if header != "":
txt += header + "\n"
txt += self.print_parameters_summary()
for row in self.cov:
txt += np.array_str(row, max_line_width=20 * self.cov.shape[0]) + '\n'
output_filename = os.path.splitext(self.filename)[0] + "_bestfit.txt"
f = open(output_filename, 'w')
f.write(txt)
f.close()
[docs]
def write_json(self):
pass
[docs]
def write_fitparameter_json(json_filename, params, extra=None):
"""Save FitParameters attributes as a JSON file.
Parameters
----------
json_filename: str
JSON file name.
params: FitParameters
A FitParameters instance to save in JSON json_filename.
extra: dict, optional
Extra information to write in the JSON file.
Returns
-------
jsontxt: str
The JSON dictionnary as string.
Examples
--------
>>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False], filename="test_spectrum.fits")
>>> params.cov = np.array([[1,-0.5,0],[-0.5,1,-1],[0,-1,1]])
>>> jsonstr = write_fitparameter_json(params.json_filename, params, extra={"chi2": 1})
>>> jsonstr # doctest: +ELLIPSIS
'{"values": [1.0, 2.0, 3.0, 4.0], "labels": ["x", "y", "z", "t"],..."extra": {"chi2": 1}...
.. doctest::
:hide:
>>> assert os.path.isfile(params.json_filename)
>>> os.remove(params.json_filename)
"""
if json_filename == "":
raise ValueError("Must provide attribute a JSON filename.")
if extra:
params.extra = extra
jsontxt = json.dumps(params.__dict__, cls=NumpyArrayEncoder)
with open(json_filename, 'w') as output_json:
output_json.write(jsontxt)
return jsontxt
[docs]
def read_fitparameter_json(json_filename):
"""Read JSON file and store data in FitParameters instance.
Parameters
----------
json_filename: str
The JSON file name.
Returns
-------
params: FitParameters
A FitParameters instance to loaded from JSON json_filename.
Examples
--------
>>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False], filename="test_spectrum.fits")
>>> params.cov = np.array([[1,-0.5,0],[-0.5,1,-1],[0,-1,1]])
>>> _ = write_fitparameter_json(params.json_filename, params, extra={"chi2": 1})
>>> new_params = read_fitparameter_json(params.json_filename)
>>> new_params.values
array([1., 2., 3., 4.])
.. doctest::
:hide:
>>> assert os.path.isfile(params.json_filename)
>>> assert params == new_params
>>> os.remove(params.json_filename)
"""
params = FitParameters(values=[0])
with open(json_filename, 'r') as f:
data = json.load(f)
for key in ["values", "cov"]:
data[key] = np.asarray(data[key])
for key in data:
setattr(params, key, data[key])
return params
[docs]
class FitWorkspace:
def __init__(self, params=None, data=None, err=None, data_cov=None, epsilon=None,
file_name="", verbose=False, plot=False, live_fit=False, truth=None):
"""Generic class to create a fit workspace with parameters, bounds and general fitting methods.
Parameters
----------
params: FitParameters, optional
The parameters to fit to data (default: None).
data: np.ndarray, optional
Data array to fit with simulate() method (default: None).
err: np.ndarray, optional
Uncertainty array for data (default: None).
data_cov: np.ndarray, optional
Covariance matrix for data (default: None).
epsilon: float, List, tuple, np.ndarray, optional
Parameter variations for finite difference jacobian approximation (default: None).
file_name: str, optional
The generic file name to save results. If file_name=="", nothing is saved ond disk (default: "").
verbose: bool, optional
Level of verbosity (default: False).
plot: bool, optional
Level of plotting (default: False).
live_fit: bool, optional
If True, model, data and residuals plots are made along the fitting procedure (default: False).
truth: array_like, optional
Array of true parameters (default: None).
Examples
--------
>>> params = FitParameters(values=[1, 1, 1, 1, 1])
>>> w = FitWorkspace(params, data=None)
>>> w.params.ndim
5
"""
self.my_logger = set_logger(self.__class__.__name__)
self.params = params
self.epsilon = epsilon
if self.epsilon is not None:
self.set_epsilon()
self.filename = file_name
self.truth = truth
self.verbose = verbose
self.plot = plot
self.live_fit = live_fit
self.data = data
self.data_cov = data_cov
self.err = err
if (err is not None and data_cov is not None):
raise ValueError("Either err or data_cov must be specified.")
if err is not None:
self.err = err
self.data_cov = np.asarray(self.err.flatten() ** 2)
if data_cov is not None:
self.data_cov = data_cov
self.err = np.sqrt(np.diag(data_cov))
self.outliers = []
self.mask = []
self.W = None
if self.data_cov is not None or self.err is not None:
self.prepare_weight_matrices()
self.model = None
self.model_err = None
self.model_noconv = None
self.params_table = None
self.costs = np.array([[]])
[docs]
def set_epsilon(self):
if type(self.epsilon) is float:
tmp = self.epsilon * self.params.values
tmp[tmp == 0] = self.epsilon
self.epsilon = tmp
if self.epsilon is not None and len(self.epsilon) != len(self.params.values):
raise ValueError(f"Finite difference epsilon array has length {len(self.epsilon)} "
f"but parameter array length is {len(self.params.values)}. "
f"Check your FitWorkspace initialization.")
[docs]
def get_bad_indices(self):
"""List of indices that are outliers rejected by a sigma-clipping method or other masking method.
Returns
-------
outliers: list
Examples
--------
>>> w = FitWorkspace()
>>> w.data = np.array([np.array([1,2,3]), np.array([1,2,3,4])], dtype=object)
>>> w.outliers = [2, 6]
>>> w.get_bad_indices()
[array([2]), array([3])]
"""
bad_indices = np.asarray(self.outliers, dtype=int)
if self.data.dtype == object:
if len(self.outliers) > 0:
bad_indices = []
start_index = 0
for k in range(self.data.shape[0]):
mask = np.zeros(self.data[k].size, dtype=bool)
outliers = np.asarray(self.outliers)[np.logical_and(np.asarray(self.outliers) > start_index,
np.asarray(self.outliers) < start_index +
self.data[k].size)]
mask[outliers - start_index] = True
bad_indices.append(np.arange(self.data[k].size)[mask])
start_index += self.data[k].size
else:
bad_indices = [[] for _ in range(self.data.shape[0])]
return bad_indices
[docs]
def simulate(self, *p):
"""Compute the model prediction given a set of parameters.
Parameters
----------
p: array_like
Array of parameters for the computation of the model.
Returns
-------
x: array_like
The abscisse of the model prediction.
model: array_like
The model prediction.
model_err: array_like
The uncertainty on the model prediction.
Examples
--------
>>> w = FitWorkspace()
>>> p = np.zeros(3)
>>> x, model, model_err = w.simulate(*p)
.. doctest::
:hide:
>>> assert x is not None
"""
self.x = np.array([])
self.model = np.array([])
self.model_err = np.array([])
return self.x, self.model, self.model_err
[docs]
def plot_fit(self): # pragma: no cover
"""Generic function to plot the result of the fit for 1D curves.
Returns
-------
fig: plt.FigureClass
The figure.
"""
fig = plt.figure()
plt.errorbar(self.x, self.data, yerr=self.err, fmt='ko', label='Data')
if self.truth is not None:
x, truth, truth_err = self.simulate(*self.truth)
plt.plot(self.x, truth, label="Truth")
plt.plot(self.x, self.model, label='Best fitting model')
plt.xlabel('$x$')
plt.ylabel('$y$')
title = ""
for i, label in enumerate(self.params.labels):
if self.params.cov.size > 0:
err = np.sqrt(self.params.cov[i, i])
formatting_numbers(self.params.values[i], err, err)
_, par, err, _ = formatting_numbers(self.params.values[i], err, err, label=label)
title += rf"{label} = {par} $\pm$ {err}"
else:
title += f"{label} = {self.params.values[i]:.3g}"
if i < self.params.ndim - 1:
title += ", "
plt.title(title)
plt.legend()
plt.grid()
if parameters.DISPLAY: # pragma: no cover
plt.show()
return fig
[docs]
def weighted_residuals(self, p): # pragma: nocover
"""Compute the weighted residuals array for a set of model parameters p.
Parameters
----------
p: array_like
The array of model parameters.
Returns
-------
residuals: np.array
The array of weighted residuals.
"""
x, model, model_err = self.simulate(*p)
if self.data_cov is None:
if len(self.outliers) > 0:
model_err = model_err.flatten()
err = self.err.flatten()
res = (model.flatten() - self.data.flatten()) / np.sqrt(model_err * model_err + err * err)
else:
res = ((model - self.data) / np.sqrt(model_err * model_err + self.err * self.err)).flatten()
else:
if self.data_cov.ndim > 2:
K = self.data_cov.shape[0]
if np.any(model_err > 0):
cov = [self.data_cov[k] + np.diag(model_err[k] ** 2) for k in range(K)]
L = [np.linalg.inv(np.linalg.cholesky(cov[k])) for k in range(K)]
else:
L = [np.linalg.cholesky(self.W[k]) for k in range(K)]
res = [L[k] @ (model[k] - self.data[k]) for k in range(K)]
res = np.concatenate(res).ravel()
else:
if np.any(model_err > 0):
cov = self.data_cov + np.diag(model_err * model_err)
L = np.linalg.inv(np.linalg.cholesky(cov))
else:
if not scipy.sparse.issparse(self.W):
if self.W.ndim == 1 and self.W.dtype != object:
L = np.diag(np.sqrt(self.W))
elif self.W.ndim == 2 and self.W.dtype != object:
L = np.linalg.cholesky(self.W)
else:
raise ValueError(f"Case not implemented with self.W.ndim={self.W.ndim} "
f"and self.W.dtype={self.W.dtype}")
else:
if scipy.sparse.isspmatrix_dia(self.W):
L = self.W.sqrt()
else:
L = np.linalg.cholesky(self.W.toarray())
res = L @ (model - self.data)
return res
[docs]
def prepare_weight_matrices(self):
r"""Compute weight matrix :math:`\mathbf{W}` `self.W` as the inverse of data covariance matrix `self.data_cov`.
Cancel weights of data outliers given by `self.outliers`.
Examples
--------
1D case:
>>> w = FitWorkspace()
>>> w.data_cov = 2 * np.ones(4)
>>> w.prepare_weight_matrices()
>>> w.W
array([0.5, 0.5, 0.5, 0.5])
2D case:
>>> w = FitWorkspace()
>>> w.data = np.array([1,2,3])
>>> w.data_cov = np.diag([1,2,4])
>>> w.prepare_weight_matrices()
>>> w.W
array([[1. , 0. , 0. ],
[0. , 0.5 , 0. ],
[0. , 0. , 0.25]])
Add outliers:
>>> w.outliers = np.array([2])
>>> w.prepare_weight_matrices()
>>> w.W
array([[1. , 0. , 0. ],
[0. , 0.5, 0. ],
[0. , 0. , 0. ]])
Use sparse matrix:
>>> w.W = scipy.sparse.diags(np.ones(3))
>>> w.W.toarray()
array([[1., 0., 0.],
[0., 1., 0.],
[0., 0., 1.]])
>>> w.prepare_weight_matrices()
>>> w.W.getformat()
'dia'
>>> w.W.toarray()
array([[1., 0., 0.],
[0., 1., 0.],
[0., 0., 0.]])
"""
# Prepare covariance matrix for data
if self.data_cov is None:
self.data_cov = np.asarray(self.err.flatten() ** 2)
# Prepare inverse covariance matrix for data
if self.W is None:
if self.data_cov.ndim == 1 and self.data_cov.dtype != object:
self.W = 1 / self.data_cov
elif self.data_cov.ndim == 2 and self.data_cov.dtype != object:
self.W = np.linalg.inv(self.data_cov)
elif self.data_cov.dtype is object:
if self.data_cov[0].ndim == 1:
self.W = np.array([1 / self.data_cov[k] for k in range(self.data_cov.shape[0])])
else:
self.W = []
for k in range(len(self.data_cov)):
L = np.linalg.inv(np.linalg.cholesky(self.data_cov[k]))
self.W[k] = L.T @ L
self.W = np.asarray(self.W)
if len(self.outliers) > 0:
bad_indices = self.get_bad_indices()
if not scipy.sparse.issparse(self.W):
if self.W.ndim == 1 and self.W.dtype != object:
self.W[bad_indices] = 0
elif self.W.ndim == 2 and self.W.dtype != object:
self.W[:, bad_indices] = 0
self.W[bad_indices, :] = 0
elif self.W.dtype == object:
if self.data_cov[0].ndim == 1:
for k in range(len(self.W)):
self.W[k][bad_indices[k]] = 0
else:
for k in range(len(self.W)):
self.W[k][:, bad_indices[k]] = 0
self.W[k][bad_indices[k], :] = 0
else:
raise ValueError(
f"Data inverse covariance matrix must be a np.ndarray of dimension 1 or 2,"
f"either made of 1D or 2D arrays of equal lengths or not for block diagonal matrices."
f"\nHere W type is {type(self.W)}, shape is {self.W.shape} and W is {self.W}.")
else:
format = self.W.getformat()
if format == 'dia':
self.W.data[0, bad_indices] = 0
else:
W = self.W.tolil()
W[:, bad_indices] = 0
W[bad_indices, :] = 0
W = self.W.asformat(format='csr')
W.eliminate_zeros()
self.W = W.asformat(format=format)
[docs]
def compute_W_with_model_error(self, model_err):
"""Propagate model uncertainties to weight matrix W.
The method add the mode uncertainties in quadrature to the inverse of the weight matrix W
`self.data_cov` and re-invert it.
Parameters
----------
model_err: np.ndarray
Flat array of model uncertainties.
Returns
-------
W: array_like
Weight matrix with model uncertainties propagated.
Examples
--------
1D case:
>>> w = FitWorkspace()
>>> w.data_cov = 2 * np.ones(4)
>>> w.prepare_weight_matrices()
>>> w.compute_W_with_model_error(np.sqrt(2) * np.ones(4))
array([0.25, 0.25, 0.25, 0.25])
2D case:
>>> w = FitWorkspace()
>>> w.data = np.array([1,2,3])
>>> w.data_cov = np.diag([1,1,1])
>>> w.prepare_weight_matrices()
>>> w.compute_W_with_model_error(np.sqrt(3) * np.ones(3))
array([[0.25, 0. , 0. ],
[0. , 0.25, 0. ],
[0. , 0. , 0.25]])
Add outliers:
>>> w.outliers = np.array([2])
>>> w.prepare_weight_matrices()
>>> w.compute_W_with_model_error(np.sqrt(3) * np.ones(3))
array([[0.25, 0. , 0. ],
[0. , 0.25, 0. ],
[0. , 0. , 0. ]])
Use sparse matrix:
>>> w.W = scipy.sparse.diags(np.ones(3)).tocsr()
>>> w.W[-1, -1] = 0 # mask last data point
>>> w.W.toarray()
array([[1., 0., 0.],
[0., 1., 0.],
[0., 0., 0.]])
>>> w.compute_W_with_model_error(np.sqrt(3) * np.ones(3)).toarray()
array([[0.25, 0. , 0. ],
[0. , 0.25, 0. ],
[0. , 0. , 0. ]])
"""
if np.any(model_err > 0):
if not scipy.sparse.issparse(self.W):
zeros = self.W == 0
if self.W.ndim == 1 and self.W.dtype != object:
self.W = 1 / (self.data_cov + model_err * model_err)
elif self.W.dtype == object:
K = len(self.W)
if self.W[0].ndim == 1:
self.W = [1 / (self.data_cov[k] + model_err[k] * model_err[k]) for k in range(K)]
elif self.W[0].ndim == 2:
K = len(self.W)
cov = [self.data_cov[k] + np.diag(model_err[k] ** 2) for k in range(K)]
L = [np.linalg.inv(np.linalg.cholesky(cov[k])) for k in range(K)]
self.W = [L[k].T @ L[k] for k in range(K)]
else:
raise ValueError(f"First element of fitworkspace.W has no ndim attribute or has a dimension above 2. "
f"I get W[0]={self.W[0]}")
elif self.W.ndim == 2 and self.W.dtype != object:
cov = self.data_cov + np.diag(model_err * model_err)
self.W = np.linalg.inv(cov)
# needs to reapply the mask of outliers
self.W[zeros] = 0
else:
cov = self.data_cov + np.diag(model_err * model_err)
L = np.linalg.inv(np.linalg.cholesky(cov))
W = L.T @ L
# needs to reapply the mask of outliers
rows, cols = self.W.nonzero()
self.W = scipy.sparse.csr_matrix((W[rows, cols], (rows, cols)), dtype=self.W.dtype, shape=self.W.shape)
return self.W
[docs]
def chisq(self, p, model_output=False):
"""Compute the chi square for a set of model parameters p.
Four cases are implemented: diagonal W, 2D W, array of diagonal Ws, array of 2D Ws. The two latter cases
are for multiple independent data vectors with W being block diagonal.
Parameters
----------
p: array_like
The array of model parameters.
model_output: bool, optional
If true, the simulated model is output.
Returns
-------
chisq: float
The chi square value.
"""
# check data format
if (self.data.dtype != object and self.data.ndim > 1) or (self.err.dtype != object and self.err.ndim > 1):
raise ValueError("Fitworkspace.data and Fitworkspace.err must be a flat 1D array,"
" or an array of flat arrays of unequal lengths.")
# prepare weight matrices in case they have not been built before
self.prepare_weight_matrices()
x, model, model_err = self.simulate(*p)
W = self.compute_W_with_model_error(model_err)
if not scipy.sparse.issparse(self.W):
if W.ndim == 1 and W.dtype != object:
res = (model - self.data)
chisq = res @ (W * res)
elif W.dtype == object:
K = len(W)
res = [model[k] - self.data[k] for k in range(K)]
if W[0].ndim == 1:
chisq = np.sum([res[k] @ (W[k] * res[k]) for k in range(K)])
elif W[0].ndim == 2:
chisq = np.sum([res[k] @ W[k] @ res[k] for k in range(K)])
else:
raise ValueError(f"First element of fitworkspace.W has no ndim attribute or has a dimension above 2. "
f"I get W[0]={W[0]}")
elif W.ndim == 2 and W.dtype != object:
res = (model - self.data)
chisq = res @ W @ res
else:
raise ValueError(
f"Weight covariance matrix must be a np.ndarray of dimension 1 or 2 if not sparse,"
f"either made of 1D or 2D arrays of equal lengths or not for block diagonal matrices."
f"\nHere W type is {type(W)}, shape is {W.shape} and W is {W}.")
else:
res = (model - self.data)
chisq = res @ W @ res
if model_output:
return chisq, x, model, model_err
else:
return chisq
[docs]
def lnlike(self, p):
"""Compute the logarithmic likelihood for a set of model parameters p as -0.5*chisq.
Parameters
----------
p: array_like
The array of model parameters.
Returns
-------
lnlike: float
The logarithmic likelihood value.
"""
return -0.5 * self.chisq(p)
[docs]
def jacobian(self, params, model_input=None):
"""Generic function to compute the Jacobian matrix of a model, with numerical derivatives.
Parameters
----------
params: array_like
The array of model parameters.
model_input: array_like, optional
A model input as a list with (x, model, model_err) to avoid an additional call to simulate().
Returns
-------
J: np.array
The Jacobian matrix.
"""
if self.epsilon is None:
raise ValueError("Epsilon array must be set to use finite difference jacobian.")
if model_input:
x, model, model_err = model_input
else:
x, model, model_err = self.simulate(*params)
if self.W.dtype == object and self.W[0].ndim == 2:
J = [[] for _ in range(params.size)]
else:
model = model.flatten()
J = np.zeros((params.size, model.size))
for ip, p in enumerate(params):
if self.params.fixed[ip]:
continue
tmp_p = np.copy(params)
if tmp_p[ip] + self.epsilon[ip] < self.params.bounds[ip][0] or tmp_p[ip] + self.epsilon[ip] > self.params.bounds[ip][1]:
self.epsilon[ip] = - self.epsilon[ip]
tmp_p[ip] += self.epsilon[ip]
tmp_x, tmp_model, tmp_model_err = self.simulate(*tmp_p)
if self.W.dtype == object and self.W[0].ndim == 2:
for k in range(model.shape[0]):
J[ip].append((tmp_model[k] - model[k]) / self.epsilon[ip])
else:
J[ip] = (tmp_model.flatten() - model) / self.epsilon[ip]
return np.asarray(J)
[docs]
def hessian(self, params): # pragma: nocover
"""Experimental function to compute the hessian of a model.
Parameters
----------
params: array_like
The array of model parameters.
Returns
-------
"""
x, model, model_err = self.simulate(*params)
model = model.flatten()
J = self.jacobian(params)
H = np.zeros((params.size, params.size, model.size))
tmp_p = np.copy(params)
for ip, p1 in enumerate(params):
print(ip, p1, params[ip], tmp_p[ip], self.params.bounds[ip], self.epsilon[ip], tmp_p[ip] + self.epsilon[ip])
if self.params.fixed[ip]:
continue
if tmp_p[ip] + self.epsilon[ip] < self.params.bounds[ip][0] or tmp_p[ip] + self.epsilon[ip] > self.params.bounds[ip][1]:
self.epsilon[ip] = - self.epsilon[ip]
tmp_p[ip] += self.epsilon[ip]
print(tmp_p)
# tmp_x, tmp_model, tmp_model_err = self.simulate(*tmp_p)
# J[ip] = (tmp_model.flatten() - model) / self.epsilon[ip]
tmp_J = self.jacobian(tmp_p)
for ip, p1 in enumerate(params):
if self.params.fixed[ip]:
continue
for jp, p2 in enumerate(params):
if self.params.fixed[jp]:
continue
x, modelplus, model_err = self.simulate(params + self.epsilon)
x, modelmoins, model_err = self.simulate(params - self.epsilon)
model = model.flatten()
print("hh", ip, jp, tmp_J[ip], J[jp], tmp_p[ip], params, (tmp_J[ip] - J[jp]) / self.epsilon)
print((modelplus + modelmoins - 2 * model) / (np.asarray(self.epsilon) ** 2))
H[ip, jp] = (tmp_J[ip] - J[jp]) / self.epsilon
H[ip, jp] = (modelplus + modelmoins - 2 * model) / (np.asarray(self.epsilon) ** 2)
return H
def __post_fit__(self):
"""Method executed at the end of a minimisation process.
"""
pass
[docs]
def plot_gradient_descent(self):
fig, ax = plt.subplots(2, 1, figsize=(10, 6), sharex="all")
iterations = np.arange(self.params_table.shape[0])
ax[0].plot(iterations, self.costs, lw=2)
for ip in range(self.params_table.shape[1]):
ax[1].plot(iterations, self.params_table[:, ip], label=f"{self.params.axis_names[ip]}")
ax[1].set_yscale("symlog")
ax[1].legend(ncol=6, loc=9)
ax[1].grid()
ax[0].set_yscale("log")
ax[0].set_ylabel(r"$\chi^2$")
ax[1].set_ylabel("Parameters")
ax[0].grid()
ax[1].set_xlabel("Iterations")
ax[0].xaxis.set_major_locator(MaxNLocator(integer=True))
fig.tight_layout()
plt.subplots_adjust(wspace=0, hspace=0)
if parameters.SAVE and self.filename != "": # pragma: no cover
figname = os.path.splitext(self.filename)[0] + "_fitting.pdf"
self.my_logger.info(f"\n\tSave figure {figname}.")
fig.savefig(figname, dpi=100, bbox_inches='tight')
if parameters.DISPLAY: # pragma: no cover
plt.show()
if parameters.PdfPages: # args from the above? MFL
parameters.PdfPages.savefig()
self.simulate(*self.params.values)
self.live_fit = False
self.plot_fit()
[docs]
def save_gradient_descent(self):
iterations = np.arange(self.params_table.shape[0]).astype(int)
t = np.zeros((self.params_table.shape[1] + 2, self.params_table.shape[0]))
t[0] = iterations
t[1] = self.costs
t[2:] = self.params_table.T
h = 'iter,costs,' + ','.join(self.params.labels)
output_filename = os.path.splitext(self.filename)[0] + "_fitting.txt"
np.savetxt(output_filename, t.T, header=h, delimiter=",")
self.my_logger.info(f"\n\tSave gradient descent log {output_filename}.")
[docs]
def plot_triangle_fisher(self, nsamples=10000, max_params=8):
"""Plot a triangle (corner) plot of probability contours and distributions using Fisher matrix method.
This method generates samples from a multivariate normal distribution based on the parameter
covariance matrix and creates a corner plot showing 1D and 2D marginal distributions.
Parameters
----------
nsamples: int, optional
Number of samples to generate from the multivariate normal distribution (default: 10000).
max_params: int, optional
Maximum number of parameters to display. Only the first max_params free parameters
will be shown (default: 8).
Examples
--------
>>> from spectractor.fit.fitter import FitParameters, FitWorkspace
>>> params = FitParameters(values=[1, 2, 3], axis_names=["x", "y", "z"])
>>> params.cov = np.array([[1, -0.5, 0], [-0.5, 1, -1], [0, -1, 1]])
>>> w = FitWorkspace(params=params, filename="test.fits")
>>> w.plot_triangle_fisher(nsamples=1000, max_params=3)
Notes
-----
This method requires the `corner` package to be installed.
The plot is saved if parameters.SAVE is True with suffix '_triangle.pdf'.
"""
try:
import corner
except ImportError:
self.my_logger.warning("Package 'corner' not installed. Cannot create triangle plot. "
"Install it with: pip install corner")
return
# Get free parameters indices
ipar = self.params.get_free_parameters()
# Limit to max_params parameters
n_params = min(len(ipar), max_params)
ipar = ipar[:n_params]
if len(ipar) == 0:
self.my_logger.warning("No free parameters to plot.")
return
# Check that covariance matrix is available
if self.params.cov.size == 0:
self.my_logger.warning("Covariance matrix not available. Run fit first.")
return
# Extract mean values and covariance matrix for free parameters
mean_values = self.params.values[ipar]
cov_matrix = self.params.cov[:n_params, :n_params]
# Check if covariance matrix is positive definite
eigenvalues = np.linalg.eigvals(cov_matrix)
if np.any(eigenvalues <= 0):
self.my_logger.warning("Covariance matrix is not positive definite. "
"Cannot generate samples.")
return
# Generate samples from multivariate normal distribution
samples = np.random.multivariate_normal(mean_values, cov_matrix, size=nsamples)
# Get axis names for the selected parameters
axis_names = [self.params.axis_names[i] for i in ipar]
# Create corner plot
fig = corner.corner(samples, labels=axis_names,
quantiles=[0.16, 0.5, 0.84],
show_titles=True,
title_kwargs={"fontsize": 12},
truths=mean_values)
# Save figure if requested
if (parameters.SAVE or parameters.LSST_SAVEFIGPATH) and self.filename != "": # pragma: no cover
figname = os.path.splitext(self.filename)[0] + "_triangle.pdf"
self.my_logger.info(f"\n\tSave triangle plot {figname}.")
fig.savefig(figname, dpi=100, bbox_inches='tight')
if parameters.DISPLAY: # pragma: no cover
plt.show()
return fig
[docs]
def gradient_descent(fit_workspace, niter=10, xtol=1e-3, ftol=1e-3, with_line_search=True):
"""
Four cases are implemented: diagonal W, 2D W, array of diagonal Ws, array of 2D Ws. The two latter cases
are for multiple independent data vectors with W being block diagonal.
Parameters
----------
fit_workspace: FitWorkspace
niter
xtol
ftol
with_line_search
Returns
-------
"""
my_logger = set_logger(__name__)
tmp_params = np.copy(fit_workspace.params.values).astype(float)
fit_workspace.prepare_weight_matrices()
n_data_masked = len(fit_workspace.mask) + len(fit_workspace.outliers)
ipar = fit_workspace.params.get_free_parameters()
costs = []
params_table = []
inv_JT_W_J = np.zeros((len(ipar), len(ipar)), dtype=float)
start_total = time.time()
for i in range(niter):
start = time.time()
cost, tmp_lambdas, tmp_model, tmp_model_err = fit_workspace.chisq(tmp_params, model_output=True)
if i == 0 and fit_workspace.verbose:
my_logger.info(f"\n\tIteration={i}:\tinitial cost={cost:.5g}\tinitial chisq_red={cost / (tmp_model.size - n_data_masked):.5g}")
W = fit_workspace.compute_W_with_model_error(tmp_model_err)
# residuals
if (isinstance(W, np.ndarray) or scipy.sparse.issparse(W)) and W.dtype != object:
residuals = (tmp_model - fit_workspace.data).flatten()
elif isinstance(W, np.ndarray) and W.dtype == object:
residuals = [(tmp_model[k] - fit_workspace.data[k]) for k in range(len(W))]
else:
raise TypeError(f"Type of fit_workspace.W is {type(W)}. It must be a np.ndarray.")
# Jacobian
J = fit_workspace.jacobian(tmp_params, model_input=[tmp_lambdas, tmp_model, tmp_model_err])
# remove parameters with unexpected null Jacobian vectors or that are degenerated
J_vectors = [np.array(J[ip]).ravel() for ip in range(J.shape[0])]
J_norms = [np.linalg.norm(J_vectors[ip]) for ip in range(J.shape[0])]
for ip in range(J.shape[0]):
if ip not in ipar:
continue
# check for null vectors
if J_norms[ip] < np.sqrt(len(J_vectors[ip]))*np.finfo(np.float64).eps and len(np.where(J_vectors[ip]==0)[0]) > J_vectors[ip].size // 2:
ipar = np.delete(ipar, list(ipar).index(ip))
fit_workspace.params.fixed[ip] = True
my_logger.warning(
f"\n\tStep {i}: {fit_workspace.params.labels[ip]} has a null Jacobian; parameter is fixed "
f"at its last known current value ({tmp_params[ip]}) because |J[par]|={J_norms[ip]:.3e}<{np.sqrt(len(J_vectors[ip]))*np.finfo(np.float64).eps} with more than half values being zeros.")
continue
# check for degeneracies using Cauchy-Schwartz inequality; fix the second parameter
for jp in range(ip, J.shape[0]):
if ip == jp or jp not in ipar:
continue
inner = np.abs(J_vectors[ip] @ J_vectors[jp])
if np.abs(inner - J_norms[ip] * J_norms[jp]) < 1e-8 * inner:
ipar = np.delete(ipar, list(ipar).index(jp))
fit_workspace.params.fixed[jp] = True
my_logger.warning(
f"\n\tStep {i}: {fit_workspace.params.labels[jp]} is degenerated with {fit_workspace.params.labels[ip]}; "
f"parameter {fit_workspace.params.labels[jp]} is fixed at its last known current value ({tmp_params[jp]}).")
continue
# remove fixed and degenerated parameters; then transpose
J = J[ipar].T
# compute J.T @ W @ J matrix and invert it
if W.ndim == 1 and W.dtype != object:
JT_W = J.T * W
JT_W_J = JT_W @ J
elif W.ndim == 2 and W.dtype != object:
JT_W = J.T @ W
JT_W_J = JT_W @ J
else:
if W[0].ndim == 1:
JT_W = np.array([j for j in J]).T * np.concatenate(W).ravel()
JT_W_J = JT_W @ np.array([j for j in J])
else:
# warning ! here the data arrays indexed by k can have different lengths because outliers
# because W inverse covariance is block diagonal and blocks can have different sizes
# the philosophy is to temporarily flatten the data arrays
JT_W = [np.concatenate([J[ip][k].T @ W[k]
for k in range(W.shape[0])]).ravel()
for ip in range(len(J))]
JT_W_J = np.array([[JT_W[ip2] @ np.concatenate(J[ip1][:]).ravel() for ip1 in range(len(J))]
for ip2 in range(len(J))])
try:
L = np.linalg.inv(np.linalg.cholesky(JT_W_J)) # cholesky is too sensible to the numerical precision
inv_JT_W_J = L.T @ L
except np.linalg.LinAlgError:
inv_JT_W_J = np.linalg.inv(JT_W_J)
if fit_workspace.W.dtype != object:
JT_W_R0 = JT_W @ residuals
else:
JT_W_R0 = JT_W @ np.concatenate(residuals).ravel()
# Gauss-Newton step:
dparams = - inv_JT_W_J @ JT_W_R0
new_params = np.copy(tmp_params)
new_params[ipar] = tmp_params[ipar] + dparams
# check bounds
for ip, p in enumerate(new_params):
if p < fit_workspace.params.bounds[ip][0]:
new_params[ip] = fit_workspace.params.bounds[ip][0]
if p > fit_workspace.params.bounds[ip][1]:
new_params[ip] = fit_workspace.params.bounds[ip][1]
fval = fit_workspace.chisq(new_params)
if with_line_search or fval > (1 + 10 * ftol) * cost:
def line_search(alpha):
tmp_params_2 = np.copy(tmp_params)
tmp_params_2[ipar] = tmp_params[ipar] + alpha * dparams
for ipp, pp in enumerate(tmp_params_2):
if pp < fit_workspace.params.bounds[ipp][0]:
tmp_params_2[ipp] = fit_workspace.params.bounds[ipp][0]
if pp > fit_workspace.params.bounds[ipp][1]:
tmp_params_2[ipp] = fit_workspace.params.bounds[ipp][1]
return fit_workspace.chisq(tmp_params_2)
# tol parameter acts on alpha (not func)
alpha_min, fval, iter, funcalls = optimize.brent(line_search, full_output=True, tol=5e-1, brack=(0, 1))
new_params[ipar] = tmp_params[ipar] + alpha_min * dparams
# check bounds
for ip, p in enumerate(new_params):
if p < fit_workspace.params.bounds[ip][0]:
new_params[ip] = fit_workspace.params.bounds[ip][0]
if p > fit_workspace.params.bounds[ip][1]:
new_params[ip] = fit_workspace.params.bounds[ip][1]
else:
alpha_min = 1
funcalls = 0
iter = 0
tmp_params[ipar] = new_params[ipar]
# prepare outputs
costs.append(fval)
params_table.append(np.copy(tmp_params))
fit_workspace.params.values = tmp_params
if fit_workspace.verbose:
my_logger.info(f"\n\tIteration={i}:\tfinal cost={fval:.5g}\tfinal chisq_red={fval / (tmp_model.size - n_data_masked):.5g} "
f"\tcomputed in {time.time() - start:.2f}s"
f"\n\tNew parameters: {tmp_params[ipar]}")
my_logger.debug(f"\n\t Parameter shifts: {alpha_min * dparams}\n"
f"\n\t Line search: alpha_min={alpha_min:.3g} iter={iter} funcalls={funcalls}")
if fit_workspace.live_fit: # pragma: no cover
fit_workspace.simulate(*tmp_params)
fit_workspace.plot_fit()
fit_workspace.cov = inv_JT_W_J
# fit_workspace.params.plot_correlation_matrix(ipar)
if len(ipar) == 0:
my_logger.warning(f"\n\tGradient descent terminated in {i} iterations because all parameters "
f"have null Jacobian.")
break
if np.sum(np.abs(alpha_min * dparams)) / np.sum(np.abs(tmp_params[ipar])) < xtol:
my_logger.info(f"\n\tGradient descent terminated in {i} iterations because the sum of parameter shift "
f"relative to the sum of the parameters is below xtol={xtol}.")
break
if len(costs) > 1 and np.abs(costs[-2] - fval) / np.max([np.abs(fval), np.abs(costs[-2])]) < ftol:
my_logger.info(f"\n\tGradient descent terminated in {i} iterations because the "
f"relative change of cost is below ftol={ftol}.")
break
if time.time() - start > parameters.SPECTRACTOR_FIT_TIMEOUT_PER_ITER:
raise TimeoutError(f"Gradient descent iteration longer than {parameters.SPECTRACTOR_FIT_TIMEOUT_PER_ITER=}. "
f"Check data quality or increase parameters.SPECTRACTOR_FIT_TIMEOUT_PER_ITER in config file.")
if time.time() - start_total > parameters.SPECTRACTOR_FIT_TIMEOUT:
raise TimeoutError(f"Gradient descent longer than {parameters.SPECTRACTOR_FIT_TIMEOUT=}. "
f"Check data quality or increase parameters.SPECTRACTOR_FIT_TIMEOUT in config file.")
return tmp_params, inv_JT_W_J, np.array(costs), np.array(params_table)
[docs]
def simple_newton_minimisation(fit_workspace, niter=10, xtol=1e-3, ftol=1e-3): # pragma: no cover
"""Experimental function to minimize a function.
Parameters
----------
fit_workspace: FitWorkspace
niter
xtol
ftol
"""
my_logger = set_logger(__name__)
tmp_params = np.copy(fit_workspace.params.values)
ipar = fit_workspace.params.get_free_parameters()
funcs = []
params_table = []
inv_H = np.zeros((len(ipar), len(ipar)))
for i in range(niter):
start = time.time()
tmp_lambdas, tmp_model, tmp_model_err = fit_workspace.simulate(*tmp_params)
# if fit_workspace.live_fit:
# fit_workspace.plot_fit()
J = fit_workspace.jacobian(tmp_params)
# remove parameters with unexpected null Jacobian vectors
for ip in range(J.shape[0]):
if ip not in ipar:
continue
if np.all(J[ip] == np.zeros(J.shape[1])):
ipar = np.delete(ipar, list(ipar).index(ip))
# tmp_params[ip] = 0
my_logger.warning(
f"\n\tStep {i}: {fit_workspace.params.labels[ip]} has a null Jacobian; parameter is fixed "
f"at its last known current value ({tmp_params[ip]}).")
# remove fixed parameters
J = J[ipar].T
# hessian
H = fit_workspace.hessian(tmp_params)
try:
L = np.linalg.inv(np.linalg.cholesky(H)) # cholesky is too sensible to the numerical precision
inv_H = L.T @ L
except np.linalg.LinAlgError:
inv_H = np.linalg.inv(H)
dparams = - inv_H[:, :, 0] @ J[:, 0]
print("dparams", dparams, inv_H, J, H)
tmp_params[ipar] += dparams
# check bounds
print("tmp_params", tmp_params, dparams, inv_H, J)
for ip, p in enumerate(tmp_params):
if p < fit_workspace.params.bounds[ip][0]:
tmp_params[ip] = fit_workspace.params.bounds[ip][0]
if p > fit_workspace.params.bounds[ip][1]:
tmp_params[ip] = fit_workspace.params.bounds[ip][1]
tmp_lambdas, new_model, tmp_model_err = fit_workspace.simulate(*tmp_params)
new_func = new_model[0]
funcs.append(new_func)
r = np.log10(fit_workspace.regs)
js = [fit_workspace.jacobian(np.asarray([rr]))[0] for rr in np.array(r)]
plt.plot(r, js, label="J")
plt.grid()
plt.legend()
plt.show()
if parameters.DISPLAY:
fig = plt.figure()
plt.plot(r, js, label="prior")
mod = tmp_model + J[0] * (r - (tmp_params - dparams)[0])
plt.plot(r, mod)
plt.axvline(tmp_params)
plt.axhline(tmp_model)
plt.grid()
plt.legend()
plt.draw()
plt.pause(1e-8)
plt.close(fig)
# prepare outputs
params_table.append(np.copy(tmp_params))
if fit_workspace.verbose:
my_logger.info(f"\n\tIteration={i}: initial func={tmp_model[0]:.5g}"
f"\n\tParameter shifts: {dparams}"
f"\n\tNew parameters: {tmp_params[ipar]}"
f"\n\tFinal func={new_func:.5g}"
f" computed in {time.time() - start:.2f}s")
if fit_workspace.live_fit:
fit_workspace.simulate(*tmp_params)
fit_workspace.plot_fit()
fit_workspace.cov = inv_H[:, :, 0]
print("shape", fit_workspace.cov.shape)
# fit_workspace.params.plot_correlation_matrix(ipar)
if len(ipar) == 0:
my_logger.warning(f"\n\tGradient descent terminated in {i} iterations because all parameters "
f"have null Jacobian.")
break
if np.sum(np.abs(dparams)) / np.sum(np.abs(tmp_params[ipar])) < xtol:
my_logger.info(f"\n\tGradient descent terminated in {i} iterations because the sum of parameter shift "
f"relative to the sum of the parameters is below xtol={xtol}.")
break
if len(funcs) > 1 and np.abs(funcs[-2] - new_func) / np.max([np.abs(new_func), np.abs(funcs[-2])]) < ftol:
my_logger.info(f"\n\tGradient descent terminated in {i} iterations because the "
f"relative change of cost is below ftol={ftol}.")
break
plt.close()
return tmp_params, inv_H[:, :, 0], np.array(funcs), np.array(params_table)
[docs]
def run_gradient_descent(fit_workspace, xtol, ftol, niter, verbose=False, with_line_search=True):
if fit_workspace.costs.size == 0:
fit_workspace.costs = np.array([fit_workspace.chisq(fit_workspace.params.values)])
fit_workspace.params_table = np.array([fit_workspace.params.values])
p, cov, tmp_costs, tmp_params_table = gradient_descent(fit_workspace, niter=niter, xtol=xtol, ftol=ftol,
with_line_search=with_line_search)
fit_workspace.params.values, fit_workspace.params.cov = p, cov
fit_workspace.params_table = np.concatenate([fit_workspace.params_table, tmp_params_table])
fit_workspace.costs = np.concatenate([fit_workspace.costs, tmp_costs])
if verbose or fit_workspace.verbose:
fit_workspace.my_logger.info(f"\n{fit_workspace.params.print_parameters_summary()}")
if parameters.DEBUG and (verbose or fit_workspace.verbose):
fit_workspace.plot_gradient_descent()
if len(fit_workspace.params.get_free_parameters()) > 1:
fit_workspace.params.plot_correlation_matrix()
[docs]
def run_simple_newton_minimisation(fit_workspace, xtol=1e-8, ftol=1e-8, niter=50, verbose=False): # pragma: no cover
fit_workspace.values, fit_workspace.cov, funcs, params_table = simple_newton_minimisation(fit_workspace,
niter=niter,
xtol=xtol, ftol=ftol)
if verbose or fit_workspace.verbose:
fit_workspace.my_logger.info(f"\n{fit_workspace.params.print_parameters_summary()}")
if parameters.DEBUG and (verbose or fit_workspace.verbose):
fit_workspace.plot_gradient_descent()
if len(fit_workspace.params.get_free_parameters()) > 1:
fit_workspace.params.plot_correlation_matrix()
return params_table, funcs
[docs]
def run_minimisation(fit_workspace, method="newton", xtol=1e-4, ftol=1e-4, niter=100,
verbose=False, with_line_search=True, minimizer_method="L-BFGS-B"):
my_logger = set_logger(__name__)
bounds = fit_workspace.params.bounds
nll = lambda params: -fit_workspace.lnlike(params)
guess = fit_workspace.params.values.astype('float64')
if verbose:
my_logger.debug(f"\n\tStart guess: {guess}")
if method == "minimize":
start = time.time()
result = optimize.minimize(nll, fit_workspace.params.values, method=minimizer_method,
options={'maxiter': 100000}, bounds=bounds)
fit_workspace.params.values = result['x']
if verbose:
my_logger.debug(f"\n\t{result}")
my_logger.debug(f"\n\tMinimize: total computation time: {time.time() - start}s")
elif method == 'basinhopping':
start = time.time()
minimizer_kwargs = dict(method=minimizer_method, bounds=bounds)
result = optimize.basinhopping(nll, guess, minimizer_kwargs=minimizer_kwargs)
fit_workspace.params.values = result['x']
if verbose:
my_logger.debug(f"\n\t{result}")
my_logger.debug(f"\n\tBasin-hopping: total computation time: {time.time() - start}s")
elif method == "least_squares": # pragma: no cover
fit_workspace.my_logger.warning("least_squares might not work, use with caution... "
"or repair carefully the function weighted_residuals()")
start = time.time()
x_scale = np.abs(guess)
x_scale[x_scale == 0] = 0.1
p = optimize.least_squares(fit_workspace.weighted_residuals, guess, verbose=2, ftol=1e-6, #x_scale=x_scale,
diff_step=fit_workspace.epsilon, bounds=list(np.array(bounds).T))
fit_workspace.params.values = p.x
if verbose:
my_logger.debug(f"\n\t{p}")
my_logger.debug(f"\n\tLeast_squares: total computation time: {time.time() - start}s")
elif method == 'curve_fit':
def model(x, *params):
x, mod, mod_err = fit_workspace.simulate(*params)
return mod
def Dfun(x, *params):
return fit_workspace.jacobian(params, model_input=None).T
start = time.time()
result = optimize.curve_fit(model, fit_workspace.x, fit_workspace.data, jac=Dfun, x_scale='jac',
p0=fit_workspace.params.values, sigma=fit_workspace.err,
verbose=0, xtol=xtol, ftol=ftol,
bounds=list(np.array(bounds).T), absolute_sigma=True)
fit_workspace.params.values = result[0]
fit_workspace.params.cov = result[1]
if verbose:
my_logger.debug(f"\n\t{result}")
my_logger.debug(f"\n\tCurve_fit: total computation time: {time.time() - start}s")
if parameters.DEBUG:
fit_workspace.plot_fit()
elif method == "lm": # pragma: no cover
start = time.time()
x, cov, infodict, mesg, ier = optimize.leastsq(fit_workspace.weighted_residuals, guess,
ftol=ftol, xtol=xtol, full_output=True)
fit_workspace.params.values = x
fit_workspace.params.cov = cov
if verbose:
my_logger.debug(f"\n\t{x}")
my_logger.debug(f"\n\tLeast_squares: total computation time: {time.time() - start}s")
elif method == "newton":
start = time.time()
run_gradient_descent(fit_workspace, xtol=xtol, ftol=ftol, niter=niter, verbose=verbose,
with_line_search=with_line_search)
if verbose:
my_logger.debug(f"\n\tNewton: total computation time: {time.time() - start}s")
if fit_workspace.filename != "":
write_fitparameter_json(fit_workspace.params.json_filename, fit_workspace.params)
fit_workspace.save_gradient_descent()
fit_workspace.__post_fit__()
if verbose and parameters.DEBUG:
fit_workspace.plot_fit()
[docs]
def run_minimisation_sigma_clipping(fit_workspace, method="newton", xtol=1e-4, ftol=1e-4,
niter=100, sigma_clip=5.0, niter_clip=3, verbose=False, with_line_search=True):
my_logger = set_logger(__name__)
for step in range(niter_clip):
if verbose:
my_logger.info(f"\n\tSigma-clipping step {step}/{niter_clip} (sigma={sigma_clip})")
run_minimisation(fit_workspace, method=method, xtol=xtol, ftol=ftol, niter=niter,
with_line_search=with_line_search, verbose=verbose)
# remove outliers
if fit_workspace.data.dtype == object:
# indices_no_nan = ~np.isnan(np.concatenate(fit_workspace.data).ravel())
data = np.concatenate(fit_workspace.data).ravel() # [indices_no_nan]
model = np.concatenate(fit_workspace.model).ravel() # [indices_no_nan]
err = np.concatenate(fit_workspace.err).ravel() # [indices_no_nan]
model_err = np.concatenate(fit_workspace.model_err).ravel() # [indices_no_nan]
else:
# indices_no_nan = ~np.isnan(fit_workspace.data.flatten())
data = fit_workspace.data.flatten() # [indices_no_nan]
model = fit_workspace.model.flatten() # [indices_no_nan]
err = fit_workspace.err.flatten() # [indices_no_nan]
model_err = fit_workspace.model_err.flatten() # [indices_no_nan]
residuals = np.abs(data - model) / np.sqrt(err**2 + model_err**2)
outliers = residuals > sigma_clip
outliers = [i for i in range(data.size) if outliers[i]]
outliers.sort()
if len(outliers) > 0:
my_logger.debug(f'\n\tOutliers flat index list: {outliers}')
my_logger.info(f'\n\tOutliers: {len(outliers)} / {data.size - len(fit_workspace.mask)} data points '
f'({100 * len(outliers) / (data.size - len(fit_workspace.mask)):.2f}%) '
f'at more than {sigma_clip}-sigma from best-fit model.')
if np.all(fit_workspace.outliers == outliers):
my_logger.info(f'\n\tOutliers flat index list unchanged since last iteration: '
f'break the sigma clipping iterations.')
break
else:
fit_workspace.outliers = outliers
else:
my_logger.info(f'\n\tNo outliers detected at first iteration: break the sigma clipping iterations.')
break
[docs]
class RegFitWorkspace(FitWorkspace):
def __init__(self, w, opt_reg=parameters.PSF_FIT_REG_PARAM, verbose=False, live_fit=False):
"""
Parameters
----------
w: ChromaticPSFFitWorkspace, FullFowardModelFitWorkspace
FitWorkspace instance where to apply regularisation.
opt_reg: float
Input value for optimal regularisation parameter (default: parameters.PSF_FIT_REG_PARAM).
verbose: bool, optional
Level of verbosity (default: False).
live_fit: bool, optional
If True, model, data and residuals plots are made along the fitting procedure (default: False).
"""
params = FitParameters(np.asarray([np.log10(opt_reg)]), labels=["log10_reg"],
axis_names=[r"$\log_{10} r$"], fixed=None,
bounds=[(-20, np.log10(w.amplitude_priors.size) + 2)])
FitWorkspace.__init__(self, params, data=np.array([0]), err=np.array([1]),
epsilon=[1e-1], verbose=verbose, live_fit=live_fit)
self.w = w
self.opt_reg = opt_reg
self.resolution = np.zeros_like((self.w.amplitude_params.size, self.w.amplitude_params.size))
self.G = 0
self.chisquare = -1
[docs]
def print_regularisation_summary(self):
self.my_logger.info(f"\n\tOptimal regularisation parameter: {self.opt_reg}"
f"\n\tTr(R) = N_dof = {np.trace(self.resolution)}"
f"\n\tN_params = {len(self.w.amplitude_params)}"
f"\n\tN_data = {np.concatenate([self.w.data]).size - len(self.w.mask) - len(self.w.outliers)}"
f" (excluding masked pixels and outliers)")
[docs]
def simulate(self, log10_r):
reg = 10 ** log10_r
M_dot_W_dot_M_plus_Q = self.w.M_dot_W_dot_M + reg * self.w.Q
try:
L = np.linalg.inv(np.linalg.cholesky(M_dot_W_dot_M_plus_Q))
cov = L.T @ L
except np.linalg.LinAlgError:
cov = np.linalg.inv(M_dot_W_dot_M_plus_Q)
if self.w.W.dtype == object:
K = len(self.w.W)
M_dot_W = [self.w.M[k].T @ self.w.W[k] for k in range(K)]
M_dot_W_dot_D = np.sum([M_dot_W[k] @ self.w.data[k] for k in range(K)], axis=0)
A = cov @ (M_dot_W_dot_D + reg * self.w.Q_dot_A0)
diff = [self.w.data[k] - self.w.M[k] @ A for k in range(K)]
self.chisquare = np.sum([diff[k] @ self.w.W[k] @ diff[k] for k in range(K)])
else:
if self.w.W.ndim == 1:
A = cov @ (self.w.M.T @ (self.w.W * self.w.data) + reg * self.w.Q_dot_A0)
else:
A = cov @ (self.w.M.T @ (self.w.W @ self.w.data) + reg * self.w.Q_dot_A0)
if A.ndim == 2: # ndim == 2 when A comes from a sparse matrix computation
A = np.asarray(A).reshape(-1)
diff = self.w.data - self.w.M @ A
if self.w.W.ndim == 1:
self.chisquare = diff @ (self.w.W * diff)
else:
self.chisquare = diff @ self.w.W @ diff
self.resolution = np.eye(A.size) - reg * cov @ self.w.Q
self.w.amplitude_params = A
self.w.amplitude_cov_matrix = cov
self.w.amplitude_params_err = np.array([np.sqrt(np.abs(cov[x, x])) for x in range(cov.shape[0])])
self.G = self.chisquare / (np.concatenate([self.w.data]).size - len(self.w.mask) - len(self.w.outliers) - np.trace(self.resolution)) ** 2
return np.asarray([log10_r]), np.asarray([self.G]), np.zeros_like(self.data)
[docs]
def plot_fit(self):
log10_opt_reg = self.params.values[0]
regs = 10 ** np.linspace(min(-7, 0.9 * log10_opt_reg), max(3, 1.2 * log10_opt_reg), 50)
Gs = []
chisqs = []
resolutions = []
for r in regs:
self.simulate(np.log10(r))
Gs.append(self.G)
chisqs.append(self.chisquare)
resolutions.append(np.trace(self.resolution))
opt_reg = 10 ** log10_opt_reg
fig, ax = plt.subplots(3, 1, figsize=(7, 5), sharex="all")
ax[0].plot(regs, Gs)
ax[0].axvline(opt_reg, color="k")
ax[1].axvline(opt_reg, color="k")
ax[2].axvline(opt_reg, color="k")
ax[0].set_ylabel(r"$G(r)$")
ax[0].set_xlabel(r"Regularisation hyper-parameter $r$")
ax[0].grid()
ax[0].set_title(f"Optimal regularisation parameter: {opt_reg:.3g}")
ax[1].plot(regs, chisqs)
ax[1].set_ylabel(r"$\chi^2(\mathbf{A}(r) \vert \mathbf{\theta})$")
ax[1].set_xlabel(r"Regularisation hyper-parameter $r$")
ax[1].grid()
ax[1].set_xscale("log")
ax[2].set_xscale("log")
ax[2].plot(regs, resolutions)
ax[2].set_ylabel(r"$\mathrm{Tr}\,\mathbf{R}$")
ax[2].set_xlabel(r"Regularisation hyper-parameter $r$")
ax[2].grid()
# fig.tight_layout()
plt.subplots_adjust(hspace=0)
if parameters.DISPLAY:
plt.show()
if parameters.LSST_SAVEFIGPATH:
fig.savefig(os.path.join(parameters.LSST_SAVEFIGPATH, 'regularisation.pdf'))
fig = plt.figure(figsize=(7, 5))
rho = compute_correlation_matrix(self.w.amplitude_cov_matrix)
plot_correlation_matrix_simple(plt.gca(), rho, axis_names=[r'a'] * len(self.w.amplitude_params))
# ipar=np.arange(10, 20))
plt.gca().set_title(r"Correlation matrix $\mathbf{\rho}$")
fig.tight_layout()
if parameters.LSST_SAVEFIGPATH:
fig.savefig(os.path.join(parameters.LSST_SAVEFIGPATH, 'amplitude_correlation_matrix.pdf'))
if parameters.DISPLAY:
plt.show()
[docs]
def run_regularisation(self, Ndof=None):
run_minimisation(self, method="minimize", ftol=1e-4, xtol=1e-2, verbose=self.verbose,
minimizer_method="Nelder-Mead")
self.opt_reg = 10 ** self.params.values[0]
self.simulate(np.log10(self.opt_reg))
self.print_regularisation_summary()
if Ndof is not None:
r_Ndof = self.set_regularisation_with_Ndof(Ndof)
if self.opt_reg < 1e-3 * r_Ndof or self.opt_reg > 1e3 * r_Ndof:
self.my_logger.warning(f"\n\tRegularisation parameter r minimizing G(r) is 3 orders of magnitude away "
f"from optimal regularisation parameter {r_Ndof} using {Ndof=}. "
f"Probably that the model does not fit well data at this stage. "
f"Switch to optimal parameter.")
self.opt_reg = r_Ndof
self.simulate(np.log10(self.opt_reg))
self.print_regularisation_summary()
[docs]
def set_regularisation_with_Ndof(self, Ndof):
"""Find regularisation parameter $r$ that checks $Tr(R) = Ndof$.
Parameters
----------
Ndof: float
Number of degrees of freedom, ie $Tr(R)$.
Returns
-------
r: float
Regularisation parameter.
"""
log10_opt_reg = self.params.values[0]
regs = 10 ** np.linspace(min(-7, 0.9 * log10_opt_reg), max(3, 1.2 * log10_opt_reg), 50)
Gs = np.zeros_like(regs)
chisqs = np.zeros_like(regs)
resolutions = np.zeros_like(regs)
for ir, r in enumerate(regs):
self.simulate(np.log10(r))
Gs[ir] = self.G
chisqs[ir] = self.chisquare
resolutions[ir] = np.trace(self.resolution)
Ndof_index = np.argmin(np.abs(resolutions - Ndof))
return regs[Ndof_index]
if __name__ == "__main__":
import doctest
doctest.testmod()