python并发编程-进程理论-进程方法-守护进程-互斥锁-01

时间:2024-01-10 14:19:20

操作系统发展史(主要的几个阶段)

  • 初始系统

    1946年第一台计算机诞生,采用手工操作的方式(用穿孔卡片操作)

    同一个房间同一时刻只能运行一个程序,效率极低(操作一两个小时,CPU一两秒可能就运算完了)

  • 联机批处理系统

  • 脱机批处理系统

  • 多道程序系统

    python并发编程-进程理论-进程方法-守护进程-互斥锁-01

    1.空间上的复用

    ​ 多个程序公用一套计算机硬件

    2.时间上的复用

    切换+保存状态

    保存状态:保存当前的运行状态,下次接着该状态继续执行

    切换的两种情况

    ​ (1) 当一个程序遇到 I/O 操作(不需要使用CPU),操作系统会剥夺该程序的CPU执行权限(提高了CPU的利用率,并且也不影响程序的执行效率(利用空档期))

    ​ (2)当一个程序长时间占用CPU操作系统也会剥夺该程序的cpu执行权限)(降低了程序的执行效率)

并发:看起来像同时运行的就算

并行:真正意义上的同时执行

单核(CPU)的计算机能实现并发,不能实现并行

进程理论

程序:一坨代码 (没有在运行的代码)

进程:正在运行的程序(广义定义:一个具有独立功能的程序关于某个数据集合的一次运行活动)

注意:同一个程序执行两次,就会在操作系统中出现两个进程,所以我们可以同时运行一个软件,分别做不同的事情

进程调度:要想多个进程交替执行,操作系统必须对这些进程进行调度,这个调度不是随机进行的,要遵循一定的规则(调度算法)

时间片:就好比将一秒等分成N多分(对于CPU来说是差不多的时间)

时间片轮转法+多级反馈队列

python并发编程-进程理论-进程方法-守护进程-互斥锁-01

上面那三层就是“多级反馈队列

linux中可以给指定的程序设置优先级(了解即可

进程三状态

  • 就绪态

  • 运行态

  • 阻塞态

切换状态的时间非常短,可以忽略(CPU超级快)

python并发编程-进程理论-进程方法-守护进程-互斥锁-01

程序不会直接进入运行态,必须先进入就绪态,再根据时间片轮询算法来执行

让进程离开运行态的两种方式

  1. 会进入阻塞态的一些操作

    input

    print

    文件操作

    sleep

    ...其他的

    结束阻塞态

    ​ input获取到值

    print输出完结果

    文件读取完毕

    ​ 睡眠结束

  2. 时间片的时间到了(占用CPU太久了)会自动进入就绪态,而不是阻塞,等待下一次CPU执行

写程序的理想状态: 尽量少的进入阻塞态

同步异步

任务的提交方式的不同

同步:任务提交之后原地等待任务的执行并拿到返回结果才继续执行,期间不做任何事(程序层面的表现就是卡住了)

异步:任务提交之后不再原地等待,而是继续执行下一行代码(结果是要的,只是通过其他方式(异步回调)获取)

阻塞非阻塞

程序的运行状态不同

阻塞:对应进程三状态中的阻塞态

非阻塞:对应进程三状态中的就绪态、运行态

其他说法(了解)

同步阻塞形式

效率最低,专心排队,什么别的事都不做。

异步阻塞形式

如果在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知),也就是领了一张小纸条,假如在这段时间里他不能离开银行做其它的事情,那么很显然,这个人被阻塞在了这个等待的操作上面

异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

同步非阻塞形式

想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有,如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的。

异步非阻塞形式

因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换。

比如说,这个人突然发觉自己烟瘾犯了,需要出去抽根烟,于是他告诉大堂经理说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操作上面,自然这个就是异步+非阻塞的方式了。

强调:同步异步、阻塞非阻塞是两对概念,不能混为一谈

异步非阻塞(是两个不同的概念)

用代码创建进程的两种方式 *****

创建进程就是在内存中重新开辟一块内存空间

将运行产生的代码丢进去

一个进程对应在内存就是一块独立的内存空间

进程与进程之间数据是隔离的,无法直接交互,可以通过某些技术实现间接交互

方式一

打印前加上 time.sleep() 可以看出异步效果(主进程暂停,子进程还在运行)

from multiprocessing import Process
import time def test(name):
print(f"{name} is runnig")
time.sleep(2)
print(f"{name} is over") '''
windows 创建进程 会将代码以 ** 模块 ** 的方式从上往下执行一遍
所以一定要在 下面代码内创建进程,否则报错
if __name__ == '__main__':
创建进程
pass linux 会直接将代码完完整整地拷贝(fork)一份 ''' if __name__ == '__main__':
p = Process(target=test, args=('egon', )) # 创建一个进程对象,容器类型的只有一个元素记得写逗号 (第一个元素, )
p.start() # 告诉操作系统帮你创建一个进程(这是异步的)
print(f"主进程") # 这个打印顺序加了time.sleep可能就不一样了,或者mac中也不一样
# 主进程
# egon is runnig
# egon is over

方式二

# 方式二  检验下和老师代码的区别
from multiprocessing import Process
import time class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
# self.test() def run(self): # 重写了父类的run 方法,会自动运行(父类中有调用), 改名了就不会自动调用
print(f"{self.name} is runnig")
time.sleep(2)
print(f"{self.name} is over") if __name__ == '__main__':
p = MyProcess('egon')
p.start() # 告诉操作系统帮你创建一个进程(这是异步的)
print(f"主进程") # 主进程
# egon is runnig
# egon is over

进程方法 join

让主进程等待某个子进程运行结束(不会影响其他进程的运行(调用 join的那个子进程))

注意进程对象.join()进程对象.start()的前后位置(进程对象.join()放在其他进程的进程对象.start()在之后,不然会浪费时间)

python并发编程-进程理论-进程方法-守护进程-互斥锁-01

案例

from multiprocessing import Process
import time def test(name):
print(f"{name} is runnig")
time.sleep(2)
print(f"{name} is over") # 不能这样创,会报错
# p = Process(target=test, args=('egon',)) # 创建一个进程对象,容器类型的只有一个元素记得写逗号 (第一个元素, )
# p.start() # 告诉操作系统帮你创建一个进程(这是异步的)
# print(f"主进程")
"""
报错:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "E:\python3-6-4\lib\multiprocessing\spawn.py", line 105, in spawn_main
exitcode = _main(fd)
....省略..............
File "E:\python3-6-4\lib\multiprocessing\spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase. This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module: if __name__ == '__main__':
freeze_support()
... The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable. """ # if __name__ == '__main__':
# p = Process(target=test, args=('egon',)) # 创建一个进程对象,容器类型的只有一个元素记得写逗号 (第一个元素, )
# p2 = Process(target=test, args=('kevin',))
# p3 = Process(target=test, args=('jason',))
# p.start() # 仅仅只是告诉操作系统帮你创建一个进程,至于这个进程什么时候创建,由操作系统随机决定
# p2.start()
# p3.start()
# # time.sleep(10) # 下面打印顺序会改变
# p.join() # 主进程代码等待子进程运行结束
# print(f"主进程")
#
# # egon is runnig
# # jason is runnig
# # kevin is runnig
# # egon is over
# # jason is over
# # kevin is over
# # 主进程 # 简化写法
# if __name__ == '__main__':
# for i in range(3):
# p = Process(target=test, args=(f'进程{i}',))
# p.start()
# p.join() # 固定了进程执行顺序
#
# # 进程0 is runnig
# # 进程0 is over
# # 进程1 is runnig
# # 进程1 is over
# # 进程2 is runnig
# # 进程2 is over # # 还是想要随机
# if __name__ == '__main__':
# p_list = []
# start_time = time.time()
# for i in range(3):
# p = Process(target=test, args=(f'进程{i}',))
# p.start()
# p_list.append(p)
# for p in p_list:
# p.join()
#
# print("主进程")
# print(time.time() - start_time)
#
# # 进程1 is runnig
# # 进程0 is runnig
# # 进程2 is runnig
# # 进程1 is over
# # 进程0 is over
# # 进程2 is over
# # 主进程
# # 2.14320969581604 if __name__ == '__main__':
p = Process(target=test, args=('egon',)) # 创建一个进程对象,容器类型的只有一个元素记得写逗号 (第一个元素, )
p2 = Process(target=test, args=('kevin',))
p3 = Process(target=test, args=('jason',))
start_time = time.time() p.start() # 仅仅只是告诉操作系统帮你创建一个进程,至于这个进程什么时候创建,由操作系统随机决定
p2.start()
p.join() # 主进程代码等待子进程运行结束
p3.start()
# time.sleep(10) # 下面打印顺序会改变
p2.join()
p3.join() print(f"主进程")
print(time.time() - start_time)
# 4.193955898284912 # join的位置不一样会影响执行时间(与其他进程的start 前后关系)
# 这个时间比上面那个代码多了2s(小数部分的时间是程序执行时间,每次都不一样的)

进程间数据互相隔离

from multiprocessing import  Process

money = 100

def test():
global money
money = 999999
print(f"子进程的money:{money}...") # 主进程不调用这个方法 if __name__ == '__main__':
p = Process(target=test)
p.start()
p.join() # 让主进程等待某个子进程执行完后再继续执行
print(money) # 子进程的money:999999...
# 100 # 并没有被修改 主进程与子进程的名称空间是隔离的

虽然用了global 想要变成全局的,但子进程中的数据并没有影响到主进程的,进程间数据是相互隔离的

进程对象及其他方法

from multiprocessing import Process, current_process
import os
import time def test(name):
# --------------------------------------------
# current_process().pid 获取当前进程的 pid
# 不要忘了上面的导模块
# --------------------------------------------
print('%s is running' % name, current_process().pid)
# --------------------------------------------
# os.getpid() 获取当前进程的pid
# os.getppid() 获取当前进程的父级进程的pid
#
# os 模块是与操作系统打交道的,进程是属于系统的概念(所以可以用os来获取进程号嘛...个人推测)
# --------------------------------------------
print('%s is running' % name, '子进程%s' % os.getpid(), '父进程%s' % os.getppid())
time.sleep(30)
print('%s is over' % name) if __name__ == '__main__':
p = Process(target=test, args=('egon',))
p.start()
# --------------------------------------------
# p.terminate() # 杀死当前进程
# 其实是告诉操作系统帮你杀死一个进程,要等操作系统操作了才是真的杀死进程
# 也就是为什么 有时运行了p.terminate() 然后运行 p.is_alive()得到的结果是True
# --------------------------------------------
time.sleep(0.1)
# --------------------------------------------
# p.is_alive() # 判断调用此方法的进程是否存活
# --------------------------------------------
print(p.is_alive()) # 判断进程是否存活
print('主', current_process().pid)
print('主', os.getpid(), '主主进程:%s' % os.getppid())
time.sleep(60) # 预留点时间可以在命令行看到这些进程之间的关系 # 注释掉 p.terminate() 之前
# False
# 主 13900
# 主 13900 主主进程:14256 # 注释掉 p.terminate() 之后
# True
# 主 14388
# 主 14388 主主进程:14256
# egon is running 9332
# egon is running 子进程9332 父进程14388
# egon is over

创建进程开销还是挺大的(创建的进程都是 python.exe, python解释器也是两份)

在pycharm中运行代码

python并发编程-进程理论-进程方法-守护进程-互斥锁-01

在cmd命令行中运行代码

python并发编程-进程理论-进程方法-守护进程-互斥锁-01

谁运行的(创建的),谁就是父进程

强调:代码不能直接操作进程,只能告诉操作系统,让操作系统去做(代码速度很快,操作系统可能还没做/做完)

补充

僵尸进程

子进程在被关闭后,所占用的pid不会被回收(系统的进程数量是有限的),只有等父进程去释放它才能被回收(所以是有害的)

所有的进程都会步入僵尸进程(不会回收pid,除非主进程回收)

父进程回收子进程资源的两种方式

1.join方法(等待进程运行结束)

2.父进程正常结束(所有子进程运行完,且自身运行结束)

孤儿进程

子进程没有结束,父进程意外结束该子进程就成了孤儿进程

linux中,如果父进程意外死亡,他所创建的子进程都会被(init)回收

windows中也有一个类似init的来回收子进程

因为会自动回收,所以他是 无害

守护进程

被守护进程结束后守护进程立刻结束(后面的还未执行的代码直接不执行了)

from multiprocessing import Process
import time # 古代皇帝死了一般太监都需要陪葬,所以举这么一个例子
def test(name):
print(f'{name}总管正常活着') time.sleep(3)
print(f'{name}总管正常死亡') # # windows下创建子进程必须卸载 __main__ 这个结构里
# if __name__ == '__main__':
# p = Process(target=test, args=('egon', ))
# p.start()
# print(f"皇帝jason寿终正寝")
#
# # 皇帝jason寿终正寝
# # egon总管正常活着
# # egon总管正常死亡 # # 加上p.daemon = True 让子进程成为守护进程
# if __name__ == '__main__':
# p = Process(target=test, args=('egon', ))
# p.daemon = True # 将该进程设置为守护进程,这一句必须放在start()之前,否则报错
# p.start()
# # 守护进程一般不加 p.join,都等子进程运行完了再接着走主进程那就不是守护进程了
# # p.join() # 加了也能正常运行,但它失去了守护进程的意义
# print(f"皇帝jason寿终正寝")
#
# # 皇帝jason寿终正寝 # 加上 time.sleep 给子进程一些运行时间(CPU运行速度超快的,1秒已经很长了)
if __name__ == '__main__':
p = Process(target=test, args=('egon', ))
p.daemon = True
p.start()
time.sleep(0.1) # 暂停0.1 秒,给子进程一点运行时间(子进程和主进程是同步运行的)
# 守护进程一般不加 p.join,都等子进程运行完了再接着走主进程那就不是守护进程了
# p.join() # 加了也能正常运行,但它失去了守护进程的意义
print(f"皇帝jason寿终正寝") # egon总管正常活着
# 皇帝jason寿终正寝

互斥锁*****

先看一个用并发模拟多人的抢票的案例

import json
from multiprocessing import Process
import time # 查票
def search(i):
with open('07data.txt', 'r', encoding='utf-8') as f:
data = f.read() ticket_dict = json.loads(data)
print(f"用户{i}查询余票为:{ticket_dict.get('ticket')}") # 买票
def buy(i):
with open('07data.txt', 'r', encoding='utf-8') as f:
data = f.read() ticket_dict = json.loads(data)
time.sleep(3)
if ticket_dict.get('ticket') > 0:
ticket_dict['ticket'] -= 1
print(ticket_dict)
with open('07data.txt', 'wt', encoding='utf-8') as f:
json.dump(ticket_dict, f)
print(f"用户{i}抢票成功")
else:
print("没票了!") def run(i):
search(i)
buy(i) if __name__ == '__main__':
for i in range(4):
p = Process(target=run, args=(i, ))
p.start() # {"ticket": 2} 文件数据
# 用户0查询余票为:2
# 用户1查询余票为:2
# 用户2查询余票为:2
# 用户3查询余票为:2
# {'ticket': 1}
# 用户0抢票成功
# {'ticket': 1}
# 用户1抢票成功
# {'ticket': 1}
# 用户2抢票成功
# {'ticket': 1}
# 用户3抢票成功 # 大于余票了(拿到的都是2张票),这样肯定不行

可以看出,文件中虽然只有2张票,这4个用户却都抢票成功了,并且还有一张余票,在现实生活中这种情况肯定是不允许出现的!

当多个进程操作同一份数据时会造成数据的错乱,这个时候必须加锁处理

​ 将并发变成串行,虽然降低了效率,但是提高了数据安全

那么就尝试着用互斥锁来解决这个问题

import json
from multiprocessing import Process, Lock
import time # 查票
def search(name):
with open('07data.txt', 'r', encoding='utf-8') as f:
data = f.read() ticket_dict = json.loads(data)
print(f"用户{name}查询余票为:{ticket_dict.get('ticket')}") # 买票
def buy(name):
with open('07data.txt', 'r', encoding='utf-8') as f:
data = f.read() ticket_dict = json.loads(data)
time.sleep(1)
if ticket_dict.get('ticket') > 0:
ticket_dict['ticket'] -= 1
print(ticket_dict)
with open('07data.txt', 'wt', encoding='utf-8') as f:
json.dump(ticket_dict, f)
print(f"用户{name}抢票成功")
else:
print("没票了!") def run(i, mutex):
search(i)
mutex.acquire() # 抢锁,只要有人抢到了锁,其他人必须等待该人释放锁
buy(i)
mutex.release() # 释放锁 if __name__ == '__main__':
mutex = Lock() # 生成了一把锁
for i in range(4):
p = Process(target=run, args=(i, mutex))
p.start() # {"ticket": 1} 文件数据
# 用户0查询余票为:1
# 用户1查询余票为:1
# 用户2查询余票为:1
# 用户3查询余票为:1
# {'ticket': 0}
# 用户0抢票成功
# 没票了!
# 没票了!
# 没票了! # 总共1张票,被抢到一张票,票数变为0,符合预期设想

注意

  • 锁不要轻易使用,容易造成死锁现象
  • 只在处理数据的部分加锁,不要再全局加锁(将局部由并发变成串行)

锁必须在主进程中产生(实例化),交给子进程去使用

​ (在子进程中产生那不就是多把锁了吗)

只要多进程操作同一份数据,就必须要加锁处理