import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; aspect JavaHandler { /** * Singleton class providing routing functionality for byte[] based message calls. */ public class JavaHandler { public static JavaHandler JAVA_HANDLER_INSTANCE = null; private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(JavaHandler.class); private Map<String, List<Consumer<byte[]>>> callbackList = new ConcurrentHashMap<>(); private JavaHandler() { } public synchronized static JavaHandler getInstance() { if(JAVA_HANDLER_INSTANCE == null) { JAVA_HANDLER_INSTANCE = new JavaHandler(); } return JAVA_HANDLER_INSTANCE; } public boolean registerCallback(String topic, Consumer<byte[]> callback) { logger.debug("[JAVA_HANDLER] Registering new callback."); List<Consumer<byte[]>> registeredCallbacks = getAllCallbacks().get(topic); if(registeredCallbacks == null){ List<Consumer<byte[]>> newCallbackList = Collections.synchronizedList(new ArrayList<>()); newCallbackList.add(callback); callbackList.put(topic, newCallbackList); } else { registeredCallbacks.add(callback); } return true; } public void close(){}; public boolean push(String topic, byte[] data) { logger.debug("[JAVA_HANDLER] Pushing a message."); List<Consumer<byte[]>> callbacks = getAllCallbacks().get(topic); if(callbacks == null){ logger.error("[JAVA_HANDLER] Could not publish message. No callback registered for topic " + topic); return false; } for(Consumer<byte[]> callback : callbacks){ logger.debug("[JAVA_HANDLER] Calling callback: " + callback.toString()); callback.accept(data); } return true; } public Map<String, List<Consumer<byte[]>>> getAllCallbacks() { return callbackList; } } }