今天跟大家聊一聊無論是在工作中常用還是在面試中常問的線程池,通過畫圖的方式來徹底弄懂線程池的工作原理,以及在實際項目中該如何自定義適合業務的線程池。
一、什么是線程池
線程池其實是一種池化的技術的實現,池化技術的核心思想其實就是實現資源的一個復用,避免資源的重復創建和銷毀帶來的性能開銷。在線程池中,線程池可以管理一堆線程,讓線程執行完任務之后不會進行銷毀,而是繼續去處理其它線程已經提交的任務。
線程池的好處:
降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。
提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統 的穩定性,使用線程池可以進行統一的分配,調優和監控。
基于 Spring Boot + MyBatis Plus + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
項目地址:https://github.com/YunaiV/ruoyi-vue-pro
視頻教程:https://doc.iocoder.cn/video/
二、線程池的構造
Java中主要是通過構建ThreadPoolExecutor來創建線程池的,接下來我們看一下線程池是如何構造出來的。
線程池構造參數
corePoolSize:線程池中用來工作的核心的線程數量。
maximumPoolSize:最大線程數,線程池允許創建的最大線程數。
keepAliveTime:超出 corePoolSize 后創建的線程存活時間或者是所有線程最大存活時間,取決于配置。
unit:keepAliveTime 的時間單位。
workQueue:任務隊列,是一個阻塞隊列,當線程數已達到核心線程數,會將任務存儲在阻塞隊列中。
threadFactory :線程池內部創建線程所用的工廠。
handler:拒絕策略;當隊列已滿并且線程數量達到最大線程數量時,會調用該方法處理該任務。
線程池的構造其實很簡單,就是傳入一堆參數,然后進行簡單的賦值操作。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
項目地址:https://github.com/YunaiV/yudao-cloud
視頻教程:https://doc.iocoder.cn/video/
三、線程池的運行原理
說完線程池的核心構造參數的意思,接下來就來畫圖講解這些參數在線程池中是如何工作的。
線程池剛創建出來是什么樣子呢,如下圖
不錯,剛創建出來的線程池中只有一個構造時傳入的阻塞隊列而已,此時里面并沒有的任何線程,但是如果你想要在執行之前已經創建好核心線程數,可以調用prestartAllCoreThreads方法來實現,默認是沒有線程的。
當有線程通過execute方法提交了一個任務,會發生什么呢?
提交任務的時候,其實會去進行任務的處理
首先會去判斷當前線程池的線程數是否小于核心線程數,也就是線程池構造時傳入的參數corePoolSize。
如果小于,那么就直接通過ThreadFactory創建一個線程來執行這個任務,如圖
當任務執行完之后,線程不會退出,而是會去從阻塞隊列中獲取任務,如下圖
接下來如果又提交了一個任務,也會按照上述的步驟,去判斷是否小于核心線程數,如果小于,還是會創建線程來執行任務,執行完之后也會從阻塞隊列中獲取任務。這里有個細節,就是提交任務的時候,就算有線程池里的線程從阻塞隊列中獲取不到任務,如果線程池里的線程數還是小于核心線程數,那么依然會繼續創建線程,而不是復用已有的線程。
如果線程池里的線程數不再小于核心線程數呢?那么此時就會嘗試將任務放入阻塞隊列中,入隊成功之后,如圖
這樣在阻塞的線程就可以獲取到任務了。
但是,隨著任務越來越多,隊列已經滿了,任務放入失敗了,那怎么辦呢?
此時就會判斷當前線程池里的線程數是否小于最大線程數,也就是入參時的maximumPoolSize參數
如果小于最大線程數,那么也會創建非核心線程來執行提交的任務,如圖
所以,從這里可以發現,就算隊列中有任務,新創建的線程還是優先處理這個提交的任務,而不是從隊列中獲取已有的任務執行,從這可以看出,先提交的任務不一定先執行。
但是不幸的事發生了,線程數已經達到了最大線程數量,那么此時會怎么辦呢?
此時就會執行拒絕策略,也就是構造線程池的時候,傳入的RejectedExecutionHandler對象,來處理這個任務。
RejectedExecutionHandler的實現JDK自帶的默認有4種
AbortPolicy:丟棄任務,拋出運行時異常
CallerRunsPolicy:由提交任務的線程來執行任務
DiscardPolicy:丟棄這個任務,但是不拋異常
DiscardOldestPolicy:從隊列中剔除最先進入隊列的任務,然后再次提交任務
線程池創建的時候,如果不指定拒絕策略就默認是AbortPolicy策略。當然,你也可以自己實現RejectedExecutionHandler接口,比如將任務存在數據庫或者緩存中,這樣就數據庫或者緩存中獲取到被拒絕掉的任務了。
到這里,我們發現,線程池構造的幾個參數corePoolSize、maximumPoolSize、workQueue、threadFactory、handler我們都在上述的執行過程中講到了,那么還差兩個參數keepAliveTime和unit(unit是keepAliveTime的時間單位)沒講到,所以keepAliveTime是如何起到作用的呢,這個問題留到后面分析。
說完整個執行的流程,接下來看看execute方法代碼是如何實現的。
execute方法
workerCountOf(c)
workQueue.offer(command):這行代碼就表示嘗試往阻塞隊列中添加任務
添加失敗之后就會再次調用addWorker方法嘗試添加非核心線程來執行任務
如果還是添加非核心線程失敗了,那么就會調用reject(command)來拒絕這個任務。
最后再來另畫一張圖總結execute執行流程
四、線程池中線程實現復用的原理
線程池的核心功能就是實現了線程的重復利用,那么線程池是如何實現線程的復用呢?
線程在線程池內部其實是被封裝成一個Worker對象
Worker繼承了AQS,也就是有一定鎖的特性。
創建線程來執行任務的方法上面提到是通過addWorker方法創建的。在創建Worker對象的時候,會把線程和任務一起封裝到Worker內部,然后調用runWorker方法來讓線程執行任務,接下來我們就來看一下runWorker方法。
啟動線程處理任務
從這張圖可以看出線程執行完任務不會退出的原因,runWorker內部使用了while死循環,當第一個任務執行完之后,會不斷地通過getTask方法獲取任務,只要能獲取到任務,就會調用run方法,繼續執行任務,這就是線程能夠復用的主要原因。
但是如果從getTask獲取不到方法的時候,最后就會調用finally中的processWorkerExit方法,來將線程退出。
這里有個一個細節就是,因為Worker繼承了AQS,每次在執行任務之前都會調用Worker的lock方法,執行完任務之后,會調用unlock方法,這樣做的目的就可以通過Woker的加鎖狀態就能判斷出當前線程是否正在運行任務。如果想知道線程是否正在運行任務,只需要調用Woker的tryLock方法,根據是否加鎖成功就能判斷,加鎖成功說明當前線程沒有加鎖,也就沒有執行任務了,在調用shutdown方法關閉線程池的時候,就用這種方式來判斷線程有沒有在執行任務,如果沒有的話,來嘗試打斷沒有執行任務的線程。
五、線程是如何獲取任務的以及如何實現超時的
上一節我們說到,線程在執行完任務之后,會繼續從getTask方法中獲取任務,獲取不到就會退出。接下來我們就來看一看getTask方法的實現。
getTask方法
getTask方法,前面就是線程池的一些狀態的判斷,這里有一行代碼
?
?
?
?
這行代碼是判斷,當前過來獲取任務的線程是否可以超時退出。如果allowCoreThreadTimeOut設置為true或者線程池當前的線程數大于核心線程數,也就是corePoolSize,那么該獲取任務的線程就可以超時退出。
那是怎么做到超時退出呢,就是這行核心代碼
?
?
?
?
會根據是否允許超時來選擇調用阻塞隊列workQueue的poll方法或者take方法。如果允許超時,則會調用poll方法,傳入keepAliveTime,也就是構造線程池時傳入的空閑時間,這個方法的意思就是從隊列中阻塞keepAliveTime時間來獲取任務,獲取不到就會返回null;如果不允許超時,就會調用take方法,這個方法會一直阻塞獲取任務,直到從隊列中獲取到任務位置。從這里可以看到keepAliveTime是如何使用的了。
所以到這里應該就知道線程池中的線程為什么可以做到空閑一定時間就退出了吧。其實最主要的是利用了阻塞隊列的poll方法的實現,這個方法可以指定超時時間,一旦線程達到了keepAliveTime還沒有獲取到任務,那么就會返回null,上一小節提到,getTask方法返回null,線程就會退出。
這里也有一個細節,就是判斷當前獲取任務的線程是否可以超時退出的時候,如果將allowCoreThreadTimeOut設置為true,那么所有線程走到這個timed都是true,那么所有的線程,包括核心線程都可以做到超時退出。如果你的線程池需要將核心線程超時退出,那么可以通過allowCoreThreadTimeOut方法將allowCoreThreadTimeOut變量設置為true。
整個getTask方法以及線程超時退出的機制如圖所示
六、線程池的5種狀態
線程池內部有5個常量來代表線程池的五種狀態
RUNNING:線程池創建時就是這個狀態,能夠接收新任務,以及對已添加的任務進行處理。
SHUTDOWN:調用shutdown方法線程池就會轉換成SHUTDOWN狀態,此時線程池不再接收新任務,但能繼續處理已添加的任務到隊列中任務。
STOP:調用shutdownNow方法線程池就會轉換成STOP狀態,不接收新任務,也不能繼續處理已添加的任務到隊列中任務,并且會嘗試中斷正在處理的任務的線程。
TIDYING:SHUTDOWN 狀態下,任務數為 0, 其他所有任務已終止,線程池會變為 TIDYING 狀態。線程池在 SHUTDOWN 狀態,任務隊列為空且執行中任務為空,線程池會變為 TIDYING 狀態。線程池在 STOP 狀態,線程池中執行中任務為空時,線程池會變為 TIDYING 狀態。
TERMINATED:線程池徹底終止。線程池在 TIDYING 狀態執行完 terminated() 方法就會轉變為 TERMINATED 狀態。
線程池狀態具體是存在ctl成員變量中,ctl中不僅存儲了線程池的狀態還存儲了當前線程池中線程數的大小
?
?
?
?
最后畫個圖來總結一下這5種狀態的流轉
其實,在線程池運行過程中,絕大多數操作執行前都得判斷當前線程池處于哪種狀態,再來決定是否繼續執行該操作。
七、線程池的關閉
線程池提供了shutdown和shutdownNow兩個方法來關閉線程池。
shutdown方法
就是將線程池的狀態修改為SHUTDOWN,然后嘗試打斷空閑的線程(如何判斷空閑,上面在說Worker繼承AQS的時候說過),也就是在阻塞等待任務的線程。
shutdownNow方法
就是將線程池的狀態修改為STOP,然后嘗試打斷所有的線程,從阻塞隊列中移除剩余的任務,這也是為什么shutdownNow不能執行剩余任務的原因。
所以也可以看出shutdown方法和shutdownNow方法的主要區別就是,shutdown之后還能處理在隊列中的任務,shutdownNow直接就將任務從隊列中移除,線程池里的線程就不再處理了。
八、線程池的監控
在項目中使用線程池的時候,一般需要對線程池進行監控,方便出問題的時候進行查看。線程池本身提供了一些方法來獲取線程池的運行狀態。
getCompletedTaskCount:已經執行完成的任務數量
getLargestPoolSize:線程池里曾經創建過的最大的線程數量。這個主要是用來判斷線程是否滿過。
getActiveCount:獲取正在執行任務的線程數據
getPoolSize:獲取當前線程池中線程數量的大小
除了線程池提供的上述已經實現的方法,同時線程池也預留了很多擴展方法。比如在runWorker方法里面,在執行任務之前會回調beforeExecute方法,執行任務之后會回調afterExecute方法,而這些方法默認都是空實現,你可以自己繼承ThreadPoolExecutor來擴展重寫這些方法,來實現自己想要的功能。
九、Executors構建線程池以及問題分析
JDK內部提供了Executors這個工具類,來快速的創建線程池。
固定線程數量的線程池:核心線程數與最大線程數相等
單個線程數量的線程池
接近無限大線程數量的線程池
帶定時調度功能的線程池
雖然JDK提供了快速創建線程池的方法,但是其實不推薦使用Executors來創建線程池,因為從上面構造線程池可以看出,newFixedThreadPool線程池,由于使用了LinkedBlockingQueue,隊列的容量默認是無限大,實際使用中出現任務過多時會導致內存溢出;newCachedThreadPool線程池由于核心線程數無限大,當任務過多的時候,會導致創建大量的線程,可能機器負載過高,可能會導致服務宕機。
十、線程池的使用場景
在java程序中,其實經常需要用到多線程來處理一些業務,但是不建議單純使用繼承Thread或者實現Runnable接口的方式來創建線程,那樣就會導致頻繁創建及銷毀線程,同時創建過多的線程也可能引發資源耗盡的風險。所以在這種情況下,使用線程池是一種更合理的選擇,方便管理任務,實現了線程的重復利用。所以線程池一般適合那種需要異步或者多線程處理任務的場景。
十一、實際項目中如何合理的自定義線程池
通過上面分析提到,通過Executors這個工具類來創建的線程池其實都無法滿足實際的使用場景,那么在實際的項目中,到底該如何構造線程池呢,該如何合理的設置參數?
1)線程數
線程數的設置主要取決于業務是IO密集型還是CPU密集型。
CPU密集型指的是任務主要使用來進行大量的計算,沒有什么導致線程阻塞。一般這種場景的線程數設置為CPU核心數+1。
IO密集型:當執行任務需要大量的io,比如磁盤io,網絡io,可能會存在大量的阻塞,所以在IO密集型任務中使用多線程可以大大地加速任務的處理。一般線程數設置為 2*CPU核心數
java中用來獲取CPU核心數的方法是:
?
?
?
?
2)線程工廠
一般建議自定義線程工廠,構建線程的時候設置線程的名稱,這樣就在查日志的時候就方便知道是哪個線程執行的代碼。
3)有界隊列
一般需要設置有界隊列的大小,比如LinkedBlockingQueue在構造的時候就可以傳入參數,來限制隊列中任務數據的大小,這樣就不會因為無限往隊列中扔任務導致系統的oom。
編輯:黃飛
?
boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;
Runnable?r?=?timed??
????????????????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
????????????????????workQueue.take();
private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));
Runtime.getRuntime().availableProcessors();
評論
查看更多