美文网首页
聊聊cheddar的MessageSender

聊聊cheddar的MessageSender

作者: go4it | 来源:发表于2021-03-27 21:12 被阅读0次

    本文主要研究一下cheddar的MessageSender

    MessageSender

    Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java

    public interface MessageSender<T extends Message> {
    
        /**
         * Send a message
         * @param message Message to send
         * @throws MessageSendException
         */
        void send(T message) throws MessageSendException;
    
        /**
         * Send a message, where the message is not visible to receivers for the specified delay duration
         * @param message Message to send
         * @param delaySeconds Duration for which sent message is invisible to receivers
         * @throws MessageSendException
         */
        void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;
    }
    

    MessageSender接口定义了send、sendDelayedMessage方法

    MessageSenderImpl

    Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSenderImpl.java

    public class MessageSenderImpl<T extends Message> implements MessageSender<T> {
    
        private final MessageQueue<T> messageQueue;
    
        public MessageSenderImpl(final MessageQueue<T> messageQueue) {
            this.messageQueue = messageQueue;
        }
    
        @Override
        public void send(final T message) throws MessageSendException {
            messageQueue.send(message);
        }
    
        @Override
        public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException {
            messageQueue.sendDelayedMessage(message, delaySeconds);
        }
    
    }
    

    MessageSenderImpl实现了MessageSender接口,其send方法委托给了messageQueue.send;其sendDelayedMessage方法委托给了messageQueue.sendDelayedMessage

    MessageQueue

    Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageQueue.java

    public interface MessageQueue<T extends Message> {
    
        /**
         * @return The queue name
         */
        String getName();
    
        /**
         * Send a message to this message queue
         * @param message Message to send
         * @throws MessageSendException
         */
        void send(T message) throws MessageSendException;
    
        /**
         * Send a message to this message queue; the message is not visible to receivers for the specified delay duration
         * @param message Message to send
         * @param delaySeconds Duration for which sent message is invisible to receivers
         * @throws MessageSendException
         */
        void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException;
    
        /**
         * Receives any number of messages on this queue, but does not delete them. No order or priority of messages is
         * guaranteed.
         * @return List of received {@code Message}s
         * @throws MessageReceiveException
         */
        List<T> receive() throws MessageReceiveException;
    
        /**
         * Receives any number of messages on this queue up to the maximum specified, but does not delete them. No order or
         * priority of messages is guaranteed. This call will spend up to the wait time given for a message to arrive in the
         * queue before returning.
         * @param waitTimeSeconds The duration (in seconds) for which the call will wait for a message to arrive in the
         *            queue before returning. If a message is available, the call will return sooner.
         * @param maxMessages The maximum number of messages to return. Will never return more messages than this value but
         *            may return fewer. Values can be from 1 to 10.
         * @return List of received {@code Message}s
         * @throws MessageReceiveException
         */
        List<T> receive(int waitTimeSeconds, int maxMessages) throws MessageReceiveException;
    
        /**
         * Deletes a message previously received from this queue.
         * @param typedMessage {@code Message} to delete
         * @throws MessageDeleteException
         */
        void delete(T message) throws MessageDeleteException;
    
    }
    

    MessageQueue接口定义了getName、send、sendDelayedMessage、receive、delete方法

    InMemoryMessageQueue

    Cheddar/cheddar/cheddar-integration-mocks/src/main/java/com/clicktravel/infrastructure/messaging/inmemory/InMemoryMessageQueue.java

    public class InMemoryMessageQueue<T extends Message> implements MessageQueue<T>, Resettable {
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final String name;
        private final InMemoryMessageQueuePoller inMemoryMessageQueuePoller;
    
        @SuppressWarnings("unchecked")
        public InMemoryMessageQueue(final String name, final InMemoryMessageQueuePoller inMemoryMessageQueuePoller,
                final InMemoryExchange<T>... inMemoryExchanges) {
            this.name = name;
            this.inMemoryMessageQueuePoller = inMemoryMessageQueuePoller;
    
            final List<String> exchangeNames = new ArrayList<>();
            for (final InMemoryExchange<T> inMemoryExchange : inMemoryExchanges) {
                inMemoryExchange.addSubscriber(this);
                exchangeNames.add(inMemoryExchange.getName());
            }
            logger.info("Using in memory message queue: " + name + " with subscriptions to these exchanges: ["
                    + StringUtils.join(exchangeNames) + "]");
        }
    
        @Override
        public void send(final T message) {
            queue.add(message);
            inMemoryMessageQueuePoller.poll();
        }
    
        @Override
        public void sendDelayedMessage(final T message, final int delaySeconds) {
            send(message); // delay not supported
        }
    
        @Override
        public String getName() {
            return name;
        }
    
        @Override
        public List<T> receive(final int waitTimeSeconds, final int maxMessages) {
            return receive();
        }
    
        @Override
        public List<T> receive() {
            final T message = queue.peek();
            final List<T> messages = new ArrayList<T>(1);
            if (message != null) {
                messages.add(message);
            }
            return messages;
        }
    
        @Override
        public void delete(final T message) {
            queue.remove(message);
        }
    
        @Override
        public String toString() {
            return "InMemoryMessageQueue [name=" + name + ", queue=" + queue + "]";
        }
    
        @Override
        public void reset() {
            queue.clear();
        }
    }
    

    InMemoryMessageQueue实现了MessageQueue、Resettable接口,它定义了ConcurrentLinkedQueue及InMemoryMessageQueuePoller两个属性;send方法会往queue添加message,然后执行inMemoryMessageQueuePoller.poll();sendDelayedMessage方法目前不支持

    SqsMessageQueue

    Cheddar/cheddar/cheddar-integration-aws/src/main/java/com/clicktravel/infrastructure/messaging/aws/sqs/SqsMessageQueue.java

    public abstract class SqsMessageQueue<T extends Message> implements MessageQueue<T> {
    
        private final SqsQueueResource sqsQueueResource;
    
        public SqsMessageQueue(final SqsQueueResource sqsQueueResource) {
            this.sqsQueueResource = sqsQueueResource;
        }
    
        protected abstract String toSqsMessageBody(final T message);
    
        protected abstract T toMessage(final com.amazonaws.services.sqs.model.Message sqsMessage);
    
        @Override
        public String getName() {
            return sqsQueueResource.getQueueName();
        }
    
        @Override
        public void send(final T message) throws MessageSendException {
            try {
                sqsQueueResource.sendMessage(toSqsMessageBody(message));
            } catch (final AmazonClientException e) {
                throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName()
                        + "]", e);
            }
        }
    
        @Override
        public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException {
            try {
                sqsQueueResource.sendDelayedMessage(toSqsMessageBody(message), delaySeconds);
            } catch (final AmazonClientException e) {
                throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName()
                        + "]", e);
            }
        }
    
        @Override
        public List<T> receive() throws MessageReceiveException {
            try {
                return toMessages(sqsQueueResource.receiveMessages());
            } catch (final AmazonClientException e) {
                throw new MessageReceiveException("Unable to receive messages on SQS queue:["
                        + sqsQueueResource.getQueueName() + "]", e);
            }
        }
    
        @Override
        public List<T> receive(final int waitTimeSeconds, final int maxMessages) throws MessageReceiveException {
            try {
                return toMessages(sqsQueueResource.receiveMessages(waitTimeSeconds, maxMessages));
            } catch (final AmazonClientException e) {
                throw new MessageReceiveException("Unable to receive messages on SQS queue:["
                        + sqsQueueResource.getQueueName() + "]", e);
            }
        }
    
        private List<T> toMessages(final List<com.amazonaws.services.sqs.model.Message> sqsMessages) {
            final ArrayList<T> messages = new ArrayList<>();
            for (final com.amazonaws.services.sqs.model.Message sqsMessage : sqsMessages) {
                messages.add(toMessage(sqsMessage));
            }
            return messages;
        }
    
        @Override
        public void delete(final T message) throws MessageDeleteException {
            try {
                sqsQueueResource.deleteMessage(message.getReceiptHandle());
            } catch (final AmazonClientException e) {
                throw new MessageDeleteException("Unable to delete message on SQS queue:["
                        + sqsQueueResource.getQueueName() + "]", e);
            }
        }
    
        public SqsQueueResource getSqsQueue() {
            return sqsQueueResource;
        }
    }
    

    SqsMessageQueue是个抽象类,声明实现MessageQueue接口,其send方法委托给了sqsQueueResource.sendMessage;其sendDelayedMessage方法委托给了sqsQueueResource.sendDelayedMessage

    小结

    cheddar的MessageSender接口定义了send、sendDelayedMessage方法;MessageSenderImpl实现了MessageSender接口,其send方法委托给了messageQueue.send;其sendDelayedMessage方法委托给了messageQueue.sendDelayedMessage;InMemoryMessageQueue和SqsMessageQueue提供了两种实现,其中inMemory的实现不支持sendDelayedMessage方法。

    doc

    相关文章

      网友评论

          本文标题:聊聊cheddar的MessageSender

          本文链接:https://www.haomeiwen.com/subject/gexehltx.html