This article covers the optimizations from the producer side and the consumer side separately.
1. Producer-side optimization
The producer improves concurrency and reliability with the following settings (they appear in the unified configuration below):
- A large send buffer of 100 MB (buffer.memory)
- The buffer lingers 1 s before sending (linger.ms)
- A maximum batch size of 200,000 bytes (batch.size)
- Messages are compressed on the producer side (compression-type)
- Failed sends are retried 1,000 times (retries)
2. Consumer-side optimization (for image processing)
The consumer is optimized in the following ways:
- Acknowledge messages manually, improving reliability and avoiding message loss
- Fetch only one message per poll, because image processing is slow
- Allow up to 30 min of processing per poll (max.poll.interval.ms), so image processing has ample time
- Avoid unnecessary rebalances: session timeout of 5 min and heartbeat interval of 3 s, so a rebalance is triggered only after 5 min without a heartbeat
- Always acknowledge the message, whatever happens
- Listener concurrency of 5
- Kafka username & password moved into configuration, removing the key file
- Topics configured in one place
The unified configuration:
#Kafka base configuration, do not change
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1
spring.kafka.producer.retries=1000
spring.kafka.producer.compression-type=gzip
spring.kafka.producer.properties.linger.ms=1000
spring.kafka.producer.properties.batch.size=200000
spring.kafka.producer.properties.max.block.ms=600000
spring.kafka.producer.properties.buffer.memory=100554432
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.interval.ms=1800000
spring.kafka.consumer.properties.rebalance.timeout.ms=300000
spring.kafka.consumer.properties.session.timeout.ms=300000
spring.kafka.consumer.properties.heartbeat.interval.ms=3000
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.request.timeout.ms=301000
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.listener.concurrency=5
#Kafka account
spring.kafka.bootstrap-servers=172.16.97.161:2093
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
#kafka topic
spring.topics.imageTransformMessage.topic=image_transform_message_prod
spring.topics.imageTransformMessage.group=image_transform_message_prod
spring.topics.imageTransformResult.topic=image_transform_result_prod
spring.topics.imageTransformResult.group=image_transform_result_prod
3. Code examples
3.1 Producer code
@Slf4j
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.topics.imageTransformResult.topic}")
    private String topic;

    /**
     * Messages with the same key normally go to the same partition.
     * <p>
     * Because Kafka assigns partitions by hashing the key and taking the modulo,
     * the distribution can be uneven, so a pseudo-random partition is chosen here instead.
     *
     * @param key  message key
     * @param data payload object, serialized to JSON
     * @param <T>  payload type
     * @return true if the send was submitted without error
     */
    public <T> boolean sendMessage(String key, T data) {
        String jsonData = JSONObject.toJSONString(data);
        UUID uuid = UUID.randomUUID();
        String suuid = StringUtils.remove(uuid.toString(), "-");
        try {
            int partitionSize = kafkaTemplate.partitionsFor(topic).size();
            int randomPartition = (int) (System.currentTimeMillis() % partitionSize);
            Header header = new RecordHeader("UUID", suuid.getBytes());
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>(topic, randomPartition, key, jsonData, Arrays.asList(header));
            log.info("begin send key {}, data {}, uuid {}", key, data, suuid);
            kafkaTemplate.send(producerRecord);
            log.info("after send uuid {}", suuid);
            return true;
        } catch (Exception e) {
            log.error("sendMessage error, suuid {}, key {}, data {}", suuid, key, jsonData, e);
            return false;
        }
    }
}
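A minimal usage sketch of the sender follows; the ImageResultNotifier class and its field names are illustrative assumptions, not part of the original code.
@Service
public class ImageResultNotifier {

    @Autowired
    private KafkaProducer kafkaProducer;

    // Hypothetical caller: publishes one transform result per image
    public void notifyTransformResult(String imageId, String resultUrl) {
        Map<String, Object> result = new HashMap<>();
        result.put("imageId", imageId);      // field names are assumptions for illustration
        result.put("resultUrl", resultUrl);
        // The image id is used as the key; sendMessage serializes the map to JSON
        boolean ok = kafkaProducer.sendMessage(imageId, result);
        if (!ok) {
            // alerting / retry fallback would go here
        }
    }
}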
3.2 Consumer code
@Slf4j
@Component
public class KafkaConsumer {

    @Autowired
    private ImageBiz imageBiz;

    @Autowired
    private KafkaProducer kafkaProducer;

    @KafkaListener(topics = "#{'${spring.topics.imageTransformMessage.topic}'}", groupId = "#{'${spring.topics.imageTransformMessage.group}'}")
    public void processMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            String key = record.key();
            String data = record.value();
            log.info("kafka receive message, key {}, data {}", key, data);
            if (BaseUtil.isEmpty(data)) {
                acknowledgment.acknowledge();
                return;
            }
            // ... image processing business logic (omitted) ...
            acknowledgment.acknowledge();
        } catch (Exception e) {
            String suuid = MDC.get("UUID") == null ? "" : MDC.get("UUID");
            log.error("Exception while consuming message, please check the logs online: {}", suuid, e);
            // Acknowledge even on failure, as required above, so the message is not redelivered endlessly
            acknowledgment.acknowledge();
        }
    }
}
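The catch block above reads a trace id from MDC, but the snippet never puts the producer's UUID header there. A minimal sketch of doing so at the start of processMessage, assuming the slf4j MDC is the intended carrier (not part of the original code):
// Copy the "UUID" header written by the producer into MDC before processing
Header uuidHeader = record.headers().lastHeader("UUID");
if (uuidHeader != null) {
    MDC.put("UUID", new String(uuidHeader.value(), StandardCharsets.UTF_8));
}
try {
    // ... existing processing and acknowledge ...
} finally {
    MDC.remove("UUID"); // always clear: listener threads are reused for other records
}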
4. Consumer scenarios
There are two main scenarios on the Kafka consumer side:
- Few messages, but each message takes a long time to process
- Many messages, each processed very quickly
Scenario 1
Similar to the image processing described above: fetch only a few messages per poll and leave enough time to process them.
Scenario 2
Scenario 2 can be converted into scenario 1: pack the original 1,000 messages into one message and process them as a batch, as in the sketch below.
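A minimal sketch of packing small items into one Kafka message on the sending side; the pendingItems source, the key "image_batch" and the batch size of 1,000 are illustrative assumptions:
// Aggregate small items and send them as a single JSON-array message
List<String> batch = new ArrayList<>(1000);
for (String item : pendingItems) {               // pendingItems: wherever the small messages come from
    batch.add(item);
    if (batch.size() == 1000) {
        kafkaProducer.sendMessage("image_batch", new ArrayList<>(batch));
        batch.clear();
    }
}
if (!batch.isEmpty()) {
    kafkaProducer.sendMessage("image_batch", batch);  // flush the remainder
}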
If the senders are scattered and can only send messages one at a time, batch message listening can be used instead.
Configuration changes
#Fetch up to 1000 messages per poll
spring.kafka.consumer.max-poll-records=1000
#Batch consumption mode
spring.kafka.listener.type=batch
Consumer code
@Slf4j
@Component
public class KafkaConsumer {

    @Autowired
    private ImageBiz imageBiz;

    @Autowired
    private KafkaProducer kafkaProducer;

    @KafkaListener(topics = "#{'${spring.topics.imageTransformMessage.topic}'}", groupId = "#{'${spring.topics.imageTransformMessage.group}'}")
    public void processMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            // ... process the whole batch of records (business logic omitted) ...
            // In batch mode a single Acknowledgment covers all records returned by this poll
            acknowledgment.acknowledge();
        } catch (Exception e) {
            String suuid = MDC.get("UUID") == null ? "" : MDC.get("UUID");
            log.error("Exception while consuming messages, please check the logs online: {}", suuid, e);
        }
    }
}