這篇文章主要講解了“如何使用AQS共享鎖,Semaphore、CountDownLatch”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“如何使用AQS共享鎖,Semaphore、CountDownLatch”吧!
徐州網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)!從網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、響應(yīng)式網(wǎng)站開發(fā)等網(wǎng)站項(xiàng)目制作,到程序開發(fā),運(yùn)營(yíng)維護(hù)。創(chuàng)新互聯(lián)從2013年開始到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)。
AQS(AbstractQueuedSynchronizer),是 Java 并發(fā)包中非常重要的一個(gè)類,大部分鎖的實(shí)現(xiàn)也是基于 AQS 實(shí)現(xiàn)的,包括:
ReentrantLock
,可重入鎖。這個(gè)是我們最開始介紹的鎖,也是最常用的鎖。通常會(huì)與 synchronized 做比較使用。ReentrantReadWriteLock
,讀寫鎖。讀鎖是共享鎖、寫鎖是獨(dú)占鎖。Semaphore
,信號(hào)量鎖。主要用于控制流量,比如:數(shù)據(jù)庫連接池給你分配10個(gè)鏈接,那么讓你來一個(gè)連一個(gè),連到10個(gè)還沒有人釋放,那你就等等。CountDownLatch
,閉鎖。Latch 門閂的意思,比如:說四個(gè)人一個(gè)漂流艇,坐滿了就推下水。這一章節(jié)我們主要來介紹 Semaphore ,信號(hào)量鎖的實(shí)現(xiàn),其實(shí)也就是介紹一個(gè)關(guān)于共享鎖的使用和源碼分析。
Semaphore semaphore = new Semaphore(2, false); // 構(gòu)造函數(shù)入?yún)?,permits:信號(hào)量、fair:公平鎖/非公平鎖
for (int i = 0; i < 8; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "蹲坑");
Thread.sleep(1000L);
} catch (InterruptedException ignore) {
} finally {
semaphore.release();
}
}, "蹲坑編號(hào):" + i).start();
}
這里我們模擬了一個(gè)在高速服務(wù)區(qū),廁所排隊(duì)蹲坑的場(chǎng)景。由于坑位有限,為了避免造成擁擠和踩踏,保安人員在門口攔著,感覺差不多,一次釋放兩個(gè)進(jìn)去,一直到都釋放。你也可以想成早上坐地鐵上班,或者旺季去公園,都是一批一批的放行
「測(cè)試結(jié)果」
蹲坑編號(hào):0蹲坑
蹲坑編號(hào):1蹲坑
蹲坑編號(hào):2蹲坑
蹲坑編號(hào):3蹲坑
蹲坑編號(hào):4蹲坑
蹲坑編號(hào):5蹲坑
蹲坑編號(hào):6蹲坑
蹲坑編號(hào):7蹲坑
Process finished with exit code 0
0坑、1坑
,
之后2坑、3坑
...,每次都是這樣兩個(gè),兩個(gè)的釋放。這就是 Semaphore 信號(hào)量鎖的作用。 public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
permits:n. 許可證,特許證(尤指限期的)
默認(rèn)情況下只需要傳入 permits 許可證數(shù)量即可,也就是一次允許放行幾個(gè)線程。構(gòu)造函數(shù)會(huì)創(chuàng)建非公平鎖。如果你需要使用 Semaphore 共享鎖中的公平鎖,那么可以傳入第二個(gè)構(gòu)造函數(shù)的參數(shù) fair = false/true。true:FairSync,公平鎖。在我們前面的章節(jié)已經(jīng)介紹了公平鎖相關(guān)內(nèi)容和實(shí)現(xiàn),以及CLH、MCS 《公平鎖介紹》
「初始許可證
數(shù)量」
FairSync/NonfairSync(int permits) {
super(permits);
}
Sync(int permits) {
setState(permits);
}
protected final void setState(int newState) {
state = newState;
}
在構(gòu)造函數(shù)初始化的時(shí)候,無論是公平鎖還是非公平鎖,都會(huì)設(shè)置 AQS 中 state 數(shù)量值。這個(gè)值也就是為了下文中可以獲取的信號(hào)量扣減和增加的值。
方法 | 描述 |
---|---|
semaphore.acquire() | 一次獲取一個(gè)信號(hào)量,響應(yīng)中斷 |
semaphore.acquire(2) | 一次獲取n個(gè)信號(hào)量,響應(yīng)中斷(一次占2個(gè)坑) |
semaphore.acquireUninterruptibly() | 一次獲取一個(gè)信號(hào)量,不響應(yīng)中斷 |
semaphore.acquireUninterruptibly(2) | 一次獲取n個(gè)信號(hào)量,不響應(yīng)中斷 |
semaphore.acquire()
,源碼中實(shí)際調(diào)用的方法是,
sync.acquireSharedInterruptibly(1)
。也就是相應(yīng)中斷,一次只占一個(gè)坑。semaphore.acquire(2)
,同理這個(gè)就是一次要占兩個(gè)名額,也就是許可證。
生活中的場(chǎng)景就是我給我朋友排的對(duì),她來了,進(jìn)來吧。 方法 | 描述 |
---|---|
semaphore.release() | 一次釋放一個(gè)信號(hào)量 |
semaphore.release(2) | 一次獲取n個(gè)信號(hào)量 |
有獲取就得有釋放,獲取了幾個(gè)信號(hào)量就要釋放幾個(gè)信號(hào)量。當(dāng)然你可以嘗試一下,獲取信號(hào)量 semaphore.acquire(2) 兩個(gè),釋放信號(hào)量 semaphore.release(1),看看運(yùn)行效果
「信號(hào)量獲取過程」,一直到公平鎖實(shí)現(xiàn)。semaphore.acquire
-> sync.acquireSharedInterruptibly(permits)
-> tryAcquireShared(arg)
semaphore.acquire(1);
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
「FairSync.tryAcquireShared」
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
hasQueuedPredecessors
,公平鎖的主要實(shí)現(xiàn)邏輯都在于這個(gè)方法的使用。它的目的就是判斷有線程排在自己前面沒,以及把線程添加到隊(duì)列中的邏輯實(shí)現(xiàn)。
在前面我們介紹過CLH等實(shí)現(xiàn),可以往前一章節(jié)閱讀for (;;)
,是一個(gè)自旋的過程,通過 CAS 來設(shè)置 state 偏移量對(duì)應(yīng)值。這樣就可以避免多線程下競(jìng)爭(zhēng)獲取信號(hào)量沖突。getState()
,在構(gòu)造函數(shù)中已經(jīng)初始化 state 值,在這里獲取信號(hào)量時(shí)就是使用 CAS 不斷的扣減。「NonfairSync.nonfairTryAcquireShared」
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
if (hasQueuedPredecessors())
的判斷操作。在公平鎖和非公平鎖的實(shí)現(xiàn)中,我們已經(jīng)看到正常獲取信號(hào)量的邏輯。那么如果此時(shí)不能正常獲取信號(hào)量呢?其實(shí)這部分線程就需要加入到同步隊(duì)列。
「doAcquireSharedInterruptibly」
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireSharedInterruptibly
方法來自 AQS 的內(nèi)部方法,與我們?cè)趯W(xué)習(xí)競(jìng)爭(zhēng)鎖時(shí)有部分知識(shí)點(diǎn)相同,但也有一些差異。比如:
addWaiter(Node.SHARED)
,
tryAcquireShared
,我們主要介紹下這內(nèi)容。Node.SHARED
,其實(shí)沒有特殊含義,它只是一個(gè)標(biāo)記作用,用于判斷是否共享。
final boolean isShared() { return nextWaiter == SHARED; }
tryAcquireShared
,主要是來自
Semaphore
共享鎖中公平鎖和非公平鎖的實(shí)現(xiàn)。用來獲取同步狀態(tài)。setHeadAndPropagate(node, r)
,如果r > 0,同步成功后則將當(dāng)前線程結(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),同時(shí) helpGC,p.next = null,斷鏈操作。shouldParkAfterFailedAcquire(p, node)
,調(diào)整同步隊(duì)列中 node 結(jié)點(diǎn)的狀態(tài),并判斷是否應(yīng)該被掛起。這在我們之前關(guān)于鎖的文章中已經(jīng)介紹。parkAndCheckInterrupt()
,判斷是否需要被中斷,如果中斷直接拋出異常,當(dāng)前結(jié)點(diǎn)請(qǐng)求也就結(jié)束。cancelAcquire(node)
,取消該節(jié)點(diǎn)的線程請(qǐng)求。CountDownLatch 也是共享鎖的一種類型,之所以在這里體現(xiàn)下,是因?yàn)樗?Semaphore 共享鎖,既相似有不同。
CountDownLatch 更多體現(xiàn)的組團(tuán)一波的思想,同樣是控制人數(shù),但是需要夠一窩。比如:我們說過的4個(gè)人一起上皮劃艇、兩個(gè)人一起上蹺蹺板、2個(gè)人一起蹲坑我沒見過,這樣的方式就是門閂 CountDownLatch 鎖的思想。
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
exec.execute(() -> {
try {
int millis = new Random().nextInt(10000);
System.out.println("等待游客上船,耗時(shí):" + millis + "(millis)");
Thread.sleep(millis);
} catch (Exception ignore) {
} finally {
latch.countDown(); // 完事一個(gè)扣減一個(gè)名額
}
});
}
// 等待游客
latch.await();
System.out.println("船長(zhǎng)急躁了,開船!");
// 關(guān)閉線程池
exec.shutdown();
}
latch.countDown()
latch.await()
急躁了
「測(cè)試結(jié)果」
等待游客上船,耗時(shí):6689(millis)
等待游客上船,耗時(shí):2303(millis)
等待游客上船,耗時(shí):8208(millis)
等待游客上船,耗時(shí):435(millis)
等待游客上船,耗時(shí):9489(millis)
等待游客上船,耗時(shí):4937(millis)
等待游客上船,耗時(shí):2771(millis)
等待游客上船,耗時(shí):4823(millis)
等待游客上船,耗時(shí):1989(millis)
等待游客上船,耗時(shí):8506(millis)
船長(zhǎng)急躁了,開船!
Process finished with exit code 0
船長(zhǎng)急躁了,開船!
,會(huì)需要等待一段時(shí)間。感謝各位的閱讀,以上就是“如何使用AQS共享鎖,Semaphore、CountDownLatch”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)如何使用AQS共享鎖,Semaphore、CountDownLatch這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
本文標(biāo)題:如何使用AQS共享鎖,Semaphore、CountDownLatch
當(dāng)前URL:http://aaarwkj.com/article16/isjhdg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供云服務(wù)器、軟件開發(fā)、外貿(mào)建站、、電子商務(wù)、品牌網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)