From 874da76118630e9d343a9d45f8a9fbf4c95a2414 Mon Sep 17 00:00:00 2001
From: rschoene <rene.schoene@tu-dresden.de>
Date: Tue, 21 Apr 2020 17:36:18 +0200
Subject: [PATCH] Reusing MqttUpdater in senderstub and new receiverstub.

- Example: add name to MqttUpdater
- Senderstub: replace MQTTSender with MqttUpdater
- Receiverstub: new component to observe topics and pretty-print sent protobuf objects
---
 receiver.sh                                   |   2 +
 .../inf/st/ros2rag/example/MqttUpdater.java   |  18 ++--
 ros2rag.example/src/main/resources/log4j2.xml |   4 +-
 ros2rag.receiverstub/.gitignore               |   5 +
 ros2rag.receiverstub/build.gradle             |  47 ++++++++
 .../inf/st/ros2rag/receiverstub/Main.java     | 100 ++++++++++++++++++
 .../src/main/resources/log4j2.xml             |  13 +++
 ros2rag.senderstub/build.gradle               |   2 +
 .../inf/st/ros2rag/senderstub/MQTTSender.java |  62 -----------
 .../st/ros2rag/senderstub/MQTTSenderImpl.java |  97 -----------------
 .../inf/st/ros2rag/senderstub/Main.java       |   3 +-
 .../src/main/resources/log4j2.xml             |   2 +-
 settings.gradle                               |   1 +
 13 files changed, 187 insertions(+), 169 deletions(-)
 create mode 100755 receiver.sh
 create mode 100644 ros2rag.receiverstub/.gitignore
 create mode 100644 ros2rag.receiverstub/build.gradle
 create mode 100644 ros2rag.receiverstub/src/main/java/de/tudresden/inf/st/ros2rag/receiverstub/Main.java
 create mode 100644 ros2rag.receiverstub/src/main/resources/log4j2.xml
 delete mode 100644 ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSender.java
 delete mode 100644 ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSenderImpl.java

diff --git a/receiver.sh b/receiver.sh
new file mode 100755
index 0000000..732ebc2
--- /dev/null
+++ b/receiver.sh
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+./ros2rag.receiverstub/build/install/ros2rag.receiverstub/bin/ros2rag.receiverstub $@
diff --git a/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/MqttUpdater.java b/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/MqttUpdater.java
index bc85411..293ced5 100644
--- a/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/MqttUpdater.java
+++ b/ros2rag.example/src/main/java/de/tudresden/inf/st/ros2rag/example/MqttUpdater.java
@@ -26,6 +26,7 @@ import java.util.function.Consumer;
 public class MqttUpdater {
 
   private final Logger logger;
+  private final String name;
 
   /** The host running the MQTT broker. */
   private URI host;
@@ -40,6 +41,11 @@ public class MqttUpdater {
   private final Map<String, Consumer<byte[]>> callbacks;
 
   public MqttUpdater() {
+    this("Ros2Rag");
+  }
+
+  public MqttUpdater(String name) {
+    this.name = Objects.requireNonNull(name, "Name must be set");
     this.logger = LogManager.getLogger(MqttUpdater.class);
     this.callbacks = new HashMap<>();
     this.readyLock = new ReentrantLock();
@@ -55,7 +61,7 @@ public class MqttUpdater {
    */
   public MqttUpdater setHost(String host, int port) throws IOException {
     this.host = URI.create("tcp://" + host + ":" + port);
-    logger.debug("Host is {}", this.host);
+    logger.debug("Host for {} is {}", this.name, this.host);
 
     Objects.requireNonNull(this.host, "Host need to be set!");
     MQTT mqtt = new MQTT();
@@ -105,7 +111,7 @@ public class MqttUpdater {
     connection.connect(new Callback<Void>() {
       @Override
       public void onSuccess(Void value) {
-        connection.publish("components", "Ros2Rag is listening".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
+        connection.publish("components", (name + " is connected").getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
           @Override
           public void onSuccess(Void value) {
             logger.debug("success sending welcome message");
@@ -162,12 +168,12 @@ public class MqttUpdater {
     connection.subscribe(topicArray, new Callback<byte[]>() {
       @Override
       public void onSuccess(byte[] qoses) {
-        logger.debug("Subscribed, qoses: {}", qoses);
+        logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
       }
 
       @Override
       public void onFailure(Throwable cause) {
-        logger.error("Could not subscribe", cause);
+        logger.error("Could not subscribe to {}", topic, cause);
       }
     });
   }
@@ -204,7 +210,7 @@ public class MqttUpdater {
     connection.disconnect(new Callback<Void>() {
       @Override
       public void onSuccess(Void value) {
-        logger.info("Disconnected from {}", host);
+        logger.info("Disconnected {} from {}", name, host);
       }
 
       @Override
@@ -218,7 +224,7 @@ public class MqttUpdater {
     connection.publish(topic, bytes, qos, false, new Callback<Void>() {
       @Override
       public void onSuccess(Void value) {
-        logger.debug("Published some bytes");
+        logger.debug("Published some bytes to {}", topic);
       }
 
       @Override
diff --git a/ros2rag.example/src/main/resources/log4j2.xml b/ros2rag.example/src/main/resources/log4j2.xml
index 9566029..98d9c02 100644
--- a/ros2rag.example/src/main/resources/log4j2.xml
+++ b/ros2rag.example/src/main/resources/log4j2.xml
@@ -2,11 +2,11 @@
 <Configuration status="INFO">
     <Appenders>
         <Console name="Console" target="SYSTEM_OUT">
-            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{1.} - %msg%n"/>
         </Console>
     </Appenders>
     <Loggers>
-        <Root level="info">
+        <Root level="debug">
             <AppenderRef ref="Console"/>
         </Root>
     </Loggers>
diff --git a/ros2rag.receiverstub/.gitignore b/ros2rag.receiverstub/.gitignore
new file mode 100644
index 0000000..87b4cdd
--- /dev/null
+++ b/ros2rag.receiverstub/.gitignore
@@ -0,0 +1,5 @@
+build
+src/gen-res/
+src/gen/
+out/
+*.class
diff --git a/ros2rag.receiverstub/build.gradle b/ros2rag.receiverstub/build.gradle
new file mode 100644
index 0000000..815b43e
--- /dev/null
+++ b/ros2rag.receiverstub/build.gradle
@@ -0,0 +1,47 @@
+apply plugin: 'application'
+apply plugin: 'com.google.protobuf'
+
+sourceCompatibility = 1.8
+
+mainClassName = 'de.tudresden.inf.st.ros2rag.receiverstub.Main'
+
+repositories {
+    jcenter()
+}
+
+buildscript {
+    repositories.jcenter()
+    dependencies {
+        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.12'
+    }
+}
+
+sourceSets.main.java.srcDir "src/gen/java"
+jar.manifest.attributes('Main-Class': 'de.tudresden.inf.st.ros2rag.receiverstub.Main')
+
+dependencies {
+    implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-xml', version: "${jackson_version}"
+    implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jackson_version}"
+    implementation group: 'net.sf.beaver', name: 'beaver-rt', version: '0.9.11'
+    compile 'com.google.protobuf:protobuf-java:3.0.0'
+    compile group: 'org.fusesource.mqtt-client', name: 'mqtt-client', version: '1.15'
+
+    implementation project(':ros2rag.example')
+
+    protobuf files("$projectDir/../ros2rag.example/src/main/proto")
+}
+
+test {
+    useJUnitPlatform()
+
+    maxHeapSize = '1G'
+}
+
+protobuf {
+    // create strange directories, so use default here
+//    generatedFilesBaseDir = "$projectDir/src/gen/java"
+    protoc {
+        // The artifact spec for the Protobuf Compiler
+        artifact = 'com.google.protobuf:protoc:3.0.0'
+    }
+}
diff --git a/ros2rag.receiverstub/src/main/java/de/tudresden/inf/st/ros2rag/receiverstub/Main.java b/ros2rag.receiverstub/src/main/java/de/tudresden/inf/st/ros2rag/receiverstub/Main.java
new file mode 100644
index 0000000..36a0334
--- /dev/null
+++ b/ros2rag.receiverstub/src/main/java/de/tudresden/inf/st/ros2rag/receiverstub/Main.java
@@ -0,0 +1,100 @@
+package de.tudresden.inf.st.ros2rag.receiverstub;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import config.Robotconfig.RobotConfig;
+import de.tudresden.inf.st.ros2rag.example.MqttUpdater;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import panda.Linkstate.PandaLinkState;
+
+import java.text.MessageFormat;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class Main {
+
+  private static final Logger logger = LogManager.getLogger(Main.class);
+
+  public static void main(String[] args) throws Exception {
+    String jointTopic, configTopic;
+    if (args.length < 2) {
+      jointTopic = "robot/joint1";
+      configTopic = "robot/config";
+    } else {
+      jointTopic = args[0];
+      configTopic = args[1];
+    }
+
+    final Lock finishLock = new ReentrantLock();
+    final Condition finishCondition = finishLock.newCondition();
+
+    MqttUpdater receiver = new MqttUpdater("receiver stub");
+    receiver.setHost("localhost", 1883);
+    receiver.waitUntilReady(2, TimeUnit.SECONDS);
+    receiver.newConnection(configTopic, bytes -> {
+      try {
+        logger.debug("Got a config message, parsing ...");
+        RobotConfig robotConfig = RobotConfig.parseFrom(bytes);
+        logger.info("robotConfig: speed = {}, loopTrajectory = {}, planningMode = {}",
+            robotConfig.getSpeed(),
+            robotConfig.getLoopTrajectory(),
+            robotConfig.getPlanningMode().toString());
+      } catch (InvalidProtocolBufferException e) {
+        e.printStackTrace();
+      }
+    });
+    receiver.newConnection(jointTopic, bytes -> {
+      try {
+        logger.debug("Got a joint message, parsing ...");
+        PandaLinkState pls = PandaLinkState.parseFrom(bytes);
+        PandaLinkState.Position tmpPosition = pls.getPos();
+        PandaLinkState.Orientation tmpOrientation = pls.getOrient();
+        PandaLinkState.TwistLinear tmpTwistLinear = pls.getTl();
+        PandaLinkState.TwistAngular tmpTwistAngular = pls.getTa();
+        logger.info("{}: pos({},{},{}), orient({},{},{},{})," +
+                " twist-linear({},{},{}), twist-angular({},{},{})",
+            pls.getName(),
+            tmpPosition.getPositionX(),
+            tmpPosition.getPositionY(),
+            tmpPosition.getPositionZ(),
+            tmpOrientation.getOrientationX(),
+            tmpOrientation.getOrientationY(),
+            tmpOrientation.getOrientationZ(),
+            tmpOrientation.getOrientationW(),
+            tmpTwistLinear.getTwistLinearX(),
+            tmpTwistLinear.getTwistLinearY(),
+            tmpTwistLinear.getTwistLinearZ(),
+            tmpTwistAngular.getTwistAngularX(),
+            tmpTwistAngular.getTwistAngularY(),
+            tmpTwistAngular.getTwistAngularZ());
+      } catch (InvalidProtocolBufferException e) {
+        e.printStackTrace();
+      }
+    });
+    receiver.newConnection("components", bytes -> {
+      String message = new String(bytes);
+      logger.info("Components: {}", message);
+    });
+    receiver.newConnection("receiver", bytes -> {
+      String message = new String(bytes);
+      if (message.equals("exit")) {
+        try {
+          finishLock.lock();
+          finishCondition.signal();
+        } finally {
+          finishLock.unlock();
+        }
+      }
+    });
+    try {
+      finishLock.lock();
+      finishCondition.await();
+    } finally {
+      finishLock.unlock();
+    }
+    receiver.close();
+    Runtime.getRuntime().addShutdownHook(new Thread(receiver::close));
+  }
+}
diff --git a/ros2rag.receiverstub/src/main/resources/log4j2.xml b/ros2rag.receiverstub/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..679a7bb
--- /dev/null
+++ b/ros2rag.receiverstub/src/main/resources/log4j2.xml
@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration status="INFO">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %logger{1.} - %msg%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="debug">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
diff --git a/ros2rag.senderstub/build.gradle b/ros2rag.senderstub/build.gradle
index b5a2762..517e991 100644
--- a/ros2rag.senderstub/build.gradle
+++ b/ros2rag.senderstub/build.gradle
@@ -26,6 +26,8 @@ dependencies {
     compile 'com.google.protobuf:protobuf-java:3.0.0'
     compile group: 'org.fusesource.mqtt-client', name: 'mqtt-client', version: '1.15'
 
+    implementation project(':ros2rag.example')
+
     protobuf files("$projectDir/../ros2rag.example/src/main/proto")
 }
 
diff --git a/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSender.java b/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSender.java
deleted file mode 100644
index 53ecd22..0000000
--- a/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSender.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package de.tudresden.inf.st.ros2rag.senderstub;
-
-import org.fusesource.mqtt.client.QoS;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Small helper to publish messages to a MQTT broker.
- *
- * @author rschoene - Initial contribution
- */
-public interface MQTTSender extends AutoCloseable {
-
-  /**
-   * Sets the host running the MQTT broker.
-   * @param host host name (IP address or domain name)
-   * @param port port to use
-   */
-  MQTTSender setHost(String host, int port);
-
-  /**
-   * Set the timeout used for connecting and disconnecting.
-   * @param connectTimeout     Timeout value
-   * @param connectTimeoutUnit Timeout unit
-   */
-  void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit);
-
-  /**
-   * Set the timeout used for publishing messages.
-   * @param publishTimeout     Timeout value
-   * @param publishTimeoutUnit Timeout unit
-   */
-  void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit);
-
-  /**
-   * Publishes a message in a topic at most once.
-   * @param topic   the topic to publish at
-   * @param message the message to publish
-   * @throws Exception if the underlying connection throws an error
-   */
-  default void publish(String topic, byte[] message) throws Exception {
-    this.publish(topic, message, QoS.AT_MOST_ONCE);
-  }
-
-  /**
-   * Publishes a message in a topic with the given quality of service (QoS).
-   * @param topic   the topic to publish at
-   * @param message the message to publish
-   * @param qos     the needed quality of service (at most once, at least once, exactly once)
-   * @throws Exception if the underlying connection throws an error
-   */
-  void publish(String topic, byte[] message, QoS qos) throws Exception;
-
-  /**
-   * Checks, whether the connection to the host (set in the constructor) is established.
-   * @return <code>true</code> if this sender is connected to the host
-   */
-  boolean isConnected();
-
-  @Override
-  void close() throws Exception;
-}
diff --git a/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSenderImpl.java b/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSenderImpl.java
deleted file mode 100644
index f020cac..0000000
--- a/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/MQTTSenderImpl.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package de.tudresden.inf.st.ros2rag.senderstub;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.fusesource.mqtt.client.*;
-
-import java.net.URI;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Implementation of a MQTT sender using <code>org.fusesource.mqtt.client</code>.
- *
- * @author rschoene - Initial contribution
- */
-public class MQTTSenderImpl implements MQTTSender {
-
-  private final Logger logger = LogManager.getLogger(MQTTSenderImpl.class);
-  /** The connection to the MQTT broker. */
-  private FutureConnection connection;
-
-  /** Timeout for connect/disconnect methods */
-  private long connectTimeout;
-  /** Unit of timeout for connect/disconnect methods */
-  private TimeUnit connectTimeoutUnit;
-
-  /** Timeout for publish method */
-  private long publishTimeout;
-  /** Unit of timeout for publish method */
-  private TimeUnit publishTimeoutUnit;
-
-  @Override
-  public MQTTSender setHost(String host, int port) {
-    /* The host running the MQTT broker. */
-    URI hostUri = URI.create("tcp://" + host + ":" + port);
-    logger.debug("Host is {}", hostUri);
-    MQTT mqtt = new MQTT();
-    mqtt.setHost(hostUri);
-    connection = mqtt.futureConnection();
-    setConnectTimeout(2, TimeUnit.SECONDS);
-    setPublishTimeout(1, TimeUnit.SECONDS);
-    ensureConnected();
-    return this;
-  }
-
-  @Override
-  public void setConnectTimeout(long connectTimeout, TimeUnit connectTimeoutUnit) {
-    this.connectTimeout = connectTimeout;
-    this.connectTimeoutUnit = connectTimeoutUnit;
-  }
-
-  @Override
-  public void setPublishTimeout(long publishTimeout, TimeUnit publishTimeoutUnit) {
-    this.publishTimeout = publishTimeout;
-    this.publishTimeoutUnit = publishTimeoutUnit;
-  }
-
-  @Override
-  public void publish(String topic, byte[] message, QoS qos) throws Exception {
-    if (ensureConnected()) {
-      logger.debug("Send: {} -> {}", topic, message);
-      connection.publish(topic, message, qos, false).await(publishTimeout, publishTimeoutUnit);
-    }
-  }
-
-  /**
-   * Ensures an established connection.
-   * If already connected, return immediately. Otherwise try to connect.
-   * @return <code>true</code> if the connected was established successfully, <code>false</code> if there was an error
-   */
-  private boolean ensureConnected() {
-    if (!isConnected()) {
-      try {
-        connection.connect().await(connectTimeout, connectTimeoutUnit);
-      } catch (Exception e) {
-        logger.warn("Could not connect", e);
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public boolean isConnected() {
-    return connection != null && connection.isConnected();
-  }
-
-  @Override
-  public void close() throws Exception {
-    if (connection == null) {
-      logger.warn("Stopping without connection.");
-      return;
-    }
-    if (isConnected()) {
-      connection.disconnect().await(connectTimeout, connectTimeoutUnit);
-    }
-  }
-}
diff --git a/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/Main.java b/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/Main.java
index 5eda5a0..8a3fd0a 100644
--- a/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/Main.java
+++ b/ros2rag.senderstub/src/main/java/de/tudresden/inf/st/ros2rag/senderstub/Main.java
@@ -1,5 +1,6 @@
 package de.tudresden.inf.st.ros2rag.senderstub;
 
+import de.tudresden.inf.st.ros2rag.example.MqttUpdater;
 import panda.Linkstate;
 
 public class Main {
@@ -24,7 +25,7 @@ public class Main {
             .setOrientationW(0)
             .build())
         .build();
-    MQTTSender sender = new MQTTSenderImpl();
+    MqttUpdater sender = new MqttUpdater("sender stub");
     sender.setHost("localhost", 1883);
 //    System.out.println("pls.toByteArray() = " + Arrays.toString(pls.toByteArray()));
     sender.publish(topic, pls.toByteArray());
diff --git a/ros2rag.senderstub/src/main/resources/log4j2.xml b/ros2rag.senderstub/src/main/resources/log4j2.xml
index 9566029..16afeeb 100644
--- a/ros2rag.senderstub/src/main/resources/log4j2.xml
+++ b/ros2rag.senderstub/src/main/resources/log4j2.xml
@@ -6,7 +6,7 @@
         </Console>
     </Appenders>
     <Loggers>
-        <Root level="info">
+        <Root level="debug">
             <AppenderRef ref="Console"/>
         </Root>
     </Loggers>
diff --git a/settings.gradle b/settings.gradle
index 5a1c0e7..22db44e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -3,3 +3,4 @@ rootProject.name = 'ros2rag'
 include 'ros2rag.base'
 include 'ros2rag.example'
 include 'ros2rag.senderstub'
+include 'ros2rag.receiverstub'
-- 
GitLab