一.线程:
创建线程有两种方式(本质是一样的,创建好线程之后,cpu调度创建好的线程时执行的其实是Thread的run()方法):
import threading def f1(args):
print(args)
#方式一
t=threading.Thread(target=f1,args=(123,))
t.start() #方式二
class MyThread(threading.Thread):
def __init__(self,func,args):
self.func=func
self.args=args
super(MyThread,self).__init__()
def run(self):
self.func(self.args)
t1=MyThread(f1,123)
t1.start()
123
123
队列:
import queue q=queue.Queue(2)
q.put(1)
q.put(2)
q.put(3,block=False) #设置不阻塞(block为False),插不进去直接报错
Traceback (most recent call last):
File "/Users/admin/Desktop/zy/test.py", line 10, in <module>
q.put(3,block=False)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/queue.py", line 130, in put
raise Full
queue.Full
import queue,threading,time def f1(q):
q.put(123,timeout=3) #超时5s没有插进队列就会抛异常
q=queue.Queue(2)
q.put(1)
q.put(2)
t=threading.Thread(target=f1,args=(q,)) #子线程插入第三条数据
t.start() #创建好子线程啦,等待cpu调度
#情况一:
time.sleep(1) #主线程等待1s,子线程put超时3秒,先主线程get,后子线程put,put成功
#情况二:
time.sleep(5) #主线程等待5s,子线程put超时3秒,主线程没来得及get,子线程就put,会报错 q.get()
import queue q=queue.Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
print(q.get(block=False)) #设置不阻塞(block为False),队列里没有数据会报错
1
Traceback (most recent call last):
2
File "/Users/admin/Desktop/zy/test.py", line 12, in <module>
print(q.get(block=False))
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/queue.py", line 161, in get
raise Empty
queue.Empty
import queue,threading,time def f1(q):
print(q.get(timeout=3))
q=queue.Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
t=threading.Thread(target=f1,args=(q,))
t.start()
#情况一:
time.sleep(1) #主线程等待1s,子线程超时3s,主线程先put,子线程后get,不会报错
#情况二:
time.sleep(5) #主线程等待5s,子线程超时3s,子线程先get,主线程再put,子线程get不到数据,直接报错 q.put(3)
#!/usr/bin/env python
# -*- coding:utf-8 -*- import queue
#队列:先进先出
q=queue.Queue(10) #队列最大长度,默认是0 无限大
q.put(11,timeout=5) #放数据,默认阻塞True 阻塞时可以设置超时时间
q.put(22,block=False) #非阻塞,不等待
size=q.qsize() #队列长度
data=q.get(timeout=5) #取数据 默认阻塞True 阻塞时可以设置超时时间
data=q.get(block=False) #非阻塞,不等待
bool_num=q.empty() #队列为空返回True
bool_num=q.full() #队列满了返回True
q.join() #阻塞进程,当队列中任务执行完毕之后(q.task_done),不再阻塞
q.task_done()
import queue
#队列:先进先出
q=queue.Queue(2) #队列最大长度,默认是0 无限大 q.put(11)
q.task_done()
q.put(22) q.join()
阻塞进程
import queue
#队列:先进先出
q=queue.Queue(2) #队列最大长度,默认是0 无限大 q.put(11)
q.task_done()
q.put(22) #每执行完一步要执行下q.taskdone()
q.task_done()
q.join()
不阻塞
import queue
#队列:后进先出
q=queue.LifoQueue(5)
q.put(123) #优先级相同,先进先出
q.put(345)
q.put(456) print(q.get())
print(q.get())
print(q.get())
456
345
123
import queue
#队列:优先级
q=queue.PriorityQueue(5) q.put((0,123)) #优先级相同,先进先出
q.put((1,345))
q.put((0,456)) print(q.get())
print(q.get())
print(q.get())
(0, 123)
(0, 456)
(1, 345)
import queue
#队列:双向队列 q=queue.deque()
q.append(123)
q.append(456)
q.appendleft(789)
q.appendleft(890)
# 980 789 123 456
print(q.pop())
print(q.popleft())
456
890
生产者消费者模型:功能:1.客户端不阻塞,不持续占用连接,解决瞬时高并发 2.程序解耦合(生产者改变不影响消费者那端)
#!/usr/bin/env python
# -*- coding:utf-8 -*- import threading,queue,time
q=queue.Queue(50)
def producter(user):
print("%s来买票啦" % user)
q.put("%s来买票啦" % user)
def consumer():
who=q.get()
time.sleep(5)
print(who + "--出票") for i in range():
user="user" + str(i)
t=threading.Thread(target=producter,args=(user,))
t.start()
while True:
t1=threading.Thread(target=consumer)
t1.start()
user0来买票啦
user1来买票啦
user2来买票啦
user3来买票啦
user1来买票啦--出票
user0来买票啦--出票
user2来买票啦--出票
user3来买票啦--出票
线程里锁的机制:
应用程序的最小单位是线程,他们共用这个应用程序里的资源,如果同时对某个数据进行修改的话,就会产生脏数据
解决:锁
#!/usr/bin/env python
# -*- coding:utf-8 -*- import threading,queue,time
q=queue.Queue(5)
def producter(user):
print("%s来买票啦" % user)
q.put("%s来买票啦" % user)
num=12
def consumer(lock):
global num
lock.acquire() #上锁
who=q.get()
num-=1
time.sleep(2)
print(who + "--出票","剩余票数:%d" % num)
lock.release() #开锁
for i in range(4):
user="user" + str(i)
t=threading.Thread(target=producter,args=(user,))
t.start()
#lock=threading.Lock()
lock=threading.RLock() #构建锁对象 Lock只能加一重锁 RLock可以加多重锁
while True:
t1=threading.Thread(target=consumer,args=(lock,))
t1.start()
user0来买票啦
user1来买票啦
user2来买票啦
user3来买票啦
user0来买票啦--出票 剩余票数:11
user1来买票啦--出票 剩余票数:10
user2来买票啦--出票 剩余票数:9
user3来买票啦--出票 剩余票数:8
#lock=threading.Lock()
#lock=threading.RLock() #构建锁对象 Lock只能加一重锁 RLock可以加多重锁
lock=threading.BoundedSemaphore(2) #上面两种锁都是每次放行1个线程,这个可放行多个
一次放行2个线程
#!/usr/bin/env python
# -*- coding:utf-8 -*- import threading def func(i,event):
print(i)
event.wait() #监测是什么灯,红灯停绿灯行
print(i+100) event=threading.Event() #信号 收到信号就放行所有线程 for i in range(4):
t=threading.Thread(target=func,args=(i,event,))
t.start() event.clear() #设置成红灯
inp=input(">>>:")
if inp == "1":
event.set() #设置成绿灯
1
2
3
>>>:1
100
101
102
103
线程池:
#!/usr/bin/env python
# -*- coding:utf-8 -*- import queue,threading,time
class ThreadPool:
def __init__(self,maxsize=5):
self.maxsize=maxsize
self._q=queue.Queue(maxsize)
for i in range(maxsize):
self._q.put(threading.Thread)
def get_thread(self):
return self._q.get()
def add_thread(self):
self._q.put(threading.Thread) pool=ThreadPool(5)
def task(args,p):
print(args)
time.sleep(2)
p.add_thread()
for i in range(10):
t=pool.get_thread()
obj = t(target=task,args=(i,pool,))
obj.start()
3个缺点:
1.
进程:
共享空间:
#!/usr/bin/env python
# -*- coding:utf-8 -*- from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
from multiprocessing import Manager def foo(i,args):
args[i]=i+100
print(args.values())
if __name__ == "__main__":
obj=Manager()
li=obj.dict()
for i in range(5):
p = Process(target=foo,args=(i,li,))
p.start()
import time
time.sleep(2)
[101]
[101, 102]
[101, 102, 103]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]