Commit b5042e53 authored by Sean McGivern's avatar Sean McGivern

Move NotificationService calls to Sidekiq

The NotificationService has to do quite a lot of work to calculate the
recipients for an email. Where possible, we should try to avoid doing this in an
HTTP request, because the mail are sent by Sidekiq anyway, so there's no need to
schedule those emails immediately.

This commit creates a generic Sidekiq worker that uses Global ID to serialise
and deserialise its arguments, then forwards them to the NotificationService.
The NotificationService gains an `#async` method, so you can replace:

    notification_service.new_issue(issue, current_user)

With:

    notification_service.async.new_issue(issue, current_user)

And have everything else work as normal, except that calculating the recipients
will be done by Sidekiq, which will then schedule further Sidekiq jobs to send
each email.
parent fd532302
......@@ -26,7 +26,7 @@ module Issues
issue.update(closed_by: current_user)
event_service.close_issue(issue, current_user)
create_note(issue, commit) if system_note
notification_service.close_issue(issue, current_user) if notifications
notification_service.async.close_issue(issue, current_user) if notifications
todo_service.close_issue(issue, current_user)
execute_hooks(issue, 'close')
invalidate_cache_counts(issue, users: issue.assignees)
......
......@@ -139,7 +139,7 @@ module Issues
end
def notify_participants
notification_service.issue_moved(@old_issue, @new_issue, @current_user)
notification_service.async.issue_moved(@old_issue, @new_issue, @current_user)
end
end
end
......@@ -6,7 +6,7 @@ module Issues
if issue.reopen
event_service.reopen_issue(issue, current_user)
create_note(issue, 'reopened')
notification_service.reopen_issue(issue, current_user)
notification_service.async.reopen_issue(issue, current_user)
execute_hooks(issue, 'reopen')
invalidate_cache_counts(issue, users: issue.assignees)
issue.update_project_counter_caches
......
......@@ -30,7 +30,7 @@ module Issues
if issue.assignees != old_assignees
create_assignee_note(issue, old_assignees)
notification_service.reassigned_issue(issue, current_user, old_assignees)
notification_service.async.reassigned_issue(issue, current_user, old_assignees)
todo_service.reassigned_issue(issue, current_user, old_assignees)
end
......@@ -41,13 +41,13 @@ module Issues
added_labels = issue.labels - old_labels
if added_labels.present?
notification_service.relabeled_issue(issue, added_labels, current_user)
notification_service.async.relabeled_issue(issue, added_labels, current_user)
end
added_mentions = issue.mentioned_users - old_mentioned_users
if added_mentions.present?
notification_service.new_mentions_in_issue(issue, added_mentions, current_user)
notification_service.async.new_mentions_in_issue(issue, added_mentions, current_user)
end
end
......
......@@ -10,7 +10,7 @@ module MergeRequests
if merge_request.close
create_event(merge_request)
create_note(merge_request)
notification_service.close_mr(merge_request, current_user)
notification_service.async.close_mr(merge_request, current_user)
todo_service.close_merge_request(merge_request, current_user)
execute_hooks(merge_request, 'close')
invalidate_cache_counts(merge_request, users: merge_request.assignees)
......
......@@ -6,7 +6,7 @@ module MergeRequests
if merge_request.reopen
create_event(merge_request)
create_note(merge_request, 'reopened')
notification_service.reopen_mr(merge_request, current_user)
notification_service.async.reopen_mr(merge_request, current_user)
execute_hooks(merge_request, 'reopen')
merge_request.reload_diff(current_user)
merge_request.mark_as_unchecked
......
......@@ -4,7 +4,7 @@ module MergeRequests
return unless merge_request.discussions_resolved?
SystemNoteService.resolve_all_discussions(merge_request, project, current_user)
notification_service.resolve_all_discussions(merge_request, current_user)
notification_service.async.resolve_all_discussions(merge_request, current_user)
end
end
end
......@@ -21,6 +21,7 @@ module MergeRequests
update(merge_request)
end
# rubocop:disable Metrics/AbcSize
def handle_changes(merge_request, options)
old_associations = options.fetch(:old_associations, {})
old_labels = old_associations.fetch(:labels, [])
......@@ -42,8 +43,11 @@ module MergeRequests
end
if merge_request.previous_changes.include?('assignee_id')
old_assignee_id = merge_request.previous_changes['assignee_id'].first
old_assignee = User.find(old_assignee_id) if old_assignee_id
create_assignee_note(merge_request)
notification_service.reassigned_merge_request(merge_request, current_user)
notification_service.async.reassigned_merge_request(merge_request, current_user, old_assignee)
todo_service.reassigned_merge_request(merge_request, current_user)
end
......@@ -54,7 +58,7 @@ module MergeRequests
added_labels = merge_request.labels - old_labels
if added_labels.present?
notification_service.relabeled_merge_request(
notification_service.async.relabeled_merge_request(
merge_request,
added_labels,
current_user
......@@ -63,13 +67,14 @@ module MergeRequests
added_mentions = merge_request.mentioned_users - old_mentioned_users
if added_mentions.present?
notification_service.new_mentions_in_merge_request(
notification_service.async.new_mentions_in_merge_request(
merge_request,
added_mentions,
current_user
)
end
end
# rubocop:enable Metrics/AbcSize
def merge_from_quick_action(merge_request)
last_diff_sha = params.delete(:merge)
......
......@@ -7,7 +7,32 @@
# Ex.
# NotificationService.new.new_issue(issue, current_user)
#
# When calculating the recipients of a notification is expensive (for instance,
# in the new issue case), `#async` will make that calculation happen in Sidekiq
# instead:
#
# NotificationService.new.async.new_issue(issue, current_user)
#
class NotificationService
class Async
attr_reader :parent
delegate :respond_to_missing, to: :parent
def initialize(parent)
@parent = parent
end
def method_missing(meth, *args)
return super unless parent.respond_to?(meth)
MailScheduler::NotificationServiceWorker.perform_async(meth.to_s, *args)
end
end
def async
@async ||= Async.new(self)
end
# Always notify user about ssh key added
# only if ssh key is not deploy key
#
......@@ -142,8 +167,23 @@ class NotificationService
# * merge_request assignee if their notification level is not Disabled
# * users with custom level checked with "reassign merge request"
#
def reassigned_merge_request(merge_request, current_user)
reassign_resource_email(merge_request, current_user, :reassigned_merge_request_email)
def reassigned_merge_request(merge_request, current_user, previous_assignee)
recipients = NotificationRecipientService.build_recipients(
merge_request,
current_user,
action: "reassign",
previous_assignee: previous_assignee
)
recipients.each do |recipient|
mailer.reassigned_merge_request_email(
recipient.user.id,
merge_request.id,
previous_assignee&.id,
current_user.id,
recipient.reason
).deliver_later
end
end
# When we add labels to a merge request we should send an email to:
......@@ -421,29 +461,6 @@ class NotificationService
end
end
def reassign_resource_email(target, current_user, method)
previous_assignee_id = previous_record(target, 'assignee_id')
previous_assignee = User.find_by(id: previous_assignee_id) if previous_assignee_id
recipients = NotificationRecipientService.build_recipients(
target,
current_user,
action: "reassign",
previous_assignee: previous_assignee
)
recipients.each do |recipient|
mailer.send(
method,
recipient.user.id,
target.id,
previous_assignee_id,
current_user.id,
recipient.reason
).deliver_later
end
end
def relabeled_resource_email(target, labels, current_user, method)
recipients = labels.flat_map { |l| l.subscribers(target.project) }.uniq
recipients = notifiable_users(
......@@ -471,14 +488,6 @@ class NotificationService
Notify
end
def previous_record(object, attribute)
return unless object && attribute
if object.previous_changes.include?(attribute)
object.previous_changes[attribute].first
end
end
private
def recipients_for_pages_domain(domain)
......
......@@ -41,6 +41,7 @@
- github_importer:github_import_stage_import_repository
- mail_scheduler:mail_scheduler_issue_due
- mail_scheduler:mail_scheduler_notification_service
- object_storage_upload
- object_storage:object_storage_background_move
......
......@@ -4,4 +4,8 @@ module MailSchedulerQueue
included do
queue_namespace :mail_scheduler
end
def notification_service
@notification_service ||= NotificationService.new
end
end
......@@ -4,8 +4,6 @@ module MailScheduler
include MailSchedulerQueue
def perform(project_id)
notification_service = NotificationService.new
Issue.opened.due_tomorrow.in_projects(project_id).preload(:project).find_each do |issue|
notification_service.issue_due(issue)
end
......
require 'active_job/arguments'
module MailScheduler
class NotificationServiceWorker
include ApplicationWorker
include MailSchedulerQueue
def perform(meth, *args)
deserialized_args = ActiveJob::Arguments.deserialize(args)
notification_service.public_send(meth, *deserialized_args) # rubocop:disable GitlabSecurity/PublicSend
rescue ActiveJob::DeserializationError
end
def self.perform_async(*args)
super(*ActiveJob::Arguments.serialize(args))
end
end
end
---
title: Compute notification recipients in background jobs
merge_request:
author:
type: performance
......@@ -96,6 +96,37 @@ describe NotificationService, :mailer do
it_should_behave_like 'participating by assignee notification'
end
describe '#async' do
let(:async) { notification.async }
set(:key) { create(:personal_key) }
it 'returns an Async object with the correct parent' do
expect(async).to be_a(described_class::Async)
expect(async.parent).to eq(notification)
end
context 'when receiving a public method' do
it 'schedules a MailScheduler::NotificationServiceWorker' do
expect(MailScheduler::NotificationServiceWorker)
.to receive(:perform_async).with('new_key', key)
async.new_key(key)
end
end
context 'when receiving a private method' do
it 'raises NoMethodError' do
expect { async.notifiable?(key) }.to raise_error(NoMethodError)
end
end
context 'when recieving a non-existent method' do
it 'raises NoMethodError' do
expect { async.foo(key) }.to raise_error(NoMethodError)
end
end
end
describe 'Keys' do
describe '#new_key' do
let(:key_options) { {} }
......@@ -982,6 +1013,8 @@ describe NotificationService, :mailer do
let(:merge_request) { create :merge_request, source_project: project, assignee: create(:user), description: 'cc @participant' }
before do
project.add_master(merge_request.author)
project.add_master(merge_request.assignee)
build_team(merge_request.target_project)
add_users_with_subscription(merge_request.target_project, merge_request)
update_custom_notification(:new_merge_request, @u_guest_custom, resource: project)
......@@ -1093,15 +1126,18 @@ describe NotificationService, :mailer do
end
describe '#reassigned_merge_request' do
let(:current_user) { create(:user) }
before do
update_custom_notification(:reassign_merge_request, @u_guest_custom, resource: project)
update_custom_notification(:reassign_merge_request, @u_custom_global)
end
it do
notification.reassigned_merge_request(merge_request, merge_request.author)
notification.reassigned_merge_request(merge_request, current_user, merge_request.author)
should_email(merge_request.assignee)
should_email(merge_request.author)
should_email(@u_watcher)
should_email(@u_participant_mentioned)
should_email(@subscriber)
......@@ -1116,7 +1152,7 @@ describe NotificationService, :mailer do
end
it 'adds "assigned" reason for new assignee' do
notification.reassigned_merge_request(merge_request, merge_request.author)
notification.reassigned_merge_request(merge_request, current_user, merge_request.author)
email = find_email_for(merge_request.assignee)
......@@ -1126,7 +1162,7 @@ describe NotificationService, :mailer do
it_behaves_like 'participating notifications' do
let(:participant) { create(:user, username: 'user-participant') }
let(:issuable) { merge_request }
let(:notification_trigger) { notification.reassigned_merge_request(merge_request, @u_disabled) }
let(:notification_trigger) { notification.reassigned_merge_request(merge_request, current_user, merge_request.author) }
end
end
......
......@@ -12,8 +12,8 @@ describe MailScheduler::IssueDueWorker do
create(:issue, :opened, project: project, due_date: 2.days.from_now) # due on another day
create(:issue, :opened, due_date: Date.tomorrow) # different project
expect_any_instance_of(NotificationService).to receive(:issue_due).with(issue1)
expect_any_instance_of(NotificationService).to receive(:issue_due).with(issue2)
expect(worker.notification_service).to receive(:issue_due).with(issue1)
expect(worker.notification_service).to receive(:issue_due).with(issue2)
worker.perform(project.id)
end
......
require 'spec_helper'
describe MailScheduler::NotificationServiceWorker do
let(:worker) { described_class.new }
let(:method) { 'new_key' }
set(:key) { create(:personal_key) }
def serialize(*args)
ActiveJob::Arguments.serialize(args)
end
describe '#perform' do
it 'deserializes arguments from global IDs' do
expect(worker.notification_service).to receive(method).with(key)
worker.perform(method, *serialize(key))
end
context 'when the arguments cannot be deserialized' do
it 'does nothing' do
expect(worker.notification_service).not_to receive(method)
worker.perform(method, key.to_global_id.to_s.succ)
end
end
context 'when the method is not a public method' do
it 'raises NoMethodError' do
expect { worker.perform('notifiable?', *serialize(key)) }.to raise_error(NoMethodError)
end
end
end
describe '.perform_async' do
it 'serializes arguments as global IDs when scheduling' do
Sidekiq::Testing.fake! do
described_class.perform_async(method, key)
expect(described_class.jobs.count).to eq(1)
expect(described_class.jobs.first).to include('args' => [method, *serialize(key)])
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment