Commit b6992f11 authored by René Schöne's avatar René Schöne
Browse files

Untested disconnect functionality.

parent ae2e1755
......@@ -26,6 +26,17 @@ aspect AttributesForMustache {
syn String MEndpointDefinition.connectParameterName() = "uriString";
syn String MEndpointDefinition.connectMethod() = "connect" + tokenName();
syn String MEndpointDefinition.disconnectMethod() {
// if both (send and receive) are defined for the token, ensure methods with different names
String extra = endpointDef().lookupTokenEndpointDefinitions(token()).size() > 1 ? uniqueSuffix() : "";
return "disconnect" + extra + tokenName();
}
//
syn String MEndpointDefinition.uniqueSuffix();
eq MSendDefinition.uniqueSuffix() = "Send";
eq MReceiveDefinition.uniqueSuffix() = "Receive";
syn TokenComponent MEndpointDefinition.token() = endpointDef().getToken();
syn boolean MEndpointDefinition.alwaysApply() = endpointDef().getAlwaysApply();
syn String MEndpointDefinition.parentTypeName() = token().containingTypeDecl().getName();
......
......@@ -4,6 +4,7 @@ import java.util.concurrent.TimeUnit;
aspect MqttHandler {
public class MqttServerHandler {
private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>();
private final java.util.Map<ConnectToken, Object> tokensForRemoval = new java.util.HashMap<>();
private long time;
private java.util.concurrent.TimeUnit unit;
private String name;
......@@ -38,8 +39,16 @@ public class MqttServerHandler {
return handler;
}
public boolean newConnection(java.net.URI uri, java.util.function.Consumer<byte[]> callback) throws IOException {
return resolveHandler(uri).newConnection(extractTopic(uri), callback);
public ConnectToken newConnection(java.net.URI uri, java.util.function.Consumer<byte[]> callback) throws IOException {
ConnectToken connectToken = new ConnectToken(uri);
resolveHandler(uri).newConnection(extractTopic(uri), callback);
tokensForRemoval.put(connectToken, callback);
return connectToken;
}
public boolean disconnect(ConnectToken connectToken) throws IOException {
MqttHandler handler = resolveHandler(connectToken.uri);
return handler != null ? handler.disconnect(extractTopic(connectToken.uri), tokensForRemoval.get(connectToken)) : false;
}
public void publish(java.net.URI uri, byte[] bytes) throws IOException {
......@@ -146,14 +155,16 @@ public class MqttHandler {
}
@Override
public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic, org.fusesource.hawtbuf.Buffer body, org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic,
org.fusesource.hawtbuf.Buffer body,
org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
// this method is called, whenever a MQTT message is received
String topicString = topic.toString();
java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topicString);
if (callbackList == null || callbackList.isEmpty()) {
logger.debug("Got a message, but no callback to call. Forgot to subscribe?");
} else {
byte[] message = body.toByteArray();
// System.out.println("message = " + Arrays.toString(message));
for (java.util.function.Consumer<byte[]> callback : callbackList) {
callback.accept(message);
}
......@@ -162,13 +173,15 @@ public class MqttHandler {
}
@Override
public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer, org.fusesource.hawtbuf.Buffer body, Runnable ack) {
public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer,
org.fusesource.hawtbuf.Buffer body,
Runnable ack) {
// not used by this type of connection
logger.warn("onPublish should not be called");
}
@Override
public void onFailure(Throwable cause) {
// logger.catching(cause);
error.set(cause);
}
});
......@@ -179,7 +192,11 @@ public class MqttHandler {
@Override
public void onSuccess(Void value) {
if (MqttHandler.this.sendWelcomeMessage) {
connection.publish("components", (name + " is connected").getBytes(), org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE, false, new org.fusesource.mqtt.client.Callback<Void>() {
connection.publish("components",
(name + " is connected").getBytes(),
org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE,
false,
new org.fusesource.mqtt.client.Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.debug("success sending welcome message");
......@@ -198,7 +215,6 @@ public class MqttHandler {
@Override
public void onFailure(Throwable cause) {
// logger.error("Could not connect", cause);
error.set(cause);
}
});
......@@ -228,8 +244,6 @@ public class MqttHandler {
if (readyLatch.getCount() > 0) {
System.err.println("Handler not ready");
return false;
// // should maybe be something more kind than throwing an exception here
// throw new IllegalStateException("Updater not ready");
}
// register callback
logger.debug("new connection for {}", topic);
......@@ -256,6 +270,35 @@ public class MqttHandler {
return true;
}
public boolean disconnect(String topic, Object callback) {
java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topic);
if (callbackList == null) {
logger.warn("Disconnect for not connected topic '{}'", topic);
return false;
}
java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>();
success.set(callbackList.remove(callback));
if (callbackList.isEmpty()) {
// no callbacks anymore for this topic, unsubscribe from mqtt
connection.getDispatchQueue().execute(() -> {
org.fusesource.hawtbuf.UTF8Buffer topicBuffer = org.fusesource.hawtbuf.Buffer.utf8(topic);
org.fusesource.hawtbuf.UTF8Buffer[] topicArray = new org.fusesource.hawtbuf.UTF8Buffer[]{topicBuffer};
connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<Void>() {
@Override
public void onSuccess(Void value) {
// empty, all good
}
@Override
public void onFailure(Throwable cause) {
success.set(false);
}
});
});
}
return success.get();
}
/**
* Waits until this updater is ready to receive MQTT messages.
* If it already is ready, return immediately with the value <code>true</code>.
......
......@@ -2,6 +2,7 @@ import java.util.concurrent.TimeUnit;aspect RestHandler {
public class RestServerHandler {
private static final int DEFAULT_PORT = 4567;
private final java.util.Map<Integer, RestHandler> handlers = new java.util.HashMap<>();
private final java.util.Map<ConnectToken, Object> tokensForRemoval = new java.util.HashMap<>();
private String name;
public RestServerHandler() {
......@@ -24,14 +25,23 @@ public class RestServerHandler {
return handler;
}
public boolean newPUTConnection(java.net.URI uri, java.util.function.Consumer<String> callback) {
public ConnectToken newPUTConnection(java.net.URI uri, java.util.function.Consumer<String> callback) {
ConnectToken connectToken = new ConnectToken(uri);
resolveHandler(uri).newPUTConnection(uri.getPath(), callback);
return true;
tokensForRemoval.put(connectToken, callback);
return connectToken;
}
public boolean newGETConnection(java.net.URI uri, SupplierWithException<String> supplier) {
public ConnectToken newGETConnection(java.net.URI uri, SupplierWithException<String> supplier) {
ConnectToken connectToken = new ConnectToken(uri);
resolveHandler(uri).newGETConnection(uri.getPath(), supplier);
return true;
tokensForRemoval.put(connectToken, supplier);
return connectToken;
}
public boolean disconnect(ConnectToken connectToken) {
RestHandler handler = resolveHandler(connectToken.uri);
return handler != null ? handler.disconnect(connectToken.uri.getPath(), tokensForRemoval.get(connectToken)) : false;
}
public void close() {
......@@ -108,6 +118,7 @@ public class RestHandler {
suppliers.put(path, supplier);
spark.Spark.get(path, (request, response) -> {
try {
// we could check for null here in case supplier has been disconnected
return supplier.get();
} catch (Exception e) {
return makeError(response, 500, e.getMessage());
......@@ -115,6 +126,12 @@ public class RestHandler {
});
}
public boolean disconnect(String path, Object callbackOrSupplier) {
// only one will succeed (or false will be returned)
return callbacks.getOrDefault(path, java.util.Collections.emptyList()).remove(callbackOrSupplier) ||
suppliers.remove(path, callbackOrSupplier);
}
private String makeError(spark.Response response, int statusCode, String message) {
response.status(statusCode);
return message;
......
......@@ -14,4 +14,15 @@ aspect RagConnectHandler {
{{#InUse}}{{FieldName}}.close();{{/InUse}}
{{/Handlers}}
}
class ConnectToken {
static java.util.concurrent.atomic.AtomicLong counter = new java.util.concurrent.atomic.AtomicLong(0);
final long id;
final java.net.URI uri;
public ConnectToken(java.net.URI uri) {
this.id = counter.incrementAndGet();
this.uri = uri;
}
}
static java.util.Map<ASTNode, java.util.Map<java.net.URI, ConnectToken>> ASTNode.connectTokens = new java.util.HashMap<>();
}
......@@ -9,12 +9,42 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
};
switch (scheme) {
{{#usesMqtt}}
case "mqtt": return {{mqttHandlerAttribute}}().newConnection(uri, consumer);
case "mqtt":
ConnectToken connectToken = {{mqttHandlerAttribute}}().newConnection(uri, consumer);
if (connectToken == null) {
return false;
}
connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>())
.put(uri, connectToken);
break;
{{/usesMqtt}}
{{#usesRest}}
case "rest": return {{restHandlerAttribute}}().newPUTConnection(uri, input -> {
consumer.accept(input.getBytes());
});
case "rest":
ConnectToken connectToken = {{restHandlerAttribute}}().newPUTConnection(uri, input -> {
consumer.accept(input.getBytes());
});
if (connectToken == null) {
return false;
}
connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>())
.put(uri, connectToken);
break;
{{/usesRest}}
default:
System.err.println("Unknown protocol '" + scheme + "'.");
return false;
}
return true;
}
public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameterName}}) throws java.io.IOException {
{{>handleUri}}
switch (scheme) {
{{#usesMqtt}}
case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
{{/usesMqtt}}
{{#usesRest}}
case "rest": return {{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
{{/usesRest}}
default:
System.err.println("Unknown protocol '" + scheme + "'.");
......
......@@ -22,10 +22,36 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
{{/usesMqtt}}
{{#usesRest}}
case "rest":
{{restHandlerAttribute}}().newGETConnection(uri, () -> {
ConnectToken connectToken = {{restHandlerAttribute}}().newGETConnection(uri, () -> {
{{updateMethod}}();
return new String({{lastValue}});
});
if (connectToken == null) {
return false;
}
connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>())
.put(uri, connectToken);
break;
{{/usesRest}}
default:
System.err.println("Unknown protocol '" + scheme + "'.");
return false;
}
return true;
}
public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameterName}}) throws java.io.IOException {
{{>handleUri}}
switch (scheme) {
{{#usesMqtt}}
case "mqtt":
{{sender}} = null;
{{lastValue}} = null;
break;
{{/usesMqtt}}
{{#usesRest}}
case "rest":
{{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
break;
{{/usesRest}}
default:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment