同福

Python操作Kafka消息系统的方法【20210423】

介绍

介绍

上一课大家跟福哥学会了在我们的TFLinux系统上面安装Kafka软件,今天福哥要带着大家学习使用Python去操作Kafka消息系统的方法。

Python操作Kafka可以使用pykafka这个库来实现,看这个命名是不是很眼熟?对了,类似的库名还有一个就是用来操作MySQL数据库的pymysql。pykafka库的语法和pymysql很像,不过福哥觉得Kafka和MySQL之间没有什么可比性,所以福哥选择了另外一个库。

福哥是用kafka-python这个库来操作Kafka的,这个库的设计和我们之前学习的PHP语言的rdkafka以及Java语言的KafkaConsumer/KafkaProducer对象很相似,学习起来更加舒服。

安装

直接通过pip安装kafka-python库就可以。

pip install kafka-python

使用

消费者

概念

低级消费者模式就是一个人接了一个项目,所有事情都要自己一点点做。如果当天没有做完,我会记录做到第几件了,然后第二天来了继续做下面的工作。

代码

低级消费者示例代码,Python作为控制台程序运行消费者真是太合适了。

from kafka import KafkaConsumer

topic = "test"
group_id = "lowLevel"
bootstrap_servers = "192.168.2.168:9092"
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, group_id=group_id, auto_offset_reset="latest")
consumer.subscribe([topic])
while True:
    msg = consumer.poll(timeout_ms=12000)
    for records in msg.values():
        for i in range(0, len(records)):
            record = records[i]
            print(record.value.decode("utf-8"))

效果

home/topic/2021/0424/10/e361f08a9e3b22c19ac0592e2fcb0a73.jpg

生产者

代码

生产者示例代码。

from kafka import KafkaProducer
import time

topic = "test"
bootstrap_servers = "192.168.2.168:9092"
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
msg = "福哥说:现在是" + time.strftime("%H:%M:%S", time.localtime())
print(msg)
producer.send(topic, msg)

踩坑

消费者

阻塞进程

编写消费者代码的时候,福哥特别不建议使用如下两种方式,因为它会阻塞进程,按Ctrl+C都退不出来。

for方式

for msg in consumer:
    for records in msg.values():
        for i in range(0, len(records)):
            record = records[i]
            print(record.value.decode("utf-8"))

next方式

while True:
    msg = next(consumer)
    for records in msg.values():
        for i in range(0, len(records)):
            record = records[i]
            print(record.value.decode("utf-8"))

官方文档

消费者

这是官方的消费者文档

KafkaConsumer — kafka-python 2.0.2-dev documentation

生产者

这是官方的生产者文档

KafkaProducer — kafka-python 2.0.2-dev documentation

总结

今天福哥带着童鞋们学习了使用Python语言连接操作Kafka消息系统的方法,Python操作Kafka有pykafka和kafka-python两种方式,大家有空可以自行学习一下pykafka库的使用方法,然后对比一下,看看是不是和福哥有着一样的观点。