LockSupport
LockSupport LockSupport是一个非常好用的的线程阻塞的工具类,它可以在线程内任意位置让线程阻塞。 和Thread.suspend()相比,它弥补了由于resume发生在前,导致线程无法继续执行的情况。 suspend方法在暂停线程的时候,也会不释放任何资源,…
多线程情况下,将一个任务分解为不同的部分,交由不同的线程执行,线程之间相互协作。可能存在一个线程执行前需要满足一些特定的条件,在该条件未满足的情况下,暂停执行,使线程处于
Waiting
状态,直到该条件满足后才能继续执行。
Guarded Suspension
的核心是一个受保护方法(Guarded Method)。该方法执行需要满足特定的条件(Predicate
)。
GuardedObject
:包含了受保护的方法的对象
guardedMethod
:受保护方法stateChanged
:改变GuardedObject实例状态的方法,该方法负责在保护条件成立时唤醒受保护方法的执行线程GuardedAction
:抽象了目标动作,并关联了目标动作所需的保护条件
call
:用于表示目标动作的方法ConcreteGuardedAction
:应用程序所实现的具体目标动作及其关联的保护条件Predicate
:抽象了保护条件
evaluate
:用于表示保护条件的方法ConcretePredicate
:应用程序所实现的具体保护条件Blocker
:负责对执行guardedMethod方法的线程进行唤醒和挂起,并执行ConcreteGuardedAction所实现的目标操作
callWithGuard
:负责执行目标操作和暂挂当前线程signalAfter
:负责执行其参数指定的动作和唤醒由该方法所属Blocker实例所暂挂的线程中的一个线程signal
:负责唤醒由该方法所属Blocker实例所暂挂的线程中的一个线程broadcastAfter
:负责执行其参数指定的动作和唤醒由该方法所属Blocker实例所暂挂的所有线程broadcast
:负责唤醒由该方法所属Blocker实例暂挂的所有线程ConditionVarBlocker
:基于Java条件变量(java.util.concurrent.lock.Condition
)实现的Blocker保护条件 Predicate
public interface Predicate {
boolean evaluate();
}
目标动作 GuardedAction
import java.util.concurrent.Callable;
/**
* @author liuyao
* @date 2018/08/24
*/
public abstract class GuardedAction<V> implements Callable<V> {
protected final Predicate guard;
protected GuardedAction(Predicate guard) {
this.guard = guard;
}
}
Blocker
import java.util.concurrent.Callable;
/**
* @author liuyao
* @date 2018/08/24
*/
public interface Blocker {
/**
* 在保护条件成立时执行目标动作,否则阻塞当前线程,直到保护条件成立
* @param guardedAction
* @param <V>
* @return
* @throws Exception
*/
<V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception;
/**
* 执行stateOperation所指定的操作后,决定是否唤醒本Blocker所暂挂的所有线程中的一个
* @param stateOperation 更改状态操作,其call方法的返回值为true时,该方法才会唤醒被暂挂的线程
* @throws Exception
*/
void signalAfter(Callable<Boolean> stateOperation) throws Exception;
void signal() throws InterruptedException;
/**
* 执行stateOperation所指定的操作后,决定是否唤醒本Blocker所暂挂的所有线程中的所有线程
* @param stateOperation 更改状态操作,其call方法的返回值为true时,该方法才会唤醒被暂挂的线程
* @throws Exception
*/
void broadcastAfter(Callable<Boolean> stateOperation) throws Exception;
}
ConditionVarBlocker
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author liuyao
* @date 2018/08/24
*/
public class ConditionVarBlocker implements Blocker {
private final Lock lock;
private final Condition condition;
public ConditionVarBlocker(Lock lock) {
this.lock = lock;
this.condition=lock.newCondition();
}
public ConditionVarBlocker() {
this.lock=new ReentrantLock();
this.condition=lock.newCondition();
}
@Override
public <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception {
lock.lockInterruptibly();
V result;
try {
final Predicate guard=guardedAction.guard;
while (!guard.evaluate()){
condition.await();
}
result=guardedAction.call();
return result;
}finally {
lock.unlock();
}
}
@Override
public void signalAfter(Callable<Boolean> stateOperation) throws Exception {
lock.lockInterruptibly();
try {
if (stateOperation.call()){
condition.signal();
}
}finally {
lock.unlock();
}
}
@Override
public void signal() throws InterruptedException {
lock.lockInterruptibly();
try {
condition.signal();
}finally {
lock.unlock();
}
}
@Override
public void broadcastAfter(Callable<Boolean> stateOperation) throws Exception {
lock.lockInterruptibly();
try {
if (stateOperation.call()){
condition.signalAll();
}
}finally {
lock.unlock();
}
}
}
AlarmInfo
public class AlarmInfo {
private String info;
}
AlarmAgent
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
/**
* 负责连接告警服务器,并发送告警信息给服务器
* @author liuyao
* @date 2018/08/24
*/
public class AlarmAgent {
// 用于记录AlarmAgent是否连接上告警服务器
private volatile boolean connectedToServer = false;
private final Predicate agentConnected =new Predicate() {
@Override
public boolean evaluate() {
return connectedToServer;
}
};
private final Blocker blocker=new ConditionVarBlocker();
// 心跳定时器
private final Timer heartbeatTimer=new Timer(true);
/**
* 发送告警信息
* @param alarm
* @throws Exception
*/
public void sendAlarm(final AlarmInfo alarm) throws Exception {
// 可能需要等待,直到AlarmAgent连接上告警服务器(或者连接中断后重新连上服务器)
GuardedAction<Void> guardedAction=new GuardedAction<Void>(agentConnected) {
@Override
public Void call() throws Exception {
doSendAlarm(alarm);
return null;
}
};
blocker.callWithGuard(guardedAction);
}
/**
* 通过网络连接将告警信息发送给服务器
* @param alarm
*/
private void doSendAlarm(AlarmInfo alarm) {
System.out.println("sending alarm "+alarm);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void init(){
//...
// 告警服务器连接线程
Thread connectingThread =new Thread(new ConnectingTask());
connectingThread.start();
heartbeatTimer.schedule(new HeartbeatTask(),60000,2000);
}
public void disconnect(){
System.out.println("disconnected from alarm server");
connectedToServer=false;
}
protected void onConnected(){
try {
blocker.signalAfter(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
connectedToServer=true;
System.out.println("Connected to Server");
return Boolean.TRUE;
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
protected void onDisConnected(){
connectedToServer=false;
}
/**
* 负责与告警服务器建立连接
*/
private class ConnectingTask implements Runnable{
@Override
public void run() {
//...
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
onConnected();
}
}
/**
* 心跳定时任务,定时检查与告警服务器的连接是否正常,发现异常后自动重新连接
*/
private class HeartbeatTask extends TimerTask{
@Override
public void run() {
if (!testConnection()){
onDisConnected();
reconnect();
}
}
}
private boolean testConnection(){
//...
return true;
}
private void reconnect(){
ConnectingTask connectingThread=new ConnectingTask();
// 直接在心跳定时器线程中执行
connectingThread.run();
}
}
AlarmAgent
里面有两个内部类,负责连接告警服务器和进行心跳连接,当我们调用sendAlarm
方法的时候,先去构造一个guardedAction
,将我们的保护条件传进guardedAction
里面,最后将该guardedAction
传进我们之前构造的一个blocker
里面,这样我们就可以在blocker
的实现类ConditionVarBlocker
中,利用Java提供的Lock
进行保护条件检测,当保护条件不满足的情况下,该方法一直阻塞,直到条件满足后,执行方法并将结果返回。
可见上面对于执行条件的判断都放在了blocker
的实现类中,我们构建多线程任务的时候,只需把具体的保护条件和受保护方法构造后,传进blocker
进行判断就可以了,条件满足就会执行并返回结果。
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
/**
* 本程序是为了演示“嵌套监视器锁死“而写的,因此本程序需要通过手工终止进程才能结束。
*
* @author Viscent Huang
*
*/
public class NestedMonitorLockoutExample {
public static void main(String[] args) {
final Helper helper = new Helper();
System.out.println("Before calling guaredMethod.");
Thread t = new Thread(new Runnable() {
@Override
public void run() {
String result;
result = helper.xGuarededMethod("test");
System.out.println(result);
}
});
t.start();
final Timer timer = new Timer();
// 延迟50ms调用helper.stateChanged方法
timer.schedule(new TimerTask() {
@Override
public void run() {
helper.xStateChanged();
timer.cancel();
}
}, 50, 10);
}
private static class Helper {
private volatile boolean isStateOK = false;
private final Predicate stateBeOK = new Predicate() {
@Override
public boolean evaluate() {
return isStateOK;
}
};
private final Blocker blocker = new ConditionVarBlocker();
public synchronized String xGuarededMethod(final String message) {
GuardedAction<String> ga = new GuardedAction<String>(stateBeOK) {
@Override
public String call() throws Exception {
return message + "->received.";
}
};
String result = null;
try {
result = blocker.callWithGuard(ga);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
public synchronized void xStateChanged() {
try {
blocker.signalAfter(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
isStateOK = true;
System.out.println("state ok.");
return Boolean.TRUE;
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
上面的的 xGuarededMethod
和 xStateChanged
通过了synchronized进行加锁,导致了程序运行之后就会出现只显示
Before calling guaredMethod.
出现这个可见我们在主函数里面执行了 t.start()
方法后,xGuarededMethod
一直没有返回
现在来分析一下原因:如果我们在这两个方法都加上synchronized的话,在t线程启动进入 xGuarededMethod
时,由于是同步的需要获得一个锁(它所属的NestedMonitorLockoutExample
实例),在ConditionVarBlocker
的 callWithGuard
的方法中有condition.await();
,在调用await方法后会释放之前其所属的Condition实例相关的锁,但是其synchronized的NestedMonitorLockoutExample
并没有释放,这样后面的xStateChanged
的就没有办法获得NestedMonitorLockoutExample
锁,就没有办法唤醒前面阻塞在await中的方法,所以最后只会输出一句话。
下面我们来通过JVisualVM查看一下内存:
上面的代码中,Predicate
、GuardedAction
、Blocker
和ConditionVarBlocker
都是可以复用的,我们只需要根据自身情况实现相应的 GuardedObject
、ConcretePredicate
和ConcreteGuardedAction
就可以了。
LockSupport LockSupport是一个非常好用的的线程阻塞的工具类,它可以在线程内任意位置让线程阻塞。 和Thread.suspend()相比,它弥补了由于resume发生在前,导致线程无法继续执行的情况。 suspend方法在暂停线程的时候,也会不释放任何资源,…
主从DB与Cache一致性问题 1. 缓存设计一些问题 1.1 “更新缓存”还是“淘汰缓存” 更新缓存:数据不但会写入数据库,…