From 84ae41438195c04e1099108ebb67c99fbe69b87a Mon Sep 17 00:00:00 2001
From: Joseph Mirabel <jmirabel@laas.fr>
Date: Thu, 14 Jun 2018 14:27:26 +0200
Subject: [PATCH] Add entity RosQueuedSubscribe

---
 CMakeLists.txt               |   1 +
 src/ros_queued_subscribe.cpp | 354 +++++++++++++++++++++++++++++++++++
 src/ros_queued_subscribe.hh  | 226 ++++++++++++++++++++++
 src/ros_queued_subscribe.hxx | 134 +++++++++++++
 src/sot_to_ros.hh            |  38 +++-
 5 files changed, 751 insertions(+), 2 deletions(-)
 create mode 100644 src/ros_queued_subscribe.cpp
 create mode 100644 src/ros_queued_subscribe.hh
 create mode 100644 src/ros_queued_subscribe.hxx

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2b52daf..c58e04e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -132,6 +132,7 @@ endmacro()
 # Build Sot Entities
 compile_plugin(ros_publish)
 compile_plugin(ros_subscribe)
+compile_plugin(ros_queued_subscribe)
 compile_plugin(ros_time)
 compile_plugin(ros_joint_state)
 pkg_config_use_dependency(ros_joint_state pinocchio)
diff --git a/src/ros_queued_subscribe.cpp b/src/ros_queued_subscribe.cpp
new file mode 100644
index 0000000..3c93628
--- /dev/null
+++ b/src/ros_queued_subscribe.cpp
@@ -0,0 +1,354 @@
+//
+// Copyright (c) 2017-2018 CNRS
+// Authors: Joseph Mirabel
+//
+//
+// This file is part of dynamic_graph_bridge
+// dynamic_graph_bridge is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation, either version
+// 3 of the License, or (at your option) any later version.
+//
+// dynamic_graph_bridge is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// General Lesser Public License for more details.  You should have
+// received a copy of the GNU Lesser General Public License along with
+// dynamic_graph_bridge  If not, see
+// <http://www.gnu.org/licenses/>.
+
+#include <boost/assign.hpp>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+#include <boost/function.hpp>
+#include <boost/make_shared.hpp>
+
+#include <ros/ros.h>
+#include <std_msgs/Float64.h>
+#include <std_msgs/UInt32.h>
+
+#include <dynamic-graph/factory.h>
+
+#include "dynamic_graph_bridge/ros_init.hh"
+#include "ros_queued_subscribe.hh"
+
+namespace dynamicgraph
+{
+  DYNAMICGRAPH_FACTORY_ENTITY_PLUGIN(RosQueuedSubscribe, "RosQueuedSubscribe");
+
+  namespace command
+  {
+    namespace rosQueuedSubscribe
+    {
+      Clear::Clear
+      (RosQueuedSubscribe& entity, const std::string& docstring)
+	: Command
+	  (entity,
+	   std::vector<Value::Type> (),
+	   docstring)
+      {}
+
+      Value Clear::doExecute ()
+      {
+	RosQueuedSubscribe& entity =
+	  static_cast<RosQueuedSubscribe&> (owner ());
+
+	entity.clear ();
+	return Value ();
+      }
+
+      ClearQueue::ClearQueue
+      (RosQueuedSubscribe& entity, const std::string& docstring)
+	: Command
+	  (entity,
+	   boost::assign::list_of (Value::STRING),
+	   docstring)
+      {}
+
+      Value ClearQueue::doExecute ()
+      {
+	RosQueuedSubscribe& entity =
+	  static_cast<RosQueuedSubscribe&> (owner ());
+
+	std::vector<Value> values = getParameterValues ();
+	const std::string& signal = values[0].value ();
+        entity.clearQueue (signal);
+
+	return Value ();
+      }
+
+      List::List
+      (RosQueuedSubscribe& entity, const std::string& docstring)
+	: Command
+	  (entity,
+	   std::vector<Value::Type> (),
+	   docstring)
+      {}
+
+      Value List::doExecute ()
+      {
+	RosQueuedSubscribe& entity =
+	  static_cast<RosQueuedSubscribe&> (owner ());
+	return Value (entity.list ());
+      }
+
+      Add::Add
+      (RosQueuedSubscribe& entity, const std::string& docstring)
+	: Command
+	  (entity,
+	   boost::assign::list_of
+	   (Value::STRING) (Value::STRING) (Value::STRING),
+	   docstring)
+      {}
+
+      Value Add::doExecute ()
+      {
+	RosQueuedSubscribe& entity =
+	  static_cast<RosQueuedSubscribe&> (owner ());
+	std::vector<Value> values = getParameterValues ();
+
+	const std::string& type = values[0].value ();
+	const std::string& signal = values[1].value ();
+	const std::string& topic = values[2].value ();
+
+	if (type == "double")
+	  entity.add<double> (type, signal, topic);
+	else if (type == "unsigned")
+	  entity.add<unsigned int> (type, signal, topic);
+	else if (type == "matrix")
+	  entity.add<dg::Matrix> (type, signal, topic);
+	else if (type == "vector")
+	  entity.add<dg::Vector> (type, signal, topic);
+	else if (type == "vector3")
+	  entity.add<specific::Vector3> (type, signal, topic);
+	else if (type == "matrixHomo")
+	  entity.add<sot::MatrixHomogeneous> (type, signal, topic);
+	else if (type == "twist")
+	  entity.add<specific::Twist> (type, signal, topic);
+	else
+	  throw std::runtime_error("bad type");
+	return Value ();
+      }
+
+      Rm::Rm
+      (RosQueuedSubscribe& entity, const std::string& docstring)
+	: Command
+	  (entity,
+	   boost::assign::list_of (Value::STRING),
+	   docstring)
+      {}
+
+      Value Rm::doExecute ()
+      {
+	RosQueuedSubscribe& entity =
+	  static_cast<RosQueuedSubscribe&> (owner ());
+	std::vector<Value> values = getParameterValues ();
+	const std::string& signal = values[0].value ();
+	entity.rm (signal);
+	return Value ();
+      }
+
+      QueueSize::QueueSize
+      (RosQueuedSubscribe& entity, const std::string& docstring)
+	: Command
+	  (entity,
+	   boost::assign::list_of (Value::STRING),
+	   docstring)
+      {}
+
+      Value QueueSize::doExecute ()
+      {
+	RosQueuedSubscribe& entity =
+	  static_cast<RosQueuedSubscribe&> (owner ());
+
+	std::vector<Value> values = getParameterValues ();
+	const std::string& signal = values[0].value ();
+
+	return Value ((unsigned)entity.queueSize (signal));
+      }
+
+      ReadQueue::ReadQueue
+      (RosQueuedSubscribe& entity, const std::string& docstring)
+	: Command
+	  (entity,
+	   boost::assign::list_of (Value::INT),
+	   docstring)
+      {}
+
+      Value ReadQueue::doExecute ()
+      {
+	RosQueuedSubscribe& entity =
+	  static_cast<RosQueuedSubscribe&> (owner ());
+
+	std::vector<Value> values = getParameterValues ();
+        int read = values[0].value ();
+        entity.readQueue (read);
+
+        return Value ();
+      }
+    } // end of errorEstimator.
+  } // end of namespace command.
+
+  const std::string RosQueuedSubscribe::docstring_
+  ("Subscribe to a ROS topics and convert it into a dynamic-graph signals.\n"
+   "\n"
+   "  Use command \"add\" to subscribe to a new signal.\n");
+
+  RosQueuedSubscribe::RosQueuedSubscribe (const std::string& n)
+    : dynamicgraph::Entity(n),
+      nh_ (rosInit (true)),
+      bindedSignal_ (),
+      readQueue_ (-1)
+  {
+    std::string docstring =
+      "\n"
+      "  Add a signal reading data from a ROS topic\n"
+      "\n"
+      "  Input:\n"
+      "    - type: string among ['double', 'matrix', 'vector', 'vector3',\n"
+      "                          'matrixHomo', 'twist'],\n"
+      "    - signal: the signal name in dynamic-graph,\n"
+      "    - topic:  the topic name in ROS.\n"
+      "\n";
+    addCommand ("add",
+		new command::rosQueuedSubscribe::Add
+		(*this, docstring));
+    docstring =
+      "\n"
+      "  Remove a signal reading data from a ROS topic\n"
+      "\n"
+      "  Input:\n"
+      "    - name of the signal to remove (see method list for the list of signals).\n"
+      "\n";
+    addCommand ("rm",
+		new command::rosQueuedSubscribe::Rm
+		(*this, docstring));
+    docstring =
+      "\n"
+      "  Remove all signals reading data from a ROS topic\n"
+      "\n"
+      "  No input:\n"
+      "\n";
+    addCommand ("clear",
+		new command::rosQueuedSubscribe::Clear
+		(*this, docstring));
+    docstring =
+      "\n"
+      "  List signals reading data from a ROS topic\n"
+      "\n"
+      "  No input:\n"
+      "\n";
+    addCommand ("list",
+		new command::rosQueuedSubscribe::List
+		(*this, docstring));
+    docstring =
+      "\n"
+      "  Empty the queue of a given signal\n"
+      "\n"
+      "  Input is:\n"
+      "    - name of the signal (see method list for the list of signals).\n"
+      "\n";
+    addCommand ("clearQueue",
+		new command::rosQueuedSubscribe::ClearQueue
+		(*this, docstring));
+    docstring =
+      "\n"
+      "  Return the queue size of a given signal\n"
+      "\n"
+      "  Input is:\n"
+      "    - name of the signal (see method list for the list of signals).\n"
+      "\n";
+    addCommand ("queueSize",
+		new command::rosQueuedSubscribe::QueueSize
+		(*this, docstring));
+    docstring =
+      "\n"
+      "  Whether signals should read values from the queues, and when.\n"
+      "\n"
+      "  Input is:\n"
+      "    - int (dynamic graph time at which the reading begin).\n"
+      "\n";
+    addCommand ("readQueue",
+		new command::rosQueuedSubscribe::ReadQueue
+		(*this, docstring));
+  }
+
+  RosQueuedSubscribe::~RosQueuedSubscribe ()
+  {
+    std::cout << getName() << ": Delete" << std::endl;
+  }
+
+  void RosQueuedSubscribe::display (std::ostream& os) const
+  {
+    os << CLASS_NAME << std::endl;
+  }
+
+  void RosQueuedSubscribe::rm (const std::string& signal)
+  {
+    std::string signalTs = signal+"Timestamp";
+
+    signalDeregistration(signal);
+    bindedSignal_.erase (signal);
+
+    if(bindedSignal_.find(signalTs) != bindedSignal_.end())
+    {
+       signalDeregistration(signalTs);
+       bindedSignal_.erase(signalTs);
+    }
+  }
+
+  std::string RosQueuedSubscribe::list ()
+  {
+    std::string result("[");
+    for (std::map<std::string, bindedSignal_t>::const_iterator it =
+	   bindedSignal_.begin (); it != bindedSignal_.end (); it++) {
+      result += "'" + it->first + "',";
+    }
+    result += "]";
+    return result;
+  }
+
+  void RosQueuedSubscribe::clear ()
+  {
+    std::map<std::string, bindedSignal_t>::iterator it = bindedSignal_.begin();
+    for(; it!= bindedSignal_.end(); )
+    {
+      rm(it->first);
+      it = bindedSignal_.begin();
+    }
+  }
+
+  void RosQueuedSubscribe::clearQueue (const std::string& signal)
+  {
+    if(bindedSignal_.find(signal) != bindedSignal_.end())
+    {
+       bindedSignal_[signal]->clear();
+    }
+  }
+
+  std::size_t RosQueuedSubscribe::queueSize (const std::string& signal) const
+  {
+    std::map<std::string, bindedSignal_t>::const_iterator _bs =
+      bindedSignal_.find(signal);
+    if(_bs != bindedSignal_.end())
+    {
+       return _bs->second->size();
+    }
+    return -1;
+  }
+
+  void RosQueuedSubscribe::readQueue (int beginReadingAt)
+  {
+    // Prints signal queues sizes
+    /*for (std::map<std::string, bindedSignal_t>::const_iterator it =
+	   bindedSignal_.begin (); it != bindedSignal_.end (); it++) {
+      std::cout << it->first << " : " << it->second->size() << '\n';
+    }*/
+    readQueue_ = beginReadingAt;
+  }
+
+  std::string RosQueuedSubscribe::getDocString () const
+  {
+    return docstring_;
+  }
+} // end of namespace dynamicgraph.
diff --git a/src/ros_queued_subscribe.hh b/src/ros_queued_subscribe.hh
new file mode 100644
index 0000000..2e938cf
--- /dev/null
+++ b/src/ros_queued_subscribe.hh
@@ -0,0 +1,226 @@
+//
+// Copyright (c) 2017-2018 CNRS
+// Authors: Joseph Mirabel
+//
+//
+// This file is part of dynamic_graph_bridge
+// dynamic_graph_bridge is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation, either version
+// 3 of the License, or (at your option) any later version.
+//
+// dynamic_graph_bridge is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// General Lesser Public License for more details.  You should have
+// received a copy of the GNU Lesser General Public License along with
+// dynamic_graph_bridge  If not, see
+// <http://www.gnu.org/licenses/>.
+
+#ifndef DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HH
+# define DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HH
+# include <iostream>
+# include <map>
+
+# include <boost/shared_ptr.hpp>
+# include <boost/thread/mutex.hpp>
+
+# include <dynamic-graph/entity.h>
+# include <dynamic-graph/signal-time-dependent.h>
+# include <dynamic-graph/signal-ptr.h>
+# include <dynamic-graph/command.h>
+# include <sot/core/matrix-geometry.hh>
+
+# include <ros/ros.h>
+
+# include "converter.hh"
+# include "sot_to_ros.hh"
+
+namespace dynamicgraph
+{
+  class RosQueuedSubscribe;
+
+  namespace command
+  {
+    namespace rosQueuedSubscribe
+    {
+      using ::dynamicgraph::command::Command;
+      using ::dynamicgraph::command::Value;
+
+# define ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(CMD)			\
+      class CMD : public Command			\
+      {							\
+      public:						\
+	CMD (RosQueuedSubscribe& entity,				\
+	     const std::string& docstring);		\
+	virtual Value doExecute ();			\
+      }
+
+      ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(Add);
+      ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(Clear);
+      ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(List);
+      ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(Rm);
+      ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(ClearQueue);
+      ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(QueueSize);
+      ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(ReadQueue);
+
+#undef ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND
+
+    } // end of namespace rosQueuedSubscribe.
+  } // end of namespace command.
+
+  class RosQueuedSubscribe;
+
+  namespace internal
+  {
+    template <typename T>
+    struct Add;
+
+    struct BindedSignalBase {
+      typedef boost::shared_ptr<ros::Subscriber> Subscriber_t;
+
+      BindedSignalBase(RosQueuedSubscribe* e) : entity(e) {}
+      virtual ~BindedSignalBase() {}
+
+      virtual void clear () = 0;
+      virtual std::size_t size () const = 0;
+
+      Subscriber_t subscriber;
+      RosQueuedSubscribe* entity;
+    };
+
+    template <typename T, int BufferSize>
+    struct BindedSignal : BindedSignalBase {
+      typedef dynamicgraph::Signal<T, int> Signal_t;
+      typedef boost::shared_ptr<Signal_t> SignalPtr_t;
+      typedef std::vector<T> buffer_t;
+      typedef typename buffer_t::size_type size_type;
+
+      BindedSignal(RosQueuedSubscribe* e)
+        : BindedSignalBase (e)
+        , frontIdx(0)
+        , backIdx(0)
+        , buffer (BufferSize)
+        , init(false)
+      {}
+      ~BindedSignal()
+      {
+        std::cout << signal->getName() << ": Delete" << std::endl;
+        signal.reset();
+        clear();
+      }
+
+      /// See comments in reader and writer for details about synchronisation.
+      void clear ()
+      {
+        // synchronize with method writer
+        wmutex.lock();
+        if (!empty()) {
+          if (backIdx == 0)
+            last = buffer[BufferSize-1];
+          else
+            last = buffer[backIdx-1];
+        }
+        // synchronize with method reader
+        rmutex.lock();
+        frontIdx = backIdx = 0;
+        rmutex.unlock();
+        wmutex.unlock();
+      }
+
+      bool empty () const
+      {
+        return frontIdx == backIdx;
+      }
+
+      bool full () const
+      {
+        return ((backIdx + 1) % BufferSize) == frontIdx;
+      }
+
+      size_type size () const
+      {
+        if (frontIdx <= backIdx)
+          return backIdx - frontIdx;
+        else
+          return backIdx + BufferSize - frontIdx;
+      }
+
+      SignalPtr_t signal;
+      /// Index of the next value to be read.
+      size_type frontIdx;
+      /// Index of the slot where to write next value (does not contain valid data).
+      size_type backIdx;
+      buffer_t buffer;
+      boost::mutex wmutex, rmutex;
+      T last;
+      bool init;
+
+      template <typename R> void writer (const R& data);
+      T& reader (T& val, int time);
+    };
+  } // end of internal namespace.
+
+
+  /// \brief Publish ROS information in the dynamic-graph.
+  class RosQueuedSubscribe : public dynamicgraph::Entity
+  {
+    DYNAMIC_GRAPH_ENTITY_DECL();
+    typedef boost::posix_time::ptime ptime;
+  public:
+    typedef boost::shared_ptr<internal::BindedSignalBase> bindedSignal_t;
+
+    RosQueuedSubscribe (const std::string& n);
+    virtual ~RosQueuedSubscribe ();
+
+    virtual std::string getDocString () const;
+    void display (std::ostream& os) const;
+
+    void rm (const std::string& signal);
+    std::string list ();
+    void clear ();
+    void clearQueue (const std::string& signal);
+    void readQueue (int beginReadingAt);
+    std::size_t queueSize (const std::string& signal) const;
+
+    template <typename T>
+    void add (const std::string& type, const std::string& signal, const std::string& topic);
+
+    std::map<std::string, bindedSignal_t>&
+    bindedSignal ()
+    {
+      return bindedSignal_;
+    }
+
+    ros::NodeHandle& nh ()
+    {
+      return nh_;
+    }
+
+    template <typename R, typename S>
+    void callback
+    (boost::shared_ptr<dynamicgraph::SignalPtr<S, int> > signal,
+     const R& data);
+
+    template <typename R>
+    void callbackTimestamp
+    (boost::shared_ptr<dynamicgraph::SignalPtr<ptime, int> > signal,
+     const R& data);
+
+    template <typename T>
+    friend class internal::Add;
+  private:
+    static const std::string docstring_;
+    ros::NodeHandle& nh_;
+    std::map<std::string, bindedSignal_t> bindedSignal_;
+
+    int readQueue_;
+    // Signal<bool, int> readQueue_;
+
+    template <typename T>
+    friend class internal::BindedSignal;
+  };
+} // end of namespace dynamicgraph.
+
+# include "ros_queued_subscribe.hxx"
+#endif //! DYNAMIC_GRAPH_QUEUED_ROS_SUBSCRIBE_HH
diff --git a/src/ros_queued_subscribe.hxx b/src/ros_queued_subscribe.hxx
new file mode 100644
index 0000000..44de8b7
--- /dev/null
+++ b/src/ros_queued_subscribe.hxx
@@ -0,0 +1,134 @@
+//
+// Copyright (c) 2017-2018 CNRS
+// Authors: Joseph Mirabel
+//
+//
+// This file is part of dynamic_graph_bridge
+// dynamic_graph_bridge is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation, either version
+// 3 of the License, or (at your option) any later version.
+//
+// dynamic_graph_bridge is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// General Lesser Public License for more details.  You should have
+// received a copy of the GNU Lesser General Public License along with
+// dynamic_graph_bridge  If not, see
+// <http://www.gnu.org/licenses/>.
+
+#ifndef DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
+# define DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
+# include <vector>
+# include <boost/bind.hpp>
+# include <boost/date_time/posix_time/posix_time.hpp>
+# include <dynamic-graph/signal-caster.h>
+# include <dynamic-graph/linear-algebra.h>
+# include <dynamic-graph/signal-cast-helper.h>
+# include <std_msgs/Float64.h>
+# include "dynamic_graph_bridge_msgs/Matrix.h"
+# include "dynamic_graph_bridge_msgs/Vector.h"
+
+namespace dg = dynamicgraph;
+typedef boost::mutex::scoped_lock scoped_lock;
+
+namespace dynamicgraph
+{
+  namespace internal
+  {
+    static const int BUFFER_SIZE = 1 << 10;
+
+    template <typename T>
+    struct Add
+    {
+      void operator () (RosQueuedSubscribe& rosSubscribe,
+			const std::string& type,
+			const std::string& signal,
+			const std::string& topic)
+      {
+        typedef typename SotToRos<T>::sot_t sot_t;
+	typedef typename SotToRos<T>::ros_const_ptr_t ros_const_ptr_t;
+        typedef BindedSignal<sot_t, BUFFER_SIZE> BindedSignal_t;
+	typedef typename BindedSignal_t::Signal_t Signal_t;
+
+	// Initialize the bindedSignal object.
+        BindedSignal_t* bs = new BindedSignal_t(&rosSubscribe);
+        SotToRos<T>::setDefault (bs->last);
+
+	// Initialize the signal.
+	boost::format signalName ("RosQueuedSubscribe(%1%)::output(%2%)::%3%");
+	signalName % rosSubscribe.getName () % type % signal;
+
+	bs->signal.reset (new Signal_t (signalName.str ()));
+        bs->signal->setFunction (boost::bind(&BindedSignal_t::reader, bs, _1, _2));
+	rosSubscribe.signalRegistration (*bs->signal);
+
+	// Initialize the subscriber.
+	typedef boost::function<void (const ros_const_ptr_t& data)> callback_t;
+	callback_t callback = boost::bind
+	  (&BindedSignal_t::template writer<ros_const_ptr_t>, bs, _1);
+
+  // Keep 50 messages in queue, but only 20 are sent every 100ms
+  // -> No message should be lost because of a full buffer
+	bs->subscriber =
+	  boost::make_shared<ros::Subscriber>
+	  (rosSubscribe.nh ().subscribe (topic, BUFFER_SIZE, callback)); 
+
+	RosQueuedSubscribe::bindedSignal_t bindedSignal (bs);
+	rosSubscribe.bindedSignal ()[signal] = bindedSignal;
+      }
+    };
+
+    // template <typename T, typename R>
+    template <typename T, int N>
+    template <typename R>
+    void BindedSignal<T, N>::writer (const R& data)
+    {
+      // synchronize with method clear
+      boost::mutex::scoped_lock lock(wmutex);
+      if (full()) {
+        rmutex.lock();
+        frontIdx = (frontIdx + 1) % N;
+        rmutex.unlock();
+      }
+      converter (buffer[backIdx], data);
+      // No need to synchronize with reader here because:
+      // - if the buffer was not empty, then it stays not empty,
+      // - if it was empty, then the current value will be used at next time. It
+      //   means the transmission bandwidth is too low.
+      if (!init) {
+        last = buffer[backIdx];
+        init = true;
+      }
+      backIdx = (backIdx+1) % N;
+    }
+
+    template <typename T, int N>
+    T& BindedSignal<T, N>::reader (T& data, int time)
+    {
+      // synchronize with method clear:
+      // If reading from the list cannot be done, then return last value.
+      scoped_lock lock(rmutex, boost::try_to_lock);
+      if (!lock.owns_lock() || entity->readQueue_ == -1 || time < entity->readQueue_) {
+        data = last;
+      } else {
+        if (empty())
+          data = last;
+        else {
+          data = buffer[frontIdx];
+          frontIdx = (frontIdx + 1) % N;
+          last = data;
+        }
+      }
+      return data;
+    }
+  } // end of namespace internal.
+
+  template <typename T>
+  void RosQueuedSubscribe::add (const std::string& type, const std::string& signal, const std::string& topic)
+  {
+    internal::Add<T> () (*this, type, signal, topic);
+  }
+} // end of namespace dynamicgraph.
+
+#endif //! DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
diff --git a/src/sot_to_ros.hh b/src/sot_to_ros.hh
index a09f91c..7163abd 100644
--- a/src/sot_to_ros.hh
+++ b/src/sot_to_ros.hh
@@ -65,6 +65,11 @@ namespace dynamicgraph
     {
       s.setConstant (0.);
     }
+
+    static void setDefault(sot_t& s)
+    {
+      s = 0.;
+    }
   };
 
   template <>
@@ -84,6 +89,11 @@ namespace dynamicgraph
     {
       s.setConstant (0);
     }
+
+    static void setDefault(sot_t& s)
+    {
+      s = 0;
+    }
   };
 
   template <>
@@ -105,6 +115,11 @@ namespace dynamicgraph
       m.resize(0, 0);
       s.setConstant (m);
     }
+
+    static void setDefault(sot_t& s)
+    {
+      s.resize(0,0);
+    }
   };
 
   template <>
@@ -126,6 +141,11 @@ namespace dynamicgraph
       v.resize (0);
       s.setConstant (v);
     }
+
+    static void setDefault(sot_t& s)
+    {
+      s.resize(0,0);
+    }
   };
 
   template <>
@@ -143,10 +163,14 @@ namespace dynamicgraph
     template <typename S>
     static void setDefault(S& s)
     {
-      Vector v;
-      v.resize (0);
+      Vector v (Vector::Zero(3));
       s.setConstant (v);
     }
+
+    static void setDefault(sot_t& s)
+    {
+      s = Vector::Zero(3);
+    }
   };
 
   template <>
@@ -167,6 +191,11 @@ namespace dynamicgraph
       sot::MatrixHomogeneous m;
       s.setConstant (m);
     }
+
+    static void setDefault(sot_t& s)
+    {
+      s.setIdentity();
+    }
   };
 
   template <>
@@ -188,6 +217,11 @@ namespace dynamicgraph
       v.setZero ();
       s.setConstant (v);
     }
+
+    static void setDefault(sot_t& s)
+    {
+      s = Vector::Zero(6);
+    }
   };
 
   // Stamped vector3
-- 
GitLab