ActiveMQ-API(五)

作者: Airycode | 来源:发表于2018-06-01 14:29 被阅读18次

负载均衡(取模做负载均衡)简单利用线程实现吞吐量栗子实现:

package bhz.mq.action;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

    //单例模式
    //1:连接工厂
    private ConnectionFactory connectionFactory;
    //2:连接对象
    private Connection connection;
    
    //3:Session对象
    private Session session;
    
    //4:生产者
    private MessageProducer messageProducer;
    
    public Producer(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.messageProducer = this.session.createProducer(null);
        } catch (Exception e) {
            e.printStackTrace();    
        }
    }
    
    public Session getSession(){
        return this.session;
    }
    
    public void send1(){
        try {
            Destination destination = this.session.createQueue("first");
            for (int i=0;i<100;i++) {
                MapMessage msg = this.session.createMapMessage();
                int id = i;
                msg.setInt("id", id);
                msg.setString("name", "张"+i);
                msg.setStringProperty("age", ""+i);
                String receiver = id%2==0?"A":"B";
                msg.setStringProperty("receiver", receiver);
                this.messageProducer.send(destination, msg, DeliveryMode.NON_PERSISTENT, 2,1000*60*10L);
                System.out.println("message send id :"+id);
            }
            
            
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public void send2(){
        try {
            Destination destination = this.session.createQueue("first");
            TextMessage msg = this.session.createTextMessage("我是一个字符串内容");
            this.messageProducer.send(destination,msg,DeliveryMode.NON_PERSISTENT,9,1000*60*10L);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        Producer p = new Producer();
        p.send1();
    }
    
    
}

package bhz.mq.action;

import javax.jms.MapMessage;

public class MessageTask implements Runnable{

    private MapMessage message;
    
    public MessageTask(MapMessage message){
        this.message = message;
    }
    
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println("当前线程:"+Thread.currentThread().getName()+"处理任务:"+this.message.getString("id"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }

}

package bhz.mq.action;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerA {

    public final String SELECTOR="receiver = 'A'";
    
    //1:连接工厂
    private ConnectionFactory connectionFactory;
    //2:连接对象
    private Connection connection;
        
    //3:Session对象
    private Session session;
    
    //4:消费者
    private MessageConsumer messageConsumer;
    
    //5:目标地址
    private Destination destination;
    
    public ConsumerA(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.destination=this.session.createQueue("first");
            //创建消费者的时候发生了变化
            this.messageConsumer = this.session.createConsumer(this.destination, SELECTOR);
            System.out.println("Consumer A start...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void receiver(){
        try {
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    class Listener implements MessageListener{

        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
        ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),20,120L,TimeUnit.SECONDS,queue);
        
        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    
                }
                if (message instanceof MapMessage) {
                    MapMessage ret = (MapMessage)message;
                    executor.execute(new MessageTask(ret));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
    
    public static void main(String[] args) {
        ConsumerA c = new ConsumerA();
        c.receiver();
    }
    
}

package bhz.mq.action;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerB {

    public final String SELECTOR="receiver = 'B'";
    
    //1:连接工厂
    private ConnectionFactory connectionFactory;
    //2:连接对象
    private Connection connection;
        
    //3:Session对象
    private Session session;
    
    //4:消费者
    private MessageConsumer messageConsumer;
    
    //5:目标地址
    private Destination destination;
    
    public ConsumerB(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.destination=this.session.createQueue("first");
            //创建消费者的时候发生了变化
            this.messageConsumer = this.session.createConsumer(this.destination, SELECTOR);
            System.out.println("Consumer B start...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void receiver(){
        try {
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    class Listener implements MessageListener{

        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
        ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),20,120L,TimeUnit.SECONDS,queue);
        
        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    
                }
                if (message instanceof MapMessage) {
                    MapMessage ret = (MapMessage)message;
                    executor.execute(new MessageTask(ret));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
    
    public static void main(String[] args) {
        ConsumerB c = new ConsumerB();
        c.receiver();
    }
    
}

相关文章

  • ActiveMQ-API(五)

    负载均衡(取模做负载均衡)简单利用线程实现吞吐量栗子实现:

  • ActiveMQ-API(一)

    1:Connection方法的使用:在成功创建正确的ConnectionFactory后,下一步将是创建一个连接,...

  • ActiveMQ-API(四)

    创建临时消息ActiveMQ通过createTemporaryQueue和createTemporaryTopic...

  • ActiveMQ-API(二)

    MessageProducer:MessageProducer是一个由Session创建的对象,用来向Destin...

  • ActiveMQ-API(三)

    MessageConsumer是一个由Session创建的对象,用来从Destination接收消息。1Messa...

  • 五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张

    五张试一下五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张...

  • 五张

    五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五张五...

  • 五触,五感,五识,五受、五觉

    从外尘到了我们的眼,就是五触,这里会有一瞬时记忆,包括眼中的记忆,耳朵中的记忆,及 从五触到大脑的红色部分,叫着五...

  • 五  月  五

    今夜,城市的灯光淹没了月牙儿, 我坐在窗台看街前的树叶摆动, 有一丝风透过纱窗, 一点微凉, 散懒的云飘着几朵, ...

  • 五块五

    回到家后 奶奶说 买馒头找了15.5 小孩子说拿了五毛买东西 可不见了五块 奶奶就去小卖部问老板 小孩拿了几块去买...

网友评论

    本文标题:ActiveMQ-API(五)

    本文链接:https://www.haomeiwen.com/subject/vitksftx.html