1. 首先要安装好zookeeperkafka。同时启动了zookeeper的客户端zkCli.sh

  2. 确保php 环境版本大于5.5

  3. 使用 Composer 安装

1
composer require nmred/kafka-php

Kafka-php 使用纯粹的PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka,该项目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 Kafka PHP v0.1.x Document, 不过建议切换到 v0.2.x 上。v0.2.x 使用 PHP 异步执行的方式来和kafka broker 交互,较 v0.1.x 更加稳定高效, 由于使用 PHP 语言编写所以不用编译任何的扩展就可以使用,降低了接入与维护成本

  1. 就可开始初步使用
    nmred/kafka-php 文档

生产者端 Produce (producer.php),

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?php
require 'vendor/autoload.php';
date_default_timezone_set('PRC');

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);

# 替换为你的 Kafka broker 地址和端口
$config->setMetadataBrokerList('172.21.0.4:9092');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);

## 异步回调方式调用
$data = [
[
'topic' => 'demo-topic',
'value' => json_encode(['a','b']),
'key' => 'testkey',
]
];
$producer = new \Kafka\Producer(function() use ($data) {
return $data;
});

$producer->success(function($result) {
var_dump($result);
});
$producer->error(function($errorCode) {
var_dump($errorCode);
});
$producer->send(true);


## 同步方式调用生产者
// $producer = new \Kafka\Producer();
// $result = $producer->send([
// [
// 'topic' => 'demo',
// 'value' => '22222',
// 'key' => 'testkey',
// ]
// ]);
// var_dump($result);

消费者端 Consumer (consumer.php)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
require 'vendor/autoload.php';
date_default_timezone_set('PRC');

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
// 替换为你的 Kafka broker 地址和端口
$config->setMetadataBrokerList('172.21.0.4:9092');
// 消费者组 ID,确保它是唯一的或者与你的用例相匹配
$config->setGroupId('demo-group');
// 替换为你的 Kafka topic 名称
$config->setTopics(['demo-topic']);

$consumer = new \Kafka\Consumer();

$consumer->start(function($topic, $part, $message) {
var_dump($message);
});

浏览器域名访问消费端通常是不行的,因为浏览器不直接支持 Kafka 客户端库,并且浏览器环境也不适合长时间运行的进程(如 Kafka Consumer)。浏览器主要用于展示用户界面和与用户进行交互,而不是用于后台任务或消息处理。

  1. 运行测试
    需要在前端(浏览器)中显示或处理 Kafka 消息,你可以采取以下策略:
  • 通过 API 暴露数据:在服务器端运行一个 API 服务(如 RESTful API),该服务使用 Kafka Consumer 接收消息,并将消息数据作为 API 响应提供给前端。前端可以通过 AJAX 请求或其他方式与 API 进行通信,以获取所需的 Kafka 消息数据。
  • WebSocket:使用 WebSocket 技术建立前端与后端之间的持久连接。后端可以运行一个 WebSocket 服务器,该服务器使用 Kafka Consumer 接收消息,并通过 WebSocket 连接将消息推送到前端。这样,前端可以实时接收并显示 Kafka 消息。

如:http://127.0.0.1/producer.php

要在服务器端运行 PHP Kafka Consumer,你可以采取以下步骤:

  • 安装 Kafka PHP 客户端库:选择并安装一个支持 Kafka 的 PHP 客户端库,如 nmred/kafka-php 或其他类似的库。
  • 编写 PHP Consumer 脚本:使用所选的 Kafka PHP 客户端库编写一个 PHP 脚本,该脚本包含一个 Kafka Consumer,用于连接到 Kafka 集群并订阅感兴趣的主题。
  • 运行 PHP Consumer:在服务器端运行你的 PHP Consumer 脚本。这可以通过命令行、守护进程、容器或其他服务器管理工具来完成。确保 Consumer 脚本在后台持续运行,以便能够接收 Kafka 消息。
  • 处理接收到的消息:在 PHP Consumer 脚本中编写逻辑来处理接收到的 Kafka 消息。你可以将消息保存到数据库、触发其他服务或执行其他所需的操作。
  • 安全性考虑:确保你的 Kafka 集群和 PHP Consumer 脚本的安全性。使用适当的身份验证和授权机制来保护对 Kafka 的访问,并遵循最佳的安全实践来保护你的服务器和应用程序

如:在到消费端文件,执行 php consumer.php 即可.

消费端是持久化的,当生产端发送了消息后,消费端立即接收到消息。

Kafka在PHP运用场景

Kafka在PHP中有多种运用场景,这主要得益于Kafka作为一个高吞吐的分布式消息系统,能够解耦数据处理、缓存未处理消息等。以下是Kafka在PHP中的一些主要运用场景:

  1. 日志收集与监控:Kafka常被用于收集和分析系统日志、应用程序日志等。在PHP应用中,可以使用Kafka将日志消息发送到Kafka集群,然后利用Kafka的分布式处理能力,对这些日志进行实时分析或离线处理。
  2. 实时数据流处理:对于需要实时处理的数据流,如实时推荐系统、在线广告系统等,Kafka可以作为数据流的处理平台。PHP应用可以将实时数据发送到Kafka,然后由Kafka的Consumer进行实时处理和分析。
  3. 事件驱动的应用:在事件驱动的应用中,Kafka可以作为事件总线,负责在不同服务或组件之间传递事件消息。PHP应用可以将事件发送到Kafka,然后由其他服务或组件订阅这些事件并进行相应的处理。
  4. 分布式系统间的通信:在分布式系统中,Kafka可以作为不同系统或服务之间的通信桥梁。PHP应用可以将消息发送到Kafka,然后由其他系统或服务从Kafka中读取消息并进行处理。这种方式可以实现不同系统或服务之间的解耦和异步通信。
  5. 构建实时数据管道:Kafka可以与其他大数据工具(如Hadoop、Spark等)结合使用,构建实时数据管道。PHP应用可以将数据发送到Kafka,然后由Kafka将数据传递给其他大数据工具进行实时处理和分析。

对于在PHP中使用Kafka,有一些开源的Kafka客户端库可供选择,如nmred/kafka-phpPHP-RDKafka等。这些库提供了与Kafka集群进行交互的接口,使得PHP应用能够轻松地发送和接收Kafka消息。

消息队列的应用场景

消息队列主要有以下应用场景:

• 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,可以减少处理时间,如短信通知、文件下载、订单推送等。

• 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败,如人脸识别,文字翻译等不能做到业务系统里面,业务系统负责生产消息,人脸识别系统消费消息

• 限流削峰:广泛应用于秒杀或抢购活动中,使用消息队列避免流量过大导致应用系统挂掉的情况,1. 网关在接受到请求后,就把请求放入到消息队列里面 2.后端的服务从消息队列里面获取到请求,完成后续的秒杀处理流程。然后再给用户返回结果。优点:控制了流量 缺点:会让流程变慢

• 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理;如大量的物联网系统中,平台和硬件端的通讯需要消息队列

• 日志聚合:- 可以使用 Kafka 收集来自不同系统的日志并将其存储在集中式系统中以供进一步处理。

什么是发布/订阅模式

发布/订阅模式下包括三个角色:

• 角色主题(Topic)

• 发布者(Publisher)

• 订阅者(Subscriber)


返回 php 系列
avatar
懒觉猫先生
欢迎你们的到来!
关注我吧
最新文章
最新评论
正在加载中...
网站资讯
文章数目 :
176
已运行时间 :
本站总字数 :
119.7k
本站访客数 :
本站总访问量 :
最后更新时间 :