Skip to content
Snippets Groups Projects
Commit 8d587b45 authored by René Schöne's avatar René Schöne
Browse files

Testing incremental dependency tracking.

- use new branch of preprocessor for newest jastadd version
parent 29110428
No related branches found
No related tags found
1 merge request!5Testing incremental dependency tracking.
This commit is part of merge request !5. Comments created here will be created in the context of that merge request.
Showing
with 239 additions and 8 deletions
[submodule "relast-preprocessor"]
path = relast-preprocessor
url = ../relast-preprocessor.git
branch = jastadd-fix-inc-param-debug
[submodule "ragconnect.base/src/main/jastadd/mustache"]
path = ragconnect.base/src/main/jastadd/mustache
url = ../mustache
File added
......@@ -225,6 +225,7 @@ public class Compiler extends AbstractCompiler {
ASTNode.loggingEnabledForWrites = optionLogWrites.value();
// reuse "--incremental" option of JastAdd
ASTNode.incrementalOptionActive = getConfiguration().incremental() && getConfiguration().traceFlush();
printMessage("ASTNode.incrementalOptionActive = " + ASTNode.incrementalOptionActive);
ASTNode.usesMqtt = optionProtocols.hasValue(OPTION_PROTOCOL_MQTT);
ASTNode.usesRest = optionProtocols.hasValue(OPTION_PROTOCOL_REST);
return ragConnect;
......
......@@ -161,17 +161,13 @@ public class MqttHandler {
org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
// this method is called, whenever a MQTT message is received
String topicString = topic.toString();
// TODO: maybe copy list here to avoid concurrent modification. or use a concurrent list.
java.util.List<java.util.function.Consumer<byte[]>> callbackList = new java.util.ArrayList<>(callbacks.get(topicString));
if (callbackList == null || callbackList.isEmpty()) {
logger.debug("Got a message at {}, but no callback to call. Forgot to subscribe?", topic);
} else {
byte[] message = body.toByteArray();
logger.debug("initial: {}", callbackList);
for (java.util.function.Consumer<byte[]> callback : callbackList) {
logger.debug("before: {}", callbackList);
callback.accept(message);
logger.debug("after: {}", callbackList);
}
}
ack.onSuccess(null); // always acknowledge message
......
{{#usesMqtt}}{{> mqtt}}{{/usesMqtt}}
{{> handler}}
aspect ROS2RAG {
aspect RagConnect {
{{#ReceiveDefinitions}}
{{> receiveDefinition}}
{{/ReceiveDefinitions}}
......
......@@ -35,7 +35,9 @@ repositories {
dependencies {
implementation project(':ragconnect.base')
runtime group: 'org.jastadd', name: 'jastadd', version: '2.3.4'
// runtime group: 'org.jastadd', name: 'jastadd', version: '2.3.4'
runtime fileTree(include: ['jastadd2.jar'], dir: '../libs/')
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.4.0'
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.4.0'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.12.1'
......@@ -74,6 +76,7 @@ task allTests(type: Test, dependsOn: testClasses) {
useJUnitPlatform {
includeTags 'mqtt'
// excludeTags '!NewTest'
}
}
......@@ -339,8 +342,46 @@ task compileTutorialTest(type: RelastTest) {
moreInputFiles 'src/test/01-input/tutorial/Test.jadd',
'src/test/02-after-ragconnect/tutorial/MqttHandler.jadd',
'src/test/02-after-ragconnect/tutorial/RagConnect.jadd'
// extraJastAddOptions "--tracing=cache,flush"
}
compileTestJava.dependsOn compileTutorialTest
compileTutorialTest.dependsOn preprocessTutorialTest
// --- Test: incremental ---
task preprocessIncrementalTest(type: JavaExec, group: 'verification') {
doFirst {
delete 'src/test/02-after-ragconnect/incremental/Test.relast',
'src/test/02-after-ragconnect/incremental/MqttHandler.jadd',
'src/test/02-after-ragconnect/incremental/RagConnect.jadd'
}
classpath = sourceSets.main.runtimeClasspath
main = 'org.jastadd.ragconnect.compiler.Compiler'
args '--o=src/test/02-after-ragconnect/incremental',
'src/test/01-input/incremental/Test.relast',
'src/test/01-input/incremental/Test.connect',
'--rootNode=A',
'--tracing=cache,flush',
'--incremental=param',
'--logReads', '--logWrites', '--verbose'
}
task compileIncrementalTest(type: RelastTest) {
useJastAddNames = true
jastAddList = 'JastAddList'
relastFiles 'src/test/02-after-ragconnect/incremental/Test.relast',
'src/test/02-after-ragconnect/incremental/RagConnect.relast'
grammarName = 'src/test/03-after-relast/incremental/incremental'
packageName = 'incremental.ast'
moreInputFiles 'src/test/01-input/incremental/Test.jadd',
'src/test/02-after-ragconnect/incremental/MqttHandler.jadd',
'src/test/02-after-ragconnect/incremental/RagConnect.jadd'
extraJastAddOptions '--tracing=cache,flush',
'--incremental=param',
'--cache=all',
'--rewrite=cnta',
'--flush=full'
}
compileTestJava.dependsOn compileIncrementalTest
compileIncrementalTest.dependsOn preprocessIncrementalTest
# Tutorial
Idea: Test the example from the [documentation](https://jastadd.pages.st.inf.tu-dresden.de/ragconnect/using.html) with activated incremental dependency tracking
// endpoint definitions
receive A.Input ;
send A.OutputOnA ;
send B.OutputOnB using Transformation ;
// mapping definitions
Transformation maps String s to String {:
return s + "Postfix";
:}
// dependency definitions
A.OutputOnA canDependOn A.Input as dependencyA ;
B.OutputOnB canDependOn A.Input as dependencyB ;
aspect Computation {
syn String A.getOutputOnA() = "a" + getInput();
syn String B.getOutputOnB() = "b" + input();
inh String B.input();
eq A.getB().input() = getInput();
}
A ::= <Input:String> /<OutputOnA:String>/ B* ;
B ::= /<OutputOnB:String>/ ;
package org.jastadd.ragconnect.tests;
import incremental.ast.A;
import incremental.ast.B;
import incremental.ast.MqttHandler;
import org.junit.jupiter.api.Tag;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import static org.jastadd.ragconnect.tests.TestUtils.mqttUri;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Testcase "Incremental Dependency".
*
* @author rschoene - Initial contribution
*/
@Tag("NewTest")
public class IncrementalDependencyTest extends AbstractMqttTest {
private static final String TOPIC_IN = "in/a";
private static final String TOPIC_OUT_A = "out/a";
private static final String TOPIC_OUT_B1 = "out/b1";
private static final String TOPIC_OUT_B2 = "out/b2";
private MqttHandler handler;
private A model;
private B b1;
private B b2;
private ReceiverData dataA;
private ReceiverData dataB1;
private ReceiverData dataB2;
@Override
protected void createModel() {
model = new A();
model.setInput("Start");
b1 = new B();
b2 = new B();
model.addB(b1);
model.addB(b2);
}
@Override
protected void setupReceiverAndConnect(boolean writeCurrentValue) throws IOException {
model.ragconnectSetupMqttWaitUntilReady(2, TimeUnit.SECONDS);
handler = new MqttHandler("TestHandler")
.dontSendWelcomeMessage()
.setHost(TestUtils.getMqttHost());
assertTrue(handler.waitUntilReady(2, TimeUnit.SECONDS));
// no dependencies for the model are set here
dataA = new ReceiverData();
dataB1 = new ReceiverData();
dataB2 = new ReceiverData();
handler.newConnection(TOPIC_OUT_A, bytes -> {
dataA.numberOfStringValues += 1;
dataA.lastStringValue = new String(bytes);
});
handler.newConnection(TOPIC_OUT_B1, bytes -> {
dataB1.numberOfStringValues += 1;
dataB1.lastStringValue = new String(bytes);
});
handler.newConnection(TOPIC_OUT_B2, bytes -> {
dataB2.numberOfStringValues += 1;
dataB2.lastStringValue = new String(bytes);
});
model.connectInput(mqttUri(TOPIC_IN));
model.connectOutputOnA(mqttUri(TOPIC_OUT_A), writeCurrentValue);
b1.connectOutputOnB(mqttUri(TOPIC_OUT_B1), writeCurrentValue);
b2.connectOutputOnB(mqttUri(TOPIC_OUT_B2), writeCurrentValue);
}
@Override
protected void communicateSendInitialValue() throws InterruptedException {
// check initial value
TestUtils.waitForMqtt();
checkData(1, "aStart",
"bStartPostfix",
"bStartPostfix");
// send and check new value
sendData("101");
checkData(2, "a101",
"b101Postfix",
"b101Postfix");
// send and check same value
sendData("101");
checkData(2, "a101",
"b101Postfix",
"b101Postfix");
// send and check new value
sendData("201");
checkData(3, "a201",
"b201Postfix",
"b201Postfix");
}
@Override
protected void communicateOnlyUpdatedValue() throws InterruptedException {
// check initial value
TestUtils.waitForMqtt();
checkData(0, null,
null,
null);
// send and check new value
sendData("102");
checkData(1, "a102",
"b102Postfix",
"b102Postfix");
// send and check same value
sendData("102");
checkData(1, "a102",
"b102Postfix",
"b102Postfix");
// send and check new value
sendData("202");
checkData(2, "a202",
"b202Postfix",
"b202Postfix");
}
@Override
protected void closeConnections() {
if (handler != null) {
handler.close();
}
if (model != null) {
model.ragconnectCloseConnections();
}
}
private void sendData(String input) throws InterruptedException {
handler.publish(TOPIC_IN, input.getBytes());
TestUtils.waitForMqtt();
}
private void checkData(int expectedNumberOfValues, String expectedLastAValue,
String expectedLastB1Value, String expectedLastB2Value) {
dataA.assertEqualData(expectedNumberOfValues, expectedLastAValue);
dataB1.assertEqualData(expectedNumberOfValues, expectedLastB1Value);
dataB2.assertEqualData(expectedNumberOfValues, expectedLastB2Value);
}
private static class ReceiverData {
String lastStringValue;
int numberOfStringValues = 0;
public void assertEqualData(int expectedNumberOfValues, String expectedLastValue) {
assertEquals(expectedNumberOfValues, this.numberOfStringValues);
assertEquals(expectedLastValue, this.lastStringValue);
}
}
}
Subproject commit c00441c03dc6723a08de0fcb041254a99497774f
Subproject commit b538a7f709167c5f56fe65e6d9e9f02179cacaef
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment