From 5e79c0a205e05b4050c20f5bd635ea5582b2614c Mon Sep 17 00:00:00 2001
From: SebastianEbert <sebastian.ebert@tu-dresden.de>
Date: Thu, 2 Apr 2020 17:20:07 +0200
Subject: [PATCH] stubs for retreiving and sending mqtt-messages between ros
 and a mqtt-broker, updated cmake with eclipse-paho, new launch-file for
 ros-rag-connection

---
 CMakeLists.txt                  |  45 ++++++++++-
 launch/simulation_rosrag.launch |   1 +
 src/MqttToRosNode.cpp           | 127 ++++++++++++++++++++++++++++++++
 src/RobotStateProvider.cpp      |  26 ++++++-
 4 files changed, 193 insertions(+), 6 deletions(-)
 create mode 100644 src/MqttToRosNode.cpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index a88278c..458dd4b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,3 +1,14 @@
+#cmake_minimum_required(VERSION 3.10)
+#project(MY_PROJECT)
+#set(CMAKE_CXX_STANDARD 14)
+
+#add_executable(MY_PROJECT main.cpp)
+# Perform linking of the paho lib
+#target_link_libraries(MY_PROJECT LINK_PUBLIC ${PahoMqtt} ${PahoMqttCpp})
+# LINK_PUBLIC vs LINK_PRIVATE: https://cmake.org/cmake/help/v3.10/command/target_link_libraries.html
+
+
+
 cmake_minimum_required(VERSION 2.8.3)
 project(panda_simulation)
 
@@ -31,6 +42,18 @@ find_package(catkin REQUIRED
 # System dependencies are found with CMake's conventions
 find_package(Boost REQUIRED COMPONENTS filesystem)
 
+find_library(PahoMqtt3c libpaho-mqtt3c.so REQUIRED)
+find_library(PahoMqtt3cs libpaho-mqtt3cs.so REQUIRED)
+find_library(PahoMqtt3a libpaho-mqtt3a.so REQUIRED)
+find_library(PahoMqtt3as libpaho-mqtt3as.so REQUIRED)
+find_library(PahoMqttCpp libpaho-mqttpp3.so REQUIRED)
+
+message("Lib PahoMqtt3c is at: ${PahoMqtt3c}")
+message("Lib PahoMqtt3cs is at: ${PahoMqtt3cs}")
+message("Lib PahoMqtt3a is at: ${PahoMqtt3a}")
+message("Lib PahoMqtt3as is at: ${PahoMqtt3as}")
+message("Lib PahoMqttCpp is at: ${PahoMqttCpp}")
+
 find_package(PkgConfig REQUIRED)
 pkg_check_modules(JSONCPP jsoncpp)
 message(${JSONCPP_LIBRARIES})
@@ -46,9 +69,9 @@ catkin_package(CATKIN_DEPENDS
                controller_interface
                hardware_interface
                pluginlib
-               #DEPENDS
-               #libpaho-mqtt
-#               system_lib
+               DEPENDS
+               #libpaho-mqtt3
+              # system_lib
         )
 
 # ################################################################################################################################
@@ -71,11 +94,25 @@ add_executable(SafetyAwarePlanner src/SafetyAwarePlanner.cpp src/SafetyEnvironme
 add_executable(SampleConstraintPlanner src/SampleConstraintPlanner.cpp)
 add_executable(RobotStateProvider src/RobotStateProvider.cpp)
 add_executable(TimedCartesianPlanner src/TimedCartesianPlanner.cpp)
+add_executable(MqttToRosNode src/MqttToRosNode.cpp)
 
 target_link_libraries(robot_control_node ${catkin_LIBRARIES} boost_filesystem ${JSONCPP_LIBRARIES})
 target_link_libraries(box_publisher_node ${catkin_LIBRARIES})
 target_link_libraries(robot_state_initializer_node ${catkin_LIBRARIES})
 target_link_libraries(SafetyAwarePlanner ${catkin_LIBRARIES})
 target_link_libraries(SampleConstraintPlanner ${catkin_LIBRARIES} ${Boost_LIBRARIES})
-target_link_libraries(RobotStateProvider ${catkin_LIBRARIES})
+target_link_libraries(RobotStateProvider LINK_PUBLIC
+        ${catkin_LIBRARIES}
+        ${PahoMqtt3c}
+        ${PahoMqtt3cs}
+        ${PahoMqtt3a}
+        ${PahoMqtt3as}
+        ${PahoMqttCpp})
+target_link_libraries(MqttToRosNode LINK_PUBLIC
+        ${catkin_LIBRARIES}
+        ${PahoMqtt3c}
+        ${PahoMqtt3cs}
+        ${PahoMqtt3a}
+        ${PahoMqtt3as}
+        ${PahoMqttCpp})
 target_link_libraries(TimedCartesianPlanner ${catkin_LIBRARIES})
\ No newline at end of file
diff --git a/launch/simulation_rosrag.launch b/launch/simulation_rosrag.launch
index 91e8184..918b565 100644
--- a/launch/simulation_rosrag.launch
+++ b/launch/simulation_rosrag.launch
@@ -58,6 +58,7 @@
     <node pkg="panda_simulation" type="robot_state_initializer_node" name="robot_state_initializer_node" />
 
     <node name="TimedCartesianPlanner" pkg="panda_simulation" type="TimedCartesianPlanner" respawn="false" output="screen"/>
+    <node name="MqttToRosNode" pkg="panda_simulation" type="MqttToRosNode" respawn="false" output="screen"/>
     <node name="RobotStateProvider" pkg="panda_simulation" type="RobotStateProvider"/>
 
 </launch>
diff --git a/src/MqttToRosNode.cpp b/src/MqttToRosNode.cpp
new file mode 100644
index 0000000..3743875
--- /dev/null
+++ b/src/MqttToRosNode.cpp
@@ -0,0 +1,127 @@
+#include <iostream>
+#include <cstdlib>
+#include <string>
+#include <cstring>
+#include <cctype>
+#include <thread>
+#include <chrono>
+#include "mqtt/client.h"
+
+#include "ros/ros.h"
+
+using namespace std;
+using namespace std::chrono;
+
+const string SERVER_ADDRESS	{ "tcp://localhost:1883" };
+const string CLIENT_ID		{ "sync_consume_cpp2" };
+
+// --------------------------------------------------------------------------
+// Simple function to manually reconnect a client.
+
+bool try_reconnect(mqtt::client& cli)
+{
+    constexpr int N_ATTEMPT = 30;
+
+    for (int i=0; i<N_ATTEMPT && !cli.is_connected(); ++i) {
+        try {
+            cli.reconnect();
+            return true;
+        }
+        catch (const mqtt::exception&) {
+            this_thread::sleep_for(seconds(1));
+        }
+    }
+    return false;
+}
+
+/////////////////////////////////////////////////////////////////////////////
+
+void handleCommand(auto msg)
+{
+    // TBD
+}
+
+void handleConfig(auto msg)
+{
+    // TBD
+}
+
+void receiveMqttMessages()
+{
+    mqtt::connect_options connOpts;
+    mqtt::client cli(SERVER_ADDRESS, CLIENT_ID);
+
+    connOpts.set_keep_alive_interval(20);
+    connOpts.set_clean_session(true);
+
+    const vector<string> TOPICS { "config", "command" };
+    const vector<int> QOS { 0, 1 };
+
+    try {
+        ROS_INFO_STREAM("Connecting to the MQTT server.");
+        mqtt::connect_response rsp = cli.connect(connOpts);
+        ROS_INFO_STREAM("Connected to the MQTT server.");
+
+        if (!rsp.is_session_present()) {
+            ROS_INFO_STREAM("Subscribing to topics.");
+            cli.subscribe(TOPICS, QOS);
+            ROS_INFO_STREAM("Subscribed to topics.");
+        }else {
+            ROS_WARN_STREAM("Session already present. Skipping subscribe.");
+         }
+
+        // Consume messages
+        while (true) {
+
+            auto msg = cli.consume_message();
+
+            if (!msg) {
+                if (!cli.is_connected()) {
+                    ROS_ERROR_STREAM("Lost connection. Attempting reconnect");
+                    if (try_reconnect(cli)) {
+                        cli.subscribe(TOPICS, QOS);
+                        ROS_INFO_STREAM("Reconnected");
+                        continue;
+                    }
+                    else {
+                        ROS_ERROR_STREAM("Reconnect failed.");
+                    }
+                }
+                else {
+                    ROS_ERROR_STREAM("An error occurred retrieving messages.");
+                }
+                break;
+            }
+
+            if (msg->get_topic() == "command")
+            {
+                handleCommand(msg);
+            }
+
+            if (msg->get_topic() == "config")
+            {
+                handleConfig(msg);
+            }
+
+            ROS_INFO_STREAM("NEW MESSAGE: " << msg->get_topic() << ": " << msg->to_string());
+        }
+    }
+    catch (const mqtt::exception& exc) {
+        ROS_ERROR_STREAM(exc.what());
+        cli.disconnect();
+        return;
+    }
+
+    cli.disconnect();
+}
+
+int main(int argc, char* argv[])
+{
+    ros::init(argc, argv, "mqtt_listener");
+    ros::NodeHandle n;
+
+    receiveMqttMessages();
+
+    ros::spin();
+    return 0;
+}
\ No newline at end of file
diff --git a/src/RobotStateProvider.cpp b/src/RobotStateProvider.cpp
index 33fc60e..b370c3c 100644
--- a/src/RobotStateProvider.cpp
+++ b/src/RobotStateProvider.cpp
@@ -2,9 +2,9 @@
 // Created by sebastian on 31.03.20.
 //
 #include "ros/ros.h"
-#include "std_msgs/String.h"
 #include <gazebo_msgs/LinkStates.h>
-#include <MQTTClient.h>
+#include <mqtt/client.h>
+#include "string.h"
 
 // CONFIGURATION START
 // ^^^^^^^^^^^^^^^^^^^
@@ -22,6 +22,11 @@ bool export_names = true;
 // CONFIGURATION END
 // ^^^^^^^^^^^^^^^^^
 
+const std::string SERVER_ADDRESS { "tcp://localhost:1883" };
+const std::string CLIENT_ID { "ros_mqtt_client" };
+const std::string TOPIC { "robot_data_feed" };
+const int QOS = 1;
+
 int current_redirect = 0;
 bool isInitialized = false;
 
@@ -105,6 +110,21 @@ void providerCallback(const gazebo_msgs::LinkStates & msg)
     }
 }
 
+void sendMqttMessage(){
+    mqtt::client client(SERVER_ADDRESS, CLIENT_ID);
+
+    mqtt::connect_options connOpts;
+    connOpts.set_keep_alive_interval(20);
+    connOpts.set_clean_session(true);
+
+    client.connect(connOpts);
+    auto pubmsg = mqtt::make_message(TOPIC, "test");
+    pubmsg->set_qos(QOS);
+    client.publish(pubmsg);
+
+    client.disconnect();
+}
+
 int main(int argc, char **argv)
 {
     ros::init(argc, argv, "listener");
@@ -117,6 +137,8 @@ int main(int argc, char **argv)
         n.getParam("tud_planner_ready", isInitialized);
     }
 
+    //sendMqttMessage();
+
     ros::Subscriber sub = n.subscribe("/gazebo/link_states", 10000, providerCallback);
     ros::spin();
 
-- 
GitLab