理解异步编程

以前写的函数往往调用后就直接运行直到返回结果,这是同步;如果函数马上返回,等数据到达再通知函数,这是异步。

再理解下进程、线程和协程:

  • 进程:进程有独立的地址控件,资源句柄,进程是系统资源分配的最小单位
  • 线程:线程是进程的一个实体,属于进程,多个线程共享进程的内存地址空间和资源。线程是CPU调度的最小单位。
  • 协程:“协程就是可以暂停执行的函数”,协程属于线程。

异步编程:

  • 回调:回调函数可以理解为是IO事件完毕后执行提前注册的函数
  • 事件循环:事件循环”是一种等待程序分配事件或消息的编程架构“。基本上来说就是”当A发生时,执行B“
  • 异步编程:异步编程是一种IO模型,异步IO模型需要一个消息循环,主线程不断重复“读取消息+处理消息”这一个过程。不论什么编程语言,但凡要做异步编程,事件循环+回调这种模式是逃不掉的

在Python中协程和事件循环一起使用构成了异步编程。

asyncio的设计

asyncio模块的特点是只存在一个线程,是一种多任务合作模式,允许异步任务交出执行权给其他任务,等其他任务完成再继续往下执行。

  • 导入asyncio模块:import asyncio

  • 在函数前加上async关键字,就变成了async函数,特点是执行可以暂停:async def main()

  • 在async函数内部的异步任务前加上await命令:

    await asyncio.sleep(1)

    这表示,遇到await命令,就会在异步任务开始执行之后,暂停当前async函数的执行,执行其他任务,等异步任务执行结束,再返回async函数继续往下执行。

  • asyncio.run()方法加载async函数,启动事件循环

asyncio工作原理

  • 协程对象:协程对象指一个使用async关键字定义的异步函数,是需要执行的任务,它的调用不会立即执行函数,而是返回一个协程对象。协程不能直接运行,协程对象需要注册到事件循环,由事件循环调用。

  • 在Python中编写异步函数时要记住的一件事是,在def之前使用了async关键字并不意味着异步函数将同时运行。如果采用普通函数并在其前面添加async,则事件循环将运行函数而不会中断,因为没有指定允许循环中断你的函数以运行另一个协同程序的位置。指定允许事件循环中断运行的位置非常简单,每次使用关键字await等待事件循环都可以停止运行函数并切换到运行另一个注册到循环的协同程序。

  • 事件循环:

    事件循环是执行异步代码并决定如何在异步函数之间切换的对象。如果某个协程在等待某些资源,需要暂停它的执行,在事件循环中注册这个事件,当事件发生的时候,可以再次唤醒这个协程的执行。

    运行异步函数首先需要创建一个协程,然后创建future(asyncio.ensure_future(async_fun))或task对象,将它们添加到事件循环(asyncio.get_event_loop)中,然后调用loop.run_until_completed启动事件循环,这样就会开始执行future或task对象。

流程是:

  • 1、事件循环是在线程中执行
  • 2、从队列中取得任务
  • 3、每个任务在协程中执行下一步动作
  • 4、如果在一个协程中调用另一个协程(await),会触发上下文切换,挂起当前协程,并保存现场环境(变量,状态),然后载入被调用协程
  • 5、如果协程的执行到阻塞部分(阻塞I/O,Sleep),当前协程会挂起,并将控制权返回到线程的消息循环中,然后消息循环继续从队列中执行下一个任务...以此类推
  • 6、队列中的所有任务执行完毕后,消息循环返回第一个任务

Future & Task对象

Future对象封装了一个未来会被计算的可调用的异步执行对象,他们被放入队列,状态、结果或者异常能被查询。 Future对象有一个result属性,用于存放未来的执行结果。还有个set_result()方法,是用于设置result的,并且会在给result绑定值以后运行事先给Future对象添加的回调。回调是通过Future对象的add_done_callback()方法添加的。

Future的创建方法:

# 该函数在 Python 3.7 中被加入,更加高层次的函数,返回Task对象
future1 = asyncio.create_task(my_coroutine)
# 在Python 3.7 之前,是更加低级的函数,返回Future对象或者Task对象
future2 = asyncio.ensure_future(my_coroutine)

第一种方法在循环中添加一个协程并返回一个task对象,task对象是future的子类型。第二种方法非常相似,当传入协程对象时返回一个Task对象,唯一的区别是它也可以接受Future对象或Task对象,在这种情况下它不会做任何事情并且返回Future对象或者Task对象不变。

Future对象有几个状态:

  • Pending:就绪
  • Running:运行
  • Done:完成
  • Cancelled:取消

创建Future对象的时候,状态为Pending,事件循环调用执行的时候就是running,调用完毕就是done,如果需要取消Future对象的调度执行,可调用Future对象的cancel()函数。

除此之外,Future对象还有下面一些常用的方法:

  • result():立即返回Future对象运行结果或者抛出执行时的异常,没有timeout参数,如果Future没有完成,不会阻塞等待结果,而是直接抛出InvalidStateError异常。最好的方式是通过await获取运行结果,await会自动等待Future完成返回结果,也不会阻塞事件循环,因为在asyncio中,await被用来将控制权返回给事件循环。
  • done():非阻塞的返回Future对象是否成功取消或者运行结束或被设置异常,而不是查看future是否已经执行完成。
  • cancelled():判断Future对象是否被取消。
  • add_done_callback():传入一个可回调对象,当Future对象done时被调用。
  • exception():获取Future对象中的异常信息,只有当Future对象done时才会返回。
  • get_loop():获取当前Future对象绑定的事件循环。

需要注意的是,当在协程内部引发未处理的异常时,它不会像正常的同步编程那样破坏我们的程序,相反,它存储在future内部,如果在程序退出之前没有处理异常,则会出现以下错误:Task exception was never retrieved

有两种方法可以解决此问题,在访问future对象的结果时捕获异常或调用future对象的异常函数:

try:
    # 调用结果时捕获异常
    my_promise.result()
catch Exception:
    pass

# 获取在协程执行过程中抛出的异常
my_promise.exception()

asyncio使用详解

基本使用

  • 协程完整的工作流程是这样的:

    • 定义或创建协程对象
    • 将协程转为task任务
    • 定义事件循环对象容器
    • 将task任务扔进事件循环对象中触发运行
  • 协程:协程通过 async/await 语法进行声明,是编写异步应用的推荐方式。注意:简单地调用一个协程并不会将其加入执行队列。

  • 要真正运行一个协程,asyncio 提供了三种主要机制:

    • asyncio.run()函数用来运行最高层级的入口点协程函数。该函数运行传入的协程,负责管理asyncio事件循环并结束异步生成器。当有其他asyncio事件循环在同一线程中运行时,此函数不能被调用。如果debug为True,事件循环将以调试模式运行。该函数总是会创建一个新的事件循环并在结束时关闭。它应当被用作asyncio程序的主入口点,理想情况下应当只被调用一次。该函数在Python 3.7被引入,会隐式处理事件循环。
    • 使用await关键字等待一个协程。
    • asyncio.create_task()函数用来并发运行作为asyncio任务的多个协程。当一个协程通过asyncio.create_task()等函数被打包为一个Task对象,该协程将自动排入队列准备立即运行。然后通过await或者asyncio.run()自动运行该任务。该函数在Python 3.7中被加入。在Python 3.7之前,可以改用低层级的asyncio.ensure_future()函数。
  • 三种运行方式代码如下:

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
        return delay
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        # 通过await等待运行,此时两个任务按顺序运行
        result1 = await say_after(2, 'hello')
        result2 = await say_after(1, 'world')
    
        print(result1, result2)
    
        task1 = asyncio.create_task(say_after(2, 'hello2'))
        task2 = asyncio.create_task(say_after(1, 'world2'))
    
        # 通过asyncio.task()包装为task然后await等待运行,此时两个任务并发运行
        result3 = await task1
        result4 = await task2
        print(result3, result4)
    
        print(f"finished at {time.strftime('%X')}")
    
    # 通过asyncio.run()函数运行
    asyncio.run(main())
    # 下面相当于上面的asyncio.run()函数
    # loop = asyncio.get_event_loop()
    # try:
    #     loop.run_until_complete(main())
    # finally:
    #     loop.close()
  • 可等待对象:如果一个对象可以在await语句中使用,那么它就是可等待对象。许多 asyncio API 都被设计为接受可等待对象。可等待对象有三种主要类型:协程、Task对象和Future对象。

  • Future对象:Future对象是一种特殊的低层级可等待对象,表示一个异步操作的最终结果。当一个Future对象被等待,这意味着协程将保持等待直到该Future对象在其他地方操作完毕。在asyncio中需要Future对象以便允许通过async/await使用基于回调的代码。通常情况下没有必要在应用层级的代码中创建Future对象。

  • asyncio.sleep():阻塞delay指定的秒数。如果指定了result,则当协程完成时将其返回给调用者。sleep()总是会挂起当前任务,以允许其他任务运行。

  • 注意:如果不在main()函数中await其他协程,其他协程可能来不及运行就被取消了。因为asyncio.run(main())调用的式是loop.run_until_complete(main()),在没有await的情况下,事件循环只关注main()函数一个协程的结束,而不管main()函数中的其他协程任务,没有await,其他协程任务可能在任务完成前被取消。如果需要获取当前待处理Task对象的列表,可以使用asyncio.all_tasks()函数,使用asyncio.current_task()函数获取当前运行的Task实例,如果没有正在运行的任务则返回None。

并发运行任务

  • asyncio.gather(*aws, loop=None, return_exceptions=False):并发运行aws序列中的可等待对象。如果aws中的某个可等待对象为协程,它将自动作为一个Task加入队列。如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与aws中可等待对象的顺序一致。

  • 代码如下:

    import asyncio
    
    async def factorial(name, number):
        f = 1
        for i in range(2, number + 1):
            print(f"Task {name}: Compute factorial({i})...")
            await asyncio.sleep(1)
            f *= i
        print(f"Task {name}: factorial({number}) = {f}")
        return f
    
    async def main():
        # 并发运行三个任务
        result = await asyncio.gather(
            factorial("A", 5),
            factorial("B", 3),
            factorial("C", 4),
        )
    
        print(result)
    
    asyncio.run(main())

等待任务

  • asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED):并发运行aws指定的可等待对象并阻塞线程直到满足return_when指定的条件。如果aws中的某个可等待对象为协程,它将自动作为任务加入日程。直接向wait()传入协程对象已弃用,因为这会导致令人迷惑的行为。返回两个Task/Future集合: (done, pending),(已完成的,未完成的)

回调

  • 可通过task.add_done_callback()方法给任务添加回调函数,当任务执行完成时会自动调用该函数并传入Task对象。

    import asyncio
    
    def callback(task):
        print(task, "done")
    
    async def hello():
        print("hello")
        await asyncio.sleep(0)
    
    async def main():
        task = asyncio.create_task(hello())
        task.add_done_callback(callback)
        await task
    
    asyncio.run(main())