美文网首页
spring cloud 12消息驱动

spring cloud 12消息驱动

作者: 西西_20f6 | 来源:发表于2019-06-25 15:21 被阅读0次

    一、spring cloud stream
    使用场景:消息驱动的微服务应用。
    同步的方式:http也是一种消息,包含消息头和消息体,请求响应模型。
    异步的方式:消息中间件
    对比reactive streams
    publisher
    subscriber
    processor既是消费者也是生产者
    topic

    二、主要概念
    • 应⽤模型:中间件核心
    • Binder 抽象
    • 持久化 发布/订阅⽀持:MQ的基本特性
    • 消费分组⽀持:MQ的基本特性
    • 分区⽀持:Kafka

    三、基本概念
    • Source: Stream 发送源
    • 近义词: Producer、 Publisher
    • Sink: Stream 接收器
    • 近义词: Consumer、 Subscriber
    • Processor:既是Producer也是Consumer??
    消息模型的组件都可以看作是管道

    四、编程模型
    • 激活 : @EnableBinding
    • @Configuration
    • @EnableIntegration
    • Source:发送源
    • @Output
    • MessageChannel:发布消息
    通过JMS规范,无论是源还是接受端,都是从中间件(Kafka,rabbitMQ)中寻址,所以我们的消息中间件就像一个路由器,路由到我们的目的地。发送者->中间件->订阅者。

    五、编程模型
    • Sink:接收器,订阅者
    • @Input就会有SubscribableChannel,订阅管道,订阅某个消息。
    一个SubscribableChannel能订阅几个topic?

    三种监听topic的写法:
    • SubscribableChannel:订阅管道
    • @ServiceActivator:监听器
    • @StreamListener:监听器

    六、整合kafka


    image.png

    1,启动zookeeper,端口2181
    进到zookeeper bin目录下
    sh zkServer.sh start

    2,Kafka:启动kafka
    进到Kafka bin目录下
    sh kafka-server-start.sh ../config/server.properties

    3,修改UserApi下的User对象,增加序列化ID
    private static final long serialVersionUID = 4050384138695906201L;

    4,改造user-service-client,消息发送器(采用Kafka原生API)
    (1)引入kafka依赖,这个是spring整合kafka的依赖
    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    </dependency>

    (2)利用kafkaTemplate实现消息发送,spring的Template套路,JDBCTemplate,RedisTemplate,hibernateTemplate,RestTemplate,在UserServiceClientController添加如下代码:

        private final KafkaTemplate<String, Object> kafkaTemplate;
    
        @Autowired
        public UserServiceClientController(KafkaTemplate<String, Object> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
    
    
        @PostMapping("/saveUserByMessage")
        public boolean save(@RequestBody User user) {
    
            //topic是string,传输对象user是Object类型
            ListenableFuture<SendResult<String, Object>> future =
                    kafkaTemplate.send("sf-users",0,user);
    
            return future.isDone();
        }
    
    

    (3)实现java 序列化器,com.xixi.spring.cloud12.server.user.ribbon.client.serializer.ObjectSerializer

    package com.xixi.spring.cloud12.server.user.ribbon.client.serializer;
    
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.OutputStream;
    import java.util.Map;
    
    /**将对象转化成字节数组
     * user 序列化器
     */
    public class ObjectSerializer implements Serializer<Object> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
    
        }
    
        /**
         * user 对象序列化过程:User对象转成byte[](二进制字节流)
         * @param topic
         * @param data
         * @return
         */
        @Override
        public byte[] serialize(String topic, Object data) {
    
            System.out.println("topic : "+topic+",object "+data);
    
            byte[] objectArray = null;
    
            //构造ObjectOutputStream的时候必须要传一个OutputStream,所以先new一个ByteArrayOutputStream
            OutputStream outputStream = new ByteArrayOutputStream();
    
    
            try {
                ObjectOutputStream objectOutputStream =
                        new ObjectOutputStream(outputStream);
    
                objectOutputStream.writeObject(data);//将对象写入outputStream
    
                objectArray = ((ByteArrayOutputStream) outputStream).toByteArray();//将对象赋值给userArray
    
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
    
            return objectArray;
        }
    
        @Override
        public void close() {
    
        }
    }
    

    (4)顺序启动eureka,config server
    (5)用kafka-console-consumer订阅一个topic
    sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sf-users
    然后我们客户端发送这个topic,kafka-console-consumer就能监控得到。

    5,改造user-service-provider,消息接收器(采用Stream binder的方式)
    (1)引入依赖:这个是spring-cloud-stream整合kafka的依赖,和上面那个不同哦。
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

    (2)定义用户消息接口

    package com.xixi.spring.cloud12.service.stream;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * 用户消息stream 接口
     *
     * sink,@Input,SubscribableChannel
     *
     *
     */
    public interface UserMessage {
    
        @Input
        SubscribableChannel input();
    }
    

    (3)激活用户消息Stream接口

    @EnableHystrix
    @SpringBootApplication
    @EnableDiscoveryClient
    @EnableBinding(UserMessage.class)//激活stream binding 到UserMessage
    public class UserServiceProviderApplication {
    
        public static void main(String[] args) {
    
            SpringApplication.run(UserServiceProviderApplication.class,args);
        }
    }
    

    (4)在application.properties中配置kafka以及stream destination

    ###spring cloud stream集成kafka配置
    ###spring cloud stream binding配置sink管道的topic
    #spring.cloud.stream.bindings.${channelid}
    ###destination指定Kafka topic
    spring.cloud.stream.bindings.input.destination = sf-users
    ##Kafka的配置:
    # 配置消息中间件服务,Kafka默认端口9092,localhost:9092
    spring.kafka.bootstrap-servers = localhost:9092
    spring.kafka.consumer.group-id = sf-group
    spring.kafka.consumer.clientId = user-service-provider
    

    配置
    • Source : spring.cloud.stream.bindings.{source}.* • Sink : spring.cloud.stream.bindings.{sink}.*

    (5)用户消息监听器
    1,第一种实现方式:通过SubscribableChannel实现

    package com.xixi.spring.cloud12.service.provider.service;
    
    import com.xixi.spring.cloud12.server.api.UserService;
    import com.xixi.spring.cloud12.server.domain.User;
    import com.xixi.spring.cloud12.service.stream.UserMessage;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.SubscribableChannel;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.io.ByteArrayInputStream;
    import java.io.ObjectInputStream;
    
    import static com.xixi.spring.cloud12.service.stream.UserMessage.INPUT;
    
    /**
     * 用户消息-服务
     * 当接收到消息的时候,listen()和init()都是回调,是交替执行
     */
    @Service
    public class UserMessagingService {
    
        @Autowired
        private UserMessage userMessage;
    
        @Autowired
        @Qualifier("inMemoryUserService")
        private UserService userService;
    
        /**第一种实现方式:SubscribableChannel
         * 订阅者接收到消息以后存起来
         */
        @PostConstruct
        public void init(){
            SubscribableChannel subscribableChannel = userMessage.input();
            //消息订阅回调,消费消息
            subscribableChannel.subscribe(message -> {
                //这里的message body应该是字节流,因为我们从user-service-client传的user对象在序列化成字节流以后传过来的
                System.out.println(message);
                byte[] body = (byte[]) message.getPayload();//就是message body
                saveUser(body);
            });
    
        }
    
    
    
    
        private void saveUser(byte[] body){
            //反序列化后输出
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
            ObjectInputStream objectInputStream = null;
            try {
                objectInputStream = new ObjectInputStream(byteArrayInputStream);
                User user = (User) objectInputStream.readObject();
                System.out.println(user.getId()+","+user.getName());
                //保存user
                userService.saveUser(user);
    
    
            } catch (Exception e) {
                throw new RuntimeException();
            }
    
        }
    
    }
    
    

    2,第二种实现方式:
    • @ServiceActivator:监听器

        //第二种实现方式
        //input是channel的ID
        @ServiceActivator(inputChannel = INPUT)
        public void listen(byte[] data){
            System.out.println("listen from @ServiceActivator...");
            saveUser(data);
        }
    

    3,第三种实现方式:
    • @StreamListener:监听器

        //第三种实现方式:
        @StreamListener(INPUT)
        public void onMessage(byte[] data){
            System.out.println("listen from @StreamListener...");
            saveUser(data);
        }
    

    总结:
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.SubscribableChannel;
    三个callback交替执行,机会均等。
    这三种实现只用到了Kafka的destination,在配置文件中
    spring.cloud.stream.bindings.user-message.destination = sf-users
    spring cloud stream屏蔽了消息中间件的具体实现(这里是Kafka),将来如果你换成其他的中间件比如rabbitMQ,也不需要改太多的代码。
    spring cloud stream的优势:解耦

    七、整合rabbitMQ
    1,改造user-service-client,消息发送器(采用Stream binder的方式,用消息管道进行消息发送)
    (1)增加依赖org.springframework.cloud:spring-cloud-stream-binder-rabbit
    (2)配置发送源管道

    ###spring cloud stream集成rabbit配置
    ###destination指定 rabbit topic,user-message-out是管道名称
    spring.cloud.stream.bindings.user-message-out.destination = sf-users
    

    (3)增加用户消息输出源stream接口

    package com.xixi.spring.cloud12.server.user.ribbon.client.stream;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    import java.nio.channels.Channel;
    
    /**
     * 用户消息 Stream接口
     */
    public interface UserMessage {
    
    
        String OUTPUT= "user-message-out";
    
        @Output(value = OUTPUT)
        MessageChannel output();
    
    }
    

    (4)激活@EnableBinding

    @EnableBinding(UserMessage.class)
    public class UserServiceClientApplication
    

    (5)发送请求到rabbitmq

    @Autowired
        private UserMessage userMessage;
        @Autowired
        private ObjectMapper objectMapper;
    
        @PostMapping("/saveUserByRabbit")
        public boolean saveUserByRabbit(@RequestBody User user) throws JsonProcessingException {
    
            //topic是string,传输对象user是Object类型
            MessageChannel messageChannel = userMessage.output();
            //将user对象序列化成JSON
            String payload =objectMapper.writeValueAsString(user);
            GenericMessage<String> message = new GenericMessage<String>(payload);
            //发送消息到rabbitmq
            return messageChannel.send(message);
        }
    

    (6)启动rabbitmq
    ./rabbitmq-server

    重点
    user-service-client的channel:user-message-out
    user-service-provider的channel:user-message
    因为它们监听的同一个topic,所以客户端发送的消息能够被user-service-provider监听到。

    2,改造user-service-provider,消息接收器(采用Stream binder的方式)
    (1)替换Kafka的依赖为rabbitmq,
    依赖org.springframework.cloud:spring-cloud-stream-binder-rabbit

    (2)配置文件无需修改

    (3)修改UserMessagingService,因为这次传过来的消息是String,之前写的是byte[],所以加个处理方式。

    @Autowired
        private ObjectMapper objectMapper;
        
        /**第一种实现方式:SubscribableChannel
         * 订阅者接收到消息以后存起来
         */
        @PostConstruct
        public void init()throws IOException {
            SubscribableChannel subscribableChannel = userMessage.input();
            //消息订阅回调,消费消息
            subscribableChannel.subscribe(message -> {
                //这里的message body应该是字节流,因为我们从user-service-client传的user对象。在序列化成字节流以后传过来的
                System.out.println("listen from subscribableChannel...");
    
                MessageHeaders messageHeaders = message.getHeaders();
                String contentType = messageHeaders.get("contentType",String.class);
                if ("text/plain".equals(contentType)){
                    try {
                        saveUser2((String) message.getPayload());
                    } catch (IOException e) {
                        throw new RuntimeException();
                    }
                }else{
                    byte[] body = (byte[]) message.getPayload();//就是message body
                    saveUser(body);
                }
            });
        }
    

    八、使用spring cloud stream,不需要关注你使用的是哪种中间件,只关注topic即可。强烈解耦!!!!我们的项目现在既有RPC调用,也有远程消息服务。远程消息服务适用于重要不紧急的功能,重要紧急就RPC调用,不重要也不紧急可以考虑定时任务。

    九、问答
    1,消息的一致性和实时性如何取舍?
    消息是允许丢失的,与数据库有点不太一样,大多数消息中间件都有重试机制,同时消息不太强调实时性。比如聊天软件(spring boot聊天室websocket那节),看上去很快,其实不是实时的,是异步的。
    消息中间件有持久化,相对于内存型(消费完就完事,不会存储),更加有保障,它会有一个持久化的过程。

    2,spring cloud stream里面的消息分区是怎么实现的?
    spring cloud stream消息分区依赖于消息中间件,比如说Kafka有分区,partition,
    但是rabbitmq就没有分区。
    比如数据库的分区,同一个表分别存在3个库里,比如,前10000号在库1,中间10000号在库2,最后10000在库3,根据偏移量分区。

    一个请求过来的时候,从它的上下文中就应该知道它去访问哪个库,
    oracle中每个库挂不同的磁盘,并行io,可提高读写的速度

    3,如果我启动多个consumer进程,使用同样的consumer group进行实时聚合计算,动态增加consumer进程时,如何保证消费的消息仍在原来相同的consumer进程上?
    队列模式:如果生产消费模型是点对点,producer A发出message A专门给consumer A用的,producer B专门给consumer B用的,consumer B是消费不到message A。
    广播模式:如果是发布订阅者模式,一个消息有多个订阅者,一个producer A发出message A,有多个consumerA,consumerB,consumerC订阅者,大家都监听到这个消息,

    相关文章

      网友评论

          本文标题:spring cloud 12消息驱动

          本文链接:https://www.haomeiwen.com/subject/oildwqtx.html