负载均衡(通过取模做负载均衡):简单利用线程池提高吞吐量的例子实现:
package bhz.mq.action;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Demo JMS producer that publishes messages to the ActiveMQ queue {@code "first"}.
 *
 * <p>The constructor opens a connection to {@code tcp://localhost:61616}, starts it and
 * creates a non-transacted, auto-acknowledge session plus an unidentified producer (the
 * destination is supplied on every {@code send} call). Call {@link #close()} when done so
 * the connection is not left open.
 */
public class Producer {
    // 1: connection factory
    private ConnectionFactory connectionFactory;
    // 2: connection
    private Connection connection;
    // 3: session
    private Session session;
    // 4: message producer
    private MessageProducer messageProducer;

    /**
     * Connects to the broker and prepares the session and producer.
     *
     * <p>Failures are reported on stderr (as before); in addition, a connection that was
     * opened before the failure is closed again instead of being leaked.
     */
    public Producer() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz", "bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // null destination: the target queue is passed to each send(...) call.
            this.messageProducer = this.session.createProducer(null);
        } catch (Exception e) {
            e.printStackTrace();
            closeConnection();
        }
    }

    /**
     * Returns the session created by the constructor.
     *
     * @return the JMS session, or {@code null} if the constructor failed before creating it
     */
    public Session getSession() {
        return this.session;
    }

    /**
     * Sends 100 {@link MapMessage}s (ids 0..99) to the queue {@code "first"}.
     *
     * <p>Each message carries the map entries {@code id} and {@code name} and the string
     * properties {@code age} and {@code receiver}. {@code receiver} is {@code "A"} for even
     * ids and {@code "B"} for odd ids, which is what consumers select on. Messages are sent
     * non-persistent, with priority 2 and a time-to-live of 10 minutes.
     */
    public void send1() {
        try {
            Destination destination = this.session.createQueue("first");
            for (int i = 0; i < 100; i++) {
                MapMessage msg = this.session.createMapMessage();
                int id = i;
                msg.setInt("id", id);
                msg.setString("name", "张" + i);
                msg.setStringProperty("age", "" + i);
                // Modulo routing: even ids go to receiver A, odd ids to receiver B.
                String receiver = id % 2 == 0 ? "A" : "B";
                msg.setStringProperty("receiver", receiver);
                this.messageProducer.send(destination, msg, DeliveryMode.NON_PERSISTENT, 2, 1000 * 60 * 10L);
                System.out.println("message send id :" + id);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends a single {@link TextMessage} to the queue {@code "first"}, non-persistent, with
     * priority 9 and a time-to-live of 10 minutes. No {@code receiver} property is set.
     */
    public void send2() {
        try {
            Destination destination = this.session.createQueue("first");
            TextMessage msg = this.session.createTextMessage("我是一个字符串内容");
            this.messageProducer.send(destination, msg, DeliveryMode.NON_PERSISTENT, 9, 1000 * 60 * 10L);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the underlying JMS connection. Safe to call more than once and safe to call
     * when the constructor failed. Errors while closing are printed, not thrown.
     */
    public void close() {
        closeConnection();
    }

    // Closes the connection if one is open; per the JMS API, closing a connection also
    // closes the sessions and producers created from it.
    private void closeConnection() {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            this.connection = null;
        }
    }

    /** Sends the demo batch and then releases the connection. */
    public static void main(String[] args) {
        Producer p = new Producer();
        try {
            p.send1();
        } finally {
            p.close();
        }
    }
}
package bhz.mq.action;
import javax.jms.MapMessage;
/**
 * Unit of work that simulates processing one received {@link MapMessage}: it sleeps for
 * 500 ms and then prints the executing thread's name together with the message's
 * {@code id} entry.
 */
public class MessageTask implements Runnable {
    private final MapMessage message;

    /**
     * @param message the message to process; its {@code id} entry is read in {@link #run()}
     */
    public MessageTask(MapMessage message) {
        this.message = message;
    }

    /**
     * Simulates the work and logs the result. Exceptions are printed rather than thrown.
     * If the thread is interrupted while sleeping, the interrupt flag is restored so the
     * owning executor can still observe the cancellation.
     */
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println("当前线程:" + Thread.currentThread().getName() + "处理任务:" + this.message.getString("id"));
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again for the pool.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package bhz.mq.action;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Demo JMS consumer for the ActiveMQ queue {@code "first"} that only receives messages
 * whose {@code receiver} string property equals {@code 'A'}, and hands each received
 * {@link MapMessage} to a thread pool for processing.
 */
public class ConsumerA {
    /** JMS message selector applied when the consumer is created. */
    public final String SELECTOR = "receiver = 'A'";

    // Upper bound on the number of worker threads in the listener's pool.
    private static final int MAX_POOL_SIZE = 20;
    // Capacity of the bounded queue that buffers tasks waiting for a worker.
    private static final int QUEUE_CAPACITY = 10000;

    // 1: connection factory
    private ConnectionFactory connectionFactory;
    // 2: connection
    private Connection connection;
    // 3: session
    private Session session;
    // 4: message consumer
    private MessageConsumer messageConsumer;
    // 5: destination
    private Destination destination;

    /**
     * Connects to the broker at {@code tcp://localhost:61616}, starts the connection and
     * creates a selector-filtered consumer on the queue. Failures are printed, not thrown.
     */
    public ConsumerA() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz", "bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.destination = this.session.createQueue("first");
            // Unlike a plain consumer, this one is created with a message selector.
            this.messageConsumer = this.session.createConsumer(this.destination, SELECTOR);
            System.out.println("Consumer A start...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Registers the asynchronous listener; failures are printed, not thrown. */
    public void receiver() {
        try {
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Listener that dispatches every {@link MapMessage} to a worker pool so the JMS
     * delivery thread is not blocked by the processing itself.
     */
    class Listener implements MessageListener {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
        // The core size is clamped to MAX_POOL_SIZE: ThreadPoolExecutor rejects a core size
        // larger than the maximum with IllegalArgumentException, which would otherwise
        // happen on machines with more than 20 processors.
        // CallerRunsPolicy: if the pool and queue are saturated, the delivering thread runs
        // the task itself (back-pressure) instead of the message being dropped after it has
        // already been auto-acknowledged.
        // Note: with a bounded queue, threads beyond the core size are only started once
        // the queue is full.
        ExecutorService executor = new ThreadPoolExecutor(
                Math.min(Runtime.getRuntime().availableProcessors(), MAX_POOL_SIZE),
                MAX_POOL_SIZE,
                120L,
                TimeUnit.SECONDS,
                queue,
                new ThreadPoolExecutor.CallerRunsPolicy());

        /**
         * Submits map messages to the pool; any other message type (e.g. text messages)
         * is ignored.
         */
        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof MapMessage) {
                    MapMessage ret = (MapMessage) message;
                    executor.execute(new MessageTask(ret));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Starts consumer A and begins listening. */
    public static void main(String[] args) {
        ConsumerA c = new ConsumerA();
        c.receiver();
    }
}
package bhz.mq.action;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Demo JMS consumer for the ActiveMQ queue {@code "first"} that only receives messages
 * whose {@code receiver} string property equals {@code 'B'}, and hands each received
 * {@link MapMessage} to a thread pool for processing.
 */
public class ConsumerB {
    /** JMS message selector applied when the consumer is created. */
    public final String SELECTOR = "receiver = 'B'";

    // Upper bound on the number of worker threads in the listener's pool.
    private static final int MAX_POOL_SIZE = 20;
    // Capacity of the bounded queue that buffers tasks waiting for a worker.
    private static final int QUEUE_CAPACITY = 10000;

    // 1: connection factory
    private ConnectionFactory connectionFactory;
    // 2: connection
    private Connection connection;
    // 3: session
    private Session session;
    // 4: message consumer
    private MessageConsumer messageConsumer;
    // 5: destination
    private Destination destination;

    /**
     * Connects to the broker at {@code tcp://localhost:61616}, starts the connection and
     * creates a selector-filtered consumer on the queue. Failures are printed, not thrown.
     */
    public ConsumerB() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz", "bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.destination = this.session.createQueue("first");
            // Unlike a plain consumer, this one is created with a message selector.
            this.messageConsumer = this.session.createConsumer(this.destination, SELECTOR);
            System.out.println("Consumer B start...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Registers the asynchronous listener; failures are printed, not thrown. */
    public void receiver() {
        try {
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Listener that dispatches every {@link MapMessage} to a worker pool so the JMS
     * delivery thread is not blocked by the processing itself.
     */
    class Listener implements MessageListener {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
        // The core size is clamped to MAX_POOL_SIZE: ThreadPoolExecutor rejects a core size
        // larger than the maximum with IllegalArgumentException, which would otherwise
        // happen on machines with more than 20 processors.
        // CallerRunsPolicy: if the pool and queue are saturated, the delivering thread runs
        // the task itself (back-pressure) instead of the message being dropped after it has
        // already been auto-acknowledged.
        // Note: with a bounded queue, threads beyond the core size are only started once
        // the queue is full.
        ExecutorService executor = new ThreadPoolExecutor(
                Math.min(Runtime.getRuntime().availableProcessors(), MAX_POOL_SIZE),
                MAX_POOL_SIZE,
                120L,
                TimeUnit.SECONDS,
                queue,
                new ThreadPoolExecutor.CallerRunsPolicy());

        /**
         * Submits map messages to the pool; any other message type (e.g. text messages)
         * is ignored.
         */
        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof MapMessage) {
                    MapMessage ret = (MapMessage) message;
                    executor.execute(new MessageTask(ret));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Starts consumer B and begins listening. */
    public static void main(String[] args) {
        ConsumerB c = new ConsumerB();
        c.receiver();
    }
}
网友评论