這篇文章主要講解了“Flink中怎么使用split”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Flink中怎么使用split”吧!
網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、小程序定制開(kāi)發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了昌邑免費(fèi)建站歡迎大家使用!
flink的神奇分流器-sideoutput
這個(gè)可以用來(lái)分流,很方便的一次就可以對(duì)數(shù)據(jù)進(jìn)行篩選返回。
還有針對(duì)算法處理的迭代操作,我們已經(jīng)講過(guò)兩篇文章了:
Flink特異的迭代操作-bulkIteration
不得不會(huì)的Flink Dataset的DeltaI迭代操作
一個(gè)是全量迭代,一個(gè)是增量迭代。
還有優(yōu)秀又雞肋的watermark機(jī)制
不懂watermark?來(lái)吧~
對(duì)于迭代操作,其實(shí)還有一講,那就是流處理的迭代操作。那么本文就針對(duì)這個(gè)進(jìn)行分析~
Flink的迭代流程序?qū)嶋H就是實(shí)現(xiàn)了一個(gè)步進(jìn)函數(shù),然后將其嵌入到IterativeStream內(nèi)部。要知道Flink的Datastream正常情況下是不會(huì)結(jié)束的,所以也沒(méi)有所謂的最大迭代次數(shù)。這種情況下,你需要自己指定哪個(gè)類型的數(shù)據(jù)需要回流去繼續(xù)迭代,哪個(gè)類型的數(shù)據(jù)繼續(xù)向下傳輸,這個(gè)分流的方式有兩種:split和filter,官方網(wǎng)站在介紹迭代流的時(shí)候使用的是filter。我們這里就先按照官網(wǎng)的介紹走,然后案例展示的時(shí)候使用split給大家做個(gè)demo。
首先,要?jiǎng)?chuàng)建一個(gè)IterativeStream
IterativeStream<Integer> iteration =input.iterate();
接著就可以定義對(duì)該留要進(jìn)行的邏輯操作,官網(wǎng)這里就很簡(jiǎn)單的舉了一個(gè)map的例子。
DataStream<Integer> iterationBody =iteration.map(/* this is executed many times */);
調(diào)用IterativeStream的closeWith(feedbackStream)方法可以對(duì)迭代流進(jìn)行閉環(huán)操作。傳遞給closeWith函數(shù)的DataStream會(huì)返回值迭代的頭部。常用的做法是用filter來(lái)分離流的向后迭代的部分和向前傳遞的部分。。
iteration.closeWith(iterationBody.filter(/*one part of the stream */));
DataStream<Integer> output =iterationBody.filter(/* some other part of the stream */);
官方給了一個(gè)連續(xù)不斷減1直到數(shù)據(jù)為零的例子:
DataStream<Long> someIntegers =env.generateSequence(0, 1000);
// 創(chuàng)建迭代流
IterativeStream<Long> iteration =someIntegers.iterate();
// 增加處理邏輯,對(duì)元素執(zhí)行減一操作。
DataStream<Long> minusOne =iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1 ;
}
});
// 獲取要進(jìn)行迭代的流,
DataStream<Long> stillGreaterThanZero= minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value > 0);
}
});
// 對(duì)需要迭代的流形成一個(gè)閉環(huán)
iteration.closeWith(stillGreaterThanZero);
// 小于等于0的數(shù)據(jù)繼續(xù)向前傳輸
DataStream<Long> lessThanZero =minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value <= 0);
}
});
感謝各位的閱讀,以上就是“Flink中怎么使用split”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Flink中怎么使用split這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
當(dāng)前名稱:Flink中怎么使用split
分享鏈接:http://aaarwkj.com/article28/pccecp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營(yíng)銷(xiāo)推廣、移動(dòng)網(wǎng)站建設(shè)、自適應(yīng)網(wǎng)站、、網(wǎng)站維護(hù)、外貿(mào)建站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)