Skip to content
Snippets Groups Projects
Select Git revision
  • 8cdb20469c6aba4130324270cad79b0354b510ef
  • master default protected
  • v1.0
3 results

MqttToRosNode.cpp

Blame
  • MqttToRosNode.cpp 4.47 KiB
    #include <iostream>
    #include <string>
    #include <cstring>
    #include <thread>
    #include <chrono>
    #include "mqtt/client.h"
    
    #include "ros/ros.h"
    
    #include "messages/robotconfig.pb.h"
    #include "messages/dataconfig.pb.h"
    
    using namespace std;
    using namespace std::chrono;
    
    const string SERVER_ADDRESS	{ "tcp://localhost:1883" };
    const string CLIENT_ID		{ "ros_mqtt_consumer" };
    
    
    // Simple function to manually reconnect a client.
    bool try_reconnect(mqtt::client& cli)
    {
        constexpr int N_ATTEMPT = 30;
    
        for (int i=0; i<N_ATTEMPT && !cli.is_connected(); ++i) {
            try {
                cli.reconnect();
                return true;
            }
            catch (const mqtt::exception&) {
                this_thread::sleep_for(seconds(1));
            }
        }
        return false;
    }
    
    void handleRobotConfig(config::RobotConfig robotConfig, ros::NodeHandle n)
    {
        std::cout << "Retrieved new target-speed: " << robotConfig.speed() << std::endl;
        n.setParam("robot_speed_factor", robotConfig.speed());
    }
    
    void handleDataConfig(config::DataConfig dataConfig, ros::NodeHandle n)
    {
        std::cout << "Retrieved new data-config: -- publish-rate: " << dataConfig.publishrate()
                                            << " -- enablePosition: " << dataConfig.enableposition()
                                            << " -- enableOrientation: " << dataConfig.enableorientation()
                                            << " -- enableTwistLinear: " << dataConfig.enabletwistlinear()
                                            << " -- enableTwistAngular: " << dataConfig.enabletwistangular()
                                            << std::endl;
    
        n.setParam("data_publish_rate", dataConfig.publishrate());
        n.setParam("data_enable_position", dataConfig.enableposition());
        n.setParam("data_enable_orientation", dataConfig.enableorientation());
        n.setParam("data_enable_twist_linear", dataConfig.enabletwistlinear());
        n.setParam("data_enable_twist_angular", dataConfig.enabletwistangular());
    }
    
    void receiveMqttMessages(ros::NodeHandle n)
    {
        mqtt::connect_options connOpts;
        mqtt::client cli(SERVER_ADDRESS, CLIENT_ID);
    
        connOpts.set_keep_alive_interval(20);
        connOpts.set_clean_session(true);
    
        const vector<string> TOPICS { "robotconfig", "dataconfig" };
        const vector<int> QOS { 0, 1 };
    
        try {
            ROS_INFO_STREAM("Connecting to the MQTT server.");
            mqtt::connect_response rsp = cli.connect(connOpts);
            ROS_INFO_STREAM("Connected to the MQTT server.");
    
            if (!rsp.is_session_present()) {
                ROS_INFO_STREAM("Subscribing to topics.");
                cli.subscribe(TOPICS, QOS);
                ROS_INFO_STREAM("Subscribed to topics.");
            }else {
                ROS_WARN_STREAM("Session already present. Skipping subscribe.");
             }
    
            // Consume messages
            while (true) {
    
                auto msg = cli.consume_message();
    
                if (!msg) {
                    if (!cli.is_connected()) {
                        ROS_ERROR_STREAM("Lost connection. Attempting reconnect");
                        if (try_reconnect(cli)) {
                            cli.subscribe(TOPICS, QOS);
                            ROS_INFO_STREAM("Reconnected");
                            continue;
                        }
                        else {
                            ROS_ERROR_STREAM("Reconnect failed.");
                        }
                    }
                    else {
                        ROS_ERROR_STREAM("An error occurred retrieving messages.");
                    }
                    break;
                }
    
                if (msg->get_topic() == "robotconfig")
                {
                    const std::string rc_payload = msg->get_payload_str();
                    config::RobotConfig robotConfig;
                    robotConfig.ParseFromString(rc_payload);
                    handleRobotConfig(robotConfig, n);
                }
    
                if (msg->get_topic() == "dataconfig")
                {
                    const std::string dc_payload = msg->get_payload_str();
                    config::DataConfig dataConfig;
                    dataConfig.ParseFromString(dc_payload);
                    handleDataConfig(dataConfig, n);
                }
    
                //ROS_INFO_STREAM("NEW MESSAGE: " << msg->get_topic() << ": " << msg->to_string());
            }
        }
        catch (const mqtt::exception& exc) {
            ROS_ERROR_STREAM("MQTT2ROS " << exc.what());
            cli.disconnect();
            return;
        }
    
        cli.disconnect();
    }
    
    int main(int argc, char* argv[])
    {
        ros::init(argc, argv, "mqtt_listener");
        ros::NodeHandle n;
    
        receiveMqttMessages(n);
    
        ros::spin();
        return 0;
    }