Source code for hendrics.read_events

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Read and save event lists from FITS files."""

import warnings
import copy
import os
import numpy as np
from astropy import log
from stingray.io import load_events_and_gtis
from stingray.events import EventList
from stingray.gti import cross_two_gtis, cross_gtis, check_separate
from .io import load_events
from .base import common_name
from .base import hen_root
from .io import save_events
from .io import HEN_FILE_EXTENSION


[docs] def treat_event_file( filename, noclobber=False, gti_split=False, min_length=4, gtistring=None, length_split=None, randomize_by=None, discard_calibration=False, additional_columns=None, fill_small_gaps=None, ): """Read data from an event file, with no external GTI information. Parameters ---------- filename : str Other Parameters ---------------- noclobber: bool if a file is present, do not overwrite it gtistring: str comma-separated set of GTI strings to consider gti_split: bool split the file in multiple chunks, containing one GTI each length_split: float, default None split the file in multiple chunks, with approximately this length min_length: float minimum length of GTIs accepted (only if gti_split is True or length_split is not None) discard_calibration: bool discard the automatic calibration done by Stingray (if any) fill_small_gaps: float fill gaps between GTIs with random data, if shorter than this amount """ # gtistring = assign_value_if_none(gtistring, "GTI,GTI0,STDGTI") log.info("Opening %s" % filename) events = EventList.read( filename, fmt="hea", gtistring=gtistring, additional_columns=additional_columns, ) if discard_calibration: events.energy = None events.cal_pi = None mission = events.mission if hasattr(events, "instr") and isinstance(events.instr, str): instr = events.instr.lower() gti = events.gti lengths = np.array([g1 - g0 for (g0, g1) in gti]) gti = gti[lengths >= min_length] events.gti = gti detector_id = events.detector_id if randomize_by is not None: events.time += np.random.uniform( -randomize_by / 2, randomize_by / 2, events.time.size ) if fill_small_gaps is not None: events = events.fill_bad_time_intervals(fill_small_gaps) if detector_id is not None: detectors = np.array(list(set(detector_id))) else: detectors = [None] outfile_root = hen_root(filename) + "_" + mission.lower() + "_" + instr.lower() if randomize_by is not None: outfile_root += f"_randomize_by_{randomize_by:g}s" if fill_small_gaps is not None: outfile_root += f"_fix_{fill_small_gaps:g}s" output_files = [] for d in detectors: if d is not None: good_det = d == detector_id outroot_local = "{0}_det{1:02d}".format(outfile_root, d) else: good_det = np.ones_like(events.time, dtype=bool) outroot_local = outfile_root outfile = outroot_local + "_ev" + HEN_FILE_EXTENSION if noclobber and os.path.exists(outfile) and (not (gti_split or length_split)): warnings.warn("{0} exists and using noclobber. Skipping".format(outfile)) return if gti_split or (length_split is not None): if length_split: gti0 = np.arange(gti[0, 0], gti[-1, 1], length_split) gti1 = gti0 + length_split gti_chunks = np.array([[g0, g1] for (g0, g1) in zip(gti0, gti1)]) label = "chunk" else: gti_chunks = gti label = "gti" for ig, g in enumerate(gti_chunks): outfile_local = ( "{0}_{1}{2:03d}_ev".format(outroot_local, label, ig) + HEN_FILE_EXTENSION ) good_gti = cross_two_gtis([g], gti) if noclobber and os.path.exists(outfile_local): warnings.warn( "{0} exists, ".format(outfile_local) + "and noclobber option used. Skipping" ) return good = np.logical_and(events.time >= g[0], events.time < g[1]) all_good = good_det & good if len(events.time[all_good]) < 1: continue events_filt = events.apply_mask(all_good) events_filt.gti = good_gti save_events(events_filt, outfile_local) output_files.append(outfile_local) else: events_filt = events.apply_mask(good_det) save_events(events_filt, outfile) output_files.append(outfile) return output_files
def _wrap_fun(arglist): f, kwargs = arglist try: return treat_event_file(f, **kwargs) except IndexError: log.error(f"Empty or corrupt event file: {f}") except Exception as e: log.error(f"Unknown error: {f}") log.error(f"{str(e)}")
[docs] def multiple_event_concatenate(event_lists): """ Join multiple :class:`EventList` objects into one. If both are empty, an empty :class:`EventList` is returned. GTIs are crossed if the event lists are over a common time interval, and appended otherwise. ``pi`` and ``pha`` remain ``None`` if they are ``None`` in both. Otherwise, 0 is used as a default value for the :class:`EventList` where they were None. Parameters ---------- event_lists : list of :class:`EventList` object :class:`EventList` objects that we are joining Returns ------- `ev_new` : :class:`EventList` object The resulting :class:`EventList` object. Examples -------- >>> gti = [[0, 5]] >>> ev1 = EventList(time=[0.3, 1], gti=gti, energy=[3, 4], pi=None) >>> ev2 = EventList(time=[2, 3], gti=gti, energy=[3, 4]) >>> ev3 = EventList(time=[4, 4.5], gti=gti, energy=[3, 4]) >>> ev_new = multiple_event_concatenate([ev1, ev2, ev3]) >>> assert np.allclose(ev_new.time, [0.3, 1, 2, 3, 4, 4.5]) >>> assert np.allclose(ev_new.energy, [3, 4, 3, 4, 3, 4]) >>> assert ev_new.pi is None """ ev_new = EventList() if check_separate(event_lists[0].gti, event_lists[1].gti): gti = np.concatenate([ev.gti for ev in event_lists]) else: gti = cross_gtis([ev.gti for ev in event_lists]) order = np.argsort(gti[:, 0]) gti = gti[order] ev_new.time = np.concatenate([ev.time for ev in event_lists]) order = np.argsort(ev_new.time) ev_new.time = ev_new.time[order] for attr in event_lists[0].array_attrs(): if attr == "time": continue if getattr(event_lists[0], attr) is not None: setattr( ev_new, attr, np.concatenate([getattr(ev, attr) for ev in event_lists])[order], ) else: setattr(ev_new, attr, None) ev_new.mjdref = event_lists[0].mjdref ev_new.gti = gti return ev_new
[docs] def join_eventlists(event_file1, event_file2, new_event_file=None, ignore_instr=False): """Join two event files. Parameters ---------- event_file1 : str First event file event_file2 : str Second event file Other parameters ---------------- new_event_file : str, default None Output event file. If not specified uses `hendrics.utils.common_name` to figure out a good name to use mixing up the two input names. ignore_instr : bool Ignore errors about different instruments Returns ------- new_event_file : str Output event file """ if new_event_file is None: new_event_file = ( common_name(event_file1, event_file2) + "_ev" + HEN_FILE_EXTENSION ) events1 = load_events(event_file1) events2 = load_events(event_file2) if ignore_instr: events1.instr = events2.instr = ",".join((events1.instr, events2.instr)) if events2.time.size == 0 or events2.gti.size == 0: warnings.warn(f"{event_file2} has no good events") return None if events2.mjdref != events1.mjdref: warnings.warn("Different missions detected; changing MJDREF") time_diff = (events1.mjdref - events2.mjdref) * 86400 events2.time -= time_diff events2.mjdref = events1.mjdref events2.gti -= time_diff events = events1.join(events2) if hasattr(events2, "header"): events.header = events1.header if events1.instr.lower() != events2.instr.lower(): events.instr = ",".join([e.instr.lower() for e in [events1, events2]]) for attr in ["mission", "instr"]: if getattr(events1, attr) != getattr(events2, attr): setattr( events, attr, getattr(events1, attr) + "," + getattr(events2, attr), ) else: setattr(events, attr, getattr(events1, attr)) save_events(events, new_event_file) return new_event_file
[docs] def join_many_eventlists(eventfiles, new_event_file=None, ignore_instr=False): """Join two event files. Parameters ---------- event_files : list of str List of event files Other parameters ---------------- new_event_file : str, default None Output event file. If not specified ``joint_ev`` + HEN_FILE_EXTENSION ignore_instr : bool Ignore errors about different instruments Returns ------- new_event_file : str Output event file """ if new_event_file is None: new_event_file = "joint_ev" + HEN_FILE_EXTENSION N = len(eventfiles) log.info(f"Reading {eventfiles[0]} ({1}/{N})") first_events = load_events(eventfiles[0]) all_events = [first_events] instr = first_events.instr for i, event_file in enumerate(eventfiles[1:]): log.info(f"Reading {event_file} ({i + 1}/{N})") events = load_events(event_file) if not np.isclose(events.mjdref, first_events.mjdref): warnings.warn(f"{event_file} has a different MJDREF") continue if ( hasattr(events, "instr") and not events.instr == first_events.instr and not ignore_instr ): warnings.warn(f"{event_file} is from a different instrument") continue elif ignore_instr: instr += "," + events.instr if ( events.time.size == 0 or events.gti.size == 0 or not np.all( [ events.time[0] < events.gti.max(), events.time[-1] > events.gti.min(), ] ) ): warnings.warn(f"{event_file} has no good events") continue all_events.append(events) events = multiple_event_concatenate(all_events) if ignore_instr: events.instr = instr save_events(events, new_event_file) return new_event_file
def _split_events(ev, max_length, overlap=0): event_times = ev.time GTI = ev.gti t0 = GTI[0, 0] while t0 < GTI.max(): t1 = min(t0 + max_length, GTI.max()) if t1 - t0 < max_length / 2: break idx_start = np.searchsorted(event_times, t0) idx_stop = np.searchsorted(event_times, t1) gti_local = cross_two_gtis(GTI, [[t0, t1]]) local_times = event_times[idx_start:idx_stop] new_ev = EventList(time=local_times, gti=gti_local) for attr in ["pi", "energy", "cal_pi"]: if hasattr(ev, attr) and getattr(ev, attr) is not None: setattr(new_ev, attr, getattr(ev, attr)[idx_start:idx_stop]) for attr in ["mission", "instr", "mjdref", "header"]: if hasattr(ev, attr) and getattr(ev, attr) is not None: setattr(new_ev, attr, getattr(ev, attr)) t0 = t0 + max_length * (1.0 - overlap) yield new_ev
[docs] def split_eventlist(fname, max_length, overlap=None): root = hen_root(fname) ev = load_events(fname) if overlap is None: overlap = 0 if overlap >= 1: raise ValueError("Overlap cannot be >=1. Exiting.") event_times = ev.time GTI = ev.gti t0 = GTI[0, 0] count = 0 from .base import nchars_in_int_value nchars = nchars_in_int_value((GTI.max() - t0) / max_length) all_files = [] for new_ev in _split_events(ev, max_length, overlap): newfname = root + f"_{count:0{nchars}d}" + HEN_FILE_EXTENSION save_events(new_ev, newfname) all_files.append(newfname) count += 1 return all_files
[docs] def split_eventlist_at_mjd(fname, mjd): root = hen_root(fname) ev = load_events(fname) event_times = ev.time met = (mjd - ev.mjdref) * 86400 idx = np.searchsorted(event_times, met) ev_before = ev.apply_mask(slice(0, idx)) ev_after = ev.apply_mask(slice(idx, -1)) GTI = ev.gti ev_before.gti = cross_two_gtis(GTI, [[GTI[0, 0], met]]) ev_after.gti = cross_two_gtis(GTI, [[met, GTI[-1, 1]]]) fbefore = root + f"_before_{mjd:g}" + HEN_FILE_EXTENSION fafter = root + f"_after_{mjd:g}" + HEN_FILE_EXTENSION save_events(ev_before, fbefore) save_events(ev_after, fafter) all_files = [fbefore, fafter] return all_files
[docs] def main_join(args=None): """Main function called by the `HENjoinevents` command line script.""" import argparse description = ( "Read a cleaned event files and saves the relevant " "information in a standard format" ) parser = argparse.ArgumentParser(description=description) parser.add_argument("files", help="Files to join", type=str, nargs="+") parser.add_argument( "-o", "--output", type=str, help="Name of output file", default=None ) parser.add_argument( "--ignore-instr", help="Ignore instrument names in channels", default=False, action="store_true", ) args = parser.parse_args(args) if len(args.files) == 2: return join_eventlists( args.files[0], args.files[1], new_event_file=args.output, ignore_instr=args.ignore_instr, ) else: return join_many_eventlists( args.files, new_event_file=args.output, ignore_instr=args.ignore_instr, )
[docs] def main_splitevents(args=None): """Main function called by the `HENsplitevents` command line script.""" import argparse description = ( "Reads a cleaned event files and splits the file into " "overlapping multiple chunks of fixed length" ) parser = argparse.ArgumentParser(description=description) parser.add_argument("fname", help="File 1", type=str) parser.add_argument( "-l", "--length-split", help="Split event list by GTI", default=None, type=float, ) parser.add_argument( "--overlap", type=float, help="Overlap factor. 0 for no overlap, 0.5 for " "half-interval overlap, and so on.", default=None, ) parser.add_argument( "--split-at-mjd", type=float, help="Split at this MJD", default=None, ) args = parser.parse_args(args) if args.split_at_mjd is not None: return split_eventlist_at_mjd(args.fname, mjd=args.split_at_mjd) return split_eventlist( args.fname, max_length=args.length_split, overlap=args.overlap )
[docs] def main(args=None): """Main function called by the `HENreadevents` command line script.""" import argparse from multiprocessing import Pool from .base import _add_default_args, check_negative_numbers_in_args description = ( "Read a cleaned event files and saves the relevant " "information in a standard format" ) parser = argparse.ArgumentParser(description=description) parser.add_argument("files", help="List of files", nargs="+") parser.add_argument( "--noclobber", help=("Do not overwrite existing event files"), default=False, action="store_true", ) parser.add_argument( "-g", "--gti-split", help="Split event list by GTI", default=False, action="store_true", ) parser.add_argument( "--discard-calibration", help="Discard automatic calibration (if any)", default=False, action="store_true", ) parser.add_argument( "-l", "--length-split", help="Split event list by length", default=None, type=float, ) parser.add_argument( "--min-length", type=int, help="Minimum length of GTIs to consider", default=0, ) parser.add_argument("--gti-string", type=str, help="GTI string", default=None) parser.add_argument( "--randomize-by", type=float, help="Randomize event arrival times by this amount " "(e.g. it might be the 0.073-s frame time in " "XMM)", default=None, ) parser.add_argument( "--fill-small-gaps", type=float, help="Fill gaps between GTIs with random data, if shorter than this amount", default=None, ) parser.add_argument( "--additional", type=str, nargs="+", help="Additional columns to be read from the FITS file", default=None, ) _add_default_args(parser, ["output", "loglevel", "debug", "nproc"]) args = check_negative_numbers_in_args(args) args = parser.parse_args(args) files = args.files if args.debug: args.loglevel = "DEBUG" log.setLevel(args.loglevel) with log.log_to_file("HENreadevents.log"): argdict = { "noclobber": args.noclobber, "gti_split": args.gti_split, "min_length": args.min_length, "gtistring": args.gti_string, "length_split": args.length_split, "randomize_by": args.randomize_by, "discard_calibration": args.discard_calibration, "additional_columns": args.additional, "fill_small_gaps": args.fill_small_gaps, } arglist = [[f, argdict] for f in files] if os.name == "nt" or args.nproc == 1: [_wrap_fun(a) for a in arglist] else: pool = Pool(processes=args.nproc) for i in pool.imap_unordered(_wrap_fun, arglist): pass pool.close()