Commit d5bfa299 authored by Shinya Maeda's avatar Shinya Maeda

Extend deduplication strategy to ensure run once

This commit extends the Sidekiq deduplication strategy.
parent b3211781
......@@ -255,6 +255,11 @@ module Ci
end
```
Also, you can pass `if_deduplicated: :reschedule_once` option to re-run a job once after
the currently running job finished and deduplication happened at least once.
This ensures that the latest result is always produced even if a race condition
happened. See [this issue](https://gitlab.com/gitlab-org/gitlab/-/issues/342123) for more information.
#### Scheduling jobs in the future
GitLab doesn't skip jobs scheduled in the future, as we assume that
......
......@@ -6,7 +6,7 @@ module Gitlab
include Singleton
include LogsJobs
def log(job, deduplication_type, deduplication_options = {})
def deduplicated_log(job, deduplication_type, deduplication_options = {})
payload = parse_job(job)
payload['job_status'] = 'deduplicated'
payload['message'] = "#{base_message(payload)}: deduplicated: #{deduplication_type}"
......@@ -17,6 +17,14 @@ module Gitlab
Sidekiq.logger.info payload
end
def rescheduled_log(job)
payload = parse_job(job)
payload['job_status'] = 'rescheduled'
payload['message'] = "#{base_message(payload)}: rescheduled"
Sidekiq.logger.info payload
end
end
end
end
......@@ -24,6 +24,7 @@ module Gitlab
MAX_REDIS_RETRIES = 5
DEFAULT_STRATEGY = :until_executing
STRATEGY_NONE = :none
DEDUPLICATED_FLAG_VALUE = 1
LUA_SET_WAL_SCRIPT = <<~EOS
local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3]
......@@ -110,12 +111,18 @@ module Gitlab
def delete!
Sidekiq.redis do |redis|
redis.multi do |multi|
multi.del(idempotency_key)
multi.del(idempotency_key, deduplicated_flag_key)
delete_wal_locations!(multi)
end
end
end
def reschedule
Gitlab::SidekiqLogging::DeduplicationLogger.instance.rescheduled_log(job)
worker_klass.perform_async(*arguments)
end
def scheduled?
scheduled_at.present?
end
......@@ -126,6 +133,22 @@ module Gitlab
jid != existing_jid
end
def set_deduplicated_flag!(expiry = DUPLICATE_KEY_TTL)
return unless reschedulable?
Sidekiq.redis do |redis|
redis.set(deduplicated_flag_key, DEDUPLICATED_FLAG_VALUE, ex: expiry, nx: true)
end
end
def should_reschedule?
return false unless reschedulable?
Sidekiq.redis do |redis|
redis.get(deduplicated_flag_key).present?
end
end
def scheduled_at
job['at']
end
......@@ -216,6 +239,10 @@ module Gitlab
@idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}"
end
def deduplicated_flag_key
"#{idempotency_key}:deduplicate_flag"
end
def idempotency_hash
Digest::SHA256.hexdigest(idempotency_string)
end
......@@ -235,6 +262,10 @@ module Gitlab
def preserve_wal_location?
Feature.enabled?(:preserve_latest_wal_locations_for_idempotent_jobs, default_enabled: :yaml)
end
def reschedulable?
!scheduled? && options[:if_deduplicated] == :reschedule_once
end
end
end
end
......
......@@ -6,6 +6,7 @@ module Gitlab
module Strategies
class DeduplicatesWhenScheduling < Base
extend ::Gitlab::Utils::Override
include ::Gitlab::Utils::StrongMemoize
override :initialize
def initialize(duplicate_job)
......@@ -19,8 +20,9 @@ module Gitlab
if duplicate_job.idempotent?
duplicate_job.update_latest_wal_location!
duplicate_job.set_deduplicated_flag!(expiry)
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(
Gitlab::SidekiqLogging::DeduplicationLogger.instance.deduplicated_log(
job, "dropped #{strategy_name}", duplicate_job.options)
return false
end
......@@ -49,11 +51,13 @@ module Gitlab
end
def expiry
return DuplicateJob::DUPLICATE_KEY_TTL unless duplicate_job.scheduled?
strong_memoize(:expiry) do
next DuplicateJob::DUPLICATE_KEY_TTL unless duplicate_job.scheduled?
time_diff = duplicate_job.scheduled_at.to_i - Time.now.to_i
time_diff = duplicate_job.scheduled_at.to_i - Time.now.to_i
time_diff > 0 ? time_diff : DuplicateJob::DUPLICATE_KEY_TTL
time_diff > 0 ? time_diff : DuplicateJob::DUPLICATE_KEY_TTL
end
end
end
end
......
......@@ -14,7 +14,10 @@ module Gitlab
yield
should_reschedule = duplicate_job.should_reschedule?
# Deleting before rescheduling to make sure we don't deduplicate again.
duplicate_job.delete!
duplicate_job.reschedule if should_reschedule
end
end
end
......
......@@ -23,11 +23,37 @@ RSpec.describe Gitlab::SidekiqLogging::DeduplicationLogger do
}
expect(Sidekiq.logger).to receive(:info).with(a_hash_including(expected_payload)).and_call_original
described_class.instance.log(job, "a fancy strategy", { foo: :bar })
described_class.instance.deduplicated_log(job, "a fancy strategy", { foo: :bar })
end
it "does not modify the job" do
expect { described_class.instance.log(job, "a fancy strategy") }
expect { described_class.instance.deduplicated_log(job, "a fancy strategy") }
.not_to change { job }
end
end
describe '#rescheduled_log' do
let(:job) do
{
'class' => 'TestWorker',
'args' => [1234, 'hello', { 'key' => 'value' }],
'jid' => 'da883554ee4fe414012f5f42',
'correlation_id' => 'cid'
}
end
it 'logs a rescheduled message to the sidekiq logger' do
expected_payload = {
'job_status' => 'rescheduled',
'message' => "#{job['class']} JID-#{job['jid']}: rescheduled"
}
expect(Sidekiq.logger).to receive(:info).with(a_hash_including(expected_payload)).and_call_original
described_class.instance.rescheduled_log(job)
end
it 'does not modify the job' do
expect { described_class.instance.rescheduled_log(job) }
.not_to change { job }
end
end
......
......@@ -24,6 +24,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
"#{Gitlab::Redis::Queues::SIDEKIQ_NAMESPACE}:duplicate:#{queue}:#{hash}"
end
let(:deduplicated_flag_key) do
"#{idempotency_key}:deduplicate_flag"
end
describe '#schedule' do
shared_examples 'scheduling with deduplication class' do |strategy_class|
it 'calls schedule on the strategy' do
......@@ -270,6 +274,7 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
context 'when the key exists in redis' do
before do
set_idempotency_key(idempotency_key, 'existing-jid')
set_idempotency_key(deduplicated_flag_key, 1)
wal_locations.each do |config_name, location|
set_idempotency_key(existing_wal_location_key(idempotency_key, config_name), location)
set_idempotency_key(wal_location_key(idempotency_key, config_name), location)
......@@ -299,6 +304,11 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
let(:from_value) { 'existing-jid' }
end
it_behaves_like 'deleting keys from redis', 'deduplication counter key' do
let(:key) { deduplicated_flag_key }
let(:from_value) { '1' }
end
it_behaves_like 'deleting keys from redis', 'existing wal location keys for main database' do
let(:key) { existing_wal_location_key(idempotency_key, :main) }
let(:from_value) { wal_locations[:main] }
......@@ -390,6 +400,103 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end
end
describe '#reschedule' do
it 'reschedules the current job' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
expect(fake_logger).to receive(:rescheduled_log).with(a_hash_including({ 'jid' => '123' }))
expect(AuthorizedProjectsWorker).to receive(:perform_async).with(1).once
duplicate_job.reschedule
end
end
describe '#should_reschedule?' do
subject { duplicate_job.should_reschedule? }
context 'when the job is reschedulable' do
before do
allow(duplicate_job).to receive(:reschedulable?) { true }
end
it { is_expected.to eq(false) }
context 'with deduplicated flag' do
before do
duplicate_job.set_deduplicated_flag!
end
it { is_expected.to eq(true) }
end
end
context 'when the job is not reschedulable' do
before do
allow(duplicate_job).to receive(:reschedulable?) { false }
end
it { is_expected.to eq(false) }
context 'with deduplicated flag' do
before do
duplicate_job.set_deduplicated_flag!
end
it { is_expected.to eq(false) }
end
end
end
describe '#set_deduplicated_flag!' do
context 'when the job is reschedulable' do
before do
allow(duplicate_job).to receive(:reschedulable?) { true }
end
it 'sets the key in Redis' do
duplicate_job.set_deduplicated_flag!
flag = Sidekiq.redis { |redis| redis.get(deduplicated_flag_key) }
expect(flag).to eq(described_class::DEDUPLICATED_FLAG_VALUE.to_s)
end
it 'sets, gets and cleans up the deduplicated flag' do
expect(duplicate_job.should_reschedule?).to eq(false)
duplicate_job.set_deduplicated_flag!
expect(duplicate_job.should_reschedule?).to eq(true)
duplicate_job.delete!
expect(duplicate_job.should_reschedule?).to eq(false)
end
end
context 'when the job is not reschedulable' do
before do
allow(duplicate_job).to receive(:reschedulable?) { false }
end
it 'does not set the key in Redis' do
duplicate_job.set_deduplicated_flag!
flag = Sidekiq.redis { |redis| redis.get(deduplicated_flag_key) }
expect(flag).to be_nil
end
it 'does not set the deduplicated flag' do
expect(duplicate_job.should_reschedule?).to eq(false)
duplicate_job.set_deduplicated_flag!
expect(duplicate_job.should_reschedule?).to eq(false)
duplicate_job.delete!
expect(duplicate_job.should_reschedule?).to eq(false)
end
end
end
describe '#duplicate?' do
it "raises an error if the check wasn't performed" do
expect { duplicate_job.duplicate? }.to raise_error /Call `#check!` first/
......
......@@ -9,6 +9,9 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecut
before do
allow(fake_duplicate_job).to receive(:latest_wal_locations).and_return( {} )
allow(fake_duplicate_job).to receive(:scheduled?) { false }
allow(fake_duplicate_job).to receive(:options) { {} }
allow(fake_duplicate_job).to receive(:should_reschedule?) { false }
end
it 'deletes the lock after executing' do
......@@ -19,6 +22,28 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecut
proc.call
end
end
it 'does not reschedule the job even if deduplication happened' do
expect(fake_duplicate_job).to receive(:delete!)
expect(fake_duplicate_job).not_to receive(:reschedule)
strategy.perform({}) do
proc.call
end
end
context 'when job is reschedulable' do
it 'reschedules the job if deduplication happened' do
allow(fake_duplicate_job).to receive(:should_reschedule?) { true }
expect(fake_duplicate_job).to receive(:delete!)
expect(fake_duplicate_job).to receive(:reschedule).once
strategy.perform({}) do
proc.call
end
end
end
end
end
end
......@@ -102,6 +102,7 @@ RSpec.describe Projects::LfsPointers::LfsDownloadService do
it 'skips read_total_timeout', :aggregate_failures do
stub_const('GitLab::HTTP::DEFAULT_READ_TOTAL_TIMEOUT', 0)
expect(ProjectCacheWorker).to receive(:perform_async).once
expect(Gitlab::Metrics::System).not_to receive(:monotonic_time)
expect(subject.execute).to include(status: :success)
end
......
......@@ -11,7 +11,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
describe '#schedule' do
before do
allow(Gitlab::SidekiqLogging::DeduplicationLogger.instance).to receive(:log)
allow(Gitlab::SidekiqLogging::DeduplicationLogger.instance).to receive(:deduplicated_log)
end
it 'checks for duplicates before yielding' do
......@@ -40,6 +40,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
allow(fake_duplicate_job).to receive(:set_deduplicated_flag!)
allow(fake_duplicate_job).to receive(:options).and_return({})
job_hash = {}
......@@ -65,6 +66,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
.and_return('the jid'))
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
allow(fake_duplicate_job).to receive(:set_deduplicated_flag!)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
......@@ -86,6 +88,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
receive(:check!).with(time_diff.to_i).and_return('the jid'))
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
allow(fake_duplicate_job).to receive(:set_deduplicated_flag!)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
......@@ -100,6 +103,26 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
end
end
context "when the job is not duplicate" do
before do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:duplicate?).and_return(false)
allow(fake_duplicate_job).to receive(:options).and_return({})
allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
end
it 'does not return false nor drop the job' do
schedule_result = nil
expect(fake_duplicate_job).not_to receive(:set_deduplicated_flag!)
expect { |b| schedule_result = strategy.schedule({}, &b) }.to yield_control
expect(schedule_result).to be_nil
end
end
context "when the job is droppable" do
before do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
......@@ -109,6 +132,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
allow(fake_duplicate_job).to receive(:set_deduplicated_flag!)
end
it 'updates latest wal location' do
......@@ -117,10 +141,11 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
strategy.schedule({ 'jid' => 'new jid' }) {}
end
it 'drops the job' do
it 'returns false to drop the job' do
schedule_result = nil
expect(fake_duplicate_job).to receive(:idempotent?).and_return(true)
expect(fake_duplicate_job).to receive(:set_deduplicated_flag!).once
expect { |b| schedule_result = strategy.schedule({}, &b) }.not_to yield_control
expect(schedule_result).to be(false)
......@@ -130,7 +155,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
expect(fake_logger).to receive(:log).with(a_hash_including({ 'jid' => 'new jid' }), expected_message, {})
expect(fake_logger).to receive(:deduplicated_log).with(a_hash_including({ 'jid' => 'new jid' }), expected_message, {})
strategy.schedule({ 'jid' => 'new jid' }) {}
end
......@@ -140,7 +165,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
allow(fake_duplicate_job).to receive(:options).and_return({ foo: :bar })
expect(fake_logger).to receive(:log).with(a_hash_including({ 'jid' => 'new jid' }), expected_message, { foo: :bar })
expect(fake_logger).to receive(:deduplicated_log).with(a_hash_including({ 'jid' => 'new jid' }), expected_message, { foo: :bar })
strategy.schedule({ 'jid' => 'new jid' }) {}
end
......@@ -159,6 +184,9 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
before do
allow(fake_duplicate_job).to receive(:delete!)
allow(fake_duplicate_job).to receive(:scheduled?) { false }
allow(fake_duplicate_job).to receive(:options) { {} }
allow(fake_duplicate_job).to receive(:should_reschedule?) { false }
allow(fake_duplicate_job).to receive(:latest_wal_locations).and_return( wal_locations )
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment