成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

聊聊 Redis 集群數(shù)據(jù)遷移

數(shù)據(jù)庫(kù) Redis
?本文是筆者對(duì)于Redis?源碼分析的一個(gè)階段,將從源碼分析的角度讓讀者深入了解Redis節(jié)點(diǎn)遷移的工作流程,希望對(duì)你有幫助。


詳解redis cluster數(shù)據(jù)遷移過(guò)程

節(jié)點(diǎn)基本結(jié)構(gòu)定義

redis集群提供16384個(gè)slot,我們可以按需分配給節(jié)點(diǎn)上,后續(xù)進(jìn)行鍵值對(duì)存儲(chǔ)時(shí),我們就可以按照算法將鍵值對(duì)存到對(duì)應(yīng)slot上的redis服務(wù)器上:

集群節(jié)點(diǎn)本質(zhì)就是通過(guò)slots這個(gè)數(shù)組記錄當(dāng)前節(jié)點(diǎn)的所管理的情況,這里我們可以看到slots是一個(gè)char 數(shù)組,長(zhǎng)度為REDIS_CLUSTER_SLOTS(16384)除8,這樣做的原因是因?yàn)?

  • char占1個(gè)字節(jié),每個(gè)字節(jié)8位。
  • 每個(gè)char可以記錄8個(gè)slot的情況,如果是自己的slot則對(duì)應(yīng)char的某一個(gè)位置記錄為1:

我們以node-1為例,因?yàn)樗?fù)責(zé)0-5460的節(jié)點(diǎn),所以它的slots0-5460都為1,對(duì)應(yīng)的圖解如下所示,可以看到筆者這里省略了后半部分,僅僅表示了0-15位置為1:

對(duì)此我們也給出這段redis中節(jié)點(diǎn)的定義,即位于cluster.h中的clusterNode這個(gè)結(jié)構(gòu)體中,可以看slots這段定義:

typedef struct clusterNode {
  //......
    //記錄集群負(fù)責(zé)的槽,總的為16384
    unsigned char slots[REDIS_CLUSTER_SLOTS/8]; 
    //......
}

設(shè)置slot后續(xù)節(jié)點(diǎn)走向

以本文示例為例,我們希望后續(xù)節(jié)點(diǎn)2的數(shù)據(jù)全部存到節(jié)點(diǎn)1中,那么我們首先需要鍵入如下兩條配置:

# 在節(jié)點(diǎn)1上執(zhí)行,將節(jié)點(diǎn)2數(shù)據(jù)導(dǎo)入到節(jié)點(diǎn)1上
 CLUSTER SETSLOT 3 IMPORTING node2
 # 在節(jié)點(diǎn)2上執(zhí)行,將自己的數(shù)據(jù)遷移到節(jié)點(diǎn)1
 CLUSTER SETSLOT 3 MIGRATING node1

這兩條指最終都會(huì)被各自的服務(wù)端解析,并調(diào)用clusterCommand執(zhí)行,我們以節(jié)點(diǎn)1導(dǎo)入為例,假設(shè)我們執(zhí)行clusterCommand解析到setslot 關(guān)鍵字和importing關(guān)鍵字,即知曉要導(dǎo)入其他節(jié)點(diǎn)的數(shù)據(jù)。對(duì)應(yīng)的節(jié)點(diǎn)1就會(huì)通過(guò)importing_slots_from數(shù)組標(biāo)記自己將導(dǎo)入這個(gè)slot的數(shù)據(jù),而節(jié)點(diǎn)2也會(huì)通過(guò)migrating_slots_to數(shù)組標(biāo)記自己要將數(shù)據(jù)導(dǎo)出給其他節(jié)點(diǎn)的slot:

對(duì)此我們給出clusterCommand的執(zhí)行流程,可以看到該函數(shù)解析出migrating或者importing關(guān)鍵字時(shí)就會(huì)將對(duì)的migrating_slots_to或者importing_slots_from數(shù)組對(duì)應(yīng)slot位置的索引位置設(shè)置為當(dāng)前上述命令傳入的node id:

void clusterCommand(redisClient *c) {
     //......

        if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {//處理遷出的邏輯
            //看看自己是否有遷出的slot,沒(méi)有則報(bào)錯(cuò)
            if (server.cluster->slots[slot] != myself) {
                addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
                return;
            }
            //查看自己是否知曉這個(gè)node id,如果沒(méi)有則報(bào)錯(cuò)
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            //標(biāo)記遷出到slot為傳入的node
            server.cluster->migrating_slots_to[slot] = n;
        } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {//處理遷入的邏輯
            //查看遷入的slot是否已經(jīng)配置,如果有則報(bào)錯(cuò)
            if (server.cluster->slots[slot] == myself) {
                addReplyErrorFormat(c,
                    "I'm already the owner of hash slot %u",slot);
                return;
            }
            //查看自己是否知曉要遷入數(shù)據(jù)的node的信息,如果不知道則報(bào)錯(cuò)
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[3]->ptr);
                return;
            }
            //標(biāo)記遷入slot位置為傳入的nodeid
            server.cluster->importing_slots_from[slot] = n;
        } //......
}

后續(xù)的我們假設(shè)還是將set key value請(qǐng)求發(fā)送到節(jié)點(diǎn)2,因?yàn)樯鲜雒畹脑颍?jié)點(diǎn)會(huì)返回move/ask告知客戶(hù)端這個(gè)鍵值對(duì)現(xiàn)在要存到節(jié)點(diǎn)1上。對(duì)應(yīng)節(jié)點(diǎn)1收到這個(gè)key請(qǐng)求時(shí),通過(guò)key計(jì)算得slot正是自己,它就會(huì)將這個(gè)鍵值對(duì)存儲(chǔ)到自己的數(shù)據(jù)庫(kù)中:

這里我們以節(jié)點(diǎn)1的角度查看這個(gè)問(wèn)題,當(dāng)客戶(hù)端收到move指令后,繼續(xù)向節(jié)點(diǎn)1發(fā)送指令,節(jié)點(diǎn)1通過(guò)收到指令調(diào)用processCommand,其內(nèi)部調(diào)用getNodeByQuery獲取當(dāng)前key對(duì)應(yīng)的slot,發(fā)現(xiàn)是自己則直接存儲(chǔ)數(shù)據(jù)到當(dāng)前節(jié)點(diǎn)的內(nèi)存數(shù)據(jù)庫(kù)中:

int processCommand(redisClient *c) {
    //......
    //如果開(kāi)啟了集群模式,且發(fā)送者不是master且參數(shù)帶key則進(jìn)入邏輯
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_LUA_CLIENT &&
          server.lua_caller->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != REDIS_CLUSTER_OK) {
           //......
        } else {
            int error_code;
            //查找鍵值對(duì)對(duì)應(yīng)的slot和這個(gè)slot負(fù)責(zé)的節(jié)點(diǎn)
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            //如果為空且或者非自己,則轉(zhuǎn)交出去給別人處理
            if (n == NULL || n != server.cluster->myself) {
                flagTransaction(c);
                clusterRedirectClient(c,n,hashslot,error_code);
                return REDIS_OK;
            }
        }
    }
 //......
 //將鍵值對(duì)存儲(chǔ)到當(dāng)前數(shù)據(jù)庫(kù)中
}

我們以節(jié)點(diǎn)的視角再次直接步入getNodeByQuery查看這段邏輯,可以看到其內(nèi)部會(huì)基于key計(jì)算slot然后將得到對(duì)應(yīng)的node,如果發(fā)現(xiàn)這個(gè)node是自己且屬于importing_slots_from,即說(shuō)明是客戶(hù)端通過(guò)move或者ask請(qǐng)求找到自己的,則進(jìn)行進(jìn)一步是否是多條指令執(zhí)行且存在key找不到存儲(chǔ)位置的情況,若存在則返回空,反之都是直接返回當(dāng)前節(jié)點(diǎn)信息,即node2的新數(shù)據(jù)直接遷移過(guò)來(lái):

clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
    //......
    //遍歷命令
    for (i = 0; i < ms->count; i++) {
       //.....
  //獲取指令、參數(shù)個(gè)數(shù)、參數(shù)
        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;
        //解析出key以及個(gè)數(shù)
        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        for (j = 0; j < numkeys; j++) {
            //拿到key
            robj *thiskey = margv[keyindex[j]];
            //計(jì)算slot
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));

             //.....
                //如果就是當(dāng)前節(jié)點(diǎn)正在做遷出或者遷入,則migrating_slot/importing_slot設(shè)置為1
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
            } else {
               //.....
//.....
        }
      //.....
    }
 //如果設(shè)置了導(dǎo)入標(biāo)識(shí)為1且標(biāo)識(shí)為asking則步入這段邏輯,
 if (importing_slot &&
        (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
    { //當(dāng)前指令有多個(gè)key且存在未命中的則返回空,反之返回自己
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    //.....
    //返回節(jié)點(diǎn)信息以本示例來(lái)說(shuō)就是返回當(dāng)前節(jié)點(diǎn)信息
    return n;
}

完成節(jié)點(diǎn)遷移

上述操作僅僅針對(duì)新節(jié)點(diǎn)的遷移,對(duì)于舊的節(jié)點(diǎn)我們就需要通過(guò)節(jié)點(diǎn)2鍵入CLUSTER GETKEYSINSLOT slot count要遷移的舊的key的slot,然后通過(guò)MIGRATE host port key dbid timeout [COPY | REPLACE]將數(shù)據(jù)遷移到節(jié)點(diǎn)1上。 這里我們補(bǔ)充一下MIGRATE 中copy和replace的區(qū)別,前者是遇到重復(fù)直接報(bào)錯(cuò),后者是遷移時(shí)直接覆蓋。 最終這條指令回基于要遷移的key而生成一條RESTORE-ASKING key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency]指令發(fā)送給導(dǎo)入的節(jié)點(diǎn),以本文例子來(lái)說(shuō)就是節(jié)點(diǎn)1:

這里我們給出MIGRATE 指令對(duì)應(yīng)的處理函數(shù)migrateCommand,邏輯和我上文說(shuō)的差不多,基于指令解析出replace或者copy等信息,然后用argv[3]即我們的key得出這個(gè)鍵值對(duì)的信息生成RESTORE指令將鍵值對(duì)轉(zhuǎn)存給節(jié)點(diǎn)1:

/* 命令 MIGRATE host port key dbid timeout [COPY | REPLACE] */
void migrateCommand(redisClient *c) {
   
    //......
    //解析拷貝和替代選項(xiàng),前者重復(fù)會(huì)報(bào)錯(cuò)
    for (j = 6; j < c->argc; j++) {
        if (!strcasecmp(c->argv[j]->ptr,"copy")) {
            copy = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
            replace = 1;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

  //......
    //查看要遷移的key是否存在嗎,如果不存則直接報(bào)錯(cuò)返回
    if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
        addReplySds(c,sdsnew("+NOKEY\r\n"));
        return;
    }

    /* Connect */
    //建立socket連接
    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    //......

    //cmd初始化一個(gè)buf緩沖區(qū)
    rioInitWithBuffer(&cmd,sdsempty());

    /* Send the SELECT command if the current DB is not already selected. */
    //如果尚未選擇當(dāng)前DB,則發(fā)送SELECT命令。
    int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
    if (select) {
        redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
        redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
    }

    /* Create RESTORE payload and generate the protocol to call the command. */
    //獲取key的過(guò)期時(shí)效
    expireat = getExpire(c->db,c->argv[3]);
    if (expireat != -1) {
        ttl = expireat-mstime();
        if (ttl < 1) ttl = 1;
    }
 
    //集群用RESTORE-ASKING發(fā)送key給目標(biāo)
    if (server.cluster_enabled)
        redisAssertWithInfo(c,NULL,
            rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
    else
        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
   //填充key和value ttl等
    redisAssertWithInfo(c,NULL,sdsEncodedObject(c->argv[3]));
    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,
            sdslen(c->argv[3]->ptr)));
    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));

   //......
    //遷移指令字符串寫(xiě)入緩沖區(qū)
    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                                sdslen(payload.io.buffer.ptr)));
   //......
    //如果是replace發(fā)出 REPLACE
    if (replace)
        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));

  

 //......
}

最后調(diào)整

最后我們只需在節(jié)點(diǎn)1和2都執(zhí)行CLUSTER SETSLOT <SLOT> NODE <NODE ID> 完成slot指派,這指令最終就會(huì)走到clusterCommand中,節(jié)點(diǎn)1和節(jié)點(diǎn)2格子的處理邏輯為:

  • 節(jié)點(diǎn)2看看遷移的key是否不存則且migrating_slots_to數(shù)據(jù)不為空,若符合要求說(shuō)明遷移完成但狀態(tài)未修改,直接將migrating_slots_to置空完成指派最后調(diào)整。
  • 節(jié)點(diǎn)1查看節(jié)點(diǎn)id是否是自己且importing_slots_from是否有數(shù)據(jù),若有則說(shuō)明節(jié)點(diǎn)導(dǎo)入完成,直接將importing_slots_from置空。
void clusterCommand(redisClient *c) {
    //......
     else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {//處理setslot指令
          //......
   else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
            /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> 標(biāo)記最終遷移的節(jié)點(diǎn) */
            clusterNode *n = clusterLookupNode(c->argv[4]->ptr);

           //......
            //如果發(fā)現(xiàn)對(duì)應(yīng)的key為0,且migrating_slots_to不為空,則說(shuō)明遷出完成但狀態(tài)還未修改,節(jié)點(diǎn)2會(huì)將migrating_slots_to設(shè)置為空
            if (countKeysInSlot(slot) == 0 &&
                server.cluster->migrating_slots_to[slot])
                server.cluster->migrating_slots_to[slot] = NULL;

           //如果是節(jié)點(diǎn)1則會(huì)看指令的nodeid是否是自己且importing_slots_from是否有數(shù)據(jù),若有則說(shuō)明導(dǎo)入成功直接將importing_slots_from設(shè)置為空
            if (n == myself &&
                server.cluster->importing_slots_from[slot])
            {
              //......
                server.cluster->importing_slots_from[slot] = NULL;
            }
           
        }
  //......
}
責(zé)任編輯:趙寧寧 來(lái)源: 寫(xiě)代碼的SharkChili
相關(guān)推薦

2025-02-24 10:07:09

Redis節(jié)點(diǎn)遷移集群

2022-02-06 21:14:57

Redis命令

2022-02-09 15:36:49

Redis主從模式哨兵模式

2020-04-21 22:59:50

Redis搭建選舉

2024-09-11 20:05:56

2022-08-28 19:36:15

數(shù)據(jù)分片KafkaRocketMQ

2021-06-26 07:40:45

Greenplum集群部署

2018-02-02 16:15:02

Hadoop數(shù)據(jù)遷移集群

2022-05-09 07:35:48

動(dòng)態(tài)集群限流

2025-03-03 10:25:10

2024-01-15 16:51:03

Redis數(shù)據(jù)存儲(chǔ)

2022-03-03 09:51:11

RedisCouchbase數(shù)據(jù)存儲(chǔ)

2020-04-09 11:56:10

Elasticsear集群硬件

2023-02-01 13:22:00

數(shù)據(jù)庫(kù)表連接SQL

2020-09-24 06:49:34

PythonRedis

2021-01-26 07:11:26

Redis數(shù)據(jù)同步數(shù)據(jù)遷移

2022-06-21 07:51:06

Redis高可用哨兵進(jìn)程

2024-07-25 08:39:48

2023-04-06 08:00:36

VPC虛擬私有云Amazon

2024-10-23 08:13:30

Spring響應(yīng)式編程
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 欧美一区精品 | 日韩精品免费视频 | 免费午夜视频 | 久久精品视频在线观看 | 国产视频2021| 亚洲欧美日韩网站 | 一级a性色生活片久久毛片 午夜精品在线观看 | 欧美色综合一区二区三区 | 中文av网站 | 亚洲一区二区中文字幕在线观看 | 欧美成人精品一区二区三区 | h片在线看 | 欧美激情a∨在线视频播放 成人免费共享视频 | 国产一区不卡 | 免费在线视频精品 | 黄在线免费观看 | 亚洲大片一区 | 欧美中文字幕一区 | 久久精品中文字幕 | 中文字幕在线观看视频一区 | 日韩欧美在线观看视频网站 | 伊人久久成人 | 日日天天 | 大香网伊人 | 看片地址 | 国产成人精品网站 | 女生羞羞网站 | 日本久久久一区二区三区 | 国产日韩欧美综合 | av免费在线播放 | 丁香久久 | 日韩欧美成人精品 | 日本在线免费 | 91精品亚洲| 日本视频在线 | 亚洲高清在线 | 成人在线精品视频 | 亚洲第一av网站 | 91精品国产色综合久久 | 久久伊人青青草 | 免费在线观看成年人视频 |