1 创建queue
- python 中调用rabbitmq的库有pika,txAMQP或者py-amqplib
- 基本命令 rabbitmqctl 创建用户
设置账号 admin 密码 123123
sudo rabbitmqctl add_user admin 123123
设置账号admin 为administrator组,也就是超级管理员,
#分组详细可以参考 : https://www.jianshu.com/p/e3af4cf97820
sudo rabbitmqctl set_user_tags admin administrator
设置权限
sudo rabbitmqctl set_permissions -p "/" admin '.' '.' '.'
重启rabbitmq
sudo service rabbitmq-server restart
- 查看用户,队列
查看用户
sudo rabbitmqctl list_users
查看所有队列
sudo rabbitmqcrl list_queues
- 简单例子生产者: send.py
import pika
#未创建用户
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 用户链接
credentials=pika.PlainCredentials(username="admin",password='123123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host="localhost",
virtual_host='/',
credentials=credentials
))
channel = connection.channel()
# 声明 队列queue
channel.queue_declare(queue='hello')
# 往队列里面加入数据
channel.basic_publish(
exchange='',
routing_key='hello',
body='hello world'
)
connection.close()
消费者 :receiver.py
import pika
#未创建用户
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 用户链接
credentials=pika.PlainCredentials(username="admin",password='123123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host="localhost",
virtual_host='/',
credentials=credentials
))
channel = connection.channel()
# 声明队列hello
channel.queue_delcare(queue='hello')
# 创建回调函数,当有数据到来的时候会自动调用
def callback(ch,method,propertites,body):
print("receiver body:%s"%body)
channel.basic_consume(
on_message_callback=callback,
queue="hello",
auto_ack=True
)
# 开始等待,应该是长轮训
channel.start_consuming()
然后启动就可以了
python receiver.py
python send.py
2:参数
- 1,消息确认auto_ack :
任务中会有多个worker工作,当时有时候其中一个worker出现问题,那么数据就会丢失,但是auto_ack设置好以后,就会交给其他的worker执行.
receiver.py中
channel.basic_consume(
on_message_callback=callback,
queue="hello",
auto_ack=True
)
当删除auto_ack,或者auto_ack=False时,就算worker断片,也会给其他的worker执行
auto_ack = Flase
原理:好像是设置为False时,当他执行完成后,会有反馈信息,而没有收到反馈信息就会认为没有执行完,就会当做没有执行完
- 2,消息持久化存储durable :
虽然前面有消息反馈机制,但是如果当rabbitmq断片的时候,还是会有遗漏,所以就有了消息持久化
channel.queue_declare(queue='hello', durable=True)
但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。所以得重新定义一个队列
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
在消费者receiver.py中,可以通过callback中获取delivery_mode的值,
当获取delivery_mode=2的时候,就是持久化队列值
- 3.公平调度prefetch_count:
上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任>>务,即只有工作者完成任务之后,才会再次接收到任务
channel.basic_qos(prefetch_count=1)
- 4 完整的代码
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
# 是你执行的命令的值 e.g:python send.py 得到的就是["send"]
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
网友评论