深入学习Java线程池
标签:Java并发

深入学习Java线程池

先来看一下大体的架构:

先从最开始的开始吧,我们平时提交的任务都是 Runnable 类型的,可以看到 Executor 里面的 execute 方法就是接受一个Runnable类的参数

故我们平时可以直接实现Executor接口:

下面这种是线程池同步的执行每一个任务:

public class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
//        同步执行,每个任务
        command.run();
    }
}

也可以才用传进来一个任务的时候单独启动一个线程来运行:

public class NewTaskExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
//        每个任务都创建一个一个线程来执行
        new Thread(command).start();
        System.out.println(Thread.currentThread().getName());
    }
}

1. ExecutorService

再来看ExecutorService:

public interface ExecutorService extends Executor {

    /**
     * 关闭线程池,已提交的任务继续执行,不接受新的任务提交
     */
    void shutdown();

    /**
     * 关闭线程池,尝试关闭正在执行的任务,并且不再接受新的任务提交
     * 比前面多了一个now,区别在于它可以去停止当前正在执行的任务
     */
    List<Runnable> shutdownNow();

    /**
     * 线程池是否已经关闭了
     */
    boolean isShutdown();

    /**
     * 如果调用了shutdown和shutdownnow方法后,所有任务都结束了,那么返回true
     * 该方法必须在调用了shutdown和shutdownNow方法之后调用才会返回true
     */
    boolean isTerminated();

    /**
     * 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
     * 等待所有任务完成,并设置了超时时间
     * 实际中是:先调用shutdown和shutdownNow,然后再调用该方法等待所有线程真正完成,最后的返回值表示是否超时
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个Callable任务
     * 返回一个表示任务的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。
     */
    <T> Future<T> submit(Callable<T> task);

   /**
    * 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
    * 该 Future 的 get 方法在成功完成时将会返回给定的结果。
    * 因为Runable没有返回值,所有我们制定第二个参数来作为返回值
    */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
     * 该 Future 的 get 方法在成功 完成时将会返回 null。
     */
    Future<?> submit(Runnable task);

    /**
     * 执行所有任务,返回Future类型的一个list
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
     * 给invokeAll设置了一个超时时间
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 只要其中的任意一个任务结束了就可以返回,返回是那个任务的结果
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

   /**
    * 跟上面方法一样,带超时时间,超过超时时间,则抛出TimeoutException异常
    */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

可见ExecutorService在实现了Executor接口之后,增加了许多方法用于submit提交任务,关闭线程池等,还可以提交一个任务集合。


1.1 Runnable、Callable、Future、FutureTask

再来说一下submit的中的参数,一个是Runnable,一个是Callable,我们都知道Runnable不能取得方法执行完的结果,所以才有了Callable,Runnable 中提供了一个run方法让我们用来执行

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

注意上面有个 @FunctionalInterface表明这个再Java8之后可以用lambda表达式代替的。然后再来看一个常用的接口Future:

可见Future 提供了get方法可以取得方法执行完的结果

接下来是RunnableFuture:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

这样通过一个RunnableFuture将Runnable的run方法和Future的get方法整合在了一起,我们来看RunnableFuture的子类FutureTask:

可见我们传进去的Runnable和Callable最后都会变成FutureTask,我们往线程池中提交任务都是提交的FutureTask

最后我们来看一下他们之间的整体关系:

2. AbstractExecutorService

再来看ExecutorService下的AbstractExecutorService方法

可见我们传给 ExecutorService的Runnable,因为没有返回值,最后在调用其下面的抽象类的时候回调用newTaskFor方法将Runnable包装成Callable

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

再看FutureTask:

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

3. Executors

ThreadPoolExecutor是表示一个线程池,而Executors则是线程工厂的角色,通过Executors,我们可以得到一个拥有特定功能的线程池。

下面来看一下Executors提供了那些线程池创建方法:

3.1 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

newFixedThreadPool():该方法将返回一个固定线程数量的线程池,该线程池的数量将始终不变。当一个线的任务提交时,线程池中若有空闲的线程,则立即执行,否则将会将任务添加到任务队列中,等到线程空闲时,再处理任务队列中的任务,但是由于它采用的阻塞队列是 LinkedBlockingQueue,是一个最大值很大(Integer.MAX_VALUE)的队列,也可以认为是无界队列(个人看法),当线程池中的任务处理不及时的时候,而一边又疯狂的提交任务,将会导致OOM发生。

3.2 newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

newSingleThreadExecutor:可见这个方法,只会创建一个线程的线程池。多余的任务还是会被添加到 LinkedBlockingQueue中,也会有OOM情况的发生。

3.3 newSingleThreadScheduledExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

newSingleThreadScheduledExecutor: 该方法将返回一个ScheduledExecutorService对象,而且线程池的大小为1。ScheduledExecutorService接口在 ExecutorService接口之上扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行或者周期性执行某个任务,后面再细说一下。

3.4 newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

newScheduleThreadPool:该方法也返回一个ScheduledExecutorService对象,不过可以指定线程的数量。

3.4 newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newCachedThreadPool:该方法将返回一个可根据实际情况调整的线程数量的线程池,线程池的数量不固定,我们可以看见上面的方法中设置的是corePoolSize为0,maximumPoolSize为整数最大值,保活时间为60秒,阻塞队列为SynchronousQueue,故线程池中有空闲线程可以复用的话,则会优先复用空闲线程,如果所有的线程都在工作的话,新的任务提交,直接会创建新的线程处理任务,所有线程处理完任务后,将会返回线程池进行复用。如果同时又大量任务提交,那么将会开启等量的线程,这样也会导致OOM。

4. 计划任务

我们在上面已经知道了ScheduleExecutorService可以根据时间需要对线程进行调度,则:

一个例子:

public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses= Executors.newScheduledThreadPool(10);
        ses.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(System.currentTimeMillis()/1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}

线程睡眠1秒,间隔两秒执行

1535267595
1535267597
1535267599
1535267601
1535267603
1535267605
1535267607

但是如果我们睡眠是5秒的话,那么

1535267752
1535267757
1535267762
1535267767

可见如果间隔时间period比线程执行时间短的话,还是得等到线程执行完才开始下一个。

此外,调度程序并不会保证任务会无限期执行的,如果任务本身抛出了异常,那么后面的执行任务都将会中断,故需要做好及时处理异常。

5. WorkQueue

5.1 ArrayBlockingQueue

一个由数组组成的有界阻塞队列

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
    	...
    }

构造ArrayBlockingQueue必须要指定容量,也可以指定是否是公平的,公平的情况下会按照FIFO的顺序等待执行任务

5.2 LinkedBlockingQueue

一个由链表组成的有界阻塞队列

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}


public LinkedBlockingQueue(Collection<? extends E> c) {
	...
}

LinkedBlockingQueue:默认的容量为Integer最大值,故其实也是一个有界队列,按照FIFO顺序等待执行

5.3 PriorityBlockingQueue

一个理由堆实现的含有优先级的阻塞队列

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

public PriorityBlockingQueue(Collection<? extends E> c) {
   ...
}

PriorityBlockingQueue:的默认初始元素个数为11个,默认情况下元素采取自然顺序升序排列,但不能保证相同优先级之间的顺序

5.4 DelayQueue

private final PriorityQueue<E> q = new PriorityQueue<E>();
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

DelayQueue:一个支持延时的无界阻塞队列,使用PriorityBlockingQueue实现的,可以用于缓存(设置有效期),定时任务调度

5.5 SynchronousQueue

public SynchronousQueue() {
    this(false);
}
public SynchronousQueue(boolean fair) {
	transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

SynchronousQueue:一个不存储元素的阻塞队列,每一个put操作都需要等待一个take操作,相当于一个传递的效果

5.6 LinkedTransferQueue

一个由链表组成的无界阻塞队列

public LinkedTransferQueue() {
}
    public LinkedTransferQueue(Collection<? extends E> c) {
        this();
        addAll(c);
    }

LinkedTransferQueue:里面包含了tryTransfer和transfer两个方法

5.7 LinkedBlockingDeque

public LinkedBlockingDeque() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingDeque(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}


public LinkedBlockingDeque(Collection<? extends E> c) {
    ...
}

LinkedBlockingDeque:一个由链表组成的双向阻塞队列

6. ThreadPoolExecutor

上面的Executors创建出来的各种线程池,其核心都是利用的ThreadPoolExecutor,下面来介绍一下它:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

这是构造ThreadPoolExecutor的最全的构造方法:

下面来说一下线程的执行逻辑:

7. 拒绝策略

JVM一共提供了四种拒绝策略

7.1 AbortPolicy

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

该策略会直接抛出异常,阻止系统正常工作

7.2 CallerRunsPolicy

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

只要线程未关闭,该策略直接调用者线程中执行将被丢弃的任务,这样的话不会真正抛弃任务,但会影响提交线程的性能。

7.3 DiscardOldestPolicy

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

丢弃最开始的(即将被执行的)任务,并尝试再次提交任务。

7.4 DiscardPolicy

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

该策略将直接丢弃无法处理的任务,不予任何处理

7.5 自定义拒绝策略

在前面提供的四种策略中,如果没有能满足你需求的,可以实现拒绝策略接口,重写方法

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

下面是一个例子:

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable{

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask myTask=new MyTask();
        ExecutorService executorService= new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+" is discard");
            }
        });

        for (int i = 0; i < 20; i++) {
            executorService.submit(myTask);
            Thread.sleep(10);
        }
        executorService.shutdown();
    }
}

我们设置的corePoolSize和maximumPoolSize都为5,等待队列长度为10,使用的是Executors的默认线程工厂,自定义的拒绝策略就是输出一下,剩下的和直接丢弃一样的,下面是运行结果:

pool-1-thread-1 1535338398869
pool-1-thread-2 1535338398879
pool-1-thread-3 1535338398889
pool-1-thread-4 1535338398899
pool-1-thread-5 1535338398909
java.util.concurrent.FutureTask@45ee12a7 is discard
java.util.concurrent.FutureTask@330bedb4 is discard
java.util.concurrent.FutureTask@2503dbd3 is discard
java.util.concurrent.FutureTask@4b67cf4d is discard
java.util.concurrent.FutureTask@7ea987ac is discard
pool-1-thread-1 1535338399869
pool-1-thread-2 1535338399879
pool-1-thread-3 1535338399890
pool-1-thread-4 1535338399899
pool-1-thread-5 1535338399909
pool-1-thread-1 1535338400869
pool-1-thread-2 1535338400879
pool-1-thread-3 1535338400890
pool-1-thread-4 1535338400900
pool-1-thread-5 1535338400910

可见,最开始的5个,由corePoolSize执行,然后后面的10个进入等待队列,再剩下的5个就被抛弃了,然后后面等待队列中的10个开始执行。

8. 自定义线程创建

我们可以使用ThreadFactory进行线程的自定义创建

public class ThreadFactoryDemo {

    public static class MyTask implements Runnable{

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        MyTask myTask=new MyTask();
        ExecutorService executorService=new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t=new Thread(r);
                t.setDaemon(true);
                System.out.println("create"+t.getName());
                return t;
            }
        });

        for (int i = 0; i < 5; i++) {
            executorService.submit(myTask);
        }
        Thread.sleep(1000);
        executorService.shutdown();
    }
}

我们将所有的线程都设置为守护线程,主线程结束,则他们都得结束,则结果为:

createThread-0
createThread-1
createThread-2
createThread-3
createThread-4
Thread-0 1535338945922
Thread-1 1535338945922
Thread-2 1535338945922
Thread-3 1535338945922
Thread-4 1535338945922

可见,我们将创建线程的过程输出了,还有线程进入Sleep之前的话输出了,但是sleep之后的由于主线程结束而没有输出。

9. 线程池的扩展

当我们想要监控线程池的时候,比如获取每个任务的开始执行和结束执行时间,ThreadPoolExecutor提供beforeExecuteafterExecuteterminated三个接口对线程池进行控制。

在ThreadPoolExecutor中的Worker中的runWorker(JDK1.8)方法中

try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x; throw x;
    } catch (Error x) {
        thrown = x; throw x;
    } catch (Throwable x) {
        thrown = x; throw new Error(x);
    } finally {
        afterExecute(task, thrown);
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}

可见它会在之前调用beforeExecute和在finally语句块中调用afterExecute方法,而默认的是空的方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

我们可以对他们进行扩展用来对线程进行进行监控和调试

下面是一个例子:

public class ExtThreadPool {
    public static class MyTask implements Runnable{
        private String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在执行 ID: "+Thread.currentThread().getId()+" NAME: "+name);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService=new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()){
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:"+((MyTask) r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成:"+((MyTask) r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("线程退出");
            }
        };

        for (int i = 0; i < 5; i++) {
            MyTask myTask=new MyTask("Task-"+i);
            executorService.execute(myTask);
            Thread.sleep(100);
        }
        executorService.shutdown();
    }
}

我们在执行前,执行中,执行后,和退出都加了输出:

准备执行:Task-0
正在执行 ID: 12 NAME: Task-0
准备执行:Task-1
正在执行 ID: 13 NAME: Task-1
准备执行:Task-2
正在执行 ID: 14 NAME: Task-2
准备执行:Task-3
正在执行 ID: 15 NAME: Task-3
准备执行:Task-4
正在执行 ID: 16 NAME: Task-4
执行完成:Task-0
执行完成:Task-1
执行完成:Task-2
执行完成:Task-3
执行完成:Task-4
线程退出

可见,最后的terminated()方法要在线程池关闭之后才会退出

10. 线程池线程数量

一个经验公式:

Ncpu:CPU数量

Ucpu:目标CPU的使用率,0<=Ucpu<=1

W/C:等待时间与计算时间的比率

则最优的池的大小为:

Nthreads=Ncpu*Ucpu*(1+W/C)
  • 18 min read

CONTRIBUTORS


  • 18 min read