-
这几部分以业务为主,主要关心数据流向和长轮询的实现,没贴太多代码
一、数据库表:
- portal DB表:
- App、AppNamespace
- 包括普通用户、三方用户、权限、操作记录、收藏等
- 基础配置,包括生效环境、meta地址、部门、接口超时时间等portal的基础配置
- 总体来说portalDB主要围绕用户和界面展示相关的
- protal DB里的App、AppNamespace相关数据都是未生效的,数据放入portal DB后,用spring的发布监听机制,异步把数据同步到各个env的admin DB
- config db表:
- App、Cluster
- AppNamespace:属于App,1对多,代表用户创建过的配置,不可删除,只用于记录,比如用户创建过叫application和db的AppNamespace,继续往下对比和Namespace的区别
- Namespace:属于Cluster,1对多,代表正在使用的配置文件,可以删除,在创建新Cluster时下面默认会创建出application、db,创建的依据是上面的AppNamespace表的记录,可以在后台手动试试,但用户想删除某个app cluster下的配置,删的就是Namespace表了
- commit:提交历史,在执行所有修改操作时增加一条记录,记录修改item操作的增量数据(old and new)
- item:所有具体配置项,未发布过的也都记录在这
- release message:下面写
- server config:一些服务相关配置,和portal类似
二、一些细节:
- cluster不创建就会用default表示,界面上不会显示,但并不是没有
- 每创建一个app默认会创建一个叫application的namespace,对应spring项目的application.properties
- openApi可以通过在portal界面注册三方用户,用HTTP + Token的方式直接调用portal相关接口,不用登陆portal手动点击配置发布等等操作
- namespace公有、私有、关联
- 私有namespace:app私有的namespace,只能本服务获取到
- 公有namespace:其他app可以关联app的公有namespace,app私有namespace如果有相同字段,以私有为准
- 关联namespace:app关联其他公有namespace后会,会在本项目页面显示关联的共有namespace叫“关联namespace”
- 关联公有namespace实际是创建了一条新的namespace给相应app
- apollo通过appId + clusterName + namespaceName定位一个具体namespace,类似于maven的groupId + artifactId,源码里也叫watchKey
三、修改、发布:
- 配置修改时,item表记录配置项,commit表增加一条修改记录,到这里修改部分就结束了,主要看发布
- 配置发布概述,portal向admin service请求是同步,admin service到config service是异步,为了不引入额外组件,admin service写入数据库,config service轮询数据库推送消息的方式代替了消息队列,通过最新数据的ID作为版本号确认是否有配置更新,就是releaseMessage表
- 发布时item和commit表没变化,release、releaseHistory和releaseMessage会增加一条记录
- release表:记录namespace的定位、namespace配置发布的记录
- releaseHistory表:记录操作日志,上次releaseId、本次releaseId、定位、操作内容(发布、回滚、灰度相关操作)
- releaseMessage表:模拟MQ推送,根据namespace定位、ID拉取数据
- 发送消息时,releaseMessage的ID会存到一个BlockingQueue里,config service会每5s取一次BlockingQueue的数据取完为止,因为只记录定位,同一namespace多次发布releaseMessage表里会有重复数据,需要定时清理,只保留最新一条数据
- config service每秒扫描是否有更新,更新时取出遍历所有listener推送Client,这个监听器里触发更新后的一些处理,包括清缓存、通知Client等等,最后更新缓存最大ID记录
@Override
public void afterPropertiesSet() throws Exception {
databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
maxIdScanned = loadLargestMessageId();
executorService.scheduleWithFixedDelay(() -> {
Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
try {
scanMissingMessages();
scanMessages();
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
logger.error("Scan and send message failed", ex);
} finally {
transaction.complete();
}
}, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
}
private boolean scanAndSendMessages() {
// 一次扫500,多了下次再扫
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
// 遍历所有注册上来的监听器,通知client
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
// check id gaps, possible reasons are release message not committed yet or already rolled back
if (newMaxIdScanned - maxIdScanned > messageScanned) {
recordMissingReleaseMessageIds(releaseMessages, maxIdScanned);
}
maxIdScanned = newMaxIdScanned;
return messageScanned == 500;
}
/**
* 遍历所有listener推送message
* @param messages
*/
private void fireMessageScanned(Iterable<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (ReleaseMessageListener listener : listeners) {
try {
listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
}
}
}
}
@Bean
public ReleaseMessageScanner releaseMessageScanner() {
ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
//0\. handle release message cache
releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
//1\. handle gray release rule
releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
//2\. handle server cache
releaseMessageScanner.addMessageListener(configService);
releaseMessageScanner.addMessageListener(configFileController);
//3\. notify clients
releaseMessageScanner.addMessageListener(notificationControllerV2);
releaseMessageScanner.addMessageListener(notificationController);
return releaseMessageScanner;
}
- 关联Client章节关于RemoteConfigLongPollService的描述,RemoteConfigLongPollService长轮训/notifications/v2这个接口,触发更新时返回http200,超时返回http304,NotificationControllerV2这个对象触发DeferredResult响应的逻辑就包括在上面提到的监听回调中
- NotificationControllerV2里面两个主要方法:
- 接收Client长轮询的Controller (pollNotification)
- 上面提到的ReleaseMessageListener的回调方法handleMessage
- pollNotification:
- Client通过请求告知要监听的app、cluster、cluster下当前Client所有namespaces的release message表ID
- 和样例类似,区别只是源码接口一次请求监听app+cluster下的所有namespace,还有一些额外的处理逻辑,比如等待响应期间数据库连接断掉避免资源浪费,controller中还会检查release message ID是否已经更新,如果已更新就直接响应给Client
- Client请求超时90s,DeferredResult超时60s构成长轮询,Client接收到响应后会立即再次请求,代码见Client包RemoteConfigLongPollService
四、服务注册、发现:
- 回顾:Meta Server = config service + meta service + eureka在同一进程
- 启动eureka:
- 在config service启动类标注了@EnableEurekaServer注解启动eureka
- 注册到eureka:
- biz包的ApolloEurekaClientConfig类重写了获取eureka服务地址的方法,admin和config service都会注册到eureka
- 发现:
- Core包里写过MetaDomainConsts用SPI获取了用户配的Meta Service地址,Client从这里拿到地址,Portal从数据库、配置获取有兴趣可以看看
- Client和Portal分别有ConfigServiceLocator和AdminServiceAddressLocator会定时从Meta Service发现Config和Admin Service地址,虽然请求发起时也有一些负载策略但Meta Service最好做nginx负载,部署图也展示了
- meta service一共两个接口,分别获取config和admin服务地址,分别实现了nacos、k8s、consul、eureka(默认)的服务发现,eureka实现比较简单,直接从EurekaClient获取AppName、InstanceId、HomePageUrl返回
四、ServerConfig基础配置:
- portal db和config db都有ServerConfig表,和配置文件类似,在表里配置有通用性,不需要每个项目都配置一遍
- RefreshablePropertySource(抽象):继承MapPropertySource,提供抽象refresh(),能看出它是一个能刷新的kv数据源
- PortalDBPropertySource、BizDBPropertySource继承RefreshablePropertySource,refresh()时会从数据库抓取所有配置,并放入PropertySource,分别给portal和admin、config service使用
- RefreshableConfig(抽象):上面是两个代表数据源,这个是代表具体数据配置,对应也有两个实现,这里每60s会刷新数据源
@PostConstruct
public void setup() {
// 获取数据源,因为要区分portal和admin config,所以在实现类提供
propertySources = getRefreshablePropertySources();
if (CollectionUtils.isEmpty(propertySources)) {
throw new IllegalStateException("Property sources can not be empty.");
}
// 刷新数据源并置入environment
for (RefreshablePropertySource propertySource : propertySources) {
propertySource.refresh();
environment.getPropertySources().addLast(propertySource);
}
// 定时刷新
ScheduledExecutorService
executorService =
Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("ConfigRefresher", true));
executorService
.scheduleWithFixedDelay(() -> {
try {
propertySources.forEach(RefreshablePropertySource::refresh);
} catch (Throwable t) {
logger.error("Refresh configs failed.", t);
Tracer.logError("Refresh configs failed.", t);
}
}, CONFIG_REFRESH_INTERVAL, CONFIG_REFRESH_INTERVAL, TimeUnit.SECONDS);
}
- RefreshableConfig对应的两个实现比较简单,根据key获取各种配置
- portal service在页面上还提供了对ServerConfig表配置更新的操作,略
网友评论