学到就是赚到,面试加分项之 WebServer 线程池管理!

2022-08-16 10:18:55 +08:00
 yanhomlin

大家好,这篇文章我们来讨论一个话题,怎么去管理 SpringBoot 内置的三大 WebServer ( Tomcat 、Jetty 、Undertow )的线程池,包括监控告警、动态调参。

不管是应对越来越卷的面试考察,还是自己项目日常性能调优,绝对有用,学到就是赚到,搬好椅子,开始我们的分析之旅。


写在前面

要想去管理第三方组件的线程池,首先肯定要对这些组件有一定的熟悉度,了解整个请求的一个处理过程,找到对应处理请求的线程池,这些线程池不一定是 JUC 包下的 ThreadPoolExecutor 类,也可能是组件自己实现的线程池,但是基本原理都差不多。

Tomcat 、Jetty 、Undertow 这三个 WebServer 都是这样,他们并没有直接使用 JUC 提供的线程池实现,而是自己实现了一套,或者扩展了 JUC 的实现。翻源码找到相应的目标线程池后,然后看有没有暴露 public 方法供我们调用获取,如果没有就需要考虑通过反射来获取了。

下面我们来简单分析下这三大 WebServer 的线程池内部实现。


Tomcat 内部线程池的实现

1.继承 JUC 原生 ThreadPoolExecutor ( 9.0.50 版本及以下),并覆写了一些方法,主要 execute() 和 afterExecute()

2.继承 JUC 的 AbstractExecutorService ( 9.0.51 版本及以上),代码基本是拷贝 JUC 的 ThreadPoolExecutor ,也相应的微调了 execute() 方法执行流程

注意 Tomcat 实现的线程池类名称也叫 ThreadPoolExecutor ,名字跟 JUC 下的是一样的,Tomcat 的 ThreadPoolExecutor 类 execute() 方法如下:

public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

可以看出他是先调用父类的 execute()方法,然后捕获 RejectedExecutionException 异常,再去判断如果任务队列类型是 TaskQueue ,则尝试将任务添加到任务队列中,如果添加失败,证明队列已满,然后再执行拒绝策略,此处 submittedCount 是一个原子变量,记录提交到此线程池但未执行完成的任务数(主要在下面要提到的 TaskQueue 队列的 offer()方法用),为什么要这样设计呢?继续往下看!

 @Override
    public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

可以看到他在入队之前做了几个判断,这里的 parent 就是所属的线程池对象

1.如果 parent 为 null ,直接调用父类 offer 方法入队

2.如果当前线程数等于最大线程数,则直接调用父类 offer()方法入队

3.如果当前未执行的任务数量小于等于当前线程数,仔细思考下,是不是说明有空闲的线程呢,那么直接调用父类 offer()入队后就马上有线程去执行它

4.如果当前线程数小于最大线程数量,则直接返回 false ,然后回到 JUC 线程池的执行流程回想下,是不是就去添加新线程去执行任务了呢

5.其他情况都直接入队

1.判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务

2.如果当前线程数大于核心线程数且队列没满,则将任务放入任务队列等待执行

3.如果当前当前线程池数大于核心线程池,小于最大线程数,且任务队列已满,则创建新的线程执行提交的任务

4.如果当前线程数等于最大线程数,且队列已满,则拒绝该任务

可以看出当当前线程数大于核心线程数时,JUC 原生线程池首先是把任务放到队列里等待执行,而不是先创建线程执行。

如果 Tomcat 接收的请求数量大于核心线程数,请求就会被放到队列中,等待核心线程处理,这样会降低请求的总体处理速度,所以 Tomcat 并没有使用 JUC 原生线程池,利用 TaskQueue 的 offer()方法巧妙的修改了 JUC 线程池的执行流程,改写后 Tomcat 线程池执行流程如下:

1.判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务

2.如果当前当前线程池数大于核心线程池,小于最大线程数,则创建新的线程执行提交的任务

3.如果当前线程数等于最大线程数,则将任务放入任务队列等待执行

4.如果队列已满,则执行拒绝策略

    public ExecutorWrapper doGetExecutorWrapper(WebServer webServer) {
        TomcatWebServer tomcatWebServer = (TomcatWebServer) webServer;
        return new ExecutorWrapper(POOL_NAME,
                tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor());
    }
spring:
  dynamic:
    tp:
      // 省略其他配置项
      tomcatTp:    # tomcat webserver 线程池配置
        corePoolSize: 100
        maximumPoolSize: 200
        keepAliveTime: 60

Tomcat 线程池就介绍到这里吧,通过以上的一些介绍想必大家对 Tomcat 线程池执行任务的流程应该比较清楚了吧。


Jetty 内部线程池的实现

public void execute(Runnable job)
    {
        // Determine if we need to start a thread, use and idle thread or just queue this job
        int startThread;
        while (true)
        {
            // Get the atomic counts
            long counts = _counts.get();

            // Get the number of threads started (might not yet be running)
            int threads = AtomicBiInteger.getHi(counts);
            if (threads == Integer.MIN_VALUE)
                throw new RejectedExecutionException(job.toString());

            // Get the number of truly idle threads. This count is reduced by the
            // job queue size so that any threads that are idle but are about to take
            // a job from the queue are not counted.
            int idle = AtomicBiInteger.getLo(counts);

            // Start a thread if we have insufficient idle threads to meet demand
            // and we are not at max threads.
            startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0;

            // The job will be run by an idle thread when available
            if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1))
                continue;

            break;
        }

        if (!_jobs.offer(job))
        {
            // reverse our changes to _counts.
            if (addCounts(-startThread, 1 - startThread))
                LOG.warn("{} rejected {}", this, job);
            throw new RejectedExecutionException(job.toString());
        }

        if (LOG.isDebugEnabled())
            LOG.debug("queue {} startThread={}", job, startThread);

        // Start a thread if one was needed
        while (startThread-- > 0)
            startThread();
    }
    public ExecutorWrapper doGetExecutorWrapper(WebServer webServer) {
        JettyWebServer jettyWebServer = (JettyWebServer) webServer;
        return new ExecutorWrapper(POOL_NAME, jettyWebServer.getServer().getThreadPool());
    }
spring:
  dynamic:
    tp:
      // 省略其他配置项
      jettyTp:    # jetty weberver 线程池配置
        corePoolSize: 100
        maximumPoolSize: 200

Undertow 内部线程池的实现

    public ExecutorWrapper doGetExecutorWrapper(WebServer webServer) {

        UndertowWebServer undertowWebServer = (UndertowWebServer) webServer;
        val undertow = (Undertow) ReflectionUtil.getFieldValue(UndertowWebServer.class, "undertow", undertowWebServer);
        if (Objects.isNull(undertow)) {
            return null;
        }
        return new ExecutorWrapper(POOL_NAME, undertow.getWorker());
    }
spring:
  dynamic:
    tp:
      // 省略其他配置项
      undertowTp:      # undertow webserver 线程池配置
        corePoolSize: 100
        maximumPoolSize: 200
        keepAliveTime: 60

总结

以上介绍了 Tomcat 、Jetty 、Undertow 三大 WebServer 内置线程池的一些情况,重点介绍了 Tomcat 的,篇幅有限,其他两个感兴趣可以自己分析,原理都差不多。同时也介绍了基于 DynamicTp 怎么动态调整线程池的参数,当我们做 WebServer 性能调优时,能动态调整参数真的是非常好用的。

再次欢迎大家使用 DynamicTp 框架,一起完善项目。


关于 DynamicTp 框架

DynamicTp 是一个基于配置中心实现的轻量级动态线程池管理工具,主要功能可以总结为 动态调参、通知报警、运行监控、三方包线程池管理等几大类。

经过几个版本迭代,目前最新版本 v1.0.7 具有以下特性

特性


项目地址

目前累计 1.6k star ,感谢你的 star ,欢迎 pr ,业务之余一起给开源贡献一份力量

官网https://dynamictp.cn

gitee 地址https://gitee.com/dromara/dynamic-tp

github 地址https://github.com/dromara/dynamic-tp


加入社群

看到这儿,请给项目一个 star,你的支持是我们前进的动力!

使用过程中有任何问题,或者对项目有什么想法或者建议,可以加入社群,跟群友一起交流讨论。

微信群已满 200 人,可以关注微信公众号,加我个人微信拉群(备注:dynamic-tp )。

1533 次点击
所在节点    程序员
1 条回复
lmshl
2022-08-16 11:33:31 +08:00
别动态线程池了,还不如把 CompletableFuture 用对用好,动态线程池纯粹是先把代码写屎,再在屎上雕花的不必要方案。
JVM 往保守了说有 fiber ,CompletableFuture 可以用,往激进了说还有 kotlin suspend ,展望未来还可以上 loom ,不管哪条路都没有动态线程池的活路。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/873124

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX