CountDownLatch 位于并發(fā)包下,利用它可以完成類似于計數(shù)器的功能,如果線程 A 需要等待其他 n 個線程執(zhí)行完畢后才能執(zhí)行,此時就可以利用 CountDownLatch 來實現(xiàn)這個功能,CountDownLatch 是通過一個計數(shù)器來實現(xiàn)的,計數(shù)器的初始值為線程數(shù)量,每當(dāng)一個線程完成了自己的任務(wù)后,計數(shù)器的值就會減1,當(dāng)計數(shù)器的值為0時,表示所有線程已經(jīng)執(zhí)行完畢,等待線程就可以恢復(fù)。
創(chuàng)新互聯(lián)專注于遼中企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站,商城網(wǎng)站建設(shè)。遼中網(wǎng)站建設(shè)公司,為遼中等地區(qū)提供建站服務(wù)。全流程按需求定制制作,專業(yè)設(shè)計,全程項目跟蹤,創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)
package com.kernel;
import java.util.concurrent.CountDownLatch;
public class Test001 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ",子線程開始執(zhí)行...");
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + ",子線程結(jié)束執(zhí)行...");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ",子線程開始執(zhí)行...");
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + ",子線程結(jié)束執(zhí)行...");
}
}).start();
countDownLatch.await();
System.out.println("其他線程執(zhí)行完畢");
System.out.println("等待線程執(zhí)行。。。");
}
}
CyclicBarrier 在初始化時會傳入一個數(shù)量,它會記錄調(diào)用了 await 方法的線程數(shù),只有這個線程數(shù)和創(chuàng)建該對象時提供的數(shù)量相同時,所有進(jìn)入線程等待的線程才會被重新喚醒繼續(xù)執(zhí)行。
顧名思義,它就像一個屏幕,人來全了才能一塊兒過屏障。
CyclicBarrier 還可以提供一個可以傳入 Runable 對象的構(gòu)造,該線程將在一起通過屏障后所有線程喚醒之前被喚醒
package com.kernel;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class Write extends Thread {
private CyclicBarrier cyclicBarrier;
public Write(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("線程" + Thread.currentThread().getName() + ",正在寫入數(shù)據(jù)");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程" + Thread.currentThread().getName() + ",寫入數(shù)據(jù)成功.....");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有線程執(zhí)行完畢");
}
}
public class Test002 {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
Write write = new Write(cyclicBarrier);
write.start();
}
}
}
Semaphore 是一種基于計數(shù)的信號量,創(chuàng)建時可以指定一個值,這個值規(guī)定了有多少個線程并發(fā)執(zhí)行,執(zhí)行前申請,執(zhí)行完畢后歸還,超過那個值后,線程申請信號將會被阻塞,知道有其他占有信號的線程執(zhí)行完成歸還信號
Semaphore 可以用來構(gòu)建一些對象池,資源池之類的,比如數(shù)據(jù)庫連接池
我們也可以創(chuàng)建計數(shù)為1的 Semaphore,將其作為一種類似互斥鎖的機(jī)制,這也叫二元信號量,表示兩種互斥狀態(tài)
package com.kernel;
import java.util.Random;
import java.util.concurrent.Semaphore;
public class Test003 extends Thread {
private String name;
private Semaphore windows;
public Test003(String name, Semaphore windows) {
this.name = name;
this.windows = windows;
}
@Override
public void run() {
int availablePermits = windows.availablePermits();
if (availablePermits > 0) {
System.out.println(name + ":終于輪到我了");
} else {
System.out.println(name + ":**,能不能快點(diǎn)");
}
try {
windows.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ":我要XXX,剩下窗口");
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ":我買完了");
windows.release();
}
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) {
Test003 test003 = new Test003("第" + i + "個人", semaphore);
test003.start();
}
}
}
是一個×××隊列,性能高于阻塞式隊列,是一個基于鏈表實現(xiàn)的線程安全隊列,不允許 null
add 和 offer 是加入元素的方法,兩者之間沒有區(qū)別
poll 從隊列中取出并刪除元素
peek 查看隊列頭元素,不刪除
是一個有界隊列,阻塞隊列常用語生產(chǎn)消費(fèi)者場景
以下情況會產(chǎn)生阻塞:
隊列元素滿,還往里面存元素
隊列元素空,還想從隊列中拿元素
基于阻塞隊列的生產(chǎn)者消費(fèi)者模型
package com.kernel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class ProducerThread implements Runnable {
private BlockingQueue<String> blockingQueue;
private AtomicInteger atomicInteger = new AtomicInteger();
private volatile boolean flag = true;
public ProducerThread(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println("###生產(chǎn)者線程已經(jīng)啟動###");
while (flag) {
String data = atomicInteger.incrementAndGet() + "";
try {
boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (offer) {
System.out.println(Thread.currentThread().getName() + ", 生產(chǎn)隊列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + ", 生產(chǎn)隊列" + data + "失敗");
}
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("生產(chǎn)者線程結(jié)束");
}
public void stop() {
this.flag = false;
}
}
class CustomerThread implements Runnable {
private BlockingQueue<String> blockingQueue;
private volatile boolean flag = true;
public CustomerThread(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println("###消費(fèi)者線程已經(jīng)啟動###");
while (flag) {
String data = null;
try {
data = blockingQueue.poll(2, TimeUnit.SECONDS);
if (data == null) {
flag = false;
System.out.println("消費(fèi)者超過2秒時間未獲取到消息.");
return;
}
System.out.println("消費(fèi)者獲取到隊列信息成功,data:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消費(fèi)者進(jìn)程結(jié)束");
}
}
public class Test006 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
ProducerThread producerThread = new ProducerThread(blockingQueue);
CustomerThread customerThread = new CustomerThread(blockingQueue);
new Thread(producerThread).start();
new Thread(customerThread).start();
Thread.sleep(1000 * 10);
producerThread.stop();
}
}
線程池其實就是一個可以容納線程的容器,其中的線程可以反復(fù)利用,節(jié)省了反復(fù)創(chuàng)建、銷毀線程的消耗
線程池有什么優(yōu)點(diǎn)?
降低資源消耗:重復(fù)利用已經(jīng)創(chuàng)建好的線程而節(jié)約反復(fù)創(chuàng)建、銷毀線程的消耗
提高響應(yīng)速度:眾所周期,創(chuàng)建線程不是立馬可以使用的,創(chuàng)建好線程之后進(jìn)入就緒狀態(tài),需要經(jīng)過 CPU 的調(diào)度才能進(jìn)入運(yùn)行狀態(tài),而利用線程池,只要任務(wù)來到,線程池有空閑線程,就可以立即作業(yè)
提高線程管理性:線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還可以降低系統(tǒng)穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控
線程池的分類:
newCachedThreadPool
創(chuàng)建一個可緩存線程池,可反復(fù)回收利用,若任務(wù)數(shù)大于當(dāng)然線程數(shù),則繼續(xù)創(chuàng)建線程
public class Test008 {
public static void main(String[] args) {
// 可緩存線程池(可重復(fù)利用)無限大
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
}
}
newFixedThreadPool
創(chuàng)建一個定長線程,超出線程存放在隊列中
public class Test009 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
}
}
newScheduledThreadPool
創(chuàng)建一個定長并支持定時及周期性任務(wù)執(zhí)行的線程池
public class Test010 {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
for (int i = 0; i < 10; i++) {
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
// 表示延遲3秒執(zhí)行
},3, TimeUnit.SECONDS);
}
}
}
newSingleThreadExecutor
創(chuàng)建一個單線程線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO、LIFO、優(yōu)先級)執(zhí)行
public class Test011 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
executorService.shutdown();
}
}
提交一個任務(wù)到線程池中去,首先判斷當(dāng)前線程數(shù)是都小于 corePoolSize,如果小于 corePoolSize,則創(chuàng)建一個新線程來執(zhí)行任務(wù)
如果當(dāng)前線程數(shù)等于corePoolSize,再來任務(wù)的話就會將任務(wù)添加到緩存隊列中
如果緩存隊列已滿,在判斷當(dāng)前線程是否小于 maximumPoolSize
如果小于 maximumPoolSize,創(chuàng)建線程執(zhí)行任務(wù),否則,采取任務(wù)拒絕策略進(jìn)行處理
如果當(dāng)前線程數(shù)大于 corePoolSize,并且某線程空閑時間大于 keepAliveTime,線程被終止,直到線程池中的線程數(shù)目不大于corePoolSize
如果允許為核心池中的線程設(shè)置存活時間,那么核心池中的線程空閑時間超過 keepAliveTime,線程也會被終止
IO 密集型:即該任務(wù)需要大量的IO,即大量的阻塞,在單線程上運(yùn)行IO密集型的任務(wù)會導(dǎo)致浪費(fèi)大量的 CPU 運(yùn)算能力浪費(fèi)在等待,所以在 IO 密集型任務(wù)中使用多線程可以大大的加速程序運(yùn)行,即時在單核CPU上,這種加速主要就是利用了被浪費(fèi)掉的阻塞時間,一般 IO 密集型任務(wù)線程設(shè)置為 2*核心數(shù)+1
CPU 密集型:該任務(wù)需要進(jìn)行大量計算,沒有 IO 阻塞,CPU 一直在全速運(yùn)行,CPU 密集任務(wù)只有在真正的多核CPU上才可能得到加速(通過多線程),而在單核CPU上,無論你開幾個模擬的多線程,該任務(wù)都不可能得到加速,因為CPU總的運(yùn)算能力就那些,一般 CPU 密集型任務(wù)線程設(shè)置為 核心數(shù)+1
Java中,創(chuàng)建線程一般有兩種方式,就是繼承 Thread 或者實現(xiàn) Runable 接口,這兩種方式的缺點(diǎn)是在線程任務(wù)執(zhí)行完畢后,無法獲得執(zhí)行結(jié)果,所以一般使用共享變量或者共享存儲區(qū)以及線程通信的方式獲得結(jié)果,Java 中也提供了使用 Callable 和 Future 來實現(xiàn)獲取任務(wù)結(jié)果的操作,Callable 用來執(zhí)行任務(wù),產(chǎn)生結(jié)果,而 Future 用來獲得結(jié)果
Future 常用方法
V get() 獲取異步執(zhí)行的結(jié)果,如果沒有結(jié)果可用,此方法會阻塞直到異步計算完成
V get(Long timeout, Timeunit unit) 獲取異步執(zhí)行結(jié)果,如果沒有結(jié)果可用,此方法會阻塞,但是會有時間限制,如果阻塞時間超過設(shè)定的timeout時間,該方法將拋出異常
boolean isDone() 如果任務(wù)執(zhí)行結(jié)束,無論是正常結(jié)束或是中途取消還是發(fā)生異常,都返回true
boolean isCanceller() 如果任務(wù)完成前被取消,則返回true
boolean cancel(boolean mayInterruptIfRunning) 如果任務(wù)還沒開始就執(zhí)行該方法將返回 false,如果任務(wù)執(zhí)行過程中調(diào)用 cancel(true) 將以中斷執(zhí)行任務(wù)的方式試圖停止任務(wù),如果停止成功,返回 true,如果執(zhí)行 cancel(false) 不會對執(zhí)行的任務(wù)產(chǎn)生影響,此時返回 false,當(dāng)任務(wù)完成后調(diào)用該方法將返回 false,參數(shù)表示是否中斷執(zhí)行進(jìn)程
Future 模式
去除主函數(shù)的等待時間,使得原本需要等待的時間可以處理其他業(yè)務(wù),對于多線程,如果線程 A 要等待線程 B 的結(jié)果,那么線程 A 沒必要等待 B,直到 B 有結(jié)果,可以先拿到一個未來的 Future,等 B 有結(jié)果是再取真實的結(jié)果
模擬 Future
Data
public interface Data {
// 獲取子線程執(zhí)行結(jié)果
public String getRequest() throws InterruptedException;
}
FutureData
public class FutureData implements Data {
private boolean flag = false;
private RealData realData;
public synchronized void setRealData(RealData realData) {
if (flag)
return;
this.realData = realData;
flag = true;
// 喚醒
notify();
}
@Override
public synchronized String getRequest() throws InterruptedException {
while (!flag) {
// 等待
wait();
}
// 返回結(jié)果
return realData.getRequest();
}
}
RealData
public class RealData implements Data {
private String result;
public RealData(String data) throws InterruptedException {
System.out.println("正在下載" + data);
Thread.sleep(5000);
System.out.println("下載完畢!");
result = "author:kernel";
}
@Override
public String getRequest() {
return result;
}
}
FutureClient
public class FutureClient {
public Data submit(String requestData) {
FutureData futureData = new FutureData();
new Thread(new Runnable() {
@Override
public void run() {
RealData realData = null;
try {
realData = new RealData(requestData);
futureData.setRealData(realData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
return futureData;
}
}
測試
public class Test002 {
public static void main(String[] args) throws InterruptedException {
FutureClient futureClient = new FutureClient();
Data request = futureClient.submit("請求參數(shù)");
System.out.println("請求發(fā)送成功");
// 主線程該干嘛干嘛去
System.out.println("執(zhí)行其他任務(wù)");
// 獲取其他線程的結(jié)果
String result = request.getRequest();
System.out.println("獲取到結(jié)果" + result);
}
}
執(zhí)行流程:
首先創(chuàng)建一個向 FutureClient 發(fā)送請求,然后 realData 執(zhí)行下載任務(wù),將結(jié)果封裝起來,然后獲取結(jié)果函數(shù)時刻監(jiān)測線程是否拿到結(jié)果,如果拿到了,就返回,如果沒有拿到,就一直阻塞,設(shè)置結(jié)果的函數(shù)拿到結(jié)果會立即喚醒返回結(jié)果的函數(shù)
重入鎖,也叫遞歸鎖,指的是同一線程外層函數(shù)獲得鎖之后,內(nèi)存函數(shù)仍有獲取該鎖的代碼,但不受影響
ReentrantLock(顯式鎖、輕量級鎖)和 Synchronized(內(nèi)置鎖、重量級鎖)都是可重入鎖
程序中涉及一些對共享變量的讀寫操作時,在沒有寫操作時,多個線程同時讀取是沒有任何問題的,如果有一個線程正在進(jìn)行寫操作,其他線程就不應(yīng)該對其進(jìn)行讀或?qū)懖僮髁?/p>
讀寫不同共存,讀讀可以共存,寫寫不能共存
public class Test001 {
private volatile Map<String, Object> cache = new HashMap<String, Object>();
private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private ReadLock readLock = reentrantReadWriteLock.readLock();
private WriteLock writeLock = reentrantReadWriteLock.writeLock();
public void put(String key, String value) {
try {
writeLock.lock();
System.out.println("寫入put方法key:" + key + ",value" + value + ",開始");
Thread.sleep(1000);
cache.put(key, value);
System.out.println("寫入put方法key:" + key + ",value" + value + ",結(jié)束");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
public String get(String key) {
try {
readLock.lock();
System.out.println("讀取key:"+ key + ",開始");
Thread.sleep(1000);
String value = (String) cache.get(key);
System.out.println("讀取key:"+ key + ",結(jié)束");
return value;
} catch (InterruptedException e) {
e.printStackTrace();
return null;
} finally {
readLock.unlock();
}
}
public static void main(String[] args) {
Test001 test001 = new Test001();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
test001.put("i", i + "");
}
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
test001.get(i + "");
}
}
});
t2.start();
}
}
悲觀鎖悲觀的認(rèn)為每一次操作都會造成更新丟失問題,在每次查詢時加上排他鎖,每次去拿數(shù)據(jù)的時候都認(rèn)為別人會修改,所以每次在拿數(shù)據(jù)的時候都會上鎖,這樣別人想拿這個數(shù)據(jù)就會阻塞直到它拿到鎖,傳統(tǒng)的關(guān)系型數(shù)據(jù)庫里邊就用到了很多這種鎖機(jī)制,比如行鎖,表鎖等,讀鎖,寫鎖等,都是在做操作之前先上鎖
樂觀鎖會樂觀的認(rèn)為每次查詢都不會造成更新丟失,利用版本字段控制
首先通過條件查詢出版本號,然后更新的時候判斷當(dāng)前版本號是否和之前版本號一致,如果一致,證明沒人修改,直接更新,否則,重新查詢以便在進(jìn)行更新
自旋鎖是采用讓當(dāng)前線程不停地的在循環(huán)體內(nèi)執(zhí)行實現(xiàn)的,當(dāng)循環(huán)的條件被其他線程改變時才能進(jìn)入臨界區(qū)
公平鎖:新進(jìn)程發(fā)出請求,如果此時一個線程正持有鎖,或有其他線程正在等待隊列中等待這個鎖,那么新的線程將被放入到隊列中被掛起
非公平鎖:新進(jìn)程發(fā)出請求,如果此時一個線程正持有鎖,新的線程將被放入到隊列中被掛起,但如果發(fā)出請求的同時該鎖變成可用狀態(tài),那么這個線程會跳過隊列中所有的等待線程而獲得鎖
與鎖相比,使用 CAS 無鎖機(jī)制會使程序看起來復(fù)雜一些,但由于其非阻塞性,是不會發(fā)生死鎖現(xiàn)象的,而且線程間相互影響也比鎖要小的多,使用 CAS 完全沒有鎖之間競爭帶來的系統(tǒng)開銷,也沒有線程間頻繁調(diào)用帶來的開銷
CAS 原理
CAS 包括三個參數(shù),分別是 V(表示要更新的變量、主內(nèi)存的值)、E(表示預(yù)期值、本地內(nèi)存的值)、N(新值),如果 V = E,那么將 V 的值設(shè)置成 N,如果 V != E,說明其他線程修改了,則當(dāng)前線程什么都不做,最后,CAS返回當(dāng)前V的真實值
CAS 和樂觀鎖很相似,都是抱著樂觀的心態(tài)去處理,多個線程同時使用 CAS 操作,只有一個能勝出,其他都失敗,失敗的線程不會掛起,僅告知失敗,并且允許重新嘗試,當(dāng)然也允許放棄嘗試,基于這樣的原理,CAS操作即使沒有鎖,也可以發(fā)現(xiàn)其他線程對當(dāng)前線程的干擾,并進(jìn)行恰當(dāng)?shù)奶幚?/p>
簡單地說,CAS需要你額外給出一個期望值,也就是你認(rèn)為這個變量現(xiàn)在應(yīng)該是什么樣子的,如果變量不是你想象的那樣,那說明它已經(jīng)被別人修改過了。你就重新讀取,再次嘗試修改就好了
優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
在高并發(fā)的情況下,它比有鎖的程序擁有更好的性能
死鎖免疫
缺點(diǎn):
CAS存在一個很明顯的問題,即 ABA 問題
果在這段期間曾經(jīng)被改成 B,然后又改回 A,那 CAS 操作就會誤認(rèn)為它從來沒有被修改過,針對這種情況,并發(fā)包中提供了一個帶有標(biāo)記的原子引用類 AtomicStampedReference,它可以通過控制變量值的版本來保證 CAS 的正確性
Java 中的原子操作類大致可以分為4類:原子更新基本類型、原子更新數(shù)組類型、原子更新引用類型、原子更新屬性類型,這些原子類中都是用了無鎖的概念,有的地方直接使用CAS操作的線程安全的類型
AtomicBoolean
AtomicInteger
AtomicLong
AtomicReference
。。。
網(wǎng)站標(biāo)題:線程池原理分析
文章分享:http://aaarwkj.com/article18/iggsdp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供虛擬主機(jī)、網(wǎng)站導(dǎo)航、靜態(tài)網(wǎng)站、軟件開發(fā)、App設(shè)計、網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)