創(chuàng)新互聯(lián)www.cdcxhl.cn八線動(dòng)態(tài)BGP香港云服務(wù)器提供商,新人活動(dòng)買(mǎi)多久送多久,劃算不套路!
目前創(chuàng)新互聯(lián)公司已為1000+的企業(yè)提供了網(wǎng)站建設(shè)、域名、虛擬空間、網(wǎng)站改版維護(hù)、企業(yè)網(wǎng)站設(shè)計(jì)、桐柏網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶(hù)導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶(hù)和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。本篇文章給大家分享的是有關(guān)如何使用Go基于WebSocket構(gòu)建視頻直播彈幕系統(tǒng),小編覺(jué)得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說(shuō),跟著小編一起來(lái)看看吧。
(1)業(yè)務(wù)復(fù)雜度介紹
開(kāi)門(mén)見(jiàn)山,假設(shè)一個(gè)直播間同時(shí)500W人在線,那么1秒鐘1000條彈幕,那么彈幕系統(tǒng)的推送頻率就是: 500W * 1000條/秒=50億條/秒
,想想B站2019跨年晚會(huì)那次彈幕系統(tǒng)得是多么的NB,況且一個(gè)大型網(wǎng)站不可能只有一個(gè)直播間!
使用Go做WebSocket開(kāi)發(fā)無(wú)非就是三種情況:
golang.org/x/net
,但是這個(gè)官方庫(kù)真是出了奇Bug多gorilla/websocket
庫(kù),可以結(jié)合到某些Web開(kāi)發(fā)框架,比如Gin、iris等,只要使用的框架式基于 golang.org/net
的,那么這個(gè)庫(kù)就可以與這個(gè)框架結(jié)合根據(jù)估算結(jié)果,彈幕推送量很大的時(shí)候,Linux內(nèi)核將會(huì)出現(xiàn)瓶頸,因?yàn)長(zhǎng)inux內(nèi)核發(fā)送TCP包的時(shí)候極限包發(fā)送頻率是100W。因此可以將同一秒內(nèi)的彈幕消息合并為1條推送,減少網(wǎng)絡(luò)小數(shù)據(jù)包的發(fā)送,從而降低推送頻率。
彈幕系統(tǒng)需要維護(hù)在線的用戶(hù)長(zhǎng)連接來(lái)實(shí)現(xiàn)定向推送到在線的用戶(hù),通常是使用Hash字典結(jié)構(gòu),通常推送消息就是遍歷在線用的Hash字典。在彈幕推送期間用戶(hù)在不斷的上下線,為了維護(hù)上線用戶(hù),那么就得不斷的修改Hash字典,不斷地進(jìn)行鎖操作,用戶(hù)量過(guò)大導(dǎo)致鎖瓶頸。因此可以將整個(gè)Hash結(jié)構(gòu)拆分為多個(gè)Hash結(jié)構(gòu),分別對(duì)多個(gè)Hash結(jié)構(gòu)加不同的鎖,并且使用讀寫(xiě)鎖替代互斥鎖。
通常服務(wù)器與客戶(hù)端交互使用JSON結(jié)構(gòu),那么需要不斷的編碼解碼JSON數(shù)據(jù),這將會(huì)導(dǎo)致CPU瓶頸。將消息先進(jìn)行合并,然后進(jìn)行編碼,最后輪詢(xún)Hash結(jié)構(gòu)進(jìn)行推送。
以上是單體架構(gòu)存在的問(wèn)題,為了支持更多的用戶(hù)負(fù)載,通常彈幕系統(tǒng)采用分布式架構(gòu),進(jìn)行彈性擴(kuò)容縮容。
(2)推送還是拉???
如果是客戶(hù)端拉取服務(wù)器端數(shù)據(jù),那么將會(huì)存在以下幾個(gè)問(wèn)題:
因此我們考慮推送模式:當(dāng)數(shù)據(jù)發(fā)生更新的時(shí)候服務(wù)器端主動(dòng)推送到客戶(hù)端,這樣可以有效減少客戶(hù)端的請(qǐng)求次數(shù)。如果需要實(shí)現(xiàn)消息推送,那么就意味著服務(wù)器端維護(hù)大量的長(zhǎng)連接。
(3)為什么使用WebSocket?
實(shí)現(xiàn)彈幕消息的實(shí)時(shí)更新一定是使用Socket的方式,那么為啥要使用WebSocket呢?現(xiàn)在大部分直播應(yīng)用的開(kāi)發(fā)都是跨平臺(tái)的,然而跨平臺(tái)的開(kāi)發(fā)框架本質(zhì)就是Web開(kāi)發(fā),那么一定離不開(kāi)WebSocket,而且一部分用戶(hù)會(huì)選擇在Web端看視頻,比如Bilibili,現(xiàn)如今也有一些桌面應(yīng)用是用Electron等跨平臺(tái)框架開(kāi)發(fā)的,比如Lark飛書(shū)等,因此實(shí)現(xiàn)消息推送的最佳方案就是使用WebSocket。
使用WebSocket可以輕松的維持服務(wù)器端長(zhǎng)連接,其次WebSocket是架構(gòu)在HTTP協(xié)議之上的,并且也可以使用HTTPS方式,因此WebSocket是可靠傳輸,并且不需要開(kāi)發(fā)者關(guān)注底層細(xì)節(jié)。
為啥要使用Go搞WebSocket呢?首先說(shuō)到WebSocket你可能會(huì)想到Node.js,但是Node.js是單線程模型,如果實(shí)現(xiàn)高并發(fā),不得不創(chuàng)建多個(gè)Node.js進(jìn)程,但是這又不容易服務(wù)端遍歷整個(gè)連接集合;如果使用Java就會(huì)顯得比較笨重,Java項(xiàng)目的部署,編寫(xiě)Dockerfile都不如Go的目標(biāo)二進(jìn)制更加簡(jiǎn)潔,并且Go協(xié)程很容易實(shí)現(xiàn)高并發(fā),上一章說(shuō)到Go語(yǔ)言目前也有成熟的WebSocket輪子。
(4)服務(wù)端基本Demo
首先搭建好一個(gè)框架:
package main import ( "fmt" "net/http" ) func main() { fmt.Println("Listen localhost:8080") // 注冊(cè)一個(gè)用于WebSocket的路由,實(shí)際業(yè)務(wù)中不可能只有一個(gè)路由 http.HandleFunc("/messages", messageHandler) // 監(jiān)聽(tīng)8080端口,沒(méi)有實(shí)現(xiàn)服務(wù)異常處理器,因此第二個(gè)參數(shù)是nil http.ListenAndServe("localhost:8080", nil) } func messageHandler(response http.ResponseWriter, request *http.Request) { // TODO: 實(shí)現(xiàn)消息處理 response.Write([]byte("HelloWorld")) }
然后完善messageHandler函數(shù):
func messageHandler(response http.ResponseWriter, request *http.Request) { var upgrader = websocket.Upgrader{ // 允許跨域 CheckOrigin: func(resquest *http.Request) bool { return true }, } // 建立連接 conn, err := upgrader.Upgrade(response, request, nil) if err != nil { return } // 收發(fā)消息 for { // 讀取消息 _, bytes, err := conn.ReadMessage() if err != nil { _ = conn.Close() } // 寫(xiě)入消息 err = conn.WriteMessage(websocket.TextMessage, bytes) if err != nil { _ = conn.Close() } } }
現(xiàn)在基本上實(shí)現(xiàn)了WebSocket功能,但是websocket的原生API不是線程安全的(Close方法是線程安全的,并且是可重入的),并且其他模塊無(wú)法復(fù)用業(yè)務(wù)邏輯,因此進(jìn)行封裝:
// main.go package main import ( "bluemiaomiao.github.io/websocket-go/service" "fmt" "net/http" "github.com/gorilla/websocket" ) func main() { fmt.Println("Listen localhost:8080") http.HandleFunc("/messages", messageHandler) _ = http.ListenAndServe("localhost:8080", nil) } func messageHandler(response http.ResponseWriter, request *http.Request) { var upgrader = websocket.Upgrader{ // 允許跨域 CheckOrigin: func(resquest *http.Request) bool { return true }, } // 建立連接 conn, err := upgrader.Upgrade(response, request, nil) wsConn, err := service.Create(conn) if err != nil { return } // 收發(fā)消息 for { // 讀取消息 msg, err := wsConn.ReadOne() if err != nil { wsConn.Close() } // 寫(xiě)入消息 err = wsConn.WriteOne(msg) if err != nil { _ = conn.Close() } } }
// service/messsage_service.go package service import ( "errors" "github.com/gorilla/websocket" "sync" ) // 封裝的連接對(duì)象 // // 由于websocket的Close()方法是可重入的,所以可以多次調(diào)用,但是關(guān)閉Channel的close() // 方法不是可重入的,因此通過(guò)isClosed進(jìn)行判斷 // isClosed可能發(fā)生資源競(jìng)爭(zhēng),因此通過(guò)互斥鎖避免 // 關(guān)閉websocket連接后,也要自動(dòng)關(guān)閉輸入輸出消息流,因此通過(guò)signalCloseLoopChan實(shí)現(xiàn) type Connection struct { conn *websocket.Conn // 具體的連接對(duì)象 inputStream chan []byte // 輸入流,使用Channel模擬 outputStream chan []byte // 輸出流,使用chaneel模擬 signalCloseLoopChan chan byte // 關(guān)閉信號(hào) isClosed bool // 是否調(diào)用過(guò)close()方法 lock sync.Mutex // 簡(jiǎn)單的鎖 } // 用于初始化一個(gè)連接對(duì)象 func Create(conn *websocket.Conn) (connection *Connection, err error) { connection = &Connection{ conn: conn, inputStream: make(chan []byte, 1000), outputStream: make(chan []byte, 1000), signalCloseLoopChan: make(chan byte, 1), isClosed: false, } // 啟動(dòng)讀寫(xiě)循環(huán) go connection.readLoop() go connection.writeLoop() return } // 讀取一條消息 func (c *Connection) ReadOne() (msg []byte, err error) { select { case msg = <-(*c).inputStream: case <-(*c).signalCloseLoopChan: err = errors.New("connection is closed") } return } // 寫(xiě)入一條消息 func (c *Connection) WriteOne(msg []byte) (err error) { select { case (*c).outputStream <- msg: case <-(*c).signalCloseLoopChan: err = errors.New("connection is closed") } return } // 關(guān)閉連接對(duì)象 func (c *Connection) Close() { _ = (*c).conn.Close() (*c).lock.Lock() if !(*c).isClosed { close((*c).signalCloseLoopChan) } (*c).lock.Unlock() } // 讀取循環(huán) func (c *Connection) readLoop() { // 不停的讀取長(zhǎng)連接中的消息,只要存在消息就將其放到隊(duì)列中 for { _, bytes, err := (*c).conn.ReadMessage() if err != nil { (*c).Close() } select { case <-(*c).signalCloseLoopChan: (*c).Close() case (*c).inputStream <- bytes: } } } // 寫(xiě)入循環(huán) func (c *Connection) writeLoop() { // 只要隊(duì)列中存在消息,就將其寫(xiě)入 var data []byte for { select { case data = <-(*c).outputStream: case <-(*c).signalCloseLoopChan: (*c).Close() } err := (*c).conn.WriteMessage(websocket.TextMessage, data) if err != nil { _ = (*c).conn.Close() } } }
至此,你已經(jīng)學(xué)會(huì)了如何使用Go構(gòu)建WebSocket服務(wù)。
以上就是如何使用Go基于WebSocket構(gòu)建視頻直播彈幕系統(tǒng),小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見(jiàn)到或用到的。希望你能通過(guò)這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司行業(yè)資訊頻道。
文章標(biāo)題:如何使用Go基于WebSocket構(gòu)建視頻直播彈幕系統(tǒng)-創(chuàng)新互聯(lián)
標(biāo)題網(wǎng)址:http://aaarwkj.com/article42/dddjec.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、營(yíng)銷(xiāo)型網(wǎng)站建設(shè)、響應(yīng)式網(wǎng)站、關(guān)鍵詞優(yōu)化、定制網(wǎng)站、網(wǎng)站改版
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容