MqttHandler.jadd 12.3 KB
Newer Older
1
import java.io.IOException;
2
import java.util.ArrayList;
René Schöne's avatar
René Schöne committed
3
4
5
import java.util.concurrent.TimeUnit;

aspect MqttHandler {
6
7
public class MqttServerHandler {
  private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>();
8
  private final java.util.Map<ConnectToken, Object> tokensForRemoval = new java.util.HashMap<>();
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  private long time;
  private java.util.concurrent.TimeUnit unit;
  private String name;

  public MqttServerHandler() {
    this("RagConnect");
  }

  public MqttServerHandler(String name) {
    this.name = name;
    setupWaitUntilReady(1, TimeUnit.SECONDS);
  }

  public void setupWaitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
    this.time = time;
    this.unit = unit;
  }

  public MqttHandler resolveHandler(java.net.URI uri) throws IOException {
    MqttHandler handler = handlers.get(uri.getHost());
    if (handler == null) {
      // first connect to that server
      handler = new MqttHandler();
      if (uri.getPort() == -1) {
        handler.setHost(uri.getHost());
      } else {
        handler.setHost(uri.getHost(), uri.getPort());
      }
      handlers.put(uri.getHost(), handler);
    }
    handler.waitUntilReady(this.time, this.unit);
    return handler;
  }

43
44
45
46
47
48
49
50
51
52
  public ConnectToken newConnection(java.net.URI uri, java.util.function.Consumer<byte[]> callback) throws IOException {
    ConnectToken connectToken = new ConnectToken(uri);
    resolveHandler(uri).newConnection(extractTopic(uri), callback);
    tokensForRemoval.put(connectToken, callback);
    return connectToken;
  }

  public boolean disconnect(ConnectToken connectToken) throws IOException {
    MqttHandler handler = resolveHandler(connectToken.uri);
    return handler != null ? handler.disconnect(extractTopic(connectToken.uri), tokensForRemoval.get(connectToken)) : false;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
  }

  public void publish(java.net.URI uri, byte[] bytes) throws IOException {
    resolveHandler(uri).publish(extractTopic(uri), bytes);
  }

  public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws IOException {
    resolveHandler(uri).publish(extractTopic(uri), bytes, retain);
  }

  public void publish(java.net.URI uri, byte[] bytes,
                      org.fusesource.mqtt.client.QoS qos, boolean retain) throws IOException {
    resolveHandler(uri).publish(extractTopic(uri), bytes, qos, retain);
  }

René Schöne's avatar
René Schöne committed
68
  public static String extractTopic(java.net.URI uri) {
69
70
71
72
73
74
75
76
77
78
79
80
81
82
    String path = uri.getPath();
    if (path.charAt(0) == '/') {
      path = path.substring(1);
    }
    return path;
  }

  public void close() {
    for (MqttHandler handler : handlers.values()) {
      handler.close();
    }
  }

}
83
84
85
86
87
/**
 * Helper class to receive updates via MQTT and use callbacks to handle those messages.
 *
 * @author rschoene - Initial contribution
 */
88
public class MqttHandler {
René Schöne's avatar
René Schöne committed
89
  private static final int DEFAULT_PORT = 1883;
90

René Schöne's avatar
René Schöne committed
91
  private final org.apache.logging.log4j.Logger logger;
92
93
94
  private final String name;

  /** The host running the MQTT broker. */
René Schöne's avatar
René Schöne committed
95
  private java.net.URI host;
96
  /** The connection to the MQTT broker. */
René Schöne's avatar
René Schöne committed
97
  private org.fusesource.mqtt.client.CallbackConnection connection;
98
99
  /** Whether we are connected yet */
  private final java.util.concurrent.CountDownLatch readyLatch;
René Schöne's avatar
René Schöne committed
100
  private boolean sendWelcomeMessage = true;
René Schöne's avatar
René Schöne committed
101
  private org.fusesource.mqtt.client.QoS qos;
102
  /** Dispatch knowledge */
103
  private final java.util.Map<String, java.util.List<java.util.function.Consumer<byte[]>>> callbacks;
104

105
  public MqttHandler() {
106
    this("RagConnect");
107
108
  }

109
  public MqttHandler(String name) {
René Schöne's avatar
René Schöne committed
110
    this.name = java.util.Objects.requireNonNull(name, "Name must be set");
111
    this.logger = org.apache.logging.log4j.LogManager.getLogger(MqttHandler.class);
René Schöne's avatar
René Schöne committed
112
    this.callbacks = new java.util.HashMap<>();
113
    this.readyLatch = new java.util.concurrent.CountDownLatch(1);
René Schöne's avatar
René Schöne committed
114
    this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE;
115
116
  }

117
  public MqttHandler dontSendWelcomeMessage() {
René Schöne's avatar
René Schöne committed
118
119
120
121
122
123
124
125
126
    this.sendWelcomeMessage = false;
    return this;
  }

  /**
   * Sets the host (with default port) to receive messages from, and connects to it.
   * @throws IOException if could not connect, or could not subscribe to a topic
   * @return self
   */
127
  public MqttHandler setHost(String host) throws java.io.IOException {
René Schöne's avatar
René Schöne committed
128
129
130
    return setHost(host, DEFAULT_PORT);
  }

131
132
133
134
135
  /**
   * Sets the host to receive messages from, and connects to it.
   * @throws IOException if could not connect, or could not subscribe to a topic
   * @return self
   */
136
  public MqttHandler setHost(String host, int port) throws java.io.IOException {
René Schöne's avatar
René Schöne committed
137
138
    java.util.Objects.requireNonNull(host, "Host need to be set!");

René Schöne's avatar
René Schöne committed
139
    this.host = java.net.URI.create("tcp://" + host + ":" + port);
140
141
    logger.debug("Host for {} is {}", this.name, this.host);

René Schöne's avatar
René Schöne committed
142
    org.fusesource.mqtt.client.MQTT mqtt = new org.fusesource.mqtt.client.MQTT();
143
144
    mqtt.setHost(this.host);
    connection = mqtt.callbackConnection();
René Schöne's avatar
René Schöne committed
145
    java.util.concurrent.atomic.AtomicReference<Throwable> error = new java.util.concurrent.atomic.AtomicReference<>();
146
147

    // add the listener to dispatch messages later
René Schöne's avatar
René Schöne committed
148
    connection.listener(new org.fusesource.mqtt.client.ExtendedListener() {
149
150
151
152
153
154
155
156
157
158
      public void onConnected() {
        logger.debug("Connected");
      }

      @Override
      public void onDisconnected() {
        logger.debug("Disconnected");
      }

      @Override
159
160
161
162
      public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic,
                            org.fusesource.hawtbuf.Buffer body,
                            org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
        // this method is called, whenever a MQTT message is received
163
        String topicString = topic.toString();
164
        java.util.List<java.util.function.Consumer<byte[]>> callbackList = new java.util.ArrayList<>(callbacks.get(topicString));
165
        if (callbackList == null || callbackList.isEmpty()) {
166
          logger.debug("Got a message at {}, but no callback to call. Forgot to subscribe?", topic);
167
168
        } else {
          byte[] message = body.toByteArray();
169
170
171
          for (java.util.function.Consumer<byte[]> callback : callbackList) {
            callback.accept(message);
          }
172
173
174
175
176
        }
        ack.onSuccess(null);  // always acknowledge message
      }

      @Override
177
178
179
180
      public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer,
                            org.fusesource.hawtbuf.Buffer body,
                            Runnable ack) {
        // not used by this type of connection
181
182
183
184
185
186
187
188
189
190
191
        logger.warn("onPublish should not be called");
      }

      @Override
      public void onFailure(Throwable cause) {
        error.set(cause);
      }
    });
    throwIf(error);

    // actually establish the connection
René Schöne's avatar
René Schöne committed
192
    connection.connect(new org.fusesource.mqtt.client.Callback<Void>() {
193
194
      @Override
      public void onSuccess(Void value) {
195
        if (MqttHandler.this.sendWelcomeMessage) {
196
197
198
199
200
          connection.publish("components",
                             (name + " is connected").getBytes(),
                             org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE,
                             false,
                             new org.fusesource.mqtt.client.Callback<Void>() {
René Schöne's avatar
René Schöne committed
201
202
203
204
            @Override
            public void onSuccess(Void value) {
              logger.debug("success sending welcome message");
              setReady();
205
206
            }

René Schöne's avatar
René Schöne committed
207
208
209
210
211
212
213
214
            @Override
            public void onFailure(Throwable value) {
              logger.debug("failure sending welcome message", value);
            }
          });
        } else {
          setReady();
        }
215
216
217
218
219
220
221
222
223
224
225
      }

      @Override
      public void onFailure(Throwable cause) {
        error.set(cause);
      }
    });
    throwIf(error);
    return this;
  }

René Schöne's avatar
René Schöne committed
226
  public java.net.URI getHost() {
227
228
229
    return host;
  }

René Schöne's avatar
René Schöne committed
230
  private void setReady() {
231
    readyLatch.countDown();
René Schöne's avatar
René Schöne committed
232
233
  }

René Schöne's avatar
René Schöne committed
234
  private void throwIf(java.util.concurrent.atomic.AtomicReference<Throwable> error) throws java.io.IOException {
235
    if (error.get() != null) {
René Schöne's avatar
René Schöne committed
236
      throw new java.io.IOException(error.get());
237
238
239
    }
  }

René Schöne's avatar
René Schöne committed
240
  public void setQoSForSubscription(org.fusesource.mqtt.client.QoS qos) {
241
242
243
    this.qos = qos;
  }

244
245
246
247
  public boolean newConnection(String topic, java.util.function.Consumer<byte[]> callback) {
    if (readyLatch.getCount() > 0) {
      System.err.println("Handler not ready");
      return false;
248
249
    }
    // register callback
250
    logger.debug("new connection for {}", topic);
251
    if (callbacks.get(topic) == null || callbacks.get(topic).isEmpty()) {
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
      callbacks.put(topic, new java.util.ArrayList<>());

      // subscribe at broker
      org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) };
      connection.getDispatchQueue().execute(() -> {
        connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() {
          @Override
          public void onSuccess(byte[] qoses) {
            logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
          }

          @Override
          public void onFailure(Throwable cause) {
            logger.error("Could not subscribe to {}", topic, cause);
          }
        });
268
      });
269
270
    }
    callbacks.get(topic).add(callback);
271
    return true;
272
  }
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301

  public boolean disconnect(String topic, Object callback) {
    java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topic);
    if (callbackList == null) {
      logger.warn("Disconnect for not connected topic '{}'", topic);
      return false;
    }
    java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>();
    success.set(callbackList.remove(callback));
    if (callbackList.isEmpty()) {
      // no callbacks anymore for this topic, unsubscribe from mqtt
      connection.getDispatchQueue().execute(() -> {
        org.fusesource.hawtbuf.UTF8Buffer topicBuffer = org.fusesource.hawtbuf.Buffer.utf8(topic);
        org.fusesource.hawtbuf.UTF8Buffer[] topicArray = new org.fusesource.hawtbuf.UTF8Buffer[]{topicBuffer};
        connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<Void>() {
          @Override
          public void onSuccess(Void value) {
            // empty, all good
          }

          @Override
          public void onFailure(Throwable cause) {
            success.set(false);
          }
        });
      });
    }
    return success.get();
  }
302
303
304
305
306
307
308
309
310
311

  /**
   * Waits until this updater is ready to receive MQTT messages.
   * If it already is ready, return immediately with the value <code>true</code>.
   * Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan,
   * if it got ready, or <code>false</code> upon a timeout.
   * @param time the maximum time to wait
   * @param unit the time unit of the time argument
   * @return whether this updater is ready
   */
René Schöne's avatar
René Schöne committed
312
  public boolean waitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
313
    try {
314
      return readyLatch.await(time, unit);
315
316
317
318
319
320
321
322
323
324
325
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return false;
  }

  public void close() {
    if (connection == null) {
      logger.warn("Stopping without connection. Was setHost() called?");
      return;
    }
326
327
328
329
330
331
    connection.getDispatchQueue().execute(() -> {
      connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() {
        @Override
        public void onSuccess(Void value) {
          logger.info("Disconnected {} from {}", name, host);
        }
332

333
334
335
336
337
        @Override
        public void onFailure(Throwable ignored) {
          // Disconnects never fail. And we do not care either.
        }
      });
338
339
340
341
    });
  }

  public void publish(String topic, byte[] bytes) {
342
343
344
345
346
347
348
349
    publish(topic, bytes, false);
  }

  public void publish(String topic, byte[] bytes, boolean retain) {
    publish(topic, bytes, this.qos, retain);
  }

  public void publish(String topic, byte[] bytes, org.fusesource.mqtt.client.QoS qos, boolean retain) {
350
    connection.getDispatchQueue().execute(() -> {
351
      connection.publish(topic, bytes, qos, retain, new org.fusesource.mqtt.client.Callback<Void>() {
352
353
354
355
        @Override
        public void onSuccess(Void value) {
          logger.debug("Published some bytes to {}", topic);
        }
356

357
358
        @Override
        public void onFailure(Throwable value) {
359
          logger.warn("Could not publish on topic '{}'", topic, value);
360
361
        }
      });
362
363
364
    });
  }
}
René Schöne's avatar
René Schöne committed
365
}