Advanced Java Notes: Spring MVC + Kafka Distributed Messaging Middleware Integration

Author: 程序员北游 | Published: 2019-04-03 22:32

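    The Honghu messaging platform has dropped ActiveMQ in favor of Kafka, a high-throughput distributed messaging middleware.
    The Kafka messaging platform is built on the Spring + Kafka integration, detailed below:
    1. Use the integration jar spring-integration-kafka at version 2.1.0.RELEASE, the latest version at the time of writing.
    2. Keep the ZooKeeper and Kafka cluster settings in init.properties: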

    kafka.servers=127.0.0.1:9092
    kafka.topic=xxxooo
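
    As a rough illustration of step 2, the sketch below reads the two keys with java.util.Properties and maps kafka.servers onto the Kafka client setting bootstrap.servers. The class and method names (KafkaInitProperties, parse, toClientConfig, topic) are hypothetical and not taken from the Honghu code base. The text is parsed from a String only to keep the example self-contained; a real project would presumably read init.properties from the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: turns init.properties entries into Kafka client settings.
final class KafkaInitProperties {

    // Parses properties text; a real project would load the file from the classpath.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Maps the project key kafka.servers onto the client setting bootstrap.servers.
    static Map<String, Object> toClientConfig(Properties props) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", props.getProperty("kafka.servers").trim());
        return config;
    }

    // Returns the topic name configured under kafka.topic.
    static String topic(Properties props) {
        return props.getProperty("kafka.topic").trim();
    }

    public static void main(String[] args) {
        // Trailing spaces are deliberate: Properties.load does not strip them.
        Properties props = parse("kafka.servers=127.0.0.1:9092  \nkafka.topic=xxxooo  \n");
        System.out.println(toClientConfig(props)); // {bootstrap.servers=127.0.0.1:9092}
        System.out.println(topic(props));          // xxxooo
    }
}
```

    Trimming matters because Properties.load keeps trailing whitespace in values, so a stray space after the broker address would otherwise end up in bootstrap.servers.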
    

    3. Configure the message producer in spring-context-producer:

    <?xml version="1.0" encoding="utf-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
        <!-- Producer parameters -->
        <bean id="producerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    <entry key="retries" value="10" />
                    <entry key="batch.size" value="16384" />
                    <entry key="linger.ms" value="1" />
                    <entry key="buffer.memory" value="33554432" />
                    <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer" />
                    <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                </map>
            </constructor-arg>
        </bean>
        <!-- Create the producerFactory bean that the KafkaTemplate needs -->
        <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <ref bean="producerProperties" />
            </constructor-arg>
        </bean>
        <!-- Create the KafkaTemplate bean; inject it wherever needed and call its send methods -->
        <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
            <constructor-arg ref="producerFactory" />
            <constructor-arg name="autoFlush" value="true" />
            <property name="defaultTopic" value="test" />
        </bean>
    </beans>
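
    The key.serializer and value.serializer entries decide how keys and values become bytes on the wire. As a sketch only (SerializerSketch and its methods are hypothetical names, written with the JDK alone rather than the Kafka classes), the following mirrors what IntegerSerializer and StringSerializer do by default: a 4-byte big-endian integer for the key and UTF-8 bytes for the value, with null passed through as null.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical JDK-only mirror of the default serializer behavior.
final class SerializerSketch {

    // Like IntegerSerializer: 4 bytes, big-endian; null stays null.
    static byte[] serializeKey(Integer key) {
        if (key == null) {
            return null;
        }
        return ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
    }

    // Like IntegerDeserializer: reads the 4 bytes back into an Integer.
    static Integer deserializeKey(byte[] data) {
        if (data == null) {
            return null;
        }
        return ByteBuffer.wrap(data).getInt();
    }

    // Like StringSerializer with its default encoding: UTF-8 bytes.
    static byte[] serializeValue(String value) {
        if (value == null) {
            return null;
        }
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = serializeKey(258);
        System.out.println(key.length);                    // 4
        System.out.println(key[2] + "," + key[3]);         // 1,2
        System.out.println(deserializeKey(key));           // 258
        System.out.println(serializeValue("test").length); // 4
    }
}
```

    A null key is what a send without a key produces, which is why a consumed record can report key = null with a serialized key size of -1.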
    

    4. Configure the message consumer in a Spring context file, following the same approach:

    <?xml version="1.0" encoding="utf-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
        <!-- Consumer parameters -->
        <bean id="consumerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    <entry key="group.id" value="0" />
                    <entry key="enable.auto.commit" value="true" />
                    <entry key="auto.commit.interval.ms" value="1000" />
                    <entry key="session.timeout.ms" value="15000" />
                    <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                    <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                </map>
            </constructor-arg>
        </bean>
        <!-- Create the consumerFactory bean -->
        <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <ref bean="consumerProperties" />
            </constructor-arg>
        </bean>
        <!-- The class that actually consumes the messages -->
        <bean id="messageListernerConsumerService" class="com.sml.sz.kafka.KafKaConsumer" />
        <!-- Consumer container properties: the topic to listen on and the listener to call -->
        <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg value="test" />
            <property name="messageListener" ref="messageListernerConsumerService" />
        </bean>
        <!-- Create the message listener container, which polls Kafka and passes each record to the listener -->
        <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
            <constructor-arg ref="consumerFactory" />
            <constructor-arg ref="containerProperties" />
        </bean>
    </beans>
    

    5. Inject the KafkaTemplate with @Autowired, parameterized by the message key and value types:

    @Autowired
    private KafkaTemplate<xxx, ooo> kafkaTemplate;

    6. Implement the onMessage method of MessageListener to receive messages (this is where the business logic goes).
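
    The callback shape behind step 6 can be sketched without Spring. SimpleListener and dispatch below are hypothetical stand-ins for Spring Kafka's MessageListener and listener container, not their real APIs: the container polls records and hands each one to onMessage, where the business logic lives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ListenerSketch {

    // Hypothetical stand-in for a message listener: one callback per message.
    interface SimpleListener<V> {
        void onMessage(V value);
    }

    // Hypothetical stand-in for the container: delivers each polled message.
    static <V> void dispatch(List<V> polled, SimpleListener<V> listener) {
        for (V value : polled) {
            listener.onMessage(value);
        }
    }

    // Business logic lives in the callback; here it just records a title.
    static final class TitleCollector implements SimpleListener<String> {
        final List<String> titles = new ArrayList<>();

        @Override
        public void onMessage(String value) {
            titles.add(String.valueOf(value));
        }
    }

    public static void main(String[] args) {
        TitleCollector collector = new TitleCollector();
        dispatch(Arrays.asList("first", "second"), collector);
        System.out.println(collector.titles); // [first, second]
    }
}
```

    The author's actual listener, KafKaConsumer, appears with the test code under step 7.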

    7. Test the messaging service through a RESTful endpoint:

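    // KafKaProducer.java
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.CrossOrigin;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    // JSONObject comes from the project's JSON library; its import is not shown in the original.

    @CrossOrigin(origins = "*", maxAge = 3600, methods = { RequestMethod.GET, RequestMethod.POST, RequestMethod.DELETE,
            RequestMethod.PUT })
    @RestController
    @RequestMapping(value = "/rest/kafka")
    public class KafKaProducer {

        // Template defined in the producer configuration (Integer keys, String values)
        @Autowired
        private KafkaTemplate<Integer, String> kafkaTemplate;

        // GET /rest/kafka/send publishes one test message to the template's default topic
        @RequestMapping(value = "/send", method = RequestMethod.GET)
        public JSONObject save() {
            System.out.println("+++++++++++++++++++++++++++++++");
            // The payload reads "HongHu KAFKA distributed messaging service test"
            kafkaTemplate.sendDefault("HongHu KAFKA分布式消息服务测试");
            return null;
        }
    }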
    
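    // KafKaConsumer.java
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.listener.MessageListener;
    import org.springframework.stereotype.Component;
    // Log, LogService and IdGen are project classes; their imports are not shown in the original.

    @Component // a message listener, not a web controller
    public class KafKaConsumer implements MessageListener<Integer, String> {

        @Autowired
        private LogService logService;

        // Called by the listener container for each record received from the topic
        @Override
        public void onMessage(ConsumerRecord<Integer, String> record) {
            System.out.println("====================" + record);
            // Persist the message payload as the title of a new Log entry
            Log log = new Log();
            log.setIsNewRecord(true);
            log.setId(IdGen.uuid());
            log.setTitle(String.valueOf(record.value()));
            logService.save(log);
        }
    }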
    

    Sample output on the consumer side:

    Message received------------------: ConsumerRecord(topic = xxxooo, partition = 0, offset = 2489, CreateTime = 1479647648299, checksum = 3372898135, serialized key size = -1, serialized value size = 40, key = null, value = HongHu KAFKA分布式消息服务测试)

    Article link: https://www.haomeiwen.com/subject/lzgtiqtx.html