使用示例代码如下：
#!/usr/bin/env python
#coding=utf-8
import sys
import time
import json
sys.path.append( '/usr/local/lib/python3.5/site-packages' )
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase1 import Hbase #调用hbase thrif1
from hbase1.ttypes import *
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError
import unittest
class HbaseOpreator:
    """Thin wrapper around an HBase Thrift1 client for filtered table scans.

    Opens a buffered Thrift transport to the HBase Thrift gateway on
    construction and closes it on garbage collection.
    """

    def __init__(self, host, port, table='test'):
        # Default table name kept for backward compatibility with callers.
        self.tableName = table
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

    def __del__(self):
        # Best-effort close of the Thrift transport when the wrapper dies.
        self.transport.close()

    def scanTablefilter(self, table, *args):
        """Scan *table* with a SingleColumnValueFilter and return the rows
        as a UTF-8 encoded JSON byte string (a list of dicts with keys
        'key', 'name', 'age', 'phone').

        Raises whatever Thrift/IOError the HBase client raises.
        """
        rows = []
        scanner_id = None
        try:
            scan = TScan()
            # Filter: keep rows whose info:name equals 'lilei' or 'lily'.
            # (Other filter examples from the original: PrefixFilter,
            # RowFilter with a regex, filtering on f:statis_date.)
            filter_str = ("SingleColumnValueFilter('info','name',=,'binary:lilei')"
                          " OR "
                          "SingleColumnValueFilter('info','name',=,'binary:lily')")
            scan.filterString = filter_str
            scanner_id = self.client.scannerOpenWithScan(table, scan, None)
            result = self.client.scannerGet(scanner_id)
            while result:
                for r in result:
                    # BUG FIX: build a fresh dict per row. The original reused
                    # one dict for every row, so each appended entry aliased
                    # the same object and the JSON showed the last row N times.
                    # NOTE(review): with Thrift on Python 3, r.row and the
                    # column values may be bytes, which json.dumps rejects —
                    # confirm decoding requirements against the gateway.
                    rows.append({
                        'key': r.row,
                        'name': r.columns.get('info:name').value,
                        'age': r.columns.get('info:age').value,
                        'phone': r.columns.get('info:phone').value,
                    })
                result = self.client.scannerGet(scanner_id)
            return json.dumps(rows).encode(encoding="utf-8")
        finally:
            # BUG FIX: release the server-side scanner (the original left it
            # open; its commented-out close also passed the wrong handle).
            if scanner_id is not None:
                self.client.scannerClose(scanner_id)
            print("scan finish")
def sendKfafkaProduct(data):
    """Publish each element of *data* to the local Kafka topic 'test'.

    BUG FIX: the original fell through into a ``while True`` loop that
    re-sent the entire payload forever, so the function never returned.
    It now sends each element once, flushes so buffered messages are
    actually delivered, and closes the producer.
    """
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    try:
        for item in data:
            producer.send('test', key=b'lxs', value=item)
            print(item)
        # send() is async; flush before closing so nothing is dropped.
        producer.flush()
    finally:
        producer.close()
if __name__ == '__main__':  # fixed mangled '= =' operator
    # unittest.main()
    B = HbaseOpreator('10.27.1.138', 9090)
    value = B.scanTablefilter('ns_lbi:test_hbase_student')
    print(value)
    # sendKfafkaProduct(value)
以上这篇 python HBase 读取数据发送 Kafka 的方法就是小编分享给大家的全部内容了，希望能给大家一个参考，也希望大家多多支持服务器之家。
原文链接：https://blog.csdn.net/meiguopai1/article/details/70175069