// MqttUpdater.jadd
/**
 * Helper class to receive updates via MQTT and use callbacks to handle those messages.
 *
 * @author rschoene - Initial contribution
 */
public class MqttUpdater {

René Schöne's avatar
René Schöne committed
8
  private final org.apache.logging.log4j.Logger logger;
9
10
11
  private final String name;

  /** The host running the MQTT broker. */
René Schöne's avatar
René Schöne committed
12
  private java.net.URI host;
13
  /** The connection to the MQTT broker. */
René Schöne's avatar
René Schöne committed
14
  private org.fusesource.mqtt.client.CallbackConnection connection;
15
  /** Whether we are subscribed to the topics yet */
René Schöne's avatar
René Schöne committed
16
17
  private final java.util.concurrent.locks.Condition readyCondition;
  private final java.util.concurrent.locks.Lock readyLock;
18
  private boolean ready;
René Schöne's avatar
René Schöne committed
19
  private org.fusesource.mqtt.client.QoS qos;
20
  /** Dispatch knowledge */
René Schöne's avatar
René Schöne committed
21
  private final java.util.Map<String, java.util.function.Consumer<byte[]>> callbacks;
22
23
24
25
26
27

  public MqttUpdater() {
    this("Ros2Rag");
  }

  public MqttUpdater(String name) {
René Schöne's avatar
René Schöne committed
28
29
30
31
    this.name = java.util.Objects.requireNonNull(name, "Name must be set");
    this.logger = org.apache.logging.log4j.LogManager.getLogger(MqttUpdater.class);
    this.callbacks = new java.util.HashMap<>();
    this.readyLock = new java.util.concurrent.locks.ReentrantLock();
32
33
    this.readyCondition = readyLock.newCondition();
    this.ready = false;
René Schöne's avatar
René Schöne committed
34
    this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE;
35
36
37
38
39
40
41
  }

  /**
   * Sets the host to receive messages from, and connects to it.
   * @throws IOException if could not connect, or could not subscribe to a topic
   * @return self
   */
René Schöne's avatar
René Schöne committed
42
43
  public MqttUpdater setHost(String host, int port) throws java.io.IOException {
    this.host = java.net.URI.create("tcp://" + host + ":" + port);
44
45
    logger.debug("Host for {} is {}", this.name, this.host);

René Schöne's avatar
René Schöne committed
46
47
    java.util.Objects.requireNonNull(this.host, "Host need to be set!");
    org.fusesource.mqtt.client.MQTT mqtt = new org.fusesource.mqtt.client.MQTT();
48
49
    mqtt.setHost(this.host);
    connection = mqtt.callbackConnection();
René Schöne's avatar
René Schöne committed
50
    java.util.concurrent.atomic.AtomicReference<Throwable> error = new java.util.concurrent.atomic.AtomicReference<>();
51
52

    // add the listener to dispatch messages later
René Schöne's avatar
René Schöne committed
53
    connection.listener(new org.fusesource.mqtt.client.ExtendedListener() {
54
55
56
57
58
59
60
61
62
63
      public void onConnected() {
        logger.debug("Connected");
      }

      @Override
      public void onDisconnected() {
        logger.debug("Disconnected");
      }

      @Override
René Schöne's avatar
René Schöne committed
64
      public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic, org.fusesource.hawtbuf.Buffer body, org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
65
        String topicString = topic.toString();
René Schöne's avatar
René Schöne committed
66
        java.util.function.Consumer<byte[]> callback = callbacks.get(topicString);
67
68
69
70
71
72
73
74
75
76
77
        if (callback == null) {
          logger.debug("Got a message, but no callback to call. Forgot to unsubscribe?");
        } else {
          byte[] message = body.toByteArray();
//          System.out.println("message = " + Arrays.toString(message));
          callback.accept(message);
        }
        ack.onSuccess(null);  // always acknowledge message
      }

      @Override
René Schöne's avatar
René Schöne committed
78
      public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer, org.fusesource.hawtbuf.Buffer body, Runnable ack) {
79
80
81
82
83
84
85
86
87
88
89
90
        logger.warn("onPublish should not be called");
      }

      @Override
      public void onFailure(Throwable cause) {
//        logger.catching(cause);
        error.set(cause);
      }
    });
    throwIf(error);

    // actually establish the connection
René Schöne's avatar
René Schöne committed
91
    connection.connect(new org.fusesource.mqtt.client.Callback<Void>() {
92
93
      @Override
      public void onSuccess(Void value) {
René Schöne's avatar
René Schöne committed
94
        connection.publish("components", (name + " is connected").getBytes(), org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE, false, new org.fusesource.mqtt.client.Callback<Void>() {
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
          @Override
          public void onSuccess(Void value) {
            logger.debug("success sending welcome message");
            try {
              readyLock.lock();
              ready = true;
              readyCondition.signalAll();
            } finally {
              readyLock.unlock();
            }
          }

          @Override
          public void onFailure(Throwable value) {
            logger.debug("failure sending welcome message", value);
          }
        });
      }

      @Override
      public void onFailure(Throwable cause) {
//        logger.error("Could not connect", cause);
        error.set(cause);
      }
    });
    throwIf(error);
    return this;
  }

René Schöne's avatar
René Schöne committed
124
  public java.net.URI getHost() {
125
126
127
    return host;
  }

René Schöne's avatar
René Schöne committed
128
  private void throwIf(java.util.concurrent.atomic.AtomicReference<Throwable> error) throws java.io.IOException {
129
    if (error.get() != null) {
René Schöne's avatar
René Schöne committed
130
      throw new java.io.IOException(error.get());
131
132
133
    }
  }

René Schöne's avatar
René Schöne committed
134
  public void setQoSForSubscription(org.fusesource.mqtt.client.QoS qos) {
135
136
137
    this.qos = qos;
  }

René Schöne's avatar
René Schöne committed
138
  public void newConnection(String topic, java.util.function.Consumer<byte[]> callback) {
139
140
141
142
143
144
145
146
147
    if (!ready) {
      // TODO should maybe be something more kind than throwing an exception here
      throw new IllegalStateException("Updater not ready");
    }
    // register callback
    callbacks.put(topic, callback);

    // subscribe at broker
    Topic[] topicArray = { new Topic(topic, this.qos) };
René Schöne's avatar
René Schöne committed
148
    connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() {
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
      @Override
      public void onSuccess(byte[] qoses) {
        logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
      }

      @Override
      public void onFailure(Throwable cause) {
        logger.error("Could not subscribe to {}", topic, cause);
      }
    });
  }

  /**
   * Waits until this updater is ready to receive MQTT messages.
   * If it already is ready, return immediately with the value <code>true</code>.
   * Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan,
   * if it got ready, or <code>false</code> upon a timeout.
   * @param time the maximum time to wait
   * @param unit the time unit of the time argument
   * @return whether this updater is ready
   */
René Schöne's avatar
René Schöne committed
170
  public boolean waitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
    try {
      readyLock.lock();
      if (ready) {
        return true;
      }
      return readyCondition.await(time, unit);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      readyLock.unlock();
    }
    return false;
  }

  public void close() {
    if (connection == null) {
      logger.warn("Stopping without connection. Was setHost() called?");
      return;
    }
René Schöne's avatar
René Schöne committed
190
    connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() {
191
192
193
194
195
196
197
198
199
200
201
202
203
      @Override
      public void onSuccess(Void value) {
        logger.info("Disconnected {} from {}", name, host);
      }

      @Override
      public void onFailure(Throwable ignored) {
        // Disconnects never fail. And we do not care either.
      }
    });
  }

  public void publish(String topic, byte[] bytes) {
René Schöne's avatar
René Schöne committed
204
    connection.publish(topic, bytes, qos, false, new org.fusesource.mqtt.client.Callback<Void>() {
205
206
207
208
209
210
211
212
213
214
215
216
      @Override
      public void onSuccess(Void value) {
        logger.debug("Published some bytes to {}", topic);
      }

      @Override
      public void onFailure(Throwable value) {
        logger.warn("Could not publish on topic '{}'", topic);
      }
    });
  }
}