Skip to content
Snippets Groups Projects
Commit 5288fc1c authored by Johannes Mey's avatar Johannes Mey
Browse files

make mqtt server configurable

parent e0fac778
No related branches found
No related tags found
No related merge requests found
......@@ -30,6 +30,16 @@ $ sudo apt-get install libcppunit-dev
### Start the System
* Production-Mode: roslaunch panda_mqtt_connector simulation_rosrag.launch
- the default mqtt server address is defined in config/mqtt.yaml
- it must follow the notation [as specified](https://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html#a9a0518d9ca924d12c1329dbe3de5f2b6) by the employed [Eclipse Paho](https://www.eclipse.org/paho/) mqtt library:
> A null-terminated string specifying the server to which the client will connect.
It takes the form protocol://host:port. Currently, protocol must be tcp or ssl.
For host, you can specify either an IP address or a host name. For instance,
to connect to a server running on the local machines with the default MQTT port,
specify tcp://localhost:1883.
- Please note that SSL is currently not supported.
- The default server address can be changed by adding the parameter `mqtt_server` to the launch file:
`roslaunch panda_mqtt_connector simulation_rosrag.launch mqtt_server:="tcp://<address>:<port>"`
* Test-Mode: roslaunch panda_mqtt_connector simulation_rosrag_test.launch
* Starts an additional node which listens to MQTT channels for testing
* Channel 1: configtest -> manipulates messages (switch on/off the message parts) and the publish-rate
......
panda_mqtt_connector:
server: "tcp://localhost:1883"
<launch>
<arg name="mqtt_server" default="" />
<include file="$(find panda_simulation)/launch/simulation.launch"/>
<!-- the launch file parameter mqtt_server overrides the config file parameter -->
<rosparam file="$(find panda_mqtt_connector)/config/mqtt.yaml" command="load" />
<rosparam unless="$(eval arg('mqtt_server') == '')" param="panda_mqtt_connector/server" subst_value="True">$(arg mqtt_server)</rosparam>
<node name="TimedPlanner" pkg="panda_mqtt_connector" type="TimedPlanner" respawn="false" output="screen"/>
<node name="MqttToRosNode" pkg="panda_mqtt_connector" type="MqttToRosNode" respawn="false" output="screen"/>
<node name="RobotStateProvider" pkg="panda_mqtt_connector" type="RobotStateProvider" respawn="false" output="screen"/>
......
<launch>
<node name="MqttRosConnectionTestNode" pkg="panda_mqtt_connector" type="MqttRosConnectionTestNode" respawn="false" output="screen"/>
</launch>
......@@ -24,7 +24,7 @@ std::string topics[11] = {"panda_ground", "panda_link_0", "panda_link_1", "panda
const string CLIENT_ID{"ros_mqtt_tester"};
MqttUtil mqttUtil(CLIENT_ID);
MqttUtil *mqttUtil = nullptr;
void testTrajectoryUpdate(ros::NodeHandle n) {
......@@ -50,7 +50,7 @@ void testTrajectoryUpdate(ros::NodeHandle n) {
traj.SerializeToString(&mqtt_msg);
auto pubmsg = mqtt::make_message("trajectoryconfig", mqtt_msg);
mqttUtil.getClient().publish(pubmsg);
mqttUtil->getClient().publish(pubmsg);
ROS_INFO_STREAM_NAMED("MqttRosConnectionTestNode", ">>>>>>>>>>>> CHECKING SERVER <<<<<<<<<<<<<");
......@@ -84,7 +84,7 @@ void testPlanningModeChange(ros::NodeHandle n) {
rc.SerializeToString(&mqtt_msg);
auto pubmsg = mqtt::make_message("robotconfig", mqtt_msg);
mqttUtil.getClient().publish(pubmsg);
mqttUtil->getClient().publish(pubmsg);
}
void testConfig(ros::NodeHandle n) {
......@@ -103,14 +103,14 @@ void testConfig(ros::NodeHandle n) {
dc.SerializeToString(&mqtt_msg);
auto pubmsg = mqtt::make_message("dataconfig", mqtt_msg);
mqttUtil.getClient().publish(pubmsg);
mqttUtil->getClient().publish(pubmsg);
}
void receiveMqttMessages(ros::NodeHandle n) {
if (mqttUtil.ensureConnection()) {
if (mqttUtil->ensureConnection()) {
mqtt::const_message_ptr *msg = new mqtt::const_message_ptr;
if (mqttUtil.getClient().try_consume_message_for(msg, std::chrono::milliseconds (500))) {
if (mqttUtil->getClient().try_consume_message_for(msg, std::chrono::milliseconds (500))) {
if (msg->get()->get_topic() == "test_config") {
testConfig(n);
} else if (msg->get()->get_topic() == "test_mode_change") {
......@@ -127,15 +127,23 @@ void receiveMqttMessages(ros::NodeHandle n) {
int main(int argc, char **argv) {
ros::init(argc, argv, "MqttRosConnectionTestNode");
ros::NodeHandle n;
ros::NodeHandle n("panda_mqtt_connector");
ROS_INFO_NAMED("MqttRosConnectionTestNode", "Setting up MQTT.");
mqttUtil.addTopic("test_config");
mqttUtil.addTopic("test_mode_change");
mqttUtil.addTopic("test_trajectory_update");
std::string server;
if (!n.getParam("server", server)) {
ROS_ERROR("Could not get string value for panda_mqtt_connector/server from param server");
return -1;
}
mqttUtil = new MqttUtil(CLIENT_ID, server);
mqttUtil->addTopic("test_config");
mqttUtil->addTopic("test_mode_change");
mqttUtil->addTopic("test_trajectory_update");
mqttUtil.connect();
mqttUtil->connect();
while (ros::ok()) {
receiveMqttMessages(n);
......
......@@ -17,7 +17,7 @@ using namespace std::chrono;
const string CLIENT_ID{"mqtt_to_ros"};
MqttUtil mqttUtil(CLIENT_ID);
MqttUtil *mqttUtil = nullptr;
void handleRobotConfig(config::RobotConfig robotConfig, ros::NodeHandle n) {
std::cout << "Retrieved new target-speed: " << robotConfig.speed() << std::endl;
......@@ -102,9 +102,9 @@ void handleMessage(const ros::NodeHandle &n, const mqtt::const_message_ptr &msg)
}
void receiveMqttMessages(ros::NodeHandle n) {
if (mqttUtil.ensureConnection()) {
if (mqttUtil->ensureConnection()) {
mqtt::const_message_ptr msg;
if (mqttUtil.getClient().try_consume_message_for(&msg, std::chrono::milliseconds(500))) {
if (mqttUtil->getClient().try_consume_message_for(&msg, std::chrono::milliseconds(500))) {
ROS_INFO_STREAM("NEW MESSAGE: " << msg.get()->get_topic() << ": " << msg.get()->to_string());
handleMessage(n, msg);
}
......@@ -115,13 +115,21 @@ void receiveMqttMessages(ros::NodeHandle n) {
int main(int argc, char *argv[]) {
ros::init(argc, argv, "mqtt_listener");
ros::NodeHandle n;
ros::NodeHandle n("panda_mqtt_connector");
mqttUtil.addTopic("robotconfig");
mqttUtil.addTopic("dataconfig");
mqttUtil.addTopic("trajectoryconfig");
std::string server;
if (!n.getParam("server", server)) {
ROS_ERROR("Could not get string value for panda_mqtt_connector/server from param server");
return -1;
}
mqttUtil = new MqttUtil(CLIENT_ID, server);
mqttUtil->addTopic("robotconfig");
mqttUtil->addTopic("dataconfig");
mqttUtil->addTopic("trajectoryconfig");
if (!mqttUtil.connect()) {
if (!mqttUtil->connect()) {
ROS_ERROR_STREAM("Unable to connect to MQTT server. Exiting...");
return -1;
}
......
......@@ -26,7 +26,7 @@ int current_redirect = 0;
const std::string CLIENT_ID{"robot_state_provider_mqtt"};
MqttUtil mqttUtil(CLIENT_ID);
MqttUtil *mqttUtil = nullptr;
/*
* logs to its own file in /.ros/logs (configured in launch-file)
......@@ -100,7 +100,7 @@ panda::PandaLinkState buildLinkStateMessage(const gazebo_msgs::LinkStates &msg,
bool export_twist_linear = false;
bool export_twist_angular = false;
ros::NodeHandle n;
ros::NodeHandle n("panda_mqtt_connector");
if (!n.getParam("data_enable_position", export_position)) {
export_position = false;
......@@ -157,7 +157,7 @@ panda::PandaLinkState buildLinkStateMessage(const gazebo_msgs::LinkStates &msg,
void sendMqttMessages(const gazebo_msgs::LinkStates &msg) {
if (mqttUtil.ensureConnection()) {
if (mqttUtil->ensureConnection()) {
try {
for (int i = 0; i < msg.name.size(); i++) {
panda::PandaLinkState pls_msg = buildLinkStateMessage(msg, i);
......@@ -166,7 +166,7 @@ void sendMqttMessages(const gazebo_msgs::LinkStates &msg) {
auto pubmsg = mqtt::make_message(topics[i], mqtt_msg);
pubmsg->set_qos(QOS);
mqttUtil.getClient().publish(pubmsg);
mqttUtil->getClient().publish(pubmsg);
}
}
// happens at lib paho sometimes when multiple instances of an mqtt-client are publishing at the same time
......@@ -200,9 +200,17 @@ void providerCallback(const gazebo_msgs::LinkStates &msg) {
int main(int argc, char **argv) {
ros::init(argc, argv, "robot_state_provider");
ros::NodeHandle n;
ros::NodeHandle n("panda_mqtt_connector");
std::string server;
if (!n.getParam("server", server)) {
ROS_ERROR("Could not get string value for panda_mqtt_connector/server from param server");
return -1;
}
mqttUtil = new MqttUtil(CLIENT_ID, server);
mqttUtil.connect();
mqttUtil->connect();
ros::Subscriber sub = n.subscribe("/gazebo/link_states", 10000, providerCallback);
ros::spin();
......
......@@ -149,7 +149,7 @@ int main(int argc, char **argv) {
// setup this ros-node
ros::init(argc, argv, "timed_cartesian_planner");
ros::NodeHandle node_handle;
ros::NodeHandle node_handle("panda_mqtt_connector");
ros::AsyncSpinner spinner(1);
spinner.start();
......
......@@ -24,7 +24,7 @@ bool MqttUtil::connect() {
return true;
}
} catch (const mqtt::exception &e) {
ROS_ERROR_STREAM("Unable to connect because of exception " << e.get_message());
ROS_ERROR_STREAM("Unable to connect to " << client.get_server_uri() << ".");
return false;
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment