Commit 163ad707 authored by Arturo Herrero's avatar Arturo Herrero

Propagate integrations using batching and queues

This is an important change in the architecture to propagate
integrations. We can now propagate instance-level integrations and
templates using batching and Sidekiq queues.

The problem before is the performance of the worst-case scenario, where
if there are no matching records and the anti-join. With the new
approach, each job in the new queues handles a batch of projects/groups;
rather than having a single job for all of them.

This is what we do right now in the complex case of propagating an
instance-level integration:
- Update inherited integrations,
- Create integration for all projects without integration.
- Create integration for all groups without integration.

BEFORE:

                 Save integration
                        ↓
            ┌┬───────────────────────┬┐
            │| Propagate integration |│
            └┴───────────────────────┴┘
                        ↓
           Update inherited integrations
  Create integration for all projects without integration
  Create integration for all groups without integration

AFTER:

                 Save integration
                        ↓
            ┌┬───────────────────────┬┐
            │| Propagate integration |│
            └┴───────────────────────┴┘
               ↓                   ↓
  ┌┬─────────────────────┬┐ ┌┬───────────────────────┬┐
  │| Propagate to groups |│ │| Propagate to projects |│
  └┴─────────────────────┴┘ └┴───────────────────────┴┘
            Update inherited integrations
parent 7d27bb49
......@@ -16,7 +16,7 @@ module Integration
Project.where(id: custom_integration_project_ids)
end
def ids_without_integration(integration, limit)
def without_integration(integration)
services = Service
.select('1')
.where('services.project_id = projects.id')
......@@ -26,8 +26,6 @@ module Integration
.where('NOT EXISTS (?)', services)
.where(pending_delete: false)
.where(archived: false)
.limit(limit)
.pluck(:id)
end
end
end
......@@ -15,6 +15,7 @@ class Group < Namespace
include WithUploads
include Gitlab::Utils::StrongMemoize
include GroupAPICompatibility
include EachBatch
ACCESS_REQUEST_APPROVERS_TO_BE_NOTIFIED_LIMIT = 10
......@@ -140,6 +141,15 @@ class Group < Namespace
end
end
def without_integration(integration)
services = Service
.select('1')
.where('services.group_id = namespaces.id')
.where(type: integration.type)
where('NOT EXISTS (?)', services)
end
private
def public_to_user_arel(user)
......
......@@ -33,6 +33,7 @@ class Project < ApplicationRecord
include FromUnion
include IgnorableColumns
include Integration
include EachBatch
extend Gitlab::Cache::RequestCache
extend Gitlab::ConfigHelper
......
......@@ -5,10 +5,9 @@ module Admin
include PropagateService
def propagate
update_inherited_integrations
create_integration_for_groups_without_integration if Feature.enabled?(:group_level_integrations)
create_integration_for_projects_without_integration
update_inherited_integrations
end
private
......@@ -33,7 +32,7 @@ module Admin
Service.transaction do
batch.update_all(service_hash)
if data_fields_present?
if integration.data_fields_present?
integration.data_fields.class.where(service_id: batch_ids).update_all(data_fields_hash)
end
end
......@@ -41,32 +40,18 @@ module Admin
# rubocop: enable CodeReuse/ActiveRecord
def create_integration_for_groups_without_integration
loop do
batch = Group.uncached { group_ids_without_integration(integration, BATCH_SIZE) }
bulk_create_from_integration(batch, 'group') unless batch.empty?
break if batch.size < BATCH_SIZE
Group.without_integration(integration).each_batch(of: BATCH_SIZE) do |groups|
min_id, max_id = groups.pick("MIN(namespaces.id), MAX(namespaces.id)")
PropagateIntegrationGroupWorker.perform_async(integration.id, min_id, max_id)
end
end
def service_hash
@service_hash ||= integration.to_service_hash
.tap { |json| json['inherit_from_id'] = integration.id }
integration.to_service_hash.tap { |json| json['inherit_from_id'] = integration.id }
end
# rubocop:disable CodeReuse/ActiveRecord
def group_ids_without_integration(integration, limit)
services = Service
.select('1')
.where('services.group_id = namespaces.id')
.where(type: integration.type)
Group
.where('NOT EXISTS (?)', services)
.limit(limit)
.pluck(:id)
def data_fields_hash
integration.to_data_fields_hash
end
# rubocop:enable CodeReuse/ActiveRecord
end
end
......@@ -9,11 +9,5 @@ module Admin
create_integration_for_projects_without_integration
end
private
def service_hash
@service_hash ||= integration.to_service_hash
end
end
end
# frozen_string_literal: true
class BulkCreateIntegrationService
def initialize(integration, batch_ids, association)
@integration = integration
@batch_ids = batch_ids
@association = association
end
def execute
service_list = ServiceList.new(batch_ids, service_hash, association).to_array
Service.transaction do
results = bulk_insert(*service_list)
if integration.data_fields_present?
data_list = DataList.new(results, data_fields_hash, integration.data_fields.class).to_array
bulk_insert(*data_list)
end
run_callbacks(batch_ids) if association == 'project'
end
end
private
attr_reader :integration, :batch_ids, :association
def bulk_insert(klass, columns, values_array)
items_to_insert = values_array.map { |array| Hash[columns.zip(array)] }
klass.insert_all(items_to_insert, returning: [:id])
end
# rubocop: disable CodeReuse/ActiveRecord
def run_callbacks(batch_ids)
if integration.issue_tracker?
Project.where(id: batch_ids).update_all(has_external_issue_tracker: true)
end
if integration.type == 'ExternalWikiService'
Project.where(id: batch_ids).update_all(has_external_wiki: true)
end
end
# rubocop: enable CodeReuse/ActiveRecord
def service_hash
if integration.template?
integration.to_service_hash
else
integration.to_service_hash.tap { |json| json['inherit_from_id'] = integration.id }
end
end
def data_fields_hash
integration.to_data_fields_hash
end
end
......@@ -6,8 +6,6 @@ module Admin
BATCH_SIZE = 100
delegate :data_fields_present?, to: :integration
class_methods do
def propagate(integration)
new(integration).propagate
......@@ -23,51 +21,10 @@ module Admin
attr_reader :integration
def create_integration_for_projects_without_integration
loop do
batch_ids = Project.uncached { Project.ids_without_integration(integration, BATCH_SIZE) }
bulk_create_from_integration(batch_ids, 'project') unless batch_ids.empty?
break if batch_ids.size < BATCH_SIZE
end
end
def bulk_create_from_integration(batch_ids, association)
service_list = ServiceList.new(batch_ids, service_hash, association).to_array
Service.transaction do
results = bulk_insert(*service_list)
if data_fields_present?
data_list = DataList.new(results, data_fields_hash, integration.data_fields.class).to_array
bulk_insert(*data_list)
end
run_callbacks(batch_ids) if association == 'project'
Project.without_integration(integration).each_batch(of: BATCH_SIZE) do |projects|
min_id, max_id = projects.pick("MIN(projects.id), MAX(projects.id)")
PropagateIntegrationProjectWorker.perform_async(integration.id, min_id, max_id)
end
end
def bulk_insert(klass, columns, values_array)
items_to_insert = values_array.map { |array| Hash[columns.zip(array)] }
klass.insert_all(items_to_insert, returning: [:id])
end
# rubocop: disable CodeReuse/ActiveRecord
def run_callbacks(batch_ids)
if integration.issue_tracker?
Project.where(id: batch_ids).update_all(has_external_issue_tracker: true)
end
if integration.type == 'ExternalWikiService'
Project.where(id: batch_ids).update_all(has_external_wiki: true)
end
end
# rubocop: enable CodeReuse/ActiveRecord
def data_fields_hash
@data_fields_hash ||= integration.to_data_fields_hash
end
end
end
......@@ -1716,6 +1716,22 @@
:weight: 1
:idempotent: true
:tags: []
- :name: propagate_integration_group
:feature_category: :integrations
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: propagate_integration_project
:feature_category: :integrations
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: propagate_service_template
:feature_category: :integrations
:has_external_dependencies:
......
# frozen_string_literal: true
class PropagateIntegrationGroupWorker
include ApplicationWorker
feature_category :integrations
idempotent!
loggable_arguments 1
# rubocop: disable CodeReuse/ActiveRecord
def perform(integration_id, min_id, max_id)
integration = Service.find(integration_id)
batch_ids = Group.where(id: min_id..max_id).without_integration(integration).pluck(:id)
BulkCreateIntegrationService.new(integration, batch_ids, 'group').execute
end
# rubocop: enable CodeReuse/ActiveRecord
end
# frozen_string_literal: true
class PropagateIntegrationProjectWorker
include ApplicationWorker
feature_category :integrations
idempotent!
loggable_arguments 1
# rubocop: disable CodeReuse/ActiveRecord
def perform(integration_id, min_id, max_id)
integration = Service.find(integration_id)
batch_ids = Project.where(id: min_id..max_id).without_integration(integration).pluck(:id)
BulkCreateIntegrationService.new(integration, batch_ids, 'project').execute
end
# rubocop: enable CodeReuse/ActiveRecord
end
......@@ -232,6 +232,10 @@
- 1
- - propagate_integration
- 1
- - propagate_integration_group
- 1
- - propagate_integration_project
- 1
- - propagate_service_template
- 1
- - reactive_caching
......
......@@ -224,6 +224,20 @@ RSpec.describe Group do
end
end
describe '.without_integration' do
let(:another_group) { create(:group) }
let(:instance_integration) { build(:jira_service, :instance) }
before do
create(:jira_service, group: group, project: nil)
create(:slack_service, group: another_group, project: nil)
end
it 'returns groups without integration' do
expect(Group.without_integration(instance_integration)).to contain_exactly(another_group)
end
end
describe '.public_or_visible_to_user' do
let!(:private_group) { create(:group, :private) }
let!(:internal_group) { create(:group, :internal) }
......
......@@ -11,18 +11,18 @@ RSpec.describe Integration do
before do
create(:jira_service, project: project_1, inherit_from_id: instance_integration.id)
create(:jira_service, project: project_2, inherit_from_id: nil)
create(:slack_service, project: project_1, inherit_from_id: nil)
create(:slack_service, project: project_3, inherit_from_id: nil)
end
describe '#with_custom_integration_for' do
describe '.with_custom_integration_for' do
it 'returns projects with custom integrations' do
expect(Project.with_custom_integration_for(instance_integration)).to contain_exactly(project_2)
end
end
describe '#ids_without_integration' do
it 'returns projects ids without an integration' do
expect(Project.ids_without_integration(instance_integration, 100)).to contain_exactly(project_3.id)
describe '.without_integration' do
it 'returns projects without integration' do
expect(Project.without_integration(instance_integration)).to contain_exactly(project_3)
end
end
end
......@@ -10,9 +10,8 @@ RSpec.describe Admin::PropagateIntegrationService do
stub_jira_service_test
end
let(:excluded_attributes) { %w[id project_id group_id inherit_from_id instance created_at updated_at default] }
let!(:project) { create(:project) }
let!(:group) { create(:group) }
let_it_be(:project) { create(:project) }
let(:excluded_attributes) { %w[id project_id group_id inherit_from_id instance template created_at updated_at] }
let!(:instance_integration) do
JiraService.create!(
instance: true,
......@@ -39,7 +38,7 @@ RSpec.describe Admin::PropagateIntegrationService do
let!(:not_inherited_integration) do
JiraService.create!(
project: create(:project),
project: project,
inherit_from_id: nil,
instance: false,
active: true,
......@@ -52,7 +51,7 @@ RSpec.describe Admin::PropagateIntegrationService do
let!(:different_type_inherited_integration) do
BambooService.create!(
project: create(:project),
project: project,
inherit_from_id: instance_integration.id,
instance: false,
active: true,
......@@ -64,8 +63,10 @@ RSpec.describe Admin::PropagateIntegrationService do
)
end
shared_examples 'inherits settings from integration' do
it 'updates the inherited integrations' do
context 'with inherited integration' do
let(:integration) { inherited_integration }
it 'updates the integration' do
described_class.propagate(instance_integration)
expect(integration.reload.inherit_from_id).to eq(instance_integration.id)
......@@ -73,10 +74,10 @@ RSpec.describe Admin::PropagateIntegrationService do
.to eq(instance_integration.attributes.except(*excluded_attributes))
end
context 'integration with data fields' do
context 'with integration with data fields' do
let(:excluded_attributes) { %w[id service_id created_at updated_at] }
it 'updates the data fields from inherited integrations' do
it 'updates the data fields from the integration' do
described_class.propagate(instance_integration)
expect(integration.reload.data_fields.attributes.except(*excluded_attributes))
......@@ -85,54 +86,44 @@ RSpec.describe Admin::PropagateIntegrationService do
end
end
shared_examples 'does not inherit settings from integration' do
it 'does not update the not inherited integrations' do
described_class.propagate(instance_integration)
context 'with not inherited integration' do
let(:integration) { not_inherited_integration }
expect(integration.reload.attributes.except(*excluded_attributes))
.not_to eq(instance_integration.attributes.except(*excluded_attributes))
it 'does not update the integration' do
expect { described_class.propagate(instance_integration) }
.not_to change { instance_integration.attributes.except(*excluded_attributes) }
end
end
context 'update only inherited integrations' do
it_behaves_like 'inherits settings from integration' do
let(:integration) { inherited_integration }
end
context 'with different type inherited integration' do
let(:integration) { different_type_inherited_integration }
it_behaves_like 'does not inherit settings from integration' do
let(:integration) { not_inherited_integration }
it 'does not update the integration' do
expect { described_class.propagate(instance_integration) }
.not_to change { instance_integration.attributes.except(*excluded_attributes) }
end
end
it_behaves_like 'does not inherit settings from integration' do
let(:integration) { different_type_inherited_integration }
end
context 'with a project without integration' do
let!(:another_project) { create(:project) }
it_behaves_like 'inherits settings from integration' do
let(:integration) { project.jira_service }
end
it 'calls to PropagateIntegrationProjectWorker' do
expect(PropagateIntegrationProjectWorker).to receive(:perform_async)
.with(instance_integration.id, another_project.id, another_project.id)
it_behaves_like 'inherits settings from integration' do
let(:integration) { Service.find_by(group_id: group.id) }
described_class.propagate(instance_integration)
end
end
it 'updates project#has_external_issue_tracker for issue tracker services' do
described_class.propagate(instance_integration)
context 'with a group without integration' do
let!(:group) { create(:group) }
expect(project.reload.has_external_issue_tracker).to eq(true)
end
it 'calls to PropagateIntegrationProjectWorker' do
expect(PropagateIntegrationGroupWorker).to receive(:perform_async)
.with(instance_integration.id, group.id, group.id)
it 'updates project#has_external_wiki for external wiki services' do
instance_integration = ExternalWikiService.create!(
instance: true,
active: true,
push_events: false,
external_wiki_url: 'http://external-wiki-url.com'
)
described_class.propagate(instance_integration)
expect(project.reload.has_external_wiki).to eq(true)
described_class.propagate(instance_integration)
end
end
end
end
......@@ -4,6 +4,7 @@ require 'spec_helper'
RSpec.describe Admin::PropagateServiceTemplate do
describe '.propagate' do
let_it_be(:project) { create(:project) }
let!(:service_template) do
PushoverService.create!(
template: true,
......@@ -19,124 +20,40 @@ RSpec.describe Admin::PropagateServiceTemplate do
)
end
let!(:project) { create(:project) }
let(:excluded_attributes) { %w[id project_id template created_at updated_at default] }
it 'creates services for projects' do
expect(project.pushover_service).to be_nil
described_class.propagate(service_template)
expect(project.reload.pushover_service).to be_present
end
it 'creates services for a project that has another service' do
BambooService.create!(
active: true,
project: project,
properties: {
bamboo_url: 'http://gitlab.com',
username: 'mic',
password: 'password',
build_key: 'build'
}
)
expect(project.pushover_service).to be_nil
it 'calls to PropagateIntegrationProjectWorker' do
expect(PropagateIntegrationProjectWorker).to receive(:perform_async)
.with(service_template.id, project.id, project.id)
described_class.propagate(service_template)
expect(project.reload.pushover_service).to be_present
end
it 'does not create the service if it exists already' do
other_service = BambooService.create!(
template: true,
active: true,
properties: {
bamboo_url: 'http://gitlab.com',
username: 'mic',
password: 'password',
build_key: 'build'
}
)
Service.build_from_integration(service_template, project_id: project.id).save!
Service.build_from_integration(other_service, project_id: project.id).save!
expect { described_class.propagate(service_template) }
.not_to change { Service.count }
end
it 'creates the service containing the template attributes' do
described_class.propagate(service_template)
expect(project.pushover_service.properties).to eq(service_template.properties)
expect(project.pushover_service.attributes.except(*excluded_attributes))
.to eq(service_template.attributes.except(*excluded_attributes))
end
context 'service with data fields' do
include JiraServiceHelper
let(:service_template) do
stub_jira_service_test
JiraService.create!(
template: true,
context 'with a project that has another service' do
before do
BambooService.create!(
active: true,
push_events: false,
url: 'http://jira.instance.com',
username: 'user',
password: 'secret'
project: project,
properties: {
bamboo_url: 'http://gitlab.com',
username: 'mic',
password: 'password',
build_key: 'build'
}
)
end
it 'creates the service containing the template attributes' do
described_class.propagate(service_template)
expect(project.jira_service.attributes.except(*excluded_attributes))
.to eq(service_template.attributes.except(*excluded_attributes))
excluded_attributes = %w[id service_id created_at updated_at]
expect(project.jira_service.data_fields.attributes.except(*excluded_attributes))
.to eq(service_template.data_fields.attributes.except(*excluded_attributes))
end
end
describe 'bulk update', :use_sql_query_cache do
let(:project_total) { 5 }
before do
stub_const('Admin::PropagateServiceTemplate::BATCH_SIZE', 3)
project_total.times { create(:project) }
it 'calls to PropagateIntegrationProjectWorker' do
expect(PropagateIntegrationProjectWorker).to receive(:perform_async)
.with(service_template.id, project.id, project.id)
described_class.propagate(service_template)
end
it 'creates services for all projects' do
expect(Service.all.reload.count).to eq(project_total + 2)
end
end
describe 'external tracker' do
it 'updates the project external tracker' do
service_template.update!(category: 'issue_tracker')
expect { described_class.propagate(service_template) }
.to change { project.reload.has_external_issue_tracker }.to(true)
end
end
describe 'external wiki' do
it 'updates the project external tracker' do
service_template.update!(type: 'ExternalWikiService')
it 'does not create the service if it exists already' do
Service.build_from_integration(service_template, project_id: project.id).save!
expect { described_class.propagate(service_template) }
.to change { project.reload.has_external_wiki }.to(true)
end
expect { described_class.propagate(service_template) }
.not_to change { Service.count }
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkCreateIntegrationService do
include JiraServiceHelper
before do
stub_jira_service_test
end
let(:excluded_attributes) { %w[id project_id group_id inherit_from_id instance template created_at updated_at] }
let!(:instance_integration) { create(:jira_service, :instance) }
let!(:template_integration) { create(:jira_service, :template) }
shared_examples 'creates integration from batch ids' do
it 'updates the inherited integrations' do
described_class.new(integration, batch_ids, association).execute
expect(created_integration.attributes.except(*excluded_attributes))
.to eq(integration.attributes.except(*excluded_attributes))
end
context 'integration with data fields' do
let(:excluded_attributes) { %w[id service_id created_at updated_at] }
it 'updates the data fields from inherited integrations' do
described_class.new(integration, batch_ids, association).execute
expect(created_integration.reload.data_fields.attributes.except(*excluded_attributes))
.to eq(integration.data_fields.attributes.except(*excluded_attributes))
end
end
end
shared_examples 'updates inherit_from_id' do
it 'updates inherit_from_id attributes' do
described_class.new(integration, batch_ids, association).execute
expect(created_integration.reload.inherit_from_id).to eq(integration.id)
end
end
shared_examples 'runs project callbacks' do
it 'updates projects#has_external_issue_tracker for issue tracker services' do
described_class.new(integration, batch_ids, association).execute
expect(project.reload.has_external_issue_tracker).to eq(true)
end
context 'with an external wiki integration' do
let(:integration) do
ExternalWikiService.create!(
instance: true,
active: true,
push_events: false,
external_wiki_url: 'http://external-wiki-url.com'
)
end
it 'updates projects#has_external_wiki for external wiki services' do
described_class.new(integration, batch_ids, association).execute
expect(project.reload.has_external_wiki).to eq(true)
end
end
end
context 'with an instance-level integration' do
let(:integration) { instance_integration }
context 'with a project association' do
let!(:project) { create(:project) }
let(:created_integration) { project.jira_service }
let(:batch_ids) { [project.id] }
let(:association) { 'project' }
it_behaves_like 'creates integration from batch ids'
it_behaves_like 'updates inherit_from_id'
it_behaves_like 'runs project callbacks'
end
context 'with a group association' do
let!(:group) { create(:group) }
let(:created_integration) { Service.find_by(group: group) }
let(:batch_ids) { [group.id] }
let(:association) { 'group' }
it_behaves_like 'creates integration from batch ids'
it_behaves_like 'updates inherit_from_id'
end
end
context 'with a template integration' do
let(:integration) { template_integration }
context 'with a project association' do
let!(:project) { create(:project) }
let(:created_integration) { project.jira_service }
let(:batch_ids) { [project.id] }
let(:association) { 'project' }
it_behaves_like 'creates integration from batch ids'
it_behaves_like 'runs project callbacks'
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe PropagateIntegrationGroupWorker do
describe '#perform' do
let!(:group1) { create(:group) }
let!(:group2) { create(:group) }
let!(:integration) { create(:redmine_service, :instance) }
it 'calls to BulkCreateIntegrationService' do
expect(BulkCreateIntegrationService).to receive(:new)
.with(integration, [group1.id, group2.id], 'group')
.and_return(double(execute: nil))
subject.perform(integration.id, group1.id, group2.id)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe PropagateIntegrationProjectWorker do
describe '#perform' do
let!(:project1) { create(:project) }
let!(:project2) { create(:project) }
let!(:integration) { create(:redmine_service, :instance) }
it 'calls to BulkCreateIntegrationService' do
expect(BulkCreateIntegrationService).to receive(:new)
.with(integration, [project1.id, project2.id], 'project')
.and_return(double(execute: nil))
subject.perform(integration.id, project1.id, project2.id)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment