Commit e5f8fe3c authored by Alexandru Croitor's avatar Alexandru Croitor

Enqueue new issues workers vs old ones

We've introduced new issues workers in a previous MR
https://gitlab.com/gitlab-org/gitlab/-/merge_requests/72802
that now comply to NamespacedClass cop offences tracked in
issue https://gitlab.com/gitlab-org/gitlab/-/issues/321982.

Now that we have the new workers in production, we can change
enqueueing jobs with new workers.

We still keep the old workers around in case some of the jobs
are still in old queues and need to be processed by the old
workers. We'll be removing the old workers in a future release.
parent 5cd2c8e1
...@@ -605,7 +605,7 @@ class Issue < ApplicationRecord ...@@ -605,7 +605,7 @@ class Issue < ApplicationRecord
def could_not_move(exception) def could_not_move(exception)
# Symptom of running out of space - schedule rebalancing # Symptom of running out of space - schedule rebalancing
IssueRebalancingWorker.perform_async(nil, *project.self_or_root_group_ids) Issues::RebalancingWorker.perform_async(nil, *project.self_or_root_group_ids)
end end
end end
......
...@@ -30,7 +30,7 @@ module Issues ...@@ -30,7 +30,7 @@ module Issues
gates = [issue.project, issue.project.group].compact gates = [issue.project, issue.project.group].compact
return unless gates.any? { |gate| Feature.enabled?(:rebalance_issues, gate) } return unless gates.any? { |gate| Feature.enabled?(:rebalance_issues, gate) }
IssueRebalancingWorker.perform_async(nil, *issue.project.self_or_root_group_ids) Issues::RebalancingWorker.perform_async(nil, *issue.project.self_or_root_group_ids)
end end
private private
......
...@@ -41,7 +41,7 @@ module Issues ...@@ -41,7 +41,7 @@ module Issues
user = current_user user = current_user
issue.run_after_commit do issue.run_after_commit do
NewIssueWorker.perform_async(issue.id, user.id) NewIssueWorker.perform_async(issue.id, user.id)
IssuePlacementWorker.perform_async(nil, issue.project_id) Issues::PlacementWorker.perform_async(nil, issue.project_id)
Namespaces::OnboardingIssueCreatedWorker.perform_async(issue.namespace.id) Namespaces::OnboardingIssueCreatedWorker.perform_async(issue.namespace.id)
end end
end end
......
# frozen_string_literal: true # frozen_string_literal: true
# DEPRECATED. Will be removed in 14.7 https://gitlab.com/gitlab-org/gitlab/-/merge_requests/72803
# Please use Issues::PlacementWorker instead
#
# todo: remove this worker and it's queue definition from all_queues after Issues::PlacementWorker is deployed # todo: remove this worker and it's queue definition from all_queues after Issues::PlacementWorker is deployed
# We want to keep it for one release in case some jobs are already scheduled in the old queue so we need the worker # We want to keep it for one release in case some jobs are already scheduled in the old queue so we need the worker
# to be available to finish those. All new jobs will be queued into the new queue. # to be available to finish those. All new jobs will be queued into the new queue.
...@@ -43,10 +46,10 @@ class IssuePlacementWorker ...@@ -43,10 +46,10 @@ class IssuePlacementWorker
Issue.move_nulls_to_end(to_place) Issue.move_nulls_to_end(to_place)
Issues::BaseService.new(project: nil).rebalance_if_needed(to_place.max_by(&:relative_position)) Issues::BaseService.new(project: nil).rebalance_if_needed(to_place.max_by(&:relative_position))
IssuePlacementWorker.perform_async(nil, leftover.project_id) if leftover.present? Issues::PlacementWorker.perform_async(nil, leftover.project_id) if leftover.present?
rescue RelativePositioning::NoSpaceLeft => e rescue RelativePositioning::NoSpaceLeft => e
Gitlab::ErrorTracking.log_exception(e, issue_id: issue_id, project_id: project_id) Gitlab::ErrorTracking.log_exception(e, issue_id: issue_id, project_id: project_id)
IssueRebalancingWorker.perform_async(nil, *root_namespace_id_to_rebalance(issue, project_id)) Issues::RebalancingWorker.perform_async(nil, *root_namespace_id_to_rebalance(issue, project_id))
end end
def find_issue(issue_id, project_id) def find_issue(issue_id, project_id)
......
# frozen_string_literal: true # frozen_string_literal: true
# DEPRECATED. Will be removed in 14.7 https://gitlab.com/gitlab-org/gitlab/-/merge_requests/72803
# Please use Issues::RebalancingWorker instead
#
# todo: remove this worker and it's queue definition from all_queues after Issue::RebalancingWorker is released. # todo: remove this worker and it's queue definition from all_queues after Issue::RebalancingWorker is released.
# We want to keep it for one release in case some jobs are already scheduled in the old queue so we need the worker # We want to keep it for one release in case some jobs are already scheduled in the old queue so we need the worker
# to be available to finish those. All new jobs will be queued into the new queue. # to be available to finish those. All new jobs will be queued into the new queue.
......
...@@ -17,6 +17,7 @@ module Issues ...@@ -17,6 +17,7 @@ module Issues
# we need to have exactly one of the project_id and root_namespace_id params be non-nil # we need to have exactly one of the project_id and root_namespace_id params be non-nil
raise ArgumentError, "Expected only one of the params project_id: #{project_id} and root_namespace_id: #{root_namespace_id}" if project_id && root_namespace_id raise ArgumentError, "Expected only one of the params project_id: #{project_id} and root_namespace_id: #{root_namespace_id}" if project_id && root_namespace_id
return if project_id.nil? && root_namespace_id.nil? return if project_id.nil? && root_namespace_id.nil?
return if ::Gitlab::Issues::Rebalancing::State.rebalance_recently_finished?(project_id, root_namespace_id)
# pull the projects collection to be rebalanced either the project if namespace is not a group(i.e. user namesapce) # pull the projects collection to be rebalanced either the project if namespace is not a group(i.e. user namesapce)
# or the root namespace, this also makes the worker backward compatible with previous version where a project_id was # or the root namespace, this also makes the worker backward compatible with previous version where a project_id was
......
...@@ -20,13 +20,13 @@ module Issues ...@@ -20,13 +20,13 @@ module Issues
namespaces = Namespace.id_in(namespace_ids) namespaces = Namespace.id_in(namespace_ids)
projects = Project.id_in(project_ids) projects = Project.id_in(project_ids)
IssueRebalancingWorker.bulk_perform_async_with_contexts( Issues::RebalancingWorker.bulk_perform_async_with_contexts(
namespaces, namespaces,
arguments_proc: -> (namespace) { [nil, nil, namespace.id] }, arguments_proc: -> (namespace) { [nil, nil, namespace.id] },
context_proc: -> (namespace) { { namespace: namespace } } context_proc: -> (namespace) { { namespace: namespace } }
) )
IssueRebalancingWorker.bulk_perform_async_with_contexts( Issues::RebalancingWorker.bulk_perform_async_with_contexts(
projects, projects,
arguments_proc: -> (project) { [nil, project.id, nil] }, arguments_proc: -> (project) { [nil, project.id, nil] },
context_proc: -> (project) { { project: project } } context_proc: -> (project) { { project: project } }
......
...@@ -1462,7 +1462,7 @@ RSpec.describe Issue do ...@@ -1462,7 +1462,7 @@ RSpec.describe Issue do
it 'schedules rebalancing if there is no space left' do it 'schedules rebalancing if there is no space left' do
lhs = build_stubbed(:issue, relative_position: 99, project: project) lhs = build_stubbed(:issue, relative_position: 99, project: project)
to_move = build(:issue, project: project) to_move = build(:issue, project: project)
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, project_id, namespace_id) expect(Issues::RebalancingWorker).to receive(:perform_async).with(nil, project_id, namespace_id)
expect { to_move.move_between(lhs, issue) }.to raise_error(RelativePositioning::NoSpaceLeft) expect { to_move.move_between(lhs, issue) }.to raise_error(RelativePositioning::NoSpaceLeft)
end end
......
...@@ -154,7 +154,7 @@ RSpec.describe Issues::CreateService do ...@@ -154,7 +154,7 @@ RSpec.describe Issues::CreateService do
end end
it 'moves the issue to the end, in an asynchronous worker' do it 'moves the issue to the end, in an asynchronous worker' do
expect(IssuePlacementWorker).to receive(:perform_async).with(be_nil, Integer) expect(Issues::PlacementWorker).to receive(:perform_async).with(be_nil, Integer)
described_class.new(project: project, current_user: user, params: opts, spam_params: spam_params).execute described_class.new(project: project, current_user: user, params: opts, spam_params: spam_params).execute
end end
......
...@@ -319,7 +319,7 @@ RSpec.describe Issues::UpdateService, :mailer do ...@@ -319,7 +319,7 @@ RSpec.describe Issues::UpdateService, :mailer do
opts[:move_between_ids] = [issue1.id, issue2.id] opts[:move_between_ids] = [issue1.id, issue2.id]
expect(IssueRebalancingWorker).not_to receive(:perform_async) expect(Issues::RebalancingWorker).not_to receive(:perform_async)
update_issue(opts) update_issue(opts)
expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position) expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position)
...@@ -335,7 +335,7 @@ RSpec.describe Issues::UpdateService, :mailer do ...@@ -335,7 +335,7 @@ RSpec.describe Issues::UpdateService, :mailer do
opts[:move_between_ids] = [issue1.id, issue2.id] opts[:move_between_ids] = [issue1.id, issue2.id]
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.root_namespace.id) expect(Issues::RebalancingWorker).to receive(:perform_async).with(nil, nil, project.root_namespace.id)
update_issue(opts) update_issue(opts)
expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position) expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position)
...@@ -349,7 +349,7 @@ RSpec.describe Issues::UpdateService, :mailer do ...@@ -349,7 +349,7 @@ RSpec.describe Issues::UpdateService, :mailer do
opts[:move_between_ids] = [issue1.id, issue2.id] opts[:move_between_ids] = [issue1.id, issue2.id]
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.root_namespace.id) expect(Issues::RebalancingWorker).to receive(:perform_async).with(nil, nil, project.root_namespace.id)
update_issue(opts) update_issue(opts)
expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position) expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position)
...@@ -363,7 +363,7 @@ RSpec.describe Issues::UpdateService, :mailer do ...@@ -363,7 +363,7 @@ RSpec.describe Issues::UpdateService, :mailer do
opts[:move_between_ids] = [issue1.id, issue2.id] opts[:move_between_ids] = [issue1.id, issue2.id]
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.root_namespace.id) expect(Issues::RebalancingWorker).to receive(:perform_async).with(nil, nil, project.root_namespace.id)
update_issue(opts) update_issue(opts)
expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position) expect(issue.relative_position).to be_between(issue1.relative_position, issue2.relative_position)
......
...@@ -35,7 +35,7 @@ RSpec.describe IssuePlacementWorker do ...@@ -35,7 +35,7 @@ RSpec.describe IssuePlacementWorker do
it 'schedules rebalancing if needed' do it 'schedules rebalancing if needed' do
issue_a.update!(relative_position: RelativePositioning::MAX_POSITION) issue_a.update!(relative_position: RelativePositioning::MAX_POSITION)
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.group.id) expect(Issues::RebalancingWorker).to receive(:perform_async).with(nil, nil, project.group.id)
run_worker run_worker
end end
...@@ -52,7 +52,7 @@ RSpec.describe IssuePlacementWorker do ...@@ -52,7 +52,7 @@ RSpec.describe IssuePlacementWorker do
.with(have_attributes(count: described_class::QUERY_LIMIT)) .with(have_attributes(count: described_class::QUERY_LIMIT))
.and_call_original .and_call_original
expect(described_class).to receive(:perform_async).with(nil, project.id) expect(Issues::PlacementWorker).to receive(:perform_async).with(nil, project.id)
run_worker run_worker
...@@ -101,7 +101,7 @@ RSpec.describe IssuePlacementWorker do ...@@ -101,7 +101,7 @@ RSpec.describe IssuePlacementWorker do
it 'anticipates the failure to place the issues, and schedules rebalancing' do it 'anticipates the failure to place the issues, and schedules rebalancing' do
allow(Issue).to receive(:move_nulls_to_end) { raise RelativePositioning::NoSpaceLeft } allow(Issue).to receive(:move_nulls_to_end) { raise RelativePositioning::NoSpaceLeft }
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.group.id) expect(Issues::RebalancingWorker).to receive(:perform_async).with(nil, nil, project.group.id)
expect(Gitlab::ErrorTracking) expect(Gitlab::ErrorTracking)
.to receive(:log_exception) .to receive(:log_exception)
.with(RelativePositioning::NoSpaceLeft, worker_arguments) .with(RelativePositioning::NoSpaceLeft, worker_arguments)
......
...@@ -35,7 +35,7 @@ RSpec.describe Issues::PlacementWorker do ...@@ -35,7 +35,7 @@ RSpec.describe Issues::PlacementWorker do
it 'schedules rebalancing if needed' do it 'schedules rebalancing if needed' do
issue_a.update!(relative_position: RelativePositioning::MAX_POSITION) issue_a.update!(relative_position: RelativePositioning::MAX_POSITION)
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.group.id) expect(Issues::RebalancingWorker).to receive(:perform_async).with(nil, nil, project.group.id)
run_worker run_worker
end end
......
...@@ -35,6 +35,20 @@ RSpec.describe Issues::RebalancingWorker do ...@@ -35,6 +35,20 @@ RSpec.describe Issues::RebalancingWorker do
described_class.new.perform # all arguments are nil described_class.new.perform # all arguments are nil
end end
it 'does not schedule a new rebalance if it finished under 1h ago' do
container_type = arguments.second.present? ? ::Gitlab::Issues::Rebalancing::State::PROJECT : ::Gitlab::Issues::Rebalancing::State::NAMESPACE
container_id = arguments.second || arguments.third
Gitlab::Redis::SharedState.with do |redis|
redis.set(::Gitlab::Issues::Rebalancing::State.send(:recently_finished_key, container_type, container_id), true)
end
expect(Issues::RelativePositionRebalancingService).not_to receive(:new)
expect(Gitlab::ErrorTracking).not_to receive(:log_exception)
described_class.new.perform(*arguments)
end
end end
shared_examples 'safely handles non-existent ids' do shared_examples 'safely handles non-existent ids' do
......
...@@ -10,15 +10,15 @@ RSpec.describe Issues::RescheduleStuckIssueRebalancesWorker, :clean_gitlab_redis ...@@ -10,15 +10,15 @@ RSpec.describe Issues::RescheduleStuckIssueRebalancesWorker, :clean_gitlab_redis
describe '#perform' do describe '#perform' do
it 'does not schedule a rebalance' do it 'does not schedule a rebalance' do
expect(IssueRebalancingWorker).not_to receive(:perform_async) expect(Issues::RebalancingWorker).not_to receive(:perform_async)
worker.perform worker.perform
end end
it 'schedules a rebalance in case there are any rebalances started' do it 'schedules a rebalance in case there are any rebalances started' do
expect(::Gitlab::Issues::Rebalancing::State).to receive(:fetch_rebalancing_groups_and_projects).and_return([[group.id], [project.id]]) expect(::Gitlab::Issues::Rebalancing::State).to receive(:fetch_rebalancing_groups_and_projects).and_return([[group.id], [project.id]])
expect(IssueRebalancingWorker).to receive(:bulk_perform_async).with([[nil, nil, group.id]]).once expect(Issues::RebalancingWorker).to receive(:bulk_perform_async).with([[nil, nil, group.id]]).once
expect(IssueRebalancingWorker).to receive(:bulk_perform_async).with([[nil, project.id, nil]]).once expect(Issues::RebalancingWorker).to receive(:bulk_perform_async).with([[nil, project.id, nil]]).once
worker.perform worker.perform
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment