Source code for pyhetdex.cure.distortion

# Misc python library to support HETDEX software and data analysis
# Copyright (C) 2015, 2016  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, print_function

import pyhetdex.ltl.marray as ma
import pyhetdex.ltl.chebyshev as cheby
from pyhetdex.tools import io_helpers

import datetime
import numpy as np

__version__ = '$Id: distortion.py 393 2018-05-08 09:54:47Z montefra $'


[docs]class DistortionBase(object): def __init__(self, filename): _vdict = {14: Distortion_14, 17: Distortion_14} in_ = open(filename) fileversion = int(io_helpers.skip_commentlines(in_)) in_.close() if fileversion not in _vdict: raise IOError('Unsupported version of Distortion file!') self._cls = _vdict[fileversion](filename)
[docs]class Distortion(DistortionBase): def __init__(self, filename): super(Distortion, self).__init__(filename) self._cls.read() def __getattribute__(self, a): try: return super(Distortion, self).__getattribute__(a) except AttributeError: return self._cls.__getattribute__(a) def __setattr__(self, a, v): try: return super(Distortion, self).__setattr__(a, v) except AttributeError: # pragma: no cover return self._cls.__setattr__(a, v)
[docs]class Distortion_14(object): def __init__(self, filename): self.filename = filename self.version = 0 self.minw = float(0) self.maxw = float(0) self.minf = float(0) self.maxf = float(0) self.minx = float(0) self.maxx = float(0) self.miny = float(0) self.maxy = float(0) self.wave_par_ = ma.FVector() self.wave_errors_ = ma.FVector() self.fiber_par_ = ma.FVector() self.fiber_errors_ = ma.FVector() self.x_par_ = ma.FVector() self.x_errors_ = ma.FVector() self.y_par_ = ma.FVector() self.y_errors_ = ma.FVector() self.fy_par_ = ma.FVector() self.fy_errors_ = ma.FVector() self.reference_wavelength_ = float(0) self.reference_w_ = ma.MArray() self.reference_f_ = ma.MArray() self.wave_offsets_ = ma.MArray() self.x_offsets_ = ma.MArray()
[docs] def read(self): with open(self.filename) as in_: self.version = int(io_helpers.skip_commentlines(in_)) self.minw = float(io_helpers.skip_commentlines(in_)) self.maxw = float(io_helpers.skip_commentlines(in_)) self.minf = float(io_helpers.skip_commentlines(in_)) self.maxf = float(io_helpers.skip_commentlines(in_)) self.minx = float(io_helpers.skip_commentlines(in_)) self.maxx = float(io_helpers.skip_commentlines(in_)) self.miny = float(io_helpers.skip_commentlines(in_)) self.maxy = float(io_helpers.skip_commentlines(in_)) self.wave_par_.read(in_) self.wave_errors_.read(in_) self.fiber_par_.read(in_) self.fiber_errors_.read(in_) self.x_par_.read(in_) self.x_errors_.read(in_) self.y_par_.read(in_) self.y_errors_.read(in_) self.fy_par_.read(in_) self.fy_errors_.read(in_) self.reference_wavelength_ = \ float(io_helpers.skip_commentlines(in_)) self.reference_w_.read(in_) self.reference_f_.read(in_) self.wave_offsets_.read(in_) self.x_offsets_.read(in_)
[docs] def writeto(self, filename): with open(filename, 'w') as out_: out_.write('# CALL : Written by pyhetdex %s\n' % __version__) out_.write('#FILE CREATED ON : %s\n' % datetime.datetime.now().strftime('%c')) out_.write('# Title: Distortion model\n') out_.write('%d\n' % self.version) out_.write('%.6e\n' % self.minw) out_.write('%.6e\n' % self.maxw) out_.write('%.6e\n' % self.minf) out_.write('%.6e\n' % self.maxf) out_.write('%.6e\n' % self.minx) out_.write('%.6e\n' % self.maxx) out_.write('%.6e\n' % self.miny) out_.write('%.6e\n' % self.maxy) self.wave_par_.write(out_) self.wave_errors_.write(out_) self.fiber_par_.write(out_) self.fiber_errors_.write(out_) self.x_par_.write(out_) self.x_errors_.write(out_) self.y_par_.write(out_) self.y_errors_.write(out_) self.fy_par_.write(out_) self.fy_errors_.write(out_) out_.write('%.6e\n' % self.reference_wavelength_) self.reference_w_.write(out_) self.reference_f_.write(out_) self.wave_offsets_.write(out_) self.x_offsets_.write(out_)
[docs] def _scal_x(self, x): return (x - self.minx) / (self.maxx - self.minx)
[docs] def _scal_y(self, y): return (y - self.miny) / (self.maxy - self.miny)
[docs] def _scal_w(self, w): return (w - self.minw) / (self.maxw - self.minw)
[docs] def _scal_f(self, f): return (f - self.minf) / (self.maxf - self.minf)
[docs] def get_numfibers(self): return len(self.reference_f_.data)
[docs] def get_reference_f(self, fiber): return self.reference_f_.data[fiber-1]
[docs] def map_xy_fiber(self, x, y): if isinstance(x, (tuple, list)): x = np.asarray(x) if isinstance(y, (tuple, list)): y = np.asarray(y) return self.interp(self._scal_x(x), self._scal_y(y), self.fiber_par_.data)
[docs] def map_xy_wavelength(self, x, y): w, f = self.map_xy_wf(x, y) return w
[docs] def map_xy_wf(self, x, y): if isinstance(x, (tuple, list)): x = np.asarray(x) if isinstance(y, (tuple, list)): y = np.asarray(y) value = self.interp(self._scal_x(x), self._scal_y(y), self.wave_par_.data) if self.reference_wavelength_ < 0: return value fiber = self.map_xy_fiber(x, y) if np.isscalar(fiber): return value-self.wave_offsets_.data[self._closest_fiber(fiber)], \ fiber wo = [self.wave_offsets_.data[self._closest_fiber(fib)] for fib in fiber] return value-np.array(wo), fiber
[docs] def map_xf_y(self, x, f): if isinstance(x, (tuple, list)): x = np.asarray(x) if isinstance(f, (tuple, list)): f = np.asarray(f) return self.interp(self._scal_x(x), self._scal_f(f), self.fy_par_.data)
[docs] def map_wf_x(self, w, f): if isinstance(w, (tuple, list)): w = np.asarray(w) if isinstance(f, (tuple, list)): f = np.asarray(f) value = self.interp(self._scal_w(w), self._scal_f(f), self.x_par_.data) if self.reference_wavelength_ < 0: return value if np.isscalar(f): return value-self.x_offsets_.data[self._closest_fiber(f)] xo = [self.x_offsets_.data[self._closest_fiber(fib)] for fib in f] return value - np.array(xo)
[docs] def map_wf_y(self, w, f): if isinstance(w, (tuple, list)): w = np.asarray(w) if isinstance(f, (tuple, list)): f = np.asarray(f) return self.interp(self._scal_w(w), self._scal_f(f), self.y_par_.data)
[docs] def map_xy_fibernum(self, x, y): f = self.map_xy_fiber(x, y) # Fibernumbers are one based, numpy arrays zero based return self._closest_fiber(f) + 1
[docs] def _closest_fiber(self, f): if np.isscalar(f): return self._find_closest_index(self.reference_f_.data, f) idx = [self._find_closest_index(self.reference_f_.data, fib) for fib in f] return np.array(idx)
[docs] def _find_closest_index(self, a, v): return (np.abs(a-v)).argmin()
[docs] def interp(self, x, y, par): return cheby.interpCheby2D_7(x, y, par)