Python并发编程-线程同步(线程安全)

时间:2021-08-25 00:28:25

      Python并发编程-线程同步(线程安全)

                       作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

  线程同步,线程间协调,通过某种技术,让一个线程访问某些数据时,其它线程不能访问这些数据,直到该线程完成对数据的操作。

 

一.Event

1>.Event的常用方法

Event事件,是线程通信机制中最简单的实现,使用一个内部的标记flag,通过flage的True或False的变化来进行操作。

常用方法如下:
set():
    标记为True。 clear():
    标记设置为Flase。
is_set():
    查询标记是否为True。 wait(timeout=None):
    设置等待标记为True的时长,None为无线等待。等到返回True,未等到超时了返回False。

2>.Event使用案例

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie from threading import Event,Thread
import logging FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(format=FORMAT,level=logging.INFO) def boss(event:Event):
logging.info("I'm boss,waiting for U")
event.wait() #阻塞等待,直到event被标记为Ture
logging.info("Good Job.") def worker(event:Event,count=):
logging.info("I'm working for U.")
cups = []
while not event.wait(0.5): #使用wait等待0.5秒(相当于"time.sleep(0.5)"),若规定事件内event依旧标记依旧没有设置为True,则返回False
logging.info("make 1 cup")
cups.append()
if len(cups) >= count:
event.set() #将标记设置为True
break
logging.info("I finished my job.cups={}".format(cups)) event = Event()
print(event.is_set()) #event实例的标记默认为False b = Thread(target=boss,name="boss",args=(event,))
w = Thread(target=worker,name="worker",args=(event,))
b.start()
w.start()
-- ::, boss  I'm boss,waiting for U
-- ::, worker I'm working for U.
False
-- ::, worker make cup
-- ::, worker make cup
-- ::, worker make cup
-- ::, worker make cup
-- ::, worker make cup
-- ::, worker make cup
-- ::, worker make cup
-- ::, worker make cup
-- ::, worker make cup
-- ::, worker make cup
-- ::, worker I finished my job.cups=[, , , , , , , , , ]
-- ::, boss Good Job.

以上代码执行结果戳这里

3>.定时器Timer(延迟执行)

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie import threading
import logging
import time FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT) def worker():
logging.info("working")
time.sleep() """
Timer是线程Thread的子类,Timer实例内部提供了一个finished属性,该属性是Event对象。
Timer是线程Thread的子类,就是线程类,具有线程的能力和特征。
它的实例时能够延时执行目标函数的线程,在真正执行目标函数之前,都可以cancel它。
cacel方法本质使用Event类实现。这并不是说,线程提供了取消的方法。
"""
t = threading.Timer(,worker) #当t对象调用start方法后,等待4秒后执行worker函数
t.setName("timer") t.start()
# t.cancel() #本质上是在worker函数执行前对finished属性set方法操作,从而跳过了worker函数执行,达到了取消的效果。 for _ in range():
print(threading.enumerate())
time.sleep()
[<_MainThread(MainThread, started )>, <Timer(timer, started )>]
[<_MainThread(MainThread, started )>, <Timer(timer, started )>]
[<_MainThread(MainThread, started )>, <Timer(timer, started )>]
[<_MainThread(MainThread, started )>, <Timer(timer, started )>]
[<_MainThread(MainThread, started )>, <Timer(timer, started )>]
-- ::, timer working
[<_MainThread(MainThread, started )>, <Timer(timer, started )>]
[<_MainThread(MainThread, started )>]
[<_MainThread(MainThread, started )>]
[<_MainThread(MainThread, started )>]
[<_MainThread(MainThread, started )>]

以上代码执行结果戳这里

二.Lock

1>.Lock的常用方法

锁,一旦线程获得锁,其它试图获取锁的线程将被阻塞。

锁:凡是存在资源共享争抢的地方都可以使用锁,从而只有一个使用者可以完全使用这个资源。

锁常用的方法如下:
acquire(blocking=True,timeout=-):
    默认阻塞,阻塞可以设置超时事件。非阻塞时,timeout禁止设置。
    成功获取锁,返回True,否则返回False release():
    释放锁。可以从任何线程调用释放。
    已上锁的锁,会被重置为unlocked
    若在未上锁的锁上调用,则会抛出RuntimeError异常。

2>.Lock锁使用案例

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie from threading import Thread,Lock
import logging
import time FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT) cpus = []
lock = Lock() def worker(count=10):
logging.info("I'm working for U.")
flag = False
while True:
lock.acquire() #获取锁
if len(cpus) >= count:
flag = True
time.sleep(0.0001) #为了看出线程切换效果
if not flag:
cpus.append(1)
lock.release()
if flag:
break
logging.info("I finished . cups = {}".format(len(cpus))) for _ in range(10):
Thread(target=worker,args=(1000,)).start()
2019-11-23 16:03:35,225 Thread-1 16204 I'm working for U.
2019-11-23 16:03:35,226 Thread-2 14436 I'm working for U.
2019-11-23 16:03:35,226 Thread-3 1164 I'm working for U.
2019-11-23 16:03:35,226 Thread-4 10460 I'm working for U.
2019-11-23 16:03:35,227 Thread-5 5072 I'm working for U.
2019-11-23 16:03:35,227 Thread-6 12016 I'm working for U.
2019-11-23 16:03:35,227 Thread-7 9732 I'm working for U.
2019-11-23 16:03:35,228 Thread-8 15644 I'm working for U.
2019-11-23 16:03:35,228 Thread-9 104 I'm working for U.
2019-11-23 16:03:35,228 Thread-10 16508 I'm working for U.
2019-11-23 16:03:37,130 Thread-1 16204 I finished . cups = 1000
2019-11-23 16:03:37,132 Thread-3 1164 I finished . cups = 1000
2019-11-23 16:03:37,134 Thread-4 10460 I finished . cups = 1000
2019-11-23 16:03:37,136 Thread-2 14436 I finished . cups = 1000
2019-11-23 16:03:37,138 Thread-5 5072 I finished . cups = 1000
2019-11-23 16:03:37,140 Thread-6 12016 I finished . cups = 1000
2019-11-23 16:03:37,142 Thread-7 9732 I finished . cups = 1000
2019-11-23 16:03:37,144 Thread-8 15644 I finished . cups = 1000
2019-11-23 16:03:37,146 Thread-9 104 I finished . cups = 1000
2019-11-23 16:03:37,148 Thread-10 16508 I finished . cups = 1000

以上代码执行结果戳这里

3>.加锁和解锁(计数器类案例)

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie import threading
from threading import Thread,Lock
import logging
import time FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT) class Counter:
def __init__(self):
self._val = 0
self.__lock = Lock() @property
def value(self):
with self.__lock:
return self._val def inc(self):
try:
self.__lock.acquire()
self._val += 1
finally:
self.__lock.release() def dec(self):
with self.__lock:
self._val -= 1 def worker(c:Counter,count=100):
for _ in range(count):
for i in range(-50,50):
if i < 0:
c.dec()
else:
c.inc() c = Counter()
c1 = 10
c2 = 10000 for i in range(c1):
Thread(target=worker,args=(c,c2)).start() while True:
time.sleep(1)
print(threading.enumerate())
if threading.active_count() == 1:
print((c.value))
break
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-6, started 8300)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-2, started 10732)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-4, started 2624)>, <Thread(Thread-5, started 1620)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>, <Thread(Thread-1, started 17728)>, <Thread(Thread-3, started 2680)>, <Thread(Thread-7, started 7532)>, <Thread(Thread-8, started 9480)>, <Thread(Thread-9, started 11044)>, <Thread(Thread-10, started 7164)>]
[<_MainThread(MainThread, started 14856)>]
0

以上代码执行结果戳这里

4>.锁的应用场景

锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。

如果全部都是读取同一个共享资源需要锁吗?
  不需要。因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁

使用锁的注意事项:
少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行
举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
加锁时间越短越好,不需要就立即释放锁
一定要避免死锁
不使用锁,有了效率,但是结果是错的。

使用了锁,效率低下,但是结果是对的。

所以,我们是为了效率要错误结果呢?还是为了对的结果,让计算机去计算吧。

5>.非阻塞锁使用

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie import threading
import logging
import time FORMAT = "%(asctime)s %(threadName)s %(thread)-10d %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT) lock = threading.Lock() def worker(l:threading.Lock):
while True:
flag = l.acquire(False)
if flag:
logging.info("do something.") #为了显示效果,没有释放锁
else:
logging.info("try again")
time.sleep(1) for i in range(5):
threading.Thread(target=worker,name="worker={}".format(i),args=(lock,)).start()
2019-11-24 15:58:31,932 worker=0 123145354420224 do something.
2019-11-24 15:58:31,933 worker=0 123145354420224 try again
2019-11-24 15:58:31,933 worker=1 123145359675392 try again
2019-11-24 15:58:31,933 worker=2 123145364930560 try again
2019-11-24 15:58:31,934 worker=3 123145370185728 try again
2019-11-24 15:58:31,934 worker=4 123145375440896 try again
2019-11-24 15:58:32,933 worker=0 123145354420224 try again
2019-11-24 15:58:32,933 worker=1 123145359675392 try again
2019-11-24 15:58:32,934 worker=2 123145364930560 try again
2019-11-24 15:58:32,936 worker=4 123145375440896 try again
2019-11-24 15:58:32,936 worker=3 123145370185728 try again
2019-11-24 15:58:33,935 worker=0 123145354420224 try again
2019-11-24 15:58:33,935 worker=1 123145359675392 try again
2019-11-24 15:58:33,935 worker=2 123145364930560 try again
2019-11-24 15:58:33,940 worker=4 123145375440896 try again
2019-11-24 15:58:33,940 worker=3 123145370185728 try again
2019-11-24 15:58:34,939 worker=0 123145354420224 try again
2019-11-24 15:58:34,940 worker=1 123145359675392 try again
2019-11-24 15:58:34,940 worker=2 123145364930560 try again
2019-11-24 15:58:34,944 worker=4 123145375440896 try again
2019-11-24 15:58:34,945 worker=3 123145370185728 try again
2019-11-24 15:58:35,943 worker=1 123145359675392 try again
2019-11-24 15:58:35,944 worker=0 123145354420224 try again
2019-11-24 15:58:35,944 worker=2 123145364930560 try again
2019-11-24 15:58:35,948 worker=4 123145375440896 try again
2019-11-24 15:58:35,949 worker=3 123145370185728 try again
......

以上代码执行结果戳这里

三.可重入锁RLock 

1>.可重入锁不可跨越线程

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie import threading
import time lock = threading.RLock()
print(lock.acquire()) #仅对当前线程上锁,但是代码并不会阻塞而是可以继续执行
print(lock.acquire(blocking=True))
print(lock.acquire(timeout=3)) #默认"blocking=True",因此可以设置值阻塞的超时时间,但当blocking=False时,timeout无法使用。
print(lock.acquire(blocking=False)) print("main thread {}".format(threading.main_thread().ident))
print("lock in main thread {}".format(lock)) print("{0} 我是分割线 {0}".format("*" * 15)) lock.release()
lock.release()
lock.release()
lock.release()
# lock.release() #由于上面锁定的lock调用了4此锁定,因此解锁也只能是4次,若解锁次数超过上锁次数则抛出"RuntimeError: cannot release un-acquired lock"异常。 print("main thread {}".format(threading.main_thread().ident))
print("lock in main thread {}".format(lock)) print("{0} 我是分割线 {0}".format("*" * 15)) print(lock.acquire(blocking=False))
print(lock.acquire(blocking=False))
print("main thread {}".format(threading.main_thread().ident))
print("lock in main thread {}".format(lock)) # threading.Thread(target=lambda l:l.release(),args=(lock,)).start() #可重入锁不可跨越线程,否则会抛出"RuntimeError: cannot release un-acquired lock"异常。
lock.release() #可重入锁无论是上锁还是解锁要求在同一个线程中。 time.sleep(3)
print("main thread {}".format(threading.main_thread().ident))
print("lock in main thread {}".format(lock))
True
True
True
True
main thread 18096
lock in main thread <locked _thread.RLock object owner=18096 count=4 at 0x00000172263B7C60>
*************** 我是分割线 ***************
main thread 18096
lock in main thread <unlocked _thread.RLock object owner=0 count=0 at 0x00000172263B7C60>
*************** 我是分割线 ***************
True
True
main thread 18096
lock in main thread <locked _thread.RLock object owner=18096 count=2 at 0x00000172263B7C60>
main thread 18096
lock in main thread <locked _thread.RLock object owner=18096 count=1 at 0x00000172263B7C60>

以上代码执行结果戳这里

2>.为另一个线程传入同一个RLock对象

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie import threading
import time lock = threading.RLock() def sub(l:threading.RLock):
print("{}:{}".format(threading.current_thread(),l.acquire())) #阻塞
print("{}:{}".format(threading.current_thread(),l.acquire()))
print("lock in sub thread {}".format(lock))
l.release()
print("release in sub 1")
print("lock in sub thread {}".format(lock))
l.release()
print("release in sub 2")
print("lock in sub thread {}".format(lock)) print(lock.acquire())
print("main thread {}".format(threading.main_thread().ident))
print("lock in main thread {}".format(lock)) print("{0} 我是分割线 {0}".format("*" * 15)) threading.Timer(2,sub,(lock,)).start() #为另一个线程传入同一个lock对象 print("in main thread, {}".format(lock.acquire()))
lock.release()
time.sleep(5)
print("release lock in main thread =======",end="\n\n")
lock.release()
print("lock in main thread {}".format(lock))
True
main thread 2456
lock in main thread <locked _thread.RLock object owner=2456 count=1 at 0x000001BF29077B10>
*************** 我是分割线 ***************
in main thread, True
release lock in main thread ======= lock in main thread <unlocked _thread.RLock object owner=0 count=0 at 0x000001BF29077B10>
<Timer(Thread-1, started 9072)>:True
<Timer(Thread-1, started 9072)>:True
lock in sub thread <locked _thread.RLock object owner=9072 count=2 at 0x000001BF29077B10>
release in sub 1
lock in sub thread <locked _thread.RLock object owner=9072 count=1 at 0x000001BF29077B10>
release in sub 2
lock in sub thread <unlocked _thread.RLock object owner=0 count=0 at 0x000001BF29077B10>

以上代码执行结果戳这里

3>.可重入锁相关总结

  可重入锁,是线程相关的锁。可在同一个线程中获取锁,并可以继续在同一线程不阻塞多次获取锁。

  当锁未释放完,其它线程获取锁就会阻塞,直到当前持有锁的线程释放完锁。

  锁都应该使用完后释放。可重入锁也是锁,应该acquire多少次,就release多少次。
  

四.Condition

1>.Condition常用方法

Condition(lock=None):
  可传入一个Lock或者RLock对象,默认是RLock。 acquire(*args):
  获取锁。 wait(self,timeout=None):
  等待或超时。 notify(n=1):
  唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作。 notify_all():
  唤醒所有等待的线程。

2>.生产者消费者模型

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie import logging
from threading import Event,Thread,Condition
import time
import random FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(format=FORMAT,level=logging.INFO) class Dispachter:
def __init__(self):
self.data = None
self.event = Event() #event只是为了使用方便,与逻辑无关
self.cond =Condition() def produce(self,total):
for _ in range(total):
data = random.randint(1,100)
with self.cond:
logging.info(data)
self.data = data
# self.cond.notify_all()
self.cond.notify(2)
self.event.wait(1) def consume(self):
while not self.event.is_set():
with self.cond:
self.cond.wait()
data = self.data
logging.info("recieved {}".format(data))
self.data = None
self.event.wait(0.5) d = Dispachter()
p = Thread(target=d.produce,name="producer",args=(10,)) #增加消费者
for i in range(5):
c = Thread(target=d.consume,name="consumer")
c.start() p.start()
2019-11-25 22:24:45,076 producer 12228 64
2019-11-25 22:24:45,076 consumer 7612 recieved 64
2019-11-25 22:24:45,076 consumer 18400 recieved None
2019-11-25 22:24:46,077 producer 12228 41
2019-11-25 22:24:46,077 consumer 15008 recieved 41
2019-11-25 22:24:46,078 consumer 16440 recieved None
2019-11-25 22:24:47,077 producer 12228 98
2019-11-25 22:24:47,077 consumer 14832 recieved 98
2019-11-25 22:24:47,077 consumer 7612 recieved None
2019-11-25 22:24:48,077 producer 12228 39
2019-11-25 22:24:48,077 consumer 18400 recieved 39
2019-11-25 22:24:48,078 consumer 15008 recieved None
2019-11-25 22:24:49,078 producer 12228 79
2019-11-25 22:24:49,078 consumer 16440 recieved 79
2019-11-25 22:24:49,078 consumer 14832 recieved None
2019-11-25 22:24:50,079 producer 12228 39
2019-11-25 22:24:50,079 consumer 7612 recieved 39
2019-11-25 22:24:50,080 consumer 15008 recieved None
......

以上代码执行结果戳这里

3>.Condition总结

  Condition用于生产者消费者模型中,解决生产者消费者速度匹配的问题。

  采用了通知机制,非常有效率。

  使用方式:
    使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock,最好的方式是使用with上下文。
    消费者这wait,等待通知。
    生产者生产好消息,对消费者发通知,可以使用notify或者notify_all方法。

五.semaphore

1>.semaphore常用方法

  和Lock很像,信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现计数为0就阻塞请求的线程,直到其它线程对信号量release后,计数大于0,恢复阻塞的线程。换句话说,计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞。

  semaphore常用方法如下:
    Semaphore(value=1):
      构造方法。value小于0,抛ValueError异常
    acquire(blocking=True, timeout=None):
      获取信号量,计数器减1,获取成功返回True
    release():
      释放信号量,计数器加1

2>.基本使用案例(存在release方法超界限的问题)

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie from threading import Thread, Semaphore
import logging
import time FORMAT = '%(asctime)s %(threadName)-12s %(thread)-8s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO) def worker(s:Semaphore):
logging.info("in worker thread")
logging.info(s.acquire())
logging.info('worker thread over') # 定义信号量的个数为3
s = Semaphore(3)
print(s.__dict__) logging.info(s.acquire()) #获取一把锁之后,"_value"计数器就会减1。 print(s.acquire(),s._value) print(s.acquire(),s._value) Thread(target=worker, name="worker",args=(s,)).start()
time.sleep(2)
logging.info(s.acquire(False))
logging.info(s.acquire(timeout=3)) # 释放一个
logging.info('release one')
s.release()
print(s.__dict__) #释放锁后可以被"worker"线程获取 s.release()
s.release()
s.release()
s.release()
s.release() #此处我们可以故意多释放几次锁 print(s.__dict__) #竟然内置计数器"_value"达到了6(也有可能是5,因为worker线程中需要获取一把锁),这样实际上超出我们的最大值,需要解决这个问题。
2019-11-26 09:54:22,345 MainThread   140735817298880 True
2019-11-26 09:54:22,345 worker 123145401769984 in worker thread
{'_cond': <Condition(<unlocked _thread.lock object at 0x102929170>, 0)>, '_value': 3}
True 1
True 0
2019-11-26 09:54:24,346 MainThread 140735817298880 False
2019-11-26 09:54:27,349 MainThread 140735817298880 False
2019-11-26 09:54:27,349 MainThread 140735817298880 release one
{'_cond': <Condition(<unlocked _thread.lock object at 0x102929170>, 0)>, '_value': 1}
{'_cond': <Condition(<unlocked _thread.lock object at 0x102929170>, 0)>, '_value': 6}
2019-11-26 09:54:27,350 worker 123145401769984 True
2019-11-26 09:54:27,350 worker 123145401769984 worker thread over

以上代码执行结果戳这里

3>.BoundedSemaphore类(有边界的信号量,不允许使用release超出初始化范围,否则,抛出“ValueError: Semaphore released too many times”异常)

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie from threading import Thread, BoundedSemaphore
import logging
import time FORMAT = '%(asctime)s %(threadName)-12s %(thread)-8s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO) def worker(s:BoundedSemaphore):
logging.info("in worker thread")
logging.info(s.acquire())
logging.info('worker thread over') # 定义有边界信号量的个数为3
s = BoundedSemaphore(3)
print(s.__dict__) logging.info(s.acquire()) #获取一把锁之后,"_value"计数器就会减1。 print(s.acquire(),s._value) print(s.acquire(),s._value) Thread(target=worker, name="worker",args=(s,)).start()
time.sleep(2)
logging.info(s.acquire(False))
logging.info(s.acquire(timeout=3)) # 释放一个
logging.info('release one')
s.release()
print(s.__dict__) #释放锁后可以被"worker"线程获取 s.release()
s.release()
s.release()
s.release()
s.release() #此处我们可以故意多释放几次锁,一旦release超出初始值的范围就抛出异常! print(s.__dict__)
{'_cond': <Condition(<unlocked _thread.lock object at 0x102029170>, 0)>, '_value': 3, '_initial_value': 3}
True 1
True 0
2019-11-26 10:09:17,632 MainThread 140735817298880 True
2019-11-26 10:09:17,635 worker 123145507561472 in worker thread
2019-11-26 10:09:19,638 MainThread 140735817298880 False
{'_cond': <Condition(<unlocked _thread.lock object at 0x102029170>, 0)>, '_value': 1, '_initial_value': 3}
2019-11-26 10:09:22,643 MainThread 140735817298880 False
2019-11-26 10:09:22,644 MainThread 140735817298880 release one
Traceback (most recent call last):
File "/yinzhengjie/python/devops/python基础/09.线程/04.信号量.py", line 43, in <module>
2019-11-26 10:09:22,644 worker 123145507561472 True
2019-11-26 10:09:22,644 worker 123145507561472 worker thread over
s.release()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 483, in release
raise ValueError("Semaphore released too many times")
ValueError: Semaphore released too many times

以上代码执行结果戳这里

4>.应用举例(一个简单的连接池)

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie import random
import threading
import logging
import time FORMAT = '%(asctime)s %(threadName)s %(thread)-8d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT) class Conn:
def __init__(self, name):
self.name = name """
连接池
因为资源有限,且开启一个连接成本高,所以,使用连接池。 一个简单的连接池
连接池应该有容量(总数),有一个工厂方法可以获取连接,能够把不用的连接返回,供其他调用者使用。 使用信号量解决资源有限的问题。
如果池中有资源,请求者获取资源时信号量减1,拿走资源。当请求超过资源数,请求者只能等待。当使用者用完归还资源后信号量加1,等待线程就可以被唤醒拿走资源。
注意:这个连接池的例子不能用到生成环境,只是为了说明信号量使用的例子,连接池还有很多未完成功能。
"""
class Pool:
def __init__(self, count:int):
self.count = count
#池中提前放着连接备用
self.pool = [self._connect('conn-{}'.format(i)) for i in range(self.count)]
self.semaphore = threading.Semaphore(self.count) def _connect(self, conn_name):
# 创建连接的方法,返回一个连接对象
return Conn(conn_name) def get_conn(self):
#从池中拿走一个连接
logging.info('get~~~~~~~~~~~~~')
self.semaphore.acquire()
logging.info('-------------------------')
return self.pool.pop() def return_conn(self, conn: Conn):
# 向池中返回一个连接对象
logging.info('return~~~~~~~~~~~~~')
self.pool.append(conn)
self.semaphore.release() def worker(pool:Pool):
conn = pool.get_conn()
logging.info(conn)
# 模拟使用了一段时间
time.sleep(random.randint(1, 5))
pool.return_conn(conn) # 初始化连接池
pool = Pool(3) for i in range(6):
threading.Thread(target=worker, name='worker-{}'.format(i), args=(pool,)).start()
2019-11-26 11:27:58,148 worker-0 123145324670976 get~~~~~~~~~~~~~
2019-11-26 11:27:58,148 worker-0 123145324670976 -------------------------
2019-11-26 11:27:58,148 worker-0 123145324670976 <__main__.Conn object at 0x102db0438>
2019-11-26 11:27:58,149 worker-1 123145329926144 get~~~~~~~~~~~~~
2019-11-26 11:27:58,149 worker-1 123145329926144 -------------------------
2019-11-26 11:27:58,149 worker-1 123145329926144 <__main__.Conn object at 0x102db03c8>
2019-11-26 11:27:58,149 worker-2 123145335181312 get~~~~~~~~~~~~~
2019-11-26 11:27:58,149 worker-2 123145335181312 -------------------------
2019-11-26 11:27:58,150 worker-2 123145335181312 <__main__.Conn object at 0x102db0240>
2019-11-26 11:27:58,150 worker-3 123145340436480 get~~~~~~~~~~~~~
2019-11-26 11:27:58,150 worker-4 123145345691648 get~~~~~~~~~~~~~
2019-11-26 11:27:58,151 worker-5 123145350946816 get~~~~~~~~~~~~~
2019-11-26 11:28:02,153 worker-0 123145324670976 return~~~~~~~~~~~~~
2019-11-26 11:28:02,153 worker-3 123145340436480 -------------------------
2019-11-26 11:28:02,154 worker-3 123145340436480 <__main__.Conn object at 0x102db0438>
2019-11-26 11:28:02,154 worker-1 123145329926144 return~~~~~~~~~~~~~
2019-11-26 11:28:02,154 worker-4 123145345691648 -------------------------
2019-11-26 11:28:02,154 worker-4 123145345691648 <__main__.Conn object at 0x102db03c8>
2019-11-26 11:28:03,154 worker-2 123145335181312 return~~~~~~~~~~~~~
2019-11-26 11:28:03,155 worker-5 123145350946816 -------------------------
2019-11-26 11:28:03,155 worker-5 123145350946816 <__main__.Conn object at 0x102db0240>
2019-11-26 11:28:05,155 worker-4 123145345691648 return~~~~~~~~~~~~~
2019-11-26 11:28:07,154 worker-3 123145340436480 return~~~~~~~~~~~~~
2019-11-26 11:28:08,159 worker-5 123145350946816 return~~~~~~~~~~~~~

以上代码执行结果戳这里

5>.信号量和锁

信号量:
  可以多个线程访问共享资源,但这个共享资源数量有限。 锁:
  可以看做特殊的信号量,即信号量计数器初值为1。只允许同一个时间一个线程独占资源。

六.Queue的线程安全

 #!/usr/bin/env python
#_*_conding:utf-8_*_
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie import queue """
标准库queue模块,提供FIFO的Queue、LIFO的队列、优先队列。 Queue类是线程安全的,适用于多线程间安全的交换数据。内部使用了Lock和Condition。 为什么讲魔术方法时,说实现容器的大小,不准确?
1>.如果不加锁,是不可能获得准确的大小的,因为你刚读取到了一个大小,还没有取走数据,就有可能被其他线程改了。
2>.Queue类的size虽然加了锁,但是,依然不能保证立即get、put就能成功,因为读取大小和get、put方法是分开的。
""" q = queue.Queue(8) if q.qsize() == 7:
q.put("abc") #上下两句可能被打断 if q.qsize() == 1:
q.get() #未必会成功