Commit 1d4e3c82 authored by Grzegorz Bizon's avatar Grzegorz Bizon

Merge branch '214104-versioning' into 'master'

Versioning for sidekiq workers

See merge request gitlab-org/gitlab!37918
parents a4dfe1f2 324c544c
...@@ -10,6 +10,7 @@ module ApplicationWorker ...@@ -10,6 +10,7 @@ module ApplicationWorker
include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker
include WorkerAttributes include WorkerAttributes
include WorkerContext include WorkerContext
include Gitlab::SidekiqVersioning::Worker
LOGGING_EXTRA_KEY = 'extra' LOGGING_EXTRA_KEY = 'extra'
......
---
name: sidekiq_versioning
introduced_by_url:
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/232934
group: group::fulfillment
type: development
default_enabled: false
\ No newline at end of file
...@@ -64,6 +64,36 @@ the extra jobs will take resources away from jobs from workers that were already ...@@ -64,6 +64,36 @@ the extra jobs will take resources away from jobs from workers that were already
there, if the resources available to the Sidekiq process handling the namespace there, if the resources available to the Sidekiq process handling the namespace
are not adjusted appropriately. are not adjusted appropriately.
## Versioning
Version can be specified on each Sidekiq worker class.
This is then sent along when the job is created.
```ruby
class FooWorker
include ApplicationWorker
version 2
def perform(*args)
if job_version == 2
foo = args.first['foo']
else
foo = args.first
end
end
end
```
Under this schema, any worker is expected to be able to handle any job that was
enqueued by an older version of that worker. This means that when changing the
arguments a worker takes, you must increment the `version` (or set `version 1`
if this is the first time a worker's arguments are changing), but also make sure
that the worker is still able to handle jobs that were queued with any earlier
version of the arguments. From the worker's `perform` method, you can read
`self.job_version` if you want to specifically branch on job version, or you
can read the number or type of provided arguments.
## Idempotent Jobs ## Idempotent Jobs
It's known that a job can fail for multiple reasons. For example, network outages or bugs. It's known that a job can fail for multiple reasons. For example, network outages or bugs.
......
...@@ -14,8 +14,9 @@ RSpec.describe Geo::RepositoryVerification::Primary::SingleWorker, :clean_gitlab ...@@ -14,8 +14,9 @@ RSpec.describe Geo::RepositoryVerification::Primary::SingleWorker, :clean_gitlab
end end
it 'disables retrying of failed jobs' do it 'disables retrying of failed jobs' do
expect(subject.sidekiq_options_hash).to eq( expect(subject.sidekiq_options_hash).to match(
'retry' => false, 'retry' => false,
'version' => an_instance_of(Integer),
'queue' => 'geo:geo_repository_verification_primary_single', 'queue' => 'geo:geo_repository_verification_primary_single',
'queue_namespace' => :geo 'queue_namespace' => :geo
) )
......
...@@ -15,8 +15,9 @@ RSpec.describe Geo::RepositoryVerification::Secondary::SingleWorker, :geo, :clea ...@@ -15,8 +15,9 @@ RSpec.describe Geo::RepositoryVerification::Secondary::SingleWorker, :geo, :clea
end end
it 'disables retrying of failed jobs' do it 'disables retrying of failed jobs' do
expect(subject.sidekiq_options_hash).to eq( expect(subject.sidekiq_options_hash).to match(
'retry' => false, 'retry' => false,
'version' => an_instance_of(Integer),
'queue' => 'geo:geo_repository_verification_secondary_single', 'queue' => 'geo:geo_repository_verification_secondary_single',
'queue_namespace' => :geo 'queue_namespace' => :geo
) )
......
...@@ -19,6 +19,7 @@ module Gitlab ...@@ -19,6 +19,7 @@ module Gitlab
chain.add ::Labkit::Middleware::Sidekiq::Server chain.add ::Labkit::Middleware::Sidekiq::Server
chain.add ::Gitlab::SidekiqMiddleware::InstrumentationLogger chain.add ::Gitlab::SidekiqMiddleware::InstrumentationLogger
chain.add ::Gitlab::SidekiqMiddleware::AdminMode::Server chain.add ::Gitlab::SidekiqMiddleware::AdminMode::Server
chain.add ::Gitlab::SidekiqVersioning::Middleware
chain.add ::Gitlab::SidekiqStatus::ServerMiddleware chain.add ::Gitlab::SidekiqStatus::ServerMiddleware
chain.add ::Gitlab::SidekiqMiddleware::WorkerContext::Server chain.add ::Gitlab::SidekiqMiddleware::WorkerContext::Server
chain.add ::Gitlab::SidekiqMiddleware::DuplicateJobs::Server chain.add ::Gitlab::SidekiqMiddleware::DuplicateJobs::Server
......
# frozen_string_literal: true
module Gitlab
module SidekiqVersioning
class Middleware
def call(worker, job, queue)
worker.job_version = job['version'] if worker.is_a?(ApplicationWorker) && Feature.enabled?(:sidekiq_versioning)
yield
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqVersioning
module Worker
extend ActiveSupport::Concern
included do
version 0
attr_writer :job_version
end
class_methods do
def version(new_version = nil)
if new_version
sidekiq_options version: new_version.to_i
else
get_sidekiq_options['version']
end
end
end
# Version is not set if `new.perform` is called directly,
# and in that case we fallback to latest version
def job_version
@job_version ||= self.class.version
end
end
end
end
...@@ -51,6 +51,7 @@ RSpec.describe Gitlab::SidekiqMiddleware do ...@@ -51,6 +51,7 @@ RSpec.describe Gitlab::SidekiqMiddleware do
Gitlab::SidekiqMiddleware::BatchLoader, Gitlab::SidekiqMiddleware::BatchLoader,
Labkit::Middleware::Sidekiq::Server, Labkit::Middleware::Sidekiq::Server,
Gitlab::SidekiqMiddleware::InstrumentationLogger, Gitlab::SidekiqMiddleware::InstrumentationLogger,
Gitlab::SidekiqVersioning::Middleware,
Gitlab::SidekiqStatus::ServerMiddleware, Gitlab::SidekiqStatus::ServerMiddleware,
Gitlab::SidekiqMiddleware::ServerMetrics, Gitlab::SidekiqMiddleware::ServerMetrics,
Gitlab::SidekiqMiddleware::ArgumentsLogger, Gitlab::SidekiqMiddleware::ArgumentsLogger,
...@@ -78,6 +79,41 @@ RSpec.describe Gitlab::SidekiqMiddleware do ...@@ -78,6 +79,41 @@ RSpec.describe Gitlab::SidekiqMiddleware do
end end
end end
shared_examples "a server middleware chain for mailer" do
let(:worker_class) { ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper }
let(:job_args) do
[
{
"job_class" => "ActionMailer::MailDeliveryJob",
"job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e",
"provider_job_id" => nil,
"queue_name" => "mailers",
"priority" => nil,
"arguments" => [
"Notify",
"test_email",
"deliver_now",
{
"args" => [
"test@example.com",
"subject",
"body"
],
"_aj_symbol_keys" => ["args"]
}
],
"executions" => 0,
"exception_executions" => {},
"locale" => "en",
"timezone" => "UTC",
"enqueued_at" => "2020-07-27T07:43:31Z"
}
]
end
it_behaves_like "a server middleware chain"
end
context "all optional middlewares off" do context "all optional middlewares off" do
let(:metrics) { false } let(:metrics) { false }
let(:arguments_logger) { false } let(:arguments_logger) { false }
...@@ -91,6 +127,7 @@ RSpec.describe Gitlab::SidekiqMiddleware do ...@@ -91,6 +127,7 @@ RSpec.describe Gitlab::SidekiqMiddleware do
end end
it_behaves_like "a server middleware chain" it_behaves_like "a server middleware chain"
it_behaves_like "a server middleware chain for mailer"
end end
context "all optional middlewares on" do context "all optional middlewares on" do
...@@ -100,6 +137,7 @@ RSpec.describe Gitlab::SidekiqMiddleware do ...@@ -100,6 +137,7 @@ RSpec.describe Gitlab::SidekiqMiddleware do
let(:disabled_sidekiq_middlewares) { [] } let(:disabled_sidekiq_middlewares) { [] }
it_behaves_like "a server middleware chain" it_behaves_like "a server middleware chain"
it_behaves_like "a server middleware chain for mailer"
context "server metrics" do context "server metrics" do
let(:gitaly_histogram) { double(:gitaly_histogram) } let(:gitaly_histogram) { double(:gitaly_histogram) }
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqVersioning::Middleware do
let(:worker_class) do
Class.new do
def self.name
'DummyWorker'
end
include ApplicationWorker
version 2
end
end
describe '#call' do
let(:worker) { worker_class.new }
let(:job) { { 'version' => 3, 'queue' => queue } }
let(:queue) { worker_class.queue }
def call!(&block)
block ||= -> {}
subject.call(worker, job, queue, &block)
end
it 'sets worker.job_version' do
call!
expect(worker.job_version).to eq(job['version'])
end
it 'yields' do
expect { |b| call!(&b) }.to yield_control
end
context 'when worker is not ApplicationWorker' do
let(:worker_class) do
ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper
end
it 'does not err' do
expect { call! }.not_to raise_error
end
end
context 'when sidekiq_versioning is disabled' do
before do
stub_feature_flags(sidekiq_versioning: false)
end
it 'does not set job_version' do
expect(worker).not_to receive(:job_version=)
call!
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqVersioning::Worker do
let(:worker) do
Class.new do
def self.name
'DummyWorker'
end
# ApplicationWorker includes Gitlab::SidekiqVersioning::Worker
include ApplicationWorker
version 2
end
end
describe '.version' do
context 'when called with an argument' do
it 'sets the version option' do
worker.version 3
expect(worker.get_sidekiq_options['version']).to eq(3)
end
end
context 'when called without an argument' do
it 'returns the version option' do
worker.sidekiq_options version: 3
expect(worker.version).to eq(3)
end
end
end
describe '#job_version' do
let(:job) { worker.new }
context 'when job_version is not set' do
it 'returns latest version' do
expect(job.job_version).to eq(2)
end
end
context 'when job_version is set' do
it 'returns the set version' do
job.job_version = 0
expect(job.job_version).to eq(0)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment