网络编程基础--多线程---concurrent.futures 模块---事件Event---信号量Semaphore---定时器Timer---死锁现象 递归锁----线程队列queue

时间:2021-07-28 00:23:31

1 concurrent.futures 模块:

# from abc import abstractmethod,ABCMeta
#
# class A(metaclass=ABCMeta):
#     def mai(self):
#         pass
# @classmethod
# class B(A):
#     def mai(self):
#         pass

# 抽象类----定义子类的一些接口标准 @abstractmethod




=================== 进程池 与 线程池 =================== 引入池 的 概念是为了 控制个数 concurrent.futures =======>>> 异步调用 # 1 ====== shutdown(wait=True)====>> close()+join() shutdown(wait=False)=======>> close() # 2 ====== submit ===>> apply_async() # 3 ====== map===== # 4 ======add_done_callback(fn) 

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,random,os


===============================计算多的 进程池==================================================

# def work(n):
#     print('%s is running'%os.getpid())
#     time.sleep(random.randint(1,3))
#     return n*n
# #               1
# if __name__ == '__main__':
#     executor_p=ProcessPoolExecutor(4)
#     futures=[]
#     for i in range(10):
#         future=executor_p.submit(work,i)    # 得到异步提交的future对象结果
#         futures.append(future)
#     executor_p.shutdown(wait=True)           #  executor_p ==== shutdown
#     print('main')
#     for obj in futures:
#         print(obj.result())   #  结果
#
# #             2  =====  with as
# if __name__ == '__main__':
#     with ProcessPoolExecutor(4) as e:
#         futures=[]
#         for i in range(6):
#             future=e.submit(work,i)
#             futures.append(future )
#     print('Main')
#     for obj in futures:
#         print(obj.result())

#     ================================  简写  ===========================================

# from concurrent.futures import ThreadPoolExecutor
# from threading import current_thread,enumerate,active_count
#
# def work(n):
#     print('%s is running'%current_thread())
#     return n**2
#
# if __name__ == '__main__':
#     with ThreadPoolExecutor() as executor:
#         futures=[executor.submit(work,i) for i in range(40)]
#
#     for i in futures:
#         print(i.result())

=======================================  IO多的 线程池 ===========================
#
# def work(n):
#     print('%s is running'%os.getpid())
#     time.sleep(random.randint(1,3))
#     return n*n
# #               1
# if __name__ == '__main__':
#     executor_p=ThreadPoolExecutor(30)
#     futures=[]
#     for i in range(10):
#         future=executor_p.submit(work,i)    # 得到异步提交的future对象结果
#         futures.append(future)
#     executor_p.shutdown(wait=True)           #  executor_p ==== shutdown
#     print('main')
#     for obj in futures:
#         print(obj.result())   #  结果
#
# #             2  =====  with as
# if __name__ == '__main__':
#     with ThreadPoolExecutor(40) as e:
#         futures=[]
#         for i in range(6):
#             future=e.submit(work,i)
#             futures.append(future )
#     print('Main')
#     for obj in futures:
#         print(obj.result())

=========================== map ========循环提交 多次结果====================================

#
from concurrent.futures import ThreadPoolExecutor # from threading import current_thread,enumerate,active_count # # def work(n): # print('%s is running'%current_thread()) # return n**2 # # if __name__ == '__main__': # with ThreadPoolExecutor() as executor: # futures=[executor.map(work,range(40))] # [1,2,3,4,5] # executor.map(work,range(6)) 把后边的可迭代对象当做 参数传给work # ===============================future.add_done_callback(fn)========回调函数============================================= # # from concurrent.futures import ProcessPoolExecutor # # def work(n): # return n**n # # def work2(m): # m=m.result() # print( m/2) # # if __name__ == '__main__': # with ProcessPoolExecutor() as e: # for i in range(6): # e.submit(work,i).add_done_callback(work2) #======================future.exeption(4)========等待时间--等待超时异常=============================================

2.事件Event:

 两个进程之间的协同工作 

from threading import Event,current_thread,Thread
import time
e
=Event() def check(): print('%s 正在检测'%current_thread().getName()) time.sleep(3) e.set() # 确认设置 def conn(): count=1 while not e.is_set(): if count >3: raise TimeoutError('连接超时') print('%s 正在等待连接'%current_thread().getName()) e.wait(timeout=0.1) # 超时时间限制 尝试次数 count+=1 print('%s 正在连接'%current_thread().getName()) if __name__ == '__main__': t1=Thread(target=check) t2=Thread(target=conn) t3=Thread(target=conn) t4=Thread(target=conn) t1.start() t2.start() t3.start() t4.start()

3.信号量Semaphore:

 

from threading import Semaphore

信号量 Semaphore ---本质是一把锁======控制同时运行的进程数量(后台可以开启更多的进程)


进程池 ----同时运行的进程数量()

 

4.-定时器Timer:

# from threading import Timer
#
# def hello():
#     print('hello')
#
# t=Timer(4,hello) # Timer--是Thread的子类 4秒后启动
# t.start()

5.死锁现象 递归锁:

============================== 死锁现象 =================================

 递归锁 RLock---可以aquire多次        每aquire一次 计数 加一,只要计数不为一 就不能被其他线程抢到

 互斥锁 ----只能aquire一次

from threading import Thread,Lock,current_thread
import time
mutexA=Lock()
mutexB=Lock()

class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        with mutexA:
            print('%s 抢到了A'%current_thread().getName())
        with mutexB:
            print('抢到了B'%current_thread().getName())
    def f2(self):
        with mutexB:
            print('抢到了B'%current_thread().getName())
        time.sleep(0.1)
        with mutexA:
            print('抢到了A'%current_thread().getName())

if __name__ == '__main__':
    for i in range(6):
        t = Mythread()
        t.start()

6.线程队列queue:

 

import queue

q=queue.Queue(4)

q.put(2)
q.put(2)
q.put(2)
q.put(2)

print(q.get())
print(q.get())
print(q.get())
print(q.get())

# 1 优先级队列 q=queue.PriorityQueue(3)
q.put((10,'tsd'))
q.put((4,'ard'))    #    相同的优先级 比较 后面的数据
q.put((10,'asd'))   #     数字越小 优先级越高

print(q.get())
print(q.get())
print(q.get())

# 2 先进后出 ---堆栈 队列 
q=queue.LifoQueue(3)