UDN-企业互联网技术人气社区

板块导航

浏览  : 676
回复  : 1

[讨论交流] Java开发教程重点(八)阻塞队列和生产者-消费者模式

[复制链接]
王许柔的头像 楼主
发表于 2016-8-4 10:19:30 | 显示全部楼层 |阅读模式
  阻塞队列

  阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法。如果队列已经满了,那么put方法将阻塞直到有空间可以用;如果队列为空,那么take方法将一直阻塞直到有元素可用。队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞。一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式。

  意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用该数据的过程解耦开来以简化工作负载的管理。对于I/O密集型和 CPU密集型的生产者和消费者,可以带来许多性能优势。

  阻塞队列简化了消费者程序的编码,因为take操作会一直阻塞直到有可用的数据。在某些情况下,这种方式是非常合适的(例如:服务器应用程序中,没有客户端请求时便一直等待),在网络爬虫等有无穷工作需要完成时,实现更高的资源利用率。

  类库中包含了 BlockingQueue 的多种实现,其中,LinkedBlockingQueue 和 ArrayBlockingQueue 是 FIFO队列,二者分别与 LinkedList 和 ArrayList 类似,但比同步 List 拥有更好的并发性能。PriorityBlockingQueue 是一个按优先级排序的队列,要求实现 Comparable 或者使用 Comparator。

  最后一个实现是 SynchronousQueue ,实际上他不是一个真正的队列,它不会为队列中的元素维护存储空间。采用直接交付的机制,put 和take 会一直阻塞,除非put之后出现了take 或者 take之后出现了put操作。

  生产者-消费者模式

  下面我们利用生产者消费者模式建立一个类似于 Windows 索引服务。
  1. package org.bupt.xiaoye.chapter5_8;

  2. import java.io.File;
  3. import java.util.concurrent.BlockingQueue;

  4. public class Crawler implements Runnable {
  5.         private final BlockingQueue<File> b;
  6.         private final File root;

  7.         @Override
  8.         public void run() {
  9.                 System.out.println("Crawler begins to run!");
  10.                 if (root == null|| !root.exists())
  11.                         return;
  12.                 crawl(root);
  13.                 System.out.println("Crawler is shutdown!");

  14.         }

  15.         public void crawl(File root) {
  16.                 try {
  17.                         if (root.isFile()) {
  18.                                 System.out.println("Crawling " + root);
  19.                                 b.put(root);
  20.                         } else {
  21.                                 for (File f : root.listFiles()) {
  22.                                         crawl(f);
  23.                                 }
  24.                         }
  25.                 } catch (InterruptedException e) {
  26.                         System.out.println(e);
  27.                 }
  28.         }

  29.         public Crawler(BlockingQueue<File> b, File root) {
  30.                 this.b = b;
  31.                 this.root = root;
  32.         }

  33. }
复制代码

  1. package org.bupt.xiaoye.chapter5_8;

  2. import java.io.File;
  3. import java.util.concurrent.BlockingQueue;

  4. public class Indexer implements Runnable {
  5.         private final BlockingQueue<File> blockingQueue;
  6.         @Override
  7.         public void run() {
  8.                 System.out.println("Indexer begins to run!");

  9.                 try{
  10.                 while(true){
  11.                         indexer(blockingQueue.take());
  12.                 }
  13.                 }
  14.                 catch (InterruptedException e) {
  15.                         System.out.println(e);
  16.                 }
  17.                 System.out.println("Indexer is shutdown!");
  18.         }
  19.         
  20.         private void indexer(File file ){
  21.                 System.out.println("Indexing "+file.getAbsolutePath());
  22.         }

  23.         public Indexer(BlockingQueue<File> blockingQueue) {
  24.                 this.blockingQueue = blockingQueue;
  25.         }

  26. }
复制代码

  1. package org.bupt.xiaoye.chapter5_8;

  2. import java.io.File;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.SynchronousQueue;

  5. public class Starter {
  6.         public static final int BOUND = 1;
  7.         public static void main(String[] args) throws InterruptedException{
  8.                 BlockingQueue<File> blockingQueue = new SynchronousQueue<File>();
  9.                 File file = new File("E:\\BaiduYunDownload");
  10.                 Crawler c1 = new Crawler(blockingQueue,file);
  11.                 Indexer indexer = new Indexer(blockingQueue);
  12.                 Thread t1 = new Thread(c1);
  13.                 Thread t2 = new Thread(indexer);
  14.                 t1.start();
  15.                 t2.start();
  16.                 t1.join();
  17.                 t2.join();
  18.         }

  19. }
复制代码

  双端队列-工作密取

  Java6 增加了两种容器类型,Deque 和 BlockingDeque,他们分别对 Queue 和 BlockingQueue 进行了拓展。Deque 是一个双端队列,实现了在队列头和队列尾的高效插入和移除。具体实现包括 ArrayDeque 和 LinkedBlockingDeque。双端队列同样适用于另一种相关模式,即工作密取(Working stealing)。在生产者-消费者设计中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己的双端队列中的全部工作,那么它可以从其它消费者双端队列末尾秘密地获取工作。密取工作模式比传统的生产者-消费者模式具有更高的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。在大多数时候,它们都只是访问自己的双端队列,从而极大地减小了竞争。当工作者线程需要访问另一个队列时,它会从队列的尾部而不是头部获取工作,因此进一步降低了队列上的竞争程度。

  工作密取非常适用于既是消费者也是生产者问题—当执行某个工作时可能导致出现更多的工作。例如,在网页爬虫程序在处理一个页面时,通常会发现有更多的页面需要处理。

  阻塞和中断

  线程可能会阻塞或者暂停执行,原因有很多种:等待I/O操作结束,等待获得一个锁,等待从 Thread.sleep 中醒来,或是等待另一个线程的计算结果。当线程阻塞时,它通常被挂起,并处于某种阻塞状态(BLOCKED、WAITING或TIMED_WAITING)。阻塞操作与执行时间很长的普通操作的差别在于,被阻塞的线程必须等待某个不受它控制的时间发生后才能继续执行。

  BlockingQueue 的 put 和take 方法会抛出受检查异常(Checked Exception) InterruptedException,这与类库中其他方法的做法相同,例如 Thread.sleep .当某方法抛出 InterruptedException时,表示该方法是一个阻塞方法。当方法抛出InterruptedException 时,有两种基本选择:

  传递 InterruptedException 避开这个异常通常是最明智的策略-只需要将 InterruptedException传递给方法的调用者。传递InterruptedException的方法包括,根本不捕获该异常或者捕获后再次抛出这个异常。(通常如果在自定义的方法中调用了阻塞方法,那么捕获到阻塞方法的中断异常后应该将其重新抛出)

  恢复中断 有时候不能抛出InterruptedException,例如代码是Runnable的一部分(因为run方法被定义为不抛出任何异常)。在这种情况下你,必须捕获InterruptedException,并通过调用当前线程的interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断。

  扩展阅读:

  Java开发教程重点(汇总)

相关帖子

发表于 2016-8-4 14:07:06 | 显示全部楼层
赞一个
使用道具 举报

回复

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部