Commit ae4848c1 authored by René Schöne's avatar René Schöne
Browse files

make use of new local jastadd version

parent b609430f
......@@ -167,7 +167,11 @@ public class MqttHandler {
} else {
byte[] message = body.toByteArray();
for (java.util.function.Consumer<byte[]> callback : callbackList) {
callback.accept(message);
try {
callback.accept(message);
} catch (Exception e) {
logger.catching(e);
}
}
}
ack.onSuccess(null); // always acknowledge message
......
......@@ -46,13 +46,15 @@ aspect RagConnect {
{{#incrementalOptionActive}}
aspect RagConnectObserver {
class RagConnectObserver implements ASTState.Trace.Receiver {
ASTState.Trace.Receiver oldReceiver;
class RagConnectObserverEntry {
final ConnectToken connectToken;
final ASTNode node;
final String attributeString;
final Runnable attributeCall;
RagConnectObserverEntry(ConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) {
this.connectToken = connectToken;
this.node = node;
......@@ -60,12 +62,30 @@ aspect RagConnectObserver {
this.attributeCall = attributeCall;
}
}
class RagConnectObserverStartEntry {
final ASTNode node;
final String attributeString;
final Object flushIncToken;
RagConnectObserverStartEntry(ASTNode node, String attributeString, Object flushIncToken) {
this.node = node;
this.attributeString = attributeString;
this.flushIncToken = flushIncToken;
}
}
ASTState.Trace.Receiver oldReceiver;
java.util.List<RagConnectObserverEntry> observedNodes = new java.util.ArrayList<>();
java.util.Set<RagConnectObserverEntry> entryQueue = new java.util.HashSet<>();
RagConnectObserverStartEntry startEntry = null;
RagConnectObserver(ASTNode node) {
// set the receiver. potentially dangerous because overriding existing receiver!
oldReceiver = node.trace().getReceiver();
node.trace().setReceiver(this);
}
void add(ConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) {
{{#loggingEnabledForWrites}}
System.out.println("** observer add: " + node + " on " + attributeString);
......@@ -78,10 +98,38 @@ aspect RagConnectObserver {
@Override
public void accept(ASTState.Trace.Event event, ASTNode node, String attribute, Object params, Object value) {
oldReceiver.accept(event, node, attribute, params, value);
// ignore all events but INC_FLUSH_ATTR
// react to INC_FLUSH_START and remember entry
if (event == ASTState.Trace.Event.INC_FLUSH_START && startEntry == null) {
{{#loggingEnabledForWrites}}
System.out.println("** observer start: " + node + " on " + attribute);
{{/loggingEnabledForWrites}}
startEntry = new RagConnectObserverStartEntry(node, attribute, value);
return;
}
// react to INC_FLUSH_END and process queued entries, if it matches start entry
if (event == ASTState.Trace.Event.INC_FLUSH_END &&
node == startEntry.node &&
attribute == startEntry.attributeString &&
value == startEntry.flushIncToken) {
// create a copy of the queue to avoid entering this again causing an endless recursion
RagConnectObserverEntry[] entriesToProcess = entryQueue.toArray(new RagConnectObserverEntry[entryQueue.size()]);
entryQueue.clear();
startEntry = null;
{{#loggingEnabledForWrites}}
System.out.println("** observer process (" + entriesToProcess.length + "): " + node + " on " + attribute);
{{/loggingEnabledForWrites}}
for (RagConnectObserverEntry entry : entriesToProcess) {
entry.attributeCall.run();
}
return;
}
// ignore all other events but INC_FLUSH_ATTR
if (event != ASTState.Trace.Event.INC_FLUSH_ATTR) {
return;
}
{{#loggingEnabledForWrites}}
System.out.println("** observer check INC_FLUSH_ATTR event: " + node + " on " + attribute);
{{/loggingEnabledForWrites}}
......@@ -92,7 +140,7 @@ aspect RagConnectObserver {
{{#loggingEnabledForWrites}}
System.out.println("** observer hit: " + entry.node + " on " + entry.attributeString);
{{/loggingEnabledForWrites}}
entry.attributeCall.run();
entryQueue.add(entry);
}
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment