/**
* 初始化加载快照数据,此处注意加载最新的快照数据和优先加载快照
* @throws MetaClientException
*/
public void init() throws MetaClientException{
//先加载快照数据
initSnapshotData();
//从消息获取更新数据
MetaClientConfig metaClientConfig=new MetaClientConfig();
ZKConfig zkConfig = new ZKConfig();
zkConfig.zkConnect=zkConnect;
metaClientConfig.setZkConfig(zkConfig);
final BroadcastMessageSessionFactory sessionFactory = new MetaBroadcastMessageSessionFactory(metaClientConfig);
consumer = sessionFactory.createBroadcastConsumer(new ConsumerConfig(group));
/* 本地测试时使用
final MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
ConsumerConfig consumerConfig=new ConsumerConfig(group);
consumer = sessionFactory.createConsumer(consumerConfig); */
//启动消费
consumer();
//设置关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
consumer.shutdown();//关闭消息消费
saveSnapshot();
businessLog.info("consumer shutdown Hook new snapshot");
} catch (MetaClientException e) {
businessLog.error("consumer shutdown error",e);
}
}
});
businessLog.info("mahoutDataModelManager init ok");
}
网友评论