主要内容 :
1 . 线程
a : 线程的概念
1 ) 线程由 代码段, 数据段, tcb(thread control block)组成(进程由 代码段, 数据段, pcb(process control block)组成)
2 ) 进程被称为轻量级的进程. GIL(全局解释锁)只有cpython解释器才有.对于线程来说, 因为有了gil,所以并没有真正的并行.
3 ) 计算机的执行单位以线程为单位. 进程中必须至少应该有一个线程.
4 ) 进程是资源分配的基本单位.线程是可执行的基本单位, 是可被调度的基本单位.
5 ) 进程不可以自己独立拥有资源.线程的执行, 必须依赖于所属进程中的资源.
b : 进程与线程的比较
(1) cpu切换进程要比cpu切换线程 慢很多
在python中,如果IO操作过多的话,使用多线程最好了
from threading import Thread from multiprocessing import Process import time def func(): pass if __name__ == '__main__': start = time.time() for i in range(100): t = Thread(target=func, args=()) t.start() print('开启100个线程所需的时间:', time.time() - start) #开启100个线程所需的时间: 0.015957355499267578 start = time.time() for i in range(100): p = Process(target=func,args=()) p.start() print('开启100个进程所需的时间:', time.time() - start) #开启100个进程所需的时间: 0.304187536239624
(2) 在同一个进程内,所有线程共享这个进程的pid,也就是说所有线程共享所属进程的所有资源和内存地址
from threading import Thread import time import os def func(name): print('我是%s,我的pid为%s' % (name, os.getpid())) #我是线程,我的pid为5696 if __name__ == '__main__': t = Thread(target=func, args=('线程',)) t.start() print('主进程的pid为:', os.getpid()) #主进程的pid为: 5696
(3) 在同一个进程内,所有线程共享该进程中的全局变量
from multiprocessing import Process
from threading import Thread,Lock
import time,os
def func():
global num
tmp = num
time.sleep(0.05) #由于gil锁的存在, 停0.05秒后, 程序执行第一个线程, 载停留, 再执行下一个线程,等执行完所有的线程,拿到的num值为100
# 回到第一个线程执行时, num = 99 , 第二个也是99, 都是99
num = tmp - 1
if __name__ == '__main__':
num = 100
t_l = []
for i in range(100):
t = Thread(target=func)
t.start()
t_l.append(t)
# time.sleep(1)
[t.join() for t in t_l]
print(num)
线程会被强迫放弃cpu的原因, 首先线程会受时间片的影响, 其次gil会限制每个线程的执行时间, 一般是5ms左右, 或者限制线程执行固定数量的bytecode.
gil锁限制在同一时间只能有一个线程在使用(不论是单核还是双核.)
(4) 因为有GIL锁的存在,在Cpython中,没有真正的线程并行。但是有真正的多进程并行当你的任务是计算密集的情况下,使用多进程好
总结:在CPython中,IO密集用多线程,计算密集用多进程
(5)关于守护线程和守护进程的事情(注意:代码执行结束并不代表程序结束)
守护进程:要么自己正常结束,要么根据父进程的代码执行结束而结束 守护线程:要么自己正常结束,要么根据父线程的执行结束而结束
from multiprocessing import Process from threading import Thread import time def func(): time.sleep(1) print('守护线程') def fun(): time.sleep(5) print('普通进程') if __name__ == '__main__': t = Thread(target=func) t.daemon = True t.start() t1 = Thread(target=fun) t1.start() # 守护线程不是根据主线程的代码执行结束而结束 # 守护进程会随着主线程执行结束而结束 # 守护线程会等待主线程结束,再结束 # 所以,一般把不重要的事情设置为守护线程 # 守护进程是根据主进程的代码执行完毕,守护进程就结束
2 . 锁机制
1 ) 递归锁 RLock () 可以有无止尽的锁, 但是会有一把万能钥匙
解决死锁:
from multiprocessing import Process from threading import Thread, RLock import time def man(l_tot, l_pap): l_tot.acquire() print('在厕所') time.sleep(0.1) l_pap.acquire() print('拿到纸了') time.sleep(0.1) print('完事了') l_pap.release() l_tot.release() def woman(l_tot, l_pap): l_pap.acquire() print('拿到纸') time.sleep(0.5) l_tot.acquire() print('在厕所') time.sleep(0.5) print('完事了') l_tot.release() l_pap.release() if __name__ == '__main__': l_tot = l_pap = RLock() p1 = Thread(target=man,args=(l_tot, l_pap) ) p1.start() p2 = Thread(target=woman, args=(l_tot, l_pap)) p2.start()
2 ) 互斥锁 Lock() 一把钥匙配一把锁
死锁的演示:
from multiprocessing import Process from threading import Thread,Lock import time,os def man(l_tot,l_pap): l_tot.acquire()# 是男的获得厕所资源,把厕所锁上了 print('alex在厕所上厕所') time.sleep(1) l_pap.acquire()# 男的拿纸资源 print('alex拿到卫生纸了!') time.sleep(0.5) print('alex完事了!') l_pap.release()# 男的先还纸 l_tot.release()# 男的还厕所 def woman(l_tot,l_pap): l_pap.acquire() # 女的拿纸资源 print('小雪拿到卫生纸了!') time.sleep(1) l_tot.acquire() # 是女的获得厕所资源,把厕所锁上了 print('小雪在厕所上厕所') time.sleep(0.5) print('小雪完事了!') l_tot.release() # 女的还厕所 l_pap.release() # 女的先还纸 if __name__ == '__main__': l_tot = Lock() l_pap = Lock() t_man = Thread(target=man,args=(l_tot,l_pap)) t_woman = Thread(target=woman,args=(l_tot,l_pap)) t_man.start() t_woman.start()
3 ) 全局解释器锁 : 锁的是线程, 是cpython解释器上的一个锁,意思在同一个时间只允许一个线程访问cpu
3 . 信号量 from threading import semaphore
from threading import Semaphore, Thread
import time
def func(i, s):
s.acquire()
print('第%s个人进入了小黑屋' % i)
time.sleep(2)
print('第%s个人出了小黑屋' % i)
s.release()
if __name__ == '__main__':
s = Semaphore(5) #一把钥匙配了五把锁.
for i in range(7):
t = Thread(target=func, args = (i, s))
t.start()
4 . 事件
from threading import Thread, Event import time def sq_conn(i, e): count = 1 while count <= 3: if e.is_set(): # 默认为false print('第%s个人连接成功' % i) break print('正在尝试第%s次连接' % count) e.wait(0.5) count = count + 1 def sq_chec(e): print('\033[31m检查数据库\033[0m') time.sleep(1) #执行while循环的print, 两次,过了1秒,通过e.set,将e.is_set 置 为ture e.set() if __name__ == '__main__': e = Event() t_check = Thread(target=sq_chec, args=(e,)) t_check.start() for i in range(10): t_conn = Thread(target=sq_conn, args=(i, e)) t_conn.start()
5 . 条件 from threading import condition 条件是让程序员自行去调度线程的一个机制.
condition 涉及4个方法
acquire() release() wait() 是指让线程阻塞住
notify(int) 是指给wait发一个信号, 让wait变成不阻塞, int是指给wait发多少信号.
from threading import Thread,Condition import time #如果不设置锁, 默认为递归锁. def func(c, i): #主线程和其他10个子线程争夺递归锁的一把钥匙, #如果子线程拿到钥匙, while1循环, iput, notify发信号, 还钥匙,如果主线程执行过快, #极有可能继续执行主线程,即使子线程wait接收到信号, 由于没拿到钥匙, 也不会执行子线程. c.acquire() c.wait() c.release() print('第%s个线程开始了' % i) if __name__ == '__main__': c = Condition() for i in range(10): t = Thread(target=func, args=(c, i)) t.start() while 1: c.acquire() num = input('>>>') c.notify(int(num)) c.release() time.sleep(1) #如果不设置时间, 就会造成 # 递归锁的两种情况: # 第一种情况 在同一个线程内,递归锁可以无止尽的acquire,但是互斥锁不行 # 第二种情况,在不同的线程内,递归锁是保证只能被一个线程拿到钥匙,然后无止尽的acquire,其他线程等待
6 . 定时器 from threading import timer
Timer (t, func) #过t秒后执行函数
from threading import Timer def func(): print('执行我啦') if __name__ == '__main__': t = Timer(3, func) t.start()
7 . 利用线程实现socket
a : 服务器端代码:
from threading import Thread
import socket
sk = socket.socket()
sk.bind(('127.0.0.1',8090))
sk.listen(5)
def talk(conn):
while 1:
msg = conn.recv(1024).decode('utf-8')
if not msg:break
conn.send(msg.upper().encode('utf-8'))
if __name__ == '__main__':
while 1: #(在此加上while, 可以实现一个服务器与多个客户端进行通信)
conn, addr = sk.accept()
t = Thread(target=talk, args = (conn, ))
t.start()
b : 客户端代码:
import socket sk = socket.socket() sk.connect(('127.0.0.1', 8090)) while 1: content = input('请输入内容>>>') if not content:continue sk.send(content.encode('utf-8')) msg = sk.recv(1024).decode('utf-8') print(msg)