好長(zhǎng)時(shí)間木有寫java,怕忘光了😄,今天抽空翻翻源碼做些總結(jié)??偟膩碚f實(shí)現(xiàn)邏輯還是比較簡(jiǎn)單清晰的。
創(chuàng)新互聯(lián)公司專注于德興企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站,成都商城網(wǎng)站開發(fā)。德興網(wǎng)站建設(shè)公司,為德興等地區(qū)提供建站服務(wù)。全流程按需求定制開發(fā),專業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,創(chuàng)新互聯(lián)公司專業(yè)和態(tài)度為您提供的服務(wù)實(shí)現(xiàn) 1. 架構(gòu)圖ThreadPoolExecutor 中有維護(hù)了隊(duì)列,和Worker(對(duì)應(yīng)一個(gè)線程)的池子,提交的任務(wù)會(huì)交給Worker執(zhí)行。
2. 線程池屬性corePoolSize
: 核心線程(即開啟了就會(huì)常駐的線程)的數(shù)量
workQueue
: 提交任務(wù)的隊(duì)列(當(dāng)核心池用完就會(huì)先放在隊(duì)列里面)
maximumPoolSize
: 線程池大的size,如果隊(duì)列里放不下,就會(huì)接著起線程運(yùn)行任務(wù)
RejectedExecutionHandler
: 超過線程池大的size,隊(duì)列也放不下就開始執(zhí)行Rejected邏輯
keepAliveTime
: 隊(duì)列消費(fèi)完數(shù)據(jù)后,線程大的keepAlive時(shí)間 ,默認(rèn)allowCoreThreadTimeOut 是false,只會(huì)清理maxpool的線程。
控制
ctl
: 記錄線程池生命周期的狀態(tài)位和運(yùn)行的線程個(gè)數(shù) ,高三位記錄狀態(tài),低位記錄Worker個(gè)數(shù)。通過樂觀鎖的方式控制線程池(查詢線程個(gè)數(shù)的方法workerCountOf和workQueue中有些混淆,前者是運(yùn)行的線程數(shù),后者是提交上來存放任務(wù))
private static final int RUNNING ? ?= -1<< COUNT_BITS; ?//運(yùn)行中 ? private static final int SHUTDOWN ? = ?0<< COUNT_BITS; ?//對(duì)應(yīng)shutdown()方法,隊(duì)列和Worker現(xiàn)有的方法會(huì)繼續(xù)運(yùn)行,清理掉空閑的Worker(是否空閑看能否持有到Woker的鎖),甚至還能添加Worker ? private static final int STOP ? ? ? = ?1<< COUNT_BITS; ?//對(duì)應(yīng)shutdownNow()方法,所有的Worker都會(huì)發(fā)interrupt信號(hào)(不會(huì)保證線程被釋放掉),隊(duì)列也會(huì)清空 ? private static final int TIDYING ? ?= ?2<< COUNT_BITS; ?//當(dāng)所有的任務(wù)到停掉,ctl記錄的任務(wù)數(shù)為0 ? private static final int TERMINATED = ?3<< COUNT_BITS; ?//線程池正式停掉。TiDYING狀態(tài)后執(zhí)行terminated()(空方法)變成TERMINATED狀態(tài)4.提交流程:
?int c = ctl.get(); ? ? ? ?if (workerCountOf(c)< corePoolSize) { ? ? ? ? ? //當(dāng)前Worker數(shù)小于corePoolSize時(shí)候,會(huì)創(chuàng)建個(gè)新的Worker ? ? ? ? ? ?if (addWorker(command, true)) ? ? ? ? ? ? ? ?return; ? ? ? ? ? ?c = ctl.get(); ? ? ? } ? ? ? ?if (isRunning(c) && workQueue.offer(command)) {//corePool的線程已經(jīng)起完了,就會(huì)將其提交到隊(duì)列中,由Worker消費(fèi) ? ? ? ? ? ?int recheck = ctl.get(); ? ? ? ? ? ?if (! isRunning(recheck) && remove(command)) ? ? ? ? ? ? ? ?reject(command); //重新檢查狀態(tài),如果池子如果不是運(yùn)行中,且task未來的及消費(fèi),發(fā)起拒絕策略 ? ? ? ? ? ?else if (workerCountOf(recheck) == 0) // 如果從隊(duì)列中刪除未成功,而且Worker數(shù)等于0 ? ? ? ? ? ? ? ?addWorker(null, false); //加個(gè)Worker把這個(gè)任務(wù)消化掉 ? ? ? } ? ? ? ?else if (!addWorker(command, false)) //如果隊(duì)列也滿了,會(huì)向maximumPool增加新的Worker,去運(yùn)行線程 ? ? ? ? ? ?reject(command); //maximumPool也裝不下觸發(fā)拒絕策略5.創(chuàng)建Worker的邏輯
5.1 Check是否可以添加Worker
根據(jù)上面提交流程
我們知道:
判斷是否需要添加Worker由ctl中保存的Worker數(shù)決定是否addWorkder,但是多個(gè)線程之間會(huì)有競(jìng)爭(zhēng),進(jìn)入addWorker方法時(shí),是會(huì)存在池子已經(jīng)滿了的情況。
這個(gè)時(shí)候會(huì)有個(gè)樂觀鎖的方式,重試檢查線程池狀態(tài)和Worker數(shù)量,保證其他提交線程的性能不會(huì)受影響:
如果池子真的滿了只能通知調(diào)用者,addWorker失敗,進(jìn)行其他策略
如果池子未滿,就可以創(chuàng)建一個(gè)新的Worker
private boolean addWorker(Runnable firstTask, boolean core) {//bool core 表示核心池還是maxium池 ? ? ? ?retry: ? ? ? ?for (;;) { ? ? ? ? ? ? ? ? ? ? ?int c = ctl.get(); ? ? ? ? ? ?int rs = runStateOf(c); ? ? ? ? ? ? ?// Check if queue empty only if necessary. ? ? ? ? ? ?if (rs >= SHUTDOWN && ? ? ? ? ? ? ? ?! (rs == SHUTDOWN && ? ? ? ? ? ? ? ? ? firstTask == null && ? ? ? ? ? ? ? ? ? ! workQueue.isEmpty())) ? ? ? ? ? ? ? ?return false; ? ? ? ? ? ? ?for (;;) { ? ? ? ? ? ? ? ?int wc = workerCountOf(c); ? ? ? ? ? ? ? ?if (wc >= CAPACITY || ? ? ? ? ? ? ? ? ? ?wc >= (core ? corePoolSize : maximumPoolSize)) ? ? ? ? ? ? ? ? ? ?return false; ? ? ? ? ? ? ? ?if (compareAndIncrementWorkerCount(c)) ? ? ? ? ? ? ? ? ? ?break retry; ? ? ? ? ? ? ? ?c = ctl.get(); ?// Re-read ctl ? ? ? ? ? ? ? ?if (runStateOf(c) != rs) ? ? ? ? ? ? ? ? ? ?continue retry; ? ? ? ? ? ? ? ?// else CAS failed due to workerCount change; retry inner loop ? ? ? ? ? } ? ? ? }
5.2 開始添加Worker
這里沒啥好說的,創(chuàng)建個(gè)Worker并放到Woker的池子中,
這里用到互斥鎖,不過鎖的粒度很小,只會(huì)是在加入池子的時(shí)候上鎖
入池子前還是會(huì)檢查池子的狀態(tài)
?boolean workerStarted = false; ? ? ? ?boolean workerAdded = false; ? ? ? ?Worker w = null; ? ? ? ?try { ? ? ? ? ? ?w = new Worker(firstTask); ? ? ? ? ? ?final Thread t = w.thread; ? ? ? ? ? ?if (t != null) { ? ? ? ? ? ? ? ?final ReentrantLock mainLock = this.mainLock; ? ? ? ? ? ? ? ?mainLock.lock(); ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?// Recheck while holding lock. ? ? ? ? ? ? ? ? ? ?// Back out on ThreadFactory failure or if ? ? ? ? ? ? ? ? ? ?// shut down before lock acquired. ? ? ? ? ? ? ? ? ? ?int rs = runStateOf(ctl.get()); ? ? ? ? ? ? ? ? ? ? ?if (rs< SHUTDOWN || ? ? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) { ? ? ? ? ? ? ? ? ? ? ? ?if (t.isAlive()) // precheck that t is startable ? ? ? ? ? ? ? ? ? ? ? ? ? ?throw new IllegalThreadStateException(); ? ? ? ? ? ? ? ? ? ? ? ?workers.add(w); ? ? ? ? ? ? ? ? ? ? ? ?int s = workers.size(); ? ? ? ? ? ? ? ? ? ? ? ?if (s >largestPoolSize) ? ? ? ? ? ? ? ? ? ? ? ? ? ?largestPoolSize = s; ? ? ? ? ? ? ? ? ? ? ? ?workerAdded = true; ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? } finally { ? ? ? ? ? ? ? ? ? ?mainLock.unlock(); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?if (workerAdded) { ? ? ? ? ? ? ? ? ? ?t.start(); ? ? ? ? ? ? ? ? ? ?workerStarted = true; ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? } finally { ? ? ? ? ? ?if (! workerStarted) ? ? ? ? ? ? ? ?addWorkerFailed(w); ? ? ? }6. Woker 介紹
Worker 對(duì)應(yīng)線程池里的一個(gè)工作線程
持續(xù)從隊(duì)列里獲取任務(wù)執(zhí)行
boolean timed = allowCoreThreadTimeOut || wc >corePoolSize; ? ? ? ? ? ? ?if ((wc >maximumPoolSize || (timed && timedOut)) ? ? ? ? ? ? ? ?&& (wc >1 || workQueue.isEmpty())) { ? ? ? ? ? ? ? ?if (compareAndDecrementWorkerCount(c)) ? ? ? ? ? ? ? ? ? ?return null; ? ? ? ? ? ? ? ?continue; ? ? ? ? ? } ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?Runnable r = timed ? ? ? ? ? ? ? ? ? ? ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : ? ? ? ? ? ? ? ? ? ?workQueue.take(); ? ? ? ? ? ? ? ?if (r != null) ? ? ? ? ? ? ? ? ? ?return r; ? ? ? ? ? ? ? ?timedOut = true; ? ? ? ? ? } catch (InterruptedException retry) { ? ? ? ? ? ? ? ?timedOut = false; ? ? ? ? ? }7.Worker生命周期
創(chuàng)建: 在上文有介紹
會(huì)根據(jù)線程池的的狀態(tài)自行結(jié)束線程
if ((runStateAtLeast(ctl.get(), STOP) || ? ? ? ? ? ? ? ? ? ? (Thread.interrupted() && ? ? ? ? ? ? ? ? ? ? ?runStateAtLeast(ctl.get(), STOP))) && ? ? ? ? ? ? ? ? ? ?!wt.isInterrupted()) ? ? ? ? ? ? ? ? ? ?wt.interrupt(); ? ? ? ? ? ? ? ? ? ?
如果是異常退出,會(huì)退出后會(huì)重新起一個(gè)新的woker(注意這個(gè)一般代碼的邏輯異常不會(huì)導(dǎo)致Worker異常,會(huì)被封裝到submit方法返回的future中,jdk留了空方法可以監(jiān)聽到這種異常)
?int c = ctl.get(); ? ? ? ?if (runStateLessThan(c, STOP)) { ? ? ? ? ? ?if (!completedAbruptly) { ? ? ? ? ? ? ? ?int min = allowCoreThreadTimeOut ? 0 : corePoolSize; ? ? ? ? ? ? ? ?if (min == 0 && ! workQueue.isEmpty()) ? ? ? ? ? ? ? ? ? ?min = 1; ? ? ? ? ? ? ? ?if (workerCountOf(c) >= min) ? ? ? ? ? ? ? ? ? ?return; // replacement not needed ? ? ? ? ? } ?// ? ? ? completedAbruptly = true 會(huì)被任務(wù)是異常退出 ? ? ? ? ? ?addWorker(null, false); ? ? ? }
maxium池子的空閑Worker會(huì)被釋放
// Are workers subject to culling? ? ? ? ? ? ?boolean timed = allowCoreThreadTimeOut || wc >corePoolSize; ? ? ? ? ? ? ?if ((wc >maximumPoolSize || (timed && timedOut)) ? ? ? ? ? ? ? ?&& (wc >1 || workQueue.isEmpty())) { //判斷超時(shí) ? ? ? ? ? ? ? ?if (compareAndDecrementWorkerCount(c)) ? ? ? ? ? ? ? ? ? ?return null; ? ? ? ? ? ? ? ?continue; ? ? ? ? ? } ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?Runnable r = timed ? ? ? ? ? ? ? ? ? ? ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : ? ? ? ? ? ? ? ? ? ?workQueue.take(); ? ? ? ? ? ? ? ?if (r != null) ? ? ? ? ? ? ? ? ? ?return r; ? ? ? ? ? ? ? ?timedOut = true; ? ? ? ? ? } catch (InterruptedException retry) { ? ? ? ? ? ? ? ?timedOut = false; ? ? ? ? ? }
被主動(dòng)釋放(看shutdownNow()方法)
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
Q1: 在創(chuàng)建線程池時(shí),在ThreadFactory接口中指定setUncaughtExceptionHandler
之后,如果線程池中的任務(wù)拋異常,是否會(huì)被UncaughtExceptionHandler捕獲到
不會(huì),在線程池的實(shí)現(xiàn)中,Worker的會(huì)一直持有申請(qǐng)到的線程,不會(huì)處理提交任務(wù)的異常,異??梢栽趕ubmit 方法返回的future上獲取到。
Q2:showdownNow()執(zhí)行之后線程就一定都執(zhí)行完嘛
只是發(fā)interrupt 信號(hào),看上文介紹只有TYDING ,TERMINATED狀態(tài),所有任務(wù)才會(huì)執(zhí)行完。一定要保持代碼的健壯性,控制業(yè)務(wù)代碼的生命周期。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購,新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧
當(dāng)前題目:JDK學(xué)習(xí)筆記-線程池的實(shí)現(xiàn)-創(chuàng)新互聯(lián)
網(wǎng)頁網(wǎng)址:http://aaarwkj.com/article34/jeese.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制開發(fā)、建站公司、用戶體驗(yàn)、網(wǎng)站導(dǎo)航、網(wǎng)站營(yíng)銷、微信小程序
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容