/** @file RobotStateProvider.cpp * * @author Johannes Mey * @date 31.03.20 */ #include "ros/ros.h" #include <mqtt/client.h> #include <tf2_ros/transform_listener.h> #include <geometry_msgs/TransformStamped.h> #include <geometry_msgs/PoseStamped.h> #include "robot_state.pb.h" #include "util/MqttUtil.h" namespace robot_state_provider { /** * A map from the ROS element name to an MQTT topic */ std::map<std::string, std::string> topics{}; /** * The MQTT QOS is set to 0, meaning each message is received *at most* once. */ constexpr int QOS = 0; const std::string CLIENT_ID = "robot_state_provider_mqtt"; MqttUtil *mqttUtil = nullptr; void sendMqttMessages(const robot::RobotState &pls_msg) { if (mqttUtil->ensureConnection()) { try { std::string mqtt_msg; if (!pls_msg.SerializeToString(&mqtt_msg)) { ROS_ERROR_STREAM("Serialization of MQTT message has failed."); } auto pubmsg = mqtt::make_message(pls_msg.name(), mqtt_msg); pubmsg->set_qos(QOS); mqttUtil->getClient().publish(pubmsg); } catch (const mqtt::exception &exc) { ROS_ERROR_STREAM("Unable to publish robot state message. " << exc.what()); return; } } else { ROS_ERROR_STREAM("Not connected! Unable to listen to messages."); } } void readTopicList(const ros::NodeHandle &n) { const std::vector<std::string> elementTypes{"/parts/", "/end_effectors/"}; std::vector<std::string> parameterNames; n.getParamNames(parameterNames); std::set<std::string> groups{}; for (const auto &elementType : elementTypes) { // get the groups for (const auto ¶m : parameterNames) { std::string prefix{n.getNamespace() + elementType}; if (param.rfind(prefix, 0) == 0) { std::string rest{param.substr(prefix.size())}; std::string element{rest.substr(0, rest.rfind('/'))}; groups.insert(element); } } // get the elements in the group for (const auto &group: groups) { std::map<std::string, std::string> element_topics{}; std::string key{n.getNamespace() + elementType + group}; if (!n.getParam(key, element_topics)) { ROS_ERROR_STREAM("Unable to retrieve value for " << key); } for (const auto &pair : element_topics) { std::string mqttTopic{group + "/" + pair.first}; topics[pair.second] = mqttTopic; } } } ROS_INFO_STREAM("Publishing the state of " << topics.size() << " ROS elements to MQTT."); for (const auto &pair : topics) { ROS_INFO_STREAM("ROS element: '" << pair.first << "' \t\tMQTT topic: '" << pair.second << "'"); } } } int main(int argc, char **argv) { ros::init(argc, argv, "robot_state_provider"); ros::NodeHandle n("panda_mqtt_connector"); std::string server; if (!n.getParam("server", server)) { ROS_ERROR_STREAM("Could not get string value for " << n.getNamespace() << "/server from param server"); return -1; } robot_state_provider::readTopicList(n); robot_state_provider::mqttUtil = new MqttUtil(robot_state_provider::CLIENT_ID, server); robot_state_provider::mqttUtil->connect(); tf2_ros::Buffer tfBuffer; tf2_ros::TransformListener tfListener(tfBuffer); int message_redirect_rate; if (!n.getParam("data_publish_rate", message_redirect_rate)) { message_redirect_rate = 200; // fallback } // map from ROS topic to ROS publisher std::map<std::string, ros::Publisher> publishers{}; // advertise the ROS messages for (const auto& topic : robot_state_provider::topics) { publishers[topic.second] = n.advertise<geometry_msgs::PoseStamped>(topic.second, 10); } ros::Rate rate(message_redirect_rate); while (n.ok()){ for (const auto& topic : robot_state_provider::topics) { geometry_msgs::TransformStamped transformStamped; try{ transformStamped = tfBuffer.lookupTransform("world", topic.first,ros::Time(0)); } catch (tf2::TransformException &ex) { ROS_WARN("%s",ex.what()); ros::Duration(0.1).sleep(); continue; } // publish the ROS message geometry_msgs::PoseStamped ros_msg; ros_msg.header.stamp = transformStamped.header.stamp; ros_msg.header.frame_id = topic.first; ros_msg.pose.position.x = transformStamped.transform.translation.x; ros_msg.pose.position.y = transformStamped.transform.translation.y; ros_msg.pose.position.z = transformStamped.transform.translation.z; ros_msg.pose.orientation.w = transformStamped.transform.rotation.w; ros_msg.pose.orientation.x = transformStamped.transform.rotation.x; ros_msg.pose.orientation.y = transformStamped.transform.rotation.y; ros_msg.pose.orientation.z = transformStamped.transform.rotation.z; publishers[topic.second].publish(ros_msg); // publish the MQTT message robot::RobotState pls_msg; // set the name to the MQTT topic provided by the topics map pls_msg.set_name(topic.second); auto *pos = new robot::RobotState_Position(); pos->set_x(transformStamped.transform.translation.x); pos->set_y(transformStamped.transform.translation.y); pos->set_z(transformStamped.transform.translation.z); pls_msg.set_allocated_position(pos); auto *orient = new robot::RobotState_Orientation(); orient->set_w(transformStamped.transform.rotation.w); orient->set_x(transformStamped.transform.rotation.x); orient->set_y(transformStamped.transform.rotation.y); orient->set_z(transformStamped.transform.rotation.z); pls_msg.set_allocated_orientation(orient); robot_state_provider::sendMqttMessages(pls_msg); } rate.sleep(); } return 0; }