Unverified Commit 7a757ee0 authored by Guilhem Saurel's avatar Guilhem Saurel Committed by GitHub
Browse files

Merge pull request #14 from nim65s/devel

format
parents 956bfabf b85a4f83
Pipeline #18173 passed with stage
in 7 minutes and 27 seconds
# format (Guilhem Saurel, 2022-04-05)
64ba25febf5d45a0b773c75e2e20ea89ad4c0d05
......@@ -2,4 +2,4 @@ build/
msg_gen/
srv_gen/
src/dynamic_graph_bridge_msgs/
**/__pycache__/
\ No newline at end of file
**/__pycache__/
repos:
- repo: https://github.com/pre-commit/mirrors-clang-format
rev: v13.0.1
hooks:
- id: clang-format
args: [-i, --style=Google]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.1.0
hooks:
- id: check-added-large-files
- id: check-ast
- id: check-executables-have-shebangs
- id: check-json
- id: check-merge-conflict
- id: check-symlinks
- id: check-toml
- id: check-yaml
- id: debug-statements
- id: destroyed-symlinks
- id: detect-private-key
- id: end-of-file-fixer
- id: fix-byte-order-marker
- id: mixed-line-ending
- id: trailing-whitespace
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
- repo: https://github.com/PyCQA/flake8
rev: 4.0.1
hooks:
- id: flake8
# Dynamic Graph Bridge MSGS
[![Pipeline status](https://gitlab.laas.fr/stack-of-tasks/dynamic_graph_bridge_msgs/badges/master/pipeline.svg)](https://gitlab.laas.fr/stack-of-tasks/dynamic_graph_bridge_msgs/commits/master)
[![Coverage report](https://gitlab.laas.fr/stack-of-tasks/dynamic_graph_bridge_msgs/badges/master/coverage.svg?job=doc-coverage)](http://projects.laas.fr/gepetto/doc/stack-of-tasks/dynamic_graph_bridge_msgs/master/coverage/)
[![Coverage report](https://gitlab.laas.fr/stack-of-tasks/dynamic_graph_bridge_msgs/badges/master/coverage.svg?job=doc-coverage)](https://gepettoweb.laas.fr/doc/stack-of-tasks/dynamic_graph_bridge_msgs/master/coverage/)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![pre-commit.ci status](https://results.pre-commit.ci/badge/github/stack-of-tasks/dynamic_graph_bridge_msgs/master.svg)](https://results.pre-commit.ci/latest/github/stack-of-tasks/dynamic_graph_bridge_msgs)
This packages provides the ROS1/2 messages and services for the user interface
of the
[SoT](https://stack-of-tasks.github.io/sot-doc/doxygen/HEAD/page_overview.html)
[SoT](https://stack-of-tasks.github.io/sot-doc/doxygen/HEAD/page_overview.html)
framework.
The current messages are:
......@@ -28,4 +31,4 @@ We use the `ROS_VERSION` environment variable in order to handle the different
build.
The ROS1 build is based on the `catkin` cmake module, and the ROS2 is based on
`ament` cmake module.
\ No newline at end of file
`ament` cmake module.
Subproject commit f4997a81cebfa2dfb69733bc088c55688965dfe8
Subproject commit 3d6176d439963702d97b82555e3007de05a4e6a4
INPUT = @PROJECT_SOURCE_DIR@/doc
......@@ -27,8 +27,8 @@
<member_of_group condition="$ROS_VERSION == 2">rosidl_interface_packages</member_of_group>
<test_depend condition="$ROS_VERSION == 1">rospy</test_depend>
<test_depend condition="$ROS_VERSION == 2">rclpy</test_depend>
<test_depend condition="$ROS_VERSION == 1">rospy</test_depend>
<test_depend condition="$ROS_VERSION == 2">rclpy</test_depend>
<export>
<rosdoc config="rosdoc.yaml" />
......
[tool.black]
exclude = "cmake"
[flake8]
exclude = cmake
max-line-length = 88
ignore = E226, E704, E24, E121, W504, E126, E123, W503, E203
"""
license BSD 3-clause
Copyright (c) 2020, CNRS
"""
\ No newline at end of file
"""
......@@ -24,8 +24,9 @@ class TestMessages(unittest.TestCase):
def setUp(self):
self.random_vector = np.random.rand(int(np.random.rand() * 10.0 + 1))
self.random_matrix = np.random.rand(int(np.random.rand() * 10.0 + 2),
int(np.random.rand() * 10.0 + 2))
self.random_matrix = np.random.rand(
int(np.random.rand() * 10.0 + 2), int(np.random.rand() * 10.0 + 2)
)
def tearDown(self):
pass
......@@ -48,5 +49,5 @@ class TestMessages(unittest.TestCase):
self.assertEqual(m.width, width)
if __name__ == '__main__':
rostest.rosrun('dynamic_graph_bridge_msgs', 'test_msg', TestMessages)
\ No newline at end of file
if __name__ == "__main__":
rostest.rosrun("dynamic_graph_bridge_msgs", "test_msg", TestMessages)
......@@ -37,61 +37,61 @@ class TestServices(unittest.TestCase):
pass
def test_run_python_command(self):
rospy.wait_for_service('run_python_command')
rospy.wait_for_service("run_python_command")
try:
client = rospy.ServiceProxy('run_python_command', RunPythonCommand)
client = rospy.ServiceProxy("run_python_command", RunPythonCommand)
request = RunPythonCommandRequest()
request.input = "1+1"
response = client(request)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
print("Service call failed: %s" % e)
self.assertTrue(False)
self.assertEqual(response.result, "1+1_result_python_cmd")
self.assertEqual(response.standardoutput, "standardoutput")
self.assertEqual(response.standarderror, "standarderror")
def test_run_command(self):
rospy.wait_for_service('run_command')
rospy.wait_for_service("run_command")
try:
client = rospy.ServiceProxy('run_command', RunCommand)
client = rospy.ServiceProxy("run_command", RunCommand)
request = RunCommandRequest()
request.input = "1+1"
response = client(request)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
print("Service call failed: %s" % e)
self.assertTrue(False)
self.assertEqual(response.result, "1+1_result_cmd")
self.assertEqual(response.standardoutput, "standardoutput")
self.assertEqual(response.standarderror, "standarderror")
def test_run_python_file_good(self):
rospy.wait_for_service('run_python_file')
rospy.wait_for_service("run_python_file")
try:
client = rospy.ServiceProxy('run_python_file', RunPythonFile)
client = rospy.ServiceProxy("run_python_file", RunPythonFile)
request = RunPythonFileRequest()
request.input = os.path.abspath(__file__)
response = client(request)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
print("Service call failed: %s" % e)
self.assertTrue(False)
self.assertEqual(response.result, "True")
def test_run_python_file_bad(self):
rospy.wait_for_service('run_python_file')
rospy.wait_for_service("run_python_file")
try:
client = rospy.ServiceProxy('run_python_file', RunPythonFile)
client = rospy.ServiceProxy("run_python_file", RunPythonFile)
request = RunPythonFileRequest()
request.input = "hthre21@#$%@)#_#%*+($^&$i;gnvj;bae"
response = client(request)
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
print("Service call failed: %s" % e)
self.assertTrue(False)
self.assertEqual(response.result, "False")
if __name__ == '__main__':
rostest.rosrun('dynamic_graph_bridge_msgs', 'test_srv', TestServices)
if __name__ == "__main__":
rostest.rosrun("dynamic_graph_bridge_msgs", "test_srv", TestServices)
......@@ -20,17 +20,19 @@ from dynamic_graph_bridge_msgs.srv import (
class ServerNode(object):
def __init__(self):
self._run_python_command_srv = rospy.Service(
'run_python_command', RunPythonCommand, self._run_python_command)
"run_python_command", RunPythonCommand, self._run_python_command
)
self._run_command_srv = rospy.Service(
'run_command', RunCommand, self._run_command)
"run_command", RunCommand, self._run_command
)
self._run_python_file_srv = rospy.Service(
'run_python_file', RunPythonFile, self._run_python_file)
"run_python_file", RunPythonFile, self._run_python_file
)
def _run_python_command(self, request):
response = RunPythonCommandResponse()
response.result = request.input + "_result_python_cmd"
......@@ -50,11 +52,12 @@ class ServerNode(object):
response.result = str(os.path.exists(request.input))
return response
def main(args=None):
rospy.init_node('unit_test_node')
server = ServerNode()
rospy.init_node("unit_test_node")
ServerNode()
rospy.spin()
if __name__ == '__main__':
if __name__ == "__main__":
main()
......@@ -2,4 +2,4 @@
<test test-name="test_msg" pkg="dynamic_graph_bridge_msgs" type="test_msg.py" />
<test test-name="test_srv" pkg="dynamic_graph_bridge_msgs" type="test_srv.py" />
<node name="unit_test_node" pkg="dynamic_graph_bridge_msgs" type="unit_test_node.py" />
</launch>
\ No newline at end of file
</launch>
"""
license BSD 3-clause
Copyright (c) 2020, CNRS
"""
\ No newline at end of file
"""
#!/usr/bin/env python
"""
license BSD 3-clause
......@@ -27,8 +26,9 @@ class TestMessages(unittest.TestCase):
def setUp(self):
self.random_vector = np.random.rand(int(np.random.rand() * 10.0 + 1))
self.random_matrix = np.random.rand(int(np.random.rand() * 10.0 + 2),
int(np.random.rand() * 10.0 + 2))
self.random_matrix = np.random.rand(
int(np.random.rand() * 10.0 + 2), int(np.random.rand() * 10.0 + 2)
)
def tearDown(self):
pass
......
......@@ -15,8 +15,10 @@ from rclpy.node import Node
import launch
import launch_ros
import launch_ros.actions
try:
import launch_testing.actions
is_launch_testing = True
except ImportError:
is_launch_testing = False
......@@ -33,61 +35,61 @@ if is_launch_testing:
@pytest.mark.rostest
def generate_test_description():
# Normally, talker publishes on the 'chatter' topic and listener listens on the
# 'chatter' topic, but we want to show how to use remappings to munge the data so we
# will remap these topics when we launch the nodes and insert our own node that can
# change the data as it passes through
# 'chatter' topic, but we want to show how to use remappings to munge the data
# so we will remap these topics when we launch the nodes and insert our own node
# that can change the data as it passes through
path_to_test = Path(__file__).resolve().parent
server_node = launch_ros.actions.Node(
executable=sys.executable,
arguments=[str(path_to_test / 'unit_test_node.py')],
additional_env={'PYTHONUNBUFFERED': '1'}
arguments=[str(path_to_test / "unit_test_node.py")],
additional_env={"PYTHONUNBUFFERED": "1"},
)
return (
launch.LaunchDescription([
server_node,
# Start tests right away - no need to wait for anything
launch_testing.actions.ReadyToTest(),
]),
{
'python_server_node': server_node
},
launch.LaunchDescription(
[
server_node,
# Start tests right away - no need to wait for anything
launch_testing.actions.ReadyToTest(),
]
),
{"python_server_node": server_node},
)
else: # is_launch_testing
else: # is_launch_testing
@pytest.mark.rostest
def generate_test_description(ready_fn):
# Normally, talker publishes on the 'chatter' topic and listener listens on the
# 'chatter' topic, but we want to show how to use remappings to munge the data so we
# will remap these topics when we launch the nodes and insert our own node that can
# change the data as it passes through
# 'chatter' topic, but we want to show how to use remappings to munge the data
# so we will remap these topics when we launch the nodes and insert our own node
# that can change the data as it passes through
path_to_test = Path(__file__).resolve().parent
server_node = launch_ros.actions.Node(
executable=sys.executable,
arguments=[str(path_to_test / 'unit_test_node.py')],
additional_env={'PYTHONUNBUFFERED': '1'}
arguments=[str(path_to_test / "unit_test_node.py")],
additional_env={"PYTHONUNBUFFERED": "1"},
)
return (
launch.LaunchDescription([
server_node,
# Start tests right away - no need to wait for anything in this example
launch.actions.OpaqueFunction(function=lambda context: ready_fn()),
]),
{
'python_server_node': server_node
},
launch.LaunchDescription(
[
server_node,
# Start tests right away
# No need to wait for anything in this example
launch.actions.OpaqueFunction(function=lambda context: ready_fn()),
]
),
{"python_server_node": server_node},
)
class RunPythonCommandClient(Node):
def __init__(self):
super().__init__('run_python_command_client_node')
self.cli = self.create_client(RunPythonCommand, 'run_python_command')
super().__init__("run_python_command_client_node")
self.cli = self.create_client(RunPythonCommand, "run_python_command")
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.get_logger().info("service not available, waiting again...")
self.req = RunPythonCommand.Request()
def send_request(self):
......@@ -96,12 +98,11 @@ class RunPythonCommandClient(Node):
class RunCommandClient(Node):
def __init__(self):
super().__init__('run_command_client_node')
self.cli = self.create_client(RunCommand, 'run_command')
super().__init__("run_command_client_node")
self.cli = self.create_client(RunCommand, "run_command")
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.get_logger().info("service not available, waiting again...")
self.req = RunCommand.Request()
def send_request(self):
......@@ -110,12 +111,11 @@ class RunCommandClient(Node):
class RunPythonFileClient(Node):
def __init__(self):
super().__init__('run_python_file_client_node')
self.cli = self.create_client(RunPythonFile, 'run_python_file')
super().__init__("run_python_file_client_node")
self.cli = self.create_client(RunPythonFile, "run_python_file")
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.get_logger().info("service not available, waiting again...")
self.req = RunPythonFile.Request()
def send_good_request(self):
......@@ -154,17 +154,16 @@ class TestPythonServices(unittest.TestCase):
try:
response = client.future.result()
except Exception as e:
client.get_logger().info(
'Service call failed %r' % (e,))
client.get_logger().info("Service call failed %r" % (e,))
else:
client.get_logger().info('Result acquired')
client.get_logger().info("Result acquired")
break
self.assertEqual(client.req.input, "1+1")
self.assertEqual(response.result, "1+1_result_python_cmd")
self.assertEqual(response.standardoutput, "standardoutput")
self.assertEqual(response.standarderror, "standarderror")
client.destroy_node()
def test_run_command(self):
......@@ -177,10 +176,9 @@ class TestPythonServices(unittest.TestCase):
try:
response = client.future.result()
except Exception as e:
client.get_logger().info(
'Service call failed %r' % (e,))
client.get_logger().info("Service call failed %r" % (e,))
else:
client.get_logger().info('Result acquired')
client.get_logger().info("Result acquired")
break
self.assertEqual(client.req.input, "1+1")
......@@ -200,10 +198,9 @@ class TestPythonServices(unittest.TestCase):
try:
response = client.future.result()
except Exception as e:
client.get_logger().info(
'Service call failed %r' % (e,))
client.get_logger().info("Service call failed %r" % (e,))
else:
client.get_logger().info('Result acquired')
client.get_logger().info("Result acquired")
break
self.assertEqual(client.req.input, str(Path(__file__)))
......@@ -221,10 +218,9 @@ class TestPythonServices(unittest.TestCase):
try:
response = client.future.result()
except Exception as e:
client.get_logger().info(
'Service call failed %r' % (e,))
client.get_logger().info("Service call failed %r" % (e,))
else:
client.get_logger().info('Result acquired')
client.get_logger().info("Result acquired")
break
self.assertEqual(client.req.input, "hthre21@#$%@)#_#%*+($^&$i;gnvj;bae")
......
......@@ -10,26 +10,24 @@ small server for running unit-test on the generated API of ROS2 services.
from pathlib import Path
import rclpy
from rclpy.node import Node
from dynamic_graph_bridge_msgs.srv import (
RunPythonCommand,
RunCommand,
RunPythonFile
)
from dynamic_graph_bridge_msgs.srv import RunPythonCommand, RunCommand, RunPythonFile
class ServerNode(Node):
def __init__(self):
super().__init__('python_node')
super().__init__("python_node")
self._run_python_command_srv = self.create_service(
RunPythonCommand, 'run_python_command', self._run_python_command)
RunPythonCommand, "run_python_command", self._run_python_command
)
self._run_command_srv = self.create_service(
RunCommand, 'run_command', self._run_command)
RunCommand, "run_command", self._run_command
)
self._run_python_file_srv = self.create_service(
RunPythonFile, 'run_python_file', self._run_python_file)
RunPythonFile, "run_python_file", self._run_python_file
)
def _run_python_command(self, request, response):
response.result = request.input + "_result_python_cmd"
response.standardoutput = "standardoutput"
......@@ -45,11 +43,12 @@ class ServerNode(Node):
def _run_python_file(self, request, response):
response.result = str(Path(request.input).exists())
return response
def cleanup(self):
self.destroy_service(self._run_python_command_srv)
self.destroy_service(self._run_python_file_srv)
def main(args=None):
rclpy.init(args=args)
......@@ -64,5 +63,5 @@ def main(args=None):
rclpy.shutdown()
if __name__ == '__main__':
if __name__ == "__main__":
main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment