Python Concurrent Programming: Multithreading

Date: 2021-08-22 00:00:18

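Contents

I. Introduction to the threading module
II. Usage
III. Further topics (daemon threads, locks (mutex lock, reentrant lock), semaphore, queues, Event, Condition, timer)
  1. Daemon threads
  2. Locks (mutex lock, reentrant lock)
  3. Semaphore
  4. Queues
  5. Event
  6. Condition
  7. Timer
IV. Thread pools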

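I. Introduction to the threading module

The multiprocessing module deliberately mirrors the interface of the threading module, and the two are used in very similar ways, so the shared API is not introduced in detail again here.

Official documentation: https://docs.python.org/3/library/threading.html?highlight=threading#

Theory: http://www.cnblogs.com/linhaifeng/articles/7430082.html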

II. Usage

1. Methods

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If not None, daemon explicitly sets whether the thread is daemonic. If None (the default), the daemonic property is inherited from the current thread.

If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

Changed in version 3.3: Added the daemon argument.

start()
Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

run()
Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

join(timeout=None)
Wait until the thread terminates. This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception – or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raise the same exception.

name
A string used for identification purposes only. It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

getName()
setName()
Old getter/setter API for name; use it directly as a property instead.

ident
The ‘thread identifier’ of this thread or None if the thread has not been started. This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

is_alive()
Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.

daemon
A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when no alive non-daemon threads are left.

isDaemon()
setDaemon()
Old getter/setter API for daemon; use it directly as a property instead.
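The two daemon rules quoted above (the flag is inherited from the creating thread, and it cannot be changed once start() has been called) can be checked with a small sketch. The function names `parent` and `child` and the `results` dict are illustrative only.

```python
import threading

results = {}

def child() -> None:
    """Does nothing; only its daemon flag matters here."""

def parent() -> None:
    # daemon=None (the default) means "inherit from the creating thread",
    # and this thread was itself created as a daemon.
    t = threading.Thread(target=child)
    results["inherited_from_daemon"] = t.daemon
    t.start()
    t.join()

p = threading.Thread(target=parent, daemon=True)
p.start()
p.join()

t = threading.Thread(target=child)
results["created_by_main"] = t.daemon   # the main thread is not a daemon
t.start()
try:
    t.daemon = True                     # too late: start() was already called
except RuntimeError:
    results["set_after_start"] = "RuntimeError"
t.join()

print(results)
# {'inherited_from_daemon': True, 'created_by_main': False, 'set_after_start': 'RuntimeError'}
```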

 

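Methods of Thread instances:
  # is_alive(): returns whether the thread is alive. (The old alias isAlive() was removed in Python 3.9.)
  # getName(): returns the thread name.
  # setName(): sets the thread name. (getName()/setName() are deprecated since Python 3.10; read or assign the name property instead.)

Some functions provided by the threading module:
  # threading.current_thread(): returns the Thread object of the calling thread (old alias: currentThread()).
  # threading.enumerate(): returns a list of all threads that are currently alive. "Alive" means after the thread has started and before it has terminated; threads that have not been started yet or have already terminated are not included.
  # threading.active_count(): returns the number of alive threads; the result equals len(threading.enumerate()) (old alias: activeCount()).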
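A minimal sketch of these module-level functions under their current names. The worker names and the Event used to keep the workers alive while they are inspected are illustrative assumptions, not part of the original post.

```python
import threading

stop = threading.Event()

def worker() -> None:
    stop.wait()          # stay alive until the main thread calls stop.set()

threads = [threading.Thread(target=worker, name=f"worker-{i}") for i in range(3)]
for t in threads:
    t.start()

# current_thread() returns the Thread object of the caller.
caller_is_main = threading.current_thread() is threading.main_thread()
# enumerate() lists started, not-yet-finished threads; active_count() is its length.
names_while_running = {t.name for t in threading.enumerate()}
count_matches = threading.active_count() == len(threading.enumerate())

stop.set()
for t in threads:
    t.join()
names_after_join = {t.name for t in threading.enumerate()}

print(caller_is_main)                                              # True
print(sorted(n for n in names_while_running if n.startswith("worker-")))
# ['worker-0', 'worker-1', 'worker-2']
print(any(n.startswith("worker-") for n in names_after_join))      # False
```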

 

2. Examples

(1) Two ways to start a thread

# Method 1: pass a target function to Thread
from threading import Thread
import time
def talk(name):
    time.sleep(2)
    print('%s is talking' %name)

if __name__ == '__main__':
    t=Thread(target=talk,args=('a',))
    t.start()
    print('main thread')

'''
main thread
a is talking
'''
Method 1
# Method 2: subclass Thread and override run()
from threading import Thread
import time
class Talk(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s is talking' % self.name)


if __name__ == '__main__':
    t = Talk('a')
    t.start()
    print('main thread')


"""
main thread
a is talking
"""
Method 2

(2) Comparing threads and processes

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
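    # Start a thread in the main process
    t=Thread(target=work)
    t.start()
    print('main thread/main process')
    '''
    Typical output:
    hello
    main thread/main process
    '''

    # Start a child process in the main process
    t=Process(target=work)
    t.start()
    print('main thread/main process')
    '''
    Typical output:
    main thread/main process
    hello
    '''
Starting a thread is faster than starting a child process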
from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
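    # part1: start several threads in the main process; every thread reports the same pid as the main process
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('main thread/main process pid',os.getpid())

    print("============================================")
    # part2: start several processes; every process has its own pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('main thread/main process pid',os.getpid())

# Threads live inside the parent process, so they share its pid
"""
hello 8116
hello 8116
main thread/main process pid 8116
"""
# Child processes get a different pid: each one is a separate process with its own memory space
"""
============================================
main thread/main process pid 8116
hello 9908
hello 764
"""
PID comparison: threads vs. child processes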

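from threading import Thread
from multiprocessing import Process
import os
def work_t():
    global n
    n=0
    print("id of n in the child thread:",id(n))


def work_p():
    global m
    m=0
    print("id of m in the child process:",id(m))

if __name__ == '__main__':
    n=100
    print("id of n in the parent process:",id(n))
    t=Thread(target=work_t)
    t.start()
    t.join()
    print('parent process',n) # prints 0: threads in the same process share the process's data

    m = 1
    print("id of m in the parent process:", id(m))
    p=Process(target=work_p)
    p.start()
    p.join()
    print('parent process',m) # the child process p set its own global m to 0, but only in its own copy; the parent's m is still 1


"""
id of n in the parent process: 500620640
id of n in the child thread: 500619040
parent process 0


id of m in the parent process: 500619056
id of m in the child process: 500619040
parent process 1

"""
The thread rebinds the parent's global n and the parent sees the change; the child process works on its own copy of memory, so the parent's m is unaffected. (The id() values differ simply because n and m are rebound to a different int object, 0, so the ids alone neither prove nor disprove sharing; the printed values do.)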

(3) join

Works much the same as join() for processes, so it is not covered in detail here.
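Since the join() details are skipped, here is a short sketch of the behaviour described in the documentation quoted in section II: join() with a timeout returns None, so is_alive() tells whether it timed out, and start() may only be called once. The function slow_task and the 0.5 s / 0.1 s timings are arbitrary illustrative choices.

```python
import threading
import time

def slow_task(seconds: float) -> None:
    time.sleep(seconds)

t = threading.Thread(target=slow_task, args=(0.5,))
t.start()

t.join(timeout=0.1)              # returns after at most 0.1 s ...
timed_out = t.is_alive()         # ... and join() returns None, so ask is_alive()
print("timed out:", timed_out)   # timed out: True

t.join()                         # no timeout: block until the thread finishes
print("alive:", t.is_alive())    # alive: False

try:
    t.start()                    # start() may be called at most once per Thread
except RuntimeError:
    restart_failed = True
else:
    restart_failed = False
print("second start() raised RuntimeError:", restart_failed)  # True
```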

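III. Further topics

1. Daemon threads

(1) Explanation

  For both processes and threads the same rule applies: a daemon process/thread is destroyed once the main process/thread has finished running.

  It must be stressed that "finished running" does not mean "terminated".

#1. For the main process, "finished running" means that the main process's own code has finished executing.
#1 The main process counts as finished as soon as its code ends (daemon processes are reclaimed at that point). It then waits for all non-daemon child processes to finish and reclaims their resources (otherwise they would become zombie processes) before it actually exits.

#2. For the main thread, "finished running" means that all non-daemon threads in its process have finished.
#2 The main thread counts as finished only after all other non-daemon threads have finished (daemon threads are reclaimed at that point). This is because the end of the main thread means the end of the process, when all of the process's resources are reclaimed, and the process must make sure every non-daemon thread has finished before it can exit.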

(2) Examples

from threading import Thread
import time
def talk(name):
    time.sleep(10)
    print('%s is talking' %name)

if __name__ == '__main__':
    t=Thread(target=talk,args=('a',))
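    t.daemon=True # must be set before t.start() (t.setDaemon(True) is the older, deprecated form)
    t.start()

    print('main thread')
    print(t.is_alive())
    '''
    main thread
    True  # the program then exits at once; the daemon thread never finishes its sleep(10)
    '''
Daemon example 1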
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")


"""
123
456
main-------  # the main thread now waits for t2 to finish; t1 finishes during that wait, so end123 is printed too
end123
end456
"""
Daemon example 2

2. Locks (mutex lock, reentrant lock)

(1) Mutex lock: Lock

Usage

import threading

R=threading.Lock()

R.acquire()
'''
operations on the shared data
'''
R.release()
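The acquire()/release() pair is commonly written with the lock as a context manager, which also releases the lock if the protected code raises. A minimal sketch, assuming a shared counter incremented by 4 threads (the names `counter` and `add` are illustrative):

```python
import threading

counter = 0
lock = threading.Lock()

def add(times: int) -> None:
    global counter
    for _ in range(times):
        # Equivalent to: lock.acquire(); try: ... finally: lock.release()
        with lock:
            counter += 1     # read-modify-write on shared data

threads = [threading.Thread(target=add, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 40000
```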

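Example

No lock: runs concurrently; fast, but the data is not safe.
With a lock: the unlocked part runs concurrently and the locked part runs serially; slower, but the data is safe.
Serializing with join: each thread runs from start to finish before the next one starts; slow, but the data is safe.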
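The three approaches listed above can be compared side by side. This is an illustrative reconstruction, not the original post's code: the `run` helper, the mode names and the sizes (10 threads, 0.01 s sleep, start value 100) are assumptions. With the lock or with join() the result is always 90; without a lock, threads read the same stale value during the sleep, so updates are lost and the result is usually higher.

```python
import threading
import time

def run(mode: str, workers: int = 10, start: int = 100) -> int:
    """Each thread decrements a shared counter once. mode: 'none', 'lock' or 'join'."""
    state = {"n": start}
    lock = threading.Lock()

    def decrement() -> None:
        temp = state["n"]        # read
        time.sleep(0.01)         # other threads may read the same value meanwhile
        state["n"] = temp - 1    # write back, possibly overwriting their updates

    def work() -> None:
        if mode == "lock":
            with lock:           # only the critical section is serialized
                decrement()
        else:
            decrement()

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()
        if mode == "join":
            t.join()             # the whole thread finishes before the next one starts
    for t in threads:
        t.join()                 # joining an already-finished thread is harmless
    return state["n"]

for mode in ("none", "lock", "join"):
    print(mode, run(mode))
```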
from threading import Thread,Lock
import os,time
def work(i):
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
    print("thread {} finished".format(i))
    
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work,args=(i,))
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) # always 0: the critical section now runs serially instead of concurrently, trading speed for data safety
Lock example 1
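Analysis:
  #1. The 100 threads compete for the GIL, i.e. for the right to execute.
  #2. One thread (call it thread 1) gets the GIL first and starts running; once it runs, it executes lock.acquire() and takes the mutex.
  #3. Very likely, before thread 1 finishes (for example while it is in time.sleep(0.1), which releases the GIL), another thread (thread 2) gets the GIL and starts running, but finds that lock has not yet been released by thread 1, so it blocks and gives up the right to execute, i.e. releases the GIL.
  #4. Eventually thread 1 gets the GIL back, resumes where it stopped, and releases lock normally; the other threads then repeat steps 2 to 4.
How the GIL and the mutex lock interact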

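(2) Reentrant lock: RLock

Used in essentially the same way as Lock. The difference is that an RLock records which thread owns it and keeps a recursion counter: the owning thread may call acquire() again without blocking, and the lock becomes available to other threads only after release() has been called as many times as acquire().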
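A minimal sketch of the reentrancy difference; blocking=False is used so that the failing case returns False instead of hanging forever:

```python
import threading

lock = threading.Lock()
rlock = threading.RLock()

# Lock: a second acquire() by the same thread would block forever (self-deadlock),
# so blocking=False is used to get False back instead of hanging.
lock.acquire()
lock_again = lock.acquire(blocking=False)
lock.release()

# RLock: the owning thread may acquire again; the internal counter goes 1 -> 2.
rlock.acquire()
rlock_again = rlock.acquire(blocking=False)
rlock.release()          # counter 2 -> 1: still held by this thread
rlock.release()          # counter 1 -> 0: now other threads can acquire it

print(lock_again, rlock_again)  # False True
```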

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s got lock A\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s got lock B\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s got lock B\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s got lock A\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
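Thread-1 got lock A
Thread-1 got lock B
Thread-1 got lock B
Thread-2 got lock A
then everything hangs: deadlock
'''
Deadlock with mutex locks (Thread-1 holds B in func2 and waits for A, while Thread-2 holds A in func1 and waits for B)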
from threading import Thread,Lock
from threading import RLock
import time
# mutexA=Lock()
# mutexB=Lock()
mutexA = RLock()
mutexB = mutexA  # both names refer to the same RLock, so the thread holding it can acquire it again

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s got lock A\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s got lock B\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s got lock B\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s got lock A\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

"""
succeeds; output omitted
"""
Solving the deadlock with a reentrant lock

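3. Semaphore

Works the same way as the semaphore for processes.

A Semaphore manages an internal counter:
each call to acquire() decrements the counter by 1;
each call to release() increments it by 1;
the counter can never drop below 0; when it is 0, acquire() blocks the thread until another thread calls release().

This is a completely different concept from a process pool. Pool(4) creates at most 4 processes and keeps using those same 4 processes from start to finish, never creating new ones. With a semaphore you create as many threads/processes as you like (23 threads in the example), and the semaphore only limits how many of them run the guarded section at the same time.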

from threading import Thread,Semaphore
import threading
import time

def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().name)
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()
Semaphore example
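To check that the semaphore really caps concurrency, here is a sketch that records the peak number of threads inside the guarded section. The counters `inside` and `peak`, the extra `stats_lock` and the 0.05 s sleep are illustrative assumptions. The peak can never exceed 5 and is normally exactly 5.

```python
import threading
import time

LIMIT = 5
sm = threading.Semaphore(LIMIT)
stats_lock = threading.Lock()   # protects the two counters below
inside = 0                      # threads currently inside the guarded section
peak = 0                        # highest value `inside` ever reached

def func() -> None:
    global inside, peak
    with sm:                    # acquire() on entry, release() on exit
        with stats_lock:
            inside += 1
            peak = max(peak, inside)
        time.sleep(0.05)        # hold the slot for a while
        with stats_lock:
            inside -= 1

threads = [threading.Thread(target=func) for _ in range(23)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak:", peak)
```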

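4. Queues

The queue module (import queue) provides thread-safe queues; they are used the same way as the multiprocessing Queue.

class queue.Queue(maxsize=0) # first in, first out

class queue.LifoQueue(maxsize=0) # last in, first out

class queue.PriorityQueue(maxsize=0) # a queue in which each item is stored with a priority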
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty  
Queue.full() # return True if full 
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)
Equivalent to put(item, False).

Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()
Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

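Queue.join()
Block until all items in the queue have been gotten and processed, i.e. until task_done() has been called for every item that was put().
queue methods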
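task_done() and join() are usually combined in a producer/consumer pattern. A sketch, assuming three daemon consumer threads that square integers (the names `consumer` and `processed` are illustrative):

```python
import queue
import threading

q = queue.Queue()
processed = []

def consumer() -> None:
    while True:
        item = q.get()              # blocks until an item is available
        try:
            processed.append(item * item)
        finally:
            q.task_done()           # exactly one task_done() per get()

# Daemon threads: they loop forever, and being daemons they do not keep
# the program alive once the main thread is done.
for _ in range(3):
    threading.Thread(target=consumer, daemon=True).start()

for i in range(10):
    q.put(i)

q.join()                            # returns once all 10 items are marked done
print(sorted(processed))            # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```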
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
Result (first in, first out):
first
second
third
'''


import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
Result (last in, first out):
third
second
first
'''


import queue

q=queue.PriorityQueue()
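# put a tuple whose first element is the priority (usually a number, but any values that can be compared with each other work); the smaller the number, the higher the priority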
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
Result (smaller numbers mean higher priority, and higher-priority items come out first):
(10, 'b')
(20, 'a')
(30, 'c')
'''
Examples

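5. Event

Explanation

A key property of threads is that each one runs independently and its state is unpredictable. If other threads in the program need to decide their next step based on the state of some thread, synchronizing them becomes very tricky. To solve this, use the Event object from the threading library. An Event holds a signal flag that threads can set, which lets threads wait for some event to happen. Initially the flag is false. A thread that waits on an Event whose flag is false blocks until the flag becomes true. When a thread sets the flag to true, it wakes up every thread waiting on that Event. A thread that waits on an Event whose flag is already true does not block and simply continues.

Methods

event.is_set(): returns the event's flag (isSet() is the older, deprecated name);

event.wait(): blocks the thread while event.is_set() == False; wait(timeout) gives up after timeout seconds;

event.set(): sets the flag to True; all threads blocked in wait() are woken and become ready, waiting for the OS to schedule them;

event.clear(): resets the flag to False.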
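A small sketch of these four methods in one place. It relies on wait(timeout) returning the flag's value, which is False when the wait timed out. The `setter` helper and the timings are illustrative.

```python
import threading
import time

ready = threading.Event()

first = ready.wait(timeout=0.05)   # nobody has set the flag: returns False after 0.05 s

def setter() -> None:
    time.sleep(0.1)
    ready.set()                    # wakes every thread blocked in ready.wait()

t = threading.Thread(target=setter)
t.start()
second = ready.wait(timeout=5)     # returns True as soon as setter() sets the flag
t.join()

ready.clear()                      # back to False; later wait() calls block again
third = ready.is_set()

print(first, second, third)        # False True False
```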

Example

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('connection timed out')
        print('<%s> connection attempt %s' % (threading.current_thread().name, count))
        event.wait(1)  # up to 3 attempts of 1 s each; check_mysql takes 2 to 4 s, so the connection may succeed or time out
        count+=1
    print('<%s> connected' %threading.current_thread().name)


def check_mysql():
    print('\033[45m[%s] checking mysql\033[0m' % threading.current_thread().name)
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()
Check the MySQL status first; connect once it is OK

6. Condition

Makes threads wait, and releases n of them only when a certain condition is met.

import threading
 
def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" %n)
    con.release()
 
if __name__ == '__main__':
 
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
 
    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
Example 1
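import threading

def condition_func():
    # Reads from the keyboard; only the input '1' lets the waiting thread continue
    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    # wait_for() evaluates the predicate while holding the lock and, while it is False,
    # waits to be notified; nothing calls notify() here, so a thread whose predicate
    # returned False keeps waiting
    con.wait_for(condition_func)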
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
Example 2
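Both examples above depend on typing at the `>>>` prompt. A non-interactive sketch that combines the two ideas (notify(n) releases up to n waiters, and wait_for() re-checks a predicate after each wake-up); the shared `tickets` counter and the batch sizes 3 and 7 are assumptions made for illustration.

```python
import threading
import time

con = threading.Condition()
tickets = 0          # how many waiting threads may still proceed
finished = []        # ids of threads that got through

def run(n: int) -> None:
    global tickets
    with con:                                  # acquires the condition's lock
        con.wait_for(lambda: tickets > 0)      # predicate re-checked after every wake-up
        tickets -= 1                           # consume one ticket
        finished.append(n)

threads = [threading.Thread(target=run, args=(i,)) for i in range(10)]
for t in threads:
    t.start()

for batch in (3, 7):                           # release 3 threads, then the other 7
    time.sleep(0.1)
    with con:                                  # notify() must be called with the lock held
        tickets += batch
        con.notify(batch)                      # wake up to `batch` waiting threads

for t in threads:
    t.join()
print(len(finished), tickets)                  # 10 0
```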

7. Timer

A Timer runs an operation after a given number of seconds.
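Because Timer is a Thread subclass, it can also be cancelled before it fires and joined afterwards. A minimal sketch (the `fired` list and the message strings are illustrative):

```python
import threading

fired = []

def hello(msg: str) -> None:
    fired.append(msg)

t1 = threading.Timer(0.1, hello, args=("first",))
t2 = threading.Timer(0.1, hello, args=("second",))
t1.start()
t2.start()
t2.cancel()        # cancelled while still waiting, so hello("second") never runs
t1.join()          # a Timer is a Thread, so join() waits for it to finish
t2.join()
print(fired)       # ['first']
```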

from threading import Timer


def hello():
    print("hello, world")


t = Timer(1, hello)
t.start()  # after 1 second, "hello, world" will be printed
Example 1
from threading import Timer
import random,time

class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self,interval=5):
        self.cache=self.make_code()
        print(self.cache)
        self.t=Timer(interval,self.make_cache)
        self.t.start()

    def make_code(self,n=4):
        res=''
        for i in range(n):
            s1=str(random.randint(0,9))
            s2=chr(random.randint(65,90))
            res+=random.choice([s1,s2])
        return res

    def check(self):
        while True:
            inp=input('>>: ').strip()
            if inp.upper() ==  self.cache:
                print('verification succeeded')
                self.t.cancel()
                break


if __name__ == '__main__':
    obj=Code()
    obj.check()
Example 2: a verification-code timer (a new code every 5 seconds until the user enters the current one)

 

IV. Thread pools

concurrent.futures: http://www.cnblogs.com/fat39/p/8655040.html
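The linked article covers concurrent.futures in detail. As a minimal sketch of the idea (not taken from that article), ThreadPoolExecutor keeps a fixed set of worker threads and reuses them, which is the pool behaviour contrasted with semaphores in section III. The function `task` and max_workers=4 are illustrative choices.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time

def task(n: int) -> tuple:
    time.sleep(0.05)                               # simulate I/O-bound work
    return n * n, threading.current_thread().name

with ThreadPoolExecutor(max_workers=4) as pool:
    # map(): results come back in input order
    squares = [sq for sq, _ in pool.map(task, range(8))]
    # submit(): one Future per call; as_completed() yields them as they finish
    futures = [pool.submit(task, n) for n in range(8)]
    worker_names = {f.result()[1] for f in as_completed(futures)}
# leaving the with-block calls pool.shutdown(wait=True)

print(squares)             # [0, 1, 4, 9, 16, 25, 36, 49]
print(len(worker_names))   # at most 4: the same worker threads are reused
```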

Reference / reposted from:

http://www.cnblogs.com/linhaifeng/articles/7428877.html