Forked from
Stack Of Tasks / dynamic_graph_bridge
34 commits behind the upstream repository.
-
Joseph Mirabel authoredJoseph Mirabel authored
ros_queued_subscribe.hxx 3.58 KiB
//
// Copyright (c) 2017-2018 CNRS
// Authors: Joseph Mirabel
//
//
#ifndef DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
#define DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
#define ENABLE_RT_LOG
#include <vector>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <dynamic-graph/real-time-logger.h>
#include <dynamic-graph/signal-caster.h>
#include <dynamic-graph/linear-algebra.h>
#include <dynamic-graph/signal-cast-helper.h>
#include <std_msgs/Float64.h>
#include "dynamic_graph_bridge_msgs/Matrix.h"
#include "dynamic_graph_bridge_msgs/Vector.h"
namespace dynamicgraph {
namespace internal {
static const int BUFFER_SIZE = 1 << 10;
template <typename T>
struct Add {
void operator()(RosQueuedSubscribe& rosSubscribe, const std::string& type, const std::string& signal,
const std::string& topic) {
typedef typename SotToRos<T>::sot_t sot_t;
typedef typename SotToRos<T>::ros_const_ptr_t ros_const_ptr_t;
typedef BindedSignal<sot_t, BUFFER_SIZE> BindedSignal_t;
typedef typename BindedSignal_t::Signal_t Signal_t;
// Initialize the bindedSignal object.
BindedSignal_t* bs = new BindedSignal_t(&rosSubscribe);
SotToRos<T>::setDefault(bs->last);
// Initialize the signal.
boost::format signalName("RosQueuedSubscribe(%1%)::output(%2%)::%3%");
signalName % rosSubscribe.getName() % type % signal;
bs->signal.reset(new Signal_t(signalName.str()));
bs->signal->setFunction(boost::bind(&BindedSignal_t::reader, bs, _1, _2));
rosSubscribe.signalRegistration(*bs->signal);
// Initialize the subscriber.
typedef boost::function<void(const ros_const_ptr_t& data)> callback_t;
callback_t callback = boost::bind(&BindedSignal_t::template writer<ros_const_ptr_t>, bs, _1);
// Keep 50 messages in queue, but only 20 are sent every 100ms
// -> No message should be lost because of a full buffer
bs->subscriber = boost::make_shared<ros::Subscriber>(rosSubscribe.nh().subscribe(topic, BUFFER_SIZE, callback));
RosQueuedSubscribe::bindedSignal_t bindedSignal(bs);
rosSubscribe.bindedSignal()[signal] = bindedSignal;
}
};
// template <typename T, typename R>
template <typename T, int N>
template <typename R>
void BindedSignal<T, N>::writer(const R& data) {
// synchronize with method clear
boost::mutex::scoped_lock lock(wmutex);
if (full()) {
rmutex.lock();
frontIdx = (frontIdx + 1) % N;
rmutex.unlock();
}
converter(buffer[backIdx], data);
// No need to synchronize with reader here because:
// - if the buffer was not empty, then it stays not empty,
// - if it was empty, then the current value will be used at next time. It
// means the transmission bandwidth is too low.
if (!init) {
last = buffer[backIdx];
init = true;
}
backIdx = (backIdx + 1) % N;
}
template <typename T, int N>
T& BindedSignal<T, N>::reader(T& data, int time) {
// synchronize with method clear:
// If reading from the list cannot be done, then return last value.
boost::mutex::scoped_lock lock(rmutex, boost::try_to_lock);
if (!lock.owns_lock() || entity->readQueue_ == -1 || time < entity->readQueue_) {
data = last;
} else {
if (empty())
data = last;
else {
data = buffer[frontIdx];
frontIdx = (frontIdx + 1) % N;
last = data;
}
}
return data;
}
} // end of namespace internal.
template <typename T>
void RosQueuedSubscribe::add(const std::string& type, const std::string& signal, const std::string& topic) {
internal::Add<T>()(*this, type, signal, topic);
}
} // end of namespace dynamicgraph.
#endif //! DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX