Commit 4774e92c authored by Nikola Milojevic's avatar Nikola Milojevic Committed by Bob Van Landuyt

Add until_executed deduplication strategy

- Fix specs to include new strategies for middleware
-  Extract abstract base class for strategies
- Add until_executed strategy documentation
- Fix spec for executed strategy
- Add changelog entry
parent 7b38d00c
...@@ -136,9 +136,10 @@ module Ci ...@@ -136,9 +136,10 @@ module Ci
# We are using optimistic locking combined with Redis locking to ensure # We are using optimistic locking combined with Redis locking to ensure
# that a chunk gets migrated properly. # that a chunk gets migrated properly.
# #
# We are catching an exception related to an exclusive lock not being # We are using until_executed deduplication strategy for workers,
# acquired because it is creating a lot of noise, and is a result of # which should prevent duplicated workers running in parallel for the same build trace,
# duplicated workers running in parallel for the same build trace chunk. # and causing an exception related to an exclusive lock not being
# acquired
# #
def persist_data! def persist_data!
in_lock(*lock_params) do # exclusive Redis lock is acquired first in_lock(*lock_params) do # exclusive Redis lock is acquired first
...@@ -150,6 +151,8 @@ module Ci ...@@ -150,6 +151,8 @@ module Ci
end end
rescue FailedToObtainLockError rescue FailedToObtainLockError
metrics.increment_trace_operation(operation: :stalled) metrics.increment_trace_operation(operation: :stalled)
raise FailedToPersistDataError, 'Data migration failed due to a worker duplication'
rescue ActiveRecord::StaleObjectError rescue ActiveRecord::StaleObjectError
raise FailedToPersistDataError, <<~MSG raise FailedToPersistDataError, <<~MSG
Data migration race condition detected Data migration race condition detected
......
...@@ -5,6 +5,8 @@ module Ci ...@@ -5,6 +5,8 @@ module Ci
include ApplicationWorker include ApplicationWorker
include PipelineBackgroundQueue include PipelineBackgroundQueue
deduplicate :until_executed
idempotent! idempotent!
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
......
---
title: Add until_executed deduplication strategy
merge_request: 42223
author:
type: added
...@@ -165,6 +165,22 @@ job. The work is skipped because the same work would be ...@@ -165,6 +165,22 @@ job. The work is skipped because the same work would be
done by the job that was scheduled first; by the time the second done by the job that was scheduled first; by the time the second
job executed, the first job would do nothing. job executed, the first job would do nothing.
#### Strategies
GitLab supports two deduplication strategies:
- `until_executing`
- `until_executed`
More [deduplication strategies have been
suggested](https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/195). If
you are implementing a worker that could benefit from a different
strategy, please comment in the issue.
##### Until Executing
This strategy takes a lock when a job is added to the queue, and removes that lock before the job starts.
For example, `AuthorizedProjectsWorker` takes a user ID. When the For example, `AuthorizedProjectsWorker` takes a user ID. When the
worker runs, it recalculates a user's authorizations. GitLab schedules worker runs, it recalculates a user's authorizations. GitLab schedules
this job each time an action potentially changes a user's this job each time an action potentially changes a user's
...@@ -173,10 +189,47 @@ same time, the second job can be skipped if the first job hasn't ...@@ -173,10 +189,47 @@ same time, the second job can be skipped if the first job hasn't
begun, because when the first job runs, it creates the begun, because when the first job runs, it creates the
authorizations for both projects. authorizations for both projects.
```ruby
module AuthorizedProjectUpdate
class UserRefreshOverUserRangeWorker
include ApplicationWorker
deduplicate :until_executing
idempotent!
# ...
end
end
```
##### Until Executed
This strategy takes a lock when a job is added to the queue, and removes that lock after the job finishes.
It can be used to prevent jobs from running simultaneously multiple times.
```ruby
module Ci
class BuildTraceChunkFlushWorker
include ApplicationWorker
deduplicate :until_executed
idempotent!
# ...
end
end
```
#### Scheduling jobs in the future
GitLab doesn't skip jobs scheduled in the future, as we assume that GitLab doesn't skip jobs scheduled in the future, as we assume that
the state will have changed by the time the job is scheduled to the state will have changed by the time the job is scheduled to
execute. If you do want to deduplicate jobs scheduled in the future execute. Deduplication of jobs scheduled in the feature is possible
this can be specified on the worker as follows: for both `until_executed` and `until_executing` strategies.
If you do want to deduplicate jobs scheduled in the future,
this can be specified on the worker by passing `including_scheduled: true` argument
when defining deduplication strategy:
```ruby ```ruby
module AuthorizedProjectUpdate module AuthorizedProjectUpdate
...@@ -191,11 +244,7 @@ module AuthorizedProjectUpdate ...@@ -191,11 +244,7 @@ module AuthorizedProjectUpdate
end end
``` ```
This strategy is called `until_executing`. More [deduplication #### Troubleshooting
strategies have been
suggested](https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/195). If
you are implementing a worker that could benefit from a different
strategy, please comment in the issue.
If the automatic deduplication were to cause issues in certain If the automatic deduplication were to cause issues in certain
queues. This can be temporarily disabled by enabling a feature flag queues. This can be temporarily disabled by enabling a feature flag
......
...@@ -8,6 +8,7 @@ module Gitlab ...@@ -8,6 +8,7 @@ module Gitlab
STRATEGIES = { STRATEGIES = {
until_executing: UntilExecuting, until_executing: UntilExecuting,
until_executed: UntilExecuted,
none: None none: None
}.freeze }.freeze
......
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
module Strategies
class Base
def initialize(duplicate_job)
@duplicate_job = duplicate_job
end
def schedule(job)
raise NotImplementedError
end
def perform(_job)
raise NotImplementedError
end
private
attr_reader :duplicate_job
def strategy_name
self.class.name.to_s.demodulize.underscore.humanize.downcase
end
def check!
# The default expiry time is the DuplicateJob::DUPLICATE_KEY_TTL already
# Only the strategies de-duplicating when scheduling
duplicate_job.check!
end
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
module Strategies
module DeduplicatesWhenScheduling
def initialize(duplicate_job)
@duplicate_job = duplicate_job
end
def schedule(job)
if deduplicatable_job? && check! && duplicate_job.duplicate?
job['duplicate-of'] = duplicate_job.existing_jid
if duplicate_job.droppable?
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(
job, "dropped #{strategy_name}", duplicate_job.options)
return false
end
end
yield
end
private
def deduplicatable_job?
!duplicate_job.scheduled? || duplicate_job.options[:including_scheduled]
end
def check!
duplicate_job.check!(expiry)
end
def expiry
return DuplicateJob::DUPLICATE_KEY_TTL unless duplicate_job.scheduled?
time_diff = duplicate_job.scheduled_at.to_i - Time.now.to_i
time_diff > 0 ? time_diff : DuplicateJob::DUPLICATE_KEY_TTL
end
end
end
end
end
end
...@@ -5,10 +5,7 @@ module Gitlab ...@@ -5,10 +5,7 @@ module Gitlab
module DuplicateJobs module DuplicateJobs
module Strategies module Strategies
# This strategy will never deduplicate a job # This strategy will never deduplicate a job
class None class None < Base
def initialize(_duplicate_job)
end
def schedule(_job) def schedule(_job)
yield yield
end end
......
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
module Strategies
# This strategy takes a lock before scheduling the job in a queue and
# removes the lock after the job has executed preventing a new job to be queued
# while a job is still executing.
class UntilExecuted < Base
include DeduplicatesWhenScheduling
def perform(_job)
yield
duplicate_job.delete!
end
end
end
end
end
end
...@@ -7,50 +7,14 @@ module Gitlab ...@@ -7,50 +7,14 @@ module Gitlab
# This strategy takes a lock before scheduling the job in a queue and # This strategy takes a lock before scheduling the job in a queue and
# removes the lock before the job starts allowing a new job to be queued # removes the lock before the job starts allowing a new job to be queued
# while a job is still executing. # while a job is still executing.
class UntilExecuting class UntilExecuting < Base
def initialize(duplicate_job) include DeduplicatesWhenScheduling
@duplicate_job = duplicate_job
end
def schedule(job)
if deduplicatable_job? && check! && duplicate_job.duplicate?
job['duplicate-of'] = duplicate_job.existing_jid
if duplicate_job.droppable?
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(
job, "dropped until executing", duplicate_job.options)
return false
end
end
yield
end
def perform(_job) def perform(_job)
duplicate_job.delete! duplicate_job.delete!
yield yield
end end
private
attr_reader :duplicate_job
def deduplicatable_job?
!duplicate_job.scheduled? || duplicate_job.options[:including_scheduled]
end
def check!
duplicate_job.check!(expiry)
end
def expiry
return DuplicateJob::DUPLICATE_KEY_TTL unless duplicate_job.scheduled?
time_diff = duplicate_job.scheduled_at.to_i - Time.now.to_i
time_diff > 0 ? time_diff : DuplicateJob::DUPLICATE_KEY_TTL
end
end end
end end
end end
......
...@@ -3,79 +3,84 @@ ...@@ -3,79 +3,84 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Client, :clean_gitlab_redis_queues do RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Client, :clean_gitlab_redis_queues do
let(:worker_class) do shared_context 'deduplication worker class' do |strategy, including_scheduled|
Class.new do let(:worker_class) do
def self.name Class.new do
'TestDeduplicationWorker' def self.name
end 'TestDeduplicationWorker'
end
include ApplicationWorker
deduplicate strategy, including_scheduled: including_scheduled
include ApplicationWorker include ApplicationWorker
def perform(*args) def perform(*args)
end
end end
end end
end
before do before do
stub_const('TestDeduplicationWorker', worker_class) stub_const('TestDeduplicationWorker', worker_class)
end
end end
describe '#call' do shared_examples 'client duplicate job' do |strategy|
it 'adds a correct duplicate tag to the jobs', :aggregate_failures do describe '#call' do
TestDeduplicationWorker.bulk_perform_async([['args1'], ['args2'], ['args1']]) include_context 'deduplication worker class', strategy, false
job1, job2, job3 = TestDeduplicationWorker.jobs it 'adds a correct duplicate tag to the jobs', :aggregate_failures do
TestDeduplicationWorker.bulk_perform_async([['args1'], ['args2'], ['args1']])
expect(job1['duplicate-of']).to be_nil
expect(job2['duplicate-of']).to be_nil
expect(job3['duplicate-of']).to eq(job1['jid'])
end
context 'without scheduled deduplication' do
it "does not mark a job that's scheduled in the future as a duplicate" do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
duplicates = TestDeduplicationWorker.jobs.map { |job| job['duplicate-of'] } job1, job2, job3 = TestDeduplicationWorker.jobs
expect(duplicates).to all(be_nil) expect(job1['duplicate-of']).to be_nil
expect(job2['duplicate-of']).to be_nil
expect(job3['duplicate-of']).to eq(job1['jid'])
end end
end
context 'with scheduled deduplication' do
let(:scheduled_worker_class) do
Class.new do
def self.name
'TestDeduplicationWorker'
end
include ApplicationWorker context 'without scheduled deduplication' do
it "does not mark a job that's scheduled in the future as a duplicate" do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
deduplicate :until_executing, including_scheduled: true duplicates = TestDeduplicationWorker.jobs.map { |job| job['duplicate-of'] }
def perform(*args) expect(duplicates).to all(be_nil)
end
end end
end end
before do context 'with scheduled deduplication' do
stub_const('TestDeduplicationWorker', scheduled_worker_class) include_context 'deduplication worker class', strategy, true
end
it 'adds a correct duplicate tag to the jobs', :aggregate_failures do before do
TestDeduplicationWorker.perform_async('args1') stub_const('TestDeduplicationWorker', worker_class)
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1') end
TestDeduplicationWorker.perform_in(3.hours, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args2')
job1, job2, job3, job4 = TestDeduplicationWorker.jobs it 'adds a correct duplicate tag to the jobs', :aggregate_failures do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args2')
expect(job1['duplicate-of']).to be_nil job1, job2, job3, job4 = TestDeduplicationWorker.jobs
expect(job2['duplicate-of']).to eq(job1['jid'])
expect(job3['duplicate-of']).to eq(job1['jid']) expect(job1['duplicate-of']).to be_nil
expect(job4['duplicate-of']).to be_nil expect(job2['duplicate-of']).to eq(job1['jid'])
expect(job3['duplicate-of']).to eq(job1['jid'])
expect(job4['duplicate-of']).to be_nil
end
end end
end end
end end
context 'with until_executing strategy' do
it_behaves_like 'client duplicate job', :until_executing
end
context 'with until_executed strategy' do
it_behaves_like 'client duplicate job', :until_executed
end
end end
...@@ -3,39 +3,71 @@ ...@@ -3,39 +3,71 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Server, :clean_gitlab_redis_queues do RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Server, :clean_gitlab_redis_queues do
let(:worker_class) do shared_context 'server duplicate job' do |strategy|
Class.new do let(:worker_class) do
def self.name Class.new do
'TestDeduplicationWorker' def self.name
'TestDeduplicationWorker'
end
include ApplicationWorker
deduplicate strategy
def perform(*args)
self.class.work
end
def self.work
end
end end
end
include ApplicationWorker before do
stub_const('TestDeduplicationWorker', worker_class)
end
def perform(*args) around do |example|
with_sidekiq_server_middleware do |chain|
chain.add described_class
Sidekiq::Testing.inline! { example.run }
end end
end end
end end
before do context 'with until_executing strategy' do
stub_const('TestDeduplicationWorker', worker_class) include_context 'server duplicate job', :until_executing
end
around do |example| describe '#call' do
with_sidekiq_server_middleware do |chain| it 'removes the stored job from redis before execution' do
chain.add described_class bare_job = { 'class' => 'TestDeduplicationWorker', 'args' => ['hello'] }
Sidekiq::Testing.inline! { example.run } job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'test_deduplication')
expect(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
.to receive(:new).with(a_hash_including(bare_job), 'test_deduplication')
.and_return(job_definition).twice # once in client middleware
expect(job_definition).to receive(:delete!).ordered.and_call_original
expect(TestDeduplicationWorker).to receive(:work).ordered.and_call_original
TestDeduplicationWorker.perform_async('hello')
end
end end
end end
describe '#call' do context 'with until_executed strategy' do
it 'removes the stored job from redis' do include_context 'server duplicate job', :until_executed
it 'removes the stored job from redis after execution' do
bare_job = { 'class' => 'TestDeduplicationWorker', 'args' => ['hello'] } bare_job = { 'class' => 'TestDeduplicationWorker', 'args' => ['hello'] }
job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'test_deduplication') job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'test_deduplication')
expect(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob) expect(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
.to receive(:new).with(a_hash_including(bare_job), 'test_deduplication') .to receive(:new).with(a_hash_including(bare_job), 'test_deduplication')
.and_return(job_definition).twice # once in client middleware .and_return(job_definition).twice # once in client middleware
expect(job_definition).to receive(:delete!).and_call_original
expect(TestDeduplicationWorker).to receive(:work).ordered.and_call_original
expect(job_definition).to receive(:delete!).ordered.and_call_original
TestDeduplicationWorker.perform_async('hello') TestDeduplicationWorker.perform_async('hello')
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuted do
it_behaves_like 'deduplicating jobs when scheduling', :until_executed do
describe '#perform' do
let(:proc) { -> {} }
it 'deletes the lock after executing' do
expect(proc).to receive(:call).ordered
expect(fake_duplicate_job).to receive(:delete!).ordered
strategy.perform({}) do
proc.call
end
end
end
end
end
# frozen_string_literal: true # frozen_string_literal: true
require 'fast_spec_helper' require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do
let(:fake_duplicate_job) do it_behaves_like 'deduplicating jobs when scheduling', :until_executing do
instance_double(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob) describe '#perform' do
end let(:proc) { -> {} }
subject(:strategy) { described_class.new(fake_duplicate_job) }
describe '#schedule' do
before do
allow(Gitlab::SidekiqLogging::DeduplicationLogger.instance).to receive(:log)
end
it 'checks for duplicates before yielding' do
expect(fake_duplicate_job).to receive(:scheduled?).twice.ordered.and_return(false)
expect(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.ordered
.and_return('a jid'))
expect(fake_duplicate_job).to receive(:duplicate?).ordered.and_return(false)
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
it 'checks worker options for scheduled jobs' do
expect(fake_duplicate_job).to receive(:scheduled?).ordered.and_return(true)
expect(fake_duplicate_job).to receive(:options).ordered.and_return({})
expect(fake_duplicate_job).not_to receive(:check!)
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
context 'job marking' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
allow(fake_duplicate_job).to receive(:options).and_return({})
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true) it 'deletes the lock before executing' do
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid') expect(fake_duplicate_job).to receive(:delete!).ordered
expect(proc).to receive(:call).ordered
strategy.schedule(job_hash) {} strategy.perform({}) do
proc.call
expect(job_hash).to include('duplicate-of' => 'the jid')
end
context 'scheduled jobs' do
let(:time_diff) { 1.minute }
context 'scheduled in the past' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now - time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end end
context 'scheduled in the future' do
it 'adds the jid of the existing job to the job hash' do
freeze_time do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now + time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!).with(time_diff.to_i).and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end
end
end
end
context "when the job is droppable" do
before do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:duplicate?).and_return(true)
allow(fake_duplicate_job).to receive(:options).and_return({})
allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
end
it 'drops the job' do
schedule_result = nil
expect(fake_duplicate_job).to receive(:droppable?).and_return(true)
expect { |b| schedule_result = strategy.schedule({}, &b) }.not_to yield_control
expect(schedule_result).to be(false)
end
it 'logs that the job was dropped' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
expect(fake_logger).to receive(:log).with(a_hash_including({ 'jid' => 'new jid' }), 'dropped until executing', {})
strategy.schedule({ 'jid' => 'new jid' }) {}
end
it 'logs the deduplication options of the worker' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
allow(fake_duplicate_job).to receive(:options).and_return({ foo: :bar })
expect(fake_logger).to receive(:log).with(a_hash_including({ 'jid' => 'new jid' }), 'dropped until executing', { foo: :bar })
strategy.schedule({ 'jid' => 'new jid' }) {}
end end
end end
end end
describe '#perform' do
it 'deletes the lock before executing' do
expect(fake_duplicate_job).to receive(:delete!).ordered
expect { |b| strategy.perform({}, &b) }.to yield_control
end
end
end end
...@@ -8,6 +8,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies do ...@@ -8,6 +8,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies do
expect(described_class.for(:until_executing)).to eq(described_class::UntilExecuting) expect(described_class.for(:until_executing)).to eq(described_class::UntilExecuting)
end end
it 'returns the right class for `until_executed`' do
expect(described_class.for(:until_executed)).to eq(described_class::UntilExecuted)
end
it 'returns the right class for `none`' do it 'returns the right class for `none`' do
expect(described_class.for(:none)).to eq(described_class::None) expect(described_class.for(:none)).to eq(described_class::None)
end end
......
...@@ -594,23 +594,19 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -594,23 +594,19 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
context 'when the chunk is being locked by a different worker' do context 'when the chunk is being locked by a different worker' do
let(:metrics) { spy('metrics') } let(:metrics) { spy('metrics') }
it 'does not raise an exception' do
lock_chunk do
expect { build_trace_chunk.persist_data! }.not_to raise_error
end
end
it 'increments stalled chunk trace metric' do it 'increments stalled chunk trace metric' do
allow(build_trace_chunk) allow(build_trace_chunk)
.to receive(:metrics) .to receive(:metrics)
.and_return(metrics) .and_return(metrics)
lock_chunk { build_trace_chunk.persist_data! } expect do
subject
expect(metrics) expect(metrics)
.to have_received(:increment_trace_operation) .to have_received(:increment_trace_operation)
.with(operation: :stalled) .with(operation: :stalled)
.once .once
end.to raise_error(described_class::FailedToPersistDataError)
end end
def lock_chunk(&block) def lock_chunk(&block)
......
# frozen_string_literal: true
RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
let(:fake_duplicate_job) do
instance_double(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
end
let(:expected_message) { "dropped #{strategy_name.to_s.humanize.downcase}" }
subject(:strategy) { Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies.for(strategy_name).new(fake_duplicate_job) }
describe '#schedule' do
before do
allow(Gitlab::SidekiqLogging::DeduplicationLogger.instance).to receive(:log)
end
it 'checks for duplicates before yielding' do
expect(fake_duplicate_job).to receive(:scheduled?).twice.ordered.and_return(false)
expect(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.ordered
.and_return('a jid'))
expect(fake_duplicate_job).to receive(:duplicate?).ordered.and_return(false)
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
it 'checks worker options for scheduled jobs' do
expect(fake_duplicate_job).to receive(:scheduled?).ordered.and_return(true)
expect(fake_duplicate_job).to receive(:options).ordered.and_return({})
expect(fake_duplicate_job).not_to receive(:check!)
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
context 'job marking' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
allow(fake_duplicate_job).to receive(:options).and_return({})
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
context 'scheduled jobs' do
let(:time_diff) { 1.minute }
context 'scheduled in the past' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now - time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end
context 'scheduled in the future' do
it 'adds the jid of the existing job to the job hash' do
freeze_time do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now + time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!).with(time_diff.to_i).and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end
end
end
end
context "when the job is droppable" do
before do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:duplicate?).and_return(true)
allow(fake_duplicate_job).to receive(:options).and_return({})
allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
end
it 'drops the job' do
schedule_result = nil
expect(fake_duplicate_job).to receive(:droppable?).and_return(true)
expect { |b| schedule_result = strategy.schedule({}, &b) }.not_to yield_control
expect(schedule_result).to be(false)
end
it 'logs that the job was dropped' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
expect(fake_logger).to receive(:log).with(a_hash_including({ 'jid' => 'new jid' }), expected_message, {})
strategy.schedule({ 'jid' => 'new jid' }) {}
end
it 'logs the deduplication options of the worker' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
allow(fake_duplicate_job).to receive(:options).and_return({ foo: :bar })
expect(fake_logger).to receive(:log).with(a_hash_including({ 'jid' => 'new jid' }), expected_message, { foo: :bar })
strategy.schedule({ 'jid' => 'new jid' }) {}
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment