Guarded Suspension(保护性暂挂)模式
标签:并发模式

Guarded Suspension(保护性暂挂)模式

多线程情况下,将一个任务分解为不同的部分,交由不同的线程执行,线程之间相互协作。可能存在一个线程执行前需要满足一些特定的条件,在该条件未满足的情况下,暂停执行,使线程处于 Waiting状态,直到该条件满足后才能继续执行。

Guarded Suspension的核心是一个受保护方法(Guarded Method)。该方法执行需要满足特定的条件(Predicate)。

1. 架构

2. 实例

保护条件 Predicate

public interface Predicate {
    boolean evaluate();
}

目标动作 GuardedAction

import java.util.concurrent.Callable;

/**
 * @author liuyao
 * @date 2018/08/24
 */
public abstract class GuardedAction<V> implements Callable<V> {
    protected final Predicate guard;

    protected GuardedAction(Predicate guard) {
        this.guard = guard;
    }
}

Blocker

import java.util.concurrent.Callable;

/**
 * @author liuyao
 * @date 2018/08/24
 */
public interface Blocker {
    /**
     * 在保护条件成立时执行目标动作,否则阻塞当前线程,直到保护条件成立
     * @param guardedAction
     * @param <V>
     * @return
     * @throws Exception
     */
    <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception;

    /**
     * 执行stateOperation所指定的操作后,决定是否唤醒本Blocker所暂挂的所有线程中的一个
     * @param stateOperation 更改状态操作,其call方法的返回值为true时,该方法才会唤醒被暂挂的线程
     * @throws Exception
     */
    void signalAfter(Callable<Boolean> stateOperation) throws Exception;

    void signal() throws InterruptedException;

    /**
     * 执行stateOperation所指定的操作后,决定是否唤醒本Blocker所暂挂的所有线程中的所有线程
     * @param stateOperation 更改状态操作,其call方法的返回值为true时,该方法才会唤醒被暂挂的线程
     * @throws Exception
     */
    void broadcastAfter(Callable<Boolean> stateOperation) throws Exception;
}

ConditionVarBlocker

import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author liuyao
 * @date 2018/08/24
 */
public class ConditionVarBlocker implements Blocker {
    private final Lock lock;
    private final Condition condition;

    public ConditionVarBlocker(Lock lock) {
        this.lock = lock;
        this.condition=lock.newCondition();
    }

    public ConditionVarBlocker() {
        this.lock=new ReentrantLock();
        this.condition=lock.newCondition();
    }

    @Override
    public <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception {
        lock.lockInterruptibly();
        V result;
        try {
            final Predicate guard=guardedAction.guard;
            while (!guard.evaluate()){
                condition.await();
            }
            result=guardedAction.call();
            return result;
        }finally {
            lock.unlock();
        }
    }

    @Override
    public void signalAfter(Callable<Boolean> stateOperation) throws Exception {
        lock.lockInterruptibly();
        try {
            if (stateOperation.call()){
                condition.signal();
            }
        }finally {
            lock.unlock();
        }
    }

    @Override
    public void signal() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

    @Override
    public void broadcastAfter(Callable<Boolean> stateOperation) throws Exception {
        lock.lockInterruptibly();
        try {
            if (stateOperation.call()){
                condition.signalAll();
            }
        }finally {
            lock.unlock();
        }
    }
}

AlarmInfo

public class AlarmInfo {
    private String info;
}

AlarmAgent

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;

/**
 * 负责连接告警服务器,并发送告警信息给服务器
 * @author liuyao
 * @date 2018/08/24
 */
public class AlarmAgent {
//    用于记录AlarmAgent是否连接上告警服务器
    private volatile boolean connectedToServer = false;

    private final Predicate agentConnected =new Predicate() {
        @Override
        public boolean evaluate() {
            return connectedToServer;
        }
    };

    private final Blocker blocker=new ConditionVarBlocker();

//    心跳定时器
    private final Timer heartbeatTimer=new Timer(true);

    /**
     * 发送告警信息
     * @param alarm
     * @throws Exception
     */
    public void sendAlarm(final AlarmInfo alarm) throws Exception {
//        可能需要等待,直到AlarmAgent连接上告警服务器(或者连接中断后重新连上服务器)
        GuardedAction<Void> guardedAction=new GuardedAction<Void>(agentConnected) {
            @Override
            public Void call() throws Exception {
                doSendAlarm(alarm);
                return null;
            }
        };
        blocker.callWithGuard(guardedAction);
    }

    /**
     * 通过网络连接将告警信息发送给服务器
     * @param alarm
     */
    private void doSendAlarm(AlarmInfo alarm) {
        System.out.println("sending alarm "+alarm);
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void init(){
        //...
//        告警服务器连接线程
        Thread connectingThread =new Thread(new ConnectingTask());

        connectingThread.start();
        heartbeatTimer.schedule(new HeartbeatTask(),60000,2000);
    }

    public void disconnect(){
        System.out.println("disconnected from alarm server");
        connectedToServer=false;
    }

    protected void onConnected(){
        try {
            blocker.signalAfter(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    connectedToServer=true;
                    System.out.println("Connected to Server");
                    return Boolean.TRUE;
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    protected void onDisConnected(){
        connectedToServer=false;
    }

    /**
     * 负责与告警服务器建立连接
     */
    private class ConnectingTask implements Runnable{

        @Override
        public void run() {
            //...
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            onConnected();
        }
    }

    /**
     * 心跳定时任务,定时检查与告警服务器的连接是否正常,发现异常后自动重新连接
     */
    private class HeartbeatTask extends TimerTask{

        @Override
        public void run() {
            if (!testConnection()){
                onDisConnected();
                reconnect();
            }
        }
    }

    private boolean testConnection(){
        //...
        return true;
    }

    private void reconnect(){
        ConnectingTask connectingThread=new ConnectingTask();
//        直接在心跳定时器线程中执行
        connectingThread.run();
    }
}

AlarmAgent里面有两个内部类,负责连接告警服务器和进行心跳连接,当我们调用sendAlarm方法的时候,先去构造一个guardedAction,将我们的保护条件传进guardedAction里面,最后将该guardedAction传进我们之前构造的一个blocker里面,这样我们就可以在blocker的实现类ConditionVarBlocker中,利用Java提供的Lock进行保护条件检测,当保护条件不满足的情况下,该方法一直阻塞,直到条件满足后,执行方法并将结果返回。

可见上面对于执行条件的判断都放在了blocker的实现类中,我们构建多线程任务的时候,只需把具体的保护条件和受保护方法构造后,传进blocker进行判断就可以了,条件满足就会执行并返回结果。

3. 嵌套监视器锁死

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;


/**
 * 本程序是为了演示“嵌套监视器锁死“而写的,因此本程序需要通过手工终止进程才能结束。
 * 
 * @author Viscent Huang
 *
 */
public class NestedMonitorLockoutExample {

    public static void main(String[] args) {
        final Helper helper = new Helper();
        System.out.println("Before calling guaredMethod.");

        Thread t = new Thread(new Runnable() {

            @Override
            public void run() {
                String result;
                result = helper.xGuarededMethod("test");
                System.out.println(result);
            }

        });
        t.start();

        final Timer timer = new Timer();

        // 延迟50ms调用helper.stateChanged方法
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                helper.xStateChanged();
                timer.cancel();
            }

        }, 50, 10);

    }

    private static class Helper {
        private volatile boolean isStateOK = false;
        private final Predicate stateBeOK = new Predicate() {

            @Override
            public boolean evaluate() {
                return isStateOK;
            }

        };

        private final Blocker blocker = new ConditionVarBlocker();

        public synchronized String xGuarededMethod(final String message) {
            GuardedAction<String> ga = new GuardedAction<String>(stateBeOK) {

                @Override
                public String call() throws Exception {
                    return message + "->received.";
                }

            };
            String result = null;
            try {
                result = blocker.callWithGuard(ga);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return result;
        }

        public synchronized void xStateChanged() {
            try {
                blocker.signalAfter(new Callable<Boolean>() {

                    @Override
                    public Boolean call() throws Exception {
                        isStateOK = true;
                        System.out.println("state ok.");
                        return Boolean.TRUE;
                    }

                });
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

上面的的 xGuarededMethodxStateChanged 通过了synchronized进行加锁,导致了程序运行之后就会出现只显示

Before calling guaredMethod.

出现这个可见我们在主函数里面执行了 t.start()方法后,xGuarededMethod 一直没有返回

现在来分析一下原因:如果我们在这两个方法都加上synchronized的话,在t线程启动进入 xGuarededMethod时,由于是同步的需要获得一个锁(它所属的NestedMonitorLockoutExample实例),在ConditionVarBlockercallWithGuard 的方法中有condition.await(); ,在调用await方法后会释放之前其所属的Condition实例相关的锁,但是其synchronized的NestedMonitorLockoutExample并没有释放,这样后面的xStateChanged的就没有办法获得NestedMonitorLockoutExample锁,就没有办法唤醒前面阻塞在await中的方法,所以最后只会输出一句话。

下面我们来通过JVisualVM查看一下内存:

3.总结

上面的代码中,PredicateGuardedActionBlockerConditionVarBlocker都是可以复用的,我们只需要根据自身情况实现相应的 GuardedObjectConcretePredicateConcreteGuardedAction就可以了。

  • 8 min read

CONTRIBUTORS


  • 8 min read