有狀態(tài)的計(jì)算作為容錯(cuò)以及數(shù)據(jù)一致性的保證,是當(dāng)今實(shí)時(shí)計(jì)算必不可少的特性之一,流行的實(shí)時(shí)計(jì)算引擎包括 Google Dataflow、Flink、Spark (Structure) Streaming、Kafka Streams 都分別提供對(duì)內(nèi)置 State 的支持。State 的引入使得實(shí)時(shí)應(yīng)用可以不依賴外部數(shù)據(jù)庫(kù)來(lái)存儲(chǔ)元數(shù)據(jù)及中間數(shù)據(jù),部分情況下甚至可以直接用 State 存儲(chǔ)結(jié)果數(shù)據(jù),這讓業(yè)界不禁思考: State 和 Database 是何種關(guān)系?有沒(méi)有可能用 State 來(lái)代替數(shù)據(jù)庫(kù)呢?
在這個(gè)課題上,F(xiàn)link 社區(qū)是比較早就開(kāi)始探索的??傮w來(lái)說(shuō),F(xiàn)link 社區(qū)的努力可以分為兩條線: 一是在作業(yè)運(yùn)行時(shí)通過(guò)作業(yè)查詢接口訪問(wèn) State 的能力,即 QueryableState;二是通過(guò) State 的離線 dump 文件(Savepoint)來(lái)離線查詢和修改 State 的能力,即即將引入的 Savepoint Processor API。
QueryableState
在 2017 年發(fā)布的 Flink 1.2 版本,F(xiàn)link 引入了 QueryableState 的特性以允許用戶通過(guò)特定的 client 查詢作業(yè) State 的內(nèi)容 [1],這意味著 Flink 應(yīng)用可以在完全不依賴 State 存儲(chǔ)介質(zhì)以外的外部存儲(chǔ)的情況下提供實(shí)時(shí)訪問(wèn)計(jì)算結(jié)果的能力。
只通過(guò) Queryable State 提供實(shí)時(shí)數(shù)據(jù)訪問(wèn)
然而,QueryableState 雖然設(shè)想上比較理想化,但由于依賴底層架構(gòu)的改動(dòng)較多且功能也比較受限,它一直處于 Beta 版本并不能用于生產(chǎn)環(huán)境。針對(duì)這個(gè)問(wèn)題,在前段時(shí)間騰訊的工程師楊華提出 QueryableState 的改進(jìn)計(jì)劃 [2]。在郵件列表中,社區(qū)就 QueryableState 是否可以用于代替數(shù)據(jù)庫(kù)作了討論并出現(xiàn)了不同的觀點(diǎn)。筆者結(jié)合個(gè)人見(jiàn)解將 State as Database 的主要優(yōu)缺點(diǎn)整理如下。
更低的數(shù)據(jù)延遲。一般情況下 Flink 應(yīng)用的計(jì)算結(jié)果需要同步到外部的數(shù)據(jù)庫(kù),比如定時(shí)觸發(fā)輸出窗口計(jì)算結(jié)果,而這種同步通常是定時(shí)的會(huì)帶來(lái)一定的延遲,導(dǎo)致計(jì)算是實(shí)時(shí)的而查詢卻不是實(shí)時(shí)的尷尬局面,而直接 State 則可以避免這個(gè)問(wèn)題。
更強(qiáng)的數(shù)據(jù)一致性保證。根據(jù)外部存儲(chǔ)的特性不同,F(xiàn)link Connector 或者自定義的 SinkFunction 提供的一致性保障也有所差別。比如對(duì)于不支持多行事務(wù)的 HBase,F(xiàn)link 只能通過(guò)業(yè)務(wù)邏輯的冪等性來(lái)保障 Exactly-Once 投遞。相比之下 State 則有妥妥的 Exactly-Once 投遞保證。
節(jié)省資源。因?yàn)闇p少了同步數(shù)據(jù)到外部存儲(chǔ)的需要,我們可以節(jié)省序列化和網(wǎng)絡(luò)傳輸?shù)某杀?,另外?dāng)然還可以節(jié)省數(shù)據(jù)庫(kù)成本。
SLA 保障不足。數(shù)據(jù)庫(kù)技術(shù)已經(jīng)非常成熟,在可用性、容錯(cuò)性和運(yùn)維上都很多的積累,在這點(diǎn)上 State 還相當(dāng)于是處于原始人時(shí)期。另外從定位上來(lái)看,F(xiàn)link 作業(yè)有版本迭代維護(hù)或者遇到錯(cuò)誤自動(dòng)重啟帶來(lái)的 down time,并不能達(dá)到數(shù)據(jù)庫(kù)在數(shù)據(jù)訪問(wèn)上的高可用性。
可能導(dǎo)致作業(yè)的不穩(wěn)定。未經(jīng)過(guò)考慮的 Ad-hoc Query 可能會(huì)要求掃描并返回夸張量級(jí)的數(shù)據(jù),這會(huì)系統(tǒng)帶來(lái)很大的負(fù)荷,很可能影響作業(yè)的正常執(zhí)行。即使是合理的 Query,在并發(fā)數(shù)較多的情況下也可能影響作業(yè)的執(zhí)行效率。
存儲(chǔ)數(shù)據(jù)量不能太大。State 運(yùn)行時(shí)主要存儲(chǔ)在 TaskManager 本地內(nèi)存和磁盤,State 過(guò)大會(huì)造成 TaskManager OOM 或者磁盤空間不足。另外 State 大意味著 checkpoint 大,導(dǎo)致 checkpoint 可能會(huì)超時(shí)并顯著延長(zhǎng)作業(yè)恢復(fù)時(shí)長(zhǎng)。
只支持最基礎(chǔ)的查詢。State 只能進(jìn)行最簡(jiǎn)單的數(shù)據(jù)結(jié)構(gòu)查詢,不能像關(guān)系型數(shù)據(jù)庫(kù)一樣提供函數(shù)等計(jì)算能力,也不支持謂詞下推等優(yōu)化技術(shù)。
只可以讀取,不能修改。State 在運(yùn)行時(shí)只可以被作業(yè)本身修改,如果實(shí)在要修改 State 只能通過(guò)下文的 Savepoint Processor API 來(lái)實(shí)現(xiàn)。
總體來(lái)說(shuō),目前 State 代替數(shù)據(jù)庫(kù)的缺點(diǎn)還是遠(yuǎn)多于其優(yōu)點(diǎn),不過(guò)對(duì)于某些對(duì)數(shù)據(jù)可用性要求不高的作業(yè)來(lái)說(shuō),使用 State 作為數(shù)據(jù)庫(kù)還是完全合理的。由于定位上的不同,F(xiàn)link State 在短時(shí)間內(nèi)很難看到可以完全替代數(shù)據(jù)庫(kù)的可能性,但在數(shù)據(jù)訪問(wèn)特性上 State 往數(shù)據(jù)庫(kù)方向發(fā)展是無(wú)需質(zhì)疑的。
Savepoint Processor API
Savepoint Processor API 是社區(qū)最近提出的一個(gè)新特性(見(jiàn) FLIP-42 [3]),用于離線對(duì) State 的 dump 文件 Savepoint 進(jìn)行分析、修改或者直接根據(jù)數(shù)據(jù)構(gòu)建出一個(gè)初始的 Savepoint。Savepoint Processor API 屬于 Flink State Evolution 的 State Management。如果說(shuō) QueryableState 是 DSL 的話,F(xiàn)link State Evolution 就是 DML,而 Savepoint Processor API 就是 DML 中最為重要的部分。
Savepoint Processor API 的前身是第三方的 Bravo 項(xiàng)目 [4],主要思路提供 Savepoint 和 DataSet 相互轉(zhuǎn)換的能力,典型應(yīng)用是 Savepoint 讀取成 DataSet,在 DataSet 上進(jìn)行修改,然后再寫為一個(gè)新的 Savepoint。這適合用于以下的場(chǎng)景:
分析作業(yè) State 以研究其模式和規(guī)律
排查問(wèn)題或者審計(jì)
為新的應(yīng)用構(gòu)建的初始 State
修改 Savepoint,比如:
改變作業(yè)最大并行度
進(jìn)行巨大的 Schema 改動(dòng)
修正有問(wèn)題的 State
Savepoint 作為 State 的 dump 文件,通過(guò) Savepoint Processor API 可以暴露數(shù)據(jù)查詢和修改功能,類似于一個(gè)離線的數(shù)據(jù)庫(kù),但 State 的概念和典型關(guān)系型數(shù)據(jù)的概念還是有很多不同,F(xiàn)LIP-43 也對(duì)這些差異進(jìn)行了類比和總結(jié)。
首先 Savepoint 是多個(gè) operator 的 state 的物理存儲(chǔ)集合,不同 operator 的 state 是獨(dú)立的,這類似于數(shù)據(jù)庫(kù)下不同 namespace 之間的 table。我們可以得到 Savepoint 對(duì)應(yīng)數(shù)據(jù)庫(kù),單個(gè) operator 對(duì)應(yīng) Namespace。
DatabaseSavepointNamespaceUidTableState
但就 table 而言,其在 Savepoint 里對(duì)應(yīng)的概念根據(jù) State 類型的不同而有所差別。State 有 Operator State、Keyed State 和 Broadcast State 三種,其中 Operator State 和 Broadcast State 屬于 non-partitioned state,即沒(méi)有按 key 分區(qū)的 state,而相反地 Keyed State 則屬于 partitioned state。對(duì)于 non-partitioned state 來(lái)說(shuō),state 是一個(gè) table,state 的每個(gè)元素即是 table 里的一行;而對(duì)于 partitioned state 來(lái)說(shuō),同一個(gè) operator 下的所有 state 對(duì)應(yīng)一個(gè) table。這個(gè) table 像是 HBase 一樣有個(gè) row key,然后每個(gè)具體的 state 對(duì)應(yīng) table 里的一個(gè) column。
舉個(gè)例子,假設(shè)有一個(gè)游戲玩家得分和在線時(shí)長(zhǎng)的數(shù)據(jù)流,我們需要用 Keyed State 來(lái)記錄玩家所在組的分?jǐn)?shù)和游戲時(shí)長(zhǎng),用 Operator State 記錄玩家的總得分和總時(shí)長(zhǎng)。
在一段時(shí)間內(nèi)數(shù)據(jù)流的輸入如下:
user_iduser_nameuser_groupscore1001PaulA5,0001002CharlotteA3,6001003KateC2,0001004RobertB3,900user_iduser_nameuser_grouptime1001PaulA1,8001002CharlotteA1,2001003KateC6001004RobertB2,000
用 Keyed State ,我們分別注冊(cè) group_score 和 group_time 兩個(gè) MapState 表示組總得分和組總時(shí)長(zhǎng),并根據(jù) user_group keyby 數(shù)據(jù)流之后將兩個(gè)指標(biāo)的累積值更新到 State 里,得到的表如下:
user_groupgroup_scoregroup_timeA8,6003,000C2,00600B3,9002,000
相對(duì)地,假如用 Operator State 來(lái)記錄總得分和總時(shí)長(zhǎng)(并行度設(shè)為 1),我們注冊(cè) total_score 和 total_time 兩個(gè) State,得到的表有兩個(gè):
total_score |
------- |
14,500 |
至此 Savepoint 和 Database 的對(duì)應(yīng)關(guān)系應(yīng)該是比較清晰明了的。而對(duì)于 Savepoint 來(lái)說(shuō)還有不同的 StateBackend 來(lái)決定 State 具體如何持續(xù)化,這顯然對(duì)應(yīng)的是數(shù)據(jù)庫(kù)的存儲(chǔ)引擎。在 MySQL 中,我們可以通過(guò)簡(jiǎn)單的一行命令 ALTER TABLE xxx ENGINE = InnoDB; 來(lái)改變存儲(chǔ)引擎,在背后 MySQL 會(huì)自動(dòng)完成繁瑣的格式轉(zhuǎn)換工作。而對(duì)于 Savepoint 來(lái)說(shuō),由于 StateBackend 各自的存儲(chǔ)格式不兼容,目前尚不能方便地切換 StateBackend。為此,社區(qū)在不久前創(chuàng)建 FLIP-41 [5] 來(lái)進(jìn)一步完善 Savepoint 的可操作性。
總結(jié)
State as Database 是實(shí)時(shí)計(jì)算發(fā)展的大趨勢(shì),它并不是要代替數(shù)據(jù)庫(kù)的使用,而是借鑒數(shù)據(jù)庫(kù)領(lǐng)域的經(jīng)驗(yàn)拓展 State 接口使其操作方式更接近我們熟悉的數(shù)據(jù)庫(kù)。對(duì)于 Flink 而言,State 的外部使用可以分為在線的實(shí)時(shí)訪問(wèn)和離線的訪問(wèn)和修改,分別將由 Queryable State 和 Savepoint Processor API 兩個(gè)特性支持。
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
網(wǎng)頁(yè)題目:FlinkState有可能代替數(shù)據(jù)庫(kù)嗎?
URL地址:http://aaarwkj.com/article6/jposog.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供自適應(yīng)網(wǎng)站、網(wǎng)站設(shè)計(jì)、定制網(wǎng)站、做網(wǎng)站、服務(wù)器托管、品牌網(wǎng)站制作
廣告
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源:
創(chuàng)新互聯(lián)