美文网首页
sidekiq源码分析一

sidekiq源码分析一

作者: will2yang | 来源:发表于2019-11-02 20:31 被阅读0次

    一切从includeSidekiq::Worker开始

    class SimpleWorker
      include Sidekiq::Worker
    
      def perform(*args)
        # some code
      end
    end
    
    SimpleWorker.perform_async
    

    一个最为简单的worker。include了Sidekiq::Worker模块,被调用了perform_async的类方法,异步执行了worker代码。

    module Sidekiq
      module Worker
        def self.included(base)
          raise ArgumentError, "Sidekiq::Worker cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" }
    
          base.include(Options)
          base.extend(ClassMethods)
        end
      end
    end
    

    看到被覆盖的钩子方法,不仅仅include了Worker模块还包含了Options模块,以及extends了ClassMethods,我们先聚焦到perform_async方法。

    def perform_async(*args)
      client_push("class" => self, "args" => args)
    end
    
    def client_push(item) # :nodoc:
      pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool
      # stringify
      item.keys.each do |key|
        item[key.to_s] = item.delete(key)
      end
    
      Sidekiq::Client.new(pool).push(item)
    end
    

    perform_async主要的作用就是把worker和args push到pool里。而Worker还提供了在一段时间之后执行或者某个时间点执行相应任务的方法perform_in和perform_at。虽然名字不同但其实这两者是一个方法。

    def perform_in(interval, *args)
      int = interval.to_f
      now = Time.now.to_f
      ts = (int < 1_000_000_000 ? now + int : int)
    
      payload = @opts.merge("class" => @klass, "args" => args)
      # Optimization to enqueue something now that is scheduled to go out now or in the past
      payload["at"] = ts if ts > now
      @klass.client_push(payload)
    end
    alias_method :perform_at, :perform_in
    

    根据interval的时间不同分别实现两种不同的调用方式,并且最后都是调用了push方法。
    然后我们看一下push方法里的内容:

    def push(item)
      normed = normalize_item(item)
      payload = process_single(item["class"], normed)
    
      if payload
        raw_push([payload])
        payload["jid"]
      end
    end
    

    normalize_item主要是验证item参数,并加入created_at和jid, 然后执行raw_push。

    def raw_push(payloads)
      @redis_pool.with do |conn|
        conn.multi do
          atomic_push(conn, payloads)
        end
      end
      true
    end
    
    def atomic_push(conn, payloads)
      if payloads.first.key?("at")
        conn.zadd("schedule", payloads.map { |hash|
          at = hash.delete("at").to_s
          [at, Sidekiq.dump_json(hash)]
        })
      else
        queue = payloads.first["queue"]
        now = Time.now.to_f
        to_push = payloads.map { |entry|
          entry["enqueued_at"] = now
          Sidekiq.dump_json(entry)
        }
        conn.sadd("queues", queue)
        conn.lpush("queue:#{queue}", to_push)
      end
    end
    

    向redis里加入数据。在raw_push里用connection_pool管理了redis的连接池, with方法会在得到一个有用的连接前阻塞代码,直到timeout跑错。conn执行multi可以原子级的运行一段代码。
    atomic_push 根据payloads是否传递了"at"参数,如果传递了,那么就将内容插入schedule的有序集合里。反之,则将内容插入到相应的list里。

    相关文章

      网友评论

          本文标题:sidekiq源码分析一

          本文链接:https://www.haomeiwen.com/subject/bkmxbctx.html