线程和进程
一、 什么是进程 / 线程
1、 引论
众所周知,CPU是计算机的核心,它承担了所有的计算任务。而操作系统是计算机的管理者,是一个大管家,它负责任务的调度,资源的分配和管理,统领整个计算机硬件。应用程序是具有某种功能的程序,程序运行与操作系统之上
2、 线程
在很早的时候计算机并没有线程这个概念,但是随着时代的发展,只用进程来处理程序出现很多的不足。如当一个进程堵塞时,整个程序会停止在堵塞处,并且如果频繁的切换进程,会浪费系统资源。所以线程出现了
线程是能拥有资源和独立运行的最小单位,也是程序执行的最小单位。一个进程可以拥有多个线程,而且属于同一个进程的多个线程间会共享该进行的资源
3、 进程
进程时一个具有一定功能的程序在一个数据集上的一次动态执行过程。进程由程序,数据集合和进程控制块三部分组成。程序用于描述进程要完成的功能,是控制进程执行的指令集;数据集合是程序在执行时需要的数据和工作区;程序控制块(PCB)包含程序的描述信息和控制信息,是进程存在的唯一标志
4、 区别
- 一个进程由一个或者多个线程组成,线程是一个进程中代码的不同执行路线
- 切换进程需要的资源比切换线程的要多的多
- 进程之间相互独立,而同一个进程下的线程共享程序的内存空间(如代码段,数据集,堆栈等)。某进程内的线程在其他进程不可见。换言之,线程共享同一片内存空间,而进程各有独立的内存空间
5、 使用
在Python中,通过两个标准库 thread
和 Threading
提供对线程的支持,threading
对 thread
进行了封装。threading
模块中提供了 Thread
,Lock
, RLOCK
, Condition
等组件
二、 多线程使用
在Python中线程和进程的使用就是通过Thread
这个类。这个类在我们的thread
和threading
模块中。我们一般通过threading
导入
默认情况下,只要在解释器中,如果没有报错,则说明线程可用
from threading import Thread
1、 常用方法
Thread.run(self) # 线程启动时运行的方法,由该方法调用 target 参数所指定的函数
Thread.start(self) # 启动线程,start 方法就是去调用 run 方法
Thread.terminate(self) # 强制终止线程
Thread.join(self, timeout) # 阻塞调用,主线程进行等待
Thread.setDaemon(self, daemonic) # 将子线程设置为守护线程
Thread.getName(self, name) # 获取线程名称
Thread.setName(self, name) # 设置线程名称
2、 常用参数
参数 | 说明 |
---|---|
target | 表示调用对象,即子线程要执行的任务 |
name | 子线程的名称 |
args | 传入 target 函数中的位置参数,是一个元组,参数后必须添加逗号 |
3、 多线程的应用
3.1 重写线程法
import time, queue, threading
class MyThread(threading.Thread):
def __init__(self):
super().__init__()
self.daemon = True # 开启守护模式
self.queue = queue.Queue(3) # 开启队列对象,存储三个任务
self.start() # 实例化的时候直接启动线程,不需要手动启动线程
def run(self) -> None: # run方法线程自带的方法,内置方法,在线程运行时会自动调用
while True: # 不断处理任务
func, args, kwargs = self.queue.get()
func(*args, **kwargs) # 调用函数执行任务 元组不定长记得一定要拆包
self.queue.task_done() # 解决一个任务就让计数器减一,避免阻塞
# 生产者模型
def submit_tasks(self, func, args=(), kwargs={}): # func为要执行的任务,加入不定长参数使用(默认使用默认参数)
self.queue.put((func, args, kwargs)) # 提交任务
# 重写join方法
def join(self) -> None:
self.queue.join() # 查看队列计时器是否为0 任务为空 为空关闭队列
def f2(*args, **kwargs):
time.sleep(2)
print("任务2完成", args, kwargs)
# 实例化线程对象
mt = MyThread()
# 提交任务
mt.submit_tasks(f2, args=("aa", "aasd"), kwargs={"a": 2, "s": 3})
# 让主线程等待子线程结束再结束
mt.join()
守护模式:
- 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束
3.2 直接调用法
def f2(i):
time.sleep(2)
print("任务2完成", i)
lis = []
for i in range(5):
t = Thread(target=f2, args=(i,))
t.start() # 启动 5 个线程
lis.append(t)
for i in lis:
i.join() # 线程等待
4、 线程间数据的共享
现在我们程序代码中,有多个线程, 并且在这个几个线程中都会去 操作同一部分内容,那么如何实现这些数据的共享呢?
这时,可以使用 threading库里面的锁对象 Lock 去保护
Lock 对象的acquire方法 是申请锁
每个线程在操作共享数据对象之前,都应该申请获取操作权,也就是调用该共享数据对象对应的锁对象的acquire方法,如果线程A 执行了 acquire()
方法,别的线程B 已经申请到了这个锁, 并且还没有释放,那么 线程A的代码就在此处 等待 线程B 释放锁,不去执行后面的代码。
直到线程B 执行了锁的 release 方法释放了这个锁, 线程A 才可以获取这个锁,就可以执行下面的代码了
如:
import threading
var = 1
# 添加互斥锁,并且拿到锁
lock = threading.Lock()
# 定义两个线程要用做的任务
def func1():
global var # 声明全局变量
for i in range(1000000):
lock.acquire() # 操作前上锁
var += i
lock.release() # 操作完后释放锁
def func2():
global var # 声明全局变量
for i in range(1000000):
lock.acquire() # 操作前上锁
var -= i
lock.release() # 操作完后释放锁
# 创建2个线程
t1 = threading.Thread(target=func1)
t2 = threading.Thread(target=func2)
t1.start()
t2.start()
t1.join()
t2.join()
print(var)
到在使用多线程时,如果数据出现和自己预期不符的问题,就可以考虑是否是共享的数据被调用覆盖的问题
使用
threading
库里面的锁对象Lock
去保护
三、 多进程使用
1、 简介
Python中的多进程是通过multiprocessing包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象。这个进程对象的方法和线程对象的方法差不多也有start(), run(), join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置daemon属性来完成的
2、 应用
2.1 重写进程法
import time
from multiprocessing import Process
class MyProcess(Process): # 继承Process类
def __init__(self, target, args=(), kwargs={}):
super(MyProcess, self).__init__()
self.daemon = True # 开启守护进程
self.target = target
self.args = args
self.kwargs = kwargs
self.start() # 自动开启进程
def run(self):
self.target(*self.args, **self.kwargs)
def fun(*args, **kwargs):
print(time.time())
print(args[0])
if __name__ == '__main__':
lis = []
for i in range(5):
p = MyProcess(fun, args=(1, ))
lis.append(p)
for i in lis:
i.join() # 让进程等待
守护模式:
- 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束
2.2 直接调用法
import time
from multiprocessing import Process
def fun(*args, **kwargs):
print(time.time())
print(args[0])
if __name__ == '__main__':
lis = []
for i in range(5):
p = Process(target=fun, args=(1, ))
lis.append(p)
for i in lis:
i.join() # 让进程等待
3、 进程之间的数据共享
3.1 Lock 方法
其使用方法和线程的那个 Lock 使用方法类似
3.2 Manager 方法
Manager的作用是提供多进程共享的全局变量,Manager()方法会返回一个对象,该对象控制着一个服务进程,该进程中保存的对象运行其他进程使用代理进行操作
Manager支持的类型有:list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array
语法:
from multiprocessing import Process, Lock, Manager
def f(n, d, l, lock):
lock.acquire()
d[str(n)] = n
l[n] = -99
lock.release()
if __name__ == '__main__':
lock = Lock()
with Manager() as manager:
d = manager.dict() # 空字典
l = manager.list(range(10)) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# 启动10个进程,不同的进程对d和l中的不同元素进行操作
for i in range(10):
p = Process(target=f, args=(i, d, l, lock))
p.start()
p.join()
print(d)
print(l)
四、 池并发
1、 语法
线程池的基类是 concurrent.futures
模块中的 Executor
,Executor
提供了两个子类,即 ThreadPoolExecutor
和 ProcessPoolExecutor
,其中 ThreadPoolExecutor
用于创建线程池,而 ProcessPoolExecutor
用于创建进程池
如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定
Exectuor 提供了如下常用方法:
- submit(fn, *args, *kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数
- map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
- shutdown(wait=True):关闭线程池
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表
Future 提供了如下方法:
- cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
- cancelled():返回 Future 代表的线程任务是否被成功取消。
- running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
- done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
- result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
- exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
- add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数
2、 获取 CPU 数量
from multiprocessing import cpu_count # cpu核心数模块,其可以获取 CPU 核心数
n = cpu_count() # 获取cpu核心数
3、 线程池
使用线程池来执行线程任务的步骤如下:
- 调用 ThreadPoolExecutor 类的构造器创建一个线程池
- 定义一个普通函数作为线程任务
- 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务
- 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
def get_result(future):
print(future.result())
# 为future1添加线程完成的回调函数
future1.add_done_callback(get_result)
# 为future2添加线程完成的回调函数
future2.add_done_callback(get_result)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown() # 序可以使用 with 语句来管理线程池,这样即可避免手动关闭线程池
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
也可以低于 CPU 核心数
3、 进程池
使用线程池来执行线程任务的步骤如下:
- 调用 ProcessPoolExecutor 类的构造器创建一个线程池
- 定义一个普通函数作为进程程任务
- 调用 ProcessPoolExecutor 对象的 submit() 方法来提交线程任务
- 当不想提交任何任务时,调用 ProcessPoolExecutor 对象的 shutdown() 方法来关闭线程池
关于进程的开启代码一定要放在if __name__ == '__main__':
代码之下,不能放到函数中或其他地方
开启进程的技巧
from concurrent.futures import ProcessPoolExecutor
pool = ProcessPoolExecutor(max_workers=cpu_count()) # 根据cpu核心数开启多少个进程
开启进程的数量最好低于最大 CPU 核心数