Commit 7403ea37 authored by Sean McGivern's avatar Sean McGivern

Merge branch 'dm-waitable-worker' into 'master'

Extract WaitableWorker out of AuthorizedProjectsWorker

See merge request gitlab-org/gitlab-ce!17356
parents ce6974eb 56af2dbe
class AuthorizedProjectsWorker class AuthorizedProjectsWorker
include ApplicationWorker include ApplicationWorker
prepend WaitableWorker
# Schedules multiple jobs and waits for them to be completed. def perform(user_id)
def self.bulk_perform_and_wait(args_list)
# Short-circuit: it's more efficient to do small numbers of jobs inline
return bulk_perform_inline(args_list) if args_list.size <= 3
waiter = Gitlab::JobWaiter.new(args_list.size)
# Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
# into [[1, "key"], [2, "key"], [3, "key"]]
waiting_args_list = args_list.map { |args| [*args, waiter.key] }
bulk_perform_async(waiting_args_list)
waiter.wait
end
# Performs multiple jobs directly. Failed jobs will be put into sidekiq so
# they can benefit from retries
def self.bulk_perform_inline(args_list)
failed = []
args_list.each do |args|
begin
new.perform(*args)
rescue
failed << args
end
end
bulk_perform_async(failed) if failed.present?
end
def perform(user_id, notify_key = nil)
user = User.find_by(id: user_id) user = User.find_by(id: user_id)
user&.refresh_authorized_projects user&.refresh_authorized_projects
ensure
Gitlab::JobWaiter.notify(notify_key, jid) if notify_key
end end
end end
module WaitableWorker
extend ActiveSupport::Concern
module ClassMethods
# Schedules multiple jobs and waits for them to be completed.
def bulk_perform_and_wait(args_list, timeout: 10)
# Short-circuit: it's more efficient to do small numbers of jobs inline
return bulk_perform_inline(args_list) if args_list.size <= 3
waiter = Gitlab::JobWaiter.new(args_list.size)
# Point all the bulk jobs at the same JobWaiter. Converts, [[1], [2], [3]]
# into [[1, "key"], [2, "key"], [3, "key"]]
waiting_args_list = args_list.map { |args| [*args, waiter.key] }
bulk_perform_async(waiting_args_list)
waiter.wait(timeout)
end
# Performs multiple jobs directly. Failed jobs will be put into sidekiq so
# they can benefit from retries
def bulk_perform_inline(args_list)
failed = []
args_list.each do |args|
begin
new.perform(*args)
rescue
failed << args
end
end
bulk_perform_async(failed) if failed.present?
end
end
def perform(*args)
notify_key = args.pop if Gitlab::JobWaiter.key?(args.last)
super(*args)
ensure
Gitlab::JobWaiter.notify(notify_key, jid) if notify_key
end
end
...@@ -15,16 +15,22 @@ module Gitlab ...@@ -15,16 +15,22 @@ module Gitlab
# push to that array when done. Once the waiter has popped `count` items, it # push to that array when done. Once the waiter has popped `count` items, it
# knows all the jobs are done. # knows all the jobs are done.
class JobWaiter class JobWaiter
KEY_PREFIX = "gitlab:job_waiter".freeze
def self.notify(key, jid) def self.notify(key, jid)
Gitlab::Redis::SharedState.with { |redis| redis.lpush(key, jid) } Gitlab::Redis::SharedState.with { |redis| redis.lpush(key, jid) }
end end
def self.key?(key)
key.is_a?(String) && key =~ /\A#{KEY_PREFIX}:\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/
end
attr_reader :key, :finished attr_reader :key, :finished
attr_accessor :jobs_remaining attr_accessor :jobs_remaining
# jobs_remaining - the number of jobs left to wait for # jobs_remaining - the number of jobs left to wait for
# key - The key of this waiter. # key - The key of this waiter.
def initialize(jobs_remaining = 0, key = "gitlab:job_waiter:#{SecureRandom.uuid}") def initialize(jobs_remaining = 0, key = "#{KEY_PREFIX}:#{SecureRandom.uuid}")
@key = key @key = key
@jobs_remaining = jobs_remaining @jobs_remaining = jobs_remaining
@finished = [] @finished = []
......
require 'spec_helper' require 'spec_helper'
describe AuthorizedProjectsWorker do describe AuthorizedProjectsWorker do
let(:project) { create(:project) }
def build_args_list(*ids, multiply: 1)
args_list = ids.map { |id| [id] }
args_list * multiply
end
describe '.bulk_perform_and_wait' do
it 'schedules the ids and waits for the jobs to complete' do
args_list = build_args_list(project.owner.id)
project.owner.project_authorizations.delete_all
described_class.bulk_perform_and_wait(args_list)
expect(project.owner.project_authorizations.count).to eq(1)
end
it 'inlines workloads <= 3 jobs' do
args_list = build_args_list(project.owner.id, multiply: 3)
expect(described_class).to receive(:bulk_perform_inline).with(args_list)
described_class.bulk_perform_and_wait(args_list)
end
it 'runs > 3 jobs using sidekiq' do
project.owner.project_authorizations.delete_all
expect(described_class).to receive(:bulk_perform_async).and_call_original
args_list = build_args_list(project.owner.id, multiply: 4)
described_class.bulk_perform_and_wait(args_list)
expect(project.owner.project_authorizations.count).to eq(1)
end
end
describe '.bulk_perform_inline' do
it 'refreshes the authorizations inline' do
project.owner.project_authorizations.delete_all
expect_any_instance_of(described_class).to receive(:perform).and_call_original
described_class.bulk_perform_inline(build_args_list(project.owner.id))
expect(project.owner.project_authorizations.count).to eq(1)
end
it 'enqueues jobs if an error is raised' do
invalid_id = -1
args_list = build_args_list(project.owner.id, invalid_id)
allow_any_instance_of(described_class).to receive(:perform).with(project.owner.id)
allow_any_instance_of(described_class).to receive(:perform).with(invalid_id).and_raise(ArgumentError)
expect(described_class).to receive(:bulk_perform_async).with(build_args_list(invalid_id))
described_class.bulk_perform_inline(args_list)
end
end
describe '.bulk_perform_async' do
it "uses it's respective sidekiq queue" do
args_list = build_args_list(project.owner.id)
push_bulk_args = {
'class' => described_class,
'args' => args_list
}
expect(Sidekiq::Client).to receive(:push_bulk).with(push_bulk_args).once
described_class.bulk_perform_async(args_list)
end
end
describe '#perform' do describe '#perform' do
let(:user) { create(:user) } let(:user) { create(:user) }
...@@ -85,12 +12,6 @@ describe AuthorizedProjectsWorker do ...@@ -85,12 +12,6 @@ describe AuthorizedProjectsWorker do
job.perform(user.id) job.perform(user.id)
end end
it 'notifies the JobWaiter when done if the key is provided' do
expect(Gitlab::JobWaiter).to receive(:notify).with('notify-key', job.jid)
job.perform(user.id, 'notify-key')
end
context "when the user is not found" do context "when the user is not found" do
it "does nothing" do it "does nothing" do
expect_any_instance_of(User).not_to receive(:refresh_authorized_projects) expect_any_instance_of(User).not_to receive(:refresh_authorized_projects)
......
require 'spec_helper'
describe WaitableWorker do
let(:worker) do
Class.new do
def self.name
'Gitlab::Foo::Bar::DummyWorker'
end
class << self
cattr_accessor(:counter) { 0 }
end
include ApplicationWorker
prepend WaitableWorker
def perform(i = 0)
self.class.counter += i
end
end
end
subject(:job) { worker.new }
describe '.bulk_perform_and_wait' do
it 'schedules the jobs and waits for them to complete' do
worker.bulk_perform_and_wait([[1], [2]])
expect(worker.counter).to eq(3)
end
it 'inlines workloads <= 3 jobs' do
args_list = [[1], [2], [3]]
expect(worker).to receive(:bulk_perform_inline).with(args_list).and_call_original
worker.bulk_perform_and_wait(args_list)
expect(worker.counter).to eq(6)
end
it 'runs > 3 jobs using sidekiq' do
expect(worker).to receive(:bulk_perform_async)
worker.bulk_perform_and_wait([[1], [2], [3], [4]])
end
end
describe '.bulk_perform_inline' do
it 'runs the jobs inline' do
expect(worker).not_to receive(:bulk_perform_async)
worker.bulk_perform_inline([[1], [2]])
expect(worker.counter).to eq(3)
end
it 'enqueues jobs if an error is raised' do
expect(worker).to receive(:bulk_perform_async).with([['foo']])
worker.bulk_perform_inline([[1], ['foo']])
end
end
describe '#perform' do
shared_examples 'perform' do
it 'notifies the JobWaiter when done if the key is provided' do
key = Gitlab::JobWaiter.new.key
expect(Gitlab::JobWaiter).to receive(:notify).with(key, job.jid)
job.perform(*args, key)
end
it 'does not notify the JobWaiter when done if no key is provided' do
expect(Gitlab::JobWaiter).not_to receive(:notify)
job.perform(*args)
end
end
context 'when the worker takes arguments' do
let(:args) { [1] }
it_behaves_like 'perform'
end
context 'when the worker takes no arguments' do
let(:args) { [] }
it_behaves_like 'perform'
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment