UDN-企业互联网技术人气社区

板块导航

浏览  : 521
回复  : 1

[讨论交流] JAVA 多线程实现生产者 —— 消费者

[复制链接]
小辫儿的头像 楼主
  最近在学习Java多线程编程,当然少不了学习经典的 生产者——消费者 模型。

  一、生产者——消费者

  在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。

  单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。大概的结构如下图。

  为了不至于太抽象,我们举一个寄信的例子(虽说这年头寄信已经不时兴,但这个例子还是比较贴切的)。假设你要寄一封平信,大致过程如下:

  1、你把信写好——相当于生产者制造数据

  2、你把信放入邮筒——相当于生产者把数据放入缓冲区

  3、邮递员把信从邮筒取出——相当于消费者把数据取出缓冲区

  4、邮递员把信拿去邮局做相应的处理——相当于消费者处理数据

  生产者——消费者模式经常用到,关于他的优点和怎么灵活运用,大家可以查阅相关资料,自己好好总结一下。

  二、生产者——消费者的实现。

  生产者——消费者的实现方法有以下几种:

  1.用互斥锁实现,代码如下:
  1. package multiThreads;
  2. import java.util.concurrent.*;
  3. import java.util.concurrent.locks.*;

  4. /**
  5. * @author Administrator
  6. * 生产者——消费者
  7. * 缓冲区为FIFO队列
  8. */
  9. public class ConsumerProducer {
  10.   //定义缓冲池  
  11.   private static Buffer buffer = new Buffer();

  12.   public static void main(String[] args) {
  13.     //创建含有两个线程的线程池用于执行从缓冲区读写的操作  
  14.     ExecutorService executor = Executors.newFixedThreadPool(2);
  15.     executor.execute(new ProducerTask());
  16.     executor.execute(new ConsumerTask());
  17.     executor.shutdown();
  18.   }

  19.    
  20.   //生产者任务类  
  21.   private static class ProducerTask implements Runnable {
  22.     public void run() {
  23.       try {
  24.         int i = 1;
  25.         while (true) {
  26.           System.out.println("生产者写入 " + i);
  27.           //向缓存区写入整数  
  28.           buffer.write(i++);
  29.           //线程休眠随机时间  
  30.           Thread.sleep((int)(Math.random() * 10000));
  31.         }
  32.       } catch (InterruptedException ex) {
  33.         ex.printStackTrace();
  34.       }
  35.     }
  36.   }

  37.   //消费者任务类  
  38.   private static class ConsumerTask implements Runnable {
  39.     public void run() {
  40.       try {
  41.         while (true) {
  42.           //从缓存区读取整数  
  43.           System.out.println("\t\t\t消费者读取 " + buffer.read());
  44.           //线程休眠随机时间  
  45.           Thread.sleep((int)(Math.random() * 10000));
  46.         }
  47.       } catch (InterruptedException ex) {
  48.         ex.printStackTrace();
  49.       }
  50.     }
  51.   }

  52.   //定义缓存区,用FIFO队列存取数据  
  53.   private static class Buffer {
  54.       
  55.     //缓存区容量   
  56.     private static final int CAPACITY = 1;  
  57.      
  58.     //用LinkedList定义FIFO链队  
  59.     private java.util.LinkedList<Integer> queue = new java.util.LinkedList<Integer>();

  60.     //定义互斥锁  
  61.     private static Lock lock = new ReentrantLock();
  62.      
  63.     //条件:缓存区非空  
  64.     private static Condition notEmpty = lock.newCondition();
  65.     //条件:缓存区已满  
  66.     private static Condition notFull = lock.newCondition();

  67.     //从缓存区读取整数  
  68.     public void write(int value) {
  69.       lock.lock();
  70.       try {
  71.         while (queue.size() == CAPACITY) {
  72.           System.out.println("等待缓存区未满");
  73.           notFull.await();
  74.         }

  75.         //缓存区未满条件被唤醒之后,向队列添加  
  76.         queue.offer(value);
  77.         //向缓存区非空条件发送信号  
  78.         notEmpty.signal();  
  79.       } catch (InterruptedException ex) {
  80.         ex.printStackTrace();
  81.       } finally {
  82.         lock.unlock();  
  83.       }
  84.     }

  85.     @SuppressWarnings("finally")
  86.     public int read() {
  87.       int value = 0;
  88.       lock.lock();
  89.       try {
  90.         //当缓存区为空时候:等待非空条件  
  91.         while (queue.isEmpty()) {
  92.           System.out.println("\t\t\t等待唤醒非空条件");
  93.           notEmpty.await();
  94.         }

  95.         //读取并删除数据  
  96.         value = queue.remove();
  97.         //向缓存区未满条件发送信号  
  98.         notFull.signal();  
  99.       } catch (InterruptedException ex) {
  100.         ex.printStackTrace();
  101.       } finally {
  102.         lock.unlock();
  103.         return value;
  104.       }
  105.     }
  106.   }
  107. }
复制代码
  1. package multiThreads;
  2. import java.util.concurrent.*;
  3. import java.util.concurrent.locks.*;

  4. /**
  5. * @author Administrator
  6. * 生产者——消费者
  7. * 缓冲区为FIFO队列
  8. */
  9. public class ConsumerProducer {
  10.   //定义缓冲池
  11.   private static Buffer buffer = new Buffer();

  12.   public static void main(String[] args) {
  13.     //创建含有两个线程的线程池用于执行从缓冲区读写的操作
  14.     ExecutorService executor = Executors.newFixedThreadPool(2);
  15.     executor.execute(new ProducerTask());
  16.     executor.execute(new ConsumerTask());
  17.     executor.shutdown();
  18.   }

  19.   
  20.   //生产者任务类
  21.   private static class ProducerTask implements Runnable {
  22.     public void run() {
  23.       try {
  24.         int i = 1;
  25.         while (true) {
  26.           System.out.println("生产者写入 " + i);
  27.           //向缓存区写入整数
  28.           buffer.write(i++);
  29.           //线程休眠随机时间
  30.           Thread.sleep((int)(Math.random() * 10000));
  31.         }
  32.       } catch (InterruptedException ex) {
  33.         ex.printStackTrace();
  34.       }
  35.     }
  36.   }

  37.   //消费者任务类
  38.   private static class ConsumerTask implements Runnable {
  39.     public void run() {
  40.       try {
  41.         while (true) {
  42.           //从缓存区读取整数
  43.           System.out.println("\t\t\t消费者读取 " + buffer.read());
  44.           //线程休眠随机时间
  45.           Thread.sleep((int)(Math.random() * 10000));
  46.         }
  47.       } catch (InterruptedException ex) {
  48.         ex.printStackTrace();
  49.       }
  50.     }
  51.   }

  52.   //定义缓存区,用FIFO队列存取数据
  53.   private static class Buffer {
  54.          
  55.         //缓存区容量  
  56.     private static final int CAPACITY = 1;
  57.    
  58.     //用LinkedList定义FIFO链队
  59.     private java.util.LinkedList<Integer> queue = new java.util.LinkedList<Integer>();

  60.     //定义互斥锁
  61.     private static Lock lock = new ReentrantLock();
  62.    
  63.     //条件:缓存区非空
  64.     private static Condition notEmpty = lock.newCondition();
  65.     //条件:缓存区已满
  66.     private static Condition notFull = lock.newCondition();

  67.     //从缓存区读取整数
  68.     public void write(int value) {
  69.       lock.lock();
  70.       try {
  71.         while (queue.size() == CAPACITY) {
  72.           System.out.println("等待缓存区未满");
  73.           notFull.await();
  74.         }

  75.         //缓存区未满条件被唤醒之后,向队列添加
  76.         queue.offer(value);
  77.         //向缓存区非空条件发送信号
  78.         notEmpty.signal();
  79.       } catch (InterruptedException ex) {
  80.         ex.printStackTrace();
  81.       } finally {
  82.         lock.unlock();
  83.       }
  84.     }

  85.     @SuppressWarnings("finally")
  86.         public int read() {
  87.       int value = 0;
  88.       lock.lock();
  89.       try {
  90.         //当缓存区为空时候:等待非空条件
  91.         while (queue.isEmpty()) {
  92.           System.out.println("\t\t\t等待唤醒非空条件");
  93.           notEmpty.await();
  94.         }

  95.         //读取并删除数据
  96.         value = queue.remove();
  97.         //向缓存区未满条件发送信号
  98.         notFull.signal();
  99.       } catch (InterruptedException ex) {
  100.         ex.printStackTrace();
  101.       } finally {
  102.         lock.unlock();
  103.         return value;
  104.       }
  105.     }
  106.   }
  107. }
复制代码

   二、采用阻塞FIFO队列实现。
  1. package multiThreads;
  2. import java.util.concurrent.*;

  3. /**
  4. * @author Administrator
  5. * 采用阻塞队列实现生产者--消费者
  6. */
  7. public class ConsumerProducerUsingBlockingQueue {

  8.   //定义阻塞FIFO队列  
  9.   private static ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(2);
  10.    
  11.   //private static LinkedBlockingQueue<Integer> buffer = new LinkedBlockingQueue<Integer>(2);  

  12.   public static void main(String[] args) {
  13.     //创建两个线程的线程池  
  14.     ExecutorService executor = Executors.newFixedThreadPool(2);
  15.     executor.execute(new ProducerTask());
  16.     executor.execute(new ConsumerTask());
  17.     executor.shutdown();
  18.   }

  19.   //生产者任务:向缓冲池中添加整数  
  20.   private static class ProducerTask implements Runnable {
  21.     public void run() {
  22.       try {
  23.         int i = 1;
  24.         while (true) {
  25.           System.out.println("生产者写入 " + i);
  26.           buffer.put(i++);
  27.           // 让线程休眠若干秒  
  28.           Thread.sleep((int)(Math.random() * 10000));
  29.         }
  30.       } catch (InterruptedException ex) {
  31.         ex.printStackTrace();
  32.       }
  33.     }
  34.   }

  35.   //消费者任务:从缓冲池中读取并删除整数  
  36.   private static class ConsumerTask implements Runnable {
  37.     public void run() {
  38.       try {
  39.         while (true) {
  40.           System.out.println("\t\t\t消费者读取 " + buffer.take());
  41.           //让线程休眠若干秒  
  42.           Thread.sleep((int)(Math.random() * 10000));
  43.         }
  44.       } catch (InterruptedException ex) {
  45.         ex.printStackTrace();
  46.       }
  47.     }
  48.   }
  49. }
复制代码
  1. package multiThreads;
  2. import java.util.concurrent.*;

  3. /**
  4. * @author Administrator
  5. * 采用阻塞队列实现生产者--消费者
  6. */
  7. public class ConsumerProducerUsingBlockingQueue {

  8.   //定义阻塞FIFO队列
  9.   private static ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(2);
  10.   
  11.   //private static LinkedBlockingQueue<Integer> buffer = new LinkedBlockingQueue<Integer>(2);

  12.   public static void main(String[] args) {
  13.     //创建两个线程的线程池
  14.     ExecutorService executor = Executors.newFixedThreadPool(2);
  15.     executor.execute(new ProducerTask());
  16.     executor.execute(new ConsumerTask());
  17.     executor.shutdown();
  18.   }

  19.   //生产者任务:向缓冲池中添加整数
  20.   private static class ProducerTask implements Runnable {
  21.     public void run() {
  22.       try {
  23.         int i = 1;
  24.         while (true) {
  25.           System.out.println("生产者写入 " + i);
  26.           buffer.put(i++);
  27.           // 让线程休眠若干秒
  28.           Thread.sleep((int)(Math.random() * 10000));
  29.         }
  30.       } catch (InterruptedException ex) {
  31.         ex.printStackTrace();
  32.       }
  33.     }
  34.   }

  35.   //消费者任务:从缓冲池中读取并删除整数
  36.   private static class ConsumerTask implements Runnable {
  37.     public void run() {
  38.       try {
  39.         while (true) {
  40.           System.out.println("\t\t\t消费者读取 " + buffer.take());
  41.           //让线程休眠若干秒
  42.           Thread.sleep((int)(Math.random() * 10000));
  43.         }
  44.       } catch (InterruptedException ex) {
  45.         ex.printStackTrace();
  46.       }
  47.     }
  48.   }
  49. }
复制代码

  当然实现生产者——消费者还有其他方式。

相关帖子

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部