目录
1. 进程的创建与销毁
multiprocessing:进程模块
Process(group, target, name, args, kwargs)
- group:指定进程组,也可以是None
- target:执行的目标函数
- name:进程名称
- args:以元组的方式给进程传参
- kwargs:以字典方式给进程传参
Process创建实例化对象:
start():启动子进程实例
join(timeout):等待子进程执行结束(也可设置等待时间)
terminate():立即终止子进程
pid:当前进程id
ppid:父进程id
multiprocessing.current_process()可获取当前进程信息
multiprocessing.current_process().pid == os.getpid()
# 导入进程模块
import multiprocessing
import time
import os
def child_process(name, age):
while True:
# 查看当前进程
current_process = multiprocessing.current_process()
print('这是一个子进程,pid=', current_process.pid, ', ppid=', os.getppid())
for i in range(5):
print(f'{age}岁的程序员{name}创建的子进程正在运行...')
time.sleep(1)
# os.kill(os.getppid(), 9) # 杀死父进程,子进程直接退出
os.kill(current_process.pid, 9) # 杀死子进程,父进程并不会直接退出
if __name__ == '__main__':
# 创建子进程
# process = multiprocessing.Process(target=child_process, name='ChildPro', args=('张三', 18))
process = multiprocessing.Process(target=child_process, name='ChildPro',
kwargs={'age': 18, 'name': '张三'})
# 用字典传参可以指定参数名称传递,可改变顺序
# 启动子进程
process.start()
cnt = 0
while True:
print(f'{cnt}: 这是父进程,pid=', multiprocessing.current_process().pid)
time.sleep(1)
cnt += 1
if cnt == 3:
process.terminate() # 子进程直接终止
2. 进程间不共享全局数据
import multiprocessing
import time
import os
# 定义全局变量
my_list = list()
def ChildProcess():
for i in range(5):
print(f'{i}: 这是一个子进程, pid=', os.getpid(), ',ppid=', os.getppid())
my_list.append(i)
time.sleep(1)
def Run():
print(my_list)
if __name__ == '__main__':
child_process = multiprocessing.Process(target=ChildProcess)
run_process = multiprocessing.Process(target=Run)
child_process.start()
# 主进程等待写入进程执行完毕之后再继续执行
child_process.join()
run_process.start()
3. 消息队列
进程之间不可直接进行通信,可通过消息队列实现进程间通信
import multiprocessing
import time
# 写入数据
def WriteProcess(queue):
for i in range(10):
if queue.full():
print('队列已满')
break
queue.put(i)
time.sleep(1)
print(f'{i} 入队列')
# 读取数据
def ReadProcess(queue):
while True:
if queue.qsize() == 0:
print('队列已空')
break
value = queue.get()
print(value, end=' ')
if __name__ == '__main__':
# 创建消息队列
queue = multiprocessing.Queue(5) # 设置容量为5
# 创建进程
write_process = multiprocessing.Process(target=WriteProcess, args=(queue,))
read_process = multiprocessing.Process(target=ReadProcess, args=(queue,))
write_process.start()
write_process.join()
read_process.start()
初始化Queue对象时,若q = multiprocessing.Queue() 没有指定容量或容量设置为负值,则代表可接受的消息数量没有上限(直到内存耗尽)
4. 进程池
进程池会根据任务执行情况自动创建进程,而且尽量少创建进程,合理利用进程池中的进程完成多任务。
当需要创建的子进程数量不多时,可以直接利用multiprocessing的Process动态生成多个子进程,但当要调用大量子进程时,可以用到multiprocessing模块提供的Pool进程池方法
初始化 Pool 时,可以设定进程容量,当有新的请求提交到 Pool 中时,如果池还没有满,就会创建一个新的进程来执行该请求;但如果池中的进程已满,该请求会等待池中某进程结束之后再执行该请求
apply(func, args, kwds):同步进程池,阻塞方式调用函数
apply_async(func, args, kwds):异步进程池,非阻塞方式调用函数
进程池同步执行:一个任务执行完毕另一个任务才开始执行
import multiprocessing
import time
import os
def Task():
print(f'这是一个任务,执行者: {os.getpid()}')
time.sleep(0.5)
if __name__ == '__main__':
pool = multiprocessing.Pool(3) # 设置进程池容量为3
for i in range(10):
pool.apply(Task)
进程池异步执行:进程不会等待,多个任务可同时执行
import multiprocessing
import time
import os
def Task():
print(f'这是一个任务,执行者: {os.getpid()}')
time.sleep(0.5)
if __name__ == '__main__':
pool = multiprocessing.Pool(3) # 设置进程池容量为3
for i in range(10):
# pool.apply(Task) # 同步进程池
pool.apply_async(Task) # 异步进程池
pool.close() # 关闭进程池,代表不再有新任务添加进入进程池
pool.join() # 如果不进程等待的话,主进程会直接退出,进程池也会立即终止