這篇文章給大家介紹FLINK中如何使用進(jìn)行網(wǎng)站監(jiān)控報(bào)警和報(bào)警恢復(fù),內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。
成都創(chuàng)新互聯(lián)公司2013年至今,先為柘榮等服務(wù)建站,柘榮等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為柘榮企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問(wèn)題。
flink CEP 簡(jiǎn)介
flink CEP(Complex event processing),是在Flink之上實(shí)現(xiàn)的復(fù)雜事件處理庫(kù),可以允許我們?cè)诓粩嗟牧魇綌?shù)據(jù)中通過(guò)我們自己定義的模式(Pattern)檢測(cè)和獲取出我們想要的數(shù)據(jù),然后對(duì)這些數(shù)據(jù)進(jìn)行下一步的處理。通過(guò)各種pattern的組合,我們可以定義出非常復(fù)雜的模式來(lái)匹配我們的數(shù)據(jù)。
網(wǎng)上講CEP原理和用法的文章很多,大家可以參考下 https://juejin.im/post/5de1f32af265da05cc3190f9#heading-9
簡(jiǎn)單來(lái)說(shuō)一下,其實(shí)我們可以把使用flink cep當(dāng)做我們平時(shí)用的正則表達(dá)式,cep中的Pattern就是我們定義的正則表達(dá)式,flink中的DataStream就是正則表達(dá)式中待匹配的字符串,flink 通過(guò)DataStream 和 自定義的Pattern進(jìn)行匹配,生成一個(gè)經(jīng)過(guò)過(guò)濾之后的DataStream .
基于自定義的pattern,我們可以做很多工作,比如監(jiān)控報(bào)警、風(fēng)控、反爬等等,接下來(lái)我們基于一個(gè)簡(jiǎn)單的報(bào)警小例子來(lái)講解一些FLINK cep的實(shí)際應(yīng)用。
我們基于flink CEP做一個(gè)簡(jiǎn)單的報(bào)警,首先我們簡(jiǎn)化一下報(bào)警的需求
1.統(tǒng)計(jì)出來(lái)每秒鐘http狀態(tài)碼為非200的數(shù)量所占比例。大于0.7的時(shí)候觸發(fā)報(bào)警。
2.統(tǒng)計(jì)結(jié)果連續(xù)發(fā)生三大于閾值(0.7,這個(gè)數(shù)字是我自己寫(xiě)的,為了測(cè)試用,真實(shí)環(huán)境需要根據(jù)實(shí)際經(jīng)驗(yàn)來(lái)設(shè)置)發(fā)送報(bào)警通知。
3.統(tǒng)計(jì)結(jié)果小于等于閾值觸發(fā)報(bào)警恢復(fù)通知。
實(shí)際應(yīng)用中我們一般會(huì)去消費(fèi)kafka的數(shù)據(jù)來(lái)作為source、這里我們?yōu)榱撕?jiǎn)化,通過(guò)自定義source生成一些模擬的數(shù)據(jù)。
public static class MySource implements SourceFunction<Tuple4<String,Long,Integer,Integer>>{
static int status[] = {200, 404, 500, 501, 301};
@Override
public void run(SourceContext<Tuple4<String,Long,Integer,Integer>> sourceContext) throws Exception{
while (true){
Thread.sleep((int) (Math.random() * 100));
// traceid,timestamp,status,response time
Tuple4 log = Tuple4.of(
UUID.randomUUID().toString(),
System.currentTimeMillis(),
status[(int) (Math.random() * 4)],
(int) (Math.random() * 100));
sourceContext.collect(log);
}
}
@Override
public void cancel(){
}
}
接下來(lái)我們定義一個(gè)sql,用來(lái)計(jì)算我們的需求中的第一個(gè)要求。
String sql = "select pv,errorcount,round(CAST(errorcount AS DOUBLE)/pv,2) as errorRate," +
"(starttime + interval '8' hour ) as stime," +
"(endtime + interval '8' hour ) as etime " +
"from (select count(*) as pv," +
"sum(case when status = 200 then 0 else 1 end) as errorcount, " +
"TUMBLE_START(proctime,INTERVAL '1' SECOND) as starttime," +
"TUMBLE_END(proctime,INTERVAL '1' SECOND) as endtime " +
"from log group by TUMBLE(proctime,INTERVAL '1' SECOND) )";
通過(guò)執(zhí)行sql,我們獲取到了一個(gè)Result對(duì)象的DataStream
Table table = tenv.sqlQuery(sql);
DataStream<Result> ds1 = tenv.toAppendStream(table, Result.class);
接下來(lái)我們到了最核心的地方,我們需要定一個(gè)Pattern。
Pattern pattern = Pattern.<Result>begin("alert").where(new IterativeCondition<Result>(){
@Override
public boolean filter(
Result i, Context<Result> context) throws Exception{
return i.getErrorRate() > 0.7D;
}
}).times(3).consecutive().followedBy("recovery").where(new IterativeCondition<Result>(){
@Override
public boolean filter(
Result i,
Context<Result> context) throws Exception{
return i.getErrorRate() <= 0.7D;
}
}).optional();
來(lái)詳細(xì)解釋一下這個(gè)Pattern
在我們獲得了相應(yīng)的報(bào)警和恢復(fù)之后,接下來(lái)就是調(diào)用報(bào)警接口進(jìn)行處理了,我們這只是簡(jiǎn)單的打印出來(lái)信息。
DataStream<Map<String,List<Result>>> alertStream = org.apache.flink.cep.CEP.pattern(
ds1,
pattern).select(new PatternSelectFunction<Result,Map<String,List<Result>>>(){
@Override
public Map<String,List<Result>> select(Map<String,List<Result>> map) throws Exception{
List<Result> alertList = map.get("alert");
List<Result> recoveryList = map.get("recovery");
if (recoveryList != null){
System.out.print("接受到了報(bào)警恢復(fù)的信息,報(bào)警信息如下:");
System.out.print(alertList);
System.out.print(" 對(duì)應(yīng)的恢復(fù)信息:");
System.out.println(recoveryList);
} else {
System.out.print("收到了報(bào)警信息 ");
System.out.print(alertList);
}
return map;
}
});
關(guān)于FLINK中如何使用進(jìn)行網(wǎng)站監(jiān)控報(bào)警和報(bào)警恢復(fù)就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。
網(wǎng)頁(yè)名稱:FLINK中如何使用進(jìn)行網(wǎng)站監(jiān)控報(bào)警和報(bào)警恢復(fù)
分享地址:http://aaarwkj.com/article0/gghcio.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站、商城網(wǎng)站、服務(wù)器托管、自適應(yīng)網(wǎng)站、外貿(mào)建站、全網(wǎng)營(yíng)銷推廣
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)