Python并發和并行方案
在Python世界有3種并發和并行方案,如下:
多線程(threading)
多進程(multiprocessing)
異步IO(asyncio)
注: 并發和并行的區別先不提,最后會借著例子更好的解釋,另外稍后也會提到 concurrent.futures,不過它不是一種獨立的方案,所以在這里沒有列出來。
這些方案是為了解決不同特點的性能瓶頸。性能問題主要有2種:
CPU密集型(CPU-bound)。這也就是指計算密集型任務,它的特點是需要要進行大量的計算。例如Python內置對象的各種方法的執行,科學計算,視頻轉碼等等。
I/O密集型(I/O-bound)。凡是涉及到網絡、內存訪問、磁盤I/O等的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待I/O操作完成。例如數據庫連接、Web服務、文件讀寫等等。
如果你不知道一個任務哪種類型,我的經驗是你問問自己,如果給你一個更好更快的CPU它可以更快,那么這就是一個CPU密集的任務,否則就是I/O密集的任務。
這三個方案中對于CPU密集型的任務,優化方案只有一種,就是使用多進程充分利用多核CPU一起完成任務,達到提速的目的。而對于I/O密集型的任務,則這三種方案都可以。
接著借著一個抓取網頁并寫入本地(典型的I/O密集型任務)小例子來挨個拆解對比一下這些方案。先看例子:
import requests
url?
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36' # noqa
}
def fetch(session, page):
with (session.get(f'{url}{page*25}', headers=headers) as r,
open(f'top250-{page}.html', 'w') as f):
f.write(r.text)
def main():
with requests.Session() as session:
for p in range(25):
fetch(session, p)
if __name__ == '__main__':
main()
在這個例子中會抓取豆瓣電影Top250的25個頁面(每頁顯示10個電影),使用requests庫,不同頁面按順序請求,一共花了3.9秒:
? time python io_non_concurrent.py
python io_non_concurrent.py 0.23s user 0.05s system 7% cpu 3.911 total
這個速度雖然看起來還是很好的,一方面是豆瓣做了很好的優化,一方面我家的帶寬網速也比較好。接著用上面三種方案優化看看效果。
多進程版本
Python解釋器使用單進程,如果服務器或者你的電腦是多核的,這么用其實是很浪費的,所以可以通過多進程提速:
from multiprocessing import Pool
def main():
with (Pool() as pool,
requests.Session() as session):
pool.starmap(fetch, [(session, p) for p in range(25)])
注: 這里省略到了那些上面已經出現的了代碼,只展示改變了的那部分。
使用多進程池,但沒指定進程數量,所以會按著Macbook的核數啟動10個進程一起工作,耗時如下:
? time python use_multiprocessing.py
python use_multiprocessing.py 2.15s user 0.30s system 232% cpu 1.023 total
多進程理論上可以有十倍效率的提升,因為10個進程在一起執行任務。當然由于任務數量是25,不是整數倍,是無法達到10倍的降低耗時,而且由于抓取太快了,沒有充分顯示多進程方案下的效率提升,所以用時1秒,也就是大約4倍的效率提升。
多進程方案下沒有明顯的缺點,只要機器夠強悍,就可以更快。
多線程版本
Python解釋器不是線程安全的,為此Python設計了GIL: 獲得GIL鎖才可以訪問線程中的Python對象。所以在任何一個時間,只有一個線程可以執行代碼,這樣就不會引發競態條件(Race Condition) ,雖然GIL的問題很多,但是GIL卻是還有它存在的優點,例如簡化了內存管理等等,這些不是本文重點所以就不展開了,有興趣的可以專門去了解。
那么有同學會問,既然同一時間永遠只有一個線程在工作,那么多線程可以提高并發效率的原因是什么呢?
解釋這個問題還是要提GIL。延伸閱讀鏈接1《Understanding the Python GIL》中做了很好的解釋(這里要注意,我們提的方案是Python 3.2新的GIL,而不是Python2的舊版GIL,現在網上有很多針對舊的GIL的描述,其實是過時的,這部分也可以看看延伸閱讀鏈接2的文章幫助理解它們的區別),我截其中幾張PPT來說明:
在上圖里,本來只有1個線程,所以不需要釋放或者獲得GIL,但是接著出現了第二個線程,這樣就是多個線程,一開始線程2是掛起狀態,因為它沒有GIL。
線程1在一個 cv_wait周期內會自愿的放棄GIL,例如出現了I/O阻塞,或者超時了(線程不能一直拿著不放,即便在一個周期內沒有出現I/O阻塞也要強制釋放執行權,這個默認時間是5毫秒,可以通過 sys.setswitchinterval設置,當然設置前你得知道你在做什么)都會觸發這個釋放GIL的操作。
這里演示了常規的例子(非超時被迫釋放),在 cv_wait階段,線程1由于遇到了I/O阻塞,會發送信號給線程2,此時線程1讓出GIL并掛起,而線程2獲得GIL,如此循環,之后線程2會釋放GIL給線程1。這個PPT在業界非常知名,建議大家多看看。之后的PPT還列舉了超時的處理,由于和我們這篇文章關系稍遠也不展開了,有興趣的接著看。btw,我第一次看這個PPT覺得這個超時時間好可怕,也就是說1秒鐘要最少切換200次,這也太浪費了,所以你可以嘗試在代碼中調大這個超時時間喲。
通過上面的內容,多線程通過GIL的控制,每個線程都得到了更好的執行時機,所以不會出現被某個線程任務一直阻塞,因為如果線程遇到阻塞會自愿讓出GIL讓自己掛起,把機會讓給其他線程,這樣就提高了執行任務總體的效率。多線程模式下最完美的場景就是任何時間點對應的線程都在做事,而不是有的線程其實等著被執行,但是實際上卻被阻塞著。
我們看一下多線程的方案:
from multiprocessing.pool import ThreadPool
def main():
with (ThreadPool(processes=5) as pool,
requests.Session() as session):
pool.starmap(fetch, [(session, p) for p in range(25)])
這里說明2點:
多進程和多線程例子中我都使用了【池】,這是一個好的習慣,因為線(進)程過多會帶來額外的開銷,其中包括創建銷毀的開銷、調度開銷等等,同時也降低了計算機的整體性能。使用線(進)程池維護多個線(進)程,等待監督管理者分配可并發執行的任務。這樣一方面避免了處理任務時創建銷毀線程開銷的代價,另一方面避免了線程數量膨脹導致的過分調度問題,保證了對內核的充分利用。另外用標準庫里的進程池和線程池的實現寫額外代碼極少,而且代碼結構還很像,特別適合寫對比的例子。
processes如果不指定也是和CPU核數一致的10,但是并不是線程越多越好,因為線程多了,反而出現本來正常有效的執行卻被GIL強制釋放,這就造成多余上下文切換反而是一個負擔了。
在這個例子中,線程數為5,這個其實一方面是經驗,一方面是多次調試值的結果,所以這也暴露了多線程編程中如果稍有不慎會讓優化變差,也會存在沒有找到最優值得問題,因為GIL控制線程是一個黑盒操作,開發者無法直接控制,這哪怕對一些相對有經驗的Python開發也非常不友好。
我們看一下時間:
? time python use_threading.py
python use_threading.py 0.62s user 0.24s system 74% cpu 1.157 total
可以看到,多線程方案下比原始方案速度快了一倍以上,但是比多進程方案差一點(事實上我認為在真實的例子中會差很多)。這是因為在多進程方案下多核CPU都在獨立工作,而多線程方案一方面由于效率問題下不能使用那么多數量的線程,而且由于GIL的限制,在不需要被釋放GIL的時候依然被強制釋放,就這么不斷的切換的過程中反而降低了效率,讓效果大打折扣。
concurrent.futures版本
這里也順便提一下 concurrent.futures的方案。其實它不是一個全新的方案,這是在其他語言(例如Java)里早就出現的一種框架,可以通過它控制線(進)程的啟動、執行和關閉。我把它理解為抽象了多進程池和多線程池的代碼,讓開發者不需要關注多線程和多進程模塊的具體細節和用法。其實理解起來也不難,你可以這么拆解:
其實理解起來也不難,例如ThreadPoolExecutor可以這么拆解: ThreadPoolExecutor=Thread+Pool+Executor,其實就是線程+池+執行器。就是預先創建一個線程池用來被重復使用,Executor將任務提交和任務執行進行解耦,它完成線程的調配(如何以及何時)和任務的執行部分。
如果你想了解它的細節,我推薦直接看它的源碼文件頭部的注釋,里面對于數據流有非常詳細的說明,可以說比任何技術文章寫的都要深入準確了。
這里只演示一下ThreadPoolExecutor的用法:
from functools import partial
from concurrent.futures import ThreadPoolExecutor
def main():
with (ThreadPoolExecutor(max_workers=5) as pool,
requests.Session() as session):
list(pool.map(partial(fetch, session), range(25)))
是不是很熟悉的配方?接口和上面用的進程池線程池都很像,但是要注意 max_workers如果不指定的話數量是CPU個數+4,最大為32。它和多線程的用法問題一樣,這個 max_workers需要調優(這里為了對比,所以用了相同的數值)。
? time python use_executor.py
python use_executor.py 0.63s user 0.32s system 82% cpu 1.153 total
雖然 concurrent.futures是現在更主流的方案,但是在我使用的體驗里,它的效率要略低于直接使用進程池或者線程池的代碼,因為它高度抽象,卻把事情搞得復雜了,例如用到了對應的queue(queue模塊)和信號量(Semaphore),這些反而限制了性能的提升。所以我的建議是,Python初學者可以用它,但高級開發者應該自己控制并發實現。
asyncio版本
前面的多線程相關的方案中,需要開發者根據經驗或者去實驗,找到一個(或者多個)最優的線程數量,不同的場景這個值區別是很大的,這對于初學者很不友好,非常容易陷入【在用多線程,但是用錯了或者用的不夠好】這么一種境地。
后來Python引入了新的并發模型: aysncio,本小節給大家解釋下最新的asyncio方案為什么是一個更優的選擇。首先還是看《Understanding the Python GIL》里面的一頁PPT:
我們回憶一下,它提到當只有單個線程時,實際上不會觸發GIL,這個獨立的線程可以一直執行下去。這也是asyncio找到的切入點: 因為是單進程單線程的,所以理論上不受GIL的限制。在事件驅動的機制下,可以更好的利用單線程的性能,尤其是通過await關鍵詞可以讓開發者自己決定調度方案,而不是多線程那種由GIL來控制。
那設想一下,在最美好的情況下,所有await的地方都是可能的I/O阻塞的。那么在執行時,遇到I/O阻塞就可以切換協程,執行其他可以繼續執行的任務,所以,這個線程一直都在工作而不會阻塞,可以說利用率達到100%!這是多線程方案下永遠不可及的。
講到這個,我們再回去重新整理和理解一遍,先出基本理論開始。
協程
協程是一種特殊函數,這個函數在本來的def關鍵字前面加了async關鍵字,本質上它是生成器函數,可以生成值或者接收外面發送(通過send方法)來的值,但是它最重要的特點是它可以在需要時保存上下文(或者說狀態),掛起自己并將控制權交給調用者,由于它保存了掛起時的上下文,在未來可以接著被執行。
其實在調用協程是,它并不會立刻執行:
In : async def a():
...:? ? ?print('Called')
...:
In : a()? # 并未執行,只是返回了協程對象
Out:
In : await a()? # 使用await才會真的執行
Called
異步和并發
異步(asynchronous)、非阻塞(non-blocking)、并發(concurrent)是很容易讓人產生迷惑的詞。結合asyncio場景,我的理解是:
協程是異步執行的,在asyncio中,協程可以在等待執行結果時把自己【暫停】,以便讓其他協程同時運行。
異步讓執行不需要等待阻塞的邏輯完成就可以先讓其他代碼同時運行,所以這樣就不會【阻塞】其他代碼,那么這就是【非阻塞】的代碼
使用異步代碼編寫的程序執行時,看起來其中的任務都在同時執行和完成(因為會在等待中切換),所以看起來是【并發】的
事件循環(EventLoop)
Event Loop這個概念其實我理解了很多年,從Twisted時代開始。我一直覺得它非常神秘復雜,現在看來其實想多了。對于初學者,不如換個思路,它的重點就是事件+循環: Loop是一個環,每個任務作為一個事件放到這個環上,事件會不斷地循環,在符合條件的情況下觸發執行事件。它的特點如下:
一個事件循環運行在一個線程中
Awaitables對象(協程、Task、Future下面都會提到)都可以注冊到事件循環上
如果協程中調用了另外一個協程(通過await),這個協程會掛起,發生上下文切換轉而去執行另外這個協程,如此循環
如果協程執行時遇到I/O阻塞,這個協程會帶著上下文掛起,然后把控制權交還給EventLoop
既然是loop。注冊的全部事件執行完畢后,循環會重新開始
Future/Task
asyncio.Future我覺得像Javascript里面的 Promise, 它是一個占位對象,代表一件還沒有做完的事情,在未來才會實現或者完成(當然還可能由于內部出錯而拋出異常)。它和上面提的 concurrent.futures方案中實現的 concurrent.futures.Futures很像,但是針對asyncio的事件循環做了很多定制。asyncio.Future它僅僅是一個數據的容器。
asyncio.Task是 asyncio.Future的子類,它用于在事件循環中運行協程。
在官方文檔中提到了一個非常直觀的例子,我這里改寫它在IPython里面執行并說明:
In : async def set_after(fut):? # 創建一個協程,他會異步的sleep3秒,然后給future對象設置結果
...:? ? ?await asyncio.sleep(3)
...:? ? ?fut.set_result('Done')
...:
In : loop = asyncio.get_event_loop()? # 獲取當前的事件循環
In : fut = loop.create_future()? # 在事件循環中創建一個Future
In : fut? # 此時它還是默認的pending狀態,因為沒有調用它
Out:
In : task = loop.create_task(set_after(fut))? # 在事件循環中創建(或者說注冊)了一個任務
In : task? # 馬上輸入它,此時剛創建任務,還在執行中
Out:
In : fut? # 馬上輸入它,此時剛創建任務,還沒有執行完所以future沒有變化
Out:
In : task? # 過了三秒,任務執行完成了
Out:
In : fut? # Future也已經設置了結果,所以狀態是finished
Out:
可以感受到:
Future對象不是任務,就是存放狀態的一個容器
create_task會讓事件循環調度協程的執行
創建任務可以用 ensure_future和 create_task, ensure_future是一個更高級封裝的函數,但是Python3.7以上版本應該使用 create_task
接著是了解await的作用。如果協程中await一個Future對象,Task會暫停協程的執行并等待Future的完成。而當Future完成后,包裝協程的執行將繼續:
In : async def a():
...:? ? ?print('IN a')
...:? ? ?await b()
...:? ? ?await c()
...:? ? ?print('OUT a')
...:
In : async def b():
...:? ? ?print('IN b')
...:? ? ?await d()
...:? ? ?print('OUT b')
...:
...:
In : async def c():
...:? ? ?print('IN c')
...:? ? ?await asyncio.sleep(1)
...:? ? ?print('OUT c')
...:
...:
In : async def d():
...:? ? ?print('IN d')
...:? ? ?await asyncio.sleep(1)
...:? ? ?print('OUT d')
...:
In : asyncio.run(a())
IN a
IN b
IN d
OUT d
OUT b
IN c
OUT c
OUT a
這個例子中,a的入口函數,其中調用b和c,b又會調用d。await會讓對應的協程獲取執行權限,協程內await的其他協程都執行完畢才會釋放權限,所以注意這個更像DFS(深度優先搜索),所以執行順序是a->b->d->c。
所以這里就得出結論:
事件循環負責協程的協作調度:事件循環一次運行一個任務。當一個任務等待一個Awaitables對象完成時,事件循環會運行其他任務、回調或執行 IO 操作。
asyncio方案
在asyncio方案里,凡是涉及I/O阻塞操作的庫都要使用aio生態中的庫,所以已經不能再使用requests庫,而是需要使用aiohttp,另外文件操作需要使用aiofiles。最終代碼如下(這個2個包需要下載再使用):
import aiofiles
import asyncio
import aiohttp
async def fetch(session, page):
? ? r = await session.get(f'{url}{page*25}', headers=headers)
? ? async with aiofiles.open(f'top250-{page}.html', mode='w') as f:
? ? ? ? await f.write(await r.text())
async def main():
? ? loop = asyncio.get_event_loop()
? ? async with aiohttp.ClientSession(loop=loop) as session:
? ? ? ? tasks = [asyncio.ensure_future(fetch(session, p)) for p in range(25)]
? ? ? ? await asyncio.gather(*tasks)
if __name__ == '__main__':
? ? asyncio.run(main())
看一下效率:
? time python use_asyncio.py
python use_asyncio.py? 0.20s user 0.04s system 34% cpu 0.684 total
所以asyncio的優點如下:
asyncio用好了,是這些并發方案中最快的
它支持數千級別的活動連接,這對于websockets和MQTT之類的場景下性能可以表現的很好,而多線程方案中在這個規模的線程數量下會出現嚴重的性能問題。
多線程方案下線程切換是隱式的,我們無法確認它何時會切換線程的執行權,所以非常容易出現競態條件(Race Condition)。而asyncio方案里協程的切換是顯式、明確的,開發者可以明確地獲知或者指定執行的順序
并發和并行
我之前翻到了一個對比這些方案的說法(延伸鏈接4),其中也提到了并發和并行,說的特別形象,我加以說明:
多進程。10個廚房,10個廚子,10道菜。也是1個廚房1廚子做1道菜。
多線程。1個廚房,10個廚子,10道菜。因為廚房比較小,只能大家一起擠在里面,事實上是輪著做,而且一個廚師在做的時候其他人只能等著輪到自己。
asyncio。1個廚房,1個廚子,10道菜。聽起來好像這就是一個順序執行,但事實上,當某道菜需要燉或者其他什么耗時的烹飪方法時,可以同時做其他的菜或者做準備,最美好的場景是這個廚師一直在忙著做。
對于并發和并行我推薦看一下延伸閱讀連接3的文章。并發(Concurrency)允許同時執行多個任務,這些任務可能訪問相同的共享資源,例如硬盤、網絡以及對應的那個單核CPU。既然會出現訪問共享資源,就可能出現競態條件,所以某個時間點事實上只有一個任務在執行,在本質上目標是當一個任務被迫等待外部資源時,通過在它們之間切換來防止任務相互阻塞,系統會有機制保證這些任務都在推進。并行(Parallelism)是指多個任務在獨立分區的資源(如多個CPU內核)上并行運行,這樣可以最大限度地利用硬件資源。
審核編輯:劉清
?
評論
查看更多