test_srv.py 7.3 KB
Newer Older
Maximilien Naveau's avatar
Maximilien Naveau committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
"""
license BSD 3-clause
Copyright (c) 2020, CNRS

Unit-tests for the python API of the DynamicGraphManager
"""

from pathlib import Path
import sys
import unittest
import pytest
import rclpy
from rclpy.node import Node

import launch
import launch_ros
import launch_ros.actions
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
18

Maximilien Naveau's avatar
Maximilien Naveau committed
19
20
try:
    import launch_testing.actions
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
21

Maximilien Naveau's avatar
Maximilien Naveau committed
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
    is_launch_testing = True
except ImportError:
    is_launch_testing = False


from dynamic_graph_bridge_msgs.srv import (
    RunPythonCommand,
    RunCommand,
    RunPythonFile,
)

if is_launch_testing:

    @pytest.mark.rostest
    def generate_test_description():
        # Normally, talker publishes on the 'chatter' topic and listener listens on the
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
38
39
40
        # 'chatter' topic, but we want to show how to use remappings to munge the data
        # so we will remap these topics when we launch the nodes and insert our own node
        # that can change the data as it passes through
Maximilien Naveau's avatar
Maximilien Naveau committed
41
42
43
44
        path_to_test = Path(__file__).resolve().parent

        server_node = launch_ros.actions.Node(
            executable=sys.executable,
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
45
46
            arguments=[str(path_to_test / "unit_test_node.py")],
            additional_env={"PYTHONUNBUFFERED": "1"},
Maximilien Naveau's avatar
Maximilien Naveau committed
47
48
        )
        return (
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
49
50
51
52
53
54
55
56
            launch.LaunchDescription(
                [
                    server_node,
                    # Start tests right away - no need to wait for anything
                    launch_testing.actions.ReadyToTest(),
                ]
            ),
            {"python_server_node": server_node},
Maximilien Naveau's avatar
Maximilien Naveau committed
57
58
        )

Guilhem Saurel's avatar
format    
Guilhem Saurel committed
59
else:  # is_launch_testing
Maximilien Naveau's avatar
Maximilien Naveau committed
60
61
62
63

    @pytest.mark.rostest
    def generate_test_description(ready_fn):
        # Normally, talker publishes on the 'chatter' topic and listener listens on the
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
64
65
66
        # 'chatter' topic, but we want to show how to use remappings to munge the data
        # so we will remap these topics when we launch the nodes and insert our own node
        # that can change the data as it passes through
Maximilien Naveau's avatar
Maximilien Naveau committed
67
68
69
70
        path_to_test = Path(__file__).resolve().parent

        server_node = launch_ros.actions.Node(
            executable=sys.executable,
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
71
72
            arguments=[str(path_to_test / "unit_test_node.py")],
            additional_env={"PYTHONUNBUFFERED": "1"},
Maximilien Naveau's avatar
Maximilien Naveau committed
73
74
        )
        return (
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
75
76
77
78
79
80
81
82
83
            launch.LaunchDescription(
                [
                    server_node,
                    # Start tests right away
                    # No need to wait for anything in this example
                    launch.actions.OpaqueFunction(function=lambda context: ready_fn()),
                ]
            ),
            {"python_server_node": server_node},
Maximilien Naveau's avatar
Maximilien Naveau committed
84
85
86
87
88
        )


class RunPythonCommandClient(Node):
    def __init__(self):
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
89
90
        super().__init__("run_python_command_client_node")
        self.cli = self.create_client(RunPythonCommand, "run_python_command")
Maximilien Naveau's avatar
Maximilien Naveau committed
91
        while not self.cli.wait_for_service(timeout_sec=1.0):
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
92
            self.get_logger().info("service not available, waiting again...")
Maximilien Naveau's avatar
Maximilien Naveau committed
93
94
95
96
97
98
99
100
101
        self.req = RunPythonCommand.Request()

    def send_request(self):
        self.req.input = "1+1"
        self.future = self.cli.call_async(self.req)


class RunCommandClient(Node):
    def __init__(self):
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
102
103
        super().__init__("run_command_client_node")
        self.cli = self.create_client(RunCommand, "run_command")
Maximilien Naveau's avatar
Maximilien Naveau committed
104
        while not self.cli.wait_for_service(timeout_sec=1.0):
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
105
            self.get_logger().info("service not available, waiting again...")
Maximilien Naveau's avatar
Maximilien Naveau committed
106
107
108
109
110
111
112
113
114
        self.req = RunCommand.Request()

    def send_request(self):
        self.req.input = "1+1"
        self.future = self.cli.call_async(self.req)


class RunPythonFileClient(Node):
    def __init__(self):
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
115
116
        super().__init__("run_python_file_client_node")
        self.cli = self.create_client(RunPythonFile, "run_python_file")
Maximilien Naveau's avatar
Maximilien Naveau committed
117
        while not self.cli.wait_for_service(timeout_sec=1.0):
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
118
            self.get_logger().info("service not available, waiting again...")
Maximilien Naveau's avatar
Maximilien Naveau committed
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
        self.req = RunPythonFile.Request()

    def send_good_request(self):
        self.req.input = str(Path(__file__).resolve())
        self.future = self.cli.call_async(self.req)

    def send_bad_request(self):
        self.req.input = "hthre21@#$%@)#_#%*+($^&$i;gnvj;bae"
        self.future = self.cli.call_async(self.req)


class TestPythonServices(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Initialize the ROS context for the test node
        rclpy.init()

    @classmethod
    def tearDownClass(cls):
        # Shutdown the ROS context
        rclpy.shutdown()

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_run_python_command(self):
        client = RunPythonCommandClient()
        client.send_request()

        while rclpy.ok():
            rclpy.spin_once(client)
            if client.future.done():
                try:
                    response = client.future.result()
                except Exception as e:
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
157
                    client.get_logger().info("Service call failed %r" % (e,))
Maximilien Naveau's avatar
Maximilien Naveau committed
158
                else:
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
159
                    client.get_logger().info("Result acquired")
Maximilien Naveau's avatar
Maximilien Naveau committed
160
161
162
163
164
165
                break

        self.assertEqual(client.req.input, "1+1")
        self.assertEqual(response.result, "1+1_result_python_cmd")
        self.assertEqual(response.standardoutput, "standardoutput")
        self.assertEqual(response.standarderror, "standarderror")
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
166

Maximilien Naveau's avatar
Maximilien Naveau committed
167
168
169
170
171
172
173
174
175
176
177
178
        client.destroy_node()

    def test_run_command(self):
        client = RunCommandClient()
        client.send_request()

        while rclpy.ok():
            rclpy.spin_once(client)
            if client.future.done():
                try:
                    response = client.future.result()
                except Exception as e:
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
179
                    client.get_logger().info("Service call failed %r" % (e,))
Maximilien Naveau's avatar
Maximilien Naveau committed
180
                else:
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
181
                    client.get_logger().info("Result acquired")
Maximilien Naveau's avatar
Maximilien Naveau committed
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
                break

        self.assertEqual(client.req.input, "1+1")
        self.assertEqual(response.result, "1+1_result_cmd")
        self.assertEqual(response.standardoutput, "standardoutput")
        self.assertEqual(response.standarderror, "standarderror")

        client.destroy_node()

    def test_run_python_file_good(self):
        client = RunPythonFileClient()
        client.send_good_request()

        while rclpy.ok():
            rclpy.spin_once(client)
            if client.future.done():
                try:
                    response = client.future.result()
                except Exception as e:
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
201
                    client.get_logger().info("Service call failed %r" % (e,))
Maximilien Naveau's avatar
Maximilien Naveau committed
202
                else:
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
203
                    client.get_logger().info("Result acquired")
Maximilien Naveau's avatar
Maximilien Naveau committed
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
                break

        self.assertEqual(client.req.input, str(Path(__file__)))
        self.assertEqual(response.result, "True")

        client.destroy_node()

    def test_run_python_file_bad(self):
        client = RunPythonFileClient()
        client.send_bad_request()

        while rclpy.ok():
            rclpy.spin_once(client)
            if client.future.done():
                try:
                    response = client.future.result()
                except Exception as e:
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
221
                    client.get_logger().info("Service call failed %r" % (e,))
Maximilien Naveau's avatar
Maximilien Naveau committed
222
                else:
Guilhem Saurel's avatar
format    
Guilhem Saurel committed
223
                    client.get_logger().info("Result acquired")
Maximilien Naveau's avatar
Maximilien Naveau committed
224
225
226
227
228
229
                break

        self.assertEqual(client.req.input, "hthre21@#$%@)#_#%*+($^&$i;gnvj;bae")
        self.assertEqual(response.result, "False")

        client.destroy_node()