diff --git a/cloc/run-cloc.sh b/cloc/run-cloc.sh index d17af8bd03a8286e41278e622d7c5b4cbc8d3cda..8cacc233e148fcb4821ce4af7f1e4481db059bcd 100755 --- a/cloc/run-cloc.sh +++ b/cloc/run-cloc.sh @@ -24,10 +24,13 @@ $CLOC_CMD --report-file=common-01-input-result.txt ../ros3rag.common/src/main/re $CLOC_CMD --report-file=placeA-01-input-result.txt ../ros3rag.placeA/src/main/ja* 2>>cloc-errors.log $CLOC_CMD --report-file=placeB-01-input-result.txt --categorized=cats.txt --ignored=ignored.txt ../ros3rag.placeB/src/main/ja* 2>>cloc-errors.log +$CLOC_CMD --report-file=static-02-ragconnect-result.txt static-Ragconnect.jadd 2>>cloc-errors.log +$CLOC_CMD --report-file=placeA-02-ragconnect-result.txt ../ros3rag.placeA/src/gen/jastadd/*.relast ../ros3rag.placeA/src/gen/jastadd/RagConnect.jadd 2>>cloc-errors.log +$CLOC_CMD --report-file=placeB-02-ragconnect-result.txt ../ros3rag.placeB/src/gen/jastadd/*.relast ../ros3rag.placeB/src/gen/jastadd/RagConnect.jadd 2>>cloc-errors.log # $CLOC_CMD --report-file=base-gen-result.txt ../ros3rag.base/src/gen 2>>cloc-errors.log -$CLOC_CMD --report-file=placeA-02-gen-result.txt ../ros3rag.placeA/src/gen 2>>cloc-errors.log -$CLOC_CMD --report-file=placeB-02-gen-result.txt ../ros3rag.placeB/src/gen 2>>cloc-errors.log -$CLOC_CMD --report-file=common-02-gen-result.txt ../ros3rag.common/build/generated/source/proto/main/java/de 2>>cloc-errors.log +$CLOC_CMD --report-file=placeA-03-gen-result.txt ../ros3rag.placeA/src/gen 2>>cloc-errors.log +$CLOC_CMD --report-file=placeB-03-gen-result.txt ../ros3rag.placeB/src/gen 2>>cloc-errors.log +$CLOC_CMD --report-file=common-03-gen-result.txt ../ros3rag.common/build/generated/source/proto/main/java/de 2>>cloc-errors.log # CFC_CMD='grep -o 'if'\|'for'\|'return'' diff --git a/cloc/static-Ragconnect.jadd b/cloc/static-Ragconnect.jadd new file mode 100644 index 0000000000000000000000000000000000000000..9de552078356aba1da73eec4673110d4ecefb672 --- /dev/null +++ b/cloc/static-Ragconnect.jadd @@ -0,0 +1,948 @@ +aspect RagConnectHandler { + private MqttServerHandler WorldModelA._ragconnect_mqttHandler = new MqttServerHandler("Handler for WorldModelA." + this.hashCode()); + inh MqttServerHandler ASTNode._ragconnect_mqttHandler(); + eq WorldModelA.getRegion()._ragconnect_mqttHandler() = _ragconnect_mqttHandler; + eq WorldModelA.getScene()._ragconnect_mqttHandler() = _ragconnect_mqttHandler; + eq WorldModelA.getLogicalScene()._ragconnect_mqttHandler() = _ragconnect_mqttHandler; + syn MqttServerHandler WorldModelA._ragconnect_mqttHandler() = _ragconnect_mqttHandler; + public void WorldModelA.ragconnectCloseConnections() { + + _ragconnect_mqttHandler.close(); + + trace().setReceiver(_ragconnect_Observer().oldReceiver); + _ragconnect_resetObserver(); + } + + + public void WorldModelA.ragconnectSetupMqttWaitUntilReady(long time, java.util.concurrent.TimeUnit unit) { + _ragconnect_mqttHandler.setupWaitUntilReady(time, unit); + } + public class MqttServerHandler { + private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>(); + private final java.util.Map<RagConnectToken, java.util.function.BiConsumer<String, byte[]>> tokensForRemoval = new java.util.HashMap<>(); + private long time; + private java.util.concurrent.TimeUnit unit; + private String name; + + public MqttServerHandler() { + this("RagConnect"); + } + + public MqttServerHandler(String name) { + this.name = name; + setupWaitUntilReady(1, java.util.concurrent.TimeUnit.SECONDS); + } + + public void setupWaitUntilReady(long time, java.util.concurrent.TimeUnit unit) { + this.time = time; + this.unit = unit; + } + + public MqttHandler resolveHandler(java.net.URI uri) throws java.io.IOException { + MqttHandler handler = handlers.get(uri.getHost()); + if (handler == null) { + // first connect to that server + handler = new MqttHandler(); + if (uri.getPort() == -1) { + handler.setHost(uri.getHost()); + } else { + handler.setHost(uri.getHost(), uri.getPort()); + } + handlers.put(uri.getHost(), handler); + } + handler.waitUntilReady(this.time, this.unit); + return handler; + } + + public boolean newConnection(RagConnectToken connectToken, java.util.function.BiConsumer<String, byte[]> callback) throws java.io.IOException { + tokensForRemoval.put(connectToken, callback); + return resolveHandler(connectToken.uri).newConnection(extractTopic(connectToken.uri), callback); + } + + public boolean disconnect(RagConnectToken connectToken) throws java.io.IOException { + MqttHandler handler = resolveHandler(connectToken.uri); + return handler != null ? handler.disconnect(extractTopic(connectToken.uri), tokensForRemoval.get(connectToken)) : false; + } + + public void publish(java.net.URI uri, byte[] bytes) throws java.io.IOException { + resolveHandler(uri).publish(extractTopic(uri), bytes); + } + + public void publish(java.net.URI uri, byte[] bytes, boolean retain) throws java.io.IOException { + resolveHandler(uri).publish(extractTopic(uri), bytes, retain); + } + + public void publish(java.net.URI uri, byte[] bytes, + org.fusesource.mqtt.client.QoS qos, boolean retain) throws java.io.IOException { + resolveHandler(uri).publish(extractTopic(uri), bytes, qos, retain); + } + + public static String extractTopic(java.net.URI uri) { + String path = uri.getPath(); + if (uri.getFragment() != null) { + // do not also append fragment, as it is illegal, that anything follows "#" in a mqtt topic anyway + path += "#"; + } + if (path.charAt(0) == '/') { + path = path.substring(1); + } + return path; + } + + public void close() { + for (MqttHandler handler : handlers.values()) { + handler.close(); + } + } + +} +/** + * Helper class to receive updates via MQTT and use callbacks to handle those messages. + * + * @author rschoene - Initial contribution + */ +public class MqttHandler { + private class PatternCallbackListPair { + java.util.regex.Pattern pattern; + java.util.List<java.util.function.BiConsumer<String, byte[]>> callbacks; + } + private static final int DEFAULT_PORT = 1883; + + private final String name; + + /** The host running the MQTT broker. */ + private java.net.URI host; + /** The connection to the MQTT broker. */ + private org.fusesource.mqtt.client.CallbackConnection connection; + /** Whether we are connected yet */ + private final java.util.concurrent.CountDownLatch readyLatch; + private final java.util.concurrent.locks.Lock astLock; + private boolean sendWelcomeMessage = true; + private org.fusesource.mqtt.client.QoS qos; + /** Dispatch knowledge */ + private final java.util.Map<String, java.util.List<java.util.function.BiConsumer<String, byte[]>>> normalCallbacks; + private final java.util.List<PatternCallbackListPair> wildcardCallbacks; + + public MqttHandler() { + this("RagConnect"); + } + + public MqttHandler(String name) { + this.name = java.util.Objects.requireNonNull(name, "Name must be set"); + this.normalCallbacks = new java.util.HashMap<>(); + this.wildcardCallbacks = new java.util.ArrayList<>(); + this.readyLatch = new java.util.concurrent.CountDownLatch(1); + this.qos = org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE; + this.astLock = new java.util.concurrent.locks.ReentrantLock(); + } + + public MqttHandler dontSendWelcomeMessage() { + this.sendWelcomeMessage = false; + return this; + } + + /** + * Sets the host to receive messages from, and connects to it. + * @param host name of the host to connect to, format is either <code>"$name"</code> or <code>"$name:$port"</code> + * @throws java.io.IOException if could not connect, or could not subscribe to a topic + * @return self + */ + public MqttHandler setHost(String host) throws java.io.IOException { + if (host.contains(":")) { + int colon_index = host.indexOf(":"); + return setHost(host.substring(0, colon_index), + Integer.parseInt(host.substring(colon_index + 1))); + } + return setHost(host, DEFAULT_PORT); + } + + /** + * Sets the host to receive messages from, and connects to it. + * @throws java.io.IOException if could not connect, or could not subscribe to a topic + * @return self + */ + public MqttHandler setHost(String host, int port) throws java.io.IOException { + java.util.Objects.requireNonNull(host, "Host need to be set!"); + + this.host = java.net.URI.create("tcp://" + host + ":" + port); + ASTNode._ragconnect_logStdOut("Host for %s is %s", this.name, this.host); + + org.fusesource.mqtt.client.MQTT mqtt = new org.fusesource.mqtt.client.MQTT(); + mqtt.setHost(this.host); + connection = mqtt.callbackConnection(); + java.util.concurrent.atomic.AtomicReference<Throwable> error = new java.util.concurrent.atomic.AtomicReference<>(); + + // add the listener to dispatch messages later + connection.listener(new org.fusesource.mqtt.client.ExtendedListener() { + public void onConnected() { + ASTNode._ragconnect_logStdOut("Connected"); + } + + @Override + public void onDisconnected() { + ASTNode._ragconnect_logStdOut("Disconnected"); + } + + @Override + public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topic, + org.fusesource.hawtbuf.Buffer body, + org.fusesource.mqtt.client.Callback<org.fusesource.mqtt.client.Callback<Void>> ack) { + // this method is called, whenever a MQTT message is received + final String topicString = topic.toString(); + java.util.List<java.util.function.BiConsumer<String, byte[]>> callbackList = callbacksFor(topicString); + if (callbackList.isEmpty()) { + ASTNode._ragconnect_logStdOut("Got a message at %s, but no callback to call. Forgot to subscribe?", topic); + } else { + byte[] message = body.toByteArray(); + for (java.util.function.BiConsumer<String, byte[]> callback : callbackList) { + try { + astLock.lock(); + callback.accept(topicString, message); + } catch (Exception e) { + ASTNode._ragconnect_logStdErr("Exception in callback for " + topicString, e); + } finally { + astLock.unlock(); + } + } + } + ack.onSuccess(null); // always acknowledge message + } + + @Override + public void onPublish(org.fusesource.hawtbuf.UTF8Buffer topicBuffer, + org.fusesource.hawtbuf.Buffer body, + Runnable ack) { + // not used by this type of connection + ASTNode._ragconnect_logStdErr("onPublish should not be called"); + } + + @Override + public void onFailure(Throwable cause) { + error.set(cause); + } + }); + throwIf(error); + + // actually establish the connection + connection.connect(new org.fusesource.mqtt.client.Callback<>() { + @Override + public void onSuccess(Void value) { + if (MqttHandler.this.sendWelcomeMessage) { + connection.publish("components", + (name + " is connected").getBytes(), + org.fusesource.mqtt.client.QoS.AT_LEAST_ONCE, + false, + new org.fusesource.mqtt.client.Callback<>() { + @Override + public void onSuccess(Void value) { + ASTNode._ragconnect_logStdOut("success sending welcome message"); + setReady(); + } + + @Override + public void onFailure(Throwable cause) { + ASTNode._ragconnect_logStdErr("failure sending welcome message", cause); + } + }); + } else { + setReady(); + } + } + + @Override + public void onFailure(Throwable cause) { + error.set(cause); + } + }); + throwIf(error); + return this; + } + + private java.util.List<java.util.function.BiConsumer<String, byte[]>> callbacksFor(String topicString) { + java.util.List<java.util.function.BiConsumer<String, byte[]>> result = new java.util.ArrayList<>(); + java.util.List<java.util.function.BiConsumer<String, byte[]>> normalCallbackList = normalCallbacks.get(topicString); + if (normalCallbackList != null) { + result.addAll(normalCallbackList); + } + wildcardCallbacks.forEach(pair -> { + if (pair.pattern.matcher(topicString).matches()) { + result.addAll(pair.callbacks); + } + }); + return result; + } + + public java.net.URI getHost() { + return host; + } + + private void setReady() { + readyLatch.countDown(); + } + + private void throwIf(java.util.concurrent.atomic.AtomicReference<Throwable> error) throws java.io.IOException { + if (error.get() != null) { + throw new java.io.IOException(error.get()); + } + } + + public void setQoSForSubscription(org.fusesource.mqtt.client.QoS qos) { + this.qos = qos; + } + + /** + * Establish a new connection for some topic. + * @param topic the topic to create a connection for, may contain the wildcards "*" and "#" + * @param callback the callback to run if a new message arrives for this topic + * @return true if successful stored this connection, false otherwise (e.g., on failed subscribe) + */ + public boolean newConnection(String topic, java.util.function.Consumer<byte[]> callback) { + return newConnection(topic, (ignoredTopicString, bytes) -> callback.accept(bytes)); + } + + /** + * Establish a new connection for some topic. + * @param topic the topic to create a connection for, may contain the wildcards "*" and "#" + * @param callback the callback to run if a new message arrives for this topic + * @return true if successful stored this connection, false otherwise (e.g., on failed subscribe) + */ + public boolean newConnection(String topic, java.util.function.BiConsumer<String, byte[]> callback) { + if (readyLatch.getCount() > 0) { + System.err.println("Handler not ready"); + return false; + } + // register callback + ASTNode._ragconnect_logStdOut("new connection for %s", topic); + final boolean needSubscribe; + if (isWildcardTopic(topic)) { + String regex = regexForWildcardTopic(topic); + PatternCallbackListPair pairToAddTo = null; + for (PatternCallbackListPair pair : wildcardCallbacks) { + if (pair.pattern.pattern().equals(regex)) { + pairToAddTo = pair; + break; + } + } + if (pairToAddTo == null) { + pairToAddTo = new PatternCallbackListPair(); + pairToAddTo.pattern = java.util.regex.Pattern.compile(regex); + pairToAddTo.callbacks = new java.util.ArrayList<>(); + wildcardCallbacks.add(pairToAddTo); + } + needSubscribe = pairToAddTo.callbacks.isEmpty(); + pairToAddTo.callbacks.add(callback); + } else { // normal topic + java.util.List<java.util.function.BiConsumer<String, byte[]>> callbacksForTopic = normalCallbacks. + computeIfAbsent(topic, t -> new java.util.ArrayList<>()); + needSubscribe = callbacksForTopic.isEmpty(); + callbacksForTopic.add(callback); + } + if (needSubscribe) { + // subscribe at broker + java.util.concurrent.CountDownLatch operationFinished = new java.util.concurrent.CountDownLatch(1); + java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>(true); + org.fusesource.mqtt.client.Topic[] topicArray = { new org.fusesource.mqtt.client.Topic(topic, this.qos) }; + connection.getDispatchQueue().execute(() -> { + connection.subscribe(topicArray, new org.fusesource.mqtt.client.Callback<>() { + @Override + public void onSuccess(byte[] qoses) { + ASTNode._ragconnect_logStdOut("Subscribed to %s, qoses: %s", topic, qoses); + operationFinished.countDown(); + } + + @Override + public void onFailure(Throwable cause) { + ASTNode._ragconnect_logStdErr("Could not subscribe to " + topic, cause); + success.set(false); + operationFinished.countDown(); + } + }); + }); + try { + boolean finishedInTime = operationFinished.await(2, java.util.concurrent.TimeUnit.SECONDS); + if (!finishedInTime) { + return false; + } + return success.get(); + } catch (InterruptedException e) { + return false; + } + } else { + return true; + } + } + + private boolean isWildcardTopic(String topic) { + return topic.contains("*") || topic.contains("#"); + } + + private String regexForWildcardTopic(String topic) { + return topic.replace("*", "[^/]*").replace("#", ".*"); + } + + public boolean disconnect(String topic, java.util.function.BiConsumer<String, byte[]> callback) { + boolean needUnsubscribe = false; + java.util.concurrent.atomic.AtomicReference<Boolean> success = new java.util.concurrent.atomic.AtomicReference<>(true); + + final String topicToUnsubscribe; + + // check if wildcard is to be removed + if (isWildcardTopic(topic)) { + boolean topicRegistered = false; + String topicRegex = regexForWildcardTopic(topic); + for (PatternCallbackListPair pair : wildcardCallbacks) { + if (pair.pattern.pattern().equals(topicRegex)) { + topicRegistered = true; + // if still successful, update with whether callback could be removed + success.compareAndSet(true, pair.callbacks.remove(callback)); + // if no more callbacks left, unsubscribe and remove from list + if (pair.callbacks.isEmpty()) { + needUnsubscribe = true; + wildcardCallbacks.remove(pair.pattern); + } + break; + } + } + topicToUnsubscribe = topicRegistered ? topicRegex : null; + } else if (normalCallbacks.containsKey(topic)) { + topicToUnsubscribe = topic; + var normalCallbackList = normalCallbacks.get(topic); + // if still successful, update with whether callback could be removed + success.compareAndSet(true, normalCallbackList.remove(callback)); + // if no more callbacks left, unsubscribe and remove from list + if (normalCallbackList.isEmpty()) { + needUnsubscribe = true; + normalCallbacks.remove(topic); + } + } else { + topicToUnsubscribe = null; + } + + if (topicToUnsubscribe == null) { + ASTNode._ragconnect_logStdErr("Disconnect for not connected topic '%s'", topic); + return false; + } + + if (needUnsubscribe) { + java.util.concurrent.CountDownLatch operationFinished = new java.util.concurrent.CountDownLatch(1); + // no callbacks anymore for this topic, unsubscribe from mqtt + connection.getDispatchQueue().execute(() -> { + org.fusesource.hawtbuf.UTF8Buffer topicBuffer = org.fusesource.hawtbuf.Buffer.utf8(topicToUnsubscribe); + org.fusesource.hawtbuf.UTF8Buffer[] topicArray = new org.fusesource.hawtbuf.UTF8Buffer[]{topicBuffer}; + connection.unsubscribe(topicArray, new org.fusesource.mqtt.client.Callback<>() { + @Override + public void onSuccess(Void value) { + operationFinished.countDown(); + } + + @Override + public void onFailure(Throwable cause) { + success.set(false); + ASTNode._ragconnect_logStdErr("Could not disconnect from " + topic, cause); + operationFinished.countDown(); + } + }); + }); + try { + boolean finishedInTime = operationFinished.await(2, java.util.concurrent.TimeUnit.SECONDS); + if (!finishedInTime) { + return false; + } + } catch (InterruptedException e) { + ASTNode._ragconnect_logStdErr("Interrupted while disconnecting from " + topic, e); + success.set(false); + } + } + return success.get(); + } + + /** + * Waits until this updater is ready to receive MQTT messages. + * If it already is ready, return immediately with the value <code>true</code>. + * Otherwise waits for the given amount of time, and either return <code>true</code> within the timespan, + * if it got ready, or <code>false</code> upon a timeout. + * @param time the maximum time to wait + * @param unit the time unit of the time argument + * @return whether this updater is ready + */ + public boolean waitUntilReady(long time, java.util.concurrent.TimeUnit unit) { + try { + return readyLatch.await(time, unit); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return false; + } + + public void close() { + if (connection == null) { + ASTNode._ragconnect_logStdErr("Stopping without connection. Was setHost() called?"); + return; + } + connection.getDispatchQueue().execute(() -> { + connection.disconnect(new org.fusesource.mqtt.client.Callback<>() { + @Override + public void onSuccess(Void value) { + ASTNode._ragconnect_logStdOut("Disconnected %s from %s", name, host); + } + + @Override + public void onFailure(Throwable ignored) { + // Disconnects never fail. And we do not care either. + } + }); + }); + } + + public void publish(String topic, byte[] bytes) { + publish(topic, bytes, false); + } + + public void publish(String topic, byte[] bytes, boolean retain) { + publish(topic, bytes, this.qos, retain); + } + + public void publish(String topic, byte[] bytes, org.fusesource.mqtt.client.QoS qos, boolean retain) { + try { + astLock.lock(); + connection.getDispatchQueue().execute(() -> { + connection.publish(topic, bytes, qos, retain, new org.fusesource.mqtt.client.Callback<>() { + @Override + public void onSuccess(Void value) { + ASTNode._ragconnect_logStdOut("Published some bytes to %s", topic); + } + + @Override + public void onFailure(Throwable cause) { + ASTNode._ragconnect_logStdErr("Could not publish on topic " + topic, cause); + } + }); + }); + } finally { + astLock.unlock(); + } + } +} + + + + class RagConnectToken { + static java.util.concurrent.atomic.AtomicLong counter = new java.util.concurrent.atomic.AtomicLong(0); + final long id; + final java.net.URI uri; + final String entityName; + public RagConnectToken(java.net.URI uri, String entityName) { + this.id = counter.incrementAndGet(); + this.uri = uri; + this.entityName = entityName; + } + } + class RagConnectTokenMap { + java.util.Map<ASTNode, java.util.List<RagConnectToken>> connectTokensSend = new java.util.HashMap<>(); + java.util.Map<ASTNode, java.util.List<RagConnectToken>> connectTokensReceive = new java.util.HashMap<>(); + void add(ASTNode node, boolean isReceive, RagConnectToken token) { + java.util.Map<ASTNode, java.util.List<RagConnectToken>> mapOfTokens = (isReceive ? connectTokensReceive : connectTokensSend); + mapOfTokens.computeIfAbsent(node, n -> new java.util.ArrayList<>()).add(token); + } + java.util.List<RagConnectToken> removeAll(ASTNode node, boolean isReceive, java.net.URI uri, String entityName) { + java.util.List<RagConnectToken> listOfTokens = (isReceive ? connectTokensReceive : connectTokensSend).get(node); + if (listOfTokens == null) { + return java.util.Collections.emptyList(); + } + java.util.List<RagConnectToken> tokensToRemove = listOfTokens.stream() + .filter(token -> token.uri.equals(uri) && token.entityName.equals(entityName)) + .collect(java.util.stream.Collectors.toList()); + listOfTokens.removeAll(tokensToRemove); + return tokensToRemove; + } + } + static RagConnectTokenMap ASTNode.connectTokenMap = new RagConnectTokenMap(); + + interface RagConnectDisconnectHandlerMethod { + boolean call(RagConnectToken token) throws java.io.IOException; + } + + class RagConnectPublisher { + java.util.List<Runnable> senders = new java.util.ArrayList<>(); + java.util.Map<RagConnectToken, Runnable> tokenToSender; + private byte[] lastValue; + + void add(Runnable sender, RagConnectToken connectToken) { + if (tokenToSender == null) { + tokenToSender = new java.util.HashMap<>(); + } + senders.add(sender); + tokenToSender.put(connectToken, sender); + } + + boolean remove(RagConnectToken token) { + String errorMessage = internal_remove(token); + if (errorMessage == null) { + return true; + } else { + ASTNode._ragconnect_logStdErr(errorMessage); + return false; + } + } + + /** + * (internal) Removes the token, returning an error message if there is one. + * @param token the token to be removed + * @return an error message (upon error), or null (upon success) + */ + String internal_remove(RagConnectToken token) { + if (tokenToSender == null) { + return "Removing sender before first addition for " + token.entityName + " at " + token.uri; + } + Runnable sender = tokenToSender.remove(token); + if (sender == null) { + return "Could not find connected sender for " + token.entityName + " at " + token.uri; + } + boolean success = senders.remove(sender); + if (senders.isEmpty()) { + lastValue = null; + } + return success ? null : "Could not remove sender for " + token.entityName + " at " + token.uri; + } + + void run() { + senders.forEach(Runnable::run); + } + + void run(RagConnectToken token) { + tokenToSender.get(token).run(); + } + + byte[] getLastValue() { + return lastValue; + } + + void setLastValue(byte[] value) { + this.lastValue = value; + } + } + + class RagConnectMappingPublisher { + java.util.Map<Integer, RagConnectPublisher> publishers = new java.util.HashMap<>(); + + void add(Runnable sender, int index, RagConnectToken connectToken) { + publishers.computeIfAbsent(index, ignoredIndex -> new RagConnectPublisher()).add(sender, connectToken); + } + + boolean remove(RagConnectToken token) { + // publishers.forEach((index, publisher) -> publisher.remove(token)); + // remove token from each publisher, at least one has to successfully remove the token to make this call a success + boolean result = false; + java.util.List<String> errorMessages = new java.util.ArrayList<>(); + for (RagConnectPublisher publisher : publishers.values()) { + String errorMessage = publisher.internal_remove(token); + if (errorMessage == null) { + result = true; + } else { + errorMessages.add(errorMessage); + } + } + if (!result) { + // only print error message, if all publishers failed to remove the token + for (String message : errorMessages) { + ASTNode._ragconnect_logStdErr(message); + } + } + return result; + } + + void run(int index) { + java.util.Optional.ofNullable(publishers.get(index)).ifPresent(RagConnectPublisher::run); + } + + void run(int index, RagConnectToken token) { + java.util.Optional.ofNullable(publishers.get(index)).ifPresent(publisher -> publisher.run(token)); + } + + byte[] getLastValue(int index) { + RagConnectPublisher publisher = publishers.get(index); + if (publisher == null) { + return null; + } + return publisher.getLastValue(); + } + + void setLastValue(int index, final byte[] value) { + java.util.Optional.ofNullable(publishers.get(index)).ifPresent(publisher -> publisher.setLastValue(value)); + } + } +} + +aspect RagConnect { + + static void ASTNode._ragconnect_logStdOut(String message, Object... args) { + System.out.println(String.format(message, args)); + } + + static void ASTNode._ragconnect_logStdOut(String message, Throwable t) { + System.out.println(message); + t.printStackTrace(); + } + + static void ASTNode._ragconnect_logStdErr(String message, Object... args) { + System.err.println(String.format(message, args)); + } + + static void ASTNode._ragconnect_logStdErr(String message, Throwable t) { + System.err.println(message); + t.printStackTrace(); + } + + public void WorldModelA.ragconnectCheckIncremental() { + // check if --tracing is active + trace().getReceiver(); + // check if tracing of INC_FLUSH_ATTR is possible, i.e., if --tracing=flush + ASTState.Trace.Event checkTracing = ASTState.Trace.Event.INC_FLUSH_ATTR; + // check if --rewrite is active + mayHaveRewrite(); + // check if --incremental is active + Object checkIncremental = inc_throwAway_visited; + } +} + + +aspect RagConnectObserver { + + class RagConnectObserver implements ASTState.Trace.Receiver { + + class RagConnectObserverEntry { + final ASTNode node; + final String attributeString; + final boolean compareParams; + final Object params; + final Runnable attributeCall; + final java.util.List<RagConnectToken> connectList = new java.util.ArrayList<>(); + + RagConnectObserverEntry(ASTNode node, String attributeString, + boolean compareParams, Object params, Runnable attributeCall) { + this.node = node; + this.attributeString = attributeString; + this.compareParams = compareParams; + this.params = params; + this.attributeCall = attributeCall; + } + + boolean baseMembersEqualTo(RagConnectObserverEntry other) { + return baseMembersEqualTo(other.node, other.attributeString, other.compareParams, other.params); + } + + boolean baseMembersEqualTo(ASTNode otherNode, String otherAttributeString, + boolean otherCompareParams, Object otherParams) { + return this.node.equals(otherNode) && + this.attributeString.equals(otherAttributeString) && + this.compareParams == otherCompareParams && + (!this.compareParams || java.util.Objects.equals(this.params, otherParams)); + } + } + + class RagConnectObserverStartEntry { + final ASTNode node; + final String attributeString; + final Object flushIncToken; + RagConnectObserverStartEntry(ASTNode node, String attributeString, Object flushIncToken) { + this.node = node; + this.attributeString = attributeString; + this.flushIncToken = flushIncToken; + } + } + + ASTState.Trace.Receiver oldReceiver; + + java.util.List<RagConnectObserverEntry> observedNodes = new java.util.ArrayList<>(); + + java.util.Set<RagConnectObserverEntry> entryQueue = new java.util.HashSet<>(); + java.util.Deque<RagConnectObserverStartEntry> startEntries = new java.util.LinkedList<>(); + + RagConnectObserver(ASTNode node) { + // set the receiver. potentially dangerous because overriding existing receiver! + oldReceiver = node.trace().getReceiver(); + node.trace().setReceiver(this); + } + + void add(RagConnectToken connectToken, ASTNode node, String attributeString, Runnable attributeCall) { + internal_add(connectToken, node, attributeString, false, null, attributeCall); + } + void add(RagConnectToken connectToken, ASTNode node, String attributeString, Object params, Runnable attributeCall) { + internal_add(connectToken, node, attributeString, true, params, attributeCall); + } + + private void internal_add(RagConnectToken connectToken, ASTNode node, String attributeString, + boolean compareParams, Object params, Runnable attributeCall) { + // either add to an existing entry (with same node, attribute) or create new entry + boolean needNewEntry = true; + for (RagConnectObserverEntry entry : observedNodes) { + if (entry.baseMembersEqualTo(node, attributeString, compareParams, params)) { + entry.connectList.add(connectToken); + needNewEntry = false; + break; + } + } + if (needNewEntry) { + RagConnectObserverEntry newEntry = new RagConnectObserverEntry(node, attributeString, + compareParams, params, attributeCall); + newEntry.connectList.add(connectToken); + observedNodes.add(newEntry); + } + } + + void remove(RagConnectToken connectToken) { + RagConnectObserverEntry entryToDelete = null; + for (RagConnectObserverEntry entry : observedNodes) { + entry.connectList.remove(connectToken); + if (entry.connectList.isEmpty()) { + entryToDelete = entry; + } + } + if (entryToDelete != null) { + observedNodes.remove(entryToDelete); + } + } + + @Override + public void accept(ASTState.Trace.Event event, ASTNode node, String attribute, Object params, Object value) { + oldReceiver.accept(event, node, attribute, params, value); + // react to INC_FLUSH_START and remember entry + if (event == ASTState.Trace.Event.INC_FLUSH_START) { + startEntries.addFirst(new RagConnectObserverStartEntry(node, attribute, value)); + return; + } + + // react to INC_FLUSH_END and process queued entries, if it matches start entry + if (event == ASTState.Trace.Event.INC_FLUSH_END) { + if (startEntries.isEmpty()) { + return; + } + RagConnectObserverStartEntry startEntry = startEntries.peekFirst(); + if (node == startEntry.node && + attribute == startEntry.attributeString && + value == startEntry.flushIncToken) { + // create a copy of the queue to avoid entering this again causing an endless recursion + RagConnectObserverEntry[] entriesToProcess = entryQueue.toArray(new RagConnectObserverEntry[entryQueue.size()]); + entryQueue.clear(); + startEntries.removeFirst(); + for (RagConnectObserverEntry entry : entriesToProcess) { + entry.attributeCall.run(); + } + return; + } + } + + // ignore all other events but INC_FLUSH_ATTR + if (event != ASTState.Trace.Event.INC_FLUSH_ATTR) { + return; + } + + // iterate through list, if matching pair. could maybe be more efficient. + for (RagConnectObserverEntry entry : observedNodes) { + if (entry.node.equals(node) && entry.attributeString.equals(attribute) && (!entry.compareParams || java.util.Objects.equals(entry.params, params))) { + // hit. call the attribute/nta-token + entryQueue.add(entry); + } + } + } + } + + private static RagConnectObserver ASTNode._ragconnect_ObserverInstance; + RagConnectObserver ASTNode._ragconnect_Observer() { + if (_ragconnect_ObserverInstance == null) { + // does not matter, which node is used to create the observer as ASTState/tracing is also static + _ragconnect_ObserverInstance = new RagConnectObserver(this); + } + return _ragconnect_ObserverInstance; + } + void ASTNode._ragconnect_resetObserver() { + _ragconnect_ObserverInstance = null; + } +} + + +aspect EvaluationCounter { + public String ASTNode.ragconnectEvaluationCounterSummary() { + return _ragconnect_evaluationCounter.summary(); + } + static EvaluationCounter ASTNode._ragconnect_evaluationCounter = new EvaluationCounter(); + + public class EvaluationCounter { + private java.util.Map<String, java.util.Map<String, _ragconnect_Counter>> counters = new java.util.HashMap<>(); + private final java.util.function.Function<? super String, ? extends java.util.Map<String, _ragconnect_Counter>> parentAbsent = key -> { + return new java.util.HashMap<>(); + }; + private final java.util.function.Function<? super String, ? extends _ragconnect_Counter> entityAbsent = key -> { + return new _ragconnect_Counter(); + }; + + public void incrementReceive(String parentTypeName, String entityName) { + getCounter(parentTypeName, entityName).receive += 1; + } + + public void incrementSend(String parentTypeName, String entityName) { + getCounter(parentTypeName, entityName).send += 1; + } + + public void incrementCall(String parentTypeName, String entityName) { + getCounter(parentTypeName, entityName).call += 1; + } + + public void incrementFirstNull(String parentTypeName, String entityName) { + getCounter(parentTypeName, entityName).firstNull += 1; + } + + public void incrementSkip(String parentTypeName, String entityName) { + getCounter(parentTypeName, entityName).skip += 1; + } + + public void incrementException(String parentTypeName, String entityName) { + getCounter(parentTypeName, entityName).exception += 1; + } + + public void incrementReject(String parentTypeName, String entityName) { + getCounter(parentTypeName, entityName).reject += 1; + } + + public String summary() { + StringBuilder sb = new StringBuilder(); + // header + sb.append("parentTypeName,entityName,receive,send,call,firstNull,skip,exception,reject").append("\n"); + // values + java.util.Set<String> sortedParentTypes = new java.util.TreeSet<>(counters.keySet()); + for (String parentType : sortedParentTypes) { + java.util.Set<String> sortedEntityNames = new java.util.TreeSet<>(counters.get(parentType).keySet()); + for (String entityName : sortedEntityNames) { + _ragconnect_Counter count = getCounter(parentType, entityName); + java.util.StringJoiner sj = new java.util.StringJoiner(",", "", "\n"); + sj.add(parentType) + .add(entityName) + .add(Integer.toString(count.receive)) + .add(Integer.toString(count.send)) + .add(Integer.toString(count.call)) + .add(Integer.toString(count.firstNull)) + .add(Integer.toString(count.skip)) + .add(Integer.toString(count.exception)) + .add(Integer.toString(count.reject)) + ; + sb.append(sj); + } + } + return sb.toString(); + } + + private _ragconnect_Counter getCounter(String parentTypeName, String entityName) { + return counters.computeIfAbsent(parentTypeName, parentAbsent).computeIfAbsent(entityName, entityAbsent); + } + } + + class _ragconnect_Counter { + int receive = 0; + int send = 0; + int call = 0; + int firstNull = 0; + int skip = 0; + int exception = 0; + int reject = 0; + } +} diff --git a/ros3rag.placeB/src/main/jastadd/WorldModelB.connect b/ros3rag.placeB/src/main/jastadd/WorldModelB.connect index 9d58814df9fdceb98b96e95dab48754f4bd4ae29..da2da8dfadee74aa33c652094b854b0ea4d38f96 100644 --- a/ros3rag.placeB/src/main/jastadd/WorldModelB.connect +++ b/ros3rag.placeB/src/main/jastadd/WorldModelB.connect @@ -26,19 +26,20 @@ ParseCommand maps byte[] bytes to de.tudresden.inf.st.ceti.Command {: :} ConvertCommand maps de.tudresden.inf.st.ceti.Command command to Operation {: + WorldModelB model = (WorldModelB) this; if (command.hasPickAndPlace()) { return new PickAndPlace() - .setObjectToPick(worldModelB().getMyScene().getLogicalScene().resolveLogicalObjectOfInterest(command.getPickAndPlace().getIdPick()).asLogicalMovableObject()) - .setTargetLocation(worldModelB().getMyScene().resolveObjectOfInterest(command.getPickAndPlace().getIdPlace()).asDropOffLocation()) - .setRobotToExecute(worldModelB().findRobot(command.getPickAndPlace().getIdRobot()).get()); + .setObjectToPick(model.getMyScene().getLogicalScene().resolveLogicalObjectOfInterest(command.getPickAndPlace().getIdPick()).asLogicalMovableObject()) + .setTargetLocation(model.getMyScene().resolveObjectOfInterest(command.getPickAndPlace().getIdPlace()).asDropOffLocation()) + .setRobotToExecute(model.findRobot(command.getPickAndPlace().getIdRobot()).get()); } else if (command.hasConfigChange()) { return new ConfigChange() - .setCollaborationZone(worldModelB().getMyScene().resolveObjectOfInterest(command.getConfigChange().getIdCollaborationZone()).asDropOffLocation().asCollaborationZone()) - .setRobotToExecute(worldModelB().findRobot(command.getConfigChange().getIdRobotNewOwner()).get()); + .setCollaborationZone(model.getMyScene().resolveObjectOfInterest(command.getConfigChange().getIdCollaborationZone()).asDropOffLocation().asCollaborationZone()) + .setRobotToExecute(model.findRobot(command.getConfigChange().getIdRobotNewOwner()).get()); } else if (command.hasEvacuate()) { return new Evacuate() - .setCollaborationZone(worldModelB().getMyScene().resolveObjectOfInterest(command.getEvacuate().getIdCollaborationZone()).asDropOffLocation().asCollaborationZone()) - .setRobotToExecute(worldModelB().findRobot(command.getEvacuate().getIdRobot()).get()); + .setCollaborationZone(model.getMyScene().resolveObjectOfInterest(command.getEvacuate().getIdCollaborationZone()).asDropOffLocation().asCollaborationZone()) + .setRobotToExecute(model.findRobot(command.getEvacuate().getIdRobot()).get()); } return new ErrorOperation().setErrorMessage("Could not parse operation " + command); :} diff --git a/ros3rag.placeB/src/main/jastadd/WorldModelB.jadd b/ros3rag.placeB/src/main/jastadd/WorldModelB.jadd index d3ec38b27ce84ea484b85e0767ee6638d376452f..704964c2b233fec402b5ca835ce5f3518f3a26e7 100644 --- a/ros3rag.placeB/src/main/jastadd/WorldModelB.jadd +++ b/ros3rag.placeB/src/main/jastadd/WorldModelB.jadd @@ -567,3 +567,13 @@ aspect Resolving { // // do not resolve real regions and real locations (send by site-A) // refine RefResolverStubs eq LogicalMovableObject.resolveMyLocationByToken(String name) = null; } +aspect RagConnectAddOn { + public void WorldModelB.ragconnectResetEvaluationCounter() { + _ragconnect_evaluationCounter.reset(); + } + class EvaluationCounter { + public void reset() { + counters = new java.util.HashMap<>(); + } + } +} diff --git a/ros3rag.placeB/src/main/java/de/tudresden/inf/st/placeB/MainB.java b/ros3rag.placeB/src/main/java/de/tudresden/inf/st/placeB/MainB.java index c5b8a87adca8e05e3dae099f29421ad33b5aaa84..7e3a199aed5374a3040a0aab5ed33fc6dae539e6 100644 --- a/ros3rag.placeB/src/main/java/de/tudresden/inf/st/placeB/MainB.java +++ b/ros3rag.placeB/src/main/java/de/tudresden/inf/st/placeB/MainB.java @@ -109,6 +109,7 @@ public class MainB extends SharedMainParts<MqttHandler, WorldModelB> { @Override protected WorldModelB createWorldModel() { WorldModelB result = new WorldModelB(); + result.ragconnectResetEvaluationCounter(); result.addOtherScene(new LogicalScene()); return result; } @@ -145,8 +146,8 @@ public class MainB extends SharedMainParts<MqttHandler, WorldModelB> { "OtherScene"); checkSuccess(model.connectNextOperation(mqttUri(config.forB.topicCommand, config), false), "NextOperation"); -// checkSuccess(model.connectExecutedOperation(mqttUri(config.forB.topicCommand, config)), -// "OperationHistory"); + checkSuccess(model.connectExecutedOperation(mqttUri(config.forB.topicCommand, config)), + "OperationHistory"); for (Robot robot : model.getRobotList()) { // self-loop checkSuccess(robot.connectOwnedCollaborationZoneNames(mqttUri(config.forB.topicCommand, config)), diff --git a/settings.gradle b/settings.gradle index 51000054cdeee2737c688c5666e0e3908265d149..bf9973ba7eefa23a0b986f3166d505ca55f85d4e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,7 +10,7 @@ include 'ros3rag.placeA' include 'ros3rag.placeB' include 'ros3rag.common' -include 'ros3rag.altPlaceB' +//include 'ros3rag.altPlaceB' include 'ros3rag.scaling.a' include 'ros3rag.scaling.b'