Commit 16a91b3e authored by René Schöne's avatar René Schöne
Browse files

first try for incremental eval usage

parent 24062cab
......@@ -78,6 +78,7 @@ public class Compiler extends AbstractCompiler {
printMessage("Writing output files");
final List<String> handlers = new ArrayList<>();
handlers.add("RagConnectObserver.jadd");
if (ASTNode.usesMqtt) {
handlers.add("MqttHandler.jadd");
}
......
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
aspect MqttHandler {
......@@ -160,13 +161,17 @@ public class MqttHandler {
org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
// this method is called, whenever a MQTT message is received
String topicString = topic.toString();
java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topicString);
// TODO: maybe copy list here to avoid concurrent modification. or use a concurrent list.
java.util.List<java.util.function.Consumer<byte[]>> callbackList = new java.util.ArrayList<>(callbacks.get(topicString));
if (callbackList == null || callbackList.isEmpty()) {
logger.debug("Got a message, but no callback to call. Forgot to subscribe?");
logger.debug("Got a message at {}, but no callback to call. Forgot to subscribe?", topic);
} else {
byte[] message = body.toByteArray();
logger.debug("initial: {}", callbackList);
for (java.util.function.Consumer<byte[]> callback : callbackList) {
logger.debug("before: {}", callbackList);
callback.accept(message);
logger.debug("after: {}", callbackList);
}
}
ack.onSuccess(null); // always acknowledge message
......
aspect RagConnectObserver {
class RagConnectObserver implements ASTState.Trace.Receiver {
ASTState.Trace.Receiver oldReceiver;
class RagConnectObserverEntry {
ASTNode node;
String attributeString;
Runnable attributeCall;
RagConnectObserverEntry(ASTNode node, String attributeString, Runnable attributeCall) {
this.node = node;
this.attributeString = attributeString;
this.attributeCall = attributeCall;
}
}
java.util.List<RagConnectObserverEntry> observedNodes = new java.util.ArrayList<>();
RagConnectObserver(ASTNode node) {
// set the receiver. potentially dangerous because overriding existing receiver!
oldReceiver = node.trace().getReceiver();
node.trace().setReceiver(this);
}
void add(ASTNode node, String attributeString, Runnable attributeCall) {
System.out.println("** observer add " + node + " on " + attributeString);
observedNodes.add(new RagConnectObserverEntry(node, attributeString, attributeCall));
}
void remove(ASTNode node, String attributeString, Runnable attributeCall) {
observedNodes.remove(new RagConnectObserverEntry(node, attributeString, attributeCall));
}
@Override
public void accept(ASTState.Trace.Event event, ASTNode node, String attribute, Object params, Object value) {
oldReceiver.accept(event, node, attribute, params, value);
// ignore all events but INC_FLUSH_ATTR
if (event != ASTState.Trace.Event.INC_FLUSH_ATTR) {
return;
}
System.out.println("** observer check INC_FLUSH_ATTR event");
// iterate through list, if matching pair. could maybe be more efficient.
for (RagConnectObserverEntry entry : observedNodes) {
if (entry.node.equals(node) && entry.attributeString.equals(attribute)) {
// hit. call the attribute/nta-token
System.out.println("** observer hit " + entry.node + " on " + entry.attributeString);
entry.attributeCall.run();
}
}
}
}
private static RagConnectObserver ASTNode._ragConnectObserverInstance;
RagConnectObserver ASTNode._ragConnectObserver() {
if (_ragConnectObserverInstance == null) {
// does not matter, which node is used to create the observer as ASTState/tracing is also static
_ragConnectObserverInstance = new RagConnectObserver(this);
}
return _ragConnectObserverInstance;
}
}
......@@ -37,6 +37,11 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
System.err.println("Unknown protocol '" + scheme + "'.");
return false;
}
_ragConnectObserver().add(this, "{{parentTypeName}}.get{{tokenName}}()", () -> {
if (this.{{updateMethod}}()) {
this.{{writeMethod}}();
}
});
return true;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment