Skip to content
Snippets Groups Projects
Commit adcc5a66 authored by René Schöne's avatar René Schöne
Browse files

Add RagConnect to project.

- removed MqttTopic, added TopicString to Item
- removed deprecated EraserParserHelper
- WIP: correct transformation of state to String and back
parent 8921b496
No related branches found
No related tags found
1 merge request!21Draft: Replace mqtt handling with RagConnect
Pipeline #10210 failed
Showing
with 147 additions and 1141 deletions
plugins {
id 'eraser.java-jastadd-conventions'
}
dependencies {
compileOnly group: 'de.tudresden.inf.st', name: 'ragconnect', version: '0.3.1'
}
...@@ -8,11 +8,11 @@ buildscript { ...@@ -8,11 +8,11 @@ buildscript {
plugins { plugins {
id 'eraser.java-application-conventions' id 'eraser.java-application-conventions'
id 'eraser.java-jastadd-conventions' id 'eraser.java-ragconnect-conventions'
} }
dependencies { dependencies {
jastadd2 "org.jastadd:jastadd:2.3.4" jastadd2 "org.jastadd:jastadd:2.3.5"
compileOnly group: 'de.tudresden.inf.st.jastadd', name: 'coverage-generator', version: '0.0.4' compileOnly group: 'de.tudresden.inf.st.jastadd', name: 'coverage-generator', version: '0.0.4'
api group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jackson_version}" api group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jackson_version}"
...@@ -25,12 +25,30 @@ dependencies { ...@@ -25,12 +25,30 @@ dependencies {
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: "${log4j_version}" testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: "${log4j_version}"
} }
application { mainClassName = 'de.tudresden.inf.st.eraser.Main'
mainClass = 'de.tudresden.inf.st.eraser.Main'
}
def relastFiles = fileTree('src/main/jastadd/') { def ragConnectRelastFiles = fileTree('src/main/jastadd/') {
include '**/*.relast' }.toList().toArray() include '**/*.relast' }.toList().toArray()
String[] ragconnectArguments = [
'--o=src/gen/jastadd',
'--logReads',
'--logWrites',
// '--verbose',
'--rootNode=Root',
// '--experimental-jastadd-329',
// '--incremental=param',
// '--tracing=cache,flush',
'src/main/jastadd/shem.connect',
]
task ragConnect(type: JavaExec) {
group = 'Build'
main = 'org.jastadd.ragconnect.compiler.Compiler'
classpath = configurations.compileOnly
args ragconnectArguments + ragConnectRelastFiles
}
String[] relastArguments = [ String[] relastArguments = [
"libs/relast.jar", "libs/relast.jar",
"--grammarName=./src/gen/jastadd/mainGen", "--grammarName=./src/gen/jastadd/mainGen",
...@@ -39,6 +57,8 @@ String[] relastArguments = [ ...@@ -39,6 +57,8 @@ String[] relastArguments = [
"--resolverHelper", "--resolverHelper",
"--file" "--file"
] ]
String[] relastFiles = ragConnectRelastFiles.collect { new File(it.toString().replace('/main/', '/gen/')) }
task preprocess(type: JavaExec) { task preprocess(type: JavaExec) {
group = 'Build' group = 'Build'
main = "-jar" main = "-jar"
...@@ -52,6 +72,7 @@ task preprocess(type: JavaExec) { ...@@ -52,6 +72,7 @@ task preprocess(type: JavaExec) {
String[] coverageGenArguments = [ String[] coverageGenArguments = [
'--List=JastAddList', '--List=JastAddList',
'--printYaml', '--printYaml',
'--inputBaseDir=src/gen/jastadd',
'--outputBaseDir=src/gen/jastadd' '--outputBaseDir=src/gen/jastadd'
] ]
task generateCoverage(type: JavaExec) { task generateCoverage(type: JavaExec) {
...@@ -115,6 +136,12 @@ sourceSets.main { ...@@ -115,6 +136,12 @@ sourceSets.main {
} }
} }
cleanGen.doFirst {
delete "src/gen/jastadd"
delete "src/gen/java"
}
preprocess.dependsOn ragConnect
generateAst.dependsOn preprocess generateAst.dependsOn preprocess
generateAst.dependsOn generateCoverage generateAst.dependsOn generateCoverage
generateAst.inputs.files file("./src/main/jastadd/mainGen.ast"), file("./src/main/jastadd/mainGen.jadd") generateAst.inputs.files file("./src/main/jastadd/mainGen.ast"), file("./src/main/jastadd/mainGen.jadd")
......
...@@ -98,7 +98,7 @@ aspect ItemHistory { ...@@ -98,7 +98,7 @@ aspect ItemHistory {
} }
// override Item.sendState from MQTT aspect // override Item.sendState from MQTT aspect
refine MQTT protected void Item.sendState() throws Exception { refine ItemHandling protected void Item.sendState() throws Exception {
refined(); refined();
getRoot().getInfluxRoot().influxAdapter().write(pointFromState()); getRoot().getInfluxRoot().influxAdapter().write(pointFromState());
} }
......
...@@ -64,7 +64,7 @@ aspect Printing { ...@@ -64,7 +64,7 @@ aspect Printing {
.addNonDefault("label", getLabel()) .addNonDefault("label", getLabel())
.addRequired("state", getStateAsString()) .addRequired("state", getStateAsString())
.addOptional("category", hasCategory(), () -> getCategory().getName()) .addOptional("category", hasCategory(), () -> getCategory().getName())
.addOptional("topic", hasTopic(), () -> getTopic().getTopicString()) .addNonDefault("topic", getTopicString())
.addOptionalPrettyPrint(getMetaData()) .addOptionalPrettyPrint(getMetaData())
.build(); .build();
} }
...@@ -91,7 +91,7 @@ aspect Printing { ...@@ -91,7 +91,7 @@ aspect Printing {
.addRequired("id", getID()) .addRequired("id", getID())
.addNonDefault("label", getLabel()) .addNonDefault("label", getLabel())
.addOptional("category", hasCategory(), () -> getCategory().getName()) .addOptional("category", hasCategory(), () -> getCategory().getName())
.addOptional("topic", hasTopic(), () -> getTopic().getTopicString()) .addNonDefault("topic", getTopicString())
.addOptionalPrettyPrint(getMetaData()) .addOptionalPrettyPrint(getMetaData())
.build(); .build();
} }
......
...@@ -73,11 +73,6 @@ aspect Resolving { ...@@ -73,11 +73,6 @@ aspect Resolving {
return java.util.Optional.empty(); return java.util.Optional.empty();
} }
//--- resolveMqttTopic ---
syn java.util.Optional<MqttTopic> Root.resolveMqttTopic(String mqttTopicId) {
return this.getMqttRoot().resolveTopic(mqttTopicId);
}
//--- resolveItemCategory --- //--- resolveItemCategory ---
syn java.util.Optional<ItemCategory> SmartHomeEntityModel.resolveItemCategory(String categoryName) { syn java.util.Optional<ItemCategory> SmartHomeEntityModel.resolveItemCategory(String categoryName) {
for (ItemCategory category : getItemCategoryList()) { for (ItemCategory category : getItemCategoryList()) {
...@@ -161,12 +156,6 @@ aspect Resolving { ...@@ -161,12 +156,6 @@ aspect Resolving {
return containingSmartHomeEntityModel().resolveDefaultChannelCategory(id).orElseThrow(() -> new RuntimeException("DefaultChannelCategory '" + id + "' not found!")); return containingSmartHomeEntityModel().resolveDefaultChannelCategory(id).orElseThrow(() -> new RuntimeException("DefaultChannelCategory '" + id + "' not found!"));
} }
// Item.Topic? <-> MqttTopic.Item*
refine RefResolverStubs eq Item.resolveTopicByToken(String id) {
// not an actual resolving, also adds the new mqtt-topic under mqtt-root
return getRoot().getMqttRoot().getOrCreateMqttTopic(id);
}
// Item.Category? <-> ItemCategory.Items* // Item.Category? <-> ItemCategory.Items*
refine RefResolverStubs eq Item.resolveCategoryByToken(String id) { refine RefResolverStubs eq Item.resolveCategoryByToken(String id) {
// not an actual resolving, also adds the new item-category under containing model // not an actual resolving, also adds the new item-category under containing model
......
...@@ -210,7 +210,7 @@ ItemPrototype item_body = ...@@ -210,7 +210,7 @@ ItemPrototype item_body =
ID EQUALS TEXT.n item_body.i {: i.setID(n); return i; :} ID EQUALS TEXT.n item_body.i {: i.setID(n); return i; :}
| LABEL EQUALS TEXT.n item_body.i {: i.setLabel(n); return i; :} | LABEL EQUALS TEXT.n item_body.i {: i.setLabel(n); return i; :}
| STATE EQUALS TEXT.n item_body.i {: i.setStateFromString(n); return i; :} | STATE EQUALS TEXT.n item_body.i {: i.setStateFromString(n); return i; :}
| TOPIC EQUALS TEXT.n item_body.i {: i.setTopic(MqttTopic.createRef(n)); return i; :} | TOPIC EQUALS TEXT.n item_body.i {: i.setTopicString(n); return i; :}
| CATEGORY EQUALS TEXT.n item_body.i {: i.setCategory(ItemCategory.createRef(n)); return i; :} | CATEGORY EQUALS TEXT.n item_body.i {: i.setCategory(ItemCategory.createRef(n)); return i; :}
| PERFORMANCE EQUALS TEXT.n item_body.i {: i.setFrequencySetting(FrequencySetting.createRef(n)); return i; :} | PERFORMANCE EQUALS TEXT.n item_body.i {: i.setFrequencySetting(FrequencySetting.createRef(n)); return i; :}
| META_DATA EQUALS meta_data.md item_body.i {: i.setMetaData(md); return i; :} | META_DATA EQUALS meta_data.md item_body.i {: i.setMetaData(md); return i; :}
......
...@@ -3,27 +3,7 @@ aspect MQTT { ...@@ -3,27 +3,7 @@ aspect MQTT {
// --- default values --- // --- default values ---
private static final int MqttRoot.DEFAULT_PORT = 1883; private static final int MqttRoot.DEFAULT_PORT = 1883;
//--- resolveTopic --- // --- ensureCorrectPrefixes ---
syn java.util.Optional<MqttTopic> MqttRoot.resolveTopic(String topic) {
ensureCorrectPrefixes();
if (!topic.startsWith(getIncomingPrefix())) {
logger.warn("Topic '{}' does not start with incoming prefix '{}'", topic, getIncomingPrefix());
return java.util.Optional.empty();
}
String suffix = topic.substring(getIncomingPrefix().length());
return resolveTopicSuffix(suffix);
}
//--- resolveTopicSuffix ---
syn java.util.Optional<MqttTopic> MqttRoot.resolveTopicSuffix(String suffix) {
for (MqttTopic current : getTopics()) {
if (current.getTopicString().equals(suffix)) {
return Optional.of(current);
}
}
return Optional.empty();
}
public void MqttRoot.ensureCorrectPrefixes() { public void MqttRoot.ensureCorrectPrefixes() {
if (!getIncomingPrefix().isEmpty() && !getIncomingPrefix().endsWith("/")) { if (!getIncomingPrefix().isEmpty() && !getIncomingPrefix().endsWith("/")) {
setIncomingPrefix(getIncomingPrefix() + "/"); setIncomingPrefix(getIncomingPrefix() + "/");
...@@ -33,65 +13,47 @@ aspect MQTT { ...@@ -33,65 +13,47 @@ aspect MQTT {
} }
} }
// --- getHost ---
syn ExternalHost MqttRoot.getHost() = new ExternalHost();
//--- getIncomingTopic --- // --- connectAllItems ---
syn String MqttTopic.getIncomingTopic() = getMqttRoot().getIncomingPrefix() + getTopicString(); public boolean SmartHomeEntityModel.connectAllItems() throws IOException {
MqttRoot mqttRoot = getRoot().getMqttRoot();
//--- getOutgoingTopic --- ExternalHost host = mqttRoot.getHost();
syn String MqttTopic.getOutgoingTopic() = getMqttRoot().getOutgoingPrefix() + getTopicString(); // TODO user/password not used yet (not supported by ragconnect yet)
String prefix = "mqtt://" + host.getHostName() + (host.getPort() != 0 ? ":" + host.getPort() : "") + "/";
//--- getMqttSender (should be cached) ---
cache MqttRoot.getMqttSender(); boolean success = true;
syn MQTTSender MqttRoot.getMqttSender() { for (Item item : this.items()) {
MQTTSender result; String suffix = item.getTopicString().isBlank() ? item.getID() : item.getTopicString();
if (getHost().exists()) { ConnectReceive connectReceive;
result = new MQTTSenderImpl(); ConnectSend connectSend;
if (item.isItemWithDoubleState()) {
connectReceive = item.asItemWithDoubleState()::connect_state;
connectSend = item.asItemWithDoubleState()::connect_state;
} else if (item.isItemWithBooleanState()) {
connectReceive = item.asItemWithBooleanState()::connect_state;
connectSend = item.asItemWithBooleanState()::connect_state;
} else if (item.isItemWithStringState()) {
connectReceive = item.asItemWithStringState()::connect_state;
connectSend = item.asItemWithStringState()::connect_state;
} else { } else {
result = new MQTTSenderStub(); // unsupported item type
continue;
} }
return result.setHost(getHost()); success &= connectReceive.apply(prefix + mqttRoot.getIncomingPrefix() + suffix) &
} connectSend.apply(prefix + mqttRoot.getOutgoingPrefix() + suffix, false);
//--- getMqttRoot ---
inh MqttRoot MqttTopic.getMqttRoot();
eq MqttRoot.getTopic().getMqttRoot() = this;
/**
* Sends the current state via MQTT.
*/
refine ItemHandling protected void Item.sendState() throws Exception {
refined();
if (getTopic() != null) {
getTopic().send(getStateAsString());
} }
return success;
} }
// public void Item.setState(String value, boolean shouldSendState) { class SmartHomeEntityModel {
// this.setState(value); interface ConnectReceive {
// } boolean apply(String uriString) throws IOException;
public void MqttTopic.send(String message) throws Exception {
getMqttRoot().getMqttSender().publish(getOutgoingTopic(), message);
} }
refine SmartHomeEntityModel public void SmartHomeEntityModel.addNewItem(Item item) { interface ConnectSend {
refined(item); boolean apply(String uriString, boolean writeCurrentValue) throws IOException;
// update mqtt-topic to new mqtt-root
item.setTopic(getRoot().getMqttRoot().getOrCreateMqttTopic(item.getTopic().getTopicString()));
} }
public MqttTopic MqttRoot.getOrCreateMqttTopic(String topicString) {
return resolveTopicSuffix(topicString).orElseGet(() -> {
MqttTopic result = new MqttTopic();
result.setTopicString(topicString);
addTopic(result);
return result;
});
} }
syn ExternalHost MqttRoot.getHost() = new ExternalHost();
} }
// ---------------- MQTT ------------------------------ // ---------------- MQTT ------------------------------
MqttRoot ::= Topic:MqttTopic* <IncomingPrefix:String> <OutgoingPrefix:String> /Host:ExternalHost/ ; MqttRoot ::= <IncomingPrefix:String> <OutgoingPrefix:String> /Host:ExternalHost/ ;
MqttTopic ::= <TopicString:String> ;
rel Item.Topic? <-> MqttTopic.Item* ;
receive ItemWithDoubleState._state using StringToDouble ;
send ItemWithDoubleState._state using DoubleToString ;
StringToDouble maps String s to double {:
return Double.parseDouble(s);
:}
DoubleToString maps double d to String {:
return Double.toString(d);
:}
receive ItemWithBooleanState._state using StringToBoolean ;
send ItemWithBooleanState._state using BooleanToString ;
StringToBoolean maps String s to boolean {:
return Boolean.parseBoolean(s);
:}
BooleanToString maps boolean b to String {:
return Boolean.toString(b);
:}
receive ItemWithStringState._state ;
send ItemWithStringState._state ;
...@@ -28,9 +28,7 @@ aspect SmartHomeEntityModel { ...@@ -28,9 +28,7 @@ aspect SmartHomeEntityModel {
result.setID(this.getID()); result.setID(this.getID());
result.setLabel(this.getLabel()); result.setLabel(this.getLabel());
result.setMetaData(this.getMetaData()); result.setMetaData(this.getMetaData());
if (this.hasTopic()) { result.setTopicString(this.getTopicString());
result.setTopic(this.getTopic());
}
if (this.hasCategory()) { if (this.hasCategory()) {
result.setCategory(this.getCategory()); result.setCategory(this.getCategory());
} }
......
...@@ -29,7 +29,7 @@ rel Channel.LinkedItem* <-> Item.Channel? ; ...@@ -29,7 +29,7 @@ rel Channel.LinkedItem* <-> Item.Channel? ;
Parameter : DescribableModelElement ::= <Type:ParameterValueType> [DefaultValue:ParameterDefaultValue] <Context:String> <Required:boolean> ; Parameter : DescribableModelElement ::= <Type:ParameterValueType> [DefaultValue:ParameterDefaultValue] <Context:String> <Required:boolean> ;
ParameterDefaultValue ::= <Value:String> ; ParameterDefaultValue ::= <Value:String> ;
abstract Item : LabelledModelElement ::= <_fetched_data:boolean> [MetaData] /ItemObserver/ /LastChanged/; abstract Item : LabelledModelElement ::= <_fetched_data:boolean> <TopicString> [MetaData] /ItemObserver/ /LastChanged/;
rel Item.Category? <-> ItemCategory.Items* ; rel Item.Category? <-> ItemCategory.Items* ;
rel Item.FrequencySetting? -> FrequencySetting ; rel Item.FrequencySetting? -> FrequencySetting ;
......
...@@ -2,29 +2,63 @@ package de.tudresden.inf.st.eraser; ...@@ -2,29 +2,63 @@ package de.tudresden.inf.st.eraser;
import beaver.Parser; import beaver.Parser;
import de.tudresden.inf.st.eraser.deserializer.ASTNodeDeserializer; import de.tudresden.inf.st.eraser.deserializer.ASTNodeDeserializer;
import de.tudresden.inf.st.eraser.jastadd.model.MqttHandler;
import de.tudresden.inf.st.eraser.jastadd.model.MqttRoot;
import de.tudresden.inf.st.eraser.jastadd.model.NumberItem;
import de.tudresden.inf.st.eraser.jastadd.model.Root; import de.tudresden.inf.st.eraser.jastadd.model.Root;
import de.tudresden.inf.st.eraser.openhab2.OpenHab2Importer; import de.tudresden.inf.st.eraser.openhab2.OpenHab2Importer;
import de.tudresden.inf.st.eraser.openhab2.mqtt.MQTTUpdater;
import de.tudresden.inf.st.eraser.util.ParserUtils; import de.tudresden.inf.st.eraser.util.ParserUtils;
import de.tudresden.inf.st.eraser.util.TestUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
/** /**
* Main entry point for testing eraser. * Main entry point for testing eraser.
* @author rschoene - Initial contribution * @author rschoene - Initial contribution
*/ */
@SuppressWarnings({"unused", "RedundantThrows"}) @SuppressWarnings({"unused", "RedundantThrows", "CommentedOutCode"})
public class Main { public class Main {
public static void main(String[] args) throws IOException, Parser.Exception { public static void main(String[] args) throws Exception {
// testSimple(); // testSimple();
// Root model = testParser(); // Root model = testParser();
// Root model = importFromOpenHab(); // Root model = importFromOpenHab();
// testPrinterWith(model); // testPrinterWith(model);
// testUpdaterWith(model); testRagConnectConnection();
}
private static void testRagConnectConnection() throws IOException, InterruptedException {
TestUtils.ModelAndItem mai = TestUtils.createModelAndItem(0, true);
NumberItem item = mai.item;
MqttRoot mqttRoot = mai.model.getRoot().getMqttRoot();
mqttRoot.getHost().setHostName("localhost");
MqttHandler handler = new MqttHandler().dontSendWelcomeMessage().setHost("localhost");
mqttRoot.setIncomingPrefix("inc");
mqttRoot.setOutgoingPrefix("out");
mqttRoot.ensureCorrectPrefixes();
CountDownLatch exit = new CountDownLatch(1);
// item.connect_state(mqttUri(mqttRoot.getIncomingPrefix() + item.getID()));
// item.connect_state(mqttUri(mqttRoot.getOutgoingPrefix() + item.getID()), true);
boolean success = mai.model.connectAllItems();
System.out.println("success = " + success);
handler.newConnection("model", bytes -> System.out.println(mai.model.prettyPrint()));
handler.newConnection("exit", bytes -> exit.countDown());
exit.await();
handler.close();
mai.model.getRoot().ragconnectCloseConnections();
}
private static String mqttUri(String s) {
return "mqtt://localhost/" + s;
} }
private static double readFromSystemIn(BufferedReader in, String prompt) throws IOException { private static double readFromSystemIn(BufferedReader in, String prompt) throws IOException {
...@@ -46,27 +80,6 @@ public class Main { ...@@ -46,27 +80,6 @@ public class Main {
System.out.println(model.prettyPrint()); System.out.println(model.prettyPrint());
} }
private static void testUpdater() {
Root model;
// model = importFromOpenHab();
model = importFromFile();
System.out.println("Got model: " + model.getSmartHomeEntityModel().description());
// JsonSerializer.write(model, "openhab2-data.json");
testUpdaterWith(model);
}
private static void testUpdaterWith(Root model) {
final int seconds = 10;
System.out.println("Start!");
try (MQTTUpdater updater = new MQTTUpdater(model)) {
LogManager.getLogger(Main.class).info("Processing mqtt updates for {} seconds", seconds);
updater.start();
Thread.sleep(seconds * 1000);
} catch (IllegalArgumentException | InterruptedException | IOException e) {
LogManager.getLogger(Main.class).catching(e);
}
}
private static void testSimple() { private static void testSimple() {
String s = "hingType: id="; String s = "hingType: id=";
for (Character c : s.toCharArray()) { for (Character c : s.toCharArray()) {
......
...@@ -59,7 +59,6 @@ public class ASTNodeDeserializer extends StdDeserializer<ASTNode> { ...@@ -59,7 +59,6 @@ public class ASTNodeDeserializer extends StdDeserializer<ASTNode> {
addResolverForSmartHomeEntityModel(resolversForSmartHomeEntityModel, ThingType.class, SmartHomeEntityModel::resolveThingType, "ID"); addResolverForSmartHomeEntityModel(resolversForSmartHomeEntityModel, ThingType.class, SmartHomeEntityModel::resolveThingType, "ID");
addResolverForSmartHomeEntityModel(resolversForSmartHomeEntityModel, ChannelType.class, SmartHomeEntityModel::resolveChannelType, "ID"); addResolverForSmartHomeEntityModel(resolversForSmartHomeEntityModel, ChannelType.class, SmartHomeEntityModel::resolveChannelType, "ID");
addResolverForSmartHomeEntityModel(resolversForSmartHomeEntityModel, Item.class, SmartHomeEntityModel::resolveItem, "ID"); addResolverForSmartHomeEntityModel(resolversForSmartHomeEntityModel, Item.class, SmartHomeEntityModel::resolveItem, "ID");
addResolver(resolvers, MqttTopic.class, Root::resolveMqttTopic, "IncomingTopic");
} }
private static void init() { private static void init() {
......
package de.tudresden.inf.st.eraser.jastadd.model;
import org.fusesource.mqtt.client.QoS;
import java.util.concurrent.TimeUnit;
/**
* Small helper to publish messages to a MQTT broker.
*
* @author rschoene - Initial contribution
*/
public interface MQTTSender extends AutoCloseable {
/**
* Sets the host running the MQTT broker (no username/password set).
* @param host host name (IP address or domain name) and port
*/
MQTTSender setHost(ExternalHost host);
/**
* Set the timeout used for connecting and disconnecting.
* @param connectTimeout Timeout value
* @param connectTimeoutUnit Timeout unit
*/
void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit);
/**
* Set the timeout used for publishing messages.
* @param publishTimeout Timeout value
* @param publishTimeoutUnit Timeout unit
*/
void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit);
/**
* Publishes a message in a topic at most once.
* @param topic the topic to publish at
* @param message the message to publish
* @throws Exception if the underlying connection throws an error
*/
default void publish(String topic, String message) throws Exception {
this.publish(topic, message, QoS.AT_MOST_ONCE);
}
/**
* Publishes a message in a topic with the given quality of service (QoS).
* @param topic the topic to publish at
* @param message the message to publish
* @param qos the needed quality of service (at most once, at least once, exactly once)
* @throws Exception if the underlying connection throws an error
*/
void publish(String topic, String message, QoS qos) throws Exception;
/**
* Checks, whether the connection to the host (set in the constructor) is established.
* @return <code>true</code> if this sender is connected to the host
*/
boolean isConnected();
@Override
void close() throws Exception;
}
package de.tudresden.inf.st.eraser.jastadd.model;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.fusesource.mqtt.client.*;
import java.net.URI;
import java.util.concurrent.TimeUnit;
/**
* Implementation of a MQTT sender using <code>org.fusesource.mqtt.client</code>.
*
* @author rschoene - Initial contribution
*/
public class MQTTSenderImpl implements MQTTSender {
private final Logger logger = LogManager.getLogger(MQTTSenderImpl.class);;
/** The connection to the MQTT broker. */
private FutureConnection connection;
/** Timeout for connect/disconnect methods */
private long connectTimeout;
/** Unit of timeout for connect/disconnect methods */
private TimeUnit connectTimeoutUnit;
/** Timeout for publish method */
private long publishTimeout;
/** Unit of timeout for publish method */
private TimeUnit publishTimeoutUnit;
@Override
public MQTTSender setHost(ExternalHost host) {
/* The host running the MQTT broker. */
URI hostUri = URI.create("tcp://" + host.getHostName() + ":" + host.getPort());
logger.debug("Host is {}", hostUri);
MQTT mqtt = new MQTT();
String username = host.getUserName();
String password = host.getPassword();
if (username != null && !username.isEmpty()) {
mqtt.setUserName(username);
}
if (password != null && !password.isEmpty()) {
mqtt.setPassword(password);
}
mqtt.setHost(hostUri);
connection = mqtt.futureConnection();
setConnectTimeout(2, TimeUnit.SECONDS);
setPublishTimeout(1, TimeUnit.SECONDS);
ensureConnected();
return this;
}
@Override
public void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit) {
this.connectTimeout = connectTimeout;
this.connectTimeoutUnit = connectTimeoutUnit;
}
@Override
public void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit) {
this.publishTimeout = publishTimeout;
this.publishTimeoutUnit = publishTimeoutUnit;
}
@Override
public void publish(String topic, String message, QoS qos) throws Exception {
if (ensureConnected()) {
logger.debug("Send: {} -> {}", topic, message);
connection.publish(topic, message.getBytes(), qos, false).await(publishTimeout, publishTimeoutUnit);
}
}
/**
* Ensures an established connection.
* If already connected, return immediately. Otherwise try to connect.
* @return <code>true</code> if the connected was established successfully, <code>false</code> if there was an error
*/
private boolean ensureConnected() {
if (!isConnected()) {
try {
connection.connect().await(connectTimeout, connectTimeoutUnit);
} catch (Exception e) {
logger.warn("Could not connect", e);
return false;
}
}
return true;
}
@Override
public boolean isConnected() {
return connection != null && connection.isConnected();
}
@Override
public void close() throws Exception {
if (connection == null) {
logger.warn("Stopping without connection.");
return;
}
if (isConnected()) {
connection.disconnect().await(connectTimeout, connectTimeoutUnit);
}
}
}
package de.tudresden.inf.st.eraser.jastadd.model;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.fusesource.mqtt.client.QoS;
import java.util.concurrent.TimeUnit;
/**
* Stub implementation of a MQTT sender only printing publish messages.
*
* @author rschoene - Initial contribution
*/
public class MQTTSenderStub implements MQTTSender {
public interface PublishCallback {
void onPublish(String topic, String message, QoS qos);
}
private Logger logger = LogManager.getLogger(MQTTSenderStub.class);
private PublishCallback callback;
@Override
public MQTTSender setHost(ExternalHost host) {
return this;
}
public void setCallback(PublishCallback callback) {
this.callback = callback;
}
@Override
public void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit) {
// empty
}
@Override
public void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit) {
// empty
}
@Override
public void publish(String topic, String message, QoS qos) {
// ignore QoS for now
logger.info("{}: {}", topic, message);
if (callback != null) {
callback.onPublish(topic, message, qos);
}
}
@Override
public boolean isConnected() {
return true;
}
@Override
public void close() {
// empty
}
}
package de.tudresden.inf.st.eraser.openhab2.mqtt;
import de.tudresden.inf.st.eraser.jastadd.model.ExternalHost;
import de.tudresden.inf.st.eraser.jastadd.model.Item;
import de.tudresden.inf.st.eraser.jastadd.model.Root;
import de.tudresden.inf.st.eraser.util.MqttReceiver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.fusesource.mqtt.client.QoS;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* Update an imported model by subscribing to MQTT topics.
*
* @author rschoene - Initial contribution
*/
public class MQTTUpdater implements AutoCloseable {
private final Logger logger;
/** Receiver actually receiving messages. */
private MqttReceiver delegatee;
public MQTTUpdater() {
this.logger = LogManager.getLogger(MQTTUpdater.class);
this.delegatee = new MqttReceiver();
}
public MQTTUpdater(Root root) throws IllegalArgumentException {
this();
this.setRoot(root);
}
/**
* Sets the model root to update
* @param root the model root to update
*/
public void setRoot(Root root) {
ExternalHost host = root.getMqttRoot().getHost();
delegatee.setHost(host);
delegatee.setOnMessage((topicString, message)->
root.getMqttRoot().resolveTopic(topicString).ifPresent(topic ->
topic.getItems().forEach(
item -> itemUpdate(item, message))));
delegatee.setTopicsForSubscription(root.getMqttRoot().getIncomingPrefix() + "#");
delegatee.setQoSForSubscription(QoS.AT_LEAST_ONCE);
}
private void itemUpdate(Item item, String state) {
String oldState = item.getStateAsString();
if (oldState == null || !oldState.equals(state)) {
this.logger.debug("Update state of {} [{}] from '{}' to '{}'.",
item.getLabel(), item.getID(), oldState, state);
item.setStateFromString(state, false);
}
}
/**
* Waits until this updater is ready to receive MQTT messages.
* If it already is ready, return immediately with the value <code>true</code>.
* Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan,
* if it got ready, or <code>false</code> upon a timeout.
* @param time the maximum time to wait
* @param unit the time unit of the time argument
* @return whether this updater is ready
*/
public boolean waitUntilReady(long time, TimeUnit unit) {
return delegatee.waitUntilReady(time, unit);
}
/**
* Starts the updating process, i.e., subscribe to the required topics, and updates the model.
* @throws IOException if could not connect, or could not subscribe to a topic
*/
public void start() throws IOException {
delegatee.start();
}
public void close() {
delegatee.close();
}
}
package de.tudresden.inf.st.eraser.parser;
import de.tudresden.inf.st.eraser.jastadd.model.*;
import de.tudresden.inf.st.eraser.util.JavaUtils;
import de.tudresden.inf.st.eraser.util.ParserUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.BiConsumer;
/**
* Resolving names while parsing model files.
*
* @author rschoene - Initial contribution
*/
@Deprecated
class EraserParserHelper {
private Logger logger = LogManager.getLogger(EraserParserHelper.class);
private Map<String, ThingType> thingTypeMap = new HashMap<>();
private Map<String, ChannelType> channelTypeMap = new HashMap<>();
private Map<String, ChannelCategory> channelCategoryMap = new HashMap<>();
private Map<String, Channel> channelMap = new HashMap<>();
private Map<String, Parameter> parameterMap = new HashMap<>();
private Map<String, Item> itemMap = new HashMap<>();
private Map<String, Group> groupMap = new HashMap<>();
private Map<String, FrequencySetting> FrequencySettingMap = new HashMap<>();
private Map<Thing, String> missingThingTypeMap = new HashMap<>();
private Map<Channel, String> missingChannelTypeMap = new HashMap<>();
private Map<Item, String> missingTopicMap = new HashMap<>();
private Map<Item, String> missingItemCategoryMap = new HashMap<>();
private Map<Designator, String> missingItemForDesignator = new HashMap<>();
private Map<Thing, Iterable<String>> missingChannelListMap = new HashMap<>();
private Map<Channel, Iterable<String>> missingItemLinkListMap = new HashMap<>();
private Map<Group, Iterable<String>> missingSubGroupListMap = new HashMap<>();
private Map<Group, Iterable<String>> missingItemListMap = new HashMap<>();
private Map<ThingType, Iterable<String>> missingChannelTypeListMap = new HashMap<>();
private Map<ThingType, Iterable<String>> missingParameterListMap = new HashMap<>();
private Set<ASTNode> unusedElements = new HashSet<>();
private Set<Item> groupedItems = new HashSet<>();
private List<Item> itemOrder = new LinkedList<>();
private Root root;
private static boolean checkUnusedElements = true;
private static Root initialRoot = null;
private class ItemPrototype extends DefaultItem {
}
/**
* Changes the behavior of the parser to check for unused elements. (Default: true)
* @param checkUnusedElements <code>true</code> to check for unused elements, <code>false</code> to skip the check
*/
public static void setCheckUnusedElements(boolean checkUnusedElements) {
EraserParserHelper.checkUnusedElements = checkUnusedElements;
}
public static void setInitialRoot(Root root) {
EraserParserHelper.initialRoot = root;
}
/**
* Post processing step after parsing a model, to resolve all references within the model.
*/
public void resolveReferences() {
if (this.root == null) {
// when parsing expressions
this.root = EraserParserHelper.initialRoot != null ? EraserParserHelper.initialRoot : createRoot();
}
if (checkUnusedElements) {
fillUnused();
}
resolve(thingTypeMap, missingThingTypeMap, Thing::setType);
resolve(channelTypeMap, missingChannelTypeMap, Channel::setType);
if (itemMap == null || itemMap.isEmpty()) {
missingItemForDesignator.forEach((designator, itemName) ->
JavaUtils.ifPresentOrElse(root.getSmartHomeEntityModel().resolveItem(itemName),
designator::setItem,
() -> logger.warn("Could not resolve item {} for {}", itemName, designator)));
} else {
resolve(itemMap, missingItemForDesignator, Designator::setItem);
}
// missingTopicMap.forEach((item, s) -> item.setMqttTopic(s));
this.root.getMqttRoot().ensureCorrectPrefixes();
resolveList(channelMap, missingChannelListMap, Thing::addChannel);
resolveList(itemMap, missingItemLinkListMap, Channel::addLinkedItem);
resolveList(groupMap, missingSubGroupListMap, Group::addGroup);
resolveList(itemMap, missingItemListMap, this::addItemToGroup);
resolveList(channelTypeMap, missingChannelTypeListMap, ThingType::addChannelType);
resolveList(parameterMap, missingParameterListMap, ThingType::addParameter);
createUnknownGroupIfNecessary();
// createChannelCategories();
createItemCategories();
if (checkUnusedElements) {
checkUnusedElements();
}
this.root.treeResolveAll();
this.root.doFullTraversal();
}
private void addItemToGroup(Group group, Item item) {
groupedItems.add(item);
group.addItem(item);
}
private void fillUnused() {
unusedElements.addAll(thingTypeMap.values());
unusedElements.addAll(channelTypeMap.values());
unusedElements.addAll(channelMap.values());
unusedElements.addAll(parameterMap.values());
// unusedElements.addAll(topicMap.values());
}
/**
* Create a group for all items not contained in a group. And warn if this was necessary.
*/
private void createUnknownGroupIfNecessary() {
if (itemMap.size() > groupedItems.size()) {
Set<Item> danglingItems = new HashSet<>(itemMap.values());
danglingItems.removeAll(groupedItems);
// probably terrible performance
List<Item> sortedDanglingItems = new LinkedList<>();
for (Item item : itemOrder) {
if (danglingItems.contains(item)) {
sortedDanglingItems.add(item);
}
}
ParserUtils.addToUnknownGroup(this.root.getSmartHomeEntityModel(), sortedDanglingItems);
}
}
private void createItemCategories() {
Map<String, ItemCategory> newCategories = new HashMap<>();
missingItemCategoryMap.forEach((item, category) ->
item.setCategory(newCategories.computeIfAbsent(category, ItemCategory::new)));
newCategories.values().forEach(node -> root.getSmartHomeEntityModel().addItemCategory(node));
}
private void checkUnusedElements() {
unusedElements.forEach(elem -> logger.info("{} '{}' defined, but not referenced.", elem.getClass().getSimpleName(), ident(elem)));
}
private String ident(ASTNode elem) {
if (elem instanceof ModelElement) {
return ((ModelElement) elem).getID();
} else if (elem instanceof MqttTopic) {
return ((MqttTopic) elem).getTopicString();
} else if (elem instanceof DefaultChannelCategory) {
return ((DefaultChannelCategory) elem).getValue().name();
} else if (elem instanceof SimpleChannelCategory) {
return ((SimpleChannelCategory) elem).getValue();
}
return elem.toString();
}
private <Src extends ASTNode, Target extends ASTNode> void resolveList(
Map<String, Target> resolved, Map<Src, Iterable<String>> missing, BiConsumer<Src, Target> adder) {
missing.forEach(
(elem, keyList) -> keyList.forEach(
key -> resolve0(resolved, key, elem, adder)));
missing.clear();
}
private <Src extends ASTNode, Target extends ASTNode> void resolve(
Map<String, Target> resolved, Map<Src, String> missing, BiConsumer<Src, Target> setter) {
missing.forEach(
(elem, key) -> resolve0(resolved, key, elem, setter));
missing.clear();
}
private <Src extends ASTNode, Target extends ASTNode> void resolve0(
Map<String, Target> resolved, String key, Src elem, BiConsumer<Src, Target> action) {
Target value = resolved.get(key);
if (value == null) {
logger.warn("Reference in {} {} for '{}' cannot be resolved",
elem.getClass().getSimpleName(), ident(elem), key);
return;
}
if (checkUnusedElements) {
unusedElements.remove(value);
}
action.accept(elem, value);
}
//--- Thing and ThingType ---
public Thing addThingType(Thing t, String typeName) {
missingThingTypeMap.put(t, typeName);
return t;
}
public ThingType setChannelTypes(ThingType tt, StringList channelTypeNames) {
missingChannelTypeListMap.put(tt, channelTypeNames);
return tt;
}
public ThingType setParameters(ThingType tt, StringList parameterNames) {
missingParameterListMap.put(tt, parameterNames);
return tt;
}
public Thing setChannels(Thing t, StringList channelNames) {
missingChannelListMap.put(t, channelNames);
return t;
}
public Thing setID(Thing thing, String id) {
thing.setID(id);
return thing;
}
public FrequencySetting setID(FrequencySetting FrequencySetting, String id) {
FrequencySetting.setID(id);
FrequencySettingMap.put(id,FrequencySetting);
return FrequencySetting;
}
public ThingType setID(ThingType thingType, String id) {
thingType.setID(id);
thingTypeMap.put(id, thingType);
return thingType;
}
//--- Channel and ChannelType ---
public ChannelType setItemType(ChannelType ct, String itemTypeName) {
ct.setItemType(ItemType.valueOf(itemTypeName));
return ct;
}
public Channel setChannelType(Channel c, String channelTypeName) {
missingChannelTypeMap.put(c, channelTypeName);
return c;
}
public Channel setLinks(Channel c, StringList linkNames) {
missingItemLinkListMap.put(c, linkNames);
return c;
}
public ChannelType setID(ChannelType channelType, String id) {
channelType.setID(id);
channelTypeMap.put(id, channelType);
return channelType;
}
public Channel setID(Channel channel, String id) {
channel.setID(id);
channelMap.put(id, channel);
return channel;
}
//--- Item ---
public Item createItem() {
ItemPrototype result = new ItemPrototype();
result.disableSendState();
return result;
}
public Item setCategory(Item item, String categoryName) {
missingItemCategoryMap.put(item, categoryName);
return item;
}
public Item retype(Item itemWithCorrectType, Item prototype) {
itemWithCorrectType.setID(prototype.getID());
itemWithCorrectType.setLabel(prototype.getLabel());
itemWithCorrectType.setMetaData(prototype.getMetaData());
itemWithCorrectType.setFrequencySetting(prototype.getFrequencySetting());
if (!(itemWithCorrectType instanceof ActivityItem)) {
String state = prototype.getStateAsString();
itemWithCorrectType.disableSendState();
if (state.isEmpty()) {
itemWithCorrectType.setStateToDefault();
} else {
itemWithCorrectType.setStateFromString(state);
}
itemWithCorrectType.enableSendState();
}
moveMissingForRetype(itemWithCorrectType, prototype, missingTopicMap);
moveMissingForRetype(itemWithCorrectType, prototype, missingItemCategoryMap);
itemMap.put(prototype.getID(), itemWithCorrectType);
itemOrder.add(itemWithCorrectType);
return itemWithCorrectType;
}
private <T> void moveMissingForRetype(Item itemWithCorrectType, Item prototype, Map<Item, T> missingXMap) {
T value = missingXMap.get(prototype);
if (value != null) {
missingXMap.put(itemWithCorrectType, value);
}
missingXMap.remove(prototype);
}
public Item setID(Item item, String id) {
item.setID(id);
itemMap.put(id, item);
return item;
}
//--- Group ---
public Group setSubGroups(Group g, StringList subGroupNames) {
missingSubGroupListMap.put(g, subGroupNames);
return g;
}
public Group setItems(Group g, StringList itemNames) {
missingItemListMap.put(g, itemNames);
return g;
}
public Group setSimpleAggregationFunction(Group g, String aggFunctionName) {
SimpleGroupAggregationFunctionName name = SimpleGroupAggregationFunctionName.valueOf(aggFunctionName.toUpperCase());
g.setAggregationFunction(new SimpleGroupAggregationFunction(name));
return g;
}
public Group setParameterizedAggregationFunction(Group g, String aggFunctionName, StringList params) {
ParameterizedGroupAggregationFunctionName name = ParameterizedGroupAggregationFunctionName.valueOf(
aggFunctionName.toUpperCase());
List<String> paramList = new ArrayList<>();
params.iterator().forEachRemaining(paramList::add);
String param1, param2;
if (paramList.size() == 2) {
param1 = paramList.get(0);
param2 = paramList.get(1);
} else {
logger.error("Got {} instead of 2 parameters in group function {}!", paramList.size(), aggFunctionName);
param1 = "?";
param2 = "?";
}
g.setAggregationFunction(new ParameterizedGroupAggregationFunction(name, param1, param2));
return g;
}
public Group setID(Group group, String id) {
group.setID(id);
groupMap.put(id, group);
return group;
}
//--- Parameter ---
public Parameter setParameterValueType(Parameter p, String pvt) {
p.setType(ParameterValueType.valueOf(JavaUtils.toTitleCase(pvt)));
return p;
}
public Parameter setDefault(Parameter p, String defaultValue) {
p.setDefaultValue(new ParameterDefaultValue(defaultValue));
return p;
}
public Parameter setID(Parameter parameter, String id) {
parameter.setID(id);
parameterMap.put(id, parameter);
return parameter;
}
//--- MQTT ---
public Item setTopic(Item item, String mqttTopicName) {
missingTopicMap.put(item, mqttTopicName);
return item;
}
//--- Activity ---
public MachineLearningRoot setActivities(MachineLearningRoot mlr, IntegerKeyMap map) {
for (AbstractMap.SimpleEntry<Integer, String> entry : map) {
Activity activity = new Activity();
activity.setIdentifier(entry.getKey());
activity.setLabel(entry.getValue());
mlr.addActivity(activity);
}
return mlr;
}
//--- Root ---
public Root createRoot() {
this.root = Root.createEmptyRoot();
return this.root;
}
public Root createRoot(Thing t) {
Root result = createRoot();
result.getSmartHomeEntityModel().addThing(t);
return result;
}
public Root createRoot(Group g) {
Root result = createRoot();
result.getSmartHomeEntityModel().addGroup(g);
return result;
}
public Root createRoot(ThingType tt) {
Root result = createRoot();
result.getSmartHomeEntityModel().addThingType(tt);
return result;
}
public Root createRoot(ChannelType ct) {
Root result = createRoot();
result.getSmartHomeEntityModel().addChannelType(ct);
return result;
}
public Root createRoot(MqttRoot mr) {
Root result = createRoot();
result.setMqttRoot(mr);
return result;
}
public Root createRoot(InfluxRoot ir) {
Root result = createRoot();
result.setInfluxRoot(ir);
return result;
}
public Root createRoot(MachineLearningRoot ml) {
Root result = createRoot();
result.setMachineLearningRoot(ml);
return result;
}
public Root createRoot(Rule rule) {
Root result = createRoot();
result.addRule(rule);
return result;
}
public Root createRoot(FrequencySetting frequencySetting) {
Root result = createRoot();
result.getSmartHomeEntityModel().addFrequencySetting(frequencySetting);
return result;
}
//+++ newStuff (to be categorized) +++
public Designator createDesignator(String itemName) {
Designator result = new Designator();
missingItemForDesignator.put(result, itemName);
return result;
}
public Rule createRule(Condition c, Action a) {
Rule result = new Rule();
result.addCondition(c);
result.addAction(a);
return result;
}
}
...@@ -37,16 +37,6 @@ public class ASTNodeSerializer extends StdSerializer<ASTNode> { ...@@ -37,16 +37,6 @@ public class ASTNodeSerializer extends StdSerializer<ASTNode> {
jgen.writeStringField("v", ((ModelElement) elem).getID()); jgen.writeStringField("v", ((ModelElement) elem).getID());
jgen.writeEndObject(); // end ID {} jgen.writeEndObject(); // end ID {}
})); }));
serializers.put(MqttTopic.class, ((jgen, elem) ->
{
jgen.writeStartObject();
jgen.writeFieldName("IncomingTopic");
jgen.writeStartObject();
jgen.writeStringField("k", "t");
jgen.writeStringField("t", String.class.getName());
jgen.writeStringField("v", ((MqttTopic) elem).getIncomingTopic());
jgen.writeEndObject(); // end IncomingTopic {}
}));
} }
Map<Class<?>, SerializeId> serializers = new HashMap<>(); Map<Class<?>, SerializeId> serializers = new HashMap<>();
...@@ -98,8 +88,6 @@ public class ASTNodeSerializer extends StdSerializer<ASTNode> { ...@@ -98,8 +88,6 @@ public class ASTNodeSerializer extends StdSerializer<ASTNode> {
SerializeId specificSerializer; SerializeId specificSerializer;
if (elem instanceof ModelElement) { if (elem instanceof ModelElement) {
specificSerializer = serializers.get(ModelElement.class); specificSerializer = serializers.get(ModelElement.class);
} else if (elem instanceof MqttTopic) {
specificSerializer = serializers.get(MqttTopic.class);
} else if (elem == null) { } else if (elem == null) {
throw new JsonGenerationException("Intrinsic reference to null in " + throw new JsonGenerationException("Intrinsic reference to null in " +
value.getClass().getSimpleName() + "." + m.getName(), jgen); value.getClass().getSimpleName() + "." + m.getName(), jgen);
......
package de.tudresden.inf.st.eraser.util;
import de.tudresden.inf.st.eraser.jastadd.model.ExternalHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.*;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
/**
* Subscribe to topics, receive and store messages.
*
* @author rschoene - Initial contribution
*/
public class MqttReceiver implements AutoCloseable {
private final Logger logger;
/** The host running the MQTT broker. */
private URI host;
private String username;
private String password;
/** The connection to the MQTT broker. */
private CallbackConnection connection;
/** Whether we are subscribed to the topics yet */
private Condition readyCondition;
private Lock readyLock;
private boolean ready;
private BiConsumer<String, String> onMessageCallback;
private String[] topics;
private QoS qos;
public MqttReceiver() {
this.logger = LogManager.getLogger(MqttReceiver.class);
this.readyLock = new ReentrantLock();
this.readyCondition = readyLock.newCondition();
this.ready = false;
qos = QoS.AT_LEAST_ONCE;
}
/**
* Sets the host to receive messages from
*/
public void setHost(ExternalHost externalHost) {
this.host = URI.create("tcp://" + externalHost.getHostName() + ":" + externalHost.getPort());
logger.debug("Host is {}", externalHost.getHostName());
}
public void setOnMessage(BiConsumer<String, String> callback) {
this.onMessageCallback = callback;
}
public void setTopicsForSubscription(String... topics) {
this.topics = topics;
}
public void setQoSForSubscription(QoS qos) {
this.qos = qos;
}
/**
* Waits until this updater is ready to receive MQTT messages.
* If it already is ready, return immediately with the value <code>true</code>.
* Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan,
* if it got ready, or <code>false</code> upon a timeout.
* @param time the maximum time to wait
* @param unit the time unit of the time argument
* @return whether this updater is ready
*/
public boolean waitUntilReady(long time, TimeUnit unit) {
try {
readyLock.lock();
if (ready) {
return true;
}
return readyCondition.await(time, unit);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readyLock.unlock();
}
return false;
}
/**
* Starts the updating process, i.e., subscribe to the required topics, and updates the model.
* @throws IOException if could not connect, or could not subscribe to a topic
*/
public void start() throws IOException {
if (ready) {
return;
}
Objects.requireNonNull(this.host, "Host need to be set!");
MQTT mqtt = new MQTT();
mqtt.setHost(this.host);
mqtt.setPassword(this.password);
mqtt.setUserName(this.username);
connection = mqtt.callbackConnection();
AtomicReference<Throwable> error = new AtomicReference<>();
connection.listener(new ExtendedListener() {
public void onConnected() {
logger.debug("Connected");
}
@Override
public void onDisconnected() {
logger.debug("Disconnected");
}
@Override
public void onPublish(UTF8Buffer topic, Buffer body, Callback<Callback<Void>> ack) {
String topicString = topic.toString();
String message = body.ascii().toString();
// logger.debug("{}: {}", topicString, message);
onMessageCallback.accept(topicString, message);
ack.onSuccess(null); // always acknowledge message
}
@Override
public void onPublish(UTF8Buffer topicBuffer, Buffer body, Runnable ack) {
logger.warn("onPublish should not be called");
}
@Override
public void onFailure(Throwable cause) {
// logger.catching(cause);
error.set(cause);
}
});
throwIf(error);
connection.connect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
connection.publish("components", "Eraser is listening".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.debug("success sending welcome message");
}
@Override
public void onFailure(Throwable value) {
logger.debug("failure sending welcome message", value);
}
});
Topic[] topicArray = Arrays.stream(topics).map(topicName -> new Topic(topicName, qos)).toArray(Topic[]::new);
logger.info("Connected, subscribing to {} topic(s) now.", topicArray.length);
connection.subscribe(topicArray, new Callback<byte[]>() {
@Override
public void onSuccess(byte[] qoses) {
logger.debug("Subscribed, qoses: {}", qoses);
try {
readyLock.lock();
ready = true;
readyCondition.signalAll();
} finally {
readyLock.unlock();
}
}
@Override
public void onFailure(Throwable cause) {
logger.error("Could not subscribe", cause);
}
});
}
@Override
public void onFailure(Throwable cause) {
// logger.error("Could not connect", cause);
error.set(cause);
}
});
throwIf(error);
}
private void throwIf(AtomicReference<Throwable> error) throws IOException {
if (error.get() != null) {
throw new IOException(error.get());
}
}
public void close() {
if (connection == null) {
logger.warn("Stopping without connection. Was start() called?");
return;
}
connection.disconnect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.info("Disconnected from {}", host);
}
@Override
public void onFailure(Throwable ignored) {
// Disconnects never fail.
}
});
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment