Commit 78838244 authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch '325291-allow-idempotent-jobs' into 'master'

Allow idempotent jobs to use load balancing

See merge request gitlab-org/gitlab!69372
parents 3c379e96 2e31e87d
...@@ -58,10 +58,7 @@ module ApplicationWorker ...@@ -58,10 +58,7 @@ module ApplicationWorker
Gitlab::SidekiqConfig::WorkerRouter.queue_name_from_worker_name(self) Gitlab::SidekiqConfig::WorkerRouter.queue_name_from_worker_name(self)
end end
override :validate_worker_attributes!
def validate_worker_attributes! def validate_worker_attributes!
super
# Since the delayed data_consistency will use sidekiq built in retry mechanism, it is required that this mechanism # Since the delayed data_consistency will use sidekiq built in retry mechanism, it is required that this mechanism
# is not disabled. # is not disabled.
if retry_disabled? && get_data_consistency == :delayed if retry_disabled? && get_data_consistency == :delayed
...@@ -81,6 +78,13 @@ module ApplicationWorker ...@@ -81,6 +78,13 @@ module ApplicationWorker
end end
end end
override :data_consistency
def data_consistency(data_consistency, feature_flag: nil)
super
validate_worker_attributes!
end
def perform_async(*args) def perform_async(*args)
# Worker execution for workers with data_consistency set to :delayed or :sticky # Worker execution for workers with data_consistency set to :delayed or :sticky
# will be delayed to give replication enough time to complete # will be delayed to give replication enough time to complete
......
...@@ -92,17 +92,6 @@ module WorkerAttributes ...@@ -92,17 +92,6 @@ module WorkerAttributes
set_class_attribute(:data_consistency_feature_flag, feature_flag) if feature_flag set_class_attribute(:data_consistency_feature_flag, feature_flag) if feature_flag
set_class_attribute(:data_consistency, data_consistency) set_class_attribute(:data_consistency, data_consistency)
validate_worker_attributes!
end
def validate_worker_attributes!
# Since the deduplication should always take into account the latest binary replication pointer into account,
# not the first one, the deduplication will not work with sticky or delayed.
# Follow up issue to improve this: https://gitlab.com/gitlab-org/gitlab/-/issues/325291
if idempotent? && utilizes_load_balancing_capabilities?
raise ArgumentError, "Class can't be marked as idempotent if data_consistency is not set to :always"
end
end end
# If data_consistency is not set to :always, worker will try to utilize load balancing capabilities and use the replica # If data_consistency is not set to :always, worker will try to utilize load balancing capabilities and use the replica
...@@ -147,8 +136,6 @@ module WorkerAttributes ...@@ -147,8 +136,6 @@ module WorkerAttributes
def idempotent! def idempotent!
set_class_attribute(:idempotent, true) set_class_attribute(:idempotent, true)
validate_worker_attributes!
end end
def idempotent? def idempotent?
......
---
name: preserve_latest_wal_locations_for_idempotent_jobs
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/66280
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/338350
milestone: '14.3'
type: development
group: group::memory
default_enabled: false
...@@ -187,6 +187,19 @@ module Gitlab ...@@ -187,6 +187,19 @@ module Gitlab
row['system_identifier'] row['system_identifier']
end end
def pg_wal_lsn_diff(location1, location2)
lsn1 = connection.quote(location1)
lsn2 = connection.quote(location2)
query = <<-SQL.squish
SELECT pg_wal_lsn_diff(#{lsn1}, #{lsn2})
AS result
SQL
row = connection.select_all(query).first
row['result'] if row
end
# @param [ActiveRecord::Connection] ar_connection # @param [ActiveRecord::Connection] ar_connection
# @return [String] # @return [String]
def get_write_location(ar_connection) def get_write_location(ar_connection)
......
...@@ -57,7 +57,7 @@ module Gitlab ...@@ -57,7 +57,7 @@ module Gitlab
end end
def get_wal_locations(job) def get_wal_locations(job)
job['wal_locations'] || legacy_wal_location(job) job['dedup_wal_locations'] || job['wal_locations'] || legacy_wal_location(job)
end end
# Already scheduled jobs could still contain legacy database write location. # Already scheduled jobs could still contain legacy database write location.
......
...@@ -69,6 +69,7 @@ module Gitlab ...@@ -69,6 +69,7 @@ module Gitlab
message = base_message(payload) message = base_message(payload)
payload['load_balancing_strategy'] = job['load_balancing_strategy'] if job['load_balancing_strategy'] payload['load_balancing_strategy'] = job['load_balancing_strategy'] if job['load_balancing_strategy']
payload['dedup_wal_locations'] = job['dedup_wal_locations'] if job['dedup_wal_locations'].present?
if job_exception if job_exception
payload['message'] = "#{message}: fail: #{payload['duration_s']} sec" payload['message'] = "#{message}: fail: #{payload['duration_s']} sec"
......
...@@ -17,10 +17,26 @@ module Gitlab ...@@ -17,10 +17,26 @@ module Gitlab
# #
# When new jobs can be scheduled again, the strategy calls `#delete`. # When new jobs can be scheduled again, the strategy calls `#delete`.
class DuplicateJob class DuplicateJob
include Gitlab::Utils::StrongMemoize
DUPLICATE_KEY_TTL = 6.hours DUPLICATE_KEY_TTL = 6.hours
WAL_LOCATION_TTL = 60.seconds
MAX_REDIS_RETRIES = 5
DEFAULT_STRATEGY = :until_executing DEFAULT_STRATEGY = :until_executing
STRATEGY_NONE = :none STRATEGY_NONE = :none
LUA_SET_WAL_SCRIPT = <<~EOS
local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3]
local existing_offset = redis.call("LINDEX", key, -1)
if existing_offset == false then
redis.call("RPUSH", key, wal, offset)
redis.call("EXPIRE", key, ttl)
elseif offset > tonumber(existing_offset) then
redis.call("LSET", key, 0, wal)
redis.call("LSET", key, -1, offset)
end
EOS
attr_reader :existing_jid attr_reader :existing_jid
def initialize(job, queue_name) def initialize(job, queue_name)
...@@ -44,22 +60,59 @@ module Gitlab ...@@ -44,22 +60,59 @@ module Gitlab
# This method will return the jid that was set in redis # This method will return the jid that was set in redis
def check!(expiry = DUPLICATE_KEY_TTL) def check!(expiry = DUPLICATE_KEY_TTL)
read_jid = nil read_jid = nil
read_wal_locations = {}
Sidekiq.redis do |redis| Sidekiq.redis do |redis|
redis.multi do |multi| redis.multi do |multi|
redis.set(idempotency_key, jid, ex: expiry, nx: true) redis.set(idempotency_key, jid, ex: expiry, nx: true)
read_wal_locations = check_existing_wal_locations!(redis, expiry)
read_jid = redis.get(idempotency_key) read_jid = redis.get(idempotency_key)
end end
end end
job['idempotency_key'] = idempotency_key job['idempotency_key'] = idempotency_key
# We need to fetch values since the read_wal_locations and read_jid were obtained inside transaction, under redis.multi command.
self.existing_wal_locations = read_wal_locations.transform_values(&:value)
self.existing_jid = read_jid.value self.existing_jid = read_jid.value
end end
def update_latest_wal_location!
return unless job_wal_locations.present?
Sidekiq.redis do |redis|
redis.multi do
job_wal_locations.each do |connection_name, location|
redis.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL])
end
end
end
end
def latest_wal_locations
return {} unless job_wal_locations.present?
strong_memoize(:latest_wal_locations) do
read_wal_locations = {}
Sidekiq.redis do |redis|
redis.multi do
job_wal_locations.keys.each do |connection_name|
read_wal_locations[connection_name] = redis.lindex(wal_location_key(connection_name), 0)
end
end
end
read_wal_locations.transform_values(&:value).compact
end
end
def delete! def delete!
Sidekiq.redis do |redis| Sidekiq.redis do |redis|
redis.del(idempotency_key) redis.multi do |multi|
redis.del(idempotency_key)
delete_wal_locations!(redis)
end
end end
end end
...@@ -93,6 +146,7 @@ module Gitlab ...@@ -93,6 +146,7 @@ module Gitlab
private private
attr_accessor :existing_wal_locations
attr_reader :queue_name, :job attr_reader :queue_name, :job
attr_writer :existing_jid attr_writer :existing_jid
...@@ -100,6 +154,10 @@ module Gitlab ...@@ -100,6 +154,10 @@ module Gitlab
@worker_klass ||= worker_class_name.to_s.safe_constantize @worker_klass ||= worker_class_name.to_s.safe_constantize
end end
def pg_wal_lsn_diff(connection_name)
Gitlab::Database::DATABASES[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name])
end
def strategy def strategy
return DEFAULT_STRATEGY unless worker_klass return DEFAULT_STRATEGY unless worker_klass
return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?) return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?)
...@@ -120,6 +178,20 @@ module Gitlab ...@@ -120,6 +178,20 @@ module Gitlab
job['jid'] job['jid']
end end
def job_wal_locations
return {} unless preserve_wal_location?
job['wal_locations'] || {}
end
def existing_wal_location_key(connection_name)
"#{idempotency_key}:#{connection_name}:existing_wal_location"
end
def wal_location_key(connection_name)
"#{idempotency_key}:#{connection_name}:wal_location"
end
def idempotency_key def idempotency_key
@idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}" @idempotency_key ||= job['idempotency_key'] || "#{namespace}:#{idempotency_hash}"
end end
...@@ -135,6 +207,29 @@ module Gitlab ...@@ -135,6 +207,29 @@ module Gitlab
def idempotency_string def idempotency_string
"#{worker_class_name}:#{Sidekiq.dump_json(arguments)}" "#{worker_class_name}:#{Sidekiq.dump_json(arguments)}"
end end
def delete_wal_locations!(redis)
job_wal_locations.keys.each do |connection_name|
redis.del(wal_location_key(connection_name))
redis.del(existing_wal_location_key(connection_name))
end
end
def check_existing_wal_locations!(redis, expiry)
read_wal_locations = {}
job_wal_locations.each do |connection_name, location|
key = existing_wal_location_key(connection_name)
redis.set(key, location, ex: expiry, nx: true)
read_wal_locations[connection_name] = redis.get(key)
end
read_wal_locations
end
def preserve_wal_location?
Feature.enabled?(:preserve_latest_wal_locations_for_idempotent_jobs, default_enabled: :yaml)
end
end end
end end
end end
......
...@@ -14,6 +14,8 @@ module Gitlab ...@@ -14,6 +14,8 @@ module Gitlab
job['duplicate-of'] = duplicate_job.existing_jid job['duplicate-of'] = duplicate_job.existing_jid
if duplicate_job.idempotent? if duplicate_job.idempotent?
duplicate_job.update_latest_wal_location!
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log( Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(
job, "dropped #{strategy_name}", duplicate_job.options) job, "dropped #{strategy_name}", duplicate_job.options)
return false return false
...@@ -23,8 +25,16 @@ module Gitlab ...@@ -23,8 +25,16 @@ module Gitlab
yield yield
end end
def perform(job)
update_job_wal_location!(job)
end
private private
def update_job_wal_location!(job)
job['dedup_wal_locations'] = duplicate_job.latest_wal_locations if duplicate_job.latest_wal_locations.present?
end
def deduplicatable_job? def deduplicatable_job?
!duplicate_job.scheduled? || duplicate_job.options[:including_scheduled] !duplicate_job.scheduled? || duplicate_job.options[:including_scheduled]
end end
......
...@@ -8,9 +8,14 @@ module Gitlab ...@@ -8,9 +8,14 @@ module Gitlab
# removes the lock after the job has executed preventing a new job to be queued # removes the lock after the job has executed preventing a new job to be queued
# while a job is still executing. # while a job is still executing.
class UntilExecuted < Base class UntilExecuted < Base
extend ::Gitlab::Utils::Override
include DeduplicatesWhenScheduling include DeduplicatesWhenScheduling
def perform(_job) override :perform
def perform(job)
super
yield yield
duplicate_job.delete! duplicate_job.delete!
......
...@@ -8,9 +8,13 @@ module Gitlab ...@@ -8,9 +8,13 @@ module Gitlab
# removes the lock before the job starts allowing a new job to be queued # removes the lock before the job starts allowing a new job to be queued
# while a job is still executing. # while a job is still executing.
class UntilExecuting < Base class UntilExecuting < Base
extend ::Gitlab::Utils::Override
include DeduplicatesWhenScheduling include DeduplicatesWhenScheduling
def perform(_job) override :perform
def perform(job)
super
duplicate_job.delete! duplicate_job.delete!
yield yield
......
...@@ -98,6 +98,16 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do ...@@ -98,6 +98,16 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
it_behaves_like 'replica is up to date', 'replica' it_behaves_like 'replica is up to date', 'replica'
end end
context 'when deduplication wal location is set' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e', 'dedup_wal_locations' => wal_locations } }
before do
allow(load_balancer).to receive(:select_up_to_date_host).with(wal_locations[:main]).and_return(true)
end
it_behaves_like 'replica is up to date', 'replica'
end
context 'when legacy wal location is set' do context 'when legacy wal location is set' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e', 'database_write_location' => '0/D525E3A8' } } let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e', 'database_write_location' => '0/D525E3A8' } }
......
...@@ -9,7 +9,14 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi ...@@ -9,7 +9,14 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
described_class.new(job, queue) described_class.new(job, queue)
end end
let(:job) { { 'class' => 'AuthorizedProjectsWorker', 'args' => [1], 'jid' => '123' } } let(:wal_locations) do
{
main: '0/D525E3A8',
ci: 'AB/12345'
}
end
let(:job) { { 'class' => 'AuthorizedProjectsWorker', 'args' => [1], 'jid' => '123', 'wal_locations' => wal_locations } }
let(:queue) { 'authorized_projects' } let(:queue) { 'authorized_projects' }
let(:idempotency_key) do let(:idempotency_key) do
...@@ -74,13 +81,39 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi ...@@ -74,13 +81,39 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
context 'when there was no job in the queue yet' do context 'when there was no job in the queue yet' do
it { expect(duplicate_job.check!).to eq('123') } it { expect(duplicate_job.check!).to eq('123') }
it "adds a key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do it "adds a idempotency key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do
expect { duplicate_job.check! } expect { duplicate_job.check! }
.to change { read_idempotency_key_with_ttl(idempotency_key) } .to change { read_idempotency_key_with_ttl(idempotency_key) }
.from([nil, -2]) .from([nil, -2])
.to(['123', be_within(1).of(described_class::DUPLICATE_KEY_TTL)]) .to(['123', be_within(1).of(described_class::DUPLICATE_KEY_TTL)])
end end
context 'when wal locations is not empty' do
it "adds a existing wal locations key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do
expect { duplicate_job.check! }
.to change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) }
.from([nil, -2])
.to([wal_locations[:main], be_within(1).of(described_class::DUPLICATE_KEY_TTL)])
.and change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) }
.from([nil, -2])
.to([wal_locations[:ci], be_within(1).of(described_class::DUPLICATE_KEY_TTL)])
end
end
context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do
before do
stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false)
end
it "does not change the existing wal locations key's TTL" do
expect { duplicate_job.check! }
.to not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) }
.from([nil, -2])
.and not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) }
.from([nil, -2])
end
end
it "adds the idempotency key to the jobs payload" do it "adds the idempotency key to the jobs payload" do
expect { duplicate_job.check! }.to change { job['idempotency_key'] }.from(nil).to(idempotency_key) expect { duplicate_job.check! }.to change { job['idempotency_key'] }.from(nil).to(idempotency_key)
end end
...@@ -89,6 +122,9 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi ...@@ -89,6 +122,9 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
context 'when there was already a job with same arguments in the same queue' do context 'when there was already a job with same arguments in the same queue' do
before do before do
set_idempotency_key(idempotency_key, 'existing-key') set_idempotency_key(idempotency_key, 'existing-key')
wal_locations.each do |config_name, location|
set_idempotency_key(existing_wal_location_key(idempotency_key, config_name), location)
end
end end
it { expect(duplicate_job.check!).to eq('existing-key') } it { expect(duplicate_job.check!).to eq('existing-key') }
...@@ -99,6 +135,14 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi ...@@ -99,6 +135,14 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
.from(['existing-key', -1]) .from(['existing-key', -1])
end end
it "does not change the existing wal locations key's TTL" do
expect { duplicate_job.check! }
.to not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) }
.from([wal_locations[:main], -1])
.and not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) }
.from([wal_locations[:ci], -1])
end
it 'sets the existing jid' do it 'sets the existing jid' do
duplicate_job.check! duplicate_job.check!
...@@ -107,6 +151,117 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi ...@@ -107,6 +151,117 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end end
end end
describe '#update_latest_wal_location!' do
let(:offset) { '1024' }
before do
allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:main).and_return(offset)
allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:ci).and_return(offset)
end
shared_examples 'updates wal location' do
it 'updates a wal location to redis with an offset' do
expect { duplicate_job.update_latest_wal_location! }
.to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
.from(existing_wal_with_offset[:main])
.to(new_wal_with_offset[:main])
.and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
.from(existing_wal_with_offset[:ci])
.to(new_wal_with_offset[:ci])
end
end
context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do
before do
stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false)
end
it "doesn't call Sidekiq.redis" do
expect(Sidekiq).not_to receive(:redis)
duplicate_job.update_latest_wal_location!
end
it "doesn't update a wal location to redis with an offset" do
expect { duplicate_job.update_latest_wal_location! }
.to not_change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
.from([])
.and not_change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
.from([])
end
end
context "when the key doesn't exists in redis" do
include_examples 'updates wal location' do
let(:existing_wal_with_offset) { { main: [], ci: [] } }
let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } }
end
end
context "when the key exists in redis" do
let(:existing_offset) { '1023'}
let(:existing_wal_locations) do
{
main: '0/D525E3NM',
ci: 'AB/111112'
}
end
before do
rpush_to_redis_key(wal_location_key(idempotency_key, :main), existing_wal_locations[:main], existing_offset)
rpush_to_redis_key(wal_location_key(idempotency_key, :ci), existing_wal_locations[:ci], existing_offset)
end
context "when the new offset is bigger then the existing one" do
include_examples 'updates wal location' do
let(:existing_wal_with_offset) { existing_wal_locations.transform_values { |v| [v, existing_offset] } }
let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } }
end
end
context "when the old offset is not bigger then the existing one" do
let(:existing_offset) { offset }
it "does not update a wal location to redis with an offset" do
expect { duplicate_job.update_latest_wal_location! }
.to not_change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
.from([existing_wal_locations[:main], existing_offset])
.and not_change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
.from([existing_wal_locations[:ci], existing_offset])
end
end
end
end
describe '#latest_wal_locations' do
context 'when job was deduplicated and wal locations were already persisted' do
before do
rpush_to_redis_key(wal_location_key(idempotency_key, :main), wal_locations[:main], 1024)
rpush_to_redis_key(wal_location_key(idempotency_key, :ci), wal_locations[:ci], 1024)
end
it { expect(duplicate_job.latest_wal_locations).to eq(wal_locations) }
end
context 'when job is not deduplication and wal locations were not persisted' do
it { expect(duplicate_job.latest_wal_locations).to be_empty }
end
context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do
before do
stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false)
end
it "doesn't call Sidekiq.redis" do
expect(Sidekiq).not_to receive(:redis)
duplicate_job.latest_wal_locations
end
it { expect(duplicate_job.latest_wal_locations).to eq({}) }
end
end
describe '#delete!' do describe '#delete!' do
context "when we didn't track the definition" do context "when we didn't track the definition" do
it { expect { duplicate_job.delete! }.not_to raise_error } it { expect { duplicate_job.delete! }.not_to raise_error }
...@@ -115,14 +270,79 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi ...@@ -115,14 +270,79 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
context 'when the key exists in redis' do context 'when the key exists in redis' do
before do before do
set_idempotency_key(idempotency_key, 'existing-jid') set_idempotency_key(idempotency_key, 'existing-jid')
wal_locations.each do |config_name, location|
set_idempotency_key(existing_wal_location_key(idempotency_key, config_name), location)
set_idempotency_key(wal_location_key(idempotency_key, config_name), location)
end
end end
shared_examples 'deleting the duplicate job' do shared_examples 'deleting the duplicate job' do
it 'removes the key from redis' do shared_examples 'deleting keys from redis' do |key_name|
expect { duplicate_job.delete! } it "removes the #{key_name} from redis" do
.to change { read_idempotency_key_with_ttl(idempotency_key) } expect { duplicate_job.delete! }
.from(['existing-jid', -1]) .to change { read_idempotency_key_with_ttl(key) }
.to([nil, -2]) .from([from_value, -1])
.to([nil, -2])
end
end
shared_examples 'does not delete key from redis' do |key_name|
it "does not remove the #{key_name} from redis" do
expect { duplicate_job.delete! }
.to not_change { read_idempotency_key_with_ttl(key) }
.from([from_value, -1])
end
end
it_behaves_like 'deleting keys from redis', 'idempotent key' do
let(:key) { idempotency_key }
let(:from_value) { 'existing-jid' }
end
it_behaves_like 'deleting keys from redis', 'existing wal location keys for main database' do
let(:key) { existing_wal_location_key(idempotency_key, :main) }
let(:from_value) { wal_locations[:main] }
end
it_behaves_like 'deleting keys from redis', 'existing wal location keys for ci database' do
let(:key) { existing_wal_location_key(idempotency_key, :ci) }
let(:from_value) { wal_locations[:ci] }
end
it_behaves_like 'deleting keys from redis', 'latest wal location keys for main database' do
let(:key) { wal_location_key(idempotency_key, :main) }
let(:from_value) { wal_locations[:main] }
end
it_behaves_like 'deleting keys from redis', 'latest wal location keys for ci database' do
let(:key) { wal_location_key(idempotency_key, :ci) }
let(:from_value) { wal_locations[:ci] }
end
context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do
before do
stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false)
end
it_behaves_like 'does not delete key from redis', 'latest wal location keys for main database' do
let(:key) { existing_wal_location_key(idempotency_key, :main) }
let(:from_value) { wal_locations[:main] }
end
it_behaves_like 'does not delete key from redis', 'latest wal location keys for ci database' do
let(:key) { existing_wal_location_key(idempotency_key, :ci) }
let(:from_value) { wal_locations[:ci] }
end
it_behaves_like 'does not delete key from redis', 'latest wal location keys for main database' do
let(:key) { wal_location_key(idempotency_key, :main) }
let(:from_value) { wal_locations[:main] }
end
it_behaves_like 'does not delete key from redis', 'latest wal location keys for ci database' do
let(:key) { wal_location_key(idempotency_key, :ci) }
let(:from_value) { wal_locations[:ci] }
end
end end
end end
...@@ -254,10 +474,22 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi ...@@ -254,10 +474,22 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end end
end end
def existing_wal_location_key(idempotency_key, config_name)
"#{idempotency_key}:#{config_name}:existing_wal_location"
end
def wal_location_key(idempotency_key, config_name)
"#{idempotency_key}:#{config_name}:wal_location"
end
def set_idempotency_key(key, value = '1') def set_idempotency_key(key, value = '1')
Sidekiq.redis { |r| r.set(key, value) } Sidekiq.redis { |r| r.set(key, value) }
end end
def rpush_to_redis_key(key, wal, offset)
Sidekiq.redis { |r| r.rpush(key, [wal, offset]) }
end
def read_idempotency_key_with_ttl(key) def read_idempotency_key_with_ttl(key)
Sidekiq.redis do |redis| Sidekiq.redis do |redis|
redis.pipelined do |p| redis.pipelined do |p|
...@@ -266,4 +498,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi ...@@ -266,4 +498,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end end
end end
end end
def read_range_from_redis(key)
Sidekiq.redis do |redis|
redis.lrange(key, 0, -1)
end
end
end end
...@@ -7,6 +7,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecut ...@@ -7,6 +7,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecut
describe '#perform' do describe '#perform' do
let(:proc) { -> {} } let(:proc) { -> {} }
before do
allow(fake_duplicate_job).to receive(:latest_wal_locations).and_return( {} )
end
it 'deletes the lock after executing' do it 'deletes the lock after executing' do
expect(proc).to receive(:call).ordered expect(proc).to receive(:call).ordered
expect(fake_duplicate_job).to receive(:delete!).ordered expect(fake_duplicate_job).to receive(:delete!).ordered
......
...@@ -7,6 +7,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecut ...@@ -7,6 +7,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecut
describe '#perform' do describe '#perform' do
let(:proc) { -> {} } let(:proc) { -> {} }
before do
allow(fake_duplicate_job).to receive(:latest_wal_locations).and_return( {} )
end
it 'deletes the lock before executing' do it 'deletes the lock before executing' do
expect(fake_duplicate_job).to receive(:delete!).ordered expect(fake_duplicate_job).to receive(:delete!).ordered
expect(proc).to receive(:call).ordered expect(proc).to receive(:call).ordered
......
...@@ -39,6 +39,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name| ...@@ -39,6 +39,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false) allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid') allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true) allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
allow(fake_duplicate_job).to receive(:options).and_return({}) allow(fake_duplicate_job).to receive(:options).and_return({})
job_hash = {} job_hash = {}
...@@ -63,6 +64,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name| ...@@ -63,6 +64,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL) .with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.and_return('the jid')) .and_return('the jid'))
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true) allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
job_hash = {} job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true) expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
...@@ -83,6 +85,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name| ...@@ -83,6 +85,7 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
allow(fake_duplicate_job).to( allow(fake_duplicate_job).to(
receive(:check!).with(time_diff.to_i).and_return('the jid')) receive(:check!).with(time_diff.to_i).and_return('the jid'))
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true) allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
job_hash = {} job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true) expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
...@@ -105,6 +108,13 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name| ...@@ -105,6 +108,13 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
allow(fake_duplicate_job).to receive(:options).and_return({}) allow(fake_duplicate_job).to receive(:options).and_return({})
allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid') allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
allow(fake_duplicate_job).to receive(:idempotent?).and_return(true) allow(fake_duplicate_job).to receive(:idempotent?).and_return(true)
allow(fake_duplicate_job).to receive(:update_latest_wal_location!)
end
it 'updates latest wal location' do
expect(fake_duplicate_job).to receive(:update_latest_wal_location!)
strategy.schedule({ 'jid' => 'new jid' }) {}
end end
it 'drops the job' do it 'drops the job' do
...@@ -136,4 +146,46 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name| ...@@ -136,4 +146,46 @@ RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
end end
end end
end end
describe '#perform' do
let(:proc) { -> {} }
let(:job) { { 'jid' => 'new jid', 'wal_locations' => { 'main' => '0/1234', 'ci' => '0/1234' } } }
let(:wal_locations) do
{
main: '0/D525E3A8',
ci: 'AB/12345'
}
end
before do
allow(fake_duplicate_job).to receive(:delete!)
allow(fake_duplicate_job).to receive(:latest_wal_locations).and_return( wal_locations )
end
it 'updates job hash with dedup_wal_locations' do
strategy.perform(job) do
proc.call
end
expect(job['dedup_wal_locations']).to eq(wal_locations)
end
shared_examples 'does not update job hash' do
it 'does not update job hash with dedup_wal_locations' do
strategy.perform(job) do
proc.call
end
expect(job).not_to include('dedup_wal_locations')
end
end
context 'when latest_wal_location is empty' do
before do
allow(fake_duplicate_job).to receive(:latest_wal_locations).and_return( {} )
end
include_examples 'does not update job hash'
end
end
end end
...@@ -35,45 +35,17 @@ RSpec.describe WorkerAttributes do ...@@ -35,45 +35,17 @@ RSpec.describe WorkerAttributes do
end end
end end
context 'when job is idempotent' do context 'when feature_flag is provided' do
context 'when data_consistency is not :always' do before do
it 'raise exception' do stub_feature_flags(test_feature_flag: false)
worker.idempotent! skip_feature_flags_yaml_validation
skip_default_enabled_yaml_check
expect { worker.data_consistency(:sticky) }
.to raise_error("Class can't be marked as idempotent if data_consistency is not set to :always")
end
end
context 'when feature_flag is provided' do
before do
stub_feature_flags(test_feature_flag: false)
skip_feature_flags_yaml_validation
skip_default_enabled_yaml_check
end
it 'returns correct feature flag value' do
worker.data_consistency(:sticky, feature_flag: :test_feature_flag)
expect(worker.get_data_consistency_feature_flag_enabled?).not_to be_truthy
end
end end
end
end
describe '.idempotent!' do
it 'sets `idempotent` attribute of the worker class to true' do
worker.idempotent!
expect(worker.send(:class_attributes)[:idempotent]).to eq(true) it 'returns correct feature flag value' do
end worker.data_consistency(:sticky, feature_flag: :test_feature_flag)
context 'when data consistency is not :always' do
it 'raise exception' do
worker.data_consistency(:sticky)
expect { worker.idempotent! } expect(worker.get_data_consistency_feature_flag_enabled?).not_to be_truthy
.to raise_error("Class can't be marked as idempotent if data_consistency is not set to :always")
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment