Skip to content
Snippets Groups Projects
Commit 07accb5e authored by René Schöne's avatar René Schöne
Browse files

Update goal, fixed bug in mappings and MqttHandler.

- Base: inserted DefaultMappings did not account for existing other mappings, but always use type of token
- Base: MqttHandler was not able to have multiple newConnections for the same topic
- Goal: Changed way how last update is handled (now packed into currentStep)
- Goal: Added StartStep
- Goal: Changed wait to be in milliseconds
parent d0cd0239
Branches
No related tags found
No related merge requests found
...@@ -111,7 +111,7 @@ aspect Mappings { ...@@ -111,7 +111,7 @@ aspect Mappings {
// or if no mappings are specified. // or if no mappings are specified.
// then prepend the suitable default mapping // then prepend the suitable default mapping
java.util.List<MappingDefinition> result; java.util.List<MappingDefinition> result;
if (getMappingList().size() == 0 || !getMappingList().get(0).getFromType().isByteArray()) { if (getMappingList().isEmpty() || !getMappingList().get(0).getFromType().isByteArray()) {
result = new java.util.ArrayList(); result = new java.util.ArrayList();
result.add(suitableDefaultMapping()); result.add(suitableDefaultMapping());
result.addAll(getMappingList()); result.addAll(getMappingList());
...@@ -161,7 +161,9 @@ aspect Mappings { ...@@ -161,7 +161,9 @@ aspect Mappings {
// --- suitableDefaultMapping --- // --- suitableDefaultMapping ---
syn DefaultMappingDefinition UpdateDefinition.suitableDefaultMapping(); syn DefaultMappingDefinition UpdateDefinition.suitableDefaultMapping();
eq ReadFromMqttDefinition.suitableDefaultMapping() { eq ReadFromMqttDefinition.suitableDefaultMapping() {
String typeName = getToken().getJavaTypeUse().getName(); String typeName = getMappingList().isEmpty() ?
getToken().getJavaTypeUse().getName() :
getMappingList().get(0).getFromType().prettyPrint();
switch(typeName) { switch(typeName) {
case "int": case "int":
case "Integer": return ros2rag().defaultBytesToIntMapping(); case "Integer": return ros2rag().defaultBytesToIntMapping();
...@@ -180,7 +182,9 @@ aspect Mappings { ...@@ -180,7 +182,9 @@ aspect Mappings {
} }
} }
eq WriteToMqttDefinition.suitableDefaultMapping() { eq WriteToMqttDefinition.suitableDefaultMapping() {
String typeName = getToken().getJavaTypeUse().getName(); String typeName = getMappingList().isEmpty() ?
getToken().getJavaTypeUse().getName() :
getMappingList().get(getMappingList().size() - 1).getFromType().prettyPrint();
switch(typeName) { switch(typeName) {
case "int": case "int":
case "Integer": return ros2rag().defaultIntToBytesMapping(); case "Integer": return ros2rag().defaultIntToBytesMapping();
......
...@@ -21,7 +21,7 @@ public class MqttHandler { ...@@ -21,7 +21,7 @@ public class MqttHandler {
private boolean sendWelcomeMessage = true; private boolean sendWelcomeMessage = true;
private org.fusesource.mqtt.client.QoS qos; private org.fusesource.mqtt.client.QoS qos;
/** Dispatch knowledge */ /** Dispatch knowledge */
private final java.util.Map<String, java.util.function.Consumer<byte[]>> callbacks; private final java.util.Map<String, java.util.List<java.util.function.Consumer<byte[]>>> callbacks;
public MqttHandler() { public MqttHandler() {
this("Ros2Rag"); this("Ros2Rag");
...@@ -81,13 +81,15 @@ public class MqttHandler { ...@@ -81,13 +81,15 @@ public class MqttHandler {
@Override @Override
public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic, org.fusesource.hawtbuf.Buffer body, org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) { public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic, org.fusesource.hawtbuf.Buffer body, org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
String topicString = topic.toString(); String topicString = topic.toString();
java.util.function.Consumer<byte[]> callback = callbacks.get(topicString); java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topicString);
if (callback == null) { if (callbackList == null || callbackList.isEmpty()) {
logger.debug("Got a message, but no callback to call. Forgot to unsubscribe?"); logger.debug("Got a message, but no callback to call. Forgot to unsubscribe?");
} else { } else {
byte[] message = body.toByteArray(); byte[] message = body.toByteArray();
// System.out.println("message = " + Arrays.toString(message)); // System.out.println("message = " + Arrays.toString(message));
callback.accept(message); for (java.util.function.Consumer<byte[]> callback : callbackList) {
callback.accept(message);
}
} }
ack.onSuccess(null); // always acknowledge message ack.onSuccess(null); // always acknowledge message
} }
...@@ -163,27 +165,30 @@ public class MqttHandler { ...@@ -163,27 +165,30 @@ public class MqttHandler {
public void newConnection(String topic, java.util.function.Consumer<byte[]> callback) { public void newConnection(String topic, java.util.function.Consumer<byte[]> callback) {
if (!ready) { if (!ready) {
// TODO should maybe be something more kind than throwing an exception here // should maybe be something more kind than throwing an exception here
throw new IllegalStateException("Updater not ready"); throw new IllegalStateException("Updater not ready");
} }
// register callback // register callback
callbacks.put(topic, callback); if (callbacks.get(topic) == null) {
callbacks.put(topic, new java.util.ArrayList<>());
// subscribe at broker // subscribe at broker
org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) }; org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) };
connection.getDispatchQueue().execute(() -> { connection.getDispatchQueue().execute(() -> {
connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() { connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() {
@Override @Override
public void onSuccess(byte[] qoses) { public void onSuccess(byte[] qoses) {
logger.debug("Subscribed to {}, qoses: {}", topic, qoses); logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
} }
@Override @Override
public void onFailure(Throwable cause) { public void onFailure(Throwable cause) {
logger.error("Could not subscribe to {}", topic, cause); logger.error("Could not subscribe to {}", topic, cause);
} }
});
}); });
}); }
callbacks.get(topic).add(callback);
} }
/** /**
......
aspect MQTT { aspect MQTT {
private MqttHandler {{rootNodeName}}.{{mqttHandlerField}} = new MqttHandler(); private String {{rootNodeName}}.MqttName() { return "Ros2Rag"; }
private MqttHandler {{rootNodeName}}.{{mqttHandlerField}} = new MqttHandler(MqttName());
public void {{rootNodeName}}.{{mqttSetHostMethod}}(String host) throws java.io.IOException { public void {{rootNodeName}}.{{mqttSetHostMethod}}(String host) throws java.io.IOException {
{{mqttHandlerField}}.setHost(host); {{mqttHandlerField}}.setHost(host);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment