Commit b6f56d4b authored by Kamil Trzciński's avatar Kamil Trzciński

Re-expose `model.connection.load_balancer` to have LB of the connection

The is required by `pg_wal_lsn_diff` to execute against closest
connection to compare WAL pointers. Without that it causes `duplicate_job` to fail.

Ensure that we test an actual invocations with PG.
parent 6c981fe6
...@@ -13,15 +13,14 @@ module Gitlab ...@@ -13,15 +13,14 @@ module Gitlab
WriteInsideReadOnlyTransactionError = Class.new(StandardError) WriteInsideReadOnlyTransactionError = Class.new(StandardError)
READ_ONLY_TRANSACTION_KEY = :load_balacing_read_only_transaction READ_ONLY_TRANSACTION_KEY = :load_balacing_read_only_transaction
# The load balancer is intentionally not exposed since the returned instance # The load balancer returned by connection might be different
# might be different `model.connection.load_balancer` vs `model.load_balancer` # between `model.connection.load_balancer` vs `model.load_balancer`
# #
# The used `model.connection` is dependent on `use_model_load_balancing`. # The used `model.connection` is dependent on `use_model_load_balancing`.
# See more in: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/73949. # See more in: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/73949.
# #
# Always use `model.load_balancer` or `model.sticking`. # Always use `model.load_balancer` or `model.sticking`.
# attr_reader :load_balancer
# attr_reader :load_balancer
# These methods perform writes after which we need to stick to the # These methods perform writes after which we need to stick to the
# primary. # primary.
......
...@@ -84,7 +84,11 @@ module Gitlab ...@@ -84,7 +84,11 @@ module Gitlab
Sidekiq.redis do |redis| Sidekiq.redis do |redis|
redis.multi do |multi| redis.multi do |multi|
job_wal_locations.each do |connection_name, location| job_wal_locations.each do |connection_name, location|
multi.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]) multi.eval(
LUA_SET_WAL_SCRIPT,
keys: [wal_location_key(connection_name)],
argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]
)
end end
end end
end end
......
...@@ -174,26 +174,21 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi ...@@ -174,26 +174,21 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end end
describe '#update_latest_wal_location!' do describe '#update_latest_wal_location!' do
let(:offset) { '1024' }
before do before do
allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:main).and_return(offset) allow(Gitlab::Database).to receive(:database_base_models).and_return(
allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:ci).and_return(offset) { main: ::ActiveRecord::Base,
end ci: ::ActiveRecord::Base })
shared_examples 'updates wal location' do set_idempotency_key(existing_wal_location_key(idempotency_key, :main), existing_wal[:main])
it 'updates a wal location to redis with an offset' do set_idempotency_key(existing_wal_location_key(idempotency_key, :ci), existing_wal[:ci])
expect { duplicate_job.update_latest_wal_location! }
.to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) } # read existing_wal_locations
.from(existing_wal_with_offset[:main]) duplicate_job.check!
.to(new_wal_with_offset[:main])
.and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
.from(existing_wal_with_offset[:ci])
.to(new_wal_with_offset[:ci])
end
end end
context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do
let(:existing_wal) { {} }
before do before do
stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false) stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false)
end end
...@@ -214,42 +209,107 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi ...@@ -214,42 +209,107 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end end
context "when the key doesn't exists in redis" do context "when the key doesn't exists in redis" do
include_examples 'updates wal location' do let(:existing_wal) do
let(:existing_wal_with_offset) { { main: [], ci: [] } } {
let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } } main: '0/D525E3A0',
ci: 'AB/12340'
}
end end
end
context "when the key exists in redis" do let(:new_wal_location_with_offset) do
let(:existing_offset) { '1023'}
let(:existing_wal_locations) do
{ {
main: '0/D525E3NM', # offset is relative to `existing_wal`
ci: 'AB/111112' main: ['0/D525E3A8', '8'],
ci: ['AB/12345', '5']
} }
end end
let(:wal_locations) { new_wal_location_with_offset.transform_values(&:first) }
it 'stores a wal location to redis with an offset relative to existing wal location' do
expect { duplicate_job.update_latest_wal_location! }
.to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
.from([])
.to(new_wal_location_with_offset[:main])
.and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
.from([])
.to(new_wal_location_with_offset[:ci])
end
end
context "when the key exists in redis" do
before do before do
rpush_to_redis_key(wal_location_key(idempotency_key, :main), existing_wal_locations[:main], existing_offset) rpush_to_redis_key(wal_location_key(idempotency_key, :main), *stored_wal_location_with_offset[:main])
rpush_to_redis_key(wal_location_key(idempotency_key, :ci), existing_wal_locations[:ci], existing_offset) rpush_to_redis_key(wal_location_key(idempotency_key, :ci), *stored_wal_location_with_offset[:ci])
end end
let(:wal_locations) { new_wal_location_with_offset.transform_values(&:first) }
context "when the new offset is bigger then the existing one" do context "when the new offset is bigger then the existing one" do
include_examples 'updates wal location' do let(:existing_wal) do
let(:existing_wal_with_offset) { existing_wal_locations.transform_values { |v| [v, existing_offset] } } {
let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } } main: '0/D525E3A0',
ci: 'AB/12340'
}
end
let(:stored_wal_location_with_offset) do
{
# offset is relative to `existing_wal`
main: ['0/D525E3A3', '3'],
ci: ['AB/12342', '2']
}
end
let(:new_wal_location_with_offset) do
{
# offset is relative to `existing_wal`
main: ['0/D525E3A8', '8'],
ci: ['AB/12345', '5']
}
end
it 'updates a wal location to redis with an offset' do
expect { duplicate_job.update_latest_wal_location! }
.to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
.from(stored_wal_location_with_offset[:main])
.to(new_wal_location_with_offset[:main])
.and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
.from(stored_wal_location_with_offset[:ci])
.to(new_wal_location_with_offset[:ci])
end end
end end
context "when the old offset is not bigger then the existing one" do context "when the old offset is not bigger then the existing one" do
let(:existing_offset) { offset } let(:existing_wal) do
{
main: '0/D525E3A0',
ci: 'AB/12340'
}
end
let(:stored_wal_location_with_offset) do
{
# offset is relative to `existing_wal`
main: ['0/D525E3A8', '8'],
ci: ['AB/12345', '5']
}
end
let(:new_wal_location_with_offset) do
{
# offset is relative to `existing_wal`
main: ['0/D525E3A2', '2'],
ci: ['AB/12342', '2']
}
end
it "does not update a wal location to redis with an offset" do it "does not update a wal location to redis with an offset" do
expect { duplicate_job.update_latest_wal_location! } expect { duplicate_job.update_latest_wal_location! }
.to not_change { read_range_from_redis(wal_location_key(idempotency_key, :main)) } .to not_change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
.from([existing_wal_locations[:main], existing_offset]) .from(stored_wal_location_with_offset[:main])
.and not_change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) } .and not_change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
.from([existing_wal_locations[:ci], existing_offset]) .from(stored_wal_location_with_offset[:ci])
end end
end end
end end
...@@ -619,12 +679,12 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi ...@@ -619,12 +679,12 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end end
end end
def existing_wal_location_key(idempotency_key, config_name) def existing_wal_location_key(idempotency_key, connection_name)
"#{idempotency_key}:#{config_name}:existing_wal_location" "#{idempotency_key}:#{connection_name}:existing_wal_location"
end end
def wal_location_key(idempotency_key, config_name) def wal_location_key(idempotency_key, connection_name)
"#{idempotency_key}:#{config_name}:wal_location" "#{idempotency_key}:#{connection_name}:wal_location"
end end
def set_idempotency_key(key, value = '1') def set_idempotency_key(key, value = '1')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment