Summary:
Socket programming
Exception handling
Threads and processes
1. Socket programming
1.1 socket
A TCP connection is set up with a three-way handshake; pay attention to which socket calls block (accept() and recv() do by default).
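As a quick illustration of where the blocking happens, here is a minimal sketch of an echo server written directly against the socket module (the host, port, backlog, and buffer size are arbitrary choices for illustration):

import socket

HOST, PORT = "localhost", 9998  # arbitrary address for illustration

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((HOST, PORT))
server.listen(5)                   # backlog of pending connections

while True:
    conn, addr = server.accept()   # blocks until a client connects
    print("connected by", addr)
    data = conn.recv(1024)         # blocks until the client sends data
    if data:
        conn.sendall(data)         # echo it back unchanged
    conn.close()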
1.2 socketserver (spelled SocketServer in Python 2.x)
How do we get a multithreaded server? Use socketserver.ThreadingTCPServer, which spawns a new thread for each incoming connection, as the server below does.
import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):
    """
    The request handler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the client.
    """

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        print("{} wrote:".format(self.client_address[0]))
        print(self.data)
        # just send back the same data, but upper-cased
        self.request.sendall(self.data.upper())

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999

    # Create the server, binding to localhost on port 9999
    server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()
Server side
import socket
import sys

HOST, PORT = "localhost", 9999
data = " ".join(sys.argv[1:])

# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

try:
    # Connect to server and send data
    sock.connect((HOST, PORT))
    sock.sendall(bytes(data + "\n", "utf-8"))

    # Receive data from the server and shut down
    received = str(sock.recv(1024), "utf-8")
finally:
    sock.close()

print("Sent:     {}".format(data))
print("Received: {}".format(received))
Client side
2. Exception handling
2.1 Exceptions
try:
    # main code block
    pass
except KeyError as e:
    # runs when the exception is raised
    pass
else:
    # runs after the main block completes without an exception
    pass
finally:
    # always runs, whether or not an exception occurred
    pass
2.2 Custom exceptions:
""" 自定义异常, 多用于业务类型异常,方便自己排查 """ # 定义一个自己的异常类 class qimiException(Exception): def __init__(self, msg): self.msg = msg def __str__(self): return self.msg try: # 抛异常 raise qimiException("qimi的异常") except qimiException as e: print(e)
3. Assertions
An assertion makes sure a condition holds, so that the code after it can safely proceed.
# assert is used to make sure a condition holds before the following code runs
def test(a):
    # the argument a must be an int to get into the try block below,
    # where print("start try...") acts as a marker
    assert type(a) == int
    try:
        print("start try...")
        # assert that a == 1; if not, an AssertionError is raised
        assert a == 1
        print("yes!")
    # if the assertion raised, print the value of a
    except AssertionError:
        print("a = {}".format(a))

# call test with an int other than 1
test(2)

================ Cutting line =====================
output==>:
start try...
a = 2
4. Threads
4.1 Python GIL (Global Interpreter Lock)
This article gives a thorough analysis of how the GIL affects Python multithreading and is strongly recommended: http://www.dabeaz.com/python/UnderstandingGIL.pdf
The first thing to be clear about is that the GIL is not a feature of the Python language; it is a concept introduced by one particular implementation of the Python interpreter, CPython. It is like C++ being a language (syntax) standard that can be compiled into executable code by different compilers, such as GCC, Intel C++, or Visual C++. Likewise, the same Python code can be run under different execution environments such as CPython, PyPy, or Psyco, and Jython, for example, has no GIL. However, because CPython is the default execution environment almost everywhere, many people equate CPython with Python and take it for granted that the GIL is a defect of the Python language itself.
So, to be clear up front: the GIL is not a feature of Python, and Python does not have to depend on the GIL at all.
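The practical effect of the GIL is easy to see with a CPU-bound workload: under CPython, running the same counting loop in two threads is usually no faster than running it twice in one thread, because only one thread executes bytecode at a time. A rough timing sketch (the loop size and the exact numbers are arbitrary and machine-dependent):

import threading
import time

def count(n):
    # pure CPU-bound loop; the GIL is only released at interpreter checkpoints
    while n > 0:
        n -= 1

N = 10_000_000

# sequential: run the loop twice in the main thread
start = time.time()
count(N)
count(N)
print("sequential:  {:.2f}s".format(time.time() - start))

# threaded: run the two loops in two threads
start = time.time()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print("two threads: {:.2f}s".format(time.time() - start))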
4.2 The Python threading module
Two ways to start a thread: calling Thread directly, or subclassing Thread.
""" 直接调用的多线程 """ import threading import time # 定义一个每个线程要运行的函数 def sayhi(num): print("running number:{}".format(num)) time.sleep(3) if __name__ == "__main__": # target:任务名;arg:任务要传的参数 t1 = threading.Thread(target=sayhi, args=[1, ]) # 生成一个线程实例t1 t2 = threading.Thread(target=sayhi, args=[2, ]) # 生成一个线程实例t2 t1.start() # 启动线程t1 t2.start() # 启动线程t2 print("========main===========") # 主线程的print() print(t1.getName()) print(t2.getName()) =============== Cutting line ====================== output==>: running number:1 running number:2 ========main=========== Thread-1 Thread-2
import threading
import time

# define a Thread subclass
class MyThread(threading.Thread):
    def __init__(self, num):
        super(MyThread, self).__init__()
        self.num = num

    def run(self):
        print("Running num:{}".format(self.num))
        time.sleep(3)

if __name__ == "__main__":
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
    print("=======main==========")
    print(t1.getName())
    print(t2.getName())

================= Cutting line ====================
output==>:
Running num:1
Running num:2
=======main==========
Thread-1
Thread-2
4.2.1 Join&Daemon
import time
import threading

# the function every worker thread will run
def run(n):
    print("=========>{} running...".format(n))
    time.sleep(2)
    print("{} done...<=========".format(n))

def main():
    for i in range(5):
        t = threading.Thread(target=run, args=[i, ])
        t.start()
        t.join()
        print("Start thread:{}".format(t.getName()))

m = threading.Thread(target=main)
m.setDaemon(True)  # make m a daemon thread: when the main thread exits, m is killed whether or not its work is finished
m.start()
m.join(2)  # the main thread waits for m for at most 2 seconds
print("=======main thread done========")

================ Cutting line ======================
output==>:
=========>0 running...
0 done...<=========
Start thread:Thread-2
=========>1 running...
=======main thread done========
""" join、Daemon 守护线程结束时会自动关闭子线程 join被设置timeout参数时会等待指定的时间,否则就等待到程序完成 """ import time import threading # 定义一个所有线程都要运行的函数 def run(n): print("=========>{} running...".format(n)) time.sleep(2) print("{} done...<=========".format(n)) def main(): for i in range(5): t = threading.Thread(target=run, args=[i, ]) t.start() t.join() print("Start thread:{}".format(t.getName())) m = threading.Thread(target=main) m.setDaemon(True) # 将主线程设置为Daemon线程,它退出时,其它子线程会同时退出,不管是否执行完任务 m.start() m.join() print("=======main thread done========") ================ Cutting line =================== output==>: =========>0 running... 0 done...<========= Start thread:Thread-2 =========>1 running... 1 done...<========= Start thread:Thread-3 =========>2 running... 2 done...<========= Start thread:Thread-4 =========>3 running... 3 done...<========= Start thread:Thread-5 =========>4 running... 4 done...<========= Start thread:Thread-6 =======main thread done========
4.2.2 Lock & RLock
Version without a lock:
""" 没有加锁的多线程处理同一个数据 """ import threading import time def minus_num(): global num print("get the num:", num) time.sleep(1) num -= 1 print("The result:", num) num = 100 lock = threading.Lock() thread_list = [] for i in range(100): t = threading.Thread(target=minus_num) t.start() thread_list.append(t) for j in thread_list: j.join() print("The result=>:", num)
Version with a lock:
""" 线程锁,区别于GIL """ import threading import time def minus_num(): global num # 因为在函数里面要对num进行修改,所以必须声明num为全局变量 # 获取线程锁 lock.acquire() # 因为在函数里只是对lock进行访问,不涉及到修改,所以无需在开头声明lock为全局变量 print("get num=>{}".format(num)) time.sleep(1) num -= 1 print("the result=>{}".format(num)) # 释放线程锁 lock.release() lock = threading.Lock() # 生成一个线程锁实例(全局锁) num = 100 # 设置一个共享变量 thread_list = [] for i in range(100): t = threading.Thread(target=minus_num) t.start() thread_list.append(t) # 等到所有的子线程都结束 for j in thread_list: j.join() print("final num:{}".format(num))
Recursive lock:
""" RLock,递归锁,也就是大锁里面包含小锁 """ import threading def run1(): # 小锁1 lock.acquire() global num1 num1 += 1 lock.release() return num1 def run2(): # 小锁2 lock.acquire() global num2 num2 += 1 lock.release() return num2 def run(): # 大锁 lock.acquire() result1 = run1() print("===-between run1 and run2-===") result2 = run2() print("result1:{}, result2:{}".format(result1, result2)) lock.release() if __name__ == "__main__": num1, num2 = 0, 0 lock = threading.RLock() for i in range(3): t = threading.Thread(target=run) t.start() t.join() while threading.active_count() != 1: print(threading.active_count()) else: print("======-all threading done!-======") print("num1:{},num2:{}".format(num1, num2)) output==>: ===-between run1 and run2-=== result1:1, result2:1 ===-between run1 and run2-=== result1:2, result2:2 ===-between run1 and run2-=== result1:3, result2:3 ======-all threading done!-====== num1:3,num2:3
Both Lock and RLock expose a very simple interface: acquire and release. So what is the difference between them? An RLock may be acquired multiple times by the same thread, while a Lock does not allow this. Note that with an RLock, acquire and release must come in pairs: if acquire is called n times, release must also be called n times before the lock is actually freed.
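The difference shows up as soon as a thread tries to acquire a lock it already holds: a plain Lock would block forever, while an RLock just counts the nested acquires and must be released the same number of times. A minimal sketch:

import threading

rlock = threading.RLock()

rlock.acquire()
rlock.acquire()      # the same thread may re-acquire an RLock
print("re-acquired the RLock in the same thread")
rlock.release()
rlock.release()      # must release as many times as it was acquired

lock = threading.Lock()
lock.acquire()
# lock.acquire()     # with a plain Lock this second acquire would block forever (deadlock)
lock.release()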
4.2.3 Semaphore
""" 信号量 互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据。 如果线程数超过了定义的值,就要等待一个线程结束之后,再进行下一个。 """ import threading import time def run(n): semaphore.acquire() time.sleep(1) print("run the thread:{}\n".format(n)) semaphore.release() if __name__ == "__main__": num = 0 semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时启动 for i in range(20): t = threading.Thread(target=run, args=(i,)) t.start() while threading.active_count() != 1: # print("there are {} threads.".format(threading.active_count())) pass else: print("=====-all threads done-=====") print(num)
4.2.4 Events
An event is a simple synchronization object; the event represents an internal flag, and threads can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()
# a client thread can wait for the flag to be set
event.wait()
# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn't do anything. If the flag is cleared, wait will block until it becomes set again. Any number of threads may wait for the same event.
① event.wait(timeout): blocks the calling thread while the flag is False
② clear: sets the flag to False
③ set: sets the flag to True
④ is_set: returns the current value of the flag
Events can be used for interaction between two or more threads. Below is a traffic-light example: one thread acts as the traffic light, and several threads act as cars that follow the rule "stop on red, go on green".
""" event红绿灯的例子 """ import threading import time # 定义信号灯功能 def light(): """ 10秒绿灯,3秒黄灯;7秒红灯 :return: """ if not event.isSet(): event.set() count = 0 while True: if count < 10: print("\033[42;1m== green light on ==\033[0m") elif count < 13: print("\033[43;1m== yellow light on ==\033[0m") elif count < 20: if event.isSet(): event.clear() print("\033[41;1m== red light on ==\033[0m") else: count = 0 event.set() # 打开绿灯 time.sleep(1) count += 1 # 定义汽车功能 def car(n): # no bug version """ 绿灯行,红灯停 :param n: 汽车的编号 :return: """ while True: time.sleep(1.1) if event.isSet(): print("car {} is running...".format(n)) else: print("car {} is waiting for the red light...".format(n)) event.wait() if __name__ == "__main__": event = threading.Event() # 起一个线程模拟红绿灯 Light = threading.Thread(target=light) Light.start() # 起三个线程模拟汽车 for i in range(3): Car = threading.Thread(target=car, args=(i,)) Car.start()
4.2.5 queue
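queue.Queue is a thread-safe FIFO queue: put() and get() do their own locking, so threads can exchange data without an explicit Lock, and get() blocks until an item is available. A minimal sketch with one producer thread and one consumer thread (the item values and queue size are arbitrary):

import threading
import queue
import time

q = queue.Queue(maxsize=5)  # put() blocks when the queue is full

def producer():
    for i in range(10):
        q.put(i)                # thread-safe, no explicit lock needed
        print("produced:", i)
        time.sleep(0.1)

def consumer():
    for _ in range(10):
        item = q.get()          # blocks until an item is available
        print("consumed:", item)
        q.task_done()

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start(); c.start()
p.join(); c.join()
q.join()  # wait until every item has been marked done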
4.2.6 Producer-consumer model
The Condition class: a condition variable lets one thread pause and wait until other threads satisfy some "condition", such as a change of state or of a value.
① acquire: takes the underlying lock for the calling thread
② wait: releases the lock held by the current thread and suspends it until it is woken up or times out (via the timeout argument). Execution continues only once the thread has been woken up and has re-acquired the lock.
③ notify: wakes up one suspended thread (if there is one). Note that notify() does not release the lock it holds.
④ notify_all: wakes every thread in the waiting pool; all of them move to the lock pool and try to acquire the lock. This method does not release the lock either. The calling thread must already hold the lock, otherwise an exception is raised.
""" 生产者消费者模型 代码中写了两个类,Consumer和Producer,分别继承了Thread类,我们分别初始化这两个类获得了c和p对象,并启动这两个线程。 则这两个线程去执行run方法(这里与Thread类内部的调度有关),定义了producer全局变量和condition对象为全局变量, 当producer不大于1时,消费者线程被condition对象阻塞,不能继续消费(这里是不再递减), 当producer不小于10时,生产者线程被condition对象阻塞,不再生产(这里是不再累加)。 """ import threading import time condition = threading.Condition() products = 0 class Producer(threading.Thread): def __init__(self): super(Producer, self).__init__() def run(self): global condition, products while True: if condition.acquire(): if products < 10: products += 1 print("Producer:{} deliver one, now products:{}.".format(self.name, products)) condition.notify() # 唤醒一个挂起的线程,这里就是喊人来消费 else: print("Producer:{} already 10, stop deliver, now products:{}.".format(self.name, products)) condition.wait() # 释放线程锁,并挂起线程。 condition.release() time.sleep(2) class Consumer(threading.Thread): def __init__(self): super(Consumer, self).__init__() def run(self): global condition, products while True: if condition.acquire(): if products > 1: products -= 1 print("Consumer:{} consume one, now products:{}".format(self.name, products)) condition.notify() # 唤醒一个挂起的线程。 else: print("Cousumer:{} only one, stop consume, now products:{}".format(self.name, products)) condition.wait() # 释放线程锁,并挂起线程。 condition.release() time.sleep(2) if __name__ == "__main__": for p in range(2): p = Producer() p.start() for c in range(10): c = Consumer() c.start()
5. Processes
5.1 multiprocessing
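multiprocessing mirrors the threading API, but each Process runs in its own interpreter with its own GIL, so CPU-bound work can actually run in parallel. A minimal sketch that starts a few worker processes (the worker function and count are arbitrary):

from multiprocessing import Process
import os

def worker(n):
    # each worker runs in a separate interpreter process
    print("worker {} running in pid {}".format(n, os.getpid()))

if __name__ == "__main__":
    procs = []
    for i in range(3):
        p = Process(target=worker, args=(i,))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()  # wait for all the workers to finish
    print("parent pid:", os.getpid())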
5.2 Inter-process communication
Memory is not shared between different processes. To exchange data between two processes, the following mechanisms can be used:
5.2.1 Queue
""" 多进程的Queue """ from multiprocessing import Process, Queue def func(q): q.put(["alex", 18, "Girls"]) if __name__ == "__main__": que = Queue() p1 = Process(target=func, args=(que,)) p2 = Process(target=func, args=(que,)) p1.start() p2.start() print("from parent p1:", que.get()) print("from parent p2:", que.get()) p1.join() p2.join()
5.2.2 Pipes
""" 进程间通信的一种方法:pipes """ from multiprocessing import Process, Pipe def func(conn): conn.send(["alex", 18, "Girls"]) if __name__ == "__main__": # 父连接发,子连接收 parent_conn, child_conn = Pipe() p1 = Process(target=func, args=(parent_conn,)) p2 = Process(target=func, args=(parent_conn,)) p1.start() p2.start() print("from parent p1:", child_conn.recv()) print("from parent p2:", child_conn.recv()) p1.join() p2.join()
The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
5.2.3 Managers
""" Manager: """ from multiprocessing import Process, Manager def f(d, l, n): d[n] = n d['] = 2 d[0.25] = None l.append(n) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l, i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
5.3 Process synchronization
""" 进程同步:加锁 """ from multiprocessing import Process, Lock def func(l, i): l.acquire() try: print("Hello world:", i) finally: l.release() if __name__ == "__main__": lock = Lock() for num in range(10): Process(target=func, args=(lock, num)).start()
5.4 Process pools
A process pool maintains a fixed set of worker processes. When a task is submitted, a process is taken from the pool; if no process in the pool is free, the task waits until one becomes available.
A pool has two submission methods:
① apply: synchronous
② apply_async: asynchronous
apply_async is asynchronous: the child processes start working on their tasks as soon as they receive them. apply is synchronous: the child processes run one after another, and each one starts only after the previous one has finished.
""" 进程池 """ from multiprocessing import Pool, freeze_support import time def func1(i): time.sleep(1) return i + 100 def func2(arg): print("==> exec done:", arg) if __name__ == "__main__": freeze_support() # 定义一个数量为5的进程池 pool = Pool(5) for i in range(10): # 异步,将func1的返回值传给func2 pool.apply_async(func1, args=(i,), callback=func2) # 同步,没有callback时 # result = pool.apply(func=func1, args=(i,)) print("=====-end-=====") pool.close() pool.join() # 进程池中进程执行完毕后再关闭,如果不写,那么程序直接关闭 output==>: 五个一组打印结果