JavaHandler.jadd 3.51 KB
Newer Older
Sebastian Ebert's avatar
Sebastian Ebert committed
1
2
3
4
5
6
7
8
9
10
11
12
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

aspect JavaHandler {

    /**
     * Singleton class providing routing functionality for byte[] based message calls.
     */
Sebastian Ebert's avatar
Sebastian Ebert committed
13
    public class JavaHandler {
Sebastian Ebert's avatar
Sebastian Ebert committed
14

Sebastian Ebert's avatar
Sebastian Ebert committed
15
        public static JavaHandler JAVA_HANDLER_INSTANCE = null;
Sebastian Ebert's avatar
Sebastian Ebert committed
16

17
        private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(JavaHandler.class);
Sebastian Ebert's avatar
Sebastian Ebert committed
18

19
        private Map<String, List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>>> callbackList = new ConcurrentHashMap<>();
Sebastian Ebert's avatar
Sebastian Ebert committed
20

Sebastian Ebert's avatar
Sebastian Ebert committed
21
        private JavaHandler() {
Sebastian Ebert's avatar
Sebastian Ebert committed
22
23
24

        }

Sebastian Ebert's avatar
Sebastian Ebert committed
25
26
27
        public synchronized static JavaHandler getInstance() {
            if(JAVA_HANDLER_INSTANCE == null) {
                JAVA_HANDLER_INSTANCE = new JavaHandler();
Sebastian Ebert's avatar
Sebastian Ebert committed
28
            }
Sebastian Ebert's avatar
Sebastian Ebert committed
29
            return JAVA_HANDLER_INSTANCE;
Sebastian Ebert's avatar
Sebastian Ebert committed
30
31
        }

32
        public String registerCallback(String topic, Consumer<byte[]> callback) {
Sebastian Ebert's avatar
Sebastian Ebert committed
33

Sebastian Ebert's avatar
Sebastian Ebert committed
34
            logger.debug("[JAVA_HANDLER] Registering new callback.");
Sebastian Ebert's avatar
Sebastian Ebert committed
35

36
37
38
            String callbackUUID = java.util.UUID.randomUUID().toString();

            List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> registeredCallbacks = getAllCallbacks().get(topic);
Sebastian Ebert's avatar
Sebastian Ebert committed
39
40

            if(registeredCallbacks == null){
41
42
                List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> newCallbackList = Collections.synchronizedList(new ArrayList<>());
                newCallbackList.add(new org.apache.commons.lang3.tuple.MutablePair<>(callbackUUID, callback));
Sebastian Ebert's avatar
Sebastian Ebert committed
43
44
                callbackList.put(topic, newCallbackList);
            } else {
45
                registeredCallbacks.add(new org.apache.commons.lang3.tuple.MutablePair<>(callbackUUID, callback));
Sebastian Ebert's avatar
Sebastian Ebert committed
46
            }
Sebastian Ebert's avatar
Sebastian Ebert committed
47

48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
            return callbackUUID;
        }

        public boolean unregisterCallback(String path, String uuid){

            logger.debug("[JAVA_HANDLER] Unregistering callback with uuid: " + uuid + " on path: " + path);

            List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> callbacks = getAllCallbacks().get(path);

            int count = 0;

            if(callbacks != null){
                for(org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>> callbackPair : callbacks){
                    if(callbackPair.getLeft().equals(uuid)){
                        callbacks.remove(count);
                        return true;
                    }else{
                        count++;
                    }
                }
            }
            return false;
Sebastian Ebert's avatar
Sebastian Ebert committed
70
71
        }

Sebastian Ebert's avatar
Sebastian Ebert committed
72
73
        public void close(){};

Sebastian Ebert's avatar
Sebastian Ebert committed
74
75
        public boolean push(String topic, byte[] data) {

Sebastian Ebert's avatar
Sebastian Ebert committed
76
            logger.debug("[JAVA_HANDLER] Pushing a message.");
Sebastian Ebert's avatar
Sebastian Ebert committed
77

78
            List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> callbacks = getAllCallbacks().get(topic);
Sebastian Ebert's avatar
Sebastian Ebert committed
79
80

            if(callbacks == null){
Sebastian Ebert's avatar
Sebastian Ebert committed
81
                logger.error("[JAVA_HANDLER] Could not publish message. No callback registered for topic " + topic);
Sebastian Ebert's avatar
Sebastian Ebert committed
82
83
84
                return false;
            }

85
86
87
            for(org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>> callbackPair : callbacks){
                logger.debug("[JAVA_HANDLER] Calling callback: " + callbackPair.getLeft());
                callbackPair.getRight().accept(data);
Sebastian Ebert's avatar
Sebastian Ebert committed
88
89
90
91
92
            }

            return true;
        }

93
        public Map<String, List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>>> getAllCallbacks() {
Sebastian Ebert's avatar
Sebastian Ebert committed
94
95
96
97
            return callbackList;
        }
    }
}