Java ThreadPool

Java线程池实现原理及其在美团业务中的实践


1. 线程状态转换

Java线程状态转换


2. ThreadPoolExecutor

2.1 线程池参数

1
2
3
4
5
6
7
8
9
10
 public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExectionHandler handler)
{
...
}
  • corePoolSize :线程池基本大小,任务数目超过corePoolSize之后, 首先进入workQueue排队等待。
  • workQueue : 只有当workQueue排队满了之后, 才会再继续创建新的线程(启用maximumPoolSize)。
  • maximumPoolSize: 最大线程数, 如果已经达到了maximumPoolSize,依然有任务进来,则启用饱和策略。
  • keepAliveTime: 保活时间, 如果某个线程空闲的时间超过了keepAliveTime, 那么它将被标记为可回收, 并且如果此时线程池的大小超过了coolPoolSize, 这个线程将会被终止。
  • unit: TimeUnit.NANOSECONDS, TimeUnit.MICROSECONDS, TimeUnit.MILLISECONDS, TimeUnit.SECONDS…
  • BlockingQueue: the queue to use for holding tasks before they are executed
  • ThreadFactory threadFactory : the factory to use when the executor creates a new thread
  • RejectedExecutionHandler: the handler to use when execution is blocked because the thread bounds and queue capacities are reached

2.2 workQueue和rejectedHandler(饱和策略)

2.2.1 workQueue有三种:
  • 无界队列(LinkedBlockingQueue) : newFixed和newSingle默认采用无界队列
  • 有界队列(ArrayBlockingQueue, 有界的LinkedBlockingQueue, PriorityBlockingQueue) : ArrayBlockingQueue和LinkedBlockingQueue依据的是FIFO原则, 如果要控制任务的优先级, 则可以使用PriorityBlockingQueue。
  • 同步移交(SynchronousQueue) : SynchronousQueue不是一种真正的队列, 而是一种在线程中移交的机制。当线程池无限大或者可以拒绝任务时候, SynchronousQueue才有价值。newCached就是使用了SynchronousQueue。
2.2.2 饱和策略
  • AbortPolicy : 默认饱和策略, 抛出unchecked RejectedExecutionException异常。
  • DiscardPolicy&DiscardOldestPolicy : 当新提交的任务无法保存到workQueue中时, Discard策略会抛弃该任务。DiscardOldestPolicy会抛弃队列最前面的任务,并且尝试重新提交新任务。
  • CallerRunsPolicy : 既不会跑异常,也不会抛弃任务,而是由调用者(例如main线程)来执行任务。
  • UserDefinedPolicy(RejectedExecutionHandler) :
2.2.3 线程工厂

ThreadFactory默认创建一个新的、非守护的线程,并且不含特殊配置信息。通过制定一个线程工厂方法,也可以自己定制线程池的配置信息。

2.2.4 扩展ThreadPoolExecutor

可以改写以下几个方法:

1
2
3
4
5
beforeExecute();

afterExecute();

terminated();

自定义实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.quantums.chap8;


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* 定制线程池
*/
public class TimingThreadPool extends ThreadPoolExecutor {
public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

private final ThreadLocal<Long> start = new ThreadLocal<Long>();
private final Logger logger = Logger.getLogger("TimingThreadPool");

private final AtomicLong numTask = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);

logger.log(Level.FINE, String.format("Thread %s : start %s", r, t));
start.set(System.currentTimeMillis());
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long timeConsumed = System.currentTimeMillis() - start.get();
logger.log(Level.FINE, String.format("Thread %s : start %s, totalTime=%d", r, t, timeConsumed));
numTask.getAndIncrement();
totalTime.addAndGet(timeConsumed);
} finally {
super.afterExecute(r, t);
}
}

@Override
protected void terminated() {
try {
logger.log(Level.FINE, String.format("Terminated: numTask=%d, totalTime=%d, avgTime=%d",
numTask.get(), totalTime.get(), totalTime.get() / numTask.get()));

} finally {
super.terminated();
}
}
}

2.3 测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class TestThreadPool{
public static void main(String[] ar){
ThreadPoolExecutor pool1 = new ThreadPoolExecutor(2, 16, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());

ThreadPoolExecutor pool2 = new ThreadPoolExecutor(2, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5), new MyRejectedHandler());

ThreadPoolExecutor pool3 = new ThreadPoolExecutor(2, 16, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());

for (int i = 0; i < 30; i++)
pool2.execute(new Runner());

System.out.println("ActiveCount: " + pool2.getActiveCount());
System.out.println("MaximumPoolSize: " + pool2.getMaximumPoolSize());
System.out.println("Core: " + pool2.getCorePoolSize());
System.out.println("CompletedTaskCount: " + pool2.getCompletedTaskCount());
}
}

class Runner implements Runnable{

private static AtomicInteger count = new AtomicInteger(0);

@Override
public void run(){
try{
System.out.println("working thread..." + (count.incrementAndGet()));
Thread.sleep(1000000);
}
catch (InterruptedException e){
e.printStackTrace();
}
}
}

class MyRejectedHandler implements RejectedExecutionHandler{

private static AtomicInteger count = new AtomicInteger(0);

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){
System.out.println("Rejected Thread NO." + count.incrementAndGet());
}
}