高效索引的秘密:Redis 跳表設計與實現
在數據結構的世界中,各種高效的數據存儲和檢索方式層出不窮。其中,跳表(Skip List) 作為一種高效的動態查找數據結構,以其簡潔的設計和良好的性能表現受到了廣泛的關注。與傳統的平衡樹相比,跳表不僅實現了相似的時間復雜度,而且其插入、刪除和查找操作更加直觀易懂。
詳解redis中跳表的設計與實現
1. 跳表的數據結構
我們先從跳表的每一個節點說起,為了保證跳表節點的有序性,跳表中的每一個節點都會用zskiplistNode 來維護節點信息:
- score來記錄當前節點的數值,插入跳表時就會按照score進行升序排列。
- obj來存儲當前節點實際要存儲的元素值。
- backward記錄當前節點的后一個節點,這個節點的score小于當前節點。
- level是一個數組,它記錄當前節點有索引的層級,每個索引都有指向當前節點的前驅節點指針forward和當前節點與forward的跨度span構成。
如下所示,可以看到跳表默認情況下有個header節點作為首節點,每個節點level索引都會記錄前驅節點的指針,而各個節點的backward則會指向自己的后繼節點,而節點之間的跨度也是用span來記錄:
注意:跳表的前驅后繼節點與鏈表的區別,在鏈表中前驅指的是自左向右看,排在自己前面的節點,而后繼節點指的是自左向右看排在自己右邊的節點。而跳表中前驅指的是自右向左看排在自己左邊的節點也就是小于自己的節點,而后繼節點是自右向左看排在自己右邊也就是大于自己的節點,這個概念會貫穿全文,希望讀者可以理解這個概念后再閱讀后續部分的源碼分析。
對應的我們也給出跳表節點的源碼,讀者可基于描述自行理解閱讀:
typedef struct zskiplistNode {
//記錄當前節點實際存儲的數據
robj *obj;
//記錄當前節點的數值,用于排序
double score;
//指向自己的后繼節點
struct zskiplistNode *backward;
//每個節點對應的索引
struct zskiplistLevel {
//記錄自己的前驅節點
struct zskiplistNode *forward;
//記錄前驅節點的跨度
unsigned int span;
} level[];
} zskiplistNode;
了解了節點的概念,我們再來串聯一下跳表的邏輯結構,跳表本質上就是上述節點的串聯:
- 通過header指針記錄跳表的第一個節點。
- 通過tail指針記錄跳表的尾節點。
- 為保證快速獲取跳表的長度,它也會使用length來記錄跳表中的節點數。
- 通過level記錄當前跳表最高層級。
我們基于上圖繼續補充這些概念:
同時我們也給出跳表zskiplist 的源碼:
typedef struct zskiplist {
//記錄頭尾節點
struct zskiplistNode *header, *tail;
//記錄跳表長度
unsigned long length;
//記錄當前索引最高層級
int level;
} zskiplist;
2. 跳表初始化
有了上述的概念之后,對于跳表初始化的邏輯就可以很直觀了解了,在redis中跳表初始化函數為zslCreate,其內部邏輯本質上就是初始化一個跳表,然后對跳表節點個數、頭節點索引、數值、score進行初始化,邏輯比較簡單,讀者可以參照筆者的注釋自行閱讀理解:
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
//初始化跳表索引層級為1
zsl->level = 1;
//跳表中節點數為0
zsl->length = 0;
//初始化header節點
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
//初始化header的前驅指針為空,對應跨度為0
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
//跳表的頭節點的后繼節點設置為空
zsl->header->backward = NULL;
//跳表尾節點指針設置為null
zsl->tail = NULL;
return zsl;
}
3. 跳表節點插入操作的實現(重點)
跳表的插入操作是整個數據結構的核心,只要了解了跳表的插入操作,我們就可以理解整個跳表數據結構算法的思想,這里筆者就以插入一個元素x為例演示一下跳表的插入過程。
在進行插入操作前,跳表首先會初始化update數組和rank數組,update數組記錄索引每一層中小于插入節點x的score的最大score對應的節點,例如我們要插入一個score為3.5的節點,當前跳表第二層索引分別有1、2、3、4、5,那么3就是update[1]要記錄的值。又假設1-5之間跨度都為1,從1-3跨了兩步,所以rank[1]的值就是2。
通過update和rank的配合,一輪O(logN)的遍歷即可找到x每一層索引和節點的插入位置。
我們現在以下面這張圖演示一下跳表的插入過程,可以看到筆者對每個節點的地址addr和score都進行標明:
假設我們要插入的節點x的score為1.5,從level 2開始看到第一個節點的后繼節點為空,所以update[1](1代表level2)指針記錄header節點的地址,也就是0x8943,然后索引向下一層走,走到第二個節點時發現前方節點的值2大于x的score,所以update[0]記錄這一層小于x的最大值1也就是node-1的地址0x8944。
自此我們遍歷完各層索引,下一步就是基于update和rank數組進行節點x插入:
重點來了,建議讀者基于上一步的圖片了解筆者這一步的表述,基于上一步update數組標注的元素指針,我們假設x創建的索引層級也是2,第2層則是指向第一個元素的,所以x的索引就插入到這個索引0的前面,同時我們需要計算這個索引的到后面一個節點的span,對應的結算方式如下:
- 索引1節點每個節點都有,所以跨度為0
- 索引2的節點0原本到NULL的跨度rank為0,即本層小于x的最大節點就是第一個
- 索引1到update數組節點跨度為1,即走一步就是小于x的最大節點
- 索引1的跨度-索引2的跨度得出新插入節點x到下一個節點距離為1,所以span為1
- 而索引2的第一個節點的span也要更新,同樣是索引1的跨度-索引2的跨度=索引2還需跨幾步到達x節點的前一個節點位置,然后再加1 就是走到節點x的跨度,對應的值也為2
最后新插入的節點x如果前方有值,則讓前方節點的backward指針指向x,若沒有則說明x是尾節點,直接用tail指針指向該節點即可,完成后結果大體如下圖所示:
對應的我們也給出redis中對于跳表節點插入實現的代碼,讀者可參考上述講解并結合參考了解過程:
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
redisAssert(!isnan(score));
//獲取到header的指針
x = zsl->header;
//從跳表的最高層的level開始進行遍歷(level默認值為0)
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
//如果這層是最高層,則rank取0,反之取上一層的跨步直接到達下一個節點
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
//如果前方的節點的scoer小于自己,或者score一樣但是字符串結果小于當前待插入節點的score
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
//通過累加的方式記錄這一層往前跨了幾步
rank[i] += x->level[i].span;
//然后節點往前走
x = x->level[i].forward;
}
//update找到小于當前socre的最大節點,即update記錄各層小于插入節點的最大值
update[i] = x;
}
/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not. */
level = zslRandomLevel();
//如果生成等級大于當前跳表最大等級
if (level > zsl->level) {
//從當前調跳表最高層級開始,初始化rank和update數組
for (i = zsl->level; i < level; i++) {
//rank設置為0
rank[i] = 0;
//高層級update內部節點全部指向header
update[i] = zsl->header;
//header在該層的全部取跳表的長度
update[i]->level[i].span = zsl->length;
}
//更新為最新的跨度
zsl->level = level;
}
//創建節點
x = zslCreateNode(level,score,obj);
//自低向高層處理新節點x的各層索引
for (i = 0; i < level; i++) {
//x的i層索引的前驅指針指向本層score小于x的score的最大score對應的節點
x->level[i].forward = update[i]->level[i].forward;
//score小于x的socre的最大值的節點的前驅指針指向x
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
//通過update[i]指向的節點的span減去(rank[0] - rank[i])即可得到x到update[i]的前驅節點的跨度
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
//通過(rank[0] - rank[i])得到update[i]這個節點到到x節點實際后繼節點的距離,最后+1得到update[i]到x節點的跨度
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
//上述步驟保證高層新建索引的頭節點索引指向x節點,這里span自增一下
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
//如果小于x的值最大節點是頭節點,則后方指針指向null,反之指向節點
x->backward = (update[0] == zsl->header) ? NULL : update[0];
//如果節點前方指針有節點,則前方節點的backward指向當前節點
if (x->level[0].forward)
x->level[0].forward->backward = x;
else //反之說明這是第一個節點,直接設置為尾節點
zsl->tail = x;
//更新跳表長度
zsl->length++;
return x;
}
4. 跳表查詢操作的實現
有了上述查詢操作的基礎之后,對于跳表的查詢操作就很好理解了,redis用跳表主要是進行范圍查詢,這里我們就以一個查詢元素排名的實示例演示一下這個過程,以下面這張圖為例,查找一下score為3的元素,除去頭節點它就是第3個元素,所以跳表進行等級查詢時返回結果就是3:
對應的搜索步驟為:
- 從header的2級索引開始,查看第一個節點的后繼節點score為2,小于3,直接前進rank+2。
- level2層級后續沒有節點了,索引向下。
- 來到level1級別的的結點2的索引位置,繼續向前發現節點等于3直接前進,rank+1。
自此,整個搜索過程就完成了,最終返回的結果就是2+1即3:
對應的我們給出等級查詢的源碼,讀者可參考上述步驟并結合筆者的注釋了解過程:
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
//定位到跳表的頭節點
x = zsl->header;
//從當前跳表最高層索引開始搜索
for (i = zsl->level-1; i >= 0; i--) {
/**
* 符合以下條件就向前移動,并記錄移動的span:
* 1. 前驅節點的score小于要搜索的節點值
* 2. 前驅節點的score等于當前節點,當時按照字母序排列小于等于自己
*/
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
//如果得到的元素等于要搜索的結果,則返回累加的rank
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->obj && equalStringObjects(x->obj,o)) {
return rank;
}
}
//什么都沒查找到,直接返回0
return 0;
}unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
//定位到跳表的頭節點
x = zsl->header;
//從當前跳表最高層索引開始搜索
for (i = zsl->level-1; i >= 0; i--) {
/**
* 符合以下條件就向前移動,并記錄移動的span:
* 1. 前驅節點的score小于要搜索的節點值
* 2. 前驅節點的score等于當前節點,當時按照字母序排列小于等于自己
*/
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
//如果得到的元素等于要搜索的結果,則返回累加的rank
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->obj && equalStringObjects(x->obj,o)) {
return rank;
}
}
//什么都沒查找到,直接返回0
return 0;
}
5. 跳表的刪除操作
跳表的節點刪除操作主要是完成以下3件事:
- 刪除節點。
- 將刪除節點的前后節點關聯,并維護兩者之間的跨度。
- 更新跳表索引,如果索引上沒有任何節點的索引,則直接刪除。
我們以下面這張圖為例,假設我們想刪除score為1.5的節點,對應步驟為:
- 從最高層索引開始,找到各層索引小于1.5的最大值對應的節點,以筆者為例update[2]記錄header,update[1]記錄header地址,update[0]記錄索引1的地址。
- 基于上述update數組,update[2]即3級索引中找到的指針header,但是其前驅節點并不是1.5,所以進行span減1的操作,表示后續1.5被刪除之后跨度為2。
- 索引2級中小于1.5的最大值也是header,其前驅節點是1.5,此時我們就需要修改一下1.5索引前后的索引關系,讓header指向節點2,跨度為header到node-1.5的距離加上1.5索引到2的距離得到當前header到node-2的距離,最后再減去1,即得到刪除1.5后兩者之間的距離。
- 1級索引處理步驟和步驟3差不多,這里就不多做強調了。
這里我們貼出跳表節點刪除操作的源碼,可以看到這段代碼會通過update記錄各層索引中小于被刪節點的最大值對應的節點。然后調用zslDeleteNode處理這各層索引的刪除,最后調用zslFreeNode將這個節點刪除:
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
//定位到頭節點
x = zsl->header;
//自頂向下基于索引查找
for (i = zsl->level-1; i >= 0; i--) {
//找到小于待刪除節點obj的score的最大節點,或者找到score相等,但是字母序比對結果小于obj的最大值節點
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
//記錄本層索引小于obj的最大值節點
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
//如果比對一直則執行刪除操作并返回1
x = x->level[0].forward;
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
return 1;
}
return 0; /* not found */
}
最后我們再貼出刪除節點x時,對各級索引進行前后關系維護操作的源碼:
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
//從跳表所維護的最高層級索引開始遍歷update數組
for (i = 0; i < zsl->level; i++) {
//如果本層的update節點的索引前驅指針是x,則讓這個節點
if (update[i]->level[i].forward == x) {
//更新該節點span為到x的span+x到后繼節點跨步,再減去1(x將會被刪除)
update[i]->level[i].span += x->level[i].span - 1;
//當前節點的索引指向被刪節點的前驅指針
update[i]->level[i].forward = x->level[i].forward;
} else {
//說明本層小于x最大值的索引前驅節點不是指向x,直接跨度減去1(因為x要被刪除,后續少跨一步)
update[i]->level[i].span -= 1;
}
}
//如果x的前驅指針有值,則讓前驅指針的后繼節點指向x的后繼節點
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
//反之說明x是尾指針,刪除x后讓x的后繼節點作為尾節點
zsl->tail = x->backward;
}
//查看當前最上層跳表索引是否空了,如果空了則刪除該層索引
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
//節點數減去1
zsl->length--;
}
結語
Redis 的跳表設計通過多層指針的巧妙運用,不僅實現了高效的查找、插入和刪除操作,還保持了較低的空間開銷。這種數據結構的優勢在于它能夠在平均時間復雜度為 O(log n) 的情況下完成上述操作,這使得 Redis 在處理大量數據時依然能夠保持高性能。此外,跳表的設計簡單直觀,易于實現和維護,這也進一步增強了其在實際應用中的吸引力。總之,Redis 跳表的成功案例證明了合理選擇和優化數據結構對于構建高效系統的重要性。