Select Git revision
MqttToRosNode.cpp
Forked from
CeTI / ROS / ROS Packages / panda_simulation
24 commits behind, 20 commits ahead of the upstream repository.

Sebastian Ebert authored
MqttToRosNode.cpp 4.47 KiB
#include <iostream>
#include <string>
#include <cstring>
#include <thread>
#include <chrono>
#include "mqtt/client.h"
#include "ros/ros.h"
#include "messages/robotconfig.pb.h"
#include "messages/dataconfig.pb.h"
using namespace std;
using namespace std::chrono;
const string SERVER_ADDRESS { "tcp://localhost:1883" };
const string CLIENT_ID { "ros_mqtt_consumer" };
// Simple function to manually reconnect a client.
bool try_reconnect(mqtt::client& cli)
{
constexpr int N_ATTEMPT = 30;
for (int i=0; i<N_ATTEMPT && !cli.is_connected(); ++i) {
try {
cli.reconnect();
return true;
}
catch (const mqtt::exception&) {
this_thread::sleep_for(seconds(1));
}
}
return false;
}
void handleRobotConfig(config::RobotConfig robotConfig, ros::NodeHandle n)
{
std::cout << "Retrieved new target-speed: " << robotConfig.speed() << std::endl;
n.setParam("robot_speed_factor", robotConfig.speed());
}
void handleDataConfig(config::DataConfig dataConfig, ros::NodeHandle n)
{
std::cout << "Retrieved new data-config: -- publish-rate: " << dataConfig.publishrate()
<< " -- enablePosition: " << dataConfig.enableposition()
<< " -- enableOrientation: " << dataConfig.enableorientation()
<< " -- enableTwistLinear: " << dataConfig.enabletwistlinear()
<< " -- enableTwistAngular: " << dataConfig.enabletwistangular()
<< std::endl;
n.setParam("data_publish_rate", dataConfig.publishrate());
n.setParam("data_enable_position", dataConfig.enableposition());
n.setParam("data_enable_orientation", dataConfig.enableorientation());
n.setParam("data_enable_twist_linear", dataConfig.enabletwistlinear());
n.setParam("data_enable_twist_angular", dataConfig.enabletwistangular());
}
void receiveMqttMessages(ros::NodeHandle n)
{
mqtt::connect_options connOpts;
mqtt::client cli(SERVER_ADDRESS, CLIENT_ID);
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
const vector<string> TOPICS { "robotconfig", "dataconfig" };
const vector<int> QOS { 0, 1 };
try {
ROS_INFO_STREAM("Connecting to the MQTT server.");
mqtt::connect_response rsp = cli.connect(connOpts);
ROS_INFO_STREAM("Connected to the MQTT server.");
if (!rsp.is_session_present()) {
ROS_INFO_STREAM("Subscribing to topics.");
cli.subscribe(TOPICS, QOS);
ROS_INFO_STREAM("Subscribed to topics.");
}else {
ROS_WARN_STREAM("Session already present. Skipping subscribe.");
}
// Consume messages
while (true) {
auto msg = cli.consume_message();
if (!msg) {
if (!cli.is_connected()) {
ROS_ERROR_STREAM("Lost connection. Attempting reconnect");
if (try_reconnect(cli)) {
cli.subscribe(TOPICS, QOS);
ROS_INFO_STREAM("Reconnected");
continue;
}
else {
ROS_ERROR_STREAM("Reconnect failed.");
}
}
else {
ROS_ERROR_STREAM("An error occurred retrieving messages.");
}
break;
}
if (msg->get_topic() == "robotconfig")
{
const std::string rc_payload = msg->get_payload_str();
config::RobotConfig robotConfig;
robotConfig.ParseFromString(rc_payload);
handleRobotConfig(robotConfig, n);
}
if (msg->get_topic() == "dataconfig")
{
const std::string dc_payload = msg->get_payload_str();
config::DataConfig dataConfig;
dataConfig.ParseFromString(dc_payload);
handleDataConfig(dataConfig, n);
}
//ROS_INFO_STREAM("NEW MESSAGE: " << msg->get_topic() << ": " << msg->to_string());
}
}
catch (const mqtt::exception& exc) {
ROS_ERROR_STREAM("MQTT2ROS " << exc.what());
cli.disconnect();
return;
}
cli.disconnect();
}
int main(int argc, char* argv[])
{
ros::init(argc, argv, "mqtt_listener");
ros::NodeHandle n;
receiveMqttMessages(n);
ros::spin();
return 0;
}