diff --git a/ragconnect.base/src/main/jastadd/Configuration.jadd b/ragconnect.base/src/main/jastadd/Configuration.jadd index 8c7213850afc5471c70c215d5462442b7da32326..88e4b5edd7ea918143c84b4450f8128060c659a0 100644 --- a/ragconnect.base/src/main/jastadd/Configuration.jadd +++ b/ragconnect.base/src/main/jastadd/Configuration.jadd @@ -7,6 +7,7 @@ aspect Configuration { public static boolean ASTNode.usesMqtt; public static boolean ASTNode.usesRest; public static boolean ASTNode.usesJava; + public static boolean ASTNode.usesRos; public static boolean ASTNode.incrementalOptionActive; public static boolean ASTNode.experimentalJastAdd329; } diff --git a/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd b/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd index 7a09e6547c1ce7c635e2d2f37f9d7db77248ae68..1c0d8f90f83aa40eee3336f66c1c646e0b5dc3c8 100644 --- a/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd +++ b/ragconnect.base/src/main/jastadd/intermediate/Generation.jadd @@ -21,6 +21,9 @@ aspect AttributesForMustache { syn String MRagConnect.javaHandlerAttribute() = "_javaHandler"; syn String MRagConnect.javaHandlerField() = "_javaHandler"; + syn String MRagConnect.rosHandlerAttribute() = "_rosHandler"; + syn String MRagConnect.rosHandlerField() = "_rosHandler"; + syn boolean MRagConnect.hasTreeListEndpoints() = !sendingTreeListEndpoints().isEmpty() || !receivingTreeListEndpoints().isEmpty(); syn List<MTypeEndpointDefinition> MRagConnect.sendingTreeListEndpoints() { List<MTypeEndpointDefinition> result = new ArrayList<>(); @@ -259,6 +262,8 @@ aspect AttributesForMustache { result.restHandlerAttribute(), result.restHandlerField(), usesRest)); result.addHandler(new MHandler("JavaHandler", "JavaHandler.getInstance()", result.javaHandlerAttribute(), result.javaHandlerField(), usesJava)); + result.addHandler(new MHandler("RosHandler", "RosHandler.getInstance()", + result.rosHandlerAttribute(), result.rosHandlerField(), usesRos)); return result; } diff --git a/ragconnect.base/src/main/jastadd/intermediate2mustache/MustacheNodesToYAML.jrag b/ragconnect.base/src/main/jastadd/intermediate2mustache/MustacheNodesToYAML.jrag index bd3afcd607a3be9757ec920251d48080fccd1901..8ceda2137dbd0764c8e10829c714d0f2c2429df0 100644 --- a/ragconnect.base/src/main/jastadd/intermediate2mustache/MustacheNodesToYAML.jrag +++ b/ragconnect.base/src/main/jastadd/intermediate2mustache/MustacheNodesToYAML.jrag @@ -7,6 +7,7 @@ aspect MustacheNodesToYAML { root.put("usesMqtt", usesMqtt); root.put("usesRest", usesRest); root.put("usesJava", usesJava); + root.put("usesRos", usesRos); // mqtt root.put("mqttHandlerField", mqttHandlerField()); @@ -17,6 +18,10 @@ aspect MustacheNodesToYAML { root.put("javaHandlerField", javaHandlerField()); root.put("javaHandlerAttribute", javaHandlerAttribute()); + // ros + root.put("rosHandlerField", rosHandlerField()); + root.put("rosHandlerAttribute", rosHandlerAttribute()); + // rootTypeComponents ListElement rootTypeComponents = new ListElement(); for (MTypeComponent comp : getRootTypeComponentList()) { diff --git a/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java b/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java index 6638c1836dd7537f2ce4d9598a529ad03b223849..d6c936935fb165fb74f956fbc4f1847eff9f9e5b 100644 --- a/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java +++ b/ragconnect.base/src/main/java/org/jastadd/ragconnect/compiler/Compiler.java @@ -33,6 +33,7 @@ public class Compiler extends AbstractCompiler { private static final String OPTION_PROTOCOL_MQTT = "mqtt"; private static final String OPTION_PROTOCOL_REST = "rest"; private static final String OPTION_PROTOCOL_JAVA = "java"; + private static final String OPTION_PROTOCOL_ROS = "ros"; private final static Logger LOGGER = Logger.getLogger(Compiler.class.getName()); @@ -110,6 +111,9 @@ public class Compiler extends AbstractCompiler { if (ASTNode.usesRest) { handlers.add("RestHandler.jadd"); } + if (ASTNode.usesRos) { + handlers.add("RosHandler.jadd"); + } // copy handlers into outputDir for (String handlerFileName : handlers) { try { @@ -181,6 +185,7 @@ public class Compiler extends AbstractCompiler { .acceptMultipleValues(true) .addAcceptedValue(OPTION_PROTOCOL_JAVA, "Enable Java") .addAcceptedValue(OPTION_PROTOCOL_REST, "Enable REST") + .addAcceptedValue(OPTION_PROTOCOL_ROS, "Enable ROS") .addDefaultValue(OPTION_PROTOCOL_MQTT, "Enable MQTT") ); optionPrintYaml = addOption( @@ -265,6 +270,7 @@ public class Compiler extends AbstractCompiler { ASTNode.usesMqtt = optionProtocols.hasValue(OPTION_PROTOCOL_MQTT); ASTNode.usesJava = optionProtocols.hasValue(OPTION_PROTOCOL_JAVA); ASTNode.usesRest = optionProtocols.hasValue(OPTION_PROTOCOL_REST); + ASTNode.usesRos = optionProtocols.hasValue(OPTION_PROTOCOL_ROS); return ragConnect; } diff --git a/ragconnect.base/src/main/resources/JavaHandler.jadd b/ragconnect.base/src/main/resources/JavaHandler.jadd index 1ba24cbce6b712eec317861c6d7de4b8505a0590..98fe6c47cddf36d88db44fe3fbb8f49fcb8fd858 100644 --- a/ragconnect.base/src/main/resources/JavaHandler.jadd +++ b/ragconnect.base/src/main/resources/JavaHandler.jadd @@ -1,3 +1,5 @@ +import org.jastadd.ragconnect.ast.Pair; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -15,8 +17,8 @@ aspect JavaHandler { public static JavaHandler JAVA_HANDLER_INSTANCE = null; private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(JavaHandler.class); - - private Map<String, List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>>> callbackList = new ConcurrentHashMap<>(); + Pair + private Map<String, List<Pair<String, BiConsumer<String, byte[]>>>> callbackList = new ConcurrentHashMap<>(); private JavaHandler() { @@ -35,10 +37,10 @@ aspect JavaHandler { String callbackUUID = java.util.UUID.randomUUID().toString(); - List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> registeredCallbacks = getAllCallbacks().get(topic); + List<Pair<String, BiConsumer<String, byte[]>>> registeredCallbacks = getAllCallbacks().get(topic); if(registeredCallbacks == null){ - List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> newCallbackList = Collections.synchronizedList(new ArrayList<>()); + List<Pair<String, BiConsumer<String, byte[]>>> newCallbackList = Collections.synchronizedList(new ArrayList<>()); newCallbackList.add(new org.apache.commons.lang3.tuple.MutablePair<>(callbackUUID, callback)); callbackList.put(topic, newCallbackList); } else { @@ -52,12 +54,12 @@ aspect JavaHandler { logger.debug("[JAVA_HANDLER] Unregistering callback with uuid: " + uuid + " on path: " + path); - List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> callbacks = getAllCallbacks().get(path); + List<Pair<String, BiConsumer<String, byte[]>>> callbacks = getAllCallbacks().get(path); int count = 0; if(callbacks != null){ - for(org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>> callbackPair : callbacks){ + for(Pair<String, BiConsumer<String, byte[]>> callbackPair : callbacks){ if(callbackPair.getLeft().equals(uuid)){ callbacks.remove(count); return true; @@ -79,14 +81,14 @@ aspect JavaHandler { logger.info("[JAVA_HANDLER] Data: " + dataString); - List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> callbacks = getAllCallbacks().get(topic); + List<Pair<String, BiConsumer<String, byte[]>>> callbacks = getAllCallbacks().get(topic); if(callbacks == null){ logger.error("[JAVA_HANDLER] Could not publish message. No callback registered for topic " + topic); return false; } - for(org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>> callbackPair : callbacks){ + for(Pair<String, BiConsumer<String, byte[]>> callbackPair : callbacks){ logger.debug("[JAVA_HANDLER] Calling callback: " + callbackPair.getLeft()); callbackPair.getRight().accept(topic, data); } @@ -94,7 +96,7 @@ aspect JavaHandler { return true; } - public Map<String, List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>>> getAllCallbacks() { + public Map<String, List<Pair<String, BiConsumer<String, byte[]>>>> getAllCallbacks() { return callbackList; } } diff --git a/ragconnect.base/src/main/resources/RosHandler.jadd b/ragconnect.base/src/main/resources/RosHandler.jadd index ff2ad3be2d075f6a45d70cfdd047f97f65d85699..7674938f5c2e10116cb129f31429fb8f74c10ecd 100644 --- a/ragconnect.base/src/main/resources/RosHandler.jadd +++ b/ragconnect.base/src/main/resources/RosHandler.jadd @@ -1,4 +1,3 @@ -import org.apache.commons.lang3.tuple.Pair; import org.ros.concurrent.CancellableLoop; import org.ros.namespace.GraphName; import org.ros.node.AbstractNodeMain; @@ -10,6 +9,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Flow; import java.util.function.BiConsumer; import org.ros.node.*; @@ -17,6 +17,7 @@ import org.ros.node.NodeMainExecutor; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; aspect RosHandler { @@ -24,16 +25,30 @@ aspect RosHandler { public static RosHandler ROS_HANDLER_INSTANCE = null; - private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(RosHandler.class); - private final Map<String, RosDefaultPublisher> publishers = new HashMap<>(); private final java.util.Map<ConnectToken, RosDefaultSubscriber> tokensForRemoval = new java.util.HashMap<>(); - NodeMainExecutor nodeMainExecutor = DefaultNodeMainExecutor.newDefault(); - NodeConfiguration nodeConfiguration = NodeConfiguration.newPublic("localhost"); + private final RosDefaultSubscriber subscriber = new RosDefaultSubscriber(); + private final RosDefaultPublisher publisher = new RosDefaultPublisher(); + + private NodeMainExecutor nodeMainExecutor = DefaultNodeMainExecutor.newDefault(); + private NodeConfiguration nodeConfiguration = NodeConfiguration.newPublic("localhost"); // TODO: replace this with a configurable URI private RosHandler(){ - nodeConfiguration.setMasterUri(new URI("http://localhost:11311")); + + nodeConfiguration.setMasterUri(URI.create("http://localhost:11311")); + + new Thread(() -> { + nodeMainExecutor.execute(subscriber, nodeConfiguration); + }) {{ + start(); + }}; + + new Thread(() -> { + nodeMainExecutor.execute(publisher, nodeConfiguration); + }) {{ + start(); + }}; } public synchronized static RosHandler getInstance(){ @@ -43,112 +58,116 @@ aspect RosHandler { return ROS_HANDLER_INSTANCE; } - public ConnectToken publish (java.net.URI uri, String topic, byte[] msg){ + public void close(){}; - RosDefaultPublisher publisher = null; - ConnectToken connectToken = new ConnectToken(uri); - - if(!publishers.get(topic)){ - publisher = newPublisher(uri, topic); - publishers.put(topic, publisher); - }else{ - publisher = publishers.get(topic); - } - - publisher.publish(msg); - return connectToken; + public void publish (java.net.URI uri, String topic, byte[] msg){ + publisher.publish(extractTopic(uri), msg); } - private RosDefaultPublisher newPublisher(java.net.URI uri, String topic){ - - RosDefaultPublisher publisher = new RosDefaultPublisher(topic); + public String extractTopic(java.net.URI uri){ - new Thread(() -> { - nodeMainExecutor.execute(publisher, nodeConfiguration); - }) {{ - start(); - }}; - - return publisher; + int idx = uri.toString().indexOf("/", uri.toString().indexOf("/") + 1); + return uri.toString().substring(idx); } public ConnectToken newSubscriber(java.net.URI uri, java.util.function.BiConsumer<String, byte[]> callback){ - int idx = uri.toString().indexOf("/", uri.toString().indexOf("/") + 1); - - RosDefaultSubscriber subscriber = new RosDefaultSubscriber(uri.toString().substring(idx)); ConnectToken connectToken = new ConnectToken(uri); - - new Thread(() -> { - nodeMainExecutor.execute(subscriber, nodeConfiguration); - }) {{ - start(); - }}; - - nodeMainExecutor.addListener(callback); + subscriber.addListener(extractTopic(uri), callback); tokensForRemoval.put(connectToken, subscriber); return connectToken; } - public void disconnect(ConnectToken connectToken) { - tokensForRemoval.get(connectToken).removeAllMessageListeners(); + public boolean disconnect(ConnectToken connectToken) { + + tokensForRemoval.get(connectToken).removeAllMessageListeners(extractTopic(connectToken.uri)); tokensForRemoval.remove(connectToken); + return true; } } public class RosDefaultPublisher extends AbstractNodeMain { - private String topic; + private volatile ConnectedNode node; + private Map<String, Publisher<std_msgs.String>> publishers = new HashMap<>(); - private Publisher<std_msgs.String> publisher; - - public RosDefaultPublisher(String topic){ - this.topic = topic; - } + public RosDefaultPublisher(){} @Override public GraphName getDefaultNodeName() { - String nodeID = java.util.UUID.randomUUID().toString(); - return GraphName.of("ragconnect/publisher/" + nodeID); + return GraphName.of("ragconnect/publisher/"); } @Override public void onStart(final ConnectedNode connectedNode) { - publisher = connectedNode.newPublisher(topic_name, std_msgs.String._TYPE); + + System.out.println("Initializing publisher."); + this.node = connectedNode; } - public void publish(byte[] msgContent){ - std_msgs.String msg = publisher.newMessage(); - String s = new String(msgContent, StandardCharsets.UTF_8); - msg.setData(s); - publisher.publish(msg); + public void publish(String topic, byte[] msgContent){ + + System.out.println("Publishing new message to " + topic); + Publisher<std_msgs.String> pub = publishers.get(topic); + + if(pub == null){ + while(true){ + if(this.node != null){ + break; + } + } + pub = this.node.newPublisher(topic, std_msgs.String._TYPE); + + std_msgs.String msg = pub.newMessage(); + String s = new String(msgContent, StandardCharsets.UTF_8); + msg.setData(s); + pub.publish(msg); + } } } public class RosDefaultSubscriber extends AbstractNodeMain { - private String topic; + private volatile ConnectedNode node; + private Map<String, Subscriber<std_msgs.String>> subscribers = new HashMap<>(); - private Subscriber<std_msgs.String> subscriber; - - public RosDefaultSubscriber(String topic){ - this.topic = topic; - } + public RosDefaultSubscriber(){} @Override public GraphName getDefaultNodeName() { - String nodeID = java.util.UUID.randomUUID().toString(); - return GraphName.of("ragconnect/subscriber/" + nodeID); + return GraphName.of("ragconnect/subscriber"); } @Override public void onStart(final ConnectedNode connectedNode) { - subscriber = connectedNode.newSubscriber(topic_name, std_msgs.String._TYPE); + System.out.println("Initializing subscriber."); + this.node = connectedNode; } - public void addListener(BiConsumer<String, byte[]> callback){ - subscriber.addMessageListener(message -> callback.accept(topic, new String(message.getData(), StandardCharsets.UTF_8))); + public void addListener(String topic, BiConsumer<String, byte[]> callback){ + System.out.println("Adding listener"); + + Subscriber<std_msgs.String> sub = subscribers.get(topic); + + if(sub == null){ + while(true){ + if(this.node != null){ + break; + } + } + + sub = this.node.newSubscriber(topic, std_msgs.String._TYPE); + } + sub.addMessageListener(message -> callback.accept(topic, message.getData().getBytes())); + } + + public void removeAllMessageListeners(String topic){ + Subscriber<std_msgs.String> sub = subscribers.get(topic); + + if(sub != null){ + sub.removeAllMessageListeners(); + } } } } \ No newline at end of file diff --git a/ragconnect.base/src/main/resources/receiveDefinition.mustache b/ragconnect.base/src/main/resources/receiveDefinition.mustache index 43ac6fd9451a8143abad0e77c10c1e815070d702..3530fb493080fd98b600fbae34a5d3daa06b13b9 100644 --- a/ragconnect.base/src/main/resources/receiveDefinition.mustache +++ b/ragconnect.base/src/main/resources/receiveDefinition.mustache @@ -153,7 +153,7 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); {{/usesMqtt}} {{#usesRos}} - case "mqtt": return {{rosHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); + case "ros": return {{rosHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); {{/usesRos}} {{#usesJava}} case "java": return {{javaHandlerAttribute}}().unregisterCallback(uri.getPath(), connectTokens.get(this).get(uri).globalId); diff --git a/ragconnect.base/src/main/resources/sendDefinition.mustache b/ragconnect.base/src/main/resources/sendDefinition.mustache index c0d20aa33c20283f8158c33db058eaf1cff36f92..bdeecfc4d21c7226ea615c500a4207dd111a1dff 100644 --- a/ragconnect.base/src/main/resources/sendDefinition.mustache +++ b/ragconnect.base/src/main/resources/sendDefinition.mustache @@ -27,6 +27,23 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam connectToken = new ConnectToken(uri); break; {{/usesMqtt}} + {{#usesRos}} + case "ros": + final RosHandler handler = {{rosHandlerAttribute}}().getInstance(); + final String topic = {{rosHandlerAttribute}}().extractTopic(uri); + + {{sender}} = () -> { + handler.publish(uri, topic, {{lastValue}}); + }; + + {{updateMethod}}(); + if (writeCurrentValue) { + {{writeMethod}}(); + } + + connectToken = new ConnectToken(uri); + break; + {{/usesRos}} {{#usesJava}} case "java": final JavaHandler handler = {{javaHandlerAttribute}}().getInstance(); @@ -85,6 +102,12 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter {{lastValue}} = null; break; {{/usesMqtt}} + {{#usesRos}} + case "mqtt": + {{sender}} = null; + {{lastValue}} = null; + break; + {{/usesRos}} {{#usesJava}} case "java": {{sender}} = null;