Java 消费 Kafka 如何精确控制部分 consumer 的消费速率?

2023-12-19 12:50:44 +08:00
 Tongwin
请教各位大佬。
现状:我的应用如下:
跑一个 Java 应用,消费多个 topic 。创建了好几个 consumer 进行消费,每次 consumer.poll 得到的 records 都通过线程池 ThreadPoolExecutor 处理数据。每个 consumer 对应一个 ThreadPoolExecutor

新需求:现需要新增消费一个 topic ,该 topic 的量比存量的 topic 要远远大于。因此需要控制一下该 topic 每个批次消费到的数量。

发现问题:刚开始我以为只需要配置 max.poll.record 就可以控制每个批次的消费速率,但经过测试发现,由于每次消费到的 records 都让线程池去处理了,因此 consumer.poll 一次数据在一个批次内就识别到很快就处理完,然后 consumer 就会在一个批次内尽可能地去 poll 多几次。这样就没法实现每个批次控制了。

请教大家:针对以上情况有优化的可能性吗?需要尽可能精确控制指定 topic 一个批次内只需要消费固定的数据量,我目前发现 sparkStreaming 倒是很好地控制,但是 Java 目前没找到合适的方案来实现控制。
4378 次点击
所在节点    Java
44 条回复
Tongwin
2023-12-21 09:59:35 +08:00
@flmn hi 大佬,我昨天就已经在测试了,配置大概是 max.poll.record 设置为 200 ,fetch.min.bytes 使用默认值。 通过限流打日志查看,每一次 poll 都是 200 。不过是在本地单实例跑的。 我大概懂你意思了,你说的情况应该是,我在消费的同时,上游也在造数据。如果我的消费速度超过生产速度,那么确实会出现,上游推来一条,我就消费 1 条的情况。
codedreamstar
2023-12-21 13:42:01 +08:00
你是在自己封装类似 Spring for Kafka 这样的框架吗? 那就照着 Spring for Kafka 的设计思路抄, 按照你们的需求精简一下.

不要在框架侧依赖这些配置以达到既定的框架逻辑, 这些配置都是业务侧来根据业务情况配置的, 你在框架层面能获取到的信息必定小于业务侧, 这些配置是给业务侧介入底层的手段.

框架需要做的是隐藏消费者创建的细节, 消息路由到消息处理方法的细节, 获取\提交消息的细节等各种可以封装的细节, 提供给业务侧封装好的接口或者使用方法就好.

你的设计应该是为每个消费者创建一个线程, 这个线程死循环 poll 以及 poll 之后对消息的路由以及处理, 消费完自动就该 poll, poll 之后就开始消费, 根本不存在需要限制速率的地方, 消费速度就是速率, 需要提高并发度只需要控制创建的消费者数量就行(当然要有对应的分区数量).

我看你的文章应该是给每一个消费者配了一个线程池, 路子是错的, 先不谈速率问题, 在业务侧按分区顺序消费都已经没有办法了, 同一个分区的消息都被线程池给并行了.

如果有我误解的地方欢迎你再回复我.
codedreamstar
2023-12-21 14:09:40 +08:00
我页面一直没刷新, 没看到你的回复, 继续按你的场景回复.

你的中间进程下发 B 本身就是有限流逻辑的吧. 否则按照 A 与中间进程通过 Redis 交互本身就会造成与中间进程对 B 这个链路的生产消费速率失调.

如果中间进程只是获取数据并下发应用 B, 不涉及对数据在加工, 直接把中间进程和 Redis 砍掉, 这个下发逻辑合并到消费者的消费逻辑.

如果中间进程负责数据加工再下发, 那就把 Kafka 的逻辑合并到那个中间进程的应用上, 再按照上述方法.

如果现在应用 A 和中间进程不能合并, 那么就把 Redis 砍掉, 使用同步调用 A-中-B 的方式, 中间失败重试就行, 或者 A 与中间进程不使用 Redis, 也换成 Kafka, 以免生产速率过高 Redis 爆内存, 中间进程与 B 使用上述方法, 超过 TPS 报错就重试, 中间加个等待步进或者限流器.

最好的情况就是沟通应用 B, 让 B 提供异步接口, TPS 问题让他们内部解决, 你就可以看情况把中间没用步骤都砍掉.
Tongwin
2023-12-28 15:39:47 +08:00
@codedreamstar 大佬真的万分抱歉,1 周后才来回复你信息。 最近杂事缠身。我们用 kafka 并不是自己封装的,也是使用正常的依赖,由于本身架构问题,所以没法使用 Spring for kafka 。不过后续我们需要重构这个项目,后面以 SpringBoot 来框架进行搭建就会用 Spring for kafka 去设计了。 尴尬的是中间进程下发 B 是没有限流逻辑,我们后面会优先开发这一块。之前的数据的实时速率都没有达到应用 B 的峰值。
后面应用重构,确实是考虑过把 redis 砍掉,原有 redis 功能是为了保证数据不丢失(比如应用 B 处理失败,应用 A 有相关的重发机制),后续重构我的想法是砍掉 redis , 消费 topic 进而下发数据,如果应用 B 处理失败,则应用 A 把失败的数据推送到另一个 topic-C 。 应用 A 继续消费 topic-C 的数据来实现重发机制。
感谢大佬提供的思路,让我对 kafka 以及项目设计有了进一步的认识。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1001620

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX