欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

Storm的TransactionalTopology怎么配置

這篇文章主要講解了“Storm的Transactional Topology怎么配置”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Storm的Transactional Topology怎么配置”吧!

創(chuàng)新互聯(lián)長(zhǎng)期為數(shù)千家客戶(hù)提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開(kāi)放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為德州企業(yè)提供專(zhuān)業(yè)的成都網(wǎng)站制作、成都網(wǎng)站建設(shè),德州網(wǎng)站改版等技術(shù)服務(wù)。擁有十多年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開(kāi)發(fā)。

1、什么是Transactional Topology?

    ○ 是一個(gè)每個(gè)tuple僅被處理一次的框架

    ○ 由Storm0.7引入,于Storm0.9被棄用,被triden取而代之

    ○ 底層依靠spout\bolt\topology\stream抽象的一個(gè)特性

2、Transactional Topology設(shè)計(jì)思路

    ○ 一次只處理一次tuple

        基于Storm處理tuple失敗時(shí)會(huì)重發(fā)(replay),如何確保replay的記錄不被重復(fù)記錄,換句話(huà)說(shuō)就是如何保證tuple僅被處理一次,這就依賴(lài)于一個(gè)稱(chēng)作強(qiáng)順序性的思想。

        強(qiáng)順序性:每個(gè)tuple與一個(gè)transaction id相關(guān)聯(lián),transaction id實(shí)際就是一個(gè)數(shù)字,每一個(gè)tuple都有一個(gè)按照順序的transaction id(例如:tuple1的transaction id 為 1,tuple2的transaction id 為 2,...以此類(lèi)推),只有當(dāng)前的tuple處理并存儲(chǔ)完畢,下一個(gè)tuple(處于等待狀態(tài))才能進(jìn)行存儲(chǔ),tuple被存儲(chǔ)時(shí)連同transaction id一并存儲(chǔ),此時(shí)考慮兩種情況:

                        tuple處理失敗時(shí):重新發(fā)送一個(gè)和原來(lái)一模一樣的transaction id

                        tuple處理成功時(shí):發(fā)送的transaction id會(huì)和存儲(chǔ)的transaction id對(duì)比,如果不存在transaction id,表示第一次記錄,直接存儲(chǔ);如果發(fā)現(xiàn)存在,則忽略該tuple。

        這一思想是由Kafka開(kāi)發(fā)者提出來(lái)的。

    ○ 一次處理一批tuple

        基于上面的一個(gè)優(yōu)化,將一批tuple直接打包成一個(gè)batch,然后分配一個(gè)transaction id ,讓batch與batch之間保證強(qiáng)順序性,且batch內(nèi)部的tuples可以并行。

    ○ Storm是如何采用的?

        兩個(gè)步驟:

            1、并行計(jì)算batch中的tuple數(shù)量

            2、batch強(qiáng)順序性存儲(chǔ)

            在batch強(qiáng)順序性存儲(chǔ)的同時(shí)讓其他等待存儲(chǔ)的batch內(nèi)部進(jìn)行并行運(yùn)算,不必等到下一個(gè)batch存儲(chǔ)時(shí)才進(jìn)行內(nèi)部運(yùn)算。

        在Storm上面的兩個(gè)步驟表現(xiàn)為processing階段commit階段。

3、一些設(shè)計(jì)細(xì)節(jié)

使用Transactional Topology時(shí),storm提供如下操作:

    ○ 管理狀態(tài)

        將需要處理的狀態(tài)如:transaction id 、batch meta等狀態(tài)信息放在zookeeper

    ○ 協(xié)調(diào)事務(wù)

        指定某個(gè)時(shí)間段執(zhí)行processing操作和commit操作

    ○ 錯(cuò)誤檢測(cè)

        storm使用acking框架自動(dòng)檢測(cè)batch被成功或失敗處理,然后相應(yīng)的重發(fā)(replay)

    ○ 內(nèi)置批處理API

        通過(guò)對(duì)普通的bolt進(jìn)行包裝,提供一套對(duì)batch處理的API、協(xié)調(diào)工作(即某個(gè)時(shí)刻處理某個(gè)processing或者commit),并且storm會(huì)自動(dòng)清除中間結(jié)果

Transactional Topology是可以完全重發(fā)一個(gè)特定batch的消息隊(duì)列系統(tǒng),在 Kakfa中正是有這樣的需求,為此Storm在storm-contrib里面的Storm-Kafka中為Kafka實(shí)現(xiàn)了一個(gè)事務(wù)性的spout。

4、來(lái)自Storm-Starter.jar的例子

    計(jì)算來(lái)自輸入流中tuple的個(gè)數(shù)

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5)
        .shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
        .globalGrouping("partial-count");

    ○ 通過(guò)TransactionalTopologyBuilder類(lèi)構(gòu)建Transactional

        參數(shù):

        Transaction ID:transactional topology的ID,在zookeeper中用于保存進(jìn)度狀態(tài),重啟topology時(shí)可以直接從執(zhí)行的進(jìn)度開(kāi)始執(zhí)行而不用重頭到尾又執(zhí)行一遍

        Spout ID:位于整個(gè)Topology的Spout的ID

        Spout Object:Transactional中的Spout對(duì)象

        Spout:Trasactional中的Spout的并行數(shù)

    ○ MemoryTransactionalSpout用于從一個(gè)內(nèi)存變量中讀取數(shù)據(jù)

        DATA:數(shù)據(jù)

        tuple fields:字段

        tupleNum:在batch中最大的tuple數(shù)

    ○ Bolts

        第一個(gè)Bolt采用隨機(jī)分組的方式隨機(jī)分發(fā)到各個(gè)task

public static class BatchCount extends BaseBatchBolt {
    Object _id;
    BatchOutputCollector _collector;
    int _count = 0;
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }
    @Override
    public void execute(Tuple tuple) {
        _count++;
    }
    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _count));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "count"));
    }
}

        BatchBolt對(duì)象運(yùn)行在BatchBoltExecutor中,BatchBoltExecutor負(fù)責(zé)BatchBolt對(duì)象的創(chuàng)建和清理 

        BatchBolt的ID在context對(duì)象中,該ID是一個(gè)TransactionAttempt對(duì)象.

        BatchBolt在DRPC中也可以使用,只是txid類(lèi)型不一樣,如果在Transactional Topology中使用BatchBolt,可以繼承BaseTransactionalBolt.

        在Tranasctional Topology中所有的Tuple都必須以TransactionAttempt作為第一個(gè)field,然后storm才能根據(jù)該field判斷Tuple所屬的BatchBolt,所以在發(fā)射Tuple必須滿(mǎn)足此條件。

        TransactionAttempt對(duì)象中有兩個(gè)屬性:

            transaction id:強(qiáng)順序性,無(wú)論重發(fā)多少次都是一樣的數(shù)字

            attempt id:對(duì)每一個(gè)Batch標(biāo)識(shí)的ID,每次重發(fā)都其值不一致,通過(guò)該ID可以區(qū)分每次重發(fā)的Tuple的不同版本

第二個(gè)Bolt使用GlobalGrouping匯總batch中的tuple數(shù)

 public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
    TransactionAttempt _attempt;
    BatchOutputCollector _collector;
    int _sum = 0;
 
    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
        _collector = collector;
        _attempt = attempt;
    }
 
    @Override
    public void execute(Tuple tuple) {
        _sum+=tuple.getInteger(1);
    }
 
    @Override
    public void finishBatch() {
        Value val = DATABASE.get(GLOBAL_COUNT_KEY);
        Value newval;
        if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
            newval = new Value();
            newval.txid = _attempt.getTransactionId();
            if(val==null) {
                newval.count = _sum;
            } else {
                newval.count = _sum + val.count;
            }
            DATABASE.put(GLOBAL_COUNT_KEY, newval);
        } else {
            newval = val;
        }
        _collector.emit(new Values(_attempt, newval.count));
    }
 
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "sum"));
    }
}

    ICommitter接口:實(shí)現(xiàn)該接口的Bolt會(huì)在commit階段調(diào)用finishBatch方法,該方法的調(diào)用會(huì)按照強(qiáng)順序性,此外還可以使用TransactionalTopologyBuilder的setCommiterBolt來(lái)添加Bolt實(shí)現(xiàn)和該接口一樣的功能。

    executor方法:在processing階段和commit階段都可以執(zhí)行。

    關(guān)于更多的transactional topology例子可以看看storm-starter中的TransactionalWords類(lèi),該例子會(huì)在一個(gè)事務(wù)中更新多個(gè)數(shù)據(jù)庫(kù)

5、Transaction Topology API

    Bolt類(lèi)

    BaiscBolt:該Bolt不跟batch中的tuples交互,僅基于單個(gè)傳來(lái)的tuple和產(chǎn)生新的tuple

    BatchBolt:該Bolt處理batch中的tuples,對(duì)于每一個(gè)tuple調(diào)用executor方法,整個(gè)batch完成時(shí)調(diào)用finishBatch方法

    被Committer標(biāo)記的Bolt:在commit階段才調(diào)用finishBatch方法,commit具有強(qiáng)順序性,標(biāo)記Bolt為commit階段執(zhí)行finishBatch的方法有兩種:1、實(shí)現(xiàn)ICommiter接口。2、TransactionalTopologyBuilder的setCommiterBolt來(lái)添加Bolt。

    Processing階段和Commit階段

    Storm的Transactional Topology怎么配置

    紅色輪廓的Bolt被標(biāo)記過(guò)為commit

    Spout向Bolt A發(fā)送整個(gè)Batch

    Bolt A處理完整個(gè)Batch之后調(diào)用finishBatch方法分別向Bolt B 和 Bolt C發(fā)送Batch

    Bolt B接收到Bolt A傳遞過(guò)來(lái)的tuple進(jìn)行處理(此時(shí)還尚未處理完畢)不會(huì)調(diào)用finishBatch方法

    Bolt C接口Bolt A傳遞的tuple,盡管處理完Bolt A傳遞來(lái)的tuple,但是由于Bolt B還尚未commit,所以Bolt C處于等待Bolt B commit的狀態(tài),不會(huì)調(diào)用finishBatch方法

    Bolt D接收來(lái)自Bolt C調(diào)用executor方法時(shí)發(fā)送的所有tuple

    此時(shí)一旦Bolt B進(jìn)行commit進(jìn)行finishBatch操作,那么Bolt C就會(huì)確認(rèn)接收到所有Bolt B的tuple,Bolt C也調(diào)用finishBatch方法,最終Bolt D也接收到所有來(lái)自Bolt C的batch。

    在這里盡管Bolt D是一個(gè)committer,它在接收到整個(gè)batch的tuple之后不需要等待第二個(gè)commit信號(hào)。因?yàn)樗窃赾ommit階段接收到的整個(gè)batch,它會(huì)調(diào)用finishBatch來(lái)完成整個(gè)事務(wù)。

    Acking

注意,當(dāng)使用transactional topology的時(shí)候你不需要顯式地去做任何的acking或者anchoring,storm在背后都做掉了。(storm對(duì)transactional topolgies里面的acking機(jī)制進(jìn)行了高度的優(yōu)化)

    Failing a transaction

在使用普通bolt的時(shí)候, 你可以通過(guò)調(diào)用OutputCollector的fail方法來(lái)fail這個(gè)tuple所在的tuple樹(shù)。Transactional Topology對(duì)用戶(hù)隱藏了acking框架, 它提供一個(gè)不同的機(jī)制來(lái)fail一個(gè)batch(從而使得這個(gè)batch被replay):只要拋出一個(gè)FailedException就可以了。跟普通的異常不一樣, 這個(gè)異常只會(huì)導(dǎo)致當(dāng)前的batch被replay, 而不會(huì)使整個(gè)進(jìn)程崩潰掉。

    Transactional spout

TransactionalSpout接口跟普通的Spout接口完全不一樣。一個(gè)TransactionalSpout的實(shí)現(xiàn)會(huì)發(fā)送一批一批(batch)的tuple, 而且必須保證同一批次tuples的transaction id始終一樣。

在transactional topology運(yùn)行的時(shí)候, transactional spout看起來(lái)是這樣的一個(gè)結(jié)構(gòu):

Storm的Transactional Topology怎么配置

coordinator是一個(gè)普通的storm的spout——它一直為事務(wù)的batch發(fā)射tuple。

Emitter則像一個(gè)普通的storm bolt,它負(fù)責(zé)為每個(gè)batch實(shí)際發(fā)射tuple,emitter以all grouping的方式訂閱coordinator的”batch emit”流。
關(guān)于如何實(shí)現(xiàn)一個(gè)TransactionalSpout的細(xì)節(jié)可以參見(jiàn)Javadoc

    Partitioned Transactional Spout

一種常見(jiàn)的TransactionalSpout是那種從多個(gè)queue broker讀取數(shù)據(jù)然后再發(fā)射的tuple。比如TransactionalKafkaSpout就是這樣工作的。IPartitionedTransactionalSpout把這些管理每個(gè)分區(qū)的狀態(tài)以保證可以replay的冪等性的工作都自動(dòng)化掉了。更多可以參考Javadoc。

    配置

Transactional Topologies有兩個(gè)重要的配置:

Zookeeper:默認(rèn)情況下,transactional topology會(huì)把狀態(tài)信息保存在一個(gè)zookeeper里面(協(xié)調(diào)集群的那個(gè))。你可以通過(guò)這兩個(gè)配置來(lái)指定其它的zookeeper:”transactional.zookeeper.servers” 和 “transactional.zookeeper.port“。

同時(shí)活躍的batch數(shù)量:你必須設(shè)置同時(shí)處理的batch數(shù)量,你可以通過(guò)”topology.max.spout.pending” 來(lái)指定, 如果你不指定,默認(rèn)是1。

6、實(shí)現(xiàn)

Transactional Topologies的實(shí)現(xiàn)是非常優(yōu)雅的。管理提交協(xié)議,檢測(cè)失敗并且串行提交看起來(lái)很復(fù)雜,但是使用storm的原語(yǔ)來(lái)進(jìn)行抽象是非常簡(jiǎn)單的。

1、transactional spout是一個(gè)子topology, 它由一個(gè)coordinator spout和一個(gè)emitter bolt組成。

2、coordinator是一個(gè)普通的spout,并行度為1;emitter是一個(gè)bolt,并行度為P,使用all分組方式連接到coordinator的“batch”流上。

3、coordinator使用一個(gè)acking框架決定什么時(shí)候一個(gè)batch被成功執(zhí)行(process)完成,然后去決定一個(gè)batch什么時(shí)候被成功提交(commit)。

感謝各位的閱讀,以上就是“Storm的Transactional Topology怎么配置”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Storm的Transactional Topology怎么配置這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

當(dāng)前題目:Storm的TransactionalTopology怎么配置
標(biāo)題URL:http://aaarwkj.com/article32/igsdsc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信小程序、網(wǎng)站收錄、App開(kāi)發(fā)全網(wǎng)營(yíng)銷(xiāo)推廣、服務(wù)器托管外貿(mào)建站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都app開(kāi)發(fā)公司
无毛亚洲视频在线观看| 久久精品国产91麻豆| 色在线观看综合亚洲欧洲| 国产精品成人一区二区三| 亚洲国产精品欧美激情| 免费97久久人妻一区精品| 人妻艳情一区二区三区| 国产一区二区不卡自拍| 青青草青青草在线观看视频| 国产一区二区视频在线| 久久九九精品日本人妻视频| 色综合久久天天射天天干| 日韩精品成人亚洲天堂| 中文字幕四虎在线观看| 99国产精品欧美一区二区| 中文字幕人妻丝袜乱一区二区| 亚洲一区二区三区熟女少妇| 97超碰国产在线观看| 国产精品av一区二区在线| 自由成熟性生活免费视频| 91精品人妻二区三区| 欧美日韩久久久久久精品| 久久久精品免费中文视频| 免费av男人天堂亚洲天堂| 国产日韩欧美高清免费视频| 日本岛国免费一区二区| 国产中文字幕婷婷丁香| 91麻豆精品一二三区在线| 黑丝美女国产精品久久久| 日本高清一区二区不卡视频| 日本国产一区二区三区在线观看 | 国精品91人妻一区二区| 精品亚洲国产一区二区三区| 国产精品白丝一区二区三区| 18禁视频免费无遮挡| 人妻有码av中文字幕久久| 亚洲品质一区二区三区| 97资源在线公开视频| 精品国产一区二区三级四区| 亚欧乱色熟女一区二区三区| 中文字幕久久熟女蜜桃|