Skip to content
Snippets Groups Projects
Commit 2a56b716 authored by René Schöne's avatar René Schöne
Browse files

First real test using MQTT to send and receive messages.

- also try to setup CI (might fail)
parent 81c1b10c
No related branches found
No related tags found
No related merge requests found
...@@ -145,6 +145,7 @@ public class MqttUpdater { ...@@ -145,6 +145,7 @@ public class MqttUpdater {
// subscribe at broker // subscribe at broker
org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) }; org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) };
connection.getDispatchQueue().execute(() -> {
connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() { connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() {
@Override @Override
public void onSuccess(byte[] qoses) { public void onSuccess(byte[] qoses) {
...@@ -156,6 +157,7 @@ public class MqttUpdater { ...@@ -156,6 +157,7 @@ public class MqttUpdater {
logger.error("Could not subscribe to {}", topic, cause); logger.error("Could not subscribe to {}", topic, cause);
} }
}); });
});
} }
/** /**
...@@ -187,6 +189,7 @@ public class MqttUpdater { ...@@ -187,6 +189,7 @@ public class MqttUpdater {
logger.warn("Stopping without connection. Was setHost() called?"); logger.warn("Stopping without connection. Was setHost() called?");
return; return;
} }
connection.getDispatchQueue().execute(() -> {
connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() { connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() {
@Override @Override
public void onSuccess(Void value) { public void onSuccess(Void value) {
...@@ -198,9 +201,11 @@ public class MqttUpdater { ...@@ -198,9 +201,11 @@ public class MqttUpdater {
// Disconnects never fail. And we do not care either. // Disconnects never fail. And we do not care either.
} }
}); });
});
} }
public void publish(String topic, byte[] bytes) { public void publish(String topic, byte[] bytes) {
connection.getDispatchQueue().execute(() -> {
connection.publish(topic, bytes, qos, false, new org.fusesource.mqtt.client.Callback<Void>() { connection.publish(topic, bytes, qos, false, new org.fusesource.mqtt.client.Callback<Void>() {
@Override @Override
public void onSuccess(Void value) { public void onSuccess(Void value) {
...@@ -212,5 +217,6 @@ public class MqttUpdater { ...@@ -212,5 +217,6 @@ public class MqttUpdater {
logger.warn("Could not publish on topic '{}'", topic); logger.warn("Could not publish on topic '{}'", topic);
} }
}); });
});
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment