"""
Name = 'ModuleUtil'
VERSION = '1.2';
author = 'Jose Vicente Perea';
date = '2020-05-07';
ChangeLog
=========
Version 1.2 - 2020-05-07 (JVP)
------------
+ convertImage(): New function to convert EPIC images into array ready for plotting
+ TSrebin(): Fracexp definition ( = 1) for TS with no FRACEXP column
Version 1.1 - 2018-04-24 (JVP)
------------
+ TSrebin() Module imports included in function definition
+ varCheck() New class for variable checking
Version 1.0 - 2017-05-09 (JVP)
------------
+ Initial version
Utils module for Python tasks
First function included: TSrebin for re-binning TimeSeries
"""
name = 'ModuleUtil'
#print('Starting module: ', name)
#import sys
#import warnings
#
#import numpy as np
#from astropy.table import Table
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
def TSrebin(tstable, DTin, DTout):
"""
Re-binning function
Input: TimeSeries Table, Input Time bin and Output Time Bin
#*****************************************************************************
#
# WARNING: DTin should be previously read from the FITS Keyword: 'TIMEDEL'
#
#*****************************************************************************
"""
import sys
import warnings
import numpy as np
from astropy.table import Table
#
# Check input Table Columns
#
colNames = tstable.keys() # Input ColumnNames
#colNames = ['TIME','RATE','ERROR','FRACEX']
mandatoryColumns = ['TIME','RATE','ERROR','FRACEXP'] # Mandatory ColumnNames
diff = set(mandatoryColumns) - set(colNames)
fracexp_chk = 0
if diff == {'FRACEXP'}:
#print('FRACEXP is missing. fracexp = 1')
fracexp_chk = 1
elif len(diff) != 0:
print(' Missing Columns in input Table : ', diff, '\n Exiting.')
sys.exit(1)
#
# Re-Binning of th Table 'tstable'
#
tstable.sort('TIME')
ntime=tstable['TIME']
nrate=np.nan_to_num(tstable['RATE'])
nerror=np.nan_to_num(tstable['ERROR'])
if fracexp_chk == 1:
### fracexp = 1
fracexp=np.linspace(1, 1, len(ntime))
else:
fracexp=np.nan_to_num(tstable['FRACEXP'])
tcum=np.nancumsum(tstable['TIME'])
rcum=np.nancumsum(nrate*fracexp) # Errors propagation are weighted by the FRACEXP from the TS
ecum=np.nancumsum(nerror**2*fracexp)
fcum=np.nancumsum(fracexp)
nin=len(rcum) # Number of In points
t_bin_frac = int(np.round( DTout / DTin ))
if t_bin_frac <= 1: # No re-binning
ntstable = tstable
new_time_bin = time_bin
print(' No re-binning needed for this Light Curve. Input / Output time bins ratio : ', t_bin_frac)
else:
nout=int(nin/t_bin_frac) # Number of Out points. t_bin_frac == DTout / DTin ('TIMEDEL'). Re-binning factor
#print(' Input / Output time bins / ratio : ', DTin, 'sec', ' / ',DTout, 'sec', ' / ', t_bin_frac)
#print(' Input / Output dots number : ', nin, ' / ',nout)
i1 = np.arange(nout)*t_bin_frac
i2 = np.concatenate( ( i1[1:]-1 , np.array([nin-1]) ) )
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning) #
if fracexp_chk == 0: # TS has FRACEXP column => rate and error are weighted by FRACEXP
nrate = ( (rcum[i2]-rcum[i1]) / (fcum[i2]-fcum[i1]) )
nerror = (np.sqrt(np.nan_to_num(ecum[i2]-ecum[i1]))/(fcum[i2]-fcum[i1]))
ntime=(tstable['TIME'][i2]+tstable['TIME'][i1])/2.
nfrac=(fcum[i2]-fcum[i1])/(i2-i1)
ntstable = Table([ntime, nrate, nerror, nfrac],\
names=['TIME','RATE','ERROR','FRACEXP'])
else: # TS has NOT FRACEXP column => fracexp = 1
nrate = ( (rcum[i2]-rcum[i1]) / (fcum[i2]-fcum[i1]) )
nerror = (np.sqrt(np.nan_to_num(ecum[i2]-ecum[i1]))/(fcum[i2]-fcum[i1]))
ntime=(tstable['TIME'][i2]+tstable['TIME'][i1])/2.
ntstable = Table([ntime, nrate, nerror],\
names=['TIME','RATE','ERROR'])
return ntstable
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
def convertImage(img):
"""
Convert an EPIC 3Color image ( RGB 3Colour image ) in a normalized image in Log scale.
Ready to be displayed from Python with correct cuts
Input : RGB 3Colour image
Output : Python dictionary = { 'image': normalized image, 'limits': { 'xmin': xmin, 'xmax': xmax, 'ymin': ymin, 'ymax': ymax}}
"""
import warnings
import numpy as np
convertImageOut = {}
#print('----- DEBUG ----------------')
#print('Orig image shape : ', img.shape, len(img.shape))
if len(img.shape) == 3:
ima2 = np.transpose(img, [1,2,0]) # Transpose the 3-D image ( RGB 3Colour image )
else:
ima2 = img
#print('Transpose image shape : ', ima2.shape)
with warnings.catch_warnings(): # Logarithmic scape
warnings.simplefilter("ignore", category=RuntimeWarning)
ima3 = (np.log10(ima2)+5.)/(5.-2.52287875)
# Image size (pixels)
# if len(img.shape) == 3:
# ii = np.nansum(ima2, axis=2).nonzero() # Pixels index. X, Y min and max
# xmin = ii[0].min()
# xmax = ii[0].max()
# ymin = ii[1].min()
# ymax = ii[1].max()
# else:
# xii = np.nansum(ima2, axis=0).nonzero() # Pixels index. X, Y min and max
# xmin = xii[0].min()
# xmax = xii[0].max()
# yii = np.nansum(ima2, axis=1).nonzero() # Pixels index. X, Y min and max
# ymin = yii[0].min()
# ymax = yii[0].max()
# Image size (pixels)
ii = np.nansum(ima2, axis=2).nonzero() # Pixels index. X, Y min and max
xmin = ii[0].min()
xmax = ii[0].max()
ymin = ii[1].min()
ymax = ii[1].max()
# jj = list(map(tuple,np.where(np.isnan( ima3 ))))
# if len(jj[0]) > 0:
# ima3[jj]=0.0 # ; 1.0 for white 0 for black
ima3 = np.nan_to_num(ima3)
ima3[ ima3 > 1. ] = 1. # All values > 1 are set to 1 ( ~ normalization )
ima3[ ima3 < 0. ] = 0
#ima4 = ima3
convertImageOut['image'] = ima3
convertImageOut['limits'] = { 'xmin': xmin, 'xmax': xmax, 'ymin': ymin, 'ymax': ymax }
return convertImageOut
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# ===== Variable checks =====
#
class varCheck():
'''Variable check definitions'''
def __init__(self, client=None, inVar=None):
self.inVar = inVar
self.client = client
#Error.__init__(self, client=client, msgLayer=None, msgLevel=None, code=None, msg=None, outMessage=None)
# Boolean check
yes = ('y', 'yes', 'Y', 'YES', 'Yes')
no = ('n', 'no', 'N', 'NO', 'No')
def checkBool(self):
if self.inVar in varCheck.yes:
return True
elif self.inVar in varCheck.no:
return False
else:
#errorEvent = Error( client=self.client, code='wrongVarType', msg='Option value: ' + self.inVar + ' has to be Y/N/Yes/No. Exiting.' )
#errorEvent.error()
print('Error. Value has to be Y/N/Yes/No. Exiting.')
def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise argparse.ArgumentTypeError('Boolean value expected.')
# ===== Variable checks =====
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# ===== Compress and Decompress file functions =====
#
def testGzip(file):
import gzip
chk = False
with gzip.open(file, 'r') as f:
try:
f.read(1)
chk = True
except OSError:
print(f'File {file} is not gziped. Probably already decompressed.')
return chk
def compress(f, remove=False):
"""
Compress (gzip) file function
Optional: remove the original file after compressing
"""
import os, gzip
file = f
cf = f.replace('FIT','FTZ')
fp = open(f, "rb")
data = fp.read()
bindata = bytearray(data)
with gzip.open(cf, "wb") as f:
f.write(bindata)
fp.close()
if remove:
os.remove(file)
def decompress(inFile, outDir, remove=False):
"""
Decompress input inFile in outDir directory
Optional: remove the original file after decompressing
"""
import os, gzip
inFileext = inFile.partition('.')[-1]
if inFileext == 'FTZ':
newExt = 'FIT'
elif inFileext == 'ftz':
newExt = 'fits'
else:
newExt = inFileext + '.decomp'
file = outDir + os.path.basename(inFile).replace(inFileext, newExt)
fp = open(file, "wb")
with gzip.open(inFile, "rb") as f:
bindata = f.read()
fp.write(bindata)
fp.close()
if remove:
os.remove(inFile)
return file
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++