RabbitMQ

Author: 狗狗胖妞 | Published: 2017-08-23 19:59 | Views: 17

    Installing RabbitMQ

    My system version:

    [root@jinbo ~]# cat /etc/issue
    CentOS release 6.5 (Final)
    
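    1. Install the EPEL repository. EPEL is a yum software source that contains many packages missing from the base repositories.
    wget  http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
    rpm -ivh epel-release-6-8.noarch.rpm
    yum repolist    # if epel appears in the list, the installation succeeded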
    

    2. Install Erlang and RabbitMQ

    yum install erlang -y    # RabbitMQ is written in Erlang
    yum install rabbitmq-server -y
    
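    3. Start the service: service rabbitmq-server start (the default AMQP port is 5672)

    4. Enable the management plugin: rabbitmq-plugins enable rabbitmq_management
      Web UI: http://ip:15672/ (username and password are both guest)
      If you cannot log in, edit the config file (vi /etc/rabbitmq/rabbitmq.config) and add the line
      [{rabbit, [{loopback_users, []}]}].
      The trailing . is required. This empties loopback_users, which by default restricts guest to connections from localhost. Save the file, then run
      service rabbitmq-server restart
      (if the restart fails, re-save the config file above in ANSI encoding).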
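
    After the steps above, a quick way to confirm that the broker and the management UI are listening is to try a TCP connection to each port. The following is a minimal sketch using only the standard library; port_is_open and check_broker_ports are hypothetical helper names, not part of RabbitMQ or pika.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


def check_broker_ports(host, ports=(5672, 15672)):
    """Map each port to whether it accepts TCP connections on host."""
    return {port: port_is_open(host, port) for port in ports}
```

    Calling check_broker_ports('192.168.1.200') returns a dict keyed by port. A successful connection only shows that something is listening; it does not prove that the guest login works.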

    Using RabbitMQ from Python

    Basic usage

    Publisher:

    import pika
    
    # Open a blocking connection to the broker
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='192.168.1.200')
    )
    
    channel = connection.channel()  # create a channel on that connection
    
    # Declare the queue
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
    
    connection.close()
    

    Consumer:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.200'))
    channel = connection.channel()
    
    # You may ask why we declare the queue again: we have already declared it in the publisher code.
    # We could skip that if we were sure that the queue already exists, for example if the publisher
    # script was run before. But we're not yet sure which program will run first. In such cases it's
    # good practice to declare the queue in both programs.
    channel.queue_declare(queue='hello')
    
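    def callback(ch, method, properties, body):
        print("Received %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  # the consumer explicitly acknowledges the message
    
    
    # pika 0.x call style; pika 1.0+ uses
    # basic_consume(queue='hello', on_message_callback=callback, auto_ack=False)
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)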
    
    channel.start_consuming()
    
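    no_ack
    • no_ack=False (the default): RabbitMQ keeps each message until the consumer explicitly acknowledges it with basic_ack.
    • no_ack=True: RabbitMQ treats a message as acknowledged the moment it is delivered, so the message is lost if the consumer disconnects before finishing it.

    With no_ack=False, if the consumer dies (its channel is closed, connection is closed,
    or TCP connection is lost) before acknowledging, RabbitMQ puts the task back on the queue.
    This relies on two things:

    • ch.basic_ack(delivery_tag=method.delivery_tag) in the callback function
    • no_ack=False in basic_consume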
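
    The difference between the two modes can be illustrated with a toy in-memory model. This is only a sketch of the bookkeeping, not pika or RabbitMQ code; ToyQueue, its methods, and crash_before_ack are hypothetical names invented for this example.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a broker queue; not RabbitMQ or pika."""

    def __init__(self):
        self.ready = deque()   # messages waiting for a consumer
        self.unacked = {}      # delivery_tag -> message awaiting an ack
        self._next_tag = 1

    def publish(self, body):
        self.ready.append(body)

    def deliver(self, no_ack):
        """Hand the next message to a consumer and return (tag, body)."""
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        if not no_ack:
            # Manual-ack mode: remember the message until it is acked.
            self.unacked[tag] = body
        return tag, body

    def ack(self, tag):
        del self.unacked[tag]

    def consumer_died(self):
        """Put every delivered-but-unacked message back on the queue."""
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked[tag])
        self.unacked.clear()


def crash_before_ack(no_ack):
    """Deliver one message, crash the consumer, return what is left queued."""
    q = ToyQueue()
    q.publish("task")
    q.deliver(no_ack=no_ack)
    q.consumer_died()
    return list(q.ready)
```

    In this model crash_before_ack(no_ack=False) leaves the task on the queue for another consumer, while crash_before_ack(no_ack=True) leaves nothing, because the message was forgotten as soon as it was delivered.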

    Message persistence

    We have learned how to make sure that even if the consumer dies, the task isn't lost (this is the default; to disable it, pass no_ack=True). But our tasks will still be lost if the RabbitMQ server stops.

    When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

    First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

    channel.queue_declare(queue='hello', durable=True)
    

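    This queue_declare change needs to be applied to both the producer and consumer code. RabbitMQ does not allow an existing queue to be redeclared with different parameters, so if a non-durable hello queue already exists, the durable queue needs a new name, for example task_queue.

    At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent by supplying a delivery_mode property with a value of 2.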

    channel.basic_publish(exchange='',
                          routing_key="task_queue",
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))
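
    The rule that both the queue and the message must be marked can be summarised as a small truth table. The sketch below is a simplified model of that rule, not RabbitMQ code; message_survives_restart and survival_table are hypothetical names, and the value 1 for a transient message is the other delivery_mode defined by AMQP 0-9-1.

```python
from itertools import product

TRANSIENT = 1   # AMQP delivery_mode for a non-persistent message
PERSISTENT = 2  # the delivery_mode value used in the text


def message_survives_restart(queue_durable, delivery_mode):
    """Simplified rule: a message outlives a broker restart only if it is
    persistent AND sits in a durable queue."""
    return queue_durable and delivery_mode == PERSISTENT


def survival_table():
    """Return (queue_durable, delivery_mode, survives) for all four cases."""
    return [
        (durable, mode, message_survives_restart(durable, mode))
        for durable, mode in product((False, True), (TRANSIENT, PERSISTENT))
    ]
```

    Only the last row, a durable queue holding a persistent message, survives in this model. Even then, persistence narrows the window for loss rather than eliminating it, since the broker may not have written the message to disk at the moment it crashes.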
    

    Complete producer and consumer scripts, producer first:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.200"))
    channel = connection.channel()
    channel.queue_declare(queue='cc', durable=True)  # no-op if the cc queue already exists; otherwise creates it as a durable queue
    
    channel.basic_publish(exchange='',
                          routing_key='cc',
                          body='hello world!!!',
                          properties=pika.BasicProperties(delivery_mode=2))  # persistent message
    connection.close()
    
    Consumer:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.200"))
    channel = connection.channel()
    channel.queue_declare(queue='cc', durable=True)
    
    def callback(ch, method, properties, body):
        print('Received %r' % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_consume(callback, queue='cc')
    channel.start_consuming()
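
    The consumer callback above acknowledges every message unconditionally. One possible refinement is to ack only after the processing succeeded and to hand the message back to the broker otherwise. The following is a minimal sketch under that assumption; ack_on_success is a hypothetical helper name, and it relies only on the basic_ack and basic_nack methods of the channel object that pika passes to the callback.

```python
import functools


def ack_on_success(handler):
    """Wrap handler(body) as a pika-style callback that acks only on success."""

    @functools.wraps(handler)
    def callback(ch, method, properties, body):
        try:
            handler(body)
        except Exception:
            # Processing failed: ask the broker to requeue the message.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            # Processing succeeded: the broker may now discard the message.
            ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

    With the pika 0.x call style used in this article, it would be registered as channel.basic_consume(ack_on_success(handle), queue='cc'), where handle is your own function taking the message body. Note that a message which always fails would be requeued forever, so a real consumer would probably need a retry limit.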
    

    List the current queues (as root): rabbitmqctl list_queues (installed at /usr/sbin/rabbitmqctl)
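
    To use the queue listing from a script, the command output can be parsed into a dict. The sketch below assumes each queue row is a queue name and a message count separated by a tab, and skips every other line; that layout and the sample text are assumptions, so check them against the output of your own rabbitmqctl version. parse_list_queues is a hypothetical helper name.

```python
def parse_list_queues(output):
    """Parse `rabbitmqctl list_queues` text into {queue_name: message_count}."""
    queues = {}
    for line in output.splitlines():
        parts = line.split("\t")
        # Assumed row layout: "<name>\t<messages>". Banner or header lines
        # either have no tab or a non-numeric second column and are skipped.
        if len(parts) == 2 and parts[1].strip().isdigit():
            queues[parts[0]] = int(parts[1])
    return queues


# Assumed sample output, for illustration only.
SAMPLE_OUTPUT = "Listing queues ...\nhello\t0\ncc\t3\n...done.\n"
```

    Here parse_list_queues(SAMPLE_OUTPUT) yields one entry per queue row. In practice the input text could come from subprocess.check_output(['rabbitmqctl', 'list_queues']) decoded to a string.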
