0%

线程池

java的执行流模型

java应用是单进程多线程模式,执行流的实体是线程,每个线程拥有自己独立的调用栈,堆和打开文件是共享的。线程实际上是对体系结构中的多线程的支持。

java的多线程

线程是一个对象,这个对象可以执行某个函数,我暂且叫他运行函数。此外线程还有休眠,中断,加入,和线程私有存储等功能。实际上线程更像是一个worker,worker有执行task的能力,并且worker还可以和其他worker协作。当程序员需要另起一个执行流的时候,就需要使用线程.大白话就是你用一个线程来放歌,一个线程来浏览网页,一个线程来下载电影. 在大数据技术栈中的zk,es,hadoop等等都是这种多线程模式,一组线程负责心跳,一组线程负责内部rpc,一组线程提供外部访问

简单介绍下jvm的线程实现

jvm实际上是一个c++程序,在linux中被执行后就进入jvm的入口函数,这里会封装env,开辟堆内存,初始化jit等等,最后会启动一个main线程,来执行程序员编写的main函数。Main线程和普通线程的执行模型区别并不大。下面是创建普通线程的代码. 只有在调用线程start方法时,才会真正在linux中创建线程

1
2
3
4
Thread thread = new Thread(()->{

});//这里Thread实际是一个普通对象,对其中的parent,线程id做了一些赋值
thread.start();//这里才真正在操作系统中创建线程,并加入linux的就绪队列

注意 判断在jvm主进程中还是在java虚拟机中:

  1. 只要是调用了native方法,都是在jvm主进程中直接执行的.普通的java方法都是经过jvm字节码翻译/编译后执行的。比如gc和jdk库中的一些native方法.
  2. 如果要访问java虚拟机外的资源就只能在jvm主进程中
    Main线程是jvm主进程帮忙创建,并调用start方法.
    这里借用一下网上的一张时序图.start0() 这个本地方法,最终映射到了 JVM_StartThread 这个方法中,在``hotspot\src\share\vm\prims\jvm.cpp`
    线程启动
    如果你对C++比较熟悉的话,会发现java的线程实际上还是pthread_create出来的,所以线程的sleep,interput都是调用的native方法,因为需要访问linux的线程,在jvm中是做不到的。

线程中断

和cpu中断类似,对于执行流模型来说,都需要一个中断机制来中断执行流.cpu通过中断来响应外部事件,陷入操作系统等.*
线程中断可以用做终止线程的标志位;对于线程阻塞在某些可中断方法的场景,中断会抛出中断异常,让程序员来处理中断阻塞方法的后置处理
注意

  1. 线程中断标志位并不会终止线程,只是设置了一个标志位
  2. Thread.interupted() 方法会有副作用,清空当前的标志位。我觉得这个叫clearInterupted更好,返回值是当前的中断标志位。因为需要提供一个方法来清空标志位.

线程池的作用

线程池是一种资源池,可以达到复用的效果.此外,还可以增加并行度(因为线程是一个worker).

  • 常见的资源线程池有:数据库链接池等.
  • 常见的任务线程池有:tomcat的service线程池,netty的event事件处理线程池,spring的定时任务线程池等
    线程池在软件工程上来说,也实现了线程调度和业务逻辑的分离。

jdk的线程池框架

采用的还是抽象基类的模式.Executor是最顶层接口. AbstractExecutorService是抽象基类.常见的线程池实现有定时任务线程池ScheduledThreadPoolExecutor,实时线程池ThreadPoolExecutor
继承

jdk线程池的特性

如何描述一个线程池?自然的是池里面有多少个线程,因此有核心线程数(最小线程数),最大线程数,线程存活时间.线程池还需要给定一个异常处理策略,否则线程池不知道该如何处理执行发生时的异常.线程池也是一个资源池,访问资源会出现排队的情况,因此也需要指定线程池的队列大小

  1. 核心线程数,最大线程数
  2. 异常处理策略
  3. 队列大小
  4. 线程存活时间

ThreadPoolExecutor详解(基于jdk11)

先提出几个问题?
如何维护线程keepAlive?
如何执行任务?
线程池如何退出?

线程池状态维护

线程池维护了几个成员,corePoolSize,maxinumPoolSize,keepAliveTime,rejectHandler,workQueue(保存任务的阻塞队列),mainLock(主要是用来控制对线程池成员的并发访问的),ctl(是个int,但是分段使用的,高位保存的是线程池的状态,低位保存的是线程的woker数量)

线程池总共有5个状态,所以用到了int的高3位. 状态机是 RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED,越往后数字越大。
正常关闭是RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED; 立即关闭是RUNNING -> STOP -> TIDYING -> TERMINATED

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private static final int COUNT_BITS = Integer.SIZE - 3; //29
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//1向右移动29位,来到第30位,再 减1变成, 连续的29个1

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 111, 补码,最低位是3个1 ,这个状态是里面最小的,
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
private static final int STOP = 1 << COUNT_BITS; // 001
private static final int TIDYING = 2 << COUNT_BITS; // 010
private static final int TERMINATED = 3 << COUNT_BITS; // 011

...
private static boolean runStateLessThan(int c, int s) {
return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 比SHUTDOWN小的就只有RUNNING,也可以用runStateOf(ctl) == RUNNING 来比较,搞不懂为啥这样写,逻辑反而更复杂了
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

...
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}


...

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~COUNT_MASK; } //state对应的是work count那一段全部为0
private static int workerCountOf(int c) { return c & COUNT_MASK; } // work count对应的是取mask
private static int ctlOf(int rs, int wc) { return rs | wc; } //ctl = runState + workCount ,位运算的知识

执行任务

当在调用submit/execute时,主要流程如下:

  1. 先检查线程池状态,如果已经处于SHUTDOWN或者更高级别,则不往线程池里面加;如果是running,并且小于coreSize,则新创建线程来执行
  2. 如果大于coreSize,则看能否加到队列里去
  3. 如果加不到队列里去,如果没有大于max线程数,则开启一个线程去处理
    下面对照代码来解释前面的流程:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public void execute(Runnable command) {
    if (command == null)
    throw new NullPointerException();
    int c = ctl.get();
    //小于coreSize,则直接addWorker;
    if (workerCountOf(c) < corePoolSize) {
    //addWorker内部会检查线程池状态,如果处于SHUTDOWN就不再加了
    if (addWorker(command, true))
    return;
    c = ctl.get();
    }
    //如果是运行中,则尝试加入到队列中
    if (isRunning(c) && workQueue.offer(command)) {

    int recheck = ctl.get();
    // //并发访问控制在在addWorker中已经做了,这里主要是在shutdown状态下,不再加入任务,将先前加入的并移出去
    if (!isRunning(recheck) && remove(command))
    reject(command);
    //如果是running,并且没有大于maxSize则开启一个线程
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);//这里第二参数表示非核心线程的意思
    }
    //如果不是running,并且没有大于maxSize则开启一个线程;如果队列满了并且没有大于maxSize则开启一个线程;
    else if (!addWorker(command, false))
    reject(command);
    }
    接下来看addWorker方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
    //这里如果是SHUTDOWN以上的,就直接终止流程
    if (runStateAtLeast(c, SHUTDOWN)
    && (runStateAtLeast(c, STOP)
    || firstTask != null
    || workQueue.isEmpty()))
    return false;

    for (;;) {
    //校验最大线程数在这里
    if (workerCountOf(c)
    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
    return false;
    //如果修改workerCount成功了,就表示增加woker成功了,
    if (compareAndIncrementWorkerCount(c))
    break retry;
    //如果workerCount没有修改成功,继续获取状态,继续修改
    c = ctl.get(); // Re-read ctl
    if (runStateAtLeast(c, SHUTDOWN))
    continue retry;
    // else CAS failed due to workerCount change; retry inner loop
    }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
    //创建worker
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    // Recheck while holding lock.
    // Back out on ThreadFactory failure or if
    // shut down before lock acquired.
    int c = ctl.get();

    if (isRunning(c) ||
    (runStateLessThan(c, STOP) && firstTask == null)) {
    if (t.getState() != Thread.State.NEW)
    throw new IllegalThreadStateException();
    //用锁保护的woker set
    workers.add(w);
    workerAdded = true;
    int s = workers.size();
    if (s > largestPoolSize)
    largestPoolSize = s;
    }
    } finally {
    mainLock.unlock();
    }
    //让worker开始执行
    if (workerAdded) {
    t.start();
    workerStarted = true;
    }
    }
    } finally {
    if (! workerStarted)
    addWorkerFailed(w);
    }
    return workerStarted;
    }
    接下来看worker类,worker继承了AQS.它的lock逻辑上表达的是正在执行的,非idle的.主要流程是:
  4. 如果是第一个task,则取出来,
  5. 不断循环从队列中取数据,不断执行任务.如果在keepAliveTime时间内没有取到任务,线程就自动退出了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
 final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//不断取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//前置后置钩子函数
beforeExecute(wt, task);
try {
//执行
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();

// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//超过keepAliveTIme,就返回
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

线程池退出

主要流程是:

  1. 修改线程池状态
  2. 中断idle的线程
  3. 等待原先加入到线程中的任务被执行完
    下面对照代码来解释前面的流程:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49

    public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    checkShutdownAccess();
    //设置下一个状态
    advanceRunState(SHUTDOWN);
    //中断那些idle的,
    interruptIdleWorkers();
    onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
    mainLock.unlock();
    }
    tryTerminate();
    }

    final void tryTerminate() {
    for (;;) {
    int c = ctl.get();
    //设置state
    if (isRunning(c) ||
    runStateAtLeast(c, TIDYING) ||
    (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
    return;
    if (workerCountOf(c) != 0) { // Eligible to terminate
    interruptIdleWorkers(ONLY_ONE);
    return;
    }

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
    try {
    terminated();
    } finally {
    ctl.set(ctlOf(TERMINATED, 0));
    termination.signalAll();
    }
    return;
    }
    } finally {
    mainLock.unlock();
    }
    // else retry on failed CAS
    }
    }

    这里设置线程中断标志好像并不是必要的,因为只要线程池处于TERMINATED状态(在调用shutdown之后,线程池的SHUTDOWN状态是内部状态),无法再提交任务,队列中的任务总会被执行完。这也是线程池设计的目标,不会丢掉任何一个没有执行的任务。并且中断标志并不会终止线程,只是设置标志。所以我理解这里是保持一致性,在线程池被调用shutdown之后,需要保持线程的中断标志都被设置。可能在提交的任务里有根据中断来作为终止条件的.

总结

开头提出的问题:

  1. 如何保持线程的keepAlive?
    如果在keepAlive时间内,在阻塞队列中没有取到任务,线程就执行完毕并退出
  2. 如何执行任务?
    用一个while循环,不断从阻塞队列中取任务,并调用task.run()
  3. 如何退出?
    线程池会维护内部的状态,在调用shutdown之后,就设置为TERMINATED,等待阻塞队列中已有任务执行完,线程会挨个终止,自然线程池也就退出了

Welcome to my other publishing channels