返回首页 Google Guava 官方教程

基本工具

集合

并发

I/O

google Guava 包的 ListenableFuture 解析

并发编程是一个难题,但是一个强大而简单的抽象可以显著的简化并发的编写。出于这样的考虑,Guava 定义了 ListenableFuture 接口并继承了 JDK concurrent 包下的 Future 接口。

我们强烈地建议你在代码中多使用 ListenableFuture 来代替 JDK 的 Future, 因为:

  • 大多数 Futures 方法中需要它。
  • 转到 ListenableFuture 编程比较容易。
  • Guava 提供的通用公共类封装了公共的操作方方法,不需要提供 Future 和 ListenableFuture 的扩展方法。

接口

传统 JDK 中的 Future 通过异步的方式计算返回结果:在多线程运算中可能或者可能在没有结束返回结果,Future 是运行中的多线程的一个引用句柄,确保在服务执行返回一个 Result。

ListenableFuture 可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在 JDK concurrent 中的 Future 是不支持的。

ListenableFuture 中的基础方法是 addListener(Runnable, Executor), 该方法会在多线程运算完的时候,指定的 Runnable 参数传入的对象会被指定的 Executor 执行。

添加回调(Callbacks)

多数用户喜欢使用 Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor)的方式, 或者 另外一个版本version(译者注:addCallback(ListenableFuture<V> future,FutureCallback<? super V> callback)),默认是采用 MoreExecutors.sameThreadExecutor()线程池, 为了简化使用,Callback 采用轻量级的设计. FutureCallback<V> 中实现了两个方法:

  • onSuccess(V),在 Future 成功的时候执行,根据 Future 结果来判断。
  • onFailure(Throwable), 在 Future 失败的时候执行,根据 Future 结果来判断。

ListenableFuture 的创建

对应 JDK 中的 ExecutorService.submit(Callable) 提交多线程异步运算的方式,Guava 提供了 ListeningExecutorService 接口, 该接口返回 ListenableFuture 而相应的 ExecutorService 返回普通的 Future。将 ExecutorService 转为 ListeningExecutorService,可以使用 MoreExecutors.listeningDecorator(ExecutorService)进行装饰。


    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
    ListenableFuture explosion = service.submit(new Callable() {
      public Explosion call() {
    return pushBigRedButton();
      }
    });
    Futures.addCallback(explosion, new FutureCallback() {
      // we want this handler to run immediately after we push the big red button!
      public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
      }
      public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
      }
    });

另外, 假如你是从 FutureTask 转换而来的, Guava 提供 ListenableFutureTask.create(Callable)ListenableFutureTask.create(Runnable, V). 和 JDK 不同的是, ListenableFutureTask 不能随意被继承(译者注:ListenableFutureTask 中的 done 方法实现了调用 listener 的操作)。

假如你喜欢抽象的方式来设置 future 的值,而不是想实现接口中的方法,可以考虑继承抽象类 AbstractFuture 或者直接使用 SettableFuture

假如你必须将其他 API 提供的 Future 转换成 ListenableFuture,你没有别的方法只能采用硬编码的方式 JdkFutureAdapters.listenInPoolThread(Future) 来将 Future 转换成 ListenableFuture。尽可能地采用修改原生的代码返回 ListenableFuture 会更好一些。

Application

使用 ListenableFuture 最重要的理由是它可以进行一系列的复杂链式的异步操作。


    ListenableFuture rowKeyFuture = indexService.lookUp(query);
    AsyncFunction<RowKey, QueryResult> queryFunction =
    new AsyncFunction<RowKey, QueryResult>() {
    public ListenableFuture apply(RowKey rowKey) {
    return dataService.read(rowKey);
    }
    };
    ListenableFuture queryFuture = Futures.transform(rowKeyFuture, queryFunction, queryExecutor);

其他更多的操作可以更加有效的支持而 JDK 中的 Future 是没法支持的.

不同的操作可以在不同的 Executors 中执行,单独的 ListenableFuture 可以有多个操作等待。

当一个操作开始的时候其他的一些操作也会尽快开始执行–“fan-out”–ListenableFuture 能够满足这样的场景:促发所有的回调(callbacks)。反之更简单的工作是,同样可以满足“fan-in”场景,促发 ListenableFuture 获取(get)计算结果,同时其它的 Futures 也会尽快执行:可以参考 the implementation of Futures.allAsList 。(译者注:fan-in 和 fan-out 是软件设计的一个术语,可以参考这里:http://baike.baidu.com/view/388892.htm#1 或者看这里的解析 Design Principles: Fan-In vs Fan-Out,这里 fan-out 的实现就是封装的 ListenableFuture 通过回调,调用其它代码片段。fan-in 的意义是可以调用其它 Future)

方法 描述 参考
transform(ListenableFuture<A>, AsyncFunction<A, B>, Executor)* 返回一个新的 ListenableFuture ,该 ListenableFuture 返回的 result 是由传入的 AsyncFunction 参数指派到传入的 ListenableFuture 中. transform(ListenableFuture<A>, AsyncFunction<A, B>)
transform(ListenableFuture<A>, Function<A, B>, Executor) 返回一个新的 ListenableFuture ,该 ListenableFuture 返回的 result 是由传入的Function 参数指派到传入的 ListenableFuture 中. transform(ListenableFuture<A>, Function<A, B>)
allAsList(Iterable<ListenableFuture<V>>) 返回一个 ListenableFuture ,该ListenableFuture 返回的 result 是一个 List,List 中的值是每个 ListenableFuture 的返回值,假如传入的其中之一 fails 或者 cancel,这 个Future fails 或者 canceled allAsList(ListenableFuture<V>...)
successfulAsList(Iterable<ListenableFuture<V>>) 返回一个 ListenableFuture ,该 Future 的结果包含所有成功的 Future,按照原来的顺序,当其中之一 Failed 或者 cancel,则用 null 替代 successfulAsList(ListenableFuture<V>...)

AsyncFunction<A, B> 中提供一个方法 ListenableFuture<B> apply(A input),它可以被用于异步变换值。


    List<ListenableFuture> queries;
    // The queries go to all different data centers, but we want to wait until they're all done or failed.

    ListenableFuture<List> successfulQueries = Futures.successfulAsList(queries);

    Futures.addCallback(successfulQueries, callbackOnSuccessfulQueries);

CheckedFuture

Guava 也提供了 CheckedFuture<V, X extends Exception> 接口。CheckedFuture 是一个 ListenableFuture ,其中包含了多个版本的 get 方法,方法声明抛出检查异常.这样使得创建一个在执行逻辑中可以抛出异常的 Future 更加容易 。将 ListenableFuture 转换成 CheckedFuture,可以使用 Futures.makeChecked(ListenableFuture<V>, Function<Exception, X>)

上一篇: 函数式编程 下一篇: Google-Guava Con...