连接RabbitMQ
// Configure the factory: credentials first, then the broker endpoint.
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root123");
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);

// Open the connection, then create a channel on it; the statements that
// follow in this note issue their calls through this channel.
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
也可使用URI方式来实现
factory.setUri("amqp://userName:password@ipAddress:portNumber/virtualHost");
使用交换器和队列
// Declare the exchange with type "direct"; in the client API the third
// argument of this overload is the durable flag.
channel.exchangeDeclare(exchangeName, "direct", true);
// Declare a queue without arguments and read back the name it was given.
String queueName = channel.queueDeclare().getQueue();
// Bind that queue to the exchange under routingKey.
channel.queueBind(queueName, exchangeName, routingKey);
发送消息
// Encode the payload explicitly as UTF-8 so the published bytes do not depend
// on the default charset of the JVM that runs this code.
byte[] messageBodyBytes = "Hello ,World!".getBytes(java.nio.charset.StandardCharsets.UTF_8);
// Publish to exchangeName under routingKey; null means no message properties
// are supplied with the message.
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
消费消息
- 推模式
不同的订阅采用不同的消费者标签(consumerTag)来区分彼此;在同一个 Channel 中,各个消费者也需要通过唯一的消费者标签加以区分。
// Manual acknowledgement: every delivery is acked explicitly in handleDelivery.
boolean autoAck = false;
// Prefetch limit handed to the broker for this channel.
channel.basicQos(64);
// The consumer is declared first and then passed to basicConsume, so that both
// the declaration and the subscription are complete, well-formed statements.
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        // Decode with an explicit charset rather than the platform default.
        System.out.println("recv message: "
                + new String(body, java.nio.charset.StandardCharsets.UTF_8));
        try {
            // Presumably stands in for real message-processing time.
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can observe
            // the interruption instead of having it silently dropped.
            Thread.currentThread().interrupt();
        }
        // Acknowledge only this delivery (multiple = false).
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};
channel.basicConsume(queueName, autoAck, "myConsumerTag", consumer);
- 拉模式
// Pull a single message with manual acknowledgement (autoAck = false).
GetResponse response = channel.basicGet(QUEUE_NAME, false);
// basicGet returns null when no message is available, so guard before
// dereferencing the response to avoid a NullPointerException.
if (response != null) {
    // Acknowledge only this delivery (multiple = false).
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
网友评论