Commit fa6e929f authored by Sebastian Ebert's avatar Sebastian Ebert
Browse files

first working version of the ros backend

parent 44c64154
Pipeline #11984 failed with stages
in 7 minutes and 17 seconds
......@@ -7,6 +7,7 @@ aspect Configuration {
public static boolean ASTNode.usesMqtt;
public static boolean ASTNode.usesRest;
public static boolean ASTNode.usesJava;
public static boolean ASTNode.usesRos;
public static boolean ASTNode.incrementalOptionActive;
public static boolean ASTNode.experimentalJastAdd329;
}
......@@ -21,6 +21,9 @@ aspect AttributesForMustache {
syn String MRagConnect.javaHandlerAttribute() = "_javaHandler";
syn String MRagConnect.javaHandlerField() = "_javaHandler";
syn String MRagConnect.rosHandlerAttribute() = "_rosHandler";
syn String MRagConnect.rosHandlerField() = "_rosHandler";
syn boolean MRagConnect.hasTreeListEndpoints() = !sendingTreeListEndpoints().isEmpty() || !receivingTreeListEndpoints().isEmpty();
syn List<MTypeEndpointDefinition> MRagConnect.sendingTreeListEndpoints() {
List<MTypeEndpointDefinition> result = new ArrayList<>();
......@@ -259,6 +262,8 @@ aspect AttributesForMustache {
result.restHandlerAttribute(), result.restHandlerField(), usesRest));
result.addHandler(new MHandler("JavaHandler", "JavaHandler.getInstance()",
result.javaHandlerAttribute(), result.javaHandlerField(), usesJava));
result.addHandler(new MHandler("RosHandler", "RosHandler.getInstance()",
result.rosHandlerAttribute(), result.rosHandlerField(), usesRos));
return result;
}
......
......@@ -7,6 +7,7 @@ aspect MustacheNodesToYAML {
root.put("usesMqtt", usesMqtt);
root.put("usesRest", usesRest);
root.put("usesJava", usesJava);
root.put("usesRos", usesRos);
// mqtt
root.put("mqttHandlerField", mqttHandlerField());
......@@ -17,6 +18,10 @@ aspect MustacheNodesToYAML {
root.put("javaHandlerField", javaHandlerField());
root.put("javaHandlerAttribute", javaHandlerAttribute());
// ros
root.put("rosHandlerField", rosHandlerField());
root.put("rosHandlerAttribute", rosHandlerAttribute());
// rootTypeComponents
ListElement rootTypeComponents = new ListElement();
for (MTypeComponent comp : getRootTypeComponentList()) {
......
......@@ -33,6 +33,7 @@ public class Compiler extends AbstractCompiler {
private static final String OPTION_PROTOCOL_MQTT = "mqtt";
private static final String OPTION_PROTOCOL_REST = "rest";
private static final String OPTION_PROTOCOL_JAVA = "java";
private static final String OPTION_PROTOCOL_ROS = "ros";
private final static Logger LOGGER = Logger.getLogger(Compiler.class.getName());
......@@ -110,6 +111,9 @@ public class Compiler extends AbstractCompiler {
if (ASTNode.usesRest) {
handlers.add("RestHandler.jadd");
}
if (ASTNode.usesRos) {
handlers.add("RosHandler.jadd");
}
// copy handlers into outputDir
for (String handlerFileName : handlers) {
try {
......@@ -181,6 +185,7 @@ public class Compiler extends AbstractCompiler {
.acceptMultipleValues(true)
.addAcceptedValue(OPTION_PROTOCOL_JAVA, "Enable Java")
.addAcceptedValue(OPTION_PROTOCOL_REST, "Enable REST")
.addAcceptedValue(OPTION_PROTOCOL_ROS, "Enable ROS")
.addDefaultValue(OPTION_PROTOCOL_MQTT, "Enable MQTT")
);
optionPrintYaml = addOption(
......@@ -265,6 +270,7 @@ public class Compiler extends AbstractCompiler {
ASTNode.usesMqtt = optionProtocols.hasValue(OPTION_PROTOCOL_MQTT);
ASTNode.usesJava = optionProtocols.hasValue(OPTION_PROTOCOL_JAVA);
ASTNode.usesRest = optionProtocols.hasValue(OPTION_PROTOCOL_REST);
ASTNode.usesRos = optionProtocols.hasValue(OPTION_PROTOCOL_ROS);
return ragConnect;
}
......
import org.jastadd.ragconnect.ast.Pair;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
......@@ -15,8 +17,8 @@ aspect JavaHandler {
public static JavaHandler JAVA_HANDLER_INSTANCE = null;
private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(JavaHandler.class);
private Map<String, List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>>> callbackList = new ConcurrentHashMap<>();
Pair
private Map<String, List<Pair<String, BiConsumer<String, byte[]>>>> callbackList = new ConcurrentHashMap<>();
private JavaHandler() {
......@@ -35,10 +37,10 @@ aspect JavaHandler {
String callbackUUID = java.util.UUID.randomUUID().toString();
List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> registeredCallbacks = getAllCallbacks().get(topic);
List<Pair<String, BiConsumer<String, byte[]>>> registeredCallbacks = getAllCallbacks().get(topic);
if(registeredCallbacks == null){
List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> newCallbackList = Collections.synchronizedList(new ArrayList<>());
List<Pair<String, BiConsumer<String, byte[]>>> newCallbackList = Collections.synchronizedList(new ArrayList<>());
newCallbackList.add(new org.apache.commons.lang3.tuple.MutablePair<>(callbackUUID, callback));
callbackList.put(topic, newCallbackList);
} else {
......@@ -52,12 +54,12 @@ aspect JavaHandler {
logger.debug("[JAVA_HANDLER] Unregistering callback with uuid: " + uuid + " on path: " + path);
List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> callbacks = getAllCallbacks().get(path);
List<Pair<String, BiConsumer<String, byte[]>>> callbacks = getAllCallbacks().get(path);
int count = 0;
if(callbacks != null){
for(org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>> callbackPair : callbacks){
for(Pair<String, BiConsumer<String, byte[]>> callbackPair : callbacks){
if(callbackPair.getLeft().equals(uuid)){
callbacks.remove(count);
return true;
......@@ -79,14 +81,14 @@ aspect JavaHandler {
logger.info("[JAVA_HANDLER] Data: " + dataString);
List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>> callbacks = getAllCallbacks().get(topic);
List<Pair<String, BiConsumer<String, byte[]>>> callbacks = getAllCallbacks().get(topic);
if(callbacks == null){
logger.error("[JAVA_HANDLER] Could not publish message. No callback registered for topic " + topic);
return false;
}
for(org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>> callbackPair : callbacks){
for(Pair<String, BiConsumer<String, byte[]>> callbackPair : callbacks){
logger.debug("[JAVA_HANDLER] Calling callback: " + callbackPair.getLeft());
callbackPair.getRight().accept(topic, data);
}
......@@ -94,7 +96,7 @@ aspect JavaHandler {
return true;
}
public Map<String, List<org.apache.commons.lang3.tuple.Pair<String, BiConsumer<String, byte[]>>>> getAllCallbacks() {
public Map<String, List<Pair<String, BiConsumer<String, byte[]>>>> getAllCallbacks() {
return callbackList;
}
}
......
import org.apache.commons.lang3.tuple.Pair;
import org.ros.concurrent.CancellableLoop;
import org.ros.namespace.GraphName;
import org.ros.node.AbstractNodeMain;
......@@ -10,6 +9,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.function.BiConsumer;
import org.ros.node.*;
......@@ -17,6 +17,7 @@ import org.ros.node.NodeMainExecutor;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
aspect RosHandler {
......@@ -24,16 +25,30 @@ aspect RosHandler {
public static RosHandler ROS_HANDLER_INSTANCE = null;
private final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(RosHandler.class);
private final Map<String, RosDefaultPublisher> publishers = new HashMap<>();
private final java.util.Map<ConnectToken, RosDefaultSubscriber> tokensForRemoval = new java.util.HashMap<>();
NodeMainExecutor nodeMainExecutor = DefaultNodeMainExecutor.newDefault();
NodeConfiguration nodeConfiguration = NodeConfiguration.newPublic("localhost");
private final RosDefaultSubscriber subscriber = new RosDefaultSubscriber();
private final RosDefaultPublisher publisher = new RosDefaultPublisher();
private NodeMainExecutor nodeMainExecutor = DefaultNodeMainExecutor.newDefault();
private NodeConfiguration nodeConfiguration = NodeConfiguration.newPublic("localhost");
// TODO: replace this with a configurable URI
private RosHandler(){
nodeConfiguration.setMasterUri(new URI("http://localhost:11311"));
nodeConfiguration.setMasterUri(URI.create("http://localhost:11311"));
new Thread(() -> {
nodeMainExecutor.execute(subscriber, nodeConfiguration);
}) {{
start();
}};
new Thread(() -> {
nodeMainExecutor.execute(publisher, nodeConfiguration);
}) {{
start();
}};
}
public synchronized static RosHandler getInstance(){
......@@ -43,112 +58,116 @@ aspect RosHandler {
return ROS_HANDLER_INSTANCE;
}
public ConnectToken publish (java.net.URI uri, String topic, byte[] msg){
public void close(){};
RosDefaultPublisher publisher = null;
ConnectToken connectToken = new ConnectToken(uri);
if(!publishers.get(topic)){
publisher = newPublisher(uri, topic);
publishers.put(topic, publisher);
}else{
publisher = publishers.get(topic);
}
publisher.publish(msg);
return connectToken;
public void publish (java.net.URI uri, String topic, byte[] msg){
publisher.publish(extractTopic(uri), msg);
}
private RosDefaultPublisher newPublisher(java.net.URI uri, String topic){
RosDefaultPublisher publisher = new RosDefaultPublisher(topic);
public String extractTopic(java.net.URI uri){
new Thread(() -> {
nodeMainExecutor.execute(publisher, nodeConfiguration);
}) {{
start();
}};
return publisher;
int idx = uri.toString().indexOf("/", uri.toString().indexOf("/") + 1);
return uri.toString().substring(idx);
}
public ConnectToken newSubscriber(java.net.URI uri, java.util.function.BiConsumer<String, byte[]> callback){
int idx = uri.toString().indexOf("/", uri.toString().indexOf("/") + 1);
RosDefaultSubscriber subscriber = new RosDefaultSubscriber(uri.toString().substring(idx));
ConnectToken connectToken = new ConnectToken(uri);
new Thread(() -> {
nodeMainExecutor.execute(subscriber, nodeConfiguration);
}) {{
start();
}};
nodeMainExecutor.addListener(callback);
subscriber.addListener(extractTopic(uri), callback);
tokensForRemoval.put(connectToken, subscriber);
return connectToken;
}
public void disconnect(ConnectToken connectToken) {
tokensForRemoval.get(connectToken).removeAllMessageListeners();
public boolean disconnect(ConnectToken connectToken) {
tokensForRemoval.get(connectToken).removeAllMessageListeners(extractTopic(connectToken.uri));
tokensForRemoval.remove(connectToken);
return true;
}
}
public class RosDefaultPublisher extends AbstractNodeMain {
private String topic;
private volatile ConnectedNode node;
private Map<String, Publisher<std_msgs.String>> publishers = new HashMap<>();
private Publisher<std_msgs.String> publisher;
public RosDefaultPublisher(String topic){
this.topic = topic;
}
public RosDefaultPublisher(){}
@Override
public GraphName getDefaultNodeName() {
String nodeID = java.util.UUID.randomUUID().toString();
return GraphName.of("ragconnect/publisher/" + nodeID);
return GraphName.of("ragconnect/publisher/");
}
@Override
public void onStart(final ConnectedNode connectedNode) {
publisher = connectedNode.newPublisher(topic_name, std_msgs.String._TYPE);
System.out.println("Initializing publisher.");
this.node = connectedNode;
}
public void publish(byte[] msgContent){
std_msgs.String msg = publisher.newMessage();
String s = new String(msgContent, StandardCharsets.UTF_8);
msg.setData(s);
publisher.publish(msg);
public void publish(String topic, byte[] msgContent){
System.out.println("Publishing new message to " + topic);
Publisher<std_msgs.String> pub = publishers.get(topic);
if(pub == null){
while(true){
if(this.node != null){
break;
}
}
pub = this.node.newPublisher(topic, std_msgs.String._TYPE);
std_msgs.String msg = pub.newMessage();
String s = new String(msgContent, StandardCharsets.UTF_8);
msg.setData(s);
pub.publish(msg);
}
}
}
public class RosDefaultSubscriber extends AbstractNodeMain {
private String topic;
private volatile ConnectedNode node;
private Map<String, Subscriber<std_msgs.String>> subscribers = new HashMap<>();
private Subscriber<std_msgs.String> subscriber;
public RosDefaultSubscriber(String topic){
this.topic = topic;
}
public RosDefaultSubscriber(){}
@Override
public GraphName getDefaultNodeName() {
String nodeID = java.util.UUID.randomUUID().toString();
return GraphName.of("ragconnect/subscriber/" + nodeID);
return GraphName.of("ragconnect/subscriber");
}
@Override
public void onStart(final ConnectedNode connectedNode) {
subscriber = connectedNode.newSubscriber(topic_name, std_msgs.String._TYPE);
System.out.println("Initializing subscriber.");
this.node = connectedNode;
}
public void addListener(BiConsumer<String, byte[]> callback){
subscriber.addMessageListener(message -> callback.accept(topic, new String(message.getData(), StandardCharsets.UTF_8)));
public void addListener(String topic, BiConsumer<String, byte[]> callback){
System.out.println("Adding listener");
Subscriber<std_msgs.String> sub = subscribers.get(topic);
if(sub == null){
while(true){
if(this.node != null){
break;
}
}
sub = this.node.newSubscriber(topic, std_msgs.String._TYPE);
}
sub.addMessageListener(message -> callback.accept(topic, message.getData().getBytes()));
}
public void removeAllMessageListeners(String topic){
Subscriber<std_msgs.String> sub = subscribers.get(topic);
if(sub != null){
sub.removeAllMessageListeners();
}
}
}
}
\ No newline at end of file
......@@ -153,7 +153,7 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter
case "mqtt": return {{mqttHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
{{/usesMqtt}}
{{#usesRos}}
case "mqtt": return {{rosHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
case "ros": return {{rosHandlerAttribute}}().disconnect(connectTokens.get(this).get(uri));
{{/usesRos}}
{{#usesJava}}
case "java": return {{javaHandlerAttribute}}().unregisterCallback(uri.getPath(), connectTokens.get(this).get(uri).globalId);
......
......@@ -27,6 +27,23 @@ public boolean {{parentTypeName}}.{{connectMethod}}(String {{connectParameterNam
connectToken = new ConnectToken(uri);
break;
{{/usesMqtt}}
{{#usesRos}}
case "ros":
final RosHandler handler = {{rosHandlerAttribute}}().getInstance();
final String topic = {{rosHandlerAttribute}}().extractTopic(uri);
{{sender}} = () -> {
handler.publish(uri, topic, {{lastValue}});
};
{{updateMethod}}();
if (writeCurrentValue) {
{{writeMethod}}();
}
connectToken = new ConnectToken(uri);
break;
{{/usesRos}}
{{#usesJava}}
case "java":
final JavaHandler handler = {{javaHandlerAttribute}}().getInstance();
......@@ -85,6 +102,12 @@ public boolean {{parentTypeName}}.{{disconnectMethod}}(String {{connectParameter
{{lastValue}} = null;
break;
{{/usesMqtt}}
{{#usesRos}}
case "mqtt":
{{sender}} = null;
{{lastValue}} = null;
break;
{{/usesRos}}
{{#usesJava}}
case "java":
{{sender}} = null;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment