如何在消费完所有消息后关闭Kafka消费者?

9 浏览
0 Comments

如何在消费完所有消息后关闭Kafka消费者?

我有以下程序来消费传入Kafka的所有消息。

from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'])
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
consumer.close()

使用以上程序,我能够消费传入Kafka的所有消息。但是一旦所有消息都被消费完,我想关闭Kafka消费者,但这并没有发生。我需要帮助解决这个问题。

0