Deploying a Kafka Cluster on Kubernetes
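Kafka is a distributed event streaming platform widely used for message queuing, log collection, and event-driven architectures. This article walks through deploying a Kafka cluster on Kubernetes with a StatefulSet, covering both ZooKeeper mode and KRaft mode.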
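Architecture Overview
Kafka Cluster Architecture
Kafka's core concepts are: a Topic categorizes messages; a Partition shards a topic's data; a Replica is a copy of a partition kept for high availability; a Broker is a server that stores partition replicas.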
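To make the relationship between these concepts concrete, here is a minimal Python sketch of how keyed messages map to partitions and how partition replicas spread across brokers. It is a simplified model, not Kafka's actual algorithm: Kafka's default partitioner hashes keys with murmur2 (CRC32 is a stand-in here), and the round-robin placement and the function names `choose_partition` and `place_replicas` are illustrative assumptions.

```python
import zlib


def choose_partition(key: str, num_partitions: int) -> int:
    """Map a message key to a partition index.

    Stand-in hash: Kafka's default partitioner uses murmur2; CRC32 is used
    here only to keep the sketch dependency-free.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def place_replicas(partition: int, replication_factor: int, brokers: list[int]) -> list[int]:
    """Simplified round-robin placement; the first broker in the result acts as leader."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    start = partition % len(brokers)
    return [brokers[(start + i) % len(brokers)] for i in range(replication_factor)]


if __name__ == "__main__":
    brokers = [0, 1, 2]
    for p in range(3):
        print(f"partition {p} -> replicas on brokers {place_replicas(p, 2, brokers)}")
    # The same key always lands on the same partition
    print("order-42 -> partition", choose_partition("order-42", 3))
```

The point of the sketch is that a key deterministically selects one partition, while each partition lives on several brokers so that losing one broker does not lose the data.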
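On Kubernetes, a typical Kafka deployment consists of: several Kafka broker Pods (3 to 5); a ZooKeeper ensemble (an odd number of nodes, typically 3) or KRaft mode instead; PersistentVolumeClaims (PVCs) for data persistence; and Services for service discovery.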
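Service discovery for a StatefulSet rests on predictable Pod DNS names of the form `<pod>.<headless-service>.<namespace>.svc.cluster.local`. The following sketch builds the per-broker addresses used later in this article. The helper name `broker_addresses` is hypothetical, and the `cluster.local` cluster domain is an assumption that holds for default cluster setups.

```python
def broker_addresses(
    replicas: int,
    statefulset: str = "kafka",
    service: str = "kafka-headless",
    namespace: str = "kafka",
    port: int = 9092,
) -> list[str]:
    """Return the stable DNS address of every broker Pod in the StatefulSet."""
    return [
        f"{statefulset}-{i}.{service}.{namespace}.svc.cluster.local:{port}"
        for i in range(replicas)
    ]


if __name__ == "__main__":
    # A comma-separated list like this can serve as bootstrap.servers
    print(",".join(broker_addresses(3)))
```

Because the names depend only on the StatefulSet name and the ordinal, they stay the same when a Pod is rescheduled to another node.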
Architecture Topology
┌─────────────────────────────────────────────────────────────┐
│                     Kubernetes Cluster                      │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                  Kafka Brokers (3+)                   │  │
│  │      kafka-0          kafka-1          kafka-2        │  │
│  │         │                │                │           │  │
│  │     PVC:20Gi         PVC:20Gi         PVC:20Gi        │  │
│  └───────────────────────────────────────────────────────┘  │
│                              │                              │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                ZooKeeper (3) or KRaft                 │  │
│  │    zookeeper-0      zookeeper-1      zookeeper-2      │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
│     ┌────────────────┐         ┌───────────────────┐        │
│     │  kafka-broker  │         │  kafka-headless   │        │
│     │    Service     │         │      Service      │        │
│     └────────────────┘         └───────────────────┘        │
└─────────────────────────────────────────────────────────────┘
KRaft vs ZooKeeper
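KRaft mode was introduced in Kafka 2.8 as an early-access feature and was declared production-ready in Kafka 3.3; with it, Kafka no longer depends on ZooKeeper. Its advantages include: simpler deployment with fewer components to run; better scalability; and faster startup.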
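In KRaft mode the controllers form a Raft quorum, which needs a majority of voters to stay available. The sketch below builds a `controller.quorum.voters` string in the `id@host:port` format and computes how many controller failures a quorum tolerates, which is also why odd voter counts are preferred. The function names are hypothetical, and the host naming assumes the `kafka` StatefulSet and `kafka-headless` Service used in this article.

```python
def quorum_voters(
    controllers: int,
    statefulset: str = "kafka",
    service: str = "kafka-headless",
    namespace: str = "kafka",
    port: int = 9093,
) -> str:
    """Build the controller.quorum.voters value: id@host:port, comma-separated."""
    return ",".join(
        f"{i}@{statefulset}-{i}.{service}.{namespace}.svc.cluster.local:{port}"
        for i in range(controllers)
    )


def tolerated_failures(voters: int) -> int:
    """A majority quorum of n voters survives the loss of (n - 1) // 2 of them."""
    if voters < 1:
        raise ValueError("at least one voter is required")
    return (voters - 1) // 2


if __name__ == "__main__":
    print(quorum_voters(3))
    for n in (3, 4, 5):
        print(f"{n} voters tolerate {tolerated_failures(n)} failure(s)")
```

Going from 3 to 4 voters adds a node without adding fault tolerance, so the usual choices are 3 or 5.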
Deployment Manifests (KRaft Mode)
1. Namespace
yaml
apiVersion: v1
kind: Namespace
metadata:
  name: kafka
  labels:
    name: kafka
    environment: production
2. ConfigMap
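yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config
  namespace: kafka
data:
  server.properties: |
    # NOTE: Kafka does not expand ${VAR} placeholders itself; they must be
    # substituted (for example with sed or envsubst) before the broker starts.
    # KRaft settings (KRaft identifies nodes by node.id; broker.id is not needed)
    process.roles=${PROCESS_ROLES}
    node.id=${NODE_ID}
    controller.quorum.voters=${CONTROLLER_QUORUM_VOTERS}
    controller.listener.names=CONTROLLER
    # Listeners
    listeners=PLAINTEXT://:9092,CONTROLLER://:9093
    advertised.listeners=PLAINTEXT://${HOSTNAME}.kafka-headless.kafka.svc.cluster.local:9092
    listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    inter.broker.listener.name=PLAINTEXT
    # Log directory, thread, and socket settings
    log.dirs=/var/lib/kafka/data
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    # Partition and replica settings
    num.partitions=3
    default.replication.factor=2
    min.insync.replicas=1
    # Log retention settings
    log.retention.hours=168
    log.retention.check.interval.ms=300000
    log.segment.bytes=1073741824
    # Compression settings
    compression.type=producer
    # JVM heap size is not a broker property; it is set through the
    # KAFKA_HEAP_OPTS environment variable in the StatefulSet.
    # ZooKeeper settings (not needed in KRaft mode)
    # zookeeper.connect=zk-headless.kafka.svc.cluster.local:2181
    # ACL settings. StandardAuthorizer is the KRaft authorizer; AclAuthorizer
    # stores ACLs in ZooKeeper. On PLAINTEXT listeners every client is
    # User:ANONYMOUS, so this denies all requests until ACLs or super.users are defined.
    authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
    allow.everyone.if.no.acl.found=false
    # Metrics. The JMX port is not a broker property either; set it with the
    # JMX_PORT environment variable (KAFKA_JMX_PORT in the Confluent image).
    metric.reporters=org.apache.kafka.common.metrics.JmxReporter
  log4j.properties: |
    kafka.root.logger.level=INFO
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=[%d] %p %m (%c)%n
    log4j.rootLogger=INFO, console
3. Service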
yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
  namespace: kafka
  labels:
    app: kafka
spec:
  clusterIP: None
  # Publish DNS records before the Pods are Ready so that the KRaft
  # controllers can find each other while the quorum is still forming
  publishNotReadyAddresses: true
  ports:
    - name: kafka
      port: 9092
    - name: controller
      port: 9093
    - name: jmx
      port: 9999
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-broker
  namespace: kafka
  labels:
    app: kafka
spec:
  type: ClusterIP
  ports:
    - name: kafka
      port: 9092
      targetPort: 9092
  selector:
    app: kafka
4. Kafka StatefulSet (KRaft Mode)
yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  serviceName: kafka-headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: kafka
    spec:
      terminationGracePeriodSeconds: 30
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchLabels:
                  app: kafka
              topologyKey: kubernetes.io/hostname
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:7.5.0
          imagePullPolicy: IfNotPresent
          ports:
            - name: kafka
              containerPort: 9092
            - name: controller
              containerPort: 9093
            - name: jmx
              containerPort: 9999
          env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xmx2g -Xms2g"
            - name: KAFKA_JMX_OPTS
              value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false"
            # The IDs must be integers, so they come from the pod-index label
            # (the StatefulSet ordinal: 0, 1, 2, ...) rather than the Pod name.
            # The label needs Kubernetes 1.28+; on older clusters derive the
            # ordinal from the hostname in a start script instead.
            - name: BROKER_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.labels['apps.kubernetes.io/pod-index']
            - name: NODE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.labels['apps.kubernetes.io/pod-index']
            - name: PROCESS_ROLES
              value: "broker,controller"
            - name: CONTROLLER_QUORUM_VOTERS
              value: "0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093"
            # Placeholder value: replace it with the output of
            # `kafka-storage random-uuid` (a base64-encoded UUID)
            - name: CLUSTER_ID
              value: "MkU3VGtTa19LUkcwT0dGMjBTWjJHRE9GRjFGRjFT"
            - name: KAFKA_OPTS
              value: "-Djava.security.egd=file:/dev/./urandom"
            - name: KAFKA_LOG_DIRS
              value: "/var/lib/kafka/data"
            - name: HOSTNAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          resources:
            requests:
              cpu: 500m
              memory: 2Gi
            limits:
              cpu: 2000m
              memory: 4Gi
          volumeMounts:
            - name: kafka-data
              mountPath: /var/lib/kafka/data
            # Mounted outside /etc/kafka so the read-only ConfigMap does not hide
            # the image's own configuration directory (assumption: the start
            # command is pointed at this file)
            - name: kafka-config
              mountPath: /mnt/kafka-config
          livenessProbe:
            exec:
              command:
                - kafka-broker-api-versions
                - --bootstrap-server
                - localhost:9092
            initialDelaySeconds: 60
            periodSeconds: 30
            # The probe starts a JVM, which takes longer than the 1s default timeout
            timeoutSeconds: 10
          readinessProbe:
            exec:
              command:
                - kafka-broker-api-versions
                - --bootstrap-server
                - localhost:9092
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 10
      volumes:
        # kafka-data is not listed here: it is created per Pod from
        # volumeClaimTemplates below
        - name: kafka-config
          configMap:
            name: kafka-config
            items:
              - key: server.properties
                path: server.properties
  volumeClaimTemplates:
    - metadata:
        name: kafka-data
      spec:
        accessModes: ["ReadWriteOnce"]
        storageClassName: fast
        resources:
          requests:
            storage: 20Gi
5. StorageClass
yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: fast
# GCE Persistent Disk example; use the provisioner that matches your platform
provisioner: kubernetes.io/gce-pd
parameters:
  type: pd-ssd
allowVolumeExpansion: true
reclaimPolicy: Retain
6. Init Container (Optional)
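Used to derive the correct BROKER_ID from the Pod name:
yaml
initContainers:
  - name: init-kafka
    image: confluentinc/cp-kafka:7.5.0
    command:
      - bash
      - -c
      - |
        set -ex
        # Extract the Pod ordinal from the hostname (kafka-0 -> 0)
        ordinal=${HOSTNAME##*-}
        # Variables exported here do not reach the main container, so the
        # IDs are written to a file on the shared volume instead; the main
        # container's start command has to source this file
        printf 'BROKER_ID=%s\nNODE_ID=%s\n' "${ordinal}" "${ordinal}" > /var/lib/kafka/data/node.env
        echo "Broker ID: ${ordinal}"
    env:
      - name: HOSTNAME
        valueFrom:
          fieldRef:
            fieldPath: metadata.name
    volumeMounts:
      - name: kafka-data
        mountPath: /var/lib/kafka/data
Deployment Manifests (ZooKeeper Mode)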
ZooKeeper mode requires deploying a separate ZooKeeper ensemble in addition to Kafka.
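Each ZooKeeper server needs a unique integer ID (ZooKeeper's documentation asks for a value between 1 and 255) and the full list of ensemble members, while StatefulSet ordinals start at 0. The sketch below shows one way to derive both from the Pod name. The function names are hypothetical, the host names assume the `zookeeper` StatefulSet and `zk-headless` Service defined in this section, and the semicolon-separated `host:2888:3888` list follows what we understand to be the Confluent image's `ZOOKEEPER_SERVERS` convention.

```python
def pod_ordinal(hostname: str) -> int:
    """Extract the StatefulSet ordinal from a Pod name such as 'zookeeper-2'."""
    return int(hostname.rsplit("-", 1)[1])


def zookeeper_server_id(hostname: str) -> int:
    """Ordinals start at 0 but server IDs start at 1, so shift by one."""
    return pod_ordinal(hostname) + 1


def zookeeper_servers(
    replicas: int,
    statefulset: str = "zookeeper",
    service: str = "zk-headless",
    namespace: str = "kafka",
) -> str:
    """List every ensemble member as host:quorum-port:election-port."""
    return ";".join(
        f"{statefulset}-{i}.{service}.{namespace}.svc.cluster.local:2888:3888"
        for i in range(replicas)
    )


if __name__ == "__main__":
    print(zookeeper_server_id("zookeeper-0"))
    print(zookeeper_servers(3))
```

The position of a host in the server list has to match its server ID, which is why both values are derived from the same ordinal.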
1. ZooKeeper ConfigMap
yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: zookeeper-config
  namespace: kafka
data:
  zoo.cfg: |
    dataDir=/var/lib/zookeeper/data
    dataLogDir=/var/lib/zookeeper/log
    tickTime=2000
    initLimit=10
    syncLimit=5
    maxClientCnxns=0
    4lw.commands.whitelist=*
    admin.enableServer=false
2. ZooKeeper Service
yaml
apiVersion: v1
kind: Service
metadata:
  name: zk-headless
  namespace: kafka
spec:
  clusterIP: None
  ports:
    - name: client
      port: 2181
    - name: quorum
      port: 2888
    - name: leader-election
      port: 3888
  selector:
    app: zookeeper
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  namespace: kafka
spec:
  type: ClusterIP
  ports:
    - name: client
      port: 2181
      targetPort: 2181
  selector:
    app: zookeeper
3. ZooKeeper StatefulSet
yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
  namespace: kafka
spec:
  serviceName: zk-headless
  replicas: 3
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      terminationGracePeriodSeconds: 30
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchLabels:
                  app: zookeeper
              topologyKey: kubernetes.io/hostname
      containers:
        - name: zookeeper
          image: confluentinc/cp-zookeeper:7.5.0
          imagePullPolicy: IfNotPresent
          # The server ID must be an integer (1-255), so it is derived from the
          # Pod ordinal rather than the Pod name. Assumption: the image reads
          # ZOOKEEPER_SERVER_ID and starts through /etc/confluent/docker/run.
          command:
            - bash
            - -c
            - |
              export ZOOKEEPER_SERVER_ID=$(( ${HOSTNAME##*-} + 1 ))
              exec /etc/confluent/docker/run
          ports:
            - name: client
              containerPort: 2181
            - name: quorum
              containerPort: 2888
            - name: leader-election
              containerPort: 3888
          env:
            - name: ZOOKEEPER_CLIENT_PORT
              value: "2181"
            - name: ZOOKEEPER_TICK_TIME
              value: "2000"
            - name: ZOOKEEPER_INIT_LIMIT
              value: "10"
            - name: ZOOKEEPER_SYNC_LIMIT
              value: "5"
            # All ensemble members, in server ID order
            - name: ZOOKEEPER_SERVERS
              value: "zookeeper-0.zk-headless.kafka.svc.cluster.local:2888:3888;zookeeper-1.zk-headless.kafka.svc.cluster.local:2888:3888;zookeeper-2.zk-headless.kafka.svc.cluster.local:2888:3888"
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 1Gi
          volumeMounts:
            - name: zookeeper-data
              mountPath: /var/lib/zookeeper/data
            - name: zookeeper-logs
              mountPath: /var/lib/zookeeper/log
          livenessProbe:
            # ruok is a four-letter-word command, not a zookeeper-shell command,
            # so a TCP check on the client port is used here instead
            tcpSocket:
              port: client
            initialDelaySeconds: 30
            periodSeconds: 10
  volumeClaimTemplates:
    - metadata:
        name: zookeeper-data
      spec:
        accessModes: ["ReadWriteOnce"]
        storageClassName: fast
        resources:
          requests:
            storage: 5Gi
    # The transaction log must survive Pod restarts, so it gets a PVC
    # rather than an emptyDir
    - metadata:
        name: zookeeper-logs
      spec:
        accessModes: ["ReadWriteOnce"]
        storageClassName: fast
        resources:
          requests:
            storage: 5Gi
4. Connecting Kafka to ZooKeeper
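In ZooKeeper mode, point Kafka at the ensemble by adding the following variable to the env section of the Kafka container (the equivalent server.properties setting is zookeeper.connect) and removing the KRaft-only settings process.roles, node.id, and controller.quorum.voters:
yaml
env:
  - name: KAFKA_ZOOKEEPER_CONNECT
    value: "zookeeper.kafka.svc.cluster.local:2181"
Deployment Steps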
KRaft Mode Deployment
bash
# 1. Create the namespace
kubectl apply -f 00-namespace.yaml
# 2. Create the configuration
kubectl apply -f 01-configmap.yaml
# 3. Create the StorageClass
kubectl apply -f 02-storageclass.yaml
# 4. Create the Services
kubectl apply -f 03-services.yaml
# 5. Create the Kafka StatefulSet
kubectl apply -f 04-kafka.yaml
# 6. Verify the deployment
kubectl get pods -n kafka
kubectl get svc -n kafka
kubectl get pvc -n kafka
ZooKeeper Mode Deployment
bash
# 1. Create the namespace
kubectl apply -f 00-namespace.yaml
# 2. Create the configuration (Kafka + ZooKeeper)
kubectl apply -f 01-kafka-configmap.yaml
kubectl apply -f 01-zookeeper-configmap.yaml
# 3. Create the StorageClass
kubectl apply -f 02-storageclass.yaml
# 4. Create the Services
kubectl apply -f 03-kafka-services.yaml
kubectl apply -f 03-zookeeper-services.yaml
# 5. Create the StatefulSets (ZooKeeper first, then Kafka)
kubectl apply -f 04-zookeeper.yaml
kubectl apply -f 05-kafka.yaml
Verifying the Kafka Cluster
bash
# Check that the broker responds
kubectl exec -it kafka-0 -n kafka -- kafka-broker-api-versions --bootstrap-server localhost:9092
# Create a topic
kubectl exec -it kafka-0 -n kafka -- kafka-topics --create --topic test-topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
# List topics
kubectl exec -it kafka-0 -n kafka -- kafka-topics --list --bootstrap-server localhost:9092
# Produce a message
echo "test message" | kubectl exec -i kafka-0 -n kafka -- kafka-console-producer --topic test-topic --bootstrap-server localhost:9092
# Consume messages (stops after 5 messages or after 10 seconds without new ones)
kubectl exec -it kafka-0 -n kafka -- kafka-console-consumer --topic test-topic --from-beginning --max-messages 5 --timeout-ms 10000 --bootstrap-server localhost:9092
Usage Examples
Java Application Configuration
java
// Maven dependency
// <dependency>
//   <groupId>org.apache.kafka</groupId>
//   <artifactId>kafka-clients</artifactId>
//   <version>3.5.0</version>
// </dependency>
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

// Producer configuration
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "kafka-broker.kafka.svc.cluster.local:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// Consumer configuration
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "kafka-broker.kafka.svc.cluster.local:9092");
consumerProps.put("group.id", "my-consumer-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
Spring Boot Configuration
yaml
spring:
  kafka:
    bootstrap-servers: kafka-broker.kafka.svc.cluster.local:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-consumer-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security.protocol: PLAINTEXT
Python Application Configuration
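python
# Uses the kafka-python client library
from kafka import KafkaProducer, KafkaConsumer

# Producer
producer = KafkaProducer(
    bootstrap_servers='kafka-broker.kafka.svc.cluster.local:9092',
    value_serializer=lambda v: v.encode('utf-8')
)
producer.send('test-topic', value='test message')
producer.flush()  # send() is asynchronous; flush before the process exits

# Consumer
consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers='kafka-broker.kafka.svc.cluster.local:9092',
    group_id='my-consumer-group',
    value_deserializer=lambda v: v.decode('utf-8')
)
for message in consumer:  # blocks and yields messages as they arrive
    print(message.value)
Monitoring and Alerting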
JMX Monitoring Configuration
yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-jmx
  namespace: kafka
spec:
  type: ClusterIP
  ports:
    - name: jmx
      port: 9999
      targetPort: 9999
  selector:
    app: kafka
Prometheus Rules
yaml
# Metric names depend on how the JMX exporter is configured;
# adjust the expressions below to match your exporter rules
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: kafka-alerts
  namespace: kafka
spec:
  groups:
    - name: kafka.rules
      rules:
        - alert: KafkaDown
          expr: up{job="kafka"} == 0
          for: 2m
        - alert: KafkaUnderReplicated
          expr: kafka_server_replica_manager_underreplicated_partitions > 0
          for: 5m
        - alert: KafkaDiskSpaceLow
          expr: kafka_server_log_log_free_space_bytes / kafka_server_log_log_total_space_bytes < 0.1
          for: 5m
Operations
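Scaling Out and In
bash
# Scale out. Existing partitions are not moved automatically; reassign them to
# the new brokers afterwards with kafka-reassign-partitions. With the combined
# broker,controller role used above, every node ID must also appear in
# controller.quorum.voters, so update that list first or run the new nodes as
# broker-only.
kubectl scale statefulset kafka -n kafka --replicas=5
# Optionally trigger a preferred leader election to rebalance partition leadership
kubectl exec -it kafka-0 -n kafka -- kafka-leader-election --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions
Backup and Restore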
bash
# Use MirrorMaker 2 for cross-cluster replication,
# or use Kafka Connect to export data
Troubleshooting
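Broker Fails to Start
Check that the ZooKeeper or KRaft configuration is correct; verify that the storage volume is mounted properly; and check for port conflicts.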
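A quick way to narrow down listener and port problems is to test whether the expected TCP ports accept connections from inside the cluster. The following is a minimal sketch using only the standard library; the helper name `port_open` is hypothetical, and the host names and ports (9092 for clients, 9093 for the controller listener) are the ones assumed throughout this article.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures
        return False


if __name__ == "__main__":
    for ordinal in range(3):
        host = f"kafka-{ordinal}.kafka-headless.kafka.svc.cluster.local"
        for port in (9092, 9093):
            state = "open" if port_open(host, port) else "unreachable"
            print(f"{host}:{port} {state}")
```

Run from a Pod in the same cluster, an unreachable 9093 on one broker while 9092 works elsewhere points at the controller listener rather than at storage.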
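Partitions Cannot Be Assigned
Possible causes include a replication factor larger than the number of brokers and insufficient storage space. Also check network connectivity between the brokers.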
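The replication-factor constraint is easy to check before creating a topic. The sketch below validates a topic's settings against the cluster size; the function name `check_topic_settings` and the message wording are illustrative assumptions, while the two rules themselves reflect how Kafka behaves.

```python
def check_topic_settings(brokers: int, replication_factor: int, min_insync_replicas: int) -> list[str]:
    """Return a list of problems; an empty list means the settings are consistent."""
    problems = []
    if replication_factor > brokers:
        # Each replica of a partition must live on a different broker
        problems.append(
            f"replication factor {replication_factor} exceeds broker count {brokers}"
        )
    if min_insync_replicas > replication_factor:
        # Producers using acks=all could never gather enough in-sync replicas
        problems.append(
            f"min.insync.replicas {min_insync_replicas} exceeds replication factor {replication_factor}"
        )
    return problems


if __name__ == "__main__":
    # The defaults used in this article: 3 brokers, replication factor 2, min ISR 1
    print(check_topic_settings(brokers=3, replication_factor=2, min_insync_replicas=1))
    # A setting that cannot be satisfied
    print(check_topic_settings(brokers=3, replication_factor=4, min_insync_replicas=1))
```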
Message Latency
Check network bandwidth; tune the consumer batch size; add brokers.
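Before tuning, it helps to know whether consumers are actually falling behind. A common measure is consumer lag: the gap between the newest offset in each partition and the offset the consumer group has committed. The sketch below computes it from two plain dictionaries; the function name is hypothetical, and treating a partition with no committed offset as offset 0 is a simplifying assumption.

```python
def consumer_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    """Per-partition lag: log end offset minus committed offset, never negative."""
    return {
        partition: max(end - committed.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


if __name__ == "__main__":
    end_offsets = {0: 1500, 1: 1480, 2: 1510}  # newest offset per partition
    committed = {0: 1500, 1: 900, 2: 1505}     # what the group has processed
    lag = consumer_lag(end_offsets, committed)
    print(lag)
    print("total lag:", sum(lag.values()))
```

Lag concentrated in one partition suggests a hot key or a slow consumer instance, whereas lag that grows evenly across partitions suggests the group as a whole needs larger batches or more consumers.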
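Summary
This article presented a complete approach to deploying a Kafka cluster on Kubernetes. KRaft mode is the recommended choice on Kafka 3.3 and later, where it is production-ready (it first appeared in 2.8 as early access); ZooKeeper mode remains suitable for older versions. Size the number of brokers and the storage capacity according to your data volume and throughput requirements.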
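As a starting point for sizing, the storage each broker needs can be estimated from the ingest rate, the retention period, and the replication factor. The sketch below is a rough back-of-the-envelope model: the function name and the 30% headroom are assumptions, and it ignores compression, index files, and uneven partition distribution.

```python
def storage_per_broker_gib(
    ingress_mib_per_s: float,
    retention_hours: float,
    replication_factor: int,
    brokers: int,
    headroom: float = 0.3,
) -> float:
    """Estimate the disk space per broker in GiB, including spare headroom."""
    if brokers <= 0:
        raise ValueError("brokers must be positive")
    # Every retained byte is stored once per replica
    total_mib = ingress_mib_per_s * 3600 * retention_hours * replication_factor
    return total_mib / 1024 / brokers * (1 + headroom)


if __name__ == "__main__":
    # 1 MiB/s of ingest with the 168-hour retention and replication
    # factor 2 used in this article, spread over 3 brokers
    needed = storage_per_broker_gib(1.0, 168, 2, 3)
    print(f"about {needed:.0f} GiB per broker")
```

Even a steady 1 MiB/s works out to several hundred GiB per broker under these settings, so the 20Gi volume in the example manifest only suits low-volume or test workloads.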
