螞蟻金服開源 SOFAJRaft:生產級 Java Raft 算法庫
什么是 SOFAJRaft?
SOFAJRaft 是一個基于 Raft 一致性算法的生產級高性能 Java 實現,支持 MULTI-RAFT-GROUP,適用于高負載低延遲的場景。 使用 SOFAJRaft 你可以專注于自己的業務領域,由 SOFAJRaft 負責處理所有與 Raft 相關的技術難題,并且 SOFAJRaft 非常易于使用,你可以通過幾個示例在很短的時間內掌握它。
https://raft.github.io
SOFAJRaft 是從百度的 braft 移植而來,做了一些優化和改進,感謝百度 braft 團隊開源了如此優秀的 C++ Raft 實現。
https://github.com/brpc/braft
基礎知識:分布式共識算法 (Consensus Algorithm)
如何理解分布式共識?
- 多個參與者某一件事一致 :一件事,一個結論
- 已達成一致的結論,不可推翻
有哪些分布式共識算法?
- Paxos:被認為是分布式共識算法的根本,其他都是其變種,但是 Paxos 論文中只給出了單個提案的過程,并沒有給出復制狀態機中需要的 multi-paxos 的相關細節的描述,實現 Paxos 具有很高的工程復雜度(如多點可寫,允許日志空洞等)。
- Zab:被應用在 Zookeeper 中,業界使用廣泛,但沒有抽象成通用的 library。
- Raft:以容易理解著稱,業界也涌現出很多 Raft 實現,比如大名鼎鼎的 etcd, braft, tikv 等。
什么是 Raft?
Raft 是一種更易于理解的分布式共識算法,核心協議本質上還是師承 Paxos 的精髓,不同的是依靠 Raft 模塊化的拆分以及更加簡化的設計,Raft 協議相對更容易實現。
https://raft.github.io/
模塊化的拆分主要體現在:Raft 把一致性協議劃分為 Leader 選舉、MemberShip 變更、日志復制、Snapshot 等幾個幾乎完全解耦的模塊。
更加簡化的設計則體現在:Raft 不允許類似 Paxos 中的亂序提交、簡化系統中的角色狀態(只有 Leader、Follower、Candidate 三種角色)、限制僅 Leader 可寫入、使用隨機化的超時時間來設計 Leader Election 等等。
特點:Strong Leader
- 系統中必須存在且同一時刻只能有一個 Leader,只有 Leader 可以接受 Clients 發過來的請求;
- Leader 負責主動與所有 Followers 通信,負責將“提案”發送給所有 Followers,同時收集多數派的 Followers 應答;
- Leader 還需向所有 Followers 主動發送心跳維持領導地位(保持存在感)。
一句話總結 Strong Leader: "你們不要 BB! 按我說的做,做完了向我匯報!"。
另外,身為 Leader 必須保持一直 BB(heartbeat) 的狀態,否則就會有別人跳出來想要 BB 。

Raft 中的基本概念
篇幅有限,這里只對 Raft 中的幾個概念做一個簡單介紹,詳細請參考 Raft paper。
https://raft.github.io/raft.pdf
Raft-node 的 3 種角色/狀態

- Follower:完全被動,不能發送任何請求,只接受并響應來自 Leader 和 Candidate 的 Message,每個節點啟動后的初始狀態一定是 Follower;
- Leader:處理所有來自客戶端的請求,以及復制 Log 到所有 Followers;
- Candidate:用來競選一個新 Leader (Candidate 由 Follower 觸發超時而來)。
Message 的 3 種類型
- RequestVote RPC:由 Candidate 發出,用于發送投票請求;
- AppendEntries (Heartbeat) RPC:由 Leader 發出,用于 Leader 向 Followers 復制日志條目,也會用作 Heartbeat (日志條目為空即為 Heartbeat);
- InstallSnapshot RPC:由 Leader 發出,用于快照傳輸,雖然多數情況都是每個服務器獨立創建快照,但是Leader 有時候必須發送快照給一些落后太多的 Follower,這通常發生在 Leader 已經丟棄了下一條要發給該Follower 的日志條目(Log Compaction 時清除掉了) 的情況下。
任期邏輯時鐘
- 時間被劃分為一個個任期 (term),term id 按時間軸單調遞增;
- 每一個任期的開始都是 Leader 選舉,選舉成功之后,Leader 在任期內管理整個集群,也就是 “選舉 + 常規操作”;
- 每個任期最多一個 Leader,可能沒有 Leader (spilt-vote 導致)。

本圖出自《Raft: A Consensus Algorithm for Replicated Logs》
什么是 SOFAJRaft?
SOFAJRaft 是一個基于 Raft 一致性算法的生產級高性能 Java 實現,支持 MULTI-RAFT-GROUP,適用于高負載低延遲的場景。 使用 SOFAJRaft 你可以專注于自己的業務領域,由 SOFAJRaft 負責處理所有與 Raft 相關的技術難題,并且 SOFAJRaft 非常易于使用,你可以通過幾個示例在很短的時間內掌握它。
https://github.com/brpc/braft
SOFAJRaft 是從百度的 braft 移植而來,做了一些優化和改進,感謝百度 braft 團隊開源了如此優秀的 C++ Raft 實現。
SOFAJRaft 整體功能&性能優化

功能支持
1.Leader election:Leader 選舉,這個不多說,上面已介紹過 Raft 中的 Leader 機制。
2.Log replication and recovery:日志復制和日志恢復。
1)Log replication 就是要保證已經被 commit 的數據一定不會丟失,即一定要成功復制到多數派。
2)Log recovery 包含兩個方面:
3)Current term 日志恢復:主要針對一些 Follower 節點重啟加入集群或者是新增 Follower 節點后如何追日志;
4)Prev term 日志恢復:主要針對 Leader 切換前后的日志一致性。
3.Snapshot and log compaction:定時生成 snapshot,實現 log compaction 加速啟動和恢復,以及 InstallSnapshot 給 Followers 拷貝數據,如下圖:

本圖出自《In Search of an Understandable Consensus Algorithm》
4.Membership change:用于集群線上配置變更,比如增加節點、刪除節點、替換節點等。
5.Transfer leader:主動變更 leader,用于重啟維護,leader 負載平衡等。
6.Symmetric network partition tolerance:對稱網絡分區容忍性。

如上圖 S1 為當前 leader,網絡分區造成 S2 不斷增加本地 term,為了避免網絡恢復后 S2 發起選舉導致正在良心 工作的 leader step-down,從而導致整個集群重新發起選舉,SOFAJRaft 中增加了 pre-vote 來避免這個問題的發生。
SOFAJRaft 中在 request-vote 之前會先進行 pre-vote(currentTerm + 1, lastLogIndex, lastLogTerm),多數派成功后才會轉換狀態為 candidate 發起真正的 request-vote,所以分區后的節點,pre-vote 不會成功,也就不會導致集群一段時間內無法正常提供服務。
7.Asymmetric network partition tolerance:非對稱網絡分區容忍性。

如上圖 S1 為當前 leader,S2 不斷超時觸發選主,S3 提升 term 打斷當前 lease,從而拒絕 leader 的更新。
- 在 SOFAJRaft 中增加了一個 tick 的檢查,每個 follower 維護一個時間戳記錄下收到 leader 上數據更新的時間(也包括心跳),只有超過 election timeout 之后才允許接受 request-vote 請求。
8.Fault tolerance:容錯性,少數派故障不影響系統整體可用性,包括但不限于:
1)機器掉電
2)強殺應用
3)慢節點(GC, OOM 等)
4)網絡故障
5)其他各種奇葩原因導致 raft 節點無法正常工作
9.Workaround when quorate peers are dead:多數派故障時,整個 grop 已不具備可用性,安全的做法是等待多數節點恢復,只有這樣才能保證數據安全;但是如果業務更加追求系統可用性,可以放棄數據一致性的話,SOFAJRaft 提供了手動觸發 reset_peers 的指令以迅速重建整個集群,恢復集群可用。
10.Metrics:SOFAJRaft 內置了基于 Metrics 類庫的性能指標統計,具有豐富的性能統計指標,利用這些指標數據可以幫助用戶更容易找出系統性能瓶頸。
11.Jepsen:除了幾百個單元測試以及部分 chaos 測試之外, SOFAJRaft 還使用 jepsen 這個分布式驗證和故障注入測試框架模擬了很多種情況,都已驗證通過:
1)隨機分區,一大一小兩個網絡分區
2)隨機增加和移除節點
3)隨機停止和啟動節點
4)隨機 kill -9 和啟動節點
5)隨機劃分為兩組,互通一個中間節點,模擬分區情況
6)隨機劃分為不同的 majority 分組
性能優化
除了功能上的完整性,SOFAJRaft 還做了很多性能方面的優化,這里有一份 KV 場景(get/put)的 Benchmark 數據, 在小數據包,讀寫比例為 9:1,保證線性一致讀的場景下,三副本***可以達到 40w+ 的 ops。
這里挑重點介紹幾個優化點:
- Batch: 我們知道互聯網兩大優化法寶便是 Cache 和 Batch,SOFAJRaft 在 Batch 上花了較大心思,整個鏈路幾乎都是 Batch 的,依靠 disruptor 的 MPSC 模型批量消費,對整體性能有著極大的提升,包括但不限于:
- 批量提交 task
- 批量網絡發送
- 本地 IO batch 寫入
- 要保證日志不丟,一般每條 log entry 都要進行 fsync 同步刷盤,比較耗時,SOFAJRaft 中做了合并寫入的優化。
- 批量應用到狀態機
- 需要說明的是,雖然 SOFAJRaft 中大量使用了 Batch 技巧,但對單個請求的延時并無任何影響,SOFAJRaft 中不會對請求做延時的攢批處理。
- Replication pipeline:流水線復制,通常 Leader 跟 Followers 節點的 Log 同步是串行 Batch 的方式,每個 Batch 發送之后需要等待 Batch 同步完成之后才能繼續發送下一批(ping-pong),這樣會導致較長的延遲。SOFAJRaft 中通過 Leader 跟 Followers 節點之間的 pipeline 復制來改進,非常有效降低了數據同步的延遲, 提高吞吐。經我們測試,開啟 pipeline 可以將吞吐提升 30% 以上,詳細數據請參照 Benchmark。
- Append log in parallel:在 SOFAJRaft 中 Leader 持久化 log entries 和向 Followers 發送 log entries 是并行的。
- Fully concurrent replication:Leader 向所有 Follwers 發送 Log 也是完全相互獨立和并發的。
- Asynchronous:SOFAJRaft 中整個鏈路幾乎沒有任何阻塞,完全異步的,是一個完全的 callback 編程模型。
- ReadIndex:優化 Raft read 走 Raft log 的性能問題,每次 read,僅記錄 commitIndex,然后發送所有 peers heartbeat 來確認 Leader 身份,如果 Leader 身份確認成功,等到 appliedIndex >= commitIndex,就可以返回 Client read 了,基于 ReadIndex Follower 也可以很方便的提供線性一致讀,不過 commitIndex 是需要從 Leader 那里獲取,多了一輪 RPC;關于線性一致讀文章后面會詳細分析。
- Lease Read:SOFAJRaft 還支持通過租約 (lease) 保證 Leader 的身份,從而省去了 ReadIndex 每次 heartbeat 確認 Leader 身份,性能更好,但是通過時鐘維護 lease 本身并不是絕對的安全(時鐘漂移問題,所以 SOFAJRaft 中默認配置是 ReadIndex,因為通常情況下 ReadIndex 性能已足夠好。
SOFAJRaft 設計

- Node:Raft 分組中的一個節點,連接封裝底層的所有服務,用戶看到的主要服務接口,特別是 apply(task)用于向 raft group 組成的復制狀態機集群提交新任務應用到業務狀態機。
- 存儲:上圖靠下的部分均為存儲相關。
- Log 存儲,記錄 Raft 用戶提交任務的日志,將日志從 Leader 復制到其他節點上。
- LogStorage 是存儲實現,默認實現基于 RocksDB 存儲,你也可以很容易擴展自己的日志存儲實現;
- LogManager 負責對底層存儲的調用,對調用做緩存、批量提交、必要的檢查和優化。
- Metadata 存儲,元信息存儲,記錄 Raft 實現的內部狀態,比如當前 term、投票給哪個節點等信息。
- Snapshot 存儲,用于存放用戶的狀態機 snapshot 及元信息,可選:
- SnapshotStorage 用于 snapshot 存儲實現;
- SnapshotExecutor 用于 snapshot 實際存儲、遠程安裝、復制的管理。
- 狀態機
- StateMachine:用戶核心邏輯的實現,核心是 onApply(Iterator) 方法, 應用通過 Node#apply(task) 提交的日志到業務狀態機;
- FSMCaller:封裝對業務 StateMachine 的狀態轉換的調用以及日志的寫入等,一個有限狀態機的實現,做必要的檢查、請求合并提交和并發處理等。
- 復制
- Replicator:用于 Leader 向 Followers 復制日志,也就是 Raft 中的 AppendEntries 調用,包括心跳存活檢查等;
- ReplicatorGroup:用于單個 Raft group 管理所有的 replicator,必要的權限檢查和派發。
- RPC:RPC 模塊用于節點之間的網絡通訊
- RPC Server:內置于 Node 內的 RPC 服務器,接收其他節點或者客戶端發過來的請求,轉交給對應服務處理;
- RPC Client:用于向其他節點發起請求,例如投票、復制日志、心跳等。
- KV Store:KV Store 是各種 Raft 實現的一個典型應用場景,SOFAJRaft 中包含了一個嵌入式的分布式 KV 存儲實現(SOFAJRaft-RheaKV)。
SOFAJRaft Group
單個節點的 SOFAJRaft-node 是沒什么實際意義的,下面是三副本的 SOFAJRaft 架構圖:

SOFAJRaft Multi Group
單個 Raft group 是無法解決大流量的讀寫瓶頸的,SOFAJRaft 自然也要支持 multi-raft-group。

SOFAJRaft 實現細節解析之高效的線性一致讀
什么是線性一致讀? 所謂線性一致讀,一個簡單的例子就是在 t1 的時刻我們寫入了一個值,那么在 t1 之后,我們一定能讀到這個值,不可能讀到 t1 之前的舊值 (想想 Java 中的 volatile 關鍵字,說白了線性一致讀就是在分布式系統中實現 Java volatile 語義)。

如上圖 Client A、B、C、D 均符合線性一致讀,其中 D 看起來是 stale read,其實并不是,D 請求橫跨了 3 個階段,而讀可能發生在任意時刻,所以讀到 1 或 2 都行。
重要:接下來的討論均基于一個大前提,就是業務狀態機的實現必須是滿足線性一致性的,簡單說就是也要具有 Java volatile 的語義。
- 要實現線性一致讀,首先我們簡單直接一些,是否可以直接從當前 Leader 節點讀?
- 仔細一想,這顯然行不通,因為你無法確定這一刻當前的 "Leader" 真的是 Leader,比如在網絡分區的情況下,它可能已經被推翻王朝卻不自知。
- 最簡單易懂的實現方式:同 “寫” 請求一樣,“讀” 請求也走一遍 Raft 協議 (Raft Log)。

本圖出自《Raft: A Consensus Algorithm for Replicated Logs》
這一定是可以的,但性能上顯然不會太出色,走 Raft Log 不僅僅有日志落盤的開銷,還有日志復制的網絡開銷,另外還有一堆的 Raft “讀日志” 造成的磁盤占用開銷,這在讀比重很大的系統中通常是無法被接受的。
- ReadIndex Read
- 這是 Raft 論文中提到的一種優化方案,具體來說:
- Leader 將自己當前 Log 的 commitIndex 記錄到一個 Local 變量 ReadIndex 里面;
- 接著向 Followers 發起一輪 heartbeat,如果半數以上節點返回了對應的 heartbeat response,那么 Leader 就能夠確定現在自己仍然是 Leader (證明了自己是自己);
- Leader 等待自己的狀態機執行,直到 applyIndex 超過了 ReadIndex,這樣就能夠安全的提供 Linearizable Read 了,也不必管讀的時刻是否 Leader 已飄走 (思考:為什么等到 applyIndex 超過了 ReadIndex 就可以執行讀請求?);
- Leader 執行 read 請求,將結果返回給 Client。
- 通過ReadIndex,也可以很容易在 Followers 節點上提供線性一致讀:
- Follower 節點向 Leader 請求***的 ReadIndex;
- Leader 執行上面前 3 步的過程(確定自己真的是 Leader),并返回 ReadIndex 給 Follower;
- Follower 等待自己的 applyIndex 超過了 ReadIndex;
- Follower 執行 read 請求,將結果返回給 Client。(SOFAJRaft 中可配置是否從 Follower 讀取,默認不打開)
- ReadIndex小結:
- 相比較于走 Raft Log 的方式,ReadIndex 省去了磁盤的開銷,能大幅度提升吞吐,結合 SOFAJRaft 的 batch + pipeline ack + 全異步機制,三副本的情況下 Leader 讀的吞吐可以接近于 RPC 的吞吐上限;
- 延遲取決于多數派中最慢的一個 heartbeat response,理論上對于降低延時的效果不會非常顯著。
- Lease Read
- Lease Read 與 ReadIndex 類似,但更進一步,不僅省去了 Log,還省去了網絡交互。它可以大幅提升讀的吞吐也能顯著降低延時。
- 基本的思路是 Leader 取一個比 election timeout 小的租期(***小一個數量級),在租約期內不會發生選舉,這就確保了 Leader 不會變,所以可以跳過 ReadIndex 的第二步,也就降低了延時。可以看到 Lease Read 的正確性和時間是掛鉤的,因此時間的實現至關重要,如果時鐘漂移嚴重,這套機制就會有問題。
- 實現方式:
- 定時 heartbeat 獲得多數派響應,確認 Leader 的有效性 (在 SOFAJRaft 中默認的 heartbeat 間隔是 election timeout 的十分之一);
- 在租約有效時間內,可以認為當前 Leader 是 Raft Group 內的唯一有效 Leader,可忽略 ReadIndex 中的 heartbeat 確認步驟(2);
- Leader 等待自己的狀態機執行,直到 applyIndex 超過了 ReadIndex,這樣就能夠安全的提供 Linearizable Read 了 。
在 SOFAJRaft 中發起一次線性一致讀請求的代碼展示:
- // KV 存儲實現線性一致讀
- public void readFromQuorum(String key, AsyncContext asyncContext) {
- // 請求 ID 作為請求上下文傳入
- byte[] reqContext = new byte[4];
- Bits.putInt(reqContext, 0, requestId.incrementAndGet());
- // 調用 readIndex 方法, 等待回調執行
- this.node.readIndex(reqContext, new ReadIndexClosure() {
- @Override
- public void run(Status status, long index, byte[] reqCtx) {
- if (status.isOk()) {
- try {
- // ReadIndexClosure 回調成功,可以從狀態機讀取***數據返回
- // 如果你的狀態實現有版本概念,可以根據傳入的日志 index 編號做讀取
- asyncContext.sendResponse(new ValueCommand(fsm.getValue(key)));
- } catch (KeyNotFoundException e) {
- asyncContext.sendResponse(GetCommandProcessor.createKeyNotFoundResponse());
- }
- } else {
- // 特定情況下,比如發生選舉,該讀請求將失敗
- asyncContext.sendResponse(new BooleanCommand(false, status.getErrorMsg()));
- }
- }
- });
- }
應用場景
- Leader 選舉;
- 分布式鎖服務,比如 Zookeeper,在 SOFAJRaft 中的 RheaKV 模塊提供了完整的分布式鎖實現;
- 高可靠的元信息管理,可直接基于 SOFAJRaft-RheaKV 存儲;
- 分布式存儲系統,如分布式消息隊列、分布式文件系統、分布式塊系統等等。
使用案例
- RheaKV:基于 SOFAJRaft 實現的嵌入式、分布式、高可用、強一致的 KV 存儲類庫。
- AntQ Streams QCoordinator:使用 SOFAJRaft 在 Coordinator 集群內做選舉、使用 SOFAJRaft-RheaKV 做元信息存儲等功能。
- Schema Registry:高可靠 schema 管理服務,類似 kafka schema registry,存儲部分基于 SOFAJRaft-RheaKV。
- SOFA 服務注冊中心元信息管理模塊:IP 數據信息注冊,要求寫數據達到各個節點一致,并且在少數派節點掛掉時保證不影響數據正常存儲。
實踐
一、基于 SOFAJRaft 設計一個簡單的 KV Store

二、基于 SOFAJRaft 的 RheaKV 的設計

功能名詞
PD
- 全局的中心總控節點,負責整個集群的調度,不需要自管理的集群可不啟用 PD (一個 PD 可管理多個集群,基于 clusterId 隔離)。
Store
- 集群中的一個物理存儲節點,一個 Store 包含一個或多個 Region。
Region
- 最小的 KV 數據單元,每個 Region 都有一個左閉右開的區間 [startKey, endKey), 可根據請求流量/負載/數據量大小等指標自動分裂以及自動副本搬遷。
特點
- 嵌入式
- 強一致性
- 自驅動
- 自診斷, 自優化, 自決策
以上幾點(尤其2、3) 基本都是依托于 SOFAJRaft 自身的功能來實現,詳細介紹請參考 SOFAJRaft 文檔 。
致謝
感謝 braft、etcd、tikv 貢獻了優秀的 Raft 實現,SOFAJRaft 受益良多。