From 2c1540b7194c1f2658a09e6284f6dbbf0f302eb4 Mon Sep 17 00:00:00 2001 From: rschoene <rene.schoene@tu-dresden.de> Date: Wed, 8 Apr 2020 18:07:41 +0200 Subject: [PATCH] Somewhat working READ. - Example: use Position class created by protobuf, use a wrapper in grammar instead for lists - Example: manually extend Joint, currently only one connect - Example: added generic MqttUpdater handling subscriptions - Example: added some glue code, that would be generated normally - Example: copied protobuf definition for linkstate - SenderStub: new module with main to construct minimal linkstate and send it via mqtt --- ros2rag.example/build.gradle | 5 +- .../src/main/jastadd/Example.relast | 12 +- .../src/main/jastadd/Generated.jrag | 59 +++++ .../src/main/jastadd/Generated.relast | 3 + .../st/ros2rag/example/GeneratedJoint.java | 39 ++++ .../st/ros2rag/example/GeneratedRobotArm.java | 11 + .../inf/st/ros2rag/example/Main.java | 64 +++++- .../inf/st/ros2rag/example/MqttUpdater.java | 214 ++++++++++++++++++ .../src/main/proto/linkstate.proto | 38 ++++ ros2rag.example/src/main/resources/log4j2.xml | 13 ++ ros2rag.senderstub/.gitignore | 5 + ros2rag.senderstub/build.gradle | 45 ++++ .../inf/st/ros2rag/senderstub/MQTTSender.java | 62 +++++ .../st/ros2rag/senderstub/MQTTSenderImpl.java | 97 ++++++++ .../inf/st/ros2rag/senderstub/Main.java | 35 +++ .../src/main/resources/log4j2.xml | 13 ++ send_one.sh | 2 + settings.gradle | 1 + 18 files changed, 709 insertions(+), 9 deletions(-) create mode 100644 ros2rag.example/src/main/jastadd/Generated.jrag create mode 100644 ros2rag.example/src/main/jastadd/Generated.relast create mode 100644 ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/GeneratedJoint.java create mode 100644 ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/GeneratedRobotArm.java create mode 100644 ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/MqttUpdater.java create mode 100644 ros2rag.example/src/main/proto/linkstate.proto create mode 100644 ros2rag.example/src/main/resources/log4j2.xml create mode 100644 ros2rag.senderstub/.gitignore create mode 100644 ros2rag.senderstub/build.gradle create mode 100644 ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSender.java create mode 100644 ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSenderImpl.java create mode 100644 ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/Main.java create mode 100644 ros2rag.senderstub/src/main/resources/log4j2.xml create mode 100755 send_one.sh diff --git a/ros2rag.example/build.gradle b/ros2rag.example/build.gradle index 70dba00..ddfeaf8 100644 --- a/ros2rag.example/build.gradle +++ b/ros2rag.example/build.gradle @@ -19,13 +19,14 @@ buildscript { } sourceSets.main.java.srcDir "src/gen/java" -jar.manifest.attributes('Main-Class': 'de.tudresden.inf.st.ros2rag.Main') +jar.manifest.attributes('Main-Class': 'de.tudresden.inf.st.ros2rag.example.Main') dependencies { implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-xml', version: "${jackson_version}" implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jackson_version}" implementation group: 'net.sf.beaver', name: 'beaver-rt', version: '0.9.11' compile 'com.google.protobuf:protobuf-java:3.0.0' + compile group: 'org.fusesource.mqtt-client', name: 'mqtt-client', version: '1.15' jastadd2 "org.jastadd:jastadd:2.3.4" } @@ -99,7 +100,7 @@ jastadd { } // Input files -def relastFiles = ["./src/main/jastadd/Example.relast"] +def relastFiles = ["src/main/jastadd/Example.relast", "src/main/jastadd/Generated.relast"] // phase: RelAst -> JastAdd task relastToJastAdd(type: JavaExec) { diff --git a/ros2rag.example/src/main/jastadd/Example.relast b/ros2rag.example/src/main/jastadd/Example.relast index 7b83143..b72ddd5 100644 --- a/ros2rag.example/src/main/jastadd/Example.relast +++ b/ros2rag.example/src/main/jastadd/Example.relast @@ -1,15 +1,17 @@ Model ::= RobotArm ZoneModel ; -ZoneModel ::= Size:Position SafetyZone:Zone*; +ZoneModel ::= <Size:Position> SafetyZone:Zone*; -Zone ::= Position*; +Zone ::= PositionWrapper*; // Do not use terminal-NTA's for now, as relast has problems with it "/<ShouldUseLowSpeed:Boolean>/" ; RobotArm ::= Joint* EndEffector ; -Joint ::= <Name> ; -rel Joint.CurrentPosition -> Position ; +Joint ::= <Name> <CurrentPosition:Position> ; +//rel Joint.CurrentPosition -> Position_Old ; EndEffector : Joint; -Position ::= <x:int> <y:int> <z:int> ; +Position_Old ::= <x:int> <y:int> <z:int> ; +PositionWrapper ::= <Position:Position> ; + diff --git a/ros2rag.example/src/main/jastadd/Generated.jrag b/ros2rag.example/src/main/jastadd/Generated.jrag new file mode 100644 index 0000000..0c0fce1 --- /dev/null +++ b/ros2rag.example/src/main/jastadd/Generated.jrag @@ -0,0 +1,59 @@ +import de.tudresden.inf.st.ros2rag.example.MqttUpdater; +import panda.Linkstate.PandaLinkState.Position; + +// this aspect depends on the actual grammar. probably we need to provide the root node type, in this case "Model" +// it is somewhat problematic, because it assumes a single root to store the mqtt-host +aspect GrammarExtension { + // kind of private NTA typed "MqttRoot" and named "_MqttRoot" + syn nta MqttRoot Model.get_MqttRoot() { + return new MqttRoot(); + } + + public void Model.updateMqttHost(String host) throws java.io.IOException { + get_MqttRoot().updateHost(host); + } + + public void Model.updateMqttHost(String host, int port) throws java.io.IOException { + get_MqttRoot().updateHost(host, port); + } + + public boolean Model.waitUntilReady(long time, java.util.concurrent.TimeUnit unit) { + return get_MqttRoot().getUpdater().waitUntilReady(time, unit); + } + + inh MqttUpdater Joint._mqttUpdater(); + eq Model.getRobotArm()._mqttUpdater() = get_MqttRoot().getUpdater(); + eq Model.getZoneModel()._mqttUpdater() = get_MqttRoot().getUpdater(); +} + +// this aspect is generic and will be always generated in the same way +aspect Mqtt { + // --- default values --- + private static final int MqttRoot.DEFAULT_PORT = 1883; + + void MqttRoot.updateHost(String host) throws java.io.IOException { + updateHost(host, DEFAULT_PORT); + } + + void MqttRoot.updateHost(String host, int port) throws java.io.IOException { + setHost(ExternalHost.of(host, port)); + if (getUpdater() != null) { + // close connection to old updater first + getUpdater().close(); + } + setUpdater(new MqttUpdater().setHost(host, port)); + } + + public static ExternalHost ExternalHost.of(String hostName, int defaultPort) { + String host = hostName; + int port = defaultPort; + if (hostName.contains(":")) { + String[] parts = hostName.split(":"); + host = parts[0]; + port = Integer.parseInt(parts[1]); + } + return new ExternalHost(host, port); + } + + syn String ExternalHost.urlAsString() = String.format("http://%s:%s", getHostName(), getPort()); +} diff --git a/ros2rag.example/src/main/jastadd/Generated.relast b/ros2rag.example/src/main/jastadd/Generated.relast new file mode 100644 index 0000000..7c6b9db --- /dev/null +++ b/ros2rag.example/src/main/jastadd/Generated.relast @@ -0,0 +1,3 @@ +MqttRoot ::= [Host:ExternalHost] <Updater:MqttUpdater> ; +ExternalHost ::= <HostName:String> <Port:int> ; + diff --git a/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/GeneratedJoint.java b/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/GeneratedJoint.java new file mode 100644 index 0000000..6641c84 --- /dev/null +++ b/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/GeneratedJoint.java @@ -0,0 +1,39 @@ +package de.tudresden.inf.st.ros2rag.example; + +import com.google.protobuf.InvalidProtocolBufferException; +import de.tudresden.inf.st.ros2rag.ast.Joint; +import panda.Linkstate.PandaLinkState; +import panda.Linkstate.PandaLinkState.Position; + +/** + * Manually written code for Joint to be actually generated later. + * + * @author rschoene - Initial contribution + */ +public class GeneratedJoint extends Joint { + + /* + Input for this to be generated: + + // when an update of pose is read via mqtt, then update current position + [always] read Joint.CurrentPosition using LinkStateToPosition; + + // panda.LinkState is a datatype defined in protobuf + LinkStateToPosition: map panda.Linkstate x to Position y using { + y = x.getPos(); + } + */ + public void connectCurrentPosition(String topic) { + _mqttUpdater().newConnection(topic, message -> { + // Parse message into a LinkState + try { + PandaLinkState x = PandaLinkState.parseFrom(message); + Position y = x.getPos(); + setCurrentPosition(y); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + }); + } + +} diff --git a/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/GeneratedRobotArm.java b/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/GeneratedRobotArm.java new file mode 100644 index 0000000..56d326a --- /dev/null +++ b/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/GeneratedRobotArm.java @@ -0,0 +1,11 @@ +package de.tudresden.inf.st.ros2rag.example; + +import de.tudresden.inf.st.ros2rag.ast.RobotArm; + +/** + * Manually written code for RobotArm to be actually generated later. + * + * @author rschoene - Initial contribution + */ +public class GeneratedRobotArm extends RobotArm { +} diff --git a/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/Main.java b/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/Main.java index c5b575d..8606a19 100644 --- a/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/Main.java +++ b/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/Main.java @@ -1,12 +1,72 @@ package de.tudresden.inf.st.ros2rag.example; +import com.google.protobuf.InvalidProtocolBufferException; +import de.tudresden.inf.st.ros2rag.ast.*; +import panda.Linkstate.PandaLinkState.Position; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + /** * Testing Ros2Rag without generating something. * * @author rschoene - Initial contribution */ public class Main { - public static void main(String[] args) { - System.out.println("Hello"); + public static void main(String[] args) throws InvalidProtocolBufferException, IOException, InterruptedException { + Model model = new Model(); + model.updateMqttHost("localhost"); + + ZoneModel zoneModel = new ZoneModel(); + zoneModel.setSize(makePosition(1, 1, 1)); + + Position myPosition = makePosition(0, 0, 0); + PositionWrapper myPositionWrapper = new PositionWrapper(myPosition); + PositionWrapper leftPosition = new PositionWrapper(makePosition(-1, 0, 0)); + PositionWrapper rightPosition = new PositionWrapper(makePosition(1, 0, 0)); + + Zone safetyZone = new Zone(); + safetyZone.addPositionWrapper(myPositionWrapper); + safetyZone.addPositionWrapper(leftPosition); + safetyZone.addPositionWrapper(rightPosition); + zoneModel.addSafetyZone(safetyZone); + + RobotArm robotArm = new GeneratedRobotArm(); + + GeneratedJoint joint1 = new GeneratedJoint(); + joint1.setName("joint1"); + joint1.setCurrentPosition(myPosition); + + EndEffector endEffector = new EndEffector(); + endEffector.setName("gripper"); + endEffector.setCurrentPosition(myPosition); + + robotArm.addJoint(joint1); + robotArm.setEndEffector(endEffector); + model.setRobotArm(robotArm); + + model.waitUntilReady(2, TimeUnit.SECONDS); + + joint1.connectCurrentPosition("robot/joint1"); + System.out.println("BEFORE joint1.getCurrentPosition() = " + stringify(joint1.getCurrentPosition())); + + Thread.sleep(10000); + + System.out.println("AFTER joint1.getCurrentPosition() = " + stringify(joint1.getCurrentPosition())); + + // TODO close/shutdown should be exposed + model.get_MqttRoot().getUpdater().close(); + } + + private static Position makePosition(int x, int y, int z) { + return Position.newBuilder() + .setPositionX(x) + .setPositionY(y) + .setPositionZ(z) + .build(); + } + + private static String stringify(Position position) { + return "(" + position.getPositionX() + ", " + position.getPositionY() + ", " + position.getPositionZ() + ")"; } } diff --git a/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/MqttUpdater.java b/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/MqttUpdater.java new file mode 100644 index 0000000..cc9be87 --- /dev/null +++ b/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/MqttUpdater.java @@ -0,0 +1,214 @@ +package de.tudresden.inf.st.ros2rag.example; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.mqtt.client.*; + +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +/** + * Helper class to receive updates via MQTT and use callbacks to handle those messages. + * + * @author rschoene - Initial contribution + */ +public class MqttUpdater { + + private final Logger logger; + + /** The host running the MQTT broker. */ + private URI host; + /** The connection to the MQTT broker. */ + private CallbackConnection connection; + /** Whether we are subscribed to the topics yet */ + private Condition readyCondition; + private Lock readyLock; + private boolean ready; + private QoS qos; + /** Dispatch knowledge */ + private final Map<String, Consumer<byte[]>> callbacks; + + public MqttUpdater() { + this.logger = LogManager.getLogger(MqttUpdater.class); + this.callbacks = new HashMap<>(); + this.readyLock = new ReentrantLock(); + this.readyCondition = readyLock.newCondition(); + this.ready = false; + this.qos = QoS.AT_LEAST_ONCE; + } + + /** + * Sets the host to receive messages from, and connects to it. + * @throws IOException if could not connect, or could not subscribe to a topic + * @return self + */ + public MqttUpdater setHost(String host, int port) throws IOException { + this.host = URI.create("tcp://" + host + ":" + port); + logger.debug("Host is {}", this.host); + + Objects.requireNonNull(this.host, "Host need to be set!"); + MQTT mqtt = new MQTT(); + mqtt.setHost(this.host); + connection = mqtt.callbackConnection(); + AtomicReference<Throwable> error = new AtomicReference<>(); + + // add the listener to dispatch messages later + connection.listener(new ExtendedListener() { + public void onConnected() { + logger.debug("Connected"); + } + + @Override + public void onDisconnected() { + logger.debug("Disconnected"); + } + + @Override + public void onPublish(UTF8Buffer topic, Buffer body, Callback<Callback<Void>> ack) { + String topicString = topic.toString(); + Consumer<byte[]> callback = callbacks.get(topicString); + if (callback == null) { + logger.debug("Got a message, but no callback to call. Forgot to unsubscribe?"); + } else { + byte[] message = body.toByteArray(); +// System.out.println("message = " + Arrays.toString(message)); + callback.accept(message); + } + ack.onSuccess(null); // always acknowledge message + } + + @Override + public void onPublish(UTF8Buffer topicBuffer, Buffer body, Runnable ack) { + logger.warn("onPublish should not be called"); + } + + @Override + public void onFailure(Throwable cause) { +// logger.catching(cause); + error.set(cause); + } + }); + throwIf(error); + + // actually establish the connection + connection.connect(new Callback<Void>() { + @Override + public void onSuccess(Void value) { + connection.publish("components", "Ros2Rag is listening".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() { + @Override + public void onSuccess(Void value) { + logger.debug("success sending welcome message"); + try { + readyLock.lock(); + ready = true; + readyCondition.signalAll(); + } finally { + readyLock.unlock(); + } + } + + @Override + public void onFailure(Throwable value) { + logger.debug("failure sending welcome message", value); + } + }); + } + + @Override + public void onFailure(Throwable cause) { +// logger.error("Could not connect", cause); + error.set(cause); + } + }); + throwIf(error); + return this; + } + + private void throwIf(AtomicReference<Throwable> error) throws IOException { + if (error.get() != null) { + throw new IOException(error.get()); + } + } + + public void setQoSForSubscription(QoS qos) { + this.qos = qos; + } + + public void newConnection(String topic, Consumer<byte[]> callback) { + if (!ready) { + // TODO should maybe be something more kind than throwing an exception here + throw new IllegalStateException("Updater not ready"); + } + // register callback + callbacks.put(topic, callback); + + // subscribe at broker + Topic[] topicArray = { new Topic(topic, this.qos) }; + connection.subscribe(topicArray, new Callback<byte[]>() { + @Override + public void onSuccess(byte[] qoses) { + logger.debug("Subscribed, qoses: {}", qoses); + } + + @Override + public void onFailure(Throwable cause) { + logger.error("Could not subscribe", cause); + } + }); + } + + /** + * Waits until this updater is ready to receive MQTT messages. + * If it already is ready, return immediately with the value <code>true</code>. + * Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan, + * if it got ready, or <code>false</code> upon a timeout. + * @param time the maximum time to wait + * @param unit the time unit of the time argument + * @return whether this updater is ready + */ + public boolean waitUntilReady(long time, TimeUnit unit) { + try { + readyLock.lock(); + if (ready) { + return true; + } + return readyCondition.await(time, unit); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + readyLock.unlock(); + } + return false; + } + + + public void close() { + if (connection == null) { + logger.warn("Stopping without connection. Was setHost() called?"); + return; + } + connection.disconnect(new Callback<Void>() { + @Override + public void onSuccess(Void value) { + logger.info("Disconnected from {}", host); + } + + @Override + public void onFailure(Throwable ignored) { + // Disconnects never fail. And we do not care either. + } + }); + } + +} diff --git a/ros2rag.example/src/main/proto/linkstate.proto b/ros2rag.example/src/main/proto/linkstate.proto new file mode 100644 index 0000000..dc95138 --- /dev/null +++ b/ros2rag.example/src/main/proto/linkstate.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +package panda; + +message PandaLinkState { + + string name = 1; + + message Position { + float positionX = 1; + float positionY = 2; + float positionZ = 3; + } + + message Orientation { + float orientationX = 1; + float orientationY = 2; + float orientationZ = 3; + float orientationW = 4; + } + + message TwistLinear { + float twistLinearX = 1; + float twistLinearY = 2; + float twistLinearZ = 3; + } + + message TwistAngular { + float twistAngularX = 1; + float twistAngularY = 2; + float twistAngularZ = 3; + } + + Position pos = 2; + Orientation orient = 3; + TwistLinear tl = 4; + TwistAngular ta = 5; +} diff --git a/ros2rag.example/src/main/resources/log4j2.xml b/ros2rag.example/src/main/resources/log4j2.xml new file mode 100644 index 0000000..9566029 --- /dev/null +++ b/ros2rag.example/src/main/resources/log4j2.xml @@ -0,0 +1,13 @@ +<?xml version="1.0" encoding="UTF-8"?> +<Configuration status="INFO"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> + </Console> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/ros2rag.senderstub/.gitignore b/ros2rag.senderstub/.gitignore new file mode 100644 index 0000000..87b4cdd --- /dev/null +++ b/ros2rag.senderstub/.gitignore @@ -0,0 +1,5 @@ +build +src/gen-res/ +src/gen/ +out/ +*.class diff --git a/ros2rag.senderstub/build.gradle b/ros2rag.senderstub/build.gradle new file mode 100644 index 0000000..b5a2762 --- /dev/null +++ b/ros2rag.senderstub/build.gradle @@ -0,0 +1,45 @@ +apply plugin: 'application' +apply plugin: 'com.google.protobuf' + +sourceCompatibility = 1.8 + +mainClassName = 'de.tudresden.inf.st.ros2rag.senderstub.Main' + +repositories { + jcenter() +} + +buildscript { + repositories.jcenter() + dependencies { + classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.12' + } +} + +sourceSets.main.java.srcDir "src/gen/java" +jar.manifest.attributes('Main-Class': 'de.tudresden.inf.st.ros2rag.senderstub.Main') + +dependencies { + implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-xml', version: "${jackson_version}" + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jackson_version}" + implementation group: 'net.sf.beaver', name: 'beaver-rt', version: '0.9.11' + compile 'com.google.protobuf:protobuf-java:3.0.0' + compile group: 'org.fusesource.mqtt-client', name: 'mqtt-client', version: '1.15' + + protobuf files("$projectDir/../ros2rag.example/src/main/proto") +} + +test { + useJUnitPlatform() + + maxHeapSize = '1G' +} + +protobuf { + // create strange directories, so use default here +// generatedFilesBaseDir = "$projectDir/src/gen/java" + protoc { + // The artifact spec for the Protobuf Compiler + artifact = 'com.google.protobuf:protoc:3.0.0' + } +} diff --git a/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSender.java b/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSender.java new file mode 100644 index 0000000..53ecd22 --- /dev/null +++ b/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSender.java @@ -0,0 +1,62 @@ +package de.tudresden.inf.st.ros2rag.senderstub; + +import org.fusesource.mqtt.client.QoS; + +import java.util.concurrent.TimeUnit; + +/** + * Small helper to publish messages to a MQTT broker. + * + * @author rschoene - Initial contribution + */ +public interface MQTTSender extends AutoCloseable { + + /** + * Sets the host running the MQTT broker. + * @param host host name (IP address or domain name) + * @param port port to use + */ + MQTTSender setHost(String host, int port); + + /** + * Set the timeout used for connecting and disconnecting. + * @param connectTimeout Timeout value + * @param connectTimeoutUnit Timeout unit + */ + void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit); + + /** + * Set the timeout used for publishing messages. + * @param publishTimeout Timeout value + * @param publishTimeoutUnit Timeout unit + */ + void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit); + + /** + * Publishes a message in a topic at most once. + * @param topic the topic to publish at + * @param message the message to publish + * @throws Exception if the underlying connection throws an error + */ + default void publish(String topic, byte[] message) throws Exception { + this.publish(topic, message, QoS.AT_MOST_ONCE); + } + + /** + * Publishes a message in a topic with the given quality of service (QoS). + * @param topic the topic to publish at + * @param message the message to publish + * @param qos the needed quality of service (at most once, at least once, exactly once) + * @throws Exception if the underlying connection throws an error + */ + void publish(String topic, byte[] message, QoS qos) throws Exception; + + /** + * Checks, whether the connection to the host (set in the constructor) is established. + * @return <code>true</code> if this sender is connected to the host + */ + boolean isConnected(); + + @Override + void close() throws Exception; +} diff --git a/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSenderImpl.java b/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSenderImpl.java new file mode 100644 index 0000000..f020cac --- /dev/null +++ b/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSenderImpl.java @@ -0,0 +1,97 @@ +package de.tudresden.inf.st.ros2rag.senderstub; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.fusesource.mqtt.client.*; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of a MQTT sender using <code>org.fusesource.mqtt.client</code>. + * + * @author rschoene - Initial contribution + */ +public class MQTTSenderImpl implements MQTTSender { + + private final Logger logger = LogManager.getLogger(MQTTSenderImpl.class); + /** The connection to the MQTT broker. */ + private FutureConnection connection; + + /** Timeout for connect/disconnect methods */ + private long connectTimeout; + /** Unit of timeout for connect/disconnect methods */ + private TimeUnit connectTimeoutUnit; + + /** Timeout for publish method */ + private long publishTimeout; + /** Unit of timeout for publish method */ + private TimeUnit publishTimeoutUnit; + + @Override + public MQTTSender setHost(String host, int port) { + /* The host running the MQTT broker. */ + URI hostUri = URI.create("tcp://" + host + ":" + port); + logger.debug("Host is {}", hostUri); + MQTT mqtt = new MQTT(); + mqtt.setHost(hostUri); + connection = mqtt.futureConnection(); + setConnectTimeout(2, TimeUnit.SECONDS); + setPublishTimeout(1, TimeUnit.SECONDS); + ensureConnected(); + return this; + } + + @Override + public void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit) { + this.connectTimeout = connectTimeout; + this.connectTimeoutUnit = connectTimeoutUnit; + } + + @Override + public void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit) { + this.publishTimeout = publishTimeout; + this.publishTimeoutUnit = publishTimeoutUnit; + } + + @Override + public void publish(String topic, byte[] message, QoS qos) throws Exception { + if (ensureConnected()) { + logger.debug("Send: {} -> {}", topic, message); + connection.publish(topic, message, qos, false).await(publishTimeout, publishTimeoutUnit); + } + } + + /** + * Ensures an established connection. + * If already connected, return immediately. Otherwise try to connect. + * @return <code>true</code> if the connected was established successfully, <code>false</code> if there was an error + */ + private boolean ensureConnected() { + if (!isConnected()) { + try { + connection.connect().await(connectTimeout, connectTimeoutUnit); + } catch (Exception e) { + logger.warn("Could not connect", e); + return false; + } + } + return true; + } + + @Override + public boolean isConnected() { + return connection != null && connection.isConnected(); + } + + @Override + public void close() throws Exception { + if (connection == null) { + logger.warn("Stopping without connection."); + return; + } + if (isConnected()) { + connection.disconnect().await(connectTimeout, connectTimeoutUnit); + } + } +} diff --git a/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/Main.java b/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/Main.java new file mode 100644 index 0000000..a3d0ae1 --- /dev/null +++ b/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/Main.java @@ -0,0 +1,35 @@ +package de.tudresden.inf.st.ros2rag.senderstub; + +import panda.Linkstate; + +import java.util.Arrays; + +public class Main { + public static void main(String[] args) throws Exception { + String topic; + if (args.length < 1) { + topic = "robot/joint1"; + } else { + topic = args[0]; + } + Linkstate.PandaLinkState pls = Linkstate.PandaLinkState.newBuilder() + .setName("Joint1") + .setPos(Linkstate.PandaLinkState.Position.newBuilder() + .setPositionX(0.5f) + .setPositionY(0.5f) + .setPositionZ(0.5f) + .build()) + .setOrient(Linkstate.PandaLinkState.Orientation.newBuilder() + .setOrientationX(0) + .setOrientationY(0) + .setOrientationZ(0) + .setOrientationW(0) + .build()) + .build(); + MQTTSender sender = new MQTTSenderImpl(); + sender.setHost("localhost", 1883); +// System.out.println("pls.toByteArray() = " + Arrays.toString(pls.toByteArray())); + sender.publish(topic, pls.toByteArray()); + sender.close(); + } +} diff --git a/ros2rag.senderstub/src/main/resources/log4j2.xml b/ros2rag.senderstub/src/main/resources/log4j2.xml new file mode 100644 index 0000000..9566029 --- /dev/null +++ b/ros2rag.senderstub/src/main/resources/log4j2.xml @@ -0,0 +1,13 @@ +<?xml version="1.0" encoding="UTF-8"?> +<Configuration status="INFO"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> + </Console> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/send_one.sh b/send_one.sh new file mode 100755 index 0000000..5af156b --- /dev/null +++ b/send_one.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +./ros2rag.senderstub/build/install/ros2rag.senderstub/bin/ros2rag.senderstub $@ diff --git a/settings.gradle b/settings.gradle index 584ff30..5a1c0e7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -2,3 +2,4 @@ rootProject.name = 'ros2rag' include 'ros2rag.base' include 'ros2rag.example' +include 'ros2rag.senderstub' -- GitLab