Source code for jdaviz.configs.default.plugins.model_fitting.fitting_backend

from asteval import Interpreter

import multiprocessing as mp
from multiprocessing import Pool

import numpy as np

import astropy.units as u

from specutils import Spectrum1D
from specutils.fitting import fit_lines

__all__ = ['fit_model_to_spectrum']


[docs] def fit_model_to_spectrum(spectrum, component_list, expression, run_fitter=False, window=None, n_cpu=None): """Fits a `~astropy.modeling.CompoundModel` to a `~specutils.Spectrum1D` instance. If the input spectrum represents a spectral cube, then fits the model to every spaxel in the cube, using a multiprocessor pool running in parallel (if ``n_cpu`` is larger than 1). Parameters ---------- spectrum : `~specutils.Spectrum1D` The spectrum to be fitted. component_list : list Spectral model subcomponents stored in a list. Their ``'name'`` attribute must be unique. Each subcomponent should be an initialized object from `~astropy.modeling.Model`. expression : str The arithmetic expression that combines together the model subcomponents. The subcomponents are referred via their ``'name'`` attribute. run_fitter : bool **This is currently being ignored for 3D fits.** When `False` (the default), the function composes the compound model and returns it without fitting. window : `None` or `~specutils.SpectralRegion` See :func:`specutils.fitting.fit_lines`. n_cpu : `None` or int **This is only used for spectral cube fitting.** Number of cores to use for multiprocessing. Using all the cores at once is not recommended. If `None`, it will use max cores minus one. Set this to 1 for debugging. Returns ------- output_model : `~astropy.modeling.CompoundModel` or list The model resulting from the fit. In the case of a 1D input spectrum, a single model instance is returned. In case of a 3D spectral cube input, instead o model instances for every spaxel, a list with 2D arrays, each one storing fitted parameter values for all spaxels, is returned. output_spectrum : `~specutils.Spectrum1D` The realization of the fitted model as a spectrum. The spectrum will be 1D or 3D depending on the shape of input spectrum. """ # Initial guess for the fit. initial_model = _build_model(component_list, expression) if len(spectrum.shape) > 1: return _fit_3D(initial_model, spectrum, window=window, n_cpu=n_cpu) else: return _fit_1D(initial_model, spectrum, run_fitter, window=window)
def _fit_1D(initial_model, spectrum, run_fitter, filter_non_finite=True, window=None): """ Fits an astropy CompoundModel to a Spectrum1D instance. Parameters ---------- initial_model : :class: `astropy.modeling.CompoundModel` Initial guess for the model to be fitted. spectrum : :class:`specutils.Spectrum1D` The spectrum to be fitted. run_fitter : bool When False (the default), the function composes the compound model and returns it without fitting. window : `None` or :class:`specutils.spectra.SpectralRegion` See :func:`specutils.fitting.fitmodels.fit_lines`. Returns ------- output_model : :class: `astropy.modeling.CompoundModel` The model resulting from the fit. output_spectrum : :class:`specutils.Spectrum1D` The realization of the fitted model as a spectrum. """ if run_fitter: if spectrum.uncertainty and not np.all(spectrum.uncertainty.array == 0): weights = 'unc' else: weights = None output_model = fit_lines(spectrum, initial_model, weights=weights, filter_non_finite=filter_non_finite, window=window) output_values = output_model(spectrum.spectral_axis) else: # Return without fitting. output_model = initial_model output_values = initial_model(spectrum.spectral_axis) # Build return spectrum output_spectrum = Spectrum1D(spectral_axis=spectrum.spectral_axis, flux=output_values, mask=spectrum.mask) return output_model, output_spectrum def _fit_3D(initial_model, spectrum, window=None, n_cpu=None): """ Fits an astropy CompoundModel to every spaxel in a cube using a multiprocessor pool running in parallel. Computes realizations of the models over each spaxel. Parameters ---------- initial_model : :class: `astropy.modeling.CompoundModel` Initial guess for the model to be fitted. spectrum : :class:`specutils.Spectrum1D` The spectrum that stores the cube in its 'flux' attribute. window : `None` or :class:`specutils.spectra.SpectralRegion` See :func:`specutils.fitting.fitmodels.fit_lines`. n_cpu : `None` or int Number of cores to use for multiprocessing. Using all the cores at once is not recommended. If `None`, it will use max cores minus one. Set this to 1 for debugging. Returns ------- output_model : :list: a list that stores 2D arrays. Each array contains one parameter from `astropy.modeling.CompoundModel` instances fitted to every spaxel in the input cube. output_spectrum : :class:`specutils.Spectrum1D` The spectrum that stores the fitted model values in its 'flux' attribute. """ if n_cpu is None: n_cpu = mp.cpu_count() - 1 # Generate list of all spaxels to be fitted spaxels = _generate_spaxel_list(spectrum) fitted_models = [] # Build cube with empty arrays, one per input spaxel. These # will store the flux values corresponding to the fitted # model realization over each spaxel. output_flux_cube = np.zeros(shape=spectrum.flux.shape) # Callback to collect results from workers into the cubes def collect_result(results): for i in range(len(results['x'])): x = results['x'][i] y = results['y'][i] model = results['fitted_model'][i] fitted_values = results['fitted_values'][i] # Store fitted model parameters fitted_models.append({"x": x, "y": y, "model": model}) # Store fitted values output_flux_cube[x, y, :] = fitted_values # Run multiprocessor pool to fit each spaxel and # compute model values on that same spaxel. if n_cpu > 1: results = [] pool = Pool(n_cpu) # The communication overhead of spawning a process for each *individual* # parameter set is prohibitively high (it's actually faster to run things # sequentially). Instead, chunk the spaxel list based on the number of # available processors, and have each processor do the model fitting # on the entire subset of spaxel tuples, then return the set of results. for spx in np.array_split(spaxels, n_cpu): # Worker for the multiprocess pool. worker = SpaxelWorker(spectrum.flux, spectrum.spectral_axis, initial_model, param_set=spx, window=window, mask=spectrum.mask) r = pool.apply_async(worker, callback=collect_result) results.append(r) for r in results: r.wait() pool.close() # This route is only for dev debugging because it is very slow # but exceptions will not get swallowed up by multiprocessing. else: # pragma: no cover worker = SpaxelWorker(spectrum.flux, spectrum.spectral_axis, initial_model, param_set=spaxels, window=window, mask=spectrum.mask) collect_result(worker()) # Build output 3D spectrum funit = spectrum.flux.unit output_spectrum = Spectrum1D(wcs=spectrum.wcs, flux=output_flux_cube * funit, mask=spectrum.mask) return fitted_models, output_spectrum class SpaxelWorker: """ A class with callable instances that perform fitting over a spaxel. It provides the callable for the `Pool.apply_async` function, and also holds everything necessary to perform the fit over one spaxel. Additionally, the callable computes the realization of the model just fitted, over that same spaxel. We cannot do these two steps (fit and compute) separately, since we cannot modify parameter values in an already built CompoundModel instance. We need to use the current model instance while it still exists. """ def __init__(self, flux_cube, wave_array, initial_model, param_set, window=None, mask=None): self.cube = flux_cube self.wave = wave_array self.model = initial_model self.param_set = param_set self.window = window self.mask = mask def __call__(self): results = {'x': [], 'y': [], 'fitted_model': [], 'fitted_values': []} for parameters in self.param_set: x = parameters[0] y = parameters[1] # Calling the Spectrum1D constructor for every spaxel # turned out to be less expensive than expected. Experiments # show that the cost amounts to a couple percent additional # running time in comparison with a version that uses a 3D # spectrum as input. Besides, letting an externally-created # spectrum reference into the callable somehow prevents it # to execute. This behavior was seen also with other functions # passed to the callable. flux = self.cube[x, y, :] if self.mask is not None: mask = self.mask[x, y, :] else: # If no mask is provided: mask = np.zeros_like(flux.value).astype(bool) sp = Spectrum1D(spectral_axis=self.wave, flux=flux, mask=mask) if sp.uncertainty and not np.all(sp.uncertainty.array == 0): weights = 'unc' else: weights = None fitted_model = fit_lines(sp, self.model, window=self.window, weights=weights) fitted_values = fitted_model(self.wave) results['x'].append(x) results['y'].append(y) results['fitted_model'].append(fitted_model) results['fitted_values'].append(fitted_values) return results def _build_model(component_list, expression): """ Builds an astropy CompoundModel from a list of components and an expression that links them to each other. Parameters ---------- component_list : list Spectral model subcomponents stored in a list. Their ``'name'`` attribute must be unique. Each subcomponent should be an initialized object from `astropy.modeling.models' expression : str The arithmetic expression that combines together the model subcomponents. The subcomponents are referred via their ``'name'`` attribute. Returns ------- model : :class:`astropy.modeling.CompoundModel` The model resulting from the fit. """ # The expression parser needs the subcomponents stored in a # dict, with keys taken from their names. This mechanism can # be augmented with status feedback to the UI. This requires # the code in here to be refactored to provide the necessary # hooks for the GUI (see # specviz/specviz/plugins/model_editor/models.py around lines # 200-230). model_dict = {} for component in component_list: model_dict[component.name] = component aeval = Interpreter(usersyms=model_dict) model = aeval(expression) return model def _handle_parameter_units(model, fitted_parameters_cube, param_units): """ Extracts parameter units from a CompoundModel and parameter values from a list of 2D numpy arrays, and returns a dict of 2D Quantity arrays. The dict values are keyed by the parameter names. Parameters ---------- model : :class:`astropy.modeling.CompoundModel` An instance of a model. fitted_parameters_cube : ndarray A 3D array in which the 1st dimension addresses model parameters, and the 2nd and 3rd dimensions address spaxels. param_units : list A list with each parameter's units (this is not available in the model instance itself). Returns ------- fitted_parameters_dict : dict 2D Quantity arrays keyed by parameter name """ fitted_parameters_dict = {} for index in range(len(model.parameters)): key = model.param_names[index] _ary = fitted_parameters_cube[index, :, :] fitted_parameters_dict[key] = u.Quantity(_ary, param_units[index]) return fitted_parameters_dict def _generate_spaxel_list(spectrum): """ Generates a list with tuples, each one addressing the (x,y) coordinates of a spaxel in a 3-D spectrum cube. If a mask is available, skip masked indices. Parameters ---------- spectrum : :class:`specutils.Spectrum1D` The spectrum that stores the cube in its ``'flux'`` attribute. Returns ------- spaxels : list List with spaxels """ n_x, n_y, _ = spectrum.flux.shape if spectrum.mask is None: spx = [[(x, y) for x in range(n_x)] for y in range(n_y)] else: # return only non-masked spaxels spx = [[(x, y) for x in range(n_x) if np.any(~spectrum.mask[x, y])] for y in range(n_y) if np.any(~spectrum.mask[:, y])] spaxels = [item for sublist in spx for item in sublist] return spaxels