Skip to content
Snippets Groups Projects
Select Git revision
  • 85a55298ff167b32d75c1ed869a02ceb0e9393f0
  • noetic/main default
2 results

NngConnection.cpp

Blame
  • NngConnection.cpp 2.96 KiB
    //
    // Created by jm on 28/04/2021.
    //
    
    #include <nng/nng.h>
    #include <ros/ros.h>
    #include <nng/protocol/pair1/pair.h>
    #include <memory>
    #include <thread>
    
    #include "NngConnection.h"
    
    void NngConnection::receiveMessages() {
    
        //receive object selections
        ros::Rate loop_rate(200);
        while (ros::ok()) {
    
            char *buf = nullptr;
            size_t sz;
            int recv_rv;
            if ((recv_rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC | NNG_FLAG_NONBLOCK)) == 0) {
    
                std::string str(reinterpret_cast<char *>(buf), sz);
                messageReceivedCallback(getReceiveTopic(), str);
                nng_free(buf, sz);
    
            } else if (recv_rv == NNG_EAGAIN) {
                // no message received in current spin
            } else {
                ROS_ERROR_STREAM("[ros2cgv] nng_recv returned: " << nng_strerror(recv_rv));
            }
            loop_rate.sleep();
        }
    
    }
    
    bool NngConnection::initializeConnection(std::function<void(std::string, std::string)> callback) {
    
    
        // first, establish the connection
        ROS_INFO_STREAM("[ros2cgv] listening for connections at " << cgv_address);
    
        int rv;
        if ((rv = nng_pair1_open(&sock)) != 0) {
            ROS_ERROR_STREAM("[ros2cgv] nng_pair1_open returned: " << nng_strerror(rv));
            return false;
        }
    
        nng_listener listener;
        if ((rv = nng_listen(sock, cgv_address.c_str(), &listener, 0)) != 0) {
            ROS_ERROR_STREAM("[ros2cgv] nng_listen returned: " << nng_strerror(rv));
            return false;
        }
    
        // then, set the callback
        messageReceivedCallback = callback;
    
        // then, start the thread that uses the callback
        nng_receiver_thread = std::make_unique<std::thread>(&NngConnection::receiveMessages, this);
    
        return true;
    }
    
    NngConnection::NngConnection(const std::string &cgvAddress) : cgv_address{cgvAddress}, sock{0} {}
    
    bool NngConnection::send(const std::string &channel, const std::string &message) {
    
        if (channel != sendTopic) {
            ROS_INFO_STREAM("[NngConnection] Ignoring message to channel " << channel << ".");
            return true;
        }
    
        int length = message.size();
        void *data = (void *) message.c_str();
    
        ROS_INFO_STREAM(
                "[NngConnection] Sending message to channel " << channel << " with a length of " << message.size() << ".");
    
        int rv;
        if ((rv = nng_send(sock, data, length, 0)) != 0) {
            ROS_ERROR_STREAM("[NngConnection] nng_send returned: " << nng_strerror(rv));
            return false;
        } else {
            ROS_INFO_STREAM("[NngConnection] Message has been sent successfully.");
            return true;
        }
    }
    
    const std::string &NngConnection::getSendTopic() const {
        return sendTopic;
    }
    
    const std::string &NngConnection::getReceiveTopic() const {
        return receiveTopic;
    }
    
    void NngConnection::setSendTopic(const std::string &newTopic) {
        NngConnection::sendTopic = newTopic;
    }
    
    void NngConnection::setReceiveTopic(const std::string &newTopic) {
        NngConnection::receiveTopic = newTopic;
    }
    
    NngConnection::~NngConnection() {
        nng_receiver_thread->join();
    }