Skip to content
Snippets Groups Projects
Commit 020e4ab0 authored by Johannes Mey's avatar Johannes Mey
Browse files

add client mode for NNG connection

parent 76650ff4
No related branches found
No related tags found
No related merge requests found
...@@ -15,7 +15,8 @@ ...@@ -15,7 +15,8 @@
/// see https://nng.nanomsg.org/man/v0.2.0/nng_pair.html /// see https://nng.nanomsg.org/man/v0.2.0/nng_pair.html
class NngConnection : public Connection { class NngConnection : public Connection {
const std::string &cgv_address; const std::string &connection_address;
const bool server;
nng_socket sock{}; nng_socket sock{};
std::unique_ptr<std::thread> nng_receiver_thread; std::unique_ptr<std::thread> nng_receiver_thread;
std::string sendTopic; std::string sendTopic;
...@@ -23,8 +24,10 @@ class NngConnection : public Connection { ...@@ -23,8 +24,10 @@ class NngConnection : public Connection {
public: public:
/// Constructor requiring an address as defined in https://nng.nanomsg.org/man/v1.3.2/nng.7.html /// Constructor requiring an address as defined in https://nng.nanomsg.org/man/v1.3.2/nng.7.html
/// \param cgvAddress /// \param connection_address in server mode the port on which the connections listens for connections, in client
explicit NngConnection(const std::string &cgvAddress); /// mode the address of the server
/// \param server true, if the connection is a server (listens for connections), otherwise client mode is enabled
explicit NngConnection(const std::string &connection_address, bool server = true);
bool send(const std::string &channel, const std::string &message) override; bool send(const std::string &channel, const std::string &message) override;
......
...@@ -28,7 +28,7 @@ void NngConnection::receiveMessages() { ...@@ -28,7 +28,7 @@ void NngConnection::receiveMessages() {
} else if (recv_rv == NNG_EAGAIN) { } else if (recv_rv == NNG_EAGAIN) {
// no message received in current spin // no message received in current spin
} else { } else {
ROS_ERROR_STREAM("[ros2cgv] nng_recv returned: " << nng_strerror(recv_rv)); ROS_ERROR_STREAM("[" << ros::this_node::getName() << "] nng_recv returned: " << nng_strerror(recv_rv));
} }
loop_rate.sleep(); loop_rate.sleep();
} }
...@@ -37,19 +37,20 @@ void NngConnection::receiveMessages() { ...@@ -37,19 +37,20 @@ void NngConnection::receiveMessages() {
bool NngConnection::initializeConnection(std::function<void(std::string, std::string)> callback) { bool NngConnection::initializeConnection(std::function<void(std::string, std::string)> callback) {
int rv;
if (server) {
// first, establish the connection // first, establish the connection
ROS_INFO_STREAM("[ros2cgv] listening for connections at " << cgv_address); ROS_INFO_STREAM("[NngConnection] listening for connections at " << connection_address);
int rv;
if ((rv = nng_pair1_open(&sock)) != 0) { if ((rv = nng_pair1_open(&sock)) != 0) {
ROS_ERROR_STREAM("[ros2cgv] nng_pair1_open returned: " << nng_strerror(rv)); ROS_ERROR_STREAM("[NngConnection] nng_pair1_open returned: " << nng_strerror(rv));
return false; return false;
} }
nng_listener listener; nng_listener listener;
if ((rv = nng_listen(sock, cgv_address.c_str(), &listener, 0)) != 0) { if ((rv = nng_listen(sock, connection_address.c_str(), &listener, 0)) != 0) {
ROS_ERROR_STREAM("[ros2cgv] nng_listen returned: " << nng_strerror(rv)); ROS_ERROR_STREAM("[NngConnection] nng_listen returned: " << nng_strerror(rv));
return false; return false;
} }
...@@ -58,11 +59,28 @@ bool NngConnection::initializeConnection(std::function<void(std::string, std::st ...@@ -58,11 +59,28 @@ bool NngConnection::initializeConnection(std::function<void(std::string, std::st
// then, start the thread that uses the callback // then, start the thread that uses the callback
nng_receiver_thread = std::make_unique<std::thread>(&NngConnection::receiveMessages, this); nng_receiver_thread = std::make_unique<std::thread>(&NngConnection::receiveMessages, this);
} else { // client mode
ROS_INFO_STREAM("[NngConnection] establishing connection with " << connection_address);
if ((rv = nng_pair1_open(&sock)) != 0) {
ROS_ERROR_STREAM("[NngConnection] nng_pair1_open returned: " << nng_strerror(rv));
}
ros::Rate connection_retry_rate(1);
while ((rv = nng_dial(sock, connection_address.c_str(), nullptr, 0)) != 0) {
ROS_WARN_STREAM("[NngConnection] nng_dial returned: " << nng_strerror(rv)
<< ". Trying to connect again in one second...");
connection_retry_rate.sleep();
}
ROS_INFO_STREAM("[NngConnection] Connection established!");
}
return true; return true;
} }
NngConnection::NngConnection(const std::string &cgvAddress) : cgv_address{cgvAddress}, sock{0} {} NngConnection::NngConnection(const std::string &connection_address, bool server) :
connection_address{connection_address}, sock{0}, server{server} {}
bool NngConnection::send(const std::string &channel, const std::string &message) { bool NngConnection::send(const std::string &channel, const std::string &message) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment