diff --git a/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java b/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java index d58d10d09149d4eb9e3bdc1fe9e5f55a3606cb01..d2894e6c6475b3929f063fa7742a4bb11002c00d 100644 --- a/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java +++ b/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java @@ -78,6 +78,7 @@ public class Compiler extends AbstractCompiler { printMessage("Writing output files"); final List<String> handlers = new ArrayList<>(); + handlers.add("RagConnectObserver.jadd"); if (ASTNode.usesMqtt) { handlers.add("MqttHandler.jadd"); } diff --git a/ragconnect.base/src/main/resources/MqttHandler.jadd b/ragconnect.base/src/main/resources/MqttHandler.jadd index f5e6ba99b3058f59f6b9b40ae0d9c07f18da88c3..6b0ea71960b7d465ea1ddd63b62c6b19caaffe70 100644 --- a/ragconnect.base/src/main/resources/MqttHandler.jadd +++ b/ragconnect.base/src/main/resources/MqttHandler.jadd @@ -1,4 +1,5 @@ import java.io.IOException; +import java.util.ArrayList; import java.util.concurrent.TimeUnit; aspect MqttHandler { @@ -160,13 +161,17 @@ public class MqttHandler { org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) { // this method is called, whenever a MQTT message is received String topicString = topic.toString(); - java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topicString); + // TODO: maybe copy list here to avoid concurrent modification. or use a concurrent list. + java.util.List<java.util.function.Consumer<byte[]>> callbackList = new java.util.ArrayList<>(callbacks.get(topicString)); if (callbackList == null || callbackList.isEmpty()) { - logger.debug("Got a message, but no callback to call. Forgot to subscribe?"); + logger.debug("Got a message at {}, but no callback to call. Forgot to subscribe?", topic); } else { byte[] message = body.toByteArray(); + logger.debug("initial: {}", callbackList); for (java.util.function.Consumer<byte[]> callback : callbackList) { + logger.debug("before: {}", callbackList); callback.accept(message); + logger.debug("after: {}", callbackList); } } ack.onSuccess(null); // always acknowledge message diff --git a/ragconnect.base/src/main/resources/RagConnectObserver.jadd b/ragconnect.base/src/main/resources/RagConnectObserver.jadd new file mode 100644 index 0000000000000000000000000000000000000000..3658f7b6211802ec795b0b608770256c83b0f97c --- /dev/null +++ b/ragconnect.base/src/main/resources/RagConnectObserver.jadd @@ -0,0 +1,54 @@ +aspect RagConnectObserver { + class RagConnectObserver implements ASTState.Trace.Receiver { + ASTState.Trace.Receiver oldReceiver; + class RagConnectObserverEntry { + ASTNode node; + String attributeString; + Runnable attributeCall; + RagConnectObserverEntry(ASTNode node, String attributeString, Runnable attributeCall) { + this.node = node; + this.attributeString = attributeString; + this.attributeCall = attributeCall; + } + } + java.util.List<RagConnectObserverEntry> observedNodes = new java.util.ArrayList<>(); + RagConnectObserver(ASTNode node) { + // set the receiver. potentially dangerous because overriding existing receiver! + oldReceiver = node.trace().getReceiver(); + node.trace().setReceiver(this); + } + void add(ASTNode node, String attributeString, Runnable attributeCall) { + System.out.println("** observer add " + node + " on " + attributeString); + observedNodes.add(new RagConnectObserverEntry(node, attributeString, attributeCall)); + } + void remove(ASTNode node, String attributeString, Runnable attributeCall) { + observedNodes.remove(new RagConnectObserverEntry(node, attributeString, attributeCall)); + } + @Override + public void accept(ASTState.Trace.Event event, ASTNode node, String attribute, Object params, Object value) { + oldReceiver.accept(event, node, attribute, params, value); + // ignore all events but INC_FLUSH_ATTR + if (event != ASTState.Trace.Event.INC_FLUSH_ATTR) { + return; + } + System.out.println("** observer check INC_FLUSH_ATTR event"); + // iterate through list, if matching pair. could maybe be more efficient. + for (RagConnectObserverEntry entry : observedNodes) { + if (entry.node.equals(node) && entry.attributeString.equals(attribute)) { + // hit. call the attribute/nta-token + System.out.println("** observer hit " + entry.node + " on " + entry.attributeString); + entry.attributeCall.run(); + } + } + } + } + + private static RagConnectObserver ASTNode._ragConnectObserverInstance; + RagConnectObserver ASTNode._ragConnectObserver() { + if (_ragConnectObserverInstance == null) { + // does not matter, which node is used to create the observer as ASTState/tracing is also static + _ragConnectObserverInstance = new RagConnectObserver(this); + } + return _ragConnectObserverInstance; + } +} diff --git a/ragconnect.base/src/main/resources/sendDefinition.mustache b/ragconnect.base/src/main/resources/sendDefinition.mustache index d382645f4b5ecfde148857e3073bb48e78ebebb2..44db8f73db8c4c1484acbb4adc11a4f7f9251275 100644 --- a/ragconnect.base/src/main/resources/sendDefinition.mustache +++ b/ragconnect.base/src/main/resources/sendDefinition.mustache @@ -37,6 +37,11 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam System.err.println("Unknown protocol '" + scheme + "'."); return false; } + _ragConnectObserver().add(this, "{{parentTypeName}}.get{{tokenName}}()", () -> { + if (this.{{updateMethod}}()) { + this.{{writeMethod}}(); + } + }); return true; }