// MqttHandler.jadd
import java.io.IOException;
import java.util.concurrent.TimeUnit;
aspect MqttHandler {
public class MqttServerHandler {
/** MQTT handlers created so far; filled lazily by {@code resolveHandler}. */
private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>();
/** The callback that was registered for each connect token; consulted again by {@code disconnect}. */
private final java.util.Map<ConnectToken, Object> tokensForRemoval = new java.util.HashMap<>();
/** Maximum amount of time to wait for a handler to become ready, see {@code setupWaitUntilReady}. */
private long time;
/** Time unit belonging to {@code time}. */
private java.util.concurrent.TimeUnit unit;
/** Name of this server handler, as passed to the constructor. */
private String name;
/** Creates a server handler using the name {@code "RagConnect"}. */
public MqttServerHandler() {
this("RagConnect");
}
public MqttServerHandler(String name) {
this.name = name;
setupWaitUntilReady(1, TimeUnit.SECONDS);
}
/**
 * Configures how long {@code resolveHandler} waits for a handler to become ready.
 *
 * @param time the maximum time to wait
 * @param unit the time unit of the time argument
 */
public void setupWaitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
  this.unit = unit;
  this.time = time;
}
/**
 * Returns the handler responsible for the broker addressed by the given URI,
 * creating and connecting a new handler on first use. Afterwards waits for the
 * handler to become ready, for at most the time configured via
 * {@code setupWaitUntilReady}.
 *
 * @param uri the URI whose host (and optional port) identify the broker
 * @return the handler for that broker, never {@code null}
 * @throws IOException if a newly created handler could not be connected
 */
public MqttHandler resolveHandler(java.net.URI uri) throws IOException {
  final String host = uri.getHost();
  final int port = uri.getPort();
  // Distinguish brokers on the same host but on different ports. URIs without
  // an explicit port keep using the plain host name as key.
  final String key = port == -1 ? host : host + ":" + port;
  MqttHandler handler = handlers.get(key);
  if (handler == null) {
    // first connect to that server
    handler = new MqttHandler();
    if (port == -1) {
      handler.setHost(host);
    } else {
      handler.setHost(host, port);
    }
    handlers.put(key, handler);
  }
  // The result is deliberately not checked here: a handler that is not ready
  // yet is still returned to the caller.
  handler.waitUntilReady(this.time, this.unit);
  return handler;
}
/**
 * Registers the callback for the topic given by the path of the URI at the
 * broker given by the host of the URI.
 *
 * @param uri      the URI describing broker and topic
 * @param callback the consumer to call with the payload of received messages
 * @return a token which can be passed to {@code disconnect} later
 * @throws IOException if the handler for the broker could not be resolved
 */
public ConnectToken newConnection(java.net.URI uri, java.util.function.Consumer<byte[]> callback) throws IOException {
  final ConnectToken token = new ConnectToken(uri);
  final MqttHandler handler = resolveHandler(uri);
  final String topic = extractTopic(uri);
  handler.newConnection(topic, callback);
  tokensForRemoval.put(token, callback);
  return token;
}
/**
 * Removes the callback that was registered with the given token.
 *
 * @param connectToken the token returned by {@code newConnection}
 * @return whether the handler reported that the callback was removed
 * @throws IOException if the handler for the broker could not be resolved
 */
public boolean disconnect(ConnectToken connectToken) throws IOException {
  // resolveHandler either returns a cached handler or creates one, so there is
  // no null case to handle here.
  final MqttHandler handler = resolveHandler(connectToken.uri);
  final String topic = extractTopic(connectToken.uri);
  // Remove (instead of merely reading) the entry, so that tokens of closed
  // connections do not pile up in the map.
  final Object callback = tokensForRemoval.remove(connectToken);
  return handler.disconnect(topic, callback);
}
/**
 * Publishes the bytes to the topic and broker described by the URI.
 *
 * @param uri   the URI describing broker and topic
 * @param bytes the payload to publish
 * @throws IOException if the handler for the broker could not be resolved
 */
public void publish(java.net.URI uri, byte[] bytes) throws IOException {
  final MqttHandler target = resolveHandler(uri);
  final String topic = extractTopic(uri);
  target.publish(topic, bytes);
}
/**
 * Publishes the bytes to the topic and broker described by the URI.
 *
 * @param uri    the URI describing broker and topic
 * @param bytes  the payload to publish
 * @param retain the retain flag passed on to the handler
 * @throws IOException if the handler for the broker could not be resolved
 */
public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws IOException {
  final MqttHandler target = resolveHandler(uri);
  final String topic = extractTopic(uri);
  target.publish(topic, bytes, retain);
}
/**
 * Publishes the bytes to the topic and broker described by the URI.
 *
 * @param uri    the URI describing broker and topic
 * @param bytes  the payload to publish
 * @param qos    the quality of service passed on to the handler
 * @param retain the retain flag passed on to the handler
 * @throws IOException if the handler for the broker could not be resolved
 */
public void publish(java.net.URI uri, byte[] bytes,
    org.fusesource.mqtt.client.QoS qos, boolean retain) throws IOException {
  final MqttHandler target = resolveHandler(uri);
  final String topic = extractTopic(uri);
  target.publish(topic, bytes, qos, retain);
}
/**
 * Extracts the topic from the path of the given URI by stripping one leading
 * slash, e.g. {@code mqtt://localhost/a/b} yields {@code a/b}.
 *
 * @param uri the URI to extract the topic from
 * @return the path without its leading slash, or the empty string if the URI
 *         has no path (empty path, or an opaque URI whose path is {@code null})
 */
public static String extractTopic(java.net.URI uri) {
  String path = uri.getPath();
  // URI.getPath() is "" for "scheme://host" and null for opaque URIs; both
  // cases have no first character to inspect.
  if (path == null || path.isEmpty()) {
    return "";
  }
  return path.startsWith("/") ? path.substring(1) : path;
}
/** Closes every handler that was created by this server handler. */
public void close() {
  handlers.values().forEach(MqttHandler::close);
}
}
/**
* Helper class to receive updates via MQTT and use callbacks to handle those messages.
*
* @author rschoene - Initial contribution
*/
public class MqttHandler {
/** Port used by {@code setHost(String)}, i.e., when no port is given explicitly. */
private static final int DEFAULT_PORT = 1883;
/** Logger of this handler, obtained for {@code MqttHandler.class} in the constructor. */
private final org.apache.logging.log4j.Logger logger;
/** Name of this handler; used in log output and in the welcome message. */
private final String name;
/** The host running the MQTT broker. */
private java.net.URI host;
/** The connection to the MQTT broker. */
private org.fusesource.mqtt.client.CallbackConnection connection;
/** Whether we are connected yet; counted down once by {@code setReady()}. */
private final java.util.concurrent.CountDownLatch readyLatch;
/** Whether a welcome message is published on the topic "components" after connecting. */
private boolean sendWelcomeMessage = true;
/** Quality of service used for subscriptions, and for publishing when no QoS is given. */
private org.fusesource.mqtt.client.QoS qos;
/** Dispatch knowledge: the callbacks to call for each topic. */
private final java.util.Map<String, java.util.List<java.util.function.Consumer<byte[]>>> callbacks;
/** Creates a handler using the name {@code "RagConnect"}. */
public MqttHandler() {
this("RagConnect");
}
/**
 * Creates a handler with the given name. The handler is not connected until
 * one of the {@code setHost} methods is called.
 *
 * @param name the name of this handler, must not be {@code null}
 * @throws NullPointerException if the name is {@code null}
 */
public MqttHandler(String name) {
  // validate first, so nothing else happens for an invalid name
  this.name = java.util.Objects.requireNonNull(name, "Name must be set");
  this.logger = org.apache.logging.log4j.LogManager.getLogger(MqttHandler.class);
  this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE;
  this.readyLatch = new java.util.concurrent.CountDownLatch(1);
  this.callbacks = new java.util.HashMap<>();
}
/**
 * Disables publishing of the welcome message after a successful connect.
 *
 * @return self
 */
public MqttHandler dontSendWelcomeMessage() {
  sendWelcomeMessage = false;
  return this;
}
/**
 * Connects to the broker running on the given host, using the default port.
 *
 * @param host the host name of the broker
 * @return self
 * @throws IOException if could not connect, or could not subscribe to a topic
 */
public MqttHandler setHost(String host) throws java.io.IOException {
  final int port = DEFAULT_PORT;
  return this.setHost(host, port);
}
/**
 * Sets the host to receive messages from, and connects to it.
 * <p>
 * Registers a listener that dispatches every received message to the callbacks
 * registered for its topic, then starts connecting. Once connected, a welcome
 * message is published on the topic "components" (unless disabled), and the
 * handler is marked ready.
 *
 * @param host the host name of the broker, must not be {@code null}
 * @param port the port of the broker
 * @throws IOException if could not connect, or could not subscribe to a topic
 * @return self
 */
public MqttHandler setHost(String host, int port) throws java.io.IOException {
  java.util.Objects.requireNonNull(host, "Host need to be set!");
  this.host = java.net.URI.create("tcp://" + host + ":" + port);
  logger.debug("Host for {} is {}", this.name, this.host);

  org.fusesource.mqtt.client.MQTT mqtt = new org.fusesource.mqtt.client.MQTT();
  mqtt.setHost(this.host);
  connection = mqtt.callbackConnection();
  java.util.concurrent.atomic.AtomicReference<Throwable> error = new java.util.concurrent.atomic.AtomicReference<>();

  // add the listener to dispatch messages later
  connection.listener(new org.fusesource.mqtt.client.ExtendedListener() {
    @Override
    public void onConnected() {
      logger.debug("Connected");
    }

    @Override
    public void onDisconnected() {
      logger.debug("Disconnected");
    }

    @Override
    public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic,
        org.fusesource.hawtbuf.Buffer body,
        org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
      // this method is called, whenever a MQTT message is received
      String topicString = topic.toString();
      java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topicString);
      if (callbackList == null || callbackList.isEmpty()) {
        logger.debug("Got a message, but no callback to call. Forgot to subscribe?");
      } else {
        byte[] message = body.toByteArray();
        for (java.util.function.Consumer<byte[]> callback : callbackList) {
          callback.accept(message);
        }
      }
      ack.onSuccess(null); // always acknowledge message
    }

    @Override
    public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer,
        org.fusesource.hawtbuf.Buffer body,
        Runnable ack) {
      // not used by this type of connection
      logger.warn("onPublish should not be called");
    }

    @Override
    public void onFailure(Throwable cause) {
      error.set(cause);
    }
  });
  throwIf(error);

  // actually establish the connection
  connection.connect(new org.fusesource.mqtt.client.Callback<Void>() {
    @Override
    public void onSuccess(Void value) {
      if (MqttHandler.this.sendWelcomeMessage) {
        connection.publish("components",
            // fixed charset, so the payload does not depend on the platform default
            (name + " is connected").getBytes(java.nio.charset.StandardCharsets.UTF_8),
            org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE,
            false,
            new org.fusesource.mqtt.client.Callback<Void>() {
              @Override
              public void onSuccess(Void value) {
                logger.debug("success sending welcome message");
                setReady();
              }

              @Override
              public void onFailure(Throwable value) {
                // NOTE(review): the handler is not marked ready in this case,
                // so it stays "not ready" if the welcome message fails.
                logger.debug("failure sending welcome message", value);
              }
            });
      } else {
        setReady();
      }
    }

    @Override
    public void onFailure(Throwable cause) {
      error.set(cause);
    }
  });
  // NOTE(review): presumably the callbacks above run asynchronously, so this
  // check only sees failures that were already reported - confirm.
  throwIf(error);
  return this;
}
/**
 * Returns the URI of the broker as set by {@code setHost}.
 *
 * @return the broker URI, or {@code null} if no host was set yet
 */
public java.net.URI getHost() {
  return this.host;
}
/** Marks this handler as ready by counting down the ready latch. */
private void setReady() {
  this.readyLatch.countDown();
}
/**
 * Throws an {@code IOException} wrapping the recorded error, if there is one.
 *
 * @param error the holder of a possibly recorded error
 * @throws java.io.IOException if the holder contains an error
 */
private void throwIf(java.util.concurrent.atomic.AtomicReference<Throwable> error) throws java.io.IOException {
  final Throwable cause = error.get();
  if (cause == null) {
    return;
  }
  throw new java.io.IOException(cause);
}
/**
 * Sets the quality of service used for subsequent subscriptions, and for
 * publishing without an explicit quality of service.
 *
 * @param qos the quality of service to use from now on
 */
public void setQoSForSubscription(org.fusesource.mqtt.client.QoS qos) {
  final org.fusesource.mqtt.client.QoS newQos = qos;
  this.qos = newQos;
}
/**
 * Registers a callback for the given topic, and subscribes to the topic at the
 * broker if there is currently no other callback for it.
 *
 * @param topic    the topic to receive messages from
 * @param callback the consumer to call with the payload of every message on that topic
 * @return {@code false} if this handler is not ready yet, {@code true} otherwise
 */
public boolean newConnection(String topic, java.util.function.Consumer<byte[]> callback) {
  if (readyLatch.getCount() > 0) {
    // report through the logger like every other message of this class
    logger.warn("Handler not ready");
    return false;
  }
  // register callback
  logger.debug("new connection for {}", topic);
  java.util.List<java.util.function.Consumer<byte[]>> callbackList =
      callbacks.computeIfAbsent(topic, key -> new java.util.ArrayList<>());
  // An empty list means that there is no subscription at the broker: either the
  // topic is new, or its last callback was removed (which unsubscribes).
  if (callbackList.isEmpty()) {
    // subscribe at broker
    org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) };
    connection.getDispatchQueue().execute(() -> {
      connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() {
        @Override
        public void onSuccess(byte[] qoses) {
          logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
        }

        @Override
        public void onFailure(Throwable cause) {
          logger.error("Could not subscribe to {}", topic, cause);
        }
      });
    });
  }
  callbackList.add(callback);
  return true;
}
/**
 * Removes a callback from the given topic, and unsubscribes from the topic at
 * the broker once no callback is left for it.
 *
 * @param topic    the topic the callback was registered for
 * @param callback the callback to remove
 * @return whether the callback was registered for the topic and has been removed
 */
public boolean disconnect(String topic, Object callback) {
  java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topic);
  if (callbackList == null) {
    logger.warn("Disconnect for not connected topic '{}'", topic);
    return false;
  }
  java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>();
  success.set(callbackList.remove(callback));
  if (callbackList.isEmpty()) {
    // Forget the topic entirely, so that a later newConnection for this topic
    // sees it as unknown and subscribes at the broker again.
    callbacks.remove(topic);
    // no callbacks anymore for this topic, unsubscribe from mqtt
    connection.getDispatchQueue().execute(() -> {
      org.fusesource.hawtbuf.UTF8Buffer topicBuffer = org.fusesource.hawtbuf.Buffer.utf8(topic);
      org.fusesource.hawtbuf.UTF8Buffer[] topicArray = new org.fusesource.hawtbuf.UTF8Buffer[]{topicBuffer};
      connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<Void>() {
        @Override
        public void onSuccess(Void value) {
          // empty, all good
        }

        @Override
        public void onFailure(Throwable cause) {
          logger.warn("Could not unsubscribe from topic '{}'", topic, cause);
          success.set(false);
        }
      });
    });
  }
  // NOTE(review): the unsubscribe is only scheduled above, so its outcome is
  // presumably not reflected in the returned value - confirm.
  return success.get();
}
/**
 * Waits until this updater is ready to receive MQTT messages.
 * If it already is ready, return immediately with the value <code>true</code>.
 * Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan,
 * if it got ready, or <code>false</code> upon a timeout.
 * If the waiting thread is interrupted, <code>false</code> is returned and the
 * interrupt status of the thread is set again.
 * @param time the maximum time to wait
 * @param unit the time unit of the time argument
 * @return whether this updater is ready
 */
public boolean waitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
  try {
    return readyLatch.await(time, unit);
  } catch (InterruptedException e) {
    // keep the interrupt visible for the caller instead of swallowing it
    Thread.currentThread().interrupt();
    logger.warn("Interrupted while waiting for {} to get ready", name, e);
    return false;
  }
}
/**
 * Disconnects from the broker. Only logs a warning if there is no connection,
 * i.e., if no host was set before.
 */
public void close() {
  if (connection != null) {
    connection.getDispatchQueue().execute(() ->
        connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() {
          @Override
          public void onSuccess(Void value) {
            logger.info("Disconnected {} from {}", name, host);
          }

          @Override
          public void onFailure(Throwable ignored) {
            // Disconnects never fail. And we do not care either.
          }
        }));
  } else {
    logger.warn("Stopping without connection. Was setHost() called?");
  }
}
/**
 * Publishes the bytes on the given topic without the retain flag.
 *
 * @param topic the topic to publish on
 * @param bytes the payload to publish
 */
public void publish(String topic, byte[] bytes) {
  final boolean retain = false;
  this.publish(topic, bytes, retain);
}
/**
 * Publishes the bytes on the given topic using the quality of service
 * currently set for this handler.
 *
 * @param topic  the topic to publish on
 * @param bytes  the payload to publish
 * @param retain the retain flag of the message
 */
public void publish(String topic, byte[] bytes, boolean retain) {
  final org.fusesource.mqtt.client.QoS currentQos = this.qos;
  this.publish(topic, bytes, currentQos, retain);
}
/**
 * Publishes the bytes on the given topic. The publish call is handed to the
 * dispatch queue of the connection; its outcome is only logged.
 *
 * @param topic  the topic to publish on
 * @param bytes  the payload to publish
 * @param qos    the quality of service of the message
 * @param retain the retain flag of the message
 */
public void publish(String topic, byte[] bytes, org.fusesource.mqtt.client.QoS qos, boolean retain) {
  connection.getDispatchQueue().execute(() -> {
    final org.fusesource.mqtt.client.Callback<Void> outcomeLogger = new org.fusesource.mqtt.client.Callback<Void>() {
      @Override
      public void onSuccess(Void value) {
        logger.debug("Published some bytes to {}", topic);
      }

      @Override
      public void onFailure(Throwable value) {
        logger.warn("Could not publish on topic '{}'", topic, value);
      }
    };
    connection.publish(topic, bytes, qos, retain, outcomeLogger);
  });
}
}
}