Commit 53ca7bd1 authored by George Koltsov

Add BulkImports NdjsonExtractor & update labels pipeline to use it

  - Use ndjson files when importing group labels via the
    Bulk Import functionality
  - This approach is required instead of GraphQL in order
    to preserve epic-label associations
  - The ndjson approach downloads an ndjson file containing
    all source group labels and imports them (a sketch of
    the flow follows below)

Changelog: changed
parent 4696e416
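
For context, the flow is: the source instance exports each relation to an ndjson file, PipelineWorker polls the export status (re-enqueueing itself while the export is still running), and NdjsonExtractor then downloads and decompresses <relation>.ndjson.gz and streams its rows into the pipeline. A minimal sketch of a pipeline consuming such an export follows; it assumes the GitLab Rails environment, mirrors the LabelsPipeline changes in this commit, and uses a hypothetical 'milestones' relation and class name purely for illustration.

# Minimal sketch of an ndjson-based group pipeline, assuming the GitLab Rails
# environment. The class name and the 'milestones' relation are hypothetical
# and used only for illustration; this commit itself only converts 'labels'.
module BulkImports
  module Groups
    module Pipelines
      class ExampleMilestonesPipeline
        include NdjsonPipeline # calls ndjson_pipeline!, so PipelineWorker waits for the export

        RELATION = 'milestones'

        # Downloads and decompresses milestones.ndjson.gz from the source's
        # export_relations/download endpoint and streams one JSON row per record.
        extractor ::BulkImports::Common::Extractors::NdjsonExtractor, relation: RELATION

        def transform(context, data)
          relation_hash, relation_index = data
          relation_definition = import_export_config.top_relation_tree(RELATION)

          # Build an unsaved ActiveRecord object (and its nested relations)
          # from the raw exported hash.
          deep_transform_relation!(relation_hash, RELATION, relation_definition) do |key, hash|
            Gitlab::ImportExport::Group::RelationFactory.create(
              relation_index: relation_index,
              relation_sym: key.to_sym,
              relation_hash: hash,
              importable: context.portable,
              members_mapper: nil,
              object_builder: object_builder,
              user: context.current_user,
              excluded_keys: import_export_config.relation_excluded_keys(key)
            )
          end
        end

        def load(_context, object)
          return unless object

          object.save! unless object.persisted?
        end
      end
    end
  end
end
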
......@@ -4,6 +4,10 @@ module BulkImports
class Export < ApplicationRecord
include Gitlab::Utils::StrongMemoize
STARTED = 0
FINISHED = 1
FAILED = -1
self.table_name = 'bulk_import_exports'
belongs_to :project, optional: true
......@@ -18,9 +22,9 @@ module BulkImports
validate :portable_relation?
state_machine :status, initial: :started do
state :started, value: 0
state :finished, value: 1
state :failed, value: -1
state :started, value: STARTED
state :finished, value: FINISHED
state :failed, value: FAILED
event :start do
transition any => :started
......
# frozen_string_literal: true
module BulkImports
class ExportStatus
include Gitlab::Utils::StrongMemoize
def initialize(pipeline_tracker, relation)
@pipeline_tracker = pipeline_tracker
@relation = relation
@entity = @pipeline_tracker.entity
@configuration = @entity.bulk_import.configuration
@client = Clients::Http.new(uri: @configuration.url, token: @configuration.access_token)
end
def started?
export_status['status'] == Export::STARTED
end
def failed?
export_status['status'] == Export::FAILED
end
def error
export_status['error']
end
private
attr_reader :client, :entity, :relation
def export_status
strong_memoize(:export_status) do
fetch_export_status.find { |item| item['relation'] == relation }
end
rescue StandardError => e
{ 'status' => Export::FAILED, 'error' => e.message }
end
def fetch_export_status
client.get(status_endpoint).parsed_response
end
def status_endpoint
"/groups/#{entity.encoded_source_full_path}/export_relations/status"
end
end
end
......@@ -13,6 +13,14 @@ module BulkImports
attributes_finder.find_root(portable_class_sym)
end
def top_relation_tree(relation)
portable_relations_tree[relation.to_s]
end
def relation_excluded_keys(relation)
attributes_finder.find_excluded_keys(relation)
end
def export_path
strong_memoize(:export_path) do
relative_path = File.join(base_export_path, SecureRandom.hex)
......@@ -47,6 +55,10 @@ module BulkImports
@portable_class_sym ||= portable_class.to_s.demodulize.underscore.to_sym
end
def portable_relations_tree
@portable_relations_tree ||= attributes_finder.find_relations_tree(portable_class_sym).deep_stringify_keys
end
def import_export_yaml
raise NotImplementedError
end
......
......@@ -3,7 +3,7 @@
module BulkImports
class FileDownloadService
FILE_SIZE_LIMIT = 5.gigabytes
ALLOWED_CONTENT_TYPES = ['application/octet-stream'].freeze
ALLOWED_CONTENT_TYPES = %w(application/gzip application/octet-stream).freeze
ServiceError = Class.new(StandardError)
......
......@@ -4,6 +4,8 @@ module BulkImports
class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
NDJSON_PIPELINE_PERFORM_DELAY = 1.minute
feature_category :importers
tags :exclude_from_kubernetes
......@@ -40,6 +42,15 @@ module BulkImports
private
def run(pipeline_tracker)
if ndjson_pipeline?(pipeline_tracker)
status = ExportStatus.new(pipeline_tracker, pipeline_tracker.pipeline_class::RELATION)
raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?(pipeline_tracker)
raise(Pipeline::FailedError, status.error) if status.failed?
return reenqueue(pipeline_tracker) if status.started?
end
pipeline_tracker.update!(status_event: 'start', jid: jid)
context = ::BulkImports::Pipeline::Context.new(pipeline_tracker)
......@@ -48,7 +59,7 @@ module BulkImports
pipeline_tracker.finish!
rescue StandardError => e
pipeline_tracker.fail_op!
pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
logger.error(
worker: self.class.name,
......@@ -67,5 +78,17 @@ module BulkImports
def logger
@logger ||= Gitlab::Import::Logger.build
end
def ndjson_pipeline?(pipeline_tracker)
pipeline_tracker.pipeline_class.ndjson_pipeline?
end
def job_timeout?(pipeline_tracker)
(Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT
end
def reenqueue(pipeline_tracker)
self.class.perform_in(NDJSON_PIPELINE_PERFORM_DELAY, pipeline_tracker.id, pipeline_tracker.stage, pipeline_tracker.entity.id)
end
end
end
# frozen_string_literal: true
module BulkImports
module Common
module Extractors
class NdjsonExtractor
include Gitlab::ImportExport::CommandLineUtil
include Gitlab::Utils::StrongMemoize
EXPORT_DOWNLOAD_URL_PATH = "/%{resource}/%{full_path}/export_relations/download?relation=%{relation}"
def initialize(relation:)
@relation = relation
@tmp_dir = Dir.mktmpdir
end
def extract(context)
download_service(tmp_dir, context).execute
decompression_service(tmp_dir).execute
relations = ndjson_reader(tmp_dir).consume_relation('', relation)
BulkImports::Pipeline::ExtractedData.new(data: relations)
end
def remove_tmp_dir
FileUtils.remove_entry(tmp_dir)
end
private
attr_reader :relation, :tmp_dir
def filename
@filename ||= "#{relation}.ndjson.gz"
end
def download_service(tmp_dir, context)
@download_service ||= BulkImports::FileDownloadService.new(
configuration: context.configuration,
relative_url: relative_resource_url(context),
dir: tmp_dir,
filename: filename
)
end
def decompression_service(tmp_dir)
@decompression_service ||= BulkImports::FileDecompressionService.new(
dir: tmp_dir,
filename: filename
)
end
def ndjson_reader(tmp_dir)
@ndjson_reader ||= Gitlab::ImportExport::JSON::NdjsonReader.new(tmp_dir)
end
def relative_resource_url(context)
strong_memoize(:relative_resource_url) do
resource = context.portable.class.name.downcase.pluralize
encoded_full_path = context.entity.encoded_source_full_path
EXPORT_DOWNLOAD_URL_PATH % { resource: resource, full_path: encoded_full_path, relation: relation }
end
end
end
end
end
end
# frozen_string_literal: true
module BulkImports
module Groups
module Graphql
module GetLabelsQuery
extend self
def to_s
<<-'GRAPHQL'
query ($full_path: ID!, $cursor: String, $per_page: Int) {
group(fullPath: $full_path) {
labels(first: $per_page, after: $cursor, onlyGroupLabels: true) {
page_info: pageInfo {
next_page: endCursor
has_next_page: hasNextPage
}
nodes {
title
description
color
created_at: createdAt
updated_at: updatedAt
}
}
}
}
GRAPHQL
end
def variables(context)
{
full_path: context.entity.source_full_path,
cursor: context.tracker.next_page,
per_page: ::BulkImports::Tracker::DEFAULT_PAGE_SIZE
}
end
def base_path
%w[data group labels]
end
def data_path
base_path << 'nodes'
end
def page_info_path
base_path << 'page_info'
end
end
end
end
end
......@@ -4,6 +4,10 @@ module BulkImports
module Groups
module Pipelines
class EntityFinisher
def self.ndjson_pipeline?
false
end
def initialize(context)
@context = context
end
......
......@@ -4,15 +4,35 @@ module BulkImports
module Groups
module Pipelines
class LabelsPipeline
include Pipeline
include NdjsonPipeline
extractor BulkImports::Common::Extractors::GraphqlExtractor,
query: BulkImports::Groups::Graphql::GetLabelsQuery
RELATION = 'labels'
transformer Common::Transformers::ProhibitedAttributesTransformer
extractor ::BulkImports::Common::Extractors::NdjsonExtractor, relation: RELATION
def load(context, data)
Labels::CreateService.new(data).execute(group: context.group)
def transform(context, data)
relation_hash = data.first
relation_index = data.last
relation_definition = import_export_config.top_relation_tree(RELATION)
deep_transform_relation!(relation_hash, RELATION, relation_definition) do |key, hash|
Gitlab::ImportExport::Group::RelationFactory.create(
relation_index: relation_index,
relation_sym: key.to_sym,
relation_hash: hash,
importable: context.portable,
members_mapper: nil,
object_builder: object_builder,
user: context.current_user,
excluded_keys: import_export_config.relation_excluded_keys(key)
)
end
end
def load(_, label)
return unless label
label.save! unless label.persisted?
end
end
end
......
# frozen_string_literal: true
module BulkImports
module NdjsonPipeline
extend ActiveSupport::Concern
include Pipeline
included do
ndjson_pipeline!
def deep_transform_relation!(relation_hash, relation_key, relation_definition, &block)
relation_key = relation_key_override(relation_key)
relation_definition.each do |sub_relation_key, sub_relation_definition|
sub_relation = relation_hash[sub_relation_key]
next unless sub_relation
current_item =
if sub_relation.is_a?(Array)
sub_relation
.map { |entry| deep_transform_relation!(entry, sub_relation_key, sub_relation_definition, &block) }
.tap { |entry| entry.compact! }
.presence
else
deep_transform_relation!(sub_relation, sub_relation_key, sub_relation_definition, &block)
end
if current_item
relation_hash[sub_relation_key] = current_item
else
relation_hash.delete(sub_relation_key)
end
end
yield(relation_key, relation_hash)
end
def after_run(_)
extractor.remove_tmp_dir if extractor.respond_to?(:remove_tmp_dir)
end
def relation_class(relation_key)
relation_key.to_s.classify.constantize
rescue NameError
relation_key.to_s.constantize
end
def relation_key_override(relation_key)
relation_key_overrides[relation_key.to_sym]&.to_s || relation_key
end
def relation_key_overrides
"Gitlab::ImportExport::#{portable.class}::RelationFactory::OVERRIDES".constantize
end
def object_builder
"Gitlab::ImportExport::#{portable.class}::ObjectBuilder".constantize
end
end
end
end
......@@ -8,8 +8,11 @@ module BulkImports
include Runner
NotAllowedError = Class.new(StandardError)
ExpiredError = Class.new(StandardError)
FailedError = Class.new(StandardError)
CACHE_KEY_EXPIRATION = 2.hours
NDJSON_EXPORT_TIMEOUT = 30.minutes
def initialize(context)
@context = context
......@@ -19,6 +22,14 @@ module BulkImports
@tracker ||= context.tracker
end
def portable
@portable ||= context.portable
end
def import_export_config
@import_export_config ||= context.import_export_config
end
included do
private
......@@ -111,7 +122,7 @@ module BulkImports
options = class_config[:options]
if options
class_config[:klass].new(class_config[:options])
class_config[:klass].new(**class_config[:options])
else
class_config[:klass].new
end
......@@ -155,6 +166,14 @@ module BulkImports
class_attributes[:abort_on_failure]
end
def ndjson_pipeline!
class_attributes[:ndjson_pipeline] = true
end
def ndjson_pipeline?
class_attributes[:ndjson_pipeline]
end
private
def add_attribute(sym, klass, options)
......
......@@ -16,6 +16,14 @@ module BulkImports
@entity ||= tracker.entity
end
def portable
@portable ||= entity.group || entity.project
end
def import_export_config
@import_export_config ||= ::BulkImports::FileTransfer.config_for(portable)
end
def group
@group ||= entity.group
end
......
......@@ -6,7 +6,7 @@ module BulkImports
attr_reader :data
def initialize(data: nil, page_info: {})
@data = Array.wrap(data)
@data = data.is_a?(Enumerator) ? data : Array.wrap(data)
@page_info = page_info
end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Common::Extractors::NdjsonExtractor do
let_it_be(:tmpdir) { Dir.mktmpdir }
let_it_be(:filepath) { 'spec/fixtures/bulk_imports/labels.ndjson.gz' }
let_it_be(:import) { create(:bulk_import) }
let_it_be(:config) { create(:bulk_import_configuration, bulk_import: import) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: import) }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
subject { described_class.new(relation: 'labels') }
before do
allow(FileUtils).to receive(:remove_entry).with(any_args).and_call_original
subject.instance_variable_set(:@tmp_dir, tmpdir)
end
after(:all) do
FileUtils.remove_entry(tmpdir) if File.directory?(tmpdir)
end
describe '#extract' do
before do
FileUtils.copy_file(filepath, File.join(tmpdir, 'labels.ndjson.gz'))
allow_next_instance_of(BulkImports::FileDownloadService) do |service|
allow(service).to receive(:execute)
end
end
it 'returns ExtractedData' do
extracted_data = subject.extract(context)
label = extracted_data.data.first.first
expect(extracted_data).to be_instance_of(BulkImports::Pipeline::ExtractedData)
expect(label['title']).to include('Label')
expect(label['description']).to include('Label')
expect(label['type']).to eq('GroupLabel')
end
end
describe '#remove_tmp_dir' do
it 'removes tmp dir' do
expect(FileUtils).to receive(:remove_entry).with(tmpdir).once
subject.remove_tmp_dir
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Groups::Graphql::GetLabelsQuery do
it 'has a valid query' do
tracker = create(:bulk_import_tracker)
context = BulkImports::Pipeline::Context.new(tracker)
query = GraphQL::Query.new(
GitlabSchema,
described_class.to_s,
variables: described_class.variables(context)
)
result = GitlabSchema.static_validator.validate(query)
expect(result[:errors]).to be_empty
end
describe '#data_path' do
it 'returns data path' do
expected = %w[data group labels nodes]
expect(described_class.data_path).to eq(expected)
end
end
describe '#page_info_path' do
it 'returns pagination information path' do
expected = %w[data group labels page_info]
expect(described_class.page_info_path).to eq(expected)
end
end
end
......@@ -5,98 +5,87 @@ require 'spec_helper'
RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:filepath) { 'spec/fixtures/bulk_imports/labels.ndjson.gz' }
let_it_be(:entity) do
create(
:bulk_import_entity,
group: group,
bulk_import: bulk_import,
source_full_path: 'source/full/path',
destination_name: 'My Destination Group',
destination_namespace: group.full_path,
group: group
destination_namespace: group.full_path
)
end
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }
let(:tmpdir) { Dir.mktmpdir }
before do
FileUtils.copy_file(filepath, File.join(tmpdir, 'labels.ndjson.gz'))
group.add_owner(user)
end
subject { described_class.new(context) }
describe '#run' do
it 'imports group labels' do
first_page = extracted_data(title: 'label1', has_next_page: true)
last_page = extracted_data(title: 'label2')
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor)
.to receive(:extract)
.and_return(first_page, last_page)
it 'imports group labels into destination group and removes tmpdir' do
allow(Dir).to receive(:mktmpdir).and_return(tmpdir)
allow_next_instance_of(BulkImports::FileDownloadService) do |service|
allow(service).to receive(:execute)
end
expect { subject.run }.to change(Label, :count).by(2)
expect { subject.run }.to change(::GroupLabel, :count).by(1)
label = group.labels.order(:created_at).last
label = group.labels.first
expect(label.title).to eq('label2')
expect(label.description).to eq('desc')
expect(label.color).to eq('#428BCA')
expect(label.created_at).to eq(timestamp)
expect(label.updated_at).to eq(timestamp)
expect(label.title).to eq('Label 1')
expect(label.description).to eq('Label 1')
expect(label.color).to eq('#6699cc')
expect(File.directory?(tmpdir)).to eq(false)
end
end
describe '#load' do
it 'creates the label' do
data = label_data('label')
context 'when label is not persisted' do
it 'saves the label' do
label = build(:group_label, group: group)
expect { subject.load(context, data) }.to change(Label, :count).by(1)
expect(label).to receive(:save!)
label = group.labels.first
subject.load(context, label)
end
end
context 'when label is persisted' do
it 'does not save label' do
label = create(:group_label, group: group)
data.each do |key, value|
expect(label[key]).to eq(value)
expect(label).not_to receive(:save!)
subject.load(context, label)
end
end
context 'when label is missing' do
it 'returns' do
expect(subject.load(context, nil)).to be_nil
end
end
end
describe 'pipeline parts' do
it { expect(described_class).to include_module(BulkImports::Pipeline) }
it { expect(described_class).to include_module(BulkImports::NdjsonPipeline) }
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
it 'has extractors' do
it 'has extractor' do
expect(described_class.get_extractor)
.to eq(
klass: BulkImports::Common::Extractors::GraphqlExtractor,
options: {
query: BulkImports::Groups::Graphql::GetLabelsQuery
}
)
end
it 'has transformers' do
expect(described_class.transformers)
.to contain_exactly(
{ klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil }
klass: BulkImports::Common::Extractors::NdjsonExtractor,
options: { relation: described_class::RELATION }
)
end
end
def label_data(title)
{
'title' => title,
'description' => 'desc',
'color' => '#428BCA',
'created_at' => timestamp.to_s,
'updated_at' => timestamp.to_s
}
end
def extracted_data(title:, has_next_page: false)
page_info = {
'has_next_page' => has_next_page,
'next_page' => has_next_page ? 'cursor' : nil
}
BulkImports::Pipeline::ExtractedData.new(data: [label_data(title)], page_info: page_info)
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::NdjsonPipeline do
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project) }
let_it_be(:klass) do
Class.new do
include BulkImports::NdjsonPipeline
attr_reader :portable
def initialize(portable)
@portable = portable
end
end
end
subject { klass.new(group) }
it 'marks pipeline as ndjson' do
expect(klass.ndjson_pipeline?).to eq(true)
end
describe '#deep_transform_relation!' do
it 'transforms relation hash' do
transformed = subject.deep_transform_relation!({}, 'test', {}) do |key, hash|
hash.merge(relation_key: key)
end
expect(transformed[:relation_key]).to eq('test')
end
context 'when subrelations is an array' do
it 'transforms each element of the array' do
relation_hash = {
'key' => 'value',
'labels' => [
{ 'title' => 'label 1' },
{ 'title' => 'label 2' },
{ 'title' => 'label 3' }
]
}
relation_definition = { 'labels' => {} }
transformed = subject.deep_transform_relation!(relation_hash, 'test', relation_definition) do |key, hash|
hash.merge(relation_key: key)
end
transformed['labels'].each do |label|
expect(label[:relation_key]).to eq('group_labels')
end
end
end
context 'when subrelation is a hash' do
it 'transforms subrelation hash' do
relation_hash = {
'key' => 'value',
'label' => { 'title' => 'label' }
}
relation_definition = { 'label' => {} }
transformed = subject.deep_transform_relation!(relation_hash, 'test', relation_definition) do |key, hash|
hash.merge(relation_key: key)
end
expect(transformed['label'][:relation_key]).to eq('group_label')
end
end
context 'when subrelation is nil' do
it 'removes subrelation' do
relation_hash = {
'key' => 'value',
'label' => { 'title' => 'label' }
}
relation_definition = { 'label' => {} }
transformed = subject.deep_transform_relation!(relation_hash, 'test', relation_definition) do |key, hash|
if key == 'group_label'
nil
else
hash
end
end
expect(transformed['label']).to be_nil
end
end
end
describe '#relation_class' do
context 'when relation name is pluralized' do
it 'returns constantized class' do
expect(subject.relation_class('MergeRequest::Metrics')).to eq(MergeRequest::Metrics)
end
end
context 'when relation name is singularized' do
it 'returns constantized class' do
expect(subject.relation_class('Badge')).to eq(Badge)
end
end
end
describe '#relation_key_override' do
context 'when portable is group' do
it 'returns group relation name override' do
expect(subject.relation_key_override('labels')).to eq('group_labels')
end
end
context 'when portable is project' do
subject { klass.new(project) }
it 'returns project relation name override' do
expect(subject.relation_key_override('labels')).to eq('project_labels')
end
end
end
end
......@@ -6,6 +6,9 @@ RSpec.describe BulkImports::Pipeline::Context do
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
let_it_be(:bulk_import) { create(:bulk_import, user: user) }
let_it_be(:project) { create(:project) }
let_it_be(:project_entity) { create(:bulk_import_entity, :project_entity, project: project) }
let_it_be(:project_tracker) { create(:bulk_import_tracker, entity: project_entity) }
let_it_be(:entity) do
create(
......@@ -51,4 +54,24 @@ RSpec.describe BulkImports::Pipeline::Context do
describe '#extra' do
it { expect(subject.extra).to eq(extra: :data) }
end
describe '#portable' do
it { expect(subject.portable).to eq(group) }
context 'when portable is project' do
subject { described_class.new(project_tracker) }
it { expect(subject.portable).to eq(project) }
end
end
describe '#import_export_config' do
it { expect(subject.import_export_config).to be_instance_of(BulkImports::FileTransfer::GroupConfig) }
context 'when portable is project' do
subject { described_class.new(project_tracker) }
it { expect(subject.import_export_config).to be_instance_of(BulkImports::FileTransfer::ProjectConfig) }
end
end
end
......@@ -63,6 +63,7 @@ RSpec.describe BulkImports::Pipeline do
BulkImports::MyPipeline.transformer(klass, options)
BulkImports::MyPipeline.loader(klass, options)
BulkImports::MyPipeline.abort_on_failure!
BulkImports::MyPipeline.ndjson_pipeline!
expect(BulkImports::MyPipeline.get_extractor).to eq({ klass: klass, options: options })
......@@ -74,6 +75,7 @@ RSpec.describe BulkImports::Pipeline do
expect(BulkImports::MyPipeline.get_loader).to eq({ klass: klass, options: options })
expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true)
expect(BulkImports::MyPipeline.ndjson_pipeline?).to eq(true)
end
end
end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::ExportStatus do
let_it_be(:relation) { 'labels' }
let_it_be(:import) { create(:bulk_import) }
let_it_be(:config) { create(:bulk_import_configuration, bulk_import: import) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: import, source_full_path: 'foo') }
let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
let(:response_double) do
double(parsed_response: [{ 'relation' => 'labels', 'status' => status, 'error' => 'error!' }])
end
subject { described_class.new(tracker, relation) }
before do
allow_next_instance_of(BulkImports::Clients::Http) do |client|
allow(client).to receive(:get).and_return(response_double)
end
end
describe '#started?' do
context 'when export status is started' do
let(:status) { BulkImports::Export::STARTED }
it 'returns true' do
expect(subject.started?).to eq(true)
end
end
context 'when export status is not started' do
let(:status) { BulkImports::Export::FAILED }
it 'returns false' do
expect(subject.started?).to eq(false)
end
end
end
describe '#failed?' do
context 'when export status is failed' do
let(:status) { BulkImports::Export::FAILED }
it 'returns true' do
expect(subject.failed?).to eq(true)
end
end
context 'when export status is not failed' do
let(:status) { BulkImports::Export::STARTED }
it 'returns false' do
expect(subject.failed?).to eq(false)
end
end
end
describe '#error' do
let(:status) { BulkImports::Export::FAILED }
it 'returns error message' do
expect(subject.error).to eq('error!')
end
context 'when something goes wrong during export status fetch' do
it 'returns exception class as error' do
allow_next_instance_of(BulkImports::Clients::Http) do |client|
allow(client).to receive(:get).and_raise(StandardError, 'Error!')
end
expect(subject.error).to eq('Error!')
end
end
end
end
......@@ -12,8 +12,8 @@ RSpec.describe BulkImports::FileTransfer::GroupConfig do
subject { described_class.new(exportable) }
describe '#exportable_tree' do
it 'returns exportable tree' do
describe '#portable_tree' do
it 'returns portable tree' do
expect_next_instance_of(::Gitlab::ImportExport::AttributesFinder) do |finder|
expect(finder).to receive(:find_root).with(:group).and_call_original
end
......@@ -30,9 +30,21 @@ RSpec.describe BulkImports::FileTransfer::GroupConfig do
end
end
describe '#exportable_relations' do
describe '#portable_relations' do
it 'returns a list of top level exportable relations' do
expect(subject.portable_relations).to include('milestones', 'badges', 'boards', 'labels')
end
end
describe '#top_relation_tree' do
it 'returns relation tree of a top level relation' do
expect(subject.top_relation_tree('labels')).to eq('priorities' => {})
end
end
describe '#relation_excluded_keys' do
it 'returns excluded keys for relation' do
expect(subject.relation_excluded_keys('group')).to include('owner_id')
end
end
end
......@@ -12,8 +12,8 @@ RSpec.describe BulkImports::FileTransfer::ProjectConfig do
subject { described_class.new(exportable) }
describe '#exportable_tree' do
it 'returns exportable tree' do
describe '#portable_tree' do
it 'returns portable tree' do
expect_next_instance_of(::Gitlab::ImportExport::AttributesFinder) do |finder|
expect(finder).to receive(:find_root).with(:project).and_call_original
end
......@@ -30,9 +30,21 @@ RSpec.describe BulkImports::FileTransfer::ProjectConfig do
end
end
describe '#exportable_relations' do
describe '#portable_relations' do
it 'returns a list of top level exportable relations' do
expect(subject.portable_relations).to include('issues', 'labels', 'milestones', 'merge_requests')
end
end
describe '#top_relation_tree' do
it 'returns relation tree of a top level relation' do
expect(subject.top_relation_tree('labels')).to eq('priorities' => {})
end
end
describe '#relation_excluded_keys' do
it 'returns excluded keys for relation' do
expect(subject.relation_excluded_keys('project')).to include('creator_id')
end
end
end
......@@ -32,11 +32,21 @@ RSpec.describe BulkImports::FileDownloadService do
end
end
it 'downloads file' do
subject.execute
shared_examples 'downloads file' do
it 'downloads file' do
subject.execute
expect(File.exist?(filepath)).to eq(true)
expect(File.read(filepath)).to include('chunk')
expect(File.exist?(filepath)).to eq(true)
expect(File.read(filepath)).to include('chunk')
end
end
include_examples 'downloads file'
context 'when content-type is application/gzip' do
let_it_be(:content_type) { 'application/gzip' }
include_examples 'downloads file'
end
context 'when url is not valid' do
......
......@@ -8,10 +8,16 @@ RSpec.describe BulkImports::PipelineWorker do
def initialize(_); end
def run; end
def self.ndjson_pipeline?
false
end
end
end
let_it_be(:entity) { create(:bulk_import_entity) }
let_it_be(:bulk_import) { create(:bulk_import) }
let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import) }
let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
before do
stub_const('FakePipeline', pipeline_class)
......@@ -27,6 +33,7 @@ RSpec.describe BulkImports::PipelineWorker do
expect(BulkImports::Stage)
.to receive(:pipeline_exists?)
.with('FakePipeline')
.twice
.and_return(true)
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
......@@ -122,4 +129,114 @@ RSpec.describe BulkImports::PipelineWorker do
expect(pipeline_tracker.jid).to eq('jid')
end
end
context 'when ndjson pipeline' do
let(:ndjson_pipeline) do
Class.new do
def initialize(_); end
def run; end
def self.ndjson_pipeline?
true
end
end
end
let(:pipeline_tracker) do
create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'NdjsonPipeline'
)
end
before do
stub_const('NdjsonPipeline', ndjson_pipeline)
stub_const('NdjsonPipeline::RELATION', 'test')
allow(BulkImports::Stage)
.to receive(:pipeline_exists?)
.with('NdjsonPipeline')
.and_return(true)
end
it 'runs the pipeline successfully' do
allow_next_instance_of(BulkImports::ExportStatus) do |status|
allow(status).to receive(:started?).and_return(false)
allow(status).to receive(:failed?).and_return(false)
end
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
expect(pipeline_tracker.reload.status_name).to eq(:finished)
end
context 'when export status is started' do
it 'reenqueues pipeline worker' do
allow_next_instance_of(BulkImports::ExportStatus) do |status|
allow(status).to receive(:started?).and_return(true)
allow(status).to receive(:failed?).and_return(false)
end
expect(described_class)
.to receive(:perform_in)
.with(
described_class::NDJSON_PIPELINE_PERFORM_DELAY,
pipeline_tracker.id,
pipeline_tracker.stage,
entity.id
)
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
end
end
context 'when job reaches timeout' do
it 'marks as failed and logs the error' do
old_created_at = entity.created_at
entity.update!(created_at: (BulkImports::Pipeline::NDJSON_EXPORT_TIMEOUT + 1.hour).ago)
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:error)
.with(
worker: described_class.name,
pipeline_name: 'NdjsonPipeline',
entity_id: entity.id,
message: 'Pipeline timeout'
)
end
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
expect(pipeline_tracker.reload.status_name).to eq(:failed)
entity.update!(created_at: old_created_at)
end
end
context 'when export status is failed' do
it 'marks as failed and logs the error' do
allow_next_instance_of(BulkImports::ExportStatus) do |status|
allow(status).to receive(:failed?).and_return(true)
allow(status).to receive(:error).and_return('Error!')
end
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:error)
.with(
worker: described_class.name,
pipeline_name: 'NdjsonPipeline',
entity_id: entity.id,
message: 'Error!'
)
end
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
expect(pipeline_tracker.reload.status_name).to eq(:failed)
end
end
end
end