Source code for Starfish.models.utils

import logging

import numpy as np
from nptyping import Array
from scipy.optimize import minimize
import scipy.stats as st

import Starfish.constants as C
from .kernels import global_covariance_matrix


[docs]def find_residual_peaks( model, num_residuals=100, threshold=4.0, buffer=2, wl_range=(0, np.inf) ): """ Find the peaks of the most recent residual and return their properties to aid in setting up local kernels Parameters ---------- model : Model The model to determine peaks from. Need only have a residuals array. num_residuals : int, optional The number of residuals to average together for determining peaks. By default 100. threshold : float, optional The sigma clipping threshold, by default 4.0 buffer : float, optional The minimum distance between peaks, in Angstrom, by default 2.0 wl_range : 2-tuple The (min, max) wavelengths to consider. Default is (0, np.inf) Returns ------- means : list The means of the found peaks, with the same units as model.data.wave """ residual = np.mean(list(model.residuals)[-num_residuals:], axis=0) sigma = residual.std() if "global_cov" in model.params: ag = np.exp(model.params["global_cov:log_amp"]) lg = np.exp(model.params["global_cov:log_ls"]) sigma += global_covariance_matrix( model.data.wave, model["T"], ag, lg ).diagonal() mask = np.abs(residual - residual.mean()) > threshold * sigma mask &= (model.data.wave > wl_range[0]) & (model.data.wave < wl_range[1]) peak_waves = model.data.wave[1:-1][mask[1:-1]] mus = [] covered = np.zeros_like(peak_waves, dtype=bool) # Sort from largest residual to smallest abs_resid = np.abs(residual[1:-1][mask[1:-1]]) for wl, resid in sorted( zip(peak_waves, abs_resid), key=lambda t: t[1], reverse=True ): if wl in peak_waves[covered]: continue mus.append(wl) ind = (peak_waves >= (wl - buffer)) & (peak_waves <= (wl + buffer)) covered |= ind return mus
[docs]def optimize_residual_peaks(model, mus, threshold=0.1, sigma0=50, num_residuals=100): """ Optimize the local covariance parameters based on fitting the residual input means as Gaussians around the residuals Parameters ---------- model : Model The model to determine peaks from. Need only have a residuals array. mus : array-like The means to instantiate Gaussians at and optimize. threshold : float, optional This is the threshold for restricting kernels; i.e. if a fit amplitude is less than threshold standard deviations then it will be thrown away. Default is 0.1 sigma0 : float, optional The initial standard deviation (in Angstrom) of each Gaussian. Default is 50 Angstrom. num_residuals : int, optional The number of residuals to average together for determining peaks. By default 100. Returns ------- dict A dictionary of optimized parameters ready to be plugged into model["local_cov"] Warning ------- I have had inconsistent results with this optimization, be mindful of your outputs and consider hand-tuning after optimizing. """ residual = np.mean(list(model.residuals)[-num_residuals:], axis=0) amp_cutoff = threshold * residual.std() if "global_cov" in model.params: ag = np.exp(model.params["global_cov:log_amp"]) lg = np.exp(model.params["global_cov:log_ls"]) global_cov = global_covariance_matrix(model.data.wave, model["T"], ag, lg) else: global_cov = None def chi2(P, wave, resid, sigma): log_amp, mu, log_sigma = P _amp = np.exp(log_amp) _sigma = np.exp(log_sigma) if _amp == 0 or _sigma == 0: return np.Inf # Logistic prior for the widths out to 2sigma prior = st.uniform.logpdf(_sigma, 0, 2 * sigma0) # Put prior on widths and heights such that the integrated area should be less # than the trapezoidal area of the residual area_max = 2 * _sigma * np.abs(resid).max() # Area under a gaussian # https://en.wikipedia.org/wiki/Gaussian_function#Integral_of_a_Gaussian_function area = np.sqrt(2 * np.pi * _sigma) * _amp prior += st.uniform.logpdf(area, 0, area_max) prior += st.uniform.logpdf(_amp, 0, np.abs(resid).max()) rr = C.c_kms / mu * np.abs(wave - mu) gauss = _amp * np.exp(-0.5 * (rr / _sigma) ** 2) R = gauss - resid return np.sum((R / sigma) ** 2) - prior params = [] for mu in mus: mask = (model.data.wave > mu - sigma0) & (model.data.wave < mu + sigma0) wave = model.data.wave[mask] resid = residual[mask] sigma = model.data.sigma[mask] if global_cov is not None: sigma += global_cov.diagonal()[mask] P0 = np.array([np.log(np.abs(resid).max()), mu, np.log(sigma0)]) soln = minimize( chi2, P0, args=(wave, resid, sigma), method="Nelder-Mead", options=dict(maxiter=1000), ) if soln.x[0] > np.log(amp_cutoff): params.append( {"log_amp": soln.x[0], "mu": soln.x[1], "log_sigma": soln.x[2]} ) return sorted(params, key=lambda s: s["mu"])
log = logging.getLogger(__name__)
[docs]def covariance_debugger(cov: Array[float]): """ Special debugging information for the covariance matrix decomposition. """ log.info(f"{'Covariance Debugger':-^60}".format()) log.info("See https://github.com/iancze/Starfish/issues/26") log.info("Covariance matrix at a glance:") if cov.diagonal().min() < 0.0: log.warning("- Negative entries on the diagonal:") log.info("\t- Check uncertainty estimates: should all be positive") elif np.any(np.isnan(cov.diagonal())): log.warning("- Covariance matrix has a NaN value on the diagonal") else: if not np.allclose(cov, cov.T): log.warning("- The covariance matrix is highly asymmetric") # Still might have an asymmetric matrix below `allclose` threshold eigenvalues, eigenvectors = np.linalg.eigh(cov) n_neg = (eigenvalues < 0).sum() n_tot = len(eigenvalues) log.info(f"- There are {n_neg} negative eigenvalues out of {n_tot}.") def mark(val): return ">" if val < 0 else "." log.info("Covariance matrix eigenvalues:") for i in range(10): log.info( "{: >6} {:{fill}>20.3e}".format( i, eigenvalues[i], fill=mark(eigenvalues[i]) ) ) log.info("{: >15}".format("...")) for i in range(10): log.info( "{: >6} {:{fill}>20.3e}".format( n_tot - 10 + i, eigenvalues[-10 + i], fill=mark(eigenvalues[-10 + i]), ) ) log.info(f"{'-':-^60}")