GpsSignaling module

Bases: object

Python class that works as a "software" driver for the Septentrio GPS receiver (mosaic-go).

It implements the commands described in Septentrio's "mosaic-go Reference Guide".

Most of the commands sent to the GPS receiver control what type of information is retrieved from it, in the form of what Septentrio's documentation calls NMEA or SBF sentences.
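As a rough illustration of the two sentence families, the sketch below tells them apart by their leading bytes: an SBF block starts with the two sync bytes `$@`, while an NMEA sentence is ASCII text starting with `$` followed by a talker ID such as `GP`. The helper name `classify_sentence` is hypothetical and is not part of this class.

```python
def classify_sentence(raw: bytes) -> str:
    """Return 'SBF', 'NMEA' or 'UNKNOWN' for a raw message (hypothetical helper)."""
    # SBF blocks begin with the two sync bytes '$' and '@'
    if raw.startswith(b"$@"):
        return "SBF"
    # NMEA sentences begin with '$' followed by an ASCII talker ID (e.g. 'GP')
    if raw.startswith(b"$") and raw[1:2].isalpha():
        return "NMEA"
    return "UNKNOWN"


print(classify_sentence(b"$@\x01\x02"))       # binary SBF block
print(classify_sentence(b"$GPHDT,123.4,T"))   # NMEA heading sentence
```

The class itself checks the byte that follows the `$` delimiter (`@` for SBF, `G` for NMEA), which is the same idea applied after the delimiter has been stripped.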

It creates a thread (called here a gps thread) to handle the communication between the receiver and this host computer.
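The general pattern of a dedicated reader thread can be sketched as follows. This is a minimal illustration under stated assumptions, not the method used by this class: `reader_loop` is a hypothetical function, an `io.BytesIO` object stands in for the open serial port, and messages are split on the `$` delimiter.

```python
import io
import queue
import threading


def reader_loop(stream, out_queue, stop_event, delimiter=b"$"):
    """Read delimited messages from `stream` until it is exhausted or stopped."""
    buffer = b""
    while not stop_event.is_set():
        chunk = stream.read(16)
        if not chunk:  # nothing more to read
            break
        buffer += chunk
        # Everything before the last delimiter is complete; keep the remainder
        *messages, buffer = buffer.split(delimiter)
        for msg in messages:
            if msg:
                out_queue.put(msg)
    if buffer:
        out_queue.put(buffer)


# io.BytesIO stands in for the serial port in this sketch
fake_port = io.BytesIO(b"$GPGGA,first$GPHDT,second")
messages = queue.Queue()
stop = threading.Event()

gps_thread = threading.Thread(target=reader_loop, args=(fake_port, messages, stop))
gps_thread.start()
gps_thread.join()

received = []
while not messages.empty():
    received.append(messages.get())
print(received)
```

Passing results through a `queue.Queue` keeps the reader thread and the consumer decoupled; the `stop_event` gives the host program a way to end the loop cleanly.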

The reference coordinate system used by the Septentrio GPS is defined as follows:

  1. The (positive) x-axis is the longitudinal axis. This is the axis pointing in the direction of movement of the node.

  2. The (positive) y-axis is 90 degrees to the right (clockwise) of the (positive) x-axis.
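To make the axis convention concrete, the sketch below rotates a vector expressed in this frame (x forward, y to the right) into north and east components. It assumes the heading is measured clockwise from north, in degrees; `body_to_north_east` is a hypothetical helper and not part of this class.

```python
import math


def body_to_north_east(x_forward, y_right, heading_deg):
    """Rotate a body-frame vector into (north, east) components.

    Assumes heading_deg is measured clockwise from north.
    """
    h = math.radians(heading_deg)
    north = x_forward * math.cos(h) - y_right * math.sin(h)
    east = x_forward * math.sin(h) + y_right * math.cos(h)
    return north, east


# Node heading east (90 degrees): "forward" points east, "right" points south
print(body_to_north_east(1.0, 0.0, 90.0))
print(body_to_north_east(0.0, 1.0, 90.0))
```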

Source code in a2gmeasurements.py
class GpsSignaling(object):
    """
    Python class that works as a "software" driver for the Septentrio GPS receiver (mosaic-go).

    It implements the commands described in Septentrio's "mosaic-go Reference Guide".

    Most of the commands sent to the GPS receiver control what type of information is retrieved from it, in the form of what Septentrio's documentation calls NMEA or SBF sentences.

    It creates a thread (called here a gps thread) to handle the communication between the receiver and this host computer.

    The reference coordinate system used by the Septentrio GPS is defined as follows:

     1. The (positive) x-axis is the **longitudinal** axis. This is the axis pointing in the direction of movement of the node.

     2. The (positive) y-axis is 90 degrees to the right (clockwise) of the (positive) x-axis.

    """

    def __init__(self, DBG_LVL_1=False, DBG_LVL_2=False, DBG_LVL_0=False, save_filename='GPS'):
        """
        Constructor for the GpsSignaling class. Important attributes are:

        * ``register_sbf_sentences_by_id``: list of integers containing the expected SBF sentences that are going to be requested at the receiver. The integer is the ID of the sentence described in Septentrio's manual.

        * ``SBF_frame_buffer``: list of dictionaries containing the SBF frames during the execution of the thread responsible for receiving SBF frames.

        * ``MAX_SBF_BUFF_LEN``: maximum number of entries in the SBF frame buffer before it is saved to disk, emptied and filled again.

        Args:
            DBG_LVL_1 (bool, optional): prints less verbose output than level 0. Defaults to False.
            DBG_LVL_2 (bool, optional): prints less verbose output than level 1. Defaults to False.
            DBG_LVL_0 (bool, optional): prints all available verbose output. Defaults to False.
            save_filename (str, optional): name of the file where to save the record of GPS coordinates along an experiment. Defaults to 'GPS'.
        """

        # Initializations
        datestr = datetime.datetime.now()
        datestr = datestr.strftime('%Y-%m-%d-%H-%M-%S-%f')
        self.save_filename = save_filename + '-' + datestr
        self.SBF_frame_buffer = []
        self.NMEA_buffer = []
        self.stream_info = []
        self.MAX_SBF_BUFF_LEN = 100

        self.DBG_LVL_1 = DBG_LVL_1
        self.DBG_LVL_2 = DBG_LVL_2
        self.DBG_LVL_0 = DBG_LVL_0

        # Expected SBF sentences to be requested. Add or remove according to planned
        # SBF sentences to be requested.
        self.register_sbf_sentences_by_id = [4006, 5938] # PVTCart, AttEul
        self.n_sbf_sentences = len(self.register_sbf_sentences_by_id)

        self.ERR_GPS_CODE_GENERAL = -1.5e3
        self.ERR_GPS_CODE_SMALL_BUFF_SZ = -2.5e3       
        self.ERR_GPS_CODE_BUFF_NULL = -3.5e3
        self.ERR_GPS_CODE_NO_COORD_AVAIL = -4.5e3 
        self.ERR_GPS_CODE_NO_HEAD_AVAIL = -5.5e3

    def serial_connect(self, serial_port=None):
        """
        Open a serial connection. The Septentrio mosaic-go provides 2 virtual serial ports.

        In Windows the names of the virtual serial ports are typically: COM# (Virtual serial port 1), COM# (Virtual serial port 2).

        In Linux the names of the virtual serial ports (controlled by the standard Linux CDC-ACM driver) are: ``/dev/ttyACM0`` (Virtual serial port 1), ``/dev/ttyACM1`` (Virtual serial port 2).

        Septentrio has different interfaces to use with its receiver. Among other interfaces are: IP (using Ethernet-over-USB), USB.

        For the virtual serial ports the interface name in Septentrio receiver is 'USB' as their
        communication is made through the USB connection with the host computer. 

        Additionally, there is an actual (not virtual) serial port in the mosaic-go device. Under Linux, the name of this port is ``/dev/serial0``, which is a symbolic link to either ``/dev/ttyS#`` or ``/dev/ttyAMA#``.

        For information about all available interfaces check the Septentrio "mosaic-go Reference Guide".

        *It is important to note that only the USB interface has been implemented in this class*.

        Args:
            serial_port (str, optional): serial port or virtual serial port name. Defaults to None.
        """

        self.serial_port = None
        # Look for the first virtual COM port of the Septentrio receiver. It is assumed to be available,
        # meaning that the user has closed it if it was used before.
        for (this_port, desc, _) in sorted(comports()):

            # Linux CDC-ACM driver
            if 'Septentrio USB Device - CDC Abstract Control Model (ACM)' in desc:
                #self.serial_port = '/dev/ttyACM0'
                self.serial_port = this_port
                self.interface_number = 2
            # Windows driver
            elif 'Septentrio Virtual USB COM Port 1' in desc: # Choose the first virtual COM port
                self.serial_port = this_port
                self.interface_number = 1

        if self.serial_port is None:
            self.GPS_CONN_SUCCESS = False
            print("\n[DEBUG]: NO GPS found in any serial port")
            return
        else:
            self.GPS_CONN_SUCCESS = True
            print("\n[DEBUG]: GPS found in one serial port")

        serial_instance = None
        while serial_instance is None:
            try:
                serial_instance = serial.serial_for_url(self.serial_port,
                                                        9600,
                                                        parity='N',
                                                        rtscts=False,
                                                        xonxoff=False,
                                                        do_not_open=True)

                serial_instance.timeout = 5

                serial_instance.exclusive = True
                serial_instance.open()

            except serial.SerialException as e:
                sys.stderr.write('could not open port {!r}: {}\n'.format(self.serial_port, e))

            else:
                break

        #if self.DBG_LVL_0:
        print('[DEBUG]:CONNECTED TO VIRTUAL SERIAL PORT IN SEPTENTRIO')

        self.serial_instance = serial_instance
        time.sleep(0.1)

    def process_gps_nmea_data(self, data):
        """
        Parses a line of NMEA data retrieved from the gps and coming from the virtual serial port.

        Used NMEA sentences are GGA and HDT.

        The labels of the items of the returned dictionary are the following ones for the GGA sentence: ``Timestamp``, ``Latitude``, ``Latitude Direction``, ``Longitude``, ``Longitude Direction``, ``GPS Quality Indicator``, ``Number of Satellites in use``, ``Horizontal Dilution of Precision``, ``Antenna Alt above sea level (mean)``, ``Units of altitude (meters)``, ``Geoidal Separation``, ``Units of Geoidal Separation (meters)``, ``Age of Differential GPS Data (secs)``, ``Differential Reference Station ID``.

        *The instances of this class created in the GUI and other classes use SBF sentences as the default type of sentence*.

        Args:
            data (str): line of read data following the structure of a NMEA frame.
        """

        try:
            if self.DBG_LVL_0:
                print('\nNMEA PARSING')

            nmeaobj = pynmea2.parse(data.decode())
            extracted_data = ['%s: %s' % (nmeaobj.fields[i][0], nmeaobj.data[i]) for i in range(len(nmeaobj.fields))]
            gps_data = {}
            for item in extracted_data:
                tmp = item.split(': ')
                gps_data[tmp[0]] = tmp[1]

            # GGA type of NMEA sentence
            if 'Antenna Alt above sea level (mean)' in gps_data:
                # NMEA encodes latitude as ddmm.mmmm (2 degree digits) and longitude as
                # dddmm.mmmm (3 degree digits), regardless of leading zeros
                gps_data['Latitude'] = float(gps_data['Latitude'][0:2]) + float(gps_data['Latitude'][2:])/60
                gps_data['Longitude'] = float(gps_data['Longitude'][0:3]) + float(gps_data['Longitude'][3:])/60

                gps_data['Antenna Alt above sea level (mean)'] = float(gps_data['Antenna Alt above sea level (mean)'])
                gps_data['Timestamp'] = float(gps_data['Timestamp'])

                '''
                # Save the UNIX timestamp. As the timestamp provides hour/min/sec only, add the date
                today_date = datetime.date.today()
                today_date = [int(i) for i in today_date.strftime("%Y-%m-%d").split('-')]                

                complete_date = datetime.datetime(year=today_date[0], 
                                                month=today_date[1], 
                                                day=today_date[2], 
                                                hour=int(gps_data['Timestamp'][0:2]), 
                                                minute=int(gps_data['Timestamp'][2:4]), 
                                                second=int(gps_data['Timestamp'][4:6]))

                gps_data['Timestamp'] = time.mktime(complete_date.timetuple())

                '''

            # HDT NMEA sentence
            if 'Heading' in gps_data:
                if gps_data['Heading'] == '':
                    gps_data['Heading'] = -2000
                else:
                    gps_data['Heading'] = float(gps_data['Heading'])

                # No need to restrict heading to [-pi, pi] since it will be done 
                # inside 'ground_gimbal_follows_drone' function 
                #if gps_data['Heading'] > 180:
                #    gps_data['Heading'] = gps_data['Heading'] - 360

                # Make the timestamp the same format as the GGA sentence
                for stream in self.stream_info:
                    if stream['msg_type'] == 'NMEA':
                        # Need to update faster
                        if 'msec' in stream['interval']:
                            pass
                        #elif 'sec' in stream['interval']:                            
                        else:
                            gps_data['Timestamp'] = ''
                            for i in datetime.datetime.utcnow().timetuple()[3:6]:
                                tmp = str(i)
                                if len(tmp) == 1:
                                    tmp = '0' + tmp
                                gps_data['Timestamp'] = gps_data['Timestamp'] + tmp
                            gps_data['Timestamp'] = float(int(gps_data['Timestamp']))

            if self.DBG_LVL_2 or len(self.NMEA_buffer):
                if self.DBG_LVL_0:
                    print('\nSAVES NMEA DATA INTO BUFFER')    
                self.NMEA_buffer.append(gps_data)  

        except Exception as e:
            # Do not save any other command line
            if self.DBG_LVL_1:
                print('\nEXCEPTION PROCESSING NMEA')
            if self.DBG_LVL_0:
                print('\nThis is the exception: ', e) 

    def process_pvtcart_sbf_data(self, raw_data):
        """
        Parses a PVTCart SBF sentence. To be able to receive this block, the receiver should be configured to output SBF sentences.

        The PVTCart SBF sentence provides geocentric coordinates X, Y, Z for the position of the receiver.

        The coordinates are stored in ``SBF_frame_buffer``. Whenever ``SBF_frame_buffer`` holds more than ``MAX_SBF_BUFF_LEN`` entries, its contents are saved on disk and the buffer is emptied.

        *More about the information carried by this block in "mosaic-go Reference Guide"*.

        Args:
            raw_data (bytes): received data corresponding to the PVTCart SBF block.
        """

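        format_before_padd = '<1c3H1I1H2B3d5f1d1f4B2H1I2B4H1B'
        # Padding is the number of received bytes beyond the fixed-size fields. len() gives the
        # byte count, whereas sys.getsizeof() would also include Python object overhead.
        format_after_padd = format_before_padd + str(len(raw_data)-struct.calcsize(format_before_padd)) + 'B'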

        TOW = struct.unpack('<1I', raw_data[7:11])[0]
        WNc = struct.unpack('<1H', raw_data[11:13])[0]        
        MODE =  struct.unpack('<1B', raw_data[13:14])[0]
        ERR =  struct.unpack('<1B', raw_data[14:15])[0]
        X =  struct.unpack('<1d', raw_data[15:23])[0]
        Y =  struct.unpack('<1d', raw_data[23:31])[0]
        try:
            Z = struct.unpack('<1d', raw_data[31:39])[0]
        except Exception as e:
            if self.DBG_LVL_0:
                print("[DEBUG]: error unpacking Z coord, ", e)
        Undulation =  struct.unpack('<1f', raw_data[39:43])[0]
        Vx =  struct.unpack('<1f', raw_data[43:47])[0]
        Vy = struct.unpack('<1f', raw_data[47:51])[0]
        Vz =  struct.unpack('<1f', raw_data[51:55])[0]
        COG =  struct.unpack('<1f', raw_data[55:59])[0]
        RxClkBias = struct.unpack('<1d', raw_data[59:67])[0]
        RxClkDrift =  struct.unpack('<1f', raw_data[67:71])[0]
        TimeSystem = struct.unpack('<1B', raw_data[71:72])[0]
        Datum =  struct.unpack('<1B', raw_data[72:73])[0]
        NrSV = struct.unpack('<1B', raw_data[73:74])[0]
        WACorrInfo =  struct.unpack('<1B', raw_data[74:75])[0]
        ReferenceID =  struct.unpack('<1H', raw_data[75:77])[0]
        MeanCorrAge = struct.unpack('<1H', raw_data[77:79])[0]
        SignalInfo =  struct.unpack('<1I', raw_data[79:83])[0] 
        AlertFlag = struct.unpack('<1B', raw_data[83:84])[0]
        NrBases =  struct.unpack('<1B', raw_data[84:85])[0]
        PPPInfo =  struct.unpack('<1H', raw_data[85:87])[0]
        Latency =  struct.unpack('<1H', raw_data[87:89])[0]        
        HAccuracy =  struct.unpack('<1H', raw_data[89:91])[0]         
        VAccuracy =  struct.unpack('<1H', raw_data[91:93])[0]  

        '''
        pvt_msg_format = {'TOW': TOW, 'WNc': WNc, 'MODE': MODE, 'ERR': ERR, 'X': X, 'Y': Y, 'Z': Z,
                          'Undulation': Undulation, 'Vx': Vx, 'Vy': Vy, 'Vz': Vz, 'COG': COG,
                          'RxClkBias': RxClkBias, 'RxClkDrift': RxClkDrift, 'TimeSystem': TimeSystem, 'Datum': Datum,
                          'NrSV': NrSV, 'WACorrInfo': WACorrInfo, 'ReferenceID': ReferenceID, 'MeanCorrAge': MeanCorrAge,
                          'SignalInfo': SignalInfo, 'AlertFlag': AlertFlag, 'NrBases': NrBases, 'PPPInfo': PPPInfo,
                          'Latency': Latency, 'HAccuracy': HAccuracy, 'VAccuracy': VAccuracy}        
        '''
        pvt_data_we_care = {'ID': 'Coordinates', 'TOW': TOW, 'WNc': WNc, 'MODE': MODE, 'ERR': ERR, 
                            'X': X, 'Y': Y, 'Z': Z, 'Datum': Datum}

        self.SBF_frame_buffer.append(pvt_data_we_care)

        if len(self.SBF_frame_buffer) > self.MAX_SBF_BUFF_LEN:
            with open(self.save_filename + '.txt', 'a+') as file:      
                file.write(json.dumps(self.SBF_frame_buffer))       
                print("[DEBUG]: Saved GPS coordinates file")
            self.SBF_frame_buffer = []

    def process_pvtgeodetic_sbf_data(self, raw_data):
        """
        Parses a PVTGeodetic SBF sentence. To be able to receive this block, the receiver should be configured to output SBF sentences.

        The PVTGeodetic SBF sentence provides geodetic coordinates lat, lon, h for the position of the receiver.

        The coordinates are stored in ``SBF_frame_buffer``. Whenever ``SBF_frame_buffer`` holds more than ``MAX_SBF_BUFF_LEN`` entries, its contents are saved on disk and the buffer is emptied.

        *More about the information carried by this block in "mosaic-go Reference Guide"*.

        Args:
            raw_data (bytes): received data corresponding to the PVTGeodetic SBF block.
        """

        TOW = struct.unpack('<1I', raw_data[7:11])[0]
        WNc = struct.unpack('<1H', raw_data[11:13])[0]        
        MODE =  struct.unpack('<1B', raw_data[13:14])[0]
        ERR =  struct.unpack('<1B', raw_data[14:15])[0]
        LAT =  struct.unpack('<1d', raw_data[15:23])[0]
        LON =  struct.unpack('<1d', raw_data[23:31])[0]
        H = struct.unpack('<1d', raw_data[31:39])[0]
        Undulation =  struct.unpack('<1f', raw_data[39:43])[0]
        Vx =  struct.unpack('<1f', raw_data[43:47])[0]
        Vy = struct.unpack('<1f', raw_data[47:51])[0]
        Vz =  struct.unpack('<1f', raw_data[51:55])[0]
        COG =  struct.unpack('<1f', raw_data[55:59])[0]
        RxClkBias = struct.unpack('<1d', raw_data[59:67])[0]
        RxClkDrift =  struct.unpack('<1f', raw_data[67:71])[0]
        TimeSystem = struct.unpack('<1B', raw_data[71:72])[0]
        Datum =  struct.unpack('<1B', raw_data[72:73])[0]        

        pvt_data_we_care = {'ID': 'Coordinates', 'TOW': TOW, 'WNc': WNc, 'MODE': MODE, 'ERR': ERR, 
                            'LAT': LAT, 'LON': LON, 'HEIGHT': H, 'Datum': Datum}

        self.SBF_frame_buffer.append(pvt_data_we_care)

        if len(self.SBF_frame_buffer) > self.MAX_SBF_BUFF_LEN:
            with open(self.save_filename + '.txt', 'a+') as file:      
                file.write(json.dumps(self.SBF_frame_buffer))      
                print("[DEBUG]: Saved GPS coordinates file")
            self.SBF_frame_buffer = []

    def process_atteuler_sbf_data(self, raw_data):
        """
        Parses an AttEuler SBF sentence. To be able to receive this block, the receiver should be configured to output SBF sentences.

        The AttEuler SBF sentence provides the heading, with respect to North, of the imaginary line formed by the first and second antennas. To do so, the heading, pitch, and roll axes are defined.

        The attitude angles are stored in ``SBF_frame_buffer``. Whenever ``SBF_frame_buffer`` holds more than ``MAX_SBF_BUFF_LEN`` entries, its contents are saved on disk and the buffer is emptied.

        *More about the axis definitions and the heading information in "mosaic-go Reference Guide"*.

        Args:
            raw_data (bytes): received data corresponding to the AttEuler SBF sentence.
        """

        TOW = struct.unpack('<1I', raw_data[7:11])[0]
        WNc = struct.unpack('<1H', raw_data[11:13])[0]        
        NrSV = struct.unpack('<1B', raw_data[13:14])[0]
        ERR =  struct.unpack('<1B', raw_data[14:15])[0]
        MODE =  struct.unpack('<1H', raw_data[15:17])[0]
        Heading =  struct.unpack('<1f', raw_data[19:23])[0]
        try:
            Pitch =  struct.unpack('<1f', raw_data[23:27])[0]
        except Exception as e:
            print("[DEBUG]: Error unpacking Pitch attitude, ", e)
        Roll = struct.unpack('<1f', raw_data[27:31])[0]
        PitchDot =  struct.unpack('<1f', raw_data[31:35])[0]
        RollDot =  struct.unpack('<1f', raw_data[35:39])[0]
        HeadingDot = struct.unpack('<1f', raw_data[39:43])[0]

        '''
        atteul_msg_format = {'TOW': TOW, 'WNc': WNc, 'NrSV': NrSV, 'ERR': ERR, 'MODE': MODE, 
                             'Heading': Heading, 'Pitch': Pitch, 'Roll': Roll, 
                             'PitchDot': PitchDot, 'RollDot': RollDot, 'HeadingDot': HeadingDot}        
        '''
        atteul_msg_useful = {'ID': 'Heading', 'TOW': TOW, 'WNc': WNc,'ERR': ERR, 'MODE': MODE, 
                             'Heading': Heading, 'Pitch': Pitch, 'Roll': Roll}

        self.SBF_frame_buffer.append(atteul_msg_useful)

        if len(self.SBF_frame_buffer) > self.MAX_SBF_BUFF_LEN:
            with open(self.save_filename + '.txt', 'a+') as file:      
                file.write(json.dumps(self.SBF_frame_buffer))    
                print("[DEBUG]: Saved GPS coordinates file")
            self.SBF_frame_buffer = []

    def parse_septentrio_msg(self, rx_msg):
        """
        Parses the received message and processes it depending on whether it is an SBF or an NMEA message.

        Any exception raised while parsing is caught inside this method and only reported through the debug prints.

        Args:
            rx_msg (bytes): message received from the Septentrio receiver.
        """

        try:
            if self.DBG_LVL_1:
                print('\nPARSING RX DATA')
            if self.DBG_LVL_0:
                print('0 POS: ', rx_msg[0])
                print('\nRX DATA LENGTH: ', len(rx_msg), rx_msg.decode('utf-8', 'ignore'))

            # The SBF output follows the $ sync1 byte, with a second sync byte that is the symbol @ or in utf-8 the decimal 64
            # Bytes indexing  works as follows:
            # One integer gives a decimal
            # A slice (i.e. 0:1) gives a bytes object ---> rx_msg[0] != rx_msg[0:1]
            if rx_msg[0] == 64:                
                if self.DBG_LVL_0:
                    print('\nDETECTS SBF')

                # Header detection
                #SYNC = struct.unpack('<1c', rx_msg[0:1]) 
                CRC = struct.unpack('<1H', rx_msg[1:3])                
                ID_SBF_msg = struct.unpack('<1H', rx_msg[3:5])
                LEN_SBF_msg = struct.unpack('<1H', rx_msg[5:7])

                # According to the manual, the LEN should always be a multiple of 4, otherwise 
                # there is an error
                if np.mod(int(LEN_SBF_msg[0]),4) != 0 :
                    if self.DBG_LVL_1:
                        print('\nDiscarded frame as LEN_SBF_msg is not multiple of 4, LEN_SBF_msg: ', LEN_SBF_msg[0])
                    return

                '''
                # CRC checker
                crc16_checker = Calculator(Crc16.CCITT)
                idx_bytes_crc_to_read = 7+int(LEN_SBF_msg[0])-8
                crc_data = rx_msg[7:idx_bytes_crc_to_read]
                print(type(crc_data))
                crc16 = crc16_checker.checksum(crc_data)
                print(rx_msg[1:3], type(crc16))
                if CRC[0] != crc16:
                    if self.DBG_LVL_1:
                        print('\nDiscarded frame cause it did not pass the CRC check')
                    return
                '''

                # PVTGeodetic SBF sentence identified by ID 4007
                if ID_SBF_msg[0] & 8191 == 4007: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID                    
                    self.process_pvtgeodetic_sbf_data(rx_msg)
                    #print("Received pvt geodetic")

                # PVTCart SBF sentence identified by ID 4006
                if ID_SBF_msg[0] & 8191 == 4006: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID                    
                    self.process_pvtcart_sbf_data(rx_msg)
                    #print("Received pvtcart")

                # PosCovCartesian SBF sentence identified by ID 5905
                if ID_SBF_msg[0] & 8191 == 5905: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                    if self.DBG_LVL_1:
                        print('\nReceived PosCovCartesian SBF sentence')

                if ID_SBF_msg[0] & 8191 == 5907: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                    if self.DBG_LVL_1:
                        print('\nReceived VelCovCartesian SBF sentence')

                if ID_SBF_msg[0] & 8191 == 4043: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                    if self.DBG_LVL_1:
                        print('\nReceived BaseVectorCart SBF sentence')

                if ID_SBF_msg[0] & 8191 == 5942: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                    if self.DBG_LVL_1:
                        print('\nReceived AuxAntPositions SBF sentence')

                if ID_SBF_msg[0] & 8191 == 5938: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                    if self.DBG_LVL_1:
                        pass
                        #print('\nReceived AttEuler SBF sentence')
                    print("Received attitude")
                    self.process_atteuler_sbf_data(rx_msg)

                if ID_SBF_msg[0] & 8191 == 5939: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                    if self.DBG_LVL_1:
                        print('\nReceived AttCovEuler SBF sentence')

                if ID_SBF_msg[0] & 8191 == 5943: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                    if self.DBG_LVL_1:
                        print('\nReceived EndOfAtt SBF sentence')

                # Sort SBF buffer entries by time (this is double checking, as they are expected to arrive in time order)
                self.SBF_frame_buffer.sort(key=lambda k : k['TOW'])

                # Merge buffer entries corresponding to the same TOW
                #self.util_merge_buffer_entries_by_timetag(type_msg='SBF')

            # NMEA Output starts with the letter G, that in utf-8 is the decimal 71
            elif rx_msg[0] == 71:
                if self.DBG_LVL_0:
                    print('\nDETECTS NMEA')
                self.process_gps_nmea_data(rx_msg[:-1])
                #self.util_merge_buffer_entries_by_timetag(type_msg='NMEA')

        except Exception as e:
            if self.DBG_LVL_1:
                print('\nEXCEPTION IN parse_septentrio_msg')
            if self.DBG_LVL_0:
                print('\nThis is the exception: ', e)
                logging.exception("\nError occurred: ")

    def get_last_sbf_buffer_info(self, what='Coordinates'):
        """
        Retrieves the last GPS coordinates, the last heading information of the receiver, or both.

        Args:
            what (str, optional): which information to retrieve from ``SBF_frame_buffer``. Options are: 'Coordinates', 'Heading' or 'Both'. Defaults to 'Coordinates'.

        Returns:
            data_to_return (dict): ``X``, ``Y`` and ``Z`` coordinates in absence of any error. Otherwise, the error code.
            data_to_return_2 (dict, optional): ``Heading`` angle in [0, 360] degrees.
        """

        # Coordinates
        data_to_return = []       

        # Heading
        data_to_return_2 = []     

        len_sbf_buffer = len(self.SBF_frame_buffer)

        cnt = 1
        if  len_sbf_buffer > 0:
            if what == 'Coordinates' or what == 'Heading':
                while(len(data_to_return) == 0):
                    if cnt > len_sbf_buffer:
                        print('\n[WARNING]: Either heading or coordinates information not available')

                        if what == 'Coordinates':
                            print('\n[WARNING]: Return ERR_GPS_CODE_NO_COORD_AVAIL for each coordinate in data_to_return')
                            data_to_return = {'X': self.ERR_GPS_CODE_NO_COORD_AVAIL, 'Y': self.ERR_GPS_CODE_NO_COORD_AVAIL, 'Z': self.ERR_GPS_CODE_NO_COORD_AVAIL}
                            return data_to_return

                        elif what == 'Heading':
                            print('\n[WARNING]: Return ERR_GPS_CODE_NO_HEAD_AVAIL for heading in data_to_return')
                            data_to_return = {'Heading': self.ERR_GPS_CODE_NO_HEAD_AVAIL}
                            return data_to_return

                    dict_i = self.SBF_frame_buffer[-cnt]
                    if dict_i['ID'] == what:
                        # Both AttEuler and PVTCart return 'Error' field equal to 0, when there is no error
                        if dict_i['ERR'] == 0:
                            data_to_return = dict_i

                    cnt = cnt + 1             

                if self.DBG_LVL_1:
                    print('\n[DEBUG_1]: retrieved a ' + what + ' response') 

                return data_to_return

            elif what == 'Both':
                while((len(data_to_return) == 0) or (len(data_to_return_2) == 0)):     
                    if cnt > len_sbf_buffer:
                        print('\n[WARNING]: heading stream not on or no heading info available /or/ coordinates stream not on or no coordinates available')
                        print('\n[WARNING]: Return ERR_GPS_CODE_SMALL_BUFF_SZ for each coordinate in data_to_return')
                        print('\n[WARNING]: Return ERR_GPS_CODE_SMALL_BUFF_SZ for heading in data_to_return_2')

                        data_to_return = {'X': self.ERR_GPS_CODE_SMALL_BUFF_SZ, 'Y': self.ERR_GPS_CODE_SMALL_BUFF_SZ, 'Z': self.ERR_GPS_CODE_SMALL_BUFF_SZ}
                        data_to_return_2 = {'Heading': self.ERR_GPS_CODE_SMALL_BUFF_SZ}

                        return data_to_return, data_to_return_2

                    dict_i = self.SBF_frame_buffer[-cnt]

                    if dict_i['ID'] == 'Heading':
                        # Both AttEuler and PVTCart return 'Error' field equal to 0, when there is no error
                        if dict_i['ERR'] == 0:
                            data_to_return_2 = dict_i                           

                    elif dict_i['ID'] == 'Coordinates':
                        # Both AttEuler and PVTCart return 'Error' field equal to 0, when there is no error
                        if dict_i['ERR'] == 0:                            
                            data_to_return = dict_i

                    cnt = cnt + 1

                if self.DBG_LVL_1:
                    print('\n[DEBUG_1]: retrieved a Heading and Coordinates response') 

                return data_to_return, data_to_return_2
        else:
            print('\n[WARNING]: nothing in SBF buffer')
            if what == 'Coordinates':
                data_to_return = {'X': self.ERR_GPS_CODE_BUFF_NULL, 'Y': self.ERR_GPS_CODE_BUFF_NULL, 'Z': self.ERR_GPS_CODE_BUFF_NULL}
                print('\n[ERROR]: Return ERR_GPS_CODE_BUFF_NULL for each coordinate in data_to_return')
                return data_to_return

            elif what == 'Heading':
                data_to_return = {'Heading': self.ERR_GPS_CODE_BUFF_NULL}
                print('\n[ERROR]: Return ERR_GPS_CODE_BUFF_NULL for heading in data_to_return')
                return data_to_return

            elif what == 'Both':
                data_to_return = {'X': self.ERR_GPS_CODE_BUFF_NULL, 'Y': self.ERR_GPS_CODE_BUFF_NULL, 'Z': self.ERR_GPS_CODE_BUFF_NULL}
                data_to_return_2 = {'Heading': self.ERR_GPS_CODE_BUFF_NULL}
                print('\n[ERROR]: Return ERR_GPS_CODE_BUFF_NULL for each coordinate in data_to_return and for heading in data_to_return_2')
                return data_to_return, data_to_return_2    

    def check_coord_closeness(self, coordinates2compare, tol=5):
        """
        Checks how close a given coordinate is to the current node position.

        Both coordinates being compared are assumed to lie at the same height.

        Args:
            coordinates2compare (dict): keys of the dictionary are 'LAT' and 'LON', and each of them has ONLY ONE value.
            tol (int, optional): maximum distance in meters for the two coordinates to be considered close. Defaults to 5.

        Returns:
            bool or None: True if the distance is below ``tol``, False otherwise. None if no valid coordinates are available in the buffer.
        """

        coords, head_info = self.get_last_sbf_buffer_info(what='Both')

        # `a == X or Y` is always truthy when Y is non-zero, so test membership instead
        if coords['X'] in (self.ERR_GPS_CODE_BUFF_NULL, self.ERR_GPS_CODE_SMALL_BUFF_SZ):
            return None
        else:
            lat_node, lon_node, height_node = geocentric2geodetic(coords['X'], coords['Y'], coords['Z'])
            wgs84_geod = Geod(ellps='WGS84')

            _,_, dist = wgs84_geod.inv(lon_node, lat_node, coordinates2compare['LON'], coordinates2compare['LAT'])

            if dist < tol:
                return True
            else:
                return False

    def serial_receive(self, serial_instance_actual, stop_event):
        """
        Callback function invoked by the thread responsible for handling I/O communication between the host computer and the Septentrio mosaic-go receiver.

        Most messages sent by the Septentrio mosaic-go receiver start with a ``$`` character.

        The next character depends on whether the message is an echo of a command sent by the host computer or an answer to such a command.

        Echoes of commands sent by the host computer do not follow the ``$`` character with any predefined character. These messages are discarded by the method ``parse_septentrio_msg``.

        Messages that answer a command sent by the host computer DO start with a predefined character, which depends on whether the answer arises from an NMEA sentence or an SBF sentence. These messages are parsed by the method ``parse_septentrio_msg``.

        Args:
            serial_instance_actual (Serial): serial connection instance.
            stop_event (threading.Event): Event to be used to stop the reading of the serial port.
        """

        while not stop_event.is_set():
            # This is if only NMEA messages are received
            #rx_msg = serial_instance_actual.readline()

            # This looks for the start of a sentence in either NMEA or SBF messages
            try:
                rx_msg = serial_instance_actual.read_until(expected='$'.encode('utf-8'))
                if len(rx_msg) > 0:
                    self.parse_septentrio_msg(rx_msg)
            except Exception as e:
                print('[WARNING]: No bytes to read, ', e)

    def start_thread_gps(self, interface='USB'):
        """
        Starts the GPS thread responsible for handling I/O communication between the host computer and the Septentrio mosaic-go receiver.

        Creates the threading Event that is set when the I/O communication must be closed.

        Args:
            interface (str, optional): is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.
        """

        self.event_stop_thread_gps = threading.Event()

        if interface == 'USB' or interface == 'COM':
            t_receive = threading.Thread(target=self.serial_receive, args=(self.serial_instance, self.event_stop_thread_gps))

        #elif interface == 'IP':
        #    t_receive = threading.Thread(target=self.socket_receive, args=(self.event_stop_thread_gps))

        t_receive.start()
        print('\n[DEBUG]: Septentrio GPS thread opened')
        time.sleep(0.5)

    def stop_thread_gps(self, interface='USB'):
        """
        Stops the GPS thread.

        Args:
            interface (str, optional): is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.
        """

        self.event_stop_thread_gps.set()
        time.sleep(0.1)

        if interface =='USB' or interface == 'COM':
            self.serial_instance.close()

        elif interface =='IP':
            self.socket.close()

        print('\n[DEBUG]: Septentrio GPS thread closed')

    def sendCommandGps(self, cmd, interface='USB'):
        """
        Sends a command to the Septentrio mosaic-go receiver.

        Blocks this thread execution for 500 ms.

        Args:
            cmd (str): command to be sent to the Septentrio mosaic-go receiver. The available list of commands is defined in "mosaic-go Reference Guide".
            interface (str, optional): is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.
        """

        cmd_eof = cmd + '\n'

        if interface =='USB':
            self.serial_instance.write(cmd_eof.encode('utf-8'))
        #elif interface == 'IP':
        #    self.socket.sendall(cmd_eof.encode('utf-8'))

        time.sleep(0.5)

    def start_gps_data_retrieval(self, stream_number=1, interface='USB', interval='sec1', msg_type='SBF', 
                                 nmea_type='+GGA+HDT', sbf_type='+PVTCartesian+AttEuler'):
        """
        Starts the streaming of the NMEA or SBF sentences.

        Wrapper of ``sendCommandGps``.

        Args:
            stream_number (int, optional): each interface can have multiple data streams. This parameter sets the number of the stream for the given ``interface``. Defaults to 1.
            interface (str, optional): is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.
            interval (str, optional): interval at which the Septentrio receiver outputs the given SBF/NMEA sentence. Can be any of the following self-explanatory names: 'msec10', 'msec20', 'msec40', 'msec50', 'msec100', 'msec200', 'msec500', 'sec1', 'sec2', 'sec5', 'sec10', 'sec15', 'sec30', 'sec60', 'min2', 'min5', 'min10', 'min15', 'min30', 'min60'. Defaults to 'sec1'.
            msg_type (str, optional): ``NMEA`` or ``SBF``. Defaults to ``SBF``.
            nmea_type (str, optional): name/s of the NMEA sentence/s to be retrieved. For multiple sentences, each sentence identifier should be preceded by '+', and all identifiers should be concatenated into one single string (e.g. ``+HDT+GGA``). Defaults to ``+GGA+HDT``.
            sbf_type (str, optional): name/s of the SBF sentence/s to be retrieved. For multiple sentences, each sentence identifier should be preceded by '+', and all identifiers should be concatenated into one single string (e.g. '+PVTCartesian+AttEuler'). Each sentence needs a parsing function that is called in ``parse_septentrio_msg``, in the branch corresponding to the ID of the sentence. Defaults to ``+PVTCartesian+AttEuler``.
        """

        if interface == 'USB' or interface == 'COM':
            if msg_type == 'SBF':
                cmd1 = 'setDataInOut, ' + interface + str(self.interface_number) + ',, ' + '+SBF'
                cmd2 = 'setSBFOutput, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', ' +  sbf_type + ', ' + interval
            elif msg_type == 'NMEA':
                cmd1 = 'setDataInOut, ' + interface + str(self.interface_number) + ',, ' + '+NMEA'
                cmd2 = 'sno, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', ' + nmea_type + ', ' + interval

        self.stream_info.append({'interface': interface, 'stream_number': stream_number, 'interval': interval, 'msg_type': msg_type})

        self.sendCommandGps(cmd1)
        self.sendCommandGps(cmd2)

        if self.DBG_LVL_1:
            print('\n'+ cmd1)
            print('\n'+ cmd2)

    def stop_gps_data_retrieval(self, stream_number=1, interface='USB', msg_type='+NMEA+SBF'):   
        """
        Stops the streaming of the NMEA/SBF sentences initiated by calling ``start_gps_data_retrieval``. 

        Wrapper of ``sendCommandGps``.

        *Unexpected behaviour to be noted by* **developers**: *it seems that if the stream is not stopped by the time the serial connection is closed, then, when the user opens a new serial connection, Septentrio will start sending all the SBF or NMEA messages that were produced between the last time the serial connection was closed and the time it is opened again*.

        Args:
            stream_number (int, optional): number of the stream to be stopped. Defaults to 1.
            interface (str, optional): is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.
            msg_type (str, optional): the message type corresponding to the stream ``stream_number``. Options: 'SBF', 'NMEA', '+NMEA+SBF' or '+SBF+NMEA' (the last two are equivalent). Defaults to ``+NMEA+SBF``.
        """

        if interface == 'USB' or interface == 'COM':
            if msg_type == 'SBF':
                cmd1 = 'setSBFOutput, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', none '
                cmd2 = 'sdio, ' + interface + str(self.interface_number) + ',, -SBF'

                self.sendCommandGps(cmd1)
                self.sendCommandGps(cmd2)
            elif msg_type == 'NMEA':
                cmd1 = 'sno, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', none ' 
                cmd2 = 'sdio, ' + interface + str(self.interface_number) + ',, -NMEA'

                self.sendCommandGps(cmd1)
                self.sendCommandGps(cmd2)
            elif msg_type == '+NMEA+SBF' or msg_type == '+SBF+NMEA':
                cmd1 = 'setSBFOutput, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', none '
                cmd2 = 'sdio, ' + interface + str(self.interface_number) + ',, -SBF'
                cmd3 = 'sno, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', none ' 
                cmd4 = 'sdio, ' + interface + str(self.interface_number) + ',, -NMEA'

                self.sendCommandGps(cmd1)
                self.sendCommandGps(cmd2)       
                self.sendCommandGps(cmd3)
                self.sendCommandGps(cmd4)       

    def setHeadingOffset(self, offset_wrt_xaxis):
        """
        Sets the offset between the imaginary line formed by the first and second antennas and the longitudinal axis of the node.

        Wrapper of ``sendCommandGps``.

        Args:
            offset_wrt_xaxis (float): angle (degrees) *from* the longitudinal axis of the node *to* the imaginary line formed by the first and second antennas.
        """

        self.sendCommandGps(cmd='setAttitudeOffset, ' + str(offset_wrt_xaxis))
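
The command strings assembled by start_gps_data_retrieval can be reproduced in isolation, without a receiver attached. The sketch below mirrors the string concatenation of the SBF branch only. The helper name build_sbf_stream_commands is hypothetical (it is not part of a2gmeasurements.py), and the default interface_number=1 is an assumption standing in for self.interface_number.

```python
def build_sbf_stream_commands(interface='USB', interface_number=1, stream_number=1,
                              sbf_type='+PVTCartesian+AttEuler', interval='sec1'):
    """Return the two command strings used to start an SBF stream (hypothetical helper)."""
    port = f'{interface}{interface_number}'
    # First command: enable SBF output on the chosen port
    cmd_enable = f'setDataInOut, {port},, +SBF'
    # Second command: choose which SBF blocks are streamed, and how often
    cmd_stream = f'setSBFOutput, Stream {stream_number}, {port}, {sbf_type}, {interval}'
    return cmd_enable, cmd_stream


cmd1, cmd2 = build_sbf_stream_commands()
print(cmd1)
print(cmd2)
```

Building the strings in a separate function makes it possible to inspect them before anything is written to the serial port.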

__init__(DBG_LVL_1=False, DBG_LVL_2=False, DBG_LVL_0=False, save_filename='GPS')

Constructor for the GpsSignaling class. Important attributes are:

  • register_sbf_sentences_by_id: list of integers containing the expected SBF sentences that are going to be requested at the receiver. The integer is the ID of the sentence described in Septentrio's manual.

  • SBF_frame_buffer: list of dictionaries containing the SBF frames during the execution of the thread responsible for receiving SBF frames.

  • MAX_SBF_BUFF_LEN: Maximum number of entries in the SBF frame buffer before saving, cleaning and starting again

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| DBG_LVL_1 | bool | Prints less verbose output than level 0. | False |
| DBG_LVL_2 | bool | Prints less verbose output than level 1. | False |
| DBG_LVL_0 | bool | Prints all available debug output. | False |
| save_filename | str | Name of the file in which the record of GPS coordinates is saved during an experiment. | 'GPS' |

Source code in a2gmeasurements.py
def __init__(self, DBG_LVL_1=False, DBG_LVL_2=False, DBG_LVL_0=False, save_filename='GPS'):
    """
    Constructor for the GpsSignaling class. Important attributes are:

    * ``register_sbf_sentences_by_id``: list of integers containing the expected SBF sentences that are going to be requested at the receiver. The integer is the ID of the sentence described in Septentrio's manual.

    * ``SBF_frame_buffer``: list of dictionaries containing the SBF frames during the execution of the thread responsible for receiving SBF frames.

    * ``MAX_SBF_BUFF_LEN``: Maximum number of entries in the SBF frame buffer before saving, cleaning and starting again

    Args:
        DBG_LVL_1 (bool, optional): prints less verbose output than level 0. Defaults to False.
        DBG_LVL_2 (bool, optional): prints less verbose output than level 1. Defaults to False.
        DBG_LVL_0 (bool, optional): prints all available debug output. Defaults to False.
        save_filename (str, optional): name of the file where to save the record of GPS coordinates along an experiment. Defaults to 'GPS'.
    """

    # Initializations
    datestr = datetime.datetime.now()
    datestr = datestr.strftime('%Y-%m-%d-%H-%M-%S-%f')
    self.save_filename = save_filename + '-' + datestr
    self.SBF_frame_buffer = []
    self.NMEA_buffer = []
    self.stream_info = []
    self.MAX_SBF_BUFF_LEN = 100

    self.DBG_LVL_1 = DBG_LVL_1
    self.DBG_LVL_2 = DBG_LVL_2
    self.DBG_LVL_0 = DBG_LVL_0

    # Expected SBF sentences to be requested. Add or remove according to planned
    # SBF sentences to be requested.
    self.register_sbf_sentences_by_id = [4006, 5938] # PVTCart, AttEul
    self.n_sbf_sentences = len(self.register_sbf_sentences_by_id)

    self.ERR_GPS_CODE_GENERAL = -1.5e3
    self.ERR_GPS_CODE_SMALL_BUFF_SZ = -2.5e3       
    self.ERR_GPS_CODE_BUFF_NULL = -3.5e3
    self.ERR_GPS_CODE_NO_COORD_AVAIL = -4.5e3 
    self.ERR_GPS_CODE_NO_HEAD_AVAIL = -5.5e3
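
The role of MAX_SBF_BUFF_LEN (save, clear, start again) can be illustrated with a small stand-alone buffer. This is only a sketch: the class FrameBuffer and the file name are hypothetical, and unlike the listing on this page it writes one JSON document per line, an assumption made here so that the file can be read back after several flushes.

```python
import json
import os
import tempfile


class FrameBuffer:
    """Toy stand-in for the SBF frame buffer (hypothetical, for illustration only)."""

    def __init__(self, save_path, max_len=100):
        self.save_path = save_path
        self.max_len = max_len
        self.frames = []

    def append(self, frame):
        self.frames.append(frame)
        # Flush once the buffer grows past the limit
        if len(self.frames) > self.max_len:
            with open(self.save_path, 'a') as file:
                # One JSON document per line keeps the file parseable after several flushes
                file.write(json.dumps(self.frames) + '\n')
            self.frames = []


path = os.path.join(tempfile.mkdtemp(), 'GPS-demo.txt')
buf = FrameBuffer(path, max_len=3)
for tow in range(5):
    buf.append({'ID': 'Heading', 'TOW': tow, 'ERR': 0})

with open(path) as file:
    flushed = [json.loads(line) for line in file]

print(len(flushed), len(flushed[0]), len(buf.frames))
```

With max_len=3, the fourth frame triggers the flush, so the file holds one batch of four frames and one frame remains in memory.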

check_coord_closeness(coordinates2compare, tol=5)

Checks how close a given coordinate is to the current node position.

Both coordinates being compared are assumed to lie at the same height.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| coordinates2compare | dict | Keys of the dictionary are 'LAT' and 'LON', and each of them has ONLY ONE value. | required |
| tol | int | Maximum distance in meters for the two coordinates to be considered close. | 5 |

Returns:

| Type | Description |
|---|---|
| bool or None | True if the distance is below tol, False otherwise. None if no valid coordinates are available in the buffer. |

Source code in a2gmeasurements.py
def check_coord_closeness(self, coordinates2compare, tol=5):
    """
    Checks how close a given coordinate is to the current node position.

    Both coordinates being compared are assumed to lie at the same height.

    Args:
        coordinates2compare (dict): keys of the dictionary are 'LAT' and 'LON', and each of them has ONLY ONE value.
        tol (int, optional): maximum distance in meters for the two coordinates to be considered close. Defaults to 5.

    Returns:
        bool or None: True if the distance is below ``tol``, False otherwise. None if no valid coordinates are available in the buffer.
    """

    coords, head_info = self.get_last_sbf_buffer_info(what='Both')

    # `a == X or Y` is always truthy when Y is non-zero, so test membership instead
    if coords['X'] in (self.ERR_GPS_CODE_BUFF_NULL, self.ERR_GPS_CODE_SMALL_BUFF_SZ):
        return None
    else:
        lat_node, lon_node, height_node = geocentric2geodetic(coords['X'], coords['Y'], coords['Z'])
        wgs84_geod = Geod(ellps='WGS84')

        _,_, dist = wgs84_geod.inv(lon_node, lat_node, coordinates2compare['LON'], coordinates2compare['LAT'])

        if dist < tol:
            return True
        else:
            return False
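
The closeness test itself can be tried without a receiver or pyproj. The sketch below is not the method above: it swaps the WGS84 geodesic computed by Geod.inv for the haversine formula on a spherical Earth, which is an approximation chosen here only to keep the example dependency-free. The function names and the sample coordinates are made up for illustration.

```python
import math

EARTH_RADIUS_M = 6371008.8  # mean Earth radius, spherical approximation


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def is_close(node, target, tol=5):
    """True if target is within tol meters of node; both are dicts with 'LAT' and 'LON'."""
    return haversine_m(node['LAT'], node['LON'], target['LAT'], target['LON']) < tol


node = {'LAT': 60.0, 'LON': 24.0}
near = {'LAT': 60.00002, 'LON': 24.0}   # about 2 m north of the node
far = {'LAT': 60.001, 'LON': 24.0}      # about 111 m north of the node
print(is_close(node, near), is_close(node, far))
```

For tolerances of a few meters the spherical approximation is usually adequate, but the ellipsoidal computation remains the more accurate choice.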

get_last_sbf_buffer_info(what='Coordinates')

Retrieves the most recent GPS coordinates, the most recent heading information of the receiver, or both.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| what | str | Selects which information is retrieved from SBF_frame_buffer. Options are: 'Coordinates', 'Heading' or 'Both'. | 'Coordinates' |

Returns:

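| Name | Type | Description |
|---|---|---|
| data_to_return | dict | The most recent error-free buffer entry of the requested type (coordinate entries carry the keys X, Y and Z). If no such entry exists, a dictionary whose values are the corresponding error code. |
| data_to_return_2 | dict, optional | Only returned when what is 'Both'. The most recent error-free heading entry, with Heading in [0, 360] degrees, or a dictionary holding the error code. |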

Source code in a2gmeasurements.py
def get_last_sbf_buffer_info(self, what='Coordinates'):
    """
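    Retrieves the most recent GPS coordinates, the most recent heading information of the receiver, or both.

    Args:
        what (str, optional): selects which information is retrieved from ``SBF_frame_buffer``. Options are: 'Coordinates', 'Heading' or 'Both'. Defaults to 'Coordinates'.

    Returns:
        data_to_return (dict): the most recent error-free buffer entry of the requested type (coordinate entries carry the keys ``X``, ``Y`` and ``Z``). If no such entry exists, a dictionary whose values are the corresponding error code.
        data_to_return_2 (dict, optional): only returned when ``what`` is 'Both'. The most recent error-free heading entry, with ``Heading`` in [0, 360] degrees, or a dictionary holding the error code.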
    """

    # Coordinates
    data_to_return = []       

    # Heading
    data_to_return_2 = []     

    len_sbf_buffer = len(self.SBF_frame_buffer)

    cnt = 1
    if  len_sbf_buffer > 0:
        if what == 'Coordinates' or what == 'Heading':
            while(len(data_to_return) == 0):
                if cnt > len_sbf_buffer:
                    print('\n[WARNING]: Either heading or coordinates information not available')

                    if what == 'Coordinates':
                        print('\n[WARNING]: Return ERR_GPS_CODE_NO_COORD_AVAIL for each coordinate in data_to_return')
                        data_to_return = {'X': self.ERR_GPS_CODE_NO_COORD_AVAIL, 'Y': self.ERR_GPS_CODE_NO_COORD_AVAIL, 'Z': self.ERR_GPS_CODE_NO_COORD_AVAIL}
                        return data_to_return

                    elif what == 'Heading':
                        print('\n[WARNING]: Return ERR_GPS_CODE_NO_HEAD_AVAIL for heading in data_to_return')
                        data_to_return = {'Heading': self.ERR_GPS_CODE_NO_HEAD_AVAIL}
                        return data_to_return

                dict_i = self.SBF_frame_buffer[-cnt]
                if dict_i['ID'] == what:
                    # Both AttEuler and PVTCart return 'Error' field equal to 0, when there is no error
                    if dict_i['ERR'] == 0:
                        data_to_return = dict_i

                cnt = cnt + 1             

            if self.DBG_LVL_1:
                print('\n[DEBUG_1]: retrieved a ' + what + ' response') 

            return data_to_return

        elif what == 'Both':
            while((len(data_to_return) == 0) or (len(data_to_return_2) == 0)):     
                if cnt > len_sbf_buffer:
                    print('\n[WARNING]: heading stream not on or no heading info available /or/ coordinates stream not on or no coordinates available')
                    print('\n[WARNING]: Return ERR_GPS_CODE_SMALL_BUFF_SZ for each coordinate in data_to_return')
                    print('\n[WARNING]: Return ERR_GPS_CODE_SMALL_BUFF_SZ for heading in data_to_return_2')

                    data_to_return = {'X': self.ERR_GPS_CODE_SMALL_BUFF_SZ, 'Y': self.ERR_GPS_CODE_SMALL_BUFF_SZ, 'Z': self.ERR_GPS_CODE_SMALL_BUFF_SZ}
                    data_to_return_2 = {'Heading': self.ERR_GPS_CODE_SMALL_BUFF_SZ}

                    return data_to_return, data_to_return_2

                dict_i = self.SBF_frame_buffer[-cnt]

                if dict_i['ID'] == 'Heading':
                    # Both AttEuler and PVTCart return 'Error' field equal to 0, when there is no error
                    if dict_i['ERR'] == 0:
                        data_to_return_2 = dict_i                           

                elif dict_i['ID'] == 'Coordinates':
                    # Both AttEuler and PVTCart return 'Error' field equal to 0, when there is no error
                    if dict_i['ERR'] == 0:                            
                        data_to_return = dict_i

                cnt = cnt + 1

            if self.DBG_LVL_1:
                print('\n[DEBUG_1]: retrieved a Heading and Coordinates response') 

            return data_to_return, data_to_return_2
    else:
        print('\n[WARNING]: nothing in SBF buffer')
        if what == 'Coordinates':
            data_to_return = {'X': self.ERR_GPS_CODE_BUFF_NULL, 'Y': self.ERR_GPS_CODE_BUFF_NULL, 'Z': self.ERR_GPS_CODE_BUFF_NULL}
            print('\n[ERROR]: Return ERR_GPS_CODE_BUFF_NULL for each coordinate in data_to_return')
            return data_to_return

        elif what == 'Heading':
            data_to_return = {'Heading': self.ERR_GPS_CODE_BUFF_NULL}
            print('\n[ERROR]: Return ERR_GPS_CODE_BUFF_NULL for heading in data_to_return')
            return data_to_return

        elif what == 'Both':
            data_to_return = {'X': self.ERR_GPS_CODE_BUFF_NULL, 'Y': self.ERR_GPS_CODE_BUFF_NULL, 'Z': self.ERR_GPS_CODE_BUFF_NULL}
            data_to_return_2 = {'Heading': self.ERR_GPS_CODE_BUFF_NULL}
            print('\n[ERROR]: Return ERR_GPS_CODE_BUFF_NULL for each coordinate in data_to_return and for heading in data_to_return_2')
            return data_to_return, data_to_return_2    
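
The core of the lookup is a backwards scan of the buffer for the newest entry with the wanted ID and an ERR field equal to 0. A minimal sketch of that scan follows. The function latest_valid and the sample buffer are hypothetical, and for brevity the sketch returns None where the method above returns a dictionary of error codes.

```python
def latest_valid(buffer, wanted_id):
    """Return the newest entry with the given 'ID' and 'ERR' == 0, or None if there is none."""
    for entry in reversed(buffer):
        if entry['ID'] == wanted_id and entry['ERR'] == 0:
            return entry
    return None


# Made-up buffer contents: the newest coordinates entry carries an error flag
buffer = [
    {'ID': 'Coordinates', 'TOW': 1000, 'ERR': 0, 'X': 1.0, 'Y': 2.0, 'Z': 3.0},
    {'ID': 'Heading', 'TOW': 1000, 'ERR': 0, 'Heading': 90.0},
    {'ID': 'Coordinates', 'TOW': 2000, 'ERR': 1, 'X': 0.0, 'Y': 0.0, 'Z': 0.0},
]

coords = latest_valid(buffer, 'Coordinates')
print(coords['TOW'])
print(latest_valid([], 'Heading'))
```

The entry with TOW 2000 is skipped because its ERR field is non-zero, so the scan falls back to the older, valid entry.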

parse_septentrio_msg(rx_msg)

Parses the received message and processes it depending on whether it is an SBF or an NMEA message.

Any exception raised while parsing is caught and reported according to the debug levels; it is not propagated to the caller.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| rx_msg | bytes or str | Message received from the Septentrio mosaic-go receiver. | required |
Source code in a2gmeasurements.py
def parse_septentrio_msg(self, rx_msg):
    """
    Parses the received message and processes it depending on whether it is an SBF or an NMEA message.

    Any exception raised while parsing is caught and reported according to the debug levels; it is not propagated to the caller.

    Args:
        rx_msg (bytes or str): message received from the Septentrio mosaic-go receiver.
    """

    try:
        if self.DBG_LVL_1:
            print('\nPARSING RX DATA')
        if self.DBG_LVL_0:
            print('0 POS: ', rx_msg[0])
            print('\nRX DATA LENGTH: ', len(rx_msg), rx_msg.decode('utf-8', 'ignore'))

        # The SBF output follows the $ sync1 byte, with a second sync byte that is the symbol @ or in utf-8 the decimal 64
        # Bytes indexing  works as follows:
        # One integer gives a decimal
        # A slice (i.e. 0:1) gives a bytes object ---> rx_msg[0] != rx_msg[0:1]
        if rx_msg[0] == 64:                
            if self.DBG_LVL_0:
                print('\nDETECTS SBF')

            # Header detection
            #SYNC = struct.unpack('<1c', rx_msg[0:1]) 
            CRC = struct.unpack('<1H', rx_msg[1:3])                
            ID_SBF_msg = struct.unpack('<1H', rx_msg[3:5])
            LEN_SBF_msg = struct.unpack('<1H', rx_msg[5:7])

            # According to the manual, the LEN should always be a multiple of 4, otherwise 
            # there is an error
            if np.mod(int(LEN_SBF_msg[0]),4) != 0 :
                if self.DBG_LVL_1:
                    print('\nDiscarded frame as LEN_SBF_msg is not multiple of 4, LEN_SBF_msg: ', LEN_SBF_msg[0])
                return

            '''
            # CRC checker
            crc16_checker = Calculator(Crc16.CCITT)
            idx_bytes_crc_to_read = 7+int(LEN_SBF_msg[0])-8
            crc_data = rx_msg[7:idx_bytes_crc_to_read]
            print(type(crc_data))
            crc16 = crc16_checker.checksum(crc_data)
            print(rx_msg[1:3], type(crc16))
            if CRC[0] != crc16:
                if self.DBG_LVL_1:
                    print('\nDiscarded frame cause it did not pass the CRC check')
                return
            '''

            # PVTGeodetic SBF sentence identified by ID 4007
            if ID_SBF_msg[0] & 8191 == 4007: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID                    
                self.process_pvtgeodetic_sbf_data(rx_msg)
                #print("Received pvt geodetic")

            # PVTCart SBF sentence identified by ID 4006
            if ID_SBF_msg[0] & 8191 == 4006: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID                    
                self.process_pvtcart_sbf_data(rx_msg)
                #print("Received pvtcart")

            # PosCovCartesian SBF sentence identified by ID 5905
            if ID_SBF_msg[0] & 8191 == 5905: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                if self.DBG_LVL_1:
                    print('\nReceived PosCovCartesian SBF sentence')

            if ID_SBF_msg[0] & 8191 == 5907: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                if self.DBG_LVL_1:
                    print('\nReceived VelCovCartesian SBF sentence')

            if ID_SBF_msg[0] & 8191 == 4043: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                if self.DBG_LVL_1:
                    print('\nReceived BaseVectorCart SBF sentence')

            if ID_SBF_msg[0] & 8191 == 5942: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                if self.DBG_LVL_1:
                    print('\nReceived AuxAntPositions SBF sentence')

            if ID_SBF_msg[0] & 8191 == 5938: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                if self.DBG_LVL_1:
                    pass
                    #print('\nReceived AttEuler SBF sentence')
                print("Received attitude")
                self.process_atteuler_sbf_data(rx_msg)

            if ID_SBF_msg[0] & 8191 == 5939: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                if self.DBG_LVL_1:
                    print('\nReceived AttCovEuler SBF sentence')

            if ID_SBF_msg[0] & 8191 == 5943: # np.sum([np.power(2,i) for i in range(13)]) # --->  bits 0-12 contain the ID
                if self.DBG_LVL_1:
                    print('\nReceived EndOfAtt SBF sentence')

            # Sort SBF buffer entries by time (this is double checking, as they are expected to arrive in time order)
            self.SBF_frame_buffer.sort(key=lambda k : k['TOW'])

            # Merge buffer entries corresponding to the same TOW
            #self.util_merge_buffer_entries_by_timetag(type_msg='SBF')

        # NMEA Output starts with the letter G, that in utf-8 is the decimal 71
        elif rx_msg[0] == 71:
            if self.DBG_LVL_0:
                print('\nDETECTS NMEA')
            self.process_gps_nmea_data(rx_msg[:-1])
            #self.util_merge_buffer_entries_by_timetag(type_msg='NMEA')

    except Exception as e:
        if self.DBG_LVL_1:
            print('\nEXCEPTION IN parse_septentrio_msg')
        if self.DBG_LVL_0:
            print('\nThis is the exception: ', e, )
            logging.exception("\nError occurred: ")
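
The header handling above (second sync byte, CRC, ID masked to bits 0-12, length check) can be exercised on a synthetic frame. This is a sketch under assumptions: parse_sbf_header is a hypothetical helper, the CRC value is a dummy that is not verified, and the frame is built by hand rather than captured from a receiver. As in the listing, the leading '$' is assumed to have been stripped already.

```python
import struct

SBF_ID_MASK = 0x1FFF  # bits 0-12 hold the block number (8191 in decimal)


def parse_sbf_header(frame):
    """Parse the header of an SBF frame whose leading '$' has already been stripped."""
    if frame[0:1] != b'@':
        raise ValueError('not an SBF frame')
    # CRC, ID and Length are three consecutive little-endian unsigned 16-bit fields
    crc, raw_id, length = struct.unpack('<HHH', frame[1:7])
    if length % 4 != 0:
        raise ValueError('length is not a multiple of 4')
    return {'crc': crc, 'block_number': raw_id & SBF_ID_MASK,
            'revision': raw_id >> 13, 'length': length}


# Synthetic header: block 5938 (AttEuler), revision 1, length 44, dummy CRC
raw_id = (1 << 13) | 5938
frame = b'@' + struct.pack('<HHH', 0xABCD, raw_id, 44)
header = parse_sbf_header(frame)
print(header['block_number'], header['revision'], header['length'])
```

Masking with 0x1FFF is what lets the comparison against 5938 succeed even though the upper three bits of the ID field carry the block revision.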

process_atteuler_sbf_data(raw_data)

Parses an AttEuler SBF sentence. To receive this block, the receiver must be configured to output SBF sentences.

The AttEuler SBF sentence provides the heading of the imaginary line formed by the first and second antennas, with respect to the North. To do so, the heading, pitch, and roll axes are defined.

The parsed heading information is stored in SBF_frame_buffer. Once the buffer holds more than MAX_SBF_BUFF_LEN entries, its contents are saved to disk and the buffer is emptied.

More about the axis definitions and the heading information can be found in the "mosaic-go Reference Guide".

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| raw_data | bytes | Received data corresponding to the AttEuler SBF sentence. | required |
Source code in a2gmeasurements.py
def process_atteuler_sbf_data(self, raw_data):
    """
    Parses an AttEuler SBF sentence. To receive this block, the receiver must be configured to output SBF sentences.

    The AttEuler SBF sentence provides the heading of the imaginary line formed by the first and second antennas, with respect to the North. To do so, the heading, pitch, and roll axes are defined.

    The parsed heading information is stored in ``SBF_frame_buffer``. Once the buffer holds more than ``MAX_SBF_BUFF_LEN`` entries, its contents are saved to disk and the buffer is emptied.

    *More about the axis definitions and the heading information can be found in the "mosaic-go Reference Guide"*.

    Args:
        raw_data (bytes): received data corresponding to the AttEuler SBF sentence.
    """

    TOW = struct.unpack('<1I', raw_data[7:11])[0]
    WNc = struct.unpack('<1H', raw_data[11:13])[0]        
    NrSV = struct.unpack('<1B', raw_data[13:14])[0]
    ERR =  struct.unpack('<1B', raw_data[14:15])[0]
    MODE =  struct.unpack('<1H', raw_data[15:17])[0]
    Heading =  struct.unpack('<1f', raw_data[19:23])[0]
    try:
        Pitch =  struct.unpack('<1f', raw_data[23:27])[0]
    except Exception as e:
        print("[DEBUG]: Error unpacking Pitch attitude, ", e)
    Roll = struct.unpack('<1f', raw_data[27:31])[0]
    PitchDot =  struct.unpack('<1f', raw_data[31:35])[0]
    RollDot =  struct.unpack('<1f', raw_data[35:39])[0]
    HeadingDot = struct.unpack('<1f', raw_data[39:43])[0]

    '''
    atteul_msg_format = {'TOW': TOW, 'WNc': WNc, 'NrSV': NrSV, 'ERR': ERR, 'MODE': MODE, 
                         'Heading': Heading, 'Pitch': Pitch, 'Roll': Roll, 
                         'PitchDot': PitchDot, 'RollDot': RollDot, 'HeadingDot': HeadingDot}        
    '''
    atteul_msg_useful = {'ID': 'Heading', 'TOW': TOW, 'WNc': WNc,'ERR': ERR, 'MODE': MODE, 
                         'Heading': Heading, 'Pitch': Pitch, 'Roll': Roll}

    self.SBF_frame_buffer.append(atteul_msg_useful)

    if len(self.SBF_frame_buffer) > self.MAX_SBF_BUFF_LEN:
        with open(self.save_filename + '.txt', 'a+') as file:      
            file.write(json.dumps(self.SBF_frame_buffer))    
            print("[DEBUG]: Saved GPS coordinates file")
        self.SBF_frame_buffer = []
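
The byte offsets used in the listing above can be checked against a synthetic payload. The following is a minimal sketch, not part of `a2gmeasurements.py`: `ATTEULER_PREFIX_FMT`, `build_fake_atteuler`, and `unpack_atteuler` are hypothetical names, and the layout is only inferred from the slices in the method (it assumes the leading `$` was already consumed by the reader, so `TOW` starts at offset 7).

```python
import struct

# Layout inferred from the slices used in process_atteuler_sbf_data, with the
# leading '$' already consumed by the reader: 7 header bytes, TOW, WNc, NrSV,
# ERR, MODE, 2 bytes the method skips, then Heading, Pitch, Roll as
# little-endian 32-bit floats.
ATTEULER_PREFIX_FMT = '<7sIHBBH2s3f'


def build_fake_atteuler(tow, wnc, heading, pitch, roll):
    """Builds a synthetic payload for experiments; NOT a real receiver frame."""
    header = b'@' + bytes(6)  # placeholder bytes standing in for the header
    return struct.pack(ATTEULER_PREFIX_FMT, header, tow, wnc, 0, 0, 0,
                       bytes(2), heading, pitch, roll)


def unpack_atteuler(raw_data):
    """Unpacks the fields that the method keeps, using a single struct call."""
    (_hdr, tow, wnc, _nrsv, err, mode, _skipped,
     heading, pitch, roll) = struct.unpack_from(ATTEULER_PREFIX_FMT, raw_data)
    return {'ID': 'Heading', 'TOW': tow, 'WNc': wnc, 'ERR': err, 'MODE': mode,
            'Heading': heading, 'Pitch': pitch, 'Roll': roll}


frame = build_fake_atteuler(tow=345600000, wnc=2300,
                            heading=90.0, pitch=1.5, roll=-0.25)
print(unpack_atteuler(frame))
```

Unpacking with one format string fails as a whole when the payload is shorter than expected, which may be easier to handle than field-by-field slicing.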

process_gps_nmea_data(data)

Parses a line of NMEA data read from the GPS receiver through the virtual serial port.

The NMEA sentences used are GGA and HDT.

For the GGA sentence, the keys of the resulting dictionary are: Timestamp, Latitude, Latitude Direction, Longitude, Longitude Direction, GPS Quality Indicator, Number of Satellites in use, Horizontal Dilution of Precision, Antenna Alt above sea level (mean), Units of altitude (meters), Geoidal Separation, Units of Geoidal Separation (meters), Age of Differential GPS Data (secs), Differential Reference Station ID.

Instances of this class created in the GUI and in other classes use SBF sentences as the default type of sentence.

Parameters:

Name Type Description Default
data str

line of read data following the structure of a NMEA frame.

required
Source code in a2gmeasurements.py
def process_gps_nmea_data(self, data):
    """
    Parses a line of NMEA data read from the GPS receiver through the virtual serial port.

    The NMEA sentences used are GGA and HDT.

    For the GGA sentence, the keys of the resulting dictionary are: ``Timestamp``, ``Latitude``, ``Latitude Direction``, ``Longitude``, ``Longitude Direction``, ``GPS Quality Indicator``, ``Number of Satellites in use``, ``Horizontal Dilution of Precision``, ``Antenna Alt above sea level (mean)``, ``Units of altitude (meters)``, ``Geoidal Separation``, ``Units of Geoidal Separation (meters)``, ``Age of Differential GPS Data (secs)``, ``Differential Reference Station ID``.

    *Instances of this class created in the GUI and in other classes use SBF sentences as the default type of sentence*.

    Args:
        data (str): line of read data following the structure of a NMEA frame.
    """

    try:
        if self.DBG_LVL_0:
            print('\nNMEA PARSING')

        nmeaobj = pynmea2.parse(data.decode())
        extracted_data = ['%s: %s' % (nmeaobj.fields[i][0], nmeaobj.data[i]) for i in range(len(nmeaobj.fields))]
        gps_data = {}
        for item in extracted_data:
            tmp = item.split(': ')
            gps_data[tmp[0]] = tmp[1]

        # GGA type of NMEA sentence
        if 'Antenna Alt above sea level (mean)' in gps_data:
            # NMEA encodes latitude as ddmm.mmmm and longitude as dddmm.mmmm,
            # so the degree field always has 2 digits (latitude) or 3 digits (longitude).
            # The hemisphere is carried by the separate direction fields.
            gps_data['Latitude'] = float(gps_data['Latitude'][0:2]) + float(gps_data['Latitude'][2:])/60
            gps_data['Longitude'] = float(gps_data['Longitude'][0:3]) + float(gps_data['Longitude'][3:])/60

            gps_data['Antenna Alt above sea level (mean)'] = float(gps_data['Antenna Alt above sea level (mean)'])
            gps_data['Timestamp'] = float(gps_data['Timestamp'])

            '''
            # Save the UNIX timestamp. As the timestamp provides hour/min/sec only, add the date
            today_date = datetime.date.today()
            today_date = [int(i) for i in today_date.strftime("%Y-%m-%d").split('-')]                

            complete_date = datetime.datetime(year=today_date[0], 
                                            month=today_date[1], 
                                            day=today_date[2], 
                                            hour=int(gps_data['Timestamp'][0:2]), 
                                            minute=int(gps_data['Timestamp'][2:4]), 
                                            second=int(gps_data['Timestamp'][4:6]))

            gps_data['Timestamp'] = time.mktime(complete_date.timetuple())

            '''

        # HDT NMEA sentence
        if 'Heading' in gps_data:
            if gps_data['Heading'] == '':
                gps_data['Heading'] = -2000
            else:
                gps_data['Heading'] = float(gps_data['Heading'])

            # No need to restrict heading to [-pi, pi] since it will be done 
            # inside 'ground_gimbal_follows_drone' function 
            #if gps_data['Heading'] > 180:
            #    gps_data['Heading'] = gps_data['Heading'] - 360

            # Make the timestamp the same format as the GGA sentence
            for stream in self.stream_info:
                if stream['msg_type'] == 'NMEA':
                    # Need to update faster
                    if 'msec' in stream['interval']:
                        pass
                    #elif 'sec' in stream['interval']:                            
                    else:
                        gps_data['Timestamp'] = ''
                        for i in datetime.datetime.utcnow().timetuple()[3:6]:
                            tmp = str(i)
                            if len(tmp) == 1:
                                tmp = '0' + tmp
                            gps_data['Timestamp'] = gps_data['Timestamp'] + tmp
                        gps_data['Timestamp'] = float(int(gps_data['Timestamp']))

        if self.DBG_LVL_2 or len(self.NMEA_buffer):
            if self.DBG_LVL_0:
                print('\nSAVES NMEA DATA INTO BUFFER')    
            self.NMEA_buffer.append(gps_data)  

    except Exception as e:
        # Do not save any other command line
        if self.DBG_LVL_1:
            print('\nEXCEPTION PROCESSING NMEA')
        if self.DBG_LVL_0:
            print('\nThis is the exception: ', e) 
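
In a GGA sentence, latitude is encoded as `ddmm.mmmm` and longitude as `dddmm.mmmm`, with the hemisphere given by the separate direction fields. The following standalone sketch shows that conversion; the helper name `nmea_to_decimal_degrees` is hypothetical, and unlike the method above it also applies the sign for the southern and western hemispheres.

```python
def nmea_to_decimal_degrees(value, direction, degree_digits):
    """Converts an NMEA (d)ddmm.mmmm string into signed decimal degrees.

    degree_digits is 2 for latitude (ddmm.mmmm) and 3 for longitude
    (dddmm.mmmm); direction is one of 'N', 'S', 'E', 'W'.
    """
    degrees = int(value[:degree_digits])
    minutes = float(value[degree_digits:])
    decimal = degrees + minutes / 60.0
    # South and West are conventionally represented as negative values.
    return -decimal if direction in ('S', 'W') else decimal


lat = nmea_to_decimal_degrees('6011.5000', 'N', degree_digits=2)
lon = nmea_to_decimal_degrees('02449.0000', 'E', degree_digits=3)
print(lat, lon)
```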

process_pvtcart_sbf_data(raw_data)

Parses a PVTCart SBF sentence. To be able to receive this block, the receiver should be configured to output SBF sentences.

The PVTCart SBF sentence provides geocentric coordinates X, Y, Z for the position of the receiver.

The coordinates are stored in SBF_frame_buffer. Once SBF_frame_buffer holds more than MAX_SBF_BUFF_LEN entries, its contents are saved on disk and the buffer is emptied.

More about the information carried by this block can be found in the "mosaic-go Reference Guide".

Parameters:

Name Type Description Default
raw_data bytes

received data corresponding to the PVTCart SBF block.

required
Source code in a2gmeasurements.py
def process_pvtcart_sbf_data(self, raw_data):
    """
    Parses a PVTCart SBF sentence. To be able to receive this block, the receiver should be configured to output SBF sentences.

    The PVTCart SBF sentence provides geocentric coordinates X, Y, Z for the position of the receiver.

    The coordinates are stored in ``SBF_frame_buffer``. Once ``SBF_frame_buffer`` holds more than ``MAX_SBF_BUFF_LEN`` entries, its contents are saved on disk and the buffer is emptied.

    *More about the information carried by this block can be found in the "mosaic-go Reference Guide"*.

    Args:
        raw_data (bytes): received data corresponding to the PVTCart SBF block.
    """

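    # Full block layout, kept for reference only: the fields below are unpacked one by one.
    format_before_padd = '<1c3H1I1H2B3d5f1d1f4B2H1I2B4H1B' 
    # len() gives the payload size in bytes; sys.getsizeof() would also count Python object overhead.
    format_after_padd = format_before_padd + str(len(raw_data)-struct.calcsize(format_before_padd)) + 'B'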

    TOW = struct.unpack('<1I', raw_data[7:11])[0]
    WNc = struct.unpack('<1H', raw_data[11:13])[0]        
    MODE =  struct.unpack('<1B', raw_data[13:14])[0]
    ERR =  struct.unpack('<1B', raw_data[14:15])[0]
    X =  struct.unpack('<1d', raw_data[15:23])[0]
    Y =  struct.unpack('<1d', raw_data[23:31])[0]
    try:
        Z = struct.unpack('<1d', raw_data[31:39])[0]
    except Exception as e:
        if self.DBG_LVL_0:
            print("[DEBUG]: error unpacking Z coord, ", e)
    Undulation =  struct.unpack('<1f', raw_data[39:43])[0]
    Vx =  struct.unpack('<1f', raw_data[43:47])[0]
    Vy = struct.unpack('<1f', raw_data[47:51])[0]
    Vz =  struct.unpack('<1f', raw_data[51:55])[0]
    COG =  struct.unpack('<1f', raw_data[55:59])[0]
    RxClkBias = struct.unpack('<1d', raw_data[59:67])[0]
    RxClkDrift =  struct.unpack('<1f', raw_data[67:71])[0]
    TimeSystem = struct.unpack('<1B', raw_data[71:72])[0]
    Datum =  struct.unpack('<1B', raw_data[72:73])[0]
    NrSV = struct.unpack('<1B', raw_data[73:74])[0]
    WACorrInfo =  struct.unpack('<1B', raw_data[74:75])[0]
    ReferenceID =  struct.unpack('<1H', raw_data[75:77])[0]
    MeanCorrAge = struct.unpack('<1H', raw_data[77:79])[0]
    SignalInfo =  struct.unpack('<1I', raw_data[79:83])[0] 
    AlertFlag = struct.unpack('<1B', raw_data[83:84])[0]
    NrBases =  struct.unpack('<1B', raw_data[84:85])[0]
    PPPInfo =  struct.unpack('<1H', raw_data[85:87])[0]
    Latency =  struct.unpack('<1H', raw_data[87:89])[0]        
    HAccuracy =  struct.unpack('<1H', raw_data[89:91])[0]         
    VAccuracy =  struct.unpack('<1H', raw_data[91:93])[0]  

    '''
    pvt_msg_format = {'TOW': TOW, 'WNc': WNc, 'MODE': MODE, 'ERR': ERR, 'X': X, 'Y': Y, 'Z': Z,
                      'Undulation': Undulation, 'Vx': Vx, 'Vy': Vy, 'Vz': Vz, 'COG': COG,
                      'RxClkBias': RxClkBias, 'RxClkDrift': RxClkDrift, 'TimeSystem': TimeSystem, 'Datum': Datum,
                      'NrSV': NrSV, 'WACorrInfo': WACorrInfo, 'ReferenceID': ReferenceID, 'MeanCorrAge': MeanCorrAge,
                      'SignalInfo': SignalInfo, 'AlertFlag': AlertFlag, 'NrBases': NrBases, 'PPPInfo': PPPInfo,
                      'Latency': Latency, 'HAccuracy': HAccuracy, 'VAccuracy': VAccuracy}        
    '''
    pvt_data_we_care = {'ID': 'Coordinates', 'TOW': TOW, 'WNc': WNc, 'MODE': MODE, 'ERR': ERR, 
                        'X': X, 'Y': Y, 'Z': Z, 'Datum': Datum}

    self.SBF_frame_buffer.append(pvt_data_we_care)

    if len(self.SBF_frame_buffer) > self.MAX_SBF_BUFF_LEN:
        with open(self.save_filename + '.txt', 'a+') as file:      
            file.write(json.dumps(self.SBF_frame_buffer))       
            print("[DEBUG]: Saved GPS coordinates file")
        self.SBF_frame_buffer = []
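
The geocentric X, Y, Z values stored by this method often need to be turned into latitude, longitude, and height. The sketch below shows one common fixed-point iteration for that conversion. It is an illustration only: it assumes the WGS84 ellipsoid (the datum actually in use is reported in the `Datum` field), it is not valid exactly at the poles, and the function names are hypothetical.

```python
import math

# WGS84 ellipsoid constants (assumed datum for this sketch).
WGS84_A = 6378137.0
WGS84_F = 1.0 / 298.257223563
WGS84_E2 = WGS84_F * (2.0 - WGS84_F)


def geodetic_to_ecef(lat_deg, lon_deg, h):
    """Forward conversion, used here only to check the inverse."""
    lat, lon = math.radians(lat_deg), math.radians(lon_deg)
    n = WGS84_A / math.sqrt(1.0 - WGS84_E2 * math.sin(lat) ** 2)
    x = (n + h) * math.cos(lat) * math.cos(lon)
    y = (n + h) * math.cos(lat) * math.sin(lon)
    z = (n * (1.0 - WGS84_E2) + h) * math.sin(lat)
    return x, y, z


def ecef_to_geodetic(x, y, z, iterations=10):
    """Returns (latitude_deg, longitude_deg, height_m) from ECEF metres."""
    lon = math.atan2(y, x)
    p = math.hypot(x, y)                       # distance from the Z axis
    lat = math.atan2(z, p * (1.0 - WGS84_E2))  # initial guess (h = 0)
    h = 0.0
    for _ in range(iterations):
        n = WGS84_A / math.sqrt(1.0 - WGS84_E2 * math.sin(lat) ** 2)
        h = p / math.cos(lat) - n
        lat = math.atan2(z, p * (1.0 - WGS84_E2 * n / (n + h)))
    return math.degrees(lat), math.degrees(lon), h


print(ecef_to_geodetic(*geodetic_to_ecef(60.0, 24.0, 100.0)))
```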

process_pvtgeodetic_sbf_data(raw_data)

Parses a PVTGeodetic SBF sentence. To be able to receive this block, the receiver should be configured to output SBF sentences.

The PVTGeodetic SBF sentence provides geodetic coordinates lat, lon, h for the position of the receiver.

The coordinates are stored in SBF_frame_buffer. Once SBF_frame_buffer holds more than MAX_SBF_BUFF_LEN entries, its contents are saved on disk and the buffer is emptied.

More about the information carried by this block can be found in the "mosaic-go Reference Guide".

Parameters:

Name Type Description Default
raw_data bytes

received data corresponding to the PVTGeodetic SBF block.

required
Source code in a2gmeasurements.py
def process_pvtgeodetic_sbf_data(self, raw_data):
    """
    Parses a PVTGeodetic SBF sentence. To be able to receive this block, the receiver should be configured to output SBF sentences.

    The PVTGeodetic SBF sentence provides geodetic coordinates lat, lon, h for the position of the receiver.

    The coordinates are stored in ``SBF_frame_buffer``. Once ``SBF_frame_buffer`` holds more than ``MAX_SBF_BUFF_LEN`` entries, its contents are saved on disk and the buffer is emptied.

    *More about the information carried by this block can be found in the "mosaic-go Reference Guide"*.

    Args:
        raw_data (bytes): received data corresponding to the PVTGeodetic SBF block.
    """

    TOW = struct.unpack('<1I', raw_data[7:11])[0]
    WNc = struct.unpack('<1H', raw_data[11:13])[0]        
    MODE =  struct.unpack('<1B', raw_data[13:14])[0]
    ERR =  struct.unpack('<1B', raw_data[14:15])[0]
    LAT =  struct.unpack('<1d', raw_data[15:23])[0]
    LON =  struct.unpack('<1d', raw_data[23:31])[0]
    H = struct.unpack('<1d', raw_data[31:39])[0]
    Undulation =  struct.unpack('<1f', raw_data[39:43])[0]
    Vx =  struct.unpack('<1f', raw_data[43:47])[0]
    Vy = struct.unpack('<1f', raw_data[47:51])[0]
    Vz =  struct.unpack('<1f', raw_data[51:55])[0]
    COG =  struct.unpack('<1f', raw_data[55:59])[0]
    RxClkBias = struct.unpack('<1d', raw_data[59:67])[0]
    RxClkDrift =  struct.unpack('<1f', raw_data[67:71])[0]
    TimeSystem = struct.unpack('<1B', raw_data[71:72])[0]
    Datum =  struct.unpack('<1B', raw_data[72:73])[0]        

    pvt_data_we_care = {'ID': 'Coordinates', 'TOW': TOW, 'WNc': WNc, 'MODE': MODE, 'ERR': ERR, 
                        'LAT': LAT, 'LON': LON, 'HEIGHT': H, 'Datum': Datum}

    self.SBF_frame_buffer.append(pvt_data_we_care)

    if len(self.SBF_frame_buffer) > self.MAX_SBF_BUFF_LEN:
        with open(self.save_filename + '.txt', 'a+') as file:      
            file.write(json.dumps(self.SBF_frame_buffer))      
            print("[DEBUG]: Saved GPS coordinates file")
        self.SBF_frame_buffer = []
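
The SBF parsers append `json.dumps(self.SBF_frame_buffer)` to the same text file on every flush, so the file ends up holding several JSON arrays back to back rather than one JSON document. A possible way to read such a file back is sketched below; `read_concatenated_json` is a hypothetical helper, not part of the class.

```python
import json


def read_concatenated_json(text):
    """Decodes back-to-back JSON arrays and returns one flat list of records."""
    decoder = json.JSONDecoder()
    records = []
    pos = 0
    while pos < len(text):
        # raw_decode does not accept leading whitespace, so skip it manually.
        if text[pos].isspace():
            pos += 1
            continue
        chunk, pos = decoder.raw_decode(text, pos)
        records.extend(chunk)
    return records


# Two flushes written one after the other, as the 'a+' mode would produce.
saved = json.dumps([{'ID': 'Coordinates', 'TOW': 1}]) + \
        json.dumps([{'ID': 'Heading', 'TOW': 2}, {'ID': 'Coordinates', 'TOW': 3}])
print(read_concatenated_json(saved))
```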

sendCommandGps(cmd, interface='USB')

Sends a command to the Septentrio mosaic-go receiver.

Blocks the calling thread for 500 ms after the command is written.

Parameters:

Name Type Description Default
cmd str

command to be sent to the Septentrio mosaic-go receiver. The available list of commands is defined in "mosaic-go Reference Guide".

required
interface str

is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.

'USB'
Source code in a2gmeasurements.py
def sendCommandGps(self, cmd, interface='USB'):
    """
    Sends a command to the Septentrio mosaic-go receiver.

    Blocks the calling thread for 500 ms after the command is written.

    Args:
        cmd (str): command to be sent to the Septentrio mosaic-go receiver. The available list of commands is defined in "mosaic-go Reference Guide".
        interface (str, optional): is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.
    """

    cmd_eof = cmd + '\n'

    if interface =='USB':
        self.serial_instance.write(cmd_eof.encode('utf-8'))
    #elif interface == 'IP':
    #    self.socket.sendall(cmd_eof.encode('utf-8'))

    time.sleep(0.5)
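
To see exactly which bytes reach the receiver, the encoding step can be isolated and pointed at an in-memory buffer. This is a sketch only: `encode_command` is a hypothetical helper and `io.BytesIO` merely stands in for the pyserial instance.

```python
import io


def encode_command(cmd):
    """Appends the newline terminator and encodes the command as UTF-8."""
    return (cmd + '\n').encode('utf-8')


fake_port = io.BytesIO()  # stand-in for self.serial_instance
fake_port.write(encode_command('setAttitudeOffset, 12.5'))
print(fake_port.getvalue())
```

Because every call sleeps for 0.5 s, a caller that issues four commands in a row (as `stop_gps_data_retrieval` does with its default arguments) blocks for about 2 s.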

serial_connect(serial_port=None)

Open a serial connection. The Septentrio mosaic-go provides 2 virtual serial ports.

On Windows, the virtual serial ports are typically named COM# (Virtual serial port 1) and COM# (Virtual serial port 2).

On Linux, the virtual serial ports (controlled by the standard Linux CDC-ACM driver) are named /dev/ttyACM0 (Virtual serial port 1) and /dev/ttyACM1 (Virtual serial port 2).

Septentrio offers several interfaces to its receiver, among them IP (using Ethernet-over-USB) and USB.

For the virtual serial ports, the interface name in the Septentrio receiver is 'USB', because they communicate through the USB connection with the host computer.

Additionally, there is an actual (not virtual) serial port in the mosaic-go device. Under Linux, the name of this port is /dev/serial0, which is a symbolic link to either /dev/ttyS# or /dev/ttyAMA#.

For information about all available interfaces check the Septentrio "mosaic-go Reference Guide".

It is important to note that only the USB interface has been implemented in this class.

Parameters:

Name Type Description Default
serial_port str

serial port or virtual serial port name. Defaults to None.

None
Source code in a2gmeasurements.py
def serial_connect(self, serial_port=None):
    """
    Open a serial connection. The Septentrio mosaic-go provides 2 virtual serial ports.

    On Windows, the virtual serial ports are typically named COM# (Virtual serial port 1) and COM# (Virtual serial port 2).

    On Linux, the virtual serial ports (controlled by the standard Linux CDC-ACM driver) are named ``/dev/ttyACM0`` (Virtual serial port 1) and ``/dev/ttyACM1`` (Virtual serial port 2).

    Septentrio offers several interfaces to its receiver, among them IP (using Ethernet-over-USB) and USB.

    For the virtual serial ports, the interface name in the Septentrio receiver is 'USB', because they
    communicate through the USB connection with the host computer. 

    Additionally, there is an actual (not virtual) serial port in the mosaic-go device. Under Linux, the name of this port is ``/dev/serial0``, which is a symbolic link to either ``/dev/ttyS#`` or ``/dev/ttyAMA#``.

    For information about all available interfaces check the Septentrio "mosaic-go Reference Guide".

    *It is important to note that only the USB interface has been implemented in this class*.

    Args:
        serial_port (str, optional): serial port or virtual serial port name. Defaults to None.
    """

    self.serial_port = None
    # Look for the first Virtual Com in Septentrio receiver. It is assumed that it is available, 
    # meaning that it has been closed by user if was used before.        
    for (this_port, desc, _) in sorted(comports()):

        # Linux CDC-ACM driver
        if 'Septentrio USB Device - CDC Abstract Control Model (ACM)' in desc:
            #self.serial_port = '/dev/ttyACM0'
            self.serial_port = this_port
            self.interface_number = 2
        # Windows driver
        elif 'Septentrio Virtual USB COM Port 1' in desc: # Choose the first virtual COM port
            self.serial_port = this_port
            self.interface_number = 1

    if self.serial_port is None:
        self.GPS_CONN_SUCCESS = False
        print("\n[DEBUG]: NO GPS found in any serial port")
        return
    else:
        self.GPS_CONN_SUCCESS = True
        print("\n[DEBUG]: GPS found in one serial port")

    serial_instance = None
    while serial_instance is None:
        try:
            serial_instance = serial.serial_for_url(self.serial_port,
                                                    9600,
                                                    parity='N',
                                                    rtscts=False,
                                                    xonxoff=False,
                                                    do_not_open=True)

            serial_instance.timeout = 5

            serial_instance.exclusive = True
            serial_instance.open()

        except serial.SerialException as e:
            sys.stderr.write('could not open port {!r}: {}\n'.format(self.serial_port, e))

        else:
            break

    #if self.DBG_LVL_0:
    print('[DEBUG]:CONNECTED TO VIRTUAL SERIAL PORT IN SEPTENTRIO')

    self.serial_instance = serial_instance
    time.sleep(0.1)
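
The port-selection step can be exercised without hardware by passing in a list of `(device, description, hwid)` tuples shaped like the entries the method unpacks from `comports()`. The sketch below mirrors the matching loop in the listing; `pick_septentrio_port` is a hypothetical helper and the example port lists are made up. Note that, as in the listing, the loop does not stop at the first match, so the last matching entry wins.

```python
LINUX_DESC = 'Septentrio USB Device - CDC Abstract Control Model (ACM)'
WINDOWS_DESC = 'Septentrio Virtual USB COM Port 1'


def pick_septentrio_port(ports):
    """Returns (device, interface_number) for a Septentrio port, or None.

    ports: iterable of (device, description, hwid) tuples, already sorted.
    """
    selected = None
    for device, desc, _hwid in ports:
        if LINUX_DESC in desc:        # Linux CDC-ACM driver
            selected = (device, 2)
        elif WINDOWS_DESC in desc:    # Windows driver, first virtual COM port
            selected = (device, 1)
    return selected


# Made-up port lists for illustration only.
linux_ports = [('/dev/ttyACM0', LINUX_DESC, 'hwid-a'),
               ('/dev/ttyACM1', LINUX_DESC, 'hwid-b'),
               ('/dev/ttyS0', 'n/a', 'n/a')]
windows_ports = [('COM3', 'Septentrio Virtual USB COM Port 1 (COM3)', 'hwid-c'),
                 ('COM4', 'Septentrio Virtual USB COM Port 2 (COM4)', 'hwid-d')]

print(pick_septentrio_port(linux_ports))
print(pick_septentrio_port(windows_ports))
```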

serial_receive(serial_instance_actual, stop_event)

Callback function invoked by the thread responsible for handling I/O communication between the host computer and the Septentrio mosaic-go receiver.

Most messages sent by the Septentrio mosaic-go receiver start with a "$" character.

The next character depends on whether the message is an echo of a command sent by the host computer or an answer to such a command.

Echoes of commands sent by the host computer do not follow the $ character with any predefined character. These messages are discarded by the method parse_septentrio_msg.

Messages that answer a command sent by the host computer DO start with a predefined character. The predefined character depends on whether the answer arises from an NMEA sentence or an SBF sentence. These messages are parsed by the method parse_septentrio_msg.

Parameters:

Name Type Description Default
serial_instance_actual Serial

serial connection instance.

required
stop_event Event

Event to be used to stop the reading of the serial port.

required
Source code in a2gmeasurements.py
def serial_receive(self, serial_instance_actual, stop_event):
    """
    Callback function invoked by the thread responsible for handling I/O communication between the host computer and the Septentrio mosaic-go receiver.

    Most messages sent by the Septentrio mosaic-go receiver start with a "$" character.

    The next character depends on whether the message is an echo of a command sent by the host computer or an answer to such a command.

    Echoes of commands sent by the host computer do not follow the ``$`` character with any predefined character. These messages are discarded by the method ``parse_septentrio_msg``.

    Messages that answer a command sent by the host computer DO start with a predefined character. The predefined character depends on whether the answer arises from an NMEA sentence or an SBF sentence. These messages are parsed by the method ``parse_septentrio_msg``.

    Args:
        serial_instance_actual (Serial): serial connection instance.
        stop_event (threading.Event): Event to be used to stop the reading of the serial port.
    """

    while not stop_event.is_set():
        # This is if only NMEA messages are received
        #rx_msg = serial_instance_actual.readline()

        # This looks for the start of a sentence in either NMEA or SBF messages
        try:
            rx_msg = serial_instance_actual.read_until(expected='$'.encode('utf-8'))
            if len(rx_msg) > 0:
                self.parse_septentrio_msg(rx_msg)
        except Exception as e:
            print('[WARNING]: No bytes to read, ', e)
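
Because SBF blocks are binary, their payload may itself contain the byte `0x24` (`$`), in which case `read_until` would return only part of a block. One possible guard is to compare the chunk length with the length declared in the block header. The sketch below assumes the SBF header layout from the Septentrio reference guide (2 sync bytes `$@`, 2-byte CRC, 2-byte ID, 2-byte total length, little-endian) and that the chunk starts right after the `$`; both helper names are hypothetical and this is not how `parse_septentrio_msg` is known to work.

```python
import struct


def sbf_declared_length(chunk):
    """Returns the total block length from the SBF header, or None.

    chunk is expected to start with '@', i.e. right after the '$' sync byte.
    """
    if len(chunk) < 7 or not chunk.startswith(b'@'):
        return None
    # After '@' come CRC (2 bytes) and ID (2 bytes); Length sits at offset 5.
    return struct.unpack_from('<H', chunk, 5)[0]


def sbf_chunk_is_complete(chunk):
    """True if the chunk holds at least as many bytes as the header declares."""
    length = sbf_declared_length(chunk)
    # The declared length counts the leading '$', which is not in the chunk.
    return length is not None and len(chunk) >= length - 1


# Synthetic 16-byte block without its leading '$' (placeholder CRC and ID).
fake_block = b'@' + bytes(4) + struct.pack('<H', 16) + bytes(8)
print(sbf_chunk_is_complete(fake_block), sbf_chunk_is_complete(fake_block[:10]))
```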

setHeadingOffset(offset_wrt_xaxis)

Sets the offset mismatch between the imaginary line formed by the first and second antennas AND the longitudinal axis of the node.

Wrapper of sendCommandGps.

Parameters:

Name Type Description Default
offset_wrt_xaxis float

angle (degrees) from the longitudinal axis of the node to the imaginary line formed by the first and second antennas.

required
Source code in a2gmeasurements.py
def setHeadingOffset(self, offset_wrt_xaxis):
    """
    Sets the offset mismatch between the imaginary line formed by the first and second antennas AND the longitudinal axis of the node.

    Wrapper of ``sendCommandGps``.

    Args:
        offset_wrt_xaxis (float): angle (degrees) *from* the longitudinal axis of the node *to* the imaginary line formed by the first and second antennas.
    """

    self.sendCommandGps(cmd='setAttitudeOffset, ' + str(offset_wrt_xaxis))

start_gps_data_retrieval(stream_number=1, interface='USB', interval='sec1', msg_type='SBF', nmea_type='+GGA+HDT', sbf_type='+PVTCartesian+AttEuler')

Starts the streaming of the NMEA or SBF sentences.

Wrapper of sendCommandGps.

Parameters:

Name Type Description Default
stream_number int

each interface can have multiple data streams. This parameter defines the number of the stream for the given interface. Defaults to 1.

1
interface str

is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.

'USB'
interval str

interval at which the Septentrio receiver outputs the given SBF/NMEA sentence. Can be any of the following self-explanatory names: 'msec10', 'msec20', 'msec40', 'msec50', 'msec100', 'msec200', 'msec500', 'sec1', 'sec2', 'sec5', 'sec10', 'sec15', 'sec30', 'sec60', 'min2', 'min5', 'min10', 'min15', 'min30', 'min60'. Defaults to 'sec1'.

'sec1'
msg_type str

NMEA or SBF. Defaults to SBF.

'SBF'
nmea_type str

name/s of the NMEA sentence/s to be retrieved. If there are multiple sentences, each sentence identifier should be preceded by '+', and all of them should be concatenated into one single string (e.g. +HDT+GGA). Defaults to +GGA+HDT.

'+GGA+HDT'
sbf_type str

name/s of the SBF sentence/s to be retrieved. If there are multiple sentences, each sentence identifier should be preceded by '+', and all of them should be concatenated into one single string (e.g. '+PVTCartesian+AttEuler'). Each sentence needs to have a parsing function that is called in parse_septentrio_msg in the part corresponding to the id of the sentence. Defaults to +PVTCartesian+AttEuler.

'+PVTCartesian+AttEuler'
Source code in a2gmeasurements.py
def start_gps_data_retrieval(self, stream_number=1, interface='USB', interval='sec1', msg_type='SBF', 
                             nmea_type='+GGA+HDT', sbf_type='+PVTCartesian+AttEuler'):
    """
    Starts the streaming of the NMEA or SBF sentences.

    Wrapper of ``sendCommandGps``.

    Args:
        stream_number (int, optional): each interface can have multiple data streams. This parameter defines the number of the stream for the given ``interface``. Defaults to 1.
        interface (str, optional): is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.
        interval (str, optional): interval at which the Septentrio receiver outputs the given SBF/NMEA sentence. Can be any of the following self-explanatory names: 'msec10', 'msec20', 'msec40', 'msec50', 'msec100', 'msec200', 'msec500', 'sec1', 'sec2', 'sec5', 'sec10', 'sec15', 'sec30', 'sec60', 'min2', 'min5', 'min10', 'min15', 'min30', 'min60'. Defaults to 'sec1'.
        msg_type (str, optional): ``NMEA`` or ``SBF``. Defaults to ``SBF``.
        nmea_type (str, optional): name/s of the NMEA sentence/s to be retrieved. If there are multiple sentences, each sentence identifier should be preceded by '+', and all of them should be concatenated into one single string (e.g. ``+HDT+GGA``). Defaults to ``+GGA+HDT``.
        sbf_type (str, optional): name/s of the SBF sentence/s to be retrieved. If there are multiple sentences, each sentence identifier should be preceded by '+', and all of them should be concatenated into one single string (e.g. '+PVTCartesian+AttEuler'). Each sentence needs to have a parsing function that is called in ``parse_septentrio_msg`` in the part corresponding to the id of the sentence. Defaults to ``+PVTCartesian+AttEuler``.
    """

    if interface == 'USB' or interface == 'COM':
        if msg_type == 'SBF':
            cmd1 = 'setDataInOut, ' + interface + str(self.interface_number) + ',, ' + '+SBF'
            cmd2 = 'setSBFOutput, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', ' +  sbf_type + ', ' + interval
        elif msg_type == 'NMEA':
            cmd1 = 'setDataInOut, ' + interface + str(self.interface_number) + ',, ' + '+NMEA'
            cmd2 = 'sno, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', ' + nmea_type + ', ' + interval

    self.stream_info.append({'interface': interface, 'stream_number': stream_number, 'interval': interval, 'msg_type': msg_type})

    self.sendCommandGps(cmd1)
    self.sendCommandGps(cmd2)

    if self.DBG_LVL_1:
        print('\n'+ cmd1)
        print('\n'+ cmd2)
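
The command strings assembled by this method can be previewed without a receiver attached. The sketch below reproduces the string concatenation from the listing in a pure function; `build_start_commands` is a hypothetical helper, and raising `ValueError` for an unknown `msg_type` is an addition of this sketch, not behaviour of the class.

```python
def build_start_commands(interface_number, stream_number=1, interface='USB',
                         interval='sec1', msg_type='SBF',
                         nmea_type='+GGA+HDT',
                         sbf_type='+PVTCartesian+AttEuler'):
    """Returns the two command strings sent to start a stream."""
    port = interface + str(interface_number)  # e.g. 'USB1'
    if msg_type == 'SBF':
        cmd1 = 'setDataInOut, ' + port + ',, ' + '+SBF'
        cmd2 = ('setSBFOutput, Stream ' + str(stream_number) + ', ' + port
                + ', ' + sbf_type + ', ' + interval)
    elif msg_type == 'NMEA':
        cmd1 = 'setDataInOut, ' + port + ',, ' + '+NMEA'
        cmd2 = ('sno, Stream ' + str(stream_number) + ', ' + port
                + ', ' + nmea_type + ', ' + interval)
    else:
        # Assumption of this sketch: reject anything other than SBF or NMEA.
        raise ValueError('msg_type must be "SBF" or "NMEA"')
    return cmd1, cmd2


for cmd in build_start_commands(interface_number=1):
    print(cmd)
```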

start_thread_gps(interface='USB')

Starts the GPS thread responsible for handling I/O communication between the host computer and the Septentrio mosaic-go receiver.

Creates the threading Event that is set when the I/O communication must be closed.

Parameters:

Name Type Description Default
interface str

is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.

'USB'
Source code in a2gmeasurements.py
def start_thread_gps(self, interface='USB'):
    """
    Starts the GPS thread responsible for handling I/O communication between the host computer and the Septentrio mosaic-go receiver.

    Creates the threading Event that is set when the I/O communication must be closed.

    Args:
        interface (str, optional): is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.
    """

    self.event_stop_thread_gps = threading.Event()

    if interface == 'USB' or interface == 'COM':
        t_receive = threading.Thread(target=self.serial_receive, args=(self.serial_instance, self.event_stop_thread_gps))

    #elif interface == 'IP':
    #    t_receive = threading.Thread(target=self.socket_receive, args=(self.event_stop_thread_gps))

    t_receive.start()
    print('\n[DEBUG]: Septentrio GPS thread opened')
    time.sleep(0.5)
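
The thread-plus-Event pattern used here can be tried in isolation. In the sketch below a fake read function stands in for the serial port; `reader_loop`, `fake_read`, and the other names are hypothetical and only illustrate how the stop event ends the loop.

```python
import threading


def reader_loop(read_once, stop_event, sink):
    """Keeps reading until stop_event is set, in the spirit of serial_receive."""
    while not stop_event.is_set():
        data = read_once()
        if data:
            sink.append(data)


stop_event = threading.Event()
first_read_started = threading.Event()
received = []


def fake_read():
    """Stand-in for a blocking serial read with a short timeout."""
    first_read_started.set()
    stop_event.wait(0.01)  # simulate waiting for bytes without busy-looping
    return b'chunk'


worker = threading.Thread(target=reader_loop,
                          args=(fake_read, stop_event, received))
worker.start()

first_read_started.wait(timeout=2.0)  # let at least one read begin
stop_event.set()                      # request shutdown
worker.join(timeout=2.0)              # wait until the loop observes the event
print(len(received) >= 1, worker.is_alive())
```

In the class itself the read is `read_until` with the 5 s timeout set in `serial_connect`, so the loop can only notice the event once the current read returns or the port is closed.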

stop_gps_data_retrieval(stream_number=1, interface='USB', msg_type='+NMEA+SBF')

Stops the streaming of the NMEA/SBF sentences initiated by calling start_gps_data_retrieval.

Wrapper of sendCommandGps.

Unexpected behaviour to be noted by developers: it seems that if the stream is not stopped by the time the serial connection is closed, then, when the user opens a new serial connection, Septentrio will start sending all the SBF or NMEA messages that were produced between the last time the serial connection was closed and the time it is opened again.

Parameters:

Name Type Description Default
stream_number int

number of the stream to be stopped. Defaults to 1.

1
interface str

is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.

'USB'
msg_type str

the message type corresponding to the stream stream_number. Options: 'SBF', 'NMEA' or '+NMEA+SBF' or '+SBF+NMEA' (the last two are the same). Defaults to '+NMEA+SBF'.

'+NMEA+SBF'
Source code in a2gmeasurements.py
def stop_gps_data_retrieval(self, stream_number=1, interface='USB', msg_type='+NMEA+SBF'):   
    """
    Stops the streaming of the NMEA/SBF sentences initiated by calling ``start_gps_data_retrieval``. 

    Wrapper of ``sendCommandGps``.

    *Unexpected behaviour to be noted by* **developers**: *it seems that if the stream is not stopped by the time the serial connection is closed, then, when the user opens a new serial connection, Septentrio will start sending all the SBF or NMEA messages that were produced between the last time the serial connection was closed and the time it is opened again*.

    Args:
        stream_number (int, optional): number of the stream to be stopped. Defaults to 1.
        interface (str, optional): is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.
        msg_type (str, optional): the message type corresponding to the stream ``stream_number``. Options: 'SBF', 'NMEA' or '+NMEA+SBF' or '+SBF+NMEA' (the last two are the same). Defaults to ``+NMEA+SBF``.
    """

    if interface == 'USB' or interface == 'COM':
        if msg_type == 'SBF':
            cmd1 = 'setSBFOutput, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', none '
            cmd2 = 'sdio, ' + interface + str(self.interface_number) + ',, -SBF'

            self.sendCommandGps(cmd1)
            self.sendCommandGps(cmd2)
        elif msg_type == 'NMEA':
            cmd1 = 'sno, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', none ' 
            cmd2 = 'sdio, ' + interface + str(self.interface_number) + ',, -NMEA'

            self.sendCommandGps(cmd1)
            self.sendCommandGps(cmd2)
        elif msg_type == '+NMEA+SBF' or msg_type == '+SBF+NMEA':
            cmd1 = 'setSBFOutput, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', none '
            cmd2 = 'sdio, ' + interface + str(self.interface_number) + ',, -SBF'
            cmd3 = 'sno, Stream ' + str(stream_number) + ', ' + interface + str(self.interface_number) + ', none ' 
            cmd4 = 'sdio, ' + interface + str(self.interface_number) + ',, -NMEA'

            self.sendCommandGps(cmd1)
            self.sendCommandGps(cmd2)       
            self.sendCommandGps(cmd3)
            self.sendCommandGps(cmd4)       

stop_thread_gps(interface='USB')

Stops the GPS thread.

Parameters:

Name Type Description Default
interface str

is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.

'USB'
Source code in a2gmeasurements.py
def stop_thread_gps(self, interface='USB'):
    """
    Stops the GPS thread.

    Args:
        interface (str, optional): is one of the allowed Septentrio interfaces. Current implementation of this class only uses 'USB' interface. Defaults to 'USB'.
    """

    self.event_stop_thread_gps.set()
    time.sleep(0.1)

    if interface =='USB' or interface == 'COM':
        self.serial_instance.close()

    elif interface =='IP':
        self.socket.close()

    print('\n[DEBUG]: Septentrio GPS thread closed')
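
Given the note in `stop_gps_data_retrieval` about streams left running when the port is closed, a caller may want to guarantee the stop calls with `try`/`finally`. The sketch below shows one possible call order using a stand-in object that only records method names; `FakeGps` and `run_session` are hypothetical, and the real class constructor is not shown in this section.

```python
class FakeGps:
    """Stand-in that records calls; method names mirror the documented class."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Only reached for attributes that do not exist, i.e. the GPS methods.
        def record(*args, **kwargs):
            self.calls.append(name)
        return record


def run_session(gps, work):
    """Connects, streams SBF while work() runs, then always shuts down."""
    gps.serial_connect()
    gps.start_thread_gps()
    gps.start_gps_data_retrieval(msg_type='SBF')
    try:
        work()
    finally:
        # Stop the stream before the port is closed by stop_thread_gps.
        gps.stop_gps_data_retrieval(msg_type='SBF')
        gps.stop_thread_gps()


gps = FakeGps()
run_session(gps, work=lambda: None)
print(gps.calls)
```

With the real class it would also be sensible to check `GPS_CONN_SUCCESS` after `serial_connect` before starting the thread.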