借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去.
这样可以方便的随任务的繁重程度对服务器进行弹性扩容,客户端和服务端是非耦合的,也可以随时增加客户端.
zk_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
import threading
import json
import socket
import sys
from kazoo.client import KazooClient
# TCP服务端绑定端口开启监听,同时将自己注册到zk
class ZKServer( object ):
def __init__( self , host, port):
self .sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self .sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
self .host = host
self .port = port
self .sock.bind((host, port))
self .zk = None
def serve( self ):
"""
开始服务,每次获取得到一个信息,都新建一个线程处理
"""
self .sock.listen( 128 )
self .register_zk()
print ( "开始监听" )
while True :
conn, addr = self .sock.accept()
print ( "建立链接%s" % str (addr))
t = threading.Thread(target = self .handle, args = (conn, addr))
t.start()
# 具体的处理逻辑,只要接收到数据就立即投入工作,下次没有数据本次链接结束
def handle( self , conn, addr):
while True :
data = conn.recv( 1024 )
if not data or data.decode( 'utf-8' ) = = 'exit' :
break
print (data.decode( 'utf-8' ))
conn.close()
print ( 'My work is done!!!' )
# 将自己注册到zk,临时节点,所以连接不能中断
def register_zk( self ):
"""
注册到zookeeper
"""
self .zk = KazooClient(hosts = '127.0.0.1:2181' )
self .zk.start()
self .zk.ensure_path( '/rpc' ) # 创建根节点
value = json.dumps({ 'host' : self .host, 'port' : self .port})
# 创建服务子节点
self .zk.create( '/rpc/server' , value.encode(), ephemeral = True , sequence = True )
if __name__ = = '__main__' :
if len (sys.argv) < 3 :
print ( "usage:python server.py [host] [port]" )
exit( 1 )
host = sys.argv[ 1 ]
port = sys.argv[ 2 ]
server = ZKServer(host, int (port))
server.serve()
|
zk_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
import random
import sys
import time
import json
import socket
from kazoo.client import KazooClient
# 客户端连接zk,并从zk获取可用的服务器列表
class ZKClient( object ):
def __init__( self ):
self ._zk = KazooClient(hosts = '127.0.0.1:2181' )
self ._zk.start()
self ._get_servers()
def _get_servers( self , event = None ):
"""
从zookeeper获取服务器地址信息列表
"""
servers = self ._zk.get_children( '/rpc' , watch = self ._get_servers)
# print(servers)
self ._servers = []
for server in servers:
data = self ._zk.get( '/rpc/' + server)[ 0 ]
if data:
addr = json.loads(data.decode())
self ._servers.append(addr)
def _get_server( self ):
"""
随机选出一个可用的服务器
"""
return random.choice( self ._servers)
def get_connection( self ):
"""
提供一个可用的tcp连接
"""
sock = None
while True :
server = self ._get_server()
print ( 'server:%s' % server)
try :
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((server[ 'host' ], server[ 'port' ]))
except ConnectionRefusedError:
time.sleep( 1 )
continue
else :
break
return sock
if __name__ = = '__main__' :
# 模拟多个客户端批量生成任务,推送给服务器执行
client = ZKClient()
for i in range ( 40 ):
sock = client.get_connection()
sock.send(bytes( str (i), encoding = 'utf8' ))
sock.close()
time.sleep( 1 )
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/wangbin2188/p/13346079.html