Source code for jdaviz.configs.imviz.plugins.aper_phot_simple.aper_phot_simple

import os
import warnings
from datetime import datetime, timezone

import astropy
import numpy as np
from astropy import units as u
from astropy.modeling.fitting import LevMarLSQFitter
from astropy.modeling import Parameter
from astropy.modeling.models import Gaussian1D
from astropy.time import Time
from glue.core.message import SubsetUpdateMessage
from ipywidgets import widget_serialization
from packaging.version import Version
from photutils.aperture import (ApertureStats, CircularAperture, EllipticalAperture,
                                RectangularAperture)
from traitlets import Any, Bool, Integer, List, Unicode, observe

from jdaviz.core.custom_traitlets import FloatHandleEmpty
from jdaviz.core.events import SnackbarMessage, LinkUpdatedMessage, SliceValueUpdatedMessage
from jdaviz.core.region_translators import regions2aperture, _get_region_from_spatial_subset
from jdaviz.core.registries import tray_registry
from jdaviz.core.template_mixin import (PluginTemplateMixin, DatasetMultiSelectMixin,
                                        SubsetSelect, ApertureSubsetSelectMixin,
                                        TableMixin, PlotMixin, MultiselectMixin, with_spinner)
from jdaviz.utils import PRIHDR_KEY

__all__ = ['SimpleAperturePhotometry']

ASTROPY_LT_5_2 = Version(astropy.__version__) < Version('5.2')


[docs] @tray_registry('imviz-aper-phot-simple', label="Aperture Photometry") class SimpleAperturePhotometry(PluginTemplateMixin, ApertureSubsetSelectMixin, DatasetMultiSelectMixin, TableMixin, PlotMixin, MultiselectMixin): """ The Aperture Photometry plugin performs aperture photometry for drawn regions. See the :ref:`Aperture Photometry Plugin Documentation <aper-phot-simple>` for more details. Only the following attributes and methods are available through the :ref:`public plugin API <plugin-apis>`: * :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.show` * :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.open_in_tray` * :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.close_in_tray` """ template_file = __file__, "aper_phot_simple.vue" uses_active_status = Bool(True).tag(sync=True) aperture_area = Integer().tag(sync=True) background_items = List().tag(sync=True) background_selected = Unicode("").tag(sync=True) background_value = FloatHandleEmpty(0).tag(sync=True) pixel_area_multi_auto = Bool(True).tag(sync=True) pixel_area = FloatHandleEmpty(0).tag(sync=True) counts_factor = FloatHandleEmpty(0).tag(sync=True) flux_scaling_multi_auto = Bool(True).tag(sync=True) flux_scaling_warning = Unicode("").tag(sync=True) flux_scaling = FloatHandleEmpty(0).tag(sync=True) result_available = Bool(False).tag(sync=True) result_failed_msg = Unicode("").tag(sync=True) results = List().tag(sync=True) plot_types = List([]).tag(sync=True) current_plot_type = Unicode().tag(sync=True) plot_available = Bool(False).tag(sync=True) radial_plot = Any('').tag(sync=True, **widget_serialization) fit_radial_profile = Bool(False).tag(sync=True) fit_results = List().tag(sync=True) # Cubeviz only cube_slice = Unicode("").tag(sync=True) is_cube = Bool(False).tag(sync=True) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.background = SubsetSelect(self, 'background_items', 'background_selected', dataset='dataset', default_text='Manual', manual_options=['Manual'], filters=['is_spatial', 'is_not_composite']) headers = ['xcenter', 'ycenter', 'sky_center', 'sum', 'sum_aper_area', 'aperture_sum_counts', 'aperture_sum_counts_err', 'aperture_sum_mag', 'min', 'max', 'mean', 'median', 'mode', 'std', 'mad_std', 'var', 'biweight_location', 'biweight_midvariance', 'fwhm', 'semimajor_sigma', 'semiminor_sigma', 'orientation', 'eccentricity', 'data_label', 'subset_label'] self.table.headers_avail = headers self.table.headers_visible = headers self.plot_types = ["Curve of Growth", "Radial Profile", "Radial Profile (Raw)"] self.current_plot_type = self.plot_types[0] self._fitted_model_name = 'phot_radial_profile' # override default plot styling self.plot.figure.fig_margin = {'top': 60, 'bottom': 60, 'left': 65, 'right': 15} self.plot.viewer.axis_y.tick_format = '0.2e' self.plot.viewer.axis_y.label_offset = '55px' self.session.hub.subscribe(self, SubsetUpdateMessage, handler=self._on_subset_update) self.session.hub.subscribe(self, LinkUpdatedMessage, handler=self._on_link_update) # Custom dataset filters for Cubeviz if self.config == "cubeviz": def valid_cubeviz_datasets(data): comp = data.get_component(data.main_components[0]) img_unit = u.Unit(comp.units) if comp.units else u.dimensionless_unscaled acceptable_types = ['spectral flux density wav', 'photon flux density wav', 'spectral flux density', 'photon flux density'] return ((data.ndim in (2, 3)) and ((img_unit == (u.MJy / u.sr)) or (img_unit.physical_type in acceptable_types))) self.dataset.add_filter(valid_cubeviz_datasets) self.session.hub.subscribe(self, SliceValueUpdatedMessage, handler=self._on_slice_changed) # TODO: expose public API once finalized # @property # def user_api(self): # return PluginUserApi(self, expose=('multiselect', 'dataset', 'aperture', # 'background', 'background_value', # 'pixel_area', 'counts_factor', 'flux_scaling', # 'calculate_photometry', # 'unpack_batch_options', 'calculate_batch_photometry')) def _on_slice_changed(self, msg): if self.config != "cubeviz": return self.cube_slice = f"{msg.value:.3e} {msg.value_unit}" self._cube_wave = u.Quantity(msg.value, msg.value_unit) @observe("dataset_selected") def _on_dataset_selected_changed(self, event={}): if self.config != "cubeviz": return # self.dataset might not exist when app is setting itself up. if hasattr(self, "dataset") and self.dataset.selected_dc_item.ndim > 2: self.is_cube = True else: self.is_cube = False def _get_defaults_from_metadata(self, dataset=None): defaults = {} if dataset is None: meta = self.dataset.selected_dc_item.meta.copy() else: meta = self.dataset._get_dc_item(dataset).meta.copy() # Extract telescope specific unit conversion factors, if applicable. if PRIHDR_KEY in meta: meta.update(meta[PRIHDR_KEY]) del meta[PRIHDR_KEY] if 'telescope' in meta: telescope = meta['telescope'] else: telescope = meta.get('TELESCOP', '') if telescope == 'JWST': # Hardcode the flux conversion factor from MJy to ABmag mjy2abmag = 0.003631 if 'photometry' in meta and 'pixelarea_arcsecsq' in meta['photometry']: defaults['pixel_area'] = meta['photometry']['pixelarea_arcsecsq'] if 'bunit_data' in meta and meta['bunit_data'] == u.Unit("MJy/sr"): defaults['flux_scaling'] = mjy2abmag elif 'PIXAR_A2' in meta: defaults['pixel_area'] = meta['PIXAR_A2'] if 'BUNIT' in meta and meta['BUNIT'] == u.Unit("MJy/sr"): defaults['flux_scaling'] = mjy2abmag elif telescope == 'HST': # TODO: Add more HST support, as needed. # HST pixel scales are from instrument handbooks. # This is really not used because HST data does not have sr in unit. # This is only for completeness. # For counts conversion, PHOTFLAM is used to convert "counts" to flux manually, # which is the opposite of JWST, so we just do not do it here. instrument = meta.get('INSTRUME', '').lower() detector = meta.get('DETECTOR', '').lower() if instrument == 'acs': if detector == 'wfc': defaults['pixel_area'] = 0.05 * 0.05 elif detector == 'hrc': # pragma: no cover defaults['pixel_area'] = 0.028 * 0.025 elif detector == 'sbc': # pragma: no cover defaults['pixel_area'] = 0.034 * 0.03 elif instrument == 'wfc3' and detector == 'uvis': # pragma: no cover defaults['pixel_area'] = 0.04 * 0.04 return defaults @observe('flux_scaling_multi_auto') def _multiselect_flux_scaling_warning(self, event={}): if not self.flux_scaling_multi_auto: self.flux_scaling_warning = '' return no_flux_scaling = [dataset for dataset in self.dataset.selected if 'flux_scaling' not in self._get_defaults_from_metadata(dataset)] if len(no_flux_scaling): self.flux_scaling_warning = ('Could not determine flux scaling for ' f'{", ".join(no_flux_scaling)}. Those entries will ' 'default to zero. Turn off auto-mode to provide ' 'flux-scaling manually.') else: self.flux_scaling_warning = '' @observe('flux_scaling') def _singleselect_flux_scaling_warning(self, event={}): if not self.multiselect: # disable warning once user changes value self.flux_scaling_warning = '' @observe('dataset_selected') def _dataset_selected_changed(self, event={}): if not hasattr(self, 'dataset'): # plugin not fully initialized return if self.dataset.selected_dc_item is None: return if self.multiselect: # defaults are applied within the loop if the auto-switches are enabled, # but we still need to update the flux-scaling warning self._multiselect_flux_scaling_warning() return try: defaults = self._get_defaults_from_metadata() self.counts_factor = 0 self.pixel_area = defaults.get('pixel_area', 0) self.flux_scaling = defaults.get('flux_scaling', 0) if 'flux_scaling' in defaults: self.flux_scaling_warning = '' else: self.flux_scaling_warning = ('Could not determine flux scaling for ' f'{self.dataset.selected}, defaulting to zero.') except Exception as e: self.hub.broadcast(SnackbarMessage( f"Failed to extract {self.dataset_selected}: {repr(e)}", color='error', sender=self)) # auto-populate background, if applicable. self._aperture_selected_changed() def _on_subset_update(self, msg): if not self.dataset_selected or not self.aperture_selected: return if self.multiselect: self._background_selected_changed() return sbst = msg.subset if sbst.label == self.aperture_selected and sbst.data.label == self.dataset_selected: self._aperture_selected_changed() elif sbst.label == self.background_selected and sbst.data.label == self.dataset_selected: self._background_selected_changed() def _on_link_update(self, msg): if not self.dataset_selected or not self.aperture_selected: return # Force background auto-calculation to update when linking has changed. self._aperture_selected_changed() @observe('aperture_selected', 'multiselect') def _aperture_selected_changed(self, event={}): if not self.dataset_selected or not self.aperture_selected: return if self.multiselect is not isinstance(self.aperture_selected, list): # then multiselect is in the process of changing but the traitlet for aperture_selected # has not been updated internally yet return if self.multiselect: self._background_selected_changed() return # NOTE: aperture_selected can be triggered here before aperture_selected_validity is updated # so we'll still allow the snackbar to be raised as a second warning to the user and to # avoid acting on outdated information # NOTE: aperture area is only used to determine if a warning should be shown in the UI # and so does not need to be calculated within user API calls that don't act on traitlets try: # Sky subset does not have area. Not worth it to calculate just for a warning. if hasattr(self.aperture.selected_spatial_region, 'area'): self.aperture_area = int(np.ceil(self.aperture.selected_spatial_region.area)) else: self.aperture_area = 0 except Exception as e: self.hub.broadcast(SnackbarMessage( f"Failed to extract {self.aperture_selected}: {repr(e)}", color='error', sender=self)) else: self._background_selected_changed() @property def _cubeviz_slice_ind(self): fv = self.app.get_viewer(self.app._jdaviz_helper._default_flux_viewer_reference_name) return fv.slice def _calc_background_median(self, reg, data=None): # Basically same way image stats are calculated in vue_do_aper_phot() # except here we only care about one stat for the background. if data is None: if self.multiselect: if len(self.dataset.selected) == 1: data = self.dataset.selected_dc_item[0] else: raise ValueError("cannot calculate background median in multiselect") else: data = self.dataset.selected_dc_item comp = data.get_component(data.main_components[0]) if self.config == "cubeviz" and data.ndim > 2: comp_data = comp.data[:, :, self._cubeviz_slice_ind].T # nx, ny --> ny, nx # Similar to coords_info logic. if '_orig_spec' in getattr(data, 'meta', {}): w = data.meta['_orig_spec'].wcs.celestial else: w = data.coords.celestial else: # "imviz" comp_data = comp.data # ny, nx w = data.coords if hasattr(reg, 'to_pixel'): reg = reg.to_pixel(w) aper_mask_stat = reg.to_mask(mode='center') img_stat = aper_mask_stat.get_values(comp_data, mask=None) # photutils/background/_utils.py --> nanmedian() return np.nanmedian(img_stat) # Naturally in data unit @observe('background_selected') def _background_selected_changed(self, event={}): background_selected = event.get('new', self.background_selected) if background_selected == 'Manual': # we'll later access the user's self.background_value directly return if self.multiselect: # background_value will be recomputed within batch mode anyways and will # be replaced in the UI with a message self.background_value = -1 return try: reg = _get_region_from_spatial_subset(self, self.background.selected_subset_state) self.background_value = self._calc_background_median(reg) except Exception as e: self.background_value = 0 self.hub.broadcast(SnackbarMessage( f"Failed to extract {background_selected}: {repr(e)}", color='error', sender=self)) @with_spinner() def calculate_photometry(self, dataset=None, aperture=None, background=None, background_value=None, pixel_area=None, counts_factor=None, flux_scaling=None, add_to_table=True, update_plots=True): """ Calculate aperture photometry given the values set in the plugin or any overrides provided as arguments here (which will temporarily override plugin values for this calculation only). Parameters ---------- dataset : str, optional Dataset to use for photometry. aperture : str, optional Subset to use as the aperture. background : str, optional Subset to use to calculate the background. background_value : float, optional Background to subtract, same unit as data. Automatically computed if ``background`` is set to a subset. pixel_area : float, optional Pixel area in arcsec squared, only used if sr in data unit. counts_factor : float, optional Factor to convert data unit to counts, in unit of flux/counts. flux_scaling : float, optional Same unit as data, used in -2.5 * log(flux / flux_scaling). add_to_table : bool, optional update_plots : bool, optional Returns ------- table row, fit results """ if self.multiselect and (dataset is None or aperture is None): # pragma: no cover raise ValueError("for batch mode, use calculate_batch_photometry") if dataset is not None: if dataset not in self.dataset.choices: # pragma: no cover raise ValueError(f"dataset must be one of {self.dataset.choices}") data = self.dataset._get_dc_item(dataset) else: # we can use the pre-cached value data = self.dataset.selected_dc_item if aperture is not None: if aperture not in self.aperture.choices: raise ValueError(f"aperture must be one of {self.aperture.choices}") if aperture is not None or dataset is not None: reg = self.aperture._get_spatial_region(subset=aperture if aperture is not None else self.aperture.selected, # noqa dataset=dataset if dataset is not None else self.dataset.selected) # noqa # determine if a valid aperture (since selected_validity only applies to selected entry) _, _, validity = self.aperture._get_mark_coords_and_validate(selected=aperture) if not validity.get('is_aperture'): raise ValueError(f"Selected aperture {aperture} is not valid: {validity.get('aperture_message')}") # noqa else: # use the pre-cached value if not self.aperture.selected_validity.get('is_aperture'): raise ValueError(f"Selected aperture is not valid: {self.aperture.selected_validity.get('aperture_message')}") # noqa reg = self.aperture.selected_spatial_region # Reset last fitted model fit_model = None # TODO: remove _fitted_model_name cache? if self._fitted_model_name in self.app.fitted_models: del self.app.fitted_models[self._fitted_model_name] comp = data.get_component(data.main_components[0]) if background is not None and background not in self.background.choices: # pragma: no cover raise ValueError(f"background must be one of {self.background.choices}") if background_value is not None: if ((background not in (None, 'Manual')) or (background is None and self.background_selected != 'Manual')): raise ValueError("cannot provide background_value with background!='Manual'") elif (background == 'Manual' or (background is None and self.background.selected == 'Manual')): background_value = self.background_value elif background is None and dataset is None: # use the previously-computed value in the plugin background_value = self.background_value else: bg_reg = self.aperture._get_spatial_region(subset=background if background is not None else self.background.selected, # noqa dataset=dataset if dataset is not None else self.dataset.selected) # noqa background_value = self._calc_background_median(bg_reg, data=data) try: bg = float(background_value) except ValueError: # Clearer error message raise ValueError('Missing or invalid background value') if self.config == "cubeviz" and data.ndim > 2: comp_data = comp.data[:, :, self._cubeviz_slice_ind].T # nx, ny --> ny, nx # Similar to coords_info logic. if '_orig_spec' in getattr(data, 'meta', {}): w = data.meta['_orig_spec'].wcs else: w = data.coords else: # "imviz" comp_data = comp.data # ny, nx w = data.coords if hasattr(reg, 'to_pixel'): sky_center = reg.center if self.config == "cubeviz" and data.ndim > 2: ycenter, xcenter = w.world_to_pixel(self._cube_wave, sky_center)[1] else: # "imviz" xcenter, ycenter = w.world_to_pixel(sky_center) else: xcenter = reg.center.x ycenter = reg.center.y if data.coords is not None: if self.config == "cubeviz" and data.ndim > 2: sky_center = w.pixel_to_world(self._cubeviz_slice_ind, ycenter, xcenter)[1] else: # "imviz" sky_center = w.pixel_to_world(xcenter, ycenter) else: sky_center = None aperture = regions2aperture(reg) include_pixarea_fac = False include_counts_fac = False include_flux_scale = False if comp.units: img_unit = u.Unit(comp.units) bg = bg * img_unit comp_data = comp_data << img_unit if u.sr in img_unit.bases: # TODO: Better way to detect surface brightness unit? try: pixarea = float(pixel_area if pixel_area is not None else self.pixel_area) except ValueError: # Clearer error message raise ValueError('Missing or invalid pixel area') if not np.allclose(pixarea, 0): include_pixarea_fac = True if img_unit != u.count: try: ctfac = float(counts_factor if counts_factor is not None else self.counts_factor) # noqa except ValueError: # Clearer error message raise ValueError('Missing or invalid counts conversion factor') if not np.allclose(ctfac, 0): include_counts_fac = True try: flux_scale = float(flux_scaling if flux_scaling is not None else self.flux_scaling) except ValueError: # Clearer error message raise ValueError('Missing or invalid flux scaling') if not np.allclose(flux_scale, 0): include_flux_scale = True phot_aperstats = ApertureStats(comp_data, aperture, wcs=data.coords, local_bkg=bg) phot_table = phot_aperstats.to_table(columns=( 'id', 'sum', 'sum_aper_area', 'min', 'max', 'mean', 'median', 'mode', 'std', 'mad_std', 'var', 'biweight_location', 'biweight_midvariance', 'fwhm', 'semimajor_sigma', 'semiminor_sigma', 'orientation', 'eccentricity')) # Some cols excluded, add back as needed. # noqa rawsum = phot_table['sum'][0] if include_pixarea_fac: pixarea = pixarea * (u.arcsec * u.arcsec / (u.pix * u.pix)) # NOTE: Sum already has npix value encoded, so we simply apply the npix unit here. pixarea_fac = (u.pix * u.pix) * pixarea.to(u.sr / (u.pix * u.pix)) phot_table['sum'] = [rawsum * pixarea_fac] else: pixarea_fac = None if include_counts_fac: ctfac = ctfac * (rawsum.unit / u.count) sum_ct = rawsum / ctfac sum_ct_err = np.sqrt(sum_ct.value) * sum_ct.unit else: ctfac = None sum_ct = None sum_ct_err = None if include_flux_scale: flux_scale = flux_scale * phot_table['sum'][0].unit sum_mag = -2.5 * np.log10(phot_table['sum'][0] / flux_scale) * u.mag else: flux_scale = None sum_mag = None # Extra info beyond photutils. phot_table.add_columns( [xcenter * u.pix, ycenter * u.pix, sky_center, bg, pixarea_fac, sum_ct, sum_ct_err, ctfac, sum_mag, flux_scale, data.label, reg.meta.get('label', ''), Time(datetime.now(tz=timezone.utc))], names=['xcenter', 'ycenter', 'sky_center', 'background', 'pixarea_tot', 'aperture_sum_counts', 'aperture_sum_counts_err', 'counts_fac', 'aperture_sum_mag', 'flux_scaling', 'data_label', 'subset_label', 'timestamp'], indexes=[1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 18, 18, 18]) if self.config == "cubeviz": if data.ndim > 2: slice_val = self._cube_wave else: slice_val = u.Quantity(np.nan, self._cube_wave.unit) phot_table.add_column(slice_val, name="slice_wave", index=29) if add_to_table: try: phot_table['id'][0] = self.table._qtable['id'].max() + 1 self.table.add_item(phot_table) except Exception: # Discard incompatible QTable self.table.clear_table() phot_table['id'][0] = 1 self.table.add_item(phot_table) # User wants 'sum' as scientific notation. self.table._qtable['sum'].info.format = '.6e' # Plots. if update_plots: if self.current_plot_type == "Curve of Growth": if self.config == "cubeviz" and data.ndim > 2: self.plot.figure.title = f'Curve of growth from aperture center at {slice_val:.4e}' # noqa: E501 else: self.plot.figure.title = 'Curve of growth from aperture center' x_arr, sum_arr, x_label, y_label = _curve_of_growth( comp_data, (xcenter, ycenter), aperture, phot_table['sum'][0], wcs=data.coords, background=bg, pixarea_fac=pixarea_fac) self.plot._update_data('profile', x=x_arr, y=sum_arr, reset_lims=True) self.plot.update_style('profile', line_visible=True, color='gray', size=32) self.plot.update_style('fit', visible=False) self.plot.figure.axes[0].label = x_label self.plot.figure.axes[1].label = y_label else: # Radial profile self.plot.figure.axes[0].label = 'pix' self.plot.figure.axes[1].label = comp.units or 'Value' if self.current_plot_type == "Radial Profile": if self.config == "cubeviz" and data.ndim > 2: self.plot.figure.title = f'Radial profile from aperture center at {slice_val:.4e}' # noqa: E501 else: self.plot.figure.title = 'Radial profile from aperture center' x_data, y_data = _radial_profile( phot_aperstats.data_cutout, phot_aperstats.bbox, (xcenter, ycenter), raw=False) self.plot._update_data('profile', x=x_data, y=y_data, reset_lims=True) self.plot.update_style('profile', line_visible=True, color='gray', size=32) else: # Radial Profile (Raw) if self.config == "cubeviz" and data.ndim > 2: self.plot.figure.title = f'Raw radial profile from aperture center at {slice_val:.4e}' # noqa: E501 else: self.plot.figure.title = 'Raw radial profile from aperture center' x_data, y_data = _radial_profile( phot_aperstats.data_cutout, phot_aperstats.bbox, (xcenter, ycenter), raw=True) self.plot._update_data('profile', x=x_data, y=y_data, reset_lims=True) self.plot.update_style('profile', line_visible=False, color='gray', size=10) # Fit Gaussian1D to radial profile data. if self.fit_radial_profile: fitter = LevMarLSQFitter() y_max = y_data.max() x_mean = x_data[np.where(y_data == y_max)].mean() std = 0.5 * (phot_table['semimajor_sigma'][0] + phot_table['semiminor_sigma'][0]) if isinstance(std, u.Quantity): std = std.value gs = Gaussian1D(amplitude=y_max, mean=x_mean, stddev=std, fixed={'amplitude': True}, bounds={'amplitude': (y_max * 0.5, y_max)}) if ASTROPY_LT_5_2: fitter_kw = {} else: fitter_kw = {'filter_non_finite': True} with warnings.catch_warnings(record=True) as warns: fit_model = fitter(gs, x_data, y_data, **fitter_kw) if len(warns) > 0: msg = os.linesep.join([str(w.message) for w in warns]) self.hub.broadcast(SnackbarMessage( f"Radial profile fitting: {msg}", color='warning', sender=self)) y_fit = fit_model(x_data) self.app.fitted_models[self._fitted_model_name] = fit_model self.plot._update_data('fit', x=x_data, y=y_fit, reset_lims=True) self.plot.update_style('fit', color='magenta', markers_visible=False, line_visible=True) else: self.plot.update_style('fit', visible=False) # Parse results for GUI. tmp = [] for key in phot_table.colnames: if key in ('id', 'data_label', 'subset_label', 'background', 'pixarea_tot', 'counts_fac', 'aperture_sum_counts_err', 'flux_scaling', 'timestamp'): continue x = phot_table[key][0] if (isinstance(x, (int, float, u.Quantity)) and key not in ('xcenter', 'ycenter', 'sky_center', 'sum_aper_area', 'aperture_sum_counts', 'aperture_sum_mag', 'slice_wave')): tmp.append({'function': key, 'result': f'{x:.4e}'}) elif key == 'sky_center' and x is not None: tmp.append({'function': 'RA center', 'result': f'{x.ra.deg:.6f} deg'}) tmp.append({'function': 'Dec center', 'result': f'{x.dec.deg:.6f} deg'}) elif key in ('xcenter', 'ycenter', 'sum_aper_area'): tmp.append({'function': key, 'result': f'{x:.1f}'}) elif key == 'aperture_sum_counts' and x is not None: tmp.append({'function': key, 'result': f'{x:.4e} ({phot_table["aperture_sum_counts_err"][0]:.4e})'}) elif key == 'aperture_sum_mag' and x is not None: tmp.append({'function': key, 'result': f'{x:.3f}'}) elif key == 'slice_wave': if data.ndim > 2: tmp.append({'function': key, 'result': f'{slice_val:.4e}'}) else: tmp.append({'function': key, 'result': str(x)}) if update_plots: # Also display fit results fit_tmp = [] if fit_model is not None and isinstance(fit_model, Gaussian1D): for param in ('mean', 'fwhm', 'amplitude'): p_val = getattr(fit_model, param) if isinstance(p_val, Parameter): p_val = p_val.value fit_tmp.append({'function': param, 'result': f'{p_val:.4e}'}) self.results = tmp self.result_available = True if update_plots: self.fit_results = fit_tmp self.plot_available = True return phot_table, fit_model
[docs] def vue_do_aper_phot(self, *args, **kwargs): if self.dataset_selected == '' or self.aperture_selected == '': self.hub.broadcast(SnackbarMessage( "No data for aperture photometry", color='error', sender=self)) return try: if self.multiselect: # even though plots aren't show in the UI when in multiselect mode, # we'll create the last entry so if multiselect is disabled, the last # iteration will show and not result in confusing behavior self.calculate_batch_photometry(add_to_table=True, update_plots=True) else: self.calculate_photometry(add_to_table=True, update_plots=True) except Exception as e: # pragma: no cover self.plot.clear_all_marks() msg = f"Aperture photometry failed: {repr(e)}" self.hub.broadcast(SnackbarMessage(msg, color='error', sender=self)) self.result_failed_msg = msg else: self.result_failed_msg = ''
[docs] def unpack_batch_options(self, **options): """ Unpacks a dictionary of options for batch mode, including all combinations of any values passed as tuples or lists. For example:: unpack_batch_options(dataset=['image1', 'image2'], aperture=['Subset 1', 'Subset 2'], background=['Subset 3'], flux_scaling=3 ) would result in:: [{'aperture': 'Subset 1', 'dataset': 'image1', 'background': 'Subset 3', 'flux_scaling': 3}, {'aperture': 'Subset 2', 'dataset': 'image1', 'background': 'Subset 3', 'flux_scaling': 3}, {'aperture': 'Subset 1', 'dataset': 'image2', 'background': 'Subset 3', 'flux_scaling': 3}, {'aperture': 'Subset 2', 'dataset': 'image2', 'background': 'Subset 3', 'flux_scaling': 3}] Parameters ---------- options : dict, optional Dictionary of values to override from the values set in the plugin/traitlets. Each entry can either be a single value, or a list. All combinations of those that contain a list will be exposed. If not provided and the plugin is in multiselect mode (``multiselect = True``), then the current values set in the plugin will be used. Returns ------- options : list List of all combinations of input parameters, which can then be used as input to `calculate_batch_photometry`. """ if not isinstance(options, dict): raise TypeError("options must be a dictionary") if not options: if not self.multiselect: # pragma: no cover raise ValueError("must either provide a dictionary or set plugin to multiselect mode") # noqa options = {'dataset': self.dataset.selected, 'aperture': self.aperture.selected} # TODO: use self.user_api once API is made public user_api = self # .user_api invalid_keys = [k for k in options.keys() if not hasattr(user_api, k)] if len(invalid_keys): raise ValueError(f"{invalid_keys} are not valid inputs for batch photometry") def _is_single(v): if isinstance(v, (list, tuple)): if len(v) == 1: return True, v[0] return False, v return True, v single_values, mult_values = {}, {} for k, v in options.items(): is_single, this_value = _is_single(v) if is_single: single_values[k] = this_value else: mult_values[k] = this_value def _unpack_dict_list(mult_values, single_values): if not len(mult_values): return [single_values] options_list = [] # loop over the first item in mult_values # any remaining mult values will require recursion this_attr, this_values = list(mult_values.items())[0] remaining_mult_values = {k: v for j, (k, v) in enumerate(mult_values.items()) if j > 0} for this_value in this_values: if not len(remaining_mult_values): options_list += [{this_attr: this_value, **single_values}] continue options_list += _unpack_dict_list(remaining_mult_values, {this_attr: this_value, **single_values}) return options_list return _unpack_dict_list(mult_values, single_values)
@with_spinner() def calculate_batch_photometry(self, options=[], add_to_table=True, update_plots=True, full_exceptions=False): """ Run aperture photometry over a list of options. Unprovided options will remain at their values defined in the plugin. To provide a list of values per-input, use `unpack_batch_options` to and pass that as input here. Parameters ---------- options : list Each entry will result in one computation of aperture photometry and should be a dictionary of values to override from the values set in the plugin/traitlets. add_to_table : bool Whether to add results to the plugin table. update_plots : bool Whether to update the plugin plots for the last iteration. full_exceptions : bool, optional Whether to expose the full exception message for all failed iterations. """ # input validation if not isinstance(options, list): raise TypeError("options must be a list of dictionaries") if not np.all([isinstance(option, dict) for option in options]): raise TypeError("options must be a list of dictionaries") if not len(options): if not self.multiselect: # pragma: no cover raise ValueError("must either provide manual options or put the plugin in multiselect mode") # noqa # unpack the batch options as provided in the app options = self.unpack_batch_options() failed_iters, exceptions = [], [] for i, option in enumerate(options): # only update plots on the last iteration this_update_plots = i == len(options) and update_plots defaults = self._get_defaults_from_metadata(option.get('dataset', self.dataset.selected)) if self.pixel_area_multi_auto: option.setdefault('pixel_area', defaults.get('pixel_area', 0)) if self.flux_scaling_multi_auto: option.setdefault('flux_scaling', defaults.get('flux_scaling', 0)) try: self.calculate_photometry(add_to_table=add_to_table, update_plots=this_update_plots, **option) except Exception as e: failed_iters.append(i) if full_exceptions: exceptions.append(e) if len(failed_iters): err_msg = f"inputs {failed_iters} failed and were skipped." if full_exceptions: err_msg += f" Exception messages: {exceptions}" else: err_msg += " To see full exceptions, run individually or pass full_exceptions=True" # noqa raise RuntimeError(err_msg)
# NOTE: These are hidden because the APIs are for internal use only # but we need them as a separate functions for unit testing. def _radial_profile(radial_cutout, reg_bb, centroid, raw=False): """Calculate radial profile. Parameters ---------- radial_cutout : ndarray Cutout image from ``ApertureStats``. reg_bb : obj Bounding box from ``ApertureStats``. centroid : tuple of int ``ApertureStats`` centroid or desired center in ``(x, y)``. raw : bool If `True`, returns raw data points for scatter plot. Otherwise, use ``imexam`` algorithm for a clean plot. """ reg_ogrid = np.ogrid[reg_bb.iymin:reg_bb.iymax, reg_bb.ixmin:reg_bb.ixmax] radial_dx = reg_ogrid[1] - centroid[0] radial_dy = reg_ogrid[0] - centroid[1] radial_r = np.hypot(radial_dx, radial_dy) # Sometimes the mask is smaller than radial_r if radial_cutout.shape != reg_bb.shape: radial_r = radial_r[:radial_cutout.shape[0], :radial_cutout.shape[1]] radial_r = radial_r[~radial_cutout.mask].ravel() # pix radial_img = radial_cutout.compressed() # data unit if raw: i_arr = np.argsort(radial_r) x_arr = radial_r[i_arr] y_arr = radial_img[i_arr] else: # This algorithm is from the imexam package, # see licenses/IMEXAM_LICENSE.txt for more details radial_r = np.rint(radial_r).astype(int) y_arr = np.bincount(radial_r, radial_img) / np.bincount(radial_r) x_arr = np.arange(y_arr.size) return x_arr, y_arr def _curve_of_growth(data, centroid, aperture, final_sum, wcs=None, background=0, n_datapoints=10, pixarea_fac=None): """Calculate curve of growth for aperture photometry. Parameters ---------- data : ndarray or `~astropy.units.Quantity` Data for the calculation. centroid : tuple of int ``ApertureStats`` centroid or desired center in ``(x, y)``. aperture : obj ``photutils`` aperture to use, except its center will be changed to the given ``centroid``. This is because the aperture might be hand-drawn and a more accurate centroid has been recalculated separately. final_sum : float or `~astropy.units.Quantity` Aperture sum that is already calculated in the main plugin above. wcs : obj or `None` Supported WCS objects or `None`. background : float or `~astropy.units.Quantity` Background to subtract, if any. Unit must match ``data``. n_datapoints : int Number of data points in the curve. pixarea_fac : float or `None` For ``flux_unit/sr`` to ``flux_unit`` conversion. Returns ------- x_arr : ndarray Data for X-axis of the curve. sum_arr : ndarray or `~astropy.units.Quantity` Data for Y-axis of the curve. x_label, y_label : str X- and Y-axis labels, respectively. Raises ------ TypeError Unsupported aperture. """ n_datapoints += 1 # n + 1 if hasattr(aperture, 'to_pixel'): aperture = aperture.to_pixel(wcs) if isinstance(aperture, CircularAperture): x_label = 'Radius (pix)' x_arr = np.linspace(0, aperture.r, num=n_datapoints)[1:] aper_list = [CircularAperture(centroid, cur_r) for cur_r in x_arr[:-1]] elif isinstance(aperture, EllipticalAperture): x_label = 'Semimajor axis (pix)' x_arr = np.linspace(0, aperture.a, num=n_datapoints)[1:] a_arr = x_arr[:-1] b_arr = aperture.b * a_arr / aperture.a aper_list = [EllipticalAperture(centroid, cur_a, cur_b, theta=aperture.theta) for (cur_a, cur_b) in zip(a_arr, b_arr)] elif isinstance(aperture, RectangularAperture): x_label = 'Width (pix)' x_arr = np.linspace(0, aperture.w, num=n_datapoints)[1:] w_arr = x_arr[:-1] h_arr = aperture.h * w_arr / aperture.w aper_list = [RectangularAperture(centroid, cur_w, cur_h, theta=aperture.theta) for (cur_w, cur_h) in zip(w_arr, h_arr)] else: raise TypeError(f'Unsupported aperture: {aperture}') sum_arr = [ApertureStats(data, cur_aper, wcs=wcs, local_bkg=background).sum for cur_aper in aper_list] if isinstance(sum_arr[0], u.Quantity): sum_arr = u.Quantity(sum_arr) else: sum_arr = np.array(sum_arr) if pixarea_fac is not None: sum_arr = sum_arr * pixarea_fac sum_arr = np.append(sum_arr, final_sum) if isinstance(sum_arr, u.Quantity): y_label = sum_arr.unit.to_string() sum_arr = sum_arr.value # bqplot does not like Quantity else: y_label = 'Value' return x_arr, sum_arr, x_label, y_label