Created
June 21, 2022 05:02
-
-
Save telegraphic/b7243becf9ffccb9dc3702298f55ea8e to your computer and use it in GitHub Desktop.
Gist for controlling multiple snap boards through `casperfpga` and multiprocessing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from utmost2d_snap import UtmostSnap | |
from rpi_bf_control import RpiBeamformerController | |
import time | |
from multiprocessing import JoinableQueue | |
from threading import Thread | |
class BoardManager(object):
    """ Base class for multithreaded control of multiple boards

    Args:
        board_cfg_list (list): List of board config files, or list of IP addresses,
                               or other string that identifies board IDs.
        board_controller (fn): A board controller class, such as the UtmostSnap
                               or RpiBeamformerController. It is called once per
                               entry of board_cfg_list to build self.boards.

    Notes:
        The main method is _run_on_all(), which will run a given method from the
        controller class on all boards. For example, if the class had a method
        self.program(fpg_filename), you could call _run_on_all('program', fpg_filename)

        The board_controller must have an attribute self.host, which is used to
        name each worker thread and to look up per-board values in dictionary
        arguments.
    """

    def __init__(self, board_cfg_list, board_controller):
        self.controller = board_controller
        self.boards = [self.controller(cfg) for cfg in board_cfg_list]
        # Kept for backward compatibility; _run_on_all() does not use it.
        self.task_queue = JoinableQueue()

    def __repr__(self):
        lines = [f"<BoardManager - controller: {self.controller}>"]
        lines.extend(" %s" % str(brd) for brd in self.boards)
        return "\n".join(lines)

    def _run(self, q, proc_id, fn_to_run, *args, **kwargs):
        """ Run a single command and post its result on a queue

        Legacy helper, kept so existing callers keep working; _run_on_all()
        no longer goes through it.

        Args:
            q (JoinableQueue): Queue that receives [proc_id, return_value]
            proc_id (str): Unique name for process ID
            fn_to_run (fn): Python method to run
            args: list of arguments to pass to fn_to_run
            kwargs: keyword arguments to pass to fn_to_run
        """
        return_val = fn_to_run(*args, **kwargs)
        q.put([proc_id, return_val])
        q.task_done()

    def _run_on_all(self, fn_to_run, *args, **kwargs):
        """ Run a command in parallel across all boards

        Args:
            fn_to_run (str): Name of the controller method to run on every board
            args: arguments to pass to the method
            kwargs: keyword arguments to pass to the method (same for all boards)

        Returns:
            dict mapping each board's host to the value returned by its method.

        Raises:
            RuntimeError: if a board has no method called fn_to_run, or if the
                          method raised on one or more boards.
            KeyError: if a dictionary argument has no entry for a board's host.

        Notes:
            If different values need to be passed to different boards, you can
            pass a python dictionary, e.g.
            `regvalue = {'host1': regval1, 'host2': regval2}`
            Every positional argument that is a dict is treated this way.
        """
        # Resolve methods and per-board arguments up front, so that a bad method
        # name or missing dict entry fails before any board has been touched.
        jobs = []
        for board in self.boards:
            try:
                method = getattr(board, fn_to_run)
            except AttributeError as err:
                raise RuntimeError("Cannot find method %s" % fn_to_run) from err
            host = board.host
            call_args = [arg[host] if isinstance(arg, dict) else arg for arg in args]
            jobs.append((host, method, call_args))

        results = {}
        errors = {}

        def worker(host, method, call_args):
            # Capture failures instead of letting the thread die silently;
            # BaseException so that a SystemExit raised inside a controller
            # method is reported too.
            try:
                results[host] = method(*call_args, **kwargs)
            except BaseException as err:
                errors[host] = err

        threads = []
        for host, method, call_args in jobs:
            t = Thread(target=worker, name=host, args=(host, method, call_args))
            t.daemon = True
            t.start()
            threads.append(t)

        # Joining the threads (rather than a queue) guarantees that every
        # worker has finished before results are read.
        for t in threads:
            t.join()

        if errors:
            failed = ", ".join("%s (%r)" % (host, err) for host, err in errors.items())
            first_error = next(iter(errors.values()))
            raise RuntimeError("%s failed on: %s" % (fn_to_run, failed)) from first_error
        return results
class UtmostSnapManager(BoardManager):
    """ Manager for multiple UtmostSnap boards

    Args:
        snapdict (dict): Dictionary of SNAP board configs, as set in config.py

    Notes:
        snapdict should be of the form:
        ```
        snapdict = {0: {'ip': snap0_ip, 'config': snap0_config},
                    1: {'ip': snap1_ip, 'config': snap1_config},
                    ...}
        ```
        Only the 'config' entry of each board is used here; it is passed to
        the UtmostSnap constructor.
    """

    def __init__(self, snapdict):
        blist = [board_cfg['config'] for board_cfg in snapdict.values()]
        super(UtmostSnapManager, self).__init__(blist, UtmostSnap)
        self._run_on_all('get_system_information')

    def configure_coarse_delays(self, delay_dict, wait_for_pps=True):
        """ Configure coarse delays across boards

        Runs the UtmostSnap.configure_coarse_delays() method, applying
        given delays for each board.

        Args:
            delay_dict (dict): Delay values for each board, keyed by board host
            wait_for_pps (bool): Wait for PPS or apply immediately? (Default True)
        """
        if wait_for_pps:
            self.wait_for_pps()
        # Second argument is `verbose`: keep the per-board printout off.
        self._run_on_all('configure_coarse_delays', delay_dict, False)

    def read_coarse_delays(self):
        """ Read coarse delays from SNAP registers

        Returns: dict of delays per board
        """
        return self._run_on_all('read_coarse_delays')

    def wait_for_pps(self, verbose=False):
        """ Wait for a PPS

        Connects to board[0] and waits for a PPS, with 1ms waits between
        register reads. Blocks until the 'pps_count' register changes.

        Args:
            verbose (bool): Print progress messages to screen
        """
        if verbose:
            print("Waiting for next PPS...", end=" ", flush=True)
        # Wait for a PPS tick
        reference = self.boards[0]
        pps_count = reference.read_int('pps_count')
        while pps_count == reference.read_int('pps_count'):
            time.sleep(0.001)
        if verbose:
            print('OK.')
class RpiBeamformerManager(BoardManager):
    """ Manager for multiple RpiBeamformer boards

    Args:
        bfdict (dict): Dictionary of RPI Beamformer configs, as set in config.py

    Notes:
        bfdict should be of the form:
        ```
        bfdict = {
            'M1' : {'ip': '172.17.228.209', 'active_addrs': [0, 5], 'active_pols': ['hp', 'vp']},
            'NO' : {'ip': '172.17.228.210', 'active_addrs': [1], 'active_pols': ['hp', 'vp']},
            ...}
        ```
        The configs are re-keyed by IP address in self.cfg, since the IP is
        what each controller uses as its host.
    """

    def __init__(self, bfdict):
        self.cfg = {b['ip']: b for b in bfdict.values()}
        blist = [b['ip'] for b in bfdict.values()]
        super(RpiBeamformerManager, self).__init__(blist, RpiBeamformerController)

    def point(self, angle, verbose=False, print_cmd=False):
        """ Point all boards towards given angle

        Runs RpiBeamformerController.point_multi() method on all boards,
        using each board's 'active_addrs' and 'active_pols' from its config.

        Args:
            angle (int): Value between -60 to 60 degrees.
            verbose (bool): Print commands to screen
            print_cmd (bool): Alias for verbose, matching the controller's
                              keyword name.
        """
        show_cmd = bool(verbose or print_cmd)
        addr_dict = {ip: cfg['active_addrs'] for ip, cfg in self.cfg.items()}
        pol_dict = {ip: cfg['active_pols'] for ip, cfg in self.cfg.items()}
        # angle and show_cmd are the same for every board, so they are passed
        # as plain values rather than per-board dictionaries.
        self._run_on_all('point_multi', addr_dict, pol_dict, angle, show_cmd)
if __name__ == "__main__":
    # Manual test: point the beamformers, then time a coarse-delay update
    # across the SNAP boards while the data monitor is paused.
    import config
    import redis

    r = redis.Redis(config.redis_db_ip)

    rm = RpiBeamformerManager(config.bfdict)
    rm.point(30, verbose=True)

    # Pause any SNAP monitors by setting redis flag
    print("Pausing data monitor...", end=" ", flush=True)
    r.set('pause_data_monitor', 'T')
    print("OK")

    try:
        sm = UtmostSnapManager(config.snapdict)

        t0 = time.time()
        test_delays = {'172.17.228.248': [3] * 12,
                       '172.17.228.242': [4] * 12,
                       '172.17.228.244': [5] * 12}
        sm.configure_coarse_delays(test_delays)
        t1 = time.time() - t0
        print(f"Time elapsed: {t1:2.2f}s")
    finally:
        # Always release the monitor, even if the SNAP setup failed.
        r.set('pause_data_monitor', 'F')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
# rpi_bf_control.py | |
Control raspberry Pi beamformers | |
""" | |
import sys, redis, time, os | |
import config | |
import subprocess | |
class RpiBeamformerController(object):
    """ Controller class for Raspberry Pi powered beamformer boards

    Each public method builds one command line for the beamformer control
    script and runs it on the Raspberry Pi over ssh.

    Args:
        ip (str): IP address of the Raspberry Pi to connect to.
    """

    def __init__(self, ip):
        self.ip = ip
        self.host = ip  # Used in multiboard controller

    def __repr__(self):
        return f"<RpiBeamformerController: {self.host}>"

    def _run_cmd(self, command, *args, print_cmd=False):
        """ Run a control-script command on the Pi over ssh

        Args:
            command (str): Control script option name, sent as --<command>
            args: Values appended after the option, separated by spaces
            print_cmd (bool): Print the command line and its output to screen

        Returns:
            str: Standard output of the ssh command.
        """
        sshcmd = 'ssh -i ~/.ssh/id_rsa_pi'
        ctlscript = 'sudo /mnt/utmost2d/beam_control_local/beam_bidir_v1/RPI_BIDIRECTIONAL_V1.py'
        argstr = ' '.join(map(str, args))
        bf_cmd = '{ssh} pi@{ip} {ctlscript} --{cmd} {args}'.format(ssh=sshcmd, ip=self.ip, ctlscript=ctlscript,
                                                                   cmd=command, args=argstr)
        if print_cmd:
            print(bf_cmd)

        # NOTE(review): the command goes through a shell with the IP and
        # arguments interpolated as text; only pass trusted values.
        sp = subprocess.Popen(bf_cmd, shell=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              universal_newlines=True)
        out, err = sp.communicate()
        if sp.returncode != 0:
            # Report failures instead of dropping them; not raised, so that
            # existing best-effort callers keep running.
            print("Command '%s' on %s exited with status %i: %s"
                  % (command, self.ip, sp.returncode, err.strip()), file=sys.stderr)
        time.sleep(1)
        if print_cmd:
            print(out)
        return out

    def _check_addr_pol(self, addr, pol, zero_ok=True):
        """ Check address and pol are valid

        Args:
            addr (int): Address to check, 1--6 (or 0--6 if zero_ok)
            pol (str): Either 'hp' or 'vp' (case-insensitive)
            zero_ok (bool): Accept address 0

        Raises:
            RuntimeError: if addr or pol is not valid.
        """
        addr_range = range(0, 7) if zero_ok else range(1, 7)
        if addr not in addr_range:
            raise RuntimeError("Address must be between %i--6" % addr_range[0])
        if pol.lower() not in ('hp', 'vp'):
            raise RuntimeError("Pol must be 'vp' or 'hp'")

    def point(self, addr, pol, angle, print_cmd=False):
        """ Point the beamformer

        Args:
            addr (int): Address 1--6 for single BF pointing or 0 for broadcasting to all
            pol (str): Either 'hp' or 'vp'
            angle (int): Angle between -60 to 60 degrees (rounded before sending)
            print_cmd (bool): Print the runtime command to screen

        Raises:
            RuntimeError: if addr, pol or angle is out of range.
        """
        self._check_addr_pol(addr, pol)
        if not (-60 <= angle <= 60):
            raise RuntimeError("Angle must be between -60 and 60")
        self._run_cmd('point', addr, pol, round(angle), print_cmd=print_cmd)

    def point_multi(self, addr_list, pol_list, angle, print_cmd=False):
        """ Point multiple pols for multiple addresses to given angle

        Args:
            addr_list (list): List of addresses
            pol_list (list): List of pols (must be list even if ['vp'])
            angle (int): Angle between -60 to 60 degrees
            print_cmd (bool): Print runtime command to screen
        """
        for addr in addr_list:
            for pol in pol_list:
                self.point(addr, pol, angle, print_cmd=print_cmd)

    def lstate(self, addr, pol, state, print_cmd=False):
        """ control the state of the LNAs

        Args:
            addr (int): Address 1--6 for single BF pointing or 0 for broadcasting to all
            pol (str): Either 'hp' or 'vp'
            state (str or int): A LNA state setting, either 'on', 'off' or 8-bit word of 0's and 1's
            print_cmd (bool): Print the runtime command to screen
        """
        self._check_addr_pol(addr, pol)
        # str() first: state may be given as an int, which has no .lower()
        self._run_cmd('lstate', addr, pol, str(state).lower(), print_cmd=print_cmd)

    def query(self, addr, pol, print_cmd=False):
        """ Query current switch settings

        Args:
            addr (int): Address 1--6 for single BF pointing
            pol (str): Either 'hp' or 'vp'
            print_cmd (bool): Print the runtime command to screen

        Returns:
            str: Output of the control script.
        """
        self._check_addr_pol(addr, pol, zero_ok=False)
        return self._run_cmd('query', addr, pol, print_cmd=print_cmd)

    def cur(self, addr, pol, print_cmd=False):
        """ Query the current

        Args:
            addr (int): Address 1--6 for single BF pointing
            pol (str): Either 'hp' or 'vp'
            print_cmd (bool): Print the runtime command to screen

        Returns:
            str: Output of the control script.
        """
        self._check_addr_pol(addr, pol, zero_ok=False)
        return self._run_cmd('cur', addr, pol, print_cmd=print_cmd)

    def ver(self, addr, pol, print_cmd=False):
        """ Query BF software version

        Args:
            addr (int): Address 1--6 for single BF pointing
            pol (str): Either 'hp' or 'vp'
            print_cmd (bool): Print the runtime command to screen

        Returns:
            str: Output of the control script.
        """
        self._check_addr_pol(addr, pol, zero_ok=False)
        return self._run_cmd('ver', addr, pol, print_cmd=print_cmd)

    def out(self, addr, pol, state, print_cmd=False):
        """ turn the output amplifier on or off

        Args:
            addr (int): Address 1--6 for single BF pointing or 0 for broadcasting to all
            pol (str): Either 'hp' or 'vp'
            state (str): Output amplifier state, passed to the control script as given
            print_cmd (bool): Print the runtime command to screen
        """
        self._check_addr_pol(addr, pol)
        self._run_cmd('out', addr, pol, state, print_cmd=print_cmd)

    def sstate(self, state, print_cmd=False):
        """ Put modules to sleep or wake them

        Args:
            state (str): Either 'on' or 'off'
            print_cmd (bool): Print the runtime command to screen
        """
        # NOTE(review): this sends '--out', the same option as out(); it may
        # have been meant to be '--sstate'. Confirm against the control script.
        self._run_cmd('out', state.lower(), print_cmd=print_cmd)
if __name__ == "__main__":
    # Build a controller for the beamformer whose IP is given on the command
    # line. '@' is the option prefix, so arguments starting with '-' are
    # taken as positional values.
    import argparse

    parser = argparse.ArgumentParser(
        description='RPi beamformer controller',
        prefix_chars='@',
    )
    parser.add_argument('ip', type=str, help='IP address of beamformer')
    cli_args = parser.parse_args()

    bf = RpiBeamformerController(cli_args.ip)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
# utmost2d_snap.py | |
Monitor and control class for SNAP boards in UTMOST-2D project. | |
Provides a class, UtmostSnap, which wraps the CasperFpga class, adding | |
UTMOST-specific features for control and configuration. | |
Usage: | |
from utmost2d_snap import UtmostSnap | |
snap = UtmostSnap('../path/to/config.txt') | |
snap.program() | |
d_adc = snap.grab_adc_samples() | |
d_spec = snap.grab_spectra() | |
""" | |
import casperfpga | |
from casperfpga import Mac, IpAddress | |
import numpy as np | |
import struct | |
import time | |
import sys | |
import os | |
from config_parser import read_config_file, validate_config | |
class UtmostSnap(casperfpga.CasperFpga):
    """ Python control class for UTMOST-2D SNAP board

    Wraps casperfpga.CasperFpga. Board settings are loaded from a config file
    with read_config_file() and stored as upper-case attributes, which the
    configure_* methods then write to the board.

    Args:
        config_file (str): Path to the board config file.
        katcp_port (int): Port passed to the CasperFpga constructor (default 7147).
        validate (bool): Passed through to read_config_file().
    """

    def __init__(self, config_file, katcp_port=7147, validate=True):
        paramdict = read_config_file(config_file, validate=validate)

        # Basic setup
        self.ADC_CLK = float(paramdict['ADC_CLK'])   # ADC Clock frequency, in MHz
        self.ADC_GAIN = paramdict['ADC_GAIN']        # Digital gain on ADC
        self.PI_IP = paramdict['PI_IP']              # string, IP of host raspberry pi
        self.BOFFILE = paramdict['BOFFILE']          # string, name of bof
        self.SNAPID = paramdict['SNAPID']            # 8 bit integer ID for board

        # Ethernet
        self.SNAPMAC1 = Mac(paramdict['SNAPMAC1']).mac_int      # MAC address for GbE1
        self.SNAPMAC2 = Mac(paramdict['SNAPMAC2']).mac_int      # MAC address for GbE2
        self.SNAPIP1 = IpAddress(paramdict['SNAPIP1']).ip_int   # IP address for GbE1
        self.SNAPIP2 = IpAddress(paramdict['SNAPIP2']).ip_int   # IP address for GbE2
        self.SNAPPORT1 = paramdict['SNAPPORT1']      # integer, port number for GbE1
        self.SNAPPORT2 = paramdict['SNAPPORT2']      # integer, port number for GbE2
        self.DESTIPS1 = paramdict['DESTIPS1']        # 10x dest IPs for each subband
        self.DESTIPS2 = paramdict['DESTIPS2']        # 10x dest IPs for each subband
        self.DESTPORTS1 = paramdict['DESTPORTS1']    # List of 10 destination IP ports
        self.DESTPORTS2 = paramdict['DESTPORTS2']    # List of 10 destination IP ports

        # 256x MACs for each GbE ARP table, converted to integers
        self.DESTMACS1 = [Mac(m).mac_int for m in paramdict['DESTMACS1']]
        self.DESTMACS2 = [Mac(m).mac_int for m in paramdict['DESTMACS2']]

        # Channel selection
        self.START_CHANNEL_1 = paramdict['START_CHANNEL_1']  # Lowest chan for 320 channels on GbE1
        self.START_CHANNEL_2 = paramdict['START_CHANNEL_2']  # Lowest chan for 320 channels on GbE2

        # Coarse delay and F-engine
        self.COARSE_DELAYS = paramdict['COARSE_DELAYS']      # Coarse delay (clk cycles) ADC0-11
        self.PFB_FFTSHIFT = paramdict['FFTSHIFT']
        self.PFB_GAIN = paramdict['GAIN']
        self.FLIP_BAND = paramdict['FLIP_BAND']

        # Setup frequency values (currently hardcoded Nyquist zone)
        NYQ_ZONE = 8
        self.freqs = np.arange(0, 1024) * self.ADC_CLK / 2 / 1024 + self.ADC_CLK / 2 * NYQ_ZONE

        # Mapping of antenna ID to [stream ID, offset within stream] for spectrometer
        self._spec_stream_map = {
            0: [0, 0],
            2: [0, 1024],
            1: [1, 0],
            3: [1, 1024],
            4: [2, 0],
            6: [2, 1024],
            5: [3, 0],
            7: [3, 1024],
            8: [4, 0],
            10: [4, 1024],
            9: [5, 0],
            11: [5, 1024]
        }

        # Mapping of stream ID to the two antenna IDs it carries
        self._spec_ant_id_map = {
            0: [0, 2],
            1: [1, 3],
            2: [4, 6],
            3: [5, 7],
            4: [8, 10],
            5: [9, 11]
        }

        super(UtmostSnap, self).__init__(self.PI_IP, katcp_port)

    def __repr__(self):
        return "<UtmostSnap IP:{ip} CLK:{clk}MHz FPG:{fpg}>".format(ip=self.PI_IP, clk=self.ADC_CLK,
                                                                   fpg=os.path.basename(self.BOFFILE))

    def program(self, fpg=None):
        """ Program SNAP board with firmware and calibrate ADCS

        Args:
            fpg (str): Name of fpg file to upload and program. Defaults to None.
                       (Will use fpg from config file as default).

        Raises:
            RuntimeError: if ADC calibration fails.
        """
        name_of_bof = self.BOFFILE if fpg is None else fpg
        print("Upload and program %s..." % name_of_bof)
        self.upload_to_ram_and_program(name_of_bof)

        print("Initialize ADCs...")
        adc = self.adcs.adc_snap_adc
        is_cal = adc.init(sample_rate=self.ADC_CLK, num_channel=4)
        if is_cal != 0:
            print('ADC CALIBRATION ERROR')
            # Raise rather than exit(): this may run in a worker thread or
            # inside a larger program that should decide how to react.
            raise RuntimeError('ADC CALIBRATION ERROR')
        print('ADC calibrated')

        print("\tADC digital gain: %i" % self.ADC_GAIN)
        adc.set_gain([self.ADC_GAIN] * 4, use_linear_step=True)

        print("Checking ethernet status...")
        time.sleep(1)
        # Only GbE1 is checked; the GbE2 check was disabled in the original code.
        status1 = self.read_int('eth_gbe1_status')
        if status1 == 4:
            print('Ready to configure ethernet and packetizer.')
            print("Programmed OK.")
        else:
            print("There's a problem, ethernet1 status register = ", status1)
            if status1 < 4:
                print("Status msb = 0 means that the ethernet link is not up.")
            print("ERROR programming not fully successful")

    def configure_ethernet(self):
        """ Configure 10 GbE Ethernet cores

        Ethernet settings are read from the config_file.
        """
        print('Configuring 10Gb ethernet...')

        # set SNAP board identifier
        self.write_int('eth_gbe1_header_id', self.SNAPID)
        self.write_int('eth_gbe2_header_id', self.SNAPID)

        self.gbes.eth_gbe1.setup(mac=self.SNAPMAC1, ipaddress=self.SNAPIP1, port=self.SNAPPORT1,
                                 gateway=1, subnet_mask='255.255.255.0')
        self.gbes.eth_gbe2.setup(mac=self.SNAPMAC2, ipaddress=self.SNAPIP2, port=self.SNAPPORT2,
                                 gateway=1, subnet_mask='255.255.255.0')

        self.gbes.eth_gbe1.set_arp_table(self.DESTMACS1)
        self.gbes.eth_gbe2.set_arp_table(self.DESTMACS2)

        self.gbes.eth_gbe1.configure_core()
        self.gbes.eth_gbe2.configure_core()

        print('Configuring destination IP addresses and ports...')
        destinations = ((1, self.DESTIPS1, self.DESTPORTS1),
                        (2, self.DESTIPS2, self.DESTPORTS2))
        for gbe, dest_ips, dest_ports in destinations:
            for idx, dest_ip in enumerate(dest_ips):
                dest_port = dest_ports[idx]
                print('\tGBE%i-%i %s:%i' % (gbe, idx, dest_ip, dest_port))
                self.write_int('eth_gbe%i_dest_ip%i' % (gbe, idx), IpAddress(dest_ip).ip_int)
                self.write_int('eth_gbe%i_dest_port%i' % (gbe, idx), dest_port)

    def reset_and_sync(self, use_pps=True):
        """ reset ethernet block and send sync pulse

        Args:
            use_pps (bool): Use the pulse-per-second to arm sync pulse. If false, a
                            'fake' PPS signal will be sent manually.
        """
        if use_pps:
            print("Checking PPS status...", end=" ", flush=True)
            self.write_int('software_start', 0)
            self.write_int('pps_counter_rst', 0)
            self.write_int('pps_counter_rst', 1)
            self.write_int('pps_counter_rst', 0)

            # Wait until next unix tick
            t0 = time.time()
            td = t0 - int(t0)
            time.sleep(1 - td + 0.1)

            # Read PPS twice, just over a second apart: count should advance by one
            pps0 = self.read_int('pps_count')
            time.sleep(1.01)
            pps1 = self.read_int('pps_count')
            if pps1 - pps0 != 1:
                print("\nError: PPS is not being receieved each second!")
                print("Error: PPS0: {} PPS1: {}".format(pps0, pps1))
            else:
                print("OK.")

            # NOTE: this loop blocks until 'pps_count' changes.
            print("Waiting for next PPS...", end=" ", flush=True)
            while pps1 == self.read_int('pps_count'):
                time.sleep(0.01)
            print("OK.")

            pps_start = self.read_int('pps_count') + 1
            unix_start = int(time.time()) + 1
            self.write_int('software_start', 1)
            time.sleep(1.01)
            self.write_int('software_start', 0)
            self.write_int('eth_tx_enable', 1)
            print("Starting on PPS {}. Unix start {}".format(pps_start, unix_start))
        else:
            print('Sending manual pulse to reset ethernet and sync...')
            self.write_int('force_new_sync', 1)
            time.sleep(0.1)
            self.write_int('force_new_sync', 0)
            time.sleep(1.01)
            self.write_int('eth_tx_enable', 1)
        time.sleep(1.01)

    def check_ethernet_status(self):
        """ Read ethernet status registers

        If value == 4 everything is good and link is up.
        If value < 4 the physical link isn't up.
        """
        print('Ethernet1 status = %i' % self.read_int('eth_gbe1_status'))
        print('Ethernet2 status = %i' % self.read_int('eth_gbe2_status'))

    def configure_coarse_delays(self, coarse_delays=None, verbose=True):
        """ Apply coarse delays to ADC inputs.

        If no arguments supplied, will use values from config file.

        Args:
            coarse_delays (list/array): Integer delays to apply to each input.
                                        Should be list or array of length 12.
            verbose (bool): Print the delay applied to each ADC input.

        Raises:
            RuntimeError: if coarse_delays is not of length 12.
        """
        if coarse_delays is not None:
            # Explicit check rather than assert, which is skipped under -O
            if len(coarse_delays) != 12:
                raise RuntimeError("Coarse delays must be of length 12")
            self.COARSE_DELAYS = coarse_delays

        if verbose:
            print('Configuring coarse delays.')
        for ii in range(12):
            if verbose:
                print('\tADC %i: %i' % (ii, self.COARSE_DELAYS[ii]))
            self.write_int('adc_delay%i' % ii, self.COARSE_DELAYS[ii])

    def read_coarse_delays(self):
        """ Read current coarse delays

        Returns:
            coarse_delays (np.array): Integer delays read back for each input,
                                      length 12, dtype 'int32'.
        """
        coarse_delays = np.zeros(12, dtype='int32')
        for ii in range(12):
            coarse_delays[ii] = self.read_int('adc_delay%i' % ii)
        return coarse_delays

    def configure_fengine(self, fft_shift=None, rq_gain=None):
        """ Configure the F-engine

        Applies FFT shift, requantization gain, and sets up data reorder
        for packetization.

        If no arguments supplied, will use settings from config file

        Args:
            fft_shift (int): Shift schedule to apply (see docs).
            rq_gain (int): Gain value to use in requantization (see docs).
        """
        # Overrides replace the stored config values, as in configure_coarse_delays()
        if fft_shift is not None:
            self.PFB_FFTSHIFT = fft_shift
        if rq_gain is not None:
            self.PFB_GAIN = rq_gain

        print("Configuring F-engine...")
        self.write_int('pfb_fftshift', self.PFB_FFTSHIFT)
        self.write_int('pfb_gain', self.PFB_GAIN)
        self.write_int('band_flip', self.FLIP_BAND)
        print("\tFFT shift schedule: %s" % bin(self.PFB_FFTSHIFT))
        print("\tRequantization gain: %s" % bin(self.PFB_GAIN))

        print("Configuring reorder map...")
        T = 8    # number of spectra per packet
        Bt = 32  # TOTAL number of subbands in spectrum
        B = 10   # number of subbands TO SEND
        F = 32   # number of frequency channels per subband
        M = 2    # number of modules per fft output stream
        packet_gap = 128  # gap between one set of subbands and the next
        reorder_length = T * Bt * F * M

        start1 = self.START_CHANNEL_1
        start2 = self.START_CHANNEL_2

        # Entries that are never assigned (the gaps) keep the value 1
        reorder = np.ones(reorder_length, dtype=int)
        j = 0
        for b in range(B):
            for t in range(T):
                for f in range(F):
                    for m in range(M):
                        base = Bt * F * M * t + F * b + f + Bt * F * m
                        # Map entries alternate between GbE1 and GbE2 channels
                        reorder[2 * j] = base + start1
                        reorder[2 * j + 1] = base + start2
                        j += 1
            j += packet_gap

        # Big-endian unsigned 16-bit values, one per map entry (16384 in total)
        packer = struct.Struct('>%iH' % reorder_length)
        self.write('reorder_reorder1_map1', packer.pack(*reorder.tolist()))
        time.sleep(0.1)

    def grab_adc_samples(self):
        """ Read raw ADC samples

        Returns:
            Dictionary of np.arrays, each 4096 samples long. Keys are 'a0' through 'a11'.
            data = { 'a0': [sample0, sample1, ... sample4095],
                     'a1': [sample0, ...],
                     'a11': ... }
        """
        ss = self.devices['adc_samples_ss']
        d = ss.read(man_trig=True, man_valid=True)['data']
        for k, v in d.items():
            d[k] = np.array(v)
        return d

    def _grab_spectra_stream(self, stream=0):
        """ Grab spectra for given input stream

        Two antpols on each stream stored in BRAM, use grab_spectrum(ant_id).

        Args:
            stream (int): Integer between 0-5. Two antpols on each stream.

        Returns: np.array of length 2048, dtype='uint64'.
        """
        raw = self.read('spec_bram%i' % stream, 2048 * 8)
        # Data is read as big-endian 64-bit words and converted to native order
        return np.frombuffer(raw, dtype='>u8').astype('uint64')

    def grab_spectrum(self, ant_id):
        """ Grab spectrum for given antpol input (0-11).

        Args:
            ant_id (int): Antpol ID (0-11)

        Returns: np.array of length 1024, dtype='float32'
        """
        stream_id, stream_offset = self._spec_stream_map[ant_id]
        d_stream = self._grab_spectra_stream(stream_id).astype('float32')
        return d_stream[stream_offset:stream_offset + 1024] / 2**32

    def grab_spectra(self):
        """ Grab spectra for all 12 inputs from on-board spectrometer.

        Returns: python dict {'a0': np.array([chan0, .. chan1023], dtype='float32'), ...
                              'a11': np.array([chan0, .. chan1023], dtype='float32')}
        """
        d = {}
        for stream, (ant0, ant1) in self._spec_ant_id_map.items():
            dstream = self._grab_spectra_stream(stream)
            d['a%i' % ant0] = dstream[:1024].astype('float32') / 2**32
            d['a%i' % ant1] = dstream[1024:].astype('float32') / 2**32
        return d
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment