Skip to content

TMotor

CANManagerServo

TMotor servo mode CAN communication manager

Source code in opensourceleg/actuators/tmotor.py
class CANManagerServo:
    """TMotor servo mode CAN communication manager"""

    _instance: Optional["CANManagerServo"] = None
    debug: bool = False
    _initialized: bool = False

    def __new__(cls) -> "CANManagerServo":
        if not cls._instance:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        if hasattr(self, "_initialized") and self._initialized:
            return

        LOGGER.info("Initializing CAN Manager for TMotor Servo Mode")
        try:
            # NOTE: CAN interface must be configured before running this code.
            # Please run the following commands before using TMotor servo mode:
            # sudo /sbin/ip link set can0 down
            # sudo /sbin/ip link set can0 up type can bitrate 1000000
            # sudo ifconfig can0 txqueuelen 1000

            self.bus = can.interface.Bus(channel="can0", bustype="socketcan")
            self.notifier = can.Notifier(bus=self.bus, listeners=[])

            LOGGER.info(f"CAN bus connected: {self.bus}")
            self._initialized = True

        except Exception as e:
            LOGGER.error(f"CAN bus initialization failed: {e}")
            LOGGER.error("Please ensure CAN interface is configured. Run:")
            LOGGER.error("sudo /sbin/ip link set can0 down")
            LOGGER.error("sudo /sbin/ip link set can0 up type can bitrate 1000000")
            LOGGER.error("sudo ifconfig can0 txqueuelen 1000")
            raise RuntimeError("CAN bus initialization failed. Please configure CAN interface first.") from e

    def __del__(self) -> None:
        try:
            if hasattr(self, "bus"):
                self.bus.shutdown()
        except Exception as e:
            LOGGER.warning(f"Error shutting down CAN bus: {e}")

    def send_message(self, motor_id: int, data: list, data_len: int) -> None:
        """Send CAN message"""
        if data_len > 8:
            raise ValueError(f"Data too long in message for motor {motor_id}")

        if self.debug:
            LOGGER.info(f'ID: {hex(motor_id)} Data: [{", ".join(hex(d) for d in data)}]')

        message = can.Message(arbitration_id=motor_id, data=data, is_extended_id=True)

        try:
            self.bus.send(message)
        except can.CanError as e:
            LOGGER.error(f"Failed to send CAN message: {e}")

    def power_on(self, motor_id: int) -> None:
        """Send power on command"""
        self.send_message(motor_id, [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFC], 8)

    def power_off(self, motor_id: int) -> None:
        """Send power off command"""
        self.send_message(motor_id, [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD], 8)

    def set_current(self, controller_id: int, current: float) -> None:
        """Send current control command"""
        current_protocol = int(current * 1000.0)  # Convert to protocol units
        buffer = self._pack_int32(current_protocol)
        message_id = (CAN_PACKET_ID["SET_CURRENT"] << 8) | controller_id
        self.send_message(message_id, buffer, len(buffer))

    def set_velocity(self, controller_id: int, velocity: float) -> None:
        """Send velocity control command"""
        buffer = self._pack_int32(int(velocity))
        message_id = (CAN_PACKET_ID["SET_RPM"] << 8) | controller_id
        self.send_message(message_id, buffer, len(buffer))

    def set_position(self, controller_id: int, position: float) -> None:
        """Send position control command"""
        # NOTE(IMPORTANT): TMotor spec uses 1,000,000 scale (0.000001° resolution) per documentation
        # Current implementation uses 10 scale (0.1° resolution) for simplicity
        # To enable high-precision position control, change to: int(position * 1000000.0)
        buffer = self._pack_int32(int(position * 10.0))  # 0.1 degree resolution
        message_id = (CAN_PACKET_ID["SET_POS"] << 8) | controller_id
        self.send_message(message_id, buffer, len(buffer))

    def set_origin(self, controller_id: int, mode: int = 1) -> None:
        """Set motor origin"""
        buffer = [mode]
        message_id = (CAN_PACKET_ID["SET_ORIGIN_HERE"] << 8) | controller_id
        self.send_message(message_id, buffer, len(buffer))

    def set_control_mode(self, controller_id: int, mode: int) -> None:
        """Send control mode switch command"""
        buffer = [mode]
        message_id = (7 << 8) | controller_id  # Assume packet_id=7 for mode switching
        self.send_message(message_id, buffer, len(buffer))

    @staticmethod
    def _pack_int32(number: int) -> list:
        """Pack 32-bit integer to byte list (big-endian, MSB first)"""
        if number < 0:
            number = (1 << 32) + number
        # TMotor expects big-endian (MSB first) per documentation
        return [(number >> 24) & 0xFF, (number >> 16) & 0xFF, (number >> 8) & 0xFF, number & 0xFF]

    def parse_servo_message(self, data: bytes) -> ServoMotorState:
        """Parse servo message to motor state"""
        if len(data) < 8:
            raise ValueError(f"Invalid message length: {len(data)}")

        # Fix endian issue: TMotor packs as big-endian (Data[n]<<8 | Data[n+1])
        pos_int = int.from_bytes(data[0:2], byteorder="big", signed=True)
        spd_int = int.from_bytes(data[2:4], byteorder="big", signed=True)
        cur_int = int.from_bytes(data[4:6], byteorder="big", signed=True)

        motor_pos = float(pos_int * 0.1)  # position (degrees)
        motor_spd = float(spd_int * 10.0)  # velocity (ERPM)
        motor_cur = float(cur_int * 0.01)  # current (amps)
        motor_temp = float(data[6])  # temperature (celsius)
        motor_error = int(data[7])  # error code

        return ServoMotorState(motor_pos, motor_spd, motor_cur, motor_temp, motor_error)

    def add_motor_listener(self, motor: "TMotorServoActuator") -> None:
        """Add motor listener"""
        listener = MotorListener(self, motor)
        self.notifier.add_listener(listener)

    def enable_debug(self, enable: bool = True) -> None:
        """Enable/disable debug mode"""
        self.debug = enable
        if enable:
            LOGGER.info("CAN debug mode enabled")

add_motor_listener(motor)

Add motor listener

Source code in opensourceleg/actuators/tmotor.py
def add_motor_listener(self, motor: "TMotorServoActuator") -> None:
    """Add motor listener"""
    listener = MotorListener(self, motor)
    self.notifier.add_listener(listener)

enable_debug(enable=True)

Enable/disable debug mode

Source code in opensourceleg/actuators/tmotor.py
def enable_debug(self, enable: bool = True) -> None:
    """Enable/disable debug mode"""
    self.debug = enable
    if enable:
        LOGGER.info("CAN debug mode enabled")

parse_servo_message(data)

Parse servo message to motor state

Source code in opensourceleg/actuators/tmotor.py
def parse_servo_message(self, data: bytes) -> ServoMotorState:
    """Parse servo message to motor state"""
    if len(data) < 8:
        raise ValueError(f"Invalid message length: {len(data)}")

    # Fix endian issue: TMotor packs as big-endian (Data[n]<<8 | Data[n+1])
    pos_int = int.from_bytes(data[0:2], byteorder="big", signed=True)
    spd_int = int.from_bytes(data[2:4], byteorder="big", signed=True)
    cur_int = int.from_bytes(data[4:6], byteorder="big", signed=True)

    motor_pos = float(pos_int * 0.1)  # position (degrees)
    motor_spd = float(spd_int * 10.0)  # velocity (ERPM)
    motor_cur = float(cur_int * 0.01)  # current (amps)
    motor_temp = float(data[6])  # temperature (celsius)
    motor_error = int(data[7])  # error code

    return ServoMotorState(motor_pos, motor_spd, motor_cur, motor_temp, motor_error)

power_off(motor_id)

Send power off command

Source code in opensourceleg/actuators/tmotor.py
def power_off(self, motor_id: int) -> None:
    """Send power off command"""
    self.send_message(motor_id, [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFD], 8)

power_on(motor_id)

Send power on command

Source code in opensourceleg/actuators/tmotor.py
def power_on(self, motor_id: int) -> None:
    """Send power on command"""
    self.send_message(motor_id, [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFC], 8)

send_message(motor_id, data, data_len)

Send CAN message

Source code in opensourceleg/actuators/tmotor.py
def send_message(self, motor_id: int, data: list, data_len: int) -> None:
    """Send CAN message"""
    if data_len > 8:
        raise ValueError(f"Data too long in message for motor {motor_id}")

    if self.debug:
        LOGGER.info(f'ID: {hex(motor_id)} Data: [{", ".join(hex(d) for d in data)}]')

    message = can.Message(arbitration_id=motor_id, data=data, is_extended_id=True)

    try:
        self.bus.send(message)
    except can.CanError as e:
        LOGGER.error(f"Failed to send CAN message: {e}")

set_control_mode(controller_id, mode)

Send control mode switch command

Source code in opensourceleg/actuators/tmotor.py
def set_control_mode(self, controller_id: int, mode: int) -> None:
    """Send control mode switch command"""
    buffer = [mode]
    message_id = (7 << 8) | controller_id  # Assume packet_id=7 for mode switching
    self.send_message(message_id, buffer, len(buffer))

set_current(controller_id, current)

Send current control command

Source code in opensourceleg/actuators/tmotor.py
def set_current(self, controller_id: int, current: float) -> None:
    """Send current control command"""
    current_protocol = int(current * 1000.0)  # Convert to protocol units
    buffer = self._pack_int32(current_protocol)
    message_id = (CAN_PACKET_ID["SET_CURRENT"] << 8) | controller_id
    self.send_message(message_id, buffer, len(buffer))

set_origin(controller_id, mode=1)

Set motor origin

Source code in opensourceleg/actuators/tmotor.py
def set_origin(self, controller_id: int, mode: int = 1) -> None:
    """Set motor origin"""
    buffer = [mode]
    message_id = (CAN_PACKET_ID["SET_ORIGIN_HERE"] << 8) | controller_id
    self.send_message(message_id, buffer, len(buffer))

set_position(controller_id, position)

Send position control command

Source code in opensourceleg/actuators/tmotor.py
def set_position(self, controller_id: int, position: float) -> None:
    """Send position control command"""
    # NOTE(IMPORTANT): TMotor spec uses 1,000,000 scale (0.000001° resolution) per documentation
    # Current implementation uses 10 scale (0.1° resolution) for simplicity
    # To enable high-precision position control, change to: int(position * 1000000.0)
    buffer = self._pack_int32(int(position * 10.0))  # 0.1 degree resolution
    message_id = (CAN_PACKET_ID["SET_POS"] << 8) | controller_id
    self.send_message(message_id, buffer, len(buffer))

set_velocity(controller_id, velocity)

Send velocity control command

Source code in opensourceleg/actuators/tmotor.py
def set_velocity(self, controller_id: int, velocity: float) -> None:
    """Send velocity control command"""
    buffer = self._pack_int32(int(velocity))
    message_id = (CAN_PACKET_ID["SET_RPM"] << 8) | controller_id
    self.send_message(message_id, buffer, len(buffer))

MotorListener

Bases: Listener

CAN message listener

Source code in opensourceleg/actuators/tmotor.py
class MotorListener(can.Listener):
    """CAN message listener"""

    def __init__(self, canman: CANManagerServo, motor: "TMotorServoActuator") -> None:
        self.canman = canman
        self.motor = motor

    def on_message_received(self, msg: can.Message) -> None:
        """Handle received CAN message"""
        data = bytes(msg.data)
        motor_id = msg.arbitration_id & 0x00000FF
        if motor_id == self.motor.motor_id:
            self.motor._update_state_async(self.canman.parse_servo_message(data))

on_message_received(msg)

Handle received CAN message

Source code in opensourceleg/actuators/tmotor.py
def on_message_received(self, msg: can.Message) -> None:
    """Handle received CAN message"""
    data = bytes(msg.data)
    motor_id = msg.arbitration_id & 0x00000FF
    if motor_id == self.motor.motor_id:
        self.motor._update_state_async(self.canman.parse_servo_message(data))

ServoControlMode

Bases: Enum

TMotor servo control modes

Source code in opensourceleg/actuators/tmotor.py
class ServoControlMode(Enum):
    """TMotor servo control modes"""

    POSITION = 4
    VELOCITY = 3
    CURRENT = 1
    IDLE = 7

ServoMotorState dataclass

Motor state data structure

Source code in opensourceleg/actuators/tmotor.py
@dataclass
class ServoMotorState:
    """Motor state data structure"""

    position: float = 0.0  # degrees
    velocity: float = 0.0  # ERPM
    current: float = 0.0  # amps
    temperature: float = 0.0  # celsius
    error: int = 0

TMotorServoActuator

Bases: ActuatorBase

TMotor servo mode actuator for AK series motors.

Before using this actuator, the CAN interface must be configured:

sudo /sbin/ip link set can0 down sudo /sbin/ip link set can0 up type can bitrate 1000000

For detailed setup instructions, see docs/tutorials/actuators/tmotor_servo_setup.md

Example

with TMotorServoActuator(motor_type="AK80-9", motor_id=1) as motor: ... motor.set_control_mode(CONTROL_MODES.CURRENT) ... motor.set_output_torque(2.5) ... motor.update()

Source code in opensourceleg/actuators/tmotor.py
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
class TMotorServoActuator(ActuatorBase):
    """
    TMotor servo mode actuator for AK series motors.

    Important: Before using this actuator, the CAN interface must be configured:
        sudo /sbin/ip link set can0 down
        sudo /sbin/ip link set can0 up type can bitrate 1000000

    For detailed setup instructions, see docs/tutorials/actuators/tmotor_servo_setup.md

    Example:
        >>> with TMotorServoActuator(motor_type="AK80-9", motor_id=1) as motor:
        ...     motor.set_control_mode(CONTROL_MODES.CURRENT)
        ...     motor.set_output_torque(2.5)
        ...     motor.update()
    """

    def __init__(
        self,
        tag: str = "TMotorServoActuator",
        motor_type: str = "AK80-9",
        motor_id: int = 104,
        gear_ratio: float = 1.0,
        frequency: int = 1000,
        offline: bool = False,
        max_temperature: float = 80.0,
        current_mode: str = "driver",
        **kwargs: Any,
    ) -> None:
        """
        Initialize TMotor servo actuator

        Args:
            tag: actuator identifier
            motor_type: motor model (AK80-9, AK10-9)
            motor_id: CAN ID
            gear_ratio: gear ratio
            frequency: control frequency Hz
            offline: offline mode
            max_temperature: maximum temperature
            current_mode: current convention ('driver', 'amplitude-invariant', 'power-invariant')
                - 'driver': use driver's native current (default)
                - 'amplitude-invariant': true phase current convention
                - 'power-invariant': power-invariant current convention
        """
        # Validate motor type
        if motor_type not in TMOTOR_MODELS:
            raise ValueError(f"Unsupported motor type: {motor_type}")

        # Validate current mode
        if current_mode not in ["driver", "amplitude-invariant", "power-invariant"]:
            raise ValueError(
                f"Invalid current_mode: {current_mode}. Must be 'driver', 'amplitude-invariant', or 'power-invariant'"
            )

        super().__init__(
            tag=tag,
            gear_ratio=gear_ratio,
            motor_constants=TMOTOR_SERVO_CONSTANTS,
            frequency=frequency,
            offline=offline,
            **kwargs,
        )

        # Motor configuration
        self.motor_type = motor_type
        self.motor_id = motor_id
        self.max_temperature = max_temperature
        self._motor_params = TMOTOR_MODELS[motor_type]
        self.current_mode = current_mode

        # Current conversion factors
        # Physical: I_drv (line current) = K * I_user
        # Code: driver_current = user_current / self._current_scale
        # So: I_drv = I_user / scale, which means scale = I_user / I_drv = 1 / K
        # Kt relationship: τ = Kt_drv * I_drv = Kt_user * I_user
        # So: Kt_user = Kt_drv * (I_drv / I_user) = Kt_drv * K
        if current_mode == "amplitude-invariant":
            # I_drv (line) = √3 * I_phase, so scale = I_phase / I_drv = 1/√3
            # Kt_amp = Kt_drv * √3
            self._current_scale = 1.0 / np.sqrt(3.0)
            self._kt_scale = np.sqrt(3.0)
        elif current_mode == "power-invariant":
            # I_drv (line) = √2 * I_pwr, so scale = I_pwr / I_drv = 1/√2
            # Kt_pwr = Kt_drv * √2
            self._current_scale = 1.0 / np.sqrt(2.0)
            self._kt_scale = np.sqrt(2.0)
        else:  # driver
            self._current_scale = 1.0
            self._kt_scale = 1.0

        # CAN communication
        self._canman: Optional[CANManagerServo] = None
        if not self.is_offline:
            self._canman = CANManagerServo()
            self._canman.add_motor_listener(self)

        # State management
        self._motor_state = ServoMotorState()
        self._motor_state_async = ServoMotorState()
        self._servo_mode = ServoControlMode.IDLE

        # Error handling
        self._error_code: Optional[int] = None
        self._error_message: Optional[str] = None

        # Time management
        self._start_time = time.time()
        self._last_update_time = self._start_time
        self._last_command_time: Optional[float] = None

        # Thermal management
        self._thermal_model = ThermalModel(
            motor_constants=self._MOTOR_CONSTANTS,
            actuator_tag=self.tag,
        )

        LOGGER.info(f"Initialized TMotor servo: {self.motor_type} ID:{self.motor_id} (current_mode: {current_mode})")

    @property
    def _CONTROL_MODE_CONFIGS(self) -> CONTROL_MODE_CONFIGS:
        return TMOTOR_SERVO_CONTROL_MODE_CONFIGS

    def device_info_string(self) -> str:
        return f"{self.motor_type} ID:{self.motor_id}"

    @check_actuator_connection
    def start(self) -> None:
        """Start motor"""
        LOGGER.info(f"Starting {self.device_info_string()}")

        if not self.is_offline and self._canman:
            self._canman.power_on(self.motor_id)

            # Poll for motor readiness (max 1 second timeout)
            start_time = time.time()
            timeout = 1.0
            poll_interval = 0.01  # 10ms polling interval
            motor_ready = False

            while time.time() - start_time < timeout:
                # The motor should start sending status messages after power on
                # Check if we've received valid data by checking if position/velocity/current are non-zero
                # or if the motor_state has been updated (timestamp check could be added)
                if (
                    self._motor_state_async.position != 0.0
                    or self._motor_state_async.velocity != 0.0
                    or self._motor_state_async.current != 0.0
                    or self._motor_state_async.temperature > 0.0
                ):
                    motor_ready = True
                    LOGGER.debug(f"Motor ready after {time.time() - start_time:.3f} seconds")
                    break
                time.sleep(poll_interval)

            if not motor_ready:
                # Fall back to minimum wait time if no status received
                LOGGER.warning("No status received from motor, using fallback delay")
                time.sleep(0.1)  # Minimum wait time

            # Set initial control mode to IDLE
            self._canman.set_control_mode(self.motor_id, ServoControlMode.IDLE.value)

            # Poll for mode switch confirmation (max 200ms timeout)
            mode_start_time = time.time()
            mode_timeout = 0.2

            while time.time() - mode_start_time < mode_timeout:
                self.update()  # Read status
                if self._motor_state.error == 0:
                    LOGGER.debug(f"Mode switch confirmed after {time.time() - mode_start_time:.3f} seconds")
                    break
                time.sleep(poll_interval)

            # Final status check
            if self._motor_state.error != 0:
                raise RuntimeError(f"Motor startup failed with error: {self._motor_state.error}")

        self._is_open = True
        self._is_streaming = True

    @check_actuator_stream
    @check_actuator_open
    def stop(self) -> None:
        """Stop motor"""
        LOGGER.info(f"Stopping {self.device_info_string()}")

        if not self.is_offline and self._canman:
            self._canman.power_off(self.motor_id)

        self._is_open = False
        self._is_streaming = False

    def update(self) -> None:
        """Update motor state"""
        # Temperature check
        if self.case_temperature > self.max_temperature:
            raise RuntimeError(f"Temperature {self.case_temperature}°C exceeds limit")

        # Update state
        self._motor_state.position = self._motor_state_async.position
        self._motor_state.velocity = self._motor_state_async.velocity
        self._motor_state.current = self._motor_state_async.current
        self._motor_state.temperature = self._motor_state_async.temperature
        self._motor_state.error = self._motor_state_async.error

        # Communication timeout check
        now = time.time()
        if (
            self._last_command_time is not None
            and (now - self._last_command_time) < 0.25
            and (now - self._last_update_time) > 0.1
        ):
            warnings.warn(f"No data from motor: {self.device_info_string()}", RuntimeWarning, stacklevel=2)

        self._last_update_time = now

    def _update_state_async(self, servo_state: ServoMotorState) -> None:
        """Asynchronously update state"""
        # More detailed error handling
        if servo_state.error != 0:
            error_codes = TMOTOR_ERROR_CODES
            error_msg = error_codes.get(servo_state.error, f"Unknown error code: {servo_state.error}")
            self._error_code = servo_state.error
            self._error_message = error_msg

            # Take different actions based on error type
            if servo_state.error in [1, 2]:  # Voltage errors
                LOGGER.error(f"Voltage error {servo_state.error}: {error_msg}")
            elif servo_state.error in [4, 5, 6]:  # Overcurrent or overtemperature
                LOGGER.critical(f"Critical error {servo_state.error}: {error_msg}")
                # Auto-stop motor
                if self._canman:
                    self._canman.set_current(self.motor_id, 0.0)
            else:
                LOGGER.warning(f"Motor error {servo_state.error}: {error_msg}")
        else:
            self._error_code = None
            self._error_message = None

        self._motor_state_async = servo_state

    @property
    def error_info(self) -> Optional[tuple[int, str]]:
        """Get error information"""
        if self._error_code is not None and self._error_message is not None:
            return (self._error_code, self._error_message)
        return None

    def home(
        self,
        homing_voltage: int = 0,
        homing_frequency: Optional[int] = None,
        homing_direction: int = 0,
        output_position_offset: float = 0.0,
        current_threshold: int = 0,
        velocity_threshold: float = 0.0,
    ) -> None:
        """Home motor"""
        if not self.is_offline and self._canman:
            self._canman.set_origin(self.motor_id, 1)
            time.sleep(0.1)
        self._is_homed = True
        LOGGER.info(f"Homed {self.device_info_string()}")

    # ============ Control Interface ============

    def set_motor_voltage(self, value: float) -> None:
        """Set motor voltage (not directly supported in servo mode)"""
        LOGGER.warning("Voltage control not supported in servo mode")

    def set_motor_current(self, value: float) -> None:
        """Set motor current with clamping to motor limits"""
        if not self.is_offline and self._canman:
            # Convert from desired current convention to driver current
            driver_current = value / self._current_scale

            # Get current limits from motor parameters (these are in driver convention)
            motor_params = cast(dict[str, Any], self._motor_params)
            max_current = motor_params["Curr_max"] / 1000.0  # Convert from protocol units to amps
            min_current = motor_params["Curr_min"] / 1000.0  # Convert from protocol units to amps

            # Clamp current to safe limits
            clamped_driver_current = np.clip(driver_current, min_current, max_current)

            # Log warning if clamping occurred (show in user's convention)
            if driver_current != clamped_driver_current:
                clamped_user_current = clamped_driver_current * self._current_scale
                LOGGER.warning(
                    f"Current command {value:.2f}A clamped to {clamped_user_current:.2f}A "
                    f"(limits: [{min_current * self._current_scale:.1f}, {max_current * self._current_scale:.1f}]A)"
                )

            self._canman.set_current(self.motor_id, clamped_driver_current)
            self._last_command_time = time.time()

    def set_motor_position(self, value: float) -> None:
        """Set motor position (radians) with clamping to motor limits"""
        position_deg = radians_to_degrees(value)

        if not self.is_offline and self._canman:
            # Get position limits from motor parameters
            motor_params = cast(dict[str, Any], self._motor_params)
            max_position = motor_params["P_max"] / 10.0  # Convert from protocol units to degrees
            min_position = motor_params["P_min"] / 10.0  # Convert from protocol units to degrees

            # Clamp position to safe limits
            clamped_position = np.clip(position_deg, min_position, max_position)

            # Log warning if clamping occurred
            if position_deg != clamped_position:
                LOGGER.warning(
                    f"Position command {position_deg:.1f}° clamped to {clamped_position:.1f}° "
                    f"(limits: [{min_position:.0f}, {max_position:.0f}]°)"
                )

            self._canman.set_position(self.motor_id, clamped_position)
            self._last_command_time = time.time()

    def set_motor_torque(self, value: float) -> None:
        """Set motor torque (Nm) - core functionality as requested by user"""
        # Torque to current: T = I * Kt
        # Kt_user = Kt_drv * kt_scale
        kt_user = self._motor_params["Kt_actual"] * self._kt_scale
        current = value / kt_user
        self.set_motor_current(current)

    def set_output_torque(self, value: float) -> None:
        """Set output torque (Nm) - core functionality as requested by user"""
        # Output torque to motor torque: T_motor = T_output * gear_ratio
        motor_torque = value * self.gear_ratio
        self.set_motor_torque(motor_torque)

    def set_motor_velocity(self, value: float) -> None:
        """Set motor velocity (rad/s) with clamping to motor limits"""
        motor_params = cast(dict[str, Any], self._motor_params)
        velocity_erpm = rad_per_sec_to_erpm(value, self.num_pole_pairs)

        if not self.is_offline and self._canman:
            # Get velocity limits from motor parameters
            max_velocity = motor_params["V_max"]  # Already in ERPM
            min_velocity = motor_params["V_min"]  # Already in ERPM

            # Clamp velocity to safe limits
            clamped_velocity = np.clip(velocity_erpm, min_velocity, max_velocity)

            # Log warning if clamping occurred
            if velocity_erpm != clamped_velocity:
                LOGGER.warning(
                    f"Velocity command {velocity_erpm:.0f} ERPM clamped to {clamped_velocity:.0f} ERPM "
                    f"(limits: [{min_velocity}, {max_velocity}] ERPM)"
                )

            self._canman.set_velocity(self.motor_id, clamped_velocity)
            self._last_command_time = time.time()

    def set_motor_impedance(self, position: float, velocity: float, kp: float, kd: float, torque_ff: float) -> None:
        """TMotor servo mode does not support impedance control"""
        LOGGER.error(
            "TMotor servo mode does not support impedance control. "
            "Use position, velocity, or current control modes instead."
        )
        raise NotImplementedError("TMotor servo mode does not support impedance control.")

    def set_output_impedance(self, position: float, velocity: float, kp: float, kd: float, torque_ff: float) -> None:
        """TMotor servo mode does not support impedance control"""
        LOGGER.error(
            "TMotor servo mode does not support impedance control. "
            "Use position, velocity, or current control modes instead."
        )
        raise NotImplementedError("TMotor servo mode does not support impedance control.")

    def set_output_velocity(self, value: float) -> None:
        """Set output velocity (rad/s)"""
        motor_velocity = value * self.gear_ratio
        self.set_motor_velocity(motor_velocity)

    # ============ Unsupported PID Functions - TMotor Servo Mode Handles All Control Loops Internally ============

    def set_current_gains(self, kp: float, ki: float, kd: float, ff: float) -> None:
        """TMotor servo mode does not support external current PID gains - motor handles current control internally"""
        LOGGER.debug(
            "TMotor servo mode handles current control internally. " "External current PID gains are not used."
        )
        # Motor handles current control internally, no action needed

    def set_position_gains(self, kp: float, ki: float, kd: float, ff: float) -> None:
        """TMotor servo mode does not support external position PID gains - motor handles position control internally"""
        LOGGER.debug(
            "TMotor servo mode handles position control internally. " "External position PID gains are not used."
        )
        # Motor handles position control internally, no action needed

    def set_impedance_gains(self, kp: float, ki: float, kd: float, k: float, b: float, ff: float) -> None:
        """TMotor servo mode does not support impedance control"""
        LOGGER.debug(
            "TMotor servo mode does not support impedance control. "
            "Use position, velocity, or current control modes instead."
        )
        # Impedance control not supported in servo mode, no action needed

    def _set_impedance_gains(self, k: float, b: float) -> None:
        """Internal method for impedance gains - not supported in TMotor servo mode"""
        LOGGER.debug("TMotor servo mode handles control internally. " "Impedance gains are not used.")
        # Motor handles control internally, no action needed

    # ============ State Properties ============

    @property
    def motor_position(self) -> float:
        """Motor position (radians)"""
        return degrees_to_radians(self._motor_state.position)

    @property
    def output_position(self) -> float:
        """Output position (radians)"""
        return self.motor_position / self.gear_ratio

    @property
    def motor_velocity(self) -> float:
        """Motor velocity (rad/s)"""
        return erpm_to_rad_per_sec(self._motor_state.velocity, self.num_pole_pairs)

    @property
    def output_velocity(self) -> float:
        """Output velocity (rad/s)"""
        return self.motor_velocity / self.gear_ratio

    @property
    def num_pole_pairs(self) -> int:
        """Number of motor pole pairs"""
        motor_params = cast(dict[str, Any], self._motor_params)
        return motor_params["NUM_POLE_PAIRS"]

    @property
    def motor_voltage(self) -> float:
        """Motor voltage - not available in servo mode"""
        raise NotImplementedError(
            "Motor voltage reading is not available in TMotor servo mode. "
            "The motor does not provide voltage feedback through the CAN protocol."
        )

    @property
    def motor_current(self) -> float:
        """Motor current (A) - converted to selected current convention"""
        return self._motor_state.current * self._current_scale

    @property
    def motor_torque(self) -> float:
        """Motor torque (Nm)"""
        motor_params = cast(dict[str, Any], self._motor_params)
        kt_user = cast(float, motor_params["Kt_actual"]) * self._kt_scale
        return self.motor_current * kt_user

    @property
    def output_torque(self) -> float:
        """Output torque (Nm)"""
        return self.motor_torque / self.gear_ratio

    @property
    def case_temperature(self) -> float:
        """Case temperature (°C)"""
        return self._motor_state.temperature

    @property
    def winding_temperature(self) -> float:
        """Winding temperature (°C)"""
        return cast(float, getattr(self._thermal_model, "T_w", self.case_temperature))

    def __str__(self) -> str:
        """State string"""
        return (
            f"{self.device_info_string()} | "
            f"Pos: {self.output_position:.3f}rad | "
            f"Vel: {self.output_velocity:.3f}rad/s | "
            f"Torque: {self.output_torque:.3f}Nm | "
            f"Current: {self.motor_current:.3f}A | "
            f"Temp: {self.case_temperature:.1f}°C"
        )

case_temperature property

Case temperature (°C)

error_info property

Get error information

motor_current property

Motor current (A) - converted to selected current convention

motor_position property

Motor position (radians)

motor_torque property

Motor torque (Nm)

motor_velocity property

Motor velocity (rad/s)

motor_voltage property

Motor voltage - not available in servo mode

num_pole_pairs property

Number of motor pole pairs

output_position property

Output position (radians)

output_torque property

Output torque (Nm)

output_velocity property

Output velocity (rad/s)

winding_temperature property

Winding temperature (°C)

__init__(tag='TMotorServoActuator', motor_type='AK80-9', motor_id=104, gear_ratio=1.0, frequency=1000, offline=False, max_temperature=80.0, current_mode='driver', **kwargs)

Initialize TMotor servo actuator

Parameters:

Name Type Description Default
tag str

actuator identifier

'TMotorServoActuator'
motor_type str

motor model (AK80-9, AK10-9)

'AK80-9'
motor_id int

CAN ID

104
gear_ratio float

gear ratio

1.0
frequency int

control frequency Hz

1000
offline bool

offline mode

False
max_temperature float

maximum temperature

80.0
current_mode str

current convention ('driver', 'amplitude-invariant', 'power-invariant') - 'driver': use driver's native current (default) - 'amplitude-invariant': true phase current convention - 'power-invariant': power-invariant current convention

'driver'
Source code in opensourceleg/actuators/tmotor.py
def __init__(
    self,
    tag: str = "TMotorServoActuator",
    motor_type: str = "AK80-9",
    motor_id: int = 104,
    gear_ratio: float = 1.0,
    frequency: int = 1000,
    offline: bool = False,
    max_temperature: float = 80.0,
    current_mode: str = "driver",
    **kwargs: Any,
) -> None:
    """
    Initialize TMotor servo actuator

    Args:
        tag: actuator identifier
        motor_type: motor model (AK80-9, AK10-9)
        motor_id: CAN ID
        gear_ratio: gear ratio
        frequency: control frequency Hz
        offline: offline mode
        max_temperature: maximum temperature
        current_mode: current convention ('driver', 'amplitude-invariant', 'power-invariant')
            - 'driver': use driver's native current (default)
            - 'amplitude-invariant': true phase current convention
            - 'power-invariant': power-invariant current convention
    """
    # Validate motor type
    if motor_type not in TMOTOR_MODELS:
        raise ValueError(f"Unsupported motor type: {motor_type}")

    # Validate current mode
    if current_mode not in ["driver", "amplitude-invariant", "power-invariant"]:
        raise ValueError(
            f"Invalid current_mode: {current_mode}. Must be 'driver', 'amplitude-invariant', or 'power-invariant'"
        )

    super().__init__(
        tag=tag,
        gear_ratio=gear_ratio,
        motor_constants=TMOTOR_SERVO_CONSTANTS,
        frequency=frequency,
        offline=offline,
        **kwargs,
    )

    # Motor configuration
    self.motor_type = motor_type
    self.motor_id = motor_id
    self.max_temperature = max_temperature
    self._motor_params = TMOTOR_MODELS[motor_type]
    self.current_mode = current_mode

    # Current conversion factors
    # Physical: I_drv (line current) = K * I_user
    # Code: driver_current = user_current / self._current_scale
    # So: I_drv = I_user / scale, which means scale = I_user / I_drv = 1 / K
    # Kt relationship: τ = Kt_drv * I_drv = Kt_user * I_user
    # So: Kt_user = Kt_drv * (I_drv / I_user) = Kt_drv * K
    if current_mode == "amplitude-invariant":
        # I_drv (line) = √3 * I_phase, so scale = I_phase / I_drv = 1/√3
        # Kt_amp = Kt_drv * √3
        self._current_scale = 1.0 / np.sqrt(3.0)
        self._kt_scale = np.sqrt(3.0)
    elif current_mode == "power-invariant":
        # I_drv (line) = √2 * I_pwr, so scale = I_pwr / I_drv = 1/√2
        # Kt_pwr = Kt_drv * √2
        self._current_scale = 1.0 / np.sqrt(2.0)
        self._kt_scale = np.sqrt(2.0)
    else:  # driver
        self._current_scale = 1.0
        self._kt_scale = 1.0

    # CAN communication
    self._canman: Optional[CANManagerServo] = None
    if not self.is_offline:
        self._canman = CANManagerServo()
        self._canman.add_motor_listener(self)

    # State management
    self._motor_state = ServoMotorState()
    self._motor_state_async = ServoMotorState()
    self._servo_mode = ServoControlMode.IDLE

    # Error handling
    self._error_code: Optional[int] = None
    self._error_message: Optional[str] = None

    # Time management
    self._start_time = time.time()
    self._last_update_time = self._start_time
    self._last_command_time: Optional[float] = None

    # Thermal management
    self._thermal_model = ThermalModel(
        motor_constants=self._MOTOR_CONSTANTS,
        actuator_tag=self.tag,
    )

    LOGGER.info(f"Initialized TMotor servo: {self.motor_type} ID:{self.motor_id} (current_mode: {current_mode})")

__str__()

State string

Source code in opensourceleg/actuators/tmotor.py
def __str__(self) -> str:
    """State string"""
    return (
        f"{self.device_info_string()} | "
        f"Pos: {self.output_position:.3f}rad | "
        f"Vel: {self.output_velocity:.3f}rad/s | "
        f"Torque: {self.output_torque:.3f}Nm | "
        f"Current: {self.motor_current:.3f}A | "
        f"Temp: {self.case_temperature:.1f}°C"
    )

home(homing_voltage=0, homing_frequency=None, homing_direction=0, output_position_offset=0.0, current_threshold=0, velocity_threshold=0.0)

Home motor

Source code in opensourceleg/actuators/tmotor.py
def home(
    self,
    homing_voltage: int = 0,
    homing_frequency: Optional[int] = None,
    homing_direction: int = 0,
    output_position_offset: float = 0.0,
    current_threshold: int = 0,
    velocity_threshold: float = 0.0,
) -> None:
    """Home motor"""
    if not self.is_offline and self._canman:
        self._canman.set_origin(self.motor_id, 1)
        time.sleep(0.1)
    self._is_homed = True
    LOGGER.info(f"Homed {self.device_info_string()}")

set_current_gains(kp, ki, kd, ff)

TMotor servo mode does not support external current PID gains - motor handles current control internally

Source code in opensourceleg/actuators/tmotor.py
def set_current_gains(self, kp: float, ki: float, kd: float, ff: float) -> None:
    """TMotor servo mode does not support external current PID gains - motor handles current control internally"""
    LOGGER.debug(
        "TMotor servo mode handles current control internally. " "External current PID gains are not used."
    )

set_impedance_gains(kp, ki, kd, k, b, ff)

TMotor servo mode does not support impedance control

Source code in opensourceleg/actuators/tmotor.py
def set_impedance_gains(self, kp: float, ki: float, kd: float, k: float, b: float, ff: float) -> None:
    """TMotor servo mode does not support impedance control"""
    LOGGER.debug(
        "TMotor servo mode does not support impedance control. "
        "Use position, velocity, or current control modes instead."
    )

set_motor_current(value)

Set motor current with clamping to motor limits

Source code in opensourceleg/actuators/tmotor.py
def set_motor_current(self, value: float) -> None:
    """Set motor current with clamping to motor limits"""
    if not self.is_offline and self._canman:
        # Convert from desired current convention to driver current
        driver_current = value / self._current_scale

        # Get current limits from motor parameters (these are in driver convention)
        motor_params = cast(dict[str, Any], self._motor_params)
        max_current = motor_params["Curr_max"] / 1000.0  # Convert from protocol units to amps
        min_current = motor_params["Curr_min"] / 1000.0  # Convert from protocol units to amps

        # Clamp current to safe limits
        clamped_driver_current = np.clip(driver_current, min_current, max_current)

        # Log warning if clamping occurred (show in user's convention)
        if driver_current != clamped_driver_current:
            clamped_user_current = clamped_driver_current * self._current_scale
            LOGGER.warning(
                f"Current command {value:.2f}A clamped to {clamped_user_current:.2f}A "
                f"(limits: [{min_current * self._current_scale:.1f}, {max_current * self._current_scale:.1f}]A)"
            )

        self._canman.set_current(self.motor_id, clamped_driver_current)
        self._last_command_time = time.time()

set_motor_impedance(position, velocity, kp, kd, torque_ff)

TMotor servo mode does not support impedance control

Source code in opensourceleg/actuators/tmotor.py
def set_motor_impedance(self, position: float, velocity: float, kp: float, kd: float, torque_ff: float) -> None:
    """TMotor servo mode does not support impedance control"""
    LOGGER.error(
        "TMotor servo mode does not support impedance control. "
        "Use position, velocity, or current control modes instead."
    )
    raise NotImplementedError("TMotor servo mode does not support impedance control.")

set_motor_position(value)

Set motor position (radians) with clamping to motor limits

Source code in opensourceleg/actuators/tmotor.py
def set_motor_position(self, value: float) -> None:
    """Set motor position (radians) with clamping to motor limits"""
    position_deg = radians_to_degrees(value)

    if not self.is_offline and self._canman:
        # Get position limits from motor parameters
        motor_params = cast(dict[str, Any], self._motor_params)
        max_position = motor_params["P_max"] / 10.0  # Convert from protocol units to degrees
        min_position = motor_params["P_min"] / 10.0  # Convert from protocol units to degrees

        # Clamp position to safe limits
        clamped_position = np.clip(position_deg, min_position, max_position)

        # Log warning if clamping occurred
        if position_deg != clamped_position:
            LOGGER.warning(
                f"Position command {position_deg:.1f}° clamped to {clamped_position:.1f}° "
                f"(limits: [{min_position:.0f}, {max_position:.0f}]°)"
            )

        self._canman.set_position(self.motor_id, clamped_position)
        self._last_command_time = time.time()

set_motor_torque(value)

Set motor torque (Nm) - core functionality as requested by user

Source code in opensourceleg/actuators/tmotor.py
def set_motor_torque(self, value: float) -> None:
    """Set motor torque (Nm) - core functionality as requested by user"""
    # Torque to current: T = I * Kt
    # Kt_user = Kt_drv * kt_scale
    kt_user = self._motor_params["Kt_actual"] * self._kt_scale
    current = value / kt_user
    self.set_motor_current(current)

set_motor_velocity(value)

Set motor velocity (rad/s) with clamping to motor limits

Source code in opensourceleg/actuators/tmotor.py
def set_motor_velocity(self, value: float) -> None:
    """Set motor velocity (rad/s) with clamping to motor limits"""
    motor_params = cast(dict[str, Any], self._motor_params)
    velocity_erpm = rad_per_sec_to_erpm(value, self.num_pole_pairs)

    if not self.is_offline and self._canman:
        # Get velocity limits from motor parameters
        max_velocity = motor_params["V_max"]  # Already in ERPM
        min_velocity = motor_params["V_min"]  # Already in ERPM

        # Clamp velocity to safe limits
        clamped_velocity = np.clip(velocity_erpm, min_velocity, max_velocity)

        # Log warning if clamping occurred
        if velocity_erpm != clamped_velocity:
            LOGGER.warning(
                f"Velocity command {velocity_erpm:.0f} ERPM clamped to {clamped_velocity:.0f} ERPM "
                f"(limits: [{min_velocity}, {max_velocity}] ERPM)"
            )

        self._canman.set_velocity(self.motor_id, clamped_velocity)
        self._last_command_time = time.time()

set_motor_voltage(value)

Set motor voltage (not directly supported in servo mode)

Source code in opensourceleg/actuators/tmotor.py
def set_motor_voltage(self, value: float) -> None:
    """Set motor voltage (not directly supported in servo mode)"""
    LOGGER.warning("Voltage control not supported in servo mode")

set_output_impedance(position, velocity, kp, kd, torque_ff)

TMotor servo mode does not support impedance control

Source code in opensourceleg/actuators/tmotor.py
def set_output_impedance(self, position: float, velocity: float, kp: float, kd: float, torque_ff: float) -> None:
    """TMotor servo mode does not support impedance control"""
    LOGGER.error(
        "TMotor servo mode does not support impedance control. "
        "Use position, velocity, or current control modes instead."
    )
    raise NotImplementedError("TMotor servo mode does not support impedance control.")

set_output_torque(value)

Set output torque (Nm) - core functionality as requested by user

Source code in opensourceleg/actuators/tmotor.py
def set_output_torque(self, value: float) -> None:
    """Set output torque (Nm) - core functionality as requested by user"""
    # Output torque to motor torque: T_motor = T_output * gear_ratio
    motor_torque = value * self.gear_ratio
    self.set_motor_torque(motor_torque)

set_output_velocity(value)

Set output velocity (rad/s)

Source code in opensourceleg/actuators/tmotor.py
def set_output_velocity(self, value: float) -> None:
    """Set output velocity (rad/s)"""
    motor_velocity = value * self.gear_ratio
    self.set_motor_velocity(motor_velocity)

set_position_gains(kp, ki, kd, ff)

TMotor servo mode does not support external position PID gains - motor handles position control internally

Source code in opensourceleg/actuators/tmotor.py
def set_position_gains(self, kp: float, ki: float, kd: float, ff: float) -> None:
    """TMotor servo mode does not support external position PID gains - motor handles position control internally"""
    LOGGER.debug(
        "TMotor servo mode handles position control internally. " "External position PID gains are not used."
    )

start()

Start motor

Source code in opensourceleg/actuators/tmotor.py
@check_actuator_connection
def start(self) -> None:
    """Start motor"""
    LOGGER.info(f"Starting {self.device_info_string()}")

    if not self.is_offline and self._canman:
        self._canman.power_on(self.motor_id)

        # Poll for motor readiness (max 1 second timeout)
        start_time = time.time()
        timeout = 1.0
        poll_interval = 0.01  # 10ms polling interval
        motor_ready = False

        while time.time() - start_time < timeout:
            # The motor should start sending status messages after power on
            # Check if we've received valid data by checking if position/velocity/current are non-zero
            # or if the motor_state has been updated (timestamp check could be added)
            if (
                self._motor_state_async.position != 0.0
                or self._motor_state_async.velocity != 0.0
                or self._motor_state_async.current != 0.0
                or self._motor_state_async.temperature > 0.0
            ):
                motor_ready = True
                LOGGER.debug(f"Motor ready after {time.time() - start_time:.3f} seconds")
                break
            time.sleep(poll_interval)

        if not motor_ready:
            # Fall back to minimum wait time if no status received
            LOGGER.warning("No status received from motor, using fallback delay")
            time.sleep(0.1)  # Minimum wait time

        # Set initial control mode to IDLE
        self._canman.set_control_mode(self.motor_id, ServoControlMode.IDLE.value)

        # Poll for mode switch confirmation (max 200ms timeout)
        mode_start_time = time.time()
        mode_timeout = 0.2

        while time.time() - mode_start_time < mode_timeout:
            self.update()  # Read status
            if self._motor_state.error == 0:
                LOGGER.debug(f"Mode switch confirmed after {time.time() - mode_start_time:.3f} seconds")
                break
            time.sleep(poll_interval)

        # Final status check
        if self._motor_state.error != 0:
            raise RuntimeError(f"Motor startup failed with error: {self._motor_state.error}")

    self._is_open = True
    self._is_streaming = True

stop()

Stop motor

Source code in opensourceleg/actuators/tmotor.py
@check_actuator_stream
@check_actuator_open
def stop(self) -> None:
    """Stop motor"""
    LOGGER.info(f"Stopping {self.device_info_string()}")

    if not self.is_offline and self._canman:
        self._canman.power_off(self.motor_id)

    self._is_open = False
    self._is_streaming = False

update()

Update motor state

Source code in opensourceleg/actuators/tmotor.py
def update(self) -> None:
    """Update motor state"""
    # Temperature check
    if self.case_temperature > self.max_temperature:
        raise RuntimeError(f"Temperature {self.case_temperature}°C exceeds limit")

    # Update state
    self._motor_state.position = self._motor_state_async.position
    self._motor_state.velocity = self._motor_state_async.velocity
    self._motor_state.current = self._motor_state_async.current
    self._motor_state.temperature = self._motor_state_async.temperature
    self._motor_state.error = self._motor_state_async.error

    # Communication timeout check
    now = time.time()
    if (
        self._last_command_time is not None
        and (now - self._last_command_time) < 0.25
        and (now - self._last_update_time) > 0.1
    ):
        warnings.warn(f"No data from motor: {self.device_info_string()}", RuntimeWarning, stacklevel=2)

    self._last_update_time = now

degrees_to_radians(degrees)

Convert degrees to radians

Source code in opensourceleg/actuators/tmotor.py
def degrees_to_radians(degrees: float) -> float:
    """Convert degrees to radians"""
    return degrees * np.pi / 180.0

erpm_to_rad_per_sec(erpm, num_pole_pairs)

Convert ERPM to rad/s

Source code in opensourceleg/actuators/tmotor.py
def erpm_to_rad_per_sec(erpm: float, num_pole_pairs: int) -> float:
    """Convert ERPM to rad/s"""
    return erpm * 2 * np.pi / (60 * num_pole_pairs)

rad_per_sec_to_erpm(rad_per_sec, num_pole_pairs)

Convert rad/s to ERPM

Source code in opensourceleg/actuators/tmotor.py
def rad_per_sec_to_erpm(rad_per_sec: float, num_pole_pairs: int) -> float:
    """Convert rad/s to ERPM"""
    return rad_per_sec * 60 * num_pole_pairs / (2 * np.pi)

radians_to_degrees(radians)

Convert radians to degrees

Source code in opensourceleg/actuators/tmotor.py
def radians_to_degrees(radians: float) -> float:
    """Convert radians to degrees"""
    return radians * 180.0 / np.pi