Spring Cloud OpenFeign Remote Calls: How Load Balancing Works
Environment: Spring Cloud 2021.0.7 + Spring Boot 2.7.12
Dependency configuration
Maven dependencies
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
Enabling Feign with the annotation
@SpringBootApplication
// Enables Feign. This annotation also lets you configure three important things:
// 1. A shared default configuration for all Feign clients
// 2. Which packages to scan for client classes
// 3. An explicit list of @FeignClient classes
@EnableFeignClients
public class AppApplication {
    public static void main(String[] args) {
        SpringApplication.run(AppApplication.class, args);
    }
}
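How Feign client beans are created
During startup, the container finds every interface annotated with @FeignClient and registers each one as a bean. Each Feign client is backed by a FactoryBean, namely FeignClientFactoryBean.
The scanning for @FeignClient interfaces is done by FeignClientsRegistrar, which is imported by @EnableFeignClients.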
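To make the FactoryBean idea concrete, here is a minimal plain-JDK sketch of a factory that hands out a dynamic proxy for an interface that has no implementation class. GreetingApi and ProxyFactory are hypothetical names invented for this illustration; the real FeignClientFactoryBean builds a far richer proxy that issues HTTP requests, whereas the handler below only describes the call it would make.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

public class FactoryProxySketch {

    /** Hypothetical client interface; no implementation class exists anywhere. */
    public interface GreetingApi {
        String greet(String name);
    }

    /** Hypothetical stand-in for a FactoryBean: getObject() builds the proxy on demand. */
    public static class ProxyFactory<T> {
        private final Class<T> type;
        private final String serviceName;

        public ProxyFactory(Class<T> type, String serviceName) {
            this.type = type;
            this.serviceName = serviceName;
        }

        public T getObject() {
            // Instead of sending HTTP, the handler returns a description of the call.
            InvocationHandler handler = (proxy, method, methodArgs) ->
                    "http://" + serviceName + "/" + method.getName() + Arrays.toString(methodArgs);
            return type.cast(Proxy.newProxyInstance(
                    type.getClassLoader(), new Class<?>[] {type}, handler));
        }
    }

    public static void main(String[] args) {
        GreetingApi api = new ProxyFactory<>(GreetingApi.class, "greeting-service").getObject();
        System.out.println(api.greet("feign"));
    }
}
```

The caller only ever sees the interface type, which is why a Feign client can be injected like any other bean.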
FeignClientFactoryBean
public class FeignClientFactoryBean implements FactoryBean {
    public Object getObject() {
        return getTarget();
    }
    <T> T getTarget() {
        FeignContext context = beanFactory != null ? beanFactory.getBean(FeignContext.class) : applicationContext.getBean(FeignContext.class);
        Feign.Builder builder = feign(context);
        if (!StringUtils.hasText(url)) {
            if (!name.startsWith("http")) {
                url = "http://" + name;
            }
            else {
                url = name;
            }
            url += cleanPath();
            // Load-balanced path
            return (T) loadBalance(builder, context, new HardCodedTarget<>(type, name, url));
        }
        // ...
    }
    protected <T> T loadBalance(Feign.Builder builder, FeignContext context, HardCodedTarget<T> target) {
        // In OpenFeign, the class that actually performs load balancing is the concrete Client.
        // Each Feign client has its own child container (an AnnotationConfigApplicationContext).
        // The service name from @FeignClient (name or value) is the key used to look up that container
        // in a Map held by NamedContextFactory, the parent class of FeignContext.
        // If none exists, a new AnnotationConfigApplicationContext is created. Every child container has
        // the parent container set, so a Client that is not found in the child is looked up in the parent.
        Client client = getOptional(context, Client.class);
        // ...
    }
}
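The child-container lookup described in the comments above can be illustrated with a small plain-JDK sketch. SimpleContext and NamedContexts are hypothetical, heavily simplified stand-ins (a type-to-bean map instead of a real ApplicationContext); they only show the two behaviours the text names: one lazily created child per client name, and fallback to the parent when the child has no matching bean.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NamedContextSketch {

    /** Hypothetical, much-simplified "context": a type-to-bean map with an optional parent. */
    public static class SimpleContext {
        private final Map<Class<?>, Object> beans = new HashMap<>();
        private final SimpleContext parent;

        public SimpleContext(SimpleContext parent) {
            this.parent = parent;
        }

        public <T> void register(Class<T> type, T bean) {
            beans.put(type, bean);
        }

        public <T> T getOptional(Class<T> type) {
            Object bean = beans.get(type);
            if (bean != null) {
                return type.cast(bean);
            }
            // Not found locally: fall back to the parent, or give up with null.
            return parent != null ? parent.getOptional(type) : null;
        }
    }

    /** One child context per client name, created lazily on first lookup. */
    public static class NamedContexts {
        private final Map<String, SimpleContext> contexts = new ConcurrentHashMap<>();
        private final SimpleContext parent;

        public NamedContexts(SimpleContext parent) {
            this.parent = parent;
        }

        public SimpleContext getContext(String name) {
            return contexts.computeIfAbsent(name, n -> new SimpleContext(parent));
        }

        public <T> T getOptional(String name, Class<T> type) {
            return getContext(name).getOptional(type);
        }
    }

    public static void main(String[] args) {
        SimpleContext parent = new SimpleContext(null);
        parent.register(String.class, "shared-client");
        NamedContexts factory = new NamedContexts(parent);
        factory.getContext("order-service").register(String.class, "order-specific-client");

        System.out.println(factory.getOptional("order-service", String.class));
        System.out.println(factory.getOptional("user-service", String.class));
    }
}
```

In this sketch "order-service" resolves its own bean, while "user-service" has nothing registered locally and receives the parent's bean.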
Client implementations
The following Client implementations are available:
- apache httpclient
- okhttp
- default(jdk)
Which one is used depends on which dependency (httpclient or okhttp) your project brings in:
<!-- httpclient -->
<dependency>
  <groupId>io.github.openfeign</groupId>
  <artifactId>feign-httpclient</artifactId>
  <version>${version}</version>
</dependency>
<!-- okhttp -->
<dependency>
  <groupId>io.github.openfeign</groupId>
  <artifactId>feign-okhttp</artifactId>
  <version>${version}</version>
</dependency>
The selection is made by the following configuration:
@Import({
    HttpClientFeignLoadBalancerConfiguration.class,
    OkHttpFeignLoadBalancerConfiguration.class,
    HttpClient5FeignLoadBalancerConfiguration.class,
    DefaultFeignLoadBalancerConfiguration.class })
public class FeignLoadBalancerAutoConfiguration {
}
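If several implementations are present in your environment, the configurations are processed in the import order shown here. The rest of this article uses the last one, DefaultFeignLoadBalancerConfiguration, as the example.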
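The effect of an ordered import list can be pictured with the following plain-JDK sketch. It is an illustration of the idea only, not Spring's condition evaluation: it assumes each candidate is guarded by its own condition (modelled as a BooleanSupplier) and registers its Client only when none has been registered yet. The candidate names and the select method are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

public class FirstMatchWinsSketch {

    /** Walks the candidates in order and keeps the first one whose condition holds. */
    public static String select(Map<String, BooleanSupplier> candidatesInImportOrder) {
        String registered = null;
        for (Map.Entry<String, BooleanSupplier> candidate : candidatesInImportOrder.entrySet()) {
            boolean conditionMatches = candidate.getValue().getAsBoolean();
            boolean beanMissing = registered == null; // assumed missing-bean guard
            if (conditionMatches && beanMissing) {
                registered = candidate.getKey();
            }
        }
        return registered;
    }

    /** Builds the ordered candidate list for a given classpath situation. */
    public static String demo(boolean httpClientPresent, boolean okHttpPresent) {
        Map<String, BooleanSupplier> candidates = new LinkedHashMap<>();
        candidates.put("httpclient", () -> httpClientPresent);
        candidates.put("okhttp", () -> okHttpPresent);
        candidates.put("default", () -> true); // the JDK-based fallback always qualifies
        return select(candidates);
    }

    public static void main(String[] args) {
        System.out.println(demo(false, false));
        System.out.println(demo(false, true));
        System.out.println(demo(true, true));
    }
}
```

With nothing extra on the classpath the fallback is chosen; when several candidates qualify, the earliest one in the list is kept.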
class DefaultFeignLoadBalancerConfiguration {
    @Bean
    @ConditionalOnMissingBean
    // Applies only when spring-retry based retrying is not enabled
    @Conditional(OnRetryNotEnabledCondition.class)
    public Client feignClient(LoadBalancerClient loadBalancerClient, LoadBalancerClientFactory loadBalancerClientFactory) {
        // The first constructor argument becomes the delegate that finally performs the remote call
        return new FeignBlockingLoadBalancerClient(new Client.Default(null, null), loadBalancerClient, loadBalancerClientFactory);
    }
}
When neither httpclient nor okhttp is imported, the Client in use is a FeignBlockingLoadBalancerClient that wraps Client.Default.
Load balancing implementation
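FeignBlockingLoadBalancerClient is constructed with a load-balancer client (LoadBalancerClient) and a load-balancer client factory (LoadBalancerClientFactory). The factory creates one child container (AnnotationConfigApplicationContext) per service ID and retrieves beans from that child container, such as the load balancer itself and the ServiceInstanceListSupplier.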
public class FeignBlockingLoadBalancerClient implements Client {
    // The delegate is the new Client.Default(null, null) created above
    private final Client delegate;
    private final LoadBalancerClient loadBalancerClient;
    private final LoadBalancerClientFactory loadBalancerClientFactory;
    public FeignBlockingLoadBalancerClient(Client delegate, LoadBalancerClient loadBalancerClient, LoadBalancerClientFactory loadBalancerClientFactory) {
        this.delegate = delegate;
        this.loadBalancerClient = loadBalancerClient;
        this.loadBalancerClientFactory = loadBalancerClientFactory;
    }
    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        final URI originalUri = URI.create(request.url());
        // The host part of the URL is the service name (serviceId)
        String serviceId = originalUri.getHost();
        String hint = getHint(serviceId);
        DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext(buildRequestData(request), hint));
        // ...
        // Ask the load-balancer client for an instance of the given serviceId
        ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
        // ...
        // Rebuild the request URL from the chosen ServiceInstance
        String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
        // Build a new request that targets the rebuilt URL
        Request newRequest = buildRequest(request, reconstructedUrl);
        LoadBalancerProperties loadBalancerProperties = loadBalancerClientFactory.getProperties(serviceId);
        return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
                supportedLifecycleProcessors, loadBalancerProperties.isUseRawStatusCodeInResponseData());
    }
    protected Request buildRequest(Request request, String reconstructedUrl) {
        return Request.create(request.httpMethod(), reconstructedUrl, request.headers(), request.body(),
                request.charset(), request.requestTemplate());
    }
}
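The shape of execute above is a decorator: take the service name from the URL host, resolve it to a concrete address, rewrite the URL, then hand off to the wrapped client. A minimal plain-JDK sketch of that shape follows. SimpleClient, LoadBalancedClient and the "host:port" chooser function are hypothetical simplifications, and the 503-style message for a missing instance is a choice made for this sketch (the corresponding part of the real method is elided in the excerpt).

```java
import java.net.URI;
import java.util.Map;
import java.util.function.Function;

public class LoadBalancedClientSketch {

    /** Hypothetical minimal client: takes a URL, returns a response body. */
    public interface SimpleClient {
        String execute(String url);
    }

    /** Decorator: resolves the service name to a concrete address, then delegates. */
    public static class LoadBalancedClient implements SimpleClient {
        private final SimpleClient delegate;
        private final Function<String, String> chooser; // serviceId -> "host:port", or null

        public LoadBalancedClient(SimpleClient delegate, Function<String, String> chooser) {
            this.delegate = delegate;
            this.chooser = chooser;
        }

        @Override
        public String execute(String url) {
            URI original = URI.create(url);
            String serviceId = original.getHost(); // the host part carries the service name
            String address = chooser.apply(serviceId);
            if (address == null) {
                return "503 no instance for " + serviceId; // sketch-only handling
            }
            StringBuilder rewritten = new StringBuilder(original.getScheme())
                    .append("://").append(address).append(original.getRawPath());
            if (original.getRawQuery() != null) {
                rewritten.append('?').append(original.getRawQuery());
            }
            return delegate.execute(rewritten.toString());
        }
    }

    /** Wires a fixed registry and an echoing delegate so the rewritten URL is visible. */
    public static String demo(String url) {
        Map<String, String> registry = Map.of("order-service", "10.0.0.5:8081");
        SimpleClient echo = target -> "called " + target;
        return new LoadBalancedClient(echo, registry::get).execute(url);
    }

    public static void main(String[] args) {
        System.out.println(demo("http://order-service/api/orders?id=1"));
        System.out.println(demo("http://user-service/api/users"));
    }
}
```

The delegate never sees the logical service name, only the concrete address chosen for this request.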
The concrete LoadBalancerClient implementation:
public class BlockingLoadBalancerClientAutoConfiguration {
    @Bean
    @ConditionalOnBean(LoadBalancerClientFactory.class)
    @ConditionalOnMissingBean
    public LoadBalancerClient blockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory) {
        return new BlockingLoadBalancerClient(loadBalancerClientFactory);
    }
}
BlockingLoadBalancerClient
public class BlockingLoadBalancerClient implements LoadBalancerClient {
    private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;
    public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
        this.loadBalancerClientFactory = loadBalancerClientFactory;
    }
    public <T> ServiceInstance choose(String serviceId, Request<T> request) {
        // Obtain a load balancer for this service; the default strategy is round-robin
        ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
        if (loadBalancer == null) {
            return null;
        }
        Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
        if (loadBalancerResponse == null) {
            return null;
        }
        return loadBalancerResponse.getServer();
    }
    // Rebuild the request URI for the chosen instance
    public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }
}
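The choose method above adapts an asynchronous (reactive) load balancer to a blocking API by waiting for its result. The sketch below shows the same adaptation with the JDK's CompletableFuture standing in for the reactive type; that substitution, and the method names chooseAsync and choose on plain address strings, are assumptions made so the example runs without extra libraries.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BlockingChooseSketch {

    /** Hypothetical async chooser: completes later with an address, or null if none exists. */
    public static CompletableFuture<String> chooseAsync(List<String> instances) {
        return CompletableFuture.supplyAsync(() -> instances.isEmpty() ? null : instances.get(0));
    }

    /** Blocking facade: waits for the async result and returns it to the caller. */
    public static String choose(List<String> instances) {
        return chooseAsync(instances).join();
    }

    public static void main(String[] args) {
        System.out.println(choose(List.of("10.0.0.5:8081", "10.0.0.6:8081")));
        System.out.println(choose(List.of()));
    }
}
```

Blocking is acceptable here because the calling Feign client is itself synchronous; the calling thread would wait for the HTTP response anyway.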
public final class LoadBalancerUriTools {
    public static URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        // ...
        return doReconstructURI(serviceInstance, original);
    }
    private static URI doReconstructURI(ServiceInstance serviceInstance, URI original) {
        String host = serviceInstance.getHost();
        String scheme = Optional.ofNullable(serviceInstance.getScheme()).orElse(computeScheme(original, serviceInstance));
        int port = computePort(serviceInstance.getPort(), scheme);
        if (Objects.equals(host, original.getHost()) && port == original.getPort() && Objects.equals(scheme, original.getScheme())) {
            return original;
        }
        boolean encoded = containsEncodedParts(original);
        return UriComponentsBuilder.fromUri(original).scheme(scheme).host(host).port(port).build(encoded).toUri();
    }
}
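The reconstruction logic can be tried out with java.net.URI alone. The sketch below is a simplified approximation under stated assumptions: the instance is passed as plain host, port and scheme values; a missing instance scheme falls back to the original URI's scheme (secure-flag handling is left out); a negative port is assumed to default to 443 for https and 80 otherwise; and the pre-encoded-URI handling of the real code (containsEncodedParts) is not reproduced.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class ReconstructUriSketch {

    /** Assumed defaulting rule: keep an explicit port, otherwise derive one from the scheme. */
    public static int computePort(int port, String scheme) {
        if (port >= 0) {
            return port;
        }
        return "https".equals(scheme) ? 443 : 80;
    }

    /** Swaps scheme, host and port for those of the chosen instance; path and query are kept. */
    public static URI reconstruct(String instanceHost, int instancePort, String instanceScheme, URI original) {
        String scheme = instanceScheme != null ? instanceScheme : original.getScheme();
        int port = computePort(instancePort, scheme);
        if (instanceHost.equals(original.getHost()) && port == original.getPort()
                && scheme.equals(original.getScheme())) {
            return original; // nothing to change
        }
        try {
            return new URI(scheme, original.getUserInfo(), instanceHost, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://order-service/api/orders?id=1");
        System.out.println(reconstruct("10.0.0.5", 8081, null, original));
        System.out.println(reconstruct("10.0.0.5", -1, "https", original));
    }
}
```

The first call yields http://10.0.0.5:8081/api/orders?id=1; in the second, the missing port is filled in from the https scheme.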
Round-robin algorithm
public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next()
                .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
    }
    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) {
        Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
        // ...
        // Only one instance: return it directly
        if (instances.size() == 1) {
            return new DefaultResponse(instances.get(0));
        }
        // Ignore the sign bit, this allows pos to loop sequentially from 0 to
        // Integer.MAX_VALUE
        int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
        ServiceInstance instance = instances.get(pos % instances.size());
        return new DefaultResponse(instance);
    }
}
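The counter arithmetic in getInstanceResponse is easy to verify in isolation. The following sketch keeps the same incrementAndGet() & Integer.MAX_VALUE step but works on plain strings; the class name, the explicit seed parameter and returning null for an empty list are assumptions of this sketch (the empty-list branch is elided in the excerpt above).

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {
    private final AtomicInteger position;

    /** The seed is exposed only so the overflow case can be demonstrated. */
    public RoundRobinSketch(int seed) {
        this.position = new AtomicInteger(seed);
    }

    public String choose(List<String> instances) {
        if (instances.isEmpty()) {
            return null; // sketch-only handling of "no instances"
        }
        if (instances.size() == 1) {
            return instances.get(0);
        }
        // Masking off the sign bit keeps pos non-negative even after the counter overflows.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        List<String> instances = List.of("a", "b", "c");

        RoundRobinSketch fromZero = new RoundRobinSketch(0);
        // prints: b c a b
        System.out.println(fromZero.choose(instances) + " " + fromZero.choose(instances) + " "
                + fromZero.choose(instances) + " " + fromZero.choose(instances));

        // Counter about to overflow: the masked value wraps to 0, never to a negative index.
        RoundRobinSketch nearOverflow = new RoundRobinSketch(Integer.MAX_VALUE);
        // prints: a b
        System.out.println(nearOverflow.choose(instances) + " " + nearOverflow.choose(instances));
    }
}
```

Without the mask, the overflowed counter would be negative and pos % instances.size() could produce a negative index.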
Executing the remote call
Picking up at the final return statement of FeignBlockingLoadBalancerClient#execute above:
final class LoadBalancerUtils {
    static Response executeWithLoadBalancerLifecycleProcessing(Client feignClient, Request.Options options,
            Request feignRequest, org.springframework.cloud.client.loadbalancer.Request lbRequest,
            org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse,
            Set<LoadBalancerLifecycle> supportedLifecycleProcessors, boolean useRawStatusCodes) throws IOException {
        return executeWithLoadBalancerLifecycleProcessing(feignClient, options, feignRequest, lbRequest, lbResponse, supportedLifecycleProcessors, true, useRawStatusCodes);
    }
    static Response executeWithLoadBalancerLifecycleProcessing(Client feignClient, Request.Options options,
            Request feignRequest, org.springframework.cloud.client.loadbalancer.Request lbRequest,
            org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse,
            Set<LoadBalancerLifecycle> supportedLifecycleProcessors, boolean loadBalanced, boolean useRawStatusCodes) throws IOException {
        // Lifecycle callbacks that run before the actual call are invoked here
        try {
            // Perform the actual call. The feignClient here is the delegate passed in from
            // FeignBlockingLoadBalancerClient, i.e. new Client.Default(null, null)
            Response response = feignClient.execute(feignRequest, options);
            // Lifecycle completion callbacks are invoked here (omitted)
            return response;
        }
        // ...
    }
}
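The "callbacks before, call, callbacks after" pattern in this method can be sketched without Spring. The Lifecycle interface below is a hypothetical two-method simplification, and requests and responses are plain strings; it only shows that the start hook runs before the delegate, and that a completion hook runs on both the success and the failure path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class LifecycleSketch {

    /** Hypothetical, simplified lifecycle hook. */
    public interface Lifecycle {
        void onStartRequest(String request);
        void onComplete(String request, String status);
    }

    /** Runs the hooks around the actual call; failures are reported, then rethrown. */
    public static String executeWithLifecycle(String request, Supplier<String> call, List<Lifecycle> processors) {
        processors.forEach(p -> p.onStartRequest(request));
        String response;
        try {
            response = call.get();
        } catch (RuntimeException e) {
            processors.forEach(p -> p.onComplete(request, "FAILED"));
            throw e;
        }
        processors.forEach(p -> p.onComplete(request, "SUCCESS"));
        return response;
    }

    /** Records the order of events for a succeeding or failing call. */
    public static List<String> demo(boolean fail) {
        List<String> events = new ArrayList<>();
        Lifecycle recorder = new Lifecycle() {
            @Override
            public void onStartRequest(String request) {
                events.add("start " + request);
            }

            @Override
            public void onComplete(String request, String status) {
                events.add("complete " + status);
            }
        };
        try {
            String body = executeWithLifecycle("GET /orders", () -> {
                if (fail) {
                    throw new IllegalStateException("connection refused");
                }
                return "200 OK";
            }, List.of(recorder));
            events.add("response " + body);
        } catch (IllegalStateException e) {
            events.add("error " + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```

Hooks of this kind are a natural place for metrics or per-instance statistics, since they see both the chosen target and the outcome.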
Client.Default
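public interface Client {
    // Default implementation, nested inside the Client interface
    class Default implements Client {
        @Override
        public Response execute(Request request, Options options) throws IOException {
            // Handled with the JDK's built-in HttpURLConnection
            HttpURLConnection connection = convertAndSend(request, options);
            return convertResponse(connection, request);
        }
    }
}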
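For readers who have not used HttpURLConnection directly, here is a small self-contained sketch of a GET request made with it. The get helper, its timeout parameters and the /ping endpoint are hypothetical; the embedded com.sun.net.httpserver.HttpServer exists only so the example has something local to call, and none of this mirrors the internals of convertAndSend or convertResponse.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;

public class JdkHttpCallSketch {

    /** Sends a GET with the JDK's HttpURLConnection and returns the body as text. */
    public static String get(String url, int connectTimeoutMs, int readTimeoutMs) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) URI.create(url).toURL().openConnection();
        connection.setRequestMethod("GET");
        connection.setConnectTimeout(connectTimeoutMs);
        connection.setReadTimeout(readTimeoutMs);
        try (InputStream in = connection.getInputStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } finally {
            connection.disconnect();
        }
    }

    /** Starts a throwaway local server on a free port, calls it once, then stops it. */
    public static String pingLocalServer() {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/ping", exchange -> {
                byte[] body = "pong".getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            });
            server.start();
            try {
                int port = server.getAddress().getPort();
                return get("http://127.0.0.1:" + port + "/ping", 2000, 2000);
            } finally {
                server.stop(0);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pingLocalServer());
    }
}
```

The two timeouts play the role that Request.Options plays for a Feign call: one bounds connection setup, the other bounds waiting for data.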