Source code for cdflib.epochs_astropy

"""
##################
CDF Astropy Epochs
##################

@author: Bryan Harter
"""
import datetime
from datetime import timezone
from typing import List, Optional, Tuple, Union

import numpy as np
import numpy.typing as npt
from astropy.time import Time
from astropy.time.formats import TimeFromEpoch, erfa

__all__ = ["CDFAstropy"]


class CDFEpoch(TimeFromEpoch):
    name = "cdf_epoch"
    unit = 1.0 / (erfa.DAYSEC * 1000)  # Milliseconds
    epoch_val = "0000-01-01 00:00:00"
    epoch_val2 = None
    epoch_scale = "utc"
    epoch_format = "iso"


class CDFEpoch16(TimeFromEpoch):
    name = "cdf_epoch16"
    unit = 1.0 / (erfa.DAYSEC)  # Seconds
    epoch_val = "0000-01-01 00:00:00"
    epoch_val2 = None
    epoch_scale = "utc"
    epoch_format = "iso"


class CDFTT2000(TimeFromEpoch):
    name = "cdf_tt2000"
    unit = 1.0 / (erfa.DAYSEC * 1e9)  # Nanoseconds
    epoch_val = "2000-01-01 12:00:00"
    epoch_val2 = None
    epoch_scale = "tt"
    epoch_format = "iso"


[docs] class CDFAstropy: """ Class to encapsulate astropy time routines with CDF class. """ version = 3 release = 7 increment = 0
[docs] @staticmethod def convert_to_astropy(epochs: Union[Time, npt.ArrayLike], format: Optional[str] = None) -> Time: """ Convert CDF epochs to astropy time objects. Returns ------- astropy.time.Time """ # If already in Astropy time Units, do nothing if isinstance(epochs, Time): return epochs # If format is specified, then force it to do that if format is not None: return Time(epochs, format=format, precision=9) epochs = np.array(epochs) # Determine best format for the input type if epochs.dtype == np.int64: return Time(epochs, format="cdf_tt2000", precision=9) elif epochs.dtype == np.complex128: return Time(epochs.real, epochs.imag / 1000000000000.0, format="cdf_epoch16", precision=9) elif epochs.dtype == np.float64: return Time(epochs, format="cdf_epoch", precision=9) else: raise TypeError("Not sure how to handle type {}".format(type(epochs)))
[docs] @staticmethod def encode(epochs: npt.ArrayLike, iso_8601: bool = True) -> npt.NDArray[np.str_]: epochs = CDFAstropy.convert_to_astropy(epochs) if iso_8601: return epochs.iso else: return epochs.strftime("%d-%b-%Y %H:%M:%S.%f")
[docs] @staticmethod def breakdown(epochs: Union[Time, npt.ArrayLike]) -> npt.NDArray: # Returns either a single array, or a array of arrays depending on the input epochs = CDFAstropy.convert_to_astropy(epochs) if epochs.format == "cdf_tt2000": return CDFAstropy.breakdown_tt2000(epochs) elif epochs.format == "cdf_epoch": return CDFAstropy.breakdown_epoch(epochs) elif epochs.format == "cdf_epoch16": return CDFAstropy.breakdown_epoch16(epochs) raise TypeError("Not sure how to handle type {}".format(type(epochs)))
[docs] @staticmethod def to_datetime(cdf_time: npt.ArrayLike) -> Time: cdf_time = CDFAstropy.convert_to_astropy(cdf_time) return cdf_time.datetime
[docs] @staticmethod def unixtime(cdf_time: Union[Time, npt.ArrayLike]) -> npt.NDArray: """ Encodes the epoch(s) into seconds after 1970-01-01. Precision is only kept to the nearest microsecond. """ epochs = CDFAstropy.convert_to_astropy(cdf_time) return epochs.unix
[docs] @staticmethod def compute(datetimes: npt.ArrayLike) -> npt.NDArray: datetimes = np.atleast_2d(datetimes) cdf_time = [] for d in datetimes: unix_seconds = datetime.datetime(d[0], d[1], d[2], d[3], d[4], d[5]).replace(tzinfo=timezone.utc).timestamp() if len(d) == 7: remainder_seconds = d[6] / 1000.0 astrotime = Time(unix_seconds, remainder_seconds, format="unix", precision=9) cdf_time.append(astrotime.cdf_epoch) if len(d) == 9: remainder_seconds = (d[6] / 1000.0) + (d[7] / 1000000.0) + (d[8] / 1000000000.0) astrotime = Time(unix_seconds, remainder_seconds, format="unix", precision=9) cdf_time.append(astrotime.cdf_tt2000) if len(d) == 10: remainder_seconds = (d[6] / 1000.0) + (d[7] / 1000000.0) + (d[8] / 1000000000.0) + (d[9] / 1000000000000.0) astrotime = Time(unix_seconds, remainder_seconds, format="unix", precision=9) cdf_time.append(astrotime.cdf_epoch16) return np.squeeze(cdf_time)
[docs] @staticmethod def findepochrange( epochs: Union[Time, npt.ArrayLike], starttime: Optional[npt.ArrayLike] = None, endtime: Optional[npt.ArrayLike] = None ) -> Tuple[int, int]: if isinstance(starttime, list): start = CDFAstropy.compute(starttime) if isinstance(endtime, list): end = CDFAstropy.compute(endtime) epochs = CDFAstropy.convert_to_astropy(epochs) epochs_as_np = epochs.value indices = np.where((epochs_as_np >= start) & (epochs_as_np <= end)) return min(indices[0]), max(indices[0])
[docs] @staticmethod def breakdown_tt2000(tt2000: Time) -> npt.NDArray: tt2000strings = np.atleast_1d(tt2000.iso) times = [] for t in tt2000strings: date, time = t.split(" ") yyyy, mon, dd = date.split("-") hhmmss, decimal_seconds = time.split(".") decimal_seconds = "." + decimal_seconds hh, mm, ss = hhmmss.split(":") time_as_list = [] time_as_list.append(int(yyyy)) # year time_as_list.append(int(mon)) # month time_as_list.append(int(dd)) # day time_as_list.append(int(hh)) # hour time_as_list.append(int(mm)) # minute time_as_list.append(int(ss)) # second decimal_seconds = float(decimal_seconds) milliseconds = decimal_seconds * 1000 time_as_list.append(int(milliseconds)) # milliseconds microseconds = (milliseconds % 1) * 1000 time_as_list.append(int(microseconds)) # microseconds nanoseconds = (microseconds % 1) * 1000 time_as_list.append(int(nanoseconds)) # microseconds times.append(time_as_list) return np.squeeze(times)
[docs] @staticmethod def breakdown_epoch16(epochs: Time) -> npt.NDArray: epoch16strings = np.atleast_1d(epochs.iso) times = [] for t in epoch16strings: time_as_list: List[int] = [] date, time = t.split(" ") yyyy, mon, dd = date.split("-") hhmmss, decimal_seconds = time.split(".") decimal_seconds = "." + decimal_seconds hh, mm, ss = hhmmss.split(":") time_as_list = [] time_as_list.append(int(yyyy)) # year time_as_list.append(int(mon)) # month time_as_list.append(int(dd)) # day time_as_list.append(int(hh)) # hour time_as_list.append(int(mm)) # minute time_as_list.append(int(ss)) # second decimal_seconds = float(decimal_seconds) milliseconds = decimal_seconds * 1000 time_as_list.append(int(milliseconds)) # milliseconds microseconds = (milliseconds % 1) * 1000 time_as_list.append(int(microseconds)) # microseconds nanoseconds = (microseconds % 1) * 1000 time_as_list.append(int(nanoseconds)) # nanoseconds picoseconds = (nanoseconds % 1) * 1000 time_as_list.append(int(picoseconds)) # picoseconds times.append(time_as_list) return np.squeeze(times)
[docs] @staticmethod def breakdown_epoch(epochs: Time) -> npt.NDArray: epochstrings = np.atleast_1d(epochs.iso) times = [] for t in epochstrings: date, time = t.split(" ") yyyy, mon, dd = date.split("-") hhmmss, decimal_seconds = time.split(".") decimal_seconds = "." + decimal_seconds hh, mm, ss = hhmmss.split(":") time_as_list = [] time_as_list.append(int(yyyy)) # year time_as_list.append(int(mon)) # month time_as_list.append(int(dd)) # day time_as_list.append(int(hh)) # hour time_as_list.append(int(mm)) # minute time_as_list.append(int(ss)) # second decimal_seconds = float(decimal_seconds) milliseconds = decimal_seconds * 1000 time_as_list.append(int(milliseconds)) # milliseconds times.append(time_as_list) return np.squeeze(times)
[docs] @staticmethod def parse(value: npt.ArrayLike) -> npt.NDArray: """ Parses the provided date/time string(s) into CDF epoch value(s). For CDF_EPOCH: 'yyyy-mm-dd hh:mm:ss.xxx' (in iso_8601). The string is the output from encode function. For CDF_EPOCH16: The string has to be in the form of 'yyyy-mm-dd hh:mm:ss.mmmuuunnnppp' (in iso_8601). The string is the output from encode function. For TT2000: The string has to be in the form of 'yyyy-mm-dd hh:mm:ss.mmmuuunnn' (in iso_8601). The string is the output from encode function. """ value = np.atleast_1d(value) time_list = [] for t in value: date, subs = t.split(".") if len(subs) == 3: time_list.append(Time(t, precision=9).cdf_epoch) if len(subs) == 12: time_list.append(Time(t, precision=9).cdf_epoch16) if len(subs) == 9: time_list.append(int(Time(t, precision=9).cdf_tt2000)) return np.squeeze(time_list)