Skip to content
Snippets Groups Projects
Select Git revision
  • 8d258e75e9e16007d24af80ae5ac95b3d619f68f
  • master default protected
2 results

MqttRosConnectionTestNode.cpp

Blame
  • MqttRosConnectionTestNode.cpp 4.42 KiB
    //
    // Created by sebastian on 31.03.20.
    //
    #include "ros/ros.h"
    #include <gazebo_msgs/LinkStates.h>
    #include <mqtt/client.h>
    #include "string.h"
    #include "robotconfig.pb.h"
    #include "dataconfig.pb.h"
    #include "mem_persistence.cpp"
    
    using namespace std;
    using namespace std::chrono;
    
    /*
     * mqtt-topics for all links
     * ground, link0, link1, link2, link3, link4, link5, link6, link7 (end-effector), left finger, right finder
     */
    std::string topics[11] = {"panda_link_0", "panda_link_1", "panda_link_2", "panda_link_3",
                              "panda_link_4", "panda_link_5", "panda_link_6", "panda_link_7",
                              "panda_link_8", "panda_link_9", "panda_link_10"};
    
    const string SERVER_ADDRESS	{ "tcp://localhost:1883" };
    const string CLIENT_ID		{ "ros_mqtt_tester" };
    
    bool mqttSetup = false;
    bool isInitialized = false;
    
    const int QOS = 1;
    
    mem_persistence persist;
    mqtt::client client("tcp://localhost:1883", "robot_state_provider_mqtt_test", &persist);
    
    void setupMqtt() {
    
        if (!mqttSetup) {
            std::cout << "MQTT: Initializing." << std::endl;
    
            mqtt::connect_options connOpts;
            connOpts.set_keep_alive_interval(20);
            connOpts.set_clean_session(true);
            std::cout << "MQTT: Initialized." << std::endl;
    
            std::cout << "MQTT: Connecting." << std::endl;
            client.connect(connOpts);
            std::cout << "MQTT: Connected." << std::endl;
            mqttSetup = true;
        }
    }
    
    bool try_reconnect(mqtt::client& cli)
    {
        constexpr int N_ATTEMPT = 30;
    
        for (int i=0; i<N_ATTEMPT && !cli.is_connected(); ++i) {
            try {
                cli.reconnect();
                return true;
            }
            catch (const mqtt::exception&) {
                this_thread::sleep_for(seconds(1));
            }
        }
        return false;
    }
    
    
    void testConfig(ros::NodeHandle n) {
    
        std::cout << ">>>>>>>>>>>> STARTING TEST <<<<<<<<<<<<<" << std::endl;
        setupMqtt();
    
        config::DataConfig dc;
    
        dc.set_enableposition(true);
        dc.set_enableorientation(true);
        dc.set_enabletwistangular(true);
        dc.set_enabletwistlinear(true);
        dc.set_publishrate(1000);
    
        std::string mqtt_msg;
        dc.SerializeToString(&mqtt_msg);
    
        auto pubmsg = mqtt::make_message("dataconfig", mqtt_msg);
        pubmsg->set_qos(QOS);
        client.publish(pubmsg);
    
    }
    
    void receiveMqttMessages(ros::NodeHandle n)
    {
        mqtt::connect_options connOpts;
        mqtt::client cli(SERVER_ADDRESS, CLIENT_ID);
    
        connOpts.set_keep_alive_interval(20);
        connOpts.set_clean_session(true);
        try {
            ROS_INFO_STREAM("TEST Connecting to the MQTT server.");
            mqtt::connect_response rsp = cli.connect(connOpts);
            ROS_INFO_STREAM("TEST Connected to the MQTT server.");
    
            if (!rsp.is_session_present()) {
                ROS_INFO_STREAM("TEST Subscribing to topics.");
                cli.subscribe("nodetest", 1);
                ROS_INFO_STREAM("TEST Subscribed to topics.");
            }else {
                ROS_WARN_STREAM("TEST Session already present. Skipping subscribe.");
            }
    
            // Consume messages
            while (true) {
    
                auto msg = cli.consume_message();
    
                if (!msg) {
                    if (!cli.is_connected()) {
                        ROS_ERROR_STREAM("TEST Lost connection. Attempting reconnect");
                        if (try_reconnect(cli)) {
                            cli.subscribe("nodetest", 1);
                            ROS_INFO_STREAM("TEST Reconnected");
                            continue;
                        }
                        else {
                            ROS_ERROR_STREAM("TEST Reconnect failed.");
                        }
                    }
                    else {
                        ROS_ERROR_STREAM("TEST An error occurred retrieving messages.");
                    }
                    break;
                }
    
                if (msg->get_topic() == "nodetest")
                {
                    testConfig(n);
                }
            }
        }
        catch (const mqtt::exception& exc) {
            ROS_ERROR_STREAM("TEST: " << exc.what());
            cli.disconnect();
            return;
        }
    
        cli.disconnect();
    }
    
    int main(int argc, char **argv) {
        ros::init(argc, argv, "robot_state_provider_test");
        ros::NodeHandle n;
        n.getParam("ready", isInitialized);
    
        while (ros::ok()) {
    
            // make sure the robot is initialized
            while (!isInitialized) {
                n.getParam("tud_planner_ready", isInitialized);
            }
    
            receiveMqttMessages(n);
            ros::spin();
        }
    
        // disconnect from mqtt
        client.disconnect();
        return 0;
    }