一、etcd 簡介
1. etcd 的背景
雖然Kubernetes 給云原生時代帶來了顛覆性的新氣象,但卻很少人了解被欽定作為其后端存儲的 etcd ,本篇從分布式鎖視角梳理etcd的各種機制,探索基于etcd的鎖實現是怎樣。
etcd 能被Kubernetes 如此青睞,是因為它一直在聆聽社區(qū)的聲音并快速改進,積極配合 Kubernetes 項目向前推進,解決社區(qū)反饋的痛點;發(fā)起 V2 到 V3 的重大版本更新,尤其是 19 年由 Google、Alibaba 等公司聯(lián)合打造的 3.4 版本,滿足了 Kubernetes 在超大型公司大規(guī)模使用中嚴苛的可用性、擴展性和性能等要求。

上圖描述了 etcd 名字的由來,其寓意是為大規(guī)模分布式系統(tǒng)提供存儲配置信息;華為 Kubernetes 專家杜軍老師專門為 etcd 著書《云原生分布式存儲基石-etcd》,稱它基石足見何等重要,另一位大咖etcd 的作者李響老師曾講:“etcd 就是用來存儲云上最重要的數據的”;相信隨著云原生架構的演進,etcd 會擔任越來越多重要的角色;技術人員在云原生時代,除了擁抱 Kubernetes,也要擁抱 etcd。
2.etcd vs ZooKeeper
很長一段時間 ZooKeeper(后文簡稱 ZK) 被作為默認首選項,用于解決分布式系統(tǒng)的協(xié)同和元數據存儲,但其太復雜、迭代慢、難維護、性能缺陷等問題逐漸成為槽點;而 etcd 吸取了 ZK 的教訓,從設計和實現上具有后見之明,提供了更好的工程和運維體驗,其主要改進在于如下幾個方面:
- 動態(tài)的集群節(jié)點關系重配置
- 高負載條件下的穩(wěn)定讀寫
- 多版本并發(fā)控制的數據模型
- 持久、穩(wěn)定的監(jiān)聽機制
- 租約 (lease) 原語實現了連接與會話的解耦
- 安全的分布式共享鎖 API
- 普適的 HTTP 、GRPC 通信協(xié)議
據說在一個由 3 臺 8 核節(jié)點組成的云服務器上, etcd v3 版本可以做到每秒數萬次的寫操作和數十萬次的讀操作(ZK:
);后續(xù)會有其他篇章結合QA的測試結果來探討etcd服務端的設計和性能情況。
3.etcd 特性介紹
為滿足本篇目標所需,也考慮到對 etcd 熟悉的讀者不多,這里著重介紹以下幾個關鍵特性:
- 數據組織:etcd 在 V2 版本時跟 ZK 一樣以樹形目錄結構來組織數據,V3 版本則通過半開區(qū)間( Key range)取代 V2 中的目錄結構,優(yōu)化成扁平的 Key-Value 結構,Key 仍采用之前的格式如/lock/lock1/uuid1、/lock/lock1/uuid2?,感官跟樹形目錄形式保持一致,但實際是一個完整獨立的 key;可以基于 前綴(Prefix)機制通過/lock/lock1/?查詢一批擁有相同前綴的數據,但其語義是前綴匹配查詢而非目錄子節(jié)點查詢;這個改造也是為了支撐更多的特性和達到更好的性能。

- 集群模式:通常是由 3、5 個基數實例組成集群,當超過半數服務實例正常工作就能對外提供服務,既能避免單點故障,又盡量高可用,每個服務實例都有一個數據備份,通過 raft 協(xié)議實現數據全局一致。

- 順序更新:每個 etcd 節(jié)點都可接收讀寫請求,但變更類(增刪改)請求會在集群內轉給 leader 執(zhí)行,來所有客戶端的變更請求將按照 leader 接收的順序被處理,在 ZK 中插入同名節(jié)點,ZK 會自動為同名節(jié)點名后添加遞增序號,即避免沖突又具有順序管控的意義;但 etcd 中不能插入同名 Key,它是采用 revision 機制來管控更新類請求的順序,使用一個全局計數器,單調遞增,每當有數據變更,revision 就會加一(具有全局唯一性),而每個 revision 都關聯(lián)對應修改的數據,可以通過 revision 的大小推斷數據變更的順序,利用這個特性可以實現高級協(xié)調服務。

- 租約(Lease)機制:常見的 TTL(Time To Live)機制是給單個 Key-Value 設置存活時間,超過時間后自動刪除這個 Key-Value,V3 中加入的租約機制更高級一些,針對每個 Lease 設置了一個 TTL 時間,存儲 Key-Value 時可指定一個 Lease,多個擁有相同 TTL 的 key 綁定到同一個 Lease,實現 Lease 的復用。同時也支持續(xù)約,可以通過客戶端在 Lease 到期之前續(xù)約,以避免 Key-Value 過期后失效,也可以通過客戶端主動解約,這比 ZK 中基于 session 的狀態(tài)保持更靈活。

- 監(jiān)聽機制:客戶端可以注冊對某一個 Key 或一批相同前綴(前綴機制,似 ZK 中的父節(jié)點)Key 的監(jiān)聽,當被監(jiān)聽的這些 Key 發(fā)生變更,客戶端將收到通知,感知到變更。

etcd 的分布式鎖正是基于以上特性來實現的,簡單來說是:
- 租約機制:用于支撐異常情況下的鎖自動釋放能力
- 前綴和 Revision 機制:用于支撐公平獲取鎖和排隊等待的能力
- 監(jiān)聽機制:用于支撐搶鎖能力
- 集群模式:用于支撐鎖服務的高可用
二、加解鎖的流程描述

1.準備客戶端和 Key
- 客戶端連接 Etcd,獲取 LockClient,獲取 LeaseClient
- 以/lock/lock1? 為前綴 + / + uuid 創(chuàng)建全局唯一的 key,如:
client-a 線程 1 的 key 為"/lock/lock1/uuid1",申請 lock1
client-a 線程 2 的 key 為"/lock/lock1/uuid2",申請 lock1
client-c 線程 1 的 key 為"/lock/lock2/uuid3",申請 lock2
client-b 線程 1 的 key 為"/lock/lock2/uuid4",申請 lock2
- 客戶端 a、b、c 分別創(chuàng)建租約,租約的時長由業(yè)務輸入確定
2.創(chuàng)建租約并保持續(xù)租
- 方案 1-創(chuàng)建定時任務定時續(xù)租,無論客戶端是持鎖狀態(tài)還是等待鎖狀態(tài) Key 都必須存在,而 Key 是否釋放由租約管控,所以都需要保持租約的活性
持鎖狀態(tài):當業(yè)務未完成時,不能讓租約到期,需定時續(xù)租;當業(yè)務完成時可主動解除租約,持鎖 Key 會被刪除;若客戶端異常,租約到期后持鎖 Key 也會被刪除;等鎖的客戶端監(jiān)聽到持鎖 Key 被刪除后,可開始搶鎖。
等鎖狀態(tài):等鎖超時會主動解除租約,或客戶端異常時等鎖 key 被刪除,后邊排隊的就前進一步,嘗試搶鎖。
3.綁定租約寫 key
- 每個客戶端在執(zhí)行 put 操作時,將第 1 步中準備的具有唯一性的 Key 綁定租約寫入 etcd
4.獲取競爭鎖的 key-Value 列表
- 以 lock1 為例,客戶端以前綴"/lock/lock1" 讀取所有匹配此前綴的的 key-Value 列表(key-Value 中帶有 key 對應的 Revision)
5.對所獲取的 Key-Value 列表按 revision 從小到大排序
6.判斷自己是不是第一個(revision 最小),若是,則成功獲取鎖
7.若不是,則監(jiān)聽自己的前一個 Key-Value 的刪除事件
- 刪除事件由主動釋放租約或者因租約過期失效而觸發(fā)
- 這種只監(jiān)聽前一個 Key-Value 的方式避免了驚群效應問題
8.若是阻塞申請鎖,則申請鎖的操作可增加阻塞等待
9.若監(jiān)聽事件生效,則回到第 4 步重新進行判斷,直到獲取到鎖
10解鎖時,將第一個 Key-Value 的租約釋放
三、etcd 分布式鎖的能力
可能讀者是單篇閱讀,這里引入第一篇《分布式鎖上-初探》中的一些內容,一個分布式鎖應具備這樣一些功能特點:
- 互斥性:在同一時刻,只有一個客戶端能持有鎖
- 安全性:避免死鎖,如果某個客戶端獲得鎖之后處理時間超過最大約定時間,或者持鎖期間發(fā)生了故障導致無法主動釋放鎖,其持有的鎖也能夠被其他機制正確釋放,并保證后續(xù)其它客戶端也能加鎖,整個處理流程繼續(xù)正常執(zhí)行
- 可用性:也被稱作容錯性,分布式鎖需要有高可用能力,避免單點故障,當提供鎖的服務節(jié)點故障(宕機)時不影響服務運行,這里有兩種模式:一種是分布式鎖服務自身具備集群模式,遇到故障能自動切換恢復工作;另一種是客戶端向多個獨立的鎖服務發(fā)起請求,當某個鎖服務故障時仍然可以從其他鎖服務讀取到鎖信息(Redlock)
- 可重入性:對同一個鎖,加鎖和解鎖必須是同一個線程,即不能把其他線程持有的鎖給釋放了
- 高效靈活:加鎖、解鎖的速度要快;支持阻塞和非阻塞;支持公平鎖和非公平鎖
基于上邊對 etcd 分布式鎖的介紹,這里簡單總結一下 etcd 的能力矩陣,ZK 的情況請看《分布式鎖中-基于 Zookeeper 的實現》,redis鎖的情況會在后續(xù)文章中補充
能力 | ZK | etcd | Redis 原生 | Redlock |
互斥 | 是 | 是 |
|
|
安全 | 鏈接異常時,session 丟失自動釋放鎖 | 基于租約,租約過期后自動釋放鎖,不用像ZK那樣釋放鏈接 |
|
|
可用性 | 相對可用性還好 | 好 |
|
|
可重入 | 服務端非可重入,本地線程可重入 | 服務端非可重入,本地線程可重入需自研 |
|
|
加解鎖速度 | 速度不算快 | 速度快,如GRPC 協(xié)議優(yōu)勢、服務端性能的優(yōu)勢 |
|
|
阻塞非阻塞 | 客戶端兩種能力都提供 | jetcd-core 中,阻塞非阻塞由 Future#get 的超時控制能力支撐 |
|
|
公平非公平 | 公平鎖 | 公平鎖 |
|
|
可續(xù)租 | 天然支持,基于session | 天然支持,基于Lease |
|
|
四、jetcd 庫實現分布式鎖
etcd 針對 java 語言的客戶端有官方的 jetcd-core 還有 IBM 的 etcd-java,下文使用 jetcd-core 做示例。
jetcd-core 中提供了高階的 Lock API(無需使用者準備唯一 key、前綴查詢 Key-Value 列表、排序判斷自己是否 revision 最小的、監(jiān)聽前一個 Key-Value 的刪除),使用者只需關注最原始的訴求:申請的鎖是什么名稱、用多久,申請不到就等多久;使用高階 API 實現分布式鎖的流程會比第 2 部分原生的流程要簡單許多,流程如下:
- 連接 etcd 集群
- 獲取 LockClient,獲取 LeaseClient,不需要準備 key
- 創(chuàng)建租約并保持續(xù)租, 你覺得哪一種方案會更好呢
方案 1-創(chuàng)建定時任務定時續(xù)租
方案 2-使用自動續(xù)約
- 綁定租約搶鎖
- 若搶到鎖,則返回鎖路徑信息
- 若沒搶到鎖,則阻塞等待(可設置超時返回)
- 解鎖時,傳入鎖信息解鎖;根據業(yè)務場景決定如何解約
1.pom 依賴
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.7.3</version>
</dependency>
2.Lease 相關的 API 介紹
public interface Lease extends CloseableClient {
//創(chuàng)建一個租約,過期時間是ttl,單位秒;沒有請求超時控制
CompletableFuture<LeaseGrantResponse> grant(long ttl);
//創(chuàng)建一個租約,過期時間是ttl,單位秒;后兩個參數是請求超時控制
CompletableFuture<LeaseGrantResponse> grant(long ttl, long timeout, TimeUnit unit);
//解約
CompletableFuture<LeaseRevokeResponse> revoke(long leaseId);
//主動續(xù)約1次
CompletableFuture<LeaseKeepAliveResponse> keepAliveOnce(long leaseId);
//查詢租約信息
CompletableFuture<LeaseTimeToLiveResponse> timeToLive(long leaseId, LeaseOption leaseOption);
//自動續(xù)約
CloseableClient keepAlive(long leaseId, StreamObserver<LeaseKeepAliveResponse> observer);
}
3.lock 相關的 API 介紹
public interface Lock extends CloseableClient {
//綁定租約申請指定名稱的鎖,搶鎖成功返回鎖秘鑰,
CompletableFuture<LockResponse> lock(ByteSequence name, long leaseId);
//持鎖秘鑰解鎖
CompletableFuture<UnlockResponse> unlock(ByteSequence lockKey);
}
4.分布式鎖示例
package com.rock.dlock;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Lock;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.lock.LockResponse;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @author zs
* @date 2022/11/6 12:23 PM
*/
public class DemoEtcdLock {
private final static Logger log = LoggerFactory.getLogger(DemoEtcdLock.class);
private Client client;
private Lock lockClient;
private Lease leaseClient;
private LockState lockState;
class LockState{
private String lockKey;
private String lockPath;
private String errorMsg;
private long leaseTTL;
private long leaseId;
private boolean lockSuccess;
public LockState(String lockKey, long leaseTTL) {
this.lockKey = lockKey;
this.leaseTTL = leaseTTL;
}
public String getLockKey() {
return lockKey;
}
public void setLockKey(String lockKey) {
this.lockKey = lockKey;
}
public String getLockPath() {
return lockPath;
}
public void setLockPath(String lockPath) {
this.lockPath = lockPath;
}
public String getErrorMsg() {
return errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
public long getLeaseId() {
return leaseId;
}
public void setLeaseId(long leaseId) {
this.leaseId = leaseId;
}
public boolean isLockSuccess() {
return lockSuccess;
}
public void setLockSuccess(boolean lockSuccess) {
this.lockSuccess = lockSuccess;
}
public long getLeaseTTL() {
return leaseTTL;
}
public void setLeaseTTL(long leaseTTL) {
this.leaseTTL = leaseTTL;
}
}
public DemoEtcdLock(Client client, String lockKey, Long leaseTTL, TimeUnit unit) {
this.client = client;
//1.準備客戶端
this.lockClient = client.getLockClient();
this.leaseClient = client.getLeaseClient();
this.lockState = new LockState(lockKey,unit.toSeconds(leaseTTL));
}
public boolean lock() {
try {
//2.創(chuàng)建租約,并自動續(xù)約
createLease();
//3.執(zhí)行加鎖,并為鎖對應的Key綁定租約
createLock();
}catch (InterruptedException | ExecutionException e) {
//todo:異常處理
}
return lockState.isLockSuccess();
}
public void unlock() {
try {
//正常釋放鎖
if (this.lockState.getLockPath() != null) {
lockClient.unlock(ByteSequence.from(lockState.getLockPath().getBytes())).get();
}
//如果是主動續(xù)約,則關閉續(xù)約的定時任務
//刪除租約
if (lockState.getLeaseId() != 0L) {
leaseClient.revoke(lockState.getLeaseId());
}
} catch (InterruptedException | ExecutionException e) {
//todo:異常處理
}
log.info("線程:{} 釋放鎖", Thread.currentThread().getName());
}
// 創(chuàng)建一個租約
private void createLease() throws ExecutionException, InterruptedException {
log.debug("[etcd-lock]: start to createLease." + this.lockState.getLockKey() + Thread.currentThread().getName());
try {
long leaseId = leaseClient.grant(this.lockState.getLeaseTTL()).get().getID();
lockState.setLeaseId(leaseId);
//自動續(xù)約
StreamObserver<LeaseKeepAliveResponse> observer = new StreamObserver<LeaseKeepAliveResponse>() {
@Override
public void onNext(LeaseKeepAliveResponse value) {
log.trace("cluster node lease remaining ttl: {}, lease id: {}", value.getTTL(), value.getID());
}
@Override
public void onError(Throwable t) {
log.error("cluster node lease keep alive failed. exception info: {}", t);
}
@Override
public void onCompleted() {
log.trace("cluster node lease completed");
}
};
// 設置自動續(xù)約
leaseClient.keepAlive(leaseId, observer);
}catch (InterruptedException | ExecutionException e) {
log.error("[etcd-lock] Create lease failed:" + e);
lockState.setErrorMsg("Create lease failed:" + e);
throw e;
}
}
private void createLock() throws ExecutionException, InterruptedException {
String lockKey = this.lockState.getLockKey();
log.debug("[etcd-lock]: start to createLock." + lockKey + Thread.currentThread().getName());
try {
LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), lockState.getLeaseId()).get();
if (lockResponse != null) {
String lockPath = lockResponse.getKey().toString(StandardCharsets.UTF_8);
this.lockState.setLockPath(lockPath);
log.info("線程:{} 加鎖成功,鎖路徑:{}", Thread.currentThread().getName(), lockPath);
this.lockState.setLockSuccess(true);
}
}
catch (InterruptedException | ExecutionException e) {
log.error("[etcd-lock] lock failed:" + e);
lockState.setErrorMsg("[etcd-lock] lock failed:" + e);
leaseClient.revoke(this.lockState.getLeaseId());
throw e;
}
}
}
5.測試鎖
package com.rock.dlock;
import io.etcd.jetcd.Client;
import java.util.concurrent.TimeUnit;
/**
* @author zs
* @date 2022/11/6 12:23 PM
*/
public class TestEtcdLock {
public static void main(String[] args) {
Client client = Client.builder().endpoints("http://localhost:2379").build();
DemoEtcdLock demoEtcdLock1 = new DemoEtcdLock(client,"rock",30L, TimeUnit.SECONDS);
DemoEtcdLock demoEtcdLock2 = new DemoEtcdLock(client,"rock",30L, TimeUnit.SECONDS);
boolean lock1 = demoEtcdLock1.lock();
if(lock1) {
try {
System.out.printf("do something");
} finally {
demoEtcdLock1.unlock();
}
}
demoEtcdLock1.lock();//demoEtcdLock1 持鎖未釋放
demoEtcdLock2.lock();//demoEtcdLock2 客戶端無可重入設計,這里將會阻塞等待demoEtcdLock1釋放鎖
}
}
五、總結
本篇從 etcd V3 版本的 Lease、Prefix 、Watch 等關鍵特性切入,介紹了如何基于這些特性來實現一個分布式鎖,并基于jetcd-core庫提供了一個分布式鎖的示例,呈現了其關鍵API的用法;此示例尚未達到生產級可用,如異常、可重入、可重試、超時控制等功能都未補全,計劃在下一篇介紹完redis之后,再介紹一個健壯的分布式鎖客戶端要如何抽象設計,如何適配 ZK 、Redis 、etcd 。
本文轉載自微信公眾號「架構染色」,可以通過以下二維碼關注。轉載本文請聯(lián)系【架構染色】公眾號作者。
