美文网首页
0006.Kafka事务处理

0006.Kafka事务处理

作者: 笑着字太黑 | 来源:发表于2022-04-16 22:55 被阅读0次
1.Kafka中事务处理的实现方式

1.1.注解@Transactional

1.1.1.开启生产者工厂的事务功能,并设置TransactionIdPrefix

KafkaConfiguration.transProducerFactory()

1.1.2.使用以上生产者工厂来创建事务管理类

KafkaConfiguration.transactionManager()

1.1.3.在发送消息的方法上使用注解@Transactional
1.1.4.设置Listener读取信息的事务处理级别

properties = {"isolation.level:read_committed"}

1.1.5.设置setAllowNonTransactional

开启事务处理后发送消息时必须使用事务处理,
如果想不使用的话可以使用setAllowNonTransactional
KafkaConfiguration.kafkaTemplate()

1.2.发送消息时使用executeInTransaction方法
1.3.使用Produce开启事务处理

KafkaTemplate封装了对Produce的使用,暂时没有调查测试直接使用Produce。

2.相关代码

2.1.KafkaConfiguration.java

@Configuration
@EnableKafka
public class KafkaConfiguration {

    private static final Logger log= LoggerFactory.getLogger(KafkaConfiguration.class);

    @Autowired
    public KafkaProperties kafkaProperties;
    
    @Bean
    @Primary
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        log.info("====kafkaTemplate() start");
        this.logKafkaProperties();
        
        KafkaTemplate<Integer,String> template = new KafkaTemplate<Integer, String>(transProducerFactory());
        template.setAllowNonTransactional(true);

        log.info("====kafkaTemplate() end");
        return template;
    }
    
    @Bean
    public KafkaTemplate<Integer, String> transKafkaTemplate() {
        log.info("====kafkaTransTemplate() start");
        
        KafkaTemplate<Integer,String> template = new KafkaTemplate<Integer, String>(transProducerFactory());

        log.info("====kafkaTransTemplate() end");
        return template;
    }
    
    @Bean
    public ProducerFactory<Integer, String> transProducerFactory() {
        Map<String, Object> producerProps = kafkaProperties.buildProducerProperties();
        DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);

        // 开启生产者工厂的事务功能,并设置TransactionIdPrefix
        producerFactory.transactionCapable();
        producerFactory.setTransactionIdPrefix("tran-");
        
        return producerFactory;
    }
    
    @Bean
    public KafkaTransactionManager<Integer, String> transactionManager(ProducerFactory<Integer, String> producerFactory) {
        log.info("====transactionManager() start");
        
        // 开启事务功能,我们需要使用生产者工厂来创建这个事务管理类
        KafkaTransactionManager<Integer, String> manager = new KafkaTransactionManager<Integer, String>(producerFactory);

        log.info("====transactionManager() end");
        return manager;
    }
    
    private void logKafkaProperties() {
        log.info("    ====adminProperties:" + kafkaProperties.buildAdminProperties());
        log.info("    ====consumerProperties:" + kafkaProperties.buildConsumerProperties());
        log.info("    ====producerProperties:" + kafkaProperties.buildProducerProperties());
        log.info("    ====streamsProperties:" + kafkaProperties.buildStreamsProperties());
    }
}

2.2.HelloWoldListener.java

@Component
public class HelloWoldListener {

    private static final Logger log= LoggerFactory.getLogger(HelloWoldListener.class);

    @KafkaListener(
        id = "helloWorld", 
        topics = "topic.quick.helloWorld",
        properties = {"isolation.level:read_committed"}
    )
    public void listen(String msgData) {
        log.info("====HelloWoldListener receive : "+msgData);
    }
}

2.3.TransHelloWorldTest.java

@SpringBootTest
@RunWith(SpringRunner.class)
public class TransHelloWorldTest {

    // 通过名字匹配,所以需要使用@Resource
    @Resource
    private KafkaTemplate<Integer,String> transKafkaTemplate;

    @Test
    @Transactional
    public void tesTransAnnotationHelloWorld() throws InterruptedException {
        System.out.println("====tesTransAnnotationHelloWorld run");
        boolean transactionCapable = transKafkaTemplate.getProducerFactory().transactionCapable();
        System.out.println("    ====transactionCapable:" + transactionCapable);
        System.out.println("    ====isAllowNonTransactional:" + transKafkaTemplate.isAllowNonTransactional());
        transKafkaTemplate.send("topic.quick.helloWorld", "this is my first trans demo, Transaction Annotation Hello World!!!");
        throw new RuntimeException("Exception for trans annotation test.");
    }
    
    @Test
    public void testTransExecuteInTransaction() throws Exception {
        System.out.println("====testTransExecuteInTransaction run");
        boolean transactionCapable = transKafkaTemplate.getProducerFactory().transactionCapable();
        System.out.println("    ====transactionCapable:" + transactionCapable);
        System.out.println("    ====isAllowNonTransactional:" + transKafkaTemplate.isAllowNonTransactional());
        transKafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<Integer, String, Object>() {
            @Override
            public Object doInOperations(KafkaOperations<Integer, String> kafkaOperations) {
                kafkaOperations.send("topic.quick.helloWorld", "this is my second trans demo, Transaction Execute Hello World!!!");
                throw new RuntimeException("Exception for trans execute test.");
            }
        });
    }
}

相关文章

  • 0006.Kafka事务处理

    1.Kafka中事务处理的实现方式 1.1.注解@Transactional 1.1.1.开启生产者工厂的事务功能...

  • spring aop

    1、Spring中事务处理的作用: Spring事务处理,是将事务处理的工作统一起来,并为事务处理提供通用的支持。...

  • 关于 tp5 事务操作总结

    前提: 使用事务处理的话,需要数据库引擎支持事务处理。比如 MySQL 的 MyISAM 不支持事务处理,需要使用...

  • MySql___(7) MySQL 必知必会

    第26章 管理事务处理 26.1 事务处理 并非所有引擎都支持事务处理正如第21章所述,MySQL支持几种基本...

  • tp6进行事务操作

    使用事务处理的话,需要数据库引擎支持事务处理。比如 MySQL 的 MyISAM 不支持事务处理,需要使用 Inn...

  • OLAP和OLTP

    联机事务处理(OLTP)(on-line transaction processing)主要执行基本日常的事务处理...

  • SpringBoot(4)

    四 今日目标 事务处理 概念介绍 什么是事务处理? 事务处理就是要保持数据库的安全性。 事务要么完全地执行,要么完...

  • SQL学习十七、事务处理

    事务处理 使用事务处理(transaction processing),通过确保成批的 SQL 操作要么 完全执行...

  • OLTP和OLAP,联机事务处理和联机分析处理

    联机事务处理OLTP(on-line transaction processing) 主要是执行基本日常的事务处理...

  • JAVA利用JDBC对数据库的操作和JDBC编程之事务处理

    JDBC事务处理:

网友评论

      本文标题:0006.Kafka事务处理

      本文链接:https://www.haomeiwen.com/subject/pqtqertx.html