求助 Java 大量任务分布式处理的问题

2022-04-26 12:30:59 +08:00
 yesterdaysun

问题是这样, 现在系统中有大量去和第三方 API 交互的任务, 比如有 1000 个用户, 每个用户又有各自 1 万个小的记录去和第三方 API 慢慢交互, 或者没有那么多记录但是有一个很耗时的同步接口, 可能 10 分钟以上, 其实时间都是消耗在网络 IO 上, 大部分时间在等网络, 之前的方式就是一个线程池, 把所有大小任务塞进去, 但是这个线程池大小很难搞, 多了的话, 有时会突然来一堆任务占住 CPU 和数据库, 少了的话, 一大堆任务又阻塞住.

现在想搞成分布式好几台机器一起跑, 考察了一下方案, 有点迷惑:

  1. 一种是分布式任务队列, 看到一个 Celery 好像是这种, 但是这个 python 的, 我想要 Java 的, 结果没找到
  2. 一种是任务调度框架, quartz, xxljob 这种, 感觉我想要的更靠近这种, 但是又有点迷惑, 比如感觉我这种需求适合"分片广播"这种任务, 比如我把 1000 个用户的任务分片到 3 台机器, 但是然后每台机器上的任务为每个用户再单独为他名下的 1 万条记录自己做线程池请求? 或者我把任务拆到单个小记录的级别, 那岂不是得成千上万的 trigger, 然后任务调度又一般是一个主 job, 然后传参数这种, 那比如我要确保一个时间只有一个用户的任务在跑, 怎么做这个限制, 全要自己在任务中处理吗

所以, 其实就是我想找一个比较现成的框架, 能处理超长的任务队列, 分布式, 并发的执行, 可以自动削峰填谷, 有一些任务自动处理, 比如重试, 故障转移等等, 又能够有一些保证一致性的机制, 比如按 job+某个参数确保不会重复执行, 还能程序方式发起调度, 而不是在某个管理后台手动编辑

我想知道这样的东西存在吗, 还是必须自己实现, 求各位大佬赐教

4403 次点击
所在节点    Java
31 条回复
aguesuka
2022-04-26 12:33:50 +08:00
storm
biubiuF
2022-04-26 12:40:35 +08:00
你需要 kafka ,把你现在的 jobs 弄成消费者
RedBeanIce
2022-04-26 12:59:38 +08:00
可能是 xxjob ????我记得有分片处理,,,就是一堆小任务,大家去处理
bthulu
2022-04-26 13:06:09 +08:00
既然时间都消耗在网络 IO 上, 上 windows 系统, 用 IOCP 去调接口, 单机就能搞定了, 用不着搞这么多骚操作
jorneyr
2022-04-26 13:09:21 +08:00
@biubiuF Kafka 的每个 partition 一个消费者组里同时只能有一个消费者进行消费,这种情况我觉得 RabbitMQ 可能更合适,不必明确的限制消费者个数,看情况随时动态增减消费者,每个消息可以使用阻塞的方式执行。
Leexiaobu
2022-04-26 14:11:28 +08:00
Akka
lmshl
2022-04-26 14:15:32 +08:00
改异步纤程,你这才一千万个 IO 小任务,犯不着上分布式。Akka Stream (调度) + Akka HTTP (调 API ) 随便搞一搞单机就完事了
ming159
2022-04-26 14:16:34 +08:00
如你所说:“其实时间都是消耗在网络 IO 上” 线程是不解决 IO 问题的,你需要的是 异步 IO 处理机制。一个线程同时处理多个 IO ,而不是一个线程处理一个 IO 。
ymmud
2022-04-26 14:25:24 +08:00
akka cluster sharding , 根据需求分片就行了
lmshl
2022-04-26 14:25:58 +08:00
我写过一个
所有 fiber 去数据库查任务状态,select * from tasks where state = 'todo',然后执行这一批任务,更新任务状态。
最后并行 128 同时跑所有 fiber
zmal
2022-04-26 14:57:11 +08:00
需求场景是两个问题:
1. 是否要把这部分逻辑从主系统解耦出来。
2. 怎样加快这部分业务的处理速度,减少资源占用,包括但不限于可以任意扩容的分布式、异步 IO 等等。
如果是我的话,个人对 Flink 比较熟悉,可能会选择解耦后用 Flink 来处理,Flink 解决了分布式、一致性容错等问题。
akka 解决的是异步 io 并发量问题,楼上 akka 的方案应该也是可行的。看你对哪个工具比较熟悉了。
git00ll
2022-04-26 15:26:13 +08:00
`但是有一个很耗时的同步接口, 可能 10 分钟以上, 其实时间都是消耗在网络 IO 上, 大部分时间在等网络`

这句话不明白,啥接口要耗时 10 分钟? 等网络是什么意思。如果接口一次请求响应要 10 分钟,多开点线程如 200-300 个,网络堵塞的时候是不会大量占用 cpu 的。关键如果接口能否承受这么高并发数。
5boy
2022-04-26 16:12:24 +08:00
mark, 有没有不用大数据框架实现的方式?
litchinn
2022-04-26 16:24:53 +08:00
/t/848357 ,隔壁刚提出的这个动态线程池不知道能不能实现这个需求。另外你说线程池大小不好调,换成分布式多个机器跑,那节点数量不是一样需要调整吗,k8s 弹性伸缩?
misaka19000
2022-04-26 16:55:55 +08:00
用协程或者异步 IO
Saurichthys
2022-04-26 16:58:49 +08:00
不要用 xxl-job 的方案,基于数据库,性能不佳,莫名其妙问题很多
yesterdaysun
2022-04-26 17:56:23 +08:00
@git00ll 说的不清楚, 其实是一个长流程, 比如请求一个报告, 但是不会立即返回, 需要等第三方处理好, 才能拿到, 中间就每隔 1-2 分钟去轮询一次看看报告有没有好, 通常都要 10 分钟左右, 关键不是每种任务都是这样的, 如果单为它建一个线程池又感觉有点过了, 想搞个通用的解法

上面的我都研究了一下, 我这个系统比较简单, 本身就是个单体, 并不是分布式的, 这次也只是想要把这个后台任务独立出去搞多机并行, 感觉我这个还不到动用 akka/协程之类的方案的地步, 应该还是简单点, 一个简单的调度系统加动态线程池就足够了, 美团开源的那个动态线程池看上去比较适合, 我先研究一下试试看
polarbear007
2022-04-26 17:56:55 +08:00
个人认为使用非阻塞 io 即可
jekkro
2022-04-26 18:17:50 +08:00
用 redis 实现异步队列即可,一个进程专门负责插入任务到 Redis 队列中,另外几个负责从队列中获取信息并执行,完成后更新数据库里的状态。如果发生 Redis 所在的机器 down 机,则负责插入任务的那个进程重新把没有完成的再插入一遍(不过这个目前为止还没有发生过)。我有类似的业务,已经跑了 12 年了。
另外因为 Redis 有各种复杂数据结构,可以满足延时队列,优先级队列,自动去重等功能。感觉性能优秀,代码简单。
jekkro
2022-04-26 18:21:16 +08:00
不能用非阻塞 io 的原因一般是因为那些接口库不是自己实现的,没办法去改造那些接口底层库,虽然 http 的接口自己也可以实现,但是有些场景(比如各种开放平台的接口库)不可能把第三方提供的接口库重新写一边,而仅仅是为了解决阻塞 io 的问题。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/849331

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX