# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Light curve-related functions."""
import os
import warnings
import copy
from astropy import log
import numpy as np
from astropy.logger import AstropyUserWarning
from stingray.lightcurve import Lightcurve
from stingray.utils import assign_value_if_none
from stingray.gti import create_gti_mask, cross_gtis, contiguous_regions
from .base import (
_look_for_array_in_array,
hen_root,
mkdir_p,
interpret_bintime,
histogram,
)
from .io import load_events, save_lcurve, load_lcurve
from .io import HEN_FILE_EXTENSION, high_precision_keyword_read, get_file_type
from .base import deorbit_events, splitext_improved
[docs]
def join_lightcurve_objs(lclist):
"""Join light curves.
Light curves from different instruments are put in different channels.
Light curves from the same time interval and instrument raise
a ValueError.
Parameters
----------
lclist : list of :class:`Lightcurve` objects
The list of light curves to join
Returns
-------
lcoutlist : joint light curves, one per instrument
See Also
--------
scrunch_lightcurves : Create a single light curve from input light
curves.
Examples
--------
>>> lcA = Lightcurve(np.arange(4), np.zeros(4))
>>> lcA.instr='BU' # Test also case sensitivity
>>> lcB = Lightcurve(np.arange(4) + 4, [1, 3, 4, 5])
>>> lcB.instr='bu'
>>> lcC = join_lightcurve_objs((lcA, lcB))
>>> assert np.all(lcC['bu'].time == np.arange(8))
>>> assert np.all(lcC['bu'].counts == [0, 0, 0, 0, 1, 3, 4, 5])
"""
# --------------- Check consistency of data --------------
lcdts = [lcdata.dt for lcdata in lclist]
# Find unique elements. If multiple bin times are used, throw an exception
lcdts = list(set(lcdts))
assert len(lcdts) == 1, "Light curves must have same dt for joining"
instrs = [
lcdata.instr.lower()
for lcdata in lclist
if (hasattr(lcdata, "instr") and lcdata.instr is not None)
]
# Find unique elements. A lightcurve will be produced for each instrument
instrs = list(set(instrs))
if instrs == []:
instrs = ["unknown"]
outlcs = {}
for instr in instrs:
outlcs[instr.lower()] = None
# -------------------------------------------------------
for lcdata in lclist:
instr = assign_value_if_none(lcdata.instr, "unknown").lower()
if outlcs[instr] is None:
outlcs[instr] = lcdata
else:
outlcs[instr] = outlcs[instr].join(lcdata)
return outlcs
[docs]
def join_lightcurves(lcfilelist, outfile="out_lc" + HEN_FILE_EXTENSION):
"""Join light curves from different files.
Light curves from different instruments are put in different channels.
Parameters
----------
lcfilelist : list of str
List of input file names
outfile :
Output light curve
See Also
--------
scrunch_lightcurves : Create a single light curve from input light
curves.
"""
lcdatas = []
for lfc in lcfilelist:
log.info("Loading file %s..." % lfc)
lcdata = load_lcurve(lfc)
log.info("Done.")
lcdatas.append(lcdata)
del lcdata
outlcs = join_lightcurve_objs(lcdatas)
if outfile is not None:
instrs = list(outlcs.keys())
for instr in instrs:
if len(instrs) == 1:
tag = ""
else:
tag = instr
log.info("Saving joined light curve to %s" % outfile)
dname, fname = os.path.split(outfile)
save_lcurve(outlcs[instr], os.path.join(dname, tag + fname))
return outlcs
[docs]
def scrunch_lightcurve_objs(lclist):
"""Create a single light curve from input light curves.
Light curves are appended when they cover different times, and summed when
they fall in the same time range. This is done regardless of the channel
or the instrument.
Parameters
----------
lcfilelist : list of :class:`stingray.lightcurve.Lightcurve` objects
The list of light curves to scrunch
Returns
-------
lc : scrunched light curve
See Also
--------
join_lightcurves : Join light curves from different files
Examples
--------
>>> lcA = Lightcurve(np.arange(4), np.ones(4))
>>> lcA.instr='bu1'
>>> lcB = Lightcurve(np.arange(4), [1, 3, 4, 5])
>>> lcB.instr='bu2'
>>> lcC = scrunch_lightcurve_objs((lcA, lcB))
>>> assert np.all(lcC.time == np.arange(4))
>>> assert np.all(lcC.counts == [2, 4, 5, 6])
>>> assert np.all(lcC.instr == 'bu1,bu2')
"""
instrs = [lc.instr for lc in lclist]
gti_lists = [lc.gti for lc in lclist]
gti = cross_gtis(gti_lists)
for lc in lclist:
lc.gti = gti
lc.apply_gtis()
# Determine limits
lc0 = lclist[0]
for lc in lclist[1:]:
lc0 = lc0 + lc
lc0.instr = ",".join(instrs)
return lc0
[docs]
def scrunch_lightcurves(
lcfilelist, outfile="out_scrlc" + HEN_FILE_EXTENSION, save_joint=False
):
"""Create a single light curve from input light curves.
Light curves are appended when they cover different times, and summed when
they fall in the same time range. This is done regardless of the channel
or the instrument.
Parameters
----------
lcfilelist : list of str
The list of light curve files to scrunch
Returns
-------
time : array-like
The time array
lc :
The new light curve
gti : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
Good Time Intervals
Other Parameters
----------------
outfile : str
The output file name
save_joint : bool
If True, save the per-channel joint light curves
See Also
--------
join_lightcurves : Join light curves from different files
"""
if save_joint:
lcdata = join_lightcurves(lcfilelist)
else:
lcdata = join_lightcurves(lcfilelist, outfile=None)
lc0 = scrunch_lightcurve_objs(list(lcdata.values()))
log.info("Saving scrunched light curve to %s" % outfile)
save_lcurve(lc0, outfile)
return lc0
[docs]
def filter_lc_gtis(
lc, safe_interval=None, delete=False, min_length=0, return_borders=False
):
"""Filter a light curve for GTIs.
Parameters
----------
lc : :class:`Lightcurve` object
The input light curve
Returns
-------
newlc : :class:`Lightcurve` object
The output light curve
borders : [[i0_0, i0_1], [i1_0, i1_1], ...], optional
The indexes of the light curve corresponding to the borders of the
GTIs. Returned if return_borders is set to True
Other Parameters
----------------
safe_interval : float or [float, float]
Seconds to filter out at the start and end of each GTI. If single
float, these safe windows are equal, otherwise the two numbers refer
to the start and end of the GTI respectively
delete : bool
If delete is True, the intervals outside of GTIs are filtered out from
the light curve. Otherwise, they are set to zero.
min_length : float
Minimum length of GTI. GTIs below this length will be removed.
return_borders : bool
If True, return also the indexes of the light curve corresponding to
the borders of the GTIs
"""
mask, newgti = create_gti_mask(
lc.time,
lc.gti,
return_new_gtis=True,
safe_interval=safe_interval,
min_length=min_length,
)
nomask = np.logical_not(mask)
newlc = copy.copy(lc)
newlc.counts[nomask] = 0
newlc.gti = newgti
if return_borders:
mask = create_gti_mask(lc.time, newgti)
borders = contiguous_regions(mask)
return newlc, borders
else:
return newlc
[docs]
def lcurve_from_events(
f,
safe_interval=0,
pi_interval=None,
e_interval=None,
min_length=0,
gti_split=False,
ignore_gtis=False,
bintime=1.0,
outdir=None,
outfile=None,
noclobber=False,
deorbit_par=None,
weight_on=None,
):
"""Bin an event list in a light curve.
Parameters
----------
f : str
Input event file name
bintime : float
The bin time of the output light curve
Returns
-------
outfiles : list
List of output light curves
Other Parameters
----------------
safe_interval : float or [float, float]
Seconds to filter out at the start and end of each GTI. If single
float, these safe windows are equal, otherwise the two numbers refer
to the start and end of the GTI respectively
pi_interval : [int, int]
PI channel interval to select. Default None, meaning that all PI
channels are used
e_interval : [float, float]
Energy interval to select (only works if event list is calibrated with
`calibrate`). Default None
min_length : float
GTIs below this length will be filtered out
gti_split : bool
If True, create one light curve for each good time interval
ignore_gtis : bool
Ignore good time intervals, and get a single light curve that includes
possible gaps
outdir : str
Output directory
outfile : str
Output file
noclobber : bool
If True, do not overwrite existing files
"""
log.info("Loading file %s..." % f)
evdata = load_events(f)
log.info("Done.")
weight_on_tag = ""
weights = None
if weight_on is not None:
weights = getattr(evdata, weight_on)
weight_on_tag = f"_weight{weight_on}"
deorbit_tag = ""
if deorbit_par is not None:
evdata = deorbit_events(evdata, deorbit_par)
deorbit_tag = "_deorb"
bintime = interpret_bintime(bintime)
tag = ""
gti = evdata.gti
tstart = np.min(gti)
tstop = np.max(gti)
events = evdata.time
if hasattr(evdata, "instr") and evdata.instr is not None:
instr = evdata.instr
else:
instr = "unknown"
if ignore_gtis:
gti = np.array([[tstart, tstop]])
evdata.gti = gti
total_lc = evdata.to_lc(100)
total_lc.instr = instr
# Then, apply filters
if pi_interval is not None and np.all(np.array(pi_interval) > 0):
pis = evdata.pi
good = np.logical_and(pis > pi_interval[0], pis <= pi_interval[1])
events = events[good]
tag = "_PI%g-%g" % (pi_interval[0], pi_interval[1])
elif e_interval is not None and np.all(np.array(e_interval) > 0):
if not hasattr(evdata, "energy") or evdata.energy is None:
raise ValueError(
"No energy information is present in the file."
+ " Did you run HENcalibrate?"
)
es = evdata.energy
good = np.logical_and(es > e_interval[0], es <= e_interval[1])
events = events[good]
tag = "_E%g-%g" % (e_interval[0], e_interval[1])
else:
pass
if tag != "":
save_lcurve(
total_lc,
hen_root(f) + "_std_lc" + deorbit_tag + HEN_FILE_EXTENSION,
)
# Assign default value if None
outfile = assign_value_if_none(
outfile, hen_root(f) + tag + deorbit_tag + weight_on_tag + "_lc"
)
# Take out extension from name, if present, then give extension. This
# avoids multiple extensions
outfile = outfile.replace(HEN_FILE_EXTENSION, "") + HEN_FILE_EXTENSION
outdir = assign_value_if_none(outdir, os.path.dirname(os.path.abspath(f)))
_, outfile = os.path.split(outfile)
mkdir_p(outdir)
outfile = os.path.join(outdir, outfile)
if noclobber and os.path.exists(outfile):
warnings.warn(
"File exists, and noclobber option used. Skipping",
AstropyUserWarning,
)
return [outfile]
n_times = int((tstop - tstart) / bintime)
ev_times = (events - tstart).astype(float)
time_ranges = [0, float(n_times * bintime)]
time_edges = np.linspace(time_ranges[0], time_ranges[1], n_times + 1).astype(float)
raw_counts = histogram(
ev_times,
range=time_ranges,
bins=n_times,
)
if weights is not None:
weighted = histogram(
ev_times,
range=time_ranges,
bins=n_times,
weights=weights.astype(float),
)
counts = weighted
counts_err = raw_counts * np.std(weights)
err_dist = "gauss"
else:
counts = raw_counts
err_dist = "poisson"
counts_err = None
times = (time_edges[1:] + time_edges[:-1]) / 2 + tstart
lc = Lightcurve(
times,
counts,
err=counts_err,
gti=gti,
mjdref=evdata.mjdref,
dt=bintime,
err_dist=err_dist,
)
lc.instr = instr
lc.e_interval = e_interval
lc = filter_lc_gtis(
lc, safe_interval=safe_interval, delete=False, min_length=min_length
)
if len(lc.gti) == 0:
warnings.warn("No GTIs above min_length ({0}s) found.".format(min_length))
return
lc.header = None
if hasattr(evdata, "header"):
lc.header = evdata.header
if gti_split:
lcs = lc.split_by_gti()
outfiles = []
for ib, l0 in enumerate(lcs):
local_tag = tag + "_gti{:03d}".format(ib)
outf = hen_root(outfile) + local_tag + "_lc" + HEN_FILE_EXTENSION
if noclobber and os.path.exists(outf):
warnings.warn("File exists, and noclobber option used. Skipping")
outfiles.append(outf)
l0.instr = lc.instr
l0.header = lc.header
save_lcurve(l0, outf)
outfiles.append(outf)
else:
log.info("Saving light curve to %s" % outfile)
save_lcurve(lc, outfile)
outfiles = [outfile]
# For consistency in return value
return outfiles
[docs]
def lcurve_from_fits(
fits_file,
gtistring="GTI",
timecolumn="TIME",
ratecolumn=None,
ratehdu=1,
fracexp_limit=0.9,
outfile=None,
noclobber=False,
outdir=None,
):
"""
Load a lightcurve from a fits file and save it in HENDRICS format.
.. note ::
FITS light curve handling is still under testing.
Absolute times might be incorrect depending on the light curve format.
Parameters
----------
fits_file : str
File name of the input light curve in FITS format
Returns
-------
outfile : [str]
Returned as a list with a single element for consistency with
`lcurve_from_events`
Other Parameters
----------------
gtistring : str
Name of the GTI extension in the FITS file
timecolumn : str
Name of the column containing times in the FITS file
ratecolumn : str
Name of the column containing rates in the FITS file
ratehdu : str or int
Name or index of the FITS extension containing the light curve
fracexp_limit : float
Minimum exposure fraction allowed
outfile : str
Output file name
noclobber : bool
If True, do not overwrite existing files
"""
warnings.warn(
"""WARNING! FITS light curve handling is still under testing.
Absolute times might be incorrect."""
)
# TODO:
# treat consistently TDB, UTC, TAI, etc. This requires some documentation
# reading. For now, we assume TDB
from astropy.io import fits as pf
from astropy.time import Time
import numpy as np
from stingray.gti import create_gti_from_condition
outfile = assign_value_if_none(outfile, hen_root(fits_file) + "_lc")
outfile = outfile.replace(HEN_FILE_EXTENSION, "") + HEN_FILE_EXTENSION
outdir = assign_value_if_none(outdir, os.path.dirname(os.path.abspath(fits_file)))
_, outfile = os.path.split(outfile)
mkdir_p(outdir)
outfile = os.path.join(outdir, outfile)
if noclobber and os.path.exists(outfile):
warnings.warn(
"File exists, and noclobber option used. Skipping",
AstropyUserWarning,
)
return [outfile]
lchdulist = pf.open(fits_file)
lctable = lchdulist[ratehdu].data
# Units of header keywords
tunit = lchdulist[ratehdu].header["TIMEUNIT"]
try:
mjdref = high_precision_keyword_read(lchdulist[ratehdu].header, "MJDREF")
mjdref = Time(mjdref, scale="tdb", format="mjd")
except Exception:
mjdref = None
try:
instr = lchdulist[ratehdu].header["INSTRUME"]
except Exception:
instr = "EXTERN"
# ----------------------------------------------------------------
# Trying to comply with all different formats of fits light curves.
# It's a madness...
try:
tstart = high_precision_keyword_read(lchdulist[ratehdu].header, "TSTART")
tstop = high_precision_keyword_read(lchdulist[ratehdu].header, "TSTOP")
except Exception:
raise (Exception("TSTART and TSTOP need to be specified"))
# For nulccorr lcs this whould work
timezero = high_precision_keyword_read(lchdulist[ratehdu].header, "TIMEZERO")
# Sometimes timezero is "from tstart", sometimes it's an absolute time.
# This tries to detect which case is this, and always consider it
# referred to tstart
timezero = assign_value_if_none(timezero, 0)
# for lcurve light curves this should instead work
if tunit == "d":
# TODO:
# Check this. For now, I assume TD (JD - 2440000.5).
# This is likely wrong
timezero = Time(2440000.5 + timezero, scale="tdb", format="jd")
tstart = Time(2440000.5 + tstart, scale="tdb", format="jd")
tstop = Time(2440000.5 + tstop, scale="tdb", format="jd")
# if None, use NuSTAR defaulf MJDREF
mjdref = assign_value_if_none(
mjdref,
Time(np.longdouble("55197.00076601852"), scale="tdb", format="mjd"),
)
timezero = (timezero - mjdref).to("s").value
tstart = (tstart - mjdref).to("s").value
tstop = (tstop - mjdref).to("s").value
if timezero > tstart:
timezero -= tstart
time = np.array(lctable.field(timecolumn), dtype=np.longdouble)
if time[-1] < tstart:
time += timezero + tstart
else:
time += timezero
try:
dt = high_precision_keyword_read(lchdulist[ratehdu].header, "TIMEDEL")
if tunit == "d":
dt *= 86400
except Exception:
warnings.warn(
"Assuming that TIMEDEL is the median difference between the"
" light curve times",
AstropyUserWarning,
)
dt = np.median(np.diff(time))
# ----------------------------------------------------------------
ratecolumn = assign_value_if_none(
ratecolumn,
_look_for_array_in_array(["RATE", "RATE1", "COUNTS"], lctable.names),
)
rate = np.array(lctable.field(ratecolumn), dtype=float)
try:
rate_e = np.array(lctable.field("ERROR"), dtype=np.longdouble)
except Exception:
rate_e = np.zeros_like(rate)
if "RATE" in ratecolumn:
rate *= dt
rate_e *= dt
try:
fracexp = np.array(lctable.field("FRACEXP"), dtype=np.longdouble)
except Exception:
fracexp = np.ones_like(rate)
good_intervals = (rate == rate) * (fracexp >= fracexp_limit) * (fracexp <= 1)
rate[good_intervals] /= fracexp[good_intervals]
rate_e[good_intervals] /= fracexp[good_intervals]
rate[np.logical_not(good_intervals)] = 0
try:
gtitable = lchdulist[gtistring].data
gti_list = np.array(
[[a, b] for a, b in zip(gtitable.field("START"), gtitable.field("STOP"))],
dtype=np.longdouble,
)
except Exception:
gti_list = create_gti_from_condition(time, good_intervals)
lchdulist.close()
lc = Lightcurve(
time=time,
counts=rate,
err=rate_e,
gti=gti_list,
mjdref=mjdref.mjd,
dt=dt,
)
lc.instr = instr
lc.header = lchdulist[ratehdu].header.tostring()
log.info("Saving light curve to %s" % outfile)
save_lcurve(lc, outfile)
return [outfile]
[docs]
def lcurve_from_txt(
txt_file,
outfile=None,
noclobber=False,
outdir=None,
mjdref=None,
gti=None,
):
"""
Load a lightcurve from a text file.
Parameters
----------
txt_file : str
File name of the input light curve in text format. Assumes two columns:
time, counts. Times are seconds from MJDREF 55197.00076601852 (NuSTAR)
if not otherwise specified.
Returns
-------
outfile : [str]
Returned as a list with a single element for consistency with
`lcurve_from_events`
Other Parameters
----------------
outfile : str
Output file name
noclobber : bool
If True, do not overwrite existing files
mjdref : float, default 55197.00076601852
the MJD time reference
gti : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
Good Time Intervals
"""
import numpy as np
if mjdref is None:
mjdref = np.longdouble("55197.00076601852")
outfile = assign_value_if_none(outfile, hen_root(txt_file) + "_lc")
outfile = outfile.replace(HEN_FILE_EXTENSION, "") + HEN_FILE_EXTENSION
outdir = assign_value_if_none(outdir, os.path.dirname(os.path.abspath(txt_file)))
_, outfile = os.path.split(outfile)
mkdir_p(outdir)
outfile = os.path.join(outdir, outfile)
if noclobber and os.path.exists(outfile):
warnings.warn(
"File exists, and noclobber option used. Skipping",
AstropyUserWarning,
)
return [outfile]
time, counts = np.genfromtxt(txt_file, delimiter=" ", unpack=True)
time = np.array(time, dtype=np.longdouble)
counts = np.array(counts, dtype=float)
lc = Lightcurve(time=time, counts=counts, gti=gti, mjdref=mjdref)
lc.instr = "EXTERN"
log.info("Saving light curve to %s" % outfile)
save_lcurve(lc, outfile)
return [outfile]
def _baseline_lightcurves(lcurves, outroot, p, lam):
outroot_save = outroot
for i, f in enumerate(lcurves):
if outroot is None:
outroot = hen_root(f) + "_lc_baseline"
else:
outroot = outroot_save + "_{}".format(i)
ftype, lc = get_file_type(f)
baseline = lc.baseline(p, lam)
lc.base = baseline
save_lcurve(lc, outroot + HEN_FILE_EXTENSION)
def _wrap_lc(args):
f, kwargs = args
try:
return lcurve_from_events(f, **kwargs)
except Exception as e:
warnings.warn("HENlcurve exception: {0}".format(str(e)))
raise
def _wrap_txt(args):
f, kwargs = args
try:
return lcurve_from_txt(f, **kwargs)
except Exception as e:
warnings.warn("HENlcurve exception: {0}".format(str(e)))
return []
def _wrap_fits(args):
f, kwargs = args
try:
return lcurve_from_fits(f, **kwargs)
except Exception as e:
warnings.warn("HENlcurve exception: {0}".format(str(e)))
return []
def _execute_lcurve(args):
from multiprocessing import Pool
bintime = args.bintime
safe_interval = args.safe_interval
e_interval, pi_interval = args.energy_interval, args.pi_interval
if args.pi_interval is not None:
pi_interval = np.array(args.pi_interval)
if e_interval is not None:
args.e_interval = np.array(args.energy_interval)
# ------ Use functools.partial to wrap lcurve* with relevant keywords---
if args.fits_input:
wrap_fun = _wrap_fits
argdict = {"noclobber": args.noclobber}
elif args.txt_input:
wrap_fun = _wrap_txt
argdict = {"noclobber": args.noclobber}
else:
wrap_fun = _wrap_lc
argdict = {
"noclobber": args.noclobber,
"safe_interval": safe_interval,
"pi_interval": pi_interval,
"e_interval": e_interval,
"min_length": args.minlen,
"gti_split": args.gti_split,
"ignore_gtis": args.ignore_gtis,
"bintime": bintime,
"outdir": args.outdir,
"deorbit_par": args.deorbit_par,
"weight_on": args.weight_on,
}
arglist = [[f, argdict.copy()] for f in args.files]
na = len(arglist)
outfile = args.outfile
if outfile is not None:
outname, ext = splitext_improved(outfile)
for i in range(na):
if na > 1:
outname = outfile + "_{0}".format(i)
arglist[i][1]["outfile"] = outname
# -------------------------------------------------------------------------
outfiles = []
if os.name == "nt" or args.nproc == 1:
for a in arglist:
outfiles.append(wrap_fun(a))
else:
pool = Pool(processes=args.nproc)
for i in pool.imap_unordered(wrap_fun, arglist):
outfiles.append(i)
pool.close()
log.debug(f"{outfiles}")
if args.scrunch:
scrunch_lightcurves(outfiles)
if args.join:
join_lightcurves(outfiles)
[docs]
def main(args=None):
"""Main function called by the `HENlcurve` command line script."""
import argparse
from .base import _add_default_args, check_negative_numbers_in_args
description = (
"Create lightcurves starting from event files. It is "
"possible to specify energy or channel filtering options"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument("files", help="List of files", nargs="+")
parser.add_argument(
"-b",
"--bintime",
type=float,
default=1,
help="Bin time; if negative, negative power of 2",
)
parser.add_argument(
"--safe-interval",
nargs=2,
type=float,
default=[0, 0],
help="Interval at start and stop of GTIs used" + " for filtering",
)
parser = _add_default_args(parser, ["energies", "pi"])
parser.add_argument(
"-s",
"--scrunch",
help="Create scrunched light curve (single channel)",
default=False,
action="store_true",
)
parser.add_argument(
"-j",
"--join",
help="Create joint light curve (multiple channels)",
default=False,
action="store_true",
)
parser.add_argument(
"-g",
"--gti-split",
help="Split light curve by GTI",
default=False,
action="store_true",
)
parser.add_argument(
"--minlen",
help="Minimum length of acceptable GTIs (default:4)",
default=4,
type=float,
)
parser.add_argument(
"--ignore-gtis",
help="Ignore GTIs",
default=False,
action="store_true",
)
parser.add_argument(
"-d", "--outdir", type=str, default=None, help="Output directory"
)
parser.add_argument(
"--noclobber",
help="Do not overwrite existing files",
default=False,
action="store_true",
)
parser.add_argument(
"--fits-input",
help="Input files are light curves in FITS format",
default=False,
action="store_true",
)
parser.add_argument(
"--txt-input",
help="Input files are light curves in txt format",
default=False,
action="store_true",
)
parser.add_argument(
"--weight-on",
default=None,
type=str,
help="Use a given attribute of the event list as weights for the light curve",
)
parser = _add_default_args(
parser, ["deorbit", "output", "loglevel", "debug", "nproc"]
)
args = check_negative_numbers_in_args(args)
args = parser.parse_args(args)
if args.debug:
args.loglevel = "DEBUG"
log.setLevel(args.loglevel)
with log.log_to_file("HENlcurve.log"):
_execute_lcurve(args)
[docs]
def scrunch_main(args=None):
"""Main function called by the `HENscrunchlc` command line script."""
import argparse
description = "Sum lightcurves from different instruments or energy ranges"
parser = argparse.ArgumentParser(description=description)
parser.add_argument("files", help="List of files", nargs="+")
parser.add_argument(
"-o",
"--out",
type=str,
default="out_scrlc" + HEN_FILE_EXTENSION,
help="Output file",
)
parser.add_argument(
"--loglevel",
help=(
"use given logging level (one between INFO, "
"WARNING, ERROR, CRITICAL, DEBUG; "
"default:WARNING)"
),
default="WARNING",
type=str,
)
parser.add_argument(
"--debug",
help="use DEBUG logging level",
default=False,
action="store_true",
)
args = parser.parse_args(args)
files = args.files
if args.debug:
args.loglevel = "DEBUG"
log.setLevel(args.loglevel)
with log.log_to_file("HENscrunchlc.log"):
scrunch_lightcurves(files, args.out)
[docs]
def baseline_main(args=None):
"""Main function called by the `HENbaselinesub` command line script."""
import argparse
description = (
"Subtract a baseline from the lightcurve using the Asymmetric Least "
"Squares algorithm. The two parameters p and lambda control the "
"asymmetry and smoothness of the baseline. See below for details."
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument("files", help="List of files", nargs="+")
parser.add_argument("-o", "--out", type=str, default=None, help="Output file")
parser.add_argument(
"--loglevel",
help=(
"use given logging level (one between INFO, "
"WARNING, ERROR, CRITICAL, DEBUG; "
"default:WARNING)"
),
default="WARNING",
type=str,
)
parser.add_argument(
"--debug",
help="use DEBUG logging level",
default=False,
action="store_true",
)
parser.add_argument(
"-p",
"--asymmetry",
type=float,
help='"asymmetry" parameter. Smaller values make the '
'baseline more "horizontal". Typically '
"0.001 < p < 0.1, but not necessarily.",
default=0.01,
)
parser.add_argument(
"-l",
"--lam",
type=float,
help='lambda, or "smoothness", parameter. Larger'
" values make the baseline stiffer. Typically "
"1e2 < lam < 1e9",
default=1e5,
)
args = parser.parse_args(args)
files = args.files
if args.debug:
args.loglevel = "DEBUG"
log.setLevel(args.loglevel)
with log.log_to_file("HENbaseline.log"):
_baseline_lightcurves(files, args.out, args.asymmetry, args.lam)