Select Git revision
MqttRosConnectionTestNode.cpp

Sebastian Ebert authored
MqttRosConnectionTestNode.cpp 4.42 KiB
//
// Created by sebastian on 31.03.20.
//
#include "ros/ros.h"
#include <gazebo_msgs/LinkStates.h>
#include <mqtt/client.h>
#include "string.h"
#include "robotconfig.pb.h"
#include "dataconfig.pb.h"
#include "mem_persistence.cpp"
using namespace std;
using namespace std::chrono;
/*
* mqtt-topics for all links
* ground, link0, link1, link2, link3, link4, link5, link6, link7 (end-effector), left finger, right finder
*/
std::string topics[11] = {"panda_link_0", "panda_link_1", "panda_link_2", "panda_link_3",
"panda_link_4", "panda_link_5", "panda_link_6", "panda_link_7",
"panda_link_8", "panda_link_9", "panda_link_10"};
const string SERVER_ADDRESS { "tcp://localhost:1883" };
const string CLIENT_ID { "ros_mqtt_tester" };
bool mqttSetup = false;
bool isInitialized = false;
const int QOS = 1;
mem_persistence persist;
mqtt::client client("tcp://localhost:1883", "robot_state_provider_mqtt_test", &persist);
void setupMqtt() {
if (!mqttSetup) {
std::cout << "MQTT: Initializing." << std::endl;
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
std::cout << "MQTT: Initialized." << std::endl;
std::cout << "MQTT: Connecting." << std::endl;
client.connect(connOpts);
std::cout << "MQTT: Connected." << std::endl;
mqttSetup = true;
}
}
bool try_reconnect(mqtt::client& cli)
{
constexpr int N_ATTEMPT = 30;
for (int i=0; i<N_ATTEMPT && !cli.is_connected(); ++i) {
try {
cli.reconnect();
return true;
}
catch (const mqtt::exception&) {
this_thread::sleep_for(seconds(1));
}
}
return false;
}
void testConfig(ros::NodeHandle n) {
std::cout << ">>>>>>>>>>>> STARTING TEST <<<<<<<<<<<<<" << std::endl;
setupMqtt();
config::DataConfig dc;
dc.set_enableposition(true);
dc.set_enableorientation(true);
dc.set_enabletwistangular(true);
dc.set_enabletwistlinear(true);
dc.set_publishrate(1000);
std::string mqtt_msg;
dc.SerializeToString(&mqtt_msg);
auto pubmsg = mqtt::make_message("dataconfig", mqtt_msg);
pubmsg->set_qos(QOS);
client.publish(pubmsg);
}
void receiveMqttMessages(ros::NodeHandle n)
{
mqtt::connect_options connOpts;
mqtt::client cli(SERVER_ADDRESS, CLIENT_ID);
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
try {
ROS_INFO_STREAM("TEST Connecting to the MQTT server.");
mqtt::connect_response rsp = cli.connect(connOpts);
ROS_INFO_STREAM("TEST Connected to the MQTT server.");
if (!rsp.is_session_present()) {
ROS_INFO_STREAM("TEST Subscribing to topics.");
cli.subscribe("nodetest", 1);
ROS_INFO_STREAM("TEST Subscribed to topics.");
}else {
ROS_WARN_STREAM("TEST Session already present. Skipping subscribe.");
}
// Consume messages
while (true) {
auto msg = cli.consume_message();
if (!msg) {
if (!cli.is_connected()) {
ROS_ERROR_STREAM("TEST Lost connection. Attempting reconnect");
if (try_reconnect(cli)) {
cli.subscribe("nodetest", 1);
ROS_INFO_STREAM("TEST Reconnected");
continue;
}
else {
ROS_ERROR_STREAM("TEST Reconnect failed.");
}
}
else {
ROS_ERROR_STREAM("TEST An error occurred retrieving messages.");
}
break;
}
if (msg->get_topic() == "nodetest")
{
testConfig(n);
}
}
}
catch (const mqtt::exception& exc) {
ROS_ERROR_STREAM("TEST: " << exc.what());
cli.disconnect();
return;
}
cli.disconnect();
}
int main(int argc, char **argv) {
ros::init(argc, argv, "robot_state_provider_test");
ros::NodeHandle n;
n.getParam("ready", isInitialized);
while (ros::ok()) {
// make sure the robot is initialized
while (!isInitialized) {
n.getParam("tud_planner_ready", isInitialized);
}
receiveMqttMessages(n);
ros::spin();
}
// disconnect from mqtt
client.disconnect();
return 0;
}