Source code for jdaviz.core.helpers

"""
Helper classes are meant to provide a convenient user API for specific
configurations. They allow a separation of "viztool-specific" API and the glue
application objects.

See also https://github.com/spacetelescope/jdaviz/issues/104 for more details
on the motivation behind this concept.
"""
import warnings
from contextlib import contextmanager
from inspect import isclass

import numpy as np
from glue.core import ComponentID, HubListener
from glue.core.edit_subset_mode import NewMode
from glue.core.message import SubsetCreateMessage, SubsetDeleteMessage
from glue.core.subset import Subset, MaskSubsetState
from glue.config import data_translator
from ipywidgets.widgets import widget_serialization

from astropy.nddata import NDDataArray, CCDData, StdDevUncertainty
import astropy.units as u
from astropy.utils.decorators import deprecated
from regions.core.core import Region
from specutils import Spectrum1D, SpectralRegion

from jdaviz.app import Application
from jdaviz.core.events import SnackbarMessage, ExitBatchLoadMessage, SliceSelectSliceMessage
from jdaviz.core.loaders.resolvers import find_matching_resolver
from jdaviz.core.template_mixin import show_widget
from jdaviz.utils import data_has_valid_wcs
from jdaviz.core.unit_conversion_utils import (all_flux_unit_conversion_equivs,
                                               check_if_unit_is_per_solid_angle,
                                               flux_conversion_general,
                                               spectral_axis_conversion)


__all__ = ['ConfigHelper', 'ImageConfigHelper', 'CubeConfigHelper']


class ConfigHelper(HubListener):
    """The Base Helper Class.
    Provides shared abstracted helper methods to the user.

    Subclasses should set ``_default_configuration`` if they are meant to be
    used with a specific configuration.

    Parameters
    ----------
    app : `~jdaviz.app.Application` or `None`
        The application object, or if `None`, creates a new one based on the
        default configuration for this helper.
    verbosity : {'debug', 'info', 'warning', 'error'}
        Verbosity of the popup messages in the application.
    history_verbosity : {'debug', 'info', 'warning', 'error'}
        Verbosity of the history logger in the application.
    """
    _default_configuration = 'default'
    # NOTE: class-level dict that ``_set_data_component`` mutates in place, so the
    # label -> ComponentID cache is shared by every helper instance.
    _component_ids = {}

    def __init__(self, app=None, verbosity='warning', history_verbosity='info'):
        if app is None:
            self.app = Application(configuration=self._default_configuration)
        else:
            self.app = app
        self.app.verbosity = verbosity
        self.app.history_verbosity = history_verbosity

        # give a reference from the app back to this config helper.  These can be accessed from a
        # viewer via viewer.jdaviz_app and viewer.jdaviz_helper
        # if the helper has already been set, this is probably a nested viz tool.  Don't overwrite
        if self.app._jdaviz_helper is None:
            self.app._jdaviz_helper = self

        self.app.hub.subscribe(self, SubsetCreateMessage,
                               handler=lambda msg: self._propagate_callback_to_viewers('_on_subset_create', msg))  # noqa
        self.app.hub.subscribe(self, SubsetDeleteMessage,
                               handler=lambda msg: self._propagate_callback_to_viewers('_on_subset_delete', msg))  # noqa

        # nesting depth of active ``batch_load`` context managers
        self._in_batch_load = 0
        self._delayed_show_in_viewer_labels = {}  # label: viewer_reference pairs

    def _propagate_callback_to_viewers(self, method, msg):
        """Call ``method(msg)`` on every viewer in the app that implements ``method``."""
        # viewers don't have access to the app/hub to subscribe to messages, so we'll
        # catch all messages here and pass them on to each of the viewers that
        # have the applicable method implemented.
        for viewer in self.app._viewer_store.values():
            if hasattr(viewer, method):
                getattr(viewer, method)(msg)

    @contextmanager
    def batch_load(self):
        """
        Context manager to delay linking and loading data into viewers

        The nesting counter is always restored when the block exits, even if the
        body raises.  The exit broadcast and the deferred "show in viewer"
        requests are only processed when the outermost block exits without an
        exception.
        """
        # we'll use a counter instead of a boolean to allow the user to nest multiple
        # context managers.  Once they're all exited, then the linking/showing will
        # take place.
        self._in_batch_load += 1
        try:
            with self.app.data_collection.delay_link_manager_update():
                # user entrypoint (anything within the with-statement will get called here)
                yield
        finally:
            # must run on the error path as well, otherwise a single failed load would
            # leave the helper permanently flagged as "in batch load"
            self._in_batch_load -= 1

        if not self._in_batch_load:
            self.app.hub.broadcast(ExitBatchLoadMessage(sender=self.app))
            # add any data to viewers that were requested but deferred
            for data_label, viewer_ref in self._delayed_show_in_viewer_labels.items():
                self.app.set_data_visibility(viewer_ref, data_label,
                                             visible=True, replace=False)
            self._delayed_show_in_viewer_labels = {}

    def load_data(self, data, data_label=None, parser_reference=None, **kwargs):
        """Load data by delegating to ``self.app.load_data``.

        Parameters
        ----------
        data : object
            Passed through unchanged as the first argument of ``app.load_data``.
        data_label : str, optional
            Forwarded as the ``data_label`` keyword only when truthy.
        parser_reference : str, optional
            Forwarded as the ``parser_reference`` keyword.
        kwargs : dict
            Any additional keywords are forwarded to ``app.load_data``.
        """
        if data_label:
            kwargs['data_label'] = data_label
        self.app.load_data(data, parser_reference=parser_reference, **kwargs)

    @property
    def _coords_info(self):
        """The ``'g-coords-info'`` entry of the application's tools (looked up with ``.get``)."""
        return self.app.session.application._tools.get('g-coords-info')

    @property
    def loaders(self):
        """
        Access API objects for data loaders in the import dialog.

        Returns
        -------
        loaders : dict
            dict of loader objects

        Raises
        ------
        NotImplementedError
            If the ``dev_loaders`` flag is not set and the configuration is not one of
            'deconfigged', 'specviz' or 'specviz2d'.
        """
        if not (self.app.state.dev_loaders or self.app.config in ('deconfigged', 'specviz', 'specviz2d')):  # noqa
            raise NotImplementedError("loaders is under active development and requires a dev-flag to test")  # noqa
        loaders = {item['label']: widget_serialization['from_json'](item['widget'], None).user_api
                   for item in self.app.state.loader_items}
        return loaders

    @property
    def new_viewers(self):
        """
        Access API objects for creating new viewers.

        Returns
        -------
        new_viewers : dict
            dict of viewer-creator objects (only items flagged ``is_relevant``)

        Raises
        ------
        NotImplementedError
            If the application configuration is not 'deconfigged'.
        """
        if not self.app.config == 'deconfigged':
            raise NotImplementedError("new_viewers is only enabled in the deconfigged app")  # noqa
        new_viewers = {item['label']: widget_serialization['from_json'](item['widget'], None).user_api  # noqa
                       for item in self.app.state.new_viewer_items if item['is_relevant']}
        return new_viewers

    def _load(self, inp=None, loader=None, format=None, target=None, **kwargs):
        """
        Load data into the app.  A single valid loader/importer must be able to be
        matched based on the input, otherwise an error will be raised suggesting
        what further information to provide.  For an interactive approach,
        see ``loaders``.

        Parameters
        ----------
        inp : string or object or None
            Input filename, url, data object, etc.
        loader : string, optional
            Only consider a specific loader/resolver
        format : string, optional
            Only consider a specific format
        target : string, optional
            Only consider a specific target
        kwargs :
            Additional kwargs are passed on to both the loader and importer, as applicable.
            Any kwargs that do not match valid inputs are silently ignored.

        Returns
        -------
        The return value of calling the matched importer.
        """
        resolver = find_matching_resolver(self.app, inp,
                                          resolver=loader,
                                          format=format,
                                          target=target,
                                          **kwargs)

        importer = resolver.importer
        for k, v in kwargs.items():
            # only set options the importer actually exposes; ``None`` means "leave default"
            if hasattr(importer, k) and v is not None:
                setattr(importer, k, v)
        return importer()

    @property
    def data_labels(self):
        """
        List of data labels loaded and available in jdaviz

        Returns
        -------
        data_labels : list
            list of strings
        """
        return [data.label for data in self.app.data_collection]

    @property
    def plugins(self):
        """
        Access API objects for plugins in the plugin tray.

        Returns
        -------
        plugins : dict
            dict of plugin objects
        """
        plugins = {item['label']: widget_serialization['from_json'](item['widget'], None).user_api
                   for item in self.app.state.tray_items if item['is_relevant']}

        # handle renamed plugins during deprecation
        if 'Image Profiles (XY)' in plugins:
            # renamed in 4.0
            plugins['Imviz Line Profiles (XY)'] = plugins['Image Profiles (XY)']._obj.user_api
            plugins['Imviz Line Profiles (XY)']._deprecation_msg = 'in the future, the formerly named \"Imviz Line Profiles (XY)\" plugin will only be available by its new name: \"Image Profiles (XY)\".'  # noqa

        return plugins

    @property
    def plugin_tables(self):
        """The application's ``_plugin_tables`` attribute."""
        return self.app._plugin_tables

    @property
    def plugin_plots(self):
        """The application's ``_plugin_plots`` attribute."""
        return self.app._plugin_plots

    @property
    def viewers(self):
        """
        Access API objects for any viewer.

        Returns
        -------
        viewers : dict
            dict of viewer objects
        """
        return {viewer._ref_or_id: viewer.user_api
                for viewer in self.app._viewer_store.values()}

    def _get_clone_viewer_reference(self, reference):
        """Return a viewer name derived from ``reference`` that is not currently in use.

        Everything from the first ``[`` onward is stripped from ``reference``; if that
        base name is taken, ``base[1]``, ``base[2]``, ... are tried in order.
        """
        base_name = reference.split("[")[0]
        name = base_name
        ind = 0
        # the set of viewers does not change inside this loop, so build the mapping once
        existing = self.viewers
        while name in existing:
            ind += 1
            name = f"{base_name}[{ind}]"
        return name

    def _set_data_component(self, data, component_label, values):
        """Add ``values`` to ``data`` under ``component_label``, or update them if present.

        The ``ComponentID`` used for a label is cached in ``_component_ids`` so repeated
        calls with the same label reuse the same identifier.
        """
        if component_label in self._component_ids:
            component_id = self._component_ids[component_label]
        else:
            existing_components = [component.label for component in data.components]
            if component_label in existing_components:
                component_id = data.components[existing_components.index(component_label)]
            else:
                component_id = ComponentID(component_label)
                self._component_ids[component_label] = component_id

        if component_id in data.components:
            data.update_components({component_id: values})
        else:
            data.add_component(values, component_id)

    @property
    @deprecated(since="4.2", alternative="plugins['Model Fitting'].fitted_models")
    def fitted_models(self):
        """
        Returns the fitted models.

        Returns
        -------
        parameters : dict
            dict of `astropy.modeling.Model` objects, or None.

        Raises
        ------
        ValueError
            If the 'Model Fitting' plugin is not available.
        """
        plg = self.plugins.get('Model Fitting', None)
        if plg is None:
            raise ValueError("Model Fitting plugins is not loaded")
        return plg.fitted_models

    @deprecated(since="4.2", alternative="plugins['Model Fitting'].get_models")
    def get_models(self, models=None, model_label=None, x=None, y=None):
        """
        Loop through all models and output models of the label model_label.
        If x or y is set, return model_labels of those (x, y) coordinates.
        If x and y are None, print all models regardless of coordinates.

        Parameters
        ----------
        models : dict
            A dict of models, with the key being the label name and the value
            being an `astropy.modeling.CompoundModel` object. Defaults to
            `fitted_models` if no parameter is provided.
        model_label : str
            The name of the model that will be found and returned. If it
            equals default, every model present will be returned.
        x : int
            The x coordinate of the model spaxels that will be returned.
        y : int
            The y coordinate of the model spaxels that will be returned.

        Returns
        -------
        selected_models : dict
            Dictionary of the selected models.

        Raises
        ------
        ValueError
            If the 'Model Fitting' plugin is not available.
        """
        plg = self.plugins.get('Model Fitting', None)
        if plg is None:
            raise ValueError("Model Fitting plugins is not loaded")
        return plg._obj.get_models(models=models, model_label=model_label, x=x, y=y)

    @deprecated(since="4.2")
    def get_model_parameters(self, models=None, model_label=None, x=None, y=None):
        """
        Convert each parameter of model inside models into a coordinate that
        maps the model name and parameter name to a `astropy.units.Quantity`
        object.

        Parameters
        ----------
        models : dict
            A dictionary where the key is a model name and the value is an
            `astropy.modeling.CompoundModel` object.
        model_label : str
            Get model parameters for a particular model by inputting its label.
        x : int
            The x coordinate of the model spaxels that will be returned from
            get_models.
        y : int
            The y coordinate of the model spaxels that will be returned from
            get_models.

        Returns
        -------
        :dict: a dictionary of the form
            {model name: {parameter name: [[`astropy.units.Quantity`]]}}
            for 3d models or
            {model name: {parameter name: `astropy.units.Quantity`}}
            where the Quantity object represents the parameter value and unit
            of one of spaxel models or the 1d models, respectively.

        Raises
        ------
        ValueError
            If the 'Model Fitting' plugin is not available.
        """
        plg = self.plugins.get('Model Fitting', None)
        if plg is None:
            raise ValueError("Model Fitting plugins is not loaded")
        return plg._obj.get_model_parameters(models=models, model_label=model_label, x=x, y=y)

    def show(self, loc="inline", title=None, height=None):  # pragma: no cover
        """Display the Jdaviz application.

        Parameters
        ----------
        loc : str
            The display location determines where to present the viz app.
            Supported locations:

            "inline": Display the Jdaviz application inline in a notebook.
            Note this is functionally equivalent to displaying the cell
            ``viz.app`` in the notebook.

            "sidecar": Display the Jdaviz application in a separate JupyterLab window from the
            notebook, the location of which is decided by the 'anchor.' right is the default

                Other anchors:

                * ``sidecar:right`` (The default, opens a tab to the right of display)
                * ``sidecar:tab-before`` (Full-width tab before the current notebook)
                * ``sidecar:tab-after`` (Full-width tab after the current notebook)
                * ``sidecar:split-right`` (Split-tab in the same window right of the notebook)
                * ``sidecar:split-left`` (Split-tab in the same window left of the notebook)
                * ``sidecar:split-top`` (Split-tab in the same window above the notebook)
                * ``sidecar:split-bottom`` (Split-tab in the same window below the notebook)

                See `jupyterlab-sidecar <https://github.com/jupyter-widgets/jupyterlab-sidecar>`_
                for the most up-to-date options.

            "popout": Display the Jdaviz application in a detached display. By default, a new
            window will open. Browser popup permissions required.

                Other anchors:

                * ``popout:window`` (The default, opens Jdaviz in a new, detached popout)
                * ``popout:tab`` (Opens Jdaviz in a new, detached tab in your browser)

        title : str, optional
            The title of the sidecar tab.  Defaults to the name of the
            application; e.g., "specviz".

            NOTE: Only applicable to a "sidecar" display.

        height: int, optional
            The height of the top-level application widget, in pixels. Applies to all
            instances of the same application in the notebook.

        Notes
        -----
        If "sidecar" is requested in the "classic" Jupyter notebook, the app will appear inline,
        as only JupyterLab has a mechanism to have multiple tabs.
        """
        title = self.app.config if title is None else title
        if height is not None:
            # integers are interpreted as pixels; any other value is used as given
            if isinstance(height, int):
                height = f"{height}px"
            self.app.layout.height = height
            self.app.state.settings['context']['notebook']['max_height'] = height

        if self.app.config in ('specviz', 'specviz2d', 'lcviz') or self.app.state.dev_loaders:
            # open the loaders drawer only when there are no viewers and no drawer is set yet
            if not len(self.viewers) and not len(self.app.state.drawer_content):
                self.app.state.drawer_content = 'loaders'
            else:
                self.app.state.drawer_content = 'plugins'
        else:
            self.app.state.drawer_content = 'plugins'

        show_widget(self.app, loc=loc, title=title)

    def show_in_sidecar(self, anchor=None, title=None):  # pragma: no cover
        """
        Preserved for backwards compatibility
        Shows Jdaviz in a sidecar with the default anchor: right
        """
        warnings.warn('show_in_sidecar has been replaced with show(loc="sidecar")',
                      DeprecationWarning)
        location = 'sidecar' if anchor is None else f"sidecar:{anchor}"
        return self.show(loc=location, title=title)

    def show_in_new_tab(self, title=None):  # pragma: no cover
        """
        Preserved for backwards compatibility
        Shows Jdaviz in a sidecar in a new tab to the right
        """
        warnings.warn('show_in_new_tab has been replaced with show(loc="sidecar:tab-after")',
                      DeprecationWarning)
        return self.show(loc="sidecar:tab-after", title=title)

    def toggle_api_hints(self, enabled=None):
        """
        Toggle the visibility of API hints in the application.

        Parameters
        ----------
        enabled : bool, optional
            If `True`, show API hints.  If `False`, hide API hints.
            If `None`, toggle the current state.
        """
        if enabled is None:
            enabled = not self.app.state.show_api_hints
        self.app.state.show_api_hints = enabled

    def _handle_display_units(self, data, use_display_units=True):
        """Return ``data`` converted to the application's display units, if requested.

        Parameters
        ----------
        data : object
            Object to convert.  Only `~specutils.Spectrum1D` is supported when
            ``use_display_units`` is true.
        use_display_units : bool
            If false, ``data`` is returned untouched.

        Returns
        -------
        data
            The input object (when no conversion is requested or no spectral display
            unit is set), or a new `~specutils.Spectrum1D` carrying the converted
            spectral axis, flux and uncertainty and the original mask.

        Raises
        ------
        NotImplementedError
            If conversion is requested for anything other than a `~specutils.Spectrum1D`.
        """
        if not use_display_units:
            return data

        if not isinstance(data, Spectrum1D):  # pragma: nocover
            raise NotImplementedError(f"converting {data.__class__.__name__} to display units is not supported")  # noqa

        spectral_unit = self.app._get_display_unit('spectral')
        if not spectral_unit:
            return data
        if self.app.config == 'specviz' and self.app._get_display_unit('sb'):
            y_unit = self.app._get_display_unit('sb')
        else:
            y_unit = self.app._get_display_unit('spectral_y')

        # if there is no pixel scale factor, and the requested conversion
        # is between flux/sb, then skip. this case is encountered when
        # starting the app? ideally this should raise an error, and this
        # should allow pix2/spaxel but it doesn't - keeping this
        # condition as-is until further investigation
        orig_sa = check_if_unit_is_per_solid_angle(u.Unit(data.flux.unit))
        targ_sa = check_if_unit_is_per_solid_angle(u.Unit(y_unit))
        skip_flux_conv = ('_pixel_scale_factor' not in data.meta) & (orig_sa != targ_sa)

        # equivalencies for flux/sb unit conversions
        pixar_sr = data.meta.get('_pixel_scale_factor', None)
        eqv = all_flux_unit_conversion_equivs(pixar_sr=pixar_sr,
                                              cube_wave=data.spectral_axis)

        # TODO: any other attributes (meta, wcs, etc)?
        # TODO: implement uncertainty.to upstream
        uncertainty = data.uncertainty
        if uncertainty is not None:
            # convert the uncertainties to StdDevUncertainties, since
            # that is assumed in a few places in jdaviz:
            if uncertainty.unit is None:
                # NOTE: this assigns the unit on the input object's uncertainty in place
                uncertainty.unit = data.flux.unit
            if hasattr(uncertainty, 'represent_as'):
                new_uncert = uncertainty.represent_as(StdDevUncertainty)
            else:
                # if not specified as NDUncertainty, assume stddev:
                new_uncert = uncertainty

            # convert uncertainty units to display units
            if skip_flux_conv:
                new_uncert = StdDevUncertainty(new_uncert, unit=data.flux.unit)
            else:
                new_uncert_conv = flux_conversion_general(new_uncert.quantity.value,
                                                          new_uncert.unit,
                                                          y_unit, eqv, with_unit=False)
                new_uncert = StdDevUncertainty(new_uncert_conv, unit=y_unit)
        else:
            new_uncert = None

        # convert flux/sb units to display units
        if skip_flux_conv:
            new_y = data.flux.value * u.Unit(data.flux.unit)
        else:
            # multiply by unit rather than using with_unit because of
            # edge case for dimensionless we want here
            new_y = flux_conversion_general(data.flux.value, data.flux.unit,
                                            y_unit, eqv,
                                            with_unit=False) * u.Unit(y_unit)

        # convert spectral axis to display units
        new_spec = (spectral_axis_conversion(data.spectral_axis.value,
                                             data.spectral_axis.unit,
                                             spectral_unit) * u.Unit(spectral_unit))

        return Spectrum1D(spectral_axis=new_spec,
                          flux=new_y,
                          uncertainty=new_uncert,
                          mask=data.mask)

    def _get_data(self, data_label=None, spatial_subset=None, spectral_subset=None,
                  temporal_subset=None, mask_subset=None, cls=None, use_display_units=False):
        """Retrieve an entry of the data collection, optionally with subsets applied.

        Parameters
        ----------
        data_label : str, optional
            Label of the data to retrieve.  May be omitted only when exactly one
            data entry exists.
        spatial_subset : str, optional
            Label of a spatial subset to apply.
        spectral_subset : str, optional
            Label of a spectral subset; applied as a mask.
        temporal_subset : str, optional
            Label of a temporal subset; applied as a mask.
        mask_subset : str, optional
            Label of a subset to apply as a mask.  At most one of ``mask_subset``,
            ``spectral_subset`` and ``temporal_subset`` may be given.
        cls : class, optional
            Class of the returned object.  If not given, it is chosen from the
            application configuration and the data itself.
        use_display_units : bool, optional
            Whether to convert the result with ``_handle_display_units``.

        Returns
        -------
        data
            The translated object.  If translating a subset fails, a warning is
            emitted and that subset is not applied.

        Raises
        ------
        ValueError
            For unknown subset or data labels, a missing ``data_label`` when several
            data entries exist, conflicting mask-type subsets, or a subset of the
            wrong kind.
        TypeError
            If ``cls`` is neither a class nor `None`.
        AttributeError
            If a subset is requested but no ``cls`` could be determined.
        """
        list_of_valid_subset_names = [x.label for x in self.app.data_collection.subset_groups]
        # temporal_subset is validated here as well, so that an unknown label is reported
        # clearly instead of failing on the subset lookup further down
        for subset in (spatial_subset, spectral_subset, temporal_subset, mask_subset):
            if subset and subset not in list_of_valid_subset_names:
                raise ValueError(f"Subset {subset} not in list of valid"
                                 f" subset names {list_of_valid_subset_names}")

        if data_label and data_label not in self.app.data_collection.labels:
            raise ValueError(f'{data_label} not in {self.app.data_collection.labels}.')
        elif not data_label and len(self.app.data_collection) > 1:
            raise ValueError('data_label must be set if more than'
                             ' one data exists in data_collection.')
        elif not data_label and len(self.app.data_collection) == 1:
            data_label = self.app.data_collection[0].label

        if cls is not None and not isclass(cls):
            raise TypeError(
                "cls in get_data must be a class or None.")

        if spectral_subset:
            if mask_subset is not None:
                raise ValueError("cannot use both mask_subset and spectral_subset")
            # spectral_subset is applied as a mask, the only difference is that it has
            # its own set of validity checks (whereas mask_subset can be used by downstream
            # apps which would then need to do their own type checks, if necessary)
            mask_subset = spectral_subset

        if temporal_subset:
            if mask_subset is not None:
                # mask_subset may hold the spectral subset at this point
                raise ValueError("cannot use temporal_subset together with"
                                 " mask_subset or spectral_subset")
            mask_subset = temporal_subset

        # End validity checks and start data retrieval
        data = self.app.data_collection[data_label]

        if not cls:
            if self.app.config == 'deconfigged' and data_label in self.app._default_data_cls:
                cls = self.app._default_data_cls.get(data_label)
            elif 'Trace' in data.meta:
                cls = None
            elif hasattr(data, '_native_data_cls'):
                cls = data._native_data_cls
            elif data.ndim == 2 and self.app.config == "specviz2d":
                cls = Spectrum1D
            elif data.ndim == 2:
                cls = CCDData
            elif data.ndim in [1, 3]:
                if self.app.config == 'rampviz':
                    cls = NDDataArray
                else:
                    # for cubeviz, specviz, mosviz, this must be a spectrum:
                    cls = Spectrum1D

        object_kwargs = {}
        if cls == Spectrum1D:
            object_kwargs['statistic'] = None

        if not spatial_subset and not mask_subset:
            if 'Trace' in data.meta:
                # ignore cls
                data = data.get_object()
            else:
                data = data.get_object(cls=cls, **object_kwargs)

            return self._handle_display_units(data, use_display_units)

        if not cls and spatial_subset:
            raise AttributeError(f"A valid cls must be provided to"
                                 f" apply subset {spatial_subset} to data. "
                                 f"Instead, {cls} was given.")
        elif not cls and mask_subset:
            raise AttributeError(f"A valid cls must be provided to"
                                 f" apply subset {mask_subset} to data. "
                                 f"Instead, {cls} was given.")

        # Now we work on applying subsets to the data
        all_subsets = self.app.get_subsets(object_only=True)

        # Handle spatial subset
        if spatial_subset and not isinstance(all_subsets[spatial_subset][0],
                                             Region):
            raise ValueError(f"{spatial_subset} is not a spatial subset.")
        elif spatial_subset:
            real_spatial = [sub for subsets in self.app.data_collection.subset_groups
                            for sub in subsets.subsets
                            if sub.data.label == data_label and subsets.label == spatial_subset][0]
            handler, _ = data_translator.get_handler_for(cls)
            try:
                data = handler.to_object(real_spatial, **object_kwargs)
            except Exception as e:
                warnings.warn(f"Not able to get {data_label} returned with"
                              f" subset {spatial_subset} applied of type {cls}."
                              f" Exception: {e}")

        # Handle spectral subset, including case where spatial subset is also set
        if spectral_subset and not isinstance(all_subsets[spectral_subset],
                                              SpectralRegion):
            raise ValueError(f"{spectral_subset} is not a spectral subset.")

        if mask_subset:
            real_spectral = [sub for subsets in self.app.data_collection.subset_groups
                             for sub in subsets.subsets
                             if sub.data.label == data_label and subsets.label == mask_subset][0]  # noqa

            handler, _ = data_translator.get_handler_for(cls)
            try:
                spec_subset = handler.to_object(real_spectral, **object_kwargs)
            except Exception as e:
                # best effort, as for the spatial subset above: warn and leave ``data``
                # as it is (``spec_subset`` does not exist on this path)
                warnings.warn(f"Not able to get {data_label} returned with"
                              f" subset {mask_subset} applied of type {cls}."
                              f" Exception: {e}")
            else:
                if spatial_subset:
                    # Return collapsed Spectrum1D object with spectral subset mask applied
                    data.mask = spec_subset.mask
                else:
                    data = spec_subset

        return self._handle_display_units(data, use_display_units)

    def get_data(self, data_label=None, cls=None, use_display_units=False, **kwargs):
        """
        Returns data with name equal to data_label of type cls.

        Parameters
        ----------
        data_label : str, optional
            Provide a label to retrieve a specific data set from data_collection.
        cls : `~specutils.Spectrum1D`, `~astropy.nddata.CCDData`, optional
            The type that data will be returned as.
        use_display_units : bool, optional
            Whether to convert to the display units defined in the <unit-conversion> plugin.
        kwargs : dict
            Accepted but not used by this implementation.

        Returns
        -------
        data : cls
            Data is returned as type ``cls``.

        """
        return self._get_data(data_label=data_label, cls=cls,
                              use_display_units=use_display_units)
class ImageConfigHelper(ConfigHelper):
    """`ConfigHelper` that uses an image viewer as its primary viewer.
    For example, Imviz and Cubeviz.

    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # NOTE: The first viewer must always be there and is an image viewer.
        # It is looked up by the id "<config>-0".
        self._default_viewer = self.app.get_viewer_by_id(f'{self.app.config}-0')

    @property
    def default_viewer(self):
        """Default viewer instance. This is typically the first viewer
        (e.g., "imviz-0" or "cubeviz-0").

        The viewer's ``user_api`` object is returned, not the viewer itself.
        """
        return self._default_viewer.user_api

    @deprecated(since="4.2", alternative="subset_tools.import_region")
    def load_regions_from_file(self, region_file, region_format='ds9', max_num_regions=20,
                               **kwargs):
        """Load regions defined in the given file.
        See :ref:`regions:regions_io` for supported file formats.
        See :meth:`load_regions` for how regions are loaded.

        Parameters
        ----------
        region_file : str
            Path to region file.

        region_format : {'crtf', 'ds9', 'fits'}
            See :meth:`regions.Regions.get_formats`.

        max_num_regions : int or `None`
            Maximum number of regions to read from the file, starting
            from top of the file. Default is first 20 regions that
            can be successfully loaded. If `None`, it will load everything.

        kwargs : dict
            See :meth:`load_regions`.

        Returns
        -------
        bad_regions : list of (obj, str) or `None`
            See :meth:`load_regions`.

        """
        # imported lazily, only when this method is called
        from regions import Regions
        raw_regs = Regions.read(region_file, format=region_format)
        return self.load_regions(raw_regs, max_num_regions=max_num_regions, **kwargs)

    @deprecated(since="4.2", alternative="subset_tools.import_region")
    def load_regions(self, regions, max_num_regions=None, refdata_label=None,
                     return_bad_regions=False, **kwargs):
        """Load given region(s) into the viewer.
        WCS-to-pixel translation and mask creation, if needed, is relative
        to the image defined by ``refdata_label``. Meanwhile, the rest of
        the Subset operations are based on reference image as defined by Glue.

        .. note:: Loading too many regions will affect performance.

        A valid region can be loaded into one of the following categories:

        * An interactive Subset, as if it was drawn by hand. This is
          always done for supported shapes. Its label will be
          ``'Subset N'``, where ``N`` is an integer.
        * A masked Subset that will display on the image but cannot be
          modified once loaded. This is done if the shape cannot be
          made interactive. Its label will be ``'MaskedSubset N'``,
          where ``N`` is an integer.

        Parameters
        ----------
        regions : list of obj
            A list of region objects. A region object can be one of the following:

            * Astropy ``regions`` object
            * ``photutils`` apertures (limited support until ``photutils``
              fully supports ``regions``)

        max_num_regions : int or `None`
            Maximum number of regions to load, starting from top of the list.
            Default is to load everything.

        refdata_label : str or `None`
            Label of data to use for sky-to-pixel conversion for a region, or
            mask creation. Data must already be loaded into Jdaviz.
            If `None`, defaults to the reference data in the default viewer.
            Choice of this data is particularly important when sky
            region is involved.

        return_bad_regions : bool
            If `True`, return the regions that failed to load (see ``bad_regions``);
            This is useful for debugging. If `False`, do not return anything (`None`).

        kwargs : dict
            Extra keywords to be passed into the region's ``to_mask`` method.
            **This is ignored if the region can be made interactive.**

        Returns
        -------
        bad_regions : list of (obj, str) or `None`
            If requested (see ``return_bad_regions`` option), return a
            list of ``(region, reason)`` tuples for region objects that failed to load.
            If all the regions loaded successfully, this list will be empty.
            If not requested, return `None`.

        Raises
        ------
        ValueError
            If no data has been loaded into the application.

        """
        if len(self.app.data_collection) == 0:
            raise ValueError('Cannot load regions without data.')

        from photutils.aperture import (CircularAperture, SkyCircularAperture,
                                        EllipticalAperture, SkyEllipticalAperture,
                                        RectangularAperture, SkyRectangularAperture,
                                        CircularAnnulus, SkyCircularAnnulus)
        from regions import (Regions, CirclePixelRegion, CircleSkyRegion,
                             EllipsePixelRegion, EllipseSkyRegion,
                             RectanglePixelRegion, RectangleSkyRegion,
                             CircleAnnulusPixelRegion, CircleAnnulusSkyRegion)
        from jdaviz.core.region_translators import regions2roi, aperture2regions

        # If user passes in one region obj instead of list, try to be smart.
        if not isinstance(regions, (list, tuple, Regions)):
            regions = [regions]

        n_loaded = 0
        bad_regions = []

        # To keep track of masked subsets.
        msg_prefix = 'MaskedSubset'
        msg_count = _next_subset_num(msg_prefix, self.app.data_collection.subset_groups)

        # Subset is global but reference data is viewer-dependent.
        if refdata_label is None:
            data = self.default_viewer._obj.state.reference_data
        else:
            data = self.app.data_collection[refdata_label]

        # TODO: Make this work for data cube.
        # https://github.com/glue-viz/glue-astronomy/issues/75
        has_wcs = data_has_valid_wcs(data, ndim=2)

        for region in regions:
            # Sky shapes cannot be placed without a valid WCS on the reference data.
            if (isinstance(region, (SkyCircularAperture, SkyEllipticalAperture,
                                    SkyRectangularAperture, SkyCircularAnnulus,
                                    CircleSkyRegion, EllipseSkyRegion,
                                    RectangleSkyRegion, CircleAnnulusSkyRegion))
                    and not has_wcs):
                bad_regions.append((region, 'Sky region provided but data has no valid WCS'))
                continue

            # Pixel shapes are rejected while the app is aligned by WCS.
            if (isinstance(region, (CircularAperture, EllipticalAperture,
                                    RectangularAperture, CircularAnnulus,
                                    CirclePixelRegion, EllipsePixelRegion,
                                    RectanglePixelRegion, CircleAnnulusPixelRegion))
                    and self.app._align_by == "wcs"):
                bad_regions.append((region, 'Pixel region provided by data is aligned by WCS'))
                continue

            # photutils: Convert to regions shape first
            if isinstance(region, (CircularAperture, SkyCircularAperture,
                                   EllipticalAperture, SkyEllipticalAperture,
                                   RectangularAperture, SkyRectangularAperture,
                                   CircularAnnulus, SkyCircularAnnulus)):
                region = aperture2regions(region)

            # regions: Convert to ROI.
            # NOTE: Out-of-bounds ROI will succeed; this is native glue behavior.
            if isinstance(region, (CirclePixelRegion, CircleSkyRegion,
                                   EllipsePixelRegion, EllipseSkyRegion,
                                   RectanglePixelRegion, RectangleSkyRegion,
                                   CircleAnnulusPixelRegion, CircleAnnulusSkyRegion)):
                state = regions2roi(region, wcs=data.coords)

                # TODO: Do we want user to specify viewer? Does it matter?
                # Statement order matters here: set the mode, apply the ROI, then clear
                # the edit subset so the next region does not overwrite this one.
                self.app.session.edit_subset_mode._mode = NewMode
                self.default_viewer._obj.apply_roi(state)
                self.app.session.edit_subset_mode.edit_subset = None  # No overwrite next iteration # noqa

            # Last resort: Masked Subset that is static (if data is not a cube)
            elif data.ndim == 2:
                im = None
                if hasattr(region, 'to_pixel'):  # Sky region: Convert to pixel region
                    if not has_wcs:
                        bad_regions.append((region, 'Sky region provided but data has no valid WCS'))  # noqa
                        continue
                    region = region.to_pixel(data.coords)

                if hasattr(region, 'to_mask'):
                    try:
                        mask = region.to_mask(**kwargs)
                        im = mask.to_image(data.shape)  # Can be None
                    except Exception as e:  # pragma: no cover
                        bad_regions.append((region, f'Failed to load: {repr(e)}'))
                        continue

                # Boolean mask as input is supported but not advertised.
                elif (isinstance(region, np.ndarray) and region.shape == data.shape
                        and region.dtype == np.bool_):
                    im = region

                if im is None:
                    bad_regions.append((region, 'Mask creation failed'))
                    continue

                # NOTE: Region creation info is thus lost.
                try:
                    subset_label = f'{msg_prefix} {msg_count}'
                    state = MaskSubsetState(im, data.pixel_component_ids)
                    self.app.data_collection.new_subset_group(subset_label, state)
                    # only advance the counter once the subset group was created
                    msg_count += 1
                except Exception as e:  # pragma: no cover
                    bad_regions.append((region, f'Failed to load: {repr(e)}'))
                    continue
            else:
                bad_regions.append((region, 'Mask creation failed'))
                continue

            # Reached only when the region was loaded (every failure path ``continue``s).
            n_loaded += 1
            if max_num_regions is not None and n_loaded >= max_num_regions:
                break

        n_reg_in = len(regions)
        n_reg_bad = len(bad_regions)
        if n_loaded == 0:
            snack_color = "error"
        elif n_reg_bad > 0:
            snack_color = "warning"
        else:
            snack_color = "success"
        self.app.hub.broadcast(SnackbarMessage(
            f"Loaded {n_loaded}/{n_reg_in} regions, max_num_regions={max_num_regions}, "
            f"bad={n_reg_bad}", color=snack_color, timeout=8000, sender=self.app))

        if return_bad_regions:
            return bad_regions

    @deprecated(since="4.1", alternative="subset_tools.get_regions")
    def get_interactive_regions(self):
        """
        Return spatial regions that can be interacted with in the viewer.
        This does not return masked regions added via :meth:`load_regions`.

        Unsupported region shapes will be skipped. When that happens,
        a yellow snackbar message will appear on display.

        Returns
        -------
        regions : dict
            Dictionary mapping interactive region names to respective Astropy
            ``regions`` objects.

        """
        from glue_astronomy.translators.regions import roi_subset_state_to_region

        regions = {}
        failed_regs = set()
        to_sky = self.app._align_by == 'wcs'

        # Subset is global, so we just use default viewer.
        for lyr in self.default_viewer._obj.layers:
            # skip layers that are not 2D/3D subsets
            if (not hasattr(lyr, 'layer') or not isinstance(lyr.layer, Subset)
                    or lyr.layer.ndim not in (2, 3)):
                continue

            subset_data = lyr.layer
            subset_label = subset_data.label

            try:
                if self.app.config == "imviz" and to_sky:
                    region = roi_subset_state_to_region(subset_data.subset_state,
                                                        to_sky=to_sky)
                else:
                    region = subset_data.data.get_selection_definition(
                        subset_id=subset_label, format='astropy-regions')
            except (NotImplementedError, ValueError):
                # collected here and reported together in one snackbar message below
                failed_regs.add(subset_label)
            else:
                regions[subset_label] = region

        if len(failed_regs) > 0:
            self.app.hub.broadcast(SnackbarMessage(
                f"Regions skipped: {', '.join(sorted(failed_regs))}",
                color="warning", timeout=8000, sender=self.app))

        return regions

    # TODO: Make this public API?
    def _delete_region(self, subset_label):
        """Delete region given the Subset label.

        Does nothing if no subset group carries ``subset_label``.
        """
        all_subset_labels = [s.label for s in self.app.data_collection.subset_groups]
        if subset_label not in all_subset_labels:
            return
        i = all_subset_labels.index(subset_label)
        subset_grp = self.app.data_collection.subset_groups[i]
        self.app.data_collection.remove_subset_group(subset_grp)
def _next_subset_num(label_prefix, subset_groups): """Assumes ``prefix i`` format. Does not go back and fill in lower but available numbers. This is consistent with Glue. """ max_i = 0 for sg in subset_groups: if sg.label.startswith(label_prefix): sub_label = sg.label.split(' ') if len(sub_label) > 1: i_str = sub_label[-1] try: i = int(i_str) except Exception: # nosec continue else: if i > max_i: max_i = i return max_i + 1
class CubeConfigHelper(ImageConfigHelper):
    """Common parent for the config helpers that work with data cubes."""

    _loaded_flux_cube = None
    _loaded_uncert_cube = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @deprecated(since="4.1", alternative="plugins['Slice'].slice")
    def select_slice(self, value):
        """
        Request the slice closest to ``value``.

        A ``SliceSelectSliceMessage`` carrying ``value`` is broadcast on the
        application hub, with this helper as the sender.

        Parameters
        ----------
        value : float or int
            Slice value to select in units of the x-axis of the profile viewer.
            The nearest slice will be selected if "snap to slice" is enabled
            in the slice plugin.
        """
        self.app.hub.broadcast(SliceSelectSliceMessage(value=value, sender=self))