A look at the eureka client's serviceUrl
Published: 2019-06-20

This post looks at how the eureka client resolves its serviceUrl configuration and how it then uses the resolved server list.

serviceUrl resolution

DiscoveryClient.scheduleServerEndpointTask

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java

private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                        AbstractDiscoveryClientOptionalArgs args) {

    Collection<?> additionalFilters = args == null
            ? Collections.emptyList()
            : args.additionalFilters;

    EurekaJerseyClient providedJerseyClient = args == null
            ? null
            : args.eurekaJerseyClient;

    TransportClientFactories argsTransportClientFactories = null;
    if (args != null && args.getTransportClientFactories() != null) {
        argsTransportClientFactories = args.getTransportClientFactories();
    }

    // Ignore the raw types warnings since the client filter interface changed between jersey 1/2
    @SuppressWarnings("rawtypes")
    TransportClientFactories transportClientFactories = argsTransportClientFactories == null
            ? new Jersey1TransportClientFactories()
            : argsTransportClientFactories;

    Optional<SSLContext> sslContext = args == null
            ? Optional.empty()
            : args.getSSLContext();
    Optional<HostnameVerifier> hostnameVerifier = args == null
            ? Optional.empty()
            : args.getHostnameVerifier();

    // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
    eurekaTransport.transportClientFactory = providedJerseyClient == null
            ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier)
            : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);

    ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
        @Override
        public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
            long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
            long delay = getLastSuccessfulRegistryFetchTimePeriod();
            if (delay > thresholdInMs) {
                logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
                        thresholdInMs, delay);
                return null;
            } else {
                return localRegionApps.get();
            }
        }
    };

    eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
            clientConfig,
            transportConfig,
            eurekaTransport.transportClientFactory,
            applicationInfoManager.getInfo(),
            applicationsSource
    );

    if (clientConfig.shouldRegisterWithEureka()) {
        EurekaHttpClientFactory newRegistrationClientFactory = null;
        EurekaHttpClient newRegistrationClient = null;
        try {
            newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    transportConfig
            );
            newRegistrationClient = newRegistrationClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
        eurekaTransport.registrationClient = newRegistrationClient;
    }

    // new method (resolve from primary servers for read)
    // Configure new transport layer (candidate for injecting in the future)
    if (clientConfig.shouldFetchRegistry()) {
        EurekaHttpClientFactory newQueryClientFactory = null;
        EurekaHttpClient newQueryClient = null;
        try {
            newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    clientConfig,
                    transportConfig,
                    applicationInfoManager.getInfo(),
                    applicationsSource
            );
            newQueryClient = newQueryClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.queryClientFactory = newQueryClientFactory;
        eurekaTransport.queryClient = newQueryClient;
    }
}
The DiscoveryClient constructor calls scheduleServerEndpointTask directly, and that method in turn calls newBootstrapResolver.

EurekaHttpClients.newBootstrapResolver

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/transport/EurekaHttpClients.java

public static ClosableResolver<AwsEndpoint> newBootstrapResolver(
        final EurekaClientConfig clientConfig,
        final EurekaTransportConfig transportConfig,
        final TransportClientFactory transportClientFactory,
        final InstanceInfo myInstanceInfo,
        final ApplicationsResolver.ApplicationsSource applicationsSource) {
    if (COMPOSITE_BOOTSTRAP_STRATEGY.equals(transportConfig.getBootstrapResolverStrategy())) {
        if (clientConfig.shouldFetchRegistry()) {
            return compositeBootstrapResolver(
                    clientConfig,
                    transportConfig,
                    transportClientFactory,
                    myInstanceInfo,
                    applicationsSource
            );
        } else {
            logger.warn("Cannot create a composite bootstrap resolver if registry fetch is disabled."
                    + " Falling back to using a default bootstrap resolver.");
        }
    }

    // if all else fails, return the default
    return defaultBootstrapResolver(clientConfig, myInstanceInfo);
}
Unless the composite bootstrap strategy is configured, this falls through to defaultBootstrapResolver:
/**
 * @return a bootstrap resolver that resolves eureka server endpoints based on either DNS or static config,
 *         depending on configuration for one or the other. This resolver will warm up at the start.
 */
static ClosableResolver<AwsEndpoint> defaultBootstrapResolver(final EurekaClientConfig clientConfig,
                                                              final InstanceInfo myInstanceInfo) {
    String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
    String myZone = InstanceInfo.getZone(availZones, myInstanceInfo);

    ClusterResolver<AwsEndpoint> delegateResolver = new ZoneAffinityClusterResolver(
            new ConfigClusterResolver(clientConfig, myInstanceInfo),
            myZone,
            true
    );

    List<AwsEndpoint> initialValue = delegateResolver.getClusterEndpoints();
    if (initialValue.isEmpty()) {
        String msg = "Initial resolution of Eureka server endpoints failed. Check ConfigClusterResolver logs for more info";
        logger.error(msg);
        failFastOnInitCheck(clientConfig, msg);
    }

    return new AsyncResolver<>(
            EurekaClientNames.BOOTSTRAP,
            delegateResolver,
            initialValue,
            1,
            clientConfig.getEurekaServiceUrlPollIntervalSeconds() * 1000
    );
}
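As the Javadoc says, the eureka server endpoints are resolved either from DNS or from static configuration. The delegateResolver here is a ZoneAffinityClusterResolver wrapping a ConfigClusterResolver, and the result is handed to an AsyncResolver.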

AsyncResolver

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/AsyncResolver.java

    this.backgroundTask = new TimedSupervisorTask(
            this.getClass().getSimpleName(),
            executorService,
            threadPoolExecutor,
            refreshIntervalMs,
            TimeUnit.MILLISECONDS,
            5,
            updateTask
    );

private final Runnable updateTask = new Runnable() {
    @Override
    public void run() {
        try {
            List<T> newList = delegate.getClusterEndpoints();
            if (newList != null) {
                resultsRef.getAndSet(newList);
                lastLoadTimestamp = System.currentTimeMillis();
            } else {
                logger.warn("Delegate returned null list of cluster endpoints");
            }
            logger.debug("Resolved to {}", newList);
        } catch (Exception e) {
            logger.warn("Failed to retrieve cluster endpoints from the delegate", e);
        }
    }
};
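The backgroundTask refreshes the cluster endpoints periodically. The interval is clientConfig.getEurekaServiceUrlPollIntervalSeconds() * 1000 milliseconds, which defaults to 5 minutes.
The corresponding configuration property is eureka.client.eureka-service-url-poll-interval-seconds.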
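The refresh pattern can be sketched outside of Eureka roughly as follows. This is a simplified illustration, not the AsyncResolver implementation: the class name RefreshingEndpointList is hypothetical, plain strings stand in for AwsEndpoint, and a ScheduledExecutorService is swapped in for TimedSupervisorTask. What it keeps is the key behaviour of updateTask: a null result or an exception from the delegate leaves the previous list in place.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for AsyncResolver; not the Eureka class. */
final class RefreshingEndpointList implements AutoCloseable {
    private final AtomicReference<List<String>> resultsRef;
    private final Supplier<List<String>> delegate;
    // Daemon thread so the refresher never keeps the JVM alive on its own.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "endpoint-refresh");
                t.setDaemon(true);
                return t;
            });

    RefreshingEndpointList(Supplier<List<String>> delegate, List<String> initialValue) {
        this.delegate = delegate;
        this.resultsRef = new AtomicReference<>(initialValue);
    }

    /** Starts the periodic refresh, e.g. every pollIntervalSeconds * 1000 ms. */
    void start(long refreshIntervalMs) {
        scheduler.scheduleWithFixedDelay(
                this::refreshOnce, refreshIntervalMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
    }

    /** One refresh pass: keep the old list if the delegate fails or returns null. */
    void refreshOnce() {
        try {
            List<String> newList = delegate.get();
            if (newList != null) {
                resultsRef.set(newList);
            }
        } catch (RuntimeException e) {
            // Swallow the failure and keep serving the previously resolved list.
        }
    }

    /** Readers always get the most recently resolved list without blocking. */
    List<String> getClusterEndpoints() {
        return resultsRef.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Readers never wait on resolution: they see whatever list the last successful refresh stored, which is why a stale but non-empty list survives a temporarily broken delegate.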

ZoneAffinityClusterResolver

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/aws/ZoneAffinityClusterResolver.java

public List<AwsEndpoint> getClusterEndpoints() {
    List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone);
    List<AwsEndpoint> myZoneEndpoints = parts[0];
    List<AwsEndpoint> remainingEndpoints = parts[1];
    List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints);
    if (!zoneAffinity) {
        Collections.reverse(randomizedList);
    }

    logger.debug("Local zone={}; resolved to: {}", myZone, randomizedList);

    return randomizedList;
}
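This calls delegate.getClusterEndpoints(), where the delegate is the ConfigClusterResolver.
The resulting List<AwsEndpoint> is split into same-zone and remaining endpoints and then passed through randomizeAndMerge: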
private static List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints,
                                                   List<AwsEndpoint> remainingEndpoints) {
    if (myZoneEndpoints.isEmpty()) {
        return ResolverUtils.randomize(remainingEndpoints);
    }
    if (remainingEndpoints.isEmpty()) {
        return ResolverUtils.randomize(myZoneEndpoints);
    }
    List<AwsEndpoint> mergedList = ResolverUtils.randomize(myZoneEndpoints);
    mergedList.addAll(ResolverUtils.randomize(remainingEndpoints));
    return mergedList;
}
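To make the ordering concrete, here is a minimal standalone sketch of the split, randomize and merge steps. The Endpoint class and the order method are hypothetical, and Collections.shuffle is swapped in for ResolverUtils.randomize, whose internals are not shown in this post. The point is only the resulting shape: same-zone endpoints first (in random order), then the rest, with the whole list reversed when zone affinity is off.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical illustration of zone-affinity ordering; not Eureka code. */
final class ZoneAffinitySketch {

    /** Minimal stand-in for AwsEndpoint: just a URL and a zone. */
    static final class Endpoint {
        final String url;
        final String zone;

        Endpoint(String url, String zone) {
            this.url = url;
            this.zone = zone;
        }

        @Override
        public String toString() {
            return zone + ":" + url;
        }
    }

    static List<Endpoint> order(List<Endpoint> all, String myZone, boolean zoneAffinity, Random rnd) {
        // Split into "my zone" and "everything else".
        List<Endpoint> mine = new ArrayList<>();
        List<Endpoint> rest = new ArrayList<>();
        for (Endpoint e : all) {
            (myZone.equals(e.zone) ? mine : rest).add(e);
        }
        // Randomize each part independently (assumption: a plain shuffle).
        Collections.shuffle(mine, rnd);
        Collections.shuffle(rest, rnd);
        // Same-zone endpoints come first, the remaining ones after.
        List<Endpoint> merged = new ArrayList<>(mine);
        merged.addAll(rest);
        // Without zone affinity the preference is inverted.
        if (!zoneAffinity) {
            Collections.reverse(merged);
        }
        return merged;
    }
}
```

With a single zone, as in the defaultZone example later in the post, the split is trivial and the result is simply a randomized copy of the configured list.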

ConfigClusterResolver

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/aws/ConfigClusterResolver.java

public List<AwsEndpoint> getClusterEndpoints() {
    if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
        if (logger.isInfoEnabled()) {
            logger.info("Resolving eureka endpoints via DNS: {}", getDNSName());
        }
        return getClusterEndpointsFromDns();
    } else {
        logger.info("Resolving eureka endpoints via configuration");
        return getClusterEndpointsFromConfig();
    }
}

private List<AwsEndpoint> getClusterEndpointsFromConfig() {
    String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
    String myZone = InstanceInfo.getZone(availZones, myInstanceInfo);

    Map<String, List<String>> serviceUrls = EndpointUtils
            .getServiceUrlsMapFromConfig(clientConfig, myZone, clientConfig.shouldPreferSameZoneEureka());

    List<AwsEndpoint> endpoints = new ArrayList<>();
    for (String zone : serviceUrls.keySet()) {
        for (String url : serviceUrls.get(zone)) {
            try {
                endpoints.add(new AwsEndpoint(url, getRegion(), zone));
            } catch (Exception ignore) {
                logger.warn("Invalid eureka server URI: {}; removing from the server pool", url);
            }
        }
    }

    logger.debug("Config resolved to {}", endpoints);

    if (endpoints.isEmpty()) {
        logger.error("Cannot resolve to any endpoints from provided configuration: {}", serviceUrls);
    }

    return endpoints;
}
When DNS lookup is not enabled, the serviceUrl is parsed through EndpointUtils.getServiceUrlsMapFromConfig.
Suppose serviceUrl is configured as:
eureka:
  client:
    serviceUrl:
      defaultZone: http://127.0.0.1:8761/eureka/,http://127.0.0.1:8762/eureka/

The result is two AwsEndpoint instances:

{ serviceUrl='http://127.0.0.1:8761/eureka/', region='us-east-1', zone='defaultZone'}
{ serviceUrl='http://127.0.0.1:8762/eureka/', region='us-east-1', zone='defaultZone'}

EndpointUtils.getServiceUrlsMapFromConfig

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/endpoint/EndpointUtils.java

/**
 * Get the list of all eureka service urls from properties file for the eureka client to talk to.
 *
 * @param clientConfig the clientConfig to use
 * @param instanceZone The zone in which the client resides
 * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise
 * @return an (ordered) map of zone -> list of urls mappings, with the preferred zone first in iteration order
 */
public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig,
                                                                    String instanceZone,
                                                                    boolean preferSameZone) {
    Map<String, List<String>> orderedUrls = new LinkedHashMap<>();
    String region = getRegion(clientConfig);
    String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
    if (availZones == null || availZones.length == 0) {
        availZones = new String[1];
        availZones[0] = DEFAULT_ZONE;
    }
    logger.debug("The availability zone for the given region {} are {}", region, availZones);
    int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);

    String zone = availZones[myZoneOffset];
    List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
    if (serviceUrls != null) {
        orderedUrls.put(zone, serviceUrls);
    }
    int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
    while (currentOffset != myZoneOffset) {
        zone = availZones[currentOffset];
        serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
        if (serviceUrls != null) {
            orderedUrls.put(zone, serviceUrls);
        }
        if (currentOffset == (availZones.length - 1)) {
            currentOffset = 0;
        } else {
            currentOffset++;
        }
    }

    if (orderedUrls.size() < 1) {
        throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
    }
    return orderedUrls;
}
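For each zone, starting at the preferred zone and wrapping around the availZones array, this calls clientConfig.getEurekaServerServiceUrls(zone) to obtain that zone's URLs. The LinkedHashMap preserves that iteration order.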
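The wrap-around traversal in getServiceUrlsMapFromConfig is equivalent to a simple modular rotation. The sketch below is illustrative only: ZoneOrderSketch and rotate are hypothetical names, and the starting offset is taken as a given parameter because getZoneOffset is not shown in this post.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the zone visiting order; not Eureka code. */
final class ZoneOrderSketch {

    /**
     * Returns the zones in the order they would be visited: start at
     * myZoneOffset, walk to the end of the array, then wrap to index 0
     * and stop just before reaching myZoneOffset again.
     */
    static List<String> rotate(String[] availZones, int myZoneOffset) {
        List<String> ordered = new ArrayList<>(availZones.length);
        for (int i = 0; i < availZones.length; i++) {
            ordered.add(availZones[(myZoneOffset + i) % availZones.length]);
        }
        return ordered;
    }
}
```

For example, with availZones of zone-a, zone-b, zone-c and an offset of 1, the visiting order is zone-b, zone-c, zone-a.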

EurekaClientConfigBean.getEurekaServerServiceUrls

spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaClientConfigBean.java

public List<String> getEurekaServerServiceUrls(String myZone) {
    String serviceUrls = this.serviceUrl.get(myZone);
    if (serviceUrls == null || serviceUrls.isEmpty()) {
        serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);
    }
    if (!StringUtils.isEmpty(serviceUrls)) {
        final String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);
        List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);
        for (String eurekaServiceUrl : serviceUrlsSplit) {
            if (!endsWithSlash(eurekaServiceUrl)) {
                eurekaServiceUrl += "/";
            }
            eurekaServiceUrls.add(eurekaServiceUrl);
        }
        return eurekaServiceUrls;
    }

    return new ArrayList<>();
}
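Multiple eureka server addresses are separated by commas. If the requested zone has no entry, the lookup falls back to defaultZone, and a trailing slash is appended to any URL that lacks one.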
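The same parsing rules can be tried without Spring on the classpath. In this sketch, ServiceUrlParser and urlsForZone are hypothetical names, and String.split is swapped in for Spring's StringUtils.commaDelimitedListToStringArray, so edge cases such as trailing commas may behave differently from the real bean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of serviceUrl parsing; not the Spring Cloud class. */
final class ServiceUrlParser {
    static final String DEFAULT_ZONE = "defaultZone";

    static List<String> urlsForZone(Map<String, String> serviceUrl, String zone) {
        // Fall back to defaultZone when the requested zone has no entry.
        String raw = serviceUrl.get(zone);
        if (raw == null || raw.isEmpty()) {
            raw = serviceUrl.get(DEFAULT_ZONE);
        }
        List<String> result = new ArrayList<>();
        if (raw == null || raw.isEmpty()) {
            return result;
        }
        // Comma-separated list; every URL is normalized to end with a slash.
        for (String url : raw.split(",")) {
            result.add(url.endsWith("/") ? url : url + "/");
        }
        return result;
    }
}
```

Note that nothing here trims whitespace, so a value such as "http://a/eureka/, http://b/eureka/" would yield a second URL with a leading space.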

serviceUrl usage

RetryableEurekaHttpClient.execute

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/transport/decorator/RetryableEurekaHttpClient.java

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    int endpointIdx = 0;
    for (int retry = 0; retry < numberOfRetries; retry++) {
        EurekaHttpClient currentHttpClient = delegate.get();
        EurekaEndpoint currentEndpoint = null;
        if (currentHttpClient == null) {
            if (candidateHosts == null) {
                candidateHosts = getHostCandidates();
                if (candidateHosts.isEmpty()) {
                    throw new TransportException("There is no known eureka server; cluster server list is empty");
                }
            }
            if (endpointIdx >= candidateHosts.size()) {
                throw new TransportException("Cannot execute request on any known server");
            }

            currentEndpoint = candidateHosts.get(endpointIdx++);
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }

        try {
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                delegate.set(currentHttpClient);
                if (retry > 0) {
                    logger.info("Request execution succeeded on retry #{}", retry);
                }
                return response;
            }
            logger.warn("Request execution failure with status code {}; retrying on another server if available",
                    response.getStatusCode());
        } catch (Exception e) {
            logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
        }

        // Connection error or 5xx from the server that must be retried on another server
        delegate.compareAndSet(currentHttpClient, null);
        if (currentEndpoint != null) {
            quarantineSet.add(currentEndpoint);
        }
    }
    throw new TransportException("Retry limit reached; giving up on completing the request");
}
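The method first obtains candidateHosts and then retries on failure. The default number of retries is 3 (int DEFAULT_NUMBER_OF_RETRIES = 3).
Each failed attempt moves on to the next eureka server address, and the failed endpoint is added to quarantineSet. If endpointIdx >= candidateHosts.size(), a TransportException("Cannot execute request on any known server") is thrown. On success, the working client is stored in delegate, so later requests keep using the same server.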
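Stripped of the sticky delegate and the quarantine set, the failover loop reduces to the following sketch. RetrySketch and its execute method are hypothetical, plain strings stand in for EurekaEndpoint, IllegalStateException stands in for TransportException, and a thrown RuntimeException is the only failure signal (the real client also treats certain status codes as failures).

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical illustration of endpoint failover; not RetryableEurekaHttpClient. */
final class RetrySketch {

    /** Tries the request on successive endpoints until one succeeds or the budget runs out. */
    static <R> R execute(List<String> candidateHosts, int numberOfRetries, Function<String, R> request) {
        if (candidateHosts.isEmpty()) {
            throw new IllegalStateException("There is no known eureka server; cluster server list is empty");
        }
        int endpointIdx = 0;
        for (int retry = 0; retry < numberOfRetries; retry++) {
            // Fewer servers than retries: fail as soon as the list is exhausted.
            if (endpointIdx >= candidateHosts.size()) {
                throw new IllegalStateException("Cannot execute request on any known server");
            }
            String endpoint = candidateHosts.get(endpointIdx++);
            try {
                return request.apply(endpoint);
            } catch (RuntimeException e) {
                // This endpoint failed; the next iteration moves to the next one.
            }
        }
        throw new IllegalStateException("Retry limit reached; giving up on completing the request");
    }
}
```

With the default of 3 retries, two configured servers that both fail produce the "Cannot execute request on any known server" error on the third iteration, while three or more failing servers produce "Retry limit reached".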

RetryableEurekaHttpClient.getHostCandidates

private List<EurekaEndpoint> getHostCandidates() {
    List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
    quarantineSet.retainAll(candidateHosts);

    // If enough hosts are bad, we have no choice but start over again
    int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
    //Prevent threshold is too large
    if (threshold > candidateHosts.size()) {
        threshold = candidateHosts.size();
    }
    if (quarantineSet.isEmpty()) {
        // no-op
    } else if (quarantineSet.size() >= threshold) {
        logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
        quarantineSet.clear();
    } else {
        List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
        for (EurekaEndpoint endpoint : candidateHosts) {
            if (!quarantineSet.contains(endpoint)) {
                remainingHosts.add(endpoint);
            }
        }
        candidateHosts = remainingHosts;
    }

    return candidateHosts;
}
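Here quarantineSet = new ConcurrentSkipListSet<>() holds the eureka servers currently considered unavailable (connection error or 5xx).
The threshold is derived from eureka.client.transport.retryableClientQuarantineRefreshPercentage and defaults to 0.66 * candidateHosts.size(), truncated to an int. Once the quarantine set reaches the threshold it is cleared and all hosts become candidates again. The threshold is capped at candidateHosts.size(): with a percentage above 1, quarantineSet.size() could otherwise stay below the threshold forever, the set would never be cleared, and remainingHosts would end up empty.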
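The threshold arithmetic is easy to misjudge because of the integer truncation, so here is a small standalone sketch of the same filtering rules. QuarantineSketch and filter are hypothetical names and plain strings stand in for EurekaEndpoint; the percentage is passed in directly instead of being read from the transport config.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of quarantine filtering; not Eureka code. */
final class QuarantineSketch {

    static List<String> filter(List<String> candidateHosts, Set<String> quarantineSet, double refreshPercentage) {
        // Forget quarantined hosts that are no longer configured at all.
        quarantineSet.retainAll(candidateHosts);

        // Truncating cast: 3 hosts * 0.66 = 1.98 becomes a threshold of 1.
        int threshold = (int) (candidateHosts.size() * refreshPercentage);
        if (threshold > candidateHosts.size()) {
            threshold = candidateHosts.size();
        }

        if (quarantineSet.isEmpty()) {
            return candidateHosts;
        }
        if (quarantineSet.size() >= threshold) {
            // Too many bad hosts: start over with the full list.
            quarantineSet.clear();
            return candidateHosts;
        }
        // Otherwise skip the quarantined hosts.
        List<String> remaining = new ArrayList<>(candidateHosts.size());
        for (String host : candidateHosts) {
            if (!quarantineSet.contains(host)) {
                remaining.add(host);
            }
        }
        return remaining;
    }
}
```

One consequence worth noticing: with three servers and the default 0.66, the threshold is 1, so a single quarantined server already triggers a reset and all three servers are returned. With five servers the threshold is 3, so one or two bad servers are filtered out before a reset happens.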

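Summary

  • When the client's serviceUrl lists several eureka servers, the client by default uses the first entry of the randomized list. As long as requests to that server keep succeeding, it never moves on to the second entry, although the list itself is refreshed and re-randomized periodically.
  • AsyncResolver runs a backgroundTask (every 5 minutes by default, controlled by eureka.client.eureka-service-url-poll-interval-seconds) to refresh the eureka server list. By default this goes through ZoneAffinityClusterResolver and then ConfigClusterResolver, and the list is randomized again on each refresh.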

Reposted from: http://pqzmx.baihongyu.com/