Skip to the content.

kubernetes集群中的kafka服务配置

kubernetes集群中的kafka服务配置

zookeeper Dockerfile https://github.com/31z4/zookeeper-docker/tree/master/3.5.6

kafka Dockerfile https://github.com/bbotte/bbotte.github.io/tree/master/Commonly-Dockerfile/kafka-cluster

如果是单机跑kafka服务,那么用docker-compose更方便,配置如下:

# Single-host ZooKeeper + Kafka stack for docker-compose.
# Clients connect through the broker's OUTSIDE listener at kafka-com:9094, so
# the name "kafka-com" must resolve to this Docker host (e.g. a hosts entry).
version: '2.4'
services:
  zookeeper-com:
    container_name: zookeeper-com
    # zookeeper image: https://github.com/31z4/zookeeper-docker/tree/master/3.5.6
    image: zookeeper
    ports:
      # Port mappings are quoted so YAML always reads HOST:CONTAINER as a
      # string (unquoted xx:yy values can be parsed as base-60 integers).
      - "2181:2181"
    networks:
      - bbotte
    environment:
      ZOO_MY_ID: "1"
      # NOTE(review): the value ends with a trailing space; kept as-is,
      # presumably harmless - confirm against the image's entrypoint.
      ZOO_SERVERS: "server.1=0.0.0.0:2888:3888;2181 "

  kafka-com:
    container_name: kafka-com
    image: kafka:0.1
    ports:
      # 9092 = INSIDE listener, 9094 = OUTSIDE listener (see KAFKA_LISTENERS).
      - "9092:9092"
      - "9094:9094"
    networks:
      - bbotte
    environment:
      KAFKA_BROKER_ID: "1"
      # Presumably topic:partitions:replicas, created at startup - confirm
      # against the kafka image's start script.
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper-com:2181"
      # OUTSIDE is advertised as kafka-com:9094; this is the address external
      # clients are told to connect to.
      KAFKA_ADVERTISED_LISTENERS: "INSIDE://:9092,OUTSIDE://kafka-com:9094"
      KAFKA_LISTENERS: "INSIDE://:9092,OUTSIDE://:9094"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INSIDE"

networks:
  # Shared network for both services; not an externally managed network.
  bbotte:
    external: false
# Smoke test against the docker-compose broker, run from the bin/ directory of
# an unpacked Kafka 2.3.0 distribution. All commands use the OUTSIDE listener.
cd kafka_2.12-2.3.0/bin/
# Create the "test" topic with 1 partition and replication factor 1.
./kafka-topics.sh --create --bootstrap-server kafka-com:9094 --replication-factor 1 --partitions 1  --topic test
# List the topics known to the broker.
./kafka-topics.sh --list --bootstrap-server kafka-com:9094
 
# Produce messages to the "test" topic.
./kafka-console-producer.sh --broker-list kafka-com:9094 --topic test
# Consume the "test" topic from the earliest available offset.
./kafka-console-consumer.sh --bootstrap-server kafka-com:9094 --topic test --from-beginning

连接kafka-com需要dns解析到这台docker-compose主机,简单的方式为添加hosts即可测试。下面说一下在kubernetes中的配置

# cat kafka.yaml
# Single-replica Kafka broker Deployment for the dev namespace.
# apps/v1 is used because apps/v1beta2 is no longer served as of
# Kubernetes 1.16; the manifest already declares the required spec.selector.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka
  namespace: dev
  labels:
    k8s-app: kafka
spec:
  replicas: 1
  revisionHistoryLimit: 1
  minReadySeconds: 10
  strategy:
    # Bring up one extra pod first; never drop below the desired replica count.
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      k8s-app: kafka
  template:
    metadata:
      labels:
        k8s-app: kafka
    spec:
      affinity:
        nodeAffinity:
          # Hard requirement: only schedule onto the node whose hostname
          # label is k8sn1.
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: kubernetes.io/hostname
                operator: In
                values:
                - k8sn1
      containers:
      - name: kafka
        imagePullPolicy: IfNotPresent
        image: kafka:0.1
        resources:
          limits:
            memory: "2Gi"
          requests:
            memory: "0.1Gi"
            cpu: "0.1"
        ports:
        # 9092 = INSIDE listener; 30322 = OUTSIDE listener. The OUTSIDE port
        # must equal the Service nodePort so clients inside and outside the
        # cluster can both use the advertised address kafka-com:30322.
        # In-cluster-only alternative: use 9094 for OUTSIDE (no NodePort):
        # - containerPort: 9094
        - containerPort: 9092
        - containerPort: 30322
        env:
        - name: KAFKA_BROKER_ID
          value: "1"
        # Presumably topic:partitions:replicas, created at startup - confirm
        # against the kafka image's start script.
        - name: KAFKA_CREATE_TOPICS
          value: "test:1:1"
        # NOTE(review): expects a ZooKeeper endpoint reachable as
        # zookeeper-com:2181; its Service is not defined in this manifest.
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: "zookeeper-com:2181"
        - name: KAFKA_ADVERTISED_LISTENERS
          # In-cluster-only alternative:
          # value: "INSIDE://:9092,OUTSIDE://kafka-com:9094"
          value: "INSIDE://:9092,OUTSIDE://kafka-com:30322"
        - name: KAFKA_LISTENERS
          # In-cluster-only alternative:
          # value: "INSIDE://:9092,OUTSIDE://:9094"
          value: "INSIDE://:9092,OUTSIDE://:30322"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: "INSIDE"

        volumeMounts:
        # Mounted at /data from the gfsdev claim.
        - name: kafka-data
          mountPath: /data
      volumes:
      - name: kafka-data
        persistentVolumeClaim:
          claimName: gfsdev
      imagePullSecrets:
      - name: harbor-auth-dev
 
---
# NodePort Service for the Kafka pod. Its name, kafka-com, is the host that
# the broker advertises on its OUTSIDE listener.
apiVersion: v1
kind: Service
metadata:
  name: kafka-com
  namespace: dev
  labels:
    k8s-app: kafka
spec:
  type: NodePort
  selector:
    k8s-app: kafka
  ports:
  # INSIDE listener.
  - name: innerport
    protocol: TCP
    port: 9092
    targetPort: 9092
  # OUTSIDE listener: port, targetPort and nodePort are all 30322, matching
  # the port the broker advertises for OUTSIDE.
  - name: outport
    protocol: TCP
    port: 30322
    targetPort: 30322
    nodePort: 30322
  # In-cluster-only alternative for the OUTSIDE listener (no fixed nodePort):
  # - name: outport
  #   protocol: TCP
  #   port: 9094
  #   targetPort: 9094

上面有几行前面加#注释的配置:如果服务只是在k8s内部调用的话,用9094端口即可,不用另外开nodeport;如果在k8s集群内部和外部都要请求,那么需要保证nodeport和kafka OUTSIDE的端口一致。

# cd kafka_2.11-2.3.0/bin/
# ./kafka-topics.sh --list --bootstrap-server kafka-com:30322
test
# ./kafka-console-producer.sh --broker-list kafka-com:30322 --topic test
>hello123
>345
>^C
# 
# ./kafka-console-consumer.sh --bootstrap-server kafka-com:30322 --topic test --from-beginning
^CProcessed a total of 0 messages
# ./kafka-console-consumer.sh --bootstrap-server kafka-com:30322 --topic test --from-beginning --partition 0
hello123
345
^CProcessed a total of 2 messages

需要注意的是,在kubernetes中 kafka-console-consumer.sh 没有partition参数的话,不能获取到messages(参见 consumer-not-receiving-messages-kafka-console-new-consumer-api-kafka-0-9),需要添加 --partition 0 参数;然而docker-compose方式不用,不经kubernetes平台、在主机上直接运行kafka也不用此参数。

kafka集群没什么特别的,一般用到StatefulSet,存储可以参考redis cluster

kafka集群在kubernetes中的配置 kubernetes中配置kafka集群