1.Kafka中事务处理的实现方式
1.1.注解@Transactional
1.1.1.开启生产者工厂的事务功能,并设置TransactionIdPrefix
KafkaConfiguration.transProducerFactory()
1.1.2.使用以上生产者工厂来创建事务管理类
KafkaConfiguration.transactionManager()
1.1.3.在发送消息的方法上使用注解@Transactional
1.1.4.设置Listener读取信息的事务处理级别properties = {"isolation.level:read_committed"}
1.1.5.设置setAllowNonTransactional
开启事务处理后发送消息时必须使用事务处理, 如果想不使用的话可以使用setAllowNonTransactional KafkaConfiguration.kafkaTemplate()
1.2.发送消息时使用executeInTransaction方法
1.3.使用Produce开启事务处理
KafkaTemplate封装了对Produce的使用,暂时没有调查测试直接使用Produce。
2.相关代码
2.1.KafkaConfiguration.java
@Configuration
@EnableKafka
public class KafkaConfiguration {
private static final Logger log= LoggerFactory.getLogger(KafkaConfiguration.class);
@Autowired
public KafkaProperties kafkaProperties;
@Bean
@Primary
public KafkaTemplate<Integer, String> kafkaTemplate() {
log.info("====kafkaTemplate() start");
this.logKafkaProperties();
KafkaTemplate<Integer,String> template = new KafkaTemplate<Integer, String>(transProducerFactory());
template.setAllowNonTransactional(true);
log.info("====kafkaTemplate() end");
return template;
}
@Bean
public KafkaTemplate<Integer, String> transKafkaTemplate() {
log.info("====kafkaTransTemplate() start");
KafkaTemplate<Integer,String> template = new KafkaTemplate<Integer, String>(transProducerFactory());
log.info("====kafkaTransTemplate() end");
return template;
}
@Bean
public ProducerFactory<Integer, String> transProducerFactory() {
Map<String, Object> producerProps = kafkaProperties.buildProducerProperties();
DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);
// 开启生产者工厂的事务功能,并设置TransactionIdPrefix
producerFactory.transactionCapable();
producerFactory.setTransactionIdPrefix("tran-");
return producerFactory;
}
@Bean
public KafkaTransactionManager<Integer, String> transactionManager(ProducerFactory<Integer, String> producerFactory) {
log.info("====transactionManager() start");
// 开启事务功能,我们需要使用生产者工厂来创建这个事务管理类
KafkaTransactionManager<Integer, String> manager = new KafkaTransactionManager<Integer, String>(producerFactory);
log.info("====transactionManager() end");
return manager;
}
private void logKafkaProperties() {
log.info(" ====adminProperties:" + kafkaProperties.buildAdminProperties());
log.info(" ====consumerProperties:" + kafkaProperties.buildConsumerProperties());
log.info(" ====producerProperties:" + kafkaProperties.buildProducerProperties());
log.info(" ====streamsProperties:" + kafkaProperties.buildStreamsProperties());
}
}
2.2.HelloWoldListener.java
@Component
public class HelloWoldListener {
private static final Logger log= LoggerFactory.getLogger(HelloWoldListener.class);
@KafkaListener(
id = "helloWorld",
topics = "topic.quick.helloWorld",
properties = {"isolation.level:read_committed"}
)
public void listen(String msgData) {
log.info("====HelloWoldListener receive : "+msgData);
}
}
2.3.TransHelloWorldTest.java
@SpringBootTest
@RunWith(SpringRunner.class)
public class TransHelloWorldTest {
// 通过名字匹配,所以需要使用@Resource
@Resource
private KafkaTemplate<Integer,String> transKafkaTemplate;
@Test
@Transactional
public void tesTransAnnotationHelloWorld() throws InterruptedException {
System.out.println("====tesTransAnnotationHelloWorld run");
boolean transactionCapable = transKafkaTemplate.getProducerFactory().transactionCapable();
System.out.println(" ====transactionCapable:" + transactionCapable);
System.out.println(" ====isAllowNonTransactional:" + transKafkaTemplate.isAllowNonTransactional());
transKafkaTemplate.send("topic.quick.helloWorld", "this is my first trans demo, Transaction Annotation Hello World!!!");
throw new RuntimeException("Exception for trans annotation test.");
}
@Test
public void testTransExecuteInTransaction() throws Exception {
System.out.println("====testTransExecuteInTransaction run");
boolean transactionCapable = transKafkaTemplate.getProducerFactory().transactionCapable();
System.out.println(" ====transactionCapable:" + transactionCapable);
System.out.println(" ====isAllowNonTransactional:" + transKafkaTemplate.isAllowNonTransactional());
transKafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<Integer, String, Object>() {
@Override
public Object doInOperations(KafkaOperations<Integer, String> kafkaOperations) {
kafkaOperations.send("topic.quick.helloWorld", "this is my second trans demo, Transaction Execute Hello World!!!");
throw new RuntimeException("Exception for trans execute test.");
}
});
}
}
网友评论