celery 定时任务实现

作者: 王兵 | 来源:发表于2017-08-18 16:46 被阅读470次

    Celery 是什么?

    • 异步任务队列工具,主要解决 realtime 事件的异步操作,但也支持定时任务。
    • 什么是异步?那要先理解什么是同步,比如我去麦当劳吃饭,如果麦当劳前一个顾客点完单,拿到餐,吃完走人之后才能接待下一个顾客,就是同步。反过来我点完单,它马上就接下一个客人的单,我的流程虽然还没有走完(time-consuming),但也不影响下一个顾客点单(blocked),这就是异步。

    Celery 安装

    • pip install celery,由于 celery 自己并不带队列存储,所以根据官方推荐,还需要安装 RabbitMQ 或者 Redis 来存储队列。方便起见,本文用 Redis。

    Celery 机制

    1. celery 是一个装饰器类,本质上是把一个函数变成一个可以异步调用的函数。
    2. 开启 celery 进程。
    3. 在另一段程序中导入这个函数,当这个函数被以 delay 模式调用时:xxx.delay(),celery 会把这个任务写到任务队列然后返回,原程序不会被阻塞可以往下跑。注意这里需要用 delay 模式去跑这个函数,同时注意不要把 xxx.delay() 的结果赋值给一个变量,否则依然会被阻塞。
    4. celery 的另一个进程会去这个任务队列里取任务,完成之后写到 result 队列里面。或者多数情况下,这个被异步调用的函数不需要返回结果,比如发送一个邮件,提醒之类的,连 result 队列都不用。

    Celery first blood

    1. 开一个 python 脚本,比如tasks.py

    2. 生成一个 Celery 对象实例,是一个装饰器。

      from celery import Celery
      app = Celery('__name__', broker='redis://localhost:6379') 
      
    3. 定义一个你想异步操作的函数,并加上 celery 装饰器 @app.task

      @app.task
      def add(x, y):
          return x + y
      
    4. 保存,退出,然后在 terminal 启动 celery 服务。

      celery -A tasks worker --loglevel=info
      

    做个定时任务:每天发问候

    1. 接下来,我要搭配钉钉机器人了,我希望小仙女每天早上7点给我发个问候,然后在7点半的时候确认我有没有开始干活了。

    2. 先写一个简单的钉钉提醒程序,命名celery_worker.py

      #! /usr/bin/env python
      # coding: utf-8
      
      import requests
      import json
      import time
      
      from config import HOST_IP, NOTIFY_URL, MOBILE_NUMBER
      
      def notify_dingding(msg):
          headers = {"Content-Type": "application/json; charset=utf-8"}
          post_data = {
              "msgtype": "text",
              "text": {
                  "content": msg
              },
              "at": {
                  "atMobiles": [MOBILE_NUMBER]
              }
          }
      
          r = requests.post(NOTIFY_URL, headers=headers, data=json.dumps(post_data))
          print(r.content)
      

      注意这里从 config 里导入了一些参数,所以要在这个程序的同一层写一个 config.py 的配置文件。

      # config.py
      NOTIFY_URL = ("https://oapi.dingtalk.com/robot/send?access_token="
          "c6d5a2936381dfc29394f3c336bea5fad962d90ffd31809e92d95a1xxxxxxxx")
      MOBILE_NUMBER = "176xxxxx619"
      HOST_IP = "127.0.0.1"
      
    3. 导入 celery 包,给函数加上装饰器:

      from celery import Celery
      
      BROKER_URI = 'redis://%s:6379/6' % HOST_IP
      BACKEND_URI = 'redis://%s:6379/5' % HOST_IP
      
      worker = Celery('celery_worker', broker=BROKER_URI, backend=BACKEND_URI)
      
      @worker.task
      def notify_dingding(msg):
          ...
      
    4. 简单学习一下celery 的 crontab 定时任务

    5. 给 worker 加上定时任务

      from celery.schedules import crontab
      
      worker.conf.update(
          timezone='Asia/Shanghai',
          enable_utc=True,
          beat_schedule={
              "morning_msg_1": {
                  "task": "celery_worker.notify_dingding",
                  "schedule": crontab(minute=0, hour=7),
                  "args": ("早,起床了哟,先去做个早饭吧",)
              },
              "morning_msg_2": {
                  "task": "celery_worker.notify_dingding",
                  "schedule": crontab(minute=30, hour=7),
                  "args": ("我就问问你在干活咩?",)
              }
          }
      )
      
    6. 最后程序末尾加一个小测试,看看服务是不是起来了:

      notify_dingding("小仙女上线啦")
      
    7. 开启 redis-server

      nohup redis-server &
      
    8. 开启我们的 celery worker,这里的-B 是 celery 的 beat 服务,可以理解为一个周期任务。

      celery -A celery_worker worker -B
      
    9. 服务起来喽:

    QQ20170818-163110.jpg QQ20170818-162958.jpg

    相关文章

      网友评论

      本文标题:celery 定时任务实现

      本文链接:https://www.haomeiwen.com/subject/mlmarxtx.html