From e4cfda69be183cde31bb6d4c041ece9f03384e1d Mon Sep 17 00:00:00 2001
From: rschoene <rene.schoene@tu-dresden.de>
Date: Fri, 8 Mar 2019 18:30:15 +0100
Subject: [PATCH] Updated to new mqtt binding.

---
 ESH-INF/binding/binding.xml                   |   2 +-
 META-INF/MANIFEST.MF                          |   2 +
 .../handler/AbstractMqttHandler.java          | 166 ++++++++++--------
 .../internal/ConfigurationHolder.java         |   4 +
 .../internal/OpenLichtHandlerFactory.java     |  18 ++
 5 files changed, 121 insertions(+), 71 deletions(-)

diff --git a/ESH-INF/binding/binding.xml b/ESH-INF/binding/binding.xml
index cdc1fdc..679dad4 100644
--- a/ESH-INF/binding/binding.xml
+++ b/ESH-INF/binding/binding.xml
@@ -4,7 +4,7 @@
 	xsi:schemaLocation="http://eclipse.org/smarthome/schemas/binding/v1.0.0 http://eclipse.org/smarthome/schemas/binding-1.0.0.xsd">
 
 	<name>OpenLicht Binding</name>
-	<description>This is the binding for OpenLicht (Last Update: 2018-10-01 15:07).</description>
+	<description>This is the binding for OpenLicht (Last Update: 2019-03-08 16:38).</description>
 	<author>René Schöne</author>
 
 </binding:binding>
diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF
index e81e710..35ea27a 100644
--- a/META-INF/MANIFEST.MF
+++ b/META-INF/MANIFEST.MF
@@ -10,6 +10,7 @@ Bundle-Version: 2.3.0.qualifier
 Import-Package: 
  org.eclipse.jdt.annotation;resolution:=optional,
  org.eclipse.smarthome.config.core,
+ org.eclipse.smarthome.core.common.registry,
  org.eclipse.smarthome.core.library.types,
  org.eclipse.smarthome.core.thing,
  org.eclipse.smarthome.core.thing.binding,
@@ -17,6 +18,7 @@ Import-Package:
  org.eclipse.smarthome.core.thing.type,
  org.eclipse.smarthome.core.types,
  org.eclipse.smarthome.io.transport.mqtt,
+ org.openhab.binding.mqtt.handler,
  org.osgi.framework,
  org.osgi.service.component,
  org.slf4j
diff --git a/src/main/java/org/openhab/binding/openlicht/handler/AbstractMqttHandler.java b/src/main/java/org/openhab/binding/openlicht/handler/AbstractMqttHandler.java
index 41c60a8..e2e8cbc 100644
--- a/src/main/java/org/openhab/binding/openlicht/handler/AbstractMqttHandler.java
+++ b/src/main/java/org/openhab/binding/openlicht/handler/AbstractMqttHandler.java
@@ -4,73 +4,112 @@ import static org.openhab.binding.openlicht.BindingConstants.*;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
-
-import javax.naming.ConfigurationException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.smarthome.core.common.registry.RegistryChangeListener;
 import org.eclipse.smarthome.core.thing.Thing;
+import org.eclipse.smarthome.core.thing.ThingRegistry;
 import org.eclipse.smarthome.core.thing.ThingStatus;
 import org.eclipse.smarthome.core.thing.ThingStatusDetail;
 import org.eclipse.smarthome.core.thing.binding.BaseThingHandler;
+import org.eclipse.smarthome.core.thing.binding.ThingHandler;
 import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection;
-import org.eclipse.smarthome.io.transport.mqtt.MqttBrokersObserver;
-import org.eclipse.smarthome.io.transport.mqtt.MqttException;
 import org.eclipse.smarthome.io.transport.mqtt.MqttMessageSubscriber;
-import org.eclipse.smarthome.io.transport.mqtt.MqttService;
+import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
 import org.openhab.binding.openlicht.internal.ConfigurationHolder;
 import org.osgi.framework.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class AbstractMqttHandler extends BaseThingHandler
-        implements MqttMessageSubscriber, MqttBrokersObserver {
-
-    @NonNull
-    protected final Logger logger = LoggerFactory.getLogger(AbstractMqttHandler.class);
-
-    // protected OpenLichtConfiguration config;
-
-    private @Nullable MqttService mqttService;
+        implements MqttMessageSubscriber, RegistryChangeListener<Thing> {
 
+    protected final @NonNull Logger logger = LoggerFactory.getLogger(AbstractMqttHandler.class);
+    private @Nullable ThingRegistry thingRegistry;
     private int usedTopicLength = 0;
-
     private MqttBrokerConnection currentBrokerConnection = null;
-
     private boolean warnNoBrokerConnection = true;
-
     protected @Nullable ScheduledExecutorService executor;
-
     private Version version;
+    private Lock configUpdateLock;
+    private String myBrokerName;
 
     public AbstractMqttHandler(Thing thing, ConfigurationHolder configurationHolder) {
         super(thing);
-        this.mqttService = configurationHolder.getMqttService();
+        this.thingRegistry = configurationHolder.getThingRegistry();
+        this.thingRegistry.addRegistryChangeListener(this);
         this.executor = configurationHolder.getExecutor();
         this.version = configurationHolder.getVersion();
+        this.configUpdateLock = new ReentrantLock();
     }
 
     @Override
-    public void initialize() {
-        // config = getConfigAs(OpenLichtConfiguration.class);
-        String brokerName = getConfigValueAsString(CONFIG_BROKER_NAME);
+    public void added(Thing element) {
+        if (thingIsMyMqttBroker(element)) {
+            addConnectionOf(element);
+        }
+    }
 
-        // remove this handler from mqttService (in case initialize is called because of a config update)
-        this.mqttService.removeBrokersListener(this);
-        this.mqttService.addBrokersListener(this);
+    @Override
+    public void removed(Thing element) {
+        if (thingIsMyMqttBroker(element)) {
+            removeMyBroker();
+        }
+    }
 
-        updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.CONFIGURATION_PENDING, "Searching broker");
-        for (MqttBrokerConnection broker : this.mqttService.getAllBrokerConnections()) {
-            brokerAdded(broker);
+    @Override
+    public void updated(Thing oldElement, Thing element) {
+        if (thingIsMyMqttBroker(element)) {
+            removeMyBroker();
+            addConnectionOf(element);
         }
-        if (currentBrokerConnection == null) {
-            MqttBrokerConnection myBroker = this.mqttService.getBrokerConnection(brokerName);
-            if (myBroker != null) {
-                brokerAdded(myBroker);
-            }
+    }
+
+    private boolean thingIsMyMqttBroker(Thing thing) {
+        if (!"mqtt:systemBroker".equals(thing.getThingTypeUID().getAsString())
+                && !"mqtt:broker".equals(thing.getThingTypeUID().getAsString())) {
+            return false;
         }
-        publish("init version: " + version.toString());
+        return myBrokerName.equals(thing.getUID().getAsString());
+    }
+
+    private void addConnectionOf(Thing mqttBroker) {
+        ThingHandler handler = mqttBroker.getHandler();
+        if (handler instanceof AbstractBrokerHandler) {
+            AbstractBrokerHandler abh = (AbstractBrokerHandler) handler;
+            myBrokerAdded(abh.getConnection());
+        }
+    }
+
+    @Override
+    public void initialize() {
+        updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.CONFIGURATION_PENDING, "Searching broker");
+        new Thread(() -> {
+            try {
+                configUpdateLock.lock();
+                String brokerName = getConfigValueAsString(CONFIG_BROKER_NAME);
+                if (brokerName == null) {
+                    updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "BrokerName not set");
+                    return;
+                }
+                myBrokerName = brokerName;
+                logger.debug("Setting myBrokerName to '{}'", myBrokerName);
+
+                for (Thing thing : this.thingRegistry.getAll()) {
+                    if (thingIsMyMqttBroker(thing)) {
+                        addConnectionOf(thing);
+                    }
+                }
+            } finally {
+                configUpdateLock.unlock();
+            }
+        }).start();
     }
 
     protected String getConfigValueAsString(String key) {
@@ -82,31 +121,28 @@ public abstract class AbstractMqttHandler extends BaseThingHandler
         return bd == null ? null : bd.intValue();
     }
 
-    @Override
-    public void brokerAdded(MqttBrokerConnection broker) {
-        String brokerName = getConfigValueAsString(CONFIG_BROKER_NAME);
-        if (broker.getName().equals(brokerName)) {
-            // this is our broker!
-            this.logger.info("Got correct broker, subscribing to topic {}", getTopic());
-            currentBrokerConnection = broker;
-            warnNoBrokerConnection = true;
-            updateTopicLength();
-            try {
-                currentBrokerConnection.start();
-                if (subscribeTopic()) {
-                    broker.addConsumer(this);
-                }
+    public void myBrokerAdded(MqttBrokerConnection broker) {
+        this.logger.info("Got correct broker, subscribing to topic {}", getTopic());
+        currentBrokerConnection = broker;
+        warnNoBrokerConnection = true;
+        updateTopicLength();
+        CompletableFuture<@NonNull Boolean> future = null;
+        try {
+            currentBrokerConnection.start();
+            if (subscribeTopic()) {
+                future = broker.subscribe(getTopic(), this);
+            }
+            if (future != null && future.get()) {
                 updateStatus(ThingStatus.ONLINE);
                 this.logger.info("Broker found, thing is online");
-            } catch (MqttException e) {
-                this.logger.error("Could not add subscriber to broker {}", brokerName);
-                updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
-                        "Could not add subscriber to broker " + brokerName);
-            } catch (ConfigurationException e) {
-                this.logger.error("Error during first connection to mqtt broker", e);
             }
-        } else {
-            this.logger.debug("Got another broker, not interessted!");
+        } catch (InterruptedException e) {
+            this.logger.debug("Interrupted while waiting for connection");
+        } catch (ExecutionException e) {
+            String message = "Could not add subscriber to broker";
+            this.logger.error(message);
+            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, message);
+            this.logger.warn("exception", e);
         }
     }
 
@@ -116,14 +152,9 @@ public abstract class AbstractMqttHandler extends BaseThingHandler
         usedTopicLength = getTopic().length() - (subscribeSubTopics() ? 1 : 0);
     }
 
-    @Override
-    public void brokerRemoved(MqttBrokerConnection broker) {
-        String brokerName = getConfigValueAsString(CONFIG_BROKER_NAME);
-        if (broker.getName().equals(brokerName)) {
-            // this was our broker!
-            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "Broker is offline");
-            currentBrokerConnection = null;
-        }
+    public void removeMyBroker() {
+        updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "Broker is offline");
+        currentBrokerConnection = null;
     }
 
     protected void publish(String message) {
@@ -138,14 +169,9 @@ public abstract class AbstractMqttHandler extends BaseThingHandler
         String topic = subscribeSubTopics() ? (getTopic().substring(0, usedTopicLength) + "out")
                 : (getTopic() + "/out");
         byte[] payload = message.getBytes();
-        try {
-            currentBrokerConnection.publish(topic, payload, null);
-        } catch (MqttException e) {
-            logger.error("Error while publishing", e);
-        }
+        currentBrokerConnection.publish(topic, payload);
     }
 
-    @Override
     public final String getTopic() {
         return getConfigValueAsString(CONFIG_BASE_TOPIC) + (getSubTopic() == null ? "" : "/" + getSubTopic())
                 + (subscribeSubTopics() ? "/#" : "");
@@ -187,7 +213,7 @@ public abstract class AbstractMqttHandler extends BaseThingHandler
     /**
      * Writes a float array from the given input based on the length of the targeted output.
      *
-     * @param input the buffer to read from
+     * @param input  the buffer to read from
      * @param output the array to write in
      */
     protected void writeFloatArray(ByteBuffer input, float[] output) {
diff --git a/src/main/java/org/openhab/binding/openlicht/internal/ConfigurationHolder.java b/src/main/java/org/openhab/binding/openlicht/internal/ConfigurationHolder.java
index 8355ff9..e9b407b 100644
--- a/src/main/java/org/openhab/binding/openlicht/internal/ConfigurationHolder.java
+++ b/src/main/java/org/openhab/binding/openlicht/internal/ConfigurationHolder.java
@@ -3,6 +3,7 @@ package org.openhab.binding.openlicht.internal;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.smarthome.core.thing.ThingRegistry;
 import org.eclipse.smarthome.io.transport.mqtt.MqttService;
 import org.osgi.framework.Version;
 
@@ -14,6 +15,9 @@ public interface ConfigurationHolder {
     @Nullable
     MqttService getMqttService();
 
+    @Nullable
+    ThingRegistry getThingRegistry();
+
     Version getVersion();
 
 }
diff --git a/src/main/java/org/openhab/binding/openlicht/internal/OpenLichtHandlerFactory.java b/src/main/java/org/openhab/binding/openlicht/internal/OpenLichtHandlerFactory.java
index 79ec856..d9760d8 100644
--- a/src/main/java/org/openhab/binding/openlicht/internal/OpenLichtHandlerFactory.java
+++ b/src/main/java/org/openhab/binding/openlicht/internal/OpenLichtHandlerFactory.java
@@ -24,6 +24,7 @@ import java.util.stream.Stream;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.eclipse.smarthome.core.thing.Thing;
+import org.eclipse.smarthome.core.thing.ThingRegistry;
 import org.eclipse.smarthome.core.thing.ThingTypeUID;
 import org.eclipse.smarthome.core.thing.binding.BaseThingHandlerFactory;
 import org.eclipse.smarthome.core.thing.binding.ThingHandler;
@@ -56,6 +57,7 @@ public class OpenLichtHandlerFactory extends BaseThingHandlerFactory implements
             Stream.of(THING_TYPE_SKYWRITER_HAT, THING_TYPE_POLAR_M600, THING_TYPE_MOTO_360, THING_TYPE_SAMSUNG_S6)
                     .collect(Collectors.toSet()));
     private @Nullable MqttService service;
+    private @Nullable ThingRegistry thingRegistry;
     private @Nullable ScheduledExecutorService executor;
 
     @Override
@@ -107,6 +109,11 @@ public class OpenLichtHandlerFactory extends BaseThingHandlerFactory implements
         return service;
     }
 
+    @Override
+    public @Nullable ThingRegistry getThingRegistry() {
+        return thingRegistry;
+    }
+
     @Override
     public Version getVersion() {
         return FrameworkUtil.getBundle(getClass()).getVersion();
@@ -122,4 +129,15 @@ public class OpenLichtHandlerFactory extends BaseThingHandlerFactory implements
         LoggerFactory.getLogger(OpenLichtHandlerFactory.class).info("Deleting mqtt service {}", service);
         this.service = null;
     }
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.STATIC)
+    public void setThingRegistry(ThingRegistry thingRegistry) {
+        LoggerFactory.getLogger(OpenLichtHandlerFactory.class).info("Setting mqtt service to {}", thingRegistry);
+        this.thingRegistry = thingRegistry;
+    }
+
+    public void unsetThingRegistry(ThingRegistry service) {
+        LoggerFactory.getLogger(OpenLichtHandlerFactory.class).info("Deleting mqtt service {}", service);
+        this.thingRegistry = null;
+    }
 }
-- 
GitLab