本篇文章給大家分享的是有關(guān)基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么,小編覺(jué)得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話(huà)不多說(shuō),跟著小編一起來(lái)看看吧。
創(chuàng)新互聯(lián)專(zhuān)注于企業(yè)營(yíng)銷(xiāo)型網(wǎng)站建設(shè)、網(wǎng)站重做改版、隆子網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、html5、商城網(wǎng)站制作、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)公司、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁(yè)設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性?xún)r(jià)比高,為隆子等各大城市提供網(wǎng)站開(kāi)發(fā)制作服務(wù)。
下面將介紹一些常見(jiàn)的實(shí)時(shí)流式傳輸模式及其實(shí)現(xiàn)。
首先回顧一下如何使用 Apache Pulsar Functions 實(shí)現(xiàn)基于內(nèi)容的路由?;趦?nèi)容的路由是一種集成模式。該模式已經(jīng)存在多年,通常用于事件中心和消息框架中。基本思路是檢查每條消息的內(nèi)容,根據(jù)消息內(nèi)容將消息路由到不同目的地。
下面的例子使用了Apache Pulsar SDK,SDK 允許用戶(hù)配置三個(gè)不同的值:
用于在消息中查找匹配的正則表達(dá)式
消息匹配表達(dá)式模式時(shí)被發(fā)送到的 topic
消息不匹配表達(dá)式模式時(shí)被發(fā)送到的 topic
這個(gè)例子證明了 Pulsar Functions 功能強(qiáng)大,可以基于功能邏輯動(dòng)態(tài)決定將事件發(fā)送到哪里。
import java.util.regex.*;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public ContentBasedRoutingFunction implements Function<String, String> {
String process(String input, Context context) throws Exception {
String regex = context
.getUserConfigValue(“regex”).toString();
String matchedTopic = context
.getUserConfigValue(“matched-topic”).toString();
String unmatchedTopic = context
.getUserConfigValue(“unmatched-topic”).toString();
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(input);
if (m.matches()) {
context.publish(matchedTopic, input);
} else {
context.publish(unmatchedTopic, input);
}
}
}
如果想通過(guò)僅保留滿(mǎn)足給定條件的事件來(lái)排除 topic 上的大多數(shù)事件時(shí),應(yīng)用選擇過(guò)濾模式。過(guò)濾模式對(duì)僅查找感興趣的事件特別有效,如信用卡付款超過(guò)了一定金額;日志文件中的 ERROR 消息;傳感器讀數(shù)超過(guò)特定閾值等等(見(jiàn)模式 4)。
假如用戶(hù)在監(jiān)視信用卡交易的事件流,并嘗試檢測(cè)欺詐或可疑行為。由于交易量很大,選擇“同意/不同意”的時(shí)間有限,用戶(hù)必須先過(guò)濾掉有“風(fēng)險(xiǎn)”特征的交易,如預(yù)付現(xiàn)金、大額付款等。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;
public class FraudFilter implements Function<Purchase, Purchase> {
Purchase process(Purchase p, Context context) throws Exception {
if (p.getTransactionType() == ‘CASH ADVANCE’) ||
p.getAmount > 500.00) {
return p;
}
return null;
}
}
可以使用過(guò)濾器來(lái)過(guò)濾有“風(fēng)險(xiǎn)”特征的交易。過(guò)濾器可以識(shí)別這些“風(fēng)險(xiǎn)”特征,并只將這些交易路由到一個(gè)單獨(dú)的 topic 上以進(jìn)行進(jìn)一步評(píng)估。
經(jīng)過(guò)過(guò)濾器過(guò)濾后,所有信用卡支付都可以被路由到一個(gè)“潛在欺詐行為”的 topic 上進(jìn)行進(jìn)一步評(píng)估,而其他事件則會(huì)被過(guò)濾掉,過(guò)濾器也不會(huì)對(duì)過(guò)濾掉的事件執(zhí)行任何操作。
上圖是基于三個(gè)獨(dú)立支付對(duì)象的 FraudFilter function。第一次支付符合給定標(biāo)準(zhǔn),被路由到“潛在欺詐行為”topic 上進(jìn)行進(jìn)一步評(píng)估;而第二次和第三次支付不符合欺詐標(biāo)準(zhǔn),直接被過(guò)濾掉(沒(méi)有被路由到“潛在欺詐行為”過(guò)濾器上)。
轉(zhuǎn)換模式用于將事件從一種類(lèi)型轉(zhuǎn)換為另一種類(lèi)型,或用于添加、刪除或修改輸入事件的值。
|| 投影
投影模式類(lèi)似于關(guān)系代數(shù)中的投影算子,選擇輸入事件的屬性子集,并創(chuàng)建僅包含這些屬性的輸出事件。投影模式可用于刪除事件中的敏感字段,或者只保留事件中的必要屬性。下圖為投影模式的一種應(yīng)用,在將記錄發(fā)布到下游 topic 前,“屏蔽”傳入的社安全號(hào)碼。
|| 富集模式
富集模式用于將數(shù)據(jù)添加到輸入屬性中不存在的輸出事件中。典型的富集模式包含基于輸入事件中的某個(gè)鍵值對(duì)引用數(shù)據(jù)進(jìn)行某種查找。以下示例展示了如何根據(jù)輸入事件中包含的 IP 地址將地理位置添加到輸出事件。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;
import com.company.services.GeoService;
public class IPLookup implements Function<Purchase, Purchase> {
Purchase process(Purchase p) throws Exception {
Geo g = GeoService.getByIp(p.getIPAddress());
// By default, these fields are blank, so we just modify the object
p.setLongitude(g.getLon());
p.setLatitiude(g.getLat());
return p;
}
}
|| 分離模式
在分離模式下,事件處理器接收單個(gè)輸入事件,并將其分為多個(gè)輸出事件。當(dāng)輸入事件是一個(gè)包含多個(gè)單獨(dú)事件(如日志文件中的 entry)的批處理,并且想要單獨(dú)處理每個(gè)事件時(shí),分離模式十分適用。下圖展示了分離模式的處理過(guò)程:先根據(jù)換行符分隔輸入,再逐行發(fā)布到配置的輸出 topic。
此 function 的實(shí)現(xiàn)過(guò)程如下:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class Splitter implements Function<String, String> {
String process(String s, Context context) throws Exception {
Arrays.asLists(s.split(“\\R”).forEach(line ->
context.publish(context.getOutputTopic(), line));
return null;
}
}
警報(bào)和閾值模式可進(jìn)行檢測(cè),并根據(jù)檢測(cè)條件生成警報(bào)(如高溫警報(bào))??梢曰诤?jiǎn)單的值,也可以基于較復(fù)雜的條件(如增長(zhǎng)率、數(shù)量的持續(xù)變化等)生成警報(bào)。
下面的示例為基于用戶(hù)配置的閾值參數(shù)(如 100.00,38.7 等)和接收警報(bào)通知的郵箱地址生成警報(bào)。當(dāng)此 function 接收到超過(guò)配置閾值的傳感器事件時(shí),將發(fā)送電子郵件。
import javax.mail.*;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public SimpleAlertFunction implements Function<Sensor, Void> {
Void process(Sensor sensor, Context context) throws Exception {
Double threshold = context
.getUserConfigValue(“threshold”).toString();
String alertEmail = context
.getUserConfigValue(“alert-email”).toString();
if (sensor.getReading() >= threshold) {
Session s = Session.getDefaultInstance();
MimeMessage msg = new MineMessage(s);
msg.setText(“Alert for Sensor:” + sensor.getId());
Transport.send(msg);
}
return null;
}
}
下面是一個(gè)有狀態(tài) function 示例,該 function 根據(jù)特定傳感器讀數(shù)的增長(zhǎng)率生成警報(bào)。在決定是否生成警報(bào)時(shí),需要訪問(wèn)以前的傳感器讀數(shù)。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public ComplexAlertFunction implements Function<Sensor, Void> {
Void process(Sensor sensor, Context context) throws Exception {
Double threshold = context
.getUserConfigValue(“threshold”).toString();
String alertTopic = context
.getUserConfigValue(“alert-topic”).toString();
// Get previous & current metric values
Float previous = context.getState(sensor.getId() + “-metric”);
Long previous_time = context.getState(sensor.getId() + “-metric-time”);
Float current = sensor.getMetric();
Long current_time = sensor.getMetricTime();
// Calculate Rate of change & compare to threshold.
Double rateOfChange = (current-previous) /
(current_time-previous_time);
if (abs(rateOfChange) >= threshold) {
// Publish the sensor ID to the alert topic for handling
context.publish(alertTopic, sensor.getId());
}
// Update metric values
context.putState(sensor.getId() + “-metric”, current);
context.putState(sensor.getId() + “-metric-time”, current_time);
}
}
通過(guò) Apache Pulsar Functions 狀態(tài)管理特性?xún)H保留先前的度量讀數(shù)和時(shí)間,并將傳感器 ID 添加到這些值中(因?yàn)閷?huì)處理來(lái)自多個(gè)傳感器的度量,所以需要傳感器 ID)。為了簡(jiǎn)單起見(jiàn),假設(shè)事件以正確的順序到達(dá),即始終是最新讀數(shù),沒(méi)有亂序讀數(shù)。
另外,這一次我們將傳感器 ID 轉(zhuǎn)發(fā)到一個(gè)專(zhuān)門(mén)的警報(bào) topic 以進(jìn)行進(jìn)一步處理,而不是僅發(fā)送電子郵件。通過(guò)這種方式,我們可以對(duì)事件進(jìn)行額外的富集處理(通過(guò) Pulsar Functions)。例如,查找獲取傳感器的地理位置,然后通知相關(guān)人員。
簡(jiǎn)單計(jì)數(shù)和窗口計(jì)數(shù)模式使用了聚合函數(shù),聚合函數(shù)將事件的集合作為輸入,并通過(guò)對(duì)輸入事件應(yīng)用一個(gè) function 生成一個(gè)所需的輸出事件。聚合函數(shù)包括:求和、平均值、最大值、最小值、百分位數(shù)等。
以下為使用 Pulsar Functions 實(shí)現(xiàn)“字?jǐn)?shù)統(tǒng)計(jì)”的示例,計(jì)算每個(gè)單詞在給定 topic 中出現(xiàn)次數(shù)的總和。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public WordCountFunction implements Function<String, Void> {
Void process(String s, Context context) throws Exception {
Arrays.asLists(s.split(“\\.”).forEach(word -> context.incrCounter(word, 1));
return null;
}
}
考慮到流數(shù)據(jù) source 無(wú)休止的特性,無(wú)限期聚合用處不大,因?yàn)橥ǔJ窃跀?shù)據(jù)窗口上進(jìn)行這些計(jì)算(如前一小時(shí)內(nèi)的故障次數(shù))。
數(shù)據(jù)窗口代表事件流的有限子集,如上圖所示。但是,應(yīng)該如何定義數(shù)據(jù)窗口的邊界?有兩個(gè)用于定義窗口的常用屬性:
觸發(fā)策略:控制執(zhí)行或觸發(fā) function 代碼的時(shí)間。Apache Pulsar Function 框架通過(guò)這些規(guī)則來(lái)通知代碼處理窗口中收集的全部數(shù)據(jù)。
清除策略:控制保留在窗口中的數(shù)據(jù)量。這些規(guī)則用于決定是否從窗口中清除數(shù)據(jù)元素。
這兩個(gè)策略都是由時(shí)間或窗口中的數(shù)據(jù)量驅(qū)動(dòng)的。二者之間的區(qū)別是什么?又是怎樣協(xié)同工作的?在多種窗口技術(shù)中,最常用的是滾動(dòng)窗口和滑動(dòng)窗口。
窗口已滿(mǎn)是滾動(dòng)窗口清除策略的唯一條件,因此,只需要指定想要使用觸發(fā)策略(基于計(jì)數(shù)或基于時(shí)間)即可?;谟?jì)數(shù)的滾動(dòng)窗口是怎樣工作的?
在下圖的第一個(gè)示例中,觸發(fā)策略設(shè)置為 2,也就是說(shuō),在窗口中有兩個(gè)項(xiàng)目時(shí),觸發(fā)器將會(huì)觸發(fā),開(kāi)始執(zhí)行 Pulsar Function 代碼。這一系列行為與時(shí)間無(wú)關(guān),窗口計(jì)數(shù)達(dá)到 2 用了 5 秒還是 5 個(gè)小時(shí)并不重要,重要的是窗口計(jì)數(shù)達(dá)到 2。
將上述基于計(jì)數(shù)的滾動(dòng)窗口與基于時(shí)間的滾動(dòng)窗口(時(shí)間設(shè)置為 10 秒)進(jìn)行對(duì)比。經(jīng)過(guò) 10 秒的間隔后,無(wú)論窗口中有多少事件,function 代碼都會(huì)被觸發(fā)。在下圖中,第一個(gè)窗口中有 7 個(gè)事件,而第二個(gè)窗口中只有 3 個(gè)事件。
滑動(dòng)窗口計(jì)數(shù)定義了窗口的長(zhǎng)度,窗口長(zhǎng)度設(shè)置了清除策略以限制保留待處理的數(shù)據(jù)量;滑動(dòng)間隔定義了觸發(fā)策略。滾動(dòng)窗口策略和滑動(dòng)窗口策略都可以根據(jù)時(shí)間(時(shí)間段)或長(zhǎng)度(數(shù)據(jù)元素的數(shù)量)來(lái)定義。
在下圖中,窗口長(zhǎng)度為 2 秒,也就是說(shuō),2 秒以前的數(shù)據(jù)會(huì)被清除,并且不會(huì)用于計(jì)算?;瑒?dòng)間隔為 1 秒,即每 1 秒鐘執(zhí)行一次 Pulsar function 代碼。這樣,可以在整個(gè)窗口長(zhǎng)度內(nèi)處理數(shù)據(jù)。
前面的示例都是基于時(shí)間來(lái)定義清除策略和觸發(fā)策略,也可以根據(jù)長(zhǎng)度來(lái)定義清除策略或觸發(fā)策略,或者同時(shí)定義這兩種策略。
在 Pulsar Functions 中實(shí)現(xiàn)這兩種類(lèi)型的窗口 function 都很容易,只需要指定一個(gè) java.util.Collection 作為輸入類(lèi)型,如下所示,并在創(chuàng)建 function 時(shí)在 -userConfig 標(biāo)志中指定適當(dāng)?shù)拇翱谂渲脤傩浴?/p>
用于實(shí)現(xiàn)前面提到的時(shí)間窗口四種情形的配置參數(shù)如下:
“–windowLengthCount”:每個(gè)窗口的消息數(shù)量
“–windowLengthDurationMs”:窗口時(shí)間(以毫秒為單位)
“–slidingIntervalCount”:窗口滑動(dòng)后的消息數(shù)量
“–slidingIntervalDurationMs”:窗口滑動(dòng)后的時(shí)間
正確的組合方式如下表:
時(shí)間,滑動(dòng)窗口 | -windowLengthDurationMs = XXXX -slidingIntervalDurationMs = XXXX |
時(shí)間,Batch Window(即滾動(dòng)窗口) | -windowLengthDurationMs = XXXX |
長(zhǎng)度,滑動(dòng)窗口 | -windowLengthCount = XXXX -slidingIntervalCount = XXXX |
長(zhǎng)度,Batch Window(即滾動(dòng)窗口) | -windowLengthCount = XXXX |
以上就是基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見(jiàn)到或用到的。希望你能通過(guò)這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
當(dāng)前題目:基于PulsarFunctions的事件處理設(shè)計(jì)模式是什么
URL鏈接:http://aaarwkj.com/article16/gjjcdg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、搜索引擎優(yōu)化、網(wǎng)站設(shè)計(jì)、全網(wǎng)營(yíng)銷(xiāo)推廣、云服務(wù)器、自適應(yīng)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)