diff --git a/ch_util/tools.py b/ch_util/tools.py index 79ccdea8..6cd6252f 100644 --- a/ch_util/tools.py +++ b/ch_util/tools.py @@ -121,6 +121,8 @@ - :py:meth:`invert_no_zero` """ +from typing import Tuple + import datetime import numpy as np import scipy.linalg as la @@ -1229,6 +1231,79 @@ def get_crate_channel(slot, sma): return default +def get_default_frequency_map_stream() -> Tuple[np.ndarray, np.ndarray]: + """Get the default CHIME frequency map stream. + + Level order is [shuffle, crate, slot, link]. + + Returns + ------- + stream : np.ndarray (np.int32) + [shuffle, crate, slot, link] for each frequency bin + stream_id : np.ndarray (np.int64) + stream_id for each map combination + shuffle*2**12 + crate*2**8 + slot*2**4 + link + """ + stream = np.empty((1024, 4), dtype=np.int32) + + # shuffle + stream[:, 0] = 3 + # crate + stream[:, 1] = np.tile(np.arange(2).repeat(16), 32) + # slot + stream[:, 2] = np.tile(np.arange(16), 64) + # link + stream[:, 3] = np.tile(np.arange(8).repeat(32), 4) + + stream_id = ( + stream[:, 0] * 2**12 + + stream[:, 1] * 2**12 + + stream[:, 2] * 2**4 + + stream[:, 3] + ) + + return stream, stream_id.astype(np.int64) + + +def order_frequency_map_stream(fmap: np.ndarray, stream_id: np.ndarray) -> np.ndarray: + """Order stream_id components based on a frequency map. + + Level order is [shuffle, crate, slot, link] + + Parameters + ---------- + fmap : np.ndarray[int] + frequency map + stream_id : np.ndarray[int] + 1-D array of stream_ids associated with each row in fmap + + Returns + ------- + stream : np.ndarray[nfreq, 4] + shuffle, crate, slot, link for each frequency + """ + + def decode_stream_id(sid) -> tuple[int]: + link = sid & 15 + slot = (sid >> 4) & 15 + crate = (sid >> 8) & 15 + shuffle = (sid >> 12) & 15 + + return (shuffle, crate, slot, link) + + decoded_stream = [decode_stream_id(i) for i in stream_id[:]] + x = [[] for _ in range(len(stream_id))] + + for ii, freqs in enumerate(fmap): + for f in freqs: + x[f].append(decoded_stream[ii]) + + # TODO: maybe implement some checks here + stream = np.array([i[0] for i in x], dtype=np.int32) + + return stream + + def get_correlator_inputs(lay_time, correlator=None, connect=True): """Get the information for all channels in a layout.