Two-phase Termination(两阶段终止)模式
标签:并发模式

Two-phase Termination(两阶段终止)模式

Two-phase Termination通过将将停止线程分解成准备阶段和执行阶段两个阶段提供一种优雅的停止线程的方式。

准备阶段:主要是通知目标线程准备进行停止。这一个阶段会设置一个标志变量用于指示目标线程可以停止了。但是由于目标线程可能处于阻塞状态(等待锁的获得),等待状态(调用Object.wait()),或者IO等待状态(Inputstream.read())等,这样的话,即时设置状态标志,但是线程没有办法看见,故这一阶段还需要通过调用目标线程的interrupt方法,来期望目标线程可以捕获相关的异常来检测到interrupt方法被调用,从而中断阻塞、等待状态。

对于能够对interrupt做出反应的方法有:

方法或类 响应interrupt调用抛出异常
Object.wait()、Object.wait(long timeout)、Object.wait(long millis,int nanos) InterruptedException
Threed.sleep(long millis)、hread(long millis,int nanos) InterruptedException
Thread.join()、Thread.join(long millis)、Thread.join(long millis,int nanos) InterruptedException
j.u.c.BlockingQueue.take() InterruptedException
j.u.c.locks.Lock.lockInterruptibly() InterruptedException
java.nio.channels.InterruptibleChannel java.nio.channels.ClosedByInterruptException

对于一些并不对interrupt调用做出反应的,需要进行手动处理异常。

执行阶段:该阶段主要检查准备阶段所设置的停止标志和信号,在此基础上决定线程停止时机,并进行适当的清理操作。

1. 架构

可见上面的使用的是模板方法模式进行设计与扩展的。

2. 实例

Terminatable

public interface Terminatable {
    void terminate();
}

AbstractTerminatableThread

/**
 * 可停止的抽象线程。
 * 
 * 模式角色:Two-phaseTermination.AbstractTerminatableThread
 *
 */
public abstract class AbstractTerminatableThread extends Thread
        implements Terminatable {
    private final boolean DEBUG = true;

    // 模式角色:Two-phaseTermination.TerminationToken
    public final TerminationToken terminationToken;

    public AbstractTerminatableThread() {
        this(new TerminationToken());
    }

    /**
     * 
     * @param terminationToken
     *            线程间共享的线程终止标志实例
     */
    public AbstractTerminatableThread(TerminationToken terminationToken) {
        this.terminationToken = terminationToken;
        terminationToken.register(this);
    }

    /**
     * 留给子类实现其线程处理逻辑。
     * 
     * @throws Exception
     */
    protected abstract void doRun() throws Exception;

    /**
     * 留给子类实现。用于实现线程停止后的一些清理动作。
     * 
     * @param cause
     */
    protected void doCleanup(Exception cause) {
        // 什么也不做
    }

    /**
     * 留给子类实现。用于执行线程停止所需的操作。
     */
    protected void doTerminiate() {
        // 什么也不做
    }

    @Override
    public void run() {
        Exception ex = null;
        try {
            for (;;) {

                // 在执行线程的处理逻辑前先判断线程停止的标志。
                if (terminationToken.isToShutdown()
                        && terminationToken.reservations.get() <= 0) {
                    break;
                }
                doRun();
            }

        } catch (Exception e) {
            // 使得线程能够响应interrupt调用而退出
            ex = e;
            if (e instanceof InterruptedException) {
                System.out.println(e);
            } else {
                System.out.println(e);
            }
        } finally {
            try {
                doCleanup(ex);
            } finally {
                terminationToken.notifyThreadTermination(this);
            }
        }
    }

    @Override
    public void interrupt() {
        terminate();
    }

    /*
     * 请求停止线程。
     * 
     */
    @Override
    public void terminate() {
        terminationToken.setToShutdown(true);
        try {
            doTerminiate();
        } finally {

            // 若无待处理的任务,则试图强制终止线程
            if (terminationToken.reservations.get() <= 0) {
                super.interrupt();
            }
        }
    }

    public void terminate(boolean waitUtilThreadTerminated) {
        terminate();
        if (waitUtilThreadTerminated) {
            try {
                this.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

}

TerminationToken

import java.lang.ref.WeakReference;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程停止标志。
 *
 */
public class TerminationToken {

    // 使用volatile修饰,以保证无需显式锁的情况下该变量的内存可见性
    protected volatile boolean toShutdown = false;
    public final AtomicInteger reservations = new AtomicInteger(0);

    /*
     * 在多个可停止线程实例共享一个TerminationToken实例的情况下,该队列用于
     * 记录那些共享TerminationToken实例的可停止线程,以便尽可能减少锁的使用 的情况下,实现这些线程的停止。
     */
    private final Queue<WeakReference<Terminatable>> coordinatedThreads;

    public TerminationToken() {
        coordinatedThreads = new ConcurrentLinkedQueue<WeakReference<Terminatable>>();
    }

    public boolean isToShutdown() {
        return toShutdown;
    }

    protected void setToShutdown(boolean toShutdown) {
        this.toShutdown = true;
    }

    protected void register(Terminatable thread) {
        coordinatedThreads.add(new WeakReference<Terminatable>(thread));
    }

    /**
     * 通知TerminationToken实例:共享该实例的所有可停止线程中的一个线程停止了, 以便其停止其它未被停止的线程。
     * 
     * @param thread
     *            已停止的线程
     */
    protected void notifyThreadTermination(Terminatable thread) {
        WeakReference<Terminatable> wrThread;
        Terminatable otherThread;
        while (null != (wrThread = coordinatedThreads.poll())) {
            otherThread = wrThread.get();
            if (null != otherThread && otherThread != thread) {
                otherThread.terminate();
            }
        }
    }

}

下面是一个生产者与消费者的例子:

SomeService

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class SomeService {
    private final BlockingQueue<String> queue =
            new ArrayBlockingQueue<String>(
                    20);

    private final Producer producer = new Producer();
    private final Consumer consumer = new Consumer();

    private class Producer extends AbstractTerminatableThread {
        private int i = 0;

        @Override
        protected void doRun() throws Exception {
            queue.put(String.valueOf(i++));
            System.out.println("put"+String.valueOf(i));
            consumer.terminationToken.reservations.incrementAndGet();
        }

    };

    private class Consumer extends AbstractTerminatableThread {

        @Override
        protected void doRun() throws Exception {
            String product = queue.take();
            System.out.println("get product:" + product);

            // 模拟执行真正操作的时间消耗
            try {
                Thread.sleep(50);
            } finally {
                terminationToken.reservations.decrementAndGet();
            }

        }

    }

    public void shutdown() {
        // 生产者线程停止后再停止消费者线程
        producer.terminate(true);
        consumer.terminate();
    }

    public void init() {
        producer.start();
        consumer.start();
    }

    public static void main(String[] args) throws InterruptedException {
        SomeService ss = new SomeService();
        ss.init();
        Thread.sleep(50);
        ss.shutdown();
    }
}

这里面的ProducerCustomer都继承了AbstractTerminatableThread,它们都是架构图里面的ConcreteTerminatableThread,而SomeService里面包含了initshutdown方法是架构图里面的ThreadOwner

这样我们在外面调用SomeService的实例就可以进行开始和暂停了。


输出结果为:

put 0
put 1
get product0
put 2
put 3
put 4
put 5
put 6
put 7
put 8
put 9
put 10
put 11
put 12
put 13
put 14
put 15
put 16
put 17
put 18
put 19
put 20
put 21
java.lang.InterruptedException
get product1
get product2
get product3
get product4
get product5
get product6
get product7
get product8
get product9
get product10
get product11
get product12
get product13
get product14
get product15
get product16
get product17
get product18
get product19
get product20

上面的Termintable、TerminationToken、AbstractTerminatableThread都是可以复用的,我们主要是实现AbstractTerminatableThread的子类。

  • 6 min read

CONTRIBUTORS


  • 6 min read