Skip to content
Snippets Groups Projects
Commit 5f6c0d57 authored by Damon Kohler's avatar Damon Kohler
Browse files

Fix race condition in SubscriberFactory.

Fix race condition in ServiceFactory.
Change TopicParticipantManager to use GraphNames.
Change newServiceServer to throw an exception if the same service is created twice. Added test.
Add getServiceServer method for looking up an existing server by name.
Rename lookupService to lookupServiceUri to help distinguish it from getServiceServer.
Javadoc cleanups.
parent b99706a6
Branches
No related tags found
No related merge requests found
Showing
with 138 additions and 86 deletions
/*
* Copyright (C) 2012 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.ros.exception;
/**
* @author damonkohler@google.com (Damon Kohler)
*/
public class DuplicateServiceException extends RosRuntimeException {
public DuplicateServiceException(final String message) {
super(message);
}
}
......@@ -18,8 +18,6 @@ package org.ros.internal.node;
import com.google.common.annotations.VisibleForTesting;
import org.ros.node.service.ServiceResponseBuilder;
import org.apache.commons.logging.Log;
import org.ros.concurrent.CancellableLoop;
import org.ros.concurrent.ListenerCollection;
......@@ -58,6 +56,7 @@ import org.ros.node.NodeConfiguration;
import org.ros.node.NodeListener;
import org.ros.node.parameter.ParameterTree;
import org.ros.node.service.ServiceClient;
import org.ros.node.service.ServiceResponseBuilder;
import org.ros.node.service.ServiceServer;
import org.ros.node.topic.Publisher;
import org.ros.node.topic.Subscriber;
......@@ -262,11 +261,39 @@ public class DefaultNode implements Node {
return newServiceServer(new GraphName(serviceName), serviceType, responseBuilder);
}
@SuppressWarnings("unchecked")
@Override
public <T, S> ServiceServer<T, S> getServiceServer(GraphName serviceName) {
return (ServiceServer<T, S>) serviceManager.getServer(serviceName);
}
@Override
public <T, S> ServiceServer<T, S> getServiceServer(String serviceName) {
return getServiceServer(new GraphName(serviceName));
}
@Override
public URI lookupServiceUri(GraphName serviceName) {
Response<URI> response =
masterClient.lookupService(slaveServer.toNodeIdentifier().getName(),
resolveName(serviceName).toString());
if (response.getStatusCode() == StatusCode.SUCCESS) {
return response.getResult();
} else {
return null;
}
}
@Override
public URI lookupServiceUri(String serviceName) {
return lookupServiceUri(new GraphName(serviceName));
}
@Override
public <T, S> ServiceClient<T, S> newServiceClient(GraphName serviceName, String serviceType)
throws ServiceNotFoundException {
GraphName resolvedServiceName = resolveName(serviceName);
URI uri = lookupService(resolvedServiceName);
URI uri = lookupServiceUri(resolvedServiceName);
if (uri == null) {
throw new ServiceNotFoundException("No such service " + resolvedServiceName + " of type "
+ serviceType);
......@@ -287,23 +314,6 @@ public class DefaultNode implements Node {
return newServiceClient(new GraphName(serviceName), serviceType);
}
@Override
public URI lookupService(GraphName serviceName) {
Response<URI> response =
masterClient.lookupService(slaveServer.toNodeIdentifier().getName(),
resolveName(serviceName).toString());
if (response.getStatusCode() == StatusCode.SUCCESS) {
return response.getResult();
} else {
return null;
}
}
@Override
public URI lookupService(String serviceName) {
return lookupService(new GraphName(serviceName));
}
@Override
public Time getCurrentTime() {
return nodeConfiguration.getTimeProvider().getCurrentTime();
......
......@@ -166,8 +166,9 @@ public class SlaveServer extends XmlRpcServer {
}
public void publisherUpdate(String callerId, String topicName, Collection<URI> publisherUris) {
if (topicParticipantManager.hasSubscriber(topicName)) {
DefaultSubscriber<?> subscriber = topicParticipantManager.getSubscriber(topicName);
GraphName graphName = new GraphName(topicName);
if (topicParticipantManager.hasSubscriber(graphName)) {
DefaultSubscriber<?> subscriber = topicParticipantManager.getSubscriber(graphName);
TopicDeclaration topicDeclaration = subscriber.getTopicDeclaration();
Collection<PublisherIdentifier> identifiers =
PublisherIdentifier.newCollectionFromUris(publisherUris, topicDeclaration);
......@@ -178,9 +179,9 @@ public class SlaveServer extends XmlRpcServer {
public ProtocolDescription requestTopic(String topicName, Collection<String> protocols)
throws ServerException {
// Canonicalize topic name.
topicName = new GraphName(topicName).toGlobal().toString();
if (!topicParticipantManager.hasPublisher(topicName)) {
throw new ServerException("No publishers for topic: " + topicName);
GraphName graphName = new GraphName(topicName).toGlobal();
if (!topicParticipantManager.hasPublisher(graphName)) {
throw new ServerException("No publishers for topic: " + graphName);
}
for (String protocol : protocols) {
if (protocol.equals(ProtocolNames.TCPROS)) {
......
......@@ -18,9 +18,9 @@ package org.ros.internal.node.service;
import com.google.common.base.Preconditions;
import org.ros.exception.DuplicateServiceException;
import org.ros.internal.message.service.ServiceDescription;
import org.ros.internal.node.server.SlaveServer;
import org.ros.internal.node.server.master.MasterServer;
import org.ros.message.MessageDeserializer;
import org.ros.message.MessageFactory;
import org.ros.message.MessageSerializer;
......@@ -52,10 +52,8 @@ public class ServiceFactory {
}
/**
* Gets or creates a {@link DefaultServiceServer} instance.
* {@link DefaultServiceServer}s are cached and reused per service. When a new
* {@link DefaultServiceServer} is generated, it is registered with the
* {@link MasterServer}.
* Creates a {@link DefaultServiceServer} instance and registers it with the
* master.
*
* @param serviceDeclaration
* the {@link ServiceDescription} that is being served
......@@ -69,30 +67,38 @@ public class ServiceFactory {
* a {@link MessageFactory} to be used for creating responses
* @return a {@link DefaultServiceServer} instance
*/
@SuppressWarnings("unchecked")
public <T, S> DefaultServiceServer<T, S> newServer(ServiceDeclaration serviceDeclaration,
ServiceResponseBuilder<T, S> responseBuilder, MessageDeserializer<T> deserializer,
MessageSerializer<S> serializer, MessageFactory messageFactory) {
DefaultServiceServer<T, S> serviceServer;
GraphName name = serviceDeclaration.getName();
boolean createdNewServer = false;
synchronized (serviceManager) {
if (serviceManager.hasServer(name)) {
serviceServer = (DefaultServiceServer<T, S>) serviceManager.getServer(name);
throw new DuplicateServiceException(String.format("ServiceServer %s already exists.", name));
} else {
serviceServer =
new DefaultServiceServer<T, S>(serviceDeclaration, responseBuilder,
slaveServer.getTcpRosAdvertiseAddress(), deserializer, serializer, messageFactory,
executorService);
createdNewServer = true;
serviceManager.addServer(serviceServer);
}
}
return serviceServer;
}
if (createdNewServer) {
serviceManager.addServer(serviceServer);
/**
* @param name
* the {@link GraphName} of the {@link DefaultServiceServer}
* @return the {@link DefaultServiceServer} with the given name or
* {@code null} if it does not exist
*/
@SuppressWarnings("unchecked")
public <T, S> DefaultServiceServer<T, S> getServer(GraphName name) {
if (serviceManager.hasServer(name)) {
return (DefaultServiceServer<T, S>) serviceManager.getServer(name);
}
return serviceServer;
return null;
}
/**
......@@ -127,12 +133,12 @@ public class ServiceFactory {
serviceClient =
DefaultServiceClient.newDefault(nodeName, serviceDeclaration, serializer, deserializer,
messageFactory, executorService);
serviceManager.addClient(serviceClient);
createdNewClient = true;
}
}
if (createdNewClient) {
serviceManager.addClient(serviceClient);
serviceClient.connect(serviceDeclaration.getUri());
}
return serviceClient;
......
......@@ -60,8 +60,7 @@ public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publ
private static final TimeUnit DEFAULT_SHUTDOWN_TIMEOUT_UNITS = TimeUnit.SECONDS;
/**
* Queue of all messages being published by this {@link Publisher}
* .org.ros.message.MessageFactory
* Queue of all messages being published by this {@link Publisher}.
*/
private final OutgoingMessageQueue<T> outgoingMessageQueue;
private final ListenerCollection<PublisherListener<T>> listeners;
......@@ -216,7 +215,6 @@ public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publ
/**
* Signal all {@link PublisherListener}s that the {@link Publisher} has
* successfully registered with the master.
*
* <p>
* Each listener is called in a separate thread.
*/
......@@ -234,7 +232,6 @@ public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publ
/**
* Signal all {@link PublisherListener}s that the {@link Publisher} has failed
* to register with the master.
*
* <p>
* Each listener is called in a separate thread.
*/
......@@ -252,7 +249,6 @@ public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publ
/**
* Signal all {@link PublisherListener}s that the {@link Publisher} has
* successfully unregistered with the master.
*
* <p>
* Each listener is called in a separate thread.
*/
......@@ -270,7 +266,6 @@ public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publ
/**
* Signal all {@link PublisherListener}s that the {@link Publisher} has failed
* to unregister with the master.
*
* <p>
* Each listener is called in a separate thread.
*/
......
......@@ -19,6 +19,7 @@ package org.ros.internal.node.topic;
import org.ros.internal.node.server.NodeIdentifier;
import org.ros.message.MessageFactory;
import org.ros.message.MessageSerializer;
import org.ros.namespace.GraphName;
import org.ros.node.topic.DefaultPublisherListener;
import org.ros.node.topic.Publisher;
......@@ -61,7 +62,7 @@ public class PublisherFactory {
@SuppressWarnings("unchecked")
public <T> Publisher<T> newOrExisting(TopicDeclaration topicDeclaration,
MessageSerializer<T> messageSerializer) {
String topicName = topicDeclaration.getName().toString();
GraphName topicName = topicDeclaration.getName();
synchronized (topicParticipantManager) {
if (topicParticipantManager.hasPublisher(topicName)) {
......
......@@ -18,6 +18,7 @@ package org.ros.internal.node.topic;
import org.ros.internal.node.server.NodeIdentifier;
import org.ros.message.MessageDeserializer;
import org.ros.namespace.GraphName;
import org.ros.node.topic.DefaultSubscriberListener;
import org.ros.node.topic.Subscriber;
......@@ -57,15 +58,13 @@ public class SubscriberFactory {
@SuppressWarnings("unchecked")
public <T> Subscriber<T> newOrExisting(TopicDeclaration topicDeclaration,
MessageDeserializer<T> messageDeserializer) {
String topicName = topicDeclaration.getName().toString();
DefaultSubscriber<T> subscriber;
boolean createdNewSubscriber = false;
GraphName topicName = topicDeclaration.getName();
synchronized (topicParticipantManager) {
if (topicParticipantManager.hasSubscriber(topicName)) {
subscriber = (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName);
return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName);
} else {
subscriber =
DefaultSubscriber<T> subscriber =
DefaultSubscriber.newDefault(nodeIdentifier, topicDeclaration, executorService,
messageDeserializer);
subscriber.addSubscriberListener(new DefaultSubscriberListener<T>() {
......@@ -81,13 +80,9 @@ public class SubscriberFactory {
topicParticipantManager.removeSubscriber((DefaultSubscriber<T>) subscriber);
}
});
createdNewSubscriber = true;
}
}
if (createdNewSubscriber) {
topicParticipantManager.addSubscriber(subscriber);
}
return subscriber;
}
}
}
}
......@@ -72,20 +72,20 @@ public class TopicParticipantManager {
this.listener = listener;
}
public boolean hasSubscriber(String topicName) {
return subscribers.containsKey(new GraphName(topicName));
public boolean hasSubscriber(GraphName topicName) {
return subscribers.containsKey(topicName);
}
public boolean hasPublisher(String topicName) {
return publishers.containsKey(new GraphName(topicName));
public boolean hasPublisher(GraphName topicName) {
return publishers.containsKey(topicName);
}
public DefaultPublisher<?> getPublisher(String topicName) {
return publishers.get(new GraphName(topicName));
public DefaultPublisher<?> getPublisher(GraphName topicName) {
return publishers.get(topicName);
}
public DefaultSubscriber<?> getSubscriber(String topicName) {
return subscribers.get(new GraphName(topicName));
public DefaultSubscriber<?> getSubscriber(GraphName topicName) {
return subscribers.get(topicName);
}
public void addPublisher(DefaultPublisher<?> publisher) {
......
......@@ -92,7 +92,7 @@ public class TcpServerHandshakeHandler extends SimpleChannelHandler {
Exception {
Preconditions.checkState(incomingHeader.containsKey(ConnectionHeaderFields.TOPIC),
"Handshake header missing field: " + ConnectionHeaderFields.TOPIC);
String topicName = incomingHeader.get(ConnectionHeaderFields.TOPIC);
GraphName topicName = new GraphName(incomingHeader.get(ConnectionHeaderFields.TOPIC));
Preconditions.checkState(topicParticipantManager.hasPublisher(topicName),
"No publisher for topic: " + topicName);
DefaultPublisher<?> publisher = topicParticipantManager.getPublisher(topicName);
......@@ -104,7 +104,7 @@ public class TcpServerHandshakeHandler extends SimpleChannelHandler {
}
String nodeName = incomingHeader.get(ConnectionHeaderFields.CALLER_ID);
publisher.addSubscriber(new SubscriberIdentifier(NodeIdentifier.forName(nodeName),
TopicIdentifier.forName(topicName)), channel);
new TopicIdentifier(topicName)), channel);
// Once the handshake is complete, there will be nothing incoming on the
// channel. So, we replace the handshake handler with a handler which will
......
......@@ -16,8 +16,6 @@
package org.ros.node;
import org.ros.node.service.ServiceResponseBuilder;
import org.apache.commons.logging.Log;
import org.ros.concurrent.CancellableLoop;
import org.ros.exception.ServiceNotFoundException;
......@@ -30,6 +28,7 @@ import org.ros.namespace.NameResolver;
import org.ros.namespace.NodeNameResolver;
import org.ros.node.parameter.ParameterTree;
import org.ros.node.service.ServiceClient;
import org.ros.node.service.ServiceResponseBuilder;
import org.ros.node.service.ServiceServer;
import org.ros.node.topic.Publisher;
import org.ros.node.topic.Subscriber;
......@@ -150,7 +149,7 @@ public interface Node {
<T> Subscriber<T> newSubscriber(String topicName, String messageType);
/**
* Create a {@link ServiceServer}.
* Create a new {@link ServiceServer}.
*
* @param serviceName
* the name of the service
......@@ -169,6 +168,31 @@ public interface Node {
<T, S> ServiceServer<T, S> newServiceServer(String serviceName, String serviceType,
ServiceResponseBuilder<T, S> serviceResponseBuilder);
/**
* @param serviceName
* the {@link GraphName} of the {@link ServiceServer}
* @return the {@link ServiceServer} with the given name or {@code null} if it
* does not exist
*/
<T, S> ServiceServer<T, S> getServiceServer(GraphName serviceName);
/**
* @see Node#getServiceServer(GraphName)
*/
<T, S> ServiceServer<T, S> getServiceServer(String serviceName);
/**
* @param serviceName
* the {@link GraphName} of the service {@link URI} to lookup
* @return the {@link URI} of the service or {@code null} if it does not exist
*/
URI lookupServiceUri(GraphName serviceName);
/**
* @see #lookupServiceUri(GraphName)
*/
URI lookupServiceUri(String serviceName);
/**
* Create a {@link ServiceClient}.
*
......@@ -189,19 +213,6 @@ public interface Node {
<T, S> ServiceClient<T, S> newServiceClient(String serviceName, String serviceType)
throws ServiceNotFoundException;
/**
* @param serviceName
* the {@link GraphName} of the service to lookup
* @return {@link URI} of the service or {@code null} if the service does not
* exist
*/
URI lookupService(GraphName serviceName);
/**
* @see #lookupService(GraphName)
*/
URI lookupService(String serviceName);
/**
* Create a {@link ParameterTree} to query and set parameters on the ROS
* parameter server.
......
......@@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
import org.junit.Test;
import org.ros.RosTest;
import org.ros.exception.DuplicateServiceException;
import org.ros.exception.RemoteException;
import org.ros.exception.RosRuntimeException;
import org.ros.exception.ServiceException;
......@@ -58,6 +59,12 @@ public class ServiceIntegrationTest extends RosTest {
response.setSum(request.getA() + request.getB());
}
});
try {
node.newServiceServer(SERVICE_NAME, test_ros.AddTwoInts._TYPE, null);
fail();
} catch (DuplicateServiceException e) {
// Only one ServiceServer with a given name can be created.
}
serviceServer.addListener(countDownServiceServerListener);
}
......@@ -75,7 +82,7 @@ public class ServiceIntegrationTest extends RosTest {
}
}, nodeConfiguration);
countDownServiceServerListener.awaitMasterRegistrationSuccess(1, TimeUnit.SECONDS);
assertTrue(countDownServiceServerListener.awaitMasterRegistrationSuccess(1, TimeUnit.SECONDS));
final CountDownLatch latch = new CountDownLatch(1);
nodeMainExecutor.execute(new NodeMain() {
......@@ -162,7 +169,7 @@ public class ServiceIntegrationTest extends RosTest {
}
}, nodeConfiguration);
countDownServiceServerListener.awaitMasterRegistrationSuccess(1, TimeUnit.SECONDS);
assertTrue(countDownServiceServerListener.awaitMasterRegistrationSuccess(1, TimeUnit.SECONDS));
final CountDownLatch latch = new CountDownLatch(1);
nodeMainExecutor.execute(new NodeMain() {
......
......@@ -32,5 +32,4 @@ public class RosRuntimeException extends RuntimeException {
public RosRuntimeException(final String message) {
super(message);
}
}
......@@ -38,7 +38,7 @@ public class Talker implements NodeMain {
public void onStart(final Node node) {
final Publisher<std_msgs.String> publisher =
node.newPublisher("chatter", std_msgs.String._TYPE);
// This CancellableLoop will be canceled automatically when the Node shuts
// This CancellableLoop will be canceled automatically when the node shuts
// down.
node.executeCancellableLoop(new CancellableLoop() {
private int sequenceNumber;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment