1. Bulk inserting into ES
To make later configuration changes easier, the configuration is kept in logging.conf.
The elasticsearch client library is used for the bulk operations; install the dependency first: sudo pip install Elasticsearch2
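The logging.conf that the code below loads is not included in the original post. A minimal sketch of what it could contain, assuming two file loggers named msg and data (the log file names, levels, and format are placeholders, not from the original):

[loggers]
keys=root,msg,data

[handlers]
keys=msgHandler,dataHandler

[formatters]
keys=simple

[logger_root]
level=INFO
handlers=msgHandler

[logger_msg]
level=INFO
handlers=msgHandler
qualname=msg
propagate=0

[logger_data]
level=INFO
handlers=dataHandler
qualname=data
propagate=0

[handler_msgHandler]
class=FileHandler
level=INFO
formatter=simple
args=('msg.log', 'a')

[handler_dataHandler]
class=FileHandler
level=INFO
formatter=simple
args=('data.log', 'a')

[formatter_simple]
format=%(asctime)s %(levelname)s %(message)s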
import logging
import logging.config

from elasticsearch import Elasticsearch


class ImportEsData:
    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("msg")

    def __init__(self, hosts, index, type):
        # hosts is a comma-separated string such as "host1:9200,host2:9200"
        self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
        self.index = index
        self.type = type

    def set_date(self, data):
        # index the data
        # es.index(index="test-index", doc_type="test-type", id=42, body={"any": "data", "timestamp": datetime.now()})
        self.es.index(index=self.index, doc_type=self.type, body=data)
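A quick usage sketch for the class above; the host, index, and type names here are placeholders, not from the original post:

if __name__ == '__main__':
    # placeholder connection settings; replace with real ES hosts/index/type
    importer = ImportEsData("127.0.0.1:9200", "test-index", "test-type")
    # index a single document
    importer.set_date({"any": "data", "timestamp": "2017-12-04T12:00:00"})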
2. Consuming Kafka with pykafka
1. Because the Kafka cluster is 0.8 and pykafka cannot use ZooKeeper here, the consumer has to be built with get_simple_consumer.
2. To let several application instances consume at the same time without consuming anything twice, each instance consumes exactly one partition.
3. To make sure data still gets inserted into ES within a bounded time when fewer than the 10000-record batch threshold have been consumed, consumer_timeout_ms is set as a wait timeout so the blocking consume can exit.
4. Once the blocking consume exits on that timeout no further data would be consumed, so a while True infinite loop is wrapped around the iteration over self.consumer.
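The consumer script below also imports a ConfigUtil helper that the original post does not show. A minimal sketch of what it might look like, assuming the kafka/es settings live in an ini file; the file name config.ini and all example values are placeholders, while the section and option names match the get() calls made in the code:

# ConfigUtil.py - hypothetical helper, not part of the original post
import ConfigParser  # Python 2; use "import configparser" on Python 3


class ConfigUtil:
    def __init__(self, path="config.ini"):
        self.config = ConfigParser.ConfigParser()
        self.config.read(path)

    def get(self, section, option):
        # return the raw string value; callers cast to int where needed
        return self.config.get(section, option)

A matching config.ini might then look like this (example values only):

[kafka]
kafka_server = 127.0.0.1:9092
topic = test-topic
group = es-import-group
partition = 0
consumer_timeout_ms = 5000

[es]
hosts = 127.0.0.1:9200
index_name = test-index
type_name = test-type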
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime


class KafkaPython:
    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("msg")
    logger_data = logging.getLogger("data")

    def __init__(self):
        self.server = ConfigUtil().get("kafka", "kafka_server")
        self.topic = ConfigUtil().get("kafka", "topic")
        self.group = ConfigUtil().get("kafka", "group")
        self.partition_id = int(ConfigUtil().get("kafka", "partition"))
        self.consumer_timeout_ms = int(ConfigUtil().get("kafka", "consumer_timeout_ms"))
        self.consumer = None
        self.hosts = ConfigUtil().get("es", "hosts")
        self.index_name = ConfigUtil().get("es", "index_name")
        self.type_name = ConfigUtil().get("es", "type_name")

    def getConnect(self):
        client = KafkaClient(self.server)
        topic = client.topics[self.topic]
        p = topic.partitions
        # this instance only consumes the single partition configured for it
        ps = {p.get(self.partition_id)}
        self.consumer = topic.get_simple_consumer(
            consumer_group=self.group,
            auto_commit_enable=True,
            consumer_timeout_ms=self.consumer_timeout_ms,
            # num_consumer_fetchers=1,
            # consumer_id='test1',
            partitions=ps
        )
        self.starttime = datetime.datetime.now()

    def beginConsumer(self):
        print("beginConsumer kafka-python")
        imprtEsData = ImportEsData(self.hosts, self.index_name, self.type_name)
        # buffer of bulk actions
        count = 0
        ACTIONS = []
        while True:
            endtime = datetime.datetime.now()
            print((endtime - self.starttime).seconds)
            # the for loop ends once consumer_timeout_ms elapses with no new message
            for message in self.consumer:
                if message is not None:
                    try:
                        count = count + 1
                        # print(str(message.partition.id)+","+str(message.offset)+","+str(count))
                        # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
                        action = {
                            "_index": self.index_name,
                            "_type": self.type_name,
                            "_source": message.value
                        }
                        ACTIONS.append(action)
                        if len(ACTIONS) >= 10000:
                            # batch threshold reached: flush to ES and commit offsets
                            imprtEsData.set_date(ACTIONS)
                            ACTIONS = []
                            self.consumer.commit_offsets()
                            endtime = datetime.datetime.now()
                            print((endtime - self.starttime).seconds)
                            # break
                    except Exception as e:
                        # self.consumer.commit_offsets()
                        print(e)
                        self.logger.error(e)
                        self.logger.error(str(message.partition.id) + "," + str(message.offset) + "," + message.value + "\n")
                        # self.logger_data.error(message.value+"\n")
                        # self.consumer.commit_offsets()
            if len(ACTIONS) > 0:
                # consumer_timeout_ms expired before the batch filled up:
                # flush whatever has been buffered so far
                self.logger.info("consumer_timeout_ms exceeded, inserting the buffered data into es")
                imprtEsData.set_date(ACTIONS)
                ACTIONS = []
                self.consumer.commit_offsets()

    def disConnect(self):
        self.consumer.close()


from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


class ImportEsData:
    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("msg")

    def __init__(self, hosts, index, type):
        self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
        self.index = index
        self.type = type

    def set_date(self, data):
        # bulk insert; bulk() returns (number of successes, error details)
        success = bulk(self.es, data, index=self.index, raise_on_error=True)
        self.logger.info(success)
3. Running it
if __name__ == '__main__':
    kp = KafkaPython()
    kp.getConnect()
    kp.beginConsumer()
    # kp.disConnect()
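beginConsumer loops forever, which is why kp.disConnect() is commented out above. A sketch of one way to still close the consumer when the process is interrupted; this wrapper is an assumption, not part of the original post:

if __name__ == '__main__':
    kp = KafkaPython()
    kp.getConnect()
    try:
        kp.beginConsumer()
    except KeyboardInterrupt:
        # stopped manually, e.g. with Ctrl+C
        pass
    finally:
        # release the partition and close the connection on exit
        kp.disConnect()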
Note: this is simply a small plugin that reads data from Kafka into a list and bulk-inserts it into ES once the list reaches a threshold.
It is still undergoing bulk load testing...
Original article: https://blog.csdn.net/liagliang/article/details/78712475