Skip to content
Snippets Groups Projects
Commit 27cf1820 authored by René Schöne's avatar René Schöne
Browse files

working on attributes as endpoint target

- observer-entry now has list of connect-tokens and attributeCall is invoked once when attribute is flushed
- INC_FLUSH_START and INC_FLUSH_END can now be nested
- finished AttributeTest
parent 2a281c3e
No related branches found
No related tags found
1 merge request!25Resolve "Feature: Send endpoint for attributes"
Pipeline #12541 failed
This commit is part of merge request !25. Comments created here will be created in the context of that merge request.
......@@ -87,22 +87,33 @@ aspect RagConnectObserver {
class RagConnectObserver implements ASTState.Trace.Receiver {
class RagConnectObserverEntry {
final RagConnectToken connectToken;
final ASTNode node;
final String attributeString;
final boolean compareParams;
final Object params;
final Runnable attributeCall;
final java.util.List<RagConnectToken> connectList = new java.util.ArrayList<>();
RagConnectObserverEntry(RagConnectToken connectToken, ASTNode node, String attributeString,
RagConnectObserverEntry(ASTNode node, String attributeString,
boolean compareParams, Object params, Runnable attributeCall) {
this.connectToken = connectToken;
this.node = node;
this.attributeString = attributeString;
this.compareParams = compareParams;
this.params = params;
this.attributeCall = attributeCall;
}
boolean baseMembersEqualTo(RagConnectObserverEntry other) {
return baseMembersEqualTo(other.node, other.attributeString, other.compareParams, other.params);
}
boolean baseMembersEqualTo(ASTNode otherNode, String otherAttributeString,
boolean otherCompareParams, Object otherParams) {
return this.node.equals(otherNode) &&
this.attributeString.equals(otherAttributeString) &&
this.compareParams == otherCompareParams &&
(!this.compareParams || java.util.Objects.equals(this.params, otherParams));
}
}
{{#configExperimentalJastAdd329}}
......@@ -124,7 +135,7 @@ aspect RagConnectObserver {
{{#configExperimentalJastAdd329}}
java.util.Set<RagConnectObserverEntry> entryQueue = new java.util.HashSet<>();
RagConnectObserverStartEntry startEntry = null;
java.util.Deque<RagConnectObserverStartEntry> startEntries = new java.util.LinkedList<>();
{{/configExperimentalJastAdd329}}
RagConnectObserver(ASTNode node) {
......@@ -145,35 +156,64 @@ aspect RagConnectObserver {
{{#configLoggingEnabledForIncremental}}
System.out.println("** observer add: " + node + " on " + attributeString + (compareParams ? " (parameterized)" : ""));
{{/configLoggingEnabledForIncremental}}
observedNodes.add(new RagConnectObserverEntry(connectToken, node, attributeString,
compareParams, params, attributeCall));
// either add to an existing entry (with same node, attribute) or create new entry
boolean needNewEntry = true;
for (RagConnectObserverEntry entry : observedNodes) {
if (entry.baseMembersEqualTo(node, attributeString, compareParams, params)) {
entry.connectList.add(connectToken);
needNewEntry = false;
break;
}
}
if (needNewEntry) {
RagConnectObserverEntry newEntry = new RagConnectObserverEntry(node, attributeString,
compareParams, params, attributeCall);
newEntry.connectList.add(connectToken);
observedNodes.add(newEntry);
}
}
void remove(RagConnectToken connectToken) {
observedNodes.removeIf(entry -> entry.connectToken.equals(connectToken));
RagConnectObserverEntry entryToDelete = null;
for (RagConnectObserverEntry entry : observedNodes) {
entry.connectList.remove(connectToken);
if (entry.connectList.isEmpty()) {
entryToDelete = entry;
}
}
if (entryToDelete != null) {
observedNodes.remove(entryToDelete);
}
}
@Override
public void accept(ASTState.Trace.Event event, ASTNode node, String attribute, Object params, Object value) {
oldReceiver.accept(event, node, attribute, params, value);
{{#configExperimentalJastAdd329}}
// react to INC_FLUSH_START and remember entry
if (event == ASTState.Trace.Event.INC_FLUSH_START && startEntry == null) {
if (event == ASTState.Trace.Event.INC_FLUSH_START && startEntries.isEmpty()) {
{{#configLoggingEnabledForIncremental}}
System.out.println("** observer start: " + node + " on " + attribute);
{{/configLoggingEnabledForIncremental}}
startEntry = new RagConnectObserverStartEntry(node, attribute, value);
startEntries.addFirst(new RagConnectObserverStartEntry(node, attribute, value));
return;
}
// react to INC_FLUSH_END and process queued entries, if it matches start entry
if (event == ASTState.Trace.Event.INC_FLUSH_END &&
node == startEntry.node &&
if (event == ASTState.Trace.Event.INC_FLUSH_END) {
if (startEntries.isEmpty()) {
{{#configLoggingEnabledForIncremental}}
System.out.println("** observer end without start! for " + node + " on " + attribute);
{{/configLoggingEnabledForIncremental}}
return;
}
RagConnectObserverStartEntry startEntry = startEntries.removeFirst();
if (node == startEntry.node &&
attribute == startEntry.attributeString &&
value == startEntry.flushIncToken) {
// create a copy of the queue to avoid entering this again causing an endless recursion
RagConnectObserverEntry[] entriesToProcess = entryQueue.toArray(new RagConnectObserverEntry[entryQueue.size()]);
entryQueue.clear();
startEntry = null;
{{#configLoggingEnabledForIncremental}}
System.out.println("** observer process (entries: " + entriesToProcess.length + "): " + node + " on " + attribute);
{{/configLoggingEnabledForIncremental}}
......@@ -182,6 +222,7 @@ aspect RagConnectObserver {
}
return;
}
}
{{/configExperimentalJastAdd329}}
// ignore all other events but INC_FLUSH_ATTR
......
......@@ -49,7 +49,7 @@ public boolean {{parentTypeName}}.{{connectMethodName}}(String {{connectParamete
{{#IndexBasedListAccess}}index,{{/IndexBasedListAccess}}
() -> {
if (this.{{updateMethodName}}({{#IndexBasedListAccess}}index{{/IndexBasedListAccess}})) {
this.{{writeMethodName}}({{#IndexBasedListAccess}}index, {{/IndexBasedListAccess}}connectToken);
this.{{writeMethodName}}({{#IndexBasedListAccess}}index{{/IndexBasedListAccess}});
}
}
);
......@@ -106,6 +106,10 @@ protected boolean {{parentTypeName}}.{{updateMethodName}}({{#IndexBasedListAcces
return {{senderName}} != null;
}
protected void {{parentTypeName}}.{{writeMethodName}}({{#IndexBasedListAccess}}int index{{/IndexBasedListAccess}}) {
{{senderName}}.run({{#IndexBasedListAccess}}index{{/IndexBasedListAccess}});
}
protected void {{parentTypeName}}.{{writeMethodName}}({{#IndexBasedListAccess}}int index, {{/IndexBasedListAccess}}RagConnectToken token) {
{{senderName}}.run({{#IndexBasedListAccess}}index, {{/IndexBasedListAccess}}token);
}
......
......@@ -50,7 +50,6 @@ dependencies {
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.4.0'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.12.1'
testImplementation group: 'org.awaitility', name: 'awaitility', version: '4.1.1'
testImplementation group: 'de.tudresden.inf.st', name: 'dumpAst', version: '0.3.5'
// jackson (for serialization of types)
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.12.1'
......@@ -371,7 +370,6 @@ task compileTreeIncremental(type: RagConnectTest) {
inputFiles = [file('src/test/01-input/tree/Test.relast'),
file('src/test/01-input/tree/Test.connect')]
rootNode = 'Root'
logWrites = true
}
relast {
useJastAddNames = true
......@@ -414,7 +412,6 @@ task compileTreeAllowedTokensIncremental(type: RagConnectTest) {
inputFiles = [file('src/test/01-input/treeAllowedTokens/Test.relast'),
file('src/test/01-input/treeAllowedTokens/Test.connect')]
rootNode = 'Root'
logWrites = true
}
relast {
useJastAddNames = true
......@@ -605,9 +602,6 @@ task compileIndexedSendIncremental(type: RagConnectTest, dependsOn: ':ragconnect
inputFiles = [file('src/test/01-input/indexedSend/Test.relast'),
file('src/test/01-input/indexedSend/Test.connect')]
rootNode = 'Root'
logWrites = true
logReads = true
logIncremental = true
extraOptions = ['--experimental-jastadd-329']
}
relast {
......@@ -630,9 +624,6 @@ task compileAttributeIncremental(type: RagConnectTest, dependsOn: ':ragconnect.b
inputFiles = [file('src/test/01-input/attribute/Test.relast'),
file('src/test/01-input/attribute/Test.connect')]
rootNode = 'Root'
logWrites = true
logReads = true
logIncremental = true
extraOptions = ['--experimental-jastadd-329']
}
relast {
......@@ -647,4 +638,3 @@ task compileAttributeIncremental(type: RagConnectTest, dependsOn: ':ragconnect.b
extraOptions = JASTADD_INCREMENTAL_OPTIONS_TRACING_FULL
}
}
compileAttributeIncremental.outputs.upToDateWhen { false }
aspect Computation {
syn String SenderRoot.basic() = getInput();
syn String SenderRoot.simple() = getInput() + "Post";
syn int SenderRoot.transformed() = getInput().length();
syn int SenderRoot.transformed() = Integer.parseInt(getInput());
syn A SenderRoot.toReferenceType() {
A result = new A();
result.setValue(getInput());
......
package org.jastadd.ragconnect.tests;
import attributeInc.ast.*;
import de.tudresden.inf.st.jastadd.dumpAst.ast.Dumper;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.Tag;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static java.util.function.Predicate.isEqual;
import static org.awaitility.Awaitility.await;
import static org.jastadd.ragconnect.tests.TestUtils.mqttUri;
import static org.jastadd.ragconnect.tests.TestUtils.waitForMqtt;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
......@@ -39,6 +38,7 @@ public class AttributeTest extends AbstractMqttTest {
private static final String TOPIC_NTA_WITH_MAPPING = "attr/a/nta/mapped";
private static final String INITIAL_STRING = "initial";
private static final String INITIAL_STRING_FOR_INT = "1";
private MqttHandler handler;
private ReceiverData data;
......@@ -54,7 +54,7 @@ public class AttributeTest extends AbstractMqttTest {
model = new Root();
// model.trace().setReceiver(TestUtils::logEvent);
senderString = new SenderRoot().setInput(INITIAL_STRING);
senderInt = new SenderRoot().setInput(INITIAL_STRING);
senderInt = new SenderRoot().setInput(INITIAL_STRING_FOR_INT);
senderA = new SenderRoot().setInput(INITIAL_STRING);
receiverRoot = new ReceiverRoot();
model.addSenderRoot(senderString);
......@@ -98,6 +98,7 @@ public class AttributeTest extends AbstractMqttTest {
waitForValue(senderString.basic(), receiverRoot::getFromBasic);
waitForValue(senderString.simple(), receiverRoot::getFromSimpleNoMapping);
waitForValue(senderInt.transformed(), receiverRoot::getFromTransformedNoMapping);
waitForNonNull(receiverRoot::getFromReferenceTypeNoMapping);
waitForNonNull(receiverRoot::getFromNTANoMapping);
}
......@@ -116,60 +117,130 @@ public class AttributeTest extends AbstractMqttTest {
@Override
protected void communicateSendInitialValue() throws IOException, InterruptedException {
// basic, simple <-- senderString
// transformed <-- senderInt
// ref-type, nta <-- senderA
check(9, INITIAL_STRING, INITIAL_STRING + "Post", INITIAL_STRING, INITIAL_STRING, INITIAL_STRING);
// basic, simple(2) <-- senderString
// transformed(2) <-- senderInt
// ref-type(2), nta(2) <-- senderA
check(9, INITIAL_STRING, INITIAL_STRING + "Post", INITIAL_STRING_FOR_INT, INITIAL_STRING, INITIAL_STRING);
senderString.setInput("test-01");
check(12, "test-01", "test-01Post", INITIAL_STRING, INITIAL_STRING, INITIAL_STRING);
check(12, "test-01", "test-01Post", INITIAL_STRING_FOR_INT, INITIAL_STRING, INITIAL_STRING);
senderString.setInput("test-01");
check(12, "test-01", "test-01Post", INITIAL_STRING_FOR_INT, INITIAL_STRING, INITIAL_STRING);
senderInt.setInput("20");
check(14, "test-01", "test-01Post", "20", INITIAL_STRING, INITIAL_STRING);
senderA.setInput("test-03");
check(18, "test-01", "test-01Post", "20", "test-03", "test-03");
assertTrue(senderString.disconnectSimple(mqttUri(TOPIC_SIMPLE_NO_MAPPING)));
assertTrue(senderString.disconnectSimple(mqttUri(TOPIC_SIMPLE_WITH_MAPPING)));
senderString.setInput("test-04");
check(19, "test-04", "test-01Post", "20", "test-03", "test-03");
assertTrue(senderA.disconnectToNTA(mqttUri(TOPIC_NTA_NO_MAPPING)));
senderA.setInput("test-05");
check(22, "test-04", "test-01Post", "20", "test-05", "test-03");
}
@Override
protected void communicateOnlyUpdatedValue() throws IOException, InterruptedException {
waitForMqtt();
// basic, simple(2) <-- senderString
// transformed(2) <-- senderInt
// ref-type(2), nta(2) <-- senderA
check(0, null, null, null, null, null);
senderString.setInput("test-01");
check(3, "test-01", "test-01Post", null, null, null);
senderString.setInput("test-01");
check(3, "test-01", "test-01Post", null, null, null);
senderInt.setInput("20");
check(5, "test-01", "test-01Post", "20", null, null);
senderA.setInput("test-03");
check(9, "test-01", "test-01Post", "20", "test-03", "test-03");
assertTrue(senderString.disconnectSimple(mqttUri(TOPIC_SIMPLE_NO_MAPPING)));
assertTrue(senderString.disconnectSimple(mqttUri(TOPIC_SIMPLE_WITH_MAPPING)));
senderString.setInput("test-04");
check(10, "test-04", "test-01Post", "20", "test-03", "test-03");
assertTrue(senderA.disconnectToNTA(mqttUri(TOPIC_NTA_NO_MAPPING)));
senderA.setInput("test-05");
check(13, "test-04", "test-01Post", "20", "test-05", "test-03");
}
private void check(int numberOfValues, String basic, String simple, String transformed,
String refType, String nta) {
String a, String ntaNoMapping) {
awaitEquals(numberOfValues, () -> data.numberOfValues, "numberOfValues");
try {
Path tempFile = Files.createTempFile("receiverRoot", "yml");
Dumper.read(receiverRoot).dumpAsYaml(tempFile, false);
String content = Files.readString(tempFile);
logger.debug("receiverRoot\n" + content);
} catch (IOException e) {
e.printStackTrace();
awaitEquals(Objects.requireNonNullElse(basic, ""),
receiverRoot::getFromBasic, "basic");
if (simple != null) {
awaitEquals(simple,
receiverRoot::getFromSimpleNoMapping, "simple");
awaitEquals(simple + "post",
receiverRoot::getFromSimpleWithMapping, "simple mapped");
} else {
awaitEquals("",
receiverRoot::getFromSimpleNoMapping, "simple null");
awaitEquals("",
receiverRoot::getFromSimpleWithMapping, "simple mapped null");
}
awaitEquals(basic, receiverRoot::getFromBasic, "basic");
awaitEquals(simple, receiverRoot::getFromSimpleNoMapping, "simple");
awaitEquals(simple + "post", receiverRoot::getFromSimpleWithMapping, "simple mapped");
int transformedLength = transformed.length();
awaitEquals(transformedLength, receiverRoot::getFromTransformedNoMapping, "transformed");
awaitEquals(transformedLength + 1, receiverRoot::getFromTransformedWithMapping, "transformed mapped");
if (transformed != null) {
awaitEquals(Integer.parseInt(transformed),
receiverRoot::getFromTransformedNoMapping, "transformed");
awaitEquals(Integer.parseInt(transformed) + 1,
receiverRoot::getFromTransformedWithMapping, "transformed mapped");
} else {
awaitEquals(0,
receiverRoot::getFromTransformedNoMapping, "transformed null");
awaitEquals(0,
receiverRoot::getFromTransformedWithMapping, "transformed mapped null");
}
checkA(refType, "1",
if (a != null) {
awaitA(a, "1",
receiverRoot.getFromReferenceTypeNoMapping(), "ref-type");
checkA(refType + "post", "inner1",
awaitA(a + "post", "inner1",
receiverRoot.getFromReferenceTypeWithMapping(), "ref-type mapped");
awaitA(a + "post", "inner2",
receiverRoot.getFromNTAWithMapping(), "nta mapped");
} else {
awaitNull(receiverRoot::getFromReferenceTypeNoMapping, "manual ref-type null");
awaitNull(receiverRoot::getFromReferenceTypeWithMapping, "ref-type mapped null");
awaitNull(receiverRoot::getFromNTAWithMapping, "nta mapped null");
}
checkA(nta, "2",
if (ntaNoMapping != null) {
awaitA(ntaNoMapping, "2",
receiverRoot.getFromNTANoMapping(), "nta");
checkA(nta + "post", "inner2",
receiverRoot.getFromNTAWithMapping(), "nta mapped");
} else {
awaitNull(receiverRoot::getFromNTANoMapping, "nta null");
}
}
private void checkA(String expectedValue, String expectedInner, A actual, String message) {
private void awaitNull(Supplier<A> actual, String alias) {
internalAwait(alias).until(() -> actual.get() == null);
}
private <T> void awaitEquals(T expected, Callable<T> actual, String alias) {
internalAwait(alias).until(actual, isEqual(expected));
}
private void awaitA(String expectedValue, String expectedInner, A actual, String message) {
awaitEquals(expectedValue, actual::getValue, message + " value");
awaitEquals(expectedInner, actual.getInner()::getInnerValue, message + " inner");
}
private <T> void awaitEquals(T expected, Callable<T> actual, String alias) {
await(alias).atMost(1500, TimeUnit.MILLISECONDS).until(actual, isEqual(expected));
private ConditionFactory internalAwait(String alias) {
return await(alias).atMost(1500, TimeUnit.MILLISECONDS);
}
@Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment