创建生产者代码示例:
public class Producer {
//RabbitMQ服务器地址
public final static String host="192.168.1.1";
//RabbitMQ端口
public final static int port=5672;
//RabbitMQ虚拟主机
public static final String virtualHost="/";
//RabbitMQ用户名
public final static String username="admin";
//RabbitMQ密码
public final static String password="123456";
//队列名称
public final static String queue_name="serviceNotice.queue";
public static void main(String[] args) throws IOException{
//创建连接工厂,此部分可以单独抽出作为一个静态抽象方法以便调用
ConnectionFactory factory=new ConnectionFactory();
//设置服务器地址
factory.setHost(host);
//设置服务器端口
factory.setPort(port);
//设置虚拟主机
factory.setVirtualHost(virtualHost);
//设置用户名
factory.setUsername(userName);
//设置密码
factory.setPassword(password);
//获取连接
Connection connection=factory.newConnection();
//创建信道
Channel channel=connection.createChannel();
//信道指定队列设置,如果在Rabbit管理工具中创建了队列,则不需要调用此方法
//参数(名字,是否持久化,独占的队列,不使用时是否自动删除,其他参数)
channel.queueDeclare(queue_name,true,false,true,null);
String message="这是一个测试消息";
//发布消息
//参数(交换器名称,队列名称,属性,参数的字节数据)
channel.basicPublish("",queue_name,null,message.getBytes());
//关闭信道
channel.close();
//关闭连接
connection.close();
}
}
创建消费者代码示例:
public class Consumer {
//RabbitMQ服务器地址
public final static String host="192.168.1.1";
//RabbitMQ端口
public final static int port=5672;
//RabbitMQ虚拟主机
public static final String virtualHost="/";
//RabbitMQ用户名
public final static String username="admin";
//RabbitMQ密码
public final static String password="123456";
//队列名称
public final static String queue_name="serviceNotice.queue";
public static void main(String[] args) throws IOException {
//创建连接工厂,此部分可以单独抽出作为一个静态抽象方法以便调用
ConnectionFactory factory=new ConnectionFactory();
//设置服务器地址
factory.setHost(host);
//设置服务器端口
factory.setPort(port);
//设置虚拟主机
factory.setVirtualHost(virtualHost);
//设置用户名
factory.setUsername(userName);
//设置密码
factory.setPassword(password);
//获取连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//信道设置,如果在Rabbit管理工具中创建了队列,则不需要调用此方法
channel.queueDeclare(queue_name, true, false, true, null);
//创建消费者
QueueingConsumer queueingConsumer=new QueueingConsumer(channel);
//消费消息,false表示需手动确认消息已成功获取
channel.basicConsume(queue_name,false,queueingConsumer);
while (true) { //消费者程序运行开着 如果生产者新增了数据会自动获取
// nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("接收消息:" + message);
//消息确认为成功获取,false表示不重新入队
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
网友评论