Class | Delayed::Backend::ActiveRecord::Job |
In: |
lib/delayed/backend/active_record.rb
|
Parent: | ::ActiveRecord::Base |
A job object that is persisted to the database. Contains the work object as a YAML field.
# File lib/delayed/backend/active_record.rb, line 37
#
# Calls +establish_connection+ on ::ActiveRecord::Base.
# NOTE(review): judging by the name, this is meant to run in a process
# right after it has been forked -- confirm against the caller.
def self.after_fork
  ::ActiveRecord::Base.establish_connection
end
# File lib/delayed/backend/active_record.rb, line 33
#
# Calls +clear_all_connections!+ on ::ActiveRecord::Base.
# NOTE(review): judging by the name, this is meant to run just before the
# process forks -- confirm against the caller.
def self.before_fork
  ::ActiveRecord::Base.clear_all_connections!
end
When a worker is exiting, make sure we don't have any locked jobs.
# File lib/delayed/backend/active_record.rb, line 42
#
# Releases every lock held by the given worker: sets +locked_by+ and
# +locked_at+ to NULL on all rows whose +locked_by+ equals +worker_name+.
#
# worker_name - the value matched against the +locked_by+ column.
#
# Returns whatever +update_all+ returns.
def self.clear_locks!(worker_name)
  release_sql     = "locked_by = null, locked_at = null"
  owned_by_worker = ["locked_by = ?", worker_name]

  update_all(release_sql, owned_by_worker)
end
Get the current time (GMT or local, depending on the DB). Note: This does not ping the DB to get the time, so all your clients must have synchronized clocks.
# File lib/delayed/backend/active_record.rb, line 84
#
# Returns the current time, computed locally (no database query is made):
#
# * +Time.zone.now+ when +Time.zone+ is set;
# * otherwise +Time.now.utc+ when ::ActiveRecord::Base.default_timezone
#   is +:utc+;
# * otherwise plain +Time.now+.
def self.db_time_now
  return Time.zone.now if Time.zone
  return Time.now.utc if ::ActiveRecord::Base.default_timezone == :utc

  Time.now
end
Find a few candidate jobs to run (in case some immediately get locked by others).
# File lib/delayed/backend/active_record.rb, line 47
#
# Fetches up to +limit+ candidate jobs for a worker.
#
# Starts from +ready_to_run(worker_name, max_run_time)+ and narrows it by:
# * +priority >= Worker.min_priority+, when a minimum priority is set;
# * +priority <= Worker.max_priority+, when a maximum priority is set;
# * +queue IN (Worker.queues)+, when +Worker.queues+ is non-empty.
#
# The final query (+by_priority+, limited to +limit+ rows) is executed
# inside an ::ActiveRecord::Base.silence block.
#
# worker_name  - passed through to +ready_to_run+.
# limit        - maximum number of jobs to return (default 5).
# max_run_time - passed through to +ready_to_run+
#                (default Worker.max_run_time).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  candidates = ready_to_run(worker_name, max_run_time)

  if Worker.min_priority
    candidates = candidates.scoped(:conditions => ['priority >= ?', Worker.min_priority])
  end

  if Worker.max_priority
    candidates = candidates.scoped(:conditions => ['priority <= ?', Worker.max_priority])
  end

  if Worker.queues.any?
    candidates = candidates.scoped(:conditions => ["queue IN (?)", Worker.queues])
  end

  ::ActiveRecord::Base.silence { candidates.by_priority.all(:limit => limit) }
end
# File lib/delayed/backend/active_record.rb, line 17
#
# Returns true when the loaded ActiveRecord's major version is exactly 3,
# false for any other major version.
def self.rails3?
  ::ActiveRecord::VERSION::MAJOR == 3
end
Lock this job for this worker. Returns true if we have the lock, false otherwise.
# File lib/delayed/backend/active_record.rb, line 60
#
# Tries to take (or refresh) the lock on this job for +worker+ with a
# single conditional UPDATE, so the row is only claimed when the
# condition still holds at update time.
#
# max_run_time - a lock whose +locked_at+ is older than
#                now - max_run_time.to_i is treated as available.
# worker       - name written to / compared against +locked_by+.
#
# Returns true when exactly one row was updated (the in-memory
# +locked_at+ / +locked_by+ are then set and flagged as changed),
# false otherwise.
def lock_exclusively!(max_run_time, worker)
  now = self.class.db_time_now

  if locked_by != worker
    # Not ours yet: claim it only if it is unlocked (or the lock has
    # expired) and the job is due to run.
    updates    = ["locked_at = ?, locked_by = ?", now, worker]
    conditions = ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now]
  else
    # Already ours (this may happen if the job queue crashes): simply
    # resume and refresh locked_at.
    updates    = ["locked_at = ?", now]
    conditions = ["id = ? and locked_by = ?", id, worker]
  end

  return false unless self.class.update_all(updates, conditions) == 1

  # Mirror the database change on this instance; assignment happens
  # before the *_will_change! calls, as in the original listing.
  self.locked_at = now
  self.locked_by = worker
  self.locked_at_will_change!
  self.locked_by_will_change!
  true
end