java的执行流模型
java应用是单进程多线程模式,执行流的实体是线程,每个线程拥有自己独立的调用栈,堆和打开文件是共享的。线程实际上是对体系结构中的多线程的支持。
java的多线程
线程是一个对象,这个对象可以执行某个函数,我暂且叫他运行函数。此外线程还有休眠,中断,加入,和线程私有存储等功能。实际上线程更像是一个worker,worker有执行task的能力,并且worker还可以和其他worker协作。当程序员需要另起一个执行流的时候,就需要使用线程.大白话就是你用一个线程来放歌,一个线程来浏览网页,一个线程来下载电影. 在大数据技术栈中的zk,es,hadoop等等都是这种多线程模式,一组线程负责心跳,一组线程负责内部rpc,一组线程提供外部访问
简单介绍下jvm的线程实现
jvm实际上是一个c++程序,在linux中被执行后就进入jvm的入口函数,这里会封装env,开辟堆内存,初始化jit等等,最后会启动一个main线程,来执行程序员编写的main函数。Main线程和普通线程的执行模型区别并不大。下面是创建普通线程的代码. 只有在调用线程start方法时,才会真正在linux中创建线程
1 | Thread thread = new Thread(()->{ |
注意 判断在jvm主进程中还是在java虚拟机中:
- 只要是调用了native方法,都是在jvm主进程中直接执行的.普通的java方法都是经过jvm字节码翻译/编译后执行的。比如gc和jdk库中的一些native方法.
- 如果要访问java虚拟机外的资源就只能在jvm主进程中
Main线程是jvm主进程帮忙创建,并调用start方法.
这里借用一下网上的一张时序图.start0() 这个本地方法,最终映射到了 JVM_StartThread 这个方法中,在``hotspot\src\share\vm\prims\jvm.cpp`
如果你对C++比较熟悉的话,会发现java的线程实际上还是pthread_create出来的,所以线程的sleep,interput都是调用的native方法,因为需要访问linux的线程,在jvm中是做不到的。
线程中断
和cpu中断类似,对于执行流模型来说,都需要一个中断机制来中断执行流.cpu通过中断来响应外部事件,陷入操作系统等.*
线程中断可以用做终止线程的标志位;对于线程阻塞在某些可中断方法的场景,中断会抛出中断异常,让程序员来处理中断阻塞方法的后置处理
注意
- 线程中断标志位并不会终止线程,只是设置了一个标志位
- Thread.interupted() 方法会有副作用,清空当前的标志位。我觉得这个叫clearInterupted更好,返回值是当前的中断标志位。因为需要提供一个方法来清空标志位.
线程池的作用
线程池是一种资源池,可以达到复用的效果.此外,还可以增加并行度(因为线程是一个worker).
- 常见的资源线程池有:数据库链接池等.
- 常见的任务线程池有:tomcat的service线程池,netty的event事件处理线程池,spring的定时任务线程池等
线程池在软件工程上来说,也实现了线程调度和业务逻辑的分离。
jdk的线程池框架
采用的还是抽象基类的模式.Executor是最顶层接口. AbstractExecutorService是抽象基类.常见的线程池实现有定时任务线程池ScheduledThreadPoolExecutor,实时线程池ThreadPoolExecutor
jdk线程池的特性
如何描述一个线程池?自然的是池里面有多少个线程,因此有核心线程数(最小线程数),最大线程数,线程存活时间.线程池还需要给定一个异常处理策略,否则线程池不知道该如何处理执行发生时的异常.线程池也是一个资源池,访问资源会出现排队的情况,因此也需要指定线程池的队列大小
- 核心线程数,最大线程数
- 异常处理策略
- 队列大小
- 线程存活时间
ThreadPoolExecutor详解(基于jdk11)
先提出几个问题?
如何维护线程keepAlive?
如何执行任务?
线程池如何退出?
线程池状态维护
线程池维护了几个成员,corePoolSize,maxinumPoolSize,keepAliveTime,rejectHandler,workQueue(保存任务的阻塞队列),mainLock(主要是用来控制对线程池成员的并发访问的),ctl(是个int,但是分段使用的,高位保存的是线程池的状态,低位保存的是线程的woker数量)
线程池总共有5个状态,所以用到了int的高3位. 状态机是 RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED,越往后数字越大。
正常关闭是RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED; 立即关闭是RUNNING -> STOP -> TIDYING -> TERMINATED
1 | private static final int COUNT_BITS = Integer.SIZE - 3; //29 |
执行任务
当在调用submit/execute时,主要流程如下:
- 先检查线程池状态,如果已经处于SHUTDOWN或者更高级别,则不往线程池里面加;如果是running,并且小于coreSize,则新创建线程来执行
- 如果大于coreSize,则看能否加到队列里去
- 如果加不到队列里去,如果没有大于max线程数,则开启一个线程去处理
下面对照代码来解释前面的流程:接下来看addWorker方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//小于coreSize,则直接addWorker;
if (workerCountOf(c) < corePoolSize) {
//addWorker内部会检查线程池状态,如果处于SHUTDOWN就不再加了
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果是运行中,则尝试加入到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// //并发访问控制在在addWorker中已经做了,这里主要是在shutdown状态下,不再加入任务,将先前加入的并移出去
if (!isRunning(recheck) && remove(command))
reject(command);
//如果是running,并且没有大于maxSize则开启一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);//这里第二参数表示非核心线程的意思
}
//如果不是running,并且没有大于maxSize则开启一个线程;如果队列满了并且没有大于maxSize则开启一个线程;
else if (!addWorker(command, false))
reject(command);
}接下来看worker类,worker继承了AQS.它的lock逻辑上表达的是正在执行的,非idle的.主要流程是:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
//这里如果是SHUTDOWN以上的,就直接终止流程
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
//校验最大线程数在这里
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
//如果修改workerCount成功了,就表示增加woker成功了,
if (compareAndIncrementWorkerCount(c))
break retry;
//如果workerCount没有修改成功,继续获取状态,继续修改
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
//用锁保护的woker set
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
//让worker开始执行
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
} - 如果是第一个task,则取出来,
- 不断循环从队列中取数据,不断执行任务.如果在keepAliveTime时间内没有取到任务,线程就自动退出了
1 | final void runWorker(Worker w) { |
线程池退出
主要流程是:
- 修改线程池状态
- 中断idle的线程
- 等待原先加入到线程中的任务被执行完
下面对照代码来解释前面的流程:这里设置线程中断标志好像并不是必要的,因为只要线程池处于TERMINATED状态(在调用shutdown之后,线程池的SHUTDOWN状态是内部状态),无法再提交任务,队列中的任务总会被执行完。这也是线程池设计的目标,不会丢掉任何一个没有执行的任务。并且中断标志并不会终止线程,只是设置标志。所以我理解这里是保持一致性,在线程池被调用shutdown之后,需要保持线程的中断标志都被设置。可能在提交的任务里有根据中断来作为终止条件的.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//设置下一个状态
advanceRunState(SHUTDOWN);
//中断那些idle的,
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//设置state
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
总结
开头提出的问题:
- 如何保持线程的keepAlive?
如果在keepAlive时间内,在阻塞队列中没有取到任务,线程就执行完毕并退出 - 如何执行任务?
用一个while循环,不断从阻塞队列中取任务,并调用task.run() - 如何退出?
线程池会维护内部的状态,在调用shutdown之后,就设置为TERMINATED,等待阻塞队列中已有任务执行完,线程会挨个终止,自然线程池也就退出了