Source code for jdaviz.core.astrowidgets_api

import os

import gwcs
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.wcs import NoConvergence
from echo import delay_callback
from glue.config import colormaps
from glue.core import Data

from jdaviz.configs.imviz.helper import get_top_layer_index, get_reference_image_data
from jdaviz.core.events import SnackbarMessage, AstrowidgetMarkersChangedMessage
from jdaviz.core.helpers import data_has_valid_wcs

__all__ = ['AstrowidgetsImageViewerMixin']


class AstrowidgetsImageViewerMixin:
    """This class implements ``astrowidgets`` API for Jdaviz *image* viewer.

    This does not provide a fully functional viewer, but rather should be
    used as mixin into an existing viewer subclass. A child class that uses
    this must run :meth:`init_astrowidgets_api` within its ``__init__``.

    .. note:: Do not use this for spectral or cube viewers.

    """
    # Marker names rejected by ``_validate_marker_name``.
    RESERVED_MARKER_SET_NAMES = ['all']

    # __init__ not called, so use this to setup.
    def init_astrowidgets_api(self):
        """This method must be called in child class ``__init__``."""
        # Markers
        self._marktags = set()
        self._default_mark_tag_name = 'default-marker-name'
        # marker shape not settable: https://github.com/glue-viz/glue/issues/2202
        self.marker = {'color': 'red', 'alpha': 1.0, 'markersize': 5}

    def save(self, filename):
        """Save out the current image view to given PNG filename.

        Parameters
        ----------
        filename : str or `os.PathLike`
            PNG filename. ``.png`` is appended when the name does not
            already end with it (case-insensitive), and a leading ``~``
            is expanded. If the given file already exists, it will be
            silently overwritten.

        """
        # Normalize path-like objects (e.g. pathlib.Path) so that the string
        # operations below work for them too.
        filename = os.fspath(filename)
        if not filename.lower().endswith('.png'):
            filename = filename + '.png'
        filename = os.path.expanduser(filename)

        # The figure hands the PNG bytes to a callback rather than returning them.
        # https://github.com/bqplot/bqplot/pull/1397
        def on_png_received(data):
            with open(filename, 'bw') as f:
                f.write(data)

        self.figure.get_png_data(on_png_received)

    def center_on(self, point):
        """Centers the view on a particular point on the top visible layer.
        The data label of the top visible layer can be queried using the viewer's
        `~jdaviz.configs.imviz.plugins.viewers.ImvizImageView.top_visible_data_label`
        property.

        Nothing happens when there is no top visible layer or when the
        (converted) point is not finite.

        Parameters
        ----------
        point : tuple or `~astropy.coordinates.SkyCoord`
            If tuple of ``(X, Y)`` is given, it is assumed
            to be in data coordinates and 0-indexed.

        Raises
        ------
        AttributeError
            Sky coordinates are given but image does not have a valid WCS.

        """
        i_top = get_top_layer_index(self)
        if i_top is None:
            return
        image = self.layers[i_top].layer

        if isinstance(point, SkyCoord):
            if data_has_valid_wcs(image):
                try:
                    point = image.coords.world_to_pixel(point)  # 0-indexed X, Y
                except NoConvergence as e:  # pragma: no cover
                    # Conversion failed, so ``point`` is still the SkyCoord here.
                    self.session.hub.broadcast(SnackbarMessage(
                        f'{point} is likely out of bounds: {repr(e)}',
                        color="warning", sender=self))
                    return
            else:
                raise AttributeError(f'{getattr(image, "label", None)} does not have a valid WCS')

        if not np.all(np.isfinite(point)):
            return
        elif hasattr(self, '_get_real_xy'):
            # User gives pixel wrt top layer but we want reference data location.
            pix = self._get_real_xy(image, point[0], point[1], reverse=True)
        else:  # pragma: no cover
            pix = point

        # Keep the current extent; only the center moves.
        width = self.state.x_max - self.state.x_min
        height = self.state.y_max - self.state.y_min

        with delay_callback(self.state, 'x_min', 'x_max', 'y_min', 'y_max'):
            self.state.x_min = pix[0] - (width * 0.5)
            self.state.y_min = pix[1] - (height * 0.5)
            self.state.x_max = self.state.x_min + width
            self.state.y_max = self.state.y_min + height

    def offset_by(self, dx, dy):
        """Move the center to a point that is given offset
        away from the current center.

        Nothing happens when there is no top visible layer.

        Parameters
        ----------
        dx, dy : float or `~astropy.units.Quantity`
            Offset value. Without a unit, assumed to be pixel offsets.
            If a unit is attached, offset by pixel or sky is assumed from
            the unit.

        Raises
        ------
        AttributeError
            Sky offset is given but image does not have a valid WCS.

        ValueError
            Offsets are of different types.

        astropy.units.core.UnitTypeError
            Sky offset has invalid unit.

        """
        i_top = get_top_layer_index(self)
        if i_top is None:
            # Match center_on(): without a top visible layer there is nothing
            # to offset, and indexing self.layers with None would fail.
            return
        image = self.layers[i_top].layer

        width = self.state.x_max - self.state.x_min
        height = self.state.y_max - self.state.y_min
        x_cen = self.state.x_min + (width * 0.5)
        y_cen = self.state.y_min + (height * 0.5)
        if hasattr(self, '_get_real_xy'):
            real_cen = self._get_real_xy(image, x_cen, y_cen)
        else:
            real_cen = (x_cen, y_cen)

        dx, dx_coord = _offset_is_pixel_or_sky(dx)
        dy, dy_coord = _offset_is_pixel_or_sky(dy)

        if dx_coord != dy_coord:
            raise ValueError(f'dx is of type {dx_coord} but dy is of type {dy_coord}')

        if dx_coord == 'wcs':
            if data_has_valid_wcs(image):
                # To avoid distortion headache, assume offset is relative to
                # displayed center.
                sky_cen = image.coords.pixel_to_world(real_cen[0], real_cen[1])
                new_cen = sky_cen.spherical_offsets_by(dx, dy)
            else:
                raise AttributeError(f'{getattr(image, "label", None)} does not have a valid WCS')
        else:
            new_cen = (real_cen[0] + dx, real_cen[1] + dy)

        self.center_on(new_cen)

    @property
    def zoom_level(self):
        """
        The zoom level for an image viewer (not linked by WCS).

        .. warning::

            When a viewer is linked by WCS, the result corresponds to the
            ``zoom_level`` of the reference data.

        * 1 means real-pixel-size.
        * 2 means zoomed in by a factor of 2.
        * 0.5 means zoomed out by a factor of 2.
        * 'fit' means zoomed to fit the whole image width into display.

        Setting raises ``ValueError`` for anything other than a positive
        int/float or ``'fit'``. Getting raises ``ValueError`` while the
        viewer shape is not yet known.

        """
        if self.shape is None:  # pragma: no cover
            raise ValueError('Viewer is still loading, try again later')

        if hasattr(self, '_get_real_xy'):
            image, i_ref = get_reference_image_data(self.jdaviz_app, self.reference)
            # TODO: Do we want top layer instead?
            # i_top = get_top_layer_index(self)
            # image = self.layers[i_top].layer
            real_min = self._get_real_xy(image, self.state.x_min, self.state.y_min)
            real_max = self._get_real_xy(image, self.state.x_max, self.state.y_max)
        else:
            real_min = (self.state.x_min, self.state.y_min)
            real_max = (self.state.x_max, self.state.y_max)

        # self.shape is indexed as (Y, X).
        screenx = self.shape[1]
        screeny = self.shape[0]
        zoom_x = screenx / abs(real_max[0] - real_min[0])
        zoom_y = screeny / abs(real_max[1] - real_min[1])

        return max(zoom_x, zoom_y)  # Similar to Ginga get_scale()

    # Loosely based on glue/viewers/image/state.py
    @zoom_level.setter
    def zoom_level(self, val):
        if ((not isinstance(val, (int, float)) and val != 'fit') or
                (isinstance(val, (int, float)) and val <= 0)):
            raise ValueError(f'Unsupported zoom level: {val}')

        if (self.shape is None or self.state.x_att is None or
                self.state.y_att is None):  # pragma: no cover
            return

        # Zoom on X and Y will auto-adjust.
        if val == 'fit':
            self.state.reset_limits()
            return

        # Half of the new X extent, in pixels, for the requested zoom.
        new_dx = self.shape[1] * 0.5 / val

        if hasattr(self, '_get_real_xy'):
            image, i_ref = get_reference_image_data(self.jdaviz_app, self.reference)
            # TODO: Do we want top layer instead?
            # i_top = get_top_layer_index(self)
            # image = self.layers[i_top].layer
            real_min = self._get_real_xy(image, self.state.x_min, self.state.y_min)
            real_max = self._get_real_xy(image, self.state.x_max, self.state.y_max)
            cur_xcen = (real_min[0] + real_max[0]) * 0.5
            new_x_min = self._get_real_xy(image, cur_xcen - new_dx - 0.5, real_min[1], reverse=True)[0]  # noqa: E501
            new_x_max = self._get_real_xy(image, cur_xcen + new_dx - 0.5, real_max[1], reverse=True)[0]  # noqa: E501
        else:
            cur_xcen = (self.state.x_min + self.state.x_max) * 0.5
            new_x_min = cur_xcen - new_dx - 0.5
            new_x_max = cur_xcen + new_dx - 0.5

        with delay_callback(self.state, 'x_min', 'x_max'):
            self.state.x_min = new_x_min
            self.state.x_max = new_x_max

            # We need to adjust the limits in here to avoid triggering all
            # the update events then changing the limits again.
            self.state._adjust_limits_aspect()

    # Discussion on why we need two different ways to set zoom at
    # https://github.com/astropy/astrowidgets/issues/144
    def zoom(self, val):
        """Zoom in or out by the given factor.

        The current `zoom_level` is multiplied by ``val`` and the product
        is assigned back to `zoom_level`.

        .. warning::

            When a viewer is linked by WCS, this method accidentally changes
            the center also; see the warning in ``zoom_level``.

        Parameters
        ----------
        val : int or float
            Multiplicative factor applied to the current `zoom_level`;
            e.g., 2 zooms in by a factor of 2 and 0.5 zooms out by a
            factor of 2.

        Raises
        ------
        ValueError
            Invalid zoom factor.

        """
        if not isinstance(val, (int, float)):
            raise ValueError(f"zoom only accepts int or float but got '{val}'")

        self.zoom_level = self.zoom_level * val

    @property
    def colormap_options(self):
        """List of colormap names."""
        # Do not sort to match Plot Options order.
        return [member[0] for member in colormaps.members]

    def set_colormap(self, cmap):
        """Set colormap to the given colormap name.

        Parameters
        ----------
        cmap : str
            Colormap name. Possible values can be obtained from
            :meth:`colormap_options`.

        Raises
        ------
        ValueError
            Invalid colormap name.

        """
        # Each member is indexed as (name, colormap object).
        cm = None
        for member in colormaps.members:
            if member[0] == cmap:
                cm = member[1]
                break
        if cm is None:
            raise ValueError(f"Invalid colormap '{cmap}', must be one of {self.colormap_options}")

        i_top = get_top_layer_index(self)
        self.state.layers[i_top].cmap = cm

    @property
    def stretch_options(self):
        """List of all available options for image stretching.

        Their ``astropy.visualization`` counterparts are also accepted, as follows:

        * ``'arcsinh'``: ``astropy.visualization.AsinhStretch``
        * ``'linear'``: ``astropy.visualization.LinearStretch``
        * ``'log'``: ``astropy.visualization.LogStretch``
        * ``'sqrt'``: ``astropy.visualization.SqrtStretch``

        """
        # TODO: Is there a better way to access this in Glue? See glue/viewers/image/state.py
        return ['arcsinh', 'linear', 'log', 'sqrt']

    @property
    def stretch(self):
        """The image stretching algorithm in use.

        Setting accepts one of `stretch_options` or one of the
        ``astropy.visualization`` stretch *classes* listed there, and
        raises ``ValueError`` otherwise.
        """
        i_top = get_top_layer_index(self)
        return self.state.layers[i_top].stretch

    @stretch.setter
    def stretch(self, val):
        valid_vals = self.stretch_options

        if isinstance(val, type):  # is a class
            # Translate astropy.visualization
            from astropy.visualization import AsinhStretch, LinearStretch, LogStretch, SqrtStretch
            if issubclass(val, AsinhStretch):
                val = 'arcsinh'
            elif issubclass(val, LinearStretch):
                val = 'linear'
            elif issubclass(val, LogStretch):
                val = 'log'
            elif issubclass(val, SqrtStretch):
                val = 'sqrt'
            else:
                raise ValueError(f"Invalid stretch {val}, must be one of {valid_vals}")
        elif val not in valid_vals:
            raise ValueError(f"Invalid stretch '{val}', must be one of {valid_vals}")

        i_top = get_top_layer_index(self)
        self.state.layers[i_top].stretch = val

    @property
    def autocut_options(self):
        """List of all available options for automatic image cut levels."""
        # See glue-jupyter/bqplot/image/state.py#L29
        return ['minmax', '99.5%', '99%', '95%', '90%']

    @property
    def cuts(self):
        """Current image cut levels.
        To set new cut levels, either provide a tuple of
        ``(low, high)`` values or one of the options from
        `autocut_options`.

        Setting raises ``ValueError`` for an unknown autocut name or for
        anything that is not a 2-element list/tuple of int/float.
        """
        i_top = get_top_layer_index(self)
        return self.state.layers[i_top].v_min, self.state.layers[i_top].v_max

    # TODO: Support astropy.visualization, see https://github.com/glue-viz/glue/issues/2218
    @cuts.setter
    def cuts(self, val):
        i_top = get_top_layer_index(self)
        if isinstance(val, str):  # autocut
            # Percentile assigned to the layer state for each autocut name.
            autocut_percentiles = {
                'minmax': 100,
                '99.5%': 99.5,
                '99%': 99,
                '95%': 95,
                '90%': 90,
            }
            if val not in autocut_percentiles:
                raise ValueError(f"Invalid autocut '{val}', must be one of {self.autocut_options}")
            self.state.layers[i_top].percentile = autocut_percentiles[val]
        else:  # (low, high)
            if (not isinstance(val, (list, tuple)) or len(val) != 2 or
                    not np.all([isinstance(x, (int, float)) for x in val])):
                raise ValueError(f"Invalid cut levels {val}, must be (low, high)")
            self.state.layers[i_top].v_min = val[0]
            self.state.layers[i_top].v_max = val[1]

    @property
    def marker(self):
        """Marker to use.

        Marker can be set as follows; e.g.::

            {'color': 'red', 'alpha': 1.0, 'markersize': 3}
            {'color': '#ff0000', 'alpha': 0.5, 'markersize': 10}
            {'color': (1, 0, 0)}

        The valid properties for Glue markers are listed at
        https://docs.glueviz.org/en/stable/api/glue.core.visual.VisualAttributes.html

        Setting raises ``KeyError`` for keys other than ``color``,
        ``alpha``, ``markersize`` and ``fill``, and ``ValueError`` for
        invalid values.
        """
        return self._marker_dict

    @marker.setter
    def marker(self, val):
        # Validation: Ideally Glue should do this but we have to due to
        # https://github.com/glue-viz/glue/issues/2203
        given = set(val.keys())
        allowed = {'color', 'alpha', 'markersize', 'fill'}
        if not given.issubset(allowed):
            raise KeyError(f'Invalid attribute(s): {given - allowed}')
        if 'color' in val:
            from matplotlib.colors import ColorConverter
            ColorConverter().to_rgb(val['color'])  # ValueError: Invalid RGBA argument
        if 'alpha' in val:
            alpha = val['alpha']
            if not isinstance(alpha, (int, float)) or alpha < 0 or alpha > 1:
                raise ValueError(f'Invalid alpha: {alpha}')
        if 'markersize' in val:
            size = val['markersize']
            if not isinstance(size, (int, float)):
                raise ValueError(f'Invalid marker size: {size}')
        if 'fill' in val:
            fill = val['fill']
            if not isinstance(fill, bool):
                raise ValueError(f'Invalid fill: {fill}')

        # Only set this once we have successfully validated a marker.
        # Those not set here use Glue defaults.
        self._marker_dict = val

    def _validate_marker_name(self, marker_name):
        """Raise an error if the marker_name is not allowed."""
        if marker_name in self.RESERVED_MARKER_SET_NAMES:
            raise ValueError(
                f"The marker name {marker_name} is not allowed. Any name is "
                f"allowed except these: {', '.join(self.RESERVED_MARKER_SET_NAMES)}")

    def add_markers(self, table, x_colname='x', y_colname='y',
                    skycoord_colname='coord', use_skycoord=False,
                    marker_name=None):
        """Creates markers w.r.t. the reference image at given points
        in the table.

        .. note:: Use `marker` to change marker appearance.

        .. note::

            Once markers are added, linking cannot be changed.
            To change linking options, remove and re-add the markers manually.

        Parameters
        ----------
        table : `~astropy.table.Table`
            Table containing marker locations.

        x_colname, y_colname : str
            Column names for X and Y.
            Coordinates must be 0-indexed.

        skycoord_colname : str
            Column name with `~astropy.coordinates.SkyCoord` objects.

        use_skycoord : bool
            If `True`, use ``skycoord_colname`` to mark.
            Otherwise, use ``x_colname`` and ``y_colname``.

        marker_name : str, optional
            Name to assign the markers in the table. Providing a name
            allows markers to be removed by name at a later time.

        Raises
        ------
        AttributeError
            Sky coordinates are given but reference image does not have a valid WCS.

        ValueError
            Invalid marker name.

        """
        from glue.viewers.scatter.state import ScatterLayerState

        if marker_name is None:
            marker_name = self._default_mark_tag_name

        self._validate_marker_name(marker_name)
        jglue = self.session.application

        # Link markers to top visible image data or reference data.
        if not use_skycoord and hasattr(self, '_get_real_xy'):
            i_top = get_top_layer_index(self)
            image = self.layers[i_top].layer
        else:
            image = self.state.reference_data

        # TODO: Is Glue smart enough to no-op if link already there?
        if use_skycoord:
            if not data_has_valid_wcs(image):
                raise AttributeError(f'{getattr(image, "label", None)} does not have a valid WCS')
            sky = table[skycoord_colname]
            t_glue = Data(marker_name, ra=sky.ra.deg, dec=sky.dec.deg)
            # NOTE(review): the 'Lon'/'Lat' membership tests below depend on how
            # the entries of image.components compare against str -- confirm.
            dcomps = image.components
            if (isinstance(image.coords, gwcs.WCS) and
                    image.coords.output_frame.reference_frame.name != 'galactic' and
                    'Lon' in dcomps and 'Lat' in dcomps):
                ra_str = 'Lon'
                dec_str = 'Lat'
            else:
                ra_str = 'Right Ascension'
                dec_str = 'Declination'
            with jglue.data_collection.delay_link_manager_update():
                jglue.data_collection[marker_name] = t_glue
                jglue.add_link(t_glue, 'ra', image, ra_str)
                jglue.add_link(t_glue, 'dec', image, dec_str)
        else:
            t_glue = Data(marker_name, **table[x_colname, y_colname])
            with jglue.data_collection.delay_link_manager_update():
                jglue.data_collection[marker_name] = t_glue
                # X is linked to pixel component 1 and Y to pixel component 0.
                jglue.add_link(t_glue, x_colname, image, image.pixel_component_ids[1].label)
                jglue.add_link(t_glue, y_colname, image, image.pixel_component_ids[0].label)

        try:
            self.add_data(t_glue)
        except Exception as e:  # pragma: no cover
            # Best effort: report the failure to the user instead of raising.
            self.session.hub.broadcast(SnackbarMessage(
                f"Failed to add markers '{marker_name}': {repr(e)}",
                color="warning", sender=self))
        else:
            # Only can set alpha and color using self.add_data(), so brute force here instead.
            # https://github.com/glue-viz/glue/issues/2201
            for lyr in self.state.layers:
                if isinstance(lyr, ScatterLayerState) and lyr.layer.label == marker_name:
                    for key, val in self.marker.items():
                        # The layer state attribute is 'size', not 'markersize'.
                        setattr(lyr, {'markersize': 'size'}.get(key, key), val)
                    break

            self.jdaviz_app.set_data_visibility(self.reference_id, marker_name,
                                                visible=True, replace=False)

            self._marktags.add(marker_name)
            self.session.hub.broadcast(AstrowidgetMarkersChangedMessage(True, sender=self))

    def remove_markers(self, marker_name=None):
        """Remove some but not all of the markers by name used when
        adding the markers.

        A warning message is broadcast, and nothing is removed, when the
        name was not added by this viewer or is not found in the data
        collection.

        Parameters
        ----------
        marker_name : str
            Name used when the markers were added.
            If not given, will delete markers added under default name.

        """
        if marker_name is None:
            marker_name = self._default_mark_tag_name

        # TODO: How to test manually created tiled viewers in CI?
        if marker_name not in self._marktags:  # pragma: no cover
            self.session.hub.broadcast(SnackbarMessage(
                f"Failed to remove markers '{marker_name}': Not added by this viewer",
                color="warning", sender=self))
            return

        try:
            i = self.session.application.data_collection.labels.index(marker_name)
        except ValueError as e:  # pragma: no cover
            self.session.hub.broadcast(SnackbarMessage(
                f"Failed to remove markers '{marker_name}': {repr(e)}",
                color="warning", sender=self))
            return

        data = self.session.application.data_collection[i]
        self.session.application.data_collection.remove(data)
        self._marktags.remove(marker_name)

        # The message flag is True while this viewer still has marker sets left.
        self.session.hub.broadcast(
            AstrowidgetMarkersChangedMessage(len(self._marktags) > 0, sender=self))

    def reset_markers(self):
        """Delete all markers."""
        # Grab the entire list of marker names before iterating
        # otherwise what we are iterating over changes.
        for marker_name in list(self._marktags):
            self.remove_markers(marker_name=marker_name)
def _offset_is_pixel_or_sky(x):
    """Classify an offset value as a pixel or a sky offset.

    Parameters
    ----------
    x : number or `~astropy.units.Quantity`
        Offset value to classify.

    Returns
    -------
    val
        ``x`` itself, except that a Quantity that is dimensionless or in
        pixels is reduced to its bare ``.value``.

    coord : {'data', 'wcs'}
        ``'data'`` for a non-Quantity, dimensionless or pixel input;
        ``'wcs'`` for a Quantity in any other unit.

    """
    # Anything without a unit is taken as is.
    if not isinstance(x, u.Quantity):
        return x, 'data'

    # Unitless or pixel Quantity: strip the unit.
    if x.unit in (u.dimensionless_unscaled, u.pix):
        return x.value, 'data'

    # Any other unit: the value can stay Quantity.
    return x, 'wcs'