Skip to content
Snippets Groups Projects
Commit 2c1540b7 authored by René Schöne's avatar René Schöne
Browse files

Somewhat working READ.

- Example: use Position class created by protobuf, use a wrapper in grammar instead for lists
- Example: manually extend Joint, currently only one connect
- Example: added generic MqttUpdater handling subscriptions
- Example: added some glue code, that would be generated normally
- Example: copied protobuf definition for linkstate
- SenderStub: new module with main to construct minimal linkstate and send it via mqtt
parent d7770f93
No related branches found
No related tags found
No related merge requests found
Pipeline #6228 failed
Showing
with 709 additions and 9 deletions
......@@ -19,13 +19,14 @@ buildscript {
}
sourceSets.main.java.srcDir "src/gen/java"
jar.manifest.attributes('Main-Class': 'de.tudresden.inf.st.ros2rag.Main')
jar.manifest.attributes('Main-Class': 'de.tudresden.inf.st.ros2rag.example.Main')
dependencies {
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-xml', version: "${jackson_version}"
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jackson_version}"
implementation group: 'net.sf.beaver', name: 'beaver-rt', version: '0.9.11'
compile 'com.google.protobuf:protobuf-java:3.0.0'
compile group: 'org.fusesource.mqtt-client', name: 'mqtt-client', version: '1.15'
jastadd2 "org.jastadd:jastadd:2.3.4"
}
......@@ -99,7 +100,7 @@ jastadd {
}
// Input files
def relastFiles = ["./src/main/jastadd/Example.relast"]
def relastFiles = ["src/main/jastadd/Example.relast", "src/main/jastadd/Generated.relast"]
// phase: RelAst -> JastAdd
task relastToJastAdd(type: JavaExec) {
......
Model ::= RobotArm ZoneModel ;
ZoneModel ::= Size:Position SafetyZone:Zone*;
ZoneModel ::= <Size:Position> SafetyZone:Zone*;
Zone ::= Position*;
Zone ::= PositionWrapper*;
// Do not use terminal-NTA's for now, as relast has problems with it "/<ShouldUseLowSpeed:Boolean>/" ;
RobotArm ::= Joint* EndEffector ;
Joint ::= <Name> ;
rel Joint.CurrentPosition -> Position ;
Joint ::= <Name> <CurrentPosition:Position> ;
//rel Joint.CurrentPosition -> Position_Old ;
EndEffector : Joint;
Position ::= <x:int> <y:int> <z:int> ;
Position_Old ::= <x:int> <y:int> <z:int> ;
PositionWrapper ::= <Position:Position> ;
import de.tudresden.inf.st.ros2rag.example.MqttUpdater;
import panda.Linkstate.PandaLinkState.Position;
// this aspect depends on the actual grammar. probably we need to provide the root node type, in this case "Model"
// it is somewhat problematic, because it assumes a single root to store the mqtt-host
aspect GrammarExtension {
// kind of private NTA typed "MqttRoot" and named "_MqttRoot"
syn nta MqttRoot Model.get_MqttRoot() {
return new MqttRoot();
}
public void Model.updateMqttHost(String host) throws java.io.IOException {
get_MqttRoot().updateHost(host);
}
public void Model.updateMqttHost(String host, int port) throws java.io.IOException {
get_MqttRoot().updateHost(host, port);
}
public boolean Model.waitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
return get_MqttRoot().getUpdater().waitUntilReady(time, unit);
}
inh MqttUpdater Joint._mqttUpdater();
eq Model.getRobotArm()._mqttUpdater() = get_MqttRoot().getUpdater();
eq Model.getZoneModel()._mqttUpdater() = get_MqttRoot().getUpdater();
}
// this aspect is generic and will be always generated in the same way
aspect Mqtt {
// --- default values ---
private static final int MqttRoot.DEFAULT_PORT = 1883;
void MqttRoot.updateHost(String host) throws java.io.IOException {
updateHost(host, DEFAULT_PORT);
}
void MqttRoot.updateHost(String host, int port) throws java.io.IOException {
setHost(ExternalHost.of(host, port));
if (getUpdater() != null) {
// close connection to old updater first
getUpdater().close();
}
setUpdater(new MqttUpdater().setHost(host, port));
}
public static ExternalHost ExternalHost.of(String hostName, int defaultPort) {
String host = hostName;
int port = defaultPort;
if (hostName.contains(":")) {
String[] parts = hostName.split(":");
host = parts[0];
port = Integer.parseInt(parts[1]);
}
return new ExternalHost(host, port);
}
syn String ExternalHost.urlAsString() = String.format("http://%s:%s", getHostName(), getPort());
}
MqttRoot ::= [Host:ExternalHost] <Updater:MqttUpdater> ;
ExternalHost ::= <HostName:String> <Port:int> ;
package de.tudresden.inf.st.ros2rag.example;
import com.google.protobuf.InvalidProtocolBufferException;
import de.tudresden.inf.st.ros2rag.ast.Joint;
import panda.Linkstate.PandaLinkState;
import panda.Linkstate.PandaLinkState.Position;
/**
* Manually written code for Joint to be actually generated later.
*
* @author rschoene - Initial contribution
*/
public class GeneratedJoint extends Joint {
/*
Input for this to be generated:
// when an update of pose is read via mqtt, then update current position
[always] read Joint.CurrentPosition using LinkStateToPosition;
// panda.LinkState is a datatype defined in protobuf
LinkStateToPosition: map panda.Linkstate x to Position y using {
y = x.getPos();
}
*/
public void connectCurrentPosition(String topic) {
_mqttUpdater().newConnection(topic, message -> {
// Parse message into a LinkState
try {
PandaLinkState x = PandaLinkState.parseFrom(message);
Position y = x.getPos();
setCurrentPosition(y);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
});
}
}
package de.tudresden.inf.st.ros2rag.example;
import de.tudresden.inf.st.ros2rag.ast.RobotArm;
/**
* Manually written code for RobotArm to be actually generated later.
*
* @author rschoene - Initial contribution
*/
public class GeneratedRobotArm extends RobotArm {
}
package de.tudresden.inf.st.ros2rag.example;
import com.google.protobuf.InvalidProtocolBufferException;
import de.tudresden.inf.st.ros2rag.ast.*;
import panda.Linkstate.PandaLinkState.Position;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* Testing Ros2Rag without generating something.
*
* @author rschoene - Initial contribution
*/
public class Main {
public static void main(String[] args) {
System.out.println("Hello");
public static void main(String[] args) throws InvalidProtocolBufferException, IOException, InterruptedException {
Model model = new Model();
model.updateMqttHost("localhost");
ZoneModel zoneModel = new ZoneModel();
zoneModel.setSize(makePosition(1, 1, 1));
Position myPosition = makePosition(0, 0, 0);
PositionWrapper myPositionWrapper = new PositionWrapper(myPosition);
PositionWrapper leftPosition = new PositionWrapper(makePosition(-1, 0, 0));
PositionWrapper rightPosition = new PositionWrapper(makePosition(1, 0, 0));
Zone safetyZone = new Zone();
safetyZone.addPositionWrapper(myPositionWrapper);
safetyZone.addPositionWrapper(leftPosition);
safetyZone.addPositionWrapper(rightPosition);
zoneModel.addSafetyZone(safetyZone);
RobotArm robotArm = new GeneratedRobotArm();
GeneratedJoint joint1 = new GeneratedJoint();
joint1.setName("joint1");
joint1.setCurrentPosition(myPosition);
EndEffector endEffector = new EndEffector();
endEffector.setName("gripper");
endEffector.setCurrentPosition(myPosition);
robotArm.addJoint(joint1);
robotArm.setEndEffector(endEffector);
model.setRobotArm(robotArm);
model.waitUntilReady(2, TimeUnit.SECONDS);
joint1.connectCurrentPosition("robot/joint1");
System.out.println("BEFORE joint1.getCurrentPosition() = " + stringify(joint1.getCurrentPosition()));
Thread.sleep(10000);
System.out.println("AFTER joint1.getCurrentPosition() = " + stringify(joint1.getCurrentPosition()));
// TODO close/shutdown should be exposed
model.get_MqttRoot().getUpdater().close();
}
private static Position makePosition(int x, int y, int z) {
return Position.newBuilder()
.setPositionX(x)
.setPositionY(y)
.setPositionZ(z)
.build();
}
private static String stringify(Position position) {
return "(" + position.getPositionX() + ", " + position.getPositionY() + ", " + position.getPositionZ() + ")";
}
}
package de.tudresden.inf.st.ros2rag.example;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.*;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
/**
* Helper class to receive updates via MQTT and use callbacks to handle those messages.
*
* @author rschoene - Initial contribution
*/
public class MqttUpdater {
private final Logger logger;
/** The host running the MQTT broker. */
private URI host;
/** The connection to the MQTT broker. */
private CallbackConnection connection;
/** Whether we are subscribed to the topics yet */
private Condition readyCondition;
private Lock readyLock;
private boolean ready;
private QoS qos;
/** Dispatch knowledge */
private final Map<String, Consumer<byte[]>> callbacks;
public MqttUpdater() {
this.logger = LogManager.getLogger(MqttUpdater.class);
this.callbacks = new HashMap<>();
this.readyLock = new ReentrantLock();
this.readyCondition = readyLock.newCondition();
this.ready = false;
this.qos = QoS.AT_LEAST_ONCE;
}
/**
* Sets the host to receive messages from, and connects to it.
* @throws IOException if could not connect, or could not subscribe to a topic
* @return self
*/
public MqttUpdater setHost(String host, int port) throws IOException {
this.host = URI.create("tcp://" + host + ":" + port);
logger.debug("Host is {}", this.host);
Objects.requireNonNull(this.host, "Host need to be set!");
MQTT mqtt = new MQTT();
mqtt.setHost(this.host);
connection = mqtt.callbackConnection();
AtomicReference<Throwable> error = new AtomicReference<>();
// add the listener to dispatch messages later
connection.listener(new ExtendedListener() {
public void onConnected() {
logger.debug("Connected");
}
@Override
public void onDisconnected() {
logger.debug("Disconnected");
}
@Override
public void onPublish(UTF8Buffer topic, Buffer body, Callback<Callback<Void>> ack) {
String topicString = topic.toString();
Consumer<byte[]> callback = callbacks.get(topicString);
if (callback == null) {
logger.debug("Got a message, but no callback to call. Forgot to unsubscribe?");
} else {
byte[] message = body.toByteArray();
// System.out.println("message = " + Arrays.toString(message));
callback.accept(message);
}
ack.onSuccess(null); // always acknowledge message
}
@Override
public void onPublish(UTF8Buffer topicBuffer, Buffer body, Runnable ack) {
logger.warn("onPublish should not be called");
}
@Override
public void onFailure(Throwable cause) {
// logger.catching(cause);
error.set(cause);
}
});
throwIf(error);
// actually establish the connection
connection.connect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
connection.publish("components", "Ros2Rag is listening".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.debug("success sending welcome message");
try {
readyLock.lock();
ready = true;
readyCondition.signalAll();
} finally {
readyLock.unlock();
}
}
@Override
public void onFailure(Throwable value) {
logger.debug("failure sending welcome message", value);
}
});
}
@Override
public void onFailure(Throwable cause) {
// logger.error("Could not connect", cause);
error.set(cause);
}
});
throwIf(error);
return this;
}
private void throwIf(AtomicReference<Throwable> error) throws IOException {
if (error.get() != null) {
throw new IOException(error.get());
}
}
public void setQoSForSubscription(QoS qos) {
this.qos = qos;
}
public void newConnection(String topic, Consumer<byte[]> callback) {
if (!ready) {
// TODO should maybe be something more kind than throwing an exception here
throw new IllegalStateException("Updater not ready");
}
// register callback
callbacks.put(topic, callback);
// subscribe at broker
Topic[] topicArray = { new Topic(topic, this.qos) };
connection.subscribe(topicArray, new Callback<byte[]>() {
@Override
public void onSuccess(byte[] qoses) {
logger.debug("Subscribed, qoses: {}", qoses);
}
@Override
public void onFailure(Throwable cause) {
logger.error("Could not subscribe", cause);
}
});
}
/**
* Waits until this updater is ready to receive MQTT messages.
* If it already is ready, return immediately with the value <code>true</code>.
* Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan,
* if it got ready, or <code>false</code> upon a timeout.
* @param time the maximum time to wait
* @param unit the time unit of the time argument
* @return whether this updater is ready
*/
public boolean waitUntilReady(long time, TimeUnit unit) {
try {
readyLock.lock();
if (ready) {
return true;
}
return readyCondition.await(time, unit);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readyLock.unlock();
}
return false;
}
public void close() {
if (connection == null) {
logger.warn("Stopping without connection. Was setHost() called?");
return;
}
connection.disconnect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.info("Disconnected from {}", host);
}
@Override
public void onFailure(Throwable ignored) {
// Disconnects never fail. And we do not care either.
}
});
}
}
syntax = "proto3";
package panda;
message PandaLinkState {
string name = 1;
message Position {
float positionX = 1;
float positionY = 2;
float positionZ = 3;
}
message Orientation {
float orientationX = 1;
float orientationY = 2;
float orientationZ = 3;
float orientationW = 4;
}
message TwistLinear {
float twistLinearX = 1;
float twistLinearY = 2;
float twistLinearZ = 3;
}
message TwistAngular {
float twistAngularX = 1;
float twistAngularY = 2;
float twistAngularZ = 3;
}
Position pos = 2;
Orientation orient = 3;
TwistLinear tl = 4;
TwistAngular ta = 5;
}
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
build
src/gen-res/
src/gen/
out/
*.class
apply plugin: 'application'
apply plugin: 'com.google.protobuf'
sourceCompatibility = 1.8
mainClassName = 'de.tudresden.inf.st.ros2rag.senderstub.Main'
repositories {
jcenter()
}
buildscript {
repositories.jcenter()
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.12'
}
}
sourceSets.main.java.srcDir "src/gen/java"
jar.manifest.attributes('Main-Class': 'de.tudresden.inf.st.ros2rag.senderstub.Main')
dependencies {
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-xml', version: "${jackson_version}"
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jackson_version}"
implementation group: 'net.sf.beaver', name: 'beaver-rt', version: '0.9.11'
compile 'com.google.protobuf:protobuf-java:3.0.0'
compile group: 'org.fusesource.mqtt-client', name: 'mqtt-client', version: '1.15'
protobuf files("$projectDir/../ros2rag.example/src/main/proto")
}
test {
useJUnitPlatform()
maxHeapSize = '1G'
}
protobuf {
// create strange directories, so use default here
// generatedFilesBaseDir = "$projectDir/src/gen/java"
protoc {
// The artifact spec for the Protobuf Compiler
artifact = 'com.google.protobuf:protoc:3.0.0'
}
}
package de.tudresden.inf.st.ros2rag.senderstub;
import org.fusesource.mqtt.client.QoS;
import java.util.concurrent.TimeUnit;
/**
* Small helper to publish messages to a MQTT broker.
*
* @author rschoene - Initial contribution
*/
public interface MQTTSender extends AutoCloseable {
/**
* Sets the host running the MQTT broker.
* @param host host name (IP address or domain name)
* @param port port to use
*/
MQTTSender setHost(String host, int port);
/**
* Set the timeout used for connecting and disconnecting.
* @param connectTimeout Timeout value
* @param connectTimeoutUnit Timeout unit
*/
void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit);
/**
* Set the timeout used for publishing messages.
* @param publishTimeout Timeout value
* @param publishTimeoutUnit Timeout unit
*/
void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit);
/**
* Publishes a message in a topic at most once.
* @param topic the topic to publish at
* @param message the message to publish
* @throws Exception if the underlying connection throws an error
*/
default void publish(String topic, byte[] message) throws Exception {
this.publish(topic, message, QoS.AT_MOST_ONCE);
}
/**
* Publishes a message in a topic with the given quality of service (QoS).
* @param topic the topic to publish at
* @param message the message to publish
* @param qos the needed quality of service (at most once, at least once, exactly once)
* @throws Exception if the underlying connection throws an error
*/
void publish(String topic, byte[] message, QoS qos) throws Exception;
/**
* Checks, whether the connection to the host (set in the constructor) is established.
* @return <code>true</code> if this sender is connected to the host
*/
boolean isConnected();
@Override
void close() throws Exception;
}
package de.tudresden.inf.st.ros2rag.senderstub;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.fusesource.mqtt.client.*;
import java.net.URI;
import java.util.concurrent.TimeUnit;
/**
* Implementation of a MQTT sender using <code>org.fusesource.mqtt.client</code>.
*
* @author rschoene - Initial contribution
*/
public class MQTTSenderImpl implements MQTTSender {
private final Logger logger = LogManager.getLogger(MQTTSenderImpl.class);
/** The connection to the MQTT broker. */
private FutureConnection connection;
/** Timeout for connect/disconnect methods */
private long connectTimeout;
/** Unit of timeout for connect/disconnect methods */
private TimeUnit connectTimeoutUnit;
/** Timeout for publish method */
private long publishTimeout;
/** Unit of timeout for publish method */
private TimeUnit publishTimeoutUnit;
@Override
public MQTTSender setHost(String host, int port) {
/* The host running the MQTT broker. */
URI hostUri = URI.create("tcp://" + host + ":" + port);
logger.debug("Host is {}", hostUri);
MQTT mqtt = new MQTT();
mqtt.setHost(hostUri);
connection = mqtt.futureConnection();
setConnectTimeout(2, TimeUnit.SECONDS);
setPublishTimeout(1, TimeUnit.SECONDS);
ensureConnected();
return this;
}
@Override
public void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit) {
this.connectTimeout = connectTimeout;
this.connectTimeoutUnit = connectTimeoutUnit;
}
@Override
public void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit) {
this.publishTimeout = publishTimeout;
this.publishTimeoutUnit = publishTimeoutUnit;
}
@Override
public void publish(String topic, byte[] message, QoS qos) throws Exception {
if (ensureConnected()) {
logger.debug("Send: {} -> {}", topic, message);
connection.publish(topic, message, qos, false).await(publishTimeout, publishTimeoutUnit);
}
}
/**
* Ensures an established connection.
* If already connected, return immediately. Otherwise try to connect.
* @return <code>true</code> if the connected was established successfully, <code>false</code> if there was an error
*/
private boolean ensureConnected() {
if (!isConnected()) {
try {
connection.connect().await(connectTimeout, connectTimeoutUnit);
} catch (Exception e) {
logger.warn("Could not connect", e);
return false;
}
}
return true;
}
@Override
public boolean isConnected() {
return connection != null && connection.isConnected();
}
@Override
public void close() throws Exception {
if (connection == null) {
logger.warn("Stopping without connection.");
return;
}
if (isConnected()) {
connection.disconnect().await(connectTimeout, connectTimeoutUnit);
}
}
}
package de.tudresden.inf.st.ros2rag.senderstub;
import panda.Linkstate;
import java.util.Arrays;
public class Main {
public static void main(String[] args) throws Exception {
String topic;
if (args.length < 1) {
topic = "robot/joint1";
} else {
topic = args[0];
}
Linkstate.PandaLinkState pls = Linkstate.PandaLinkState.newBuilder()
.setName("Joint1")
.setPos(Linkstate.PandaLinkState.Position.newBuilder()
.setPositionX(0.5f)
.setPositionY(0.5f)
.setPositionZ(0.5f)
.build())
.setOrient(Linkstate.PandaLinkState.Orientation.newBuilder()
.setOrientationX(0)
.setOrientationY(0)
.setOrientationZ(0)
.setOrientationW(0)
.build())
.build();
MQTTSender sender = new MQTTSenderImpl();
sender.setHost("localhost", 1883);
// System.out.println("pls.toByteArray() = " + Arrays.toString(pls.toByteArray()));
sender.publish(topic, pls.toByteArray());
sender.close();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
#!/usr/bin/env bash
./ros2rag.senderstub/build/install/ros2rag.senderstub/bin/ros2rag.senderstub $@
......@@ -2,3 +2,4 @@ rootProject.name = 'ros2rag'
include 'ros2rag.base'
include 'ros2rag.example'
include 'ros2rag.senderstub'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment