kafka 重启后, kafka- Python 消费者需要重启才能订阅 topic

2018-03-08 22:22:14 +08:00
 qi1070445109

最近项目用到 kafka-python,在试验把 kafka broker 停止后,生产者和消费者都没有出现异常。

重启后生产者继续生产,但是消费者不能消费那个 topic 了。

想知道有什么办法,让消费者继续消费,或者让它能抛出异常捕捉后重新订阅一下 topic ?

4764 次点击
所在节点    Kafka
2 条回复
qi1070445109
2018-03-08 23:43:31 +08:00
补充一下我用的 kafka-python 1.4.1 代码如下:

#consumer.py
from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
# producer.py

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])


for _ in range(100000):
producer.send('my-topic', b'msg'
qi1070445109
2018-03-08 23:44:46 +08:00
抱歉,没整好格式。

#consumer.py
from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
# producer.py

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])


for _ in range(100000):
producer.send('my-topic', b'msg'

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/436316

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX