GitLab update scheduled for Friday, October 22nd, between 08:15 and 08:45 CEST. If this is inconvenient, please contact René or Martin.

MqttHandler.jadd 12.5 KB
Newer Older
1
import java.io.IOException;
2
import java.util.ArrayList;
René Schöne's avatar
René Schöne committed
3
4
5
import java.util.concurrent.TimeUnit;

aspect MqttHandler {
6
7
public class MqttServerHandler {
  private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>();
8
  private final java.util.Map<ConnectToken, Object> tokensForRemoval = new java.util.HashMap<>();
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  private long time;
  private java.util.concurrent.TimeUnit unit;
  private String name;

  /** Creates a server handler using the default name {@code "RagConnect"}. */
  public MqttServerHandler() {
    this("RagConnect");
  }

  /**
   * Creates a server handler with the given name.
   * The time to wait for a handler to become ready is initialised to one second.
   *
   * @param name the name stored for this server handler
   */
  public MqttServerHandler(String name) {
    // both statements are independent of each other
    setupWaitUntilReady(1, TimeUnit.SECONDS);
    this.name = name;
  }

  /**
   * Configures how long {@link #resolveHandler(java.net.URI)} waits for a handler to become ready.
   *
   * @param time the maximum time to wait
   * @param unit the time unit of the time argument
   */
  public void setupWaitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
    this.unit = unit;
    this.time = time;
  }

  /**
   * Returns the handler for the broker addressed by the given URI, creating and connecting a new
   * handler when that broker is used for the first time.
   *
   * <p>Handlers are cached per broker endpoint: URIs without an explicit port use the bare host
   * name as cache key, URIs with an explicit port use {@code host:port}. This keeps two brokers
   * on the same host but on different ports from sharing one connection.
   *
   * @param uri the URI whose host and optional port identify the broker
   * @return the cached or newly created handler for that broker
   * @throws IOException if a newly created handler reports an error while connecting
   */
  public MqttHandler resolveHandler(java.net.URI uri) throws IOException {
    final String host = uri.getHost();
    final int port = uri.getPort();
    final boolean defaultPort = port == -1;
    final String key = defaultPort ? host : host + ":" + port;

    MqttHandler handler = handlers.get(key);
    if (handler == null) {
      // first connect to that server
      handler = new MqttHandler();
      if (defaultPort) {
        handler.setHost(host);
      } else {
        handler.setHost(host, port);
      }
      // cache only after setHost did not throw
      handlers.put(key, handler);
    }
    // The outcome of waiting is not evaluated here; the handler is returned either way.
    handler.waitUntilReady(this.time, this.unit);
    return handler;
  }

43
44
45
46
47
48
49
50
51
52
  /**
   * Registers a callback for the topic given by the path of the URI at the broker given by its host.
   * The callback is remembered under the returned token so that it can be passed to
   * {@link #disconnect(ConnectToken)} later.
   *
   * @param uri      the URI naming broker and topic
   * @param callback the consumer to invoke with the payload of received messages
   * @return a fresh token identifying this connection
   * @throws IOException if the handler for the broker could not be resolved
   */
  public ConnectToken newConnection(java.net.URI uri, java.util.function.Consumer<byte[]> callback) throws IOException {
    final ConnectToken token = new ConnectToken(uri);
    final MqttHandler target = resolveHandler(uri);
    final String topic = extractTopic(uri);
    target.newConnection(topic, callback);
    tokensForRemoval.put(token, callback);
    return token;
  }

  public boolean disconnect(ConnectToken connectToken) throws IOException {
    MqttHandler handler = resolveHandler(connectToken.uri);
    return handler != null ? handler.disconnect(extractTopic(connectToken.uri), tokensForRemoval.get(connectToken)) : false;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
  }

  /**
   * Publishes the payload on the topic given by the path of the URI at the broker given by its host.
   *
   * @param uri   the URI naming broker and topic
   * @param bytes the payload to publish
   * @throws IOException if the handler for the broker could not be resolved
   */
  public void publish(java.net.URI uri, byte[] bytes) throws IOException {
    final MqttHandler target = resolveHandler(uri);
    final String topic = extractTopic(uri);
    target.publish(topic, bytes);
  }

  /**
   * Publishes the payload on the topic given by the path of the URI, passing on the retain flag.
   *
   * @param uri    the URI naming broker and topic
   * @param bytes  the payload to publish
   * @param retain the retain flag handed to the handler
   * @throws IOException if the handler for the broker could not be resolved
   */
  public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws IOException {
    final MqttHandler target = resolveHandler(uri);
    final String topic = extractTopic(uri);
    target.publish(topic, bytes, retain);
  }

  /**
   * Publishes the payload on the topic given by the path of the URI, passing on QoS and retain flag.
   *
   * @param uri    the URI naming broker and topic
   * @param bytes  the payload to publish
   * @param qos    the quality of service handed to the handler
   * @param retain the retain flag handed to the handler
   * @throws IOException if the handler for the broker could not be resolved
   */
  public void publish(java.net.URI uri, byte[] bytes,
                      org.fusesource.mqtt.client.QoS qos, boolean retain) throws IOException {
    final MqttHandler target = resolveHandler(uri);
    final String topic = extractTopic(uri);
    target.publish(topic, bytes, qos, retain);
  }

René Schöne's avatar
René Schöne committed
68
  public static String extractTopic(java.net.URI uri) {
69
70
71
72
73
74
75
76
77
78
79
80
81
82
    String path = uri.getPath();
    if (path.charAt(0) == '/') {
      path = path.substring(1);
    }
    return path;
  }

  /** Closes every handler that has been created by this server handler so far. */
  public void close() {
    handlers.values().forEach(MqttHandler::close);
  }

}
83
84
85
86
87
/**
 * Helper class to receive updates via MQTT and use callbacks to handle those messages.
 *
 * @author rschoene - Initial contribution
 */
88
public class MqttHandler {
René Schöne's avatar
René Schöne committed
89
  private static final int DEFAULT_PORT = 1883;
90

René Schöne's avatar
René Schöne committed
91
  private final org.apache.logging.log4j.Logger logger;
92
93
94
  private final String name;

  /** The host running the MQTT broker. */
René Schöne's avatar
René Schöne committed
95
  private java.net.URI host;
96
  /** The connection to the MQTT broker. */
René Schöne's avatar
René Schöne committed
97
  private org.fusesource.mqtt.client.CallbackConnection connection;
98
99
  /** Whether we are connected yet */
  private final java.util.concurrent.CountDownLatch readyLatch;
René Schöne's avatar
René Schöne committed
100
  private boolean sendWelcomeMessage = true;
René Schöne's avatar
René Schöne committed
101
  private org.fusesource.mqtt.client.QoS qos;
102
  /** Dispatch knowledge */
103
  private final java.util.Map<String, java.util.List<java.util.function.Consumer<byte[]>>> callbacks;
104

105
  /** Creates a handler using the default name {@code "RagConnect"}. */
  public MqttHandler() {
    this("RagConnect");
  }

109
  /**
   * Creates a handler with the given name. No connection is made until a host is set.
   *
   * @param name the name of this handler, must not be {@code null}
   * @throws NullPointerException if the name is {@code null}
   */
  public MqttHandler(String name) {
    // validate first, so nothing else happens for an invalid name
    this.name = java.util.Objects.requireNonNull(name, "Name must be set");
    this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE;
    this.readyLatch = new java.util.concurrent.CountDownLatch(1);
    this.callbacks = new java.util.HashMap<>();
    this.logger = org.apache.logging.log4j.LogManager.getLogger(MqttHandler.class);
  }

117
  public MqttHandler dontSendWelcomeMessage() {
René Schöne's avatar
René Schöne committed
118
119
120
121
122
123
124
125
126
    this.sendWelcomeMessage = false;
    return this;
  }

  /**
   * Sets the host (with default port) to receive messages from, and connects to it.
   * @throws IOException if could not connect, or could not subscribe to a topic
   * @return self
   */
127
  public MqttHandler setHost(String host) throws java.io.IOException {
René Schöne's avatar
René Schöne committed
128
129
130
    return setHost(host, DEFAULT_PORT);
  }

131
132
133
134
135
  /**
   * Sets the host to receive messages from, and connects to it.
   * @throws IOException if could not connect, or could not subscribe to a topic
   * @return self
   */
136
  public MqttHandler setHost(String host, int port) throws java.io.IOException {
René Schöne's avatar
René Schöne committed
137
138
    java.util.Objects.requireNonNull(host, "Host need to be set!");

René Schöne's avatar
René Schöne committed
139
    this.host = java.net.URI.create("tcp://" + host + ":" + port);
140
141
    logger.debug("Host for {} is {}", this.name, this.host);

René Schöne's avatar
René Schöne committed
142
    org.fusesource.mqtt.client.MQTT mqtt = new org.fusesource.mqtt.client.MQTT();
143
144
    mqtt.setHost(this.host);
    connection = mqtt.callbackConnection();
René Schöne's avatar
René Schöne committed
145
    java.util.concurrent.atomic.AtomicReference<Throwable> error = new java.util.concurrent.atomic.AtomicReference<>();
146
147

    // add the listener to dispatch messages later
René Schöne's avatar
René Schöne committed
148
    connection.listener(new org.fusesource.mqtt.client.ExtendedListener() {
149
150
151
152
153
154
155
156
157
158
      public void onConnected() {
        logger.debug("Connected");
      }

      @Override
      public void onDisconnected() {
        logger.debug("Disconnected");
      }

      @Override
159
160
161
162
      public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic,
                            org.fusesource.hawtbuf.Buffer body,
                            org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
        // this method is called, whenever a MQTT message is received
163
        String topicString = topic.toString();
164
165
        // TODO: maybe copy list here to avoid concurrent modification. or use a concurrent list.
        java.util.List<java.util.function.Consumer<byte[]>> callbackList = new java.util.ArrayList<>(callbacks.get(topicString));
166
        if (callbackList == null || callbackList.isEmpty()) {
167
          logger.debug("Got a message at {}, but no callback to call. Forgot to subscribe?", topic);
168
169
        } else {
          byte[] message = body.toByteArray();
170
          logger.debug("initial: {}", callbackList);
171
          for (java.util.function.Consumer<byte[]> callback : callbackList) {
172
            logger.debug("before: {}", callbackList);
173
            callback.accept(message);
174
            logger.debug("after: {}", callbackList);
175
          }
176
177
178
179
180
        }
        ack.onSuccess(null);  // always acknowledge message
      }

      @Override
181
182
183
184
      public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer,
                            org.fusesource.hawtbuf.Buffer body,
                            Runnable ack) {
        // not used by this type of connection
185
186
187
188
189
190
191
192
193
194
195
        logger.warn("onPublish should not be called");
      }

      @Override
      public void onFailure(Throwable cause) {
        error.set(cause);
      }
    });
    throwIf(error);

    // actually establish the connection
René Schöne's avatar
René Schöne committed
196
    connection.connect(new org.fusesource.mqtt.client.Callback<Void>() {
197
198
      @Override
      public void onSuccess(Void value) {
199
        if (MqttHandler.this.sendWelcomeMessage) {
200
201
202
203
204
          connection.publish("components",
                             (name + " is connected").getBytes(),
                             org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE,
                             false,
                             new org.fusesource.mqtt.client.Callback<Void>() {
René Schöne's avatar
René Schöne committed
205
206
207
208
            @Override
            public void onSuccess(Void value) {
              logger.debug("success sending welcome message");
              setReady();
209
210
            }

René Schöne's avatar
René Schöne committed
211
212
213
214
215
216
217
218
            @Override
            public void onFailure(Throwable value) {
              logger.debug("failure sending welcome message", value);
            }
          });
        } else {
          setReady();
        }
219
220
221
222
223
224
225
226
227
228
229
      }

      @Override
      public void onFailure(Throwable cause) {
        error.set(cause);
      }
    });
    throwIf(error);
    return this;
  }

René Schöne's avatar
René Schöne committed
230
  public java.net.URI getHost() {
231
232
233
    return host;
  }

René Schöne's avatar
René Schöne committed
234
  private void setReady() {
235
    readyLatch.countDown();
René Schöne's avatar
René Schöne committed
236
237
  }

René Schöne's avatar
René Schöne committed
238
  private void throwIf(java.util.concurrent.atomic.AtomicReference<Throwable> error) throws java.io.IOException {
239
    if (error.get() != null) {
René Schöne's avatar
René Schöne committed
240
      throw new java.io.IOException(error.get());
241
242
243
    }
  }

René Schöne's avatar
René Schöne committed
244
  public void setQoSForSubscription(org.fusesource.mqtt.client.QoS qos) {
245
246
247
    this.qos = qos;
  }

248
249
250
251
  public boolean newConnection(String topic, java.util.function.Consumer<byte[]> callback) {
    if (readyLatch.getCount() > 0) {
      System.err.println("Handler not ready");
      return false;
252
253
    }
    // register callback
254
    logger.debug("new connection for {}", topic);
255
    if (callbacks.get(topic) == null || callbacks.get(topic).isEmpty()) {
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
      callbacks.put(topic, new java.util.ArrayList<>());

      // subscribe at broker
      org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) };
      connection.getDispatchQueue().execute(() -> {
        connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() {
          @Override
          public void onSuccess(byte[] qoses) {
            logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
          }

          @Override
          public void onFailure(Throwable cause) {
            logger.error("Could not subscribe to {}", topic, cause);
          }
        });
272
      });
273
274
    }
    callbacks.get(topic).add(callback);
275
    return true;
276
  }
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305

  /**
   * Removes a callback from the given topic. When no callback is left for the topic, an
   * unsubscribe request is scheduled on the dispatch queue of the connection.
   *
   * <p>The return value only reflects the removal of the callback. The unsubscribe request is
   * merely scheduled here, so a failure of it is logged rather than returned.
   *
   * @param topic    the topic the callback was registered for
   * @param callback the callback to remove
   * @return {@code true} if the callback was registered for the topic and has been removed
   */
  public boolean disconnect(String topic, Object callback) {
    java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topic);
    if (callbackList == null) {
      logger.warn("Disconnect for not connected topic '{}'", topic);
      return false;
    }
    final boolean removed = callbackList.remove(callback);
    if (callbackList.isEmpty()) {
      // no callbacks anymore for this topic, unsubscribe from mqtt
      connection.getDispatchQueue().execute(() -> {
        org.fusesource.hawtbuf.UTF8Buffer topicBuffer = org.fusesource.hawtbuf.Buffer.utf8(topic);
        org.fusesource.hawtbuf.UTF8Buffer[] topicArray = new org.fusesource.hawtbuf.UTF8Buffer[]{topicBuffer};
        connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<Void>() {
          @Override
          public void onSuccess(Void value) {
            // empty, all good
          }

          @Override
          public void onFailure(Throwable cause) {
            logger.warn("Could not unsubscribe from topic '{}'", topic, cause);
          }
        });
      });
    }
    return removed;
  }
306
307
308
309
310
311
312
313
314
315

  /**
   * Waits until this updater is ready to receive MQTT messages.
   * If it already is ready, return immediately with the value <code>true</code>.
   * Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan,
   * if it got ready, or <code>false</code> upon a timeout.
   * @param time the maximum time to wait
   * @param unit the time unit of the time argument
   * @return whether this updater is ready
   */
René Schöne's avatar
René Schöne committed
316
  public boolean waitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
317
    try {
318
      return readyLatch.await(time, unit);
319
320
321
322
323
324
325
326
327
328
329
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return false;
  }

  public void close() {
    if (connection == null) {
      logger.warn("Stopping without connection. Was setHost() called?");
      return;
    }
330
331
332
333
334
335
    connection.getDispatchQueue().execute(() -> {
      connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() {
        @Override
        public void onSuccess(Void value) {
          logger.info("Disconnected {} from {}", name, host);
        }
336

337
338
339
340
341
        @Override
        public void onFailure(Throwable ignored) {
          // Disconnects never fail. And we do not care either.
        }
      });
342
343
344
345
    });
  }

  public void publish(String topic, byte[] bytes) {
346
347
348
349
350
351
352
353
    publish(topic, bytes, false);
  }

  /**
   * Publishes the payload on the given topic using the quality of service currently set for this handler.
   *
   * @param topic  the topic to publish on
   * @param bytes  the payload to publish
   * @param retain the retain flag of the message
   */
  public void publish(String topic, byte[] bytes, boolean retain) {
    final org.fusesource.mqtt.client.QoS currentQos = this.qos;
    publish(topic, bytes, currentQos, retain);
  }

  public void publish(String topic, byte[] bytes, org.fusesource.mqtt.client.QoS qos, boolean retain) {
354
    connection.getDispatchQueue().execute(() -> {
355
      connection.publish(topic, bytes, qos, retain, new org.fusesource.mqtt.client.Callback<Void>() {
356
357
358
359
        @Override
        public void onSuccess(Void value) {
          logger.debug("Published some bytes to {}", topic);
        }
360

361
362
        @Override
        public void onFailure(Throwable value) {
363
          logger.warn("Could not publish on topic '{}'", topic, value);
364
365
        }
      });
366
367
368
    });
  }
}
René Schöne's avatar
René Schöne committed
369
}