美文网首页
使用cerely教程加安装rabbitMQ

使用cerely教程加安装rabbitMQ

作者: Py_Explorer | 来源:发表于2017-12-16 10:34 被阅读0次

cerely

1.cerely本身为分布式任务队列,是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度。

2.任务队列

任务队列是一种在线程或机器间分发任务的机制。

3.消息队列

消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。
Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程,职程对消息进行处理。如下图所示:

Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。
Celery的架构
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括,RabbitMQ,Redis,MongoDB等。

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括Redis,MongoDB,mysql等。

image.png

rabbitMQ

MQ特点:MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息.
需要的软件


image.png

rabbitmq安装

安装路径
http://www.rabbitmq.com/install-windows.html
安装后,激活 RabbitMQ's Management Plugin
使用RabbitMQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器实例的状态。
打开命令窗口:
输入命令:

"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
image.png

这样,就安装好插件了,是不是能使用了呢?别急,需要重启服务才行,使用命令:

net stop RabbitMQ && net start RabbitMQ
这时候的,也许会出现这种结果:


image.png

“发生错误:发生系统错误 5。 拒绝访问。”
这是什么鬼?查了下,原来,5代表的是:不是系统管理员权限。
问题解决方案:使用管理员打开cmd再执行此命令:


image.png
这样就结束了吗?当然没有。

创建用户,密码,绑定角色

使用rabbitmqctl控制台命令(位于C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin>)来创建用户,密码,绑定权限等。
注意:安装路径不同的请看仔细啊。
rabbitmq的用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。
查看已有用户及用户的角色:

rabbitmqctl.bat list_users
image.png

新增一个用户:

rabbitmqctl.bat add_user username password
image.png

此时来看下我们当前用户哈:


image.png
  • 给用户赋予权限

    rabbitmqctl set_user_tags {用户名} {权限}
    例:创建一个超级用户
    rabbitmqctl add_user admin1 admin1
    rabbitmqctl set_user_tags admin1 administrator
    

通过百度都可以找到,直接按步骤安装即可。
安装完成后,在页面输入url地址

http://127.0.0.1:15672/

如果顺利跳出页面,说明安装完成,否则检查是否有纰漏。


image.png
image.png

可以自己设置自己的username和password。

# coding=utf-8
# 导入Exchange, Queue,连接rabbitmq的交换机和队列
from kombu import Exchange, Queue
# 需求请求时导入requests包
import requests

requests.packages.urllib3.disable_warnings()
# 启动命令
# 即worker,cmd命令后,转入到此文件所在目录,然后运行此命令。

# celery -A  ss worker -P eventlet -c 3 --loglevel=info
# 其中  ss为文件名,3为协程数,这两处可以自己定义
# 初始化celery
app = Celery()
# 声明连接到rabbitmq中的队列为ss
QueueName = "ss"
# 参数设置celery
app.conf.update(
    # 中间人设置
    BROKER_URL="amqp://spider:spider_123@127.0.0.1:5672//spider",
    # 配置序列化任务载荷的默认的序列化方式
    CELERY_TASK_SERIALIZER='json',
    # 忽略接收其他内容
    CELERY_ACCEPT_CONTENT=['json'],
    # 结果序列号
    CELERY_RESULT_SERIALIZER='json',
    # 设置时区
    CELERY_TIMEZONE='Asia/Shanghai',
    # 使用UTC的方式,UTC的时间、时区、时差
    CELERY_ENABLE_UTC=True,
    # 配置队列
    CELERY_QUEUES=(
        Queue(QueueName, Exchange(QueueName), routing_key=QueueName),
    ),
    # 默认队列
    CELERY_DEFAULT_QUEUE=QueueName,
    # 连接方式
    CELERY_DEFAULT_EXCHANGE_TYPE='direct',
    # 路由队列
    CELERY_DEFAULT_ROUTING_KEY=QueueName,
    # 任务执行结果的超时时间
    CELERY_TASK_RESULT_EXPIRES=1800,
    # worker 每次取任务的数量
    CELERYD_PREFETCH_MULTIPLIER=1,
    # 每个worker最多执行完10个任务就会被销毁,可防止内存泄露
   CELERYD_MAX_TASKS_PER_CHILD=10,
    #  非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV=True,
    # 可以让Celery更加可靠,只有当worker执行完任务后,才会告诉MQ,消息被消费
    CELERY_ACKS_LATE=True,
    # 单个任务的运行时间不超过此值,否则会被SIGKILL 信号杀死
    CELERYD_TASK_TIME_LIMIT=600,
    #  任务发出后,经过一段时间还未收到acknowledge , 就将任务重新交给其他worker执行  
    CELERY_DISABLE_RATE_LIMITS=True
)


@app.task
def add(x, y):
    return x + y

项目中需要的代码已经详细给出,可直接使用,爬虫代码,直接写就行了。

相关文章

网友评论

      本文标题:使用cerely教程加安装rabbitMQ

      本文链接:https://www.haomeiwen.com/subject/mlcswxtx.html