kafka简介 kafka 是一个高性能的实时消息系统,平均消息延迟仅在 10ms 左右(恐怖如斯~)一般我们潜意识把消息队列当成一个缓存堆积数据的地方,然后目标数据系统再慢慢从队列中消费数据处理,实际上是当成非实时离线计算,但是kafka如此出色的性能,完成可以支撑将其当成实时消息系统来使用。 基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等.
基本概念:
Broker : 和AMQP里协议的概念一样, 就是消息中间件所在的服务器 Topic(主题) : 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处) Partition(分区) : Partition是物理上的概念,体现在磁盘上面,每个Topic包含一个或多个Partition. Producer : 负责发布消息到Kafka broker Consumer : 消息消费者,向Kafka broker读取消息的客户端。 Consumer Group(消费者群组) : 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。 offset 偏移量: 是kafka用来确定消息是否被消费过的标识,在kafka内部体现就是一个递增的数字
kafka安装 docker安装kafka 与测试
Docker拉取 zookeeper
与 kafka
的镜像,Kafka依赖zookeeper所以先安装zookeeper
1 2 docker pull zookeeper docker pull wurstmeister/kafka
创建网络,让kafka与zookeeper在同一个网络中,可以方便它们之间的通信
1 docker network create --driver bridge zookeeper_network
启动zookeeper容器,并指定网络
1 2 3 4 5 6 docker run -d --name zookeeper_kafka --network zookeeper_network -p 2181:2181 zookeeper
查看zookeeper的ip地址
1 2 docker inspect zookeeper_kafka
进入容器,启动zooleeper服务
1 2 3 4 docker exec -it zookeeper_kafka /bin/bash cd binzkCli.sh
部署安装kafka
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 docker run -d --name kafka --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
启动kafka服务与测试
1 2 3 4 5 6 7 8 docker exec -it kafka bash kafka-console-producer.sh --broker-list localhost:9092 --topic test >哈哈哈
再开启新的终端,运行kafka消费者消费消息
1 2 3 4 5 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning >哈哈哈
使用docker-compose部署kafka 创建docker-compose.yml文件,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 version: '3' services: zookeeper: image: zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: - KAFKA_ADVERTISED_HOST_NAME=kafka - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_CREATE_TOPICS=test :1:1 depends_on: - zookeeper
进行执行
也可完成搭建
kafka的基本操作 创建主题 在Kafka中,需要先创建主题(topics)才能进行数据的发送和接收。
1 2 3 4 5 6 7 kafka-topics.sh --bootstrap-server localhost:9092 --create --topic demo --partitions 1 --replication-factor 1
查看主题 1 kafka-topics.sh --list --bootstrap-server localhost:9092
删除主题 1 kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic demo
压力测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 kafka-producer-perf-test.sh --topic demo --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=localhost:9092,localhost:9092,localhost:9092 batch.size=4096 linger.ms=0 49972 records sent, 9994.4 records/sec (9.76 MB/sec), 62.8 ms avg latency, 376.0 ms max latency. ··· 1000000 records sent, 9995.102400 records/sec (9.76 MB/sec), 9.94 ms avg latency, 719.00 ms max latency, 1 ms 50th, 10 ms 95th, 364 ms 99th, 699 ms 99.9th. ···
发送和接收消息 Kafka提供了命令行工具来发送和接收消息。
消息收发的大致流程:生产者产生消息数据
一>写入到Kafkal的Topic中
一>消费者从Topic中读取消息数据
。
生产者(Producer)创建消息数据。 生产者将消息数据发送到Kafka的消息队列中。Kafka的节点包含一个或多个Broker,消息会存储在这些Broker中的Topic(主题)里面。不同类型的消息数据可以存储在不同的Topic中,这样可以利用Topic实现消息的分类。 在发送消息时,生产者还可以选择一些参数进行配置,例如批处理大小(batch.size)和延迟(linger.ms)等,来优化性能和吞吐量。
返回 php 系列