diff --git a/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd b/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd index e8531033c790d20f50e7c43c174f4604ebd07a76..da358f6f9c591f5ef5ce6e392219ca30169a0984 100644 --- a/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd +++ b/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd @@ -60,6 +60,7 @@ aspect AttributesForMustache { syn String MEndpointDefinition.connectParameterName() = "uriString"; syn String MEndpointDefinition.connectMethod() = "connect" + entityName(); + syn String MEndpointDefinition.internalConnectMethod() = "_internal_" + connectMethod(); syn boolean MEndpointDefinition.isTypeEndpointDefinition() = endpointDef().isTypeEndpointDefinition(); syn String MEndpointDefinition.disconnectMethod() { diff --git a/ragconnect.base/src/main/resources/ListAspect.mustache b/ragconnect.base/src/main/resources/ListAspect.mustache index 44acef0f90fe32de15968e01a3e10e32e3db55a7..764b5441862c96a010f6d07f86e2c1aad2dec81c 100644 --- a/ragconnect.base/src/main/resources/ListAspect.mustache +++ b/ragconnect.base/src/main/resources/ListAspect.mustache @@ -6,7 +6,7 @@ public void {{JastAddList}}.serialize(com.fasterxml.jackson.core.JsonGenerator g child.serialize(g); } g.writeEndArray(); - } catch (IOException e) { + } catch (java.io.IOException e) { throw new SerializationException("unable to serialize {{JastAddList}}", e); } } diff --git a/ragconnect.base/src/main/resources/MqttHandler.jadd b/ragconnect.base/src/main/resources/MqttHandler.jadd index 4f651f7143089fce1b89843c1a503a552e47a051..a8f065e818c32874703d7f2775c589492fb6e8ee 100644 --- a/ragconnect.base/src/main/resources/MqttHandler.jadd +++ b/ragconnect.base/src/main/resources/MqttHandler.jadd @@ -1,8 +1,7 @@ -import java.io.IOException; -import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - -aspect MqttHandler { +import java.util.function.BiConsumer;aspect MqttHandler { public class MqttServerHandler { private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>(); private final java.util.Map<ConnectToken, java.util.function.BiConsumer<String, byte[]>> tokensForRemoval = new java.util.HashMap<>(); @@ -16,7 +15,7 @@ public class MqttServerHandler { public MqttServerHandler(String name) { this.name = name; - setupWaitUntilReady(1, TimeUnit.SECONDS); + setupWaitUntilReady(1, java.util.concurrent.TimeUnit.SECONDS); } public void setupWaitUntilReady(long time, java.util.concurrent.TimeUnit unit) { @@ -24,7 +23,7 @@ public class MqttServerHandler { this.unit = unit; } - public MqttHandler resolveHandler(java.net.URI uri) throws IOException { + public MqttHandler resolveHandler(java.net.URI uri) throws java.io.IOException { MqttHandler handler = handlers.get(uri.getHost()); if (handler == null) { // first connect to that server @@ -40,33 +39,37 @@ public class MqttServerHandler { return handler; } - public ConnectToken newConnection(java.net.URI uri, java.util.function.BiConsumer<String, byte[]> callback) throws IOException { + public ConnectToken newConnection(java.net.URI uri, java.util.function.BiConsumer<String, byte[]> callback) throws java.io.IOException { ConnectToken connectToken = new ConnectToken(uri); resolveHandler(uri).newConnection(extractTopic(uri), callback); tokensForRemoval.put(connectToken, callback); return connectToken; } - public boolean disconnect(ConnectToken connectToken) throws IOException { + public boolean disconnect(ConnectToken connectToken) throws java.io.IOException { MqttHandler handler = resolveHandler(connectToken.uri); return handler != null ? handler.disconnect(extractTopic(connectToken.uri), tokensForRemoval.get(connectToken)) : false; } - public void publish(java.net.URI uri, byte[] bytes) throws IOException { + public void publish(java.net.URI uri, byte[] bytes) throws java.io.IOException { resolveHandler(uri).publish(extractTopic(uri), bytes); } - public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws IOException { + public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws java.io.IOException { resolveHandler(uri).publish(extractTopic(uri), bytes, retain); } public void publish(java.net.URI uri, byte[] bytes, - org.fusesource.mqtt.client.QoS qos, boolean retain) throws IOException { + org.fusesource.mqtt.client.QoS qos, boolean retain) throws java.io.IOException { resolveHandler(uri).publish(extractTopic(uri), bytes, qos, retain); } public static String extractTopic(java.net.URI uri) { String path = uri.getPath(); + if (uri.getFragment() != null) { + // do not also append fragment, as it is illegal, that anything follows "#" in a mqtt topic anyway + path += "#"; + } if (path.charAt(0) == '/') { path = path.substring(1); } @@ -100,7 +103,8 @@ public class MqttHandler { private boolean sendWelcomeMessage = true; private org.fusesource.mqtt.client.QoS qos; /** Dispatch knowledge */ - private final java.util.Map<String, java.util.List<java.util.function.BiConsumer<String, byte[]>>> callbacks; + private final java.util.Map<String, java.util.List<java.util.function.BiConsumer<String, byte[]>>> normalCallbacks; + private final java.util.Map<java.util.regex.Pattern, java.util.List<java.util.function.BiConsumer<String, byte[]>>> wildcardCallbacks; public MqttHandler() { this("RagConnect"); @@ -109,7 +113,8 @@ public class MqttHandler { public MqttHandler(String name) { this.name = java.util.Objects.requireNonNull(name, "Name must be set"); this.logger = org.apache.logging.log4j.LogManager.getLogger(MqttHandler.class); - this.callbacks = new java.util.HashMap<>(); + this.normalCallbacks = new java.util.HashMap<>(); + this.wildcardCallbacks = new java.util.HashMap<>(); this.readyLatch = new java.util.concurrent.CountDownLatch(1); this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE; } @@ -122,21 +127,21 @@ public class MqttHandler { /** * Sets the host to receive messages from, and connects to it. * @param host name of the host to connect to, format is either <code>"$name"</code> or <code>"$name:$port"</code> - * @throws IOException if could not connect, or could not subscribe to a topic + * @throws java.io.IOException if could not connect, or could not subscribe to a topic * @return self */ public MqttHandler setHost(String host) throws java.io.IOException { if (host.contains(":")) { int colon_index = host.indexOf(":"); return setHost(host.substring(0, colon_index), - Integer.parseInt(host.substring(colon_index + 1))); + Integer.parseInt(host.substring(colon_index + 1))); } return setHost(host, DEFAULT_PORT); } /** * Sets the host to receive messages from, and connects to it. - * @throws IOException if could not connect, or could not subscribe to a topic + * @throws java.io.IOException if could not connect, or could not subscribe to a topic * @return self */ public MqttHandler setHost(String host, int port) throws java.io.IOException { @@ -167,8 +172,8 @@ public class MqttHandler { org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) { // this method is called, whenever a MQTT message is received String topicString = topic.toString(); - java.util.List<java.util.function.BiConsumer<String, byte[]>> callbackList = new java.util.ArrayList<>(callbacks.get(topicString)); - if (callbackList == null || callbackList.isEmpty()) { + java.util.List<java.util.function.BiConsumer<String, byte[]>> callbackList = callbacksFor(topicString); + if (callbackList.isEmpty()) { logger.debug("Got a message at {}, but no callback to call. Forgot to subscribe?", topic); } else { byte[] message = body.toByteArray(); @@ -199,20 +204,20 @@ public class MqttHandler { throwIf(error); // actually establish the connection - connection.connect(new org.fusesource.mqtt.client.Callback<Void>() { + connection.connect(new org.fusesource.mqtt.client.Callback<>() { @Override public void onSuccess(Void value) { if (MqttHandler.this.sendWelcomeMessage) { connection.publish("components", - (name + " is connected").getBytes(), - org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE, - false, - new org.fusesource.mqtt.client.Callback<Void>() { - @Override - public void onSuccess(Void value) { - logger.debug("success sending welcome message"); - setReady(); - } + (name + " is connected").getBytes(), + org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE, + false, + new org.fusesource.mqtt.client.Callback<>() { + @Override + public void onSuccess(Void value) { + logger.debug("success sending welcome message"); + setReady(); + } @Override public void onFailure(Throwable value) { @@ -233,6 +238,20 @@ public class MqttHandler { return this; } + private java.util.List<java.util.function.BiConsumer<String, byte[]>> callbacksFor(String topicString) { + java.util.List<java.util.function.BiConsumer<String, byte[]>> result = new java.util.ArrayList<>(); + List<BiConsumer<String, byte[]>> normalCallbackList = normalCallbacks.get(topicString); + if (normalCallbackList != null) { + result.addAll(normalCallbackList); + } + wildcardCallbacks.forEach((topicPattern, callback) -> { + if (topicPattern.matcher(topicString).matches()) { + result.addAll(callback); + } + }); + return result; + } + public java.net.URI getHost() { return host; } @@ -251,10 +270,22 @@ public class MqttHandler { this.qos = qos; } + /** + * Establish a new connection for some topic. + * @param topic the topic to create a connection for, may contain the wildcards "*" and "#" + * @param callback the callback to run if a new message arrives for this topic + * @return true if successful stored this connection, false otherwise (e.g., on failed subscribe) + */ public boolean newConnection(String topic, java.util.function.Consumer<byte[]> callback) { return newConnection(topic, (ignoredTopicString, bytes) -> callback.accept(bytes)); } + /** + * Establish a new connection for some topic. + * @param topic the topic to create a connection for, may contain the wildcards "*" and "#" + * @param callback the callback to run if a new message arrives for this topic + * @return true if successful stored this connection, false otherwise (e.g., on failed subscribe) + */ public boolean newConnection(String topic, java.util.function.BiConsumer<String, byte[]> callback) { if (readyLatch.getCount() > 0) { System.err.println("Handler not ready"); @@ -262,54 +293,124 @@ public class MqttHandler { } // register callback logger.debug("new connection for {}", topic); - if (callbacks.get(topic) == null || callbacks.get(topic).isEmpty()) { - callbacks.put(topic, new java.util.ArrayList<>()); - + final boolean needSubscribe; + if (isWildcardTopic(topic)) { + String regexForTopic = topic.replace("*", "[^/]*").replace("#", ".*"); + java.util.regex.Pattern pattern = java.util.regex.Pattern.compile(regexForTopic); + wildcardCallbacks.computeIfAbsent(pattern, p -> new java.util.ArrayList<>()) + .add(callback); + needSubscribe = true; + } else { // normal topic + java.util.List<java.util.function.BiConsumer<String, byte[]>> callbacksForTopic = normalCallbacks.get(topic); + if (callbacksForTopic == null || callbacksForTopic.isEmpty()) { + callbacksForTopic = new java.util.ArrayList<>(); + normalCallbacks.put(topic, callbacksForTopic); + needSubscribe = true; + } else { + needSubscribe = false; + } + callbacksForTopic.add(callback); + } + if (needSubscribe) { // subscribe at broker + CountDownLatch operationFinished = new CountDownLatch(1); + java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>(true); org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) }; connection.getDispatchQueue().execute(() -> { - connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() { + connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<>() { @Override public void onSuccess(byte[] qoses) { logger.debug("Subscribed to {}, qoses: {}", topic, qoses); + operationFinished.countDown(); } @Override public void onFailure(Throwable cause) { logger.error("Could not subscribe to {}", topic, cause); + success.set(false); + operationFinished.countDown(); } }); }); + try { + operationFinished.await(2, TimeUnit.SECONDS); + return success.get(); + } catch (InterruptedException e) { + return false; + } + } else { + return true; } - callbacks.get(topic).add(callback); - return true; } - public boolean disconnect(String topic, Object callback) { - java.util.List<java.util.function.BiConsumer<String, byte[]>> callbackList = callbacks.get(topic); - if (callbackList == null) { + private boolean isWildcardTopic(String topic) { + return topic.contains("*") || topic.contains("#"); + } + + public boolean disconnect(String topic, java.util.function.BiConsumer<String, byte[]> callback) { + boolean needUnsubscribe = false; + java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>(true); + + boolean foundTopicInCallbacks = false; + + // check if wildcard is to be removed + if (isWildcardTopic(topic)) { + java.util.regex.Pattern wildcardPatternToRemove = null; + for (java.util.Map.Entry<java.util.regex.Pattern, java.util.List<java.util.function.BiConsumer<String, byte[]>>> entry : wildcardCallbacks.entrySet()) { + if (entry.getKey().pattern().equals(topic)) { + foundTopicInCallbacks = true; + // if still successful, update with whether callback could be removed + success.compareAndSet(true, (entry.getValue().remove(callback))); + if (entry.getValue().isEmpty()) { + wildcardPatternToRemove = entry.getKey(); + needUnsubscribe = true; + } + break; + } + } + ; + if (wildcardPatternToRemove != null) { + wildcardCallbacks.remove(wildcardPatternToRemove); + } + } else if (normalCallbacks.containsKey(topic)) { + foundTopicInCallbacks = true; + // if still successful, update with whether callback could be removed + var normalCallbackList = normalCallbacks.get(topic); + success.compareAndSet(true, normalCallbackList.remove(callback)); + needUnsubscribe |= normalCallbackList.isEmpty(); + } + + if (!foundTopicInCallbacks) { logger.warn("Disconnect for not connected topic '{}'", topic); return false; } - java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>(); - success.set(callbackList.remove(callback)); - if (callbackList.isEmpty()) { + + if (needUnsubscribe) { + java.util.concurrent.CountDownLatch operationFinished = new java.util.concurrent.CountDownLatch(1); // no callbacks anymore for this topic, unsubscribe from mqtt connection.getDispatchQueue().execute(() -> { org.fusesource.hawtbuf.UTF8Buffer topicBuffer = org.fusesource.hawtbuf.Buffer.utf8(topic); org.fusesource.hawtbuf.UTF8Buffer[] topicArray = new org.fusesource.hawtbuf.UTF8Buffer[]{topicBuffer}; - connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<Void>() { + connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<>() { @Override public void onSuccess(Void value) { - // empty, all good + operationFinished.countDown(); } @Override public void onFailure(Throwable cause) { success.set(false); + logger.warn("Could not disconnect from {}", topic, cause); + operationFinished.countDown(); } }); }); + try { + operationFinished.await(2, java.util.concurrent.TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.catching(e); + success.set(false); + } } return success.get(); } @@ -338,7 +439,7 @@ public class MqttHandler { return; } connection.getDispatchQueue().execute(() -> { - connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() { + connection.disconnect(new org.fusesource.mqtt.client.Callback<>() { @Override public void onSuccess(Void value) { logger.info("Disconnected {} from {}", name, host); @@ -362,7 +463,7 @@ public class MqttHandler { public void publish(String topic, byte[] bytes, org.fusesource.mqtt.client.QoS qos, boolean retain) { connection.getDispatchQueue().execute(() -> { - connection.publish(topic, bytes, qos, retain, new org.fusesource.mqtt.client.Callback<Void>() { + connection.publish(topic, bytes, qos, retain, new org.fusesource.mqtt.client.Callback<>() { @Override public void onSuccess(Void value) { logger.debug("Published some bytes to {}", topic); diff --git a/ragconnect.base/src/main/resources/handleUri.mustache b/ragconnect.base/src/main/resources/handleUri.mustache index aa4176ef0b067bca3c54ca754096f633cadcfa71..ff0ea7af7f920ac09a14de75a4afe1882540ac94 100644 --- a/ragconnect.base/src/main/resources/handleUri.mustache +++ b/ragconnect.base/src/main/resources/handleUri.mustache @@ -4,7 +4,7 @@ try { uri = new java.net.URI({{connectParameterName}}); scheme = uri.getScheme(); host = uri.getHost(); - path = uri.getPath(); + path = uri.getPath() + (uri.getFragment() != null ? "#" : ""); } catch (java.net.URISyntaxException e) { System.err.println(e.getMessage()); // Maybe re-throw error? return false; diff --git a/ragconnect.base/src/main/resources/receiveDefinition.mustache b/ragconnect.base/src/main/resources/receiveDefinition.mustache index 672ddfffdcab77487165368f42fb9cc64cbf47eb..4ae8a05c6f97e03f5522160a487f33a710186a2c 100644 --- a/ragconnect.base/src/main/resources/receiveDefinition.mustache +++ b/ragconnect.base/src/main/resources/receiveDefinition.mustache @@ -21,29 +21,69 @@ syn int {{parentTypeName}}.{{resolveInListAttributeName}}(String topic) { {{/UseList}} {{/typeIsList}} -public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterName}}) throws java.io.IOException { - {{>handleUri}} +/** + * Connects the receive endpoint {{entityName}}. +{{#typeIsList}}{{#isWithAdd}} + * New values are appended to the end of the list. +{{/isWithAdd}}{{/typeIsList}} + * @param {{connectParameterName}} string describing protocol and path as an URI +{{#typeIsList}}{{^UseList}}{{^isWithAdd}} + * @param index index of node in list to connect (the list is expected to have enough elements) +{{/isWithAdd}}{{/UseList}}{{/typeIsList}} + * @return true if connect was successful, false otherwise + * @throws java.io.IOException if connect failed + */ +public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterName}}{{#typeIsList}}{{^UseList}}{{^isWithAdd}}, int index{{/isWithAdd}}{{/UseList}}{{/typeIsList}}) throws java.io.IOException { java.util.function.BiConsumer<String, byte[]> consumer = (topic, message) -> { {{> mappingApplication}} - {{#loggingEnabledForReads}} +{{#loggingEnabledForReads}} System.out.println("[Receive] " + {{connectParameterName}} + " -> {{entityName}} = " + {{lastResult}}); - {{/loggingEnabledForReads}} - {{#isTypeEndpointDefinition}} +{{/loggingEnabledForReads}} +{{#isTypeEndpointDefinition}} {{lastResult}}.treeResolveAll(); - {{#typeIsList}} - {{#UseList}} - {{#isWithAdd}} + {{#typeIsList}} + {{#UseList}} + {{#isWithAdd}} {{getterMethod}}().addAll({{lastResult}}); - {{/isWithAdd}} - {{^isWithAdd}} + {{/isWithAdd}} + {{^isWithAdd}} set{{entityName}}({{lastResult}}); - {{/isWithAdd}} - {{/UseList}} - {{^UseList}} - {{#isWithAdd}} + {{/isWithAdd}} + {{/UseList}} + {{^UseList}} + {{lastResult}}.set{{idTokenName}}(topic); + {{#isWithAdd}} {{getterMethod}}().add({{lastResult}}); - {{/isWithAdd}} - {{^isWithAdd}} + {{/isWithAdd}} + {{^isWithAdd}} + set{{entityName}}({{lastResult}}, index); + {{/isWithAdd}} + {{/UseList}} + {{/typeIsList}} + {{^typeIsList}} + set{{entityName}}({{lastResult}}); + {{/typeIsList}} +{{/isTypeEndpointDefinition}} +{{^isTypeEndpointDefinition}} + set{{entityName}}({{lastResult}}); +{{/isTypeEndpointDefinition}} + }; + return {{internalConnectMethod}}({{connectParameterName}}, consumer); +} + +{{#typeIsList}}{{^UseList}}{{^isWithAdd}} +/** + * Connects the receive endpoint {{entityName}} using a "wildcard" URI (if supported by the chosen protocol). + * @param {{connectParameterName}} string describing protocol and path as an URI + * @return true if connect was successful, false otherwise + * @throws java.io.IOException if connect failed +*/ +public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterName}}) throws java.io.IOException { + java.util.function.BiConsumer<String, byte[]> consumer = (topic, message) -> { + {{> mappingApplication}} +{{#loggingEnabledForReads}} + System.out.println("[Receive] " + {{connectParameterName}} + " (" + topic + ") -> {{entityName}} = " + {{lastResult}}); +{{/loggingEnabledForReads}} {{lastResult}}.set{{idTokenName}}(topic); int resolvedIndex = {{resolveInListAttributeName}}(topic); if (resolvedIndex == -1) { @@ -51,17 +91,14 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam } else { set{{entityName}}({{lastResult}}, resolvedIndex); } - {{/isWithAdd}} - {{/UseList}} - {{/typeIsList}} - {{^typeIsList}} - set{{entityName}}({{lastResult}}); - {{/typeIsList}} - {{/isTypeEndpointDefinition}} - {{^isTypeEndpointDefinition}} - set{{entityName}}({{lastResult}}); - {{/isTypeEndpointDefinition}} }; + return {{internalConnectMethod}}({{connectParameterName}}, consumer); +} +{{/isWithAdd}}{{/UseList}}{{/typeIsList}} + +private boolean {{parentTypeName}}.{{internalConnectMethod}}(String {{connectParameterName}}, + java.util.function.BiConsumer<String, byte[]> consumer) throws java.io.IOException { + {{>handleUri}} ConnectToken connectToken; switch (scheme) { {{#usesMqtt}} diff --git a/ragconnect.tests/src/test/01-input/list/README.md b/ragconnect.tests/src/test/01-input/list/README.md index 25aac47db64a28bb0631fbc7f33b062a6a22a2c4..7634a3d2069a10b5c07bef272f757bd5ee2df8c9 100644 --- a/ragconnect.tests/src/test/01-input/list/README.md +++ b/ragconnect.tests/src/test/01-input/list/README.md @@ -2,3 +2,33 @@ Idea: send and receive lists of subtrees. Once without incremental evaluation (i.e., using manual dependencies), and the other time with incremental evaluation + +## Execution-Model + +``` +SenderRoot ReceiverRoot +|- A* ---( mqtt: a ) ---+------> A* --------------------| +| \------> WidthAddFromA:A* ------| +|- SingleA:A* ,-> FromSingleA:A* --------| + \---( mqtt: single-a ) -+-> WithAddFromSingleA:A* -| +``` + +## Execution-Trace (SendInitialValue) + +| Input | # | A* | WidthAddFromA | FromSingleA | WithAddFromSingleA:A | +|---|---|---|---|---|---| +| 0 | 1 | [] | [0] | [] | [0] | +| 1 | 2 | [1] | [1] | [1] | [0,1] | +| 1 | 2 | [1] | [1] | [1] | [0,1] | +| 2 | 3 | [1,2] | [2] | [1,1,2] | [0,1,2] | +| 3 | 4 | [1,2,3] | [3] | [1,1,2,1,2,3] | [0,1,2,3] | + +## Execution-Trace (OnlyUpdate) + +| Input | # | A* | WidthAddFromA | FromSingleA | WithAddFromSingleA:A | +|---|---|---|---|---|---| +| - | 0 | [] | [] | [] | [] | +| 1 | 1 | [1] | [1] | [1] | [1] | +| 1 | 1 | [1] | [1] | [1] | [1] | +| 2 | 2 | [1,2] | [2] | [1,1,2] | [1,2] | +| 3 | 3 | [1,2,3] | [3] | [1,1,2,1,2,3] | [1,2,3] | diff --git a/ragconnect.tests/src/test/01-input/singleList/README.md b/ragconnect.tests/src/test/01-input/singleList/README.md index 7b4aa75261d7a2631f95a516ecad86fd30ffa440..6a76bb4d8457d1f246edaeb96d6d72048ef4ced1 100644 --- a/ragconnect.tests/src/test/01-input/singleList/README.md +++ b/ragconnect.tests/src/test/01-input/singleList/README.md @@ -2,3 +2,46 @@ Idea: send and receive single values for lists of subtrees. Once without incremental evaluation (i.e., using manual dependencies), and the other time with incremental evaluation + +## Execution-Model + +``` +SenderRoot ReceiverRoot +|- A1 --( a/1 ) --\ | +|- A2 --( a/2 ) --+=\ /--> A* --------------| +|- A3 --( a/3 ) ----+==+--> WithAdd:A* ------| +|- A4 --( a/4 ) --+=/ | +|- IO --( a/5 ) --/ | + /--> UsingWc:A* ------| + ( a/# ) -+--> UsingWcWithA:A* -| +``` + +## Computation + +A _n_ = Input _n_ + 1, e.g., A1 = Input1 + 1 + +## Execution-Trace (SendInitialValue) + +| Input | [A1,A2,A3,A4,IO] | # | A* | UsingWcA | WithAddA | UsingWcWithAddA:A | +|---|---|---|---|---|---|---| +| * | [1,2,3,4,0] | 5 | [1,2,3,4,0] | [1,2,3,4,0] | [1,2,3,4,0] | [1,2,3,4,0] | +| I1:1 | [2,2,3,4,0] | 6 | [2,2,3,4,0] | [2,2,3,4,0] | [1,2,3,4,0,2] | [1,2,3,4,0,2] | +| I1:1 | [2,2,3,4,0] | 6 | [2,2,3,4,0] | [2,2,3,4,0] | [1,2,3,4,0,2] | [1,2,3,4,0,2] | +| I1:2 | [3,2,3,4,0] | 7 | [3,2,3,4,0] | [3,2,3,4,0] | [1,2,3,4,0,2,3] | [1,2,3,4,0,2,3] | +| IO:5 | [3,2,3,4,5] | 8 | [3,2,3,4,5] | [3,2,3,4,5] | [1,2,3,4,0,2,3,5] | [1,2,3,4,0,2,3,5] +| I3:4 | [3,2,7,4,5] | 9 | [3,2,7,4,5] | [3,2,7,4,5] | [1,2,3,4,0,2,3,5,7] | [1,2,3,4,0,2,3,5,7] | + +*: (1:0, 2:0, 3:0, 4:0, 5:0) + +## Execution-Trace (OnlyUpdate) + +| Input | [A1,A2,A3,A4,IO] | # | A* | UsingWcA | WithAddA | UsingWcWithAddA:A | +|---|---|---|---|---|---|---| +| * | [-,-,-,-,-] | 0 | [0,0,0,0,0] | [] | [] | [] | +| I1:1 | [2,-,-,-,-] | 1 | [2,0,0,0,0] | [2] | [2] | [2] | +| I1:1 | [2,-,-,-,-] | 1 | [2,0,0,0,0] | [2] | [2] | [2] | +| I1:2 | [3,-,-,-,-] | 2 | [3,0,0,0,0] | [3] | [2,3] | [2,3] | +| IO:5 | [2,-,-,-,5] | 3 | [3,0,0,0,5] | [3,5] | [2,3,5] | [2,3,5] +| I3:4 | [2,-,7,-,5] | 4 | [3,0,7,0,5] | [3,5,7] | [2,3,5,7] | [2,3,5,7] | + +*: (1:0, 2:0, 3:0, 4:0, 5:0) diff --git a/ragconnect.tests/src/test/01-input/singleList/Test.connect b/ragconnect.tests/src/test/01-input/singleList/Test.connect index 6bca293308b30cf222e514e8d85c1a7bc67ce3fe..21b2796544ae65e96da5b03f088e5f7966620a7f 100644 --- a/ragconnect.tests/src/test/01-input/singleList/Test.connect +++ b/ragconnect.tests/src/test/01-input/singleList/Test.connect @@ -5,7 +5,7 @@ send tree SenderRoot.A4 ; send SenderRoot.InOutput using IntToA ; receive tree ReceiverRoot.A ; -receive tree with add ReceiverRoot.UsingWildcardA ; +receive tree ReceiverRoot.UsingWildcardA ; receive tree with add ReceiverRoot.WithAddA ; receive tree with add ReceiverRoot.UsingWildcardWithAddA ; diff --git a/ragconnect.tests/src/test/01-input/singleList/Test.jadd b/ragconnect.tests/src/test/01-input/singleList/Test.jadd index b010c138356cb2aba70222556350256f930966ae..005b419bb1f625a460e219079dcd3a5361b5844d 100644 --- a/ragconnect.tests/src/test/01-input/singleList/Test.jadd +++ b/ragconnect.tests/src/test/01-input/singleList/Test.jadd @@ -12,7 +12,6 @@ aspect Testing { class SenderRoot implements org.jastadd.ragconnect.tests.singleList.AbstractSingleListTest.TestWrapperSenderRoot {} class ReceiverRoot implements org.jastadd.ragconnect.tests.singleList.AbstractSingleListTest.TestWrapperReceiverRoot {} class A implements org.jastadd.ragconnect.tests.singleList.AbstractSingleListTest.TestWrapperA {} - class B implements org.jastadd.ragconnect.tests.singleList.AbstractSingleListTest.TestWrapperB {} class JastAddList<T> implements org.jastadd.ragconnect.tests.singleList.AbstractSingleListTest.TestWrapperJastAddList<T> {} } diff --git a/ragconnect.tests/src/test/01-input/singleList/Test.relast b/ragconnect.tests/src/test/01-input/singleList/Test.relast index 7734b956178e896b3a07c85f2f4ecd02bb01ff66..59e57d7f4c5d526b6831ea8fd090960f5d4c9e08 100644 --- a/ragconnect.tests/src/test/01-input/singleList/Test.relast +++ b/ragconnect.tests/src/test/01-input/singleList/Test.relast @@ -8,5 +8,5 @@ SenderRoot : Nameable ::= <Input1:int> /A1:A/ <InOutput:int> ; ReceiverRoot : Nameable ::= A* UsingWildcardA:A* WithAddA:A* UsingWildcardWithAddA:A* ; -A : Nameable ::= B* ; +A : Nameable ::= ; B : Nameable ; diff --git a/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/AbstractSingleListTest.java b/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/AbstractSingleListTest.java index cd68a9e5fa105a8353839ab407330f044d89bed5..3c2779f8d4188c0f1fff7f8773d27401e94dd597 100644 --- a/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/AbstractSingleListTest.java +++ b/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/AbstractSingleListTest.java @@ -4,11 +4,16 @@ import org.jastadd.ragconnect.tests.AbstractMqttTest; import org.jastadd.ragconnect.tests.TestUtils; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import singleListInc.ast.SenderRoot; +import singleList.ast.MqttHandler; import java.io.IOException; import java.nio.file.Paths; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import static java.util.Collections.addAll; @@ -34,13 +39,17 @@ public abstract class AbstractSingleListTest extends AbstractMqttTest { TestWrapperJastAddList<? extends TestWrapperA> getAList(); TestWrapperJastAddList<? extends TestWrapperA> getAs(); int getNumA(); + int getNumWithAddA(); TestWrapperA getA(int index); TestWrapperJastAddList<? extends TestWrapperA> getWithAddAList(); - TestWrapperJastAddList<? extends TestWrapperA> getWithAddAs(); + TestWrapperJastAddList<? extends TestWrapperA> getUsingWildcardAList(); + TestWrapperJastAddList<? extends TestWrapperA> getUsingWildcardWithAddAList(); - boolean connectA(String mqttUri) throws IOException; + @SuppressWarnings("unused") boolean connectA(String mqttUri) throws IOException; + boolean connectA(String mqttUri, int index) throws IOException; boolean connectUsingWildcardA(String mqttUri) throws IOException; + @SuppressWarnings("unused") boolean connectUsingWildcardA(String mqttUri, int index) throws IOException; boolean connectWithAddA(String mqttUri) throws IOException; boolean connectUsingWildcardWithAddA(String mqttUri) throws IOException; } @@ -64,11 +73,6 @@ public abstract class AbstractSingleListTest extends AbstractMqttTest { TestWrapperA getA4(); } public interface TestWrapperA { - TestWrapperB getB(int i); - int getNumB(); - int getID(); - } - public interface TestWrapperB { int getID(); } @@ -80,8 +84,8 @@ public abstract class AbstractSingleListTest extends AbstractMqttTest { protected static final String TOPIC_A_2 = "a/second"; protected static final String TOPIC_A_3 = "a/third"; protected static final String TOPIC_A_4 = "a/fourth"; - protected static final String TOPIC_A_WILDCARD = "a/#"; protected static final String TOPIC_A_5_INOUT = "a/special"; + protected static final String TOPIC_A_WILDCARD = "a/#"; protected TestWrapperSenderRoot senderRoot; protected TestWrapperReceiverRoot receiverRoot; @@ -110,11 +114,11 @@ public abstract class AbstractSingleListTest extends AbstractMqttTest { // connect. important: first receivers, then senders. to not miss initial value. // receive: explicit topic subscription - assertTrue(receiverRoot.connectA(mqttUri(TOPIC_A_1))); - assertTrue(receiverRoot.connectA(mqttUri(TOPIC_A_2))); - assertTrue(receiverRoot.connectA(mqttUri(TOPIC_A_3))); - assertTrue(receiverRoot.connectA(mqttUri(TOPIC_A_4))); - assertTrue(receiverRoot.connectA(mqttUri(TOPIC_A_5_INOUT))); + assertTrue(receiverRoot.connectA(mqttUri(TOPIC_A_1), 0)); + assertTrue(receiverRoot.connectA(mqttUri(TOPIC_A_2), 1)); + assertTrue(receiverRoot.connectA(mqttUri(TOPIC_A_3), 2)); + assertTrue(receiverRoot.connectA(mqttUri(TOPIC_A_4), 3)); + assertTrue(receiverRoot.connectA(mqttUri(TOPIC_A_5_INOUT), 4)); assertTrue(receiverRoot.connectWithAddA(mqttUri(TOPIC_A_1))); assertTrue(receiverRoot.connectWithAddA(mqttUri(TOPIC_A_2))); @@ -127,57 +131,104 @@ public abstract class AbstractSingleListTest extends AbstractMqttTest { assertTrue(receiverRoot.connectUsingWildcardWithAddA(mqttUri(TOPIC_A_WILDCARD))); // send: explicit topics, wait between connections to ensure correct arrival at receiver - assertTrue(senderRoot.connectA1(mqttUri(TOPIC_A_1), writeCurrentValue)); - if (writeCurrentValue) TestUtils.waitForMqtt(); - assertTrue(senderRoot.connectA2(mqttUri(TOPIC_A_2), writeCurrentValue)); - if (writeCurrentValue) TestUtils.waitForMqtt(); - assertTrue(senderRoot.connectA3(mqttUri(TOPIC_A_3), writeCurrentValue)); - if (writeCurrentValue) TestUtils.waitForMqtt(); + MqttHandler checkArrivalHandler = new MqttHandler().dontSendWelcomeMessage().setHost(TestUtils.getMqttHost()); + Map<String, CountDownLatch> arrived = new HashMap<>() {{ + put(TOPIC_A_1, new CountDownLatch(1)); + put(TOPIC_A_2, new CountDownLatch(1)); + put(TOPIC_A_3, new CountDownLatch(1)); + put(TOPIC_A_4, new CountDownLatch(1)); + }}; + checkArrivalHandler.waitUntilReady(2, TimeUnit.SECONDS); + checkArrivalHandler.newConnection("#", (topic, bytes) -> + Optional.ofNullable(arrived.get(topic)).ifPresent(CountDownLatch::countDown)); + assertTrue(senderRoot.connectA4(mqttUri(TOPIC_A_4), writeCurrentValue)); - if (writeCurrentValue) TestUtils.waitForMqtt(); + if (writeCurrentValue) { + assertTrue(arrived.get(TOPIC_A_4).await(2, TimeUnit.SECONDS)); + } + + assertTrue(senderRoot.connectA3(mqttUri(TOPIC_A_3), writeCurrentValue)); + if (writeCurrentValue) { + assertTrue(arrived.get(TOPIC_A_3).await(2, TimeUnit.SECONDS)); + } + + assertTrue(senderRoot.connectA2(mqttUri(TOPIC_A_2), writeCurrentValue)); + if (writeCurrentValue) { + assertTrue(arrived.get(TOPIC_A_2).await(2, TimeUnit.SECONDS)); + } + + assertTrue(senderRoot.connectA1(mqttUri(TOPIC_A_1), writeCurrentValue)); + if (writeCurrentValue) { + assertTrue(arrived.get(TOPIC_A_1).await(2, TimeUnit.SECONDS)); + } + assertTrue(senderRoot.connectInOutput(mqttUri(TOPIC_A_5_INOUT), writeCurrentValue)); + // no need to wait here, because first "checkTree" will wait anyway + checkArrivalHandler.close(); } abstract protected void setupReceiverAndConnectPart() throws IOException; @Override protected void communicateSendInitialValue() throws InterruptedException { - checkTree(1, list(1, 2, 3, 4, 5), list(1, 2, 3, 4, 5), // normal - list(1, 2, 3, 4, 5), list(1, 2, 3, 4, 5)); // withAdd - - // TODO check below - - setInput(1, 0); - checkTree(1, list(1), list(2), list(3), list(4)); + checkTree(5, list(1, 2, 3, 4, 0), list(4, 3, 2, 1, 0), // normal: (normal / wildcard) + list(4, 3, 2, 1, 0), list(4, 3, 2, 1, 0)); // withAdd: (normal / wildcard) + // A1 will be 2 (1+1, previously 1) setInput(1, 1); - checkTree(2, list(2), list(1), list(1), list(0, 1)); + checkTree(6, list(2, 2, 3, 4, 0), list(4, 3, 2, 2, 0), // normal: (normal / wildcard) + list(4, 3, 2, 1, 0, 2), list(4, 3, 2, 1, 0, 2)); // withAdd: (normal / wildcard) - setInput(2); - checkTree(3, list(1, 2), list(2), list(1, 1, 2), list(0, 1, 2)); - - setInput(3); - checkTree(4, list(1, 2, 3), list(3), list(1, 1, 2, 1, 2, 3), list(0, 1, 2, 3)); + // A1 should stay at 2 + setInput(1, 1); + checkTree(6, list(2, 2, 3, 4, 0), list(4, 3, 2, 2, 0), // normal: (normal / wildcard) + list(4, 3, 2, 1, 0, 2), list(4, 3, 2, 1, 0, 2)); // withAdd: (normal / wildcard) + + // A1 will be 3 (2+1, previously 2) + setInput(1, 2); + checkTree(7, list(3, 2, 3, 4, 0), list(4, 3, 2, 3, 0), // normal: (normal / wildcard) + list(4, 3, 2, 1, 0, 2, 3), list(4, 3, 2, 1, 0, 2, 3)); // withAdd: (normal / wildcard) + + // InOut will be 5 (previously 0) + setInput(5, 5); + checkTree(8, list(3, 2, 3, 4, 5), list(4, 3, 2, 3, 5), // normal: (normal / wildcard) + list(4, 3, 2, 1, 0, 2, 3, 5), list(4, 3, 2, 1, 0, 2, 3, 5)); // withAdd: (normal / wildcard) + + // A3 will be 7 (4+3, previously 3) + setInput(3, 4); + checkTree(9, list(3, 2, 7, 4, 5), list(4, 7, 2, 3, 5), // normal: (normal / wildcard) + list(4, 3, 2, 1, 0, 2, 3, 5, 7), list(4, 3, 2, 1, 0, 2, 3, 5, 7)); // withAdd: (normal / wildcard) } @Override protected void communicateOnlyUpdatedValue() throws InterruptedException { + checkTree(0, list(0, 0, 0, 0, 0), list(), // normal + list(), list()); // withAdd - // TODO check below - - checkTree(0, list(), list(), list(), list()); - - setInput(1); - checkTree(1, list(1), list(1), list(1), list(1)); - - setInput(1); - checkTree(1, list(1), list(1), list(1), list(1)); - - setInput(2); - checkTree(2, list(1, 2), list(2), list(1, 1, 2), list(1, 2)); + // A1 will be 2 (1+1, previously 1) + setInput(1, 1); + checkTree(1, list(2, 0, 0, 0, 0), list(2), // normal + list(2), list(2)); // withAdd - setInput(3); - checkTree(3, list(1, 2, 3), list(3), list(1, 1, 2, 1, 2, 3), list(1, 2, 3)); + // A1 should stay at 2 + setInput(1, 1); + checkTree(1, list(2, 0, 0, 0, 0), list(2), // normal + list(2), list(2)); // withAdd + + // A1 will be 3 (2+1, previously 2) + setInput(1, 2); + checkTree(2, list(3, 0, 0, 0, 0), list(3), // normal + list(2, 3), list(2, 3)); // withAdd + + // InOut will be 5 (previously 0) + setInput(5, 5); + checkTree(3, list(3, 0, 0, 0, 5), list(3, 5), // normal + list(2, 3, 5), list(2, 3, 5)); // withAdd + + // A3 will be 7 (4+3, previously 3) + setInput(3, 4); + checkTree(4, list(3, 0, 7, 0, 5), list(3,5,7), // normal + list(2, 3, 5, 7), list(2, 3, 5, 7)); // withAdd } protected void setInput(int index, int input) { @@ -190,51 +241,36 @@ public abstract class AbstractSingleListTest extends AbstractMqttTest { case 5: senderRoot.setInOutput(input); return; default: fail("Wrong index " + index); return; } - assertEquals(input, actualComputedValue, "ID value of single A"); + assertEquals(input + index, actualComputedValue, "ID value of single A"); } private void checkTree(int expectedTransmissions, IntList normalA, IntList usingWildcardA, IntList withAddA, IntList usingWildcardWithAddA) throws InterruptedException { TestUtils.waitForMqtt(); + assertEquals(expectedTransmissions, data.numberOfElements, "transmissions for any A"); - // TODO check below + checkList(normalA.toList(), receiverRoot.getNumA(), receiverRoot::getA); + checkList(normalA.toList(), receiverRoot.getAList()); - assertEquals(expectedTransmissions, data.numberOfElements, "transmissions for normal"); + checkList(usingWildcardA.toList(), receiverRoot.getUsingWildcardAList()); - checkList(normalA.toList(), receiverRoot.getNumA(), receiverRoot::getA, true); - checkList(normalA.toList(), receiverRoot.getAList(), true); - checkList(normalA.toList(), receiverRoot.getAs(), true); + checkList(withAddA.toList(), receiverRoot.getWithAddAList()); - checkList(withAddA.toList(), receiverRoot.getWithAddAList(), true); - checkList(withAddA.toList(), receiverRoot.getWithAddAs(), true); + checkList(usingWildcardWithAddA.toList(), receiverRoot.getUsingWildcardWithAddAList()); } - private void checkList(List<Integer> expectedList, int numChildren, Function<Integer, TestWrapperA> getA, boolean expectB) { - - // TODO check below - + private void checkList(List<Integer> expectedList, int numChildren, Function<Integer, TestWrapperA> getA) { assertEquals(expectedList.size(), numChildren, "same list size"); for (int index = 0; index < expectedList.size(); index++) { TestWrapperA a = getA.apply(index); assertEquals(expectedList.get(index), a.getID(), "correct ID for A"); - if (expectB) { - assertEquals(1, a.getNumB(), "one B child"); - assertEquals(expectedList.get(index) + 1, a.getB(0).getID(), "correct ID for B child"); - } } } - private void checkList(List<Integer> expectedList, TestWrapperJastAddList<? extends TestWrapperA> actualList, boolean expectB) { - - // TODO check below - + private void checkList(List<Integer> expectedList, TestWrapperJastAddList<? extends TestWrapperA> actualList) { assertEquals(expectedList.size(), actualList.getNumChild(), "same list size"); int index = 0; for (TestWrapperA a : actualList) { assertEquals(expectedList.get(index), a.getID(), "correct ID for A"); - if (expectB) { - assertEquals(1, a.getNumB(), "one B child"); - assertEquals(expectedList.get(index) + 1, a.getB(0).getID(), "correct ID for B child"); - } index++; } } diff --git a/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/SingleListIncrementalTest.java b/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/SingleListIncrementalTest.java index f65651090b7ed2d640eaef0b0e8a73744e245d62..7a946e44499f71dc0dfb92271ca1fc414275597b 100644 --- a/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/SingleListIncrementalTest.java +++ b/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/SingleListIncrementalTest.java @@ -2,14 +2,12 @@ package org.jastadd.ragconnect.tests.singleList; import org.jastadd.ragconnect.tests.TestUtils; import org.junit.jupiter.api.Tag; -import singleListInc.ast.MqttHandler; -import singleListInc.ast.ReceiverRoot; -import singleListInc.ast.Root; -import singleListInc.ast.SenderRoot; +import singleListInc.ast.*; import java.io.IOException; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -33,8 +31,15 @@ public class SingleListIncrementalTest extends AbstractSingleListTest { senderRoot = new SenderRoot(); model.addSenderRoot((SenderRoot) senderRoot); - receiverRoot = new ReceiverRoot(); - model.addReceiverRoot((ReceiverRoot) receiverRoot); + ReceiverRoot localReceiverRoot = new ReceiverRoot(); + model.addReceiverRoot(localReceiverRoot); + + // first prepare non-wildcard lists + for (int i = 0; i < 5; i++) { + localReceiverRoot.addA(new A()); + } + receiverRoot = localReceiverRoot; + assertEquals(5, receiverRoot.getNumA()); } @Override diff --git a/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/SingleListManualTest.java b/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/SingleListManualTest.java index c5ba4752b231a99fbf376c6e6b314b8ac7dcf56b..c2896f633bc299159f0ab74913c3a9c0456d8370 100644 --- a/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/SingleListManualTest.java +++ b/ragconnect.tests/src/test/java/org/jastadd/ragconnect/tests/singleList/SingleListManualTest.java @@ -1,14 +1,12 @@ package org.jastadd.ragconnect.tests.singleList; import org.jastadd.ragconnect.tests.TestUtils; -import singleList.ast.MqttHandler; -import singleList.ast.ReceiverRoot; -import singleList.ast.Root; -import singleList.ast.SenderRoot; +import singleList.ast.*; import java.io.IOException; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -31,8 +29,15 @@ public class SingleListManualTest extends AbstractSingleListTest { senderRoot = new SenderRoot(); model.addSenderRoot((SenderRoot) senderRoot); - receiverRoot = new ReceiverRoot(); - model.addReceiverRoot((ReceiverRoot) receiverRoot); + ReceiverRoot localReceiverRoot = new ReceiverRoot(); + model.addReceiverRoot(localReceiverRoot); + + // first prepare non-wildcard lists + for (int i = 0; i < 5; i++) { + localReceiverRoot.addA(new A()); + } + receiverRoot = localReceiverRoot; + assertEquals(5, receiverRoot.getNumA()); } @Override @@ -43,7 +48,10 @@ public class SingleListManualTest extends AbstractSingleListTest { assertTrue(handler.waitUntilReady(2, TimeUnit.SECONDS)); // add dependencies - ((SenderRoot) senderRoot).addInputDependencyToA((SenderRoot) senderRoot); + ((SenderRoot) senderRoot).addInputDependencyToA1((SenderRoot) senderRoot); + ((SenderRoot) senderRoot).addInputDependencyToA2((SenderRoot) senderRoot); + ((SenderRoot) senderRoot).addInputDependencyToA3((SenderRoot) senderRoot); + ((SenderRoot) senderRoot).addInputDependencyToA4((SenderRoot) senderRoot); data = new ReceiverData(); handler.newConnection(TOPIC_A_WILDCARD, bytes -> data.numberOfElements += 1);