巧用 Redis,實(shí)現(xiàn)微博 Feed 流功能!
一、背景
最近接到一個(gè)需求,用一句話來(lái)說(shuō)就是:展示關(guān)注人發(fā)布的動(dòng)態(tài),這個(gè)涉及到 feed 流系統(tǒng)的設(shè)計(jì)。本文主要介紹一個(gè)一般企業(yè)可用的 Feed 流解決方案。
二、相關(guān)概念
下面先介紹一下關(guān)于 Feed 流的簡(jiǎn)單概念。
1.什么是 feed 流
- Feed:Feed 流中的每一條狀態(tài)或者消息都是 Feed,比如微博中的一條微博就是一個(gè) Feed。
- Feed流:持續(xù)更新并呈現(xiàn)給用戶內(nèi)容的信息流。每個(gè)人的朋友圈,微博關(guān)注頁(yè)等等都是一個(gè) Feed 流。
2.feed 流分類
Feed 流常見的分類有兩種:
- Timeline:按發(fā)布的時(shí)間順序排序,產(chǎn)品如果選擇 Timeline 類型,那么就是認(rèn)為 Feed 流中的 Feed 不多,但是每個(gè) Feed 都很重要,都需要用戶看到。類似于微信朋友圈,微博等。
- Rank:按某個(gè)非時(shí)間的因子排序,一般是按照用戶的喜好度排序,一般用于新聞推薦類、商品推薦等。
三、設(shè)計(jì)
設(shè)計(jì)一個(gè) Feed 流系統(tǒng),兩個(gè)關(guān)鍵步驟,一個(gè)是 Feed 流的 初始化,一個(gè)是 推送。關(guān)于 Feed 流的存儲(chǔ)其實(shí)也是一個(gè)核心的點(diǎn),但是筆主持久化使用的還是 MySQL,后續(xù)可以考慮優(yōu)化。
1.Feed 流初始化
Feed 流【關(guān)注頁(yè) Feed 流】的初始化指的是,當(dāng)用戶的 Feed 流還不存在的時(shí)候,為該用戶創(chuàng)建一個(gè)屬于他自己的關(guān)注頁(yè) Feed 流,具體怎么做呢?其實(shí)很簡(jiǎn)單,遍歷一遍關(guān)注列表,取出所有關(guān)注用戶的 feed,將 feedId 存放到 redis 的 sortSet 中即可。這里面有幾個(gè)關(guān)鍵點(diǎn):
- 初始化數(shù)據(jù):初始化的數(shù)據(jù)需要從數(shù)據(jù)庫(kù)中加載出來(lái)。
- key 值:sortSet 的 key 值需要使用當(dāng)前用戶的 id 做標(biāo)識(shí)。
- score 值:如果是 Timeline 類型,直接取 feed 創(chuàng)建的時(shí)間戳即可。如果是 rank 類型,則把你的業(yè)務(wù)對(duì)應(yīng)的權(quán)重值設(shè)進(jìn)去。
2.推送
經(jīng)過上面的初始化,已經(jīng)把 feed 流放在了 redis 緩存中了。接下來(lái)就是需要更新 feed 流了,在下面四種情況需要進(jìn)行更新:
- 關(guān)注的用戶發(fā)布新的 feed:
- 關(guān)注的用戶刪除 feed。
- 用戶新增關(guān)注。
- 用戶取消關(guān)注。
3.發(fā)布/刪除 Feed 流程
上面四步具體怎么操作,會(huì)在下面的實(shí)現(xiàn)步驟中詳細(xì)描述,在這里先我們重點(diǎn)討論一下第一、二種情況。因?yàn)樵谔幚?大V 【千萬(wàn)級(jí)別粉絲】的時(shí)候,我們是需要對(duì) 大V 的所有粉絲的 feed 流進(jìn)行處理的,這時(shí)候涉及到的量就會(huì)非常巨大,需要多加斟酌。關(guān)于推送,一般有兩種 推/拉。
- 推:A用戶發(fā)布新的動(dòng)態(tài)時(shí),要往 A用戶所有的粉絲 feed 流中推。
- 拉:A用戶發(fā)布新的動(dòng)態(tài)時(shí),先不進(jìn)行推送,而是等 粉絲進(jìn)來(lái)的時(shí)候,才主動(dòng)到 A用戶的個(gè)人頁(yè)TimeLine 拉取最新的 feed,然后進(jìn)行一個(gè) merge。如果關(guān)注了多個(gè)大V,可以并發(fā)的向多個(gè)大V 個(gè)人頁(yè)TimeLine 中拉取。
4.推拉結(jié)合模式
當(dāng)用戶發(fā)布一條新的 Feed 時(shí),處理流程如下:
- 先從關(guān)注列表中讀取到自己的粉絲列表,以及判斷自己是否是大V。
- 將自己的Feed消息寫入個(gè)人頁(yè)Timeline。如果是大V,寫入流程到此就結(jié)束了。
- 如果是普通用戶,還需要將自己的Feed消息寫給自己的粉絲,如果有100個(gè)粉絲,那么就要寫給100個(gè)用戶。
當(dāng)刷新自己的Feed流的時(shí)候,處理流程如下:
- 先去讀取自己關(guān)注的大V列表
- 去讀取自己的 Feed 流。
- 如果有關(guān)注的大V,則再次并發(fā)讀取每一個(gè)大V的個(gè)人頁(yè)Timeline,如果關(guān)注了10個(gè)大V,那么則需要10次訪問。
- 合并2和3步的結(jié)果,然后按時(shí)間排序,返回給用戶。
至此,使用推拉結(jié)合方式的發(fā)布,讀取Feed流的流程都結(jié)束了。
5.推模式
如果只是用推模式了,則會(huì)變的比較簡(jiǎn)單:
- 「發(fā)布Feed:」不用區(qū)分是否大V,所有用戶的流程都一樣,都是三步。
- 「讀取Feed流:」不需要第一步,也不需要第三步,只需要第二步即可,將之前的2 + N(N是關(guān)注的大V個(gè)數(shù)) 次網(wǎng)絡(luò)開銷減少為 1 次網(wǎng)絡(luò)開銷。讀取延時(shí)大幅降級(jí)。
6.兩種模式總結(jié)
推拉結(jié)合存在一個(gè)弊端,就是刷新自己的Feed流時(shí),大V的個(gè)人頁(yè)Timeline 的讀壓力會(huì)很大。
如何解決:
- 不使用大V/普通用戶的優(yōu)化方式,使用對(duì)活躍粉絲采用推模式,非活躍粉絲采用拉模式。
- 完全使用推模式就可以徹底解決這個(gè)問題,但是會(huì)帶來(lái)存儲(chǔ)量增大,大V Feed 發(fā)送總時(shí)間增大,從發(fā)給第一個(gè)粉絲到發(fā)給最后一個(gè)粉絲可能要幾分鐘時(shí)間。
四、實(shí)現(xiàn)
筆主主要采用純推模式實(shí)現(xiàn)了一個(gè)普通企業(yè)基本可用的 Feed 流系統(tǒng),下面介紹一下具體的實(shí)現(xiàn)代碼,主要包括3大個(gè)部分:
- 初始化 Feed 流。
- 關(guān)注的用戶發(fā)布/刪除 feed,該用戶的粉絲更新自己的Feed流。
- 用戶新增/取消關(guān)注,更新自己的Feed流。
1.初始化 Feed 流
當(dāng)用戶第一進(jìn)來(lái)刷新Feed 流,且 Feed 流還不存在時(shí),我們需要進(jìn)行初始化,初始化的具體代碼如下:核心思想就是從數(shù)據(jù)庫(kù)中l(wèi)oad出 feed 信息,塞到 zSet 中,然后分頁(yè)返回。
/**
* 獲取關(guān)注的人的信息流
*/
public List<FeedDto> listFocusFeed(Long userId, Integer page, Integer size) {
String focusFeedKey = "focusFeedKey" + userId;
// 如果 zset 為空,先初始化
if (!zSetRedisTemplate.exists(focusFeedKey)) {
initFocusIdeaSet(userId);
}
// 如果 zset 存在,但是存在 0 值
Double zscore = zSetRedisTemplate.zscore(focusFeedKey, "0");
if (zscore != null && zscore > 0) {
return null;
}
//分頁(yè)
int offset = (page - 1) * size;
long score = System.currentTimeMillis();
// 按 score 值從大到小從 zSet 中取出 FeedId 集合
List<String> list = zSetRedisTemplate.zrevrangeByScore(focusFeedKey, score, 0, offset, size);
List<FeedDto> result = new ArrayList<>();
if (QlchatUtil.isNotEmpty(list)) {
for (String s : list) {
// 根據(jù) feedId 從緩存中 load 出 feed
FeedDto feedDto = this.loadCache(Long.valueOf(s));
if (feedDto != null) {
result.add(feedDto);
}
}
}
return result;
}
/**
* 初始化關(guān)注的人的信息流 zSet
*/
private void initFocusFeedSet( Long userId) {
String focusFeedKey = "focusFeedKey" + userId;
zSetRedisTemplate.del(focusIdeaKey);
// 從數(shù)據(jù)庫(kù)中加載當(dāng)前用戶關(guān)注的人發(fā)布過的 Feed
List<Feed> list = this.feedMapper.listFocusFeed(userId);
if (QlchatUtil.isEmpty(list)) {
//保存0,避免空數(shù)據(jù)頻繁查庫(kù)
zSetRedisTemplate.zadd(focusFeedKey, 1, "0");
zSetRedisTemplate.expire(focusFeedKey, RedisKeyConstants.ONE_MINUTE * 5);
return;
}
// 遍歷 FeedList,把 FeedId 存到 zSet 中
for (Feed feed : list) {
zSetRedisTemplate.zadd(focusFeedKey, feed.getCreateTime().getTime(), feed.getId().toString());
}
zSetRedisTemplate.expire(focusFeedKey, 60 * 60 * 60);
}
2.關(guān)注的用戶發(fā)布/刪除新的 feed
每當(dāng)用戶發(fā)布/刪除新的 feed,我們需要更新該用戶所有的粉絲的 Feed流,該步驟一般比較耗時(shí),所以建議異步處理,為了避免一次性load出太多的粉絲數(shù)據(jù),這里采用循環(huán)分頁(yè)查詢。為了避免粉絲的 Feed流過大,我們會(huì)限制 Feed 流的長(zhǎng)度為1000,當(dāng)Feed流長(zhǎng)度超過1000時(shí),會(huì)移除最舊的 Feed。
/**
* 新增/刪除 feed時(shí),處理粉絲 feed 流
*
* @param userId 新增/刪除 feed的用戶id
* @param feedId 新增/刪除 的feedId
* @param type feed_add = 新增feed feed_sub = 刪除feed
*/
public void handleFeed(Long userId, Long feedId, String type) {
Integer currentPage = 1;
Integer size = 1000;
List<FansDto> fansDtos;
while (true) {
Page page = new Page();
page.setSize(size);
page.setPage(currentPage);
fansDtos = this.fansService.listFans(userId, page);
for (FansDto fansDto : fansDtos) {
String focusFeedKey = "focusFeedKey" + userId;
// 如果粉絲 zSet 不存在,退出
if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
continue;
}
// 新增Feed
if ("feed_add".equals(type)) {
this.removeOldestZset(focusFeedKey);
zSetRedisTemplate.zadd(focusFeedKey, System.currentTimeMillis(), feedId);
}
// 刪除Feed
else if ("feed_sub".equals(type)) {
zSetRedisTemplate.zrem(focusFeedKey, feedId);
}
}
if (fansDtos.size() < size) {
break;
}
currentPage++;
}
}
/**
* 刪除 zSet 中最舊的數(shù)據(jù)
*/
private void removeOldestZset(String focusFeedKey){
// 如果當(dāng)前 zSet 大于1000,刪除最舊的數(shù)據(jù)
if (this.zSetRedisTemplate.zcard(focusFeedKey) >= 1000) {
// 獲取當(dāng)前 zSet 中 score 值最小的
List<String> zrevrange = this.zSetRedisTemplate.zrevrange(focusFeedKey, -1, -1, String.class);
if (QlchatUtil.isNotEmpty(zrevrange)) {
this.zSetRedisTemplate.zrem(focusFeedKey, zrevrange.get(0));
}
}
}
3.用戶新增關(guān)注/取消關(guān)注
這里比較簡(jiǎn)單,新增/取消關(guān)注,把新關(guān)注的 Feed 往自己的 Feed流中增加/刪除 即可,但是同樣需要異步處理。
/**
* 關(guān)注/取關(guān) 時(shí),處理followerId的zSet
*
* @param followId 被關(guān)注的人
* @param followerId 當(dāng)前用戶
* @param type focus = 關(guān)注 unfocus = 取關(guān)
*/
public void handleFocus( Long followId, Long followerId, String type) {
String focusFeedKey = "focusFeedKey" + userId;
// 如果粉絲 zSet 不存在,退出
if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
return;
}
List<FeedDto> FeedDtos = this.listFeedByFollowId(source, followId);
for (FeedDto feedDto : FeedDtos) {
// 關(guān)注
if ("focus".equals(type)) {
this.removeOldestZset(focusFeedKey);
this.zSetRedisTemplate.zadd(focusFeedKey, feedDto.getCreateTime().getTime(), feedDto.getId());
}
// 取關(guān)
else if ("unfocus".equals(type)) {
this.zSetRedisTemplate.zrem(focusFeedKey, feedDto.getId());
}
}
}
上面展示的是核心代碼,僅僅是為大家提供一個(gè)實(shí)現(xiàn)思路,并不是直接可運(yùn)行的代碼,畢竟真正實(shí)現(xiàn)還會(huì)涉及到很多其他的無(wú)關(guān)要緊的類。