It all starts with include Sidekiq::Worker
class SimpleWorker
  include Sidekiq::Worker

  def perform(*args)
    # some code
  end
end

SimpleWorker.perform_async
This is about the simplest worker possible: it includes the Sidekiq::Worker module, and calling the perform_async class method gets the worker's code executed asynchronously.
module Sidekiq
  module Worker
    def self.included(base)
      raise ArgumentError, "Sidekiq::Worker cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" }

      base.include(Options)
      base.extend(ClassMethods)
    end
  end
end
Here the included hook is overridden: including Worker does not just mix in the Worker module itself, it also includes the Options module and extends the class with ClassMethods. Let's focus on the perform_async method first.
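In other words, after the include, SimpleWorker picks up both class-level and option-handling behavior. A minimal sketch of what that gives you (method names taken from Sidekiq's ClassMethods and Options modules; exact availability depends on your Sidekiq version):

SimpleWorker.respond_to?(:perform_async)   # => true, added via extend(ClassMethods)
SimpleWorker.respond_to?(:perform_in)      # => true, from the same module
SimpleWorker.respond_to?(:sidekiq_options) # => true, added when Options is included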
def perform_async(*args)
  client_push("class" => self, "args" => args)
end

def client_push(item) # :nodoc:
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool
  # stringify
  item.keys.each do |key|
    item[key.to_s] = item.delete(key)
  end

  Sidekiq::Client.new(pool).push(item)
end
perform_async's main job is to hand the worker class and its args to a Sidekiq::Client built on the connection pool. Worker also provides perform_in and perform_at for running a job after an interval or at a specific point in time; despite the different names, the two are in fact the same method.
def perform_in(interval, *args)
  int = interval.to_f
  now = Time.now.to_f
  ts = (int < 1_000_000_000 ? now + int : int)

  payload = @opts.merge("class" => @klass, "args" => args)
  # Optimization to enqueue something now that is scheduled to go out now or in the past
  payload["at"] = ts if ts > now
  @klass.client_push(payload)
end
alias_method :perform_at, :perform_in
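A quick usage sketch: the first argument is either a number of seconds or anything whose to_f yields a unix timestamp, which is exactly what the 1_000_000_000 check distinguishes.

SimpleWorker.perform_in(300, "foo")             # run roughly 5 minutes from now
SimpleWorker.perform_at(Time.now + 3600, "bar") # same method via the alias; Time#to_f is an
                                                # epoch value >= 1_000_000_000, so it is used as-is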
Depending on the interval, the payload either carries an "at" timestamp or is enqueued immediately; both paths end up going through client_push and into Sidekiq::Client#push.
Next, let's look at what the push method does:
def push(item)
  normed = normalize_item(item)
  payload = process_single(item["class"], normed)

  if payload
    raw_push([payload])
    payload["jid"]
  end
end
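Note the return value: push hands back the job's jid, or nil when the client middleware run by process_single stops the job, and that value propagates all the way back out of perform_async.

jid = SimpleWorker.perform_async("foo")
jid # => e.g. "b4a577edbccf1d805744efa9" (a random 24-char hex id), or nil if the job was stopped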
normalize_item mainly validates the item and adds created_at and a jid, after which raw_push is executed.
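Roughly what a normalized item looks like at this point (an illustrative sketch; the exact field set varies between Sidekiq versions):

{
  "class"      => "SimpleWorker",
  "args"       => ["foo"],
  "queue"      => "default",
  "retry"      => true,
  "jid"        => "b4a577edbccf1d805744efa9", # random hex id generated here
  "created_at" => 1514764800.0                # epoch seconds
}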
def raw_push(payloads)
  @redis_pool.with do |conn|
    conn.multi do
      atomic_push(conn, payloads)
    end
  end
  true
end

def atomic_push(conn, payloads)
  if payloads.first.key?("at")
    conn.zadd("schedule", payloads.map { |hash|
      at = hash.delete("at").to_s
      [at, Sidekiq.dump_json(hash)]
    })
  else
    queue = payloads.first["queue"]
    now = Time.now.to_f
    to_push = payloads.map { |entry|
      entry["enqueued_at"] = now
      Sidekiq.dump_json(entry)
    }
    conn.sadd("queues", queue)
    conn.lpush("queue:#{queue}", to_push)
  end
end
This is where the data is written into Redis. raw_push manages the Redis connections through connection_pool: the with method blocks until it obtains a usable connection, raising an error once the timeout is hit. Calling multi on the connection runs the enclosed commands atomically, as a Redis MULTI/EXEC transaction.
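For reference, the same pattern with the connection_pool gem used directly (a sketch only; the block-argument style of multi differs across redis-rb versions):

require "connection_pool"
require "redis"

pool = ConnectionPool.new(size: 5, timeout: 5) { Redis.new }

pool.with do |conn|        # blocks until a connection is free, raises after the timeout
  conn.multi do |tx|       # commands are queued and executed atomically via MULTI/EXEC
    tx.sadd("queues", "default")
    tx.lpush("queue:default", "{}")
  end
end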
atomic_push branches on whether the payloads carry an "at" field: if they do, the entries are added to the "schedule" sorted set, scored by that timestamp; otherwise they are pushed onto the list for the corresponding queue.
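If you want to peek at the result yourself, the keys written by atomic_push can be inspected directly (illustrative, assuming a local Redis):

require "redis"

r = Redis.new
r.zrange("schedule", 0, -1, with_scores: true) # scheduled jobs, scored by their "at" timestamp
r.smembers("queues")                           # the set of known queue names, e.g. ["default"]
r.lrange("queue:default", 0, -1)               # JSON payloads waiting for a Sidekiq process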