如下代码,本意是将 flux 流发送到子线程处理,再将处理结果汇聚到主线程。如何能够实现主线程处理报错时停止 Flux 的呢。
    public static void main(String[] args) {
        String[] data = {"2", "2", "2", "0", "8", "9", "10", "11", "12", "13", "14", "15"};
        Iterable<Integer> integers = Flux.fromArray(data)
                .flatMapSequential(s -> Mono.fromSupplier(() -> Integer.parseInt(s)).subscribeOn(Schedulers.boundedElastic()), 3)
                .doOnNext(s -> {
                    System.out.println(Thread.currentThread().getName() + "---------->" + s);
                })
                .toIterable();
        for (Integer i : integers) {
            //如何实现这里报错时,停止 Flux
            System.out.println((10 / i) + "------>>>>" + Thread.currentThread().getName());
        }
    }
|      1yazinnnn      2021-12-28 09:14:58 +08:00 fun main() { val data = arrayOf("2", "2", "2", "0", "8", "9", "10", "11", "12", "13", "14", "15") val integers = Flux.fromArray(data) .flatMapSequential({ s: String -> Mono.fromSupplier { s.toInt() }.subscribeOn(Schedulers.boundedElastic()) }, 3) val a = integers.doOnNext { s -> println(Thread.currentThread().name + "---------->" + s) }.subscribe() for (i in integers.toIterable()) { try { 10 / i } catch (t: Throwable) { a.dispose() } } CountDownLatch(1).await() } 不清楚你的具体需求,如果整个链路不使用 reactivestream 的话,似乎性能(吞吐)并没什么提高 toIterable()迭代时会阻塞当前线程,这样写跟直接用线程池处理比没啥优点 | 
|  |      2Macolor21      2021-12-28 09:22:53 +08:00 CompletableFuture 最后 join 回主线程? | 
|  |      3git00ll OP @yazinnnn  假设 toInt 这个操作是比较耗时的,可以实现将 toInt 放置在多核上运行,最终结果再汇聚到主线程上。 因为主线程上开启了传统注解事务,需要在主线程上操作 Flux 的处理结果 |