# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Read and save event lists from FITS files."""
import os
import warnings
from collections.abc import Iterable
import numpy as np
from stingray.events import EventList
from stingray.gti import (
check_separate,
create_gti_from_condition,
create_gti_mask,
cross_gtis,
cross_two_gtis,
)
from astropy import log
from .base import common_name, hen_root
from .io import HEN_FILE_EXTENSION, load_events, save_events
[docs]
def treat_event_file(
filename,
noclobber=False,
gti_split=False,
min_length=4,
gtistring=None,
length_split=None,
randomize_by=None,
discard_calibration=False,
split_by_detector=True,
additional_columns=None,
fill_small_gaps=None,
bin_time_for_occultations=None,
safe_interval=None,
):
"""Read data from an event file, with no external GTI information.
Parameters
----------
filename : str
Other Parameters
----------------
noclobber: bool
if a file is present, do not overwrite it
gtistring: str
comma-separated set of GTI strings to consider
gti_split: bool
split the file in multiple chunks, containing one GTI each
length_split: float, default None
split the file in multiple chunks, with approximately this length
min_length: float
minimum length of GTIs accepted (only if gti_split is True or
length_split is not None)
discard_calibration: bool
discard the automatic calibration done by Stingray (if any)
split_by_detector: bool, default True
split data from different detectors
fill_small_gaps: float
fill gaps between GTIs with random data, if shorter than this amount
bin_time_for_occultations: float
Create a light curve with this bin time and infer occultations not
recorded in GTIs. The rule is that the flux drops to zero and the average
counts per bin are significantly above 25 ct/s.
safe_interval : float or [float, float]
A safe interval to exclude at both ends (if single float) or the start
and the end (if pair of values) of GTIs.
"""
# gtistring = assign_value_if_none(gtistring, "GTI,GTI0,STDGTI")
log.info(f"Opening {filename}")
events = EventList.read(
filename,
fmt="hea",
gtistring=gtistring,
additional_columns=additional_columns,
)
if discard_calibration:
events.energy = None
events.cal_pi = None
mission = events.mission
if hasattr(events, "instr") and isinstance(events.instr, str):
instr = events.instr.lower()
gti = events.gti
if bin_time_for_occultations is not None and bin_time_for_occultations > 0:
lc = events.to_lc(bin_time_for_occultations)
meanrate = np.median(lc.counts)
if meanrate > 25:
log.info("Filtering out occultations")
good_gti = create_gti_mask(lc.time, lc.gti, safe_interval=safe_interval)
good = lc.counts > 0
new_bad = (~good) & good_gti
if np.any(new_bad):
warnings.warn(f"Found zero counts in the light curve at times{lc.time[new_bad]}")
gti = create_gti_from_condition(
lc.time, (good_gti & good), safe_interval=bin_time_for_occultations
)
events.gti = gti
elif safe_interval is not None:
if not isinstance(safe_interval, Iterable):
safe_interval = [safe_interval, safe_interval]
gti[:, 0] += safe_interval[0]
gti[:, 1] -= safe_interval[1]
lengths = gti[:, 1] - gti[:, 0]
gti = gti[lengths >= min_length]
events.gti = gti
detector_id = events.detector_id
if randomize_by is not None:
events.time += np.random.uniform(-randomize_by / 2, randomize_by / 2, events.time.size)
if fill_small_gaps is not None:
events = events.fill_bad_time_intervals(fill_small_gaps)
if detector_id is not None and split_by_detector:
detectors = np.array(list(set(detector_id)))
else:
detectors = [None]
outfile_root = hen_root(filename) + "_" + mission.lower() + "_" + instr.lower()
if randomize_by is not None:
outfile_root += f"_randomize_by_{randomize_by:g}s"
if fill_small_gaps is not None:
outfile_root += f"_fix_{fill_small_gaps:g}s"
output_files = []
for d in detectors:
if d is not None:
good_det = d == detector_id
outroot_local = f"{outfile_root}_det{d:02d}"
else:
good_det = np.ones_like(events.time, dtype=bool)
outroot_local = outfile_root
outfile = outroot_local + "_ev" + HEN_FILE_EXTENSION
if noclobber and os.path.exists(outfile) and (not (gti_split or length_split)):
warnings.warn(f"{outfile} exists and using noclobber. Skipping")
return
if gti_split or (length_split is not None):
if length_split:
gti0 = np.arange(gti[0, 0], gti[-1, 1], length_split)
gti1 = gti0 + length_split
gti_chunks = np.array([[g0, g1] for (g0, g1) in zip(gti0, gti1)])
label = "chunk"
else:
gti_chunks = gti
label = "gti"
for ig, g in enumerate(gti_chunks):
outfile_local = f"{outroot_local}_{label}{ig:03d}_ev" + HEN_FILE_EXTENSION
good_gti = cross_two_gtis([g], gti)
if noclobber and os.path.exists(outfile_local):
warnings.warn(f"{outfile_local} exists, and noclobber option used. Skipping")
return
good = np.logical_and(events.time >= g[0], events.time < g[1])
all_good = good_det & good
if len(events.time[all_good]) < 1:
continue
events_filt = events.apply_mask(all_good)
events_filt.gti = good_gti
save_events(events_filt, outfile_local)
output_files.append(outfile_local)
else:
events_filt = events.apply_mask(good_det)
save_events(events_filt, outfile)
output_files.append(outfile)
return output_files
def _wrap_fun(arglist):
f, kwargs = arglist
try:
return treat_event_file(f, **kwargs)
except IndexError:
log.error(f"Empty or corrupt event file: {f}")
except Exception as e:
log.error(f"Unknown error: {f}")
log.error(f"{str(e)}")
[docs]
def multiple_event_concatenate(event_lists):
"""Join multiple :class:`EventList` objects into one.
If both are empty, an empty :class:`EventList` is returned.
GTIs are crossed if the event lists are over a common time interval,
and appended otherwise.
``pi`` and ``pha`` remain ``None`` if they are ``None`` in both.
Otherwise, 0 is used as a default value for the :class:`EventList` where
they were None.
Parameters
----------
event_lists : list of :class:`EventList` object
:class:`EventList` objects that we are joining
Returns
-------
`ev_new` : :class:`EventList` object
The resulting :class:`EventList` object.
Examples
--------
>>> gti = [[0, 5]]
>>> ev1 = EventList(time=[0.3, 1], gti=gti, energy=[3, 4], pi=None)
>>> ev2 = EventList(time=[2, 3], gti=gti, energy=[3, 4])
>>> ev3 = EventList(time=[4, 4.5], gti=gti, energy=[3, 4])
>>> ev_new = multiple_event_concatenate([ev1, ev2, ev3])
>>> assert np.allclose(ev_new.time, [0.3, 1, 2, 3, 4, 4.5])
>>> assert np.allclose(ev_new.energy, [3, 4, 3, 4, 3, 4])
>>> assert ev_new.pi is None
"""
ev_new = EventList()
if check_separate(event_lists[0].gti, event_lists[1].gti):
gti = np.concatenate([ev.gti for ev in event_lists])
else:
gti = cross_gtis([ev.gti for ev in event_lists])
order = np.argsort(gti[:, 0])
gti = gti[order]
ev_new.time = np.concatenate([ev.time for ev in event_lists])
order = np.argsort(ev_new.time)
ev_new.time = ev_new.time[order]
for attr in event_lists[0].array_attrs():
if attr == "time":
continue
if getattr(event_lists[0], attr) is not None:
setattr(
ev_new,
attr,
np.concatenate([getattr(ev, attr) for ev in event_lists])[order],
)
else:
setattr(ev_new, attr, None)
ev_new.mjdref = event_lists[0].mjdref
ev_new.gti = gti
return ev_new
[docs]
def join_eventlists(event_file1, event_file2, new_event_file=None, ignore_instr=False):
"""Join two event files.
Parameters
----------
event_file1 : str
First event file
event_file2 : str
Second event file
Other Parameters
----------------
new_event_file : str, default None
Output event file. If not specified uses `hendrics.utils.common_name`
to figure out a good name to use mixing up the two input names.
ignore_instr : bool
Ignore errors about different instruments
Returns
-------
new_event_file : str
Output event file
"""
if new_event_file is None:
new_event_file = common_name(event_file1, event_file2) + "_ev" + HEN_FILE_EXTENSION
events1 = load_events(event_file1)
events2 = load_events(event_file2)
if ignore_instr:
events1.instr = events2.instr = f"{events1.instr},{events2.instr}"
if events2.time.size == 0 or events2.gti.size == 0:
warnings.warn(f"{event_file2} has no good events")
return None
if events2.mjdref != events1.mjdref:
warnings.warn("Different missions detected; changing MJDREF")
time_diff = (events1.mjdref - events2.mjdref) * 86400
events2.time -= time_diff
events2.mjdref = events1.mjdref
events2.gti -= time_diff
events = events1.join(events2)
if hasattr(events2, "header"):
events.header = events1.header
if events1.instr.lower() != events2.instr.lower():
events.instr = ",".join([e.instr.lower() for e in [events1, events2]])
for attr in ["mission", "instr"]:
if getattr(events1, attr) != getattr(events2, attr):
setattr(
events,
attr,
getattr(events1, attr) + "," + getattr(events2, attr),
)
else:
setattr(events, attr, getattr(events1, attr))
save_events(events, new_event_file)
return new_event_file
[docs]
def join_many_eventlists(eventfiles, new_event_file=None, ignore_instr=False):
"""Join two event files.
Parameters
----------
event_files : list of str
List of event files
Other Parameters
----------------
new_event_file : str, default None
Output event file. If not specified ``joint_ev`` + HEN_FILE_EXTENSION
ignore_instr : bool
Ignore errors about different instruments
Returns
-------
new_event_file : str
Output event file
"""
if new_event_file is None:
new_event_file = "joint_ev" + HEN_FILE_EXTENSION
N = len(eventfiles)
log.info(f"Reading {eventfiles[0]} ({1}/{N})")
first_events = load_events(eventfiles[0])
all_events = [first_events]
instr = first_events.instr
for i, event_file in enumerate(eventfiles[1:]):
log.info(f"Reading {event_file} ({i + 1}/{N})")
events = load_events(event_file)
if not np.isclose(events.mjdref, first_events.mjdref):
warnings.warn(f"{event_file} has a different MJDREF")
continue
if hasattr(events, "instr") and not events.instr == first_events.instr and not ignore_instr:
warnings.warn(f"{event_file} is from a different instrument")
continue
elif ignore_instr:
instr += "," + events.instr
if (
events.time.size == 0
or events.gti.size == 0
or not np.all(
[
events.time[0] < events.gti.max(),
events.time[-1] > events.gti.min(),
]
)
):
warnings.warn(f"{event_file} has no good events")
continue
all_events.append(events)
events = multiple_event_concatenate(all_events)
if ignore_instr:
events.instr = instr
save_events(events, new_event_file)
return new_event_file
def _split_events(ev, max_length, overlap=0):
event_times = ev.time
GTI = ev.gti
t0 = GTI[0, 0]
while t0 < GTI.max():
t1 = min(t0 + max_length, GTI.max())
if t1 - t0 < max_length / 2:
break
idx_start = np.searchsorted(event_times, t0)
idx_stop = np.searchsorted(event_times, t1)
gti_local = cross_two_gtis(GTI, [[t0, t1]])
local_times = event_times[idx_start:idx_stop]
new_ev = EventList(time=local_times, gti=gti_local)
for attr in ["pi", "energy", "cal_pi"]:
if hasattr(ev, attr) and getattr(ev, attr) is not None:
setattr(new_ev, attr, getattr(ev, attr)[idx_start:idx_stop])
for attr in ["mission", "instr", "mjdref", "header"]:
if hasattr(ev, attr) and getattr(ev, attr) is not None:
setattr(new_ev, attr, getattr(ev, attr))
t0 = t0 + max_length * (1.0 - overlap)
yield new_ev
[docs]
def split_eventlist(fname, max_length, overlap=None):
root = hen_root(fname)
ev = load_events(fname)
if overlap is None:
overlap = 0
if overlap >= 1:
raise ValueError("Overlap cannot be >=1. Exiting.")
event_times = ev.time
GTI = ev.gti
t0 = GTI[0, 0]
from .base import nchars_in_int_value
nchars = nchars_in_int_value((GTI.max() - t0) / max_length)
all_files = []
for count, new_ev in enumerate(_split_events(ev, max_length, overlap)):
newfname = root + f"_{count:0{nchars}d}" + HEN_FILE_EXTENSION
save_events(new_ev, newfname)
all_files.append(newfname)
return all_files
[docs]
def split_eventlist_at_mjd(fname, mjd):
root = hen_root(fname)
ev = load_events(fname)
event_times = ev.time
met = (mjd - ev.mjdref) * 86400
idx = np.searchsorted(event_times, met)
ev_before = ev.apply_mask(slice(0, idx))
ev_after = ev.apply_mask(slice(idx, -1))
GTI = ev.gti
ev_before.gti = cross_two_gtis(GTI, [[GTI[0, 0], met]])
ev_after.gti = cross_two_gtis(GTI, [[met, GTI[-1, 1]]])
fbefore = root + f"_before_{mjd:g}" + HEN_FILE_EXTENSION
fafter = root + f"_after_{mjd:g}" + HEN_FILE_EXTENSION
save_events(ev_before, fbefore)
save_events(ev_after, fafter)
all_files = [fbefore, fafter]
return all_files
[docs]
def main_join(args=None):
"""Main function called by the `HENjoinevents` command line script."""
import argparse
description = (
"Read a cleaned event files and saves the relevant information in a standard format"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument("files", help="Files to join", type=str, nargs="+")
parser.add_argument("-o", "--output", type=str, help="Name of output file", default=None)
parser.add_argument(
"--ignore-instr",
help="Ignore instrument names in channels",
default=False,
action="store_true",
)
args = parser.parse_args(args)
if len(args.files) == 2:
return join_eventlists(
args.files[0],
args.files[1],
new_event_file=args.output,
ignore_instr=args.ignore_instr,
)
else:
return join_many_eventlists(
args.files,
new_event_file=args.output,
ignore_instr=args.ignore_instr,
)
[docs]
def main_splitevents(args=None):
"""Main function called by the `HENsplitevents` command line script."""
import argparse
description = (
"Reads a cleaned event files and splits the file into "
"overlapping multiple chunks of fixed length"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument("fname", help="File 1", type=str)
parser.add_argument(
"-l",
"--length-split",
help="Split event list by GTI",
default=None,
type=float,
)
parser.add_argument(
"--overlap",
type=float,
help="Overlap factor. 0 for no overlap, 0.5 for " "half-interval overlap, and so on.",
default=None,
)
parser.add_argument(
"--split-at-mjd",
type=float,
help="Split at this MJD",
default=None,
)
args = parser.parse_args(args)
if args.split_at_mjd is not None:
return split_eventlist_at_mjd(args.fname, mjd=args.split_at_mjd)
return split_eventlist(args.fname, max_length=args.length_split, overlap=args.overlap)
[docs]
def main(args=None):
"""Main function called by the `HENreadevents` command line script."""
import argparse
from multiprocessing import Pool
from .base import _add_default_args, check_negative_numbers_in_args
description = (
"Read a cleaned event files and saves the relevant information in a standard format"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument("files", help="List of files", nargs="+")
parser.add_argument(
"--noclobber",
help=("Do not overwrite existing event files"),
default=False,
action="store_true",
)
parser.add_argument(
"-g",
"--gti-split",
help="Split event list by GTI",
default=False,
action="store_true",
)
parser.add_argument(
"--discard-calibration",
help="Discard automatic calibration (if any)",
default=False,
action="store_true",
)
parser.add_argument(
"--ignore-detectors",
help="Do not split by detector",
default=False,
action="store_true",
)
parser.add_argument(
"-l",
"--length-split",
help="Split event list by length",
default=None,
type=float,
)
parser.add_argument(
"--min-length",
type=int,
help="Minimum length of GTIs to consider",
default=0,
)
parser.add_argument(
"--bin-time-for-occultations",
type=float,
help=(
"Create a light curve with this bin time and infer occultations not recorded in GTIs."
" (The flux drops to zero and the average count rate is significantly above 25 ct/s)"
),
default=None,
)
parser.add_argument("--gti-string", type=str, help="GTI string", default=None)
parser.add_argument(
"--randomize-by",
type=float,
help="Randomize event arrival times by this amount "
"(e.g. it might be the 0.073-s frame time in "
"XMM)",
default=None,
)
parser.add_argument(
"--safe-interval",
nargs=2,
type=float,
default=0,
help="Interval at start and stop of GTIs used" + " for filtering",
)
parser.add_argument(
"--fill-small-gaps",
type=float,
help="Fill gaps between GTIs with random data, if shorter than this amount",
default=None,
)
parser.add_argument(
"--additional",
type=str,
nargs="+",
help="Additional columns to be read from the FITS file",
default=None,
)
_add_default_args(parser, ["output", "loglevel", "debug", "nproc"])
args = check_negative_numbers_in_args(args)
args = parser.parse_args(args)
files = args.files
if args.debug:
args.loglevel = "DEBUG"
log.setLevel(args.loglevel)
argdict = {
"noclobber": args.noclobber,
"gti_split": args.gti_split,
"min_length": args.min_length,
"gtistring": args.gti_string,
"length_split": args.length_split,
"randomize_by": args.randomize_by,
"discard_calibration": args.discard_calibration,
"additional_columns": args.additional,
"fill_small_gaps": args.fill_small_gaps,
"split_by_detector": not args.ignore_detectors,
"bin_time_for_occultations": args.bin_time_for_occultations,
"safe_interval": args.safe_interval,
}
arglist = [[f, argdict] for f in files]
if os.name == "nt" or args.nproc == 1:
[_wrap_fun(a) for a in arglist]
else:
pool = Pool(processes=args.nproc)
for i in pool.imap_unordered(_wrap_fun, arglist):
pass
pool.close()