From 2a56b71688b1d055d231356c2ff68e25a04da116 Mon Sep 17 00:00:00 2001
From: rschoene <rene.schoene@tu-dresden.de>
Date: Mon, 11 May 2020 17:54:17 +0200
Subject: [PATCH] First real test using MQTT to send and receive messages.

- also try to setup CI (might fail)
---
 src/main/resources/MqttUpdater.jadd | 60 ++++++++++++++++-------------
 1 file changed, 33 insertions(+), 27 deletions(-)

diff --git a/src/main/resources/MqttUpdater.jadd b/src/main/resources/MqttUpdater.jadd
index ef3e483..6e986f2 100644
--- a/src/main/resources/MqttUpdater.jadd
+++ b/src/main/resources/MqttUpdater.jadd
@@ -145,16 +145,18 @@ public class MqttUpdater {
 
     // subscribe at broker
     org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) };
-    connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() {
-      @Override
-      public void onSuccess(byte[] qoses) {
-        logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
-      }
+    connection.getDispatchQueue().execute(() -> {
+      connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() {
+        @Override
+        public void onSuccess(byte[] qoses) {
+          logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
+        }
 
-      @Override
-      public void onFailure(Throwable cause) {
-        logger.error("Could not subscribe to {}", topic, cause);
-      }
+        @Override
+        public void onFailure(Throwable cause) {
+          logger.error("Could not subscribe to {}", topic, cause);
+        }
+      });
     });
   }
 
@@ -187,30 +189,34 @@ public class MqttUpdater {
       logger.warn("Stopping without connection. Was setHost() called?");
       return;
     }
-    connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() {
-      @Override
-      public void onSuccess(Void value) {
-        logger.info("Disconnected {} from {}", name, host);
-      }
+    connection.getDispatchQueue().execute(() -> {
+      connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() {
+        @Override
+        public void onSuccess(Void value) {
+          logger.info("Disconnected {} from {}", name, host);
+        }
 
-      @Override
-      public void onFailure(Throwable ignored) {
-        // Disconnects never fail. And we do not care either.
-      }
+        @Override
+        public void onFailure(Throwable ignored) {
+          // Disconnects never fail. And we do not care either.
+        }
+      });
     });
   }
 
   public void publish(String topic, byte[] bytes) {
-    connection.publish(topic, bytes, qos, false, new org.fusesource.mqtt.client.Callback<Void>() {
-      @Override
-      public void onSuccess(Void value) {
-        logger.debug("Published some bytes to {}", topic);
-      }
+    connection.getDispatchQueue().execute(() -> {
+      connection.publish(topic, bytes, qos, false, new org.fusesource.mqtt.client.Callback<Void>() {
+        @Override
+        public void onSuccess(Void value) {
+          logger.debug("Published some bytes to {}", topic);
+        }
 
-      @Override
-      public void onFailure(Throwable value) {
-        logger.warn("Could not publish on topic '{}'", topic);
-      }
+        @Override
+        public void onFailure(Throwable value) {
+          logger.warn("Could not publish on topic '{}'", topic);
+        }
+      });
     });
   }
 }
-- 
GitLab