进程是资源的一个集合,
1、一个应用程序,可以有多进程和多线程
2、默认一个程序是单进程单线程
IO操作使用多线程提高并发
计算操作使用多进程提高并发
进程与线程区别
1、线程共享内存空间,进程的内存是独立的
2、线程共享进程内存的数据,进程之间的数据是无法进行访问的
3、在同一个进程内,线程之间可以直接进行数据的访问,两个进程如果通信,必须通过一个中间代理进行通信
4、创建线程非常容易,创建新进程需要对其父进程进行一次克隆
5、一个线程可以控制和操作同一进程里的其它线程,但是进程只能操作子进程
什么时候适合多线程:
IO密集型,socket,爬虫,web
什么时候适合多进程:
CPU运算密集型,金融分析
一个简单的创建多线程例子:
import threading
import time def run(n):
print('task', n)
time.sleep(2) # 多线程
t1 = threading.Thread(target=run, args=("t1",)) # 要创建一个线程,并让线程执行run方法
t2 = threading.Thread(target=run, args=("t2",))
t1.start() # 并不代表当前线程会被立即执行,需要等待cpu进行调度
t2.start() # 启动另一个线程 print(t1.getName()) # 获取线程名
print(t2.getName()) # 非多线程
# run('t1')
# run('t2')
利用for循环创建多线程:
import threading
import time def run(n):
print('task', n)
time.sleep(2) for i in range(50):
t = threading.Thread(target=run, args=("t-%s" % i,))
t.start()
那主线程是否等子线程呢?举个例子
通过这个例子可以看出,当主线程执行完毕后,等待子线程
import time
import threading def run(num):
time.sleep(5)
print(num) t1 = threading.Thread(target=run,args=(1,))
t1.start()
print('end')
setDaemon方法:主线程执行完毕后,不等子线程,所以在这里你永远看不见子线程有结果
import time
import threading def run(num):
time.sleep(5)
print(num) for i in range(10):
t1 = threading.Thread(target=run, args=(i,))
t1.setDaemon(True) # 设置成守护线程,true ,表示主线程不等子线程
t1.start()
print('end')
join方法:等待子线程执行完毕后,再继续
import time
import threading def run(num):
time.sleep(2)
print(num) for i in range(5):
t1 = threading.Thread(target=run, args=(i,))
t1.start()
t1.join() # 主线程执行到这里就等待,直到子线程执行完毕后,再继续
print('end')
上边的例子貌似解决了,等待子线程都执行完毕后,主线程继续执行,但是变为串行了,怎么解决呢?
看下面的例子
import time
import threading def run(num):
time.sleep(2)
print(num) t_list = []
for i in range(5):
t1 = threading.Thread(target=run, args=(i,))
t1.start()
t_list.append(t1) for t in t_list:
t.join() print('main thread')
join()+参数:表示主线程在此最多等N秒
import time
import threading def run(num):
time.sleep(2)
print(num) for i in range(5):
t1 = threading.Thread(target=run, args=(i,))
# t1.setDaemon(True) # true ,表示主线程不等子线程
t1.start()
t1.join(2) # 主线程执行到这里就等待,直到子线程执行完毕后,再继续
print('end')
import threading class MyThread(threading.Thread):
def __init__(self, func, args):
self.func = func
self.args = args
super(MyThread, self).__init__() def run(self):
self.func(self.args) def f2(arg):
print(arg) obj = MyThread(f2, 1234)
obj.start()
import threading class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
super(MyThread, self).__init__() def run(self): # 必须是run名字,定义每个线程要运行的函数
print("running task", self.n) t1 = MyThread('t1')
t2 = MyThread('t2') t1.start()
t2.start()
queue模块
import queue q = queue.Queue(maxsize=2) # 默认先进先出,可以添加参数maxsize=2
q1 = queue.LifoQueue() # last in fisrt out 先进后出
q2 = queue.PriorityQueue # 优先级 # put 放数据
# get 取数据
# 队列最大长度 q.put(1)
q.put(2)
q.put(3, block=False) # 当队列默认最大2条消息时候,再放就等待,可以在put里面增加block=False,timeout参数,就不等待了
task_done()和join方法
import queue q = queue.Queue(5) q.put(1)
q.put(2) q.get()
q.task_done() # 告诉队列这个数据我取出来
q.get()
q.task_done()
q.join() # 如果队列里面还有数据,我就等待,否则就终止,这个参数需要task_done配合一起是用
优先级队列
q = queue.PriorityQueue() q.put([3, 'abc'])
q.put([0, 'ccc'])
print(q.get())
多线程锁机制
防止多个线程同时修改同一个共享数据
例子1:模拟多个线程修改同一个共享数据
import time
import threading NUM = 10 def func():
global NUM
NUM -= 1
time.sleep(2)
print(NUM) t_list = []
for i in range(10):
t = threading.Thread(target=func)
t.start()
t_list.append(t) for t in t_list:
t.join() print('end')
通过锁机制,同一时间只允许一个线程进行值的修改
import time
import threading NUM = 10 def func(l):
global NUM
# 加锁
l.acquire()
NUM -= 1
time.sleep(2)
print(NUM)
# 释放锁
l.release() lock = threading.Lock() for i in range(10):
t = threading.Thread(target=func, args=(lock,))
t.start()
Semaphore(信号量)
锁允许一个线程在同一时间更改数据,而Semaphore是同时允许一定数量的线程更改数据。
import time
import threading NUM = 10 def func(s):
global NUM
# 加锁
s.acquire()
NUM -= 1
time.sleep(2)
print(NUM)
# 释放锁
s.release() lock = threading.Lock()
semaphore = threading.BoundedSemaphore(3) for i in range(10):
t = threading.Thread(target=func, args=(semaphore,))
t.start()
事件(event)
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
event = threading.Event()
event.wait()
event.set()
event.clear()
import threading
import time event = threading.Event() def ligher():
count = 0
while True:
if count < 30:
if not event.is_set():
event.set()
print('green')
elif count < 35:
print('yellow')
elif count < 60:
if event.is_set():
event.clear()
print('red')
else:
count = 0
count += 1
time.sleep(0.3) def car(n):
count = 0
while True:
event.wait()
print("car [%s] is running.." % n)
count += 1
time.sleep(1) red_light = threading.Thread(target=ligher)
red_light.start() c1 = threading.Thread(target=car, args=(1,))
c1.start()
条件(Condition)
Timer
from threading import Timer def hello():
print("hello, world") t = Timer(1, hello)
t.start() # after 1 seconds, "hello, world" will be printed
生产者消费者模型
例1:
import queue
import time
import threading def consumer(name):
while True:
print("%s-->取到骨头[%s]" % (name, q.get()))
time.sleep(0.5) def producer(name):
count = 0
while q.qsize() < 5:
print("%s 生产了骨头" % name, count)
q.put(count)
count += 1
time.sleep(3) q = queue.Queue(4) p1 = threading.Thread(target=producer, args=('生产者1',))
p2 = threading.Thread(target=producer, args=('生产者2',))
c1 = threading.Thread(target=consumer, args=('消费者1',))
p1.start()
p2.start()
c1.start()
例2:
import queue
import time
import threading def consumer(name):
while True:
print("%s-->取到骨头[%s]" % (name, q.get()))
time.sleep(0.5)
q.task_done() # 给生产者发一个回执,这个参数跟q.join联合是用 def producer(name):
count = 0
for i in range(10):
print("%s 生产了骨头" % name, count)
q.put(count)
count += 1
time.sleep(0.3)
q.join()
print('-----------所有骨头都吃完了-----') q = queue.Queue(4) p1 = threading.Thread(target=producer, args=('生产者1',))
# p2 = threading.Thread(target=producer, args=('生产者2',))
c1 = threading.Thread(target=consumer, args=('消费者1',))
p1.start()
# p2.start()
c1.start()
多进程multiprocessing
1、开销大
2、可以利用cpu的多核特性
:
进程间的通信
不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:
只是实现了数据的传递
Queue:
from multiprocessing import Process
from multiprocessing import Queue def f(q):
q.put([42, None, 'hello']) if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
Pipes
Managers
2个进程修改同一个数据
from multiprocessing import Process, Manager def f(d, l,n):
d[n] = n
l.append(n)
print(l) if __name__ == '__main__':
with Manager() as manager:
d = manager.dict() l = manager.list(range(5))
p_list = []
for i in range(10):
p = Process(target=f, args=(d, l, i))
p.start()
p_list.append(p)
for res in p_list:
res.join() print(d)
print(l)
进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:
- apply
- apply_async
apply:串行基本没用callback方法
from multiprocessing import Pool
import time def f(i):
print('hello world', i)
time.sleep(1) def callback(data):
print("exec done--->", data) if __name__ == '__main__': pool = Pool(5)
for num in range(10):
pool.apply(func=f, args=(num,))
apply_async:
from multiprocessing import Pool
import time
import os def f(i):
print('hello world', i)
time.sleep(1)
print('-->PID', i, os.getpid())
return i def callback(data): # 接收f()函数的返回值
print('-->callback > pid', data, os.getpid())
# print("exec done--->", data) if __name__ == '__main__': pool = Pool(5)
for num in range(100):
pool.apply_async(func=f, args=(num,), callback=callback) # 必须有下边的 pool.close()和 pool.join(),别问为啥
print('end')
pool.close()
pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。