diff --git a/CMakeLists.txt b/CMakeLists.txt index 2b52daf54bd37d600640ac42e4f2cc87ef0d3314..c58e04ebddc1f8ed99849b99674f17e219b9aeb8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -132,6 +132,7 @@ endmacro() # Build Sot Entities compile_plugin(ros_publish) compile_plugin(ros_subscribe) +compile_plugin(ros_queued_subscribe) compile_plugin(ros_time) compile_plugin(ros_joint_state) pkg_config_use_dependency(ros_joint_state pinocchio) diff --git a/src/ros_queued_subscribe.cpp b/src/ros_queued_subscribe.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3c93628fcd2742ba7f097ae7b40c16f7e50f5dcb --- /dev/null +++ b/src/ros_queued_subscribe.cpp @@ -0,0 +1,354 @@ +// +// Copyright (c) 2017-2018 CNRS +// Authors: Joseph Mirabel +// +// +// This file is part of dynamic_graph_bridge +// dynamic_graph_bridge is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation, either version +// 3 of the License, or (at your option) any later version. +// +// dynamic_graph_bridge is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// General Lesser Public License for more details. You should have +// received a copy of the GNU Lesser General Public License along with +// dynamic_graph_bridge If not, see +// <http://www.gnu.org/licenses/>. + +#include <boost/assign.hpp> +#include <boost/bind.hpp> +#include <boost/format.hpp> +#include <boost/function.hpp> +#include <boost/make_shared.hpp> + +#include <ros/ros.h> +#include <std_msgs/Float64.h> +#include <std_msgs/UInt32.h> + +#include <dynamic-graph/factory.h> + +#include "dynamic_graph_bridge/ros_init.hh" +#include "ros_queued_subscribe.hh" + +namespace dynamicgraph +{ + DYNAMICGRAPH_FACTORY_ENTITY_PLUGIN(RosQueuedSubscribe, "RosQueuedSubscribe"); + + namespace command + { + namespace rosQueuedSubscribe + { + Clear::Clear + (RosQueuedSubscribe& entity, const std::string& docstring) + : Command + (entity, + std::vector<Value::Type> (), + docstring) + {} + + Value Clear::doExecute () + { + RosQueuedSubscribe& entity = + static_cast<RosQueuedSubscribe&> (owner ()); + + entity.clear (); + return Value (); + } + + ClearQueue::ClearQueue + (RosQueuedSubscribe& entity, const std::string& docstring) + : Command + (entity, + boost::assign::list_of (Value::STRING), + docstring) + {} + + Value ClearQueue::doExecute () + { + RosQueuedSubscribe& entity = + static_cast<RosQueuedSubscribe&> (owner ()); + + std::vector<Value> values = getParameterValues (); + const std::string& signal = values[0].value (); + entity.clearQueue (signal); + + return Value (); + } + + List::List + (RosQueuedSubscribe& entity, const std::string& docstring) + : Command + (entity, + std::vector<Value::Type> (), + docstring) + {} + + Value List::doExecute () + { + RosQueuedSubscribe& entity = + static_cast<RosQueuedSubscribe&> (owner ()); + return Value (entity.list ()); + } + + Add::Add + (RosQueuedSubscribe& entity, const std::string& docstring) + : Command + (entity, + boost::assign::list_of + (Value::STRING) (Value::STRING) (Value::STRING), + docstring) + {} + + Value Add::doExecute () + { + RosQueuedSubscribe& entity = + static_cast<RosQueuedSubscribe&> (owner ()); + std::vector<Value> values = getParameterValues (); + + const std::string& type = values[0].value (); + const std::string& signal = values[1].value (); + const std::string& topic = values[2].value (); + + if (type == "double") + entity.add<double> (type, signal, topic); + else if (type == "unsigned") + entity.add<unsigned int> (type, signal, topic); + else if (type == "matrix") + entity.add<dg::Matrix> (type, signal, topic); + else if (type == "vector") + entity.add<dg::Vector> (type, signal, topic); + else if (type == "vector3") + entity.add<specific::Vector3> (type, signal, topic); + else if (type == "matrixHomo") + entity.add<sot::MatrixHomogeneous> (type, signal, topic); + else if (type == "twist") + entity.add<specific::Twist> (type, signal, topic); + else + throw std::runtime_error("bad type"); + return Value (); + } + + Rm::Rm + (RosQueuedSubscribe& entity, const std::string& docstring) + : Command + (entity, + boost::assign::list_of (Value::STRING), + docstring) + {} + + Value Rm::doExecute () + { + RosQueuedSubscribe& entity = + static_cast<RosQueuedSubscribe&> (owner ()); + std::vector<Value> values = getParameterValues (); + const std::string& signal = values[0].value (); + entity.rm (signal); + return Value (); + } + + QueueSize::QueueSize + (RosQueuedSubscribe& entity, const std::string& docstring) + : Command + (entity, + boost::assign::list_of (Value::STRING), + docstring) + {} + + Value QueueSize::doExecute () + { + RosQueuedSubscribe& entity = + static_cast<RosQueuedSubscribe&> (owner ()); + + std::vector<Value> values = getParameterValues (); + const std::string& signal = values[0].value (); + + return Value ((unsigned)entity.queueSize (signal)); + } + + ReadQueue::ReadQueue + (RosQueuedSubscribe& entity, const std::string& docstring) + : Command + (entity, + boost::assign::list_of (Value::INT), + docstring) + {} + + Value ReadQueue::doExecute () + { + RosQueuedSubscribe& entity = + static_cast<RosQueuedSubscribe&> (owner ()); + + std::vector<Value> values = getParameterValues (); + int read = values[0].value (); + entity.readQueue (read); + + return Value (); + } + } // end of errorEstimator. + } // end of namespace command. + + const std::string RosQueuedSubscribe::docstring_ + ("Subscribe to a ROS topics and convert it into a dynamic-graph signals.\n" + "\n" + " Use command \"add\" to subscribe to a new signal.\n"); + + RosQueuedSubscribe::RosQueuedSubscribe (const std::string& n) + : dynamicgraph::Entity(n), + nh_ (rosInit (true)), + bindedSignal_ (), + readQueue_ (-1) + { + std::string docstring = + "\n" + " Add a signal reading data from a ROS topic\n" + "\n" + " Input:\n" + " - type: string among ['double', 'matrix', 'vector', 'vector3',\n" + " 'matrixHomo', 'twist'],\n" + " - signal: the signal name in dynamic-graph,\n" + " - topic: the topic name in ROS.\n" + "\n"; + addCommand ("add", + new command::rosQueuedSubscribe::Add + (*this, docstring)); + docstring = + "\n" + " Remove a signal reading data from a ROS topic\n" + "\n" + " Input:\n" + " - name of the signal to remove (see method list for the list of signals).\n" + "\n"; + addCommand ("rm", + new command::rosQueuedSubscribe::Rm + (*this, docstring)); + docstring = + "\n" + " Remove all signals reading data from a ROS topic\n" + "\n" + " No input:\n" + "\n"; + addCommand ("clear", + new command::rosQueuedSubscribe::Clear + (*this, docstring)); + docstring = + "\n" + " List signals reading data from a ROS topic\n" + "\n" + " No input:\n" + "\n"; + addCommand ("list", + new command::rosQueuedSubscribe::List + (*this, docstring)); + docstring = + "\n" + " Empty the queue of a given signal\n" + "\n" + " Input is:\n" + " - name of the signal (see method list for the list of signals).\n" + "\n"; + addCommand ("clearQueue", + new command::rosQueuedSubscribe::ClearQueue + (*this, docstring)); + docstring = + "\n" + " Return the queue size of a given signal\n" + "\n" + " Input is:\n" + " - name of the signal (see method list for the list of signals).\n" + "\n"; + addCommand ("queueSize", + new command::rosQueuedSubscribe::QueueSize + (*this, docstring)); + docstring = + "\n" + " Whether signals should read values from the queues, and when.\n" + "\n" + " Input is:\n" + " - int (dynamic graph time at which the reading begin).\n" + "\n"; + addCommand ("readQueue", + new command::rosQueuedSubscribe::ReadQueue + (*this, docstring)); + } + + RosQueuedSubscribe::~RosQueuedSubscribe () + { + std::cout << getName() << ": Delete" << std::endl; + } + + void RosQueuedSubscribe::display (std::ostream& os) const + { + os << CLASS_NAME << std::endl; + } + + void RosQueuedSubscribe::rm (const std::string& signal) + { + std::string signalTs = signal+"Timestamp"; + + signalDeregistration(signal); + bindedSignal_.erase (signal); + + if(bindedSignal_.find(signalTs) != bindedSignal_.end()) + { + signalDeregistration(signalTs); + bindedSignal_.erase(signalTs); + } + } + + std::string RosQueuedSubscribe::list () + { + std::string result("["); + for (std::map<std::string, bindedSignal_t>::const_iterator it = + bindedSignal_.begin (); it != bindedSignal_.end (); it++) { + result += "'" + it->first + "',"; + } + result += "]"; + return result; + } + + void RosQueuedSubscribe::clear () + { + std::map<std::string, bindedSignal_t>::iterator it = bindedSignal_.begin(); + for(; it!= bindedSignal_.end(); ) + { + rm(it->first); + it = bindedSignal_.begin(); + } + } + + void RosQueuedSubscribe::clearQueue (const std::string& signal) + { + if(bindedSignal_.find(signal) != bindedSignal_.end()) + { + bindedSignal_[signal]->clear(); + } + } + + std::size_t RosQueuedSubscribe::queueSize (const std::string& signal) const + { + std::map<std::string, bindedSignal_t>::const_iterator _bs = + bindedSignal_.find(signal); + if(_bs != bindedSignal_.end()) + { + return _bs->second->size(); + } + return -1; + } + + void RosQueuedSubscribe::readQueue (int beginReadingAt) + { + // Prints signal queues sizes + /*for (std::map<std::string, bindedSignal_t>::const_iterator it = + bindedSignal_.begin (); it != bindedSignal_.end (); it++) { + std::cout << it->first << " : " << it->second->size() << '\n'; + }*/ + readQueue_ = beginReadingAt; + } + + std::string RosQueuedSubscribe::getDocString () const + { + return docstring_; + } +} // end of namespace dynamicgraph. diff --git a/src/ros_queued_subscribe.hh b/src/ros_queued_subscribe.hh new file mode 100644 index 0000000000000000000000000000000000000000..2e938cfcdfdd39f76b690ec437a94f077e948702 --- /dev/null +++ b/src/ros_queued_subscribe.hh @@ -0,0 +1,226 @@ +// +// Copyright (c) 2017-2018 CNRS +// Authors: Joseph Mirabel +// +// +// This file is part of dynamic_graph_bridge +// dynamic_graph_bridge is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation, either version +// 3 of the License, or (at your option) any later version. +// +// dynamic_graph_bridge is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// General Lesser Public License for more details. You should have +// received a copy of the GNU Lesser General Public License along with +// dynamic_graph_bridge If not, see +// <http://www.gnu.org/licenses/>. + +#ifndef DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HH +# define DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HH +# include <iostream> +# include <map> + +# include <boost/shared_ptr.hpp> +# include <boost/thread/mutex.hpp> + +# include <dynamic-graph/entity.h> +# include <dynamic-graph/signal-time-dependent.h> +# include <dynamic-graph/signal-ptr.h> +# include <dynamic-graph/command.h> +# include <sot/core/matrix-geometry.hh> + +# include <ros/ros.h> + +# include "converter.hh" +# include "sot_to_ros.hh" + +namespace dynamicgraph +{ + class RosQueuedSubscribe; + + namespace command + { + namespace rosQueuedSubscribe + { + using ::dynamicgraph::command::Command; + using ::dynamicgraph::command::Value; + +# define ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(CMD) \ + class CMD : public Command \ + { \ + public: \ + CMD (RosQueuedSubscribe& entity, \ + const std::string& docstring); \ + virtual Value doExecute (); \ + } + + ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(Add); + ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(Clear); + ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(List); + ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(Rm); + ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(ClearQueue); + ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(QueueSize); + ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(ReadQueue); + +#undef ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND + + } // end of namespace rosQueuedSubscribe. + } // end of namespace command. + + class RosQueuedSubscribe; + + namespace internal + { + template <typename T> + struct Add; + + struct BindedSignalBase { + typedef boost::shared_ptr<ros::Subscriber> Subscriber_t; + + BindedSignalBase(RosQueuedSubscribe* e) : entity(e) {} + virtual ~BindedSignalBase() {} + + virtual void clear () = 0; + virtual std::size_t size () const = 0; + + Subscriber_t subscriber; + RosQueuedSubscribe* entity; + }; + + template <typename T, int BufferSize> + struct BindedSignal : BindedSignalBase { + typedef dynamicgraph::Signal<T, int> Signal_t; + typedef boost::shared_ptr<Signal_t> SignalPtr_t; + typedef std::vector<T> buffer_t; + typedef typename buffer_t::size_type size_type; + + BindedSignal(RosQueuedSubscribe* e) + : BindedSignalBase (e) + , frontIdx(0) + , backIdx(0) + , buffer (BufferSize) + , init(false) + {} + ~BindedSignal() + { + std::cout << signal->getName() << ": Delete" << std::endl; + signal.reset(); + clear(); + } + + /// See comments in reader and writer for details about synchronisation. + void clear () + { + // synchronize with method writer + wmutex.lock(); + if (!empty()) { + if (backIdx == 0) + last = buffer[BufferSize-1]; + else + last = buffer[backIdx-1]; + } + // synchronize with method reader + rmutex.lock(); + frontIdx = backIdx = 0; + rmutex.unlock(); + wmutex.unlock(); + } + + bool empty () const + { + return frontIdx == backIdx; + } + + bool full () const + { + return ((backIdx + 1) % BufferSize) == frontIdx; + } + + size_type size () const + { + if (frontIdx <= backIdx) + return backIdx - frontIdx; + else + return backIdx + BufferSize - frontIdx; + } + + SignalPtr_t signal; + /// Index of the next value to be read. + size_type frontIdx; + /// Index of the slot where to write next value (does not contain valid data). + size_type backIdx; + buffer_t buffer; + boost::mutex wmutex, rmutex; + T last; + bool init; + + template <typename R> void writer (const R& data); + T& reader (T& val, int time); + }; + } // end of internal namespace. + + + /// \brief Publish ROS information in the dynamic-graph. + class RosQueuedSubscribe : public dynamicgraph::Entity + { + DYNAMIC_GRAPH_ENTITY_DECL(); + typedef boost::posix_time::ptime ptime; + public: + typedef boost::shared_ptr<internal::BindedSignalBase> bindedSignal_t; + + RosQueuedSubscribe (const std::string& n); + virtual ~RosQueuedSubscribe (); + + virtual std::string getDocString () const; + void display (std::ostream& os) const; + + void rm (const std::string& signal); + std::string list (); + void clear (); + void clearQueue (const std::string& signal); + void readQueue (int beginReadingAt); + std::size_t queueSize (const std::string& signal) const; + + template <typename T> + void add (const std::string& type, const std::string& signal, const std::string& topic); + + std::map<std::string, bindedSignal_t>& + bindedSignal () + { + return bindedSignal_; + } + + ros::NodeHandle& nh () + { + return nh_; + } + + template <typename R, typename S> + void callback + (boost::shared_ptr<dynamicgraph::SignalPtr<S, int> > signal, + const R& data); + + template <typename R> + void callbackTimestamp + (boost::shared_ptr<dynamicgraph::SignalPtr<ptime, int> > signal, + const R& data); + + template <typename T> + friend class internal::Add; + private: + static const std::string docstring_; + ros::NodeHandle& nh_; + std::map<std::string, bindedSignal_t> bindedSignal_; + + int readQueue_; + // Signal<bool, int> readQueue_; + + template <typename T> + friend class internal::BindedSignal; + }; +} // end of namespace dynamicgraph. + +# include "ros_queued_subscribe.hxx" +#endif //! DYNAMIC_GRAPH_QUEUED_ROS_SUBSCRIBE_HH diff --git a/src/ros_queued_subscribe.hxx b/src/ros_queued_subscribe.hxx new file mode 100644 index 0000000000000000000000000000000000000000..44de8b702aed3e9c04034b67db02bbe2d71ecd18 --- /dev/null +++ b/src/ros_queued_subscribe.hxx @@ -0,0 +1,134 @@ +// +// Copyright (c) 2017-2018 CNRS +// Authors: Joseph Mirabel +// +// +// This file is part of dynamic_graph_bridge +// dynamic_graph_bridge is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation, either version +// 3 of the License, or (at your option) any later version. +// +// dynamic_graph_bridge is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// General Lesser Public License for more details. You should have +// received a copy of the GNU Lesser General Public License along with +// dynamic_graph_bridge If not, see +// <http://www.gnu.org/licenses/>. + +#ifndef DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX +# define DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX +# include <vector> +# include <boost/bind.hpp> +# include <boost/date_time/posix_time/posix_time.hpp> +# include <dynamic-graph/signal-caster.h> +# include <dynamic-graph/linear-algebra.h> +# include <dynamic-graph/signal-cast-helper.h> +# include <std_msgs/Float64.h> +# include "dynamic_graph_bridge_msgs/Matrix.h" +# include "dynamic_graph_bridge_msgs/Vector.h" + +namespace dg = dynamicgraph; +typedef boost::mutex::scoped_lock scoped_lock; + +namespace dynamicgraph +{ + namespace internal + { + static const int BUFFER_SIZE = 1 << 10; + + template <typename T> + struct Add + { + void operator () (RosQueuedSubscribe& rosSubscribe, + const std::string& type, + const std::string& signal, + const std::string& topic) + { + typedef typename SotToRos<T>::sot_t sot_t; + typedef typename SotToRos<T>::ros_const_ptr_t ros_const_ptr_t; + typedef BindedSignal<sot_t, BUFFER_SIZE> BindedSignal_t; + typedef typename BindedSignal_t::Signal_t Signal_t; + + // Initialize the bindedSignal object. + BindedSignal_t* bs = new BindedSignal_t(&rosSubscribe); + SotToRos<T>::setDefault (bs->last); + + // Initialize the signal. + boost::format signalName ("RosQueuedSubscribe(%1%)::output(%2%)::%3%"); + signalName % rosSubscribe.getName () % type % signal; + + bs->signal.reset (new Signal_t (signalName.str ())); + bs->signal->setFunction (boost::bind(&BindedSignal_t::reader, bs, _1, _2)); + rosSubscribe.signalRegistration (*bs->signal); + + // Initialize the subscriber. + typedef boost::function<void (const ros_const_ptr_t& data)> callback_t; + callback_t callback = boost::bind + (&BindedSignal_t::template writer<ros_const_ptr_t>, bs, _1); + + // Keep 50 messages in queue, but only 20 are sent every 100ms + // -> No message should be lost because of a full buffer + bs->subscriber = + boost::make_shared<ros::Subscriber> + (rosSubscribe.nh ().subscribe (topic, BUFFER_SIZE, callback)); + + RosQueuedSubscribe::bindedSignal_t bindedSignal (bs); + rosSubscribe.bindedSignal ()[signal] = bindedSignal; + } + }; + + // template <typename T, typename R> + template <typename T, int N> + template <typename R> + void BindedSignal<T, N>::writer (const R& data) + { + // synchronize with method clear + boost::mutex::scoped_lock lock(wmutex); + if (full()) { + rmutex.lock(); + frontIdx = (frontIdx + 1) % N; + rmutex.unlock(); + } + converter (buffer[backIdx], data); + // No need to synchronize with reader here because: + // - if the buffer was not empty, then it stays not empty, + // - if it was empty, then the current value will be used at next time. It + // means the transmission bandwidth is too low. + if (!init) { + last = buffer[backIdx]; + init = true; + } + backIdx = (backIdx+1) % N; + } + + template <typename T, int N> + T& BindedSignal<T, N>::reader (T& data, int time) + { + // synchronize with method clear: + // If reading from the list cannot be done, then return last value. + scoped_lock lock(rmutex, boost::try_to_lock); + if (!lock.owns_lock() || entity->readQueue_ == -1 || time < entity->readQueue_) { + data = last; + } else { + if (empty()) + data = last; + else { + data = buffer[frontIdx]; + frontIdx = (frontIdx + 1) % N; + last = data; + } + } + return data; + } + } // end of namespace internal. + + template <typename T> + void RosQueuedSubscribe::add (const std::string& type, const std::string& signal, const std::string& topic) + { + internal::Add<T> () (*this, type, signal, topic); + } +} // end of namespace dynamicgraph. + +#endif //! DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX diff --git a/src/sot_to_ros.hh b/src/sot_to_ros.hh index a09f91c022bbc486dc95f844fb3146ce4f5324c7..7163abd3a56785cf7b9ae1b9b60f6466bd76ae1c 100644 --- a/src/sot_to_ros.hh +++ b/src/sot_to_ros.hh @@ -65,6 +65,11 @@ namespace dynamicgraph { s.setConstant (0.); } + + static void setDefault(sot_t& s) + { + s = 0.; + } }; template <> @@ -84,6 +89,11 @@ namespace dynamicgraph { s.setConstant (0); } + + static void setDefault(sot_t& s) + { + s = 0; + } }; template <> @@ -105,6 +115,11 @@ namespace dynamicgraph m.resize(0, 0); s.setConstant (m); } + + static void setDefault(sot_t& s) + { + s.resize(0,0); + } }; template <> @@ -126,6 +141,11 @@ namespace dynamicgraph v.resize (0); s.setConstant (v); } + + static void setDefault(sot_t& s) + { + s.resize(0,0); + } }; template <> @@ -143,10 +163,14 @@ namespace dynamicgraph template <typename S> static void setDefault(S& s) { - Vector v; - v.resize (0); + Vector v (Vector::Zero(3)); s.setConstant (v); } + + static void setDefault(sot_t& s) + { + s = Vector::Zero(3); + } }; template <> @@ -167,6 +191,11 @@ namespace dynamicgraph sot::MatrixHomogeneous m; s.setConstant (m); } + + static void setDefault(sot_t& s) + { + s.setIdentity(); + } }; template <> @@ -188,6 +217,11 @@ namespace dynamicgraph v.setZero (); s.setConstant (v); } + + static void setDefault(sot_t& s) + { + s = Vector::Zero(6); + } }; // Stamped vector3