concurrnet.futures多线程常见用法

本文档转换自jupyter notebook, 介绍了concurrnet.futures中多线程相关的常见用法。


concurrnet.futures多线程常见用法

import concurrent.futures
import time

def task(n):
    time.sleep(n)
    return n
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

1. map方式调用

results = executor.map(task,[i for i in range(5)])
for i in results:print(i)

0 1 2 3 4

map并不会阻塞线程,因为map方法会生成一个Iterator,通过迭代Iterator我们可以得到各个线程的结果,而迭代Iterator的时候会阻塞。

2. submit方式调用

2.1 submit配合future.result()

futures = [executor.submit(task,i) for i in range(5)]
for future in futures:print(future.result())

0 1 2 3 4

这样看着没什么问题,实际上不太推荐。因为这种迭代futures的方式会一个个迭代 future.result(),挨个等待future的结果,这个时候会阻塞。实际实战中,很多时候列表中任务并不是按照顺序完成,我们希望等到有任意一个任务完成就做下一步动作,类似select操作。这种时候我们就要使用 concurrent.futures.wait()方法或者 concurrent.futures.as_completed()

2.2 使用wait等待futures

futures = [executor.submit(task, i) for i in range(5,0,-1)]


while futures:
    done,_= concurrent.futures.wait(futures,return_when=concurrent.futures.FIRST_COMPLETED)

    for i in done:
        print(i.result())
        futures.remove(i)

1 2 3 4 5

Oh man, this is what I want! 太优雅了!这样的话,我们就可以只要有一个任务完成了,就做一些事情。实际上,我们还可以加上timeout参数,让他即使没有任务完成,也要每隔一段时间做一些事情。

2.3 as_completed()

as_completed()会返回一个迭代器,按照完成的顺序返回值。这个也很优雅!并且也可以设置timeout。

futures = [executor.submit(task,i) for i in range(5,0,-1)]
future_iterator = concurrent.futures.as_completed(futures)

print(f'wait, i am handling...')
for i in future_iterator:
    print(i.result())
    print(f'wait, i am handling...')
print('finished!')

wait, i am handling... 1 wait, i am handling... 2 wait, i am handling... 3 wait, i am handling... 4 wait, i am handling... 5 wait, i am handling... finished!

总结一下

  • [ ] 这样看来,大多数时候,我们都应该用 executor.submit()组织一个futures列表,然后,通过wait或者as_completed等待任务完成。map是独一档的存在。使用map省略了futures的细节,直接给你返回results的列表生成器。并且他是按照列表顺序返回,并不是按照完成顺序。