介绍
介绍
上一课大家跟福哥学会了在我们的TFLinux系统上面安装Kafka软件,今天福哥要带着大家学习使用PHP去操作Kafka消息系统的方法。
PHP操作Kafka需要借助rdkafka库,我们可以在github上面下载到源代码进行安装。因为rdkafka是作为PHP扩展部署的,所以我们不需要重新编译PHP环境。
安装
下载
下载librdkafka
wget https://github.com/edenhill/librdkafka/archive/v0.11.0.tar.gz
下载php-rdkafka
wget https://github.com/arnaud-lb/php-rdkafka/archive/3.0.4.tar.gz
安装
安装librdkafka
tar -xzvf v0.11.0.tar.gz cd librdkafka-0.11.0/ ./configure --prefix=/tongfu.net/env/librdkafka/ make make install cd ..
安装php-rdkafka
tar -xzvf 3.0.4.tar.gz cd php-rdkafka-3.0.4/ /tongfu.net/env/php-7.4.6/bin/phpize ./configure \ --with-php-config=/tongfu.net/env/php-7.4.6/bin/php-config \ --with-rdkafka=/tongfu.net/env/librdkafka/ make make install cd ..
配置
配置
配置php.ini
extension=rdkafka
重启
重新启动apache
systemctl restart httpd
使用
低级消费者
概念
低级消费者模式就是一个人接了一个项目,所有事情都要自己一点点做。如果当天没有做完,我会记录做到第几件了,然后第二天来了继续做下面的工作。
代码
低级消费者示例代码
当 $message 为 null 或者 $message->err 为到底了,都表示没有新消息
强烈建议本地保存 offset(偏移),我做了多少我自己记着,放心!
$groupID = "lowLevel"; $topicName = "test"; $partitionID = 0; $conf = new RdKafka\Conf(); $conf->setErrorCb(function($kafka, $err, $reason){ var_dump($err); var_dump($reason); }); $conf->set('group.id', $groupID); $consumer = new RdKafka\Consumer($conf); $consumer->addBrokers("server_kafka:9092"); $topicConf = new RdKafka\TopicConf(); $topicConf->set('offset.store.method', 'file'); $topicConf->set('offset.store.path', __DIR__); $topicConf->set('auto.commit.enable', 1); $topicConf->set('auto.commit.interval.ms', 10); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $consumer->newTopic($topicName, $topicConf); $topic->consumeStart($partitionID, RD_KAFKA_OFFSET_STORED); var_dump("启动中..."); while (1) { // try consumer record $message = $topic->consume($partitionID, 120*1000); if($message == null){ continue; } switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message->payload); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: var_dump("没有更多消息了"); break; case RD_KAFKA_RESP_ERR__TIMED_OUT: var_dump("太长时间未收到消息了"); break; default: throw new Exception($message->errstr(), $message->err); break; } }
效果
生产一个消息
消费一个消息
高级消费者
概念
高级消费者模式就是团队做项目,要做的事情列出来了,你接一个,我接一个,各自做完手头的工作后再去接下一个。分配工作的是 Kafka 的 Rebalance 来控制,逻辑也非常简单。
它先把项目需要做的事情分开多个袋子里,然后每个袋子上写一个编号。当员工来接活的时候,会先看看有没有没人负责的袋子,如果有就告诉员工你的编号是这个,这个袋子里的事情都是你的。如果全部袋子都有人负责了,那就告诉员工暂时没事干,你先等等吧。
注意:消费者能不能拿到消息,完全看是不是可以得到分配到分区,而往往是这个地方会需要等很久~~
代码
高级消费者示例代码
$groupID = "highLevel"; $topicName = "test"; $conf = new RdKafka\Conf(); $conf->setErrorCb(function($kafka, $err, $reason){ var_dump($err); var_dump($reason); }); $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: foreach($partitions as $partition){ var_dump("指定分区:". $partition->getPartition()); } $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: foreach($partitions as $partition){ var_dump("删除分区:". $partition->getPartition()); } $kafka->assign(NULL); break; default: throw new \Exception($err); } }); $conf->set('group.id', $groupID); $conf->set('metadata.broker.list', "127.0.0.1:9092"); $topicConf = new RdKafka\TopicConf(); $topicConf->set('offset.store.method', 'file'); $topicConf->set('offset.store.path', __DIR__); $topicConf->set('auto.commit.enable', 1); $topicConf->set('auto.commit.interval.ms', 10); $topicConf->set('auto.offset.reset', 'smallest'); $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\KafkaConsumer($conf); $consumer->subscribe(array($topicName)); var_dump("启动中..."); while (1) { // try consumer record $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message->payload); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: var_dump("没有更多消息了"); break; case RD_KAFKA_RESP_ERR__TIMED_OUT: var_dump("太长时间未收到消息了"); break; default: throw new Exception($message->errstr(), $message->err); break; } }
效果
生产一个消息
消费一个消息
生产者
代码
生产者示例代码
使用 setErrorCb 回调检查程序是否出错
使用 setDrMsgCb 回调检查消息是否成功推送
设置 message.timeout 避免 produce 阻塞
$conf = new RdKafka\Conf(); $conf->setErrorCb(function($kafka, $err, $reason){ var_dump($err); var_dump($reason); }); $conf->setDrMsgCb(function($kafka, $message){ if($message->err) var_dump("发送失败:"); else var_dump("发送成功:"); var_dump($message); }); $rk = new RdKafka\Producer($conf); $rk->addBrokers("server_kafka:9092"); $topicConf = new RdKafka\TopicConf(); $topicConf->set("message.timeout.ms", 3000); $topic = $rk->newTopic("test", $topicConf); $topic->produce(RD_KAFKA_PARTITION_UA, 0, "福哥说:现在". date("H:i:s"). "了");
官方在线文档
Rdkafka扩展提供了在线文档,可以帮助我们编写代码的时候查阅
http://arnaud.le-blanc.net/php-rdkafka/phpdoc/book.rdkafka.html
总结
今天福哥带着童鞋们学习了使用PHP语言通过rdkafka扩展连接操作Kafka消息系统的方法。PHP本身是弱类型语言,又是解析型语言,在处理复杂的业务逻辑时候难免会有些力不从心。借助Kafka消息系统可以将一些同步要求不高的处理放到消息队列里面,可以大大提高主线业务的处理效率。
好了,下一课福哥会给大家讲解如何使用Java语言来操作Kafka消息系统,敬请期待~~