成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

來聊聊 Redis 集群數據遷移

數據庫 Redis 開發
本文將是筆者對于Redis源碼分析的一個階段的最后一篇,將從源碼分析的角度讓讀者深入了解redis節點遷移的工作流程,希望對你有幫助。

一、詳解redis cluster數據遷移過程

1. 節點基本結構定義

redis集群提供16384個slot,我們可以按需分配給節點上,后續進行鍵值對存儲時,我們就可以按照算法將鍵值對存到對應slot上的redis服務器上:

集群節點本質就是通過slots這個數組記錄當前節點的所管理的情況,這里我們可以看到slots是一個char 數組,長度為REDIS_CLUSTER_SLOTS(16384)除8,這樣做的原因是因為:

  • char占1個字節,每個字節8位。
  • 每個char可以記錄8個slot的情況,如果是自己的slot則對應char的某一個位置記錄為1:

我們以node-1為例,因為它負責0-5460的節點,所以它的slots0-5460都為1,對應的圖解如下所示,可以看到筆者這里省略了后半部分,僅僅表示了0-15位置為1:

對此我們也給出這段redis中節點的定義,即位于cluster.h中的clusterNode這個結構體中,可以看slots這段定義:

typedef struct clusterNode {
  //......
    //記錄集群負責的槽,總的為16384
    unsigned char slots[REDIS_CLUSTER_SLOTS/8]; 
    //......
}

2. 設置slot后續節點遷移

以本文示例為例,我們希望后續節點2的數據全部存到節點1中,那么我們首先需要鍵入如下兩條配置:

# 在節點1上執行,將節點2數據導入到節點1上
 CLUSTER SETSLOT 3 IMPORTING node2
 # 在節點2上執行,將自己的數據遷移到節點1
 CLUSTER SETSLOT 3 MIGRATING node1

這兩條指最終都會被各自的服務端解析,并調用clusterCommand執行,我們以節點1導入為例,假設我們執行clusterCommand解析到setslot 關鍵字和importing關鍵字,即知曉要導入其他節點的數據。對應的節點1就會通過importing_slots_from數組標記自己將導入這個slot的數據,而節點2也會通過migrating_slots_to數組標記自己要將數據導出給其他節點的slot:

對此我們給出clusterCommand的執行流程,可以看到該函數解析出migrating或者importing關鍵字時就會將對的migrating_slots_to或者importing_slots_from數組對應slot位置的索引位置設置為當前上述命令傳入的node id:

void clusterCommand(redisClient *c) {
     //......

        if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {//處理遷出的邏輯
            //看看自己是否有遷出的slot,沒有則報錯
            if (server.cluster->slots[slot] != myself) {
                addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
                return;
            }
            //查看自己是否知曉這個node id,如果沒有則報錯
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            //標記遷出到slot為傳入的node
            server.cluster->migrating_slots_to[slot] = n;
        } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {//處理遷入的邏輯
            //查看遷入的slot是否已經配置,如果有則報錯
            if (server.cluster->slots[slot] == myself) {
                addReplyErrorFormat(c,
                    "I'm already the owner of hash slot %u",slot);
                return;
            }
            //查看自己是否知曉要遷入數據的node的信息,如果不知道則報錯
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[3]->ptr);
                return;
            }
            //標記遷入slot位置為傳入的nodeid
            server.cluster->importing_slots_from[slot] = n;
        } //......
}

3. 請求重定向問題

后續的我們假設還是將set key value請求發送到節點2,因為上述命令的原因,節點會返回move/ask告知客戶端這個鍵值對現在要存到節點1上。對應節點1收到這個key請求時,通過key計算得slot正是自己,它就會將這個鍵值對存儲到自己的數據庫中:

這里我們以節點1的角度查看這個問題,當客戶端收到move指令后,繼續向節點1發送指令,節點1通過收到指令調用processCommand,其內部調用getNodeByQuery獲取當前key對應的slot,發現是自己則直接存儲數據到當前節點的內存數據庫中:

int processCommand(redisClient *c) {
    //......
    //如果開啟了集群模式,且發送者不是master且參數帶key則進入邏輯
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_LUA_CLIENT &&
          server.lua_caller->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != REDIS_CLUSTER_OK) {
           //......
        } else {
            int error_code;
            //查找鍵值對對應的slot和這個slot負責的節點
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            //如果為空且或者非自己,則轉交出去給別人處理
            if (n == NULL || n != server.cluster->myself) {
                flagTransaction(c);
                clusterRedirectClient(c,n,hashslot,error_code);
                return REDIS_OK;
            }
        }
    }
 //......
 //將鍵值對存儲到當前數據庫中
}

我們以節點的視角再次直接步入getNodeByQuery查看這段邏輯,可以看到其內部會基于key計算slot然后將得到對應的node,然后進行如下判斷:

  • 如果本次客戶端請求是一個批量的請求,且第一個key定位不到響應的slot,直接返回錯誤。
  • 如果key的slot屬于當前節點,且當前節點正在遷出并且當前節點查不到這個key,則響應一個ask標識告知客戶端到遷出的節點詢問一下是否有數據。
  • 如果是key屬于當前節點且正在進行導入,且key定位不到則響應異常,反之說明當前節點導入成功,直接返回當前節點信息。
  • 如果定位到的slot屬于別的節點,則響應一個move告知客戶端到別的節點獲取鍵值對。

對應的我們給出這段代碼函數getNodeByQuery,對應的邏輯和筆者上述給出的核心分支一致:

clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
   //.......

   
    //如果是exec命令則用mstate封裝這些命令
    if (cmd->proc == execCommand) {
        /* If REDIS_MULTI flag is not set EXEC is just going to return an
         * error. */
        if (!(c->flags & REDIS_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        //為了一個原子指向,創建一個假的multi記錄這些指令
        ms = &_ms;
        _ms.commands = &mc;
        //命令個數1
        _ms.count = 1;
        //命令參數
        mc.argv = argv;
        //命令參數個數
        mc.argc = argc;
        //對應的命令
        mc.cmd = cmd;
    }

    
    //遍歷命令
    for (i = 0; i < ms->count; i++) {
      //.......
        //解析出key以及個數
        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        for (j = 0; j < numkeys; j++) {
            //拿到key
            robj *thiskey = margv[keyindex[j]];
            //計算slot
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));
   
            if (firstkey == NULL) {
              
                firstkey = thiskey;
                slot = thisslot;
                //拿著slot找到對應的集群節點
                n = server.cluster->slots[slot];

                //如果當前查詢的key是第一個key且找不到,則將錯誤碼設置為REDIS_CLUSTER_REDIR_DOWN_UNBOUND并返回空
                if (n == NULL) {
                    getKeysFreeResult(keyindex);
                    if (error_code)
                        *error_code = REDIS_CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                }

                //如果就是當前節點正在做遷出或者遷入,則migrating_slot/importing_slot設置為1
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
            } else {
             //.......
            }

         
            //如果正在做遷出或者嵌入找不到當前db找不到key的位置,則missing_keys++
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            {
                missing_keys++;
            }
        }
        getKeysFreeResult(keyindex);
    }

   
    //所有key都沒有對應slot節點,直接返回當前節點
    if (n == NULL) return myself;

  //.......
    //正在遷出且key找不到位置,錯誤碼設置為ask并返回遷出的目標節點,讓客戶端到別的節點嘗試看看
    if (migrating_slot && missing_keys) {
        if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK;
        return server.cluster->migrating_slots_to[slot];
    }

 //如果是節點正在導入且key找不到則返回,標識當前集群不穩定
    if (importing_slot &&
        (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
         //反之說明導入成功則告知自己可以找到這個鍵值對
            return myself;
        }
    }

  //......
    //返回其他節點,error_code設置為move
    if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
    return n;
}

4. 完成節點遷移

上述操作僅僅針對新節點的告知要處理的新的slot,對于舊的節點的舊有slot數據我們就需要通過節點2鍵入CLUSTER GETKEYSINSLOT slot count要遷移的舊的key的slot,然后通過MIGRATE host port key dbid timeout [COPY | REPLACE]將數據遷移到節點1上。 這里我們補充一下MIGRATE 中copy和replace的區別,前者是遇到重復直接報錯,后者是遷移時直接覆蓋。 最終這條指令回基于要遷移的key而生成一條RESTORE-ASKING key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency]指令發送給導入的節點,以本文例子來說就是節點1:

這里我們給出MIGRATE 指令對應的處理函數migrateCommand,邏輯和我上文說的差不多,基于指令解析出replace或者copy等信息,然后用argv[3]即我們的key得出這個鍵值對的信息生成RESTORE指令將鍵值對轉存給節點1:

/* 命令 MIGRATE host port key dbid timeout [COPY | REPLACE] */
void migrateCommand(redisClient *c) {
   
    //......
    //解析拷貝和替代選項,前者重復會報錯
    for (j = 6; j < c->argc; j++) {
        if (!strcasecmp(c->argv[j]->ptr,"copy")) {
            copy = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
            replace = 1;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

  //......
    //查看要遷移的key是否存在嗎,如果不存則直接報錯返回
    if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
        addReplySds(c,sdsnew("+NOKEY\r\n"));
        return;
    }

    /* Connect */
    //建立socket連接
    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    //......

    //cmd初始化一個buf緩沖區
    rioInitWithBuffer(&cmd,sdsempty());

    /* Send the SELECT command if the current DB is not already selected. */
    //如果尚未選擇當前DB,則發送SELECT命令。
    int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
    if (select) {
        redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
        redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
    }

    /* Create RESTORE payload and generate the protocol to call the command. */
    //獲取key的過期時效
    expireat = getExpire(c->db,c->argv[3]);
    if (expireat != -1) {
        ttl = expireat-mstime();
        if (ttl < 1) ttl = 1;
    }
 
    //集群用RESTORE-ASKING發送key給目標
    if (server.cluster_enabled)
        redisAssertWithInfo(c,NULL,
            rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
    else
        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
   //填充key和value ttl等
    redisAssertWithInfo(c,NULL,sdsEncodedObject(c->argv[3]));
    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,
            sdslen(c->argv[3]->ptr)));
    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));

   //......
    //遷移指令字符串寫入緩沖區
    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                                sdslen(payload.io.buffer.ptr)));
   //......
    //如果是replace發出 REPLACE
    if (replace)
        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));

  

 //......
}

5. 最后調整

最后我們只需在節點1和2都執行CLUSTER SETSLOT <SLOT> NODE <NODE ID> 完成slot指派,這指令最終就會走到clusterCommand中,節點1和節點2各自的處理邏輯為:

  • 節點2看看遷移的key的數量未0且migrating_slots_to數據不為空,若符合要求,則說明本次遷移完成但狀態未修改,直接將migrating_slots_to置空完成指派最后調整。
  • 節點1查看節點id是否是自己,且importing_slots_from是否有數據,若有則說明節點導入完成,直接將importing_slots_from置空。
void clusterCommand(redisClient *c) {
    //......
     else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {//處理setslot指令
          //......
   else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
            /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> 標記最終遷移的節點 */
            clusterNode *n = clusterLookupNode(c->argv[4]->ptr);

           //......
            //如果發現對應的key為0,且migrating_slots_to不為空,則說明遷出完成但狀態還未修改,節點2會將migrating_slots_to設置為空
            if (countKeysInSlot(slot) == 0 &&
                server.cluster->migrating_slots_to[slot])
                server.cluster->migrating_slots_to[slot] = NULL;

           //如果是節點1則會看指令的nodeid是否是自己且importing_slots_from是否有數據,若有則說明導入成功直接將importing_slots_from設置為空
            if (n == myself &&
                server.cluster->importing_slots_from[slot])
            {
              //......
                server.cluster->importing_slots_from[slot] = NULL;
            }
           
        }
  //......
}

二、小結

自此我們將redis集群中的所有核心設計都分析完成,我們來簡單小結一下整體過程:

  • 通過CLUSTER IMPORTING/MIGRATING 進行slot遷入或遷出,redis服務端通過一個數組維護遷入和遷出的slot的信息。
  • 后續客戶端發起請求獲取對應的slot的信息時,會通過上述兩個數組獲知節點遷移情況已做出結果響應。
  • 通過步驟1能夠告知對應的slot的新數據的存儲指向,對于舊數據我們還是需要通過指令完成遷移,其本質就是服務端定位到對應slot上的key然后生成RESP規范的協議指令通知到遷移的節點上。
  • 以遷出節點為例,看到自己對應slot的key為0且遷出數組非空,則說明遷出完成。

由此完成一次集群的遷移。

責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關推薦

2024-11-04 15:49:43

Redis?數據遷移

2025-03-03 10:25:10

2022-02-06 21:14:57

Redis命令

2022-02-09 15:36:49

Redis主從模式哨兵模式

2020-04-21 22:59:50

Redis搭建選舉

2024-09-11 20:05:56

2025-02-17 11:07:10

2022-08-28 19:36:15

數據分片KafkaRocketMQ

2024-02-04 09:00:00

向量查詢數據檢索MyScale

2024-02-21 08:19:54

2021-06-26 07:40:45

Greenplum集群部署

2018-02-02 16:15:02

Hadoop數據遷移集群

2022-05-09 07:35:48

動態集群限流

2024-01-15 16:51:03

Redis數據存儲

2022-03-03 09:51:11

RedisCouchbase數據存儲

2020-04-09 11:56:10

Elasticsear集群硬件

2023-02-01 13:22:00

數據庫表連接SQL

2021-01-26 07:11:26

Redis數據同步數據遷移

2020-09-24 06:49:34

PythonRedis

2019-01-28 09:32:30

跳槽員工程序員
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美一区视频 | 欧美一区二区三区在线看 | 精品无码三级在线观看视频 | 亚洲欧美在线视频 | 亚洲男人天堂2024 | 久久大全 | av三级在线观看 | 天天射影院 | 成人av在线大片 | 国产精品永久免费视频 | 欧美性区 | 四虎影| 神马影院一区二区三区 | 黄色电影在线免费观看 | 中文字幕av一区 | 国产成人精品一区二区三区四区 | 欧美一级二级视频 | 国产精品久久久久久久免费观看 | 嫩草91在线| 中文字幕精品一区二区三区精品 | 欧美另类视频在线 | 日韩欧美在线视频一区 | 中文字幕av亚洲精品一部二部 | 日韩中文字幕一区 | 一级毛片免费 | 亚洲高清视频在线观看 | 亚洲a人 | 成人福利在线视频 | 自拍偷拍第一页 | 欧美成人激情 | 国产精品资源在线 | 成人中文网 | 国产一二三区电影 | 欧美一区二| 欧美激情一区二区三区 | 久久aⅴ乱码一区二区三区 91综合网 | 在线色| 午夜视频精品 | 亚洲最新在线 | 手机av在线| 亚洲一区电影 |