Skip to content

GimbalRS2 module

Bases: object

Python Class that works as the driver for the gimbal DJI RS2.

The gimbal should be connected to the host computer through an USB-to-PCAN bridge (PCAN System).

It creates a thread (called here a gimbal thread) to handle the communication between the gimbal and this host computer.

More info on "Manual A2GMeasurements".

Gimbal control modified and extended from (based as well on DJI R SDK demo software):

DJI ROS CONTROLLER

Source code in a2gmeasurements.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
class GimbalRS2(object):
    """
    Python Class that works as the driver for the gimbal DJI RS2.

    The gimbal should be connected to the host computer through an USB-to-PCAN bridge (PCAN System). 

    It creates a thread (called here a gimbal thread) to handle the communication between the gimbal and this host computer.

    More info on "Manual A2GMeasurements".

    Gimbal control modified and extended from (based as well on DJI R SDK demo software):

    [DJI ROS CONTROLLER](https://github.com/ceinem/dji_rs2_ros_controller)
    """   

    def __init__(self, speed_yaw=40, speed_pitch=40, speed_roll=40, DBG_LVL_1=False, DBG_LVL_0=False):
        """
        Initialize properties of the class, like the DJI SDK frame header, sequence number and others.

        Args:
            speed_yaw (int, optional): speed of yaw axis in deg/s. Defaults to 40.
            speed_pitch (int, optional): speed of pitch axis in deg/s. Defaults to 40.
            speed_roll (int, optional): speed of roll axis in deg/s. Defaults to 40.
            DBG_LVL_1 (bool, optional): level of verbose to show at the command line (beta). This shows less verbose than the 0 level. Defaults to False.
            DBG_LVL_0 (bool, optional): level of verbose to show at the command line (beta). Defaults to False.
        """      

        self.header = 0xAA
        self.enc = 0x00
        self.res1 = 0x00
        self.res2 = 0x00
        self.res3 = 0x00
        self.seq = 0x0002

        self.send_id = 0x223
        self.recv_id = 0x222

        self.MAX_CAN_FRAME_LEN = 8

        self.can_recv_msg_buffer = []
        self.can_recv_msg_len_buffer = []
        self.can_recv_buffer_len = 10

        self.roll = 0.0
        self.pitch = 0.0
        self.yaw = 0.0

        self.SPEED_YAW =  speed_yaw # deg/s
        self.SPEED_PITCH =  speed_pitch # deg/s
        self.SPEED_ROLL =  speed_roll # deg/s

        self.MAIN_LOOP_STOP = True
        self.keyboard_set_flag = False
        self.keyboard_buff = []

        self.cntBytes = 0
        self.TIME_POS_REQ = 0.01
        self.DBG_LVL_0 = DBG_LVL_0
        self.DBG_LVL_1 = DBG_LVL_1

        self.TIME2MOVE_180_DEG_YAW = 180/speed_yaw
        self.TIME2MOVE_180_DEG_YAW = 180/speed_pitch
        self.TIME2MOVE_180_DEG_YAW = 180/speed_roll

    def seq_num(self):
        """
        Updates the sequence number of the gimbal data.

        Returns:
            seq_str (str): number in hexadecimal split by a ``:``.
        """

        if self.seq >= 0xFFFD:
            self.seq = 0x0002
        self.seq += 1
        # Seq_Init_Data = 0x1122
        seq_str = "%04x" % self.seq
        return seq_str[2:] + ":" + seq_str[0:2]

    def can_buffer_to_full_frame(self):
        """
        Parse the full DJI R frame message from the can buffer.

        Its fields are explained in the DJI R SDK Protocol and User Interface.

        Returns:
            full_msg_frames (list): a full message frame
        """

        full_msg_frames = []
        full_frame_counter = 0
        for i in range(len(self.can_recv_msg_buffer)):
            msg = self.can_recv_msg_buffer[i]
            length = self.can_recv_msg_len_buffer[i]
            msg = msg[:length]
            cmd_data = ':'.join(msg)
            # print("len: " + str(length) + " - " +
            #       str(msg) + " -> " + cmd_data)
            if msg[0] == "AA":
                full_msg_frames.append(msg)
                full_frame_counter += 1
            if msg[0] != "AA" and (full_frame_counter > 0):
                # full_msg_frames[-1] += ":"
                for byte in msg:
                    full_msg_frames[-1].append(byte)
        return full_msg_frames

    def validate_api_call(self, data_frame):
        """
        CRC error check.

        Args:
            data_frame (list): DJI RS2 frame message

        Returns:
            validated (bool): passed or not the crc check
        """

        validated = False
        check_sum = ':'.join(data_frame[-4:])
        data = ':'.join(data_frame[:-4])
        # # print(len(hex_data))
        # # print(data)
        if len(data_frame) >= 8:
            crc_obj = Checksum()
            if check_sum == crc_obj.calc_crc32(data):
                #         # print("Approved Message: " + str(hex_data))
                header = ':'.join(data_frame[:10])
                header_check_sum = ':'.join(data_frame[10:12])
                if header_check_sum == crc_obj.calc_crc16(header):
                    validated = True
        return validated

    def parse_position_response(self, data_frame):
        """
        Retrieve the position from the full DJI frame message.

        Args:
            data_frame (list): DJI RS2 frame message
        """

        pos_data = data_frame[16:-4]
        yaw = int(
            '0x' + pos_data[1] + pos_data[0], base=16)
        roll = int(
            '0x' + pos_data[3] + pos_data[2], base=16)
        pitch = int(
            '0x' + pos_data[5] + pos_data[4], base=16)
        if yaw > 1800:
            yaw -= 65538
        if roll > 1800:
            roll -= 65538
        if pitch > 1800:
            pitch -= 65538

        # Radians
        #self.yaw = yaw * 0.1 * np.pi / 180
        #self.roll = roll * 0.1 * np.pi / 180
        #self.pitch = pitch * 0.1 * np.pi / 180

        # Degrees
        self.yaw = yaw * 0.1 
        self.roll = roll * 0.1
        self.pitch = pitch * 0.1

        output = "Pitch: " + \
            str(self.pitch) + ", Yaw: " + \
            str(self.yaw) + ", Roll: " + str(self.roll)

        print(output + '\n')

    def can_callback(self, data):
        """
        Callback for the thread in charge of checking the USB-to-CAN input (receive).

        Args:
            data (list): DJI RS2 frame message
        """

        str_data = ['{:02X}'.format(i) for i in data.data]

        self.can_recv_msg_buffer.append(str_data)
        self.can_recv_msg_len_buffer.append(data.dlc)

        if len(self.can_recv_msg_buffer) > self.can_recv_buffer_len:
            self.can_recv_msg_buffer.pop(0)
            self.can_recv_msg_len_buffer.pop(0)

        full_msg_frames = self.can_buffer_to_full_frame()

        for hex_data in full_msg_frames:
            if self.validate_api_call(hex_data):
                request_data = ":".join(hex_data[12:14])
                if request_data == "0E:02":
                    # This is response data to a get position request
                    if self.DBG_LVL_1:
                        print('\nResponse received to request_current_position on gimbal RS2')
                    self.parse_position_response(hex_data)
                elif request_data == "0E:00":
                    # Parse response to control handheld gimbal position
                    1
                elif request_data == "0E:01":
                    # Parse response to Control handheld gimbal speed
                    print('\nObtained a response to setSpeedControl')
                elif request_data == "0E:03":
                    # Parse response to Set handheld gimbal limit angle
                    1
                elif request_data == "0E:04":
                    # Parse response to Obtain handheld gimbal limit angle
                    1
                elif request_data == "0E:05":
                    # Parse response to Set handheld gimbal motor stifness
                    1
                elif request_data == "0E:06":
                    # Parse response to Obtain handheld gimbal limit angle
                    1
                elif request_data == "0E:07":
                    # Parse response to Set information push of handheld gimbal parameters
                    1
                elif request_data == "0E:08":
                    # Parse response to Push handheld gimbal parameters
                    1
                elif request_data == "0E:09":
                    # Parse response to Obtain module version number
                    1
                elif request_data == "0E:0A":
                    # Parse response to Push joystick control comand
                    1
                elif request_data == "0E:0B":
                    # Parse response to Obtain handheld gimbal user parameters
                    1
                elif request_data == "0E:0C":
                    # Parse response to Set handheld gimbal user parameters
                    1
                elif request_data == "0E:0D":
                    # Parse response to Set handheld gimbal operation mode
                    1
                elif request_data == "0E:0E":
                    # Parse response to Set gimbal Recenter, Selfie, amd Follow modes
                    1
                elif request_data == "0D:00":
                    # Parse response to Third-party motion comand
                    1
                elif request_data == "0D:01":
                    # Parse response to Third-party camera status obtain comand
                    1
                else:
                    print('\n[ERROR]: error on gimbal command reception, error code: ', request_data)

    def setPosControl(self, yaw, roll, pitch, ctrl_byte=0x01, time_for_action=0x14):
        """
        Set the gimbal position by providing the yaw, roll and pitch.

        Args:
            yaw (int): yaw angle. Value should be between -1800 and 1800.
            roll (int): roll angle. value should be betweeen -1800 and 1800. However, gimbal might stop if it reachs its maximum/minimum (this) axis value.
            pitch (int): pitch angle. value should be betweeen -1800 and 1800. However, gimbal might stop if it reachs its maximum/minimum (this) axis value.
            ctrl_byte (hexadecimal, optional): Absolute or relative movement. For absolute use 0x01, while for relative use 0x00. Defaults to 0x01.
            time_for_action (hexadecimal, optional): Time it takes for the gimbal to move to desired position. Implicitly, this command controls the speed of gimbal. It is given in units of 0.1 s. For example: a value of 0x14 is 20, which means that the gimbal will take 2s (20*0.1) to reach its destination. Defaults to 0x14.

        Returns:
            True (bool): always returns the same, unless there is a raising error.
        """

        # yaw, roll, pitch in 0.1 steps (-1800,1800)
        # ctrl_byte always to 1
        # time_for_action to define speed in 0.1sec
        hex_data = struct.pack('<3h2B', yaw, roll, pitch,
                               ctrl_byte, time_for_action)

        pack_data = ['{:02X}'.format(i) for i in hex_data]
        cmd_data = ':'.join(pack_data)
        # print(cmd_data)
        cmd = self.assemble_can_msg(cmd_type='03', cmd_set='0E',
                                    cmd_id='00', data=cmd_data)

        self.send_cmd(cmd)

        # Save the current pitch and yaw values
        if ctrl_byte == 0x00:
            self.pitch = self.pitch + pitch
            self.yaw = self.yaw + yaw
        elif ctrl_byte == 0x01:
            self.pitch = pitch
            self.yaw = yaw

        if self.pitch > 1800 and self.pitch <= 3600:
            self.pitch = self.pitch - 3600 
        elif self.pitch < -1800 and self.pitch >= -3600:
            self.pitch = self.pitch + 3600

        if self.yaw > 1800 and self.yaw <= 3600:
            self.yaw = self.yaw - 3600
        elif self.yaw < -1800 and self.yaw >= -3600:
            self.yaw = self.yaw + 3600

        return True    

    def setSpeedControl(self, yaw, roll, pitch, ctrl_byte=0x80):
        """
        Sets speed for each axis of the gimbal.

        Always after seting the speed the gimbal roll is moved (strange behaviour). 

        Developer has to send a setPosControl to set again the position of the gimbal where it was previously.

        Args:
            yaw (int): yaw speed in units of 0.1 deg/s.
            roll (int): roll speed in units of 0.1 deg/s.
            pitch (int): pitch speed in units of 0.1 deg/s.
            ctrl_byte (hexadecimal, optional): check DJI SDK manual. Defaults to 0x80.

        Returns:
            True (bool): True if provided arguments are within acceptable range. False otherwise.
        """

        if -3600 <= yaw <= 3600 and -3600 <= roll <= 3600 and -3600 <= pitch <= 3600:

            hex_data = struct.pack('<3hB', yaw, roll, pitch, ctrl_byte)
            pack_data = ['{:02X}'.format(i) for i in hex_data]
            cmd_data = ':'.join(pack_data)

            cmd = self.assemble_can_msg(cmd_type='03', cmd_set='0E',
                                        cmd_id='01', data=cmd_data)
            # print('cmd---data {}'.format(cmd))
            self.send_cmd(cmd)

            return True
        else:
            return False

    def request_current_position(self):
        """
        Sends command to request the current position of the gimbal.

        Blocks thread execution for the time given by attribute ``TIME_POS_REQ`` to allow the response to be received.
        """

        hex_data = [0x01]
        pack_data = ['{:02X}'.format(i)
                     for i in hex_data]
        cmd_data = ':'.join(pack_data)
        cmd = self.assemble_can_msg(cmd_type='03', cmd_set='0E',
                                    cmd_id='02', data=cmd_data)
        self.send_cmd(cmd)

        # Time to receive response from gimbal
        time.sleep(self.TIME_POS_REQ)

    def assemble_can_msg(self, cmd_type, cmd_set, cmd_id, data):
        """
        Builds a DJI RS2 message frame based on the command to be sent.

        Args:
            cmd_type (int): see DJI R SDK Protocol and User Interface document for a description.
            cmd_set (int): see DJI R SDK Protocol and User Interface document for a description.
            cmd_id (int): see DJI R SDK Protocol and User Interface document for a description.
            data (int): see DJI R SDK Protocol and User Interface document for a description.

        Returns:
            whole_can_frame (str): parsed can frame whose fields are separated by ":".
        """

        if data == "":
            can_frame_data = "{prefix}" + \
                ":{cmd_set}:{cmd_id}".format(
                    cmd_set=cmd_set, cmd_id=cmd_id)
        else:
            can_frame_data = "{prefix}" + ":{cmd_set}:{cmd_id}:{data}".format(
                cmd_set=cmd_set, cmd_id=cmd_id, data=data)

        cmd_length = len(can_frame_data.split(":")) + 15

        seqnum = self.seq_num()
        # ic(seqnum)
        can_frame_header = "{header:02x}".format(
            header=self.header)  # SOF byte
        can_frame_header += ":" + \
            ("%04x" % (cmd_length))[2:4]  # 1st length byte
        can_frame_header += ":" + \
            ("%04x" % (cmd_length))[0:2]  # 2nd length byte
        can_frame_header += ":" + \
            "{cmd_type}".format(cmd_type=cmd_type)  # Command Type
        can_frame_header += ":" + "{enc:02x}".format(enc=self.enc)  # Encoding
        can_frame_header += ":" + \
            "{res1:02x}".format(res1=self.res1)  # Reserved 1
        can_frame_header += ":" + \
            "{res2:02x}".format(res2=self.res2)  # Reserved 2
        can_frame_header += ":" + \
            "{res3:02x}".format(res3=self.res3)  # Reserved 3
        can_frame_header += ":" + seqnum    # Sequence number

        crc_obj = Checksum()
        can_frame_header += ":" + crc_obj.calc_crc16(can_frame_header)

        # hex_seq = [eval("0x" + hex_num) for hex_num in can_frame_header.split(":")]

        whole_can_frame = can_frame_data.format(prefix=can_frame_header)
        whole_can_frame += ":" + crc_obj.calc_crc32(whole_can_frame)
        whole_can_frame = whole_can_frame.upper()
        #
        # print("Header: ", can_frame_header)
        # print("Total: ", whole_can_frame)
        return whole_can_frame

    def send_cmd(self, cmd):
        """
        Wrapper to ``send_data`` method.

        Args:
            cmd (str): command fields separated by ':'.
        """

        data = [int(i, 16) for i in cmd.split(":")]
        self.send_data(self.send_id, data)

    def send_data(self, can_id, data):
        """
        Sends a command through the can bus.

        Args:
            can_id (int): static can id.
            data (list): fields of the frame.
        """

        data_len = len(data)
        full_frame_num, left_len = divmod(data_len, self.MAX_CAN_FRAME_LEN)

        if left_len == 0:
            frame_num = full_frame_num
        else:
            frame_num = full_frame_num + 1

        data_offset = 0

        full_msg = []
        for i in range(full_frame_num):
            full_msg.append(can.Message(arbitration_id=can_id, dlc=8, data=data[data_offset:data_offset + self.MAX_CAN_FRAME_LEN], 
                                            is_extended_id=False, is_error_frame=False, is_remote_frame=False))
            data_offset += self.MAX_CAN_FRAME_LEN

        # If there is data left over, the last frame isn't 8byte long

        if left_len > 0:
            full_msg.append(can.Message(arbitration_id=can_id, dlc=left_len, data=data[data_offset:data_offset + left_len], 
                                            is_extended_id=False, is_error_frame=False, is_remote_frame=False))        

        for m in full_msg:
            try:
                self.actual_bus.send(m)
                if self.DBG_LVL_0:
                    print('\ngimbal RS2 Message sent on ', self.actual_bus.channel_info)
            except can.CanError:
                print("\n[ERROR]: gimbal RS2 Message NOT sent")
                return

    def receive(self, bus, stop_event):
        """
        Threading callback function. Defined when the thread is created. This thread listens for coming (received) can messages on a USB port. Reads 1 entry of the rx bus buffer at a time.

        Args:
            bus (can.Bus object): object pointing to the type of bus (from 'can' python package).
            stop_event (threading.Event): works as a flag to stop receiving messages.
        """

        if self.DBG_LVL_0:
            print("Start receiving messages")
        while not stop_event.is_set():
            try:
                rx_msg = bus.recv(1)
                if rx_msg is not None:    
                    self.cntBytes = self.cntBytes + 1
                    self.can_callback(rx_msg)
            except Exception as e:
                #There might be an error due to the gimbal disconnecting itself due to improper balance
                print("[DEBUG]: Error in Gimbal RS2 callback, ", e)

        if self.DBG_LVL_0:
            print("Stopped receiving messages")

    def start_thread_gimbal(self, bitrate=1000000):
        """
        Starts the thread for listening the incoming data (if any) from the gimbal.

        Args:
            bitrate (int, optional): Bitrate for the usb-to-can interface. This is a parameter inherited from can.Bus. Defaults to 1000000.
        """

        try:
            bus = can.interface.Bus(interface="pcan", channel="PCAN_USBBUS1", bitrate=bitrate)
            self.actual_bus = bus
        except Exception as e:
            print(e)
            self.GIMBAL_CONN_SUCCES = False
            print("\n[DEBUG]: Gimbal thread NOT started")
            return

        self.event_stop_thread_gimbal = threading.Event()                              
        t_receive = threading.Thread(target=self.receive, args=(self.actual_bus,self.event_stop_thread_gimbal))
        t_receive.start()

        self.GIMBAL_CONN_SUCCES = True
        print("\n[DEBUG]: Gimbal thread started")

        #self.setSpeedControl(int(self.SPEED_YAW*10), int(self.SPEED_ROLL*10), int(self.SPEED_PITCH*10))

    def stop_thread_gimbal(self):
        """
        Stops the gimbal thread by setting the threading. Event attribute created in ``start_thread_gimbal``.
        """        
        self.event_stop_thread_gimbal.set()        

__init__(speed_yaw=40, speed_pitch=40, speed_roll=40, DBG_LVL_1=False, DBG_LVL_0=False)

Initialize properties of the class, like the DJI SDK frame header, sequence number and others.

Parameters:

Name Type Description Default
speed_yaw int

speed of yaw axis in deg/s. Defaults to 40.

40
speed_pitch int

speed of pitch axis in deg/s. Defaults to 40.

40
speed_roll int

speed of roll axis in deg/s. Defaults to 40.

40
DBG_LVL_1 bool

level of verbose to show at the command line (beta). This shows less verbose than the 0 level. Defaults to False.

False
DBG_LVL_0 bool

level of verbose to show at the command line (beta). Defaults to False.

False
Source code in a2gmeasurements.py
def __init__(self, speed_yaw=40, speed_pitch=40, speed_roll=40, DBG_LVL_1=False, DBG_LVL_0=False):
    """
    Initialize properties of the class, like the DJI SDK frame header, sequence number and others.

    Args:
        speed_yaw (int, optional): speed of yaw axis in deg/s. Defaults to 40.
        speed_pitch (int, optional): speed of pitch axis in deg/s. Defaults to 40.
        speed_roll (int, optional): speed of roll axis in deg/s. Defaults to 40.
        DBG_LVL_1 (bool, optional): level of verbose to show at the command line (beta). This shows less verbose than the 0 level. Defaults to False.
        DBG_LVL_0 (bool, optional): level of verbose to show at the command line (beta). Defaults to False.
    """      

    self.header = 0xAA
    self.enc = 0x00
    self.res1 = 0x00
    self.res2 = 0x00
    self.res3 = 0x00
    self.seq = 0x0002

    self.send_id = 0x223
    self.recv_id = 0x222

    self.MAX_CAN_FRAME_LEN = 8

    self.can_recv_msg_buffer = []
    self.can_recv_msg_len_buffer = []
    self.can_recv_buffer_len = 10

    self.roll = 0.0
    self.pitch = 0.0
    self.yaw = 0.0

    self.SPEED_YAW =  speed_yaw # deg/s
    self.SPEED_PITCH =  speed_pitch # deg/s
    self.SPEED_ROLL =  speed_roll # deg/s

    self.MAIN_LOOP_STOP = True
    self.keyboard_set_flag = False
    self.keyboard_buff = []

    self.cntBytes = 0
    self.TIME_POS_REQ = 0.01
    self.DBG_LVL_0 = DBG_LVL_0
    self.DBG_LVL_1 = DBG_LVL_1

    self.TIME2MOVE_180_DEG_YAW = 180/speed_yaw
    self.TIME2MOVE_180_DEG_YAW = 180/speed_pitch
    self.TIME2MOVE_180_DEG_YAW = 180/speed_roll

assemble_can_msg(cmd_type, cmd_set, cmd_id, data)

Builds a DJI RS2 message frame based on the command to be sent.

Parameters:

Name Type Description Default
cmd_type int

see DJI R SDK Protocol and User Interface document for a description.

required
cmd_set int

see DJI R SDK Protocol and User Interface document for a description.

required
cmd_id int

see DJI R SDK Protocol and User Interface document for a description.

required
data int

see DJI R SDK Protocol and User Interface document for a description.

required

Returns:

Name Type Description
whole_can_frame str

parsed can frame whose fields are separated by ":".

Source code in a2gmeasurements.py
def assemble_can_msg(self, cmd_type, cmd_set, cmd_id, data):
    """
    Builds a DJI RS2 message frame based on the command to be sent.

    Args:
        cmd_type (int): see DJI R SDK Protocol and User Interface document for a description.
        cmd_set (int): see DJI R SDK Protocol and User Interface document for a description.
        cmd_id (int): see DJI R SDK Protocol and User Interface document for a description.
        data (int): see DJI R SDK Protocol and User Interface document for a description.

    Returns:
        whole_can_frame (str): parsed can frame whose fields are separated by ":".
    """

    if data == "":
        can_frame_data = "{prefix}" + \
            ":{cmd_set}:{cmd_id}".format(
                cmd_set=cmd_set, cmd_id=cmd_id)
    else:
        can_frame_data = "{prefix}" + ":{cmd_set}:{cmd_id}:{data}".format(
            cmd_set=cmd_set, cmd_id=cmd_id, data=data)

    cmd_length = len(can_frame_data.split(":")) + 15

    seqnum = self.seq_num()
    # ic(seqnum)
    can_frame_header = "{header:02x}".format(
        header=self.header)  # SOF byte
    can_frame_header += ":" + \
        ("%04x" % (cmd_length))[2:4]  # 1st length byte
    can_frame_header += ":" + \
        ("%04x" % (cmd_length))[0:2]  # 2nd length byte
    can_frame_header += ":" + \
        "{cmd_type}".format(cmd_type=cmd_type)  # Command Type
    can_frame_header += ":" + "{enc:02x}".format(enc=self.enc)  # Encoding
    can_frame_header += ":" + \
        "{res1:02x}".format(res1=self.res1)  # Reserved 1
    can_frame_header += ":" + \
        "{res2:02x}".format(res2=self.res2)  # Reserved 2
    can_frame_header += ":" + \
        "{res3:02x}".format(res3=self.res3)  # Reserved 3
    can_frame_header += ":" + seqnum    # Sequence number

    crc_obj = Checksum()
    can_frame_header += ":" + crc_obj.calc_crc16(can_frame_header)

    # hex_seq = [eval("0x" + hex_num) for hex_num in can_frame_header.split(":")]

    whole_can_frame = can_frame_data.format(prefix=can_frame_header)
    whole_can_frame += ":" + crc_obj.calc_crc32(whole_can_frame)
    whole_can_frame = whole_can_frame.upper()
    #
    # print("Header: ", can_frame_header)
    # print("Total: ", whole_can_frame)
    return whole_can_frame

can_buffer_to_full_frame()

Parse the full DJI R frame message from the can buffer.

Its fields are explained in the DJI R SDK Protocol and User Interface.

Returns:

Name Type Description
full_msg_frames list

a full message frame

Source code in a2gmeasurements.py
def can_buffer_to_full_frame(self):
    """
    Parse the full DJI R frame message from the can buffer.

    Its fields are explained in the DJI R SDK Protocol and User Interface.

    Returns:
        full_msg_frames (list): a full message frame
    """

    full_msg_frames = []
    full_frame_counter = 0
    for i in range(len(self.can_recv_msg_buffer)):
        msg = self.can_recv_msg_buffer[i]
        length = self.can_recv_msg_len_buffer[i]
        msg = msg[:length]
        cmd_data = ':'.join(msg)
        # print("len: " + str(length) + " - " +
        #       str(msg) + " -> " + cmd_data)
        if msg[0] == "AA":
            full_msg_frames.append(msg)
            full_frame_counter += 1
        if msg[0] != "AA" and (full_frame_counter > 0):
            # full_msg_frames[-1] += ":"
            for byte in msg:
                full_msg_frames[-1].append(byte)
    return full_msg_frames

can_callback(data)

Callback for the thread in charge of checking the USB-to-CAN input (receive).

Parameters:

Name Type Description Default
data list

DJI RS2 frame message

required
Source code in a2gmeasurements.py
def can_callback(self, data):
    """
    Callback for the thread in charge of checking the USB-to-CAN input (receive).

    Args:
        data (list): DJI RS2 frame message
    """

    str_data = ['{:02X}'.format(i) for i in data.data]

    self.can_recv_msg_buffer.append(str_data)
    self.can_recv_msg_len_buffer.append(data.dlc)

    if len(self.can_recv_msg_buffer) > self.can_recv_buffer_len:
        self.can_recv_msg_buffer.pop(0)
        self.can_recv_msg_len_buffer.pop(0)

    full_msg_frames = self.can_buffer_to_full_frame()

    for hex_data in full_msg_frames:
        if self.validate_api_call(hex_data):
            request_data = ":".join(hex_data[12:14])
            if request_data == "0E:02":
                # This is response data to a get position request
                if self.DBG_LVL_1:
                    print('\nResponse received to request_current_position on gimbal RS2')
                self.parse_position_response(hex_data)
            elif request_data == "0E:00":
                # Parse response to control handheld gimbal position
                1
            elif request_data == "0E:01":
                # Parse response to Control handheld gimbal speed
                print('\nObtained a response to setSpeedControl')
            elif request_data == "0E:03":
                # Parse response to Set handheld gimbal limit angle
                1
            elif request_data == "0E:04":
                # Parse response to Obtain handheld gimbal limit angle
                1
            elif request_data == "0E:05":
                # Parse response to Set handheld gimbal motor stifness
                1
            elif request_data == "0E:06":
                # Parse response to Obtain handheld gimbal limit angle
                1
            elif request_data == "0E:07":
                # Parse response to Set information push of handheld gimbal parameters
                1
            elif request_data == "0E:08":
                # Parse response to Push handheld gimbal parameters
                1
            elif request_data == "0E:09":
                # Parse response to Obtain module version number
                1
            elif request_data == "0E:0A":
                # Parse response to Push joystick control comand
                1
            elif request_data == "0E:0B":
                # Parse response to Obtain handheld gimbal user parameters
                1
            elif request_data == "0E:0C":
                # Parse response to Set handheld gimbal user parameters
                1
            elif request_data == "0E:0D":
                # Parse response to Set handheld gimbal operation mode
                1
            elif request_data == "0E:0E":
                # Parse response to Set gimbal Recenter, Selfie, amd Follow modes
                1
            elif request_data == "0D:00":
                # Parse response to Third-party motion comand
                1
            elif request_data == "0D:01":
                # Parse response to Third-party camera status obtain comand
                1
            else:
                print('\n[ERROR]: error on gimbal command reception, error code: ', request_data)

parse_position_response(data_frame)

Retrieve the position from the full DJI frame message.

Parameters:

Name Type Description Default
data_frame list

DJI RS2 frame message

required
Source code in a2gmeasurements.py
def parse_position_response(self, data_frame):
    """
    Retrieve the position from the full DJI frame message.

    Args:
        data_frame (list): DJI RS2 frame message
    """

    pos_data = data_frame[16:-4]
    yaw = int(
        '0x' + pos_data[1] + pos_data[0], base=16)
    roll = int(
        '0x' + pos_data[3] + pos_data[2], base=16)
    pitch = int(
        '0x' + pos_data[5] + pos_data[4], base=16)
    if yaw > 1800:
        yaw -= 65538
    if roll > 1800:
        roll -= 65538
    if pitch > 1800:
        pitch -= 65538

    # Radians
    #self.yaw = yaw * 0.1 * np.pi / 180
    #self.roll = roll * 0.1 * np.pi / 180
    #self.pitch = pitch * 0.1 * np.pi / 180

    # Degrees
    self.yaw = yaw * 0.1 
    self.roll = roll * 0.1
    self.pitch = pitch * 0.1

    output = "Pitch: " + \
        str(self.pitch) + ", Yaw: " + \
        str(self.yaw) + ", Roll: " + str(self.roll)

    print(output + '\n')

receive(bus, stop_event)

Threading callback function. Defined when the thread is created. This thread listens for coming (received) can messages on a USB port. Reads 1 entry of the rx bus buffer at a time.

Parameters:

Name Type Description Default
bus can.Bus object

object pointing to the type of bus (from 'can' python package).

required
stop_event Event

works as a flag to stop receiving messages.

required
Source code in a2gmeasurements.py
def receive(self, bus, stop_event):
    """
    Threading callback function. Defined when the thread is created. This thread listens for coming (received) can messages on a USB port. Reads 1 entry of the rx bus buffer at a time.

    Args:
        bus (can.Bus object): object pointing to the type of bus (from 'can' python package).
        stop_event (threading.Event): works as a flag to stop receiving messages.
    """

    if self.DBG_LVL_0:
        print("Start receiving messages")
    while not stop_event.is_set():
        try:
            rx_msg = bus.recv(1)
            if rx_msg is not None:    
                self.cntBytes = self.cntBytes + 1
                self.can_callback(rx_msg)
        except Exception as e:
            #There might be an error due to the gimbal disconnecting itself due to improper balance
            print("[DEBUG]: Error in Gimbal RS2 callback, ", e)

    if self.DBG_LVL_0:
        print("Stopped receiving messages")

request_current_position()

Sends command to request the current position of the gimbal.

Blocks thread execution for the time given by attribute TIME_POS_REQ to allow the response to be received.

Source code in a2gmeasurements.py
def request_current_position(self):
    """
    Sends command to request the current position of the gimbal.

    Blocks thread execution for the time given by attribute ``TIME_POS_REQ`` to allow the response to be received.
    """

    hex_data = [0x01]
    pack_data = ['{:02X}'.format(i)
                 for i in hex_data]
    cmd_data = ':'.join(pack_data)
    cmd = self.assemble_can_msg(cmd_type='03', cmd_set='0E',
                                cmd_id='02', data=cmd_data)
    self.send_cmd(cmd)

    # Time to receive response from gimbal
    time.sleep(self.TIME_POS_REQ)

send_cmd(cmd)

Wrapper to send_data method.

Parameters:

Name Type Description Default
cmd str

command fields separated by ':'.

required
Source code in a2gmeasurements.py
def send_cmd(self, cmd):
    """
    Wrapper to ``send_data`` method.

    Args:
        cmd (str): command fields separated by ':'.
    """

    data = [int(i, 16) for i in cmd.split(":")]
    self.send_data(self.send_id, data)

send_data(can_id, data)

Sends a command through the can bus.

Parameters:

Name Type Description Default
can_id int

static can id.

required
data list

fields of the frame.

required
Source code in a2gmeasurements.py
def send_data(self, can_id, data):
    """
    Sends a command through the can bus.

    Args:
        can_id (int): static can id.
        data (list): fields of the frame.
    """

    data_len = len(data)
    full_frame_num, left_len = divmod(data_len, self.MAX_CAN_FRAME_LEN)

    if left_len == 0:
        frame_num = full_frame_num
    else:
        frame_num = full_frame_num + 1

    data_offset = 0

    full_msg = []
    for i in range(full_frame_num):
        full_msg.append(can.Message(arbitration_id=can_id, dlc=8, data=data[data_offset:data_offset + self.MAX_CAN_FRAME_LEN], 
                                        is_extended_id=False, is_error_frame=False, is_remote_frame=False))
        data_offset += self.MAX_CAN_FRAME_LEN

    # If there is data left over, the last frame isn't 8byte long

    if left_len > 0:
        full_msg.append(can.Message(arbitration_id=can_id, dlc=left_len, data=data[data_offset:data_offset + left_len], 
                                        is_extended_id=False, is_error_frame=False, is_remote_frame=False))        

    for m in full_msg:
        try:
            self.actual_bus.send(m)
            if self.DBG_LVL_0:
                print('\ngimbal RS2 Message sent on ', self.actual_bus.channel_info)
        except can.CanError:
            print("\n[ERROR]: gimbal RS2 Message NOT sent")
            return

seq_num()

Updates the sequence number of the gimbal data.

Returns:

Name Type Description
seq_str str

number in hexadecimal split by a :.

Source code in a2gmeasurements.py
def seq_num(self):
    """
    Updates the sequence number of the gimbal data.

    Returns:
        seq_str (str): number in hexadecimal split by a ``:``.
    """

    if self.seq >= 0xFFFD:
        self.seq = 0x0002
    self.seq += 1
    # Seq_Init_Data = 0x1122
    seq_str = "%04x" % self.seq
    return seq_str[2:] + ":" + seq_str[0:2]

setPosControl(yaw, roll, pitch, ctrl_byte=1, time_for_action=20)

Set the gimbal position by providing the yaw, roll and pitch.

Parameters:

Name Type Description Default
yaw int

yaw angle. Value should be between -1800 and 1800.

required
roll int

roll angle. value should be betweeen -1800 and 1800. However, gimbal might stop if it reachs its maximum/minimum (this) axis value.

required
pitch int

pitch angle. value should be betweeen -1800 and 1800. However, gimbal might stop if it reachs its maximum/minimum (this) axis value.

required
ctrl_byte hexadecimal

Absolute or relative movement. For absolute use 0x01, while for relative use 0x00. Defaults to 0x01.

1
time_for_action hexadecimal

Time it takes for the gimbal to move to desired position. Implicitly, this command controls the speed of gimbal. It is given in units of 0.1 s. For example: a value of 0x14 is 20, which means that the gimbal will take 2s (20*0.1) to reach its destination. Defaults to 0x14.

20

Returns:

Name Type Description
True bool

always returns the same, unless there is a raising error.

Source code in a2gmeasurements.py
def setPosControl(self, yaw, roll, pitch, ctrl_byte=0x01, time_for_action=0x14):
    """
    Set the gimbal position by providing the yaw, roll and pitch.

    Args:
        yaw (int): yaw angle. Value should be between -1800 and 1800.
        roll (int): roll angle. value should be betweeen -1800 and 1800. However, gimbal might stop if it reachs its maximum/minimum (this) axis value.
        pitch (int): pitch angle. value should be betweeen -1800 and 1800. However, gimbal might stop if it reachs its maximum/minimum (this) axis value.
        ctrl_byte (hexadecimal, optional): Absolute or relative movement. For absolute use 0x01, while for relative use 0x00. Defaults to 0x01.
        time_for_action (hexadecimal, optional): Time it takes for the gimbal to move to desired position. Implicitly, this command controls the speed of gimbal. It is given in units of 0.1 s. For example: a value of 0x14 is 20, which means that the gimbal will take 2s (20*0.1) to reach its destination. Defaults to 0x14.

    Returns:
        True (bool): always returns the same, unless there is a raising error.
    """

    # yaw, roll, pitch in 0.1 steps (-1800,1800)
    # ctrl_byte always to 1
    # time_for_action to define speed in 0.1sec
    hex_data = struct.pack('<3h2B', yaw, roll, pitch,
                           ctrl_byte, time_for_action)

    pack_data = ['{:02X}'.format(i) for i in hex_data]
    cmd_data = ':'.join(pack_data)
    # print(cmd_data)
    cmd = self.assemble_can_msg(cmd_type='03', cmd_set='0E',
                                cmd_id='00', data=cmd_data)

    self.send_cmd(cmd)

    # Save the current pitch and yaw values
    if ctrl_byte == 0x00:
        self.pitch = self.pitch + pitch
        self.yaw = self.yaw + yaw
    elif ctrl_byte == 0x01:
        self.pitch = pitch
        self.yaw = yaw

    if self.pitch > 1800 and self.pitch <= 3600:
        self.pitch = self.pitch - 3600 
    elif self.pitch < -1800 and self.pitch >= -3600:
        self.pitch = self.pitch + 3600

    if self.yaw > 1800 and self.yaw <= 3600:
        self.yaw = self.yaw - 3600
    elif self.yaw < -1800 and self.yaw >= -3600:
        self.yaw = self.yaw + 3600

    return True    

setSpeedControl(yaw, roll, pitch, ctrl_byte=128)

Sets speed for each axis of the gimbal.

Always after seting the speed the gimbal roll is moved (strange behaviour).

Developer has to send a setPosControl to set again the position of the gimbal where it was previously.

Parameters:

Name Type Description Default
yaw int

yaw speed in units of 0.1 deg/s.

required
roll int

roll speed in units of 0.1 deg/s.

required
pitch int

pitch speed in units of 0.1 deg/s.

required
ctrl_byte hexadecimal

check DJI SDK manual. Defaults to 0x80.

128

Returns:

Name Type Description
True bool

True if provided arguments are within acceptable range. False otherwise.

Source code in a2gmeasurements.py
def setSpeedControl(self, yaw, roll, pitch, ctrl_byte=0x80):
    """
    Sets speed for each axis of the gimbal.

    Always after seting the speed the gimbal roll is moved (strange behaviour). 

    Developer has to send a setPosControl to set again the position of the gimbal where it was previously.

    Args:
        yaw (int): yaw speed in units of 0.1 deg/s.
        roll (int): roll speed in units of 0.1 deg/s.
        pitch (int): pitch speed in units of 0.1 deg/s.
        ctrl_byte (hexadecimal, optional): check DJI SDK manual. Defaults to 0x80.

    Returns:
        True (bool): True if provided arguments are within acceptable range. False otherwise.
    """

    if -3600 <= yaw <= 3600 and -3600 <= roll <= 3600 and -3600 <= pitch <= 3600:

        hex_data = struct.pack('<3hB', yaw, roll, pitch, ctrl_byte)
        pack_data = ['{:02X}'.format(i) for i in hex_data]
        cmd_data = ':'.join(pack_data)

        cmd = self.assemble_can_msg(cmd_type='03', cmd_set='0E',
                                    cmd_id='01', data=cmd_data)
        # print('cmd---data {}'.format(cmd))
        self.send_cmd(cmd)

        return True
    else:
        return False

start_thread_gimbal(bitrate=1000000)

Starts the thread for listening the incoming data (if any) from the gimbal.

Parameters:

Name Type Description Default
bitrate int

Bitrate for the usb-to-can interface. This is a parameter inherited from can.Bus. Defaults to 1000000.

1000000
Source code in a2gmeasurements.py
def start_thread_gimbal(self, bitrate=1000000):
    """
    Starts the thread for listening the incoming data (if any) from the gimbal.

    Args:
        bitrate (int, optional): Bitrate for the usb-to-can interface. This is a parameter inherited from can.Bus. Defaults to 1000000.
    """

    try:
        bus = can.interface.Bus(interface="pcan", channel="PCAN_USBBUS1", bitrate=bitrate)
        self.actual_bus = bus
    except Exception as e:
        print(e)
        self.GIMBAL_CONN_SUCCES = False
        print("\n[DEBUG]: Gimbal thread NOT started")
        return

    self.event_stop_thread_gimbal = threading.Event()                              
    t_receive = threading.Thread(target=self.receive, args=(self.actual_bus,self.event_stop_thread_gimbal))
    t_receive.start()

    self.GIMBAL_CONN_SUCCES = True
    print("\n[DEBUG]: Gimbal thread started")

stop_thread_gimbal()

Stops the gimbal thread by setting the threading. Event attribute created in start_thread_gimbal.

Source code in a2gmeasurements.py
def stop_thread_gimbal(self):
    """
    Stops the gimbal thread by setting the threading. Event attribute created in ``start_thread_gimbal``.
    """        
    self.event_stop_thread_gimbal.set()        

validate_api_call(data_frame)

CRC error check.

Parameters:

Name Type Description Default
data_frame list

DJI RS2 frame message

required

Returns:

Name Type Description
validated bool

passed or not the crc check

Source code in a2gmeasurements.py
def validate_api_call(self, data_frame):
    """
    CRC error check.

    Args:
        data_frame (list): DJI RS2 frame message

    Returns:
        validated (bool): passed or not the crc check
    """

    validated = False
    check_sum = ':'.join(data_frame[-4:])
    data = ':'.join(data_frame[:-4])
    # # print(len(hex_data))
    # # print(data)
    if len(data_frame) >= 8:
        crc_obj = Checksum()
        if check_sum == crc_obj.calc_crc32(data):
            #         # print("Approved Message: " + str(hex_data))
            header = ':'.join(data_frame[:10])
            header_check_sum = ':'.join(data_frame[10:12])
            if header_check_sum == crc_obj.calc_crc16(header):
                validated = True
    return validated