Skip to content
Snippets Groups Projects
Commit 6c725390 authored by Damon Kohler's avatar Damon Kohler
Browse files

Changes CircularBlockingQueue to implement Iterable.

parent 2a94dac3
Branches
No related tags found
No related merge requests found
......@@ -16,16 +16,17 @@
package org.ros.concurrent;
import java.util.Iterator;
import java.util.Queue;
/**
* A {@link Queue} that removes the old elements when the number of elements
* exceeds the limit and blocks on {@link #take()} when there are no elements
* available.
* A {@link Queue}-like data structure that removes the old elements when the
* number of elements exceeds the limit and blocks on {@link #take()} when there
* are no elements available.
*
* @author damonkohler@google.com (Damon Kohler)
*/
public class CircularBlockingQueue<T> {
public class CircularBlockingQueue<T> implements Iterable<T> {
private final T[] queue;
private final Object mutex;
......@@ -65,8 +66,9 @@ public class CircularBlockingQueue<T> {
*
* @param entry
* the entry to add
* @return {@code true}
*/
public void add(T entry) {
public boolean add(T entry) {
synchronized (mutex) {
queue[(start + length) % limit] = entry;
if (length == limit) {
......@@ -76,6 +78,7 @@ public class CircularBlockingQueue<T> {
}
mutex.notify();
}
return true;
}
/**
......@@ -101,10 +104,39 @@ public class CircularBlockingQueue<T> {
return entry;
}
/**
* @return {@code true} if the queue is empty, {@code false} otherwise
*/
public boolean isEmpty() {
return length == 0;
}
/**
* Returns an iterator over the queue.
* <p>
* Note that this is not thread-safe and that {@link Iterator#remove()} is
* unsupported.
*
* @see java.lang.Iterable#iterator()
*/
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
int offset = 0;
@Override
public boolean hasNext() {
return offset < length;
}
@Override
public T next() {
T entry = queue[start + offset];
offset++;
return entry;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
......@@ -17,12 +17,14 @@
package org.ros.concurrent;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -41,7 +43,7 @@ public class CircularBlockingQueueTest {
}
@Test
public void testPutAndTake() throws InterruptedException {
public void testAddAndTake() throws InterruptedException {
CircularBlockingQueue<String> queue = new CircularBlockingQueue<String>(10);
String expectedString1 = "Hello, world!";
String expectedString2 = "Goodbye, world!";
......@@ -61,6 +63,23 @@ public class CircularBlockingQueueTest {
assertEquals(expectedString, queue.take());
}
@Test
public void testIterator() throws InterruptedException {
CircularBlockingQueue<String> queue = new CircularBlockingQueue<String>(10);
String expectedString1 = "Hello, world!";
String expectedString2 = "Goodbye, world!";
queue.add(expectedString1);
queue.add(expectedString2);
Iterator<String> iterator = queue.iterator();
assertEquals(expectedString1, iterator.next());
assertEquals(expectedString2, iterator.next());
assertFalse(iterator.hasNext());
queue.take();
iterator = queue.iterator();
assertEquals(expectedString2, iterator.next());
assertFalse(iterator.hasNext());
}
@Test
public void testBlockingTake() throws InterruptedException {
final CircularBlockingQueue<String> queue = new CircularBlockingQueue<String>(1);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment