1.线程理论
- 线程是CPU的执行单位
- 多线程(即多个控制线程)的概念是,在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内又多条流水线,都共用一个车间的资源。例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。
2.线程与进程的区别
- 同一个进程内的多个线程共享该进程内的地址资源
- 创建线程的开销要远小于创建进程的开销(创建一个进程,就是创建一个车间,涉及到申请空间,而且在该空间内建至少一条流水线,但创建线程,就只是在一个车间内造一条流水线,无需申请空间,所以创建开销小)
3.开启线程的两种方式
- 方式一:函数
-
1 #-*- coding:utf-8 -*-
2 from threading import Thread
3 import time
4 import os
5 def sayhi(name):
6 time.sleep(2)
7 print("%s say hello"%name)
8 if __name__ == "__main__":
9 t = Thread(target=sayhi,args=('egon',))
10 t.start()
11 print("主") - 方式二:类
-
1 class SayHi(Thread):
2 def __init__(self,name):
3 super().__init__()
4 self.name = name
5 def run(self):
6 print("%s say hello" % self.name)
7 print("线程pid:",os.getpid())
8 if __name__ =="__main__":
9 t1 = SayHi("egon")
10 t2 = SayHi("alex")
11 t1.start()
12 t2.start()
13 print("主进程pid:",os.getpid())
4.多线程与多进程的区别
- 开启速度:线程快于进程
- pid:同一进程下不同线程的pid是相同的,进程之间的pid是不同的
- 内存:进程之间内存地址空间是隔离的,而同一进程内开启的多个线程是共享该进程内存地址空间的
5.Thread对象的其他属性或方法
- Thread实例对象的方法
- isAlive():返回线程是否活动的
- getName():返回线程名
- setName():设置线程名
- threading模块提供的一些方法
- threading.currentThread():返回当前的线程变量
- threading.enumerate():返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前后终止后的线程
- threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
-
1 #-*- coding:utf-8 -*-
2 from threading import Thread
3 import time
4 import threading
5 import os
6
7 def sayhi(name):
8 time.sleep(2)
9 print("%s say hello"%name)
10 print(threading.current_thread().getName())#Thread-1
11 if __name__ == "__main__":
12 t = Thread(target=sayhi,args=('egon',))
13 t.start()
14 print(threading.current_thread().getName())#MainThread
15 print(threading.current_thread())#<_MainThread(MainThread, started 8308)>
16 print(threading.enumerate())#[<_MainThread(MainThread, started 9072)>, <Thread(Thread-1, started 7052)>]
17 print(threading.active_count())#2
18 print(t.is_alive())#True
19 t.join()
20 print('主线程/主进程')
21 print(t.is_alive())#False
5.守护线程
- 无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁
- 强调:运行完毕并非终止运行
- 对于主进程来说,运行完毕指的是主进程代码运行完毕
- 对于主线程来说,运行完毕指的是主线程所在进程内所有非守护线程统统运行完毕,主线程才算运行完毕
- 守护线程:在同一进程内,其他非守护线程运行完毕后才算运行完毕,此时守护线程被回收
-
1 #-*- coding:utf-8 -*-
2 from threading import Thread
3 import time
4
5 def walk():
6 print("start123")
7 time.sleep(1)
8 print("end123")
9 def run():
10 print("start456")
11 time.sleep(3)
12 print("end456")
13 if __name__ == "__main__":
14 t1 = Thread(target=walk)
15 t2 = Thread(target=run)
16 t1.daemon = True
17 t1.start()
18 t2.start()
19 print("主")
20
21
22 #start123
23 #start456
24 #主
25 #end123
26 #end456 -
1 #-*- coding:utf-8 -*-
2 from threading import Thread
3 import time
4
5 def walk():
6 print("start123")
7 time.sleep(3)
8 print("end123")
9 def run():
10 print("start456")
11 time.sleep(1)
12 print("end456")
13 if __name__ == "__main__":
14 t1 = Thread(target=walk)
15 t2 = Thread(target=run)
16 t1.daemon = True
17 t1.start()
18 t2.start()
19 print("主")
20 #start123
21 #start456
22 #主
23 #end456
6.GIL全局解释器锁
- 本质上也是互斥锁
- 保护不同的数据应该加不同的锁,GIL是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据);lock是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自己加锁定义
- 有了GIL的存在,同一时刻同一进程中只能有一个线程被执行
- GIL与多线程
- ·对于计算来说,CPU越多越好,但是对于I/O来说,再多的CPU也没用
- 对于单核计算机来说,常采用开启一个进程,多个线程的方案
- 对于多核计算机来说,如果任务是计算密集型,多核意味着并行计算,多进程方案更优;如果任务是I/O密集型,则多线程方案更优
- 多线程用于IO密集型,如socket,爬虫,web
- 多进程用于计算密集型,如金融分析
-
1 #-*- coding:utf-8 -*-
2 from multiprocessing import Process
3 from threading import Thread
4 import os
5 import time
6 def work():
7 time.sleep(2)
8 print("===>")
9 if __name__ == "__main__":
10 l_p = []
11 print(os.cpu_count())
12 start = time.time()
13 for i in range(400):
14 # p = Process(target=work)#耗时时间长
15 p = Thread(target=work)#耗时时间短
16 l_p.append(p)
17 p.start()
18 for p in l_p:
19 p.join()
20 stop = time.time()
21 print("run time is %s"%(stop-start))1 #-*- coding:utf-8 -*-
2 from multiprocessing import Process
3 from threading import Thread
4 import os
5 import time
6 def work():
7 res = 0
8 for i in range(100000000):
9 res*=1
10 if __name__ == "__main__":
11 l_p = []
12 print(os.cpu_count())
13 start = time.time()
14 for i in range(4):
15 # p = Process(target=work)#耗时时间短
16 p = Thread(target=work)#耗时时间长
17 l_p.append(p)
18 p.start()
19 for p in l_p:
20 p.join()
21 stop = time.time()
22 print("run time is %s"%(stop-start))
7.死锁现象与递归锁RLOCK
- 死锁现象
- 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
-
1 # -*- coding:utf-8 -*-
2 from threading import Thread, Lock
3 import time
4
5 mutexA = Lock()
6 mutexB = Lock()
7
8
9 class MyThread(Thread):
10 def run(self):
11 self.func1()
12 self.func2()
13
14 def func1(self):
15 mutexA.acquire()
16 print('\033[41m%s 拿到A锁\033[0m' % self.name)
17 mutexB.acquire()
18 print('\033[42m%s 拿到B锁\033[0m' % self.name)
19 mutexB.release()
20 mutexA.release()
21 def func2(self):
22 mutexB.acquire()
23 print('\033[43m%s 拿到B锁\033[0m' % self.name)
24 time.sleep(2)
25 mutexA.acquire()
26 print('\033[44m%s 拿到A锁\033[0m' % self.name)
27 mutexA.release()
28 mutexB.release()
29 if __name__ == "__main__":
30 for i in range(10):
31 t = MyThread()
32 t.start()
- 递归锁
- 递归锁RLOCK用于解决死锁现象
- 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,
- 二者的区别
- 递归锁可以连续acquire多次,
- 互斥锁只能acquire一次
-
1 # -*- coding:utf-8 -*-
2 from threading import Thread, Lock, RLock
3 import time
4
5 mutexA = mutexB = RLock()
6
7
8 class MyThread(Thread):
9 def run(self):
10 self.func1()
11 self.func2()
12
13 def func1(self):
14 mutexA.acquire()
15 print('\033[41m%s 拿到A锁\033[0m' % self.name)
16 mutexB.acquire()
17 print('\033[42m%s 拿到B锁\033[0m' % self.name)
18 mutexB.release()
19 mutexA.release()
20
21 def func2(self):
22 mutexB.acquire()
23 print('\033[43m%s 拿到B锁\033[0m' % self.name)
24 time.sleep(2)
25 mutexA.acquire()
26 print('\033[44m%s 拿到A锁\033[0m' % self.name)
27 mutexA.release()
28 mutexB.release()
29
30
31 if __name__ == "__main__":
32 for i in range(10):
33 t = MyThread()
34 t.start()
8.信号量
- 信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行,如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小
-
1 #-*- coding:utf-8 -*-
2 from threading import Thread,Semaphore
3 import threading
4 import time
5 def func():
6 sm.acquire()
7 print('%s get sm' % threading.current_thread().getName())
8 time.sleep(3)
9 sm.release()
10 if __name__ == "__main__":
11 sm = Semaphore(5)
12 for i in range(23):
13 t = Thread(target=func)
14 t.start() -
1 Semaphore管理一个内置的计数器,
2 每当调用acquire()时内置计数器-1;
3 调用release() 时内置计数器+1;
4 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
9.event
- 线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。
-
1 from threading import Event
2
3 event.isSet():返回event的状态值;
4
5 event.wait():如果 event.isSet()==False将阻塞线程;
6
7 event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
8
9 event.clear():恢复event的状态值为False。1 def conn_mysql():
2 count = 1
3 while not event.is_set():
4 if count > 3:
5 print("%s try too many times " % threading.currentThread().getName())
6 return
7 print('<%s>第%s次尝试链接'%(threading.current_thread().getName(),count))
8 event.wait(0.5)
9 count += 1
10 print('<%s>链接成功' % threading.current_thread().getName())
11
12 def check_mysql():
13 print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName())
14 time.sleep(random.randint(2,4))
15 event.set()
16
17 if __name__ == "__main__":
18 event = Event()
19 conn1 = Thread(target=conn_mysql)
20 conn2 = Thread(target=conn_mysql)
21 conn3 = Thread(target=conn_mysql)
22 check = Thread(target=check_mysql)
23
24 conn1.start()
25 conn2.start()
26 conn3.start()
27 check.start()
10.定时器
- 指定n秒后执行某操作
-
def hello():
print("hello,world!")
t = Timer(2,hello)
t.start() # after 2 seconds, "hello, world" will be printed -
1 # -*- coding:utf-8 -*-
2 from threading import Timer
3 import random
4
5
6 class Code(object):
7 def __init__(self):
8 self.make_cache()
9
10 def make_cache(self, interval=5):
11 self.cache = self.make_code()
12 print(self.cache)
13 self.t = Timer(interval, self.make_cache)
14 self.t.start()
15
16 def make_code(self, n=4):
17 res = ""
18 for i in range(n):
19 s1 = str(random.randint(0, 9))
20 s2 = chr(random.randint(65, 90))
21 res += random.choice([s1, s2])
22 return res
23
24 def check(self):
25 while True:
26 code = input("请输入验证码>>:").strip()
27 if code.upper() == self.cache:
28 print("验证码输入正确")
29 self.t.cancel()
30 break
31
32 if __name__ == "__main__":
33 obj = Code()
34 obj.check()
.11.线程queue
-
1 #-*- coding:utf-8 -*-
2 import queue
3 q = queue.Queue(3)
4 q.put(1)
5 q.put(2)
6 q.put(3)
7 # q.put(4)#阻塞
8 # q.put(4,block=False)#抛出异常
9 # q.put(4,block=True,timeout=3) #3s阻塞后,抛出异常
10
11 q.get()
12 q.get()
13 q.get()
14 # q.get()#阻塞
15 # q.get(block=False)#抛出异常
16 # q.get_nowait()#抛出异常,和上一条等价
17 q.get(block=True,timeout=3)#3s阻塞后,抛出异常 -
1 #堆栈,先进后出
2 import queue
3 q = queue.LifoQueue()
4 q.put('first')
5 q.put('second')
6 q.put('third')
7 print(q.get())#third
8 print(q.get())#second
9 print(q.get())#first -
1 #优先级队列
2 import queue
3 q = queue.PriorityQueue()
4 #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
5 q.put((20,'a'))
6 q.put((10,'b'))
7 q.put((30,'c'))
8
9 print(q.get())
10 print(q.get())
11 print(q.get())
12
13 # 结果(数字越小优先级越高,优先级高的优先出队):
14 # (10, 'b')
15 # (20, 'a')
16 # (30, 'c') - 多线程套接字通信
-
1 #-*- coding:utf-8 -*-
2 import socket
3 from threading import Thread
4 from concurrent.futures import ThreadPoolExecutor
5
6 def communicate(conn):
7 while True:
8 try:
9 data = conn.recv(1024)
10 if not data:break
11 conn.send(data.upper())
12 except ConnectionResetError:
13 break
14 conn.close()
15
16
17 def server(server_ip,port):
18 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
19 server.bind((server_ip, port))
20 server.listen(5)
21 while True:
22 conn, client_addr = server.accept()
23 # t = Thread(target=communicate,args=(conn,))
24 # t.start()
25 pool.submit(communicate,conn)
26 server.close()
27
28 if __name__ == "__main__":
29 pool = ThreadPoolExecutor(2)
30 server_ip = socket.gethostbyname(socket.gethostname())
31 port = 8080
32 server(server_ip, port)1 #-*- coding:utf-8 -*-
2 import socket
3 server_ip = socket.gethostbyname(socket.gethostname())
4 client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
5 client.connect((server_ip,8080))
6 while True:
7 msg = input(">>:").strip()
8 if not msg:continue
9 client.send(msg.encode("utf-8"))
10 data = client.recv(1024)
11 print(data.decode("utf-8"))
12 client.close()
12.线程池与进程池
-
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.1、submit(fn, *args, **kwargs)
异步提交任务
2、map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作
3、shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
4、result(timeout=None)
取得结果
5、add_done_callback(fn)
回调函数 -
1 #-*- coding:utf-8 -*-
2 from threading import Thread,currentThread
3 from multiprocessing import Process
4 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
5 import os
6 import time
7 import random
8 def task(name):
9 print("name:%s pid:%s"%(name,os.getpid()))
10 # print("name:%s pid:%s"%(name,currentThread().getName()))
11 time.sleep(random.randint(1,3))
12 if __name__ == "__main__":
13 pool = ProcessPoolExecutor(5)
14 # pool = ThreadPoolExecutor(5)
15 for i in range(10):
16 pool.submit(task,i)
17 pool.shutdown(wait=True)
18 print("主") - map函数
-
1 # -*- coding:utf-8 -*-
2 import os
3 import random
4 import time
5 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
6
7
8 def task(n):
9 print('%s is running' % os.getpid())
10 time.sleep(random.randint(1, 3))
11 return n ** 2
12
13
14 if __name__ == "__main__":
15 executor = ProcessPoolExecutor(max_workers=3)
16 # for i in range(11):
17 # executor.submit(task,i)
18 executor.map(task,range(1,12))#map取代了for+submit - 回调函数
- 可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数
-
1 #-*- coding:utf-8 -*-
2 import requests
3 import time
4 from concurrent.futures import ThreadPoolExecutor
5 def get(url):
6 response = requests.get(url)
7 time.sleep(3)
8 return {"url":url,"content":response.text}
9 def parse(res):
10 res = res.result()
11 print("%s parse res is %s"%(res["url"],len(res["content"])))
12 if __name__=="__main__":
13 urls = [
14 "http://www.woshipm.com/rp/415309.html",
15 "https://www.python.org",
16 "http://blog.csdn.net/shanzhizi/article/details/50903748",
17 ]
18 pool = ThreadPoolExecutor(2)
19 for url in urls:
20 pool.submit(get,url).add_done_callback(parse)
21 pool.shutdown()
22 print("主")