一、前言¶
本文主要以下几方面介绍Helm安装kafka集群:
- 两种方式安装kafka集群
- kafka集群测试
- kafka集群扩容
- kafka集群删除
二、安装kafka集群¶
注意:在下面的演示环境中,安装kafka集群前,需要提前安装完成
下面以Chart包和命令行两种方式分别介绍如何安装kafka集群
2.1 Chart包方式安装kafka集群¶
1.查看kafka的Chart包的历史版本
[root@k8s-master01 ~]# helm search repo bitnami/kafka -l
NAME CHART VERSION APP VERSION DESCRIPTION
bitnami/kafka 23.0.1 3.5.0 Apache Kafka is a distributed streaming platfor...
bitnami/kafka 23.0.0 3.5.0 Apache Kafka is a distributed streaming platfor...
bitnami/kafka 22.1.6 3.4.1 Apache Kafka is a distributed streaming platfor...
bitnami/kafka 22.1.5 3.4.1 Apache Kafka is a distributed streaming platfor...
2.下载最新chart包
[root@k8s-master01 ~]# helm pull bitnami/kafka
如果想要下载指定版本,需要指定--version参数
[root@k8s-master01 ~]# helm pull bitnami/kafka --version 23.0.0
3.解压chart包
[root@k8s-master01 ~]# tar -xf kafka-23.0.1.tgz
4.修改values.yaml相应配置
需要修改replicaCount的值为3
$ cd /root/kafka
$ vim values.yaml

根据自己需要修改image
$ cd /root/kafka
$ vim values.yaml
...
...
image:
registry: registry.cn-hangzhou.aliyuncs.com
repository: abroad_images/kafka
tag: 3.5.0-debian-11-r1
image:
registry: registry.cn-hangzhou.aliyuncs.com
repository: abroad_images/kubectl
tag: 1.25.11-debian-11-r4
image:
registry: registry.cn-hangzhou.aliyuncs.com
repository: abroad_images/bitnami-shell
tag: 11-debian-11-r130
image:
registry: registry.cn-hangzhou.aliyuncs.com
repository: abroad_images/kafka-exporter
tag: 1.7.0-debian-11-r11
image:
registry: registry.cn-hangzhou.aliyuncs.com
repository: abroad_images/jmx-exporter
tag: 0.18.0-debian-11-r34
...
...
如果使用指定的zookeeper,kraft模式要关闭,修改kraft.enable 的值为false,新版kafka新增了一个kraft模式,他与zookeeper是冲突的,不能同时使用

根据自己需要修改持久化配置,这里因为是测试环境没有使用持久化(生产必须要使用持久化)。其中修改内容如下:
- enabled修改为false
- 注释existingClaim
- 注释storageClass
$ cd /root/kafka
$ vim values.yaml

修改externalZookeeper.servers为zookeeper,如果不知道可以通过以下命令查询
[root@k8s-master01 zookeeper]# kubectl get svc -n public-service
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
zookeeper ClusterIP 10.0.43.52 <none> 2181/TCP,2888/TCP,3888/TCP 19m
zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 19m

5.开始安装
[root@k8s-master01 kafka]# helm install -n public-service kafka .
6.观察到kafka集群已经安装完成
[root@k8s-master01 zookeeper]# kubectl get po -n public-service
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 2m46s
kafka-1 1/1 Running 0 2m45s
kafka-2 1/1 Running 0 2m45s
zookeeper-0 1/1 Running 0 33m
zookeeper-1 1/1 Running 0 33m
zookeeper-2 1/1 Running 0 33m
7.查看安装的版本信息
[root@k8s-master01 kafka]# helm list -n public-service
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
kafka public-service 1 2023-07-08 17:37:32.729046912 +0800 CST deployed kafka-23.0.1 3.5.0
zookeeper public-service 1 2023-07-08 17:16:23.567379001 +0800 CST deployed zookeeper-11.4.3 3.8.1
8.验证kafka与zookeeper是否绑定,观察到已成功绑定
[root@k8s-master01 kafka]# kubectl logs -f kafka-0 -n public-service | grep socket

2.2 命令行方式安装kafka集群¶
1.直接安装
$ helm install kafka bitnami/kafka --set zookeeper.enabled=false --set replicaCount=3 --set externalZookeeper.servers=zookeeper --set persistence.enabled=false -n public-service
2.观察到kafka集群已经安装完成
[root@k8s-master01 zookeeper]# kubectl get po -n public-service
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 2m46s
kafka-1 1/1 Running 0 2m45s
kafka-2 1/1 Running 0 2m45s
zookeeper-0 1/1 Running 0 33m
zookeeper-1 1/1 Running 0 33m
zookeeper-2 1/1 Running 0 33m
3.查看安装的版本信息
[root@k8s-master01 kafka]# helm list -n public-service
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
kafka public-service 1 2023-07-08 17:37:32.729046912 +0800 CST deployed kafka-23.0.1 3.5.0
zookeeper public-service 1 2023-07-08 17:16:23.567379001 +0800 CST deployed zookeeper-11.4.3 3.8.1
4.验证kafka与zookeeper是否绑定,观察到已成功绑定
[root@k8s-master01 kafka]# kubectl logs -f kafka-0 -n public-service | grep socket

5.查看安装的values
[root@k8s-master01 zookeeper]# helm get values kafka -n public-service
USER-SUPPLIED VALUES:
externalZookeeper:
servers: zookeeper
persistence:
enabled: false
replicaCount: 3
zookeeper:
enabled: false
三、kafka集群测试¶
这里通过两种方式测试下kafka集群,区别只是一个是新起一个容器进行测试,另一个则是在原来的基础进行测试:
3.1 方式一¶
新起一个容器
1.运行一个kafka-client,用于连接kafka集群
[root@k8s-master01 kafka]# kubectl run kafka-client --restart='Never' --image registry.cn-hangzhou.aliyuncs.com/abroad_images/kafka:3.5.0-debian-11-r1 --namespace public-service --command -- sleep infinity
上面参数说明:
kubectl run kafka-client: 使用kubectl命令创建一个名为kafka-client的 Pod--restart='Never': 设置 Pod 的重启策略为 "Never",这意味着 Pod 不会自动重启--image registry.cn-hangzhou.aliyuncs.com/abroad_images/kafka:3.5.0-debian-11-r1: 指定要在 Pod 中使用的容器镜像。这里使用的是registry.cn-hangzhou.aliyuncs.com/abroad_images/kafka:3.5.0-debian-11-r1镜像--namespace public-service: 指定要在名为public-service的命名空间中创建 Pod--command -- sleep infinity: 在容器中执行命令sleep infinity,以保持 Pod 持续运行。--command表示后面的内容是一个命令而不是一个参数,sleep infinity是一个常用的命令,使得容器无限期地休眠
查看pod,已成功建立
[root@k8s-master01 kafka]# kubectl get po -n public-service
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 163m
kafka-1 1/1 Running 0 163m
kafka-2 1/1 Running 0 163m
kafka-client 1/1 Running 0 52s
zookeeper-0 1/1 Running 0 3h4m
zookeeper-1 1/1 Running 0 3h4m
zookeeper-2 1/1 Running 0 3h4m
2.在k8s-master01节点上开启两个窗口,一个用于生产者,一个用作消费者。 (1)生产者窗口
进入kafka创建一个名为test的topic,出现>代表成功
[root@k8s-master01 kafka]# kubectl exec -it kafka-client -n public-service -- bash
I have no name!@kafka-client:/$ cd /opt/bitnami/kafka/bin
I have no name!@kafka-client:/opt/bitnami/kafka/bin$ kafka-console-producer.sh --broker-list kafka-0.kafka-headless.public-service.svc.cluster.local:9092,kafka-1.kafka-headless.publiservice.svc.cluster.local:9092,kafka-2.kafka-headless.public-service.svc.cluster.local:9092 --topic test
>
上面参数说明:
-
kafka-console-producer.sh:用于创建生产者 -
--broker-list kafka-0.kafka-headless.public-service.svc.cluster.local:9092,kafka-1.kafka-headless.public-service.svc.cluster.local:9092,kafka-2.kafka-headless.public-service.svc.cluster.local:9092:指定要连接的 Kafka Broker 列表。使用逗号分隔多个 Broker 的地址。在这里,指定了三个 Kafka Broker 的地址 --topic test:指定要发布消息的主题名称,这里使用的是 "test"
(2)消费者窗口
[root@k8s-master01 kafka]# kubectl exec -it kafka-client -n public-service -- bash
I have no name!@kafka-client:/$ cd /opt/bitnami/kafka/bin/
I have no name!@kafka-client:/opt/bitnami/kafka/bin$ kafka-console-consumer.sh --bootstrap-server kafka.public-service.svc.cluster.local:9092 --topic test --from-beginning
上面参数说明:
kafka-console-consumer.sh:用于启动消费者--bootstrap-server localhost:9092:指定用于引导连接到 Kafka 集群的 Kafka Broker 的地址。使用的是本地主机(localhost)上的 Kafka Broker,并监听 9092 端口--topic test:指定要发布消息的主题名称,这里使用的是 "test"--from-beginning:设置消费者从主题的开始处开始消费消息。这意味着消费者将从主题中的最早可用消息开始消费
3.开始测试,观察到消费正常
(1)生产者窗口
>test2
>test1
>
(2)消费者窗口
test2
test1
3.2 方式二¶
1.进入kafka创建一个名为testtopic的topic
[root@k8s-master01 kafka]# kubectl exec -it kafka-0 -n public-service -- bash
[root@k8s-master01 kafka]# cd /opt/bitnami/kafka/bin
I have no name!@kafka-0:/opt/bitnami/kafka/bin$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testtopic
Created topic testtopic.
上面参数说明:
--create:指示kafka-topics.sh命令创建一个新的主题kafka-topics.sh:用于创建topic--bootstrap-server localhost:9092:指定用于引导连接到 Kafka 集群的 Kafka Broker 的地址。使用的是本地主机(localhost)上的 Kafka Broker,并监听 9092 端口--replication-factor 1:设置主题的副本因子(replication factor),指定每个分区的副本数量。--partitions 1:设置主题的分区数,指定要创建的分区数量--topic testtopic:指定要创建的主题的名称,这里使用的是 "testtopic"
2.启动消费者
I have no name!@kafka-0:/opt/bitnami/kafka/bin$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic
上面参数说明:
kafka-console-consumer.sh:用于创建消费者--bootstrap-server localhost:9092:指定用于引导连接到 Kafka 集群的 Kafka Broker 的地址。使用的是本地主机(localhost)上的 Kafka Broker,并监听 9092 端口
3.新起一个窗口后,进入kafka,启动一个生产者后,输出hello字段
[root@k8s-master01 kafka]# kubectl exec -it kafka-0 -n public-service -- bash
I have no name!@kafka-0:/$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic testtopic
>hello
上面参数说明:
kafka-console-consumer.sh:用于创建生产者--bootstrap-server localhost:9092:指定用于引导连接到 Kafka 集群的 Kafka Broker 的地址。使用的是本地主机(localhost)上的 Kafka Broker,并监听 9092 端口
4.在消费者窗口上进行查看,观察到消费正常
I have no name!@kafka-0:/opt/bitnami/kafka/bin$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic
hello
四、kafka集群扩容¶
关于kafka集群扩容,这里介绍两种方式:一种是修改副本数进行扩容,另一种是使用helm upgrade进行扩容
4.1 方式一¶
1.修改values.yaml相应配置,搜索replicaCount,将副本数修改为5
[root@k8s-master01 ~]# cd /root/kafka
[root@k8s-master01 kafka]# vim values.yaml

2.开始扩容
[root@k8s-master01 ~]# cd /root/kafka
[root@k8s-master01 kafka]# helm upgrade -n public-service kafka .
3.查看pod建立情况,观察到已经成功扩容
[root@k8s-master01 kafka]# kubectl get po -n public-service
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 3h15m
kafka-1 1/1 Running 0 3h15m
kafka-2 1/1 Running 0 3h15m
kafka-3 1/1 Running 0 82s
kafka-4 1/1 Running 0 82s
kafka-client 1/1 Running 0 32m
zookeeper-0 1/1 Running 0 3h36m
zookeeper-1 1/1 Running 0 3h36m
zookeeper-2 1/1 Running 0 3h36m
4.2 方式二¶
其实这种方式只针对命令行方式安装kafka集群
1.直接使用helm upgrade命令进行扩容
$ helm upgrade kafka bitnami/kafka --set zookeeper.enabled=false --set replicaCount=3 --set externalZookeeper.servers=zookeeper --set persistence.enabled=false -n public-service
2.查看pod建立情况,观察到已经成功扩容
[root@k8s-master01 kafka]# kubectl get po -n public-service
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 3h15m
kafka-1 1/1 Running 0 3h15m
kafka-2 1/1 Running 0 3h15m
kafka-3 1/1 Running 0 82s
kafka-4 1/1 Running 0 82s
kafka-client 1/1 Running 0 32m
zookeeper-0 1/1 Running 0 3h36m
zookeeper-1 1/1 Running 0 3h36m
zookeeper-2 1/1 Running 0 3h36m
五、kafka集群删除¶
1.查看安装的集群
[root@k8s-master01 kafka]# helm list -A
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
kafka public-service 2 2023-07-08 20:51:17.114862828 +0800 CST deployed kafka-23.0.1 3.5.0
zookeeper public-service 1 2023-07-08 17:16:23.567379001 +0800 CST deployed zookeeper-11.4.3 3.8.1
2.删除kafka集群
[root@k8s-master01 kafka]# helm delete kafka -n public-service