欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

基于PulsarFunctions的事件處理設(shè)計(jì)模式是什么

本篇文章給大家分享的是有關(guān)基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么,小編覺(jué)得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話(huà)不多說(shuō),跟著小編一起來(lái)看看吧。

創(chuàng)新互聯(lián)專(zhuān)注于企業(yè)營(yíng)銷(xiāo)型網(wǎng)站建設(shè)、網(wǎng)站重做改版、隆子網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、html5、商城網(wǎng)站制作、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)公司、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁(yè)設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性?xún)r(jià)比高,為隆子等各大城市提供網(wǎng)站開(kāi)發(fā)制作服務(wù)。

下面將介紹一些常見(jiàn)的實(shí)時(shí)流式傳輸模式及其實(shí)現(xiàn)。

模式 1:動(dòng)態(tài)路由


首先回顧一下如何使用 Apache Pulsar Functions 實(shí)現(xiàn)基于內(nèi)容的路由?;趦?nèi)容的路由是一種集成模式。該模式已經(jīng)存在多年,通常用于事件中心和消息框架中。基本思路是檢查每條消息的內(nèi)容,根據(jù)消息內(nèi)容將消息路由到不同目的地。

基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么

下面的例子使用了Apache Pulsar SDK,SDK 允許用戶(hù)配置三個(gè)不同的值:

  • 用于在消息中查找匹配的正則表達(dá)式

  • 消息匹配表達(dá)式模式時(shí)被發(fā)送到的 topic

  • 消息不匹配表達(dá)式模式時(shí)被發(fā)送到的 topic

這個(gè)例子證明了 Pulsar Functions 功能強(qiáng)大,可以基于功能邏輯動(dòng)態(tài)決定將事件發(fā)送到哪里。

 import java.util.regex.*;  import org.apache.pulsar.functions.api.Context;  import org.apache.pulsar.functions.api.Function;

 public ContentBasedRoutingFunction implements Function<String, String> {

    String process(String input, Context context) throws Exception {         String regex = context             .getUserConfigValue(“regex”).toString();         String matchedTopic = context             .getUserConfigValue(“matched-topic”).toString();         String unmatchedTopic = context             .getUserConfigValue(“unmatched-topic”).toString();

        Pattern p = Pattern.compile(regex);         Matcher m = p.matcher(input);         if (m.matches()) {           context.publish(matchedTopic, input);         } else {           context.publish(unmatchedTopic, input);         }     }  }


模式 2:過(guò)濾


如果想通過(guò)僅保留滿(mǎn)足給定條件的事件來(lái)排除 topic 上的大多數(shù)事件時(shí),應(yīng)用選擇過(guò)濾模式。過(guò)濾模式對(duì)僅查找感興趣的事件特別有效,如信用卡付款超過(guò)了一定金額;日志文件中的 ERROR 消息;傳感器讀數(shù)超過(guò)特定閾值等等(見(jiàn)模式 4)。

假如用戶(hù)在監(jiān)視信用卡交易的事件流,并嘗試檢測(cè)欺詐或可疑行為。由于交易量很大,選擇“同意/不同意”的時(shí)間有限,用戶(hù)必須先過(guò)濾掉有“風(fēng)險(xiǎn)”特征的交易,如預(yù)付現(xiàn)金、大額付款等。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import com.company.creditcard.Purchase;

public class FraudFilter implements Function<Purchase, Purchase> {

   Purchase process(Purchase p, Context context) throws Exception {         if (p.getTransactionType() == ‘CASH ADVANCE’) ||             p.getAmount > 500.00) {            return p;         }        return null;    }}

可以使用過(guò)濾器來(lái)過(guò)濾有“風(fēng)險(xiǎn)”特征的交易。過(guò)濾器可以識(shí)別這些“風(fēng)險(xiǎn)”特征,并只將這些交易路由到一個(gè)單獨(dú)的 topic 上以進(jìn)行進(jìn)一步評(píng)估。

經(jīng)過(guò)過(guò)濾器過(guò)濾后,所有信用卡支付都可以被路由到一個(gè)“潛在欺詐行為”的 topic 上進(jìn)行進(jìn)一步評(píng)估,而其他事件則會(huì)被過(guò)濾掉,過(guò)濾器也不會(huì)對(duì)過(guò)濾掉的事件執(zhí)行任何操作。

基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么

上圖是基于三個(gè)獨(dú)立支付對(duì)象的 FraudFilter function。第一次支付符合給定標(biāo)準(zhǔn),被路由到“潛在欺詐行為”topic 上進(jìn)行進(jìn)一步評(píng)估;而第二次和第三次支付不符合欺詐標(biāo)準(zhǔn),直接被過(guò)濾掉(沒(méi)有被路由到“潛在欺詐行為”過(guò)濾器上)。



模式 3:轉(zhuǎn)換


轉(zhuǎn)換模式用于將事件從一種類(lèi)型轉(zhuǎn)換為另一種類(lèi)型,或用于添加、刪除或修改輸入事件的值。

|| 投影

投影模式類(lèi)似于關(guān)系代數(shù)中的投影算子,選擇輸入事件的屬性子集,并創(chuàng)建僅包含這些屬性的輸出事件。投影模式可用于刪除事件中的敏感字段,或者只保留事件中的必要屬性。下圖為投影模式的一種應(yīng)用,在將記錄發(fā)布到下游 topic 前,“屏蔽”傳入的社安全號(hào)碼。

基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么

|| 富集模式

富集模式用于將數(shù)據(jù)添加到輸入屬性中不存在的輸出事件中。典型的富集模式包含基于輸入事件中的某個(gè)鍵值對(duì)引用數(shù)據(jù)進(jìn)行某種查找。以下示例展示了如何根據(jù)輸入事件中包含的 IP 地址將地理位置添加到輸出事件。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import com.company.creditcard.Purchase;import com.company.services.GeoService;

public class IPLookup implements Function<Purchase, Purchase> {    Purchase process(Purchase p) throws Exception {      Geo g = GeoService.getByIp(p.getIPAddress());      // By default, these fields are blank, so we just modify the object      p.setLongitude(g.getLon());      p.setLatitiude(g.getLat());      return p;    }}

|| 分離模式

在分離模式下,事件處理器接收單個(gè)輸入事件,并將其分為多個(gè)輸出事件。當(dāng)輸入事件是一個(gè)包含多個(gè)單獨(dú)事件(如日志文件中的 entry)的批處理,并且想要單獨(dú)處理每個(gè)事件時(shí),分離模式十分適用。下圖展示了分離模式的處理過(guò)程:先根據(jù)換行符分隔輸入,再逐行發(fā)布到配置的輸出 topic。

基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么

此 function 的實(shí)現(xiàn)過(guò)程如下:


import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;

public class Splitter implements Function<String, String> {

   String process(String s, Context context) throws Exception {       Arrays.asLists(s.split(“\\R”).forEach(line ->            context.publish(context.getOutputTopic(), line));       return null;    }}


模式 4:警報(bào)和閾值


警報(bào)和閾值模式可進(jìn)行檢測(cè),并根據(jù)檢測(cè)條件生成警報(bào)(如高溫警報(bào))??梢曰诤?jiǎn)單的值,也可以基于較復(fù)雜的條件(如增長(zhǎng)率、數(shù)量的持續(xù)變化等)生成警報(bào)。

下面的示例為基于用戶(hù)配置的閾值參數(shù)(如 100.00,38.7 等)和接收警報(bào)通知的郵箱地址生成警報(bào)。當(dāng)此 function 接收到超過(guò)配置閾值的傳感器事件時(shí),將發(fā)送電子郵件。

import javax.mail.*;import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;

public SimpleAlertFunction implements Function<Sensor, Void> {   Void process(Sensor sensor, Context context) throws Exception {       Double threshold = context           .getUserConfigValue(“threshold”).toString();       String alertEmail = context           .getUserConfigValue(“alert-email”).toString();

     if (sensor.getReading() >= threshold) {         Session s = Session.getDefaultInstance();         MimeMessage msg = new MineMessage(s);         msg.setText(“Alert for Sensor:” + sensor.getId());         Transport.send(msg);      }      return null;  }}

下面是一個(gè)有狀態(tài) function 示例,該 function 根據(jù)特定傳感器讀數(shù)的增長(zhǎng)率生成警報(bào)。在決定是否生成警報(bào)時(shí),需要訪問(wèn)以前的傳感器讀數(shù)。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;

public ComplexAlertFunction implements Function<Sensor, Void> {

 Void process(Sensor sensor, Context context) throws Exception {      Double threshold = context           .getUserConfigValue(“threshold”).toString();      String alertTopic = context           .getUserConfigValue(“alert-topic”).toString();

     // Get previous & current metric values      Float previous = context.getState(sensor.getId() + “-metric”);      Long previous_time = context.getState(sensor.getId() + “-metric-time”);      Float current = sensor.getMetric();      Long current_time = sensor.getMetricTime();

     // Calculate Rate of change & compare to threshold.      Double rateOfChange = (current-previous) /                           (current_time-previous_time);      if (abs(rateOfChange) >= threshold) {         // Publish the sensor ID to the alert topic for handling         context.publish(alertTopic, sensor.getId());      }

     // Update metric values      context.putState(sensor.getId() + “-metric”, current);      context.putState(sensor.getId() + “-metric-time”, current_time);  }}

通過(guò) Apache Pulsar Functions 狀態(tài)管理特性?xún)H保留先前的度量讀數(shù)和時(shí)間,并將傳感器 ID 添加到這些值中(因?yàn)閷?huì)處理來(lái)自多個(gè)傳感器的度量,所以需要傳感器 ID)。為了簡(jiǎn)單起見(jiàn),假設(shè)事件以正確的順序到達(dá),即始終是最新讀數(shù),沒(méi)有亂序讀數(shù)。

另外,這一次我們將傳感器 ID 轉(zhuǎn)發(fā)到一個(gè)專(zhuān)門(mén)的警報(bào) topic 以進(jìn)行進(jìn)一步處理,而不是僅發(fā)送電子郵件。通過(guò)這種方式,我們可以對(duì)事件進(jìn)行額外的富集處理(通過(guò) Pulsar Functions)。例如,查找獲取傳感器的地理位置,然后通知相關(guān)人員。

基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么



模式 5:簡(jiǎn)單計(jì)數(shù)和窗口計(jì)數(shù)


簡(jiǎn)單計(jì)數(shù)和窗口計(jì)數(shù)模式使用了聚合函數(shù),聚合函數(shù)將事件的集合作為輸入,并通過(guò)對(duì)輸入事件應(yīng)用一個(gè) function 生成一個(gè)所需的輸出事件。聚合函數(shù)包括:求和、平均值、最大值、最小值、百分位數(shù)等。

基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么

以下為使用 Pulsar Functions 實(shí)現(xiàn)“字?jǐn)?shù)統(tǒng)計(jì)”的示例,計(jì)算每個(gè)單詞在給定 topic 中出現(xiàn)次數(shù)的總和。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;

public WordCountFunction implements Function<String, Void> {

  Void process(String s, Context context) throws Exception {      Arrays.asLists(s.split(“\\.”).forEach(word -> context.incrCounter(word, 1));      return null;   }}

考慮到流數(shù)據(jù) source 無(wú)休止的特性,無(wú)限期聚合用處不大,因?yàn)橥ǔJ窃跀?shù)據(jù)窗口上進(jìn)行這些計(jì)算(如前一小時(shí)內(nèi)的故障次數(shù))。

基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么

數(shù)據(jù)窗口代表事件流的有限子集,如上圖所示。但是,應(yīng)該如何定義數(shù)據(jù)窗口的邊界?有兩個(gè)用于定義窗口的常用屬性:

  • 觸發(fā)策略:控制執(zhí)行或觸發(fā) function 代碼的時(shí)間。Apache Pulsar Function 框架通過(guò)這些規(guī)則來(lái)通知代碼處理窗口中收集的全部數(shù)據(jù)。

  • 清除策略:控制保留在窗口中的數(shù)據(jù)量。這些規(guī)則用于決定是否從窗口中清除數(shù)據(jù)元素。

這兩個(gè)策略都是由時(shí)間或窗口中的數(shù)據(jù)量驅(qū)動(dòng)的。二者之間的區(qū)別是什么?又是怎樣協(xié)同工作的?在多種窗口技術(shù)中,最常用的是滾動(dòng)窗口和滑動(dòng)窗口。

|| 滾動(dòng)窗口

窗口已滿(mǎn)是滾動(dòng)窗口清除策略的唯一條件,因此,只需要指定想要使用觸發(fā)策略(基于計(jì)數(shù)或基于時(shí)間)即可?;谟?jì)數(shù)的滾動(dòng)窗口是怎樣工作的?

在下圖的第一個(gè)示例中,觸發(fā)策略設(shè)置為 2,也就是說(shuō),在窗口中有兩個(gè)項(xiàng)目時(shí),觸發(fā)器將會(huì)觸發(fā),開(kāi)始執(zhí)行 Pulsar Function 代碼。這一系列行為與時(shí)間無(wú)關(guān),窗口計(jì)數(shù)達(dá)到 2 用了 5 秒還是 5 個(gè)小時(shí)并不重要,重要的是窗口計(jì)數(shù)達(dá)到 2。

基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么

將上述基于計(jì)數(shù)的滾動(dòng)窗口與基于時(shí)間的滾動(dòng)窗口(時(shí)間設(shè)置為 10 秒)進(jìn)行對(duì)比。經(jīng)過(guò) 10 秒的間隔后,無(wú)論窗口中有多少事件,function 代碼都會(huì)被觸發(fā)。在下圖中,第一個(gè)窗口中有 7 個(gè)事件,而第二個(gè)窗口中只有 3 個(gè)事件。

基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么

|| 滑動(dòng)窗口

滑動(dòng)窗口計(jì)數(shù)定義了窗口的長(zhǎng)度,窗口長(zhǎng)度設(shè)置了清除策略以限制保留待處理的數(shù)據(jù)量;滑動(dòng)間隔定義了觸發(fā)策略。滾動(dòng)窗口策略和滑動(dòng)窗口策略都可以根據(jù)時(shí)間(時(shí)間段)或長(zhǎng)度(數(shù)據(jù)元素的數(shù)量)來(lái)定義。

在下圖中,窗口長(zhǎng)度為 2 秒,也就是說(shuō),2 秒以前的數(shù)據(jù)會(huì)被清除,并且不會(huì)用于計(jì)算?;瑒?dòng)間隔為 1 秒,即每 1 秒鐘執(zhí)行一次 Pulsar function 代碼。這樣,可以在整個(gè)窗口長(zhǎng)度內(nèi)處理數(shù)據(jù)。

基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么

前面的示例都是基于時(shí)間來(lái)定義清除策略和觸發(fā)策略,也可以根據(jù)長(zhǎng)度來(lái)定義清除策略或觸發(fā)策略,或者同時(shí)定義這兩種策略。

在 Pulsar Functions 中實(shí)現(xiàn)這兩種類(lèi)型的窗口 function 都很容易,只需要指定一個(gè) java.util.Collection 作為輸入類(lèi)型,如下所示,并在創(chuàng)建 function 時(shí)在 -userConfig 標(biāo)志中指定適當(dāng)?shù)拇翱谂渲脤傩浴?/p>

用于實(shí)現(xiàn)前面提到的時(shí)間窗口四種情形的配置參數(shù)如下:

  • “–windowLengthCount”:每個(gè)窗口的消息數(shù)量

  • “–windowLengthDurationMs”:窗口時(shí)間(以毫秒為單位)

  • “–slidingIntervalCount”:窗口滑動(dòng)后的消息數(shù)量

  • “–slidingIntervalDurationMs”:窗口滑動(dòng)后的時(shí)間

正確的組合方式如下表:

時(shí)間,滑動(dòng)窗口-windowLengthDurationMs = XXXX
-slidingIntervalDurationMs = XXXX
時(shí)間,Batch Window(即滾動(dòng)窗口)-windowLengthDurationMs = XXXX
長(zhǎng)度,滑動(dòng)窗口-windowLengthCount = XXXX
-slidingIntervalCount = XXXX
長(zhǎng)度,Batch Window(即滾動(dòng)窗口)-windowLengthCount = XXXX

以上就是基于Pulsar Functions的事件處理設(shè)計(jì)模式是什么,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見(jiàn)到或用到的。希望你能通過(guò)這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

當(dāng)前題目:基于PulsarFunctions的事件處理設(shè)計(jì)模式是什么
URL鏈接:http://aaarwkj.com/article16/gjjcdg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、搜索引擎優(yōu)化網(wǎng)站設(shè)計(jì)、全網(wǎng)營(yíng)銷(xiāo)推廣、云服務(wù)器、自適應(yīng)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

綿陽(yáng)服務(wù)器托管
久久久亚洲福利精品午夜| 青青草免费在线播放视频网站| 亚洲欧美日韩在线观看a三区| 中文字幕在线精品乱码麻豆| 日本黄色美女日本黄色| 日韩av在线观看大全| 蜜臀一区二区三区精品免费| 欧美黄色成人免费网站| 羞羞av一区二区三区| 亚洲女同成人在线观看| 婷婷色综合一区二区三区| 亚洲欧美综合另类久久| 婷婷色精品一区二区激情| 久久最新视频中文字幕| 亚洲国产男同日韩小鲜肉| 亚洲精品不卡在线观看| 亚洲一二三无人区是什么| 亚洲天堂男人的天堂狠狠操| 亚洲午夜精品美女写真| 欧美亚洲国产另类第一页| 五十路六十路美熟人妻| 99热视频这里只有精品| 亚洲国产精品天堂av在线播放| 欧美日韩国产激情高清| 免费激情在线视频网址| 日韩精品a区二区在线电影| 精品国产第一区二区三区| 欧美日韩男女性生活视频| 久久婷婷国产综合精品青草| 国产精品午夜视频免费观看| 日本日本熟妇在线视频| 国产精品久久久久精品爆| 欧美中文字幕在线精品| 国产传媒在线免费播放视频| 国产av麻豆全部免费| 中文字幕二区三区av| 麻豆视传媒官网免费观看| 国产自拍成人精品视频| 日韩欧美黄片一区二区三区| 欧美日韩国产精品高清| 日本成熟妇高潮视频在线观看不卡|