Producer生产者例子
class Producer{
static ConnectionFactory connectionFactory;
static Connection connection ;
static Channel channel;
static{
init();
}
public static void main(String[] args) {
send("你好吗");
}
public static void init(){
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("8954036aa");
}
public static void send(String str){
try {
connection= connectionFactory.newConnection();
channel =connection.createChannel();
channel.queueDeclare(Rabbitmq.QUEUE_NAME,false,false,false,null);
channel.basicPublish("",Rabbitmq.QUEUE_NAME,null,str.getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
finally {
if(channel.isOpen())
{
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null)
{
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
Consumer消费者例子
class Customer{
static ConnectionFactory connectionFactory;
static Connection connection ;
static Channel channel;
public static void main(String[] args) {
init();
consume();
}
public static void init(){
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("8954036aa");
}
public static void consume(){
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(Rabbitmq.QUEUE_NAME,false,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"utf-8"));
}
};
channel.basicConsume(Rabbitmq.QUEUE_NAME,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
初始化
#创建连接工厂设置账号密码 url
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("8954036aa");
queueDeclare
queueDeclare(String queue,
boolean durable,
boolean exclusive,
Map<String, Object> arguments);
#说明
queue: 队列名称
durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景
autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
arguments:
队列中的消息什么时候会自动被删除?
Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间
#AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
#channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));
Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp
Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim
Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B
Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX
Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK
Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,
Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中
Master locator(x-queue-master-locator)
basicPublish
channel.basicPublish(参数1:(交换机名字),参数2:(队列名字),参数3:(BasicProperties 配置信息),参数4:(发送的消息体))
routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。
false:出现上述情形broker会直接将消息扔掉
immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
生产者
#建立连接
connection= connectionFactory.newConnection();
#创建通道
channel =connection.createChannel();
#声明一个队列
channel.queueDeclare(Rabbitmq.QUEUE_NAME,false,false,false,null);
channel.basicPublish("",Rabbitmq.QUEUE_NAME,null,str.getBytes());
消费者
connection = connectionFactory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(Rabbitmq.QUEUE_NAME,false,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"utf-8"));
}
};
channel.basicConsume(Rabbitmq.QUEUE_NAME,consumer);
basicConsume
参数1:队列名字
参数2:消费者(callback)
网友评论