Commit 185fea33 authored by Alex Kalderimis's avatar Alex Kalderimis

Ensure that we always run the update worker

Due to a bug in our sidekiq middleware, post-update actions were not
being applied after changing assignees. This is fixed by changing
the format of the arguments to the background worker.

An unused worker is deleted, and we opt-in to the use of specialized
workers in the API, meaning we can delete some duplicated code.

Details of the fix:

This is vital to getting correct deduplication behavior in our sidekiq
middleware. The current logic is:

- CLIENT creates hash from worker queue name and arguments (calling
  `#to_s`)
- a job matching that hash is already running (in the working queue), it
  is not enqueued by the client.
- otherwise, the job is serialized and sent to the server
- SERVER pops the job and deserializes it
- the server runs the job
- The server calculates the deduplication hash, and deletes it, allowing
  similar jobs to be enqueued again.

This protocol relies on serialization being completely isomorphic. i.e.
`load . serialize === id`. If not, then the two hashes will not
match, and the deduplication block will remain up, and not be taken down
by the server.

The workaround in our case is to use a round-trippable hashmap (i.e.
string keys rather than symbols), since:

```
JSON.load({ 'x' => true}.to_json) == { 'x' => true }
```

but

```
JSON.load({ x: => true}.to_json) /= { x: => true }
```

See: https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1090

Changelog: fixed
parent 0c2ac180
......@@ -8,7 +8,7 @@ module MergeRequests
merge_request.id,
current_user.id,
old_assignees.map(&:id),
options
options.stringify_keys # see: https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1090
)
end
......
......@@ -9,9 +9,11 @@ module MergeRequests
def execute(merge_request)
return merge_request unless current_user&.can?(:update_merge_request, merge_request)
old_assignees = merge_request.assignees
old_assignees = merge_request.assignees.to_a
old_ids = old_assignees.map(&:id)
new_ids = new_assignee_ids(merge_request)
return merge_request if merge_request.errors.any?
return merge_request if new_ids.size != update_attrs[:assignee_ids].size
return merge_request if old_ids.to_set == new_ids.to_set # no-change
......@@ -30,8 +32,11 @@ module MergeRequests
def new_assignee_ids(merge_request)
# prime the cache - prevent N+1 lookup during authorization loop.
merge_request.project.team.max_member_access_for_user_ids(update_attrs[:assignee_ids])
User.id_in(update_attrs[:assignee_ids]).map do |user|
user_ids = update_attrs[:assignee_ids]
return [] if user_ids.empty?
merge_request.project.team.max_member_access_for_user_ids(user_ids)
User.id_in(user_ids).map do |user|
if user.can?(:read_merge_request, merge_request)
user.id
else
......
......@@ -293,7 +293,7 @@ module MergeRequests
def attempt_specialized_update_services(merge_request, attribute)
case attribute
when :assignee_ids
when :assignee_ids, :assignee_id
assignees_service.execute(merge_request)
when :spend_time
add_time_spent_service.execute(merge_request)
......
......@@ -2283,15 +2283,6 @@
:weight: 1
:idempotent: true
:tags: []
- :name: merge_requests_assignees_change
:worker_name: MergeRequests::AssigneesChangeWorker
:feature_category: :source_code_management
:has_external_dependencies:
:urgency: :high
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: merge_requests_delete_source_branch
:worker_name: MergeRequests::DeleteSourceBranchWorker
:feature_category: :source_code_management
......
# frozen_string_literal: true
class MergeRequests::AssigneesChangeWorker
include ApplicationWorker
sidekiq_options retry: 3
feature_category :source_code_management
urgency :high
deduplicate :until_executed
idempotent!
def perform(merge_request_id, user_id, old_assignee_ids)
merge_request = MergeRequest.find(merge_request_id)
current_user = User.find(user_id)
# if a user was added and then removed, or removed and then added
# while waiting for this job to run, assume that nothing happened.
users = User.id_in(old_assignee_ids - merge_request.assignee_ids)
return if users.blank?
::MergeRequests::HandleAssigneesChangeService
.new(project: merge_request.target_project, current_user: current_user)
.execute(merge_request, users, execute_hooks: true)
rescue ActiveRecord::RecordNotFound
end
end
---
title: Ensure post-update actions are applied when assignees change
merge_request: 61897
author:
type: fixed
......@@ -218,8 +218,6 @@
- 1
- - merge_request_reset_approvals
- 1
- - merge_requests_assignees_change
- 1
- - merge_requests_delete_source_branch
- 1
- - merge_requests_handle_assignees_change
......
......@@ -436,14 +436,11 @@ module API
mr_params = declared_params(include_missing: false)
mr_params[:force_remove_source_branch] = mr_params.delete(:remove_source_branch) if mr_params.has_key?(:remove_source_branch)
mr_params = convert_parameters_from_legacy_format(mr_params)
mr_params[:use_specialized_service] = true
service = if mr_params.one? && (mr_params.keys & %i[assignee_id assignee_ids]).one?
::MergeRequests::UpdateAssigneesService
else
::MergeRequests::UpdateService
end
merge_request = service.new(project: user_project, current_user: current_user, params: mr_params).execute(merge_request)
merge_request = ::MergeRequests::UpdateService
.new(project: user_project, current_user: current_user, params: mr_params)
.execute(merge_request)
handle_merge_request_errors!(merge_request)
......
......@@ -2174,6 +2174,12 @@ RSpec.describe API::MergeRequests do
a_hash_including('name' => user2.name)
)
end
it 'creates appropriate system notes', :sidekiq_inline do
put api("/projects/#{project.id}/merge_requests/#{merge_request.iid}", user), params: params
expect(merge_request.notes.system.last.note).to include("assigned to #{user2.to_reference}")
end
end
context 'when assignee_id=user2.id' do
......@@ -2193,6 +2199,27 @@ RSpec.describe API::MergeRequests do
end
end
context 'when assignee_id=0' do
let(:params) do
{
assignee_id: 0
}
end
it 'clears the assignees' do
put api("/projects/#{project.id}/merge_requests/#{merge_request.iid}", user), params: params
expect(response).to have_gitlab_http_status(:ok)
expect(json_response['assignees']).to be_empty
end
it 'creates appropriate system notes', :sidekiq_inline do
put api("/projects/#{project.id}/merge_requests/#{merge_request.iid}", user), params: params
expect(merge_request.notes.system.last.note).to include("unassigned #{user.to_reference}")
end
end
context 'when only assignee_ids are provided, and the list is empty' do
let(:params) do
{
......
......@@ -6,7 +6,7 @@ RSpec.describe MergeRequests::HandleAssigneesChangeService do
let_it_be(:project) { create(:project, :repository) }
let_it_be(:user) { create(:user) }
let_it_be(:assignee) { create(:user) }
let_it_be(:merge_request) { create(:merge_request, author: user, source_project: project, assignees: [assignee]) }
let_it_be_with_reload(:merge_request) { create(:merge_request, author: user, source_project: project, assignees: [assignee]) }
let_it_be(:old_assignees) { create_list(:user, 3) }
let(:options) { {} }
......@@ -45,13 +45,27 @@ RSpec.describe MergeRequests::HandleAssigneesChangeService do
service.execute(merge_request, old_assignees, options)
end
let(:note) { merge_request.notes.system.last }
let(:removed_note) { "unassigned #{old_assignees.map(&:to_reference).to_sentence}" }
context 'when unassigning all users' do
before do
merge_request.update!(assignee_ids: [])
end
it 'creates assignee note' do
execute
note = merge_request.notes.last
expect(note).not_to be_nil
expect(note.note).to eq removed_note
end
end
it 'creates assignee note' do
execute
expect(note).not_to be_nil
expect(note.note).to include "assigned to #{assignee.to_reference} and unassigned #{old_assignees.map(&:to_reference).to_sentence}"
expect(note.note).to include "assigned to #{assignee.to_reference} and #{removed_note}"
end
it 'sends email notifications to old and new assignees', :mailer, :sidekiq_inline do
......
......@@ -17,6 +17,7 @@ RSpec.describe MergeRequests::UpdateAssigneesService do
description: "FYI #{user2.to_reference}",
assignee_ids: [user3.id],
source_project: project,
target_project: project,
author: create(:user))
end
......@@ -24,6 +25,7 @@ RSpec.describe MergeRequests::UpdateAssigneesService do
project.add_maintainer(user)
project.add_developer(user2)
project.add_developer(user3)
merge_request.errors.clear
end
let(:service) { described_class.new(project: project, current_user: user, params: opts) }
......@@ -32,35 +34,53 @@ RSpec.describe MergeRequests::UpdateAssigneesService do
describe 'execute' do
def update_merge_request
service.execute(merge_request)
merge_request.reload
end
shared_examples 'removing all assignees' do
it 'removes all assignees' do
expect(update_merge_request).to have_attributes(assignees: be_empty, errors: be_none)
end
it 'enqueues the correct background work' do
expect_next(MergeRequests::HandleAssigneesChangeService, project: project, current_user: user) do |service|
expect(service)
.to receive(:async_execute)
.with(merge_request, [user3], execute_hooks: true)
end
update_merge_request
end
end
context 'when the parameters are valid' do
context 'when using sentinel values' do
context 'when using assignee_ids' do
let(:opts) { { assignee_ids: [0] } }
it 'removes all assignees' do
expect { update_merge_request }.to change(merge_request, :assignees).to([])
it_behaves_like 'removing all assignees'
end
context 'when using assignee_id' do
let(:opts) { { assignee_id: 0 } }
it_behaves_like 'removing all assignees'
end
end
context 'the assignee_ids parameter is the empty list' do
context 'when the assignee_ids parameter is the empty list' do
let(:opts) { { assignee_ids: [] } }
it 'removes all assignees' do
expect { update_merge_request }.to change(merge_request, :assignees).to([])
end
it_behaves_like 'removing all assignees'
end
it 'updates the MR, and queues the more expensive work for later' do
expect_next(MergeRequests::HandleAssigneesChangeService, project: project, current_user: user) do |service|
expect(service)
.to receive(:async_execute)
.with(merge_request, [user3], execute_hooks: true)
.to receive(:async_execute).with(merge_request, [user3], execute_hooks: true)
end
expect { update_merge_request }
.to change(merge_request, :assignees).to([user2])
.to change { merge_request.reload.assignees }.from([user3]).to([user2])
.and change(merge_request, :updated_at)
.and change(merge_request, :updated_by).to(user)
end
......@@ -68,7 +88,10 @@ RSpec.describe MergeRequests::UpdateAssigneesService do
it 'does not update the assignees if they do not have access' do
opts[:assignee_ids] = [create(:user).id]
expect { update_merge_request }.not_to change(merge_request, :assignee_ids)
expect(update_merge_request).to have_attributes(
assignees: [user3],
errors: be_any
)
end
it 'is more efficient than using the full update-service' do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe MergeRequests::AssigneesChangeWorker do
include AfterNextHelpers
let_it_be(:merge_request) { create(:merge_request) }
let_it_be(:user) { create(:user) }
let_it_be(:old_assignees) { create_list(:user, 3) }
let(:user_ids) { old_assignees.map(&:id).to_a }
let(:worker) { described_class.new }
it_behaves_like 'an idempotent worker' do
let(:job_args) { [merge_request.id, user.id, user_ids] }
end
describe '#perform' do
context 'with a non-existing merge request' do
it 'does nothing' do
expect(::MergeRequests::HandleAssigneesChangeService).not_to receive(:new)
worker.perform(non_existing_record_id, user.id, user_ids)
end
end
context 'with a non-existing user' do
it 'does nothing' do
expect(::MergeRequests::HandleAssigneesChangeService).not_to receive(:new)
worker.perform(merge_request.id, non_existing_record_id, user_ids)
end
end
context 'when there are no changes' do
it 'does nothing' do
expect(::MergeRequests::HandleAssigneesChangeService).not_to receive(:new)
worker.perform(merge_request.id, user.id, merge_request.assignee_ids)
end
end
context 'when the old users cannot be found' do
it 'does nothing' do
expect(::MergeRequests::HandleAssigneesChangeService).not_to receive(:new)
worker.perform(merge_request.id, user.id, [non_existing_record_id])
end
end
it 'gets MergeRequests::UpdateAssigneesService to handle the changes' do
expect_next(::MergeRequests::HandleAssigneesChangeService)
.to receive(:execute).with(merge_request, match_array(old_assignees), execute_hooks: true)
worker.perform(merge_request.id, user.id, user_ids)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment