Skip to content
Snippets Groups Projects
Commit 5e79c0a2 authored by Sebastian Ebert's avatar Sebastian Ebert
Browse files

stubs for retreiving and sending mqtt-messages between ros and a mqtt-broker,...

stubs for retreiving and sending mqtt-messages between ros and a mqtt-broker, updated cmake with eclipse-paho, new launch-file for ros-rag-connection
parent 38734902
No related branches found
No related tags found
No related merge requests found
#cmake_minimum_required(VERSION 3.10)
#project(MY_PROJECT)
#set(CMAKE_CXX_STANDARD 14)
#add_executable(MY_PROJECT main.cpp)
# Perform linking of the paho lib
#target_link_libraries(MY_PROJECT LINK_PUBLIC ${PahoMqtt} ${PahoMqttCpp})
# LINK_PUBLIC vs LINK_PRIVATE: https://cmake.org/cmake/help/v3.10/command/target_link_libraries.html
cmake_minimum_required(VERSION 2.8.3)
project(panda_simulation)
......@@ -31,6 +42,18 @@ find_package(catkin REQUIRED
# System dependencies are found with CMake's conventions
find_package(Boost REQUIRED COMPONENTS filesystem)
find_library(PahoMqtt3c libpaho-mqtt3c.so REQUIRED)
find_library(PahoMqtt3cs libpaho-mqtt3cs.so REQUIRED)
find_library(PahoMqtt3a libpaho-mqtt3a.so REQUIRED)
find_library(PahoMqtt3as libpaho-mqtt3as.so REQUIRED)
find_library(PahoMqttCpp libpaho-mqttpp3.so REQUIRED)
message("Lib PahoMqtt3c is at: ${PahoMqtt3c}")
message("Lib PahoMqtt3cs is at: ${PahoMqtt3cs}")
message("Lib PahoMqtt3a is at: ${PahoMqtt3a}")
message("Lib PahoMqtt3as is at: ${PahoMqtt3as}")
message("Lib PahoMqttCpp is at: ${PahoMqttCpp}")
find_package(PkgConfig REQUIRED)
pkg_check_modules(JSONCPP jsoncpp)
message(${JSONCPP_LIBRARIES})
......@@ -46,8 +69,8 @@ catkin_package(CATKIN_DEPENDS
controller_interface
hardware_interface
pluginlib
#DEPENDS
#libpaho-mqtt
DEPENDS
#libpaho-mqtt3
# system_lib
)
......@@ -71,11 +94,25 @@ add_executable(SafetyAwarePlanner src/SafetyAwarePlanner.cpp src/SafetyEnvironme
add_executable(SampleConstraintPlanner src/SampleConstraintPlanner.cpp)
add_executable(RobotStateProvider src/RobotStateProvider.cpp)
add_executable(TimedCartesianPlanner src/TimedCartesianPlanner.cpp)
add_executable(MqttToRosNode src/MqttToRosNode.cpp)
target_link_libraries(robot_control_node ${catkin_LIBRARIES} boost_filesystem ${JSONCPP_LIBRARIES})
target_link_libraries(box_publisher_node ${catkin_LIBRARIES})
target_link_libraries(robot_state_initializer_node ${catkin_LIBRARIES})
target_link_libraries(SafetyAwarePlanner ${catkin_LIBRARIES})
target_link_libraries(SampleConstraintPlanner ${catkin_LIBRARIES} ${Boost_LIBRARIES})
target_link_libraries(RobotStateProvider ${catkin_LIBRARIES})
target_link_libraries(RobotStateProvider LINK_PUBLIC
${catkin_LIBRARIES}
${PahoMqtt3c}
${PahoMqtt3cs}
${PahoMqtt3a}
${PahoMqtt3as}
${PahoMqttCpp})
target_link_libraries(MqttToRosNode LINK_PUBLIC
${catkin_LIBRARIES}
${PahoMqtt3c}
${PahoMqtt3cs}
${PahoMqtt3a}
${PahoMqtt3as}
${PahoMqttCpp})
target_link_libraries(TimedCartesianPlanner ${catkin_LIBRARIES})
\ No newline at end of file
......@@ -58,6 +58,7 @@
<node pkg="panda_simulation" type="robot_state_initializer_node" name="robot_state_initializer_node" />
<node name="TimedCartesianPlanner" pkg="panda_simulation" type="TimedCartesianPlanner" respawn="false" output="screen"/>
<node name="MqttToRosNode" pkg="panda_simulation" type="MqttToRosNode" respawn="false" output="screen"/>
<node name="RobotStateProvider" pkg="panda_simulation" type="RobotStateProvider"/>
</launch>
#include <iostream>
#include <cstdlib>
#include <string>
#include <cstring>
#include <cctype>
#include <thread>
#include <chrono>
#include "mqtt/client.h"
#include "ros/ros.h"
using namespace std;
using namespace std::chrono;
const string SERVER_ADDRESS { "tcp://localhost:1883" };
const string CLIENT_ID { "sync_consume_cpp2" };
// --------------------------------------------------------------------------
// Simple function to manually reconnect a client.
bool try_reconnect(mqtt::client& cli)
{
constexpr int N_ATTEMPT = 30;
for (int i=0; i<N_ATTEMPT && !cli.is_connected(); ++i) {
try {
cli.reconnect();
return true;
}
catch (const mqtt::exception&) {
this_thread::sleep_for(seconds(1));
}
}
return false;
}
/////////////////////////////////////////////////////////////////////////////
void handleCommand(auto msg)
{
// TBD
}
void handleConfig(auto msg)
{
// TBD
}
void receiveMqttMessages()
{
mqtt::connect_options connOpts;
mqtt::client cli(SERVER_ADDRESS, CLIENT_ID);
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
const vector<string> TOPICS { "config", "command" };
const vector<int> QOS { 0, 1 };
try {
ROS_INFO_STREAM("Connecting to the MQTT server.");
mqtt::connect_response rsp = cli.connect(connOpts);
ROS_INFO_STREAM("Connected to the MQTT server.");
if (!rsp.is_session_present()) {
ROS_INFO_STREAM("Subscribing to topics.");
cli.subscribe(TOPICS, QOS);
ROS_INFO_STREAM("Subscribed to topics.");
}else {
ROS_WARN_STREAM("Session already present. Skipping subscribe.");
}
// Consume messages
while (true) {
auto msg = cli.consume_message();
if (!msg) {
if (!cli.is_connected()) {
ROS_ERROR_STREAM("Lost connection. Attempting reconnect");
if (try_reconnect(cli)) {
cli.subscribe(TOPICS, QOS);
ROS_INFO_STREAM("Reconnected");
continue;
}
else {
ROS_ERROR_STREAM("Reconnect failed.");
}
}
else {
ROS_ERROR_STREAM("An error occurred retrieving messages.");
}
break;
}
if (msg->get_topic() == "command")
{
handleCommand(msg);
}
if (msg->get_topic() == "config")
{
handleConfig(msg);
}
ROS_INFO_STREAM("NEW MESSAGE: " << msg->get_topic() << ": " << msg->to_string());
}
}
catch (const mqtt::exception& exc) {
ROS_ERROR_STREAM(exc.what());
cli.disconnect();
return;
}
cli.disconnect();
}
int main(int argc, char* argv[])
{
ros::init(argc, argv, "mqtt_listener");
ros::NodeHandle n;
receiveMqttMessages();
ros::spin();
return 0;
}
\ No newline at end of file
......@@ -2,9 +2,9 @@
// Created by sebastian on 31.03.20.
//
#include "ros/ros.h"
#include "std_msgs/String.h"
#include <gazebo_msgs/LinkStates.h>
#include <MQTTClient.h>
#include <mqtt/client.h>
#include "string.h"
// CONFIGURATION START
// ^^^^^^^^^^^^^^^^^^^
......@@ -22,6 +22,11 @@ bool export_names = true;
// CONFIGURATION END
// ^^^^^^^^^^^^^^^^^
const std::string SERVER_ADDRESS { "tcp://localhost:1883" };
const std::string CLIENT_ID { "ros_mqtt_client" };
const std::string TOPIC { "robot_data_feed" };
const int QOS = 1;
int current_redirect = 0;
bool isInitialized = false;
......@@ -105,6 +110,21 @@ void providerCallback(const gazebo_msgs::LinkStates & msg)
}
}
void sendMqttMessage(){
mqtt::client client(SERVER_ADDRESS, CLIENT_ID);
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
client.connect(connOpts);
auto pubmsg = mqtt::make_message(TOPIC, "test");
pubmsg->set_qos(QOS);
client.publish(pubmsg);
client.disconnect();
}
int main(int argc, char **argv)
{
ros::init(argc, argv, "listener");
......@@ -117,6 +137,8 @@ int main(int argc, char **argv)
n.getParam("tud_planner_ready", isInitialized);
}
//sendMqttMessage();
ros::Subscriber sub = n.subscribe("/gazebo/link_states", 10000, providerCallback);
ros::spin();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment