UDN-企业互联网技术人气社区

板块导航

浏览  : 1317
回复  : 0

[干货] Python并发编程之协程/异步IO

[复制链接]
开花包的头像 楼主
发表于 2017-1-4 22:09:37 | 显示全部楼层 |阅读模式
  引言

  随着node.js的盛行,相信大家今年多多少少都听到了异步编程这个概念。Python社区虽然对于异步编程的支持相比其他语言稍显迟缓,但是也在Python3.4中加入了asyncio,在Python3.5上又提供了async/await语法层面的支持,刚正式发布的Python3.6中asynico也已经由临时版改为了稳定版。下面我们就基于Python3.4+来了解一下异步编程的概念以及asyncio的用法。

  什么是协程

  通常在Python中我们进行并发编程一般都是使用多线程或者多进程来实现的,对于计算型任务由于GIL的存在我们通常使用多进程来实现,而对与IO型任务我们可以通过线程调度来让线程在执行IO任务时让出GIL,从而实现表面上的并发。

  其实对于IO型任务我们还有一种选择就是协程,协程是运行在单线程当中的“并发”,协程相比多线程一大优势就是省去了多线程之间的切换开销,获得了更大的运行效率。Python中的asyncio也是基于协程来进行实现的。在进入asyncio之前我们先来了解一下Python中怎么通过生成器进行协程来实现并发。

  example1

  我们先来看一个简单的例子来了解一下什么是协程(coroutine),对生成器不了解的朋友建议先看一下Stackoverflow上面的这篇高票回答。

  1. >>> def coroutine():
  2. ...     reply = yield 'hello'
  3. ...     yield reply
  4. ...
  5. >>> c = coroutine()
  6. >>> next(c)
  7. 'hello'
  8. >>> c.send('world')
  9. 'world'
复制代码


  下面这个程序我们要实现的功能就是模拟多个学生同时向一个老师提交作业,按照传统的话我们或许要采用多线程/多进程,但是这里我们可以采用生成器来实现协程用来模拟并发。

  如果下面这个程序读起来有点困难,可以直接跳到后面部分,并不影响阅读,等你理解协程的本质,回过头来看就很简单了。

  1. from collections import deque
  2. def student(name, homeworks):
  3.     for homework in homeworks.items():
  4.         yield (name, homework[0], homework[1])  # 学生"生成"作业给老师
  5. class Teacher(object):
  6.     def __init__(self, students):
  7.         self.students = deque(students)
  8.     def handle(self):
  9.         """老师处理学生作业"""
  10.         while len(self.students):
  11.             student = self.students.pop()
  12.             try:
  13.                 homework = next(student)
  14.                 print('handling', homework[0], homework[1], homework[2])
  15.             except StopIteration:
  16.                 pass
  17.             else:
  18.                 self.students.appendleft(student)
复制代码


  下面我们来调用一下这个程序。

  1.  Teacher([
  2.     student('Student1', {'math': '1+1=2', 'cs': 'operating system'}),
  3.     student('Student2', {'math': '2+2=4', 'cs': 'computer graphics'}),
  4.     student('Student3', {'math': '3+3=5', 'cs': 'compiler construction'})
  5. ]).handle()
复制代码


  这是输出结果,我们仅仅只用了一个简单的生成器就实现了并发(concurrence),注意不是并行(parallel),因为我们的程序仅仅是运行在一个单线程当中。

  1. handling Student3 cs compiler construction
  2. handling Student2 cs computer graphics
  3. handling Student1 cs operating system
  4. handling Student3 math 3+3=5
  5. handling Student2 math 2+2=4
  6. handling Student1 math 1+1=2
复制代码


  ##使用asyncio模块实现协程

  从Python3.4开始asyncio模块加入到了标准库,通过asyncio我们可以轻松实现协程来完成异步IO操作。

  解释一下下面这段代码,我们创造了一个协程display_date(num, loop),然后它使用关键字yield from来等待协程asyncio.sleep(2)的返回结果。而在这等待的2s之间它会让出CPU的执行权,直到asyncio.sleep(2)返回结果。

  1. # coroutine.py
  2. import asyncio
  3. import datetime
  4. @asyncio.coroutine  # 声明一个协程
  5. def display_date(num, loop):
  6.     end_time = loop.time() + 10.0
  7.     while True:
  8.         print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
  9.         if (loop.time() + 1.0) >= end_time:
  10.             break
  11.         yield from asyncio.sleep(2)  # 阻塞直到协程sleep(2)返回结果
  12. loop = asyncio.get_event_loop()  # 获取一个event_loop
  13. tasks = [display_date(1, loop), display_date(2, loop)]
  14. loop.run_until_complete(asyncio.gather(*tasks))  # "阻塞"直到所有的tasks完成
  15. loop.close()
复制代码


  下面是运行结果,注意到并发的效果没有,程序从开始到结束只用大约10s,而在这里我们并没有使用任何的多线程/多进程代码。在实际项目中你可以将asyncio.sleep(secends)替换成相应的IO任务,比如数据库/磁盘文件读写等操作。

  1. ziwenxie :: ~ » python coroutine.py
  2. Loop: 1 Time: 2016-12-19 16:06:46.515329
  3. Loop: 2 Time: 2016-12-19 16:06:46.515446
  4. Loop: 1 Time: 2016-12-19 16:06:48.517613
  5. Loop: 2 Time: 2016-12-19 16:06:48.517724
  6. Loop: 1 Time: 2016-12-19 16:06:50.520005
  7. Loop: 2 Time: 2016-12-19 16:06:50.520169
  8. Loop: 1 Time: 2016-12-19 16:06:52.522452
  9. Loop: 2 Time: 2016-12-19 16:06:52.522567
  10. Loop: 1 Time: 2016-12-19 16:06:54.524889
  11. Loop: 2 Time: 2016-12-19 16:06:54.525031
  12. Loop: 1 Time: 2016-12-19 16:06:56.527713
  13. Loop: 2 Time: 2016-12-19 16:06:56.528102
复制代码


  在Python3.5中为我们提供更直接的对协程的支持,引入了async/await关键字,上面的代码我们可以这样改写,使用async代替了@asyncio.coroutine,使用了await代替了yield from,这样我们的代码变得更加简洁可读。

  1. import asyncio
  2. import datetime
  3. async def display_date(num, loop):  # 声明一个协程
  4.     end_time = loop.time() + 10.0
  5.     while True:
  6.         print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
  7.         if (loop.time() + 1.0) >= end_time:
  8.             break
  9.         await asyncio.sleep(2)  # 等同于yield from
  10. loop = asyncio.get_event_loop()  # 获取一个event_loop
  11. tasks = [display_date(1, loop), display_date(2, loop)]
  12. loop.run_until_complete(asyncio.gather(*tasks))  # "阻塞"直到所有的tasks完成
  13. loop.close()
复制代码


  asyncio模块详解

  开启事件循环有两种方法,一种方法就是通过调用run_until_complete,另外一种就是调用run_forever。run_until_complete内置add_done_callback,使用run_forever的好处是可以通过自己自定义add_done_callback,具体差异请看下面两个例子。

  run_until_complete()

  1. import asyncio
  2. async def slow_operation(future):
  3.     await asyncio.sleep(1)
  4.     future.set_result('Future is done!')
  5. loop = asyncio.get_event_loop()
  6. future = asyncio.Future()
  7. asyncio.ensure_future(slow_operation(future))
  8. print(loop.is_running())  # False
  9. loop.run_until_complete(future)
  10. print(future.result())
  11. loop.close()
复制代码


  run_forever()

  run_forever相比run_until_complete的优势是添加了一个add_done_callback,可以让我们在task(future)完成的时候调用相应的方法进行后续处理。

  1. import asyncio
  2. async def slow_operation(future):
  3.     await asyncio.sleep(1)
  4.     future.set_result('Future is done!')
  5. def got_result(future):
  6.     print(future.result())
  7.     loop.stop()
  8. loop = asyncio.get_event_loop()
  9. future = asyncio.Future()
  10. asyncio.ensure_future(slow_operation(future))
  11. future.add_done_callback(got_result)
  12. try:
  13.     loop.run_forever()
  14. finally:
  15.     loop.close()
复制代码


  这里还要注意一点,即使你调用了协程方法,但是如果事件循环没有开启,协程也不会执行,参考官方文档的描述,我刚被坑过。

Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future() function or the AbstractEventLoop.create_task() method. Coroutines (and tasks) can only run when the event loop is running.

  Call

  call_soon()

  1. import asyncio
  2. def hello_world(loop):
  3.     print('Hello World')
  4.     loop.stop()
  5. loop = asyncio.get_event_loop()
  6. # Schedule a call to hello_world()
  7. loop.call_soon(hello_world, loop)
  8. # Blocking call interrupted by loop.stop()
  9. loop.run_forever()
  10. loop.close()
复制代码


  下面是运行结果,我们可以通过call_soon提前注册我们的task,并且也可以根据返回的Handle进行cancel。

  
  1. Hello World
复制代码


  call_later()

  1. import asyncio
  2. import datetime
  3. def display_date(end_time, loop):
  4.     print(datetime.datetime.now())
  5.     if (loop.time() + 1.0) < end_time:
  6.         loop.call_later(1, display_date, end_time, loop)
  7.     else:
  8.         loop.stop()
  9. loop = asyncio.get_event_loop()
  10. # Schedule the first call to display_date()
  11. end_time = loop.time() + 5.0
  12. loop.call_soon(display_date, end_time, loop)
  13. # Blocking call interrupted by loop.stop()
  14. loop.run_forever()
  15. loop.close()
复制代码


  改动一下上面的例子我们来看一下call_later的用法,注意这里并没有像上面那样使用while循环进行操作,我们可以通过call_later来设置每隔1秒去调用display_date()方法。

  1. 2016-12-24 19:17:13.421649
  2. 2016-12-24 19:17:14.422933
  3. 2016-12-24 19:17:15.424315
  4. 2016-12-24 19:17:16.425571
  5. 2016-12-24 19:17:17.426874
复制代码


  Chain coroutines

  1. import asyncio
  2. async def compute(x, y):
  3.     print("Compute %s + %s ..." % (x, y))
  4.     await asyncio.sleep(1.0)  # 协程compute不会继续往下面执行,直到协程sleep返回结果
  5.     return x + y
  6. async def print_sum(x, y):
  7.     result = await compute(x, y)  # 协程print_sum不会继续往下执行,直到协程compute返回结果
  8.     print("%s + %s = %s" % (x, y, result))
  9. loop = asyncio.get_event_loop()
  10. loop.run_until_complete(print_sum(1, 2))
  11. loop.close()
复制代码


  下面是输出结果

  1. ziwenxie :: ~ » python chain.py
  2. Compute 1 + 2 ...
  3. 1 + 2 = 3
复制代码


  在爬虫中使用asyncio来实现异步IO

  下面我们来通过一个简单的例子来看一下怎么在Python爬虫项目中使用asyncio。by the way: 根据我有限的实验结果,如果要充分发挥asynio的威力,应该使用aiohttp而不是requests。而且也要合理使用concurrent.futures模块提供的线程池/进程池,这一点我会在下一篇博文描述。

  1. import asyncio
  2. import requests
  3. async def spider(loop):
  4.     # run_in_exectuor会返回一个Future,而不是coroutine object
  5.     future1 = loop.run_in_executor(None, requests.get, 'https://www.python.org/')
  6.     future2 = loop.run_in_executor(None, requests.get, 'http://httpbin.org/')
  7.     # 通过命令行可以发现上面两个网络IO在并发进行
  8.     response1 = await future1  # 阻塞直到future1完成
  9.     response2 = await future2  # 阻塞直到future2完成
  10.     print(len(response1.text))
  11.     print(len(response2.text))
  12.     return 'done'
  13. loop = asyncio.get_event_loop()
  14. # If the argument is a coroutine object, it is wrapped by ensure_future().
  15. result = loop.run_until_complete(spider(loop))
  16. print(result)
  17. loop.close()
复制代码


  p.s: 如果你能自己体会到为什么盲目地使用线程池/进程池并不能提高基于asynico模块的程序的效率,我想你对协程的理解也差不多了。

  References

  DOCUMENTATION OF ASYNCIO

  COROUTINES AND ASYNC/AWAIT

  STACKOVERFLOW

  PyMOTW-3

原文作者:佚名  来源:开发者头条

相关帖子

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于我们
联系我们
  • 电话:010-86393388
  • 邮件:udn@yonyou.com
  • 地址:北京市海淀区北清路68号
移动客户端下载
关注我们
  • 微信公众号:yonyouudn
  • 扫描右侧二维码关注我们
  • 专注企业互联网的技术社区
版权所有:用友网络科技股份有限公司82041 京ICP备05007539号-11 京公网网备安1101080209224 Powered by Discuz!
快速回复 返回列表 返回顶部