线程的Thread模块 同步控制:锁,事件,信号量,条件,定时器

时间:2022-05-26 00:22:47

Thread 模块

import time

from threading import Thread

from multiprocessing import Process

#效率差别

def func(a):

  a += 1

if __name__ == "__main__":

  li = []

  start = time.time()

  for i in range(50):

    p = Thread(target=func)

    p.start()

    li.append(p)

  for i in li:

    i.join()

  pirnt(time.time()-start)

  start = time.time()

  li = []

  for i in range(50):

    t = Process(target=func)

    t.start()

    li.append(t)

  for i in li:

    i.join()

  print(time.time()-start)

terminate(强制结束子进程) 在线程中没有

线程之间的数据共享

from threading import Thread

n = 100

def func():

  global n

  n -= 1

li = []

for i in range(100):

  t = Thread(target=func)

  t.start()

  li.append(t)

for i in li:

  i.join()

print(n)

守护线程

import time

from threading import Thread

def func():

  while 1:

    print(True)

    time.sleep(0.5)

def inner():

  print("in  inner  start")

  time.sleep(3)

  print("in  inner  end")

t1 = Thread(target=func)

t1.setDaemon(True)

t1.start()

t2 = Thread(target=inner)

t2.start()

time.sleep(1)

print("主进程")

主线程如果结束了,那么整个进程就结束了,守护线程会等待主线程结束之后才结束.

主进程 等待  守护进程  子进程  守护进程只守护主进程的代码就可以了

守护线程不行,主线程结束了那么整个进程就结束了,所有的线程就都结束了

例子 : 

使用多线程实现tcp协议的socket server

客户端:

import socket

sk = socket.socket()

sk.connect_ex(("127.0.0.1",9090))

while 1:

  sk.send(b"hello")

  ret = sk.recv(1024).decode("utf-8")

  print(ret)

sk.close()

服务器 :

import socket

from threading import Thread

sk = socket.socket()

sk.bind(("127.0.0.1",9090))

sk.listen()

def func(conn):

  while 1:

    ret = conn.recv(1024).decode("utf-8")

    print(ret)

    conn.send(b"world")

if __name__ == "__main__":

  while 1:

    conn,addr = sk.accept()

    Thread(target=func,args=(conn,)).start()

from threading import Thread,get_ident

#开启线程的第二种方法和查看线程id

class Mythread(Thread):

  def __init__(self,args):

    super().__init__()

    self.args = args

  def run(self):

    print("in my thread:",get_ident(),self.args)

print("main",get_ident())

t = Mythread("nana")

t.start()

线程中的方法

import time

from threading import Thread,get_ident,currentThread,enumerate,activeCount

class Mythread(Thread):

  def __init__(self,args):

    super().__init__()

    self.args = args

  def run(self):

    time.sleep(0.1)

    print(currentThread())  #返回当前的线程变量

    print("in my thread:",get_ident(),self.args)

print("main:",get_ident())

t = Mythread("nana")

print(t.is_alive())  #返回线程是否是活动的

t.start()

t.setname("nana")  #设置线程名称

print(t.getname())  #获取线程名称

print(activeCount())  #正在运行的线程的数量len(enumerate)  #返回一个包含正在运行的线程list

print("t:",t)

Thread类 : 开启线程 传参数 join

和进程的差异 : 1,效率  2. 数据共享   3. 守护线程

面向对象的方式开启线程 :

thread对象的其他方法 : isAlive,setname,getname

threading模块的方法 : currentTread,activeCount,encumerate

GIL锁(全局解释器锁) : 锁线程, 同一个线程在同一时刻只能被一个CPU执行

在多进程/线程同时访问一个数据的时候就会产生数据不安全的现象.

多进程 访问文件

多线程 : 同时访问一个数据

GIL全局解释锁 : 在同一个进程里的每一个进程同一时间只能有一个线程访问CPU

尽量不要设置全局变量

只要在多线程/进程之间用到全局变量就加上锁

from threading import Thread,Lock

lock = Lock()  #互斥锁

noodle = 100

def func(name):

  global noodle

  lock.acquire()

  noodle -= 1

  lock.release()

  print("%s吃到面了"%name)

if __name__ == "__main__":

  li = []

  for i in range(100):

    t = Thread(target=func)

    t.start()

    li.append(t)

  for i in li:

    i.join()

  print(noodle)

科学家吃面问题(死锁现象)

import time

from threading import Thread,Lock

noodle = Lock()

fork = Lock()

def eat1(name):

  noodle.acquire()

  print("%s拿到面了"%name)

  fork.acquire()

  print("%s拿到叉子了"%name)

  print("%s在吃面"%name)

  time.sleep(0.5)

  fork.release()

  noodle.release()

def eat2(name):

  fork.acquire()

  print("%s拿到叉子了"%name)

  noodle.acquire()

  print("%s拿到面了"%name)

  print("%s 在吃面"%name)

  time.sleep(0.5)

  noodle.release()

  fork.release()

li = ["alex","egon","wusir","taibai"]

for i in li:

  Thread(target=eat1,args=(i,)).start()

  Thread(target=eat2,args=(i,)).start()

递归锁 :

from threading import RLock

lock = RLock()

lock.acquire()

print(55555)

lock.acquire()

print(666666)

lock.acquire()

print(99999)

递归锁解决死锁问题 :

import time

from threading import Thread,RLock

lock = RLock()

def eat1(name):

  lock.acquire()

  print("%s拿到面了"%name)

  lock.acquire()

  print("%s拿到了叉子了"%name)

  print("%s 在吃饭"%name)

  time.sleep(0.5)

  lock.release()

  lock.release()

def eat2(name):

  lock.acquire()

  print("%s拿到叉子了"%name)

  lock.acquire()

  print("%s拿到面了"%name)

  time.sleep(0.5)
  lock.release()

  lock.release()

li = ["alex","egon","wusir","taibai"]

for i in li:

  Thread(target=eat1,args=(i,)).start()

  Thread(tarage=eat2,args=(i,)).start()

互斥锁解决死锁问题 :

import time

from threading import Thred,Lock

lock = Lock()

def eat1(name):

  lock.acquire()
  print("%s拿到面了"%name)

  print("%s 拿到叉子了"%name)

  print("%s在吃面"%name)

  time.sleep(0.5)

  lock.release()

def eat2(name):

  lock.acquire()

  print("%s拿到叉子了"%name)

  print("%s拿到面了"%name)

  print("%s 在吃面"%name)

  time.sleep(0.5)

  lock.release()

li = ["alex","egon","wusir","taibai"]

for i in li:

  Thread(target=eat1,args=(i,)).start()

  Thread(target=eat2,args=(i,)).start()

死锁 :

多把锁同时应用在多个线程中

互斥锁和递归锁哪个好?

递归锁 : 快速回复服务

死锁问题的出现,是程序的设计或者逻辑的问题

还应该进一步的排除和重构逻辑来保证使用互斥锁也不会发生死锁

互斥锁和递归锁的区别 :

互斥锁 : 就是在一个线程中不能连续多次acquire

递归锁 : 可以在同一个线程中acquire任意次,注意acquire多少次就需要release多少次

信号量 和 进程池

信号量就是 : 锁 + 计数器

import time

from multiprocessing import Semaphore,Process,Pool

def ktv(sem,i):

  sem.acquire()

  i += 1

  sem.release()

def ktv1(i):

  i += 1

if __name__ == "__main__":

  start = time.time()

  li = []

  sem  =Semaphore(5)

  for i in  range(100)

    p = Process(targe=ktv,args=(sem,i))

    p.start()

    li.append(p)

  for i in li:

    i.join()

  print(time.time()-start)

  start  = time.time()

  p = Pool(5)

  for i in range(100):

    p.apply_async(func=ktv1,args=(i,))

  p.close()

  p.join()

  print(time.time()-start)

进程池和信号量:

进程池 : 进程池里有几个进程就起几个,不管多少任务,池子里的进程个数是固定的,开启进程和关闭进程这些事都是需要固定的时间开销,就不产生额外的时间开销.且进程池中的进程数控制的好,那么操作系统的压力也小.

信号量 :

有多少个任务就起多少个进程/线程

可以帮助减少操作系统切换负担

但是并不能减少进/线程开启和关闭的时间

数据库连接 :

import time,random

from threading import Thread,Event

def check(e):

  print("正在检测两台机器之间的网络情况...")

  time.sleep(random.randint(1,3))

  e.set()

def connect_db(e):

  e.wait()

  print("连接数据库..")

  print("连接数据库成功....")

 

e = Event()

Thread(target=check,args=(e,)).start()

Thread(target=connect_db,args=(e,)).start()

import time,random

from threading import Thread,Event

def check(e):

  print("正在检测两台机器之间的网络情况...")

  time.sleep(random.randint(0,2))

  e.set()

def connect_db(e):

  n = 0

  while n<3:

    if e.is_set():

      break

    else:

      e.wait(0.5)

      n += 1

  if n == 3:

    raise TimeoutError

  print("连接数据库....")

  print("连接数据库成功...")

e = Event()

Thread(target=check,args=(e,)).start()

Thread(target=connect_db,args=(e,)).start()

条件 :

from threading import Condition

常用方法:

acquire  ,  release  , wait  阻塞 ,  notify (让wait解除阻塞的工具)

wait还是notify在执行这两个方法的前后,必须执行acquire和release

from threading import Condition ,Thread

def func(con,i):

  con.acquire()

  con.wait()

  print("thread:",i)

  con.release()

con = Condition()

for i in range(20):

  Thread(target=func,args=(con,i)).start()

con.acquire()

con.notify_all()  #帮助wait的子线程处理某个数据直到满足条件

con.release()

while 1:

  num = int(input("<<<"))

  con.acquire()

  con.notify(num)

  con.release()

定时器 :

from threading import Thread , Timer

def func():

  print("我执行了")

Timer(1,func).start()  #定时器

创建线程的时候,就规定他多久之后去执行