在前一篇文章從Reactive編程到“好萊塢”中,談到了響應(yīng)式的一些概念,講的有些發(fā)散。 但僅僅還是停留在概念的層面,對于實戰(zhàn)性的東西并沒有涉及。
所以大家看了后,或許還是有些不痛不癢。
創(chuàng)新互聯(lián)建站專注于西安網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供西安營銷型網(wǎng)站建設(shè),西安網(wǎng)站制作、西安網(wǎng)頁設(shè)計、西安網(wǎng)站官網(wǎng)定制、小程序定制開發(fā)服務(wù),打造西安網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供西安網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。
響應(yīng)式編程強(qiáng)調(diào)的是異步化、面向流的處理方式,這兩者也并非憑空生出,而是從大量的技術(shù)實踐中總結(jié)提煉出來的概念,就比如:
我們談異步化,容易聯(lián)想到 Java 異步IO(Asynchronized IO),而且習(xí)慣于將其和 BIO、NIO等概念來做對比。 殊不知,老早出現(xiàn)的 Swing 框架(Java UI)就已經(jīng)將異步化思維玩的很溜了,不信的可以看看其內(nèi)部 Observer模式(觀察者)的實現(xiàn)。
除了前端,Reactive 概念在大數(shù)據(jù)領(lǐng)域的應(yīng)用其實非常的廣泛了。 但是對于大多數(shù)做 Web 后端開發(fā)的人來說或許普及程度并不高,以筆者自身的感受是,碼了這么些年頭,除了做好代碼分層之外,似乎也沒有見到 Reactive可以發(fā)揮重大作用的地方。 原因就在于,在Web 后端開發(fā)領(lǐng)域基本是依托 HTTP協(xié)議機(jī)制實現(xiàn)的,這是一個相當(dāng)簡單的 請求 -> 應(yīng)答交互模式,客戶端在發(fā)送請求后,會一直等待結(jié)果返回,也就是結(jié)果的通知是由客戶端主動獲取而非異步通知的,因此并不是 Reactive 的風(fēng)格。 但這已經(jīng)是符合用戶一貫的使用方式了,絕大多數(shù)情況下并不需要做什么樣的變化,此時我們對響應(yīng)式的感知并不深刻。
更符合Reactive 的另外一個場景是 富客戶端(Rich Application),假設(shè)在需要大量復(fù)雜的前端交互的場景下,我們可以選擇將一些邏輯放在前端代碼中實現(xiàn)。
此時的 Web 交互就不再是整個頁面的刷新,而是演變?yōu)榭蛻舳伺c服務(wù)端的"實時"雙向通訊,這類應(yīng)用也比較普遍了,比如基于 WebSocket 實現(xiàn)的 在線通信、互動應(yīng)用 等等。
淺顯的從趨勢上看, Reactive 的前景還是很明朗的,這里并不是說因為現(xiàn)在多數(shù)流行的編程語言中都有它的影子(比如提供了Rx風(fēng)格的框架)。
而是未來的大數(shù)據(jù)處理、實時流計算會成為主流,這是環(huán)境決定的。 而這時 Reactive 這種"面向流"的編程模式無疑是很合適的。
Java 平臺直到 JDK 9 才提供了對于 Reactive 的完整支持,而在此之前的JDK版本中,也以及存在一些有關(guān)聯(lián)性的API,比如:
這些關(guān)聯(lián)性API 并不是完整的 Reactive,Java 9所支持的 Reactive Stream API 來自于2013年的響應(yīng)式流規(guī)范(Reactive Stream Specification)。
https://www.reactive-streams.org/
基于這個規(guī)范中主要定義了下面幾個接口:
Java的響應(yīng)式流接口統(tǒng)一定義在 java.util.concurrent.Flow接口
Publisher
即數(shù)據(jù)的發(fā)布者。 Publisher 接口定義了一個subscribe方法,用于添加訂閱者:
首先,在subscribe方法調(diào)用成功后,Subscriber的 onSubscribe(Subscription s) 方法會被觸發(fā)(Subscription 表示當(dāng)前的訂閱關(guān)系)。
此后,正??梢岳^續(xù)調(diào)用 Subscription 的 request(long n) 方法來向發(fā)布者請求數(shù)據(jù),n是指最大的數(shù)據(jù)條目數(shù)。
發(fā)布者會產(chǎn)生3種不同的消息,分別對應(yīng)到 Subscriber 的3個回調(diào)方法:
數(shù)據(jù)消息:對應(yīng) onNext 方法,表示發(fā)布者產(chǎn)生的數(shù)據(jù)。
錯誤消息:對應(yīng) onError 方法,表示發(fā)布者產(chǎn)生了錯誤。
結(jié)束消息:對應(yīng) onComplete 方法,表示發(fā)布者已經(jīng)完成了所有數(shù)據(jù)的發(fā)布。
在上面的3種通知中,錯誤、結(jié)束消息都表示當(dāng)前的流已經(jīng)到達(dá)了終點,后面不再會有消息產(chǎn)生。
Subscription
Subscription 表示的是一個訂閱關(guān)系。 可以通過該對象請求數(shù)據(jù)(request方法),或者取消訂閱(cancel方法)。
負(fù)壓的支持
負(fù)壓是響應(yīng)式流定義的一種重要的能力,在上述的接口中,實質(zhì)上已經(jīng)提供了負(fù)壓的支持。
Publisher 只有在收到請求之后,才會產(chǎn)生數(shù)據(jù)。 這就保證了 Subscriber 可以根據(jù)自己的處理能力,確定要向 Publisher 請求的數(shù)據(jù)量,以此保證自身不會被沖垮。
下面,以一個簡單的代碼示例來演示 Reactive Stream API 是如何使用的。
以制奶廠為例,為了提高營收,工廠推出了一個廠家直銷的業(yè)務(wù)。 顧客可以直接向廠方訂購一定天數(shù)的奶制品,每天則是由工廠的服務(wù)人員送貨上門。
為了模擬這個場景,我們實現(xiàn)的代碼如下:
public class MilkFactory extends SubmissionPublisher<String> {
private final ScheduledFuture<?> periodicTask;
private final ScheduledExecutorService scheduler;
private static final List<String> milks = Arrays.asList("益力多", "酸牛奶", "原味奶", "低脂蛋奶", "羊奶", "甜牛奶");
public MilkFactory() {
super();
//初始化定時器
scheduler = new ScheduledThreadPoolExecutor(1);
//每一天生產(chǎn)完牛奶并推送給消費者
periodicTask = scheduler.scheduleAtFixedRate(
() -> submit(produceMilk()), 0, 1, TimeUnit.SECONDS);
}
//隨機(jī)生產(chǎn)牛奶
private String produceMilk() {
return milks.get((int) (Math.random() * milks.size()));
}
//關(guān)閉流
public void close() {
periodicTask.cancel(false);
scheduler.shutdown();
super.close();
}
}
MilkFactory 集成自SubmissionPublisher(一個提供緩沖的Publisher實現(xiàn)),其內(nèi)部會啟動一個定時器,用于模擬每天給用戶發(fā)放生產(chǎn)的牛奶。
通過submit()方法可以將數(shù)據(jù)推送給用戶。
public class MilkCustomer implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
private AtomicInteger available = new AtomicInteger(0);
private int dayCount;
public MilkCustomer(int dayCount) {
this.dayCount = dayCount;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
//設(shè)置總量
available.set(dayCount);
//第一天
subscription.request(1);
}
@Override
public void onNext(String milk) {
System.out.println("今天的牛奶到了: " + milk);
//如果還有存量,繼續(xù)請求
if(available.decrementAndGet() > 0){
subscription.request(1);
}else{
System.out.println("牛奶套餐已經(jīng)派完,歡迎繼續(xù)訂購");
this.subscription.cancel();
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("closed.");
}
}
MilkCustomer 接受一個dayCount入?yún)?,即表示訂購的?shù)量,在首次訂閱時會請求第一天的奶品,此后則每次收到到奶品后再請求下一天的,直到將總量消費完。
執(zhí)行下面的代碼:
MilkFactory factory = new MilkFactory();
//訂閱1周
MilkCustomer customer = new MilkCustomer(7);
factory.subscribe(customer);
輸出:
今天的牛奶到了: 酸牛奶
今天的牛奶到了: 羊奶
今天的牛奶到了: 原味奶
牛奶套餐已經(jīng)派完,歡迎繼續(xù)訂購
在上例中,我們使用 Java 提供的 Reactive Stream API 實現(xiàn)了一個"在線送奶" 的業(yè)務(wù)流。
整個過程相對是比較簡單的,最關(guān)鍵的地方就在于對流式處理以及訂閱關(guān)系的理解。 然而目前的 Reactive 實現(xiàn)還沒有完全的統(tǒng)一,比如 Spring WebFlux(SpringBoot 2支持) 仍然是基于 Reactor 私有API而不是 Reactive Stream API 來構(gòu)建的,后面有機(jī)會再做下介紹。
關(guān)于Future和CompletableFuture的區(qū)別
https://juejin.im/post/5adbf8226fb9a07aac240a67
文章題目:Reactive(2)響應(yīng)式流與制奶廠業(yè)務(wù)
當(dāng)前鏈接:http://aaarwkj.com/article24/pdejce.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供動態(tài)網(wǎng)站、軟件開發(fā)、商城網(wǎng)站、手機(jī)網(wǎng)站建設(shè)、外貿(mào)建站、服務(wù)器托管
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)