UDN-企业互联网技术人气社区

板块导航

浏览  : 461
回复  : 1

[讨论交流] Java开发教程重点(九)常用同步工具类

[复制链接]
王许柔的头像 楼主
发表于 2016-8-4 10:22:26 | 显示全部楼层 |阅读模式
  同步工具类可以使任何一种对象,只要该对象可以根据自身的状态来协调控制线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括:信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。

  闭锁

  首先我们来介绍闭锁。

  闭锁作用相当于一扇门:在闭锁到达某一状态之前,这扇门一直是关闭的,所有的线程都会在这扇门前等待(阻塞)。只有门打开后,所有的线程才会同时继续运行。

  闭锁可以用来确保某些活动直到其它活动都完成后才继续执行,例如:

  1、确保某个计算在其所有资源都被初始化之后才继续执行。二元闭锁(只有两个状态)可以用来表示“资源R已经被初始化”,而所有需要R操作都必须先在这个闭锁上等待。

  2、确保某个服务在所有其他服务都已经启动之后才启动。这时就需要多个闭锁。让S在每个闭锁上等待,只有所有的闭锁都打开后才会继续运行。

  3、等待直到某个操作的参与者(例如,多玩家游戏中的玩家)都就绪再继续执行。在这种情况下,当所有玩家都准备就绪时,闭锁将到达结束状态。

  CountDownLatch 是一种灵活的闭锁实现,可以用在上述各种情况中使用。闭锁状态包含一个计数器,初始化为一个正数,表示要等待的事件数量。countDown() 方法会递减计数器,表示等待的事件中发生了一件。await() 方法则阻塞,直到计数器值变为0。

  下面,我们使用闭锁来实现在主线程中计算多个子线程运行时间的功能。具体逻辑是使用两个闭锁,“起始门”用来控制子线程同时运行,“结束门”用来标识子线程是否都结束。
  1. package org.bupt.xiaoye;

  2. import java.util.concurrent.CountDownLatch;

  3. public class Test {
  4.         public static int nThread = 5;
  5.         public static void main(String[] args) throws InterruptedException{
  6.                 final CountDownLatch startGate = new CountDownLatch(1);
  7.                 final CountDownLatch endGate = new CountDownLatch(nThread);
  8.                 for(int i =0;i<nThread;i++){
  9.                         new Thread(){
  10.                                 @Override
  11.                                 public void run(){
  12.                                         try{
  13.                                         startGate.await();
  14.                                         Thread.sleep(300);}
  15.                                         catch(InterruptedException e){
  16.                                         }
  17.                                         finally{
  18.                                                 System.out.println(Thread.currentThread().getName()+"ended");
  19.                                                 endGate.countDown();
  20.                                         }
  21.                                 }
  22.                         }.start();
  23.                 }
  24.                 long start = System.nanoTime();
  25.                 startGate.countDown();
  26.                 endGate.await();
  27.                 long end = System.nanoTime();
  28.                 System.out.println("Total time :"+(end - start));
  29.         }
  30. }
复制代码

  FutureTask

  FutureTask也可以用作闭锁。它表示一种抽象的可生成结果的计算。是通过 Callable 来实现的,相当于一种可生成结果的 Runnable ,并且可处于以下三种状态:等待运行,正在运行,运行完成。当FutureTask进入完成状态后,它会停留在这个状态上。

  Future.get 用来获取计算结果,如果FutureTask还未运行完成,则会阻塞。FutureTask 将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask 的规范确保了这种传递过程能实现结果的安全发布。

  FutureTask在Executor框架中表示异步任务,还可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。

  下面我们来构造一个简单的异步任务,来简单示范FutureTask的使用方法。
  1. package org.bupt.xiaoye;

  2. import java.util.concurrent.Callable;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.FutureTask;

  5. public class Preloader {
  6.         private final FutureTask<Integer> future = new FutureTask<Integer>(
  7.                         new Callable<Integer>() {
  8.                                 public Integer call() throws Exception {
  9.                                         Thread.sleep(3000);
  10.                                         return 969;
  11.                                 }
  12.                         });

  13.         private final Thread thread = new Thread(future);
  14.         public void start() {
  15.                 thread.start();
  16.         }

  17.         public Integer get() throws Exception {
  18.                 try{
  19.                 return future.get();}
  20.                 catch(ExecutionException e){
  21.                         Throwable cause = e.getCause();
  22.                         throw launderThrowable(cause);
  23.                 }
  24.         }
  25.         private static Exception launderThrowable(Throwable cause) {
  26.                 if(cause instanceof RuntimeException)return (RuntimeException )cause;
  27.                 else if(cause instanceof Error) throw (Error) cause;
  28.                 else throw new IllegalStateException("Not Checked",cause);
  29.         }
  30.         public static void main(String[] args) throws Exception{
  31.                 Preloader p = new Preloader();
  32.                 p.start();
  33.                 long start = System.currentTimeMillis();
  34.                 System.out.println(p.get());
  35.                 System.out.println(System.currentTimeMillis()-start);
  36.         }
  37. }
复制代码

  信号量

  之前讲的闭锁控制访问的时间,而信号量则用来控制访问某个特定资源的操作数量,控制空间。而且闭锁只能够减少,一次性使用,而信号量则申请可释放,可增可减。 计数信号量还可以用来实现某种资源池,或者对容器施加边界。

  Semaphone 管理这一组许可(permit),可通过构造函数指定。同时提供了阻塞方法acquire,用来获取许可。同时提供了release方法表示释放一个许可。

  Semaphone 可以将任何一种容器变为有界阻塞容器,如用于实现资源池。例如数据库连接池。我们可以构造一个固定长度的连接池,使用阻塞方法 acquire和release获取释放连接,而不是获取不到便失败。(当然,一开始设计时就使用BlockingQueue来保存连接池的资源是一种更简单的方法)

  例如我们将一个普通set容器变为可阻塞有界。
  1. package org.bupt.xiaoye;

  2. import java.util.Collections;
  3. import java.util.HashSet;
  4. import java.util.Set;
  5. import java.util.concurrent.Semaphore;

  6. public class BoundedHashSet<T> {
  7.         private final Set<T> set;
  8.         private Semaphore sem;

  9.         public BoundedHashSet(int bound) {
  10.                 if (bound < 1)
  11.                         throw new IllegalStateException();
  12.                 set = Collections.synchronizedSet(new HashSet<T>());
  13.                 sem = new Semaphore(bound);
  14.         }

  15.         public boolean add(T e) throws InterruptedException {
  16.                 sem.acquire();
  17.                 boolean wasAdded = false;
  18.                 try {
  19.                         wasAdded = set.add(e);
  20.                         return wasAdded;
  21.                 } finally {
  22.                         if (!wasAdded)
  23.                                 sem.release();
  24.                 }
  25.         }

  26.         public boolean remove(T e) {
  27.                 boolean wasRemoved = set.remove(e);
  28.                 if (wasRemoved)
  29.                         sem.release();
  30.                 return wasRemoved;
  31.         }

  32. }
复制代码

  栅栏

  栅栏(Bariier)类似于闭锁,它能阻塞一组线程知道某个事件发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待等待时间,而栅栏用于等待线程。

  CyclicBarrier 可以使一定数量的参与方反复的在栅栏位置汇聚,它在并行迭代算法中非常有用:将一个问题拆成一系列相互独立的子问题。当线程到达栅栏位置时,调用await() 方法,这个方法是阻塞方法,直到所有线程到达了栅栏位置,那么栅栏被打开,此时所有线程被释放,而栅栏将被重置以便下次使用。

  另一种形式的栅栏是Exchanger,它是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据。例如当一个线程想缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用 Exchanger 来汇合,并将慢的缓冲区与空的缓冲区交换。当两个线程通过 Exchanger 交换对象时,这种交换就把这两个对象安全的发布给另一方。

  Exchanger 可能被视为 SynchronousQueue 的双向形式。我们也可以用两个SynchronousQueue来实现 Exchanger的功能。
  1. class FillAndEmpty {
  2.    Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
  3.    DataBuffer initialEmptyBuffer = ... a made-up type
  4.    DataBuffer initialFullBuffer = ...

  5.    class FillingLoop implements Runnable {
  6.      public void run() {
  7.        DataBuffer currentBuffer = initialEmptyBuffer;
  8.        try {
  9.          while (currentBuffer != null) {
  10.            addToBuffer(currentBuffer);
  11.            if (currentBuffer.isFull())
  12.              currentBuffer = exchanger.exchange(currentBuffer);
  13.          }
  14.        } catch (InterruptedException ex) { ... handle ... }
  15.      }
  16.    }

  17.    class EmptyingLoop implements Runnable {
  18.      public void run() {
  19.        DataBuffer currentBuffer = initialFullBuffer;
  20.        try {
  21.          while (currentBuffer != null) {
  22.            takeFromBuffer(currentBuffer);
  23.            if (currentBuffer.isEmpty())
  24.              currentBuffer = exchanger.exchange(currentBuffer);
  25.          }
  26.        } catch (InterruptedException ex) { ... handle ...}
  27.      }
  28.    }

  29.    void start() {
  30.      new Thread(new FillingLoop()).start();
  31.      new Thread(new EmptyingLoop()).start();
  32.    }
  33.   }
复制代码

  扩展阅读:

  Java开发教程重点(汇总)

相关帖子

发表于 2016-8-4 14:06:28 | 显示全部楼层
赞一个
使用道具 举报

回复

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部