/**
 * Global gateway filter that turns a load-balanced request URL (scheme {@code lb}, or a
 * scheme-prefix attribute equal to {@code lb}) into the URL of a concrete service instance
 * selected by the load balancer registered for that service id.
 */
public class ReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

    private final LoadBalancerClientFactory clientFactory;

    private final GatewayLoadBalancerProperties properties;

    private final LoadBalancerProperties loadBalancerProperties;

    /**
     * Resolves the target instance for load-balanced requests and then continues the chain.
     * Requests whose URL attribute is missing, or that are not marked as {@code lb}, are passed
     * straight through to the rest of the chain.
     *
     * @param exchange the current server exchange; its request-URL attribute is rewritten
     * @param chain the remaining gateway filter chain
     * @return a {@code Mono} completing when the downstream chain completes
     */
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // Read from the exchange attributes, e.g. lb://order-service/orders
        URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
        boolean loadBalanced = url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix));
        if (!loadBalanced) {
            return chain.filter(exchange);
        }

        // preserve the original url
        addOriginalRequestUrl(exchange, url);

        // Read the request URL attribute once more
        URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        // The host part is the service name, e.g. order-service
        String serviceId = requestUri.getHost();

        /*
         * clientFactory.getInstances looks up, in the NamedContextFactory.contexts collection, the
         * AnnotationConfigApplicationContext keyed by the service id (order-service) and then
         * searches that container for LoadBalancerLifecycle beans; by default the result is {}.
         *
         * Each per-service ApplicationContext contains the following beans
         * (13 according to the original note):
         *   org.springframework.context.annotation.internalConfigurationAnnotationProcessor
         *   org.springframework.context.annotation.internalAutowiredAnnotationProcessor
         *   org.springframework.context.annotation.internalCommonAnnotationProcessor
         *   org.springframework.context.event.internalEventListenerProcessor
         *   org.springframework.context.event.internalEventListenerFactory
         *   propertyPlaceholderAutoConfiguration loadBalancerClientConfiguration
         *   propertySourcesPlaceholderConfigurer
         *   LoadBalancerClientConfiguration$ReactiveSupportConfiguration
         *   discoveryClientServiceInstanceListSupplier
         *   LoadBalancerClientConfiguration$BlockingSupportConfiguration,
         *   reactorServiceInstanceLoadBalancer
         */
        // With the default setup the set obtained here is {} (empty)
        Set<LoadBalancerLifecycle> processors = LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(
                clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                RequestDataContext.class, ResponseData.class, ServiceInstance.class);

        RequestData requestData = new RequestData(exchange.getRequest());
        String hint = getHint(serviceId, loadBalancerProperties.getHint());
        DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext(requestData, hint));

        // choose: load-balance to find an instance of the requested service (order-service)
        Mono<Response<ServiceInstance>> chosen = choose(lbRequest, serviceId, processors);
        return chosen
                .doOnNext(response -> routeToChosenInstance(exchange, url, schemePrefix, lbRequest, response, processors))
                .then(chain.filter(exchange))
                .doOnError(throwable -> {
                    for (LoadBalancerLifecycle lifecycle : processors) {
                        lifecycle.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.FAILED, throwable, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)));
                    }
                })
                .doOnSuccess(ignored -> {
                    for (LoadBalancerLifecycle lifecycle : processors) {
                        lifecycle.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.SUCCESS, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
                                new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))));
                    }
                });
    }

    /**
     * Applies the load balancer's answer to the exchange: rewrites the request-URL attribute to
     * point at the chosen instance, stores the response attribute and notifies the lifecycle
     * processors that the request is starting. When the response carries no server, the
     * processors are notified with status DISCARD and a {@code NotFoundException} is thrown.
     */
    private void routeToChosenInstance(ServerWebExchange exchange, URI url, String schemePrefix,
            DefaultRequest<RequestDataContext> lbRequest, Response<ServiceInstance> response,
            Set<LoadBalancerLifecycle> processors) {
        if (!response.hasServer()) {
            for (LoadBalancerLifecycle lifecycle : processors) {
                lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response));
            }
            throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
        }

        ServiceInstance retrievedInstance = response.getServer();
        URI uri = exchange.getRequest().getURI();

        // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
        // if the loadbalancer doesn't provide one.
        String instanceScheme = retrievedInstance.isSecure() ? "https" : "http";
        String overrideScheme = schemePrefix != null ? url.getScheme() : instanceScheme;

        DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
        URI requestUrl = reconstructURI(serviceInstance, uri);

        Map<String, Object> attributes = exchange.getAttributes();
        attributes.put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
        attributes.put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);

        for (LoadBalancerLifecycle lifecycle : processors) {
            lifecycle.onStartRequest(lbRequest, response);
        }
    }

    /**
     * Builds the URI of the chosen instance from the original request URI.
     *
     * @param serviceInstance the selected instance
     * @param original the URI of the incoming request
     * @return the result of {@code LoadBalancerUriTools.reconstructURI}
     */
    protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
        return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
    }

    /**
     * Asks the load balancer registered for {@code serviceId} to pick an instance, after
     * notifying every lifecycle processor that the load-balanced request is starting.
     *
     * @throws NotFoundException if no load balancer is available for the service id
     */
    private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
            Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
        // Look up the ReactorServiceInstanceLoadBalancer in the ApplicationContext of order-service
        ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(serviceId,
                ReactorServiceInstanceLoadBalancer.class);
        if (loadBalancer != null) {
            for (LoadBalancerLifecycle lifecycle : supportedLifecycleProcessors) {
                lifecycle.onStart(lbRequest);
            }
            // Find the service instance
            return loadBalancer.choose(lbRequest);
        }
        throw new NotFoundException("No loadbalancer available for " + serviceId);
    }

    /**
     * Returns the hint configured for {@code serviceId}; when there is none, returns the entry
     * stored under {@code "default"}, or the literal {@code "default"} if that is absent too.
     */
    private String getHint(String serviceId, Map<String, String> hints) {
        String fallbackHint = hints.getOrDefault("default", "default");
        String serviceHint = hints.get(serviceId);
        if (serviceHint != null) {
            return serviceHint;
        }
        return fallbackHint;
    }
}
// Round-robin load-balancing algorithm
/**
 * Load balancer that walks through the available service instances in round-robin order,
 * driven by a shared, monotonically incremented position counter.
 */
public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    final AtomicInteger position;

    ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

    /**
     * Picks one instance from the first list emitted by the instance supplier.
     *
     * @param request the load-balancer request, forwarded to the supplier
     * @return a {@code Mono} emitting the selection result; an {@code EmptyResponse} when the
     * emitted list is empty
     */
    public Mono<Response<ServiceInstance>> choose(Request request) {
        // Obtain the ServiceInstanceListSupplier via the provider (see ClientFactoryObjectProvider below);
        // falls back to a NoopServiceInstanceListSupplier when none is available
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
    }

    /**
     * Selects an instance and, when the supplier is a {@code SelectedInstanceCallback} and an
     * instance was found, reports the selection back to the supplier.
     */
    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
            List<ServiceInstance> serviceInstances) {
        Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    /**
     * Returns the next instance in round-robin order, or an {@code EmptyResponse} when the list
     * is empty.
     */
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            return new EmptyResponse();
        }
        // TODO: enforce order?
        // Clear the sign bit instead of using Math.abs: Math.abs(Integer.MIN_VALUE) is still
        // negative, which would yield a negative index once the counter wraps around.
        int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
        ServiceInstance instance = instances.get(pos % instances.size());
        return new DefaultResponse(instance);
    }
}
/**
 * {@code ObjectProvider} that resolves its delegate lazily from the named child context of a
 * {@code NamedContextFactory}.
 */
class ClientFactoryObjectProvider<T> implements ObjectProvider<T> {

    private final NamedContextFactory<?> clientFactory;

    // In this walkthrough: type = ServiceInstanceListSupplier
    private final Class<T> type;

    // In this walkthrough: name = order-service
    private final String name;

    /**
     * Returns the underlying provider, looking it up from the client factory on first use and
     * reusing it afterwards.
     */
    private ObjectProvider<T> delegate() {
        if (this.provider != null) {
            return this.provider;
        }
        // Fetch the ServiceInstanceListSupplier provider from the ApplicationContext of order-service;
        // in this walkthrough it ultimately yields DiscoveryClientServiceInstanceListSupplier
        this.provider = this.clientFactory.getProvider(this.name, this.type);
        return this.provider;
    }
}
/**
 * Configuration applied to each load-balancer client context (excerpt showing only the
 * reactive instance-supplier setup).
 */
public class LoadBalancerClientConfiguration {
/**
 * Reactive support: active only when reactive discovery is enabled.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnReactiveDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
public static class ReactiveSupportConfiguration {
/**
 * Registers the instance-list supplier backed by the discovery client, with caching.
 * The bean is created only when a ReactiveDiscoveryClient bean exists, no other
 * ServiceInstanceListSupplier bean is defined, and the property
 * spring.cloud.loadbalancer.configurations is "default" or not set.
 *
 * @param context the application context handed to the builder
 * @return the supplier produced by the builder
 */
@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default", matchIfMissing = true)
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
// What is ultimately built here is a DiscoveryClientServiceInstanceListSupplier
// (created by withDiscoveryClient(); withCaching() is applied on top of it)
return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
}
}
}
/**
 * Builder for {@code ServiceInstanceListSupplier} instances (excerpt).
 */
public final class ServiceInstanceListSupplierBuilder {

    /**
     * Sets the base creator to one that builds a {@code DiscoveryClientServiceInstanceListSupplier}
     * from the {@code ReactiveDiscoveryClient} bean and the environment of the given context.
     *
     * @return this builder
     */
    public ServiceInstanceListSupplierBuilder withDiscoveryClient() {
        // The ReactiveDiscoveryClient is looked up in the ApplicationContext of order-service first;
        // per the original note, if none is customized there the parent container is consulted,
        // and with Nacos the result is NacosReactiveDiscoveryClient.
        this.baseCreator = context -> new DiscoveryClientServiceInstanceListSupplier(
                context.getBean(ReactiveDiscoveryClient.class), context.getEnvironment());
        return this;
    }
}