Source code for hendrics.fake

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Functions to simulate data and produce a fake event file."""

import copy
import os

import numpy as np
import numpy.random as ra
from stingray.events import EventList
from stingray.filters import filter_for_deadtime
from stingray.io import get_key_from_mission_info, read_mission_info
from stingray.lightcurve import Lightcurve
from stingray.utils import assign_value_if_none

from astropy import log
from astropy.io.fits import Header

from .base import deorbit_events, get_file_format, r_in
from .fold import filter_energy
from .io import HEN_FILE_EXTENSION, load_events, load_lcurve, save_events
from .lcurve import lcurve_from_fits


def _clean_up_header(header):
    if header is None:
        return None
    for key in header.keys():
        for k in ["TTYP", "TFORM"]:
            if key.startswith(k):
                header.pop(key)
    for k in ["EXTNAME", "PCOUNT", "GCOUNT", "NAXIS1", "NAXIS2"]:
        if k in header:
            header.pop(k)
    return header


def _fill_in_default_information(tbheader):
    tbheader["OBSERVER"] = "Edwige Bubble"
    tbheader["COMMENT"] = (
        "FITS (Flexible Image Transport System) format is"
        " defined in 'Astronomy and Astrophysics', volume"
        " 376, page 359; bibcode: 2001A&A...376..359H"
    )
    tbheader["OBS_ID"] = ("00000000001", "Observation ID")
    tbheader["TARG_ID"] = (0, "Target ID")
    tbheader["OBJECT"] = ("Fake X-1", "Name of observed object")
    tbheader["RA_OBJ"] = (0.0, "[deg] R.A. Object")
    tbheader["DEC_OBJ"] = (0.0, "[deg] Dec Object")
    tbheader["RA_NOM"] = (
        0.0,
        "Right Ascension used for barycenter corrections",
    )
    tbheader["DEC_NOM"] = (0.0, "Declination used for barycenter corrections")
    tbheader["RA_PNT"] = (0.0, "[deg] RA pointing")
    tbheader["DEC_PNT"] = (0.0, "[deg] Dec pointing")
    tbheader["PA_PNT"] = (0.0, "[deg] Position angle (roll)")
    tbheader["EQUINOX"] = (2.000e03, "Equinox of celestial coord system")
    tbheader["RADECSYS"] = ("FK5", "Coordinate Reference System")
    tbheader["TASSIGN"] = ("SATELLITE", "Time assigned by onboard clock")
    tbheader["TIMESYS"] = ("TDB", "All times in this file are TDB")
    tbheader["TIMEREF"] = (
        "SOLARSYSTEM",
        "Times are pathlength-corrected to barycenter",
    )
    tbheader["CLOCKAPP"] = (
        False,
        "TRUE if timestamps corrected by gnd sware",
    )
    tbheader["COMMENT"] = "MJDREFI+MJDREFF = epoch of Jan 1, 2010, in TT " "time system."
    tbheader["TIMEUNIT"] = ("s", "unit for time keywords")
    return tbheader


[docs] def generate_fake_fits_observation( event_list=None, filename=None, instr=None, gti=None, tstart=None, tstop=None, mission=None, mjdref=55197.00076601852, livetime=None, additional_columns={}, ): """Generate fake X-ray data. Takes an event list (as a list of floats) All inputs are None by default, and can be set during the call. Parameters ---------- event_list : list-like :class:`stingray.events.Eventlist` object. If left None, 1000 random events will be generated, for a total length of 1025 s or the difference between tstop and tstart. filename : str Output file name Returns ------- hdulist : FITS hdu list FITS hdu list of the output file Other Parameters ---------------- mjdref : float Reference MJD. Default is 55197.00076601852 (NuSTAR) pi : list-like The PI channel of each event tstart : float Start of the observation (s from mjdref) tstop : float End of the observation (s from mjdref) instr : str Name of the instrument. Default is 'FPMA' livetime : float Total livetime. Default is tstop - tstart """ from astropy.io import fits inheader = None if event_list is None: tstart = assign_value_if_none(tstart, 8e7) tstop = assign_value_if_none(tstop, tstart + 1025) ev_list = np.sort(ra.uniform(tstart, tstop, 1000)) gti = assign_value_if_none(gti, np.array([[tstart, tstop]])) else: if hasattr(event_list, "header") and event_list.header is not None: inheader = Header.fromstring(event_list.header) inheader = _clean_up_header(inheader) ev_list = event_list.time gti = assign_value_if_none(event_list.gti, np.asarray([[ev_list[0], ev_list[-1]]])) mission = assign_value_if_none(mission, event_list.mission) instr = assign_value_if_none(instr, event_list.instr) tstart = assign_value_if_none(tstart, gti[0, 0]) tstop = assign_value_if_none(tstop, gti[-1, 1]) if hasattr(event_list, "mjdref") and event_list.mjdref is not None: mjdref = event_list.mjdref mission = assign_value_if_none(mission, "NuSTAR") instr = assign_value_if_none(instr, "FPMA") if hasattr(event_list, "pi") and event_list.pi is not None: pi = event_list.pi else: pi = ra.randint(0, 1024, np.size(ev_list)) if hasattr(event_list, "cal_pi") and event_list.cal_pi is not None: cal_pi = event_list.cal_pi else: cal_pi = pi / 3 filename = assign_value_if_none(filename, "events.evt") livetime = assign_value_if_none(livetime, tstop - tstart) if livetime > tstop - tstart: raise ValueError("Livetime must be equal or smaller than " "tstop - tstart") mission_info = read_mission_info(mission) allowed_instr = [] if "instruments" in mission_info: allowed_instr = mission_info["instruments"] # Just prefer EPN for XMM if "xmm" in mission.lower(): allowed_instr = ["EPN", "EMOS1", "EMOS2", "RGS1", "RGS2"] allowed_instr = [ins.lower() for ins in allowed_instr] if (allowed_instr != []) and (instr.lower() not in allowed_instr): instr = allowed_instr[0] ccol = get_key_from_mission_info(mission_info, "ccol", "none", inst=instr) if ccol is not None and ccol.lower() == "none": ccol = None ecol = get_key_from_mission_info(mission_info, "ecol", "PI", inst=instr) ext = get_key_from_mission_info(mission_info, "events", "EVENTS", inst=instr) # Create primary header prihdr = fits.Header() prihdr["OBSERVER"] = "Edwige Bubble" if inheader is not None and "OBSERVER" in inheader: prihdr["OBSERVER"] = inheader["OBSERVER"] prihdr["TELESCOP"] = (mission, "Telescope (mission) name") prihdr["INSTRUME"] = (instr, "Instrument name") prihdu = fits.PrimaryHDU(header=prihdr) prihdu.verify("exception") # Write events to table col1 = fits.Column(name="TIME", format="1D", array=ev_list) allcols = [col1] if ccol is not None: if not hasattr(event_list, "detector_id") or event_list.detector_id is None: ccdnr = np.zeros(np.size(ev_list)) + 1 ccdnr[1] = 2 # Make it less trivial ccdnr[10] = 7 else: ccdnr = event_list.detector_id allcols.append(fits.Column(name=ccol, format="1J", array=ccdnr)) if mission.lower().strip() in ["xmm", "swift"]: allcols.append(fits.Column(name="PHA", format="1J", array=pi)) allcols.append(fits.Column(name="PI", format="1J", array=cal_pi)) else: allcols.append(fits.Column(name=ecol, format="1J", array=pi)) for c in additional_columns.keys(): col = fits.Column( name=c, array=additional_columns[c]["data"], format=additional_columns[c]["format"], ) allcols.append(col) cols = fits.ColDefs(allcols) # ---- Fake lots of information ---- tbheader = Header() tbheader = _fill_in_default_information(tbheader) # If None, it will not update tbheader.update(inheader) tbheader["TSTART"] = ( tstart, "Elapsed seconds since MJDREF at start of file", ) tbheader["TELESCOP"] = (mission, "Telescope (mission) name") tbheader["INSTRUME"] = (instr, "Instrument name") if mjdref != float(int(mjdref)): tbheader["MJDREFI"] = ( int(mjdref), "TDB time reference; Modified Julian Day (int)", ) tbheader["MJDREFF"] = ( mjdref - int(mjdref), "TDB time reference; Modified Julian Day (frac)", ) tbheader.pop("MJDREF", None) else: tbheader["MJDREF"] = mjdref tbheader.pop("MJDREFI", None) tbheader.pop("MJDREFF", None) tbheader["TSTOP"] = (tstop, "Elapsed seconds since MJDREF at end of file") tbheader["LIVETIME"] = (livetime, "On-source time") tbheader["TIMEZERO"] = (0.000000e00, "Time Zero") tbheader["HISTORY"] = f"Generated with HENDRICS by {os.getenv('USER')}" tbhdu = fits.BinTableHDU.from_columns(cols, header=tbheader) tbhdu.name = ext tbhdu.add_checksum() tbhdu.verify("exception") # ---- END Fake lots of information ---- # Fake GTIs start = gti[:, 0] stop = gti[:, 1] col1 = fits.Column(name="START", format="1D", array=start) col2 = fits.Column(name="STOP", format="1D", array=stop) allcols = [col1, col2] cols = fits.ColDefs(allcols) gtinames = ["GTI"] if mission.lower().strip() == "xmm": gtinames = [] for i in set(ccdnr): gtinames.append(f"STDGTI{int(i):02d}") all_new_hdus = [prihdu, tbhdu] for name in gtinames: gtihdu = fits.BinTableHDU.from_columns(cols) gtihdu.name = name gtihdu.verify("exception") all_new_hdus.append(gtihdu) tbhdu.verify("exception") thdulist = fits.HDUList(all_new_hdus) assert thdulist[1].verify_datasum() == 1 thdulist.writeto(filename, overwrite=True, checksum=True, output_verify="exception") thdulist.close() return filename
def _read_event_list(filename): ev_list = load_events(filename) return ev_list def _read_light_curve(filename): file_format = get_file_format(filename) if file_format == "fits": filename = lcurve_from_fits(filename)[0] lc = load_lcurve(filename) return lc
[docs] def acceptance_rejection(dt, counts_per_bin, t0=0.0, poissonize_n_events=False, deadtime=0.0): """ Examples -------- >>> counts_per_bin = [10, 5, 5] >>> dt = 0.1 >>> ev = acceptance_rejection(dt, counts_per_bin) >>> assert ev.size == 20 >>> assert ev.max() < 0.3 >>> assert ev.min() > 0 >>> assert np.all(np.diff(ev) >= 0) """ counts_per_bin = np.asarray(counts_per_bin) rates = counts_per_bin / dt dead_time_corrected_rates = r_in(deadtime, rates) counts_per_bin = dead_time_corrected_rates * dt n_events = np.rint(np.sum(counts_per_bin)).astype(int) if poissonize_n_events: n_events = np.random.poisson(n_events) n_bins = counts_per_bin.size event_times = np.zeros(n_events) n_missing = n_events M = np.max(counts_per_bin) while n_missing > 0: stats = np.random.uniform(0, M, n_missing) float_bin = np.random.uniform(0, n_bins, n_missing) int_bin = np.floor(float_bin).astype(int) good = stats < counts_per_bin[int_bin] n = np.count_nonzero(good) if n == 0: continue start_bin = -n_missing end_bin = -n_missing + n if end_bin == 0: end_bin = event_times.size event_times[start_bin:end_bin] = float_bin[good] * dt + t0 n_missing -= n return filter_for_deadtime(np.sort(event_times), deadtime)
[docs] def make_counts_pulsed(nevents, t_start, t_stop, pulsed_fraction=0.0): """ Examples -------- >>> nevents = 10 >>> dt, counts = make_counts_pulsed(nevents, 0, 100) >>> assert np.isclose(np.sum(counts), nevents) >>> dt, counts = make_counts_pulsed(nevents, 0, 100, pulsed_fraction=1) >>> assert np.isclose(np.sum(counts), nevents) """ dt = 0.0546372810934756 length = t_start - t_stop n_bins = int(np.ceil(length / dt)) # make dt an exact divisor of the length dt = length / n_bins times = np.arange(t_start, t_stop, dt) sinusoid = pulsed_fraction / 2 * np.sin(np.pi * 2 * times) lc = 1 - pulsed_fraction / 2 + sinusoid counts = lc * nevents / np.sum(lc) return dt, counts
[docs] def scramble( event_list, smooth_kind="flat", dt=None, pulsed_fraction=0.0, deadtime=0.0, orbit_par=None, frequency=1, ): """Scramble event list, GTI by GTI. Parameters ---------- event_list: :class:`stingray.events.Eventlist` object Input event list Other Parameters ---------------- smooth_kind: str in ['flat', 'smooth', 'pulsed'] if 'flat', count the events GTI by GTI without caring about long-term variability; if 'smooth', try to calculate smooth light curve first dt: float If ``smooth_kind`` is 'smooth', bin the light curve with this bin time. Ignored for other values of ``smooth_kind`` pulsed_fraction: float If ``smooth_kind`` is 'pulsed', use this pulse fraction, defined as the 2 A / B, where A is the amplitude of the sinusoid and B the maximum flux. Ignored for other values of ``smooth_kind`` deadtime: float Dead time in the data. orbit_par: str Parameter file for orbital modulation Returns ------- new_event_list: :class:`stingray.events.Eventlist` object "Scrambled" event list Examples -------- >>> times = np.array([0.5, 134, 246, 344, 867]) >>> event_list = EventList( ... times, gti=np.array([[0, 0.9], [111, 123.2], [125.123, 1000]])) >>> new_event_list = scramble(event_list, 'smooth') >>> assert new_event_list.time.size == times.size >>> assert np.all(new_event_list.gti == event_list.gti) >>> new_event_list = scramble(event_list, 'flat') >>> assert new_event_list.time.size == times.size >>> assert np.all(new_event_list.gti == event_list.gti) """ new_event_list = copy.deepcopy(event_list) assert np.all(np.diff(new_event_list.time) > 0) idxs = np.searchsorted(new_event_list.time, new_event_list.gti) if smooth_kind == "pulsed": # Frequency is one, but can be anywhere in the frequency bin (for # sensitivity losses) length = new_event_list.gti.max() - new_event_list.gti.min() df = 0.5 / length frequency += np.random.uniform(-df, df) for (i_start, i_stop), gti_boundary in zip(idxs, new_event_list.gti): locally_flat = False nevents = i_stop - i_start t_start, t_stop = gti_boundary[0], gti_boundary[1] if nevents < 1: continue length = t_stop - t_start if nevents < 10 and smooth_kind == "pulsed": continue if length <= 1: # in very short GTIs, always assume a flat distribution. locally_flat = True if smooth_kind == "flat" or locally_flat: rate = nevents / length input_rate = r_in(deadtime, rate) new_events = np.sort( np.random.uniform( t_start, t_stop, np.rint(nevents * input_rate / rate).astype(int), ) ) new_events = filter_for_deadtime(new_events, deadtime) new_event_list.time[i_start:i_stop] = new_events[: i_stop - i_start] continue elif smooth_kind == "smooth": if dt is None: # Try to have at least 20 counts per bin on average dt = min(length / (nevents / 20), length) # make dt an exact divisor of the length n_bins = int(np.ceil(length / dt)) dt = length / n_bins counts, _ = np.histogram( new_event_list.time[i_start:i_stop], range=[t_start, t_stop], bins=n_bins, ) elif smooth_kind == "pulsed": # dt must be sufficiently small so that the frequency can be # detected with no loss of sensitivity. Moreover, not exactly a # multiple of the frequency, to increase randomness in the # detection sensitivity. I take a random Nyquist frequency between # 10 and 15 times the pulse frequency nyq = np.random.uniform(frequency * 10, frequency * 15) dt = 0.5 / nyq n_bins = int(np.ceil(length / dt)) # make dt an exact divisor of the length dt = length / n_bins times = np.arange(t_start, t_stop, dt) sinusoid = pulsed_fraction / 2 * np.sin(np.pi * 2 * times * frequency) lc = 1 - pulsed_fraction / 2 + sinusoid counts = lc * nevents / np.sum(lc) else: raise ValueError("Unknown value for `smooth_kind`") newev = acceptance_rejection( dt, counts, t0=t_start, poissonize_n_events=False, deadtime=deadtime, ) new_event_list.time[i_start:i_stop] = newev if orbit_par is not None: new_event_list = deorbit_events(new_event_list, orbit_par, invert=True) return new_event_list
[docs] def main_scramble(args=None): """Main function called by the `HENscramble` command line script.""" import argparse from .base import _add_default_args, check_negative_numbers_in_args description = ( "Scramble the events inside an event list, maintaining the same " "energies and GTIs" ) parser = argparse.ArgumentParser(description=description) parser.add_argument( "fname", type=str, default=None, help="File containing input event list", ) parser.add_argument( "--smooth-kind", choices=["smooth", "flat", "pulsed"], help="Special testing value", default="flat", ) parser.add_argument( "--deadtime", type=float, default=0, help="Dead time magnitude. Can be specified as a " "single number, or two. In this last case, the " "second value is used as sigma of the dead time " "distribution", ) parser.add_argument( "--dt", type=float, default=0, help="Time resolution of smoothed light curve", ) parser.add_argument( "--pulsed-fraction", type=float, default=0, help="Pulsed fraction of simulated pulsations", ) parser.add_argument( "-f", "--frequency", type=float, default=1, help="Pulsed fraction of simulated pulsations", ) parser.add_argument( "--seed", type=int, default=None, help="Random seed for reproducibility.", ) parser.add_argument("--outfile", type=str, default=None, help="Output file name") args = check_negative_numbers_in_args(args) _add_default_args(parser, ["deorbit", "energies", "loglevel", "debug"]) args = parser.parse_args(args) if args.debug: args.loglevel = "DEBUG" if args.seed is not None: np.random.seed(args.seed) log.setLevel(args.loglevel) event_list = load_events(args.fname) emin = emax = None if args.energy_interval is not None: emin, emax = args.energy_interval event_list, elabel = filter_energy(event_list, emin, emax) if elabel != "Energy": raise ValueError("You are filtering by energy but the data are not calibrated") new_event_list = scramble( event_list, smooth_kind=args.smooth_kind, dt=args.dt, pulsed_fraction=args.pulsed_fraction, deadtime=args.deadtime, orbit_par=args.deorbit_par, frequency=args.frequency, ) if args.outfile is not None: outfile = args.outfile else: label = "_scramble" if args.smooth_kind == "pulsed": label += f"_pulsed_df{args.pulsed_fraction:g}" elif args.smooth_kind == "smooth": label += f"_smooth_dt{args.dt:g}s" if args.deadtime > 0: label += f"_deadtime_{args.deadtime:g}" if args.energy_interval is not None: label += f"_{emin:g}-{emax:g}keV" outfile = args.fname.replace(HEN_FILE_EXTENSION, f"{label}" + HEN_FILE_EXTENSION) save_events(new_event_list, outfile) return outfile
[docs] def main(args=None): """Main function called by the `HENfake` command line script.""" import argparse from .base import _add_default_args, check_negative_numbers_in_args description = ( "Create an event file in FITS format from an event list, or simulating" " it. If input event list is not specified, generates the events " "randomly" ) parser = argparse.ArgumentParser(description=description) parser.add_argument( "-e", "--event-list", type=str, default=None, help="File containing event list", ) parser.add_argument( "-l", "--lc", type=str, default=None, help="File containing light curve", ) parser.add_argument( "-c", "--ctrate", type=float, default=None, help="Count rate for simulated events", ) parser.add_argument( "-o", "--outname", type=str, default="events.evt", help="Output file name", ) parser.add_argument("-i", "--instrument", type=str, default=None, help="Instrument name") parser.add_argument("-m", "--mission", type=str, default=None, help="Mission name") parser.add_argument( "--tstart", type=float, default=None, help="Start time of the observation (s from MJDREF)", ) parser.add_argument( "--tstop", type=float, default=None, help="End time of the observation (s from MJDREF)", ) parser.add_argument( "--mjdref", type=float, default=55197.00076601852, help="Reference MJD", ) parser.add_argument( "--seed", type=int, default=None, help="Random seed for reproducibility.", ) parser.add_argument( "--deadtime", type=float, default=None, nargs="+", help="Dead time magnitude. Can be specified as a " "single number, or two. In this last case, the " "second value is used as sigma of the dead time " "distribution", ) args = check_negative_numbers_in_args(args) _add_default_args(parser, ["loglevel", "debug"]) args = parser.parse_args(args) if args.debug: args.loglevel = "DEBUG" if args.seed is not None: np.random.seed(args.seed) log.setLevel(args.loglevel) additional_columns = {} livetime = None if args.lc is None and args.ctrate is None and args.event_list is not None: event_list = _read_event_list(args.event_list) elif args.lc is not None or args.ctrate is not None: event_list = EventList() if args.lc is not None: lc = _read_light_curve(args.lc) elif args.ctrate is not None: tstart = assign_value_if_none(args.tstart, 0) tstop = assign_value_if_none(args.tstop, 1024) dt = (tstop - tstart) / 1024 t = np.arange(tstart, tstop + 1, dt) lc = Lightcurve(time=t, counts=args.ctrate * dt + np.zeros_like(t), dt=dt) event_list.simulate_times(lc) nevents = len(event_list.time) event_list.pi = np.zeros(nevents, dtype=int) event_list.mjdref = args.mjdref log.info(f"{nevents} events generated") else: event_list = None if args.deadtime is not None and event_list is not None: deadtime = args.deadtime[0] deadtime_sigma = None if len(args.deadtime) > 1: deadtime_sigma = args.deadtime[1] event_list, info = filter_for_deadtime( event_list, deadtime, dt_sigma=deadtime_sigma, return_all=True ) log.info(f"{len(event_list.time)} events after filter") prior = np.zeros_like(event_list.time) prior[1:] = np.diff(event_list.time) - info.deadtime[:-1] additional_columns["PRIOR"] = {"data": prior, "format": "D"} additional_columns["KIND"] = { "data": info.is_event, "format": "L", } livetime = np.sum(prior) generate_fake_fits_observation( event_list=event_list, filename=args.outname, instr=args.instrument, mission=args.mission, tstart=args.tstart, tstop=args.tstop, mjdref=args.mjdref, livetime=livetime, additional_columns=additional_columns, )