z, ? | toggle help (this) |
space, → | next slide |
shift-space, ← | previous slide |
d | toggle debug mode |
## <ret> | go to slide # |
c, t | table of contents (vi) |
f | toggle footer |
r | reload slides |
n | toggle notes |
p | run preshow |
class SendHeavyReportToUser
def self.perform(user)
user.send_heavy_report
end
end
User.find_somewhere.
delay.
send_heavy_report(*args)
But when we say "asyncronously" we want to controll this process:
class SendHeavyReportToUser
@queue = :low
@retry_limit = 3
@retry_delay = 60 #seconds
end
user.delay(
:priority => 5,
:queue => "low",
:attempts => 7)
Delayed::Job.all
failed_jobs = Delayed::Job.
where(:queue => "toxic").
where("last_failed_at is not null")
if failed_jobs.any?
Mail.new(
:subject => failed_jobs.count.to_s +
"in enterprise queue now",
:body => "..."
).deliver
end
I've spend 20 minutes on building this feature
starting from zero knowledge about DJ
Because DJ was originally designed with using relation database features
class User < AR::Base
after_create :fetch_linkedin_profile
def fetch_linkedin_profile
Resque.enqueue(
FetchLinkedinProfile, id
)
end
end
class FetchLinkedinProfile
def self.perform(user_id)
user = User.find(user_id)
end
end
require 'ar_after_transaction'
require 'resque'
Resque.class_eval do
class << self
alias_method
:enqueue_without_transaction,
:enqueue
def enqueue(*args)
ActiveRecord::Base.
after_transaction do
enqueue_without_transaction(*args)
end
end
end
end
Resque.redis.client.logger = Rails.logger
Resque.redis.client.logger =
Logger.new(STDOUT)
6.times do
Resque.enqueue(UpdateMetrics, rand(10))
end
# SADD resque:queues low
# 0.28ms
# RPUSH resque:queue:low
# {"class":"UpdateMetrics","args":[3]}
# 0.50ms
Resque.redis.lrange("queue:low", 0, 10).
map { |job| JSON.parse(job) }
# LRANGE resque:queue:low 0 10
# 0.32ms
# => [{"class"=>"UpdateMetrics", "args"=>[5]},
# {"class"=>"UpdateMetrics", "args"=>[9]},
# {"class"=>"UpdateMetrics", "args"=>[3]},
# {"class"=>"UpdateMetrics", "args"=>[3]},
# {"class"=>"UpdateMetrics", "args"=>[9]},
# {"class"=>"UpdateMetrics", "args"=>[3]}]
Resque.enqueue(
CreditCardValidator,
user_id,
"4123-5682-3821-1111", {...}
)
CreditCardValidator.
in_progress?(user_id) # => true
CreditCardValidator.
status(user_id) # => nil
sleep(5)
CreditCardValidator.
in_progress?(user_id) # => false
CreditCardValidator.
status(user_id)
# => {:success => false,
# :errors =>
# ["Credit expire date not given"]}
class CreditCardValidator < StatusedWorker
def self.job_identity_arguments(
user_id, number, attrs)
[user_id]
end
end
# RESQUE api change:
# define perform at instance level
def perform(user_id, number, , attrs)
result = validate_card(number, attrs)
set_status(
:success => result.success?,
:errors => result.errors
)
end