Java 消费 Kafka 如何精确控制部分 consumer 的消费速率?

141 天前
 Tongwin
请教各位大佬。
现状:我的应用如下:
跑一个 Java 应用,消费多个 topic 。创建了好几个 consumer 进行消费,每次 consumer.poll 得到的 records 都通过线程池 ThreadPoolExecutor 处理数据。每个 consumer 对应一个 ThreadPoolExecutor

新需求:现需要新增消费一个 topic ,该 topic 的量比存量的 topic 要远远大于。因此需要控制一下该 topic 每个批次消费到的数量。

发现问题:刚开始我以为只需要配置 max.poll.record 就可以控制每个批次的消费速率,但经过测试发现,由于每次消费到的 records 都让线程池去处理了,因此 consumer.poll 一次数据在一个批次内就识别到很快就处理完,然后 consumer 就会在一个批次内尽可能地去 poll 多几次。这样就没法实现每个批次控制了。

请教大家:针对以上情况有优化的可能性吗?需要尽可能精确控制指定 topic 一个批次内只需要消费固定的数据量,我目前发现 sparkStreaming 倒是很好地控制,但是 Java 目前没找到合适的方案来实现控制。
2820 次点击
所在节点    Java
44 条回复
codedreamstar
141 天前
有没有一种可能, 你的 Java 应用=线程池, 创建的消费者数量=线程数, topic=任务队列, 所以在消费者这里再加一个内存的线程池是为了什么?
至于控制 kafka 消费者查一下 pause 相关的 api, 能暂停或者说挂起消费者
diagnostics
141 天前
Kafka Consumer 不一直都是需要写自旋逻辑不断拉取线程吗?写个计数器和计时器不就完了,超出就 sleep 一下
diagnostics
141 天前
2 分钟写了个案例,如果要更精细用 nanoTime
```java
int MAX_CNT_OF_SEC = 100;
int count = 0;
long lastPoll = -1l;
long ONE_SEC = Duration.ofSeconds(1L).toMillis();
while (true) {
long now = System.currentTimeMillis();
long diff = now - lastPoll;
if (diff < ONE_SEC || count >= MAX_CNT_OF_SEC) {
Thread.sleep(ONE_SEC - diff);
}

ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));

count += records.count();

// TODO
}

```
lsk569937453
141 天前
新需求:现需要新增消费一个 topic ,该 topic 的量比存量的 topic 要远远大于。因此需要控制一下该 topic 每个批次消费到的数量

先把问题表述清楚吧。。。。。你自己看看你说的是啥

两个不同的消费者消费不同的 topic ,这俩之间没任何逻辑上的关系的,所以你限制数量是为了啥????????
diagnostics
141 天前
@diagnostics #3 发完发现漏写 count 归 0 和 更新 lastPoll 了,思路差不多
wqhui
141 天前
没看懂,是因为应用使用线程池处理数据,而拉取线程只管拉取,导致拉取过快,消费跟不上,把应用内存弄爆?这样理解的话可以在拉下来数据推线程池处理的时候限流,阻塞拉取线程
Tongwin
141 天前
@codedreamstar 不好意思之前没讲清楚,创建内存的线程池主要是为了异步处理消费者数量,consumer.poll 是不受内存线程池影响的。
@diagnostics 感谢提供思路,我之前也有考虑过用 sleep 来控制 poll 的次数,但考虑到实例多、消费 Topic 数量多等复杂情况,没有深入了解就用 sleep 感觉不太稳妥。
@lsk569937453 不好意思没有表达清楚需求。 由于存量 topic 推送过来的数据量并不大,因此目前并没有做任何限速处理,现有应用就是尽可能地去消费很多数据(依赖线程池异步处理 record)。 然后新 topic 由于数据量远远大于存量 topic 的数据量,如果不作消费限制的话,对于后续的业务处理是有着极大的压力和风险的。
Tongwin
141 天前
@wqhui 没错,就是大佬你说的意思。目前我考虑到通过使用限流器来限定特定的 consumer 1 秒只 poll 一次
ZZ74
141 天前
自己写代码调用 poll ,可以定时,也可以用 future 控制上一批消费完了再 poll ,注意 rebalance

如果你要控制所有消费者和对应 topic 一起的单批次数量,那就麻烦了。
Tongwin
141 天前
@ZZ74 谢谢大佬提供思路。我的想法是,只需要计算好 topic 对应的分区,多实例消费 topic 的时候,只有获取到分区的实例才能消费到数据。计算一下分区数*限流量应该就可以得到想要的结果了吧?
diagnostics
141 天前
@Tongwin #7 RateLimiter 和 sleep 没区别,重点不是 sleep 而是限流思路,sleep 只是让出 CPU 时间片
iX8NEGGn
141 天前
限流五算法:固定窗口、滑动窗口、滑动日志、漏桶、令牌桶,总有一款适合你
tomorrow092
141 天前
@wqhui

我对 kafka 不了解, 作为头部 mq 产品,他内部没有默认的 消费者本地流控策略?自己能把自己撑死?




我感觉上猜测这种消费流控很常见的需求,大概率上应该只需要配置参数就能实现。多看看其他配置参数。

fetch.max.bytes:单次获取数据的最大消息数。
max.poll.records <= 吞吐量 :单次 poll 调用返回的最大消息数,如果处理逻辑很轻量,可以适当提高该值。默认值为 500
lsk569937453
141 天前
有没有可能根本不需要限流。
kafka 本来让业务主动去拉取,就是让你在拉取的时候控制速率。

fetch-max-wait: 10000 # poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
fetch-min-size: 10 # poll 一次消息拉取的最小数据量,单位:字节
max-poll-records: 100 # poll 一次消息拉取的最大数量

可以完全通过这三个参数控制你的消费速率,直接同步消费就是最好的选择。你却本末倒置,做异步消费然后再限流。本来配置修改一下就可以的事情,你却写一堆代码,把简单的事情搞复杂。
ZZ74
141 天前
@Tongwin
无法理解你设计的时候为什么会有没分配到分区的消费者...
这和分区无关,每个 consumer 都负责 1...N 个分区,简单的就是确保每个 consumer 在上一批完成后再拉取。
你上一次拉了 500 只 commit 了 200 条,consumer 内部也不会让你本地堆积 800 条。
@tomorrow092
OP 的问题很有可能是把拉取到任务提交到线程池后就 commit offset 了,对于 kafka broker 和 consumer 来说就是消费完了。
所以就导致了线程池任务还在跑,consumer 又拉一批过来...然后满了 爆了... rebalance 了等等....
Tongwin
141 天前
@lsk569937453 这个项目我是从前辈那里接过来的。 目前已经在线上稳定运行一段时间。目前是不适宜在短时间内重构它。只想着在现有的情况下,特殊处理一下这个量大的 topic ,这个 topic 后续会下线掉。
@ZZ74 其实就是应用部署在云上有多个实例,每个实例在创建的时候都会尝试去创建 consumer 获取分区。由于都是用同一个消费者组,最终也就只有 topic 分区数的实力能够获取到该 topic 的其中一个分区,我这样理解是没问题的吧?
zhaogaz
141 天前
你怕 consumer 处理的时候,把下游搞弄挂了,然后就尝试去限制 mq ?

这个头疼医头,脚疼医脚的做法感觉不太好。

你说 [后续的业务处理是有着极大的压力和风险的。] ,那么我认为正确的做法是,改动 [后续的业务处理] 的 api 或者是类库,你加一个限流好了。。。而不是去限制 mq

如果有一天,后续业务修改好了,谁能知道限流再 mq 上?谁又能看懂你那一坨代码?
dd31san
141 天前
个人小白,kafka broker 似乎可以直接配置每秒字节数 Quotas 。
直接在消费者上,控制消息条数,思路上也许可以:
1 记录轮次 poll 开始时间 begin ,count 消息数量
2 当 count 达到 limit 时,用 pause 方法暂停避免 rebalance
3 手动提交 offset
4 判断时间,超 过 1s 重置 count 和 begin ,若暂停中则调 resume 恢复
Tongwin
141 天前
@zhaogaz 目前是优先处理一下特殊情况,后续我们会排期在流量出口进行限流。流量入口的限流我们也是有计划排期优化的。感谢
liuhan907
141 天前
看起来你的消费者是一个单进程?那样的话我觉得写一个全局的令牌桶然后按请求限流那样做比较方便,kafka pull 消息的时候是可以指定最多拉多少个的。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1001620

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX