@RestController
@RequestMapping("/api/customer")
public class CustomerResource {
@Autowired
JmsTemplate jmsTemplate;
@Autowired
private CustomerService customerService;
@PostMapping("/message1/listen")
public void createMsgWithListener(@RequestParam String msg) {
jmsTemplate.convertAndSend("customer:msg:new", msg);
}
@PostMapping("/message1/direct")
public void createMsgDirect(@RequestParam String msg) {
customerService.handle(msg);
}
@PostMapping("/message2/listen")
public void createMsg2WithListener(@RequestParam String msg) {
jmsTemplate.convertAndSend("customer:msg2:new", msg);
}
@PostMapping("/message2/direct")
public void createMsg2Direct(@RequestParam String msg) {
customerService.handle2(msg);
}
@GetMapping("/message")
public String getMsg() {
Object reply = jmsTemplate.receiveAndConvert("customer:msg:reply");
return String.valueOf(reply);
}
}
@EnableJms
@Configuration
public class JmsConfig {
private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);
@Bean
public JmsTemplate initJmsTemplate(ConnectionFactory connectionFactory) {
LOG.debug("init jms template with converter.");
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory); // JmsTemplate使用的connectionFactory跟JmsTransactionManager使用的必须是同一个,不能在这里封装成caching之类的。
return template;
}
// 这个用于设置 @JmsListener使用的containerFactory
@Bean
public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer,
PlatformTransactionManager transactionManager) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setTransactionManager(transactionManager);
factory.setCacheLevelName("CACHE_CONNECTION");
factory.setReceiveTimeout(10000L);
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
@Service
public class CustomerService {
private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);
@Autowired
JmsTemplate jmsTemplate;
@Autowired
private PlatformTransactionManager transactionManager;
@PostConstruct
public void init() {
jmsTemplate.setReceiveTimeout(3000);
}
@JmsListener(destination = "customer:msg:new", containerFactory = "msgFactory")
public void handle(String msg) {
LOG.debug("Get JMS message to from customer:{}", msg);
String reply = "Replied - " + msg;
jmsTemplate.convertAndSend("customer:msg:reply", reply);
if (msg.contains("error")) {
simulateError();
}
}
@JmsListener(destination = "customer:msg2:new", containerFactory = "msgFactory")
public void handle2(String msg) {
LOG.debug("Get JMS message2 to from customer:{}", msg);
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setTimeout(15);
TransactionStatus status = transactionManager.getTransaction(def);
try {
String reply = "Replied-2 - " + msg;
jmsTemplate.convertAndSend("customer:msg:reply", reply);
if (!msg.contains("error")) {
transactionManager.commit(status);
} else {
transactionManager.rollback(status);
}
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
}
private void simulateError() {
throw new RuntimeException("some Data error.");
}
}
网友评论