一、守护进程
1、主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
2、程序核心表现:
p.daemon=True
注意要求:一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
print("子进程开始")
print("启动视频,开始战斗")
time.sleep(0.5)
print("%s和心仪的女主播大战了几百回合"%self.name)
time.sleep(2)
print("关闭视频")
if __name__ == '__main__':
p=MyProcess("太白")
# 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
p.daemon=True
p.start()
time.sleep(0.5)
print("猪")
守护进程源码
二、同步锁
通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题:进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。
加锁可以保证多个进程修改同一块数据时,同一时间只有一个任务可以进行修改,即串行修改,这样做的话牺牲了速度,保证了数据安全。
#加锁保证数据安全,不出现混乱
from multiprocessing import Process,Lock
import json,time,random #查看剩余票数
def search():
dic=json.load(open("db","r")) #打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典
print("\033[43m系统剩余%s\033[0m"%dic["count"]) # 抢票
def get():
dic=json.load(open("db","r"))
#模拟读数据的网络延迟,那么进程之间的切换,导致所有人拿到的字典都是{"count": 5},也就是每个人都拿到了这一票。
time.sleep(0.1) if dic["count"]>0:
dic["count"] -=1
time.sleep(0.2) # 模拟写数据的网络延迟
json.dump(dic, open("db", "w"))
print('\033[46m购票成功\033[0m')
else:
print("票已经没了,你走吧!!") def task(loc):
search()
# 因为抢票的时候是发生数据变化的时候,所有我们将锁加加到这里
loc.acquire()
get()
loc.release() if __name__ == '__main__':
loc = Lock() # 创建一个锁
for i in range(10):
p=Process(target=task,args=(loc,))#模拟并发10个客户端抢票
p.start()
模拟抢票同步锁
保证数据安全用的,但是将锁起来的那段代码的执行变成了同步\串行,牺牲了效率,保证了安全,
使用lock基本格式:
L = Lock()#创建锁对象
L.acquire() #上锁
数据操作
L.release()#解锁
三、.信号量(了解)
限定一次性可执行的进程数量。
互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
实现:
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
信号量Semaphore介绍
比如大保健:提前设定好,一个房间只有4个床(计数器现在为4),那么同时只能四个人进来,谁先来的谁先占一个床(acquire,计数器减1),4个床满了之后(计数器为0了),第五个人就要等着,等其中一个人出来(release,计数器加1),他就去占用那个床了。
from multiprocessing import Process,Semaphore
import time,random def go_ktv(sem,user):
sem.acquire()
print('%s 占到一间ktv小屋' %user)
time.sleep(random.randint(0,3)) #模拟每个人在ktv中待的时间不同
sem.release() if __name__ == '__main__':
sem=Semaphore(4)
p_l=[]
for i in range(13):
p=Process(target=go_ktv,args=(sem,'user%s' %i,))
p.start()
p_l.append(p) for i in p_l:
i.join()
print('============》')
信号量的使用
#模拟Semaphore 执行机制
#Semaphore中执行的进程数不能超过设定的进程数,如果超过就锁住,等进程执行完一个就进入一个
from multiprocessing import Process,Semaphore
import time,random def func(i,s):
time.sleep(random.uniform(0.2,1))
s.acquire()
time.sleep(random.uniform(2,3))
print(f"我是--{i}---号")
s.release() if __name__ == '__main__': s = Semaphore(4)
for i in range(1,15):
p = Process(target=func,args=(i,s,))
p.start()
执行机制模拟
四、事件
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。 clear:将“Flag”设置为False
set:将“Flag”设置为True
事件执行机制
from multiprocessing import Process,Event e=Event() #创建事件对象,时间初始值为False
print("事件开始")
print(e.is_set()) #False #查看时间当前状态
e.set() #将事件设置为True
print(">>>",e.is_set())#True
e.clear() #将事件恢复False
# e.wait() #等待事件变成True,才继续往下执行
print("结束了")
事件的基本使用和相关函数介绍
from multiprocessing import Process,Event
import time
def func1(e):
print("子进程计算开始")
time.sleep(2) #模拟子进程计算时间
e.set() #将进程设置为True,结束主进程的时间wait
print("子进程结束") if __name__ == '__main__':
e=Event()#创建时间
p=Process(target=func1,args=(e,)) #把时间传递给子进程
p.start() #就绪,等待操作系统调度创建进程和执行进程
print("主进程开启")
time.sleep(1) #时间模拟延迟
e.wait() #等待子进程操作完成后,再执行后续程序
print("主进程结束")
基于事件的通信
from multiprocessing import Process, Event
import time, random def car(e, n):
while True:
if not e.is_set(): # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
print('\033[31m红灯亮\033[0m,car%s等着' % n)
e.wait() # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色
print('\033[32m车%s 看见绿灯亮了\033[0m' % n)
time.sleep(random.randint(2,4))
if not e.is_set(): #如果is_set()的值是Flase,也就是红灯,仍然回到while语句开始
continue
print('车开远了,car', n)
break # def police_car(e, n):
# while True:
# if not e.is_set():# 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
# print('\033[31m红灯亮\033[0m,car%s等着' % n)
# e.wait(0.1) # 阻塞,等待设置等待时间,等待0.1s之后没有等到绿灯就闯红灯走了
# if not e.is_set():
# print('\033[33m红灯,警车先走\033[0m,car %s' % n)
# else:
# print('\033[33;46m绿灯,警车走\033[0m,car %s' % n)
# break def traffic_lights(e, inverval):
while True:
time.sleep(inverval)
if e.is_set():
print('######', e.is_set())
e.clear() # ---->将is_set()的值设置为False
else:
e.set() # ---->将is_set()的值设置为True
print('***********',e.is_set()) if __name__ == '__main__':
e = Event()
for i in range(10):
p=Process(target=car,args=(e,i,)) # 创建10个进程控制10辆车
time.sleep(random.random(1, 3)) #车不是一下子全过来
p.start() # for i in range(5):
# p = Process(target=police_car, args=(e, i,)) # 创建5个进程控制5辆警车
# p.start() #信号灯必须是单独的进程,因为它不管你车开到哪了,我就按照我红绿灯的规律来闪烁变换,对吧
t = Process(target=traffic_lights, args=(e, 5)) # 创建一个进程控制红绿灯
t.start() print('预备~~~~开始!!!')
通过事件来模拟红绿灯示例
五、进程通信Queue
from multiprocessing import Queue
#队列 先进先出 fifo first in first out
q=Queue(3) #创建一个队列 ,容量为:3
#q.put() 往队列里面加入元素
q.put(1)
q.put(2)
q.put(3)
print(">>>",q.full()) #判断是否满了
#q.get() 取出数据
print(q.get())
print(q.get())
print(q.get())
print(">>>",q.empty()) #判断是否空
q.get_nowait()
try:
s=q.get_nowait()
print('+++',s)
except :
print(1123)
基本方法使用
#缓冲区解耦事件
from multiprocessing import Process,Queue
import time
def producer(q):
print("开始生产包子")
for i in range(1,11):
time.sleep(2)
baozi="%s号包子"%i
q.put(baozi)
print(baozi+"生产完毕") q.put(None) def consumer(q):
print(">>>客人准备,开始吃包子")
while 1:
time.sleep(1)
baozi=q.get()
if baozi==None:
print("可以走了")
break
print("%s被掉了"%baozi) if __name__ == '__main__':
q=Queue(10)
p1=Process(target=producer,args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
# print("吃包子")
生产者消费者模型(吃包子)
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,并且我可以根据生产速度和消费速度来均衡一下多少个生产者可以为多少个消费者提供足够的服务,就可以开多进程等等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。
六、joinablequeue队列应用
import time
from multiprocessing import Process,JoinableQueue def producer(q):
for i in range(10):
time.sleep(0.5)
q.put('包子%s号'%i)
print('包子%s号生产完毕'%i)
print('aaaaaaaaaaaaa')
q.join() #
print('包子卖完了') def consumer(q):
while 1:
baozi = q.get()
time.sleep(0.8)
print('%s被吃掉了'%baozi)
q.task_done() #给队列发送一个任务处理完了的信号 if __name__ == '__main__': q = JoinableQueue()
p1 = Process(target=producer,args=(q,))
p2 = Process(target=consumer,args=(q,))
p2.daemon = True
p1.start()
p2.start()
p1.join() #主进程等着生产者进程的结束才结束 ,生产者结束意味着q获得了10个task_done的信号,
python 守护进程、同步锁、信号量、事件、进程通信Queue的更多相关文章
-
进程同步控制(锁,信号量,事件), 进程通讯(队列和管道,生产者消费者模型) 数据共享(进程池和mutiprocess.Pool模块)
参考博客 https://www.cnblogs.com/xiao987334176/p/9025072.html#autoid-1-1-0 进程同步(multiprocess.Lock.Semaph ...
-
《python》join、守护进程、锁/信号量/事件、进程队列
一.multiprocess.process模块 1.join方法 阻塞主进程,等待子进程执行完毕再放开阻塞 import time import random from multiprocessin ...
-
python 全栈开发,Day39(进程同步控制(锁,信号量,事件),进程间通信(队列,生产者消费者模型))
昨日内容回顾 python中启动子进程并发编程并发 :多段程序看起来是同时运行的ftp 网盘不支持并发socketserver 多进程 并发异步 两个进程 分别做不同的事情 创建新进程join :阻塞 ...
-
041.Python守护进程,锁信号量和事件
一 守护进程 1.1 基本概念 守护进程 正常情况下,主进程默认等待子进程调用结束之后结束 守护进程在主进程执行代码结束后,自动终止 守护进程语法: 进程对象.daemon = True ,设置该进程 ...
-
Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures
参考博客: https://www.cnblogs.com/xiao987334176/p/9046028.html 线程简述 什么是线程?线程是cpu调度的最小单位进程是资源分配的最小单位 进程和线 ...
-
python 全栈开发,Day42(Thread类的其他方法,同步锁,死锁与递归锁,信号量,事件,条件,定时器,队列,Python标准模块--concurrent.futures)
昨日内容回顾 线程什么是线程?线程是cpu调度的最小单位进程是资源分配的最小单位 进程和线程是什么关系? 线程是在进程中的 一个执行单位 多进程 本质上开启的这个进程里就有一个线程 多线程 单纯的在当 ...
-
网络编程基础----并发编程 ---守护进程----同步锁 lock-----IPC机制----生产者消费者模型
1 守护进程: 主进程 创建 守护进程 辅助主进程的运行 设置进程的 daemon属性 p1.daemon=True 1 守护进程会在主进程代码执行结束后就终止: 2 守护进程内无法再开启子进程 ...
-
Python守护进程、进程互斥锁、进程间通信ICP(Queue队列)、生产者消费者模型
知识点一:守护进程 守护进程:p1.daemon=True 守护进程其实就是一个“子进程“,守护=>伴随 守护进程会伴随主进程的代码运行完毕后而死掉 进程:当父进程需要将一个任务并发出去执行,需 ...
-
Python 线程同步锁, 信号量
同步锁 import time, threading def addNum(): global num num -= 1 num = 100 thread_list = [] for i in ran ...
-
Python:Day28 同步锁
同步锁: Python不是有一把锁了吗?为什么还要加锁? Python解释器的GIL的作用是同一时刻只有一个线程被CPU执行,而同步锁的作用同一时刻只有一个线程对锁定代码块操作 如果不加锁,当多个线程 ...
随机推荐
-
使用Apache Bench进行压力测试
Apache Bench是Apache中自带的压力测试工具 在linux中我们安装好apache后可以通过ab指令使用它 格式:ab [参数] [http://]ip地址/path/ 常用参数说明: ...
-
python strip()函数
转发:jihite-博客园-python strip()函数 函数原型 声明:s为字符串,rm为要删除的字符序列 s.strip(rm) 删除s字符串中开头.结尾处,位于 rm删除序列的 ...
-
只是一个用EF写的一个简单的分页方法而已
只是一个用EF写的一个简单的分页方法而已 慢慢的写吧.比如,第一步,先把所有数据查询出来吧. //第一步. public IQueryable<UserInfo> LoadPagesFor ...
-
Curator Recipes(Cache&;Counter)
Cache 路径缓存(Path Cache) 监视一个ZNode,当子节点增加.更新.删除改变状态时,路径缓存会在本地保存当前子节点及其数据和状态. public PathChildrenCache( ...
-
Mahout canopy聚类
Canopy 聚类 一.Canopy算法流程 Canopy 算法,流程简单,容易实现,一下是算法 (1)设样本集合为S,确定两个阈值t1和t2,且t1>t2. (2)任取一个样本点p,作为一个C ...
-
Oracle ORA-08104报错处理方法及注意事项
[环境介绍] 系统环境:IBM P740 8205-E6C (AIX) + 11.2.0.3.0 Oracle RAC [背景介绍] 故障描述:数据库表空间超过90%,无法进行扩容表空间,需要业务侧清 ...
-
table-一列细分为多列(合并单元格)
这个是一个很常见的一个表格展示需求,其中最要的就两个属性 rowspan 和 colspan .他们分别就是合并行 与 合并列. 要做的效果是如图下面这个,联系电话就是合并了单元格.这个说法类似于ex ...
-
转://Oracle A用户给B用户授权查询指定表或视图权限方案
用DNINMSV31账户登录数据库进行如下操作: CREATE USER NORTHBOUND IDENTIFIED BY NORTHBOUND DEFAULT TABLESPACE "TB ...
-
Nginx stream ssl
L 115 端口监听 netstat -anp | (端口名)
-
在Windows命令行中编译运行C/C++程序
此处运行环境是在Windos下,运行cmd命令进入DOS界面 现在有一段简单C++代码(文件名为 demo.cpp),用于计算a*b的值 #include<iostream> using ...