Kafka and ZooKeeper on a Kubernetes Cluster

Deploy Kafka And ZP With K8s

Posted by ChenJian on April 11, 2017

Series Posts

GitHub

Based on Yolean/kubernetes-kafka

Images and YAML Files

The image tarballs and YAML files can be downloaded here

Versions currently in use:

  • kafka 2.11-0.10.1.1
  • zookeeper 3.4.9

Step-by-Step Deployment

Create the Namespace

kubectl create -f namespace.yaml
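namespace.yaml itself is not reproduced in the post; for a namespace named kafka it is presumably just the standard manifest, along these lines:

```yaml
# Presumed content of namespace.yaml (not shown in the original post)
apiVersion: v1
kind: Namespace
metadata:
  name: kafka
```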

Create the PV and PVC
kubectl create -f pv.yaml
kubectl create -f pvc.yaml

Create the PVs first, then the PVCs; running kubectl get pv should then show the three PVs as Bound.
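pv.yaml and pvc.yaml are not reproduced here; a minimal hostPath PV/PVC pair along the lines described would look roughly like this (names, sizes, and the per-volume subdirectory are illustrative, not the post's actual files):

```yaml
# Illustrative hostPath PV/PVC pair; storage sizes must be compatible for binding
kind: PersistentVolume
apiVersion: v1
metadata:
  name: kafka-pv-0
spec:
  capacity:
    storage: 1Gi
  accessModes: ["ReadWriteOnce"]
  hostPath:
    path: /tmp/kafka-data/pv-0
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: kafka-pvc-0
  namespace: kafka
spec:
  accessModes: ["ReadWriteOnce"]
  resources:
    requests:
      storage: 1Gi
```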

Notes:

  • The PV paths are under /tmp/kafka-data/. The /tmp directory is wiped when the server reboots, so the data will be lost;

  • Make sure the storage values of the PVs and PVCs match (a PVC binds only to a PV with sufficient capacity);

  • hostPath is used here; it can be swapped for NFS. The upstream project warns:
    • single node testing only – local storage is not supported in any way and WILL NOT WORK in a multi-node cluster
  • You should be familiar with the StatefulSet type that Kafka uses

Deploy ZooKeeper

kubectl create -f zookeeper/

Note:

terminationGracePeriodSeconds: 60 gives each pod up to 60 seconds to shut down cleanly before Kubernetes force-kills it; it governs termination, not how often containers are started.
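For context, the field sits in the pod template spec of the ZooKeeper manifest; an illustrative fragment (container name and image tag are examples, not the post's actual file):

```yaml
# Illustrative fragment of the ZooKeeper pod template spec
spec:
  terminationGracePeriodSeconds: 60  # allow up to 60s for a clean shutdown
  containers:
    - name: zookeeper
      image: zookeeper:3.4.9
```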

Deploy Kafka
kubectl create -f kafka/
Create Topics
kubectl create -f createTopics/topic-create.yaml

This uses the Job type: it only needs to run to completion once.

The created topics can be seen under the PV data path /tmp/kafka-data/.

However, if the entire ZooKeeper ensemble is lost, the Kafka cluster no longer knows about the existing topics; even though the data is still on disk, the topics have to be created again.
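topic-create.yaml is not shown in the post; a Job that creates a topic via ZooKeeper could be sketched like this (the image tag, topic name, and partition/replication counts are illustrative):

```yaml
# Illustrative topic-creation Job; adjust image and topic settings to your setup
apiVersion: batch/v1
kind: Job
metadata:
  name: topic-create
  namespace: kafka
spec:
  template:
    metadata:
      name: topic-create
    spec:
      restartPolicy: OnFailure   # a Job only needs to succeed once
      containers:
        - name: topic-create
          image: kafka:2.11-0.10.1.1   # illustrative image tag
          command:
            - "bin/kafka-topics.sh"
            - "--create"
            - "--zookeeper"
            - "zookeeper:2181"
            - "--topic"
            - "vul_test"
            - "--partitions"
            - "1"
            - "--replication-factor"
            - "1"
```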

Connecting to Kafka/ZooKeeper from Python

Dockerfile

Never used Docker? Have a look at Easy With Docker!

Code download: Dockerfile

FROM ubuntu:14.04
MAINTAINER chenjian "[email protected]"

# Set the locale
RUN locale-gen zh_CN.UTF-8
ENV LANG zh_CN.UTF-8
ENV LANGUAGE zh_CN:zh
ENV LC_ALL zh_CN.UTF-8

# set timezone
RUN echo "Asia/Shanghai" > /etc/timezone
RUN dpkg-reconfigure -f noninteractive tzdata

# Switch apt sources to a local mirror
ADD sources.list /
RUN cp /etc/apt/sources.list /etc/apt/sources.list_backup
RUN rm -rf /etc/apt/sources.list
ADD sources.list /etc/apt/sources.list
############################################

RUN apt-get clean
RUN apt-get update
RUN apt-get -y upgrade

RUN apt-get install -y python-dev
RUN apt-get install -y build-essential
RUN apt-get install -y python-pip

ADD librdkafka /librdkafka
WORKDIR /librdkafka
RUN ./configure
RUN make
RUN make install
RUN ldconfig

RUN pip install confluent-kafka

RUN apt-get clean

WORKDIR /
ADD confluentkafka.py /
ADD whilerun.py /
whilerun.py

Code download: whilerun.py

This keeps a long-running process in the container so it does not exit immediately.

# -*- coding:utf8 -*-

"""
@author: [email protected]

@date: Thu, Apr 20 2017

@time: 15:20:08 GMT+8
"""
import time
from datetime import datetime


def main():
    while 1:
        print datetime.now()
        time.sleep(60)


if __name__ == '__main__':
    main()
zpkafka.yaml

Code download: zpkafka.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: zpkafka
  namespace: kafka
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: zpkafka
    spec:
      containers:
        - name: zpkafka
          image: zpkafka:test
          command:
            - "python"
            - "whilerun.py" 
  • Note: the namespace must be the same one Kafka runs in
confluentkafka.py

Code download: confluentkafka.py

# -*- coding:utf8 -*-

"""
Using confluent-kafka

@author: [email protected]

@date: Wed, Nov 23

@time: 11:39:30 GMT+8
"""

from confluent_kafka import Producer
from confluent_kafka import Consumer, KafkaError, KafkaException


class TestConfluentKafka(object):
    def __init__(self):
        self.broker = 'kafka:9092'
        self.group_id = 'vul_test'
        self.topic_con = ['vul_test']
        self.topic_pro = 'vul_test'

    def test_producer(self):
        """ 消息生产者
        
        """
        conf = {'bootstrap.servers': self.broker}

        p = Producer(**conf)
        some_data_source = [
            "chennnnnnnnnnnnnnnnnnnnnn",
            "jiansssssssssssssssssss",
            "hellossssssssssssssss",
            "dddddddddddddddddddddddd"]
        for data in some_data_source:
            p.produce(self.topic_pro, data.encode('utf-8'))

        p.flush()

    def test_consumer(self):
        """ 消息消费者
        
        """
        conf = {'bootstrap.servers': self.broker,
                'group.id': self.group_id,
                'default.topic.config': {'auto.offset.reset': 'smallest'}}

        c = Consumer(**conf)
        c.subscribe(self.topic_con)

        try:
            while True:
                msg = c.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        print msg.topic(), msg.partition(), msg.offset()
                    elif msg.error():
                        raise KafkaException(msg.error())
                else:
                    print '%% %s [%d] at offset %d with key %s:\n' \
                          % (msg.topic(),
                             msg.partition(),
                             msg.offset(),
                             str(msg.key()))
                    print msg.value()
        except KeyboardInterrupt:
            print '%% Aborted by user\n'

        finally:
            c.close()


if __name__ == '__main__':
    #TestConfluentKafka().test_producer()
    TestConfluentKafka().test_consumer()
  • Notes:
    • The third-party confluent-kafka library is used here; other client libraries can be substituted
    • The topics in the config must match the topics created earlier. It is better to pass them as environment variables by adding env entries to the YAML
    • test_producer() is the message producer and test_consumer() is the message consumer
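Following the note above about moving the configuration into environment variables, a minimal sketch of how confluentkafka.py could read them. The variable names KAFKA_BROKER, KAFKA_GROUP_ID, and KAFKA_TOPICS are my own choices, not from the original code:

```python
import os


def kafka_config():
    """Build the consumer config from environment variables, falling back
    to the defaults hard-coded in confluentkafka.py."""
    return {
        'bootstrap.servers': os.environ.get('KAFKA_BROKER', 'kafka:9092'),
        'group.id': os.environ.get('KAFKA_GROUP_ID', 'vul_test'),
        'default.topic.config': {'auto.offset.reset': 'smallest'},
    }


def kafka_topics():
    """Topics to subscribe to, read as a comma-separated list."""
    return os.environ.get('KAFKA_TOPICS', 'vul_test').split(',')
```

In zpkafka.yaml this would pair with an env: list on the container, e.g. an entry with name: KAFKA_TOPICS and value: "vul_test".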

Connecting to Kafka/ZooKeeper from Node.js

Code download: connectKafka.js

/**
 * Created by jianchan on 21/04/2017.
 */

var kafka = require('kafka-node');
var util = require('util');
var moment = require('moment');

var params = {'zookeeper_connect': 'zookeeper:2181'};
var topics = {'abc': 'abc_test'};
var groupId = {'abc': 'abc_test'};


var Client = kafka.Client;
var KeyedMessage = kafka.KeyedMessage;
var HighLevelProducer = kafka.HighLevelProducer;
var HighLevelConsumer = kafka.HighLevelConsumer;

var client = new Client(params.zookeeper_connect);

// Message producer
var producer = new HighLevelProducer(client);
var data = {
    "data": "dddddddxxxxxx"
};

producer.on('ready', function () {
    var timeSpan = Date.now();
    var sendData = JSON.stringify(data.data);
    send(topics.abc, timeSpan, sendData);
});

producer.on('error', function (err) {
    console.log(err);
});

function send(topic, key, value) {
    if (!util.isString(key)) {
        key = key.toString();
    }
    var keyedMessage = new KeyedMessage(key, value);
    producer.send([{topic: topic, messages: [keyedMessage]}],
        function (err, data) {
            if (err) {
                console.log(err);
                return;  // don't log a success line on error
            }
            log(key, value);
            console.log("=====================================");
        });
}

function log(key, value) {
    console.log('send message to kafka:--datetime: %s--key: %s--value: %s',
        moment().format('YYYY-MM-DD HH:mm:ss'),
        key,
        value);
}

// Message consumer
var consumer = new HighLevelConsumer(
    client, 
    [{topic: topics.abc}], 
    {groupId: groupId.abc}
);
consumer.on('message', function (message) {
    console.log(message);
});

References

  1. Work log: k8s pv/pvc
  2. Work log: k8s pv/pvc, part 2
  3. Graceful shutdown of pods with Kubernetes
  4. Configuring a Job in Kubernetes 1.5
  5. kafka-node

This work by Chen Jian is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.