Commit 16a5550d authored by René Schöne's avatar René Schöne
Browse files

Towards functional starter.

- sender/receiver: reuse generated MqttUpdater
- base: added option "--verbose" to print stacktrace if any
- base: move mqtt default port definition to MqttUpdater
- base: add option in MqttUpdater to suppress welcome message
- starter: create 10 joints, use correct mqtt topics
parent d121217f
Pipeline #6809 passed with stage
in 1 minute and 50 seconds
#!/usr/bin/env bash
./ros2rag.senderstub/build/install/ros2rag.senderstub/bin/ros2rag.senderstub exit 1
#!/usr/bin/env bash
./gradlew :ros2rag.receiverstub:installDist
./ros2rag.receiverstub/build/install/ros2rag.receiverstub/bin/ros2rag.receiverstub $@
......@@ -50,15 +50,13 @@ aspect AspectGeneration {
public void Ros2Rag.generateMqttAspect(StringBuilder sb, TypeDecl rootNode) {
String rootNodeName = rootNode.getName();
sb.append("aspect MQTT {\n");
sb.append(ind(1)).append("private static final int ")
.append(rootNodeName).append("._MQTT_DEFAULT_PORT = 1883;\n");
sb.append(ind(1)).append("private MqttUpdater ").append(rootNodeName)
.append(".").append(mqttUpdaterField()).append(" = new MqttUpdater();\n");
// mqttSetHost(String host)
sb.append(ind(1)).append("public void ").append(rootNodeName).append(".")
.append(mqttSetHostMethod()).append("(String host) throws java.io.IOException {\n");
sb.append(ind(2)).append("MqttSetHost(host, _MQTT_DEFAULT_PORT);\n");
sb.append(ind(2)).append(mqttUpdaterField()).append(".setHost(host);\n");
sb.append(ind(1)).append("}\n");
// mqttSetHost(String host, int port)
......
......@@ -24,6 +24,7 @@ public class Compiler {
private StringOption optionRootNode;
private StringOption optionInputRos2Rag;
private FlagOption optionHelp;
private FlagOption optionVerbose;
private ArrayList<Option<?>> options;
private CommandLine commandLine;
......@@ -44,6 +45,19 @@ public class Compiler {
return;
}
if (optionVerbose.isSet()) {
try {
run();
} catch (CompilerException e) {
e.printStackTrace();
throw e;
}
} else {
run();
}
}
private void run() throws CompilerException {
String outputDir;
if (optionOutputDir.isSet()) {
outputDir = optionOutputDir.getValue();
......@@ -129,6 +143,7 @@ public class Compiler {
optionRootNode = addOption(new StringOption("rootNode", "root node in the base grammar."));
optionInputRos2Rag = addOption(new StringOption("inputRos2Rag", "ros2rag definition file."));
optionHelp = addOption(new FlagOption("help", "Print usage and exit."));
optionVerbose = addOption(new FlagOption("verbose", "Print more messages."));
}
private <OptionType extends Option<?>> OptionType addOption(OptionType option) {
......@@ -145,7 +160,9 @@ public class Compiler {
Ros2RagScanner scanner = new Ros2RagScanner(reader);
Ros2RagParser parser = new Ros2RagParser();
inputGrammar = (GrammarFile) parser.parse(scanner);
inputGrammar.dumpTree(System.out);
if (optionVerbose.isSet()) {
inputGrammar.dumpTree(System.out);
}
program.addGrammarFile(inputGrammar);
inputGrammar.treeResolveAll();
} catch (IOException | Parser.Exception e) {
......
aspect MqttUpdater {
/**
* Helper class to receive updates via MQTT and use callbacks to handle those messages.
*
* @author rschoene - Initial contribution
*/
public class MqttUpdater {
private static final int DEFAULT_PORT = 1883;
private final org.apache.logging.log4j.Logger logger;
private final String name;
......@@ -16,6 +18,7 @@ public class MqttUpdater {
private final java.util.concurrent.locks.Condition readyCondition;
private final java.util.concurrent.locks.Lock readyLock;
private boolean ready;
private boolean sendWelcomeMessage = true;
private org.fusesource.mqtt.client.QoS qos;
/** Dispatch knowledge */
private final java.util.Map<String, java.util.function.Consumer<byte[]>> callbacks;
......@@ -34,16 +37,31 @@ public class MqttUpdater {
this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE;
}
public MqttUpdater dontSendWelcomeMessage() {
this.sendWelcomeMessage = false;
return this;
}
/**
* Sets the host (with default port) to receive messages from, and connects to it.
* @throws IOException if could not connect, or could not subscribe to a topic
* @return self
*/
public MqttUpdater setHost(String host) throws java.io.IOException {
return setHost(host, DEFAULT_PORT);
}
/**
* Sets the host to receive messages from, and connects to it.
* @throws IOException if could not connect, or could not subscribe to a topic
* @return self
*/
public MqttUpdater setHost(String host, int port) throws java.io.IOException {
java.util.Objects.requireNonNull(host, "Host need to be set!");
this.host = java.net.URI.create("tcp://" + host + ":" + port);
logger.debug("Host for {} is {}", this.name, this.host);
java.util.Objects.requireNonNull(this.host, "Host need to be set!");
org.fusesource.mqtt.client.MQTT mqtt = new org.fusesource.mqtt.client.MQTT();
mqtt.setHost(this.host);
connection = mqtt.callbackConnection();
......@@ -91,24 +109,22 @@ public class MqttUpdater {
connection.connect(new org.fusesource.mqtt.client.Callback<Void>() {
@Override
public void onSuccess(Void value) {
connection.publish("components", (name + " is connected").getBytes(), org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE, false, new org.fusesource.mqtt.client.Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.debug("success sending welcome message");
try {
readyLock.lock();
ready = true;
readyCondition.signalAll();
} finally {
readyLock.unlock();
if (MqttUpdater.this.sendWelcomeMessage) {
connection.publish("components", (name + " is connected").getBytes(), org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE, false, new org.fusesource.mqtt.client.Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.debug("success sending welcome message");
setReady();
}
}
@Override
public void onFailure(Throwable value) {
logger.debug("failure sending welcome message", value);
}
});
@Override
public void onFailure(Throwable value) {
logger.debug("failure sending welcome message", value);
}
});
} else {
setReady();
}
}
@Override
......@@ -125,6 +141,16 @@ public class MqttUpdater {
return host;
}
private void setReady() {
try {
readyLock.lock();
ready = true;
readyCondition.signalAll();
} finally {
readyLock.unlock();
}
}
private void throwIf(java.util.concurrent.atomic.AtomicReference<Throwable> error) throws java.io.IOException {
if (error.get() != null) {
throw new java.io.IOException(error.get());
......@@ -220,3 +246,4 @@ public class MqttUpdater {
});
}
}
}
/**
* Version 2020-04-17
*/
// --- update definitions ---
read Joint.CurrentPosition using LinkStateToIntPosition ;
write RobotArm._AppropriateSpeed using CreateSpeedMessage ;
// --- dependency definitions ---
RobotArm._AppropriateSpeed canDependOn Joint.CurrentPosition as dependency1 ;
RobotArm._AppropriateSpeed canDependOn RobotArm._AttributeTestSource as dependency2 ;
// --- mapping definitions ---
LinkStateToIntPosition maps protobuf panda.Linkstate.PandaLinkState x to IntPosition y {
panda.Linkstate.PandaLinkState.Position p = x.getPos();
y = IntPosition.of((int) p.getPositionX(), (int) p.getPositionY(), (int) p.getPositionZ());
}
CreateSpeedMessage maps double x to protobuf config.Robotconfig.RobotConfig y {
y = config.Robotconfig.RobotConfig.newBuilder()
.setSpeed(x)
.build();
}
import de.tudresden.inf.st.ros2rag.example.MqttUpdater;
import panda.Linkstate.PandaLinkState.Position;
// this aspect depends on the actual grammar. probably we need to provide the root node type, in this case "Model"
// it is somewhat problematic, because it assumes a single root to store the mqtt-host
aspect MQTT {
private MqttUpdater Model._mqttUpdater = new MqttUpdater();
private static final int Model._MQTT_DEFAULT_PORT = 1883;
public void Model.MqttSetHost(String host) throws java.io.IOException {
MqttSetHost(host, _MQTT_DEFAULT_PORT);
}
public void Model.MqttSetHost(String host, int port) throws java.io.IOException {
_mqttUpdater.setHost(host, port);
}
public boolean Model.MqttWaitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
return _mqttUpdater.waitUntilReady(time, unit);
}
public void Model.MqttCloseConnections() {
_mqttUpdater.close();
}
inh MqttUpdater Joint._mqttUpdater();
inh MqttUpdater RobotArm._mqttUpdater();
eq Model.getRobotArm()._mqttUpdater() = _mqttUpdater;
eq Model.getZoneModel()._mqttUpdater() = _mqttUpdater;
}
aspect GrammarExtension {
// --- Joint ---
public void Joint.connectCurrentPosition(String topic) {
_mqttUpdater().newConnection(topic, message -> {
// Parse message into a PandaLinkState
try {
System.out.println("begin update current position");
panda.Linkstate.PandaLinkState x = panda.Linkstate.PandaLinkState.parseFrom(message);
panda.Linkstate.PandaLinkState.Position p = x.getPos();
IntPosition y = IntPosition.of((int) p.getPositionX(), (int) p.getPositionY(), (int) p.getPositionZ());
// >> when "always" is absent, generate this
if (getCurrentPosition().equals(y)) { return; }
// >> until here
System.out.println("really setting current position");
setCurrentPosition(y);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
e.printStackTrace();
}
});
}
public Joint Joint.setCurrentPosition(IntPosition value) {
set_CurrentPosition(value);
for (RobotArm target : get_dependency1Targets()) {
System.out.println("checking robotarm " + target);
if (target._update_AppropriateSpeed()) {
System.out.println("writing " + target + "_AppropriateSpeed");
target._write_AppropriateSpeed_lastValue();
}
}
return this;
}
public IntPosition Joint.getCurrentPosition() {
return get_CurrentPosition();
}
// --- RobotArm ---
/*
this is one way to store the topic, i.e., close to the producing nonterminal.
another way would be a map (or multiple) in the MqttUpdater mapping nonterminal and operation to a topic
*/
private String RobotArm._write_AppropriateSpeed_topic = null;
private config.Robotconfig.RobotConfig RobotArm._write_AppropriateSpeed_lastValue = null;
public RobotArm RobotArm.setAttributeTestSource(int value) {
set_AttributeTestSource(value);
for (RobotArm target : get_dependency2Targets()) {
if (target._update_AppropriateSpeed()) {
target._write_AppropriateSpeed_lastValue();
}
}
return this;
}
public int RobotArm.getAttributeTestSource() {
return get_AttributeTestSource();
}
public void RobotArm.connect_AppropriateSpeed(String topic, boolean writeCurrentValue) {
_write_AppropriateSpeed_topic = topic;
_update_AppropriateSpeed();
if (writeCurrentValue) {
_write_AppropriateSpeed_lastValue();
}
}
/**
* @apilevel internal
* @return true, if CreateSpeedMessage changed for _AppropriateSpeed
*/
public boolean RobotArm._update_AppropriateSpeed() {
// 1) read _AppropriateSpeed (which triggers computation, right?)
// 2) maybe: check if value has changed since last publish
get_AppropriateSpeed_reset();
double x = get_AppropriateSpeed();
// begin transformation "CreateSpeedMessage"
config.Robotconfig.RobotConfig y = config.Robotconfig.RobotConfig.newBuilder()
.setSpeed(x)
.build();
// end transformation "CreateSpeedMessage"
if (y.equals(_write_AppropriateSpeed_lastValue)) {
// if always is absent, then return here
return false;
}
// remember last value
_write_AppropriateSpeed_lastValue = y;
return true;
}
public void RobotArm._write_AppropriateSpeed_lastValue() {
// 3) publish
// we know, it is a protobuf type, so call "toByteArray"
byte[] bytes = _write_AppropriateSpeed_lastValue.toByteArray();
_mqttUpdater().publish(_write_AppropriateSpeed_topic, bytes);
}
public void RobotArm.addDependency1(Joint source) {
this.add_dependency1Source(source);
}
public void RobotArm.addDependency2(RobotArm source) {
this.add_dependency2Source(source);
}
}
// RobotArm._AppropriateSpeed canDependOn Joint.CurrentPosition ;
rel RobotArm._dependency1Source* <-> Joint._dependency1Target* ;
// RobotArm._AppropriateSpeed canDependOn RobotArm._AttributeTestSource as dependency2 ;
rel RobotArm._dependency2Source* <-> RobotArm._dependency2Target* ;
package de.tudresden.inf.st.ros2rag.example;
import de.tudresden.inf.st.ros2rag.ast.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import panda.Linkstate;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* Testing Ros2Rag without generating something.
*
* @author rschoene - Initial contribution
*/
public class Main {
private static final String TOPIC_JOINT1 = "robot/joint1";
private static final String TOPIC_CONFIG = "robot/config";
private static final Logger logger = LogManager.getLogger(Main.class);
public static void main(String[] args) throws IOException, InterruptedException {
Model model = new Model();
model.MqttSetHost("localhost");
ZoneModel zoneModel = new ZoneModel();
zoneModel.setSize(makePosition(1, 1, 1));
IntPosition myPosition = makePosition(0, 0, 0);
Coordinate myCoordinate = new Coordinate(myPosition);
Coordinate leftPosition = new Coordinate(makePosition(-1, 0, 0));
Coordinate rightPosition = new Coordinate(makePosition(1, 0, 0));
Zone safetyZone = new Zone();
safetyZone.addCoordinate(myCoordinate);
safetyZone.addCoordinate(leftPosition);
safetyZone.addCoordinate(rightPosition);
zoneModel.addSafetyZone(safetyZone);
model.setZoneModel(zoneModel);
RobotArm robotArm = new RobotArm();
robotArm.set_AttributeTestSource(1); // set initial value, no trigger
Joint joint1 = new Joint();
joint1.setName("joint1");
joint1.setCurrentPosition(myPosition);
EndEffector endEffector = new EndEffector();
endEffector.setName("gripper");
endEffector.setCurrentPosition(makePosition(2, 2, 3));
robotArm.addJoint(joint1);
robotArm.setEndEffector(endEffector);
model.setRobotArm(robotArm);
// add dependencies
robotArm.addDependency1(joint1);
robotArm.addDependency1(endEffector);
robotArm.addDependency2(robotArm);
model.MqttWaitUntilReady(2, TimeUnit.SECONDS);
joint1.connectCurrentPosition(TOPIC_JOINT1);
robotArm.connect_AppropriateSpeed(TOPIC_CONFIG, true);
startDelayedSendOne();
logStatus("BEFORE", robotArm, joint1);
// logger.info("Now invoke ./send_one.sh");
// Thread.sleep(3000);
// robotArm.setAttributeTestSource(1);
// Thread.sleep(3000);
// robotArm.setAttributeTestSource(5);
Thread.sleep(6000);
logStatus("AFTER", robotArm, joint1);
logger.info("Now changing AttributeTestSource - this should not emit any new mqtt message");
robotArm.setAttributeTestSource(1);
Thread.sleep(1000);
robotArm.setAttributeTestSource(5);
model.MqttCloseConnections();
}
private static void logStatus(String prefix, RobotArm robotArm, Joint joint1) {
logger.info("{} joint1.getCurrentPosition() = {}, " +
"robotArm.isInSafetyZone = {}, " +
"robotArm.get_AppropriateSpeed = {}",
prefix,
joint1.getCurrentPosition(),
robotArm.isInSafetyZone(),
robotArm.get_AppropriateSpeed());
}
private static void startDelayedSendOne() {
final int millis = 1500;
new Thread(() -> {
try {
Linkstate.PandaLinkState pls = Linkstate.PandaLinkState.newBuilder()
.setName("Joint1")
.setPos(Linkstate.PandaLinkState.Position.newBuilder()
.setPositionX(0.5f)
.setPositionY(0.5f)
.setPositionZ(0.5f)
.build())
.build();
byte[] bytes = pls.toByteArray();
MqttUpdater mqttUpdater = new MqttUpdater("sender").setHost("localhost", 1883);
// wait until connected, but in total millis
long before = System.currentTimeMillis();
mqttUpdater.waitUntilReady(millis, TimeUnit.MILLISECONDS);
long alreadyWaited = System.currentTimeMillis() - before;
Thread.sleep((long) millis - alreadyWaited);
mqttUpdater.publish(TOPIC_JOINT1, bytes);
Thread.sleep(1000);
pls = Linkstate.PandaLinkState.newBuilder()
.setName("Joint1")
.setPos(Linkstate.PandaLinkState.Position.newBuilder()
.setPositionX(2.5f)
.setPositionY(2.5f)
.setPositionZ(2.5f)
.build())
.build();
bytes = pls.toByteArray();
mqttUpdater.publish(TOPIC_JOINT1, bytes);
mqttUpdater.close();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}).start();
}
private static IntPosition makePosition(int x, int y, int z) {
return IntPosition.of(x, y, z);
}
}
package de.tudresden.inf.st.ros2rag.example;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.*;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
/**
* Helper class to receive updates via MQTT and use callbacks to handle those messages.
*
* @author rschoene - Initial contribution
*/
public class MqttUpdater {
private final Logger logger;
private final String name;
/** The host running the MQTT broker. */
private URI host;
/** The connection to the MQTT broker. */
private CallbackConnection connection;
/** Whether we are subscribed to the topics yet */
private final Condition readyCondition;
private final Lock readyLock;
private boolean ready;
private QoS qos;
/** Dispatch knowledge */
private final Map<String, Consumer<byte[]>> callbacks;
public MqttUpdater() {
this("Ros2Rag");
}
public MqttUpdater(String name) {
this.name = Objects.requireNonNull(name, "Name must be set");
this.logger = LogManager.getLogger(MqttUpdater.class);
this.callbacks = new HashMap<>();
this.readyLock = new ReentrantLock();
this.readyCondition = readyLock.newCondition();
this.ready = false;
this.qos = QoS.AT_LEAST_ONCE;
}
/**
* Sets the host to receive messages from, and connects to it.
* @throws IOException if could not connect, or could not subscribe to a topic
* @return self
*/
public MqttUpdater setHost(String host, int port) throws IOException {
this.host = URI.create("tcp://" + host + ":" + port);
logger.debug("Host for {} is {}", this.name, this.host);
Objects.requireNonNull(this.host, "Host need to be set!");
MQTT mqtt = new MQTT();
mqtt.setHost(this.host);
connection = mqtt.callbackConnection();
AtomicReference<Throwable> error = new AtomicReference<>();
// add the listener to dispatch messages later
connection.listener(new ExtendedListener() {
public void onConnected() {
logger.debug("Connected");
}
@Override
public void onDisconnected() {
logger.debug("Disconnected");
}
@Override
public void onPublish(UTF8Buffer topic, Buffer body, Callback<Callback<Void>> ack) {
String topicString = topic.toString();
Consumer<byte[]> callback = callbacks.get(topicString);
if (callback == null) {
logger.debug("Got a message, but no callback to call. Forgot to unsubscribe?");
} else {
byte[] message = body.toByteArray();
// System.out.println("message = " + Arrays.toString(message));
callback.accept(message);
}
ack.onSuccess(null); // always acknowledge message
}
@Override
public void onPublish(UTF8Buffer topicBuffer, Buffer body, Runnable ack) {
logger.warn("onPublish should not be called");
}
@Override
public void onFailure(Throwable cause) {