import math
import numpy as np
from traitlets import Bool, Unicode, observe
from astropy import units as u
from bqplot import LinearScale
from glue.core import BaseData
from glue.core.subset_group import GroupedSubset
from glue_jupyter.bqplot.image.layer_artist import BqplotImageSubsetLayerArtist
from jdaviz.configs.cubeviz.plugins.viewers import CubevizImageView
from jdaviz.configs.imviz.plugins.viewers import ImvizImageView
from jdaviz.configs.mosviz.plugins.viewers import (MosvizImageView, MosvizProfileView,
MosvizProfile2DView)
from jdaviz.configs.specviz.plugins.viewers import SpecvizProfileView
from jdaviz.core.events import ViewerAddedMessage
from jdaviz.core.helpers import data_has_valid_wcs
from jdaviz.core.marks import PluginScatter, PluginLine
from jdaviz.core.registries import tool_registry
from jdaviz.core.template_mixin import TemplateMixin, DatasetSelectMixin
from jdaviz.utils import get_subset_type
__all__ = ['CoordsInfo']
[docs]
@tool_registry('g-coords-info')
class CoordsInfo(TemplateMixin, DatasetSelectMixin):
template_file = __file__, "coords_info.vue"
_supported_viewer_classes = (SpecvizProfileView,
ImvizImageView,
CubevizImageView,
MosvizImageView,
MosvizProfile2DView)
_viewer_classes_with_marker = (SpecvizProfileView, MosvizProfile2DView)
dataset_icon = Unicode("").tag(sync=True) # option for layer (auto, none, or specific layer)
icon = Unicode("").tag(sync=True) # currently exposed layer
row1a_title = Unicode("").tag(sync=True)
row1a_text = Unicode("").tag(sync=True)
row1b_title = Unicode("").tag(sync=True)
row1b_text = Unicode("").tag(sync=True)
row1_unreliable = Bool(False).tag(sync=True)
row2_title = Unicode("").tag(sync=True)
row2_text = Unicode("").tag(sync=True)
row2_unreliable = Bool(False).tag(sync=True)
row3_title = Unicode("").tag(sync=True)
row3_text = Unicode("").tag(sync=True)
row3_unreliable = Bool(False).tag(sync=True)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._marks = {}
self._dict = {} # dictionary representation of current mouseover info
self._x, self._y = None, None # latest known cursor positions
# subscribe/unsubscribe to mouse events across all existing viewers
viewer_refs = []
for viewer in self.app._viewer_store.values():
if isinstance(viewer, self._supported_viewer_classes):
self._create_viewer_callbacks(viewer)
viewer_refs.append(viewer.reference_id)
self.dataset._manual_options = ['auto', 'none']
self.dataset.filters = ['layer_in_viewers', 'is_not_wcs_only', 'layer_is_not_dq']
if self.app.config == 'imviz':
# filter out scatter-plot entries (from add_markers API, for example)
self.dataset.add_filter('is_image')
# we also want to include auto-collapsed spectra (spatial subsets)
self.dataset._cubeviz_include_spatial_subsets()
# subscribe to mouse events on any new viewers
self.hub.subscribe(self, ViewerAddedMessage, handler=self._on_viewer_added)
def _create_marks_for_viewer(self, viewer, id=None):
if id is None:
id = viewer.reference_id
if id in self._marks:
return
if isinstance(viewer, MosvizProfile2DView):
self._marks[id] = PluginLine(viewer,
x=[0, 0], y=[0, 1],
scales={'x': viewer.scales['x'],
'y': LinearScale(min=0, max=1)},
visible=False)
else:
self._marks[id] = PluginScatter(viewer,
marker='rectangle', stroke_width=1,
visible=False)
if isinstance(viewer, MosvizProfileView):
matched_id = f"{id}:matched"
self._marks[matched_id] = PluginLine(viewer,
x=[0, 0], y=[0, 1],
scales={'x': viewer.scales['x'],
'y': LinearScale(min=0, max=1)},
visible=False)
viewer.figure.marks = viewer.figure.marks + [self._marks[matched_id]]
viewer.figure.marks = viewer.figure.marks + [self._marks[id]]
def _create_viewer_callbacks(self, viewer):
if isinstance(viewer, self._supported_viewer_classes):
if isinstance(viewer, self._viewer_classes_with_marker):
self._create_marks_for_viewer(viewer)
callback = self._viewer_callback(viewer, self._viewer_mouse_event)
viewer.add_event_callback(callback, events=['mousemove', 'mouseleave', 'mouseenter'])
viewer.state.add_callback('layers', lambda msg: self._layers_changed(viewer))
def _on_viewer_added(self, msg):
self._create_viewer_callbacks(self.app.get_viewer_by_id(msg.viewer_id))
@property
def marks(self):
"""
Access the marks created by this plugin.
"""
if self._marks:
# TODO: replace with cache property?
return self._marks
# create marks for each of the spectral viewers (will need a listener event to create marks
# for new viewers if dynamic creation of spectral viewers is ever supported)
for id, viewer in self.app._viewer_store.items():
if isinstance(viewer, self._viewer_classes_with_marker):
self._create_marks_for_viewer(viewer, id)
return self._marks
@property
def _matched_markers(self):
if self.app.config == 'specviz2d':
return {'specviz2d-0': ['specviz2d-1:matched'],
'specviz2d-1': ['specviz2d-0']}
if self.app.config == 'mosviz':
return {'mosviz-1': ['mosviz-2:matched'],
'mosviz-2': ['mosviz-1']}
return {}
[docs]
def as_text(self):
return (f"{self.row1a_title} {self.row1a_text} {self.row1b_title} {self.row1b_text}".strip(), # noqa
f"{self.row2_title} {self.row2_text}".strip(),
f"{self.row3_title} {self.row3_text}".strip())
[docs]
def as_dict(self):
return self._dict
[docs]
def reset_coords_display(self):
self.row1a_title = '\u00A0' # to force empty line if no other content
self.row1a_text = ""
self.row1b_title = ""
self.row1b_text = ""
self.row1_unreliable = False
self.row2_title = '\u00A0'
self.row2_text = ""
self.row2_unreliable = False
self.row3_title = '\u00A0'
self.row3_text = ""
self.row3_unreliable = False
self.icon = ""
self._dict = {}
def _viewer_mouse_clear_event(self, viewer, data=None):
self.reset_coords_display()
marker_ids = [viewer._reference_id] + self._matched_markers.get(viewer._reference_id, [])
for marker_id in marker_ids:
marks = self.marks.get(marker_id)
if marks is not None:
marks.visible = False
def _viewer_mouse_event(self, viewer, data):
if data['event'] in ('mouseleave', 'mouseenter'):
self._viewer_mouse_clear_event(viewer, data)
return
if len(self.app.data_collection) < 1:
self._viewer_mouse_clear_event(viewer)
return
# otherwise a mousemove event, we need to get cursor coordinates and update the display
# Extract data coordinates - these are pixels in the reference image
x = data['domain']['x']
y = data['domain']['y']
if x is None or y is None: # Out of bounds
self._viewer_mouse_clear_event(viewer)
return
# update last known cursor position (so another event like a change in layers can update
# the coordinates with the last known position)
self._x, self._y = x, y
self.update_display(viewer, x=x, y=y)
def _layers_changed(self, viewer):
if self._x is None or self._y is None:
return
# update display for a (possible) change to the active layer based on the last known
# cursor position
self.update_display(viewer, self._x, self._y)
@observe('dataset_selected')
def _selected_dataset_changed(self, *args):
if self.dataset_selected == 'auto':
self.dataset_icon = 'mdi-auto-fix'
elif self.dataset_selected == 'none':
self.dataset_icon = 'mdi-cursor-default'
else:
self.dataset_icon = self.app.state.layer_icons.get(self.dataset_selected, '')
[docs]
def vue_next_layer(self, *args, **kwargs):
self.dataset.select_next()
[docs]
def update_display(self, viewer, x, y):
self._dict = {}
if isinstance(viewer, SpecvizProfileView):
self._spectrum_viewer_update(viewer, x, y)
elif isinstance(viewer,
(ImvizImageView, CubevizImageView, MosvizImageView, MosvizProfile2DView)):
self._image_viewer_update(viewer, x, y)
def _image_shape_inds(self, image):
# return the indices in image.shape for the x and y dimension, respectively
if image.ndim == 3:
# cubeviz case
return (0, 1) # (ix_shape, iy_shape)
elif image.ndim == 2:
return (1, 0) # (ix_shape, iy_shape)
else: # pragma: no cover
raise ValueError(f'does not support ndim={image.ndim}')
def _get_cube_value(self, image, arr, x, y, viewer):
if image.ndim == 3:
# cubeviz case:
return arr[int(round(x)), int(round(y)), viewer.state.slices[-1]]
elif image.ndim == 2:
return arr[int(round(y)), int(round(x))]
else: # pragma: no cover
raise ValueError(f'does not support ndim={image.ndim}')
def _image_viewer_update(self, viewer, x, y):
# Display the current cursor coordinates (both pixel and world) as
# well as data values. For now we use the first dataset in the
# viewer for the data values.
# Extract first dataset from visible layers and use this for coordinates - the choice
# of dataset shouldn't matter if the datasets are linked correctly
active_layer = viewer.active_image_layer
if active_layer is None:
self._viewer_mouse_clear_event(viewer)
return
if self.dataset.selected == 'auto':
image = active_layer.layer
elif self.dataset.selected == 'none':
active_layer = viewer.layers[0].state
image = viewer.layers[0].layer
else:
for layer in viewer.layers:
if layer.layer.label == self.dataset.selected and layer.visible:
if isinstance(layer, BqplotImageSubsetLayerArtist):
# cannot expose info for spatial subset layers
continue
active_layer = layer.state
image = layer.layer
break
else:
image = None
# If there is one, get the associated DQ layer for the active layer:
associated_dq_layers = None
available_plugins = [tray_item['name'] for tray_item in self.app.state.tray_items]
if 'g-data-quality' in available_plugins:
assoc_children = self.app._get_assoc_data_children(active_layer.layer.label)
if assoc_children:
data_quality_plugin = self.app.get_tray_item_from_name('g-data-quality')
viewer_obj = self.app.get_viewer(viewer)
associated_dq_layers = data_quality_plugin.get_dq_layers(viewer_obj)
unreliable_pixel, unreliable_world = False, False
self._dict['axes_x'] = x
self._dict['axes_x:unit'] = 'pix'
self._dict['axes_y'] = y
self._dict['axes_y:unit'] = 'pix'
# set default empty values
if self.dataset.selected != 'none' and image is not None:
self.icon = self.app.state.layer_icons.get(image.label, '') # noqa
self._dict['data_label'] = image.label
# Separate logic for each viewer type, ultimately needs to result in extracting sky coords.
# NOTE: pixel_to_world axes order is opposite of array value axes order, so...
# 3D: pixel_to_world(z, y, x) -> arr[x, y, z]
# 2D: pixel_to_world(x, y) -> arr[y, x]
if self.dataset.selected == 'none' or image is None:
self.icon = 'mdi-cursor-default'
self._dict['data_label'] = ''
coords_status = False
elif isinstance(viewer, ImvizImageView):
x, y, coords_status, (unreliable_world, unreliable_pixel) = viewer._get_real_xy(image, x, y) # noqa
if coords_status:
try:
sky = image.coords.pixel_to_world(x, y).icrs
except Exception: # WCS might not be celestial
coords_status = False
elif isinstance(viewer, CubevizImageView):
# TODO: This assumes data_collection[0] is the main reference
# data for this application. This section will need to be updated
# when that is no longer true.
# Hack to insert WCS for generated 2D and 3D images using FLUX cube WCS.
if 'Plugin' in getattr(image, 'meta', {}) and not image.coords:
coo_data = self.app.data_collection[0]
else:
coo_data = image
if '_orig_spec' in getattr(coo_data, 'meta', {}):
# Hack around various WCS propagation issues in Cubeviz, example:
# https://github.com/glue-viz/glue-astronomy/issues/75
data_wcs = coo_data.meta['_orig_spec'].wcs
wcs_ndim = 3
elif data_has_valid_wcs(coo_data):
data_wcs = coo_data.coords
wcs_ndim = coo_data.ndim
else:
data_wcs = None
if data_wcs:
try:
if wcs_ndim == 3:
sky = data_wcs.pixel_to_world(viewer.slice, y, x)[1].icrs
else: # wcs_ndim == 2
sky = data_wcs.pixel_to_world(x, y).icrs
except Exception:
coords_status = False
else:
coords_status = True
else:
self.reset_coords_display()
coords_status = False
slice_plugin = self.app._jdaviz_helper.plugins.get('Slice', None)
if slice_plugin is not None and len(image.shape) == 3:
# float to be compatible with default value of nan
self._dict['slice'] = float(viewer.slice)
self._dict['spectral_axis'] = slice_plugin.value
self._dict['spectral_axis:unit'] = slice_plugin._obj.value_unit
elif isinstance(viewer, MosvizImageView):
if data_has_valid_wcs(image, ndim=2):
try:
sky = image.coords.pixel_to_world(x, y).icrs
except Exception: # WCS might not be celestial # pragma: no cover
coords_status = False
else:
coords_status = True
else: # pragma: no cover
self.reset_coords_display()
coords_status = False
elif isinstance(viewer, MosvizProfile2DView):
self._dict['spectral_axis'] = self._dict['axes_x']
self._dict['spectral_axis:unit'] = self._dict['axes_x:unit']
self._dict['value'] = self._dict['axes_y']
self._dict['value:unit'] = self._dict['axes_y:unit']
coords_status = False
if coords_status:
celestial_coordinates = sky.to_string('hmsdms', precision=4, pad=True).split()
celestial_coordinates_deg = sky.to_string('decimal', precision=10, pad=True).split()
world_ra = celestial_coordinates[0]
world_dec = celestial_coordinates[1]
world_ra_deg = celestial_coordinates_deg[0]
world_dec_deg = celestial_coordinates_deg[1]
if "nan" in (world_ra, world_dec, world_ra_deg, world_dec_deg):
self.reset_coords_display()
self.row2_title = 'World'
self.row2_text = f'{world_ra} {world_dec} (ICRS)'
self.row2_unreliable = unreliable_world
self.row3_title = ''
self.row3_text = f'{world_ra_deg} {world_dec_deg} (deg)'
self.row3_unreliable = unreliable_world
# TODO: use sky directly, but need to figure out how to have a compatible "blank" entry
self._dict['world'] = (sky.ra.value, sky.dec.value)
self._dict['world:unreliable'] = unreliable_world
elif isinstance(viewer, MosvizProfile2DView) and hasattr(getattr(image, 'coords', None),
'pixel_to_world'):
# use WCS to expose the wavelength for a 2d spectrum shown in pixel space
try:
wave, pixel = image.coords.pixel_to_world(x, y)
except Exception: # WCS might not be valid # pragma: no cover
coords_status = False
else:
self.row2_title = 'Wave'
self.row2_text = f'{wave.value:10.5e} {wave.unit.to_string()}'
self.row2_unreliable = False
self.row3_title = '\u00A0'
self.row3_text = ""
self.row3_unreliable = False
else:
self.row2_title = '\u00A0'
self.row2_text = ""
self.row2_unreliable = False
self.row3_title = '\u00A0'
self.row3_text = ""
self.row3_unreliable = False
maxsize = int(np.ceil(np.log10(np.max(active_layer.layer.shape)))) + 3
fmt = 'x={0:0' + str(maxsize) + '.1f} y={1:0' + str(maxsize) + '.1f}'
self.row1a_title = 'Pixel'
self.row1a_text = (fmt.format(x, y))
self.row1_unreliable = unreliable_pixel
self._dict['pixel'] = (float(x), float(y))
self._dict['pixel:unreliable'] = unreliable_pixel
# Extract data values at this position.
# TODO: for now we just use the first visible layer but we should think
# of how to display values when multiple datasets are present.
if self.dataset.selected == 'none' or image is None:
# no data values to extract
self.row1b_title = ''
self.row1b_text = ''
return
# Extract data values at this position.
# Check if shape is [x, y, z] or [y, x] and show value accordingly.
ix_shape, iy_shape = self._image_shape_inds(image)
if (-0.5 < x < image.shape[ix_shape] - 0.5 and -0.5 < y < image.shape[iy_shape] - 0.5
and hasattr(active_layer, 'attribute')):
attribute = active_layer.attribute
if isinstance(viewer, (ImvizImageView, MosvizImageView, MosvizProfile2DView)):
value = image.get_data(attribute)[int(round(y)), int(round(x))]
if associated_dq_layers is not None:
associated_dq_layer = associated_dq_layers[0]
dq_attribute = associated_dq_layer.state.attribute
dq_data = associated_dq_layer.layer.get_data(dq_attribute)
dq_value = dq_data[int(round(y)), int(round(x))]
unit = image.get_component(attribute).units
elif isinstance(viewer, CubevizImageView):
arr = image.get_component(attribute).data
unit = image.get_component(attribute).units
value = self._get_cube_value(image, arr, x, y, viewer)
if associated_dq_layers is not None:
associated_dq_layer = associated_dq_layers[0]
dq_attribute = associated_dq_layer.state.attribute
dq_data = associated_dq_layer.layer.get_data(dq_attribute)
dq_value = self._get_cube_value(image, dq_data, x, y, viewer)
self.row1b_title = 'Value'
if associated_dq_layers is not None:
if np.isnan(dq_value):
dq_text = ''
else:
dq_text = f' (DQ: {int(dq_value):d})'
else:
dq_text = ''
self.row1b_text = f'{value:+10.5e} {unit}{dq_text}'
self._dict['value'] = float(value)
self._dict['value:unit'] = unit
self._dict['value:unreliable'] = unreliable_pixel
else:
self.row1b_title = ''
self.row1b_text = ''
if isinstance(viewer, MosvizProfile2DView):
self.marks[viewer._reference_id].update_xy([x, x], [0, 1])
self.marks[viewer._reference_id].visible = True
for matched_marker_id in self._matched_markers.get(viewer._reference_id, []):
if coords_status and hasattr(getattr(image, 'coords', None), 'pixel_to_world'):
# should already have wave computed from setting the coords-info
matched_viewer = self.app.get_viewer(matched_marker_id.split(':matched')[0])
wave = wave.to_value(matched_viewer.state.x_display_unit)
self.marks[matched_marker_id].update_xy([wave, wave], [0, 1])
self.marks[matched_marker_id].visible = True
else:
self.marks[matched_marker_id].visible = False
def _spectrum_viewer_update(self, viewer, x, y):
def _cursor_fallback():
self._dict['axes_x'] = x
self._dict['axes_x:unit'] = viewer.state.x_display_unit
self._dict['axes_y'] = y
self._dict['axes_y:unit'] = viewer.state.y_display_unit
self._dict['data_label'] = ''
def _copy_axes_to_spectral():
self._dict['spectral_axis'] = self._dict['axes_x']
self._dict['spectral_axis:unit'] = self._dict['axes_x:unit']
self._dict['value'] = self._dict['axes_y']
self._dict['value:unit'] = self._dict['axes_y:unit']
if not len(viewer.state.layers):
return
self.row1a_title = 'Cursor'
self.row1a_text = f'{x:10.5e}, {y:10.5e}'
# show the locked marker/coords only if either no tool or the default tool is active
if self.dataset.selected == 'none':
self.row2_title = '\u00A0'
self.row2_text = ''
self.row3_title = '\u00A0'
self.row3_text = ''
self.icon = 'mdi-cursor-default'
self.marks[viewer._reference_id].visible = False
_cursor_fallback()
_copy_axes_to_spectral()
return
# Snap to the closest data point, not the actual mouse location.
sp = None
closest_i = None
closest_wave = None
closest_flux = None
closest_icon = 'mdi-cursor-default'
closest_distance = None
for lyr in viewer.state.layers:
if self.dataset.selected == 'auto' and not lyr.visible:
continue
if self.dataset.selected != 'auto' and self.dataset.selected != lyr.layer.label:
continue
if isinstance(lyr.layer, GroupedSubset):
subset_state = getattr(lyr.layer, 'subset_state', None)
if subset_state is None:
continue
subset_type = get_subset_type(subset_state)
if subset_type == 'spectral':
# then this is a SPECTRAL subset
continue
# For use later in data retrieval
subset_label = lyr.layer.label
data_label = lyr.layer.data.label
elif ((not isinstance(lyr.layer, BaseData)) or (lyr.layer.ndim not in (1, 3))):
continue
else:
subset_label = None
data_label = lyr.layer.label
try:
# Cache should have been populated when spectrum was first plotted.
# But if not (maybe user changed statistic), we cache it here too.
statistic = getattr(viewer.state, 'function', None)
cache_key = (lyr.layer.label, statistic)
if cache_key in self.app._get_object_cache:
sp = self.app._get_object_cache[cache_key]
else:
sp = self._specviz_helper.get_data(data_label=data_label,
spatial_subset=subset_label)
self.app._get_object_cache[cache_key] = sp
# Calculations have to happen in the frame of viewer display units.
disp_wave = sp.spectral_axis.to_value(viewer.state.x_display_unit, u.spectral())
disp_flux = sp.flux.to_value(viewer.state.y_display_unit,
u.spectral_density(sp.spectral_axis))
# Out of range in spectral axis.
if (self.dataset.selected != lyr.layer.label and
(x < disp_wave.min() or x > disp_wave.max())):
continue
cur_i = np.argmin(abs(disp_wave - x))
cur_wave = disp_wave[cur_i]
cur_flux = disp_flux[cur_i]
dx = cur_wave - x
dy = cur_flux - y
cur_distance = math.sqrt(dx * dx + dy * dy)
if (closest_distance is None) or (cur_distance < closest_distance):
closest_distance = cur_distance
closest_i = cur_i
closest_wave = cur_wave
closest_flux = cur_flux
closest_icon = self.app.state.layer_icons.get(lyr.layer.label, '')
self._dict['data_label'] = lyr.layer.label
except Exception: # nosec
# Something is loaded but not the right thing
continue
if closest_wave is None:
self.row2_title = '\u00A0'
self.row2_text = ''
self.row3_title = '\u00A0'
self.row3_text = ''
self.icon = 'mdi-cursor-default'
self.marks[viewer._reference_id].visible = False
_cursor_fallback()
_copy_axes_to_spectral()
return
self.row2_title = 'Wave'
self.row2_text = f'{closest_wave:10.5e} {viewer.state.x_display_unit}'
self._dict['axes_x'] = closest_wave
self._dict['axes_x:unit'] = viewer.state.x_display_unit
if viewer.state.x_display_unit != u.pix:
self.row2_text += f' ({int(closest_i)} pix)'
if self.app.config == 'cubeviz':
# float to be compatible with nan
self._dict['slice'] = float(closest_i)
self._dict['spectral_axis'] = closest_wave
self._dict['spectral_axis:unit'] = viewer.state.x_display_unit
else:
# float to be compatible with nan
self._dict['index'] = float(closest_i)
if viewer.state.y_display_unit is None:
flux_unit = ""
else:
flux_unit = viewer.state.y_display_unit
self.row3_title = 'Flux'
self.row3_text = f'{closest_flux:10.5e} {flux_unit}'
self._dict['axes_y'] = closest_flux
self._dict['axes_y:unit'] = viewer.state.y_display_unit
if closest_icon is not None:
self.icon = closest_icon
else:
self.icon = ""
self.marks[viewer._reference_id].update_xy([closest_wave], [closest_flux])
self.marks[viewer._reference_id].visible = True
for matched_marker_id in self._matched_markers.get(viewer._reference_id, []):
# NOTE: this currently assumes the the matched marker is a vertical line with a
# normalized y-scale
self.marks[matched_marker_id].update_xy([closest_i, closest_i], [0, 1])
self.marks[matched_marker_id].visible = True
_copy_axes_to_spectral()