Skip to content
Snippets Groups Projects
Commit 44c64154 authored by Sebastian Ebert's avatar Sebastian Ebert
Browse files

working on ros java backend

parent 669090a1
No related branches found
No related tags found
No related merge requests found
Pipeline #11867 failed
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;aspect MqttHandler { import java.util.function.BiConsumer;
aspect MqttHandler {
public class MqttServerHandler { public class MqttServerHandler {
private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>(); private final java.util.Map<String, MqttHandler> handlers = new java.util.HashMap<>();
private final java.util.Map<ConnectToken, java.util.function.BiConsumer<String, byte[]>> tokensForRemoval = new java.util.HashMap<>(); private final java.util.Map<ConnectToken, java.util.function.BiConsumer<String, byte[]>> tokensForRemoval = new java.util.HashMap<>();
......
...@@ -4,9 +4,12 @@ import org.ros.namespace.GraphName; ...@@ -4,9 +4,12 @@ import org.ros.namespace.GraphName;
import org.ros.node.AbstractNodeMain; import org.ros.node.AbstractNodeMain;
import org.ros.node.ConnectedNode; import org.ros.node.ConnectedNode;
import org.ros.node.topic.Publisher; import org.ros.node.topic.Publisher;
import org.ros.node.topic.Subscriber;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import org.ros.node.*; import org.ros.node.*;
...@@ -20,12 +23,15 @@ aspect RosHandler { ...@@ -20,12 +23,15 @@ aspect RosHandler {
public class RosHandler { public class RosHandler {
public static RosHandler ROS_HANDLER_INSTANCE = null; public static RosHandler ROS_HANDLER_INSTANCE = null;
private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(RosHandler.class); private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(RosHandler.class);
List<Pair<String, RosDefaultPublisher>> publishers = new ArrayList<>(); private final Map<String, RosDefaultPublisher> publishers = new HashMap<>();
private final java.util.Map<ConnectToken, RosDefaultSubscriber> tokensForRemoval = new java.util.HashMap<>();
NodeMainExecutor nodeMainExecutor = DefaultNodeMainExecutor.newDefault(); NodeMainExecutor nodeMainExecutor = DefaultNodeMainExecutor.newDefault();
NodeConfiguration nodeConfiguration = NodeConfiguration.newPublic("localhost"); NodeConfiguration nodeConfiguration = NodeConfiguration.newPublic("localhost");
// TODO: replace this with a configurable URI
private RosHandler(){ private RosHandler(){
nodeConfiguration.setMasterUri(new URI("http://localhost:11311")); nodeConfiguration.setMasterUri(new URI("http://localhost:11311"));
} }
...@@ -37,21 +43,58 @@ aspect RosHandler { ...@@ -37,21 +43,58 @@ aspect RosHandler {
return ROS_HANDLER_INSTANCE; return ROS_HANDLER_INSTANCE;
} }
public ConnectToken newPublisher(java.net.URI uri, String topic){ public ConnectToken publish (java.net.URI uri, String topic, byte[] msg){
RosDefaultPublisher publisher = null;
ConnectToken connectToken = new ConnectToken(uri);
if(!publishers.get(topic)){
publisher = newPublisher(uri, topic);
publishers.put(topic, publisher);
}else{
publisher = publishers.get(topic);
}
publisher.publish(msg);
return connectToken;
}
private RosDefaultPublisher newPublisher(java.net.URI uri, String topic){
RosDefaultPublisher publisher = new RosDefaultPublisher(topic); RosDefaultPublisher publisher = new RosDefaultPublisher(topic);
new Thread(() -> {
nodeMainExecutor.execute(publisher, nodeConfiguration);
}) {{
start();
}};
return publisher;
}
public ConnectToken newSubscriber(java.net.URI uri, java.util.function.BiConsumer<String, byte[]> callback){
int idx = uri.toString().indexOf("/", uri.toString().indexOf("/") + 1);
RosDefaultSubscriber subscriber = new RosDefaultSubscriber(uri.toString().substring(idx));
ConnectToken connectToken = new ConnectToken(uri); ConnectToken connectToken = new ConnectToken(uri);
new Thread(() -> { new Thread(() -> {
nodeMainExecutor.execute(listener, nodeConfiguration); nodeMainExecutor.execute(subscriber, nodeConfiguration);
}) {{ }) {{
start(); start();
}}; }};
nodeMainExecutor.addListener(callback);
tokensForRemoval.put(connectToken, subscriber);
return connectToken; return connectToken;
} }
public void disconnect(ConnectToken connectToken) {
tokensForRemoval.get(connectToken).removeAllMessageListeners();
tokensForRemoval.remove(connectToken);
}
} }
public class RosDefaultPublisher extends AbstractNodeMain { public class RosDefaultPublisher extends AbstractNodeMain {
...@@ -85,5 +128,27 @@ aspect RosHandler { ...@@ -85,5 +128,27 @@ aspect RosHandler {
public class RosDefaultSubscriber extends AbstractNodeMain { public class RosDefaultSubscriber extends AbstractNodeMain {
private String topic;
private Subscriber<std_msgs.String> subscriber;
public RosDefaultSubscriber(String topic){
this.topic = topic;
}
@Override
public GraphName getDefaultNodeName() {
String nodeID = java.util.UUID.randomUUID().toString();
return GraphName.of("ragconnect/subscriber/" + nodeID);
}
@Override
public void onStart(final ConnectedNode connectedNode) {
subscriber = connectedNode.newSubscriber(topic_name, std_msgs.String._TYPE);
}
public void addListener(BiConsumer<String, byte[]> callback){
subscriber.addMessageListener(message -> callback.accept(topic, new String(message.getData(), StandardCharsets.UTF_8)));
}
} }
} }
\ No newline at end of file
...@@ -109,6 +109,14 @@ private boolean {{parentTypeName}}.{{internalConnectMethod}}(String {{connectPar ...@@ -109,6 +109,14 @@ private boolean {{parentTypeName}}.{{internalConnectMethod}}(String {{connectPar
} }
break; break;
{{/usesMqtt}} {{/usesMqtt}}
{{#usesRos}}
case "ros":
connectToken = {{rosHandlerAttribute}}().newSubscriber(uri, consumer);
if (connectToken == null) {
return false;
}
break;
{{/usesRos}}
{{#usesJava}} {{#usesJava}}
case "java": case "java":
String uuid = {{javaHandlerAttribute}}().registerCallback(path, consumer); String uuid = {{javaHandlerAttribute}}().registerCallback(path, consumer);
...@@ -144,6 +152,9 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter ...@@ -144,6 +152,9 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter
{{#usesMqtt}} {{#usesMqtt}}
case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri)); case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
{{/usesMqtt}} {{/usesMqtt}}
{{#usesRos}}
case "mqtt": return {{rosHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
{{/usesRos}}
{{#usesJava}} {{#usesJava}}
case "java": return {{javaHandlerAttribute}}().unregisterCallback(uri.getPath(), connectTokens.get(this).get(uri).globalId); case "java": return {{javaHandlerAttribute}}().unregisterCallback(uri.getPath(), connectTokens.get(this).get(uri).globalId);
{{/usesJava}} {{/usesJava}}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment