Select Git revision
NngConnection.cpp

Johannes Mey authored
NngConnection.cpp 2.96 KiB
//
// Created by jm on 28/04/2021.
//
#include <nng/nng.h>
#include <ros/ros.h>
#include <nng/protocol/pair1/pair.h>
#include <memory>
#include <thread>
#include "NngConnection.h"
void NngConnection::receiveMessages() {
//receive object selections
ros::Rate loop_rate(200);
while (ros::ok()) {
char *buf = nullptr;
size_t sz;
int recv_rv;
if ((recv_rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC | NNG_FLAG_NONBLOCK)) == 0) {
std::string str(reinterpret_cast<char *>(buf), sz);
messageReceivedCallback(getReceiveTopic(), str);
nng_free(buf, sz);
} else if (recv_rv == NNG_EAGAIN) {
// no message received in current spin
} else {
ROS_ERROR_STREAM("[ros2cgv] nng_recv returned: " << nng_strerror(recv_rv));
}
loop_rate.sleep();
}
}
bool NngConnection::initializeConnection(std::function<void(std::string, std::string)> callback) {
// first, establish the connection
ROS_INFO_STREAM("[ros2cgv] listening for connections at " << cgv_address);
int rv;
if ((rv = nng_pair1_open(&sock)) != 0) {
ROS_ERROR_STREAM("[ros2cgv] nng_pair1_open returned: " << nng_strerror(rv));
return false;
}
nng_listener listener;
if ((rv = nng_listen(sock, cgv_address.c_str(), &listener, 0)) != 0) {
ROS_ERROR_STREAM("[ros2cgv] nng_listen returned: " << nng_strerror(rv));
return false;
}
// then, set the callback
messageReceivedCallback = callback;
// then, start the thread that uses the callback
nng_receiver_thread = std::make_unique<std::thread>(&NngConnection::receiveMessages, this);
return true;
}
NngConnection::NngConnection(const std::string &cgvAddress) : cgv_address{cgvAddress}, sock{0} {}
bool NngConnection::send(const std::string &channel, const std::string &message) {
if (channel != sendTopic) {
ROS_INFO_STREAM("[NngConnection] Ignoring message to channel " << channel << ".");
return true;
}
int length = message.size();
void *data = (void *) message.c_str();
ROS_INFO_STREAM(
"[NngConnection] Sending message to channel " << channel << " with a length of " << message.size() << ".");
int rv;
if ((rv = nng_send(sock, data, length, 0)) != 0) {
ROS_ERROR_STREAM("[NngConnection] nng_send returned: " << nng_strerror(rv));
return false;
} else {
ROS_INFO_STREAM("[NngConnection] Message has been sent successfully.");
return true;
}
}
const std::string &NngConnection::getSendTopic() const {
return sendTopic;
}
const std::string &NngConnection::getReceiveTopic() const {
return receiveTopic;
}
void NngConnection::setSendTopic(const std::string &newTopic) {
NngConnection::sendTopic = newTopic;
}
void NngConnection::setReceiveTopic(const std::string &newTopic) {
NngConnection::receiveTopic = newTopic;
}
NngConnection::~NngConnection() {
nng_receiver_thread->join();
}