Atomic相关
标签:Java并发

Atomic相关

对于并发控制,我们可以采用悲观的策略,使用锁,它总是假设每一次的临界区操作都会产生冲突,每次操作都小心翼翼,如果有多个线程需要访问临界区资源,就宁可牺牲性能让线程等待,故锁会阻塞线程执行。

而无锁是一种乐观策略,它会假设对资源的访问是没有冲突的,既然没有冲突,那么所有的线程都可以在不停顿的情况下持续执行,如果遇到冲突,无锁策略采用比较交换的技术来鉴别线程冲突(CAS Compare And Swap),一旦检测到冲突产生,就重试当前操作到没有冲突为止。

CAS算法包含三个值(V,A,B)一个当前内存值V、旧的预期值A、即将更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。

来看一下atomic包下包含了那些类:

UnSafe

对于CAS操作,我们使用的是Unsafe下的类似指针操作的一些native函数,比如:

public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);

AtomicReference

AtomicReference可以确保你在修改对象引用时的线程安全性

比如:

AtomicReference<Integer> money=new AtomicReference<Integer>();
money.set(19);//赋值

AtomicStampedReference

由于AtomicReference没办法解决CAS中的ABA问题,所以我们可以使用AtomicStampedReference,AtomicStampedReference不仅维护了对象值,还维护了一个状态值(类似于时间戳),当AtomicStampedReference被修改时,除了要修改对象值外,还有修改状态值,这样CAS操作,要保证对象值和状态值都要满足各自的期望值,才可以修改成功了。

下面这个例子余额小于20元就充值20元,但只能充一次,这个时候就可以用AtomicStampedReference,这样当我们充值完消费后,就不会发生重复充值的情况。

public class AtomicStampedReferenceDemo {
    static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(19,0);

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            final int timestamp=money.getStamp();
            new Thread(){
                @Override
                public void run() {
                    while (true){
                        Integer m=money.getReference();
                        if (m < 20){
                            if (money.compareAndSet(m,m+20,timestamp,timestamp+1)){
                                System.out.println("余额小于20元,充值成功:"+money.getReference()+"元");
                                break;
                            }
                        }else {
                            System.out.println("余额大于20元,无需充值");
                            break;
                        }
                    }
                }
            }.start();
        }
        new Thread(){
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    while (true){
                        int timestamp=money.getStamp();
                        Integer m=money.getReference();
                        if (m>10){
                            System.out.println("大于10元");
                            if (money.compareAndSet(m,m-10,timestamp,timestamp+1)){
                                System.out.println("成功消费,余额:"+money.getReference());
                                break;
                            }
                        }else {
                            System.out.println("余额不足");
                            break;
                        }
                    }
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
    }
}

运行结果为:

余额大于20元,无需充值
余额大于20元,无需充值
余额小于20元,充值成功:39元
大于10元
成功消费,余额:29
大于10元
成功消费,余额:19
大于10元
成功消费,余额:9
余额不足
余额不足
余额不足
余额不足
余额不足
余额不足
余额不足

我们先启动三个线程充值,只有一个线程可以充值成功,其余两个线程充值失败,但消费小于20元以后,也不会发生重复充值。

AtomicIntegerArray

除了Atomic可以操作基本的数据类型外,还可以操作数组等复合结构,比如AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray。它们同样也是使用CAS进行在多线程下操作

public class AtomicIntegerArrayDemo {
    static AtomicIntegerArray arr=new AtomicIntegerArray(10);
    public static class AddThread implements Runnable{

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                arr.getAndIncrement(i%arr.length());
            }
        }

    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts=new Thread[10];
        for (int i = 0; i < 10; i++) {
            ts[i]=new Thread(new AddThread());
        }
        for (int i = 0; i < 10; i++) {
            ts[i].start();
        }
        for (int i = 0; i < 10; i++) {
            ts[i].join();
        }
        System.out.println(arr);
    }
}

上面采用 arr.getAndIncrement(i%arr.length());对每个元素进行累加1000次,一共10个线程进行累加,如果不是线程安全的,那么某些元素的值可能会小于10000,如果是线程安全的话,那么将是:

[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

AtomicIntegerFieldUpdater

让普通的变量也可以用CAS来保证线程的安全性,除了Integer,还可以保证Long,Reference,下面是一个投票的例子:

public class AtomicIntegerFieldUpdaterDemo {
    public static class Candidate{
        int id;
        volatile int score;
    }
    public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater=AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");
    public static AtomicInteger allScore=new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        final Candidate stu=new Candidate();
        Thread[] t=new Thread[10000];
        for (int i = 0; i < 10000; i++) {
            t[i]=new Thread(){
                @Override
                public void run() {
                    if (Math.random()>0.4){
                        scoreUpdater.incrementAndGet(stu);
                        allScore.incrementAndGet();
                    }
                }
            };
            t[i].start();
        }
        for (int i = 0; i < 10000; i++) {
            t[i].join();
        }
        System.out.println("score="+stu.score);
        System.out.println("allScore="+allScore);
    }
}

由于Candidate类的中score只是volatile的,不能保证原子性,故这个时候我们可以使用AtomicIntegerFieldUpdater进行包装一下:

score=5978
allScore=5978

使用AtomicIntegerFieldUpdater要注意几点:

  1. Updater只能修改它可见范围内的变量,因为Updater使用反射得到这个变量,如果变量不可见,那么就会出错,如上面的score不能声明为private的
  2. 为了保证变量的正确读取,必须要将变量声明为volatile类型,否则就会运行报错
  3. 由于CAS会通过对象实例的偏移量直接进行赋值,因此它不支持static静态变量。
  • 6 min read

CONTRIBUTORS


  • 6 min read