在实际处理数据时,因系统内存有限,我们不可能一次把所有数据都导出进行操作,所以需要批量导出依次操作。为了加快运行,我们会采用多线程的方法进行数据处理,以下为我总结的多线程批量处理数据的模板:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
import threading
# 从数据库提取数据的类
class Scheduler():
def __init__( self ):
self ._lock = threading.RLock()
self .start = 0
# 每次取10000条数据
self .step = 10000
def getdata( self ):
# 上锁,以免多线程同时对数据库进行访问,取出重复数据
self ._lock.acquire()
# 进行取数据操作
data = 'select * from table' \
'where id between self.start and self.start + self.step'
# 取完数据后,指针后移
self .start + = self .step
self ._lock.release()
return data
# 处理数据的过程写在这里
def processdata():
# 从该实例中提取数据
data = scheduler.getdata()
while data:
# 进行处理数据的具体操作:
# 去重、补缺、运算...只要还有数据,本线程就继续取新数据
# 然后再获取数据,进行循环
data = scheduler.getdata()
# 创建多线程,threads_num为创建的线程数
def threads_scheduler(threads_num):
threads = []
for i in range (threads_num):
# 创建线程
td = threading.Thread(target = processdata, name = 'th' + str (i + 1 ))
threads.append(td)
for t in threads:
# 启动线程
t.start()
for t in threads:
# 子线程守护
t.join()
print ( '数据已全部处理成功' )
if __name__ = = '__main__' :
# 实例化一个调度器,初始化参数
scheduler = Scheduler()
# 创建线程,开始处理数据
threads_scheduler( 4 )
|
主要分为三大部分:
- Scheduler类,负责初始化参数,getdata方法负责提取数据
- processdata方法中写具体处理数据的流程
- threads_scheduler方法负责创建线程
Python多线程的知识我分为4部分进行讲解,以下带大家来回顾重点:
多线程threading
本章先为大家介绍了线程的相关概念:
主线程:当一个程序启动时,就有一个进程被操作系统(OS)创建,与此同时一个线程也立刻运行,该线程通常叫做程序的主线程(Main Thread)。因为它是程序开始时就执行的,如果你需要再创建线程,那么创建的线程就是这个主线程的子线程。
子线程:使用threading、ThreadPoolExecutor创建的线性均为子线程。
主线程的重要性体现在两方面:1.是产生其他子线程的线程;2.通常它必须最后完成执行,比如执行各种关闭动作。
在飞车程序中,如果没有多线程,我们就不能一边听歌一边玩飞车,听歌与玩游戏不能并行;在使用多线程后,我们就可以在玩游戏的同时听背景音乐。在这个例子中启动飞车程序就是一个进程,玩游戏和听音乐是两个线程。
Python提供了threading模块来实现多线程:threading.Thread可以创建线程;setDaemon(True)为守护主线程,默认为False;join()为守护子线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
from time import sleep
import threading
def music(music_name):
for i in range ( 2 ):
print ( '正在听{}' . format (music_name))
sleep( 1 )
print ( 'music over' )
def game(game_name):
for i in range ( 2 ):
print ( '正在玩{}' . format (game_name))
sleep( 3 )
print ( 'game over' )
threads = []
t1 = threading.Thread(target = music,args = ( '稻香' ,))
threads.append(t1)
t2 = threading.Thread(target = game,args = ( '飞车' ,))
threads.append(t2)
if __name__ = = '__main__' :
for t in threads:
# t.setDaemon(True)
t.start()
for t in threads:
t.join()
print ( '主线程运行结束' )
|
线程池
因为新建线程系统需要分配资源、终止线程系统需要回收资源,所以如果可以重用线程,则可以减去新建/终止的开销以提升性能。同时,使用线程池的语法比自己新建线程执行线程更加简洁。
Python为我们提供了ThreadPoolExecutor来实现线程池,此线程池默认子线程守护。它的适应场景为突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短。
1
2
3
4
5
6
7
8
9
10
11
12
|
from time import sleep
# fun为定义的待运行函数
with ThreadPoolExecutor(max_workers = 5 ) as executor:
ans = executor. map (fun, [遍历值])
for res in ans:
print (res)
with ThreadPoolExecutor(max_workers = 5 ) as executor:
list = [遍历值]
ans = [executor.submit(fun, i) for i in list ]
for res in as_completed(ans):
print (res.result())
|
其中max_workers为线程池中的线程个数,常用的遍历方法有map和submit+as_completed。根据业务场景的不同,若我们需要输出结果按遍历顺序返回,我们就用map方法,若想谁先完成就返回谁,我们就用submit+as_complete方法。
线程互斥
我们把一个时间段内只允许一个线程使用的资源称为临界资源,对临界资源的访问,必须互斥的进行。互斥,也称间接制约关系。线程互斥指当一个线程访问某临界资源时,另一个想要访问该临界资源的线程必须等待。当前访问临界资源的线程访问结束,释放该资源之后,另一个线程才能去访问临界资源。锁的功能就是实现线程互斥。
我把线程互斥比作厕所包间上大号的过程,因为包间里只有一个坑,所以只允许一个人进行大号。当第一个人要上厕所时,会将门上上锁,这时如果第二个人也想大号,那就必须等第一个人上完,将锁解开后才能进行,在这期间第二个人就只能在门外等着。这个过程与代码中使用锁的原理如出一辙,这里的坑就是临界资源。
Python 的 threading 模块引入了锁。threading 模块提供了 Lock 类,它有如下方法加锁和释放锁:
- acquire():对 Lock加锁,其中timeout参数指定加锁多少秒
- release():释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
class Account:
def __init__( self , card_id, balance):
# 封装账户ID、账户余额的两个变量
self .card_id = card_id
self .balance = balance
def withdraw(account, money):
# 进行加锁
lock.acquire()
# 账户余额大于取钱数目
if account.balance > = money:
# 吐出钞票
print (threading.current_thread().name + "取钱成功!吐出钞票:" + str (money),end = ' ' )
# 修改余额
account.balance - = money
print ( "\t余额为: " + str (account.balance))
else :
print (threading.current_thread().name + "取钱失败!余额不足" )
# 进行解锁
lock.release()
# 创建一个账户,银行卡id为8888,存款1000元
acct = Account( "8888" , 1000 )
# 模拟两个对同一个账户取钱
# 在主线程中创建一把锁
lock = threading.Lock()
threading.Thread(name = '窗口A' , target = withdraw , args = (acct , 800 )).start()
threading.Thread(name = '窗口B' , target = withdraw , args = (acct , 800 )).start()
|
lock与Rlock的区别
区别一:Lock被称为原始锁,一个线程只能请求一次;RLock被称为重入锁,可以被一个线程请求多次,即锁中可以嵌套锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import threading
def main():
lock.acquire()
print ( '第一道锁' )
lock.acquire()
print ( '第二道锁' )
lock.release()
lock.release()
if __name__ = = '__main__' :
lock = threading.Lock()
main()
|
我们会发现这个程序只会打印“第一道锁”,而且程序既没有终止,也没有继续运行。这是因为Lock锁在同一线程内第一次加锁之后还没有释放时,就进行了第二次acquire请求,导致无法执行release,所以锁永远无法释放,这就是死锁。如果我们使用RLock就能正常运行,不会发生死锁的状态。
区别二:当Lock处于锁定状态时,不属于特定线程,可在另一个线程中进行解锁释放;而RLock只有当前线程才能释放本线程上的锁,不可由其他线程进行释放,所以在使用RLock时,acquire与release必须成对出现,即解铃还须系铃人。
1
2
3
4
5
6
7
8
9
10
|
import threading
def main():
lock.release()
print ( "在子线程解锁后打印" )
if __name__ = = '__main__' :
lock = threading.Lock()
lock.acquire()
t = threading.Thread(target = main)
t.start()
|
在主线程中定义Lock锁,然后上锁,再创建一个子线程t运行main函数释放锁,结果正常输出,说明主线程上的锁,可由子线程解锁。
如果把上面的锁改为RLock则报错。在实际中设计程序时,我们会将每个功能分别封装成一个函数,每个函数中都可能会有临界区域,所以就需要用到RLock。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import threading
import time
def fun_1():
print ( '开始' )
time.sleep( 1 )
lock.acquire()
print ( "第一道锁" )
fun_2()
lock.release()
def fun_2():
lock.acquire()
print ( "第二道锁" )
lock.release()
if __name__ = = '__main__' :
lock = threading.RLock()
t1 = threading.Thread(target = fun_1)
t2 = threading.Thread(target = fun_1)
t1.start()
t2.start()
|
一句话总结就是Lock不能套娃,RLock可以套娃;Lock可以由其他线程中的锁进行操作,RLock只能由本线程进行操作。
以上就是多线程所有内容,喜欢的小伙伴支持,收藏。
技术交流
欢迎转载、收藏、有所收获点赞支持一下!
到此这篇关于Python 多线程超详细到位总结的文章就介绍到这了,更多相关Python 多线程内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/weixin_38037405/article/details/121052797