diff --git a/buildSrc/src/main/groovy/eraser.java-ragconnect-conventions.gradle b/buildSrc/src/main/groovy/eraser.java-ragconnect-conventions.gradle new file mode 100644 index 0000000000000000000000000000000000000000..83a2995fa0e7bdb5697925dd0c3cf79f9782007a --- /dev/null +++ b/buildSrc/src/main/groovy/eraser.java-ragconnect-conventions.gradle @@ -0,0 +1,7 @@ +plugins { + id 'eraser.java-jastadd-conventions' +} + +dependencies { + compileOnly group: 'de.tudresden.inf.st', name: 'ragconnect', version: '0.3.1' +} diff --git a/eraser-base/build.gradle b/eraser-base/build.gradle index 63dc1cf62d41dbb534eb12133e494ac66379656d..b338994fa7665d19f4ae2b1145d6cbc4c3bf8d8a 100644 --- a/eraser-base/build.gradle +++ b/eraser-base/build.gradle @@ -8,11 +8,11 @@ buildscript { plugins { id 'eraser.java-application-conventions' - id 'eraser.java-jastadd-conventions' + id 'eraser.java-ragconnect-conventions' } dependencies { - jastadd2 "org.jastadd:jastadd:2.3.4" + jastadd2 "org.jastadd:jastadd:2.3.5" compileOnly group: 'de.tudresden.inf.st.jastadd', name: 'coverage-generator', version: '0.0.4' api group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jackson_version}" @@ -25,12 +25,30 @@ dependencies { testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: "${log4j_version}" } -application { - mainClass = 'de.tudresden.inf.st.eraser.Main' -} +mainClassName = 'de.tudresden.inf.st.eraser.Main' -def relastFiles = fileTree('src/main/jastadd/') { +def ragConnectRelastFiles = fileTree('src/main/jastadd/') { include '**/*.relast' }.toList().toArray() +String[] ragconnectArguments = [ + '--o=src/gen/jastadd', + '--logReads', + '--logWrites', +// '--verbose', + '--rootNode=Root', +// '--experimental-jastadd-329', +// '--incremental=param', +// '--tracing=cache,flush', + 'src/main/jastadd/shem.connect', +] + +task ragConnect(type: JavaExec) { + group = 'Build' + main = 'org.jastadd.ragconnect.compiler.Compiler' + classpath = configurations.compileOnly + + args ragconnectArguments + ragConnectRelastFiles +} + String[] relastArguments = [ "libs/relast.jar", "--grammarName=./src/gen/jastadd/mainGen", @@ -39,6 +57,8 @@ String[] relastArguments = [ "--resolverHelper", "--file" ] +String[] relastFiles = ragConnectRelastFiles.collect { new File(it.toString().replace('/main/', '/gen/')) } + task preprocess(type: JavaExec) { group = 'Build' main = "-jar" @@ -52,6 +72,7 @@ task preprocess(type: JavaExec) { String[] coverageGenArguments = [ '--List=JastAddList', '--printYaml', + '--inputBaseDir=src/gen/jastadd', '--outputBaseDir=src/gen/jastadd' ] task generateCoverage(type: JavaExec) { @@ -115,6 +136,12 @@ sourceSets.main { } } +cleanGen.doFirst { + delete "src/gen/jastadd" + delete "src/gen/java" +} + +preprocess.dependsOn ragConnect generateAst.dependsOn preprocess generateAst.dependsOn generateCoverage generateAst.inputs.files file("./src/main/jastadd/mainGen.ast"), file("./src/main/jastadd/mainGen.jadd") diff --git a/eraser-base/src/main/jastadd/ItemHistory.jrag b/eraser-base/src/main/jastadd/ItemHistory.jrag index 1c256a56aa32bddda86f65a5ce951afdaf87d206..e1a36041291f1ab59d5c46ccec73e2f289670bac 100644 --- a/eraser-base/src/main/jastadd/ItemHistory.jrag +++ b/eraser-base/src/main/jastadd/ItemHistory.jrag @@ -98,7 +98,7 @@ aspect ItemHistory { } // override Item.sendState from MQTT aspect - refine MQTT protected void Item.sendState() throws Exception { + refine ItemHandling protected void Item.sendState() throws Exception { refined(); getRoot().getInfluxRoot().influxAdapter().write(pointFromState()); } diff --git a/eraser-base/src/main/jastadd/Printing.jrag b/eraser-base/src/main/jastadd/Printing.jrag index 60e77e3a00a122f60495df3f66216a08accfbdb2..18f5f5b106a5f73a27a2d41423abb504b350c0be 100644 --- a/eraser-base/src/main/jastadd/Printing.jrag +++ b/eraser-base/src/main/jastadd/Printing.jrag @@ -64,7 +64,7 @@ aspect Printing { .addNonDefault("label", getLabel()) .addRequired("state", getStateAsString()) .addOptional("category", hasCategory(), () -> getCategory().getName()) - .addOptional("topic", hasTopic(), () -> getTopic().getTopicString()) + .addNonDefault("topic", getTopicString()) .addOptionalPrettyPrint(getMetaData()) .build(); } @@ -91,7 +91,7 @@ aspect Printing { .addRequired("id", getID()) .addNonDefault("label", getLabel()) .addOptional("category", hasCategory(), () -> getCategory().getName()) - .addOptional("topic", hasTopic(), () -> getTopic().getTopicString()) + .addNonDefault("topic", getTopicString()) .addOptionalPrettyPrint(getMetaData()) .build(); } diff --git a/eraser-base/src/main/jastadd/Resolving.jrag b/eraser-base/src/main/jastadd/Resolving.jrag index f194a9f72348ede36fc56b6840a16b082880906c..0c898aa4e47347b4c7213a473cba3eba4d594af2 100644 --- a/eraser-base/src/main/jastadd/Resolving.jrag +++ b/eraser-base/src/main/jastadd/Resolving.jrag @@ -73,11 +73,6 @@ aspect Resolving { return java.util.Optional.empty(); } - //--- resolveMqttTopic --- - syn java.util.Optional<MqttTopic> Root.resolveMqttTopic(String mqttTopicId) { - return this.getMqttRoot().resolveTopic(mqttTopicId); - } - //--- resolveItemCategory --- syn java.util.Optional<ItemCategory> SmartHomeEntityModel.resolveItemCategory(String categoryName) { for (ItemCategory category : getItemCategoryList()) { @@ -161,12 +156,6 @@ aspect Resolving { return containingSmartHomeEntityModel().resolveDefaultChannelCategory(id).orElseThrow(() -> new RuntimeException("DefaultChannelCategory '" + id + "' not found!")); } - // Item.Topic? <-> MqttTopic.Item* - refine RefResolverStubs eq Item.resolveTopicByToken(String id) { - // not an actual resolving, also adds the new mqtt-topic under mqtt-root - return getRoot().getMqttRoot().getOrCreateMqttTopic(id); - } - // Item.Category? <-> ItemCategory.Items* refine RefResolverStubs eq Item.resolveCategoryByToken(String id) { // not an actual resolving, also adds the new item-category under containing model diff --git a/eraser-base/src/main/jastadd/eraser.parser b/eraser-base/src/main/jastadd/eraser.parser index 0da0601d2136e5ed105eddd81751da9d38e848c8..862fad3cb0f34812b95e57ccba42819355b0d71d 100644 --- a/eraser-base/src/main/jastadd/eraser.parser +++ b/eraser-base/src/main/jastadd/eraser.parser @@ -210,7 +210,7 @@ ItemPrototype item_body = ID EQUALS TEXT.n item_body.i {: i.setID(n); return i; :} | LABEL EQUALS TEXT.n item_body.i {: i.setLabel(n); return i; :} | STATE EQUALS TEXT.n item_body.i {: i.setStateFromString(n); return i; :} - | TOPIC EQUALS TEXT.n item_body.i {: i.setTopic(MqttTopic.createRef(n)); return i; :} + | TOPIC EQUALS TEXT.n item_body.i {: i.setTopicString(n); return i; :} | CATEGORY EQUALS TEXT.n item_body.i {: i.setCategory(ItemCategory.createRef(n)); return i; :} | PERFORMANCE EQUALS TEXT.n item_body.i {: i.setFrequencySetting(FrequencySetting.createRef(n)); return i; :} | META_DATA EQUALS meta_data.md item_body.i {: i.setMetaData(md); return i; :} diff --git a/eraser-base/src/main/jastadd/mqtt.jrag b/eraser-base/src/main/jastadd/mqtt.jrag index 95dd53770199b461679960acd542f5ed0486681a..f96d5a54cfdc5a8e77e11fd34f1f42cd2154fe03 100644 --- a/eraser-base/src/main/jastadd/mqtt.jrag +++ b/eraser-base/src/main/jastadd/mqtt.jrag @@ -3,27 +3,7 @@ aspect MQTT { // --- default values --- private static final int MqttRoot.DEFAULT_PORT = 1883; - //--- resolveTopic --- - syn java.util.Optional<MqttTopic> MqttRoot.resolveTopic(String topic) { - ensureCorrectPrefixes(); - if (!topic.startsWith(getIncomingPrefix())) { - logger.warn("Topic '{}' does not start with incoming prefix '{}'", topic, getIncomingPrefix()); - return java.util.Optional.empty(); - } - String suffix = topic.substring(getIncomingPrefix().length()); - return resolveTopicSuffix(suffix); - } - - //--- resolveTopicSuffix --- - syn java.util.Optional<MqttTopic> MqttRoot.resolveTopicSuffix(String suffix) { - for (MqttTopic current : getTopics()) { - if (current.getTopicString().equals(suffix)) { - return Optional.of(current); - } - } - return Optional.empty(); - } - + // --- ensureCorrectPrefixes --- public void MqttRoot.ensureCorrectPrefixes() { if (!getIncomingPrefix().isEmpty() && !getIncomingPrefix().endsWith("/")) { setIncomingPrefix(getIncomingPrefix() + "/"); @@ -33,65 +13,47 @@ aspect MQTT { } } - - //--- getIncomingTopic --- - syn String MqttTopic.getIncomingTopic() = getMqttRoot().getIncomingPrefix() + getTopicString(); - - //--- getOutgoingTopic --- - syn String MqttTopic.getOutgoingTopic() = getMqttRoot().getOutgoingPrefix() + getTopicString(); - - //--- getMqttSender (should be cached) --- - cache MqttRoot.getMqttSender(); - syn MQTTSender MqttRoot.getMqttSender() { - MQTTSender result; - if (getHost().exists()) { - result = new MQTTSenderImpl(); - } else { - result = new MQTTSenderStub(); + // --- getHost --- + syn ExternalHost MqttRoot.getHost() = new ExternalHost(); + + // --- connectAllItems --- + public boolean SmartHomeEntityModel.connectAllItems() throws IOException { + MqttRoot mqttRoot = getRoot().getMqttRoot(); + ExternalHost host = mqttRoot.getHost(); + // TODO user/password not used yet (not supported by ragconnect yet) + String prefix = "mqtt://" + host.getHostName() + (host.getPort() != 0 ? ":" + host.getPort() : "") + "/"; + + boolean success = true; + for (Item item : this.items()) { + String suffix = item.getTopicString().isBlank() ? item.getID() : item.getTopicString(); + ConnectReceive connectReceive; + ConnectSend connectSend; + if (item.isItemWithDoubleState()) { + connectReceive = item.asItemWithDoubleState()::connect_state; + connectSend = item.asItemWithDoubleState()::connect_state; + } else if (item.isItemWithBooleanState()) { + connectReceive = item.asItemWithBooleanState()::connect_state; + connectSend = item.asItemWithBooleanState()::connect_state; + } else if (item.isItemWithStringState()) { + connectReceive = item.asItemWithStringState()::connect_state; + connectSend = item.asItemWithStringState()::connect_state; + } else { + // unsupported item type + continue; + } + success &= connectReceive.apply(prefix + mqttRoot.getIncomingPrefix() + suffix) & + connectSend.apply(prefix + mqttRoot.getOutgoingPrefix() + suffix, false); } - return result.setHost(getHost()); + return success; } - //--- getMqttRoot --- - inh MqttRoot MqttTopic.getMqttRoot(); - - eq MqttRoot.getTopic().getMqttRoot() = this; - - /** - * Sends the current state via MQTT. - */ - refine ItemHandling protected void Item.sendState() throws Exception { - refined(); - if (getTopic() != null) { - getTopic().send(getStateAsString()); + class SmartHomeEntityModel { + interface ConnectReceive { + boolean apply(String uriString) throws IOException; } - } - -// public void Item.setState(String value, boolean shouldSendState) { -// this.setState(value); -// } - public void MqttTopic.send(String message) throws Exception { - getMqttRoot().getMqttSender().publish(getOutgoingTopic(), message); - } - - refine SmartHomeEntityModel public void SmartHomeEntityModel.addNewItem(Item item) { - refined(item); - // update mqtt-topic to new mqtt-root - item.setTopic(getRoot().getMqttRoot().getOrCreateMqttTopic(item.getTopic().getTopicString())); - } - - - - public MqttTopic MqttRoot.getOrCreateMqttTopic(String topicString) { - return resolveTopicSuffix(topicString).orElseGet(() -> { - MqttTopic result = new MqttTopic(); - result.setTopicString(topicString); - addTopic(result); - return result; - }); + interface ConnectSend { + boolean apply(String uriString, boolean writeCurrentValue) throws IOException; + } } - -syn ExternalHost MqttRoot.getHost() = new ExternalHost(); - } diff --git a/eraser-base/src/main/jastadd/mqtt.relast b/eraser-base/src/main/jastadd/mqtt.relast index 9251c2483448050e25a7bdff4153119b47f212d1..affe793340b0e71a94545ec9b3e5f88d2159e582 100644 --- a/eraser-base/src/main/jastadd/mqtt.relast +++ b/eraser-base/src/main/jastadd/mqtt.relast @@ -1,4 +1,2 @@ // ---------------- MQTT ------------------------------ -MqttRoot ::= Topic:MqttTopic* <IncomingPrefix:String> <OutgoingPrefix:String> /Host:ExternalHost/ ; -MqttTopic ::= <TopicString:String> ; -rel Item.Topic? <-> MqttTopic.Item* ; +MqttRoot ::= <IncomingPrefix:String> <OutgoingPrefix:String> /Host:ExternalHost/ ; diff --git a/eraser-base/src/main/jastadd/shem.connect b/eraser-base/src/main/jastadd/shem.connect new file mode 100644 index 0000000000000000000000000000000000000000..1f3f06ec92822ccea0104425b51550d2f6f73ba6 --- /dev/null +++ b/eraser-base/src/main/jastadd/shem.connect @@ -0,0 +1,24 @@ +receive ItemWithDoubleState._state using StringToDouble ; +send ItemWithDoubleState._state using DoubleToString ; + +StringToDouble maps String s to double {: + return Double.parseDouble(s); +:} + +DoubleToString maps double d to String {: + return Double.toString(d); +:} + +receive ItemWithBooleanState._state using StringToBoolean ; +send ItemWithBooleanState._state using BooleanToString ; + +StringToBoolean maps String s to boolean {: + return Boolean.parseBoolean(s); +:} + +BooleanToString maps boolean b to String {: + return Boolean.toString(b); +:} + +receive ItemWithStringState._state ; +send ItemWithStringState._state ; diff --git a/eraser-base/src/main/jastadd/shem.jrag b/eraser-base/src/main/jastadd/shem.jrag index f163ebf4824dab8d3c3107a35fe4654b48e6c2ad..030cf917bb221a93739effce1aef8b12f2a36ba3 100644 --- a/eraser-base/src/main/jastadd/shem.jrag +++ b/eraser-base/src/main/jastadd/shem.jrag @@ -28,9 +28,7 @@ aspect SmartHomeEntityModel { result.setID(this.getID()); result.setLabel(this.getLabel()); result.setMetaData(this.getMetaData()); - if (this.hasTopic()) { - result.setTopic(this.getTopic()); - } + result.setTopicString(this.getTopicString()); if (this.hasCategory()) { result.setCategory(this.getCategory()); } diff --git a/eraser-base/src/main/jastadd/shem.relast b/eraser-base/src/main/jastadd/shem.relast index aa83375e5ca39d9e61f1dfeba1141bd722d90739..58092bfc7da38ab73ccc8ab73afbb6e3cf12ba2b 100644 --- a/eraser-base/src/main/jastadd/shem.relast +++ b/eraser-base/src/main/jastadd/shem.relast @@ -29,7 +29,7 @@ rel Channel.LinkedItem* <-> Item.Channel? ; Parameter : DescribableModelElement ::= <Type:ParameterValueType> [DefaultValue:ParameterDefaultValue] <Context:String> <Required:boolean> ; ParameterDefaultValue ::= <Value:String> ; -abstract Item : LabelledModelElement ::= <_fetched_data:boolean> [MetaData] /ItemObserver/ /LastChanged/; +abstract Item : LabelledModelElement ::= <_fetched_data:boolean> <TopicString> [MetaData] /ItemObserver/ /LastChanged/; rel Item.Category? <-> ItemCategory.Items* ; rel Item.FrequencySetting? -> FrequencySetting ; diff --git a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/Main.java b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/Main.java index 59cf34a193f7f7d7960a4356b06b9ea9fc8307b8..23c5bdcbb527abb5c831e8d37994aefc947ff96a 100644 --- a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/Main.java +++ b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/Main.java @@ -2,29 +2,63 @@ package de.tudresden.inf.st.eraser; import beaver.Parser; import de.tudresden.inf.st.eraser.deserializer.ASTNodeDeserializer; +import de.tudresden.inf.st.eraser.jastadd.model.MqttHandler; +import de.tudresden.inf.st.eraser.jastadd.model.MqttRoot; +import de.tudresden.inf.st.eraser.jastadd.model.NumberItem; import de.tudresden.inf.st.eraser.jastadd.model.Root; import de.tudresden.inf.st.eraser.openhab2.OpenHab2Importer; -import de.tudresden.inf.st.eraser.openhab2.mqtt.MQTTUpdater; import de.tudresden.inf.st.eraser.util.ParserUtils; +import de.tudresden.inf.st.eraser.util.TestUtils; import org.apache.logging.log4j.LogManager; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.CountDownLatch; /** * Main entry point for testing eraser. * @author rschoene - Initial contribution */ -@SuppressWarnings({"unused", "RedundantThrows"}) +@SuppressWarnings({"unused", "RedundantThrows", "CommentedOutCode"}) public class Main { - public static void main(String[] args) throws IOException, Parser.Exception { + public static void main(String[] args) throws Exception { // testSimple(); // Root model = testParser(); // Root model = importFromOpenHab(); // testPrinterWith(model); -// testUpdaterWith(model); + testRagConnectConnection(); + } + + private static void testRagConnectConnection() throws IOException, InterruptedException { + TestUtils.ModelAndItem mai = TestUtils.createModelAndItem(0, true); + NumberItem item = mai.item; + MqttRoot mqttRoot = mai.model.getRoot().getMqttRoot(); + mqttRoot.getHost().setHostName("localhost"); + MqttHandler handler = new MqttHandler().dontSendWelcomeMessage().setHost("localhost"); + mqttRoot.setIncomingPrefix("inc"); + mqttRoot.setOutgoingPrefix("out"); + mqttRoot.ensureCorrectPrefixes(); + + CountDownLatch exit = new CountDownLatch(1); + +// item.connect_state(mqttUri(mqttRoot.getIncomingPrefix() + item.getID())); +// item.connect_state(mqttUri(mqttRoot.getOutgoingPrefix() + item.getID()), true); + + boolean success = mai.model.connectAllItems(); + System.out.println("success = " + success); + + handler.newConnection("model", bytes -> System.out.println(mai.model.prettyPrint())); + handler.newConnection("exit", bytes -> exit.countDown()); + + exit.await(); + handler.close(); + mai.model.getRoot().ragconnectCloseConnections(); + } + + private static String mqttUri(String s) { + return "mqtt://localhost/" + s; } private static double readFromSystemIn(BufferedReader in, String prompt) throws IOException { @@ -46,27 +80,6 @@ public class Main { System.out.println(model.prettyPrint()); } - private static void testUpdater() { - Root model; -// model = importFromOpenHab(); - model = importFromFile(); - System.out.println("Got model: " + model.getSmartHomeEntityModel().description()); -// JsonSerializer.write(model, "openhab2-data.json"); - testUpdaterWith(model); - } - - private static void testUpdaterWith(Root model) { - final int seconds = 10; - System.out.println("Start!"); - try (MQTTUpdater updater = new MQTTUpdater(model)) { - LogManager.getLogger(Main.class).info("Processing mqtt updates for {} seconds", seconds); - updater.start(); - Thread.sleep(seconds * 1000); - } catch (IllegalArgumentException | InterruptedException | IOException e) { - LogManager.getLogger(Main.class).catching(e); - } - } - private static void testSimple() { String s = "hingType: id="; for (Character c : s.toCharArray()) { diff --git a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/deserializer/ASTNodeDeserializer.java b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/deserializer/ASTNodeDeserializer.java index ce62a6efa1d1bb40d7a9e05cd618938fdcebfa9d..ad9c374d8b332a541d510cb3b2cd7dc731ccaf57 100644 --- a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/deserializer/ASTNodeDeserializer.java +++ b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/deserializer/ASTNodeDeserializer.java @@ -59,7 +59,6 @@ public class ASTNodeDeserializer extends StdDeserializer<ASTNode> { addResolverForSmartHomeEntityModel(resolversForSmartHomeEntityModel, ThingType.class, SmartHomeEntityModel::resolveThingType, "ID"); addResolverForSmartHomeEntityModel(resolversForSmartHomeEntityModel, ChannelType.class, SmartHomeEntityModel::resolveChannelType, "ID"); addResolverForSmartHomeEntityModel(resolversForSmartHomeEntityModel, Item.class, SmartHomeEntityModel::resolveItem, "ID"); - addResolver(resolvers, MqttTopic.class, Root::resolveMqttTopic, "IncomingTopic"); } private static void init() { diff --git a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/jastadd/model/MQTTSender.java b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/jastadd/model/MQTTSender.java deleted file mode 100644 index 34d06fe31302b49a46bf920541ab9d849b70d07f..0000000000000000000000000000000000000000 --- a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/jastadd/model/MQTTSender.java +++ /dev/null @@ -1,62 +0,0 @@ -package de.tudresden.inf.st.eraser.jastadd.model; - -import org.fusesource.mqtt.client.QoS; - -import java.util.concurrent.TimeUnit; - -/** - * Small helper to publish messages to a MQTT broker. - * - * @author rschoene - Initial contribution - */ -public interface MQTTSender extends AutoCloseable { - - /** - * Sets the host running the MQTT broker (no username/password set). - * @param host host name (IP address or domain name) and port - */ - MQTTSender setHost(ExternalHost host); - - - /** - * Set the timeout used for connecting and disconnecting. - * @param connectTimeout Timeout value - * @param connectTimeoutUnit Timeout unit - */ - void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit); - - /** - * Set the timeout used for publishing messages. - * @param publishTimeout Timeout value - * @param publishTimeoutUnit Timeout unit - */ - void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit); - - /** - * Publishes a message in a topic at most once. - * @param topic the topic to publish at - * @param message the message to publish - * @throws Exception if the underlying connection throws an error - */ - default void publish(String topic, String message) throws Exception { - this.publish(topic, message, QoS.AT_MOST_ONCE); - } - - /** - * Publishes a message in a topic with the given quality of service (QoS). - * @param topic the topic to publish at - * @param message the message to publish - * @param qos the needed quality of service (at most once, at least once, exactly once) - * @throws Exception if the underlying connection throws an error - */ - void publish(String topic, String message, QoS qos) throws Exception; - - /** - * Checks, whether the connection to the host (set in the constructor) is established. - * @return <code>true</code> if this sender is connected to the host - */ - boolean isConnected(); - - @Override - void close() throws Exception; -} diff --git a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/jastadd/model/MQTTSenderImpl.java b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/jastadd/model/MQTTSenderImpl.java deleted file mode 100644 index 9fecbe6913f798a530c29b45be7af793df56e3b1..0000000000000000000000000000000000000000 --- a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/jastadd/model/MQTTSenderImpl.java +++ /dev/null @@ -1,105 +0,0 @@ -package de.tudresden.inf.st.eraser.jastadd.model; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.fusesource.mqtt.client.*; - -import java.net.URI; -import java.util.concurrent.TimeUnit; - -/** - * Implementation of a MQTT sender using <code>org.fusesource.mqtt.client</code>. - * - * @author rschoene - Initial contribution - */ -public class MQTTSenderImpl implements MQTTSender { - - private final Logger logger = LogManager.getLogger(MQTTSenderImpl.class);; - /** The connection to the MQTT broker. */ - private FutureConnection connection; - - /** Timeout for connect/disconnect methods */ - private long connectTimeout; - /** Unit of timeout for connect/disconnect methods */ - private TimeUnit connectTimeoutUnit; - - /** Timeout for publish method */ - private long publishTimeout; - /** Unit of timeout for publish method */ - private TimeUnit publishTimeoutUnit; - - @Override - public MQTTSender setHost(ExternalHost host) { - /* The host running the MQTT broker. */ - URI hostUri = URI.create("tcp://" + host.getHostName() + ":" + host.getPort()); - logger.debug("Host is {}", hostUri); - MQTT mqtt = new MQTT(); - String username = host.getUserName(); - String password = host.getPassword(); - if (username != null && !username.isEmpty()) { - mqtt.setUserName(username); - } - if (password != null && !password.isEmpty()) { - mqtt.setPassword(password); - } - mqtt.setHost(hostUri); - connection = mqtt.futureConnection(); - setConnectTimeout(2, TimeUnit.SECONDS); - setPublishTimeout(1, TimeUnit.SECONDS); - ensureConnected(); - return this; - } - - @Override - public void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit) { - this.connectTimeout = connectTimeout; - this.connectTimeoutUnit = connectTimeoutUnit; - } - - @Override - public void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit) { - this.publishTimeout = publishTimeout; - this.publishTimeoutUnit = publishTimeoutUnit; - } - - @Override - public void publish(String topic, String message, QoS qos) throws Exception { - if (ensureConnected()) { - logger.debug("Send: {} -> {}", topic, message); - connection.publish(topic, message.getBytes(), qos, false).await(publishTimeout, publishTimeoutUnit); - } - } - - /** - * Ensures an established connection. - * If already connected, return immediately. Otherwise try to connect. - * @return <code>true</code> if the connected was established successfully, <code>false</code> if there was an error - */ - private boolean ensureConnected() { - if (!isConnected()) { - try { - connection.connect().await(connectTimeout, connectTimeoutUnit); - } catch (Exception e) { - logger.warn("Could not connect", e); - return false; - } - } - return true; - } - - @Override - public boolean isConnected() { - return connection != null && connection.isConnected(); - } - - @Override - public void close() throws Exception { - if (connection == null) { - logger.warn("Stopping without connection."); - return; - } - if (isConnected()) { - connection.disconnect().await(connectTimeout, connectTimeoutUnit); - } - } -} diff --git a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/jastadd/model/MQTTSenderStub.java b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/jastadd/model/MQTTSenderStub.java deleted file mode 100644 index 2694b85f8232bb6cb83e20eef9bcd349fa63d63b..0000000000000000000000000000000000000000 --- a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/jastadd/model/MQTTSenderStub.java +++ /dev/null @@ -1,61 +0,0 @@ -package de.tudresden.inf.st.eraser.jastadd.model; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.fusesource.mqtt.client.QoS; - -import java.util.concurrent.TimeUnit; - -/** - * Stub implementation of a MQTT sender only printing publish messages. - * - * @author rschoene - Initial contribution - */ -public class MQTTSenderStub implements MQTTSender { - - public interface PublishCallback { - void onPublish(String topic, String message, QoS qos); - } - - private Logger logger = LogManager.getLogger(MQTTSenderStub.class); - private PublishCallback callback; - - @Override - public MQTTSender setHost(ExternalHost host) { - return this; - } - - - public void setCallback(PublishCallback callback) { - this.callback = callback; - } - - @Override - public void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit) { - // empty - } - - @Override - public void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit) { - // empty - } - - @Override - public void publish(String topic, String message, QoS qos) { - // ignore QoS for now - logger.info("{}: {}", topic, message); - if (callback != null) { - callback.onPublish(topic, message, qos); - } - } - - @Override - public boolean isConnected() { - return true; - } - - @Override - public void close() { - // empty - } -} diff --git a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/openhab2/mqtt/MQTTUpdater.java b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/openhab2/mqtt/MQTTUpdater.java deleted file mode 100644 index 0952d3c333a06256c3a0fb97181854e43f6a077f..0000000000000000000000000000000000000000 --- a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/openhab2/mqtt/MQTTUpdater.java +++ /dev/null @@ -1,84 +0,0 @@ -package de.tudresden.inf.st.eraser.openhab2.mqtt; - -import de.tudresden.inf.st.eraser.jastadd.model.ExternalHost; -import de.tudresden.inf.st.eraser.jastadd.model.Item; -import de.tudresden.inf.st.eraser.jastadd.model.Root; -import de.tudresden.inf.st.eraser.util.MqttReceiver; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.fusesource.mqtt.client.QoS; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -/** - * Update an imported model by subscribing to MQTT topics. - * - * @author rschoene - Initial contribution - */ -public class MQTTUpdater implements AutoCloseable { - - private final Logger logger; - - /** Receiver actually receiving messages. */ - private MqttReceiver delegatee; - - public MQTTUpdater() { - this.logger = LogManager.getLogger(MQTTUpdater.class); - this.delegatee = new MqttReceiver(); - } - - public MQTTUpdater(Root root) throws IllegalArgumentException { - this(); - this.setRoot(root); - } - - /** - * Sets the model root to update - * @param root the model root to update - */ - public void setRoot(Root root) { - ExternalHost host = root.getMqttRoot().getHost(); - delegatee.setHost(host); - delegatee.setOnMessage((topicString, message)-> - root.getMqttRoot().resolveTopic(topicString).ifPresent(topic -> - topic.getItems().forEach( - item -> itemUpdate(item, message)))); - delegatee.setTopicsForSubscription(root.getMqttRoot().getIncomingPrefix() + "#"); - delegatee.setQoSForSubscription(QoS.AT_LEAST_ONCE); - } - - private void itemUpdate(Item item, String state) { - String oldState = item.getStateAsString(); - if (oldState == null || !oldState.equals(state)) { - this.logger.debug("Update state of {} [{}] from '{}' to '{}'.", - item.getLabel(), item.getID(), oldState, state); - item.setStateFromString(state, false); - } - } - - /** - * Waits until this updater is ready to receive MQTT messages. - * If it already is ready, return immediately with the value <code>true</code>. - * Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan, - * if it got ready, or <code>false</code> upon a timeout. - * @param time the maximum time to wait - * @param unit the time unit of the time argument - * @return whether this updater is ready - */ - public boolean waitUntilReady(long time, TimeUnit unit) { - return delegatee.waitUntilReady(time, unit); - } - - /** - * Starts the updating process, i.e., subscribe to the required topics, and updates the model. - * @throws IOException if could not connect, or could not subscribe to a topic - */ - public void start() throws IOException { - delegatee.start(); - } - - public void close() { - delegatee.close(); - } -} diff --git a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/parser/EraserParserHelper.java b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/parser/EraserParserHelper.java deleted file mode 100644 index c832b194ac9d7592d7ddb9e3c099b6f19a972f55..0000000000000000000000000000000000000000 --- a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/parser/EraserParserHelper.java +++ /dev/null @@ -1,476 +0,0 @@ -package de.tudresden.inf.st.eraser.parser; - -import de.tudresden.inf.st.eraser.jastadd.model.*; -import de.tudresden.inf.st.eraser.util.JavaUtils; -import de.tudresden.inf.st.eraser.util.ParserUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.*; -import java.util.function.BiConsumer; - -/** - * Resolving names while parsing model files. - * - * @author rschoene - Initial contribution - */ -@Deprecated -class EraserParserHelper { - private Logger logger = LogManager.getLogger(EraserParserHelper.class); - - private Map<String, ThingType> thingTypeMap = new HashMap<>(); - private Map<String, ChannelType> channelTypeMap = new HashMap<>(); - private Map<String, ChannelCategory> channelCategoryMap = new HashMap<>(); - private Map<String, Channel> channelMap = new HashMap<>(); - private Map<String, Parameter> parameterMap = new HashMap<>(); - private Map<String, Item> itemMap = new HashMap<>(); - private Map<String, Group> groupMap = new HashMap<>(); - private Map<String, FrequencySetting> FrequencySettingMap = new HashMap<>(); - - private Map<Thing, String> missingThingTypeMap = new HashMap<>(); - private Map<Channel, String> missingChannelTypeMap = new HashMap<>(); - private Map<Item, String> missingTopicMap = new HashMap<>(); - private Map<Item, String> missingItemCategoryMap = new HashMap<>(); - - private Map<Designator, String> missingItemForDesignator = new HashMap<>(); - - private Map<Thing, Iterable<String>> missingChannelListMap = new HashMap<>(); - private Map<Channel, Iterable<String>> missingItemLinkListMap = new HashMap<>(); - - private Map<Group, Iterable<String>> missingSubGroupListMap = new HashMap<>(); - private Map<Group, Iterable<String>> missingItemListMap = new HashMap<>(); - private Map<ThingType, Iterable<String>> missingChannelTypeListMap = new HashMap<>(); - private Map<ThingType, Iterable<String>> missingParameterListMap = new HashMap<>(); - - private Set<ASTNode> unusedElements = new HashSet<>(); - private Set<Item> groupedItems = new HashSet<>(); - - private List<Item> itemOrder = new LinkedList<>(); - - private Root root; - - private static boolean checkUnusedElements = true; - private static Root initialRoot = null; - - private class ItemPrototype extends DefaultItem { - - } - - /** - * Changes the behavior of the parser to check for unused elements. (Default: true) - * @param checkUnusedElements <code>true</code> to check for unused elements, <code>false</code> to skip the check - */ - public static void setCheckUnusedElements(boolean checkUnusedElements) { - EraserParserHelper.checkUnusedElements = checkUnusedElements; - } - - public static void setInitialRoot(Root root) { - EraserParserHelper.initialRoot = root; - } - - /** - * Post processing step after parsing a model, to resolve all references within the model. - */ - public void resolveReferences() { - if (this.root == null) { - // when parsing expressions - this.root = EraserParserHelper.initialRoot != null ? EraserParserHelper.initialRoot : createRoot(); - } - - if (checkUnusedElements) { - fillUnused(); - } - resolve(thingTypeMap, missingThingTypeMap, Thing::setType); - resolve(channelTypeMap, missingChannelTypeMap, Channel::setType); - if (itemMap == null || itemMap.isEmpty()) { - missingItemForDesignator.forEach((designator, itemName) -> - JavaUtils.ifPresentOrElse(root.getSmartHomeEntityModel().resolveItem(itemName), - designator::setItem, - () -> logger.warn("Could not resolve item {} for {}", itemName, designator))); - } else { - resolve(itemMap, missingItemForDesignator, Designator::setItem); - } -// missingTopicMap.forEach((item, s) -> item.setMqttTopic(s)); - this.root.getMqttRoot().ensureCorrectPrefixes(); - - resolveList(channelMap, missingChannelListMap, Thing::addChannel); - resolveList(itemMap, missingItemLinkListMap, Channel::addLinkedItem); - resolveList(groupMap, missingSubGroupListMap, Group::addGroup); - resolveList(itemMap, missingItemListMap, this::addItemToGroup); - resolveList(channelTypeMap, missingChannelTypeListMap, ThingType::addChannelType); - resolveList(parameterMap, missingParameterListMap, ThingType::addParameter); - - - createUnknownGroupIfNecessary(); -// createChannelCategories(); - createItemCategories(); - - if (checkUnusedElements) { - checkUnusedElements(); - } - - this.root.treeResolveAll(); - this.root.doFullTraversal(); - } - - private void addItemToGroup(Group group, Item item) { - groupedItems.add(item); - group.addItem(item); - } - - private void fillUnused() { - unusedElements.addAll(thingTypeMap.values()); - unusedElements.addAll(channelTypeMap.values()); - unusedElements.addAll(channelMap.values()); - unusedElements.addAll(parameterMap.values()); -// unusedElements.addAll(topicMap.values()); - } - - /** - * Create a group for all items not contained in a group. And warn if this was necessary. - */ - private void createUnknownGroupIfNecessary() { - if (itemMap.size() > groupedItems.size()) { - Set<Item> danglingItems = new HashSet<>(itemMap.values()); - danglingItems.removeAll(groupedItems); - // probably terrible performance - List<Item> sortedDanglingItems = new LinkedList<>(); - for (Item item : itemOrder) { - if (danglingItems.contains(item)) { - sortedDanglingItems.add(item); - } - } - ParserUtils.addToUnknownGroup(this.root.getSmartHomeEntityModel(), sortedDanglingItems); - } - } - - private void createItemCategories() { - Map<String, ItemCategory> newCategories = new HashMap<>(); - missingItemCategoryMap.forEach((item, category) -> - item.setCategory(newCategories.computeIfAbsent(category, ItemCategory::new))); - newCategories.values().forEach(node -> root.getSmartHomeEntityModel().addItemCategory(node)); - } - - private void checkUnusedElements() { - unusedElements.forEach(elem -> logger.info("{} '{}' defined, but not referenced.", elem.getClass().getSimpleName(), ident(elem))); - } - - private String ident(ASTNode elem) { - if (elem instanceof ModelElement) { - return ((ModelElement) elem).getID(); - } else if (elem instanceof MqttTopic) { - return ((MqttTopic) elem).getTopicString(); - } else if (elem instanceof DefaultChannelCategory) { - return ((DefaultChannelCategory) elem).getValue().name(); - } else if (elem instanceof SimpleChannelCategory) { - return ((SimpleChannelCategory) elem).getValue(); - } - return elem.toString(); - } - - private <Src extends ASTNode, Target extends ASTNode> void resolveList( - Map<String, Target> resolved, Map<Src, Iterable<String>> missing, BiConsumer<Src, Target> adder) { - missing.forEach( - (elem, keyList) -> keyList.forEach( - key -> resolve0(resolved, key, elem, adder))); - missing.clear(); - } - - private <Src extends ASTNode, Target extends ASTNode> void resolve( - Map<String, Target> resolved, Map<Src, String> missing, BiConsumer<Src, Target> setter) { - missing.forEach( - (elem, key) -> resolve0(resolved, key, elem, setter)); - missing.clear(); - } - - private <Src extends ASTNode, Target extends ASTNode> void resolve0( - Map<String, Target> resolved, String key, Src elem, BiConsumer<Src, Target> action) { - Target value = resolved.get(key); - if (value == null) { - logger.warn("Reference in {} {} for '{}' cannot be resolved", - elem.getClass().getSimpleName(), ident(elem), key); - return; - } - if (checkUnusedElements) { - unusedElements.remove(value); - } - action.accept(elem, value); - } - - //--- Thing and ThingType --- - - public Thing addThingType(Thing t, String typeName) { - missingThingTypeMap.put(t, typeName); - return t; - } - - public ThingType setChannelTypes(ThingType tt, StringList channelTypeNames) { - missingChannelTypeListMap.put(tt, channelTypeNames); - return tt; - } - - public ThingType setParameters(ThingType tt, StringList parameterNames) { - missingParameterListMap.put(tt, parameterNames); - return tt; - } - - public Thing setChannels(Thing t, StringList channelNames) { - missingChannelListMap.put(t, channelNames); - return t; - } - - public Thing setID(Thing thing, String id) { - thing.setID(id); - return thing; - } - - public FrequencySetting setID(FrequencySetting FrequencySetting, String id) { - FrequencySetting.setID(id); - FrequencySettingMap.put(id,FrequencySetting); - return FrequencySetting; - } - - public ThingType setID(ThingType thingType, String id) { - thingType.setID(id); - thingTypeMap.put(id, thingType); - return thingType; - } - - //--- Channel and ChannelType --- - - public ChannelType setItemType(ChannelType ct, String itemTypeName) { - ct.setItemType(ItemType.valueOf(itemTypeName)); - return ct; - } - - public Channel setChannelType(Channel c, String channelTypeName) { - missingChannelTypeMap.put(c, channelTypeName); - return c; - } - - - public Channel setLinks(Channel c, StringList linkNames) { - missingItemLinkListMap.put(c, linkNames); - return c; - } - - public ChannelType setID(ChannelType channelType, String id) { - channelType.setID(id); - channelTypeMap.put(id, channelType); - return channelType; - } - - public Channel setID(Channel channel, String id) { - channel.setID(id); - channelMap.put(id, channel); - return channel; - } - - //--- Item --- - - public Item createItem() { - ItemPrototype result = new ItemPrototype(); - result.disableSendState(); - return result; - } - - public Item setCategory(Item item, String categoryName) { - missingItemCategoryMap.put(item, categoryName); - return item; - } - - public Item retype(Item itemWithCorrectType, Item prototype) { - itemWithCorrectType.setID(prototype.getID()); - itemWithCorrectType.setLabel(prototype.getLabel()); - itemWithCorrectType.setMetaData(prototype.getMetaData()); - itemWithCorrectType.setFrequencySetting(prototype.getFrequencySetting()); - if (!(itemWithCorrectType instanceof ActivityItem)) { - String state = prototype.getStateAsString(); - itemWithCorrectType.disableSendState(); - if (state.isEmpty()) { - itemWithCorrectType.setStateToDefault(); - } else { - itemWithCorrectType.setStateFromString(state); - } - itemWithCorrectType.enableSendState(); - } - - moveMissingForRetype(itemWithCorrectType, prototype, missingTopicMap); - moveMissingForRetype(itemWithCorrectType, prototype, missingItemCategoryMap); - - itemMap.put(prototype.getID(), itemWithCorrectType); - - itemOrder.add(itemWithCorrectType); - - return itemWithCorrectType; - } - - private <T> void moveMissingForRetype(Item itemWithCorrectType, Item prototype, Map<Item, T> missingXMap) { - T value = missingXMap.get(prototype); - if (value != null) { - missingXMap.put(itemWithCorrectType, value); - } - missingXMap.remove(prototype); - } - - public Item setID(Item item, String id) { - item.setID(id); - itemMap.put(id, item); - return item; - } - - //--- Group --- - - public Group setSubGroups(Group g, StringList subGroupNames) { - missingSubGroupListMap.put(g, subGroupNames); - return g; - } - - public Group setItems(Group g, StringList itemNames) { - missingItemListMap.put(g, itemNames); - return g; - } - - public Group setSimpleAggregationFunction(Group g, String aggFunctionName) { - SimpleGroupAggregationFunctionName name = SimpleGroupAggregationFunctionName.valueOf(aggFunctionName.toUpperCase()); - g.setAggregationFunction(new SimpleGroupAggregationFunction(name)); - return g; - } - - public Group setParameterizedAggregationFunction(Group g, String aggFunctionName, StringList params) { - ParameterizedGroupAggregationFunctionName name = ParameterizedGroupAggregationFunctionName.valueOf( - aggFunctionName.toUpperCase()); - List<String> paramList = new ArrayList<>(); - params.iterator().forEachRemaining(paramList::add); - String param1, param2; - if (paramList.size() == 2) { - param1 = paramList.get(0); - param2 = paramList.get(1); - } else { - logger.error("Got {} instead of 2 parameters in group function {}!", paramList.size(), aggFunctionName); - param1 = "?"; - param2 = "?"; - } - g.setAggregationFunction(new ParameterizedGroupAggregationFunction(name, param1, param2)); - return g; - } - - public Group setID(Group group, String id) { - group.setID(id); - groupMap.put(id, group); - return group; - } - - //--- Parameter --- - - public Parameter setParameterValueType(Parameter p, String pvt) { - p.setType(ParameterValueType.valueOf(JavaUtils.toTitleCase(pvt))); - return p; - } - - public Parameter setDefault(Parameter p, String defaultValue) { - p.setDefaultValue(new ParameterDefaultValue(defaultValue)); - return p; - } - - public Parameter setID(Parameter parameter, String id) { - parameter.setID(id); - parameterMap.put(id, parameter); - return parameter; - } - - //--- MQTT --- - - public Item setTopic(Item item, String mqttTopicName) { - missingTopicMap.put(item, mqttTopicName); - return item; - } - - //--- Activity --- - - public MachineLearningRoot setActivities(MachineLearningRoot mlr, IntegerKeyMap map) { - for (AbstractMap.SimpleEntry<Integer, String> entry : map) { - Activity activity = new Activity(); - activity.setIdentifier(entry.getKey()); - activity.setLabel(entry.getValue()); - mlr.addActivity(activity); - } - return mlr; - } - - //--- Root --- - - public Root createRoot() { - this.root = Root.createEmptyRoot(); - return this.root; - } - - public Root createRoot(Thing t) { - Root result = createRoot(); - result.getSmartHomeEntityModel().addThing(t); - return result; - } - - public Root createRoot(Group g) { - Root result = createRoot(); - result.getSmartHomeEntityModel().addGroup(g); - return result; - } - - public Root createRoot(ThingType tt) { - Root result = createRoot(); - result.getSmartHomeEntityModel().addThingType(tt); - return result; - } - - public Root createRoot(ChannelType ct) { - Root result = createRoot(); - result.getSmartHomeEntityModel().addChannelType(ct); - return result; - } - - public Root createRoot(MqttRoot mr) { - Root result = createRoot(); - result.setMqttRoot(mr); - return result; - } - - public Root createRoot(InfluxRoot ir) { - Root result = createRoot(); - result.setInfluxRoot(ir); - return result; - } - - public Root createRoot(MachineLearningRoot ml) { - Root result = createRoot(); - result.setMachineLearningRoot(ml); - return result; - } - - public Root createRoot(Rule rule) { - Root result = createRoot(); - result.addRule(rule); - return result; - } - - public Root createRoot(FrequencySetting frequencySetting) { - Root result = createRoot(); - result.getSmartHomeEntityModel().addFrequencySetting(frequencySetting); - return result; - } - - //+++ newStuff (to be categorized) +++ - public Designator createDesignator(String itemName) { - Designator result = new Designator(); - missingItemForDesignator.put(result, itemName); - return result; - } - - public Rule createRule(Condition c, Action a) { - Rule result = new Rule(); - result.addCondition(c); - result.addAction(a); - return result; - } - -} diff --git a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/serializer/ASTNodeSerializer.java b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/serializer/ASTNodeSerializer.java index b8f56fac1bc68ef6609dff70e181d26d21924081..fd61ef39318c058389e9e412fcd36180ef912281 100644 --- a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/serializer/ASTNodeSerializer.java +++ b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/serializer/ASTNodeSerializer.java @@ -37,16 +37,6 @@ public class ASTNodeSerializer extends StdSerializer<ASTNode> { jgen.writeStringField("v", ((ModelElement) elem).getID()); jgen.writeEndObject(); // end ID {} })); - serializers.put(MqttTopic.class, ((jgen, elem) -> - { - jgen.writeStartObject(); - jgen.writeFieldName("IncomingTopic"); - jgen.writeStartObject(); - jgen.writeStringField("k", "t"); - jgen.writeStringField("t", String.class.getName()); - jgen.writeStringField("v", ((MqttTopic) elem).getIncomingTopic()); - jgen.writeEndObject(); // end IncomingTopic {} - })); } Map<Class<?>, SerializeId> serializers = new HashMap<>(); @@ -98,8 +88,6 @@ public class ASTNodeSerializer extends StdSerializer<ASTNode> { SerializeId specificSerializer; if (elem instanceof ModelElement) { specificSerializer = serializers.get(ModelElement.class); - } else if (elem instanceof MqttTopic) { - specificSerializer = serializers.get(MqttTopic.class); } else if (elem == null) { throw new JsonGenerationException("Intrinsic reference to null in " + value.getClass().getSimpleName() + "." + m.getName(), jgen); diff --git a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/util/MqttReceiver.java b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/util/MqttReceiver.java deleted file mode 100644 index 161cbaf16025d56ad5a4f9189ecde65203108051..0000000000000000000000000000000000000000 --- a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/util/MqttReceiver.java +++ /dev/null @@ -1,211 +0,0 @@ -package de.tudresden.inf.st.eraser.util; - -import de.tudresden.inf.st.eraser.jastadd.model.ExternalHost; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.fusesource.hawtbuf.Buffer; -import org.fusesource.hawtbuf.UTF8Buffer; -import org.fusesource.mqtt.client.*; - -import java.io.IOException; -import java.net.URI; -import java.util.Arrays; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; - -/** - * Subscribe to topics, receive and store messages. - * - * @author rschoene - Initial contribution - */ -public class MqttReceiver implements AutoCloseable { - - private final Logger logger; - - /** The host running the MQTT broker. */ - private URI host; - private String username; - private String password; - /** The connection to the MQTT broker. */ - private CallbackConnection connection; - /** Whether we are subscribed to the topics yet */ - private Condition readyCondition; - private Lock readyLock; - private boolean ready; - private BiConsumer<String, String> onMessageCallback; - private String[] topics; - private QoS qos; - - public MqttReceiver() { - this.logger = LogManager.getLogger(MqttReceiver.class); - this.readyLock = new ReentrantLock(); - this.readyCondition = readyLock.newCondition(); - this.ready = false; - qos = QoS.AT_LEAST_ONCE; - } - - /** - * Sets the host to receive messages from - */ - public void setHost(ExternalHost externalHost) { - this.host = URI.create("tcp://" + externalHost.getHostName() + ":" + externalHost.getPort()); - logger.debug("Host is {}", externalHost.getHostName()); - } - - - public void setOnMessage(BiConsumer<String, String> callback) { - this.onMessageCallback = callback; - } - - public void setTopicsForSubscription(String... topics) { - this.topics = topics; - } - - public void setQoSForSubscription(QoS qos) { - this.qos = qos; - } - - /** - * Waits until this updater is ready to receive MQTT messages. - * If it already is ready, return immediately with the value <code>true</code>. - * Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan, - * if it got ready, or <code>false</code> upon a timeout. - * @param time the maximum time to wait - * @param unit the time unit of the time argument - * @return whether this updater is ready - */ - public boolean waitUntilReady(long time, TimeUnit unit) { - try { - readyLock.lock(); - if (ready) { - return true; - } - return readyCondition.await(time, unit); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - readyLock.unlock(); - } - return false; - } - - /** - * Starts the updating process, i.e., subscribe to the required topics, and updates the model. - * @throws IOException if could not connect, or could not subscribe to a topic - */ - public void start() throws IOException { - if (ready) { - return; - } - Objects.requireNonNull(this.host, "Host need to be set!"); - MQTT mqtt = new MQTT(); - mqtt.setHost(this.host); - mqtt.setPassword(this.password); - mqtt.setUserName(this.username); - connection = mqtt.callbackConnection(); - AtomicReference<Throwable> error = new AtomicReference<>(); - connection.listener(new ExtendedListener() { - public void onConnected() { - logger.debug("Connected"); - } - - @Override - public void onDisconnected() { - logger.debug("Disconnected"); - } - - @Override - public void onPublish(UTF8Buffer topic, Buffer body, Callback<Callback<Void>> ack) { - String topicString = topic.toString(); - String message = body.ascii().toString(); -// logger.debug("{}: {}", topicString, message); - onMessageCallback.accept(topicString, message); - ack.onSuccess(null); // always acknowledge message - } - - @Override - public void onPublish(UTF8Buffer topicBuffer, Buffer body, Runnable ack) { - logger.warn("onPublish should not be called"); - } - - @Override - public void onFailure(Throwable cause) { -// logger.catching(cause); - error.set(cause); - } - }); - throwIf(error); - connection.connect(new Callback<Void>() { - @Override - public void onSuccess(Void value) { - connection.publish("components", "Eraser is listening".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() { - @Override - public void onSuccess(Void value) { - logger.debug("success sending welcome message"); - } - - @Override - public void onFailure(Throwable value) { - logger.debug("failure sending welcome message", value); - } - }); - Topic[] topicArray = Arrays.stream(topics).map(topicName -> new Topic(topicName, qos)).toArray(Topic[]::new); - logger.info("Connected, subscribing to {} topic(s) now.", topicArray.length); - connection.subscribe(topicArray, new Callback<byte[]>() { - @Override - public void onSuccess(byte[] qoses) { - logger.debug("Subscribed, qoses: {}", qoses); - try { - readyLock.lock(); - ready = true; - readyCondition.signalAll(); - } finally { - readyLock.unlock(); - } - } - - @Override - public void onFailure(Throwable cause) { - logger.error("Could not subscribe", cause); - } - }); - } - - @Override - public void onFailure(Throwable cause) { -// logger.error("Could not connect", cause); - error.set(cause); - } - }); - throwIf(error); - } - - private void throwIf(AtomicReference<Throwable> error) throws IOException { - if (error.get() != null) { - throw new IOException(error.get()); - } - } - - public void close() { - if (connection == null) { - logger.warn("Stopping without connection. Was start() called?"); - return; - } - connection.disconnect(new Callback<Void>() { - @Override - public void onSuccess(Void value) { - logger.info("Disconnected from {}", host); - } - - @Override - public void onFailure(Throwable ignored) { - // Disconnects never fail. - } - }); - } -} diff --git a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/util/TestUtils.java b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/util/TestUtils.java index 1e215384019efafe9139a8139d5034e9cc10f864..4873f2a35673881296f71b46d08f1d76f03d8d1f 100644 --- a/eraser-base/src/main/java/de/tudresden/inf/st/eraser/util/TestUtils.java +++ b/eraser-base/src/main/java/de/tudresden/inf/st/eraser/util/TestUtils.java @@ -3,6 +3,7 @@ package de.tudresden.inf.st.eraser.util; import de.tudresden.inf.st.eraser.jastadd.model.*; import java.util.ArrayList; +import java.util.concurrent.TimeUnit; /** * Helper class to create models used in tests. @@ -72,4 +73,18 @@ public class TestUtils { return model.getGroup(0); } + public static String getMqttHost() { + if (System.getenv("GITLAB_CI") != null) { + // we are in the CI, so use "mqtt" as host + return "mqtt"; + } { + // else assume a locally running mqtt broker + return "localhost"; + } + } + + public static void waitForMqtt() throws InterruptedException { + TimeUnit.MILLISECONDS.sleep(1500); + } + } diff --git a/eraser-base/src/test/java/de/tudresden/inf/st/eraser/MqttTests.java b/eraser-base/src/test/java/de/tudresden/inf/st/eraser/MqttTests.java index 7bfe873c38cfa72d907ab98d0eb28f2ba4edc195..b41bf65cded47beabf4d5e22fef2bd4aaa568311 100644 --- a/eraser-base/src/test/java/de/tudresden/inf/st/eraser/MqttTests.java +++ b/eraser-base/src/test/java/de/tudresden/inf/st/eraser/MqttTests.java @@ -1,25 +1,19 @@ package de.tudresden.inf.st.eraser; import de.tudresden.inf.st.eraser.jastadd.model.*; -import de.tudresden.inf.st.eraser.util.MqttReceiver; import de.tudresden.inf.st.eraser.util.TestUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import de.tudresden.inf.st.eraser.util.TestUtils.ModelAndItem; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; +import static de.tudresden.inf.st.eraser.util.TestUtils.getMqttHost; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assumptions.assumeTrue; /** * Test for everything related to MQTT. @@ -29,16 +23,6 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue; @Tag("mqtt") public class MqttTests { - public static String getMqttHost() { - if (System.getenv("GITLAB_CI") != null) { - // we are in the CI, so use "mqtt" as host - return "mqtt"; - } { - // else assume a locally running mqtt broker - return "localhost"; - } - } - private static final String outgoingPrefix = "out"; private static final String firstPart = "a"; private static final String alternativeFirstPart = "x"; @@ -47,185 +31,93 @@ public class MqttTests { private static final double secondState = 2.0; private static final double thirdState = 3.0; - private final List<String> messages = new ArrayList<>(); - private static final Logger logger = LogManager.getLogger(MqttTests.class); - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void resolve1(boolean useStub) { - ModelItemAndTwoTopics modelAB = createAB(useStub); - MqttRoot sut = modelAB.model.getRoot().getMqttRoot(); - - // incoming mqtt topic might be "inc/a/" or "inc/a/b" - - assertTrue(sut.resolveTopic("inc/a").isPresent()); - assertEquals(modelAB.firstTopic, sut.resolveTopic("inc/a").get(), "Could not resolve a."); - assertTrue(sut.resolveTopic("inc/a/b").isPresent()); - assertEquals(modelAB.secondTopic, sut.resolveTopic("inc/a/b").get(), "Could not resolve a/b."); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void brokerConnected(boolean useStub) throws Exception { - ModelItemAndTwoTopics modelAB = createAB(useStub); - MqttRoot sut = modelAB.model.getRoot().getMqttRoot(); -// MqttRoot mqttRoot = new MqttRoot(); -// mqttRoot.setHostByName("localhost"); -// MQTTSender sender = new MQTTSenderImpl().setHost(mqttRoot.getHost()); - MQTTSender sender = sut.getMqttSender(); - assumeTrue(sender.isConnected(), "Broker is not connected"); -// assertTrue(sender.isConnected()); - sender.publish("test", "me"); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void itemUpdateSend1(boolean useStub) throws Exception { + @Test + public void itemUpdateSend1() throws Exception { + // Given model with single item String expectedTopic = outgoingPrefix + "/" + firstPart + "/" + secondPart; - ModelItemAndTwoTopics modelAB = createAB(useStub); - assertSenderConnected(modelAB); + MqttHandler handler = new MqttHandler().dontSendWelcomeMessage().setHost(getMqttHost()); + assertTrue(handler.waitUntilReady(2, TimeUnit.SECONDS), "Could not connect to MQTT broker"); + List<String> messages = new ArrayList<>(); + handler.newConnection(expectedTopic, bytes -> messages.add(new String(bytes))); + ModelAndItem modelAB = createModelAndSetupMqttRoot(); NumberItem sut = modelAB.item; - sut.setTopic(modelAB.secondTopic); - - createMqttReceiver(useStub, expectedTopic); + sut.setTopicString(firstPart + "/" + secondPart); + assertTrue(modelAB.model.connectAllItems(), "Could not connect items"); + TestUtils.waitForMqtt(); + // When setting first state sut.setState(firstState); - assertTimeoutEquals(2, 1, messages::size); + // Then a message with first state should be sent + TestUtils.waitForMqtt(); + assertEquals(1, messages.size()); assertEquals(Double.toString(firstState), messages.get(0)); + // When setting second state sut.setState(secondState); - assertTimeoutEquals(2, 2, messages::size); + // Then a message with second state should be sent + TestUtils.waitForMqtt(); + assertEquals(2, messages.size()); assertEquals(Double.toString(secondState), messages.get(1)); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void itemUpdateSend2(boolean useStub) throws Exception { + @Test + public void itemUpdateSend2() throws Exception { + // Given model with two items, each with a different topic String expectedTopic1 = outgoingPrefix + "/" + firstPart + "/" + secondPart; String expectedTopic2 = outgoingPrefix + "/" + alternativeFirstPart + "/" + secondPart; - ModelItemAndTwoTopics modelAB = createAB(useStub); - assertSenderConnected(modelAB); + MqttHandler handler = new MqttHandler().dontSendWelcomeMessage().setHost(getMqttHost()); + assertTrue(handler.waitUntilReady(2, TimeUnit.SECONDS), "Could not connect to MQTT broker"); + List<String> messagesTopic1 = new ArrayList<>(); + List<String> messagesTopic2 = new ArrayList<>(); + handler.newConnection(expectedTopic1, bytes -> messagesTopic1.add(new String(bytes))); + handler.newConnection(expectedTopic2, bytes -> messagesTopic2.add(new String(bytes))); + ModelAndItem modelAB = createModelAndSetupMqttRoot(); NumberItem item1 = modelAB.item; - item1.setTopic(modelAB.secondTopic); - - MqttTopic alternativeB = createAndAddMqttTopic(modelAB.model.getRoot().getMqttRoot(),alternativeFirstPart + "/" + secondPart); - NumberItem item2 = TestUtils.addItemTo(modelAB.model, 0, true); - item2.setTopic(alternativeB); - - createMqttReceiver(useStub, expectedTopic1, expectedTopic2); + item1.setTopicString(firstPart + "/" + secondPart); + item2.setTopicString(alternativeFirstPart + "/" + secondPart); + assertTrue(modelAB.model.connectAllItems(), "Could not connect items"); + TestUtils.waitForMqtt(); + // When setting first state for both items item1.setState(firstState); - item2.setState(firstState); - assertTimeoutEquals(2, 2, messages::size); - assertEquals(Double.toString(firstState), messages.get(0)); - assertEquals(Double.toString(firstState), messages.get(1)); + // Then a message with first state should be sent to each topic + TestUtils.waitForMqtt(); + assertEquals(1, messagesTopic1.size()); + assertEquals(1, messagesTopic2.size()); + assertEquals(Double.toString(firstState), messagesTopic1.get(0)); + assertEquals(Double.toString(firstState), messagesTopic2.get(0)); + // When setting second state for first item, and third state for second item item1.setState(secondState); item2.setState(thirdState); - assertTimeoutEquals(3, 4, messages::size); - // TODO actually this does not test, whether the topic was correct for each state - assertThat(messages).contains(Double.toString(secondState)); - assertThat(messages).contains(Double.toString(thirdState)); - } - - private void assertSenderConnected(ModelItemAndTwoTopics modelAB) { - MqttRoot mqttRoot = modelAB.model.getRoot().getMqttRoot(); - if (!mqttRoot.getMqttSender().isConnected()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - logger.catching(e); - } - } - assertTrue(mqttRoot.getMqttSender().isConnected(), "Broker is not connected"); - } - - private void createMqttReceiver(boolean useStub, String... expectedTopics) throws IOException { - if (useStub) { - // do not need receiver, as messages are directly written by MqttSenderStub and callback - messages.clear(); - return; - } - MqttReceiver receiver = new MqttReceiver(); - List<String> expectedTopicList = Arrays.asList(expectedTopics); - receiver.setHost(ExternalHost.of(getMqttHost(),1883)); - receiver.setTopicsForSubscription(expectedTopics); - receiver.setOnMessage((topic, message) -> { - assertThat(expectedTopicList).contains(topic); - messages.add(message); - }); - receiver.start(); - receiver.waitUntilReady(2, TimeUnit.SECONDS); + // Then a message with respective state should be sent to each topic + TestUtils.waitForMqtt(); + assertEquals(2, messagesTopic1.size()); + assertEquals(2, messagesTopic2.size()); + assertThat(messagesTopic1).contains(Double.toString(secondState)); + assertThat(messagesTopic2).contains(Double.toString(thirdState)); } - private ModelItemAndTwoTopics createAB(boolean useStub) { - TestUtils.ModelAndItem mai = TestUtils.createModelAndItem(0, true); + private ModelAndItem createModelAndSetupMqttRoot() { + ModelAndItem mai = TestUtils.createModelAndItem(0, true); SmartHomeEntityModel model = mai.model; MqttRoot mqttRoot = new MqttRoot(); mqttRoot.setIncomingPrefix("inc"); mqttRoot.setOutgoingPrefix(outgoingPrefix); - if (useStub) { - - // now a SenderStub is being used - ((MQTTSenderStub) mqttRoot.getMqttSender()).setCallback(((topic, message, qos) -> messages.add(message))); - } else { - mqttRoot.getHost().setHostName(getMqttHost()).setPort(1883); - } - MqttTopic a = createAndAddMqttTopic(mqttRoot, firstPart); - MqttTopic ab = createAndAddMqttTopic(mqttRoot, firstPart + "/" + secondPart); + mqttRoot.getHost().setHostName(getMqttHost()).setPort(1883); mqttRoot.ensureCorrectPrefixes(); model.getRoot().setMqttRoot(mqttRoot); - return ModelItemAndTwoTopics.of(model, mai.item, a, ab); - } - - static class ModelItemAndTwoTopics { - SmartHomeEntityModel model; - NumberItem item; - MqttTopic firstTopic; - MqttTopic secondTopic; - static ModelItemAndTwoTopics of(SmartHomeEntityModel model, NumberItem item, MqttTopic firstTopic, MqttTopic secondTopic) { - ModelItemAndTwoTopics result = new ModelItemAndTwoTopics(); - result.model = model; - result.item = item; - result.firstTopic = firstTopic; - result.secondTopic = secondTopic; - return result; - } - } - - private MqttTopic createAndAddMqttTopic(MqttRoot mqttRoot, String suffix) { - MqttTopic result = new MqttTopic(); - result.setTopicString(suffix); - mqttRoot.addTopic(result); - return result; - } - - private <T> void assertTimeoutEquals(long seconds, T expected, Supplier<T> actualProvider) throws InterruptedException { - if (expected == actualProvider.get()) { - // already matched right now. return immediately. - return; - } - long targetEndTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(seconds); - while (System.nanoTime() < targetEndTime) { - // this is indeed busy waiting in favour of new dependencies handling it - //noinspection BusyWait - Thread.sleep(100); - if (expected == actualProvider.get()) { - break; - } - } - // final assessment, throw exception if not matched. Or pass, if previously matched. - assertEquals(expected, actualProvider.get()); + return mai; } } diff --git a/eraser.spark/src/main/java/de/tudresden/inf/st/eraser/spark/Application.java b/eraser.spark/src/main/java/de/tudresden/inf/st/eraser/spark/Application.java index 1e793fb89c91b7b5b496fa027825ae2a634d7602..f6ea69770cd0afbeeb3df4e8968d1c4f9f2c63a0 100644 --- a/eraser.spark/src/main/java/de/tudresden/inf/st/eraser/spark/Application.java +++ b/eraser.spark/src/main/java/de/tudresden/inf/st/eraser/spark/Application.java @@ -240,7 +240,7 @@ public class Application { private SimpleItem wrapItem(Item item) { return SimpleItem.of(item.getID(), item.getLabel(), - item.getTopic() != null ? item.getTopic().getTopicString() : null, + item.getTopicString(), item.isSendState(), wrapMetaData(item.getMetaData())); } diff --git a/eraser.starter/src/main/java/de/tudresden/inf/st/eraser/starter/EraserStarter.java b/eraser.starter/src/main/java/de/tudresden/inf/st/eraser/starter/EraserStarter.java index 83815a11615b3b2c1c8f851772c9a84208e4f316..cf5224c997d86c0949bc92b16e51c6366a65a66c 100644 --- a/eraser.starter/src/main/java/de/tudresden/inf/st/eraser/starter/EraserStarter.java +++ b/eraser.starter/src/main/java/de/tudresden/inf/st/eraser/starter/EraserStarter.java @@ -12,7 +12,6 @@ import de.tudresden.inf.st.eraser.feedbackloop.execute.ExecuteImpl; import de.tudresden.inf.st.eraser.feedbackloop.plan.PlanImpl; import de.tudresden.inf.st.eraser.jastadd.model.*; import de.tudresden.inf.st.eraser.openhab2.OpenHab2Importer; -import de.tudresden.inf.st.eraser.openhab2.mqtt.MQTTUpdater; import de.tudresden.inf.st.eraser.spark.Application; import de.tudresden.inf.st.eraser.util.ParserUtils; import net.sourceforge.argparse4j.ArgumentParsers; @@ -160,22 +159,11 @@ public class EraserStarter { } if (settings.mqttUpdate) { - logger.info("Starting MQTT updater"); - Thread t = new Thread(() -> { - try (MQTTUpdater updater = new MQTTUpdater(root)) { - updater.start(); - updater.waitUntilReady(5, TimeUnit.SECONDS); - lock.lock(); - quitCondition.await(); - } catch (IOException | InterruptedException e) { - logger.catching(e); - } finally { - lock.unlock(); - } - logger.info("MQTT update stopped"); - }, "MQTT-Updater"); - t.setDaemon(true); - t.start(); + try { + root.getSmartHomeEntityModel().connectAllItems(); + } catch (IOException e) { + logger.catching(e); + } } if (startRest) { @@ -217,6 +205,7 @@ public class EraserStarter { if (analyze != null) { analyze.stop(); } + root.ragconnectCloseConnections(); activityFactory.shutdown(); preferenceFactory.shutdown(); InfluxAdapter influxAdapter = root.getInfluxRoot().influxAdapter(); diff --git a/feedbackloop.learner_backup/src/main/java/de/tudresden/inf/st/eraser/feedbackloop.learner_backup/MachineLearningImpl.java b/feedbackloop.learner_backup/src/main/java/de/tudresden/inf/st/eraser/feedbackloop.learner_backup/MachineLearningImpl.java index b1f0062b0065e7fabb8a175c49c7d76ba1e4fcb0..9530d07923ee00b09cca5f0d45c253b263d446d3 100644 --- a/feedbackloop.learner_backup/src/main/java/de/tudresden/inf/st/eraser/feedbackloop.learner_backup/MachineLearningImpl.java +++ b/feedbackloop.learner_backup/src/main/java/de/tudresden/inf/st/eraser/feedbackloop.learner_backup/MachineLearningImpl.java @@ -97,7 +97,7 @@ public class MachineLearningImpl implements MachineLearningDecoder, MachineLearn } int i = 0; for (Item item1 : relevant_item_list) { - if (item.getTopic().toString().equals(item1.getTopic().toString())) { + if (item.getTopicString().equals(item1.getTopicString())) { this.a_new_data[i] = item.getStateAsString(); } i++; diff --git a/feedbackloop.learner_backup/src/test/java/de/tudresden/inf/st/eraser/feedbackloop/learner_backup/LearnerTestUtils.java b/feedbackloop.learner_backup/src/test/java/de/tudresden/inf/st/eraser/feedbackloop/learner_backup/LearnerTestUtils.java index 50d235f4b736ea0cb2d29f951004a977f61b449f..d127bf42d23984bfa33c00bbfcdf5d0c8ad5c08e 100644 --- a/feedbackloop.learner_backup/src/test/java/de/tudresden/inf/st/eraser/feedbackloop/learner_backup/LearnerTestUtils.java +++ b/feedbackloop.learner_backup/src/test/java/de/tudresden/inf/st/eraser/feedbackloop/learner_backup/LearnerTestUtils.java @@ -47,7 +47,7 @@ public class LearnerTestUtils { if (itemName.equals("activity")) return; Item item = createItem(itemName); group.addItem(item); - item.setTopic(result.getMqttRoot().getOrCreateMqttTopic(itemName)); + item.setTopicString(itemName); } ); // init activities diff --git a/feedbackloop.learner_backup/src/test/resources/log4j2-test.xml b/feedbackloop.learner_backup/src/test/resources/log4j2-test.xml index 8e963f0161f4fedbfc0ea28884cce0d22b42186a..b44aa172bf647c2e8f647fd245b29258b27c3b30 100644 --- a/feedbackloop.learner_backup/src/test/resources/log4j2-test.xml +++ b/feedbackloop.learner_backup/src/test/resources/log4j2-test.xml @@ -24,7 +24,5 @@ <!-- Stubs reduce noise--> <Logger name="de.tudresden.inf.st.eraser.jastadd.model.InfluxAdapterStub" level="WARN" additivity="false"> </Logger> - <Logger name="de.tudresden.inf.st.eraser.jastadd.model.MQTTSenderStub" level="WARN" additivity="false"> - </Logger> </Loggers> </Configuration> diff --git a/integration/src/main/java/de/tudresden/inf/st/eraser/integration/IntegrationMain.java b/integration/src/main/java/de/tudresden/inf/st/eraser/integration/IntegrationMain.java index 3d16a465d8bb2e67e47b4a7ae3fea0f3122791db..ac632518b93a0ca615095299d82d3164e311356a 100644 --- a/integration/src/main/java/de/tudresden/inf/st/eraser/integration/IntegrationMain.java +++ b/integration/src/main/java/de/tudresden/inf/st/eraser/integration/IntegrationMain.java @@ -1,21 +1,20 @@ package de.tudresden.inf.st.eraser.integration; -import com.opencsv.CSVParser; import com.opencsv.CSVParserBuilder; import com.opencsv.CSVReader; import com.opencsv.CSVReaderBuilder; import de.tudresden.inf.st.eraser.Main; import de.tudresden.inf.st.eraser.deserializer.ASTNodeDeserializer; import de.tudresden.inf.st.eraser.jastadd.model.*; -import de.tudresden.inf.st.eraser.openhab2.mqtt.MQTTUpdater; import de.tudresden.inf.st.eraser.serializer.JsonSerializer; -import org.fusesource.mqtt.client.QoS; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.fusesource.mqtt.client.QoS; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -56,14 +55,14 @@ public class IntegrationMain { .withQuoteChar('"') .build()) .withSkipLines(1) - .build(); - MQTTSender sender = mqttRoot.getMqttSender()){ - if (!sender.isConnected()) { - String msg = "MQTT sender is not connected, aborting to avoid waits at publish calls"; + .build()) { + MqttHandler sender = new MqttHandler().dontSendWelcomeMessage().setHost("localhost"); + if (!sender.waitUntilReady(2, TimeUnit.SECONDS)) { + String msg = "MQTT handler is not connected, aborting to avoid waits at publish calls"; logger.error(msg); throw new RuntimeException(msg); } else { - logger.debug("MQTT sender is connected"); + logger.debug("MQTT handler is connected"); } reader.iterator().forEachRemaining(line -> { System.out.println(Arrays.toString(line)); @@ -73,18 +72,17 @@ public class IntegrationMain { String message = line[3]; // TODO replay messages in real time, i.e., with delay between messages try { - sender.publish(topic, message, getQoSEnum(qos)); + sender.publish(topic, message.getBytes(StandardCharsets.UTF_8), getQoSEnum(qos), false); } catch (Exception e) { // abort the whole operation throw new RuntimeException(e); } }); - } catch(Exception e){ + } catch (Exception e) { e.printStackTrace(); } }); Thread eraser = new Thread(() -> { - final int seconds = 5; logger.info("Start!"); Root model = Main.importFromFile(); // Root model = importFromLocalFile(); @@ -92,8 +90,6 @@ public class IntegrationMain { MqttRoot mqttRoot = new MqttRoot(); mqttRoot.getHost().setHostName("localhost"); mqttRoot.setIncomingPrefix("oh2/out/"); - MqttTopic irisStateTopic = new MqttTopic(); - irisStateTopic.setTopicString("iris1_item/state"); Item iris = null; for (Item item : model.getSmartHomeEntityModel().items()) { if (item.getID().equals("iris1_item")) { @@ -105,21 +101,19 @@ public class IntegrationMain { logger.error("Could not find iris1. Exiting"); return; } - irisStateTopic.addItem(iris); + iris.setTopicString("iris1_item/state"); model.setMqttRoot(mqttRoot); // JsonSerializer.write(model, "src/main/resources/openhab2-data.json"); JsonSerializer.write(model, "openhab2-data.json"); - try (MQTTUpdater updater = new MQTTUpdater(model)) { - logger.info("Processing mqtt updates for {} seconds", seconds); - updater.start(); - boolean isReady = updater.waitUntilReady(2, TimeUnit.SECONDS); - if (!isReady) { - logger.warn("Updater seems not ready yet. Continue, but expect errors."); - } - modelLoaded.countDown(); - Thread.sleep(seconds * 1000); - } catch (IllegalArgumentException | InterruptedException | IOException e) { + boolean couldNotConnect; + try { + couldNotConnect = !model.getSmartHomeEntityModel().connectAllItems(); + } catch (IOException e) { logger.catching(e); + couldNotConnect = true; + } + if (couldNotConnect) { + logger.warn("Could not connect all items. Continue, but expect errors."); } }); mock.start();