Skip to content
Snippets Groups Projects
Commit 8dfb50bb authored by René Schöne's avatar René Schöne
Browse files

working on forwarding using implicit NTAs

- use local version of jastadd with fixed flush-inc-attr event
- use changed jastadd to check params of that event (the index of the NTA in this case)
- fixed cleanup of ragconnect-observer
- disconnect now is successful, if one sub-publisher removed the given token
- still todo: finish test, check "check" method
- use packaged jastadd version once it incorporates local changes
parent 790e3f51
No related branches found
No related tags found
1 merge request!24Resolve "Feature: Send enpoint for non-terminal using implicit NTA"
Pipeline #12253 failed
This commit is part of merge request !24. Comments created here will be created in the context of that merge request.
Showing with 127 additions and 44 deletions
No preview for this file type
......@@ -22,7 +22,10 @@ mainClassName = 'org.jastadd.ragconnect.compiler.Compiler'
repositories {
mavenCentral()
jcenter()
maven {
name "gitlab-maven"
url "https://git-st.inf.tu-dresden.de/api/v4/groups/jastadd/-/packages/maven"
}
}
tasks.compileJava {
options.release.set(11)
......@@ -31,8 +34,8 @@ tasks.compileJava {
dependencies {
implementation project(':relast-preprocessor')
implementation group: 'com.github.spullara.mustache.java', name: 'compiler', version: "${mustache_java_version}"
// runtimeOnly group: 'org.jastadd', name: 'jastadd', version: '2.3.5'
runtimeOnly fileTree(include: ['jastadd2.jar'], dir: '../libs')
runtimeOnly group: 'org.jastadd', name: 'jastadd2', version: '2.3.5-dresden'
// runtimeOnly fileTree(include: ['jastadd2.jar'], dir: '../libs')
api group: 'net.sf.beaver', name: 'beaver-rt', version: '0.9.11'
}
......
......@@ -4,6 +4,10 @@ Design considerations
- no complete intermediate structure, but instead single nodes where applicable/needed
*/
aspect NewStuff {
// unsorted
syn String RagConnect.observerInstanceFieldName() = internalRagConnectPrefix() + "ObserverInstance";
syn String RagConnect.observerInstanceSingletonMethodName() = internalRagConnectPrefix() + "Observer";
syn String RagConnect.observerInstanceResetMethodName() = internalRagConnectPrefix() + "resetObserver";
// send.mustache
syn boolean EndpointDefinition.needForwardingNTA() = getEndpointTarget().needForwardingNTA();
syn String EndpointDefinition.forwardingNTA_Name() = getEndpointTarget().forwardingNTA_Name();
......
......@@ -13,6 +13,8 @@ aspect RagConnectHandler {
{{#Handlers}}
{{#InUse}}{{fieldName}}.close();{{/InUse}}
{{/Handlers}}
trace().setReceiver({{observerInstanceSingletonMethodName}}().oldReceiver);
{{observerInstanceResetMethodName}}();
}
{{#mqttHandler}}
......@@ -73,20 +75,33 @@ aspect RagConnectHandler {
}
boolean remove(RagConnectToken token) {
if (tokenToSender == null) {
System.err.println("Removing sender before first addition for " + token.entityName + " at " + token.uri);
String errorMessage = internal_remove(token);
if (errorMessage == null) {
return true;
} else {
System.err.println(errorMessage);
return false;
}
}
/**
* (internal) Removes the token, returning an error message if there is one.
* @param token the token to be removed
* @return an error message (upon error), or null (upon success)
*/
String internal_remove(RagConnectToken token) {
if (tokenToSender == null) {
return "Removing sender before first addition for " + token.entityName + " at " + token.uri;
}
Runnable sender = tokenToSender.remove(token);
if (sender == null) {
System.err.println("Could not find connected sender for " + token.entityName + " at " + token.uri);
return false;
return "Could not find connected sender for " + token.entityName + " at " + token.uri;
}
boolean success = senders.remove(sender);
if (senders.isEmpty()) {
lastValue = null;
}
return success;
return success ? null : "Could not remove sender for " + token.entityName + " at " + token.uri;
}
void run() {
......@@ -111,9 +126,22 @@ aspect RagConnectHandler {
boolean remove(RagConnectToken token) {
// publishers.forEach((index, publisher) -> publisher.remove(token));
return publishers.values().stream()
.map(publisher -> publisher.remove(token))
.reduce(true, (result, success) -> result && success);
// remove token from each publisher, at least one has to successfully remove the token to make this call a success
boolean result = false;
java.util.List<String> errorMessages = new java.util.ArrayList<>();
for (RagConnectPublisher publisher : publishers.values()) {
String errorMessage = publisher.internal_remove(token);
if (errorMessage == null) {
result = true;
} else {
errorMessages.add(errorMessage);
}
}
if (!result) {
// only print error message, if all publishers failed to remove the token
errorMessages.stream().forEachOrdered(System.err::println);
}
return result;
}
void run(int index) {
......
......@@ -57,12 +57,17 @@ aspect RagConnectObserver {
final RagConnectToken connectToken;
final ASTNode node;
final String attributeString;
final boolean compareParams;
final Object params;
final Runnable attributeCall;
RagConnectObserverEntry(RagConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) {
RagConnectObserverEntry(RagConnectToken connectToken, ASTNode node, String attributeString,
boolean compareParams, Object params, Runnable attributeCall) {
this.connectToken = connectToken;
this.node = node;
this.attributeString = attributeString;
this.compareParams = compareParams;
this.params = params;
this.attributeCall = attributeCall;
}
}
......@@ -96,11 +101,21 @@ aspect RagConnectObserver {
}
void add(RagConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) {
internal_add(connectToken, node, attributeString, false, null, attributeCall);
}
void add(RagConnectToken connectToken, ASTNode node, String attributeString, Object params, Runnable attributeCall) {
internal_add(connectToken, node, attributeString, true, params, attributeCall);
}
private void internal_add(RagConnectToken connectToken, ASTNode node, String attributeString,
boolean compareParams, Object params, Runnable attributeCall) {
{{#configLoggingEnabledForIncremental}}
System.out.println("** observer add: " + node + " on " + attributeString);
System.out.println("** observer add: " + node + " on " + attributeString + (compareParams ? " (parameterized)" : ""));
{{/configLoggingEnabledForIncremental}}
observedNodes.add(new RagConnectObserverEntry(connectToken, node, attributeString, attributeCall));
observedNodes.add(new RagConnectObserverEntry(connectToken, node, attributeString,
compareParams, params, attributeCall));
}
void remove(RagConnectToken connectToken) {
observedNodes.removeIf(entry -> entry.connectToken.equals(connectToken));
}
......@@ -127,7 +142,7 @@ aspect RagConnectObserver {
entryQueue.clear();
startEntry = null;
{{#configLoggingEnabledForIncremental}}
System.out.println("** observer process (" + entriesToProcess.length + "): " + node + " on " + attribute);
System.out.println("** observer process (entries: " + entriesToProcess.length + "): " + node + " on " + attribute);
{{/configLoggingEnabledForIncremental}}
for (RagConnectObserverEntry entry : entriesToProcess) {
entry.attributeCall.run();
......@@ -146,7 +161,7 @@ aspect RagConnectObserver {
{{/configLoggingEnabledForIncremental}}
// iterate through list, if matching pair. could maybe be more efficient.
for (RagConnectObserverEntry entry : observedNodes) {
if (entry.node.equals(node) && entry.attributeString.equals(attribute)) {
if (entry.node.equals(node) && entry.attributeString.equals(attribute) && (!entry.compareParams || java.util.Objects.equals(entry.params, params))) {
// hit. call the attribute/nta-token
{{#configLoggingEnabledForIncremental}}
System.out.println("** observer hit: " + entry.node + " on " + entry.attributeString);
......@@ -162,13 +177,16 @@ aspect RagConnectObserver {
}
}
private static RagConnectObserver ASTNode.{{internalRagConnectPrefix}}ObserverInstance;
RagConnectObserver ASTNode.{{internalRagConnectPrefix}}Observer() {
if ({{internalRagConnectPrefix}}ObserverInstance == null) {
private static RagConnectObserver ASTNode.{{observerInstanceFieldName}};
RagConnectObserver ASTNode.{{observerInstanceSingletonMethodName}}() {
if ({{observerInstanceFieldName}} == null) {
// does not matter, which node is used to create the observer as ASTState/tracing is also static
{{internalRagConnectPrefix}}ObserverInstance = new RagConnectObserver(this);
{{observerInstanceFieldName}} = new RagConnectObserver(this);
}
return {{observerInstanceFieldName}};
}
return {{internalRagConnectPrefix}}ObserverInstance;
void ASTNode.{{observerInstanceResetMethodName}}() {
{{observerInstanceFieldName}} = null;
}
}
{{/configIncrementalOptionActive}}
......@@ -42,11 +42,17 @@ public boolean {{parentTypeName}}.{{connectMethodName}}(String {{connectParamete
connectTokenMap.add(this, false, connectToken);
{{#configIncrementalOptionActive}}
{{!todo maybe getterMethodName needs to be change for indexed send}}
{{internalRagConnectPrefix}}Observer().add(connectToken, this, "{{getterMethodName}}", () -> {
{{observerInstanceSingletonMethodName}}().add(
connectToken,
this,
"{{getterMethodName}}{{#IndexBasedListAccess}}_int{{/IndexBasedListAccess}}",
{{#IndexBasedListAccess}}index,{{/IndexBasedListAccess}}
() -> {
if (this.{{updateMethodName}}({{#IndexBasedListAccess}}index{{/IndexBasedListAccess}})) {
this.{{writeMethodName}}({{#IndexBasedListAccess}}index{{/IndexBasedListAccess}});
}
});
}
);
{{/configIncrementalOptionActive}}
}
return success;
......@@ -61,7 +67,7 @@ public boolean {{parentTypeName}}.{{disconnectMethodName}}(String {{connectParam
return false;
}
{{#configIncrementalOptionActive}}
connectTokens.forEach(token -> {{internalRagConnectPrefix}}Observer().remove(token));
connectTokens.forEach(token -> {{observerInstanceSingletonMethodName}}().remove(token));
{{/configIncrementalOptionActive}}
RagConnectDisconnectHandlerMethod disconnectingMethod;
switch (scheme) {
......
......@@ -31,6 +31,10 @@ group = 'de.tudresden.inf.st'
repositories {
mavenCentral()
maven {
name "gitlab-maven"
url "https://git-st.inf.tu-dresden.de/api/v4/groups/jastadd/-/packages/maven"
}
}
tasks.compileTestJava {
options.release.set(11)
......@@ -39,8 +43,8 @@ tasks.compileTestJava {
dependencies {
implementation project(':ragconnect.base')
runtimeOnly group: 'org.jastadd', name: 'jastadd', version: '2.3.5-dresden'
// runtimeOnly fileTree(include: ['jastadd2.jar'], dir: '../libs')
// runtimeOnly group: 'org.jastadd', name: 'jastadd', version: '2.3.5-dresden'
runtimeOnly fileTree(include: ['jastadd2.jar'], dir: '../libs')
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.4.0'
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: '5.4.0'
......
......@@ -12,3 +12,12 @@ aspect MakeCodeWork {
return tree;
}
}
aspect NameResolution {
// overriding customID guarantees to produce the same JSON representation for equal lists
// otherwise, the value for id is different each time
@Override
protected String A.customID() {
return getClass().getSimpleName() + getValue();
}
}
package org.jastadd.ragconnect.tests;
import defaultOnlyRead.ast.MqttHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.*;
import java.io.IOException;
......@@ -16,9 +18,10 @@ public abstract class AbstractMqttTest {
private static boolean checkDone = false;
protected static MqttHandler publisher;
protected Logger logger = LogManager.getLogger(getClass());
@BeforeAll
public static void createPublishAndOnceCheckMqttConnection() {
public static void createPublisherAndCheckMqttConnectionOnce() {
boolean checkResult;
try {
publisher = new MqttHandler("Publisher")
......@@ -45,9 +48,11 @@ public abstract class AbstractMqttTest {
@Tag("mqtt")
@Test
public final void testCommunicateSendInitialValue() throws IOException, InterruptedException {
logger.debug("Start testCommunicateSendInitialValue");
createModel();
setupReceiverAndConnect(true);
logger.debug("Calling communicateSendInitialValue");
communicateSendInitialValue();
}
......@@ -60,9 +65,11 @@ public abstract class AbstractMqttTest {
@Tag("mqtt")
@Test
public final void testCommunicateOnlyUpdatedValue() throws IOException, InterruptedException {
logger.debug("Start testCommunicateOnlyUpdatedValue");
createModel();
setupReceiverAndConnect(false);
logger.debug("Calling communicateOnlyUpdatedValue");
communicateOnlyUpdatedValue();
}
......@@ -80,10 +87,12 @@ public abstract class AbstractMqttTest {
/**
* Begin with this snippet
* <pre>
* {@code
* model.ragconnectSetupMqttWaitUntilReady(2, TimeUnit.SECONDS);
*
* handler = new MqttHandler().dontSendWelcomeMessage().setHost(TestUtils.getMqttHost());
* assertTrue(handler.waitUntilReady(2, TimeUnit.SECONDS));
* }
* </pre>
*
* And then add dependencies, initialise receiver, add connections to those receivers,
......@@ -94,18 +103,21 @@ public abstract class AbstractMqttTest {
@AfterEach
public void alwaysCloseConnections() {
logger.debug("Closing connections");
closeConnections();
}
/**
* Write the following snippet (using your correct handler and model):
* <pre>
* {@code
* if (handler != null) {
* handler.close();
* }
* if (model != null) {
* model.ragconnectCloseConnections();
* }
* }
* </pre>
*/
protected abstract void closeConnections();
......
......@@ -15,7 +15,8 @@ import static org.assertj.core.groups.Tuple.tuple;
import static org.awaitility.Awaitility.await;
import static org.jastadd.ragconnect.tests.TestUtils.mqttUri;
import static org.jastadd.ragconnect.tests.TestUtils.waitForMqtt;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test case "forwarding".
......@@ -38,6 +39,7 @@ public class IndexedSendTest extends AbstractMqttTest {
/** Use initially created members as values in {@link #check} todo link correct method */
private static final String INITIAL_VALUE = "initial" + rand.nextInt(100);
private static final Logger logger = LogManager.getLogger(IndexedSendTest.class);
private MqttHandler handler;
private ReceiverData data;
......@@ -101,7 +103,6 @@ public class IndexedSendTest extends AbstractMqttTest {
@Override
protected void communicateSendInitialValue() throws IOException, InterruptedException {
// TODO check
// Sink.ManyA <-- Root.MultipleA
// Sink.ManyAWithSuffix <-- Root.MultipleAWithSuffix
checkNoWait(4, tuple("am0", "am1"), tuple("am0post", "am1post"));
......@@ -140,7 +141,6 @@ public class IndexedSendTest extends AbstractMqttTest {
// TODO check
assertEquals(listA0.getValue(), senderRoot._ragconnect_MultipleA(0).getValue());
System.out.println("before changing value");
listA0.setValue("changedValue");
assertEquals(listA0.getValue(), senderRoot._ragconnect_MultipleA(0).getValue());
check(1, tuple("changedValue"), tuple());
......@@ -244,16 +244,15 @@ public class IndexedSendTest extends AbstractMqttTest {
private static class ReceiverData {
int numberOfValues = 0;
}
}
class PrintingReceiver implements ASTState.Trace.Receiver {
static class PrintingReceiver implements ASTState.Trace.Receiver {
static Logger logger = LogManager.getLogger(PrintingReceiver.class);
@Override
public void accept(ASTState.Trace.Event event, ASTNode node, String attribute, Object params, Object value) {
public void accept(ASTState.Trace.Event event, @SuppressWarnings("rawtypes") ASTNode node, String attribute, Object params, Object value) {
logger.info("event: {}, node: {}, attribute: {}, params: {}, value: {}",
event, node, attribute, params, value);
}
}
}
......@@ -2,7 +2,7 @@
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level [%t] %logger{20} - %msg%n"/>
<PatternLayout pattern="%highlight{%d{HH:mm:ss.SSS} %-5level [%t] %logger{20} - %msg%n}" disableAnsi="false"/>
</Console>
</Appenders>
<Loggers>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment