diff --git a/include/ccf/connection/NngConnection.h b/include/ccf/connection/NngConnection.h index 17efd7f65dba57fc2ac94ca2642940023fdacdf6..ae8e4dcd902ba7dd0bdad7bc3ae9864a36381613 100644 --- a/include/ccf/connection/NngConnection.h +++ b/include/ccf/connection/NngConnection.h @@ -15,7 +15,8 @@ /// see https://nng.nanomsg.org/man/v0.2.0/nng_pair.html class NngConnection : public Connection { - const std::string &cgv_address; + const std::string &connection_address; + const bool server; nng_socket sock{}; std::unique_ptr<std::thread> nng_receiver_thread; std::string sendTopic; @@ -23,8 +24,10 @@ class NngConnection : public Connection { public: /// Constructor requiring an address as defined in https://nng.nanomsg.org/man/v1.3.2/nng.7.html - /// \param cgvAddress - explicit NngConnection(const std::string &cgvAddress); + /// \param connection_address in server mode the port on which the connections listens for connections, in client + /// mode the address of the server + /// \param server true, if the connection is a server (listens for connections), otherwise client mode is enabled + explicit NngConnection(const std::string &connection_address, bool server = true); bool send(const std::string &channel, const std::string &message) override; diff --git a/src/ccf/connection/NngConnection.cpp b/src/ccf/connection/NngConnection.cpp index a1cab0490e12fc3c81a8280b6d86c44813af4627..0ace0b248f5cd54f80dd36a62f3160240f7cee03 100644 --- a/src/ccf/connection/NngConnection.cpp +++ b/src/ccf/connection/NngConnection.cpp @@ -28,7 +28,7 @@ void NngConnection::receiveMessages() { } else if (recv_rv == NNG_EAGAIN) { // no message received in current spin } else { - ROS_ERROR_STREAM("[ros2cgv] nng_recv returned: " << nng_strerror(recv_rv)); + ROS_ERROR_STREAM("[" << ros::this_node::getName() << "] nng_recv returned: " << nng_strerror(recv_rv)); } loop_rate.sleep(); } @@ -37,32 +37,50 @@ void NngConnection::receiveMessages() { bool NngConnection::initializeConnection(std::function<void(std::string, std::string)> callback) { + int rv; - // first, establish the connection - ROS_INFO_STREAM("[ros2cgv] listening for connections at " << cgv_address); + if (server) { + // first, establish the connection + ROS_INFO_STREAM("[NngConnection] listening for connections at " << connection_address); - int rv; - if ((rv = nng_pair1_open(&sock)) != 0) { - ROS_ERROR_STREAM("[ros2cgv] nng_pair1_open returned: " << nng_strerror(rv)); - return false; - } + if ((rv = nng_pair1_open(&sock)) != 0) { + ROS_ERROR_STREAM("[NngConnection] nng_pair1_open returned: " << nng_strerror(rv)); + return false; + } - nng_listener listener; - if ((rv = nng_listen(sock, cgv_address.c_str(), &listener, 0)) != 0) { - ROS_ERROR_STREAM("[ros2cgv] nng_listen returned: " << nng_strerror(rv)); - return false; - } + nng_listener listener; + if ((rv = nng_listen(sock, connection_address.c_str(), &listener, 0)) != 0) { + ROS_ERROR_STREAM("[NngConnection] nng_listen returned: " << nng_strerror(rv)); + return false; + } + + // then, set the callback + messageReceivedCallback = callback; - // then, set the callback - messageReceivedCallback = callback; + // then, start the thread that uses the callback + nng_receiver_thread = std::make_unique<std::thread>(&NngConnection::receiveMessages, this); + } else { // client mode + + ROS_INFO_STREAM("[NngConnection] establishing connection with " << connection_address); + if ((rv = nng_pair1_open(&sock)) != 0) { + ROS_ERROR_STREAM("[NngConnection] nng_pair1_open returned: " << nng_strerror(rv)); + } + + ros::Rate connection_retry_rate(1); + while ((rv = nng_dial(sock, connection_address.c_str(), nullptr, 0)) != 0) { + ROS_WARN_STREAM("[NngConnection] nng_dial returned: " << nng_strerror(rv) + << ". Trying to connect again in one second..."); + connection_retry_rate.sleep(); + } + ROS_INFO_STREAM("[NngConnection] Connection established!"); + } - // then, start the thread that uses the callback - nng_receiver_thread = std::make_unique<std::thread>(&NngConnection::receiveMessages, this); return true; } -NngConnection::NngConnection(const std::string &cgvAddress) : cgv_address{cgvAddress}, sock{0} {} +NngConnection::NngConnection(const std::string &connection_address, bool server) : + connection_address{connection_address}, sock{0}, server{server} {} bool NngConnection::send(const std::string &channel, const std::string &message) {