Redis 如何高效實現(xiàn)定時任務(wù)
Redis通過單線程結(jié)合非阻塞事件輪詢機制實現(xiàn)高效的網(wǎng)絡(luò)IO和時間事件處理,這篇文章我們將從源碼的角度深入分析一下redis時間事件的設(shè)計與實現(xiàn)。
詳解redis中的時間事件
時間事件的定義
時間事件可以是單次到期執(zhí)行銷毀,也可以是定時任務(wù),對此redis對于時間事件統(tǒng)一封裝為aeTimeEvent對象,通過id來唯一標識一個事件,結(jié)合when_sec和when_ms記錄任務(wù)到期執(zhí)行的秒和分,而執(zhí)行時間事件的函數(shù)也是交由timeProc指針所指向的函數(shù)執(zhí)行。 我們以一個redis定時執(zhí)行的任務(wù)為例,如下所示,該結(jié)果通過when_sec和when_ms記錄秒之前的時間和毫秒的時間,一旦這個時間到了就會執(zhí)行timeProc這個函數(shù)指針所指向的方法serverCron,該函數(shù)會定期執(zhí)行各種任務(wù),這一點筆者會在后文展開:
對應(yīng)的我們給出時間事件的代碼描述,即位于ae.h這個頭文件中的aeTimeEvent 結(jié)構(gòu)體,這就是對時間事件的封裝結(jié)構(gòu)體,可以看到它除了筆者上述提到的核心字段以外,還有一個next指針用于連接下一個注冊的時間事件:
//時間事件
typedef struct aeTimeEvent {
//時間事件的id全局遞增
long long id; /* time event identifier. */
long when_sec; /* seconds */
//時間到達的時間
long when_ms; /* milliseconds */
//對應(yīng)時間時間的處理器
aeTimeProc *timeProc;
//......
//連接下一個時間時間
struct aeTimeEvent *next;
} aeTimeEvent;
上文提到redis的時間事件是以鏈表的形式關(guān)聯(lián)起來,這里我們也給出時間時間統(tǒng)一管理對象,即時間輪詢器aeEventLoop ,它通過timeEventHead記錄第一個時間時間而后續(xù)的時間時間統(tǒng)一用時間時間的next指針進行管理:
對應(yīng)我們也給出這段時間代碼的定義,即位于ae.h中aeEventLoop 的定義:
typedef struct aeEventLoop {
//......
//管理時間事件的列表
aeTimeEvent *timeEventHead;
//......
} aeEventLoop;
注冊時間事件
redis在服務(wù)器初始化階段,會注冊一個定時的時間事件,大約每1毫秒觸發(fā)一次,該事件主要做的是:
- 更新redis全局時鐘,該時鐘用于全局變量獲取時間用的。
- 隨機抽取redis內(nèi)存數(shù)據(jù)庫中的樣本刪除過期的鍵值對。
- 如果檢查到aof重寫完成,則進行刷盤操作。
- 如果發(fā)現(xiàn)當前aof大小過大,則fork子進程進行aof重寫操作。
- ......。
對應(yīng)我們給出時間事件注冊的源碼段,即redis初始化時調(diào)用的方法initServer中的aeCreateTimeEvent,可以看到它將定時任務(wù)封裝為時間事件timeEvent,并設(shè)置時間間隔為1毫秒一次:
void initServer(void) {
//......
/* Create the serverCron() time event, that's our main way to process
* background operations. */
//創(chuàng)建時間事件注冊到eventLoop->timeEventHead中
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
redisPanic("Can't create the serverCron time event.");
exit(1);
}
//......
}
輪詢處理時間事件
redis每次處理完所有用戶的請求之后,都會調(diào)用一次時間時間處理函數(shù)processTimeEvents,輪詢并處理就緒的時間事件,由此保證盡可能準時執(zhí)行時間事件,如果事件時間非定時任務(wù)則執(zhí)行完成直接刪除,反之設(shè)置下一次執(zhí)行時間。這些步驟全部完成之后,返回本次處理的時間事件數(shù):
我們給出處理時間循環(huán)的入口aeMain,可以看到該函數(shù)就是redis核心函數(shù)所在,它會循環(huán)調(diào)用aeProcessEvents處理各種事件:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
//處理各種事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
不如aeProcessEvents可以看到該函數(shù)執(zhí)行完所有用戶請求之后調(diào)用processTimeEvents方法獲取并執(zhí)行就緒的時間事件:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//......
//處理就緒的客戶端事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
//上述核心網(wǎng)絡(luò)IO事件完成后處理時間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
最后我們就可以看到處理時間事件的核心代碼段,其內(nèi)部會從timeEventHead開始輪詢就緒的時間事件,比對當前時間是否大于或者等于到期時間,如果是則執(zhí)行當前時間事件,再判斷這個事件是否是定時事件,如果是則更新下次執(zhí)行時間,反之刪除,最后累加本次處理的時間時間數(shù):
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
//......
if (now < eventLoop->lastTime) {
//從時間事件頭開始
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
//循環(huán)處理到期的時間事件
while(te) {
long now_sec, now_ms;
long long id;
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
//如果現(xiàn)在的事件大于到達時間
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
//調(diào)用時間時間函數(shù)處理該事件
retval = te->timeProc(eventLoop, id, te->clientData);
//更新處理數(shù)
processed++;
//.....
if (retval != AE_NOMORE) {//如果事件類型不是AE_NOMORE則說明是定時事件更新周期,反之刪除
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
aeDeleteTimeEvent(eventLoop, id);
}
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
redis對于時間事件實現(xiàn)上的優(yōu)化
因為時間事件有些要求定期執(zhí)行,所以redis為了保證時間執(zhí)行的實時性,做了如下兩個優(yōu)化:
- 對于比較耗時的時間事件,例如AOF重寫,通過fork子進程異步完成:
- 對于返回給客戶端套接字的內(nèi)容,如果長度超過預設(shè)的值,會主動讓出線程執(zhí)行權(quán),避免時間時間饑餓。
對應(yīng)的我們給出第一點時間時間對于aof重寫的核心代碼段,可以看到serverCron內(nèi)部判斷如果當前沒有rdb和aof子進程,且需要進行aof重寫則調(diào)用rewriteAppendOnlyFileBackground函數(shù)fork子進程進行aof重寫:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//......
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
//aof_rewrite_scheduled設(shè)置為1,且沒有其他持久化子進程則進行aof重寫,通過異步避免耗時
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
//......
}
//fork子進程進行aof重寫
int rewriteAppendOnlyFileBackground(void) {
//......
if ((childpid = fork()) == 0) {//fork子進程進行aof重寫
char tmpfile[256];
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-aof-rewrite");
//生成一個tmp文件
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {//重寫aof
size_t private_dirty = zmalloc_get_private_dirty();
//......
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
//......
}
return REDIS_OK; /* unreached */
}
而回復給客戶端結(jié)果的處理器sendReplyToClient內(nèi)部也有一段,判斷如果寫入數(shù)totwritten 大于REDIS_MAX_WRITE_PER_EVENT (宏定義為64M),則直接中止寫入,break退出等到下一次循環(huán)處理,避免因為這個處理導致其他時間事件饑餓而導致事件執(zhí)行延期:
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
//......
while(c->bufpos > 0 || listLength(c->reply)) {
//......
//對于文件事件數(shù)據(jù)寫入超長會讓出執(zhí)行權(quán)讓時間事件能夠盡可能的執(zhí)行
server.stat_net_output_bytes += totwritten;
if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory)) break;
}
//......
}