Source code for jdaviz.configs.default.plugins.subset_plugin.subset_plugin

import os

import numpy as np

from astropy.time import Time
import astropy.units as u
from glue.core.message import EditSubsetMessage, SubsetUpdateMessage
from glue.core.edit_subset_mode import (AndMode, AndNotMode, OrMode,
                                        ReplaceMode, XorMode)
from glue.core.roi import CircularROI, CircularAnnulusROI, EllipticalROI, RectangularROI
from glue.core.subset import RoiSubsetState, RangeSubsetState, CompositeSubsetState
from glue.icons import icon_path
from glue_jupyter.widgets.subset_mode_vuetify import SelectionModeMenu
from glue_jupyter.common.toolbar_vuetify import read_icon
from traitlets import Any, List, Unicode, Bool, observe

from jdaviz.core.events import SnackbarMessage, GlobalDisplayUnitChanged, LinkUpdatedMessage
from jdaviz.core.registries import tray_registry
from jdaviz.core.template_mixin import PluginTemplateMixin, DatasetSelectMixin, SubsetSelect
from jdaviz.core.tools import ICON_DIR
from jdaviz.utils import MultiMaskSubsetState

from jdaviz.configs.default.plugins.subset_plugin import utils


__all__ = ['SubsetPlugin']

SUBSET_MODES = {
    'replace': ReplaceMode,
    'OrState': OrMode,
    'AndState': AndMode,
    'XorState': XorMode,
    'AndNotState': AndNotMode,
    'RangeSubsetState': RangeSubsetState,
    'RoiSubsetState': RoiSubsetState
}


[docs] @tray_registry('g-subset-plugin', label="Subset Tools") class SubsetPlugin(PluginTemplateMixin, DatasetSelectMixin): template_file = __file__, "subset_plugin.vue" select = List([]).tag(sync=True) subset_items = List([]).tag(sync=True) subset_selected = Any().tag(sync=True) mode_selected = Unicode('add').tag(sync=True) show_region_info = Bool(True).tag(sync=True) subset_types = List([]).tag(sync=True) subset_definitions = List([]).tag(sync=True) glue_state_types = List([]).tag(sync=True) has_subset_details = Bool(False).tag(sync=True) subplugins_opened = Any().tag(sync=True) multiselect = Bool(False).tag(sync=True) # multiselect only for subset is_centerable = Bool(False).tag(sync=True) can_simplify = Bool(False).tag(sync=True) can_freeze = Bool(False).tag(sync=True) icon_replace = Unicode(read_icon(os.path.join(icon_path("glue_replace", icon_format="svg")), 'svg+xml')).tag(sync=True) # noqa icon_or = Unicode(read_icon(os.path.join(icon_path("glue_or", icon_format="svg")), 'svg+xml')).tag(sync=True) # noqa icon_and = Unicode(read_icon(os.path.join(icon_path("glue_and", icon_format="svg")), 'svg+xml')).tag(sync=True) # noqa icon_xor = Unicode(read_icon(os.path.join(icon_path("glue_xor", icon_format="svg")), 'svg+xml')).tag(sync=True) # noqa icon_andnot = Unicode(read_icon(os.path.join(icon_path("glue_andnot", icon_format="svg")), 'svg+xml')).tag(sync=True) # noqa icon_radialtocheck = Unicode(read_icon(os.path.join(ICON_DIR, 'radialtocheck.svg'), 'svg+xml')).tag(sync=True) # noqa icon_checktoradial = Unicode(read_icon(os.path.join(ICON_DIR, 'checktoradial.svg'), 'svg+xml')).tag(sync=True) # noqa def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.components = { 'g-subset-mode': SelectionModeMenu(session=self.session) } self.session.hub.subscribe(self, EditSubsetMessage, handler=self._sync_selected_from_state) self.session.hub.subscribe(self, SubsetUpdateMessage, handler=self._on_subset_update) self.session.hub.subscribe(self, GlobalDisplayUnitChanged, handler=self._on_display_unit_changed) self.session.hub.subscribe(self, LinkUpdatedMessage, handler=self._on_link_update) self.subset_select = SubsetSelect(self, 'subset_items', 'subset_selected', multiselect='multiselect', default_text="Create New") self.subset_states = [] self.spectral_display_unit = None link_type = getattr(self.app, '_link_type', None) self.display_sky_coordinates = (link_type == 'wcs' and not self.multiselect) def _on_link_update(self, *args): """When linking is changed pixels<>wcs, change display units of the subset plugin from pixel (for pixel linking) to sky (for WCS linking). If there is an active selection in the subset plugin, push this change to the UI upon link change by calling _get_subset_definition, which will re-determine how to display subset information.""" link_type = getattr(self.app, '_link_type', None) self.display_sky_coordinates = (link_type == 'wcs') if self.subset_selected != self.subset_select.default_text: self._get_subset_definition(*args) def _sync_selected_from_state(self, *args): if not hasattr(self, 'subset_select') or self.multiselect: # during initial init, this can trigger before the component is initialized return if self.session.edit_subset_mode.edit_subset == []: if self.subset_selected != self.subset_select.default_text: self.subset_selected = self.subset_select.default_text self.show_region_info = False else: new_label = self.session.edit_subset_mode.edit_subset[0].label if new_label != self.subset_selected: if new_label not in [s['label'] for s in self.subset_items]: self._sync_available_from_state() self.subset_selected = self.session.edit_subset_mode.edit_subset[0].label self.show_region_info = True def _on_subset_update(self, *args): self._sync_selected_from_state(*args) if 'Create New' in self.subset_selected: return self._get_subset_definition(*args) subset_to_update = self.session.edit_subset_mode.edit_subset[0] self.subset_select._update_subset(subset_to_update, attribute="type") def _sync_available_from_state(self, *args): if not hasattr(self, 'subset_select'): # during initial init, this can trigger before the component is initialized return self.subset_items = [{'label': self.subset_select.default_text}] + [ self.subset_select._subset_to_dict(subset) for subset in self.data_collection.subset_groups] @observe('subset_selected') def _sync_selected_from_ui(self, change): self.subset_definitions = [] self.subset_types = [] self.glue_state_types = [] self.is_centerable = False if not hasattr(self, 'subset_select'): # during initial init, this can trigger before the component is initialized return if change['new'] != self.subset_select.default_text: self._get_subset_definition(change['new']) self.show_region_info = change['new'] != self.subset_select.default_text m = [s for s in self.app.data_collection.subset_groups if s.label == change['new']] if m != self.session.edit_subset_mode.edit_subset: self.session.edit_subset_mode.edit_subset = m def _unpack_get_subsets_for_ui(self): """ Convert what app.get_subsets returns into something the UI of this plugin can display. """ if self.multiselect: self.is_centerable = True return include_sky_region = bool(self.display_sky_coordinates) subset_information = self.app.get_subsets(self.subset_selected, simplify_spectral=False, use_display_units=True, include_sky_region=include_sky_region) _around_decimals = 6 # Avoid 30 degrees from coming back as 29.999999999999996 if not subset_information: return if ((len(subset_information) == 1) and (isinstance(subset_information[0]["subset_state"], RangeSubsetState) or (isinstance(subset_information[0]["subset_state"], RoiSubsetState) and isinstance(subset_information[0]["subset_state"].roi, (CircularROI, RectangularROI, EllipticalROI))))): self.is_centerable = True else: self.is_centerable = False for spec in subset_information: subset_definition = [] subset_type = '' subset_state = spec["subset_state"] glue_state = spec["glue_state"] if isinstance(subset_state, RoiSubsetState): subset_definition.append({ "name": "Parent", "att": "parent", "value": subset_state.xatt.parent.label, "orig": subset_state.xatt.parent.label}) sky_region = spec['sky_region'] if self.display_sky_coordinates and (sky_region is not None): subset_definition += utils._sky_region_to_subset_def(sky_region) else: if isinstance(subset_state.roi, CircularROI): x, y = subset_state.roi.center() r = subset_state.roi.radius subset_definition += [ {"name": "X Center (pixels)", "att": "xc", "value": x, "orig": x}, {"name": "Y Center (pixels)", "att": "yc", "value": y, "orig": y}, {"name": "Radius (pixels)", "att": "radius", "value": r, "orig": r}] elif isinstance(subset_state.roi, RectangularROI): for att in ("Xmin", "Xmax", "Ymin", "Ymax"): real_att = att.lower() val = getattr(subset_state.roi, real_att) subset_definition.append( {"name": att + " (pixels)", "att": real_att, "value": val, "orig": val}) theta = np.around(np.degrees(subset_state.roi.theta), decimals=_around_decimals) subset_definition.append({"name": "Angle", "att": "theta", "value": theta, "orig": theta}) elif isinstance(subset_state.roi, EllipticalROI): xc, yc = subset_state.roi.center() rx = subset_state.roi.radius_x ry = subset_state.roi.radius_y theta = np.around(np.degrees(subset_state.roi.theta), decimals=_around_decimals) subset_definition += [ {"name": "X Center (pixels)", "att": "xc", "value": xc, "orig": xc}, {"name": "Y Center (pixels)", "att": "yc", "value": yc, "orig": yc}, {"name": "X Radius (pixels)", "att": "radius_x", "value": rx, "orig": rx}, {"name": "Y Radius (pixels)", "att": "radius_y", "value": ry, "orig": ry}, {"name": "Angle", "att": "theta", "value": theta, "orig": theta}] elif isinstance(subset_state.roi, CircularAnnulusROI): xc, yc = subset_state.roi.center() inner_r = subset_state.roi.inner_radius outer_r = subset_state.roi.outer_radius subset_definition += [{"name": "X Center (pixels)", "att": "xc", "value": xc, "orig": xc}, {"name": "Y Center (pixels)", "att": "yc", "value": yc, "orig": yc}, {"name": "Inner Radius (pixels)", "att": "inner_radius", "value": inner_r, "orig": inner_r}, {"name": "Outer Radius (pixels)", "att": "outer_radius", "value": outer_r, "orig": outer_r}] else: # pragma: no cover raise NotImplementedError(f"Unable to translate {subset_state.roi.__class__.__name__}") # noqa: E501 subset_type = subset_state.roi.__class__.__name__ elif isinstance(subset_state, RangeSubsetState): region = spec['region'] if isinstance(region, Time): lo = region.min() hi = region.max() subset_definition = [{"name": "Lower bound", "att": "lo", "value": lo.value, "orig": lo.value}, {"name": "Upper bound", "att": "hi", "value": hi.value, "orig": hi.value}] else: lo = region.lower hi = region.upper subset_definition = [{"name": "Lower bound", "att": "lo", "value": lo.value, "orig": lo.value, "unit": str(lo.unit)}, {"name": "Upper bound", "att": "hi", "value": hi.value, "orig": hi.value, "unit": str(hi.unit)}] subset_type = "Range" elif isinstance(subset_state, MultiMaskSubsetState): total_masked = subset_state.total_masked_first_data() subset_definition = [{"name": "Masked values", "att": "masked", "value": total_masked, "orig": total_masked}] subset_type = "Mask" if len(subset_definition) > 0: # Note: .append() does not work for List traitlet. self.subset_definitions = self.subset_definitions + [subset_definition] self.subset_types = self.subset_types + [subset_type] self.glue_state_types = self.glue_state_types + [glue_state] self.subset_states = self.subset_states + [subset_state] simplifiable_states = set(['AndState', 'XorState', 'AndNotState']) # Check if the subset has more than one subregion, is a range subset # type, and uses one of the states that can be simplified. Mask subset # types cannot be simplified so subsets contained that are skipped. if 'Mask' in self.subset_types: self.can_simplify = False elif (len(self.subset_states) > 1 and isinstance(self.subset_states[0], RangeSubsetState) and len(simplifiable_states - set(self.glue_state_types)) < 3): self.can_simplify = True elif (len(self.subset_states) > 1 and isinstance(self.subset_states[0], RangeSubsetState) and self.app.is_there_overlap_spectral_subset(self.subset_selected)): self.can_simplify = True else: self.can_simplify = False def _get_subset_definition(self, *args): """ Retrieve the parameters defining the selected subset, for example the upper and lower bounds for a simple spectral subset. """ self.subset_definitions = [] self.subset_types = [] self.glue_state_types = [] self.subset_states = [] self._unpack_get_subsets_for_ui()
[docs] def vue_freeze_subset(self, *args): sgs = {sg.label: sg for sg in self.app.data_collection.subset_groups} sg = sgs.get(self.subset_selected) masks = {} for data in self.app.data_collection: masks[data.uuid] = sg.subset_state.to_mask(data) sg.subset_state = MultiMaskSubsetState(masks)
[docs] def vue_simplify_subset(self, *args): if self.multiselect: self.hub.broadcast(SnackbarMessage("Cannot simplify spectral subset " "when multiselect is active", color='warning', sender=self)) return if len(self.subset_states) < 2: self.hub.broadcast(SnackbarMessage("Cannot simplify spectral subset " "of length less than 2", color='warning', sender=self)) return att = self.subset_states[0].att self.app.simplify_spectral_subset(subset_name=self.subset_selected, att=att, overwrite=True)
def _on_display_unit_changed(self, msg): # We only care about the spectral units, since flux units don't affect spectral subsets if msg.axis == "spectral": self.spectral_display_unit = msg.unit if self.subset_selected != self.subset_select.default_text: self._get_subset_definition(self.subset_selected)
[docs] def vue_update_subset(self, *args): if self.multiselect: self.hub.broadcast(SnackbarMessage("Cannot update subset " "when multiselect is active", color='warning', sender=self)) return status, reason = self._check_input() if not status: self.hub.broadcast(SnackbarMessage(reason, color='error', sender=self)) return for index, sub in enumerate(self.subset_definitions): if len(self.subset_states) <= index: return sub_states = self.subset_states[index] # we need to push updates to subset in pixels. to do this when wcs # linked, convert the updated subset parameters from sky to pix wcs = None if self.display_sky_coordinates: wcs = self.app._get_wcs_from_subset(sub_states) if wcs is not None: # convert newly entered sky coords to pixel updated_skyreg = utils._subset_def_to_region(self.subset_types[index], sub) # noqa updated_pixreg_attrs = utils._get_pixregion_params_in_dict(updated_skyreg.to_pixel(wcs)) # noqa # convert previous entered sky coords to pixel orig_skyreg = utils._subset_def_to_region(self.subset_types[index], sub, val='orig') # noqa orig_pixreg_attrs = utils._get_pixregion_params_in_dict(orig_skyreg.to_pixel(wcs)) # noqa for d_att in sub: if d_att["att"] == 'parent': # Read-only continue if self.display_sky_coordinates and (wcs is not None): d_att["value"] = updated_pixreg_attrs[d_att["att"]] d_att["orig"] = orig_pixreg_attrs[d_att["att"]] if (d_att["att"] == 'theta') and (self.display_sky_coordinates is False): # Humans use degrees but glue uses radians # We've already enforced this in wcs linking in _get_pixregion_params_in_dict d_val = np.radians(d_att["value"]) else: d_val = float(d_att["value"]) # Convert from display unit to original unit if necessary if self.subset_types[index] == "Range": if self.spectral_display_unit is not None: x_att = sub_states.att base_units = self.app.data_collection[0].get_component(x_att).units if self.spectral_display_unit != base_units: d_val = d_val*u.Unit(self.spectral_display_unit) d_val = d_val.to(u.Unit(base_units)) d_val = d_val.value if float(d_att["orig"]) != d_val: if self.subset_types[index] == "Range": setattr(sub_states, d_att["att"], d_val) else: setattr(sub_states.roi, d_att["att"], d_val) self._push_update_to_ui()
def _push_update_to_ui(self, subset_name=None): """ Forces the UI to update how it represents the subset. Parameters ---------- subset_name : str The name of the subset that is being updated. """ if not subset_name: subset_name = self.subset_selected try: dc = self.data_collection subsets = dc.subset_groups subset_to_update = subsets[[x.label for x in subsets].index(subset_name)] self.session.edit_subset_mode.edit_subset = [subset_to_update] self.session.edit_subset_mode._combine_data(subset_to_update.subset_state, override_mode=ReplaceMode) except Exception as err: # pragma: no cover self.hub.broadcast(SnackbarMessage( f"Failed to update Subset: {repr(err)}", color='error', sender=self)) def _check_input(self): status = True reason = "" for index, sub in enumerate(self.subset_definitions): lo = hi = xmin = xmax = ymin = ymax = None inner_radius = outer_radius = None for d_att in sub: if d_att["att"] == "lo": lo = d_att["value"] elif d_att["att"] == "hi": hi = d_att["value"] elif d_att["att"] == "radius" and d_att["value"] <= 0: status = False reason = "Failed to update Subset: radius must be a positive scalar" break elif d_att["att"] == "xmin": xmin = d_att["value"] elif d_att["att"] == "xmax": xmax = d_att["value"] elif d_att["att"] == "ymin": ymin = d_att["value"] elif d_att["att"] == "ymax": ymax = d_att["value"] elif d_att["att"] == "outer_radius": outer_radius = d_att["value"] elif d_att["att"] == "inner_radius": inner_radius = d_att["value"] if lo and hi and hi <= lo: status = False reason = "Failed to update Subset: lower bound must be less than upper bound" break elif xmin and xmax and ymin and ymax and (xmax - xmin <= 0 or ymax - ymin <= 0): status = False reason = "Failed to update Subset: width and length must be positive scalars" break elif inner_radius and outer_radius and inner_radius >= outer_radius: status = False reason = "Failed to update Subset: inner radius must be less than outer radius" break return status, reason
[docs] def vue_recenter_subset(self, *args): # Composite region cannot be centered. This only works for Imviz. if not self.is_centerable or self.config != 'imviz': # no-op raise NotImplementedError( f'Cannot recenter: is_centerable={self.is_centerable}, config={self.config}') from astropy.wcs.utils import pixel_to_pixel from photutils.aperture import ApertureStats from jdaviz.core.region_translators import regions2aperture, _get_region_from_spatial_subset def _do_recentering(subset, subset_state): try: reg = _get_region_from_spatial_subset(self, subset_state) aperture = regions2aperture(reg) data = self.dataset.selected_dc_item comp = data.get_component(data.main_components[0]) comp_data = comp.data phot_aperstats = ApertureStats(comp_data, aperture, wcs=data.coords) # Sky region from WCS linking, need to convert centroid back to pixels. if hasattr(reg, "to_pixel"): # Centroid was calculated in selected data. # However, Subset is always defined w.r.t. its parent, # so we need to convert back. x, y = pixel_to_pixel( data.coords, subset_state.xatt.parent.coords, phot_aperstats.xcentroid, phot_aperstats.ycentroid) else: x = phot_aperstats.xcentroid y = phot_aperstats.ycentroid if not np.all(np.isfinite((x, y))): raise ValueError(f'Invalid centroid ({x}, {y})') except Exception as err: self.set_center(self.get_center(subset_name=subset), subset_name=subset, update=False) self.hub.broadcast(SnackbarMessage( f"Failed to calculate centroid: {repr(err)}", color='error', sender=self)) else: self.set_center((x, y), subset_name=subset, update=True) if not self.multiselect: _do_recentering(self.subset_selected, self.subset_select.selected_subset_state) else: for sub, subset_state in zip(self.subset_selected, self.subset_select.selected_subset_state): if (sub != self.subset_select.default_text and not isinstance(subset_state, CompositeSubsetState)): self.is_centerable = True _do_recentering(sub, subset_state) elif (sub != self.subset_select.default_text and isinstance(subset_state, CompositeSubsetState)): self.hub.broadcast(SnackbarMessage(f"Unable to recenter " f"composite subset {sub}", color='error', sender=self))
def _get_subset_state(self, subset_name=None): if self.multiselect and not subset_name: raise ValueError("Please include subset_name in when in multiselect mode") if subset_name is not None: return self.subset_select._get_subset_state(subset_name) # guaranteed to only return a single entry because of check above return self.subset_select.selected_subset_state
[docs] def get_center(self, subset_name=None): """Return the center of the Subset. This may or may not be the centroid obtain from data. Parameters ---------- subset_name : str The name of the subset that is being updated. Returns ------- cen : number, tuple of numbers, or `None` The center of the Subset in ``x`` or ``(x, y)``, depending on the Subset type, if applicable. If Subset is not centerable, this returns `None`. """ # Composite region cannot be centered. if not self.is_centerable: # no-op return subset_state = self._get_subset_state(subset_name) return subset_state.center()
[docs] def set_center(self, new_cen, subset_name=None, update=False): """Set the desired center for the selected Subset, if applicable. If Subset is not centerable, nothing is done. Parameters ---------- new_cen : number or tuple of numbers The new center defined either as ``x`` or ``(x, y)``, depending on the Subset type. subset_name : str The name of the subset that is being updated. update : bool If `True`, the Subset is also moved to the new center. Otherwise, only the relevant editable fields are updated but the Subset is not moved. Raises ------ NotImplementedError Subset type is not supported. """ # Composite region cannot be centered, so just grab first element. if not self.is_centerable: # no-op return subset_state = self._get_subset_state(subset_name) if isinstance(subset_state, RoiSubsetState): x, y = new_cen # x and y are arrays so this converts them back to floats x = float(x) y = float(y) sbst_obj = subset_state.roi if isinstance(sbst_obj, (CircularROI, CircularAnnulusROI, EllipticalROI)): sbst_obj.move_to(x, y) elif isinstance(sbst_obj, RectangularROI): sbst_obj.move_to(x, y) else: # pragma: no cover raise NotImplementedError(f'Recentering of {sbst_obj.__class__} is not supported') elif isinstance(subset_state, RangeSubsetState): subset_state.move_to(new_cen) else: # pragma: no cover raise NotImplementedError( f'Getting center of {subset_state.__class__} is not supported') if update: self._push_update_to_ui(subset_name=subset_name) else: # Force UI to update on browser without changing the subset. tmp = self.subset_definitions self.subset_definitions = [] self.subset_definitions = tmp
# List of JSON-like dict is nice for front-end but a pain to look up, # so we use these helper functions. def _get_value_from_subset_definition(self, index, name, desired_key): subset_definition = self.subset_definitions[index] value = None for item in subset_definition: if item['name'] == name: value = item[desired_key] break return value def _set_value_in_subset_definition(self, index, name, desired_key, new_value): for i in range(len(self.subset_definitions[index])): if self.subset_definitions[index][i]['name'] == name: self.subset_definitions[index][i]['value'] = new_value break