并发编程:同步异步、队列、协程与实现方式

时间:2021-07-16 23:34:48

一、进程池回顾

二、同步异步

三、利用回调完成生产者消费者

四、利用回调完成生产者消费者

五、线程队列

六、协程实现

七、greenlet使用

八、gevent使用

 

一、进程池回顾

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time
# 默认按照cpu的核数
# pool = ProcessPoolExecutor()
# #
# # def task():
# # print("%s is running" % os.getpid())
# # time.sleep(0.2)
# #
# # if __name__ == '__main__':
# # for i in range(20):
# # pool.submit(task)
# # print("11111")




# 默认按照cpu的核数
pool = ThreadPoolExecutor()


# 方式1 自己来保存数据 并执行shutdown 仅在多线程

# res = []
# def task():
# print("%s is 正在打水" % os.getpid())
# time.sleep(0.2)
# w = "%s 打的水" % os.getpid()
# res.append(w)
# return w
#
# if __name__ == '__main__':
# for i in range(20):
# # 提交任务会返回一个对象 用于回去执行状态和结果
# f = pool.submit(task)
# print(f.result()) # 方式2 执行result 它是阻塞的直到任务完成 又变成串行了
#
# print("11111")
# # pool.shutdown() # 首先不允许提交新任务 然后等目前所有任务完成后
# # print(res)
# print("over")



# 默认按照cpu的核数
pool = ThreadPoolExecutor()


# 方式3 通过回调(什么是回调 任务执行结束后自动调用某个函数)


def task():
print("%s is 正在打水" % os.getpid())
# time.sleep(0.2)
w = "%s 打的水" % os.getpid()
return w


def task_finish(res):
print("打水完成! %s" % res)


if __name__ == '__main__':
for i in range(20):
# 提交任务会返回一个对象 用于回去执行状态和结果
f = pool.submit(task)
f.add_done_callback(task_finish) #添加完成后的回调
print("11111")
# pool.shutdown() # 首先不允许提交新任务 然后等目前所有任务完成后
# print(res)
print("over")

 

二、同步异步

"""
    线程的三种状态:
        1.就绪
        2.运行
        3.阻塞

    阻塞 遇到了IO操作  代码卡主 无法执行下一行  CPU会切换到其他任务

    非阻塞 与阻塞相反  代码正在执行(运行状态) 或处于就绪状态
    阻塞和非阻塞描述的是运行的状态

    同步 :提交任务必须等待任务完成,才能执行下一行
    异步 :提交任务不需要等待任务完成,立即执行下一行
    指的是一种提交任务的方式





"""

def task():
    for i in range(1000000):
        i += 1000
    print("11111")

print("start")
task() # 同步提交方式
print("end")

from threading import  Thread

print("start1")
Thread(target=task).start()  # 异步提交
print("end1")

 

三、利用回调完成生产者消费者

from  concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import  os
pool = ThreadPoolExecutor()

# 爬虫  1.从网络某个地址获取一个HTML文件

import requests # 该模块用于网络(HTTP)请求

# 生产数据
def get_data_task(url):
    print(os.getpid(),"正在生产数据!")
    # print(current_thread(),"正在生产数据!")

    response = requests.get(url)
    text = response.content.decode("utf-8")
    print(text)
    return text


#   处理数据
def parser_data(f):
    print(os.getpid(),"处理数据")
    # print(current_thread(), "处理数据")
    print("正在解析: 长度%s" % len(f.result()))


urls = [
    "http://www.baidu.com",
    "http://www.baidu.com",
    "http://www.baidu.com",
    "http://www.baidu.com"
]

if __name__ == '__main__':
    for url in urls:
        f = pool.submit(get_data_task,url)
        f.add_done_callback(parser_data)  # 回调函数是主进程在执行
        # 因为子进程是负责获取数据的  然而数据怎么处理 子进程并不知道  应该把数据还给主进程
    print("over")

 

四、利用回调完成生产者消费者

from  concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread

pool = ThreadPoolExecutor()

# 爬虫  1.从网络某个地址获取一个HTML文件

import requests # 该模块用于网络(HTTP)请求

# 生产数据
def get_data_task(url):
    # print(os.getpid(),"正在生产数据!")
    print(current_thread(),"正在生产数据!")

    response = requests.get(url)
    text = response.content.decode("utf-8")
    print(text)
    return text


#   处理数据
def parser_data(f):
    # print(os.getpid(),"处理数据")
    print(current_thread(), "处理数据")
    print("正在解析: 长度%s" % len(f.result()))


urls = [
    "http://www.baidu.com",
    "http://www.baidu.com",
    "http://www.baidu.com",
    "http://www.baidu.com"
]

if __name__ == '__main__':
    for url in urls:
        f = pool.submit(get_data_task,url)
        f.add_done_callback(parser_data)  # 回调函数是主进程在执行
        # 因为子进程是负责获取数据的  然而数据怎么处理 子进程并不知道  应该把数据还给主进程
    print("over")

 

五、线程队列

import queue

# 普通队列 先进先出
q = queue.Queue()
q.put("a")
q.put("b")


print(q.get())
print(q.get())

# 堆栈队列  先进后出 后进先出  函数调用就是进栈  函数结束就出栈 递归造成栈溢出
q2 = queue.LifoQueue()
q2.put("a")
q2.put("b")
print(q2.get())


# 优先级队列
q3 = queue.PriorityQueue()  # 数值越小优先级越高  优先级相同时 比较大小 小的先取
q3.put((-100,"c"))
q3.put((1,"a"))
q3.put((100,"b"))
print(q3.get())

 

六、协程实现

import time
def task():
    while True:
        print("task1")
        time.sleep(4)
        yield 1


def task2():
    g = task()
    while True:
        try:
            print("task2")
            next(g)
        except Exception:
            print("任务完成")
            break
task2()

 

七、greenlet使用

import greenlet

import time
def task1():
    print("task1 1")
    time.sleep(2)
    g2.switch()
    print("task1 2")
    g2.switch()

def task2():
    print("task2 1")
    g1.switch()
    print("task2 2")

g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)

g1.switch()

# 1.实例化greenlet得到一个对象 传入要执行的任务
#   至少需要两个任务
# 2.先让某个任务执行起来 使用对象调用switch
# 3.在任务的执行过程中 手动调用switch来切换
#

 

八、gevent使用

 

from gevent import monkey
monkey.patch_all()

import gevent
import time
def eat():
    print('eat food 1')
    time.sleep(2)
    # gevent.sleep(1)
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)
    # gevent.sleep(1)
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play)
# g1.join()
# g2.join()

gevent.joinall([g1,g2])
print('')

# 1.spawn函数传入你的任务
# 2.调用join 去开启任务
# 3.检测io操作需要打monkey补丁  就是一个函数 在程序最开始的地方调用它