python并发_线程

时间:2021-08-03 16:40:17

关于进程的复习:

# 管道
# 数据的共享 Manager dict list
# 进程池
# cpu个数+1
# ret = map(func,iterable)
# 异步 自带close和join
# 所有结果的[]
# apply
# 同步的:只有当func执行完之后,才会继续向下执行其他代码
# ret = apply(func,args=())
# 返回值就是func的return
# apply_async
# 异步的:当func被注册进入一个进程之后,程序就继续向下执行
# apply_async(func,args=())
# 返回值 : apply_async返回的对象obj
# 为了用户能从中获取func的返回值obj.get()
# get会阻塞直到对应的func执行完毕拿到结果
# 使用apply_async给进程池分配任务,
# 需要先close后join来保持多进程和主进程代码的同步性
# 回调函数是在主进程中执行的
from multiprocessing import Pool
def func1(n):
return n+1 def func2(m):
print(m) if __name__ == '__main__':
p = Pool(5)
for i in range(10,20):
p.apply_async(func1,args=(i,),callback=func2)
p.close()
p.join()
import requests
from urllib.request import urlopen
from multiprocessing import Pool ## 爬取字节数的例子: def get(url):
response = requests.get(url)
if response.status_code == 200:
return url,response.content.decode('utf-8') # def get_urllib(url):
# ret = urlopen(url)
# return ret.read().decode('utf-8') def call_back(args):
url,content = args
print(url,len(content)) if __name__ == '__main__':
url_lst = [
'https://www.cnblogs.com/',
'http://www.baidu.com',
'https://www.sogou.com/',
'http://www.sohu.com/',
]
p = Pool(5)
for url in url_lst:
p.apply_async(get,args=(url,),callback=call_back)
p.close()
p.join()

线程:

import os
import time
from threading import Thread ## 多线程并发
# def func(a, b):
# n = a + b
# print(n, os.getpid()) # 都在一个进程中
#
# print('主线程', os.getpid()) # 都在一个进程中
#
# for i in range(10):
# t = Thread(target=func, args=(i, 6))
# t.start() ## 同一进程中的各个线程,都可以共享该进程所拥有的资源
# def func(a,b):
# global g
# g = 0
# print(g,os.getpid())
#
# g = 100
# t_lst = []
# for i in range(10):
# t = Thread(target=func,args=(i,5))
# t.start()
# t_lst.append(t)
# for t in t_lst : t.join()
# print(g) ## 继承 类 实现
# class MyThread(Thread):
# # 重写初始化方法
# def __init__(self,arg):
# super().__init__()
# self.arg = arg
#
# def run(self):
# time.sleep(1)
# print(':::',self.arg)
#
# t = MyThread(10)
# t.start() # class Sayhi(Thread):
# def __init__(self,name):
# super().__init__()
# self.name=name
#
# def run(self):
# time.sleep(1)
# print('%s say hello' % self.name)
#
# if __name__ == '__main__':
# t = Sayhi('egon')
# t.start()
# print('主线程') # https://www.cnblogs.com/Eva-J/articles/8306047.html
# 进程 是 最小的 内存分配单位
# 线程 是 操作系统调度的最小单位
# 线程直接被CPU执行,进程内至少含有一个线程,也可以开启多个线程
# 开启一个线程所需要的时间要远远小于开启一个进程
# 多个线程内部有自己的数据栈,数据不共享
# 全局变量在多个线程之间是共享的 # GIL锁(即全局解释器锁)
# 在Cpython解释器下的python程序 在同一时刻 多个线程中只能有一个线程被CPU执行
# 高CPU : 计算类 --- 高CPU利用率
# 如果真的需要高并发,可使用多进程,避免多线程GIL锁
# 高IO : 一般程序都不会受GIL影响. 爬取网页 200个网页
# qq聊天 send recv
# 处理日志文件 读文件
# 处理web请求
# 读数据库 写数据库 import time
from threading import Thread
from multiprocessing import Process def func(n):
n + 1 if __name__ == '__main__':
start = time.time()
t_lst = []
for i in range(100):
t = Thread(target=func,args=(i,))
t.start()
t_lst.append(t)
for t in t_lst:t.join()
t1 = time.time() - start ## 证明线程比进程快的例子: start = time.time()
t_lst = []
for i in range(100):
t = Process(target=func, args=(i,))
t.start()
t_lst.append(t)
for t in t_lst: t.join()
t2 = time.time() - start
print(t1,t2)

Threading模块的其它方法:

import time
import threading ## Thread实例对象的方法
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。 def wahaha(n):
time.sleep(0.5)
print(n,threading.current_thread(),threading.get_ident())
# print(threading.current_thread().getName()) for i in range(10):
threading.Thread(target=wahaha,args=(i,)).start() # 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
print(threading.active_count()) #
print(threading.current_thread()) # 返回当前的线程变量。 # 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
print(threading.enumerate()) ## https://www.cnblogs.com/Eva-J/articles/8306047.html

守护线程

import time
from threading import Thread def func1():
while True:
print('*'*10)
time.sleep(1) def func2():
print('in func2')
time.sleep(5) t = Thread(target=func1,)
t.daemon = True
t.start()
t2 = Thread(target=func2,)
t2.start()
t2.join() # 等待t2执行完毕
print('主线程') # 守护进程随着主进程代码的执行结束而结束
# 守护线程会在主线程结束之后等待其他子线程的结束才结束 # 主进程在执行完自己的代码之后不会立即结束 而是等待子进程结束之后 回收子进程的资源 #1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
#2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,
而进程必须保证非守护线程都运行完毕后才能结束。

线程锁:  互斥锁 递归锁

import time
from threading import Lock,Thread '''由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,
所以,出现了线程锁 - 同一时刻允许一个线程执行操作。
''' ### Lock 互斥锁 只有一把钥匙
# def func(lock):
# global n
# lock.acquire() # 加锁
# temp = n
# time.sleep(0.2)
# n = temp - 1
# lock.release()
#
# n = 10
# t_lst = []
# lock = Lock()
# for i in range(10):lkqi
# t = Thread(target=func,args=(lock,)) # 线程锁
# t.start()
# t_lst.append(t)
#
# for t in t_lst: t.join()
# print(n) ### 科学家吃面 问题 造成死锁
# noodle_lock = Lock()
# fork_lock = Lock()
# def eat1(name):
# noodle_lock.acquire()
# print('%s拿到面条啦'%name)
# fork_lock.acquire()
# print('%s拿到叉子了'%name)
# print('%s吃面'%name)
# fork_lock.release()
# noodle_lock.release()
#
# def eat2(name):
# fork_lock.acquire()
# print('%s拿到叉子了'%name)
# time.sleep(1)
# noodle_lock.acquire()
# print('%s拿到面条啦'%name)
# print('%s吃面'%name)
# noodle_lock.release()
# fork_lock.release()
#
# Thread(target=eat1,args=('alex',)).start()
# Thread(target=eat2,args=('Egon',)).start()
# Thread(target=eat1,args=('bossjin',)).start()
# Thread(target=eat2,args=('nezha',)).start()
## 当在同一个进程或线程中用到2把以上的锁时,就容易产生死锁。 from threading import RLock # 递归锁 可以多次使用 解决死锁问题
fork_lock = noodle_lock = RLock() # 类似于一串上的两把钥匙
def eat1(name):
noodle_lock.acquire() # 一把钥匙
print('%s拿到面条啦'%name)
fork_lock.acquire()
print('%s拿到叉子了'%name)
print('%s吃面'%name)
fork_lock.release()
noodle_lock.release() # 必须全部释放之后,其它人才能用。 def eat2(name):
fork_lock.acquire()
print('%s拿到叉子了'%name)
time.sleep(1)
noodle_lock.acquire()
print('%s拿到面条啦'%name)
print('%s吃面'%name)
noodle_lock.release()
fork_lock.release() Thread(target=eat1,args=('alex',)).start()
Thread(target=eat2,args=('Egon',)).start()
Thread(target=eat1,args=('bossjin',)).start()
Thread(target=eat2,args=('nezha',)).start()

线程的信号量:

# import time
# from threading import Semaphore,Thread
# def func(sem,a,b):
# sem.acquire()
# time.sleep(1)
# print(a+b)
# sem.release()
#
# sem = Semaphore(4)
# for i in range(10):
# t = Thread(target=func,args=(sem,i,i+5))
# t.start() '''互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,
比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
''' import threading, time def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %s" % n)
semaphore.release() if __name__ == '__main__':
num = 0
semaphore = threading.BoundedSemaphore(3) # 最多允许5个线程同时运行
for i in range(12):
t = threading.Thread(target=run, args=(i,))
t.start()

线程的事件:

# !/usr/bin/env python
# -*- coding:utf-8 -*- '''线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,
如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
''' import threading def do(event):
print('start.')
event.wait()
print('execute') event_obj = threading.Event() # 默认False
for i in range(6):
t = threading.Thread(target=do,args=(event_obj,))
t.start() event_obj.clear() # 设为False
input2 = input('>>>')
if input2 == 'true':
event_obj.set() # 设为 True # 事件被创建的时候
# False状态
# wait() 阻塞
# True状态
# wait() 非阻塞
# clear 设置状态为False
# set 设置状态为True

定时器 Timer

import time
from threading import Timer
def func():
print('时间同步') #1-3 while True:
t = Timer(5,func).start() # 非阻塞的 5秒之后开始
time.sleep(2) # 定时器,指定n秒后执行某操作

更多内容,参考:http://www.cnblogs.com/wupeiqi/articles/5040827.html

线程队列:

import queue

# 线程的队列,内置了锁,保证数据安全

# q = queue.Queue()  # 队列 先进先出
# q.put(123)
# print(q.get())
# q.put_nowait(456)
# print(q.get_nowait()) # q = queue.LifoQueue() # 栈 先进后出
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get())
# print(q.get()) q = queue.PriorityQueue() # 优先级队列
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
q.put((-5,'f'))
q.put((-5,'d'))
q.put((1,'?'))
print(q.get()) # 数字越小,优先级越高 按ascii码顺序

线程池:https://www.cnblogs.com/Eva-J/articles/8306047.html#_label17