准备写个基于 kafka 的延迟队列, 有感兴趣的吗

2019-05-29 17:53:27 +08:00
 petelin

解决的问题: kafka 不支持延迟队列

如何解决: 如果是延迟小时, push 之前先放到 redis 里, 然后 work 通过 lua 轮训拿到需要真的 push 到队列里的请求, 然后 push 到 kafka 里.

整个功能其实和 Python 的 celery 或者 Go 的 machinery 很像.但是前者需要单独部署项目太复杂, 后者不支持 kafka.

有搞头吗?

10266 次点击
所在节点    Kafka
21 条回复
bthulu
2021-03-29 20:35:34 +08:00
可以做固定时间点的延时重发.
比如说, 1 秒, 10 秒, 30 秒, 1 分钟, 5 分钟, 1 小时, 8 小时, 24 小时延时等.
针对每个延时时间创建一个队列, 生产者按延时需求将数据(数据里包一层最终要去的队列名)发送到对应队列.
然后每个队列起一个消费者, 轮询数据, 到点发送到目标队列即可.
```
headers.put('finalTopic', topic);
producer.send(new ProducerRecord(delayedTopic, key, value, headers));
```

```
// 60 秒延时队列
int delay = 60_000;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
long timeLeft = record.timestamp() + delay - System.currentTimeMillis();
if (timeLeft > 0) {
Thread.sleep(timeLeft);
}
var topic = record.headers.lastHeader('finalTopic')
record.headers.remove('finalTopic');
producer.send(new ProducerRecord(topic, key, value, headers));
}
}
```

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/568856

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX