diff --git a/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java b/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java index 28e7e2d54a56f5545941fedc7fe89a486c8abdee..c961b688d75891e11733a7b1fb58200d668e13fb 100644 --- a/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java +++ b/rosjava/src/main/java/org/ros/internal/node/DefaultNode.java @@ -65,6 +65,7 @@ import org.ros.node.topic.DefaultPublisherListener; import org.ros.node.topic.DefaultSubscriberListener; import org.ros.node.topic.Publisher; import org.ros.node.topic.Subscriber; +import org.ros.node.topic.TransportHints; import org.ros.time.ClockTopicTimeProvider; import org.ros.time.TimeProvider; @@ -283,7 +284,7 @@ public class DefaultNode implements ConnectedNode { TopicDescription topicDescription = nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType); TopicDeclaration topicDeclaration = - TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription); + TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, null); org.ros.message.MessageSerializer<T> serializer = newMessageSerializer(messageType); return publisherFactory.newOrExisting(topicDeclaration, serializer); } @@ -295,11 +296,16 @@ public class DefaultNode implements ConnectedNode { @Override public <T> Subscriber<T> newSubscriber(GraphName topicName, String messageType) { + return newSubscriber(topicName, messageType, null); + } + + @Override + public <T> Subscriber<T> newSubscriber(GraphName topicName, String messageType, TransportHints transportHints) { GraphName resolvedTopicName = resolveName(topicName); TopicDescription topicDescription = nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType); TopicDeclaration topicDeclaration = - TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription); + TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, transportHints); MessageDeserializer<T> deserializer = newMessageDeserializer(messageType); Subscriber<T> subscriber = subscriberFactory.newOrExisting(topicDeclaration, deserializer); return subscriber; @@ -307,7 +313,12 @@ public class DefaultNode implements ConnectedNode { @Override public <T> Subscriber<T> newSubscriber(String topicName, String messageType) { - return newSubscriber(GraphName.of(topicName), messageType); + return newSubscriber(GraphName.of(topicName), messageType, null); + } + + @Override + public <T> Subscriber<T> newSubscriber(String topicName, String messageType, TransportHints transportHints) { + return newSubscriber(GraphName.of(topicName), messageType, transportHints); } @Override diff --git a/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java b/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java index 6422ce0e98158a610c86d8cfea322888d9c59127..eb0ede2290d54f738baa7352f44a8cde16ed39b3 100644 --- a/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java +++ b/rosjava/src/main/java/org/ros/internal/node/response/TopicListResultFactory.java @@ -43,7 +43,7 @@ public class TopicListResultFactory implements ResultFactory<List<TopicDeclarati String name = (String) ((Object[]) topic)[0]; String type = (String) ((Object[]) topic)[1]; descriptions.add(TopicDeclaration.newFromTopicName(GraphName.of(name), new TopicDescription(type, null, - null))); + null), null)); } return descriptions; } diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java b/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java index c55603fb36b83428b1a489558866dc5fe4c596a9..4908c611029182ff103ed0d78add80cbffa58872 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/TopicDeclaration.java @@ -23,6 +23,7 @@ import org.ros.internal.message.topic.TopicDescription; import org.ros.internal.transport.ConnectionHeader; import org.ros.internal.transport.ConnectionHeaderFields; import org.ros.namespace.GraphName; +import org.ros.node.topic.TransportHints; import java.util.List; import java.util.Map; @@ -36,6 +37,7 @@ public class TopicDeclaration { private final TopicIdentifier topicIdentifier; private final TopicDescription topicDescription; + private final TransportHints transportHints; /** * @param header @@ -49,19 +51,26 @@ public class TopicDeclaration { String definition = header.get(ConnectionHeaderFields.MESSAGE_DEFINITION); String md5Checksum = header.get(ConnectionHeaderFields.MD5_CHECKSUM); TopicDescription topicDescription = new TopicDescription(type, definition, md5Checksum); - return new TopicDeclaration(new TopicIdentifier(name), topicDescription); + boolean tcpNoDelay = "1".equals(header.get(ConnectionHeaderFields.TCP_NODELAY)); + return new TopicDeclaration(new TopicIdentifier(name), topicDescription, new TransportHints(tcpNoDelay)); } public static TopicDeclaration newFromTopicName(GraphName topicName, - TopicDescription topicDescription) { - return new TopicDeclaration(new TopicIdentifier(topicName), topicDescription); + TopicDescription topicDescription, TransportHints transportHints) { + return new TopicDeclaration(new TopicIdentifier(topicName), topicDescription, transportHints); } - public TopicDeclaration(TopicIdentifier topicIdentifier, TopicDescription topicDescription) { + public TopicDeclaration(TopicIdentifier topicIdentifier, TopicDescription topicDescription, TransportHints transportHints) { Preconditions.checkNotNull(topicIdentifier); Preconditions.checkNotNull(topicDescription); this.topicIdentifier = topicIdentifier; this.topicDescription = topicDescription; + + if (transportHints != null) { + this.transportHints = transportHints; + } else { + this.transportHints = new TransportHints(); + } } public TopicIdentifier getIdentifier() { @@ -84,6 +93,7 @@ public class TopicDeclaration { topicDescription.getDefinition()); connectionHeader.addField(ConnectionHeaderFields.MD5_CHECKSUM, topicDescription.getMd5Checksum()); + connectionHeader.addField(ConnectionHeaderFields.TCP_NODELAY, transportHints.getTcpNoDelay() ? "1" : "0"); return connectionHeader; } diff --git a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java b/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java index f5f1a149e1def482336c1dee9b51c7b53fbbf413..6f534d585e9cb571a31472b11f5774f3431d8fdb 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java +++ b/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpServerHandshakeHandler.java @@ -96,6 +96,10 @@ public class TcpServerHandshakeHandler extends SimpleChannelHandler { DefaultPublisher<?> publisher = topicParticipantManager.getPublisher(topicName); ChannelBuffer outgoingBuffer = publisher.finishHandshake(incomingConnectionHeader); Channel channel = ctx.getChannel(); + if (incomingConnectionHeader.hasField(ConnectionHeaderFields.TCP_NODELAY)) { + boolean tcpNoDelay = "1".equals(incomingConnectionHeader.getField(ConnectionHeaderFields.TCP_NODELAY)); + channel.getConfig().setOption("tcpNoDelay", tcpNoDelay); + } ChannelFuture future = channel.write(outgoingBuffer).await(); if (!future.isSuccess()) { throw new RosRuntimeException(future.getCause()); diff --git a/rosjava/src/main/java/org/ros/node/ConnectedNode.java b/rosjava/src/main/java/org/ros/node/ConnectedNode.java index 34d6994c1edbf6d682353b574272d806d776e955..4dfa6606d43a426d86a9ddcc14c25165815cf3bd 100644 --- a/rosjava/src/main/java/org/ros/node/ConnectedNode.java +++ b/rosjava/src/main/java/org/ros/node/ConnectedNode.java @@ -26,6 +26,7 @@ import org.ros.node.service.ServiceResponseBuilder; import org.ros.node.service.ServiceServer; import org.ros.node.topic.Publisher; import org.ros.node.topic.Subscriber; +import org.ros.node.topic.TransportHints; import java.net.URI; @@ -82,11 +83,29 @@ public interface ConnectedNode extends Node { */ <T> Subscriber<T> newSubscriber(GraphName topicName, String messageType); + /** + * @param <T> + * the message type to create the {@link Subscriber} for + * @param topicName + * the topic name to be subscribed to, this will be auto resolved + * @param messageType + * the message data type (e.g. "std_msgs/String") + * @param transportHints + * the transport hints + * @return a {@link Subscriber} for the specified topic + */ + <T> Subscriber<T> newSubscriber(GraphName topicName, String messageType, TransportHints transportHints); + /** * @see #newSubscriber(GraphName, String) */ <T> Subscriber<T> newSubscriber(String topicName, String messageType); + /** + * @see #newSubscriber(GraphName, String, TransportHints) + */ + <T> Subscriber<T> newSubscriber(String topicName, String messageType, TransportHints transportHints); + /** * Create a new {@link ServiceServer}. * diff --git a/rosjava/src/main/java/org/ros/node/topic/TransportHints.java b/rosjava/src/main/java/org/ros/node/topic/TransportHints.java new file mode 100644 index 0000000000000000000000000000000000000000..97ac02ede3e13151818cf48ba3c2829489c56856 --- /dev/null +++ b/rosjava/src/main/java/org/ros/node/topic/TransportHints.java @@ -0,0 +1,33 @@ +package org.ros.node.topic; + +import org.ros.node.ConnectedNode; + + +/** + * Provides a way of specifying network transport hints to + * {@link ConnectedNode#newSubscriber(org.ros.namespace.GraphName, String, TransportHints)} and + * {@link ConnectedNode#newSubscriber(String, String, TransportHints)}. + * + * @author stefan.glaser@hs-offenburg.de (Stefan Glaser) + */ +public class TransportHints { + + private boolean tcpNoDelay; + + public TransportHints() { + this(false); + } + + public TransportHints(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public TransportHints tcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + return this; + } + + public boolean getTcpNoDelay() { + return tcpNoDelay; + } +}