Nacos 注册中心 - 健康检查机制源码

目录

1. 健康检查介绍

2. 客户端健康检查

2.1 临时实例的健康检查

2.2 永久实例的健康检查

3. 服务端健康检查

3.1 临时实例的健康检查

3.2 永久实例服务端健康检查


1. 健康检查介绍

当一个服务实例注册到 Nacos 中后,其他服务就可以从 Nacos 中查询出该服务实例信息,就可以调用使用了。

然而服务提供者如果此时挂掉了,此时其他服务拿到信息后就会调用不通,所以Nacos中的服务信息应该有一个更新机制(即删除掉挂掉的服务)

那么服务注册信息应该如何维护呢,那就是判断某个服务实例是否有问题,如果检测到服务实例出现问题了就将他剔除掉。

那么如何判断 服务实例 是否有问题呢?这就是健康检查要做的事情,即检查服务实例的健康状态。不健康则剔除下线。

下面看看客户端和服务端为了实现健康检查功能都各自做了哪些事情。

2. 客户端健康检查

2.1 临时实例的健康检查

从 Nacos 2.x 开始,临时实例的服务注册由原来的 HTTP 更换为了 GRPC 长连接方式。Nacos Client 和 Nacos Server 之间建立的 RPC 长连接,服务注册、服务取消注册等接口都是通过 GRPC 消息与服务端通信的。

GRPC 长连接是一直存在的,只要连接一直存在就代表Nacos Client 和 Nacos Server 之间的连接是通的,Nacos Client 则一直在线。如果Nacos Client 由于网络问题等其他问题挂掉了,那么这条长连接也会断开连接。

那么服务实例如何算健康呢,就是长连接一直存在没有断那就算健康的。

如果连接断掉了,那么该客户端上注册的全部服务实例都是不健康的了。

GRPC 长连接如果想一直保证连接状态,就需要定时发送心跳包,以确保连接处于活动的状态。否则一段时间不操作的话就会自动断开连接。

接下来看看 NacosClient 如何开启 RPC 长连接的

NacosClient 操作注册中心的 API 是通过 NamingService 进行的

 

在 NacosNamingService 的构造器中调用了 init 初始化方法:init 方法最后

进入最后一行代码:

再看最后 new NamingGrpcClientProxy 的源码:

public class NamingGrpcClientProxy {
    
 
    
    public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
            NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) {
        
        // 省略部分代码
        
        // 创建 RPC Client
        this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
        
        // 启动 RPC Client
        start(serverListFactory, serviceInfoHolder);
    }
    
    private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        rpcClient.serverListFactory(serverListFactory);
        rpcClient.registerConnectionListener(redoService);
        rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
        // 启动
        rpcClient.start();
       
    }

看看 RpcClient.start 做了什么

public abstract class RpcClient {
    
    protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();
    
    public final void start() {
        // 省略部分代码
        
        // connection event consumer.
        clientEventExecutor.submit(() -> {
            while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
                ConnectionEvent take = eventLinkedBlockingQueue.take();
                    if (take.isConnected()) {
                        notifyConnected();
                    } else if (take.isDisConnected()) {
                        notifyDisConnected();
                    }
            }
        });
        
        Connection connectToServer = null;
        
        // 状态设置为启动中
        rpcClientStatus.set(RpcClientStatus.STARTING);
        
        int startUpRetryTimes = rpcClientConfig.retryTimes();
        while (startUpRetryTimes > 0 && connectToServer == null) {
            startUpRetryTimes--;
            ServerInfo serverInfo = nextRpcServer();
              
            // 建立连接
            connectToServer = connectToServer(serverInfo);
        }
        
        this.currentConnection = connectToServer;
        
        // 状态设置为 运行中
        rpcClientStatus.set(RpcClientStatus.RUNNING);
    
        eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
        
        // 省略部分代码
    }
    
}

省略了一些代码,这次只关注核心的两个点

1. 找到下一个 RPC Server 建立连接

因为 Nacos 支持集群部署,此时的 RPC Server List 其实就是这些集群节点。也就是找到集群下一个节点建议连接,如果连接失败就走到下一轮循环再获取到下一个节点继续连接(再重试次数内循环)

2. eventLinkedBlockingQueue 队列中加入一项

eventLinkedBlockingQueue 队列里存的是 ConnectionEvent,ConnectionEvent 代表一个连接事件,事件有 已连接事件、断开连接事件。

public class ConnectionEvent {
        
    public static final int CONNECTED = 1;
        
    public static final int DISCONNECTED = 0;
        
    int eventType;
        
    public ConnectionEvent(int eventType) {
        this.eventType = eventType;
    }
        
    public boolean isConnected() {
        return eventType == CONNECTED;
    }
        
    public boolean isDisConnected() {
        return eventType == DISCONNECTED;
    }
}

可见上面的源码,连接建立成功后,就会往队列压入一个 已连接事件 CONNECTED

队列事件的消费者在哪里呢?

便是在 start 方法的最开头定义的,while 循环不断从队列中获取到数据然后根据事件类型,进行各自的通知。

    public final void start() {
        // 省略部分代码
        
        // connection event consumer.
        clientEventExecutor.submit(() -> {
            while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
                ConnectionEvent take = eventLinkedBlockingQueue.take();
                
                    if (take.isConnected()) {
                        // 通知连接
                        notifyConnected();
                    } else if (take.isDisConnected()) {
                        // 通知断开连接
                        notifyDisConnected();
                    }
            }
        });
    }

notifyConnected 和 notifyDisConnected 实现差不多,所以这里只看一个的实现源码。

protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<>();
​
protected void notifyConnected() {
    // 省略部分源码
    if (connectionEventListeners.isEmpty()) {
        return;
    }
     
    // 循环全部监听器 一个个回调
    for (ConnectionEventListener connectionEventListener : connectionEventListeners) {                      connectionEventListener.onConnected();    
    }
}

connectionEventListeners 里的数据是什么时候加入的呢?

那就是在 NamingGrpcClientProxy.start 方法

 

public class NamingGrpcRedoService implements ConnectionEventListener {
    
    private volatile boolean connected = false;
    
    @Override
    public void onConnected() {
        // 建立连接,改变连接状态
        connected = true;
    }
    
    @Override
    public void onDisConnect() {
        connected = false;
        
        // 将 redoService 上的全部缓存数据一改
        synchronized (registeredInstances) {
            registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));
        }
        synchronized (subscribes) {
            subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));
        }
    }

当GRPC 长连接断开后就会进入 onDisConnect 事件回调中,这里改变了setRegistered 状态

上篇说过,redoService的作用 Nacos 注册中心 - 服务注册源码

此时会走入服务卸载流程。

2.2 永久实例的健康检查

永久实例客户端只负责提交一个请求即完成了全部操作。

健康检查工作由 服务端做。

3. 服务端健康检查

在收到客户端建立连接事件回调后,会调用 init 方法

public class IpPortBasedClient extends AbstractClient {
    
    public void init() {
        if (ephemeral) {
            beatCheckTask = new ClientBeatCheckTaskV2(this);
            HealthCheckReactor.scheduleCheck(beatCheckTask);
        } else {
            healthCheckTaskV2 = new HealthCheckTaskV2(this);
            HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
        }
    }   
}

如果当前是临时实例:使用 ClientBeatCheckTaskV2 处理健康检查

如果当前是永久实例:使用 HealthCheckTaskV2处理健康检查

然后将任务放到线程池中执行定时执行

3.1 临时实例的健康检查

看看 ClientBeatCheckTaskV2 如何实现:

public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCheckTask, NacosHealthCheckTask {
    
    // 省略部分代码
    
    private final IpPortBasedClient client;
    
    private final InstanceBeatCheckTaskInterceptorChain interceptorChain;
    
    // 执行健康检查
    @Override
    public void doHealthCheck() {
        
        // 拿到当前客户端上注册的全部服务
        Collection<Service> services = client.getAllPublishedService();
        
        for (Service each : services) {
            HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client
                        .getInstancePublishInfo(each);
            
            // 将全部服务用拦截器链一个个执行
            interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));
        } 
    }
    
    @Override
    public void run() {
        doHealthCheck();
    }
}

看看拦截器链如何实现

这里的拦截器链是一个典型的责任链模式

public abstract class AbstractNamingInterceptorChain<T extends Interceptable>
        implements NacosNamingInterceptorChain<T> {
    
    @Override
    public void doInterceptor(T object) {
        for (NacosNamingInterceptor<T> each : interceptors) {
            if (!each.isInterceptType(object.getClass())) {
                continue;
            }
            
            // 当前是责任节点,直接由该责任节点处理
            if (each.intercept(object)) {
                object.afterIntercept();
                
                // 不往后执行了
                return;
            }
        }
        
        // 如果没有责任节点执行,就调用 passIntercept
        object.passIntercept();
    }
}

首先第一个问题:拦截器都是些什么呢?

 

可见具体的有三个实现类

这三个类代表者三个地方的判断,判断是否开启了 健康心跳检查功能? 如果没开,那就被拦截了呀,就走不到后面的心跳检查代码了

ServiceEnableBeatCheckInterceptor

从 Service 的元数据上判断

public class ServiceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {
    
    @Override
    public boolean intercept(InstanceBeatCheckTask object) {
        NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
        // 获取当前 Service 的元数据
        Optional<ServiceMetadata> metadata = metadataManager.getServiceMetadata(object.getService());
        
        // 如果元数据存在,并且其数据 enableClientBeat 配置了
        if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
            
            // 直接取 enableClientBeat 值
            return Boolean.parseBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT));
        }
        
        return false;
    }
    
 
}

InstanceBeatCheckResponsibleInterceptor

并不是一个客户端要负责集群中全部节点的心跳处理的,而是只负责自己注册的。

public class InstanceBeatCheckResponsibleInterceptor extends AbstractBeatCheckInterceptor {
    
    @Override
    public boolean intercept(InstanceBeatCheckTask object) {
        // 是否是当前责任节点
        return !ApplicationUtils.getBean(DistroMapper.class).responsible(object.getClient().getResponsibleId());
    }
    
}

InstanceEnableBeatCheckInterceptor

这个就是实例级别的健康检查判断

public class InstanceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {
    
    @Override
    public boolean intercept(InstanceBeatCheckTask object) {
        NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
        HealthCheckInstancePublishInfo instance = object.getInstancePublishInfo();
        
        // 获取到实例上的元数据
        Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(object.getService(), instance.getMetadataId());
        // 从元数据上取
        if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
            // 元数据存在取该值
            return ConvertUtils.toBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());
        }
        
        // 从 extendDatum 中取数据
        if (instance.getExtendDatum().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
            return ConvertUtils.toBoolean(instance.getExtendDatum().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());
        }
        return false;
    }
   
}

如果都没被上面三个拦截器拦截掉,那就代表 当前实例是 开启了 健康检查,所以后面就要开始进行 检查操作

检查操作由 object.passIntercept(); 做

object.passIntercept(); 是什么呢?

就是刚才的 开始拦截方法最后一行

@Override
public void doInterceptor(T object) {
    for (NacosNamingInterceptor<T> each : interceptors) {
        if (!each.isInterceptType(object.getClass())) {
            continue;
        }
        // 拦截器是否拦截?
        if (each.intercept(object)) {
            object.afterIntercept();
            // 拦截了,直接返回
            return;
        }
    }
    
    // 未被拦截到
    object.passIntercept();
}

object 是什么?

就是之前传过来的 InstanceBeatCheckTask

 

接下来看看 InstanceBeatCheckTask

public class InstanceBeatCheckTask implements Interceptable {
    
    // 全部检查项目
    private static final List<InstanceBeatChecker> CHECKERS = new LinkedList<>();
    
    private final IpPortBasedClient client;
    
    private final Service service;
    
    private final HealthCheckInstancePublishInfo instancePublishInfo;
    
    static {
        // 添加检查项目
        CHECKERS.add(new UnhealthyInstanceChecker());
        CHECKERS.add(new ExpiredInstanceChecker());
        // SPI 机制添加
        CHECKERS.addAll(NacosServiceLoader.load(InstanceBeatChecker.class));
    }
    
    @Override
    public void passIntercept() {
        // 遍历全部检查项目
        for (InstanceBeatChecker each : CHECKERS) {
            // 开始检查
            each.doCheck(client, service, instancePublishInfo);
        }
    }
}

全部检查项目 都是什么呢?

 

下面分别介绍

UnhealthyInstanceChecker

不健康实例检查器

public class UnhealthyInstanceChecker implements InstanceBeatChecker {
    
    // 开始做检查
    @Override
    public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        if (instance.isHealthy() && isUnhealthy(service, instance)) {
            // 当前实例不健康了 -> 改变健康状态为 不健康
            changeHealthyStatus(client, service, instance);
        }
    }
    
    // 判断实例是否健康
    private boolean isUnhealthy(Service service, HealthCheckInstancePublishInfo instance) {
        // 获取超时时间 默认 15 秒;可通过配置更改。
        long beatTimeout = getTimeout(service, instance);
        
        // 当前时间距离上一次发送心跳包时间  超过了 规定的超时时间  则返回 true,代表节点不健康了
        return System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout;
    }
    
​
    // 改变健康状态
    private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        instance.setHealthy(false);
            
        // 省略部分代码
    }
    
}

ExpiredInstanceChecker

过期实例检查器

public class ExpiredInstanceChecker implements InstanceBeatChecker {
    
    @Override
    public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();
        if (expireInstance && isExpireInstance(service, instance)) {
            // 如果实例过期了,则直接剔除实例
            deleteIp(client, service, instance);
        }
    }
    
    private boolean isExpireInstance(Service service, HealthCheckInstancePublishInfo instance) {
        // 获取超时时间 默认 30 秒;可通过配置更改。
        long deleteTimeout = getTimeout(service, instance);
        
        // 当前时间距离上一次发送心跳包时间  超过了 规定的超时时间  则返回 true,代表节点过期了,需要进行节点剔除操作
        return System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout;
    }
    
    
    /**
     * 服务直接剔除掉
     */
    private void deleteIp(Client client, Service service, InstancePublishInfo instance) {
        client.removeServiceInstance(service);
        
        // 客户端下线
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId()));
        // 元数据改变
        NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(service, instance.getMetadataId(), true));
        // 注销实例
        NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(), "",
                false, DeregisterInstanceReason.HEARTBEAT_EXPIRE, service.getNamespace(), service.getGroup(),
                service.getName(), instance.getIp(), instance.getPort()));
    }

3.2 永久实例服务端健康检查

永久实例的健康检查是服务端主动探测方式,服务端定时外部请求客户端来看是否健康。

public class IpPortBasedClient extends AbstractClient {
    
    public void init() {
        if (ephemeral) {
            beatCheckTask = new ClientBeatCheckTaskV2(this);
            HealthCheckReactor.scheduleCheck(beatCheckTask);
        } else {
            healthCheckTaskV2 = new HealthCheckTaskV2(this);
            HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
        }
    }   
}

入口类是 HealthCheckTaskV2

public class HealthCheckTaskV2 extends AbstractExecuteTask implements NacosHealthCheckTask {
 
    // 省略部分代码
    
    private final IpPortBasedClient client;
      
    @Override
    public void doHealthCheck() {
         
        // 获取到当前客户端上注册的全部节点
        for (Service each : client.getAllPublishedService()) {
            // 如果开启了健康检查
            if (switchDomain.isHealthCheckEnabled(each.getGroupedServiceName())) {
                
                // 拿到实例注册信息
                InstancePublishInfo instancePublishInfo = client.getInstancePublishInfo(each);
                // 拿到集群元数据
                ClusterMetadata metadata = getClusterMetadata(each, instancePublishInfo);
                
                // 调用 HealthCheckProcessorV2Delegate.process()
                ApplicationUtils.getBean(HealthCheckProcessorV2Delegate.class).process(this, each, metadata);
​
                }
            }
    }
​
    
    @Override
    public void run() {
        doHealthCheck();
    }
    
}

最终调用了 HealthCheckProcessorV2Delegate.process 方法

看看如何实现

HealthCheckProcessorV2Delegate 这个类就是一个委托类

public class HealthCheckProcessorV2Delegate implements HealthCheckProcessorV2 {
    
    // 类型,健康检查实现类
    private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>();
​
    
    @Autowired
    public void addProcessor(Collection<HealthCheckProcessorV2> processors) {
        healthCheckProcessorMap.putAll(processors.stream().filter(processor -> processor.getType() != null)
                .collect(Collectors.toMap(HealthCheckProcessorV2::getType, processor -> processor)));
    }
    
    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
        // 从元数据中获取到当前的健康检查类型 (HTTP、MySQL、TCP、None)
        String type = metadata.getHealthyCheckType();
        
        // 根据类型找到具体的 健康检查类
        HealthCheckProcessorV2 processor = healthCheckProcessorMap.get(type);
        if (processor == null) {
            // 找不到 就使用 None 健康检查
            processor = healthCheckProcessorMap.get(NoneHealthCheckProcessor.TYPE);
        }
        
        // 开始进行健康检查
        processor.process(task, service, metadata);
    }
}

健康检查有如下几类,还可通过 SPI 方式扩展

下面一个一个介绍

NoneHealthCheckProcessor

None 代表不做健康检查,所以这个类的Process 为空实现

public class NoneHealthCheckProcessor implements HealthCheckProcessorV2 {
    
    public static final String TYPE = HealthCheckType.NONE.name();
    
    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
    
    }
    
    @Override
    public String getType() {
        return TYPE;
    }
}

TcpHealthCheckProcessor

TCP 健康检查,用于通过 TCP 方式检查是否健康,本质上是通过建立 Socket 连接,发送 Socket 信息实现

public class TcpHealthCheckProcessor implements HealthCheckProcessorV2, Runnable {
    
    
    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
        // 省略
    }
  
    // 省略部分代码
    
    
    private class TaskProcessor implements Callable<Void> {
​
        @Override
        public Void call() {
         
            // 发送 Socket 请求
                SocketChannel channel = null;
           
                HealthCheckInstancePublishInfo instance = beat.getInstance();
                
                BeatKey beatKey = keyMap.get(beat.toString());
                if (beatKey != null && beatKey.key.isValid()) {
                    if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                        instance.finishCheck();
                        return null;
                    }
                    
                    beatKey.key.cancel();
                    beatKey.key.channel().close();
                }
                
                channel = SocketChannel.open();
                channel.configureBlocking(false);
                // only by setting this can we make the socket close event asynchronous
                channel.socket().setSoLinger(false, -1);
                channel.socket().setReuseAddress(true);
                channel.socket().setKeepAlive(true);
                channel.socket().setTcpNoDelay(true);
                
                ClusterMetadata cluster = beat.getMetadata();
                int port = cluster.isUseInstancePortForCheck() ? instance.getPort() : cluster.getHealthyCheckPort();
                channel.connect(new InetSocketAddress(instance.getIp(), port));
                
                SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
                key.attach(beat);
                keyMap.put(beat.toString(), new BeatKey(key));
                
                beat.setStartTime(System.currentTimeMillis());
                
                GlobalExecutor
                        .scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
          
            
            return null;
        }
    }
}

MysqlHealthCheckProcessor

本质是 发送一个 sql。(sql从配置中获取),整个过程没报异常就算健康

public class MysqlHealthCheckProcessor implements HealthCheckProcessorV2 {
    
    public static final String TYPE = HealthCheckType.MYSQL.name();
    
    private final HealthCheckCommonV2 healthCheckCommon;
    
    private final SwitchDomain switchDomain;
    
    public static final int CONNECT_TIMEOUT_MS = 500;
    
    private static final String CHECK_MYSQL_MASTER_SQL = "show global variables where variable_name='read_only'";
    
    private static final String MYSQL_SLAVE_READONLY = "ON";
    
    private static final ConcurrentMap<String, Connection> CONNECTION_POOL = new ConcurrentHashMap<String, Connection>();
    
    public MysqlHealthCheckProcessor(HealthCheckCommonV2 healthCheckCommon, SwitchDomain switchDomain) {
        this.healthCheckCommon = healthCheckCommon;
        this.switchDomain = switchDomain;
    }
    
    @Override
    public String getType() {
        return TYPE;
    }
    
    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
      
        // 省略
    }
    
    private class MysqlCheckTask implements Runnable {
        
        @Override
        public void run() {
            
            Statement statement = null;
            ResultSet resultSet = null;
            
        
                String clusterName = instance.getCluster();
                String key =
                        service.getGroupedServiceName() + ":" + clusterName + ":" + instance.getIp() + ":" + instance
                                .getPort();
                Connection connection = CONNECTION_POOL.get(key);
                Mysql config = (Mysql) metadata.getHealthChecker();
                
                if (connection == null || connection.isClosed()) {
                    String url = "jdbc:mysql://" + instance.getIp() + ":" + instance.getPort() + "?connectTimeout="
                            + CONNECT_TIMEOUT_MS + "&socketTimeout=" + CONNECT_TIMEOUT_MS + "&loginTimeout=" + 1;
                    connection = DriverManager.getConnection(url, config.getUser(), config.getPwd());
                    CONNECTION_POOL.put(key, connection);
                }
                
                statement = connection.createStatement();
                statement.setQueryTimeout(1);
                
                resultSet = statement.executeQuery(config.getCmd());
                int resultColumnIndex = 2;
                
                if (CHECK_MYSQL_MASTER_SQL.equals(config.getCmd())) {
                    resultSet.next();
                    if (MYSQL_SLAVE_READONLY.equals(resultSet.getString(resultColumnIndex))) {
                        throw new IllegalStateException("current node is slave!");
                    }
                }
                
                healthCheckCommon.checkOk(task, service, "mysql:+ok");
                healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
                        switchDomain.getMysqlHealthParams());
​
    }
}

HttpHealthCheckProcessor

本质是发送一个 HTTP 请求,返回状态码 200 就算健康

public class HttpHealthCheckProcessor implements HealthCheckProcessorV2 {
    
    public static final String TYPE = HealthCheckType.HTTP.name();
    
    private static final NacosAsyncRestTemplate ASYNC_REST_TEMPLATE = HttpClientManager
            .getProcessorNacosAsyncRestTemplate();
    
    private final HealthCheckCommonV2 healthCheckCommon;
​
    
    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
        HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
                .getInstancePublishInfo(service);
        if (null == instance) {
            return;
        }
        try {
         
            if (!instance.tryStartCheck()) {
                SRV_LOG.warn("http check started before last one finished, service: {} : {} : {}:{}",
                        service.getGroupedServiceName(), instance.getCluster(), instance.getIp(), instance.getPort());
                healthCheckCommon
                        .reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());
                return;
            }
            
            Http healthChecker = (Http) metadata.getHealthChecker();
            int ckPort = metadata.isUseInstancePortForCheck() ? instance.getPort() : metadata.getHealthyCheckPort();
            URL host = new URL(HTTP_PREFIX + instance.getIp() + ":" + ckPort);
            URL target = new URL(host, healthChecker.getPath());
            Map<String, String> customHeaders = healthChecker.getCustomHeaders();
            Header header = Header.newInstance();
            header.addAll(customHeaders);
            // 发送 HTTP 请求
            ASYNC_REST_TEMPLATE.get(target.toString(), header, Query.EMPTY, String.class,
                    new HttpHealthCheckCallback(instance, task, service));
            MetricsMonitor.getHttpHealthCheckMonitor().incrementAndGet();
        } catch (Throwable e) {
            instance.setCheckRt(switchDomain.getHttpHealthParams().getMax());
            healthCheckCommon.checkFail(task, service, "http:error:" + e.getMessage());
            healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
                    switchDomain.getHttpHealthParams());
        }
    }
    
    @Override
    public String getType() {
        return TYPE;
    }
    
    private class HttpHealthCheckCallback implements Callback<String> {
        
        private final HealthCheckTaskV2 task;
        
        private final Service service;
        
        private final HealthCheckInstancePublishInfo instance;
        
        private long startTime = System.currentTimeMillis();
        
        public HttpHealthCheckCallback(HealthCheckInstancePublishInfo instance, HealthCheckTaskV2 task,
                Service service) {
            this.instance = instance;
            this.task = task;
            this.service = service;
        }
        
        @Override
        public void onReceive(RestResult<String> result) {
            instance.setCheckRt(System.currentTimeMillis() - startTime);
            int httpCode = result.getCode();
            if (HttpURLConnection.HTTP_OK == httpCode) {
                healthCheckCommon.checkOk(task, service, "http:" + httpCode);
                healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
                        switchDomain.getHttpHealthParams());
            } else if (HttpURLConnection.HTTP_UNAVAILABLE == httpCode
                    || HttpURLConnection.HTTP_MOVED_TEMP == httpCode) {
                // server is busy, need verification later
                healthCheckCommon.checkFail(task, service, "http:" + httpCode);
                healthCheckCommon
                        .reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());
            } else {
                //probably means the state files has been removed by administrator
                healthCheckCommon.checkFailNow(task, service, "http:" + httpCode);
                healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
                        switchDomain.getHttpHealthParams());
            }
        }
        
        @Override
        public void onError(Throwable throwable) {
            Throwable cause = throwable;
            instance.setCheckRt(System.currentTimeMillis() - startTime);
            int maxStackDepth = 50;
            for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {
                if (HttpUtils.isTimeoutException(cause)) {
                    healthCheckCommon.checkFail(task, service, "http:" + cause.getMessage());
                    healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task,
                            switchDomain.getHttpHealthParams());
                    return;
                }
                cause = cause.getCause();
            }
            
          
            if (throwable instanceof ConnectException) {
                healthCheckCommon.checkFailNow(task, service, "http:unable2connect:" + throwable.getMessage());
            } else {
                healthCheckCommon.checkFail(task, service, "http:error:" + throwable.getMessage());
            }
            healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
                    switchDomain.getHttpHealthParams());
        }
        
        @Override
        public void onCancel() {
        
        }
    }
}
​

可见,不同方式的健康检查差异还是挺大的,那么如果将检查结果告知 Nacos 呢,那就是调用

// 健康检查结果:健康
healthCheckCommon.checkOk(task, service, "");

// 健康检查结果:不健康
healthCheckCommon.checkFail(task, service, "");

健康检查成功

检查成功主要做的事情就是 重置失败次数、结束检查

public void checkOk(HealthCheckTaskV2 task, Service service, String msg) {
    try {
        HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
                .getInstancePublishInfo(service);
        if (instance == null) {
            // 实例不存在,不做处理
            return;
        }
     
            if (!instance.isHealthy()) {
                // 如果实例不健康的,将状态改为 健康
                // 代码省略
            }
        
        } finally {
            // 重置失败次数
            instance.resetFailCount();
            // 结束检查
            instance.finishCheck();
        }
}

健康检查失

public void checkFail(HealthCheckTaskV2 task, Service service, String msg) {
​
        HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
                .getInstancePublishInfo(service);
        if (instance == null) {
            return;
        }
        try {
            if (instance.isHealthy()) {
                // 如果实例是健康的,将状态改为 不健康
                // 代码省略
            }
            
        } finally {
            // 重置健康次数
            instance.resetOkCount();
            // 结束检查
            instance.finishCheck();
        }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/1289.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

jupyter的安装和使用

目录 ❤ Jupyter Notebook是什么&#xff1f; notebook jupyter 简介 notebook jupyter 组成 网页应用 文档 主要特点 ❤ jupyter notebook的安装 notebook jupyter 安装有两种途径 1.通过Anaconda进行安装 2.通过pip进行安装 启动jupyter notebook ❤ jupyter …

10 个超赞的 C 语言开源项目

今天给大家分享10个超赞的C语言开源项目&#xff0c;希望这些内容能对大家有所帮助&#xff01;01.WebbenchWebbench是一个在 Linux 下使用的非常简单的网站压测工具。它使用fork()模拟多个客户端同时访问我们设定的URL&#xff0c;测试网站在压力下工作的性能。最多可以模拟 3…

【深度强化学习】(8) iPPO 模型解析,附Pytorch完整代码

大家好&#xff0c;今天和各位分享一下多智能体深度强化学习算法 ippo&#xff0c;并基于 gym 环境完成一个小案例。完整代码可以从我的 GitHub 中获得&#xff1a;https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model 1. 算法原理 多智能体的情形相比于单智…

《C++ Primer Plus》(第6版)第12章编程练习

《C Primer Plus》&#xff08;第6版&#xff09;第12章编程练习《C Primer Plus》&#xff08;第6版&#xff09;第12章编程练习1. Cow类2. String类3. Stock类4. Stack类5. 排队时间不超过1分钟6. 再开设一台ATM&#xff0c;重新求解第五题《C Primer Plus》&#xff08;第6版…

JAVA 多线程

目录 P1多线程01&#xff1a;概述 P2多线程02&#xff1a;线程、进程、多线程 P3多线程03&#xff1a;继承Thread类 P4多线程04&#xff1a;网图下载 P5多线程05&#xff1a;实现Runnable接口 P6多线程06&#xff1a;初识并发问题 P7多线程07&#xff1a;龟兔赛跑 P8多…

PyTorch深度学习实战 | 搭建卷积神经网络进行图像分类与图像风格迁移

PyTorch是当前主流深度学习框架之一&#xff0c;其设计追求最少的封装、最直观的设计&#xff0c;其简洁优美的特性使得PyTorch代码更易理解&#xff0c;对新手非常友好。本文为实战篇&#xff0c;介绍搭建卷积神经网络进行图像分类与图像风格迁移。1、实验数据准备本文中准备使…

汇编语言与微机原理(1)基础知识

前言&#xff08;1&#xff09;本人使用的是王爽老师的汇编语言第四版和学校发的微机原理教材配合学习。&#xff08;2&#xff09;推荐视频教程通俗易懂的汇编语言&#xff08;王爽老师的书&#xff09;&#xff1b;贺老师C站账号网址&#xff1b;&#xff08;3&#xff09;文…

10分钟搞定win11安卓子系统

10分钟搞定win11安卓子系统Android子系统的要求一、安装 Windows 虚拟化支持二、Win11 正式版安装安卓子系统方法教程 (离线包安装)三、在Win11 安卓子系统安装 APK 软件包教程Windows 11 WSA 安装 APK 方法&#xff1a;Windows 11上成功运行安卓APP安装国内的 Android 应用商店…

Java序列化与反序列化

优秀博文&#xff1a;IT-BLOG-CN 序列化&#xff1a;把对象转换为字节序列存储于磁盘或者进行网络传输的过程称为对象的序列化。 反序列化&#xff1a;把磁盘或网络节点上的字节序列恢复到对象的过程称为对象的反序列化。 一、序列化对象 【1】必须实现序列化接口Serializabl…

Spring注解驱动开发--AOP底层原理

Spring注解驱动开发–AOP底层原理 21. AOP-AOP功能测试 AOP&#xff1a;【动态代理】 指在程序运行期间动态的将某段代码切入到指定方法指定位置进行运行的编程方式&#xff1b; 1、导入aop模块&#xff1a;Spring AOP&#xff0c;(Spring-aspects) 2、定义一个业务逻辑类(Ma…

Git和Github的基本用法(内含如何下载)

Git和Github的基本用法背景下载安装安装 git for windows使用 Github 创建项目注册账号创建项目下载项目到本地Git工作流程Git 操作的三板斧放入代码三板斧第一招: git add三板斧第二招: git commit三板斧第三招: git push小结背景 git是一个版本控制工具. 主要解决三个问题 代…

[ROC-RK3568-PC] [Firefly-Android] 10min带你了解I2C的使用

&#x1f347; 博主主页&#xff1a; 【Systemcall小酒屋】&#x1f347; 博主追寻&#xff1a;热衷于用简单的案例讲述复杂的技术&#xff0c;“假传万卷书&#xff0c;真传一案例”&#xff0c;这是林群院士说过的一句话&#xff0c;另外“成就是最好的老师”&#xff0c;技术…

蓝桥杯嵌入式第八课--EEPROM读写

前言E2PROM的读写主要是考察IIC的使用&#xff0c;但是在比赛当中I2C的各种驱动文件都是直接给出的&#xff0c;因此我们需要做的工作就是根据EEPROM的读写时序配出读写的函数来。EEPROM硬件连接图我们可以看到IIC的数据线&#xff08;已上拉&#xff09;有两路去处&#xff0c…

C语言详解KMP算法

如果给你一个字符串 和 该字符串的一个子字符串 你能否快速找出该子字符串的所在位置我猜 这里会有一群杠精 说可以找到 真的吗 那下面这个字符串你可以一眼看出来吗你能找出来吗 如果能 算你眼神好 如果不能 那就看看接下来我怎么做你有想到暴力求解法吗&#xff1f;——来自百…

Hadoop运行模块

二、Hadoop运行模式 1&#xff09;Hadoop官方网站&#xff1a;http://hadoop.apache.org 2&#xff09;Hadoop运行模式包括&#xff1a;本地模式、伪分布式模式以及完全分布式模式。 本地模式&#xff1a;单机运行&#xff0c;只是用来演示一下官方案例。生产环境不用。伪分…

【数据结构与算法】栈的实现(附源码)

目录 一.栈的概念和结构 二.接口实现 A.初始化 Stackinit 销毁 Stackdestroy 1.Stackinit 2.Stackdestroy B.插入 Stackpush 删除 Stackpop 1.Stackpush 2.Stackpop C.出栈 Stacktop D. 栈的有效元素 Stacksize 判空 Stackempty 1.Stacksize 2.Stackempty …

【DBC专题】-12-不同类型报文(应用/诊断/网关/测量标定)在DBC中配置,以及在Autosar各模块间的信号数据流向

点击返回「Autosar从入门到精通-实战篇」总目录 案例背景(共18页精讲)&#xff1a;该篇博文将告诉您&#xff1a; 1)Autosar中&#xff0c;不同类型报文(App应用&#xff0c;UDS/OBD诊断&#xff0c;NM网络管理报文&#xff0c;XCP测量标定)的信号数据流向&#xff1b; 2)CAN …

Linux串口应用编程

一、 串口API 在Linux系统中,操作设备的统一接口就是:open/ioctl/read/write。 对于UART,又在ioctl之上封装了很多函数,主要是用来设置行规程。 所以对于UART,编程的套路就是: open设置行规程,比如波特率、数据位、停止位、检验位、RAW模式、一有数据就返回read/write 怎么设置…

C语言刷题(7)(字符串旋转问题)——“C”

各位CSDN的uu们你们好呀&#xff0c;今天&#xff0c;小雅兰的内容依旧是复习之前的知识点&#xff0c;那么&#xff0c;就是做一道小小的题目啦&#xff0c;下面&#xff0c;让我们进入C语言的世界吧 实现一个函数&#xff0c;可以左旋字符串中的k个字符。 例如&#xff1a; A…

再也不想去字节跳动面试了,6年测开面试遭到这样打击.....

前几天我朋友跟我吐苦水&#xff0c;这波面试又把他打击到了&#xff0c;做了快6年软件测试员。。。为了进大厂&#xff0c;也花了很多时间和精力在面试准备上&#xff0c;也刷了很多题。但题刷多了之后有点怀疑人生&#xff0c;不知道刷的这些题在之后的工作中能不能用到&…
最新文章