- 生产者:
/**
 * Producer of the direct-exchange routing demo.
 *
 * <p>Declares the direct exchange {@code test_exchange_direct} and publishes a
 * single message to it with the routing key {@code info}.
 */
public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_direct";

    /**
     * Publishes one message and then releases the channel and the connection.
     *
     * @param args unused
     * @throws IOException if declaring the exchange, publishing, or closing fails
     */
    public static void main(String[] args) throws IOException {
        Connection connections = ConnectionUtils.getConnections();
        try {
            Channel channel = connections.createChannel();
            try {
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                String msg = "routing test";
                String routingKey = "info";
                System.out.println("send " + msg);
                // Encode explicitly as UTF-8: the consumers decode the body with "utf-8",
                // so the platform default charset must not leak into the payload.
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes("UTF-8"));
            } finally {
                // Close the channel even when declaring or publishing failed.
                channel.close();
            }
        } finally {
            // Close the connection even when the channel could not be created or closed.
            connections.close();
        }
    }
}
- 消费者1:
public class Recv1 {
private static final String QUEUE_NAME="erro_info_manage";
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException {
Connection connections = ConnectionUtils.getConnections();
final Channel channel = connections.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg=new String(body,"utf-8");
System.out.println("recv1"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME,false, defaultConsumer);
}
- 消费者2:
/**
 * Consumer 2 of the direct-exchange routing demo.
 *
 * <p>Binds the queue {@code all_info_manage} to the exchange
 * {@code test_exchange_direct} with the routing keys {@code error},
 * {@code info} and {@code warning}.
 */
public class Recv2 {
    private static final String QUEUE_NAME="all_info_manage";
    private static final String EXCHANGE_NAME = "test_exchange_direct";

    /**
     * Declares the exchange and queue, binds the queue with three routing keys,
     * and registers a consumer that prints each message and acknowledges it manually.
     *
     * @param args unused
     * @throws IOException if any broker operation fails
     */
    public static void main(String[] args) throws IOException {
        Connection connections = ConnectionUtils.getConnections();
        final Channel channel = connections.createChannel();
        // Declare the exchange here as well, so the binds below do not depend on
        // the producer having been started first.
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Prefetch count of 1 (see the Channel.basicQos documentation).
        channel.basicQos(1);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("recv2 " + msg);
                boolean interrupted = false;
                try {
                    // Simulated processing time.
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
                try {
                    // Acknowledge whether or not the sleep was interrupted, as before.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } finally {
                    if (interrupted) {
                        // Restore the interrupt status only after the ack has been
                        // attempted, so the caller can still observe the interruption.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
        // Second argument false = manual acknowledgement (done in handleDelivery).
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}
此时只有消费者2能收到消息：生产者发送时使用的路由键是 info，而消费者1的队列只绑定了 error，消费者2的队列同时绑定了 error、info、warning。
网友评论