Skip to content

Commit

Permalink
feat(tools): add tools for handling frequency maps.
Browse files Browse the repository at this point in the history
Adds a tool to match the shuffle, crate, slot, link values to each
frequecy bin based on a frequency map. Also add a function to get a
default stream.
  • Loading branch information
ljgray committed Mar 3, 2023
1 parent 81afcc8 commit ba31ac1
Showing 1 changed file with 73 additions and 0 deletions.
73 changes: 73 additions & 0 deletions ch_util/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,79 @@ def get_crate_channel(slot, sma):
return default


def get_default_frequency_map_stream() -> tuple[np.ndarray, np.ndarray]:
"""Get the default CHIME frequency map stream.
Level order is [shuffle, crate, slot, link].
Returns
-------
stream
[shuffle, crate, slot, link] for each frequency bin
stream_id
stream_id for each map combination
shuffle*2**12 + crate*2**8 + slot*2**4 + link
"""
stream = np.empty((1024, 4), dtype=np.int32)

# shuffle
stream[:, 0] = 3
# crate
stream[:, 1] = np.tile(np.arange(2).repeat(16), 32)
# slot
stream[:, 2] = np.tile(np.arange(16), 64)
# link
stream[:, 3] = np.tile(np.arange(8).repeat(32), 4)

stream_id = (
stream[:, 0] * 2**12
+ stream[:, 1] * 2**12
+ stream[:, 2] * 2**4
+ stream[:, 3]
).astype(np.int64)

return stream, stream_id


def order_frequency_map_stream(fmap: np.ndarray, stream_id: np.ndarray) -> np.ndarray:
"""Order stream_id components based on a frequency map.
Level order is [shuffle, crate, slot, link]
Parameters
----------
fmap
frequency map
stream_id
1-D array of stream_ids associated with each row in fmap
Returns
-------
stream
shuffle, crate, slot, link for each frequency
"""

def decode_stream_id(sid: int) -> tuple[int]:
link = sid & 15
slot = (sid >> 4) & 15
crate = (sid >> 8) & 15
shuffle = (sid >> 12) & 15

return (shuffle, crate, slot, link)

decoded_stream = [decode_stream_id(i) for i in stream_id[:]]
x = [[] for _ in range(len(stream_id))]

for ii, freqs in enumerate(fmap):
for f in freqs:
x[f].append(decoded_stream[ii])

# TODO: maybe implement some checks here
stream = np.array([i[0] for i in x], dtype=np.int32)

return stream


def get_correlator_inputs(lay_time, correlator=None, connect=True):
"""Get the information for all channels in a layout.
Expand Down

0 comments on commit ba31ac1

Please sign in to comment.