曹工说mini-dubbo(2)–分析eureka client源码,想办法把我们的服务提供者注册到eureka server(上)

前言

eureka是spring cloud Netflix手艺系统中的主要组件,主要完成服务注册和发现的功效;那现在有个问题,我们自己写的rpc服务,若是为了保证足够的开放性和功效完善性,那肯定要支持种种注册中央。现在我们只支持redis注册中央,即服务提供者,在启动的时刻,将自身的ip+端口信息写入到redis,那,我们是否注册到 eureka中呢?

这个想法可行吗?可行。eureka client 和eureka server间,无非是网络通讯,既然是网络通讯,那就有网络协议,那我们的应用,只要遵照eureka server的协议来,就可以接入。

另外,eureka server没有接纳spring mvc来实现,而是接纳了jersey框架,这个框架啥意思呢,可以明白为对Restful的实现。我从网上找了一段(https://www.jianshu.com/p/88f97b90963c):

SpringMVC在开发REST应用时,是不支持JSR311/JSR339尺度的。若是想要凭据尺度行事,最常用的实现了这两个尺度的框架就是Jersey和CxF了。然则,由于Jersey是最早的实现,也是JSR311参考的主要工具,以是,可以说Jersey就是事实上的尺度(类似Hibernate是JPA的事实上的尺度),也是现在使用最为普遍的REST开发框架之一。

由于eureka server接纳了jersey,以是eureka client最终也是使用了配套的jersey client来和服务端通讯。

以是,eureka client,内里着实依赖了一堆jersey的包:

曹工说mini-dubbo(2)--分析eureka client源码,想办法把我们的服务提供者注册到eureka server(上)

注重,上面的jersey-client、jersey-core等包,其group id都是这样的:

    <dependency>
      <groupId>com.sun.jersey</groupId>
      <artifactId>jersey-client</artifactId>
      <version>1.19.1</version>
      <scope>runtime</scope>
    </dependency>

然则,不知道为啥,eureka client中,最终并没有完全使用jersey-client,而是使用了

    <dependency>
      <groupId>com.sun.jersey.contribs</groupId>
      <artifactId>jersey-apache-client4</artifactId>
      <version>1.19.1</version>
      <scope>runtime</scope>
    </dependency>

这个包,内部引入了:

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.1.1</version>
        </dependency>

这个包,你可以简朴明白为,jersey-client变成了一个接口,jersey-apache-client4是它的一个实现,实现里,用了httpClient去实现。

httpClient,没有几个java同砚不知道吧?这么做是可行的,由于你最终通讯,照样http,不管你服务端框架,是jersey、照样spring mvc、甚至以前的struts,这都不主要。

以是,人人在下面的源码中看到jersey的时刻,脑海里可以有这么一张图。从上层到底层的接口,分别是:

CloudEurekaClient
       ...
DiscoveryClient  
	...
EurekaClient
	...
JerseyClient
	...
HttpClient	

在此之前,我们照样先剖析下eureka client 注册到eureka server的源码。

源码环境

minidubbo代码和相关博文在:
曹工说mini-dubbo(1)–为了实践动态署理,我写了个简朴的rpc框架
https://gitee.com/ckl111/mini-dubbo

代码很简朴,不外照样给个代码链接吧:

https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/eureka-client

主要就是在pom.xml中,引入:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

然后启动类:

@SpringBootApplication
public class DemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoApplication.class, args);
	}

}

源码剖析

spring.factory支持自动设置

由于前面的pom,引入了如下jar包:

		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-netflix-eureka-client</artifactId>
		</dependency>

该jar包的META-INF\spring.factories中,有如下几行:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

我们看到,key是org.springframework.boot.autoconfigure.EnableAutoConfiguration,value是逗号支解的列表,这内里都是需要被自动装配的设置类,其中,我们看第三行的:

org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration

这个类,是自动装配的设置类,我们可以简朴一览:

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
		CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
public class EurekaClientAutoConfiguration {

内里一堆@ConditionalOn***,主要是看该设置类是否生效。

我们不管,这里条件是知足的,以是,看详细java文件里有什么要装配的内容,内里内容较多,我们关注我们需要关注的:

		@Bean(destroyMethod = "shutdown")
		@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
		@Lazy
		public EurekaClient eurekaClient(ApplicationInfoManager manager,
				EurekaClientConfig config, EurekaInstanceConfig instance,
				@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
			ApplicationInfoManager appManager;
			if (AopUtils.isAopProxy(manager)) {
				appManager = ProxyUtils.getTargetObject(manager);
			}
			else {
				appManager = manager;
			}
            // 1
			CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
					config, this.optionalArgs, this.context);
			cloudEurekaClient.registerHealthCheck(healthCheckHandler);
			return cloudEurekaClient;
		}

这里会自动装配一个EurekaClient类型的bean,(从返回值可以看出来),而详细的类型呢,从上面的1处,可以看出,详细类型是CloudEurekaClient。

以是,我们最先看1处,这个CloudEurekaClient是怎么new出来的。

CloudEurekaClient的建立

先看看其继续结构:

曹工说mini-dubbo(2)--分析eureka client源码,想办法把我们的服务提供者注册到eureka server(上)

我们这个CloudEurekaClient,位于spring-cloud-netflix-eureka-client-2.1.5.RELEASE包。

而其父类DiscoveryClient和接口EurekaClient,位于eureka-client-1.9.13

大致能剖析出,CloudEurekaClient的底层实现是eureka,其自己,是一个胶水,集成 spring 和 Netflix。

CloudEurekaClient的组织函数

	public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
			EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
			ApplicationEventPublisher publisher) {
        // 1
		super(applicationInfoManager, config, args);
        // 2
		this.applicationInfoManager = applicationInfoManager;
		this.publisher = publisher;
		this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
				"eurekaTransport");
		ReflectionUtils.makeAccessible(this.eurekaTransportField);
	}

我们看1处,挪用了父类的组织函数;2处下面的几行,主要是对本类中的几个field举行赋值,这几个字段,我们不关心,以是,直接看父类的组织函数吧。

DiscoveryClient的组织函数

    public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
        this(applicationInfoManager, config, args, ResolverUtils::randomize);
    }

    public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
        // 1
        this(applicationInfoManager, config, args, null, randomizer);
    }

上面两个,都是重载。1处挪用的,我们接下来会重点剖析。

步骤1:一堆field赋值

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    	// 0	
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
        
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();
		// 1
        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }
		...

这一堆都是凭据入数,来对类中的field举行赋值。好比

0处,主要是一些健康检查的器械;1处,config类型为 com.netflix.discovery.EurekaClientConfig,这里主要是eureka client的一些设置,好比我们在yml中设置了eureka.client.*之类的,就会到这里。

步骤2:判断是否要获取eureka server中的服务提供者信息

	// 1
	if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

1处,可以看出来,是凭据config中的shouldFetchRegistry举行判断,是否要去获取eureka server。

然后举行了一些监控指标的初始化。

步骤3:判断是否要注册到eureka server

    	// 1    
		if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

同上。

步骤4:若是既不注册,也不获取,则处置基本竣事

		// 1
		if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
			// 2
            return;  // no need to setup up an network tasks and we are done
        }
  • 1处,既不注册,也不从eureka server获取
  • 2处,直接竣事

步骤5:界说三个线程池

            //1 default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
			// 2
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
			// 3 
			cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
  • 1处,界说一个用于服务提供者信息的缓存刷新的准时线程池
  • 2处,界说一个心跳线程池
  • 3处,这个看起来也是用于缓存刷新的

步骤6:建立eurekaTransport工具

com.netflix.discovery.DiscoveryClient#scheduleServerEndpointTask
// 1
eurekaTransport = new EurekaTransport();
// 2
scheduleServerEndpointTask(eurekaTransport, args);
  • 1处,eurekaTransport是一个field,该类主要封装了几个后续通讯要使用的底层client。

    
        private static final class EurekaTransport {
            private ClosableResolver bootstrapResolver;
            private TransportClientFactory transportClientFactory;
    		// 1.1
            private EurekaHttpClient registrationClient;
            private EurekaHttpClientFactory registrationClientFactory;
    		// 1.2
            private EurekaHttpClient queryClient;
            private EurekaHttpClientFactory queryClientFactory;
    

    1.1处,这个应该是注册用的,也是我们需要的;

    1.2处,应该是查询信息用的。

  • 挪用了当前类的方式scheduleServerEndpointTask,且把eurekaTransport传入了

步骤7:schedule周期义务

建立抽象工厂

因我们只是new了eurekaTransport,没有对其field举行任何赋值,以是,这个scheduleServerEndpointTask总,有个地方对其field举行赋值。

com.netflix.discovery.DiscoveryClient#scheduleServerEndpointTask
// 1
TransportClientFactories transportClientFactories =new Jersey1TransportClientFactories();

// 2
eurekaTransport.transportClientFactory = transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier)

  • 1处,就是new了一个抽象工厂,抽象工厂,我小我私家明白是工厂的工厂,其产出的器械,不是直接的最终工具,而是另一种工厂。

    TransportClientFactories 是一个接口,主要包罗了如下方式:

    前端算法渣的救赎之路

    	public TransportClientFactory newTransportClientFactory(
            	final EurekaClientConfig clientConfig,
            	final Collection<F> additionalFilters,
                final InstanceInfo myInstanceInfo,
                final Optional<SSLContext> sslContext,
                final Optional<HostnameVerifier> hostnameVerifier);
    

    主要5个参数,排除掉最后的倒数2个,可选参数,剩3个。分别是:eurekaClient的设置bean,分外的filter聚集,当前实例信息。

详细工厂的职责

  • 2处,就是行使1处建立的抽象工厂,来天生我们需要的工厂。

    这里,我们可以先看看,最终我们需要的工厂,是什么样的。

    /**
     * A low level client factory interface. Not advised to be used by top level consumers.
     *
     * @author David Liu
     */
    public interface TransportClientFactory {
    
        EurekaHttpClient newClient(EurekaEndpoint serviceUrl);
    
        void shutdown();
    
    }
    

    newClient这个方式,听名字,就是一个建立客户端的,建立客户端,需要什么参数呢?总得知道要连接到哪个eureka server服务器吧,服务器地址是啥吧?没错,参数EurekaEndpoint serviceUrl可以给我们提供需要的这些:

    package com.netflix.discovery.shared.resolver;
    
    public interface EurekaEndpoint extends Comparable<Object> {
    	// 1
        String getServiceUrl();
    	// 2
        String getNetworkAddress();
    	// 3
        int getPort();
    
        boolean isSecure();
    
        String getRelativeUri();
    
    }
    
    
    • 1处,获取url
    • 2处,获取网络地址
    • 3处,获取端口

    基本对于我们一个客户端来说,需要的参数就这些。

    说完了newClient的参数,再来看看响应:

    
    /**
     * Low level Eureka HTTP client API.
     *
     * @author Tomasz Bak
     */
    public interface EurekaHttpClient {
    
        EurekaHttpResponse<Void> register(InstanceInfo info);
    
        EurekaHttpResponse<Void> cancel(String appName, String id);
    
        EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus);
    
        EurekaHttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info);
    
        EurekaHttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info);
    
        EurekaHttpResponse<Applications> getApplications(String... regions);
    
        EurekaHttpResponse<Applications> getDelta(String... regions);
    
        EurekaHttpResponse<Applications> getVip(String vipAddress, String... regions);
    
        EurekaHttpResponse<InstanceInfo> getInstance(String appName, String id);
    
        EurekaHttpResponse<InstanceInfo> getInstance(String id);
    
        void shutdown();
    }
    

    看到了吗,种种注册、作废、发送心跳、状态更新啥的,这几本涵盖了eureka client的所有操作了,没错,我们就是需要这么个器械。

建立详细工厂

看完了我们需要的工厂的功效,我们马上来看看这么厉害的工厂怎么建立出来?

com.netflix.discovery.shared.transport.jersey.Jersey1TransportClientFactories#newTransportClientFactory(...)
        
	@Override
    public TransportClientFactory newTransportClientFactory(
        	EurekaClientConfig clientConfig,
            Collection<ClientFilter> additionalFilters,
        	InstanceInfo myInstanceInfo,
        	Optional<SSLContext> sslContext,
        	Optional<HostnameVerifier> hostnameVerifier) {
    	// 2.1
        final TransportClientFactory jerseyFactory = JerseyEurekaHttpClientFactory.create(
                clientConfig,
                additionalFilters,
                myInstanceInfo,
                new EurekaClientIdentity(myInstanceInfo.getIPAddr()),
                sslContext,
                hostnameVerifier
        );
        // 2.2
        final TransportClientFactory metricsFactory = MetricsCollectingEurekaHttpClient.createFactory(jerseyFactory);
		// 2.3
        return new TransportClientFactory() {
            @Override
            public EurekaHttpClient newClient(EurekaEndpoint serviceUrl) {
                return metricsFactory.newClient(serviceUrl);
            }

            @Override
            public void shutdown() {
                metricsFactory.shutdown();
                jerseyFactory.shutdown();
            }
        };
    }
  • 2.1处,挪用JerseyEurekaHttpClientFactory的create 静态方式,天生了一个工厂
  • 2.2处,对天生的工厂,举行了包装,看名称,应该是包装了统计相关信息。
  • 2.3处,对2.2处天生的工厂,用匿名内部类举行了包装,挪用匿名内部类的newClient时,直接署理给了metricsFactory;而shutdown方式,则主要是关闭 metricsFactory 和 jerseyFactory 工厂。

以是,我们现在要看看,2.1处,是怎么建立工厂的。

com.netflix.discovery.shared.transport.jersey.JerseyEurekaHttpClientFactory#create
    
    public static JerseyEurekaHttpClientFactory create(
    	EurekaClientConfig clientConfig,
        Collection<ClientFilter> additionalFilters,
    	InstanceInfo myInstanceInfo,                                                       		 AbstractEurekaIdentity clientIdentity) {
        // 1
    	boolean useExperimental = "true".equals(clientConfig.getExperimental("JerseyEurekaHttpClientFactory.useNewBuilder"));
		// 2
        JerseyEurekaHttpClientFactoryBuilder clientBuilder = (useExperimental ? experimentalBuilder() : newBuilder())
                .withAdditionalFilters(additionalFilters)
                .withMyInstanceInfo(myInstanceInfo)
                .withUserAgent("Java-EurekaClient")
                .withClientConfig(clientConfig)
                .withClientIdentity(clientIdentity);
    	// 3
        clientBuilder.withClientName("DiscoveryClient-HTTPClient");
		// 4
        return clientBuilder.build();
    }
  • 1处,砍断是否要使用实验性的builder
  • 2处,建立对应的builder,并把我们的参数,通过with*方式,设置进去
  • 3处,设置客户端名称
  • 4处,天生客户端工厂
com.netflix.discovery.shared.transport.jersey.JerseyEurekaHttpClientFactory.JerseyEurekaHttpClientFactoryBuilder#build

    
        @Override
        public JerseyEurekaHttpClientFactory build() {
    		// 1
            Map<String, String> additionalHeaders = new HashMap<>();
    		// 2
            if (allowRedirect) {
                additionalHeaders.put(HTTP_X_DISCOVERY_ALLOW_REDIRECT, "true");
            }
            if (EurekaAccept.compact == eurekaAccept) {
                additionalHeaders.put(EurekaAccept.HTTP_X_EUREKA_ACCEPT, eurekaAccept.name());
            }
			
            // 3
            return buildLegacy(additionalHeaders, systemSSL);
        }

这里就是弄了个hashmap,设置了几个header进去,然后3处,挪用buildLegacy。

com.netflix.discovery.shared.transport.jersey.JerseyEurekaHttpClientFactory.JerseyEurekaHttpClientFactoryBuilder#buildLegacy
    
        private JerseyEurekaHttpClientFactory buildLegacy(Map<String, String> additionalHeaders, boolean systemSSL) {
    		// 1
            EurekaJerseyClientBuilder clientBuilder = new EurekaJerseyClientBuilder()
                    .withClientName(clientName)
                    .withUserAgent("Java-EurekaClient")
                    .withConnectionTimeout(connectionTimeout)
                    .withReadTimeout(readTimeout)
                    .withMaxConnectionsPerHost(maxConnectionsPerHost)
                    .withMaxTotalConnections(maxTotalConnections)
                    .withConnectionIdleTimeout((int) connectionIdleTimeout)
                    .withEncoderWrapper(encoderWrapper)
                    .withDecoderWrapper(decoderWrapper);
			...
            
			// 2
            EurekaJerseyClient jerseyClient = clientBuilder.build();
    		// 3
            ApacheHttpClient4 discoveryApacheClient = jerseyClient.getClient();
            addFilters(discoveryApacheClient);
			// 4
            return new JerseyEurekaHttpClientFactory(jerseyClient, additionalHeaders);
        }
  • 1处,通过我们传入的一些参数,以及该类自身的一些field,好比connectionTimeout、readTimeout、maxTotalConnections、maxConnectionsPerHost这些,组织一个builder。

    这些参数,已经看出来,是网络通讯所需要的器械了

  • 2处,通过1处的builder,挪用build,拿到了EurekaJerseyClient类型的工具,可以说,这里着实是已经把客户端组织好了。也就是说,在组织这个工厂的过程中,着实已经在天生对应的产物了

  • 3处,对2处拿到的客户端,做一些处置

  • 4处,将2处拿到的客户端,封装到了工厂的一些field中,后续挪用工厂生产产物的时刻,直接从field中取就行了。

        public JerseyEurekaHttpClientFactory(EurekaJerseyClient jerseyClient, Map<String, String> additionalHeaders) {
            this(jerseyClient, null, -1, additionalHeaders);
        }
    	private JerseyEurekaHttpClientFactory(EurekaJerseyClient jerseyClient,
                                              ApacheHttpClient4 apacheClient,
                                              long connectionIdleTimeout,
                                              Map<String, String> additionalHeaders) {
            this.jerseyClient = jerseyClient;
            this.apacheClient = jerseyClient != null ? jerseyClient.getClient() : apacheClient;
            this.additionalHeaders = additionalHeaders;
        }
    

以是,我们的重点,要放在2处的build身上。

	com.netflix.discovery.shared.transport.jersey.EurekaJerseyClientImpl.EurekaJerseyClientBuilder#build
	public EurekaJerseyClient build() {
            MyDefaultApacheHttpClient4Config config = new MyDefaultApacheHttpClient4Config();
            try {
                // 1
                return new EurekaJerseyClientImpl(connectionTimeout, readTimeout, connectionIdleTimeout, config);
            } catch (Throwable e) {
                throw new RuntimeException("Cannot create Jersey client ", e);
            }
        }

接下来看1处:

public EurekaJerseyClientImpl(int connectionTimeout, int readTimeout, final int connectionIdleTimeout,ClientConfig clientConfig) {
        try {
            jerseyClientConfig = clientConfig;
            // 1
            apacheHttpClient = ApacheHttpClient4.create(jerseyClientConfig);
            // 2
            HttpParams params = apacheHttpClient.getClientHandler().getHttpClient().getParams();

            HttpConnectionParams.setConnectionTimeout(params, connectionTimeout);
            HttpConnectionParams.setSoTimeout(params, readTimeout);
			
        } catch (Throwable e) {
            throw new RuntimeException("Cannot create Jersey client", e);
        }
    }
  • 1处,建立com.sun.jersey.client.apache4.ApacheHttpClient4类型的工具

    该类型,就位于:

        <dependency>
          <groupId>com.sun.jersey.contribs</groupId>
          <artifactId>jersey-apache-client4</artifactId>
          <version>1.19.1</version>
          <scope>runtime</scope>
        </dependency>
    
    
        public static ApacheHttpClient4 create(final ClientConfig cc) {
            return new ApacheHttpClient4(createDefaultClientHandler(cc), cc);
        }
    

    这里的createDefaultClientHandler(cc),内里会去建立HttpClient。

    private static ApacheHttpClient4Handler createDefaultClientHandler(final ClientConfig cc) {
    		...
    
    		// 1
            final DefaultHttpClient client = new DefaultHttpClient(
                    (ClientConnectionManager)connectionManager,
                    (HttpParams)httpParams
            );
    
            ...
    		
            return new ApacheHttpClient4Handler(client, cookieStore, preemptiveBasicAuth);
        }
    

    这内里细节省略了部门,主要就是1处,建立了HttpClient,这个就是平时我们用来发http请求的谁人。

  • 2处,设置一些参数,这里的HttpParams,从哪儿取出来的?apacheHttpClient.getClientHandler().getHttpClient()。这里取到的,已经是HttpClient了。

    到此为止,我们可以看看httpParams中有哪些header:

    曹工说mini-dubbo(2)--分析eureka client源码,想办法把我们的服务提供者注册到eureka server(上)

在详细工厂基础上,对注册用的工厂举行封装

        com.netflix.discovery.DiscoveryClient#scheduleServerEndpointTask
        // 1    
		if (clientConfig.shouldRegisterWithEureka()) {
            EurekaHttpClientFactory newRegistrationClientFactory = null;
            EurekaHttpClient newRegistrationClient = null;
            // 2
            newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                eurekaTransport.bootstrapResolver,
                eurekaTransport.transportClientFactory,
                transportConfig
            );
            // 3
            newRegistrationClient = newRegistrationClientFactory.newClient();
            // 4
            eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
            eurekaTransport.registrationClient = newRegistrationClient;
        }

我们前面的n步,已经把通讯用的客户端,及对应的工厂,都已经建立出来了,为啥这里又要建立什么工厂。

简朴来说,前面的工厂,造出来的客户端,通讯是没问题了;然则,你通讯失败了,要重试吗,重试的话,换哪一台呢?你每次通讯是乐成,照样失败,照样超时,需要统计吗?一个生产级的框架,是要有这些功效的。

以是,这里主要是举行一些上层的封装。

ok,继续剖析上面的代码。

  • 1处,判断是否要注册到eureka
  • 2处,天生一个工厂,该工厂卖力生产:注册用的客户端
  • 3处,使用2处拿到的工厂,建立注册用的客户端
  • 4处,把3处拿到的客户端,存储到eurekaTransport的field中。

继续深入2处。

    com.netflix.discovery.shared.transport.EurekaHttpClients#canonicalClientFactory
	static EurekaHttpClientFactory canonicalClientFactory(
        final String name,
        final EurekaTransportConfig transportConfig,
        final ClusterResolver<EurekaEndpoint> clusterResolver,
        final TransportClientFactory transportClientFactory) {
		// 1
        return new EurekaHttpClientFactory() {
            // 2
            @Override
            public EurekaHttpClient newClient() {
                // 3
                return new SessionedEurekaHttpClient(
                        name,
                        RetryableEurekaHttpClient.createFactory(...),
                        transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000
                );
            }

            @Override
            public void shutdown() {
                wrapClosable(clusterResolver).shutdown();
            }
        };
    }
  • 1处,返回了一个工厂工具
  • 2处,工厂里重写了newClient
  • 3处,返回了一个包装过的EurekaClient。

可以看下这里返回的SessionedEurekaHttpClient类。

曹工说mini-dubbo(2)--分析eureka client源码,想办法把我们的服务提供者注册到eureka server(上)

这里就是装饰器模式,对enreka举行了层层封装,和 java 的 io 流那样明白就对了。

在详细工厂基础上,对查询用的工厂举行封装

		// 1
		if (clientConfig.shouldFetchRegistry()) {
            EurekaHttpClientFactory newQueryClientFactory = null;
            EurekaHttpClient newQueryClient = null;
            // 2
            newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                eurekaTransport.bootstrapResolver,
                eurekaTransport.transportClientFactory,
                clientConfig,
                transportConfig,
                applicationInfoManager.getInfo(),
                applicationsSource,
                endpointRandomizer
            );
            // 3
            newQueryClient = newQueryClientFactory.newClient();
            eurekaTransport.queryClientFactory = newQueryClientFactory;
            eurekaTransport.queryClient = newQueryClient;
        }

这里的代码,和上面基本相似。只不外,这里是给查询用的,所谓查询,就是去eureka server获取信息,好比服务提供者列表啥的。

  • 1处,判断是否要去eureka server获取
  • 2处,建立查询用的工厂
  • 3处,行使2处拿到的工厂,建立查询客户端

步骤8:去eureka server获取服务提供者信息

我们终于把步骤7讲完了,着实有点长。

com.netflix.discovery.DiscoveryClient#DiscoveryClient(...)
    
// 1    
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
    // 2
    fetchRegistryFromBackup();
}

这里1处,就是判断要不要去获取,若是要的话,就挪用fetchRegistry(false)

2处,若是1处没取到,则要从backup地方去取。这块可以自己定制backup计谋。

注册到eureka server

        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            // 1
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        }

这里会判断是否要注册,是否要在初始化的时刻注册,若是要的话,进入1处,举行注册。

初始化周期执行的义务

        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        initScheduledTasks();

看这里注释,初始化的义务包罗:集群剖析、心跳、实例信息注册、周期从eureka server获取信息等。

周期义务:获取服务提供者信息

if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

默认30s一次。

周期义务:准时发心跳,向eureka server举行renew

            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

这个也是30s。

心跳包,基本就是个put请求,内里携带了2个参数。

@Override
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {
            WebResource webResource = jerseyClient.resource(serviceUrl)
                    .path(urlPath)
                    .queryParam("status", info.getStatus().toString())
                    .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());

曹工说mini-dubbo(2)--分析eureka client源码,想办法把我们的服务提供者注册到eureka server(上)

周期义务:InstanceInfoReplicator

这个义务,默认也是30s执行一次。

            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

这个义务,着实现了runnable,注释如下:


/**
 * A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are:
 * - 1 configured with a single update thread to guarantee sequential update to the remote server
 * - 2 update tasks can be scheduled on-demand via onDemandUpdate()
 * - 3 task processing is rate limited by burstSize
 * - 4 a new update task is always scheduled automatically after an earlier update task.  However if an on-demand task is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new
 *   on-demand update).
 *
 *   @author dliu
 */
class InstanceInfoReplicator implements Runnable 
  • 1处,设置了一个单线程,保证向远程eureka server,顺序更新
  • 2处,通过本类的onDemandUpdate,可以强行插入一个义务,而无需通过准时执行
  • 3处,限流相关
  • 4处,执行完一个周期义务后,马上会给自己放置下一个周期义务

其run方式:

    public void run() {
        try {
            // 1
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                // 2
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        }finally {
            // 3
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
  • 1处,刷新实例信息
  • 2处,若是有需要的话,向eureka server举行注册
  • 3处,调剂下一次义务

初始化竣事

基本,这个CloudEurekaClient组织就竣事了,后续就依赖其开启的一堆准时义务去举行事情。

总结

eureka client的初始化就讲了这么多,注册还没讲,留带下一讲吧。

原创文章,作者:28x29新闻网,如若转载,请注明出处:https://www.28x29.com/archives/12730.html