Commit 1e6ca3c4 authored by Douwe Maan's avatar Douwe Maan

Consistently schedule Sidekiq jobs

parent a5c3f1c8
...@@ -2,6 +2,7 @@ require 'carrierwave/orm/activerecord' ...@@ -2,6 +2,7 @@ require 'carrierwave/orm/activerecord'
class Group < Namespace class Group < Namespace
include Gitlab::ConfigHelper include Gitlab::ConfigHelper
include AfterCommitQueue
include AccessRequestable include AccessRequestable
include Avatarable include Avatarable
include Referable include Referable
......
...@@ -2,6 +2,7 @@ require 'digest/md5' ...@@ -2,6 +2,7 @@ require 'digest/md5'
class Key < ActiveRecord::Base class Key < ActiveRecord::Base
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
include AfterCommitQueue
include Sortable include Sortable
belongs_to :user belongs_to :user
......
class Member < ActiveRecord::Base class Member < ActiveRecord::Base
include AfterCommitQueue
include Sortable include Sortable
include Importable include Importable
include Expirable include Expirable
......
...@@ -211,7 +211,7 @@ class Service < ActiveRecord::Base ...@@ -211,7 +211,7 @@ class Service < ActiveRecord::Base
def async_execute(data) def async_execute(data)
return unless supported_events.include?(data[:object_kind]) return unless supported_events.include?(data[:object_kind])
Sidekiq::Client.enqueue(ProjectServiceWorker, id, data) ProjectServiceWorker.perform_async(id, data)
end end
def issue_tracker? def issue_tracker?
......
...@@ -7,6 +7,7 @@ class User < ActiveRecord::Base ...@@ -7,6 +7,7 @@ class User < ActiveRecord::Base
include Gitlab::ConfigHelper include Gitlab::ConfigHelper
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
include Gitlab::SQL::Pattern include Gitlab::SQL::Pattern
include AfterCommitQueue
include Avatarable include Avatarable
include Referable include Referable
include Sortable include Sortable
...@@ -903,6 +904,7 @@ class User < ActiveRecord::Base ...@@ -903,6 +904,7 @@ class User < ActiveRecord::Base
def post_destroy_hook def post_destroy_hook
log_info("User \"#{name}\" (#{email}) was removed") log_info("User \"#{name}\" (#{email}) was removed")
system_hook_service.execute_hooks_for(self, :destroy) system_hook_service.execute_hooks_for(self, :destroy)
end end
......
class SystemHooksService class SystemHooksService
def execute_hooks_for(model, event) def execute_hooks_for(model, event)
execute_hooks(build_event_data(model, event)) data = build_event_data(model, event)
model.run_after_commit_or_now do
SystemHooksService.new.execute_hooks(data)
end
end end
def execute_hooks(data, hooks_scope = :all) def execute_hooks(data, hooks_scope = :all)
......
...@@ -63,7 +63,7 @@ class WebHookService ...@@ -63,7 +63,7 @@ class WebHookService
end end
def async_execute def async_execute
Sidekiq::Client.enqueue(WebHookWorker, hook.id, data, hook_name) WebHookWorker.perform_async(hook.id, data, hook_name)
end end
private private
......
...@@ -16,11 +16,6 @@ class AuthorizedProjectsWorker ...@@ -16,11 +16,6 @@ class AuthorizedProjectsWorker
waiter.wait waiter.wait
end end
# Schedules multiple jobs to run in sidekiq without waiting for completion
def self.bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
end
# Performs multiple jobs directly. Failed jobs will be put into sidekiq so # Performs multiple jobs directly. Failed jobs will be put into sidekiq so
# they can benefit from retries # they can benefit from retries
def self.bulk_perform_inline(args_list) def self.bulk_perform_inline(args_list)
......
class BackgroundMigrationWorker class BackgroundMigrationWorker
include ApplicationWorker include ApplicationWorker
# Enqueues a number of jobs in bulk.
#
# The `jobs` argument should be an Array of Arrays, each sub-array must be in
# the form:
#
# [migration-class, [arg1, arg2, ...]]
def self.perform_bulk(jobs)
Sidekiq::Client.push_bulk('class' => self,
'queue' => sidekiq_options['queue'],
'args' => jobs)
end
# Schedules multiple jobs in bulk, with a delay.
#
def self.perform_bulk_in(delay, jobs)
now = Time.now.to_i
schedule = now + delay.to_i
if schedule <= now
raise ArgumentError, 'The schedule time must be in the future!'
end
Sidekiq::Client.push_bulk('class' => self,
'queue' => sidekiq_options['queue'],
'args' => jobs,
'at' => schedule)
end
# Performs the background migration. # Performs the background migration.
# #
# See Gitlab::BackgroundMigration.perform for more information. # See Gitlab::BackgroundMigration.perform for more information.
......
...@@ -21,5 +21,20 @@ module ApplicationWorker ...@@ -21,5 +21,20 @@ module ApplicationWorker
def queue def queue
get_sidekiq_options['queue'].to_s get_sidekiq_options['queue'].to_s
end end
def bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
end
def bulk_perform_in(delay, args_list)
now = Time.now.to_i
schedule = now + delay.to_i
if schedule <= now
raise ArgumentError, 'The schedule time must be in the future!'
end
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
end
end end
end end
...@@ -8,6 +8,6 @@ class ExpireBuildArtifactsWorker ...@@ -8,6 +8,6 @@ class ExpireBuildArtifactsWorker
build_ids = Ci::Build.with_expired_artifacts.pluck(:id) build_ids = Ci::Build.with_expired_artifacts.pluck(:id)
build_ids = build_ids.map { |build_id| [build_id] } build_ids = build_ids.map { |build_id| [build_id] }
Sidekiq::Client.push_bulk('class' => ExpireBuildInstanceArtifactsWorker, 'args' => build_ids ) ExpireBuildInstanceArtifactsWorker.bulk_perform_async(build_ids)
end end
end end
...@@ -8,10 +8,6 @@ class NamespacelessProjectDestroyWorker ...@@ -8,10 +8,6 @@ class NamespacelessProjectDestroyWorker
include ApplicationWorker include ApplicationWorker
include ExceptionBacktrace include ExceptionBacktrace
def self.bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
end
def perform(project_id) def perform(project_id)
begin begin
project = Project.unscoped.find(project_id) project = Project.unscoped.find(project_id)
......
...@@ -13,20 +13,19 @@ module Sidekiq ...@@ -13,20 +13,19 @@ module Sidekiq
module ClassMethods module ClassMethods
module NoSchedulingFromTransactions module NoSchedulingFromTransactions
NESTING = ::Rails.env.test? ? 1 : 0
%i(perform_async perform_at perform_in).each do |name| %i(perform_async perform_at perform_in).each do |name|
define_method(name) do |*args| define_method(name) do |*args|
return super(*args) if Sidekiq::Worker.skip_transaction_check if !Sidekiq::Worker.skip_transaction_check && AfterCommitQueue.inside_transaction?
return super(*args) unless ActiveRecord::Base.connection.open_transactions > NESTING raise <<-MSG.strip_heredoc
`#{self}.#{name}` cannot be called inside a transaction as this can lead to
race conditions when the worker runs before the transaction is committed and
tries to access a model that has not been saved yet.
raise <<-MSG.strip_heredoc Use an `after_commit` hook, or include `AfterCommitQueue` and use a `run_after_commit` block instead.
`#{self}.#{name}` cannot be called inside a transaction as this can lead to MSG
race conditions when the worker runs before the transaction is committed and end
tries to access a model that has not been saved yet.
Use an `after_commit` hook, or include `AfterCommitQueue` and use a `run_after_commit` block instead. super(*args)
MSG
end end
end end
end end
......
...@@ -25,14 +25,14 @@ class ScheduleEventMigrations < ActiveRecord::Migration ...@@ -25,14 +25,14 @@ class ScheduleEventMigrations < ActiveRecord::Migration
# We push multiple jobs at a time to reduce the time spent in # We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we # Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range. # don't need to run additional queries for every range.
BackgroundMigrationWorker.perform_bulk(jobs) BackgroundMigrationWorker.bulk_perform_async(jobs)
jobs.clear jobs.clear
end end
jobs << ['MigrateEventsToPushEventPayloads', [min, max]] jobs << ['MigrateEventsToPushEventPayloads', [min, max]]
end end
BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty? BackgroundMigrationWorker.bulk_perform_async(jobs) unless jobs.empty?
end end
def down def down
......
...@@ -19,7 +19,7 @@ class ScheduleCreateGpgKeySubkeysFromGpgKeys < ActiveRecord::Migration ...@@ -19,7 +19,7 @@ class ScheduleCreateGpgKeySubkeysFromGpgKeys < ActiveRecord::Migration
[MIGRATION, [id]] [MIGRATION, [id]]
end end
BackgroundMigrationWorker.perform_bulk(jobs) BackgroundMigrationWorker.bulk_perform_async(jobs)
end end
end end
......
...@@ -68,10 +68,10 @@ BackgroundMigrationWorker.perform_async('BackgroundMigrationClassName', [arg1, a ...@@ -68,10 +68,10 @@ BackgroundMigrationWorker.perform_async('BackgroundMigrationClassName', [arg1, a
``` ```
Usually it's better to enqueue jobs in bulk, for this you can use Usually it's better to enqueue jobs in bulk, for this you can use
`BackgroundMigrationWorker.perform_bulk`: `BackgroundMigrationWorker.bulk_perform_async`:
```ruby ```ruby
BackgroundMigrationWorker.perform_bulk( BackgroundMigrationWorker.bulk_perform_async(
[['BackgroundMigrationClassName', [1]], [['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]]] ['BackgroundMigrationClassName', [2]]]
) )
...@@ -85,13 +85,13 @@ updates. Removals in turn can be handled by simply defining foreign keys with ...@@ -85,13 +85,13 @@ updates. Removals in turn can be handled by simply defining foreign keys with
cascading deletes. cascading deletes.
If you would like to schedule jobs in bulk with a delay, you can use If you would like to schedule jobs in bulk with a delay, you can use
`BackgroundMigrationWorker.perform_bulk_in`: `BackgroundMigrationWorker.bulk_perform_in`:
```ruby ```ruby
jobs = [['BackgroundMigrationClassName', [1]], jobs = [['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]]] ['BackgroundMigrationClassName', [2]]]
BackgroundMigrationWorker.perform_bulk_in(5.minutes, jobs) BackgroundMigrationWorker.bulk_perform_in(5.minutes, jobs)
``` ```
## Cleaning Up ## Cleaning Up
...@@ -201,7 +201,7 @@ class ScheduleExtractServicesUrl < ActiveRecord::Migration ...@@ -201,7 +201,7 @@ class ScheduleExtractServicesUrl < ActiveRecord::Migration
['ExtractServicesUrl', [id]] ['ExtractServicesUrl', [id]]
end end
BackgroundMigrationWorker.perform_bulk(jobs) BackgroundMigrationWorker.bulk_perform_async(jobs)
end end
end end
......
...@@ -6,12 +6,34 @@ module AfterCommitQueue ...@@ -6,12 +6,34 @@ module AfterCommitQueue
after_rollback :_clear_after_commit_queue after_rollback :_clear_after_commit_queue
end end
def run_after_commit(method = nil, &block) def run_after_commit(&block)
_after_commit_queue << proc { self.send(method) } if method # rubocop:disable GitlabSecurity/PublicSend
_after_commit_queue << block if block _after_commit_queue << block if block
true
end
def run_after_commit_or_now(&block)
if AfterCommitQueue.inside_transaction?
run_after_commit(&block)
else
instance_eval(&block)
end
true true
end end
def self.open_transactions_baseline
if ::Rails.env.test?
return DatabaseCleaner.connections.count { |conn| conn.strategy.is_a?(DatabaseCleaner::ActiveRecord::Transaction) }
end
0
end
def self.inside_transaction?
ActiveRecord::Base.connection.open_transactions > open_transactions_baseline
end
protected protected
def _run_after_commit_queue def _run_after_commit_queue
......
...@@ -703,14 +703,14 @@ into similar problems in the future (e.g. when new tables are created). ...@@ -703,14 +703,14 @@ into similar problems in the future (e.g. when new tables are created).
# We push multiple jobs at a time to reduce the time spent in # We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we # Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range. # don't need to run additional queries for every range.
BackgroundMigrationWorker.perform_bulk(jobs) BackgroundMigrationWorker.bulk_perform_async(jobs)
jobs.clear jobs.clear
end end
jobs << [job_class_name, [start_id, end_id]] jobs << [job_class_name, [start_id, end_id]]
end end
BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty? BackgroundMigrationWorker.bulk_perform_async(jobs) unless jobs.empty?
end end
# Queues background migration jobs for an entire table, batched by ID range. # Queues background migration jobs for an entire table, batched by ID range.
......
...@@ -942,8 +942,8 @@ describe Gitlab::Database::MigrationHelpers do ...@@ -942,8 +942,8 @@ describe Gitlab::Database::MigrationHelpers do
end end
it 'queues jobs in groups of buffer size 1' do it 'queues jobs in groups of buffer size 1' do
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]]]) expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]]])
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id3, id3]]]) expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2) model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end end
...@@ -960,8 +960,8 @@ describe Gitlab::Database::MigrationHelpers do ...@@ -960,8 +960,8 @@ describe Gitlab::Database::MigrationHelpers do
end end
it 'queues jobs in bulk all at once (big buffer size)' do it 'queues jobs in bulk all at once (big buffer size)' do
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]], expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]],
['FooJob', [id3, id3]]]) ['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2) model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end end
......
...@@ -146,7 +146,7 @@ describe WebHookService do ...@@ -146,7 +146,7 @@ describe WebHookService do
let(:system_hook) { create(:system_hook) } let(:system_hook) { create(:system_hook) }
it 'enqueue WebHookWorker' do it 'enqueue WebHookWorker' do
expect(Sidekiq::Client).to receive(:enqueue).with(WebHookWorker, project_hook.id, data, 'push_hooks') expect(WebHookWorker).to receive(:perform_async).with(project_hook.id, data, 'push_hooks')
described_class.new(project_hook, data, 'push_hooks').async_execute described_class.new(project_hook, data, 'push_hooks').async_execute
end end
......
...@@ -65,7 +65,6 @@ describe AuthorizedProjectsWorker do ...@@ -65,7 +65,6 @@ describe AuthorizedProjectsWorker do
args_list = build_args_list(project.owner.id) args_list = build_args_list(project.owner.id)
push_bulk_args = { push_bulk_args = {
'class' => described_class, 'class' => described_class,
'queue' => described_class.sidekiq_options['queue'],
'args' => args_list 'args' => args_list
} }
......
...@@ -10,35 +10,4 @@ describe BackgroundMigrationWorker, :sidekiq do ...@@ -10,35 +10,4 @@ describe BackgroundMigrationWorker, :sidekiq do
described_class.new.perform('Foo', [10, 20]) described_class.new.perform('Foo', [10, 20])
end end
end end
describe '.perform_bulk' do
it 'enqueues background migrations in bulk' do
Sidekiq::Testing.fake! do
described_class.perform_bulk([['Foo', [1]], ['Foo', [2]]])
expect(described_class.jobs.count).to eq 2
expect(described_class.jobs).to all(include('enqueued_at'))
end
end
end
describe '.perform_bulk_in' do
context 'when delay is valid' do
it 'correctly schedules background migrations' do
Sidekiq::Testing.fake! do
described_class.perform_bulk_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
expect(described_class.jobs.count).to eq 2
expect(described_class.jobs).to all(include('at'))
end
end
end
context 'when delay is invalid' do
it 'raises an ArgumentError exception' do
expect { described_class.perform_bulk_in(-60, [['Foo']]) }
.to raise_error(ArgumentError)
end
end
end
end end
...@@ -24,4 +24,35 @@ describe ApplicationWorker do ...@@ -24,4 +24,35 @@ describe ApplicationWorker do
expect(worker.queue).to eq('some_queue') expect(worker.queue).to eq('some_queue')
end end
end end
describe '.bulk_perform_async' do
it 'enqueues jobs in bulk' do
Sidekiq::Testing.fake! do
worker.bulk_perform_async([['Foo', [1]], ['Foo', [2]]])
expect(worker.jobs.count).to eq 2
expect(worker.jobs).to all(include('enqueued_at'))
end
end
end
describe '.bulk_perform_in' do
context 'when delay is valid' do
it 'correctly schedules jobs' do
Sidekiq::Testing.fake! do
worker.bulk_perform_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
expect(worker.jobs.count).to eq 2
expect(worker.jobs).to all(include('at'))
end
end
end
context 'when delay is invalid' do
it 'raises an ArgumentError exception' do
expect { worker.bulk_perform_in(-60, [['Foo']]) }
.to raise_error(ArgumentError)
end
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment