Source code for jdaviz.configs.default.plugins.markers.markers

import numpy as np
from astropy import units as u
from traitlets import Bool, observe

from jdaviz.core.events import (ViewerAddedMessage, ChangeRefDataMessage,
                                AddDataMessage, RemoveDataMessage,
                                MarkersPluginUpdate)
from jdaviz.core.marks import MarkersMark
from jdaviz.core.registries import tray_registry
from jdaviz.core.template_mixin import PluginTemplateMixin, ViewerSelectMixin, TableMixin
from jdaviz.core.user_api import PluginUserApi

__all__ = ['Markers']


[docs] @tray_registry('g-markers', label="Markers") class Markers(PluginTemplateMixin, ViewerSelectMixin, TableMixin): """ See the :ref:`Markers Plugin Documentation <markers-plugin>` for more details. Only the following attributes and methods are available through the :ref:`public plugin API <plugin-apis>`: * :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.show` * :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.open_in_tray` * :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.close_in_tray` * :meth:`clear_table` * :meth:`~jdaviz.core.template_mixin.TableMixin.export_table` """ template_file = __file__, "markers.vue" uses_active_status = Bool(True).tag(sync=True) _default_table_values = {'spectral_axis': np.nan, 'spectral_axis:unit': '', 'slice': np.nan, 'pixel': (np.nan, np.nan), 'pixel:unreliable': None, 'world': (np.nan, np.nan), 'world:unreliable': None, 'value': np.nan, 'value:unit': '', 'value:unreliable': None, 'index': np.nan} @property def user_api(self): return PluginUserApi(self, expose=('clear_table', 'export_table',)) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.config == 'cubeviz': headers = ['spectral_axis', 'spectral_axis:unit', 'slice', 'pixel', 'world', 'value', 'value:unit', 'viewer'] elif self.config == 'imviz': headers = ['pixel', 'pixel:unreliable', 'world', 'world:unreliable', 'value', 'value:unit', 'value:unreliable', 'viewer'] elif self.config == 'specviz': headers = ['spectral_axis', 'spectral_axis:unit', 'index', 'value', 'value:unit'] elif self.config == 'specviz2d': # TODO: add "index" if/when specviz2d supports plotting spectral_axis headers = ['spectral_axis', 'spectral_axis:unit', 'pixel', 'value', 'value:unit', 'viewer'] elif self.config == 'mosviz': headers = ['spectral_axis', 'spectral_axis:unit', 'pixel', 'world', 'index', 'value', 'value:unit', 'viewer'] else: # allow downstream configs to override headers headers = kwargs.get('headers', []) headers += ['data_label'] self.table.headers_avail = headers self.table.headers_visible = headers self.table._default_values_by_colname = self._default_table_values # subscribe to mouse events on any new viewers self.hub.subscribe(self, ViewerAddedMessage, handler=self._on_viewer_added) # account for image rotation due to a change in reference data self.hub.subscribe(self, ChangeRefDataMessage, handler=lambda msg: self._recompute_mark_positions(msg.viewer)) # enable/disable mark based on whether parent data entry is in viewer self.hub.subscribe(self, AddDataMessage, handler=lambda msg: self._recompute_mark_positions(msg.viewer)) self.hub.subscribe(self, RemoveDataMessage, handler=lambda msg: self._recompute_mark_positions(msg.viewer)) def _create_viewer_callbacks(self, viewer): if not self.is_active: return callback = self._viewer_callback(viewer, self._on_viewer_key_event) viewer.add_event_callback(callback, events=['keydown']) def _on_viewer_added(self, msg): self._create_viewer_callbacks(self.app.get_viewer_by_id(msg.viewer_id)) def _recompute_mark_positions(self, viewer): if self.table is None or self.table._qtable is None: return if 'world' not in self.table.headers_avail: return viewer_id = viewer.reference if viewer.reference is not None else viewer.reference_id viewer_loaded_data = [lyr.layer.label for lyr in viewer.layers] data_labels = self.table._qtable['data_label'] viewer_labels = self.table._qtable['viewer'] # note: could eventually have a user-provided switch to show markers in other viewers # by just skipping this first viewer_label == viewer_id check in_viewer = [viewer_label == viewer_id and data_label in viewer_loaded_data for viewer_label, data_label in zip(viewer_labels, data_labels)] viewer_mark = self._get_mark(viewer) if not np.any(in_viewer): viewer_mark.x, viewer_mark.y = [], [] return orig_world_x = np.asarray(self.table._qtable['world'][:, 0][in_viewer]) orig_world_y = np.asarray(self.table._qtable['world'][:, 1][in_viewer]) if self.app._link_type.lower() == 'wcs': # convert from the sky coordinates in the table to pixels via the WCS of the current # reference data new_wcs = viewer.state.reference_data.coords try: new_x, new_y = new_wcs.world_to_pixel_values(orig_world_x*u.deg, orig_world_y*u.deg) except Exception: # fail gracefully new_x, new_y = [], [] elif self.app._link_type == 'pixels': # we need to convert based on the WCS of the individual data layers on which each mark # was first created new_x, new_y = np.zeros_like(orig_world_x), np.zeros_like(orig_world_y) for data_label in np.unique(data_labels[in_viewer]): these = data_labels[in_viewer] == data_label if not np.any(these): continue wcs = self.app.data_collection[data_label].coords try: new_x[these], new_y[these] = wcs.world_to_pixel_values(orig_world_x[these]*u.deg, # noqa orig_world_y[these]*u.deg) # noqa except Exception: # fail gracefully new_x, new_y = [], [] break else: raise NotImplementedError(f"link_type {self.app._link_type} not implemented") # check for entries that do not correspond to a layer or only have pixel coordinates pixel_only_inds = data_labels == '' if np.any(pixel_only_inds): # TODO: should we rescale these since pixel coordinates when linked by WCS are always # on the range 0-1 because of the orientation layer? Or hide the pixel option in the # cycler when WCS-linked? pixel_x = np.asarray(self.table._qtable['pixel'][:, 0]) pixel_y = np.asarray(self.table._qtable['pixel'][:, 1]) new_x = np.append(new_x, pixel_x[pixel_only_inds]) new_y = np.append(new_y, pixel_y[pixel_only_inds]) viewer_mark.x, viewer_mark.y = new_x, new_y def _get_mark(self, viewer): matches = [mark for mark in viewer.figure.marks if isinstance(mark, MarkersMark)] if len(matches): return matches[0] mark = MarkersMark(viewer) viewer.figure.marks = viewer.figure.marks + [mark] return mark @property def marks(self): return {viewer_id: self._get_mark(viewer) for viewer_id, viewer in self.app._viewer_store.items() if hasattr(viewer, 'figure')} @property def coords_info(self): return self.app.session.application._tools['g-coords-info'] @observe('is_active') def _on_is_active_changed(self, *args): if self.disabled_msg: return # toggle visibility of markers for mark in self.marks.values(): mark.visible = self.is_active # subscribe/unsubscribe to keypress events across all viewers for viewer in self.app._viewer_store.values(): if not hasattr(viewer, 'figure'): # table viewer, etc continue callback = self._viewer_callback(viewer, self._on_viewer_key_event) if self.is_active: viewer.add_event_callback(callback, events=['keydown']) else: viewer.remove_event_callback(callback) def _on_viewer_key_event(self, viewer, data): if data['event'] == 'keydown' and data['key'] == 'm': row_info = self.coords_info.as_dict() if 'viewer' in self.table.headers_avail: row_info['viewer'] = viewer.reference if viewer.reference is not None else viewer.reference_id # noqa for k in self.table.headers_avail: row_info.setdefault(k, self._default_table_values.get(k, '')) try: self.table.add_item({k: v for k, v in row_info.items() if k in self.table.headers_avail}) except ValueError as err: raise ValueError(f'failed to add {row_info} to table: {repr(err)}') x, y = row_info['axes_x'], row_info['axes_y'] self._get_mark(viewer).append_xy(getattr(x, 'value', x), getattr(y, 'value', y)) self.hub.broadcast(MarkersPluginUpdate(table_length=len(self.table), sender=self))
[docs] def clear_table(self): """ Clear all entries/markers from the current table. """ super().clear_table() for mark in self.marks.values(): mark.clear() self.hub.broadcast(MarkersPluginUpdate(table_length=0, sender=self))