kafka简介

kafka 是一个高性能的实时消息系统,平均消息延迟仅在 10ms 左右(恐怖如斯~)一般我们潜意识把消息队列当成一个缓存堆积数据的地方,然后目标数据系统再慢慢从队列中消费数据处理,实际上是当成非实时离线计算,但是kafka如此出色的性能,完成可以支撑将其当成实时消息系统来使用。
基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等.

基本概念:

Broker : 和AMQP里协议的概念一样, 就是消息中间件所在的服务器
Topic(主题) : 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition(分区) : Partition是物理上的概念,体现在磁盘上面,每个Topic包含一个或多个Partition.
Producer : 负责发布消息到Kafka broker
Consumer : 消息消费者,向Kafka broker读取消息的客户端。
Consumer Group(消费者群组) : 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
offset 偏移量: 是kafka用来确定消息是否被消费过的标识,在kafka内部体现就是一个递增的数字

kafka安装

docker安装kafka 与测试

  1. Docker拉取 zookeeperkafka 的镜像,Kafka依赖zookeeper所以先安装zookeeper
1
2
docker pull zookeeper
docker pull wurstmeister/kafka
  1. 创建网络,让kafka与zookeeper在同一个网络中,可以方便它们之间的通信
1
docker network create --driver bridge zookeeper_network
  1. 启动zookeeper容器,并指定网络
1
2
3
4
5
6
 docker run -d --name zookeeper_kafka  --network zookeeper_network -p 2181:2181   zookeeper

#--name 容器名称
#--network 指定的网络
#-p:设置映射端口(默认2181)
#-d:后台启动
  1. 查看zookeeper的ip地址
1
2
docker inspect zookeeper_kafka
# 查看IPv4address字段,即为zookeeper的ip地址
  1. 进入容器,启动zooleeper服务
1
2
3
4
docker exec -it zookeeper_kafka /bin/bash
## 进入后
cd bin
zkCli.sh
  1. 部署安装kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 # 启动kafka
docker run -d --name kafka
--network zookeeper_network -p 9092:9092
-e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
wurstmeister/kafka

#--network 指定的网络
#-p:设置映射端口(默认2181)
#-d:后台启动
#KAFKA_BROKER_ID=0 kafka的broker的ID号,在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
#KAFKA_ZOOKEEPER_CONNECT 配置zookeeper管理kafka的路径
#KAFKA_ADVERTISED_LISTENERS 把kafka的地址端口注册给zookeeper
#KAFKA_LISTENERS 配置kafka的监听端口
  1. 启动kafka服务与测试
1
2
3
4
5
6
7
8
docker exec -it kafka bash

# 运行kafka生产者发送消息
kafka-console-producer.sh --broker-list localhost:9092 --topic test

#发送消息
>哈哈哈

再开启新的终端,运行kafka消费者消费消息

1
2
3
4
5
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

#收到消息
>哈哈哈

使用docker-compose部署kafka

创建docker-compose.yml文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
version: '3'
services:
zookeeper:
image: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=kafka #名称
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 #连接zookeeper的地址
- KAFKA_CREATE_TOPICS=test:1:1 #创建topic名称,分区数,副本数
depends_on:
- zookeeper #依赖zookeeper服务

进行执行

1
docker-compose up -d

也可完成搭建

kafka的基本操作

创建主题

在Kafka中,需要先创建主题(topics)才能进行数据的发送和接收。

1
2
3
4
5
6
7
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic demo --partitions 1 --replication-factor 1 
##输出 Created topic demo. 即主题创建成功

#解释:这里我们创建了一个 名为test1的主题,并为其设置了 1 个分区 1 个副本
#–replication-factor 1 #复制1份 如果选1以上,但Kafka集群中只有一个broker可用。因此,Kafka无法创建具有两个副本的分区,因为它没有足够的broker来存放这些副本。
#–partitions 1 #创建1个分区
#–topic #主题名为demo

查看主题

1
kafka-topics.sh --list --bootstrap-server localhost:9092

删除主题

1
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic demo  # 删除demo这个topic

压力测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
kafka-producer-perf-test.sh --topic demo --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=localhost:9092,localhost:9092,localhost:9092 batch.size=4096 linger.ms=0


# --record-size 1024 #消息大小为1024字节这指定了每条消息(或称为“记录”)的大小为1024字节。性能测试将会生成大小为1024字节的消息,并发送给Kafka集群。
# --num-records 1000000:这指定了生产者将要发送的消息总数为1,000,000条。当达到这个数量时,性能测试将会停止。
#--throughput 10000:这指定了期望的吞吐量,单位为“记录/秒”(records-per-second, RPS)。然而,请注意,这只是一个目标值,并不意味着性能测试将严格按照这个速度发送消息。
# --producer-props 参数:这指定了生产者的配置。这里,我们设置了三个相同的服务器作为bootstrap.servers参数的值。这意味着生产者将会尝试连接三个相同的服务器,并将消息发送到其中一个。
# bootstrap.servers 参数:列出了三个相同的 localhost:9092 地址作为引导服务器。这通常意味着您只有一个Kafka broker运行在这个地址上。除非您确实有三个不同的Kafka broker实例运行在同一个端口上(这通常不推荐,因为会导致端口冲突),否则您应该只提供一个broker的地址。
# batch.size 和 linger.ms 参数:您已经设置了batch.size=4096(这意味着生产者会尝试将这么多字节的消息组合成一个批次发送)和linger.ms=0(这意味着生产者不会等待特定的毫秒数来积累更多的消息到批次中,而是立即发送批次,如果当前有可用的消息要发送)。

##反馈信息

49972 records sent, 9994.4 records/sec (9.76 MB/sec), 62.8 ms avg latency, 376.0 ms max latency.
···
1000000 records sent, 9995.102400 records/sec (9.76 MB/sec), 9.94 ms avg latency, 719.00 ms max latency, 1 ms 50th, 10 ms 95th, 364 ms 99th, 699 ms 99.9th.

# 这表示在某一时间段内,生产者已经发送了49972条记录。平均每秒发送了9994.4条记录。平均每秒的吞吐量是9.76 MB/sec。注意这里的“MB”是指兆字节(Megabytes),而不是兆比特(Megabits)。
···
# 输出详细地展示了Kafka生产者的性能测试结果.测试期间总共发送了1,000,000条记录, 每秒平均发送了9995.1024条记录,相当于每秒9.76 MB的吞吐量。平均延迟是9.94毫秒。在测试期间,最大延迟达到了719毫秒.这意味着50%的消息(即中位数)的延迟低于或等于1毫秒.

#1 ms 50th:这意味着50%的消息(即中位数)的延迟低于或等于1毫秒。
#10 ms 95th:95%的消息的延迟低于或等于10毫秒。
#364 ms 99th:99%的消息的延迟低于或等于364毫秒。
#699 ms 99.9th:99.9%的消息的延迟低于或等于699毫秒。

发送和接收消息

Kafka提供了命令行工具来发送和接收消息。

消息收发的大致流程:生产者产生消息数据一>写入到Kafkal的Topic中一>消费者从Topic中读取消息数据

生产者(Producer)创建消息数据。
生产者将消息数据发送到Kafka的消息队列中。Kafka的节点包含一个或多个Broker,消息会存储在这些Broker中的Topic(主题)里面。不同类型的消息数据可以存储在不同的Topic中,这样可以利用Topic实现消息的分类。
在发送消息时,生产者还可以选择一些参数进行配置,例如批处理大小(batch.size)和延迟(linger.ms)等,来优化性能和吞吐量。


返回 php 系列
avatar
懒觉猫先生
欢迎你们的到来!
关注我吧
最新文章
最新评论
正在加载中...
网站资讯
文章数目 :
176
已运行时间 :
本站总字数 :
119.7k
本站访客数 :
本站总访问量 :
最后更新时间 :