Commit cb6740ab authored by Bob Van Landuyt

Merge branch '345922_limit_pull_requests_importer' into 'master'

Add rate limiter for PullRequest importer

See merge request gitlab-org/gitlab!81026
parents 90c7f9f1 d5a7d219
---
# Feature flag definition for the GitHub importer's batched ("spread")
# parallel import mode. When enabled, importers that define a
# parallel_import_batch schedule their Sidekiq jobs in spaced batches
# instead of enqueuing everything at once (rate limiting).
name: spread_parallel_import
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/81026
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/353217
milestone: '14.9'
type: development
group: group::source code
# Disabled by default; rollout is tracked in the issue above.
default_enabled: false
......@@ -74,6 +74,10 @@ module Gitlab
{ state: 'all', sort: 'created', direction: 'asc' }
end
# Batch settings used by ParallelScheduling#spread_parallel_import when
# importing pull requests: 200 jobs per batch, batches one minute apart.
def parallel_import_batch
  batch_size  = 200
  batch_delay = 1.minute

  { size: batch_size, delay: batch_delay }
end
def repository_updates_counter
@repository_updates_counter ||= Gitlab::Metrics.counter(
:github_importer_repository_updates,
......
......@@ -72,6 +72,14 @@ module Gitlab
# Imports all objects in parallel by scheduling a Sidekiq job for every
# individual object.
#
# When the :spread_parallel_import feature flag is enabled AND the
# importer defines non-empty batch settings, jobs are scheduled in
# spaced batches; otherwise we fall back to the deprecated one-job-at-a-
# time scheduling.
def parallel_import
  spread = Feature.enabled?(:spread_parallel_import, default_enabled: :yaml) &&
    parallel_import_batch.present?

  spread ? spread_parallel_import : parallel_import_deprecated
end
def parallel_import_deprecated
waiter = JobWaiter.new
each_object_to_import do |object|
......@@ -86,6 +94,33 @@ module Gitlab
waiter
end
# Schedules one Sidekiq job per object, but in batches spread out over
# time (batch size/delay come from #parallel_import_batch) so the queue
# is not flooded all at once.
#
# Returns the Gitlab::JobWaiter callers use to wait for the jobs.
def spread_parallel_import
  waiter = JobWaiter.new

  job_arguments = []

  each_object_to_import do |object|
    representation = representation_class.from_api_response(object)

    job_arguments << [project.id, representation.to_hash, waiter.key]
    waiter.jobs_remaining += 1
  end

  # rubocop:disable Scalability/BulkPerformWithContext
  Gitlab::ApplicationContext.with_context(project: project) do
    sidekiq_worker_class.bulk_perform_in(
      1.second,
      job_arguments,
      batch_size: parallel_import_batch[:size],
      batch_delay: parallel_import_batch[:delay]
    )
  end
  # rubocop:enable Scalability/BulkPerformWithContext

  waiter
end
# The method that will be called for traversing through all the objects to
# import, yielding them to the supplied block.
def each_object_to_import
......@@ -171,6 +206,12 @@ module Gitlab
raise NotImplementedError
end
# Default batch settings for parallel import (can be redefined in Importer classes)
# Example: { size: 100, delay: 1.minute }
#
# An empty hash means "no batching": #parallel_import checks
# parallel_import_batch.present? and falls back to the deprecated
# per-object scheduling when this is blank.
def parallel_import_batch
  {}
end
# Default: a failure while importing one object does not abort the run.
# NOTE(review): presumably redefined by importer classes that need
# fail-fast behaviour, mirroring parallel_import_batch above — confirm.
def abort_on_failure
  false
end
......
......@@ -104,8 +104,13 @@ RSpec.describe Gitlab::GithubImport::Importer::PullRequestsImporter do
.and_yield(pull_request)
expect(Gitlab::GithubImport::ImportPullRequestWorker)
.to receive(:perform_async)
.with(project.id, an_instance_of(Hash), an_instance_of(String))
.to receive(:bulk_perform_in)
.with(
1.second,
[[project.id, an_instance_of(Hash), an_instance_of(String)]],
batch_delay: 1.minute,
batch_size: 200
)
waiter = importer.parallel_import
......
......@@ -22,6 +22,10 @@ RSpec.describe Gitlab::GithubImport::ParallelScheduling do
# Collection the stubbed importer class fetches in these specs.
def collection_method
  :issues
end
# Batch settings exercised by the #parallel_import examples below.
def parallel_import_batch
  { size: 10, delay: 1.minute }
end
end
end
......@@ -254,35 +258,61 @@ RSpec.describe Gitlab::GithubImport::ParallelScheduling do
describe '#parallel_import' do
let(:importer) { importer_class.new(project, client) }
let(:repr_class) { double(:representation) }
let(:worker_class) { double(:worker) }
let(:object) { double(:object) }
let(:batch_size) { 200 }
let(:batch_delay) { 1.minute }
it 'imports data in parallel' do
repr_class = double(:representation)
worker_class = double(:worker)
object = double(:object)
expect(importer)
.to receive(:each_object_to_import)
.and_yield(object)
expect(importer)
before do
allow(importer)
.to receive(:representation_class)
.and_return(repr_class)
expect(importer)
allow(importer)
.to receive(:sidekiq_worker_class)
.and_return(worker_class)
expect(repr_class)
allow(repr_class)
.to receive(:from_api_response)
.with(object)
.and_return({ title: 'Foo' })
end
context 'with multiple objects' do
before do
allow(importer).to receive(:parallel_import_batch) { { size: batch_size, delay: batch_delay } }
expect(importer).to receive(:each_object_to_import).and_yield(object).and_yield(object).and_yield(object)
end
expect(worker_class)
.to receive(:perform_async)
.with(project.id, { title: 'Foo' }, an_instance_of(String))
it 'imports data in parallel batches with delays' do
expect(worker_class).to receive(:bulk_perform_in).with(1.second, [
[project.id, { title: 'Foo' }, an_instance_of(String)],
[project.id, { title: 'Foo' }, an_instance_of(String)],
[project.id, { title: 'Foo' }, an_instance_of(String)]
], batch_size: batch_size, batch_delay: batch_delay)
importer.parallel_import
end
end
expect(importer.parallel_import)
.to be_an_instance_of(Gitlab::JobWaiter)
context 'when FF is disabled' do
  before do
    stub_feature_flags(spread_parallel_import: false)
  end

  # With :spread_parallel_import off, the importer uses the deprecated
  # path: one perform_async call per yielded object, still returning a
  # JobWaiter.
  it 'imports data in parallel' do
    expect(importer)
      .to receive(:each_object_to_import)
      .and_yield(object)

    expect(worker_class)
      .to receive(:perform_async)
      .with(project.id, { title: 'Foo' }, an_instance_of(String))

    expect(importer.parallel_import)
      .to be_an_instance_of(Gitlab::JobWaiter)
  end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment