【Apollo】【四】发布配置的过程-客户端获取变更通知之长轮询

发布时间 2023-09-01 07:54:55作者: 酷酷-

1  前言

上节我们看到哪里呢?还知道不?

简单回顾下:在门户Portal发布配置后,会调用到 Admin Service 的 publish 方法,publish里会通过消息发送器发送,默认的消息发送器也是唯一的,就是往数据库里保存了一条 ReleaseMessage 记录,基于数据库实现的消息发送与消费,那么谁来消费呢?就是 ReleaseMessageScanner,Config Service里会通过它来异步的每隔一秒读取一次,发现 ReleaseMessage 后,调用其 Listeners 告诉监听者们,有配置变更啦,让他们处理。

那么有哪些监听器呢?

嘿嘿,我们这节就来看看 NotificationControllerV2 ,它得到配置发布的 ReleaseMessage( AppId+Cluster+Namespace ) 后,会通知对应的客户端

2  Config Service 通知客户端的实现方式

  1. 客户端会发起一个Http 请求到 Config Service 的 notifications/v2 接口,也就是NotificationControllerV2 ,参见 RemoteConfigLongPollService 。
  2. NotificationControllerV2 不会立即返回结果,而是通过 Spring DeferredResult 把请求挂起。
  3. 如果在 60 秒内没有该客户端关心的配置发布,那么会返回 Http 状态码 304 给客户端(「304 Not Modified」不具有跳转的含义,表示资源未修改,重定向已存在的缓冲文件,也称缓存重定向,也就是告诉客户端可以继续使用缓存资源,用于缓存控制)。
  4. 如果有该客户端关心的配置发布,NotificationControllerV2 会调用 DeferredResult 的 setResult 方法,传入有配置变化的 namespace 信息,同时该请求会立即返回。客户端从返回的结果中获取到配置变化的 namespace 后,会立即请求 Config Service 获取该 namespace 的最新配置。

Client 如何通过轮询的方式,从 Config Service 读取配置。Client 的轮询包括两部分:

  1. RemoteConfigRepository ,定时轮询 Config Service 的配置读取 /configs/{appId}/{clusterName}/{namespace:.+} 接口。
  2. RemoteConfigLongPollService ,长轮询 Config Service 的配置变更通知 /notifications/v2 接口。
    • 当有新的通知时,触发 RemoteConfigRepository ,立即轮询 Config Service 的配置读取 /configs/{appId}/{clusterName}/{namespace:.+} 接口。

当然我们一节看不了那么多,所以本节主要先看看 NotificationControllerV2 ,也就是长轮询的方式的一个通知变化哈。

另外本文的一些名词统一下哈:

  • 通知编号 = ReleaseMessage.id
  • Watch Key = ReleaseMessage.message

3  NotificationControllerV2

com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2 ,实现 ReleaseMessageListener 接口,通知 Controller ,仅提供 notifications/v2 接口。

3.1  构造方法

/**
 * Watch Key 与 DeferredResultWrapper 的 Multimap
 *
 * Key:Watch Key
 * Value:DeferredResultWrapper 数组
 */
private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps.synchronizedSetMultimap(HashMultimap.create());

private static final Splitter STRING_SPLITTER = Splitter.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).omitEmptyStrings();
private static final Type notificationsTypeReference = new TypeToken<List<ApolloConfigNotification>>() {}.getType();

/**
 * 通过 ReleaseMessage 的消息内容,获得对应 Namespace 的名字
 */
private static final Function<String, String> retrieveNamespaceFromReleaseMessage =
        releaseMessage -> {
            if (Strings.isNullOrEmpty(releaseMessage)) {
                return null;
            }
            List<String> keys = STRING_SPLITTER.splitToList(releaseMessage);
            //message should be appId+cluster+namespace
            if (keys.size() != 3) {
                logger.error("message format invalid - {}", releaseMessage);
                return null;
            }
            return keys.get(2);
        };

/**
 * 大量通知分批执行 ExecutorService
 */
private final ExecutorService largeNotificationBatchExecutorService;
@Autowired
private WatchKeysUtil watchKeysUtil;
@Autowired
private ReleaseMessageServiceWithCache releaseMessageService;
@Autowired
private EntityManagerUtil entityManagerUtil;
@Autowired
private NamespaceUtil namespaceUtil;
@Autowired
private Gson gson;
@Autowired
private BizConfig bizConfig;

public NotificationControllerV2() {
    largeNotificationBatchExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("NotificationControllerV2", true));
}
  • deferredResults 属性,Watch Key 与 DeferredResultWrapper 的 Multimap 。
    • 在下面,我们会看到大量的 Watch Key 。实际上,目前 Apollo 的实现上,Watch Key 等价于 ReleaseMessage 的通知内容 message 字段。
    • Multimap 指的是 Google Guava Multimap。
    • 在 notifications/v2 中,当请求的 Namespace 暂无新通知时,会将该 Namespace 对应的 Watch Key 们,注册到 deferredResults 中。等到 Namespace 配置发生变更时,在 #handleMessage(...) 中,进行通知。
  • 其他属性,下面使用到,可以回过头看看代码 + 注释。

3.2  pollNotification-客户端长轮询通知

 1 @RequestMapping(method = RequestMethod.GET)
 2 public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
 3         @RequestParam(value = "appId") String appId,
 4         @RequestParam(value = "cluster") String cluster,
 5         @RequestParam(value = "notifications") String notificationsAsString,
 6         @RequestParam(value = "dataCenter", required = false) String dataCenter,
 7         @RequestParam(value = "ip", required = false) String clientIp) {
 8     // 解析 notificationsAsString 参数,创建 ApolloConfigNotification 数组。
 9     List<ApolloConfigNotification> notifications = null;
10     try {
11         notifications = gson.fromJson(notificationsAsString, notificationsTypeReference);
12     } catch (Throwable ex) {
13         Tracer.logError(ex);
14     }
15     if (CollectionUtils.isEmpty(notifications)) {
16         throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
17     }
18 
19     // 创建 DeferredResultWrapper 对象
20     DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
21     // Namespace 集合
22     Set<String> namespaces = Sets.newHashSet();
23     // 客户端的通知 Map 。key 为 Namespace 名,value 为通知编号。
24     Map<String, Long> clientSideNotifications = Maps.newHashMap();
25     // 过滤并创建 ApolloConfigNotification Map
26     Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);
27     // 循环 ApolloConfigNotification Map ,初始化上述变量。
28     for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
29         String normalizedNamespace = notificationEntry.getKey();
30         ApolloConfigNotification notification = notificationEntry.getValue();
31         // 添加到 `namespaces` 中。
32         namespaces.add(normalizedNamespace);
33         // 添加到 `clientSideNotifications` 中。
34         clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
35         // 记录名字被归一化的 Namespace 。因为,最终返回给客户端,使用原始的 Namespace 名字,否则客户端无法识别。
36         if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
37             deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
38         }
39     }
40     if (CollectionUtils.isEmpty(namespaces)) {
41         throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
42     }
43 
44     // 组装 Watch Key Multimap
45     Multimap<String, String> watchedKeysMap = watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);
46     // 生成 Watch Key 集合
47     Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
48     // 获得 Watch Key 集合中,每个 Watch Key 对应的 ReleaseMessage 记录。
49     List<ReleaseMessage> latestReleaseMessages = releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);
50 
51     /**
52      * Manually close the entity manager.
53      * Since for async request, Spring won't do so until the request is finished,
54      * which is unacceptable since we are doing long polling - means the db connection would be hold
55      * for a very long time
56      */
57     // 手动关闭 EntityManager
58     // 因为对于 async 请求,Spring 在请求完成之前不会这样做
59     // 这是不可接受的,因为我们正在做长轮询——意味着 db 连接将被保留很长时间。
60     // 实际上,下面的过程,我们已经不需要 db 连接,因此进行关闭。
61     entityManagerUtil.closeEntityManager();
62     // 获得新的 ApolloConfigNotification 通知数组
63     List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages);
64     // 若有新的通知,直接设置结果。
65     if (!CollectionUtils.isEmpty(newNotifications)) {
66         deferredResultWrapper.setResult(newNotifications);
67         // 若无新的通知,
68     } else {
69         // 注册超时事件
70         deferredResultWrapper.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys")); // 【TODO 6001】Tracer 日志
71         // 注册结束事件
72         deferredResultWrapper.onCompletion(() -> {
73             // 移除 Watch Key + DeferredResultWrapper 出 `deferredResults`
74             // unregister all keys
75             for (String key : watchedKeys) {
76                 deferredResults.remove(key, deferredResultWrapper);
77             }
78             // 【TODO 6001】Tracer 日志
79             logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
80         });
81 
82         // 注册 Watch Key + DeferredResultWrapper 到 `deferredResults` 中,等待配置发生变化后通知。详见 `#handleMessage(...)` 方法。
83         // register all keys
84         for (String key : watchedKeys) {
85             this.deferredResults.put(key, deferredResultWrapper);
86         }
87 
88         // 【TODO 6001】Tracer 日志
89         logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
90         logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}", watchedKeys, appId, cluster, namespaces, dataCenter);
91     }
92 
93     return deferredResultWrapper.getResult();
94 }
  • GET /notifications/v2 接口,具体 URL 在类上注明。
  • notificationsAsString 请求参数,JSON 字符串,在【第 8 至 17 行】的代码,解析成 List<ApolloConfigNotification> ,表示客户端本地的配置通知信息。
    • 因为一个客户端可以订阅多个 Namespace ,所以该参数是 List 。
    • 我们可以注意到,该接口真正返回的结果也是 List<ApolloConfigNotification> ,仅返回配置发生变化的 Namespace 对应的 ApolloConfigNotification 。也就说,当有几个 配置发生变化的 Namespace ,返回几个对应的 ApolloConfigNotification 。另外,客户端接收到返回后,会增量合并到本地的配置通知信息。客户端下次请求时,使用合并后的配置通知信息。
    • 注意,客户端请求时,只传递 ApolloConfigNotification 的 namespaceName + notificationId ,不传递 messages 。
  • clientIp 请求参数,目前该接口暂时用不到,作为预留参数。? 万一未来在灰度发布需要呢。
  • 第 20 行:创建 DeferredResultWrapper 对象。
  • 第 22 行:创建 Namespace 的名字的集合。
  • 第 24 行:创建客户端的通知信息 Map 。其中,KEY 为 Namespace 的名字,VALUE 为通知编号。
  • 第 26 行:调用 #filterNotifications(appId, notifications) 方法,过滤并创建 ApolloConfigNotification Map 。
  • 第 27 至 39 行:循环 ApolloConfigNotification Map ,初始化上述变量。
    • 第 32 行:添加到 namespaces 中。
    • 第 34 行:添加到 clientSideNotifications 中。
    • 第 35 至 38 行:若 Namespace 的名字被归一化( normalized )了,则调用 DeferredResultWrapper#recordNamespaceNameNormalizedResult(originalNamespaceName, normalizedNamespaceName) 方法,记录名字被归一化的 Namespace 。因为,最终返回给客户端,使用原始的 Namespace 名字,否则客户端无法识别。
  • 第 45 行:调用 WatchKeysUtil#assembleAllWatchKeys(appId, cluster, namespaces, dataCenter) 方法,组装 Watch Key Multimap 。
  • 第 47 行:生成 Watch Key 集合。
  • 第 49 行:调用 ReleaseMessageServiceWithCache#findLatestReleaseMessagesGroupByMessages(watchedKeys) 方法,获得 Watch Key 集合中,每个 Watch Key 对应的最新的 ReleaseMessage 记录。
  • 第 61 行:调用 EntityManagerUtil#closeEntityManager() 方法,手动关闭 EntityManager 。因为对于 async 请求,SpringMVC 在请求完成之前不会这样做。这是不可接受的,因为我们正在做长轮询——意味着 db 连接将被保留很长时间。实际上,下面的过程,我们已经不需要 db 连接,因此进行关闭。
  • 第 63 行:调用 #getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages) 方法,获得新的 ApolloConfigNotification 通知数组。
  • 第 64 至 66 行:若有新的通知,调用 DeferredResultWrapper#setResult(List<ApolloConfigNotification>) 方法,直接设置 DeferredResult 的结果,从而结束长轮询。
  • 第 67 至 91 行:若无新的通知,注册到 deferredResults 中,等到有配置变更或超时。
    • 第 70 行:调用 DeferredResultWrapper#onTimeout(Runnable) 方法,注册超时事件。
    • 第 71 至 80 行:调用 DeferredResultWrapper#onCompletion(Runnable) 方法,注册结束事件。在其内部,移除注册的 Watch Key + DeferredResultWrapper 出 deferredResults 。
    • 第 82 至 86 行:注册 Watch Key + DeferredResultWrapper 到 deferredResults 中,等待配置发生变化后通知。这样,任意一个 Watch Key 对应的 Namespace 对应的配置发生变化时,都可以进行通知,并结束轮询等待。
  • 第 93 行:返回 DeferredResult 对象。

3.2.1  filterNotifications

#filterNotifications(appId, notifications) 方法,过滤并创建 ApolloConfigNotification Map 。其中,KEY 为 Namespace 的名字。代码如下:

 1 private Map<String, ApolloConfigNotification> filterNotifications(String appId, List<ApolloConfigNotification> notifications) {
 2     Map<String, ApolloConfigNotification> filteredNotifications = Maps.newHashMap();
 3     for (ApolloConfigNotification notification : notifications) {
 4         if (Strings.isNullOrEmpty(notification.getNamespaceName())) {
 5             continue;
 6         }
 7         // 若 Namespace 名以 .properties 结尾,移除该结尾,并设置到 ApolloConfigNotification 中。例如 application.properties => application 。
 8         // strip out .properties suffix
 9         String originalNamespace = namespaceUtil.filterNamespaceName(notification.getNamespaceName());
10         notification.setNamespaceName(originalNamespace);
11         // 获得归一化的 Namespace 名字。因为,客户端 Namespace 会填写错大小写。
12         // 例如,数据库中 Namespace 名为 Fx.Apollo ,而客户端 Namespace 名为 fx.Apollo
13         //      通过归一化后,统一为 Fx.Apollo
14         // fix the character case issue, such as FX.apollo <-> fx.apollo
15         String normalizedNamespace = namespaceUtil.normalizeNamespace(appId, originalNamespace);
16 
17         // in case client side namespace name has character case issue and has difference notification ids
18         // such as FX.apollo = 1 but fx.apollo = 2, we should let FX.apollo have the chance to update its notification id
19         // which means we should record FX.apollo = 1 here and ignore fx.apollo = 2
20         // 如果客户端 Namespace 的名字有大小写的问题,并且恰好有不同的通知编号。
21         // 例如 Namespace 名字为 FX.apollo 的通知编号是 1 ,但是 fx.apollo 的通知编号为 2 。
22         //     我们应该让 FX.apollo 可以更新它的通知编号,
23         //     所以,我们使用 FX.apollo 的 ApolloConfigNotification 对象,添加到结果,而忽略 fx.apollo 。
24         if (filteredNotifications.containsKey(normalizedNamespace) &&
25                 filteredNotifications.get(normalizedNamespace).getNotificationId() < notification.getNotificationId()) {
26             continue;
27         }
28 
29         filteredNotifications.put(normalizedNamespace, notification);
30     }
31     return filteredNotifications;
32 }
  • ? 这个方法的逻辑比较“绕”,目的是客户端传递的 Namespace 的名字不是正确的,例如大小写不对,需要做下归一化( normalized )处理。

  • 循环 ApolloConfigNotification 数组。

  • 第 9 至 10 行:调用 NamespaceUtil#filterNamespaceName(namespaceName) 方法,若 Namespace 名以 ".properties" 结尾,移除该结尾,并设置到 ApolloConfigNotification 中。例如: application.properties => application 。代码如下:

    public String filterNamespaceName(String namespaceName) {
        // 若 Namespace 名以 .properties 结尾,移除该结尾,
        if (namespaceName.toLowerCase().endsWith(".properties")) {
            int dotIndex = namespaceName.lastIndexOf(".");
            return namespaceName.substring(0, dotIndex);
        }
        return namespaceName;
    }
  • 第 15 行:调用 NamespaceUtil#normalizeNamespace(appId, originalNamespace) 方法,获得归一化的 Namespace 名字。因为,客户端 Namespace 会填写错大小写。

    • 例如,数据库中 Namespace 名为 "Fx.Apollo" ,而客户端 Namespace 名为 "fx.Apollo" 。通过归一化后,统一为 "Fx.Apollo" 。

    • 代码如下:

       1 @Autowired
       2 private AppNamespaceServiceWithCache appNamespaceServiceWithCache;
       3 
       4 public String normalizeNamespace(String appId, String namespaceName) {
       5     // 获得 App 下的 AppNamespace 对象
       6     AppNamespace appNamespace = appNamespaceServiceWithCache.findByAppIdAndNamespace(appId, namespaceName);
       7     if (appNamespace != null) {
       8         return appNamespace.getName();
       9     }
      10     // 获取不到,说明该 Namespace 可能是关联的
      11     appNamespace = appNamespaceServiceWithCache.findPublicNamespaceByName(namespaceName);
      12     if (appNamespace != null) {
      13         return appNamespace.getName();
      14     }
      15     return namespaceName;
      16 }
      • 第 5 至 9 行:调用 AppNamespaceServiceWithCache#findByAppIdAndNamespace(appId, namespaceName) 方法,获得 App 下的 AppNamespace 对象。
      • 第 10 至 14 行:获取不到,说明该 Namespace 可能是关联类型的,所以调用 AppNamespaceServiceWithCache#findPublicNamespaceByName(namespaceName) 方法,查询公用类型的 AppNamespace 对象。
      • 第 15 行:都查询不到,直接返回。为什么呢?因为 AppNamespaceServiceWithCache 是基于缓存实现,可能对应的 AppNamespace 暂未缓存到内存中。
  • 第 17 至 27 行:如果客户端 Namespace 的名字有大小写的问题,并且恰好有不同的通知编号。例如 Namespace 名字为 "FX.apollo" 的通知编号是 1 ,但是 "fx.apollo" 的通知编号为 2 。我们应该让 "FX.apollo" 可以更新它的通知编号,所以,我们使用 "FX.apollo" 的 ApolloConfigNotification 对象,添加到结果,而忽略 "fx.apollo" 。通过这样的方式,若此时服务器的通知编号为 3 ,那么 "FX.apollo" 的通知编号先更新成 3 ,再下一次长轮询时,"fx.apollo" 的通知编号再更新成 3 。? 比较“绕”,胖友细细品味下,大多数情况下,不会出现这样的情况。

  • 第 29 行:添加到 filteredNotifications 中。

3.2.2  getApolloConfigNotifications

#getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages) 方法,获得新的 ApolloConfigNotification 通知数组。代码如下:

 1 private List<ApolloConfigNotification> getApolloConfigNotifications(Set<String> namespaces,
 2                                                                     Map<String, Long> clientSideNotifications,
 3                                                                     Multimap<String, String> watchedKeysMap,
 4                                                                     List<ReleaseMessage> latestReleaseMessages) {
 5     // 创建 ApolloConfigNotification 数组
 6     List<ApolloConfigNotification> newNotifications = Lists.newArrayList();
 7     if (!CollectionUtils.isEmpty(latestReleaseMessages)) {
 8         // 创建最新通知的 Map 。其中 Key 为 Watch Key 。
 9         Map<String, Long> latestNotifications = Maps.newHashMap();
10         for (ReleaseMessage releaseMessage : latestReleaseMessages) {
11             latestNotifications.put(releaseMessage.getMessage(), releaseMessage.getId());
12         }
13         // 循环 Namespace 的名字的集合,判断是否有配置更新
14         for (String namespace : namespaces) {
15             long clientSideId = clientSideNotifications.get(namespace);
16             long latestId = ConfigConsts.NOTIFICATION_ID_PLACEHOLDER;
17             // 获得 Namespace 对应的 Watch Key 集合
18             Collection<String> namespaceWatchedKeys = watchedKeysMap.get(namespace);
19             // 获得最大的通知编号
20             for (String namespaceWatchedKey : namespaceWatchedKeys) {
21                 long namespaceNotificationId = latestNotifications.getOrDefault(namespaceWatchedKey, ConfigConsts.NOTIFICATION_ID_PLACEHOLDER);
22                 if (namespaceNotificationId > latestId) {
23                     latestId = namespaceNotificationId;
24                 }
25             }
26             // 若服务器的通知编号大于客户端的通知编号,意味着有配置更新
27             if (latestId > clientSideId) {
28                 // 创建 ApolloConfigNotification 对象
29                 ApolloConfigNotification notification = new ApolloConfigNotification(namespace, latestId);
30                 // 循环添加通知编号到 ApolloConfigNotification 中。
31                 namespaceWatchedKeys.stream().filter(latestNotifications::containsKey).forEach(namespaceWatchedKey ->
32                         notification.addMessage(namespaceWatchedKey, latestNotifications.get(namespaceWatchedKey)));
33                 // 添加 ApolloConfigNotification 对象到结果
34                 newNotifications.add(notification);
35             }
36         }
37     }
38     return newNotifications;
39 }
  • 第 6 行:创建新的 ApolloConfigNotification 数组。
  • 第 8 至 12 行:创建最新通知的 Map 。其中,KEY 为 Watch Key 。
  • 第 14 至 37 行:循环 Namespace 的名字的集合,根据 latestNotifications 判断是否有配置更新。
    • 第 18 行:获得 Namespace 对应的 Watch Key 集合。
    • 第 19 至 25 行:获得最大的通知编号。
    • 第 26 至 35 行:若服务器的通知编号大于客户端的通知编号,意味着有配置更新。
      • 第 29 行:创建 ApolloConfigNotification 对象。
      • 第 30 至 32 行:循环调用 ApolloConfigNotification#addMessage(String key, long notificationId) 方法,添加通知编号到 ApolloConfigNotification 中。对于关联类型的 Namespace ,details 会是多个。
      • 第 34 行:添加 ApolloConfigNotification 对象到结果( newNotifications )。
  • 第 38 行:返回 newNotifications 。若非空,说明有配置更新。

3.2.3  handleMessage

#handleMessage(ReleaseMessage, channel) 方法,当有新的 ReleaseMessage 时,通知其对应的 Namespace 的,并且正在等待的请求。代码如下:

 1 @Override
 2 public void handleMessage(ReleaseMessage message, String channel) {
 3     logger.info("message received - channel: {}, message: {}", channel, message);
 4     // 【TODO 6001】Tracer 日志
 5     String content = message.getMessage();
 6     Tracer.logEvent("Apollo.LongPoll.Messages", content);
 7 
 8     // 仅处理 APOLLO_RELEASE_TOPIC
 9     if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
10         return;
11     }
12 
13     // 获得对应的 Namespace 的名字
14     String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
15     if (Strings.isNullOrEmpty(changedNamespace)) {
16         logger.error("message format invalid - {}", content);
17         return;
18     }
19 
20     // `deferredResults` 存在对应的 Watch Key
21     if (!deferredResults.containsKey(content)) {
22         return;
23     }
24 
25     // create a new list to avoid ConcurrentModificationException
26     // 创建 DeferredResultWrapper 数组,避免并发问题。
27     List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
28 
29     // 创建 ApolloConfigNotification 对象
30     ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
31     configNotification.addMessage(content, message.getId());
32 
33     // do async notification if too many clients
34     // 若需要通知的客户端过多,使用 ExecutorService 异步通知,避免“惊群效应”
35     if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
36         largeNotificationBatchExecutorService.submit(() -> {
37             logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
38                     bizConfig.releaseMessageNotificationBatch());
39             for (int i = 0; i < results.size(); i++) {
40                 // 每 N 个客户端,sleep 一段时间。
41                 if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
42                     try {
43                         TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
44                     } catch (InterruptedException e) {
45                         //ignore
46                     }
47                 }
48                 logger.debug("Async notify {}", results.get(i));
49                 // 设置结果
50                 results.get(i).setResult(configNotification);
51             }
52         });
53         return;
54     }
55 
56     logger.debug("Notify {} clients for key {}", results.size(), content);
57     // 设置结果
58     for (DeferredResultWrapper result : results) {
59         result.setResult(configNotification);
60     }
61     logger.debug("Notification completed");
62 }
  • 第 8 至 11 行:仅处理 APOLLO_RELEASE_TOPIC 。

  • 第 13 至 18 行:获得对应的 Namespace 的名字。

  • 第 20 至 23 行:deferredResults 存在对应的 Watch Key。

  • 第 27 行:从 deferredResults 中读取并创建 DeferredResultWrapper 数组,避免并发问题。

  • 第 30 至 31 行:创建 ApolloConfigNotification 对象,并调用 ApolloConfigNotification#addMessage(String key, long notificationId) 方法,添加通知消息明细。此处,details 是一个。

  • 【异步】当需要通知的客户端过多,使用 ExecutorService 异步通知,避免“惊群效应”。? 感谢作者( 宋顺大佬 )的解答:

    假设一个公共 Namespace 有10W 台机器使用,如果该公共 Namespace 发布时直接下发配置更新消息的话,就会导致这 10W 台机器一下子都来请求配置,这动静就有点大了,而且对 Config Service 的压力也会比较大。

    • 数量可通过 ServerConfig "apollo.release-message.notification.batch" 配置,默认 100 。
    • 第 40 至 47 行:每通知 "apollo.release-message.notification.batch" 个客户端,sleep 一段时间。可通过 ServerConfig "apollo.release-message.notification.batch.interval" 配置,默认 100 毫秒。
    • 第 50 行:调用 DeferredResultWrapper#setResult(List<ApolloConfigNotification>) 方法,设置 DeferredResult 的结果,从而结束长轮询。
    • 第 53 行:return 。
  • 【同步】第 57 至 60 行:循环调用 DeferredResultWrapper#setResult(List<ApolloConfigNotification>) 方法,设置 DeferredResult 的结果,从而结束长轮询。

3.3  ApolloConfigNotification

com.ctrip.framework.apollo.core.dto.ApolloConfigNotification ,Apollo 配置通知 DTO 。代码如下:

public class ApolloConfigNotification {

    /**
     * Namespace 名字
     */
    private String namespaceName;
    /**
     * 最新通知编号
     *
     * 目前使用 `ReleaseMessage.id` 。
     */
    private long notificationId;
    /**
     * 通知消息集合
     */
    private volatile ApolloNotificationMessages messages;

    public ApolloConfigNotification(String namespaceName, long notificationId) {
        this.namespaceName = namespaceName;
        this.notificationId = notificationId;
    }

}
  • namespaceName 字段,Namespace 名,指向对应的 Namespace 。因此,一个 Namespace 对应一个 ApolloConfigNotification 对象。
  • notificationId 字段,最新通知编号,目前使用 ReleaseMessage.id 字段。
  • messages 字段,通知消息集合。
    • volatile 修饰,因为存在多线程修改和读取。

    • #addMessage(String key, long notificationId) 方法,添加消息明细到 message 中。代码如下:

      public void addMessage(String key, long notificationId) {
          // 创建 ApolloNotificationMessages 对象
          if (this.messages == null) {
              synchronized (this) {
                  if (this.messages == null) {
                      this.messages = new ApolloNotificationMessages();
                  }
              }
          }
          // 添加到 `messages` 中
          this.messages.put(key, notificationId);
      }

3.3.1  ApolloNotificationMessages

com.ctrip.framework.apollo.core.dto.ApolloNotificationMessages ,Apollo 配置通知消息集合 DTO 。代码如下:

public class ApolloNotificationMessages {

    /**
     * 明细 Map
     *
     * KEY :{appId} "+" {clusterName} "+" {namespace} ,例如:100004458+default+application
     * VALUE :通知编号
     */
    private Map<String, Long> details;

    public ApolloNotificationMessages() {
        this(Maps.<String, Long>newHashMap());
    }

}
  • details 字段,明细 Map 。其中,KEY 是 Watch Key 。

为什么 ApolloConfigNotification 中有 ApolloNotificationMessages ,而且 ApolloNotificationMessages 的 details 字段是 Map ?按道理说,对于一个 Namespace 的通知,使用 ApolloConfigNotification 的 namespaceName + notificationId 已经足够了。但是,在 namespaceName 对应的 Namespace 是关联类型时,会同时查询当前 Namespace + 关联的 Namespace 这两个 Namespace,所以会是多个,使用 Map 数据结构。 当然,对于 /notifications/v2 接口,仅有【直接】获得到配置变化才可能出现 ApolloNotificationMessages.details 为多个的情况。为啥?在 #handleMessage(...) 方法中,一次只处理一条 ReleaseMessage ,因此只会有 ApolloNotificationMessages.details 只会有一个。

put

public void put(String key, long notificationId) {
    details.put(key, notificationId);
}

mergeFrom

public void mergeFrom(ApolloNotificationMessages source) {
    if (source == null) {
        return;
    }
    for (Map.Entry<String, Long> entry : source.getDetails().entrySet()) {
        // to make sure the notification id always grows bigger
        // 只合并新的通知编号大于的情况
        if (this.has(entry.getKey()) && this.get(entry.getKey()) >= entry.getValue()) {
            continue;
        }
        this.put(entry.getKey(), entry.getValue());
    }
}
  • 在客户端中使用,将 Config Service 返回的结果,合并到本地的通知信息中。

3.4  DeferredResultWrapper

com.ctrip.framework.apollo.configservice.wrapper.DeferredResultWrapper ,DeferredResult 包装器,封装 DeferredResult 的公用方法。

3.4.1  构造方法

/**
 * 默认超时时间
 */
private static final long TIMEOUT = 60 * 1000; //60 seconds
/**
 * 未修改时的 ResponseEntity 响应,使用 304 状态码。
 */
private static final ResponseEntity<List<ApolloConfigNotification>> NOT_MODIFIED_RESPONSE_LIST = new ResponseEntity<>(HttpStatus.NOT_MODIFIED);

/**
 * 归一化和原始的 Namespace 的名字的 Map
 */
private Map<String, String> normalizedNamespaceNameToOriginalNamespaceName;
/**
 * 响应的 DeferredResult 对象
 */
private DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result;

public DeferredResultWrapper() {
    result = new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}
  • TIMEOUT 静态属性,默认超时时间。

  • NOT_MODIFIED_RESPONSE_LIST 静态属性,未修改时的 ResponseEntity 响应,使用 304 状态码。

  • normalizedNamespaceNameToOriginalNamespaceName 属性,归一化( normalized )和原始( original )的 Namespace 的名字的 Map 。因为客户端在填写 Namespace 时,写错了名字的大小写。在 Config Service 中,会进行归一化“修复”,方便逻辑的统一编写。但是,最终返回给客户端需要“还原”回原始( original )的 Namespace 的名字,避免客户端无法识别。

    • #recordNamespaceNameNormalizedResult(String originalNamespaceName, String normalizedNamespaceName) 方法,记录归一化和原始的 Namespace 的名字的映射。代码如下:

      public void recordNamespaceNameNormalizedResult(String originalNamespaceName, String normalizedNamespaceName) {
          if (normalizedNamespaceNameToOriginalNamespaceName == null) {
              normalizedNamespaceNameToOriginalNamespaceName = Maps.newHashMap();
          }
          // 添加到 `normalizedNamespaceNameToOriginalNamespaceName` 中
          normalizedNamespaceNameToOriginalNamespaceName.put(normalizedNamespaceName, originalNamespaceName); // 和参数的顺序,相反
      }
  • result 属性,响应的 DeferredResult 对象,在构造方法中初始化。

3.4.2  onTimeout

public void onTimeout(Runnable timeoutCallback) {
    result.onTimeout(timeoutCallback);
}

3.4.3  onCompletion

public void setResult(ApolloConfigNotification notification) {
    setResult(Lists.newArrayList(notification));
}

3.4.4  setResult

public void setResult(ApolloConfigNotification notification) {
    setResult(Lists.newArrayList(notification));
}

public void setResult(List<ApolloConfigNotification> notifications) {
    // 恢复被归一化的 Namespace 的名字为原始的 Namespace 的名字
    if (normalizedNamespaceNameToOriginalNamespaceName != null) {
        notifications.stream().filter(notification -> normalizedNamespaceNameToOriginalNamespaceName.containsKey
                (notification.getNamespaceName())).forEach(notification -> notification.setNamespaceName(
                normalizedNamespaceNameToOriginalNamespaceName.get(notification.getNamespaceName())));
    }
    // 设置结果,并使用 200 状态码。
    result.setResult(new ResponseEntity<>(notifications, HttpStatus.OK));
}

3.5  AppNamespaceServiceWithCache

com.ctrip.framework.apollo.configservice.service.AppNamespaceServiceWithCache ,实现 InitializingBean 接口,缓存 AppNamespace 的 Service 实现类。通过将 AppNamespace 缓存在内存中,提高查询性能。缓存实现方式如下:

  1. 启动时,全量初始化 AppNamespace 到缓存
  2. 考虑 AppNamespace 新增,后台定时任务,定时增量初始化 AppNamespace 到缓存
  3. 考虑 AppNamespace 更新与删除,后台定时任务,定时全量重建 AppNamespace 到缓存

3.5.1  构造方法

private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).skipNulls();

@Autowired
private AppNamespaceRepository appNamespaceRepository;
@Autowired
private BizConfig bizConfig;

/**
 * 增量初始化周期
 */
private int scanInterval;
/**
 * 增量初始化周期单位
 */
private TimeUnit scanIntervalTimeUnit;
/**
 * 重建周期
 */
private int rebuildInterval;
/**
 * 重建周期单位
 */
private TimeUnit rebuildIntervalTimeUnit;
/**
 * 定时任务 ExecutorService
 */
private ScheduledExecutorService scheduledExecutorService;
/**
 * 最后扫描到的 AppNamespace 的编号
 */
private long maxIdScanned;

/**
 * 公用类型的 AppNamespace 的缓存
 *
 * //store namespaceName -> AppNamespace
 */
private CaseInsensitiveMapWrapper<AppNamespace> publicAppNamespaceCache;
/**
 * App 下的 AppNamespace 的缓存
 *
 * store appId+namespaceName -> AppNamespace
 */
private CaseInsensitiveMapWrapper<AppNamespace> appNamespaceCache;
/**
 * AppNamespace 的缓存
 *
 * //store id -> AppNamespace
 */
private Map<Long, AppNamespace> appNamespaceIdCache;

public AppNamespaceServiceWithCache() {
    initialize();
}

private void initialize() {
    maxIdScanned = 0;
    // 创建缓存对象
    publicAppNamespaceCache = new CaseInsensitiveMapWrapper<>(Maps.newConcurrentMap());
    appNamespaceCache = new CaseInsensitiveMapWrapper<>(Maps.newConcurrentMap());
    appNamespaceIdCache = Maps.newConcurrentMap();
    // 创建 ScheduledExecutorService 对象,大小为 1 。
    scheduledExecutorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("AppNamespaceServiceWithCache", true));
}

appNamespaceCache 的 KEY ,通过 #assembleAppNamespaceKey(AppNamespace) 方法,拼接 appId + name 。代码如下:

private String assembleAppNamespaceKey(AppNamespace appNamespace) {
    return STRING_JOINER.join(appNamespace.getAppId(), appNamespace.getName());
}

3.5.2  初始化定时任务

#afterPropertiesSet() 方法,通过 Spring 调用,初始化定时任务。代码如下:

 1 @Override
 2 public void afterPropertiesSet() throws Exception {
 3     // 从 ServerConfig 中,读取定时任务的周期配置
 4     populateDataBaseInterval();
 5     // 全量初始化 AppNamespace 缓存
 6     scanNewAppNamespaces(); // block the startup process until load finished
 7     // 创建定时任务,全量重构 AppNamespace 缓存
 8     scheduledExecutorService.scheduleAtFixedRate(() -> {
 9         // 【TODO 6001】Tracer 日志
10         Transaction transaction = Tracer.newTransaction("Apollo.AppNamespaceServiceWithCache", "rebuildCache");
11         try {
12             // 全量重建 AppNamespace 缓存
13             this.updateAndDeleteCache();
14             // 【TODO 6001】Tracer 日志
15             transaction.setStatus(Transaction.SUCCESS);
16         } catch (Throwable ex) {
17             // 【TODO 6001】Tracer 日志
18             transaction.setStatus(ex);
19             logger.error("Rebuild cache failed", ex);
20         } finally {
21             // 【TODO 6001】Tracer 日志
22             transaction.complete();
23         }
24     }, rebuildInterval, rebuildInterval, rebuildIntervalTimeUnit);
25     // 创建定时任务,增量初始化 AppNamespace 缓存
26     scheduledExecutorService.scheduleWithFixedDelay(this::scanNewAppNamespaces, scanInterval, scanInterval, scanIntervalTimeUnit);
27 }
  • 第 4 行:调用 #populateDataBaseInterval() 方法,从 ServerConfig 中,读取定时任务的周期配置。代码如下:

    private void populateDataBaseInterval() {
        scanInterval = bizConfig.appNamespaceCacheScanInterval(); // "apollo.app-namespace-cache-scan.interval"
        scanIntervalTimeUnit = bizConfig.appNamespaceCacheScanIntervalTimeUnit(); // 默认秒,不可配置
        rebuildInterval = bizConfig.appNamespaceCacheRebuildInterval(); // "apollo.app-namespace-cache-rebuild.interval"
        rebuildIntervalTimeUnit = bizConfig.appNamespaceCacheRebuildIntervalTimeUnit(); // 默认秒,不可配置
    }
  • 第 6 行:调用 #scanNewAppNamespaces() 方法,全量初始化 AppNamespace 缓存。

  • 第 7 至 24 行:创建定时任务,全量重建 AppNamespace 缓存。

    • 第 13 行:调用 #updateAndDeleteCache() 方法,更新和删除 AppNamespace 缓存。 
  • 第 26 行:创建定时任务,增量初始化 AppNamespace 缓存。其内部,调用的也是 #scanNewAppNamespaces() 方法。

3.5.3  scanNewAppNamespaces

private void scanNewAppNamespaces() {
    // 【TODO 6001】Tracer 日志
    Transaction transaction = Tracer.newTransaction("Apollo.AppNamespaceServiceWithCache", "scanNewAppNamespaces");
    try {
        // 加载新的 AppNamespace 们
        this.loadNewAppNamespaces();
        // 【TODO 6001】Tracer 日志
        transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
        // 【TODO 6001】Tracer 日志
        transaction.setStatus(ex);
        logger.error("Load new app namespaces failed", ex);
    } finally {
        // 【TODO 6001】Tracer 日志
        transaction.complete();
    }
}

调用 #loadNewAppNamespaces() 方法,加载新的 AppNamespace 们。代码如下:

private void loadNewAppNamespaces() {
    boolean hasMore = true;
    while (hasMore && !Thread.currentThread().isInterrupted()) { // 循环,直到无新的 AppNamespace
        // current batch is 500
        // 获得大于 maxIdScanned 的 500 条 AppNamespace 记录,按照 id 升序
        List<AppNamespace> appNamespaces = appNamespaceRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
        if (CollectionUtils.isEmpty(appNamespaces)) {
            break;
        }
        // 合并到 AppNamespace 缓存中
        mergeAppNamespaces(appNamespaces);
        // 获得新的 maxIdScanned ,取最后一条记录
        int scanned = appNamespaces.size();
        maxIdScanned = appNamespaces.get(scanned - 1).getId();
        // 若拉取不足 500 条,说明无新消息了
        hasMore = scanned == 500;
        logger.info("Loaded {} new app namespaces with startId {}", scanned, maxIdScanned);
    }
}
  • 调用 #mergeAppNamespaces(appNamespaces) 方法,合并到 AppNamespace 缓存中。代码如下:

    private void mergeAppNamespaces(List<AppNamespace> appNamespaces) {
        for (AppNamespace appNamespace : appNamespaces) {
            // 添加到 `appNamespaceCache` 中
            appNamespaceCache.put(assembleAppNamespaceKey(appNamespace), appNamespace);
            // 添加到 `appNamespaceIdCache`
            appNamespaceIdCache.put(appNamespace.getId(), appNamespace);
            // 若是公用类型,则添加到 `publicAppNamespaceCache` 中
            if (appNamespace.isPublic()) {
                publicAppNamespaceCache.put(appNamespace.getName(), appNamespace);
            }
        }
    }

3.5.4  updateAndDeleteCache

private void updateAndDeleteCache() {
    // 从缓存中,获得所有的 AppNamespace 编号集合
    List<Long> ids = Lists.newArrayList(appNamespaceIdCache.keySet());
    if (CollectionUtils.isEmpty(ids)) {
        return;
    }
    // 每 500 一批,从数据库中查询最新的 AppNamespace 信息
    List<List<Long>> partitionIds = Lists.partition(ids, 500);
    for (List<Long> toRebuild : partitionIds) {
        Iterable<AppNamespace> appNamespaces = appNamespaceRepository.findAll(toRebuild);
        if (appNamespaces == null) {
            continue;
        }
        // 处理更新的情况
        // handle updated
        Set<Long> foundIds = handleUpdatedAppNamespaces(appNamespaces);
        // 处理删除的情况
        // handle deleted
        handleDeletedAppNamespaces(Sets.difference(Sets.newHashSet(toRebuild), foundIds));
    }
}
  • #handleUpdatedAppNamespaces(appNamespaces) 方法,处理更新的情况。代码如下:

    private Set<Long> handleUpdatedAppNamespaces(Iterable<AppNamespace> appNamespaces) {
        Set<Long> foundIds = Sets.newHashSet();
        for (AppNamespace appNamespace : appNamespaces) {
            foundIds.add(appNamespace.getId());
            // 获得缓存中的 AppNamespace 对象
            AppNamespace thatInCache = appNamespaceIdCache.get(appNamespace.getId());
            // 从 DB 中查询到的 AppNamespace 的更新时间更大,才认为是更新
            if (thatInCache != null && appNamespace.getDataChangeLastModifiedTime().after(thatInCache.getDataChangeLastModifiedTime())) {
                // 添加到 appNamespaceIdCache 中
                appNamespaceIdCache.put(appNamespace.getId(), appNamespace);
                // 添加到 appNamespaceCache 中
                String oldKey = assembleAppNamespaceKey(thatInCache);
                String newKey = assembleAppNamespaceKey(appNamespace);
                appNamespaceCache.put(newKey, appNamespace);
                // 当 appId 或 namespaceName 发生改变的情况,将老的移除出 appNamespaceCache
                // in case appId or namespaceName changes
                if (!newKey.equals(oldKey)) {
                    appNamespaceCache.remove(oldKey);
                }
                // 添加到 publicAppNamespaceCache 中
                if (appNamespace.isPublic()) { // 新的是公用类型
                    // 添加到 publicAppNamespaceCache 中
                    publicAppNamespaceCache.put(appNamespace.getName(), appNamespace);
                    // 当 namespaceName 发生改变的情况,将老的移除出 publicAppNamespaceCache
                    // in case namespaceName changes
                    if (!appNamespace.getName().equals(thatInCache.getName()) && thatInCache.isPublic()) {
                        publicAppNamespaceCache.remove(thatInCache.getName());
                    }
                } else if (thatInCache.isPublic()) { // 新的不是公用类型,需要移除
                    //just in case isPublic changes
                    publicAppNamespaceCache.remove(thatInCache.getName());
                }
                logger.info("Found AppNamespace changes, old: {}, new: {}", thatInCache, appNamespace);
            }
        }
        return foundIds;
    }
    • ? 这里相对复杂一些。
  • #handleDeletedAppNamespaces(Set<Long> deletedIds) 方法,处理删除的情况。代码如下:

    private void handleDeletedAppNamespaces(Set<Long> deletedIds) {
        if (CollectionUtils.isEmpty(deletedIds)) {
            return;
        }
        for (Long deletedId : deletedIds) {
            // 从 appNamespaceIdCache 中移除
            AppNamespace deleted = appNamespaceIdCache.remove(deletedId);
            if (deleted == null) {
                continue;
            }
            // 从 appNamespaceCache 中移除
            appNamespaceCache.remove(assembleAppNamespaceKey(deleted));
            // 从 publicAppNamespaceCache 移除
            if (deleted.isPublic()) {
                publicAppNamespaceCache.remove(deleted.getName());
            }
            logger.info("Found AppNamespace deleted, {}", deleted);
        }
    }

3.5.5  findByAppIdAndNamespace

/**
 * 获得 AppNamespace 对象
 *
 * @param appId App 编号
 * @param namespaceName Namespace 名字
 * @return AppNamespace
 */
public AppNamespace findByAppIdAndNamespace(String appId, String namespaceName) {
    Preconditions.checkArgument(!StringUtils.isContainEmpty(appId, namespaceName), "appId and namespaceName must not be empty");
    return appNamespaceCache.get(STRING_JOINER.join(appId, namespaceName));
}

3.5.6  findByAppIdAndNamespaces

/**
 * 获得 AppNamespace 对象数组
 *
 * @param appId App 编号
 * @param namespaceNames Namespace 名字的集合
 * @return AppNamespace 数组
 */
public List<AppNamespace> findByAppIdAndNamespaces(String appId, Set<String> namespaceNames) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(appId), "appId must not be null");
    if (namespaceNames == null || namespaceNames.isEmpty()) {
        return Collections.emptyList();
    }
    List<AppNamespace> result = Lists.newArrayList();
    // 循环获取
    for (String namespaceName : namespaceNames) {
        AppNamespace appNamespace = appNamespaceCache.get(STRING_JOINER.join(appId, namespaceName));
        if (appNamespace != null) {
            result.add(appNamespace);
        }
    }
    return result;
}

3.5.7  findPublicNamespaceByName

/**
 * 获得公用类型的 AppNamespace 对象
 *
 * @param namespaceName Namespace 名字
 * @return AppNamespace
 */
public AppNamespace findPublicNamespaceByName(String namespaceName) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(namespaceName), "namespaceName must not be empty");
    return publicAppNamespaceCache.get(namespaceName);
}

3.5.8  findPublicNamespacesByNames

/**
 * 获得公用类型的 AppNamespace 对象数组
 *
 * @param namespaceNames Namespace 名字的集合
 * @return AppNamespace 数组
 */
public List<AppNamespace> findPublicNamespacesByNames(Set<String> namespaceNames) {
    if (namespaceNames == null || namespaceNames.isEmpty()) {
        return Collections.emptyList();
    }

    List<AppNamespace> result = Lists.newArrayList();
    // 循环获取
    for (String namespaceName : namespaceNames) {
        AppNamespace appNamespace = publicAppNamespaceCache.get(namespaceName);
        if (appNamespace != null) {
            result.add(appNamespace);
        }
    }
    return result;
}

3.6  ReleaseMessageServiceWithCache

com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache ,实现 InitializingBean 和 ReleaseMessageListener 接口,缓存 ReleaseMessage 的 Service 实现类。通过将 ReleaseMessage 缓存在内存中,提高查询性能。缓存实现方式如下:

  1. 启动时,初始化 ReleaseMessage 到缓存。
  2. 新增时,基于 ReleaseMessageListener ,通知有新的 ReleaseMessage ,根据是否有消息间隙,直接使用该 ReleaseMessage 或从数据库读取。

3.6.1  构造方法

@Autowired
private ReleaseMessageRepository releaseMessageRepository;
@Autowired
private BizConfig bizConfig;

/**
 * 扫描周期
 */
private int scanInterval;
/**
 * 扫描周期单位
 */
private TimeUnit scanIntervalTimeUnit;
/**
 * 最后扫描到的 ReleaseMessage 的编号
 */
private volatile long maxIdScanned;
/**
 * ReleaseMessage 缓存
 *
 * KEY:`ReleaseMessage.message`
 * VALUE:对应的最新的 ReleaseMessage 记录
 */
private ConcurrentMap<String, ReleaseMessage> releaseMessageCache;
/**
 * 是否执行扫描任务
 */
private AtomicBoolean doScan;
/**
 * ExecutorService 对象
 */
private ExecutorService executorService;

public ReleaseMessageServiceWithCache() {
    initialize();
}

private void initialize() {
    // 创建缓存对象
    releaseMessageCache = Maps.newConcurrentMap();
    // 设置 doScan 为 true
    doScan = new AtomicBoolean(true);
    // 创建 ScheduledExecutorService 对象,大小为 1 。
    executorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("ReleaseMessageServiceWithCache", true));
}

3.6.2  初始化定时任务

#afterPropertiesSet() 方法,通知 Spring 调用,初始化定时任务。代码如下:

 1 @Override
 2 public void afterPropertiesSet() throws Exception {
 3     // 从 ServerConfig 中,读取任务的周期配置
 4     populateDataBaseInterval();
 5     // 初始拉取 ReleaseMessage 到缓存
 6     //block the startup process until load finished
 7     //this should happen before ReleaseMessageScanner due to autowire
 8     loadReleaseMessages(0);
 9     // 创建定时任务,增量拉取 ReleaseMessage 到缓存,用以处理初始化期间,产生的 ReleaseMessage 遗漏的问题。
10     executorService.submit(() -> {
11         while (doScan.get() && !Thread.currentThread().isInterrupted()) {
12             // 【TODO 6001】Tracer 日志
13             Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageServiceWithCache", "scanNewReleaseMessages");
14             try {
15                 // 增量拉取 ReleaseMessage 到缓存
16                 loadReleaseMessages(maxIdScanned);
17                 // 【TODO 6001】Tracer 日志
18                 transaction.setStatus(Transaction.SUCCESS);
19             } catch (Throwable ex) {
20                 // 【TODO 6001】Tracer 日志
21                 transaction.setStatus(ex);
22                 logger.error("Scan new release messages failed", ex);
23             } finally {
24                 transaction.complete();
25             }
26             try {
27                 scanIntervalTimeUnit.sleep(scanInterval);
28             } catch (InterruptedException e) {
29                 //ignore
30             }
31         }
32     });
33 }
    • 第 4 行:调用 #populateDataBaseInterval() 方法,从 ServerConfig 中,读取定时任务的周期配置。代码如下:

      private void populateDataBaseInterval() {
          scanInterval = bizConfig.releaseMessageCacheScanInterval(); // "apollo.release-message-cache-scan.interval" ,默认为 1 。
          scanIntervalTimeUnit = bizConfig.releaseMessageCacheScanIntervalTimeUnit(); // 默认秒,不可配置。
      }
    • 第 8 行:调用 #loadReleaseMessages(startId) 方法,初始拉取 ReleaseMessage 到缓存。

    • 第 10 至 32 行:创建创建定时任务,增量拉取 ReleaseMessage 到缓存,用以处理初始化期间,产生的 ReleaseMessage 遗漏的问题。为什么会遗漏呢?笔者又去请教作者,? 给 666 个赞。

      1. 20:00:00 程序启动过程中,当前 release message 有 5 条
      2. 20:00:01 loadReleaseMessages(0); 执行完成,获取到 5 条记录
      3. 20:00:02 有一条 release message 新产生,但是因为程序还没启动完,所以不会触发 handle message 操作
      4. 20:00:05 程序启动完成,但是第三步的这条新的 release message 漏了
      5. 20:10:00 假设这时又有一条 release message 产生,这次会触发 handle message ,同时会把第三步的那条 release message 加载到

      所以,定期刷的机制就是为了解决第三步中产生的release message问题。
      当程序启动完,handleMessage生效后,就不需要再定期扫了

      • ReleaseMessageServiceWithCache 初始化在 ReleaseMessageScanner 之前,因此在第 3 步时,ReleaseMessageServiceWithCache 初始化完成之后,ReleaseMessageScanner 初始化之前,产生了一条心的 ReleaseMessage ,会导致 ReleaseMessageScanner.maxIdScanned 大于 ReleaseMessageServiceWithCache.maxIdScanned ,从而导致 ReleaseMessage 的遗漏。

3.6.3  loadReleaseMessages

#loadReleaseMessages(startId) 方法,增量拉取新的 ReleaseMessage 们。代码如下:

private void loadReleaseMessages(long startId) {
    boolean hasMore = true;
    while (hasMore && !Thread.currentThread().isInterrupted()) {
        // current batch is 500
        // 获得大于 maxIdScanned 的 500 条 ReleaseMessage 记录,按照 id 升序
        List<ReleaseMessage> releaseMessages = releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(startId);
        if (CollectionUtils.isEmpty(releaseMessages)) {
            break;
        }
        // 合并到 ReleaseMessage 缓存
        releaseMessages.forEach(this::mergeReleaseMessage);
        // 获得新的 maxIdScanned ,取最后一条记录
        int scanned = releaseMessages.size();
        startId = releaseMessages.get(scanned - 1).getId();
        // 若拉取不足 500 条,说明无新消息了
        hasMore = scanned == 500;
        logger.info("Loaded {} release messages with startId {}", scanned, startId);
    }
}

调用 #mergeAppNamespaces(appNamespaces) 方法,合并到 ReleaseMessage 缓存中。代码如下:

private synchronized void mergeReleaseMessage(ReleaseMessage releaseMessage) {
    // 获得对应的 ReleaseMessage 对象
    ReleaseMessage old = releaseMessageCache.get(releaseMessage.getMessage());
    // 若编号更大,进行更新缓存
    if (old == null || releaseMessage.getId() > old.getId()) {
        releaseMessageCache.put(releaseMessage.getMessage(), releaseMessage);
        maxIdScanned = releaseMessage.getId();
    }
}

3.6.4  handleMessage

 1 @Override
 2 public void handleMessage(ReleaseMessage message, String channel) {
 3     // Could stop once the ReleaseMessageScanner starts to work
 4     // 关闭增量拉取定时任务的执行
 5     doScan.set(false);
 6     logger.info("message received - channel: {}, message: {}", channel, message);
 7 
 8     // 仅处理 APOLLO_RELEASE_TOPIC
 9     String content = message.getMessage();
10     Tracer.logEvent("Apollo.ReleaseMessageService.UpdateCache", String.valueOf(message.getId()));
11     if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
12         return;
13     }
14 
15     // 计算 gap
16     long gap = message.getId() - maxIdScanned;
17     // 若无空缺 gap ,直接合并
18     if (gap == 1) {
19         mergeReleaseMessage(message);
20     // 如有空缺 gap ,增量拉取
21     } else if (gap > 1) {
22         // gap found!
23         loadReleaseMessages(maxIdScanned);
24     }
25 }
  • 第 5 行:关闭增量拉取定时任务的执行。后续通过 ReleaseMessageScanner 通知即可。
  • 第 9 至 13 行:仅处理 APOLLO_RELEASE_TOPIC 。
  • 第 16 行:计算 gap 。
  • 第 18 至 20 行:若无空缺,调用 #mergeReleaseMessage(message) 方法,直接合并即可。
  • 第 21 至 24 行:若有空缺,调用 #loadReleaseMessages(maxIdScanned) 方法,增量拉取。例如,上述的第 3 步,定时任务还来不及拉取( 即未执行 ),ReleaseMessageScanner 就已经通知,此处会产生空缺的

3.6.5  findLatestReleaseMessagesGroupByMessages

/**
 * 获得每条消息内容对应的最新的 ReleaseMessage 对象
 *
 * @param messages 消息内容的集合
 * @return 集合
 */
public List<ReleaseMessage> findLatestReleaseMessagesGroupByMessages(Set<String> messages) {
    if (CollectionUtils.isEmpty(messages)) {
        return Collections.emptyList();
    }
    List<ReleaseMessage> releaseMessages = Lists.newArrayList();
    // 获得每条消息内容对应的最新的 ReleaseMessage 对象
    for (String message : messages) {
        ReleaseMessage releaseMessage = releaseMessageCache.get(message);
        if (releaseMessage != null) {
            releaseMessages.add(releaseMessage);
        }
    }
    return releaseMessages;
}

3.7  WatchKeysUtil

com.ctrip.framework.apollo.configservice.util.WatchKeysUtil ,Watch Key 工具类。

核心的方法为 #assembleAllWatchKeys(appId, clusterName, namespaces, dataCenter) 方法,组装 Watch Key Multimap 。其中 KEY 为 Namespace 的名字,VALUE 为 Watch Key 集合。代码如下:

 1 /**
 2  * 组装所有的 Watch Key Multimap 。其中 Key 为 Namespace 的名字,Value 为 Watch Key 集合。
 3  *
 4  * @param appId App 编号
 5  * @param clusterName Cluster 名
 6  * @param namespaces Namespace 的名字的数组
 7  * @param dataCenter IDC 的 Cluster 名
 8  * @return Watch Key Multimap
 9  */
10 public Multimap<String, String> assembleAllWatchKeys(String appId, String clusterName,
11                                                      Set<String> namespaces,
12                                                      String dataCenter) {
13     // 组装 Watch Key Multimap
14     Multimap<String, String> watchedKeysMap = assembleWatchKeys(appId, clusterName, namespaces, dataCenter);
15 
16     // 如果不是仅监听 'application' Namespace ,处理其关联来的 Namespace 。
17     // Every app has an 'application' namespace
18     if (!(namespaces.size() == 1 && namespaces.contains(ConfigConsts.NAMESPACE_APPLICATION))) {
19         // 获得属于该 App 的 Namespace 的名字的集合
20         Set<String> namespacesBelongToAppId = namespacesBelongToAppId(appId, namespaces);
21         // 获得关联来的 Namespace 的名字的集合
22         Set<String> publicNamespaces = Sets.difference(namespaces, namespacesBelongToAppId);
23         // 添加到 Watch Key Multimap 中
24         // Listen on more namespaces if it's a public namespace
25         if (!publicNamespaces.isEmpty()) {
26             watchedKeysMap.putAll(findPublicConfigWatchKeys(appId, clusterName, publicNamespaces, dataCenter));
27         }
28     }
29 
30     return watchedKeysMap;
31 }
  • 第 14 行:调用 #assembleWatchKeys(appId, clusterName, namespaces, dataCenter) 方法,组装 App 下的 Watch Key Multimap 。
  • 第 18 至 28 行:判断 namespaces 中,可能存在关联类型的 Namespace ,因此需要进一步处理。在这里的判断会比较“绕”,如果 namespaces 仅仅是 "application" 时,那么肯定不存在关联类型的 Namespace 。
    • 第 20 行:调用 #namespacesBelongToAppId(appId, namespaces) 方法,获得属于该 App 的 Namespace 的名字的集合。
    • 第 22 行:通过 Sets#difference(...) 方法,进行集合差异计算,获得关联类型的 Namespace 的名字的集合。
    • 第 25 至 27 行:调用 #findPublicConfigWatchKeys(...) 方法,获得关联类型的 Namespace 的名字的集合的 Watch Key Multimap ,并添加到结果集中。
  • 第 30 行:返回结果集。

3.7.1  assembleWatchKeys

/**
 * 组装 Watch Key Multimap
 *
 * @param appId App 编号
 * @param clusterName Cluster 名
 * @param namespaces Namespace 的名字的集合
 * @param dataCenter IDC 的 Cluster 名字
 * @return Watch Key Multimap
 */
private Multimap<String, String> assembleWatchKeys(String appId, String clusterName, Set<String> namespaces, String dataCenter) {
    Multimap<String, String> watchedKeysMap = HashMultimap.create();
    // 循环 Namespace 的名字的集合
    for (String namespace : namespaces) {
        watchedKeysMap.putAll(namespace, assembleWatchKeys(appId, clusterName, namespace, dataCenter));
    }
    return watchedKeysMap;
}
  • 循环 Namespace 的名字的集合,调用 #assembleWatchKeys(appId, clusterName, namespace, dataCenter) 方法,组装指定 Namespace 的 Watch Key 数组。代码如下:

     1 private Set<String> assembleWatchKeys(String appId, String clusterName, String namespace, String dataCenter) {
     2     if (ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
     3         return Collections.emptySet();
     4     }
     5     Set<String> watchedKeys = Sets.newHashSet();
     6 
     7     // 指定 Cluster
     8     // watch specified cluster config change
     9     if (!Objects.equals(ConfigConsts.CLUSTER_NAME_DEFAULT, clusterName)) {
    10         watchedKeys.add(assembleKey(appId, clusterName, namespace));
    11     }
    12 
    13     // 所属 IDC 的 Cluster
    14     // https://github.com/ctripcorp/apollo/issues/952
    15     // watch data center config change
    16     if (!Strings.isNullOrEmpty(dataCenter) && !Objects.equals(dataCenter, clusterName)) {
    17         watchedKeys.add(assembleKey(appId, dataCenter, namespace));
    18     }
    19 
    20     // 默认 Cluster
    21     // watch default cluster config change
    22     watchedKeys.add(assembleKey(appId, ConfigConsts.CLUSTER_NAME_DEFAULT, namespace));
    23 
    24     return watchedKeys;
    25 }
    • 指定 Cluster 的 Namespace 的 Watch Key 。
    • 所属 IDC 的 Cluster 的 Namespace 的 Watch Key 。关于
    • 默认( "default" ) 的 Cluster 的 Namespace 的 Watch Key 。
    • #assembleKey(appId, clusterName, namespace) 方法,获得 Watch Key。

3.7.2  namespacesBelongToAppId

/**
 * 获得属于该 App 的 Namespace 的名字的集合
 *
 * @param appId App 编号
 * @param namespaces Namespace 名
 * @return 集合
 */
private Set<String> namespacesBelongToAppId(String appId, Set<String> namespaces) {
    if (ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
        return Collections.emptySet();
    }
    // 获得属于该 App 的 AppNamespace 集合
    List<AppNamespace> appNamespaces = appNamespaceService.findByAppIdAndNamespaces(appId, namespaces);
    if (appNamespaces == null || appNamespaces.isEmpty()) {
        return Collections.emptySet();
    }
    // 返回 AppNamespace 的名字的集合
    return appNamespaces.stream().map(AppNamespace::getName).collect(Collectors.toSet());
}

3.7.3  findPublicConfigWatchKeys

@Autowired
private AppNamespaceServiceWithCache appNamespaceService;

/**
 * 获得 Namespace 类型为 public 对应的 Watch Key Multimap
 *
 * 重要:要求非当前 App 的 Namespace
 *
 * @param applicationId App 编号
 * @param clusterName Cluster 名
 * @param namespaces Namespace 的名字的集合
 * @param dataCenter  IDC 的 Cluster 名
 * @return Watch Key Map
 */
private Multimap<String, String> findPublicConfigWatchKeys(String applicationId, String clusterName, Set<String> namespaces, String dataCenter) {
    Multimap<String, String> watchedKeysMap = HashMultimap.create();
    // 获得 Namespace 为 public 的 AppNamespace 数组
    List<AppNamespace> appNamespaces = appNamespaceService.findPublicNamespacesByNames(namespaces);
    // 组装 Watch Key Map
    for (AppNamespace appNamespace : appNamespaces) {
        // 排除非关联类型的 Namespace
        // check whether the namespace's appId equals to current one
        if (Objects.equals(applicationId, appNamespace.getAppId())) {
            continue;
        }
        String publicConfigAppId = appNamespace.getAppId();
        // 组装指定 Namespace 的 Watch Key 数组
        watchedKeysMap.putAll(appNamespace.getName(), assembleWatchKeys(publicConfigAppId, clusterName, appNamespace.getName(), dataCenter));
    }
    return watchedKeysMap;
}

3.7.4  assembleKey

private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);

/**
 * 拼接 Watch Key
 *
 * @param appId App 编号
 * @param cluster Cluster 名
 * @param namespace Namespace 名
 * @return Watch Key
 */
private String assembleKey(String appId, String cluster, String namespace) {
    return STRING_JOINER.join(appId, cluster, namespace);
}

3.8  EntityManagerUtil

com.ctrip.framework.apollo.biz.utils.EntityManagerUtil ,实现 org.springframework.orm.jpa.EntityManagerFactoryAccessor 抽象类,EntityManager 抽象类。代码如下:

@Component
public class EntityManagerUtil extends EntityManagerFactoryAccessor {

    private static final Logger logger = LoggerFactory.getLogger(EntityManagerUtil.class);

    /**
     * close the entity manager.
     * Use it with caution! This is only intended for use with async request, which Spring won't
     * close the entity manager until the async request is finished.
     */
    public void closeEntityManager() {
        // 获得 EntityManagerHolder 对象
        EntityManagerHolder emHolder = (EntityManagerHolder) TransactionSynchronizationManager.getResource(getEntityManagerFactory());
        if (emHolder == null) {
            return;
        }
        logger.debug("Closing JPA EntityManager in EntityManagerUtil");
        // 关闭 EntityManager
        EntityManagerFactoryUtils.closeEntityManager(emHolder.getEntityManager());
    }

}

4  小结

本节我们主要看了下针对客户端向 Config Service 的轮询方式中长轮询的处理哈,东西有点多,有理解不对的地方欢迎指正哈。