Source code for hendrics.calibrate

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Calibrate event lists by looking in rmf files."""

import os
import warnings
import numpy as np
from astropy import log

from .base import get_file_extension
from .io import load_events, save_events
from .io import HEN_FILE_EXTENSION

[docs] def default_nustar_rmf(): """Look for the default rmf file in the CALDB. The CALDB environment variable has to point to the correct location of the NuSTAR CALDB .. note:: The calibration might change in the future. The hardcoded file name will be eventually replaced with a smarter choice based on observing time """ warnings.warn("Rmf not specified. Using default NuSTAR rmf.") rmf = "data/nustar/fpm/cpf/rmf/nuAdet3_20100101v002.rmf" path = rmf.split("/") newpath = os.path.join(os.environ["CALDB"], *path) return newpath
[docs] def read_rmf(rmf_file=None): """Load RMF info. .. note:: Preliminary: only EBOUNDS are read. Parameters ---------- rmf_file : str The rmf file used to read the calibration. If None or not specified, the one given by default_nustar_rmf() is used. Returns ------- pis : array-like the PI channels e_mins : array-like the lower energy bound of each PI channel e_maxs : array-like the upper energy bound of each PI channel """ from import fits as pf if rmf_file is None or rmf_file == "": rmf_file = default_nustar_rmf() lchdulist =, checksum=True) lchdulist.verify("warn") lctable = lchdulist["EBOUNDS"].data pis = np.array(lctable.field("CHANNEL")) e_mins = np.array(lctable.field("E_MIN")) e_maxs = np.array(lctable.field("E_MAX")) lchdulist.close() return pis, e_mins, e_maxs
[docs] def read_calibration(pis, rmf_file=None): """Read the energy channels corresponding to the given PI channels. Parameters ---------- pis : array-like The channels to lookup in the rmf Other Parameters ---------------- rmf_file : str The rmf file used to read the calibration. If None or not specified, the one given by default_nustar_rmf() is used. """ calp, calEmin, calEmax = read_rmf(rmf_file) es = np.zeros(len(pis), dtype=float) for ic, c in enumerate(calp): good = pis == c if not np.any(good): continue es[good] = (calEmin[ic] + calEmax[ic]) / 2 return es
[docs] def rough_calibration(pis, mission): """ Parameters ---------- pis: float or array of floats PI channels in data mission: str Mission name Returns ------- energies : float or array of floats Energy values Examples -------- >>> assert rough_calibration(0, 'nustar') == 1.6 >>> assert rough_calibration(0.0, 'ixpe') == 0.0 >>> # It's case-insensitive >>> assert rough_calibration(1200, 'XMm') == 1.2 >>> rough_calibration(10, 'asDf') Traceback (most recent call last): ... ValueError: Mission asdf not recognized >>> assert rough_calibration(100, 'nicer') == 1.0 """ if mission.lower() == "nustar": return pis * 0.04 + 1.6 elif mission.lower() == "xmm": return pis * 0.001 elif mission.lower() == "nicer": return pis * 0.01 elif mission.lower() == "ixpe": return pis / 375 * 15 raise ValueError(f"Mission {mission.lower()} not recognized")
[docs] def calibrate(fname, outname, rmf_file=None, rough=False): """Do calibration of an event list. Parameters ---------- fname : str The HENDRICS file containing the events outname : str The output file Other Parameters ---------------- rmf_file : str The rmf file used to read the calibration. If None or not specified, the one given by default_nustar_rmf() is used. """ # Read event file"Loading file %s..." % fname) evdata = load_events(fname)"Done.") pis = evdata.pi if rough: cal_pis = evdata.pi if hasattr(evdata, "cal_pi") and evdata.cal_pi is not None: cal_pis = evdata.cal_pi es = rough_calibration(cal_pis, evdata.mission) else: if evdata.mission.lower() == "xmm": raise RuntimeError( "Calibration for XMM should work out-of-the box in " "HENreadevents. Running HENcalibrate with the --rmf option is" " known to produce wrong results in XMM" ) es = read_calibration(pis, rmf_file) = es"Saving calibrated data to %s" % outname) save_events(evdata, outname)
def _calib_wrap(args): f, outname, rmf, rough = args return calibrate(f, outname, rmf, rough)
[docs] def main(args=None): """Main function called by the `HENcalibrate` command line script.""" import argparse from multiprocessing import Pool from .base import _add_default_args description = ( "Calibrate clean event files by associating the correct " "energy to each PI channel. Uses either a specified rmf " "file or (for NuSTAR only) an rmf file from the CALDB" ) parser = argparse.ArgumentParser(description=description) parser.add_argument("files", help="List of files", nargs="+") parser.add_argument( "-r", "--rmf", help="rmf file used for calibration. Not working with XMM data", default=None, type=str, ) parser.add_argument( "--rough", help="Rough calibration, without rmf file " "(only for NuSTAR, XMM, and NICER). Only for compatibility purposes. " "This is done automatically by HENreadevents", default=False, action="store_true", ) parser.add_argument( "-o", "--overwrite", help="Overwrite; default: no", default=False, action="store_true", ) _add_default_args(parser, ["nproc", "loglevel", "debug"]) args = parser.parse_args(args) files = args.files if args.debug: args.loglevel = "DEBUG" log.setLevel(args.loglevel) with log.log_to_file("HENcalibrate.log"): funcargs = [] for i_f, f in enumerate(files): outname = f if args.overwrite is False: label = "_calib" if args.rough: label = "_rough_calib" outname = f.replace(get_file_extension(f), label + HEN_FILE_EXTENSION) funcargs.append([f, outname, args.rmf, args.rough]) if == "nt" or args.nproc == 1: [_calib_wrap(fa) for fa in funcargs] else: pool = Pool(processes=args.nproc) for i in pool.imap_unordered(_calib_wrap, funcargs): pass pool.close()