Blog series
- Deploying Kubernetes on Linux with the kubeadm tool
- A Hands-on Guide to Docker
- Kubernetes Cluster: Building an etcd Cluster
- Kubernetes Cluster: Dashboard
- Kubernetes Cluster: Monitoring
- Kubernetes Cluster: Logging
- Kubernetes Cluster: Ingress
- Kubernetes Cluster: Redis Sentinel Cluster
- Kubernetes Cluster: Kafka and ZooKeeper
GitHub
Images and YAML files
The tar packages and YAML files can be downloaded here.
Versions currently in use:
- kafka 2.11-0.10.1.1
- zookeeper 3.4.9
Step-by-step deployment
Create the namespace
kubectl create -f namespace.yaml
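The namespace manifest itself is not listed in this post; a minimal sketch of what namespace.yaml presumably contains, given that all of the manifests below live in the kafka namespace:

apiVersion: v1
kind: Namespace
metadata:
  name: kafka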
Create the PV and PVC
kubectl create -f pv.yaml
kubectl create -f pvc.yaml
Create the PVs before the PVCs; kubectl get pv should then show all three PVs as Bound.
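The pv.yaml and pvc.yaml files are in the repo; a minimal sketch of one PV/PVC pair, assuming hostPath storage under /tmp/kafka-data/ and a made-up 5Gi size (the names are hypothetical, and the repo defines three such pairs):

apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-0               # hypothetical name
spec:
  capacity:
    storage: 5Gi                 # must match the PVC request below
  accessModes:
  - ReadWriteOnce
  hostPath:
    path: /tmp/kafka-data/       # cleared on server reboot, see the notes
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-pvc-0              # hypothetical name
  namespace: kafka
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi               # must match the PV capacity above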
Notes:
- The PV paths live under /tmp/kafka-data/. When the server reboots, the /tmp directory is cleared, so the data is lost.
- Pay attention to the storage fields of the PVs and PVCs; the sizes must match for the claims to bind.
- hostPath is used here; you can switch to NFS instead. The official docs warn that hostPath is for "single node testing only – local storage is not supported in any way and WILL NOT WORK in a multi-node cluster".
- You need to be familiar with StatefulSet, the workload type used for Kafka.
Deploy ZooKeeper
kubectl create -f zookeeper/
Note: terminationGracePeriodSeconds: 60. This gives each pod up to 60 seconds to shut down cleanly before Kubernetes force-kills it.
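For context, a hypothetical fragment of the ZooKeeper StatefulSet showing where that setting lives; apart from terminationGracePeriodSeconds and the image version, every field here is an assumption (the real manifests are in the repo):

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: zookeeper
  namespace: kafka
spec:
  serviceName: zookeeper
  replicas: 3
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      terminationGracePeriodSeconds: 60   # allow 60s for a clean shutdown
      containers:
      - name: zookeeper
        image: zookeeper:3.4.9
        ports:
        - containerPort: 2181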
Deploy Kafka
kubectl create -f kafka/
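The Kafka manifests are likewise in the repo and not reproduced here; a rough, hypothetical sketch of the StatefulSet shape (image tag and replica count are assumptions, and the wiring to the PVs/PVCs created above is omitted):

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  serviceName: kafka     # pods get stable names: kafka-0, kafka-1, kafka-2
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: kafka:2.11-0.10.1.1   # assumed tag, matching the version above
        ports:
        - containerPort: 9092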
Create topics
kubectl create -f createTopics/topic-create.yaml
The manifest uses a Job, which only needs to run to completion once.
The created topics can then be seen under the PV data path /tmp/kafka-data/.
Note, however, that if the entire ZooKeeper cluster is lost, the Kafka cluster no longer knows which topics exist; even though the data is still on disk, the topics have to be created again.
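The actual topic-create.yaml is in the repo; a hypothetical sketch of such a one-shot Job, assuming Kafka's stock kafka-topics.sh is available in the image (image tag and topic settings are assumptions):

apiVersion: batch/v1
kind: Job
metadata:
  name: topic-create
  namespace: kafka
spec:
  template:
    metadata:
      name: topic-create
    spec:
      restartPolicy: Never            # run once to completion, no restarts
      containers:
      - name: topic-create
        image: kafka:2.11-0.10.1.1    # assumed tag
        command:
        - kafka-topics.sh
        - --create
        - --zookeeper
        - zookeeper:2181
        - --topic
        - vul_test                    # must match the topic used by the clients below
        - --partitions
        - "1"
        - --replication-factor
        - "1"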
Python: connecting to Kafka/ZooKeeper
Dockerfile
Never used Docker? Have a look at Easy With Docker!
Code download: Dockerfile
FROM ubuntu:14.04
MAINTAINER chenjian "[email protected]"
# Set the locale
RUN locale-gen zh_CN.UTF-8
ENV LANG zh_CN.UTF-8
ENV LANGUAGE zh_CN:zh
ENV LC_ALL zh_CN.UTF-8
# set timezone
RUN echo "Asia/Shanghai" > /etc/timezone
RUN dpkg-reconfigure -f noninteractive tzdata
# Switch apt sources (back up the original list first)
RUN cp /etc/apt/sources.list /etc/apt/sources.list_backup
RUN rm -rf /etc/apt/sources.list
ADD sources.list /etc/apt/sources.list
############################################
RUN apt-get clean
RUN apt-get update
RUN apt-get -y upgrade
RUN apt-get install -y python-dev
RUN apt-get install -y build-essential
RUN apt-get install -y python-pip
# Build and install librdkafka, the C client library required by confluent-kafka
ADD librdkafka /librdkafka
WORKDIR /librdkafka
RUN ./configure
RUN make
RUN make install
RUN ldconfig
RUN pip install confluent-kafka
RUN apt-get clean
WORKDIR /
ADD confluentkafka.py /
ADD whilerun.py /
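Assuming the build context contains this Dockerfile together with sources.list, the librdkafka source tree, confluentkafka.py, and whilerun.py, the image can be built with something like docker build -t zpkafka:test . so the tag matches the image referenced in zpkafka.yaml below.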
whilerun.py
Code download: whilerun.py
This script keeps a foreground process running so the container stays up.
# -*- coding:utf8 -*-
"""
@author: [email protected]
@date: Thu, Apr 20 2017
@time: 15:20:08 GMT+8
"""
import time
from datetime import datetime


def main():
    # Print a heartbeat every 60 seconds so the container never exits.
    while 1:
        print datetime.now()
        time.sleep(60)


if __name__ == '__main__':
    main()
zpkafka.yaml
Code download: zpkafka.yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: zpkafka
  namespace: kafka
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: zpkafka
    spec:
      containers:
      - name: zpkafka
        image: zpkafka:test
        command:
        - "python"
        - "whilerun.py"
- Note: the namespace must be the same one the Kafka cluster runs in (kafka).
confluentkafka.py
Code download: confluentkafka.py
# -*- coding:utf8 -*-
"""
Using confluent-kafka
@author: [email protected]
@date: Wed, Nov 23
@time: 11:39:30 GMT+8
"""
from confluent_kafka import Producer
from confluent_kafka import Consumer, KafkaError, KafkaException


class TestConfluentKafka(object):
    def __init__(self):
        self.broker = 'kafka:9092'
        self.group_id = 'vul_test'
        self.topic_con = ['vul_test']
        self.topic_pro = 'vul_test'

    def test_producer(self):
        """ Message producer
        """
        conf = {'bootstrap.servers': self.broker}
        p = Producer(**conf)
        some_data_source = [
            "chennnnnnnnnnnnnnnnnnnnnn",
            "jiansssssssssssssssssss",
            "hellossssssssssssssss",
            "dddddddddddddddddddddddd"]
        for data in some_data_source:
            p.produce(self.topic_pro, data.encode('utf-8'))
        p.flush()

    def test_consumer(self):
        """ Message consumer
        """
        conf = {'bootstrap.servers': self.broker,
                'group.id': self.group_id,
                'default.topic.config': {'auto.offset.reset': 'smallest'}}
        c = Consumer(**conf)
        c.subscribe(self.topic_con)
        try:
            while True:
                msg = c.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    # End of partition is informational, not a real error.
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        print msg.topic(), msg.partition(), msg.offset()
                    else:
                        raise KafkaException(msg.error())
                else:
                    print '%% %s [%d] at offset %d with key %s:\n' \
                        % (msg.topic(),
                           msg.partition(),
                           msg.offset(),
                           str(msg.key()))
                    print msg.value()
        except KeyboardInterrupt:
            print '%% Aborted by user\n'
        finally:
            c.close()


if __name__ == '__main__':
    # TestConfluentKafka().test_producer()
    TestConfluentKafka().test_consumer()
- Notes:
- The third-party confluent-kafka library is used here; feel free to try other client libraries.
- The topics in the config must match the topics created earlier. It would be better to pass them in as environment variables by adding an env section to the YAML, as sketched below.
- test_producer() is the message producer and test_consumer() is the message consumer.
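A minimal sketch of that env approach, extending the container spec from zpkafka.yaml above (the variable name KAFKA_TOPIC is hypothetical):

    spec:
      containers:
      - name: zpkafka
        image: zpkafka:test
        command:
        - "python"
        - "whilerun.py"
        env:
        - name: KAFKA_TOPIC        # hypothetical variable name
          value: "vul_test"

confluentkafka.py could then read the topic with os.environ['KAFKA_TOPIC'] instead of hard-coding it.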
Node.js: connecting to Kafka/ZooKeeper
Code download: connectKafka.js
/**
 * Created by jianchan on 21/04/2017.
 */
var kafka = require('kafka-node');
var util = require('util');
var moment = require('moment');

var params = {'zookeeper_connec': 'zookeeper:2181'};
var topics = {'abc': 'abc_test'};
var groupId = {'abc': 'abc_test'};

var Client = kafka.Client;
var KeyedMessage = kafka.KeyedMessage;
var HighLevelProducer = kafka.HighLevelProducer;
var HighLevelConsumer = kafka.HighLevelConsumer;

var client = new Client(params.zookeeper_connec);

// Message producer
var producer = new HighLevelProducer(client);
var data = {
    "data": "dddddddxxxxxx"
};
producer.on('ready', function () {
    var timeSpan = Date.now();
    var sendData = JSON.stringify(data.data);
    send(topics.abc, timeSpan, sendData);
});
producer.on('error', function (err) {
    console.log(err);
});

function send(topic, key, value) {
    // KeyedMessage expects a string key.
    if (!util.isString(key)) {
        key = key.toString();
    }
    var keyedMessage = new KeyedMessage(key, value);
    producer.send([{topic: topic, messages: [keyedMessage]}],
        function (err, data) {
            if (err) {
                console.log(err);
            }
            log(key, value);
            console.log("=====================================");
        });
}

function log(key, value) {
    console.log('send message to kafka:--datetime: %s--key: %s--value: %s',
        moment().format('YYYY-MM-DD HH:mm:ss'),
        key,
        value);
}

// Message consumer
var consumer = new HighLevelConsumer(
    client,
    [{topic: topics.abc}],
    {groupId: groupId.abc}
);
consumer.on('message', function (message) {
    console.log(message);
});
References
- Work log: k8s PV/PVC
- Work log: k8s PV/PVC, part 2
- Graceful shutdown of pods with Kubernetes
- Configuring a Job in Kubernetes 1.5
- kafka-node
This work by Chen Jian is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.