Skip to content
Snippets Groups Projects
Forked from Stack Of Tasks / dynamic_graph_bridge
34 commits behind the upstream repository.
ros_queued_subscribe.hxx 3.58 KiB
//
// Copyright (c) 2017-2018 CNRS
// Authors: Joseph Mirabel
//
//

#ifndef DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
#define DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX
#define ENABLE_RT_LOG

#include <vector>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <dynamic-graph/real-time-logger.h>
#include <dynamic-graph/signal-caster.h>
#include <dynamic-graph/linear-algebra.h>
#include <dynamic-graph/signal-cast-helper.h>
#include <std_msgs/Float64.h>
#include "dynamic_graph_bridge_msgs/Matrix.h"
#include "dynamic_graph_bridge_msgs/Vector.h"

namespace dynamicgraph {
namespace internal {
static const int BUFFER_SIZE = 1 << 10;

template <typename T>
struct Add {
  void operator()(RosQueuedSubscribe& rosSubscribe, const std::string& type, const std::string& signal,
                  const std::string& topic) {
    typedef typename SotToRos<T>::sot_t sot_t;
    typedef typename SotToRos<T>::ros_const_ptr_t ros_const_ptr_t;
    typedef BindedSignal<sot_t, BUFFER_SIZE> BindedSignal_t;
    typedef typename BindedSignal_t::Signal_t Signal_t;

    // Initialize the bindedSignal object.
    BindedSignal_t* bs = new BindedSignal_t(&rosSubscribe);
    SotToRos<T>::setDefault(bs->last);

    // Initialize the signal.
    boost::format signalName("RosQueuedSubscribe(%1%)::output(%2%)::%3%");
    signalName % rosSubscribe.getName() % type % signal;

    bs->signal.reset(new Signal_t(signalName.str()));
    bs->signal->setFunction(boost::bind(&BindedSignal_t::reader, bs, _1, _2));
    rosSubscribe.signalRegistration(*bs->signal);

    // Initialize the subscriber.
    typedef boost::function<void(const ros_const_ptr_t& data)> callback_t;
    callback_t callback = boost::bind(&BindedSignal_t::template writer<ros_const_ptr_t>, bs, _1);

    // Keep 50 messages in queue, but only 20 are sent every 100ms
    // -> No message should be lost because of a full buffer
    bs->subscriber = boost::make_shared<ros::Subscriber>(rosSubscribe.nh().subscribe(topic, BUFFER_SIZE, callback));

    RosQueuedSubscribe::bindedSignal_t bindedSignal(bs);
    rosSubscribe.bindedSignal()[signal] = bindedSignal;
  }
};

// template <typename T, typename R>
template <typename T, int N>
template <typename R>
void BindedSignal<T, N>::writer(const R& data) {
  // synchronize with method clear
  boost::mutex::scoped_lock lock(wmutex);
  if (full()) {
    rmutex.lock();
    frontIdx = (frontIdx + 1) % N;
    rmutex.unlock();
  }
  converter(buffer[backIdx], data);
  // No need to synchronize with reader here because:
  // - if the buffer was not empty, then it stays not empty,
  // - if it was empty, then the current value will be used at next time. It
  //   means the transmission bandwidth is too low.
  if (!init) {
    last = buffer[backIdx];
    init = true;
  }
  backIdx = (backIdx + 1) % N;
}

template <typename T, int N>
T& BindedSignal<T, N>::reader(T& data, int time) {
  // synchronize with method clear:
  // If reading from the list cannot be done, then return last value.
  boost::mutex::scoped_lock lock(rmutex, boost::try_to_lock);
  if (!lock.owns_lock() || entity->readQueue_ == -1 || time < entity->readQueue_) {
    data = last;
  } else {
    if (empty())
      data = last;
    else {
      data = buffer[frontIdx];
      frontIdx = (frontIdx + 1) % N;
      last = data;
    }
  }
  return data;
}
}  // end of namespace internal.

template <typename T>
void RosQueuedSubscribe::add(const std::string& type, const std::string& signal, const std::string& topic) {
  internal::Add<T>()(*this, type, signal, topic);
}
}  // end of namespace dynamicgraph.

#endif  //! DYNAMIC_GRAPH_ROS_QUEUED_SUBSCRIBE_HXX