Skip to content
Snippets Groups Projects
Commit f1436394 authored by René Schöne's avatar René Schöne
Browse files

use coordinator in place-impls

- add dry-run option
- check for running services
parent 89f1b192
No related branches found
No related tags found
1 merge request!1Multiple scenes, multiple robots and more
Pipeline #9851 failed
......@@ -14,10 +14,15 @@ public class Configuration {
public String mqttHost;
public String filenameInitialScene;
public boolean useReachability;
public String coordinatorMqttTopicPrefix;
public List<ReachabilityConfig> reachability;
public static class ReachabilityConfig {
public String idRobot;
public String filename;
}
public boolean useCoordinator() {
return coordinatorMqttTopicPrefix != null;
}
}
......@@ -28,11 +28,15 @@ public abstract class SharedMainParts<MqttHandler extends SharedMainParts.MqttHa
private final String TOPIC_EXIT;
private final String TOPIC_SCENE_INIT;
private final String TOPIC_SUFFIX_COORDINATOR_STATUS = "status";
private final String TOPIC_SUFFIX_COORDINATOR_COMMAND = "command";
protected MqttHandler mainHandler;
protected WorldModel model;
protected Configuration config;
protected final String cellName;
protected final Path pathToConfig;
private CountDownLatch startCondition;
public SharedMainParts(String cellName, Path pathToConfig) {
this.cellName = cellName;
......@@ -64,6 +68,7 @@ public abstract class SharedMainParts<MqttHandler extends SharedMainParts.MqttHa
mainHandler.setHost(config.mqttHost);
mainHandler.waitUntilReady(2, TimeUnit.SECONDS);
CountDownLatch exitCondition = new CountDownLatch(1);
startCondition = new CountDownLatch(1);
mainHandler.newConnection(TOPIC_EXIT, bytes -> exitCondition.countDown());
mainHandler.newConnection(TOPIC_MODEL, bytes -> logStatus(new String(bytes)));
mainHandler.newConnection(TOPIC_REWIND, bytes ->
......@@ -75,6 +80,10 @@ public abstract class SharedMainParts<MqttHandler extends SharedMainParts.MqttHa
}
}
);
if (config.useCoordinator()) {
mainHandler.newConnection(joinTopics(config.coordinatorMqttTopicPrefix, TOPIC_SUFFIX_COORDINATOR_COMMAND),
bytes -> reactToCoordinatorCommand(new String(bytes)));
}
createSpecificMainHandlerConnections();
......@@ -87,6 +96,41 @@ public abstract class SharedMainParts<MqttHandler extends SharedMainParts.MqttHa
exitCondition.await();
}
private String joinTopics(String... topics) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < topics.length; i++) {
String topic = topics[i];
if (i > 0 && topic.startsWith("/")) {
topic = topic.substring(1);
}
if (i < topics.length - 1 && topic.endsWith("/")) {
topic = topic.substring(0, topic.length() - 1);
}
if (i > 0) {
sb.append("/");
}
sb.append(topic);
}
return sb.toString();
}
private void reactToCoordinatorCommand(String command) {
switch (command) {
case "rewind":
try {
rewind(command);
} catch (Exception e) {
e.printStackTrace();
}
break;
case "start":
startCondition.countDown();
break;
default:
System.err.println("Unknown command: " + command);
}
}
protected abstract void createSpecificMainHandlerConnections();
private void rewind(String statusMessage) throws Exception {
......@@ -101,11 +145,21 @@ public abstract class SharedMainParts<MqttHandler extends SharedMainParts.MqttHa
model.ragconnectCheckIncremental();
model.ragconnectSetupMqttWaitUntilReady(2, TimeUnit.SECONDS);
if (config.useCoordinator()) {
mainHandler.publish(joinTopics(config.coordinatorMqttTopicPrefix, TOPIC_SUFFIX_COORDINATOR_STATUS),
"up".getBytes(StandardCharsets.UTF_8));
}
startCondition.await();
connectEndpoints();
logStatus(statusMessage);
mainHandler.publish(TOPIC_SCENE_INIT, scene.toByteArray());
if (config.useCoordinator()) {
mainHandler.publish(joinTopics(config.coordinatorMqttTopicPrefix, TOPIC_SUFFIX_COORDINATOR_STATUS),
"ready".getBytes(StandardCharsets.UTF_8));
}
}
protected abstract Scene readSceneAndRobots() throws Exception;
......
......@@ -18,18 +18,53 @@ aspect Computation {
System.out.println(getName() + " ready to be started");
return "start";
}
}
aspect Manipulation {
public static boolean Coordinator.DRY_RUN = false;
public Set<Component> Coordinator.getRunningComponents() throws IOException, InterruptedException {
String[] args = { "docker-compose", "ps", "--services", "--filter", "status=running" };
if (DRY_RUN) {
System.out.println("Would start > " + java.util.Arrays.toString(args));
return Collections.emptySet();
}
ProcessBuilder builder = new ProcessBuilder(args);
Process process = builder.start();
process.waitFor();
if (process.exitValue() != 0) {
System.err.println("Could not list services.");
return Collections.emptySet();
}
Set<Component> result = new HashSet<>();
List<String> services;
try (java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(process.getInputStream()))) {
services = reader.lines().collect(java.util.stream.Collectors.toList());
} catch (IOException e) {
e.printStackTrace();
return Collections.emptySet();
}
for (String service : services) {
resolveComponentByDockerComposeName(service).ifPresentOrElse(
comp -> result.add(comp),
() -> System.err.println("Could not resolve component for '" + service + "'!")
);
}
return result;
}
public void Component.callDockerCompose() {
public boolean Component.callDockerCompose() throws IOException, InterruptedException {
String[] args = { "docker-compose", "up", "-d", getDockerComposeName() };
if (coordinator().DRY_RUN) {
System.out.println("Would start > " + java.util.Arrays.toString(args));
return true;
}
ProcessBuilder builder = new ProcessBuilder(args);
// builder.redirectOutput(err);
// builder.redirectError(err);
// Process process = builder.start();
// process.waitFor();
// return process.exitValue();
return;
Process process = builder.start();
process.waitFor();
return process.exitValue() == 0;
}
}
......@@ -73,6 +108,15 @@ aspect Resolving {
return Optional.empty();
}
syn Optional<Component> Coordinator.resolveComponentByDockerComposeName(String dockerComposeName) {
for (Component comp : getComponentList()) {
if (comp.getDockerComposeName().equals(dockerComposeName)) {
return Optional.of(comp);
}
}
return Optional.empty();
}
refine RefResolverStubs eq ParsedPrecedenceRelation.resolvePredecessorByToken(String id, int position) {
return coordinator().resolveComponent(id).orElseThrow(() -> new RuntimeException("Predecessor '" + id + "' not found in " + this.prettyPrint() + "!"));
}
......
......@@ -12,6 +12,8 @@ import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
......@@ -35,9 +37,13 @@ public class MainCoordinator {
MainCoordinator main = new MainCoordinator();
// main.manuallyBuild();
main.parsedBuild();
if (Arrays.asList(args).contains("--dry-run")) {
Coordinator.DRY_RUN = true;
}
main.start();
}
@SuppressWarnings("unused")
private void manuallyBuild() {
coordinator = new Coordinator();
Component robotCtrlA = newComponent("Robot Control A", "ros_place_a", "ros-place-a");
......@@ -111,9 +117,12 @@ public class MainCoordinator {
comp.connectNextCommand("mqtt://" + mqttHost + "/" + comp.getMqttTopicPrefix() + "/command", false);
}
Set<Component> alreadyRunning = coordinator.getRunningComponents();
for (Component comp : coordinator.getComponentList()) {
if (!alreadyRunning.contains(comp)) {
comp.callDockerCompose();
}
}
mainHandler = new MqttHandler().dontSendWelcomeMessage();
mainHandler.setHost(mqttHost);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment