欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

raft的實際運用

這篇文章給大家分享的是raft的實際運用,相信大部分人都還沒學(xué)會這個技能,為了讓大家學(xué)會,給大家總結(jié)了以下內(nèi)容,話不多說,一起往下看吧。

成都創(chuàng)新互聯(lián)公司服務(wù)項目包括古藺網(wǎng)站建設(shè)、古藺網(wǎng)站制作、古藺網(wǎng)頁制作以及古藺網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,古藺網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到古藺省份的部分城市,未來相信會繼續(xù)擴大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

1、raft.go 的raft結(jié)構(gòu)體 補充字段。 字段應(yīng)該盡量與raft論文的Figure2接近。

type Raft struct {
    mu        sync.Mutex          // Lock to protect shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *Persister          // Object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]
    dead      int32               // set by Kill()

    // Your data here (2A, 2B, 2C).
    // Look at the paper's Figure 2 for a description of what
    // state a Raft server must maintain.
    state             int           // follower, candidate or leader
    resetTimer        chan struct{} // for reset election timer
    electionTimer     *time.Timer   // election timer
    electionTimeout   time.Duration // 400~800ms
    heartbeatInterval time.Duration // 100ms

    CurrentTerm int        // Persisted before responding to RPCs
    VotedFor    int        // Persisted before responding to RPCs
    Logs        []LogEntry // Persisted before responding to RPCs
    commitCond  *sync.Cond // for commitIndex update
    //newEntryCond []*sync.Cond // for new log entry
    commitIndex int           // Volatile state on all servers
    lastApplied int           // Volatile state on all servers
    nextIndex   []int         // Leader only, reinitialized after election
    matchIndex  []int         // Leader only, reinitialized after election
    applyCh     chan ApplyMsg // outgoing channel to service
    shutdownCh  chan struct{} // shutdown channel, shut raft instance gracefully
}

獲取當(dāng)前raft節(jié)點的term與狀態(tài)

func (rf *Raft) GetState() (int, bool) {

    var term int
    var isleader bool
    // Your code here (2A).
    rf.mu.Lock()
    defer rf.mu.Unlock()

    term = rf.CurrentTerm
    isleader = rf.state == Leader
    return term, isleader
}

2、填充RequestVoteArgs和RequestVoteReply結(jié)構(gòu)。

type RequestVoteArgs struct {
    // Your data here (2A, 2B).
    Term         int // candidate's term
    CandidateID  int // candidate requesting vote
    LastLogIndex int // index of candidate's last log entry
    LastLogTerm  int // term of candidate's last log entry
}

type RequestVoteReply struct {
    // Your data here (2A).
    CurrentTerm int  // currentTerm, for candidate to update itself
    VoteGranted bool // true means candidate received vote
}

實現(xiàn)RPC方法RequestVote

  • 1、獲取當(dāng)前節(jié)點的log個數(shù),以及最后一個log的term 確定當(dāng)前節(jié)點的term。

  • 2、如果調(diào)用節(jié)點的term小于當(dāng)前節(jié)點,返回當(dāng)前term,并且不為其投票。

  • 3、如果調(diào)用節(jié)點的term大于當(dāng)前節(jié)點,修改當(dāng)前節(jié)點的term,當(dāng)前節(jié)點轉(zhuǎn)為follower.

  • 4、如果調(diào)用節(jié)點的term大于當(dāng)前節(jié)點,或者等于當(dāng)前節(jié)點term并且調(diào)用節(jié)點的log個數(shù)大于等于當(dāng)前節(jié)點的log,則為調(diào)用節(jié)點投票。

  • 5、投票后重置當(dāng)前節(jié)點的選舉超時時間。

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // Your code here (2A, 2B).
    select {
    case <-rf.shutdownCh:
        DPrintf("[%d-%s]: peer %d is shutting down, reject RV rpc request.\n", rf.me, rf, rf.me)
        return
    default:
    }

    rf.mu.Lock()
    defer rf.mu.Unlock()

    lastLogIdx, lastLogTerm := rf.lastLogIndexAndTerm()

    DPrintf("[%d-%s]: rpc RV, from peer: %d, arg term: %d, my term: %d (last log idx: %d->%d, term: %d->%d)\n", rf.me, rf, args.CandidateID, args.Term, rf.CurrentTerm, args.LastLogIndex,
        lastLogIdx, args.LastLogTerm, lastLogTerm)

    if args.Term < rf.CurrentTerm {
        reply.CurrentTerm = rf.CurrentTerm
        reply.VoteGranted = false
    } else {
        if args.Term > rf.CurrentTerm {
            // convert to follower
            rf.CurrentTerm = args.Term
            rf.state = Follower
            rf.VotedFor = -1
        }

        // if is null (follower) or itself is a candidate (or stale leader) with same term
        if rf.VotedFor == -1 { //|| (rf.VotedFor == rf.me && !sameTerm) { //|| rf.votedFor == args.CandidateID {
            // check whether candidate's log is at-least-as update
            if (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIdx) ||
                args.LastLogTerm > lastLogTerm {

                rf.resetTimer <- struct{}{}

                rf.state = Follower
                rf.VotedFor = args.CandidateID
                reply.VoteGranted = true

                DPrintf("[%d-%s]: peer %d vote to peer %d (last log idx: %d->%d, term: %d->%d)\n",
                    rf.me, rf, rf.me, args.CandidateID, args.LastLogIndex, lastLogIdx, args.LastLogTerm, lastLogTerm)
            }
        }
    }
}

修改make

除了一些基本的初始化過程,新開了一個goroutine。

func Make(peers []*labrpc.ClientEnd, me int,
    persister *Persister, applyCh chan ApplyMsg) *Raft {
    rf := &Raft{}
    rf.peers = peers
    rf.persister = persister
    rf.me = me
    rf.applyCh = applyCh
    // Your initialization code here (2A, 2B, 2C).
    rf.state = Follower
    rf.VotedFor = -1
    rf.Logs = make([]LogEntry, 1) // first index is 1
    rf.Logs[0] = LogEntry{        // placeholder
        Term:    0,
        Command: nil,
    }
    rf.nextIndex = make([]int, len(peers))
    rf.matchIndex = make([]int, len(peers))

    rf.electionTimeout = time.Millisecond * time.Duration(400+rand.Intn(100)*4)
    rf.electionTimer = time.NewTimer(rf.electionTimeout)
    rf.resetTimer = make(chan struct{})
    rf.shutdownCh = make(chan struct{})           // shutdown raft gracefully
    rf.commitCond = sync.NewCond(&rf.mu)          // commitCh, a distinct goroutine
    rf.heartbeatInterval = time.Millisecond * 40 // small enough, not too small

    // initialize from state persisted before a crash
    rf.readPersist(persister.ReadRaftState())
    go rf.electionDaemon() // kick off election
    return rf
}

選舉核心electionDaemon

  • 除了shutdown,還有兩個通道,一個是electionTimer,用于選舉超時。

  • 一個是resetTimer,用于重置選舉超時。

  • 注意time.reset是很難正確使用的。

  • 一旦選舉超時,調(diào)用go rf.canvassVotes()

// electionDaemon
func (rf *Raft) electionDaemon() {
    for {
        select {
        case <-rf.shutdownCh:
            DPrintf("[%d-%s]: peer %d is shutting down electionDaemon.\n", rf.me, rf, rf.me)
            return
        case <-rf.resetTimer:
            if !rf.electionTimer.Stop() {
                <-rf.electionTimer.C
            }
            rf.electionTimer.Reset(rf.electionTimeout)
        case <-rf.electionTimer.C:
            rf.mu.Lock()
            DPrintf("[%d-%s]: peer %d election timeout, issue election @ term %d\n", rf.me, rf, rf.me, rf.CurrentTerm)
            rf.mu.Unlock()
            go rf.canvassVotes()
            rf.electionTimer.Reset(rf.electionTimeout)
        }
    }
}

拉票

  • replyHandler是進(jìn)行請求返回后的處理。

  • 當(dāng)前節(jié)點為了成為leader,會調(diào)用每一個節(jié)點的RequestVote方法。

  • 如果返回過來的term大于當(dāng)前term,那么當(dāng)前節(jié)點變?yōu)閒ollower,重置選舉超時時間。

  • 否則,如果收到了超過一半節(jié)點的投票,那么其變?yōu)榱薼eader,并立即給其他節(jié)點發(fā)送心跳檢測。

// canvassVotes issues RequestVote RPC
func (rf *Raft) canvassVotes() {
    var voteArgs RequestVoteArgs
    rf.fillRequestVoteArgs(&voteArgs)
    peers := len(rf.peers)

    var votes = 1
    replyHandler := func(reply *RequestVoteReply) {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        if rf.state == Candidate {
            if reply.CurrentTerm > voteArgs.Term {
                rf.CurrentTerm = reply.CurrentTerm
                rf.turnToFollow()
                //rf.persist()
                rf.resetTimer <- struct{}{} // reset timer
                return
            }
            if reply.VoteGranted {
                if votes == peers/2 {
                    rf.state = Leader
                    rf.resetOnElection()    // reset leader state
                    go rf.heartbeatDaemon() // new leader, start heartbeat daemon
                    DPrintf("[%d-%s]: peer %d become new leader.\n", rf.me, rf, rf.me)
                    return
                }
                votes++
            }
        }
    }
    for i := 0; i < peers; i++ {
        if i != rf.me {
            go func(n int) {
                var reply RequestVoteReply
                if rf.sendRequestVote(n, &voteArgs, &reply) {
                    replyHandler(&reply)
                }
            }(i)
        }
    }
}

心跳檢測

  • 1、leader調(diào)用每一個節(jié)點的AppendEntries方法。

  • 2、如果當(dāng)前節(jié)點大于調(diào)用節(jié)點,那么AppendEntries失敗。否則,修改當(dāng)前的term為最大。

  • 3、如果當(dāng)前節(jié)點是leader,始終將其變?yōu)閒ollower(為了讓leader穩(wěn)定)

  • 4、將當(dāng)前節(jié)點投票給調(diào)用者(對于落后的節(jié)點)。

  • 5、重置當(dāng)前節(jié)點的超時時間。

func (rf *Raft) heartbeatDaemon() {
    for {
        if _, isLeader := rf.GetState(); !isLeader {
            return
        }
        // reset leader's election timer
        rf.resetTimer <- struct{}{}

        select {
        case <-rf.shutdownCh:
            return
        default:
            for i := 0; i < len(rf.peers); i++ {
                if i != rf.me {
                    go rf.consistencyCheck(i) // routine heartbeat
                }
            }
        }
        time.Sleep(rf.heartbeatInterval)
    }
}

func (rf *Raft) consistencyCheck(n int) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    pre := rf.nextIndex[n] - 1
    var args = AppendEntriesArgs{
        Term:         rf.CurrentTerm,
        LeaderID:     rf.me,
        PrevLogIndex: pre,
        PrevLogTerm:  rf.Logs[pre].Term,
        Entries:      nil,
        LeaderCommit: rf.commitIndex,
    }
    go func() {
        DPrintf("[%d-%s]: consistency Check to peer %d.\n", rf.me, rf, n)
        var reply AppendEntriesReply
        if rf.sendAppendEntries(n, &args, &reply) {
            rf.consistencyCheckReplyHandler(n, &reply)
        }
    }()
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    select {
    case <-rf.shutdownCh:
        DPrintf("[%d-%s]: peer %d is shutting down, reject AE rpc request.\n", rf.me, rf, rf.me)
        return
    default:
    }

    DPrintf("[%d-%s]: rpc AE, from peer: %d, term: %d\n", rf.me, rf, args.LeaderID, args.Term)
    rf.mu.Lock()
    defer rf.mu.Unlock()

    if args.Term < rf.CurrentTerm {
        //DPrintf("[%d-%s]: AE failed from leader %d. (heartbeat: leader's term < follower's term (%d < %d))\n",
        //  rf.me, rf, args.LeaderID, args.Term, rf.currentTerm)
        reply.CurrentTerm = rf.CurrentTerm
        reply.Success = false
        return
    }
    if rf.CurrentTerm < args.Term {
        rf.CurrentTerm = args.Term
    }

    // for stale leader
    if rf.state == Leader {
        rf.turnToFollow()
    }
    // for straggler (follower)
    if rf.VotedFor != args.LeaderID {
        rf.VotedFor = args.LeaderID
    }

    // valid AE, reset election timer
    // if the node recieve heartbeat. then it will reset the election timeout
    rf.resetTimer <- struct{}{}

    reply.Success = true
    reply.CurrentTerm = rf.CurrentTerm
    return
}

處理心跳檢測返回

如果心跳檢測失敗了,那么變?yōu)閒ollower,重置選舉超時。

// n: which follower
func (rf *Raft) consistencyCheckReplyHandler(n int, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    if rf.state != Leader {
        return
    }
    if reply.Success {

    } else {
        // found a new leader? turn to follower
        if rf.state == Leader && reply.CurrentTerm > rf.CurrentTerm {
            rf.turnToFollow()
            rf.resetTimer <- struct{}{}
            DPrintf("[%d-%s]: leader %d found new term (heartbeat resp from peer %d), turn to follower.",
                rf.me, rf, rf.me, n)
            return
        }
    }
}

看完上述內(nèi)容,你們掌握raft的運用方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!

標(biāo)題名稱:raft的實際運用
分享地址:http://aaarwkj.com/article2/gjdcic.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)建站、響應(yīng)式網(wǎng)站移動網(wǎng)站建設(shè)、網(wǎng)站設(shè)計公司、網(wǎng)站導(dǎo)航、搜索引擎優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設(shè)
一区二区高清免费日本| 久久久久久狠狠亚洲综合| 国产av剧情日韩精品| 色在色在线播放亚洲中文| 青青草原免费在线观看| 国产精品免费视频一区二区三区| 亚洲精品乱码国产妇女毛片| 日韩高清一级黄色大片网站| 日韩亚洲精品99综合观看| 日韩欧美中文字幕区| 人人人妻人人澡人人爽e| 亚洲欧美综合精品久久成人| 国产精品久久久久精品三级中文国| 久久久人妻精品少妇av| 日本高清一区二区不卡视频| 午夜福利福利一区二区| 国产日产亚洲欧美综合另类| 国产区一区二区三在线播放| 嫩草网站国产精品一区二 | 国产免费成人黄视频网站| 国产自偷一区二区三区| 天堂av在线免费观看| 午夜国产精品福利一二| 国产老熟女高潮视频| 国产精品国语对白av处女| 亚洲精品美女久久久久高潮| 欧美日韩国产一区二区的| 亚洲精品一级二级三级| 精品三级黄色国产片| 青青草手机在线视频免费观看| 内射久久一区二区亚洲| 高潮内射一区二区三区| av人妻熟女少妇蒂亚| av天堂午夜在线观看| 国产伦人偷精品免费视频| 久久精品国产亚洲av麻豆花絮| 97久久精品亚洲中文字幕| 伊人亚洲中文一区二区| 中文字幕一区侵犯人妻| 亚洲国内精品一区二区在线| 色婷婷一区二区三区网站|