Python学习之路--进程,线程,协程

时间:2023-04-03 08:20:02
  1. 进程、与线程区别
  2. cpu运行原理
  3. python GIL全局解释器锁
  4. 线程
    1. 语法
    2. join
    3. 线程锁之Lock\Rlock\信号量
    4. 将线程变为守护进程
    5. Event事件 
    6. queue队列
    7. 生产者消费者模型
    8. Queue队列
    9. 开发一个线程池
  5. 进程
    1. 语法
    2. 进程间通讯
    3. 进程池    

进程与线程

什么是线程(thread)?

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务

A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.

Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.

If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.

Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.

Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

什么是进程(process)?

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

进程与线程的区别?

    • Threads share the address space of the process that created it; processes have their own address space.
    • Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
    • Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
    • New threads are easily created; new processes require duplication of the parent process.
    • Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
    • Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes 。  

Python threading模块

线程有2种调用方式,如下:

直接调用

 import threading
import time def sayhi(num): #定义每个线程要运行的函数 print("running on number:%s" %num) time.sleep() if __name__ == '__main__': t1 = threading.Thread(target=sayhi,args=(,)) #生成一个线程实例
t2 = threading.Thread(target=sayhi,args=(,)) #生成另一个线程实例 t1.start() #启动线程
t2.start() #启动另一个线程 print(t1.getName()) #获取线程名
print(t2.getName())

继承式调用

 import threading
import time
 
class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num
 
    def run(self):#定义每个线程要运行的函数
 
        print("running on number:%s" %self.num)
 
        time.sleep()
 
if __name__ == '__main__':
 
    t1 = MyThread()
    t2 = MyThread()
    t1.start()
    t2.start()

Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

 #!/usr/bin/env python
# -*- coding:utf- -*-
import threading
import time def show(arg):
time.sleep()
print 'thread'+str(arg) for i in range():
t = threading.Thread(target=show, args=(i,))
t.start() print 'main thread stop'

上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。

更多方法:

    • start            线程准备就绪,等待CPU调度
    • setName      为线程设置名称
    • getName      获取线程名称
    • setDaemon   设置为后台线程或前台线程(默认)
                         如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
                          如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
    • join              逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
    • run              线程被cpu调度后自动执行线程对象的run方法

Join & Daemon

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.

 import time
import threading
 
def run(n):
 
    print('[%s]------running----\n' % n)
    time.sleep()
    print('--done--')
 
def main():
    for i in range():
        t = threading.Thread(target=run,args=[i,])
        #time.sleep()
        t.start()
        t.join()
        print('starting thread', t.getName())
 
 
m = threading.Thread(target=main,args=[])
m.setDaemon(True) #将主线程设置为Daemon线程,它退出时,其它子线程会同时退出,不管是否执行完任务
m.start()
#m.join(timeout=)
print("---main thread done----")
Note:Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.

  

线程锁(互斥锁Mutex)

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?

 import time
import threading
 
def addNum():
    global num #在每个线程中都获取这个全局变量
    print('--get num:',num )
    time.sleep()
    num  -= #对此公共变量进行-1操作
 
num =   #设定一个共享变量
thread_list = []
for i in range():
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
 
for t in thread_list: #等待所有线程执行完毕
    t.join()
 
 
print('final num:', num )

正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。

*注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁

加锁版本

 import time
import threading
 
def addNum():
    global num #在每个线程中都获取这个全局变量
    print('--get num:',num )
    time.sleep()
    lock.acquire() #修改数据前加锁
    num  -= #对此公共变量进行-1操作
    lock.release() #修改后释放
 
num =   #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for i in range():
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
 
for t in thread_list: #等待所有线程执行完毕
    t.join()
 
print('final num:', num )

  

RLock(递归锁)

说白了就是在一个大锁中还要再包含子锁

 import threading,time

 def run1():
print("grab the first part data")
lock.acquire()
global num
num +=
lock.release()
return num
def run2():
print("grab the second part data")
lock.acquire()
global num2
num2+=
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res,res2) if __name__ == '__main__': num,num2 = ,
lock = threading.RLock()
for i in range():
t = threading.Thread(target=run3)
t.start() while threading.active_count() != :
print(threading.active_count())
else:
print('----all threads done---')
print(num,num2)

  

Semaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

 import threading,time

 def run(n):
semaphore.acquire()
time.sleep()
print("run the thread: %s\n" %n)
semaphore.release() if __name__ == '__main__': num=
semaphore = threading.BoundedSemaphore() #最多允许5个线程同时运行
for i in range():
t = threading.Thread(target=run,args=(i,))
t.start() while threading.active_count() != :
pass #print threading.active_count()
else:
print('----all threads done---')
print(num)

Events

An event is a simple synchronization object;

the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event()

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

clear:将“Flag”设置为False
set:将“Flag”设置为True

通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

 import threading,time
import random
def light():
    if not event.isSet():
        event.set() #wait就不阻塞 #绿灯状态
    count =
    while True:
        if count < :
            print('\033[42;1m--green light on---\033[0m')
        elif count <:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count <:
            if event.isSet():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count =
            event.set() #打开绿灯
        time.sleep()
        count +=
def car(n):
    while :
        time.sleep(random.randrange())
        if  event.isSet(): #绿灯
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range():
        t = threading.Thread(target=car,args=(i,))
        t.start()

  

queue队列

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out 
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty  
Queue.full() # return True if full 
Queue.put(itemblock=Truetimeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)

Equivalent to put(item, False).

Queue.get(block=Truetimeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()

Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消费完毕

生产者消费者模型

 import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
count =
while count <:
time.sleep(random.randrange())
q.put(count)
print('Producer %s has produced %s baozi..' %(name, count))
count +=
def Consumer(name):
count =
while count <:
time.sleep(random.randrange())
if not q.empty():
data = q.get()
print(data)
print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
else:
print("-----no baozi anymore----")
count +=
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()

  

多进程multiprocessing

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

 from multiprocessing import Process
import time
def f(name):
time.sleep()
print('hello', name) if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()

注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。

To show the individual process IDs involved, here is an expanded example:

 from multiprocessing import Process
import os def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
print("\n\n") def f(name):
info('\033[31;1mfunction f\033[0m')
print('hello', name) if __name__ == '__main__':
info('\033[32;1mmain process line\033[0m')
p = Process(target=f, args=('bob',))
p.start()
p.join()

进程间通讯  

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

Queues

使用方法跟threading里的queue差不多

 from multiprocessing import Process, Queue

 def f(q):
q.put([, None, 'hello']) if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()

Pipes

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

 from multiprocessing import Process, Pipe

 def f(conn):
conn.send([, None, 'hello'])
conn.close() if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

Managers

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example,

#没有共享数据
from multiprocessing import Process
import time
li = [] def foo(i):
li.append(i)
print(''say hi",li) if __name__=='__main__':
for i in range():
p=Process(target=foo,args=(i,))
p.start() print('ending',li) #方法一:Array
from multiprocessing import Process,Array
temp = Array('i', [,,,])
 
def Foo(i):
    temp[i] = +i
    for item in temp:
        print i,'----->',item
if __name__=='__main__': 
for i in range():
     p = Process(target=Foo,args=(i,))
     p.start() #方法二:manage.dict()共享数据
from multiprocessing import Process, Manager def f(d, l):
d[] = ''
d[''] =
d[0.25] = None
l.append()
print(l) if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range())
p_list = []
for i in range():
p = Process(target=f, args=(d, l))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)

当创建进程时(非使用时),共享数据会被拿到子进程中,当进程中执行完毕后,再赋值给原值。

进程锁实例

 from multiprocessing import Process, Array, RLock

 def Foo(lock,temp,i):
"""
将第0个数加100
"""
lock.acquire()
temp[] = +i
for item in temp:
print(i,'----->',item)
lock.release() lock = RLock()
temp = Array('i', [, , , ])
if __name__=='__main__': for i in range():
p = Process(target=Foo,args=(lock,temp,i,))
p.start()

进程同步

Without using the lock output from the different processes is liable to get all mixed up.

 from multiprocessing import Process, Lock

 def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release() if __name__ == '__main__':
lock = Lock() for num in range():
Process(target=f, args=(lock, num)).start()

  

进程池  

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply
  • apply_async
 #!/usr/bin/env python
# -*- coding:utf- -*-
from multiprocessing import Process,Pool
import time def Foo(i):
time.sleep()
print('第%s次'%i)
return i+
def Bar(arg):
print('what-->',arg)
#print pool.apply(Foo,(,))
#print pool.apply_async(func =Foo, args=(,)).get()
if __name__=='__main__':
pool = Pool() #创建5个有进程的进程池
for i in range():
pool.apply_async(func=Foo, args=(i,),callback=Bar) #callback是回调
print('end')
pool.close() #先写close,再写join
pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

协程

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程的好处:

无需线程上下文切换的开销
无需原子操作锁定及同步的开销
方便切换控制流,简化编程模型
高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:

无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
使用yield实现协程操作例子

 import time
import queue
def consumer(name):
print("--->starting eating baozi...")
while True:
new_baozi = yield
print("[%s] is eating baozi %s" % (name,new_baozi))
#time.sleep() def producer():
r = con.__next__()
r = con2.__next__()
n =
while n < :
n +=
con.send(n)
con2.send(n)
print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) if __name__ == '__main__':
con = consumer("c1")
con2 = consumer("c2")
p = producer()

Greenlet

 from greenlet import greenlet
  
def test1():
    print
    gr2.switch()
    print
    gr2.switch()
  
  
def test2():
    print
    gr1.switch()
    print
  
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

Gevent

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

 import gevent

 def foo():
print('Running in foo')
gevent.sleep()
print('Explicit context switch to foo again') def bar():
print('Explicit context to bar')
gevent.sleep()
print('Implicit context switch back to bar') gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])

同步与异步的性能区别 

 import gevent

 def task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(0.5)
print('Task %s done' % pid) def synchronous():
for i in range(,):
task(i) def asynchronous():
threads = [gevent.spawn(task, i) for i in range()]
gevent.joinall(threads) print('Synchronous:')
synchronous() print('Asynchronous:')
asynchronous()

上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。  

遇到IO阻塞时会自动切换任务

 from gevent import monkey; monkey.patch_all()
import gevent
from urllib.request import urlopen def f(url):
print('GET: %s' % url)
resp = urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])

通过gevent实现单线程下的多socket并发

server side 

 import sys
import socket
import time
import gevent from gevent import socket,monkey
monkey.patch_all()
def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen()
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
def handle_request(s):
try:
while True:
data = s.recv()
print("recv:", data)
s.send(data)
if not data:
s.shutdown(socket.SHUT_WR) except Exception as ex:
print(ex)
finally: s.close()
if __name__ == '__main__':
server()

client side

 import socket

 HOST = 'localhost'    # The remote host
PORT = # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"),encoding="utf8")
s.sendall(msg)
data = s.recv()
#print(data) print('Received', repr(data))
s.close()

 

转载--> http://www.cnblogs.com/wupeiqi/articles/5040827.html