美文网首页
聊聊如何实现一个带幂等模板的Kafka消费者

聊聊如何实现一个带幂等模板的Kafka消费者

作者: linyb极客之路 | 来源:发表于2022-05-24 09:42 被阅读0次

    前言

    不知道大家有没有这样的体验:你向团队成员宣导一些开发注意事项,比如使用消息队列时,消费端处理消息需要根据业务场景考虑幂等。但后面走查代码时,会发现一些资浅的开发在需要幂等判断的场景下,仍然没有做幂等判断。既然宣导无效,就干脆实现一个带幂等模板的消费者,让开发基于这个模板进行消费端业务处理。本文就以spring-kafka为例,聊聊如何实现一个带幂等模板的kafka消费者。

    实现步骤

    1、kafka自动提交改为手动提交

    spring:
        kafka:
            consumer:
                #  是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
                enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:false}
    
    

    2、定义消费端模板抽象基类

    /**
     * Template base class for Kafka consumers that forces an idempotency check before the
     * business logic runs. Subclasses implement {@link #isRepeateConsume} and {@link #doBiz};
     * the final {@link #receive} method wires the two together and performs the manual
     * acknowledgment.
     */
    @Slf4j
    public abstract class BaseComusmeListener {

        /**
         * Value handed to the payload when the record carries no timestamp header
         * (-1, the usual "no timestamp" marker).
         */
        private static final long NO_TIMESTAMP = -1L;

        /**
         * Entry point invoked for every consumed record.
         * <p>Flow: build the payload, ask {@link #isRepeateConsume} whether it was already
         * handled (if so, acknowledge and stop), otherwise run {@link #doBiz} and acknowledge
         * only when it reports success.
         * <p>NOTE(review): when {@link #doBiz} returns {@code false} or throws, the record is
         * not acknowledged here; whether and when it is redelivered depends on the listener
         * container configuration - confirm against the container setup.
         *
         * @param data the message payload
         * @param receivedTopic the topic the record came from, may be {@code null}
         * @param receivedMessageKey the record key, may be {@code null}
         * @param receivedTimestamp the record timestamp; declared as the wrapper type because
         * the header is optional and an absent optional header cannot be bound to a primitive
         * @param ack handle used to acknowledge the record manually
         */
        @KafkaHandler
        public final void receive(@Payload String data,
                                  @Header(value = KafkaHeaders.RECEIVED_TOPIC, required = false) String receivedTopic,
                                  @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String receivedMessageKey,
                                  @Header(value = KafkaHeaders.RECEIVED_TIMESTAMP, required = false) Long receivedTimestamp,
                                  Acknowledgment ack) {
            long timestamp = receivedTimestamp != null ? receivedTimestamp : NO_TIMESTAMP;
            KafkaComsumePayLoad kafkaComsumePayLoad =
                    buildKafkaComsumePayLoad(data, timestamp, receivedTopic, receivedMessageKey);

            if (isRepeateConsume(kafkaComsumePayLoad)) {
                log.warn("messageKey:【{}】,topic:【{}】存在重复消息数据-->【{}】", receivedMessageKey, receivedTopic, data);
                // Duplicate: acknowledge manually so the record is not delivered again.
                ack.acknowledge();
                return;
            }

            if (doBiz(kafkaComsumePayLoad)) {
                // Business logic succeeded: acknowledge manually.
                ack.acknowledge();
            }
        }

        /**
         * Decides whether the given record has already been consumed.
         *
         * @param kafkaComsumePayLoad the record data and metadata
         * @return {@code true} if the record is a duplicate and must be skipped
         */
        public abstract boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad);

        /**
         * Runs the business logic for a record that passed the duplicate check.
         *
         * @param kafkaComsumerPayLoad the record data and metadata
         * @return {@code true} if the record may be acknowledged
         */
        public abstract boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad);

        /**
         * Bundles the payload and the received headers into a {@link KafkaComsumePayLoad}.
         */
        private KafkaComsumePayLoad buildKafkaComsumePayLoad(String data, long receivedTimestamp, String receivedTopic, String receivedMessageKey) {
            return KafkaComsumePayLoad.builder()
                    .data(data)
                    .receivedTimestamp(receivedTimestamp)
                    .receivedTopic(receivedTopic)
                    .receivedMessageKey(receivedMessageKey)
                    .build();
        }
    }
    

    3、自定义监听注解【可选】

    /**
     * Composed annotation that is meta-annotated with both {@link KafkaListener} and
     * {@link Component}. Every attribute below is declared as an alias for the attribute of
     * the same name on one of those two annotations.
     */
    @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @KafkaListener
    @Documented
    @Component
    public @interface LybGeekKafkaListener {
    
        /** Alias for {@link KafkaListener#id()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "id")
        String id() default "";
    
    
        /** Alias for {@link KafkaListener#containerFactory()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "containerFactory")
        String containerFactory() default "";
    
    
        /** Alias for {@link KafkaListener#topics()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "topics")
        String[] topics() default {};
    
    
        /** Alias for {@link KafkaListener#topicPattern()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "topicPattern")
        String topicPattern() default "";
    
    
        /** Alias for {@link KafkaListener#topicPartitions()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "topicPartitions")
        TopicPartition[] topicPartitions() default {};
    
    
        /** Alias for {@link KafkaListener#containerGroup()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "containerGroup")
        String containerGroup() default "";
    
    
        /** Alias for {@link KafkaListener#errorHandler()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "errorHandler")
        String errorHandler() default "";
    
    
        /** Alias for {@link KafkaListener#groupId()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "groupId")
        String groupId() default "";
    
    
        /** Alias for {@link KafkaListener#idIsGroup()}; defaults to {@code true}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "idIsGroup")
        boolean idIsGroup() default true;
    
    
        /** Alias for {@link KafkaListener#clientIdPrefix()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "clientIdPrefix")
        String clientIdPrefix() default "";
    
    
        /** Alias for {@link KafkaListener#beanRef()}; defaults to {@code "__listener"}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "beanRef")
        String beanRef() default "__listener";
    
    
        /** Alias for {@link KafkaListener#concurrency()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
        String concurrency() default "";
    
    
        /** Alias for {@link KafkaListener#autoStartup()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "autoStartup")
        String autoStartup() default "";
    
    
        /** Alias for {@link KafkaListener#properties()}. */
        @AliasFor(annotation = KafkaListener.class, attribute = "properties")
        String[] properties() default {};
    
    
        /** Alias for {@link Component#value()}. */
        @AliasFor(annotation = Component.class, attribute = "value")
        String value() default "";
    
    }
    
    

    3、重写KafkaListener注解后置处理器【可选】

    注:因示例项目的springboot版本比较低,直接使用@LybGeekKafkaListener不起作用,所以需要下面这个自定义的后置处理器来识别该注解

    public class LybGeekKafkaListenerAnnotationBeanPostProcessor<K, V>
            implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
    
        /** Prefix of the endpoint ids generated for listeners that do not declare an id. */
        private static final String GENERATED_ID_PREFIX = "org.springframework.kafka.KafkaListenerEndpointContainer#";
    
        /**
         * The bean name of the default {@link KafkaListenerContainerFactory}.
         */
        public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";
    
        /** Bean classes already inspected and remembered so they are skipped on later beans. */
        private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
    
        private final Log logger = LogFactory.getLog(getClass());
    
        /** Holds the listener bean under its beanRef name while an endpoint is being configured. */
        private final ListenerScope listenerScope = new ListenerScope();
    
        /** Registry handed to the registrar; looked up by bean name when not set explicitly. */
        private KafkaListenerEndpointRegistry endpointRegistry;
    
        /** Container factory bean name passed to the registrar as the default. */
        private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;
    
        /** Bean factory used for bean lookups; assigned in setBeanFactory. */
        private DefaultListableBeanFactory beanFactory;
    
        /** Handler method factory adapter assigned to every endpoint created by this processor. */
        private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
                new KafkaHandlerMethodFactoryAdapter();
    
        /** Collects the endpoints and registers them in afterSingletonsInstantiated. */
        private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();
    
        /** Sequence appended to {@link #GENERATED_ID_PREFIX} for generated endpoint ids. */
        private final AtomicInteger counter = new AtomicInteger();
    
        /** Expression resolver; replaced by the bean factory's resolver in setBeanFactory. */
        private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();
    
        /** Expression context built from the bean factory and the listener scope. */
        private BeanExpressionContext expressionContext;
    
        /** Charset configured through setCharset; UTF-8 unless changed. */
        private Charset charset = StandardCharsets.UTF_8;
    
        /**
         * Reports the lowest precedence as this post-processor's order value.
         */
        @Override
        public int getOrder() {
            return Ordered.LOWEST_PRECEDENCE;
        }
    
        /**
         * Set the {@link KafkaListenerEndpointRegistry} that will hold the created
         * endpoint and manage the lifecycle of the related listener container.
         * <p>When no registry is set here (and the registrar has none), one is looked up
         * from the bean factory by name in {@link #afterSingletonsInstantiated()}.
         * @param endpointRegistry the {@link KafkaListenerEndpointRegistry} to set.
         */
        public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) {
            this.endpointRegistry = endpointRegistry;
        }
    
        /**
         * Set the name of the {@link KafkaListenerContainerFactory} to use by default.
         * <p>If none is specified, "kafkaListenerContainerFactory" is assumed to be defined.
         * <p>A {@code null} name is not passed on to the registrar in
         * {@link #afterSingletonsInstantiated()}.
         * @param containerFactoryBeanName the {@link KafkaListenerContainerFactory} bean name.
         */
        public void setDefaultContainerFactoryBeanName(String containerFactoryBeanName) {
            this.defaultContainerFactoryBeanName = containerFactoryBeanName;
        }
    
        /**
         * Set the {@link MessageHandlerMethodFactory} to use to configure the message
         * listener responsible to serve an endpoint detected by this processor.
         * <p>By default, {@link DefaultMessageHandlerMethodFactory} is used and it
         * can be configured further to support additional method arguments
         * or to customize conversion and validation support. See
         * {@link DefaultMessageHandlerMethodFactory} Javadoc for more details.
         * <p>The factory is not stored directly; it is handed to the internal adapter that
         * every endpoint receives.
         * @param messageHandlerMethodFactory the {@link MessageHandlerMethodFactory} instance.
         */
        public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHandlerMethodFactory) {
            // Delegate to the adapter so endpoints that already hold it see the new factory.
            this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
        }
    
        /**
         * Stores the bean factory used for bean lookups and expression resolution.
         * <p>The factory is cast to {@link DefaultListableBeanFactory}, so any other
         * implementation is rejected with a {@link ClassCastException}. When the factory is a
         * {@link ConfigurableListableBeanFactory}, its expression resolver replaces the default
         * one and an expression context bound to the listener scope is created.
         * @param beanFactory the {@link BeanFactory} to be used.
         */
        @Override
        public void setBeanFactory(BeanFactory beanFactory) {
            this.beanFactory = (DefaultListableBeanFactory) beanFactory;
            if (!(beanFactory instanceof ConfigurableListableBeanFactory)) {
                return;
            }
            ConfigurableListableBeanFactory configurable = (ConfigurableListableBeanFactory) beanFactory;
            this.resolver = configurable.getBeanExpressionResolver();
            this.expressionContext = new BeanExpressionContext(configurable, this.listenerScope);
        }
    
        /**
         * Set a charset to use when converting byte[] to String in method arguments.
         * Default UTF-8.
         * @param charset the charset; must not be {@code null}.
         * @throws IllegalArgumentException if {@code charset} is {@code null}.
         * @since 2.2
         */
        public void setCharset(Charset charset) {
            Assert.notNull(charset, "'charset' cannot be null");
            this.charset = charset;
        }
    
        /**
         * Finishes the registrar configuration once all singletons exist and registers every
         * endpoint collected so far.
         * <p>Steps: apply all {@link KafkaListenerConfigurer} beans, make sure the registrar has
         * an endpoint registry (looked up by bean name if none was set), pass on the default
         * container factory name, install the handler method factory, register the endpoints,
         * and finally remove the bean definition registered under
         * {@link KafkaListenerConfigUtils#KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME}.
         * <p>The removal is skipped when no bean factory is available or when no such
         * definition exists, instead of failing after the listeners have been registered.
         */
        @Override
        public void afterSingletonsInstantiated() {
            this.registrar.setBeanFactory(this.beanFactory);

            if (this.beanFactory instanceof ListableBeanFactory) {
                Map<String, KafkaListenerConfigurer> instances =
                        ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
                for (KafkaListenerConfigurer configurer : instances.values()) {
                    configurer.configureKafkaListeners(this.registrar);
                }
            }

            if (this.registrar.getEndpointRegistry() == null) {
                if (this.endpointRegistry == null) {
                    Assert.state(this.beanFactory != null,
                            "BeanFactory must be set to find endpoint registry by bean name");
                    this.endpointRegistry = this.beanFactory.getBean(
                            KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                            KafkaListenerEndpointRegistry.class);
                }
                this.registrar.setEndpointRegistry(this.endpointRegistry);
            }

            if (this.defaultContainerFactoryBeanName != null) {
                this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
            }

            // Set the custom handler method factory once resolved by the configurer
            MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
            if (handlerMethodFactory != null) {
                this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
            }
            else {
                addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
            }

            // Actually register all listeners
            this.registrar.afterPropertiesSet();

            // Only remove the definition when it is actually registered: removing a missing
            // definition throws, and the bean factory may not have been set at all.
            if (this.beanFactory != null && this.beanFactory.containsBeanDefinition(
                    KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
                this.beanFactory.removeBeanDefinition(
                        KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME);
            }
        }
    
    
        /**
         * No-op: returns the bean unchanged.
         */
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
            return bean;
        }
    
        /**
         * Scans the bean's target class for {@link LybGeekKafkaListener} on the class and on
         * its methods and turns every match into a listener endpoint.
         * <p>Method-level annotations produce one endpoint per annotated method. A class-level
         * annotation produces a multi-method endpoint over the {@link KafkaHandler} methods.
         * <p>A bean class is remembered as "not annotated" only when it has neither
         * method-level nor class-level listeners; otherwise later beans of a class that only
         * carries the class-level annotation would be skipped.
         *
         * @param bean the fully initialized bean
         * @param beanName the name of the bean
         * @return the given bean, unchanged
         */
        @Override
        public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
            if (this.nonAnnotatedClasses.contains(bean.getClass())) {
                return bean;
            }
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<LybGeekKafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
            final List<Method> multiMethods = new ArrayList<>();
            Map<Method, Set<LybGeekKafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<LybGeekKafkaListener>>) method -> {
                        Set<LybGeekKafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                // Cache the class only if there is nothing to process on it at all.
                if (!hasClassLevelListeners) {
                    this.nonAnnotatedClasses.add(bean.getClass());
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("No @LybGeekKafkaListener annotations found on bean type: " + bean.getClass());
                    }
                }
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<LybGeekKafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (LybGeekKafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(annotatedMethods.size() + " @LybGeekKafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
                }
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
            return bean;
        }
    
        /*
         * Looks up a single @LybGeekKafkaListener on the class via AnnotationUtils.findAnnotation
         * and returns it in a mutable set; the set is empty when nothing is found.
         */
        private Collection<LybGeekKafkaListener> findListenerAnnotations(Class<?> clazz) {
            LybGeekKafkaListener found = AnnotationUtils.findAnnotation(clazz, LybGeekKafkaListener.class);
            Set<LybGeekKafkaListener> result = new HashSet<>();
            if (found == null) {
                return result;
            }
            result.add(found);
            return result;
        }
    
        /*
         * Looks up the merged @LybGeekKafkaListener on the method via
         * AnnotatedElementUtils.findMergedAnnotation and returns it in a mutable set;
         * the set is empty when nothing is found.
         */
        private Set<LybGeekKafkaListener> findListenerAnnotations(Method method) {
            LybGeekKafkaListener merged = AnnotatedElementUtils.findMergedAnnotation(method, LybGeekKafkaListener.class);
            Set<LybGeekKafkaListener> result = new HashSet<>();
            if (merged == null) {
                return result;
            }
            result.add(merged);
            return result;
        }
    
        /**
         * Builds one multi-method endpoint per class-level listener annotation.
         * <p>Every handler method is first passed through {@link #checkProxy}. At most one
         * handler may be marked as default; a second one fails with an
         * {@link IllegalStateException}.
         *
         * @param classLevelListeners the class-level listener annotations
         * @param multiMethods the {@link KafkaHandler} methods of the bean class
         * @param bean the listener bean
         * @param beanName the name of the listener bean
         */
        private void processMultiMethodListeners(Collection<LybGeekKafkaListener> classLevelListeners, List<Method> multiMethods,
                Object bean, String beanName) {

            List<Method> handlerMethods = new ArrayList<>();
            Method defaultHandler = null;
            for (Method method : multiMethods) {
                Method resolved = checkProxy(method, bean);
                KafkaHandler handler = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
                boolean markedDefault = handler != null && handler.isDefault();
                if (markedDefault) {
                    // Copy into a final local so the message supplier can capture it.
                    final Method previousDefault = defaultHandler;
                    Assert.state(previousDefault == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
                            + previousDefault.toString() + " and " + method.toString());
                    defaultHandler = resolved;
                }
                handlerMethods.add(resolved);
            }
            for (LybGeekKafkaListener listener : classLevelListeners) {
                MultiMethodKafkaListenerEndpoint<K, V> multiEndpoint =
                        new MultiMethodKafkaListenerEndpoint<>(handlerMethods, defaultHandler, bean);
                processListener(multiEndpoint, listener, bean, bean.getClass(), beanName);
            }
        }
    
        /**
         * Creates a single-method endpoint for an annotated method and hands it to
         * {@link #processListener}.
         *
         * @param kafkaListener the listener annotation found on the method
         * @param method the annotated method
         * @param bean the listener bean
         * @param beanName the name of the listener bean
         */
        protected void processKafkaListener(LybGeekKafkaListener kafkaListener, Method method, Object bean, String beanName) {
            // Resolve the method against the proxy first; it is also used as the admin target.
            final Method invocable = checkProxy(method, bean);
            final MethodKafkaListenerEndpoint<K, V> methodEndpoint = new MethodKafkaListenerEndpoint<>();
            methodEndpoint.setMethod(invocable);
            processListener(methodEndpoint, kafkaListener, bean, invocable, beanName);
        }
    
        /**
         * Resolves the method that should be invoked on the given bean.
         * <p>For a JDK dynamic proxy the method is looked up on the proxy class and then on the
         * proxied interfaces; the first interface declaring it wins. For any other bean the
         * method is returned unchanged.
         *
         * @param methodArg the method found on the bean's target class
         * @param bean the listener bean, possibly a proxy
         * @return the method to invoke
         * @throws IllegalStateException if the bean is a JDK proxy that does not expose the method
         */
        private Method checkProxy(Method methodArg, Object bean) {
            Method method = methodArg;
            if (AopUtils.isJdkDynamicProxy(bean)) {
                try {
                    // Found a @LybGeekKafkaListener method on the target class for this JDK proxy ->
                    // is it also present on the proxy itself?
                    method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                    Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                    for (Class<?> iface : proxiedInterfaces) {
                        try {
                            method = iface.getMethod(method.getName(), method.getParameterTypes());
                            break;
                        }
                        catch (NoSuchMethodException noMethod) {
                            // Not declared on this interface; keep looking at the next one.
                        }
                    }
                }
                catch (SecurityException ex) {
                    // Hand the reflection failure to Spring's ReflectionUtils.
                    ReflectionUtils.handleReflectionException(ex);
                }
                catch (NoSuchMethodException ex) {
                    // The proxy class itself does not expose the method.
                    throw new IllegalStateException(String.format(
                            "@LybGeekKafkaListener method '%s' found on bean target class '%s', " +
                                    "but not found in any interface(s) for bean JDK proxy. Either " +
                                    "pull the method up to an interface or switch to subclass (CGLIB) " +
                                    "proxies by setting proxy-target-class/proxyTargetClass " +
                                    "attribute to 'true'", method.getName(),
                            method.getDeclaringClass().getSimpleName()), ex);
                }
            }
            return method;
        }
    
        /**
         * Configures the endpoint from the annotation attributes and registers it with the
         * registrar.
         * <p>While the attributes are resolved, the bean is published in the listener scope
         * under the annotation's {@code beanRef} name. The scope entry is removed in a
         * {@code finally} block so that a failure during configuration or registration does
         * not leave the bean behind in the scope.
         *
         * @param endpoint the endpoint to configure
         * @param kafkaListener the listener annotation supplying the attribute values
         * @param bean the listener bean
         * @param adminTarget the annotated method or class, used in error messages only
         * @param beanName the name of the listener bean
         * @throws BeanInitializationException if the named container factory bean does not exist
         */
        protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, LybGeekKafkaListener kafkaListener,
                Object bean, Object adminTarget, String beanName) {

            String beanRef = kafkaListener.beanRef();
            boolean scoped = StringUtils.hasText(beanRef);
            if (scoped) {
                this.listenerScope.addListener(beanRef, bean);
            }
            try {
                endpoint.setBean(bean);
                endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
                endpoint.setId(getEndpointId(kafkaListener));
                // The group id may fall back to the endpoint id, so the id must be set first.
                endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
                endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
                endpoint.setTopics(resolveTopics(kafkaListener));
                endpoint.setTopicPattern(resolvePattern(kafkaListener));
                endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
                String group = kafkaListener.containerGroup();
                if (StringUtils.hasText(group)) {
                    Object resolvedGroup = resolveExpression(group);
                    if (resolvedGroup instanceof String) {
                        endpoint.setGroup((String) resolvedGroup);
                    }
                }
                String concurrency = kafkaListener.concurrency();
                if (StringUtils.hasText(concurrency)) {
                    endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
                }
                String autoStartup = kafkaListener.autoStartup();
                if (StringUtils.hasText(autoStartup)) {
                    endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
                }
                resolveKafkaProperties(endpoint, kafkaListener.properties());

                // A null factory is passed to the registrar when the annotation names none.
                KafkaListenerContainerFactory<?> factory = null;
                String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
                if (StringUtils.hasText(containerFactoryBeanName)) {
                    Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
                    try {
                        factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
                    }
                    catch (NoSuchBeanDefinitionException ex) {
                        throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                                + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                                + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
                    }
                }

                endpoint.setBeanFactory(this.beanFactory);
                String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
                if (StringUtils.hasText(errorHandlerBeanName)) {
                    endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
                }
                this.registrar.registerEndpoint(endpoint, factory);
            }
            finally {
                if (scoped) {
                    this.listenerScope.removeListener(beanRef);
                }
            }
        }
    
        /**
         * Resolves each property string, loads it in {@link Properties} format and sets the
         * accumulated result on the endpoint. Nothing is set when the array is empty. A string
         * that fails to load is logged and skipped.
         *
         * @param endpoint the endpoint receiving the consumer properties
         * @param propertyStrings the raw property strings from the annotation
         */
        private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {
            if (propertyStrings.length == 0) {
                return;
            }
            Properties consumerProperties = new Properties();
            for (String raw : propertyStrings) {
                String resolved = resolveExpressionAsString(raw, "property");
                if (resolved == null) {
                    continue;
                }
                try {
                    consumerProperties.load(new StringReader(resolved));
                }
                catch (IOException e) {
                    this.logger.error("Failed to load property " + raw + ", continuing...", e);
                }
            }
            endpoint.setConsumerProperties(consumerProperties);
        }
    
        /**
         * Returns the resolved id declared on the annotation, or a generated id made of
         * {@link #GENERATED_ID_PREFIX} and the next counter value when none is declared.
         */
        private String getEndpointId(LybGeekKafkaListener kafkaListener) {
            String declaredId = kafkaListener.id();
            if (!StringUtils.hasText(declaredId)) {
                return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
            }
            return resolveExpressionAsString(declaredId, "id");
        }
    
        /**
         * Determines the consumer group id for the endpoint: the resolved {@code groupId}
         * attribute if it yields a value, otherwise the endpoint id when {@code idIsGroup} is
         * set and an id was declared, otherwise {@code null}.
         *
         * @param kafkaListener the listener annotation
         * @param id the endpoint id already resolved for this listener
         * @return the group id, or {@code null} if none applies
         */
        private String getEndpointGroupId(LybGeekKafkaListener kafkaListener, String id) {
            String declaredGroupId = kafkaListener.groupId();
            String resolved = StringUtils.hasText(declaredGroupId)
                    ? resolveExpressionAsString(declaredGroupId, "groupId")
                    : null;
            if (resolved != null) {
                return resolved;
            }
            boolean fallBackToId = kafkaListener.idIsGroup() && StringUtils.hasText(kafkaListener.id());
            return fallBackToId ? id : null;
        }
    
        /**
         * Flattens every {@code topicPartitions} entry of the annotation into an array of
         * {@link TopicPartitionInitialOffset}; the array is empty when none are declared.
         */
        private TopicPartitionInitialOffset[] resolveTopicPartitions(LybGeekKafkaListener kafkaListener) {
            List<TopicPartitionInitialOffset> offsets = new ArrayList<>();
            for (TopicPartition declared : kafkaListener.topicPartitions()) {
                offsets.addAll(resolveTopicPartitionsList(declared));
            }
            return offsets.toArray(new TopicPartitionInitialOffset[0]);
        }
    
        /**
         * Resolves every {@code topics} entry of the annotation and collects the resulting
         * topic names; the array is empty when none are declared.
         */
        private String[] resolveTopics(LybGeekKafkaListener kafkaListener) {
            List<String> names = new ArrayList<>();
            for (String declared : kafkaListener.topics()) {
                resolveAsString(resolveExpression(declared), names);
            }
            return names.toArray(new String[0]);
        }
    
        /**
         * Resolves the {@code topicPattern} attribute into a {@link Pattern}.
         *
         * @param kafkaListener the listener annotation
         * @return the pattern, or {@code null} when the attribute is blank or resolves to null
         * @throws IllegalStateException if the attribute resolves to anything other than a
         * {@link Pattern} or a {@link String}
         */
        private Pattern resolvePattern(LybGeekKafkaListener kafkaListener) {
            String text = kafkaListener.topicPattern();
            if (!StringUtils.hasText(text)) {
                return null;
            }
            Object resolved = resolveExpression(text);
            if (resolved == null) {
                return null;
            }
            if (resolved instanceof Pattern) {
                return (Pattern) resolved;
            }
            if (resolved instanceof String) {
                return Pattern.compile((String) resolved);
            }
            throw new IllegalStateException(
                    "topicPattern must resolve to a Pattern or String, not " + resolved.getClass());
        }
    
        /**
         * Expands one {@code @TopicPartition} into its partition assignments.
         * <p>Plain {@code partitions} entries are added first, followed by the
         * {@code partitionOffsets} entries; a partition offset equal to an assignment already
         * in the list is rejected.
         *
         * @param topicPartition the annotation to expand
         * @return the resolved assignments
         * @throws IllegalStateException if the topic does not resolve to a non-empty String
         * (including when it resolves to {@code null}) or if neither partitions nor partition
         * offsets are declared
         * @throws IllegalArgumentException if a partition offset duplicates an earlier assignment
         */
        private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
            Object topic = resolveExpression(topicPartition.topic());
            // Describe a null result without dereferencing it, so the assertion reports the
            // problem instead of failing with a NullPointerException while building the message.
            Assert.state(topic instanceof String,
                    "topic in @TopicPartition must resolve to a String, not "
                            + (topic != null ? topic.getClass() : null));
            String topicName = (String) topic;
            Assert.state(StringUtils.hasText(topicName), "topic in @TopicPartition must not be empty");
            String[] partitions = topicPartition.partitions();
            PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
            Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
                    "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
            List<TopicPartitionInitialOffset> result = new ArrayList<>();
            for (String partition : partitions) {
                resolvePartitionAsInteger(topicName, resolveExpression(partition), result);
            }

            for (PartitionOffset partitionOffset : partitionOffsets) {
                TopicPartitionInitialOffset topicPartitionOffset =
                        new TopicPartitionInitialOffset(topicName,
                                resolvePartition(topic, partitionOffset),
                                resolveInitialOffset(topic, partitionOffset),
                                isRelative(topic, partitionOffset));
                if (result.contains(topicPartitionOffset)) {
                    throw new IllegalArgumentException(
                            String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
                                    topicPartitionOffset));
                }
                result.add(topicPartitionOffset);
            }
            return result;
        }
    
        /**
         * Resolves the {@code partition} attribute of a {@code @PartitionOffset}.
         *
         * @param topic the resolved topic, used in error messages only
         * @param partitionOffset the annotation to read
         * @return the partition number
         * @throws IllegalStateException if the attribute resolves to a blank String
         * @throws IllegalArgumentException if the attribute resolves to {@code null} or to a
         * type other than String or Integer
         */
        private Integer resolvePartition(Object topic, PartitionOffset partitionOffset) {
            Object partitionValue = resolveExpression(partitionOffset.partition());
            if (partitionValue instanceof String) {
                Assert.state(StringUtils.hasText((String) partitionValue),
                        "partition in @PartitionOffset for topic '" + topic + "' cannot be empty");
                return Integer.valueOf((String) partitionValue);
            }
            if (partitionValue instanceof Integer) {
                return (Integer) partitionValue;
            }
            // Null-safe: a null result must produce this exception, not a NullPointerException.
            throw new IllegalArgumentException(String.format(
                    "@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",
                    topic, partitionOffset.partition(),
                    (partitionValue != null ? partitionValue.getClass() : null)));
        }
    
        /**
         * Resolves the {@code initialOffset} attribute of a {@code @PartitionOffset}.
         *
         * @param topic the resolved topic, used in error messages only
         * @param partitionOffset the annotation to read
         * @return the initial offset
         * @throws IllegalStateException if the attribute resolves to a blank String
         * @throws IllegalArgumentException if the attribute resolves to {@code null} or to a
         * type other than String or Long
         */
        private Long resolveInitialOffset(Object topic, PartitionOffset partitionOffset) {
            Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
            if (initialOffsetValue instanceof String) {
                Assert.state(StringUtils.hasText((String) initialOffsetValue),
                        "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
                return Long.valueOf((String) initialOffsetValue);
            }
            if (initialOffsetValue instanceof Long) {
                return (Long) initialOffsetValue;
            }
            // Null-safe: a null result must produce this exception, not a NullPointerException.
            throw new IllegalArgumentException(String.format(
                    "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
                    topic, partitionOffset.initialOffset(),
                    (initialOffsetValue != null ? initialOffsetValue.getClass() : null)));
        }
    
        /**
         * Resolves the {@code relativeToCurrent} attribute of a {@code @PartitionOffset}.
         *
         * @param topic the resolved topic, used in error messages only
         * @param partitionOffset the annotation to read
         * @return the resolved flag; a String is parsed with {@link Boolean#parseBoolean}
         * @throws IllegalArgumentException if the attribute resolves to {@code null} or to a
         * type other than String or Boolean
         */
        private boolean isRelative(Object topic, PartitionOffset partitionOffset) {
            Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
            if (relativeToCurrentValue instanceof String) {
                return Boolean.parseBoolean((String) relativeToCurrentValue);
            }
            if (relativeToCurrentValue instanceof Boolean) {
                return (Boolean) relativeToCurrentValue;
            }
            // Null-safe: a null result must produce this exception, not a NullPointerException.
            throw new IllegalArgumentException(String.format(
                    "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
                    topic, partitionOffset.relativeToCurrent(),
                    (relativeToCurrentValue != null ? relativeToCurrentValue.getClass() : null)));
        }
    
        /**
         * Appends the String(s) contained in the resolved value to {@code result}. Accepts a
         * String, a String array or an {@link Iterable}; arrays and iterables are processed
         * element by element, recursively.
         *
         * @param resolvedValue the value produced by expression resolution
         * @param result the list collecting the Strings
         * @throws IllegalArgumentException if the value, or one of its elements, is of any
         * other type
         */
        @SuppressWarnings("unchecked")
        private void resolveAsString(Object resolvedValue, List<String> result) {
            if (resolvedValue instanceof String) {
                result.add((String) resolvedValue);
                return;
            }
            if (resolvedValue instanceof String[]) {
                for (Object element : (String[]) resolvedValue) {
                    resolveAsString(element, result);
                }
                return;
            }
            if (resolvedValue instanceof Iterable) {
                for (Object element : (Iterable<Object>) resolvedValue) {
                    resolveAsString(element, result);
                }
                return;
            }
            throw new IllegalArgumentException(String.format(
                    "@LybGeekKafkaListener can't resolve '%s' as a String", resolvedValue));
        }
    
        /**
         * Appends a {@link TopicPartitionInitialOffset} to {@code result} for every partition
         * contained in the resolved value. Accepts a String, an Integer, arrays of either, or
         * an {@link Iterable}; String arrays and iterables are processed element by element,
         * recursively.
         *
         * @param topic the topic the partitions belong to
         * @param resolvedValue the value produced by expression resolution
         * @param result the list collecting the assignments
         * @throws IllegalStateException if a String partition is blank
         * @throws IllegalArgumentException if the value, or one of its elements, is of any
         * other type
         */
        @SuppressWarnings("unchecked")
        private void resolvePartitionAsInteger(String topic, Object resolvedValue,
                List<TopicPartitionInitialOffset> result) {
            if (resolvedValue instanceof String) {
                String text = (String) resolvedValue;
                Assert.state(StringUtils.hasText(text),
                        "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
                result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf(text)));
                return;
            }
            if (resolvedValue instanceof Integer) {
                result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));
                return;
            }
            if (resolvedValue instanceof String[]) {
                for (Object element : (String[]) resolvedValue) {
                    resolvePartitionAsInteger(topic, element, result);
                }
                return;
            }
            if (resolvedValue instanceof Integer[]) {
                // Integer elements are added directly rather than through the recursive path.
                for (Integer number : (Integer[]) resolvedValue) {
                    result.add(new TopicPartitionInitialOffset(topic, number));
                }
                return;
            }
            if (resolvedValue instanceof Iterable) {
                for (Object element : (Iterable<Object>) resolvedValue) {
                    resolvePartitionAsInteger(topic, element, result);
                }
                return;
            }
            throw new IllegalArgumentException(String.format(
                    "@LybGeekKafkaListener for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
        }
    
        /**
         * Evaluates the given attribute value as an expression and returns it as a String.
         * @param value the raw attribute value
         * @param attribute the attribute name, used only in the error message
         * @return the evaluated String, or {@code null} if the expression evaluates to {@code null}
         * @throws IllegalStateException if the expression evaluates to a non-String object
         */
        private String resolveExpressionAsString(String value, String attribute) {
            final Object evaluated = resolveExpression(value);
            if (evaluated == null) {
                return null;
            }
            if (!(evaluated instanceof String)) {
                throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
                        + "Resolved to [" + evaluated.getClass() + "] for [" + value + "]");
            }
            return (String) evaluated;
        }
    
        /**
         * Evaluates the given attribute value as an expression and returns it as an Integer.
         * A String result is parsed with {@code Integer.parseInt}; any other {@code Number}
         * is narrowed with {@code intValue()}.
         * @param value the raw attribute value
         * @param attribute the attribute name, used only in the error message
         * @return the integer value, or {@code null} if the expression evaluates to {@code null}
         * @throws NumberFormatException if a String result is not a parsable integer
         * @throws IllegalStateException if the result is neither a String nor a Number
         */
        private Integer resolveExpressionAsInteger(String value, String attribute) {
            final Object evaluated = resolveExpression(value);
            if (evaluated == null) {
                return null;
            }
            if (evaluated instanceof String) {
                return Integer.parseInt((String) evaluated);
            }
            if (evaluated instanceof Number) {
                return ((Number) evaluated).intValue();
            }
            throw new IllegalStateException(
                    "The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. "
                            + "Resolved to [" + evaluated.getClass() + "] for [" + value + "]");
        }
    
        /**
         * Evaluates the given attribute value as an expression and returns it as a Boolean.
         * A String result is converted with {@code Boolean.parseBoolean}.
         * @param value the raw attribute value
         * @param attribute the attribute name, used only in the error message
         * @return the boolean value, or {@code null} if the expression evaluates to {@code null}
         * @throws IllegalStateException if the result is neither a Boolean nor a String
         */
        private Boolean resolveExpressionAsBoolean(String value, String attribute) {
            final Object evaluated = resolveExpression(value);
            if (evaluated == null) {
                return null;
            }
            if (evaluated instanceof Boolean) {
                return (Boolean) evaluated;
            }
            if (evaluated instanceof String) {
                return Boolean.parseBoolean((String) evaluated);
            }
            throw new IllegalStateException(
                    "The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "
                            + "Resolved to [" + evaluated.getClass() + "] for [" + value + "]");
        }
    
        /**
         * Runs the value through {@link #resolve(String)} and then hands the outcome to the
         * configured expression resolver, evaluated against this processor's expression context.
         * @param value the raw attribute value
         * @return whatever the expression resolver produces for the value
         */
        private Object resolveExpression(String value) {
            final String embeddedValuesResolved = resolve(value);
            return this.resolver.evaluate(embeddedValuesResolved, this.expressionContext);
        }
    
        /**
         * Resolve embedded values in the given string when the bean factory supports it;
         * otherwise hand the string back untouched.
         * @param value the value to resolve
         * @return the resolved value, or {@code value} itself when the bean factory is absent
         * or is not a {@link ConfigurableBeanFactory}
         * @see ConfigurableBeanFactory#resolveEmbeddedValue
         */
        private String resolve(String value) {
            // instanceof is false for null, so a missing bean factory also lands here.
            if (!(this.beanFactory instanceof ConfigurableBeanFactory)) {
                return value;
            }
            ConfigurableBeanFactory configurableFactory = (ConfigurableBeanFactory) this.beanFactory;
            return configurableFactory.resolveEmbeddedValue(value);
        }
    
        /**
         * Registers every {@link Converter}, {@link GenericConverter} and {@link Formatter}
         * bean returned by {@code getBeansOfType} with the given registry, in that order.
         * @param registry the registry receiving the converters and formatters
         */
        private void addFormatters(FormatterRegistry registry) {
            // Plain converters first ...
            for (Converter<?, ?> converterBean : getBeansOfType(Converter.class)) {
                registry.addConverter(converterBean);
            }
            // ... then generic converters ...
            for (GenericConverter genericConverterBean : getBeansOfType(GenericConverter.class)) {
                registry.addConverter(genericConverterBean);
            }
            // ... and finally formatters.
            for (Formatter<?> formatterBean : getBeansOfType(Formatter.class)) {
                registry.addFormatter(formatterBean);
            }
        }
    
        /**
         * Looks up all beans of the given type in the bean factory.
         * @param type the bean type to look up
         * @param <T> the bean type
         * @return the matching bean instances, or an empty collection when the bean factory
         * is not a {@link ListableBeanFactory}
         */
        private <T> Collection<T> getBeansOfType(Class<T> type) {
            if (!(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory)) {
                return Collections.emptySet();
            }
            ListableBeanFactory listableFactory =
                    (ListableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory;
            return listableFactory.getBeansOfType(type).values();
        }
    
        /**
         * A {@link MessageHandlerMethodFactory} that forwards to a configurable delegate.
         * If no delegate has been set by the time the first handler method is requested, a
         * default one is built on demand. Useful if the factory to use is determined once the
         * endpoints have been registered but not created yet.
         * @see KafkaListenerEndpointRegistrar#setMessageHandlerMethodFactory
         */
        private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {

            /** Conversion service shared by the default factory and its header resolver. */
            private final DefaultFormattingConversionService defaultFormattingConversionService =
                    new DefaultFormattingConversionService();

            /** The delegate; {@code null} until set explicitly or created on first use. */
            private MessageHandlerMethodFactory messageHandlerMethodFactory;

            public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
                this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;
            }

            @Override
            public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
                MessageHandlerMethodFactory delegate = getMessageHandlerMethodFactory();
                return delegate.createInvocableHandlerMethod(bean, method);
            }

            /**
             * Returns the delegate, building the default factory first if none is present.
             */
            private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
                if (this.messageHandlerMethodFactory != null) {
                    return this.messageHandlerMethodFactory;
                }
                this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();
                return this.messageHandlerMethodFactory;
            }

            /**
             * Builds a {@link DefaultMessageHandlerMethodFactory} configured with the registrar's
             * validator (when present), the processor's bean factory, a conversion service that
             * also converts {@code byte[]} to {@code String}, and the argument resolvers.
             */
            private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
                DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();

                Validator validator = LybGeekKafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
                if (validator != null) {
                    factory.setValidator(validator);
                }
                factory.setBeanFactory(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory);

                this.defaultFormattingConversionService.addConverter(
                        new BytesToStringConverter(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.charset));
                factory.setConversionService(this.defaultFormattingConversionService);

                factory.setArgumentResolvers(buildArgumentResolvers(validator));

                factory.afterPropertiesSet();
                return factory;
            }

            /**
             * Assembles the argument resolvers in registration order: header, headers,
             * message, and finally a payload resolver that understands {@link KafkaNull}.
             * @param validator the validator passed to the payload resolver; may be {@code null}
             */
            private List<HandlerMethodArgumentResolver> buildArgumentResolvers(Validator validator) {
                ConfigurableBeanFactory configurableFactory = null;
                if (LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory) {
                    configurableFactory =
                            (ConfigurableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory;
                }

                List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>();

                // Annotation-based argument resolution
                resolvers.add(new HeaderMethodArgumentResolver(
                        this.defaultFormattingConversionService, configurableFactory));
                resolvers.add(new HeadersMethodArgumentResolver());

                // Type-based argument resolution
                final GenericMessageConverter payloadConverter =
                        new GenericMessageConverter(this.defaultFormattingConversionService);
                resolvers.add(new MessageMethodArgumentResolver(payloadConverter));
                resolvers.add(new PayloadArgumentResolver(payloadConverter, validator) {

                    @Override
                    public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
                        Object payload = super.resolveArgument(parameter, message);
                        if (!(payload instanceof List)) {
                            return payload;
                        }
                        // Replace KafkaNull list elements with null.
                        List<?> elements = (List<?>) payload;
                        for (int index = 0; index < elements.size(); index++) {
                            if (elements.get(index) instanceof KafkaNull) {
                                elements.set(index, null);
                            }
                        }
                        return payload;
                    }

                    @Override
                    protected boolean isEmptyPayload(Object payload) {
                        if (payload == null) {
                            return true;
                        }
                        return payload instanceof KafkaNull;
                    }

                });
                return resolvers;
            }

        }
    
        /**
         * {@link Converter} that decodes a byte array into a String using the charset
         * supplied at construction time.
         */
        private static class BytesToStringConverter implements Converter<byte[], String> {

            /** Charset used to decode the incoming bytes. */
            private final Charset charset;

            BytesToStringConverter(Charset charset) {
                this.charset = charset;
            }

            @Override
            public String convert(byte[] source) {
                final String decoded = new String(source, this.charset);
                return decoded;
            }

        }
    
        /**
         * A minimal {@link Scope} backed by a map of listener beans keyed by name.
         * Lookups only ever return beans previously registered through
         * {@link #addListener(String, Object)}; the remaining {@link Scope} operations are
         * no-ops.
         */
        private static class ListenerScope implements Scope {

            /** Registered listener beans by key. */
            private final Map<String, Object> listeners = new HashMap<>();

            ListenerScope() {
            }

            /** Registers (or replaces) the bean stored under the given key. */
            public void addListener(String key, Object bean) {
                this.listeners.put(key, bean);
            }

            /** Drops the bean stored under the given key, if any. */
            public void removeListener(String key) {
                this.listeners.remove(key);
            }

            /**
             * Returns the bean registered under {@code name}, or {@code null} if there is none.
             * The supplied object factory is never consulted.
             */
            @Override
            public Object get(String name, ObjectFactory<?> objectFactory) {
                final Object registered = this.listeners.get(name);
                return registered;
            }

            /** Does nothing and always returns {@code null}; the map is left untouched. */
            @Override
            public Object remove(String name) {
                return null;
            }

            /** Destruction callbacks are ignored. */
            @Override
            public void registerDestructionCallback(String name, Runnable callback) {
                // intentionally empty
            }

            /** Returns the bean registered under {@code key}, or {@code null} if there is none. */
            @Override
            public Object resolveContextualObject(String key) {
                final Object registered = this.listeners.get(key);
                return registered;
            }

            /** This scope has no conversation id. */
            @Override
            public String getConversationId() {
                return null;
            }

        }
    
    }
    
    

    业务侧如何使用

    示例

    /**
     * Sample consumer for {@code Constant.USER_TOPIC} built on the idempotent template.
     * The message body is parsed as a JSON {@code User}; a message counts as a repeat when
     * a user with the same username already exists, otherwise the user is saved.
     * <p>NOTE(review): the parsed user is dereferenced without a null check, and the
     * existence check and the save are two separate calls. Whether empty payloads or
     * concurrent duplicate deliveries can occur depends on producers and on
     * {@code UserService}, neither of which is shown here — confirm.
     */
    @LybGeekKafkaListener(id = "createUser",topics = Constant.USER_TOPIC)
    public class UserComsumer extends BaseComusmeListener {

        @Autowired
        private UserService userService;

        /**
         * Reports the message as already consumed when its username is already present.
         */
        @Override
        public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) {
            User parsedUser = toUser(kafkaComsumePayLoad);
            String username = parsedUser.getUsername();
            return userService.isExistUserByUsername(username);
        }

        /**
         * Saves the user carried by the message and returns the outcome of the save call.
         */
        @Override
        public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) {
            User parsedUser = toUser(kafkaComsumerPayLoad);
            return userService.save(parsedUser);
        }

        /** Parses the payload's data as a JSON {@code User}. */
        private User toUser(KafkaComsumePayLoad payLoad) {
            return JSON.parseObject(payLoad.getData(), User.class);
        }
    }
    

    总结

    有时候我们在宣导一些事情时,往往会发现即使已经说了N遍,事情仍然会出现纰漏。这时候可以考虑把想宣导的东西工具化,通过工具来约束和规范。比如有些业务场景,部分开发人员可能考虑得不够全面,我们就可以基于业务把核心场景抽象成方法,开发人员只需基于这些抽象方法做具体实现即可。

    demo链接

    https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-template

    相关文章

      网友评论

          本文标题:聊聊如何实现一个带幂等模板的Kafka消费者

          本文链接:https://www.haomeiwen.com/subject/hjkjprtx.html