1. 什么是Kafka
Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish(发布)或者subscribe(订阅)消息,分布式提供了容错性,并发处理消息的机制。
kafka运行在集群上,集群包含一个或多个服务器。kafka把消息存在topic中,每一条消息包含键值(key),值(value)和时间戳(timestamp)。
2. kafka基本概念
producer: 消息生产者,就是向kafka broker发消息的客户端。
consumer: 消息消费者,是消息的使用方,从Kafka Broker 拉取消息,负责消费Kafka服务器上的消息。
topic: 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。你可以把它理解为一个队列,topic 将消息分类,生产者和消费者面向的是同一个 topic。
partition:消息分区,一个topic可以分为多个 partition,partition是相对于topic是在在物理上的概念,每个partition是一个有序的队列,partition中的每条消息都会被分配一个有序的id(offset)。
broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
consumer-group:消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
消费者组内每个消费者,负责消费不同分区的数据,提高消费能力。一个分区只能由组内一个消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
offset :消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。记录消费者消费的位置信息,监控数据消费到什么位置,当消费者挂掉,再重新恢复的时候,可以从消费位置继续消费。
Zookeeper: Kafka 集群能够正常工作,需要依赖于 zookeeper,zookeeper 帮助Kafka 存储和管理集群信息。
3.安装kafka
3.1 安装JAVA JDK
打开https://www.oracle.com/cn/java/technologies/downloads/#jdk20-windows,选择Windows 64位安装包下载
安装好之后,将jdk路径添加到系统环境变量中(根据自己的安装路径来)
JAVA_HOME=D:\Program Files\Java\jdk-20
Path新增一条(根据自己的安装路径来)
D:\Program Files\Java\jdk-20\bin
3.2安装kafka
打开https://kafka.apache.org/downloads,(Windows环境)选择二进制版本安装
将文件解压到任意目录,我这里是D:\Software\kafka_2.12-3.4.0
打开D:\Software\kafka_2.12-3.4.0\bin\config\server.properties
将这两行解除注释,并修改成本地配置
listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://localhost:9092
如需修改Zookeeper端口(默认端口2181),打开D:\Software\kafka_2.12-3.4.0\bin\config\zookeeper.properties
dataDir=/tmp/zookeeper clientPort=2181 # 这里修改成你想要的端口即可 maxClientCnxns=0 admin.enableServer=false
4.启动kafka和zookeeper
进入存放卡夫卡的目录,我这里是在D:\Software\kafka_2.12-3.4.0\bin\windows
在这个路径下打开两个cmd窗口,分别执行以下两条命令:
zookeeper-server-start.bat ..\..\config\zookeeper.properties kafka-server-start.bat ..\..\config\server.properties
运行生产者和消费者demo
生产者.py
from kafka import KafkaProducer import json producer = KafkaProducer(bootstrap_servers=['localhost:9092']) # 生成一条消息并将其序列化为json格式 message = {'foo': 'bar'} serialized_message = json.dumps(message).encode('utf-8') # 发送消息到Kafka集群上名为“test_topic”的主题 producer.send('test_topic', value=serialized_message) # 关闭生产者实例 producer.close()
消费者.py
from kafka import KafkaConsumer import json consumer = KafkaConsumer( 'test_topic', # 消费的主题名称 group_id='my-group', # 消费者组ID bootstrap_servers=['localhost:9092'] # Kafka集群地址 ) # 循环获取消息并处理它们 for msg in consumer: # 解序列化json格式的消息 deserialized_message = json.loads(msg.value.decode('utf-8')) # 处理消息 print(deserialized_message) # 手动提交偏移量以确保不会重复消费相同的消息 consumer.commit() # 关闭消费者实例 consumer.close()
-End-