UDN-企业互联网技术人气社区

板块导航

浏览  : 928
回复  : 2

[原生js] Netty 中的 Future & Promise

[复制链接]
小辫儿的头像 楼主
发表于 2017-1-7 15:20:33 | 显示全部楼层 |阅读模式
  Netty 源码中大量使用了异步编程,从代码实现角度看就是大量使用了线程池和 Future。

  熟悉 Java 5 的同学一定对 Future 不陌生。简单来说就是其代表了一个异步任务,任务将在未来某个时刻完成,而 Future 这个接口就是用来提供例如获取接口、查看任务状态等功能。

  Netty 扩展了 Java 5 引入的 Future 机制。从下面的类图我们可以看到相关类的关系:
12893818451506.jpg

  Netty 的 Future 接口

  需要注意的是,上面类图中有两个 Future,最上面的是 java.util.concurrent.Future,而其下面的则是 io.netty.util.concurrent.Future。

  JDK 的 Future 对象,该接口的方法如下:
  1. // 取消异步操作
  2. boolean cancel(boolean mayInterruptIfRunning);
  3. // 异步操作是否取消
  4. boolean isCancelled();
  5. // 异步操作是否完成,正常终止、异常、取消都是完成
  6. boolean isDone();
  7. // 阻塞直到取得异步操作结果
  8. V get() throws InterruptedException, ExecutionException;
  9. // 同上,但最长阻塞时间为timeout
  10. V get(long timeout, TimeUnit unit)
  11.     throws InterruptedException, ExecutionException, TimeoutException;
复制代码

  接口中只有 isDone() 方法判断一个异步操作是否完成,但是对于完成的定义过于模糊,JDK 文档指出正常终止、抛出异常、用户取消都会使 isDone() 方法返回真。在我们的使用中,我们极有可能是对这三种情况分别处理,而 JDK 这样的设计不能满足我们的需求。

  Netty 扩展了 JDK 的 Future 接口,扩展的方法如下:
  1. // 异步操作完成且正常终止
  2. boolean isSuccess();
  3. // 异步操作是否可以取消
  4. boolean isCancellable();
  5. // 异步操作失败的原因
  6. Throwable cause();
  7. // 添加一个监听者,异步操作完成时回调,类比JavaScript的回调函数
  8. Future<V> addListener(
  9.     GenericFutureListener<? extends Future<? super V>> listener);
  10. Future<V> removeListener(
  11.     GenericFutureListener<? extends Future<? super V>> listener);
  12. // 阻塞直到异步操作完成
  13. Future<V> await() throws InterruptedException;
  14. // 同上,但异步操作失败时抛出异常
  15. Future<V> sync() throws InterruptedException;
  16. // 非阻塞地返回异步结果,如果尚未完成返回null
  17. V getNow();
复制代码

1.png

  可知,Future 对象有两种状态尚未完成和已完成,其中已完成又有三种状态:成功、失败、用户取消。

  Future 接口中的方法都是 getter 方法而没有 setter 方法,也就是说这样实现的 Future 子类的状态是不可变的,如果我们想要变化,Netty 提供的解决方法是:使用可写的 Future 即 Promise。

  Netty 的 Promise

  Promise 是特殊的可写 Future。Promise 在继承 Future 的基础之上进行了扩展,用来设置 IO 操作的结果。

  当 Netty 进行 IO 操作的时候,会创建一个 Promise 对象,当操作完成或者失败的时候就会对 Promise 进行结果设置。
  1. Promise<V> setSuccess(V result);
  2. boolean trySuccess(V result);
  3. Promise<V> setFailure(Throwable cause);
  4. boolean tryFailure(Throwable cause);
  5. boolean setUncancellable();
复制代码

  Netty 提供了一个 Promise 的默认实现 DefaultPromise。主要是 setSuccess 方法和 await 方法的实现
复制代码

  Promise 接口继承自 Future 接口,它提供的 setter 方法与常见的 setter 方法大为不同。Promise 从 Uncompleted 到 Completed 的状态转变有且只能有一次,也就是说 setSuccess 和 setFailure 方法最多只会成功一个,此外,在 setSuccess 和 setFailure 方法中会通知注册到其上的监听者。

  一个简单实现
  1. class Person extends Thread {
  2.     BlockingQueue<Runnable> taskQueue; //任务队列
  3.     public Person(String name) {
  4.         super(name);
  5.         taskQueue = new LinkedBlockingQueue<>();
  6.     }

  7.     @Override
  8.     public void run() {
  9.         while(true) { //无限循环, 不断从任务队列取任务
  10.             try {
  11.                 Runnable task = taskQueue.take();
  12.                 task.run();
  13.             } catch (InterruptedException e) {
  14.                 e.printStackTrace();
  15.             }
  16.         }
  17.     }

  18.     public void submit(Runnable task) { //将任务提交到任务队列中去
  19.         taskQueue.offer(task);
  20.     }
  21. }
复制代码

  做数学题的例子
  1. void main() {

  2.     final Person wang = new Person("wang");
  3.     final Person li = new Person("li");
  4.     li.start(); //启动小王
  5.     wang.start(); //启动小李

  6.     wang.submit(new Runnable() { //提交一个简单的题
  7.         @Override
  8.         public void run() {
  9.             System.out.println(
  10.                 Thread.currentThread().getName() + "1. 这是一道简单的题");
  11.         }
  12.     });

  13.     wang.submit(new Runnable() { //提交一个复杂的题
  14.         @Override
  15.         public void run() {
  16.             li.submit(new Runnable() { //将复杂的题交给li来做
  17.                 @Override
  18.                 public void run() {
  19.                     System.out.println(
  20.                         Thread.currentThread().getName() + " 2. 这是一道复杂的题");
  21.                     try {
  22.                         Thread.sleep(2000);
  23.                     } catch (InterruptedException e) {
  24.                         e.printStackTrace();
  25.                     }
  26.                     wang.submit(new Runnable() { //做完之后将结果作为Task返回给wang
  27.                         @Override
  28.                         public void run() {
  29.                             System.out.println(
  30.                                 Thread.currentThread().getName() + "复杂题执行结果");
  31.                         }
  32.                     });
  33.                 }
  34.             });
  35.         }
  36.     });

  37.     wang.submit(new Runnable() { //提交一个简单的题
  38.         @Override
  39.         public void run() {
  40.             System.out.println(
  41.             `Thread.currentThread().getName() + " 3. 这是一道简单的题");
  42.         }
  43.     });
  44. }
复制代码

  执行结果是
  1. wang 1. 这是一道简单的题
  2. wang 3. 这是一道简单的题
  3. li 2. 这是一道复杂的题
  4. wang 复杂题执行完毕
复制代码

  Netty 中的实现
  1. final DefaultEventExecutor wang = new DefaultEventExecutor();
  2. final DefaultEventExecutor li = new DefaultEventExecutor();

  3. wang.execute(new Runnable() {
  4.     @Override
  5.     public void run() {
  6.         System.out.println(
  7.             Thread.currentThread().getName() + " 1. 这是一道简单的题");
  8.     }
  9. });

  10. wang.execute(new Runnable() {
  11.     @Override
  12.     public void run() {
  13.         final Promise<Integer> promise = wang.newPromise();
  14.         promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
  15.             @Override
  16.             public void operationComplete(Future<? super Integer> future)
  17.                 throws Exception {
  18.                 System.out.println(Thread.currentThread().getName() + "复杂题执行结果");
  19.             }
  20.         });
  21.         li.execute(new Runnable() {
  22.             @Override
  23.             public void run() {
  24.                 System.out.println(
  25.                     Thread.currentThread().getName() + " 2. 这是一道复杂的题");
  26.                 promise.setSuccess(10);
  27.             }
  28.         });
  29.     }
  30. });

  31. wang.execute(new Runnable() {
  32.     @Override
  33.     public void run() {
  34.         System.out.println(
  35.             Thread.currentThread().getName() + " 3. 这是一道简单的题");
  36.     }
  37. });
复制代码

  执行结果是
  1. defaultEventExecutor-1-1 1. 这是一道简单的题
  2. defaultEventExecutor-1-1 3. 这是一道简单的题
  3. defaultEventExecutor-3-1 2. 这是一道复杂的题
  4. defaultEventExecutor-1-1 复杂题执行结果
复制代码

  看起来和简单实现中的代码差不多, DefaultEventExecutor可以简单的看做拥有一个队列的线程。与简单实现不同的是, 小李执行完任务后通知小王的方式。

  在 Netty 中 Promise 代码一个可写的异步任务结果,以上代码的含义是:

  生成一个 promise,为该 promise 注册一个 listener,当任务执行完后回调该 listener。

  在另一个线程中执行一个异步任务,执行完后,将 promise 设置为成功,回调 listener,该 listener 在异步任务提交者线程中执行。

  一种误用

  为了实现以上问题,还可以像下面这样写。
  1. wang.submit(new Runnable() {
  2.     @Override
  3.     public void run() {
  4.         System.out.println(Thread.currentThread().getName() + " 1. 这是一道简单的题");
  5.     }
  6. });

  7. wang.submit(new Runnable() {
  8.     @Override
  9.     public void run() {
  10.         Future<String> result = li.submit(new Callable<String>() {
  11.             @Override
  12.             public String call() throws Exception {
  13.                 for(int i = 0; i <= 10000000; i++){
  14.                     for(int j = 0; j <= 1000000; j++) {
  15.                         ;
  16.                     }
  17.                 }
  18.                 System.out.println(
  19.                     Thread.currentThread().getName() + " 2. 这是一道复杂的题");
  20.                 return null;
  21.             }
  22.         });
  23.         result.addListener(new GenericFutureListener<Future<? super String>>() {
  24.             @Override
  25.             public void operationComplete(
  26.                 Future<? super String> future) throws Exception {
  27.                 System.out.println(
  28.                     Thread.currentThread().getName() + "3. 复杂题执行结果");
  29.             }
  30.         });
  31.     }
  32. });

  33. wang.submit(new Runnable() {
  34.     @Override
  35.     public void run() {
  36.         System.out.println(Thread.currentThread().getName() + " 3. 这是一道简单的题");
  37.     }
  38. });
复制代码

  执行结果是:
  1. defaultEventExecutor-1-1 1. 这是一道简单的题
  2. defaultEventExecutor-3-1 2. 这是一道复杂的题
  3. defaultEventExecutor-1-1 3. 这是一道简单的题
  4. defaultEventExecutor-3-1 3. 复杂题执行结果
复制代码

  这样写似乎更简单,但运行一下会发现,listener 的执行却是由小李来处理,按理说,小王交给小李一个任务,小李做完之后将结果返回给小王,应该是小王处理才对啊,可为什么是小李来处理呢?

  查看源码可知,DefaultEventExecutor.submit 方法将一个 Callable 包装成一个 DefaultPromise,并且将执行者作为 DefaultPromise 的 exectutor,为什么要这样做呢?

  Netty 的异步回调机制需要提交者必须有一个 TaskQueue 才行,而这里 wang 并不一定含有一个 TaskQueue,为了防止因为提交者没有 TaskQueue 而出错,所以只能赋值为执行者,而使用 newPromise 就没有问题,因为 newPromise 是 DefaultEventExecutor 的接口,而 DefaultEventExecutor 肯定有一个 TaskQueue。

  Netty 源码中对异步回调的使用

  在 Netty 中,ChannelHandlerContext 的 write(msg, promise) 和 bind(address, promise) 等操作都是一个异步操作,为了使该操作不阻塞当前 executor 的执行,一般这样使用:
  1. Promise result = ctx.newPromise(ctx.executor());
  2. ctx.write(msg, result);
  3. result.addListener(listener);
复制代码

相关帖子

发表于 2017-1-7 15:21:02 | 显示全部楼层
貌似看过类似的文章恩,排版更清晰点就更好了
使用道具 举报

回复

发表于 2017-1-8 00:55:43 | 显示全部楼层
鄙视楼下的顶帖没我快,哈哈
使用道具 举报

回复

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部