美文网首页
4.JMS的规范(一)

4.JMS的规范(一)

作者: 21号新秀_邓肯 | 来源:发表于2021-05-03 20:43 被阅读0次

1. JMS是什么

什么是Java消息服务?

Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。

image.png image.png

2. 消息头

JMS的消息头属性:

  • JMSDestination: 消息目的地
  • JMSDeliverymode: 消息持久化模式
  • JMSExpiration: 消息过期时间
  • JMSPriority: 消息的优先级
  • JMSMessageID: 消息的唯一标识符. 后面我们会介绍如何解决幂等性.

消息的生产者可以set这些属性, 消息的消费者可以get 这些属性. 这些属性在 send 方法里面也可以设置

package  com.at.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduce_topic {

    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws  Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);

        for (int i = 1; i < 4 ; i++) {
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            //******************************** 设置属性 ***************
            // 这里可以指定每个消息的目的地
            textMessage.setJMSDestination(topic);
            /*
            持久模式和非持久模式。
            一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。
            一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。
             */
            textMessage.setJMSDeliveryMode(0);
            /*
            可以设置消息在一定时间后过期,默认是永不过期。
            消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。
            如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。
            如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。
             */
            textMessage.setJMSExpiration(1000);
            /*  消息优先级,从0-9十个级别,0-4是普通消息5-9是加急消息。
            JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。
             */
            textMessage.setJMSPriority(10);
            // 唯一标识每个消息的标识。MQ会给我们默认生成一个,我们也可以自己指定。
            textMessage.setJMSMessageID("ABCD");
            // 上面有些属性在send方法里也能设置
            messageProducer.send(textMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
    }
}

3. 消息体

  • 封装具体的消息数据
  • 5种消息体格式
  • 发送和接受的消息体必须一直对应

5种消息体格式:

  • TextMessage: 普通字符串消息, 包含一个string
  • MapMessage: 一个Map 类型的消息, key 为string 类型
  • BytesMessage : 二进制数组消息, 包含一个 byte []
  • StreamMessage: Java数据流消息, 用标准操作来 顺序的填充和读取
  • ObjectMessage: 对象消息, 包含一个可序列化的Java对象

消息生产者:

package  com.at.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduce_topic {

    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws  Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
         javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
         connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);

        for (int i = 1; i < 4 ; i++) {
            // 发送TextMessage消息体
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            messageProducer.send(textMessage);
            // 发送MapMessage  消息体。set方法: 添加,get方式:获取
            MapMessage  mapMessage = session.createMapMessage();
            mapMessage.setString("name", "张三"+i);
            mapMessage.setInt("age", 18+i);
            messageProducer.send(mapMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
    }
}

消息消费者

package  com.at.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsConsummer_topic {
    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageConsumer messageConsumer = session.createConsumer(topic);

        messageConsumer.setMessageListener( (message) -> {
 // 判断消息是哪种类型之后,再强转。
            if (null != message  && message instanceof TextMessage){
                   TextMessage textMessage = (TextMessage)message;
                    try {
                      System.out.println("****消费者text的消息:"+textMessage.getText());
                    }catch (JMSException e) {
                    }
                }
            if (null != message  && message instanceof MapMessage){
                MapMessage mapMessage = (MapMessage)message;
                try {
                    System.out.println("****消费者的map消息:"+mapMessage.getString("name"));
                    System.out.println("****消费者的map消息:"+mapMessage.getInt("age"));
                }catch (JMSException e) {
                }
            }

        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

4. 消息属性

如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。

他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。

消息生产者

package  com.at.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduce_topic {

    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws  Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);

        for (int i = 1; i < 4 ; i++) {
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            // 调用Message的set*Property()方法,就能设置消息属性。根据value的数据类型的不同,有相应的API。
            textMessage.setStringProperty("From","ZhangSan@qq.com");
            textMessage.setByteProperty("Spec", (byte) 1);
            textMessage.setBooleanProperty("Invalide",true);
            messageProducer.send(textMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
    }
}

消息消费者

package  com.at.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsConsummer_topic {
    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageConsumer messageConsumer = session.createConsumer(topic);

        messageConsumer.setMessageListener( (message) -> {
            if (null != message  && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                      System.out.println("消息体:"+textMessage.getText());
                      System.out.println("消息属性:"+textMessage.getStringProperty("From"));
                      System.out.println("消息属性:"+textMessage.getByteProperty("Spec"));
                      System.out.println("消息属性:"+textMessage.getBooleanProperty("Invalide"));
                    }catch (JMSException e) {
                    }
                }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

相关文章

  • 4.JMS的规范(一)

    1. JMS是什么 什么是Java消息服务? Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准...

  • 4.JMS的规范(二)

    5. 消息的持久化 什么是持久化消息? 保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其...

  • css命名整理

    文章整理了Web前端开发中的各种CSS规范,包括文件规范、注释规范、命名规范、书写规范、测试规范等。 一、文件规范...

  • css命名规范整理

    文章整理了Web前端开发中的各种CSS规范,包括文件规范、注释规范、命名规范、书写规范、测试规范等。 一、文件规范...

  • MySQL 开发规范

    MySQL开发规范由基础规范、命名规范、字段规范、索引规范、语句规范组成。 1、基础规范 (1)所有表统一使用In...

  • 10086 APP设计、标注规范

    这一套移动视觉APPUI设计规范,包含了界面布局、颜色、文字规范、按钮规范、图标规范、图片规范、列表规范、控件规范...

  • PM篇

    PM 技术篇1.开发规范命名规范,异常处理规范,日志规范,统一框架,代码commit规范,代码评审规范,统一API...

  • APP规范实例(详细的UI设计方法)

    这一套UI设计规范,包含了界面布局、颜色、文字规范、按钮规范、图标规范、图片规范、列表规范、控件规范、弹出浮层,超...

  • 武当剑的美术规范

    动作规范 模型规范 贴图规范 场景规范 特效规范 UI贴图规范

  • MySQL运维及开发规范

    MySQL运维及开发规范 一.基础规范 二.命名规范 库、表、字段开发设计规范 四.索引规范 五.SQL规范 六....

网友评论

      本文标题:4.JMS的规范(一)

      本文链接:https://www.haomeiwen.com/subject/plwtdltx.html