我感觉在mac 安装rabbitmq 挺方便的。
在终端:
brew install rabbitmq
然后就等下载完以后就开始编译安装
最后运行:
brew services start rabbitmq
它的web 管理界面是:[-->>管理]http://localhost:15672
默认账号和密码是:guest
下面是ubuntu 安装:
由于rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang
sudo apt-get install erlang-nox
更新源:
sudo apt-get update
安装Rabbitmq
sudo apt-get install rabbitmq-server
启动、停止、重启、状态rabbitMq命令
sudo rabbitmq-server start
sudo rabbitmq-server stop
sudo rabbitmq-server restart
sudo rabbitmqctl status
添加admin,并赋予administrator权限
# 添加admin用户,密码设置为admin。
sudo rabbitmqctl add_user admin admin
# 赋予权限
sudo rabbitmqctl set_user_tags admin administrator
# 赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源
sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
下面我用python 来链接rabbitMQ
先安装:
pip3 install pika
(1) 生产和消费
生产端:
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='rabbit-test')
channel.basic_publish(exchange='',
routing_key='rabbit-test', # 消息队列名称
body='fafafa999')
connection.close()
消费端:
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="rabbit-test")
def callback(ch, method, properties, body):
print("消费者接受到了新任务: %r" % body)
channel.basic_consume("rabbit-test", callback, True) # True表示不告诉服务端消息去取走了, 默认False 告诉服务端消息去取走了
channel.start_consuming()
(2) 消息回复
生产端不修改,消费端改一点代码
生产端:
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='rabbit-test')
channel.basic_publish(exchange='',
routing_key='rabbit-test', # 消息队列名称
body='fafafa999')
connection.close()
消费端:
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="rabbit-test")
def callback(ch, method, properties, body):
# 增加一行代码
ch.basic_ack(delivery_tag=method.delivery_tag)
print("消费者接受到了新任务: %r" % body)
# True 改为False
channel.basic_consume("rabbit-test", callback, False) # True表示不告诉服务端消息去取走了, 默认False 告诉服务端消息去取走了
channel.start_consuming()
(3) 持久化操作
当ribbatMQ 服务器 垮了,我们已经做了持久化操作,重启服务器依然消息还在。
我们只需在服务端修改一下代码
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)-支持持久化 -- durable=True增加代码
channel.queue_declare(queue='rabbit-test1', durable=True)
channel.basic_publish(exchange='',
routing_key='rabbit-test1', # 消息队列名称
body='fafafa999ff',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
) # -- 增加代码
)
connection.close()
(4) 闲置消费
rabbitmq 默认是轮询处理消息。
我们要改动消费端 谁闲置谁消费, 提高效力。
增加谁闲置谁消费:
channel.basic_qos(prefetch_count=1)
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="rabbit-test1")
def callback(ch, method, properties, body):
print("消费者接受到了新任务: %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 增加谁闲置谁消费
channel.basic_qos(prefetch_count=1)
channel.basic_consume("rabbit-test1", callback, False) # True表示不告诉服务端消息去取走了, 默认False 告诉服务端消息去取走了
channel.start_consuming()
(5)消息发布
生产端:
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='mm1', exchange_type='fanout')
channel.basic_publish(exchange='mm1',
routing_key='',
body='hello world!')
connection.close()
消费端:
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='mm1', exchange_type='fanout')
# 随机生成一个队列
result = channel.queue_declare(queue="rabbit-test5", exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='mm1', queue=queue_name)
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue_name, callback, True)
channel.start_consuming()
(6)关键字发布
routing_key='dong',
生产端:
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='mm7', exchange_type='direct')
channel.basic_publish(exchange='mm7',
routing_key='dong',
body='hello world! 666655')
connection.close()
消费端1:
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='mm7', exchange_type='direct')
# 随机生成一个队列
result = channel.queue_declare(queue="rabbit-test7", exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='mm7', queue=queue_name, routing_key='keep')
channel.queue_bind(exchange='mm7', queue=queue_name, routing_key='dong')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue_name, callback, True)
channel.start_consuming()
消费端2:
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='mm7', exchange_type='direct')
# 随机生成一个队列
result = channel.queue_declare(queue="rabbit-test8", exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='mm7', queue=queue_name, routing_key='keep')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue_name, callback, True)
channel.start_consuming()
(7).关键字模糊匹配发布
主要修改
exchange_type='topic'
生产端:
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='mm8', exchange_type='topic')
channel.basic_publish(exchange='mm8',
routing_key='dong.keep.cc',
body='hellorrrr world! ong.keep.cc')
connection.close()
消费端1
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='mm8', exchange_type='topic')
# 随机生成一个队列
result = channel.queue_declare(queue="rabbit-test8", exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='mm8', queue=queue_name, routing_key='dong.#')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue_name, callback, True)
channel.start_consuming()
消费端2
import pika
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='mm8', exchange_type='topic')
# 随机生成一个队列
result = channel.queue_declare(queue="rabbit-test9", exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='mm8', queue=queue_name, routing_key='dong.#')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue_name, callback, True)
channel.start_consuming()
网友评论