From 44c641547e24ec6058c7a096a9f79115159e44f1 Mon Sep 17 00:00:00 2001 From: SebastianEbert <sebastian.ebert@tu-dresden.de> Date: Fri, 19 Nov 2021 17:37:05 +0100 Subject: [PATCH] working on ros java backend --- .../src/main/resources/MqttHandler.jadd | 4 +- .../src/main/resources/RosHandler.jadd | 71 ++++++++++++++++++- .../main/resources/receiveDefinition.mustache | 11 +++ 3 files changed, 82 insertions(+), 4 deletions(-) diff --git a/ragconnect.base/src/main/resources/MqttHandler.jadd b/ragconnect.base/src/main/resources/MqttHandler.jadd index a8f065e..734098d 100644 --- a/ragconnect.base/src/main/resources/MqttHandler.jadd +++ b/ragconnect.base/src/main/resources/MqttHandler.jadd @@ -1,7 +1,9 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer;aspect MqttHandler { +import java.util.function.BiConsumer; + +aspect MqttHandler { public class MqttServerHandler { private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>(); private final java.util.Map<ConnectToken, java.util.function.BiConsumer<String, byte[]>> tokensForRemoval = new java.util.HashMap<>(); diff --git a/ragconnect.base/src/main/resources/RosHandler.jadd b/ragconnect.base/src/main/resources/RosHandler.jadd index 5830c8e..ff2ad3b 100644 --- a/ragconnect.base/src/main/resources/RosHandler.jadd +++ b/ragconnect.base/src/main/resources/RosHandler.jadd @@ -4,9 +4,12 @@ import org.ros.namespace.GraphName; import org.ros.node.AbstractNodeMain; import org.ros.node.ConnectedNode; import org.ros.node.topic.Publisher; +import org.ros.node.topic.Subscriber; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.BiConsumer; import org.ros.node.*; @@ -20,12 +23,15 @@ aspect RosHandler { public class RosHandler { public static RosHandler ROS_HANDLER_INSTANCE = null; + private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(RosHandler.class); - List<Pair<String, RosDefaultPublisher>> publishers = new ArrayList<>(); + private final Map<String, RosDefaultPublisher> publishers = new HashMap<>(); + private final java.util.Map<ConnectToken, RosDefaultSubscriber> tokensForRemoval = new java.util.HashMap<>(); NodeMainExecutor nodeMainExecutor = DefaultNodeMainExecutor.newDefault(); NodeConfiguration nodeConfiguration = NodeConfiguration.newPublic("localhost"); + // TODO: replace this with a configurable URI private RosHandler(){ nodeConfiguration.setMasterUri(new URI("http://localhost:11311")); } @@ -37,21 +43,58 @@ aspect RosHandler { return ROS_HANDLER_INSTANCE; } - public ConnectToken newPublisher(java.net.URI uri, String topic){ + public ConnectToken publish (java.net.URI uri, String topic, byte[] msg){ + + RosDefaultPublisher publisher = null; + ConnectToken connectToken = new ConnectToken(uri); + + if(!publishers.get(topic)){ + publisher = newPublisher(uri, topic); + publishers.put(topic, publisher); + }else{ + publisher = publishers.get(topic); + } + + publisher.publish(msg); + return connectToken; + } + + private RosDefaultPublisher newPublisher(java.net.URI uri, String topic){ RosDefaultPublisher publisher = new RosDefaultPublisher(topic); + new Thread(() -> { + nodeMainExecutor.execute(publisher, nodeConfiguration); + }) {{ + start(); + }}; + + return publisher; + } + + public ConnectToken newSubscriber(java.net.URI uri, java.util.function.BiConsumer<String, byte[]> callback){ + + int idx = uri.toString().indexOf("/", uri.toString().indexOf("/") + 1); + + RosDefaultSubscriber subscriber = new RosDefaultSubscriber(uri.toString().substring(idx)); ConnectToken connectToken = new ConnectToken(uri); new Thread(() -> { - nodeMainExecutor.execute(listener, nodeConfiguration); + nodeMainExecutor.execute(subscriber, nodeConfiguration); }) {{ start(); }}; + nodeMainExecutor.addListener(callback); + tokensForRemoval.put(connectToken, subscriber); + return connectToken; } + public void disconnect(ConnectToken connectToken) { + tokensForRemoval.get(connectToken).removeAllMessageListeners(); + tokensForRemoval.remove(connectToken); + } } public class RosDefaultPublisher extends AbstractNodeMain { @@ -85,5 +128,27 @@ aspect RosHandler { public class RosDefaultSubscriber extends AbstractNodeMain { + private String topic; + + private Subscriber<std_msgs.String> subscriber; + + public RosDefaultSubscriber(String topic){ + this.topic = topic; + } + + @Override + public GraphName getDefaultNodeName() { + String nodeID = java.util.UUID.randomUUID().toString(); + return GraphName.of("ragconnect/subscriber/" + nodeID); + } + + @Override + public void onStart(final ConnectedNode connectedNode) { + subscriber = connectedNode.newSubscriber(topic_name, std_msgs.String._TYPE); + } + + public void addListener(BiConsumer<String, byte[]> callback){ + subscriber.addMessageListener(message -> callback.accept(topic, new String(message.getData(), StandardCharsets.UTF_8))); + } } } \ No newline at end of file diff --git a/ragconnect.base/src/main/resources/receiveDefinition.mustache b/ragconnect.base/src/main/resources/receiveDefinition.mustache index ddd50a3..43ac6fd 100644 --- a/ragconnect.base/src/main/resources/receiveDefinition.mustache +++ b/ragconnect.base/src/main/resources/receiveDefinition.mustache @@ -109,6 +109,14 @@ private boolean {{parentTypeName}}.{{internalConnectMethod}}(String {{connectPar } break; {{/usesMqtt}} + {{#usesRos}} + case "ros": + connectToken = {{rosHandlerAttribute}}().newSubscriber(uri, consumer); + if (connectToken == null) { + return false; + } + break; + {{/usesRos}} {{#usesJava}} case "java": String uuid = {{javaHandlerAttribute}}().registerCallback(path, consumer); @@ -144,6 +152,9 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter {{#usesMqtt}} case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); {{/usesMqtt}} + {{#usesRos}} + case "mqtt": return {{rosHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); + {{/usesRos}} {{#usesJava}} case "java": return {{javaHandlerAttribute}}().unregisterCallback(uri.getPath(), connectTokens.get(this).get(uri).globalId); {{/usesJava}} -- GitLab