专业的JAVA编程教程与资源

网站首页 > java教程 正文

SpringBoot整合Nacos配置中心加载原理二

temp10 2024-10-11 18:22:02 java教程 15 ℃ 0 评论

上文说到在BootStrap阶段中加载了自动配置类NacosConfigBootstrapConfiguration中导入了三个Bean对象,其中有一个Bean没分析,它是NacosConfigManager,这节我将详细分析这个类的源码。

NacosConfigManager主要有以下几个功能:

SpringBoot整合Nacos配置中心加载原理二

  1. 配置加载:

通过调用 Nacos 服务器的 API,从 Nacos 配置中心加载应用程序所需的配置信息。这包括获取配置的内容、数据ID、分组等信息。NacosConfigManager 负责将配置信息加载到内存中供应用程序使用。

  1. 动态刷新:

提供配置的动态刷新机制。NacosConfigManager 允许应用程序在运行时动态刷新配置,以便及时响应配置的变化。通过监听 Nacos 配置中心的配置变更事件,NacosConfigManager 能够感知到配置的修改并触发相应的刷新操作。

  1. 异常处理和容错:

处理从 Nacos 服务器获取配置时可能发生的异常情况,例如网络异常、连接超时等。NacosConfigManager 实现了一些容错机制,包括重试、回退到本地缓存等,以确保在配置获取失败时有适当的处理方式。

  1. 本地缓存:

为了提高性能和降低对 Nacos 服务器的频繁访问,NacosConfigManager 可以使用本地缓存来存储已获取的配置信息。这有助于减轻配置中心的负载,并在需要时从本地缓存中快速获取配置。

下面我们看看这个类的信息代码:


package com.alibaba.cloud.nacos;

import java.util.Objects;

import com.alibaba.cloud.nacos.diagnostics.analyzer.NacosConnectionFailureException;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author zkzlx
 */
public class NacosConfigManager {

   private static final Logger log = LoggerFactory.getLogger(NacosConfigManager.class);

   private static ConfigService service = null;

   private NacosConfigProperties nacosConfigProperties;

   public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {
      this.nacosConfigProperties = nacosConfigProperties;
      // Compatible with older code in NacosConfigProperties,It will be deleted in the
      // future.
      createConfigService(nacosConfigProperties);
   }

   /**
    * Compatible with old design,It will be perfected in the future.
    */
   static ConfigService createConfigService(
         NacosConfigProperties nacosConfigProperties) {
      if (Objects.isNull(service)) {
         synchronized (NacosConfigManager.class) {
            try {
               if (Objects.isNull(service)) {
                  service = NacosFactory.createConfigService(
                        nacosConfigProperties.assembleConfigServiceProperties());
               }
            }
            catch (NacosException e) {
               log.error(e.getMessage());
               throw new NacosConnectionFailureException(
                     nacosConfigProperties.getServerAddr(), e.getMessage(), e);
            }
         }
      }
      return service;
   }

   public ConfigService getConfigService() {
      if (Objects.isNull(service)) {
         createConfigService(this.nacosConfigProperties);
      }
      return service;
   }

   public NacosConfigProperties getNacosConfigProperties() {
      return nacosConfigProperties;
   }

}

在Springboot初始化这个对象时候会调用有参构造函数NacosConfigManager,传入NacosConfigProperties对象,把NacosConfigProperties对象赋值给NacosConfigManager的成员变量NacosConfigProperties,接着调用createConfigService方法,该方法的定义代码如下:

static ConfigService createConfigService(
      NacosConfigProperties nacosConfigProperties) {
   if (Objects.isNull(service)) {
      synchronized (NacosConfigManager.class) {
         try {
            if (Objects.isNull(service)) {
               service = NacosFactory.createConfigService(
                     nacosConfigProperties.assembleConfigServiceProperties());
            }
         }
         catch (NacosException e) {
            log.error(e.getMessage());
            throw new NacosConnectionFailureException(
                  nacosConfigProperties.getServerAddr(), e.getMessage(), e);
         }
      }
   }
   return service;
}

这里面主要是调用NacosFactory.createConfigService方法,该方法主要逻辑是通过反射调用构造函数传入Properties对象,Properties对象主要封装了NacosConfigProperties一些配置信息,创建NacosConfigService对象,主要看NacosConfigService对象的构造函数,代码如下:

public NacosConfigService(Properties properties) throws NacosException {
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        encode = Constants.ENCODE;
    } else {
        encode = encodeTmp.trim();
    }
    initNamespace(properties);
    agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    agent.start();
    worker = new ClientWorker(agent, configFilterChainManager, properties);
}

先实例化一个ServerHttpAgent对象,在该对象中又初始化一个成员属性ServerListManager,调用链是ServerHttpAgent(Properties properties)->

ServerListManager(properties)

ServerListManager内部初始化serverAddrsStr属性,即Nacos服务器url地址,namespace属性即命名空间。

ServerHttpAgent对象又初始化一个成员属性securityProxy,调用链是ServerHttpAgent(Properties properties)->

SecurityProxy(properties)

SecurityProxy内部初始化username属性、password属性、contextPath属性、这些也都是配置文件中配置的内容

创建完ServerHttpAgent对象以后返回到NacosConfigService的构造函数又创建了一个MetricsHttpAgent对象,把上面的ServerHttpAgent传入到MetricsHttpAgent的成员属性httpAgent中,接着在构造函数中调用MetricsHttpAgent的start方法,一步步执行回调用到ServerListManager的start方法,该方法的代码如下:

public synchronized void start() throws NacosException {

    if (isStarted || isFixed) {
        return;
    }

    GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);
    for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {
        getServersTask.run();
        try {
            this.wait((i + 1) * 100L);
        } catch (Exception e) {
            LOGGER.warn("get serverlist fail,url: {}", addressServerUrl);
        }
    }

    if (serverUrls.isEmpty()) {
        LOGGER.error("[init-serverlist] fail to get NACOS-server serverlist! env: {}, url: {}", name,
            addressServerUrl);
        throw new NacosException(NacosException.SERVER_ERROR,
            "fail to get NACOS-server serverlist! env:" + name + ", not connnect url:" + addressServerUrl);
    }

    TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
    isStarted = true;
}

首先判断isStarted或者isFixed是否为true,由于我们配置了Nacos服务器的url地址,isFixed就是true所以return返回了并不会执行下面的线程池的逻辑,所以这个start方法就不分析了 ,

接着构造方法中又创建了一个ClientWorker对象,该对象的构造函数的代码如下:

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    // Initialize the timeout parameter

    init(properties);

    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

创建了两个线程池,一个线程池名称是com.alibaba.nacos.client.Worker+agent名称(简称worker线程),一个线程池名称是com.alibaba.nacos.client.Worker.longPolling+agent名称,worker线程启动一个每隔10毫秒的定时任务,调用checkConfigInfo方法,该方法的代码如下:

public void checkConfigInfo() {
    // 分任务
    int listenerSize = cacheMap.get().size();
    // 向上取整为批数
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

判断cacheMap中map的个数,map的定义是这样的,Map<String, CacheData>,首先分析这个Map是如何新增的元素,是调用了ClientWorker对象的addListeners方法,这个方法会在NacosContextRefresher发布事件时候调用的这块的代码是在Applicaton阶段,下一节我会分析那里,我们先看这个方法的代码:

public void addListeners(String dataId, String group, List<? extends Listener> listeners) {
    group = null2defaultGroup(group);
    CacheData cache = addCacheDataIfAbsent(dataId, group);
    for (Listener listener : listeners) {
        cache.addListener(listener);
    }
}

得到分组的dataId,group,根据这两个字段去AtomicReference<Map<String, CacheData>>数据去查询CacheData,如果查询到了直接返回,如果没查询到创建一个CacheData,代码如下:

public CacheData addCacheDataIfAbsent(String dataId, String group) {
    CacheData cache = getCache(dataId, group);
    if (null != cache) {
        return cache;
    }

    String key = GroupKey.getKey(dataId, group);
    cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group);

    synchronized (cacheMap) {
        CacheData cacheFromMap = getCache(dataId, group);
        // multiple listeners on the same dataid+group and race condition,so double check again
        //other listener thread beat me to set to cacheMap
        if (null != cacheFromMap) {
            cache = cacheFromMap;
            //reset so that server not hang this check
            cache.setInitializing(true);
        } else {
            int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
            cache.setTaskId(taskId);
        }

        Map<String, CacheData> copy = new HashMap<String, CacheData>(cacheMap.get());
        copy.put(key, cache);
        cacheMap.set(copy);
    }

    LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);

    MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());

    return cache;
}

加锁cacheMap,防止里面的map并发放置元素产生线程安全问题,用cacheMap.get获取map元素的个数除以一个3000得到一个taskId,设置到CacheData的taskId属性上,最终放到cacheMap数据结构上。此时方法返回到addListeners方法,调用CacheData的addListeners方法把Listener集合添加进去,addListeners方法代码如下:

public void addListener(Listener listener) {
    if (null == listener) {
        throw new IllegalArgumentException("listener is null");
    }
    ManagerListenerWrap wrap = (listener instanceof AbstractConfigChangeListener) ?
        new ManagerListenerWrap(listener, md5, content) : new ManagerListenerWrap(listener, md5);

    if (listeners.addIfAbsent(wrap)) {
        LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
            listeners.size());
    }
}

把Listener包装成ManagerListenerWrap对象,传入md5,content,这些值都是什么意思,我会在分析调用这个addListener方法时候会具体分析,把ManagerListenerWrap对象添加到数据结构是CopyOnWriteArrayList<ManagerListenerWrap>这个对象中,好了回到checkConfigInfo方法,假如我们添加了一个元素,通过int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize())获取到longingTaskCount是1会调用LongPollingRunnable方法,该方法的逻辑主要是先调用checkUpdateDataIds方法,这个方法主要是去服务器端获取变化的配置文件然后解析出变化的内容,根据变化内容重新计算CacheData的属性MD5值和之前老的MD5值进行对比,如果不一样会回调ClientWorker的addListeners的linstener,这个是下节分析的内容。

总结:

在Bootstrap 阶段加载了 NacosConfigBootstrapConfiguration 自动配置类,导入了NacosConfigManager。NacosConfigManager 主要功能包括配置加载、动态刷新、异常处理和容错、本地缓存。还具体分析了 NacosConfigManager 类的源码,该类通过调用 Nacos 服务器的 API 从 Nacos 配置中心加载应用程序所需的配置信息,并提供了动态刷新机制。同时,处理异常情况和容错,以及使用本地缓存提高性能。NacosConfigManager 的构造函数接受 NacosConfigProperties 对象,调用 createConfigService 方法创建 ConfigService 对象,该对象用于和 Nacos 服务器通信。分析了 createConfigService 方法内部逻辑,包括通过 NacosFactory 创建 ConfigService 对象,并处理连接异常。还分析了 ConfigService 对象的初始化过程,涉及到 ServerHttpAgent、SecurityProxy、MetricsHttpAgent 等对象的创建和配置。还分析了 ClientWorker 对象的创建过程,包括线程池的初始化和定时任务的执行,这里线程池主要是通过长轮询方式去服务器端看是否有变化的内容,如果发生了变更,那么客户端会根据变更的数据获得最新的配置。给出一个简单的截图把

总体来说,NacosConfigManager 在 Spring Boot 项目中负责与 Nacos 配置中心交互,加载配置信息,并提供动态刷新机制。通过本地缓存和容错机制,确保在配置获取失败时有适当的处理方式。在初始化过程中,涉及到与 Nacos 服务器通信的各个组件的创建和配置。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表