Commit 3b3f9be2 authored by Alexandru Croitor's avatar Alexandru Croitor

Deduplicate IssueRebalancingWorker by root namespace

Ensure that we run a single IssueRebalancingWorker per root namespace
as we rebalance issues per entire hierarchy. Also deduplicate
IssueRebalancingWorker using :until_executed strategy
parent efd9b162
...@@ -524,7 +524,7 @@ class Issue < ApplicationRecord ...@@ -524,7 +524,7 @@ class Issue < ApplicationRecord
def could_not_move(exception) def could_not_move(exception)
# Symptom of running out of space - schedule rebalancing # Symptom of running out of space - schedule rebalancing
IssueRebalancingWorker.perform_async(nil, project_id) IssueRebalancingWorker.perform_async(nil, *project.self_or_root_group_ids)
end end
end end
......
...@@ -2562,6 +2562,17 @@ class Project < ApplicationRecord ...@@ -2562,6 +2562,17 @@ class Project < ApplicationRecord
end end
end end
# Builds the `[project_id, root_group_id]` pair used to address a rebalancing
# job; at most one of the two slots is non-nil.
#
# * When the project has a group, the pair is `[nil, <root namespace id>]`
#   (the second slot is also nil if `root_namespace` returns nil).
# * Otherwise (e.g. a project in a user namespace) the pair is
#   `[<this project's id>, nil]`.
def self_or_root_group_ids
  return [nil, root_namespace&.id] if group

  [id, nil]
end
def package_already_taken?(package_name) def package_already_taken?(package_name)
namespace.root_ancestor.all_projects namespace.root_ancestor.all_projects
.joins(:packages) .joins(:packages)
......
...@@ -15,14 +15,13 @@ class IssueRebalancingService ...@@ -15,14 +15,13 @@ class IssueRebalancingService
[5.seconds, 1.second] [5.seconds, 1.second]
].freeze ].freeze
def initialize(issue) def initialize(projects_collection)
@issue = issue @root_namespace = projects_collection.take.root_namespace # rubocop:disable CodeReuse/ActiveRecord
@base = Issue.relative_positioning_query_base(issue) @base = Issue.in_projects(projects_collection)
end end
def execute def execute
gates = [issue.project, issue.project.group].compact return unless Feature.enabled?(:rebalance_issues, root_namespace)
return unless gates.any? { |gate| Feature.enabled?(:rebalance_issues, gate) }
raise TooManyIssues, "#{issue_count} issues" if issue_count > MAX_ISSUE_COUNT raise TooManyIssues, "#{issue_count} issues" if issue_count > MAX_ISSUE_COUNT
...@@ -57,7 +56,7 @@ class IssueRebalancingService ...@@ -57,7 +56,7 @@ class IssueRebalancingService
private private
attr_reader :issue, :base attr_reader :root_namespace, :base
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def indexed_ids def indexed_ids
......
...@@ -29,7 +29,7 @@ module Issues ...@@ -29,7 +29,7 @@ module Issues
gates = [issue.project, issue.project.group].compact gates = [issue.project, issue.project.group].compact
return unless gates.any? { |gate| Feature.enabled?(:rebalance_issues, gate) } return unless gates.any? { |gate| Feature.enabled?(:rebalance_issues, gate) }
IssueRebalancingWorker.perform_async(nil, issue.project_id) IssueRebalancingWorker.perform_async(nil, *issue.project.self_or_root_group_ids)
end end
private private
......
...@@ -41,7 +41,7 @@ class IssuePlacementWorker ...@@ -41,7 +41,7 @@ class IssuePlacementWorker
IssuePlacementWorker.perform_async(nil, leftover.project_id) if leftover.present? IssuePlacementWorker.perform_async(nil, leftover.project_id) if leftover.present?
rescue RelativePositioning::NoSpaceLeft => e rescue RelativePositioning::NoSpaceLeft => e
Gitlab::ErrorTracking.log_exception(e, issue_id: issue_id, project_id: project_id) Gitlab::ErrorTracking.log_exception(e, issue_id: issue_id, project_id: project_id)
IssueRebalancingWorker.perform_async(nil, project_id.presence || issue.project_id) IssueRebalancingWorker.perform_async(nil, *root_namespace_id_to_rebalance(issue, project_id))
end end
def find_issue(issue_id, project_id) def find_issue(issue_id, project_id)
...@@ -53,4 +53,11 @@ class IssuePlacementWorker ...@@ -53,4 +53,11 @@ class IssuePlacementWorker
project.issues.take project.issues.take
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
private
# Resolves the `[project_id, root_group_id]` pair for the project whose issues
# need rebalancing.
#
# The explicit `project_id` wins when present; otherwise the id is taken from
# `issue.project_id`. The pair itself comes from
# `Project#self_or_root_group_ids`.
#
# NOTE(review): if `Project.find` raises for a missing record rather than
# returning nil, the safe navigation below never sees nil - confirm intent.
def root_namespace_id_to_rebalance(issue, project_id)
  target_project_id = project_id.presence || issue.project_id
  target_project = Project.find(target_project_id)

  target_project&.self_or_root_group_ids
end
end end
...@@ -9,21 +9,44 @@ class IssueRebalancingWorker ...@@ -9,21 +9,44 @@ class IssueRebalancingWorker
urgency :low urgency :low
feature_category :issue_tracking feature_category :issue_tracking
tags :exclude_from_kubernetes tags :exclude_from_kubernetes
deduplicate :until_executed, including_scheduled: true
def perform(ignore = nil, project_id = nil, root_namespace_id = nil)
# we need to have exactly one of the project_id and root_namespace_id params be non-nil
raise ArgumentError, "Expected only one of the params project_id: #{project_id} and root_namespace_id: #{root_namespace_id}" if project_id && root_namespace_id
return if project_id.nil? && root_namespace_id.nil?
# Pull the collection of projects to be rebalanced: either the single project, if its namespace is not a group
# (i.e. a user namespace), or all projects of the root namespace. This also keeps the worker backward compatible
# with the previous version, where a project_id was passed as the param.
projects_to_rebalance = projects_collection(project_id, root_namespace_id)
# something might have happened with the namespace between scheduling the worker and actually running it,
# maybe it was removed.
if projects_to_rebalance.blank?
Gitlab::ErrorTracking.log_exception(
ArgumentError.new("Projects to be rebalanced not found for arguments: project_id #{project_id}, root_namespace_id: #{root_namespace_id}"),
{ project_id: project_id, root_namespace_id: root_namespace_id })
return
end
def perform(ignore = nil, project_id = nil) # Temporary disable rebalancing for performance reasons
return if project_id.nil? # For more information check https://gitlab.com/gitlab-com/gl-infra/production/-/issues/4321
return if projects_to_rebalance.take&.root_namespace&.issue_repositioning_disabled? # rubocop:disable CodeReuse/ActiveRecord
project = Project.find(project_id) IssueRebalancingService.new(projects_to_rebalance).execute
rescue IssueRebalancingService::TooManyIssues => e
Gitlab::ErrorTracking.log_exception(e, root_namespace_id: root_namespace_id, project_id: project_id)
end
# Temporary disable reabalancing for performance reasons private
# For more information check https://gitlab.com/gitlab-com/gl-infra/production/-/issues/4321
return if project.root_namespace&.issue_repositioning_disabled?
# All issues are equivalent as far as we are concerned def projects_collection(project_id, root_namespace_id)
issue = project.issues.take # rubocop: disable CodeReuse/ActiveRecord # we can have either project_id(older version) or project_id if project is part of a user namespace and not a group
# or root_namespace_id(newer version) never both.
return Project.id_in([project_id]) if project_id
IssueRebalancingService.new(issue).execute Namespace.find_by_id(root_namespace_id)&.all_projects
rescue ActiveRecord::RecordNotFound, IssueRebalancingService::TooManyIssues => e
Gitlab::ErrorTracking.log_exception(e, project_id: project_id)
end end
end end
...@@ -1287,18 +1287,36 @@ RSpec.describe Issue do ...@@ -1287,18 +1287,36 @@ RSpec.describe Issue do
end end
end end
let(:project) { build_stubbed(:project_empty_repo) } shared_examples 'schedules issues rebalancing' do
let(:issue) { build_stubbed(:issue, relative_position: 100, project: project) } let(:issue) { build_stubbed(:issue, relative_position: 100, project: project) }
it 'schedules rebalancing if we time-out when moving' do it 'schedules rebalancing if we time-out when moving' do
lhs = build_stubbed(:issue, relative_position: 99, project: project) lhs = build_stubbed(:issue, relative_position: 99, project: project)
to_move = build(:issue, project: project) to_move = build(:issue, project: project)
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, project.id) expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, project_id, namespace_id)
expect { to_move.move_between(lhs, issue) }.to raise_error(ActiveRecord::QueryCanceled) expect { to_move.move_between(lhs, issue) }.to raise_error(ActiveRecord::QueryCanceled)
end end
end end
# Project built without a group: the shared example is expected to see the
# project's own id in the project_id slot and nil in the namespace_id slot.
context 'when project in user namespace' do
let(:project) { build_stubbed(:project_empty_repo) }
let(:project_id) { project.id }
let(:namespace_id) { nil }
# Consumes the `project`, `project_id` and `namespace_id` definitions above.
it_behaves_like 'schedules issues rebalancing'
end
# Project built with a group: the shared example is expected to see nil in the
# project_id slot and the group's id in the namespace_id slot.
context 'when project in a group namespace' do
let(:group) { create(:group) }
let(:project) { build_stubbed(:project_empty_repo, group: group) }
let(:project_id) { nil }
let(:namespace_id) { group.id }
# Consumes the `project`, `project_id` and `namespace_id` definitions above.
it_behaves_like 'schedules issues rebalancing'
end
end
describe '#allows_reviewers?' do describe '#allows_reviewers?' do
it 'returns false as we do not support reviewers on issues yet' do it 'returns false as we do not support reviewers on issues yet' do
issue = build_stubbed(:issue) issue = build_stubbed(:issue)
......
...@@ -39,7 +39,7 @@ RSpec.describe IssueRebalancingService do ...@@ -39,7 +39,7 @@ RSpec.describe IssueRebalancingService do
shared_examples 'IssueRebalancingService shared examples' do shared_examples 'IssueRebalancingService shared examples' do
it 'rebalances a set of issues with clumps at the end and start' do it 'rebalances a set of issues with clumps at the end and start' do
all_issues = start_clump + unclumped + end_clump.reverse all_issues = start_clump + unclumped + end_clump.reverse
service = described_class.new(project.issues.first) service = described_class.new(Project.id_in([project.id]))
expect { service.execute }.not_to change { issues_in_position_order.map(&:id) } expect { service.execute }.not_to change { issues_in_position_order.map(&:id) }
...@@ -55,7 +55,7 @@ RSpec.describe IssueRebalancingService do ...@@ -55,7 +55,7 @@ RSpec.describe IssueRebalancingService do
end end
it 'is idempotent' do it 'is idempotent' do
service = described_class.new(project.issues.first) service = described_class.new(Project.id_in(project))
expect do expect do
service.execute service.execute
...@@ -70,17 +70,17 @@ RSpec.describe IssueRebalancingService do ...@@ -70,17 +70,17 @@ RSpec.describe IssueRebalancingService do
issue.project.group issue.project.group
old_pos = issue.relative_position old_pos = issue.relative_position
service = described_class.new(issue) service = described_class.new(Project.id_in(project))
expect { service.execute }.not_to exceed_query_limit(0) expect { service.execute }.not_to exceed_query_limit(0)
expect(old_pos).to eq(issue.reload.relative_position) expect(old_pos).to eq(issue.reload.relative_position)
end end
it 'acts if the flag is enabled for the project' do it 'acts if the flag is enabled for the root namespace' do
issue = create(:issue, project: project, author: user, relative_position: max_pos) issue = create(:issue, project: project, author: user, relative_position: max_pos)
stub_feature_flags(rebalance_issues: issue.project) stub_feature_flags(rebalance_issues: project.root_namespace)
service = described_class.new(issue) service = described_class.new(Project.id_in(project))
expect { service.execute }.to change { issue.reload.relative_position } expect { service.execute }.to change { issue.reload.relative_position }
end end
...@@ -90,23 +90,22 @@ RSpec.describe IssueRebalancingService do ...@@ -90,23 +90,22 @@ RSpec.describe IssueRebalancingService do
project.update!(group: create(:group)) project.update!(group: create(:group))
stub_feature_flags(rebalance_issues: issue.project.group) stub_feature_flags(rebalance_issues: issue.project.group)
service = described_class.new(issue) service = described_class.new(Project.id_in(project))
expect { service.execute }.to change { issue.reload.relative_position } expect { service.execute }.to change { issue.reload.relative_position }
end end
it 'aborts if there are too many issues' do it 'aborts if there are too many issues' do
issue = project.issues.first
base = double(count: 10_001) base = double(count: 10_001)
allow(Issue).to receive(:relative_positioning_query_base).with(issue).and_return(base) allow(Issue).to receive(:in_projects).and_return(base)
expect { described_class.new(issue).execute }.to raise_error(described_class::TooManyIssues) expect { described_class.new(Project.id_in(project)).execute }.to raise_error(described_class::TooManyIssues)
end end
end end
shared_examples 'rebalancing is retried on statement timeout exceptions' do shared_examples 'rebalancing is retried on statement timeout exceptions' do
subject { described_class.new(project.issues.first) } subject { described_class.new(Project.id_in(project)) }
it 'retries update statement' do it 'retries update statement' do
call_count = 0 call_count = 0
......
...@@ -225,7 +225,7 @@ RSpec.describe Issues::UpdateService, :mailer do ...@@ -225,7 +225,7 @@ RSpec.describe Issues::UpdateService, :mailer do
opts[:move_between_ids] = [issue1.id, issue2.id] opts[:move_between_ids] = [issue1.id, issue2.id]
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, project.id) expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.root_namespace.id)
update_issue(opts) update_issue(opts)
expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position) expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position)
...@@ -239,7 +239,7 @@ RSpec.describe Issues::UpdateService, :mailer do ...@@ -239,7 +239,7 @@ RSpec.describe Issues::UpdateService, :mailer do
opts[:move_between_ids] = [issue1.id, issue2.id] opts[:move_between_ids] = [issue1.id, issue2.id]
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, project.id) expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.root_namespace.id)
update_issue(opts) update_issue(opts)
expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position) expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position)
...@@ -253,7 +253,7 @@ RSpec.describe Issues::UpdateService, :mailer do ...@@ -253,7 +253,7 @@ RSpec.describe Issues::UpdateService, :mailer do
opts[:move_between_ids] = [issue1.id, issue2.id] opts[:move_between_ids] = [issue1.id, issue2.id]
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, project.id) expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.root_namespace.id)
update_issue(opts) update_issue(opts)
expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position) expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position)
......
...@@ -35,7 +35,7 @@ RSpec.describe IssuePlacementWorker do ...@@ -35,7 +35,7 @@ RSpec.describe IssuePlacementWorker do
it 'schedules rebalancing if needed' do it 'schedules rebalancing if needed' do
issue_a.update!(relative_position: RelativePositioning::MAX_POSITION) issue_a.update!(relative_position: RelativePositioning::MAX_POSITION)
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, project.id) expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.group.id)
run_worker run_worker
end end
...@@ -101,7 +101,7 @@ RSpec.describe IssuePlacementWorker do ...@@ -101,7 +101,7 @@ RSpec.describe IssuePlacementWorker do
it 'anticipates the failure to place the issues, and schedules rebalancing' do it 'anticipates the failure to place the issues, and schedules rebalancing' do
allow(Issue).to receive(:move_nulls_to_end) { raise RelativePositioning::NoSpaceLeft } allow(Issue).to receive(:move_nulls_to_end) { raise RelativePositioning::NoSpaceLeft }
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, project.id) expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.group.id)
expect(Gitlab::ErrorTracking) expect(Gitlab::ErrorTracking)
.to receive(:log_exception) .to receive(:log_exception)
.with(RelativePositioning::NoSpaceLeft, worker_arguments) .with(RelativePositioning::NoSpaceLeft, worker_arguments)
......
...@@ -20,34 +20,83 @@ RSpec.describe IssueRebalancingWorker do ...@@ -20,34 +20,83 @@ RSpec.describe IssueRebalancingWorker do
end end
end end
shared_examples 'running the worker' do
it 'runs an instance of IssueRebalancingService' do it 'runs an instance of IssueRebalancingService' do
service = double(execute: nil) service = double(execute: nil)
expect(IssueRebalancingService).to receive(:new).with(issue).and_return(service) service_param = arguments.second.present? ? kind_of(Project.id_in([project]).class) : kind_of(group&.all_projects.class)
described_class.new.perform(nil, issue.project_id) expect(IssueRebalancingService).to receive(:new).with(service_param).and_return(service)
end
it 'anticipates the inability to find the issue' do
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(ActiveRecord::RecordNotFound, include(project_id: -1))
expect(IssueRebalancingService).not_to receive(:new)
described_class.new.perform(nil, -1) described_class.new.perform(*arguments)
end end
it 'anticipates there being too many issues' do it 'anticipates there being too many issues' do
service = double service = double
allow(service).to receive(:execute) { raise IssueRebalancingService::TooManyIssues } service_param = arguments.second.present? ? kind_of(Project.id_in([project]).class) : kind_of(group&.all_projects.class)
expect(IssueRebalancingService).to receive(:new).with(issue).and_return(service)
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(IssueRebalancingService::TooManyIssues, include(project_id: issue.project_id))
described_class.new.perform(nil, issue.project_id) allow(service).to receive(:execute).and_raise(IssueRebalancingService::TooManyIssues)
expect(IssueRebalancingService).to receive(:new).with(service_param).and_return(service)
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(IssueRebalancingService::TooManyIssues, include(project_id: arguments.second, root_namespace_id: arguments.third))
described_class.new.perform(*arguments)
end end
it 'takes no action if the value is nil' do it 'takes no action if the value is nil' do
expect(IssueRebalancingService).not_to receive(:new) expect(IssueRebalancingService).not_to receive(:new)
expect(Gitlab::ErrorTracking).not_to receive(:log_exception) expect(Gitlab::ErrorTracking).not_to receive(:log_exception)
described_class.new.perform(nil, nil) described_class.new.perform # all arguments are nil
end
end
# Shared examples for worker arguments whose ids are meant to match no record.
# The including block must define `arguments`, the positional list splatted
# into `perform`; its second and third entries are read as project_id and
# root_namespace_id respectively.
shared_examples 'safely handles non-existent ids' do
it 'anticipates the inability to find the issue' do
# The failure must be logged as an ArgumentError tagged with both ids ...
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(ArgumentError, include(project_id: arguments.second, root_namespace_id: arguments.third))
# ... and the rebalancing service must never be instantiated.
expect(IssueRebalancingService).not_to receive(:new)
described_class.new.perform(*arguments)
end
end
context 'without root_namespace param' do
it_behaves_like 'running the worker' do
let(:arguments) { [-1, project.id] }
end
it_behaves_like 'safely handles non-existent ids' do
let(:arguments) { [nil, -1] }
end
include_examples 'an idempotent worker' do
let(:job_args) { [-1, project.id] }
end
include_examples 'an idempotent worker' do
let(:job_args) { [nil, -1] }
end end
end end
# Exercises the worker when only the third positional argument is supplied;
# the first two entries of every argument list here are nil.
# NOTE(review): `group` is not defined in this block - presumably an outer
# `let`; confirm against the top of the spec.
context 'with root_namespace param' do
it_behaves_like 'running the worker' do
let(:arguments) { [nil, nil, group.id] }
end
# -1 is used as an id that is presumably absent from the database.
it_behaves_like 'safely handles non-existent ids' do
let(:arguments) { [nil, nil, -1] }
end
# The idempotency examples run once with the group's id and once with -1.
include_examples 'an idempotent worker' do
let(:job_args) { [nil, nil, group.id] }
end
include_examples 'an idempotent worker' do
let(:job_args) { [nil, nil, -1] }
end
end
end
# Pins the worker's deduplication configuration: the strategy must be
# :until_executed and the options must include `including_scheduled: true`.
it 'has the `until_executed` deduplicate strategy' do
expect(described_class.get_deduplicate_strategy).to eq(:until_executed)
expect(described_class.get_deduplication_options).to include({ including_scheduled: true })
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment