Java 消费 Kafka 如何精确控制部分 consumer 的消费速率?

2023-12-19 12:50:44 +08:00
 Tongwin
请教各位大佬。
现状:我的应用如下:
跑一个 Java 应用,消费多个 topic 。创建了好几个 consumer 进行消费,每次 consumer.poll 得到的 records 都通过线程池 ThreadPoolExecutor 处理数据。每个 consumer 对应一个 ThreadPoolExecutor

新需求:现需要新增消费一个 topic ,该 topic 的量比存量的 topic 要远远大于。因此需要控制一下该 topic 每个批次消费到的数量。

发现问题:刚开始我以为只需要配置 max.poll.record 就可以控制每个批次的消费速率,但经过测试发现,由于每次消费到的 records 都让线程池去处理了,因此 consumer.poll 一次数据在一个批次内就识别到很快就处理完,然后 consumer 就会在一个批次内尽可能地去 poll 多几次。这样就没法实现每个批次控制了。

请教大家:针对以上情况有优化的可能性吗?需要尽可能精确控制指定 topic 一个批次内只需要消费固定的数据量,我目前发现 sparkStreaming 倒是很好地控制,但是 Java 目前没找到合适的方案来实现控制。
4376 次点击
所在节点    Java
44 条回复
Tongwin
2023-12-19 17:48:23 +08:00
@dd31san 感觉思路是可行的。不过现阶段 consumer 配置是 auto.commit 自动提交偏移量的。如果改成手动提交偏移量,得重新评估影响范围了。
Tongwin
2023-12-19 17:49:30 +08:00
@liuhan907 是的,我现在就是用令牌桶来实现 1 秒只 poll 一次,设定 poll 的最大数量来实现
opengps
2023-12-19 17:49:43 +08:00
我怀疑我没看懂这个题目,为啥别人都在设法提速,而 op 却在设法限速降速呢?
liuhan907
2023-12-19 17:50:10 +08:00
而且还可以用 redis 平滑迁移到多进程消费者,我认为是最优解了。
ZZ74
2023-12-19 18:52:46 +08:00
@Tongwin
设计时,同一个消费者组,一般一个消费者能分到至少一个分区。建议你看看项目代码,确定消费者和分区数量配比
flmn
2023-12-19 20:35:56 +08:00
我猜你是不是想,poll 一次多拿几条数据一起处理,假设 100 ,而不是一条条拿?
因为我看你在设置 max.poll.record 参数。
这里面有一个小坑,光设置 max.poll.record 没用,你看一下 fetch.min.bytes 参数,设大一点试试。
另外,fetch.max.wait.ms 也要设置合适,这样,时间到了有多少条拿多少条。这样既实现了批量,也保证一定的实时性。
Takamine
2023-12-20 11:49:09 +08:00
我很好奇你们这个多线程消费加自动提交的实现里需要提交的这个 offset 是怎么确定的,还是做了消费端幂等?
ymz
2023-12-20 12:17:34 +08:00
当使用 Kafka 时,可以针对不同的 topic 设置不同的消息拉取数量。这可以通过配置 Kafka consumer 的属性来实现。在 Spring Boot 中,可以使用 @KafkaListener 注解来监听指定的 topic ,并且可以为每个 @KafkaListener 注解配置不同的消费者属性。

要为不同的 topic 设置不同的消息拉取数量,可以按照以下步骤进行操作:

创建不同的 Kafka consumer 配置类,每个配置类对应一个 topic 。
在每个配置类中设置不同的消息拉取数量。
在消费者服务中,使用 @KafkaListener 注解指定要监听的 topic ,并引用相应的 Kafka consumer 配置类。
codedreamstar
2023-12-20 17:18:14 +08:00
如果消费者处理的消息是无关的, 那么每个消费者消费单个消息只需要加分区和消费者数量
如果消费者处理的消息是相关的, 也就是需要一批一起处理的(如果相关,在生产者就应该打包成一条消息), 那为什么又加个线程池并行跑..
暂且不论是否合理, 你的 offset 是怎么提交的, 在批量用线程池的情况下?
如果是等待这批消息处理完统一提交, 那么通过限制线程池能到达限制效果
如果是丢到线程池中后直接提交, 直接改方案, 这个方案基本等于错误设计


技术上通过各种方式都能实现目标, 但是真的需要在技术面解决吗?
如果方便可以发下业务场景一起探讨一下
codedreamstar
2023-12-20 17:20:56 +08:00
kafka 本身就是 poll 模型, 处理速率和并发度都是预先设置的, 理论上不搞花活是不会出现需要控制消费者消费速率问题的, 极大概率是错误设计或者滥用了
XepMCWEKZ76L695l
2023-12-20 18:15:56 +08:00
用谷歌 guava 的 RateLimiter 限速即可
XepMCWEKZ76L695l
2023-12-20 18:17:43 +08:00
不建议在 poll 这里做限流,很蛋疼
Tongwin
2023-12-21 08:37:34 +08:00
@1Q1 目前我就是用谷歌 guava 的 RateLimiter 来限制指定时间最多 Poll 几次
Tongwin
2023-12-21 08:41:23 +08:00
@flmn 你好,你提到的小坑,设置 max.poll.record 的同时是需要配合 fetch.min.bytes 使用是吧,我理解的是,如果一条数据本身不小,fetch.min.bytes 应该是有一个默认值,如果 max.poll.record*单条数据的大小 > fetch.min.bytes 默认值,实际还是按照默认值可获取的数量来获取吧。
Tongwin
2023-12-21 08:43:52 +08:00
@Takamine 应用里并不需要严格关注 kafka 自动提交 offset 与处理完 records 的数目一致。 目前设置的 auto.commit.interval.ms 是 1 秒,而且应用也有手动每秒往 redis 里写入当前读写的 offset 。
Tongwin
2023-12-21 08:45:59 +08:00
@ymz 感谢大佬提供 springboot 注解的思路,目前应用并不是依赖 springboot 框架搭建的,但后续是有升级到 springboot 框架的需求的。后续在应用需要迁移重构的时候,我会着重构思注解的可行性可实现方式。
Tongwin
2023-12-21 09:10:10 +08:00
@codedreamstar 你好大佬,应用本身设计就是为了尽可能多消费来使用多线程实现的。 目前多线程主要是用来处理数据,且消费者处理的消息是无关的,提到 offset 提交,其实在 poll 到数据后,就先手动把 Offset 保存到 redis 里,然后配置 auto.commit.interval.ms=1 秒去自动提供 offset ,拿到的数据是直接丢到多线程里去异步处理了,应用不需要关注到当前批次的 records 处理完后才更新 offset 。这一点并不是很关注,主要是后续应用处理数据的时候会有各种机制把数据丢到 redis 里,成功的失败的处理都丢到 redis 里。
Tongwin
2023-12-21 09:25:17 +08:00
@codedreamstar 我大概讲一下场景出来吧。 应用 A 设计之初并没考虑到那么长远,初衷也是能消费多快就消费多快。因此就用上了多线程异步处理数据。 处理数据这块其实也只是为了把数据存到 redis 里。 然后我们有另外的进程去从 redis 的队列里拿到数据,然后把这些数据再下发到下游(通过调用下游接口,简称应用 B )。 目前消费的 topic 都是推过来的实时数据,因此各项的 tps 都能够满足;不过应用 B 是有一个峰值的 tps 的。之前来了个需求,新接入一个 topic (简称(topic-new),topic-new 推过来的量是固定的,我这边撑这块业务为:存量初始化。 之前协商好上游提供 topic 过来的时候是控制速率的(因此原本我这边不用考虑限速限流的),后来因沟通问题上游又不作限速处理,最终限速操作只能在应用 A 这边进行。
针对限速这块其实我是有过几个思考方案的
方案一:直接搞一个 Spark 应用来进行存量初始化,Spark 在控制批量消费还是很好控制的
方案二:使用令牌桶对应用 A 特殊的 Consumer 进行限流
方案三:对应用 A 的流入和流出都作限流操作(后续一定会排期对数据流出作限流操作,但是听各位大佬的建议,好像并不推荐对流入数据也作限流操作)
综合考虑各种因素,目前是考虑使用方案二进行限流操作,当完成存量初始化之后就可以下线该 topic 了,后续先实现流出的限流功能,其他功能再考虑可行性。
Tongwin
2023-12-21 09:35:52 +08:00
@flmn 我之前可能理解错意思了, 但我还是有点疑惑, 如果我设置 max.poll.record=1000 ,fetch.min.bytes 默认值是 1 ,你说的小坑是什么场景呢? 我理解的是只要有数据就会获取, 一次 Poll 最多拿 1000 条,如果不足 1000 条就拿剩余的条数回来。
flmn
2023-12-21 09:50:21 +08:00
如果 fetch.min.bytes 设的太小,即使 fetch.min.bytes 设的再大,可能有几条数据,就取回来了,达不到“搓堆儿”的效果,你可以写个程序测试一下,加深理解。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1001620

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX