Commit 8b5fe01e authored by Sebastian Ebert's avatar Sebastian Ebert
Browse files

extended impl. for disconnects, fixed bugs related to datatransfer and config and code-gen

parent 65f59ce4
...@@ -31,6 +31,8 @@ dependencies { ...@@ -31,6 +31,8 @@ dependencies {
implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: "${log4j_version}" implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: "${log4j_version}"
implementation group: 'org.apache.logging.log4j', name: 'log4j-jul', version: "${log4j_version}" implementation group: 'org.apache.logging.log4j', name: 'log4j-jul', version: "${log4j_version}"
implementation group: 'com.github.spullara.mustache.java', name: 'compiler', version: "${mustache_java_version}" implementation group: 'com.github.spullara.mustache.java', name: 'compiler', version: "${mustache_java_version}"
// https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.0'
runtimeOnly group: 'org.jastadd', name: 'jastadd', version: '2.3.4' runtimeOnly group: 'org.jastadd', name: 'jastadd', version: '2.3.4'
api group: 'net.sf.beaver', name: 'beaver-rt', version: '0.9.11' api group: 'net.sf.beaver', name: 'beaver-rt', version: '0.9.11'
} }
......
...@@ -61,6 +61,18 @@ public class Compiler extends AbstractCompiler { ...@@ -61,6 +61,18 @@ public class Compiler extends AbstractCompiler {
RagConnect ragConnect = parseProgram(getConfiguration().getFiles()); RagConnect ragConnect = parseProgram(getConfiguration().getFiles());
// lets inspect ragconnects tree
/*
for(EndpointDefinition ed : ragConnect.getEndpointDefinitionList()){
if(ed.isReceiveTokenEndpointDefinition()){
for(MappingDefinition md : ed.asReceiveTokenEndpointDefinition().effectiveMappings()){
// if (!effectiveMappings().get(effectiveMappings().size() - 1).getToType().assignableTo(
// getToken().effectiveJavaTypeUse())) {
System.out.println("gtt: " + md.getToType());
}
}
}
*/
if (!ragConnect.errors().isEmpty()) { if (!ragConnect.errors().isEmpty()) {
System.err.println("Errors:"); System.err.println("Errors:");
for (ErrorMessage e : ragConnect.errors()) { for (ErrorMessage e : ragConnect.errors()) {
...@@ -243,7 +255,7 @@ public class Compiler extends AbstractCompiler { ...@@ -243,7 +255,7 @@ public class Compiler extends AbstractCompiler {
grammarFile.dumpTree(System.out); grammarFile.dumpTree(System.out);
} }
program.addGrammarFile(grammarFile); program.addGrammarFile(grammarFile);
grammarFile.treeResolveAll(); //grammarFile.treeResolveAll();
grammarFile.setFileName(toBaseName(filename)); grammarFile.setFileName(toBaseName(filename));
} catch (IOException | Parser.Exception e) { } catch (IOException | Parser.Exception e) {
throw new CompilerException("Could not parse grammar file " + filename, e); throw new CompilerException("Could not parse grammar file " + filename, e);
......
...@@ -16,7 +16,7 @@ aspect JavaHandler { ...@@ -16,7 +16,7 @@ aspect JavaHandler {
private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(JavaHandler.class); private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(JavaHandler.class);
private Map<String, List<Consumer<byte[]>>> callbackList = new ConcurrentHashMap<>(); private Map<String, List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>>> callbackList = new ConcurrentHashMap<>();
private JavaHandler() { private JavaHandler() {
...@@ -29,21 +29,44 @@ aspect JavaHandler { ...@@ -29,21 +29,44 @@ aspect JavaHandler {
return JAVA_HANDLER_INSTANCE; return JAVA_HANDLER_INSTANCE;
} }
public boolean registerCallback(String topic, Consumer<byte[]> callback) { public String registerCallback(String topic, Consumer<byte[]> callback) {
logger.debug("[JAVA_HANDLER] Registering new callback."); logger.debug("[JAVA_HANDLER] Registering new callback.");
List<Consumer<byte[]>> registeredCallbacks = getAllCallbacks().get(topic); String callbackUUID = java.util.UUID.randomUUID().toString();
List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> registeredCallbacks = getAllCallbacks().get(topic);
if(registeredCallbacks == null){ if(registeredCallbacks == null){
List<Consumer<byte[]>> newCallbackList = Collections.synchronizedList(new ArrayList<>()); List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> newCallbackList = Collections.synchronizedList(new ArrayList<>());
newCallbackList.add(callback); newCallbackList.add(new org.apache.commons.lang3.tuple.MutablePair<>(callbackUUID, callback));
callbackList.put(topic, newCallbackList); callbackList.put(topic, newCallbackList);
} else { } else {
registeredCallbacks.add(callback); registeredCallbacks.add(new org.apache.commons.lang3.tuple.MutablePair<>(callbackUUID, callback));
}
return callbackUUID;
} }
public boolean unregisterCallback(String path, String uuid){
logger.debug("[JAVA_HANDLER] Unregistering callback with uuid: " + uuid + " on path: " + path);
List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> callbacks = getAllCallbacks().get(path);
int count = 0;
if(callbacks != null){
for(org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>> callbackPair : callbacks){
if(callbackPair.getLeft().equals(uuid)){
callbacks.remove(count);
return true; return true;
}else{
count++;
}
}
}
return false;
} }
public void close(){}; public void close(){};
...@@ -52,22 +75,22 @@ aspect JavaHandler { ...@@ -52,22 +75,22 @@ aspect JavaHandler {
logger.debug("[JAVA_HANDLER] Pushing a message."); logger.debug("[JAVA_HANDLER] Pushing a message.");
List<Consumer<byte[]>> callbacks = getAllCallbacks().get(topic); List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> callbacks = getAllCallbacks().get(topic);
if(callbacks == null){ if(callbacks == null){
logger.error("[JAVA_HANDLER] Could not publish message. No callback registered for topic " + topic); logger.error("[JAVA_HANDLER] Could not publish message. No callback registered for topic " + topic);
return false; return false;
} }
for(Consumer<byte[]> callback : callbacks){ for(org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>> callbackPair : callbacks){
logger.debug("[JAVA_HANDLER] Calling callback: " + callback.toString()); logger.debug("[JAVA_HANDLER] Calling callback: " + callbackPair.getLeft());
callback.accept(data); callbackPair.getRight().accept(data);
} }
return true; return true;
} }
public Map<String, List<Consumer<byte[]>>> getAllCallbacks() { public Map<String, List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>>> getAllCallbacks() {
return callbackList; return callbackList;
} }
} }
......
aspect RagConnectHandler { aspect RagConnectHandler {
{{#Handlers}} {{#Handlers}}
{{#InUse}} {{#InUse}}
...@@ -15,14 +16,19 @@ aspect RagConnectHandler { ...@@ -15,14 +16,19 @@ aspect RagConnectHandler {
{{/Handlers}} {{/Handlers}}
} }
class ConnectToken { class ConnectToken {
static java.util.concurrent.atomic.AtomicLong counter = new java.util.concurrent.atomic.AtomicLong(0); //static java.util.concurrent.atomic.AtomicLong counter = new java.util.concurrent.atomic.AtomicLong(0);
final long id; final String id;
final java.net.URI uri; final java.net.URI uri;
public ConnectToken(java.net.URI uri) { public ConnectToken(java.net.URI uri) {
this.id = counter.incrementAndGet(); this.id = java.util.UUID.randomUUID().toString(); // counter.incrementAndGet();
this.uri = uri; this.uri = uri;
} }
public ConnectToken(java.net.URI uri, String uuid) {
this.id = uuid; // counter.incrementAndGet();
this.uri = uri;
}
} }
static java.util.Map<ASTNode, java.util.Map<java.net.URI, ConnectToken>> ASTNode.connectTokens = new java.util.HashMap<>(); static java.util.Map<ASTNode, java.util.Map<java.net.URI, ConnectToken>> ASTNode.connectTokens = new java.util.HashMap<>();
} }
...@@ -18,7 +18,13 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam ...@@ -18,7 +18,13 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
break; break;
{{/usesMqtt}} {{/usesMqtt}}
{{#usesJava}} {{#usesJava}}
case "java": return {{javaHandlerAttribute}}().registerCallback(path, consumer); case "java":
String uuid = {{javaHandlerAttribute}}().registerCallback(path, consumer);
connectToken = new ConnectToken(uri, uuid);
if (uuid == null) {
return false;
}
break;
{{/usesJava}} {{/usesJava}}
{{#usesRest}} {{#usesRest}}
case "rest": case "rest":
...@@ -45,6 +51,9 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter ...@@ -45,6 +51,9 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter
{{#usesMqtt}} {{#usesMqtt}}
case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
{{/usesMqtt}} {{/usesMqtt}}
{{#usesJava}}
case "java": return {{javaHandlerAttribute}}().unregisterCallback(uri.getPath(), connectTokens.get(this).get(uri).id);
{{/usesJava}}
{{#usesRest}} {{#usesRest}}
case "rest": return {{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); case "rest": return {{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
{{/usesRest}} {{/usesRest}}
......
...@@ -62,6 +62,11 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter ...@@ -62,6 +62,11 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter
{{lastValue}} = null; {{lastValue}} = null;
break; break;
{{/usesMqtt}} {{/usesMqtt}}
{{#usesJava}}
case "java":
// nothing todo, because senders are not registered (just callbacks)
break;
{{/usesJava}}
{{#usesRest}} {{#usesRest}}
case "rest": case "rest":
{{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); {{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment