From 127191334117e45dd478c2b520ad8351b80dfa5b Mon Sep 17 00:00:00 2001 From: Damon Kohler <damonkohler@google.com> Date: Mon, 6 Aug 2012 11:44:37 +0200 Subject: [PATCH] Fixes locking anti-patterns including double check locking and locking on externally accessible objects. --- .../org/ros/concurrent/CancellableLoop.java | 9 ++++- .../concurrent/RetryingExecutorService.java | 37 +++++++++++-------- .../internal/node/service/ServiceFactory.java | 6 ++- .../node/topic/DefaultSubscriber.java | 25 +++++++------ .../internal/node/topic/PublisherFactory.java | 5 ++- .../node/topic/SubscriberFactory.java | 4 +- .../internal/transport/queue/LazyMessage.java | 12 ++---- .../transport/queue/OutgoingMessageQueue.java | 18 ++++++--- .../uri/SwitchableMasterUriProvider.java | 25 ++++++++----- 9 files changed, 84 insertions(+), 57 deletions(-) diff --git a/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java b/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java index d04e5edc..fa6bc0ba 100644 --- a/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java +++ b/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java @@ -26,6 +26,9 @@ import java.util.concurrent.ExecutorService; * @author khughes@google.com (Keith M. Hughes) */ public abstract class CancellableLoop implements Runnable { + + private final Object mutex; + /** * {@code true} if the code has been run once, {@code false} otherwise. */ @@ -36,9 +39,13 @@ public abstract class CancellableLoop implements Runnable { */ private Thread thread; + public CancellableLoop() { + mutex = new Object(); + } + @Override public void run() { - synchronized (this) { + synchronized (mutex) { Preconditions.checkState(!ranOnce, "CancellableLoops cannot be restarted."); ranOnce = true; thread = Thread.currentThread(); diff --git a/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java b/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java index 36a00057..7a36c360 100644 --- a/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java +++ b/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java @@ -35,7 +35,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** - * Wraps an {@link ScheduledExecutorService} to execute {@link Callable}s with retries. + * Wraps an {@link ScheduledExecutorService} to execute {@link Callable}s with + * retries. * * @author damonkohler@google.com (Damon Kohler) */ @@ -47,11 +48,12 @@ public class RetryingExecutorService { private static final long DEFAULT_RETRY_DELAY = 5; private static final TimeUnit DEFAULT_RETRY_TIME_UNIT = TimeUnit.SECONDS; + private final ScheduledExecutorService scheduledExecutorService; + private final RetryLoop retryLoop; private final Map<Callable<Boolean>, CountDownLatch> latches; private final Map<Future<Boolean>, Callable<Boolean>> callables; private final CompletionService<Boolean> completionService; - private final RetryLoop retryLoop; - private final ScheduledExecutorService executorService; + private final Object mutex; private long retryDelay; private TimeUnit retryTimeUnit; @@ -72,7 +74,7 @@ public class RetryingExecutorService { if (DEBUG) { log.info("Retry requested."); } - executorService.schedule(new Runnable() { + scheduledExecutorService.schedule(new Runnable() { @Override public void run() { submit(callable); @@ -85,20 +87,21 @@ public class RetryingExecutorService { } /** - * @param executorService + * @param scheduledExecutorService * the {@link ExecutorService} to wrap */ - public RetryingExecutorService(ScheduledExecutorService executorService) { - this.executorService = executorService; + public RetryingExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = scheduledExecutorService; retryLoop = new RetryLoop(); latches = Maps.newConcurrentMap(); callables = Maps.newConcurrentMap(); - completionService = new ExecutorCompletionService<Boolean>(executorService); + completionService = new ExecutorCompletionService<Boolean>(scheduledExecutorService); + mutex = new Object(); retryDelay = DEFAULT_RETRY_DELAY; retryTimeUnit = DEFAULT_RETRY_TIME_UNIT; running = true; // TODO(damonkohler): Unify this with the passed in ExecutorService. - executorService.execute(retryLoop); + scheduledExecutorService.execute(retryLoop); } /** @@ -111,13 +114,15 @@ public class RetryingExecutorService { * @throws RejectedExecutionException * if the {@link RetryingExecutorService} is shutting down */ - public synchronized void submit(Callable<Boolean> callable) { - if (running) { - Future<Boolean> future = completionService.submit(callable); - latches.put(callable, new CountDownLatch(1)); - callables.put(future, callable); - } else { - throw new RejectedExecutionException(); + public void submit(Callable<Boolean> callable) { + synchronized (mutex) { + if (running) { + Future<Boolean> future = completionService.submit(callable); + latches.put(callable, new CountDownLatch(1)); + callables.put(future, callable); + } else { + throw new RejectedExecutionException(); + } } } diff --git a/rosjava/src/main/java/org/ros/internal/node/service/ServiceFactory.java b/rosjava/src/main/java/org/ros/internal/node/service/ServiceFactory.java index 9662dc21..ae2cdb2d 100644 --- a/rosjava/src/main/java/org/ros/internal/node/service/ServiceFactory.java +++ b/rosjava/src/main/java/org/ros/internal/node/service/ServiceFactory.java @@ -42,6 +42,7 @@ public class ServiceFactory { private final SlaveServer slaveServer; private final ServiceManager serviceManager; private final ScheduledExecutorService executorService; + private final Object mutex; public ServiceFactory(GraphName nodeName, SlaveServer slaveServer, ServiceManager serviceManager, ScheduledExecutorService executorService) { @@ -49,6 +50,7 @@ public class ServiceFactory { this.slaveServer = slaveServer; this.serviceManager = serviceManager; this.executorService = executorService; + mutex = new Object(); } /** @@ -73,7 +75,7 @@ public class ServiceFactory { DefaultServiceServer<T, S> serviceServer; GraphName name = serviceDeclaration.getName(); - synchronized (serviceManager) { + synchronized (mutex) { if (serviceManager.hasServer(name)) { throw new DuplicateServiceException(String.format("ServiceServer %s already exists.", name)); } else { @@ -126,7 +128,7 @@ public class ServiceFactory { GraphName name = serviceDeclaration.getName(); boolean createdNewClient = false; - synchronized (serviceManager) { + synchronized (mutex) { if (serviceManager.hasClient(name)) { serviceClient = (DefaultServiceClient<T, S>) serviceManager.getClient(name); } else { diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java index a5b70dde..1ec943d4 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java @@ -62,6 +62,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub private final IncomingMessageQueue<T> incomingMessageQueue; private final Set<PublisherIdentifier> knownPublishers; private final TcpClientManager tcpClientManager; + private final Object mutex; /** * Manages the {@link SubscriberListener}s for this {@link Subscriber}. @@ -82,6 +83,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub incomingMessageQueue = new IncomingMessageQueue<T>(deserializer, executorService); knownPublishers = Sets.newHashSet(); tcpClientManager = new TcpClientManager(executorService); + mutex = new Object(); SubscriberHandshakeHandler<T> subscriberHandshakeHandler = new SubscriberHandshakeHandler<T>(toDeclaration().toConnectionHeader(), incomingMessageQueue, executorService); @@ -138,18 +140,19 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub } @VisibleForTesting - public synchronized void addPublisher(PublisherIdentifier publisherIdentifier, - InetSocketAddress address) { - // TODO(damonkohler): If the connection is dropped, knownPublishers should - // be updated. - if (knownPublishers.contains(publisherIdentifier)) { - return; + public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) { + synchronized (mutex) { + // TODO(damonkohler): If the connection is dropped, knownPublishers should + // be updated. + if (knownPublishers.contains(publisherIdentifier)) { + return; + } + tcpClientManager.connect(toString(), address); + // TODO(damonkohler): knownPublishers is duplicate information that is + // already available to the TopicParticipantManager. + knownPublishers.add(publisherIdentifier); + signalOnNewPublisher(publisherIdentifier); } - tcpClientManager.connect(toString(), address); - // TODO(damonkohler): knownPublishers is duplicate information that is - // already available to the TopicParticipantManager. - knownPublishers.add(publisherIdentifier); - signalOnNewPublisher(publisherIdentifier); } /** diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/PublisherFactory.java b/rosjava/src/main/java/org/ros/internal/node/topic/PublisherFactory.java index 8f27d1a7..0bed2e7e 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/PublisherFactory.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/PublisherFactory.java @@ -36,6 +36,7 @@ public class PublisherFactory { private final MessageFactory messageFactory; private final ScheduledExecutorService executorService; private final NodeIdentifier nodeIdentifier; + private final Object mutex; public PublisherFactory(NodeIdentifier nodeIdentifier, TopicParticipantManager topicParticipantManager, MessageFactory messageFactory, @@ -44,6 +45,7 @@ public class PublisherFactory { this.topicParticipantManager = topicParticipantManager; this.messageFactory = messageFactory; this.executorService = executorService; + mutex = new Object(); } /** @@ -63,8 +65,7 @@ public class PublisherFactory { public <T> Publisher<T> newOrExisting(TopicDeclaration topicDeclaration, MessageSerializer<T> messageSerializer) { GraphName topicName = topicDeclaration.getName(); - - synchronized (topicParticipantManager) { + synchronized (mutex) { if (topicParticipantManager.hasPublisher(topicName)) { return (DefaultPublisher<T>) topicParticipantManager.getPublisher(topicName); } else { diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java b/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java index 82705c8e..efb35eef 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java @@ -34,12 +34,14 @@ public class SubscriberFactory { private final NodeIdentifier nodeIdentifier; private final TopicParticipantManager topicParticipantManager; private final ScheduledExecutorService executorService; + private final Object mutex; public SubscriberFactory(NodeIdentifier nodeIdentifier, TopicParticipantManager topicParticipantManager, ScheduledExecutorService executorService) { this.nodeIdentifier = nodeIdentifier; this.topicParticipantManager = topicParticipantManager; this.executorService = executorService; + mutex = new Object(); } /** @@ -60,7 +62,7 @@ public class SubscriberFactory { MessageDeserializer<T> messageDeserializer) { GraphName topicName = topicDeclaration.getName(); - synchronized (topicParticipantManager) { + synchronized (mutex) { if (topicParticipantManager.hasSubscriber(topicName)) { return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName); } else { diff --git a/rosjava/src/main/java/org/ros/internal/transport/queue/LazyMessage.java b/rosjava/src/main/java/org/ros/internal/transport/queue/LazyMessage.java index 1be527b9..d2e18131 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/queue/LazyMessage.java +++ b/rosjava/src/main/java/org/ros/internal/transport/queue/LazyMessage.java @@ -54,9 +54,7 @@ public class LazyMessage<T> { @VisibleForTesting LazyMessage(T message) { - buffer = null; - deserializer = null; - mutex = null; + this(null, null); this.message = message; } @@ -64,13 +62,11 @@ public class LazyMessage<T> { * @return the deserialized message */ public T get() { - if (message != null) { - return message; - } synchronized (mutex) { - if (message == null) { - message = deserializer.deserialize(buffer); + if (message != null) { + return message; } + message = deserializer.deserialize(buffer); } return message; } diff --git a/rosjava/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java b/rosjava/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java index f9f3f8c3..19f8f259 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java +++ b/rosjava/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java @@ -50,6 +50,7 @@ public class OutgoingMessageQueue<T> { private final Writer writer; private final MessageBufferPool messageBufferPool; private final ChannelBuffer latchedBuffer; + private final Object mutex; private boolean latchMode; private T latchedMessage; @@ -84,6 +85,7 @@ public class OutgoingMessageQueue<T> { writer = new Writer(); messageBufferPool = new MessageBufferPool(); latchedBuffer = MessageBuffers.dynamicBuffer(); + mutex = new Object(); latchMode = false; executorService.execute(writer); } @@ -105,8 +107,10 @@ public class OutgoingMessageQueue<T> { setLatchedMessage(message); } - private synchronized void setLatchedMessage(T message) { - latchedMessage = message; + private void setLatchedMessage(T message) { + synchronized (mutex) { + latchedMessage = message; + } } /** @@ -134,10 +138,12 @@ public class OutgoingMessageQueue<T> { // TODO(damonkohler): Avoid re-serializing the latched message if it hasn't // changed. - private synchronized void writeLatchedMessage(Channel channel) { - latchedBuffer.clear(); - serializer.serialize(latchedMessage, latchedBuffer); - channel.write(latchedBuffer); + private void writeLatchedMessage(Channel channel) { + synchronized (mutex) { + latchedBuffer.clear(); + serializer.serialize(latchedMessage, latchedBuffer); + channel.write(latchedBuffer); + } } /** diff --git a/rosjava/src/main/java/org/ros/master/uri/SwitchableMasterUriProvider.java b/rosjava/src/main/java/org/ros/master/uri/SwitchableMasterUriProvider.java index dce3da7b..45248b76 100644 --- a/rosjava/src/main/java/org/ros/master/uri/SwitchableMasterUriProvider.java +++ b/rosjava/src/main/java/org/ros/master/uri/SwitchableMasterUriProvider.java @@ -35,6 +35,8 @@ import java.util.concurrent.TimeUnit; */ public class SwitchableMasterUriProvider implements MasterUriProvider { + private final Object mutex; + /** * The current provider in use. */ @@ -51,6 +53,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { */ public SwitchableMasterUriProvider(MasterUriProvider provider) { this.provider = provider; + mutex = new Object(); } @Override @@ -58,7 +61,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { MasterUriProvider providerToUse = null; ProviderRequest requestToUse = null; - synchronized (this) { + synchronized (mutex) { if (provider != null) { providerToUse = provider; } else { @@ -80,7 +83,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { // seems appropriate to wait rather than to return immediately. MasterUriProvider providerToUse = null; - synchronized (this) { + synchronized (mutex) { if (provider != null) { providerToUse = provider; } @@ -105,15 +108,17 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { * @param switcher * the new provider */ - public synchronized void switchProvider(MasterUriProviderSwitcher switcher) { - MasterUriProvider oldProvider = provider; - provider = switcher.switchProvider(oldProvider); - - if (oldProvider == null) { - for (ProviderRequest request : pending) { - request.setProvider(provider); + public void switchProvider(MasterUriProviderSwitcher switcher) { + synchronized (mutex) { + MasterUriProvider oldProvider = provider; + provider = switcher.switchProvider(oldProvider); + + if (oldProvider == null) { + for (ProviderRequest request : pending) { + request.setProvider(provider); + } + pending.clear(); } - pending.clear(); } } -- GitLab