Commit 40735f54 authored by Robert Speicher's avatar Robert Speicher

Merge branch 'pb-generate-iid-in-db-behind-ff' into 'master'

Use direct SQL to generate internal ids

See merge request gitlab-org/gitlab!65590
parents 90f27cfe 56134706
...@@ -159,9 +159,8 @@ module AtomicInternalId ...@@ -159,9 +159,8 @@ module AtomicInternalId
# Defines class methods: # Defines class methods:
# #
# - with_{scope}_{column}_supply # - with_{scope}_{column}_supply
# This method can be used to allocate a block of IID values during # This method can be used to allocate a stream of IID values during
# bulk operations (importing/copying, etc). This can be more efficient # bulk operations (importing/copying, etc).
# than creating instances one-by-one.
# #
# Pass in a block that receives a `Supply` instance. To allocate a new # Pass in a block that receives a `Supply` instance. To allocate a new
# IID value, call `Supply#next_value`. # IID value, call `Supply#next_value`.
...@@ -181,14 +180,8 @@ module AtomicInternalId ...@@ -181,14 +180,8 @@ module AtomicInternalId
scope_attrs = ::AtomicInternalId.scope_attrs(scope_value) scope_attrs = ::AtomicInternalId.scope_attrs(scope_value)
usage = ::AtomicInternalId.scope_usage(self) usage = ::AtomicInternalId.scope_usage(self)
generator = InternalId::InternalIdGenerator.new(subject, scope_attrs, usage, init) supply = Supply.new(-> { InternalId.generate_next(subject, scope_attrs, usage, init) })
block.call(supply)
generator.with_lock do
supply = Supply.new(generator.record.last_value)
block.call(supply)
ensure
generator.track_greatest(supply.current_value) if supply
end
end end
end end
end end
...@@ -236,14 +229,14 @@ module AtomicInternalId ...@@ -236,14 +229,14 @@ module AtomicInternalId
end end
class Supply class Supply
attr_reader :current_value attr_reader :generator
def initialize(start_value) def initialize(generator)
@current_value = start_value @generator = generator
end end
def next_value def next_value
@current_value += 1 @generator.call
end end
end end
end end
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
# * Add `usage` value to enum # * Add `usage` value to enum
# * (Optionally) add columns to `internal_ids` if needed for scope. # * (Optionally) add columns to `internal_ids` if needed for scope.
class InternalId < ApplicationRecord class InternalId < ApplicationRecord
include Gitlab::Utils::StrongMemoize extend Gitlab::Utils::StrongMemoize
belongs_to :project belongs_to :project
belongs_to :namespace belongs_to :namespace
...@@ -25,6 +25,10 @@ class InternalId < ApplicationRecord ...@@ -25,6 +25,10 @@ class InternalId < ApplicationRecord
validates :usage, presence: true validates :usage, presence: true
scope :filter_by, -> (scope, usage) do
where(**scope, usage: usage)
end
# Increments #last_value and saves the record # Increments #last_value and saves the record
# #
# The operation locks the record and gathers a `ROW SHARE` lock (in PostgreSQL). # The operation locks the record and gathers a `ROW SHARE` lock (in PostgreSQL).
...@@ -53,18 +57,15 @@ class InternalId < ApplicationRecord ...@@ -53,18 +57,15 @@ class InternalId < ApplicationRecord
class << self class << self
def track_greatest(subject, scope, usage, new_value, init) def track_greatest(subject, scope, usage, new_value, init)
InternalIdGenerator.new(subject, scope, usage, init) build_generator(subject, scope, usage, init).track_greatest(new_value)
.track_greatest(new_value)
end end
def generate_next(subject, scope, usage, init) def generate_next(subject, scope, usage, init)
InternalIdGenerator.new(subject, scope, usage, init) build_generator(subject, scope, usage, init).generate
.generate
end end
def reset(subject, scope, usage, value) def reset(subject, scope, usage, value)
InternalIdGenerator.new(subject, scope, usage) build_generator(subject, scope, usage).reset(value)
.reset(value)
end end
# Flushing records is generally safe in a sense that those # Flushing records is generally safe in a sense that those
...@@ -77,11 +78,36 @@ class InternalId < ApplicationRecord ...@@ -77,11 +78,36 @@ class InternalId < ApplicationRecord
where(filter).delete_all where(filter).delete_all
end end
def internal_id_transactions_increment(operation:, usage:)
self.internal_id_transactions_total.increment(
operation: operation,
usage: usage.to_s,
in_transaction: ActiveRecord::Base.connection.transaction_open?.to_s
)
end
def internal_id_transactions_total
strong_memoize(:internal_id_transactions_total) do
name = :gitlab_internal_id_transactions_total
comment = 'Counts all the internal ids happening within transaction'
Gitlab::Metrics.counter(name, comment)
end
end
private
def build_generator(subject, scope, usage, init = nil)
if Feature.enabled?(:generate_iids_without_explicit_locking)
ImplicitlyLockingInternalIdGenerator.new(subject, scope, usage, init)
else
InternalIdGenerator.new(subject, scope, usage, init)
end
end
end end
class InternalIdGenerator class InternalIdGenerator
extend Gitlab::Utils::StrongMemoize
# Generate next internal id for a given scope and usage. # Generate next internal id for a given scope and usage.
# #
# For currently supported usages, see #usage enum. # For currently supported usages, see #usage enum.
...@@ -117,7 +143,7 @@ class InternalId < ApplicationRecord ...@@ -117,7 +143,7 @@ class InternalId < ApplicationRecord
# init: Block that gets called to initialize InternalId record if not present # init: Block that gets called to initialize InternalId record if not present
# Make sure to not throw exceptions in the absence of records (if this is expected). # Make sure to not throw exceptions in the absence of records (if this is expected).
def generate def generate
self.class.internal_id_transactions_increment(operation: :generate, usage: usage) InternalId.internal_id_transactions_increment(operation: :generate, usage: usage)
subject.transaction do subject.transaction do
# Create a record in internal_ids if one does not yet exist # Create a record in internal_ids if one does not yet exist
...@@ -134,7 +160,7 @@ class InternalId < ApplicationRecord ...@@ -134,7 +160,7 @@ class InternalId < ApplicationRecord
def reset(value) def reset(value)
return false unless value return false unless value
self.class.internal_id_transactions_increment(operation: :reset, usage: usage) InternalId.internal_id_transactions_increment(operation: :reset, usage: usage)
updated = updated =
InternalId InternalId
...@@ -149,8 +175,9 @@ class InternalId < ApplicationRecord ...@@ -149,8 +175,9 @@ class InternalId < ApplicationRecord
# and set its new_value if it is higher than the current last_value # and set its new_value if it is higher than the current last_value
# #
# Note this will acquire a ROW SHARE lock on the InternalId record # Note this will acquire a ROW SHARE lock on the InternalId record
def track_greatest(new_value) def track_greatest(new_value)
self.class.internal_id_transactions_increment(operation: :track_greatest, usage: usage) InternalId.internal_id_transactions_increment(operation: :track_greatest, usage: usage)
subject.transaction do subject.transaction do
record.track_greatest_and_save!(new_value) record.track_greatest_and_save!(new_value)
...@@ -162,7 +189,7 @@ class InternalId < ApplicationRecord ...@@ -162,7 +189,7 @@ class InternalId < ApplicationRecord
end end
def with_lock(&block) def with_lock(&block)
self.class.internal_id_transactions_increment(operation: :with_lock, usage: usage) InternalId.internal_id_transactions_increment(operation: :with_lock, usage: usage)
record.with_lock(&block) record.with_lock(&block)
end end
...@@ -199,22 +226,118 @@ class InternalId < ApplicationRecord ...@@ -199,22 +226,118 @@ class InternalId < ApplicationRecord
rescue ActiveRecord::RecordNotUnique rescue ActiveRecord::RecordNotUnique
lookup lookup
end end
end
def self.internal_id_transactions_increment(operation:, usage:) class ImplicitlyLockingInternalIdGenerator
self.internal_id_transactions_total.increment( # Generate next internal id for a given scope and usage.
operation: operation, #
usage: usage.to_s, # For currently supported usages, see #usage enum.
in_transaction: ActiveRecord::Base.connection.transaction_open?.to_s #
) # The method implements a locking scheme that has the following properties:
# 1) Generated sequence of internal ids is unique per (scope and usage)
# 2) The method is thread-safe and may be used in concurrent threads/processes.
# 3) The generated sequence is gapless.
# 4) In the absence of a record in the internal_ids table, one will be created
# and last_value will be calculated on the fly.
#
# subject: The instance or class we're generating an internal id for.
# scope: Attributes that define the scope for id generation.
# Valid keys are `project/project_id` and `namespace/namespace_id`.
# usage: Symbol to define the usage of the internal id, see InternalId.usages
# init: Proc that accepts the subject and the scope and returns Integer|NilClass
attr_reader :subject, :scope, :scope_attrs, :usage, :init
def initialize(subject, scope, usage, init = nil)
@subject = subject
@scope = scope
@usage = usage
@init = init
raise ArgumentError, 'Scope is not well-defined, need at least one column for scope (given: 0)' if scope.empty?
unless InternalId.usages.has_key?(usage.to_s)
raise ArgumentError, "Usage '#{usage}' is unknown. Supported values are #{InternalId.usages.keys} from InternalId.usages"
end
end end
def self.internal_id_transactions_total # Generates next internal id and returns it
strong_memoize(:internal_id_transactions_total) do # init: Block that gets called to initialize InternalId record if not present
name = :gitlab_internal_id_transactions_total # Make sure to not throw exceptions in the absence of records (if this is expected).
comment = 'Counts all the internal ids happening within transaction' def generate
InternalId.internal_id_transactions_increment(operation: :generate, usage: usage)
Gitlab::Metrics.counter(name, comment) next_iid = update_record!(subject, scope, usage, arel_table[:last_value] + 1)
return next_iid if next_iid
create_record!(subject, scope, usage, init) do |iid|
iid.last_value += 1
end end
rescue ActiveRecord::RecordNotUnique
retry
end
# Reset tries to rewind to `value-1`. This will only succeed,
# if `value` stored in database is equal to `last_value`.
# value: The expected last_value to decrement
def reset(value)
return false unless value
InternalId.internal_id_transactions_increment(operation: :reset, usage: usage)
iid = update_record!(subject, scope.merge(last_value: value), usage, arel_table[:last_value] - 1)
iid == value - 1
end
# Create a record in internal_ids if one does not yet exist
# and set its new_value if it is higher than the current last_value
def track_greatest(new_value)
InternalId.internal_id_transactions_increment(operation: :track_greatest, usage: usage)
function = Arel::Nodes::NamedFunction.new('GREATEST', [
arel_table[:last_value],
new_value.to_i
])
next_iid = update_record!(subject, scope, usage, function)
return next_iid if next_iid
create_record!(subject, scope, usage, init) do |object|
object.last_value = [object.last_value, new_value].max
end
rescue ActiveRecord::RecordNotUnique
retry
end
private
def update_record!(subject, scope, usage, new_value)
stmt = Arel::UpdateManager.new
stmt.table(arel_table)
stmt.set(arel_table[:last_value] => new_value)
stmt.wheres = InternalId.filter_by(scope, usage).arel.constraints
ActiveRecord::Base.connection.insert(stmt, 'Update InternalId', 'last_value')
end
def create_record!(subject, scope, usage, init)
raise ArgumentError, 'Cannot initialize without init!' unless init
instance = subject.is_a?(::Class) ? nil : subject
subject.transaction(requires_new: true) do
last_value = init.call(instance, scope) || 0
internal_id = InternalId.create!(**scope, usage: usage, last_value: last_value) do |subject|
yield subject if block_given?
end
internal_id.last_value
end
end
def arel_table
InternalId.arel_table
end end
end end
end end
---
name: generate_iids_without_explicit_locking
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/65590
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/335431
milestone: '14.2'
type: development
group: group::database
default_enabled: false
...@@ -240,18 +240,12 @@ RSpec.describe AtomicInternalId do ...@@ -240,18 +240,12 @@ RSpec.describe AtomicInternalId do
end end
describe '.with_project_iid_supply' do describe '.with_project_iid_supply' do
let(:iid) { 100 } it 'supplies a stream of iid values' do
expect do
it 'wraps generate and track_greatest in a concurrency-safe lock' do ::Milestone.with_project_iid_supply(milestone.project) do |supply|
expect_next_instance_of(InternalId::InternalIdGenerator) do |g| 4.times { supply.next_value }
expect(g).to receive(:with_lock).and_call_original end
expect(g.record).to receive(:last_value).and_return(iid) end.to change { InternalId.find_by(project: milestone.project, usage: :milestones)&.last_value.to_i }.by(4)
expect(g).to receive(:track_greatest).with(iid + 4)
end
::Milestone.with_project_iid_supply(milestone.project) do |supply|
4.times { supply.next_value }
end
end end
end end
end end
...@@ -39,216 +39,217 @@ RSpec.describe InternalId do ...@@ -39,216 +39,217 @@ RSpec.describe InternalId do
end end
end end
describe '.generate_next' do shared_examples_for 'a monotonically increasing id generator' do
subject { described_class.generate_next(id_subject, scope, usage, init) } describe '.generate_next' do
subject { described_class.generate_next(id_subject, scope, usage, init) }
context 'in the absence of a record' do context 'in the absence of a record' do
it 'creates a record if not yet present' do it 'creates a record if not yet present' do
expect { subject }.to change { described_class.count }.from(0).to(1) expect { subject }.to change { described_class.count }.from(0).to(1)
end end
it 'stores record attributes' do it 'stores record attributes' do
subject subject
described_class.first.tap do |record| described_class.first.tap do |record|
expect(record.project).to eq(project) expect(record.project).to eq(project)
expect(record.usage).to eq(usage.to_s) expect(record.usage).to eq(usage.to_s)
end
end end
end
context 'with existing issues' do context 'with existing issues' do
before do before do
create_list(:issue, 2, project: project) create_list(:issue, 2, project: project)
described_class.delete_all described_class.delete_all
end end
it 'calculates last_value values automatically' do it 'calculates last_value values automatically' do
expect(subject).to eq(project.issues.size + 1) expect(subject).to eq(project.issues.size + 1)
end
end end
end end
context 'with concurrent inserts on table' do it 'generates a strictly monotone, gapless sequence' do
it 'looks up the record if it was created concurrently' do seq = Array.new(10).map do
args = { **scope, usage: described_class.usages[usage.to_s] } described_class.generate_next(issue, scope, usage, init)
record = double
expect(described_class).to receive(:find_by).with(args).and_return(nil) # first call, record not present
expect(described_class).to receive(:find_by).with(args).and_return(record) # second call, record was created by another process
expect(described_class).to receive(:create!).and_raise(ActiveRecord::RecordNotUnique, 'record not unique')
expect(record).to receive(:increment_and_save!)
subject
end end
end normalized = seq.map { |i| i - seq.min }
end
it 'generates a strictly monotone, gapless sequence' do expect(normalized).to eq((0..seq.size - 1).to_a)
seq = Array.new(10).map do
described_class.generate_next(issue, scope, usage, init)
end end
normalized = seq.map { |i| i - seq.min }
expect(normalized).to eq((0..seq.size - 1).to_a)
end
context 'there are no instances to pass in' do context 'there are no instances to pass in' do
let(:id_subject) { Issue } let(:id_subject) { Issue }
it 'accepts classes instead' do it 'accepts classes instead' do
expect(subject).to eq(1) expect(subject).to eq(1)
end
end end
end
context 'when executed outside of transaction' do context 'when executed outside of transaction' do
it 'increments counter with in_transaction: "false"' do it 'increments counter with in_transaction: "false"' do
allow(ActiveRecord::Base.connection).to receive(:transaction_open?) { false } allow(ActiveRecord::Base.connection).to receive(:transaction_open?) { false }
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment) expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :generate, usage: 'issues', in_transaction: 'false').and_call_original .with(operation: :generate, usage: 'issues', in_transaction: 'false').and_call_original
subject subject
end
end end
end
context 'when executed within transaction' do context 'when executed within transaction' do
it 'increments counter with in_transaction: "true"' do it 'increments counter with in_transaction: "true"' do
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment) expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :generate, usage: 'issues', in_transaction: 'true').and_call_original .with(operation: :generate, usage: 'issues', in_transaction: 'true').and_call_original
InternalId.transaction { subject } InternalId.transaction { subject }
end
end end
end end
end
describe '.reset' do describe '.reset' do
subject { described_class.reset(issue, scope, usage, value) } subject { described_class.reset(issue, scope, usage, value) }
context 'in the absence of a record' do context 'in the absence of a record' do
let(:value) { 2 } let(:value) { 2 }
it 'does not revert back the value' do it 'does not revert back the value' do
expect { subject }.not_to change { described_class.count } expect { subject }.not_to change { described_class.count }
expect(subject).to be_falsey expect(subject).to be_falsey
end
end end
end
context 'when valid iid is used to reset' do context 'when valid iid is used to reset' do
let!(:value) { generate_next } let!(:value) { generate_next }
context 'and iid is a latest one' do context 'and iid is a latest one' do
it 'does rewind and next generated value is the same' do it 'does rewind and next generated value is the same' do
expect(subject).to be_truthy expect(subject).to be_truthy
expect(generate_next).to eq(value) expect(generate_next).to eq(value)
end
end end
end
context 'and iid is not a latest one' do context 'and iid is not a latest one' do
it 'does not rewind' do it 'does not rewind' do
generate_next generate_next
expect(subject).to be_falsey expect(subject).to be_falsey
expect(generate_next).to be > value expect(generate_next).to be > value
end
end end
end
def generate_next def generate_next
described_class.generate_next(issue, scope, usage, init) described_class.generate_next(issue, scope, usage, init)
end
end end
end
context 'when executed outside of transaction' do context 'when executed outside of transaction' do
let(:value) { 2 } let(:value) { 2 }
it 'increments counter with in_transaction: "false"' do it 'increments counter with in_transaction: "false"' do
allow(ActiveRecord::Base.connection).to receive(:transaction_open?) { false } allow(ActiveRecord::Base.connection).to receive(:transaction_open?) { false }
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment) expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :reset, usage: 'issues', in_transaction: 'false').and_call_original .with(operation: :reset, usage: 'issues', in_transaction: 'false').and_call_original
subject subject
end
end end
end
context 'when executed within transaction' do context 'when executed within transaction' do
let(:value) { 2 } let(:value) { 2 }
it 'increments counter with in_transaction: "true"' do it 'increments counter with in_transaction: "true"' do
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment) expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :reset, usage: 'issues', in_transaction: 'true').and_call_original .with(operation: :reset, usage: 'issues', in_transaction: 'true').and_call_original
InternalId.transaction { subject } InternalId.transaction { subject }
end
end end
end end
end
describe '.track_greatest' do describe '.track_greatest' do
let(:value) { 9001 } let(:value) { 9001 }
subject { described_class.track_greatest(id_subject, scope, usage, value, init) } subject { described_class.track_greatest(id_subject, scope, usage, value, init) }
context 'in the absence of a record' do context 'in the absence of a record' do
it 'creates a record if not yet present' do it 'creates a record if not yet present' do
expect { subject }.to change { described_class.count }.from(0).to(1) expect { subject }.to change { described_class.count }.from(0).to(1)
end
end end
end
it 'stores record attributes' do it 'stores record attributes' do
subject subject
described_class.first.tap do |record| described_class.first.tap do |record|
expect(record.project).to eq(project) expect(record.project).to eq(project)
expect(record.usage).to eq(usage.to_s) expect(record.usage).to eq(usage.to_s)
expect(record.last_value).to eq(value) expect(record.last_value).to eq(value)
end
end end
end
context 'with existing issues' do context 'with existing issues' do
before do before do
create(:issue, project: project) create(:issue, project: project)
described_class.delete_all described_class.delete_all
end end
it 'still returns the last value to that of the given value' do it 'still returns the last value to that of the given value' do
expect(subject).to eq(value) expect(subject).to eq(value)
end
end end
end
context 'when value is less than the current last_value' do context 'when value is less than the current last_value' do
it 'returns the current last_value' do it 'returns the current last_value' do
described_class.create!(**scope, usage: usage, last_value: 10_001) described_class.create!(**scope, usage: usage, last_value: 10_001)
expect(subject).to eq 10_001 expect(subject).to eq 10_001
end
end end
end
context 'there are no instances to pass in' do context 'there are no instances to pass in' do
let(:id_subject) { Issue } let(:id_subject) { Issue }
it 'accepts classes instead' do it 'accepts classes instead' do
expect(subject).to eq(value) expect(subject).to eq(value)
end
end end
end
context 'when executed outside of transaction' do context 'when executed outside of transaction' do
it 'increments counter with in_transaction: "false"' do it 'increments counter with in_transaction: "false"' do
allow(ActiveRecord::Base.connection).to receive(:transaction_open?) { false } allow(ActiveRecord::Base.connection).to receive(:transaction_open?) { false }
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment) expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :track_greatest, usage: 'issues', in_transaction: 'false').and_call_original .with(operation: :track_greatest, usage: 'issues', in_transaction: 'false').and_call_original
subject subject
end
end end
end
context 'when executed within transaction' do context 'when executed within transaction' do
it 'increments counter with in_transaction: "true"' do it 'increments counter with in_transaction: "true"' do
expect(InternalId::InternalIdGenerator.internal_id_transactions_total).to receive(:increment) expect(InternalId.internal_id_transactions_total).to receive(:increment)
.with(operation: :track_greatest, usage: 'issues', in_transaction: 'true').and_call_original .with(operation: :track_greatest, usage: 'issues', in_transaction: 'true').and_call_original
InternalId.transaction { subject } InternalId.transaction { subject }
end
end end
end end
end end
context 'when the feature flag is disabled' do
stub_feature_flags(generate_iids_without_explicit_locking: false)
it_behaves_like 'a monotonically increasing id generator'
end
context 'when the feature flag is enabled' do
stub_feature_flags(generate_iids_without_explicit_locking: true)
it_behaves_like 'a monotonically increasing id generator'
end
describe '#increment_and_save!' do describe '#increment_and_save!' do
let(:id) { create(:internal_id) } let(:id) { create(:internal_id) }
......
...@@ -165,9 +165,9 @@ RSpec.shared_examples 'AtomicInternalId' do |validate_presence: true| ...@@ -165,9 +165,9 @@ RSpec.shared_examples 'AtomicInternalId' do |validate_presence: true|
3.times { supply.next_value } 3.times { supply.next_value }
end end
current_value = described_class.public_send(method_name, scope_value, &:current_value) described_class.public_send(method_name, scope_value) do |supply|
expect(supply.next_value).to eq(iid + 4)
expect(current_value).to eq(iid + 3) end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment