如何在消费完所有消息后关闭Kafka消费者?
- 论坛
- 如何在消费完所有消息后关闭Kafka消费者?
9 浏览
如何在消费完所有消息后关闭Kafka消费者?
我有以下程序来消费传入Kafka的所有消息。
from kafka import KafkaConsumer consumer = KafkaConsumer('my_test_topic', group_id='my-group', bootstrap_servers=['my_kafka:9092']) for message in consumer: consumer.commit() print ("%s key=%s value=%s" % (message.topic,message.key, message.value)) consumer.close()
使用以上程序,我能够消费传入Kafka的所有消息。但是一旦所有消息都被消费完,我想关闭Kafka消费者,但这并没有发生。我需要帮助解决这个问题。