| /* |
| * Copyright (C) 2011 The Guava Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.google.common.collect; |
| |
| import com.google.common.util.concurrent.Uninterruptibles; |
| |
| import junit.framework.TestCase; |
| |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.PriorityBlockingQueue; |
| import java.util.concurrent.SynchronousQueue; |
| import java.util.concurrent.TimeUnit; |
| |
| /** |
| * Tests for {@link Queues}. |
| * |
| * @author Dimitris Andreou |
| */ |
| |
| public class QueuesTest extends TestCase { |
| /* |
| * All the following tests relate to BlockingQueue methods in Queues. |
| */ |
| |
| public static List<BlockingQueue<Object>> blockingQueues() { |
| return ImmutableList.<BlockingQueue<Object>>of( |
| new LinkedBlockingQueue<Object>(), |
| new LinkedBlockingQueue<Object>(10), |
| new SynchronousQueue<Object>(), |
| new ArrayBlockingQueue<Object>(10), |
| new PriorityBlockingQueue<Object>(10, Ordering.arbitrary())); |
| } |
| |
| private ExecutorService threadPool; |
| |
| @Override |
| public void setUp() { |
| threadPool = Executors.newCachedThreadPool(); |
| } |
| |
| @Override |
| public void tearDown() throws InterruptedException { |
| // notice that if a Producer is interrupted (a bug), the Producer will go into an infinite |
| // loop, which will be noticed here |
| threadPool.shutdown(); |
| assertTrue("Some worker didn't finish in time", |
| threadPool.awaitTermination(1, TimeUnit.SECONDS)); |
| } |
| |
| private static <T> int drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements, |
| long timeout, TimeUnit unit, boolean interruptibly) throws InterruptedException { |
| return interruptibly |
| ? Queues.drain(q, buffer, maxElements, timeout, unit) |
| : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit); |
| } |
| |
| public void testMultipleProducers() throws Exception { |
| for (BlockingQueue<Object> q : blockingQueues()) { |
| testMultipleProducers(q); |
| } |
| } |
| |
| private void testMultipleProducers(BlockingQueue<Object> q) |
| throws InterruptedException { |
| for (boolean interruptibly : new boolean[] { true, false }) { |
| threadPool.submit(new Producer(q, 20)); |
| threadPool.submit(new Producer(q, 20)); |
| threadPool.submit(new Producer(q, 20)); |
| threadPool.submit(new Producer(q, 20)); |
| threadPool.submit(new Producer(q, 20)); |
| |
| List<Object> buf = Lists.newArrayList(); |
| int elements = drain(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, interruptibly); |
| assertEquals(100, elements); |
| assertEquals(100, buf.size()); |
| assertDrained(q); |
| } |
| } |
| |
| public void testDrainTimesOut() throws Exception { |
| for (BlockingQueue<Object> q : blockingQueues()) { |
| testDrainTimesOut(q); |
| } |
| } |
| |
| private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception { |
| for (boolean interruptibly : new boolean[] { true, false }) { |
| assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, TimeUnit.MILLISECONDS)); |
| |
| // producing one, will ask for two |
| Future<?> submitter = threadPool.submit(new Producer(q, 1)); |
| |
| // make sure we time out |
| long startTime = System.nanoTime(); |
| |
| int drained = drain(q, Lists.newArrayList(), 2, 10, TimeUnit.MILLISECONDS, interruptibly); |
| assertTrue(drained <= 1); |
| |
| assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10)); |
| |
| // If even the first one wasn't there, clean up so that the next test doesn't see an element. |
| submitter.get(); |
| if (drained == 0) { |
| assertNotNull(q.poll()); |
| } |
| } |
| } |
| |
| public void testZeroElements() throws Exception { |
| for (BlockingQueue<Object> q : blockingQueues()) { |
| testZeroElements(q); |
| } |
| } |
| |
| private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException { |
| for (boolean interruptibly : new boolean[] { true, false }) { |
| // asking to drain zero elements |
| assertEquals(0, drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS, interruptibly)); |
| } |
| } |
| |
| public void testEmpty() throws Exception { |
| for (BlockingQueue<Object> q : blockingQueues()) { |
| testEmpty(q); |
| } |
| } |
| |
| private void testEmpty(BlockingQueue<Object> q) { |
| assertDrained(q); |
| } |
| |
| public void testNegativeMaxElements() throws Exception { |
| for (BlockingQueue<Object> q : blockingQueues()) { |
| testNegativeMaxElements(q); |
| } |
| } |
| |
| private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException { |
| threadPool.submit(new Producer(q, 1)); |
| |
| List<Object> buf = Lists.newArrayList(); |
| int elements = Queues.drain(q, buf, -1, Long.MAX_VALUE, TimeUnit.NANOSECONDS); |
| assertEquals(elements, 0); |
| assertTrue(buf.isEmpty()); |
| |
| // Clean up produced element to free the producer thread, otherwise it will complain |
| // when we shutdown the threadpool. |
| Queues.drain(q, buf, 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS); |
| } |
| |
| public void testDrain_throws() throws Exception { |
| for (BlockingQueue<Object> q : blockingQueues()) { |
| testDrain_throws(q); |
| } |
| } |
| |
| private void testDrain_throws(BlockingQueue<Object> q) { |
| threadPool.submit(new Interrupter(Thread.currentThread())); |
| try { |
| Queues.drain(q, ImmutableList.of(), 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS); |
| fail(); |
| } catch (InterruptedException expected) { |
| } |
| } |
| |
| public void testDrainUninterruptibly_doesNotThrow() throws Exception { |
| for (BlockingQueue<Object> q : blockingQueues()) { |
| testDrainUninterruptibly_doesNotThrow(q); |
| } |
| } |
| |
| private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) { |
| final Thread mainThread = Thread.currentThread(); |
| threadPool.submit(new Runnable() { |
| public void run() { |
| new Producer(q, 50).run(); |
| new Interrupter(mainThread).run(); |
| new Producer(q, 50).run(); |
| } |
| }); |
| List<Object> buf = Lists.newArrayList(); |
| int elements = |
| Queues.drainUninterruptibly(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS); |
| // so when this drains all elements, we know the thread has also been interrupted in between |
| assertTrue(Thread.interrupted()); |
| assertEquals(100, elements); |
| assertEquals(100, buf.size()); |
| } |
| |
| public void testNewLinkedBlockingQueueCapacity() { |
| try { |
| Queues.newLinkedBlockingQueue(0); |
| fail("Should have thrown IllegalArgumentException"); |
| } catch (IllegalArgumentException expected) { |
| // any capacity less than 1 should throw IllegalArgumentException |
| } |
| assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity()); |
| assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity()); |
| } |
| |
| /** |
| * Checks that #drain() invocations behave correctly for a drained (empty) queue. |
| */ |
| private void assertDrained(BlockingQueue<Object> q) { |
| assertNull(q.peek()); |
| assertInterruptibleDrained(q); |
| assertUninterruptibleDrained(q); |
| } |
| |
| private void assertInterruptibleDrained(BlockingQueue<Object> q) { |
| // nothing to drain, thus this should wait doing nothing |
| try { |
| assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS)); |
| } catch (InterruptedException e) { |
| throw new AssertionError(); |
| } |
| |
| // but does the wait actually occurs? |
| threadPool.submit(new Interrupter(Thread.currentThread())); |
| try { |
| // if waiting works, this should get stuck |
| Queues.drain(q, Lists.newArrayList(), 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS); |
| fail(); |
| } catch (InterruptedException expected) { |
| // we indeed waited; a slow thread had enough time to interrupt us |
| } |
| } |
| |
| // same as above; uninterruptible version |
| private void assertUninterruptibleDrained(BlockingQueue<Object> q) { |
| assertEquals(0, |
| Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS)); |
| |
| // but does the wait actually occurs? |
| threadPool.submit(new Interrupter(Thread.currentThread())); |
| |
| long startTime = System.nanoTime(); |
| Queues.drainUninterruptibly( |
| q, Lists.newArrayList(), 1, 10, TimeUnit.MILLISECONDS); |
| assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10)); |
| // wait for interrupted status and clear it |
| while (!Thread.interrupted()) { Thread.yield(); } |
| } |
| |
| private static class Producer implements Runnable { |
| final BlockingQueue<Object> q; |
| final int elements; |
| |
| Producer(BlockingQueue<Object> q, int elements) { |
| this.q = q; |
| this.elements = elements; |
| } |
| |
| @Override public void run() { |
| try { |
| for (int i = 0; i < elements; i++) { |
| q.put(new Object()); |
| } |
| } catch (InterruptedException e) { |
| // TODO(user): replace this when there is a better way to spawn threads in tests and |
| // have threads propagate their errors back to the test thread. |
| e.printStackTrace(); |
| // never returns, so that #tearDown() notices that one worker isn't done |
| Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.MILLISECONDS); |
| } |
| } |
| } |
| |
| private static class Interrupter implements Runnable { |
| final Thread threadToInterrupt; |
| |
| Interrupter(Thread threadToInterrupt) { |
| this.threadToInterrupt = threadToInterrupt; |
| } |
| |
| @Override public void run() { |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException e) { |
| throw new AssertionError(); |
| } finally { |
| threadToInterrupt.interrupt(); |
| } |
| } |
| } |
| } |