Commit 729568de authored by René Schöne's avatar René Schöne
Browse files

WIP: working on correct connect and disconnect

- introduced new types to encapsulate behaviour: RagConnectTokenMap, RagConnectPublisher
- move lastValue (of sendDefinitions) into new publisher
- create connectToken outside of handlers
- MqttHandler: make wildcardPatterns a list
- RestHandler: update dependency requirement to Spark 3.9.3 in order to unmap existing routes upon disconnect
- testing disconnect functionality in (most) existing tests
- still sometimes (!) getting "Top of handler stack does not match at pop!" Error during connect
parent d79785b9
Pipeline #10648 failed with stages
in 4 minutes and 22 seconds
......@@ -125,6 +125,6 @@ RagConnect itself does not introduce dependencies.
However, depending on the selected protocols (see [compiler options](using#compiler-options)), additional dependencies are required.
| Protocol | Dependency (Gradle format) | Remarks |
|-|-|-|
|---|---|---|
| `mqtt` | `group: 'org.fusesource.mqtt-client', name: 'mqtt-client', version: '1.15'` | Mqtt is selected by default, so this dependency therefore is required "by default". Might work with other versions as well. |
| `rest` | `group: 'com.sparkjava', name: 'spark-core', version: '2.9.2'` | Might work with other versions as well. For debugging, it is beneficial to include an implementation for [SLF4J](http://www.slf4j.org/). |
| `rest` | `group: 'com.sparkjava', name: 'spark-core', version: '2.9.3'` | Might work with newer versions as well. For debugging, it is beneficial to include an implementation for [SLF4J](http://www.slf4j.org/). |
......@@ -64,7 +64,7 @@ aspect AttributesForMustache {
syn boolean MEndpointDefinition.isTypeEndpointDefinition() = endpointDef().isTypeEndpointDefinition();
syn String MEndpointDefinition.disconnectMethod() {
// if both (send and receive) are defined for the token, ensure methods with different names
// if both (send and receive) are defined for an endpoint, ensure methods with different names
String extra;
if (endpointDef().isTokenEndpointDefinition()) {
extra = endpointDef().asTokenEndpointDefinition().lookupTokenEndpointDefinitions(token()).size() > 1 ? uniqueSuffix() : "";
......@@ -114,6 +114,8 @@ aspect AttributesForMustache {
}
return preemptiveExpectedValue() + " != null ? " + preemptiveExpectedValue() + ".equals(" + lastResult() + ") : " + lastResult() + " == null";
}
syn String MEndpointDefinition.sender() = null; // only for M*SendDefinitions
syn String MEndpointDefinition.lastValue() = sender() + ".lastValue"; // only for M*SendDefinitions
// --- MTokenEndpointDefinition ---
eq MTokenEndpointDefinition.getterMethod() = "get" + tokenName();
......@@ -151,8 +153,7 @@ aspect AttributesForMustache {
eq MTokenSendDefinition.updateMethod() = "_update_" + tokenName();
eq MTokenSendDefinition.writeMethod() = "_writeLastValue_" + tokenName();
syn String MTokenSendDefinition.sender() = "_sender_" + tokenName();
syn String MTokenSendDefinition.lastValue() = "_lastValue" + tokenName();
eq MTokenSendDefinition.sender() = "_sender_" + tokenName();
syn String MTokenSendDefinition.tokenResetMethod() = getterMethod() + "_reset";
syn boolean MTokenSendDefinition.shouldSendValue() = endpointDef().asTokenEndpointDefinition().shouldSendValue();
......@@ -175,8 +176,7 @@ aspect AttributesForMustache {
eq MTypeSendDefinition.updateMethod() = "_update_" + typeName();
eq MTypeSendDefinition.writeMethod() = "_writeLastValue_" + typeName();
syn String MTypeSendDefinition.sender() = "_sender_" + typeName();
syn String MTypeSendDefinition.lastValue() = "_lastValue" + typeName();
eq MTypeSendDefinition.sender() = "_sender_" + typeName();
syn String MTypeSendDefinition.tokenResetMethod() = getterMethod() + "_reset";
syn boolean MTypeSendDefinition.shouldSendValue() = endpointDef().asTypeEndpointDefinition().shouldSendValue();
......
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;aspect MqttHandler {
aspect MqttHandler {
public class MqttServerHandler {
private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>();
private final java.util.Map<ConnectToken, java.util.function.BiConsumer<String, byte[]>> tokensForRemoval = new java.util.HashMap<>();
private final java.util.Map<RagConnectToken, java.util.function.BiConsumer<String, byte[]>> tokensForRemoval = new java.util.HashMap<>();
private long time;
private java.util.concurrent.TimeUnit unit;
private String name;
......@@ -39,14 +36,12 @@ public class MqttServerHandler {
return handler;
}
public ConnectToken newConnection(java.net.URI uri, java.util.function.BiConsumer<String, byte[]> callback) throws java.io.IOException {
ConnectToken connectToken = new ConnectToken(uri);
resolveHandler(uri).newConnection(extractTopic(uri), callback);
public boolean newConnection(RagConnectToken connectToken, java.util.function.BiConsumer<String, byte[]> callback) throws java.io.IOException {
tokensForRemoval.put(connectToken, callback);
return connectToken;
return resolveHandler(connectToken.uri).newConnection(extractTopic(connectToken.uri), callback);
}
public boolean disconnect(ConnectToken connectToken) throws java.io.IOException {
public boolean disconnect(RagConnectToken connectToken) throws java.io.IOException {
MqttHandler handler = resolveHandler(connectToken.uri);
return handler != null ? handler.disconnect(extractTopic(connectToken.uri), tokensForRemoval.get(connectToken)) : false;
}
......@@ -89,6 +84,10 @@ public class MqttServerHandler {
* @author rschoene - Initial contribution
*/
public class MqttHandler {
private class PatternCallbackListPair {
java.util.regex.Pattern pattern;
java.util.List<java.util.function.BiConsumer<String, byte[]>> callbacks;
}
private static final int DEFAULT_PORT = 1883;
private final org.apache.logging.log4j.Logger logger;
......@@ -104,7 +103,7 @@ public class MqttHandler {
private org.fusesource.mqtt.client.QoS qos;
/** Dispatch knowledge */
private final java.util.Map<String, java.util.List<java.util.function.BiConsumer<String, byte[]>>> normalCallbacks;
private final java.util.Map<java.util.regex.Pattern, java.util.List<java.util.function.BiConsumer<String, byte[]>>> wildcardCallbacks;
private final java.util.List<PatternCallbackListPair> wildcardCallbacks;
public MqttHandler() {
this("RagConnect");
......@@ -114,7 +113,7 @@ public class MqttHandler {
this.name = java.util.Objects.requireNonNull(name, "Name must be set");
this.logger = org.apache.logging.log4j.LogManager.getLogger(MqttHandler.class);
this.normalCallbacks = new java.util.HashMap<>();
this.wildcardCallbacks = new java.util.HashMap<>();
this.wildcardCallbacks = new java.util.ArrayList<>();
this.readyLatch = new java.util.concurrent.CountDownLatch(1);
this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE;
}
......@@ -244,9 +243,9 @@ public class MqttHandler {
if (normalCallbackList != null) {
result.addAll(normalCallbackList);
}
wildcardCallbacks.forEach((topicPattern, callback) -> {
if (topicPattern.matcher(topicString).matches()) {
result.addAll(callback);
wildcardCallbacks.forEach(pair -> {
if (pair.pattern.matcher(topicString).matches()) {
result.addAll(pair.callbacks);
}
});
return result;
......@@ -295,20 +294,29 @@ public class MqttHandler {
logger.debug("new connection for {}", topic);
final boolean needSubscribe;
if (isWildcardTopic(topic)) {
String regexForTopic = topic.replace("*", "[^/]*").replace("#", ".*");
java.util.regex.Pattern pattern = java.util.regex.Pattern.compile(regexForTopic);
wildcardCallbacks.computeIfAbsent(pattern, p -> new java.util.ArrayList<>())
.add(callback);
needSubscribe = true;
String regex = regexForWildcardTopic(topic);
PatternCallbackListPair pairToAddTo = null;
for (PatternCallbackListPair pair : wildcardCallbacks) {
if (pair.pattern.pattern().equals(regex)) {
pairToAddTo = pair;
break;
}
}
if (pairToAddTo == null) {
pairToAddTo = new PatternCallbackListPair();
pairToAddTo.pattern = java.util.regex.Pattern.compile(regex);
pairToAddTo.callbacks = new ArrayList<>();
wildcardCallbacks.add(pairToAddTo);
}
needSubscribe = pairToAddTo.callbacks.isEmpty();
pairToAddTo.callbacks.add(callback);
} else { // normal topic
java.util.List<java.util.function.BiConsumer<String, byte[]>> callbacksForTopic = normalCallbacks.get(topic);
if (callbacksForTopic == null || callbacksForTopic.isEmpty()) {
if (callbacksForTopic == null) {
callbacksForTopic = new java.util.ArrayList<>();
normalCallbacks.put(topic, callbacksForTopic);
needSubscribe = true;
} else {
needSubscribe = false;
}
needSubscribe = callbacksForTopic.isEmpty();
callbacksForTopic.add(callback);
}
if (needSubscribe) {
......@@ -347,40 +355,49 @@ public class MqttHandler {
return topic.contains("*") || topic.contains("#");
}
private String regexForWildcardTopic(String topic) {
return topic.replace("*", "[^/]*").replace("#", ".*");
}
public boolean disconnect(String topic, java.util.function.BiConsumer<String, byte[]> callback) {
boolean needUnsubscribe = false;
java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>(true);
boolean foundTopicInCallbacks = false;
final String topicToUnsubscribe;
// check if wildcard is to be removed
if (isWildcardTopic(topic)) {
java.util.regex.Pattern wildcardPatternToRemove = null;
for (java.util.Map.Entry<java.util.regex.Pattern, java.util.List<java.util.function.BiConsumer<String, byte[]>>> entry : wildcardCallbacks.entrySet()) {
if (entry.getKey().pattern().equals(topic)) {
foundTopicInCallbacks = true;
boolean topicRegistered = false;
String topicRegex = regexForWildcardTopic(topic);
for (PatternCallbackListPair pair : wildcardCallbacks) {
if (pair.pattern.pattern().equals(topicRegex)) {
topicRegistered = true;
// if still successful, update with whether callback could be removed
success.compareAndSet(true, (entry.getValue().remove(callback)));
if (entry.getValue().isEmpty()) {
wildcardPatternToRemove = entry.getKey();
success.compareAndSet(true, pair.callbacks.remove(callback));
// if no more callbacks left, unsubscribe and remove from list
if (pair.callbacks.isEmpty()) {
needUnsubscribe = true;
wildcardCallbacks.remove(pair.pattern);
}
break;
}
}
;
if (wildcardPatternToRemove != null) {
wildcardCallbacks.remove(wildcardPatternToRemove);
}
topicToUnsubscribe = topicRegistered ? topicRegex : null;
} else if (normalCallbacks.containsKey(topic)) {
foundTopicInCallbacks = true;
// if still successful, update with whether callback could be removed
topicToUnsubscribe = topic;
var normalCallbackList = normalCallbacks.get(topic);
// if still successful, update with whether callback could be removed
success.compareAndSet(true, normalCallbackList.remove(callback));
needUnsubscribe |= normalCallbackList.isEmpty();
// if no more callbacks left, unsubscribe and remove from list
if (normalCallbackList.isEmpty()) {
needUnsubscribe = true;
normalCallbacks.remove(topic);
}
} else {
topicToUnsubscribe = null;
}
if (!foundTopicInCallbacks) {
if (topicToUnsubscribe == null) {
logger.warn("Disconnect for not connected topic '{}'", topic);
return false;
}
......@@ -389,7 +406,7 @@ public class MqttHandler {
java.util.concurrent.CountDownLatch operationFinished = new java.util.concurrent.CountDownLatch(1);
// no callbacks anymore for this topic, unsubscribe from mqtt
connection.getDispatchQueue().execute(() -> {
org.fusesource.hawtbuf.UTF8Buffer topicBuffer = org.fusesource.hawtbuf.Buffer.utf8(topic);
org.fusesource.hawtbuf.UTF8Buffer topicBuffer = org.fusesource.hawtbuf.Buffer.utf8(topicToUnsubscribe);
org.fusesource.hawtbuf.UTF8Buffer[] topicArray = new org.fusesource.hawtbuf.UTF8Buffer[]{topicBuffer};
connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<>() {
@Override
......
......@@ -2,7 +2,7 @@ import java.util.concurrent.TimeUnit;aspect RestHandler {
public class RestServerHandler {
private static final int DEFAULT_PORT = 4567;
private final java.util.Map<Integer, RestHandler> handlers = new java.util.HashMap<>();
private final java.util.Map<ConnectToken, Object> tokensForRemoval = new java.util.HashMap<>();
private final java.util.Map<RagConnectToken, Object> tokensForRemoval = new java.util.HashMap<>();
private String name;
public RestServerHandler() {
......@@ -25,21 +25,19 @@ public class RestServerHandler {
return handler;
}
public ConnectToken newPUTConnection(java.net.URI uri, java.util.function.Consumer<String> callback) {
ConnectToken connectToken = new ConnectToken(uri);
resolveHandler(uri).newPUTConnection(uri.getPath(), callback);
public boolean newPUTConnection(RagConnectToken connectToken, java.util.function.Consumer<String> callback) {
tokensForRemoval.put(connectToken, callback);
return connectToken;
resolveHandler(connectToken.uri).newPUTConnection(connectToken.uri.getPath(), callback);
return true;
}
public ConnectToken newGETConnection(java.net.URI uri, SupplierWithException<String> supplier) {
ConnectToken connectToken = new ConnectToken(uri);
resolveHandler(uri).newGETConnection(uri.getPath(), supplier);
public boolean newGETConnection(RagConnectToken connectToken, SupplierWithException<String> supplier) {
tokensForRemoval.put(connectToken, supplier);
return connectToken;
resolveHandler(connectToken.uri).newGETConnection(connectToken.uri.getPath(), supplier);
return true;
}
public boolean disconnect(ConnectToken connectToken) {
public boolean disconnect(RagConnectToken connectToken) {
RestHandler handler = resolveHandler(connectToken.uri);
return handler != null ? handler.disconnect(connectToken.uri.getPath(), tokensForRemoval.get(connectToken)) : false;
}
......@@ -127,9 +125,15 @@ public class RestHandler {
}
public boolean disconnect(String path, Object callbackOrSupplier) {
// only one will succeed (or false will be returned)
return callbacks.getOrDefault(path, java.util.Collections.emptyList()).remove(callbackOrSupplier) ||
suppliers.remove(path, callbackOrSupplier);
if (callbacks.getOrDefault(path, java.util.Collections.emptyList()).remove(callbackOrSupplier)) {
return true;
}
if (suppliers.remove(path, callbackOrSupplier)) {
// unmap the route
return spark.Spark.unmap(path);
}
System.err.println("Disconnect for not connected path '" + path + "'!");
return false;
}
private String makeError(spark.Response response, int statusCode, String message) {
......
......@@ -14,15 +14,74 @@ aspect RagConnectHandler {
{{#InUse}}{{FieldName}}.close();{{/InUse}}
{{/Handlers}}
}
class ConnectToken {
class RagConnectToken {
static java.util.concurrent.atomic.AtomicLong counter = new java.util.concurrent.atomic.AtomicLong(0);
final long id;
final java.net.URI uri;
public ConnectToken(java.net.URI uri) {
final String entityName;
public RagConnectToken(java.net.URI uri, String entityName) {
this.id = counter.incrementAndGet();
this.uri = uri;
this.entityName = entityName;
}
}
class RagConnectTokenMap {
java.util.Map<ASTNode, java.util.List<RagConnectToken>> connectTokensSend = new java.util.HashMap<>();
java.util.Map<ASTNode, java.util.List<RagConnectToken>> connectTokensReceive = new java.util.HashMap<>();
void add(ASTNode node, boolean isReceive, RagConnectToken token) {
java.util.Map<ASTNode, java.util.List<RagConnectToken>> mapOfTokens = (isReceive ? connectTokensReceive : connectTokensSend);
mapOfTokens.computeIfAbsent(node, n -> new java.util.ArrayList<>()).add(token);
}
java.util.List<RagConnectToken> removeAll(ASTNode node, boolean isReceive, java.net.URI uri, String entityName) {
java.util.List<RagConnectToken> listOfTokens = (isReceive ? connectTokensReceive : connectTokensSend).get(node);
if (listOfTokens == null) {
return java.util.Collections.emptyList();
}
java.util.List<RagConnectToken> tokensToRemove = listOfTokens.stream()
.filter(token -> token.uri.equals(uri) && token.entityName.equals(entityName))
.collect(java.util.stream.Collectors.toList());
listOfTokens.removeAll(tokensToRemove);
return tokensToRemove;
}
}
static RagConnectTokenMap ASTNode.connectTokenMap = new RagConnectTokenMap();
interface RagConnectDisconnectHandlerMethod {
boolean call(RagConnectToken token) throws java.io.IOException;
}
class RagConnectPublisher {
java.util.List<Runnable> senders = new java.util.ArrayList<>();
java.util.Map<RagConnectToken, Runnable> tokenToSender;
byte[] lastValue;
void add(Runnable sender, RagConnectToken connectToken) {
if (tokenToSender == null) {
tokenToSender = new java.util.HashMap<>();
}
senders.add(sender);
tokenToSender.put(connectToken, sender);
}
boolean remove(RagConnectToken token) {
if (tokenToSender == null) {
System.err.println("Removing sender before first addition for " + token.entityName + " at " + token.uri);
return false;
}
Runnable sender = tokenToSender.remove(token);
if (sender == null) {
System.err.println("Could not find connected sender for " + token.entityName + " at " + token.uri);
return false;
}
boolean success = senders.remove(sender);
if (senders.isEmpty()) {
lastValue = null;
}
return success;
}
void run() {
senders.forEach(Runnable::run);
}
}
static java.util.Map<ASTNode, java.util.Map<java.net.URI, ConnectToken>> ASTNode.connectTokens = new java.util.HashMap<>();
}
......@@ -58,12 +58,12 @@ aspect RagConnectObserver {
class RagConnectObserver implements ASTState.Trace.Receiver {
class RagConnectObserverEntry {
final ConnectToken connectToken;
final RagConnectToken connectToken;
final ASTNode node;
final String attributeString;
final Runnable attributeCall;
RagConnectObserverEntry(ConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) {
RagConnectObserverEntry(RagConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) {
this.connectToken = connectToken;
this.node = node;
this.attributeString = attributeString;
......@@ -99,13 +99,13 @@ aspect RagConnectObserver {
node.trace().setReceiver(this);
}
void add(ConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) {
void add(RagConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) {
{{#loggingEnabledForIncremental}}
System.out.println("** observer add: " + node + " on " + attributeString);
{{/loggingEnabledForIncremental}}
observedNodes.add(new RagConnectObserverEntry(connectToken, node, attributeString, attributeCall));
}
void remove(ConnectToken connectToken) {
void remove(RagConnectToken connectToken) {
observedNodes.removeIf(entry -> entry.connectToken.equals(connectToken));
}
@Override
......
......@@ -99,47 +99,56 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
private boolean {{parentTypeName}}.{{internalConnectMethod}}(String {{connectParameterName}},
java.util.function.BiConsumer<String, byte[]> consumer) throws java.io.IOException {
{{>handleUri}}
ConnectToken connectToken;
RagConnectToken connectToken = new RagConnectToken(uri, "{{entityName}}");
boolean success;
switch (scheme) {
{{#usesMqtt}}
case "mqtt":
connectToken = {{mqttHandlerAttribute}}().newConnection(uri, consumer);
if (connectToken == null) {
return false;
}
success = {{mqttHandlerAttribute}}().newConnection(connectToken, consumer);
break;
{{/usesMqtt}}
{{#usesRest}}
case "rest":
connectToken = {{restHandlerAttribute}}().newPUTConnection(uri, input -> {
success = {{restHandlerAttribute}}().newPUTConnection(connectToken, input -> {
// TODO wildcard-topic not supported yet
consumer.accept("", input.getBytes());
});
if (connectToken == null) {
return false;
}
break;
{{/usesRest}}
default:
System.err.println("Unknown protocol '" + scheme + "'.");
return false;
success = false;
}
if (success) {
connectTokenMap.add(this, true, connectToken);
}
connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>())
.put(uri, connectToken);
return true;
return success;
}
public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameterName}}) throws java.io.IOException {
{{>handleUri}}
java.util.List<RagConnectToken> connectTokens = connectTokenMap.removeAll(this, true, uri, "{{entityName}}");
if (connectTokens.isEmpty()) {
System.err.println("Disconnect called without connection for receiving " + this + ".{{entityName}} to '" + {{connectParameterName}} + "'!");
return false;
}
RagConnectDisconnectHandlerMethod disconnectingMethod;
switch (scheme) {
{{#usesMqtt}}
case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
case "mqtt": disconnectingMethod = {{mqttHandlerAttribute}}()::disconnect;
break;
{{/usesMqtt}}
{{#usesRest}}
case "rest": return {{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
case "rest": disconnectingMethod = {{restHandlerAttribute}}()::disconnect;
break;
{{/usesRest}}
default:
System.err.println("Unknown protocol '" + scheme + "'.");
System.err.println("Unknown protocol '" + scheme + "' in '" + {{connectParameterName}} + "' for disconnecting {{parentTypeName}}.{{entityName}}");
return false;
}
boolean success = true;
for (RagConnectToken connectToken : connectTokens) {
success &= disconnectingMethod.call(connectToken);
}
return success;
}
private Runnable {{parentTypeName}}.{{sender}} = null;
private byte[] {{parentTypeName}}.{{lastValue}} = null;
private RagConnectPublisher {{parentTypeName}}.{{sender}} = new RagConnectPublisher();
public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterName}}, boolean writeCurrentValue) throws java.io.IOException {
{{>handleUri}}
ConnectToken connectToken;
if (connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>())
.get(uri) != null) {
System.err.println("Already connected for " + uri + " on " + this + "!");
return true;
}
RagConnectToken connectToken = new RagConnectToken(uri, "{{entityName}}");
boolean success;
switch (scheme) {
{{#usesMqtt}}
case "mqtt":
final MqttHandler handler = {{mqttHandlerAttribute}}().resolveHandler(uri);
final String topic = {{mqttHandlerAttribute}}().extractTopic(uri);
{{sender}} = () -> {
{{sender}}.add(() -> {
{{#loggingEnabledForWrites}}
System.out.println("[Send] {{entityName}} = " + {{getterMethod}}() + " -> " + {{connectParameterName}});
{{/loggingEnabledForWrites}}
handler.publish(topic, {{lastValue}});
};
}, connectToken);
{{updateMethod}}();
if (writeCurrentValue) {
{{writeMethod}}();
}
connectToken = new ConnectToken(uri);
success = true;
break;
{{/usesMqtt}}
{{#usesRest}}
case "rest":
connectToken = {{restHandlerAttribute}}().newGETConnection(uri, () -> {
success = {{restHandlerAttribute}}().newGETConnection(connectToken, () -> {
{{updateMethod}}();
return new String({{lastValue}});
});
if (connectToken == null) {
return false;
}
break;
{{/usesRest}}
default:
System.err.println("Unknown protocol '" + scheme + "'.");
return false;
success = false;
}
connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>())
.put(uri, connectToken);
{{#incrementalOptionActive}}
_ragConnectObserver().add(connectToken, this, "{{getterMethod}}", () -> {
if (this.{{updateMethod}}()) {
this.{{writeMethod}}();
}
});
{{/incrementalOptionActive}}
return true;
if (success) {
connectTokenMap.add(this, false, connectToken);
{{#incrementalOptionActive}}
_ragConnectObserver().add(connectToken, this, "{{getterMethod}}", () -> {
if (this.{{updateMethod}}()) {
this.{{writeMethod}}();
}
});
{{/incrementalOptionActive}}
}
return success;
}
public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameterName}}) throws java.io.IOException {
{{>handleUri}}
ConnectToken connectToken = connectTokens.get(this).remove(uri);
if (connectToken == null) {
System.err.println("Disconnect without connect for " + uri + " on " + this + "!");
java.util.List<RagConnectToken> connectTokens = connectTokenMap.removeAll(this, false, uri, "{{entityName}}");
if (connectTokens.isEmpty()) {
System.err.println("Disconnect called without connection for sending " + this + ".{{entityName}} to '" + {{connectParameterName}} + "'!");
return false;
}
{{#incrementalOptionActive}}
_ragConnectObserver().remove(connectToken);
connectTokens.forEach(token -> _ragConnectObserver().remove(token));
{{/incrementalOptionActive}}
RagConnectDisconnectHandlerMethod disconnectingMethod;
switch (scheme) {
{{#usesMqtt}}
case "mqtt":
{{sender}} = null;
{{lastValue}} = null;
disconnectingMethod = {{sender}}::remove;
break;
{{/usesMqtt}}
{{#usesRest}}
case "rest":
{{restHandlerAttribute}}().disconnect(connectToken);
disconnectingMethod = {{restHandlerAttribute}}()::disconnect;
break;
{{/usesRest}}
default:
System.err.println("Unknown protocol '" + scheme + "'.");
System.err.println("Unknown protocol '" + scheme + "' in '" + {{connectParameterName}} + "' for disconnecting {{parentTypeName}}.{{entityName}}");
return false;
}
return true;
boolean success = true;
for (RagConnectToken connectToken : connectTokens) {
success &= disconnectingMethod.call(connectToken);
}
return success;
}
protected boolean {{parentTypeName}}.{{updateMethod}}() {
......
......@@ -51,7 +51,7 @@ dependencies {
testImplementation group: 'org.fusesource.mqtt-client', name: 'mqtt-client', version: '1.15'
// rest and client
testImplementation group: 'com.sparkjava', name: 'spark-core', version: '2.9.2'
testImplementation group: 'com.sparkjava', name: 'spark-core', version: '2.9.3'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: '2.11.2'
testImplementation group: 'org.glassfish.jersey.core', name: 'jersey-client', version: '2.31'
testImplementation group: 'org.glassfish.jersey.inject', name: 'jersey-hk2', version: '2.31'
......@@ -550,15 +550,21 @@ task compileSingleListVariantIncremental(type: RagConnectTest, dependsOn: ':ragc
}
}
//task cleanCurrentManualTest(type: Delete) {
task cleanCurrentManualTest(type: Delete) {
// delete "src/test/02-after-ragconnect/singleListVariant"
// delete "src/test/03-after-relast/singleListVariant"
// delete "src/test/java-gen/singleListVariant/ast"
//}
//task cleanCurrentIncrementalTest(type: Delete) {
delete "src/test/02-after-ragconnect/singleList"
delete "src/test/03-after-relast/singleList"
delete "src/test/java-gen/singleList/ast"
}
task cleanCurrentIncrementalTest(type: Delete) {