Day10-线程进程

时间:2023-03-10 04:11:56
Day10-线程进程

什么是进程?

程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。

在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。

有了进程为什么还要线程?

进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上:

  • 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。

  • 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。

例如,我们在使用qq聊天, qq做为一个独立进程如果同一时间只能干一件事,那他如何实现在同一时刻 即能监听键盘输入、又能监听其它人给你发的消息、同时还能把别人发的消息显示在屏幕上呢?你会说,操作系统不是有分时么?但我的亲,分时是指在不同进程间的分时呀, 即操作系统处理一会你的qq任务,又切换到word文档任务上了,每个cpu时间片分给你的qq程序时,你的qq还是只能同时干一件事呀。

再直白一点, 一个操作系统就像是一个工厂,工厂里面有很多个生产车间,不同的车间生产不同的产品,每个车间就相当于一个进程,且你的工厂又穷,供电不足,同一时间只能给一个车间供电,为了能让所有车间都能同时生产,你的工厂的电工只能给不同的车间分时供电,但是轮到你的qq车间时,发现只有一个干活的工人,结果生产效率极低,为了解决这个问题,应该怎么办呢?。。。。没错,你肯定想到了,就是多加几个工人,让几个人工人并行工作,这每个工人,就是线程!

什么是线程?

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

进程线程总结:

进程:
  • 一个程序要运行时所需的所有资源的集合
  • 进程是资源的集合,相当于一个车间
  • 一个进程至少需要一个线程,这个线程称为主线程
  • 一个进程里可以有多个线程
  • cpu的核数越多,代表着可以真正并发的线程越多
  • 2个进程之间的数据是完全独立的,互相不能访问
线程:
  • 一道单一的指令的控制流,寄生在进程中
  • 单一进程里的多个线程是共享数据的
  • 多个线程在涉及修改同一个数据时一定要加锁

GIL

全局解释器锁,防止多个线程修改同一份数据

无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行

线程threading模块

启线程两种方式

直接调用

1、导入模块

2、定义函数

3、实际化threading.Thread

target跟函数名,args跟函数的参数,元祖,加逗号

import threading

def run(n):
print('thread',n) t = threading.Thread(target=run,args=(1,))
t.start()

启10个线程

import threading
import time def run(n):
time.sleep(1)
print('thread',n) for i in range(10):
t = threading.Thread(target=run,args=(i,))
t.start()
print(t.getName())#线程名字
print(threading.current_thread())#当前正在运行的线程的实例
print(threading.active_count())#统计线程数

继承式调用

import threading
import time class MyThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
#super(MyThread, self).__init__()
self.num = num def run(self): # 定义每个线程要运行的函数
print("running on number:%s" % self.num)
time.sleep(3) if __name__ == '__main__':
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()

join&&Daemon

主线程等子线程全部走完再走

import threading
import time def run(n):
time.sleep(1)
print('thread',n)
t_list = []
for i in range(10):
t = threading.Thread(target=run,args=(i,))
t.start()
t_list.append(t)
for t in t_list:
t.join()
print('main...')

#输出

thread 0
thread 1
thread 2
thread 3
thread 7
thread 8
thread 5
thread 4
thread 6
thread 9
main...

这里就是等10个子线程全部生成结束后执行主线程

Daemon

主线程挂了,子线程也一起挂啦

import  threading
import time def run(n):
time.sleep(1)
print("thread",n) for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.setDaemon(True)#把子线程设置为主线程的守护线程
t.start() print("--main thread---")

#输出

--main thread---

线程锁(互斥锁Mutex)

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?可能数据会乱,python3中没问题,python2中有问题。

import threading
import time def run(n):
global num
l.acquire()#获取锁
num += 1
l.release()#创建锁
print(num) num = 0
t_list = []
l = threading.Lock()#申明实例
for i in range(100):
t = threading.Thread(target=run,args=(i,))
t.start()
t_list.append(t)
for i in t_list:
i.join()
print('main')
print(num)

如果在num+=1后面加上time.sleep(1),就变成了串行,不要占着茅坑不拉屎,修改完数据后要立刻释放。对别的没有用到此修改数据的线程是没有关系的。

GIL VS Lock

那你又问了, 既然用户程序已经自己有锁了,那为什么C python还需要GIL呢?加入GIL主要的原因是为了降低程序的开发的复杂度,比如现在的你写python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,  这可以说是Python早期版本的遗留问题。

RLock

import threading,time

def run1():
print("grab the first part data")
lock.acquire()
global num
num +=1
lock.release()
return num
def run2():
print("grab the second part data")
lock.acquire()
global num2
num2+=1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res,res2) if __name__ == '__main__': num,num2 = 0,0
lock = threading.RLock()
for i in range(10):
t = threading.Thread(target=run3)
t.start() while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num,num2)

Semaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

import threading, time

def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %s\n" % n)
semaphore.release() if __name__ == '__main__':
num = 0
semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行
for i in range(20):
t = threading.Thread(target=run, args=(i,))
t.start() while threading.active_count() != 1:
pass # print threading.active_count()
else:
print('----all threads done---')
print(num)

Timer

import threading
def hello():
print("hello, world") t = threading.Timer(3.0, hello)
t.start()

events实现线程间通信

通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

使用

event = threading.Event()

event.set()

event.clear()

event.wait()

如果event被clear,wait会阻塞,一直等到event被set

import threading
import time def light():
count = 0 while True:
if count < 30:
if not event.is_set():
event.set()
print("\033[32;1mgreen light-----\033[0m")
elif count < 34:
print("\033[33;1myellow light-----\033[0m")
elif count < 60:
if event.is_set():
event.clear()
print("\033[31;1mred light-----\033[0m")
else:
count = 0
count+=1
time.sleep(0.2)
def car(n):
count = 0
while True:
event.wait()
print("car [%s] is running...." % n )
count +=1
time.sleep(1)
event = threading.Event()

red_light = threading.Thread(target=light)
red_light.start() c1 = threading.Thread(target=car,args=(1,))
c1.start()

队列

python2 Queue

python3 queue

import queue
#q = queue.Queue(maxsize=3)#先进先出,最大3
#q = queue.LifoQueue()#先进后出
q = queue.PriorityQueue()#优先级
q.put([1,1])
q.put([4,2])
q.put([3,3])#前面是优先级
# print(q.empty())#判断是否空
# print(q.full())#是否满
q.qsize()队列大小
print(q.get())
print(q.get())
print(q.get())

三种应用场景:

FIFO 排队

LIFO 卖水果

优先级queue vip

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

import queue
import threading
import time def consumer(name):
while True:
print('%s 取得骨头[%s]并吃了它'%(name,q.get())) def producer(name):
count = 0
while q.qsize()<5:
print('%s生成了骨头'%name,count)
q.put(count)
count += 1
time.sleep(3) q = queue.Queue(maxsize=4) p = threading.Thread(target=producer,args=('alex',))
p2 = threading.Thread(target=producer,args=('alex1',))
c=threading.Thread(target=consumer,args=('ds',))
p.start()
p2.start()
c.start()

回执

import queue
import threading
import time def consumer(name):
while True:
print('%s 取得骨头[%s]并吃了它'%(name,q.get()))
time.sleep(0.5)
q.task_done() def producer(name):
count = 0
for i in range(10):
print('%s生成了骨头'%name,count)
q.put(count)
count += 1
time.sleep(1)
q.join()
print('-----all eat------') q = queue.Queue(maxsize=4) p = threading.Thread(target=producer,args=('alex',))
p2 = threading.Thread(target=producer,args=('alex1',))
c=threading.Thread(target=consumer,args=('ds',))
p.start()
p2.start()
c.start()

多进程

multiprocessing

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

subprocess支持使用多核

from multiprocessing import Process
import time def f(name):
time.sleep(1)
print('hello',name) if __name__ == '__main__':
for i in range(5):
p = Process(target=f,args=('alex',))
p.start()

使用多进程开销远比多线程多

进程间通信

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

Queue

from multiprocessing import Process, Queue

def f(q):
q.put([42, None, 'hello']) if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))#这里得把q传给f,进程复制一份数据
p.start()
print(q.get()) # prints "[42, None, 'hello']"
#p.join()

Pipe

from multiprocessing import Process, Pipe

def f(conn):
conn.send([42, None, 'hello'])
conn.close() if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()

Manager实现进程间数据共享

两个进程修改一份数据

from multiprocessing import Process, Manager

def f(d, l):
d[1] = ''
d[''] = 2
d[0.25] = None
l.append(1)
print(l) if __name__ == '__main__':
with Manager() as manager:
d = manager.dict() l = manager.list(range(5))
p_list = []
for i in range(10):
p = Process(target=f, args=(d, l))
p.start()
p_list.append(p)
for res in p_list:
res.join() print(d)
print(l)

一般进程之间不进行数据的共享,开销大

进程同步

python2不加锁共享屏幕串行

python3

from multiprocessing import Process, Lock

def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release() if __name__ == '__main__':
lock = Lock() for num in range(10):
Process(target=f, args=(lock, num)).start()

进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

  • apply_async
from  multiprocessing import Process,Pool
import time def Foo(i):
time.sleep(2)
return i+100 def Bar(arg):
print('-->exec done:',arg) pool = Pool(5) for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar)
#pool.apply(func=Foo, args=(i,)) print('end')
pool.close()
pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

线程、进程的应用场景

线程:IO密集型 socket 爬虫 web
进程:cpu运算密集型 金融分析