Skip to content

Moteus

MoteusActuator

Bases: ActuatorBase, Controller

Source code in opensourceleg/actuators/moteus.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
class MoteusActuator(ActuatorBase, Controller):
    def __init__(
        self,
        tag: str = "Moteus",
        servo_id: int = 0,
        bus_id: int = 0,
        gear_ratio: float = 1.0,
        frequency: int = 500,
        offline: bool = False,
        query: Optional[MoteusQueryResolution] = None,
    ) -> None:
        if query is None:
            query = MoteusQueryResolution()

        self._servo_id = servo_id
        self._bus_id = bus_id
        super().__init__(
            tag=tag,
            gear_ratio=gear_ratio,
            motor_constants=MOTEUS_ACTUATOR_CONSTANTS,
            frequency=frequency,
            offline=offline,
        )

        self._interface = MoteusInterface()
        self._interface._add2map(servo_id=servo_id, bus_id=bus_id)

        self._is_streaming: bool = False
        self._is_open: bool = False

        self._command: Command = None
        self._data = None
        self._query = query

        self._thermal_model: ThermalModel = ThermalModel(
            temp_limit_windings=self.max_winding_temperature,
            soft_border_C_windings=10,
            temp_limit_case=self.max_case_temperature,
            soft_border_C_case=10,
        )
        self._thermal_scale: float = 1.0

        self._mode = CONTROL_MODES.IDLE

    def __repr__(self) -> str:
        return f"Moteus[{self._tag}]"

    @property
    def _CONTROL_MODE_CONFIGS(self) -> CONTROL_MODE_CONFIGS:
        return MOTEUS_CONTROL_MODE_CONFIGS

    @check_actuator_connection
    async def start(self) -> None:
        try:
            self._interface.start()
            Controller.__init__(
                self,
                id=self._servo_id,
                transport=self._interface.transport,
                query_resolution=self._query,
            )
            self._stream = Stream(controller=self)

            self._is_open = True
            self._is_streaming = True

        except OSError:
            print("\n")
            LOGGER.error(
                msg=f"[{self.__repr__()}] Need admin previleges to open the port. \n\n \
                    Please run the script with 'sudo' command or add the user to the dialout group.\n"
            )
            os._exit(status=1)

        default_mode_config = self._get_control_mode_config(self._mode)
        if default_mode_config:
            default_mode_config.entry_callback(self)

        if (await self._interface.transport.cycle([self.make_stop(query=True)])) == []:
            LOGGER.error(msg=f"[{self.__repr__()}] Could not start the actuator. Please check the connection.")
            self._is_streaming = False
            self._is_open = False
        # Keep the default command as query -- reading sensor data
        self._command = self.make_query()

    @check_actuator_stream
    @check_actuator_open
    async def stop(self) -> None:
        self.set_control_mode(mode=CONTROL_MODES.IDLE)

        await self._interface.transport.cycle([self.make_stop(query=True)])
        self._command = self.make_query()

    async def update(self):
        self._data = await self._interface.transport.cycle([self._command])

        self._thermal_model.T_c = self.case_temperature
        self._thermal_scale = self._thermal_model.update_and_get_scale(
            dt=1 / self.frequency,
            motor_current=self.motor_current,
        )
        if self.case_temperature >= self.max_case_temperature:
            LOGGER.error(
                msg=f"[{str.upper(self.tag)}] Case thermal limit {self.max_case_temperature} reached. Stopping motor."
            )
            raise ThermalLimitException()

        if self.winding_temperature >= self.max_winding_temperature:
            LOGGER.error(
                msg=f"[{str.upper(self.tag)}] Winding thermal limit {self.max_winding_temperature} reached. \
                Stopping motor."
            )
            raise ThermalLimitException()

        self._command = self.make_query()

    def home(
        self,
        homing_voltage: int = 2000,
        homing_frequency: Optional[int] = None,
        homing_direction: int = -1,
        output_position_offset: float = 0.0,
        current_threshold: int = 5000,
        velocity_threshold: float = 0.001,
    ) -> None:
        # TODO: implement homing
        LOGGER.info(msg=f"[{self.__repr__()}] Homing not implemented.")

    def set_motor_torque(self, value: float) -> None:
        """
        Sets the motor torque in Nm.

        Args:
            value (float): The torque to set in Nm.
        """
        self._command = self.make_position(
            position=math.nan,
            velocity=math.nan,
            feedforward_torque=value,
            kp_scale=0,
            kd_scale=0,
            ilimit_scale=0,
            watchdog_timeout=math.nan,
            query=True,
        )

    def set_output_torque(self, value: float) -> None:
        """
        Set the output torque of the actuator.
        This is the torque that is applied to the joint, not the motor.

        Args:
            value (float): torque in N_m
        """
        self.set_motor_torque(value=value / self.gear_ratio)

    def set_motor_current(
        self,
        value: float,
    ) -> None:
        LOGGER.info("Current Mode Not Implemented")

    def set_motor_velocity(self, value: float) -> None:
        self._command = self.make_position(
            position=math.nan,
            velocity=value / (np.pi * 2),  # TODO: Verify this conversion, are we converting from rad/s to rev/s?
            query=True,
            watchdog_timeout=math.nan,
        )

    def set_motor_voltage(self, value: float) -> None:
        """
        Sets the motor voltage in mV.

        Args:
            value (float): The voltage to set in mV.
        """
        LOGGER.info("Voltage Mode Not Implemented")

    def set_motor_position(self, value: float) -> None:
        """
        Sets the motor position in radians.
        If in impedance mode, this sets the equilibrium angle in radians.

        Args:
            value (float): The position to set
        """
        self._command = self.make_position(
            position=float((value) / (2 * np.pi)),  # TODO: Verify this conversion, are we converting from rad to rev?
            query=True,
            watchdog_timeout=math.nan,
        )

    async def set_torque_gains(
        self,
        kp: float = DEFAULT_TORQUE_GAINS.kp,
        ki: float = DEFAULT_TORQUE_GAINS.ki,
    ) -> None:
        """
        Sets the position gains in arbitrary Moteus units.

        Args:
            kp (float): The proportional gain
            ki (float): The integral gain
        """
        await self._stream.command(f"conf set servo.pid_dq.kp {kp}".encode())
        await self._stream.command(f"conf set servo.pid_dq.ki {ki}".encode())

    async def set_position_gains(
        self,
        kp: float = DEFAULT_POSITION_GAINS.kp,
        ki: float = DEFAULT_POSITION_GAINS.ki,
        kd: float = DEFAULT_POSITION_GAINS.kd,
        ff: float = DEFAULT_POSITION_GAINS.ff,
    ) -> None:
        """
        Sets the position gains in arbitrary Moteus units.

        Args:
            kp (float): The proportional gain
            ki (float): The integral gain
            kd (float): The derivative gain
            ff (float): The feedforward gain
        """
        await self._stream.command(f"conf set servo.pid_position.kp {kp}".encode())
        await self._stream.command(f"conf set servo.pid_position.ki {ki}".encode())
        await self._stream.command(f"conf set servo.pid_position.kd {kd}".encode())

    async def set_velocity_gains(
        self,
        kp: float = DEFAULT_VELOCITY_GAINS.kp,
        ki: float = DEFAULT_VELOCITY_GAINS.ki,
        kd: float = DEFAULT_VELOCITY_GAINS.kd,
        ff: float = DEFAULT_VELOCITY_GAINS.ff,
    ) -> None:
        """
        Sets the position gains in arbitrary Moteus units.

        Args:
            kp (float): The proportional gain
            ki (float): The integral gain
            kd (float): The derivative gain
            ff (float): The feedforward gain
        """
        await self._stream.command(f"conf set servo.pid_position.kp {kp}".encode())
        await self._stream.command(f"conf set servo.pid_position.ki {ki}".encode())
        await self._stream.command(f"conf set servo.pid_position.kd {kd}".encode())

    def set_current_gains(
        self,
        kp: float = DEFAULT_CURRENT_GAINS.kp,
        ki: float = DEFAULT_CURRENT_GAINS.ki,
        kd: float = DEFAULT_CURRENT_GAINS.kd,
        ff: float = DEFAULT_CURRENT_GAINS.ff,
    ) -> None:
        """
        Sets the current gains in arbitrary Moteus units.

        Args:
            kp (float): The proportional gain
            ki (float): The integral gain
            kd (float): The derivative gain
            ff (float): The feedforward gain
        """
        LOGGER.info(msg=f"[{self.__repr__()}] Current mode not implemented.")

    def set_impedance_gains(
        self,
        kp: float = DEFAULT_IMPEDANCE_GAINS.kp,
        ki: float = DEFAULT_IMPEDANCE_GAINS.ki,
        kd: float = DEFAULT_IMPEDANCE_GAINS.kd,
        k: float = DEFAULT_IMPEDANCE_GAINS.k,
        b: float = DEFAULT_IMPEDANCE_GAINS.b,
        ff: float = DEFAULT_IMPEDANCE_GAINS.ff,
    ) -> None:
        LOGGER.info(msg=f"[{self.__repr__()}] Impedance mode not implemented.")

    @property
    def motor_voltage(self) -> float:
        """Q-axis motor voltage in mV."""
        if self._data is not None:
            return float(self._data[0].values[MoteusRegister.VOLTAGE])
        else:
            LOGGER.warning(
                msg="Actuator data is none, please ensure that the actuator is connected and streaming. Returning 0.0."
            )
            return 0.0

    @property
    def motor_current(self) -> float:
        if self._data is not None:
            return float(self._data[0].values[MoteusRegister.Q_CURRENT])
        else:
            LOGGER.warning(
                msg="Actuator data is none, please ensure that the actuator is connected and streaming. Returning 0.0."
            )
            return 0.0

    @property
    def motor_torque(self) -> float:
        if self._data is not None:
            return float(self.motor_current * self.MOTOR_CONSTANTS.NM_PER_MILLIAMP) / self.gear_ratio
        else:
            LOGGER.warning(
                msg="Actuator data is none, please ensure that the actuator is connected and streaming. Returning 0.0."
            )
            return 0.0

    @property
    def motor_position(self) -> float:
        if self._data is not None:
            return float(self._data[0].values[MoteusRegister.POSITION] * 2 * np.pi) - self.motor_zero_position
        else:
            LOGGER.warning(
                msg="Actuator data is none, please ensure that the actuator is connected and streaming. Returning 0.0."
            )
            return 0.0

    @property
    def motor_velocity(self) -> float:
        if self._data is not None:
            return float(self._data[0].values[MoteusRegister.VELOCITY] * 2 * np.pi)
        else:
            LOGGER.warning(
                msg="Actuator data is none, please ensure that the actuator is connected and streaming. Returning 0.0."
            )
            return 0.0

    @property
    def battery_voltage(self) -> float:
        """Battery voltage in mV."""
        if self._data is not None:
            return float(self._data[0].values[MoteusRegister.VOLTAGE])
        else:
            LOGGER.warning(
                msg="Actuator data is none, please ensure that the actuator is connected and streaming. Returning 0.0."
            )
            return 0.0

    @property
    def battery_current(self) -> float:
        """Battery current in mA."""
        if self._data is not None:
            return float(self._data[0].values[MoteusRegister.Q_CURRENT])
        else:
            LOGGER.warning(
                msg="Actuator data is none, please ensure that the actuator is connected and streaming. Returning 0.0."
            )
            return 0.0

    @property
    def joint_torque(self) -> float:
        """
        Torque at the joint output in Nm.
        This is calculated using motor current, k_t, and the gear ratio.
        """
        return self.motor_torque * self.gear_ratio

    @property
    def case_temperature(self) -> float:
        if self._data is not None:
            return float(self._data[0].values[MoteusRegister.TEMPERATURE])
        else:
            LOGGER.warning(
                msg="Actuator data is none, please ensure that the actuator is connected and streaming. Returning 0.0."
            )
            return 0.0

    @property
    def winding_temperature(self) -> float:
        """
        ESTIMATED temperature of the windings in celsius.
        This is calculated based on the thermal model using motor current.
        """
        if self._data is not None:
            return float(self._thermal_model.T_w)
        else:
            return 0.0

    @property
    def thermal_scaling_factor(self) -> float:
        """
        Scale factor to use in torque control, in [0,1].
        If you scale the torque command by this factor, the motor temperature will
        never exceed max allowable temperature.
        For a proof, see paper referenced in thermal model.
        """
        return self._thermal_scale

battery_current: float property

Battery current in mA.

battery_voltage: float property

Battery voltage in mV.

joint_torque: float property

Torque at the joint output in Nm. This is calculated using motor current, k_t, and the gear ratio.

motor_voltage: float property

Q-axis motor voltage in mV.

thermal_scaling_factor: float property

Scale factor to use in torque control, in [0,1]. If you scale the torque command by this factor, the motor temperature will never exceed max allowable temperature. For a proof, see paper referenced in thermal model.

winding_temperature: float property

ESTIMATED temperature of the windings in celsius. This is calculated based on the thermal model using motor current.

set_current_gains(kp=DEFAULT_CURRENT_GAINS.kp, ki=DEFAULT_CURRENT_GAINS.ki, kd=DEFAULT_CURRENT_GAINS.kd, ff=DEFAULT_CURRENT_GAINS.ff)

Sets the current gains in arbitrary Moteus units.

Parameters:

Name Type Description Default
kp float

The proportional gain

kp
ki float

The integral gain

ki
kd float

The derivative gain

kd
ff float

The feedforward gain

ff
Source code in opensourceleg/actuators/moteus.py
def set_current_gains(
    self,
    kp: float = DEFAULT_CURRENT_GAINS.kp,
    ki: float = DEFAULT_CURRENT_GAINS.ki,
    kd: float = DEFAULT_CURRENT_GAINS.kd,
    ff: float = DEFAULT_CURRENT_GAINS.ff,
) -> None:
    """
    Sets the current gains in arbitrary Moteus units.

    Args:
        kp (float): The proportional gain
        ki (float): The integral gain
        kd (float): The derivative gain
        ff (float): The feedforward gain
    """
    LOGGER.info(msg=f"[{self.__repr__()}] Current mode not implemented.")

set_motor_position(value)

Sets the motor position in radians. If in impedance mode, this sets the equilibrium angle in radians.

Parameters:

Name Type Description Default
value float

The position to set

required
Source code in opensourceleg/actuators/moteus.py
def set_motor_position(self, value: float) -> None:
    """
    Sets the motor position in radians.
    If in impedance mode, this sets the equilibrium angle in radians.

    Args:
        value (float): The position to set
    """
    self._command = self.make_position(
        position=float((value) / (2 * np.pi)),  # TODO: Verify this conversion, are we converting from rad to rev?
        query=True,
        watchdog_timeout=math.nan,
    )

set_motor_torque(value)

Sets the motor torque in Nm.

Parameters:

Name Type Description Default
value float

The torque to set in Nm.

required
Source code in opensourceleg/actuators/moteus.py
def set_motor_torque(self, value: float) -> None:
    """
    Sets the motor torque in Nm.

    Args:
        value (float): The torque to set in Nm.
    """
    self._command = self.make_position(
        position=math.nan,
        velocity=math.nan,
        feedforward_torque=value,
        kp_scale=0,
        kd_scale=0,
        ilimit_scale=0,
        watchdog_timeout=math.nan,
        query=True,
    )

set_motor_voltage(value)

Sets the motor voltage in mV.

Parameters:

Name Type Description Default
value float

The voltage to set in mV.

required
Source code in opensourceleg/actuators/moteus.py
def set_motor_voltage(self, value: float) -> None:
    """
    Sets the motor voltage in mV.

    Args:
        value (float): The voltage to set in mV.
    """
    LOGGER.info("Voltage Mode Not Implemented")

set_output_torque(value)

Set the output torque of the actuator. This is the torque that is applied to the joint, not the motor.

Parameters:

Name Type Description Default
value float

torque in N_m

required
Source code in opensourceleg/actuators/moteus.py
def set_output_torque(self, value: float) -> None:
    """
    Set the output torque of the actuator.
    This is the torque that is applied to the joint, not the motor.

    Args:
        value (float): torque in N_m
    """
    self.set_motor_torque(value=value / self.gear_ratio)

set_position_gains(kp=DEFAULT_POSITION_GAINS.kp, ki=DEFAULT_POSITION_GAINS.ki, kd=DEFAULT_POSITION_GAINS.kd, ff=DEFAULT_POSITION_GAINS.ff) async

Sets the position gains in arbitrary Moteus units.

Parameters:

Name Type Description Default
kp float

The proportional gain

kp
ki float

The integral gain

ki
kd float

The derivative gain

kd
ff float

The feedforward gain

ff
Source code in opensourceleg/actuators/moteus.py
async def set_position_gains(
    self,
    kp: float = DEFAULT_POSITION_GAINS.kp,
    ki: float = DEFAULT_POSITION_GAINS.ki,
    kd: float = DEFAULT_POSITION_GAINS.kd,
    ff: float = DEFAULT_POSITION_GAINS.ff,
) -> None:
    """
    Sets the position gains in arbitrary Moteus units.

    Args:
        kp (float): The proportional gain
        ki (float): The integral gain
        kd (float): The derivative gain
        ff (float): The feedforward gain
    """
    await self._stream.command(f"conf set servo.pid_position.kp {kp}".encode())
    await self._stream.command(f"conf set servo.pid_position.ki {ki}".encode())
    await self._stream.command(f"conf set servo.pid_position.kd {kd}".encode())

set_torque_gains(kp=DEFAULT_TORQUE_GAINS.kp, ki=DEFAULT_TORQUE_GAINS.ki) async

Sets the position gains in arbitrary Moteus units.

Parameters:

Name Type Description Default
kp float

The proportional gain

kp
ki float

The integral gain

ki
Source code in opensourceleg/actuators/moteus.py
async def set_torque_gains(
    self,
    kp: float = DEFAULT_TORQUE_GAINS.kp,
    ki: float = DEFAULT_TORQUE_GAINS.ki,
) -> None:
    """
    Sets the position gains in arbitrary Moteus units.

    Args:
        kp (float): The proportional gain
        ki (float): The integral gain
    """
    await self._stream.command(f"conf set servo.pid_dq.kp {kp}".encode())
    await self._stream.command(f"conf set servo.pid_dq.ki {ki}".encode())

set_velocity_gains(kp=DEFAULT_VELOCITY_GAINS.kp, ki=DEFAULT_VELOCITY_GAINS.ki, kd=DEFAULT_VELOCITY_GAINS.kd, ff=DEFAULT_VELOCITY_GAINS.ff) async

Sets the position gains in arbitrary Moteus units.

Parameters:

Name Type Description Default
kp float

The proportional gain

kp
ki float

The integral gain

ki
kd float

The derivative gain

kd
ff float

The feedforward gain

ff
Source code in opensourceleg/actuators/moteus.py
async def set_velocity_gains(
    self,
    kp: float = DEFAULT_VELOCITY_GAINS.kp,
    ki: float = DEFAULT_VELOCITY_GAINS.ki,
    kd: float = DEFAULT_VELOCITY_GAINS.kd,
    ff: float = DEFAULT_VELOCITY_GAINS.ff,
) -> None:
    """
    Sets the position gains in arbitrary Moteus units.

    Args:
        kp (float): The proportional gain
        ki (float): The integral gain
        kd (float): The derivative gain
        ff (float): The feedforward gain
    """
    await self._stream.command(f"conf set servo.pid_position.kp {kp}".encode())
    await self._stream.command(f"conf set servo.pid_position.ki {ki}".encode())
    await self._stream.command(f"conf set servo.pid_position.kd {kd}".encode())

MoteusInterface

Singleton Class as Communication Portal between Moteus Controller and Moteus PiHat

Source code in opensourceleg/actuators/moteus.py
class MoteusInterface:
    """
    Singleton Class as Communication Portal between Moteus Controller and Moteus PiHat
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls.bus_map: dict[int, list[int]] = {}
            cls.bus_map: dict[int, list[int]] = {}
            cls._commands: list[Command] = []
            cls.transport = None
        return cls._instance

    def __init__(self):
        pass

    def __repr__(self):
        return "MoteusInterface"

    def _add2map(self, servo_id, bus_id) -> None:
        if bus_id in self.bus_map:
            self.bus_map[bus_id].append(servo_id)
        else:
            self.bus_map[bus_id] = [servo_id]

    def start(self):
        """
        Initialization of Pi3HatRouter
        """
        if self.transport is None:
            self.transport = pihat.Pi3HatRouter(servo_bus_map=self.bus_map)

    async def update(self):
        # TODO: multiple servo update simultaneously should go here
        self._commands = []

    async def stop(self):
        # TODO: multiple servo stop simultaneously should go here
        self._commands = []

start()

Initialization of Pi3HatRouter

Source code in opensourceleg/actuators/moteus.py
def start(self):
    """
    Initialization of Pi3HatRouter
    """
    if self.transport is None:
        self.transport = pihat.Pi3HatRouter(servo_bus_map=self.bus_map)