Source code for jdaviz.configs.imviz.plugins.parsers

import os

import asdf
import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.nddata import NDData
from astropy.wcs import WCS
from astropy.utils.data import cache_contents

from glue.core.data import Component, Data
from gwcs.wcs import WCS as GWCS
from stdatamodels import asdf_in_fits

from jdaviz.core.registries import data_parser_registry
from jdaviz.core.events import SnackbarMessage
from jdaviz.utils import standardize_metadata, PRIHDR_KEY, _wcs_only_label

try:
    from roman_datamodels import datamodels as rdd
except ImportError:
    HAS_ROMAN_DATAMODELS = False
else:
    HAS_ROMAN_DATAMODELS = True

__all__ = ['parse_data']

INFO_MSG = ("The file contains more viewable extensions. Add the '[*]' suffix"
            " to the file name to load all of them.")


def prep_data_layer_as_dq(data, component_id='DQ'):
    # nans are used to mark "good" flags in the DQ colormap, so
    # convert DQ array to float to support nans:
    cid = data.get_component(component_id)
    data_arr = np.float32(cid.data)
    data_arr[data_arr == 0] = np.nan
    data.update_components({cid: data_arr})
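
# Illustrative sketch of the transform above (hypothetical array values):
# zero ("good") DQ flags become NaN so the DQ colormap leaves them blank.
#
#     >>> dq = np.array([[0, 1], [2, 0]])
#     >>> arr = np.float32(dq)
#     >>> arr[arr == 0] = np.nan
#     >>> arr
#     array([[nan,  1.],
#            [ 2., nan]], dtype=float32)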


@data_parser_registry("imviz-data-parser")
def parse_data(app, file_obj, ext=None, data_label=None, parent=None):
    """Parse a data file into Imviz.

    Parameters
    ----------
    app : `~jdaviz.app.Application`
        The application-level object used to reference the viewers.

    file_obj : str or obj
        The path to an image data file or FITS HDUList or image object.

    ext : str, tuple, or `None`, optional
        FITS extension, if given. Examples: ``'SCI'`` or ``('SCI', 1)``

    data_label : str, optional
        The label to be applied to the Glue data component.

    """
    if isinstance(file_obj, str):
        if data_label is None:
            data_label = os.path.splitext(os.path.basename(file_obj))[0]

        # If file_obj is a path to a cached file from
        # astropy.utils.data.download_file, the path has no file extension.
        # Here we check if the file is in the download cache, and if it is,
        # we look up the file extension from the source URL:
        if file_obj.endswith('contents'):
            path_to_url_mapping = {v: k for k, v in cache_contents().items() if file_obj}
            source_url = path_to_url_mapping[file_obj]
            # file_obj_lower is only used for checking extensions,
            # file_obj is passed for parsing and is not modified here:
            file_obj_lower = source_url.split('/')[-1].lower()
        else:
            file_obj_lower = file_obj.lower()

        if file_obj_lower.endswith(('.jpg', '.jpeg', '.png')):
            from skimage.io import imread
            from skimage.color import rgb2gray, rgba2rgb
            im = imread(file_obj)
            if im.shape[2] == 4:
                pf = rgb2gray(rgba2rgb(im))
            else:  # Assume RGB
                pf = rgb2gray(im)
            pf = pf[::-1, :]  # Flip it
            _parse_image(app, pf, data_label, ext=ext, parent=parent)
        elif file_obj_lower.endswith('.asdf'):
            try:
                if HAS_ROMAN_DATAMODELS:
                    with rdd.open(file_obj) as pf:
                        _parse_image(app, pf, data_label, ext=ext, parent=parent)
            except TypeError:
                # if roman_datamodels cannot parse the file, load it with asdf:
                with asdf.open(file_obj) as af:
                    _parse_image(app, af, data_label, ext=ext, parent=parent)
        elif file_obj_lower.endswith('.reg'):
            # This will load DS9 regions as Subset but only if there is already data.
            app._jdaviz_helper.load_regions_from_file(file_obj)
        else:  # Assume FITS
            with fits.open(file_obj) as pf:
                available_extensions = [hdu.name for hdu in pf]

                # if FITS file contains SCI and DQ extensions, assume the
                # parent for the DQ is SCI:
                if (
                    'SCI' in available_extensions and
                    ext == 'DQ' and parent is None
                ):
                    loaded_data_labels = [data.label for data in app.data_collection]
                    latest_sci_extension = [label for label in loaded_data_labels
                                            if label.endswith('[DATA]')][-1]
                    parent = latest_sci_extension

                _parse_image(app, pf, data_label, ext=ext, parent=parent)
    else:
        _parse_image(app, file_obj, data_label, ext=ext, parent=parent)
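

# Illustrative usage sketch (not part of this module): the registered parser
# above is what runs when image data is loaded through the Imviz helper, e.g.
# (hypothetical file name; ``ext`` and ``data_label`` are forwarded to
# ``parse_data``):
#
#     from jdaviz import Imviz
#     imviz = Imviz()
#     imviz.load_data('my_image.fits', ext='SCI', data_label='my_image')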


def get_image_data_iterator(app, file_obj, data_label, ext=None):
    """This function is for internal use, so other viz can also extract image data
    like Imviz does.
    """
    if isinstance(file_obj, fits.HDUList):
        if 'ASDF' in file_obj:  # JWST ASDF-in-FITS
            # Load multiple extensions
            if ext == '*' or isinstance(ext, (tuple, list)):
                data_iter = _jwst_all_to_glue_data(file_obj, data_label, load_extensions=ext)
            # Load only specified extension
            else:
                data_iter = _jwst_to_glue_data(file_obj, ext, data_label)

                # if more than one viewable extension is found in the file,
                # issue info message.
                _info_nextensions(app, file_obj)

        elif ext == '*' or isinstance(ext, (tuple, list)):  # Load multiple extensions
            data_iter = _hdus_to_glue_data(file_obj, data_label, ext=ext)

        elif ext is not None:  # Load just the EXT user wants
            hdu = file_obj[ext]
            _validate_fits_image2d(hdu)
            data_iter = _hdu_to_glue_data(hdu, data_label, hdulist=file_obj)

        else:  # Load first image extension found
            found = False
            for hdu in file_obj:
                if _validate_fits_image2d(hdu, raise_error=False):
                    data_iter = _hdu_to_glue_data(hdu, data_label, hdulist=file_obj)
                    found = True

                    # if more than one viewable extension is found in the file,
                    # issue info message.
                    _info_nextensions(app, file_obj)
                    break

            if not found:
                raise ValueError(f'{file_obj} does not have any FITS image')

    elif isinstance(file_obj, (fits.ImageHDU, fits.CompImageHDU, fits.PrimaryHDU)):
        # NOTE: ext is not used here. It only means something if HDUList is given.
        _validate_fits_image2d(file_obj)
        data_iter = _hdu_to_glue_data(file_obj, data_label)

    elif isinstance(file_obj, NDData):
        if file_obj.meta.get(_wcs_only_label, False):
            data_iter = _wcsonly_to_glue_data(file_obj, data_label)
        else:
            data_iter = _nddata_to_glue_data(file_obj, data_label)

    elif isinstance(file_obj, np.ndarray):
        data_iter = _ndarray_to_glue_data(file_obj, data_label)

    # load Roman 2D datamodels:
    elif HAS_ROMAN_DATAMODELS and isinstance(file_obj, rdd.DataModel):
        data_iter = _roman_2d_to_glue_data(file_obj, data_label, ext=ext)

    # load ASDF files that may not validate as Roman datamodels:
    elif isinstance(file_obj, asdf.AsdfFile):
        data_iter = _roman_asdf_2d_to_glue_data(file_obj, data_label, ext=ext)

    else:
        raise NotImplementedError(f'Imviz does not support {file_obj}')

    return data_iter
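

# Illustrative usage sketch (not part of this module): per the docstring above,
# other viz configs can reuse this iterator to extract 2D image layers from a
# supported object (hypothetical file name and ``app`` instance):
#
#     with fits.open('my_image.fits') as pf:
#         for data, label in get_image_data_iterator(app, pf, 'my_image', ext='SCI'):
#             app.add_data(data, label)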


def _parse_image(app, file_obj, data_label, ext=None, parent=None):
    if app is None:
        raise ValueError("app is None, cannot proceed")
    if data_label is None:
        data_label = app.return_data_label(file_obj, ext, alt_name="image_data")
    data_iter = get_image_data_iterator(app, file_obj, data_label, ext=ext)

    # Track the data label of the SCI extension:
    sci_ext = None

    for data, data_label in data_iter:

        # if the science extension hasn't been identified yet, do so here:
        if sci_ext is None and data_label.endswith(('[DATA]', '[SCI]')):
            sci_ext = data_label

        if isinstance(data.coords, GWCS) and (data.coords.bounding_box is not None):
            # keep a copy of the original bounding box so we can detect
            # when extrapolating beyond, but then remove the bounding box
            # so that image layers are not cropped.
            # NOTE: if extending this beyond GWCS, the mouseover logic
            # for outside_*_bounding_box should also be updated.
            data.coords._orig_bounding_box = data.coords.bounding_box
            data.coords.bounding_box = None

        if not data.meta.get(_wcs_only_label, False):
            data_label = app.return_data_label(data_label, alt_name="image_data")

        # TODO: generalize/centralize this for use in other configs too
        if data_label.endswith('[DQ]'):
            prep_data_layer_as_dq(data)

        if parent is not None:
            parent_data_label = parent
        elif data_label.endswith('[DQ]'):
            parent_data_label = sci_ext
        else:
            parent_data_label = None

        app.add_data(data, data_label, parent=parent_data_label)

    # Do not link image data here. We do it at the end in Imviz.load_data()


def _info_nextensions(app, file_obj):
    if _count_image2d_extensions(file_obj) > 1:
        info_msg = SnackbarMessage(INFO_MSG, color="info", timeout=8000, sender=app)
        app.hub.broadcast(info_msg)


def _count_image2d_extensions(file_obj):
    count = 0
    for hdu in file_obj:
        if _validate_fits_image2d(hdu, raise_error=False):
            count += 1
    return count


def _validate_fits_image2d(hdu, raise_error=True):
    valid = hdu.data is not None and hdu.is_image and hdu.data.ndim == 2
    if not valid and raise_error:
        if hdu.data is not None and hdu.is_image:
            ndim = hdu.data.ndim
        else:
            ndim = None
        raise ValueError(
            f'Imviz cannot load this HDU ({hdu}): '
            f'has_data={hdu.data is not None}, is_image={hdu.is_image}, '
            f'name={hdu.name}, ver={hdu.ver}, ndim={ndim}')
    return valid


def _validate_bunit(bunit, raise_error=True):
    # TODO: Handle weird FITS BUNIT values here, as needed.
    if bunit == 'ELECTRONS/S':
        valid = 'electron/s'
    elif bunit == 'ELECTRONS':
        valid = 'electron'
    else:
        try:
            u.Unit(bunit)
        except Exception:
            if raise_error:
                raise
            valid = ''
        else:
            valid = bunit
    return valid
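

# For example (illustrative values), ``_validate_bunit`` maps a couple of
# known non-standard FITS BUNIT strings to astropy-parsable units and, with
# ``raise_error=False``, returns an empty string for anything unparsable:
#
#     >>> _validate_bunit('ELECTRONS/S')
#     'electron/s'
#     >>> _validate_bunit('not-a-unit', raise_error=False)
#     ''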


# ---- Functions that handle input from JWST FITS files -----

def _jwst_all_to_glue_data(file_obj, data_label, load_extensions='*'):
    for hdu in file_obj:
        if (
            _validate_fits_image2d(hdu, raise_error=False) and
            (load_extensions == '*' or hdu.name in load_extensions)
        ):

            ext = hdu.name.lower()
            if ext == 'sci':
                ext = 'data'

            data, new_data_label = _jwst2data(file_obj, ext, data_label)

            yield data, new_data_label


def _jwst_to_glue_data(file_obj, ext, data_label):
    # Translate FITS extension into JWST ASDF convention.
    if ext is None:
        ext = 'data'
        fits_ext = 'sci'
    else:
        if isinstance(ext, tuple):
            ext = ext[0]  # EXTVER means nothing in ASDF
        ext = ext.lower()
        if ext in ('sci', 'asdf'):
            ext = 'data'
            fits_ext = 'sci'
        else:
            fits_ext = ext

    _validate_fits_image2d(file_obj[fits_ext])

    data, new_data_label = _jwst2data(file_obj, ext, data_label)

    yield data, new_data_label


def _jwst2data(file_obj, ext, data_label):
    comp_label = ext.upper()
    new_data_label = f'{data_label}[{comp_label}]'
    data = Data(label=new_data_label)
    unit_attr = f'bunit_{ext}'

    try:
        # This is very specific to JWST pipeline image output.
        with asdf_in_fits.open(file_obj) as af:
            dm = af.tree
            dm_meta = af.tree["meta"]
            data.meta.update(standardize_metadata(dm_meta))

            if unit_attr in dm_meta:
                bunit = _validate_bunit(dm_meta[unit_attr], raise_error=False)
            else:
                bunit = ''

            # This is instance of gwcs.WCS, not astropy.wcs.WCS
            if 'wcs' in dm_meta:
                data.coords = dm_meta['wcs']

            imdata = dm[ext]
            component = Component.autotyped(imdata, units=bunit)

            # Might have bad GWCS. If so, we exclude it.
            try:
                data.add_component(component=component, label=comp_label)
            except Exception:  # pragma: no cover
                data.coords = None
                data.add_component(component=component, label=comp_label)

    # TODO: Do not need this when jwst.datamodels is finally its own package.
    # This might happen for grism image; fall back to FITS loader without WCS.
    except Exception:
        if ext == 'data':
            ext = 'sci'
        hdu = file_obj[ext]
        return _hdu2data(hdu, data_label, file_obj, include_wcs=False)

    return data, new_data_label


# ---- Functions that handle input from Roman ASDF files -----

def _roman_2d_to_glue_data(file_obj, data_label, ext=None):
    if ext == '*':
        # NOTE: Update as needed. Should cover all the image extensions available.
        ext_list = ('data', 'dq', 'err', 'var_poisson', 'var_rnoise')
    elif ext is None:
        ext_list = ('data', )
    elif isinstance(ext, (list, tuple)):
        ext_list = ext
    else:
        ext_list = (ext, )

    meta = getattr(file_obj, 'meta', {})
    coords = getattr(meta, 'wcs', None)

    for cur_ext in ext_list:
        comp_label = cur_ext.upper()
        new_data_label = f'{data_label}[{comp_label}]'
        data = Data(coords=coords, label=new_data_label)

        # This could be a quantity or a ndarray:
        ext_values = getattr(file_obj, cur_ext)
        bunit = getattr(ext_values, 'unit', '')
        component = Component.autotyped(np.array(ext_values), units=bunit)
        data.add_component(component=component, label=comp_label)
        data.meta.update(standardize_metadata(dict(meta)))
        if comp_label == 'DQ':
            prep_data_layer_as_dq(data, component_id=comp_label)
        yield data, new_data_label


def _roman_asdf_2d_to_glue_data(file_obj, data_label, ext=None):
    if ext == '*':
        # NOTE: Update as needed. Should cover all the image extensions available.
        ext_list = ('data', 'dq', 'err', 'var_poisson', 'var_rnoise')
    elif ext is None:
        ext_list = ('data', )
    elif isinstance(ext, (list, tuple)):
        ext_list = ext
    else:
        ext_list = (ext, )

    roman = file_obj.tree.get('roman')
    meta = roman.get('meta', {})
    coords = meta.get('wcs', None)

    for cur_ext in ext_list:
        if cur_ext in roman:
            comp_label = cur_ext.upper()
            new_data_label = f'{data_label}[{comp_label}]'
            data = Data(coords=coords, label=new_data_label)

            # This could be a quantity or a ndarray:
            ext_values = roman.get(cur_ext)
            bunit = getattr(ext_values, 'unit', '')
            component = Component(np.array(ext_values), units=bunit)
            data.add_component(component=component, label=comp_label)
            data.meta.update(standardize_metadata(dict(meta)))
            if comp_label == 'DQ':
                prep_data_layer_as_dq(data, component_id=comp_label)

            yield data, new_data_label


# ---- Functions that handle input from non-JWST, non-Roman FITS files -----

def _hdu_to_glue_data(hdu, data_label, hdulist=None):
    data, data_label = _hdu2data(hdu, data_label, hdulist)
    yield data, data_label


def _hdus_to_glue_data(file_obj, data_label, ext=None):
    for hdu in file_obj:
        if ext is None or ext == '*' or hdu.name in ext:
            if _validate_fits_image2d(hdu, raise_error=False):
                data, new_data_label = _hdu2data(hdu, data_label, file_obj)
                yield data, new_data_label


def _hdu2data(hdu, data_label, hdulist, include_wcs=True):
    if 'BUNIT' in hdu.header:
        bunit = _validate_bunit(hdu.header['BUNIT'], raise_error=False)
    else:
        bunit = ''

    comp_label = f'{hdu.name.upper()},{hdu.ver}'
    new_data_label = f'{data_label}[{comp_label}]'

    data = Data(label=new_data_label)
    if hdulist is not None and hdu.name != 'PRIMARY' and 'PRIMARY' in hdulist:
        data.meta[PRIHDR_KEY] = standardize_metadata(hdulist['PRIMARY'].header)
    data.meta.update(standardize_metadata(hdu.header))
    if include_wcs:
        data.coords = WCS(hdu.header, hdulist)
    component = Component.autotyped(hdu.data, units=bunit)
    data.add_component(component=component, label=comp_label)

    return data, new_data_label


# ---- Functions that handle input from arrays -----

def _nddata_to_glue_data(ndd, data_label):
    if ndd.data.ndim != 2:
        raise ValueError(f'Imviz cannot load this NDData with ndim={ndd.data.ndim}')

    for attrib in ('data', 'mask', 'uncertainty'):
        arr = getattr(ndd, attrib)
        if arr is None:
            continue
        comp_label = attrib.upper()
        cur_label = f'{data_label}[{comp_label}]'
        cur_data = Data(label=cur_label)
        cur_data.meta.update(standardize_metadata(ndd.meta))
        if ndd.wcs is not None:
            cur_data.coords = ndd.wcs
        raw_arr = arr
        if attrib == 'data':
            bunit = ndd.unit or ''
        elif attrib == 'uncertainty':
            raw_arr = arr.array
            bunit = arr.unit or ''
        else:
            bunit = ''
        component = Component.autotyped(raw_arr, units=bunit)
        cur_data.add_component(component=component, label=comp_label)
        yield cur_data, cur_label


def _ndarray_to_glue_data(arr, data_label):
    if arr.ndim != 2:
        raise ValueError(f'Imviz cannot load this array with ndim={arr.ndim}')

    data = Data(label=data_label)
    component = Component.autotyped(arr)
    data.add_component(component=component, label='DATA')
    yield (data, data_label)


# ---- Functions that handle WCS-only data -----

def _wcsonly_to_glue_data(ndd, data_label):
    """Return Data given NDData containing WCS-only data."""
    arr = ndd.data
    data = Data(label=data_label)
    data.meta.update(standardize_metadata(ndd.meta))
    data.coords = ndd.wcs
    component = Component.autotyped(arr, units="")
    data.add_component(component=component, label="DATA")
    yield (data, data_label)
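

# Illustrative sketch (hypothetical values): a "WCS-only" layer is an NDData
# whose metadata flags it via ``_wcs_only_label``; ``get_image_data_iterator``
# then routes it through ``_wcsonly_to_glue_data`` above instead of
# ``_nddata_to_glue_data``:
#
#     ndd = NDData(np.zeros((2, 2)), wcs=WCS(naxis=2),
#                  meta={_wcs_only_label: True})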