简单的socket项目:
client端:
#!/usr/bin/env python # -*- coding:utf-8 -*- import socket obj = socket.socket() obj.connect(("127.0.0.1", 9999,)) # recv 也是阻塞的 ret_bytes = obj.recv(1024) ret_str = str(ret_bytes, encoding="utf-8") print(ret_str) while True: inp = input("请输入要发送的内容:") if inp == "q": obj.sendall(bytes(inp, encoding="utf-8")) break else: obj.sendall(bytes(inp, encoding="utf-8")) ret = str(obj.recv(1024), encoding="utf-8") print(ret) obj.close()
server端:
#!/usr/bin/env python # -*- coding:utf-8 -*- import socket sk = socket.socket() # 绑定ip和端口 sk.bind(("127.0.0.1", 9999,)) # 开启监听 sk.listen(5) # 接收客户端的请求 while True: conn, address = sk.accept() # accept阻塞 conn.sendall(bytes("欢迎郭屌毛访问!", encoding="utf-8")) while True: # 接收客户端发过来的消息,限制1024个字节 ret_bytes = conn.recv(1024) ret_str = str(ret_bytes, encoding="utf-8") if ret_str == "q": break # 给客户端发送消息 conn.sendall(bytes(ret_str + "你好", encoding="utf-8"))
为解决socket的并发问题,使用socketserver:
#!/usr/bin/env python # -*- coding:utf-8 -*- # 解决socket并发问题,使用socketserver import socketserver # 定义一个class,必须继承 socketserver.BaseRequestHandler 类 class MyServer(socketserver.BaseRequestHandler): # 重写handle方法 def handle(self): print(self.client_address) print(self.server) print(self.request) conn = self.request conn.sendall(bytes("欢迎访问xxx系统!", encoding="utf-8")) while True: ret_bytes = conn.recv(1024) ret_str = str(ret_bytes, encoding="utf-8") if ret_str == "q": break conn.sendall(bytes(ret_str + " 你好!", encoding="utf-8")) if __name__ == '__main__': # 使用刚才的类创建server server = socketserver.ThreadingTCPServer(("127.0.0.1", 9999), MyServer) # serve_forever 等价于 while True,使server一直阻塞,等待连接 server.serve_forever()
socketserver源码分析:
#!/usr/bin/env python # -*- coding:utf-8 -*- import socketserver class MyServer(socketserver.BaseRequestHandler): def handle(self): self.request if __name__ == '__main__': # socket + select + 多线程 # ip和端口,类名 # MyServer == RequestHandlerClass # ThreadingTCPServer.init() => TCPServer.init() => BaseServer.init() # server 对象: # self.server_address == ("127.0.0.1", 9999) # self.RequestHandlerClass == MyServer # self.socket = 创建的socket对象 server = socketserver.ThreadingTCPServer(("127.0.0.1", 9999), MyServer) # server对象的serve_forever(), 在 BaseServer 中找到serve_forever() # --------执行流程如下--------- # BaseServer.serve_forever() => BaseServer._handle_request_noblock() => ThreadingMixIn.process_request() # => ThreadingMixIn.process_request_thread() => BaseServer.finish_request() # => self.RequestHandlerClass(request, client_address, self) 等价于 MyServer() 执行 BaseRequestHandler.init()方法 server.serve_forever()
IO多路复用:
#!/usr/bin/env python # -*- coding:utf-8 -*- import socket import select # IO多路复用: 可以监听多个 文件描述符(socket对象)(文件句柄),一旦文件句柄出现变化,即可感知 sk1 = socket.socket() sk1.bind(("127.0.0.1", 8001)) sk1.listen(5) # sk2 = socket.socket() # sk2.bind(("127.0.0.1", 8002)) # sk2.listen(5) # # sk3 = socket.socket() # sk3.bind(("127.0.0.1", 8003)) # sk3.listen(5) print("sk1 ", sk1) inputs = [sk1, ] outputs = [] message_dict = {} while True: # select 内部自动监听sk1,sk2,sk3三个对象,一旦某个句柄发生变化 # 解释: # select内部自动监听sk1,sk2,sk3三个对象,监听三个句柄是否发生变化,把发生变化的元素放入r_list中。 # 如果有人连接sk1,则r_list = [sk1] # 如果有人连接sk1和sk2,则r_list = [sk1,sk2] # select中第1个参数表示inputs中发生变化的句柄放入r_list。 # select中第2个参数表示[]中的值原封不动的传递给w_list。 # select中第3个参数表示inputs中发生错误的句柄放入e_list。 # 参数1表示1秒监听一次 # 如果有人第一次来连接,sk1发送变化 # r_list = [sk1] r_list, w_list, e_list = select.select(inputs, outputs, inputs, 1) print("正在监听的socket对象:%d" % len(inputs)) print(r_list) for sk_or_conn in r_list: # 每一个连接对象 if sk_or_conn == sk1: # 表示有新用户来连接了 conn, address = sk_or_conn.accept() # inputs = [sk1, conn, ....] inputs.append(conn) # message_dict = {"conn1":[], "conn2":[], ...} message_dict[conn] = [] else: # 有老用户发消息了 try: # 当用户连接中断的时候,data_bytes 为空 == 此情况适用于2.7版本 data_bytes = sk_or_conn.recv(1024) except Exception as e: # 如果用户中断连接 print(e) inputs.remove(sk_or_conn) else: # 用户正常发送消息 data_str = str(data_bytes, encoding="utf-8") # sk_or_conn.sendall(bytes(res + " 你好!", encoding="utf-8")) message_dict[sk_or_conn].append(data_str) outputs.append(sk_or_conn) # w_list仅仅保存了谁给我发过消息 for conn in w_list: # 这里可以优化,使用queue优化,后面再说 recv_str = message_dict[conn][0] del message_dict[conn][0] conn.sendall(bytes(recv_str + " 你好!", encoding="utf-8")) # 发完消息后删除socket对象 outputs.remove(conn) for sk_or_conn in e_list: inputs.remove(sk_or_conn) # while True: # conn, address = sk.accept() # while True: # content_bytes = conn.recv(1024) # content_str = bytes(content_bytes, encoding="utf-8") # conn.sendall(bytes(content_str + "好", encoding="utf-8")) # conn.close() # 一、概念 # 异步:某个事情需要10s完成。而我只需要调用某个函数告诉xxx来帮我做(然后我再干其他的事情) # 同步:某个事情需要10s完成,我需要一直等它完成(等10s),再能继续后面的工作。 # 阻塞:做某件事情,直到完成,除非超时 # 非阻塞:尝试做,如果不能做,就不做(直接返回),如果能做,就做。 # 前两者和后两者不容易区分,不过前两者更多的有涉及到多线程交互(消息)的场景。 # 二、举个例子 # 小李喝了想喝水,于是去煮开水。 # 1、小李把水壶放到炉子上,等待水烧开。(同步阻塞) # 小李感觉这样太费时间。 # 2、小李把水壶放到炉子上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞) # 小李还是觉得自己这样太累,于是买了把会响笛的那种水壶。水开之后,能发出声音。 # 3、小李把响水壶放到炉子上,等待水壶发出声音。(异步阻塞) # 觉得这样傻等意义不大 # 5、小李把响水壶放到炉子上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞) # 这样真好。