From f1436394535836a8ac74d61469b106c949905dce Mon Sep 17 00:00:00 2001
From: rschoene <rene.schoene@tu-dresden.de>
Date: Wed, 26 May 2021 15:06:57 +0200
Subject: [PATCH] use coordinator in place-impls

- add dry-run option
- check for running services
---
 .../inf/st/ros3rag/common/Configuration.java  |  5 ++
 .../st/ros3rag/common/SharedMainParts.java    | 54 +++++++++++++++++
 .../src/main/jastadd/Coordinator.jrag         | 60 ++++++++++++++++---
 .../inf/st/coordinator/MainCoordinator.java   | 11 +++-
 4 files changed, 121 insertions(+), 9 deletions(-)

diff --git a/ros3rag.common/src/main/java/de/tudresden/inf/st/ros3rag/common/Configuration.java b/ros3rag.common/src/main/java/de/tudresden/inf/st/ros3rag/common/Configuration.java
index 4323482..962e31a 100644
--- a/ros3rag.common/src/main/java/de/tudresden/inf/st/ros3rag/common/Configuration.java
+++ b/ros3rag.common/src/main/java/de/tudresden/inf/st/ros3rag/common/Configuration.java
@@ -14,10 +14,15 @@ public class Configuration {
   public String mqttHost;
   public String filenameInitialScene;
   public boolean useReachability;
+  public String coordinatorMqttTopicPrefix;
   public List<ReachabilityConfig> reachability;
 
   public static class ReachabilityConfig {
     public String idRobot;
     public String filename;
   }
+
+  public boolean useCoordinator() {
+    return coordinatorMqttTopicPrefix != null;
+  }
 }
diff --git a/ros3rag.common/src/main/java/de/tudresden/inf/st/ros3rag/common/SharedMainParts.java b/ros3rag.common/src/main/java/de/tudresden/inf/st/ros3rag/common/SharedMainParts.java
index 0f3a53e..b51ab93 100644
--- a/ros3rag.common/src/main/java/de/tudresden/inf/st/ros3rag/common/SharedMainParts.java
+++ b/ros3rag.common/src/main/java/de/tudresden/inf/st/ros3rag/common/SharedMainParts.java
@@ -28,11 +28,15 @@ public abstract class SharedMainParts<MqttHandler extends SharedMainParts.MqttHa
   private final String TOPIC_EXIT;
   private final String TOPIC_SCENE_INIT;
 
+  private final String TOPIC_SUFFIX_COORDINATOR_STATUS = "status";
+  private final String TOPIC_SUFFIX_COORDINATOR_COMMAND = "command";
+
   protected MqttHandler mainHandler;
   protected WorldModel model;
   protected Configuration config;
   protected final String cellName;
   protected final Path pathToConfig;
+  private CountDownLatch startCondition;
 
   public SharedMainParts(String cellName, Path pathToConfig) {
     this.cellName = cellName;
@@ -64,6 +68,7 @@ public abstract class SharedMainParts<MqttHandler extends SharedMainParts.MqttHa
     mainHandler.setHost(config.mqttHost);
     mainHandler.waitUntilReady(2, TimeUnit.SECONDS);
     CountDownLatch exitCondition = new CountDownLatch(1);
+    startCondition = new CountDownLatch(1);
     mainHandler.newConnection(TOPIC_EXIT, bytes -> exitCondition.countDown());
     mainHandler.newConnection(TOPIC_MODEL, bytes -> logStatus(new String(bytes)));
     mainHandler.newConnection(TOPIC_REWIND, bytes ->
@@ -75,6 +80,10 @@ public abstract class SharedMainParts<MqttHandler extends SharedMainParts.MqttHa
           }
         }
     );
+    if (config.useCoordinator()) {
+      mainHandler.newConnection(joinTopics(config.coordinatorMqttTopicPrefix, TOPIC_SUFFIX_COORDINATOR_COMMAND),
+          bytes -> reactToCoordinatorCommand(new String(bytes)));
+    }
 
     createSpecificMainHandlerConnections();
 
@@ -87,6 +96,41 @@ public abstract class SharedMainParts<MqttHandler extends SharedMainParts.MqttHa
     exitCondition.await();
   }
 
+  private String joinTopics(String... topics) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < topics.length; i++) {
+      String topic = topics[i];
+      if (i > 0 && topic.startsWith("/")) {
+        topic = topic.substring(1);
+      }
+      if (i < topics.length - 1 && topic.endsWith("/")) {
+        topic = topic.substring(0, topic.length() - 1);
+      }
+      if (i > 0) {
+        sb.append("/");
+      }
+      sb.append(topic);
+    }
+    return sb.toString();
+  }
+
+  private void reactToCoordinatorCommand(String command) {
+    switch (command) {
+      case "rewind":
+        try {
+          rewind(command);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+        break;
+      case "start":
+        startCondition.countDown();
+        break;
+      default:
+        System.err.println("Unknown command: " + command);
+    }
+  }
+
   protected abstract void createSpecificMainHandlerConnections();
 
   private void rewind(String statusMessage) throws Exception {
@@ -101,11 +145,21 @@ public abstract class SharedMainParts<MqttHandler extends SharedMainParts.MqttHa
     model.ragconnectCheckIncremental();
     model.ragconnectSetupMqttWaitUntilReady(2, TimeUnit.SECONDS);
 
+    if (config.useCoordinator()) {
+      mainHandler.publish(joinTopics(config.coordinatorMqttTopicPrefix, TOPIC_SUFFIX_COORDINATOR_STATUS),
+          "up".getBytes(StandardCharsets.UTF_8));
+    }
+    startCondition.await();
+
     connectEndpoints();
 
     logStatus(statusMessage);
 
     mainHandler.publish(TOPIC_SCENE_INIT, scene.toByteArray());
+    if (config.useCoordinator()) {
+      mainHandler.publish(joinTopics(config.coordinatorMqttTopicPrefix, TOPIC_SUFFIX_COORDINATOR_STATUS),
+          "ready".getBytes(StandardCharsets.UTF_8));
+    }
   }
 
   protected abstract Scene readSceneAndRobots() throws Exception;
diff --git a/ros3rag.coordinator/src/main/jastadd/Coordinator.jrag b/ros3rag.coordinator/src/main/jastadd/Coordinator.jrag
index 2dede36..f45edb0 100644
--- a/ros3rag.coordinator/src/main/jastadd/Coordinator.jrag
+++ b/ros3rag.coordinator/src/main/jastadd/Coordinator.jrag
@@ -18,18 +18,53 @@ aspect Computation {
     System.out.println(getName() + " ready to be started");
     return "start";
   }
+}
+
+aspect Manipulation {
+  public static boolean Coordinator.DRY_RUN = false;
+
+  public Set<Component> Coordinator.getRunningComponents() throws IOException, InterruptedException {
+    String[] args = { "docker-compose", "ps", "--services", "--filter", "status=running" };
+    if (DRY_RUN) {
+      System.out.println("Would start > " + java.util.Arrays.toString(args));
+      return Collections.emptySet();
+    }
+    ProcessBuilder builder = new ProcessBuilder(args);
+
+    Process process = builder.start();
+    process.waitFor();
+    if (process.exitValue() != 0) {
+      System.err.println("Could not list services.");
+      return Collections.emptySet();
+    }
+    Set<Component> result = new HashSet<>();
+    List<String> services;
+    try (java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(process.getInputStream()))) {
+      services = reader.lines().collect(java.util.stream.Collectors.toList());
+    } catch (IOException e) {
+      e.printStackTrace();
+      return Collections.emptySet();
+    }
+    for (String service : services) {
+      resolveComponentByDockerComposeName(service).ifPresentOrElse(
+        comp -> result.add(comp),
+        () -> System.err.println("Could not resolve component for '" + service + "'!")
+      );
+    }
+    return result;
+  }
 
-  public void Component.callDockerCompose() {
+  public boolean Component.callDockerCompose() throws IOException, InterruptedException {
     String[] args = { "docker-compose", "up", "-d", getDockerComposeName() };
-    System.out.println("Would start > " + java.util.Arrays.toString(args));
+    if (coordinator().DRY_RUN) {
+      System.out.println("Would start > " + java.util.Arrays.toString(args));
+      return true;
+    }
     ProcessBuilder builder = new ProcessBuilder(args);
-//    builder.redirectOutput(err);
-//    builder.redirectError(err);
 
-//    Process process = builder.start();
-//    process.waitFor();
-//    return process.exitValue();
-    return;
+    Process process = builder.start();
+    process.waitFor();
+    return process.exitValue() == 0;
   }
 }
 
@@ -73,6 +108,15 @@ aspect Resolving {
     return Optional.empty();
   }
 
+  syn Optional<Component> Coordinator.resolveComponentByDockerComposeName(String dockerComposeName) {
+    for (Component comp : getComponentList()) {
+      if (comp.getDockerComposeName().equals(dockerComposeName)) {
+        return Optional.of(comp);
+      }
+    }
+    return Optional.empty();
+  }
+
   refine RefResolverStubs eq ParsedPrecedenceRelation.resolvePredecessorByToken(String id, int position) {
     return coordinator().resolveComponent(id).orElseThrow(() -> new RuntimeException("Predecessor '" + id + "' not found in " + this.prettyPrint() + "!"));
   }
diff --git a/ros3rag.coordinator/src/main/java/de/tudresden/inf/st/coordinator/MainCoordinator.java b/ros3rag.coordinator/src/main/java/de/tudresden/inf/st/coordinator/MainCoordinator.java
index e184767..51d58a1 100644
--- a/ros3rag.coordinator/src/main/java/de/tudresden/inf/st/coordinator/MainCoordinator.java
+++ b/ros3rag.coordinator/src/main/java/de/tudresden/inf/st/coordinator/MainCoordinator.java
@@ -12,6 +12,8 @@ import java.io.Reader;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -35,9 +37,13 @@ public class MainCoordinator {
     MainCoordinator main = new MainCoordinator();
 //    main.manuallyBuild();
     main.parsedBuild();
+    if (Arrays.asList(args).contains("--dry-run")) {
+      Coordinator.DRY_RUN = true;
+    }
     main.start();
   }
 
+  @SuppressWarnings("unused")
   private void manuallyBuild() {
     coordinator = new Coordinator();
     Component robotCtrlA = newComponent("Robot Control A", "ros_place_a", "ros-place-a");
@@ -111,8 +117,11 @@ public class MainCoordinator {
       comp.connectNextCommand("mqtt://" + mqttHost + "/" + comp.getMqttTopicPrefix() + "/command", false);
     }
 
+    Set<Component> alreadyRunning = coordinator.getRunningComponents();
     for (Component comp : coordinator.getComponentList()) {
-      comp.callDockerCompose();
+      if (!alreadyRunning.contains(comp)) {
+        comp.callDockerCompose();
+      }
     }
 
     mainHandler = new MqttHandler().dontSendWelcomeMessage();
-- 
GitLab