MqttUpdater.java_class 6.94 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
package org.jastadd.ros2rag.compiler.mqtt;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.*;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/**
 * Helper class to receive updates via MQTT and use callbacks to handle those messages.
 *
 * @author rschoene - Initial contribution
 */
public class MqttUpdater {

  private final Logger logger;
  private final String name;

  /** The host running the MQTT broker. */
  private URI host;
  /** The connection to the MQTT broker. */
  private CallbackConnection connection;
  /** Whether we are subscribed to the topics yet */
  private final Condition readyCondition;
  private final Lock readyLock;
  private boolean ready;
  private QoS qos;
  /** Dispatch knowledge */
  private final Map<String, Consumer<byte[]>> callbacks;

  public MqttUpdater() {
    this("Ros2Rag");
  }

  public MqttUpdater(String name) {
    this.name = Objects.requireNonNull(name, "Name must be set");
    this.logger = LogManager.getLogger(MqttUpdater.class);
    this.callbacks = new HashMap<>();
    this.readyLock = new ReentrantLock();
    this.readyCondition = readyLock.newCondition();
    this.ready = false;
    this.qos = QoS.AT_LEAST_ONCE;
  }

  /**
   * Sets the host to receive messages from, and connects to it.
   * @throws IOException if could not connect, or could not subscribe to a topic
   * @return self
   */
  public MqttUpdater setHost(String host, int port) throws IOException {
    this.host = URI.create("tcp://" + host + ":" + port);
    logger.debug("Host for {} is {}", this.name, this.host);

    Objects.requireNonNull(this.host, "Host need to be set!");
    MQTT mqtt = new MQTT();
    mqtt.setHost(this.host);
    connection = mqtt.callbackConnection();
    AtomicReference<Throwable> error = new AtomicReference<>();

    // add the listener to dispatch messages later
    connection.listener(new ExtendedListener() {
      public void onConnected() {
        logger.debug("Connected");
      }

      @Override
      public void onDisconnected() {
        logger.debug("Disconnected");
      }

      @Override
      public void onPublish(UTF8Buffer topic, Buffer body, Callback<Callback<Void>> ack) {
        String topicString = topic.toString();
        Consumer<byte[]> callback = callbacks.get(topicString);
        if (callback == null) {
          logger.debug("Got a message, but no callback to call. Forgot to unsubscribe?");
        } else {
          byte[] message = body.toByteArray();
//          System.out.println("message = " + Arrays.toString(message));
          callback.accept(message);
        }
        ack.onSuccess(null);  // always acknowledge message
      }

      @Override
      public void onPublish(UTF8Buffer topicBuffer, Buffer body, Runnable ack) {
        logger.warn("onPublish should not be called");
      }

      @Override
      public void onFailure(Throwable cause) {
//        logger.catching(cause);
        error.set(cause);
      }
    });
    throwIf(error);

    // actually establish the connection
    connection.connect(new Callback<Void>() {
      @Override
      public void onSuccess(Void value) {
        connection.publish("components", (name + " is connected").getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
          @Override
          public void onSuccess(Void value) {
            logger.debug("success sending welcome message");
            try {
              readyLock.lock();
              ready = true;
              readyCondition.signalAll();
            } finally {
              readyLock.unlock();
            }
          }

          @Override
          public void onFailure(Throwable value) {
            logger.debug("failure sending welcome message", value);
          }
        });
      }

      @Override
      public void onFailure(Throwable cause) {
//        logger.error("Could not connect", cause);
        error.set(cause);
      }
    });
    throwIf(error);
    return this;
  }

  public URI getHost() {
    return host;
  }

  private void throwIf(AtomicReference<Throwable> error) throws IOException {
    if (error.get() != null) {
      throw new IOException(error.get());
    }
  }

  public void setQoSForSubscription(QoS qos) {
    this.qos = qos;
  }

  public void newConnection(String topic, Consumer<byte[]> callback) {
    if (!ready) {
      // TODO should maybe be something more kind than throwing an exception here
      throw new IllegalStateException("Updater not ready");
    }
    // register callback
    callbacks.put(topic, callback);

    // subscribe at broker
    Topic[] topicArray = { new Topic(topic, this.qos) };
    connection.subscribe(topicArray, new Callback<byte[]>() {
      @Override
      public void onSuccess(byte[] qoses) {
        logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
      }

      @Override
      public void onFailure(Throwable cause) {
        logger.error("Could not subscribe to {}", topic, cause);
      }
    });
  }

  /**
   * Waits until this updater is ready to receive MQTT messages.
   * If it already is ready, return immediately with the value <code>true</code>.
   * Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan,
   * if it got ready, or <code>false</code> upon a timeout.
   * @param time the maximum time to wait
   * @param unit the time unit of the time argument
   * @return whether this updater is ready
   */
  public boolean waitUntilReady(long time, TimeUnit unit) {
    try {
      readyLock.lock();
      if (ready) {
        return true;
      }
      return readyCondition.await(time, unit);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      readyLock.unlock();
    }
    return false;
  }

  public void close() {
    if (connection == null) {
      logger.warn("Stopping without connection. Was setHost() called?");
      return;
    }
    connection.disconnect(new Callback<Void>() {
      @Override
      public void onSuccess(Void value) {
        logger.info("Disconnected {} from {}", name, host);
      }

      @Override
      public void onFailure(Throwable ignored) {
        // Disconnects never fail. And we do not care either.
      }
    });
  }

  public void publish(String topic, byte[] bytes) {
    connection.publish(topic, bytes, qos, false, new Callback<Void>() {
      @Override
      public void onSuccess(Void value) {
        logger.debug("Published some bytes to {}", topic);
      }

      @Override
      public void onFailure(Throwable value) {
        logger.warn("Could not publish on topic '{}'", topic);
      }
    });
  }
}