day12_雷神_线程总结

时间:2025-01-18 08:03:55

线程


1、 多线程理论

0、进程只是一个资源单位,用来隔离资源,从执行角度是主线程。

1、多个线程共享一个进程的数据资源;

2、线程开销小;

2、 开线程的两种方式

0、 站在资源的角度,主进程;执行的角度主线程。

方法一:
from threading import Thread
import time
import random def piao(name):
print('%s is piaoing' %name)
time.sleep(random.randint(1,3))
print('%s is piao end' %name) if __name__ == '__main__': # 这句话开启线程没必要写。因为线程都在一个内存空间里。
t1=Thread(target=piao,args=('alex',))
t1.start()
print('主') 方法二: class MyThread(Thread):
def __init__(self,name):
super().__init__()
self.name=name def run(self):
print('%s is piaoing' %self.name)
time.sleep(random.randint(1,3))
print('%s is piao end' %self.name) # if __name__ == '__main__':
t1=MyThread('alex')
t1.start()
print('主')

3、 进程与线程的区别

# python阶段 进程和线程之间的区别
# 进程pid,多进程的时候每个子进程有自己的pid
# 多个线程共享一个进程id
# 数据隔离和共享,多进程之间数据隔离
# 线程之间全局变量都是共享的
# main : 进程必须写if __name__ == '__main__':
# 线程由于共享进程的代码,不需要再执行文件中的代码,不需要if __name__ == '__main__'
# 效率差 : 进程的开启和销毁消耗的时间长
# 线程的开启和销毁消耗的时间远远小于进程

4、 多线程实现的套接字并发通信

服务端

from threading import Thread,current_thread   # 查看当前线程的对象
from socket import * def comunicate(conn):
print('子线程:%s' %current_thread().getName())
while True:
try:
data=conn.recv(1024)
if not data:break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close() def server(ip,port):
print('主线程:%s' %current_thread().getName())
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip,port))
server.listen(5) while True:
conn, addr = server.accept()
print(addr)
# comunicate(conn)
t=Thread(target=comunicate,args=(conn,))
t.start() server.close() if __name__ == '__main__':
server('127.0.0.1', 8081)

客户端

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081)) while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
data=client.recv(1024)
print(data.decode('utf-8')) client.close() 问题1: 用多线程合适?
问题2: 无线线程?

5、 守护线程

一个进程内有多个线程,所有线程干完活。

主进程等子进程,要给儿子收尸;

主线程等子线程,主线程就代表了进程的生命周期,进程内所有线程结束才代表这个进程结束。

主线程在所有非守护线程都死了,才死;

from threading import Thread
import time
def foo():
print(123)
time.sleep(1)
print("end123") def bar():
print(456)
time.sleep(3)
print("end456") if __name__ == '__main__':
t1=Thread(target=foo)
t2=Thread(target=bar) t1.daemon=True
t1.start()
t2.start()
print("main-------") 等bar运行结束,主线程才算结束。

6、 GIL解释器锁

全局解释器锁,是cpython解释器的特性。就是一个互斥锁,Lock。

互斥锁: 把并发编程串行执行,Lock,互斥锁,牺牲效率,保护数据。

一个进程下的多个线程,同一时间只能有一个线程运行,无法利用多核优势。

因为cpython的内存管理机制不是线程安全的

运行一个python文件经过三步:

一运行这个文件,申请了一个内存空间;
python 解释器由硬盘读进内存,先加载解释器代码;
再加载python代码;、
主线程吧python代码当做字符串传递给解释器,解释器执行c语言功能;

IO、计算:

1个CPU执行10个IO操作,每个睡3s,运行时间等于3+切换时间
10个CPU等于3s;
IO操作,多核用不上。

GIL锁与互斥锁

GIL锁是第一步争抢,当线程1抢到GIL,很悠闲的拿到互斥锁,操作数据,遇到阻塞,释放GIL,线程2抢到GIL,其实没用,不能对数据进程操作。等线程1睡醒,并且抢到GIL,此时才执行下去。

python自己优化这个,使线程1很迅速的睡醒后马上抢到GIL锁。

7、 线程的互斥锁

from threading import Thread,Lock
import time n=100 def task():
global n
with mutex:
temp=n
time.sleep(0.1)
n=temp-1 if __name__ == '__main__':
start_time=time.time()
mutex=Lock()
t_l=[]
for i in range(100):
t=Thread(target=task)
t_l.append(t)
t.start() for t in t_l:
t.join()
stop_time=time.time()
print('主',n)
print('run time is %s' %(stop_time-start_time))

8、 paramiko模块

基于密码验证:

import paramiko

# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='120.92.84.249', port=22, username='root', password='xxx') # 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
print(result.decode('utf-8'))
# 关闭连接
ssh.close()

9、 死锁与递归所

# 递归锁的特点
# 如果能够在第一个acquire的地方通过
# 那么在一个线程中后面所有的acquire都能通过
# 但是其他所有的线程都会在第一个acquire处阻塞
# 在这个线程中acquire了多少次,就必须release多少次
# 如果acquire的次数和release的次数不想等,那么其他线程也不能继续向下执行 # 递归锁
# 能够快速解决死锁为题,保证线程之间的数据安全 # 将程序的逻辑改过来,使用互斥锁是最节省地缘的 # 科学家吃面
# from threading import Thread,RLock
# noodle = 100
# fork = 100
# noodle_lock = fork_lock = RLock()
# def eat1(name):
# global noodle,fork
# noodle_lock.acquire()
# print('%s拿到面了' % name)
# fork_lock.acquire()
# print('%s拿到叉子了' % name)
# noodle -= 1
# print('%s吃面'%name)
# time.sleep(0.1)
# fork_lock.release()
# print('%s放下叉子了' % name)
# noodle_lock.release()
# print('%s放下面' % name)
#
# def eat2(name):
# global noodle,fork
# fork_lock.acquire()
# print('%s拿到叉子了'%name)
# noodle_lock.acquire()
# print('%s拿到面了'%name)
# noodle -= 1
# print('%s吃面'%name)
# time.sleep(0.1)
# noodle_lock.release()
# print('%s放下面'%name)
# fork_lock.release()
# print('%s放下叉子了'%name)
#
#
# for i in ['alex','wusir','egon','快老师']:
# Thread(target=eat1,args=(i,)).start()
# Thread(target=eat2,args=(i+'2',)).start()

10、 信号量

相当于上公共卫生间,有5个坑,同时5个人上。

from threading import Thread,Semaphore,current_thread
import time,random sm=Semaphore(5) def task():
with sm:
print('%s is laing' %current_thread().getName())
time.sleep(random.randint(1,3)) if __name__ == '__main__':
for i in range(20):
t=Thread(target=task)
t.start()

11、 Event事件

一个线程可以在工作到某个时间点的时候,通知另一个线程,你可以开始工作了。 

from threading import Thread,Event,current_thread
import time event=Event() def check():
print('checking MySQL...')
time.sleep(5)
event.set() def conn():
count=1
while not event.is_set():
if count > 3:
raise TimeoutError('超时')
print('%s try to connect MySQL time %s' %(current_thread().getName(),count))
event.wait(1)
count+=1 print('%s connected MySQL' %current_thread().getName()) if __name__ == '__main__':
t1=Thread(target=check)
t2=Thread(target=conn)
t3=Thread(target=conn)
t4=Thread(target=conn) t1.start()
t2.start()
t3.start()
t4.start()

12、 定时器

图片验证码,时间到了,60s刷新一下。
就是多长时间以后,运行一个任务 from threading import Timer def hello(name):
print("hello, world %s " %name) t = Timer(3, hello,args=('egon',))
t.start() # after 1 seconds, "hello, world" will be printed

13、 线程队列queue

模块名字都是小写了,在python3里面

封装了锁,不用考虑线程锁了。

一个put,一个get
import queue q = queue.Queue() 先进先出
在线程之间数据安全,自带线程锁的数据容器 lq = queue.LifoQueue() # 栈 后进先出 算法和数据结构中
lq.put(1)
lq.put(2)
lq.put(3)
print(lq.get())
print(lq.get())
print(lq.get())
print(lq.get()) pq = queue.PriorityQueue() # 优先级队列
pq.put(3)
pq.put(5)
pq.put(2)
print(pq.get())
print(pq.get())
print(pq.get()) pq.put('c')
pq.put('a')
pq.put('A')
print(pq.get())
print(pq.get())
print(pq.get()) pq.put((10,'asfghfgk'))
pq.put((20,'2iyfhejcn'))
pq.put((15,'qwuriyhf'))
print(pq.get())
print(pq.get())
print(pq.get())

14、 进程池、线程池

进程池

程序效率高: 尽可能多的拿到CPU,效率低,经常遇到IO。

同步调用、异步调用:提交任务的两种方式;

提交任务的两种方式:
同步调用:提交完任务后,就在原地等待,等待任务执行完毕,拿到任务的返回值,才能继续下一行代码,导致程序串行执行
异步调用+回调机制:提交完任务后,不在原地等待,任务一旦执行完毕就会触发回调函数的执行, 程序是并发执行 进程的执行状态:
阻塞
非阻塞 #同步调用示例:
# from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,random,os def task(n):
print('%s is ruuning' %os.getpid())
time.sleep(random.randint(1,3))
return n**2 def handle(res):
print('handle res %s' %res) if __name__ == '__main__':
#同步调用
pool=ProcessPoolExecutor(2) for i in range(5):
res=pool.submit(task,i).result()
# print(res)
handle(res) pool.shutdown(wait=True)
# pool.submit(task,33333)
print('主') 异步调用示例:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,random,os def task(n):
print('%s is ruuning' %os.getpid())
time.sleep(random.randint(1,3))
# res=n**2
# handle(res)
return n**2 def handle(res):
res=res.result()
print('handle res %s' %res) if __name__ == '__main__':
#异步调用
pool=ProcessPoolExecutor(2) for i in range(5):
obj=pool.submit(task,i)
obj.add_done_callback(handle) #handle(obj) pool.shutdown(wait=True)
print('主')

线程池

from threading import get_ident
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor # 任务数超过了CPU个数的2倍,
# 进程的个数就不应该和任务数相等了 # 任务数超过了CPU个数的5倍,
# 线程的个数就不应该和任务数相等了 # 什么叫池?
# 如果任务多的情况下,无限的开启进程/线程
# 不仅会浪费非常多的时间来开启和销毁
# 还需要占用系统的调度资源 # 如果我开启有限的线程/进程,来完成无限的任务
# 这样能够最大化的保证并发
# 且维护操作系统资源的协调 # 线程池
# 为什么要用池
import os
import time
import random
def func(i):
time.sleep(random.randint(1,2))
print(get_ident(),i)
return '*'*i*i def call_bak(ret):
print(get_ident(),len(ret.result())) t_pool = ThreadPoolExecutor(os.cpu_count()*1)
# for i in range(20):
# t_pool.submit(func,i)
# t_pool.shutdown() # 阻塞
# t_pool.map(func,range(20))
print('main thread') # ret_l = []
# for i in range(20):
# ret = t_pool.submit(func,i)
# ret_l.append(ret)
# for ret in ret_l:print(ret.result()) # 阻塞 # submit
# shutdown() == join整个线程池中的任务
# map
# result
# add_done_callback # ret_l = []
# for i in range(20):
# t_pool.submit(func,i).add_done_callback(call_bak)
# 回调函数是由执行func的同一个子线程执行的
# 回调函数的特点是,执行完func之后会立即出发callback函数
# 回掉函数的参数就是func的返回值。

15、 进程、线程总结

# cpython解释器下
# 进程 : 利用多核-并行,数据不共享;开启和切换和销毁的开销大,数据不安全
# 线程 : 不能利用多核-并发,数据共享;开启和切换和销毁的开销小,数据不安全 # 进程的数量非常有限 : cpu的个数 + 1
# 线程的数量也要限制 : cpu的个数 * 5
# 以上操作都由池来完成 # 4c的计算机
# 5个进程 * 每个进程20个线程 = 100个并发 # 多进程能够利用多核 : 高计算型应该开多进程
# 多线程能够利实现并发 : 高IO型应该开多线程

16、 协程

进程是计算机最小的资源分配单位
线程是CPU调度的最小单位 协程
一条线程分成几个任务执行
每个任务执行一会儿
再切到下一个任务 单纯的切换回浪费时间的 切换任务 是 有程序来完成的
不是由操作系统控制的 如果在执行任务的过程中
是遇到阻塞才切换,是能够节约时间的 协程是非常好的 协程的本质就是一条线程
多个协程也是宏观上的并发
协程是数据安全的 ,不需要考虑锁 from gevent import monkey;monkey.patch_all()
import time
import gevent
from threading import currentThread def eat():
print('eating1',currentThread().getName())
time.sleep(1)
print('eating2') def play():
print('playing1',currentThread().getName())
time.sleep(2)
print('playing2') g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
g1.join()
g2.join()

17、 基于协程实现套接字并发

服务端

from gevent import monkey;monkey.patch_all()
from threading import currentThread
import socket
import gevent def talk(conn):
while True:
print('xiancheng', currentThread().getName())
conn.send(b'hello')
print(conn.recv(1024)) sk = socket.socket()
sk.bind(('127.0.0.1',9090))
sk.listen() while True:
conn,addr = sk.accept()
print('主',currentThread().getName())
g = gevent.spawn(talk,conn)

客户端

import socket
def func():
sk = socket.socket()
sk.connect(('127.0.0.1',9090)) while True:
print(sk.recv(1024))
sk.send(b'byebye') from threading import Thread
for i in range(500):
Thread(target=func).start()