1 concurrent.futures 模块:
# from abc import abstractmethod,ABCMeta # # class A(metaclass=ABCMeta): # def mai(self): # pass # @classmethod # class B(A): # def mai(self): # pass # 抽象类----定义子类的一些接口标准 @abstractmethod =================== 进程池 与 线程池 =================== 引入池 的 概念是为了 控制个数 concurrent.futures =======>>> 异步调用 # 1 ====== shutdown(wait=True)====>> close()+join() shutdown(wait=False)=======>> close() # 2 ====== submit ===>> apply_async() # 3 ====== map===== # 4 ======add_done_callback(fn) from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time,random,os ===============================计算多的 进程池================================================== # def work(n): # print('%s is running'%os.getpid()) # time.sleep(random.randint(1,3)) # return n*n # # 1 # if __name__ == '__main__': # executor_p=ProcessPoolExecutor(4) # futures=[] # for i in range(10): # future=executor_p.submit(work,i) # 得到异步提交的future对象结果 # futures.append(future) # executor_p.shutdown(wait=True) # executor_p ==== shutdown # print('main') # for obj in futures: # print(obj.result()) # 结果 # # # 2 ===== with as # if __name__ == '__main__': # with ProcessPoolExecutor(4) as e: # futures=[] # for i in range(6): # future=e.submit(work,i) # futures.append(future ) # print('Main') # for obj in futures: # print(obj.result()) # ================================ 简写 =========================================== # from concurrent.futures import ThreadPoolExecutor # from threading import current_thread,enumerate,active_count # # def work(n): # print('%s is running'%current_thread()) # return n**2 # # if __name__ == '__main__': # with ThreadPoolExecutor() as executor: # futures=[executor.submit(work,i) for i in range(40)] # # for i in futures: # print(i.result()) ======================================= IO多的 线程池 =========================== # # def work(n): # print('%s is running'%os.getpid()) # time.sleep(random.randint(1,3)) # return n*n # # 1 # if __name__ == '__main__': # executor_p=ThreadPoolExecutor(30) # futures=[] # for i in range(10): # future=executor_p.submit(work,i) # 得到异步提交的future对象结果 # futures.append(future) # executor_p.shutdown(wait=True) # executor_p ==== shutdown # print('main') # for obj in futures: # print(obj.result()) # 结果 # # # 2 ===== with as # if __name__ == '__main__': # with ThreadPoolExecutor(40) as e: # futures=[] # for i in range(6): # future=e.submit(work,i) # futures.append(future ) # print('Main') # for obj in futures: # print(obj.result()) =========================== map ========循环提交 多次结果====================================
# from concurrent.futures import ThreadPoolExecutor # from threading import current_thread,enumerate,active_count # # def work(n): # print('%s is running'%current_thread()) # return n**2 # # if __name__ == '__main__': # with ThreadPoolExecutor() as executor: # futures=[executor.map(work,range(40))] # [1,2,3,4,5] # executor.map(work,range(6)) 把后边的可迭代对象当做 参数传给work # ===============================future.add_done_callback(fn)========回调函数============================================= # # from concurrent.futures import ProcessPoolExecutor # # def work(n): # return n**n # # def work2(m): # m=m.result() # print( m/2) # # if __name__ == '__main__': # with ProcessPoolExecutor() as e: # for i in range(6): # e.submit(work,i).add_done_callback(work2) #======================future.exeption(4)========等待时间--等待超时异常=============================================
2.事件Event:
两个进程之间的协同工作 from threading import Event,current_thread,Thread import time
e=Event() def check(): print('%s 正在检测'%current_thread().getName()) time.sleep(3) e.set() # 确认设置 def conn(): count=1 while not e.is_set(): if count >3: raise TimeoutError('连接超时') print('%s 正在等待连接'%current_thread().getName()) e.wait(timeout=0.1) # 超时时间限制 尝试次数 count+=1 print('%s 正在连接'%current_thread().getName()) if __name__ == '__main__': t1=Thread(target=check) t2=Thread(target=conn) t3=Thread(target=conn) t4=Thread(target=conn) t1.start() t2.start() t3.start() t4.start()
3.信号量Semaphore:
from threading import Semaphore 信号量 Semaphore ---本质是一把锁======控制同时运行的进程数量(后台可以开启更多的进程) 进程池 ----同时运行的进程数量()
4.-定时器Timer:
# from threading import Timer # # def hello(): # print('hello') # # t=Timer(4,hello) # Timer--是Thread的子类 4秒后启动 # t.start()
5.死锁现象 递归锁:
============================== 死锁现象 ================================= 递归锁 RLock---可以aquire多次 每aquire一次 计数 加一,只要计数不为一 就不能被其他线程抢到 互斥锁 ----只能aquire一次 from threading import Thread,Lock,current_thread import time mutexA=Lock() mutexB=Lock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): with mutexA: print('%s 抢到了A'%current_thread().getName()) with mutexB: print('抢到了B'%current_thread().getName()) def f2(self): with mutexB: print('抢到了B'%current_thread().getName()) time.sleep(0.1) with mutexA: print('抢到了A'%current_thread().getName()) if __name__ == '__main__': for i in range(6): t = Mythread() t.start()
6.线程队列queue:
import queue q=queue.Queue(4) q.put(2) q.put(2) q.put(2) q.put(2) print(q.get()) print(q.get()) print(q.get()) print(q.get()) # 1 优先级队列 q=queue.PriorityQueue(3) q.put((10,'tsd')) q.put((4,'ard')) # 相同的优先级 比较 后面的数据 q.put((10,'asd')) # 数字越小 优先级越高 print(q.get()) print(q.get()) print(q.get()) # 2 先进后出 ---堆栈 队列 q=queue.LifoQueue(3)