Skip to content
Snippets Groups Projects
Commit 12719133 authored by Damon Kohler's avatar Damon Kohler
Browse files

Fixes locking anti-patterns including double check locking and locking on...

Fixes locking anti-patterns including double check locking and locking on externally accessible objects.
parent 726f086f
No related branches found
No related tags found
No related merge requests found
Showing
with 84 additions and 57 deletions
...@@ -26,6 +26,9 @@ import java.util.concurrent.ExecutorService; ...@@ -26,6 +26,9 @@ import java.util.concurrent.ExecutorService;
* @author khughes@google.com (Keith M. Hughes) * @author khughes@google.com (Keith M. Hughes)
*/ */
public abstract class CancellableLoop implements Runnable { public abstract class CancellableLoop implements Runnable {
private final Object mutex;
/** /**
* {@code true} if the code has been run once, {@code false} otherwise. * {@code true} if the code has been run once, {@code false} otherwise.
*/ */
...@@ -36,9 +39,13 @@ public abstract class CancellableLoop implements Runnable { ...@@ -36,9 +39,13 @@ public abstract class CancellableLoop implements Runnable {
*/ */
private Thread thread; private Thread thread;
public CancellableLoop() {
mutex = new Object();
}
@Override @Override
public void run() { public void run() {
synchronized (this) { synchronized (mutex) {
Preconditions.checkState(!ranOnce, "CancellableLoops cannot be restarted."); Preconditions.checkState(!ranOnce, "CancellableLoops cannot be restarted.");
ranOnce = true; ranOnce = true;
thread = Thread.currentThread(); thread = Thread.currentThread();
......
...@@ -35,7 +35,8 @@ import java.util.concurrent.ScheduledExecutorService; ...@@ -35,7 +35,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* Wraps an {@link ScheduledExecutorService} to execute {@link Callable}s with retries. * Wraps an {@link ScheduledExecutorService} to execute {@link Callable}s with
* retries.
* *
* @author damonkohler@google.com (Damon Kohler) * @author damonkohler@google.com (Damon Kohler)
*/ */
...@@ -47,11 +48,12 @@ public class RetryingExecutorService { ...@@ -47,11 +48,12 @@ public class RetryingExecutorService {
private static final long DEFAULT_RETRY_DELAY = 5; private static final long DEFAULT_RETRY_DELAY = 5;
private static final TimeUnit DEFAULT_RETRY_TIME_UNIT = TimeUnit.SECONDS; private static final TimeUnit DEFAULT_RETRY_TIME_UNIT = TimeUnit.SECONDS;
private final ScheduledExecutorService scheduledExecutorService;
private final RetryLoop retryLoop;
private final Map<Callable<Boolean>, CountDownLatch> latches; private final Map<Callable<Boolean>, CountDownLatch> latches;
private final Map<Future<Boolean>, Callable<Boolean>> callables; private final Map<Future<Boolean>, Callable<Boolean>> callables;
private final CompletionService<Boolean> completionService; private final CompletionService<Boolean> completionService;
private final RetryLoop retryLoop; private final Object mutex;
private final ScheduledExecutorService executorService;
private long retryDelay; private long retryDelay;
private TimeUnit retryTimeUnit; private TimeUnit retryTimeUnit;
...@@ -72,7 +74,7 @@ public class RetryingExecutorService { ...@@ -72,7 +74,7 @@ public class RetryingExecutorService {
if (DEBUG) { if (DEBUG) {
log.info("Retry requested."); log.info("Retry requested.");
} }
executorService.schedule(new Runnable() { scheduledExecutorService.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
submit(callable); submit(callable);
...@@ -85,20 +87,21 @@ public class RetryingExecutorService { ...@@ -85,20 +87,21 @@ public class RetryingExecutorService {
} }
/** /**
* @param executorService * @param scheduledExecutorService
* the {@link ExecutorService} to wrap * the {@link ExecutorService} to wrap
*/ */
public RetryingExecutorService(ScheduledExecutorService executorService) { public RetryingExecutorService(ScheduledExecutorService scheduledExecutorService) {
this.executorService = executorService; this.scheduledExecutorService = scheduledExecutorService;
retryLoop = new RetryLoop(); retryLoop = new RetryLoop();
latches = Maps.newConcurrentMap(); latches = Maps.newConcurrentMap();
callables = Maps.newConcurrentMap(); callables = Maps.newConcurrentMap();
completionService = new ExecutorCompletionService<Boolean>(executorService); completionService = new ExecutorCompletionService<Boolean>(scheduledExecutorService);
mutex = new Object();
retryDelay = DEFAULT_RETRY_DELAY; retryDelay = DEFAULT_RETRY_DELAY;
retryTimeUnit = DEFAULT_RETRY_TIME_UNIT; retryTimeUnit = DEFAULT_RETRY_TIME_UNIT;
running = true; running = true;
// TODO(damonkohler): Unify this with the passed in ExecutorService. // TODO(damonkohler): Unify this with the passed in ExecutorService.
executorService.execute(retryLoop); scheduledExecutorService.execute(retryLoop);
} }
/** /**
...@@ -111,7 +114,8 @@ public class RetryingExecutorService { ...@@ -111,7 +114,8 @@ public class RetryingExecutorService {
* @throws RejectedExecutionException * @throws RejectedExecutionException
* if the {@link RetryingExecutorService} is shutting down * if the {@link RetryingExecutorService} is shutting down
*/ */
public synchronized void submit(Callable<Boolean> callable) { public void submit(Callable<Boolean> callable) {
synchronized (mutex) {
if (running) { if (running) {
Future<Boolean> future = completionService.submit(callable); Future<Boolean> future = completionService.submit(callable);
latches.put(callable, new CountDownLatch(1)); latches.put(callable, new CountDownLatch(1));
...@@ -120,6 +124,7 @@ public class RetryingExecutorService { ...@@ -120,6 +124,7 @@ public class RetryingExecutorService {
throw new RejectedExecutionException(); throw new RejectedExecutionException();
} }
} }
}
/** /**
* @param delay * @param delay
......
...@@ -42,6 +42,7 @@ public class ServiceFactory { ...@@ -42,6 +42,7 @@ public class ServiceFactory {
private final SlaveServer slaveServer; private final SlaveServer slaveServer;
private final ServiceManager serviceManager; private final ServiceManager serviceManager;
private final ScheduledExecutorService executorService; private final ScheduledExecutorService executorService;
private final Object mutex;
public ServiceFactory(GraphName nodeName, SlaveServer slaveServer, ServiceManager serviceManager, public ServiceFactory(GraphName nodeName, SlaveServer slaveServer, ServiceManager serviceManager,
ScheduledExecutorService executorService) { ScheduledExecutorService executorService) {
...@@ -49,6 +50,7 @@ public class ServiceFactory { ...@@ -49,6 +50,7 @@ public class ServiceFactory {
this.slaveServer = slaveServer; this.slaveServer = slaveServer;
this.serviceManager = serviceManager; this.serviceManager = serviceManager;
this.executorService = executorService; this.executorService = executorService;
mutex = new Object();
} }
/** /**
...@@ -73,7 +75,7 @@ public class ServiceFactory { ...@@ -73,7 +75,7 @@ public class ServiceFactory {
DefaultServiceServer<T, S> serviceServer; DefaultServiceServer<T, S> serviceServer;
GraphName name = serviceDeclaration.getName(); GraphName name = serviceDeclaration.getName();
synchronized (serviceManager) { synchronized (mutex) {
if (serviceManager.hasServer(name)) { if (serviceManager.hasServer(name)) {
throw new DuplicateServiceException(String.format("ServiceServer %s already exists.", name)); throw new DuplicateServiceException(String.format("ServiceServer %s already exists.", name));
} else { } else {
...@@ -126,7 +128,7 @@ public class ServiceFactory { ...@@ -126,7 +128,7 @@ public class ServiceFactory {
GraphName name = serviceDeclaration.getName(); GraphName name = serviceDeclaration.getName();
boolean createdNewClient = false; boolean createdNewClient = false;
synchronized (serviceManager) { synchronized (mutex) {
if (serviceManager.hasClient(name)) { if (serviceManager.hasClient(name)) {
serviceClient = (DefaultServiceClient<T, S>) serviceManager.getClient(name); serviceClient = (DefaultServiceClient<T, S>) serviceManager.getClient(name);
} else { } else {
......
...@@ -62,6 +62,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub ...@@ -62,6 +62,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub
private final IncomingMessageQueue<T> incomingMessageQueue; private final IncomingMessageQueue<T> incomingMessageQueue;
private final Set<PublisherIdentifier> knownPublishers; private final Set<PublisherIdentifier> knownPublishers;
private final TcpClientManager tcpClientManager; private final TcpClientManager tcpClientManager;
private final Object mutex;
/** /**
* Manages the {@link SubscriberListener}s for this {@link Subscriber}. * Manages the {@link SubscriberListener}s for this {@link Subscriber}.
...@@ -82,6 +83,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub ...@@ -82,6 +83,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub
incomingMessageQueue = new IncomingMessageQueue<T>(deserializer, executorService); incomingMessageQueue = new IncomingMessageQueue<T>(deserializer, executorService);
knownPublishers = Sets.newHashSet(); knownPublishers = Sets.newHashSet();
tcpClientManager = new TcpClientManager(executorService); tcpClientManager = new TcpClientManager(executorService);
mutex = new Object();
SubscriberHandshakeHandler<T> subscriberHandshakeHandler = SubscriberHandshakeHandler<T> subscriberHandshakeHandler =
new SubscriberHandshakeHandler<T>(toDeclaration().toConnectionHeader(), new SubscriberHandshakeHandler<T>(toDeclaration().toConnectionHeader(),
incomingMessageQueue, executorService); incomingMessageQueue, executorService);
...@@ -138,8 +140,8 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub ...@@ -138,8 +140,8 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub
} }
@VisibleForTesting @VisibleForTesting
public synchronized void addPublisher(PublisherIdentifier publisherIdentifier, public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) {
InetSocketAddress address) { synchronized (mutex) {
// TODO(damonkohler): If the connection is dropped, knownPublishers should // TODO(damonkohler): If the connection is dropped, knownPublishers should
// be updated. // be updated.
if (knownPublishers.contains(publisherIdentifier)) { if (knownPublishers.contains(publisherIdentifier)) {
...@@ -151,6 +153,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub ...@@ -151,6 +153,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub
knownPublishers.add(publisherIdentifier); knownPublishers.add(publisherIdentifier);
signalOnNewPublisher(publisherIdentifier); signalOnNewPublisher(publisherIdentifier);
} }
}
/** /**
* Updates the list of {@link Publisher}s for the topic that this * Updates the list of {@link Publisher}s for the topic that this
......
...@@ -36,6 +36,7 @@ public class PublisherFactory { ...@@ -36,6 +36,7 @@ public class PublisherFactory {
private final MessageFactory messageFactory; private final MessageFactory messageFactory;
private final ScheduledExecutorService executorService; private final ScheduledExecutorService executorService;
private final NodeIdentifier nodeIdentifier; private final NodeIdentifier nodeIdentifier;
private final Object mutex;
public PublisherFactory(NodeIdentifier nodeIdentifier, public PublisherFactory(NodeIdentifier nodeIdentifier,
TopicParticipantManager topicParticipantManager, MessageFactory messageFactory, TopicParticipantManager topicParticipantManager, MessageFactory messageFactory,
...@@ -44,6 +45,7 @@ public class PublisherFactory { ...@@ -44,6 +45,7 @@ public class PublisherFactory {
this.topicParticipantManager = topicParticipantManager; this.topicParticipantManager = topicParticipantManager;
this.messageFactory = messageFactory; this.messageFactory = messageFactory;
this.executorService = executorService; this.executorService = executorService;
mutex = new Object();
} }
/** /**
...@@ -63,8 +65,7 @@ public class PublisherFactory { ...@@ -63,8 +65,7 @@ public class PublisherFactory {
public <T> Publisher<T> newOrExisting(TopicDeclaration topicDeclaration, public <T> Publisher<T> newOrExisting(TopicDeclaration topicDeclaration,
MessageSerializer<T> messageSerializer) { MessageSerializer<T> messageSerializer) {
GraphName topicName = topicDeclaration.getName(); GraphName topicName = topicDeclaration.getName();
synchronized (mutex) {
synchronized (topicParticipantManager) {
if (topicParticipantManager.hasPublisher(topicName)) { if (topicParticipantManager.hasPublisher(topicName)) {
return (DefaultPublisher<T>) topicParticipantManager.getPublisher(topicName); return (DefaultPublisher<T>) topicParticipantManager.getPublisher(topicName);
} else { } else {
......
...@@ -34,12 +34,14 @@ public class SubscriberFactory { ...@@ -34,12 +34,14 @@ public class SubscriberFactory {
private final NodeIdentifier nodeIdentifier; private final NodeIdentifier nodeIdentifier;
private final TopicParticipantManager topicParticipantManager; private final TopicParticipantManager topicParticipantManager;
private final ScheduledExecutorService executorService; private final ScheduledExecutorService executorService;
private final Object mutex;
public SubscriberFactory(NodeIdentifier nodeIdentifier, public SubscriberFactory(NodeIdentifier nodeIdentifier,
TopicParticipantManager topicParticipantManager, ScheduledExecutorService executorService) { TopicParticipantManager topicParticipantManager, ScheduledExecutorService executorService) {
this.nodeIdentifier = nodeIdentifier; this.nodeIdentifier = nodeIdentifier;
this.topicParticipantManager = topicParticipantManager; this.topicParticipantManager = topicParticipantManager;
this.executorService = executorService; this.executorService = executorService;
mutex = new Object();
} }
/** /**
...@@ -60,7 +62,7 @@ public class SubscriberFactory { ...@@ -60,7 +62,7 @@ public class SubscriberFactory {
MessageDeserializer<T> messageDeserializer) { MessageDeserializer<T> messageDeserializer) {
GraphName topicName = topicDeclaration.getName(); GraphName topicName = topicDeclaration.getName();
synchronized (topicParticipantManager) { synchronized (mutex) {
if (topicParticipantManager.hasSubscriber(topicName)) { if (topicParticipantManager.hasSubscriber(topicName)) {
return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName); return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName);
} else { } else {
......
...@@ -54,9 +54,7 @@ public class LazyMessage<T> { ...@@ -54,9 +54,7 @@ public class LazyMessage<T> {
@VisibleForTesting @VisibleForTesting
LazyMessage(T message) { LazyMessage(T message) {
buffer = null; this(null, null);
deserializer = null;
mutex = null;
this.message = message; this.message = message;
} }
...@@ -64,14 +62,12 @@ public class LazyMessage<T> { ...@@ -64,14 +62,12 @@ public class LazyMessage<T> {
* @return the deserialized message * @return the deserialized message
*/ */
public T get() { public T get() {
synchronized (mutex) {
if (message != null) { if (message != null) {
return message; return message;
} }
synchronized (mutex) {
if (message == null) {
message = deserializer.deserialize(buffer); message = deserializer.deserialize(buffer);
} }
}
return message; return message;
} }
} }
\ No newline at end of file
...@@ -50,6 +50,7 @@ public class OutgoingMessageQueue<T> { ...@@ -50,6 +50,7 @@ public class OutgoingMessageQueue<T> {
private final Writer writer; private final Writer writer;
private final MessageBufferPool messageBufferPool; private final MessageBufferPool messageBufferPool;
private final ChannelBuffer latchedBuffer; private final ChannelBuffer latchedBuffer;
private final Object mutex;
private boolean latchMode; private boolean latchMode;
private T latchedMessage; private T latchedMessage;
...@@ -84,6 +85,7 @@ public class OutgoingMessageQueue<T> { ...@@ -84,6 +85,7 @@ public class OutgoingMessageQueue<T> {
writer = new Writer(); writer = new Writer();
messageBufferPool = new MessageBufferPool(); messageBufferPool = new MessageBufferPool();
latchedBuffer = MessageBuffers.dynamicBuffer(); latchedBuffer = MessageBuffers.dynamicBuffer();
mutex = new Object();
latchMode = false; latchMode = false;
executorService.execute(writer); executorService.execute(writer);
} }
...@@ -105,9 +107,11 @@ public class OutgoingMessageQueue<T> { ...@@ -105,9 +107,11 @@ public class OutgoingMessageQueue<T> {
setLatchedMessage(message); setLatchedMessage(message);
} }
private synchronized void setLatchedMessage(T message) { private void setLatchedMessage(T message) {
synchronized (mutex) {
latchedMessage = message; latchedMessage = message;
} }
}
/** /**
* Stop writing messages and close all outgoing connections. * Stop writing messages and close all outgoing connections.
...@@ -134,11 +138,13 @@ public class OutgoingMessageQueue<T> { ...@@ -134,11 +138,13 @@ public class OutgoingMessageQueue<T> {
// TODO(damonkohler): Avoid re-serializing the latched message if it hasn't // TODO(damonkohler): Avoid re-serializing the latched message if it hasn't
// changed. // changed.
private synchronized void writeLatchedMessage(Channel channel) { private void writeLatchedMessage(Channel channel) {
synchronized (mutex) {
latchedBuffer.clear(); latchedBuffer.clear();
serializer.serialize(latchedMessage, latchedBuffer); serializer.serialize(latchedMessage, latchedBuffer);
channel.write(latchedBuffer); channel.write(latchedBuffer);
} }
}
/** /**
* @return the number of {@link Channel}s which have been added to this queue * @return the number of {@link Channel}s which have been added to this queue
......
...@@ -35,6 +35,8 @@ import java.util.concurrent.TimeUnit; ...@@ -35,6 +35,8 @@ import java.util.concurrent.TimeUnit;
*/ */
public class SwitchableMasterUriProvider implements MasterUriProvider { public class SwitchableMasterUriProvider implements MasterUriProvider {
private final Object mutex;
/** /**
* The current provider in use. * The current provider in use.
*/ */
...@@ -51,6 +53,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { ...@@ -51,6 +53,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider {
*/ */
public SwitchableMasterUriProvider(MasterUriProvider provider) { public SwitchableMasterUriProvider(MasterUriProvider provider) {
this.provider = provider; this.provider = provider;
mutex = new Object();
} }
@Override @Override
...@@ -58,7 +61,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { ...@@ -58,7 +61,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider {
MasterUriProvider providerToUse = null; MasterUriProvider providerToUse = null;
ProviderRequest requestToUse = null; ProviderRequest requestToUse = null;
synchronized (this) { synchronized (mutex) {
if (provider != null) { if (provider != null) {
providerToUse = provider; providerToUse = provider;
} else { } else {
...@@ -80,7 +83,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { ...@@ -80,7 +83,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider {
// seems appropriate to wait rather than to return immediately. // seems appropriate to wait rather than to return immediately.
MasterUriProvider providerToUse = null; MasterUriProvider providerToUse = null;
synchronized (this) { synchronized (mutex) {
if (provider != null) { if (provider != null) {
providerToUse = provider; providerToUse = provider;
} }
...@@ -105,7 +108,8 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { ...@@ -105,7 +108,8 @@ public class SwitchableMasterUriProvider implements MasterUriProvider {
* @param switcher * @param switcher
* the new provider * the new provider
*/ */
public synchronized void switchProvider(MasterUriProviderSwitcher switcher) { public void switchProvider(MasterUriProviderSwitcher switcher) {
synchronized (mutex) {
MasterUriProvider oldProvider = provider; MasterUriProvider oldProvider = provider;
provider = switcher.switchProvider(oldProvider); provider = switcher.switchProvider(oldProvider);
...@@ -116,6 +120,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { ...@@ -116,6 +120,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider {
pending.clear(); pending.clear();
} }
} }
}
/** /**
* Perform a switch between {@link MasterUriProvider} instances for the * Perform a switch between {@link MasterUriProvider} instances for the
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment