diff --git a/ragconnect.base/src/main/resources/MqttHandler.jadd b/ragconnect.base/src/main/resources/MqttHandler.jadd index ab56c0405dd518204378d840d85954e0e27b784f..a22eeec6316fb68fd7498a401e3fd852bd0c15ae 100644 --- a/ragconnect.base/src/main/resources/MqttHandler.jadd +++ b/ragconnect.base/src/main/resources/MqttHandler.jadd @@ -167,7 +167,11 @@ public class MqttHandler { } else { byte[] message = body.toByteArray(); for (java.util.function.Consumer<byte[]> callback : callbackList) { - callback.accept(message); + try { + callback.accept(message); + } catch (Exception e) { + logger.catching(e); + } } } ack.onSuccess(null); // always acknowledge message diff --git a/ragconnect.base/src/main/resources/ragconnect.mustache b/ragconnect.base/src/main/resources/ragconnect.mustache index 2cc2423cc2008ea80f6e614cd0eabea471692cbe..35db18c4335f7f0aafd503a0dcd88148408002a9 100644 --- a/ragconnect.base/src/main/resources/ragconnect.mustache +++ b/ragconnect.base/src/main/resources/ragconnect.mustache @@ -46,13 +46,15 @@ aspect RagConnect { {{#incrementalOptionActive}} aspect RagConnectObserver { + class RagConnectObserver implements ASTState.Trace.Receiver { - ASTState.Trace.Receiver oldReceiver; + class RagConnectObserverEntry { final ConnectToken connectToken; final ASTNode node; final String attributeString; final Runnable attributeCall; + RagConnectObserverEntry(ConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) { this.connectToken = connectToken; this.node = node; @@ -60,12 +62,30 @@ aspect RagConnectObserver { this.attributeCall = attributeCall; } } + + class RagConnectObserverStartEntry { + final ASTNode node; + final String attributeString; + final Object flushIncToken; + RagConnectObserverStartEntry(ASTNode node, String attributeString, Object flushIncToken) { + this.node = node; + this.attributeString = attributeString; + this.flushIncToken = flushIncToken; + } + } + + ASTState.Trace.Receiver oldReceiver; + java.util.List<RagConnectObserverEntry> observedNodes = new java.util.ArrayList<>(); + java.util.Set<RagConnectObserverEntry> entryQueue = new java.util.HashSet<>(); + RagConnectObserverStartEntry startEntry = null; + RagConnectObserver(ASTNode node) { // set the receiver. potentially dangerous because overriding existing receiver! oldReceiver = node.trace().getReceiver(); node.trace().setReceiver(this); } + void add(ConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) { {{#loggingEnabledForWrites}} System.out.println("** observer add: " + node + " on " + attributeString); @@ -78,10 +98,38 @@ aspect RagConnectObserver { @Override public void accept(ASTState.Trace.Event event, ASTNode node, String attribute, Object params, Object value) { oldReceiver.accept(event, node, attribute, params, value); - // ignore all events but INC_FLUSH_ATTR + // react to INC_FLUSH_START and remember entry + if (event == ASTState.Trace.Event.INC_FLUSH_START && startEntry == null) { + {{#loggingEnabledForWrites}} + System.out.println("** observer start: " + node + " on " + attribute); + {{/loggingEnabledForWrites}} + startEntry = new RagConnectObserverStartEntry(node, attribute, value); + return; + } + + // react to INC_FLUSH_END and process queued entries, if it matches start entry + if (event == ASTState.Trace.Event.INC_FLUSH_END && + node == startEntry.node && + attribute == startEntry.attributeString && + value == startEntry.flushIncToken) { + // create a copy of the queue to avoid entering this again causing an endless recursion + RagConnectObserverEntry[] entriesToProcess = entryQueue.toArray(new RagConnectObserverEntry[entryQueue.size()]); + entryQueue.clear(); + startEntry = null; + {{#loggingEnabledForWrites}} + System.out.println("** observer process (" + entriesToProcess.length + "): " + node + " on " + attribute); + {{/loggingEnabledForWrites}} + for (RagConnectObserverEntry entry : entriesToProcess) { + entry.attributeCall.run(); + } + return; + } + + // ignore all other events but INC_FLUSH_ATTR if (event != ASTState.Trace.Event.INC_FLUSH_ATTR) { return; } + {{#loggingEnabledForWrites}} System.out.println("** observer check INC_FLUSH_ATTR event: " + node + " on " + attribute); {{/loggingEnabledForWrites}} @@ -92,7 +140,7 @@ aspect RagConnectObserver { {{#loggingEnabledForWrites}} System.out.println("** observer hit: " + entry.node + " on " + entry.attributeString); {{/loggingEnabledForWrites}} - entry.attributeCall.run(); + entryQueue.add(entry); } } }