Commit 3886c0f7 authored by René Schöne's avatar René Schöne
Browse files

Merge branch '21-allow-connection-endpoints-for-list-nonterminals' into 'dev'

Allow connection endpoints for list nonterminals

See merge request !10
parents 58ad4788 aa579aa0
Pipeline #10618 failed with stages
in 9 minutes and 12 seconds
......@@ -15,3 +15,20 @@
- Add methods to `disconnect` an endpoint
- Internal: PoC for incremental dependency tracking and subtree endpoint definitions ([#14](https://git-st.inf.tu-dresden.de/jastadd/ragconnect/-/issues/14))
- Bugfix [#17](https://git-st.inf.tu-dresden.de/jastadd/ragconnect/-/issues/17): Added missing support for `boolean`
## 0.2.2
- Allow normal tokens to be used in send definitions
## 0.2.1
- New communication protocol: REST
- Selection of protocol when `connect` methods are called, by scheme of given URI
- Development changes:
- Supported printing out YAML data used for mustache templates
- Moved string constants to `MRagConnect` structure
## 0.2.0
- Version submitted in paper "A Connection from ROS to RAG-Based Models" (2020)
- Supported communication protocols: MQTT
......@@ -95,4 +95,10 @@ aspect Analysis {
to MappingDefinition.effectiveUsedAt()
for each effectiveMappings();
// --- typeIsList ---
syn boolean EndpointDefinition.typeIsList() = false;
eq TypeEndpointDefinition.typeIsList() {
return getType().isListComponent();
}
}
......@@ -3,6 +3,7 @@ aspect Configuration {
public static boolean ASTNode.loggingEnabledForWrites = false;
public static boolean ASTNode.loggingEnabledForIncremental = false;
public static TypeDecl ASTNode.rootNode;
public static String ASTNode.JastAddList = "List";
public static boolean ASTNode.usesMqtt;
public static boolean ASTNode.usesRest;
public static boolean ASTNode.incrementalOptionActive;
......
......@@ -13,10 +13,10 @@ rel TokenEndpointDefinition.Token <-> TokenComponent.TokenEndpointDefinition*;
ReceiveTokenEndpointDefinition : TokenEndpointDefinition;
SendTokenEndpointDefinition : TokenEndpointDefinition;
abstract TypeEndpointDefinition : EndpointDefinition;
abstract TypeEndpointDefinition : EndpointDefinition ::= <UseList:boolean> ;
rel TypeEndpointDefinition.Type <-> TypeComponent.TypeEndpointDefinition*;
ReceiveTypeEndpointDefinition : TypeEndpointDefinition;
ReceiveTypeEndpointDefinition : TypeEndpointDefinition ::= <WithAdd:boolean>;
SendTypeEndpointDefinition : TypeEndpointDefinition;
DependencyDefinition ::= <ID>;
......
......@@ -4,6 +4,9 @@ Design considerations
*/
aspect AttributesForMustache {
// --- EndpointDefinition ---
syn String EndpointDefinition.idTokenName() = "InternalRagconnectTopicInList";
// --- MRagConnect ---
eq MRagConnect.getRootTypeComponent(int i).isFirst() = i == 0;
......@@ -15,6 +18,32 @@ aspect AttributesForMustache {
syn String MRagConnect.restHandlerAttribute() = "_restHandler";
syn String MRagConnect.restHandlerField() = "_restHandler";
syn boolean MRagConnect.hasTreeListEndpoints() = !sendingTreeListEndpoints().isEmpty() || !receivingTreeListEndpoints().isEmpty();
syn List<MTypeEndpointDefinition> MRagConnect.sendingTreeListEndpoints() {
List<MTypeEndpointDefinition> result = new ArrayList<>();
for (var mEndpointDef : getTypeSendDefinitionList()) {
if (mEndpointDef.typeIsList()) {
result.add(mEndpointDef);
}
}
return result;
}
syn List<MTypeEndpointDefinition> MRagConnect.receivingTreeListEndpoints() {
List<MTypeEndpointDefinition> result = new ArrayList<>();
for (var mEndpointDef : getTypeReceiveDefinitionList()) {
if (mEndpointDef.typeIsList()) {
result.add(mEndpointDef);
}
}
return result;
}
syn List<TypeDecl> MRagConnect.typesForReceivingListEndpoints() {
return receivingTreeListEndpoints().stream()
.map(mEndpointDef -> mEndpointDef.type().getTypeDecl())
.distinct()
.collect(java.util.stream.Collectors.toList());
}
// --- MEndpointDefinition ---
syn String MEndpointDefinition.preemptiveExpectedValue();
syn String MEndpointDefinition.preemptiveReturn();
......@@ -24,12 +53,14 @@ aspect AttributesForMustache {
syn String MEndpointDefinition.entityName();
syn String MEndpointDefinition.updateMethod();
syn String MEndpointDefinition.writeMethod();
syn String MEndpointDefinition.getterMethod();
eq MEndpointDefinition.getInnerMappingDefinition(int i).isLast() = i == getNumInnerMappingDefinition() - 1;
eq MEndpointDefinition.getInnerMappingDefinition(int i).inputVarName() = i == 0 ? firstInputVarName() : getInnerMappingDefinition(i - 1).outputVarName();
syn String MEndpointDefinition.connectParameterName() = "uriString";
syn String MEndpointDefinition.connectMethod() = "connect" + entityName();
syn String MEndpointDefinition.internalConnectMethod() = "_internal_" + connectMethod();
syn boolean MEndpointDefinition.isTypeEndpointDefinition() = endpointDef().isTypeEndpointDefinition();
syn String MEndpointDefinition.disconnectMethod() {
......@@ -55,6 +86,7 @@ aspect AttributesForMustache {
syn TokenComponent MEndpointDefinition.token() = endpointDef().asTokenEndpointDefinition().getToken();
syn TypeComponent MEndpointDefinition.type() = endpointDef().asTypeEndpointDefinition().getType();
syn boolean MEndpointDefinition.alwaysApply() = endpointDef().getAlwaysApply();
syn boolean MEndpointDefinition.typeIsList() = endpointDef().typeIsList();
syn String MEndpointDefinition.tokenName() = token().getName();
syn String MEndpointDefinition.typeName() = type().getName();
syn String MEndpointDefinition.typeDeclName() = type().getTypeDecl().getName();
......@@ -69,6 +101,10 @@ aspect AttributesForMustache {
if (endpointDef().isTokenEndpointDefinition() && token().isPrimitiveType() && lastDefinition().mappingDef().getToType().isPrimitiveType()) {
return preemptiveExpectedValue() + " == " + lastResult();
}
if (endpointDef().isReceiveTypeEndpointDefinition() && endpointDef().asReceiveTypeEndpointDefinition().getWithAdd()) {
// only check if received list is not null
return lastResult() + " == null";
}
if (endpointDef().isTypeEndpointDefinition() && type().isOptComponent()) {
// use "hasX()" instead of "getX() != null" for optionals
return "has" + typeName() + "()" + " && " + preemptiveExpectedValue() + ".equals(" + lastResult() + ")";
......@@ -80,12 +116,16 @@ aspect AttributesForMustache {
}
// --- MTokenEndpointDefinition ---
eq MTokenEndpointDefinition.getterMethod() = "get" + tokenName();
eq MTokenEndpointDefinition.parentTypeName() = token().containingTypeDecl().getName();
eq MTokenEndpointDefinition.entityName() = tokenName();
// --- MTypeEndpointDefinition ---
syn boolean MTypeEndpointDefinition.isWithAdd() = endpointDef().isReceiveTypeEndpointDefinition() ? endpointDef().asReceiveTypeEndpointDefinition().getWithAdd() : false;
syn boolean MTypeEndpointDefinition.isUseList() = endpointDef().asTypeEndpointDefinition().getUseList();
eq MTypeEndpointDefinition.getterMethod() = "get" + typeName() + (typeIsList() ? "List" : "");
eq MTypeEndpointDefinition.parentTypeName() = type().containingTypeDecl().getName();
eq MTypeEndpointDefinition.entityName() = typeName();
eq MTypeEndpointDefinition.entityName() = typeName() + (isUseList() ? "List" : "");
// --- MInnerMappingDefinition ---
inh boolean MInnerMappingDefinition.isLast();
......@@ -96,7 +136,7 @@ aspect AttributesForMustache {
syn String MInnerMappingDefinition.outputVarName() = "result" + methodName(); // we do not need "_" in between here, because methodName begins with one
// --- MTokenReceiveDefinition ---
eq MTokenReceiveDefinition.preemptiveExpectedValue() = "get" + tokenName() + "()";
eq MTokenReceiveDefinition.preemptiveExpectedValue() = getterMethod() + "()";
eq MTokenReceiveDefinition.preemptiveReturn() = "return;";
eq MTokenReceiveDefinition.endpointDef() = getReceiveTokenEndpointDefinition();
eq MTokenReceiveDefinition.firstInputVarName() = "message";
......@@ -107,34 +147,37 @@ aspect AttributesForMustache {
eq MTokenSendDefinition.preemptiveExpectedValue() = lastValue();
eq MTokenSendDefinition.preemptiveReturn() = "return false;";
eq MTokenSendDefinition.endpointDef() = getSendTokenEndpointDefinition();
eq MTokenSendDefinition.firstInputVarName() = "get" + tokenName() + "()";
eq MTokenSendDefinition.firstInputVarName() = getterMethod() + "()";
eq MTokenSendDefinition.updateMethod() = "_update_" + tokenName();
eq MTokenSendDefinition.writeMethod() = "_writeLastValue_" + tokenName();
syn String MTokenSendDefinition.sender() = "_sender_" + tokenName();
syn String MTokenSendDefinition.lastValue() = "_lastValue" + tokenName();
syn String MTokenSendDefinition.tokenResetMethod() = "get" + tokenName() + "_reset";
syn String MTokenSendDefinition.tokenResetMethod() = getterMethod() + "_reset";
syn boolean MTokenSendDefinition.shouldSendValue() = endpointDef().asTokenEndpointDefinition().shouldSendValue();
// MTypeReceiveDefinition
eq MTypeReceiveDefinition.preemptiveExpectedValue() = "get" + typeName() + "()";
eq MTypeReceiveDefinition.preemptiveExpectedValue() = getterMethod() + "()";
eq MTypeReceiveDefinition.preemptiveReturn() = "return;";
eq MTypeReceiveDefinition.endpointDef() = getReceiveTypeEndpointDefinition();
eq MTypeReceiveDefinition.firstInputVarName() = "message";
eq MTypeReceiveDefinition.updateMethod() = null;
eq MTypeReceiveDefinition.writeMethod() = null;
syn String MTypeReceiveDefinition.resolveInListAttributeName() = "resolve" + entityName() + "InList";
syn String MTypeReceiveDefinition.idTokenName() = endpointDef().idTokenName();
// MTypeSendDefinition
eq MTypeSendDefinition.preemptiveExpectedValue() = lastValue();
eq MTypeSendDefinition.preemptiveReturn() = "return false;";
eq MTypeSendDefinition.endpointDef() = getSendTypeEndpointDefinition();
eq MTypeSendDefinition.firstInputVarName() = "get" + typeName() + "()";
eq MTypeSendDefinition.firstInputVarName() = getterMethod() + "()";
eq MTypeSendDefinition.updateMethod() = "_update_" + typeName();
eq MTypeSendDefinition.writeMethod() = "_writeLastValue_" + typeName();
syn String MTypeSendDefinition.sender() = "_sender_" + typeName();
syn String MTypeSendDefinition.lastValue() = "_lastValue" + typeName();
syn String MTypeSendDefinition.tokenResetMethod() = "get" + typeName() + "_reset";
syn String MTypeSendDefinition.tokenResetMethod() = getterMethod() + "_reset";
syn boolean MTypeSendDefinition.shouldSendValue() = endpointDef().asTypeEndpointDefinition().shouldSendValue();
// --- MMappingDefinition ---
......@@ -313,7 +356,7 @@ aspect AspectGeneration {
}
}
aspect RelationGeneration {
aspect GrammarGeneration {
syn java.util.List<Relation> RagConnect.additionalRelations() {
java.util.List<Relation> result = new java.util.ArrayList<>();
for (DependencyDefinition dd : allDependencyDefinitionList()) {
......@@ -334,6 +377,37 @@ aspect RelationGeneration {
result.addComment(new WhitespaceComment("\n"));
return result;
}
// coll java.util.Map<TypeDecl, TokenComponent> RagConnect.additionalTokens() [new java.util.HashMap<>()] with put root RagConnect;
// TypeEndpointDefinition contributes getTokenToCreate()
// when typeIsList() && !getUseList()
// to RagConnect.additionalTokens()
//// for ragconnect()
// ;
syn java.util.Map<TypeDecl, TokenComponent> RagConnect.additionalTokens() {
java.util.Map<TypeDecl, TokenComponent> result = new java.util.HashMap<>();
for (EndpointDefinition def : allEndpointDefinitionList()) {
if (def.isTypeEndpointDefinition() && def.getTokenToCreate() != null) {
result.put(def.asTypeEndpointDefinition().getType().getTypeDecl(), def.getTokenToCreate());
}
}
return result;
}
syn TokenComponent EndpointDefinition.getTokenToCreate() = null;
eq TypeEndpointDefinition.getTokenToCreate() {
if (typeIsList() && !getUseList()) {
TokenComponent result = new TokenComponent();
result.setName(idTokenName());
result.setNTA(false);
result.setJavaTypeUse(new SimpleJavaTypeUse("String"));
return result;
} else {
return null;
}
}
}
aspect GrammarExtension {
......
aspect DefaultMappings {
private String RagConnect.baseDefaultMappingTypeNamePart(String typeName) {
return capitalize(typeName).replace("[]", "s");
return capitalize(typeName).replace("[]", "s").replace("<", "").replace(">", "List");
}
private MappingDefinitionType RagConnect.baseDefaultMappingTypeFromName(String typeName) {
......@@ -67,6 +67,28 @@ aspect DefaultMappings {
);
}
syn nta DefaultMappingDefinition RagConnect.defaultBytesToListTreeMapping(String typeName) {
return treeDefaultMappingDefinition("byte[]", JastAddList + "<" + typeName + ">",
"String content = new String(input);\n" +
"com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();\n" +
"com.fasterxml.jackson.core.JsonFactory factory = new com.fasterxml.jackson.core.JsonFactory();\n" +
"com.fasterxml.jackson.core.JsonParser parser = factory.createParser(content);\n" +
JastAddList + "<" + typeName + ">" + " result = " + typeName + ".deserializeList((com.fasterxml.jackson.databind.node.ArrayNode)mapper.readTree(parser));\n" +
"parser.close();\n" +
"return result;"
);
}
syn nta DefaultMappingDefinition RagConnect.defaultListTreeToBytesMapping() {
return treeDefaultMappingDefinition(JastAddList, "byte[]",
"java.io.ByteArrayOutputStream outputStream = new java.io.ByteArrayOutputStream();\n" +
"com.fasterxml.jackson.core.JsonFactory factory = new com.fasterxml.jackson.core.JsonFactory();\n" +
"com.fasterxml.jackson.core.JsonGenerator generator = factory.createGenerator(outputStream, com.fasterxml.jackson.core.JsonEncoding.UTF8);\n"+
"input.serialize(generator);\n" +
"generator.flush();\n" +
"return outputStream.toString().getBytes();"
);
}
syn nta DefaultMappingDefinition RagConnect.defaultBooleanToBytesMapping() = baseDefaultMappingDefinition(
"boolean", "byte[]", "return java.nio.ByteBuffer.allocate(1).put((byte) (input ? 1 : 0)).array();");
syn nta DefaultMappingDefinition RagConnect.defaultIntToBytesMapping() = baseDefaultMappingDefinition(
......@@ -187,13 +209,22 @@ aspect Mappings {
case "String": return ragconnect().defaultBytesToStringMapping();
default:
try {
TypeDecl typeDecl = program().resolveTypeDecl(targetTypeName());
return ragconnect().defaultBytesToTreeMapping(typeDecl.getName());
TypeDecl typeDecl = program().resolveTypeDecl(targetTypeName());
// TODO: also support list-types, if list is first type
return ragconnect().defaultBytesToTreeMapping(typeDecl.getName());
} catch (Exception ignore) {}
System.err.println("Could not find suitable default mapping for " + targetTypeName() + " on " + this);
return null;
}
}
eq TypeEndpointDefinition.suitableReceiveDefaultMapping() {
try {
TypeDecl typeDecl = program().resolveTypeDecl(targetTypeName());
return typeIsList() && getUseList() ? ragconnect().defaultBytesToListTreeMapping(typeDecl.getName()) : ragconnect().defaultBytesToTreeMapping(typeDecl.getName());
} catch (Exception ignore) {}
return super.suitableReceiveDefaultMapping();
}
// --- suitableSendDefaultMapping ---
syn DefaultMappingDefinition EndpointDefinition.suitableSendDefaultMapping() {
switch (targetTypeName()) {
......@@ -214,13 +245,21 @@ aspect Mappings {
case "String": return ragconnect().defaultStringToBytesMapping();
default:
try {
TypeDecl typeDecl = program().resolveTypeDecl(targetTypeName());
return ragconnect().defaultTreeToBytesMapping(typeDecl.getName());
TypeDecl typeDecl = program().resolveTypeDecl(targetTypeName());
// TODO: also support list-types, if list is last type
return ragconnect().defaultTreeToBytesMapping(typeDecl.getName());
} catch (Exception ignore) {}
System.err.println("Could not find suitable default mapping for " + targetTypeName() + " on " + this);
return null;
}
}
eq TypeEndpointDefinition.suitableSendDefaultMapping() {
try {
TypeDecl typeDecl = program().resolveTypeDecl(targetTypeName());
return typeIsList() && getUseList() ? ragconnect().defaultListTreeToBytesMapping() : ragconnect().defaultTreeToBytesMapping(typeDecl.getName());
} catch (Exception ignore) {}
return super.suitableSendDefaultMapping();
}
// --- targetTypeName ---
syn String EndpointDefinition.targetTypeName();
......@@ -320,7 +359,9 @@ aspect Mappings {
for (TypeDecl typeDecl : getProgram().typeDecls()) {
result.add(defaultBytesToTreeMapping(typeDecl.getName()));
result.add(defaultTreeToBytesMapping(typeDecl.getName()));
result.add(defaultBytesToListTreeMapping(typeDecl.getName()));
}
result.add(defaultListTreeToBytesMapping());
// // string conversion
// result.add(defaultStringToBooleanMapping());
// result.add(defaultStringToIntMapping());
......
......@@ -121,12 +121,14 @@ aspect MustacheNodesToYAML {
syn MappingElement MTypeReceiveDefinition.toYAML() {
MappingElement result = super.toYAML();
result.put("typeIsList", typeIsList());
result.put("loggingEnabledForReads", loggingEnabledForReads);
return result;
}
syn MappingElement MTypeSendDefinition.toYAML() {
MappingElement result = super.toYAML();
result.put("typeIsList", typeIsList());
result.put("sender", sender());
result.put("lastValue", lastValue());
result.put("loggingEnabledForWrites", loggingEnabledForWrites);
......
......@@ -46,7 +46,36 @@ EndpointDefinition endpoint_definition_type
= RECEIVE token_ref {: return new ReceiveTokenEndpointDefinition().setToken(token_ref); :}
| SEND token_ref {: return new SendTokenEndpointDefinition().setToken(token_ref); :}
| RECEIVE TREE type_ref {: return new ReceiveTypeEndpointDefinition().setType(type_ref); :}
| RECEIVE TREE WITH ADD type_ref
{:
ReceiveTypeEndpointDefinition result = new ReceiveTypeEndpointDefinition();
result.setType(type_ref);
result.setWithAdd(true);
return result;
:}
| SEND TREE type_ref {: return new SendTypeEndpointDefinition().setType(type_ref); :}
| RECEIVE LIST type_ref
{:
ReceiveTypeEndpointDefinition result = new ReceiveTypeEndpointDefinition();
result.setType(type_ref);
result.setUseList(true);
return result;
:}
| RECEIVE LIST WITH ADD type_ref
{:
ReceiveTypeEndpointDefinition result = new ReceiveTypeEndpointDefinition();
result.setType(type_ref);
result.setWithAdd(true);
result.setUseList(true);
return result;
:}
| SEND LIST type_ref
{:
SendTypeEndpointDefinition result = new SendTypeEndpointDefinition();
result.setType(type_ref);
result.setUseList(true);
return result;
:}
;
TokenComponent token_ref
......
......@@ -6,3 +6,6 @@
"to" { return sym(Terminals.TO); }
"as" { return sym(Terminals.AS); }
"tree" { return sym(Terminals.TREE); }
"list" { return sym(Terminals.LIST); }
"with" { return sym(Terminals.WITH); }
"add" { return sym(Terminals.ADD); }
......@@ -232,6 +232,7 @@ public class Compiler extends AbstractCompiler {
ragConnect.treeResolveAll();
ragConnect.additionalRelations().forEach(ragConnectGrammarPart::addDeclaration);
ragConnect.additionalTokens().forEach(TypeDecl::addComponent);
ASTNode.loggingEnabledForReads = optionLogReads.value();
ASTNode.loggingEnabledForWrites = optionLogWrites.value();
ASTNode.loggingEnabledForIncremental = optionLogIncremental.value();
......@@ -241,6 +242,9 @@ public class Compiler extends AbstractCompiler {
ASTNode.incrementalOptionActive = getConfiguration().incremental() && getConfiguration().traceFlush();
LOGGER.fine(() -> "ASTNode.incrementalOptionActive = " + ASTNode.incrementalOptionActive);
// reuse "--List" option of JastAdd
ASTNode.JastAddList = getConfiguration().listType();
ASTNode.usesMqtt = optionProtocols.hasValue(OPTION_PROTOCOL_MQTT);
ASTNode.usesRest = optionProtocols.hasValue(OPTION_PROTOCOL_REST);
return ragConnect;
......
{{#hasTreeListEndpoints}}
public void {{JastAddList}}.serialize(com.fasterxml.jackson.core.JsonGenerator g) throws SerializationException {
try {
g.writeStartArray();
for (T child : this) {
child.serialize(g);
}
g.writeEndArray();
} catch (java.io.IOException e) {
throw new SerializationException("unable to serialize {{JastAddList}}", e);
}
}
{{#typesForReceivingListEndpoints}}
public static {{JastAddList}}<{{Name}}> {{Name}}.deserializeList(com.fasterxml.jackson.databind.node.ArrayNode node) throws DeserializationException {
{{JastAddList}}<{{Name}}> result = new {{JastAddList}}<>();
for (java.util.Iterator<com.fasterxml.jackson.databind.JsonNode> it = node.elements(); it.hasNext();) {
com.fasterxml.jackson.databind.JsonNode element = it.next();
result.add(deserialize(element));
}
return result;
}
{{/typesForReceivingListEndpoints}}
{{/hasTreeListEndpoints}}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
aspect MqttHandler {
import java.util.function.BiConsumer;aspect MqttHandler {
public class MqttServerHandler {
private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>();
private final java.util.Map<ConnectToken, Object> tokensForRemoval = new java.util.HashMap<>();
private final java.util.Map<ConnectToken, java.util.function.BiConsumer<String, byte[]>> tokensForRemoval = new java.util.HashMap<>();
private long time;
private java.util.concurrent.TimeUnit unit;
private String name;
......@@ -16,7 +15,7 @@ public class MqttServerHandler {
public MqttServerHandler(String name) {
this.name = name;
setupWaitUntilReady(1, TimeUnit.SECONDS);
setupWaitUntilReady(1, java.util.concurrent.TimeUnit.SECONDS);
}
public void setupWaitUntilReady(long time, java.util.concurrent.TimeUnit unit) {
......@@ -24,7 +23,7 @@ public class MqttServerHandler {
this.unit = unit;
}
public MqttHandler resolveHandler(java.net.URI uri) throws IOException {
public MqttHandler resolveHandler(java.net.URI uri) throws java.io.IOException {
MqttHandler handler = handlers.get(uri.getHost());
if (handler == null) {
// first connect to that server
......@@ -40,33 +39,37 @@ public class MqttServerHandler {
return handler;
}
public ConnectToken newConnection(java.net.URI uri, java.util.function.Consumer<byte[]> callback) throws IOException {
public ConnectToken newConnection(java.net.URI uri, java.util.function.BiConsumer<String, byte[]> callback) throws java.io.IOException {
ConnectToken connectToken = new ConnectToken(uri);
resolveHandler(uri).newConnection(extractTopic(uri), callback);
tokensForRemoval.put(connectToken, callback);
return connectToken;
}
public boolean disconnect(ConnectToken connectToken) throws IOException {
public boolean disconnect(ConnectToken connectToken) throws java.io.IOException {
MqttHandler handler = resolveHandler(connectToken.uri);
return handler != null ? handler.disconnect(extractTopic(connectToken.uri), tokensForRemoval.get(connectToken)) : false;
}
public void publish(java.net.URI uri, byte[] bytes) throws IOException {
public void publish(java.net.URI uri, byte[] bytes) throws java.io.IOException {
resolveHandler(uri).publish(extractTopic(uri), bytes);
}
public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws IOException {
public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws java.io.IOException {
resolveHandler(uri).publish(extractTopic(uri), bytes, retain);
}
public void publish(java.net.URI uri, byte[] bytes,
org.fusesource.mqtt.client.QoS qos, boolean retain) throws IOException {
org.fusesource.mqtt.client.QoS qos, boolean retain) throws java.io.IOException {
resolveHandler(uri).publish(extractTopic(uri), bytes, qos, retain);
}
public static String extractTopic(java.net.URI uri) {
String path = uri.getPath();
if (uri.getFragment() != null) {
// do not also append fragment, as it is illegal, that anything follows "#" in a mqtt topic anyway
path += "#";
}
if (path.charAt(0) == '/') {
path = path.substring(1);
}
......@@ -100,7 +103,8 @@ public class MqttHandler {
private boolean sendWelcomeMessage = true;
private org.fusesource.mqtt.client.QoS qos;
/** Dispatch knowledge */
private final java.util.Map<String, java.util.List<java.util.function.Consumer<byte[]>>> callbacks;
private final java.util.Map<String, java.util.List<java.util.function.BiConsumer<String, byte[]>>> normalCallbacks;
private final java.util.Map<java.util.regex.Pattern, java.util.List<java.util.function.BiConsumer<String, byte[]>>> wildcardCallbacks;
public MqttHandler() {
this("RagConnect");
......@@ -109,7 +113,8 @@ public class MqttHandler {
public MqttHandler(String name) {
this.name = java.util.Objects.requireNonNull(name, "Name must be set");
this.logger = org.apache.logging.log4j.LogManager.getLogger(MqttHandler.class);
this.callbacks = new java.util.HashMap<>();
this.normalCallbacks = new java.util.HashMap<>();
this.wildcardCallbacks = new java.util.HashMap<>();
this.readyLatch = new java.util.concurrent.CountDownLatch(1);
this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE;
}
......@@ -122,21 +127,21 @@ public class MqttHandler {
/**
* Sets the host to receive messages from, and connects to it.
* @param host name of the host to connect to, format is either <code>"$name"</code> or <code>"$name:$port"</code>
* @throws IOException if could not connect, or could not subscribe to a topic
* @throws java.io.IOException if could not connect, or could not subscribe to a topic
* @return self
*/
public MqttHandler setHost(String host) throws java.io.IOException {
if (host.contains(":")) {
int colon_index = host.indexOf(":");
return setHost(host.substring(0, colon_index),
Integer.parseInt(host.substring(colon_index + 1)));
Integer.parseInt(host.substring(colon_index + 1)));
}
return setHost(host, DEFAULT_PORT);
}
/**
* Sets the host to receive messages from, and connects to it.
* @throws IOException if could not connect, or could not subscribe to a topic
* @throws java.io.IOException if could not connect, or could not subscribe to a topic
* @return self
*/
public MqttHandler setHost(String host, int port) throws java.io.IOException {
......@@ -167,14 +172,14 @@ public class MqttHandler {
org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) {
// this method is called, whenever a MQTT message is received
String topicString = topic.toString();
java.util.List<java.util.function.Consumer<byte[]>> callbackList = new java.util.ArrayList<>(callbacks.get(topicString));
if (callbackList == null || callbackList.isEmpty()) {
java.util.List<java.util.function.BiConsumer<String, byte[]>> callbackList = callbacksFor(topicString);
if (callbackList.isEmpty()) {
logger.debug("Got a message at {}, but no callback to call. Forgot to subscribe?", topic);
} else {
byte[] message = body.toByteArray();
for (java.util.function.Consumer<byte[]> callback : callbackList) {
for (java.util.function.BiConsumer<String, byte[]> callback : callbackList) {
try {
callback.accept(message);
callback.accept(topicString, message);
} catch (Exception e) {
logger.catching(e);
}
......@@ -199,20 +204,20 @@ public class MqttHandler {
throwIf(error);
// actually establish the connection
connection.connect(new org.fusesource.mqtt.client.Callback<Void>() {
connection.connect(new org.fusesource.mqtt.client.Callback<>() {
@Override
public void onSuccess(Void value) {
if (MqttHandler.this.sendWelcomeMessage) {
connection.publish("components",
(name + " is connected").getBytes(),
org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE,
false,
new org.fusesource.mqtt.client.Callback<Void>() {
@Override
public void onSuccess(Void value) {
logger.debug("success sending welcome message");
setReady();
}
(name + " is connected").getBytes(),
org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE,
false,
new org.fusesource.mqtt.client.Callback<>() {
@Override
public void onSuccess(Void value) {
logger.debug("success sending welcome message");
setReady();
}
@Override
public void onFailure(Throwable value) {
......@@ -233,6 +238,20 @@ public class MqttHandler {
return this;
}