1. Kafka Deployment

1.1 Kafka Deployment Overview

Kafka version selection

Kafka is implemented in Scala, so each Kafka release is built against a specific Scala version, and releases are published for multiple Scala versions. This only matters if you write code in Scala and want a build that matches the Scala version you use; otherwise any build will work.

Kafka download link

http://kafka.apache.org/downloads

Kafka release naming format

kafka_<Scala version>-<Kafka version>
# Examples:
kafka_2.13-2.7.0.tgz
kafka_2.12-3.9.0.tgz


Scala language official site: https://www.scala-lang.org/

The relationship between Scala and Java: https://baike.baidu.com/item/Scala/2462287?fr=aladdin

Kafka supports both single-node and cluster deployment; production environments normally run a cluster.

Official documentation:

http://kafka.apache.org/quickstart


1.2 Single-Node Deployment

1.2.1 Installation Notes

Note: the scripts shipped with Kafka do not support being symlinked into /usr/local/bin/; they must be run via absolute or relative paths.
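As a workaround, you can put the Kafka bin directory on PATH instead of symlinking individual scripts; a minimal sketch, assuming Kafka is extracted to /usr/local/kafka:

#Add the Kafka bin directory to PATH rather than symlinking scripts into /usr/local/bin/
echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
. /etc/profile.d/kafka.sh
kafka-topics.sh --version   #scripts now resolve from any directory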

Download the latest Kafka release and extract it:

$ apt update && apt -y install openjdk-8-jdk 
$ tar -xzf kafka_2.13-3.4.0.tgz
$ cd kafka_2.13-3.4.0

Apache Kafka can be started using ZooKeeper or KRaft. To get started with either configuration, follow one of the sections below but not both.

Kafka with ZooKeeper

Starting with Kafka 4.0, ZooKeeper is no longer supported.


Run the following commands in order to start all services in the correct order:

# Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties

Open another terminal session and run:

#The default Kafka configuration file already specifies the ZooKeeper server address; adjust it as needed
[root@ubuntu2404 ~]#grep zookeeper /usr/local/kafka/config/server.properties

# Zookeeper connection string (see zookeeper docs for details).
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

#Start single-node Kafka
$ bin/kafka-server-start.sh config/server.properties
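To confirm the broker registered itself in ZooKeeper, the bundled client can query the brokers znode (a quick check, assuming the defaults above; the default broker.id is 0):

#List registered broker ids; expect [0] for a default single broker
$ bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids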

1.2.2 Single-Node Kafka with ZooKeeper

Note: Kafka 4.0.0 no longer supports ZooKeeper.

Example: deploying single-node Kafka with ZooKeeper

#Note host name resolution: by default Kafka resolves the current host name to an IP, which may resolve to the wrong address,
#so map the current host name to the correct IP via DNS or the hosts file
[root@ubuntu2404 ~]#hostname
ubuntu2404.wang.org
[root@ubuntu2404 ~]#vim /etc/hosts
10.0.0.201 ubuntu2404.wang.org

#Multiple Java versions are supported
[root@ubuntu2404 ~]#apt update && apt install -y openjdk-21-jdk
[root@ubuntu2404 ~]#apt update && apt install -y openjdk-8-jdk

#Download a Kafka 3.x release
[root@ubuntu2404 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.9.1/kafka_2.13-3.9.1.tgz
[root@ubuntu2404 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.9.0/kafka_2.13-3.9.0.tgz
[root@ubuntu2404 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.8.0/kafka_2.13-3.8.0.tgz

[root@ubuntu2404 ~]#tar xf kafka_2.13-3.8.0.tgz -C /usr/local/
[root@ubuntu2404 ~]#cd /usr/local/
[root@ubuntu2404 local]#ln -s kafka_2.13-3.8.0/ kafka

#Enter the kafka directory
[root@ubuntu2404 local]#cd kafka

#Edit the ZooKeeper configuration
[root@ubuntu2404 kafka]#vim config/zookeeper.properties
dataDir=/data/zookeeper  #created automatically, no need to create it manually

#Start the bundled ZooKeeper
[root@ubuntu2404 kafka]#bin/zookeeper-server-start.sh config/zookeeper.properties

#Edit the Kafka configuration
[root@ubuntu2404 kafka]#vim config/server.properties
log.dirs=/data/kafka-logs  #created automatically, no need to create it manually

#Start Kafka
[root@ubuntu2404 kafka]#bin/kafka-server-start.sh config/server.properties

#Check listening ports
[root@ubuntu2404 ~]#ss -ntlp
State           Recv-Q         Send-Q                   Local Address:Port                   Peer Address:Port         Process
LISTEN          0              128                        127.0.0.1:6010                      0.0.0.0:*             users:(("sshd",pid=1055,fd=7))
LISTEN          0              128                        127.0.0.1:6011                      0.0.0.0:*             users:(("sshd",pid=1568,fd=7))
LISTEN          0              128                        127.0.0.1:6012                      0.0.0.0:*             users:(("sshd",pid=4986,fd=7))
LISTEN          0              128                        127.0.0.1:6013                      0.0.0.0:*             users:(("sshd",pid=5554,fd=7))
LISTEN          0              4096                   127.0.0.53%lo:53                        0.0.0.0:*             users:(("systemd-resolve",pid=794,fd=14))
LISTEN          0              128                          0.0.0.0:22                        0.0.0.0:*             users:(("sshd",pid=839,fd=3))
LISTEN          0              128                            [::1]:6010                        [::]:*             users:(("sshd",pid=1055,fd=5))
LISTEN          0              128                            [::1]:6011                        [::]:*             users:(("sshd",pid=1568,fd=5))
LISTEN          0              128                            [::1]:6012                        [::]:*             users:(("sshd",pid=4986,fd=5))
LISTEN          0              128                            [::1]:6013                        [::]:*             users:(("sshd",pid=5554,fd=5))
LISTEN          0              50                                 *:42243                           *:*             users:(("java",pid=5088,fd=147))
LISTEN          0              50                                 *:9092                            *:*             users:(("java",pid=5088,fd=187))
LISTEN          0              50                                 *:2181                            *:*             users:(("java",pid=4556,fd=157))
LISTEN          0              50                                 *:45197                           *:*             users:(("java",pid=4556,fd=147))
LISTEN          0              128                             [::]:22                           [::]:*             users:(("sshd",pid=839,fd=4))
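With ZooKeeper and Kafka both listening, a quick produce/consume round trip confirms the broker works end to end (a minimal smoke test; the topic name test is arbitrary):

#Create a test topic, write one message, then read it back (Ctrl+C to stop the consumer)
[root@ubuntu2404 kafka]#bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
[root@ubuntu2404 kafka]#echo hello | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
[root@ubuntu2404 kafka]#bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning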

Example: one-step single-node Kafka installation script

#!/bin/bash

KAFKA_VERSION=3.6.0
SCALA_VERSION=2.13
KAFKA_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
KAFKA_INSTALL_DIR=/usr/local/kafka
HOST=`hostname -I|awk '{print $1}'`

. /etc/os-release

color () {
    RES_COL=60
    MOVE_TO_COL="echo -en \\033[${RES_COL}G"
    SETCOLOR_SUCCESS="echo -en \\033[1;32m"
    SETCOLOR_FAILURE="echo -en \\033[1;31m"
    SETCOLOR_WARNING="echo -en \\033[1;33m"
    SETCOLOR_NORMAL="echo -en \E[0m"
    echo -n "$1" && $MOVE_TO_COL
    echo -n "["
    if [ $2 = "success" -o $2 = "0" ] ;then
        ${SETCOLOR_SUCCESS}
        echo -n $" OK "
    elif [ $2 = "failure" -o $2 = "1" ] ;then
        ${SETCOLOR_FAILURE}
        echo -n $"FAILED"
    else
        ${SETCOLOR_WARNING}
        echo -n $"WARNING"
    fi
    ${SETCOLOR_NORMAL}
    echo -n "]"
    echo
}

env () {
    echo "$HOST $(hostname)" >> /etc/hosts
}

install_jdk() {
    java -version &>/dev/null && { color "JDK is already installed!" 1 ; return; }
    if command -v yum &>/dev/null ; then
        yum -y install java-1.8.0-openjdk-devel || { color "Failed to install JDK!" 1; exit 1; }
    elif command -v apt &>/dev/null ; then
        apt update
        apt install openjdk-8-jdk -y || { color "Failed to install JDK!" 1; exit 1; }
    else
        color "Unsupported operating system!" 1
        exit 1
    fi
    java -version && { color "JDK installation complete!" 0 ; } || { color "Failed to install JDK!" 1; exit 1; }
}

install_zookeeper() {
    cat > ${KAFKA_INSTALL_DIR}/bin/zookeeper-startup.sh <<EOF
#!/bin/bash
nohup ${KAFKA_INSTALL_DIR}/bin/zookeeper-server-start.sh ${KAFKA_INSTALL_DIR}/config/zookeeper.properties &
EOF
    chmod +x ${KAFKA_INSTALL_DIR}/bin/zookeeper-startup.sh

    cat > /lib/systemd/system/zookeeper.service <<EOF
[Unit]
Description=zookeeper.service
After=network.target

[Service]
Type=forking
ExecStart=${KAFKA_INSTALL_DIR}/bin/zookeeper-startup.sh
ExecStop=${KAFKA_INSTALL_DIR}/bin/zookeeper-server-stop.sh

[Install]
WantedBy=multi-user.target
EOF

    systemctl daemon-reload
    systemctl enable --now zookeeper.service

    if [ $? -eq 0 ] ;then
        color "zookeeper installed successfully!" 0
    else
        color "zookeeper installation failed!" 1
        exit 1
    fi
}

install_kafka(){
    if [ ! -f /usr/local/src/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz ];then
        wget -P /usr/local/src/ --no-check-certificate $KAFKA_URL || { color "Download failed!" 1 ; exit 1; }
    fi

    tar xf /usr/local/src/${KAFKA_URL##*/} -C /usr/local/
    ln -s /usr/local/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_INSTALL_DIR}

    install_zookeeper

    echo "PATH=${KAFKA_INSTALL_DIR}/bin:'\$PATH'" >> /etc/profile

    cat > ${KAFKA_INSTALL_DIR}/bin/kafka-startup.sh <<EOF
#!/bin/bash
nohup ${KAFKA_INSTALL_DIR}/bin/kafka-server-start.sh ${KAFKA_INSTALL_DIR}/config/server.properties &
EOF
    chmod +x ${KAFKA_INSTALL_DIR}/bin/kafka-startup.sh

    cat > /lib/systemd/system/kafka.service <<EOF
[Unit]
Description=Apache kafka
After=network.target zookeeper.service

[Service]
Type=forking
ExecStart=${KAFKA_INSTALL_DIR}/bin/kafka-startup.sh
ExecStop=/bin/kill -TERM \${MAINPID}
Restart=always
RestartSec=20

[Install]
WantedBy=multi-user.target
EOF

    systemctl daemon-reload
    systemctl enable --now kafka.service

    if [ $? -eq 0 ] ;then
        color "kafka installed successfully!" 0
    else
        color "kafka installation failed!" 1
        exit 1
    fi
}

env
install_jdk
install_kafka
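Save the script under any name (install_kafka_single.sh is a hypothetical example), run it once on the target host, and check both services afterwards:

[root@ubuntu2404 ~]#bash install_kafka_single.sh
#Verify that both units are active
[root@ubuntu2404 ~]#systemctl status zookeeper.service kafka.service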


1.2.3 Single-Node Kafka with KRaft

Note: Kafka 4.0.0 no longer supports ZooKeeper and uses KRaft instead.

Example: deploying single-node Kafka with KRaft

# Kafka 4.x official standard deployment: KRaft mode, no ZooKeeper
# https://kafka.apache.org/quickstart

[root@ubuntu2404 ~]#apt update && apt -y install openjdk-21-jdk

# Download a Kafka 4.x release
[root@ubuntu2404 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/4.1.0/kafka_2.13-4.1.0.tgz
[root@ubuntu2404 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/4.0.0/kafka_2.13-4.0.0.tgz

# Extract and create a symlink
[root@ubuntu2404 ~]#tar xf kafka_2.13-4.0.0.tgz -C /usr/local/
[root@ubuntu2404 ~]#cd /usr/local/
[root@ubuntu2404 local]#ln -s kafka_2.13-4.0.0/ kafka
[root@ubuntu2404 local]#cd kafka

# Generate the unique cluster ID (shared by all nodes)
[root@ubuntu2404 kafka]#KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

# Change the data storage directory (important)
[root@ubuntu2404 kafka]#vim config/server.properties
log.dirs=/data/kraft-combined-logs  # created automatically, no need to create it manually

# Format the storage directory
[root@ubuntu2404 kafka]#bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

# Start Kafka in KRaft mode (standard 4.x command)
[root@ubuntu2404 kafka]#bin/kafka-server-start.sh config/server.properties

# Check ports: 9092 = Kafka, 9093 = Controller
[root@ubuntu2404 kafka]#ss -ntlp|grep java
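Beyond the port check, the KRaft quorum itself can be inspected with the bundled tool (available since Kafka 3.3):

#Describe the metadata quorum status (leader id, high watermark, etc.)
[root@ubuntu2404 kafka]#bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status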

1.3 Cluster Deployment

1.3.1 ZooKeeper-Based Cluster (Key Topic)

1.3.1.1 Environment Preparation

1.3.1.1.1 Host Name Resolution

Note: host name resolution must be prepared in advance on every Kafka node, or deployment will fail.

Change the host name of each Kafka node

#Change the host name on each Kafka node
[root@ubuntu2404 ~]#hostnamectl hostname node1
[root@ubuntu2404 ~]#hostnamectl hostname node2
[root@ubuntu2404 ~]#hostnamectl hostname node3

#Configure host name resolution on all Kafka nodes
[root@ubuntu2404 ~]#cat /etc/hosts
10.0.0.201 node1
10.0.0.202 node2
10.0.0.203 node3
1.3.1.1.2 Install Java
[root@ubuntu24.04-node1 ~]#apt update && apt -y install openjdk-21-jre
[root@ubuntu24.04-node1 ~]#java -version
openjdk version "21.0.6" 2025-01-21
OpenJDK Runtime Environment (build 21.0.6+7-Ubuntu-124.04.1)
OpenJDK 64-Bit Server VM (build 21.0.6+7-Ubuntu-124.04.1, mixed mode, sharing)
[root@node1 ~]#apt update && apt install openjdk-8-jdk -y
1.3.1.1.3 Install and Configure ZooKeeper

Kafka versions before 4.0 depend on the ZooKeeper service; 4.0 and later no longer do.

http://kafka.apache.org/quickstart
Note: Soon, ZooKeeper will no longer be required by Apache Kafka.

Environment

#Deploy ZooKeeper and Kafka in advance on three Ubuntu 24.04 nodes (the three nodes host both services)
node1:10.0.0.201 
node2:10.0.0.202 
node3:10.0.0.203

Note: if you use the ZooKeeper bundled with Kafka, its configuration file must be modified, as in the example below.

Example:

# Note: when using the ZooKeeper bundled with Kafka, edit its configuration file
[root@ubuntu24.04-node1 ~]#apt update && apt -y install openjdk-21-jre
[root@node1 ~]#apt update && apt install openjdk-8-jdk -y

# Download Kafka
[root@node1 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.9.2/kafka_2.13-3.9.2.tgz
[root@node1 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.8.0/kafka_2.13-3.8.0.tgz

# Extract and create a symlink
[root@node1 ~]#tar xf kafka_2.13-3.8.0.tgz -C /usr/local
[root@node1 ~]#ln -s /usr/local/kafka_2.13-3.8.0/ /usr/local/kafka
[root@node1 ~]#ls /usr/local/kafka

# Cluster configuration (the timing parameters below are mandatory)
[root@node1 ~]#vim /usr/local/kafka/config/zookeeper.properties

# The timing settings below must be added
tickTime=2000
initLimit=10
syncLimit=5

# Keep the following lines
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

# Add the cluster settings below
dataDir=/usr/local/kafka/data/
server.1=10.0.0.201:2888:3888
server.2=10.0.0.202:2888:3888
server.3=10.0.0.203:2888:3888

# Create the data directory and write the myid file
[root@node1 ~]#mkdir /usr/local/kafka/data
[root@node1 ~]#echo 1 > /usr/local/kafka/data/myid

# Sync to the other nodes
[root@node1 ~]#rsync -av /usr/local/kafka* 10.0.0.202:/usr/local
[root@node1 ~]#rsync -av /usr/local/kafka* 10.0.0.203:/usr/local

# Set myid on each of the other nodes
[root@node2 ~]#echo 2 > /usr/local/kafka/data/myid
[root@node3 ~]#echo 3 > /usr/local/kafka/data/myid

# Start ZooKeeper on all nodes
[root@node1 ~]#/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
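Once all three instances are running, quorum health can be sanity-checked from any node with the bundled client (the session only completes once a quorum has formed):

#Connect to one node and list the root znodes
[root@node1 ~]#/usr/local/kafka/bin/zookeeper-shell.sh 10.0.0.201:2181 ls /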

1.3.1.2 Deploy Kafka on Each Node

1.3.1.2.1 Kafka Node Configuration

Configuration file reference

############################# Server Basics ###############################
# broker.id: an integer, unique within the cluster; this line must be changed
broker.id=1

############################# Socket Server Settings ######################
# Kafka listener address; default port 9092
listeners=PLAINTEXT://10.0.0.101:9092
# Number of threads handling network requests; default 3
num.network.threads=3
# Number of threads performing disk I/O; default 8
num.io.threads=8
# Socket send buffer size; default 100 KB
socket.send.buffer.bytes=102400
# Socket receive buffer size; default 100 KB
socket.receive.buffer.bytes=102400
# Maximum size of a single request the socket server accepts; default 100 MB
socket.request.max.bytes=104857600

############################# Log Basics ###################################
# Directory where Kafka stores message data
log.dirs=/usr/local/kafka/data
# Default number of partitions per topic
num.partitions=1
# Default replication factor 3; enables automatic failover when the leader replica fails
default.replication.factor=3
# Threads per data directory for log recovery at startup and flushing at shutdown
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################
# Number of messages accumulated before flushing to disk
log.flush.interval.messages=10000
# Maximum interval before messages are flushed to disk: 1s, in ms
log.flush.interval.ms=1000

############################# Log Retention Policy #########################
# Hours to retain logs before automatic deletion; default 7 days
log.retention.hours=168
# Maximum log size before automatic deletion; default 1 GB
#log.retention.bytes=1073741824
# Log segment policy: a single segment file grows to at most 1 GB, then a new file is created
log.segment.bytes=1073741824
# How often to check whether data is eligible for deletion; 300s
log.retention.check.interval.ms=300000

############################# Zookeeper ####################################
# ZooKeeper connection string; comma-separated for a ZooKeeper cluster; this line must be changed
zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
# ZooKeeper connection timeout; 6s
zookeeper.connection.timeout.ms=6000
# Whether topics can actually be deleted; when false (the default), topics are only marked for deletion
delete.topic.enable=true

Example: minimal configuration for kafka_2.13-3.9.2

[root@node1 ~]#vim /usr/local/kafka/config/server.properties
#Modify the following four lines
broker.id=0 #different on each node
listeners=PLAINTEXT://10.0.0.201:9092 #different on each node
log.dirs=/usr/local/kafka/data
zookeeper.connect=10.0.0.201:2181,10.0.0.202:2181,10.0.0.203:2181

Example: detailed installation and configuration process (shown here with kafka_2.13-2.7.0)

#Install Java on all nodes
[root@node1 ~]#apt install openjdk-8-jdk -y

#Download on all nodes (official downloads)
[root@node1 ~]#wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
[root@node1 ~]#wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz

#Download from a mirror in China
[root@node1 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.9.2/kafka_2.13-3.9.2.tgz
[root@node1 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.3.1/kafka_2.13-3.3.1.tgz
[root@node1 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz
[root@node1 ~]#wget https://mirror.bit.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz

#Extract
[root@node1 ~]#tar xf kafka_2.13-2.7.0.tgz -C /usr/local/
[root@node1 ~]#ln -s /usr/local/kafka_2.13-2.7.0/ /usr/local/kafka

#Configure the PATH variable
[root@node1 ~]#echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node1 ~]#. /etc/profile.d/kafka.sh

#Edit the configuration file
[root@node1 ~]#vim /usr/local/kafka/config/server.properties
broker.id=1 #unique positive-integer identifier of this broker in the cluster; stored in the meta.properties file under log.dirs; change this line
listeners=PLAINTEXT://10.0.0.101:9092 #listen on this host's IP; note: 0.0.0.0 is not supported
log.dirs=/usr/local/kafka/data #directory where Kafka stores all message data; change this line
num.partitions=1 #default partition count for new topics; a value equal to the number of Kafka nodes is recommended
default.replication.factor=3 #default replication factor 3, enabling automatic failover
log.retention.hours=168 #message retention time; default 168 hours (7 days)
zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181 #ZooKeeper addresses (broker metadata is stored in ZooKeeper); change this line
zookeeper.connection.timeout.ms=6000 #ZooKeeper connection timeout in ms; default 6 seconds

#Prepare the data directory
[root@node1 ~]#mkdir /usr/local/kafka/data
[root@node1 ~]#scp /usr/local/kafka/config/server.properties 10.0.0.102:/usr/local/kafka/config
[root@node1 ~]#scp /usr/local/kafka/config/server.properties 10.0.0.103:/usr/local/kafka/config

#Configure the second node
[root@node2 ~]#vim /usr/local/kafka/config/server.properties
broker.id=2 #unique positive-integer identifier of this broker in the cluster
listeners=PLAINTEXT://10.0.0.102:9092 #listen on this host's IP; note: 0.0.0.0 is not supported

#Configure the third node
[root@node3 ~]#vim /usr/local/kafka/config/server.properties
broker.id=3  #unique positive-integer identifier of this broker in the cluster
listeners=PLAINTEXT://10.0.0.103:9092 #listen on this host's IP; note: 0.0.0.0 is not supported

#Optionally adjust the heap size
[root@node1 ~]#vim /usr/local/kafka/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ] ; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
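Alternatively, kafka-server-start.sh honors the KAFKA_HEAP_OPTS environment variable, so the heap can be overridden without editing the script (2 GB here is an arbitrary example value):

[root@node1 ~]#KAFKA_HEAP_OPTS="-Xmx2G -Xms2G" /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties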
1.3.1.2.2 Start the Service

Run the following on all Kafka nodes

#Start in the foreground
[root@node1 ~]#/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
#Start in the background
[root@node1 ~]#/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
1.3.1.2.3 Verify the Service Is Running
[root@node1 ~]#ss -ntl|grep 9092
LISTEN  0        50         [::ffff:10.0.0.101]:9092                 *:*  
[root@node1 ~]#tail /usr/local/kafka/logs/server.log 
[2021-02-16 12:10:01,276] INFO [ExpirationReaper-1-AlterAcls]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-02-16 12:10:01,311] INFO [/config/changes-event-process-thread]: Starting 
(kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2021-02-16 12:10:01,332] INFO [SocketServer brokerId=1] Starting socket server acceptors and processors 
(kafka.network.SocketServer)
[2021-02-16 12:10:01,339] INFO [SocketServer brokerId=1] Started data-plane acceptor and processor(s) for
endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2021-02-16 12:10:01,340] INFO [SocketServer brokerId=1] Started socket server acceptors and processors 
(kafka.network.SocketServer)
[2021-02-16 12:10:01,344] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-16 12:10:01,344] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-16 12:10:01,344] INFO Kafka startTimeMs: 1613448601340 (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-16 12:10:01,346] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
[2021-02-16 12:10:01,391] INFO [broker-1-to-controller-send-thread]: Recorded new controller, from now on will 
use broker 1 (kafka.server.BrokerToControllerRequestThread)
#If you change a broker's id, you must also update /usr/local/kafka/data/meta.properties
#Open ZooInspector to see the three broker ids


Brokers depend on ZooKeeper: each broker's id and the Topic/Partition metadata are written to ZNodes in ZooKeeper.

Consumers depend on ZooKeeper: before Kafka 0.9, a consumer saved its offset to ZooKeeper after each consumed message and resumed from that offset on the next run; since Kafka 0.9, offsets are stored inside Kafka itself (the internal __consumer_offsets topic).

Partitions depend on ZooKeeper: after a partition's replicas are created, one leader is elected, which relies on ZooKeeper's election mechanism.
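This metadata can be browsed directly with the bundled ZooKeeper client, for example (these are the standard Kafka znodes):

#List registered broker ids; expect [1, 2, 3] for this cluster
[root@node1 ~]#zookeeper-shell.sh 10.0.0.101:2181 ls /brokers/ids
#Show which broker currently holds the controller role
[root@node1 ~]#zookeeper-shell.sh 10.0.0.101:2181 get /controller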

1.3.1.2.4 Prepare the Kafka Service File
# Kafka systemd unit (production-ready baseline)
[root@node1 ~]# cat /lib/systemd/system/kafka.service

[Unit]
Description=Apache Kafka Server
After=network.target zookeeper.service

[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=always
RestartSec=20
User=root
Group=root

[Install]
WantedBy=multi-user.target
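After distributing the unit file to every node, reload systemd and manage Kafka as a service instead of starting it manually:

[root@node1 ~]#systemctl daemon-reload
[root@node1 ~]#systemctl enable --now kafka.service
[root@node1 ~]#systemctl status kafka.service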

1.3.1.3 Kafka Cluster Deployment Script

#!/bin/bash

# Cluster node IPs (adjust for your environment)
NODE1=10.0.0.101
NODE2=10.0.0.102
NODE3=10.0.0.103

# Kafka version
KAFKA_VERSION=3.5.1
SCALA_VERSION=2.13
KAFKA_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"

KAFKA_INSTALL_DIR=/usr/local/kafka
HOST=$(hostname -I | awk '{print $1}')

. /etc/os-release

color() {
    RES_COL=60
    MOVE_TO_COL="echo -en \\033[${RES_COL}G"
    SETCOLOR_SUCCESS="echo -en \\033[1;32m"
    SETCOLOR_FAILURE="echo -en \\033[1;31m"
    SETCOLOR_WARNING="echo -en \\033[1;33m"
    SETCOLOR_NORMAL="echo -en \E[0m"
    echo -n "$1" && $MOVE_TO_COL
    echo -n "["
    if [ $2 = "success" -o $2 = "0" ]; then
        ${SETCOLOR_SUCCESS}
        echo -n " OK "
    elif [ $2 = "failure" -o $2 = "1" ]; then
        ${SETCOLOR_FAILURE}
        echo -n "FAILED"
    else
        ${SETCOLOR_WARNING}
        echo -n "WARNING"
    fi
    ${SETCOLOR_NORMAL}
    echo -n "]"
    echo
}

env() {
    if hostname -I | grep -q "$NODE1"; then
        ID=1
        hostnamectl set-hostname node1
    elif hostname -I | grep -q "$NODE2"; then
        ID=2
        hostnamectl set-hostname node2
    elif hostname -I | grep -q "$NODE3"; then
        ID=3
        hostnamectl set-hostname node3
    else
        color 'IP address does not match any cluster node' 1
        exit 1
    fi

    cat >> /etc/hosts <<EOF
$NODE1   node1
$NODE2   node2
$NODE3   node3
EOF
}

install_jdk() {
    if java -version &>/dev/null; then
        color "JDK is already installed!" 1
        return
    fi

    if command -v yum &>/dev/null; then
        yum -y install java-1.8.0-openjdk-devel || { color "Failed to install JDK!" 1; exit 1; }
    elif command -v apt &>/dev/null; then
        apt update
        apt install openjdk-8-jdk -y || { color "Failed to install JDK!" 1; exit 1; }
    else
        color "Unsupported operating system!" 1
        exit 1
    fi

    if java -version; then
        color "JDK installation complete!" 0
    else
        color "Failed to install JDK!" 1
        exit 1
    fi
}

install_zookeeper() {
    mv ${KAFKA_INSTALL_DIR}/config/zookeeper.properties{,.bak}

    cat > ${KAFKA_INSTALL_DIR}/config/zookeeper.properties <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=${KAFKA_INSTALL_DIR}/data
clientPort=2181
maxClientCnxns=128
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
server.1=${NODE1}:2888:3888
server.2=${NODE2}:2888:3888
server.3=${NODE3}:2888:3888
EOF

    mkdir -p ${KAFKA_INSTALL_DIR}/data
    echo $ID > ${KAFKA_INSTALL_DIR}/data/myid

    cat > ${KAFKA_INSTALL_DIR}/bin/zookeeper-startup.sh <<EOF
#!/bin/bash
nohup ${KAFKA_INSTALL_DIR}/bin/zookeeper-server-start.sh ${KAFKA_INSTALL_DIR}/config/zookeeper.properties &
EOF

    chmod +x ${KAFKA_INSTALL_DIR}/bin/zookeeper-startup.sh

    cat > /lib/systemd/system/zookeeper.service <<EOF
[Unit]
Description=zookeeper.service
After=network.target

[Service]
Type=forking
ExecStart=${KAFKA_INSTALL_DIR}/bin/zookeeper-startup.sh
ExecStop=${KAFKA_INSTALL_DIR}/bin/zookeeper-server-stop.sh

[Install]
WantedBy=multi-user.target
EOF

    systemctl daemon-reload
    systemctl enable --now zookeeper.service

    if [ $? -eq 0 ]; then
        color "zookeeper installed successfully!" 0
    else
        color "zookeeper installation failed!" 1
        exit 1
    fi
}

install_kafka() {
    if [ ! -f /usr/local/src/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz ]; then
        wget -P /usr/local/src/ --no-check-certificate $KAFKA_URL || { color "Download failed!" 1; exit 1; }
    fi

    tar xf /usr/local/src/${KAFKA_URL##*/} -C /usr/local/
    ln -s /usr/local/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_INSTALL_DIR}

    install_zookeeper

    echo "PATH=${KAFKA_INSTALL_DIR}/bin:\$PATH" >> /etc/profile
    source /etc/profile

    mv ${KAFKA_INSTALL_DIR}/config/server.properties{,.bak}

    cat > ${KAFKA_INSTALL_DIR}/config/server.properties <<EOF
broker.id=$ID
listeners=PLAINTEXT://${HOST}:9092
log.dirs=${KAFKA_INSTALL_DIR}/data
num.partitions=3
default.replication.factor=3
log.retention.hours=168
zookeeper.connect=${NODE1}:2181,${NODE2}:2181,${NODE3}:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
EOF

    cat > ${KAFKA_INSTALL_DIR}/bin/kafka-startup.sh <<EOF
#!/bin/bash
nohup ${KAFKA_INSTALL_DIR}/bin/kafka-server-start.sh ${KAFKA_INSTALL_DIR}/config/server.properties &
EOF

    chmod +x ${KAFKA_INSTALL_DIR}/bin/kafka-startup.sh

    cat > /lib/systemd/system/kafka.service <<EOF
[Unit]
Description=Apache kafka
After=network.target zookeeper.service

[Service]
Type=forking
ExecStart=${KAFKA_INSTALL_DIR}/bin/kafka-startup.sh
ExecStop=/bin/kill -TERM \${MAINPID}
Restart=always
RestartSec=20

[Install]
WantedBy=multi-user.target
EOF

    systemctl daemon-reload
    systemctl enable --now kafka.service

    if [ $? -eq 0 ]; then
        color "kafka installed successfully!" 0
    else
        color "kafka installation failed!" 1
        exit 1
    fi
}

env
install_jdk
install_kafka
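Run the script on all three nodes. Once kafka.service is active everywhere, replication can be verified with a test topic (a minimal check; the topic name ping is arbitrary):

#Create a replicated topic and inspect its leader/replica layout across the three brokers
[root@node1 ~]#kafka-topics.sh --create --topic ping --partitions 3 --replication-factor 3 --bootstrap-server node1:9092
[root@node1 ~]#kafka-topics.sh --describe --topic ping --bootstrap-server node1:9092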

1.3.2 KRaft-Mode Cluster (Optional)

1.3.2.1 Basic Environment Preparation

1. Node planning

Prepare three nodes (e.g. kafka01, kafka02, kafka03); each node runs as both Broker and Controller.

Ensure the nodes can reach one another over the network, and configure host name resolution (via /etc/hosts or DNS).

2. Software installation

Install Java

Download the Kafka tarball (version 3.0+ recommended) and extract it to a common directory (e.g. /opt/kafka).

All nodes must install the same Kafka version.

Example: preparing the base environment for a KRaft-mode Kafka cluster

[root@ubuntu2404 ~]#cat >> /etc/hosts <<EOF
10.0.0.201 kafka01
10.0.0.202 kafka02
10.0.0.203 kafka03
EOF

[root@ubuntu2404 ~]#apt update && apt -y install openjdk-21-jre
[root@ubuntu2404 ~]#VERSION=4.0.0
[root@ubuntu2404 ~]#VERSION=3.9.0
[root@ubuntu2404 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/${VERSION}/kafka_2.13-${VERSION}.tgz
[root@ubuntu2404 ~]#tar xf kafka_2.13-${VERSION}.tgz -C /usr/local/
[root@ubuntu2404 ~]#cd /usr/local/
[root@ubuntu2404 local]#ln -s kafka_2.13-${VERSION}/ kafka
[root@ubuntu2404 local]#cd kafka
[root@ubuntu2404 kafka]#pwd
/usr/local/kafka
[root@ubuntu2404 kafka]#ls
bin config libs LICENSE licenses NOTICE site-docs
[root@ubuntu2404 kafka]#tree -d
.
├── bin
│   └── windows
├── config
│   └── kraft
├── libs
├── licenses
└── site-docs
8 directories

1.3.2.2 Cluster Configuration

1. Edit the server.properties file

Key parameters; every node must be modified. The example below uses node kafka01.

Example: paths and configuration for 4.0.0 and later

#Paths and configuration for 4.0.0 and later
[root@ubuntu2404 kafka]#vim config/server.properties
node.id=1 #different on each node
controller.quorum.bootstrap.servers=kafka01:9093,kafka02:9093,kafka03:9093 #same on all nodes
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093 #add this line
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
advertised.listeners=PLAINTEXT://10.0.0.201:9092,CONTROLLER://10.0.0.201:9093 #different on each node
log.dirs=/data/kraft-combined-logs

Example: legacy paths and configuration for 3.9 and earlier

#Legacy paths and configuration for 3.9 and earlier
[root@ubuntu2404 kafka]#vim config/kraft/server.properties
# Role definition: act as both Broker and Controller
process.roles=broker,controller
# Unique node ID; must differ across nodes
node.id=1
# Controller quorum list; format: node.id@host:port
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
# Listener addresses: broker communication port and controller port
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
# Externally advertised addresses; use the actual IP or host name. Note: 0.0.0.0 is not supported
advertised.listeners=PLAINTEXT://kafka01:9092,CONTROLLER://kafka01:9093
# Message data directory
log.dirs=/data/kafka-logs
[root@ubuntu2404 kafka]#grep -Ev "#|^$" config/kraft/server.properties
process.roles=broker,controller
node.id=1 #different on each node
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka01:9092,CONTROLLER://kafka01:9093 #different on each node
log.dirs=/data/kafka-logs
...

2. Generate the unique cluster ID

Run the following command once, on one node only, to generate a UUID.

#All nodes use the same UUID
[root@ubuntu2404 kafka]#./bin/kafka-storage.sh random-uuid
# Example output: iT0tvD1wSPGusA1SWlRNOQ

All nodes must use the same cluster ID.
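Since every node must format its storage with the same value, it is convenient to capture the ID in a variable on the first node and reuse it on the others (the same pattern used in the single-node quickstart):

#Generate once, record the value, and export the same KAFKA_CLUSTER_ID on every node
[root@ubuntu2404 kafka]#KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
[root@ubuntu2404 kafka]#echo $KAFKA_CLUSTER_ID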

3. Initialize the metadata storage

This step must be performed on every node.

# Format the storage directory with the generated UUID
#4.0.0 path
[root@ubuntu2404 kafka]#./bin/kafka-storage.sh format -t <generated UUID> -c config/server.properties

#Legacy path
[root@ubuntu2404 kafka]#./bin/kafka-storage.sh format -t <generated UUID> -c ./config/kraft/server.properties

1.3.2.3 Cluster Startup

# Start the service in the background
#Path for 4.0.0 and later
[root@ubuntu2404 kafka]#./bin/kafka-server-start.sh -daemon config/server.properties

#Path for 3.x versions
[root@ubuntu2404 kafka]#./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
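After all three nodes are up, the controller quorum can be checked from any node (available since Kafka 3.3); expect one leader and two followers:

[root@ubuntu2404 kafka]#./bin/kafka-metadata-quorum.sh --bootstrap-server kafka01:9092 describe --replication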

1.3.2.4 Cluster Verification

Verify cluster availability by producing and consuming messages.

Topic creation test:

[root@ubuntu2404 kafka]#./bin/kafka-topics.sh --create --topic foo --partitions 3 --replication-factor 3 --bootstrap-server kafka01:9092
[root@ubuntu2404 kafka]#./bin/kafka-topics.sh --list --bootstrap-server kafka01:9092
foo

#Produce and consume messages
[root@ubuntu2404 kafka]#./bin/kafka-console-producer.sh --bootstrap-server kafka01:9092 --topic foo
[root@ubuntu2404 kafka]#./bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092 --from-beginning --topic foo --group foo_group

Check node status:

[root@ubuntu2404 kafka]#./bin/kafka-metadata-shell.sh --snapshot /data/kafka-logs/__cluster_metadata-0/00000000000000000000.log

1.4 Deploying Kafka on Kubernetes

Install a Kafka cluster on Kubernetes with the Strimzi Operator

https://strimzi.io/quickstarts/

https://operatorhub.io/operator/strimzi-kafka-operator

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.8.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.8"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
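Per the Strimzi quickstart, the Operator is installed first and the Kafka resource above is then applied; a minimal sketch (the kafka namespace follows the quickstart, and the manifest file name kafka.yaml is an assumption):

#Install the Strimzi operator into the kafka namespace
kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
#Apply the Kafka custom resource above (saved as kafka.yaml) and wait for readiness
kubectl apply -f kafka.yaml -n kafka
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka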