为什么 CompletableFuture 的 thenApplyAsync 没有新起一个线程?

2020-08-22 15:16:00 +08:00
 amiwrong123
    public static void test() {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            String supplyAsyncResult = " "+Thread.currentThread().getName()+" Hello world! ";
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(supplyAsyncResult);
            return supplyAsyncResult;
        }).thenApplyAsync(r -> {  //添加后续任务
            String thenApplyResult = Thread.currentThread().getName()+r + " thenApply! ";
            System.out.println(thenApplyResult);
            return thenApplyResult;
        });

        try {
            System.out.println(completableFuture.get() + " finish!");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

打印:

 ForkJoinPool.commonPool-worker-9 Hello world! 
ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world!  thenApply! 
ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world!  thenApply!  finish!
        public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;  //只是为了防止内存泄漏,方便 GC
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());  //执行 task
                    } catch (Throwable ex) {       //执行 task 期间抛出了异常
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }

从源码上来看,supplyAsync 新起了一个线程,等到线程执行完 task,开始执行 d.postComplete(),即开始执行后续 task,然后 postComplete 会执行后续 task 的 completion 对象的 tryFire 方法。

    static final class UniApply<T,V> extends UniCompletion<T,V> {
        Function<? super T,? extends V> fn;
        UniApply(Executor executor, CompletableFuture<V> dep,
                 CompletableFuture<T> src,
                 Function<? super T,? extends V> fn) {
            super(executor, dep, src); this.fn = fn;
        }
        final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d; CompletableFuture<T> a;
            if ((d = dep) == null ||
                !d.uniApply(a = src, fn, mode > 0 ? null : this))//这里会发现前一个 stage 执行完毕,但提供了线程池
                return null;
            dep = null; src = null; fn = null;
            return d.postFire(a, mode);
        }
    }
    final <S> boolean uniApply(CompletableFuture<S> a,
                               Function<? super S,? extends T> f,
                               UniApply<S,T> c) {
        Object r; Throwable x;
        if (a == null || (r = a.result) == null || f == null)
            return false;
        tryComplete: if (result == null) {
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;
            }
            try {
                if (c != null && !c.claim())//会执行到这里,然后发现 claim 返回 false
                    return false;
                @SuppressWarnings("unchecked") S s = (S) r;
                completeValue(f.apply(s));
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }
        final boolean claim() {
            Executor e = executor;
            if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
                if (e == null)
                    return true;
                executor = null; // disable
                e.execute(this); //会执行到这里,然后把 this completion 对象提交给线程池执行,当前线程即将返回
            }
            return false;
        }

我的问题在于,当 worker-9 线程执行完第一个 task 之后,它把第二个 task 提交给了 executor (e.execute(this)),然后线程就返回了(从 claim 函数一层一层返回,直到返回 postComplete )。那为什么第二个 task 从打印结果来看,还是同一个 worker-9 线程来执行的?

还是说,只是因为我的例子比较简单,所以 executor 没有分配一个新的线程出来,其他情况下,thenApplyAsync 里面在执行e.execute(this)时,还是有可能新起一个线程的吗?

2182 次点击
所在节点    Java
8 条回复
passerbytiny
2020-08-22 15:47:17 +08:00
supplyAsync 和 thenApplyAsync 虽然都是异步调用,但它们两个之间是串行的,为什么就不能在一个线程(执行器)中被执行。
amiwrong123
2020-08-22 15:56:32 +08:00
@passerbytiny
没说不可以,它们之间肯定是串行的,但不一定是同一个线程吧。从源码上可见,supplyAsync 的线程并不是直接执行下一个 task 的,因为它 e.execute(this)之后就马上返回了。
zyoo
2020-08-22 16:05:42 +08:00
async 的语义是不一定同一个线程,所以这个只能说是巧合了,你可以多试几把?
amiwrong123
2020-08-22 16:12:49 +08:00
@zyoo
多试几次也一样。我怀疑这跟 ForkJoinPool.commonPool()的线程调度有关系,但我现在还没来得及看它的原理呢。。
passerbytiny
2020-08-22 17:02:26 +08:00
@amiwrong123 异步任务都是将任务提交给执行器去执行的,而不是从线程池取出一个线程用来执行任务。选择哪个线程是由执行器自行决定的,任务的提交者很难也不该对线程选择产生影响。

从高层次上看,thenApplyAsync 是在 applyAsync 完成之后执行的,所以最优选择就是两者使用同一个线程(线程唤醒也是有成本的)。从源码看,你要主要看的应该是 Executor 的源码。
amiwrong123
2020-08-22 17:18:45 +08:00
@passerbytiny
好吧,大概理解了。主要之前我以为我这个例子,applyAsync 和 thenApplyAsync 的执行线程肯定是同一个线程,但从源码上看发现 前一个线程只是提交任务给 Executor 而已。

所以,applyAsync 和 thenApplyAsync 的执行线程不一定是同一个呗。只是这个例子里,线程池是这样调度的。
XuHuan1025
2020-08-22 22:20:55 +08:00
木宝厉害哦
RedBeanIce
2020-08-24 19:15:47 +08:00
@amiwrong123 #5
@passerbytiny #6

JDK8
求问一下,我也是看到这里将任务提交到 Executor
e.execute(new AsyncRun(d, f));
那么下一步应该看 forkjoinpool ?因为默认是他。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/700517

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX