Source code for jdaviz.core.helpers

"""
Helper classes are meant to provide a convenient user API for specific
configurations. They allow a separation of "viztool-specific" API and the glue
application objects.

See also https://github.com/spacetelescope/jdaviz/issues/104 for more details
on the motivation behind this concept.
"""
import re
import warnings
from contextlib import contextmanager
from inspect import isclass

import numpy as np
import astropy.units as u
from astropy.nddata import CCDData, StdDevUncertainty
from regions.core.core import Region
from glue.core import HubListener
from glue.core.edit_subset_mode import NewMode
from glue.core.message import SubsetCreateMessage, SubsetDeleteMessage
from glue.core.subset import Subset, MaskSubsetState
from glue.config import data_translator
from ipywidgets.widgets import widget_serialization
from specutils import Spectrum1D, SpectralRegion


from jdaviz.app import Application
from jdaviz.core.events import SnackbarMessage, ExitBatchLoadMessage
from jdaviz.core.template_mixin import show_widget
from jdaviz.utils import data_has_valid_wcs

__all__ = ['ConfigHelper', 'ImageConfigHelper']


[docs] class ConfigHelper(HubListener): """The Base Helper Class. Provides shared abstracted helper methods to the user. Subclasses should set ``_default_configuration`` if they are meant to be used with a specific configuration. Parameters ---------- app : `~jdaviz.app.Application` or `None` The application object, or if `None`, creates a new one based on the default configuration for this helper. verbosity : {'debug', 'info', 'warning', 'error'} Verbosity of the popup messages in the application. history_verbosity : {'debug', 'info', 'warning', 'error'} Verbosity of the history logger in the application. """ _default_configuration = 'default' def __init__(self, app=None, verbosity='warning', history_verbosity='info'): if app is None: self.app = Application(configuration=self._default_configuration) else: self.app = app self.app.verbosity = verbosity self.app.history_verbosity = history_verbosity # give a reference from the app back to this config helper. These can be accessed from a # viewer via viewer.jdaviz_app and viewer.jdaviz_helper # if the helper has already been set, this is probably a nested viz tool. Don't overwrite if self.app._jdaviz_helper is None: self.app._jdaviz_helper = self self.app.hub.subscribe(self, SubsetCreateMessage, handler=lambda msg: self._propagate_callback_to_viewers('_on_subset_create', msg)) # noqa self.app.hub.subscribe(self, SubsetDeleteMessage, handler=lambda msg: self._propagate_callback_to_viewers('_on_subset_delete', msg)) # noqa self._in_batch_load = 0 self._delayed_show_in_viewer_labels = {} # label: viewer_reference pairs def _propagate_callback_to_viewers(self, method, msg): # viewers don't have access to the app/hub to subscribe to messages, so we'll # catch all messages here and pass them on to each of the viewers that # have the applicable method implemented. for viewer in self.app._viewer_store.values(): if hasattr(viewer, method): getattr(viewer, method)(msg)
[docs] @contextmanager def batch_load(self): """ Context manager to delay linking and loading data into viewers """ # we'll use a counter instead of a boolean to allow the user to nest multiple # context managers. Once they're all exited, then the linking/showing will # take place. self._in_batch_load += 1 with self.app.data_collection.delay_link_manager_update(): # user entrypoint (anything within the with-statement will get called here) yield self._in_batch_load -= 1 if not self._in_batch_load: self.app.hub.broadcast(ExitBatchLoadMessage(sender=self.app)) # add any data to viewers that were requested but deferred for data_label, viewer_ref in self._delayed_show_in_viewer_labels.items(): self.app.set_data_visibility(viewer_ref, data_label, visible=True, replace=False) self._delayed_show_in_viewer_labels = {}
[docs] def load_data(self, data, data_label=None, parser_reference=None, **kwargs): if data_label: kwargs['data_label'] = data_label self.app.load_data(data, parser_reference=parser_reference, **kwargs)
@property def data_labels(self): """ List of data labels loaded and available in jdaviz Returns ------- data_labels : list list of strings """ return [data.label for data in self.app.data_collection] @property def plugins(self): """ Access API objects for plugins in the plugin tray. Returns ------- plugins : dict dict of plugin objects """ plugins = {item['label']: widget_serialization['from_json'](item['widget'], None).user_api for item in self.app.state.tray_items if item['is_relevant']} # handle renamed plugins during deprecation if 'Orientation' in plugins: plugins['Links Control'] = plugins['Orientation']._obj.user_api plugins['Links Control']._deprecation_msg = 'in the future, the formerly named \"Links Control\" plugin will only be available by its new name: \"Orientation\".' # noqa if 'Canvas Rotation' in plugins: plugins['Canvas Rotation']._deprecation_msg = 'this functionality will be removed in favor of the implementation for rotation in the \"Orientation\" plugin.' # noqa if 'Export' in plugins: plugins['Export Plot'] = plugins['Export']._obj.user_api plugins['Export Plot']._deprecation_msg = 'in the future, the formerly named \"Export Plot\" plugin will only be available by its new name: \"Export\".' # noqa return plugins @property def plugin_tables(self): return self.app._plugin_tables @property def plugin_plots(self): return self.app._plugin_plots @property def viewers(self): """ Access API objects for any viewer. Returns ------- viewers : dict dict of viewer objects """ return {viewer._ref_or_id: viewer.user_api for viewer in self.app._viewer_store.values()} @property def fitted_models(self): """ Returns the fitted models. Returns ------- parameters : dict dict of `astropy.modeling.Model` objects, or None. """ return self.app.fitted_models
[docs] def get_models(self, models=None, model_label=None, x=None, y=None): """ Loop through all models and output models of the label model_label. If x or y is set, return model_labels of those (x, y) coordinates. If x and y are None, print all models regardless of coordinates. Parameters ---------- models : dict A dict of models, with the key being the label name and the value being an `astropy.modeling.CompoundModel` object. Defaults to `fitted_models` if no parameter is provided. model_label : str The name of the model that will be found and returned. If it equals default, every model present will be returned. x : int The x coordinate of the model spaxels that will be returned. y : int The y coordinate of the model spaxels that will be returned. Returns ------- selected_models : dict Dictionary of the selected models. """ selected_models = {} # If models is not provided, use the app's fitted models if not models: models = self.fitted_models # Loop through all keys in the dict models for label in models: # Prevent "Model 2" from being returned when model_label is "Model" if model_label is not None: if label.split(" (")[0] != model_label: continue # If no label was provided, use label name without coordinates. if model_label is None and " (" in label: find_label = label.split(" (")[0] # If coordinates are not present, just use the label. elif model_label is None: find_label = label else: find_label = model_label # If x and y are set, return keys that match the model plus that # coordinate pair. If only x or y is set, return keys that fit # that value for the appropriate coordinate. if x is not None and y is not None: find_label = r"{} \({}, {}\)".format(find_label, x, y) elif x: find_label = r"{} \({}, .+\)".format(find_label, x) elif y: find_label = r"{} \(.+, {}\)".format(find_label, y) if re.search(find_label, label): selected_models[label] = models[label] return selected_models
[docs] def get_model_parameters(self, models=None, model_label=None, x=None, y=None): """ Convert each parameter of model inside models into a coordinate that maps the model name and parameter name to a `astropy.units.Quantity` object. Parameters ---------- models : dict A dictionary where the key is a model name and the value is an `astropy.modeling.CompoundModel` object. model_label : str Get model parameters for a particular model by inputting its label. x : int The x coordinate of the model spaxels that will be returned from get_models. y : int The y coordinate of the model spaxels that will be returned from get_models. Returns ------- :dict: a dictionary of the form {model name: {parameter name: [[`astropy.units.Quantity`]]}} for 3d models or {model name: {parameter name: `astropy.units.Quantity`}} where the Quantity object represents the parameter value and unit of one of spaxel models or the 1d models, respectively. """ if models and model_label: models = self.get_models(models=models, model_label=model_label, x=x, y=y) elif models is None and model_label: models = self.get_models(model_label=model_label, x=x, y=y) elif models is None: models = self.fitted_models data_shapes = {} for label in models: data_label = label.split(" (")[0] if data_label not in data_shapes: data_shapes[data_label] = self.app.data_collection[data_label].data.shape param_dict = {} parameters_cube = {} param_x_y = {} param_units = {} for label in models: # 3d models take the form of "Model (1,2)" so this if statement # looks for that style and separates out the pertinent information. if " (" in label: label_split = label.split(" (") model_name = label_split[0] x = int(label_split[1].split(", ")[0]) y = int(label_split[1].split(", ")[1][:-1]) # x and y values are added to this dict where they will be used # to convert the models of each spaxel into a single # coordinate in the parameters_cube dictionary. if model_name not in param_x_y: param_x_y[model_name] = {'x': [], 'y': []} if x not in param_x_y[model_name]['x']: param_x_y[model_name]['x'].append(x) if y not in param_x_y[model_name]['y']: param_x_y[model_name]['y'].append(y) # 1d models will be handled by this else statement. else: model_name = label if model_name not in param_dict: param_dict[model_name] = list(models[label].param_names) # This adds another dictionary as the value of # parameters_cube[model_name] where the key is the parameter name # and the value is either a 2d array of zeros or a single value, depending # on whether the model in question is 3d or 1d, respectively. for model_name in param_dict: if model_name in param_x_y: parameters_cube[model_name] = {x: np.zeros(shape=data_shapes[model_name][:2]) for x in param_dict[model_name]} else: parameters_cube[model_name] = {x: 0 for x in param_dict[model_name]} # This loop handles the actual placement of param.values and # param.units into the parameter_cubes dictionary. for label in models: if " (" in label: label_split = label.split(" (") model_name = label_split[0] # If the get_models method is used to build a dictionary of # models and a value is set for the x or y parameters, that # will mean that only one x or y value is present in the # models. if len(param_x_y[model_name]['x']) == 1: x = 0 else: x = int(label_split[1].split(", ")[0]) if len(param_x_y[model_name]['y']) == 1: y = 0 else: y = int(label_split[1].split(", ")[1][:-1]) param_units[model_name] = {} for name in param_dict[model_name]: param = getattr(models[label], name) parameters_cube[model_name][name][x][y] = param.value param_units[model_name][name] = param.unit else: model_name = label param_units[model_name] = {} # 1d models do not have anything set of param.unit, so the # return_units and input_units properties need to be used # instead, depending on the type of parameter `name` is. for name in param_dict[model_name]: param = getattr(models[label], name) parameters_cube[model_name][name] = param.value param_units[model_name][name] = param.unit # Convert values of parameters_cube[key][param_name] into u.Quantity # objects that contain the appropriate unit set in # param_units[key][param_name] for key in parameters_cube: for param_name in parameters_cube[key]: parameters_cube[key][param_name] = u.Quantity( parameters_cube[key][param_name], param_units[key].get(param_name, None)) return parameters_cube
[docs] def show(self, loc="inline", title=None, height=None): # pragma: no cover """Display the Jdaviz application. Parameters ---------- loc : str The display location determines where to present the viz app. Supported locations: "inline": Display the Jdaviz application inline in a notebook. Note this is functionally equivalent to displaying the cell ``viz.app`` in the notebook. "sidecar": Display the Jdaviz application in a separate JupyterLab window from the notebook, the location of which is decided by the 'anchor.' right is the default Other anchors: * ``sidecar:right`` (The default, opens a tab to the right of display) * ``sidecar:tab-before`` (Full-width tab before the current notebook) * ``sidecar:tab-after`` (Full-width tab after the current notebook) * ``sidecar:split-right`` (Split-tab in the same window right of the notebook) * ``sidecar:split-left`` (Split-tab in the same window left of the notebook) * ``sidecar:split-top`` (Split-tab in the same window above the notebook) * ``sidecar:split-bottom`` (Split-tab in the same window below the notebook) See `jupyterlab-sidecar <https://github.com/jupyter-widgets/jupyterlab-sidecar>`_ for the most up-to-date options. "popout": Display the Jdaviz application in a detached display. By default, a new window will open. Browser popup permissions required. Other anchors: * ``popout:window`` (The default, opens Jdaviz in a new, detached popout) * ``popout:tab`` (Opens Jdaviz in a new, detached tab in your browser) title : str, optional The title of the sidecar tab. Defaults to the name of the application; e.g., "specviz". NOTE: Only applicable to a "sidecar" display. height: int, optional The height of the top-level application widget, in pixels. Applies to all instances of the same application in the notebook. Notes ----- If "sidecar" is requested in the "classic" Jupyter notebook, the app will appear inline, as only JupyterLab has a mechanism to have multiple tabs. """ title = self.app.config if title is None else title if height is not None: if isinstance(height, int): height = f"{height}px" self.app.layout.height = height self.app.state.settings['context']['notebook']['max_height'] = height show_widget(self.app, loc=loc, title=title)
[docs] def show_in_sidecar(self, anchor=None, title=None): # pragma: no cover """ Preserved for backwards compatibility Shows Jdaviz in a sidecar with the default anchor: right """ warnings.warn('show_in_sidecar has been replaced with show(loc="sidecar")', DeprecationWarning) location = 'sidecar' if anchor is None else f"sidecar:{anchor}" return self.show(loc=location, title=title)
[docs] def show_in_new_tab(self, title=None): # pragma: no cover """ Preserved for backwards compatibility Shows Jdaviz in a sidecar in a new tab to the right """ warnings.warn('show_in_new_tab has been replaced with show(loc="sidecar:tab-after")', DeprecationWarning) return self.show(loc="sidecar:tab-after", title=title)
def _get_data(self, data_label=None, spatial_subset=None, spectral_subset=None, mask_subset=None, function=None, cls=None, use_display_units=False): def _handle_display_units(data, use_display_units): if use_display_units: if isinstance(data, Spectrum1D): spectral_unit = self.app._get_display_unit('spectral') if not spectral_unit: return data if self.app.config == 'cubeviz' and spectral_unit == 'deg': # this happens before the correct axis is set for the spectrum-viewer # and would result in a unit-conversion error if attempting to convert # to the display units. This should only ever be temporary during # app intialization. return data flux_unit = self.app._get_display_unit('flux') # TODO: any other attributes (meta, wcs, etc)? # TODO: implement uncertainty.to upstream uncertainty = data.uncertainty if uncertainty is not None: # convert the uncertainties to StdDevUncertainties, since # that is assumed in a few places in jdaviz: if uncertainty.unit is None: uncertainty.unit = data.flux.unit if hasattr(uncertainty, 'represent_as'): new_uncert = uncertainty.represent_as( StdDevUncertainty ).quantity.to(flux_unit) else: # if not specified as NDUncertainty, assume stddev: new_uncert = uncertainty.quantity.to(flux_unit) new_uncert = StdDevUncertainty(new_uncert, unit=flux_unit) else: new_uncert = None data = Spectrum1D(spectral_axis=data.spectral_axis.to(spectral_unit, u.spectral()), flux=data.flux.to(flux_unit, u.spectral_density(data.spectral_axis)), uncertainty=new_uncert, mask=data.mask) else: # pragma: nocover raise NotImplementedError(f"converting {data.__class__.__name__} to display units is not supported") # noqa return data list_of_valid_function_values = ('minimum', 'maximum', 'mean', 'median', 'sum') if function and function not in list_of_valid_function_values: raise ValueError(f"function {function} not in list of valid" f" function values {list_of_valid_function_values}") list_of_valid_subset_names = [x.label for x in self.app.data_collection.subset_groups] for subset in (spatial_subset, spectral_subset, mask_subset): if subset and subset not in list_of_valid_subset_names: raise ValueError(f"Subset {subset} not in list of valid" f" subset names {list_of_valid_subset_names}") if data_label and data_label not in self.app.data_collection.labels: raise ValueError(f'{data_label} not in {self.app.data_collection.labels}.') elif not data_label and len(self.app.data_collection) > 1: raise ValueError('data_label must be set if more than' ' one data exists in data_collection.') elif not data_label and len(self.app.data_collection) == 1: data_label = self.app.data_collection[0].label if cls is not None and not isclass(cls): raise TypeError( "cls in get_data must be a class or None.") if spectral_subset: if mask_subset is not None: raise ValueError("cannot use both mask_subset and spectral_subset") # spectral_subset is applied as a mask, the only difference is that it has # its own set of validity checks (whereas mask_subset can be used by downstream # apps which would then need to do their own type checks, if necessary) mask_subset = spectral_subset # End validity checks and start data retrieval data = self.app.data_collection[data_label] if not cls: if 'Trace' in data.meta: cls = None elif data.ndim == 2 and self.app.config == "specviz2d": cls = Spectrum1D elif data.ndim == 2: cls = CCDData elif data.ndim in [1, 3]: cls = Spectrum1D object_kwargs = {} if cls == Spectrum1D: object_kwargs['statistic'] = function if not spatial_subset and not mask_subset: if 'Trace' in data.meta: if cls is not None: # pragma: no cover raise ValueError("cls not supported for Trace object") data = data.get_object() else: data = data.get_object(cls=cls, **object_kwargs) return _handle_display_units(data, use_display_units) if not cls and spatial_subset: raise AttributeError(f"A valid cls must be provided to" f" apply subset {spatial_subset} to data. " f"Instead, {cls} was given.") elif not cls and mask_subset: raise AttributeError(f"A valid cls must be provided to" f" apply subset {mask_subset} to data. " f"Instead, {cls} was given.") # Now we work on applying subsets to the data all_subsets = self.app.get_subsets(object_only=True) # Handle spatial subset if spatial_subset and not isinstance(all_subsets[spatial_subset][0], Region): raise ValueError(f"{spatial_subset} is not a spatial subset.") elif spatial_subset: real_spatial = [sub for subsets in self.app.data_collection.subset_groups for sub in subsets.subsets if sub.data.label == data_label and subsets.label == spatial_subset][0] handler, _ = data_translator.get_handler_for(cls) try: data = handler.to_object(real_spatial, **object_kwargs) except Exception as e: warnings.warn(f"Not able to get {data_label} returned with" f" subset {spatial_subset} applied of type {cls}." f" Exception: {e}") elif function: # This covers the case where cubeviz.get_data is called using a spectral_subset # with function set. data = data.get_object(cls=cls, **object_kwargs) # Handle spectral subset, including case where spatial subset is also set if spectral_subset and not isinstance(all_subsets[spectral_subset], SpectralRegion): raise ValueError(f"{spectral_subset} is not a spectral subset.") if mask_subset: real_spectral = [sub for subsets in self.app.data_collection.subset_groups for sub in subsets.subsets if sub.data.label == data_label and subsets.label == mask_subset][0] # noqa handler, _ = data_translator.get_handler_for(cls) try: spec_subset = handler.to_object(real_spectral, **object_kwargs) except Exception as e: warnings.warn(f"Not able to get {data_label} returned with" f" subset {mask_subset} applied of type {cls}." f" Exception: {e}") if spatial_subset or function: # Return collapsed Spectrum1D object with spectral subset mask applied data.mask = spec_subset.mask else: data = spec_subset return _handle_display_units(data, use_display_units)
[docs] def get_data(self, data_label=None, cls=None, use_display_units=False, **kwargs): """ Returns data with name equal to data_label of type cls. Parameters ---------- data_label : str, optional Provide a label to retrieve a specific data set from data_collection. cls : `~specutils.Spectrum1D`, `~astropy.nddata.CCDData`, optional The type that data will be returned as. use_display_units : bool, optional Whether to convert to the display units defined in the <unit-conversion> plugin. kwargs : dict For Cubeviz, you could also pass in ``function`` (str) to collapse the cube into 1D spectrum using provided function. Returns ------- data : cls Data is returned as type ``cls``. """ return self._get_data(data_label=data_label, cls=cls, use_display_units=use_display_units, **kwargs)
[docs] class ImageConfigHelper(ConfigHelper): """`ConfigHelper` that uses an image viewer as its primary viewer. For example, Imviz and Cubeviz. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # NOTE: The first viewer must always be there and is an image viewer. self._default_viewer = self.app.get_viewer_by_id(f'{self.app.config}-0') @property def default_viewer(self): """Default viewer instance. This is typically the first viewer (e.g., "imviz-0" or "cubeviz-0").""" return self._default_viewer.user_api
[docs] def load_regions_from_file(self, region_file, region_format='ds9', max_num_regions=20, **kwargs): """Load regions defined in the given file. See :ref:`regions:regions_io` for supported file formats. See :meth:`load_regions` for how regions are loaded. Parameters ---------- region_file : str Path to region file. region_format : {'crtf', 'ds9', 'fits'} See :meth:`regions.Regions.get_formats`. max_num_regions : int or `None` Maximum number of regions to read from the file, starting from top of the file. Default is first 20 regions that can be successfully loaded. If `None`, it will load everything. kwargs : dict See :meth:`load_regions`. Returns ------- bad_regions : list of (obj, str) or `None` See :meth:`load_regions`. """ from regions import Regions raw_regs = Regions.read(region_file, format=region_format) return self.load_regions(raw_regs, max_num_regions=max_num_regions, **kwargs)
[docs] def load_regions(self, regions, max_num_regions=None, refdata_label=None, return_bad_regions=False, **kwargs): """Load given region(s) into the viewer. WCS-to-pixel translation and mask creation, if needed, is relative to the image defined by ``refdata_label``. Meanwhile, the rest of the Subset operations are based on reference image as defined by Glue. .. note:: Loading too many regions will affect performance. A valid region can be loaded into one of the following categories: * An interactive Subset, as if it was drawn by hand. This is always done for supported shapes. Its label will be ``'Subset N'``, where ``N`` is an integer. * A masked Subset that will display on the image but cannot be modified once loaded. This is done if the shape cannot be made interactive. Its label will be ``'MaskedSubset N'``, where ``N`` is an integer. Parameters ---------- regions : list of obj A list of region objects. A region object can be one of the following: * Astropy ``regions`` object * ``photutils`` apertures (limited support until ``photutils`` fully supports ``regions``) max_num_regions : int or `None` Maximum number of regions to load, starting from top of the list. Default is to load everything. refdata_label : str or `None` Label of data to use for sky-to-pixel conversion for a region, or mask creation. Data must already be loaded into Jdaviz. If `None`, defaults to the reference data in the default viewer. Choice of this data is particularly important when sky region is involved. return_bad_regions : bool If `True`, return the regions that failed to load (see ``bad_regions``); This is useful for debugging. If `False`, do not return anything (`None`). kwargs : dict Extra keywords to be passed into the region's ``to_mask`` method. **This is ignored if the region can be made interactive.** Returns ------- bad_regions : list of (obj, str) or `None` If requested (see ``return_bad_regions`` option), return a list of ``(region, reason)`` tuples for region objects that failed to load. If all the regions loaded successfully, this list will be empty. If not requested, return `None`. """ if len(self.app.data_collection) == 0: raise ValueError('Cannot load regions without data.') from photutils.aperture import (CircularAperture, SkyCircularAperture, EllipticalAperture, SkyEllipticalAperture, RectangularAperture, SkyRectangularAperture, CircularAnnulus, SkyCircularAnnulus) from regions import (Regions, CirclePixelRegion, CircleSkyRegion, EllipsePixelRegion, EllipseSkyRegion, RectanglePixelRegion, RectangleSkyRegion, CircleAnnulusPixelRegion, CircleAnnulusSkyRegion) from jdaviz.core.region_translators import regions2roi, aperture2regions # If user passes in one region obj instead of list, try to be smart. if not isinstance(regions, (list, tuple, Regions)): regions = [regions] n_loaded = 0 bad_regions = [] # To keep track of masked subsets. msg_prefix = 'MaskedSubset' msg_count = _next_subset_num(msg_prefix, self.app.data_collection.subset_groups) # Subset is global but reference data is viewer-dependent. if refdata_label is None: data = self.default_viewer._obj.state.reference_data else: data = self.app.data_collection[refdata_label] # TODO: Make this work for data cube. # https://github.com/glue-viz/glue-astronomy/issues/75 has_wcs = data_has_valid_wcs(data, ndim=2) for region in regions: if (isinstance(region, (SkyCircularAperture, SkyEllipticalAperture, SkyRectangularAperture, SkyCircularAnnulus, CircleSkyRegion, EllipseSkyRegion, RectangleSkyRegion, CircleAnnulusSkyRegion)) and not has_wcs): bad_regions.append((region, 'Sky region provided but data has no valid WCS')) continue if (isinstance(region, (CircularAperture, EllipticalAperture, RectangularAperture, CircularAnnulus, CirclePixelRegion, EllipsePixelRegion, RectanglePixelRegion, CircleAnnulusPixelRegion)) and self.app._link_type == "wcs"): bad_regions.append((region, 'Pixel region provided by data is linked by WCS')) continue # photutils: Convert to regions shape first if isinstance(region, (CircularAperture, SkyCircularAperture, EllipticalAperture, SkyEllipticalAperture, RectangularAperture, SkyRectangularAperture, CircularAnnulus, SkyCircularAnnulus)): region = aperture2regions(region) # regions: Convert to ROI. # NOTE: Out-of-bounds ROI will succeed; this is native glue behavior. if isinstance(region, (CirclePixelRegion, CircleSkyRegion, EllipsePixelRegion, EllipseSkyRegion, RectanglePixelRegion, RectangleSkyRegion, CircleAnnulusPixelRegion, CircleAnnulusSkyRegion)): state = regions2roi(region, wcs=data.coords) # TODO: Do we want user to specify viewer? Does it matter? self.app.session.edit_subset_mode._mode = NewMode self.default_viewer._obj.apply_roi(state) self.app.session.edit_subset_mode.edit_subset = None # No overwrite next iteration # noqa # Last resort: Masked Subset that is static (if data is not a cube) elif data.ndim == 2: im = None if hasattr(region, 'to_pixel'): # Sky region: Convert to pixel region if not has_wcs: bad_regions.append((region, 'Sky region provided but data has no valid WCS')) # noqa continue region = region.to_pixel(data.coords) if hasattr(region, 'to_mask'): try: mask = region.to_mask(**kwargs) im = mask.to_image(data.shape) # Can be None except Exception as e: # pragma: no cover bad_regions.append((region, f'Failed to load: {repr(e)}')) continue # Boolean mask as input is supported but not advertised. elif (isinstance(region, np.ndarray) and region.shape == data.shape and region.dtype == np.bool_): im = region if im is None: bad_regions.append((region, 'Mask creation failed')) continue # NOTE: Region creation info is thus lost. try: subset_label = f'{msg_prefix} {msg_count}' state = MaskSubsetState(im, data.pixel_component_ids) self.app.data_collection.new_subset_group(subset_label, state) msg_count += 1 except Exception as e: # pragma: no cover bad_regions.append((region, f'Failed to load: {repr(e)}')) continue else: bad_regions.append((region, 'Mask creation failed')) continue n_loaded += 1 if max_num_regions is not None and n_loaded >= max_num_regions: break n_reg_in = len(regions) n_reg_bad = len(bad_regions) if n_loaded == 0: snack_color = "error" elif n_reg_bad > 0: snack_color = "warning" else: snack_color = "success" self.app.hub.broadcast(SnackbarMessage( f"Loaded {n_loaded}/{n_reg_in} regions, max_num_regions={max_num_regions}, " f"bad={n_reg_bad}", color=snack_color, timeout=8000, sender=self.app)) if return_bad_regions: return bad_regions
[docs] def get_interactive_regions(self): """Return spatial regions that can be interacted with in the viewer. This does not return masked regions added via :meth:`load_regions`. Unsupported region shapes will be skipped. When that happens, a yellow snackbar message will appear on display. Returns ------- regions : dict Dictionary mapping interactive region names to respective Astropy ``regions`` objects. """ from glue_astronomy.translators.regions import roi_subset_state_to_region regions = {} failed_regs = set() to_sky = self.app._link_type == 'wcs' # Subset is global, so we just use default viewer. for lyr in self.default_viewer._obj.layers: if (not hasattr(lyr, 'layer') or not isinstance(lyr.layer, Subset) or lyr.layer.ndim not in (2, 3)): continue subset_data = lyr.layer subset_label = subset_data.label # TODO: Remove this when Jdaviz support round-tripping, see # https://github.com/spacetelescope/jdaviz/pull/721 if not subset_label.startswith('Subset'): continue try: if self.app.config == "imviz" and to_sky: region = roi_subset_state_to_region(subset_data.subset_state, to_sky=to_sky) else: region = subset_data.data.get_selection_definition( subset_id=subset_label, format='astropy-regions') except (NotImplementedError, ValueError): failed_regs.add(subset_label) else: regions[subset_label] = region if len(failed_regs) > 0: self.app.hub.broadcast(SnackbarMessage( f"Regions skipped: {', '.join(sorted(failed_regs))}", color="warning", timeout=8000, sender=self.app)) return regions
# See https://github.com/glue-viz/glue-jupyter/issues/253 def _apply_interactive_region(self, toolname, from_pix, to_pix): """Mimic interactive region drawing. This is for internal testing only. """ self.app.session.edit_subset_mode._mode = NewMode tool = self.default_viewer._obj.toolbar.tools[toolname] tool.activate() tool.interact.brushing = True tool.interact.selected = [from_pix, to_pix] tool.interact.brushing = False self.app.session.edit_subset_mode.edit_subset = None # No overwrite next iteration # TODO: Make this public API? def _delete_region(self, subset_label): """Delete region given the Subset label.""" all_subset_labels = [s.label for s in self.app.data_collection.subset_groups] if subset_label not in all_subset_labels: return i = all_subset_labels.index(subset_label) subset_grp = self.app.data_collection.subset_groups[i] self.app.data_collection.remove_subset_group(subset_grp) # TODO: Make this public API? def _delete_all_regions(self): """Delete all regions.""" for subset_grp in self.app.data_collection.subset_groups: # should be a copy self.app.data_collection.remove_subset_group(subset_grp)
def _next_subset_num(label_prefix, subset_groups): """Assumes ``prefix i`` format. Does not go back and fill in lower but available numbers. This is consistent with Glue. """ max_i = 0 for sg in subset_groups: if sg.label.startswith(label_prefix): sub_label = sg.label.split(' ') if len(sub_label) > 1: i_str = sub_label[-1] try: i = int(i_str) except Exception: # nosec continue else: if i > max_i: max_i = i return max_i + 1