Commit 669090a1 authored by Sebastian Ebert's avatar Sebastian Ebert
Browse files

compatibility for new ragconnect version (java backend), working on ros backend

parent ffcb930b
Pipeline #11852 passed with stages
in 8 minutes and 1 second
......@@ -3,7 +3,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
aspect JavaHandler {
......@@ -16,7 +16,7 @@ aspect JavaHandler {
private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(JavaHandler.class);
private Map<String, List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>>> callbackList = new ConcurrentHashMap<>();
private Map<String, List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>>> callbackList = new ConcurrentHashMap<>();
private JavaHandler() {
......@@ -29,16 +29,16 @@ aspect JavaHandler {
return JAVA_HANDLER_INSTANCE;
}
public String registerCallback(String topic, Consumer<byte[]> callback) {
public String registerCallback(String topic, BiConsumer<String, byte[]> callback) {
logger.debug("[JAVA_HANDLER] Registering new callback.");
String callbackUUID = java.util.UUID.randomUUID().toString();
List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> registeredCallbacks = getAllCallbacks().get(topic);
List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> registeredCallbacks = getAllCallbacks().get(topic);
if(registeredCallbacks == null){
List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> newCallbackList = Collections.synchronizedList(new ArrayList<>());
List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> newCallbackList = Collections.synchronizedList(new ArrayList<>());
newCallbackList.add(new org.apache.commons.lang3.tuple.MutablePair<>(callbackUUID, callback));
callbackList.put(topic, newCallbackList);
} else {
......@@ -52,12 +52,12 @@ aspect JavaHandler {
logger.debug("[JAVA_HANDLER] Unregistering callback with uuid: " + uuid + " on path: " + path);
List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> callbacks = getAllCallbacks().get(path);
List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> callbacks = getAllCallbacks().get(path);
int count = 0;
if(callbacks != null){
for(org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>> callbackPair : callbacks){
for(org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>> callbackPair : callbacks){
if(callbackPair.getLeft().equals(uuid)){
callbacks.remove(count);
return true;
......@@ -79,22 +79,22 @@ aspect JavaHandler {
logger.info("[JAVA_HANDLER] Data: " + dataString);
List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>> callbacks = getAllCallbacks().get(topic);
List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> callbacks = getAllCallbacks().get(topic);
if(callbacks == null){
logger.error("[JAVA_HANDLER] Could not publish message. No callback registered for topic " + topic);
return false;
}
for(org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>> callbackPair : callbacks){
for(org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>> callbackPair : callbacks){
logger.debug("[JAVA_HANDLER] Calling callback: " + callbackPair.getLeft());
callbackPair.getRight().accept(data);
callbackPair.getRight().accept(topic, data);
}
return true;
}
public Map<String, List<org.apache.commons.lang3.tuple.Pair<String, Consumer<byte[]>>>> getAllCallbacks() {
public Map<String, List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>>> getAllCallbacks() {
return callbackList;
}
}
......
import org.apache.commons.lang3.tuple.Pair;
import org.ros.concurrent.CancellableLoop;
import org.ros.namespace.GraphName;
import org.ros.node.AbstractNodeMain;
import org.ros.node.ConnectedNode;
import org.ros.node.topic.Publisher;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import org.ros.node.*;
import org.ros.node.NodeMainExecutor;
import java.net.URI;
import java.net.URISyntaxException;
aspect RosHandler {
public class RosHandler {
public static RosHandler ROS_HANDLER_INSTANCE = null;
private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(RosHandler.class);
List<Pair<String, RosDefaultPublisher>> publishers = new ArrayList<>();
NodeMainExecutor nodeMainExecutor = DefaultNodeMainExecutor.newDefault();
NodeConfiguration nodeConfiguration = NodeConfiguration.newPublic("localhost");
private RosHandler(){
nodeConfiguration.setMasterUri(new URI("http://localhost:11311"));
}
public synchronized static RosHandler getInstance(){
if(ROS_HANDLER_INSTANCE == null){
ROS_HANDLER_INSTANCE = new RosHandler();
}
return ROS_HANDLER_INSTANCE;
}
public ConnectToken newPublisher(java.net.URI uri, String topic){
RosDefaultPublisher publisher = new RosDefaultPublisher(topic);
ConnectToken connectToken = new ConnectToken(uri);
new Thread(() -> {
nodeMainExecutor.execute(listener, nodeConfiguration);
}) {{
start();
}};
return connectToken;
}
}
public class RosDefaultPublisher extends AbstractNodeMain {
private String topic;
private Publisher<std_msgs.String> publisher;
public RosDefaultPublisher(String topic){
this.topic = topic;
}
@Override
public GraphName getDefaultNodeName() {
String nodeID = java.util.UUID.randomUUID().toString();
return GraphName.of("ragconnect/publisher/" + nodeID);
}
@Override
public void onStart(final ConnectedNode connectedNode) {
publisher = connectedNode.newPublisher(topic_name, std_msgs.String._TYPE);
}
public void publish(byte[] msgContent){
std_msgs.String msg = publisher.newMessage();
String s = new String(msgContent, StandardCharsets.UTF_8);
msg.setData(s);
publisher.publish(msg);
}
}
public class RosDefaultSubscriber extends AbstractNodeMain {
}
}
\ No newline at end of file
......@@ -145,7 +145,7 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter
case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
{{/usesMqtt}}
{{#usesJava}}
case "java": return {{javaHandlerAttribute}}().unregisterCallback(uri.getPath(), connectTokens.get(this).get(uri).id);
case "java": return {{javaHandlerAttribute}}().unregisterCallback(uri.getPath(), connectTokens.get(this).get(uri).globalId);
{{/usesJava}}
{{#usesRest}}
case "rest": return {{restHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
......
......@@ -38,6 +38,8 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
if (writeCurrentValue) {
{{writeMethod}}();
}
connectToken = new ConnectToken(uri);
break;
{{/usesJava}}
{{#usesRest}}
......@@ -85,7 +87,8 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter
{{/usesMqtt}}
{{#usesJava}}
case "java":
// nothing todo, because senders are not registered (just callbacks)
{{sender}} = null;
{{lastValue}} = null;
break;
{{/usesJava}}
{{#usesRest}}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment