學(xué)Flink第八章多流轉(zhuǎn)換的時(shí)候,進(jìn)行合流操作.connect()使用到了第九章狀態(tài)編程的知識(shí),感覺總體不是很清晰,因此學(xué)完?duì)顟B(tài)編程后現(xiàn)在進(jìn)行重溫并細(xì)化一些細(xì)節(jié)
用戶進(jìn)行支付的時(shí)候,后臺(tái)是需要調(diào)用第三方服務(wù)平臺(tái)進(jìn)行服務(wù),即用戶支付請(qǐng)求,頁(yè)面將會(huì)跳轉(zhuǎn)到第三方支付平臺(tái)支付
用戶進(jìn)行支付之后,第三方支付平臺(tái)給到用戶前端支出反饋,并且給我們平臺(tái)發(fā)送用戶已經(jīng)付款的消息
第三方支付平臺(tái)需要將錢再轉(zhuǎn)入到我們平臺(tái)賬戶
如果進(jìn)行到圖中④,如果發(fā)生數(shù)據(jù)丟失,那么用戶已經(jīng)支付的消息無法傳達(dá)給到后臺(tái),而后不能關(guān)閉訂單
因此需要進(jìn)行實(shí)時(shí)對(duì)賬操作,即用戶提交的支付請(qǐng)求(客戶端),以及第三方支付平臺(tái)給到的請(qǐng)求(三方端),兩者可以當(dāng)成兩條流
如果進(jìn)行兩條流的操作后不匹配,那么將進(jìn)行預(yù)警
兩個(gè)流都給他標(biāo)上時(shí)間戳(使用watermark標(biāo)志)
使用狀態(tài)編程保存狀態(tài)以及設(shè)置定時(shí)器,來進(jìn)行兩條流的連接以及等待
如果對(duì)方流中有我流的數(shù)據(jù),那么直接輸出成功;如果沒有則更新我流狀態(tài),注冊(cè)定時(shí)器等待另一個(gè)流
然后用ontimer()觸發(fā)定時(shí)器:判斷條件如果兩條流中還有狀態(tài)沒被清空,說明沒匹配上
public class BillCheckExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//來自app的支付日志
SingleOutputStreamOperator>appStream = env.fromElements(
Tuple3.of("order-1", "app", 1000L),
Tuple3.of("order-2", "app", 2000L),
Tuple3.of("order-3", "app", 3500L)
).assignTimestampsAndWatermarks(WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner>() {@Override
public long extractTimestamp(Tuple3element, long recordTimestamp) {return element.f2;
}
})
);
//來自第三方平臺(tái)的支付日志
SingleOutputStreamOperator>thirdpartStream = env.fromElements(
Tuple4.of("order-1", "third-party", "success", 3000L),
Tuple4.of("order-3", "third-party", "success", 4000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.
>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner>() {@Override
public long extractTimestamp(Tuple4element, long recordTimestamp) {return element.f3;
}
})
);
//檢測(cè)同一支付單在兩條流中是否匹配,等待一段時(shí)間后,不匹配就報(bào)警
// //這種也可以
// appStream.keyBy(data->data.f0)
// .connect(thirdpartStream.keyBy(data ->data.f0));
//
appStream.connect(thirdpartStream)
.keyBy(data->data.f0,data->data.f0)
.process(new OrderMatchResult())
.print();
env.execute();
}
//自定義實(shí)現(xiàn)CoFunction
public static class OrderMatchResult extends CoProcessFunction,
Tuple4,String>{//定義狀態(tài)變量,用來保存已經(jīng)到達(dá)的事件
private ValueState>appEventState;
private ValueState>thirdPartyEventState;
//運(yùn)行上下文環(huán)境中獲取狀態(tài)
@Override
public void open(Configuration parameters) throws Exception {appEventState = getRuntimeContext().getState(
new ValueStateDescriptor>("app-event", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG))
);
thirdPartyEventState = getRuntimeContext().getState(
new ValueStateDescriptor>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
);
}
@Override
public void processElement1(Tuple3value, CoProcessFunction, Tuple4, String>.Context ctx, Collectorout) throws Exception {//來的是app event,看另一條流中事件是否來過
if(thirdPartyEventState.value()!=null){out.collect("對(duì)賬成功:"+value+" "+thirdPartyEventState.value());
//清空狀態(tài)
thirdPartyEventState.clear();
}else{//如果沒來就等待,并且更新狀態(tài)
appEventState.update(value);
//注冊(cè)一個(gè)5秒后的定時(shí)器,開始等待另一條的事件
ctx.timerService().registerEventTimeTimer(value.f2+5000L);
}
}
@Override
public void processElement2(Tuple4value, CoProcessFunction, Tuple4, String>.Context ctx, Collectorout) throws Exception {//來的是app event,看另一條流中事件是否來過
if(appEventState.value()!=null){out.collect("對(duì)賬成功:"+appEventState.value()+" "+value);
//清空狀態(tài)
appEventState.clear();
}else{//如果沒來就等待,并且更新狀態(tài)
thirdPartyEventState.update(value);
//注冊(cè)一個(gè)5秒后的定時(shí)器,開始等待另一條的事件
ctx.timerService().registerEventTimeTimer(value.f3);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collectorout) throws Exception {//定時(shí)器觸發(fā),判斷狀態(tài),如果某個(gè)狀態(tài)不為空,說明另一條中事件沒來
//并且不會(huì)存在兩個(gè)都不為空,因?yàn)槠渲幸粋€(gè)不為空后會(huì)被清除
//沒有沒清空表示失敗
if(appEventState.value()!=null){out.collect("對(duì)賬失敗:"+appEventState.value()+" "+"第三方支付平臺(tái)信息未到");
}
if(thirdPartyEventState.value()!=null){out.collect("對(duì)賬失?。?+thirdPartyEventState.value()+" "+"APP信息信息未到");
}
//清空所有數(shù)據(jù)
appEventState.clear();
thirdPartyEventState.clear();
}
}
}
對(duì)賬成功:(order-1,app,1000) (order-1,third-party,success,3000)
對(duì)賬成功:(order-3,app,3500) (order-3,third-party,success,4000)
對(duì)賬失?。?order-2,app,2000) 第三方支付平臺(tái)信息未到
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧
文章名稱:Flink-使用合流操作進(jìn)行實(shí)時(shí)對(duì)賬需求的實(shí)現(xiàn)-創(chuàng)新互聯(lián)
鏈接分享:http://aaarwkj.com/article34/ihjpe.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、ChatGPT、虛擬主機(jī)、網(wǎng)站排名、外貿(mào)網(wǎng)站建設(shè)、網(wǎng)站維護(hù)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容