Commit 8834fda8 authored by Alexandru Croitor's avatar Alexandru Croitor

Rebalance issues relative position without transaction

Rebalancing issues in a long running transaction with a lock
retry generates subtransactions that lead to overall DB
performance degradations.

see https://gitlab.com/gitlab-org/gitlab/-/issues/338346

So we are moving issues rebalancing out of one single big
transaction by locking the repositioning within a project or
namespace during the rebalance.

Rebalancing is a long running job, it can take multiple hours
to finish a rebalance of a namespace with large number of issues.
This change introduces a simple mechanism of resuming rebalance
from a checkpoint.

- A limited number of rebalancing jobs are allowed to run,
5 at this point
- Before starting a rebalance the number of running rebalances
is checked.
- If the limit of running rebalances is met we store the first
project id from the list of projects to be rebalanced, use that
to determine how many rebalances are running
- We load all namespace issue ids into a redis sorted set, by
using current issue relative position as a score. We do that so
that we do not have to run a very slow SQL query that would require
otherwise ordering and we are able to load the issue ids in batches
and get the sorting for free from redis. This also allows us to
pick up issue loading in case of a failure from the project we
read last time
- Because we are no longer in a DB transaction and we want to
preserve the relative position of the issues after the rebalance
we need to disable issue repositioning in the namespace
while rebalancing.
- Once all the issue ids are loaded the positions are being
updated in batches by reading the issues in a sorted order and
computing the new positions based on number of issues in the
namespace, distributed equally.
- Every successfull update stores a checkpoint from which the
next update can be picked-up in case of a failure
- Updates are retried on failure and batch sizes are dynamically
downsized and retried down to a limit of 5 issues per batch.
- All cache keys are set to expire in 10 days from last interaction
to leave enough time for the job to be picked up and also cleanup
any unused keys after given grace period.

Changelog: changed
parent fe5062f2
...@@ -323,6 +323,13 @@ class Issue < ApplicationRecord ...@@ -323,6 +323,13 @@ class Issue < ApplicationRecord
) )
end end
def self.column_order_id_asc
Gitlab::Pagination::Keyset::ColumnOrderDefinition.new(
attribute_name: 'id',
order_expression: arel_table[:id].asc
)
end
def self.to_branch_name(*args) def self.to_branch_name(*args)
branch_name = args.map(&:to_s).each_with_index.map do |arg, i| branch_name = args.map(&:to_s).each_with_index.map do |arg, i|
arg.parameterize(preserve_case: i == 0).presence arg.parameterize(preserve_case: i == 0).presence
......
...@@ -526,6 +526,7 @@ class Project < ApplicationRecord ...@@ -526,6 +526,7 @@ class Project < ApplicationRecord
scope :sorted_by_stars_desc, -> { reorder(self.arel_table['star_count'].desc) } scope :sorted_by_stars_desc, -> { reorder(self.arel_table['star_count'].desc) }
scope :sorted_by_stars_asc, -> { reorder(self.arel_table['star_count'].asc) } scope :sorted_by_stars_asc, -> { reorder(self.arel_table['star_count'].asc) }
# Sometimes queries (e.g. using CTEs) require explicit disambiguation with table name # Sometimes queries (e.g. using CTEs) require explicit disambiguation with table name
scope :projects_order_id_asc, -> { reorder(self.arel_table['id'].asc) }
scope :projects_order_id_desc, -> { reorder(self.arel_table['id'].desc) } scope :projects_order_id_desc, -> { reorder(self.arel_table['id'].desc) }
scope :sorted_by_similarity_desc, -> (search, include_in_select: false) do scope :sorted_by_similarity_desc, -> (search, include_in_select: false) do
......
# frozen_string_literal: true
class IssueRebalancingService
MAX_ISSUE_COUNT = 10_000
BATCH_SIZE = 100
SMALLEST_BATCH_SIZE = 5
RETRIES_LIMIT = 3
TooManyIssues = Class.new(StandardError)
TIMING_CONFIGURATION = [
[0.1.seconds, 0.05.seconds], # short timings, lock_timeout: 100ms, sleep after LockWaitTimeout: 50ms
[0.5.seconds, 0.05.seconds],
[1.second, 0.5.seconds],
[1.second, 0.5.seconds],
[5.seconds, 1.second]
].freeze
def initialize(projects_collection)
@root_namespace = projects_collection.take.root_namespace # rubocop:disable CodeReuse/ActiveRecord
@base = Issue.in_projects(projects_collection)
end
def execute
return unless Feature.enabled?(:rebalance_issues, root_namespace)
raise TooManyIssues, "#{issue_count} issues" if issue_count > MAX_ISSUE_COUNT
start = RelativePositioning::START_POSITION - (gaps / 2) * gap_size
if Feature.enabled?(:issue_rebalancing_optimization)
Issue.transaction do
assign_positions(start, indexed_ids)
.sort_by(&:first)
.each_slice(BATCH_SIZE) do |pairs_with_position|
if Feature.enabled?(:issue_rebalancing_with_retry)
update_positions_with_retry(pairs_with_position, 'rebalance issue positions in batches ordered by id')
else
update_positions(pairs_with_position, 'rebalance issue positions in batches ordered by id')
end
end
end
else
Issue.transaction do
indexed_ids.each_slice(BATCH_SIZE) do |pairs|
pairs_with_position = assign_positions(start, pairs)
if Feature.enabled?(:issue_rebalancing_with_retry)
update_positions_with_retry(pairs_with_position, 'rebalance issue positions')
else
update_positions(pairs_with_position, 'rebalance issue positions')
end
end
end
end
end
private
attr_reader :root_namespace, :base
# rubocop: disable CodeReuse/ActiveRecord
def indexed_ids
base.reorder(:relative_position, :id).pluck(:id).each_with_index
end
# rubocop: enable CodeReuse/ActiveRecord
def assign_positions(start, pairs)
pairs.map do |id, index|
[id, start + (index * gap_size)]
end
end
def update_positions_with_retry(pairs_with_position, query_name)
retries = 0
batch_size = pairs_with_position.size
until pairs_with_position.empty?
begin
update_positions(pairs_with_position.first(batch_size), query_name)
pairs_with_position = pairs_with_position.drop(batch_size)
retries = 0
rescue ActiveRecord::StatementTimeout, ActiveRecord::QueryCanceled => ex
raise ex if batch_size < SMALLEST_BATCH_SIZE
if (retries += 1) == RETRIES_LIMIT
# shrink the batch size in half when RETRIES limit is reached and update still fails perhaps because batch size is still too big
batch_size = (batch_size / 2).to_i
retries = 0
end
retry
end
end
end
def update_positions(pairs_with_position, query_name)
values = pairs_with_position.map do |id, index|
"(#{id}, #{index})"
end.join(', ')
Gitlab::Database::WithLockRetries.new(timing_configuration: TIMING_CONFIGURATION, klass: self.class).run do
run_update_query(values, query_name)
end
end
def run_update_query(values, query_name)
Issue.connection.exec_query(<<~SQL, query_name)
WITH cte(cte_id, new_pos) AS #{Gitlab::Database::AsWithMaterialized.materialized_if_supported} (
SELECT *
FROM (VALUES #{values}) as t (id, pos)
)
UPDATE #{Issue.table_name}
SET relative_position = cte.new_pos
FROM cte
WHERE cte_id = id
SQL
end
def issue_count
@issue_count ||= base.count
end
def gaps
issue_count - 1
end
def gap_size
# We could try to split the available range over the number of gaps we need,
# but IDEAL_DISTANCE * MAX_ISSUE_COUNT is only 0.1% of the available range,
# so we are guaranteed not to exhaust it by using this static value.
#
# If we raise MAX_ISSUE_COUNT or IDEAL_DISTANCE significantly, this may
# change!
RelativePositioning::IDEAL_DISTANCE
end
end
# frozen_string_literal: true
module Issues
class RelativePositionRebalancingService
UPDATE_BATCH_SIZE = 100
PREFETCH_ISSUES_BATCH_SIZE = 10_000
SMALLEST_BATCH_SIZE = 5
RETRIES_LIMIT = 3
TooManyConcurrentRebalances = Class.new(StandardError)
def initialize(projects)
@projects_collection = (projects.is_a?(Array) ? Project.id_in(projects) : projects).projects_order_id_asc
@root_namespace = @projects_collection.take.root_namespace # rubocop:disable CodeReuse/ActiveRecord
@caching = ::Gitlab::Issues::Rebalancing::State.new(@root_namespace, @projects_collection)
end
def execute
return unless Feature.enabled?(:rebalance_issues, root_namespace)
# Given can_start_rebalance? and track_new_running_rebalance are not atomic
# it can happen that we end up with more than Rebalancing::State::MAX_NUMBER_OF_CONCURRENT_REBALANCES running.
# Considering the number of allowed Rebalancing::State::MAX_NUMBER_OF_CONCURRENT_REBALANCES is small we should be ok,
# but should be something to consider if we'd want to scale this up.
error_message = "#{caching.concurrent_running_rebalances_count} concurrent re-balances currently running"
raise TooManyConcurrentRebalances, error_message unless caching.can_start_rebalance?
block_issue_repositioning! unless root_namespace.issue_repositioning_disabled?
caching.track_new_running_rebalance
index = caching.get_current_index
loop do
issue_ids = get_issue_ids(index, PREFETCH_ISSUES_BATCH_SIZE)
pairs_with_index = assign_indexes(issue_ids, index)
pairs_with_index.each_slice(UPDATE_BATCH_SIZE) do |pairs_batch|
update_positions_with_retry(pairs_batch, 're-balance issue positions in batches ordered by position')
end
index = caching.get_current_index
break if index >= caching.issue_count - 1
end
caching.cleanup_cache
unblock_issue_repositioning!
end
private
attr_reader :root_namespace, :projects_collection, :caching
def block_issue_repositioning!
Feature.enable(:block_issue_repositioning, root_namespace)
end
def unblock_issue_repositioning!
Feature.disable(:block_issue_repositioning, root_namespace)
end
def get_issue_ids(index, limit)
issue_ids = caching.get_cached_issue_ids(index, limit)
# if we have a list of cached issues and no current project id cached,
# then we successfully cached issues for all projects
return issue_ids if issue_ids.any? && caching.get_current_project_id.blank?
# if we got no issue ids at the start of re-balancing then we did not cache any issue ids yet
preload_issue_ids
caching.get_cached_issue_ids(index, limit)
end
# rubocop: disable CodeReuse/ActiveRecord
def preload_issue_ids
index = 0
cached_project_id = caching.get_current_project_id
collection = projects_collection
collection = projects_collection.where(Project.arel_table[:id].gteq(cached_project_id.to_i)) if cached_project_id.present?
collection.each do |project|
caching.cache_current_project_id(project.id)
index += 1
scope = Issue.in_projects(project).reorder(custom_reorder).select(:id, :relative_position)
with_retry(PREFETCH_ISSUES_BATCH_SIZE, 100) do |batch_size|
Gitlab::Pagination::Keyset::Iterator.new(scope: scope).each_batch(of: batch_size) do |batch|
caching.cache_issue_ids(batch)
end
end
end
caching.remove_current_project_id_cache
end
# rubocop: enable CodeReuse/ActiveRecord
def assign_indexes(ids, start_index)
ids.each_with_index.map do |id, idx|
[id, start_index + idx]
end
end
# The method runs in a loop where we try for RETRIES_LIMIT=3 times, to run the update statement on
# a number of records(batch size). Method gets an array of (id, value) pairs as argument that is used
# to build the update query matching by id and updating relative_position = value. If we get a statement
# timeout, we split the batch size in half and try(for 3 times again) to batch update on a smaller number of records.
# On success, because we know the batch size and we always pick from the beginning of the array param,
# we can remove first batch_size number of items from array and continue with the successful batch_size for next batches.
# On failures we continue to split batch size to a SMALLEST_BATCH_SIZE limit, which is now set at 5.
#
# e.g.
# 0. items | previous batch size|new batch size | comment
# 1. 100 | 100 | 100 | 3 failures -> split the batch size in half
# 2. 100 | 100 | 50 | 3 failures -> split the batch size in half again
# 3. 100 | 50 | 25 | 3 succeed -> so we drop 25 items 3 times, 4th fails -> split the batch size in half again
# 5. 25 | 25 | 12 | 3 failures -> split the batch size in half
# 6. 25 | 12 | 6 | 3 failures -> we exit because smallest batch size is 5 and we'll be at 3 if we split again
def update_positions_with_retry(pairs_with_index, query_name)
retry_batch_size = pairs_with_index.size
until pairs_with_index.empty?
with_retry(retry_batch_size, SMALLEST_BATCH_SIZE) do |batch_size|
retry_batch_size = batch_size
update_positions(pairs_with_index.first(batch_size), query_name)
# pairs_with_index[batch_size - 1] - can be nil for last batch
# if last batch is smaller than batch_size, so we just get the last pair.
last_pair_in_batch = pairs_with_index[batch_size - 1] || pairs_with_index.last
caching.cache_current_index(last_pair_in_batch.last + 1)
pairs_with_index = pairs_with_index.drop(batch_size)
end
end
end
def update_positions(pairs_with_position, query_name)
values = pairs_with_position.map do |id, index|
"(#{id}, #{start_position + (index * gap_size)})"
end.join(', ')
run_update_query(values, query_name)
end
def run_update_query(values, query_name)
Issue.connection.exec_query(<<~SQL, query_name)
WITH cte(cte_id, new_pos) AS #{Gitlab::Database::AsWithMaterialized.materialized_if_supported} (
SELECT *
FROM (VALUES #{values}) as t (id, pos)
)
UPDATE #{Issue.table_name}
SET relative_position = cte.new_pos
FROM cte
WHERE cte_id = id
SQL
end
def gaps
caching.issue_count - 1
end
def gap_size
RelativePositioning::MAX_GAP
end
def start_position
@start_position ||= (RelativePositioning::START_POSITION - (gaps / 2) * gap_size).to_i
end
def custom_reorder
::Gitlab::Pagination::Keyset::Order.build([Issue.column_order_relative_position, Issue.column_order_id_asc])
end
def with_retry(initial_batch_size, exit_batch_size)
retries = 0
batch_size = initial_batch_size
begin
yield batch_size
retries = 0
rescue ActiveRecord::StatementTimeout, ActiveRecord::QueryCanceled => ex
raise ex if batch_size < exit_batch_size
if (retries += 1) == RETRIES_LIMIT
# shrink the batch size in half when RETRIES limit is reached and update still fails perhaps because batch size is still too big
batch_size = (batch_size / 2).to_i
retries = 0
end
retry
end
end
end
end
...@@ -32,12 +32,8 @@ class IssueRebalancingWorker ...@@ -32,12 +32,8 @@ class IssueRebalancingWorker
return return
end end
# Temporary disable rebalancing for performance reasons Issues::RelativePositionRebalancingService.new(projects_to_rebalance).execute
# For more information check https://gitlab.com/gitlab-com/gl-infra/production/-/issues/4321 rescue Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances => e
return if projects_to_rebalance.take&.root_namespace&.issue_repositioning_disabled? # rubocop:disable CodeReuse/ActiveRecord
IssueRebalancingService.new(projects_to_rebalance).execute
rescue IssueRebalancingService::TooManyIssues => e
Gitlab::ErrorTracking.log_exception(e, root_namespace_id: root_namespace_id, project_id: project_id) Gitlab::ErrorTracking.log_exception(e, root_namespace_id: root_namespace_id, project_id: project_id)
end end
......
---
name: issue_rebalancing_optimization
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/53384
rollout_issue_url:
milestone: '13.9'
type: development
group: group::project management
default_enabled: false
---
name: issue_rebalancing_with_retry
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/59744
rollout_issue_url:
milestone: '13.11'
type: development
group: group::project management
default_enabled: false
...@@ -418,7 +418,7 @@ p.create_wiki ### creates the wiki project on the filesystem ...@@ -418,7 +418,7 @@ p.create_wiki ### creates the wiki project on the filesystem
```ruby ```ruby
p = Project.find_by_full_path('PROJECT PATH') p = Project.find_by_full_path('PROJECT PATH')
IssueRebalancingService.new(p.issues.take).execute Issues::RelativePositionRebalancingService.new(p.root_namespace.all_projects).execute
``` ```
## Imports / Exports ## Imports / Exports
......
# frozen_string_literal: true
module Gitlab
module Issues
module Rebalancing
class State
REDIS_EXPIRY_TIME = 10.days
MAX_NUMBER_OF_CONCURRENT_REBALANCES = 5
NAMESPACE = 1
PROJECT = 2
def initialize(root_namespace, projects)
@root_namespace = root_namespace
@projects = projects
@rebalanced_container_type = @root_namespace.is_a?(Group) ? NAMESPACE : PROJECT
@rebalanced_container_id = @rebalanced_container_type == NAMESPACE ? @root_namespace.id : projects.take.id # rubocop:disable CodeReuse/ActiveRecord
end
def track_new_running_rebalance
with_redis do |redis|
redis.multi do |multi|
# we trigger re-balance for namespaces(groups) or specific user project
value = "#{rebalanced_container_type}/#{rebalanced_container_id}"
multi.sadd(concurrent_running_rebalances_key, value)
multi.expire(concurrent_running_rebalances_key, REDIS_EXPIRY_TIME)
end
end
end
def concurrent_running_rebalances_count
with_redis { |redis| redis.scard(concurrent_running_rebalances_key).to_i }
end
def rebalance_in_progress?
all_rebalanced_containers = with_redis { |redis| redis.smembers(concurrent_running_rebalances_key) }
is_running = case rebalanced_container_type
when NAMESPACE
namespace_ids = all_rebalanced_containers.map {|string| string.split("#{NAMESPACE}/").second.to_i }.compact
namespace_ids.include?(root_namespace.id)
when PROJECT
project_ids = all_rebalanced_containers.map {|string| string.split("#{PROJECT}/").second.to_i }.compact
project_ids.include?(projects.take.id) # rubocop:disable CodeReuse/ActiveRecord
else
false
end
refresh_keys_expiration if is_running
is_running
end
def can_start_rebalance?
rebalance_in_progress? || too_many_rebalances_running?
end
def cache_issue_ids(issue_ids)
with_redis do |redis|
values = issue_ids.map { |issue| [issue.relative_position, issue.id] }
redis.multi do |multi|
multi.zadd(issue_ids_key, values) unless values.blank?
multi.expire(issue_ids_key, REDIS_EXPIRY_TIME)
end
end
end
def get_cached_issue_ids(index, limit)
with_redis do |redis|
redis.zrange(issue_ids_key, index, index + limit - 1)
end
end
def cache_current_index(index)
with_redis { |redis| redis.set(current_index_key, index, ex: REDIS_EXPIRY_TIME) }
end
def get_current_index
with_redis { |redis| redis.get(current_index_key).to_i }
end
def cache_current_project_id(project_id)
with_redis { |redis| redis.set(current_project_key, project_id, ex: REDIS_EXPIRY_TIME) }
end
def get_current_project_id
with_redis { |redis| redis.get(current_project_key) }
end
def issue_count
@issue_count ||= with_redis { |redis| redis.zcard(issue_ids_key)}
end
def remove_current_project_id_cache
with_redis { |redis| redis.del(current_project_key)}
end
def refresh_keys_expiration
with_redis do |redis|
redis.multi do |multi|
multi.expire(issue_ids_key, REDIS_EXPIRY_TIME)
multi.expire(current_index_key, REDIS_EXPIRY_TIME)
multi.expire(current_project_key, REDIS_EXPIRY_TIME)
multi.expire(concurrent_running_rebalances_key, REDIS_EXPIRY_TIME)
end
end
end
def cleanup_cache
with_redis do |redis|
redis.multi do |multi|
multi.del(issue_ids_key)
multi.del(current_index_key)
multi.del(current_project_key)
multi.srem(concurrent_running_rebalances_key, "#{rebalanced_container_type}/#{rebalanced_container_id}")
end
end
end
private
attr_accessor :root_namespace, :projects, :rebalanced_container_type, :rebalanced_container_id
def too_many_rebalances_running?
concurrent_running_rebalances_count <= MAX_NUMBER_OF_CONCURRENT_REBALANCES
end
def redis_key_prefix
"gitlab:issues-position-rebalances"
end
def issue_ids_key
"#{redis_key_prefix}:#{root_namespace.id}"
end
def current_index_key
"#{issue_ids_key}:current_index"
end
def current_project_key
"#{issue_ids_key}:current_project_id"
end
def concurrent_running_rebalances_key
"#{redis_key_prefix}:running_rebalances"
end
def with_redis(&blk)
Gitlab::Redis::SharedState.with(&blk) # rubocop: disable CodeReuse/ActiveRecord
end
end
end
end
end
...@@ -219,7 +219,7 @@ module Gitlab ...@@ -219,7 +219,7 @@ module Gitlab
column_definition.column_expression.dup.as(column_definition.attribute_name).to_sql column_definition.column_expression.dup.as(column_definition.attribute_name).to_sql
end end
scope = scope.select(*scope.arel.projections, *additional_projections) if additional_projections scope = scope.reselect(*scope.arel.projections, *additional_projections) unless additional_projections.blank?
scope scope
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_state do
shared_examples 'issues rebalance caching' do
describe '#track_new_running_rebalance' do
it 'caches a project id to track caching in progress' do
expect { rebalance_caching.track_new_running_rebalance }.to change { rebalance_caching.concurrent_running_rebalances_count }.from(0).to(1)
end
end
describe '#set and get current_index' do
it 'returns zero as current index when index not cached' do
expect(rebalance_caching.get_current_index).to eq(0)
end
it 'returns cached current index' do
expect { rebalance_caching.cache_current_index(123) }.to change { rebalance_caching.get_current_index }.from(0).to(123)
end
end
describe '#set and get current_project' do
it 'returns nil if there is no project_id cached' do
expect(rebalance_caching.get_current_project_id).to be_nil
end
it 'returns cached current project_id' do
expect { rebalance_caching.cache_current_project_id(456) }.to change { rebalance_caching.get_current_project_id }.from(nil).to('456')
end
end
describe "#rebalance_in_progress?" do
it 'return zero if no re-balances are running' do
expect(rebalance_caching.concurrent_running_rebalances_count).to eq(0)
end
it 'return false if no re-balances are running' do
expect(rebalance_caching.rebalance_in_progress?).to be false
end
it 'return true a re-balance for given project/namespace is running' do
rebalance_caching.track_new_running_rebalance
expect(rebalance_caching.rebalance_in_progress?).to be true
end
end
context 'caching issue ids' do
context 'with no issue ids cached' do
it 'returns zero when there are no cached issue ids' do
expect(rebalance_caching.issue_count).to eq(0)
end
it 'returns empty array when there are no cached issue ids' do
expect(rebalance_caching.get_cached_issue_ids(0, 100)).to eq([])
end
end
context 'with cached issue ids' do
before do
generate_and_cache_issues_ids(count: 3)
end
it 'returns count of cached issue ids' do
expect(rebalance_caching.issue_count).to eq(3)
end
it 'returns array of issue ids' do
expect(rebalance_caching.get_cached_issue_ids(0, 100)).to eq(%w(1 2 3))
end
it 'limits returned values' do
expect(rebalance_caching.get_cached_issue_ids(0, 2)).to eq(%w(1 2))
end
context 'when caching duplicate issue_ids' do
before do
generate_and_cache_issues_ids(count: 3, position_offset: 3, position_direction: -1)
end
it 'does not cache duplicate issues' do
expect(rebalance_caching.issue_count).to eq(3)
end
it 'returns cached issues with latest scores' do
expect(rebalance_caching.get_cached_issue_ids(0, 100)).to eq(%w(3 2 1))
end
end
end
end
context 'when setting expiration' do
context 'when tracking new rebalance' do
it 'returns as expired for non existent key' do
::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:concurrent_running_rebalances_key))).to be < 0
end
end
it 'has expiration set' do
rebalance_caching.track_new_running_rebalance
::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:concurrent_running_rebalances_key))).to be_between(0, described_class::REDIS_EXPIRY_TIME.ago.to_i)
end
end
end
context 'when setting current index' do
it 'returns as expiring for non existent key' do
::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:current_index_key))).to be < 0
end
end
it 'has expiration set' do
rebalance_caching.cache_current_index(123)
::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:current_index_key))).to be_between(0, described_class::REDIS_EXPIRY_TIME.ago.to_i)
end
end
end
context 'when setting current project id' do
it 'returns as expired for non existent key' do
::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:current_project_key))).to be < 0
end
end
it 'has expiration set' do
rebalance_caching.cache_current_project_id(456)
::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:current_project_key))).to be_between(0, described_class::REDIS_EXPIRY_TIME.ago.to_i)
end
end
end
context 'when setting cached issue ids' do
it 'returns as expired for non existent key' do
::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:issue_ids_key))).to be < 0
end
end
it 'has expiration set' do
generate_and_cache_issues_ids(count: 3)
::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:issue_ids_key))).to be_between(0, described_class::REDIS_EXPIRY_TIME.ago.to_i)
end
end
end
end
context 'cleanup cache' do
before do
generate_and_cache_issues_ids(count: 3)
rebalance_caching.cache_current_index(123)
rebalance_caching.cache_current_project_id(456)
rebalance_caching.track_new_running_rebalance
end
it 'removes cache keys' do
expect(check_existing_keys).to eq(4)
rebalance_caching.cleanup_cache
expect(check_existing_keys).to eq(0)
end
end
end
context 'rebalancing issues in namespace' do
let_it_be(:group) { create(:group, :private) }
let_it_be(:project) { create(:project, namespace: group) }
subject(:rebalance_caching) { described_class.new(group, group.projects) }
it { expect(rebalance_caching.send(:rebalanced_container_type)).to eq(described_class::NAMESPACE) }
it_behaves_like 'issues rebalance caching'
end
context 'rebalancing issues in a project' do
let_it_be(:project) { create(:project) }
subject(:rebalance_caching) { described_class.new(project.namespace, Project.where(id: project)) }
it { expect(rebalance_caching.send(:rebalanced_container_type)).to eq(described_class::PROJECT) }
it_behaves_like 'issues rebalance caching'
end
# count - how many issue ids to generate, issue ids will start at 1
# position_offset - if you'd want to offset generated relative_position for the issue ids,
# relative_position is generated as = issue id * 10 + position_offset
# position_direction - (1) for positive relative_positions, (-1) for negative relative_positions
def generate_and_cache_issues_ids(count:, position_offset: 0, position_direction: 1)
issues = []
count.times do |idx|
id = idx + 1
issues << double(relative_position: position_direction * (id * 10 + position_offset), id: id)
end
rebalance_caching.cache_issue_ids(issues)
end
def check_existing_keys
index = 0
index += 1 if rebalance_caching.get_current_index > 0
index += 1 if rebalance_caching.get_current_project_id.present?
index += 1 if rebalance_caching.get_cached_issue_ids(0, 100).present?
index += 1 if rebalance_caching.rebalance_in_progress?
index
end
end
...@@ -538,6 +538,47 @@ RSpec.describe Gitlab::Pagination::Keyset::Order do ...@@ -538,6 +538,47 @@ RSpec.describe Gitlab::Pagination::Keyset::Order do
end end
it_behaves_like 'cursor attribute examples' it_behaves_like 'cursor attribute examples'
context 'with projections' do
context 'when additional_projections is empty' do
let(:scope) { Project.select(:id, :namespace_id) }
subject(:sql) { order.apply_cursor_conditions(scope, { id: '100' }).to_sql }
it 'has correct projections' do
is_expected.to include('SELECT "projects"."id", "projects"."namespace_id" FROM "projects"')
end
end
context 'when there are additional_projections' do
let(:order) do
order = Gitlab::Pagination::Keyset::Order.build([
Gitlab::Pagination::Keyset::ColumnOrderDefinition.new(
attribute_name: 'created_at_field',
column_expression: Project.arel_table[:created_at],
order_expression: Project.arel_table[:created_at].desc,
order_direction: :desc,
distinct: false,
add_to_projections: true
),
Gitlab::Pagination::Keyset::ColumnOrderDefinition.new(
attribute_name: 'id',
order_expression: Project.arel_table[:id].desc
)
])
order
end
let(:scope) { Project.select(:id, :namespace_id).reorder(order) }
subject(:sql) { order.apply_cursor_conditions(scope).to_sql }
it 'has correct projections' do
is_expected.to include('SELECT "projects"."id", "projects"."namespace_id", "projects"."created_at" AS created_at_field FROM "projects"')
end
end
end
end end
end end
end end
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe IssueRebalancingService do RSpec.describe Issues::RelativePositionRebalancingService, :clean_gitlab_redis_shared_state do
let_it_be(:project, reload: true) { create(:project) } let_it_be(:project, reload: true) { create(:project) }
let_it_be(:user) { project.creator } let_it_be(:user) { project.creator }
let_it_be(:start) { RelativePositioning::START_POSITION } let_it_be(:start) { RelativePositioning::START_POSITION }
...@@ -36,10 +36,11 @@ RSpec.describe IssueRebalancingService do ...@@ -36,10 +36,11 @@ RSpec.describe IssueRebalancingService do
project.reload.issues.reorder(relative_position: :asc).to_a project.reload.issues.reorder(relative_position: :asc).to_a
end end
shared_examples 'IssueRebalancingService shared examples' do subject(:service) { described_class.new(Project.id_in(project)) }
it 'rebalances a set of issues with clumps at the end and start' do
context 'execute' do
it 're-balances a set of issues with clumps at the end and start' do
all_issues = start_clump + unclumped + end_clump.reverse all_issues = start_clump + unclumped + end_clump.reverse
service = described_class.new(Project.id_in([project.id]))
expect { service.execute }.not_to change { issues_in_position_order.map(&:id) } expect { service.execute }.not_to change { issues_in_position_order.map(&:id) }
...@@ -52,11 +53,10 @@ RSpec.describe IssueRebalancingService do ...@@ -52,11 +53,10 @@ RSpec.describe IssueRebalancingService do
expect(gaps).to all(be > RelativePositioning::MIN_GAP) expect(gaps).to all(be > RelativePositioning::MIN_GAP)
expect(all_issues.first.relative_position).to be > (RelativePositioning::MIN_POSITION * 0.9999) expect(all_issues.first.relative_position).to be > (RelativePositioning::MIN_POSITION * 0.9999)
expect(all_issues.last.relative_position).to be < (RelativePositioning::MAX_POSITION * 0.9999) expect(all_issues.last.relative_position).to be < (RelativePositioning::MAX_POSITION * 0.9999)
expect(project.root_namespace.issue_repositioning_disabled?).to be false
end end
it 'is idempotent' do it 'is idempotent' do
service = described_class.new(Project.id_in(project))
expect do expect do
service.execute service.execute
service.execute service.execute
...@@ -70,9 +70,9 @@ RSpec.describe IssueRebalancingService do ...@@ -70,9 +70,9 @@ RSpec.describe IssueRebalancingService do
issue.project.group issue.project.group
old_pos = issue.relative_position old_pos = issue.relative_position
service = described_class.new(Project.id_in(project)) # fetching root namespace in the initializer triggers 2 queries:
# for fetching a random project from collection and fetching the root namespace.
expect { service.execute }.not_to exceed_query_limit(0) expect { service.execute }.not_to exceed_query_limit(2)
expect(old_pos).to eq(issue.reload.relative_position) expect(old_pos).to eq(issue.reload.relative_position)
end end
...@@ -80,8 +80,6 @@ RSpec.describe IssueRebalancingService do ...@@ -80,8 +80,6 @@ RSpec.describe IssueRebalancingService do
issue = create(:issue, project: project, author: user, relative_position: max_pos) issue = create(:issue, project: project, author: user, relative_position: max_pos)
stub_feature_flags(rebalance_issues: project.root_namespace) stub_feature_flags(rebalance_issues: project.root_namespace)
service = described_class.new(Project.id_in(project))
expect { service.execute }.to change { issue.reload.relative_position } expect { service.execute }.to change { issue.reload.relative_position }
end end
...@@ -90,22 +88,34 @@ RSpec.describe IssueRebalancingService do ...@@ -90,22 +88,34 @@ RSpec.describe IssueRebalancingService do
project.update!(group: create(:group)) project.update!(group: create(:group))
stub_feature_flags(rebalance_issues: issue.project.group) stub_feature_flags(rebalance_issues: issue.project.group)
service = described_class.new(Project.id_in(project))
expect { service.execute }.to change { issue.reload.relative_position } expect { service.execute }.to change { issue.reload.relative_position }
end end
it 'aborts if there are too many issues' do it 'aborts if there are too many rebalances running' do
base = double(count: 10_001) caching = service.send(:caching)
allow(caching).to receive(:rebalance_in_progress?).and_return(false)
allow(caching).to receive(:concurrent_running_rebalances_count).and_return(10)
allow(service).to receive(:caching).and_return(caching)
allow(Issue).to receive(:in_projects).and_return(base) expect { service.execute }.to raise_error(Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances)
expect(project.root_namespace.issue_repositioning_disabled?).to be false
end
expect { described_class.new(Project.id_in(project)).execute }.to raise_error(described_class::TooManyIssues) it 'resumes a started rebalance even if there are already too many rebalances running' do
Gitlab::Redis::SharedState.with do |redis|
redis.sadd("gitlab:issues-position-rebalances:running_rebalances", "#{::Gitlab::Issues::Rebalancing::State::PROJECT}/#{project.id}")
redis.sadd("gitlab:issues-position-rebalances:running_rebalances", "1/100")
end end
caching = service.send(:caching)
allow(caching).to receive(:concurrent_running_rebalances_count).and_return(10)
allow(service).to receive(:caching).and_return(caching)
expect { service.execute }.not_to raise_error(Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances)
end end
shared_examples 'rebalancing is retried on statement timeout exceptions' do context 're-balancing is retried on statement timeout exceptions' do
subject { described_class.new(Project.id_in(project)) } subject { service }
it 'retries update statement' do it 'retries update statement' do
call_count = 0 call_count = 0
...@@ -137,37 +147,20 @@ RSpec.describe IssueRebalancingService do ...@@ -137,37 +147,20 @@ RSpec.describe IssueRebalancingService do
end end
end end
context 'when issue_rebalancing_optimization feature flag is on' do context 'when resuming a stopped rebalance' do
before do before do
stub_feature_flags(issue_rebalancing_optimization: true) service.send(:preload_issue_ids)
expect(service.send(:caching).get_cached_issue_ids(0, 300)).not_to be_empty
# simulate we already rebalanced half the issues
index = clump_size * 3 / 2 + 1
service.send(:caching).cache_current_index(index)
end end
it_behaves_like 'IssueRebalancingService shared examples' it 'rebalances the other half of issues' do
expect(subject).to receive(:update_positions_with_retry).exactly(5).and_call_original
context 'when issue_rebalancing_with_retry feature flag is on' do
before do
stub_feature_flags(issue_rebalancing_with_retry: true)
end
it_behaves_like 'IssueRebalancingService shared examples' subject.execute
it_behaves_like 'rebalancing is retried on statement timeout exceptions'
end
end
context 'when issue_rebalancing_optimization feature flag is off' do
before do
stub_feature_flags(issue_rebalancing_optimization: false)
end
it_behaves_like 'IssueRebalancingService shared examples'
context 'when issue_rebalancing_with_retry feature flag is on' do
before do
stub_feature_flags(issue_rebalancing_with_retry: true)
end end
it_behaves_like 'IssueRebalancingService shared examples'
it_behaves_like 'rebalancing is retried on statement timeout exceptions'
end end
end end
end end
...@@ -8,41 +8,29 @@ RSpec.describe IssueRebalancingWorker do ...@@ -8,41 +8,29 @@ RSpec.describe IssueRebalancingWorker do
let_it_be(:project) { create(:project, group: group) } let_it_be(:project) { create(:project, group: group) }
let_it_be(:issue) { create(:issue, project: project) } let_it_be(:issue) { create(:issue, project: project) }
context 'when block_issue_repositioning is enabled' do
before do
stub_feature_flags(block_issue_repositioning: group)
end
it 'does not run an instance of IssueRebalancingService' do
expect(IssueRebalancingService).not_to receive(:new)
described_class.new.perform(nil, issue.project_id)
end
end
shared_examples 'running the worker' do shared_examples 'running the worker' do
it 'runs an instance of IssueRebalancingService' do it 'runs an instance of Issues::RelativePositionRebalancingService' do
service = double(execute: nil) service = double(execute: nil)
service_param = arguments.second.present? ? kind_of(Project.id_in([project]).class) : kind_of(group&.all_projects.class) service_param = arguments.second.present? ? kind_of(Project.id_in([project]).class) : kind_of(group&.all_projects.class)
expect(IssueRebalancingService).to receive(:new).with(service_param).and_return(service) expect(Issues::RelativePositionRebalancingService).to receive(:new).with(service_param).and_return(service)
described_class.new.perform(*arguments) described_class.new.perform(*arguments)
end end
it 'anticipates there being too many issues' do it 'anticipates there being too many concurent rebalances' do
service = double service = double
service_param = arguments.second.present? ? kind_of(Project.id_in([project]).class) : kind_of(group&.all_projects.class) service_param = arguments.second.present? ? kind_of(Project.id_in([project]).class) : kind_of(group&.all_projects.class)
allow(service).to receive(:execute).and_raise(IssueRebalancingService::TooManyIssues) allow(service).to receive(:execute).and_raise(Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances)
expect(IssueRebalancingService).to receive(:new).with(service_param).and_return(service) expect(Issues::RelativePositionRebalancingService).to receive(:new).with(service_param).and_return(service)
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(IssueRebalancingService::TooManyIssues, include(project_id: arguments.second, root_namespace_id: arguments.third)) expect(Gitlab::ErrorTracking).to receive(:log_exception).with(Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances, include(project_id: arguments.second, root_namespace_id: arguments.third))
described_class.new.perform(*arguments) described_class.new.perform(*arguments)
end end
it 'takes no action if the value is nil' do it 'takes no action if the value is nil' do
expect(IssueRebalancingService).not_to receive(:new) expect(Issues::RelativePositionRebalancingService).not_to receive(:new)
expect(Gitlab::ErrorTracking).not_to receive(:log_exception) expect(Gitlab::ErrorTracking).not_to receive(:log_exception)
described_class.new.perform # all arguments are nil described_class.new.perform # all arguments are nil
...@@ -52,7 +40,7 @@ RSpec.describe IssueRebalancingWorker do ...@@ -52,7 +40,7 @@ RSpec.describe IssueRebalancingWorker do
shared_examples 'safely handles non-existent ids' do shared_examples 'safely handles non-existent ids' do
it 'anticipates the inability to find the issue' do it 'anticipates the inability to find the issue' do
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(ArgumentError, include(project_id: arguments.second, root_namespace_id: arguments.third)) expect(Gitlab::ErrorTracking).to receive(:log_exception).with(ArgumentError, include(project_id: arguments.second, root_namespace_id: arguments.third))
expect(IssueRebalancingService).not_to receive(:new) expect(Issues::RelativePositionRebalancingService).not_to receive(:new)
described_class.new.perform(*arguments) described_class.new.perform(*arguments)
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment