Commit b7c33981 authored by Kassio Borges's avatar Kassio Borges

BulkImports: Remove after_run duplication

With the introduction of the new fields on the `BulkImports::Tracker` to
track concurrent work, the duplication on
`BulkImports::Pipeline#after_run` in most of the pipelines became more
evident.

This commit, DRYs up this duplication, moving the `after_run` logic to
within the `BulkImports::Pipeline` concern.
parent 1b8288c5
...@@ -24,12 +24,9 @@ module EE ...@@ -24,12 +24,9 @@ module EE
set_next_epic set_next_epic
end end
def after_run(extracted_data) private
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
def after_run(extracted_data)
set_next_epic unless extracted_data.has_next_page? set_next_epic unless extracted_data.has_next_page?
if extracted_data.has_next_page? || context.extra[:epic_iid] if extracted_data.has_next_page? || context.extra[:epic_iid]
...@@ -37,8 +34,6 @@ module EE ...@@ -37,8 +34,6 @@ module EE
end end
end end
private
def set_next_epic def set_next_epic
context.extra[:epic_iid] = @epic_iids.pop context.extra[:epic_iid] = @epic_iids.pop
end end
......
...@@ -47,12 +47,9 @@ module EE ...@@ -47,12 +47,9 @@ module EE
end end
end end
def after_run(extracted_data) private
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
def after_run(extracted_data)
set_next_epic unless extracted_data.has_next_page? set_next_epic unless extracted_data.has_next_page?
if extracted_data.has_next_page? || context.extra[:epic_iid] if extracted_data.has_next_page? || context.extra[:epic_iid]
...@@ -60,8 +57,6 @@ module EE ...@@ -60,8 +57,6 @@ module EE
end end
end end
private
def set_next_epic def set_next_epic
context.extra[:epic_iid] = @epic_iids.pop context.extra[:epic_iid] = @epic_iids.pop
end end
......
...@@ -23,17 +23,6 @@ module EE ...@@ -23,17 +23,6 @@ module EE
context.group.epics.create!(data) context.group.epics.create!(data)
end end
def after_run(extracted_data)
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
if extracted_data.has_next_page?
run
end
end
private private
def authorized? def authorized?
......
...@@ -20,17 +20,6 @@ module EE ...@@ -20,17 +20,6 @@ module EE
context.group.iterations.create!(data) context.group.iterations.create!(data)
end end
def after_run(extracted_data)
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
if extracted_data.has_next_page?
run
end
end
private private
def authorized? def authorized?
......
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
let_it_be(:cursor) { 'cursor' }
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:epic) { create(:epic, group: group) } let_it_be(:epic) { create(:epic, group: group) }
...@@ -40,12 +39,10 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do ...@@ -40,12 +39,10 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
describe '#run' do describe '#run' do
it 'imports epic award emoji' do it 'imports epic award emoji' do
data = extractor_data(has_next_page: false)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor) allow(extractor)
.to receive(:extract) .to receive(:extract)
.and_return(data) .and_return(extracted_data)
end end
expect { subject.run }.to change(::AwardEmoji, :count).by(1) expect { subject.run }.to change(::AwardEmoji, :count).by(1)
...@@ -53,42 +50,46 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do ...@@ -53,42 +50,46 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
end end
end end
describe '#after_run' do context 'when extracted data many pages' do
context 'when extracted data has next page' do it 'runs pipeline for the second page' do
it 'updates tracker information and runs pipeline again' do first_page = extracted_data(has_next_page: true)
data = extractor_data(has_next_page: true, cursor: cursor) last_page = extracted_data
expect(subject).to receive(:run)
subject.after_run(data)
expect(tracker.has_next_page).to eq(true) allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
expect(tracker.next_page).to eq(cursor) allow(extractor)
.to receive(:extract)
.and_return(first_page, last_page)
end end
end
context 'when extracted data has no next page' do subject.run
it 'updates tracker information and does not run pipeline' do end
data = extractor_data(has_next_page: false) end
expect(subject).not_to receive(:run)
subject.after_run(data) context 'when there is many epics to import' do
let_it_be(:second_epic) { create(:epic, group: group) }
expect(tracker.has_next_page).to eq(false) it 'runs the pipeline for the next epic' do
expect(tracker.next_page).to be_nil allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor)
.to receive(:extract)
.twice # for each epic
.and_return(extracted_data)
end end
it 'updates context with next epic iid' do expect(context.extra)
epic2 = create(:epic, group: group) .to receive(:[]=)
data = extractor_data(has_next_page: false) .with(:epic_iid, epic.iid)
.and_call_original
expect(subject).to receive(:run) expect(context.extra)
.to receive(:[]=)
subject.after_run(data) .with(:epic_iid, second_epic.iid)
.and_call_original
expect(context.extra[:epic_iid]).to eq(epic2.iid) expect(context.extra)
end .to receive(:[]=)
.with(:epic_iid, nil)
.and_call_original
subject.run
end end
end end
...@@ -119,12 +120,12 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do ...@@ -119,12 +120,12 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline do
end end
end end
def extractor_data(has_next_page:, cursor: nil) def extracted_data(has_next_page: false)
data = [{ 'name' => 'thumbsup' }] data = [{ 'name' => 'thumbsup' }]
page_info = { page_info = {
'end_cursor' => cursor, 'has_next_page' => has_next_page,
'has_next_page' => has_next_page 'end_cursor' => has_next_page ? 'cursor' : nil
} }
BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info) BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info)
......
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
let_it_be(:cursor) { 'cursor' }
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:epic) { create(:epic, group: group) } let_it_be(:epic) { create(:epic, group: group) }
...@@ -40,12 +39,10 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do ...@@ -40,12 +39,10 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
describe '#run' do describe '#run' do
it 'imports epic events and resource state events' do it 'imports epic events and resource state events' do
data = extractor_data(has_next_page: false)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor) allow(extractor)
.to receive(:extract) .to receive(:extract)
.and_return(data) .and_return(extracted_data)
end end
subject.run subject.run
...@@ -94,42 +91,46 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do ...@@ -94,42 +91,46 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
end end
end end
describe '#after_run' do context 'when extracted data many pages' do
context 'when extracted data has next page' do it 'runs pipeline for the second page' do
it 'updates tracker information and runs pipeline again' do first_page = extracted_data(has_next_page: true)
data = extractor_data(has_next_page: true, cursor: cursor) last_page = extracted_data
expect(subject).to receive(:run)
subject.after_run(data)
expect(tracker.has_next_page).to eq(true) allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
expect(tracker.next_page).to eq(cursor) allow(extractor)
.to receive(:extract)
.and_return(first_page, last_page)
end end
end
context 'when extracted data has no next page' do
it 'updates tracker information and does not run pipeline' do
data = extractor_data(has_next_page: false)
expect(subject).not_to receive(:run) subject.run
end
end
subject.after_run(data) context 'when there is many epics to import' do
let_it_be(:second_epic) { create(:epic, group: group) }
expect(tracker.has_next_page).to eq(false) it 'runs the pipeline for the next epic' do
expect(tracker.next_page).to be_nil allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor)
.to receive(:extract)
.twice # for each epic
.and_return(extracted_data)
end end
it 'updates context with next epic iid' do expect(context.extra)
epic2 = create(:epic, group: group) .to receive(:[]=)
data = extractor_data(has_next_page: false) .with(:epic_iid, epic.iid)
.and_call_original
expect(subject).to receive(:run) expect(context.extra)
.to receive(:[]=)
.with(:epic_iid, second_epic.iid)
.and_call_original
expect(context.extra)
.to receive(:[]=)
.with(:epic_iid, nil)
.and_call_original
subject.after_run(data) subject.run
expect(context.extra[:epic_iid]).to eq(epic2.iid)
end
end end
end end
...@@ -156,7 +157,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do ...@@ -156,7 +157,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
end end
end end
def extractor_data(has_next_page:, cursor: nil) def extracted_data(has_next_page: false)
data = [ data = [
{ {
'action' => 'CLOSED', 'action' => 'CLOSED',
...@@ -169,10 +170,13 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do ...@@ -169,10 +170,13 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicEventsPipeline do
] ]
page_info = { page_info = {
'end_cursor' => cursor, 'has_next_page' => has_next_page,
'has_next_page' => has_next_page 'end_cursor' => has_next_page ? 'cursor' : nil
} }
BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info) BulkImports::Pipeline::ExtractedData.new(
data: data,
page_info: page_info
)
end end
end end
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_redis_cache do RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_redis_cache do
let_it_be(:cursor) { 'cursor' }
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
...@@ -31,8 +30,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_ ...@@ -31,8 +30,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
describe '#run' do describe '#run' do
it 'imports group epics into destination group' do it 'imports group epics into destination group' do
first_page = extractor_data(has_next_page: true, cursor: cursor) first_page = extracted_data(has_next_page: true)
last_page = extractor_data(has_next_page: false, page: 2) last_page = extracted_data
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor) allow(extractor)
...@@ -88,9 +87,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_ ...@@ -88,9 +87,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
end end
it 'raises NotAllowedError' do it 'raises NotAllowedError' do
data = extractor_data(has_next_page: false) expect { subject.load(context, extracted_data) }
.to raise_error(::BulkImports::Pipeline::NotAllowedError)
expect { subject.load(context, data) }.to raise_error(::BulkImports::Pipeline::NotAllowedError)
end end
end end
end end
...@@ -109,34 +107,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_ ...@@ -109,34 +107,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
end end
end end
describe '#after_run' do
context 'when extracted data has next page' do
it 'updates tracker information and runs pipeline again' do
data = extractor_data(has_next_page: true, cursor: cursor)
expect(subject).to receive(:run)
subject.after_run(data)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
end
context 'when extracted data has no next page' do
it 'updates tracker information and does not run pipeline' do
data = extractor_data(has_next_page: false)
expect(subject).not_to receive(:run)
subject.after_run(data)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
end
end
describe 'pipeline parts' do describe 'pipeline parts' do
it { expect(described_class).to include_module(BulkImports::Pipeline) } it { expect(described_class).to include_module(BulkImports::Pipeline) }
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) } it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
...@@ -160,11 +130,11 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_ ...@@ -160,11 +130,11 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
end end
end end
def extractor_data(has_next_page:, cursor: nil, page: 1) def extracted_data(has_next_page: false)
data = [ data = [
{ {
'id' => "gid://gitlab/Epic/#{page}", 'id' => "gid://gitlab/Epic/99",
'iid' => page, 'iid' => has_next_page ? 2 : 1,
'title' => 'epic1', 'title' => 'epic1',
'state' => 'closed', 'state' => 'closed',
'confidential' => true, 'confidential' => true,
...@@ -175,8 +145,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_ ...@@ -175,8 +145,8 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_
] ]
page_info = { page_info = {
'end_cursor' => cursor, 'has_next_page' => has_next_page,
'has_next_page' => has_next_page 'end_cursor' => has_next_page ? 'cursor' : nil
} }
BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info) BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info)
......
...@@ -5,7 +5,6 @@ require 'spec_helper' ...@@ -5,7 +5,6 @@ require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:cursor) { 'cursor' }
let_it_be(:timestamp) { Time.new(2020, 01, 01).utc } let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
...@@ -30,31 +29,10 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do ...@@ -30,31 +29,10 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
group.add_owner(user) group.add_owner(user)
end end
def iteration_data(title, start_date: Date.today)
{
'title' => title,
'description' => 'desc',
'state' => 'upcoming',
'start_date' => start_date,
'due_date' => start_date + 1.day,
'created_at' => timestamp.to_s,
'updated_at' => timestamp.to_s
}
end
def extracted_data(title:, has_next_page:, cursor: nil, start_date: Date.today)
page_info = {
'end_cursor' => cursor,
'has_next_page' => has_next_page
}
BulkImports::Pipeline::ExtractedData.new(data: [iteration_data(title, start_date: start_date)], page_info: page_info)
end
describe '#run' do describe '#run' do
it 'imports group iterations' do it 'imports group iterations' do
first_page = extracted_data(title: 'iteration1', has_next_page: true, cursor: cursor) first_page = extracted_data(title: 'iteration1', has_next_page: true)
last_page = extracted_data(title: 'iteration2', has_next_page: false, start_date: Date.today + 2.days) last_page = extracted_data(title: 'iteration2', start_date: Date.today + 2.days)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor) allow(extractor)
...@@ -77,34 +55,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do ...@@ -77,34 +55,6 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
end end
end end
describe '#after_run' do
context 'when extracted data has next page' do
it 'updates tracker information and runs pipeline again' do
data = extracted_data(title: 'iteration', has_next_page: true, cursor: cursor)
expect(subject).to receive(:run)
subject.after_run(data)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
end
context 'when extracted data has no next page' do
it 'updates tracker information and does not run pipeline' do
data = extracted_data(title: 'iteration', has_next_page: false)
expect(subject).not_to receive(:run)
subject.after_run(data)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
end
end
describe '#load' do describe '#load' do
it 'creates the iteration' do it 'creates the iteration' do
data = iteration_data('iteration') data = iteration_data('iteration')
...@@ -146,4 +96,28 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do ...@@ -146,4 +96,28 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::IterationsPipeline do
) )
end end
end end
def iteration_data(title, start_date: Date.today)
{
'title' => title,
'description' => 'desc',
'state' => 'upcoming',
'start_date' => start_date,
'due_date' => start_date + 1.day,
'created_at' => timestamp.to_s,
'updated_at' => timestamp.to_s
}
end
def extracted_data(title:, start_date: Date.today, has_next_page: false)
page_info = {
'has_next_page' => has_next_page,
'end_cursor' => has_next_page ? 'cursor' : nil
}
BulkImports::Pipeline::ExtractedData.new(
data: iteration_data(title, start_date: start_date),
page_info: page_info
)
end
end end
...@@ -14,17 +14,6 @@ module BulkImports ...@@ -14,17 +14,6 @@ module BulkImports
def load(context, data) def load(context, data)
Labels::CreateService.new(data).execute(group: context.group) Labels::CreateService.new(data).execute(group: context.group)
end end
def after_run(extracted_data)
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
if extracted_data.has_next_page?
run
end
end
end end
end end
end end
......
...@@ -17,17 +17,6 @@ module BulkImports ...@@ -17,17 +17,6 @@ module BulkImports
context.group.members.create!(data) context.group.members.create!(data)
end end
def after_run(extracted_data)
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
if extracted_data.has_next_page?
run
end
end
end end
end end
end end
......
...@@ -19,17 +19,6 @@ module BulkImports ...@@ -19,17 +19,6 @@ module BulkImports
context.group.milestones.create!(data) context.group.milestones.create!(data)
end end
def after_run(extracted_data)
tracker.update(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
if extracted_data.has_next_page?
run
end
end
private private
def authorized? def authorized?
......
...@@ -14,19 +14,24 @@ module BulkImports ...@@ -14,19 +14,24 @@ module BulkImports
extracted_data = extracted_data_from extracted_data = extracted_data_from
extracted_data&.each do |entry| if extracted_data
transformers.each do |transformer| extracted_data.each do |entry|
entry = run_pipeline_step(:transformer, transformer.class.name) do transformers.each do |transformer|
transformer.transform(context, entry) entry = run_pipeline_step(:transformer, transformer.class.name) do
transformer.transform(context, entry)
end
end end
end
run_pipeline_step(:loader, loader.class.name) do run_pipeline_step(:loader, loader.class.name) do
loader.load(context, entry) loader.load(context, entry)
end
end end
end
if extracted_data && respond_to?(:after_run) tracker.update!(
has_next_page: extracted_data.has_next_page?,
next_page: extracted_data.next_page
)
run_pipeline_step(:after_run) do run_pipeline_step(:after_run) do
after_run(extracted_data) after_run(extracted_data)
end end
...@@ -65,6 +70,10 @@ module BulkImports ...@@ -65,6 +70,10 @@ module BulkImports
end end
end end
def after_run(extracted_data)
run if extracted_data.has_next_page?
end
def mark_as_failed def mark_as_failed
warn(message: 'Pipeline failed') warn(message: 'Pipeline failed')
......
...@@ -5,7 +5,6 @@ require 'spec_helper' ...@@ -5,7 +5,6 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:cursor) { 'cursor' }
let_it_be(:timestamp) { Time.new(2020, 01, 01).utc } let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
let_it_be(:entity) do let_it_be(:entity) do
...@@ -23,29 +22,10 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -23,29 +22,10 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
subject { described_class.new(context) } subject { described_class.new(context) }
def label_data(title)
{
'title' => title,
'description' => 'desc',
'color' => '#428BCA',
'created_at' => timestamp.to_s,
'updated_at' => timestamp.to_s
}
end
def extractor_data(title:, has_next_page:, cursor: nil)
page_info = {
'end_cursor' => cursor,
'has_next_page' => has_next_page
}
BulkImports::Pipeline::ExtractedData.new(data: [label_data(title)], page_info: page_info)
end
describe '#run' do describe '#run' do
it 'imports a group labels' do it 'imports a group labels' do
first_page = extractor_data(title: 'label1', has_next_page: true, cursor: cursor) first_page = extracted_data(title: 'label1', has_next_page: true)
last_page = extractor_data(title: 'label2', has_next_page: false) last_page = extracted_data(title: 'label2')
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor) allow(extractor)
...@@ -65,34 +45,6 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -65,34 +45,6 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
end end
end end
describe '#after_run' do
context 'when extracted data has next page' do
it 'updates tracker information and runs pipeline again' do
data = extractor_data(title: 'label', has_next_page: true, cursor: cursor)
expect(subject).to receive(:run)
subject.after_run(data)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
end
context 'when extracted data has no next page' do
it 'updates tracker information and does not run pipeline' do
data = extractor_data(title: 'label', has_next_page: false)
expect(subject).not_to receive(:run)
subject.after_run(data)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
end
end
describe '#load' do describe '#load' do
it 'creates the label' do it 'creates the label' do
data = label_data('label') data = label_data('label')
...@@ -128,4 +80,23 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do ...@@ -128,4 +80,23 @@ RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
) )
end end
end end
def label_data(title)
{
'title' => title,
'description' => 'desc',
'color' => '#428BCA',
'created_at' => timestamp.to_s,
'updated_at' => timestamp.to_s
}
end
def extracted_data(title:, has_next_page: false)
page_info = {
'has_next_page' => has_next_page,
'end_cursor' => has_next_page ? 'cursor' : nil
}
BulkImports::Pipeline::ExtractedData.new(data: [label_data(title)], page_info: page_info)
end
end end
...@@ -8,7 +8,6 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do ...@@ -8,7 +8,6 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:cursor) { 'cursor' }
let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) } let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import, group: group) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) } let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
...@@ -18,8 +17,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do ...@@ -18,8 +17,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do
describe '#run' do describe '#run' do
it 'maps existing users to the imported group' do it 'maps existing users to the imported group' do
first_page = member_data(email: member_user1.email, has_next_page: true, cursor: cursor) first_page = extracted_data(email: member_user1.email, has_next_page: true)
last_page = member_data(email: member_user2.email, has_next_page: false) last_page = extracted_data(email: member_user2.email)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor) allow(extractor)
...@@ -89,7 +88,7 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do ...@@ -89,7 +88,7 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do
end end
end end
def member_data(email:, has_next_page:, cursor: nil) def extracted_data(email:, has_next_page: false)
data = { data = {
'created_at' => '2020-01-01T00:00:00Z', 'created_at' => '2020-01-01T00:00:00Z',
'updated_at' => '2020-01-01T00:00:00Z', 'updated_at' => '2020-01-01T00:00:00Z',
...@@ -103,8 +102,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do ...@@ -103,8 +102,8 @@ RSpec.describe BulkImports::Groups::Pipelines::MembersPipeline do
} }
page_info = { page_info = {
'end_cursor' => cursor, 'has_next_page' => has_next_page,
'has_next_page' => has_next_page 'end_cursor' => has_next_page ? 'cursor' : nil
} }
BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info) BulkImports::Pipeline::ExtractedData.new(data: data, page_info: page_info)
......
...@@ -5,7 +5,6 @@ require 'spec_helper' ...@@ -5,7 +5,6 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
let_it_be(:user) { create(:user) } let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:cursor) { 'cursor' }
let_it_be(:timestamp) { Time.new(2020, 01, 01).utc } let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
let_it_be(:bulk_import) { create(:bulk_import, user: user) } let_it_be(:bulk_import) { create(:bulk_import, user: user) }
...@@ -25,35 +24,14 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do ...@@ -25,35 +24,14 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
subject { described_class.new(context) } subject { described_class.new(context) }
def milestone_data(title)
{
'title' => title,
'description' => 'desc',
'state' => 'closed',
'start_date' => '2020-10-21',
'due_date' => '2020-10-22',
'created_at' => timestamp.to_s,
'updated_at' => timestamp.to_s
}
end
def extracted_data(title:, has_next_page:, cursor: nil)
page_info = {
'end_cursor' => cursor,
'has_next_page' => has_next_page
}
BulkImports::Pipeline::ExtractedData.new(data: [milestone_data(title)], page_info: page_info)
end
before do before do
group.add_owner(user) group.add_owner(user)
end end
describe '#run' do describe '#run' do
it 'imports group milestones' do it 'imports group milestones' do
first_page = extracted_data(title: 'milestone1', has_next_page: true, cursor: cursor) first_page = extracted_data(title: 'milestone1', has_next_page: true)
last_page = extracted_data(title: 'milestone2', has_next_page: false) last_page = extracted_data(title: 'milestone2')
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor| allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor) allow(extractor)
...@@ -76,34 +54,6 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do ...@@ -76,34 +54,6 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
end end
end end
describe '#after_run' do
context 'when extracted data has next page' do
it 'updates tracker information and runs pipeline again' do
data = extracted_data(title: 'milestone', has_next_page: true, cursor: cursor)
expect(subject).to receive(:run)
subject.after_run(data)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq(cursor)
end
end
context 'when extracted data has no next page' do
it 'updates tracker information and does not run pipeline' do
data = extracted_data(title: 'milestone', has_next_page: false)
expect(subject).not_to receive(:run)
subject.after_run(data)
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to be_nil
end
end
end
describe '#load' do describe '#load' do
it 'creates the milestone' do it 'creates the milestone' do
data = milestone_data('milestone') data = milestone_data('milestone')
...@@ -117,7 +67,7 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do ...@@ -117,7 +67,7 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
end end
it 'raises NotAllowedError' do it 'raises NotAllowedError' do
data = extracted_data(title: 'milestone', has_next_page: false) data = extracted_data(title: 'milestone')
expect { subject.load(context, data) }.to raise_error(::BulkImports::Pipeline::NotAllowedError) expect { subject.load(context, data) }.to raise_error(::BulkImports::Pipeline::NotAllowedError)
end end
...@@ -145,4 +95,28 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do ...@@ -145,4 +95,28 @@ RSpec.describe BulkImports::Groups::Pipelines::MilestonesPipeline do
) )
end end
end end
def milestone_data(title)
{
'title' => title,
'description' => 'desc',
'state' => 'closed',
'start_date' => '2020-10-21',
'due_date' => '2020-10-22',
'created_at' => timestamp.to_s,
'updated_at' => timestamp.to_s
}
end
def extracted_data(title:, has_next_page: false)
page_info = {
'has_next_page' => has_next_page,
'end_cursor' => has_next_page ? 'cursor' : nil
}
BulkImports::Pipeline::ExtractedData.new(
data: milestone_data(title),
page_info: page_info
)
end
end end
...@@ -12,19 +12,17 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do ...@@ -12,19 +12,17 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do
subject { described_class.new(context) } subject { described_class.new(context) }
describe '#run' do let(:extracted_data) do
let(:subgroup_data) do BulkImports::Pipeline::ExtractedData.new(data: {
[ 'name' => 'subgroup',
{ 'full_path' => 'parent/subgroup'
"name" => "subgroup", })
"full_path" => "parent/subgroup" end
}
]
end
describe '#run' do
before do before do
allow_next_instance_of(BulkImports::Groups::Extractors::SubgroupsExtractor) do |extractor| allow_next_instance_of(BulkImports::Groups::Extractors::SubgroupsExtractor) do |extractor|
allow(extractor).to receive(:extract).and_return(subgroup_data) allow(extractor).to receive(:extract).and_return(extracted_data)
end end
parent.add_owner(user) parent.add_owner(user)
......
...@@ -38,8 +38,6 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -38,8 +38,6 @@ RSpec.describe BulkImports::Pipeline::Runner do
extractor BulkImports::Extractor extractor BulkImports::Extractor
transformer BulkImports::Transformer transformer BulkImports::Transformer
loader BulkImports::Loader loader BulkImports::Loader
def after_run(_); end
end end
stub_const('BulkImports::MyPipeline', pipeline) stub_const('BulkImports::MyPipeline', pipeline)
...@@ -54,8 +52,6 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -54,8 +52,6 @@ RSpec.describe BulkImports::Pipeline::Runner do
describe 'pipeline runner' do describe 'pipeline runner' do
context 'when entity is not marked as failed' do context 'when entity is not marked as failed' do
it 'runs pipeline extractor, transformer, loader' do it 'runs pipeline extractor, transformer, loader' do
extracted_data = BulkImports::Pipeline::ExtractedData.new(data: { foo: :bar })
expect_next_instance_of(BulkImports::Extractor) do |extractor| expect_next_instance_of(BulkImports::Extractor) do |extractor|
expect(extractor) expect(extractor)
.to receive(:extract) .to receive(:extract)
...@@ -133,6 +129,22 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -133,6 +129,22 @@ RSpec.describe BulkImports::Pipeline::Runner do
subject.run subject.run
end end
context 'when extracted data has multiple pages' do
it 'updates tracker information and runs pipeline again' do
first_page = extracted_data(has_next_page: true)
last_page = extracted_data
expect_next_instance_of(BulkImports::Extractor) do |extractor|
expect(extractor)
.to receive(:extract)
.with(context)
.and_return(first_page, last_page)
end
subject.run
end
end
context 'when exception is raised' do context 'when exception is raised' do
before do before do
allow_next_instance_of(BulkImports::Extractor) do |extractor| allow_next_instance_of(BulkImports::Extractor) do |extractor|
...@@ -218,14 +230,24 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -218,14 +230,24 @@ RSpec.describe BulkImports::Pipeline::Runner do
subject.run subject.run
end end
end end
end
def log_params(context, extra = {}) def log_params(context, extra = {})
{ {
bulk_import_id: context.bulk_import.id, bulk_import_id: context.bulk_import.id,
bulk_import_entity_id: context.entity.id, bulk_import_entity_id: context.entity.id,
bulk_import_entity_type: context.entity.source_type, bulk_import_entity_type: context.entity.source_type,
context_extra: context.extra context_extra: context.extra
}.merge(extra) }.merge(extra)
end
def extracted_data(has_next_page: false)
BulkImports::Pipeline::ExtractedData.new(
data: { foo: :bar },
page_info: {
'has_next_page' => has_next_page,
'end_cursor' => has_next_page ? 'cursor' : nil
}
)
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment