Commit 01b394f0 authored by Bob Van Landuyt, committed by Thong Kuah

Allow scheduling jobs in batch with contexts

This adds helper methods around `bulk_perform_in` and
`bulk_perform_async` that allow specifying a context per job.

Scheduling jobs whose context differs based on the arguments can be
done by passing in an array of objects and defining how to build the
arguments and the context for each of those objects.

For example:
      ProjectImportScheduleWorker.bulk_perform_async_with_contexts(
        projects,
        arguments_proc: -> (project) { project.id },
        context_proc: -> (project) { { project: project } }
      )
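
A delayed variant is also available; for illustration (the delay value
here is only an example):
      ProjectImportScheduleWorker.bulk_perform_in_with_contexts(
        10.minutes,
        projects,
        arguments_proc: -> (project) { project.id },
        context_proc: -> (project) { { project: project } }
      )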

This will schedule `ProjectImportScheduleWorker` jobs, each called with
a `project.id`, and build a context for each job from the corresponding
project. Developers need to make sure that the resources required for
building a context are preloaded; for a project these are its route and
its namespace.
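
For example, when scheduling jobs for a batch of projects, those
associations would be preloaded before passing the records to the
helper above (`mirror_projects_relation` is an illustrative
placeholder; `with_route` and `with_namespace` are the scopes used in
`UpdateAllMirrorsWorker` further down):
      # Preload the route and the namespace needed to build each context
      projects = mirror_projects_relation.with_route.with_namespace.to_a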

The batch context is stored on the worker class while the jobs are
being scheduled, so the Sidekiq client middleware can look up the right
context based on a job's arguments and attach it to the job.
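
Condensed, the client middleware's lookup amounts to the following
sketch (the `safe_constantize` and `ApplicationWorker` checks from
`Gitlab::SidekiqMiddleware::WorkerContext::Client` below are omitted):
      def call(worker_class, job, _queue, _redis_pool, &block)
        # Look up the context that was stored when the batch was scheduled
        context = worker_class.context_for_arguments(job['args'])
        return yield unless context

        context.use(&block)
      end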

This also uses the new helper in `UpdateAllMirrorsWorker`, which
schedules `ProjectImportScheduleWorker` in batch. With this change,
those `ProjectImportScheduleWorker` jobs will carry a context.
parent b88f2bec
@@ -12,8 +12,46 @@ module WorkerContext
@worker_context || superclass_context
end
def bulk_perform_async_with_contexts(objects, arguments_proc:, context_proc:)
with_batch_contexts(objects, arguments_proc, context_proc) do |arguments|
bulk_perform_async(arguments)
end
end
def bulk_perform_in_with_contexts(delay, objects, arguments_proc:, context_proc:)
with_batch_contexts(objects, arguments_proc, context_proc) do |arguments|
bulk_perform_in(delay, arguments)
end
end
def context_for_arguments(args)
batch_context&.context_for(args)
end
private
BATCH_CONTEXT_KEY = "#{name}_batch_context"
def batch_context
Thread.current[BATCH_CONTEXT_KEY]
end
def batch_context=(value)
Thread.current[BATCH_CONTEXT_KEY] = value
end
def with_batch_contexts(objects, arguments_proc, context_proc)
self.batch_context = Gitlab::BatchWorkerContext.new(
objects,
arguments_proc: arguments_proc,
context_proc: context_proc
)
yield(batch_context.arguments)
ensure
self.batch_context = nil
end
def superclass_context
return unless superclass.include?(WorkerContext)
......
@@ -47,11 +47,12 @@ class UpdateAllMirrorsWorker
projects = pull_mirrors_batch(freeze_at: now, batch_size: batch_size, offset_at: last).to_a
break if projects.empty?
-project_ids = projects.lazy.select(&:mirror?).take(capacity).map(&:id).force
-capacity -= project_ids.length
+projects_to_schedule = projects.lazy.select(&:mirror?).take(capacity).force
+capacity -= projects_to_schedule.size
-ProjectImportScheduleWorker.bulk_perform_async(project_ids.map { |id| [id] })
-scheduled += project_ids.length
+schedule_projects_in_batch(projects_to_schedule)
+scheduled += projects_to_schedule.length
# If fewer than `batch_size` projects were returned, we don't need to query again
break if projects.length < batch_size
@@ -95,6 +96,7 @@ class UpdateAllMirrorsWorker
.mirrors_to_sync(freeze_at)
.reorder('import_state.next_execution_timestamp')
.limit(batch_size)
.with_route
.with_namespace # Used by `project.mirror?`
relation = relation.where('import_state.next_execution_timestamp > ?', offset_at) if offset_at
@@ -102,4 +104,12 @@ class UpdateAllMirrorsWorker
relation
end
# rubocop: enable CodeReuse/ActiveRecord
def schedule_projects_in_batch(projects)
ProjectImportScheduleWorker.bulk_perform_async_with_contexts(
projects,
arguments_proc: -> (project) { project.id },
context_proc: -> (project) { { project: project } }
)
end
end
# frozen_string_literal: true
module Gitlab
class BatchWorkerContext
def initialize(objects, arguments_proc:, context_proc:)
@objects = objects
@arguments_proc = arguments_proc
@context_proc = context_proc
end
def arguments
context_by_arguments.keys
end
def context_for(arguments)
context_by_arguments[arguments]
end
private
attr_reader :objects, :arguments_proc, :context_proc
def context_by_arguments
@context_by_arguments ||= objects.each_with_object({}) do |object, result|
arguments = Array.wrap(arguments_proc.call(object))
context = Gitlab::ApplicationContext.new(context_proc.call(object))
result[arguments] = context
end
end
end
end
@@ -29,6 +29,7 @@ module Gitlab
lambda do |chain|
chain.add Gitlab::SidekiqStatus::ClientMiddleware
chain.add Gitlab::SidekiqMiddleware::ClientMetrics
chain.add Gitlab::SidekiqMiddleware::WorkerContext::Client # needs to be before the Labkit middleware
chain.add Labkit::Middleware::Sidekiq::Client
end
end
......
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module WorkerContext
private
def wrap_in_optional_context(context_or_nil, &block)
return yield unless context_or_nil
context_or_nil.use(&block)
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module WorkerContext
class Client
include Gitlab::SidekiqMiddleware::WorkerContext
def call(worker_class_or_name, job, _queue, _redis_pool, &block)
worker_class = worker_class_or_name.to_s.safe_constantize
# Mailers can't be constantized like this
return yield unless worker_class
return yield unless worker_class.include?(::ApplicationWorker)
context_for_args = worker_class.context_for_arguments(job['args'])
wrap_in_optional_context(context_for_args, &block)
end
end
end
end
end
@@ -4,6 +4,8 @@ module Gitlab
module SidekiqMiddleware
module WorkerContext
class Server
+include Gitlab::SidekiqMiddleware::WorkerContext
def call(worker, job, _queue, &block)
worker_class = worker.class
@@ -13,14 +15,6 @@ module Gitlab
# Use the context defined on the class level as a base context
wrap_in_optional_context(worker_class.get_worker_context, &block)
end
-private
-def wrap_in_optional_context(context, &block)
-  return yield unless context
-  context.use(&block)
-end
end
end
end
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::BatchWorkerContext do
subject(:batch_context) do
described_class.new(
%w(hello world),
arguments_proc: -> (word) { word },
context_proc: -> (word) { { user: build_stubbed(:user, username: word) } }
)
end
describe "#arguments" do
it "returns all the expected arguments in arrays" do
expect(batch_context.arguments).to eq([%w(hello), %w(world)])
end
end
describe "#context_for" do
it "returns the correct application context for the arguments" do
context = batch_context.context_for(%w(world))
expect(context).to be_a(Gitlab::ApplicationContext)
expect(context.to_lazy_hash[:user].call).to eq("world")
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqMiddleware::WorkerContext::Client do
let(:worker_class) do
Class.new do
def self.name
'TestWithContextWorker'
end
include ApplicationWorker
def self.job_for_args(args)
jobs.find { |job| job['args'] == args }
end
def perform(*args)
end
end
end
before do
stub_const('TestWithContextWorker', worker_class)
end
describe "#call" do
it 'applies a context for jobs scheduled in batch' do
user_per_job = { 'job1' => build_stubbed(:user, username: 'user-1'),
'job2' => build_stubbed(:user, username: 'user-2') }
TestWithContextWorker.bulk_perform_async_with_contexts(
%w(job1 job2),
arguments_proc: -> (name) { [name, 1, 2, 3] },
context_proc: -> (name) { { user: user_per_job[name] } }
)
job1 = TestWithContextWorker.job_for_args(['job1', 1, 2, 3])
job2 = TestWithContextWorker.job_for_args(['job2', 1, 2, 3])
expect(job1['meta.user']).to eq(user_per_job['job1'].username)
expect(job2['meta.user']).to eq(user_per_job['job2'].username)
end
end
end
@@ -110,6 +110,14 @@ describe Gitlab::SidekiqMiddleware do
let(:queue) { 'default' }
let(:redis_pool) { Sidekiq.redis_pool }
let(:middleware_expected_args) { [worker_class_arg, job, queue, redis_pool] }
let(:expected_middlewares) do
[
Gitlab::SidekiqStatus::ClientMiddleware,
Gitlab::SidekiqMiddleware::ClientMetrics,
Gitlab::SidekiqMiddleware::WorkerContext::Client,
Labkit::Middleware::Sidekiq::Client
]
end
before do
described_class.client_configurator.call(chain)
@@ -120,8 +128,9 @@ describe Gitlab::SidekiqMiddleware do
# this will prevent the full middleware chain from being executed.
# This test ensures that this does not happen
it "invokes the chain" do
-expect_any_instance_of(Gitlab::SidekiqStatus::ClientMiddleware).to receive(:call).with(*middleware_expected_args).once.and_call_original
-expect_any_instance_of(Labkit::Middleware::Sidekiq::Client).to receive(:call).with(*middleware_expected_args).once.and_call_original
+expected_middlewares.each do |middleware|
+  expect_any_instance_of(middleware).to receive(:call).with(*middleware_expected_args).once.ordered.and_call_original
+end
expect { |b| chain.invoke(worker_class_arg, job, queue, redis_pool, &b) }.to yield_control.once
end
......
@@ -5,7 +5,11 @@ require 'spec_helper'
describe WorkerContext do
let(:worker) do
Class.new do
-include WorkerContext
+def self.name
+  "TestWorker"
+end
+include ApplicationWorker
end
end
@@ -24,6 +28,78 @@ describe WorkerContext do
end
end
shared_examples 'tracking bulk scheduling contexts' do
describe "context contents" do
before do
# stub clearing the contexts, so we can check what's inside
allow(worker).to receive(:batch_context=).and_call_original
allow(worker).to receive(:batch_context=).with(nil)
end
it 'keeps track of the context per key to schedule' do
subject
expect(worker.context_for_arguments(["hello"])).to be_a(Gitlab::ApplicationContext)
end
it 'does not share contexts across threads' do
t1_context = nil
t2_context = nil
Thread.new do
subject
t1_context = worker.context_for_arguments(["hello"])
end.join
Thread.new do
t2_context = worker.context_for_arguments(["hello"])
end.join
expect(t1_context).to be_a(Gitlab::ApplicationContext)
expect(t2_context).to be_nil
end
end
it 'clears the contexts' do
subject
expect(worker.__send__(:batch_context)).to be_nil
end
end
describe '.bulk_perform_async_with_contexts' do
subject do
worker.bulk_perform_async_with_contexts(%w(hello world),
context_proc: -> (_) { { user: build_stubbed(:user) } },
arguments_proc: -> (word) { word })
end
it 'calls bulk_perform_async with the arguments' do
expect(worker).to receive(:bulk_perform_async).with([["hello"], ["world"]])
subject
end
it_behaves_like 'tracking bulk scheduling contexts'
end
describe '.bulk_perform_in_with_contexts' do
subject do
worker.bulk_perform_in_with_contexts(10.minutes,
%w(hello world),
context_proc: -> (_) { { user: build_stubbed(:user) } },
arguments_proc: -> (word) { word })
end
it 'calls bulk_perform_in with the arguments and delay' do
expect(worker).to receive(:bulk_perform_in).with(10.minutes, [["hello"], ["world"]])
subject
end
it_behaves_like 'tracking bulk scheduling contexts'
end
describe '#with_context' do
it 'allows modifying context when the job is running' do
worker.new.with_context(user: build_stubbed(:user, username: 'jane-doe')) do
......