Skip to content
Snippets Groups Projects
Commit fa735e60 authored by René Schöne's avatar René Schöne
Browse files

Towards functional starter.

- sender/receiver: reuse generated MqttUpdater
- base: added option "--verbose" to print stacktrace if any
- base: move mqtt default port definition to MqttUpdater
- base: add option in MqttUpdater to suppress welcome message
- starter: create 10 joints, use correct mqtt topics
parent 2a56b716
No related branches found
No related tags found
No related merge requests found
......@@ -50,15 +50,13 @@ aspect AspectGeneration {
public void Ros2Rag.generateMqttAspect(StringBuilder sb, TypeDecl rootNode) {
String rootNodeName = rootNode.getName();
sb.append("aspect MQTT {\n");
sb.append(ind(1)).append("private static final int ")
.append(rootNodeName).append("._MQTT_DEFAULT_PORT = 1883;\n");
sb.append(ind(1)).append("private MqttUpdater ").append(rootNodeName)
.append(".").append(mqttUpdaterField()).append(" = new MqttUpdater();\n");
// mqttSetHost(String host)
sb.append(ind(1)).append("public void ").append(rootNodeName).append(".")
.append(mqttSetHostMethod()).append("(String host) throws java.io.IOException {\n");
sb.append(ind(2)).append("MqttSetHost(host, _MQTT_DEFAULT_PORT);\n");
sb.append(ind(2)).append(mqttUpdaterField()).append(".setHost(host);\n");
sb.append(ind(1)).append("}\n");
// mqttSetHost(String host, int port)
......
......@@ -24,6 +24,7 @@ public class Compiler {
private StringOption optionRootNode;
private StringOption optionInputRos2Rag;
private FlagOption optionHelp;
private FlagOption optionVerbose;
private ArrayList<Option<?>> options;
private CommandLine commandLine;
......@@ -44,6 +45,19 @@ public class Compiler {
return;
}
if (optionVerbose.isSet()) {
try {
run();
} catch (CompilerException e) {
e.printStackTrace();
throw e;
}
} else {
run();
}
}
private void run() throws CompilerException {
String outputDir;
if (optionOutputDir.isSet()) {
outputDir = optionOutputDir.getValue();
......@@ -129,6 +143,7 @@ public class Compiler {
optionRootNode = addOption(new StringOption("rootNode", "root node in the base grammar."));
optionInputRos2Rag = addOption(new StringOption("inputRos2Rag", "ros2rag definition file."));
optionHelp = addOption(new FlagOption("help", "Print usage and exit."));
optionVerbose = addOption(new FlagOption("verbose", "Print more messages."));
}
private <OptionType extends Option<?>> OptionType addOption(OptionType option) {
......@@ -145,7 +160,9 @@ public class Compiler {
Ros2RagScanner scanner = new Ros2RagScanner(reader);
Ros2RagParser parser = new Ros2RagParser();
inputGrammar = (GrammarFile) parser.parse(scanner);
if (optionVerbose.isSet()) {
inputGrammar.dumpTree(System.out);
}
program.addGrammarFile(inputGrammar);
inputGrammar.treeResolveAll();
} catch (IOException | Parser.Exception e) {
......
aspect MqttUpdater {
/**
* Helper class to receive updates via MQTT and use callbacks to handle those messages.
*
* @author rschoene - Initial contribution
*/
public class MqttUpdater {
private static final int DEFAULT_PORT = 1883;
private final org.apache.logging.log4j.Logger logger;
private final String name;
......@@ -16,6 +18,7 @@ public class MqttUpdater {
private final java.util.concurrent.locks.Condition readyCondition;
private final java.util.concurrent.locks.Lock readyLock;
private boolean ready;
private boolean sendWelcomeMessage = true;
private org.fusesource.mqtt.client.QoS qos;
/** Dispatch knowledge */
private final java.util.Map<String, java.util.function.Consumer<byte[]>> callbacks;
......@@ -34,16 +37,31 @@ public class MqttUpdater {
this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE;
}
public MqttUpdater dontSendWelcomeMessage() {
this.sendWelcomeMessage = false;
return this;
}
/**
* Sets the host (with default port) to receive messages from, and connects to it.
* @throws IOException if could not connect, or could not subscribe to a topic
* @return self
*/
public MqttUpdater setHost(String host) throws java.io.IOException {
return setHost(host, DEFAULT_PORT);
}
/**
* Sets the host to receive messages from, and connects to it.
* @throws IOException if could not connect, or could not subscribe to a topic
* @return self
*/
public MqttUpdater setHost(String host, int port) throws java.io.IOException {
java.util.Objects.requireNonNull(host, "Host need to be set!");
this.host = java.net.URI.create("tcp://" + host + ":" + port);
logger.debug("Host for {} is {}", this.name, this.host);
java.util.Objects.requireNonNull(this.host, "Host need to be set!");
org.fusesource.mqtt.client.MQTT mqtt = new org.fusesource.mqtt.client.MQTT();
mqtt.setHost(this.host);
connection = mqtt.callbackConnection();
......@@ -91,17 +109,12 @@ public class MqttUpdater {
connection.connect(new org.fusesource.mqtt.client.Callback<Void>() {
@Override
public void onSuccess(Void value) {
if (MqttUpdater.this.sendWelcomeMessage) {
connection.publish("components", (name + " is connected").getBytes(), org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE, false, new org.fusesource.mqtt.client.Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.debug("success sending welcome message");
try {
readyLock.lock();
ready = true;
readyCondition.signalAll();
} finally {
readyLock.unlock();
}
setReady();
}
@Override
......@@ -109,6 +122,9 @@ public class MqttUpdater {
logger.debug("failure sending welcome message", value);
}
});
} else {
setReady();
}
}
@Override
......@@ -125,6 +141,16 @@ public class MqttUpdater {
return host;
}
private void setReady() {
try {
readyLock.lock();
ready = true;
readyCondition.signalAll();
} finally {
readyLock.unlock();
}
}
private void throwIf(java.util.concurrent.atomic.AtomicReference<Throwable> error) throws java.io.IOException {
if (error.get() != null) {
throw new java.io.IOException(error.get());
......@@ -220,3 +246,4 @@ public class MqttUpdater {
});
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment