# Misc python library to support HETDEX software and data analysis
# Copyright (C) 2015, 2016, 2017 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Reconstruct the IFU
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import itertools as it
import os
import astropy
from astropy.io import fits
import astropy.stats as stats
import numpy as np
from pkg_resources import parse_version
from pyhetdex.tools.files.fits_tools import wavelength_to_index
from pyhetdex.het import ifu_centers, dither as dith
import pyhetdex.cure.distortion as distortion
[docs]class ReconstructError(Exception):
"""Generic reconstruction error"""
pass
[docs]class ReconstructIndexError(ReconstructError, IndexError):
"""Error for miss-matching the number of fibers in the ifu center files and
the fiber extracted ones"""
[docs]class ReconstructIOError(ReconstructError, IOError):
"""Error when the name and/or number fiber extracted file names is not
correct"""
pass
[docs]class ReconstructValueError(ReconstructError, ValueError):
"""Errors for wrong combinations of input parameters in
:class:`~ReconstructedIFU` or :meth:`~ReconstructedIFU.from_files`"""
pass
# TODO: account for illumination weights and fiber throughput
[docs]class ReconstructedIFU(object):
r"""Reconstructed IFU head image from the fiber extracted frames given the
``ifu_center`` and the ``dither``.
Parameters
----------
ifu_center : instance of :class:`~pyhetdex.het.ifu_centers.IFUCenter`
fiber number to fiber position mapping
dither : instance child of :class:`~pyhetdex.het.dither._BaseDither`
dither relative position, illumination, image quality
fextract : None or list of fits files, optional
* if None the list of files is inferred from
:attr:`dither.basename`;
* if not None must have ``ndither`` x ``nchannels`` elements. The
channel name and dither number are extracted from the ``CCDPOS`` and
the ``DITHER`` header keywords
fe_prefix : string, optional
when getting the names from the dither file, prepend ``fe_prefix`` to
the ``basename``
Attributes
----------
ifu_center
dither
x, y : 1-dimensional arrays
x and y position of the fibers
flux : list of 2-dimensional arrays
each element is the content of one fiber extracted file
header : list of dictionaries
each element contains the ``CRVAL1`` and ``CDELT1`` keywords :value
pairs from the headers of the fiber extracted files; used to determine
the wavelength range in :meth:`~reconstruct`
Raises
------
ReconstructValueError
if an empty dither is passed and ``fextract`` is ``None``
ReconstructIOError
if the number and/or number of fiber extracted frames is not correct;
raised by :meth:`~_fedict`
ReconstructIndexError
if the number of fibers from the fiber extracted files and from the ifu
center files do not match; raised by :meth:`~_reconstruct`
"""
def __init__(self, ifu_center, dither, fextract=None,
fe_prefix=""):
if isinstance(dither, dith.EmptyDither) and fextract is None:
msg = "With an empty dither file a fiber extract file name must be"
msg += " provided"
raise ReconstructValueError(msg)
self.ifu_center = ifu_center
self.dither = dither
# public attributes, filled in *_reconstruct*
self.x, self.y = np.array([[], []], dtype=float)
self.flux, self.header = [], []
self._fe_prefix = fe_prefix
# the fiber extracted file names are saved into a dictionary with key
# like: D1_L
self._key = "{d}_{ch}"
self._reconstruct(self._fedict(fextract))
[docs] @classmethod
def from_files(cls, ifu_center_file, dither_file=None, fextract=None,
fe_prefix=""):
"""
Read and parse the file
Parameters
----------
ifu_center_file : string
file containing the fiber number to fiber position mapping
dither_file : string, optional
file containing the dither relative position. If not given, a singe
dither is assumed
fextract : None or list of fits files, optional
if None the list of files is inferred from the second column
of the ``dither_file``;
if not None must have ``ndither`` x ``nchannels`` elements. The
channel name and dither number are extracted from the ``CCDPOS``
and the ``DITHER`` header keywords
fe_prefix : string, optional
when getting the names from the dither file, prepend ``fe_prefix``
to the ``basename``
Raises
------
ReconstructValueError
if both ``dither_file`` and ``fextract`` are ``None``
ReconstructValueError, ReconstructIOError
as in :class:`~ReconstructedIFU`
Notes
-----
if ``dither_file`` is None, ``fextract`` must contain a number of
files equal to the number of channels (2)
"""
if dither_file is None and fextract is None:
msg = "dither_file and/or fextract must be provided"
raise ReconstructValueError(msg)
_ifu_center = ifu_centers.IFUCenter(ifu_center_file)
if dither_file is None:
_dither = dith.EmptyDither()
else:
_dither = dith.ParseDither(dither_file)
return cls(_ifu_center, _dither, fextract=fextract,
fe_prefix=fe_prefix)
[docs] def _fedict(self, fextract):
"""
Organize the fiber extracted file names into a dictionary
Parameters
----------
fextract : None or list of string
If None get the file names from the dither, otherwise from the
*fextract* list
Returns
-------
dfextract : dict
dictionary of fiber extracted frames
"""
channels = self.ifu_center.channels
dithers = self.dither.dithers
n_shots = len(channels) * len(dithers)
dfextract = {}
if fextract is None: # get the file names from the dither file
fetemplate = os.path.join(self.dither.abspath,
self._fe_prefix + "{bn}_{ch}.fits")
for ch, (d, bn) in it.product(channels,
self.dither.basename.items()):
k = self._key.format(ch=ch, d=d)
dfextract[k] = fetemplate.format(ch=ch, bn=bn)
else: # the file names are provided
if len(fextract) != n_shots:
msg = "The number of fiber extracted files is different from"
msg += " {} ({} channels and {} dithers)"
raise ReconstructIOError(msg.format(n_shots, len(channels),
len(dithers)))
# as the fiber extracted files are ordered, pop elements. This way
# makes sure that all the dithers and channels are represented
all_keys = [self._key.format(d=d, ch=ch) for d in dithers for ch in
channels]
for fe in fextract:
header = fits.getheader(fe)
ch = header['CCDPOS'].strip()
d = "D{0:d}".format(header['DITHER'])
k = self._key.format(ch=ch, d=d)
# check that the files are correct
if k in dfextract:
msg = "The file '{}' is for the same dither".format(fe)
msg += " and channel as file '{}'".format(dfextract[k])
raise ReconstructIOError(msg)
else:
try:
all_keys.remove(k)
except ValueError:
msg = ("The file '{}' is for an unknown combination of"
" dither ({}) and channel ({}). One of {}"
" expected".format(fe, d, ch,
", ".join(all_keys)))
raise ReconstructIOError(msg)
dfextract[k] = fe
return dfextract
[docs] def _reconstruct(self, dfextract):
"""
Read the fiber extracted files and creates a set of three lists for x,
y and flux.
Parameters
----------
dfextract : dictionary
name of the fiber extracted file
"""
for ch, d in it.product(self.ifu_center.channels, self.dither.dithers):
# get the correct values of x and y
self.x = np.concatenate([self.x, self.xpos(ch, d)])
self.y = np.concatenate([self.y, self.ypos(ch, d)])
# read the fiber extracted file, order the fibers and save the
# necessary keys into self.h
# python starts from 0
fib_numbs = [fn - 1 for fn in self.ifu_center.fib_number[ch]]
k = self._key.format(ch=ch, d=d)
with fits.open(dfextract[k]) as hdu:
h = hdu[0].header
data = hdu[0].data
# if the number of fiber from the fiber extracted file is
# different from the number of fibers, something wrong
if data.shape[0] != len(fib_numbs):
msg = "The number of rows in file '{0}' ({1:d}) does not"
msg += " agree with the number of active fibers from file"
msg += " '{2}' ({3:d})"
msg = msg.format(hdu.filename(), data.shape[0],
self.ifu_center.filename, len(fib_numbs))
raise ReconstructIndexError(msg)
self.flux.append(data[fib_numbs, :]) # order the fibers
# get the header keywords needed to get the index at a given
# wavelength
self.header.append({k: h.get(k) for k in ["CRVAL1", "CDELT1"]})
[docs] def xpos(self, channel, dither):
"""get the position for the x *dither* in *channel*
Parameters
----------
channel : string
name of the channel [L, R]
dither : string
name of the dither [D1, D2, ..]
Returns
-------
ndarray
x position of the fibers for the given channel and dither
"""
return np.array(self.ifu_center.xifu[channel]) + self.dither.dx[dither]
[docs] def ypos(self, channel, dither):
"""get the position for the y *dither* in *channel*
Parameters
----------
channel : string
name of the channel [L, R]
dither : string
name of the dither [D1, D2, ..]
Returns
-------
ndarray
y position of the fibers for the given channel and dither
"""
return np.array(self.ifu_center.yifu[channel]) + self.dither.dy[dither]
[docs] def reconstruct(self, wmin=None, wmax=None):
"""
Returns the reconstructed IFU with the flux computed between
[``wmin``, ``wmax``]
Parameters
----------
wmin, wmax : float, optional
min and max wavelength to use. If ``None``: use the min and/or max
from the file
Returns
-------
x, y : 1d arrays
x and y position of the fibers
flux : 1d array
flux of the fibers within ``wmin`` and ``wmax``
"""
_flux = []
for f, h in zip(self.flux, self.header):
_imin = wavelength_to_index(h, wmin)
_imax = wavelength_to_index(h, wmax)
_flux.append(f[:, _imin:_imax].sum(axis=1))
return self.x, self.y, np.concatenate(_flux)
[docs]class QuickReconstructedIFU(object):
"""Quick reconstructed IFU head image from a set of frames given the
``ifu_center`` and some reference distortion files.
Parameters
----------
ifu_center : instance of :class:`~pyhetdex.het.ifu_centers.IFUCenter`
fiber number to fiber position mapping
files : string
Basefilename of the image to be reconstructed, or a tuple of three
images to reconstruct a complete dither.
dist_l, dist_r : string
Distortion file for the left and right spectrograph; at least one of
them must be provided
pixscale : float, optional
pixel scale
Attributes
----------
ifu_center : :class:`~pyhetdex.het.ifu_centers.IFUCenter`
parsed ``ifu_center`` file
dists : dict
distortion files for ``'L'`` and ``'R'``
dx, dy : tuples
relative shifts of the dithers
pscale : float
pixel scale
img : 2d numpy array
reconstructed image
dmap : dict of dicts
for each spectrograph, map a fiber number with its y value
Raises
------
ReconstructValueError
if an empty dither is passed and ``fextract`` is ``None``
ReconstructIOError
if the number and/or number of fiber extracted frames is not correct;
raised by :meth:`~_fedict`
ReconstructIndexError
if the number of fibers from the fiber extracted files and from the ifu
center files do not match; raised by :meth:`~_reconstruct`
"""
def __init__(self, ifu_center, dist_l=None, dist_r=None, pixscale=0.25):
self._specs = ['L', 'R']
self._dithers = [1, 2, 3]
self.dists = {s: None for s in self._specs}
self.dmap = {}
self.dx = [0, -1.27, -1.27]
self.dy = [0, 0.73, -0.73]
self.x_idx = {}
self.y_idx = {}
self._pscale = pixscale
self.isEmpty = True
self._load_ifu_center(ifu_center)
self._load_dists(dist_l, dist_r)
self._create_dist_map()
self._min_max()
self._create_empty_image()
self._create_dxdy()
[docs] def _load_ifu_center(self, ifu_center):
'''Load the IFUcen file
Parameters
----------
ifu_center : instance of :class:`~pyhetdex.het.ifu_centers.IFUCenter`
fiber number to fiber position mapping
'''
if not ifu_center:
raise ReconstructValueError('An IFU center file is needed to'
' quickreconstruct an image')
self.ifu_center = ifu_centers.IFUCenter(ifu_center)
[docs] def _load_dists(self, dist_l, dist_r):
'''Load the distortion files into :attr:`dists`
Parameters
----------
dist_l, dist_r : string
Distortion file for the left and right spectrograph; at least one
of them must be provided
'''
if not dist_r and not dist_l:
raise ReconstructValueError('A distortion is needed to'
' quickreconstruct an image')
if dist_l:
self.dists['L'] = distortion.Distortion(dist_l)
if dist_r:
self.dists['R'] = distortion.Distortion(dist_r)
[docs] def _create_dist_map(self):
'''Load the distortion into :attr:`dmap`'''
for spec in self._specs:
D = self.dists[spec]
if D:
d = {}
for f in self.ifu_center.fib_number[spec]:
d[f-1] = D.map_xf_y(516, D.reference_f_.data[f-1])
self.dmap[spec] = d
[docs] def _min_max(self):
'''Compute the minimum and max x and y positions from the ifu center
file and save them in :attr:`maxx`, :attr:`minx`, :attr:`maxy`,
:attr:`miny`.'''
self.maxx = (max((max(self.ifu_center.xifu['L']),
max(self.ifu_center.xifu['R']))) +
max(self.dx) + self.ifu_center.fiber_d/2.)
self.minx = (min((min(self.ifu_center.xifu['L']),
min(self.ifu_center.xifu['R']))) +
min(self.dx) - self.ifu_center.fiber_d/2.)
self.maxy = (max((max(self.ifu_center.yifu['L']),
max(self.ifu_center.yifu['R']))) +
max(self.dy) + self.ifu_center.fiber_d/2.)
self.miny = (min((min(self.ifu_center.yifu['L']),
min(self.ifu_center.yifu['R']))) +
min(self.dy) - self.ifu_center.fiber_d/2.)
[docs] def _create_dxdy(self):
'''Precompute the dx and dy shifts'''
fr_2 = self.ifu_center.fiber_d ** 2 / 4.
for spec in self._specs:
self.x_idx[spec] = {}
self.y_idx[spec] = {}
fibs = np.array(self.ifu_center.fib_number[spec])
for dither in self._dithers:
self.x_idx[spec][dither] = {}
self.y_idx[spec][dither] = {}
for f in fibs:
self.x_idx[spec][dither][f] = []
self.y_idx[spec][dither][f] = []
it = np.nditer(self.img, flags=['multi_index'],
op_flags=['readonly'])
while not it.finished:
px = self.minx + it.multi_index[0] * self.pscale
py = self.miny + it.multi_index[1] * self.pscale
xc = np.array(self.ifu_center.xifu[spec]) + \
self.dx[dither-1]
yc = np.array(self.ifu_center.yifu[spec]) + \
self.dy[dither-1]
dx = xc - px
dy = yc - py
dd = (dx*dx + dy*dy)
for f in fibs[dd < fr_2]:
self.x_idx[spec][dither][f].append(it.multi_index[0])
self.y_idx[spec][dither][f].append(it.multi_index[1])
it.iternext()
@property
def pscale(self):
"""Pixel scale
.. warning::
changing the pixel scale with invalidate the reconstructed
image
"""
return self._pscale
@pscale.setter
def pscale(self, pixscale):
self._pscale = pixscale
self._create_empty_image()
[docs] def reconstruct(self, files, subtract_overscan=True,
do_not_scale_image_data=False):
"""Read the files and create a reconstructed image.
Parameters
----------
files : list
name of the files to use
subtract_overscan : bool
If the overscan region is still present in the image,
subtract the bias level, calculated from the overscan
region of the image.
do_not_scale_image_data : bool
If True, image data is not scaled using BSCALE/BZERO values when
read.
Returns
-------
img : nd-array
reconstructed image; it is also stored in the :attr:`img`
attribute
"""
ifuid = None
self._create_empty_image()
recon_img = self.img.copy()
for img in files: # Loop over all input file
f_offset = 0 # Python arrays start at 0, FITS files at 1...
kwargs = dict(memmap=False,
do_not_scale_image_data=do_not_scale_image_data)
with fits.open(img, **kwargs) as hdu:
h = hdu[0].header
data = hdu[0].data.transpose()
if not ifuid:
ifuid = h['IFUID']
elif ifuid != h['IFUID']:
print('WARNING: %s if from a different IFU, skipping it!')
continue
ccdpos = h['CCDPOS']
ccdhalf = h['CCDHALF']
D = self.dists[ccdpos]
if not D:
raise ReconstructValueError('No distortion given for ' +
ccdpos + ' spectrograph')
if h['NAXIS2'] < 1500: # Working on single images
if ccdhalf == 'U':
f_offset = 1032 # FIXME This should come from header
if ccdpos == 'L' and ccdhalf == 'U':
data = data[::-1, ::-1]
if ccdpos == 'R' and ccdhalf == 'L':
data = data[::-1, ::-1]
if h['NAXIS1'] > 1032 and subtract_overscan:
bias = self._get_overscan(data, h['BIASSEC'])
data = data-bias
dither = int(h.get('DITHER', self._dithers[0]))
if dither not in self._dithers:
dither = self._dithers[0]
center_x = int(np.round(data.shape[0]/2))
for f in self.ifu_center.fib_number[ccdpos]:
# fib = f+1
fy_f = self.dmap[ccdpos][f-1] - f_offset
fy = int(fy_f)
fy_d = fy_f - fy
if fy < 0 or fy > data.shape[1]:
continue
# Take the 3 pixels around the centre of the fibre profile
# and add the fractional part for the next pixel rows.
# If the fibre centre would be in the middle of the central
# pixel, 50% of each of the outer rows would be added. As
# the centre is shifted with respect to the middle, the
# relative percentages of the outer rows change
# accordingly.
flx = np.sum(data[center_x-20:center_x + 20,
fy-1:fy+2]) + \
np.sum(data[center_x-20:center_x + 20,
fy-2:fy-1]*(1.-fy_d)) + \
np.sum(data[center_x-20:center_x+20,
fy+2:fy+3]*(fy_d))
if self.x_idx[ccdpos][dither][f] and \
self.y_idx[ccdpos][dither][f]:
recon_img[[self.x_idx[ccdpos][dither][f],
self.y_idx[ccdpos][dither][f]]] += flx
recon_img = recon_img.transpose()
self.img = recon_img.copy()
self.isEmpty = False
return recon_img
[docs] def write(self, filename):
"""Write the reconstructed image to file ``filename`` as using the fits
format
Parameters
----------
filename : string
name of the output fits file
"""
if not self.isEmpty:
if parse_version(astropy.__version__) < parse_version('1.3'):
writeto_kwars = {'clobber': True}
else:
writeto_kwars = {'overwrite': True}
outimg = fits.PrimaryHDU(self.img)
outimg.writeto(filename, **writeto_kwars)
else:
raise ReconstructError("Make sure to run the ``reconstruct``"
" method to create the image before saving"
" it")
[docs] def _section_to_list(self, sec):
"""
Convert a header section string of the form [x1:x2,y1:y2]
to a list of integers
Parameters
----------
sec : str
The header section string
"""
return [int(i) for i in sec.replace('[', '').replace(']', '').
replace(':', ',').split(',')]
[docs] def _get_overscan(self, img, biassec):
r"""Extract the sigma clipped mean of the overscan region
Parameters
----------
img : instance of :class:`numpy.array`
Input image data
biassec : list
List or tuple with the ranges of the bias section.
Use :func:_section_to_list to convert the header
BIASSEC keyword to a python list
"""
biasreg = self._section_to_list(biassec)
return np.mean(stats.sigma_clip(img[biasreg[0]:biasreg[1], ]))
[docs] def _create_empty_image(self):
"""Find the number of pixels x and y and create empty images"""
nx = int((self.maxx - self.minx) / self.pscale)
ny = int((self.maxy - self.miny) / self.pscale)
self.img = np.zeros((nx, ny))
self.isEmpty = True
[docs]def argument_parser(argv=None):
"""Parse the command line"""
import argparse
# Parse user input
description = """Reconstruct the IFU image from a list of fits images."""
parser = argparse.ArgumentParser(description=description,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('ifucen', help="""Name of the IFUcen file""")
parser.add_argument('files', nargs='+', help="""The input images""")
parser.add_argument('-o', '--outfile', help="""Name of a file to output""",
default='reconstruct.fits')
parser.add_argument('-l', '--ldist', help="""Name of the distortion file for the
left spectrograph; at least one of '%(dest)s' or
'rdist' must be provided""")
parser.add_argument('-r', '--rdist', help="""Name of the distortion file for the
right spectrograph""")
parser.add_argument('-s', '--scale', help="""Scale of the pixels in the
reconstructed image""", default=0.3, type=float)
return parser.parse_args(argv)
[docs]def create_quick_reconstruction(argv=None):
"""Entry point for the executable running the quick reconstruction of the
IFU reconstruction image
Parameters
----------
argv : list of strings
if not provided sys.argv is used
"""
args = argument_parser(argv=argv)
# create the shot object
recon = QuickReconstructedIFU(args.ifucen, dist_r=args.rdist,
dist_l=args.ldist, pixscale=args.scale)
recon.reconstruct(args.files, subtract_overscan=True)
recon.write(args.outfile)