From f2134b2891e6eccb795b6af538644ef3d56bff10 Mon Sep 17 00:00:00 2001 From: Rodrigo Queiro <overdrigzed@gmail.com> Date: Fri, 18 May 2018 17:14:19 +0100 Subject: [PATCH] Avoid a race condition in RetryingExecutorService (#278) This fixes #274, which occurs when the task completes before the `Future` is added to the `callables` map (because the corresponding submit() is still executing). In that case, `callable` is null, which causes latches.get(callable) to throw an NPE. The bug can be reproduced with the added test by adding `Thread.sleep(1000)` below the `completionService.submit` call. --- .../concurrent/RetryingExecutorService.java | 13 +++- .../RetryingExecutorServiceTest.java | 64 +++++++++++++++++++ 2 files changed, 74 insertions(+), 3 deletions(-) create mode 100644 rosjava/src/test/java/org/ros/concurrent/RetryingExecutorServiceTest.java diff --git a/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java b/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java index 7a36c360..5fc5f64a 100644 --- a/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java +++ b/rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java @@ -63,7 +63,13 @@ public class RetryingExecutorService { @Override public void loop() throws InterruptedException { Future<Boolean> future = completionService.take(); - final Callable<Boolean> callable = callables.remove(future); + Callable<Boolean> callable; + CountDownLatch latch; + // Grab the mutex to make sure submit() of the future that we took is finished. + synchronized (mutex) { + callable = callables.remove(future); + latch = latches.get(callable); + } boolean retry; try { retry = future.get(); @@ -74,14 +80,15 @@ public class RetryingExecutorService { if (DEBUG) { log.info("Retry requested."); } + final Callable<Boolean> finalCallable = callable; scheduledExecutorService.schedule(new Runnable() { @Override public void run() { - submit(callable); + submit(finalCallable); } }, retryDelay, retryTimeUnit); } else { - latches.get(callable).countDown(); + latch.countDown(); } } } diff --git a/rosjava/src/test/java/org/ros/concurrent/RetryingExecutorServiceTest.java b/rosjava/src/test/java/org/ros/concurrent/RetryingExecutorServiceTest.java new file mode 100644 index 00000000..88195872 --- /dev/null +++ b/rosjava/src/test/java/org/ros/concurrent/RetryingExecutorServiceTest.java @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2012 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.ros.concurrent; + +import static org.mockito.Mockito.*; + + +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; + +/** + * @author rodrigoq@google.com (Rodrigo Queiro) + */ +public class RetryingExecutorServiceTest { + + private ScheduledExecutorService executorService; + + @Before + public void before() { + executorService = Executors.newScheduledThreadPool(4); + } + + @Test + public void testNoRetry_calledOnce() throws Exception { + RetryingExecutorService service = new RetryingExecutorService(executorService); + Callable<Boolean> callable = mock(Callable.class); + when(callable.call()).thenReturn(false); + service.submit(callable); + service.shutdown(10, TimeUnit.SECONDS); + verify(callable, times(1)).call(); + } + + @Test + public void testOneRetry_calledTwice() throws Exception { + RetryingExecutorService service = new RetryingExecutorService(executorService); + service.setRetryDelay(0, TimeUnit.SECONDS); + Callable<Boolean> callable = mock(Callable.class); + when(callable.call()).thenReturn(true).thenReturn(false); + service.submit(callable); + + // Call verify() with a timeout before calling shutdown, as shutdown() will prevent further + // retries. + verify(callable, timeout(10000).times(2)).call(); + service.shutdown(10, TimeUnit.SECONDS); + } +} -- GitLab