| /* |
| * Copyright (C) 2006 Google Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.google.common.util.concurrent; |
| |
| import com.google.common.base.Function; |
| |
| import java.lang.reflect.UndeclaredThrowableException; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import javax.annotation.Nullable; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| /** |
| * Static utility methods pertaining to the {@link Future} interface. |
| * |
| * @author Kevin Bourrillion |
| * @author Nishant Thakkar |
| * @author Sven Mawson |
| * @since 2009.09.15 <b>tentative</b> |
| */ |
| public class Futures { |
| private Futures() {} |
| |
| /** |
| * Returns an uninterruptible view of a {@code Future}. If a thread is |
| * interrupted during an attempt to {@code get()} from the returned future, it |
| * continues to wait on the result until it is available or the timeout |
| * elapses, and only then re-interrupts the thread. |
| */ |
| public static <V> UninterruptibleFuture<V> makeUninterruptible( |
| final Future<V> future) { |
| checkNotNull(future); |
| if (future instanceof UninterruptibleFuture) { |
| return (UninterruptibleFuture<V>) future; |
| } |
| return new UninterruptibleFuture<V>() { |
| public boolean cancel(boolean mayInterruptIfRunning) { |
| return future.cancel(mayInterruptIfRunning); |
| } |
| public boolean isCancelled() { |
| return future.isCancelled(); |
| } |
| public boolean isDone() { |
| return future.isDone(); |
| } |
| |
| public V get(long timeoutDuration, TimeUnit timeoutUnit) |
| throws TimeoutException, ExecutionException { |
| boolean interrupted = false; |
| try { |
| long timeoutNanos = timeoutUnit.toNanos(timeoutDuration); |
| long end = System.nanoTime() + timeoutNanos; |
| for (long remaining = timeoutNanos; remaining > 0; |
| remaining = end - System.nanoTime()) { |
| try { |
| return future.get(remaining, TimeUnit.NANOSECONDS); |
| } catch (InterruptedException ignored) { |
| interrupted = true; |
| } |
| } |
| throw new TimeoutException(); |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| public V get() throws ExecutionException { |
| boolean interrupted = false; |
| try { |
| while (true) { |
| try { |
| return future.get(); |
| } catch (InterruptedException ignored) { |
| interrupted = true; |
| } |
| } |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| }; |
| } |
| |
| /** |
| * Creates a {@link ListenableFuture} out of a normal {@link Future}. The |
| * returned future will create a thread to wait for the source future to |
| * complete before executing the listeners. |
| * |
| * <p>Callers who have a future that subclasses |
| * {@link java.util.concurrent.FutureTask} may want to instead subclass |
| * {@link ListenableFutureTask}, which adds the {@link ListenableFuture} |
| * functionality to the standard {@code FutureTask} implementation. |
| */ |
| public static <T> ListenableFuture<T> makeListenable(Future<T> future) { |
| if (future instanceof ListenableFuture) { |
| return (ListenableFuture<T>) future; |
| } |
| return new ListenableFutureAdapter<T>(future); |
| } |
| |
| /** |
| * Creates a {@link CheckedFuture} out of a normal {@link Future} and a |
| * {@link Function} that maps from {@link Exception} instances into the |
| * appropriate checked type. |
| * |
| * <p>The given mapping function will be applied to an |
| * {@link InterruptedException}, a {@link CancellationException}, or an |
| * {@link ExecutionException} with the actual cause of the exception. |
| * See {@link Future#get()} for details on the exceptions thrown. |
| */ |
| public static <T, E extends Exception> CheckedFuture<T, E> makeChecked( |
| Future<T> future, Function<Exception, E> mapper) { |
| return new MappingCheckedFuture<T, E>(makeListenable(future), mapper); |
| } |
| |
| /** |
| * Creates a {@code ListenableFuture} which has its value set immediately upon |
| * construction. The getters just return the value. This {@code Future} can't |
| * be canceled or timed out and its {@code isDone()} method always returns |
| * {@code true}. It's useful for returning something that implements the |
| * {@code ListenableFuture} interface but already has the result. |
| */ |
| public static <T> ListenableFuture<T> immediateFuture(@Nullable T value) { |
| ValueFuture<T> future = ValueFuture.create(); |
| future.set(value); |
| return future; |
| } |
| |
| /** |
| * Creates a {@code CheckedFuture} which has its value set immediately upon |
| * construction. The getters just return the value. This {@code Future} can't |
| * be canceled or timed out and its {@code isDone()} method always returns |
| * {@code true}. It's useful for returning something that implements the |
| * {@code CheckedFuture} interface but already has the result. |
| */ |
| public static <T, E extends Exception> CheckedFuture<T, E> |
| immediateCheckedFuture(@Nullable T value) { |
| ValueFuture<T> future = ValueFuture.create(); |
| future.set(value); |
| return Futures.makeChecked(future, new Function<Exception, E>() { |
| public E apply(Exception e) { |
| throw new AssertionError("impossible"); |
| } |
| }); |
| } |
| |
| /** |
| * Creates a {@code ListenableFuture} which has an exception set immediately |
| * upon construction. The getters just return the value. This {@code Future} |
| * can't be canceled or timed out and its {@code isDone()} method always |
| * returns {@code true}. It's useful for returning something that implements |
| * the {@code ListenableFuture} interface but already has a failed |
| * result. Calling {@code get()} will throw the provided {@code Throwable} |
| * (wrapped in an {@code ExecutionException}). |
| * |
| * @throws Error if the throwable was an {@link Error}. |
| */ |
| public static <T> ListenableFuture<T> immediateFailedFuture( |
| Throwable throwable) { |
| checkNotNull(throwable); |
| ValueFuture<T> future = ValueFuture.create(); |
| future.setException(throwable); |
| return future; |
| } |
| |
| /** |
| * Creates a {@code CheckedFuture} which has an exception set immediately |
| * upon construction. The getters just return the value. This {@code Future} |
| * can't be canceled or timed out and its {@code isDone()} method always |
| * returns {@code true}. It's useful for returning something that implements |
| * the {@code CheckedFuture} interface but already has a failed result. |
| * Calling {@code get()} will throw the provided {@code Throwable} (wrapped in |
| * an {@code ExecutionException}) and calling {@code checkedGet()} will throw |
| * the provided exception itself. |
| * |
| * @throws Error if the throwable was an {@link Error}. |
| */ |
| public static <T, E extends Exception> CheckedFuture<T, E> |
| immediateFailedCheckedFuture(final E exception) { |
| checkNotNull(exception); |
| return makeChecked(Futures.<T>immediateFailedFuture(exception), |
| new Function<Exception, E>() { |
| public E apply(Exception e) { |
| return exception; |
| } |
| }); |
| } |
| |
| /** |
| * Creates a new {@code ListenableFuture} that wraps another |
| * {@code ListenableFuture}. The result of the new future is the result of |
| * the provided function called on the result of the provided future. |
| * The resulting future doesn't interrupt when aborted. |
| * |
| * <p>TODO: Add a version that accepts a normal {@code Future} |
| * |
| * <p>The typical use for this method would be when a RPC call is dependent on |
| * the results of another RPC. One would call the first RPC (input), create a |
| * function that calls another RPC based on input's result, and then call |
| * chain on input and that function to get a {@code ListenableFuture} of |
| * the result. |
| * |
| * @param input The future to chain |
| * @param function A function to chain the results of the provided future |
| * to the results of the returned future. This will be run in the thread |
| * that notifies input it is complete. |
| * @return A future that holds result of the chain. |
| */ |
| public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, |
| Function<? super I, ? extends ListenableFuture<? extends O>> function) { |
| return chain(input, function, Executors.sameThreadExecutor()); |
| } |
| |
| /** |
| * Creates a new {@code ListenableFuture} that wraps another |
| * {@code ListenableFuture}. The result of the new future is the result of |
| * the provided function called on the result of the provided future. |
| * The resulting future doesn't interrupt when aborted. |
| * |
| * <p>This version allows an arbitrary executor to be passed in for running |
| * the chained Function. When using {@link Executors#sameThreadExecutor}, the |
| * thread chained Function executes in will be whichever thread set the |
| * result of the input Future, which may be the network thread in the case of |
| * RPC-based Futures. |
| * |
| * @param input The future to chain |
| * @param function A function to chain the results of the provided future |
| * to the results of the returned future. |
| * @param exec Executor to run the function in. |
| * @return A future that holds result of the chain. |
| */ |
| public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, |
| Function<? super I, ? extends ListenableFuture<? extends O>> function, |
| Executor exec) { |
| ChainingListenableFuture<I, O> chain = |
| new ChainingListenableFuture<I, O>(function, input); |
| input.addListener(chain, exec); |
| return chain; |
| } |
| |
| /** |
| * Creates a new {@code ListenableFuture} that wraps another |
| * {@code ListenableFuture}. The result of the new future is the result of |
| * the provided function called on the result of the provided future. |
| * The resulting future doesn't interrupt when aborted. |
| * |
| * <p>An example use of this method is to convert a serializable object |
| * returned from an RPC into a POJO. |
| * |
| * @param future The future to compose |
| * @param function A Function to compose the results of the provided future |
| * to the results of the returned future. This will be run in the thread |
| * that notifies input it is complete. |
| * @return A future that holds result of the composition. |
| */ |
| public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future, |
| final Function<? super I, ? extends O> function) { |
| return compose(future, function, Executors.sameThreadExecutor()); |
| } |
| |
| /** |
| * Creates a new {@code ListenableFuture} that wraps another |
| * {@code ListenableFuture}. The result of the new future is the result of |
| * the provided function called on the result of the provided future. |
| * The resulting future doesn't interrupt when aborted. |
| * |
| * <p>An example use of this method is to convert a serializable object |
| * returned from an RPC into a POJO. |
| * |
| * <p>This version allows an arbitrary executor to be passed in for running |
| * the chained Function. When using {@link Executors#sameThreadExecutor}, the |
| * thread chained Function executes in will be whichever thread set the result |
| * of the input Future, which may be the network thread in the case of |
| * RPC-based Futures. |
| * |
| * @param future The future to compose |
| * @param function A Function to compose the results of the provided future |
| * to the results of the returned future. |
| * @param exec Executor to run the function in. |
| * @return A future that holds result of the composition. |
| * @since 2010.01.04 <b>tentative</b> |
| */ |
| public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future, |
| final Function<? super I, ? extends O> function, Executor exec) { |
| Function<I, ListenableFuture<O>> wrapperFunction |
| = new Function<I, ListenableFuture<O>>() { |
| /*@Override*/ public ListenableFuture<O> apply(I input) { |
| O output = function.apply(input); |
| return immediateFuture(output); |
| } |
| }; |
| return chain(future, wrapperFunction, exec); |
| } |
| |
| /** |
| * Creates a new {@code Future} that wraps another {@code Future}. |
| * The result of the new future is the result of the provided function called |
| * on the result of the provided future. |
| * |
| * <p>An example use of this method is to convert a Future that produces a |
| * handle to an object to a future that produces the object itself. |
| * |
| * <p>Each call to {@code Future<O>.get(*)} results in a call to |
| * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it |
| * is assumed that {@code Future<I>.get(*)} is idempotent. |
| * |
| * <p>When calling {@link Future#get(long, TimeUnit)} on the returned |
| * future, the timeout only applies to the future passed in to this method. |
| * Any additional time taken by applying {@code function} is not considered. |
| * |
| * @param future The future to compose |
| * @param function A Function to compose the results of the provided future |
| * to the results of the returned future. This will be run in the thread |
| * that calls one of the varieties of {@code get()}. |
| * @return A future that computes result of the composition. |
| */ |
| public static <I, O> Future<O> compose(final Future<I> future, |
| final Function<? super I, ? extends O> function) { |
| |
| return new Future<O>() { |
| |
| /* |
| * Concurrency detail: |
| * |
| * <p>To preserve the idempotency of calls to this.get(*) calls to the |
| * function are only applied once. A lock is required to prevent multiple |
| * applications of the function. The calls to future.get(*) are performed |
| * outside the lock, as is required to prevent calls to |
| * get(long, TimeUnit) to persist beyond their timeout. |
| * |
| * <p>Calls to future.get(*) on every call to this.get(*) also provide |
| * the cancellation behavior for this. |
| * |
| * <p>(Consider: in thread A, call get(), in thread B call get(long, |
| * TimeUnit). Thread B may have to wait for Thread A to finish, which |
| * would be unacceptable.) |
| * |
| * <p>Note that each call to Future<O>.get(*) results in a call to |
| * Future<I>.get(*), but the function is only applied once, so |
| * Future<I>.get(*) is assumed to be idempotent. |
| */ |
| |
| private final Object lock = new Object(); |
| private boolean set = false; |
| private O value = null; |
| |
| /*@Override*/ |
| public O get() throws InterruptedException, ExecutionException { |
| return apply(future.get()); |
| } |
| |
| /*@Override*/ |
| public O get(long timeout, TimeUnit unit) throws InterruptedException, |
| ExecutionException, TimeoutException { |
| return apply(future.get(timeout, unit)); |
| } |
| |
| private O apply(I raw) { |
| synchronized(lock) { |
| if (!set) { |
| value = function.apply(raw); |
| set = true; |
| } |
| return value; |
| } |
| } |
| |
| /*@Override*/ |
| public boolean cancel(boolean mayInterruptIfRunning) { |
| return future.cancel(mayInterruptIfRunning); |
| } |
| |
| /*@Override*/ |
| public boolean isCancelled() { |
| return future.isCancelled(); |
| } |
| |
| /*@Override*/ |
| public boolean isDone() { |
| return future.isDone(); |
| } |
| }; |
| } |
| |
| /** |
| * An implementation of {@code ListenableFuture} that also implements |
| * {@code Runnable} so that it can be used to nest ListenableFutures. |
| * Once the passed-in {@code ListenableFuture} is complete, it calls the |
| * passed-in {@code Function} to generate the result. |
| * The resulting future doesn't interrupt when aborted. |
| * |
| * <p>If the function throws any checked exceptions, they should be wrapped |
| * in a {@code UndeclaredThrowableException} so that this class can get |
| * access to the cause. |
| */ |
| private static class ChainingListenableFuture<I, O> |
| extends AbstractListenableFuture<O> implements Runnable { |
| |
| private final Function<? super I, ? extends ListenableFuture<? extends O>> |
| function; |
| private final UninterruptibleFuture<? extends I> inputFuture; |
| |
| private ChainingListenableFuture( |
| Function<? super I, ? extends ListenableFuture<? extends O>> function, |
| ListenableFuture<? extends I> inputFuture) { |
| this.function = function; |
| this.inputFuture = makeUninterruptible(inputFuture); |
| } |
| |
| public void run() { |
| try { |
| I sourceResult; |
| try { |
| sourceResult = inputFuture.get(); |
| } catch (CancellationException e) { |
| // Cancel this future and return. |
| cancel(); |
| return; |
| } catch (ExecutionException e) { |
| // Set the cause of the exception as this future's exception |
| setException(e.getCause()); |
| return; |
| } |
| |
| final ListenableFuture<? extends O> outputFuture = |
| function.apply(sourceResult); |
| outputFuture.addListener(new Runnable() { |
| public void run() { |
| try { |
| // Here it would have been nice to have had an |
| // UninterruptibleListenableFuture, but we don't want to start a |
| // combinatorial explosion of interfaces, so we have to make do. |
| set(makeUninterruptible(outputFuture).get()); |
| } catch (ExecutionException e) { |
| // Set the cause of the exception as this future's exception |
| setException(e.getCause()); |
| } |
| } |
| }, Executors.sameThreadExecutor()); |
| } catch (UndeclaredThrowableException e) { |
| // Set the cause of the exception as this future's exception |
| setException(e.getCause()); |
| } catch (RuntimeException e) { |
| // This exception is irrelevant in this thread, but useful for the |
| // client |
| setException(e); |
| } catch (Error e) { |
| // This seems evil, but the client needs to know an error occured and |
| // the error needs to be propagated ASAP. |
| setException(e); |
| throw e; |
| } |
| } |
| } |
| |
| /** |
| * A checked future that uses a function to map from exceptions to the |
| * appropriate checked type. |
| */ |
| private static class MappingCheckedFuture<T, E extends Exception> extends |
| AbstractCheckedFuture<T, E> { |
| |
| final Function<Exception, E> mapper; |
| |
| MappingCheckedFuture(ListenableFuture<T> delegate, |
| Function<Exception, E> mapper) { |
| super(delegate); |
| |
| this.mapper = mapper; |
| } |
| |
| @Override |
| protected E mapException(Exception e) { |
| return mapper.apply(e); |
| } |
| } |
| |
| /** |
| * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This |
| * will wait on the future to finish, and when it completes, run the |
| * listeners. This implementation will wait on the source future |
| * indefinitely, so if the source future never completes, the adapter will |
| * never complete either. |
| * |
| * <p>If the delegate future is interrupted or throws an unexpected unchecked |
| * exception, the listeners will not be invoked. |
| */ |
| private static class ListenableFutureAdapter<T> extends ForwardingFuture<T> |
| implements ListenableFuture<T> { |
| |
| private static final Executor adapterExecutor = |
| java.util.concurrent.Executors.newCachedThreadPool(); |
| |
| // The execution list to hold our listeners. |
| private final ExecutionList executionList = new ExecutionList(); |
| |
| // This allows us to only start up a thread waiting on the delegate future |
| // when the first listener is added. |
| private final AtomicBoolean hasListeners = new AtomicBoolean(false); |
| |
| // The delegate future. |
| private final Future<T> delegate; |
| |
| ListenableFutureAdapter(final Future<T> delegate) { |
| this.delegate = delegate; |
| } |
| |
| @Override |
| protected Future<T> delegate() { |
| return delegate; |
| } |
| |
| /*@Override*/ |
| public void addListener(Runnable listener, Executor exec) { |
| |
| // When a listener is first added, we run a task that will wait for |
| // the delegate to finish, and when it is done will run the listeners. |
| if (!hasListeners.get() && hasListeners.compareAndSet(false, true)) { |
| adapterExecutor.execute(new Runnable() { |
| /*@Override*/ |
| public void run() { |
| try { |
| delegate.get(); |
| } catch (CancellationException e) { |
| // The task was cancelled, so it is done, run the listeners. |
| } catch (InterruptedException e) { |
| // This thread was interrupted. This should never happen, so we |
| // throw an IllegalStateException. |
| throw new IllegalStateException("Adapter thread interrupted!", e); |
| } catch (ExecutionException e) { |
| // The task caused an exception, so it is done, run the listeners. |
| } |
| executionList.run(); |
| } |
| }); |
| } |
| executionList.add(listener, exec); |
| } |
| } |
| } |