因进程空间相对独立, 资源无法相互获取, 此时在不同进程间通信需要专门方法.
进程间通信方法: 管道 消息队列 共享内存 信号 信号量 套接字
1. 管道通信 Pipe
通信原理: 在内存中开辟管道空间生成管道操作对象, 多个进程使用"同一个"管道对象进行操作即可实现通信
multiprocessing / Pipe
fd1, fd2 = Pipe(duplex = True)
功能: 创建管道
参数:
默认表示双向管道
设置为False则为单向管道
返回值:
表示管道的两端
如果是双向管道, 都可以读写
如果是单向管道, 则fd1只读, fd2只写
fd.recv()
功能: 从管道读取信息
返回值: 读取到的内容
* 如果管道为空则阻塞
fd.send(data)
功能: 向管道写入内容
参数: 要写入的内容
* 可以发送python数据类型
from multprocessing import Process, Pipe
import os, time
fd1, fd2 = Pipe()
def fun(name):
time.sleep(3)
fd1.send('hello'+str(name))
jobs = []
for i in range(5):
p = Process(target = fun, args=(i,))
jobs.append(p)
p.start()
for i in range(5):
data = fd2.recv()
print(data)
for i in jobs:
i.join()
2. 消息队列
队列: 先进先出
通信原理: 在内存中建立队列数据结构模型. 多个进程都可以通过队列存入内容, 取出内容的顺序和存入顺序保持一致
创建队列
q = Queue(maxsize = 0)
功能: 创建消息队列
参数: 表示最多存放多少消息. 默认表示根据内存分配存储
返回值: 队列对象
q.put(data, [block, timeout])
功能: 向队列存储消息
参数:
data 要存的内容
block 默认队列满时会阻塞, 设置为False则非阻塞
timeout 超时时间
data = q.get([block, timeout])
功能: 获取队列消息
参数:
block 默认队列空时会阻塞, 设置为False则非阻塞
timeout 超时时间
q.full() 判断队列是否为满
q.empty() 判断队列是否为空
q.qsize() 判断队列中存在的消息数量
q.close() 关闭队列
from multiprocessing import Queue
from time import sleep
#创建队列
q = Queue(3)
q.put(1)
print(q.empty()) #True 内存还来不及写入, 所以判断为队列空
q.put(2)
print(q.get())
print(q.size())
q.close()
from multiprocessing import Process, Queue
import time
#创建消息队列
q = Queue()
def fun1():
time.sleep(1)
q.put({'a':1, 'b':2})
def fun2():
time.sleep(2)
print('收到消息:', q.get())
p1 = Process(target = fun1)
p2 = Process(target = fun2)
p1.start()
p2.start()
p1.join()
p2.join()
3. 共享内存sharememory
通信原理: 在内存中开辟一块空间, 对多个进程可见, 进程可以写入, 但是每次写入的内容会覆盖之前的内容.
obj = Value(ctype, obj)
功能: 开辟共享内存空间
参数:
ctype 要存储的数据类型
obj 共享内存的初化数据
返回: 共享内存对象
from multiprocessing import Process, Value
import time
import random
#创建共享内存
money = Value('i', 2000)
def deposite():
for i in range(100):
time.sleep(0.05)
#对value属性操作即操作共存数据
money.value += random.randint(1, 200)
#取钱
def withdraw():
for i in range(100):
time.sleep(0.04)
money.value -= random.randint(1, 180)
d = Process(target = deposite)
w = Process(target = withdraw)
d.start()
w.start()
d.join()
w.join()
print('余额:', money.value)
obj.value 即为共享内存值, 对其修改即修改共享内存
obj = Array(ctype, obj)
功能: 开辟共享内存空间
参数:
ctype 要存储的数据格式
obj 初始化存入的内容, 比如列表, 字符串
如果是整数则表示开辟空间的个数
返回值: 返回共享内存对象
- 可以通过遍历获取每个元素的值
e.g. [1, 2, 3] ----> obj[1] ==2 - 如果存入的是字符串
obj.value 表示字符串的首地址
from multiprocessing import Process, Array
import time
##创建共享内存, 初始放入列表
#shm = Array('i', [1, 2, 3, 4, 5])
##创建共享内存,开辟5个整型空间
#shm = Array('i', 5) #打印出是5 个0
#存入字符串, 要求bytes格式
shm = Array('c', b'Hello')
def fun():
for i in shm:
print(i)
#shm[3] = 10000
shm[0] = b'h'
p = Process(target = fun)
p.start()
p.join()
for i in shm:
print(i)
print(shm.value) #r打印字符串
管道 | 消息队列 | 共享内存 | |
---|---|---|---|
读写方式 | 两端读写双向/单向 | 先进先出 | 覆盖之前内容 |
效率 | 一般 | 一般 | 较高 |
应用 | 多用于父子进程 | 广泛灵活 | 需要注意进行互斥操作 |
4. 信号通信
一个进程向另一个进程发送一个信号来传递某种讯息, 接受者根据接收到的信号进行相应的行为
- kill -l 查看系统信号
- kill -sig PID 向一个进程发送信号
关于信号
python 发送信号
signal
os.kill(pid, sig)
功能:发送信号
参数:
pid 目标进程
sig 要发送的信号
import signal
import os
向20959发送信号
os.kill(20959, signal.SIGKILL)
from signal import *
import time
#信号处理函数
def handler(sig,frame):
if sig == SIGALRM:
print("接收到时钟信号")
elif sig == SIGINT:
print("就不结束")
alarm(5)
signal(SIGALRM,handler)
signal(SIGINT,handler)
while True:
print("Waiting for a signal")
time.sleep(2)
day7
signal.alarm(sec)
向自身
发送时钟信号 SIGALRM
sec 时钟时间
同步执行: 按照顺序逐句执行, 一步完成再做下一步
异步执行: 在执行过程中利用内核记录延迟发生或者准备处理的事件. 这样不影响应用层的持续执行.当事件发生时再由内核告知应用层处理
- 信号是唯一的异步通信方法
import signal
import time
signal.alarm(3)
while True:
time.sleep(1)
print('88888')
signal.pause()
阻塞等待接收一个信号
signal.signal(signum, handler)
处理信号
signum 要处理的信号
handler 信号的处理方法
- SIG_DFL 表示使用默认的方法处理
- SIG_IGN 表示忽略这个信号
- func 传入一个函数表示用指定函数处理
- def func(sig, frame)
- sig:捕获到的信号
- frame: 信号对象
5. 信号量(信号灯)
给定一个数量, 对多个进程可见, 且多个进程都可以操作. 进程通过对数量多少的判断执行各自的行为.
multiprocessing/Semaphore()
Semaphore(num)
创建信号量
num信号量初始值
返回: 信号量对象
sem.get_value() 获取信号量值
sem.acquire() 将信号量减1, 当信号量为0时会阻塞
sem.release() 将信号量加1