From 07accb5ea4e43cd12b9ed52d22ebd423a659200c Mon Sep 17 00:00:00 2001 From: rschoene <rene.schoene@tu-dresden.de> Date: Wed, 8 Jul 2020 11:28:49 +0200 Subject: [PATCH] Update goal, fixed bug in mappings and MqttHandler. - Base: inserted DefaultMappings did not account for existing other mappings, but always use type of token - Base: MqttHandler was not able to have multiple newConnections for the same topic - Goal: Changed way how last update is handled (now packed into currentStep) - Goal: Added StartStep - Goal: Changed wait to be in milliseconds --- src/main/jastadd/backend/Mappings.jrag | 10 ++++-- src/main/resources/MqttHandler.jadd | 43 ++++++++++++++------------ src/main/resources/mqtt.mustache | 3 +- 3 files changed, 33 insertions(+), 23 deletions(-) diff --git a/src/main/jastadd/backend/Mappings.jrag b/src/main/jastadd/backend/Mappings.jrag index fd40385..600ce92 100644 --- a/src/main/jastadd/backend/Mappings.jrag +++ b/src/main/jastadd/backend/Mappings.jrag @@ -111,7 +111,7 @@ aspect Mappings { // or if no mappings are specified. // then prepend the suitable default mapping java.util.List<MappingDefinition> result; - if (getMappingList().size() == 0 || !getMappingList().get(0).getFromType().isByteArray()) { + if (getMappingList().isEmpty() || !getMappingList().get(0).getFromType().isByteArray()) { result = new java.util.ArrayList(); result.add(suitableDefaultMapping()); result.addAll(getMappingList()); @@ -161,7 +161,9 @@ aspect Mappings { // --- suitableDefaultMapping --- syn DefaultMappingDefinition UpdateDefinition.suitableDefaultMapping(); eq ReadFromMqttDefinition.suitableDefaultMapping() { - String typeName = getToken().getJavaTypeUse().getName(); + String typeName = getMappingList().isEmpty() ? + getToken().getJavaTypeUse().getName() : + getMappingList().get(0).getFromType().prettyPrint(); switch(typeName) { case "int": case "Integer": return ros2rag().defaultBytesToIntMapping(); @@ -180,7 +182,9 @@ aspect Mappings { } } eq WriteToMqttDefinition.suitableDefaultMapping() { - String typeName = getToken().getJavaTypeUse().getName(); + String typeName = getMappingList().isEmpty() ? + getToken().getJavaTypeUse().getName() : + getMappingList().get(getMappingList().size() - 1).getFromType().prettyPrint(); switch(typeName) { case "int": case "Integer": return ros2rag().defaultIntToBytesMapping(); diff --git a/src/main/resources/MqttHandler.jadd b/src/main/resources/MqttHandler.jadd index 4700527..c3e98dc 100644 --- a/src/main/resources/MqttHandler.jadd +++ b/src/main/resources/MqttHandler.jadd @@ -21,7 +21,7 @@ public class MqttHandler { private boolean sendWelcomeMessage = true; private org.fusesource.mqtt.client.QoS qos; /** Dispatch knowledge */ - private final java.util.Map<String, java.util.function.Consumer<byte[]>> callbacks; + private final java.util.Map<String, java.util.List<java.util.function.Consumer<byte[]>>> callbacks; public MqttHandler() { this("Ros2Rag"); @@ -81,13 +81,15 @@ public class MqttHandler { @Override public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic, org.fusesource.hawtbuf.Buffer body, org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) { String topicString = topic.toString(); - java.util.function.Consumer<byte[]> callback = callbacks.get(topicString); - if (callback == null) { + java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topicString); + if (callbackList == null || callbackList.isEmpty()) { logger.debug("Got a message, but no callback to call. Forgot to unsubscribe?"); } else { byte[] message = body.toByteArray(); // System.out.println("message = " + Arrays.toString(message)); - callback.accept(message); + for (java.util.function.Consumer<byte[]> callback : callbackList) { + callback.accept(message); + } } ack.onSuccess(null); // always acknowledge message } @@ -163,27 +165,30 @@ public class MqttHandler { public void newConnection(String topic, java.util.function.Consumer<byte[]> callback) { if (!ready) { - // TODO should maybe be something more kind than throwing an exception here + // should maybe be something more kind than throwing an exception here throw new IllegalStateException("Updater not ready"); } // register callback - callbacks.put(topic, callback); + if (callbacks.get(topic) == null) { + callbacks.put(topic, new java.util.ArrayList<>()); - // subscribe at broker - org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) }; - connection.getDispatchQueue().execute(() -> { - connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() { - @Override - public void onSuccess(byte[] qoses) { - logger.debug("Subscribed to {}, qoses: {}", topic, qoses); - } + // subscribe at broker + org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) }; + connection.getDispatchQueue().execute(() -> { + connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() { + @Override + public void onSuccess(byte[] qoses) { + logger.debug("Subscribed to {}, qoses: {}", topic, qoses); + } - @Override - public void onFailure(Throwable cause) { - logger.error("Could not subscribe to {}", topic, cause); - } + @Override + public void onFailure(Throwable cause) { + logger.error("Could not subscribe to {}", topic, cause); + } + }); }); - }); + } + callbacks.get(topic).add(callback); } /** diff --git a/src/main/resources/mqtt.mustache b/src/main/resources/mqtt.mustache index fb2b173..d65ca94 100644 --- a/src/main/resources/mqtt.mustache +++ b/src/main/resources/mqtt.mustache @@ -1,5 +1,6 @@ aspect MQTT { - private MqttHandler {{rootNodeName}}.{{mqttHandlerField}} = new MqttHandler(); + private String {{rootNodeName}}.MqttName() { return "Ros2Rag"; } + private MqttHandler {{rootNodeName}}.{{mqttHandlerField}} = new MqttHandler(MqttName()); public void {{rootNodeName}}.{{mqttSetHostMethod}}(String host) throws java.io.IOException { {{mqttHandlerField}}.setHost(host); } -- GitLab