Skip to content
Snippets Groups Projects
Commit 84ae4143 authored by Joseph Mirabel's avatar Joseph Mirabel Committed by Joseph Mirabel
Browse files

Add entity RosQueuedSubscribe

parent 496023c2
No related branches found
No related tags found
No related merge requests found
......@@ -132,6 +132,7 @@ endmacro()
# Build Sot Entities
compile_plugin(ros_publish)
compile_plugin(ros_subscribe)
compile_plugin(ros_queued_subscribe)
compile_plugin(ros_time)
compile_plugin(ros_joint_state)
pkg_config_use_dependency(ros_joint_state pinocchio)
......
//
// Copyright (c) 2017-2018 CNRS
// Authors: Joseph Mirabel
//
//
// This file is part of dynamic_graph_bridge
// dynamic_graph_bridge is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation, either version
// 3 of the License, or (at your option) any later version.
//
// dynamic_graph_bridge is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Lesser Public License for more details. You should have
// received a copy of the GNU Lesser General Public License along with
// dynamic_graph_bridge If not, see
// <http://www.gnu.org/licenses/>.
#include <boost/assign.hpp>
#include <boost/bind.hpp>
#include <boost/format.hpp>
#include <boost/function.hpp>
#include <boost/make_shared.hpp>
#include <ros/ros.h>
#include <std_msgs/Float64.h>
#include <std_msgs/UInt32.h>
#include <dynamic-graph/factory.h>
#include "dynamic_graph_bridge/ros_init.hh"
#include "ros_queued_subscribe.hh"
namespace dynamicgraph
{
DYNAMICGRAPH_FACTORY_ENTITY_PLUGIN(RosQueuedSubscribe, "RosQueuedSubscribe");
namespace command
{
namespace rosQueuedSubscribe
{
Clear::Clear
(RosQueuedSubscribe& entity, const std::string& docstring)
: Command
(entity,
std::vector<Value::Type> (),
docstring)
{}
Value Clear::doExecute ()
{
RosQueuedSubscribe& entity =
static_cast<RosQueuedSubscribe&> (owner ());
entity.clear ();
return Value ();
}
ClearQueue::ClearQueue
(RosQueuedSubscribe& entity, const std::string& docstring)
: Command
(entity,
boost::assign::list_of (Value::STRING),
docstring)
{}
Value ClearQueue::doExecute ()
{
RosQueuedSubscribe& entity =
static_cast<RosQueuedSubscribe&> (owner ());
std::vector<Value> values = getParameterValues ();
const std::string& signal = values[0].value ();
entity.clearQueue (signal);
return Value ();
}
List::List
(RosQueuedSubscribe& entity, const std::string& docstring)
: Command
(entity,
std::vector<Value::Type> (),
docstring)
{}
Value List::doExecute ()
{
RosQueuedSubscribe& entity =
static_cast<RosQueuedSubscribe&> (owner ());
return Value (entity.list ());
}
Add::Add
(RosQueuedSubscribe& entity, const std::string& docstring)
: Command
(entity,
boost::assign::list_of
(Value::STRING) (Value::STRING) (Value::STRING),
docstring)
{}
Value Add::doExecute ()
{
RosQueuedSubscribe& entity =
static_cast<RosQueuedSubscribe&> (owner ());
std::vector<Value> values = getParameterValues ();
const std::string& type = values[0].value ();
const std::string& signal = values[1].value ();
const std::string& topic = values[2].value ();
if (type == "double")
entity.add<double> (type, signal, topic);
else if (type == "unsigned")
entity.add<unsigned int> (type, signal, topic);
else if (type == "matrix")
entity.add<dg::Matrix> (type, signal, topic);
else if (type == "vector")
entity.add<dg::Vector> (type, signal, topic);
else if (type == "vector3")
entity.add<specific::Vector3> (type, signal, topic);
else if (type == "matrixHomo")
entity.add<sot::MatrixHomogeneous> (type, signal, topic);
else if (type == "twist")
entity.add<specific::Twist> (type, signal, topic);
else
throw std::runtime_error("bad type");
return Value ();
}
Rm::Rm
(RosQueuedSubscribe& entity, const std::string& docstring)
: Command
(entity,
boost::assign::list_of (Value::STRING),
docstring)
{}
Value Rm::doExecute ()
{
RosQueuedSubscribe& entity =
static_cast<RosQueuedSubscribe&> (owner ());
std::vector<Value> values = getParameterValues ();
const std::string& signal = values[0].value ();
entity.rm (signal);
return Value ();
}
QueueSize::QueueSize
(RosQueuedSubscribe& entity, const std::string& docstring)
: Command
(entity,
boost::assign::list_of (Value::STRING),
docstring)
{}
Value QueueSize::doExecute ()
{
RosQueuedSubscribe& entity =
static_cast<RosQueuedSubscribe&> (owner ());
std::vector<Value> values = getParameterValues ();
const std::string& signal = values[0].value ();
return Value ((unsigned)entity.queueSize (signal));
}
ReadQueue::ReadQueue
(RosQueuedSubscribe& entity, const std::string& docstring)
: Command
(entity,
boost::assign::list_of (Value::INT),
docstring)
{}
Value ReadQueue::doExecute ()
{
RosQueuedSubscribe& entity =
static_cast<RosQueuedSubscribe&> (owner ());
std::vector<Value> values = getParameterValues ();
int read = values[0].value ();
entity.readQueue (read);
return Value ();
}
} // end of errorEstimator.
} // end of namespace command.
const std::string RosQueuedSubscribe::docstring_
("Subscribe to a ROS topics and convert it into a dynamic-graph signals.\n"
"\n"
" Use command \"add\" to subscribe to a new signal.\n");
RosQueuedSubscribe::RosQueuedSubscribe (const std::string& n)
: dynamicgraph::Entity(n),
nh_ (rosInit (true)),
bindedSignal_ (),
readQueue_ (-1)
{
std::string docstring =
"\n"
" Add a signal reading data from a ROS topic\n"
"\n"
" Input:\n"
" - type: string among ['double', 'matrix', 'vector', 'vector3',\n"
" 'matrixHomo', 'twist'],\n"
" - signal: the signal name in dynamic-graph,\n"
" - topic: the topic name in ROS.\n"
"\n";
addCommand ("add",
new command::rosQueuedSubscribe::Add
(*this, docstring));
docstring =
"\n"
" Remove a signal reading data from a ROS topic\n"
"\n"
" Input:\n"
" - name of the signal to remove (see method list for the list of signals).\n"
"\n";
addCommand ("rm",
new command::rosQueuedSubscribe::Rm
(*this, docstring));
docstring =
"\n"
" Remove all signals reading data from a ROS topic\n"
"\n"
" No input:\n"
"\n";
addCommand ("clear",
new command::rosQueuedSubscribe::Clear
(*this, docstring));
docstring =
"\n"
" List signals reading data from a ROS topic\n"
"\n"
" No input:\n"
"\n";
addCommand ("list",
new command::rosQueuedSubscribe::List
(*this, docstring));
docstring =
"\n"
" Empty the queue of a given signal\n"
"\n"
" Input is:\n"
" - name of the signal (see method list for the list of signals).\n"
"\n";
addCommand ("clearQueue",
new command::rosQueuedSubscribe::ClearQueue
(*this, docstring));
docstring =
"\n"
" Return the queue size of a given signal\n"
"\n"
" Input is:\n"
" - name of the signal (see method list for the list of signals).\n"
"\n";
addCommand ("queueSize",
new command::rosQueuedSubscribe::QueueSize
(*this, docstring));
docstring =
"\n"
" Whether signals should read values from the queues, and when.\n"
"\n"
" Input is:\n"
" - int (dynamic graph time at which the reading begin).\n"
"\n";
addCommand ("readQueue",
new command::rosQueuedSubscribe::ReadQueue
(*this, docstring));
}
RosQueuedSubscribe::~RosQueuedSubscribe ()
{
std::cout << getName() << ": Delete" << std::endl;
}
void RosQueuedSubscribe::display (std::ostream& os) const
{
os << CLASS_NAME << std::endl;
}
void RosQueuedSubscribe::rm (const std::string& signal)
{
std::string signalTs = signal+"Timestamp";
signalDeregistration(signal);
bindedSignal_.erase (signal);
if(bindedSignal_.find(signalTs) != bindedSignal_.end())
{
signalDeregistration(signalTs);
bindedSignal_.erase(signalTs);
}
}
std::string RosQueuedSubscribe::list ()
{
std::string result("[");
for (std::map<std::string, bindedSignal_t>::const_iterator it =
bindedSignal_.begin (); it != bindedSignal_.end (); it++) {
result += "'" + it->first + "',";
}
result += "]";
return result;
}
void RosQueuedSubscribe::clear ()
{
std::map<std::string, bindedSignal_t>::iterator it = bindedSignal_.begin();
for(; it!= bindedSignal_.end(); )
{
rm(it->first);
it = bindedSignal_.begin();
}
}
void RosQueuedSubscribe::clearQueue (const std::string& signal)
{
if(bindedSignal_.find(signal) != bindedSignal_.end())
{
bindedSignal_[signal]->clear();
}
}
std::size_t RosQueuedSubscribe::queueSize (const std::string& signal) const
{
std::map<std::string, bindedSignal_t>::const_iterator _bs =
bindedSignal_.find(signal);
if(_bs != bindedSignal_.end())
{
return _bs->second->size();
}
return -1;
}
void RosQueuedSubscribe::readQueue (int beginReadingAt)
{
// Prints signal queues sizes
/*for (std::map<std::string, bindedSignal_t>::const_iterator it =
bindedSignal_.begin (); it != bindedSignal_.end (); it++) {
std::cout << it->first << " : " << it->second->size() << '\n';
}*/
readQueue_ = beginReadingAt;
}
std::string RosQueuedSubscribe::getDocString () const
{
return docstring_;
}
} // end of namespace dynamicgraph.
//
// Copyright (c) 2017-2018 CNRS
// Authors: Joseph Mirabel
//
//
// This file is part of dynamic_graph_bridge
// dynamic_graph_bridge is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation, either version
// 3 of the License, or (at your option) any later version.
//
// dynamic_graph_bridge is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Lesser Public License for more details. You should have
// received a copy of the GNU Lesser General Public License along with
// dynamic_graph_bridge If not, see
// <http://www.gnu.org/licenses/>.
#ifndef DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HH
# define DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HH
# include <iostream>
# include <map>
# include <boost/shared_ptr.hpp>
# include <boost/thread/mutex.hpp>
# include <dynamic-graph/entity.h>
# include <dynamic-graph/signal-time-dependent.h>
# include <dynamic-graph/signal-ptr.h>
# include <dynamic-graph/command.h>
# include <sot/core/matrix-geometry.hh>
# include <ros/ros.h>
# include "converter.hh"
# include "sot_to_ros.hh"
namespace dynamicgraph
{
class RosQueuedSubscribe;
namespace command
{
namespace rosQueuedSubscribe
{
using ::dynamicgraph::command::Command;
using ::dynamicgraph::command::Value;
# define ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(CMD) \
class CMD : public Command \
{ \
public: \
CMD (RosQueuedSubscribe& entity, \
const std::string& docstring); \
virtual Value doExecute (); \
}
ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(Add);
ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(Clear);
ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(List);
ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(Rm);
ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(ClearQueue);
ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(QueueSize);
ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND(ReadQueue);
#undef ROS_QUEUED_SUBSCRIBE_MAKE_COMMAND
} // end of namespace rosQueuedSubscribe.
} // end of namespace command.
class RosQueuedSubscribe;
namespace internal
{
template <typename T>
struct Add;
struct BindedSignalBase {
typedef boost::shared_ptr<ros::Subscriber> Subscriber_t;
BindedSignalBase(RosQueuedSubscribe* e) : entity(e) {}
virtual ~BindedSignalBase() {}
virtual void clear () = 0;
virtual std::size_t size () const = 0;
Subscriber_t subscriber;
RosQueuedSubscribe* entity;
};
template <typename T, int BufferSize>
struct BindedSignal : BindedSignalBase {
typedef dynamicgraph::Signal<T, int> Signal_t;
typedef boost::shared_ptr<Signal_t> SignalPtr_t;
typedef std::vector<T> buffer_t;
typedef typename buffer_t::size_type size_type;
BindedSignal(RosQueuedSubscribe* e)
: BindedSignalBase (e)
, frontIdx(0)
, backIdx(0)
, buffer (BufferSize)
, init(false)
{}
~BindedSignal()
{
std::cout << signal->getName() << ": Delete" << std::endl;
signal.reset();
clear();
}
/// See comments in reader and writer for details about synchronisation.
void clear ()
{
// synchronize with method writer
wmutex.lock();
if (!empty()) {
if (backIdx == 0)
last = buffer[BufferSize-1];
else
last = buffer[backIdx-1];
}
// synchronize with method reader
rmutex.lock();
frontIdx = backIdx = 0;
rmutex.unlock();
wmutex.unlock();
}
bool empty () const
{
return frontIdx == backIdx;
}
bool full () const
{
return ((backIdx + 1) % BufferSize) == frontIdx;
}
size_type size () const
{
if (frontIdx <= backIdx)
return backIdx - frontIdx;
else
return backIdx + BufferSize - frontIdx;
}
SignalPtr_t signal;
/// Index of the next value to be read.
size_type frontIdx;
/// Index of the slot where to write next value (does not contain valid data).
size_type backIdx;
buffer_t buffer;
boost::mutex wmutex, rmutex;
T last;
bool init;
template <typename R> void writer (const R& data);
T& reader (T& val, int time);
};
} // end of internal namespace.
/// \brief Publish ROS information in the dynamic-graph.
class RosQueuedSubscribe : public dynamicgraph::Entity
{
DYNAMIC_GRAPH_ENTITY_DECL();
typedef boost::posix_time::ptime ptime;
public:
typedef boost::shared_ptr<internal::BindedSignalBase> bindedSignal_t;
RosQueuedSubscribe (const std::string& n);
virtual ~RosQueuedSubscribe ();
virtual std::string getDocString () const;
void display (std::ostream& os) const;
void rm (const std::string& signal);
std::string list ();
void clear ();
void clearQueue (const std::string& signal);
void readQueue (int beginReadingAt);
std::size_t queueSize (const std::string& signal) const;
template <typename T>
void add (const std::string& type, const std::string& signal, const std::string& topic);
std::map<std::string, bindedSignal_t>&
bindedSignal ()
{
return bindedSignal_;
}
ros::NodeHandle& nh ()
{
return nh_;
}
template <typename R, typename S>
void callback
(boost::shared_ptr<dynamicgraph::SignalPtr<S, int> > signal,
const R& data);
template <typename R>
void callbackTimestamp
(boost::shared_ptr<dynamicgraph::SignalPtr<ptime, int> > signal,
const R& data);
template <typename T>
friend class internal::Add;
private:
static const std::string docstring_;
ros::NodeHandle& nh_;
std::map<std::string, bindedSignal_t> bindedSignal_;
int readQueue_;
// Signal<bool, int> readQueue_;
template <typename T>
friend class internal::BindedSignal;
};
} // end of namespace dynamicgraph.
# include "ros_queued_subscribe.hxx"
#endif //! DYNAMIC_GRAPH_QUEUED_ROS_SUBSCRIBE_HH
//
// Copyright (c) 2017-2018 CNRS
// Authors: Joseph Mirabel
//
//
// This file is part of dynamic_graph_bridge
// dynamic_graph_bridge is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation, either version
// 3 of the License, or (at your option) any later version.
//
// dynamic_graph_bridge is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Lesser Public License for more details. You should have
// received a copy of the GNU Lesser General Public License along with
// dynamic_graph_bridge If not, see
// <http://www.gnu.org/licenses/>.
#ifndef DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
# define DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
# include <vector>
# include <boost/bind.hpp>
# include <boost/date_time/posix_time/posix_time.hpp>
# include <dynamic-graph/signal-caster.h>
# include <dynamic-graph/linear-algebra.h>
# include <dynamic-graph/signal-cast-helper.h>
# include <std_msgs/Float64.h>
# include "dynamic_graph_bridge_msgs/Matrix.h"
# include "dynamic_graph_bridge_msgs/Vector.h"
namespace dg = dynamicgraph;
typedef boost::mutex::scoped_lock scoped_lock;
namespace dynamicgraph
{
namespace internal
{
static const int BUFFER_SIZE = 1 << 10;
template <typename T>
struct Add
{
void operator () (RosQueuedSubscribe& rosSubscribe,
const std::string& type,
const std::string& signal,
const std::string& topic)
{
typedef typename SotToRos<T>::sot_t sot_t;
typedef typename SotToRos<T>::ros_const_ptr_t ros_const_ptr_t;
typedef BindedSignal<sot_t, BUFFER_SIZE> BindedSignal_t;
typedef typename BindedSignal_t::Signal_t Signal_t;
// Initialize the bindedSignal object.
BindedSignal_t* bs = new BindedSignal_t(&rosSubscribe);
SotToRos<T>::setDefault (bs->last);
// Initialize the signal.
boost::format signalName ("RosQueuedSubscribe(%1%)::output(%2%)::%3%");
signalName % rosSubscribe.getName () % type % signal;
bs->signal.reset (new Signal_t (signalName.str ()));
bs->signal->setFunction (boost::bind(&BindedSignal_t::reader, bs, _1, _2));
rosSubscribe.signalRegistration (*bs->signal);
// Initialize the subscriber.
typedef boost::function<void (const ros_const_ptr_t& data)> callback_t;
callback_t callback = boost::bind
(&BindedSignal_t::template writer<ros_const_ptr_t>, bs, _1);
// Keep 50 messages in queue, but only 20 are sent every 100ms
// -> No message should be lost because of a full buffer
bs->subscriber =
boost::make_shared<ros::Subscriber>
(rosSubscribe.nh ().subscribe (topic, BUFFER_SIZE, callback));
RosQueuedSubscribe::bindedSignal_t bindedSignal (bs);
rosSubscribe.bindedSignal ()[signal] = bindedSignal;
}
};
// template <typename T, typename R>
template <typename T, int N>
template <typename R>
void BindedSignal<T, N>::writer (const R& data)
{
// synchronize with method clear
boost::mutex::scoped_lock lock(wmutex);
if (full()) {
rmutex.lock();
frontIdx = (frontIdx + 1) % N;
rmutex.unlock();
}
converter (buffer[backIdx], data);
// No need to synchronize with reader here because:
// - if the buffer was not empty, then it stays not empty,
// - if it was empty, then the current value will be used at next time. It
// means the transmission bandwidth is too low.
if (!init) {
last = buffer[backIdx];
init = true;
}
backIdx = (backIdx+1) % N;
}
template <typename T, int N>
T& BindedSignal<T, N>::reader (T& data, int time)
{
// synchronize with method clear:
// If reading from the list cannot be done, then return last value.
scoped_lock lock(rmutex, boost::try_to_lock);
if (!lock.owns_lock() || entity->readQueue_ == -1 || time < entity->readQueue_) {
data = last;
} else {
if (empty())
data = last;
else {
data = buffer[frontIdx];
frontIdx = (frontIdx + 1) % N;
last = data;
}
}
return data;
}
} // end of namespace internal.
template <typename T>
void RosQueuedSubscribe::add (const std::string& type, const std::string& signal, const std::string& topic)
{
internal::Add<T> () (*this, type, signal, topic);
}
} // end of namespace dynamicgraph.
#endif //! DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
......@@ -65,6 +65,11 @@ namespace dynamicgraph
{
s.setConstant (0.);
}
static void setDefault(sot_t& s)
{
s = 0.;
}
};
template <>
......@@ -84,6 +89,11 @@ namespace dynamicgraph
{
s.setConstant (0);
}
static void setDefault(sot_t& s)
{
s = 0;
}
};
template <>
......@@ -105,6 +115,11 @@ namespace dynamicgraph
m.resize(0, 0);
s.setConstant (m);
}
static void setDefault(sot_t& s)
{
s.resize(0,0);
}
};
template <>
......@@ -126,6 +141,11 @@ namespace dynamicgraph
v.resize (0);
s.setConstant (v);
}
static void setDefault(sot_t& s)
{
s.resize(0,0);
}
};
template <>
......@@ -143,10 +163,14 @@ namespace dynamicgraph
template <typename S>
static void setDefault(S& s)
{
Vector v;
v.resize (0);
Vector v (Vector::Zero(3));
s.setConstant (v);
}
static void setDefault(sot_t& s)
{
s = Vector::Zero(3);
}
};
template <>
......@@ -167,6 +191,11 @@ namespace dynamicgraph
sot::MatrixHomogeneous m;
s.setConstant (m);
}
static void setDefault(sot_t& s)
{
s.setIdentity();
}
};
template <>
......@@ -188,6 +217,11 @@ namespace dynamicgraph
v.setZero ();
s.setConstant (v);
}
static void setDefault(sot_t& s)
{
s = Vector::Zero(6);
}
};
// Stamped vector3
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment