aspect MqttHandler {
  /**
   * Helper class to receive updates via MQTT and use callbacks to handle those messages.
   *
   * <p>Usage as suggested by the methods of this class: create a handler, connect it with
   * {@code setHost}, wait for readiness with {@code waitUntilReady}, register per-topic callbacks
   * with {@code newConnection}, send messages with {@code publish} and finally call {@code close}.
   *
   * <p>Callbacks registered via {@code newConnection} are invoked from the MQTT client's listener
   * (see {@code onPublish} in {@code setHost}), not from the thread that registered them.
   *
   * @author rschoene - Initial contribution
   */
  public class MqttHandler {
    private static final int DEFAULT_PORT = 1883;

    private final org.apache.logging.log4j.Logger logger;
    private final String name;

    /** The host running the MQTT broker. */
    private java.net.URI host;
    /** The connection to the MQTT broker. */
    private org.fusesource.mqtt.client.CallbackConnection connection;
    /**
     * Reaches zero once the connection is established (and, if enabled, the welcome message
     * was sent successfully). Never reset afterwards.
     */
    private final java.util.concurrent.CountDownLatch readyLatch;
    private boolean sendWelcomeMessage = true;
    private org.fusesource.mqtt.client.QoS qos;
    /**
     * Dispatch knowledge: maps a topic name to the callback consuming its message payloads.
     * Synchronized because it is written by {@code newConnection} and read by the MQTT listener.
     */
    private final java.util.Map<String, java.util.function.Consumer<byte[]>> callbacks;

    /** Creates a handler with the default name {@code "Ros2Rag"}. */
    public MqttHandler() {
      this("Ros2Rag");
    }

    /**
     * Creates a handler that is not yet connected; call {@code setHost} to connect.
     *
     * @param name the name used in log output and in the welcome message
     * @throws NullPointerException if {@code name} is {@code null}
     */
    public MqttHandler(String name) {
      this.name = java.util.Objects.requireNonNull(name, "Name must be set");
      this.logger = org.apache.logging.log4j.LogManager.getLogger(MqttHandler.class);
      // synchronizedMap (rather than ConcurrentHashMap) keeps tolerating null values as before
      this.callbacks = java.util.Collections.synchronizedMap(new java.util.HashMap<>());
      this.readyLatch = new java.util.concurrent.CountDownLatch(1);
      this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE;
    }

    /**
     * Disables publishing of the welcome message on topic {@code "components"} after connecting.
     * Has to be called before {@code setHost} to take effect for that connection attempt.
     *
     * @return self
     */
    public MqttHandler dontSendWelcomeMessage() {
      this.sendWelcomeMessage = false;
      return this;
    }

    /**
     * Sets the host (with default port 1883) to receive messages from, and connects to it.
     *
     * @param host the host name or address of the MQTT broker
     * @return self
     * @throws java.io.IOException if a connection failure was already reported when this method returns
     * @throws NullPointerException if {@code host} is {@code null}
     */
    public MqttHandler setHost(String host) throws java.io.IOException {
      return setHost(host, DEFAULT_PORT);
    }

    /**
     * Sets the host to receive messages from, and connects to it.
     *
     * <p>The connection is established through callbacks. Only failures that were already
     * reported by the time this method returns are thrown as {@code IOException}; later
     * failures are logged. Use {@code waitUntilReady} to find out whether connecting succeeded.
     *
     * @param host the host name or address of the MQTT broker
     * @param port the port of the MQTT broker
     * @return self
     * @throws java.io.IOException if a connection failure was already reported when this method returns
     * @throws NullPointerException if {@code host} is {@code null}
     */
    public MqttHandler setHost(String host, int port) throws java.io.IOException {
      java.util.Objects.requireNonNull(host, "Host need to be set!");

      this.host = java.net.URI.create("tcp://" + host + ":" + port);
      logger.debug("Host for {} is {}", this.name, this.host);

      org.fusesource.mqtt.client.MQTT mqtt = new org.fusesource.mqtt.client.MQTT();
      mqtt.setHost(this.host);
      connection = mqtt.callbackConnection();
      java.util.concurrent.atomic.AtomicReference<Throwable> error =
          new java.util.concurrent.atomic.AtomicReference<>();

      // add the listener to dispatch messages later
      connection.listener(new org.fusesource.mqtt.client.ExtendedListener() {
        @Override
        public void onConnected() {
          logger.debug("Connected");
        }

        @Override
        public void onDisconnected() {
          logger.debug("Disconnected");
        }

        @Override
        public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic,
                              org.fusesource.hawtbuf.Buffer body,
                              org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
          String topicString = topic.toString();
          java.util.function.Consumer<byte[]> callback = callbacks.get(topicString);
          if (callback == null) {
            logger.debug("Got a message, but no callback to call. Forgot to unsubscribe?");
          } else {
            byte[] message = body.toByteArray();
            callback.accept(message);
          }
          ack.onSuccess(null);  // always acknowledge message
        }

        @Override
        public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer,
                              org.fusesource.hawtbuf.Buffer body,
                              Runnable ack) {
          logger.warn("onPublish should not be called");
        }

        @Override
        public void onFailure(Throwable cause) {
          // The local error holder is only inspected before setHost returns, so log as well
          // to avoid losing failures that are reported later.
          logger.warn("Connection failure for {}", name, cause);
          error.set(cause);
        }
      });
      throwIf(error);

      // actually establish the connection
      connection.connect(new org.fusesource.mqtt.client.Callback<Void>() {
        @Override
        public void onSuccess(Void value) {
          if (MqttHandler.this.sendWelcomeMessage) {
            connection.publish("components",
                (name + " is connected").getBytes(java.nio.charset.StandardCharsets.UTF_8),
                org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE,
                false,
                new org.fusesource.mqtt.client.Callback<Void>() {
                  @Override
                  public void onSuccess(Void value) {
                    logger.debug("success sending welcome message");
                    setReady();
                  }

                  @Override
                  public void onFailure(Throwable value) {
                    // NOTE: readiness is not signalled in this case, so waitUntilReady
                    // will run into its timeout.
                    logger.debug("failure sending welcome message", value);
                  }
                });
          } else {
            setReady();
          }
        }

        @Override
        public void onFailure(Throwable cause) {
          // Log in addition to recording, see the listener's onFailure above.
          logger.error("Could not connect", cause);
          error.set(cause);
        }
      });
      throwIf(error);
      return this;
    }

    /**
     * @return the URI of the broker as set by {@code setHost}, or {@code null} if it was not called yet
     */
    public java.net.URI getHost() {
      return host;
    }

    /** Marks this handler as ready and releases all threads blocked in {@code waitUntilReady}. */
    private void setReady() {
      readyLatch.countDown();
    }

    /**
     * Throws the recorded failure, if any, wrapped in an {@code IOException}.
     *
     * @param error holder of a possibly recorded failure
     * @throws java.io.IOException if {@code error} holds a non-null cause
     */
    private void throwIf(java.util.concurrent.atomic.AtomicReference<Throwable> error)
        throws java.io.IOException {
      Throwable cause = error.get();
      if (cause != null) {
        throw new java.io.IOException(cause);
      }
    }

    /**
     * Sets the quality of service used for subsequent subscriptions, and for publishing
     * whenever no explicit QoS is passed to {@code publish}.
     *
     * @param qos the quality of service to use
     */
    public void setQoSForSubscription(org.fusesource.mqtt.client.QoS qos) {
      this.qos = qos;
    }

    /**
     * Registers a callback for a topic and subscribes to this topic at the broker.
     * A callback registered earlier for the same topic is replaced.
     * Failures to subscribe are logged only.
     *
     * @param topic    the topic to subscribe to
     * @param callback the consumer receiving the payload of every message on that topic
     * @throws IllegalStateException if this handler is not ready yet, see {@code waitUntilReady}
     */
    public void newConnection(String topic, java.util.function.Consumer<byte[]> callback) {
      if (readyLatch.getCount() > 0) {
        // TODO should maybe be something more kind than throwing an exception here
        throw new IllegalStateException("Updater not ready");
      }
      // register callback
      callbacks.put(topic, callback);

      // subscribe at broker
      org.fusesource.mqtt.client.Topic[] topicArray =
          { new org.fusesource.mqtt.client.Topic(topic, this.qos) };
      connection.getDispatchQueue().execute(() -> {
        connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() {
          @Override
          public void onSuccess(byte[] qoses) {
            logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
          }

          @Override
          public void onFailure(Throwable cause) {
            logger.error("Could not subscribe to {}", topic, cause);
          }
        });
      });
    }

    /**
     * Waits until this updater is ready to receive MQTT messages.
     * If it already is ready, return immediately with the value true.
     * Otherwise waits for the given amount of time, and either return true within the timespan,
     * if it got ready, or false upon a timeout.
     * If the waiting thread is interrupted, its interrupt flag is restored and false is returned.
     *
     * @param time the maximum time to wait
     * @param unit the time unit of the time argument
     * @return whether this updater is ready
     */
    public boolean waitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
      // Fast path: report readiness without blocking, even if the calling thread is interrupted.
      if (readyLatch.getCount() == 0) {
        return true;
      }
      try {
        return readyLatch.await(time, unit);
      } catch (InterruptedException e) {
        // Restore the flag so that callers further up can still react to the interruption.
        Thread.currentThread().interrupt();
        logger.warn("Interrupted while waiting for {} to become ready", name, e);
        return false;
      }
    }

    /**
     * Disconnects from the broker. Only logs a warning if no connection was ever created.
     */
    public void close() {
      if (connection == null) {
        logger.warn("Stopping without connection. Was setHost() called?");
        return;
      }
      connection.getDispatchQueue().execute(() -> {
        connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() {
          @Override
          public void onSuccess(Void value) {
            logger.info("Disconnected {} from {}", name, host);
          }

          @Override
          public void onFailure(Throwable ignored) {
            // Disconnects never fail. And we do not care either.
          }
        });
      });
    }

    /**
     * Publishes a non-retained message using the QoS currently configured for this handler.
     *
     * @param topic the topic to publish on
     * @param bytes the payload
     * @throws IllegalStateException if {@code setHost} was not called before
     */
    public void publish(String topic, byte[] bytes) {
      publish(topic, bytes, false);
    }

    /**
     * Publishes a message using the QoS currently configured for this handler.
     *
     * @param topic  the topic to publish on
     * @param bytes  the payload
     * @param retain whether the message is published with the retain flag
     * @throws IllegalStateException if {@code setHost} was not called before
     */
    public void publish(String topic, byte[] bytes, boolean retain) {
      publish(topic, bytes, this.qos, retain);
    }

    /**
     * Publishes a message. The outcome is reported via logging only.
     *
     * @param topic  the topic to publish on
     * @param bytes  the payload
     * @param qos    the quality of service to publish with
     * @param retain whether the message is published with the retain flag
     * @throws IllegalStateException if {@code setHost} was not called before
     */
    public void publish(String topic, byte[] bytes, org.fusesource.mqtt.client.QoS qos, boolean retain) {
      if (connection == null) {
        // Fail with a clear message instead of a bare NullPointerException.
        throw new IllegalStateException("Publishing without connection. Was setHost() called?");
      }
      connection.getDispatchQueue().execute(() -> {
        connection.publish(topic, bytes, qos, retain, new org.fusesource.mqtt.client.Callback<Void>() {
          @Override
          public void onSuccess(Void value) {
            logger.debug("Published some bytes to {}", topic);
          }

          @Override
          public void onFailure(Throwable value) {
            logger.warn("Could not publish on topic '{}'", topic, value);
          }
        });
      });
    }
  }
}