Preface
This article looks at how the eureka client resolves and uses serviceUrl.
serviceUrl resolution
DiscoveryClient.scheduleServerEndpointTask
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java
    private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                            AbstractDiscoveryClientOptionalArgs args) {
        Collection<?> additionalFilters = args == null
                ? Collections.emptyList()
                : args.additionalFilters;

        EurekaJerseyClient providedJerseyClient = args == null
                ? null
                : args.eurekaJerseyClient;

        TransportClientFactories argsTransportClientFactories = null;
        if (args != null && args.getTransportClientFactories() != null) {
            argsTransportClientFactories = args.getTransportClientFactories();
        }

        // Ignore the raw types warnings since the client filter interface changed between jersey 1/2
        @SuppressWarnings("rawtypes")
        TransportClientFactories transportClientFactories = argsTransportClientFactories == null
                ? new Jersey1TransportClientFactories()
                : argsTransportClientFactories;

        Optional<SSLContext> sslContext = args == null
                ? Optional.empty()
                : args.getSSLContext();
        Optional<HostnameVerifier> hostnameVerifier = args == null
                ? Optional.empty()
                : args.getHostnameVerifier();

        // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
        eurekaTransport.transportClientFactory = providedJerseyClient == null
                ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters,
                        applicationInfoManager.getInfo(), sslContext, hostnameVerifier)
                : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);

        ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
            @Override
            public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
                long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
                long delay = getLastSuccessfulRegistryFetchTimePeriod();
                if (delay > thresholdInMs) {
                    logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
                            thresholdInMs, delay);
                    return null;
                } else {
                    return localRegionApps.get();
                }
            }
        };

        eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
                clientConfig,
                transportConfig,
                eurekaTransport.transportClientFactory,
                applicationInfoManager.getInfo(),
                applicationsSource
        );

        if (clientConfig.shouldRegisterWithEureka()) {
            EurekaHttpClientFactory newRegistrationClientFactory = null;
            EurekaHttpClient newRegistrationClient = null;
            try {
                newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                        eurekaTransport.bootstrapResolver,
                        eurekaTransport.transportClientFactory,
                        transportConfig
                );
                newRegistrationClient = newRegistrationClientFactory.newClient();
            } catch (Exception e) {
                logger.warn("Transport initialization failure", e);
            }
            eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
            eurekaTransport.registrationClient = newRegistrationClient;
        }

        // new method (resolve from primary servers for read)
        // Configure new transport layer (candidate for injecting in the future)
        if (clientConfig.shouldFetchRegistry()) {
            EurekaHttpClientFactory newQueryClientFactory = null;
            EurekaHttpClient newQueryClient = null;
            try {
                newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                        eurekaTransport.bootstrapResolver,
                        eurekaTransport.transportClientFactory,
                        clientConfig,
                        transportConfig,
                        applicationInfoManager.getInfo(),
                        applicationsSource
                );
                newQueryClient = newQueryClientFactory.newClient();
            } catch (Exception e) {
                logger.warn("Transport initialization failure", e);
            }
            eurekaTransport.queryClientFactory = newQueryClientFactory;
            eurekaTransport.queryClient = newQueryClient;
        }
    }
The DiscoveryClient constructor calls scheduleServerEndpointTask directly, and that method in turn calls newBootstrapResolver.
EurekaHttpClients.newBootstrapResolver
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/transport/EurekaHttpClients.java
    public static ClosableResolver<AwsEndpoint> newBootstrapResolver(
            final EurekaClientConfig clientConfig,
            final EurekaTransportConfig transportConfig,
            final TransportClientFactory transportClientFactory,
            final InstanceInfo myInstanceInfo,
            final ApplicationsResolver.ApplicationsSource applicationsSource) {
        if (COMPOSITE_BOOTSTRAP_STRATEGY.equals(transportConfig.getBootstrapResolverStrategy())) {
            if (clientConfig.shouldFetchRegistry()) {
                return compositeBootstrapResolver(
                        clientConfig,
                        transportConfig,
                        transportClientFactory,
                        myInstanceInfo,
                        applicationsSource
                );
            } else {
                logger.warn("Cannot create a composite bootstrap resolver if registry fetch is disabled."
                        + " Falling back to using a default bootstrap resolver.");
            }
        }

        // if all else fails, return the default
        return defaultBootstrapResolver(clientConfig, myInstanceInfo);
    }
When no composite bootstrap strategy is configured, execution falls through to defaultBootstrapResolver:
    /**
     * @return a bootstrap resolver that resolves eureka server endpoints based on either DNS or static config,
     *         depending on configuration for one or the other. This resolver will warm up at the start.
     */
    static ClosableResolver<AwsEndpoint> defaultBootstrapResolver(final EurekaClientConfig clientConfig,
                                                                  final InstanceInfo myInstanceInfo) {
        String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
        String myZone = InstanceInfo.getZone(availZones, myInstanceInfo);

        ClusterResolver<AwsEndpoint> delegateResolver = new ZoneAffinityClusterResolver(
                new ConfigClusterResolver(clientConfig, myInstanceInfo),
                myZone,
                true
        );

        List<AwsEndpoint> initialValue = delegateResolver.getClusterEndpoints();
        if (initialValue.isEmpty()) {
            String msg = "Initial resolution of Eureka server endpoints failed. Check ConfigClusterResolver logs for more info";
            logger.error(msg);
            failFastOnInitCheck(clientConfig, msg);
        }

        return new AsyncResolver<>(
                EurekaClientNames.BOOTSTRAP,
                delegateResolver,
                initialValue,
                1,
                clientConfig.getEurekaServiceUrlPollIntervalSeconds() * 1000
        );
    }
As the Javadoc says, the eureka server endpoints are resolved either from DNS or from static configuration. The delegateResolver here is a ZoneAffinityClusterResolver.
AsyncResolver
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/AsyncResolver.java
    this.backgroundTask = new TimedSupervisorTask(
            this.getClass().getSimpleName(),
            executorService,
            threadPoolExecutor,
            refreshIntervalMs,
            TimeUnit.MILLISECONDS,
            5,
            updateTask
    );

    private final Runnable updateTask = new Runnable() {
        @Override
        public void run() {
            try {
                List<T> newList = delegate.getClusterEndpoints();
                if (newList != null) {
                    resultsRef.getAndSet(newList);
                    lastLoadTimestamp = System.currentTimeMillis();
                } else {
                    logger.warn("Delegate returned null list of cluster endpoints");
                }
                logger.debug("Resolved to {}", newList);
            } catch (Exception e) {
                logger.warn("Failed to retrieve cluster endpoints from the delegate", e);
            }
        }
    };
A backgroundTask refreshes the cluster endpoints. The interval is clientConfig.getEurekaServiceUrlPollIntervalSeconds() * 1000 milliseconds, which is 5 minutes by default. The configuration property is eureka.client.eureka-service-url-poll-interval-seconds.
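The refresh pattern above can be sketched in a few lines. This is only an illustration, not the real AsyncResolver: the class name RefreshingEndpointList is made up, endpoints are plain strings, and a plain ScheduledExecutorService stands in for TimedSupervisorTask, so timeout and backoff handling are left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of "keep the last good list, refresh it in the background".
class RefreshingEndpointList {
    private final AtomicReference<List<String>> resultsRef;
    private final Supplier<List<String>> delegate;

    RefreshingEndpointList(Supplier<List<String>> delegate, List<String> initialValue) {
        this.delegate = delegate;
        this.resultsRef = new AtomicReference<>(initialValue);
    }

    // One refresh pass: swap in the new list, but keep the old one
    // if the delegate returns null or throws.
    void refreshOnce() {
        try {
            List<String> newList = delegate.get();
            if (newList != null) {
                resultsRef.set(newList);
            }
        } catch (RuntimeException e) {
            // Keep serving the previous list.
        }
    }

    List<String> get() {
        return resultsRef.get();
    }

    // Schedules refreshOnce at a fixed delay; the caller shuts the scheduler down.
    ScheduledExecutorService start(long refreshIntervalMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this::refreshOnce,
                refreshIntervalMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        RefreshingEndpointList list = new RefreshingEndpointList(
                () -> Arrays.asList("http://127.0.0.1:8762/eureka/"),
                Arrays.asList("http://127.0.0.1:8761/eureka/"));
        list.refreshOnce();
        System.out.println(list.get());
    }
}
```

Readers always see a complete list because the reference is replaced atomically, and a failed refresh never wipes out the previous result.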
ZoneAffinityClusterResolver
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/aws/ZoneAffinityClusterResolver.java
    public List<AwsEndpoint> getClusterEndpoints() {
        List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone);
        List<AwsEndpoint> myZoneEndpoints = parts[0];
        List<AwsEndpoint> remainingEndpoints = parts[1];
        List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints);
        if (!zoneAffinity) {
            Collections.reverse(randomizedList);
        }

        logger.debug("Local zone={}; resolved to: {}", myZone, randomizedList);

        return randomizedList;
    }
This calls delegate.getClusterEndpoints(), where the delegate is a ConfigClusterResolver. The resulting List<AwsEndpoint> is then passed through randomizeAndMerge:
    private static List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints,
                                                       List<AwsEndpoint> remainingEndpoints) {
        if (myZoneEndpoints.isEmpty()) {
            return ResolverUtils.randomize(remainingEndpoints);
        }
        if (remainingEndpoints.isEmpty()) {
            return ResolverUtils.randomize(myZoneEndpoints);
        }
        List<AwsEndpoint> mergedList = ResolverUtils.randomize(myZoneEndpoints);
        mergedList.addAll(ResolverUtils.randomize(remainingEndpoints));
        return mergedList;
    }
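To make the ordering concrete, here is a minimal sketch of the same idea: split by zone, shuffle each part, put the local zone first, and reverse the result when zone affinity is off. ZoneEndpoint and ZoneAffinitySketch are hypothetical names, and Collections.shuffle is only a stand-in for ResolverUtils.randomize, whose exact behaviour may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for AwsEndpoint: just a URL and a zone.
class ZoneEndpoint {
    final String url;
    final String zone;

    ZoneEndpoint(String url, String zone) {
        this.url = url;
        this.zone = zone;
    }
}

class ZoneAffinitySketch {
    // Local-zone endpoints come first when zoneAffinity is true, last otherwise.
    static List<ZoneEndpoint> order(List<ZoneEndpoint> all, String myZone, boolean zoneAffinity) {
        List<ZoneEndpoint> mine = new ArrayList<>();
        List<ZoneEndpoint> rest = new ArrayList<>();
        for (ZoneEndpoint e : all) {
            if (myZone.equalsIgnoreCase(e.zone)) {
                mine.add(e);
            } else {
                rest.add(e);
            }
        }
        // Stand-in for ResolverUtils.randomize (assumption: a plain shuffle).
        Collections.shuffle(mine);
        Collections.shuffle(rest);

        List<ZoneEndpoint> merged = new ArrayList<>(mine);
        merged.addAll(rest);
        if (!zoneAffinity) {
            Collections.reverse(merged);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<ZoneEndpoint> all = Arrays.asList(
                new ZoneEndpoint("http://10.0.0.1:8761/eureka/", "zone-a"),
                new ZoneEndpoint("http://10.0.1.1:8761/eureka/", "zone-b"),
                new ZoneEndpoint("http://10.0.1.2:8761/eureka/", "zone-b"));
        for (ZoneEndpoint e : order(all, "zone-a", true)) {
            System.out.println(e.zone + " " + e.url);
        }
    }
}
```

With zone affinity on, the zone-a endpoint is always printed first; the order of the two zone-b endpoints varies between runs.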
ConfigClusterResolver
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/aws/ConfigClusterResolver.java
    public List<AwsEndpoint> getClusterEndpoints() {
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            if (logger.isInfoEnabled()) {
                logger.info("Resolving eureka endpoints via DNS: {}", getDNSName());
            }
            return getClusterEndpointsFromDns();
        } else {
            logger.info("Resolving eureka endpoints via configuration");
            return getClusterEndpointsFromConfig();
        }
    }

    private List<AwsEndpoint> getClusterEndpointsFromConfig() {
        String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
        String myZone = InstanceInfo.getZone(availZones, myInstanceInfo);

        Map<String, List<String>> serviceUrls = EndpointUtils
                .getServiceUrlsMapFromConfig(clientConfig, myZone, clientConfig.shouldPreferSameZoneEureka());

        List<AwsEndpoint> endpoints = new ArrayList<>();
        for (String zone : serviceUrls.keySet()) {
            for (String url : serviceUrls.get(zone)) {
                try {
                    endpoints.add(new AwsEndpoint(url, getRegion(), zone));
                } catch (Exception ignore) {
                    logger.warn("Invalid eureka server URI: {}; removing from the server pool", url);
                }
            }
        }

        logger.debug("Config resolved to {}", endpoints);

        if (endpoints.isEmpty()) {
            logger.error("Cannot resolve to any endpoints from provided configuration: {}", serviceUrls);
        }

        return endpoints;
    }
The serviceUrl is resolved here through EndpointUtils.getServiceUrlsMapFromConfig. Suppose serviceUrl is configured as:
    eureka:
      client:
        serviceUrl:
          defaultZone: http://127.0.0.1:8761/eureka/,http://127.0.0.1:8762/eureka/
The result is two AwsEndpoint instances:
    { serviceUrl='http://127.0.0.1:8761/eureka/', region='us-east-1', zone='defaultZone'}
    { serviceUrl='http://127.0.0.1:8762/eureka/', region='us-east-1', zone='defaultZone'}
EndpointUtils.getServiceUrlsMapFromConfig
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/endpoint/EndpointUtils.java
    /**
     * Get the list of all eureka service urls from properties file for the eureka client to talk to.
     *
     * @param clientConfig the clientConfig to use
     * @param instanceZone The zone in which the client resides
     * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise
     * @return an (ordered) map of zone -> list of urls mappings, with the preferred zone first in iteration order
     */
    public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig,
                                                                        String instanceZone,
                                                                        boolean preferSameZone) {
        Map<String, List<String>> orderedUrls = new LinkedHashMap<>();
        String region = getRegion(clientConfig);
        String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
        if (availZones == null || availZones.length == 0) {
            availZones = new String[1];
            availZones[0] = DEFAULT_ZONE;
        }
        logger.debug("The availability zone for the given region {} are {}", region, availZones);
        int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);

        String zone = availZones[myZoneOffset];
        List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
        if (serviceUrls != null) {
            orderedUrls.put(zone, serviceUrls);
        }
        int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
        while (currentOffset != myZoneOffset) {
            zone = availZones[currentOffset];
            serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
            if (serviceUrls != null) {
                orderedUrls.put(zone, serviceUrls);
            }
            if (currentOffset == (availZones.length - 1)) {
                currentOffset = 0;
            } else {
                currentOffset++;
            }
        }

        if (orderedUrls.size() < 1) {
            throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
        }
        return orderedUrls;
    }
For each zone, starting with the preferred one, this calls clientConfig.getEurekaServerServiceUrls(zone) to obtain the serviceUrl list.
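The while loop above walks the zone array once, starting at myZoneOffset and wrapping around. A minimal sketch of that rotation, using a modulo index instead of the manual wrap-around, might look like this. ZoneOrderSketch and its parameters are hypothetical; the offset is passed in directly because getZoneOffset is not shown here, and a plain map replaces clientConfig.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ZoneOrderSketch {
    // Returns zone -> urls in rotation order, starting at myZoneOffset.
    static Map<String, List<String>> orderedUrls(String[] zones,
                                                 int myZoneOffset,
                                                 Map<String, List<String>> urlsByZone) {
        Map<String, List<String>> ordered = new LinkedHashMap<>();
        for (int i = 0; i < zones.length; i++) {
            // Same visiting order as the original loop: offset, offset+1, ..., wrap to 0.
            String zone = zones[(myZoneOffset + i) % zones.length];
            List<String> urls = urlsByZone.get(zone);
            if (urls != null) {
                ordered.put(zone, urls);
            }
        }
        if (ordered.isEmpty()) {
            throw new IllegalArgumentException("no serviceUrl configured for any zone");
        }
        return ordered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> urlsByZone = new HashMap<>();
        urlsByZone.put("zone-a", Arrays.asList("http://10.0.0.1:8761/eureka/"));
        urlsByZone.put("zone-b", Arrays.asList("http://10.0.1.1:8761/eureka/"));
        urlsByZone.put("zone-c", Arrays.asList("http://10.0.2.1:8761/eureka/"));
        String[] zones = {"zone-a", "zone-b", "zone-c"};
        // Starting at index 1, the iteration order is zone-b, zone-c, zone-a.
        System.out.println(new ArrayList<>(orderedUrls(zones, 1, urlsByZone).keySet()));
    }
}
```

The LinkedHashMap is what carries the preference: callers iterate it in insertion order, so the preferred zone's URLs are seen first.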
EurekaClientConfigBean.getEurekaServerServiceUrls
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaClientConfigBean.java
    public List<String> getEurekaServerServiceUrls(String myZone) {
        String serviceUrls = this.serviceUrl.get(myZone);
        if (serviceUrls == null || serviceUrls.isEmpty()) {
            serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);
        }
        if (!StringUtils.isEmpty(serviceUrls)) {
            final String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);
            List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);
            for (String eurekaServiceUrl : serviceUrlsSplit) {
                if (!endsWithSlash(eurekaServiceUrl)) {
                    eurekaServiceUrl += "/";
                }
                eurekaServiceUrls.add(eurekaServiceUrl);
            }
            return eurekaServiceUrls;
        }

        return new ArrayList<>();
    }
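As shown, multiple eureka server addresses are separated by commas, a trailing slash is appended to any address that lacks one, and the defaultZone entry is used when the given zone has no entry of its own.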
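The split-and-normalize step is easy to try in isolation. The sketch below uses only the JDK: String.split stands in for Spring's StringUtils.commaDelimitedListToStringArray, and ServiceUrlSplitSketch is a made-up name. Like the original loop, it does not trim whitespace around the addresses.

```java
import java.util.ArrayList;
import java.util.List;

class ServiceUrlSplitSketch {
    // Splits a comma-separated serviceUrl value and makes every entry end with "/".
    static List<String> split(String serviceUrls) {
        List<String> result = new ArrayList<>();
        if (serviceUrls == null || serviceUrls.isEmpty()) {
            return result;
        }
        for (String url : serviceUrls.split(",")) {
            if (!url.endsWith("/")) {
                url += "/";
            }
            result.add(url);
        }
        return result;
    }

    public static void main(String[] args) {
        // The second address is missing its trailing slash on purpose.
        System.out.println(split("http://127.0.0.1:8761/eureka/,http://127.0.0.1:8762/eureka"));
    }
}
```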
serviceUrl usage
RetryableEurekaHttpClient.execute
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/transport/decorator/RetryableEurekaHttpClient.java
    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        List<EurekaEndpoint> candidateHosts = null;
        int endpointIdx = 0;
        for (int retry = 0; retry < numberOfRetries; retry++) {
            EurekaHttpClient currentHttpClient = delegate.get();
            EurekaEndpoint currentEndpoint = null;
            if (currentHttpClient == null) {
                if (candidateHosts == null) {
                    candidateHosts = getHostCandidates();
                    if (candidateHosts.isEmpty()) {
                        throw new TransportException("There is no known eureka server; cluster server list is empty");
                    }
                }
                if (endpointIdx >= candidateHosts.size()) {
                    throw new TransportException("Cannot execute request on any known server");
                }

                currentEndpoint = candidateHosts.get(endpointIdx++);
                currentHttpClient = clientFactory.newClient(currentEndpoint);
            }

            try {
                EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
                if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                    delegate.set(currentHttpClient);
                    if (retry > 0) {
                        logger.info("Request execution succeeded on retry #{}", retry);
                    }
                    return response;
                }
                logger.warn("Request execution failure with status code {}; retrying on another server if available",
                        response.getStatusCode());
            } catch (Exception e) {
                // just log message as the underlying client should log the stacktrace
                logger.warn("Request execution failed with message: {}", e.getMessage());
            }

            // Connection error or 5xx from the server that must be retried on another server
            delegate.compareAndSet(currentHttpClient, null);
            if (currentEndpoint != null) {
                quarantineSet.add(currentEndpoint);
            }
        }
        throw new TransportException("Retry limit reached; giving up on completing the request");
    }
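As shown, when no working client is cached the method first obtains candidateHosts, and it retries on failure. The default number of retries is 3 (int DEFAULT_NUMBER_OF_RETRIES = 3). After each failed attempt it moves on to the next eureka server address, and if endpointIdx >= candidateHosts.size() it throws TransportException("Cannot execute request on any known server").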
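Stripped of the cached delegate and the quarantine set, the failover loop reduces to the sketch below. It is a simplified illustration rather than the real client: FailoverSketch is a hypothetical name, hosts are plain strings, the request is a Function, and IllegalStateException stands in for TransportException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {
    // Tries the candidates in order, at most numberOfRetries times in total.
    static <R> R execute(List<String> candidateHosts, int numberOfRetries, Function<String, R> request) {
        int endpointIdx = 0;
        for (int retry = 0; retry < numberOfRetries; retry++) {
            if (endpointIdx >= candidateHosts.size()) {
                throw new IllegalStateException("Cannot execute request on any known server");
            }
            String host = candidateHosts.get(endpointIdx++);
            try {
                return request.apply(host);
            } catch (RuntimeException e) {
                // Failed on this host; the next loop iteration picks the next one.
            }
        }
        throw new IllegalStateException("Retry limit reached; giving up on completing the request");
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("http://127.0.0.1:8761/eureka/", "http://127.0.0.1:8762/eureka/");
        // The first host is treated as down, so the request lands on the second one.
        String served = FailoverSketch.<String>execute(hosts, 3, host -> {
            if (host.contains("8761")) {
                throw new RuntimeException("connection refused");
            }
            return host;
        });
        System.out.println(served);
    }
}
```

Two limits are in play: the retry count and the number of candidates. With only two servers configured and both down, the loop fails on the third iteration with the "any known server" error rather than the retry-limit one.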
RetryableEurekaHttpClient.getHostCandidates
    private List<EurekaEndpoint> getHostCandidates() {
        List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
        quarantineSet.retainAll(candidateHosts);

        // If enough hosts are bad, we have no choice but start over again
        int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
        //Prevent threshold is too large
        if (threshold > candidateHosts.size()) {
            threshold = candidateHosts.size();
        }
        if (quarantineSet.isEmpty()) {
            // no-op
        } else if (quarantineSet.size() >= threshold) {
            logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
            quarantineSet.clear();
        } else {
            List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
            for (EurekaEndpoint endpoint : candidateHosts) {
                if (!quarantineSet.contains(endpoint)) {
                    remainingHosts.add(endpoint);
                }
            }
            candidateHosts = remainingHosts;
        }

        return candidateHosts;
    }
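Here quarantineSet = new ConcurrentSkipListSet<>() holds the eureka servers currently considered unavailable (connection error or 5xx). The threshold is derived from eureka.client.transport.retryableClientQuarantineRefreshPercentage: by default it is 0.66 * candidateHosts.size(), truncated to an int and capped at candidateHosts.size(). Without the cap, a percentage above 1 would keep quarantineSet.size() < threshold forever, so quarantineSet would never be cleared and remainingHosts could end up empty.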
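The threshold arithmetic is worth working through, because the int cast truncates. A minimal sketch with plain strings and a hypothetical QuarantineSketch class, mirroring the branches of getHostCandidates:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class QuarantineSketch {
    // Returns the hosts to try; clears the quarantine set once it reaches the threshold.
    static List<String> hostCandidates(List<String> candidateHosts,
                                       Set<String> quarantineSet,
                                       double refreshPercentage) {
        quarantineSet.retainAll(candidateHosts);

        int threshold = (int) (candidateHosts.size() * refreshPercentage);
        if (threshold > candidateHosts.size()) {
            threshold = candidateHosts.size();
        }
        if (quarantineSet.isEmpty()) {
            return candidateHosts;
        }
        if (quarantineSet.size() >= threshold) {
            // Too many hosts are quarantined: start over with the full list.
            quarantineSet.clear();
            return candidateHosts;
        }
        List<String> remainingHosts = new ArrayList<>(candidateHosts.size());
        for (String host : candidateHosts) {
            if (!quarantineSet.contains(host)) {
                remainingHosts.add(host);
            }
        }
        return remainingHosts;
    }

    public static void main(String[] args) {
        List<String> four = Arrays.asList("a", "b", "c", "d");
        // 4 * 0.66 = 2.64, truncated to 2: one quarantined host is simply skipped.
        System.out.println(hostCandidates(four, new HashSet<>(Arrays.asList("a")), 0.66));
        // Two quarantined hosts reach the threshold, so the set is cleared.
        System.out.println(hostCandidates(four, new HashSet<>(Arrays.asList("a", "b")), 0.66));
    }
}
```

With three servers the threshold is (int) 1.98 = 1, so a single quarantined server already triggers a reset on the next call; the quarantine only filters hosts out once there are four or more servers, assuming the default 0.66.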
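Summary
- When the client's serviceUrl lists several eureka servers, the client by default uses the first entry of the randomized list. As long as requests to that server succeed, the second entry is never tried. The list is, however, refreshed periodically and randomized again.
- AsyncResolver has a backgroundTask (run every 5 minutes by default, controlled by eureka.client.eureka-service-url-poll-interval-seconds) that refreshes the eureka server list. By default the refresh goes through ZoneAffinityClusterResolver and then ConfigClusterResolver, and the list is randomized again.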