进程间通信(IPC)

时间:2021-11-27 19:06:02

因进程空间相对独立, 资源无法相互获取, 此时在不同进程间通信需要专门方法.

进程间通信方法: 管道 消息队列 共享内存 信号 信号量 套接字

1. 管道通信 Pipe


通信原理: 在内存中开辟管道空间生成管道操作对象, 多个进程使用"同一个"管道对象进行操作即可实现通信
multiprocessing / Pipe

fd1, fd2 = Pipe(duplex = True)
功能: 创建管道
参数:
默认表示双向管道
设置为False则为单向管道
返回值:
表示管道的两端
如果是双向管道, 都可以读写
如果是单向管道, 则fd1只读, fd2只写

fd.recv()
功能: 从管道读取信息
返回值: 读取到的内容
* 如果管道为空则阻塞

fd.send(data)
功能: 向管道写入内容
参数: 要写入的内容
* 可以发送python数据类型

from multprocessing import Process, Pipe
import os, time

fd1, fd2 = Pipe()

def fun(name):
    time.sleep(3)
    fd1.send('hello'+str(name))

jobs = []
for i in range(5):
    p = Process(target = fun, args=(i,))
    jobs.append(p)
    p.start()

for i in range(5):
    data = fd2.recv()
    print(data)

for i in jobs:
    i.join()

2. 消息队列


队列: 先进先出
通信原理: 在内存中建立队列数据结构模型. 多个进程都可以通过队列存入内容, 取出内容的顺序和存入顺序保持一致

创建队列
q = Queue(maxsize = 0)
功能: 创建消息队列
参数: 表示最多存放多少消息. 默认表示根据内存分配存储
返回值: 队列对象

q.put(data, [block, timeout])
功能: 向队列存储消息
参数:
data 要存的内容
block 默认队列满时会阻塞, 设置为False则非阻塞
timeout 超时时间

data = q.get([block, timeout])
功能: 获取队列消息
参数:
block 默认队列空时会阻塞, 设置为False则非阻塞
timeout 超时时间

q.full() 判断队列是否为满
q.empty() 判断队列是否为空
q.qsize() 判断队列中存在的消息数量
q.close() 关闭队列
from multiprocessing import Queue
from time import sleep

#创建队列
q = Queue(3)

q.put(1)
print(q.empty())    #True 内存还来不及写入, 所以判断为队列空
q.put(2)
print(q.get())
print(q.size())
q.close()
from multiprocessing import Process, Queue
import time

#创建消息队列
q = Queue()

def fun1():
    time.sleep(1)
    q.put({'a':1, 'b':2})

def fun2():
    time.sleep(2)
    print('收到消息:', q.get())

p1 = Process(target = fun1)
p2 = Process(target = fun2)
p1.start()
p2.start()
p1.join()
p2.join()

3. 共享内存sharememory


通信原理: 在内存中开辟一块空间, 对多个进程可见, 进程可以写入, 但是每次写入的内容会覆盖之前的内容.

obj = Value(ctype, obj)
功能: 开辟共享内存空间
参数:
ctype 要存储的数据类型
obj 共享内存的初化数据
返回: 共享内存对象
进程间通信(IPC)

from multiprocessing import Process, Value
import time
import random

#创建共享内存
money = Value('i', 2000)
def deposite():
    for i in range(100):
        time.sleep(0.05)
        #对value属性操作即操作共存数据
        money.value += random.randint(1, 200)

#取钱
def withdraw():
    for i in range(100):
        time.sleep(0.04)
        money.value -= random.randint(1, 180)

d = Process(target = deposite)
w = Process(target = withdraw)
d.start()
w.start()
d.join()
w.join()

print('余额:', money.value)

obj.value 即为共享内存值, 对其修改即修改共享内存

obj = Array(ctype, obj)
功能: 开辟共享内存空间
参数:
ctype 要存储的数据格式
obj 初始化存入的内容, 比如列表, 字符串
如果是整数则表示开辟空间的个数
返回值: 返回共享内存对象

  • 可以通过遍历获取每个元素的值
    e.g. [1, 2, 3] ----> obj[1] ==2
  • 如果存入的是字符串
    obj.value 表示字符串的首地址
from multiprocessing import Process, Array
import time

##创建共享内存, 初始放入列表
#shm = Array('i', [1, 2, 3, 4, 5])
##创建共享内存,开辟5个整型空间
#shm = Array('i', 5) #打印出是5 个0
#存入字符串, 要求bytes格式
shm = Array('c', b'Hello')

def fun():
    for i in shm:
        print(i)
    #shm[3] = 10000
    shm[0] = b'h'
p = Process(target = fun)
p.start()
p.join()

for i in shm:
    print(i)
print(shm.value) #r打印字符串
管道 消息队列 共享内存
读写方式 两端读写双向/单向 先进先出 覆盖之前内容
效率 一般 一般 较高
应用 多用于父子进程 广泛灵活 需要注意进行互斥操作

4. 信号通信


一个进程向另一个进程发送一个信号来传递某种讯息, 接受者根据接收到的信号进行相应的行为

  • kill -l 查看系统信号
  • kill -sig PID 向一个进程发送信号

关于信号
进程间通信(IPC)
进程间通信(IPC)

python 发送信号

signal

os.kill(pid, sig)
功能:发送信号
参数:
pid 目标进程
sig 要发送的信号

import signal
import os
向20959发送信号
os.kill(20959, signal.SIGKILL)
from signal import * 
import time 

#信号处理函数
def handler(sig,frame):
    if sig == SIGALRM:
        print("接收到时钟信号")
    elif sig == SIGINT:
        print("就不结束")

alarm(5)

signal(SIGALRM,handler)
signal(SIGINT,handler)

while True:
    print("Waiting for a signal")
    time.sleep(2)

day7

signal.alarm(sec)
自身发送时钟信号 SIGALRM
sec 时钟时间

同步执行: 按照顺序逐句执行, 一步完成再做下一步
异步执行: 在执行过程中利用内核记录延迟发生或者准备处理的事件. 这样不影响应用层的持续执行.当事件发生时再由内核告知应用层处理

  • 信号是唯一的异步通信方法
import signal
import time

signal.alarm(3)
while True:
    time.sleep(1)
    print('88888')

signal.pause()
阻塞等待接收一个信号

signal.signal(signum, handler)
处理信号
signum 要处理的信号
handler 信号的处理方法

  • SIG_DFL 表示使用默认的方法处理
  • SIG_IGN 表示忽略这个信号
  • func 传入一个函数表示用指定函数处理
    • def func(sig, frame)
    • sig:捕获到的信号
    • frame: 信号对象

5. 信号量(信号灯)


给定一个数量, 对多个进程可见, 且多个进程都可以操作. 进程通过对数量多少的判断执行各自的行为.
multiprocessing/Semaphore()

Semaphore(num)
创建信号量
num信号量初始值
返回: 信号量对象

sem.get_value() 获取信号量值
sem.acquire() 将信号量减1, 当信号量为0时会阻塞
sem.release() 将信号量加1