Skip to content
Snippets Groups Projects
Select Git revision
  • 39f1effb6cc5bafb0c43e01300af868b51d96555
  • dev default protected
  • main protected
  • feature/ros-java-integration
4 results

MqttHandler.jadd

Blame
  • MqttHandler.jadd 12.16 KiB
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    aspect MqttHandler {
    /**
     * Manages one {@link MqttHandler} per MQTT broker and forwards connect, disconnect and publish
     * requests to the handler responsible for the broker named in a URI.
     *
     * <p>The broker is identified by host and port of the URI, the topic is the URI path without
     * its leading slash (see {@link #extractTopic(java.net.URI)}).
     */
    public class MqttServerHandler {
      /** Port assumed for URIs that do not specify one. */
      private static final int DEFAULT_PORT = 1883;

      /** Handlers keyed by "host:port" of their broker. */
      private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>();
      /** Callback registered for each token, needed to remove exactly that callback on disconnect. */
      private final java.util.Map<ConnectToken, Object> tokensForRemoval = new java.util.HashMap<>();
      /** Maximum time to wait for a handler to become ready, in {@link #unit}s. */
      private long time;
      /** Unit of {@link #time}. */
      private java.util.concurrent.TimeUnit unit;
      /** Name handed to every handler created by this instance. */
      private String name;

      /** Creates a server handler with the name "RagConnect". */
      public MqttServerHandler() {
        this("RagConnect");
      }

      /**
       * Creates a server handler that waits at most one second for a handler to become ready.
       *
       * @param name the name passed on to the created handlers
       */
      public MqttServerHandler(String name) {
        this.name = name;
        setupWaitUntilReady(1, TimeUnit.SECONDS);
      }

      /**
       * Configures how long {@link #resolveHandler(java.net.URI)} waits for a handler to get ready.
       *
       * @param time the maximum time to wait
       * @param unit the time unit of the time argument
       */
      public void setupWaitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
        this.time = time;
        this.unit = unit;
      }

      /**
       * Returns the handler for the broker of the given URI, connecting to the broker on first use,
       * and waits the configured time for the handler to become ready.
       *
       * @param uri the URI naming the broker (host and optional port)
       * @return the handler for that broker, possibly not ready yet if the wait timed out
       * @throws IOException if the connection to a new broker could not be set up
       */
      public MqttHandler resolveHandler(java.net.URI uri) throws IOException {
        String key = brokerKey(uri);
        MqttHandler handler = handlers.get(key);
        if (handler == null) {
          // first connect to that server; a null name falls back to the handler's default name
          handler = name == null ? new MqttHandler() : new MqttHandler(name);
          if (uri.getPort() == -1) {
            handler.setHost(uri.getHost());
          } else {
            handler.setHost(uri.getHost(), uri.getPort());
          }
          // only remember the handler once setHost did not throw
          handlers.put(key, handler);
        }
        handler.waitUntilReady(this.time, this.unit);
        return handler;
      }

      /**
       * Builds the key identifying a broker, so that brokers on the same host but on different
       * ports get separate handlers, while a missing port and the default port share one.
       */
      private static String brokerKey(java.net.URI uri) {
        int port = uri.getPort() == -1 ? DEFAULT_PORT : uri.getPort();
        return uri.getHost() + ":" + port;
      }

      /**
       * Registers a callback for the topic named by the URI at the broker named by the URI.
       *
       * @param uri      the URI naming broker and topic
       * @param callback the callback to invoke with the payload of every received message
       * @return a token to be passed to {@link #disconnect(ConnectToken)}
       * @throws IOException if the connection to a new broker could not be set up
       */
      public ConnectToken newConnection(java.net.URI uri, java.util.function.Consumer<byte[]> callback) throws IOException {
        ConnectToken connectToken = new ConnectToken(uri);
        boolean registered = resolveHandler(uri).newConnection(extractTopic(uri), callback);
        if (registered) {
          // only remember callbacks that the handler actually accepted
          tokensForRemoval.put(connectToken, callback);
        }
        return connectToken;
      }

      /**
       * Removes the callback that was registered with the given token.
       *
       * @param connectToken the token returned by {@link #newConnection}
       * @return true if a callback was removed, false if there was none or no handler for the broker
       * @throws IOException declared for compatibility with existing callers
       */
      public boolean disconnect(ConnectToken connectToken) throws IOException {
        // look the handler up directly: disconnecting must not open a new connection to a broker
        MqttHandler handler = handlers.get(brokerKey(connectToken.uri));
        if (handler == null) {
          return false;
        }
        // forget the token, otherwise the map would grow with every connection ever made
        Object callback = tokensForRemoval.remove(connectToken);
        return handler.disconnect(extractTopic(connectToken.uri), callback);
      }

      /** Publishes the bytes on the topic and broker named by the URI. */
      public void publish(java.net.URI uri, byte[] bytes) throws IOException {
        resolveHandler(uri).publish(extractTopic(uri), bytes);
      }

      /** Publishes the bytes on the topic and broker named by the URI with the given retain flag. */
      public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws IOException {
        resolveHandler(uri).publish(extractTopic(uri), bytes, retain);
      }

      /** Publishes the bytes on the topic and broker named by the URI with the given QoS and retain flag. */
      public void publish(java.net.URI uri, byte[] bytes,
                          org.fusesource.mqtt.client.QoS qos, boolean retain) throws IOException {
        resolveHandler(uri).publish(extractTopic(uri), bytes, qos, retain);
      }

      /**
       * Extracts the topic from a URI, i.e., its path without the leading slash.
       *
       * @param uri the URI to inspect
       * @return the topic, or the empty string if the URI has no path
       */
      public static String extractTopic(java.net.URI uri) {
        String path = uri.getPath();
        if (path == null || path.isEmpty()) {
          // opaque URI or URI without path: there is no topic to extract
          return "";
        }
        return path.charAt(0) == '/' ? path.substring(1) : path;
      }

      /** Closes all handlers created by this instance. */
      public void close() {
        for (MqttHandler handler : handlers.values()) {
          handler.close();
        }
      }

    }
    /**
     * Helper class to receive updates via MQTT and use callbacks to handle those messages.
     *
     * <p>A handler connects to one broker using {@link #setHost(String)}. Once it is ready (see
     * {@link #waitUntilReady(long, java.util.concurrent.TimeUnit)}), callbacks can be registered per
     * topic using {@link #newConnection(String, java.util.function.Consumer)}.
     *
     * @author rschoene - Initial contribution
     */
    public class MqttHandler {
      private static final int DEFAULT_PORT = 1883;

      private final org.apache.logging.log4j.Logger logger;
      private final String name;

      /** The host running the MQTT broker. */
      private java.net.URI host;
      /** The connection to the MQTT broker. */
      private org.fusesource.mqtt.client.CallbackConnection connection;
      /** Whether we are connected yet */
      private final java.util.concurrent.CountDownLatch readyLatch;
      /** Whether to publish "&lt;name&gt; is connected" on topic "components" after connecting. */
      private boolean sendWelcomeMessage = true;
      /** QoS used for subscriptions and for publishing without explicit QoS. */
      private org.fusesource.mqtt.client.QoS qos;
      /** Dispatch knowledge: the callbacks to invoke for each subscribed topic. */
      private final java.util.Map<String, java.util.List<java.util.function.Consumer<byte[]>>> callbacks;

      /** Creates a handler with the name "RagConnect". */
      public MqttHandler() {
        this("RagConnect");
      }

      /**
       * Creates a handler that is not yet connected to any broker.
       *
       * @param name the name of this handler, used in log output and in the welcome message
       * @throws NullPointerException if name is null
       */
      public MqttHandler(String name) {
        this.name = java.util.Objects.requireNonNull(name, "Name must be set");
        this.logger = org.apache.logging.log4j.LogManager.getLogger(MqttHandler.class);
        this.callbacks = new java.util.HashMap<>();
        this.readyLatch = new java.util.concurrent.CountDownLatch(1);
        this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE;
      }

      /**
       * Disables the welcome message sent after connecting. Must be called before setHost to have
       * an effect.
       *
       * @return self
       */
      public MqttHandler dontSendWelcomeMessage() {
        this.sendWelcomeMessage = false;
        return this;
      }

      /**
       * Sets the host (with default port) to receive messages from, and connects to it.
       * @throws IOException if could not connect, or could not subscribe to a topic
       * @return self
       */
      public MqttHandler setHost(String host) throws java.io.IOException {
        return setHost(host, DEFAULT_PORT);
      }

      /**
       * Sets the host to receive messages from, and connects to it.
       * The connection is established asynchronously, the handler becomes ready once the connection
       * is established and (if enabled) the welcome message was sent successfully.
       * @throws IOException if could not connect, or could not subscribe to a topic
       * @return self
       */
      public MqttHandler setHost(String host, int port) throws java.io.IOException {
        java.util.Objects.requireNonNull(host, "Host need to be set!");

        this.host = java.net.URI.create("tcp://" + host + ":" + port);
        logger.debug("Host for {} is {}", this.name, this.host);

        org.fusesource.mqtt.client.MQTT mqtt = new org.fusesource.mqtt.client.MQTT();
        mqtt.setHost(this.host);
        connection = mqtt.callbackConnection();
        java.util.concurrent.atomic.AtomicReference<Throwable> error = new java.util.concurrent.atomic.AtomicReference<>();

        // add the listener to dispatch messages later
        connection.listener(new org.fusesource.mqtt.client.ExtendedListener() {
          @Override
          public void onConnected() {
            logger.debug("Connected");
          }

          @Override
          public void onDisconnected() {
            logger.debug("Disconnected");
          }

          @Override
          public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic,
                                org.fusesource.hawtbuf.Buffer body,
                                org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
            // this method is called, whenever a MQTT message is received
            String topicString = topic.toString();
            java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topicString);
            if (callbackList == null || callbackList.isEmpty()) {
              logger.debug("Got a message, but no callback to call. Forgot to subscribe?");
            } else {
              byte[] message = body.toByteArray();
              for (java.util.function.Consumer<byte[]> callback : callbackList) {
                callback.accept(message);
              }
            }
            ack.onSuccess(null);  // always acknowledge message
          }

          @Override
          public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer,
                                org.fusesource.hawtbuf.Buffer body,
                                Runnable ack) {
            // not used by this type of connection
            logger.warn("onPublish should not be called");
          }

          @Override
          public void onFailure(Throwable cause) {
            error.set(cause);
          }
        });
        throwIf(error);

        // actually establish the connection
        connection.connect(new org.fusesource.mqtt.client.Callback<Void>() {
          @Override
          public void onSuccess(Void value) {
            if (MqttHandler.this.sendWelcomeMessage) {
              // encode explicitly as UTF-8 instead of relying on the platform default charset
              connection.publish("components",
                                 (name + " is connected").getBytes(java.nio.charset.StandardCharsets.UTF_8),
                                 org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE,
                                 false,
                                 new org.fusesource.mqtt.client.Callback<Void>() {
                @Override
                public void onSuccess(Void value) {
                  logger.debug("success sending welcome message");
                  setReady();
                }

                @Override
                public void onFailure(Throwable value) {
                  logger.debug("failure sending welcome message", value);
                }
              });
            } else {
              setReady();
            }
          }

          @Override
          public void onFailure(Throwable cause) {
            error.set(cause);
          }
        });
        throwIf(error);
        return this;
      }

      /** Returns the URI of the broker, or null if setHost was not called yet. */
      public java.net.URI getHost() {
        return host;
      }

      /** Marks this handler as ready, releasing everyone blocked in waitUntilReady. */
      private void setReady() {
        readyLatch.countDown();
      }

      /** Throws an IOException wrapping the recorded error, if there is one. */
      private void throwIf(java.util.concurrent.atomic.AtomicReference<Throwable> error) throws java.io.IOException {
        if (error.get() != null) {
          throw new java.io.IOException(error.get());
        }
      }

      /**
       * Sets the QoS used for subsequent subscriptions and for publishing without explicit QoS.
       *
       * @param qos the new quality of service
       */
      public void setQoSForSubscription(org.fusesource.mqtt.client.QoS qos) {
        this.qos = qos;
      }

      /**
       * Registers a callback for a topic and subscribes to the topic at the broker, if this is the
       * first callback for that topic.
       *
       * @param topic    the topic to receive messages from
       * @param callback the callback to invoke with the payload of every received message
       * @return false if this handler is not ready yet, true otherwise
       */
      public boolean newConnection(String topic, java.util.function.Consumer<byte[]> callback) {
        if (readyLatch.getCount() > 0) {
          // report through the logger like everything else in this class
          logger.warn("Handler not ready");
          return false;
        }
        // register callback
        logger.debug("new connection for {}", topic);
        java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topic);
        if (callbackList == null) {
          callbackList = new java.util.ArrayList<>();
          callbacks.put(topic, callbackList);

          // subscribe at broker
          org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) };
          connection.getDispatchQueue().execute(() -> {
            connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() {
              @Override
              public void onSuccess(byte[] qoses) {
                logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
              }

              @Override
              public void onFailure(Throwable cause) {
                logger.error("Could not subscribe to {}", topic, cause);
              }
            });
          });
        }
        callbackList.add(callback);
        return true;
      }

      /**
       * Removes a callback from a topic and unsubscribes from the topic at the broker, if no
       * callback is left for it. The unsubscription happens asynchronously, a failure is logged.
       *
       * @param topic    the topic the callback was registered for
       * @param callback the callback to remove
       * @return true if the callback was registered for the topic and has been removed
       */
      public boolean disconnect(String topic, Object callback) {
        java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topic);
        if (callbackList == null) {
          logger.warn("Disconnect for not connected topic '{}'", topic);
          return false;
        }
        boolean removed = callbackList.remove(callback);
        if (callbackList.isEmpty()) {
          // forget the topic, so that a later newConnection() subscribes at the broker again
          callbacks.remove(topic);
          // no callbacks anymore for this topic, unsubscribe from mqtt
          connection.getDispatchQueue().execute(() -> {
            org.fusesource.hawtbuf.UTF8Buffer topicBuffer = org.fusesource.hawtbuf.Buffer.utf8(topic);
            org.fusesource.hawtbuf.UTF8Buffer[] topicArray = new org.fusesource.hawtbuf.UTF8Buffer[]{topicBuffer};
            connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<Void>() {
              @Override
              public void onSuccess(Void value) {
                // empty, all good
              }

              @Override
              public void onFailure(Throwable cause) {
                // this runs after disconnect() has returned, so the failure can only be logged
                logger.warn("Could not unsubscribe from {}", topic, cause);
              }
            });
          });
        }
        return removed;
      }

      /**
       * Waits until this updater is ready to receive MQTT messages.
       * If it already is ready, return immediately with the value <code>true</code>.
       * Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan,
       * if it got ready, or <code>false</code> upon a timeout.
       * If the waiting thread is interrupted, <code>false</code> is returned and the interrupt status
       * of the thread is set again.
       * @param time the maximum time to wait
       * @param unit the time unit of the time argument
       * @return whether this updater is ready
       */
      public boolean waitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
        try {
          return readyLatch.await(time, unit);
        } catch (InterruptedException e) {
          // restore the interrupt status, so that callers can still observe the interruption
          Thread.currentThread().interrupt();
          logger.warn("Interrupted while waiting for {} to get ready", name, e);
        }
        return false;
      }

      /**
       * Disconnects from the broker asynchronously.
       * Only logs a warning if no connection was set up before.
       */
      public void close() {
        if (connection == null) {
          logger.warn("Stopping without connection. Was setHost() called?");
          return;
        }
        connection.getDispatchQueue().execute(() -> {
          connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() {
            @Override
            public void onSuccess(Void value) {
              logger.info("Disconnected {} from {}", name, host);
            }

            @Override
            public void onFailure(Throwable ignored) {
              // Disconnects never fail. And we do not care either.
            }
          });
        });
      }

      /** Publishes the bytes on the topic without retaining them, using the QoS of this handler. */
      public void publish(String topic, byte[] bytes) {
        publish(topic, bytes, false);
      }

      /** Publishes the bytes on the topic with the given retain flag, using the QoS of this handler. */
      public void publish(String topic, byte[] bytes, boolean retain) {
        publish(topic, bytes, this.qos, retain);
      }

      /**
       * Publishes the bytes on the topic asynchronously. Success and failure are only logged.
       *
       * @param topic  the topic to publish on
       * @param bytes  the payload of the message
       * @param qos    the quality of service to publish with
       * @param retain whether the broker should retain the message
       */
      public void publish(String topic, byte[] bytes, org.fusesource.mqtt.client.QoS qos, boolean retain) {
        connection.getDispatchQueue().execute(() -> {
          connection.publish(topic, bytes, qos, retain, new org.fusesource.mqtt.client.Callback<Void>() {
            @Override
            public void onSuccess(Void value) {
              logger.debug("Published some bytes to {}", topic);
            }

            @Override
            public void onFailure(Throwable value) {
              logger.warn("Could not publish on topic '{}'", topic, value);
            }
          });
        });
      }
    }
    }