Commit 1234bd6c authored by Douwe Maan's avatar Douwe Maan Committed by Mark Chao

Declare sidekiq worker version

Each job will record its version.
Allows handling worker code updates.
parent 39b70204
......@@ -10,6 +10,7 @@ module ApplicationWorker
include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker
include WorkerAttributes
include WorkerContext
include Gitlab::SidekiqVersioning::Worker
LOGGING_EXTRA_KEY = 'extra'
......
......@@ -64,6 +64,36 @@ the extra jobs will take resources away from jobs from workers that were already
there, if the resources available to the Sidekiq process handling the namespace
are not adjusted appropriately.
## Versioning
Version can be specified on each Sidekiq worker class.
This is then sent along when the job is created.
```ruby
class FooWorker
include ApplicationWorker
version 2
def perform(*args)
if job_version == 2
foo = args.first['foo']
else
foo = args.first
end
end
end
```
Under this schema, any worker is expected to be able to handle any job that was
enqueued by an older version of that worker. This means that when changing the
arguments a worker takes, you must increment the `version` (or set `version 1`
if this is the first time a worker's arguments are changing), but also make sure
that the worker is still able to handle jobs that were queued with any earlier
version of the arguments. From the worker's `perform` method, you can read
`self.job_version` if you want to specifically branch on job version, or you
can read the number or type of provided arguments.
## Idempotent Jobs
It's known that a job can fail for multiple reasons. For example, network outages or bugs.
......
......@@ -14,8 +14,9 @@ RSpec.describe Geo::RepositoryVerification::Primary::SingleWorker, :clean_gitlab
end
it 'disables retrying of failed jobs' do
expect(subject.sidekiq_options_hash).to eq(
expect(subject.sidekiq_options_hash).to match(
'retry' => false,
'version' => an_instance_of(Integer),
'queue' => 'geo:geo_repository_verification_primary_single',
'queue_namespace' => :geo
)
......
......@@ -15,8 +15,9 @@ RSpec.describe Geo::RepositoryVerification::Secondary::SingleWorker, :geo, :clea
end
it 'disables retrying of failed jobs' do
expect(subject.sidekiq_options_hash).to eq(
expect(subject.sidekiq_options_hash).to match(
'retry' => false,
'version' => an_instance_of(Integer),
'queue' => 'geo:geo_repository_verification_secondary_single',
'queue_namespace' => :geo
)
......
......@@ -15,8 +15,9 @@ RSpec.describe Geo::Secondary::RepositoryBackfillWorker, :geo, :geo_fdw, :clean_
end
it 'disables Sidekiq retries' do
expect(subject.sidekiq_options_hash).to eq(
expect(subject.sidekiq_options_hash).to match(
'retry' => false,
'version' => an_instance_of(Integer),
'queue' => 'geo:geo_secondary_repository_backfill',
'queue_namespace' => :geo
)
......
......@@ -5,6 +5,10 @@ module Gitlab
def self.install!
Sidekiq::Manager.prepend SidekiqVersioning::Manager
Sidekiq.server_middleware do |chain|
chain.add SidekiqVersioning::Middleware
end
# The Sidekiq client API always adds the queue to the Sidekiq queue
# list, but mail_room and gitlab-shell do not. This is only necessary
# for monitoring.
......
# frozen_string_literal: true
module Gitlab
module SidekiqVersioning
class Middleware
def call(worker, job, queue)
worker.job_version = job['version']
yield
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqVersioning
module Worker
extend ActiveSupport::Concern
included do
version 0
attr_writer :job_version
end
class_methods do
def version(new_version = nil)
if new_version
sidekiq_options version: new_version.to_i
else
get_sidekiq_options['version']
end
end
end
# Version is not set if `new.perform` is called directly,
# and in that case we fallback to latest version
def job_version
@job_version ||= self.class.version
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqVersioning::Middleware do
let(:worker_class) do
Class.new do
def self.name
'DummyWorker'
end
include ApplicationWorker
version 2
end
end
describe '#call' do
let(:worker) { worker_class.new }
let(:job) { { 'version' => 3, 'queue' => queue } }
let(:queue) { worker_class.queue }
def call!(&block)
block ||= -> {}
subject.call(worker, job, queue, &block)
end
it 'sets worker.job_version' do
call!
expect(worker.job_version).to eq(job['version'])
end
it 'yields' do
expect { |b| call!(&b) }.to yield_control
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqVersioning::Worker do
let(:worker) do
Class.new do
def self.name
'DummyWorker'
end
# ApplicationWorker includes Gitlab::SidekiqVersioning::Worker
include ApplicationWorker
version 2
end
end
describe '.version' do
context 'when called with an argument' do
it 'sets the version option' do
worker.version 3
expect(worker.get_sidekiq_options['version']).to eq(3)
end
end
context 'when called without an argument' do
it 'returns the version option' do
worker.sidekiq_options version: 3
expect(worker.version).to eq(3)
end
end
end
describe '#job_version' do
let(:job) { worker.new }
context 'when job_version is not set' do
it 'returns latest version' do
expect(job.job_version).to eq(2)
end
end
context 'when job_version is set' do
it 'returns the set version' do
job.job_version = 0
expect(job.job_version).to eq(0)
end
end
end
end
......@@ -35,6 +35,12 @@ RSpec.describe Gitlab::SidekiqVersioning, :redis do
expect(Sidekiq::Manager).to include(Gitlab::SidekiqVersioning::Manager)
end
it 'adds the SidekiqVersioning::Middleware Sidekiq server middleware' do
described_class.install!
expect(Sidekiq.server_middleware.entries.map(&:klass)).to include(Gitlab::SidekiqVersioning::Middleware)
end
it 'registers all versionless and versioned queues with Redis' do
described_class.install!
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment