Skip to content
Snippets Groups Projects
Commit 1c945a6e authored by Sebastian Ebert's avatar Sebastian Ebert
Browse files

initial merge state

parents d740a665 d79785b9
Branches
No related tags found
1 merge request!11Merge/dev to fork master merge
Showing
with 1157 additions and 452 deletions
MRagConnect ::= ReceiveDefinition:MReceiveDefinition* SendDefinition:MSendDefinition* MappingDefinition:MMappingDefinition* DependencyDefinition:MDependencyDefinition* RootTypeComponent:MTypeComponent* TokenComponent:MTokenComponent* Handler:MHandler*;
MRagConnect ::= TokenReceiveDefinition:MTokenReceiveDefinition* TokenSendDefinition:MTokenSendDefinition* TypeReceiveDefinition:MTypeReceiveDefinition* TypeSendDefinition:MTypeSendDefinition* MappingDefinition:MMappingDefinition* DependencyDefinition:MDependencyDefinition* RootTypeComponent:MTypeComponent* TokenComponent:MTokenComponent* Handler:MHandler*;
abstract MEndpointDefinition ::= InnerMappingDefinition:MInnerMappingDefinition*;
MReceiveDefinition : MEndpointDefinition;
MSendDefinition : MEndpointDefinition;
abstract MTokenEndpointDefinition : MEndpointDefinition;
MTokenReceiveDefinition : MTokenEndpointDefinition;
MTokenSendDefinition : MTokenEndpointDefinition;
abstract MTypeEndpointDefinition : MEndpointDefinition;
MTypeReceiveDefinition : MTypeEndpointDefinition;
MTypeSendDefinition : MTypeEndpointDefinition;
MMappingDefinition;
MInnerMappingDefinition;
MDependencyDefinition;
......@@ -11,8 +17,10 @@ MHandler ::= <ClassName> <Construction> <AttributeName> <FieldName> <InUse:boole
rel MRagConnect.RagConnect -> RagConnect;
rel MInnerMappingDefinition.MMappingDefinition -> MMappingDefinition;
rel MReceiveDefinition.ReceiveTokenEndpointDefinition -> ReceiveTokenEndpointDefinition;
rel MSendDefinition.SendTokenEndpointDefinition -> SendTokenEndpointDefinition;
rel MTokenReceiveDefinition.ReceiveTokenEndpointDefinition -> ReceiveTokenEndpointDefinition;
rel MTokenSendDefinition.SendTokenEndpointDefinition -> SendTokenEndpointDefinition;
rel MTypeReceiveDefinition.ReceiveTypeEndpointDefinition -> ReceiveTypeEndpointDefinition;
rel MTypeSendDefinition.SendTypeEndpointDefinition -> SendTypeEndpointDefinition;
rel MMappingDefinition.MappingDefinition -> MappingDefinition;
rel MDependencyDefinition.DependencyDefinition -> DependencyDefinition;
rel MTypeComponent.TypeComponent -> TypeComponent;
......
......@@ -31,19 +31,33 @@ aspect MustacheNodesToYAML {
root.put("restHandlerField", restHandlerField());
root.put("restHandlerAttribute", restHandlerAttribute());
// ReceiveDefinitions
// TokenReceiveDefinitions
ListElement receiveDefinitions = new ListElement();
for (MReceiveDefinition def : getReceiveDefinitionList()) {
for (MTokenReceiveDefinition def : getTokenReceiveDefinitionList()) {
receiveDefinitions.addElement(def.toYAML());
}
root.put("ReceiveDefinitions", receiveDefinitions);
root.put("TokenReceiveDefinitions", receiveDefinitions);
// SendDefinitions
// TokenSendDefinitions
ListElement sendDefinitions = new ListElement();
for (MSendDefinition def : getSendDefinitionList()) {
for (MTokenSendDefinition def : getTokenSendDefinitionList()) {
sendDefinitions.addElement(def.toYAML());
}
root.put("SendDefinitions", sendDefinitions);
root.put("TokenSendDefinitions", sendDefinitions);
// TypeReceiveDefinitions
ListElement typeReceiveDefinitions = new ListElement();
for (MTypeReceiveDefinition def : getTypeReceiveDefinitionList()) {
typeReceiveDefinitions.addElement(def.toYAML());
}
root.put("TypeReceiveDefinitions", typeReceiveDefinitions);
// TypeSendDefinitions
ListElement typeSendDefinitions = new ListElement();
for (MTypeSendDefinition def : getTypeSendDefinitionList()) {
typeSendDefinitions.addElement(def.toYAML());
}
root.put("TypeSendDefinitions", typeSendDefinitions);
// MappingDefinitions
ListElement mappingDefinitions = new ListElement();
......@@ -94,14 +108,33 @@ aspect MustacheNodesToYAML {
return result;
}
syn MappingElement MReceiveDefinition.toYAML() {
syn MappingElement MTokenReceiveDefinition.toYAML() {
MappingElement result = super.toYAML();
result.put("loggingEnabledForReads", loggingEnabledForReads);
return result;
}
syn MappingElement MTokenSendDefinition.toYAML() {
MappingElement result = super.toYAML();
result.put("sender", sender());
result.put("lastValue", lastValue());
result.put("loggingEnabledForWrites", loggingEnabledForWrites);
result.put("updateMethod", updateMethod());
result.put("writeMethod", writeMethod());
result.put("tokenResetMethod", tokenResetMethod());
return result;
}
syn MappingElement MTypeReceiveDefinition.toYAML() {
MappingElement result = super.toYAML();
result.put("typeIsList", typeIsList());
result.put("loggingEnabledForReads", loggingEnabledForReads);
return result;
}
syn MappingElement MSendDefinition.toYAML() {
syn MappingElement MTypeSendDefinition.toYAML() {
MappingElement result = super.toYAML();
result.put("typeIsList", typeIsList());
result.put("sender", sender());
result.put("lastValue", lastValue());
result.put("loggingEnabledForWrites", loggingEnabledForWrites);
......@@ -181,5 +214,5 @@ aspect Navigation {
eq Document.getChild().program() = null;
eq Document.getChild().ragconnect() = null;
eq Document.getChild().containedFile() = null;
eq Document.getChild().containedFileName() = getFileName();
eq Document.containedFileName() = getFileName();
}
......@@ -4,4 +4,4 @@ import org.jastadd.ragconnect.ast.*;
:};
%goal goal;
%goal ragconnect;
%goal connect_specification_file;
RagConnect ragconnect
= endpoint_definition.d ragconnect.r {: r.getEndpointDefinitionList().insertChild(d, 0); return r; :}
| dependency_definition.d ragconnect.r {: r.getDependencyDefinitionList().insertChild(d, 0); return r; :}
| mapping_definition.d ragconnect.r {: r.getMappingDefinitionList().insertChild(d, 0); return r; :}
| comment ragconnect.r {: return r; :}
| {: return new RagConnect(); :}
ConnectSpecificationFile connect_specification_file
= endpoint_definition.d connect_specification_file.r
{:
r.getEndpointDefinitionList().insertChild(d, 0); return r;
:}
| dependency_definition.d connect_specification_file.r
{:
r.getDependencyDefinitionList().insertChild(d, 0); return r;
:}
| mapping_definition.d connect_specification_file.r
{:
r.getMappingDefinitionList().insertChild(d, 0); return r;
:}
| comment connect_specification_file.r
{:
return r;
:}
| {: return new ConnectSpecificationFile(); :}
;
%embed {:
......@@ -33,23 +45,58 @@ EndpointDefinition endpoint_definition
EndpointDefinition endpoint_definition_type
= RECEIVE token_ref {: return new ReceiveTokenEndpointDefinition().setToken(token_ref); :}
| SEND token_ref {: return new SendTokenEndpointDefinition().setToken(token_ref); :}
| RECEIVE TREE type_ref {: return new ReceiveTypeEndpointDefinition().setType(type_ref); :}
| RECEIVE TREE WITH ADD type_ref
{:
ReceiveTypeEndpointDefinition result = new ReceiveTypeEndpointDefinition();
result.setType(type_ref);
result.setWithAdd(true);
return result;
:}
| SEND TREE type_ref {: return new SendTypeEndpointDefinition().setType(type_ref); :}
| RECEIVE LIST type_ref
{:
ReceiveTypeEndpointDefinition result = new ReceiveTypeEndpointDefinition();
result.setType(type_ref);
result.setUseList(true);
return result;
:}
| RECEIVE LIST WITH ADD type_ref
{:
ReceiveTypeEndpointDefinition result = new ReceiveTypeEndpointDefinition();
result.setType(type_ref);
result.setWithAdd(true);
result.setUseList(true);
return result;
:}
| SEND LIST type_ref
{:
SendTypeEndpointDefinition result = new SendTypeEndpointDefinition();
result.setType(type_ref);
result.setUseList(true);
return result;
:}
;
TokenComponent token_ref
= ID.type_name DOT ID.token_name {: return TokenComponent.createRef(type_name + "." + token_name); :}
;
TypeComponent type_ref
= ID.parent_type_name DOT ID.child_type_name {: return TypeComponent.createRef(parent_type_name + "." + child_type_name); :}
;
ArrayList string_list
= ID
| string_list COMMA ID
;
DependencyDefinition dependency_definition
= ID.target_type DOT ID.target_token CAN_DEPEND_ON ID.source_type DOT ID.source_token AS ID.id SCOL
= ID.target_type DOT ID.target_component CAN_DEPEND_ON ID.source_type DOT ID.source_token AS ID.id SCOL
{:
DependencyDefinition result = new DependencyDefinition();
result.setSource(TokenComponent.createRef(source_type + "." + source_token));
result.setTarget(TokenComponent.createRef(target_type + "." + target_token));
result.setTarget(Component.createRef(target_type + "." + target_component));
result.setID(id);
return result;
:}
......
......@@ -5,3 +5,7 @@
"maps" { return sym(Terminals.MAPS); }
"to" { return sym(Terminals.TO); }
"as" { return sym(Terminals.AS); }
"tree" { return sym(Terminals.TREE); }
"list" { return sym(Terminals.LIST); }
"with" { return sym(Terminals.WITH); }
"add" { return sym(Terminals.ADD); }
......@@ -3,11 +3,11 @@ package org.jastadd.ragconnect.compiler;
import beaver.Parser;
import org.jastadd.option.BooleanOption;
import org.jastadd.option.ValueOption;
import org.jastadd.relast.compiler.AbstractCompiler;
import org.jastadd.relast.compiler.CompilerException;
import org.jastadd.ragconnect.ast.*;
import org.jastadd.ragconnect.parser.RagConnectParser;
import org.jastadd.ragconnect.scanner.RagConnectScanner;
import org.jastadd.relast.compiler.AbstractCompiler;
import org.jastadd.relast.compiler.CompilerException;
import java.io.*;
import java.nio.file.Files;
......@@ -15,6 +15,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Compiler extends AbstractCompiler {
......@@ -25,11 +27,15 @@ public class Compiler extends AbstractCompiler {
private BooleanOption optionVerbose;
private BooleanOption optionLogReads;
private BooleanOption optionLogWrites;
private BooleanOption optionLogIncremental;
private BooleanOption optionExperimentalJastAdd329;
private static final String OPTION_PROTOCOL_MQTT = "mqtt";
private static final String OPTION_PROTOCOL_REST = "rest";
private static final String OPTION_PROTOCOL_JAVA = "java";
private final static Logger LOGGER = Logger.getLogger(Compiler.class.getName());
public Compiler() {
super("ragconnect", true);
}
......@@ -44,8 +50,11 @@ public class Compiler extends AbstractCompiler {
getConfiguration().printHelp(System.out);
return 0;
}
if (optionVerbose.value()) {
LOGGER.setLevel(Level.FINE);
}
printMessage("Running RagConnect " + readVersion());
LOGGER.info(() -> "Running RagConnect " + readVersion());
if (!getConfiguration().outputDir().exists()) {
try {
......@@ -74,10 +83,11 @@ public class Compiler extends AbstractCompiler {
}
*/
if (!ragConnect.errors().isEmpty()) {
System.err.println("Errors:");
StringBuilder sb = new StringBuilder("Errors:\n");
for (ErrorMessage e : ragConnect.errors()) {
System.err.println(e);
sb.append(e).append("\n");
}
LOGGER.severe(sb::toString);
System.exit(1);
}
......@@ -89,7 +99,7 @@ public class Compiler extends AbstractCompiler {
return 0;
}
printMessage("Writing output files");
LOGGER.fine("Writing output files");
final List<String> handlers = new ArrayList<>();
if (ASTNode.usesMqtt) {
handlers.add("MqttHandler.jadd");
......@@ -123,12 +133,11 @@ public class Compiler extends AbstractCompiler {
}
public static void main(String[] args) {
System.setProperty("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager");
System.setProperty("mustache.debug", "true");
try {
new Compiler().run(args);
} catch (CompilerException e) {
e.printStackTrace();
LOGGER.log(Level.SEVERE, e.getMessage(), e);
System.exit(1);
}
}
......@@ -153,12 +162,6 @@ public class Compiler extends AbstractCompiler {
}
}
private void printMessage(String message) {
if (optionVerbose.value()) {
System.out.println(message);
}
}
private void writeToFile(Path path, String str) throws CompilerException {
try (BufferedWriter writer = Files.newBufferedWriter(path)) {
writer.append(str);
......@@ -192,6 +195,12 @@ public class Compiler extends AbstractCompiler {
optionLogWrites = addOption(
new BooleanOption("logWrites", "Enable logging for every write.")
.defaultValue(false));
optionLogIncremental = addOption(
new BooleanOption("logIncremental", "Enable logging for observer in incremental dependency tracking.")
.defaultValue(false));
optionExperimentalJastAdd329 = addOption(
new BooleanOption("experimental-jastadd-329", "Use trace events INC_FLUSH_START and INC_FLUSH_END (JastAdd issue #329).")
.defaultValue(false));
}
private RagConnect parseProgram(Collection<String> files) throws CompilerException {
......@@ -212,64 +221,75 @@ public class Compiler extends AbstractCompiler {
case "ast":
case "relast":
// processGrammar
parseGrammar(program, filename);
program.addGrammarFile(parseGrammar(filename));
atLeastOneGrammar = true;
break;
case "connect":
case "ragconnect":
// process ragConnect
RagConnect parsedRagConnect = parseRagConnect(program, filename);
mergeRagConnectDefinitions(ragConnect, parsedRagConnect);
ragConnect.addConnectSpecificationFile(parseConnectSpec(filename));
atLeastOneRagConnect = true;
break;
default:
throw new CompilerException("Unknown file extension in " + filename);
throw new CompilerException("Unknown file extension " + extension + " in " + filename);
}
}
if (!atLeastOneGrammar) {
System.err.println("No grammar file specified! (*.ast, *.relast)");
LOGGER.severe(() -> "No grammar file specified! (*.ast, *.relast)");
}
if (!atLeastOneRagConnect) {
System.err.println("No ragconnect file specified! (*.connect, *.ragconnect)");
LOGGER.severe(() -> "No ragconnect file specified! (*.connect, *.ragconnect)");
}
if (!atLeastOneGrammar && !atLeastOneRagConnect) {
System.exit(1);
}
// here, the program subtree is also flushed and resolved
ragConnect.flushTreeCache();
ragConnect.treeResolveAll();
ragConnect.additionalRelations().forEach(ragConnectGrammarPart::addDeclaration);
ragConnect.additionalTokens().forEach(TypeDecl::addComponent);
ASTNode.loggingEnabledForReads = optionLogReads.value();
ASTNode.loggingEnabledForWrites = optionLogWrites.value();
ASTNode.loggingEnabledForIncremental = optionLogIncremental.value();
ASTNode.experimentalJastAdd329 = optionExperimentalJastAdd329.value();
// reuse "--incremental" option of JastAdd
ASTNode.incrementalOptionActive = getConfiguration().incremental() && getConfiguration().traceFlush();
LOGGER.fine(() -> "ASTNode.incrementalOptionActive = " + ASTNode.incrementalOptionActive);
// reuse "--List" option of JastAdd
ASTNode.JastAddList = getConfiguration().listType();
ASTNode.usesMqtt = optionProtocols.hasValue(OPTION_PROTOCOL_MQTT);
ASTNode.usesJava = optionProtocols.hasValue(OPTION_PROTOCOL_JAVA);
ASTNode.usesRest = optionProtocols.hasValue(OPTION_PROTOCOL_REST);
return ragConnect;
}
private void parseGrammar(Program program, String filename) throws CompilerException {
private GrammarFile parseGrammar(String filename) throws CompilerException {
try (BufferedReader reader = Files.newBufferedReader(Paths.get(filename))) {
RagConnectScanner scanner = new RagConnectScanner(reader);
RagConnectParser parser = new RagConnectParser();
GrammarFile grammarFile = (GrammarFile) parser.parse(scanner);
if (optionVerbose.value()) {
grammarFile.dumpTree(System.out);
LOGGER.fine(grammarFile::dumpTree);
}
program.addGrammarFile(grammarFile);
//grammarFile.treeResolveAll();
grammarFile.setFileName(toBaseName(filename));
return grammarFile;
} catch (IOException | Parser.Exception e) {
throw new CompilerException("Could not parse grammar file " + filename, e);
}
}
private RagConnect parseRagConnect(Program program, String filename) throws CompilerException {
private ConnectSpecificationFile parseConnectSpec(String filename) throws CompilerException {
try (BufferedReader reader = Files.newBufferedReader(Paths.get(filename))) {
RagConnectScanner scanner = new RagConnectScanner(reader);
RagConnectParser parser = new RagConnectParser();
RagConnect ragConnect = (RagConnect) parser.parse(scanner, RagConnectParser.AltGoals.ragconnect);
ragConnect.setProgram(program);
ragConnect.setFileName(toBaseName(filename));
return ragConnect;
ConnectSpecificationFile specificationFile = (ConnectSpecificationFile) parser.parse(scanner, RagConnectParser.AltGoals.connect_specification_file);
specificationFile.setFileName(toBaseName(filename));
return specificationFile;
} catch (IOException | Parser.Exception e) {
throw new CompilerException("Could not parse connect file " + filename, e);
}
......@@ -284,12 +304,6 @@ public class Compiler extends AbstractCompiler {
return new File(filename).getName();
}
private void mergeRagConnectDefinitions(RagConnect ragConnect, RagConnect ragConnectToIntegrate) {
ragConnect.setEndpointDefinitionList(ragConnectToIntegrate.getEndpointDefinitionList());
ragConnect.setMappingDefinitionList(ragConnectToIntegrate.getMappingDefinitionList());
ragConnect.setDependencyDefinitionList(ragConnectToIntegrate.getDependencyDefinitionList());
}
// protected void printUsage() {
// System.out.println("Usage: java -jar ragconnect.jar [--option1] [--option2=value] ... <filename1> <filename2> ... ");
// System.out.println("Options:");
......
package org.jastadd.ragconnect.compiler;
import org.jastadd.ragconnect.ast.Document;
import org.jastadd.ragconnect.ast.ListElement;
import org.jastadd.ragconnect.ast.MappingElement;
/**
* Testing Ros2Rag without parser.
*
* @author rschoene - Initial contribution
*/
public class SimpleMain {
private static void printManualYAML() {
Document doc = new Document();
MappingElement root = new MappingElement();
MappingElement firstLevel = new MappingElement();
firstLevel.put("server", "tcp://localhost:1883");
firstLevel.put("robot_speed_factor", ".7");
MappingElement theTopics = new MappingElement();
theTopics.put("robotConfig", "robotconfig");
theTopics.put("trajectory", "trajectory");
theTopics.put("nextStep", "ros2rag/nextStep");
firstLevel.put("topics", theTopics);
firstLevel.put("zone_size", "0.5");
ListElement theZones = new ListElement();
theZones.add("1 1");
theZones.add("0 1");
theZones.add("-1 1");
firstLevel.put("zones", theZones);
MappingElement pandaParts = new MappingElement();
MappingElement thePanda = new MappingElement();
thePanda.put("Link0", "panda_link0");
thePanda.put("Link1", "panda_link1");
thePanda.put("Link2", "panda_link2");
thePanda.put("Link3", "panda_link3");
thePanda.put("Link4", "panda_link4");
thePanda.put("Link5", "panda_link5");
thePanda.put("Link6", "panda_link6");
thePanda.put("RightFinger", "panda_rightfinger");
thePanda.put("LeftFinger", "panda_leftfinger");
pandaParts.put("panda", thePanda);
firstLevel.put("parts", pandaParts);
MappingElement endEffectorParts = new MappingElement();
MappingElement endEffector = new MappingElement();
endEffector.put("EndEffector", "panda_hand");
endEffectorParts.put("panda", endEffector);
firstLevel.put("end_effectors", endEffectorParts);
ListElement theGoalPoses = new ListElement();
theGoalPoses.add(makePose("0.4 0.4 0.3"));
theGoalPoses.add(makePose("-0.4 0.4 0.3"));
theGoalPoses.add(makePose("-0.4 -0.4 0.3"));
theGoalPoses.add(makePose("0.4 0.4 0.3"));
theGoalPoses.add(makePose("-0.4 0.4 0.3"));
theGoalPoses.add(makePose("0.4 0.4 0.3"));
firstLevel.put("goal_poses", theGoalPoses);
root.put("panda_mqtt_connector", firstLevel);
doc.setRootElement(root);
System.out.println(doc.prettyPrint());
}
private static MappingElement makePose(String position) {
MappingElement goalPose = new MappingElement();
goalPose.put("position", position);
goalPose.put("orientation", "1 1 0 0");
goalPose.put("work", "20000");
return goalPose;
}
public static void main(String[] args) {
printManualYAML();
}
}
{{#hasTreeListEndpoints}}
public void {{JastAddList}}.serialize(com.fasterxml.jackson.core.JsonGenerator g) throws SerializationException {
try {
g.writeStartArray();
for (T child : this) {
child.serialize(g);
}
g.writeEndArray();
} catch (java.io.IOException e) {
throw new SerializationException("unable to serialize {{JastAddList}}", e);
}
}
{{#typesForReceivingListEndpoints}}
public static {{JastAddList}}<{{Name}}> {{Name}}.deserializeList(com.fasterxml.jackson.databind.node.ArrayNode node) throws DeserializationException {
{{JastAddList}}<{{Name}}> result = new {{JastAddList}}<>();
for (java.util.Iterator<com.fasterxml.jackson.databind.JsonNode> it = node.elements(); it.hasNext();) {
com.fasterxml.jackson.databind.JsonNode element = it.next();
result.add(deserialize(element));
}
return result;
}
{{/typesForReceivingListEndpoints}}
{{/hasTreeListEndpoints}}
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
aspect MqttHandler {
import java.util.function.BiConsumer;aspect MqttHandler {
public class MqttServerHandler {
private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>();
private final java.util.Map<ConnectToken, Object> tokensForRemoval = new java.util.HashMap<>();
private final java.util.Map<ConnectToken, java.util.function.BiConsumer<String, byte[]>> tokensForRemoval = new java.util.HashMap<>();
private long time;
private java.util.concurrent.TimeUnit unit;
private String name;
......@@ -15,7 +15,7 @@ public class MqttServerHandler {
public MqttServerHandler(String name) {
this.name = name;
setupWaitUntilReady(1, TimeUnit.SECONDS);
setupWaitUntilReady(1, java.util.concurrent.TimeUnit.SECONDS);
}
public void setupWaitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
......@@ -23,7 +23,7 @@ public class MqttServerHandler {
this.unit = unit;
}
public MqttHandler resolveHandler(java.net.URI uri) throws IOException {
public MqttHandler resolveHandler(java.net.URI uri) throws java.io.IOException {
MqttHandler handler = handlers.get(uri.getHost());
if (handler == null) {
// first connect to that server
......@@ -39,33 +39,37 @@ public class MqttServerHandler {
return handler;
}
public ConnectToken newConnection(java.net.URI uri, java.util.function.Consumer<byte[]> callback) throws IOException {
public ConnectToken newConnection(java.net.URI uri, java.util.function.BiConsumer<String, byte[]> callback) throws java.io.IOException {
ConnectToken connectToken = new ConnectToken(uri);
resolveHandler(uri).newConnection(extractTopic(uri), callback);
tokensForRemoval.put(connectToken, callback);
return connectToken;
}
public boolean disconnect(ConnectToken connectToken) throws IOException {
public boolean disconnect(ConnectToken connectToken) throws java.io.IOException {
MqttHandler handler = resolveHandler(connectToken.uri);
return handler != null ? handler.disconnect(extractTopic(connectToken.uri), tokensForRemoval.get(connectToken)) : false;
}
public void publish(java.net.URI uri, byte[] bytes) throws IOException {
public void publish(java.net.URI uri, byte[] bytes) throws java.io.IOException {
resolveHandler(uri).publish(extractTopic(uri), bytes);
}
public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws IOException {
public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws java.io.IOException {
resolveHandler(uri).publish(extractTopic(uri), bytes, retain);
}
public void publish(java.net.URI uri, byte[] bytes,
org.fusesource.mqtt.client.QoS qos, boolean retain) throws IOException {
org.fusesource.mqtt.client.QoS qos, boolean retain) throws java.io.IOException {
resolveHandler(uri).publish(extractTopic(uri), bytes, qos, retain);
}
public static String extractTopic(java.net.URI uri) {
String path = uri.getPath();
if (uri.getFragment() != null) {
// do not also append fragment, as it is illegal, that anything follows "#" in a mqtt topic anyway
path += "#";
}
if (path.charAt(0) == '/') {
path = path.substring(1);
}
......@@ -99,7 +103,8 @@ public class MqttHandler {
private boolean sendWelcomeMessage = true;
private org.fusesource.mqtt.client.QoS qos;
/** Dispatch knowledge */
private final java.util.Map<String, java.util.List<java.util.function.Consumer<byte[]>>> callbacks;
private final java.util.Map<String, java.util.List<java.util.function.BiConsumer<String, byte[]>>> normalCallbacks;
private final java.util.Map<java.util.regex.Pattern, java.util.List<java.util.function.BiConsumer<String, byte[]>>> wildcardCallbacks;
public MqttHandler() {
this("RagConnect");
......@@ -108,7 +113,8 @@ public class MqttHandler {
public MqttHandler(String name) {
this.name = java.util.Objects.requireNonNull(name, "Name must be set");
this.logger = org.apache.logging.log4j.LogManager.getLogger(MqttHandler.class);
this.callbacks = new java.util.HashMap<>();
this.normalCallbacks = new java.util.HashMap<>();
this.wildcardCallbacks = new java.util.HashMap<>();
this.readyLatch = new java.util.concurrent.CountDownLatch(1);
this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE;
}
......@@ -119,17 +125,23 @@ public class MqttHandler {
}
/**
* Sets the host (with default port) to receive messages from, and connects to it.
* @throws IOException if could not connect, or could not subscribe to a topic
* Sets the host to receive messages from, and connects to it.
* @param host name of the host to connect to, format is either <code>"$name"</code> or <code>"$name:$port"</code>
* @throws java.io.IOException if could not connect, or could not subscribe to a topic
* @return self
*/
public MqttHandler setHost(String host) throws java.io.IOException {
if (host.contains(":")) {
int colon_index = host.indexOf(":");
return setHost(host.substring(0, colon_index),
Integer.parseInt(host.substring(colon_index + 1)));
}
return setHost(host, DEFAULT_PORT);
}
/**
* Sets the host to receive messages from, and connects to it.
* @throws IOException if could not connect, or could not subscribe to a topic
* @throws java.io.IOException if could not connect, or could not subscribe to a topic
* @return self
*/
public MqttHandler setHost(String host, int port) throws java.io.IOException {
......@@ -160,13 +172,17 @@ public class MqttHandler {
org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
// this method is called, whenever a MQTT message is received
String topicString = topic.toString();
java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topicString);
if (callbackList == null || callbackList.isEmpty()) {
logger.debug("Got a message, but no callback to call. Forgot to subscribe?");
java.util.List<java.util.function.BiConsumer<String, byte[]>> callbackList = callbacksFor(topicString);
if (callbackList.isEmpty()) {
logger.debug("Got a message at {}, but no callback to call. Forgot to subscribe?", topic);
} else {
byte[] message = body.toByteArray();
for (java.util.function.Consumer<byte[]> callback : callbackList) {
callback.accept(message);
for (java.util.function.BiConsumer<String, byte[]> callback : callbackList) {
try {
callback.accept(topicString, message);
} catch (Exception e) {
logger.catching(e);
}
}
}
ack.onSuccess(null); // always acknowledge message
......@@ -188,7 +204,7 @@ public class MqttHandler {
throwIf(error);
// actually establish the connection
connection.connect(new org.fusesource.mqtt.client.Callback<Void>() {
connection.connect(new org.fusesource.mqtt.client.Callback<>() {
@Override
public void onSuccess(Void value) {
if (MqttHandler.this.sendWelcomeMessage) {
......@@ -196,7 +212,7 @@ public class MqttHandler {
(name + " is connected").getBytes(),
org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE,
false,
new org.fusesource.mqtt.client.Callback<Void>() {
new org.fusesource.mqtt.client.Callback<>() {
@Override
public void onSuccess(Void value) {
logger.debug("success sending welcome message");
......@@ -222,6 +238,20 @@ public class MqttHandler {
return this;
}
private java.util.List<java.util.function.BiConsumer<String, byte[]>> callbacksFor(String topicString) {
java.util.List<java.util.function.BiConsumer<String, byte[]>> result = new java.util.ArrayList<>();
List<BiConsumer<String, byte[]>> normalCallbackList = normalCallbacks.get(topicString);
if (normalCallbackList != null) {
result.addAll(normalCallbackList);
}
wildcardCallbacks.forEach((topicPattern, callback) -> {
if (topicPattern.matcher(topicString).matches()) {
result.addAll(callback);
}
});
return result;
}
public java.net.URI getHost() {
return host;
}
......@@ -240,61 +270,147 @@ public class MqttHandler {
this.qos = qos;
}
/**
* Establish a new connection for some topic.
* @param topic the topic to create a connection for, may contain the wildcards "*" and "#"
* @param callback the callback to run if a new message arrives for this topic
* @return true if successful stored this connection, false otherwise (e.g., on failed subscribe)
*/
public boolean newConnection(String topic, java.util.function.Consumer<byte[]> callback) {
return newConnection(topic, (ignoredTopicString, bytes) -> callback.accept(bytes));
}
/**
* Establish a new connection for some topic.
* @param topic the topic to create a connection for, may contain the wildcards "*" and "#"
* @param callback the callback to run if a new message arrives for this topic
* @return true if successful stored this connection, false otherwise (e.g., on failed subscribe)
*/
public boolean newConnection(String topic, java.util.function.BiConsumer<String, byte[]> callback) {
if (readyLatch.getCount() > 0) {
System.err.println("Handler not ready");
return false;
}
// register callback
logger.debug("new connection for {}", topic);
if (callbacks.get(topic) == null) {
callbacks.put(topic, new java.util.ArrayList<>());
final boolean needSubscribe;
if (isWildcardTopic(topic)) {
String regexForTopic = topic.replace("*", "[^/]*").replace("#", ".*");
java.util.regex.Pattern pattern = java.util.regex.Pattern.compile(regexForTopic);
wildcardCallbacks.computeIfAbsent(pattern, p -> new java.util.ArrayList<>())
.add(callback);
needSubscribe = true;
} else { // normal topic
java.util.List<java.util.function.BiConsumer<String, byte[]>> callbacksForTopic = normalCallbacks.get(topic);
if (callbacksForTopic == null || callbacksForTopic.isEmpty()) {
callbacksForTopic = new java.util.ArrayList<>();
normalCallbacks.put(topic, callbacksForTopic);
needSubscribe = true;
} else {
needSubscribe = false;
}
callbacksForTopic.add(callback);
}
if (needSubscribe) {
// subscribe at broker
CountDownLatch operationFinished = new CountDownLatch(1);
java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>(true);
org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) };
connection.getDispatchQueue().execute(() -> {
connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<byte[]>() {
connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<>() {
@Override
public void onSuccess(byte[] qoses) {
logger.debug("Subscribed to {}, qoses: {}", topic, qoses);
operationFinished.countDown();
}
@Override
public void onFailure(Throwable cause) {
logger.error("Could not subscribe to {}", topic, cause);
success.set(false);
operationFinished.countDown();
}
});
});
try {
operationFinished.await(2, TimeUnit.SECONDS);
return success.get();
} catch (InterruptedException e) {
return false;
}
callbacks.get(topic).add(callback);
} else {
return true;
}
}
private boolean isWildcardTopic(String topic) {
return topic.contains("*") || topic.contains("#");
}
public boolean disconnect(String topic, java.util.function.BiConsumer<String, byte[]> callback) {
boolean needUnsubscribe = false;
java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>(true);
boolean foundTopicInCallbacks = false;
// check if wildcard is to be removed
if (isWildcardTopic(topic)) {
java.util.regex.Pattern wildcardPatternToRemove = null;
for (java.util.Map.Entry<java.util.regex.Pattern, java.util.List<java.util.function.BiConsumer<String, byte[]>>> entry : wildcardCallbacks.entrySet()) {
if (entry.getKey().pattern().equals(topic)) {
foundTopicInCallbacks = true;
// if still successful, update with whether callback could be removed
success.compareAndSet(true, (entry.getValue().remove(callback)));
if (entry.getValue().isEmpty()) {
wildcardPatternToRemove = entry.getKey();
needUnsubscribe = true;
}
break;
}
}
;
if (wildcardPatternToRemove != null) {
wildcardCallbacks.remove(wildcardPatternToRemove);
}
} else if (normalCallbacks.containsKey(topic)) {
foundTopicInCallbacks = true;
// if still successful, update with whether callback could be removed
var normalCallbackList = normalCallbacks.get(topic);
success.compareAndSet(true, normalCallbackList.remove(callback));
needUnsubscribe |= normalCallbackList.isEmpty();
}
public boolean disconnect(String topic, Object callback) {
java.util.List<java.util.function.Consumer<byte[]>> callbackList = callbacks.get(topic);
if (callbackList == null) {
if (!foundTopicInCallbacks) {
logger.warn("Disconnect for not connected topic '{}'", topic);
return false;
}
java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>();
success.set(callbackList.remove(callback));
if (callbackList.isEmpty()) {
if (needUnsubscribe) {
java.util.concurrent.CountDownLatch operationFinished = new java.util.concurrent.CountDownLatch(1);
// no callbacks anymore for this topic, unsubscribe from mqtt
connection.getDispatchQueue().execute(() -> {
org.fusesource.hawtbuf.UTF8Buffer topicBuffer = org.fusesource.hawtbuf.Buffer.utf8(topic);
org.fusesource.hawtbuf.UTF8Buffer[] topicArray = new org.fusesource.hawtbuf.UTF8Buffer[]{topicBuffer};
connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<Void>() {
connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<>() {
@Override
public void onSuccess(Void value) {
// empty, all good
operationFinished.countDown();
}
@Override
public void onFailure(Throwable cause) {
success.set(false);
logger.warn("Could not disconnect from {}", topic, cause);
operationFinished.countDown();
}
});
});
try {
operationFinished.await(2, java.util.concurrent.TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.catching(e);
success.set(false);
}
}
return success.get();
}
......@@ -323,7 +439,7 @@ public class MqttHandler {
return;
}
connection.getDispatchQueue().execute(() -> {
connection.disconnect(new org.fusesource.mqtt.client.Callback<Void>() {
connection.disconnect(new org.fusesource.mqtt.client.Callback<>() {
@Override
public void onSuccess(Void value) {
logger.info("Disconnected {} from {}", name, host);
......@@ -347,7 +463,7 @@ public class MqttHandler {
public void publish(String topic, byte[] bytes, org.fusesource.mqtt.client.QoS qos, boolean retain) {
connection.getDispatchQueue().execute(() -> {
connection.publish(topic, bytes, qos, retain, new org.fusesource.mqtt.client.Callback<Void>() {
connection.publish(topic, bytes, qos, retain, new org.fusesource.mqtt.client.Callback<>() {
@Override
public void onSuccess(Void value) {
logger.debug("Published some bytes to {}", topic);
......
......@@ -4,8 +4,20 @@ try {
uri = new java.net.URI({{connectParameterName}});
scheme = uri.getScheme();
host = uri.getHost();
path = uri.getPath();
path = uri.getPath() + (uri.getFragment() != null ? "#" : "");
} catch (java.net.URISyntaxException e) {
System.err.println(e.getMessage()); // Maybe re-throw error?
return false;
}
if (scheme == null || scheme.isBlank()) {
System.err.println("Missing or empty scheme in " + uri);
return false;
}
if (host == null || host.isBlank()) {
System.err.println("Missing or empty host in " + uri);
return false;
}
if (path == null || path.isBlank()) {
System.err.println("Missing or empty path in " + uri);
return false;
}
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
{{lastDefinitionToType}} {{lastResult}};
{{{lastDefinitionToType}}} {{lastResult}};
try {
{{#InnerMappingDefinitions}}
{{^last}}{{toType}} {{/last}}{{outputVarName}} = {{methodName}}({{inputVarName}});
{{^last}}{{{toType}}} {{/last}}{{outputVarName}} = {{methodName}}({{inputVarName}});
{{/InnerMappingDefinitions}}
} catch (RagConnectRejectMappingException e) {
// do not print message in case of rejection
{{preemptiveReturn}}
} catch (Exception e) {
e.printStackTrace();
{{preemptiveReturn}}
......
protected static {{toType}} ASTNode.{{methodName}}({{fromType}} {{fromVariableName}}) throws Exception {
{{{content}}}{{!maybe print line by line to get better indentation}}
protected static {{{toType}}} ASTNode.{{methodName}}({{{fromType}}} {{fromVariableName}}) throws Exception {
{{{content}}}
}
#Tue Jan 19 12:08:02 CET 2021
version=0.2.5
#Thu Jun 24 17:13:44 CEST 2021
version=0.3.2-alpha
{{#usesMqtt}}{{> mqtt}}{{/usesMqtt}}
{{> handler}}
aspect ROS2RAG {
{{#ReceiveDefinitions}}
aspect RagConnect {
{{#TokenReceiveDefinitions}}
{{> receiveDefinition}}
{{/ReceiveDefinitions}}
{{/TokenReceiveDefinitions}}
{{#SendDefinitions}}
{{#TokenSendDefinitions}}
{{> sendDefinition}}
{{/SendDefinitions}}
{{/TokenSendDefinitions}}
{{#TypeReceiveDefinitions}}
{{> receiveDefinition}}
{{/TypeReceiveDefinitions}}
{{#TypeSendDefinitions}}
{{> sendDefinition}}
{{/TypeSendDefinitions}}
class RagConnectRejectMappingException extends RuntimeException {}
private static void ASTNode.reject() {
throw new RagConnectRejectMappingException();
}
{{#MappingDefinitions}}
{{#isUsed}}
{{> mappingDefinition}}
{{/isUsed}}
{{/MappingDefinitions}}
{{#DependencyDefinitions}}
......@@ -20,4 +35,144 @@ aspect ROS2RAG {
{{#TokenComponents}}
{{> tokenComponent}}
{{/TokenComponents}}
{{> ListAspect}}
public void {{rootNodeName}}.ragconnectCheckIncremental() {
{{#incrementalOptionActive}}
// check if --tracing is active
trace().getReceiver();
// check if tracing of INC_FLUSH_ATTR is possible, i.e., if --tracing=flush
ASTState.Trace.Event checkTracing = ASTState.Trace.Event.INC_FLUSH_ATTR;
// check if --rewrite is active
mayHaveRewrite();
// check if --incremental is active
Object checkIncremental = inc_throwAway_visited;
{{/incrementalOptionActive}}
}
}
{{#incrementalOptionActive}}
aspect RagConnectObserver {
class RagConnectObserver implements ASTState.Trace.Receiver {
class RagConnectObserverEntry {
final ConnectToken connectToken;
final ASTNode node;
final String attributeString;
final Runnable attributeCall;
RagConnectObserverEntry(ConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) {
this.connectToken = connectToken;
this.node = node;
this.attributeString = attributeString;
this.attributeCall = attributeCall;
}
}
{{#experimentalJastAdd329}}
class RagConnectObserverStartEntry {
final ASTNode node;
final String attributeString;
final Object flushIncToken;
RagConnectObserverStartEntry(ASTNode node, String attributeString, Object flushIncToken) {
this.node = node;
this.attributeString = attributeString;
this.flushIncToken = flushIncToken;
}
}
{{/experimentalJastAdd329}}
ASTState.Trace.Receiver oldReceiver;
java.util.List<RagConnectObserverEntry> observedNodes = new java.util.ArrayList<>();
{{#experimentalJastAdd329}}
java.util.Set<RagConnectObserverEntry> entryQueue = new java.util.HashSet<>();
RagConnectObserverStartEntry startEntry = null;
{{/experimentalJastAdd329}}
RagConnectObserver(ASTNode node) {
// set the receiver. potentially dangerous because overriding existing receiver!
oldReceiver = node.trace().getReceiver();
node.trace().setReceiver(this);
}
void add(ConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) {
{{#loggingEnabledForIncremental}}
System.out.println("** observer add: " + node + " on " + attributeString);
{{/loggingEnabledForIncremental}}
observedNodes.add(new RagConnectObserverEntry(connectToken, node, attributeString, attributeCall));
}
void remove(ConnectToken connectToken) {
observedNodes.removeIf(entry -> entry.connectToken.equals(connectToken));
}
@Override
public void accept(ASTState.Trace.Event event, ASTNode node, String attribute, Object params, Object value) {
oldReceiver.accept(event, node, attribute, params, value);
{{#experimentalJastAdd329}}
// react to INC_FLUSH_START and remember entry
if (event == ASTState.Trace.Event.INC_FLUSH_START && startEntry == null) {
{{#loggingEnabledForIncremental}}
System.out.println("** observer start: " + node + " on " + attribute);
{{/loggingEnabledForIncremental}}
startEntry = new RagConnectObserverStartEntry(node, attribute, value);
return;
}
// react to INC_FLUSH_END and process queued entries, if it matches start entry
if (event == ASTState.Trace.Event.INC_FLUSH_END &&
node == startEntry.node &&
attribute == startEntry.attributeString &&
value == startEntry.flushIncToken) {
// create a copy of the queue to avoid entering this again causing an endless recursion
RagConnectObserverEntry[] entriesToProcess = entryQueue.toArray(new RagConnectObserverEntry[entryQueue.size()]);
entryQueue.clear();
startEntry = null;
{{#loggingEnabledForIncremental}}
System.out.println("** observer process (" + entriesToProcess.length + "): " + node + " on " + attribute);
{{/loggingEnabledForIncremental}}
for (RagConnectObserverEntry entry : entriesToProcess) {
entry.attributeCall.run();
}
return;
}
{{/experimentalJastAdd329}}
// ignore all other events but INC_FLUSH_ATTR
if (event != ASTState.Trace.Event.INC_FLUSH_ATTR) {
return;
}
{{#loggingEnabledForIncremental}}
System.out.println("** observer check INC_FLUSH_ATTR event: " + node + " on " + attribute);
{{/loggingEnabledForIncremental}}
// iterate through list, if matching pair. could maybe be more efficient.
for (RagConnectObserverEntry entry : observedNodes) {
if (entry.node.equals(node) && entry.attributeString.equals(attribute)) {
// hit. call the attribute/nta-token
{{#loggingEnabledForIncremental}}
System.out.println("** observer hit: " + entry.node + " on " + entry.attributeString);
{{/loggingEnabledForIncremental}}
{{#experimentalJastAdd329}}
entryQueue.add(entry);
{{/experimentalJastAdd329}}
{{^experimentalJastAdd329}}
entry.attributeCall.run();
{{/experimentalJastAdd329}}
}
}
}
}
private static RagConnectObserver ASTNode._ragConnectObserverInstance;
RagConnectObserver ASTNode._ragConnectObserver() {
if (_ragConnectObserverInstance == null) {
// does not matter, which node is used to create the observer as ASTState/tracing is also static
_ragConnectObserverInstance = new RagConnectObserver(this);
}
return _ragConnectObserverInstance;
}
}
{{/incrementalOptionActive}}
{{#typeIsList}}
{{^UseList}}
/* first try with resolve to type
syn {{typeName}} {{parentTypeName}}.{{resolveInListAttributeName}}(String topic) {
for ({{typeName}} element : get{{entityName}}()) {
if (element.get{{idTokenName}}().equals(topic)) {
return element;
}
}
return null;
}
*/
syn int {{parentTypeName}}.{{resolveInListAttributeName}}(String topic) {
for (int index = 0; index < getNum{{entityName}}(); index++) {
if (get{{entityName}}(index).get{{idTokenName}}().equals(topic)) {
return index;
}
}
return -1;
}
{{/UseList}}
{{/typeIsList}}
/**
* Connects the receive endpoint {{entityName}}.
{{#typeIsList}}{{#isWithAdd}}
* New values are appended to the end of the list.
{{/isWithAdd}}{{/typeIsList}}
* @param {{connectParameterName}} string describing protocol and path as an URI
{{#typeIsList}}{{^UseList}}{{^isWithAdd}}
* @param index index of node in list to connect (the list is expected to have enough elements)
{{/isWithAdd}}{{/UseList}}{{/typeIsList}}
* @return true if connect was successful, false otherwise
* @throws java.io.IOException if connect failed
*/
public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterName}}{{#typeIsList}}{{^UseList}}{{^isWithAdd}}, int index{{/isWithAdd}}{{/UseList}}{{/typeIsList}}) throws java.io.IOException {
java.util.function.BiConsumer<String, byte[]> consumer = (topic, message) -> {
{{> mappingApplication}}
{{#loggingEnabledForReads}}
System.out.println("[Receive] " + {{connectParameterName}} + " -> {{entityName}} = " + {{lastResult}});
{{/loggingEnabledForReads}}
{{#isTypeEndpointDefinition}}
{{lastResult}}.treeResolveAll();
{{#typeIsList}}
{{#UseList}}
{{#isWithAdd}}
{{getterMethod}}().addAll({{lastResult}});
{{/isWithAdd}}
{{^isWithAdd}}
set{{entityName}}({{lastResult}});
{{/isWithAdd}}
{{/UseList}}
{{^UseList}}
{{lastResult}}.set{{idTokenName}}(topic);
{{#isWithAdd}}
{{getterMethod}}().add({{lastResult}});
{{/isWithAdd}}
{{^isWithAdd}}
set{{entityName}}({{lastResult}}, index);
{{/isWithAdd}}
{{/UseList}}
{{/typeIsList}}
{{^typeIsList}}
set{{entityName}}({{lastResult}});
{{/typeIsList}}
{{/isTypeEndpointDefinition}}
{{^isTypeEndpointDefinition}}
set{{entityName}}({{lastResult}});
{{/isTypeEndpointDefinition}}
};
return {{internalConnectMethod}}({{connectParameterName}}, consumer);
}
{{#typeIsList}}{{^UseList}}{{^isWithAdd}}
/**
* Connects the receive endpoint {{entityName}} using a "wildcard" URI (if supported by the chosen protocol).
* @param {{connectParameterName}} string describing protocol and path as an URI
* @return true if connect was successful, false otherwise
* @throws java.io.IOException if connect failed
*/
public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterName}}) throws java.io.IOException {
{{>handleUri}}
java.util.function.Consumer<byte[]> consumer = message -> {
java.util.function.BiConsumer<String, byte[]> consumer = (topic, message) -> {
{{> mappingApplication}}
{{#loggingEnabledForReads}}
System.out.println("[Receive] " + {{connectParameterName}} + " -> {{tokenName}} = " + {{lastResult}});
System.out.println("[Receive] " + {{connectParameterName}} + " (" + topic + ") -> {{entityName}} = " + {{lastResult}});
{{/loggingEnabledForReads}}
set{{tokenName}}({{lastResult}});
{{lastResult}}.set{{idTokenName}}(topic);
int resolvedIndex = {{resolveInListAttributeName}}(topic);
if (resolvedIndex == -1) {
add{{entityName}}({{lastResult}});
} else {
set{{entityName}}({{lastResult}}, resolvedIndex);
}
};
return {{internalConnectMethod}}({{connectParameterName}}, consumer);
}
{{/isWithAdd}}{{/UseList}}{{/typeIsList}}
private boolean {{parentTypeName}}.{{internalConnectMethod}}(String {{connectParameterName}},
java.util.function.BiConsumer<String, byte[]> consumer) throws java.io.IOException {
{{>handleUri}}
ConnectToken connectToken;
switch (scheme) {
{{#usesMqtt}}
......@@ -29,7 +121,8 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
{{#usesRest}}
case "rest":
connectToken = {{restHandlerAttribute}}().newPUTConnection(uri, input -> {
consumer.accept(input.getBytes());
// TODO wildcard-topic not supported yet
consumer.accept("", input.getBytes());
});
if (connectToken == null) {
return false;
......
......@@ -3,6 +3,12 @@ private byte[] {{parentTypeName}}.{{lastValue}} = null;
public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterName}}, boolean writeCurrentValue) throws java.io.IOException {
{{>handleUri}}
ConnectToken connectToken;
if (connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>())
.get(uri) != null) {
System.err.println("Already connected for " + uri + " on " + this + "!");
return true;
}
switch (scheme) {
{{#usesMqtt}}
case "mqtt":
......@@ -10,7 +16,7 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
final String topic = {{mqttHandlerAttribute}}().extractTopic(uri);
{{sender}} = () -> {
{{#loggingEnabledForWrites}}
System.out.println("[Send] {{tokenName}} = " + get{{tokenName}}() + " -> " + {{connectParameterName}});
System.out.println("[Send] {{entityName}} = " + {{getterMethod}}() + " -> " + {{connectParameterName}});
{{/loggingEnabledForWrites}}
handler.publish(topic, {{lastValue}});
};
......@@ -18,6 +24,7 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
if (writeCurrentValue) {
{{writeMethod}}();
}
connectToken = new ConnectToken(uri);
break;
{{/usesMqtt}}
{{#usesJava}}
......@@ -35,26 +42,40 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
{{/usesJava}}
{{#usesRest}}
case "rest":
ConnectToken connectToken = {{restHandlerAttribute}}().newGETConnection(uri, () -> {
connectToken = {{restHandlerAttribute}}().newGETConnection(uri, () -> {
{{updateMethod}}();
return new String({{lastValue}});
});
if (connectToken == null) {
return false;
}
connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>())
.put(uri, connectToken);
break;
{{/usesRest}}
default:
System.err.println("Unknown protocol '" + scheme + "'.");
return false;
}
connectTokens.computeIfAbsent(this, astNode -> new java.util.HashMap<java.net.URI, ConnectToken>())
.put(uri, connectToken);
{{#incrementalOptionActive}}
_ragConnectObserver().add(connectToken, this, "{{getterMethod}}", () -> {
if (this.{{updateMethod}}()) {
this.{{writeMethod}}();
}
});
{{/incrementalOptionActive}}
return true;
}
public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameterName}}) throws java.io.IOException {
{{>handleUri}}
ConnectToken connectToken = connectTokens.get(this).remove(uri);
if (connectToken == null) {
System.err.println("Disconnect without connect for " + uri + " on " + this + "!");
}
{{#incrementalOptionActive}}
_ragConnectObserver().remove(connectToken);
{{/incrementalOptionActive}}
switch (scheme) {
{{#usesMqtt}}
case "mqtt":
......@@ -69,7 +90,7 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter
{{/usesJava}}
{{#usesRest}}
case "rest":
{{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
{{restHandlerAttribute}}().disconnect(connectToken);
break;
{{/usesRest}}
default:
......
This diff is collapsed.
// --- update definitions ---
receive NativeTypes.BooleanValue;
receive NativeTypes.IntValue;
receive NativeTypes.ShortValue;
receive NativeTypes.LongValue;
......@@ -7,9 +8,52 @@ receive NativeTypes.DoubleValue;
receive NativeTypes.CharValue;
receive NativeTypes.StringValue;
receive NativeTypes.BooleanValueTransformed using BooleanTransformation;
receive NativeTypes.IntValueTransformed using IntTransformation;
receive NativeTypes.ShortValueTransformed using ShortTransformation;
receive NativeTypes.LongValueTransformed using LongTransformation;
receive NativeTypes.FloatValueTransformed using FloatTransformation;
receive NativeTypes.DoubleValueTransformed using DoubleTransformation;
receive NativeTypes.CharValueTransformed using CharTransformation;
receive NativeTypes.StringValueTransformed using StringTransformation;
receive BoxedTypes.BooleanValue;
receive BoxedTypes.IntValue;
receive BoxedTypes.ShortValue;
receive BoxedTypes.LongValue;
receive BoxedTypes.FloatValue;
receive BoxedTypes.DoubleValue;
receive BoxedTypes.CharValue;
receive BoxedTypes.BooleanValueTransformed using BooleanTransformation;
receive BoxedTypes.IntValueTransformed using IntTransformation;
receive BoxedTypes.ShortValueTransformed using ShortTransformation;
receive BoxedTypes.LongValueTransformed using LongTransformation;
receive BoxedTypes.FloatValueTransformed using FloatTransformation;
receive BoxedTypes.DoubleValueTransformed using DoubleTransformation;
receive BoxedTypes.CharValueTransformed using CharTransformation;
BooleanTransformation maps boolean b to boolean {:
return b;
:}
IntTransformation maps int i to int {:
return i;
:}
ShortTransformation maps short s to short {:
return s;
:}
LongTransformation maps long l to long {:
return l;
:}
FloatTransformation maps float f to float {:
return f;
:}
DoubleTransformation maps double d to double {:
return d;
:}
CharTransformation maps char c to char {:
return c;
:}
StringTransformation maps String s to String {:
return s;
:}
A ::= NativeTypes* BoxedTypes* ;
NativeTypes ::= <IntValue:int> <ShortValue:short> <LongValue:long> <FloatValue:float> <DoubleValue:double> <CharValue:char> <StringValue:String> ;
BoxedTypes ::= <IntValue:Integer> <ShortValue:Short> <LongValue:Long> <FloatValue:Float> <DoubleValue:Double> <CharValue:Character> ;
NativeTypes ::= <BooleanValue:boolean> <IntValue:int> <ShortValue:short> <LongValue:long> <FloatValue:float> <DoubleValue:double> <CharValue:char> <StringValue:String> <BooleanValueTransformed:boolean> <IntValueTransformed:int> <ShortValueTransformed:short> <LongValueTransformed:long> <FloatValueTransformed:float> <DoubleValueTransformed:double> <CharValueTransformed:char> <StringValueTransformed:String> ;
BoxedTypes ::= <BooleanValue:Boolean> <IntValue:Integer> <ShortValue:Short> <LongValue:Long> <FloatValue:Float> <DoubleValue:Double> <CharValue:Character> <BooleanValueTransformed:Boolean> <IntValueTransformed:Integer> <ShortValueTransformed:Short> <LongValueTransformed:Long> <FloatValueTransformed:Float> <DoubleValueTransformed:Double> <CharValueTransformed:Character> ;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment