From 8b5fe01eae242bf607621b37733cc7b12b625050 Mon Sep 17 00:00:00 2001
From: SebastianEbert <sebastian.ebert@tu-dresden.de>
Date: Fri, 5 Feb 2021 13:54:43 +0100
Subject: [PATCH] extended impl. for disconnects, fixed bugs related to
 datatransfer and config and code-gen

---
 ragconnect.base/build.gradle                  |  2 +
 .../jastadd/ragconnect/compiler/Compiler.java | 14 +++++-
 .../src/main/resources/JavaHandler.jadd       | 47 ++++++++++++++-----
 .../src/main/resources/handler.mustache       | 12 +++--
 .../main/resources/receiveDefinition.mustache | 11 ++++-
 .../main/resources/sendDefinition.mustache    |  5 ++
 6 files changed, 74 insertions(+), 17 deletions(-)

diff --git a/ragconnect.base/build.gradle b/ragconnect.base/build.gradle
index b3e89a6..2b8fe2a 100644
--- a/ragconnect.base/build.gradle
+++ b/ragconnect.base/build.gradle
@@ -31,6 +31,8 @@ dependencies {
     implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: "${log4j_version}"
     implementation group: 'org.apache.logging.log4j', name: 'log4j-jul', version: "${log4j_version}"
     implementation group: 'com.github.spullara.mustache.java', name: 'compiler', version: "${mustache_java_version}"
+    // https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
+    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.0'
     runtimeOnly group: 'org.jastadd', name: 'jastadd', version: '2.3.4'
     api group: 'net.sf.beaver', name: 'beaver-rt', version: '0.9.11'
 }
diff --git a/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java b/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java
index 8c18f32..6f8cf33 100644
--- a/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java
+++ b/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java
@@ -61,6 +61,18 @@ public class Compiler extends AbstractCompiler {
 
     RagConnect ragConnect = parseProgram(getConfiguration().getFiles());
 
+    // lets inspect ragconnects tree
+    /*
+    for(EndpointDefinition ed : ragConnect.getEndpointDefinitionList()){
+      if(ed.isReceiveTokenEndpointDefinition()){
+        for(MappingDefinition md : ed.asReceiveTokenEndpointDefinition().effectiveMappings()){
+          //     if (!effectiveMappings().get(effectiveMappings().size() - 1).getToType().assignableTo(
+          //              getToken().effectiveJavaTypeUse())) {
+          System.out.println("gtt: " + md.getToType());
+        }
+      }
+    }
+    */
     if (!ragConnect.errors().isEmpty()) {
       System.err.println("Errors:");
       for (ErrorMessage e : ragConnect.errors()) {
@@ -243,7 +255,7 @@ public class Compiler extends AbstractCompiler {
         grammarFile.dumpTree(System.out);
       }
       program.addGrammarFile(grammarFile);
-      grammarFile.treeResolveAll();
+      //grammarFile.treeResolveAll();
       grammarFile.setFileName(toBaseName(filename));
     } catch (IOException | Parser.Exception e) {
       throw new CompilerException("Could not parse grammar file " + filename, e);
diff --git a/ragconnect.base/src/main/resources/JavaHandler.jadd b/ragconnect.base/src/main/resources/JavaHandler.jadd
index 60f43b8..9ac0c65 100644
--- a/ragconnect.base/src/main/resources/JavaHandler.jadd
+++ b/ragconnect.base/src/main/resources/JavaHandler.jadd
@@ -16,7 +16,7 @@ aspect JavaHandler {
 
         private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(JavaHandler.class);
 
-        private Map<String, List<Consumer<byte[]>>> callbackList = new ConcurrentHashMap<>();
+        private Map<String, List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>>> callbackList = new ConcurrentHashMap<>();
 
         private JavaHandler() {
 
@@ -29,21 +29,44 @@ aspect JavaHandler {
             return JAVA_HANDLER_INSTANCE;
         }
 
-        public boolean registerCallback(String topic, Consumer<byte[]> callback) {
+        public String registerCallback(String topic, Consumer<byte[]> callback) {
 
             logger.debug("[JAVA_HANDLER] Registering new callback.");
 
-            List<Consumer<byte[]>> registeredCallbacks = getAllCallbacks().get(topic);
+            String callbackUUID = java.util.UUID.randomUUID().toString();
+
+            List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> registeredCallbacks = getAllCallbacks().get(topic);
 
             if(registeredCallbacks == null){
-                List<Consumer<byte[]>> newCallbackList = Collections.synchronizedList(new ArrayList<>());
-                newCallbackList.add(callback);
+                List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> newCallbackList = Collections.synchronizedList(new ArrayList<>());
+                newCallbackList.add(new org.apache.commons.lang3.tuple.MutablePair<>(callbackUUID, callback));
                 callbackList.put(topic, newCallbackList);
             } else {
-                registeredCallbacks.add(callback);
+                registeredCallbacks.add(new org.apache.commons.lang3.tuple.MutablePair<>(callbackUUID, callback));
             }
 
-            return true;
+            return callbackUUID;
+        }
+
+        public boolean unregisterCallback(String path, String uuid){
+
+            logger.debug("[JAVA_HANDLER] Unregistering callback with uuid: " + uuid + " on path: " + path);
+
+            List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> callbacks = getAllCallbacks().get(path);
+
+            int count = 0;
+
+            if(callbacks != null){
+                for(org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>> callbackPair : callbacks){
+                    if(callbackPair.getLeft().equals(uuid)){
+                        callbacks.remove(count);
+                        return true;
+                    }else{
+                        count++;
+                    }
+                }
+            }
+            return false;
         }
 
         public void close(){};
@@ -52,22 +75,22 @@ aspect JavaHandler {
 
             logger.debug("[JAVA_HANDLER] Pushing a message.");
 
-            List<Consumer<byte[]>> callbacks = getAllCallbacks().get(topic);
+            List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> callbacks = getAllCallbacks().get(topic);
 
             if(callbacks == null){
                 logger.error("[JAVA_HANDLER] Could not publish message. No callback registered for topic " + topic);
                 return false;
             }
 
-            for(Consumer<byte[]> callback : callbacks){
-                logger.debug("[JAVA_HANDLER] Calling callback: " + callback.toString());
-                callback.accept(data);
+            for(org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>> callbackPair : callbacks){
+                logger.debug("[JAVA_HANDLER] Calling callback: " + callbackPair.getLeft());
+                callbackPair.getRight().accept(data);
             }
 
             return true;
         }
 
-        public Map<String, List<Consumer<byte[]>>> getAllCallbacks() {
+        public Map<String, List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>>> getAllCallbacks() {
             return callbackList;
         }
     }
diff --git a/ragconnect.base/src/main/resources/handler.mustache b/ragconnect.base/src/main/resources/handler.mustache
index 41e4238..896f6ac 100644
--- a/ragconnect.base/src/main/resources/handler.mustache
+++ b/ragconnect.base/src/main/resources/handler.mustache
@@ -1,3 +1,4 @@
+
 aspect RagConnectHandler {
 {{#Handlers}}
   {{#InUse}}
@@ -15,14 +16,19 @@ aspect RagConnectHandler {
     {{/Handlers}}
   }
   class ConnectToken {
-    static java.util.concurrent.atomic.AtomicLong counter = new java.util.concurrent.atomic.AtomicLong(0);
-    final long id;
+    //static java.util.concurrent.atomic.AtomicLong counter = new java.util.concurrent.atomic.AtomicLong(0);
+    final String id;
     final java.net.URI uri;
+
     public ConnectToken(java.net.URI uri) {
-      this.id = counter.incrementAndGet();
+      this.id = java.util.UUID.randomUUID().toString(); // counter.incrementAndGet();
       this.uri = uri;
     }
 
+    public ConnectToken(java.net.URI uri, String uuid) {
+        this.id = uuid; // counter.incrementAndGet();
+        this.uri = uri;
+    }
   }
   static java.util.Map<ASTNode, java.util.Map<java.net.URI, ConnectToken>> ASTNode.connectTokens = new java.util.HashMap<>();
 }
diff --git a/ragconnect.base/src/main/resources/receiveDefinition.mustache b/ragconnect.base/src/main/resources/receiveDefinition.mustache
index 5294f89..6acddb2 100644
--- a/ragconnect.base/src/main/resources/receiveDefinition.mustache
+++ b/ragconnect.base/src/main/resources/receiveDefinition.mustache
@@ -18,7 +18,13 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
       break;
   {{/usesMqtt}}
   {{#usesJava}}
-    case "java": return {{javaHandlerAttribute}}().registerCallback(path, consumer);
+    case "java":
+        String uuid = {{javaHandlerAttribute}}().registerCallback(path, consumer);
+        connectToken = new ConnectToken(uri, uuid);
+        if (uuid == null) {
+            return false;
+        }
+        break;
   {{/usesJava}}
   {{#usesRest}}
     case "rest":
@@ -45,6 +51,9 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter
   {{#usesMqtt}}
     case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
   {{/usesMqtt}}
+  {{#usesJava}}
+      case "java": return {{javaHandlerAttribute}}().unregisterCallback(uri.getPath(), connectTokens.get(this).get(uri).id);
+  {{/usesJava}}
   {{#usesRest}}
     case "rest": return {{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
   {{/usesRest}}
diff --git a/ragconnect.base/src/main/resources/sendDefinition.mustache b/ragconnect.base/src/main/resources/sendDefinition.mustache
index e564ef8..5ddec00 100644
--- a/ragconnect.base/src/main/resources/sendDefinition.mustache
+++ b/ragconnect.base/src/main/resources/sendDefinition.mustache
@@ -62,6 +62,11 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter
       {{lastValue}} = null;
       break;
   {{/usesMqtt}}
+  {{#usesJava}}
+      case "java":
+      // nothing todo, because senders are not registered (just callbacks)
+      break;
+  {{/usesJava}}
   {{#usesRest}}
     case "rest":
       {{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
-- 
GitLab