美團一面:項目中使用過Redis嗎?
引言
Redis,作為一種開源的、基于內存且支持持久化的鍵值存儲系統,以其卓越的性能、豐富靈活的數據結構和高度可擴展性在全球范圍內廣受歡迎。Redis不僅提供了一種簡單直觀的方式來存儲和檢索數據,更因其支持數據結構如字符串、哈希、列表、集合、有序集合等多種類型,使得其在眾多場景下表現出強大的適用性和靈活性。
Redis的核心特點包括:
- 高性能:基于內存操作,讀寫速度極快,特別適用于對性能要求高的實時應用。
- 數據持久化:支持RDB和AOF兩種持久化方式,確保即使在服務器重啟后也能恢復數據。
- 分布式的特性:通過主從復制、哨兵模式或集群模式,Redis可以輕松地構建高可用和可擴展的服務。
- 豐富的數據結構:提供了多種數據結構支持,便于開發人員根據實際需求進行數據建模和處理。
Redis的廣泛應用跨越了多個行業和技術領域,諸如網站加速、緩存服務、會話管理、實時統計、排行榜、消息隊列、分布式鎖、社交網絡功能、限流控制等。本文將深入探討Redis在這些場景下的具體應用方法及其背后的工作原理,旨在幫助開發者更好地理解和掌握Redis,以應對各種復雜的業務需求,并充分發揮其潛能。同時,我們也將關注如何在實踐中平衡Redis的性能、安全性、一致性等方面的挑戰,為實際項目帶來更高的價值。
數據緩存
在高并發訪問的場景下,數據庫經常成為系統的瓶頸。Redis因其內存存儲、讀取速度快的特點,常被用作數據庫查詢結果的緩存層,有效降低數據庫負載,提高整體系統的響應速度。這也是我們使用場景頻率最高的一個。
通常我們選擇使用String類型來存儲數據庫查詢結果,如單個實體對象的JSON序列化形式。
@Service
public class ProductService {
@Autowired
private RedisTemplate<String, Product> redisTemplate;
// 使用@Cacheable注解進行緩存
@Cacheable(value = "productCache", key = "#id")
public Product getProductById(String id) {
// 此處是從數據庫或其他數據源獲取商品的方法
// 在實際場景中,如果緩存命中,則不會執行下面的數據庫查詢邏輯
return getProductFromDatabase(id);
}
}
而使用Redis作為緩存使用時,有一些特別需要注意的事項:
- 緩存穿透:當查詢的數據在數據庫和緩存中均不存在時,可能會導致大量的無效請求直接打到數據庫。可通過布隆過濾器預防緩存穿透。
- 緩存雪崩:若大量緩存在同一時刻失效,所有請求都會涌向數據庫,造成瞬時壓力過大。可通過設置合理的過期時間分散、預加載或采用Redis集群等方式避免。
- 緩存一致性:當數據庫數據發生變化時,需要及時更新緩存,避免數據不一致。可以采用主動更新策略(如監聽數據庫binlog)或被動更新策略(如在讀取時判斷數據新鮮度)。
而對于數據緩存,我們常使用的業務場景如熱點數據存儲、全頁緩存等。
會話管理
在說會話管理之前,我們來簡單介紹一下Spring Session。Spring Session 是 Spring Framework 的一個項目,旨在簡化分布式應用程序中的會話管理。在傳統的基于 Servlet 的應用程序中,會話管理是通過 HttpSession 接口實現的,但在分布式環境中,每個節點上的 HttpSession 不能簡單地共享,因此需要一種機制來管理會話并確保會話在集群中的一致性。
Spring Session 提供了一種簡單的方法來解決這個問題,它將會話數據從容器(如 Tomcat 或 Jetty)中分離出來,并存儲在外部數據存儲(如 Redis、MongoDB、JDBC 等)中。這樣,不同節點上的應用程序實例可以共享相同的會話數據,實現分布式環境下的會話管理。
所以在Web應用中,Redis用于會話管理時,可以取代傳統基于服務器內存或Cookie的會話存儲方案。通過將會話數據序列化后存儲為Redis中的鍵值對,實現跨多個服務器實例的會話共享。
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
<version>3.2.0</version>
</dependency>
然后我們在啟動類中,使用@EnableRedisHttpSession啟用Redis作為會話存儲。
@Configuration
@EnableRedisHttpSession
public class RedisSessionConfig {
@Bean
public RedisConnectionFactory connectionFactory() {
// 這里假設你已經在application.properties或application.yml中配置了Redis的信息
// 根據實際情況填寫Redis服務器地址、端口等信息
return new LettuceConnectionFactory();
}
}
以上是一個簡單的Spring Session使用Redis進行會話管理的示例代碼。通過這種方式,我們可以輕松地在分布式環境中管理會話,并確保會話數據的一致性和可靠性。如果需要了解一些具體的用法,請自行參考Spring Session。
排行榜與計分板
有序集合(Sorted Sets)是Redis的一種強大數據結構,可以用來實現動態排行榜,每個成員都有一個分數,按分數排序。有序集合中的每一個成員都有一個分數(score),成員依據其分數進行排序,且成員本身是唯一的。
當需要給某個用戶增加積分或改變其排名時,可以使用ZADD命令向有序集合中添加或更新成員及其分數。例如,ZADD leaderboard score member,這里的ranking是有序集合的名稱,score是用戶的積分值,member是用戶ID。
查詢排行榜時,可以使用ZRANGE命令獲取指定范圍內的成員及其分數,例如,ZRANGE ranking 0 -1 WITHSCORES,這條命令會返回集合中所有的成員及其對應的分數,按照分數從低到高排序。
若要按照分數從高到低顯示排行榜,使用ZREVRANGE命令,如ZREVRANGE ranking 0 -1 WITHSCORES。
@Service
public class RankingService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void addToRanking(String playerName, int score) {
redisTemplate.opsForZSet().add("ranking", playerName, score);
}
public List<RankingInfo> getRanking() {
List<RankingInfo> rankingInfos = new ArrayList<>();
Set<ZSetOperations.TypedTuple<String>> rankingSet = redisTemplate.opsForZSet().rangeWithScores("ranking", 0, -1);
for (ZSetOperations.TypedTuple<String> tuple : rankingSet) {
RankingInfo rankingInfo = new RankingInfo();
rankingInfo.setPlayerName(tuple.getValue());
rankingInfo.setScore(tuple.getScore().intValue());
rankingInfos.add(rankingInfo);
System.out.println("playerName: " + tuple.getValue() + ", score: " + tuple.getScore().intValue());
}
return rankingInfos;
}
}
我們模擬請求,往redis中填入一些數據,在獲取排行榜:
圖片
在實際場景中,有序集合非常適合處理實時動態變化的排行榜數據,比如京東的月度銷量榜單、商品按時間的上新排行榜等,因為它的更新和查詢操作都是原子性的,并且能高效地支持按分數排序的操作。
計數器與統計
Redis的原子性操作如INCR和DECR可以用于計數,確保在高并發環境下的計數準確性。比如在流量統計、電商網站商品的瀏覽量、視頻網站視頻的播放數贊等場景的應用。
@Service
public class CounterService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void incrementLikeCount(String postId) {
redisTemplate.opsForValue().increment(postId + ":likes");
}
public void decrementLikeCount(String postId) {
redisTemplate.opsForValue().decrement(postId + ":likes");
}
public long getLikeCount(String postId) {
String value = redisTemplate.opsForValue().get(postId + ":likes");
return StringUtils.isBlank(value) ? 0 : Long.parseLong(value);
}
}
在使用Redis實現點贊,統計等功能時一定要考慮設置計數值的最大值或最小值限制,以及過期策略。
分布式鎖
分布式鎖
Redis的SETNX(設置并檢查是否存在)和EXPIRE命令組合可以實現分布式鎖,因其操作時原子性的,所以可以確保在分布式環境下同一資源只能被一個客戶端修改。
使用 Redis 實現分布式鎖通常會使用 Redis 的 SETNX 命令。這個命令用于設置一個鍵的值,如果這個鍵不存在的話,它會設置成功并返回 1,如果這個鍵已經存在,則設置失敗并返回 0。結合 Redis 的 EXPIRE 命令,可以為這個鍵設置一個過期時間,確保即使獲取鎖的客戶端異常退出,鎖也會在一段時間后自動釋放。
@Component
public class DistributedLock {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public boolean acquireLock(String lockKey, String requestId, long expireTime) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTime);
return result != null && result;
}
public void releaseLock(String lockKey, String requestId) {
String value = redisTemplate.opsForValue().get(lockKey);
if (value != null && value.equals(requestId)) {
redisTemplate.delete(lockKey);
}
}
}
使用分布式鎖時,務必確保在加鎖和解鎖操作之間處理完臨界區代碼,否則可能出現死鎖。并且要注意鎖定超時時間應當合理設置,以避免鎖定資源長時間無法釋放。
關于分布式鎖,推薦使用一些第三方的分布式鎖框架,例如Redisson
全局ID
在全局ID生成的場景中,我們可以使用 Redis 的原子遞增操作來實現。通過對 Redis 中的一個特定的 key 進行原子遞增操作,可以確保生成的ID是唯一的。
@Component
public class UniqueIdGenerator {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public long generateUniqueId(String key) {
return redisTemplate.opsForValue().increment(key, 1);
}
}
庫存扣減
在扣減庫存的場景中,我們可以使用 Redis 的原子遞減操作來實現。將庫存數量存儲在 Redis 的一個特定key中(例如倉庫編碼:SKU),然后通過遞減操作來實現庫存的扣減。這樣可以保證在高并發情況下,庫存扣減的原子性。
@Component
public class StockService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**商品庫存的key*/
private static final String STOCK_PREFIX = "stock:%s:%s";
/**
* 扣減庫存
* @param warehouseCode
* @param productId
* @param quantity
* @return
*/
public boolean decreaseStock(String warehouseCode, String productId, long quantity) {
String key = String.format(STOCK_PREFIX, warehouseCode, productId);
Long stock = redisTemplate.opsForValue().decrement(key, quantity);
return stock >= 0;
}
}
秒殺
在秒殺場景中,使用Lua腳本。Lua 腳本可以在 Redis 服務器端原子性地執行多個命令,這樣可以避免在多個命令之間出現競態條件。
我們使用Lua腳本來檢查庫存是否足夠并進行扣減操作。如果庫存足夠,則減少庫存并返回 true;如果庫存不足,則直接返回 false。通過 Lua 腳本的原子性執行,可以確保在高并發情況下,庫存扣減操作的正確性和一致性。
我們先定義一個扣減庫存的lua腳本,使用Lua腳本一次性執行獲取庫存、判斷庫存是否充足以及扣減庫存這三個操作,確保了操作的原子性
-- 獲取Lua腳本參數:商品ID和要購買的數量
local productId = KEYS[1]
local amount = tonumber(ARGV[1])
-- 獲取當前庫存
local currentStock = tonumber(redis.call('GET', 'seckill:product:'..productId))
-- 判斷庫存是否充足
if currentStock <= 0 or currentStock < amount then
return 0
end
-- 扣減庫存
redis.call('DECRBY', 'seckill:product:'..productId, amount)
-- 返回成功標志
return 1
然后在秒殺服務中使用Redis的DefaultRedisScript執行lua腳本,完成秒殺
@Component
public class SeckillService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 初始化RedisScript對象
*/
private final DefaultRedisScript<Long> seckillScript = new DefaultRedisScript<>();
{
seckillScript.setLocation(new ClassPathResource("rate_limiter.lua"));
seckillScript.setResultType(Long.class);
}
public boolean seckillyLua(String productId, int amount){
// 設置Lua腳本參數
List<String> keys = Collections.singletonList(productId);
List<String> args = Collections.singletonList(Integer.toString(amount));
// 執行Lua腳本
Long result = redisTemplate.execute(seckillScript, keys, args);
// 如果執行結果為1,表示秒殺成功
return Objects.equals(result, 1L);
}
}
關于秒殺場景,我們也可以使用WATCH命令監視庫存鍵,然后嘗試獲取并扣減庫存。如果在WATCH之后、EXEC之前庫存發生了變化,exec方法會返回null,此時我們取消WATCH并重新嘗試整個流程,直到成功扣減庫存為止。這樣就實現了基于Redis樂觀鎖的秒殺場景,有效防止了超賣現象。
/**
* 秒殺方法
* @param productId 商品ID
* @param amount 要購買的數量
* @return 秒殺成功與否
*/
@Transactional(rollbackFor = Exception.class)
public boolean seckilByWatch(String productId, int amount) {
// 樂觀鎖事務操作
while (true) {
// WATCH指令監控庫存鍵
redisTemplate.watch("stock:" + productId);
// 獲取當前庫存
String currentStockStr = redisTemplate.opsForValue().get("stock:" + productId);
if (currentStockStr == null) {
// 庫存不存在,可能是商品已售罄或異常情況
return false;
}
int currentStock = Integer.parseInt(currentStockStr);
// 判斷庫存是否充足
if (currentStock < amount) {
// 庫存不足,取消WATCH并退出循環
redisTemplate.unwatch();
return false;
}
// 開啟Redis事務
redisTemplate.multi();
// 執行扣減庫存操作
redisTemplate.opsForValue().decrement("stock:" + productId, amount);
// 執行其他與秒殺相關的操作,如增加訂單、更新用戶余額等...
// 提交事務,如果在此期間庫存被其他客戶端修改,則exec返回null
List<Object> results = redisTemplate.exec();
// 如果事務執行成功,跳出循環
if (!results.isEmpty()) {
return true;
}
}
}
消息隊列與發布/訂閱
Redis的發布/訂閱(Pub/Sub)模式,可以實現一個簡單的消息隊列。發布/訂閱模式允許消息的發布者(發布消息)和訂閱者(接收消息)之間解耦,消息的發布者不需要知道消息的接收者是誰,從而實現了一對多的消息傳遞。
首先我們需要定義一個消息監聽器,我們可以實現這個借口并實現其中的方法來處理接收到的消息。這樣可以根據具體的業務需求來定義消息的處理邏輯。
public interface MessageListener {
void onMessage(String channel, String message);
}
然后我們就可以定義消息的生產者以及消費者。publish 方法用于向指定頻道發布消息,我們使用 RedisTemplate 的 convertAndSend 方法來發送消息到指定的頻道。而subscribe方法用于訂閱指定的頻道,并設置消息監聽器。當有消息發布到指定的頻道時,消息監聽器會收到消息并進行處理。
@Component
public class MessageQueue {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void publish(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
}
public void subscribe(String channel, MessageListener listener) {
redisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> {
listener.onMessage(channel, message);
}, channel.getBytes());
}
}
使用Redis的發布訂閱模式實現一個輕量級的隊列時要注意:Pub/Sub是非持久化的,一旦消息發布,沒有訂閱者接收的話,消息就會丟失。還有就是Pub/Sub不適合大規模的消息堆積場景,因為它不保證消息順序和重復消費,更適合實時廣播型消息推送。
社交網絡
在社交網絡中,Redis可以利用集合(Set)、哈希(Hash)和有序集合(Sorted Set)等數據結構構建用戶關系圖譜。
使用哈希(Hash)數據結構存儲用戶的個人資料信息,每個用戶對應一個哈希表,其中包含用戶的各種屬性,比如用戶名、年齡、性別等。
@Component
public class RelationshipGraphService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**用戶資料*/
private static final String USER_PROFILE_PREFIX = "user_profile:";
/**
* 存儲用戶個人資料
* @param userId
* @param profile
*/
public void setUserProfile(String userId, Map<String, String> profile) {
String key = USER_PROFILE_PREFIX + userId;
redisTemplate.opsForHash().putAll(key, profile);
}
/**
* 獲取用戶個人資料
* @param userId
* @return
*/
public Map<Object, Object> getUserProfile(String userId) {
String key = USER_PROFILE_PREFIX + userId;
return redisTemplate.opsForHash().entries(key);
}
}
使用集合(Set)數據結構來存儲用戶的好友關系。每個用戶都有一個集合,其中包含了他的所有好友的用戶ID。
@Component
public class RelationshipGraphService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**用戶好友*/
private static final String FRIENDS_PREFIX = "friends:";
/**
* 添加好友關系
* @param userId
* @param friendId
*/
public void addFriend(String userId, String friendId) {
String key = FRIENDS_PREFIX + userId;
redisTemplate.opsForSet().add(key, friendId);
}
/**
* 獲取用戶的所有好友
* @param userId
* @return
*/
public Set<String> getFriends(String userId) {
String key = FRIENDS_PREFIX + userId;
return redisTemplate.opsForSet().members(key);
}
}
同理,我們還可以實現點贊的業務場景
@Service
public class LikeService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 點贊
* @param objectId
* @param userId
*/
public void like(String objectId, String userId) {
// 將點贊人放入zset中
redisTemplate.opsForSet().add(getLikeKey(objectId), userId);
}
/**
* 取消點贊
* @param objectId
* @param userId
*/
public void unlike(String objectId, String userId) {
// 減少點贊人數
redisTemplate.opsForSet().remove(getLikeKey(objectId), userId);
}
/**
* 是否點贊
* @param objectId
* @param userId
* @return
*/
public Boolean isLiked(String objectId, String userId) {
return redisTemplate.opsForSet().isMember(getLikeKey(objectId), userId);
}
/**
* 獲取點贊數
* @param objectId
* @return
*/
public Long getLikeCount(String objectId) {
return redisTemplate.opsForSet().size(getLikeKey(objectId));
}
/**
* 獲取所有點贊的用戶
* @param objectId
* @return
*/
public Set<String> getLikedUsers(String objectId) {
return redisTemplate.opsForSet().members(getLikeKey(objectId));
}
private String getLikeKey(String objectId) {
return "likes:" + objectId;
}
}
使用有序集合(Sorted Set)數據結構來存儲用戶的關注者列表。有序集合中的成員是關注者的用戶ID,而分數可以是關注時間或者其他指標,比如活躍度。
@Component
public class RelationshipGraphService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**用戶關注者*/
private static final String FOLLOWERS_PREFIX = "followers:";
/**
* 添加關注者
* @param userId
* @param followerId
* @param score
*/
public void addFollower(String userId, String followerId, double score) {
String key = FOLLOWERS_PREFIX + userId;
redisTemplate.opsForZSet().add(key, followerId, score);
}
/**
* 獲取用戶的關注者列表(按照關注時間排序)
* @param userId
* @return
*/
public Set<String> getFollowers(String userId) {
String key = FOLLOWERS_PREFIX + userId;
return redisTemplate.opsForZSet().range(key, 0, -1);
}
}
除此之外,我們還可以實現可能認識的人,共同好友等業務場景。
限流與速率控制
Redis可以精確地實施限流策略,如使用INCR命令結合Lua腳本實現滑動窗口限流。
創建一個Lua腳本,該腳本負責檢查在一定時間段內請求次數是否超過限制。
-- rate_limiter.lua
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local timeWindow = tonumber(ARGV[2]) -- 時間窗口,例如單位為秒
-- 獲取當前時間戳
local currentTime = redis.call('TIME')[1]
-- 獲取最近timeWindow秒內的請求次數
local count = redis.call('ZCOUNT', key .. ':requests', currentTime - timeWindow, currentTime)
-- 如果未超過限制,則累加請求次數,并返回true
if count < limit then
redis.call('ZADD', key .. ':requests', currentTime, currentTime)
return 1
else
return 0
end
限流服務中Redis使用DefaultRedisScript執行Lua腳本
@Component
public class RateLimiter {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**限流Key*/
private static final String TATE_LIMITER_KEY = "rate-limit:%s";
/**規定的時間窗口內允許的最大請求數量*/
private static final Integer LIMIT = 100;
/**限流策略的時間窗口長度,單位是秒*/
private static final Integer TIME_WINDOW = 60;
/**
* 初始化RedisScript對象
*/
private final DefaultRedisScript<Long> rateLimiterScript = new DefaultRedisScript<>();
{
rateLimiterScript.setLocation(new ClassPathResource("rate_limiter.lua"));
rateLimiterScript.setResultType(Long.class);
}
/**
* 限流方法 1分鐘內最多100次請求
* @param userId
* @return
*/
public boolean allowRequest(String userId) {
String key = String.format(TATE_LIMITER_KEY, userId);
List<String> keys = Collections.singletonList(key);
List<String> args = Arrays.asList(String.valueOf(LIMIT), String.valueOf(TIME_WINDOW));
// 執行Lua腳本
Long result = redisTemplate.execute(rateLimiterScript, keys, args);
// 結果為1表示允許請求,0表示請求被限流
return Objects.equals(result, 1L);
}
}
位運算與位圖應用
Redis的位圖(BitMap)是一種特殊的數據結構,它允許我們在單一的字符串鍵(String Key)中存儲一系列二進制位(bits),每個位對應一個布爾值(0或1),并通過偏移量(offset)來定位和操作這些位。位圖極大地節省了存儲空間,尤其適合于大規模數據的標記、統計和篩選場景。
在位圖中,每一位相當于一個標識符,例如可以用來表示用戶是否在線、商品是否有庫存、用戶是否已讀郵件等。相對于傳統的鍵值對存儲。位圖可以非常快速地統計滿足特定條件的元素個數,如統計在線用戶數、激活用戶數等。
@Service
public class UserOnlineStatusService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String ONLINE_STATUS_KEY = "online_status";
private static final String RETENTION_RATE_KEY_PREFIX = "retention_rate:";
private static final String DAILY_ACTIVITY_KEY_PREFIX = "daily_activity:";
/**
* 設置用戶在線狀態為在線
* @param userId
*/
public void setUserOnline(long userId) {
redisTemplate.opsForValue().setBit(ONLINE_STATUS_KEY, userId, true);
}
/**
* 設置用戶在線狀態為離線
* @param userId
*/
public void setUserOffline(long userId) {
redisTemplate.opsForValue().setBit(ONLINE_STATUS_KEY, userId, false);
}
/**
* 獲取用戶在線狀態
* @param userId
* @return
*/
public boolean isUserOnline(long userId) {
return redisTemplate.opsForValue().getBit(ONLINE_STATUS_KEY, userId);
}
/**
* 統計在線用戶數量
* @return
*/
public long countOnlineUsers() {
return getCount(ONLINE_STATUS_KEY);
}
/**
* 記錄用戶的留存情況
* @param userId
* @param daysAgo
*/
public void recordUserRetention(long userId, int daysAgo) {
String key = RETENTION_RATE_KEY_PREFIX + LocalDate.now().minusDays(daysAgo).toString();
redisTemplate.opsForValue().setBit(key, userId, true);
}
/**
* 獲取指定日期的留存率
* @param daysAgo
* @return
*/
public double getRetentionRate(int daysAgo) {
String key = RETENTION_RATE_KEY_PREFIX + LocalDate.now().minusDays(daysAgo).toString();
long totalUsers = countOnlineUsers();
long retainedUsers = getCount(key);
return (double) retainedUsers / totalUsers * 100;
}
/**
* 記錄用戶的每日活躍情況
* @param userId
*/
public void recordUserDailyActivity(long userId) {
String key = DAILY_ACTIVITY_KEY_PREFIX + LocalDate.now().toString();
redisTemplate.opsForValue().setBit(key, userId, true);
}
/**
* 獲取指定日期的活躍用戶數量
* @param date
* @return
*/
public long countDailyActiveUsers(LocalDate date) {
String key = DAILY_ACTIVITY_KEY_PREFIX + date.toString();
return getCount(key);
}
/**
* 獲取最近幾天每天的活躍用戶數量列表
* @param days
* @return
*/
public List<Long> getDailyActiveUsers(int days) {
LocalDate currentDate = LocalDate.now();
List<Long> results = Lists.newArrayList();
for (int i = 0; i < days; i++) {
LocalDate date = currentDate.minusDays(i);
String key = DAILY_ACTIVITY_KEY_PREFIX + date.toString();
results.add(getCount(key));
}
return results;
}
/**
* 獲取key下的數量
* @param key
* @return
*/
private long getCount(String key) {
return (long) redisTemplate.execute((RedisCallback<Long>) connection -> connection.bitCount(key.getBytes()));
}
}
最新列表
Redis的List(列表)是一個基于雙向鏈表實現的數據結構,允許我們在列表頭部(左端)和尾部(右端)進行高效的插入和刪除操作。LPUSH命令:全稱是LIST PUSH LEFT,用于將一個或多個值插入到列表的最左邊(頭部),在這里用于將最新生成的內容ID推送到列表頂部,保證列表中始終是最新的內容排在前面。
LTRIM命令用于修剪列表,保留指定范圍內的元素,從而限制列表的長度。在這個場景中,每次添加新ID后都會執行LTRIM操作,只保留最近的N個ID,確保列表始終保持固定長度,即只包含最新的內容ID。
@Service
public class LatestListService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String LATEST_LIST_KEY = "latest_list";
/**
* 添加最新內容ID到列表頭部
* @param contentId 內容ID
*/
public void addLatestContent(String contentId) {
ListOperations<String, String> listOps = redisTemplate.opsForList();
listOps.leftPush(LATEST_LIST_KEY, contentId);
// 限制列表最多存儲N個ID,假設N為100
listOps.trim(LATEST_LIST_KEY, 0, 99);
}
/**
* 獲取最新的N個內容ID
* @param count 要獲取的數量,默認為10
* @return 最新的內容ID列表
*/
public List<String> getLatestContentIds(int count) {
ListOperations<String, String> listOps = redisTemplate.opsForList();
return listOps.range(LATEST_LIST_KEY, 0, count - 1);
}
}
抽獎
借助Redis的Set數據結構以及其內置的Spop命令,我們能夠高效且隨機地選定抽獎獲勝者。Set作為一種不允許包含重復成員的數據集合,其特性天然適用于防止抽獎過程中出現重復參與的情況,確保每位參與者僅擁有一個有效的抽獎資格。
由于Set內部元素的排列不具備確定性,這意味著在對集合執行隨機獲取操作時,每一次選取都將獨立且不可預測,這與抽獎活動中所要求的隨機公平原則高度契合。
Redis的Spop命令允許我們在單個原子操作下,不僅隨機選取,還會從Set中移除指定數量(默認為1)的元素。這一原子操作機制尤為關鍵,在高并發環境下,即便有多個請求同時進行抽獎,Spop也能夠確保同一時刻只有一個請求能成功獲取并移除一個元素,有效避免了重復選擇同一位參與者作為獲獎者的可能性。
@Service
public class LotteryService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String PARTICIPANTS_SET_KEY = "lottery:participants";
/**
* 添加參與者到抽獎名單
* @param participant 參與者ID
*/
public void joinLottery(String participant) {
redisTemplate.opsForSet().add(PARTICIPANTS_SET_KEY, participant);
}
/**
* 抽取一名幸運兒
* @return 幸運兒ID
*/
public String drawWinner() {
// 使用Spop命令隨機抽取一個參與者
return redisTemplate.opsForSet().pop(PARTICIPANTS_SET_KEY);
}
/**
* 抽取N個幸運兒
* @param count 抽取數量
* @return 幸運兒ID列表
*/
public List<String> drawWinners(int count) {
return redisTemplate.opsForSet().pop(PARTICIPANTS_SET_KEY, count);
}
}
Stream類型
Redis Stream作為一種自Redis 5.0起引入的高級數據結構,專為存儲和處理有序且持久的消息流而設計。可視作一個分布式的、具備持久特性的消息隊列,通過唯一的鍵名來標識每個Stream,其中容納了多個攜帶時間戳和唯一標識符的消息實體。
每條存儲于Stream中的消息都具有全球唯一的message ID,該ID內嵌時間戳和序列編號,旨在確保即使在復雜的集群部署中仍能保持消息的嚴格時序性。這些消息內容會持久存儲在Redis中,確保即使服務器重啟也能安全恢復。
生產者利用XADD指令將新消息添加到Stream中,而消費者則通過XREAD或針對多消費者組場景優化的XREADGROUP命令來讀取并處理消息。XREADGROUP尤其擅長處理多消費者組間的公平分配和持久訂閱,確保消息的公正、有序送達各個消費者。
Stream核心特性之一是支持消費者組機制,消費者組內的不同消費者可獨立地消費消息,并通過XACK命令確認已消費的消息,從而實現了消息的持久化消費和至少一次(at-least-once)交付保證。當消息量超出消費者處理能力時,未處理的消息可在Stream中積壓,直到達到預設的最大容量限制。此外,還能設定消息的有效期(TTL),逾期未被消費的消息將自動剔除。即使在網絡傳輸過程中消息遭受損失,亦可通過message ID保障消息的冪等性重新投遞。盡管網絡條件可能導致消息到達消費者的時間順序與生產者發出的順序有所偏差,但Stream機制確保了每個消息在其內在的時間上下文中依然保持著嚴格的順序關系。
Redis Stream作為一個集消息持久化、多消費者公平競爭、消息追溯和排序等功能于一體的強大消息隊列工具,已在日志采集、實時數據分析、活動追蹤等諸多領域展現出卓越的適用性和價值。
@Component
public class LogCollector {
private static final String LOGS_STREAM_KEY = "logs";
private static final String GROUP_NAME = "log_consumers";
private static final String CONSUMER_NAME = "log_consumer";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 發送日志事件至 Redis Stream
public void sendLogEvent(String message, Map<String, String> attributes) {
StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
RecordId messageId = streamOperations.add(StreamRecords.newRecord()
.ofStrings(attributes)
.withStreamKey(LOGS_STREAM_KEY));
}
// 實時消費日志事件
public StreamRecords<String, String> consumeLogs(int batchSize) {
Consumer consumer = Consumer.from(CONSUMER_NAME, GROUP_NAME);
StreamOffset<String> offset = StreamOffset.create(LOGS_STREAM_KEY, ReadOffset.lastConsumed());
StreamReadOptions<String, String> readOptions = StreamReadOptions.empty().count(batchSize);
return redisTemplate.opsForStream().read(readOptions, StreamOffset.create(LOGS_STREAM_KEY, ReadOffset.lastConsumed()), consumer);
}
}
GEO類型
Redis的GEO數據類型自3.2版本起引入,專為存儲和高效操作含有經緯度坐標的地理位置信息而設計。開發人員利用這一類型可以輕松管理地理位置數據,同時兼顧內存效率和響應速度。
利用GEOADD命令,可以將帶有精確經緯度坐標的數據點歸檔至指定鍵名下的集合中。
可借助GEOPOS命令獲取某一成員的具體經緯度坐標。
通過GEODIST命令,可以準確計算任意兩個地理位置成員之間的地球表面距離,支持多種計量單位,包括米、千米、英里和英尺。
使用GEORADIUS命令,系統可以根據指定的經緯度中心點及半徑范圍檢索出處于該區域內的所有成員地理位置。
GEORADIUSBYMEMBER命令也用于范圍查詢,但其查詢依據是選定成員自身的位置,以此為圓心劃定搜索范圍。
GEO類型在許多場景下都非常有用,例如移動應用中的附近好友查找、商店位置搜索、物流配送中的最近司機調度等。
@Service
public class FriendService {
private static final String FRIEND_LOCATIONS_KEY = "friend_locations";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private GeoOperations<String, FriendLocation> geoOperations; // 自動裝配GeoOperations
public void saveFriendLocation(FriendLocation location) {
geoOperations.add(FRIEND_LOCATIONS_KEY, location.getLongitude(), location.getLatitude(), location);
}
public List<FriendLocation> findFriendsNearby(double myLongitude, double myLatitude, Distance radius) {
Circle circle = new Circle(new Point(myLongitude, myLatitude), radius);
return geoOperations.radius(FRIEND_LOCATIONS_KEY, circle, Metric.KILOMETERS).getContent();
}
}
總結
Redis作為一款高性能、內存型的NoSQL數據庫,憑借其豐富的數據結構、極高的讀寫速度以及靈活的數據持久化策略,在現代分布式系統中扮演著至關重要的角色。它的關鍵價值體現在以下幾個方面:
- 緩存優化:Redis將頻繁訪問的數據存儲在內存中,顯著減少了數據庫的讀取壓力,提升了系統的整體性能和響應速度。
- 分布式支持:通過主從復制、哨兵和集群模式,Redis實現了高度可擴展性和高可用性,滿足大規模分布式系統的需求。
- 數據結構多樣性:Redis支持字符串、哈希、列表、集合、有序集合、Bitmaps、HyperLogLog、Geo等多樣化的數據結構,為多種應用場景提供了便利,如排行榜、社交關系、消息隊列、計數器、限速器等。
- 實時處理與分析:隨著Redis 5.0引入Stream數據結構,使得Redis在日志收集、實時分析、物聯網數據流處理等方面有了更多的可能性。
- 地理位置服務:GEO類型提供了便捷的空間索引和距離計算功能,使得Redis能夠在電商、出行、社交等領域提供附近地點搜索、路線規劃等服務。