Skip to content

Communication with the RFSoC

Python class that implements all functionality for the communication between a host computer (client) and the RFSoC (server) connected through Ethernet to it.

Configures the antenna front end (Sivers EVK) parameters. The antenna front end is responsible for upconverting the frequency, beamforming and providing the physical interface with the air.

Implements the client side (host computer) functionality of the TCP connection.

Most of the methods of this class were developed by Panagiotis Skrimponis. They were integrated in a class and extended by Julian D. Villegas G.

Source code in a2gmeasurements.py
3472
3473
3474
3475
3476
3477
3478
3479
3480
3481
3482
3483
3484
3485
3486
3487
3488
3489
3490
3491
3492
3493
3494
3495
3496
3497
3498
3499
3500
3501
3502
3503
3504
3505
3506
3507
3508
3509
3510
3511
3512
3513
3514
3515
3516
3517
3518
3519
3520
3521
3522
3523
3524
3525
3526
3527
3528
3529
3530
3531
3532
3533
3534
3535
3536
3537
3538
3539
3540
3541
3542
3543
3544
3545
3546
3547
3548
3549
3550
3551
3552
3553
3554
3555
3556
3557
3558
3559
3560
3561
3562
3563
3564
3565
3566
3567
3568
3569
3570
3571
3572
3573
3574
3575
3576
3577
3578
3579
3580
3581
3582
3583
3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597
3598
3599
3600
3601
3602
3603
3604
3605
3606
3607
3608
3609
3610
3611
3612
3613
3614
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
3631
3632
3633
3634
3635
3636
3637
3638
3639
3640
3641
3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653
3654
3655
3656
3657
3658
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
3670
3671
3672
3673
3674
3675
3676
3677
3678
3679
3680
3681
3682
3683
3684
3685
3686
3687
3688
3689
3690
3691
3692
3693
3694
3695
3696
3697
3698
3699
3700
3701
3702
3703
3704
3705
3706
3707
3708
3709
3710
3711
3712
3713
3714
3715
3716
3717
3718
3719
3720
3721
3722
3723
3724
3725
3726
3727
3728
3729
3730
3731
3732
3733
3734
3735
3736
3737
3738
3739
3740
3741
3742
3743
3744
3745
3746
3747
3748
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763
3764
3765
3766
3767
3768
3769
3770
3771
3772
3773
3774
3775
3776
3777
3778
3779
3780
3781
3782
3783
3784
3785
3786
3787
3788
3789
3790
3791
3792
3793
3794
3795
3796
3797
3798
3799
3800
3801
3802
3803
3804
3805
3806
3807
3808
3809
3810
3811
3812
3813
3814
3815
3816
3817
3818
3819
class RFSoCRemoteControlFromHost():
    """
    Python class that implements all functionality for the communication between a host computer (client) and the RFSoC (server) connected through Ethernet to it. 

    Configures the antenna front end (Sivers EVK) parameters. The antenna front end is responsible for upconverting the frequency, beamforming and providing the physical interface with the air.

    Implements the client side (host computer) functionality of the TCP connection. 

    Most of the methods of this class were developed by Panagiotis Skrimponis. They were integrated in a class and extended by Julian D. Villegas G.
    """
    def __init__(self, radio_control_port=8080, radio_data_port=8081, rfsoc_static_ip_address='10.1.1.40', filename='PDAPs', operating_freq=57.51e9):
        """
        Creates two sockets: one for control commands and another for transfer data.

        Establish the connection between the client and the RFSoC.

        Some important attributes of this class are:

         1. ``beam_idx_for_vis``: this attribute sets the index of the beams of the measured Channel Impulse Response (CIR) that are sent from the drone node to the ground node to be displayed in the GUI.

         2. ``TIME_SNAPS_TO_SAVE``: number of CIR snapshots to collect before save them on disk.

         3. ``TIME_SNAPS_TO_VIS``: number of CIR snapshots to be displayed. This value is set depending on the max bytes that can be send through Ethernet in a single message (i.e. 22 time snaps * 16 beams * 4 bytes-per-PAP-entry = 1408 bytes)

         4. ``nbeams``: number of beams of the CIR to be retrieved from the server.

         5. ``nread``: number of delay taps of the CIR to be retrieved from the server.

         6. ``nbytes``: number of bytes of a full CIR (1 time snapshot, 64 beams, 1024 delay taps)

         7. ``beam_angles``: list of beam angles used in beamforming. Beam angle at index 0 corresponds to the omnidirectional case.

        Args:
            radio_control_port (int, optional): port for "control socket". Defaults to 8080.
            radio_data_port (int, optional): port for "data socket". Defaults to 8081.
            rfsoc_static_ip_address (str, optional): static IP address of the rfsoc ethernet interface. Defaults to '10.1.1.40'.
            filename (str, optional): name to be used when saving the CIRs. Defaults to 'PDAPs'.
            operating_freq (_type_, optional): operating frequency for the antenna array front end. Defaults to 57.51e9.
        """

        self.operating_freq = operating_freq
        self.radio_control_port = radio_control_port
        self.radio_data_port = radio_data_port
        self.filename_to_save = filename
        self.hest = []
        self.meas_time_tag = []
        self.RFSoCSuccessExecutionAns = "Successully executed"
        self.RFSoCSuccessAns = "Success"
        self.n_receive_calls = 0
        self.time_begin_receive_call = 0
        self.time_finish_receive_call = 0
        self.time_begin_receive_thread = 0
        self.time_finish_receive_thread = 0
        self.beam_idx_for_vis = [i*4 for i in range(0, 16)]
        self.bytes_per_irf = 64*1024*16 # Exactly 1 MB
        self.irfs_per_second = 7 # THIS MUST BE FOUND IN BETTER A WAY
        self.TIME_SNAPS_TO_SAVE = 220  
        self.MAX_PAP_BUF_SIZE_BYTES = self.TIME_SNAPS_TO_SAVE * self.bytes_per_irf
        self.TIME_SNAPS_TO_VIS = 22
        self.TIME_GET_IRF = 0.14
        self.nbeams = 64
        nbytes_per_item = 2
        self.nread = 1024
        self.nbytes = self.nbeams * nbytes_per_item * self.nread * 2 # Beams x SubCarriers(delay taps) x 2Bytes from  INT16 x 2 frpm Real and Imaginary

        beamforming_angles_file = "C:\\Users\\jvjulian\\OneDrive - Teknologian Tutkimuskeskus VTT\\Documents\\Aerial\\Repos\\a2gMeasurements\\data\\rx_sivers_beam_index_mapping.csv"

        self.beam_angles = [0]*64

        # Get the beamforming angles
        #with open(beamforming_angles_file, 'r') as f:
        #    reader = csv.reader(f, delimiter=",")
        #    for cnt, row in enumerate(reader):
        #        self.beam_angles[cnt] = float(row[1])

        self.radio_control = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.radio_control.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.radio_data = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.radio_data.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        try:
            self.radio_control.connect((rfsoc_static_ip_address, radio_control_port))
            self.radio_data.connect((rfsoc_static_ip_address, radio_data_port))
        except Exception as e:
            print("[DEBUG]: Error in rfsoc socket connection: ", e)
            print("[DEBUG]: Check RFSoC ETH, and Power it Off and On")

    def send_cmd(self, cmd, cmd_arg=None):
        """
        Sends a command to the RFSoC server. 

        These commands control the Sivers EVK mode, carrier frequncy, tx gain and rx gain.

        Args:
            cmd (str): available commands are: ``setModeSivers``, ``setCarrierFrequencySivers``, ``setGainTxSivers``, ``setGainRxSivers``.
            cmd_arg (str | float | dict, optional): supported parameters for 'setModeSivers' are 'RXen_0_TXen1', 'RXen1_TXen0', 'RXen0_TXen0'; supported parameters for 'setCarrierFrequencySivers' are float number, i.e.: 57.51e9; supported parameters for 'setGainTxSivers' are dict with this structure {'tx_bb_gain': 0x00, 'tx_bb_phase': 0x00, 'tx_bb_iq_gain': 0x00, 'tx_bfrf_gain': 0x00}; supported parameters for 'setGainRxSivers' are dict with this structure {'rx_gain_ctrl_bb1':0x00, 'rx_gain_ctrl_bb2':0x00, 'rx_gain_ctrl_bb3':0x00, 'rx_gain_ctrl_bfrf':0x00}. Defaults to None.
        """

        try:
            if cmd == 'setModeSivers':
                if cmd_arg == 'RXen0_TXen1' or cmd_arg == 'RXen1_TXen0' or cmd_arg == 'RXen0_TXen0':
                    self.radio_control.sendall(b"setModeSiver "+str.encode(str(cmd_arg)))
                else:
                    print("[DEBUG]: Unknown Sivers mode")
            elif cmd == 'setCarrierFrequencySivers':
                self.radio_control.sendall(b"setCarrierFrequency "+str.encode(str(cmd_arg)))
            elif cmd == 'setGainTxSivers':
                tx_bb_gain = cmd_arg['tx_bb_gain']
                tx_bb_phase = cmd_arg['tx_bb_phase']
                tx_bb_iq_gain = cmd_arg['tx_bb_iq_gain']
                tx_bfrf_gain = cmd_arg['tx_bfrf_gain']

                self.radio_control.sendall(b"setGainTX " + str.encode(str(int(tx_bb_gain)) + " ") \
                                                            + str.encode(str(int(tx_bb_phase)) + " ") \
                                                            + str.encode(str(int(tx_bb_iq_gain)) + " ") \
                                                            + str.encode(str(int(tx_bfrf_gain))))
            elif cmd == 'setGainRxSivers':
                rx_gain_ctrl_bb1 = cmd_arg['rx_gain_ctrl_bb1']
                rx_gain_ctrl_bb2 = cmd_arg['rx_gain_ctrl_bb2']
                rx_gain_ctrl_bb3 = cmd_arg['rx_gain_ctrl_bb3']
                rx_gain_ctrl_bfrf = cmd_arg['rx_gain_ctrl_bfrf']

                self.radio_control.sendall(b"setGainRX " + str.encode(str(int(rx_gain_ctrl_bb1)) + " ") \
                                                            + str.encode(str(int(rx_gain_ctrl_bb2)) + " ") \
                                                            + str.encode(str(int(rx_gain_ctrl_bb3)) + " ") \
                                                            + str.encode(str(int(rx_gain_ctrl_bfrf))))
            elif cmd == 'transmitSamples':
                self.radio_control.sendall(b"transmitSamples")
            else: 
                print("[DEBUG]: Unknown command to send to RFSoC")
                return
        except IOError as e:
            if e.errno == errno.EPIPE:
                print("[ERROR]: RFSoC to Host connection has problems: ", e)
        else:
            data = self.radio_control.recv(1024)
            data = data.decode('utf-8')

            if self.RFSoCSuccessExecutionAns in data or self.RFSoCSuccessAns in data:
                print("[DEBUG]: Command ", cmd, " executed succesfully on Sivers or RFSoC")
            else:
                print("[DEBUG]: Command ", cmd, " was not successfully executed on Sivers or RFSoC. The following error appears: ", data)

    def transmit_signal(self, tx_bb_gain=0x3, tx_bb_phase=0, tx_bb_iq_gain=0x77, tx_bfrf_gain=0x40, carrier_freq=57.51e9):
        """
        Sets Tx gains and frequency of operation. Wrapper function of ``send_cmd``.

        More about TX gains is found in the Sivers EVK manual/reference guides.

        Args:
            tx_bb_gain (hexadecimal, optional): sets baseband gain according to: 0x00  = 0 dB, 0x01  = 3.5 dB, 0x02  = 3.5 dB, 0x03  = 6 dB (when sivers register tx_ctrl bit 3 (BB Ibias set) = 1). Defaults to 0x3.
            tx_bb_phase (int, optional): _description_. Defaults to 0.
            tx_bb_iq_gain (hexadecimal, optional): sets baseband I, Q gain according to: [0:3, I gain]: 0-6 dB, 16 steps; [4:7, Q gain]: 0-6 dB, 16 steps. Defaults to 0x77.
            tx_bfrf_gain (hexadecimal, optional): sets gain after RF mixer according to: [0:3, RF gain]: 0-15 dB, 16 steps; [4:7, BF gain]: 0-15 dB, 16 steps. Defaults to 0x40.
            carrier_freq (_type_, optional): carrier frequency from the available frequency range for the Sivers EVK 06002/3 (in this case: 57-71 GHz). Defaults to 57.51e9.
        """

        dict_tx_gains = {'tx_bb_gain': tx_bb_gain, 'tx_bb_phase': tx_bb_phase, 'tx_bb_iq_gain': tx_bb_iq_gain, 'tx_bfrf_gain': tx_bfrf_gain}

        self.send_cmd('transmitSamples')
        self.send_cmd('setModeSivers', cmd_arg='RXen0_TXen1')
        self.send_cmd('setCarrierFrequencySivers', cmd_arg=carrier_freq)
        self.send_cmd('setGainTxSivers', cmd_arg=dict_tx_gains)

    def set_rx_rf(self, rx_gain_ctrl_bb1=0x77, rx_gain_ctrl_bb2=0x00, rx_gain_ctrl_bb3=0x99, rx_gain_ctrl_bfrf=0xFF, carrier_freq=57.51e9):
        """
        Sets rx gains and frequency of operation. Wrapper function of ``send_cmd``.

        Args:
            rx_gain_ctrl_bb1 (hexadecimal, optional): sets the first rx gain for the I,Q according to: I[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps; Q[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps. Defaults to 0x77.
            rx_gain_ctrl_bb2 (hexadecimal, optional): sets the second rx gain for the I,Q according to: I[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps; Q[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps. Defaults to 0x00.
            rx_gain_ctrl_bb3 (hexadecimal, optional): sets the third rx gain for the I,Q according to: I[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps; Q[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps. Defaults to 0x99.
            rx_gain_ctrl_bfrf (_type_, optional): sets gain after the mixer according to; [0:3,RF gain]: 0-15 dB, 16 steps; [4:7, BF gain]: 0-15 dB, 16 steps. Defaults to 0xFF.
            carrier_freq (_type_, optional): carrier frequency from the available frequency range for the Sivers EVK 06002/3 (in this case: 57-71 GHz). Defaults to 57.51e9.
        """

        dict_rx_gains = {'rx_gain_ctrl_bb1':rx_gain_ctrl_bb1, 'rx_gain_ctrl_bb2':rx_gain_ctrl_bb2, 'rx_gain_ctrl_bb3':rx_gain_ctrl_bb3, 'rx_gain_ctrl_bfrf':rx_gain_ctrl_bfrf}

        self.send_cmd('setModeSivers', cmd_arg='RXen1_TXen0')
        self.send_cmd('setCarrierFrequencySivers', cmd_arg=carrier_freq)
        self.send_cmd('setGainRxSivers', cmd_arg=dict_rx_gains)

    def receive_signal_async(self, stop_event):
        """
        Callback for the thread responsible for retrieving CIRs from RFSoC server (rfsoc thread).

        When enough (``self.TIME_SNAPS_TO_VIS``) CIR time snapshots are available, computes the Power Angular Profile to be sent to the ground node for displaying it in the GUI.

        When enough (``self.TIME_SNAPS_TO_SAVE``) CIR time snapshots are available, saves the CIRs on disk.

        Args:
            stop_event (threading.Event): when set, this function does nothing (the thread can be alived but does nothing)
        """

        while not stop_event.is_set():
            self.n_receive_calls = self.n_receive_calls + 1
            self.radio_control.sendall(b"receiveSamples")
            buf = bytearray()

            while len(buf) < self.nbytes:
                data = self.radio_data.recv(self.nbytes)
                buf.extend(data)
            data = np.frombuffer(buf, dtype=np.int16)
            rxtd = data[:self.nread*self.nbeams] + 1j*data[self.nread*self.nbeams:]
            rxtd = rxtd.reshape(self.nbeams, self.nread)

            self.hest.append(rxtd)

            if len(self.hest) == self.TIME_SNAPS_TO_VIS:
                self.data_to_visualize = self.pipeline_operations_rfsoc_rx_ndarray(np.array(self.hest), 2)

            if len(self.hest) > self.TIME_SNAPS_TO_VIS: # maximum packet size to send over the tcp connection
                tmp = self.pipeline_operations_rfsoc_rx_ndarray(rxtd, 1)
                self.data_to_visualize = np.roll(self.data_to_visualize, -1, axis=0) 
                self.data_to_visualize[-1, :] = tmp 

            if len(self.hest) >= self.TIME_SNAPS_TO_SAVE:
                print(f"[DEBUG]: Time between save callbacks: {time.time() - self.start_time_pap_callback}")
                self.save_hest_buffer()      

    def maximum_power_direction(self, array):
        """
        Computes the beamforming angle (azimuth) of maximum power at the receiver.

        The time under consideration should be short or less than the inverse of the rate of change of the direction of maximum power.

        The rate of change of the direction of maximum power is not easy to compute or assume, but we can restrict the computation of the direction to a window of a given ms.

        Args:
            array (numpy.ndarray): this is the array having the IRFs. Its dimensionality is Time x 64 x 2044.

        Returns:
            angleMaxPower (list): contains the (azimuth) angles for maximum power across all the time snapshots considered. Has 64 entries.
        """
        aux = np.abs(array) 
        aux = aux * aux # faster pow 2
        aux = np.sum(aux, axis=2)

        # This is an Nbeams array
        idx_max_power = np.argmax(aux, axis=0)
        return self.beam_angles[idx_max_power]        

    def pipeline_operations_rfsoc_rx_ndarray(self, array, axis, each_n_beams=4):
        """
        Computes the PAP for a single snapshot CIR (64 beams * 1024 delay taps) or the PAPs of multiple snapshots CIR (snaps * 64 beams * 1024 delay taps).

        Args:
            array (numpy.ndarray): CIRs. If it has 2 dimensions the CIR correspond to a single snapshot, if it has 3 dimensions, the CIR correspond to multiple snapshots.
            axis (int): delay tap axis, either 0, 1 and 2.
            each_n_beams (int, optional): subsample the 64 beams by this value. Defaults to 4.

        Returns:
            aux (numpy.ndarray): computed PAP "subsampled" version.
        """

        if axis >= len(array.shape):
            print(f"[DEBUG]: Invalid axis over which to add entries. The array has: {len(array.shape)} dimensions")
            return 0

        aux = np.abs(array) 
        aux = aux * aux # faster pow 2
        aux = np.sum(aux, axis=axis)
        if axis==1:
            aux = aux[::each_n_beams]
        elif axis==2:
            aux = aux[:, ::each_n_beams]
        else:
            print("[ERROR]: Wrong axis over which to do pipeline_operations_rfsoc_rx_ndarray")
        # Compute 10-time-snaps block mean
        #self.data_to_visualize = compute_block_mean_2d_array(self.data_to_visualize, 10)

        aux = np.asarray(aux, dtype=np.float32)
        return aux

    def save_hest_buffer(self, register_time=True):
        """
        Saves the raw (time-snaps, n_beams, n_delay_taps) CIR array.

        Args:
            register_time (bool, optional): parameter used for debugging purposes. Defaults to True.
        """

        datestr = datetime.datetime.now()
        datestr = datestr.strftime('%Y-%m-%d-%H-%M-%S-%f')

        # Double check that there is something in the array
        if len(self.hest) > 0:
            with open('../Measurement Files/' + datestr + '-' + self.filename_to_save + '.npy', 'wb') as f:
                #np.save(f, np.stack(self.hest, axis=0))
                np.save(f, np.array(self.hest))

            print("[DEBUG]: Saved file ", datestr + self.filename_to_save + '.npy')
            print("[DEBUG]: Saved file ", datestr + self.filename_to_save + '-TIMETAGS' + '.npy')
            self.hest = []

        if register_time:
            self.start_time_pap_callback = time.time()

    def start_thread_receive_meas_data(self, msg_data):
        """
        Creates and starts the rfsoc thread.

        A thread -instead of a subprocess- is good enough since the computational expense of the task is not donde in the host computer but in the RFSoC. The host just reads the data through Ethernet.

        A new thread is started each time this function is called. It is required for the developer to call 'stop_thread_receive_meas_data' before calling again this function in order to close the actual thread before creating a new one.

        Args:
            msg_data (dict): dictionary containing the parameters required by ``set_rx_rf`` to set a Sivers EVK configuration. 
        """

        self.event_stop_thread_rx_irf = threading.Event()                
        self.thread_rx_irf = threading.Thread(target=self.receive_signal_async, args=(self.event_stop_thread_rx_irf,))
        self.set_rx_rf(carrier_freq=msg_data['carrier_freq'],
                       rx_gain_ctrl_bb1=msg_data['rx_gain_ctrl_bb1'],
                       rx_gain_ctrl_bb2=msg_data['rx_gain_ctrl_bb2'],
                       rx_gain_ctrl_bb3=msg_data['rx_gain_ctrl_bb3'],
                       rx_gain_ctrl_bfrf=msg_data['rx_gain_ctrl_bfrf'])
        time.sleep(0.1)
        self.time_begin_receive_thread = time.time()
        self.thread_rx_irf.start()
        self.start_time_pap_callback = time.time()

        print("[DEBUG]: receive_signal_async thread STARTED")

    def stop_thread_receive_meas_data(self):
        """
        Stops the rfsoc thread and saves remaining CIRs.
        """

        self.event_stop_thread_rx_irf.set()
        self.time_finish_receive_thread = time.time()
        print("[DEBUG]: receive_signal_async thread STOPPED")
        print("[DEBUG]: Received calls: ", self.n_receive_calls)
        print("[DEBUG]: Avg. time of execution of 'receive_signal' callback is ", ((self.time_finish_receive_thread - self.time_begin_receive_thread)/self.n_receive_calls))

        self.save_hest_buffer(self, register_time=False)

        self.n_receive_calls = 0

    def finish_measurement(self):
        """
        Kills the rfsoc if it is still alive and saves the remaining CIRs.
        """
        # Check if the thread is finished and if not stop it
        if self.thread_rx_irf.is_alive():
            self.stop_thread_receive_meas_data()

        self.save_hest_buffer(self, register_time=False)

__init__(radio_control_port=8080, radio_data_port=8081, rfsoc_static_ip_address='10.1.1.40', filename='PDAPs', operating_freq=57510000000.0)

Creates two sockets: one for control commands and another for transfer data.

Establish the connection between the client and the RFSoC.

Some important attributes of this class are:

  1. beam_idx_for_vis: this attribute sets the index of the beams of the measured Channel Impulse Response (CIR) that are sent from the drone node to the ground node to be displayed in the GUI.

  2. TIME_SNAPS_TO_SAVE: number of CIR snapshots to collect before save them on disk.

  3. TIME_SNAPS_TO_VIS: number of CIR snapshots to be displayed. This value is set depending on the max bytes that can be send through Ethernet in a single message (i.e. 22 time snaps * 16 beams * 4 bytes-per-PAP-entry = 1408 bytes)

  4. nbeams: number of beams of the CIR to be retrieved from the server.

  5. nread: number of delay taps of the CIR to be retrieved from the server.

  6. nbytes: number of bytes of a full CIR (1 time snapshot, 64 beams, 1024 delay taps)

  7. beam_angles: list of beam angles used in beamforming. Beam angle at index 0 corresponds to the omnidirectional case.

Parameters:

Name Type Description Default
radio_control_port int

port for "control socket". Defaults to 8080.

8080
radio_data_port int

port for "data socket". Defaults to 8081.

8081
rfsoc_static_ip_address str

static IP address of the rfsoc ethernet interface. Defaults to '10.1.1.40'.

'10.1.1.40'
filename str

name to be used when saving the CIRs. Defaults to 'PDAPs'.

'PDAPs'
operating_freq _type_

operating frequency for the antenna array front end. Defaults to 57.51e9.

57510000000.0
Source code in a2gmeasurements.py
def __init__(self, radio_control_port=8080, radio_data_port=8081, rfsoc_static_ip_address='10.1.1.40', filename='PDAPs', operating_freq=57.51e9):
    """
    Creates two sockets: one for control commands and another for transfer data.

    Establish the connection between the client and the RFSoC.

    Some important attributes of this class are:

     1. ``beam_idx_for_vis``: this attribute sets the index of the beams of the measured Channel Impulse Response (CIR) that are sent from the drone node to the ground node to be displayed in the GUI.

     2. ``TIME_SNAPS_TO_SAVE``: number of CIR snapshots to collect before save them on disk.

     3. ``TIME_SNAPS_TO_VIS``: number of CIR snapshots to be displayed. This value is set depending on the max bytes that can be send through Ethernet in a single message (i.e. 22 time snaps * 16 beams * 4 bytes-per-PAP-entry = 1408 bytes)

     4. ``nbeams``: number of beams of the CIR to be retrieved from the server.

     5. ``nread``: number of delay taps of the CIR to be retrieved from the server.

     6. ``nbytes``: number of bytes of a full CIR (1 time snapshot, 64 beams, 1024 delay taps)

     7. ``beam_angles``: list of beam angles used in beamforming. Beam angle at index 0 corresponds to the omnidirectional case.

    Args:
        radio_control_port (int, optional): port for "control socket". Defaults to 8080.
        radio_data_port (int, optional): port for "data socket". Defaults to 8081.
        rfsoc_static_ip_address (str, optional): static IP address of the rfsoc ethernet interface. Defaults to '10.1.1.40'.
        filename (str, optional): name to be used when saving the CIRs. Defaults to 'PDAPs'.
        operating_freq (_type_, optional): operating frequency for the antenna array front end. Defaults to 57.51e9.
    """

    self.operating_freq = operating_freq
    self.radio_control_port = radio_control_port
    self.radio_data_port = radio_data_port
    self.filename_to_save = filename
    self.hest = []
    self.meas_time_tag = []
    self.RFSoCSuccessExecutionAns = "Successully executed"
    self.RFSoCSuccessAns = "Success"
    self.n_receive_calls = 0
    self.time_begin_receive_call = 0
    self.time_finish_receive_call = 0
    self.time_begin_receive_thread = 0
    self.time_finish_receive_thread = 0
    self.beam_idx_for_vis = [i*4 for i in range(0, 16)]
    self.bytes_per_irf = 64*1024*16 # Exactly 1 MB
    self.irfs_per_second = 7 # THIS MUST BE FOUND IN BETTER A WAY
    self.TIME_SNAPS_TO_SAVE = 220  
    self.MAX_PAP_BUF_SIZE_BYTES = self.TIME_SNAPS_TO_SAVE * self.bytes_per_irf
    self.TIME_SNAPS_TO_VIS = 22
    self.TIME_GET_IRF = 0.14
    self.nbeams = 64
    nbytes_per_item = 2
    self.nread = 1024
    self.nbytes = self.nbeams * nbytes_per_item * self.nread * 2 # Beams x SubCarriers(delay taps) x 2Bytes from  INT16 x 2 frpm Real and Imaginary

    beamforming_angles_file = "C:\\Users\\jvjulian\\OneDrive - Teknologian Tutkimuskeskus VTT\\Documents\\Aerial\\Repos\\a2gMeasurements\\data\\rx_sivers_beam_index_mapping.csv"

    self.beam_angles = [0]*64

    # Get the beamforming angles
    #with open(beamforming_angles_file, 'r') as f:
    #    reader = csv.reader(f, delimiter=",")
    #    for cnt, row in enumerate(reader):
    #        self.beam_angles[cnt] = float(row[1])

    self.radio_control = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    self.radio_control.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.radio_data = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    self.radio_data.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    try:
        self.radio_control.connect((rfsoc_static_ip_address, radio_control_port))
        self.radio_data.connect((rfsoc_static_ip_address, radio_data_port))
    except Exception as e:
        print("[DEBUG]: Error in rfsoc socket connection: ", e)
        print("[DEBUG]: Check RFSoC ETH, and Power it Off and On")

finish_measurement()

Kills the rfsoc if it is still alive and saves the remaining CIRs.

Source code in a2gmeasurements.py
def finish_measurement(self):
    """
    Kills the rfsoc if it is still alive and saves the remaining CIRs.
    """
    # Check if the thread is finished and if not stop it
    if self.thread_rx_irf.is_alive():
        self.stop_thread_receive_meas_data()

    self.save_hest_buffer(self, register_time=False)

maximum_power_direction(array)

Computes the beamforming angle (azimuth) of maximum power at the receiver.

The time under consideration should be short or less than the inverse of the rate of change of the direction of maximum power.

The rate of change of the direction of maximum power is not easy to compute or assume, but we can restrict the computation of the direction to a window of a given ms.

Parameters:

Name Type Description Default
array ndarray

this is the array having the IRFs. Its dimensionality is Time x 64 x 2044.

required

Returns:

Name Type Description
angleMaxPower list

contains the (azimuth) angles for maximum power across all the time snapshots considered. Has 64 entries.

Source code in a2gmeasurements.py
def maximum_power_direction(self, array):
    """
    Computes the beamforming angle (azimuth) of maximum power at the receiver.

    The time under consideration should be short or less than the inverse of the rate of change of the direction of maximum power.

    The rate of change of the direction of maximum power is not easy to compute or assume, but we can restrict the computation of the direction to a window of a given ms.

    Args:
        array (numpy.ndarray): this is the array having the IRFs. Its dimensionality is Time x 64 x 2044.

    Returns:
        angleMaxPower (list): contains the (azimuth) angles for maximum power across all the time snapshots considered. Has 64 entries.
    """
    aux = np.abs(array) 
    aux = aux * aux # faster pow 2
    aux = np.sum(aux, axis=2)

    # This is an Nbeams array
    idx_max_power = np.argmax(aux, axis=0)
    return self.beam_angles[idx_max_power]        

pipeline_operations_rfsoc_rx_ndarray(array, axis, each_n_beams=4)

Computes the PAP for a single snapshot CIR (64 beams * 1024 delay taps) or the PAPs of multiple snapshots CIR (snaps * 64 beams * 1024 delay taps).

Parameters:

Name Type Description Default
array ndarray

CIRs. If it has 2 dimensions the CIR correspond to a single snapshot, if it has 3 dimensions, the CIR correspond to multiple snapshots.

required
axis int

delay tap axis, either 0, 1 and 2.

required
each_n_beams int

subsample the 64 beams by this value. Defaults to 4.

4

Returns:

Name Type Description
aux ndarray

computed PAP "subsampled" version.

Source code in a2gmeasurements.py
def pipeline_operations_rfsoc_rx_ndarray(self, array, axis, each_n_beams=4):
    """
    Computes the PAP for a single snapshot CIR (64 beams * 1024 delay taps) or the PAPs of multiple snapshots CIR (snaps * 64 beams * 1024 delay taps).

    Args:
        array (numpy.ndarray): CIRs. If it has 2 dimensions the CIR correspond to a single snapshot, if it has 3 dimensions, the CIR correspond to multiple snapshots.
        axis (int): delay tap axis, either 0, 1 and 2.
        each_n_beams (int, optional): subsample the 64 beams by this value. Defaults to 4.

    Returns:
        aux (numpy.ndarray): computed PAP "subsampled" version.
    """

    if axis >= len(array.shape):
        print(f"[DEBUG]: Invalid axis over which to add entries. The array has: {len(array.shape)} dimensions")
        return 0

    aux = np.abs(array) 
    aux = aux * aux # faster pow 2
    aux = np.sum(aux, axis=axis)
    if axis==1:
        aux = aux[::each_n_beams]
    elif axis==2:
        aux = aux[:, ::each_n_beams]
    else:
        print("[ERROR]: Wrong axis over which to do pipeline_operations_rfsoc_rx_ndarray")
    # Compute 10-time-snaps block mean
    #self.data_to_visualize = compute_block_mean_2d_array(self.data_to_visualize, 10)

    aux = np.asarray(aux, dtype=np.float32)
    return aux

receive_signal_async(stop_event)

Callback for the thread responsible for retrieving CIRs from RFSoC server (rfsoc thread).

When enough (self.TIME_SNAPS_TO_VIS) CIR time snapshots are available, computes the Power Angular Profile to be sent to the ground node for displaying it in the GUI.

When enough (self.TIME_SNAPS_TO_SAVE) CIR time snapshots are available, saves the CIRs on disk.

Parameters:

Name Type Description Default
stop_event Event

when set, this function does nothing (the thread can be alived but does nothing)

required
Source code in a2gmeasurements.py
def receive_signal_async(self, stop_event):
    """
    Callback for the thread responsible for retrieving CIRs from RFSoC server (rfsoc thread).

    When enough (``self.TIME_SNAPS_TO_VIS``) CIR time snapshots are available, computes the Power Angular Profile to be sent to the ground node for displaying it in the GUI.

    When enough (``self.TIME_SNAPS_TO_SAVE``) CIR time snapshots are available, saves the CIRs on disk.

    Args:
        stop_event (threading.Event): when set, this function does nothing (the thread can be alived but does nothing)
    """

    while not stop_event.is_set():
        self.n_receive_calls = self.n_receive_calls + 1
        self.radio_control.sendall(b"receiveSamples")
        buf = bytearray()

        while len(buf) < self.nbytes:
            data = self.radio_data.recv(self.nbytes)
            buf.extend(data)
        data = np.frombuffer(buf, dtype=np.int16)
        rxtd = data[:self.nread*self.nbeams] + 1j*data[self.nread*self.nbeams:]
        rxtd = rxtd.reshape(self.nbeams, self.nread)

        self.hest.append(rxtd)

        if len(self.hest) == self.TIME_SNAPS_TO_VIS:
            self.data_to_visualize = self.pipeline_operations_rfsoc_rx_ndarray(np.array(self.hest), 2)

        if len(self.hest) > self.TIME_SNAPS_TO_VIS: # maximum packet size to send over the tcp connection
            tmp = self.pipeline_operations_rfsoc_rx_ndarray(rxtd, 1)
            self.data_to_visualize = np.roll(self.data_to_visualize, -1, axis=0) 
            self.data_to_visualize[-1, :] = tmp 

        if len(self.hest) >= self.TIME_SNAPS_TO_SAVE:
            print(f"[DEBUG]: Time between save callbacks: {time.time() - self.start_time_pap_callback}")
            self.save_hest_buffer()      

save_hest_buffer(register_time=True)

Saves the raw (time-snaps, n_beams, n_delay_taps) CIR array.

Parameters:

Name Type Description Default
register_time bool

parameter used for debugging purposes. Defaults to True.

True
Source code in a2gmeasurements.py
def save_hest_buffer(self, register_time=True):
    """
    Saves the raw (time-snaps, n_beams, n_delay_taps) CIR array.

    Args:
        register_time (bool, optional): parameter used for debugging purposes. Defaults to True.
    """

    datestr = datetime.datetime.now()
    datestr = datestr.strftime('%Y-%m-%d-%H-%M-%S-%f')

    # Double check that there is something in the array
    if len(self.hest) > 0:
        with open('../Measurement Files/' + datestr + '-' + self.filename_to_save + '.npy', 'wb') as f:
            #np.save(f, np.stack(self.hest, axis=0))
            np.save(f, np.array(self.hest))

        print("[DEBUG]: Saved file ", datestr + self.filename_to_save + '.npy')
        print("[DEBUG]: Saved file ", datestr + self.filename_to_save + '-TIMETAGS' + '.npy')
        self.hest = []

    if register_time:
        self.start_time_pap_callback = time.time()

send_cmd(cmd, cmd_arg=None)

Sends a command to the RFSoC server.

These commands control the Sivers EVK mode, carrier frequncy, tx gain and rx gain.

Parameters:

Name Type Description Default
cmd str

available commands are: setModeSivers, setCarrierFrequencySivers, setGainTxSivers, setGainRxSivers.

required
cmd_arg str | float | dict

supported parameters for 'setModeSivers' are 'RXen_0_TXen1', 'RXen1_TXen0', 'RXen0_TXen0'; supported parameters for 'setCarrierFrequencySivers' are float number, i.e.: 57.51e9; supported parameters for 'setGainTxSivers' are dict with this structure {'tx_bb_gain': 0x00, 'tx_bb_phase': 0x00, 'tx_bb_iq_gain': 0x00, 'tx_bfrf_gain': 0x00}; supported parameters for 'setGainRxSivers' are dict with this structure {'rx_gain_ctrl_bb1':0x00, 'rx_gain_ctrl_bb2':0x00, 'rx_gain_ctrl_bb3':0x00, 'rx_gain_ctrl_bfrf':0x00}. Defaults to None.

None
Source code in a2gmeasurements.py
def send_cmd(self, cmd, cmd_arg=None):
    """
    Sends a command to the RFSoC server. 

    These commands control the Sivers EVK mode, carrier frequncy, tx gain and rx gain.

    Args:
        cmd (str): available commands are: ``setModeSivers``, ``setCarrierFrequencySivers``, ``setGainTxSivers``, ``setGainRxSivers``.
        cmd_arg (str | float | dict, optional): supported parameters for 'setModeSivers' are 'RXen_0_TXen1', 'RXen1_TXen0', 'RXen0_TXen0'; supported parameters for 'setCarrierFrequencySivers' are float number, i.e.: 57.51e9; supported parameters for 'setGainTxSivers' are dict with this structure {'tx_bb_gain': 0x00, 'tx_bb_phase': 0x00, 'tx_bb_iq_gain': 0x00, 'tx_bfrf_gain': 0x00}; supported parameters for 'setGainRxSivers' are dict with this structure {'rx_gain_ctrl_bb1':0x00, 'rx_gain_ctrl_bb2':0x00, 'rx_gain_ctrl_bb3':0x00, 'rx_gain_ctrl_bfrf':0x00}. Defaults to None.
    """

    try:
        if cmd == 'setModeSivers':
            if cmd_arg == 'RXen0_TXen1' or cmd_arg == 'RXen1_TXen0' or cmd_arg == 'RXen0_TXen0':
                self.radio_control.sendall(b"setModeSiver "+str.encode(str(cmd_arg)))
            else:
                print("[DEBUG]: Unknown Sivers mode")
        elif cmd == 'setCarrierFrequencySivers':
            self.radio_control.sendall(b"setCarrierFrequency "+str.encode(str(cmd_arg)))
        elif cmd == 'setGainTxSivers':
            tx_bb_gain = cmd_arg['tx_bb_gain']
            tx_bb_phase = cmd_arg['tx_bb_phase']
            tx_bb_iq_gain = cmd_arg['tx_bb_iq_gain']
            tx_bfrf_gain = cmd_arg['tx_bfrf_gain']

            self.radio_control.sendall(b"setGainTX " + str.encode(str(int(tx_bb_gain)) + " ") \
                                                        + str.encode(str(int(tx_bb_phase)) + " ") \
                                                        + str.encode(str(int(tx_bb_iq_gain)) + " ") \
                                                        + str.encode(str(int(tx_bfrf_gain))))
        elif cmd == 'setGainRxSivers':
            rx_gain_ctrl_bb1 = cmd_arg['rx_gain_ctrl_bb1']
            rx_gain_ctrl_bb2 = cmd_arg['rx_gain_ctrl_bb2']
            rx_gain_ctrl_bb3 = cmd_arg['rx_gain_ctrl_bb3']
            rx_gain_ctrl_bfrf = cmd_arg['rx_gain_ctrl_bfrf']

            self.radio_control.sendall(b"setGainRX " + str.encode(str(int(rx_gain_ctrl_bb1)) + " ") \
                                                        + str.encode(str(int(rx_gain_ctrl_bb2)) + " ") \
                                                        + str.encode(str(int(rx_gain_ctrl_bb3)) + " ") \
                                                        + str.encode(str(int(rx_gain_ctrl_bfrf))))
        elif cmd == 'transmitSamples':
            self.radio_control.sendall(b"transmitSamples")
        else: 
            print("[DEBUG]: Unknown command to send to RFSoC")
            return
    except IOError as e:
        if e.errno == errno.EPIPE:
            print("[ERROR]: RFSoC to Host connection has problems: ", e)
    else:
        data = self.radio_control.recv(1024)
        data = data.decode('utf-8')

        if self.RFSoCSuccessExecutionAns in data or self.RFSoCSuccessAns in data:
            print("[DEBUG]: Command ", cmd, " executed succesfully on Sivers or RFSoC")
        else:
            print("[DEBUG]: Command ", cmd, " was not successfully executed on Sivers or RFSoC. The following error appears: ", data)

set_rx_rf(rx_gain_ctrl_bb1=119, rx_gain_ctrl_bb2=0, rx_gain_ctrl_bb3=153, rx_gain_ctrl_bfrf=255, carrier_freq=57510000000.0)

Sets rx gains and frequency of operation. Wrapper function of send_cmd.

Parameters:

Name Type Description Default
rx_gain_ctrl_bb1 hexadecimal

sets the first rx gain for the I,Q according to: I[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps; Q[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps. Defaults to 0x77.

119
rx_gain_ctrl_bb2 hexadecimal

sets the second rx gain for the I,Q according to: I[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps; Q[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps. Defaults to 0x00.

0
rx_gain_ctrl_bb3 hexadecimal

sets the third rx gain for the I,Q according to: I[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps; Q[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps. Defaults to 0x99.

153
rx_gain_ctrl_bfrf _type_

sets gain after the mixer according to; [0:3,RF gain]: 0-15 dB, 16 steps; [4:7, BF gain]: 0-15 dB, 16 steps. Defaults to 0xFF.

255
carrier_freq _type_

carrier frequency from the available frequency range for the Sivers EVK 06002/3 (in this case: 57-71 GHz). Defaults to 57.51e9.

57510000000.0
Source code in a2gmeasurements.py
def set_rx_rf(self, rx_gain_ctrl_bb1=0x77, rx_gain_ctrl_bb2=0x00, rx_gain_ctrl_bb3=0x99, rx_gain_ctrl_bfrf=0xFF, carrier_freq=57.51e9):
    """
    Sets rx gains and frequency of operation. Wrapper function of ``send_cmd``.

    Args:
        rx_gain_ctrl_bb1 (hexadecimal, optional): sets the first rx gain for the I,Q according to: I[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps; Q[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps. Defaults to 0x77.
        rx_gain_ctrl_bb2 (hexadecimal, optional): sets the second rx gain for the I,Q according to: I[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps; Q[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps. Defaults to 0x00.
        rx_gain_ctrl_bb3 (hexadecimal, optional): sets the third rx gain for the I,Q according to: I[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps; Q[0:3]:[0,1,3,7,F]:-6:0 dB, 4 steps. Defaults to 0x99.
        rx_gain_ctrl_bfrf (_type_, optional): sets gain after the mixer according to; [0:3,RF gain]: 0-15 dB, 16 steps; [4:7, BF gain]: 0-15 dB, 16 steps. Defaults to 0xFF.
        carrier_freq (_type_, optional): carrier frequency from the available frequency range for the Sivers EVK 06002/3 (in this case: 57-71 GHz). Defaults to 57.51e9.
    """

    dict_rx_gains = {'rx_gain_ctrl_bb1':rx_gain_ctrl_bb1, 'rx_gain_ctrl_bb2':rx_gain_ctrl_bb2, 'rx_gain_ctrl_bb3':rx_gain_ctrl_bb3, 'rx_gain_ctrl_bfrf':rx_gain_ctrl_bfrf}

    self.send_cmd('setModeSivers', cmd_arg='RXen1_TXen0')
    self.send_cmd('setCarrierFrequencySivers', cmd_arg=carrier_freq)
    self.send_cmd('setGainRxSivers', cmd_arg=dict_rx_gains)

start_thread_receive_meas_data(msg_data)

Creates and starts the rfsoc thread.

A thread -instead of a subprocess- is good enough since the computational expense of the task is not donde in the host computer but in the RFSoC. The host just reads the data through Ethernet.

A new thread is started each time this function is called. It is required for the developer to call 'stop_thread_receive_meas_data' before calling again this function in order to close the actual thread before creating a new one.

Parameters:

Name Type Description Default
msg_data dict

dictionary containing the parameters required by set_rx_rf to set a Sivers EVK configuration.

required
Source code in a2gmeasurements.py
def start_thread_receive_meas_data(self, msg_data):
    """
    Creates and starts the rfsoc thread.

    A thread -instead of a subprocess- is good enough since the computational expense of the task is not donde in the host computer but in the RFSoC. The host just reads the data through Ethernet.

    A new thread is started each time this function is called. It is required for the developer to call 'stop_thread_receive_meas_data' before calling again this function in order to close the actual thread before creating a new one.

    Args:
        msg_data (dict): dictionary containing the parameters required by ``set_rx_rf`` to set a Sivers EVK configuration. 
    """

    self.event_stop_thread_rx_irf = threading.Event()                
    self.thread_rx_irf = threading.Thread(target=self.receive_signal_async, args=(self.event_stop_thread_rx_irf,))
    self.set_rx_rf(carrier_freq=msg_data['carrier_freq'],
                   rx_gain_ctrl_bb1=msg_data['rx_gain_ctrl_bb1'],
                   rx_gain_ctrl_bb2=msg_data['rx_gain_ctrl_bb2'],
                   rx_gain_ctrl_bb3=msg_data['rx_gain_ctrl_bb3'],
                   rx_gain_ctrl_bfrf=msg_data['rx_gain_ctrl_bfrf'])
    time.sleep(0.1)
    self.time_begin_receive_thread = time.time()
    self.thread_rx_irf.start()
    self.start_time_pap_callback = time.time()

    print("[DEBUG]: receive_signal_async thread STARTED")

stop_thread_receive_meas_data()

Stops the rfsoc thread and saves remaining CIRs.

Source code in a2gmeasurements.py
def stop_thread_receive_meas_data(self):
    """
    Stops the rfsoc thread and saves remaining CIRs.
    """

    self.event_stop_thread_rx_irf.set()
    self.time_finish_receive_thread = time.time()
    print("[DEBUG]: receive_signal_async thread STOPPED")
    print("[DEBUG]: Received calls: ", self.n_receive_calls)
    print("[DEBUG]: Avg. time of execution of 'receive_signal' callback is ", ((self.time_finish_receive_thread - self.time_begin_receive_thread)/self.n_receive_calls))

    self.save_hest_buffer(self, register_time=False)

    self.n_receive_calls = 0

transmit_signal(tx_bb_gain=3, tx_bb_phase=0, tx_bb_iq_gain=119, tx_bfrf_gain=64, carrier_freq=57510000000.0)

Sets Tx gains and frequency of operation. Wrapper function of send_cmd.

More about TX gains is found in the Sivers EVK manual/reference guides.

Parameters:

Name Type Description Default
tx_bb_gain hexadecimal

sets baseband gain according to: 0x00 = 0 dB, 0x01 = 3.5 dB, 0x02 = 3.5 dB, 0x03 = 6 dB (when sivers register tx_ctrl bit 3 (BB Ibias set) = 1). Defaults to 0x3.

3
tx_bb_phase int

description. Defaults to 0.

0
tx_bb_iq_gain hexadecimal

sets baseband I, Q gain according to: [0:3, I gain]: 0-6 dB, 16 steps; [4:7, Q gain]: 0-6 dB, 16 steps. Defaults to 0x77.

119
tx_bfrf_gain hexadecimal

sets gain after RF mixer according to: [0:3, RF gain]: 0-15 dB, 16 steps; [4:7, BF gain]: 0-15 dB, 16 steps. Defaults to 0x40.

64
carrier_freq _type_

carrier frequency from the available frequency range for the Sivers EVK 06002/3 (in this case: 57-71 GHz). Defaults to 57.51e9.

57510000000.0
Source code in a2gmeasurements.py
def transmit_signal(self, tx_bb_gain=0x3, tx_bb_phase=0, tx_bb_iq_gain=0x77, tx_bfrf_gain=0x40, carrier_freq=57.51e9):
    """
    Sets Tx gains and frequency of operation. Wrapper function of ``send_cmd``.

    More about TX gains is found in the Sivers EVK manual/reference guides.

    Args:
        tx_bb_gain (hexadecimal, optional): sets baseband gain according to: 0x00  = 0 dB, 0x01  = 3.5 dB, 0x02  = 3.5 dB, 0x03  = 6 dB (when sivers register tx_ctrl bit 3 (BB Ibias set) = 1). Defaults to 0x3.
        tx_bb_phase (int, optional): _description_. Defaults to 0.
        tx_bb_iq_gain (hexadecimal, optional): sets baseband I, Q gain according to: [0:3, I gain]: 0-6 dB, 16 steps; [4:7, Q gain]: 0-6 dB, 16 steps. Defaults to 0x77.
        tx_bfrf_gain (hexadecimal, optional): sets gain after RF mixer according to: [0:3, RF gain]: 0-15 dB, 16 steps; [4:7, BF gain]: 0-15 dB, 16 steps. Defaults to 0x40.
        carrier_freq (_type_, optional): carrier frequency from the available frequency range for the Sivers EVK 06002/3 (in this case: 57-71 GHz). Defaults to 57.51e9.
    """

    dict_tx_gains = {'tx_bb_gain': tx_bb_gain, 'tx_bb_phase': tx_bb_phase, 'tx_bb_iq_gain': tx_bb_iq_gain, 'tx_bfrf_gain': tx_bfrf_gain}

    self.send_cmd('transmitSamples')
    self.send_cmd('setModeSivers', cmd_arg='RXen0_TXen1')
    self.send_cmd('setCarrierFrequencySivers', cmd_arg=carrier_freq)
    self.send_cmd('setGainTxSivers', cmd_arg=dict_tx_gains)