diff --git a/src/main/jastadd/backend/Mappings.jrag b/src/main/jastadd/backend/Mappings.jrag index fd40385d9e8879dd188362f6a0acd802652f511e..600ce92ee064f45fe0f41860dd3ab0bc5e334bd3 100644 --- a/src/main/jastadd/backend/Mappings.jrag +++ b/src/main/jastadd/backend/Mappings.jrag @@ -111,7 +111,7 @@ aspect Mappings { // or if no mappings are specified. // then prepend the suitable default mapping java.util.List<MappingDefinition> result; - if (getMappingList().size() == 0 || !getMappingList().get(0).getFromType().isByteArray()) { + if (getMappingList().isEmpty() || !getMappingList().get(0).getFromType().isByteArray()) { result = new java.util.ArrayList(); result.add(suitableDefaultMapping()); result.addAll(getMappingList()); @@ -161,7 +161,9 @@ aspect Mappings { // --- suitableDefaultMapping --- syn DefaultMappingDefinition UpdateDefinition.suitableDefaultMapping(); eq ReadFromMqttDefinition.suitableDefaultMapping() { - String typeName = getToken().getJavaTypeUse().getName(); + String typeName = getMappingList().isEmpty() ? + getToken().getJavaTypeUse().getName() : + getMappingList().get(0).getFromType().prettyPrint(); switch(typeName) { case "int": case "Integer": return ros2rag().defaultBytesToIntMapping(); @@ -180,7 +182,9 @@ aspect Mappings { } } eq WriteToMqttDefinition.suitableDefaultMapping() { - String typeName = getToken().getJavaTypeUse().getName(); + String typeName = getMappingList().isEmpty() ? + getToken().getJavaTypeUse().getName() : + getMappingList().get(getMappingList().size() - 1).getFromType().prettyPrint(); switch(typeName) { case "int": case "Integer": return ros2rag().defaultIntToBytesMapping(); diff --git a/src/main/resources/MqttHandler.jadd b/src/main/resources/MqttHandler.jadd index 47005270a0cae49229d70760be5be694ea3ca20f..c3e98dc89377e6217946fafa54a9eb7ab7594c61 100644 --- a/src/main/resources/MqttHandler.jadd +++ b/src/main/resources/MqttHandler.jadd @@ -21,7 +21,7 @@ public class MqttHandler { private boolean sendWelcomeMessage = true; private org.fusesource.mqtt.client.QoS qos; /** Dispatch knowledge */ - private final java.util.Map<String, java.util.function.Consumer<byte[]>> callbacks; + private final java.util.Map<String, java.util.List<java.util.function.Consumer<byte[]>>> callbacks; public MqttHandler() { this("Ros2Rag"); @@ -81,13 +81,15 @@ public class MqttHandler { @Override public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic, org.fusesource.hawtbuf.Buffer body, org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) { String topicString = topic.toString(); - java.util.function.Consumer<byte[]> callback = callbacks.get(topicString); - if (callback == null) { + java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topicString); + if (callbackList == null || callbackList.isEmpty()) { logger.debug("Got a message, but no callback to call. Forgot to unsubscribe?"); } else { byte[] message = body.toByteArray(); // System.out.println("message = " + Arrays.toString(message)); - callback.accept(message); + for (java.util.function.Consumer<byte[]> callback : callbackList) { + callback.accept(message); + } } ack.onSuccess(null); // always acknowledge message } @@ -163,27 +165,30 @@ public class MqttHandler { public void newConnection(String topic, java.util.function.Consumer<byte[]> callback) { if (!ready) { - // TODO should maybe be something more kind than throwing an exception here + // should maybe be something more kind than throwing an exception here throw new IllegalStateException("Updater not ready"); } // register callback - callbacks.put(topic, callback); + if (callbacks.get(topic) == null) { + callbacks.put(topic, new java.util.ArrayList<>()); - // subscribe at broker - org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) }; - connection.getDispatchQueue().execute(() -> { - connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() { - @Override - public void onSuccess(byte[] qoses) { - logger.debug("Subscribed to {}, qoses: {}", topic, qoses); - } + // subscribe at broker + org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) }; + connection.getDispatchQueue().execute(() -> { + connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() { + @Override + public void onSuccess(byte[] qoses) { + logger.debug("Subscribed to {}, qoses: {}", topic, qoses); + } - @Override - public void onFailure(Throwable cause) { - logger.error("Could not subscribe to {}", topic, cause); - } + @Override + public void onFailure(Throwable cause) { + logger.error("Could not subscribe to {}", topic, cause); + } + }); }); - }); + } + callbacks.get(topic).add(callback); } /** diff --git a/src/main/resources/mqtt.mustache b/src/main/resources/mqtt.mustache index fb2b173c35cc802c5c5c661cf22ce331c20fbbae..d65ca944a9dc7e881ff75d89ea696d4bb1fe09fc 100644 --- a/src/main/resources/mqtt.mustache +++ b/src/main/resources/mqtt.mustache @@ -1,5 +1,6 @@ aspect MQTT { - private MqttHandler {{rootNodeName}}.{{mqttHandlerField}} = new MqttHandler(); + private String {{rootNodeName}}.MqttName() { return "Ros2Rag"; } + private MqttHandler {{rootNodeName}}.{{mqttHandlerField}} = new MqttHandler(MqttName()); public void {{rootNodeName}}.{{mqttSetHostMethod}}(String host) throws java.io.IOException { {{mqttHandlerField}}.setHost(host); }