Flink中watermark主要解決保序問題. 而保序問題的根本原因是多個(gè)任務(wù)同時(shí)從流中并行處理數(shù)據(jù),順序無法保證.
網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)建站!專注于網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、微信小程序開發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了柳城免費(fèi)建站歡迎大家使用!
上游: 生成watermark
一般在WINDOW 操作之前生成WATERMARK, WATERMARK 有兩種:
AssignWithPeriodicWatermarks:
每隔N秒自動(dòng)向流里注入一個(gè)WATERMARK 時(shí)間間隔由ExecutionConfig.setAutoWatermarkInterval 決定. 每次調(diào)用getCurrentWatermark 方法, 如果得到的WATERMARK 不為空并且比之前的大就注入流中 (emitWatermark)
參考 TimestampsAndPeriodicWatermarksOperator.processElement
AssignWithPunctuatedWatermarks:
基于事件向流里注入一個(gè)WATERMARK,每一個(gè)元素都有機(jī)會(huì)判斷是否生成一個(gè)WATERMARK. 如果得到的WATERMARK 不為空并且比之前的大就注入流中 (emitWatermark)
參考 TimestampsAndPunctuatedWatermarksOperator.processElement
每次生成WATERMARK將覆蓋流中已有的WATERMARK
下游: 處理watermark
StatusWatermarkValve 負(fù)責(zé)將不同Channel 的Watermark 對(duì)齊,再傳給pipeline 下游,對(duì)齊的概念是當(dāng)前Channel的Watermark時(shí)間大于所有Channel最小的Watermark時(shí)間
WindowOperator 的處理:
WindowOperator.processElement
實(shí)際觀察結(jié)果:
Window 觸發(fā)的條件
在 WindowOperator 中有兩個(gè)點(diǎn)會(huì)檢查窗口是否觸發(fā),兩者的檢查條件有所不同
processElement 這是在新的流數(shù)據(jù)進(jìn)入時(shí)觸發(fā)
檢查條件: watermark時(shí)間 >= 窗口最大時(shí)間 參見 EventTimeTrigger.onElement
如果窗口不能被觸發(fā)則調(diào)用InteralTimeService.registerEventTimeTimer 注冊(cè)一個(gè)定時(shí)器,以KEY+窗口最大時(shí)間為條件觸發(fā), 到一定時(shí)間后定時(shí)器會(huì)被自動(dòng)銷毀. 時(shí)間為窗口最大時(shí)間+WindowOperator.allowedLateness WindowOperator.allowedLateness 可以通過 Stream.window(...).allowedLateness(...) 設(shè)置. 一般應(yīng)該略大于WatermarkGenerator 的 maxOutOfOrderness
WATERMARK和普通數(shù)據(jù)分開處理
如果一個(gè)元素來的過晚 element.getTimestamp + allowedLateness < currentWatermark
會(huì)有一個(gè)特殊的OutputTag 和正常的流數(shù)據(jù)區(qū)分開
參考 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html
如果窗口來的過晚, window.maxTimestamp + allowedLateness < currentWatermark, 則窗口會(huì)被直接丟棄
Watermark 的問題:
默認(rèn)的Watermark機(jī)制是數(shù)據(jù)驅(qū)動(dòng)的,新的數(shù)據(jù)進(jìn)入才會(huì)觸發(fā)水位上升, 而由于maxOutOfOrderness 的存在, watermark < 最大流數(shù)據(jù)時(shí)間 < 當(dāng)前窗口結(jié)束時(shí)間
根據(jù)之前的分析,最新的時(shí)間窗口總是不會(huì)被觸發(fā),除非更新的數(shù)據(jù)進(jìn)入再次提高水位到當(dāng)前窗口結(jié)束時(shí)間以后, 如果數(shù)據(jù)進(jìn)入的頻率低或者沒有新的數(shù)據(jù)進(jìn)入流,那最新的時(shí)間窗口被處理的延時(shí)會(huì)非常高甚至永遠(yuǎn)不會(huì)被觸發(fā),這在實(shí)時(shí)性要求高的流式系統(tǒng)是很致命的. 比如一個(gè)銀行系統(tǒng),要做客戶賬號(hào)層面的保序,每個(gè)賬號(hào)的交易可能一天只有幾筆甚至一筆,如果我們?cè)赪indow 處理的時(shí)候KEY BY 賬號(hào)就會(huì)引起上述問題. 我們可以考慮KEY BY的條件改為 HASH(賬號(hào)) 再取模,然后在窗口處理中再次根據(jù)賬號(hào)分組,這樣雖然處理復(fù)雜一些,但是保證了窗口中數(shù)據(jù)的頻次
另外一種方案是優(yōu)化WATERMARK生成的機(jī)制,如果一段時(shí)間后WATERMARK仍然沒有變化,那就將WATERMARK自動(dòng)上漲一次到當(dāng)前窗口的結(jié)束時(shí)間,這樣保證窗口處理的延時(shí)有個(gè)上限
public abstract class AbstractWatermarkGenerator<T> implements AssignerWithPeriodicWatermarks<T> {
private static final long serialVersionUID = -2006930231735705083L;
private static final Logger logger = LoggerFactory.getLogger(AbstractWatermarkGenerator.class);
private final long maxOutOfOrderness; // 10 seconds
private long windowSize;
private long currentMaxTimestamp;
private long lastTimestamp = 0;
private long lastWatermarkChangeTime = 0;
private long windowPurgeTime = 0;
public AbstractWatermarkGenerator(long maxOutOfOrderness, long windowSize) {
this.maxOutOfOrderness = maxOutOfOrderness;
this.windowSize = windowSize;
}
public AbstractWatermarkGenerator() {
this(10000, 10000);
}
protected abstract long extractCurTimestamp(T element) throws Exception;
public long extractTimestamp(T element,
long previousElementTimestamp) {
try {
long curTimestamp = extractCurTimestamp(element);
lastWatermarkChangeTime = new Date().getTime();
currentMaxTimestamp = Math.max(curTimestamp, currentMaxTimestamp);
windowPurgeTime = Math.max(windowPurgeTime, getWindowExpireTime(currentMaxTimestamp));
if (logger.isDebugEnabled()) {
logger.debug("Extracting timestamp: {}", currentMaxTimestamp);
}
return curTimestamp;
} catch (Exception e) {
logger.error("Error extracting timestamp", e);
}
return 0;
}
protected long getWindowExpireTime(long currentMaxTimestamp) {
long windowStart = TimeWindow.getWindowStartWithOffset(currentMaxTimestamp, 0, windowSize);
long windowEnd = windowStart + windowSize;
return windowEnd + maxOutOfOrderness;
}
public Watermark getCurrentWatermark() {
long curTime = new Date().getTime();
if (currentMaxTimestamp > lastTimestamp) {
if (logger.isDebugEnabled()) {
logger.debug("Current max timestamp has been increased since last");
}
lastTimestamp = currentMaxTimestamp;
lastWatermarkChangeTime = curTime;
}
else {
long diff = windowPurgeTime - currentMaxTimestamp;
if (diff > 0 && curTime - lastWatermarkChangeTime > diff) {
if (logger.isDebugEnabled()) {
logger.debug("Increase current MaxTimestamp once");
}
currentMaxTimestamp = windowPurgeTime;
lastTimestamp = currentMaxTimestamp;
lastWatermarkChangeTime = curTime;
}
}
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
實(shí)際測(cè)試中發(fā)現(xiàn) WATERMARK是否觸發(fā)和算子的并發(fā)度和WATERMARK生成的位置有關(guān)
測(cè)試結(jié)果如下:
所以注意WINDOW算子之前最好避免讓下游算子的并發(fā)度超過上游算子,否則就把WATERMARK的生成盡量放到DAG的前端,這樣WATERMARK可以被傳遞到下游算子
本文題目:Flinkwatermark
文章路徑:http://aaarwkj.com/article18/gdgodp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供Google、商城網(wǎng)站、品牌網(wǎng)站設(shè)計(jì)、小程序開發(fā)、響應(yīng)式網(wǎng)站、網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)