1. 开启多个线程
- new 之后通过start
- 或者在重写方法之后(例如:afterPropertiesSet),进行start
int numThreads = 5;
if (numThreads < 1 || numThreads > 5) {
log.error("NUM_THREADS is out of range");
System.exit(-1);
}
ToDoInfConsumer[] toDoInfConsumers = new ToDoInfConsumer[numThreads];
for (int i = 0; i < toDoInfConsumers.length; i++) {
//这有多例,所以注解的时候需要将要生成的consumer设置成多例@Scope("prototype")
toDoInfConsumers[i] = applicationContext.getBean(ToDoInfConsumer.class);
}
for (ToDoInfConsumer toDoInfConsumer : toDoInfConsumers) {
toDoInfConsumer.start();
}
设置完属性之后,开启线程,要继承initialize bean
//注意继承initialize bean
public class BalanceStatusConsumer extends Thread implements InitializingBean{}
//在注入的时候,就会生自动运行run命令
public BalanceStatusConsumer() {
try {
Properties prop = new Properties();
String addrs = BalStsUtil.KAFKA_SERVERS;
String groupId = BalStsUtil.KAFKA_GROUPID;
prop.setProperty("bootstrap.servers", addrs);
prop.setProperty("group.id", groupId);
prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("enable.auto.commit", "false");
prop.setProperty("auto.offset.reset", "latest");
prop.put("session.timeout.ms", "50000");
prop.put("request.timeout.ms", "60000");
prop.put("max.poll.records", "50");
consumer = new KafkaConsumer<String, String>(prop);
} catch (Exception e) {
log.error("Initial Consumer fail: " + e);
}
}
//开启线程
@Override
public void afterPropertiesSet() throws Exception {
start();
}
2. 设置环境变量
numThreads = Integer.parseInt(System.getenv("NUM_THREADS"));
run configration --> environment
注意
默认spring中的对象都是单例的,如果要多例需要用户自己设置
网友评论