import java.io.IOException; import java.util.concurrent.TimeUnit;aspect MqttHandler { public class MqttServerHandler { private final java.util.Map handlers = new java.util.HashMap<>(); private long time; private java.util.concurrent.TimeUnit unit; private String name; public MqttServerHandler() { this("RagConnect"); } public MqttServerHandler(String name) { this.name = name; setupWaitUntilReady(1, TimeUnit.SECONDS); } public void setupWaitUntilReady(long time, java.util.concurrent.TimeUnit unit) { this.time = time; this.unit = unit; } public MqttHandler resolveHandler(java.net.URI uri) throws IOException { MqttHandler handler = handlers.get(uri.getHost()); if (handler == null) { // first connect to that server handler = new MqttHandler(); if (uri.getPort() == -1) { handler.setHost(uri.getHost()); } else { handler.setHost(uri.getHost(), uri.getPort()); } handlers.put(uri.getHost(), handler); } handler.waitUntilReady(this.time, this.unit); return handler; } public boolean newConnection(java.net.URI uri, java.util.function.Consumer callback) throws IOException { return resolveHandler(uri).newConnection(extractTopic(uri), callback); } public void publish(java.net.URI uri, byte[] bytes) throws IOException { resolveHandler(uri).publish(extractTopic(uri), bytes); } public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws IOException { resolveHandler(uri).publish(extractTopic(uri), bytes, retain); } public void publish(java.net.URI uri, byte[] bytes, org.fusesource.mqtt.client.QoS qos, boolean retain) throws IOException { resolveHandler(uri).publish(extractTopic(uri), bytes, qos, retain); } public static String extractTopic(java.net.URI uri) { String path = uri.getPath(); if (path.charAt(0) == '/') { path = path.substring(1); } return path; } public void close() { for (MqttHandler handler : handlers.values()) { handler.close(); } } } /** * Helper class to receive updates via MQTT and use callbacks to handle those messages. * * @author rschoene - Initial contribution */ public class MqttHandler { private static final int DEFAULT_PORT = 1883; private final org.apache.logging.log4j.Logger logger; private final String name; /** The host running the MQTT broker. */ private java.net.URI host; /** The connection to the MQTT broker. */ private org.fusesource.mqtt.client.CallbackConnection connection; /** Whether we are connected yet */ private final java.util.concurrent.CountDownLatch readyLatch; private boolean sendWelcomeMessage = true; private org.fusesource.mqtt.client.QoS qos; /** Dispatch knowledge */ private final java.util.Map>> callbacks; public MqttHandler() { this("RagConnect"); } public MqttHandler(String name) { this.name = java.util.Objects.requireNonNull(name, "Name must be set"); this.logger = org.apache.logging.log4j.LogManager.getLogger(MqttHandler.class); this.callbacks = new java.util.HashMap<>(); this.readyLatch = new java.util.concurrent.CountDownLatch(1); this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE; } public MqttHandler dontSendWelcomeMessage() { this.sendWelcomeMessage = false; return this; } /** * Sets the host (with default port) to receive messages from, and connects to it. * @throws IOException if could not connect, or could not subscribe to a topic * @return self */ public MqttHandler setHost(String host) throws java.io.IOException { return setHost(host, DEFAULT_PORT); } /** * Sets the host to receive messages from, and connects to it. * @throws IOException if could not connect, or could not subscribe to a topic * @return self */ public MqttHandler setHost(String host, int port) throws java.io.IOException { java.util.Objects.requireNonNull(host, "Host need to be set!"); this.host = java.net.URI.create("tcp://" + host + ":" + port); logger.debug("Host for {} is {}", this.name, this.host); org.fusesource.mqtt.client.MQTT mqtt = new org.fusesource.mqtt.client.MQTT(); mqtt.setHost(this.host); connection = mqtt.callbackConnection(); java.util.concurrent.atomic.AtomicReference error = new java.util.concurrent.atomic.AtomicReference<>(); // add the listener to dispatch messages later connection.listener(new org.fusesource.mqtt.client.ExtendedListener() { public void onConnected() { logger.debug("Connected"); } @Override public void onDisconnected() { logger.debug("Disconnected"); } @Override public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic, org.fusesource.hawtbuf.Buffer body, org.fusesource.mqtt.client.Callback> ack) { String topicString = topic.toString(); java.util.List> callbackList = callbacks.get(topicString); if (callbackList == null || callbackList.isEmpty()) { logger.debug("Got a message, but no callback to call. Forgot to subscribe?"); } else { byte[] message = body.toByteArray(); // System.out.println("message = " + Arrays.toString(message)); for (java.util.function.Consumer callback : callbackList) { callback.accept(message); } } ack.onSuccess(null); // always acknowledge message } @Override public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer, org.fusesource.hawtbuf.Buffer body, Runnable ack) { logger.warn("onPublish should not be called"); } @Override public void onFailure(Throwable cause) { // logger.catching(cause); error.set(cause); } }); throwIf(error); // actually establish the connection connection.connect(new org.fusesource.mqtt.client.Callback() { @Override public void onSuccess(Void value) { if (MqttHandler.this.sendWelcomeMessage) { connection.publish("components", (name + " is connected").getBytes(), org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE, false, new org.fusesource.mqtt.client.Callback() { @Override public void onSuccess(Void value) { logger.debug("success sending welcome message"); setReady(); } @Override public void onFailure(Throwable value) { logger.debug("failure sending welcome message", value); } }); } else { setReady(); } } @Override public void onFailure(Throwable cause) { // logger.error("Could not connect", cause); error.set(cause); } }); throwIf(error); return this; } public java.net.URI getHost() { return host; } private void setReady() { readyLatch.countDown(); } private void throwIf(java.util.concurrent.atomic.AtomicReference error) throws java.io.IOException { if (error.get() != null) { throw new java.io.IOException(error.get()); } } public void setQoSForSubscription(org.fusesource.mqtt.client.QoS qos) { this.qos = qos; } public boolean newConnection(String topic, java.util.function.Consumer callback) { if (readyLatch.getCount() > 0) { System.err.println("Handler not ready"); return false; // // should maybe be something more kind than throwing an exception here // throw new IllegalStateException("Updater not ready"); } // register callback logger.debug("new connection for {}", topic); if (callbacks.get(topic) == null) { callbacks.put(topic, new java.util.ArrayList<>()); // subscribe at broker org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) }; connection.getDispatchQueue().execute(() -> { connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback() { @Override public void onSuccess(byte[] qoses) { logger.debug("Subscribed to {}, qoses: {}", topic, qoses); } @Override public void onFailure(Throwable cause) { logger.error("Could not subscribe to {}", topic, cause); } }); }); } callbacks.get(topic).add(callback); return true; } /** * Waits until this updater is ready to receive MQTT messages. * If it already is ready, return immediately with the value true. * Otherwise waits for the given amount of time, and either return true within the timespan, * if it got ready, or false upon a timeout. * @param time the maximum time to wait * @param unit the time unit of the time argument * @return whether this updater is ready */ public boolean waitUntilReady(long time, java.util.concurrent.TimeUnit unit) { try { return readyLatch.await(time, unit); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public void close() { if (connection == null) { logger.warn("Stopping without connection. Was setHost() called?"); return; } connection.getDispatchQueue().execute(() -> { connection.disconnect(new org.fusesource.mqtt.client.Callback() { @Override public void onSuccess(Void value) { logger.info("Disconnected {} from {}", name, host); } @Override public void onFailure(Throwable ignored) { // Disconnects never fail. And we do not care either. } }); }); } public void publish(String topic, byte[] bytes) { publish(topic, bytes, false); } public void publish(String topic, byte[] bytes, boolean retain) { publish(topic, bytes, this.qos, retain); } public void publish(String topic, byte[] bytes, org.fusesource.mqtt.client.QoS qos, boolean retain) { connection.getDispatchQueue().execute(() -> { connection.publish(topic, bytes, qos, retain, new org.fusesource.mqtt.client.Callback() { @Override public void onSuccess(Void value) { logger.debug("Published some bytes to {}", topic); } @Override public void onFailure(Throwable value) { logger.warn("Could not publish on topic '{}'", topic, value); } }); }); } } }