Commit 527e6d26 authored by Kassio Borges's avatar Kassio Borges

BulkImports: Use Pipeline::Context as Pipeline attribute

Reduce the boilerplate of passing the context as parameter to a lot of
internal methods in the Pipeline::Runner.

Related to: https://gitlab.com/gitlab-org/gitlab/-/issues/321291
parent e2309055
...@@ -15,7 +15,7 @@ module EE ...@@ -15,7 +15,7 @@ module EE
loader EE::BulkImports::Groups::Loaders::EpicsLoader loader EE::BulkImports::Groups::Loaders::EpicsLoader
def after_run(context, extracted_data) def after_run(extracted_data)
context.entity.update_tracker_for( context.entity.update_tracker_for(
relation: :epics, relation: :epics,
has_next_page: extracted_data.has_next_page?, has_next_page: extracted_data.has_next_page?,
...@@ -23,7 +23,7 @@ module EE ...@@ -23,7 +23,7 @@ module EE
) )
if extracted_data.has_next_page? if extracted_data.has_next_page?
run(context) run
end end
end end
end end
......
...@@ -20,6 +20,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do ...@@ -20,6 +20,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
let(:context) { BulkImports::Pipeline::Context.new(entity) } let(:context) { BulkImports::Pipeline::Context.new(entity) }
subject { described_class.new(context) }
before do before do
stub_licensed_features(epics: true) stub_licensed_features(epics: true)
group.add_owner(user) group.add_owner(user)
...@@ -36,7 +38,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do ...@@ -36,7 +38,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
.and_return(first_page, last_page) .and_return(first_page, last_page)
end end
expect { subject.run(context) }.to change(::Epic, :count).by(2) expect { subject.run }.to change(::Epic, :count).by(2)
end end
end end
...@@ -45,9 +47,9 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do ...@@ -45,9 +47,9 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
it 'updates tracker information and runs pipeline again' do it 'updates tracker information and runs pipeline again' do
data = extractor_data(has_next_page: true, cursor: cursor) data = extractor_data(has_next_page: true, cursor: cursor)
expect(subject).to receive(:run).with(context) expect(subject).to receive(:run)
subject.after_run(context, data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :epics) tracker = entity.trackers.find_by(relation: :epics)
...@@ -60,9 +62,9 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do ...@@ -60,9 +62,9 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
it 'updates tracker information and does not run pipeline' do it 'updates tracker information and does not run pipeline' do
data = extractor_data(has_next_page: false) data = extractor_data(has_next_page: false)
expect(subject).not_to receive(:run).with(context) expect(subject).not_to receive(:run)
subject.after_run(context, data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :epics) tracker = entity.trackers.find_by(relation: :epics)
......
...@@ -30,8 +30,8 @@ RSpec.describe BulkImports::Importers::GroupImporter do ...@@ -30,8 +30,8 @@ RSpec.describe BulkImports::Importers::GroupImporter do
end end
def expect_to_run_pipeline(klass, context:) def expect_to_run_pipeline(klass, context:)
expect_next_instance_of(klass) do |pipeline| expect_next_instance_of(klass, context) do |pipeline|
expect(pipeline).to receive(:run).with(context) expect(pipeline).to receive(:run)
end end
end end
end end
...@@ -13,7 +13,7 @@ module BulkImports ...@@ -13,7 +13,7 @@ module BulkImports
loader BulkImports::Groups::Loaders::LabelsLoader loader BulkImports::Groups::Loaders::LabelsLoader
def after_run(context, extracted_data) def after_run(extracted_data)
context.entity.update_tracker_for( context.entity.update_tracker_for(
relation: :labels, relation: :labels,
has_next_page: extracted_data.has_next_page?, has_next_page: extracted_data.has_next_page?,
...@@ -21,7 +21,7 @@ module BulkImports ...@@ -21,7 +21,7 @@ module BulkImports
) )
if extracted_data.has_next_page? if extracted_data.has_next_page?
run(context) run
end end
end end
end end
......
...@@ -14,7 +14,7 @@ module BulkImports ...@@ -14,7 +14,7 @@ module BulkImports
loader BulkImports::Groups::Loaders::MembersLoader loader BulkImports::Groups::Loaders::MembersLoader
def after_run(context, extracted_data) def after_run(extracted_data)
context.entity.update_tracker_for( context.entity.update_tracker_for(
relation: :group_members, relation: :group_members,
has_next_page: extracted_data.has_next_page?, has_next_page: extracted_data.has_next_page?,
...@@ -22,7 +22,7 @@ module BulkImports ...@@ -22,7 +22,7 @@ module BulkImports
) )
if extracted_data.has_next_page? if extracted_data.has_next_page?
run(context) run
end end
end end
end end
......
...@@ -10,7 +10,7 @@ module BulkImports ...@@ -10,7 +10,7 @@ module BulkImports
def execute def execute
context = BulkImports::Pipeline::Context.new(entity) context = BulkImports::Pipeline::Context.new(entity)
pipelines.each { |pipeline| pipeline.new.run(context) } pipelines.each { |pipeline| pipeline.new(context).run }
entity.finish! entity.finish!
end end
......
...@@ -4,12 +4,17 @@ module BulkImports ...@@ -4,12 +4,17 @@ module BulkImports
module Pipeline module Pipeline
extend ActiveSupport::Concern extend ActiveSupport::Concern
include Gitlab::ClassAttributes include Gitlab::ClassAttributes
include Runner
included do def initialize(context)
include Runner @context = context
end
included do
private private
attr_reader :context
def extractor def extractor
@extractor ||= instantiate(self.class.get_extractor) @extractor ||= instantiate(self.class.get_extractor)
end end
......
...@@ -7,78 +7,78 @@ module BulkImports ...@@ -7,78 +7,78 @@ module BulkImports
MarkedAsFailedError = Class.new(StandardError) MarkedAsFailedError = Class.new(StandardError)
def run(context) def run
raise MarkedAsFailedError if marked_as_failed?(context) raise MarkedAsFailedError if marked_as_failed?
info(context, message: 'Pipeline started') info(message: 'Pipeline started')
extracted_data = extracted_data_from(context) extracted_data = extracted_data_from
extracted_data&.each do |entry| extracted_data&.each do |entry|
transformers.each do |transformer| transformers.each do |transformer|
entry = run_pipeline_step(:transformer, transformer.class.name, context) do entry = run_pipeline_step(:transformer, transformer.class.name) do
transformer.transform(context, entry) transformer.transform(context, entry)
end end
end end
run_pipeline_step(:loader, loader.class.name, context) do run_pipeline_step(:loader, loader.class.name) do
loader.load(context, entry) loader.load(context, entry)
end end
end end
after_run(context, extracted_data) if respond_to?(:after_run) after_run(extracted_data) if respond_to?(:after_run)
info(context, message: 'Pipeline finished') info(message: 'Pipeline finished')
rescue MarkedAsFailedError rescue MarkedAsFailedError
log_skip(context) log_skip
end end
private # rubocop:disable Lint/UselessAccessModifier private # rubocop:disable Lint/UselessAccessModifier
def run_pipeline_step(step, class_name, context) def run_pipeline_step(step, class_name)
raise MarkedAsFailedError if marked_as_failed?(context) raise MarkedAsFailedError if marked_as_failed?
info(context, pipeline_step: step, step_class: class_name) info(pipeline_step: step, step_class: class_name)
yield yield
rescue MarkedAsFailedError rescue MarkedAsFailedError
log_skip(context, step => class_name) log_skip(step => class_name)
rescue => e rescue => e
log_import_failure(e, step, context) log_import_failure(e, step)
mark_as_failed(context) if abort_on_failure? mark_as_failed if abort_on_failure?
nil nil
end end
def extracted_data_from(context) def extracted_data_from
run_pipeline_step(:extractor, extractor.class.name, context) do run_pipeline_step(:extractor, extractor.class.name) do
extractor.extract(context) extractor.extract(context)
end end
end end
def mark_as_failed(context) def mark_as_failed
warn(context, message: 'Pipeline failed', pipeline_class: pipeline) warn(message: 'Pipeline failed', pipeline_class: pipeline)
context.entity.fail_op! context.entity.fail_op!
end end
def marked_as_failed?(context) def marked_as_failed?
return true if context.entity.failed? return true if context.entity.failed?
false false
end end
def log_skip(context, extra = {}) def log_skip(extra = {})
log = { log = {
message: 'Skipping due to failed pipeline status', message: 'Skipping due to failed pipeline status',
pipeline_class: pipeline pipeline_class: pipeline
}.merge(extra) }.merge(extra)
info(context, log) info(log)
end end
def log_import_failure(exception, step, context) def log_import_failure(exception, step)
attributes = { attributes = {
bulk_import_entity_id: context.entity.id, bulk_import_entity_id: context.entity.id,
pipeline_class: pipeline, pipeline_class: pipeline,
...@@ -91,15 +91,15 @@ module BulkImports ...@@ -91,15 +91,15 @@ module BulkImports
BulkImports::Failure.create(attributes) BulkImports::Failure.create(attributes)
end end
def warn(context, extra = {}) def warn(extra = {})
logger.warn(log_base_params(context).merge(extra)) logger.warn(log_base_params.merge(extra))
end end
def info(context, extra = {}) def info(extra = {})
logger.info(log_base_params(context).merge(extra)) logger.info(log_base_params.merge(extra))
end end
def log_base_params(context) def log_base_params
{ {
bulk_import_entity_id: context.entity.id, bulk_import_entity_id: context.entity.id,
bulk_import_entity_type: context.entity.source_type, bulk_import_entity_type: context.entity.source_type,
......
...@@ -33,6 +33,8 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -33,6 +33,8 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
} }
end end
subject { described_class.new(context) }
before do before do
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor).to receive(:extract).and_return([group_data]) allow(extractor).to receive(:extract).and_return([group_data])
...@@ -44,7 +46,7 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -44,7 +46,7 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
it 'imports new group into destination group' do it 'imports new group into destination group' do
group_path = 'my-destination-group' group_path = 'my-destination-group'
subject.run(context) subject.run
imported_group = Group.find_by_path(group_path) imported_group = Group.find_by_path(group_path)
......
...@@ -18,6 +18,8 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -18,6 +18,8 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
let(:context) { BulkImports::Pipeline::Context.new(entity) } let(:context) { BulkImports::Pipeline::Context.new(entity) }
subject { described_class.new(context) }
def extractor_data(title:, has_next_page:, cursor: nil) def extractor_data(title:, has_next_page:, cursor: nil)
data = [ data = [
{ {
...@@ -46,7 +48,7 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -46,7 +48,7 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
.and_return(first_page, last_page) .and_return(first_page, last_page)
end end
expect { subject.run(context) }.to change(Label, :count).by(2) expect { subject.run }.to change(Label, :count).by(2)
label = group.labels.order(:created_at).last label = group.labels.order(:created_at).last
...@@ -61,9 +63,9 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -61,9 +63,9 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
it 'updates tracker information and runs pipeline again' do it 'updates tracker information and runs pipeline again' do
data = extractor_data(title: 'label', has_next_page: true, cursor: cursor) data = extractor_data(title: 'label', has_next_page: true, cursor: cursor)
expect(subject).to receive(:run).with(context) expect(subject).to receive(:run)
subject.after_run(context, data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :labels) tracker = entity.trackers.find_by(relation: :labels)
...@@ -76,9 +78,9 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -76,9 +78,9 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
it 'updates tracker information and does not run pipeline' do it 'updates tracker information and does not run pipeline' do
data = extractor_data(title: 'label', has_next_page: false) data = extractor_data(title: 'label', has_next_page: false)
expect(subject).not_to receive(:run).with(context) expect(subject).not_to receive(:run)
subject.after_run(context, data) subject.after_run(data)
tracker = entity.trackers.find_by(relation: :labels) tracker = entity.trackers.find_by(relation: :labels)
......
...@@ -13,6 +13,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do ...@@ -13,6 +13,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) } let_it_be(:context) { BulkImports::Pipeline::Context.new(entity) }
subject { described_class.new(context) }
describe '#run' do describe '#run' do
it 'maps existing users to the imported group' do it 'maps existing users to the imported group' do
first_page = member_data(email: member_user1.email, has_next_page: true, cursor: cursor) first_page = member_data(email: member_user1.email, has_next_page: true, cursor: cursor)
...@@ -24,7 +26,7 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do ...@@ -24,7 +26,7 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do
.and_return(first_page, last_page) .and_return(first_page, last_page)
end end
expect { subject.run(context) }.to change(GroupMember, :count).by(2) expect { subject.run }.to change(GroupMember, :count).by(2)
members = group.members.map { |m| m.slice(:user_id, :access_level) } members = group.members.map { |m| m.slice(:user_id, :access_level) }
......
...@@ -25,7 +25,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do ...@@ -25,7 +25,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do
] ]
end end
subject { described_class.new } subject { described_class.new(context) }
before do before do
allow_next_instance_of(BulkImports::Groups::Extractors::SubgroupsExtractor) do |extractor| allow_next_instance_of(BulkImports::Groups::Extractors::SubgroupsExtractor) do |extractor|
...@@ -36,7 +36,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do ...@@ -36,7 +36,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do
end end
it 'creates entities for the subgroups' do it 'creates entities for the subgroups' do
expect { subject.run(context) }.to change(BulkImports::Entity, :count).by(1) expect { subject.run }.to change(BulkImports::Entity, :count).by(1)
subgroup_entity = BulkImports::Entity.last subgroup_entity = BulkImports::Entity.last
......
...@@ -42,8 +42,8 @@ RSpec.describe BulkImports::Importers::GroupImporter do ...@@ -42,8 +42,8 @@ RSpec.describe BulkImports::Importers::GroupImporter do
end end
def expect_to_run_pipeline(klass, context:) def expect_to_run_pipeline(klass, context:)
expect_next_instance_of(klass) do |pipeline| expect_next_instance_of(klass, context) do |pipeline|
expect(pipeline).to receive(:run).with(context) expect(pipeline).to receive(:run)
end end
end end
end end
...@@ -112,7 +112,7 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -112,7 +112,7 @@ RSpec.describe BulkImports::Pipeline::Runner do
) )
end end
BulkImports::MyPipeline.new.run(context) BulkImports::MyPipeline.new(context).run
end end
context 'when exception is raised' do context 'when exception is raised' do
...@@ -126,7 +126,7 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -126,7 +126,7 @@ RSpec.describe BulkImports::Pipeline::Runner do
end end
it 'logs import failure' do it 'logs import failure' do
BulkImports::MyPipeline.new.run(context) BulkImports::MyPipeline.new(context).run
failure = entity.failures.first failure = entity.failures.first
...@@ -143,7 +143,7 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -143,7 +143,7 @@ RSpec.describe BulkImports::Pipeline::Runner do
end end
it 'marks entity as failed' do it 'marks entity as failed' do
BulkImports::MyPipeline.new.run(context) BulkImports::MyPipeline.new(context).run
expect(entity.failed?).to eq(true) expect(entity.failed?).to eq(true)
end end
...@@ -159,13 +159,13 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -159,13 +159,13 @@ RSpec.describe BulkImports::Pipeline::Runner do
) )
end end
BulkImports::MyPipeline.new.run(context) BulkImports::MyPipeline.new(context).run
end end
end end
context 'when pipeline is not marked to abort on failure' do context 'when pipeline is not marked to abort on failure' do
it 'marks entity as failed' do it 'marks entity as failed' do
BulkImports::MyPipeline.new.run(context) BulkImports::MyPipeline.new(context).run
expect(entity.failed?).to eq(false) expect(entity.failed?).to eq(false)
end end
...@@ -190,7 +190,7 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -190,7 +190,7 @@ RSpec.describe BulkImports::Pipeline::Runner do
) )
end end
BulkImports::MyPipeline.new.run(context) BulkImports::MyPipeline.new(context).run
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment