J.U.C之AQS
标签:Java并发

AbstractQueuedSynchronizer - AQS

1. 同步组件

1.1 CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作

当程序执行过程中需要等待某个条件执行后才能继续后续的操作。如并行计算等。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created By liuyao on 2018/4/22 11:15.
 */
@Slf4j
public class CountDownLatchExample1 {
    private final static int threadCount=200;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService= Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch=new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum=i;
            executorService.execute(()->{
                try {
                    test(threadNum);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();//执行countDown()方法
                }
            });
        }
        countDownLatch.await();//等待
        log.info("finished");
        executorService.shutdown(); //关闭线程池
    }

    private static void test(int threadNum) throws Exception{
        Thread.sleep(100);
        log.info("{}",threadNum);
        Thread.sleep(100);
    }
}

如果想让countDownLatch的await()方法执行一段时间后就停止执行,则可以在里面添加等待时间和时间的单位,超过这个时间以后,就不在等待了,而是先执行await()以后的方法

countDownLatch.await(10, TimeUnit.MILLISECONDS);

1.2 Semaphore

Semaphore 可以控制某个资源可被访问的个数,提供了 acquire(获取一个许可) 和 release (释放一个许可) 方法,Semaphore维护了当前访问的个数,通过提供同步机制来控制同时访问的个数,Semaphore可以实现有限个数的链表。适用于仅能提供有限访问的资源,比如数据库连接数。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Created By liuyao on 2018/4/22 11:15.
 */
@Slf4j
public class SemaphoreExample1 {
    private final static int threadCount=20;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService= Executors.newCachedThreadPool();

        final Semaphore semaphore=new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum=i;
            executorService.execute(()->{
                try {
                    semaphore.acquire(); //获得一个许可
                    test(threadNum);
                    semaphore.release(); //释放一个许可
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                }
            });
        }
        executorService.shutdown(); //关闭线程池
    }

    private static void test(int threadNum) throws Exception{
        Thread.sleep(1000);
        log.info("{}",threadNum);
    }
}

上面执行的时候会三个一组的执行:

也可以一次性拿多个许可,修改上面的代码如下:

semaphore.acquire(3); //获得3个许可
test(threadNum);
semaphore.release(3); //释放3个许可

那么最后执行的结果将会一个一个执行,变成了单线程。

当然也可以尝试获得许可,如果没有获取到,则丢弃任务,不做,修改上面代码如下:

  if (semaphore.tryAcquire()){
		test(threadNum);
         semaphore.release(); //释放一个许可
    }

结果将会变成:

因为信号量只有三个,最后只有三个线程获得了许可,可以执行。

也可以设置 tryAcquire 等待时间

if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)){
        test(threadNum);
        semaphore.release(); 
    }

1.3 CyclicBarrier

字面意思是可循环使用(cyclic)的屏障(barrier)

让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行

CyclicBarrier执行的加一操作,当计数器的值达到我们设置初始值cnt时,进入的等待的状态会被唤醒,继续执行他们后续的操作。CyclicBarrier 可以重用,又称循环屏障。适合多线程计算数据,最后合并计算结果。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created By liuyao on 2018/4/22 14:57.
 */
@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier cyclicBarrier =new CyclicBarrier(3);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService= Executors.newCachedThreadPool();

        for (int i = 0; i < 6; i++) {
            final  int threadNum=i;
            Thread.sleep(1000);
            executorService.execute(()->{
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("{}",e);
                }
            });
        }
    }

    private static void race(int threadNum) throws InterruptedException, BrokenBarrierException {
        Thread.sleep(1000);
        log.info("{} is ready",threadNum);
        cyclicBarrier.await();  //线程到达这里进入等待状态,然后等所有线程到达barrier后继续执行。
        log.info("{} continue",threadNum);
    }
}

最后执行结果便是:

如果要使线程到达屏障时,先执行A操作:

static CyclicBarrier c=new CyclicBarrier(2,new A());

1.4 CountDownLatch和CyclicBarrier区别

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,所以CyclicBarrier能处理更为复杂的业务场景。 例如:计算错误的时候,可以重置计数器,并让线程重新计算一次。

1.5 ReentrantLock与锁

ReentrantLock实现是一种自旋锁,通过循环调用CAS操作来实现加锁,性能比较好是因为使线程避免进入线程内核状态的阻塞状态。当你需要使用下面ReentrantLock的三个独有的功能时,就需要使用它,否者都可以使用Synchronized

    private final static Lock lock=new ReentrantLock();

    private static  void add(){
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

1.6 ReentrantReadWriteLock

将读锁和写锁分离,在没有任何读写锁的情况下,才可以取得写入锁。

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Created By liuyao on 2018/4/22 15:52.
 */
@Slf4j
public class LockExmaple2 {
    private final Map<String , Data> map=new TreeMap<>();

    private final ReentrantReadWriteLock lock=new ReentrantReadWriteLock();

    private final Lock readLock=lock.readLock();

    private final Lock writeLock =lock.writeLock();

    public Data get(String key){
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys(){
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public void put(String key,Data value){
        writeLock.lock();
        try {
            map.put(key,value);
        } finally {
            writeLock.unlock();
        }
    }
}

1.7 StampedLock

StampedLock 控制锁有三种模式,分别是写,读,乐观读。一个StampedLock有版本和模式两个部分组成。

测试用例:

private final static StampedLock lock=new StampedLock();

    private static  void add(){
        long stamped = lock.writeLock();
        try {
            count++;
        } finally {
            lock.unlock(stamped);
        }

    }

下面这是官方JDK示例:

import java.util.concurrent.locks.StampedLock;

public class LockExample3 {

    class Point {
        private double x, y;
        private final StampedLock sl = new StampedLock();

        void move(double deltaX, double deltaY) { // an exclusively locked method
            long stamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }

        //下面看看乐观读锁案例
        double distanceFromOrigin() { // A read-only method
            long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
            double currentX = x, currentY = y;  //将两个字段读入本地局部变量
            if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
                stamp = sl.readLock();  //如果没有,我们再次获得一个读悲观锁
                try {
                    currentX = x; // 将两个字段读入本地局部变量
                    currentY = y; // 将两个字段读入本地局部变量
                } finally {
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }

        //下面是悲观读锁案例
        void moveIfAtOrigin(double newX, double newY) { // upgrade
            // Could instead start with optimistic, not read mode
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
                    long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
                    if (ws != 0L) { //这是确认转为写锁是否成功
                        stamp = ws; //如果成功 替换票据
                        x = newX; //进行状态改变
                        y = newY;  //进行状态改变
                        break;
                    } else { //如果不能成功转换为写锁
                        sl.unlockRead(stamp);  //我们显式释放读锁
                        stamp = sl.writeLock();  //显式直接进行写锁 然后再通过循环再试
                    }
                }
            } finally {
                sl.unlock(stamp); //释放读锁或写锁
            }
        }
    }
}

1.8 Condition

测试示例:

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class LockExample5 {

    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();//从实例里面取出Condition

        new Thread(() -> {
            try {
                reentrantLock.lock();
                log.info("wait signal"); // 1
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("get signal"); // 4
            reentrantLock.unlock();
        }).start();

        new Thread(() -> {
            reentrantLock.lock();
            log.info("get lock"); // 2
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signalAll();
            log.info("send signal ~ "); // 3
            reentrantLock.unlock();
        }).start();
    }
}

线程一先执行 lock 方法,进入sync queue,然后执行await方法,输出wait signal,然后进入下面的Condition queue里面,此时线程二获得锁,输出get lock ,线程二休眠3秒后,执行signAll()方法,将线程一唤醒,线程一再次加入上面的sync queue,最后,线程二释放锁,线程一继续执行,最后输出 get signal。

  • 10 min read

CONTRIBUTORS


  • 10 min read