美文网首页Java高级进阶
使用Spring Request-Reply实现基于Kafka的

使用Spring Request-Reply实现基于Kafka的

作者: Java大生 | 来源:发表于2018-10-11 16:45 被阅读3次

    大家提到Kafka时第一印象就是它是一个快速的异步消息处理系统,不同于通常tomcat之类应用服务器和前端之间的请求/响应方式请求,客户端发出一个请求,必然会等到一个响应,这种方式对Kafka来说并不自然,Kafka是一种事件驱动方式,事件激活然后响应,这种方式对很多人接受起来不方便,为了实现请求 - 响应模型,开发人员必须在消息的生产者记录中构建相关ID系统,并将其与消息的消费者记录中的ID进行匹配,找到那个请求ID再使用Kafka的一个队列进行回复。

    下图是本案例的演示架构图,这个案例是以同步行为返回两个数字总和的结果。

    客户端  --->请求---> RESTcontroll ---> Spring replying kafka 模板 -->Kafka的请求主题 -->Spring Kafka监听器

      |                                                                                        |

      |<----- 响应 <----RESTcontroll <-- Spring replying kafka 模板 <-- Kafka的响应主题<---------|

    下面我们开始看看开发这个演示步骤:

    设置Springboot启动类

    首先需要在pom.xml引入Spring kafka模板:

      <dependency>

            <groupId>org.springframework.kafka</groupId>

            <artifactId>spring-kafka</artifactId>

        </dependency>

    代码如下:

    @SpringBootApplication

    public class RequestReplyKafkaApplication {

      public static void main(String[] args) {

        SpringApplication.run(RequestReplyKafkaApplication.class, args);

      }

    }

    设置Spring ReplyingKafkaTemplate

    我们需要在Springboot配置类的KafkaConfig对Spring kafka模板进行配置:

    @Configuration

    public class KafkaConfig {

    在这个配置类中,我们需要配置核心的ReplyingKafkaTemplate类,这个类继承了 KafkaTemplate 提供请求/响应的的行为;还有一个生产者工厂(参见 ProducerFactory 下面的代码)和 KafkaMessageListenerContainer。这是最基本的设置,因为请求响应模型需要对应到消息生产者和消费者的行为。

    // 这是核心的ReplyingKafkaTemplate

    @Bean

    public ReplyingKafkaTemplate<String, Model, Model> replyKafkaTemplate(ProducerFactory<String, Model> pf, KafkaMessageListenerContainer<String, Model> container) {

      return new ReplyingKafkaTemplate<>(pf, container);

    }

    // 配件:监听器容器Listener Container to be set up in ReplyingKafkaTemplate

    @Bean

    public KafkaMessageListenerContainer<String, Model> replyContainer(ConsumerFactory<String, Model> cf) {

      ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);

      return new KafkaMessageListenerContainer<>(cf, containerProperties);

    }

    // 配件:生产者工厂Default Producer Factory to be used in ReplyingKafkaTemplate

    @Bean

    public ProducerFactory<String,Model> producerFactory() {

      return new DefaultKafkaProducerFactory<>(producerConfigs());

    }

    // 配件:kafka生产者的Kafka配置Standard KafkaProducer settings - specifying brokerand serializer

    @Bean

    public Map<String, Object> producerConfigs() {

      Map<String, Object> props = new HashMap<>();

      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,

                bootstrapServers);

      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

                StringSerializer.class);

      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

      return props;

    }

    设置spring-Kafka的监听器

    这与通常创建的Kafka消费者相同。唯一的变化是额外是在工厂中设置ReplyTemplate,这是必须的,因为消费者需要将计算结果放入到Kafka的响应主题。

    //消费者工厂 Default Consumer Factory

    @Bean

    public ConsumerFactory<String, Model> consumerFactory() {

      return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(Model.class));

    }

    // 并发监听器容器Concurrent Listner container factory

    @Bean

    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Model>> kafkaListenerContainerFactory() {

      ConcurrentKafkaListenerContainerFactory<String, Model> factory = new ConcurrentKafkaListenerContainerFactory<>();

      factory.setConsumerFactory(consumerFactory());

      // NOTE - set up of reply template 设置响应模板

      factory.setReplyTemplate(kafkaTemplate());

      return factory;

    }

    // Standard KafkaTemplate

    @Bean

    public KafkaTemplate<String, Model> kafkaTemplate() {

      return new KafkaTemplate<>(producerFactory());

    }

    编写我们的kafka消费者

    这是过去创建的Kafka消费者一样。唯一的变化是附加了@SendTo注释,此注释用于在响应主题上返回业务结果。

    @KafkaListener(topics = "${kafka.topic.request-topic}")

    @SendTo

    public Model listen(Model request) throws InterruptedException {

      int sum = request.getFirstNumber() + request.getSecondNumber();

      request.setAdditionalProperty("sum", sum);

      return request;

    }

    这个消费者用于业务计算,把客户端通过请求传入的两个数字进行相加,然后返回这个请求,通过@SendTo发送到Kafka的响应主题。

    总结服务

    现在,让我们将所有这些都结合在一起放在RESTcontroller,步骤分为几步,先创建生产者记录,并在记录头部中设置接受响应的Kafka主题,这样

    把请求和响应在Kafka那里对应起来,然后通过模板发布消息到Kafka,再通过future.get()堵塞等待Kafka的响应主题发送响应结果过来。这时再

    打印结果记录中的头部信息,会看到Spring自动生成相关ID。

    @ResponseBody

    @PostMapping(value="/sum",produces=MediaType.APPLICATION_JSON_VALUE,consumes=MediaType.APPLICATION_JSON_VALUE)

    public  Model  sum(@RequestBody  Model  request)throws InterruptedException,ExecutionException {

      //创建生产者记录

      ProducerRecord<String,Model>  record  = new ProducerRecord<String,Model>(requestTopic,request);

      //在记录头部中设置响应主题

      record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));

      //发布到kafka主题中

      RequestReplyFuture<String, Model, Model> sendAndReceive = kafkaTemplate.sendAndReceive(record);

      //确认生产者是否成功生产

      SendResult<String, Model> sendResult = sendAndReceive.getSendFuture().get();

      //打印结果记录中所有头部信息 会看到Spring自动生成的相关ID,这个ID是由消费端@SendTo 注释返回的值。

    sendResult.getProducerRecord().headers().forEach(header -> System.out.println(header.key() + ":" + header.value().toString()));

      //获取消费者记录

      ConsumerRecord<String, Model> consumerRecord = sendAndReceive.get();

      //返回消费者结果

      return consumerRecord.value();

    }

    并发消费者

    即使你要创建请求主题在三个分区中,三个并发的消费者的响应仍然合并到一个Kafka响应主题,这样,Spring侦听器的容器能够完成匹配相关ID的繁重工作。

    整个请求/响应的模型是一致的。

    现在我们可以再修改启动类如下:

    @ComponentScan(basePackages = {

            "com.gauravg.config",

            "com.gauravg.consumer",

            "com.gauravg.controller",

            "com.gauravg.model"

        })

    @SpringBootApplication

    public class RequestReplyKafkaApplication {

      public static void main(String[] args) {

        SpringApplication.run(RequestReplyKafkaApplication.class, args);

      }

    }

    post数据:

    {

      "firstNumber": "111",

      "secondNumber": "2222"

    }

    返回结果是:

    {

        "firstNumber": 111,

        "secondNumber": 2222,

        "sum": 2333

    }

    在控制台输出记录头部信息:

    kafka_replyTopic:[B@1f59b198

    kafka_correlationId:[B@356a7326

    __TypeId__:[B@1a9111f

    可见,Spring自动生成聚合ID(correlationId),无需我们自己手工比对了。

    欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 855835163

    群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

    相关文章

      网友评论

        本文标题:使用Spring Request-Reply实现基于Kafka的

        本文链接:https://www.haomeiwen.com/subject/pjwbaftx.html