美文网首页
如何在开启多个worker时,使某一个特定类型的task对应的w

如何在开启多个worker时,使某一个特定类型的task对应的w

作者: 好小葱1 | 来源:发表于2018-12-06 17:07 被阅读14次

主要参考回答:https://stackoverflow.com/questions/12003221/celery-task-schedule-ensuring-a-task-is-only-executed-one-at-a-time

  • 第一种把所有的进程数--concurrency都设置成1,太过笨重,不太灵活,建议灵活加锁。
  • celery官方文档上有一段利用“上下文管理器”实现的🔐,可以参考一下:
from celery import task
from celery.five import monotonic
from celery.utils.log import get_task_logger
from contextlib import contextmanager
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes

@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)

@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.
    feed_url_hexdigest = md5(feed_url).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
    logger.debug('Importing feed: %s', feed_url)
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return Feed.objects.import_feed(feed_url).url
    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)

相关文章

  • 如何在开启多个worker时,使某一个特定类型的task对应的w

    主要参考回答:https://stackoverflow.com/questions/12003221/celer...

  • 2019-03-16 job、stage、task

    job、stage、task Worker Node:物理节点,上面执行executor进程 Executor:W...

  • lab1 mapreduce

    抽象理解:master维护worker,task两个队列,表示空闲的worker,等待执行的task。对task进...

  • Go Array

    数组是由固定长度特定类型元素组成的序列 数组可由零个或多个元素组成 Go语言中数组是固定长度且特定类型的,与之对应...

  • 轻松推到正则表达式(四)

    需求 规则 @多个字符空格 多个字符对应 @\w+ \s \w+所以对应的 正则可以写...

  • Celery笔记

    Celery 预取机制 Celery 默认启动预取机制,即如果有多个worker,会平均分配给多个worker,如...

  • 并发模型

    Master-Worker 模式 Task.class Master.class Worker.class MyW...

  • 0x00

    1,分析可以恶意代码,目标: 确定某一个特定的可以二进制程序到底可以做什么(what), 如何在网络上检测出它(w...

  • Spark (how spark work internally

    What is a task in Spark? How does the Spark worker execut...

  • Storm基本概念

    核心概念 Topology Nimbus Supervisor Worker Executor Task Spou...

网友评论

      本文标题:如何在开启多个worker时,使某一个特定类型的task对应的w

      本文链接:https://www.haomeiwen.com/subject/axgocqtx.html