并发编程-线程池

一些八股

what

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。
线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
JDK中提供的线程池是ThreadPoolExecutor类。

好处

  1. 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  4. 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

why

  • 线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

    1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
    2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
    3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。
  • 为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

Pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management.——wikipedia

  • “池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:
  1. 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
  2. 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
  3. 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。

ThreadPoolExecutor(jdk 1.8)

简介

UML类图

  • ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:

    1. 将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
  • ExecutorService接口增加了一些能力:

    1. 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
    2. 提供了管控线程池的方法,比如停止线程池的运行。AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

ThreadPoolExecutor运行流程

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。
线程池的运行主要分成两部分:1. 任务管理 2. 线程管理。

  • 任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:
    (1)直接申请线程执行该任务;
    (2)缓冲到队列中等待线程执行;
    (3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

生命周期

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。
线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。
在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它同时包含两部分的信息:
线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。
通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

····

/**
* State check needed by ScheduledThreadPoolExecutor to
* enable running tasks during shutdown.
*
* @param shutdownOK true if should return true if SHUTDOWN
*/
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}

ThreadPoolExecutor的运行状态有5种,分别为:

1
2
3
4
5
6
7
8
9
10
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

其生命周期转换如下入所示:

任务执行机制

任务调度

任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。

  • 首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。
    1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
    2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
    3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
    4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
    5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

任务缓冲

任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。
线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。
阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

BlockingQueue

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。
这两个附加的操作是:

  1. 在队列为空时,获取元素的线程会等待队列变为非空。
  2. 当队列满时,存储元素的线程会等待队列可用。
    阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。
    阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

任务申请
  • 任务的执行有两种可能:
    1. 任务直接由新创建的线程执行。
    2. 线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。

第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。
线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。
这部分策略由getTask方法实现,其执行流程如下图所示:

getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。
如果线程池现在不应该持有那么多线程,则会返回null值。
工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。

任务拒绝

任务拒绝模块是线程池的保护部分。
线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,
就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。

拒绝策略是一个接口,其设计如下

1
2
3
4
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略,其特点如下:

线程管理

Worker线程

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。

1
2
3
4
5
6
7
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null

......
}

Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。
thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;
firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。
如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;
如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。
线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。
这个时候重要的就是如何判断线程是否在运行。

​Worker是通过继承AQS,使用AQS来实现独占锁这个功能。
没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  2. 如果正在执行任务,则不应该中断线程。
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

在线程回收过程中就使用到了这种特性,回收过程如下图所示:

Worker线程增加

增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。
addWorker方法有两个参数:firstTask、core。
firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;
core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。

其执行流程如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

t.start() 启动时会调用Worker类中的run方法。

Worker线程回收

线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。
Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。
当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
   /** Delegates main run loop to outer runWorker  */
public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}


private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}


大致概括为:

1
2
3
4
5
6
7
8
9
10

try {
while (task != null || (task = getTask()) != null) {
//执行任务
}
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}


事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。
但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。

Worker线程执行任务
  • 在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
    1. while循环不断地通过getTask()方法获取任务。
    2. getTask()方法从阻塞队列中取任务。
    3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
    4. 执行任务。
    5. 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

参数调优

业务场景

快速响应

用户发起的实时请求,服务追求响应时间。比如说用户要查看一个复杂对象的信息,那么需要将多种资源聚合在一起返回(限时返回)。
使用FutureTask带超时控制功能的get方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ExecutorService executor = Executors.newSingleThreadExecutor();

FutureTask<Integer> future = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
result += i;
}
return result;
}
});
//执行
executor.execute(future);
//超时设为1纳秒
System.out.println(future.get(1L,NANOSECONDS));
//抛出异常
//Exception in thread "main" java.util.concurrent.TimeoutException
//at java.util.concurrent.FutureTask.get(FutureTask.java:205)
//...

大量计算任务

离线的大量计算任务,需要快速执行。比如说,统计某个报表,需要计算所有用户中以某些公式结果为指标的排行。尽量使用较小的线程池。

大量I/O任务

大量的读写请求、或网络请求。尽量使用相对多的线程,让cpu在等待IO的时候又其他线程去处理别的任务。

参数解析

  • corePoolSize:核心线程数量。
  • maximumPoolSize:最大线程数量;
  • workQueue:保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池以后, 线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有以下几种处理方式:
    • 直接切换:这种方式常用的队列是SynchronousQueue。
    • 使用无界队列:一般使用基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,而maximumPoolSize就不会起作用了(后面也会说到)。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
    • 使用有界队列:一般使用ArrayBlockingQueue。(支持公平策论,能保证线程FIFO顺序,但会降低吞吐性)。
      • 如果要想降低系统资源的消耗(包括CPU的使用率,操作系统资源的消耗,上下文环境切换的开销等), 可以设置较大的队列容量和较小的线程池容量, 但这样也会降低线程处理任务的吞吐量。
      • 如果队列的容量设置的较小,通常需要将线程池的容量设置大一点,这样CPU的使用率会相对的高一些。但如果线程池的容量设置的过大,则在提交的任务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。
  • keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
  • threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
  • handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
    • AbortPolicy:直接抛出异常,这是默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务;

关于核心线程数的公式

在不涉及I/O操作或共享数据访问的计算问题中,线程数量为N(cpu)或N(cpu)+1时2将获得最佳吞吐量。

线程池的监控

通过线程池提供的参数进行监控。
线程池里有一些属性在监控线程池的时候可以使用:

  • getTaskCount:线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
  • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
  • getPoolSize:线程池当前的线程数量;
  • getActiveCount:当前线程池中正在执行任务的线程数量。

通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。

钩子

ThreadPoolExecutor提供了protected类型可以被覆盖的钩子方法,允许用户在任务执行之前会执行之后做一些事情。我们可以通过它来实现比如初始化ThreadLocal、收集统计信息、如记录日志等操作。这类Hook如beforeExecute和afterExecute。另外还有一个Hook可以用来在任务被执行完的时候让用户插入逻辑,如rerminated。
如果hook方法执行失败,则内部的工作线程的执行将会失败或被中断。

获取任务返回值

向线程池提交任务时,除了execute方法,还有一个submit方法,submit方法会返回一个Future对象用于获取返回值

Spring中的一些封装

ThreadPoolTaskExecutor(推荐,基本没坑)

  • 本质上是包装了ThreadPoolExecutor
  • 缓存任务队列
    • 大于0时采用 LinkedBlockingQueue 队列的默认长度为Integer.MAX_VALUE,有容积风险
    • 小于0时采用 SynchronousQueue 中的 TransferStack 栈模式(先进后出)

SimpleAsyncTaskExecutor

1
2
3
4
5
6
//类的注释
NOTE: This implementation does not reuse threads!
</b>
Consider a
thread-pooling TaskExecutor implementation instead, in particular for
executing a large number of short-lived tasks.

这个类并不会重用线程,每次调用都会创建一个新的线程,设计初衷是为了执行大量的短时间的任务。

SyncTaskExecutor

同步的执行任务,任务的执行是在主线程中,不会启动新的线程来执行提交的任务。主要使用在没有必要使用多线程的情况,如较为简单的测试用例。

ConcurrentTaskExecutor

适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。无参构造器或默认配置为单线程。

1
2
3
4
5
6
7
new ConcurrentTaskExecutor(Executors.newFixedThreadPool(3));

ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
executor.setPoolSize(20);
executor.setThreadNamePrefix("taskExecutor-");
executor.setWaitForTasksToCompleteOnShutdown(true);
new ConcurrentTaskExecutor(executor);

SimpleThreadPoolTaskExecutor

是Quartz的ThreadPoolTaskExecutor类。线程池同时被quartz和非quartz使用,才需要使用此类。

stream中的线程池

parallelStream
基于ForkJoinPool,可以通过虚拟机启动参数来设置worker的数量:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N

1
2
3
4
5
6

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream() .forEach(num->System.out.println(Thread.currentThread().getName()+">>"+num));
//对顺序不敏感的操作。没有线程安全问题、较单纯的、简单且处理较快的数据处理任务。
Optional<Integer> findAny = numbers.parallelStream().filter(x -> x > 6).findAny();

目前线上使用的线程池

Threads类

使用了ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//线上核心数
private static final int CORE_POOL_SIZE =
Runtime.getRuntime().availableProcessors();
//线程创建共产类 指定了线程名称且不设优先级(默认普通-5)
private static final ThreadFactory NAMED_THREAD_FACTORY =
new ThreadFactoryBuilder().setNameFormat("common-pool-%d").build();
//线程池 LinkedBlockingQueue 队列满前都使用CORE_POOL_SIZE线程数
public static final ThreadPoolExecutor SINGLE_THREAD_POOL =
new ThreadPoolExecutor(CORE_POOL_SIZE,
CORE_POOL_SIZE+1,
10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
NAMED_THREAD_FACTORY);


在不涉及I/O操作或共享数据访问的计算问题中,线程数量为N(cpu)或N(cpu)+1时将获得最佳吞吐量。
-并发编程实战

用例:

1
2
3
4
5
6
7
8
//将任务直接赋给线程池同步的开始执行
Threads.SINGLE_THREAD_POOL.execute(() -> {
Set<String> corpIds = corpMap.keySet();
for (String cId : corpIds) {
externalTagService.upsertCorpExtTags(cId);
}
});

针对一些只需要执行任务的情况,接口可以将任务直接扔给线程池,当前线程就可以结束返回。
适用一些不常发生和资源占用不大的操作。
如果是经常发生的、频繁的操作,走kafka或者异步线程池。

marketingExecutor(MarketingThreadPoolConfig)

使用了ThreadPoolTaskExecutor,本质上还是ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
//核心数可以多设一些
taskExecutor.setCorePoolSize(9);
taskExecutor.setMaxPoolSize(12);
taskExecutor.setQueueCapacity(1000);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("marketingExecutor");
//等待队列任务完成才能关闭
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//关闭延时
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;

用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//每次请求的创意id最多3000个
int perSize = 3000;
int size = idList.size();
int page = (size + perSize - 1) / perSize;
CountDownLatch latch = new CountDownLatch(page);
for (int i = 0; i < page; i++) {
List<Long> ids = idList.
subList(i * perSize, Math.min(size, i * perSize + perSize));
String jsonParam =
setRequestParam(token, ids, BaiduBusinessIdTypeEnum.CREATIVEID.getValue());
taskExecutor.execute(() -> {
try {
String responseJson = baiduBusinessRestTemplateUtil.post(MarketingConstants.BaiduBusinessUrl.GET_BAIDU_CREATIVE_URL
, baiduBusinessRestTemplateUtil.setRequestEntity(jsonParam), new ParameterizedTypeReference<String>() {
});
responseCreatives.addAll(JSON.parseObject(responseJson, BaiduBusinessCreative.class).getBody().getData().stream().
filter(creative -> creative.getStatus().equals(BaiduBusinessStatusEnum.EFFECTIVE.getValue())).collect(Collectors.toList()));
} catch (Exception e) {
log.error("百度创意批量更新异常,请求参数:{},error:{}",jsonParam,e.getMessage());
} finally {
latch.countDown();
}
});
}
latch.await();

适合I/O密集型的场景。