一、创建线程池配置类
package com.example.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class TaskPoolConfig {
@Bean("taskExecutor")
public ExecutortaskExecutor(){
ThreadPoolTaskExecutor executor =new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
return executor;
}
}
二、使用 @Async("taskExecutor") 让 Bean 的方法在该线程池中异步执行
package com.example.demo.roundone;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import javax.jms.*;
/**
 * ActiveMQ topic demo: a publisher and a subscriber whose methods are marked to
 * run on the {@code taskExecutor} pool via {@code @Async}.
 */
@Component
@EnableAsync
public class Topic {

    /**
     * Publishes {@code times} text messages to a topic, sleeping between sends.
     *
     * <p>The body of message number {@code i} (1-based) is {@code context + i}.
     * Failures are printed to standard error and not propagated; the connection
     * is closed whether or not publishing succeeds.
     *
     * @param times     number of messages to send
     * @param interval  pause after each message, in milliseconds
     * @param topicname name of the destination topic
     * @param url       broker URL passed to {@link ActiveMQConnectionFactory}
     * @param context   text prefix of every message body
     * @param username  broker user name
     * @param password  broker password
     */
    @Async("taskExecutor")
    public void producer(int times, int interval, String topicname, String url, String context, String username, String password) {
        Connection connection = null;
        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username, password, url);
            connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: each send only becomes visible after the commit() below.
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic(topicname);
            MessageProducer messageProducer = session.createProducer(destination);
            for (int i = 1; i <= times; i++) {
                TextMessage textMessage = session.createTextMessage(context + i);
                messageProducer.send(textMessage);
                System.out.println("发送消息:" + textMessage.getText());
                session.commit();
                Thread.sleep(interval);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning pool can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(connection);
        }
    }

    /**
     * Subscribes to a topic on the local broker and prints every text message received.
     *
     * <p>Blocks on {@code receive()} until it returns {@code null} or an exception
     * occurs. Failures are printed to standard error and not propagated; the
     * connection is closed on exit.
     *
     * @param topicname name of the topic to subscribe to
     */
    @Async("taskExecutor")
    public void consumer(String topicname) {
        Connection connection = null;
        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user",
                    "user", "failover:(tcp://localhost:61616)?Randomize=false");
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic(topicname);
            MessageConsumer messageConsumer = session.createConsumer(destination);
            // Every received message is printed, including the first one.
            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive();
                if (textMessage == null) {
                    break;
                }
                System.out.println("收到的内容:" + textMessage.getText());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(connection);
        }
    }

    /** Closes the connection if one was opened, printing (not propagating) any close failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
三、测试
package com.example.demo;
import com.example.demo.config.TaskPoolConfig;
import com.example.demo.roundone.Topic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that drives the {@link Topic} publisher and subscriber.
 *
 * <p>NOTE(review): the arguments point at a broker on {@code tcp://localhost:61616}
 * with user/password {@code user}; presumably one must be running for
 * {@link #test()} to produce any output — confirm.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {

    @Autowired
    Topic topic;

    /**
     * Starts the subscriber, then publishes five messages to topic {@code testone}
     * at 500 ms intervals.
     */
    @Test
    public void test() {
        // Start the subscriber before the publisher: a non-durable topic subscriber only
        // receives messages published after it has subscribed. Both methods are annotated
        // @Async, so this ordering narrows the race but does not fully remove it.
        topic.consumer("testone");
        topic.producer(5, 500, "testone", "tcp://localhost:61616", "this is a test from lyq", "user", "user");
    }

    /** Passes when the Spring application context starts successfully. */
    @Test
    public void contextLoads() {
    }
}
网友评论