/**
 * Demo publisher: sends ten short text messages to the exchange named
 * {@code exchange} using the routing key {@code return.key}.
 */
public class Producer {
    /**
     * Connects to the broker, publishes {@code "hello world0"} through
     * {@code "hello world9"}, and then closes the connection.
     *
     * @param args command-line arguments; not used
     * @throws IOException propagated from the RabbitMQ client calls (connect, publish, close)
     * @throws TimeoutException propagated from {@code ConnectionFactory.newConnection()}
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.6.99");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // try-with-resources guarantees the connection is released even if a publish fails.
        // Closing a connection also closes every channel opened on it, so the channel
        // does not need a separate close call.
        try (Connection connection = factory.newConnection()) {
            Channel channel = connection.createChannel();
            for (int i = 0; i < 10; i++) {
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                byte[] payload = ("hello world" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                // mandatory = false, no message properties.
                channel.basicPublish("exchange", "return.key", false, null, payload);
            }
        }
    }
}
/**
 * Demo consumer: declares the exchange and queue, binds them together, limits the
 * number of unacknowledged deliveries, and registers a {@code MyConsumer} callback
 * with automatic acknowledgement switched off.
 */
public class Consumer1 {
    /**
     * Sets up the topology on the broker and starts consuming from the queue.
     * The connection is intentionally left open so deliveries keep arriving.
     *
     * @param args command-line arguments; not used
     * @throws IOException propagated from the RabbitMQ client calls
     * @throws TimeoutException propagated from {@code ConnectionFactory.newConnection()}
     * @throws InterruptedException declared by the signature; nothing in this body throws it
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        final String exchangeName = "exchange";
        final String queue = "queueName";
        final String bindingKey = "return.key";

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.6.99");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection brokerConnection = connectionFactory.newConnection();
        Channel consumerChannel = brokerConnection.createChannel();

        // Durable topic exchange.
        final boolean durable = true;
        consumerChannel.exchangeDeclare(exchangeName, "topic", durable);

        // Durable queue, not exclusive, not auto-deleted, no extra arguments.
        final boolean exclusive = false;
        final boolean autoDelete = false;
        consumerChannel.queueDeclare(queue, durable, exclusive, autoDelete, null);
        consumerChannel.queueBind(queue, exchangeName, bindingKey);

        // Flow control: no size limit (0), at most 2 unacknowledged messages,
        // and the limit is not applied channel-wide (global = false).
        final int prefetchSize = 0;
        final int prefetchCount = 2;
        final boolean global = false;
        consumerChannel.basicQos(prefetchSize, prefetchCount, global);

        // Automatic acknowledgement is disabled; MyConsumer acks each delivery itself.
        final boolean autoAck = false;
        consumerChannel.basicConsume(queue, autoAck, new MyConsumer(consumerChannel));
    }
}
/**
 * Consumer callback that prints every delivery it receives and then acknowledges
 * that single delivery on the channel it is attached to.
 */
public class MyConsumer extends DefaultConsumer {
    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel) {
        // DefaultConsumer keeps the channel and exposes it through getChannel(),
        // so no duplicate field is needed here.
        super(channel);
    }

    /**
     * Prints the consumer tag, envelope, and message body, then acknowledges the delivery.
     *
     * @param consumerTag the consumer tag associated with this consumer
     * @param envelope delivery metadata; its delivery tag is used for the acknowledgement
     * @param properties message properties; not used
     * @param body raw message bytes, decoded as UTF-8 text for printing
     * @throws IOException if the acknowledgement cannot be sent
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumerTag " + consumerTag);
        System.out.println("envelope " + envelope);
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        System.out.println("body " + new String(body, java.nio.charset.StandardCharsets.UTF_8));

        // Acknowledge only this delivery (multiple = false).
        getChannel().basicAck(envelope.getDeliveryTag(), false);
    }
}
网友评论