
Deploying a Kafka Cluster on Kubernetes

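Kafka is a distributed stream-processing platform widely used for message queuing, log collection, and event-driven architectures. This article walks through a complete setup for deploying a Kafka cluster on Kubernetes with StatefulSets, covering both ZooKeeper mode and KRaft mode.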

Architecture Overview

Kafka Cluster Architecture

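Kafka is a distributed streaming platform. Its core concepts are: Topic, which categorizes messages; Partition, which shards a topic's data; Replica, which provides high availability; and Broker, which stores partition replicas.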

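A typical Kafka deployment on Kubernetes consists of: several Kafka broker pods (3 to 5); a ZooKeeper ensemble with an odd number of nodes (usually 3), or KRaft mode instead; PVCs for data persistence; and Services for service discovery.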
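
Both a ZooKeeper ensemble and a KRaft controller quorum stay available only while a strict majority of their voting members is up, which is why odd sizes are used. The Python sketch below is illustrative arithmetic only, not part of the deployment; it shows that a fourth voter adds no fault tolerance over three.

```python
def quorum_size(voters: int) -> int:
    """Smallest strict majority of a set of voters."""
    return voters // 2 + 1


def tolerated_failures(voters: int) -> int:
    """Voters that can fail while a majority is still available."""
    return voters - quorum_size(voters)


for n in range(1, 6):
    print(f"{n} voters: quorum {quorum_size(n)}, tolerates {tolerated_failures(n)} failure(s)")
```

Three voters survive one failure and five survive two; an even count only raises the number of machines that must agree.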

Topology

┌───────────────────────────────────────────────────────────┐
│                    Kubernetes Cluster                     │
│                                                           │
│  ┌─────────────────────────────────────────────────────┐  │
│  │                 Kafka Brokers (3+)                  │  │
│  │        kafka-0        kafka-1        kafka-2        │  │
│  │           │              │              │           │  │
│  │       PVC: 20Gi      PVC: 20Gi      PVC: 20Gi       │  │
│  └─────────────────────────────────────────────────────┘  │
│                             │                             │
│  ┌─────────────────────────────────────────────────────┐  │
│  │           ZooKeeper (3 nodes)  or  KRaft            │  │
│  │         zk-0           zk-1           zk-2          │  │
│  └─────────────────────────────────────────────────────┘  │
│                                                           │
│  ┌──────────────────┐   ┌──────────────────┐              │
│  │  kafka-broker    │   │  kafka-headless  │              │
│  │  Service         │   │  Service         │              │
│  └──────────────────┘   └──────────────────┘              │
└───────────────────────────────────────────────────────────┘

KRaft vs ZooKeeper

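Kafka 2.8 introduced KRaft mode, in which Kafka no longer depends on ZooKeeper; KRaft was declared production-ready in Kafka 3.3. Its advantages include simpler deployment with fewer components, better scalability, and faster startup.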

Deployment Manifests (KRaft Mode)

1. Namespace

yaml
apiVersion: v1
kind: Namespace
metadata:
  name: kafka
  labels:
    name: kafka
    environment: production

2. ConfigMap

yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config
  namespace: kafka
data:
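  server.properties: |
    # Kafka does not expand ${...} placeholders itself; they must be
    # substituted before kafka-server-start reads this file.

    # KRaft node settings
    process.roles=${PROCESS_ROLES}
    node.id=${NODE_ID}
    controller.quorum.voters=${CONTROLLER_QUORUM_VOTERS}
    controller.listener.names=CONTROLLER

    # Listeners
    listeners=PLAINTEXT://:9092,CONTROLLER://:9093
    advertised.listeners=PLAINTEXT://${HOSTNAME}.kafka-headless.kafka.svc.cluster.local:9092
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
    inter.broker.listener.name=PLAINTEXT

    # Log directory and threads (a subdirectory keeps the data clear of
    # lost+found on the volume root)
    log.dirs=/var/lib/kafka/data/logs
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600

    # Partitions and replication
    num.partitions=3
    default.replication.factor=2
    min.insync.replicas=1

    # Retention
    log.retention.hours=168
    log.retention.check.interval.ms=300000
    log.segment.bytes=1073741824

    # Compression: keep whatever codec the producer used
    compression.type=producer

    # Heap size and JMX port are not broker properties: set them with the
    # KAFKA_HEAP_OPTS and JMX_PORT environment variables instead.

    # ZooKeeper mode only (not used in KRaft mode)
    # zookeeper.connect=zookeeper.kafka.svc.cluster.local:2181

    # ACLs. AclAuthorizer requires ZooKeeper; KRaft uses StandardAuthorizer.
    # Enable only together with authentication and super.users: with no ACLs and
    # allow.everyone.if.no.acl.found=false, inter-broker requests are denied too.
    # authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
    # allow.everyone.if.no.acl.found=false

    # Metrics (JmxReporter is enabled by default)
    metric.reporters=org.apache.kafka.common.metrics.JmxReporter

  log4j.properties: |
    log4j.rootLogger=INFO, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=[%d] %p %m (%c)%n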
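
Kafka reads `${...}` literally, so the placeholders in this template have to be replaced before the broker starts. The Python sketch below only illustrates that substitution step; the helper names and the shortened template are ours, not part of any Kafka tooling. It takes the node ID from the StatefulSet ordinal at the end of the pod name and fills in the placeholders with `string.Template`, whose `${NAME}` syntax happens to match.

```python
from string import Template


def pod_ordinal(pod_name: str) -> int:
    """StatefulSet pods are named <statefulset>-<ordinal>, e.g. kafka-2 -> 2."""
    return int(pod_name.rsplit("-", 1)[1])


def render_properties(template: str, values: dict) -> str:
    """Replace ${NAME} placeholders; unknown placeholders are left untouched."""
    return Template(template).safe_substitute(values)


# Excerpt of the server.properties template with its placeholders
TEMPLATE = """\
process.roles=${PROCESS_ROLES}
node.id=${NODE_ID}
advertised.listeners=PLAINTEXT://${HOSTNAME}.kafka-headless.kafka.svc.cluster.local:9092
"""

hostname = "kafka-2"  # what HOSTNAME would be inside the third pod
rendered = render_properties(TEMPLATE, {
    "PROCESS_ROLES": "broker,controller",
    "NODE_ID": str(pod_ordinal(hostname)),
    "HOSTNAME": hostname,
})
print(rendered)
```

`safe_substitute` is used so that a placeholder without a value stays visible in the output instead of raising, which makes a missing variable easy to spot.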

3. Services

yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
  namespace: kafka
  labels:
    app: kafka
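spec:
  clusterIP: None
  # Create DNS records before pods are Ready: the KRaft controllers must reach
  # each other on port 9093 before any broker can pass its readiness probe
  publishNotReadyAddresses: true
  ports:
  - name: kafka
    port: 9092
  - name: controller
    port: 9093
  - name: jmx
    port: 9999
  selector:
    app: kafka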
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-broker
  namespace: kafka
  labels:
    app: kafka
spec:
  type: ClusterIP
  ports:
  - name: kafka
    port: 9092
    targetPort: 9092
  selector:
    app: kafka
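
Each pod selected by the headless Service gets a stable DNS name of the form `<pod>.<service>.<namespace>.svc.cluster.local`, which is what `advertised.listeners` and `controller.quorum.voters` rely on. A hypothetical Python helper that derives the voters string used in the StatefulSet from that naming scheme, assuming the `kafka` StatefulSet, the `kafka-headless` Service, and the `kafka` namespace:

```python
def pod_fqdn(statefulset: str, ordinal: int, service: str, namespace: str) -> str:
    """Stable DNS name of a StatefulSet pod behind a headless Service."""
    return f"{statefulset}-{ordinal}.{service}.{namespace}.svc.cluster.local"


def quorum_voters(replicas: int, port: int = 9093) -> str:
    """controller.quorum.voters as comma-separated <node.id>@<host>:<port> entries."""
    return ",".join(
        f"{i}@{pod_fqdn('kafka', i, 'kafka-headless', 'kafka')}:{port}"
        for i in range(replicas)
    )


print(quorum_voters(3))
```

Generating the string keeps it consistent with the naming scheme; the list must still contain exactly the pods that run the controller role.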

4. Kafka StatefulSet (KRaft Mode)

yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  serviceName: kafka-headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: kafka
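    spec:
      terminationGracePeriodSeconds: 30
      securityContext:
        # cp-kafka runs as a non-root user (UID/GID 1000); let it write to the PVC
        fsGroup: 1000
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchLabels:
                app: kafka
            topologyKey: kubernetes.io/hostname
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:7.5.0
        imagePullPolicy: IfNotPresent
        # Render the ConfigMap template, then start the broker with it
        command:
        - sh
        - -c
        - |
          set -e
          # Pod names are kafka-0, kafka-1, ...; the ordinal becomes the node ID
          NODE_ID=${HOSTNAME##*-}
          # Kafka does not expand ${...} itself, so substitute the placeholders here
          sed -e "s|\${NODE_ID}|${NODE_ID}|g" \
              -e "s|\${BROKER_ID}|${NODE_ID}|g" \
              -e "s|\${HOSTNAME}|${HOSTNAME}|g" \
              -e "s|\${PROCESS_ROLES}|${PROCESS_ROLES}|g" \
              -e "s|\${CONTROLLER_QUORUM_VOTERS}|${CONTROLLER_QUORUM_VOTERS}|g" \
              /mnt/kafka-config/server.properties > /tmp/server.properties
          # KRaft only: format the log directory on first start (no-op once formatted)
          if grep -q '^process.roles=' /tmp/server.properties; then
            kafka-storage format --ignore-formatted -t "${CLUSTER_ID}" -c /tmp/server.properties
          fi
          # Set here instead of as container env vars, so that CLI tools run via
          # kubectl exec neither inherit a 2 GB heap nor try to bind JMX port 9999
          export KAFKA_HEAP_OPTS="-Xmx2g -Xms2g"
          export JMX_PORT=9999
          export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/mnt/kafka-config/log4j.properties"
          exec kafka-server-start /tmp/server.properties
        ports:
        - name: kafka
          containerPort: 9092
        - name: controller
          containerPort: 9093
        - name: jmx
          containerPort: 9999
        env:
        - name: KAFKA_JMX_OPTS
          value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false"
        - name: PROCESS_ROLES
          value: "broker,controller"
        - name: CONTROLLER_QUORUM_VOTERS
          value: "0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093"
        - name: CLUSTER_ID
          # Must be a 22-character base64 UUID; generate your own with `kafka-storage random-uuid`
          value: "MkU3OEVBNTcwNTJENDM2Qk"
        - name: KAFKA_OPTS
          value: "-Djava.security.egd=file:/dev/./urandom"
        - name: HOSTNAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        resources:
          requests:
            cpu: 500m
            memory: 2Gi
          limits:
            cpu: 2000m
            memory: 4Gi
        volumeMounts:
        - name: kafka-data
          mountPath: /var/lib/kafka/data
        # Not /etc/kafka: a ConfigMap mount there would hide the image's own files
        - name: kafka-config
          mountPath: /mnt/kafka-config
        # TCP checks; exec probes would start a new JVM on every run
        livenessProbe:
          tcpSocket:
            port: 9092
          initialDelaySeconds: 60
          periodSeconds: 30
        readinessProbe:
          tcpSocket:
            port: 9092
          initialDelaySeconds: 30
          periodSeconds: 10
      volumes:
      # kafka-data is created from volumeClaimTemplates, so it is not listed here
      - name: kafka-config
        configMap:
          name: kafka-config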
  volumeClaimTemplates:
  - metadata:
      name: kafka-data
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: fast
      resources:
        requests:
          storage: 20Gi
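
In KRaft mode every log directory is formatted with a cluster ID, and the `CLUSTER_ID` value in the manifest must be in the format that `kafka-storage random-uuid` produces: a random 16-byte UUID encoded as URL-safe base64 without padding, i.e. 22 characters. The Python sketch below is our own approximation of that format, not the tool's code; it generates such an ID and checks whether a given string decodes to exactly 16 bytes.

```python
import base64
import uuid


def random_cluster_id() -> str:
    """Random UUID as 22-character URL-safe base64 without '=' padding."""
    while True:
        cid = base64.urlsafe_b64encode(uuid.uuid4().bytes).rstrip(b"=").decode("ascii")
        # Skip IDs starting with '-', which a command line would parse as a flag
        if not cid.startswith("-"):
            return cid


def is_valid_cluster_id(cid: str) -> bool:
    """True if cid is unpadded URL-safe base64 of exactly 16 bytes."""
    try:
        raw = base64.urlsafe_b64decode(cid + "=" * (-len(cid) % 4))
    except ValueError:  # binascii.Error is a subclass of ValueError
        return False
    return len(cid) == 22 and len(raw) == 16


cid = random_cluster_id()
print(cid, is_valid_cluster_id(cid))
```

A 40-character string, for example, decodes to 30 bytes and is rejected by this check.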

5. StorageClass

yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: fast
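# GKE example: the CSI driver replaces the legacy in-tree kubernetes.io/gce-pd
# provisioner; use the CSI provisioner of your own cluster
provisioner: pd.csi.storage.gke.io
parameters:
  type: pd-ssd
# Create each disk in the zone where its pod is scheduled
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true
reclaimPolicy: Retain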

6. Init Container (Optional)

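An init container can derive the node ID from the pod ordinal. Variables exported in an init container are not visible to the main container, because each container runs as a separate process, so the value has to be handed over through a shared volume (here the data volume) and read back in the Kafka container's startup command, e.g. `NODE_ID=$(cat /var/lib/kafka/data/node-id)`:

yaml
initContainers:
- name: init-kafka
  image: confluentinc/cp-kafka:7.5.0
  command:
  - sh
  - -c
  - |
    set -ex

    # Get the pod ordinal from the pod name (kafka-0 -> 0)
    ordinal=${HOSTNAME##*-}

    # An `export` here would not reach the kafka container,
    # so write the value to the shared data volume instead
    echo "${ordinal}" > /var/lib/kafka/data/node-id

    echo "Node ID: ${ordinal}"
  env:
  - name: HOSTNAME
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
  volumeMounts:
  - name: kafka-data
    mountPath: /var/lib/kafka/data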

Deployment Manifests (ZooKeeper Mode)

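ZooKeeper mode additionally requires a ZooKeeper ensemble. ZooKeeper mode has been deprecated since Kafka 3.5 and was removed in Kafka 4.0, so use it only for existing clusters on older versions.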

1. ZooKeeper ConfigMap

yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: zookeeper-config
  namespace: kafka
data:
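  # Reference configuration: the cp-zookeeper image generates its own config
  # from ZOOKEEPER_* environment variables, so this file takes effect only if
  # you mount it and start ZooKeeper with it explicitly.
  zoo.cfg: |
    dataDir=/var/lib/zookeeper/data
    dataLogDir=/var/lib/zookeeper/log
    clientPort=2181
    tickTime=2000
    initLimit=10
    syncLimit=5
    maxClientCnxns=0
    4lw.commands.whitelist=*
    admin.enableServer=false
    server.1=zookeeper-0.zk-headless.kafka.svc.cluster.local:2888:3888
    server.2=zookeeper-1.zk-headless.kafka.svc.cluster.local:2888:3888
    server.3=zookeeper-2.zk-headless.kafka.svc.cluster.local:2888:3888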

2. ZooKeeper Service

yaml
apiVersion: v1
kind: Service
metadata:
  name: zk-headless
  namespace: kafka
spec:
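  clusterIP: None
  # Create DNS records before pods are Ready so ensemble members can find each other
  publishNotReadyAddresses: true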
  ports:
  - name: client
    port: 2181
  - name: quorum
    port: 2888
  - name: leader-election
    port: 3888
  selector:
    app: zookeeper
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  namespace: kafka
spec:
  type: ClusterIP
  ports:
  - name: client
    port: 2181
    targetPort: 2181
  selector:
    app: zookeeper

3. ZooKeeper StatefulSet

yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper
  namespace: kafka
spec:
  serviceName: zk-headless
  replicas: 3
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
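    spec:
      terminationGracePeriodSeconds: 30
      securityContext:
        # cp-zookeeper runs as a non-root user (UID/GID 1000); let it write to the PVCs
        fsGroup: 1000
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchLabels:
                app: zookeeper
            topologyKey: kubernetes.io/hostname
      containers:
      - name: zookeeper
        image: confluentinc/cp-zookeeper:7.5.0
        imagePullPolicy: IfNotPresent
        command:
        - sh
        - -c
        - |
          # Server IDs must be >= 1: zookeeper-0 -> 1, zookeeper-1 -> 2, ...
          ordinal=${HOSTNAME##*-}
          export ZOOKEEPER_SERVER_ID=$((ordinal + 1))
          exec /etc/confluent/docker/run
        ports:
        - name: client
          containerPort: 2181
        - name: quorum
          containerPort: 2888
        - name: leader-election
          containerPort: 3888
        env:
        - name: ZOOKEEPER_CLIENT_PORT
          value: "2181"
        - name: ZOOKEEPER_TICK_TIME
          value: "2000"
        - name: ZOOKEEPER_INIT_LIMIT
          value: "10"
        - name: ZOOKEEPER_SYNC_LIMIT
          value: "5"
        # Every member as host:quorumPort:electionPort, ';'-separated, in server-ID order
        - name: ZOOKEEPER_SERVERS
          value: "zookeeper-0.zk-headless.kafka.svc.cluster.local:2888:3888;zookeeper-1.zk-headless.kafka.svc.cluster.local:2888:3888;zookeeper-2.zk-headless.kafka.svc.cluster.local:2888:3888"
        - name: HOSTNAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        resources:
          requests:
            cpu: 100m
            memory: 256Mi
          limits:
            cpu: 500m
            memory: 1Gi
        volumeMounts:
        - name: zookeeper-data
          mountPath: /var/lib/zookeeper/data
        - name: zookeeper-logs
          mountPath: /var/lib/zookeeper/log
        # TCP check on the client port (`zookeeper-shell ... ruok` does not send
        # the ruok four-letter command)
        livenessProbe:
          tcpSocket:
            port: 2181
          initialDelaySeconds: 30
          periodSeconds: 10
  volumeClaimTemplates:
  - metadata:
      name: zookeeper-data
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: fast
      resources:
        requests:
          storage: 5Gi
  # ZooKeeper rebuilds its state from snapshots plus transaction logs,
  # so keep the logs on a persistent volume rather than emptyDir
  - metadata:
      name: zookeeper-logs
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: fast
      resources:
        requests:
          storage: 5Gi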
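
ZooKeeper identifies ensemble members by a numeric myid between 1 and 255, and every member's configuration needs one `server.<myid>=<host>:<quorumPort>:<electionPort>` line per member. A Python sketch, illustrative only and not what the image runs, of how StatefulSet ordinals map to those IDs and lines, assuming the `zookeeper` StatefulSet, the `zk-headless` Service, and its ports 2888/3888:

```python
def zk_myid(pod_name: str) -> int:
    """zookeeper-0 -> 1, zookeeper-1 -> 2, ... (myid must be >= 1)."""
    return int(pod_name.rsplit("-", 1)[1]) + 1


def zk_server_lines(replicas: int, statefulset: str = "zookeeper",
                    service: str = "zk-headless", namespace: str = "kafka") -> list:
    """One server.<myid>=host:2888:3888 entry per ensemble member."""
    lines = []
    for ordinal in range(replicas):
        host = f"{statefulset}-{ordinal}.{service}.{namespace}.svc.cluster.local"
        lines.append(f"server.{ordinal + 1}={host}:2888:3888")
    return lines


print("\n".join(zk_server_lines(3)))
print(zk_myid("zookeeper-2"))
```

The pod whose myid is N must appear on the `server.N` line; an off-by-one between the two is a common reason an ensemble never forms.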

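4. Connecting Kafka to ZooKeeper

In ZooKeeper mode, edit server.properties in the Kafka ConfigMap: remove the KRaft settings (process.roles, node.id, controller.listener.names, controller.quorum.voters, and the CONTROLLER listener) and set broker.id and zookeeper.connect:

yaml
data:
  server.properties: |
    broker.id=${BROKER_ID}
    listeners=PLAINTEXT://:9092
    advertised.listeners=PLAINTEXT://${HOSTNAME}.kafka-headless.kafka.svc.cluster.local:9092
    zookeeper.connect=zookeeper.kafka.svc.cluster.local:2181
    # ...remaining broker settings unchanged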

Deployment Steps

KRaft Mode

bash
# 1. Create the namespace
kubectl apply -f 00-namespace.yaml

# 2. Create the ConfigMap
kubectl apply -f 01-configmap.yaml

# 3. Create the StorageClass
kubectl apply -f 02-storageclass.yaml

# 4. Create the Services
kubectl apply -f 03-services.yaml

# 5. Create the Kafka StatefulSet
kubectl apply -f 04-kafka.yaml

# 6. Verify the deployment
kubectl rollout status statefulset/kafka -n kafka
kubectl get pods -n kafka
kubectl get svc -n kafka
kubectl get pvc -n kafka

ZooKeeper Mode

bash
# 1. Create the namespace
kubectl apply -f 00-namespace.yaml

# 2. Create the ConfigMaps (Kafka + ZooKeeper)
kubectl apply -f 01-kafka-configmap.yaml
kubectl apply -f 01-zookeeper-configmap.yaml

# 3. Create the StorageClass
kubectl apply -f 02-storageclass.yaml

# 4. Create the Services
kubectl apply -f 03-kafka-services.yaml
kubectl apply -f 03-zookeeper-services.yaml

# 5. Create the StatefulSets, starting Kafka once ZooKeeper is up
kubectl apply -f 04-zookeeper.yaml
kubectl rollout status statefulset/zookeeper -n kafka
kubectl apply -f 05-kafka.yaml

Verifying the Kafka Cluster

bash
# Check from inside a pod that the broker answers API requests
kubectl exec -it kafka-0 -n kafka -- kafka-broker-api-versions --bootstrap-server localhost:9092

# Create a topic
kubectl exec -it kafka-0 -n kafka -- kafka-topics --create --topic test-topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

# List topics
kubectl exec -it kafka-0 -n kafka -- kafka-topics --list --bootstrap-server localhost:9092

# Produce a message
echo "test message" | kubectl exec -i kafka-0 -n kafka -- kafka-console-producer --topic test-topic --bootstrap-server localhost:9092

# Consume messages
kubectl exec -it kafka-0 -n kafka -- kafka-console-consumer --topic test-topic --from-beginning --max-messages 5 --bootstrap-server localhost:9092

Usage Examples

Java Client Configuration

java
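// Maven dependency
// <dependency>
//     <groupId>org.apache.kafka</groupId>
//     <artifactId>kafka-clients</artifactId>
//     <version>3.5.0</version>
// </dependency>
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaExample {
    public static void main(String[] args) {
        // Producer configuration
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "kafka-broker.kafka.svc.cluster.local:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("test-topic", "test message"));
        } // close() waits for pending sends to complete

        // Consumer configuration
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "kafka-broker.kafka.svc.cluster.local:9092");
        consumerProps.put("group.id", "my-consumer-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(List.of("test-topic"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.println(record.value());
            }
        }
    }
}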

Spring Boot Configuration

yaml
spring:
  kafka:
    bootstrap-servers: kafka-broker.kafka.svc.cluster.local:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-consumer-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security.protocol: PLAINTEXT

Python Client Configuration

python
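from kafka import KafkaProducer, KafkaConsumer  # pip install kafka-python

# Producer
producer = KafkaProducer(
    bootstrap_servers='kafka-broker.kafka.svc.cluster.local:9092',
    value_serializer=lambda v: v.encode('utf-8')
)
producer.send('test-topic', value='test message')
producer.flush()  # send() is asynchronous; block until the message is delivered

# Consumer
consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers='kafka-broker.kafka.svc.cluster.local:9092',
    group_id='my-consumer-group',
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000,  # stop iterating after 10 s without new messages
    value_deserializer=lambda v: v.decode('utf-8')
)
for message in consumer:
    print(message.partition, message.offset, message.value)
consumer.close()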

Monitoring and Alerting

JMX Monitoring

yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-jmx
  namespace: kafka
spec:
  type: ClusterIP
  ports:
  - name: jmx
    port: 9999
    targetPort: 9999
  selector:
    app: kafka

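Prometheus Rules

These rules assume Kafka's JMX metrics are exposed to Prometheus through an exporter such as the Prometheus JMX exporter Java agent, since Prometheus cannot scrape the JMX port directly. Metric names and the `job` label depend on your exporter rules and scrape configuration.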

yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: kafka-alerts
  namespace: kafka
spec:
  groups:
  - name: kafka.rules
    rules:
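    - alert: KafkaDown
      expr: up{job="kafka"} == 0
      for: 2m
    - alert: KafkaUnderReplicated
      # Name produced by common JMX exporter rules for the MBean
      # kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
      expr: kafka_server_replicamanager_underreplicatedpartitions > 0
      for: 5m
    - alert: KafkaDiskSpaceLow
      # Kubelet volume statistics for the kafka-data PVCs (less than 10% free)
      expr: kubelet_volume_stats_available_bytes{namespace="kafka", persistentvolumeclaim=~"kafka-data-.*"} / kubelet_volume_stats_capacity_bytes{namespace="kafka", persistentvolumeclaim=~"kafka-data-.*"} < 0.1
      for: 5m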

Operations

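Scaling

With the static controller.quorum.voters list in this manifest, only kafka-0, kafka-1 and kafka-2 can be controllers. Pods added by scaling out must run as brokers only (process.roles=broker, without the CONTROLLER listener), so adjust their configuration first, for example by running them in a separate broker-only StatefulSet. New brokers also receive no existing partitions until you reassign them.

bash
# Scale out (the new pods must be configured as broker-only, see above)
kubectl scale statefulset kafka -n kafka --replicas=5

# Propose a plan for the topics listed in topics.json, save the proposed plan
# as reassignment.json, then execute it (both files must exist inside the pod)
kubectl exec -it kafka-0 -n kafka -- kafka-reassign-partitions --bootstrap-server localhost:9092 --topics-to-move-json-file /tmp/topics.json --broker-list "0,1,2,3,4" --generate
kubectl exec -it kafka-0 -n kafka -- kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassignment.json --execute

# Optionally move leadership back to the preferred replicas
kubectl exec -it kafka-0 -n kafka -- kafka-leader-election --bootstrap-server localhost:9092 --all-topic-partitions --election-type preferred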
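
New brokers only take over existing partitions through a reassignment, and the `--generate` mode of `kafka-reassign-partitions` reads the topics to move from a JSON file passed with `--topics-to-move-json-file`, in the form `{"topics": [{"topic": "..."}], "version": 1}`. A small Python helper for producing that file; the function name is ours, for illustration:

```python
import json


def topics_to_move(topics: list) -> str:
    """JSON document for kafka-reassign-partitions --topics-to-move-json-file."""
    return json.dumps({"topics": [{"topic": t} for t in topics], "version": 1}, indent=2)


print(topics_to_move(["test-topic"]))
```

Save the output as the topics file and make it available inside the pod before running `--generate`.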

Backup and Recovery

bash
# Replicate to another cluster with MirrorMaker 2,
# or export data to external storage with Kafka Connect sink connectors

Troubleshooting

Broker fails to start

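Check that the ZooKeeper or KRaft settings are correct (for KRaft: node.id, controller.quorum.voters, and a log directory formatted with a valid cluster ID); verify that the storage volume is mounted and writable; check for port conflicts.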

Partitions cannot be assigned

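Possible causes include a replication factor greater than the number of brokers, insufficient storage space, and network connectivity problems between brokers.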
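
Two rules cover most of these cases: a topic cannot be created with a replication factor larger than the number of available brokers, and producers using acks=all are rejected while a partition has fewer in-sync replicas than min.insync.replicas. A hypothetical pre-flight check in Python that encodes both rules, applied to the settings from the ConfigMap (replication factor 2, min.insync.replicas 1, 3 brokers):

```python
def check_replication(brokers: int, replication_factor: int, min_insync: int) -> list:
    """Return a list of problems with the given topic settings (empty if none)."""
    problems = []
    if replication_factor > brokers:
        problems.append(
            f"replication factor {replication_factor} exceeds {brokers} broker(s)")
    if min_insync > replication_factor:
        problems.append(
            f"min.insync.replicas {min_insync} exceeds replication factor {replication_factor}")
    return problems


def acks_all_failures_tolerated(replication_factor: int, min_insync: int) -> int:
    """Replicas that can be lost while acks=all writes still succeed."""
    return max(replication_factor - min_insync, 0)


print(check_replication(brokers=3, replication_factor=2, min_insync=1))
print(acks_all_failures_tolerated(2, 1))
```

Raising both values to replication factor 3 and min.insync.replicas 2 keeps the same single-failure tolerance for acks=all writes while always keeping two copies of acknowledged data.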

Message latency

Check network bandwidth; tune consumer batching (for example fetch.min.bytes and max.poll.records); add brokers.
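
A useful first measurement is consumer lag: per partition, the log-end offset minus the offset the consumer group has committed. `kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-consumer-group` reports it directly in its LAG column; the Python sketch below only shows the arithmetic, with made-up offsets.

```python
def consumer_lag(end_offsets: dict, committed: dict) -> dict:
    """Per-partition lag = log-end offset - committed offset.

    Partitions without a committed offset are skipped.
    """
    return {p: end_offsets[p] - committed[p] for p in end_offsets if p in committed}


# Made-up offsets for a 3-partition topic
lag = consumer_lag({0: 1500, 1: 1480, 2: 1700}, {0: 1500, 1: 1200, 2: 900})
print(lag, "total:", sum(lag.values()))
```

Lag that keeps growing points at the consumers; lag that stays flat while end-to-end latency is high points at the producer or broker side.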

Summary

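This article presented a complete setup for deploying a Kafka cluster on Kubernetes. KRaft mode is recommended for Kafka 3.3 and later, where it is production-ready; ZooKeeper mode is for existing clusters on older versions and was removed in Kafka 4.0. Choose the number of brokers and the storage capacity according to your data volume and throughput requirements.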
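
As a rough starting point for storage, each broker has to hold its share of every retained replica: ingress rate × retention time × replication factor ÷ number of brokers, plus headroom. A back-of-the-envelope Python sketch; the 1 MB/s rate and 30% headroom are assumed example values, not recommendations, and compression is ignored:

```python
def storage_per_broker_gb(ingress_mb_per_s: float, retention_hours: float,
                          replication_factor: int, brokers: int,
                          headroom: float = 0.3) -> float:
    """Approximate disk needed per broker in GB (1 GB = 1000 MB)."""
    retained_mb = ingress_mb_per_s * retention_hours * 3600 * replication_factor
    return retained_mb / brokers * (1 + headroom) / 1000


# log.retention.hours=168, default.replication.factor=2, 3 brokers (values from this article)
print(round(storage_per_broker_gb(1.0, 168, 2, 3), 1))
```

With 7-day retention and replication factor 2, a sustained 1 MB/s of producer traffic already needs about 524 GB per broker, so the 20Gi volumes in the manifest only suit write rates of a few tens of KB/s or a shorter retention period.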