diff --git a/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java b/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java
index 7a36c360259715432711294bc84ddb65d1110fa8..5fc5f64ab040d9ae378f6b565de66f383962ce67 100644
--- a/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java
+++ b/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java
@@ -63,7 +63,13 @@ public class RetryingExecutorService {
     @Override
     public void loop() throws InterruptedException {
       Future<Boolean> future = completionService.take();
-      final Callable<Boolean> callable = callables.remove(future);
+      Callable<Boolean> callable;
+      CountDownLatch latch;
+      // Grab the mutex to make sure submit() of the future that we took is finished.
+      synchronized (mutex) {
+        callable = callables.remove(future);
+        latch = latches.get(callable);
+      }
       boolean retry;
       try {
         retry = future.get();
@@ -74,14 +80,15 @@ public class RetryingExecutorService {
         if (DEBUG) {
           log.info("Retry requested.");
         }
+        final Callable<Boolean> finalCallable = callable;
         scheduledExecutorService.schedule(new Runnable() {
           @Override
           public void run() {
-            submit(callable);
+            submit(finalCallable);
           }
         }, retryDelay, retryTimeUnit);
       } else {
-        latches.get(callable).countDown();
+        latch.countDown();
       }
     }
   }
diff --git a/rosjava/src/test/java/org/ros/concurrent/RetryingExecutorServiceTest.java b/rosjava/src/test/java/org/ros/concurrent/RetryingExecutorServiceTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..88195872f02399717497e76595919a50f723e84d
--- /dev/null
+++ b/rosjava/src/test/java/org/ros/concurrent/RetryingExecutorServiceTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (C) 2012 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.ros.concurrent;
+
+import static org.mockito.Mockito.*;
+
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author rodrigoq@google.com (Rodrigo Queiro)
+ */
+public class RetryingExecutorServiceTest {
+
+  private ScheduledExecutorService executorService;
+
+  @Before
+  public void before() {
+    executorService = Executors.newScheduledThreadPool(4);
+  }
+
+  @Test
+  public void testNoRetry_calledOnce() throws Exception {
+    RetryingExecutorService service = new RetryingExecutorService(executorService);
+    Callable<Boolean> callable = mock(Callable.class);
+    when(callable.call()).thenReturn(false);
+    service.submit(callable);
+    service.shutdown(10, TimeUnit.SECONDS);
+    verify(callable, times(1)).call();
+  }
+
+  @Test
+  public void testOneRetry_calledTwice() throws Exception {
+    RetryingExecutorService service = new RetryingExecutorService(executorService);
+    service.setRetryDelay(0, TimeUnit.SECONDS);
+    Callable<Boolean> callable = mock(Callable.class);
+    when(callable.call()).thenReturn(true).thenReturn(false);
+    service.submit(callable);
+
+    // Call verify() with a timeout before calling shutdown, as shutdown() will prevent further
+    // retries.
+    verify(callable, timeout(10000).times(2)).call();
+    service.shutdown(10, TimeUnit.SECONDS);
+  }
+}