类似zk同步,都会使用配置进行维护信息
因为有
@ConditionalOnProperty(prefix = "soul.sync.http", name = "url")
所以配置要加上
http:
url : http://localhost:9095
才能启动,很尴尬的是没启动起来。
报错:
Caused by: org.springframework.web.client.HttpClientErrorException$Unauthorized: 401 : [{"code":600,"message":"token is error"}
这同步为啥还要token?
admin中需要加
soul :
sync:
http:
enabled: true
启动成功
@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.http", name = "url")
@Slf4j
public class HttpSyncDataConfiguration {
/**
* Http sync data service.
*
* @param httpConfig the http config
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @return the sync data service
*/
@Bean
// http长轮询服务初始化
public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use http long pull sync soul data");
return new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
/**
* Http config http config.
*
* @return the http config
*/
@Bean
@ConfigurationProperties(prefix = "soul.sync.http")
// http长轮询配置
public HttpConfig httpConfig() {
return new HttpConfig();
}
}
HttpSyncDataService中第一次启动会进行初始化以及服务启动
private void start() {
// It could be initialized multiple times, so you need to control that.
// 为什么会初始化很多次?
if (RUNNING.compareAndSet(false, true)) {
// fetch all group configs.
this.fetchGroupConfig(ConfigGroupEnum.values());
int threadSize = serverList.size();
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
SoulThreadFactory.create("http-long-polling", true));
// start long polling, each server creates a thread to listen for changes.
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
log.info("soul http long polling was started, executor=[{}]", executor);
}
}
线程池以守护线程的方式运行,一直保活,在服务进行初始化的情况下会一直进行轮询,
网友评论