Day11 线程、进程、协程

时间:2023-01-11 17:56:38

创建线程
第一种:
import threading


def f1(arg):
    print(arg)


# Build a worker thread around f1 with one positional argument.
thread = threading.Thread(target=f1, args=(123,))
# start(): the thread becomes ready and waits to be scheduled.
thread.start()
# run(): called directly, it simply executes the target in the calling thread.
thread.run()
第二种:
import threading


class MyThread(threading.Thread):
    """Thread subclass whose run() calls a stored function with one argument."""

    def __init__(self, func, args):
        # Initialise the Thread machinery first, then remember the task.
        super().__init__()
        self.func = func
        self.args = args

    def run(self):
        # Invoked by the threading machinery after start().
        self.func(self.args)


def f2(arg):
    print(arg)


obj = MyThread(f2, 123)
obj.start()

队列:在内存中创建

import queue

# FIFO queue capped at 10 items.
q = queue.Queue(10)
# empty(): True while the queue holds nothing.
print(q.empty())
q.put(11)
q.put(22)
print(q.empty())
# q.put(33)
# q.put(44)
# qsize(): number of items currently stored.
# maxsize: the capacity the queue was created with.
print(q.qsize())
# put() accepts a timeout in seconds.
# q.put(55, timeout = 2)
# put() blocks by default; block=False raises queue.Full at once when full.
# q.put(66, block=False, timeout = 2)
print(q.get())
q.task_done()
# get() blocks by default; block=False raises queue.Empty when nothing is ready.
print(q.get(block=False))
q.task_done()
# join(): blocks until every put item has been marked finished via task_done().
q.join()

import queue
# deque lives in collections; queue.deque is only an undocumented re-export.
from collections import deque

# LIFO queue: last in, first out.
q = queue.LifoQueue()
q.put(123)
q.put(456)
print(q.get())

# Priority queue: items are (priority, data); the lowest priority comes out first.
q = queue.PriorityQueue()
q.put((1, 'alex1'))
q.put((2, 'alex2'))
q.put((1, 'alex3'))
q.put((4, 'alex4'))
print(q.get())

# Double-ended queue: O(1) append/pop at either end.
# Fixed: use collections.deque — queue.deque is an implementation detail of the
# queue module, not part of its public API.
q = deque()
q.append(123)
q.append(333)
q.appendleft(456)
print(q.pop())
print(q.popleft())

#生产者、消费者模型
import queue
import threading
import time

# Producer/consumer model: a bounded queue decouples the two sides.
q = queue.Queue(20)


def productor(arg):
    # Produce forever; put() blocks whenever the queue is full.
    while True:
        q.put(str(arg) + '- 包子')


def consumer(arg):
    # Consume forever, one item every 4 seconds.
    while True:
        print(arg, q.get())
        time.sleep(4)


for producer_id in range(3):
    threading.Thread(target=productor, args=(producer_id,)).start()

for consumer_id in range(20):
    threading.Thread(target=consumer, args=(consumer_id,)).start()

模拟火车站售票
#生产者、消费者模型:支持瞬时并发数超大的情况
import queue
import threading
import time

# Producer/consumer model: absorbs very large bursts of concurrent requests.
q = queue.Queue(20)


def productor(arg):
    """
    Buy one ticket (a single queued request).
    :param arg: request identifier
    :return:
    """
    q.put(str(arg) + '- 买票')


for request_id in range(300):
    threading.Thread(target=productor, args=(request_id,)).start()


def consumer(arg):
    """
    Ticket window: serve queued requests forever.
    :param arg: window identifier
    :return:
    """
    while True:
        print(arg, q.get())
        time.sleep(2)


for window_id in range(3):
    threading.Thread(target=consumer, args=(window_id,)).start()

加锁、解锁
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

import threading
import time

NUM = 10


def func(lk):
    global NUM
    # First acquire.
    lk.acquire()
    NUM -= 1
    # Second acquire on the same lock: only an RLock allows this re-entry.
    lk.acquire()
    time.sleep(1)
    lk.release()
    print(NUM)
    # Matching release for the first acquire.
    lk.release()


# A plain Lock can be taken only once (the nested acquire would deadlock):
# lock = threading.Lock()
# Re-entrant lock, the common choice:
lock = threading.RLock()

for i in range(10):
    t = threading.Thread(target=func, args=(lock,))
    t.start()

Event事件
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

import threading


def func(i, e):
    print(i)
    # wait(): block while the light is "red"; pass through once it is "green".
    e.wait()
    print(i + 100)


# Event: release a whole batch of waiting threads at once.
event = threading.Event()
for i in range(10):
    t = threading.Thread(target=func, args=(i, event,))
    t.start()

# clear(): switch to red (threads calling wait() will block).
event.clear()
inp = input('>>>>>')
if inp == "1":
    # set(): switch to green (all waiters are released).
    event.set()

Condition等待:
第一种:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

import threading


def func(i, conn):
    print(i)
    conn.acquire()
    # Block until notify() wakes this thread up.
    conn.wait()
    print(i + 100)
    conn.release()


# Condition: hold threads until some condition releases n of them.
c = threading.Condition()

for i in range(10):
    t = threading.Thread(target=func, args=(i, c,))
    t.start()

while True:
    inp = input('>>>')
    # 'q' quits the control loop; a number releases that many threads.
    if inp == 'q':
        break
    c.acquire()
    c.notify(int(inp))
    c.release()

第二种:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

import threading


def condition():
    # Ask the operator; only the literal string 'true' releases a waiter.
    return input('>>>') == 'true'


def func(i, conn):
    print(i)
    conn.acquire()
    # Hold this thread until condition() returns a truthy value.
    conn.wait_for(condition)
    print(i + 100)
    conn.release()


c = threading.Condition()

for i in range(10):
    t = threading.Thread(target=func, args=(i, c,))
    t.start()

Timer定时器:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

#Timer:定时器
from threading import Timer


def hello():
    print('hello Timer')


# Fire hello() once, 2 seconds from now, on a background thread.
t = Timer(2, hello)
t.start()

线程池:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

#最简单的线程池
import queue
import threading
import time

class ThreadPool:
    """Minimal thread 'pool': a queue of Thread classes used as capacity tokens."""

    def __init__(self, maxsize=5):
        self.maxsize = maxsize
        self._q = queue.Queue(maxsize)
        # Pre-load one threading.Thread class object per slot, i.e.
        # [threading.Thread] * maxsize.
        for _ in range(maxsize):
            self._q.put(threading.Thread)

    def get_thread(self):
        # Blocks when the pool is exhausted, throttling thread creation.
        return self._q.get()

    def add_thread(self):
        # Return a slot to the pool once a task finishes.
        self._q.put(threading.Thread)

pool = ThreadPool(5)


def task(arg, p):
    print(arg)
    time.sleep(2)
    # Give the slot back so another task may start.
    p.add_thread()


for i in range(100):
    # get_thread() hands out the threading.Thread class (blocks when empty).
    thread_cls = pool.get_thread()
    # Instantiate the class and launch the worker.
    obj = thread_cls(target=task, args=(i, pool, ))
    obj.start()

复杂线程池:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

# Sentinel pushed onto the task queue to tell a worker thread to exit.
StopEvent = object()

class ThreadPool(object):
    """Thread pool that grows lazily up to max_num worker threads."""

    def __init__(self, max_num, max_task_num = None):
        # Task queue; bounded only when max_task_num is given.
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num      # upper bound on worker threads
        self.cancel = False         # True once close() was called: reject new tasks
        self.terminal = False       # True once terminate() was called: exit ASAP
        # Threads created so far.
        self.generate_list = []
        # Threads currently idle (blocked waiting on the queue).
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        Submit one task to the pool.
        :param func: task function
        :param args: tuple of positional arguments for func
        :param callback: invoked after the task as callback(success, result);
                         None (the default) means no callback is run.
        :return: None; submissions are silently ignored after close().
        """
        if self.cancel:
            return
        # Spawn a new worker only when none is idle and the cap is not reached.
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        # Pack the task as a (func, args, callback) tuple.
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        Create and start one worker thread.
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        Worker loop: repeatedly pull (func, args, callback) tuples and run them.
        """
        # Fixed: threading.currentThread was deprecated in 3.10 and removed in
        # 3.12; threading.current_thread() is the supported spelling.
        current_thread = threading.current_thread()
        # Register this thread as created.
        self.generate_list.append(current_thread)

        # Pull the first task; each item is (func, args-tuple, callback).
        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception:
                    pass

            # Between tasks the worker counts as idle: it appears in free_list
            # while blocked on the queue.
            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:
            # StopEvent received: deregister this worker and let it die.
            self.generate_list.remove(current_thread)

    def close(self):
        """
        Let already queued tasks finish, then stop every worker.
        """
        self.cancel = True
        # One StopEvent per created worker thread.
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        Stop workers as soon as possible, discarding queued tasks.
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        Keep worker_thread listed in state_list for the duration of the block
        (used to track how many workers are idle).
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

# How to use

pool = ThreadPool(5)


def callback(status, result):
    # status: whether the task function succeeded
    # result: the task function's return value
    pass


def action(i):
    print(i)


# Submit 30 tasks: run(task function, argument tuple, callback).
for i in range(30):
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()

进程数据共享:
1、queues方式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

#内部使用的是os.fork(),windows下不能直接运行
# Uses os.fork() internally on *nix; Windows requires the __main__ guard.
from multiprocessing import Process
# The queues module holds the sharable Queue types used across processes.
from multiprocessing import queues
import multiprocessing
import threading
import time


# By default processes do NOT share data; a multiprocessing queue is one way
# to share values between them.
def foo(i, arg):
    arg.put(i)
    print('say hi', i, arg.qsize())


if __name__ == '__main__':
    # Fixed: multiprocessing.Queue(20) is the public constructor.  Calling
    # queues.Queue directly expects ctx=multiprocessing.get_context(); passing
    # the bare module only works by accident of duck typing.
    li = multiprocessing.Queue(20)
    for i in range(10):
        p = Process(target=foo, args=(i, li, ))
        # p.daemon = True
        p.start()
        # p.join()

2、Array方式

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

#内部使用的是os.fork(),windows下不能直接运行
# Uses os.fork() internally on *nix; Windows requires the __main__ guard.
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing

import threading
import time

from multiprocessing import Array


# By default processes do not share data; Array is shared, typed memory.
def foo(i, arg):
    arg[i] = i + 100
    for item in arg:
        print(item)
    print('==========')


if __name__ == '__main__':
    # 'i' -> ctypes.c_int; ten zero-initialised slots.
    li = Array('i', 10)
    """
    'c': ctypes.c_char, 'u': ctypes.c_wchar,
    'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int, 'I': ctypes.c_uint,
    'l': ctypes.c_long, 'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double

    type-code table
    """
    for i in range(10):
        p = Process(target=foo, args=(i, li, ))
        # p.daemon = True
        p.start()
        # p.join()

3、Manager.dict方式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

"""
进程之间数据共享(掌握3种方式):
1、queues
2、Array
3.Manager.dict
pipe:管道,类似于socket
"""
#内部使用的是os.fork(),windows下不能直接运行
from multiprocessing import Process
from multiprocessing import Manager
import multiprocessing
import threading
import time

from multiprocessing import Array

def foo(i, arg):
arg[i] = i + 100
print(arg.values())

#windows下最多测试,不能直接使用,*nix下去掉if
if __name__ == '__main__':
obj = Manager()
li = obj.dict()
for i in range(10):
p = Process(target=foo, args=(i, li, ))
#p.daemon = True
p.start()
#不加join会导致,主进程执行完成后将主进程和子进程的连接断开,无法实现数据共享
p.join()

进程锁
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

#进程锁
# Process locks: serialise access to shared state across processes.
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
import threading
import time
from multiprocessing import RLock, Lock, Event, Condition, Semaphore

from multiprocessing import Array


def foo(i, lis, lc):
    # Guard the read-modify-write on the shared Array with the process lock.
    lc.acquire()
    lis[0] = lis[0] - 1
    time.sleep(1)
    print('say hi', lis[0])
    lc.release()


if __name__ == '__main__':
    li = Array('i', 1)
    li[0] = 10
    lock = RLock()
    for i in range(10):
        p = Process(target=foo, args=(i, li, lock, ))
        p.start()

进程池:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

"""
进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,
如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:
apply
apply_async
"""
from multiprocessing import Pool
import time

def f1(arg):
time.sleep(1)
print(arg)

if __name__ == '__main__':
pool = Pool(5)

for i in range(30):
#pool.apply(func=f1, args=(i, ))
pool.apply_async(func=f1, args=(i, ))
#close:所有的任务全部执行完毕,才终止
#pool.close()
#terminate:立即终止
time.sleep(2)
pool.terminate()
pool.join()
print('end')

总结:IO密集型用多线程(如:爬虫)、计算密集型用多进程。

协程:

greenlet方式:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

"""
协程
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。
协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
"""
# !/usr/bin/env python
# -*- coding:utf-8 -*-

from greenlet import greenlet


def test1():
    print(12)
    gr2.switch()   # hand control to test2
    print(34)
    gr2.switch()   # resume test2 where it last switched away


def test2():
    print(56)
    gr1.switch()   # hand control back to test1
    print(78)


gr1 = greenlet(test1)
gr2 = greenlet(test2)
# Output order: 12, 56, 34, 78 — the switches interleave the two functions.
gr1.switch()

gevent方式:
import gevent


def foo():
    print('Running in foo')
    # sleep(0) explicitly yields control to the other greenlet.
    gevent.sleep(0)
    print('Explicit context switch to foo again')


def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')


# Spawn both greenlets and wait for them to finish.
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

遇到IO操作自动切换:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

#monkey.patch_all():把原有socket进行修改,增加发送请求并得到请求的结果
# monkey.patch_all(): patch the stdlib (socket and friends) so blocking I/O
# yields to other greenlets instead of blocking the whole thread.
from gevent import monkey; monkey.patch_all()
import gevent
import requests


def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))


# All three downloads run concurrently; each switch happens on network I/O.
gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])

memcached使用:

# First contact with memcached.
import memcache

# debug=True prints error details at runtime; remove it before going live.
mc = memcache.Client(['192.168.113.128:12000'], debug=True)
mc.set('foo','bar')
ret = mc.get('foo')
print(ret)

# Clustering is built in: (host, weight) pairs spread keys across servers.
mc = memcache.Client([('192.168.113.128:12000', 1), ('1.1.1.1:12000', 3), ('1.1.1.2:12000', 2), ('1.1.1.3:12000', 4)], debug=True)
mc.set('k1', 'v1')

# add: insert one record.
mc = memcache.Client(['192.168.113.128:12000'], debug=True)
mc.add('k5', 'v5')
# mc.add('k1', 'v2')  # fails: adding an already existing key is an error!

# replace: overwrite an existing key.
mc = memcache.Client(['192.168.113.128:12000'], debug=True)
# Succeeds when k1 exists in memcache; raises an error otherwise.
mc.replace('k1','999')
val = mc.get('k1')
print(val)

# set: store one key/value pair.
mc = memcache.Client(['192.168.113.128:12000'], debug=True)
mc.set('key7', 'wanghuafeng')
val = mc.get('key7')
print(val)

# set_multi: store several key/value pairs at once.
mc = memcache.Client(['192.168.113.128:12000'], debug=True)
mc.set_multi({'key8': 'val8', 'key9': 'val9'})
item_dict = mc.get_multi(["key8", "key9"])
print(item_dict)

# get: fetch one key.
# get_multi: fetch several keys at once.
mc = memcache.Client(['192.168.113.128:12000'], debug=True)
val = mc.get('key0')
item_dict = mc.get_multi(["key1", "key2", "key3"])

# append: modify a key's value by appending content AFTER it.
# prepend: modify a key's value by inserting content BEFORE it.
mc = memcache.Client(['192.168.113.128:12000'], debug=True)
mc.prepend('k1', 'before')
val = mc.get('k1')
print(val)

# incr: increase a stored value by N (N defaults to 1).
# decr: decrease a stored value by N (N defaults to 1).
mc.set('k1', '777')
mc.incr('k1')
val = mc.get('k1')
print(val)

mc.incr('k1', 10)
val = mc.get('k1')
print(val)

mc.decr('k1')
val = mc.get('k1')
print(val)
mc.decr('k1', 10)
val = mc.get('k1')
print(val)

# delete: remove one key/value pair from memcached.
# delete_multi: remove several key/value pairs from memcached.
mc = memcache.Client(['192.168.113.128:12000'], debug=True)
mc.delete('key0')
mc.delete_multi(['key1', 'key2'])

# gets/cas: every gets() also fetches an auto-incrementing token from
# memcached.  A later cas() sends that token back; the server compares it with
# its current token and only commits when they match.  A mismatch means some
# other client ran gets() (and possibly modified the value) in between, so the
# write is rejected to avoid clobbering data.
mc = memcache.Client(['192.168.113.128:12000'], debug=True, cache_cas=True)
v = mc.gets('product_count')
# If someone modified product_count between our gets and cas, the set below
# fails and raises, preventing inconsistent data.
mc.cas('product_count', "899")