Commit 64ba25fe authored by Guilhem Saurel's avatar Guilhem Saurel
Browse files

format

parent bfd756eb
......@@ -2,4 +2,4 @@ build/
msg_gen/
srv_gen/
src/dynamic_graph_bridge_msgs/
**/__pycache__/
\ No newline at end of file
**/__pycache__/
INPUT = @PROJECT_SOURCE_DIR@/doc
......@@ -27,8 +27,8 @@
<member_of_group condition="$ROS_VERSION == 2">rosidl_interface_packages</member_of_group>
<test_depend condition="$ROS_VERSION == 1">rospy</test_depend>
<test_depend condition="$ROS_VERSION == 2">rclpy</test_depend>
<test_depend condition="$ROS_VERSION == 1">rospy</test_depend>
<test_depend condition="$ROS_VERSION == 2">rclpy</test_depend>
<export>
<rosdoc config="rosdoc.yaml" />
......
"""
license BSD 3-clause
Copyright (c) 2020, CNRS
"""
\ No newline at end of file
"""
......@@ -24,8 +24,9 @@ class TestMessages(unittest.TestCase):
def setUp(self):
self.random_vector = np.random.rand(int(np.random.rand() * 10.0 + 1))
self.random_matrix = np.random.rand(int(np.random.rand() * 10.0 + 2),
int(np.random.rand() * 10.0 + 2))
self.random_matrix = np.random.rand(
int(np.random.rand() * 10.0 + 2), int(np.random.rand() * 10.0 + 2)
)
def tearDown(self):
pass
......@@ -48,5 +49,5 @@ class TestMessages(unittest.TestCase):
self.assertEqual(m.width, width)
if __name__ == '__main__':
rostest.rosrun('dynamic_graph_bridge_msgs', 'test_msg', TestMessages)
\ No newline at end of file
if __name__ == "__main__":
rostest.rosrun("dynamic_graph_bridge_msgs", "test_msg", TestMessages)
......@@ -37,61 +37,61 @@ class TestServices(unittest.TestCase):
pass
def test_run_python_command(self):
rospy.wait_for_service('run_python_command')
rospy.wait_for_service("run_python_command")
try:
client = rospy.ServiceProxy('run_python_command', RunPythonCommand)
client = rospy.ServiceProxy("run_python_command", RunPythonCommand)
request = RunPythonCommandRequest()
request.input = "1+1"
response = client(request)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
print("Service call failed: %s" % e)
self.assertTrue(False)
self.assertEqual(response.result, "1+1_result_python_cmd")
self.assertEqual(response.standardoutput, "standardoutput")
self.assertEqual(response.standarderror, "standarderror")
def test_run_command(self):
rospy.wait_for_service('run_command')
rospy.wait_for_service("run_command")
try:
client = rospy.ServiceProxy('run_command', RunCommand)
client = rospy.ServiceProxy("run_command", RunCommand)
request = RunCommandRequest()
request.input = "1+1"
response = client(request)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
print("Service call failed: %s" % e)
self.assertTrue(False)
self.assertEqual(response.result, "1+1_result_cmd")
self.assertEqual(response.standardoutput, "standardoutput")
self.assertEqual(response.standarderror, "standarderror")
def test_run_python_file_good(self):
rospy.wait_for_service('run_python_file')
rospy.wait_for_service("run_python_file")
try:
client = rospy.ServiceProxy('run_python_file', RunPythonFile)
client = rospy.ServiceProxy("run_python_file", RunPythonFile)
request = RunPythonFileRequest()
request.input = os.path.abspath(__file__)
response = client(request)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
print("Service call failed: %s" % e)
self.assertTrue(False)
self.assertEqual(response.result, "True")
def test_run_python_file_bad(self):
rospy.wait_for_service('run_python_file')
rospy.wait_for_service("run_python_file")
try:
client = rospy.ServiceProxy('run_python_file', RunPythonFile)
client = rospy.ServiceProxy("run_python_file", RunPythonFile)
request = RunPythonFileRequest()
request.input = "hthre21@#$%@)#_#%*+($^&$i;gnvj;bae"
response = client(request)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
print("Service call failed: %s" % e)
self.assertTrue(False)
self.assertEqual(response.result, "False")
if __name__ == '__main__':
rostest.rosrun('dynamic_graph_bridge_msgs', 'test_srv', TestServices)
if __name__ == "__main__":
rostest.rosrun("dynamic_graph_bridge_msgs", "test_srv", TestServices)
......@@ -20,17 +20,19 @@ from dynamic_graph_bridge_msgs.srv import (
class ServerNode(object):
def __init__(self):
self._run_python_command_srv = rospy.Service(
'run_python_command', RunPythonCommand, self._run_python_command)
"run_python_command", RunPythonCommand, self._run_python_command
)
self._run_command_srv = rospy.Service(
'run_command', RunCommand, self._run_command)
"run_command", RunCommand, self._run_command
)
self._run_python_file_srv = rospy.Service(
'run_python_file', RunPythonFile, self._run_python_file)
"run_python_file", RunPythonFile, self._run_python_file
)
def _run_python_command(self, request):
response = RunPythonCommandResponse()
response.result = request.input + "_result_python_cmd"
......@@ -50,11 +52,12 @@ class ServerNode(object):
response.result = str(os.path.exists(request.input))
return response
def main(args=None):
rospy.init_node('unit_test_node')
server = ServerNode()
rospy.init_node("unit_test_node")
ServerNode()
rospy.spin()
if __name__ == '__main__':
if __name__ == "__main__":
main()
......@@ -2,4 +2,4 @@
<test test-name="test_msg" pkg="dynamic_graph_bridge_msgs" type="test_msg.py" />
<test test-name="test_srv" pkg="dynamic_graph_bridge_msgs" type="test_srv.py" />
<node name="unit_test_node" pkg="dynamic_graph_bridge_msgs" type="unit_test_node.py" />
</launch>
\ No newline at end of file
</launch>
"""
license BSD 3-clause
Copyright (c) 2020, CNRS
"""
\ No newline at end of file
"""
#!/usr/bin/env python
"""
license BSD 3-clause
......@@ -27,8 +26,9 @@ class TestMessages(unittest.TestCase):
def setUp(self):
self.random_vector = np.random.rand(int(np.random.rand() * 10.0 + 1))
self.random_matrix = np.random.rand(int(np.random.rand() * 10.0 + 2),
int(np.random.rand() * 10.0 + 2))
self.random_matrix = np.random.rand(
int(np.random.rand() * 10.0 + 2), int(np.random.rand() * 10.0 + 2)
)
def tearDown(self):
pass
......
......@@ -15,8 +15,10 @@ from rclpy.node import Node
import launch
import launch_ros
import launch_ros.actions
try:
import launch_testing.actions
is_launch_testing = True
except ImportError:
is_launch_testing = False
......@@ -33,61 +35,61 @@ if is_launch_testing:
@pytest.mark.rostest
def generate_test_description():
# Normally, talker publishes on the 'chatter' topic and listener listens on the
# 'chatter' topic, but we want to show how to use remappings to munge the data so we
# will remap these topics when we launch the nodes and insert our own node that can
# change the data as it passes through
# 'chatter' topic, but we want to show how to use remappings to munge the data
# so we will remap these topics when we launch the nodes and insert our own node
# that can change the data as it passes through
path_to_test = Path(__file__).resolve().parent
server_node = launch_ros.actions.Node(
executable=sys.executable,
arguments=[str(path_to_test / 'unit_test_node.py')],
additional_env={'PYTHONUNBUFFERED': '1'}
arguments=[str(path_to_test / "unit_test_node.py")],
additional_env={"PYTHONUNBUFFERED": "1"},
)
return (
launch.LaunchDescription([
server_node,
# Start tests right away - no need to wait for anything
launch_testing.actions.ReadyToTest(),
]),
{
'python_server_node': server_node
},
launch.LaunchDescription(
[
server_node,
# Start tests right away - no need to wait for anything
launch_testing.actions.ReadyToTest(),
]
),
{"python_server_node": server_node},
)
else: # is_launch_testing
else: # is_launch_testing
@pytest.mark.rostest
def generate_test_description(ready_fn):
# Normally, talker publishes on the 'chatter' topic and listener listens on the
# 'chatter' topic, but we want to show how to use remappings to munge the data so we
# will remap these topics when we launch the nodes and insert our own node that can
# change the data as it passes through
# 'chatter' topic, but we want to show how to use remappings to munge the data
# so we will remap these topics when we launch the nodes and insert our own node
# that can change the data as it passes through
path_to_test = Path(__file__).resolve().parent
server_node = launch_ros.actions.Node(
executable=sys.executable,
arguments=[str(path_to_test / 'unit_test_node.py')],
additional_env={'PYTHONUNBUFFERED': '1'}
arguments=[str(path_to_test / "unit_test_node.py")],
additional_env={"PYTHONUNBUFFERED": "1"},
)
return (
launch.LaunchDescription([
server_node,
# Start tests right away - no need to wait for anything in this example
launch.actions.OpaqueFunction(function=lambda context: ready_fn()),
]),
{
'python_server_node': server_node
},
launch.LaunchDescription(
[
server_node,
# Start tests right away
# No need to wait for anything in this example
launch.actions.OpaqueFunction(function=lambda context: ready_fn()),
]
),
{"python_server_node": server_node},
)
class RunPythonCommandClient(Node):
def __init__(self):
super().__init__('run_python_command_client_node')
self.cli = self.create_client(RunPythonCommand, 'run_python_command')
super().__init__("run_python_command_client_node")
self.cli = self.create_client(RunPythonCommand, "run_python_command")
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.get_logger().info("service not available, waiting again...")
self.req = RunPythonCommand.Request()
def send_request(self):
......@@ -96,12 +98,11 @@ class RunPythonCommandClient(Node):
class RunCommandClient(Node):
def __init__(self):
super().__init__('run_command_client_node')
self.cli = self.create_client(RunCommand, 'run_command')
super().__init__("run_command_client_node")
self.cli = self.create_client(RunCommand, "run_command")
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.get_logger().info("service not available, waiting again...")
self.req = RunCommand.Request()
def send_request(self):
......@@ -110,12 +111,11 @@ class RunCommandClient(Node):
class RunPythonFileClient(Node):
def __init__(self):
super().__init__('run_python_file_client_node')
self.cli = self.create_client(RunPythonFile, 'run_python_file')
super().__init__("run_python_file_client_node")
self.cli = self.create_client(RunPythonFile, "run_python_file")
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.get_logger().info("service not available, waiting again...")
self.req = RunPythonFile.Request()
def send_good_request(self):
......@@ -154,17 +154,16 @@ class TestPythonServices(unittest.TestCase):
try:
response = client.future.result()
except Exception as e:
client.get_logger().info(
'Service call failed %r' % (e,))
client.get_logger().info("Service call failed %r" % (e,))
else:
client.get_logger().info('Result acquired')
client.get_logger().info("Result acquired")
break
self.assertEqual(client.req.input, "1+1")
self.assertEqual(response.result, "1+1_result_python_cmd")
self.assertEqual(response.standardoutput, "standardoutput")
self.assertEqual(response.standarderror, "standarderror")
client.destroy_node()
def test_run_command(self):
......@@ -177,10 +176,9 @@ class TestPythonServices(unittest.TestCase):
try:
response = client.future.result()
except Exception as e:
client.get_logger().info(
'Service call failed %r' % (e,))
client.get_logger().info("Service call failed %r" % (e,))
else:
client.get_logger().info('Result acquired')
client.get_logger().info("Result acquired")
break
self.assertEqual(client.req.input, "1+1")
......@@ -200,10 +198,9 @@ class TestPythonServices(unittest.TestCase):
try:
response = client.future.result()
except Exception as e:
client.get_logger().info(
'Service call failed %r' % (e,))
client.get_logger().info("Service call failed %r" % (e,))
else:
client.get_logger().info('Result acquired')
client.get_logger().info("Result acquired")
break
self.assertEqual(client.req.input, str(Path(__file__)))
......@@ -221,10 +218,9 @@ class TestPythonServices(unittest.TestCase):
try:
response = client.future.result()
except Exception as e:
client.get_logger().info(
'Service call failed %r' % (e,))
client.get_logger().info("Service call failed %r" % (e,))
else:
client.get_logger().info('Result acquired')
client.get_logger().info("Result acquired")
break
self.assertEqual(client.req.input, "hthre21@#$%@)#_#%*+($^&$i;gnvj;bae")
......
......@@ -10,26 +10,24 @@ small server for running unit-test on the generated API of ROS2 services.
from pathlib import Path
import rclpy
from rclpy.node import Node
from dynamic_graph_bridge_msgs.srv import (
RunPythonCommand,
RunCommand,
RunPythonFile
)
from dynamic_graph_bridge_msgs.srv import RunPythonCommand, RunCommand, RunPythonFile
class ServerNode(Node):
def __init__(self):
super().__init__('python_node')
super().__init__("python_node")
self._run_python_command_srv = self.create_service(
RunPythonCommand, 'run_python_command', self._run_python_command)
RunPythonCommand, "run_python_command", self._run_python_command
)
self._run_command_srv = self.create_service(
RunCommand, 'run_command', self._run_command)
RunCommand, "run_command", self._run_command
)
self._run_python_file_srv = self.create_service(
RunPythonFile, 'run_python_file', self._run_python_file)
RunPythonFile, "run_python_file", self._run_python_file
)
def _run_python_command(self, request, response):
response.result = request.input + "_result_python_cmd"
response.standardoutput = "standardoutput"
......@@ -45,11 +43,12 @@ class ServerNode(Node):
def _run_python_file(self, request, response):
response.result = str(Path(request.input).exists())
return response
def cleanup(self):
self.destroy_service(self._run_python_command_srv)
self.destroy_service(self._run_python_file_srv)
def main(args=None):
rclpy.init(args=args)
......@@ -64,5 +63,5 @@ def main(args=None):
rclpy.shutdown()
if __name__ == '__main__':
if __name__ == "__main__":
main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment