在Windows环境下安装运行Kafka,并用python实现生产者和消费者端小demo

1. 什么是Kafka

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish(发布)或者subscribe(订阅)消息,分布式提供了容错性,并发处理消息的机制。

kafka运行在集群上,集群包含一个或多个服务器。kafka把消息存在topic中,每一条消息包含键值(key),值(value)和时间戳(timestamp)。

2. kafka基本概念

producer: 消息生产者,就是向kafka broker发消息的客户端。
consumer: 消息消费者,是消息的使用方,从Kafka Broker 拉取消息,负责消费Kafka服务器上的消息。
topic: 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。你可以把它理解为一个队列,topic 将消息分类,生产者和消费者面向的是同一个 topic。
partition:消息分区,一个topic可以分为多个 partition,partition是相对于topic是在在物理上的概念,每个partition是一个有序的队列,partition中的每条消息都会被分配一个有序的id(offset)。
broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
consumer-group:消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
消费者组内每个消费者,负责消费不同分区的数据,提高消费能力。一个分区只能由组内一个消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
offset :消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。记录消费者消费的位置信息,监控数据消费到什么位置,当消费者挂掉,再重新恢复的时候,可以从消费位置继续消费。
Zookeeper: Kafka 集群能够正常工作,需要依赖于 zookeeper,zookeeper 帮助Kafka 存储和管理集群信息。

3.安装kafka

3.1 安装JAVA JDK

打开https://www.oracle.com/cn/java/technologies/downloads/#jdk20-windows,选择Windows 64位安装包下载

安装好之后,将jdk路径添加到系统环境变量中(根据自己的安装路径来)

JAVA_HOME=D:\Program Files\Java\jdk-20

Path新增一条(根据自己的安装路径来)

D:\Program Files\Java\jdk-20\bin

3.2安装kafka

打开https://kafka.apache.org/downloads,(Windows环境)选择二进制版本安装

将文件解压到任意目录,我这里是D:\Software\kafka_2.12-3.4.0
打开D:\Software\kafka_2.12-3.4.0\bin\config\server.properties
将这两行解除注释,并修改成本地配置

listeners=PLAINTEXT://:9092

advertised.listeners=PLAINTEXT://localhost:9092

如需修改Zookeeper端口(默认端口2181),打开D:\Software\kafka_2.12-3.4.0\bin\config\zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181 # 这里修改成你想要的端口即可
maxClientCnxns=0
admin.enableServer=false

4.启动kafka和zookeeper

进入存放卡夫卡的目录,我这里是在D:\Software\kafka_2.12-3.4.0\bin\windows
在这个路径下打开两个cmd窗口,分别执行以下两条命令:

zookeeper-server-start.bat ..\..\config\zookeeper.properties
kafka-server-start.bat ..\..\config\server.properties

成功启动两个服务后:

运行生产者和消费者demo

生产者.py

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# 生成一条消息并将其序列化为json格式
message = {'foo': 'bar'}
serialized_message = json.dumps(message).encode('utf-8')

# 发送消息到Kafka集群上名为“test_topic”的主题
producer.send('test_topic', value=serialized_message)

# 关闭生产者实例
producer.close()

消费者.py

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'test_topic',  # 消费的主题名称
    group_id='my-group',  # 消费者组ID
    bootstrap_servers=['localhost:9092']  # Kafka集群地址
)

# 循环获取消息并处理它们
for msg in consumer:
    # 解序列化json格式的消息
    deserialized_message = json.loads(msg.value.decode('utf-8'))

    # 处理消息
    print(deserialized_message)

    # 手动提交偏移量以确保不会重复消费相同的消息
    consumer.commit()

# 关闭消费者实例
consumer.close()

-End-

风影OvO

风影OvO, 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA 4.0协议进行授权 | 转载请注明原文链接

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐