说明
Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码。
从Python3.2开始,标准库为我们提供了concurrent.futures模块,concurrent.futures 模块的主要特色是 ThreadPoolExecutor 和
ProcessPoolExecutor 类,这两个类实现的接口能分别在不同的线程或进程中执行可调
用的对象。这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列。
Python 3.4 以后标准库中asyncio 包,这个包使用事件循环驱动的协程实现并发。这是 Python 中最大也
是最具雄心壮志的库之一。asyncio 大量使用 yield from 表达式,因此与
Python 旧版不兼容。
submit和map方法
submit方法作用是向线程池提交可回调的task,并返回一个回调实例。
example:
import time from concurrent.futures import ThreadPoolExecutor # 可回调的task def pub_task(msg): time.sleep(3) return msg # 创建一个线程池 pool = ThreadPoolExecutor(max_workers=3) # 往线程池加入2个task task1 = pool.submit(pub_task, 'a') task2 = pool.submit(pub_task, 'b') print(task1.done()) # False time.sleep(4) print(task2.done()) # True print(task1.result()) print(task2.result())
map方法是创建一个迭代器,回调的结果有序放在迭代器中。
问题:
Executor.map 函数易于使用,不过有个特性可能有用,也可能没用,具体情况取决于需求:这个函数返回结果的顺序与调用开始的顺序一致。
如果第一个调用生成结果用时 10秒,而其他调用只用 1 秒,代码会阻塞 10 秒,获取 map 方法返回的生成器产出的第一个结果。
在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束。
如果必须等到获取所有结果后再处理,这种行为没问题;不过,通常更可取的方式是,不管提交的顺序,只要有结果就获取。
为此,要把 Executor.submit 方法和 futures.as_completed 函数结合起来使用。
from concurrent.futures import ThreadPoolExecutor import requests URLS = ['http://www.csdn.com', 'http://QQ.com', 'http://www.leasonlove.cn'] def task(url, timeout=10): return requests.get(url, timeout=timeout) pool = ThreadPoolExecutor(max_workers=3) results = pool.map(task, URLS) for ret in results: print('%s, %s' % (ret.url, ret))
future异步编程
Future可以理解为一个在未来完成的操作,这是异步编程的基础。通常情况下,我们执行io操作,访问url时(如下)在等待结果返回之前会产生阻塞,cpu不能做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed import requests URLS = ['http://www.csdn.cn', 'http://qq.com', 'http://www.leasonlove.cn'] def task(url, timeout=1): return requests.get(url, timeout=timeout) with ThreadPoolExecutor(max_workers=3) as executor: future_tasks = [executor.submit(task, url) for url in URLS] for f in future_tasks: if f.running(): print('%s is running' % str(f)) for f in as_completed(future_tasks): try: ret = f.done() if ret: f_ret = f.result() print('%s, done, result: %s, %s' % (str(f), f_ret.url, f_ret.content)) except Exception as e: # 第一个url无响应 f.cancel() print(str(e))
asyncio库协程实现并发
对于gevent 和 asyncio 建议大家放弃Gevent,拥抱asyncio,asyncio是Python3.4以后标准库。
而且由于Gevent直接修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行。
但是我们无法保证你在复杂的生产环境中有哪些地方使用这些标准库会由于打了补丁而出现奇怪的问题。
import asyncio import time start = time.time() async def do(x): print('WAIting: ', x) await asyncio.sleep(x) return 'Finish after {}s'.format(x) task1 = do(1) task2 = do(2) task3 = do(4) tasks = [ asyncio.ensure_future(task1), asyncio.ensure_future(task2), asyncio.ensure_future(task3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print('Task result: ', task.result()) end = time.time() print('TIME: ', end - start)
协程与线程
如果使用线程做过重要的编程,你就知道写出程序有多么困难,因为调度程序任何时候都能中断线程。
必须记住保留锁,去保护程序中的重要部分,防止多步操作在执行的过程中中断,防止数据处于无效状态。
而协程默认会做好全方位保护,以防止中断。我们必须显式产出才能让程序的余下部分运行。
对协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因为在任意时刻只有一个协程运行。
想交出控制权时,可以使用 yield 或 yield from 把控制权交还调度程序。
这就是能够安全地取消协程的原因:按照定义,协程只能在暂停的 yield处取消,因此可以处理 CancelledError 异常,执行清理操作。
补充知识:Python-什么时候使用yield?
简介
很多时候在python代码中见到了yield,没有系统学习过,自己也没有用过。
yield语句延迟了语句的执行,然后发送了一个值给调用者,但保留了一定的状态去保证函数离开之后可以继续。当继续的时候,函数继续执行上一个的运行状态。这使得它的代码可以随着时间产生一系列的值,而不是立即执行,然后像一个list一样发送他们回来。
例子
例子1:
# A Simple Python program to demonstrate working # of yield # A generator function that yields 1 for first time, # 2 second time and 3 third time def simpleGeneratorFun(): yield 1 yield 2 yield 3 # Driver code to check above generator function for value in simpleGeneratorFun(): print(value)
返回语句发送一个特殊的值给它的调用者,而yield产生了一系列的值,当我们想要遍历一个序列的时候,我们应该使用yield,但不想要把整个序列存储在内存中。
yield用于python的生成器(generator)。一个genertator 被定义得看起来像一个普通函数一样,但它需要产生一个数字得时候,它使用yield,而不是使用return。如果一个函数里面定义了yield,那么它自动称为了一个generator函数。、
例子2:
# A Python program to generate squares from 1 # to 100 using yield and therefore generator # An infinite generator function that prints # next square number. It starts with 1 def nextSquare(): i = 1; # An Infinite loop to generate squares while True: yield i*i i += 1 # Next execution resumes # from this point # Driver code to test above generator # function for num in nextSquare(): if num > 100: break print(num)
输出1,4,9…100