diff --git a/ch_util/ephemeris.py b/ch_util/ephemeris.py index 29c5dd6b..c8203a74 100644 --- a/ch_util/ephemeris.py +++ b/ch_util/ephemeris.py @@ -100,6 +100,7 @@ """ from datetime import datetime +from typing import Union from numpy.core.multiarray import unravel_index # NOTE: Load Skyfield API but be sure to use skyfield_wrapper for loading data @@ -859,6 +860,156 @@ def peak_RA(body, date=None, deg=False): return ra +def get_doppler_shifted_freq( + source: Union[skyfield.starlib.Star, str], + date: Union[float, list], + freq_rest: Union[float, list] = None, + obs: Observer = chime, +) -> np.array: + """Calculate Doppler shifted frequency of spectral feature with rest + frequency `freq_rest`, seen towards source `source` at time `date`, due to + Earth's motion and rotation, following the relativistic Doppler effect. + + Parameters + ---------- + source + Position(s) on the sky. If the input is a `str`, attempt to resolve this + from `ch_util.hfbcat.HFBCatalog`. + date + Unix time(s) for which to calculate Doppler shift. + freq_rest + Rest frequency(ies) in MHz. If None, attempt to obtain rest frequency + of absorption feature from `ch_util.hfbcat.HFBCatalog.freq_abs`. + obs + An Observer instance to use. If not supplied use `chime`. For many + calculations changing from this default will make little difference. + + Returns + ------- + freq_obs + Doppler shifted frequencies in MHz. Array where rows correspond to the + different input rest frequencies and columns correspond either to input + times or to input sky positions (whichever contains multiple entries). + + Notes + ----- + Only one of `source` and `date` can contain multiple entries. + + Example + ------- + To get the Doppler shifted frequencies of a feature with a rest frequency + of 600 MHz for two positions on the sky at a single point in time (Unix + time 1717809534 = 2024-06-07T21:18:54+00:00), run: + + >>> from skyfield.starlib import Star + >>> from skyfield.positionlib import Angle + >>> from ch_util.tools import get_doppler_shifted_freq + >>> coord = Star(ra=Angle(degrees=[100, 110]), dec=Angle(degrees=[45, 50])) + >>> get_doppler_shifted_freq(coord, 1717809534, 600) + """ + + from scipy.constants import c as speed_of_light + + from ch_util.fluxcat import _ensure_list + from ch_util.hfbcat import HFBCatalog + + # For source string inputs, get skyfield Star object from HFB catalog + if isinstance(source, str): + try: + source = HFBCatalog[source].skyfield + except KeyError: + raise ValueError(f"Could not find source {source} in HFB catalog.") + + # Get rest frequency from HFB catalog + if freq_rest is None: + if not source.names or source.names not in HFBCatalog: + raise ValueError( + "Rest frequencies must be supplied unless source can be found " + "in ch_util.hfbcat.HFBCatalog. " + f"Got source {source} with names {source.names}" + ) + else: + freq_rest = HFBCatalog[source.names].freq_abs + + # Prepare rest frequencies for broadcasting + freq_rest = np.asarray(_ensure_list(freq_rest))[:, np.newaxis] + + # Get rate at which the distance between the observer and source changes + # (positive for observer and source moving appart) + range_rate = get_range_rate(source, date, obs) + + # Compute observed frequencies from rest frequencies + # using relativistic Doppler effect + beta = range_rate / speed_of_light + freq_obs = freq_rest * np.sqrt((1.0 - beta) / (1.0 + beta)) + + return freq_obs + + +def get_range_rate( + source: skyfield.starlib.Star, + date: Union[float, list], + obs: Observer = chime, +) -> Union[float, np.array]: + """Calculate rate at which distance between observer and source changes. + + Parameters + ---------- + source + Position(s) on the sky. + date + Unix time(s) for which to calculate range rate. + obs + An Observer instance to use. If not supplied use `chime`. For many + calculations changing from this default will make little difference. + + Returns + ------- + range_rate + Rate (in m/s) at which the distance between the observer and source + changes (i.e., the velocity of observer in direction of source, but + positive for observer and source moving appart). If either `source` + or `date` contains multiple entries, `range_rate` will be an array. + Otherwise, `range_rate` will be a float. + + Notes + ----- + Only one of `source` and `date` can contain multiple entries. + + This routine uses an :class:`skyfield.positionlib.Apparent` object + (rather than an :class:`skyfield.positionlib.Astrometric` object) to find + the velocity of the observatory and the position of the source. This + accounts for the gravitational deflection and the aberration of light. + It is unclear if the latter should be taken into account for this Doppler + shift calculation, but its effects are negligible. + """ + + if hasattr(source.ra._degrees, "__iter__") and hasattr(date, "__iter__"): + raise ValueError( + "Only one of `source` and `date` can contain multiple entries." + ) + + # Convert unix times to skyfield times + date = unix_to_skyfield_time(date) + + # Create skyfield Apparent object of source position seen from observer + position = obs.skyfield_obs().at(date).observe(source).apparent() + + # Observer velocity vector in ICRS xyz coordinates in units of m/s + obs_vel_m_per_s = position.velocity.m_per_s + + # Normalized source position vector in ICRS xyz coordinates + source_pos_m = position.position.m + source_pos_norm = source_pos_m / np.linalg.norm(source_pos_m, axis=0) + + # Dot product of observer velocity and source position gives observer + # velocity in direction of source; flip sign to get range rate (positive + # for observer and source moving appart) + range_rate = -np.sum(obs_vel_m_per_s.T * source_pos_norm.T, axis=-1) + + return range_rate + + def get_source_dictionary(*args): """Returns a dictionary containing :class:`skyfield.starlib.Star` objects for common radio point sources. This is useful for