Skip to content
Snippets Groups Projects
Commit 3c41cf34 authored by René Schöne's avatar René Schöne
Browse files

WIP: strategies and more

parent 5fd4a475
No related branches found
No related tags found
No related merge requests found
Dockerfile
...@@ -22,4 +22,4 @@ WORKDIR /coordinator ...@@ -22,4 +22,4 @@ WORKDIR /coordinator
RUN ./gradlew --no-daemon installDist RUN ./gradlew --no-daemon installDist
ENV CONFIG_FILE /config.coordinator ENV CONFIG_FILE /config.coordinator
ENTRYPOINT /bin/bash -c "./build/install/coordinator/bin/coordinator $CONFIG_FILE" ENTRYPOINT ["/bin/bash", "./build/install/coordinator/bin/coordinator", "--", "$CONFIG_FILE"]
components { components {
component robotCtrlA, docker compose = "ros_place_a", mqtt topic = "ros-place-a" component robotCtrlA, docker compose = "ros_place_a", mqtt topic = "ros-place-a", status "ready" after 1 sec
component robotCtrlB, docker compose = "ros_place_b", mqtt topic = "ros-place-b" component robotCtrlB, docker compose = "ros_place_b", mqtt topic = "ros-place-b", status "ready" after 1 sec
component ragA, docker compose = "rag_place_a", mqtt topic = "rag-a" component ragA, docker compose = "rag_place_a", mqtt topic = "rag-a"
component ragB, docker compose = "rag_place_b", mqtt topic = "rag-b" component ragB, docker compose = "rag_place_b", mqtt topic = "rag-b"
component random, docker compose = "cgv_random_dummy", status "ready" after 1 sec, start as up component random, docker compose = "cgv_random_dummy", status "ready" after 1 sec, start as up
......
...@@ -46,11 +46,16 @@ Comment = "//" [^\n\r]+ ...@@ -46,11 +46,16 @@ Comment = "//" [^\n\r]+
"components" { return sym(Terminals.COMPONENTS); } "components" { return sym(Terminals.COMPONENTS); }
"component" { return sym(Terminals.COMPONENT); } "component" { return sym(Terminals.COMPONENT); }
"docker compose" { return sym(Terminals.DOCKER_COMPOSE); } "docker compose" { return sym(Terminals.DOCKER_COMPOSE); }
"mqtt topic" { return sym(Terminals.MQTT_TOPIC); } "script" { return sym(Terminals.SCRIPT); }
"report" { return sym(Terminals.REPORT); }
"mqtt" { return sym(Terminals.MQTT); }
"status" { return sym(Terminals.STATUS); } "status" { return sym(Terminals.STATUS); }
"after" { return sym(Terminals.AFTER); } "after" { return sym(Terminals.AFTER); }
"sec" { return sym(Terminals.SEC); } "sec" { return sym(Terminals.SEC); }
"start as up" { return sym(Terminals.START_AS_UP); } "in" { return sym(Terminals.IN); }
"start" { return sym(Terminals.START); }
"using" { return sym(Terminals.USING); }
"after reqs" { return sym(Terminals.AFTER_REQS); }
"=" { return sym(Terminals.EQUALS); } "=" { return sym(Terminals.EQUALS); }
"," { return sym(Terminals.COMMA); } "," { return sym(Terminals.COMMA); }
......
...@@ -51,30 +51,91 @@ aspect Manipulation { ...@@ -51,30 +51,91 @@ aspect Manipulation {
for (String service : services) { for (String service : services) {
resolveComponentByDockerComposeName(service).ifPresentOrElse( resolveComponentByDockerComposeName(service).ifPresentOrElse(
comp -> result.add(comp), comp -> result.add(comp),
() -> System.err.println("Could not resolve component for '" + service + "'!") () -> System.err.println("Could not resolve docker component for '" + service + "'!")
); );
} }
return result; return result;
} }
public boolean Component.callDockerCompose() throws IOException, InterruptedException { //--- start ---
String[] args = { "docker-compose", "up", "-d", getDockerComposeName() }; public abstract boolean StartStrategy.start() throws IOException, InterruptedException;
@Override
public boolean DockerComposeStrategy.start() throws IOException, InterruptedException {
if (!getName().isBlank()) {
String[] args = { "docker-compose", "up", "-d", getName() };
if (coordinator().DRY_RUN) { if (coordinator().DRY_RUN) {
System.out.println("Would start > " + java.util.Arrays.toString(args)); System.out.println("Would start > " + java.util.Arrays.toString(args));
return true; return true;
} }
System.out.println("Starting " + Arrays.toString(args));
ProcessBuilder builder = new ProcessBuilder(args); ProcessBuilder builder = new ProcessBuilder(args);
Process process = builder.start(); Process process = builder.start();
process.waitFor(); process.waitFor();
try (java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(process.getInputStream()))) {
System.out.println("Out of " + java.util.Arrays.toString(args) + ":\n" + reader.lines().collect(java.util.stream.Collectors.joining("\n")));
} catch (IOException e) {
e.printStackTrace();
}
try (java.io.BufferedReader reader = new java.io.BufferedReader(new java.io.InputStreamReader(process.getErrorStream()))) {
System.out.println("Err of " + java.util.Arrays.toString(args) + ":\n" + reader.lines().collect(java.util.stream.Collectors.joining("\n")));
} catch (IOException e) {
e.printStackTrace();
}
return process.exitValue() == 0; return process.exitValue() == 0;
} }
return false;
}
@Override
public boolean ScriptStrategy.start() throws IOException, InterruptedException {
String[] command = getCommand().split(" ");
if (coordinator().DRY_RUN) {
System.out.println("Would start script " + Arrays.toString(command));
return true;
}
System.out.println("Starting script " + Arrays.toString(command));
ProcessBuilder builder = new ProcessBuilder(command);
if (!getCwd().isBlank()) {
builder.directory(new java.io.File(getCwd()));
}
Process process = builder.start();
// Do not wait for process to be finished
return true;
}
@Override
public boolean ManualStartStrategy.start() throws IOException, InterruptedException {
// do nothing
System.out.println("Component " + containingComponent().getName() + " has to be started");
return true;
}
} }
aspect Navigation { aspect Navigation {
inh Coordinator Component.coordinator(); inh Coordinator Component.coordinator();
inh Coordinator ParsedPrecedenceRelation.coordinator(); inh Coordinator ParsedPrecedenceRelation.coordinator();
eq Coordinator.getChild().coordinator() = this; eq Coordinator.getChild().coordinator() = this;
inh Component StartStrategy.containingComponent();
eq Component.getStartStrategy().containingComponent() = this;
syn boolean StartStrategy.isDockerComposeStartStrategy() = false;
eq DockerComposeStartStrategy.isDockerComposeStartStrategy() = true;
syn boolean StartStrategy.asDockerComposeStartStrategy() = null;
eq DockerComposeStartStrategy.asDockerComposeStartStrategy() = this;
syn boolean ReportStrategy.isMqttReportStrategy() = false;
eq MqttReportStrategy.isMqttReportStrategy() = true;
syn boolean ReportStrategy.asMqttReportStrategy() = null;
eq MqttReportStrategy.asMqttReportStrategy() = this;
} }
aspect Printing { aspect Printing {
...@@ -117,6 +178,7 @@ aspect Printing { ...@@ -117,6 +178,7 @@ aspect Printing {
return "Component " + getName() + " is " + status; return "Component " + getName() + " is " + status;
} }
//--- details ---
syn String Coordinator.details() { syn String Coordinator.details() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
getComponentList().forEach(comp -> sb.append(comp.details()).append("\n")); getComponentList().forEach(comp -> sb.append(comp.details()).append("\n"));
...@@ -128,8 +190,8 @@ aspect Printing { ...@@ -128,8 +190,8 @@ aspect Printing {
sj.add(pred.getName()); sj.add(pred.getName());
} }
return "<Name: " + getName() + return "<Name: " + getName() +
", DockerComposeName: " + getDockerComposeName() + ", StartStrategy: " + getStartStrategy().details() +
", MqttTopicPrefix: " + getMqttTopicPrefix() + ", ReportStrategy: " + getReportStrategy().details() +
", Status: " + getStatus() + ", Status: " + getStatus() +
", NextCommand: " + getNextCommand() + ", NextCommand: " + getNextCommand() +
", Deps: " + sj.toString() + ", Deps: " + sj.toString() +
...@@ -140,6 +202,12 @@ aspect Printing { ...@@ -140,6 +202,12 @@ aspect Printing {
syn String AutoSetStatus.details() { syn String AutoSetStatus.details() {
return "[" + getStatus() + " after " + getDelayInSeconds() + " sec]"; return "[" + getStatus() + " after " + getDelayInSeconds() + " sec]";
} }
syn String StartStrategy.details();
eq DockerComposeStrategy.details() = "docker compose (name: " + getName() + ")";
eq ScriptStrategy.details() = "script (cmd: '" + getCommand() + "'" + (getCwd().isBlank() ? "" : "cwd: '" + getCwd() + "'") + ")";
eq ManualStartStrategy.details() = "manual";
syn String ReportStrategy.details();
eq MqttReportStrategy.details() = "mqtt (topic: " + getTopicPrefix() + ")";
syn String ParsedPrecedenceRelation.prettyPrint() { syn String ParsedPrecedenceRelation.prettyPrint() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
......
...@@ -44,11 +44,21 @@ Component component = ...@@ -44,11 +44,21 @@ Component component =
; ;
Component component_body = Component component_body =
COMMA DOCKER_COMPOSE EQUALS TEXT.dc component_body.c {: c.setDockerComposeName(dc); return c; :} COMMA START USING start_strategy.s component_body.c {: c.setStartStrategy(s); return c; :}
| COMMA MQTT_TOPIC EQUALS TEXT.mqtt component_body.c {: c.setMqttTopicPrefix(mqtt); return c; :} | COMMA REPORT USING report_strategy.r component_body.c {: c.setReportStrategy(r); return c; :}
| COMMA STATUS TEXT.s AFTER INTEGER.i SEC component_body.c {: c.setAutoSetStatus(new AutoSetStatus(s, Integer.parseInt(i))); return c; :} | COMMA STATUS TEXT.s AFTER INTEGER.i SEC component_body.c {: c.setAutoSetStatus(new AutoSetStatus(s, Integer.parseInt(i))); return c; :}
| COMMA START_AS_UP component_body.c {: c.setStartAsUp(true); return c; :} | COMMA START AFTER_REQS component_body.c {: c.setStartAsUp(true); return c; :}
| {: return new Component(); :} | {: Component c = new Component(); c.setStartStrategy(new ManualStartStrategy()); return c; :}
;
StartStrategy start_strategy =
DOCKER_COMPOSE TEXT.dc component_body.c {: return new DockerComposeStrategy().setName(dc); :}
| SCRIPT TEXT.cmd {: return new ScriptStrategy().setCommand(cmd); :}
| SCRIPT TEXT.cmd IN TEXT.cwd {: return new ScriptStrategy().setCommand(cmd).setCwd(cwd); :}
;
ReportStrategy report_strategy =
MQTT TEXT.topic component_body.c {: return new ReportStrategy().setMqttTopicPrefix(topic); return c; :}
; ;
StringList string_list = StringList string_list =
......
Coordinator ::= Component* ParsedPrecedenceRelation* ; // /NextComponentToStart:Component/ ; Coordinator ::= Component* ParsedPrecedenceRelation* ; // /NextComponentToStart:Component/ ;
Component ::= <Name:String> <DockerComposeName:String> <MqttTopicPrefix:String> <Status:String> [AutoSetStatus] <StartAsUp:boolean> /<NextCommand:String>/ ; Component ::= <Name:String> [StartStrategy] [ReportStrategy] <Status:String> [AutoSetStatus] <StartAsUp:boolean> /<NextCommand:String>/ ;
rel Component.Predecessor* <-> Component.Successor* ; rel Component.Predecessor* <-> Component.Successor* ;
abstract StartStrategy ;
DockerComposeStartStrategy : StartStrategy ::= <Name:String> ;
ScriptStartStrategy : StartStrategy ::= <Command> <Cwd> ;
ManualStartStrategy : StartStrategy ;
abstract ReportStrategy ;
MqttReportStrategy : ReportStrategy ::= <TopicPrefix:String> ;
AutoSetStatus ::= <Status:String> <DelayInSeconds:int> ; AutoSetStatus ::= <Status:String> <DelayInSeconds:int> ;
ParsedPrecedenceRelation ; ParsedPrecedenceRelation ;
......
...@@ -85,8 +85,11 @@ public class MainCoordinator implements Callable<Integer> { ...@@ -85,8 +85,11 @@ public class MainCoordinator implements Callable<Integer> {
Runtime.getRuntime().addShutdownHook(new Thread(this::close)); Runtime.getRuntime().addShutdownHook(new Thread(this::close));
for (Component comp : coordinator.getComponentList()) { for (Component comp : coordinator.getComponentList()) {
if (comp.getMqttTopicPrefix() != null && !comp.getMqttTopicPrefix().isBlank()) { if (comp.hasReportStrategy() && comp.getReportStrategy().isMqttReportStrategy()) {
comp.connectStatus("mqtt://" + mqttHost + "/" + comp.getMqttTopicPrefix() + "/status"); MqttReportStrategy mqttStrategy = comp.getReportStrategy().asMqttReportStrategy();
if (mqttStrategy.getTopicPrefix() != null && !mqttStrategy.getTopicPrefix().isBlank()) {
comp.connectStatus("mqtt://" + mqttHost + "/" + mqttStrategy.getTopicPrefix() + "/status");
}
} }
final String commandTopic; final String commandTopic;
if (comp.getStartAsUp()) { if (comp.getStartAsUp()) {
...@@ -94,26 +97,35 @@ public class MainCoordinator implements Callable<Integer> { ...@@ -94,26 +97,35 @@ public class MainCoordinator implements Callable<Integer> {
commandTopic = "coordinator/" + comp.getName(); commandTopic = "coordinator/" + comp.getName();
mainHandler.newConnection(commandTopic, bytes -> { mainHandler.newConnection(commandTopic, bytes -> {
try { try {
comp.callDockerCompose(); comp.getStartStrategy().start();
comp.setStatus("ready"); comp.setStatus("ready");
} catch (IOException | InterruptedException e) { } catch (IOException | InterruptedException e) {
comp.setStatus("failedStart"); comp.setStatus("failedStart");
e.printStackTrace(); e.printStackTrace();
} }
}); });
} else if (comp.hasReportStrategy() && comp.getReportStrategy().isMqttReportStrategy()) {
commandTopic = comp.getReportStrategy().asMqttReportStrategy().getTopicPrefix() + "/command";
} else { } else {
commandTopic = comp.getMqttTopicPrefix() + "/command"; commandTopic = null;
} }
if (commandTopic != null) {
comp.connectNextCommand("mqtt://" + mqttHost + "/" + commandTopic, false); comp.connectNextCommand("mqtt://" + mqttHost + "/" + commandTopic, false);
} }
}
Set<Component> alreadyRunning = coordinator.getRunningComponents(); Set<Component> alreadyRunning = coordinator.getRunningComponents();
for (Component comp : coordinator.getComponentList()) { for (Component comp : coordinator.getComponentList()) {
if (alreadyRunning.contains(comp)) {
comp.setStatus("up");
if (comp.hasAutoSetStatus() && !comp.getStartAsUp()) {
schduleAutoSetStatus(comp);
}
}
if (!alreadyRunning.contains(comp) && !comp.getStartAsUp()) { if (!alreadyRunning.contains(comp) && !comp.getStartAsUp()) {
comp.callDockerCompose(); comp.getStartStrategy().start();
if (comp.hasAutoSetStatus()) { if (comp.hasAutoSetStatus()) {
executor.schedule(() -> comp.setStatus(comp.getAutoSetStatus().getStatus()), schduleAutoSetStatus(comp);
comp.getAutoSetStatus().getDelayInSeconds(), TimeUnit.SECONDS);
} }
} }
} }
...@@ -122,6 +134,13 @@ public class MainCoordinator implements Callable<Integer> { ...@@ -122,6 +134,13 @@ public class MainCoordinator implements Callable<Integer> {
return 0; return 0;
} }
private void schduleAutoSetStatus(Component comp) {
executor.schedule(() -> {
System.out.println("Setting status of " + comp.getName() + " to " + comp.getAutoSetStatus().getStatus());
comp.setStatus(comp.getAutoSetStatus().getStatus());
}, comp.getAutoSetStatus().getDelayInSeconds(), TimeUnit.SECONDS);
}
private void printStatus(String message) { private void printStatus(String message) {
String content = message.contains("detail") ? coordinator.details() : coordinator.prettyPrint(); String content = message.contains("detail") ? coordinator.details() : coordinator.prettyPrint();
System.out.println(content); System.out.println(content);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment