diff --git a/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java b/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java index d04e5edce6e6736c330eae3866cbcd2f8e8c13dd..fa6bc0ba660fc3244dcde602edec783389e1fdb8 100644 --- a/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java +++ b/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java @@ -26,6 +26,9 @@ import java.util.concurrent.ExecutorService; * @author khughes@google.com (Keith M. Hughes) */ public abstract class CancellableLoop implements Runnable { + + private final Object mutex; + /** * {@code true} if the code has been run once, {@code false} otherwise. */ @@ -36,9 +39,13 @@ public abstract class CancellableLoop implements Runnable { */ private Thread thread; + public CancellableLoop() { + mutex = new Object(); + } + @Override public void run() { - synchronized (this) { + synchronized (mutex) { Preconditions.checkState(!ranOnce, "CancellableLoops cannot be restarted."); ranOnce = true; thread = Thread.currentThread(); diff --git a/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java b/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java index 36a0005772630765fb127f6fd4eceb54d9815f00..7a36c360259715432711294bc84ddb65d1110fa8 100644 --- a/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java +++ b/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java @@ -35,7 +35,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** - * Wraps an {@link ScheduledExecutorService} to execute {@link Callable}s with retries. + * Wraps an {@link ScheduledExecutorService} to execute {@link Callable}s with + * retries. * * @author damonkohler@google.com (Damon Kohler) */ @@ -47,11 +48,12 @@ public class RetryingExecutorService { private static final long DEFAULT_RETRY_DELAY = 5; private static final TimeUnit DEFAULT_RETRY_TIME_UNIT = TimeUnit.SECONDS; + private final ScheduledExecutorService scheduledExecutorService; + private final RetryLoop retryLoop; private final Map<Callable<Boolean>, CountDownLatch> latches; private final Map<Future<Boolean>, Callable<Boolean>> callables; private final CompletionService<Boolean> completionService; - private final RetryLoop retryLoop; - private final ScheduledExecutorService executorService; + private final Object mutex; private long retryDelay; private TimeUnit retryTimeUnit; @@ -72,7 +74,7 @@ public class RetryingExecutorService { if (DEBUG) { log.info("Retry requested."); } - executorService.schedule(new Runnable() { + scheduledExecutorService.schedule(new Runnable() { @Override public void run() { submit(callable); @@ -85,20 +87,21 @@ public class RetryingExecutorService { } /** - * @param executorService + * @param scheduledExecutorService * the {@link ExecutorService} to wrap */ - public RetryingExecutorService(ScheduledExecutorService executorService) { - this.executorService = executorService; + public RetryingExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = scheduledExecutorService; retryLoop = new RetryLoop(); latches = Maps.newConcurrentMap(); callables = Maps.newConcurrentMap(); - completionService = new ExecutorCompletionService<Boolean>(executorService); + completionService = new ExecutorCompletionService<Boolean>(scheduledExecutorService); + mutex = new Object(); retryDelay = DEFAULT_RETRY_DELAY; retryTimeUnit = DEFAULT_RETRY_TIME_UNIT; running = true; // TODO(damonkohler): Unify this with the passed in ExecutorService. - executorService.execute(retryLoop); + scheduledExecutorService.execute(retryLoop); } /** @@ -111,13 +114,15 @@ public class RetryingExecutorService { * @throws RejectedExecutionException * if the {@link RetryingExecutorService} is shutting down */ - public synchronized void submit(Callable<Boolean> callable) { - if (running) { - Future<Boolean> future = completionService.submit(callable); - latches.put(callable, new CountDownLatch(1)); - callables.put(future, callable); - } else { - throw new RejectedExecutionException(); + public void submit(Callable<Boolean> callable) { + synchronized (mutex) { + if (running) { + Future<Boolean> future = completionService.submit(callable); + latches.put(callable, new CountDownLatch(1)); + callables.put(future, callable); + } else { + throw new RejectedExecutionException(); + } } } diff --git a/rosjava/src/main/java/org/ros/internal/node/service/ServiceFactory.java b/rosjava/src/main/java/org/ros/internal/node/service/ServiceFactory.java index 9662dc21639f42ac56ee74a1d262e026b1081860..ae2cdb2de29f4e7187d6460c0d4994792613618e 100644 --- a/rosjava/src/main/java/org/ros/internal/node/service/ServiceFactory.java +++ b/rosjava/src/main/java/org/ros/internal/node/service/ServiceFactory.java @@ -42,6 +42,7 @@ public class ServiceFactory { private final SlaveServer slaveServer; private final ServiceManager serviceManager; private final ScheduledExecutorService executorService; + private final Object mutex; public ServiceFactory(GraphName nodeName, SlaveServer slaveServer, ServiceManager serviceManager, ScheduledExecutorService executorService) { @@ -49,6 +50,7 @@ public class ServiceFactory { this.slaveServer = slaveServer; this.serviceManager = serviceManager; this.executorService = executorService; + mutex = new Object(); } /** @@ -73,7 +75,7 @@ public class ServiceFactory { DefaultServiceServer<T, S> serviceServer; GraphName name = serviceDeclaration.getName(); - synchronized (serviceManager) { + synchronized (mutex) { if (serviceManager.hasServer(name)) { throw new DuplicateServiceException(String.format("ServiceServer %s already exists.", name)); } else { @@ -126,7 +128,7 @@ public class ServiceFactory { GraphName name = serviceDeclaration.getName(); boolean createdNewClient = false; - synchronized (serviceManager) { + synchronized (mutex) { if (serviceManager.hasClient(name)) { serviceClient = (DefaultServiceClient<T, S>) serviceManager.getClient(name); } else { diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java index a5b70dde00e5b599f01fa5dd2f46065380f69ad2..1ec943d4bd51b367afaa7f4e72736fe442d99314 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java @@ -62,6 +62,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub private final IncomingMessageQueue<T> incomingMessageQueue; private final Set<PublisherIdentifier> knownPublishers; private final TcpClientManager tcpClientManager; + private final Object mutex; /** * Manages the {@link SubscriberListener}s for this {@link Subscriber}. @@ -82,6 +83,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub incomingMessageQueue = new IncomingMessageQueue<T>(deserializer, executorService); knownPublishers = Sets.newHashSet(); tcpClientManager = new TcpClientManager(executorService); + mutex = new Object(); SubscriberHandshakeHandler<T> subscriberHandshakeHandler = new SubscriberHandshakeHandler<T>(toDeclaration().toConnectionHeader(), incomingMessageQueue, executorService); @@ -138,18 +140,19 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub } @VisibleForTesting - public synchronized void addPublisher(PublisherIdentifier publisherIdentifier, - InetSocketAddress address) { - // TODO(damonkohler): If the connection is dropped, knownPublishers should - // be updated. - if (knownPublishers.contains(publisherIdentifier)) { - return; + public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) { + synchronized (mutex) { + // TODO(damonkohler): If the connection is dropped, knownPublishers should + // be updated. + if (knownPublishers.contains(publisherIdentifier)) { + return; + } + tcpClientManager.connect(toString(), address); + // TODO(damonkohler): knownPublishers is duplicate information that is + // already available to the TopicParticipantManager. + knownPublishers.add(publisherIdentifier); + signalOnNewPublisher(publisherIdentifier); } - tcpClientManager.connect(toString(), address); - // TODO(damonkohler): knownPublishers is duplicate information that is - // already available to the TopicParticipantManager. - knownPublishers.add(publisherIdentifier); - signalOnNewPublisher(publisherIdentifier); } /** diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/PublisherFactory.java b/rosjava/src/main/java/org/ros/internal/node/topic/PublisherFactory.java index 8f27d1a78516486593a149fd6b63a5612c2746c8..0bed2e7ec9117facac8b2aea2d055373a491afdf 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/PublisherFactory.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/PublisherFactory.java @@ -36,6 +36,7 @@ public class PublisherFactory { private final MessageFactory messageFactory; private final ScheduledExecutorService executorService; private final NodeIdentifier nodeIdentifier; + private final Object mutex; public PublisherFactory(NodeIdentifier nodeIdentifier, TopicParticipantManager topicParticipantManager, MessageFactory messageFactory, @@ -44,6 +45,7 @@ public class PublisherFactory { this.topicParticipantManager = topicParticipantManager; this.messageFactory = messageFactory; this.executorService = executorService; + mutex = new Object(); } /** @@ -63,8 +65,7 @@ public class PublisherFactory { public <T> Publisher<T> newOrExisting(TopicDeclaration topicDeclaration, MessageSerializer<T> messageSerializer) { GraphName topicName = topicDeclaration.getName(); - - synchronized (topicParticipantManager) { + synchronized (mutex) { if (topicParticipantManager.hasPublisher(topicName)) { return (DefaultPublisher<T>) topicParticipantManager.getPublisher(topicName); } else { diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java b/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java index 82705c8e2480af2b1b305c25c23314deff04bcf7..efb35eef27e7b2af22291d54962acbb44d1ccd86 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java @@ -34,12 +34,14 @@ public class SubscriberFactory { private final NodeIdentifier nodeIdentifier; private final TopicParticipantManager topicParticipantManager; private final ScheduledExecutorService executorService; + private final Object mutex; public SubscriberFactory(NodeIdentifier nodeIdentifier, TopicParticipantManager topicParticipantManager, ScheduledExecutorService executorService) { this.nodeIdentifier = nodeIdentifier; this.topicParticipantManager = topicParticipantManager; this.executorService = executorService; + mutex = new Object(); } /** @@ -60,7 +62,7 @@ public class SubscriberFactory { MessageDeserializer<T> messageDeserializer) { GraphName topicName = topicDeclaration.getName(); - synchronized (topicParticipantManager) { + synchronized (mutex) { if (topicParticipantManager.hasSubscriber(topicName)) { return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName); } else { diff --git a/rosjava/src/main/java/org/ros/internal/transport/queue/LazyMessage.java b/rosjava/src/main/java/org/ros/internal/transport/queue/LazyMessage.java index 1be527b950d36f0ed480a239a1151192e647fdba..d2e181310db237a9c2193b4d21f48cc1fc839063 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/queue/LazyMessage.java +++ b/rosjava/src/main/java/org/ros/internal/transport/queue/LazyMessage.java @@ -54,9 +54,7 @@ public class LazyMessage<T> { @VisibleForTesting LazyMessage(T message) { - buffer = null; - deserializer = null; - mutex = null; + this(null, null); this.message = message; } @@ -64,13 +62,11 @@ public class LazyMessage<T> { * @return the deserialized message */ public T get() { - if (message != null) { - return message; - } synchronized (mutex) { - if (message == null) { - message = deserializer.deserialize(buffer); + if (message != null) { + return message; } + message = deserializer.deserialize(buffer); } return message; } diff --git a/rosjava/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java b/rosjava/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java index f9f3f8c378728ad7dd11c1dad76c359f79d8f107..19f8f2591397f91ef5778edd624ab6f631842ae7 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java +++ b/rosjava/src/main/java/org/ros/internal/transport/queue/OutgoingMessageQueue.java @@ -50,6 +50,7 @@ public class OutgoingMessageQueue<T> { private final Writer writer; private final MessageBufferPool messageBufferPool; private final ChannelBuffer latchedBuffer; + private final Object mutex; private boolean latchMode; private T latchedMessage; @@ -84,6 +85,7 @@ public class OutgoingMessageQueue<T> { writer = new Writer(); messageBufferPool = new MessageBufferPool(); latchedBuffer = MessageBuffers.dynamicBuffer(); + mutex = new Object(); latchMode = false; executorService.execute(writer); } @@ -105,8 +107,10 @@ public class OutgoingMessageQueue<T> { setLatchedMessage(message); } - private synchronized void setLatchedMessage(T message) { - latchedMessage = message; + private void setLatchedMessage(T message) { + synchronized (mutex) { + latchedMessage = message; + } } /** @@ -134,10 +138,12 @@ public class OutgoingMessageQueue<T> { // TODO(damonkohler): Avoid re-serializing the latched message if it hasn't // changed. - private synchronized void writeLatchedMessage(Channel channel) { - latchedBuffer.clear(); - serializer.serialize(latchedMessage, latchedBuffer); - channel.write(latchedBuffer); + private void writeLatchedMessage(Channel channel) { + synchronized (mutex) { + latchedBuffer.clear(); + serializer.serialize(latchedMessage, latchedBuffer); + channel.write(latchedBuffer); + } } /** diff --git a/rosjava/src/main/java/org/ros/master/uri/SwitchableMasterUriProvider.java b/rosjava/src/main/java/org/ros/master/uri/SwitchableMasterUriProvider.java index dce3da7b6b1bd1254469885edbd26594369ee947..45248b7623ac1e032e37bb063152034cf42136ed 100644 --- a/rosjava/src/main/java/org/ros/master/uri/SwitchableMasterUriProvider.java +++ b/rosjava/src/main/java/org/ros/master/uri/SwitchableMasterUriProvider.java @@ -35,6 +35,8 @@ import java.util.concurrent.TimeUnit; */ public class SwitchableMasterUriProvider implements MasterUriProvider { + private final Object mutex; + /** * The current provider in use. */ @@ -51,6 +53,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { */ public SwitchableMasterUriProvider(MasterUriProvider provider) { this.provider = provider; + mutex = new Object(); } @Override @@ -58,7 +61,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { MasterUriProvider providerToUse = null; ProviderRequest requestToUse = null; - synchronized (this) { + synchronized (mutex) { if (provider != null) { providerToUse = provider; } else { @@ -80,7 +83,7 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { // seems appropriate to wait rather than to return immediately. MasterUriProvider providerToUse = null; - synchronized (this) { + synchronized (mutex) { if (provider != null) { providerToUse = provider; } @@ -105,15 +108,17 @@ public class SwitchableMasterUriProvider implements MasterUriProvider { * @param switcher * the new provider */ - public synchronized void switchProvider(MasterUriProviderSwitcher switcher) { - MasterUriProvider oldProvider = provider; - provider = switcher.switchProvider(oldProvider); - - if (oldProvider == null) { - for (ProviderRequest request : pending) { - request.setProvider(provider); + public void switchProvider(MasterUriProviderSwitcher switcher) { + synchronized (mutex) { + MasterUriProvider oldProvider = provider; + provider = switcher.switchProvider(oldProvider); + + if (oldProvider == null) { + for (ProviderRequest request : pending) { + request.setProvider(provider); + } + pending.clear(); } - pending.clear(); } }