美文网首页
通俗用法

通俗用法

作者: forjie | 来源:发表于2019-06-12 16:45 被阅读0次

    1 创建queue

    • python 中调用rabbitmq的库有pika,txAMQP或者py-amqplib
    • 基本命令 rabbitmqctl 创建用户
    设置账号 admin 密码 123123
    sudo rabbitmqctl add_user admin  123123  
    设置账号admin 为administrator组,也就是超级管理员,
    #分组详细可以参考 : https://www.jianshu.com/p/e3af4cf97820
    sudo rabbitmqctl set_user_tags admin administrator
    设置权限
    sudo rabbitmqctl set_permissions -p "/" admin  '.' '.' '.'
    重启rabbitmq
    sudo service rabbitmq-server restart
    
    • 查看用户,队列
    查看用户
    sudo rabbitmqctl list_users
    查看所有队列
    sudo rabbitmqcrl list_queues
    
    • 简单例子生产者: send.py
    import pika
    #未创建用户
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    # 用户链接
    credentials=pika.PlainCredentials(username="admin",password='123123')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host="localhost",
        virtual_host='/',
        credentials=credentials
        ))
    channel = connection.channel()
    # 声明 队列queue
    channel.queue_declare(queue='hello')
    # 往队列里面加入数据
    channel.basic_publish(
        exchange='',
        routing_key='hello',
        body='hello world'
        )
    connection.close()
    

    消费者 :receiver.py

    import pika
    #未创建用户
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    # 用户链接
    credentials=pika.PlainCredentials(username="admin",password='123123')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host="localhost",
        virtual_host='/',
        credentials=credentials
        ))
    channel = connection.channel()
    # 声明队列hello
    channel.queue_delcare(queue='hello')
    # 创建回调函数,当有数据到来的时候会自动调用
    def callback(ch,method,propertites,body):
        print("receiver body:%s"%body)
    channel.basic_consume(
        on_message_callback=callback,
        queue="hello",
        auto_ack=True
        )
    # 开始等待,应该是长轮训
    channel.start_consuming()
    

    然后启动就可以了

    python receiver.py
    python send.py
    

    2:参数

    • 1,消息确认auto_ack :

    任务中会有多个worker工作,当时有时候其中一个worker出现问题,那么数据就会丢失,但是auto_ack设置好以后,就会交给其他的worker执行.

    receiver.py中
    channel.basic_consume(
        on_message_callback=callback,
        queue="hello",
        auto_ack=True
        )
    当删除auto_ack,或者auto_ack=False时,就算worker断片,也会给其他的worker执行
    auto_ack = Flase
    原理:好像是设置为False时,当他执行完成后,会有反馈信息,而没有收到反馈信息就会认为没有执行完,就会当做没有执行完
    
    • 2,消息持久化存储durable :

    虽然前面有消息反馈机制,但是如果当rabbitmq断片的时候,还是会有遗漏,所以就有了消息持久化

    channel.queue_declare(queue='hello', durable=True)
    但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。所以得重新定义一个队列
    
    在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
    channel.basic_publish(exchange='',
               routing_key="task_queue",
               body=message,
               properties=pika.BasicProperties(
                 delivery_mode = 2, # make message persistent
               ))
    
    在消费者receiver.py中,可以通过callback中获取delivery_mode的值,
    当获取delivery_mode=2的时候,就是持久化队列值
    
    • 3.公平调度prefetch_count:

    上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任>>务,即只有工作者完成任务之后,才会再次接收到任务

    channel.basic_qos(prefetch_count=1)
    
    • 4 完整的代码
    #!/usr/bin/env python
    import pika
    import sys
      
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
      
    channel.queue_declare(queue='task_queue', durable=True)
      # 是你执行的命令的值 e.g:python send.py 得到的就是["send"]
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(exchange='',
               routing_key='task_queue',
               body=message,
               properties=pika.BasicProperties(
                 delivery_mode = 2, # make message persistent
               ))
    print " [x] Sent %r" % (message,)
    connection.close()
    worker.py完整代码
     
    #!/usr/bin/env python
    import pika
    import time
      
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
      
    channel.queue_declare(queue='task_queue', durable=True)
    print ' [*] Waiting for messages. To exit press CTRL+C'
      
    def callback(ch, method, properties, body):
      print " [x] Received %r" % (body,)
      time.sleep( body.count('.') )
      print " [x] Done"
      ch.basic_ack(delivery_tag = method.delivery_tag)
      
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,
               queue='task_queue')
      
    channel.start_consuming()
    
    
    
    

    相关文章

      网友评论

          本文标题:通俗用法

          本文链接:https://www.haomeiwen.com/subject/wxslfctx.html