关于 Reactor API,如何写异常后恢复

179 天前
 zhady009

方法client.downloadFile会不断发射FilePart,需要collec写入到文件中即可。

在接收FilePart期间会有网络等其他异常,现在直接用onErrorResume从 offset 开始请求返回新的 Flux 会有一个问题。 第一次异常会进入onErrorResume返回新的 Flux ,由于新的 Flux 没有声明onErrorResume就噶了

我也不可能在新的 Flux 里声明onErrorResume,无限套娃了属于是。

client.downloadFile(fileReferenceId)
    .publishOn(Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor()))
    .timeout(Duration.ofMinutes(3))
    .onErrorResume(RpcException::class.java) {
        if (it.error.errorCode() == TIMEOUT_CODE && monitoredChannel.isDone().not()) {
            log.warn("Download timeout, resuming: $fileDownloadPath")
            return@onErrorResume client.downloadFile(
                fileReferenceId,
                monitoredChannel.getDownloadedBytes(),
                MAX_FILE_PART_SIZE,
                true
            )
        }
        Flux.error(it)
    }
    .collect({ monitoredChannel }, { fc, filePart ->
        fc.write(filePart.bytes.nioBuffer())
    })
    .doOnSuccess {
        tempDownloadPath.moveTo(fileDownloadPath)
        downloadCounting.incrementAndGet()
        log.info("Downloaded file: $fileDownloadPath")
    }
    .doOnError {
        log.error("Error downloading file:$fileDownloadPath", it)
    }
    .onErrorMap {
        wrapRetryableExceptionIfNeeded(it)
    }
    .doFinally {
        runCatching {
            closePath(fileDownloadPath)
        }.onFailure {
            log.error("Error closing file channel", it)
        }
        hashingPathMapping.remove(hashing)
    }
    .block()
514 次点击
所在节点    程序员
3 条回复
guyeu
179 天前
retry 操作符?
zhady009
179 天前
@guyeu 一开始就用的 retry 不过流会重头开始
yuhongtai114514
178 天前
把 flux 中的动作先用操作符转成 mono ,然后把 retry 挂在 mono 上试试?

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/991815

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX