Commit ccb1fcf7 authored by Stan Hu's avatar Stan Hu

Merge branch 'move_issue_workers_to_issues_module' into 'master'

Move issues workers to a issues module

See merge request gitlab-org/gitlab!72802
parents c74386d9 1af4b99e
...@@ -2285,6 +2285,24 @@ ...@@ -2285,6 +2285,24 @@
:weight: 1 :weight: 1
:idempotent: true :idempotent: true
:tags: [] :tags: []
- :name: issues_placement
:worker_name: Issues::PlacementWorker
:feature_category: :issue_tracking
:has_external_dependencies:
:urgency: :high
:resource_boundary: :cpu
:weight: 2
:idempotent: true
:tags: []
- :name: issues_rebalancing
:worker_name: Issues::RebalancingWorker
:feature_category: :issue_tracking
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: mailers - :name: mailers
:worker_name: ActionMailer::MailDeliveryJob :worker_name: ActionMailer::MailDeliveryJob
:feature_category: :not_owned :feature_category: :not_owned
......
# frozen_string_literal: true # frozen_string_literal: true
# todo: remove this worker and it's queue definition from all_queues after Issues::PlacementWorker is deployed
# We want to keep it for one release in case some jobs are already scheduled in the old queue so we need the worker
# to be available to finish those. All new jobs will be queued into the new queue.
class IssuePlacementWorker class IssuePlacementWorker
include ApplicationWorker include ApplicationWorker
......
# frozen_string_literal: true # frozen_string_literal: true
# todo: remove this worker and it's queue definition from all_queues after Issue::RebalancingWorker is released.
# We want to keep it for one release in case some jobs are already scheduled in the old queue so we need the worker
# to be available to finish those. All new jobs will be queued into the new queue.
class IssueRebalancingWorker class IssueRebalancingWorker
include ApplicationWorker include ApplicationWorker
......
# frozen_string_literal: true
module Issues
class PlacementWorker
include ApplicationWorker
data_consistency :always
sidekiq_options retry: 3
idempotent!
deduplicate :until_executed, including_scheduled: true
feature_category :issue_tracking
urgency :high
worker_resource_boundary :cpu
weight 2
# Move at most the most recent 100 issues
QUERY_LIMIT = 100
# rubocop: disable CodeReuse/ActiveRecord
def perform(issue_id, project_id = nil)
issue = find_issue(issue_id, project_id)
return unless issue
# Temporary disable moving null elements because of performance problems
# For more information check https://gitlab.com/gitlab-com/gl-infra/production/-/issues/4321
return if issue.blocked_for_repositioning?
# Move the oldest 100 unpositioned items to the end.
# This is to deal with out-of-order execution of the worker,
# while preserving creation order.
to_place = Issue
.relative_positioning_query_base(issue)
.with_null_relative_position
.order({ created_at: :asc }, { id: :asc })
.limit(QUERY_LIMIT + 1)
.to_a
leftover = to_place.pop if to_place.count > QUERY_LIMIT
Issue.move_nulls_to_end(to_place)
Issues::BaseService.new(project: nil).rebalance_if_needed(to_place.max_by(&:relative_position))
Issues::PlacementWorker.perform_async(nil, leftover.project_id) if leftover.present?
rescue RelativePositioning::NoSpaceLeft => e
Gitlab::ErrorTracking.log_exception(e, issue_id: issue_id, project_id: project_id)
Issues::RebalancingWorker.perform_async(nil, *root_namespace_id_to_rebalance(issue, project_id))
end
def find_issue(issue_id, project_id)
return Issue.id_in(issue_id).take if issue_id
project = Project.id_in(project_id).take
return unless project
project.issues.take
end
# rubocop: enable CodeReuse/ActiveRecord
private
def root_namespace_id_to_rebalance(issue, project_id)
project_id = project_id.presence || issue.project_id
Project.find(project_id)&.self_or_root_group_ids
end
end
end
# frozen_string_literal: true
module Issues
class RebalancingWorker
include ApplicationWorker
data_consistency :always
sidekiq_options retry: 3
idempotent!
urgency :low
feature_category :issue_tracking
deduplicate :until_executed, including_scheduled: true
def perform(ignore = nil, project_id = nil, root_namespace_id = nil)
# we need to have exactly one of the project_id and root_namespace_id params be non-nil
raise ArgumentError, "Expected only one of the params project_id: #{project_id} and root_namespace_id: #{root_namespace_id}" if project_id && root_namespace_id
return if project_id.nil? && root_namespace_id.nil?
# pull the projects collection to be rebalanced either the project if namespace is not a group(i.e. user namesapce)
# or the root namespace, this also makes the worker backward compatible with previous version where a project_id was
# passed as the param
projects_to_rebalance = projects_collection(project_id, root_namespace_id)
# something might have happened with the namespace between scheduling the worker and actually running it,
# maybe it was removed.
if projects_to_rebalance.blank?
Gitlab::ErrorTracking.log_exception(
ArgumentError.new("Projects to be rebalanced not found for arguments: project_id #{project_id}, root_namespace_id: #{root_namespace_id}"),
{ project_id: project_id, root_namespace_id: root_namespace_id })
return
end
Issues::RelativePositionRebalancingService.new(projects_to_rebalance).execute
rescue Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances => e
Gitlab::ErrorTracking.log_exception(e, root_namespace_id: root_namespace_id, project_id: project_id)
end
private
def projects_collection(project_id, root_namespace_id)
# we can have either project_id(older version) or project_id if project is part of a user namespace and not a group
# or root_namespace_id(newer version) never both.
return Project.id_in([project_id]) if project_id
Namespace.find_by_id(root_namespace_id)&.all_projects
end
end
end
...@@ -213,6 +213,10 @@ ...@@ -213,6 +213,10 @@
- 2 - 2
- - issue_rebalancing - - issue_rebalancing
- 1 - 1
- - issues_placement
- 2
- - issues_rebalancing
- 1
- - iterations - - iterations
- 1 - 1
- - jira_connect - - jira_connect
......
...@@ -316,6 +316,8 @@ RSpec.describe 'Every Sidekiq worker' do ...@@ -316,6 +316,8 @@ RSpec.describe 'Every Sidekiq worker' do
'IssuableExportCsvWorker' => 3, 'IssuableExportCsvWorker' => 3,
'IssuePlacementWorker' => 3, 'IssuePlacementWorker' => 3,
'IssueRebalancingWorker' => 3, 'IssueRebalancingWorker' => 3,
'Issues::PlacementWorker' => 3,
'Issues::RebalancingWorker' => 3,
'IterationsUpdateStatusWorker' => 3, 'IterationsUpdateStatusWorker' => 3,
'JiraConnect::SyncBranchWorker' => 3, 'JiraConnect::SyncBranchWorker' => 3,
'JiraConnect::SyncBuildsWorker' => 3, 'JiraConnect::SyncBuildsWorker' => 3,
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Issues::PlacementWorker do
describe '#perform' do
let_it_be(:time) { Time.now.utc }
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, group: group) }
let_it_be(:author) { create(:user) }
let_it_be(:common_attrs) { { author: author, project: project } }
let_it_be(:unplaced) { common_attrs.merge(relative_position: nil) }
let_it_be_with_reload(:issue) { create(:issue, **unplaced, created_at: time) }
let_it_be_with_reload(:issue_a) { create(:issue, **unplaced, created_at: time - 1.minute) }
let_it_be_with_reload(:issue_b) { create(:issue, **unplaced, created_at: time - 2.minutes) }
let_it_be_with_reload(:issue_c) { create(:issue, **unplaced, created_at: time + 1.minute) }
let_it_be_with_reload(:issue_d) { create(:issue, **unplaced, created_at: time + 2.minutes) }
let_it_be_with_reload(:issue_e) { create(:issue, **common_attrs, relative_position: 10, created_at: time + 1.minute) }
let_it_be_with_reload(:issue_f) { create(:issue, **unplaced, created_at: time + 1.minute) }
let_it_be(:irrelevant) { create(:issue, relative_position: nil, created_at: time) }
shared_examples 'running the issue placement worker' do
let(:issue_id) { issue.id }
let(:project_id) { project.id }
it 'places all issues created at most 5 minutes before this one at the end, most recent last' do
expect { run_worker }.not_to change { irrelevant.reset.relative_position }
expect(project.issues.order_by_relative_position)
.to eq([issue_e, issue_b, issue_a, issue, issue_c, issue_f, issue_d])
expect(project.issues.where(relative_position: nil)).not_to exist
end
it 'schedules rebalancing if needed' do
issue_a.update!(relative_position: RelativePositioning::MAX_POSITION)
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, nil, project.group.id)
run_worker
end
context 'there are more than QUERY_LIMIT unplaced issues' do
before_all do
# Ensure there are more than N issues in this set
n = described_class::QUERY_LIMIT
create_list(:issue, n - 5, **unplaced)
end
it 'limits the sweep to QUERY_LIMIT records, and reschedules placement' do
expect(Issue).to receive(:move_nulls_to_end)
.with(have_attributes(count: described_class::QUERY_LIMIT))
.and_call_original
expect(described_class).to receive(:perform_async).with(nil, project.id)
run_worker
expect(project.issues.where(relative_position: nil)).to exist
end
it 'is eventually correct' do
prefix = project.issues.where.not(relative_position: nil).order(:relative_position).to_a
moved = project.issues.where.not(id: prefix.map(&:id))
run_worker
expect(project.issues.where(relative_position: nil)).to exist
run_worker
expect(project.issues.where(relative_position: nil)).not_to exist
expect(project.issues.order(:relative_position)).to eq(prefix + moved.order(:created_at, :id))
end
end
context 'we are passed bad IDs' do
let(:issue_id) { non_existing_record_id }
let(:project_id) { non_existing_record_id }
def max_positions_by_project
Issue
.group(:project_id)
.pluck(:project_id, Issue.arel_table[:relative_position].maximum.as('max_relative_position'))
.to_h
end
it 'does move any issues to the end' do
expect { run_worker }.not_to change { max_positions_by_project }
end
context 'the project_id refers to an empty project' do
let!(:project_id) { create(:project).id }
it 'does move any issues to the end' do
expect { run_worker }.not_to change { max_positions_by_project }
end
end
end
it 'anticipates the failure to place the issues, and schedules rebalancing' do
allow(Issue).to receive(:move_nulls_to_end) { raise RelativePositioning::NoSpaceLeft }
expect(Issues::RebalancingWorker).to receive(:perform_async).with(nil, nil, project.group.id)
expect(Gitlab::ErrorTracking)
.to receive(:log_exception)
.with(RelativePositioning::NoSpaceLeft, worker_arguments)
run_worker
end
end
context 'passing an issue ID' do
def run_worker
described_class.new.perform(issue_id)
end
let(:worker_arguments) { { issue_id: issue_id, project_id: nil } }
it_behaves_like 'running the issue placement worker'
context 'when block_issue_repositioning is enabled' do
let(:issue_id) { issue.id }
let(:project_id) { project.id }
before do
stub_feature_flags(block_issue_repositioning: group)
end
it 'does not run repositioning tasks' do
expect { run_worker }.not_to change { issue.reset.relative_position }
end
end
end
context 'passing a project ID' do
def run_worker
described_class.new.perform(nil, project_id)
end
let(:worker_arguments) { { issue_id: nil, project_id: project_id } }
it_behaves_like 'running the issue placement worker'
end
end
it 'has the `until_executed` deduplicate strategy' do
expect(described_class.get_deduplicate_strategy).to eq(:until_executed)
expect(described_class.get_deduplication_options).to include({ including_scheduled: true })
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Issues::RebalancingWorker do
describe '#perform' do
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, group: group) }
let_it_be(:issue) { create(:issue, project: project) }
shared_examples 'running the worker' do
it 'runs an instance of Issues::RelativePositionRebalancingService' do
service = double(execute: nil)
service_param = arguments.second.present? ? kind_of(Project.id_in([project]).class) : kind_of(group&.all_projects.class)
expect(Issues::RelativePositionRebalancingService).to receive(:new).with(service_param).and_return(service)
described_class.new.perform(*arguments)
end
it 'anticipates there being too many concurent rebalances' do
service = double
service_param = arguments.second.present? ? kind_of(Project.id_in([project]).class) : kind_of(group&.all_projects.class)
allow(service).to receive(:execute).and_raise(Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances)
expect(Issues::RelativePositionRebalancingService).to receive(:new).with(service_param).and_return(service)
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(Issues::RelativePositionRebalancingService::TooManyConcurrentRebalances, include(project_id: arguments.second, root_namespace_id: arguments.third))
described_class.new.perform(*arguments)
end
it 'takes no action if the value is nil' do
expect(Issues::RelativePositionRebalancingService).not_to receive(:new)
expect(Gitlab::ErrorTracking).not_to receive(:log_exception)
described_class.new.perform # all arguments are nil
end
end
shared_examples 'safely handles non-existent ids' do
it 'anticipates the inability to find the issue' do
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(ArgumentError, include(project_id: arguments.second, root_namespace_id: arguments.third))
expect(Issues::RelativePositionRebalancingService).not_to receive(:new)
described_class.new.perform(*arguments)
end
end
context 'without root_namespace param' do
it_behaves_like 'running the worker' do
let(:arguments) { [-1, project.id] }
end
it_behaves_like 'safely handles non-existent ids' do
let(:arguments) { [nil, -1] }
end
include_examples 'an idempotent worker' do
let(:job_args) { [-1, project.id] }
end
include_examples 'an idempotent worker' do
let(:job_args) { [nil, -1] }
end
end
context 'with root_namespace param' do
it_behaves_like 'running the worker' do
let(:arguments) { [nil, nil, group.id] }
end
it_behaves_like 'safely handles non-existent ids' do
let(:arguments) { [nil, nil, -1] }
end
include_examples 'an idempotent worker' do
let(:job_args) { [nil, nil, group.id] }
end
include_examples 'an idempotent worker' do
let(:job_args) { [nil, nil, -1] }
end
end
end
it 'has the `until_executed` deduplicate strategy' do
expect(described_class.get_deduplicate_strategy).to eq(:until_executed)
expect(described_class.get_deduplication_options).to include({ including_scheduled: true })
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment