張開濤:京東業務數據應用級緩存示例
一、多級緩存API封裝
我們的業務數據如商品類目、店鋪、商品基本信息都可以進行適當的本地緩存,以提升性能。對于多實例的情況時不僅會使用本地緩存,還會使用分布式緩存,因此需要進行適當的API封裝以簡化緩存操作。
1. 本地緩存初始化
- public class LocalCacheInitService extends BaseService {
- @Override
- publicvoid afterPropertiesSet() throws Exception {
- //商品類目緩存
- Cache<String, Object> categoryCache =
- CacheBuilder.newBuilder()
- .softValues()
- .maximumSize(1000000)
- .expireAfterWrite(Switches.CATEGORY.getExpiresInSeconds()/ 2, TimeUnit.SECONDS)
- .build();
- addCache(CacheKeys.CATEGORY_KEY, categoryCache);
- }
- privatevoid addCache(String key, Cache<?, ?> cache) {
- localCacheService.addCache(key,cache);
- }
- }
本地緩存過期時間使用分布式緩存過期時間的一半,防止本地緩存數據緩存時間太長造成多實例間的數據不一致。
另外,將緩存KEY前綴與本地緩存關聯,從而匹配緩存KEY前綴就可以找到相關聯的本地緩存。
2. 寫緩存API封裝
先寫本地緩存,如果需要寫分布式緩存,則通過異步更新分布式緩存。
- public void set(final String key, final Object value, final intremoteCacheExpiresInSeconds) throws RuntimeException {
- if (value== null) {
- return;
- }
- //復制值對象
- //本地緩存是引用,分布式緩存需要序列化
- //如果不復制的話,則假設之后數據改了將造成本地緩存與分布式緩存不一致
- final Object finalValue = copy(value);
- //如果配置了寫本地緩存,則根據KEY獲得相關的本地緩存,然后寫入
- if (writeLocalCache) {
- Cache localCache = getLocalCache(key);
- if(localCache != null) {
- localCache.put(key, finalValue);
- }
- }
- //如果配置了不寫分布式緩存,則直接返回
- if (!writeRemoteCache) {
- return;
- }
- //異步更新分布式緩存
- asyncTaskExecutor.execute(() -> {
- try {
- redisCache.set(key,JSONUtils.toJSON(finalValue), remoteCacheExpiresInSeconds);
- } catch(Exception e) {
- LOG.error("updateredis cache error, key : {}", key, e);
- }
- });
- }
此處使用了異步更新,目的是讓用戶請求盡快返回。而因為有本地緩存,所以即使分布式緩存更新比較慢又產生了回源,也可以在本地緩存***。
3. 讀緩存API封裝
先讀本地緩存,本地緩存不***的再批量查詢分布式緩存,在查詢分布式緩存時通過分區批量查詢。
- private Map innerMget(List<String> keys, List<Class> types) throwsException {
- Map<String, Object> result = Maps.newHashMap();
- List<String> missKeys = Lists.newArrayList();
- List<Class> missTypes = Lists.newArrayList();
- //如果配置了讀本地緩存,則先讀本地緩存
- if(readLocalCache) {
- for(int i = 0; i < keys.size(); i++) {
- String key = keys.get(i);
- Class type = types.get(i);
- Cache localCache = getLocalCache(key);
- if(localCache != null) {
- Object value = localCache.getIfPresent(key);
- result.put(key, value);
- if (value == null) {
- missKeys.add(key);
- missTypes.add(type);
- }
- } else {
- missKeys.add(key);
- missTypes.add(type);
- }
- }
- }
- //如果配置了不讀分布式緩存,則返回
- if(!readRemoteCache) {
- returnresult;
- }
- finalMap<String, String> missResult = Maps.newHashMap();
- //對KEY分區,不要一次性批量調用太大
- final List<List<String>>keysPage = Lists.partition(missKeys, 10);
- List<Future<Map<String, String>>> pageFutures = Lists.newArrayList();
- try {
- //批量獲取分布式緩存數據
- for(final List<String>partitionKeys : keysPage) {
- pageFutures.add(asyncTaskExecutor.submit(() -> redisCache.mget(partitionKeys)));
- }
- for(Future<Map<String,String>> future : pageFutures) {
- missResult.putAll(future.get(3000, TimeUnit.MILLISECONDS));
- }
- } catch(Exception e) {
- pageFutures.forEach(future -> future.cancel(true));
- throw e;
- }
- //合并result和missResult,此處實現省略
- return result;
- }
此處將批量讀緩存進行了分區,防止亂用批量獲取API。
二、NULL Cache
首先,定義NULL對象。
- private static final String NULL_STRING =new String();
當DB沒有數據時,寫入NULL對象到緩存
- //查詢DB
- String value = loadDB();
- //如果DB沒有數據,則將其封裝為NULL_STRING并放入緩存
- if(value == null) {
- value = NULL_STRING;
- }
- myCache.put(id, value);
讀取數據時,如果發現NULL對象,則返回null,而不是回源到DB
- value = suitCache.getIfPresent(id);
- //DB沒有數據,返回null
- if(value == NULL_STRING) {
- return null;
- }
通過這種方式可以防止當KEY對應的數據在DB不存在時頻繁查詢DB的情況。
三、強制獲取***數據
在實際應用中,我們經常需要強制更新數據,此時就不能使用緩存數據了,可以通過配置ThreadLocal開關來決定是否強制刷新緩存(refresh方法要配合CacheLoader一起使用)。
- if(ForceUpdater.isForceUpdateMyInfo()) {
- myCache.refresh(skuId);
- }
- String result = myCache.get(skuId);
- if(result == NULL_STRING) {
- return null;
- }
四、失敗統計
- private LoadingCache<String, AtomicInteger> failedCache =
- CacheBuilder.newBuilder()
- .softValues()
- .maximumSize(10000)
- .build(new CacheLoader<String, AtomicInteger>() {
- @Override
- public AtomicIntegerload(String skuId) throws Exception {
- return new AtomicInteger(0);
- }
- });
當失敗時,通過failedCache.getUnchecked(id).incrementAndGet()增加失敗次數;當成功時,使用failedCache.invalidate(id)失效緩存。通過這種方式可以控制失敗重試次數,而且又是內存敏感緩存。當內存不足時,可以清理該緩存騰出一些空間。
五、延遲報警
- private static LoadingCache<String, Integer> alarmCache =
- CacheBuilder.newBuilder()
- .softValues()
- .maximumSize(10000).expireAfterAccess(1, TimeUnit.HOURS)
- .build(new CacheLoader<String, Integer>() {
- @Override
- public Integer load(String key) throws Exception {
- return 0;
- }
- });
- //報警代碼
- Integer count = 0;
- if(redis != null) {
- StringcountStr = Objects.firstNonNull(redis.opsForValue().get(key), "0");
- count =Integer.valueOf(countStr);
- } else {
- count = alarmCache.get(key);
- }
- if(count % 5 == 0) { //5次報一次
- //報警
- }
- countcount = count + 1;
- if(redis != null) {
- redis.opsForValue().set(key,String.valueOf(count), 1, TimeUnit. HOURS);
- } else {
- alarmCache.put(key,count);
- }
如果一出問題就報警,則存在報警量非常多或者假報警,因此,可以考慮N久報警了M次,才真正報警。此時,也可以使用Cache來統計。本示例還加入了Redis分布式緩存記錄支持。
六、性能測試
筆者使用JMH 1.14進行基準性能測試,比如測試寫。
- @Benchmark
- @Warmup(iterations = 10, time = 10, timeUnit =TimeUnit.SECONDS)
- @Measurement(iterations = 10, time = 10, timeUnit= TimeUnit.SECONDS)
- @BenchmarkMode(Mode.Throughput)
- @OutputTimeUnit(TimeUnit.SECONDS)
- @Fork(1)
- public void test_1_Write() {
- counterWritercounterWriter= counterWriter + 1;
- myCache.put("key"+ counterWriter, "value" + counterWriter);
- }
使用JMH時首先進行JVM預熱,然后進行度量,產生測試結果(本文使用吞吐量)。建議讀者按照需求進行基準性能測試來選擇適合自己的緩存框架。
【本文是51CTO專欄作者張開濤的原創文章,作者微信公眾號:開濤的博客( kaitao-1234567)】