1 # 进程/线程/协程
2 # IO:同步/异步/阻塞/非阻塞
3 # greenlet gevent
4 # 事件驱动与异步IO
5 # Select\Poll\Epoll异步IO 以及selectors模块
6 # Python队列/RabbitMQ队列
7
8 ##############################################################################################
9 1.什么是进程?进程和程序之间有什么区别?
10 进程:一个程序的执行实例称为进程;
11 每个进程都提供执行程序所需的资源。
12 进程有一个虚拟地址空间、可执行代码、对系统对象的开放句柄、一个安全上下文、一个惟一的进程标识符、环境变量、一个优先级类、最小和最大工作集大小,以及至少一个执行线程;
13 每个进程都由一个线程启动,这个线程通常被称为主线程,但是可以从它的任何线程中创建额外的线程;
14 程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程;
15 程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
16 在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行,大大提高了CPU的利用率
17 2.什么是线程?
18 进程的缺点有:
19 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
20 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。
21 线程是操作系统能够进行运算调度的最小单位。
22 它被包含在进程之中,是进程中的实际运作单位。
23 一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
24 线程是一个执行上下文,它是一个CPU用来执行指令流的所有信息。
25 3.进程和线程之间的关系?
26 线程共享创建它的进程的地址空间;进程有自己的地址空间。(内存地址)
27 线程可以直接访问其进程的数据段;进程有自己的父进程数据段的副本。
28 线程可以直接与进程的其他线程通信;进程必须使用进程间通信来与兄弟进程通信。
29 新线程很容易创建;新进程需要复制父进程。
30 线程可以对同一进程的线程进行相当大的控制;进程只能对子进程执行控制。
31 对主线程的更改(取消、优先级更改等)可能会影响流程的其他线程的行为;对父进程的更改不会影响子进程。
32 4.python GIL全局解释器锁
33 无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行
34 http: // www.dabeaz.com / python / UnderstandingGIL.pdf
35 5.Python threading模块的使用
36 基本调用方式1
37 import threading
38 import time
39
40
41 def sayhi(num): # 定义每个线程要运行的函数
42
43 print("running on number:%s" % num)
44
45 time.sleep(3)
46
47
48 if __name__ == '__main__':
49 t1 = threading.Thread(target=sayhi, args=(1,)) # 生成一个线程实例
50 t2 = threading.Thread(target=sayhi, args=(2,)) # 生成另一个线程实例
51
52 t1.start() # 启动线程
53 t2.start() # 启动另一个线程
54
55 print(t1.getName()) # 获取线程名
56 print(t2.getName())
57 基本调用方式2
58 import threading
59 import time
60
61
62 class MyThread(threading.Thread):
63 def __init__(self, num):
64 threading.Thread.__init__(self)
65 self.num = num
66
67 def run(self): # 定义每个线程要运行的函数
68
69 print("running on number:%s" % self.num)
70
71 time.sleep(3)
72
73
74 if __name__ == '__main__':
75 t1 = MyThread(1)
76 t2 = MyThread(2)
77 t1.start()
78 t2.start()
79 6.守护线程Daemon:
80 非守护进程线程退出,就可以将守护线程杀死。
81 # _*_coding:utf-8_*_
82
83 import time
84 import threading
85
86
87 def run(n):
88 print('[%s]------running----\n' % n)
89 time.sleep(2)
90 print('--done--')
91
92
93 def main():
94 for i in range(5):
95 t = threading.Thread(target=run, args=[i, ])
96 t.start()
97 t.join(1)
98 print('starting thread', t.getName())
99
100
101 m = threading.Thread(target=main, args=[])
102 m.setDaemon(True) # 将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
103 m.start()
104 m.join(timeout=2)
105 print("---main thread done----")
106 7.线程锁(互斥锁)
107 一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,可能会导致数据被同时修改而使得计算结果不准确(重复赋值)
108 import time
109 import threading
110
111
112 def addNum():
113 global num # 在每个线程中都获取这个全局变量
114 print('--get num:', num)
115 time.sleep(1)
116 num -= 1 # 对此公共变量进行-1操作
117
118
119 num = 100 # 设定一个共享变量
120 thread_list = []
121 for i in range(100):
122 t = threading.Thread(target=addNum)
123 t.start()
124 thread_list.append(t)
125
126 for t in thread_list: # 等待所有线程执行完毕
127 t.join()
128
129 print('final num:', num)
130 加上线程锁
131 import time
132 import threading
133
134
135 def addNum():
136 global num # 在每个线程中都获取这个全局变量
137 print('--get num:', num)
138 time.sleep(1)
139 lock.acquire() # 修改数据前加锁
140 num -= 1 # 对此公共变量进行-1操作
141 lock.release() # 修改后释放
142
143
144 num = 100 # 设定一个共享变量
145 thread_list = []
146 lock = threading.Lock() # 生成全局锁
147 for i in range(100):
148 t = threading.Thread(target=addNum)
149 t.start()
150 thread_list.append(t)
151
152 for t in thread_list: # 等待所有线程执行完毕
153 t.join()
154
155 print('final num:', num)
156 8.线程锁与GIL之间的关系?
157 加入GIL主要的原因是为了降低程序的开发的复杂度,
158 比如现在的你写python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,
159 每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序里的线程和 py解释器自己的线程是并发运行的,
160 假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,
161 可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,
162 为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动
163 9.递归锁(Rlock 不用递归锁而多重加lock锁会导致被锁住,程序卡死)
164 import threading, time
165
166
167 def run1():
168 print("grab the first part data")
169 lock.acquire()
170 global num
171 num += 1
172 lock.release()
173 return num
174
175
176 def run2():
177 print("grab the second part data")
178 lock.acquire()
179 global num2
180 num2 += 1
181 lock.release()
182 return num2
183
184
185 def run3():
186 lock.acquire()
187 res = run1()
188 print('--------between run1 and run2-----')
189 res2 = run2()
190 lock.release()
191 print(res, res2)
192
193
194 if __name__ == '__main__':
195
196 num, num2 = 0, 0
197 lock = threading.RLock() #注意递归锁是Rlock
198 for i in range(10):
199 t = threading.Thread(target=run3)
200 t.start()
201
202 while threading.active_count() != 1:
203 print(threading.active_count())
204 else:
205 print('----all threads done---')
206 print(num, num2)
207 10.Semaphore(信号量):
208 同时允许一定数量的线程更改数据
209 import threading, time
210
211 def run(n):
212 semaphore.acquire()
213 time.sleep(1)
214 print("run the thread: %s\n" % n)
215 semaphore.release()
216
217 if __name__ == '__main__':
218
219 num = 0
220 semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行
221 for i in range(20):
222 t = threading.Thread(target=run, args=(i,))
223 t.start()
224
225 while threading.active_count() != 1:
226 pass # print threading.active_count()
227 else:
228 print('----all threads done---')
229 print(num)
230 11.Events事件:通过Event来实现两个或多个线程间的交互
231 event = threading.Event()
232 # a client thread can wait for the flag to be set
233 event.wait()
234 # a server thread can set or reset it
235 event.set()
236 event.clear()
237 一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则:
238 import threading,time
239 import random
240 def light():
241 if not event.isSet():
242 event.set() #wait就不阻塞 #绿灯状态
243 count = 0
244 while True:
245 if count < 10:
246 print('\033[42;1m--green light on---\033[0m')
247 elif count <13:
248 print('\033[43;1m--yellow light on---\033[0m')
249 elif count <20:
250 if event.isSet():
251 event.clear()
252 print('\033[41;1m--red light on---\033[0m')
253 else:
254 count = 0
255 event.set() #打开绿灯
256 time.sleep(1)
257 count +=1
258 def car(n):
259 while 1:
260 time.sleep(random.randrange(10))
261 if event.isSet(): #绿灯
262 print("car [%s] is running.." % n)
263 else:
264 print("car [%s] is waiting for the red light.." %n)
265 if __name__ == '__main__':
266 event = threading.Event()
267 Light = threading.Thread(target=light)
268 Light.start()
269 for i in range(3):
270 t = threading.Thread(target=car,args=(i,))
271 t.start()
272 12.python queue队列(线程队列)
273 class queue.Queue(maxsize=0) #先入先出
274 class queue.LifoQueue(maxsize=0) #last in fisrt out
275 class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
276 Queue.qsize()
277 Queue.empty() #return True if empty
278 Queue.full() # return True if full
279 Queue.put(item, block=True, timeout=None) #将项目放入队列中。如果可选的args块是true,则超时为None(缺省值),如果需要则阻塞,直到空闲槽可用。如果超时是一个正数,它会在大多数超时秒中阻塞,如果在那个时间内没有空闲槽,则会引发完全的异常。否则(块是false),如果一个空闲槽立即可用,则在队列上放置一个项,否则就会抛出完全异常(在这种情况下会忽略超时)。
280 Queue.put_nowait(item) #Equivalent to put(item, False).
281 Queue.get(block=True, timeout=None) #从队列中删除并返回一个项目。如果可选的args块是true,则超时为None(缺省值),如果需要则阻塞,直到有可用的项。如果超时是一个正数,它会在大多数超时秒中阻塞,如果在那个时间内没有可用的项,则会抛出空的异常。否则(块是false),如果立即可用,返回一个项目,否则将抛出空异常(在这种情况下忽略超时)。
282 Queue.get_nowait() #Equivalent to get(False).
283 两种方法来支持跟踪队列的任务是否已经被守护进程的消费者线程完全地处理。
284 Queue.task_done()
285 Queue.join() block直到queue被消费完毕
286 13.生产者消费者模型
287 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。
288 生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,
289 所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,
290 消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
291 import threading
292 import queue
293
294
295 def producer():
296 for i in range(10):
297 q.put("骨头 %s" % i)
298
299 print("开始等待所有的骨头被取走...")
300 q.join()
301 print("所有的骨头被取完了...")
302
303
304 def consumer(n):
305 while q.qsize() > 0:
306 print("%s 取到" % n, q.get())
307 q.task_done() # 告知这个任务执行完了
308
309
310 q = queue.Queue()
311
312 p = threading.Thread(target=producer, )
313 p.start()
314
315 c1 = consumer("李闯")
316
317
318 import time,random
319 import queue,threading
320 q = queue.Queue()
321 def Producer(name):
322 count = 0
323 while count <20:
324 time.sleep(random.randrange(3))
325 q.put(count)
326 print('Producer %s has produced %s baozi..' %(name, count))
327 count +=1
328 def Consumer(name):
329 count = 0
330 while count <20:
331 time.sleep(random.randrange(4))
332 if not q.empty():
333 data = q.get()
334 print(data)
335 print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
336 else:
337 print("-----no baozi anymore----")
338 count +=1
339 p1 = threading.Thread(target=Producer, args=('A',))
340 c1 = threading.Thread(target=Consumer, args=('B',))
341 p1.start()
342 c1.start()
343 14.多进程模块multiprocessing
344 from multiprocessing import Process
345 import time
346
347
348 def f(name):
349 time.sleep(2)
350 print('hello', name)
351
352
353 if __name__ == '__main__':
354 p = Process(target=f, args=('bob',))
355 p.start()
356 p.join()
357 14.1展示进程号:
358 from multiprocessing import Process
359 import os
360
361
362 def info(title):
363 print(title)
364 print('module name:', __name__)
365 print('parent process:', os.getppid())
366 print('process id:', os.getpid())
367 print("\n\n")
368
369
370 def f(name):
371 info('\033[31;1mfunction f\033[0m')
372 print('hello', name)
373
374
375 if __name__ == '__main__':
376 info('\033[32;1mmain process line\033[0m')
377 p = Process(target=f, args=('bob',))
378 p.start()
379 p.join()
380 14.2进程间通讯
381 14.2.1Queues方法
382 from multiprocessing import Process, Queue
383
384
385 def f(q):
386 q.put([42, None, 'hello'])
387
388
389 if __name__ == '__main__':
390 q = Queue()
391 p = Process(target=f, args=(q,))
392 p.start()
393 print(q.get()) # prints "[42, None, 'hello']"
394 p.join()
395 14.2.2Pipes方法
396 from multiprocessing import Process, Pipe
397
398
399 def f(conn):
400 conn.send([42, None, 'hello'])
401 conn.close()
402
403
404 if __name__ == '__main__':
405 parent_conn, child_conn = Pipe()
406 p = Process(target=f, args=(child_conn,))
407 p.start()
408 print(parent_conn.recv()) # prints "[42, None, 'hello']"
409 p.join()
410 14.2.3Managers方法
411 from multiprocessing import Process, Manager
412
413
414 def f(d, l):
415 d[1] = '1'
416 d['2'] = 2
417 d[0.25] = None
418 l.append(1)
419 print(l)
420
421
422 if __name__ == '__main__':
423 with Manager() as manager:
424 d = manager.dict()
425
426 l = manager.list(range(5))
427 p_list = []
428 for i in range(10):
429 p = Process(target=f, args=(d, l))
430 p.start()
431 p_list.append(p)
432 for res in p_list:
433 res.join()
434
435 print(d)
436 print(l)
437 14.3进程同步
438 from multiprocessing import Process, Lock
439
440
441 def f(l, i):
442 l.acquire()
443 try:
444 print('hello world', i)
445 finally:
446 l.release()
447
448
449 if __name__ == '__main__':
450 lock = Lock()
451
452 for num in range(10):
453 Process(target=f, args=(lock, num)).start()
454 15.进程池
455 进程池中有两个方法:
456 apply;
457 apply_async;
458 from multiprocessing import Process, Pool
459 import time
460 def Foo(i):
461 time.sleep(2)
462 return i + 100
463 def Bar(arg):
464 print('-->exec done:', arg)
465 pool = Pool(5)
466 for i in range(10):
467 pool.apply_async(func=Foo, args=(i,), callback=Bar)
468 # pool.apply(func=Foo, args=(i,))
469 print('end')
470 pool.close()
471 pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
472
473
474 16.协程:
475 协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
476 协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
477 协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
478 协程的好处:
479 无需线程上下文切换的开销
480 无需原子操作锁定及同步的开销
481 原子操作(atomic operation)是不需要同步的,所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。
482 原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
483 方便切换控制流,简化编程模型
484 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理
485 缺点:
486 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
487 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
488 16.1利用yield实现伪协程
489 import time
490 import queue
491
492
493 def consumer(name):
494 print("--->starting eating baozi...")
495 while True:
496 new_baozi = yield
497 print("[%s] is eating baozi %s" % (name, new_baozi))
498 # time.sleep(1)
499
500
501 def producer():
502 r = con.__next__()
503 r = con2.__next__()
504 n = 0
505 while n < 5:
506 n += 1
507 con.send(n)
508 con2.send(n)
509 print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
510
511
512 if __name__ == '__main__':
513 con = consumer("c1")
514 con2 = consumer("c2")
515 p = producer()
516 16.2协程的特点
517 必须在只有一个单线程里实现并发
518 修改共享数据不需加锁
519 用户程序里自己保存多个控制流的上下文栈
520 一个协程遇到IO操作自动切换到其它协程
521 16.3Greenlet实现协程(手动)
522 # -*- coding:utf-8 -*-
523 from greenlet import greenlet
524 def test1():
525 print(12)
526 gr2.switch()
527 print(34)
528 gr2.switch()
529 def test2():
530 print(56)
531 gr1.switch()
532 print(78)
533 gr1 = greenlet(test1)
534 gr2 = greenlet(test2)
535 gr1.switch()
536 16.4Gevent 协程自动切换
537 import gevent
538
539
540 def func1():
541 print('\033[31;1m李闯在跟海涛搞...\033[0m')
542 gevent.sleep(2)
543 print('\033[31;1m李闯又回去跟继续跟海涛搞...\033[0m')
544
545
546 def func2():
547 print('\033[32;1m李闯切换到了跟海龙搞...\033[0m')
548 gevent.sleep(1)
549 print('\033[32;1m李闯搞完了海涛,回来继续跟海龙搞...\033[0m')
550
551
552 gevent.joinall([
553 gevent.spawn(func1),
554 gevent.spawn(func2),
555 # gevent.spawn(func3),
556 ])
557 16.5 比较同步与异步的性能差别
558 from gevent import monkey;
559
560 monkey.patch_all()
561 import gevent
562 from urllib.request import urlopen
563
564
565 def f(url):
566 print('GET: %s' % url)
567 resp = urlopen(url)
568 data = resp.read()
569 print('%d bytes received from %s.' % (len(data), url))
570
571
572 gevent.joinall([
573 gevent.spawn(f, 'https://www.python.org/'),
574 gevent.spawn(f, 'https://www.yahoo.com/'),
575 gevent.spawn(f, 'https://github.com/'),
576 ])
577 17.通过gevent实现单线程下的多socket并发
578 17.1server side :
579 import sys
580 import socket
581 import time
582 import gevent
583
584 from gevent import socket, monkey
585
586 monkey.patch_all()
587
588
589 def server(port):
590 s = socket.socket()
591 s.bind(('0.0.0.0', port))
592 s.listen(500)
593 while True:
594 cli, addr = s.accept()
595 gevent.spawn(handle_request, cli)
596
597
598 def handle_request(conn):
599 try:
600 while True:
601 data = conn.recv(1024)
602 print("recv:", data)
603 conn.send(data)
604 if not data:
605 conn.shutdown(socket.SHUT_WR)
606
607 except Exception as ex:
608 print(ex)
609 finally:
610 conn.close()
611
612
613 if __name__ == '__main__':
614 server(8001)
615 17.2client side :
616 import socket
617
618 HOST = 'localhost' # The remote host
619 PORT = 8001 # The same port as used by the server
620 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
621 s.connect((HOST, PORT))
622 while True:
623 msg = bytes(input(">>:"), encoding="utf8")
624 s.sendall(msg)
625 data = s.recv(1024)
626 # print(data)
627
628 print('Received', repr(data))
629 s.close()
630 18.事件驱动与异步IO
631 方式一:创建一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有以下几个缺点:
632 1. CPU资源浪费,可能鼠标点击的频率非常小,但是扫描线程还是会一直循环检测,这会造成很多的CPU资源浪费;如果扫描鼠标点击的接口是阻塞的呢?
633 2. 如果是堵塞的,又会出现下面这样的问题,如果我们不但要扫描鼠标点击,还要扫描键盘是否按下,由于扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;
634 3. 如果一个循环需要扫描的设备非常多,这又会引来响应时间的问题;
635 所以,该方式是非常不好的。
636
637 方式二:就是事件驱动模型
638 目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:
639 1. 有一个事件(消息)队列;
640 2. 鼠标按下时,往这个队列中增加一个点击事件(消息);
641 3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;
642 4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;
643
644 当我们面对如下的环境时,事件驱动模型通常是一个好的选择:
645 1.程序中有许多任务,而且…
646 2.任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
647 3.在等待事件到来时,某些任务会阻塞。
648 4.当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。
649 网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。
650 19.select多并发socket
651 #_*_coding:utf-8_*_
652 __author__ = 'Alex Li'
653
654 import select
655 import socket
656 import sys
657 import queue
658
659
660 server = socket.socket()
661 server.setblocking(0)
662
663 server_addr = ('localhost',10000)
664
665 print('starting up on %s port %s' % server_addr)
666 server.bind(server_addr)
667
668 server.listen(5)
669
670
671 inputs = [server, ] #自己也要监测呀,因为server本身也是个fd
672 outputs = []
673
674 message_queues = {}
675
676 while True:
677 print("waiting for next event...")
678
679 readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里
680
681 for s in readable: #每个s就是一个socket
682
683 if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,
684 #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀
685 #新连接进来了,接受这个连接
686 conn, client_addr = s.accept()
687 print("new connection from",client_addr)
688 conn.setblocking(0)
689 inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
690 #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到
691 #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的
692
693 message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送
694
695 else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了
696 #客户端的数据过来了,在这接收
697 data = s.recv(1024)
698 if data:
699 print("收到来自[%s]的数据:" % s.getpeername()[0], data)
700 message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端
701 if s not in outputs:
702 outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端
703
704
705 else:#如果收不到data代表什么呢? 代表客户端断开了呀
706 print("客户端断开了",s)
707
708 if s in outputs:
709 outputs.remove(s) #清理已断开的连接
710
711 inputs.remove(s) #清理已断开的连接
712
713 del message_queues[s] ##清理已断开的连接
714
715
716 for s in writeable:
717 try :
718 next_msg = message_queues[s].get_nowait()
719
720 except queue.Empty:
721 print("client [%s]" %s.getpeername()[0], "queue is empty..")
722 outputs.remove(s)
723
724 else:
725 print("sending msg to [%s]"%s.getpeername()[0], next_msg)
726 s.send(next_msg.upper())
727
728
729 for s in exeptional:
730 print("handling exception for ",s.getpeername())
731 inputs.remove(s)
732 if s in outputs:
733 outputs.remove(s)
734 s.close()
735
736 del message_queues[s]
737
738 #_*_coding:utf-8_*_
739 __author__ = 'Alex Li'
740
741
742 import socket
743 import sys
744
745 messages = [ b'This is the message. ',
746 b'It will be sent ',
747 b'in parts.',
748 ]
749 server_address = ('localhost', 10000)
750
751 # Create a TCP/IP socket
752 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
753 socket.socket(socket.AF_INET, socket.SOCK_STREAM),
754 ]
755
756 # Connect the socket to the port where the server is listening
757 print('connecting to %s port %s' % server_address)
758 for s in socks:
759 s.connect(server_address)
760
761 for message in messages:
762
763 # Send messages on both sockets
764 for s in socks:
765 print('%s: sending "%s"' % (s.getsockname(), message) )
766 s.send(message)
767
768 # Read responses on both sockets
769 for s in socks:
770 data = s.recv(1024)
771 print( '%s: received "%s"' % (s.getsockname(), data) )
772 if not data:
773 print(sys.stderr, 'closing socket', s.getsockname() )
774
775 20.selectors模块
776 import selectors
777 import socket
778
779 sel = selectors.DefaultSelector()
780
781
782 def accept(sock, mask):
783 conn, addr = sock.accept() # Should be ready
784 print('accepted', conn, 'from', addr)
785 conn.setblocking(False)
786 sel.register(conn, selectors.EVENT_READ, read)
787
788
789 def read(conn, mask):
790 data = conn.recv(1000) # Should be ready
791 if data:
792 print('echoing', repr(data), 'to', conn)
793 conn.send(data) # Hope it won't block
794 else:
795 print('closing', conn)
796 sel.unregister(conn)
797 conn.close()
798
799
800 sock = socket.socket()
801 sock.bind(('localhost', 10000))
802 sock.listen(100)
803 sock.setblocking(False)
804 sel.register(sock, selectors.EVENT_READ, accept)
805
806 while True:
807 events = sel.select()
808 for key, mask in events:
809 callback = key.data
810 callback(key.fileobj, mask)
811
812 21.RabbitMQ队列
813 安装 http://www.rabbitmq.com/install-standalone-mac.html
814 安装python rabbitMQ module
815 pip install pika
816 or
817 easy_install pika
818 or
819 源码
820 https://pypi.python.org/pypi/pika
821
822 21.1send端
823 # !/usr/bin/env python
824 import pika
825
826 connection = pika.BlockingConnection(pika.ConnectionParameters(
827 'localhost'))
828 channel = connection.channel()
829
830 # 声明queue
831 channel.queue_declare(queue='hello')
832
833 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
834 channel.basic_publish(exchange='',
835 routing_key='hello',
836 body='Hello World!')
837 print(" [x] Sent 'Hello World!'")
838 connection.close()
839
840 21.2receive端
841 # _*_coding:utf-8_*_
842 __author__ = 'Alex Li'
843 import pika
844
845 connection = pika.BlockingConnection(pika.ConnectionParameters(
846 'localhost'))
847 channel = connection.channel()
848
849 # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
850 # We could avoid that if we were sure that the queue already exists. For example if send.py program
851 # was run before. But we're not yet sure which program to run first. In such cases it's a good
852 # practice to repeat declaring the queue in both programs.
853 channel.queue_declare(queue='hello')
854
855
856 def callback(ch, method, properties, body):
857 print(" [x] Received %r" % body)
858
859
860 channel.basic_consume(callback,
861 queue='hello',
862 no_ack=True)
863
864 print(' [*] Waiting for messages. To exit press CTRL+C')
865 channel.start_consuming()
866
867 21.3 Work Queues
868 21.3.1消息提供者代码
869 import pika
870 import time
871
872 connection = pika.BlockingConnection(pika.ConnectionParameters(
873 'localhost'))
874 channel = connection.channel()
875
876 # 声明queue
877 channel.queue_declare(queue='task_queue')
878
879 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
880 import sys
881
882 message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
883 channel.basic_publish(exchange='',
884 routing_key='task_queue',
885 body=message,
886 properties=pika.BasicProperties(
887 delivery_mode=2, # make message persistent
888 )
889 )
890 print(" [x] Sent %r" % message)
891 connection.close()
892 21.3.2消费者代码
893 # _*_coding:utf-8_*_
894
895 import pika, time
896
897 connection = pika.BlockingConnection(pika.ConnectionParameters(
898 'localhost'))
899 channel = connection.channel()
900
901
902 def callback(ch, method, properties, body):
903 print(" [x] Received %r" % body)
904 time.sleep(20)
905 print(" [x] Done")
906 print("method.delivery_tag", method.delivery_tag)
907 ch.basic_ack(delivery_tag=method.delivery_tag)
908
909
910 channel.basic_consume(callback,
911 queue='task_queue',
912 no_ack=True
913 )
914
915 print(' [*] Waiting for messages. To exit press CTRL+C')
916 channel.start_consuming()
917 21.3.3消息持久化
918 First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
919 --channel.queue_declare(queue='hello', durable=True)
920 Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with different name, for exampletask_queue:
921 --channel.queue_declare(queue='task_queue', durable=True)
922 This queue_declare change needs to be applied to both the producer and consumer code.
923 At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.
924 --channel.basic_publish(exchange='',
925 routing_key="task_queue",
926 body=message,
927 properties=pika.BasicProperties(
928 delivery_mode=2, # make message persistent
929 ))
930 21.3.4消息公平分发
931 channel.basic_qos(prefetch_count=1)
932
933 21.3.5带消息持久化+公平分发的完整代码
934 生产者端
935 # !/usr/bin/env python
936 import pika
937 import sys
938
939 connection = pika.BlockingConnection(pika.ConnectionParameters(
940 host='localhost'))
941 channel = connection.channel()
942
943 channel.queue_declare(queue='task_queue', durable=True)
944
945 message = ' '.join(sys.argv[1:]) or "Hello World!"
946 channel.basic_publish(exchange='',
947 routing_key='task_queue',
948 body=message,
949 properties=pika.BasicProperties(
950 delivery_mode=2, # make message persistent
951 ))
952 print(" [x] Sent %r" % message)
953 connection.close()
954 消费者端
955 # !/usr/bin/env python
956 import pika
957 import time
958
959 connection = pika.BlockingConnection(pika.ConnectionParameters(
960 host='localhost'))
961 channel = connection.channel()
962
963 channel.queue_declare(queue='task_queue', durable=True)
964 print(' [*] Waiting for messages. To exit press CTRL+C')
965
966
967 def callback(ch, method, properties, body):
968 print(" [x] Received %r" % body)
969 time.sleep(body.count(b'.'))
970 print(" [x] Done")
971 ch.basic_ack(delivery_tag=method.delivery_tag)
972
973
974 channel.basic_qos(prefetch_count=1)
975 channel.basic_consume(callback,
976 queue='task_queue')
977
978 channel.start_consuming()
979 21.3.5Publish\Subscribe(消息发布\订阅)
980 之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
981
982 An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
983
984 Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
985
986
987 fanout: 所有bind到此exchange的queue都可以接收消息
988 direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
989 topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
990
991 表达式符号说明:#代表一个或多个字符,*代表任何字符
992 例:#.a会匹配a.a,aa.a,aaa.a等
993 *.a会匹配a.a,b.a,c.a等
994 注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
995
996 headers: 通过headers 来决定把消息发给哪些queue
997
998 21.3.6消息publisher
999
1000 import pika
1001 import sys
1002
1003 connection = pika.BlockingConnection(pika.ConnectionParameters(
1004 host='localhost'))
1005 channel = connection.channel()
1006
1007 channel.exchange_declare(exchange='logs',
1008 type='fanout')
1009
1010 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
1011 channel.basic_publish(exchange='logs',
1012 routing_key='',
1013 body=message)
1014 print(" [x] Sent %r" % message)
1015 connection.close()
1016
1017 21.3.7消息subscriber
1018 # _*_coding:utf-8_*_
1019 __author__ = 'Alex Li'
1020 import pika
1021
1022 connection = pika.BlockingConnection(pika.ConnectionParameters(
1023 host='localhost'))
1024 channel = connection.channel()
1025
1026 channel.exchange_declare(exchange='logs',
1027 type='fanout')
1028
1029 result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
1030 queue_name = result.method.queue
1031
1032 channel.queue_bind(exchange='logs',
1033 queue=queue_name)
1034
1035 print(' [*] Waiting for logs. To exit press CTRL+C')
1036
1037
1038 def callback(ch, method, properties, body):
1039 print(" [x] %r" % body)
1040
1041
1042 channel.basic_consume(callback,
1043 queue=queue_name,
1044 no_ack=True)
1045
1046 channel.start_consuming()
1047 21.3.8有选择的接收消息(exchange type=direct)
1048 RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
1049 publisher:
1050 import pika
1051 import sys
1052
1053 connection = pika.BlockingConnection(pika.ConnectionParameters(
1054 host='localhost'))
1055 channel = connection.channel()
1056
1057 channel.exchange_declare(exchange='direct_logs',
1058 type='direct')
1059
1060 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
1061 message = ' '.join(sys.argv[2:]) or 'Hello World!'
1062 channel.basic_publish(exchange='direct_logs',
1063 routing_key=severity,
1064 body=message)
1065 print(" [x] Sent %r:%r" % (severity, message))
1066 connection.close()
1067
1068 subscriber :
1069 import pika
1070 import sys
1071
1072 connection = pika.BlockingConnection(pika.ConnectionParameters(
1073 host='localhost'))
1074 channel = connection.channel()
1075
1076 channel.exchange_declare(exchange='direct_logs',
1077 type='direct')
1078
1079 result = channel.queue_declare(exclusive=True)
1080 queue_name = result.method.queue
1081
1082 severities = sys.argv[1:]
1083 if not severities:
1084 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
1085 sys.exit(1)
1086
1087 for severity in severities:
1088 channel.queue_bind(exchange='direct_logs',
1089 queue=queue_name,
1090 routing_key=severity)
1091
1092 print(' [*] Waiting for logs. To exit press CTRL+C')
1093
1094
1095 def callback(ch, method, properties, body):
1096 print(" [x] %r:%r" % (method.routing_key, body))
1097
1098
1099 channel.basic_consume(callback,
1100 queue=queue_name,
1101 no_ack=True)
1102
1103 21.3.9 更细致的消息过滤
1104 Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.
1105 In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).
1106 That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.
1107
1108 publisher:
1109 import pika
1110 import sys
1111
1112 connection = pika.BlockingConnection(pika.ConnectionParameters(
1113 host='localhost'))
1114 channel = connection.channel()
1115
1116 channel.exchange_declare(exchange='topic_logs',
1117 type='topic')
1118
1119 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
1120 message = ' '.join(sys.argv[2:]) or 'Hello World!'
1121 channel.basic_publish(exchange='topic_logs',
1122 routing_key=routing_key,
1123 body=message)
1124 print(" [x] Sent %r:%r" % (routing_key, message))
1125 connection.close()
1126
1127 ------------------------------------------------------------------
1128 subscriber:
1129 import pika
1130 import sys
1131
1132 connection = pika.BlockingConnection(pika.ConnectionParameters(
1133 host='localhost'))
1134 channel = connection.channel()
1135
1136 channel.exchange_declare(exchange='topic_logs',
1137 type='topic')
1138
1139 result = channel.queue_declare(exclusive=True)
1140 queue_name = result.method.queue
1141
1142 binding_keys = sys.argv[1:]
1143 if not binding_keys:
1144 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
1145 sys.exit(1)
1146
1147 for binding_key in binding_keys:
1148 channel.queue_bind(exchange='topic_logs',
1149 queue=queue_name,
1150 routing_key=binding_key)
1151
1152 print(' [*] Waiting for logs. To exit press CTRL+C')
1153
1154
1155 def callback(ch, method, properties, body):
1156 print(" [x] %r:%r" % (method.routing_key, body))
1157
1158
1159 channel.basic_consume(callback,
1160 queue=queue_name,
1161 no_ack=True)
1162
1163 channel.start_consuming()
1164
1165 ---------------------------------------------------------------
1166 To receive all the logs run:
1167
1168 python receive_logs_topic.py "#"
1169 To receive all logs from the facility "kern":
1170
1171 python receive_logs_topic.py "kern.*"
1172 Or if you want to hear only about "critical" logs:
1173
1174 python receive_logs_topic.py "*.critical"
1175 You can create multiple bindings:
1176
1177 python receive_logs_topic.py "kern.*" "*.critical"
1178 And to emit a log with a routing key "kern.critical" type:
1179
1180 python emit_log_topic.py "kern.critical" "A critical kernel error"