33# SPDX-License-Identifier: Apache-2.0
44
55
6- import time
7-
86import pytest
9- import rclpy
10- from geometry_msgs .msg import PoseStamped
11- from rcdt_utilities .launch_utils import assert_for_message , assert_for_node
12- from rcdt_utilities .test_utils import wait_for_register
7+ from rcdt_utilities .geometry import Pose
8+ from rcdt_utilities .launch_utils import assert_for_message
9+ from rcdt_utilities .test_utils import (
10+ assert_joy_topic_switch ,
11+ assert_movements_with_joy ,
12+ wait_for_register ,
13+ wait_until_reached_joint ,
14+ )
1315from rclpy .node import Node
14- from rclpy .qos import QoSDurabilityPolicy , QoSHistoryPolicy , QoSProfile
1516from sensor_msgs .msg import JointState , Joy
16- from std_msgs .msg import String
17- from utils import EndToEndUtils , call_express_pose_in_other_frame
1817
1918
20- class FrankaFullTests ( EndToEndUtils ) :
21- def test_wait_for_register (self , pytestconfig : pytest . Config ) -> None :
22- wait_for_register (pytestconfig )
19+ class FrankaFullTests :
20+ def test_wait_for_register (self , timeout : int ) -> None :
21+ wait_for_register (timeout = timeout )
2322
24- def test_joint_states_published (self ) -> None :
23+ def test_joint_states_published (self , timeout : int ) -> None :
2524 """Test that joint states are published. This is a basic test to check that the
2625 launch file is working and that the robot is publishing joint states."""
27- assert_for_message (JointState , "franka/joint_states" , 60 )
28-
29- def test_ready_to_start (self , test_node : Node ) -> None :
30- """This test will ensure the tests are ready to start by waiting for the move_group and moveit_manager node.
31- Also waits until the gripper_action_controller is active."""
32- assert (
33- assert_for_node (
34- fully_qualified_node_name = "franka/move_group" ,
35- test_node = test_node ,
36- timeout = 30 ,
37- )
38- is True
39- )
40- assert (
41- assert_for_node (
42- fully_qualified_node_name = "franka/moveit_manager" ,
43- test_node = test_node ,
44- timeout = 30 ,
45- )
46- is True
47- )
48- assert (
49- self .wait_until_active (
50- node = test_node , controller_name = "gripper_action_controller"
51- )
52- is True
53- )
26+ assert_for_message (JointState , "franka/joint_states" , timeout = timeout )
5427
55- def test_switch_joy_to_franka_topic (self , test_node : Node ) -> None :
28+ def test_switch_joy_to_franka_topic (self , test_node : Node , timeout : int ) -> None :
5629 """Test to see if the switch to Franka mode is correct."""
57- latched_qos = QoSProfile (
58- depth = 1 ,
59- durability = QoSDurabilityPolicy .TRANSIENT_LOCAL ,
60- history = QoSHistoryPolicy .KEEP_LAST ,
61- )
62- result = {}
63-
64- def listener_callback (msg : String ) -> None :
65- result ["state" ] = msg .data
66-
67- # Subscribe to the correct topic
68- test_node .create_subscription (
69- String ,
70- "/joy_topic_manager/state" , # this matches ~/state in JoyTopicManager
71- listener_callback ,
72- qos_profile = latched_qos ,
73- )
74-
75- pub = test_node .create_publisher (Joy , "/joy" , 10 )
76- msg = Joy ()
77- msg .buttons = [1 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
78- pub .publish (msg )
79- timeout_sec = 10.0
80- start_time = time .time ()
81- while (
82- result .get ("state" ) != "/franka/joy"
83- and time .time () - start_time < timeout_sec
84- ):
85- rclpy .spin_once (test_node , timeout_sec = 0.1 )
86- time .sleep (0.1 )
87-
88- assert result .get ("state" ) == "/franka/joy" , (
89- f"Unexpected state: { result .get ('state' )} "
30+ assert_joy_topic_switch (
31+ node = test_node ,
32+ expected_topic = "/franka/joy" ,
33+ button_config = [1 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ],
34+ timeout = timeout ,
9035 )
9136
9237 @pytest .mark .parametrize (
@@ -102,6 +47,7 @@ def test_joy_gripper_node(
10247 expected_value : float ,
10348 test_node : Node ,
10449 finger_joint_fault_tolerance : float ,
50+ timeout : int ,
10551 ) -> None :
10652 """Test that the joy node is running and the gripper is moving.
10753 The first call is an initialization call.
@@ -114,13 +60,13 @@ def test_joy_gripper_node(
11460 msg = Joy ()
11561 msg .buttons = buttons
11662 pub .publish (msg )
117- rclpy .spin_once (test_node , timeout_sec = 0.1 )
11863
119- reached_goal , joint_value = self . wait_until_reached_joint (
64+ reached_goal , joint_value = wait_until_reached_joint (
12065 namespace = "franka" ,
12166 joint = "fr3_finger_joint1" ,
12267 expected_value = expected_value ,
12368 tolerance = finger_joint_fault_tolerance ,
69+ timeout_sec = timeout ,
12470 )
12571 assert reached_goal is True , (
12672 f"The joint did not reach the joint. Currently { joint_value } , expected { expected_value } "
@@ -139,34 +85,22 @@ def test_move_arm_with_joy(
13985 axes : list [float ],
14086 direction : str ,
14187 test_node : Node ,
88+ timeout : int ,
14289 movement_threshold : float = 0.01 ,
14390 ) -> None :
14491 """Tests the linear movements of the hand while controlling with the joystick.
14592 Assert if it moves above a certain movement_threshold."""
146- pose = PoseStamped ()
147- pose .header .frame_id = "franka/fr3_hand"
148- first_pose = call_express_pose_in_other_frame (
149- node = test_node , pose = pose , target_frame = "franka/fr3_link0"
150- ).pose .pose
151-
152- pub = test_node .create_publisher (Joy , "/joy" , 10 )
153-
154- msg = Joy ()
155- msg .axes = axes
156- pub .publish (msg )
157- rclpy .spin_once (test_node , timeout_sec = 0.1 )
158- time .sleep (1 ) # small delay for the robot to move
159-
160- pose = PoseStamped ()
161- pose .header .frame_id = "franka/fr3_hand"
162- moved_pose = call_express_pose_in_other_frame (
163- node = test_node , pose = pose , target_frame = "franka/fr3_link0"
164- ).pose .pose
165-
166- delta = getattr (moved_pose .position , direction ) - getattr (
167- first_pose .position , direction
168- )
16993
170- assert abs (delta ) > movement_threshold , (
171- f"{ direction } position did not change after input. Δ = { delta :.4f} "
94+ def compare_fn (p1 : Pose , p2 : Pose ) -> float :
95+ return getattr (p2 .position , direction ) - getattr (p1 .position , direction )
96+
97+ assert_movements_with_joy (
98+ node = test_node ,
99+ joy_axes = axes ,
100+ compare_fn = compare_fn ,
101+ threshold = movement_threshold ,
102+ description = f"{ direction } position" ,
103+ frame_base = "franka/fr3_hand" ,
104+ frame_target = "franka/fr3_link0" ,
105+ timeout = timeout ,
172106 )
0 commit comments