Skip to content
Snippets Groups Projects
Commit e4cfda69 authored by René Schöne's avatar René Schöne
Browse files

Updated to new mqtt binding.

parent 8a277ded
Branches
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@
xsi:schemaLocation="http://eclipse.org/smarthome/schemas/binding/v1.0.0 http://eclipse.org/smarthome/schemas/binding-1.0.0.xsd">
<name>OpenLicht Binding</name>
<description>This is the binding for OpenLicht (Last Update: 2018-10-01 15:07).</description>
<description>This is the binding for OpenLicht (Last Update: 2019-03-08 16:38).</description>
<author>René Schöne</author>
</binding:binding>
......@@ -10,6 +10,7 @@ Bundle-Version: 2.3.0.qualifier
Import-Package:
org.eclipse.jdt.annotation;resolution:=optional,
org.eclipse.smarthome.config.core,
org.eclipse.smarthome.core.common.registry,
org.eclipse.smarthome.core.library.types,
org.eclipse.smarthome.core.thing,
org.eclipse.smarthome.core.thing.binding,
......@@ -17,6 +18,7 @@ Import-Package:
org.eclipse.smarthome.core.thing.type,
org.eclipse.smarthome.core.types,
org.eclipse.smarthome.io.transport.mqtt,
org.openhab.binding.mqtt.handler,
org.osgi.framework,
org.osgi.service.component,
org.slf4j
......
......@@ -4,73 +4,112 @@ import static org.openhab.binding.openlicht.BindingConstants.*;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import javax.naming.ConfigurationException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.core.common.registry.RegistryChangeListener;
import org.eclipse.smarthome.core.thing.Thing;
import org.eclipse.smarthome.core.thing.ThingRegistry;
import org.eclipse.smarthome.core.thing.ThingStatus;
import org.eclipse.smarthome.core.thing.ThingStatusDetail;
import org.eclipse.smarthome.core.thing.binding.BaseThingHandler;
import org.eclipse.smarthome.core.thing.binding.ThingHandler;
import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection;
import org.eclipse.smarthome.io.transport.mqtt.MqttBrokersObserver;
import org.eclipse.smarthome.io.transport.mqtt.MqttException;
import org.eclipse.smarthome.io.transport.mqtt.MqttMessageSubscriber;
import org.eclipse.smarthome.io.transport.mqtt.MqttService;
import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
import org.openhab.binding.openlicht.internal.ConfigurationHolder;
import org.osgi.framework.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractMqttHandler extends BaseThingHandler
implements MqttMessageSubscriber, MqttBrokersObserver {
@NonNull
protected final Logger logger = LoggerFactory.getLogger(AbstractMqttHandler.class);
// protected OpenLichtConfiguration config;
private @Nullable MqttService mqttService;
implements MqttMessageSubscriber, RegistryChangeListener<Thing> {
protected final @NonNull Logger logger = LoggerFactory.getLogger(AbstractMqttHandler.class);
private @Nullable ThingRegistry thingRegistry;
private int usedTopicLength = 0;
private MqttBrokerConnection currentBrokerConnection = null;
private boolean warnNoBrokerConnection = true;
protected @Nullable ScheduledExecutorService executor;
private Version version;
private Lock configUpdateLock;
private String myBrokerName;
public AbstractMqttHandler(Thing thing, ConfigurationHolder configurationHolder) {
super(thing);
this.mqttService = configurationHolder.getMqttService();
this.thingRegistry = configurationHolder.getThingRegistry();
this.thingRegistry.addRegistryChangeListener(this);
this.executor = configurationHolder.getExecutor();
this.version = configurationHolder.getVersion();
this.configUpdateLock = new ReentrantLock();
}
@Override
public void initialize() {
// config = getConfigAs(OpenLichtConfiguration.class);
String brokerName = getConfigValueAsString(CONFIG_BROKER_NAME);
public void added(Thing element) {
if (thingIsMyMqttBroker(element)) {
addConnectionOf(element);
}
}
// remove this handler from mqttService (in case initialize is called because of a config update)
this.mqttService.removeBrokersListener(this);
this.mqttService.addBrokersListener(this);
@Override
public void removed(Thing element) {
if (thingIsMyMqttBroker(element)) {
removeMyBroker();
}
}
updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.CONFIGURATION_PENDING, "Searching broker");
for (MqttBrokerConnection broker : this.mqttService.getAllBrokerConnections()) {
brokerAdded(broker);
@Override
public void updated(Thing oldElement, Thing element) {
if (thingIsMyMqttBroker(element)) {
removeMyBroker();
addConnectionOf(element);
}
if (currentBrokerConnection == null) {
MqttBrokerConnection myBroker = this.mqttService.getBrokerConnection(brokerName);
if (myBroker != null) {
brokerAdded(myBroker);
}
}
private boolean thingIsMyMqttBroker(Thing thing) {
if (!"mqtt:systemBroker".equals(thing.getThingTypeUID().getAsString())
&& !"mqtt:broker".equals(thing.getThingTypeUID().getAsString())) {
return false;
}
publish("init version: " + version.toString());
return myBrokerName.equals(thing.getUID().getAsString());
}
private void addConnectionOf(Thing mqttBroker) {
ThingHandler handler = mqttBroker.getHandler();
if (handler instanceof AbstractBrokerHandler) {
AbstractBrokerHandler abh = (AbstractBrokerHandler) handler;
myBrokerAdded(abh.getConnection());
}
}
@Override
public void initialize() {
updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.CONFIGURATION_PENDING, "Searching broker");
new Thread(() -> {
try {
configUpdateLock.lock();
String brokerName = getConfigValueAsString(CONFIG_BROKER_NAME);
if (brokerName == null) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "BrokerName not set");
return;
}
myBrokerName = brokerName;
logger.debug("Setting myBrokerName to '{}'", myBrokerName);
for (Thing thing : this.thingRegistry.getAll()) {
if (thingIsMyMqttBroker(thing)) {
addConnectionOf(thing);
}
}
} finally {
configUpdateLock.unlock();
}
}).start();
}
protected String getConfigValueAsString(String key) {
......@@ -82,31 +121,28 @@ public abstract class AbstractMqttHandler extends BaseThingHandler
return bd == null ? null : bd.intValue();
}
@Override
public void brokerAdded(MqttBrokerConnection broker) {
String brokerName = getConfigValueAsString(CONFIG_BROKER_NAME);
if (broker.getName().equals(brokerName)) {
// this is our broker!
this.logger.info("Got correct broker, subscribing to topic {}", getTopic());
currentBrokerConnection = broker;
warnNoBrokerConnection = true;
updateTopicLength();
try {
currentBrokerConnection.start();
if (subscribeTopic()) {
broker.addConsumer(this);
}
public void myBrokerAdded(MqttBrokerConnection broker) {
this.logger.info("Got correct broker, subscribing to topic {}", getTopic());
currentBrokerConnection = broker;
warnNoBrokerConnection = true;
updateTopicLength();
CompletableFuture<@NonNull Boolean> future = null;
try {
currentBrokerConnection.start();
if (subscribeTopic()) {
future = broker.subscribe(getTopic(), this);
}
if (future != null && future.get()) {
updateStatus(ThingStatus.ONLINE);
this.logger.info("Broker found, thing is online");
} catch (MqttException e) {
this.logger.error("Could not add subscriber to broker {}", brokerName);
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
"Could not add subscriber to broker " + brokerName);
} catch (ConfigurationException e) {
this.logger.error("Error during first connection to mqtt broker", e);
}
} else {
this.logger.debug("Got another broker, not interessted!");
} catch (InterruptedException e) {
this.logger.debug("Interrupted while waiting for connection");
} catch (ExecutionException e) {
String message = "Could not add subscriber to broker";
this.logger.error(message);
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, message);
this.logger.warn("exception", e);
}
}
......@@ -116,14 +152,9 @@ public abstract class AbstractMqttHandler extends BaseThingHandler
usedTopicLength = getTopic().length() - (subscribeSubTopics() ? 1 : 0);
}
@Override
public void brokerRemoved(MqttBrokerConnection broker) {
String brokerName = getConfigValueAsString(CONFIG_BROKER_NAME);
if (broker.getName().equals(brokerName)) {
// this was our broker!
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "Broker is offline");
currentBrokerConnection = null;
}
public void removeMyBroker() {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "Broker is offline");
currentBrokerConnection = null;
}
protected void publish(String message) {
......@@ -138,14 +169,9 @@ public abstract class AbstractMqttHandler extends BaseThingHandler
String topic = subscribeSubTopics() ? (getTopic().substring(0, usedTopicLength) + "out")
: (getTopic() + "/out");
byte[] payload = message.getBytes();
try {
currentBrokerConnection.publish(topic, payload, null);
} catch (MqttException e) {
logger.error("Error while publishing", e);
}
currentBrokerConnection.publish(topic, payload);
}
@Override
public final String getTopic() {
return getConfigValueAsString(CONFIG_BASE_TOPIC) + (getSubTopic() == null ? "" : "/" + getSubTopic())
+ (subscribeSubTopics() ? "/#" : "");
......@@ -187,7 +213,7 @@ public abstract class AbstractMqttHandler extends BaseThingHandler
/**
* Writes a float array from the given input based on the length of the targeted output.
*
* @param input the buffer to read from
* @param input the buffer to read from
* @param output the array to write in
*/
protected void writeFloatArray(ByteBuffer input, float[] output) {
......
......@@ -3,6 +3,7 @@ package org.openhab.binding.openlicht.internal;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.core.thing.ThingRegistry;
import org.eclipse.smarthome.io.transport.mqtt.MqttService;
import org.osgi.framework.Version;
......@@ -14,6 +15,9 @@ public interface ConfigurationHolder {
@Nullable
MqttService getMqttService();
@Nullable
ThingRegistry getThingRegistry();
Version getVersion();
}
......@@ -24,6 +24,7 @@ import java.util.stream.Stream;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.core.thing.Thing;
import org.eclipse.smarthome.core.thing.ThingRegistry;
import org.eclipse.smarthome.core.thing.ThingTypeUID;
import org.eclipse.smarthome.core.thing.binding.BaseThingHandlerFactory;
import org.eclipse.smarthome.core.thing.binding.ThingHandler;
......@@ -56,6 +57,7 @@ public class OpenLichtHandlerFactory extends BaseThingHandlerFactory implements
Stream.of(THING_TYPE_SKYWRITER_HAT, THING_TYPE_POLAR_M600, THING_TYPE_MOTO_360, THING_TYPE_SAMSUNG_S6)
.collect(Collectors.toSet()));
private @Nullable MqttService service;
private @Nullable ThingRegistry thingRegistry;
private @Nullable ScheduledExecutorService executor;
@Override
......@@ -107,6 +109,11 @@ public class OpenLichtHandlerFactory extends BaseThingHandlerFactory implements
return service;
}
@Override
public @Nullable ThingRegistry getThingRegistry() {
return thingRegistry;
}
@Override
public Version getVersion() {
return FrameworkUtil.getBundle(getClass()).getVersion();
......@@ -122,4 +129,15 @@ public class OpenLichtHandlerFactory extends BaseThingHandlerFactory implements
LoggerFactory.getLogger(OpenLichtHandlerFactory.class).info("Deleting mqtt service {}", service);
this.service = null;
}
@Reference(cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.STATIC)
public void setThingRegistry(ThingRegistry thingRegistry) {
LoggerFactory.getLogger(OpenLichtHandlerFactory.class).info("Setting mqtt service to {}", thingRegistry);
this.thingRegistry = thingRegistry;
}
public void unsetThingRegistry(ThingRegistry service) {
LoggerFactory.getLogger(OpenLichtHandlerFactory.class).info("Deleting mqtt service {}", service);
this.thingRegistry = null;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment