Source code for jdaviz.configs.specviz2d.plugins.spectral_extraction.spectral_extraction
import numpy as np
from traitlets import Bool, List, Unicode, observe
from jdaviz.core.events import SnackbarMessage
from jdaviz.core.registries import tray_registry
from jdaviz.core.template_mixin import (PluginTemplateMixin,
SelectPluginComponent,
DatasetSelect,
AddResults,
skip_if_no_updates_since_last_active,
with_spinner)
from jdaviz.core.user_api import PluginUserApi
from jdaviz.core.custom_traitlets import IntHandleEmpty, FloatHandleEmpty
from jdaviz.core.marks import PluginLine
from astropy.modeling import models
from astropy.nddata import StdDevUncertainty, VarianceUncertainty, UnknownUncertainty
from specreduce import tracing
from specreduce import background
from specreduce import extract
__all__ = ['SpectralExtraction']
_model_cls = {'Spline': models.Spline1D,
'Polynomial': models.Polynomial1D,
'Legendre': models.Legendre1D,
'Chebyshev': models.Chebyshev1D}
[docs]
@tray_registry('spectral-extraction', label="Spectral Extraction",
viewer_requirements=['spectrum', 'spectrum-2d'])
class SpectralExtraction(PluginTemplateMixin):
"""
The Spectral Extraction plugin exposes specreduce methods for tracing, background subtraction,
and spectral extraction from 2D spectra.
See the :ref:`Spectral Extraction Plugin Documentation <specviz2d-spectral-extraction>`
for more details.
Only the following attributes and methods are available through the
:ref:`public plugin API <plugin-apis>`:
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.show`
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.open_in_tray`
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.close_in_tray`
* :attr:`interactive_extract`
* ``trace_dataset`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`):
controls the input dataset for generating the trace.
* ``trace_type`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`):
controls the type of trace to be generated.
* ``trace_peak_method`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`):
only applicable if ``trace_type`` is not ``Flat``.
* :attr:`trace_pixel` :
pixel of the trace. If ``trace_type`` is not ``Flat``, then this
is the "guess" for the automated trace.
* :attr:`trace_do_binning` :
only applicable if ``trace_type`` is not ``Flat``. Bin the input data when fitting the
trace.
* :attr:`trace_bins` :
only applicable if ``trace_type`` is not ``Flat`` and ``trace_do_binning``.
* :attr:`trace_window` :
full width of the trace.
* :meth:`import_trace`
* :meth:`export_trace`
* ``bg_dataset`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`):
controls the input dataset for generating the background.
* ``bg_type`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`):
controls the type of background to be generated.
* :attr:`bg_trace_pixel` :
only applicable if ``bg_type`` is set to ``Manual``
* :attr:`bg_separation` :
only applicable if ``bg_type`` set set to ``OneSided`` or ``TwoSided``.
Separation from the referenced trace for the center of each of the background window(s).
* :attr:`bg_width` :
full width of each background window(s).
* ``bg_statistic`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`):
statistic to use when computing the background. 'Average' will account for partial pixel
weights, 'Median' will include all partial pixels.
* ``bg_add_results`` (:class:`~jdaviz.core.template_mixin.AddResults`)
* ``bg_sub_add_results``
* :meth:`import_bg`
* :meth:`export_bg`
* :meth:`export_bg_img`
* :meth:`export_bg_sub`
* ``ext_dataset`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`):
controls the input dataset for generating the extracted spectrum. "From Plugin" will
use the background-subtracted image as defined by the background options above. To skip
background extraction, select the original input 2D spectrum.
* ``ext_trace`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`)
* ``ext_type`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
* :attr:`ext_width` :
full width of the extraction window.
* ``ext_add_results`` (:class:`~jdaviz.core.template_mixin.AddResults`)
* :meth:`import_extract`
* :meth:`export_extract`
* :meth:`export_extract_spectrum`
"""
dialog = Bool(False).tag(sync=True)
template_file = __file__, "spectral_extraction.vue"
uses_active_status = Bool(True).tag(sync=True)
active_step = Unicode().tag(sync=True)
interactive_extract = Bool(True).tag(sync=True)
# TRACE
trace_trace_items = List().tag(sync=True)
trace_trace_selected = Unicode().tag(sync=True)
trace_offset = IntHandleEmpty(0).tag(sync=True)
trace_dataset_items = List().tag(sync=True)
trace_dataset_selected = Unicode().tag(sync=True)
trace_type_items = List().tag(sync=True)
trace_type_selected = Unicode().tag(sync=True)
trace_pixel = FloatHandleEmpty(0).tag(sync=True)
trace_order = IntHandleEmpty(3).tag(sync=True)
trace_peak_method_items = List().tag(sync=True)
trace_peak_method_selected = Unicode().tag(sync=True)
trace_do_binning = Bool(True).tag(sync=True)
trace_bins = IntHandleEmpty(20).tag(sync=True)
trace_window = IntHandleEmpty(0).tag(sync=True)
trace_results_label = Unicode().tag(sync=True)
trace_results_label_default = Unicode().tag(sync=True)
trace_results_label_auto = Bool(True).tag(sync=True)
trace_results_label_invalid_msg = Unicode('').tag(sync=True)
trace_results_label_overwrite = Bool().tag(sync=True)
trace_add_to_viewer_items = List().tag(sync=True)
trace_add_to_viewer_selected = Unicode().tag(sync=True)
trace_spinner = Bool(False).tag(sync=True)
# BACKGROUND
bg_dataset_items = List().tag(sync=True)
bg_dataset_selected = Unicode().tag(sync=True)
bg_type_items = List().tag(sync=True)
bg_type_selected = Unicode().tag(sync=True)
bg_trace_items = List().tag(sync=True)
bg_trace_selected = Unicode().tag(sync=True)
bg_trace_pixel = FloatHandleEmpty(0).tag(sync=True)
bg_statistic_items = List().tag(sync=True)
bg_statistic_selected = Unicode().tag(sync=True)
bg_separation = FloatHandleEmpty(0).tag(sync=True)
bg_width = FloatHandleEmpty(0).tag(sync=True)
bg_results_label = Unicode().tag(sync=True)
bg_results_label_default = Unicode().tag(sync=True)
bg_results_label_auto = Bool(True).tag(sync=True)
bg_results_label_invalid_msg = Unicode('').tag(sync=True)
bg_results_label_overwrite = Bool().tag(sync=True)
bg_add_to_viewer_items = List().tag(sync=True)
bg_add_to_viewer_selected = Unicode().tag(sync=True)
bg_img_spinner = Bool(False).tag(sync=True)
bg_spec_results_label = Unicode().tag(sync=True)
bg_spec_results_label_default = Unicode().tag(sync=True)
bg_spec_results_label_auto = Bool(True).tag(sync=True)
bg_spec_results_label_invalid_msg = Unicode('').tag(sync=True)
bg_spec_results_label_overwrite = Bool().tag(sync=True)
bg_spec_add_to_viewer_items = List().tag(sync=True)
bg_spec_add_to_viewer_selected = Unicode().tag(sync=True)
bg_spec_spinner = Bool(False).tag(sync=True)
bg_sub_results_label = Unicode().tag(sync=True)
bg_sub_results_label_default = Unicode().tag(sync=True)
bg_sub_results_label_auto = Bool(True).tag(sync=True)
bg_sub_results_label_invalid_msg = Unicode('').tag(sync=True)
bg_sub_results_label_overwrite = Bool().tag(sync=True)
bg_sub_add_to_viewer_items = List().tag(sync=True)
bg_sub_add_to_viewer_selected = Unicode().tag(sync=True)
bg_sub_spinner = Bool(False).tag(sync=True)
# EXTRACT
ext_dataset_items = List().tag(sync=True)
ext_dataset_selected = Unicode().tag(sync=True)
ext_trace_items = List().tag(sync=True)
ext_trace_selected = Unicode().tag(sync=True)
ext_type_items = List().tag(sync=True)
ext_type_selected = Unicode().tag(sync=True)
ext_width = FloatHandleEmpty(0).tag(sync=True)
ext_uncert_warn = Bool(False).tag(sync=True)
ext_specreduce_err = Unicode().tag(sync=True)
ext_results_label = Unicode().tag(sync=True)
ext_results_label_default = Unicode().tag(sync=True)
ext_results_label_auto = Bool(True).tag(sync=True)
ext_results_label_invalid_msg = Unicode('').tag(sync=True)
ext_results_label_overwrite = Bool().tag(sync=True)
ext_add_to_viewer_items = List().tag(sync=True)
ext_add_to_viewer_selected = Unicode().tag(sync=True)
# uses default "spinner"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._marks = {}
self._do_marks = False
# TRACE
self.trace_trace = DatasetSelect(self,
'trace_trace_items',
'trace_trace_selected',
default_text='New Trace',
filters=['is_trace'])
self.trace_dataset = DatasetSelect(self,
'trace_dataset_items',
'trace_dataset_selected',
filters=['layer_in_spectrum_2d_viewer', 'not_trace'])
self.trace_type = SelectPluginComponent(self,
items='trace_type_items',
selected='trace_type_selected',
manual_options=['Flat', 'Polynomial',
'Legendre', 'Chebyshev',
'Spline'])
self.trace_peak_method = SelectPluginComponent(self,
items='trace_peak_method_items',
selected='trace_peak_method_selected',
manual_options=['Gaussian', 'Centroid', 'Max']) # noqa
self.trace_add_results = AddResults(self, 'trace_results_label',
'trace_results_label_default',
'trace_results_label_auto',
'trace_results_label_invalid_msg',
'trace_results_label_overwrite',
'trace_add_to_viewer_items',
'trace_add_to_viewer_selected')
self.trace_add_results.viewer.filters = ['is_spectrum_2d_viewer']
self.trace_results_label_default = 'trace'
# BACKGROUND
self.bg_dataset = DatasetSelect(self,
'bg_dataset_items',
'bg_dataset_selected',
filters=['layer_in_spectrum_2d_viewer', 'not_trace'])
self.bg_type = SelectPluginComponent(self,
items='bg_type_items',
selected='bg_type_selected',
manual_options=['TwoSided', 'OneSided', 'Manual'])
self.bg_trace = DatasetSelect(self,
items='bg_trace_items',
selected='bg_trace_selected',
default_text='From Plugin',
filters=['is_trace'])
self.bg_statistic = SelectPluginComponent(self,
items='bg_statistic_items',
selected='bg_statistic_selected',
manual_options=['Average', 'Median'])
self.bg_add_results = AddResults(self, 'bg_results_label',
'bg_results_label_default',
'bg_results_label_auto',
'bg_results_label_invalid_msg',
'bg_results_label_overwrite',
'bg_add_to_viewer_items',
'bg_add_to_viewer_selected')
self.bg_add_results.viewer.filters = ['is_spectrum_2d_viewer']
self.bg_results_label_default = 'background'
self.bg_spec_add_results = AddResults(self, 'bg_spec_results_label',
'bg_spec_results_label_default',
'bg_spec_results_label_auto',
'bg_spec_results_label_invalid_msg',
'bg_spec_results_label_overwrite',
'bg_spec_add_to_viewer_items',
'bg_spec_add_to_viewer_selected')
self.bg_spec_add_results.viewer.filters = ['is_spectrum_viewer']
self.bg_spec_results_label_default = 'background-spectrum'
self.bg_sub_add_results = AddResults(self, 'bg_sub_results_label',
'bg_sub_results_label_default',
'bg_sub_results_label_auto',
'bg_sub_results_label_invalid_msg',
'bg_sub_results_label_overwrite',
'bg_sub_add_to_viewer_items',
'bg_sub_add_to_viewer_selected')
self.bg_sub_add_results.viewer.filters = ['is_spectrum_2d_viewer']
self.bg_sub_results_label_default = 'background-subtracted'
# EXTRACT
self.ext_dataset = DatasetSelect(self,
'ext_dataset_items',
'ext_dataset_selected',
default_text='From Plugin',
filters=['layer_in_spectrum_2d_viewer', 'not_trace'])
self.ext_trace = DatasetSelect(self,
items='ext_trace_items',
selected='ext_trace_selected',
default_text='From Plugin',
filters=['is_trace'])
self.ext_type = SelectPluginComponent(self,
items='ext_type_items',
selected='ext_type_selected',
manual_options=['Boxcar', 'Horne'])
self.ext_add_results = AddResults(self, 'ext_results_label',
'ext_results_label_default',
'ext_results_label_auto',
'ext_results_label_invalid_msg',
'ext_results_label_overwrite',
'ext_add_to_viewer_items',
'ext_add_to_viewer_selected')
self.ext_add_results.viewer.filters = ['is_spectrum_viewer']
# NOTE: defaults to overwriting original spectrum
self.ext_add_results.label_whitelist_overwrite = ['Spectrum 1D']
self.ext_results_label_default = 'Spectrum 1D'
# continue to not use live-preview marks for the instance of the plugin used for the
# initial spectral extraction during load_data
self._do_marks = kwargs.get('interactive', True)
@property
def _default_spectrum_viewer_reference_name(self):
return self.app._jdaviz_helper._default_spectrum_viewer_reference_name
@property
def _default_spectrum_2d_viewer_reference_name(self):
return self.app._jdaviz_helper._default_spectrum_2d_viewer_reference_name
@property
def user_api(self):
return PluginUserApi(self, expose=('interactive_extract',
'trace_dataset', 'trace_type',
'trace_order', 'trace_peak_method',
'trace_pixel',
'trace_do_binning', 'trace_bins', 'trace_window',
'import_trace',
'export_trace',
'bg_dataset', 'bg_type',
'bg_trace_pixel', 'bg_separation', 'bg_width',
'bg_statistic',
'bg_add_results', 'bg_sub_add_results',
'import_bg',
'export_bg', 'export_bg_img', 'export_bg_sub',
'ext_dataset', 'ext_trace', 'ext_type',
'ext_width', 'ext_add_results',
'import_extract',
'export_extract', 'export_extract_spectrum'))
@observe('trace_dataset_selected')
def _trace_dataset_selected(self, msg=None):
if not hasattr(self, 'trace_dataset'):
# happens when first initializing plugin outside of tray
return
width = self.trace_dataset.selected_obj.shape[0]
# estimate the pixel number by taking the median of the brightest pixel index in each column
brightest_pixel = int(np.median(np.argmax(self.trace_dataset.selected_obj.flux, axis=0)))
# do not allow to be an edge pixel
if brightest_pixel < 1:
brightest_pixel = 1
if brightest_pixel > width - 1:
brightest_pixel = width - 1
distance_from_edge = min(brightest_pixel, width-brightest_pixel)
# default width will be 10% of cross-dispersion "height",
# but no larger than distance from the edge
default_bg_width = int(np.ceil(width / 10))
default_width = min(default_bg_width, distance_from_edge * 2)
# sign for one-sided and single trace-pixel depending on whether the brightest pixel is
# above or below the middle of the image
if default_bg_width * 2 >= distance_from_edge:
sign = 1 if (brightest_pixel < width / 2) else -1
default_bg_separation = sign * default_bg_width * 2
else:
default_bg_separation = default_bg_width * 2
if self.trace_pixel == 0:
self.trace_pixel = brightest_pixel
if self.trace_window == 0:
self.trace_window = default_width
if self.bg_trace_pixel == 0:
self.bg_trace_pixel = brightest_pixel + default_bg_separation
if self.bg_separation == 0:
if default_bg_width * 2 >= distance_from_edge:
self.bg_type_selected = 'OneSided'
self.bg_separation = default_bg_separation
if self.bg_width == 0:
self.bg_width = default_bg_width
if self.ext_width == 0:
self.ext_width = default_width
[docs]
def update_marks(self, step=None):
"""
Manually update the live-preview marks for a given step in spectral extraction. This API
mimics opening the plugin and interacting with one of the steps.
Parameters
----------
step : str
Step in the extraction process to visualize. Must be one of: 'trace', 'bg', 'ext'.
"""
if step is not None:
if step == 'trace':
self._interaction_in_trace_step()
elif step == 'bg':
self._interaction_in_bg_step()
elif step == 'ext':
self._interaction_in_ext_step()
else:
raise ValueError("step must be one of: trace, bg, ext")
# also listens to is_active from any _interaction_in_*_step methods
def _update_plugin_marks(self, msg={}):
if not self._do_marks:
return
if self.app._jdaviz_helper is None:
return
if not len(self._marks):
# plugin has never been opened, no need to create marks just to hide them,
# we'll create marks when the plugin is first opened
return
if not (self.is_active):
for step, mark in self.marks.items():
mark.visible = False
return
if self.active_step == '':
# on load, default to 'extract' (this will then trigger the observe to update the marks)
self.active_step = 'ext'
return
display_marks = {'trace': ['trace', 'extract'],
'bg': ['trace',
'bg1_center', 'bg1_lower', 'bg1_upper',
'bg2_center', 'bg2_lower', 'bg2_upper',
'bg_spec', 'extract'],
'ext': ['trace',
'ext_upper', 'ext_lower',
'extract']}
for step, mark in self.marks.items():
mark.visible = step in display_marks.get(self.active_step, [])
@property
def marks(self):
"""
Access the marks created by this plugin in both the spectrum-viewer and spectrum-2d-viewer.
"""
if self._marks:
# TODO: replace with cache property?
return self._marks
if not self._do_marks:
return {}
viewer2d = self.app.get_viewer(self._default_spectrum_2d_viewer_reference_name)
viewer1d = self.app.get_viewer(self._default_spectrum_viewer_reference_name)
if not viewer2d.state.reference_data:
# we don't have data yet for scales, defer initializing
return {}
# the marks haven't been initialized yet, so initialize with empty
# marks that will be populated once the first analysis is done.
self._marks = {'trace': PluginLine(viewer2d, visible=self.is_active),
'ext_lower': PluginLine(viewer2d, visible=self.is_active),
'ext_upper': PluginLine(viewer2d, visible=self.is_active),
'bg1_center': PluginLine(viewer2d, visible=self.is_active,
line_style='dotted'),
'bg1_lower': PluginLine(viewer2d, visible=self.is_active),
'bg1_upper': PluginLine(viewer2d, visible=self.is_active),
'bg2_center': PluginLine(viewer2d, visible=self.is_active,
line_style='dotted'),
'bg2_lower': PluginLine(viewer2d, visible=self.is_active),
'bg2_upper': PluginLine(viewer2d, visible=self.is_active)}
# NOTE: += won't trigger the figure to notice new marks
viewer2d.figure.marks = viewer2d.figure.marks + list(self._marks.values())
self._marks['extract'] = PluginLine(viewer1d, visible=self.is_active)
self._marks['bg_spec'] = PluginLine(viewer1d, visible=self.is_active, stroke_width=1) # noqa
# NOTE: += won't trigger the figure to notice new marks
viewer1d.figure.marks = viewer1d.figure.marks + [self._marks['extract'],
self._marks['bg_spec']]
return self._marks
@observe('interactive_extract')
@skip_if_no_updates_since_last_active()
def _update_interactive_extract(self, event={}):
# also called by any of the _interaction_in_*_step
if not self._do_marks:
return False
if self.interactive_extract:
try:
sp1d = self.export_extract_spectrum(add_data=False)
except Exception as e:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
# NOTE: FitTrace or manual background are often giving a
# "background regions overlapped" error from specreduce
self.ext_specreduce_err = repr(e)
self.marks['extract'].clear()
else:
self.ext_specreduce_err = ''
self.marks['extract'].update_xy(sp1d.spectral_axis.value,
sp1d.flux.value)
else:
self.marks['extract'].clear()
if self.interactive_extract and self.active_step == 'bg':
try:
spec = self.export_bg_spectrum()
except Exception:
self.marks['bg_spec'].clear()
else:
self.marks['bg_spec'].update_xy(spec.spectral_axis, spec.flux)
else:
self.marks['bg_spec'].clear()
@observe('is_active', 'trace_dataset_selected', 'trace_type_selected',
'trace_trace_selected', 'trace_offset', 'trace_order',
'trace_pixel', 'trace_peak_method_selected',
'trace_do_binning', 'trace_bins', 'trace_window', 'active_step')
def _interaction_in_trace_step(self, event={}):
if not self._do_marks:
return
if ((event.get('name', '') in ('active_step', 'is_active') and self.active_step != 'trace')
or not self.is_active):
self._update_plugin_marks(event)
return
try:
trace = self.export_trace(add_data=False)
except Exception:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
self.marks['trace'].clear()
else:
self.marks['trace'].update_xy(range(len(trace.trace)),
trace.trace)
self.marks['trace'].line_style = 'solid'
self._update_interactive_extract(event)
self.active_step = 'trace'
self._update_plugin_marks(event)
@observe('is_active', 'bg_dataset_selected', 'bg_type_selected',
'bg_trace_selected', 'bg_trace_pixel',
'bg_separation', 'bg_width', 'bg_statistic_selected', 'active_step')
def _interaction_in_bg_step(self, event={}):
if not self._do_marks:
return
if ((event.get('name', '') in ('active_step', 'is_active') and self.active_step != 'bg')
or not self.is_active):
self._update_plugin_marks(event)
return
try:
trace = self._get_bg_trace()
except Exception:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
for mark in ['trace', 'bg1_center', 'bg1_lower', 'bg1_upper',
'bg2_center', 'bg2_lower', 'bg2_upper', 'bg_spec']:
self.marks[mark].clear()
else:
xs = range(len(trace.trace))
self.marks['trace'].update_xy(xs,
trace.trace)
self.marks['trace'].line_style = 'dashed'
if self.bg_type_selected in ['OneSided', 'TwoSided']:
self.marks['bg1_center'].update_xy(xs,
trace.trace+self.bg_separation)
self.marks['bg1_lower'].update_xy(xs,
trace.trace+self.bg_separation-self.bg_width/2)
self.marks['bg1_upper'].update_xy(xs,
trace.trace+self.bg_separation+self.bg_width/2)
else:
self.marks['bg1_center'].clear()
self.marks['bg1_lower'].update_xy(xs,
trace.trace-self.bg_width/2)
self.marks['bg1_upper'].update_xy(xs,
trace.trace+self.bg_width/2)
if self.bg_type_selected == 'TwoSided':
self.marks['bg2_center'].update_xy(xs,
trace.trace-self.bg_separation)
self.marks['bg2_lower'].update_xy(xs,
trace.trace-self.bg_separation-self.bg_width/2)
self.marks['bg2_upper'].update_xy(xs,
trace.trace-self.bg_separation+self.bg_width/2)
else:
for mark in ['bg2_center', 'bg2_lower', 'bg2_upper']:
self.marks[mark].clear()
self._update_interactive_extract(event)
self.active_step = 'bg'
self._update_plugin_marks(event)
@observe('is_active', 'ext_dataset_selected', 'ext_trace_selected',
'ext_type_selected', 'ext_width', 'active_step')
def _interaction_in_ext_step(self, event={}):
if not self._do_marks:
return
if ((event.get('name', '') in ('active_step', 'is_active') and self.active_step not in ('ext', '')) # noqa
or not self.is_active):
self._update_plugin_marks(event)
return
try:
trace = self._get_ext_trace()
except Exception:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
for mark in ['trace', 'ext_lower', 'ext_upper']:
self.marks[mark].clear()
else:
xs = range(len(trace.trace))
self.marks['trace'].update_xy(xs,
trace.trace)
self.marks['trace'].line_style = 'dashed'
if self.ext_type_selected == 'Boxcar':
self.marks['ext_lower'].update_xy(xs,
trace.trace-self.ext_width/2)
self.marks['ext_upper'].update_xy(xs,
trace.trace+self.ext_width/2)
else:
for mark in ['ext_lower', 'ext_upper']:
self.marks[mark].clear()
self._update_interactive_extract(event)
self.active_step = 'ext'
self._update_plugin_marks(event)
# TODO: remove this, the traitlet, and the row in spectral_extraction.vue
# when specutils handles the warning/exception
if self.ext_type_selected == 'Horne':
inp_sp2d = self._get_ext_input_spectrum()
self.ext_uncert_warn = isinstance(inp_sp2d.uncertainty, UnknownUncertainty)
else:
self.ext_uncert_warn = False
def _set_create_kwargs(self, **kwargs):
invalid_kwargs = [k for k in kwargs.keys() if not hasattr(self, k)]
if len(invalid_kwargs):
raise ValueError(f"{invalid_kwargs} are not valid attributes to pass as kwargs")
for k, v in kwargs.items():
setattr(self, k, v)
[docs]
def import_trace(self, trace):
"""
Import the input parameters from an existing specreduce Trace object into the plugin.
Parameters
----------
trace : specreduce.tracing.Trace
Trace object to import
"""
if not isinstance(trace, tracing.Trace): # pragma: no cover
raise TypeError("trace must be a specreduce.tracing.Trace object")
if isinstance(trace, tracing.FlatTrace):
self.trace_type_selected = 'Flat'
self.trace_pixel = trace.trace_pos
elif isinstance(trace, tracing.FitTrace):
self.trace_type_selected = trace.trace_model.__class__.__name__.strip('1D')
self.trace_pixel = trace.guess
self.trace_window = trace.window
self.trace_bins = trace.bins
self.trace_do_binning = True
if hasattr(trace.trace_model, 'degree'):
self.trace_order = trace.trace_model.degree
elif isinstance(trace, tracing.ArrayTrace): # pragma: no cover
raise NotImplementedError(f"cannot import ArrayTrace into plugin. Use viz.load_trace instead") # noqa
else: # pragma: no cover
raise NotImplementedError(f"trace of type {trace.__class__.__name__} not supported")
@with_spinner('trace_spinner')
def export_trace(self, add_data=False, **kwargs):
"""
Create a specreduce Trace object from the input parameters
defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting trace to the application, according to the options
defined in the plugin.
"""
self._set_create_kwargs(**kwargs)
if len(kwargs) and self.active_step != 'trace':
self.update_marks(step='trace')
if self.trace_trace_selected != 'New Trace':
# then we're offsetting an existing trace
# for FlatTrace, we can keep and expose a new FlatTrace (which has the advantage of
# being able to load back into the plugin)
orig_trace = self.trace_trace.selected_obj
if isinstance(orig_trace, tracing.FlatTrace):
trace = tracing.FlatTrace(self.trace_dataset.selected_obj,
orig_trace.trace_pos+self.trace_offset)
else:
trace = tracing.ArrayTrace(self.trace_dataset.selected_obj,
self.trace_trace.selected_obj.trace+self.trace_offset)
elif self.trace_type_selected == 'Flat':
trace = tracing.FlatTrace(self.trace_dataset.selected_obj,
self.trace_pixel)
elif self.trace_type_selected in _model_cls:
trace_model = _model_cls[self.trace_type_selected](degree=self.trace_order)
trace = tracing.FitTrace(self.trace_dataset.selected_obj,
guess=self.trace_pixel,
bins=int(self.trace_bins) if self.trace_do_binning else None,
window=self.trace_window,
peak_method=self.trace_peak_method_selected.lower(),
trace_model=trace_model)
else:
raise NotImplementedError(f"trace_type={self.trace_type_selected} not implemented")
if add_data:
self.trace_add_results.add_results_from_plugin(trace, replace=False)
return trace
def _get_bg_trace(self):
if self.bg_type_selected == 'Manual':
trace = tracing.FlatTrace(self.trace_dataset.selected_obj,
self.bg_trace_pixel)
elif self.bg_trace_selected == 'From Plugin':
trace = self.export_trace(add_data=False)
else:
trace = self.bg_trace.selected_obj
return trace
[docs]
def import_bg(self, bg):
"""
Import the input parameters from an existing specreduce Background object into the plugin.
Parameters
----------
bg : specreduce.background.Background
Background object to import
"""
if not isinstance(bg, background.Background): # pragma: no cover
raise TypeError("bg must be a specreduce.background.Background object")
# TODO: should we detect/set the referenced dataset?
trace = self._get_bg_trace()
if len(bg.traces) == 2:
# try to detect constant separation
seps1 = bg.traces[0].trace - trace.trace
seps2 = trace.trace - bg.traces[1].trace
if np.all(seps1 == seps1[0]) and np.all(seps2 == seps1[0]):
self.bg_type_selected = 'TwoSided'
self.bg_separation = abs(int(seps1[0]))
else: # pragma: no cover
raise NotImplementedError("backgrounds with custom traces not supported (could not detect common separation)") # noqa
elif len(bg.traces) == 1:
# either one_sided or trace, let's see if its constant offset from the trace
seps = bg.traces[0].trace - trace.trace
if np.all(seps == seps[0]):
self.bg_type_selected = 'OneSided'
self.bg_separation = int(seps[0])
else: # pragma: no cover
raise NotImplementedError("backgrounds with custom traces not supported (could not detect common separation)") # noqa
else: # pragma: no cover
raise NotImplementedError("backgrounds with more than 2 traces not supported")
self.bg_width = bg.width
@with_spinner('bg_spinner')
def export_bg(self, **kwargs):
"""
Create a specreduce Background object from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting image to the application, according to the options
defined in the plugin.
"""
self._set_create_kwargs(**kwargs)
if len(kwargs) and self.active_step != 'bg':
self.update_marks(step='bg')
trace = self._get_bg_trace()
if self.bg_type_selected == 'Manual':
bg = background.Background(self.bg_dataset.selected_obj,
[trace], width=self.bg_width,
statistic=self.bg_statistic.selected.lower())
elif self.bg_type_selected == 'OneSided':
bg = background.Background.one_sided(self.bg_dataset.selected_obj,
trace,
self.bg_separation,
width=self.bg_width,
statistic=self.bg_statistic.selected.lower())
elif self.bg_type_selected == 'TwoSided':
bg = background.Background.two_sided(self.bg_dataset.selected_obj,
trace,
self.bg_separation,
width=self.bg_width,
statistic=self.bg_statistic.selected.lower())
else: # pragma: no cover
raise NotImplementedError(f"bg_type={self.bg_type_selected} not implemented")
return bg
@with_spinner('bg_img_spinner')
def export_bg_img(self, add_data=False, **kwargs):
"""
Create a background 2D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting image to the application, according to the options
defined in the plugin.
"""
bg_spec = self.export_bg(**kwargs).bkg_image()
if add_data:
self.bg_add_results.add_results_from_plugin(bg_spec, replace=True)
return bg_spec
[docs]
def vue_create_bg_img(self, *args):
try:
self.export_bg_img(add_data=True)
except Exception as e:
self.app.hub.broadcast(
SnackbarMessage(f"Specreduce background failed with the following error: {repr(e)}",
color='error', sender=self)
)
@with_spinner('bg_spec_spinner')
def export_bg_spectrum(self, add_data=False, **kwargs):
"""
Create a background 1D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting spectrum to the application, according to the options
defined in the plugin.
"""
spec = self.export_bg(**kwargs).bkg_spectrum()
if add_data:
self.bg_spec_add_results.add_results_from_plugin(spec, replace=False)
return spec
@with_spinner('bg_sub_spinner')
def export_bg_sub(self, add_data=False, **kwargs):
"""
Create a background-subtracted 2D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting image to the application, according to the options
defined in the plugin.
"""
bg_sub_spec = self.export_bg(**kwargs).sub_image()
if add_data:
self.bg_sub_add_results.add_results_from_plugin(bg_sub_spec, replace=True)
return bg_sub_spec
def _get_ext_trace(self):
if self.ext_trace_selected == 'From Plugin':
return self.export_trace(add_data=False)
else:
return self.ext_trace.selected_obj
def _get_ext_input_spectrum(self):
if self.ext_dataset_selected == 'From Plugin':
return self.export_bg_sub(add_data=False)
else:
return self.ext_dataset.selected_obj
[docs]
def import_extract(self, ext):
"""
Import the input parameters from an existing specreduce extract object into the plugin.
Parameters
----------
ext : specreduce.extract.BoxcarExtract
Extract object to import
"""
if isinstance(ext, extract.BoxcarExtract):
self.ext_type_selected = 'Boxcar'
self.ext_width = ext.width
elif isinstance(ext, extract.HorneExtract):
self.ext_type_selected = 'Horne'
else: # pragma: no cover
raise TypeError("ext must be a specreduce.extract.BoxcarExtract or specreduce.extract.HorneExtract object") # noqa
[docs]
def export_extract(self, **kwargs):
"""
Create a specreduce extraction object from the input parameters defined in the plugin.
"""
self._set_create_kwargs(**kwargs)
if len(kwargs) and self.active_step != 'ext':
self.update_marks(step='ext')
trace = self._get_ext_trace()
inp_sp2d = self._get_ext_input_spectrum()
if self.ext_type_selected == 'Boxcar':
ext = extract.BoxcarExtract(inp_sp2d, trace, width=self.ext_width)
elif self.ext_type_selected == 'Horne':
if inp_sp2d.uncertainty is None:
inp_sp2d.uncertainty = VarianceUncertainty(np.ones_like(inp_sp2d.data))
if not hasattr(inp_sp2d.uncertainty, 'uncertainty_type'):
inp_sp2d.uncertainty = StdDevUncertainty(inp_sp2d.uncert)
ext = extract.HorneExtract(inp_sp2d, trace)
else:
raise NotImplementedError(f"extraction type '{self.ext_type_selected}' not supported") # noqa
return ext
@with_spinner('spinner')
def export_extract_spectrum(self, add_data=False, **kwargs):
"""
Create an extracted 1D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting spectrum to the application, according to the options
defined in the plugin.
"""
extract = self.export_extract(**kwargs)
spectrum = extract.spectrum
if add_data:
self.ext_add_results.add_results_from_plugin(spectrum, replace=False)
return spectrum