Commit 874da761 authored by René Schöne's avatar René Schöne
Browse files

Reusing MqttUpdater in senderstub and new receiverstub.

- Example: add name to MqttUpdater
- Senderstub: replace MQTTSender with MqttUpdater
- Receiverstub: new component to observe topics and pretty-print sent protobuf objects
parent afea43e5
#!/usr/bin/env bash
./ros2rag.receiverstub/build/install/ros2rag.receiverstub/bin/ros2rag.receiverstub $@
......@@ -26,6 +26,7 @@ import java.util.function.Consumer;
public class MqttUpdater {
private final Logger logger;
private final String name;
/** The host running the MQTT broker. */
private URI host;
......@@ -40,6 +41,11 @@ public class MqttUpdater {
private final Map<String, Consumer<byte[]>> callbacks;
public MqttUpdater() {
this("Ros2Rag");
}
public MqttUpdater(String name) {
this.name = Objects.requireNonNull(name, "Name must be set");
this.logger = LogManager.getLogger(MqttUpdater.class);
this.callbacks = new HashMap<>();
this.readyLock = new ReentrantLock();
......@@ -55,7 +61,7 @@ public class MqttUpdater {
*/
public MqttUpdater setHost(String host, int port) throws IOException {
this.host = URI.create("tcp://" + host + ":" + port);
logger.debug("Host is {}", this.host);
logger.debug("Host for {} is {}", this.name, this.host);
Objects.requireNonNull(this.host, "Host need to be set!");
MQTT mqtt = new MQTT();
......@@ -105,7 +111,7 @@ public class MqttUpdater {
connection.connect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
connection.publish("components", "Ros2Rag is listening".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
connection.publish("components", (name + " is connected").getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.debug("success sending welcome message");
......@@ -162,12 +168,12 @@ public class MqttUpdater {
connection.subscribe(topicArray, new Callback<byte[]>() {
@Override
public void onSuccess(byte[] qoses) {
logger.debug("Subscribed, qoses: {}", qoses);
logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
}
@Override
public void onFailure(Throwable cause) {
logger.error("Could not subscribe", cause);
logger.error("Could not subscribe to {}", topic, cause);
}
});
}
......@@ -204,7 +210,7 @@ public class MqttUpdater {
connection.disconnect(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.info("Disconnected from {}", host);
logger.info("Disconnected {} from {}", name, host);
}
@Override
......@@ -218,7 +224,7 @@ public class MqttUpdater {
connection.publish(topic, bytes, qos, false, new Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.debug("Published some bytes");
logger.debug("Published some bytes to {}", topic);
}
@Override
......
......@@ -2,11 +2,11 @@
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{1.} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
......
build
src/gen-res/
src/gen/
out/
*.class
apply plugin: 'application'
apply plugin: 'com.google.protobuf'
sourceCompatibility = 1.8
mainClassName = 'de.tudresden.inf.st.ros2rag.receiverstub.Main'
repositories {
jcenter()
}
buildscript {
repositories.jcenter()
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.12'
}
}
sourceSets.main.java.srcDir "src/gen/java"
jar.manifest.attributes('Main-Class': 'de.tudresden.inf.st.ros2rag.receiverstub.Main')
dependencies {
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-xml', version: "${jackson_version}"
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jackson_version}"
implementation group: 'net.sf.beaver', name: 'beaver-rt', version: '0.9.11'
compile 'com.google.protobuf:protobuf-java:3.0.0'
compile group: 'org.fusesource.mqtt-client', name: 'mqtt-client', version: '1.15'
implementation project(':ros2rag.example')
protobuf files("$projectDir/../ros2rag.example/src/main/proto")
}
test {
useJUnitPlatform()
maxHeapSize = '1G'
}
protobuf {
// create strange directories, so use default here
// generatedFilesBaseDir = "$projectDir/src/gen/java"
protoc {
// The artifact spec for the Protobuf Compiler
artifact = 'com.google.protobuf:protoc:3.0.0'
}
}
package de.tudresden.inf.st.ros2rag.receiverstub;
import com.google.protobuf.InvalidProtocolBufferException;
import config.Robotconfig.RobotConfig;
import de.tudresden.inf.st.ros2rag.example.MqttUpdater;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import panda.Linkstate.PandaLinkState;
import java.text.MessageFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Main {
private static final Logger logger = LogManager.getLogger(Main.class);
public static void main(String[] args) throws Exception {
String jointTopic, configTopic;
if (args.length < 2) {
jointTopic = "robot/joint1";
configTopic = "robot/config";
} else {
jointTopic = args[0];
configTopic = args[1];
}
final Lock finishLock = new ReentrantLock();
final Condition finishCondition = finishLock.newCondition();
MqttUpdater receiver = new MqttUpdater("receiver stub");
receiver.setHost("localhost", 1883);
receiver.waitUntilReady(2, TimeUnit.SECONDS);
receiver.newConnection(configTopic, bytes -> {
try {
logger.debug("Got a config message, parsing ...");
RobotConfig robotConfig = RobotConfig.parseFrom(bytes);
logger.info("robotConfig: speed = {}, loopTrajectory = {}, planningMode = {}",
robotConfig.getSpeed(),
robotConfig.getLoopTrajectory(),
robotConfig.getPlanningMode().toString());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
});
receiver.newConnection(jointTopic, bytes -> {
try {
logger.debug("Got a joint message, parsing ...");
PandaLinkState pls = PandaLinkState.parseFrom(bytes);
PandaLinkState.Position tmpPosition = pls.getPos();
PandaLinkState.Orientation tmpOrientation = pls.getOrient();
PandaLinkState.TwistLinear tmpTwistLinear = pls.getTl();
PandaLinkState.TwistAngular tmpTwistAngular = pls.getTa();
logger.info("{}: pos({},{},{}), orient({},{},{},{})," +
" twist-linear({},{},{}), twist-angular({},{},{})",
pls.getName(),
tmpPosition.getPositionX(),
tmpPosition.getPositionY(),
tmpPosition.getPositionZ(),
tmpOrientation.getOrientationX(),
tmpOrientation.getOrientationY(),
tmpOrientation.getOrientationZ(),
tmpOrientation.getOrientationW(),
tmpTwistLinear.getTwistLinearX(),
tmpTwistLinear.getTwistLinearY(),
tmpTwistLinear.getTwistLinearZ(),
tmpTwistAngular.getTwistAngularX(),
tmpTwistAngular.getTwistAngularY(),
tmpTwistAngular.getTwistAngularZ());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
});
receiver.newConnection("components", bytes -> {
String message = new String(bytes);
logger.info("Components: {}", message);
});
receiver.newConnection("receiver", bytes -> {
String message = new String(bytes);
if (message.equals("exit")) {
try {
finishLock.lock();
finishCondition.signal();
} finally {
finishLock.unlock();
}
}
});
try {
finishLock.lock();
finishCondition.await();
} finally {
finishLock.unlock();
}
receiver.close();
Runtime.getRuntime().addShutdownHook(new Thread(receiver::close));
}
}
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %logger{1.} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
......@@ -26,6 +26,8 @@ dependencies {
compile 'com.google.protobuf:protobuf-java:3.0.0'
compile group: 'org.fusesource.mqtt-client', name: 'mqtt-client', version: '1.15'
implementation project(':ros2rag.example')
protobuf files("$projectDir/../ros2rag.example/src/main/proto")
}
......
package de.tudresden.inf.st.ros2rag.senderstub;
import org.fusesource.mqtt.client.QoS;
import java.util.concurrent.TimeUnit;
/**
* Small helper to publish messages to a MQTT broker.
*
* @author rschoene - Initial contribution
*/
public interface MQTTSender extends AutoCloseable {
/**
* Sets the host running the MQTT broker.
* @param host host name (IP address or domain name)
* @param port port to use
*/
MQTTSender setHost(String host, int port);
/**
* Set the timeout used for connecting and disconnecting.
* @param connectTimeout Timeout value
* @param connectTimeoutUnit Timeout unit
*/
void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit);
/**
* Set the timeout used for publishing messages.
* @param publishTimeout Timeout value
* @param publishTimeoutUnit Timeout unit
*/
void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit);
/**
* Publishes a message in a topic at most once.
* @param topic the topic to publish at
* @param message the message to publish
* @throws Exception if the underlying connection throws an error
*/
default void publish(String topic, byte[] message) throws Exception {
this.publish(topic, message, QoS.AT_MOST_ONCE);
}
/**
* Publishes a message in a topic with the given quality of service (QoS).
* @param topic the topic to publish at
* @param message the message to publish
* @param qos the needed quality of service (at most once, at least once, exactly once)
* @throws Exception if the underlying connection throws an error
*/
void publish(String topic, byte[] message, QoS qos) throws Exception;
/**
* Checks, whether the connection to the host (set in the constructor) is established.
* @return <code>true</code> if this sender is connected to the host
*/
boolean isConnected();
@Override
void close() throws Exception;
}
package de.tudresden.inf.st.ros2rag.senderstub;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.fusesource.mqtt.client.*;
import java.net.URI;
import java.util.concurrent.TimeUnit;
/**
* Implementation of a MQTT sender using <code>org.fusesource.mqtt.client</code>.
*
* @author rschoene - Initial contribution
*/
public class MQTTSenderImpl implements MQTTSender {
private final Logger logger = LogManager.getLogger(MQTTSenderImpl.class);
/** The connection to the MQTT broker. */
private FutureConnection connection;
/** Timeout for connect/disconnect methods */
private long connectTimeout;
/** Unit of timeout for connect/disconnect methods */
private TimeUnit connectTimeoutUnit;
/** Timeout for publish method */
private long publishTimeout;
/** Unit of timeout for publish method */
private TimeUnit publishTimeoutUnit;
@Override
public MQTTSender setHost(String host, int port) {
/* The host running the MQTT broker. */
URI hostUri = URI.create("tcp://" + host + ":" + port);
logger.debug("Host is {}", hostUri);
MQTT mqtt = new MQTT();
mqtt.setHost(hostUri);
connection = mqtt.futureConnection();
setConnectTimeout(2, TimeUnit.SECONDS);
setPublishTimeout(1, TimeUnit.SECONDS);
ensureConnected();
return this;
}
@Override
public void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit) {
this.connectTimeout = connectTimeout;
this.connectTimeoutUnit = connectTimeoutUnit;
}
@Override
public void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit) {
this.publishTimeout = publishTimeout;
this.publishTimeoutUnit = publishTimeoutUnit;
}
@Override
public void publish(String topic, byte[] message, QoS qos) throws Exception {
if (ensureConnected()) {
logger.debug("Send: {} -> {}", topic, message);
connection.publish(topic, message, qos, false).await(publishTimeout, publishTimeoutUnit);
}
}
/**
* Ensures an established connection.
* If already connected, return immediately. Otherwise try to connect.
* @return <code>true</code> if the connected was established successfully, <code>false</code> if there was an error
*/
private boolean ensureConnected() {
if (!isConnected()) {
try {
connection.connect().await(connectTimeout, connectTimeoutUnit);
} catch (Exception e) {
logger.warn("Could not connect", e);
return false;
}
}
return true;
}
@Override
public boolean isConnected() {
return connection != null && connection.isConnected();
}
@Override
public void close() throws Exception {
if (connection == null) {
logger.warn("Stopping without connection.");
return;
}
if (isConnected()) {
connection.disconnect().await(connectTimeout, connectTimeoutUnit);
}
}
}
package de.tudresden.inf.st.ros2rag.senderstub;
import de.tudresden.inf.st.ros2rag.example.MqttUpdater;
import panda.Linkstate;
public class Main {
......@@ -24,7 +25,7 @@ public class Main {
.setOrientationW(0)
.build())
.build();
MQTTSender sender = new MQTTSenderImpl();
MqttUpdater sender = new MqttUpdater("sender stub");
sender.setHost("localhost", 1883);
// System.out.println("pls.toByteArray() = " + Arrays.toString(pls.toByteArray()));
sender.publish(topic, pls.toByteArray());
......
......@@ -6,7 +6,7 @@
</Console>
</Appenders>
<Loggers>
<Root level="info">
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
......
......@@ -3,3 +3,4 @@ rootProject.name = 'ros2rag'
include 'ros2rag.base'
include 'ros2rag.example'
include 'ros2rag.senderstub'
include 'ros2rag.receiverstub'
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment