美文网首页
Springboot搭建kafka生产消费Demo

Springboot搭建kafka生产消费Demo

作者: 会飞的猪_password | 来源:发表于2019-12-20 11:20 被阅读0次

    网上有很多实例,但没有一个能完整跑得通的,本人亲自搭建了一个

    先贴出配置文件

    image.png
    # Kafka settings for the demo. These are custom keys read through @Value in
    # KafkaProducerConfig / KafkaConsumerConfig.
    spring:
      kafka:
        # Topic used by both the producer controller and the @KafkaListener.
        topic.default: test_kafka_wth
        # NOTE(review): not read by any @Value in the classes shown - confirm it is still needed.
        packages: com.wonders.cz.sms
        producer:
          servers: 127.0.0.1:9092
          retries: 0 # send retries; 0 here (retries = MAX would retry indefinitely until you notice a problem)
          batch.size: 16384 # the producer tries to batch records to reduce the number of requests; batch size in bytes
          linger: 1 # delay sending by 1 ms: adds a small delay instead of sending each record immediately
          buffer.memory: 33554432 # amount of memory the producer may use to buffer data
        consumer:
          # NOTE(review): not read by any @Value in the classes shown - confirm it is still needed.
          zookeeper.connect: 127.0.0.1:2181
          servers: 127.0.0.1:9092
          enable.auto.commit: true
          session.timeout: 30000
          auto.commit.interval: 1000
          auto.offset.reset: latest
          group.id: test
          concurrency: 10
    

    SmsApplication.java

    /**
     * Spring Boot entry point for the demo application.
     * <p>
     * DataSource auto-configuration is excluded and component scanning is
     * configured with the {@code com.cz.**} base-package pattern.
     */
    @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
    @ComponentScan(basePackages = {"com.cz.**"})
    public class SmsApplication {

        /**
         * Starts the Spring application context.
         *
         * @param args command-line arguments forwarded to Spring Boot
         */
        public static void main(String[] args) {
            new SpringApplication(SmsApplication.class).run(args);
        }

    }
    

    MessageEntity.java

    /**
     * Message notification entity, mapped to the {@code cz_message} table.
     * <p>
     * Fields annotated with {@code @TableField(exist = false)} are not table
     * columns. Null properties are omitted from Jackson output
     * ({@code JsonInclude.Include.NON_NULL}).
     */
    @TableName("cz_message")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    public class MessageEntity extends RequestListEntity implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Message notification id (primary key, supplied by the caller).
         */
        @TableId(type = IdType.INPUT)
        @ApiModelProperty("消息通知id,主键")
        @NotBlank(groups = {VG.Get.class, VG.Delete.class, VG.Update.class})
        private String id;
        /**
         * Organization id of the message sender.
         */
        @ApiModelProperty("消息发送者组织机构id")
        @NotBlank(groups = VG.Add.class)
        private String sendOrganizationId;
        /**
         * User id of the message sender.
         */
        @ApiModelProperty("消息发送者id")
        @NotBlank(groups = VG.Add.class)
        private String sendUserId;
        /**
         * Message digest (summary).
         */
        @ApiModelProperty("消息摘要")
        @NotBlank(groups = VG.Add.class)
        private String messageDigest;
        /**
         * Message content.
         */
        @ApiModelProperty("消息内容")
        @NotBlank(groups = VG.Add.class)
        private String messageContent;
        /**
         * Message status: 0 - sending, 1 - read, 2 - unread.
         */
        @ApiModelProperty("消息状态")
        @NotNull(groups = VG.Add.class)
        private Integer status;
        /**
         * Organization id of the message receiver.
         */
        @ApiModelProperty("消息接收者组织机构id")
        @NotBlank(groups = VG.Add.class)
        private String receiveOrganizationId;
        /**
         * Creation time.
         */
        @ApiModelProperty("创建时间")
        private Date createTime;
        /**
         * Last modification time.
         */
        @ApiModelProperty("修改时间")
        private Date modifyTime;
        /**
         * Organization name of the message sender (not a table column).
         */
        @TableField(exist = false)
        private String organizationName;

        /**
         * Organization name of the message receiver (not a table column).
         */
        @TableField(exist = false)
        private String receiveOrganizationName;
        /**
         * Name of the message sender (not a table column).
         * <p>
         * Named in lowerCamelCase so that field-based serializers (e.g. Gson)
         * produce the same key as the property derived from
         * {@link #getSendUserName()}.
         */
        @TableField(exist = false)
        private String sendUserName;

        public void setId(String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }

        public void setSendOrganizationId(String sendOrganizationId) {
            this.sendOrganizationId = sendOrganizationId;
        }

        public String getSendOrganizationId() {
            return sendOrganizationId;
        }

        public void setSendUserId(String sendUserId) {
            this.sendUserId = sendUserId;
        }

        public String getSendUserId() {
            return sendUserId;
        }

        public void setMessageDigest(String messageDigest) {
            this.messageDigest = messageDigest;
        }

        public String getMessageDigest() {
            return messageDigest;
        }

        public void setMessageContent(String messageContent) {
            this.messageContent = messageContent;
        }

        public String getMessageContent() {
            return messageContent;
        }

        public void setStatus(Integer status) {
            this.status = status;
        }

        public Integer getStatus() {
            return status;
        }

        public void setReceiveOrganizationId(String receiveOrganizationId) {
            this.receiveOrganizationId = receiveOrganizationId;
        }

        public String getReceiveOrganizationId() {
            return receiveOrganizationId;
        }

        public void setCreateTime(Date createTime) {
            this.createTime = createTime;
        }

        public Date getCreateTime() {
            return createTime;
        }

        public void setModifyTime(Date modifyTime) {
            this.modifyTime = modifyTime;
        }

        public Date getModifyTime() {
            return modifyTime;
        }

        public String getSendUserName() {
            return sendUserName;
        }

        public void setSendUserName(String sendUserName) {
            this.sendUserName = sendUserName;
        }

        public String getOrganizationName() {
            return organizationName;
        }

        public void setOrganizationName(String organizationName) {
            this.organizationName = organizationName;
        }

        public String getReceiveOrganizationName() {
            return receiveOrganizationName;
        }

        public void setReceiveOrganizationName(String receiveOrganizationName) {
            this.receiveOrganizationName = receiveOrganizationName;
        }
    }
    
    

    ErrorCode.java

    /**
     * Numeric result codes placed in the {@code code} field of API responses.
     */
    public class ErrorCode {
        /** The request was handled successfully. */
        public static final int SUCCESS = 200;
        /** The request failed with an exception. */
        public static final int EXCEPTION = 500;
    }
    

    Response.java

    /**
     * Simple response envelope carrying a numeric result code and a
     * human-readable message. Accessors are generated by the Lombok
     * {@code @Getter}/{@code @Setter} annotations.
     */
    @Setter
    @Getter
    public class Response {
        /** Result code, e.g. one of the {@code ErrorCode} constants. */
        private int code;
        /** Text describing the outcome. */
        private String message;

        /**
         * Creates a response.
         *
         * @param code    result code
         * @param message text describing the outcome
         */
        public Response(int code, String message) {
            this.message = message;
            this.code = code;
        }
    }
    

    KafkaConsumerConfig.java

    
    /**
     * Consumer-side Kafka configuration: builds the consumer property map, the
     * consumer factory and the listener container factory from the
     * {@code spring.kafka.consumer.*} properties.
     */
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        @Value("${spring.kafka.consumer.servers}")
        private String servers;
        @Value("${spring.kafka.consumer.enable.auto.commit}")
        private boolean enableAutoCommit;
        @Value("${spring.kafka.consumer.session.timeout}")
        private String sessionTimeout;
        @Value("${spring.kafka.consumer.auto.commit.interval}")
        private String autoCommitInterval;
        @Value("${spring.kafka.consumer.group.id}")
        private String groupId;
        @Value("${spring.kafka.consumer.auto.offset.reset}")
        private String autoOffsetReset;
        @Value("${spring.kafka.consumer.concurrency}")
        private int concurrency;


        /**
         * Customized listener container factory (concurrency and poll timeout).
         * <p>
         * {@code @KafkaListener} methods that do not name a {@code containerFactory}
         * look up the bean called {@code kafkaListenerContainerFactory}, so the
         * bean is registered under that name; {@code listenerContainerFactory}
         * is kept as an alias for existing references.
         *
         * @param consumerFactory the consumer factory defined in this class
         * @return the configured container factory
         */
        @Bean(name = {"kafkaListenerContainerFactory", "listenerContainerFactory"})
        public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory(
                DefaultKafkaConsumerFactory<String, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            // Use the DefaultKafkaConsumerFactory declared below.
            factory.setConsumerFactory(consumerFactory);
            factory.setConcurrency(concurrency);
            factory.getContainerProperties().setPollTimeout(1500);
            return factory;
        }

        /**
         * Consumer factory built from {@link #consumerConfigs()} instead of the
         * one Spring Boot would create by default.
         *
         * @return consumer factory for String keys and String values
         */
        @Bean
        public DefaultKafkaConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }


        /**
         * Kafka consumer properties assembled from the injected values; keys and
         * values are both deserialized with {@code StringDeserializer}.
         *
         * @return mutable map of consumer properties
         */
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            return propsMap;
        }
    }
    
    

    KafkaProducerConfig.java

    
    /**
     * Producer-side Kafka configuration: builds the producer property map, the
     * producer factory and the {@code KafkaTemplate} bean from the
     * {@code spring.kafka.producer.*} properties.
     */
    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {

        @Value("${spring.kafka.producer.servers}")
        private String servers;
        @Value("${spring.kafka.producer.retries}")
        private int retries;
        @Value("${spring.kafka.producer.batch.size}")
        private int batchSize;
        @Value("${spring.kafka.producer.linger}")
        private int linger;
        @Value("${spring.kafka.producer.buffer.memory}")
        private int bufferMemory;


        /**
         * Kafka producer properties assembled from the injected values.
         * <p>
         * The serializer classes match the serializer instances handed to the
         * factory in {@link #producerFactory()} (String keys, JSON values).
         *
         * @return mutable map of producer properties
         */
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            // These must be serializers: the previous StringDeserializer values
            // were not valid producer serializer classes.
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return props;
        }

        /**
         * Producer factory using a String key serializer and a JSON value
         * serializer.
         *
         * @return producer factory for {@code MessageEntity} payloads
         */
        public ProducerFactory<String, MessageEntity> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs(),
                    new StringSerializer(),
                    new JsonSerializer<>());
        }

        /**
         * Template used by the application to publish {@code MessageEntity}
         * records.
         *
         * @return the Kafka template bean
         */
        @Bean
        public KafkaTemplate<String, MessageEntity> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
    

    SimpleConsumer.java

    /**
     * Kafka listener that logs the offset and value of every record received
     * on the topic configured by {@code spring.kafka.topic.default}.
     */
    @Slf4j
    @Component
    public class SimpleConsumer {
        protected final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);

        /**
         * Logs the offset and value of a consumed record.
         *
         * @param record the record delivered by the listener container
         */
        @KafkaListener(topics = "${spring.kafka.topic.default}")
        public void receive(ConsumerRecord<?, ?> record) {
            logger.info("----------------------------------kafka的offset: {}", record.offset());
            // Pass the value as a log argument rather than calling toString() on it,
            // so a record with a null value is logged instead of throwing an NPE.
            logger.info("----------------------------------kafka的value: {}", record.value());
        }
    }
    

    ProduceController.java

    
    /**
     * REST endpoints under {@code /kafka}: a simple health-style endpoint and
     * an endpoint that hands a message to {@code SimpleProducer}.
     */
    @Slf4j
    @RestController
    @RequestMapping("/kafka")
    public class ProduceController {
        @Autowired
        private SimpleProducer simpleProducer;

        /** Target topic, taken from {@code spring.kafka.topic.default}. */
        @Value("${spring.kafka.topic.default}")
        private String topic;

        private Gson gson = new Gson();

        /**
         * {@code GET /kafka/hello}: always answers with a success response.
         *
         * @return response with code {@code ErrorCode.SUCCESS} and message "OK"
         */
        @RequestMapping(value = "/hello", method = RequestMethod.GET, produces = {"application/json"})
        public Response sendKafka() {
            Response ok = new Response(ErrorCode.SUCCESS, "OK");
            return ok;
        }


        /**
         * {@code POST /kafka/send}: passes the request body to the producer with
         * the fixed key {@code "key"}.
         *
         * @param message message entity deserialized from the request body
         * @return success response when the producer call returns normally,
         *         otherwise a response with code {@code ErrorCode.EXCEPTION}
         */
        @RequestMapping(value = "/send", method = RequestMethod.POST, produces = {"application/json"})
        public Response sendKafka(@RequestBody MessageEntity message) {
            Response outcome;
            try {
                simpleProducer.send(topic, "key", message);
                log.info("发送kafka成功.");
                outcome = new Response(ErrorCode.SUCCESS, "发送kafka成功");
            } catch (Exception e) {
                log.error("发送kafka失败", e);
                outcome = new Response(ErrorCode.EXCEPTION, "发送kafka失败");
            }
            return outcome;
        }

    }
    
    

    ProducerCallback.java

    
    @Slf4j
    public class ProducerCallback implements ListenableFutureCallback<SendResult<String, MessageEntity>> {
    
        private final long startTime;
        private final String key;
        private final MessageEntity message;
    
        private final Gson gson = new Gson();
    
        public ProducerCallback(long startTime, String key, MessageEntity message) {
            this.startTime = startTime;
            this.key = key;
            this.message = message;
        }
    
    
        @Override
        public void onSuccess(@Nullable SendResult<String, MessageEntity> result) {
            if (result == null) {
                return;
            }
            long elapsedTime = System.currentTimeMillis() - startTime;
    
            RecordMetadata metadata = result.getRecordMetadata();
            if (metadata != null) {
                StringBuilder record = new StringBuilder();
                record.append("message(")
                        .append("key = ").append(key).append(",")
                        .append("message = ").append(gson.toJson(message)).append(")")
                        .append("sent to partition(").append(metadata.partition()).append(")")
                        .append("with offset(").append(metadata.offset()).append(")")
                        .append("in ").append(elapsedTime).append(" ms");
                log.info(record.toString());
            }
        }
    
        @Override
        public void onFailure(Throwable ex) {
            ex.printStackTrace();
        }
    }
    
    

    SimpleProducer.java

    
    /**
     * Thin wrapper around the {@code kafkaTemplate} bean for publishing
     * {@code MessageEntity} records.
     */
    @Component
    public class SimpleProducer {

        @Autowired
        @Qualifier("kafkaTemplate")
        private KafkaTemplate<String, MessageEntity> kafkaTemplate;

        /**
         * Sends a message to the topic without a key and without a callback.
         *
         * @param topic   destination topic
         * @param message payload to send
         */
        public void send(String topic, MessageEntity message) {
            kafkaTemplate.send(topic, message);
        }

        /**
         * Sends a keyed message and registers a {@code ProducerCallback} on the
         * returned future.
         *
         * @param topic  destination topic
         * @param key    record key
         * @param entity payload to send
         */
        public void send(String topic, String key, MessageEntity entity) {
            ProducerRecord<String, MessageEntity> outgoing = new ProducerRecord<>(topic, key, entity);

            // Timestamp taken just before the send so the callback can report elapsed time.
            long sentAt = System.currentTimeMillis();

            kafkaTemplate.send(outgoing)
                    .addCallback(new ProducerCallback(sentAt, key, entity));
        }

    }
    
    

    使用postman可以这样子测

    image.png

    控制台打印就是这样

    2019-12-20 11:18:34.327  INFO 29780 --- [nio-8055-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.2
    2019-12-20 11:18:34.327  INFO 29780 --- [nio-8055-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 2a121f7b1d402825
    2019-12-20 11:18:34.372  INFO 29780 --- [nio-8055-exec-2] c.w.cz.sms.controller.ProduceController  : 发送kafka成功.
    2019-12-20 11:18:34.402  INFO 29780 --- [ntainer#0-0-C-1] c.w.cz.sms.consumer.SimpleConsumer       : ----------------------------------kafka的offset: 30
    2019-12-20 11:18:34.402  INFO 29780 --- [ntainer#0-0-C-1] c.w.cz.sms.consumer.SimpleConsumer       : ----------------------------------kafka的value: {"messageDigest":"a卡夫a卡数据测试4水水水水6666","messageContent":"数据体66哇哇哇哇6666"}
    2019-12-20 11:18:34.406  INFO 29780 --- [ad | producer-1] c.w.cz.sms.producer.ProducerCallback     : message(key = key,message = {"messageDigest":"a卡夫a卡数据测试4水水水水6666","messageContent":"数据体66哇哇哇哇6666"})sent to partition(0)with offset(30)in 87 ms
    
    

    技术交流或有疑问请留言,觉得好了点个赞!

    相关文章

      网友评论

          本文标题:Springboot搭建kafka生产消费Demo

          本文链接:https://www.haomeiwen.com/subject/zfbbnctx.html