Source code for spectractor.fit.fitter

from scipy import optimize
import time
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import numpy as np
import scipy
import os
import json
from dataclasses import dataclass
from typing import Optional, Union

from spectractor import parameters
from spectractor.config import set_logger
from spectractor.tools import (formatting_numbers, compute_correlation_matrix, plot_correlation_matrix_simple,
                               NumpyArrayEncoder)

from lsst.utils.threads import disable_implicit_threading
disable_implicit_threading()


[docs] @dataclass() class FitParameter: """Container for a parameter to fit on data with FitWorkspace. Attributes ---------- value: float Array containing the parameter values. label: str Parameter labels for screen print. axis_name: str Parameter labels for plot print. bounds: list 2-element list giving the (lower, upper) bounds for the parameter. fixed: bool Boolean indicating whether the parameter is fixed. err: float Uncertainty on parameter value. truth: float, optional Truth value. Examples -------- >>> from spectractor.fit.fitter import FitParameter >>> p = FitParameter(value=1, label="x", axis_name="$x$", bounds=[0, 1], fixed=False, err=0.1, truth=1) >>> p x: 1.0 +/- 0.1 (truth=1) >>> p = FitParameter(value=0.01234, label="t", axis_name="$t$", bounds=[-1, 1], fixed=False, err=0.02) >>> p t: 0.01 +/- 0.02 >>> p = FitParameter(value=1, label="x", axis_name="$x$", bounds=[0, 1], fixed=True, err=0) >>> p x: 1 (fixed) """ value: float label: str axis_name: str bounds: list fixed: bool err: float truth: Optional[float] = None def __repr__(self): """Print the parameter on screen. """ txt = "" if self.fixed: txt += f"{self.label}: {self.value} (fixed)" else: if self.err != 0: val, err, _ = formatting_numbers(self.value, self.err, self.err) txt += f"{self.label}: {val} +/- {err}" else: txt += f"{self.label}: {self.value} +/- {self.err}" if self.truth: txt += f" (truth={self.truth})" return txt
[docs] @dataclass() class FitParameters: """Container for the parameters to fit on data with FitWorkspace. Attributes ---------- values: np.ndarray Array containing the parameter values. labels: list, optional List of the parameter labels for screen print. If None, make a default list with parameters labelled par (default: None). axis_names: list, optional List of the parameter labels for plot print. If None, make a default list with parameters labelled par (default: None). bounds: list, optional List of 2-element list giving the (lower, upper) bounds for every parameter. If None, make a default list with np.infinity boundaries (default: None). fixed: list, optional List of boolean: True to fix a parameter, False to let it free. If None, make a default list with False values: all parameters are free (default: None). truth: np.ndarray, optional Array of truth parameters (default: None). filename: str, optional File name associated to the fitted parameters (usually _spectrum.fits file name) (default: ''). Examples -------- >>> from spectractor.fit.fitter import FitParameters >>> params = FitParameters(values=[1, 1, 1, 1, 1]) >>> params.ndim 5 >>> params.values array([1., 1., 1., 1., 1.]) >>> params.labels ['par0', 'par1', 'par2', 'par3', 'par4'] >>> params.bounds [[-inf, inf], [-inf, inf], [-inf, inf], [-inf, inf], [-inf, inf]] """ values: Union[np.ndarray, list] labels: Optional[list] = None axis_names: Optional[list] = None bounds: Optional[list] = None fixed: Optional[list] = None truth: Optional[list] = None filename: Optional[str] = "" extra: Optional[dict] = None def __post_init__(self): if type(self.values) is list: self.values = np.array(self.values, dtype=float) self.values = np.asarray(self.values, dtype=float) if not self.labels: self.labels = [f"par{k}" for k in range(self.ndim)] else: if len(self.labels) != self.ndim: raise ValueError("labels argument must have same size as values argument.") if not self.axis_names: self.axis_names = [f"$p_{k}$" for k in range(self.ndim)] else: if len(self.axis_names) != self.ndim: raise ValueError("labels argument must have same size as values argument.") if self.bounds is None: self.bounds = [[-np.inf, np.inf]] * self.ndim else: if np.array(self.bounds).shape != (self.ndim, 2): raise ValueError(f"bounds argument size {np.array(self.bounds).shape} must be same as values argument {(self.ndim, 2)}.") if not self.fixed: self.fixed = [False] * self.ndim else: if len(list(self.fixed)) != self.ndim: raise ValueError("fixed argument must have same size as values argument.") self.cov = np.zeros((self.nfree, self.nfree)) def __getitem__(self, label): """Get parameter value given its label. Parameters ---------- label: str The parameter label. Returns ------- value: float The parameter value. Examples -------- >>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False]) >>> params["z"] 3.0 """ index = self.get_index(label=label) return self.values[index] def __len__(self): """Length of the parameter array, equivalent to self.ndim. Examples -------- >>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False]) >>> len(params) 4 >>> len(params) == params.ndim True """ return len(self.values) def __eq__(self, other): """Test parameter instances equality. Examples -------- >>> p1 = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False]) >>> p2 = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False]) >>> p1 == p2 True >>> p3 = FitParameters(values=[1, 2, 3, 4.1], labels=["x", "y", "z", "t"], fixed=[True, False, True, False]) >>> p1 == p3 False >>> p4 = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[False, False, True, False]) >>> p1 == p4 False """ if not isinstance(other, FitParameters): return NotImplemented out = True for key in self.__dict__.keys(): if isinstance(getattr(self, key), np.ndarray): if len(getattr(self, key).flatten()) == len(getattr(other, key).flatten()): out *= np.all(np.equal(getattr(self, key).flatten(), getattr(other, key).flatten())) else: # if fixed parameters are not equal, covariance matrices have not the same shape # and multiplication above is forbidden out = False else: out *= getattr(self, key) == getattr(other, key) return out def __repr__(self): """Print the best fitting parameters on screen. Labels are from self.labels. Examples -------- >>> parameters.VERBOSE = True >>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False]) >>> params.cov = np.array([[1, -0.5], [-0.5, 4]]) >>> params x: 1.0 (fixed=True) y: 2 +1 -1 bounds=[-inf, inf] z: 3.0 (fixed=True) t: 4 +2 -2 bounds=[-inf, inf] """ return self.print_parameters_summary()
[docs] def get_index(self, label): """Get parameter index given its label. Parameters ---------- label: str The parameter label. Returns ------- index: int The parameter index. Examples -------- >>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False]) >>> params.get_index("z") 2 """ if label in self.labels: index = self.labels.index(label) return index else: raise KeyError(f"{label=} not in FitParameters.labels ({self.labels=}).")
[docs] def set(self, label, value): """Set value parameter given its label. Parameters ---------- label: str The parameter label. value: float The new parameter value. Examples -------- >>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False]) >>> params["z"] 3.0 >>> params.set("z", 0) >>> params["z"] 0.0 """ key = self.get_index(label) self.values[key] = value
@property def rho(self): """Correlation matrix computed from the covariance matrix Returns ------- rho: np.ndarray The correlation matrix array. Examples -------- >>> from spectractor.fit.fitter import FitParameters >>> params = FitParameters(values=[1, 1, 1], axis_names=["x", "y", "z"]) >>> params.cov = np.array([[2,-0.5,0],[-0.5,2,-1],[0,-1,2]]) >>> params.rho array([[ 1. , -0.25, 0. ], [-0.25, 1. , -0.5 ], [ 0. , -0.5 , 1. ]]) """ return compute_correlation_matrix(self.cov) @property def err(self): """Uncertainties on fitted parameters, as the square root of the covariance matrix diagonal. Returns ------- err: np.ndarray The uncertainty array. Examples ------- >>> from spectractor.fit.fitter import FitParameters >>> import numpy as np >>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=[False, True, False, True, False]) >>> params.cov = np.array([[1,-0.5,0],[-0.5,4,-1],[0,-1,9]]) >>> params.err array([1., 0., 2., 0., 3.]) """ err = np.zeros_like(self.values, dtype=float) if np.sum(self.fixed) != len(self.fixed): err[~np.asarray(self.fixed)] = np.sqrt(np.diag(self.cov)) return err @property def ndim(self): """Number of parameters. Returns ------- ndim: int Examples -------- >>> from spectractor.fit.fitter import FitParameters >>> params = FitParameters(values=[1, 1, 1, 1, 1]) >>> params.ndim 5 """ return len(self.values) @property def nfree(self): """Number of free parameters. Returns ------- nfree: int Examples -------- >>> from spectractor.fit.fitter import FitParameters >>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=[True, False, True, False, True]) >>> params.nfree 2 """ return len(self.get_free_parameters()) @property def nfixed(self): """Number of fixed parameters. Returns ------- nfixed: int Examples -------- >>> from spectractor.fit.fitter import FitParameters >>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=[True, False, True, False, True]) >>> params.nfixed 3 """ return len(self.get_fixed_parameters())
[docs] def get_free_parameters(self): """Return indices array of free parameters. Examples -------- >>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=None) >>> params.fixed [False, False, False, False, False] >>> params.get_free_parameters() array([0, 1, 2, 3, 4]) >>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=[True, False, True, False, True]) >>> params.fixed [True, False, True, False, True] >>> params.get_free_parameters() array([1, 3]) """ return np.array(np.where(np.array(self.fixed).astype(int) == 0)[0])
[docs] def get_fixed_parameters(self): """Return indices array of fixed parameters. Examples -------- >>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=None) >>> params.fixed [False, False, False, False, False] >>> params.get_fixed_parameters() array([], dtype=int64) >>> params = FitParameters(values=[1, 1, 1, 1, 1], fixed=[True, False, True, False, True]) >>> params.fixed [True, False, True, False, True] >>> params.get_fixed_parameters() array([0, 2, 4]) """ return np.array(np.where(np.array(self.fixed).astype(int) == 1)[0])
[docs] def print_parameters_summary(self): """Build a string with the best fitting parameters to display on screen. Labels are from self.labels. Returns ------- txt: str The printed text. Examples -------- >>> parameters.VERBOSE = True >>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False]) >>> params.cov = np.array([[1, -0.5], [-0.5, 4]]) >>> _ = params.print_parameters_summary() """ txt = "" ifree = self.get_free_parameters() icov = 0 for ip in range(self.ndim): if ip in ifree and np.abs(self.cov[icov, icov]) != 0.: txt += "%s: %s +%s -%s" % formatting_numbers(self.values[ip], np.sqrt(np.abs(self.cov[icov, icov])), np.sqrt(np.abs(self.cov[icov, icov])), label=self.labels[ip]) txt += f" bounds={self.bounds[ip]}" icov += 1 else: txt += f"{self.labels[ip]}: {self.values[ip]} (fixed={self.fixed[ip]})" if ip != self.ndim-1: txt += "\n" return txt
[docs] def get_parameter(self, key): """Return a FitParameter instance. key argument can be the parameter label or its index value. Parameters ---------- key: str, int The parameter key. Returns ------- param: FitParameter A FitParameter instance containing all information about a parameter. Examples -------- >>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False]) >>> params.cov = np.array([[1, -0.5], [-0.5, 4]]) >>> p3 = params.get_parameter("t") >>> p3 t: 4 +/- 2 >>> p0 = params.get_parameter(0) >>> p0 x: 1.0 (fixed) """ if type(key) is str: index = self.get_index(key) else: index = key if self.truth is None: truth = None else: truth = self.truth[index] p = FitParameter(value=self.values[index], label=self.labels[index], axis_name=self.axis_names[index], bounds=self.bounds[index], fixed=self.fixed[index], err=self.err[index], truth=truth) return p
[docs] def plot_correlation_matrix(self, live_fit=False): """Compute and plot a correlation matrix. Save the plot if parameters.SAVE is True. The output file name is build from self.file_name, adding the suffix _correlation.pdf. Parameters ---------- live_fit: bool, optional, optional If True, model, data and residuals plots are made along the fitting procedure (default: False). Examples -------- >>> from spectractor.fit.fitter import FitParameters >>> params = FitParameters(values=[1, 1, 1], axis_names=["x", "y", "z"]) >>> params.cov = np.array([[1,-0.5,0],[-0.5,1,-1],[0,-1,1]]) >>> params.plot_correlation_matrix() """ ipar = self.get_free_parameters() fig = plt.figure() plot_correlation_matrix_simple(plt.gca(), self.rho, axis_names=[self.axis_names[i] for i in ipar]) fig.tight_layout() if (parameters.SAVE or parameters.LSST_SAVEFIGPATH) and self.filename != "": # pragma: no cover figname = os.path.splitext(self.filename)[0] + "_correlation.pdf" fig.savefig(figname, dpi=100, bbox_inches='tight') if parameters.LSST_SAVEFIGPATH: # pragma: no cover figname = os.path.join(parameters.LSST_SAVEFIGPATH, "parameters_correlation.pdf") fig.savefig(figname, dpi=100, bbox_inches='tight') if parameters.DISPLAY: # pragma: no cover if live_fit: plt.draw() plt.pause(1e-8) else: plt.show()
@property def txt_filename(self): return os.path.splitext(self.filename)[0] + "_bestfit.txt" @property def json_filename(self): return os.path.splitext(self.filename)[0] + "_bestfit.json"
[docs] def write_text(self, header=""): """Save the best fitting parameter summary in a text file. The file name is build from self.file_name, adding the suffix _bestfit.txt. Parameters ---------- header: str, optional A header to add to the file (default: ""). Examples -------- >>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False], filename="test_spectrum.fits") >>> params.cov = np.array([[1,-0.5,0],[-0.5,1,-1],[0,-1,1]]) >>> params.write_text(header="chi2: 1") .. doctest:: :hide: >>> assert os.path.isfile(params.txt_filename) >>> os.remove(params.txt_filename) """ txt = self.filename + "\n" if header != "": txt += header + "\n" txt += self.print_parameters_summary() for row in self.cov: txt += np.array_str(row, max_line_width=20 * self.cov.shape[0]) + '\n' output_filename = os.path.splitext(self.filename)[0] + "_bestfit.txt" f = open(output_filename, 'w') f.write(txt) f.close()
[docs] def write_json(self): pass
[docs] def write_fitparameter_json(json_filename, params, extra=None): """Save FitParameters attributes as a JSON file. Parameters ---------- json_filename: str JSON file name. params: FitParameters A FitParameters instance to save in JSON json_filename. extra: dict, optional Extra information to write in the JSON file. Returns ------- jsontxt: str The JSON dictionnary as string. Examples -------- >>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False], filename="test_spectrum.fits") >>> params.cov = np.array([[1,-0.5,0],[-0.5,1,-1],[0,-1,1]]) >>> jsonstr = write_fitparameter_json(params.json_filename, params, extra={"chi2": 1}) >>> jsonstr # doctest: +ELLIPSIS '{"values": [1.0, 2.0, 3.0, 4.0], "labels": ["x", "y", "z", "t"],..."extra": {"chi2": 1}... .. doctest:: :hide: >>> assert os.path.isfile(params.json_filename) >>> os.remove(params.json_filename) """ if json_filename == "": raise ValueError("Must provide attribute a JSON filename.") if extra: params.extra = extra jsontxt = json.dumps(params.__dict__, cls=NumpyArrayEncoder) with open(json_filename, 'w') as output_json: output_json.write(jsontxt) return jsontxt
[docs] def read_fitparameter_json(json_filename): """Read JSON file and store data in FitParameters instance. Parameters ---------- json_filename: str The JSON file name. Returns ------- params: FitParameters A FitParameters instance to loaded from JSON json_filename. Examples -------- >>> params = FitParameters(values=[1, 2, 3, 4], labels=["x", "y", "z", "t"], fixed=[True, False, True, False], filename="test_spectrum.fits") >>> params.cov = np.array([[1,-0.5,0],[-0.5,1,-1],[0,-1,1]]) >>> _ = write_fitparameter_json(params.json_filename, params, extra={"chi2": 1}) >>> new_params = read_fitparameter_json(params.json_filename) >>> new_params.values array([1., 2., 3., 4.]) .. doctest:: :hide: >>> assert os.path.isfile(params.json_filename) >>> assert params == new_params >>> os.remove(params.json_filename) """ params = FitParameters(values=[0]) with open(json_filename, 'r') as f: data = json.load(f) for key in ["values", "cov"]: data[key] = np.asarray(data[key]) for key in data: setattr(params, key, data[key]) return params
[docs] class FitWorkspace: def __init__(self, params=None, data=None, err=None, data_cov=None, epsilon=None, file_name="", verbose=False, plot=False, live_fit=False, truth=None): """Generic class to create a fit workspace with parameters, bounds and general fitting methods. Parameters ---------- params: FitParameters, optional The parameters to fit to data (default: None). data: np.ndarray, optional Data array to fit with simulate() method (default: None). err: np.ndarray, optional Uncertainty array for data (default: None). data_cov: np.ndarray, optional Covariance matrix for data (default: None). epsilon: float, List, tuple, np.ndarray, optional Parameter variations for finite difference jacobian approximation (default: None). file_name: str, optional The generic file name to save results. If file_name=="", nothing is saved ond disk (default: ""). verbose: bool, optional Level of verbosity (default: False). plot: bool, optional Level of plotting (default: False). live_fit: bool, optional If True, model, data and residuals plots are made along the fitting procedure (default: False). truth: array_like, optional Array of true parameters (default: None). Examples -------- >>> params = FitParameters(values=[1, 1, 1, 1, 1]) >>> w = FitWorkspace(params, data=None) >>> w.params.ndim 5 """ self.my_logger = set_logger(self.__class__.__name__) self.params = params self.epsilon = epsilon if self.epsilon is not None: self.set_epsilon() self.filename = file_name self.truth = truth self.verbose = verbose self.plot = plot self.live_fit = live_fit self.data = data self.data_cov = data_cov self.err = err if (err is not None and data_cov is not None): raise ValueError("Either err or data_cov must be specified.") if err is not None: self.err = err self.data_cov = np.asarray(self.err.flatten() ** 2) if data_cov is not None: self.data_cov = data_cov self.err = np.sqrt(np.diag(data_cov)) self.outliers = [] self.mask = [] self.W = None if self.data_cov is not None or self.err is not None: self.prepare_weight_matrices() self.model = None self.model_err = None self.model_noconv = None self.params_table = None self.costs = np.array([[]])
[docs] def set_epsilon(self): if type(self.epsilon) is float: tmp = self.epsilon * self.params.values tmp[tmp == 0] = self.epsilon self.epsilon = tmp if self.epsilon is not None and len(self.epsilon) != len(self.params.values): raise ValueError(f"Finite difference epsilon array has length {len(self.epsilon)} " f"but parameter array length is {len(self.params.values)}. " f"Check your FitWorkspace initialization.")
[docs] def get_bad_indices(self): """List of indices that are outliers rejected by a sigma-clipping method or other masking method. Returns ------- outliers: list Examples -------- >>> w = FitWorkspace() >>> w.data = np.array([np.array([1,2,3]), np.array([1,2,3,4])], dtype=object) >>> w.outliers = [2, 6] >>> w.get_bad_indices() [array([2]), array([3])] """ bad_indices = np.asarray(self.outliers, dtype=int) if self.data.dtype == object: if len(self.outliers) > 0: bad_indices = [] start_index = 0 for k in range(self.data.shape[0]): mask = np.zeros(self.data[k].size, dtype=bool) outliers = np.asarray(self.outliers)[np.logical_and(np.asarray(self.outliers) > start_index, np.asarray(self.outliers) < start_index + self.data[k].size)] mask[outliers - start_index] = True bad_indices.append(np.arange(self.data[k].size)[mask]) start_index += self.data[k].size else: bad_indices = [[] for _ in range(self.data.shape[0])] return bad_indices
[docs] def simulate(self, *p): """Compute the model prediction given a set of parameters. Parameters ---------- p: array_like Array of parameters for the computation of the model. Returns ------- x: array_like The abscisse of the model prediction. model: array_like The model prediction. model_err: array_like The uncertainty on the model prediction. Examples -------- >>> w = FitWorkspace() >>> p = np.zeros(3) >>> x, model, model_err = w.simulate(*p) .. doctest:: :hide: >>> assert x is not None """ self.x = np.array([]) self.model = np.array([]) self.model_err = np.array([]) return self.x, self.model, self.model_err
[docs] def plot_fit(self): # pragma: no cover """Generic function to plot the result of the fit for 1D curves. Returns ------- fig: plt.FigureClass The figure. """ fig = plt.figure() plt.errorbar(self.x, self.data, yerr=self.err, fmt='ko', label='Data') if self.truth is not None: x, truth, truth_err = self.simulate(*self.truth) plt.plot(self.x, truth, label="Truth") plt.plot(self.x, self.model, label='Best fitting model') plt.xlabel('$x$') plt.ylabel('$y$') title = "" for i, label in enumerate(self.params.labels): if self.params.cov.size > 0: err = np.sqrt(self.params.cov[i, i]) formatting_numbers(self.params.values[i], err, err) _, par, err, _ = formatting_numbers(self.params.values[i], err, err, label=label) title += rf"{label} = {par} $\pm$ {err}" else: title += f"{label} = {self.params.values[i]:.3g}" if i < self.params.ndim - 1: title += ", " plt.title(title) plt.legend() plt.grid() if parameters.DISPLAY: # pragma: no cover plt.show() return fig
[docs] def weighted_residuals(self, p): # pragma: nocover """Compute the weighted residuals array for a set of model parameters p. Parameters ---------- p: array_like The array of model parameters. Returns ------- residuals: np.array The array of weighted residuals. """ x, model, model_err = self.simulate(*p) if self.data_cov is None: if len(self.outliers) > 0: model_err = model_err.flatten() err = self.err.flatten() res = (model.flatten() - self.data.flatten()) / np.sqrt(model_err * model_err + err * err) else: res = ((model - self.data) / np.sqrt(model_err * model_err + self.err * self.err)).flatten() else: if self.data_cov.ndim > 2: K = self.data_cov.shape[0] if np.any(model_err > 0): cov = [self.data_cov[k] + np.diag(model_err[k] ** 2) for k in range(K)] L = [np.linalg.inv(np.linalg.cholesky(cov[k])) for k in range(K)] else: L = [np.linalg.cholesky(self.W[k]) for k in range(K)] res = [L[k] @ (model[k] - self.data[k]) for k in range(K)] res = np.concatenate(res).ravel() else: if np.any(model_err > 0): cov = self.data_cov + np.diag(model_err * model_err) L = np.linalg.inv(np.linalg.cholesky(cov)) else: if not scipy.sparse.issparse(self.W): if self.W.ndim == 1 and self.W.dtype != object: L = np.diag(np.sqrt(self.W)) elif self.W.ndim == 2 and self.W.dtype != object: L = np.linalg.cholesky(self.W) else: raise ValueError(f"Case not implemented with self.W.ndim={self.W.ndim} " f"and self.W.dtype={self.W.dtype}") else: if scipy.sparse.isspmatrix_dia(self.W): L = self.W.sqrt() else: L = np.linalg.cholesky(self.W.toarray()) res = L @ (model - self.data) return res
[docs] def prepare_weight_matrices(self): r"""Compute weight matrix :math:`\mathbf{W}` `self.W` as the inverse of data covariance matrix `self.data_cov`. Cancel weights of data outliers given by `self.outliers`. Examples -------- 1D case: >>> w = FitWorkspace() >>> w.data_cov = 2 * np.ones(4) >>> w.prepare_weight_matrices() >>> w.W array([0.5, 0.5, 0.5, 0.5]) 2D case: >>> w = FitWorkspace() >>> w.data = np.array([1,2,3]) >>> w.data_cov = np.diag([1,2,4]) >>> w.prepare_weight_matrices() >>> w.W array([[1. , 0. , 0. ], [0. , 0.5 , 0. ], [0. , 0. , 0.25]]) Add outliers: >>> w.outliers = np.array([2]) >>> w.prepare_weight_matrices() >>> w.W array([[1. , 0. , 0. ], [0. , 0.5, 0. ], [0. , 0. , 0. ]]) Use sparse matrix: >>> w.W = scipy.sparse.diags(np.ones(3)) >>> w.W.toarray() array([[1., 0., 0.], [0., 1., 0.], [0., 0., 1.]]) >>> w.prepare_weight_matrices() >>> w.W.getformat() 'dia' >>> w.W.toarray() array([[1., 0., 0.], [0., 1., 0.], [0., 0., 0.]]) """ # Prepare covariance matrix for data if self.data_cov is None: self.data_cov = np.asarray(self.err.flatten() ** 2) # Prepare inverse covariance matrix for data if self.W is None: if self.data_cov.ndim == 1 and self.data_cov.dtype != object: self.W = 1 / self.data_cov elif self.data_cov.ndim == 2 and self.data_cov.dtype != object: self.W = np.linalg.inv(self.data_cov) elif self.data_cov.dtype is object: if self.data_cov[0].ndim == 1: self.W = np.array([1 / self.data_cov[k] for k in range(self.data_cov.shape[0])]) else: self.W = [] for k in range(len(self.data_cov)): L = np.linalg.inv(np.linalg.cholesky(self.data_cov[k])) self.W[k] = L.T @ L self.W = np.asarray(self.W) if len(self.outliers) > 0: bad_indices = self.get_bad_indices() if not scipy.sparse.issparse(self.W): if self.W.ndim == 1 and self.W.dtype != object: self.W[bad_indices] = 0 elif self.W.ndim == 2 and self.W.dtype != object: self.W[:, bad_indices] = 0 self.W[bad_indices, :] = 0 elif self.W.dtype == object: if self.data_cov[0].ndim == 1: for k in range(len(self.W)): self.W[k][bad_indices[k]] = 0 else: for k in range(len(self.W)): self.W[k][:, bad_indices[k]] = 0 self.W[k][bad_indices[k], :] = 0 else: raise ValueError( f"Data inverse covariance matrix must be a np.ndarray of dimension 1 or 2," f"either made of 1D or 2D arrays of equal lengths or not for block diagonal matrices." f"\nHere W type is {type(self.W)}, shape is {self.W.shape} and W is {self.W}.") else: format = self.W.getformat() if format == 'dia': self.W.data[0, bad_indices] = 0 else: W = self.W.tolil() W[:, bad_indices] = 0 W[bad_indices, :] = 0 W = self.W.asformat(format='csr') W.eliminate_zeros() self.W = W.asformat(format=format)
[docs] def compute_W_with_model_error(self, model_err): """Propagate model uncertainties to weight matrix W. The method add the mode uncertainties in quadrature to the inverse of the weight matrix W `self.data_cov` and re-invert it. Parameters ---------- model_err: np.ndarray Flat array of model uncertainties. Returns ------- W: array_like Weight matrix with model uncertainties propagated. Examples -------- 1D case: >>> w = FitWorkspace() >>> w.data_cov = 2 * np.ones(4) >>> w.prepare_weight_matrices() >>> w.compute_W_with_model_error(np.sqrt(2) * np.ones(4)) array([0.25, 0.25, 0.25, 0.25]) 2D case: >>> w = FitWorkspace() >>> w.data = np.array([1,2,3]) >>> w.data_cov = np.diag([1,1,1]) >>> w.prepare_weight_matrices() >>> w.compute_W_with_model_error(np.sqrt(3) * np.ones(3)) array([[0.25, 0. , 0. ], [0. , 0.25, 0. ], [0. , 0. , 0.25]]) Add outliers: >>> w.outliers = np.array([2]) >>> w.prepare_weight_matrices() >>> w.compute_W_with_model_error(np.sqrt(3) * np.ones(3)) array([[0.25, 0. , 0. ], [0. , 0.25, 0. ], [0. , 0. , 0. ]]) Use sparse matrix: >>> w.W = scipy.sparse.diags(np.ones(3)).tocsr() >>> w.W[-1, -1] = 0 # mask last data point >>> w.W.toarray() array([[1., 0., 0.], [0., 1., 0.], [0., 0., 0.]]) >>> w.compute_W_with_model_error(np.sqrt(3) * np.ones(3)).toarray() array([[0.25, 0. , 0. ], [0. , 0.25, 0. ], [0. , 0. , 0. ]]) """ if np.any(model_err > 0): if not scipy.sparse.issparse(self.W): zeros = self.W == 0 if self.W.ndim == 1 and self.W.dtype != object: self.W = 1 / (self.data_cov + model_err * model_err) elif self.W.dtype == object: K = len(self.W) if self.W[0].ndim == 1: self.W = [1 / (self.data_cov[k] + model_err[k] * model_err[k]) for k in range(K)] elif self.W[0].ndim == 2: K = len(self.W) cov = [self.data_cov[k] + np.diag(model_err[k] ** 2) for k in range(K)] L = [np.linalg.inv(np.linalg.cholesky(cov[k])) for k in range(K)] self.W = [L[k].T @ L[k] for k in range(K)] else: raise ValueError(f"First element of fitworkspace.W has no ndim attribute or has a dimension above 2. " f"I get W[0]={self.W[0]}") elif self.W.ndim == 2 and self.W.dtype != object: cov = self.data_cov + np.diag(model_err * model_err) self.W = np.linalg.inv(cov) # needs to reapply the mask of outliers self.W[zeros] = 0 else: cov = self.data_cov + np.diag(model_err * model_err) L = np.linalg.inv(np.linalg.cholesky(cov)) W = L.T @ L # needs to reapply the mask of outliers rows, cols = self.W.nonzero() self.W = scipy.sparse.csr_matrix((W[rows, cols], (rows, cols)), dtype=self.W.dtype, shape=self.W.shape) return self.W
[docs] def chisq(self, p, model_output=False): """Compute the chi square for a set of model parameters p. Four cases are implemented: diagonal W, 2D W, array of diagonal Ws, array of 2D Ws. The two latter cases are for multiple independent data vectors with W being block diagonal. Parameters ---------- p: array_like The array of model parameters. model_output: bool, optional If true, the simulated model is output. Returns ------- chisq: float The chi square value. """ # check data format if (self.data.dtype != object and self.data.ndim > 1) or (self.err.dtype != object and self.err.ndim > 1): raise ValueError("Fitworkspace.data and Fitworkspace.err must be a flat 1D array," " or an array of flat arrays of unequal lengths.") # prepare weight matrices in case they have not been built before self.prepare_weight_matrices() x, model, model_err = self.simulate(*p) W = self.compute_W_with_model_error(model_err) if not scipy.sparse.issparse(self.W): if W.ndim == 1 and W.dtype != object: res = (model - self.data) chisq = res @ (W * res) elif W.dtype == object: K = len(W) res = [model[k] - self.data[k] for k in range(K)] if W[0].ndim == 1: chisq = np.sum([res[k] @ (W[k] * res[k]) for k in range(K)]) elif W[0].ndim == 2: chisq = np.sum([res[k] @ W[k] @ res[k] for k in range(K)]) else: raise ValueError(f"First element of fitworkspace.W has no ndim attribute or has a dimension above 2. " f"I get W[0]={W[0]}") elif W.ndim == 2 and W.dtype != object: res = (model - self.data) chisq = res @ W @ res else: raise ValueError( f"Weight covariance matrix must be a np.ndarray of dimension 1 or 2 if not sparse," f"either made of 1D or 2D arrays of equal lengths or not for block diagonal matrices." f"\nHere W type is {type(W)}, shape is {W.shape} and W is {W}.") else: res = (model - self.data) chisq = res @ W @ res if model_output: return chisq, x, model, model_err else: return chisq
[docs] def lnlike(self, p): """Compute the logarithmic likelihood for a set of model parameters p as -0.5*chisq. Parameters ---------- p: array_like The array of model parameters. Returns ------- lnlike: float The logarithmic likelihood value. """ return -0.5 * self.chisq(p)
[docs] def jacobian(self, params, model_input=None): """Generic function to compute the Jacobian matrix of a model, with numerical derivatives. Parameters ---------- params: array_like The array of model parameters. model_input: array_like, optional A model input as a list with (x, model, model_err) to avoid an additional call to simulate(). Returns ------- J: np.array The Jacobian matrix. """ if self.epsilon is None: raise ValueError("Epsilon array must be set to use finite difference jacobian.") if model_input: x, model, model_err = model_input else: x, model, model_err = self.simulate(*params) if self.W.dtype == object and self.W[0].ndim == 2: J = [[] for _ in range(params.size)] else: model = model.flatten() J = np.zeros((params.size, model.size)) for ip, p in enumerate(params): if self.params.fixed[ip]: continue tmp_p = np.copy(params) if tmp_p[ip] + self.epsilon[ip] < self.params.bounds[ip][0] or tmp_p[ip] + self.epsilon[ip] > self.params.bounds[ip][1]: self.epsilon[ip] = - self.epsilon[ip] tmp_p[ip] += self.epsilon[ip] tmp_x, tmp_model, tmp_model_err = self.simulate(*tmp_p) if self.W.dtype == object and self.W[0].ndim == 2: for k in range(model.shape[0]): J[ip].append((tmp_model[k] - model[k]) / self.epsilon[ip]) else: J[ip] = (tmp_model.flatten() - model) / self.epsilon[ip] return np.asarray(J)
[docs] def hessian(self, params): # pragma: nocover """Experimental function to compute the hessian of a model. Parameters ---------- params: array_like The array of model parameters. Returns ------- """ x, model, model_err = self.simulate(*params) model = model.flatten() J = self.jacobian(params) H = np.zeros((params.size, params.size, model.size)) tmp_p = np.copy(params) for ip, p1 in enumerate(params): print(ip, p1, params[ip], tmp_p[ip], self.params.bounds[ip], self.epsilon[ip], tmp_p[ip] + self.epsilon[ip]) if self.params.fixed[ip]: continue if tmp_p[ip] + self.epsilon[ip] < self.params.bounds[ip][0] or tmp_p[ip] + self.epsilon[ip] > self.params.bounds[ip][1]: self.epsilon[ip] = - self.epsilon[ip] tmp_p[ip] += self.epsilon[ip] print(tmp_p) # tmp_x, tmp_model, tmp_model_err = self.simulate(*tmp_p) # J[ip] = (tmp_model.flatten() - model) / self.epsilon[ip] tmp_J = self.jacobian(tmp_p) for ip, p1 in enumerate(params): if self.params.fixed[ip]: continue for jp, p2 in enumerate(params): if self.params.fixed[jp]: continue x, modelplus, model_err = self.simulate(params + self.epsilon) x, modelmoins, model_err = self.simulate(params - self.epsilon) model = model.flatten() print("hh", ip, jp, tmp_J[ip], J[jp], tmp_p[ip], params, (tmp_J[ip] - J[jp]) / self.epsilon) print((modelplus + modelmoins - 2 * model) / (np.asarray(self.epsilon) ** 2)) H[ip, jp] = (tmp_J[ip] - J[jp]) / self.epsilon H[ip, jp] = (modelplus + modelmoins - 2 * model) / (np.asarray(self.epsilon) ** 2) return H
def __post_fit__(self): """Method executed at the end of a minimisation process. """ pass
[docs] def plot_gradient_descent(self): fig, ax = plt.subplots(2, 1, figsize=(10, 6), sharex="all") iterations = np.arange(self.params_table.shape[0]) ax[0].plot(iterations, self.costs, lw=2) for ip in range(self.params_table.shape[1]): ax[1].plot(iterations, self.params_table[:, ip], label=f"{self.params.axis_names[ip]}") ax[1].set_yscale("symlog") ax[1].legend(ncol=6, loc=9) ax[1].grid() ax[0].set_yscale("log") ax[0].set_ylabel(r"$\chi^2$") ax[1].set_ylabel("Parameters") ax[0].grid() ax[1].set_xlabel("Iterations") ax[0].xaxis.set_major_locator(MaxNLocator(integer=True)) fig.tight_layout() plt.subplots_adjust(wspace=0, hspace=0) if parameters.SAVE and self.filename != "": # pragma: no cover figname = os.path.splitext(self.filename)[0] + "_fitting.pdf" self.my_logger.info(f"\n\tSave figure {figname}.") fig.savefig(figname, dpi=100, bbox_inches='tight') if parameters.DISPLAY: # pragma: no cover plt.show() if parameters.PdfPages: # args from the above? MFL parameters.PdfPages.savefig() self.simulate(*self.params.values) self.live_fit = False self.plot_fit()
[docs] def save_gradient_descent(self): iterations = np.arange(self.params_table.shape[0]).astype(int) t = np.zeros((self.params_table.shape[1] + 2, self.params_table.shape[0])) t[0] = iterations t[1] = self.costs t[2:] = self.params_table.T h = 'iter,costs,' + ','.join(self.params.labels) output_filename = os.path.splitext(self.filename)[0] + "_fitting.txt" np.savetxt(output_filename, t.T, header=h, delimiter=",") self.my_logger.info(f"\n\tSave gradient descent log {output_filename}.")
[docs] def plot_triangle_fisher(self, nsamples=10000, max_params=8): """Plot a triangle (corner) plot of probability contours and distributions using Fisher matrix method. This method generates samples from a multivariate normal distribution based on the parameter covariance matrix and creates a corner plot showing 1D and 2D marginal distributions. Parameters ---------- nsamples: int, optional Number of samples to generate from the multivariate normal distribution (default: 10000). max_params: int, optional Maximum number of parameters to display. Only the first max_params free parameters will be shown (default: 8). Examples -------- >>> from spectractor.fit.fitter import FitParameters, FitWorkspace >>> params = FitParameters(values=[1, 2, 3], axis_names=["x", "y", "z"]) >>> params.cov = np.array([[1, -0.5, 0], [-0.5, 1, -1], [0, -1, 1]]) >>> w = FitWorkspace(params=params, filename="test.fits") >>> w.plot_triangle_fisher(nsamples=1000, max_params=3) Notes ----- This method requires the `corner` package to be installed. The plot is saved if parameters.SAVE is True with suffix '_triangle.pdf'. """ try: import corner except ImportError: self.my_logger.warning("Package 'corner' not installed. Cannot create triangle plot. " "Install it with: pip install corner") return # Get free parameters indices ipar = self.params.get_free_parameters() # Limit to max_params parameters n_params = min(len(ipar), max_params) ipar = ipar[:n_params] if len(ipar) == 0: self.my_logger.warning("No free parameters to plot.") return # Check that covariance matrix is available if self.params.cov.size == 0: self.my_logger.warning("Covariance matrix not available. Run fit first.") return # Extract mean values and covariance matrix for free parameters mean_values = self.params.values[ipar] cov_matrix = self.params.cov[:n_params, :n_params] # Check if covariance matrix is positive definite eigenvalues = np.linalg.eigvals(cov_matrix) if np.any(eigenvalues <= 0): self.my_logger.warning("Covariance matrix is not positive definite. " "Cannot generate samples.") return # Generate samples from multivariate normal distribution samples = np.random.multivariate_normal(mean_values, cov_matrix, size=nsamples) # Get axis names for the selected parameters axis_names = [self.params.axis_names[i] for i in ipar] # Create corner plot fig = corner.corner(samples, labels=axis_names, quantiles=[0.16, 0.5, 0.84], show_titles=True, title_kwargs={"fontsize": 12}, truths=mean_values) # Save figure if requested if (parameters.SAVE or parameters.LSST_SAVEFIGPATH) and self.filename != "": # pragma: no cover figname = os.path.splitext(self.filename)[0] + "_triangle.pdf" self.my_logger.info(f"\n\tSave triangle plot {figname}.") fig.savefig(figname, dpi=100, bbox_inches='tight') if parameters.DISPLAY: # pragma: no cover plt.show() return fig
[docs] def gradient_descent(fit_workspace, niter=10, xtol=1e-3, ftol=1e-3, with_line_search=True): """ Four cases are implemented: diagonal W, 2D W, array of diagonal Ws, array of 2D Ws. The two latter cases are for multiple independent data vectors with W being block diagonal. Parameters ---------- fit_workspace: FitWorkspace niter xtol ftol with_line_search Returns ------- """ my_logger = set_logger(__name__) tmp_params = np.copy(fit_workspace.params.values).astype(float) fit_workspace.prepare_weight_matrices() n_data_masked = len(fit_workspace.mask) + len(fit_workspace.outliers) ipar = fit_workspace.params.get_free_parameters() costs = [] params_table = [] inv_JT_W_J = np.zeros((len(ipar), len(ipar)), dtype=float) start_total = time.time() for i in range(niter): start = time.time() cost, tmp_lambdas, tmp_model, tmp_model_err = fit_workspace.chisq(tmp_params, model_output=True) if i == 0 and fit_workspace.verbose: my_logger.info(f"\n\tIteration={i}:\tinitial cost={cost:.5g}\tinitial chisq_red={cost / (tmp_model.size - n_data_masked):.5g}") W = fit_workspace.compute_W_with_model_error(tmp_model_err) # residuals if (isinstance(W, np.ndarray) or scipy.sparse.issparse(W)) and W.dtype != object: residuals = (tmp_model - fit_workspace.data).flatten() elif isinstance(W, np.ndarray) and W.dtype == object: residuals = [(tmp_model[k] - fit_workspace.data[k]) for k in range(len(W))] else: raise TypeError(f"Type of fit_workspace.W is {type(W)}. It must be a np.ndarray.") # Jacobian J = fit_workspace.jacobian(tmp_params, model_input=[tmp_lambdas, tmp_model, tmp_model_err]) # remove parameters with unexpected null Jacobian vectors or that are degenerated J_vectors = [np.array(J[ip]).ravel() for ip in range(J.shape[0])] J_norms = [np.linalg.norm(J_vectors[ip]) for ip in range(J.shape[0])] for ip in range(J.shape[0]): if ip not in ipar: continue # check for null vectors if J_norms[ip] < np.sqrt(len(J_vectors[ip]))*np.finfo(np.float64).eps and len(np.where(J_vectors[ip]==0)[0]) > J_vectors[ip].size // 2: ipar = np.delete(ipar, list(ipar).index(ip)) fit_workspace.params.fixed[ip] = True my_logger.warning( f"\n\tStep {i}: {fit_workspace.params.labels[ip]} has a null Jacobian; parameter is fixed " f"at its last known current value ({tmp_params[ip]}) because |J[par]|={J_norms[ip]:.3e}<{np.sqrt(len(J_vectors[ip]))*np.finfo(np.float64).eps} with more than half values being zeros.") continue # check for degeneracies using Cauchy-Schwartz inequality; fix the second parameter for jp in range(ip, J.shape[0]): if ip == jp or jp not in ipar: continue inner = np.abs(J_vectors[ip] @ J_vectors[jp]) if np.abs(inner - J_norms[ip] * J_norms[jp]) < 1e-8 * inner: ipar = np.delete(ipar, list(ipar).index(jp)) fit_workspace.params.fixed[jp] = True my_logger.warning( f"\n\tStep {i}: {fit_workspace.params.labels[jp]} is degenerated with {fit_workspace.params.labels[ip]}; " f"parameter {fit_workspace.params.labels[jp]} is fixed at its last known current value ({tmp_params[jp]}).") continue # remove fixed and degenerated parameters; then transpose J = J[ipar].T # compute J.T @ W @ J matrix and invert it if W.ndim == 1 and W.dtype != object: JT_W = J.T * W JT_W_J = JT_W @ J elif W.ndim == 2 and W.dtype != object: JT_W = J.T @ W JT_W_J = JT_W @ J else: if W[0].ndim == 1: JT_W = np.array([j for j in J]).T * np.concatenate(W).ravel() JT_W_J = JT_W @ np.array([j for j in J]) else: # warning ! here the data arrays indexed by k can have different lengths because outliers # because W inverse covariance is block diagonal and blocks can have different sizes # the philosophy is to temporarily flatten the data arrays JT_W = [np.concatenate([J[ip][k].T @ W[k] for k in range(W.shape[0])]).ravel() for ip in range(len(J))] JT_W_J = np.array([[JT_W[ip2] @ np.concatenate(J[ip1][:]).ravel() for ip1 in range(len(J))] for ip2 in range(len(J))]) try: L = np.linalg.inv(np.linalg.cholesky(JT_W_J)) # cholesky is too sensible to the numerical precision inv_JT_W_J = L.T @ L except np.linalg.LinAlgError: inv_JT_W_J = np.linalg.inv(JT_W_J) if fit_workspace.W.dtype != object: JT_W_R0 = JT_W @ residuals else: JT_W_R0 = JT_W @ np.concatenate(residuals).ravel() # Gauss-Newton step: dparams = - inv_JT_W_J @ JT_W_R0 new_params = np.copy(tmp_params) new_params[ipar] = tmp_params[ipar] + dparams # check bounds for ip, p in enumerate(new_params): if p < fit_workspace.params.bounds[ip][0]: new_params[ip] = fit_workspace.params.bounds[ip][0] if p > fit_workspace.params.bounds[ip][1]: new_params[ip] = fit_workspace.params.bounds[ip][1] fval = fit_workspace.chisq(new_params) if with_line_search or fval > (1 + 10 * ftol) * cost: def line_search(alpha): tmp_params_2 = np.copy(tmp_params) tmp_params_2[ipar] = tmp_params[ipar] + alpha * dparams for ipp, pp in enumerate(tmp_params_2): if pp < fit_workspace.params.bounds[ipp][0]: tmp_params_2[ipp] = fit_workspace.params.bounds[ipp][0] if pp > fit_workspace.params.bounds[ipp][1]: tmp_params_2[ipp] = fit_workspace.params.bounds[ipp][1] return fit_workspace.chisq(tmp_params_2) # tol parameter acts on alpha (not func) alpha_min, fval, iter, funcalls = optimize.brent(line_search, full_output=True, tol=5e-1, brack=(0, 1)) new_params[ipar] = tmp_params[ipar] + alpha_min * dparams # check bounds for ip, p in enumerate(new_params): if p < fit_workspace.params.bounds[ip][0]: new_params[ip] = fit_workspace.params.bounds[ip][0] if p > fit_workspace.params.bounds[ip][1]: new_params[ip] = fit_workspace.params.bounds[ip][1] else: alpha_min = 1 funcalls = 0 iter = 0 tmp_params[ipar] = new_params[ipar] # prepare outputs costs.append(fval) params_table.append(np.copy(tmp_params)) fit_workspace.params.values = tmp_params if fit_workspace.verbose: my_logger.info(f"\n\tIteration={i}:\tfinal cost={fval:.5g}\tfinal chisq_red={fval / (tmp_model.size - n_data_masked):.5g} " f"\tcomputed in {time.time() - start:.2f}s" f"\n\tNew parameters: {tmp_params[ipar]}") my_logger.debug(f"\n\t Parameter shifts: {alpha_min * dparams}\n" f"\n\t Line search: alpha_min={alpha_min:.3g} iter={iter} funcalls={funcalls}") if fit_workspace.live_fit: # pragma: no cover fit_workspace.simulate(*tmp_params) fit_workspace.plot_fit() fit_workspace.cov = inv_JT_W_J # fit_workspace.params.plot_correlation_matrix(ipar) if len(ipar) == 0: my_logger.warning(f"\n\tGradient descent terminated in {i} iterations because all parameters " f"have null Jacobian.") break if np.sum(np.abs(alpha_min * dparams)) / np.sum(np.abs(tmp_params[ipar])) < xtol: my_logger.info(f"\n\tGradient descent terminated in {i} iterations because the sum of parameter shift " f"relative to the sum of the parameters is below xtol={xtol}.") break if len(costs) > 1 and np.abs(costs[-2] - fval) / np.max([np.abs(fval), np.abs(costs[-2])]) < ftol: my_logger.info(f"\n\tGradient descent terminated in {i} iterations because the " f"relative change of cost is below ftol={ftol}.") break if time.time() - start > parameters.SPECTRACTOR_FIT_TIMEOUT_PER_ITER: raise TimeoutError(f"Gradient descent iteration longer than {parameters.SPECTRACTOR_FIT_TIMEOUT_PER_ITER=}. " f"Check data quality or increase parameters.SPECTRACTOR_FIT_TIMEOUT_PER_ITER in config file.") if time.time() - start_total > parameters.SPECTRACTOR_FIT_TIMEOUT: raise TimeoutError(f"Gradient descent longer than {parameters.SPECTRACTOR_FIT_TIMEOUT=}. " f"Check data quality or increase parameters.SPECTRACTOR_FIT_TIMEOUT in config file.") return tmp_params, inv_JT_W_J, np.array(costs), np.array(params_table)
[docs] def simple_newton_minimisation(fit_workspace, niter=10, xtol=1e-3, ftol=1e-3): # pragma: no cover """Experimental function to minimize a function. Parameters ---------- fit_workspace: FitWorkspace niter xtol ftol """ my_logger = set_logger(__name__) tmp_params = np.copy(fit_workspace.params.values) ipar = fit_workspace.params.get_free_parameters() funcs = [] params_table = [] inv_H = np.zeros((len(ipar), len(ipar))) for i in range(niter): start = time.time() tmp_lambdas, tmp_model, tmp_model_err = fit_workspace.simulate(*tmp_params) # if fit_workspace.live_fit: # fit_workspace.plot_fit() J = fit_workspace.jacobian(tmp_params) # remove parameters with unexpected null Jacobian vectors for ip in range(J.shape[0]): if ip not in ipar: continue if np.all(J[ip] == np.zeros(J.shape[1])): ipar = np.delete(ipar, list(ipar).index(ip)) # tmp_params[ip] = 0 my_logger.warning( f"\n\tStep {i}: {fit_workspace.params.labels[ip]} has a null Jacobian; parameter is fixed " f"at its last known current value ({tmp_params[ip]}).") # remove fixed parameters J = J[ipar].T # hessian H = fit_workspace.hessian(tmp_params) try: L = np.linalg.inv(np.linalg.cholesky(H)) # cholesky is too sensible to the numerical precision inv_H = L.T @ L except np.linalg.LinAlgError: inv_H = np.linalg.inv(H) dparams = - inv_H[:, :, 0] @ J[:, 0] print("dparams", dparams, inv_H, J, H) tmp_params[ipar] += dparams # check bounds print("tmp_params", tmp_params, dparams, inv_H, J) for ip, p in enumerate(tmp_params): if p < fit_workspace.params.bounds[ip][0]: tmp_params[ip] = fit_workspace.params.bounds[ip][0] if p > fit_workspace.params.bounds[ip][1]: tmp_params[ip] = fit_workspace.params.bounds[ip][1] tmp_lambdas, new_model, tmp_model_err = fit_workspace.simulate(*tmp_params) new_func = new_model[0] funcs.append(new_func) r = np.log10(fit_workspace.regs) js = [fit_workspace.jacobian(np.asarray([rr]))[0] for rr in np.array(r)] plt.plot(r, js, label="J") plt.grid() plt.legend() plt.show() if parameters.DISPLAY: fig = plt.figure() plt.plot(r, js, label="prior") mod = tmp_model + J[0] * (r - (tmp_params - dparams)[0]) plt.plot(r, mod) plt.axvline(tmp_params) plt.axhline(tmp_model) plt.grid() plt.legend() plt.draw() plt.pause(1e-8) plt.close(fig) # prepare outputs params_table.append(np.copy(tmp_params)) if fit_workspace.verbose: my_logger.info(f"\n\tIteration={i}: initial func={tmp_model[0]:.5g}" f"\n\tParameter shifts: {dparams}" f"\n\tNew parameters: {tmp_params[ipar]}" f"\n\tFinal func={new_func:.5g}" f" computed in {time.time() - start:.2f}s") if fit_workspace.live_fit: fit_workspace.simulate(*tmp_params) fit_workspace.plot_fit() fit_workspace.cov = inv_H[:, :, 0] print("shape", fit_workspace.cov.shape) # fit_workspace.params.plot_correlation_matrix(ipar) if len(ipar) == 0: my_logger.warning(f"\n\tGradient descent terminated in {i} iterations because all parameters " f"have null Jacobian.") break if np.sum(np.abs(dparams)) / np.sum(np.abs(tmp_params[ipar])) < xtol: my_logger.info(f"\n\tGradient descent terminated in {i} iterations because the sum of parameter shift " f"relative to the sum of the parameters is below xtol={xtol}.") break if len(funcs) > 1 and np.abs(funcs[-2] - new_func) / np.max([np.abs(new_func), np.abs(funcs[-2])]) < ftol: my_logger.info(f"\n\tGradient descent terminated in {i} iterations because the " f"relative change of cost is below ftol={ftol}.") break plt.close() return tmp_params, inv_H[:, :, 0], np.array(funcs), np.array(params_table)
[docs] def run_gradient_descent(fit_workspace, xtol, ftol, niter, verbose=False, with_line_search=True): if fit_workspace.costs.size == 0: fit_workspace.costs = np.array([fit_workspace.chisq(fit_workspace.params.values)]) fit_workspace.params_table = np.array([fit_workspace.params.values]) p, cov, tmp_costs, tmp_params_table = gradient_descent(fit_workspace, niter=niter, xtol=xtol, ftol=ftol, with_line_search=with_line_search) fit_workspace.params.values, fit_workspace.params.cov = p, cov fit_workspace.params_table = np.concatenate([fit_workspace.params_table, tmp_params_table]) fit_workspace.costs = np.concatenate([fit_workspace.costs, tmp_costs]) if verbose or fit_workspace.verbose: fit_workspace.my_logger.info(f"\n{fit_workspace.params.print_parameters_summary()}") if parameters.DEBUG and (verbose or fit_workspace.verbose): fit_workspace.plot_gradient_descent() if len(fit_workspace.params.get_free_parameters()) > 1: fit_workspace.params.plot_correlation_matrix()
[docs] def run_simple_newton_minimisation(fit_workspace, xtol=1e-8, ftol=1e-8, niter=50, verbose=False): # pragma: no cover fit_workspace.values, fit_workspace.cov, funcs, params_table = simple_newton_minimisation(fit_workspace, niter=niter, xtol=xtol, ftol=ftol) if verbose or fit_workspace.verbose: fit_workspace.my_logger.info(f"\n{fit_workspace.params.print_parameters_summary()}") if parameters.DEBUG and (verbose or fit_workspace.verbose): fit_workspace.plot_gradient_descent() if len(fit_workspace.params.get_free_parameters()) > 1: fit_workspace.params.plot_correlation_matrix() return params_table, funcs
[docs] def run_minimisation(fit_workspace, method="newton", xtol=1e-4, ftol=1e-4, niter=50, verbose=False, with_line_search=True, minimizer_method="L-BFGS-B"): my_logger = set_logger(__name__) bounds = fit_workspace.params.bounds nll = lambda params: -fit_workspace.lnlike(params) guess = fit_workspace.params.values.astype('float64') if verbose: my_logger.debug(f"\n\tStart guess: {guess}") if method == "minimize": start = time.time() result = optimize.minimize(nll, fit_workspace.params.values, method=minimizer_method, options={'maxiter': 100000}, bounds=bounds) fit_workspace.params.values = result['x'] if verbose: my_logger.debug(f"\n\t{result}") my_logger.debug(f"\n\tMinimize: total computation time: {time.time() - start}s") elif method == 'basinhopping': start = time.time() minimizer_kwargs = dict(method=minimizer_method, bounds=bounds) result = optimize.basinhopping(nll, guess, minimizer_kwargs=minimizer_kwargs) fit_workspace.params.values = result['x'] if verbose: my_logger.debug(f"\n\t{result}") my_logger.debug(f"\n\tBasin-hopping: total computation time: {time.time() - start}s") elif method == "least_squares": # pragma: no cover fit_workspace.my_logger.warning("least_squares might not work, use with caution... " "or repair carefully the function weighted_residuals()") start = time.time() x_scale = np.abs(guess) x_scale[x_scale == 0] = 0.1 p = optimize.least_squares(fit_workspace.weighted_residuals, guess, verbose=2, ftol=1e-6, #x_scale=x_scale, diff_step=fit_workspace.epsilon, bounds=list(np.array(bounds).T)) fit_workspace.params.values = p.x if verbose: my_logger.debug(f"\n\t{p}") my_logger.debug(f"\n\tLeast_squares: total computation time: {time.time() - start}s") elif method == 'curve_fit': def model(x, *params): x, mod, mod_err = fit_workspace.simulate(*params) return mod def Dfun(x, *params): return fit_workspace.jacobian(params, model_input=None).T start = time.time() result = optimize.curve_fit(model, fit_workspace.x, fit_workspace.data, jac=Dfun, x_scale='jac', p0=fit_workspace.params.values, sigma=fit_workspace.err, verbose=0, xtol=xtol, ftol=ftol, bounds=list(np.array(bounds).T), absolute_sigma=True) fit_workspace.params.values = result[0] fit_workspace.params.cov = result[1] if verbose: my_logger.debug(f"\n\t{result}") my_logger.debug(f"\n\tCurve_fit: total computation time: {time.time() - start}s") if parameters.DEBUG: fit_workspace.plot_fit() elif method == "lm": # pragma: no cover start = time.time() x, cov, infodict, mesg, ier = optimize.leastsq(fit_workspace.weighted_residuals, guess, ftol=ftol, xtol=xtol, full_output=True) fit_workspace.params.values = x fit_workspace.params.cov = cov if verbose: my_logger.debug(f"\n\t{x}") my_logger.debug(f"\n\tLeast_squares: total computation time: {time.time() - start}s") elif method == "newton": start = time.time() run_gradient_descent(fit_workspace, xtol=xtol, ftol=ftol, niter=niter, verbose=verbose, with_line_search=with_line_search) if verbose: my_logger.debug(f"\n\tNewton: total computation time: {time.time() - start}s") if fit_workspace.filename != "": write_fitparameter_json(fit_workspace.params.json_filename, fit_workspace.params) fit_workspace.save_gradient_descent() fit_workspace.__post_fit__() if verbose and parameters.DEBUG: fit_workspace.plot_fit()
[docs] def run_minimisation_sigma_clipping(fit_workspace, method="newton", xtol=1e-4, ftol=1e-4, niter=50, sigma_clip=5.0, niter_clip=3, verbose=False, with_line_search=True): my_logger = set_logger(__name__) for step in range(niter_clip): if verbose: my_logger.info(f"\n\tSigma-clipping step {step}/{niter_clip} (sigma={sigma_clip})") run_minimisation(fit_workspace, method=method, xtol=xtol, ftol=ftol, niter=niter, with_line_search=with_line_search, verbose=verbose) # remove outliers if fit_workspace.data.dtype == object: # indices_no_nan = ~np.isnan(np.concatenate(fit_workspace.data).ravel()) data = np.concatenate(fit_workspace.data).ravel() # [indices_no_nan] model = np.concatenate(fit_workspace.model).ravel() # [indices_no_nan] err = np.concatenate(fit_workspace.err).ravel() # [indices_no_nan] model_err = np.concatenate(fit_workspace.model_err).ravel() # [indices_no_nan] else: # indices_no_nan = ~np.isnan(fit_workspace.data.flatten()) data = fit_workspace.data.flatten() # [indices_no_nan] model = fit_workspace.model.flatten() # [indices_no_nan] err = fit_workspace.err.flatten() # [indices_no_nan] model_err = fit_workspace.model_err.flatten() # [indices_no_nan] residuals = np.abs(data - model) / np.sqrt(err**2 + model_err**2) outliers = residuals > sigma_clip outliers = [i for i in range(data.size) if outliers[i]] outliers.sort() if len(outliers) > 0: my_logger.debug(f'\n\tOutliers flat index list: {outliers}') my_logger.info(f'\n\tOutliers: {len(outliers)} / {data.size - len(fit_workspace.mask)} data points ' f'({100 * len(outliers) / (data.size - len(fit_workspace.mask)):.2f}%) ' f'at more than {sigma_clip}-sigma from best-fit model.') if np.all(fit_workspace.outliers == outliers): my_logger.info(f'\n\tOutliers flat index list unchanged since last iteration: ' f'break the sigma clipping iterations.') break else: fit_workspace.outliers = outliers else: my_logger.info(f'\n\tNo outliers detected at first iteration: break the sigma clipping iterations.') break
[docs] class RegFitWorkspace(FitWorkspace): def __init__(self, w, opt_reg=parameters.PSF_FIT_REG_PARAM, verbose=False, live_fit=False): """ Parameters ---------- w: ChromaticPSFFitWorkspace, FullFowardModelFitWorkspace FitWorkspace instance where to apply regularisation. opt_reg: float Input value for optimal regularisation parameter (default: parameters.PSF_FIT_REG_PARAM). verbose: bool, optional Level of verbosity (default: False). live_fit: bool, optional If True, model, data and residuals plots are made along the fitting procedure (default: False). """ params = FitParameters(np.asarray([np.log10(opt_reg)]), labels=["log10_reg"], axis_names=[r"$\log_{10} r$"], fixed=None, bounds=[(-20, np.log10(w.amplitude_priors.size) + 2)]) FitWorkspace.__init__(self, params, data=np.array([0]), err=np.array([1]), epsilon=[1e-1], verbose=verbose, live_fit=live_fit) self.w = w self.opt_reg = opt_reg self.resolution = np.zeros_like((self.w.amplitude_params.size, self.w.amplitude_params.size)) self.G = 0 self.chisquare = -1
[docs] def print_regularisation_summary(self): self.my_logger.info(f"\n\tOptimal regularisation parameter: {self.opt_reg}" f"\n\tTr(R) = N_dof = {np.trace(self.resolution)}" f"\n\tN_params = {len(self.w.amplitude_params)}" f"\n\tN_data = {np.concatenate([self.w.data]).size - len(self.w.mask) - len(self.w.outliers)}" f" (excluding masked pixels and outliers)")
[docs] def simulate(self, log10_r): reg = 10 ** log10_r M_dot_W_dot_M_plus_Q = self.w.M_dot_W_dot_M + reg * self.w.Q try: L = np.linalg.inv(np.linalg.cholesky(M_dot_W_dot_M_plus_Q)) cov = L.T @ L except np.linalg.LinAlgError: cov = np.linalg.inv(M_dot_W_dot_M_plus_Q) if self.w.W.dtype == object: K = len(self.w.W) M_dot_W = [self.w.M[k].T @ self.w.W[k] for k in range(K)] M_dot_W_dot_D = np.sum([M_dot_W[k] @ self.w.data[k] for k in range(K)], axis=0) A = cov @ (M_dot_W_dot_D + reg * self.w.Q_dot_A0) diff = [self.w.data[k] - self.w.M[k] @ A for k in range(K)] self.chisquare = np.sum([diff[k] @ self.w.W[k] @ diff[k] for k in range(K)]) else: if self.w.W.ndim == 1: A = cov @ (self.w.M.T @ (self.w.W * self.w.data) + reg * self.w.Q_dot_A0) else: A = cov @ (self.w.M.T @ (self.w.W @ self.w.data) + reg * self.w.Q_dot_A0) if A.ndim == 2: # ndim == 2 when A comes from a sparse matrix computation A = np.asarray(A).reshape(-1) diff = self.w.data - self.w.M @ A if self.w.W.ndim == 1: self.chisquare = diff @ (self.w.W * diff) else: self.chisquare = diff @ self.w.W @ diff self.resolution = np.eye(A.size) - reg * cov @ self.w.Q self.w.amplitude_params = A self.w.amplitude_cov_matrix = cov self.w.amplitude_params_err = np.array([np.sqrt(np.abs(cov[x, x])) for x in range(cov.shape[0])]) self.G = self.chisquare / (np.concatenate([self.w.data]).size - len(self.w.mask) - len(self.w.outliers) - np.trace(self.resolution)) ** 2 return np.asarray([log10_r]), np.asarray([self.G]), np.zeros_like(self.data)
[docs] def plot_fit(self): log10_opt_reg = self.params.values[0] regs = 10 ** np.linspace(min(-7, 0.9 * log10_opt_reg), max(3, 1.2 * log10_opt_reg), 50) Gs = [] chisqs = [] resolutions = [] for r in regs: self.simulate(np.log10(r)) Gs.append(self.G) chisqs.append(self.chisquare) resolutions.append(np.trace(self.resolution)) opt_reg = 10 ** log10_opt_reg fig, ax = plt.subplots(3, 1, figsize=(7, 5), sharex="all") ax[0].plot(regs, Gs) ax[0].axvline(opt_reg, color="k") ax[1].axvline(opt_reg, color="k") ax[2].axvline(opt_reg, color="k") ax[0].set_ylabel(r"$G(r)$") ax[0].set_xlabel("Regularisation hyper-parameter $r$") ax[0].grid() ax[0].set_title(f"Optimal regularisation parameter: {opt_reg:.3g}") ax[1].plot(regs, chisqs) ax[1].set_ylabel(r"$\chi^2(\mathbf{A}(r) \vert \mathbf{\theta})$") ax[1].set_xlabel("Regularisation hyper-parameter $r$") ax[1].grid() ax[1].set_xscale("log") ax[2].set_xscale("log") ax[2].plot(regs, resolutions) ax[2].set_ylabel(r"$\mathrm{Tr}\,\mathbf{R}$") ax[2].set_xlabel("Regularisation hyper-parameter $r$") ax[2].grid() # fig.tight_layout() plt.subplots_adjust(hspace=0) if parameters.DISPLAY: plt.show() if parameters.LSST_SAVEFIGPATH: fig.savefig(os.path.join(parameters.LSST_SAVEFIGPATH, 'regularisation.pdf')) fig = plt.figure(figsize=(7, 5)) rho = compute_correlation_matrix(self.w.amplitude_cov_matrix) plot_correlation_matrix_simple(plt.gca(), rho, axis_names=[''] * len(self.w.amplitude_params)) # ipar=np.arange(10, 20)) plt.gca().set_title(r"Correlation matrix $\mathbf{\rho}$") fig.tight_layout() if parameters.LSST_SAVEFIGPATH: fig.savefig(os.path.join(parameters.LSST_SAVEFIGPATH, 'amplitude_correlation_matrix.pdf')) if parameters.DISPLAY: plt.show()
[docs] def run_regularisation(self, Ndof=None): run_minimisation(self, method="minimize", ftol=1e-4, xtol=1e-2, verbose=self.verbose, minimizer_method="Nelder-Mead") self.opt_reg = 10 ** self.params.values[0] self.simulate(np.log10(self.opt_reg)) self.print_regularisation_summary() if Ndof is not None: r_Ndof = self.set_regularisation_with_Ndof(Ndof) if self.opt_reg < 1e-3 * r_Ndof or self.opt_reg > 1e3 * r_Ndof: self.my_logger.warning(f"\n\tRegularisation parameter r minimizing G(r) is 3 orders of magnitude away " f"from optimal regularisation parameter {r_Ndof} using {Ndof=}. " f"Probably that the model does not fit well data at this stage. " f"Switch to optimal parameter.") self.opt_reg = r_Ndof self.simulate(np.log10(self.opt_reg)) self.print_regularisation_summary()
[docs] def set_regularisation_with_Ndof(self, Ndof): """Find regularisation parameter $r$ that checks $Tr(R) = Ndof$. Parameters ---------- Ndof: float Number of degrees of freedom, ie $Tr(R)$. Returns ------- r: float Regularisation parameter. """ log10_opt_reg = self.params.values[0] regs = 10 ** np.linspace(min(-7, 0.9 * log10_opt_reg), max(3, 1.2 * log10_opt_reg), 50) Gs = np.zeros_like(regs) chisqs = np.zeros_like(regs) resolutions = np.zeros_like(regs) for ir, r in enumerate(regs): self.simulate(np.log10(r)) Gs[ir] = self.G chisqs[ir] = self.chisquare resolutions[ir] = np.trace(self.resolution) Ndof_index = np.argmin(np.abs(resolutions - Ndof)) return regs[Ndof_index]
if __name__ == "__main__": import doctest doctest.testmod()