diff --git a/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd b/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd index 607f96066832b1f788d0d1d6e5d0a18552f11337..9ee95b7ba4d87ecbaf60a75271dadcd82e38d190 100644 --- a/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd +++ b/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd @@ -26,6 +26,17 @@ aspect AttributesForMustache { syn String MEndpointDefinition.connectParameterName() = "uriString"; syn String MEndpointDefinition.connectMethod() = "connect" + tokenName(); + + syn String MEndpointDefinition.disconnectMethod() { + // if both (send and receive) are defined for the token, ensure methods with different names + String extra = endpointDef().lookupTokenEndpointDefinitions(token()).size() > 1 ? uniqueSuffix() : ""; + return "disconnect" + extra + tokenName(); + } + // + syn String MEndpointDefinition.uniqueSuffix(); + eq MSendDefinition.uniqueSuffix() = "Send"; + eq MReceiveDefinition.uniqueSuffix() = "Receive"; + syn TokenComponent MEndpointDefinition.token() = endpointDef().getToken(); syn boolean MEndpointDefinition.alwaysApply() = endpointDef().getAlwaysApply(); syn String MEndpointDefinition.parentTypeName() = token().containingTypeDecl().getName(); diff --git a/ragconnect.base/src/main/resources/MqttHandler.jadd b/ragconnect.base/src/main/resources/MqttHandler.jadd index b6750261d9f4ad06d92721c95aa7aa6e7971849b..f5e6ba99b3058f59f6b9b40ae0d9c07f18da88c3 100644 --- a/ragconnect.base/src/main/resources/MqttHandler.jadd +++ b/ragconnect.base/src/main/resources/MqttHandler.jadd @@ -4,6 +4,7 @@ import java.util.concurrent.TimeUnit; aspect MqttHandler { public class MqttServerHandler { private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>(); + private final java.util.Map<ConnectToken, Object> tokensForRemoval = new java.util.HashMap<>(); private long time; private java.util.concurrent.TimeUnit unit; private String name; @@ -38,8 +39,16 @@ public class MqttServerHandler { return handler; } - public boolean newConnection(java.net.URI uri, java.util.function.Consumer<byte[]> callback) throws IOException { - return resolveHandler(uri).newConnection(extractTopic(uri), callback); + public ConnectToken newConnection(java.net.URI uri, java.util.function.Consumer<byte[]> callback) throws IOException { + ConnectToken connectToken = new ConnectToken(uri); + resolveHandler(uri).newConnection(extractTopic(uri), callback); + tokensForRemoval.put(connectToken, callback); + return connectToken; + } + + public boolean disconnect(ConnectToken connectToken) throws IOException { + MqttHandler handler = resolveHandler(connectToken.uri); + return handler != null ? handler.disconnect(extractTopic(connectToken.uri), tokensForRemoval.get(connectToken)) : false; } public void publish(java.net.URI uri, byte[] bytes) throws IOException { @@ -146,14 +155,16 @@ public class MqttHandler { } @Override - public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic, org.fusesource.hawtbuf.Buffer body, org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) { + public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic, + org.fusesource.hawtbuf.Buffer body, + org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) { + // this method is called, whenever a MQTT message is received String topicString = topic.toString(); java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topicString); if (callbackList == null || callbackList.isEmpty()) { logger.debug("Got a message, but no callback to call. Forgot to subscribe?"); } else { byte[] message = body.toByteArray(); -// System.out.println("message = " + Arrays.toString(message)); for (java.util.function.Consumer<byte[]> callback : callbackList) { callback.accept(message); } @@ -162,13 +173,15 @@ public class MqttHandler { } @Override - public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer, org.fusesource.hawtbuf.Buffer body, Runnable ack) { + public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer, + org.fusesource.hawtbuf.Buffer body, + Runnable ack) { + // not used by this type of connection logger.warn("onPublish should not be called"); } @Override public void onFailure(Throwable cause) { -// logger.catching(cause); error.set(cause); } }); @@ -179,7 +192,11 @@ public class MqttHandler { @Override public void onSuccess(Void value) { if (MqttHandler.this.sendWelcomeMessage) { - connection.publish("components", (name + " is connected").getBytes(), org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE, false, new org.fusesource.mqtt.client.Callback<Void>() { + connection.publish("components", + (name + " is connected").getBytes(), + org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE, + false, + new org.fusesource.mqtt.client.Callback<Void>() { @Override public void onSuccess(Void value) { logger.debug("success sending welcome message"); @@ -198,7 +215,6 @@ public class MqttHandler { @Override public void onFailure(Throwable cause) { -// logger.error("Could not connect", cause); error.set(cause); } }); @@ -228,8 +244,6 @@ public class MqttHandler { if (readyLatch.getCount() > 0) { System.err.println("Handler not ready"); return false; -// // should maybe be something more kind than throwing an exception here -// throw new IllegalStateException("Updater not ready"); } // register callback logger.debug("new connection for {}", topic); @@ -256,6 +270,35 @@ public class MqttHandler { return true; } + public boolean disconnect(String topic, Object callback) { + java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topic); + if (callbackList == null) { + logger.warn("Disconnect for not connected topic '{}'", topic); + return false; + } + java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>(); + success.set(callbackList.remove(callback)); + if (callbackList.isEmpty()) { + // no callbacks anymore for this topic, unsubscribe from mqtt + connection.getDispatchQueue().execute(() -> { + org.fusesource.hawtbuf.UTF8Buffer topicBuffer = org.fusesource.hawtbuf.Buffer.utf8(topic); + org.fusesource.hawtbuf.UTF8Buffer[] topicArray = new org.fusesource.hawtbuf.UTF8Buffer[]{topicBuffer}; + connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<Void>() { + @Override + public void onSuccess(Void value) { + // empty, all good + } + + @Override + public void onFailure(Throwable cause) { + success.set(false); + } + }); + }); + } + return success.get(); + } + /** * Waits until this updater is ready to receive MQTT messages. * If it already is ready, return immediately with the value <code>true</code>. diff --git a/ragconnect.base/src/main/resources/RestHandler.jadd b/ragconnect.base/src/main/resources/RestHandler.jadd index f7b1a83304fe7aaebe97d105514c3f192d052aec..b69bf71b73ba9d4c5a9f1998ee2dd92ede77561d 100644 --- a/ragconnect.base/src/main/resources/RestHandler.jadd +++ b/ragconnect.base/src/main/resources/RestHandler.jadd @@ -2,6 +2,7 @@ import java.util.concurrent.TimeUnit;aspect RestHandler { public class RestServerHandler { private static final int DEFAULT_PORT = 4567; private final java.util.Map<Integer, RestHandler> handlers = new java.util.HashMap<>(); + private final java.util.Map<ConnectToken, Object> tokensForRemoval = new java.util.HashMap<>(); private String name; public RestServerHandler() { @@ -24,14 +25,23 @@ public class RestServerHandler { return handler; } - public boolean newPUTConnection(java.net.URI uri, java.util.function.Consumer<String> callback) { + public ConnectToken newPUTConnection(java.net.URI uri, java.util.function.Consumer<String> callback) { + ConnectToken connectToken = new ConnectToken(uri); resolveHandler(uri).newPUTConnection(uri.getPath(), callback); - return true; + tokensForRemoval.put(connectToken, callback); + return connectToken; } - public boolean newGETConnection(java.net.URI uri, SupplierWithException<String> supplier) { + public ConnectToken newGETConnection(java.net.URI uri, SupplierWithException<String> supplier) { + ConnectToken connectToken = new ConnectToken(uri); resolveHandler(uri).newGETConnection(uri.getPath(), supplier); - return true; + tokensForRemoval.put(connectToken, supplier); + return connectToken; + } + + public boolean disconnect(ConnectToken connectToken) { + RestHandler handler = resolveHandler(connectToken.uri); + return handler != null ? handler.disconnect(connectToken.uri.getPath(), tokensForRemoval.get(connectToken)) : false; } public void close() { @@ -108,6 +118,7 @@ public class RestHandler { suppliers.put(path, supplier); spark.Spark.get(path, (request, response) -> { try { + // we could check for null here in case supplier has been disconnected return supplier.get(); } catch (Exception e) { return makeError(response, 500, e.getMessage()); @@ -115,6 +126,12 @@ public class RestHandler { }); } + public boolean disconnect(String path, Object callbackOrSupplier) { + // only one will succeed (or false will be returned) + return callbacks.getOrDefault(path, java.util.Collections.emptyList()).remove(callbackOrSupplier) || + suppliers.remove(path, callbackOrSupplier); + } + private String makeError(spark.Response response, int statusCode, String message) { response.status(statusCode); return message; diff --git a/ragconnect.base/src/main/resources/handler.mustache b/ragconnect.base/src/main/resources/handler.mustache index 796f76027fd22899b60e02a2fdd08fbf0dc9ebdb..41e42387f69239f054b8ddaafa80680b13ca7591 100644 --- a/ragconnect.base/src/main/resources/handler.mustache +++ b/ragconnect.base/src/main/resources/handler.mustache @@ -14,4 +14,15 @@ aspect RagConnectHandler { {{#InUse}}{{FieldName}}.close();{{/InUse}} {{/Handlers}} } + class ConnectToken { + static java.util.concurrent.atomic.AtomicLong counter = new java.util.concurrent.atomic.AtomicLong(0); + final long id; + final java.net.URI uri; + public ConnectToken(java.net.URI uri) { + this.id = counter.incrementAndGet(); + this.uri = uri; + } + + } + static java.util.Map<ASTNode, java.util.Map<java.net.URI, ConnectToken>> ASTNode.connectTokens = new java.util.HashMap<>(); } diff --git a/ragconnect.base/src/main/resources/receiveDefinition.mustache b/ragconnect.base/src/main/resources/receiveDefinition.mustache index 1dd0e0ddbfbfd2acf03536ff41a2d740043eb34b..4f2aafeaf7a872aced9fb6f723b89c89a5f7d691 100644 --- a/ragconnect.base/src/main/resources/receiveDefinition.mustache +++ b/ragconnect.base/src/main/resources/receiveDefinition.mustache @@ -9,12 +9,42 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam }; switch (scheme) { {{#usesMqtt}} - case "mqtt": return {{mqttHandlerAttribute}}().newConnection(uri, consumer); + case "mqtt": + ConnectToken connectToken = {{mqttHandlerAttribute}}().newConnection(uri, consumer); + if (connectToken == null) { + return false; + } + connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>()) + .put(uri, connectToken); + break; {{/usesMqtt}} {{#usesRest}} - case "rest": return {{restHandlerAttribute}}().newPUTConnection(uri, input -> { - consumer.accept(input.getBytes()); - }); + case "rest": + ConnectToken connectToken = {{restHandlerAttribute}}().newPUTConnection(uri, input -> { + consumer.accept(input.getBytes()); + }); + if (connectToken == null) { + return false; + } + connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>()) + .put(uri, connectToken); + break; + {{/usesRest}} + default: + System.err.println("Unknown protocol '" + scheme + "'."); + return false; + } + return true; +} + +public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameterName}}) throws java.io.IOException { + {{>handleUri}} + switch (scheme) { + {{#usesMqtt}} + case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); + {{/usesMqtt}} + {{#usesRest}} + case "rest": return {{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); {{/usesRest}} default: System.err.println("Unknown protocol '" + scheme + "'."); diff --git a/ragconnect.base/src/main/resources/sendDefinition.mustache b/ragconnect.base/src/main/resources/sendDefinition.mustache index 60f5efcca8f3333d41fc8cc4706a736946a42ab7..d382645f4b5ecfde148857e3073bb48e78ebebb2 100644 --- a/ragconnect.base/src/main/resources/sendDefinition.mustache +++ b/ragconnect.base/src/main/resources/sendDefinition.mustache @@ -22,10 +22,36 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam {{/usesMqtt}} {{#usesRest}} case "rest": - {{restHandlerAttribute}}().newGETConnection(uri, () -> { + ConnectToken connectToken = {{restHandlerAttribute}}().newGETConnection(uri, () -> { {{updateMethod}}(); return new String({{lastValue}}); }); + if (connectToken == null) { + return false; + } + connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>()) + .put(uri, connectToken); + break; + {{/usesRest}} + default: + System.err.println("Unknown protocol '" + scheme + "'."); + return false; + } + return true; +} + +public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameterName}}) throws java.io.IOException { + {{>handleUri}} + switch (scheme) { + {{#usesMqtt}} + case "mqtt": + {{sender}} = null; + {{lastValue}} = null; + break; + {{/usesMqtt}} + {{#usesRest}} + case "rest": + {{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); break; {{/usesRest}} default: