這篇文章將為大家詳細(xì)講解有關(guān)如何理解分布式事務(wù)框架seata-golang通信模型,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
創(chuàng)新互聯(lián)長期為成百上千家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為本溪企業(yè)提供專業(yè)的網(wǎng)站設(shè)計(jì)、成都做網(wǎng)站,本溪網(wǎng)站改版等技術(shù)服務(wù)。擁有10多年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。
Java 的世界里,大家廣泛使用的一個(gè)高性能網(wǎng)絡(luò)通信框架 netty,很多 RPC 框架都是基于 netty 來實(shí)現(xiàn)的。在 golang 的世界里,getty 也是一個(gè)類似 netty 的高性能網(wǎng)絡(luò)通信庫。getty 最初由 dubbogo 項(xiàng)目負(fù)責(zé)人于雨開發(fā),作為底層通信庫在 dubbo-go 中使用。隨著 dubbo-go 捐獻(xiàn)給 apache 基金會(huì),在社區(qū)小伙伴的共同努力下,getty 也最終進(jìn)入到 apache 這個(gè)大家庭,并改名 dubbo-getty 。
getty 框架的整體模型圖如下:
下面結(jié)合相關(guān)代碼,詳述 seata-golang 的 RPC 通信過程。
實(shí)現(xiàn) RPC 通信,首先要建立網(wǎng)絡(luò)連接吧,我們從 client.go 開始看起。
func (c *client) connect() { var ( err error ss Session ) for { // 建立一個(gè) session 連接 ss = c.dial() if ss == nil { // client has been closed break } err = c.newSession(ss) if err == nil { // 收發(fā)報(bào)文 ss.(*session).run() // 此處省略部分代碼 break } // don't distinguish between tcp connection and websocket connection. Because // gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close() ss.Conn().Close() } }
connect()
方法通過 dial()
方法得到了一個(gè) session 連接,進(jìn)入 dial() 方法:
func (c *client) dial() Session { switch c.endPointType { case TCP_CLIENT: return c.dialTCP() case UDP_CLIENT: return c.dialUDP() case WS_CLIENT: return c.dialWS() case WSS_CLIENT: return c.dialWSS() } return nil }
我們關(guān)注的是 TCP 連接,所以繼續(xù)進(jìn)入 c.dialTCP()
方法:
func (c *client) dialTCP() Session { var ( err error conn net.Conn ) for { if c.IsClosed() { return nil } if c.sslEnabled { if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil { d := &net.Dialer{Timeout: connectTimeout} // 建立加密連接 conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig) } } else { // 建立 tcp 連接 conn, err = net.DialTimeout("tcp", c.addr, connectTimeout) } if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) { conn.Close() err = errSelfConnect } if err == nil { // 返回一個(gè) TCPSession return newTCPSession(conn, c) } log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err)) <-wheel.After(connectInterval) } }
至此,我們知道了 getty 如何建立 TCP 連接,并返回 TCPSession。
那它是怎么收發(fā)報(bào)文的呢,我們回到 connection 方法接著往下看,有這樣一行 ss.(*session).run()
,在這行代碼之后代碼都是很簡單的操作,我們猜測(cè)這行代碼運(yùn)行的邏輯里面一定包含收發(fā)報(bào)文的邏輯,接著進(jìn)入 run()
方法:
func (s *session) run() { // 省略部分代碼 go s.handleLoop() go s.handlePackage() }
<br />這里起了兩個(gè) goroutine,handleLoop
和 handlePackage
,看字面意思符合我們的猜想,進(jìn)入 handleLoop()
方法:<br />
func (s *session) handleLoop() { // 省略部分代碼 for { // A select blocks until one of its cases is ready to run. // It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready. select { // 省略部分代碼 case outPkg, ok = <-s.wQ: // 省略部分代碼 iovec = iovec[:0] for idx := 0; idx < maxIovecNum; idx++ { // 通過 s.writer 將 interface{} 類型的 outPkg 編碼成二進(jìn)制的比特 pkgBytes, err = s.writer.Write(s, outPkg) // 省略部分代碼 iovec = append(iovec, pkgBytes) //省略部分代碼 } // 將這些二進(jìn)制比特發(fā)送出去 err = s.WriteBytesArray(iovec[:]...) if err != nil { log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v", s.sessionToken(), len(iovec), perrors.WithStack(err)) s.stop() // break LOOP flag = false } case <-wheel.After(s.period): if flag { if wsFlag { err := wsConn.writePing() if err != nil { log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err)) } } // 定時(shí)執(zhí)行的邏輯,心跳等 s.listener.OnCron(s) } } } }
通過上面的代碼,我們不難發(fā)現(xiàn),handleLoop()
方法處理的是發(fā)送報(bào)文的邏輯,RPC 需要發(fā)送的消息首先由 s.writer
編碼成二進(jìn)制比特,然后通過建立的 TCP 連接發(fā)送出去。這個(gè) s.writer
對(duì)應(yīng)的 Writer 接口是 RPC 框架必須要實(shí)現(xiàn)的一個(gè)接口。
繼續(xù)看 handlePackage()
方法:
func (s *session) handlePackage() { // 省略部分代碼 if _, ok := s.Connection.(*gettyTCPConn); ok { if s.reader == nil { errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader) log.Error(errStr) panic(errStr) } err = s.handleTCPPackage() } else if _, ok := s.Connection.(*gettyWSConn); ok { err = s.handleWSPackage() } else if _, ok := s.Connection.(*gettyUDPConn); ok { err = s.handleUDPPackage() } else { panic(fmt.Sprintf("unknown type session{%#v}", s)) } }
進(jìn)入 handleTCPPackage()
方法:
func (s *session) handleTCPPackage() error { // 省略部分代碼 conn = s.Connection.(*gettyTCPConn) for { // 省略部分代碼 bufLen = 0 for { // for clause for the network timeout condition check // s.conn.SetReadTimeout(time.Now().Add(s.rTimeout)) // 從 TCP 連接中收到報(bào)文 bufLen, err = conn.recv(buf) // 省略部分代碼 break } // 省略部分代碼 // 將收到的報(bào)文二進(jìn)制比特寫入 pkgBuf pktBuf.Write(buf[:bufLen]) for { if pktBuf.Len() <= 0 { break } // 通過 s.reader 將收到的報(bào)文解碼成 RPC 消息 pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes()) // 省略部分代碼 s.UpdateActive() // 將收到的消息放入 TaskQueue 供 RPC 消費(fèi)端消費(fèi) s.addTask(pkg) pktBuf.Next(pkgLen) // continue to handle case 5 } if exit { break } } return perrors.WithStack(err) }
從上面的代碼邏輯我們分析出,RPC 消費(fèi)端需要將從 TCP 連接收到的二進(jìn)制比特報(bào)文解碼成 RPC 能消費(fèi)的消息,這個(gè)工作由 s.reader 實(shí)現(xiàn),所以,我們要構(gòu)建 RPC 通信層也需要實(shí)現(xiàn) s.reader 對(duì)應(yīng)的 Reader 接口。
我們都知道,netty 通過 boss 線程和 worker 線程實(shí)現(xiàn)了底層網(wǎng)絡(luò)邏輯和業(yè)務(wù)邏輯的解耦。那么,getty 是如何實(shí)現(xiàn)的呢?
在 handlePackage()
方法最后,我們看到,收到的消息被放入了 s.addTask(pkg)
這個(gè)方法,接著往下分析:
func (s *session) addTask(pkg interface{}) { f := func() { s.listener.OnMessage(s, pkg) s.incReadPkgNum() } if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil { taskPool.AddTaskAlways(f) return } f() }
pkg
參數(shù)傳遞到了一個(gè)匿名方法,這個(gè)方法最終放入了 taskPool
。這個(gè)方法很關(guān)鍵,在我后來寫 seata-golang 代碼的時(shí)候,就遇到了一個(gè)坑,這個(gè)坑后面分析。
接著我們看一下 taskPool 的定義:
// NewTaskPoolSimple build a simple task pool func NewTaskPoolSimple(size int) GenericTaskPool { if size < 1 { size = runtime.NumCPU() * 100 } return &taskPoolSimple{ work: make(chan task), sem: make(chan struct{}, size), done: make(chan struct{}), } }
構(gòu)建了一個(gè)緩沖大小為 size (默認(rèn)為 runtime.NumCPU() * 100
) 的 channel sem
。再看方法 AddTaskAlways(t task)
:
func (p *taskPoolSimple) AddTaskAlways(t task) { select { case <-p.done: return default: } select { case p.work <- t: return default: } select { case p.work <- t: case p.sem <- struct{}{}: p.wg.Add(1) go p.worker(t) default: goSafely(t) } }
加入的任務(wù),會(huì)先由 len(p.sem) 個(gè) goroutine 去消費(fèi),如果沒有 goroutine 空閑,則會(huì)啟動(dòng)一個(gè)臨時(shí)的 goroutine 去運(yùn)行 t()。相當(dāng)于有 len(p.sem) 個(gè) goroutine 組成了 goroutine pool,pool 中的 goroutine 去處理業(yè)務(wù)邏輯,而不是由處理網(wǎng)絡(luò)報(bào)文的 goroutine 去運(yùn)行業(yè)務(wù)邏輯,從而實(shí)現(xiàn)了解耦。寫 seata-golang 時(shí)遇到的一個(gè)坑,就是忘記設(shè)置 taskPool 造成了處理業(yè)務(wù)邏輯和處理底層網(wǎng)絡(luò)報(bào)文邏輯的 goroutine 是同一個(gè),我在業(yè)務(wù)邏輯中阻塞等待一個(gè)任務(wù)完成時(shí),阻塞了整個(gè) goroutine,使得阻塞期間收不到任何報(bào)文。
下面的代碼見 getty.go:
// Reader is used to unmarshal a complete pkg from buffer type Reader interface { Read(Session, []byte) (interface{}, int, error) } // Writer is used to marshal pkg and write to session type Writer interface { // if @Session is udpGettySession, the second parameter is UDPContext. Write(Session, interface{}) ([]byte, error) } // ReadWriter interface use for handle application packages type ReadWriter interface { Reader Writer }
// EventListener is used to process pkg that received from remote session type EventListener interface { // invoked when session opened // If the return error is not nil, @Session will be closed. OnOpen(Session) error // invoked when session closed. OnClose(Session) // invoked when got error. OnError(Session, error) // invoked periodically, its period can be set by (Session)SetCronPeriod OnCron(Session) // invoked when getty received a package. Pls attention that do not handle long time // logic processing in this func. You'd better set the package's maximum length. // If the message's length is greater than it, u should should return err in // Reader{Read} and getty will close this connection soon. // // If ur logic processing in this func will take a long time, u should start a goroutine // pool(like working thread pool in cpp) to handle the processing asynchronously. Or u // can do the logic processing in other asynchronous way. // !!!In short, ur OnMessage callback func should return asap. // // If this is a udp event listener, the second parameter type is UDPContext. OnMessage(Session, interface{}) }
通過對(duì)整個(gè) getty 代碼的分析,我們只要實(shí)現(xiàn) ReadWriter
來對(duì) RPC 消息編解碼,再實(shí)現(xiàn) EventListener
來處理 RPC 消息的對(duì)應(yīng)的具體邏輯,將 ReadWriter
實(shí)現(xiàn)和 EventLister
實(shí)現(xiàn)注入到 RPC 的 Client 和 Server 端,則可實(shí)現(xiàn) RPC 通信。
下面是 seata 協(xié)議的定義:
在 ReadWriter 接口的實(shí)現(xiàn) RpcPackageHandler
中,調(diào)用 Codec 方法對(duì)消息體按照上面的格式編解碼:
// 消息編碼為二進(jìn)制比特 func MessageEncoder(codecType byte, in interface{}) []byte { switch codecType { case SEATA: return SeataEncoder(in) default: log.Errorf("not support codecType, %s", codecType) return nil } } // 二進(jìn)制比特解碼為消息體 func MessageDecoder(codecType byte, in []byte) (interface{}, int) { switch codecType { case SEATA: return SeataDecoder(in) default: log.Errorf("not support codecType, %s", codecType) return nil, 0 } }
再來看 client 端 EventListener
的實(shí)現(xiàn) RpcRemotingClient
:
func (client *RpcRemoteClient) OnOpen(session getty.Session) error { go func() request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{ ApplicationId: client.conf.ApplicationId, TransactionServiceGroup: client.conf.TransactionServiceGroup, }} // 建立連接后向 Transaction Coordinator 發(fā)起注冊(cè) TransactionManager 的請(qǐng)求 _, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT) if err == nil { // 將與 Transaction Coordinator 建立的連接保存在連接池供后續(xù)使用 clientSessionManager.RegisterGettySession(session) client.GettySessionOnOpenChannel <- session.RemoteAddr() } }() return nil } // OnError ... func (client *RpcRemoteClient) OnError(session getty.Session, err error) { clientSessionManager.ReleaseGettySession(session) } // OnClose ... func (client *RpcRemoteClient) OnClose(session getty.Session) { clientSessionManager.ReleaseGettySession(session) } // OnMessage ... func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) { log.Info("received message:{%v}", pkg) rpcMessage, ok := pkg.(protocal.RpcMessage) if ok { heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage) if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong { log.Debugf("received PONG from %s", session.RemoteAddr()) } } if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST || rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY { log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body) // 處理事務(wù)消息,提交 or 回滾 client.onMessage(rpcMessage, session.RemoteAddr()) } else { resp, loaded := client.futures.Load(rpcMessage.Id) if loaded { response := resp.(*getty2.MessageFuture) response.Response = rpcMessage.Body response.Done <- true client.futures.Delete(rpcMessage.Id) } } } // OnCron ... func (client *RpcRemoteClient) OnCron(session getty.Session) { // 發(fā)送心跳 client.defaultSendRequest(session, protocal.HeartBeatMessagePing) }
clientSessionManager.RegisterGettySession(session)
的邏輯將在下文中分析。
代碼見 DefaultCoordinator
:
func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error { log.Infof("got getty_session:%s", session.Stat()) return nil } func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) { // 釋放 TCP 連接 SessionManager.ReleaseGettySession(session) session.Close() log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err) } func (coordinator *DefaultCoordinator) OnClose(session getty.Session) { log.Info("getty_session{%s} is closing......", session.Stat()) } func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) { log.Debugf("received message:{%v}", pkg) rpcMessage, ok := pkg.(protocal.RpcMessage) if ok { _, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest) if isRegTM { // 將 TransactionManager 信息和 TCP 連接建立映射關(guān)系 coordinator.OnRegTmMessage(rpcMessage, session) return } heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage) if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing { coordinator.OnCheckMessage(rpcMessage, session) return } if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST || rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY { log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body) _, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest) if isRegRM { // 將 ResourceManager 信息和 TCP 連接建立映射關(guān)系 coordinator.OnRegRmMessage(rpcMessage, session) } else { if SessionManager.IsRegistered(session) { defer func() { if err := recover(); err != nil { log.Errorf("Catch Exception while do RPC, request: %v,err: %w", rpcMessage, err) } }() // 處理事務(wù)消息,全局事務(wù)注冊(cè)、分支事務(wù)注冊(cè)、分支事務(wù)提交、全局事務(wù)回滾等 coordinator.OnTrxMessage(rpcMessage, session) } else { session.Close() log.Infof("close a unhandled connection! [%v]", session) } } } else { resp, loaded := coordinator.futures.Load(rpcMessage.Id) if loaded { response := resp.(*getty2.MessageFuture) response.Response = rpcMessage.Body response.Done <- true coordinator.futures.Delete(rpcMessage.Id) } } } } func (coordinator *DefaultCoordinator) OnCron(session getty.Session) { }
coordinator.OnRegTmMessage(rpcMessage, session)
注冊(cè) Transaction Manager,coordinator.OnRegRmMessage(rpcMessage, session)
注冊(cè) Resource Manager。具體邏輯分析見下文。
消息進(jìn)入 coordinator.OnTrxMessage(rpcMessage, session)
方法,將按照消息的類型碼路由到具體的邏輯當(dāng)中:
switch msg.GetTypeCode() { case protocal.TypeGlobalBegin: req := msg.(protocal.GlobalBeginRequest) resp := coordinator.doGlobalBegin(req, ctx) return resp case protocal.TypeGlobalStatus: req := msg.(protocal.GlobalStatusRequest) resp := coordinator.doGlobalStatus(req, ctx) return resp case protocal.TypeGlobalReport: req := msg.(protocal.GlobalReportRequest) resp := coordinator.doGlobalReport(req, ctx) return resp case protocal.TypeGlobalCommit: req := msg.(protocal.GlobalCommitRequest) resp := coordinator.doGlobalCommit(req, ctx) return resp case protocal.TypeGlobalRollback: req := msg.(protocal.GlobalRollbackRequest) resp := coordinator.doGlobalRollback(req, ctx) return resp case protocal.TypeBranchRegister: req := msg.(protocal.BranchRegisterRequest) resp := coordinator.doBranchRegister(req, ctx) return resp case protocal.TypeBranchStatusReport: req := msg.(protocal.BranchReportRequest) resp := coordinator.doBranchReport(req, ctx) return resp default: return nil }
Client 端同 Transaction Coordinator 建立連接起連接后,通過 clientSessionManager.RegisterGettySession(session)
將連接保存在 serverSessions = sync.Map{}
這個(gè) map 中。map 的 key 為從 session 中獲取的 RemoteAddress 即 Transaction Coordinator 的地址,value 為 session。這樣,Client 端就可以通過 map 中的一個(gè) session 來向 Transaction Coordinator 注冊(cè) Transaction Manager 和 Resource Manager 了。具體代碼見 getty_client_session_manager.go
。
Transaction Manager 和 Resource Manager 注冊(cè)到 Transaction Coordinator 后,一個(gè)連接既有可能用來發(fā)送 TM 消息也有可能用來發(fā)送 RM 消息。我們通過 RpcContext 來標(biāo)識(shí)一個(gè)連接信息:
type RpcContext struct { Version string TransactionServiceGroup string ClientRole meta.TransactionRole ApplicationId string ClientId string ResourceSets *model.Set Session getty.Session }
當(dāng)收到事務(wù)消息時(shí),我們需要構(gòu)造這樣一個(gè) RpcContext 供后續(xù)事務(wù)處理邏輯使用。所以,我們會(huì)構(gòu)造下列 map 來緩存映射關(guān)系:
var ( // session -> transactionRole // TM will register before RM, if a session is not the TM registered, // it will be the RM registered session_transactionroles = sync.Map{} // session -> applicationId identified_sessions = sync.Map{} // applicationId -> ip -> port -> session client_sessions = sync.Map{} // applicationId -> resourceIds client_resources = sync.Map{} )
這樣,Transaction Manager 和 Resource Manager 分別通過 coordinator.OnRegTmMessage(rpcMessage, session)
和 coordinator.OnRegRmMessage(rpcMessage, session)
注冊(cè)到 Transaction Coordinator 時(shí),會(huì)在上述 client_sessions map 中緩存 applicationId、ip、port 與 session 的關(guān)系,在 client_resources map 中緩存 applicationId 與 resourceIds(一個(gè)應(yīng)用可能存在多個(gè) Resource Manager) 的關(guān)系。在需要時(shí),我們就可以通過上述映射關(guān)系構(gòu)造一個(gè) RpcContext。這部分的實(shí)現(xiàn)和 java 版 seata 有很大的不同,感興趣的可以深入了解一下。具體代碼見 getty_session_manager.go
。
至此,我們就分析完了 seata-golang 整個(gè) RPC 通信模型的機(jī)制。
seata-golang 從今年 4 月份開始開發(fā),到 8 月份基本實(shí)現(xiàn)和 java 版 seata 1.2 協(xié)議的互通,對(duì) MySQL 數(shù)據(jù)庫實(shí)現(xiàn)了 AT 模式(自動(dòng)協(xié)調(diào)分布式事務(wù)的提交回滾),實(shí)現(xiàn)了 TCC 模式,TC 端使用 mysql 存儲(chǔ)數(shù)據(jù),使 TC 變成一個(gè)無狀態(tài)應(yīng)用支持高可用部署。下圖展示了 AT 模式的原理:
關(guān)于如何理解分布式事務(wù)框架seata-golang通信模型就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
分享名稱:如何理解分布式事務(wù)框架seata-golang通信模型
文章轉(zhuǎn)載:http://aaarwkj.com/article40/gdihho.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供小程序開發(fā)、自適應(yīng)網(wǎng)站、網(wǎng)站設(shè)計(jì)公司、搜索引擎優(yōu)化、全網(wǎng)營銷推廣、手機(jī)網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)