Commit e69b27c5 authored by Alex Kalderimis's avatar Alex Kalderimis

Change the placement worker query

The main thing is the change in the placement worker query to be a
most-recent sweep of unpositioned issues.

This also moves the worker invocation out of the new_issue_worker
parent 8df452d3
...@@ -21,6 +21,7 @@ module Issues ...@@ -21,6 +21,7 @@ module Issues
user = current_user user = current_user
issue.run_after_commit do issue.run_after_commit do
NewIssueWorker.perform_async(issue.id, user.id) NewIssueWorker.perform_async(issue.id, user.id)
IssuePlacementWorker.perform_async(issue.id)
end end
end end
......
...@@ -9,25 +9,27 @@ class IssuePlacementWorker ...@@ -9,25 +9,27 @@ class IssuePlacementWorker
worker_resource_boundary :cpu worker_resource_boundary :cpu
weight 2 weight 2
# Move at most the most recent 100 issues
QUERY_LIMIT = 100
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(issue_id, placement = :end) def perform(issue_id)
issue = Issue.find(issue_id) issue = Issue.id_in(issue_id).first
return unless issue
# here we are placing not just this issue, but it and all issues created since, and up to five-minutes before. # Move the most recent 100 unpositioned items to the end.
# This is to deal with out-of-order execution of the worker, while preserving creation order. # This is to deal with out-of-order execution of the worker,
# while preserving creation order.
to_place = Issue to_place = Issue
.relative_positioning_query_base(issue) .relative_positioning_query_base(issue)
.where(Issue.arel_table[:created_at].gteq(issue.created_at - 5.minutes)) .where(relative_position: nil)
.order_created_at_desc .order({ created_at: :desc }, { id: :desc })
.limit(QUERY_LIMIT)
if placement == :end Issue.move_nulls_to_end(to_place.to_a.reverse)
Issue.move_nulls_to_end(to_place.to_a.reverse) rescue RelativePositioning::NoSpaceLeft => e
elsif placement == :start Gitlab::ErrorTracking.log_exception(e, issue_id: issue_id)
Issue.move_nulls_to_start(to_place.to_a) IssueRebalancingWorker.perform_async(nil, issue.project_id)
end
rescue ActiveRecord::RecordNotFound, RelativePositioning::NoSpaceLeft => e
Gitlab::ErrorTracking.log_exception(e, issue_id: issue_id, placement: placement)
IssueRebalancingWorker.perform_async(nil, issue.project_id) if issue && e.is_a?(RelativePositioning::NoSpaceLeft)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
end end
...@@ -15,7 +15,6 @@ class NewIssueWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -15,7 +15,6 @@ class NewIssueWorker # rubocop:disable Scalability/IdempotentWorker
::EventCreateService.new.open_issue(issuable, user) ::EventCreateService.new.open_issue(issuable, user)
::NotificationService.new.new_issue(issuable, user) ::NotificationService.new.new_issue(issuable, user)
issuable.create_cross_references!(user) issuable.create_cross_references!(user)
IssuePlacementWorker.perform_async(issue_id, :end)
end end
def issuable_class def issuable_class
......
...@@ -75,6 +75,12 @@ RSpec.describe Issues::CreateService do ...@@ -75,6 +75,12 @@ RSpec.describe Issues::CreateService do
expect(Todo.where(attributes).count).to eq 1 expect(Todo.where(attributes).count).to eq 1
end end
it 'moves the issue to the end, in an asynchronous worker' do
expect(IssuePlacementWorker).to receive(:perform_async).with(Integer)
described_class.new(project, user, opts).execute
end
context 'when label belongs to project group' do context 'when label belongs to project group' do
let(:group) { create(:group) } let(:group) { create(:group) }
let(:group_labels) { create_pair(:group_label, group: group) } let(:group_labels) { create_pair(:group_label, group: group) }
......
...@@ -4,38 +4,40 @@ require 'spec_helper' ...@@ -4,38 +4,40 @@ require 'spec_helper'
RSpec.describe IssuePlacementWorker do RSpec.describe IssuePlacementWorker do
describe '#perform' do describe '#perform' do
let_it_be(:time) { Time.now.utc }
let_it_be(:project) { create_default(:project) } let_it_be(:project) { create_default(:project) }
let_it_be(:issue) { create(:issue, relative_position: nil) } let_it_be(:issue) { create(:issue, relative_position: nil, created_at: time) }
let_it_be(:issue_a) { create(:issue, relative_position: nil, created_at: issue.created_at - 1.minute) } let_it_be(:issue_a) { create(:issue, relative_position: nil, created_at: time - 1.minute) }
let_it_be(:issue_b) { create(:issue, relative_position: nil, created_at: issue.created_at - 6.minutes) } let_it_be(:issue_b) { create(:issue, relative_position: nil, created_at: time - 6.minutes) }
let_it_be(:issue_c) { create(:issue, relative_position: nil, created_at: issue.created_at + 1.minute) } let_it_be(:issue_c) { create(:issue, relative_position: nil, created_at: time + 1.minute) }
let_it_be(:issue_d) { create(:issue, relative_position: nil, created_at: issue.created_at + 6.minutes) } let_it_be(:issue_d) { create(:issue, relative_position: nil, created_at: time + 6.minutes) }
let_it_be(:issue_e) { create(:issue, relative_position: 10, created_at: issue.created_at + 3.minutes) } let_it_be(:issue_e) { create(:issue, relative_position: 10, created_at: time + 3.minutes) }
let_it_be(:issue_f) { create(:issue, relative_position: nil, created_at: time + 1.minute) }
let_it_be(:irrelevant) { create(:issue, project: create(:project), created_at: issue.created_at - 30.seconds) } let_it_be(:irrelevant) { create(:issue, project: create(:project), relative_position: nil, created_at: time) }
it 'places all issues created at most 5 minutes before this one at the end, most recent last' do it 'places all issues created at most 5 minutes before this one at the end, most recent last' do
described_class.new.perform(issue.id, :end) expect do
described_class.new.perform(issue.id)
end.not_to change { irrelevant.reset.relative_position }
expect(project.issues.order_relative_position_asc) expect(project.issues.order_relative_position_asc)
.to eq([issue_e, issue_a, issue, issue_c, issue_d, issue_b]) .to eq([issue_e, issue_b, issue_a, issue, issue_c, issue_f, issue_d])
expect(project.issues.where(relative_position: nil)).not_to exist
end end
it 'places all issues created at most 5 minutes before this one at the start, most recent first' do it 'limits the sweep to QUERY_LIMIT records' do
described_class.new.perform(issue.id, :start) issues = create_list(:issue, described_class::QUERY_LIMIT + 1, relative_position: nil)
expect(project.issues.order_relative_position_asc) described_class.new.perform(issues.first.id)
.to eq([issue_d, issue_c, issue, issue_a, issue_e, issue_b])
expect(project.issues.where(relative_position: nil)).to exist
end end
it 'anticipates the failure to find the issue' do it 'anticipates the failure to find the issue' do
id = non_existing_record_id id = non_existing_record_id
expect(Gitlab::ErrorTracking) expect { described_class.new.perform(id) }.not_to raise_error
.to receive(:log_exception)
.with(ActiveRecord::RecordNotFound, issue_id: id, placement: :end)
described_class.new.perform(id, :end)
end end
it 'anticipates the failure to place the issues, and schedules rebalancing' do it 'anticipates the failure to place the issues, and schedules rebalancing' do
...@@ -44,9 +46,9 @@ RSpec.describe IssuePlacementWorker do ...@@ -44,9 +46,9 @@ RSpec.describe IssuePlacementWorker do
expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, project.id) expect(IssueRebalancingWorker).to receive(:perform_async).with(nil, project.id)
expect(Gitlab::ErrorTracking) expect(Gitlab::ErrorTracking)
.to receive(:log_exception) .to receive(:log_exception)
.with(RelativePositioning::NoSpaceLeft, issue_id: issue.id, placement: :end) .with(RelativePositioning::NoSpaceLeft, issue_id: issue.id)
described_class.new.perform(issue.id, :end) described_class.new.perform(issue.id)
end end
end end
end end
...@@ -48,12 +48,6 @@ RSpec.describe NewIssueWorker do ...@@ -48,12 +48,6 @@ RSpec.describe NewIssueWorker do
expect { worker.perform(issue.id, user.id) }.to change { Event.count }.from(0).to(1) expect { worker.perform(issue.id, user.id) }.to change { Event.count }.from(0).to(1)
end end
it 'moves the issue to the end' do
expect(IssuePlacementWorker).to receive(:perform_async).with(issue.id, :end)
worker.perform(issue.id, user.id)
end
it 'creates a notification for the mentioned user' do it 'creates a notification for the mentioned user' do
expect(Notify).to receive(:new_issue_email).with(mentioned.id, issue.id, NotificationReason::MENTIONED) expect(Notify).to receive(:new_issue_email).with(mentioned.id, issue.id, NotificationReason::MENTIONED)
.and_return(double(deliver_later: true)) .and_return(double(deliver_later: true))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment