深入解析 SpringCloud 的負載衡器 Loadbalancer
這期我們結合之前幾篇關于nacos客戶端緩存來說明一下集成loadbalancer后服務調用時可能存在的問題和解決方案。
一、詳解Spring Cloud loadbalancer服務調用機制
1. loadbalancer使用說明
首先我們引出本文所使用的loadbalancer依賴,可以看到版本為3.1.5,作為服務消費者我們通過loadbalancer作為負載均衡器替換到默認的ribbon,同時我們還引入caffeine觸發Loadbalancer完成基于caffeine的服務緩存裝配:
<!--使用loadbalancer負載均衡器替換ribbon-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
<version>3.1.5</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>
對應我們還是給出本文項目的架構圖,可以看到當前項目給出兩個服務提供者nacos-provider和一個服務消費者nacos-consumer,當服務消費者從nacos拉取到可用服務之后,會通過權重算法調用可用的服務示例:
對應的我們也給出本文服務消費者所用到的負載均衡算法的配置:
/**
* 將負載均衡算法設置為權重算法
*
* @param environment
* @param loadBalancerClientFactory
* @return
*/
@Bean
ReactorLoadBalancer<ServiceInstance> weightedLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new WeightedLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
同時我們也給出WeightedLoadBalancer 這個權重算法的實現,讀者可基于代碼自行了解一下:
/**
* 基于權重的負載均衡算法
*/
public class WeightedLoadBalancer implements ReactorServiceInstanceLoadBalancer {
/**
* loadbalancer 提供的訪問當前服務的名稱
*/
final String serviceId;
/**
* 基于nacos緩存獲取服務列表
*/
ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
public WeightedLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
}
@Autowired
private NacosDiscoveryProperties nacosDiscoveryProperties;
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
//如果serviceInstanceListSupplierProvider不存在則采用NoopServiceInstanceListSupplier返回空實例列表
ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
//基于服務獲取策略supplier略獲取可用實例,采用權重算法返回本地調用的服務實例
return supplier.get(request)
.next()
.map(serviceInstances -> new DefaultResponse(NacosBalancer.getHostByRandomWeight3(serviceInstances)));
}
}
2. loadbalancer自動裝配初始化
基于上述的配置,loadbalancer進行自動裝配的時候就會識別到Caffeine和CaffeineCacheManager的存在,于是觸發caffeineLoadBalancerCacheManager的裝配,該管理類內部有一個cacheMap,本次裝配將會以cachingServiceInstanceListSupplierCache為key,caffeine cache為value作為鍵值對存入,后續所有的服務緩存實例信息都會存儲在這個鍵值對中的caffeine cache中:
對應的我們也給出自動裝配的源碼,如下所示,可以看到因為Caffeine和CaffeineCacheManager的存在,我們觸發了CaffeineBasedLoadBalancerCacheManager的裝配:
@Configuration(proxyBeanMethods = false)
//Caffeine和CaffeineCacheManager都存在觸發自動裝配
@ConditionalOnClass({ Caffeine.class, CaffeineCacheManager.class })
protected static class CaffeineLoadBalancerCacheManagerConfiguration {
@Bean(autowireCandidate = false)
@ConditionalOnMissingBean
LoadBalancerCacheManager caffeineLoadBalancerCacheManager(LoadBalancerCacheProperties cacheProperties) {
//生成CaffeineBasedLoadBalancerCacheManager管理類
return new CaffeineBasedLoadBalancerCacheManager(cacheProperties);
}
}
通過查看CaffeineBasedLoadBalancerCacheManager內部初始化邏輯,最終可以看到setCacheNames這個方法,該方法就會執行就是我們上文中所說的以cachingServiceInstanceListSupplierCache為key,以caffeineCache這個緩存為value的鍵值對初始化工作:
public CaffeineCacheManager(String... cacheNames) {
//基于
setCacheNames(Arrays.asList(cacheNames));
}
public void setCacheNames(@Nullable Collection<String> cacheNames) {
if (cacheNames != null) {
for (String name : cacheNames) {
this.cacheMap.put(name, createCaffeineCache(name));
}
this.dynamic = false;
}
else {
this.dynamic = true;
}
}
初始化后的調試結果如下,我們可以很直觀的看到這個記錄服務實例的緩存鍵值對:
3. loadbalancer如何基于緩存完成服務調用
默認情況下loadbalancer緩存是沒有任何信息的,假設我們nacos-consumer即服務消費者發起對nacos-provider的調用,loadbalancer是如何拿到服務實例的信息呢?
實際上,在loadbalancer初始化的時候,服務實例查詢組件ServiceInstanceListSupplier內部聚合了nacos的服務查詢組件DiscoveryClientServiceInstanceListSupplier,所以當我們通過feign發起調用時,loadbalancer的執行步驟為:
- loadbalancer代理會先通過ServiceInstanceListSupplier到緩存中查看是否存在服務提供者nacos-provider的信息,如果不為空直接返回調用即可,如果不存在則執行步驟2。
- 嘗試到基于nacos服務查詢組件DiscoveryClientServiceInstanceListSupplier查看是否存在nacos-provider如果有直接返回。
- ServiceInstanceListSupplier基于nacos服務組件的結果拿到實例信息,將其緩存起來,并基于負載均衡策略返回服務實例給服務消費者進行調用。
基于上圖我們給出loadbalancer自動裝配的服務查詢組件源碼,可以看到ServiceInstanceListSupplier通過withBlockingDiscoveryClient方法聚合了nacos服務查詢組件:
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
@Conditional(DefaultConfigurationCondition.class)
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
//ServiceInstanceListSupplier通過withBlockingDiscoveryClient方法聚合了nacos服務查詢組件
return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().build(context);
}
當我們發起服務調用時,feign代理會走到FeignBlockingLoadBalancerClient上,其內部會執行如下步驟:
- 通過loadBalancerClient的choose嘗試上文的多級緩存查詢服務示例的步驟,并完成負載均衡選取服務實例返回。
- 基于上述實例生成請求地址和參數。
- 發起請求并響應結果給服務消費者。
@Override
public Response execute(Request request, Request.Options options) throws IOException {
//......
//嘗試從緩存中拿服務,然后執行負載均衡調用
ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
//......
//基于上述實例生成請求地址和參數
String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
Request newRequest = buildRequest(request, reconstructedUrl);
LoadBalancerProperties loadBalancerProperties = loadBalancerClientFactory.getProperties(serviceId);
//發起請求
return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
supportedLifecycleProcessors, loadBalancerProperties.isUseRawStatusCodeInResponseData());
}
步入loadBalancerClient.choose的調用路徑,我們就可以看到上文所說的多級緩存查詢步驟:
- 到lb緩存即cacheManager通過cachingServiceInstanceListSupplierCache查詢是否存在服務提供者nacos-provider的實例。
- 如果沒有則到nacos中的查詢。
- 基于查詢結果寫入lb的cache中:
public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
this.serviceInstances = CacheFlux.lookup(key -> {
// 到lb緩存管理拿緩存
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
//......
//查看lb緩存有沒有
List<ServiceInstance> list = cache.get(key, List.class);
//如果沒有返回空
if (list == null || list.isEmpty()) {
return Mono.empty();
}
return Flux.just(list).materialize().collectList();
},
//若lb緩存沒有則觸發onCacheMissResume回調,就會通過delegate.get()觸發nacos服務組件查詢
delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize()
.doOnNext(instances -> {//doOnNext得到nacos緩存后寫入lb緩存中
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {
//.......
}
else {
//寫入lb緩存
cache.put(key, instances);
}
}).then());
}
4. loadbalancer緩存更新策略
默認情況下,lb緩存每35s完成一次更新,這也就意味著緩存在lb緩存中的服務實例信息只有在存入后的35s內是有效的,為了避免定時輪詢更新服務實例的開銷,lb的緩存采用了一種惰性更新的思想。
假設我們此時此刻緩存將nacos-provider的實例信息緩存到lb裝配的CaffeineCache中,服務消費者在35s之后發起調用,此時CaffeineCache就會基于緩存服務實例的起始時間判斷緩存是否過期,如果發現過期則直接返回null,讓loadbalancer到nacos緩存中獲取nacos-provider實例信息并覆蓋掉當前過期的緩存:
對應我們給出cache緩存默認過期時間的默認值,即位于LoadBalancerCacheProperties 中對應ttl 的賦值:
@ConfigurationProperties("spring.cloud.loadbalancer.cache")
public class LoadBalancerCacheProperties {
//.......
private Duration ttl = Duration.ofSeconds(35);
//......
}
為方便說明,我們再次貼出lb緩存查詢的源碼,如下所示, cache.get(key, List.class)這一段就是從Loadbalancer的緩存中獲取服務實例的信息,如果過期也會返回null,然后到nacos緩存中獲取信息并更新過期緩存:
public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
this.serviceInstances = CacheFlux.lookup(key -> {
// 到lb緩存管理拿緩存
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
//......
//查看lb緩存有沒有查詢服務實例信息,如果不存在或者過期則返回null
List<ServiceInstance> list = cache.get(key, List.class);
//如果沒有返回空
if (list == null || list.isEmpty()) {
return Mono.empty();
}
return Flux.just(list).materialize().collectList();
},
//若lb緩存沒有則觸發onCacheMissResume回調,就會通過delegate.get()觸發nacos服務組件查詢
delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize()
.doOnNext(instances -> {//doOnNext得到nacos緩存后寫入lb緩存中
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {
//.......
}
else {
//寫入lb緩存,并更新寫入時間
cache.put(key, instances);
}
}).then());
}
步入Loadbalancer緩存CaffeineCache的get方法,最終就會來到BoundedLocalCache的getIfPresent,這段就是查詢緩存并判斷過期的核心實現,對應步驟為:
- 基于要調用的服務實例的字符串(以本文示例來說就是nacos-provider)作為key進行查詢,并得到一個node。
- 通過node的writeTime比對當前時間now判斷是否過期。
- 如果過期返回null,讓loadbalancer到nacos緩存中獲取最新的值并覆蓋掉當前緩存。
- 如果沒過期則直接返回。
對應的我們也給出這段說明的源碼,讀者可結合表述和源碼注釋理解上述步驟:
public @Nullable V getIfPresent(Object key, boolean recordStats) {
Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
//......
//獲取服務實例信息
V value = node.getValue();
//......
//判斷是否過期,如果過期則返回null
if (hasExpired(node, now) || (collectValues() && (value == null))) {
//......
return null;
}
//......
//沒過期直接返回緩存中的服務實例信息
return value;
}
二、關于loadbalancer的一些注意事項
經過上述的說明相信筆者對于loadbalancer的底層工作機制有所了解,所以需要做灰度發布或者服務平滑下線的場景,我們建議將loadbalancer緩存直接禁用,一律采用nacos緩存,這一點筆者在之前的文章中也分析過nacos客戶端的中的服務實例緩存是實時刷新的,只要服務端感知到服務下線就會以RPC的方式通知nacos客戶端更新緩存。
對應的我們也給出禁用緩存的配置:
spring.cloud.loadbalancer.cache.enabled=false
三、小結
本文結合源碼的方式深入分析了SpringCloud Loadbalancer負載均衡調用時所涉及的:
- Loadbalancer緩存同步
- Loadbalancer如何進行過期緩存刪除
- Loadbalancer如何基于裝飾者模式和nacos緩存結合
由此得出Loadbalancer緩存實時性上的存在的風險,希望對你有幫助。