Commit 7f6d004b authored by Rémy Coutable

Merge branch 'georgekoltsov/bulk_import_ndjson_labels' into 'master'

Add BulkImports NdjsonExtractor & update labels pipeline to use it

See merge request gitlab-org/gitlab!62409
parents 0f66959a 53ca7bd1
@@ -4,6 +4,10 @@ module BulkImports
   class Export < ApplicationRecord
     include Gitlab::Utils::StrongMemoize

+    STARTED = 0
+    FINISHED = 1
+    FAILED = -1
+
     self.table_name = 'bulk_import_exports'

     belongs_to :project, optional: true
@@ -18,9 +22,9 @@ module BulkImports
     validate :portable_relation?

     state_machine :status, initial: :started do
-      state :started, value: 0
-      state :finished, value: 1
-      state :failed, value: -1
+      state :started, value: STARTED
+      state :finished, value: FINISHED
+      state :failed, value: FAILED

       event :start do
         transition any => :started
...
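The named constants replace magic numbers at call sites; ExportStatus (below) compares the status values returned by the export_relations API against them. An illustrative sketch (not part of this diff):

payload = { 'relation' => 'labels', 'status' => 0 } # shape returned by /export_relations/status
payload['status'] == BulkImports::Export::STARTED   # => true, instead of comparing against a bare 0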
# frozen_string_literal: true

module BulkImports
  class ExportStatus
    include Gitlab::Utils::StrongMemoize

    def initialize(pipeline_tracker, relation)
      @pipeline_tracker = pipeline_tracker
      @relation = relation
      @entity = @pipeline_tracker.entity
      @configuration = @entity.bulk_import.configuration
      @client = Clients::Http.new(uri: @configuration.url, token: @configuration.access_token)
    end

    def started?
      export_status['status'] == Export::STARTED
    end

    def failed?
      export_status['status'] == Export::FAILED
    end

    def error
      export_status['error']
    end

    private

    attr_reader :client, :entity, :relation

    def export_status
      strong_memoize(:export_status) do
        fetch_export_status.find { |item| item['relation'] == relation }
      end
    rescue StandardError => e
      { 'status' => Export::FAILED, 'error' => e.message }
    end

    def fetch_export_status
      client.get(status_endpoint).parsed_response
    end

    def status_endpoint
      "/groups/#{entity.encoded_source_full_path}/export_relations/status"
    end
  end
end
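A minimal sketch (not part of this diff) of the polling contract ExportStatus gives its callers; the PipelineWorker change below drives exactly this decision:

status = BulkImports::ExportStatus.new(pipeline_tracker, 'labels')

if status.failed?
  raise BulkImports::Pipeline::FailedError, status.error # source-side export failed
elsif status.started?
  # export is still running on the source instance; re-enqueue and check again later
else
  # export finished; the relation file is ready to download
end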
@@ -13,6 +13,14 @@ module BulkImports
       attributes_finder.find_root(portable_class_sym)
     end

+    def top_relation_tree(relation)
+      portable_relations_tree[relation.to_s]
+    end
+
+    def relation_excluded_keys(relation)
+      attributes_finder.find_excluded_keys(relation)
+    end
+
     def export_path
       strong_memoize(:export_path) do
         relative_path = File.join(base_export_path, SecureRandom.hex)
@@ -47,6 +55,10 @@ module BulkImports
       @portable_class_sym ||= portable_class.to_s.demodulize.underscore.to_sym
     end

+    def portable_relations_tree
+      @portable_relations_tree ||= attributes_finder.find_relations_tree(portable_class_sym).deep_stringify_keys
+    end
+
     def import_export_yaml
       raise NotImplementedError
     end
...
@@ -3,7 +3,7 @@
 module BulkImports
   class FileDownloadService
     FILE_SIZE_LIMIT = 5.gigabytes
-    ALLOWED_CONTENT_TYPES = ['application/octet-stream'].freeze
+    ALLOWED_CONTENT_TYPES = %w(application/gzip application/octet-stream).freeze

     ServiceError = Class.new(StandardError)
...
@@ -4,6 +4,8 @@ module BulkImports
   class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
     include ApplicationWorker

+    NDJSON_PIPELINE_PERFORM_DELAY = 1.minute
+
     feature_category :importers
     tags :exclude_from_kubernetes
@@ -40,6 +42,15 @@ module BulkImports
     private

     def run(pipeline_tracker)
+      if ndjson_pipeline?(pipeline_tracker)
+        status = ExportStatus.new(pipeline_tracker, pipeline_tracker.pipeline_class::RELATION)
+
+        raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?(pipeline_tracker)
+        raise(Pipeline::FailedError, status.error) if status.failed?
+
+        return reenqueue(pipeline_tracker) if status.started?
+      end
+
       pipeline_tracker.update!(status_event: 'start', jid: jid)

       context = ::BulkImports::Pipeline::Context.new(pipeline_tracker)
@@ -48,7 +59,7 @@ module BulkImports
       pipeline_tracker.finish!
     rescue StandardError => e
-      pipeline_tracker.fail_op!
+      pipeline_tracker.update!(status_event: 'fail_op', jid: jid)

       logger.error(
         worker: self.class.name,
@@ -67,5 +78,17 @@ module BulkImports
     def logger
       @logger ||= Gitlab::Import::Logger.build
     end

+    def ndjson_pipeline?(pipeline_tracker)
+      pipeline_tracker.pipeline_class.ndjson_pipeline?
+    end
+
+    def job_timeout?(pipeline_tracker)
+      (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT
+    end
+
+    def reenqueue(pipeline_tracker)
+      self.class.perform_in(NDJSON_PIPELINE_PERFORM_DELAY, pipeline_tracker.id, pipeline_tracker.stage, pipeline_tracker.entity.id)
+    end
   end
 end
# frozen_string_literal: true

module BulkImports
  module Common
    module Extractors
      class NdjsonExtractor
        include Gitlab::ImportExport::CommandLineUtil
        include Gitlab::Utils::StrongMemoize

        EXPORT_DOWNLOAD_URL_PATH = "/%{resource}/%{full_path}/export_relations/download?relation=%{relation}"

        def initialize(relation:)
          @relation = relation
          @tmp_dir = Dir.mktmpdir
        end

        def extract(context)
          download_service(tmp_dir, context).execute
          decompression_service(tmp_dir).execute
          relations = ndjson_reader(tmp_dir).consume_relation('', relation)

          BulkImports::Pipeline::ExtractedData.new(data: relations)
        end

        def remove_tmp_dir
          FileUtils.remove_entry(tmp_dir)
        end

        private

        attr_reader :relation, :tmp_dir

        def filename
          @filename ||= "#{relation}.ndjson.gz"
        end

        def download_service(tmp_dir, context)
          @download_service ||= BulkImports::FileDownloadService.new(
            configuration: context.configuration,
            relative_url: relative_resource_url(context),
            dir: tmp_dir,
            filename: filename
          )
        end

        def decompression_service(tmp_dir)
          @decompression_service ||= BulkImports::FileDecompressionService.new(
            dir: tmp_dir,
            filename: filename
          )
        end

        def ndjson_reader(tmp_dir)
          @ndjson_reader ||= Gitlab::ImportExport::JSON::NdjsonReader.new(tmp_dir)
        end

        def relative_resource_url(context)
          strong_memoize(:relative_resource_url) do
            resource = context.portable.class.name.downcase.pluralize
            encoded_full_path = context.entity.encoded_source_full_path

            EXPORT_DOWNLOAD_URL_PATH % { resource: resource, full_path: encoded_full_path, relation: relation }
          end
        end
      end
    end
  end
end
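A minimal usage sketch (not part of this diff); `context` stands for a BulkImports::Pipeline::Context:

extractor = BulkImports::Common::Extractors::NdjsonExtractor.new(relation: 'labels')

# Downloads <relation>.ndjson.gz from the source instance, gunzips it, and
# streams rows lazily via NdjsonReader#consume_relation.
extracted = extractor.extract(context)

extracted.data.each do |relation_hash, relation_index|
  # one exported label per NDJSON line, paired with its line index
end

extractor.remove_tmp_dir # NdjsonPipeline#after_run calls this once the pipeline finishes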
# frozen_string_literal: true

module BulkImports
  module Groups
    module Graphql
      module GetLabelsQuery
        extend self

        def to_s
          <<-'GRAPHQL'
          query ($full_path: ID!, $cursor: String, $per_page: Int) {
            group(fullPath: $full_path) {
              labels(first: $per_page, after: $cursor, onlyGroupLabels: true) {
                page_info: pageInfo {
                  next_page: endCursor
                  has_next_page: hasNextPage
                }
                nodes {
                  title
                  description
                  color
                  created_at: createdAt
                  updated_at: updatedAt
                }
              }
            }
          }
          GRAPHQL
        end

        def variables(context)
          {
            full_path: context.entity.source_full_path,
            cursor: context.tracker.next_page,
            per_page: ::BulkImports::Tracker::DEFAULT_PAGE_SIZE
          }
        end

        def base_path
          %w[data group labels]
        end

        def data_path
          base_path << 'nodes'
        end

        def page_info_path
          base_path << 'page_info'
        end
      end
    end
  end
end
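Worth noting (illustrative, not part of this diff): base_path builds a fresh array on every call, so data_path and page_info_path can safely append to it; the paths are used to dig into the GraphQL response:

response = { 'data' => { 'group' => { 'labels' => { 'nodes' => [], 'page_info' => {} } } } }

response.dig(*BulkImports::Groups::Graphql::GetLabelsQuery.data_path)      # => []
response.dig(*BulkImports::Groups::Graphql::GetLabelsQuery.page_info_path) # => {}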
@@ -4,6 +4,10 @@ module BulkImports
   module Groups
     module Pipelines
       class EntityFinisher
+        def self.ndjson_pipeline?
+          false
+        end
+
         def initialize(context)
           @context = context
         end
...
@@ -4,15 +4,35 @@ module BulkImports
   module Groups
     module Pipelines
       class LabelsPipeline
-        include Pipeline
+        include NdjsonPipeline

-        extractor BulkImports::Common::Extractors::GraphqlExtractor,
-            query: BulkImports::Groups::Graphql::GetLabelsQuery
+        RELATION = 'labels'

-        transformer Common::Transformers::ProhibitedAttributesTransformer
+        extractor ::BulkImports::Common::Extractors::NdjsonExtractor, relation: RELATION

-        def load(context, data)
-          Labels::CreateService.new(data).execute(group: context.group)
+        def transform(context, data)
+          relation_hash = data.first
+          relation_index = data.last
+          relation_definition = import_export_config.top_relation_tree(RELATION)
+
+          deep_transform_relation!(relation_hash, RELATION, relation_definition) do |key, hash|
+            Gitlab::ImportExport::Group::RelationFactory.create(
+              relation_index: relation_index,
+              relation_sym: key.to_sym,
+              relation_hash: hash,
+              importable: context.portable,
+              members_mapper: nil,
+              object_builder: object_builder,
+              user: context.current_user,
+              excluded_keys: import_export_config.relation_excluded_keys(key)
+            )
+          end
+        end
+
+        def load(_, label)
+          return unless label
+
+          label.save! unless label.persisted?
         end
       end
     end
...
# frozen_string_literal: true

module BulkImports
  module NdjsonPipeline
    extend ActiveSupport::Concern

    include Pipeline

    included do
      ndjson_pipeline!

      def deep_transform_relation!(relation_hash, relation_key, relation_definition, &block)
        relation_key = relation_key_override(relation_key)

        relation_definition.each do |sub_relation_key, sub_relation_definition|
          sub_relation = relation_hash[sub_relation_key]

          next unless sub_relation

          current_item =
            if sub_relation.is_a?(Array)
              sub_relation
                .map { |entry| deep_transform_relation!(entry, sub_relation_key, sub_relation_definition, &block) }
                .tap { |entry| entry.compact! }
                .presence
            else
              deep_transform_relation!(sub_relation, sub_relation_key, sub_relation_definition, &block)
            end

          if current_item
            relation_hash[sub_relation_key] = current_item
          else
            relation_hash.delete(sub_relation_key)
          end
        end

        yield(relation_key, relation_hash)
      end

      def after_run(_)
        extractor.remove_tmp_dir if extractor.respond_to?(:remove_tmp_dir)
      end

      def relation_class(relation_key)
        relation_key.to_s.classify.constantize
      rescue NameError
        relation_key.to_s.constantize
      end

      def relation_key_override(relation_key)
        relation_key_overrides[relation_key.to_sym]&.to_s || relation_key
      end

      def relation_key_overrides
        "Gitlab::ImportExport::#{portable.class}::RelationFactory::OVERRIDES".constantize
      end

      def object_builder
        "Gitlab::ImportExport::#{portable.class}::ObjectBuilder".constantize
      end
    end
  end
end
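A worked example (not part of this diff) of deep_transform_relation!: it recurses through the relation definition bottom-up, hands each node to the block under its (possibly overridden) relation key, and prunes nodes the block maps to nil. Assuming `pipeline` is an instance of a class including NdjsonPipeline whose portable is a Group:

definition = { 'priorities' => {} } # shape returned by import_export_config.top_relation_tree('labels')
label_hash = { 'title' => 'bug', 'priorities' => [{ 'priority' => 1 }] }

result = pipeline.deep_transform_relation!(label_hash, 'labels', definition) do |key, hash|
  hash.merge('seen_as' => key)
end

result['seen_as'] # => "group_labels" -- 'labels' is overridden via Group::RelationFactory::OVERRIDES
result['priorities'].first['seen_as'] # => the override for 'priorities', or the key itself if none exists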
@@ -8,8 +8,11 @@ module BulkImports
     include Runner

     NotAllowedError = Class.new(StandardError)
+    ExpiredError = Class.new(StandardError)
+    FailedError = Class.new(StandardError)

     CACHE_KEY_EXPIRATION = 2.hours
+    NDJSON_EXPORT_TIMEOUT = 30.minutes

     def initialize(context)
       @context = context
@@ -19,6 +22,14 @@ module BulkImports
       @tracker ||= context.tracker
     end

+    def portable
+      @portable ||= context.portable
+    end
+
+    def import_export_config
+      @import_export_config ||= context.import_export_config
+    end
+
     included do
       private
@@ -111,7 +122,7 @@ module BulkImports
       options = class_config[:options]

       if options
-        class_config[:klass].new(class_config[:options])
+        class_config[:klass].new(**class_config[:options])
       else
         class_config[:klass].new
       end
@@ -155,6 +166,14 @@ module BulkImports
       class_attributes[:abort_on_failure]
     end

+    def ndjson_pipeline!
+      class_attributes[:ndjson_pipeline] = true
+    end
+
+    def ndjson_pipeline?
+      class_attributes[:ndjson_pipeline]
+    end
+
     private

     def add_attribute(sym, klass, options)
...
@@ -16,6 +16,14 @@ module BulkImports
       @entity ||= tracker.entity
     end

+    def portable
+      @portable ||= entity.group || entity.project
+    end
+
+    def import_export_config
+      @import_export_config ||= ::BulkImports::FileTransfer.config_for(portable)
+    end
+
     def group
       @group ||= entity.group
     end
...
@@ -6,7 +6,7 @@ module BulkImports
     attr_reader :data

     def initialize(data: nil, page_info: {})
-      @data = Array.wrap(data)
+      @data = data.is_a?(Enumerator) ? data : Array.wrap(data)
       @page_info = page_info
     end
...
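The guard matters because NdjsonReader#consume_relation returns a lazy Enumerator of [relation_hash, index] pairs; Array.wrap would box that Enumerator into a one-element array instead of letting the pipeline iterate rows. A small sketch (not part of this diff):

rows = Enumerator.new { |y| y << [{ 'title' => 'bug' }, 0] }

BulkImports::Pipeline::ExtractedData.new(data: rows).data          # => the Enumerator, untouched
BulkImports::Pipeline::ExtractedData.new(data: { 'id' => 1 }).data # => [{ 'id' => 1 }]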
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe BulkImports::Common::Extractors::NdjsonExtractor do
  let_it_be(:tmpdir) { Dir.mktmpdir }
  let_it_be(:filepath) { 'spec/fixtures/bulk_imports/labels.ndjson.gz' }
  let_it_be(:import) { create(:bulk_import) }
  let_it_be(:config) { create(:bulk_import_configuration, bulk_import: import) }
  let_it_be(:entity) { create(:bulk_import_entity, bulk_import: import) }
  let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
  let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }

  subject { described_class.new(relation: 'labels') }

  before do
    allow(FileUtils).to receive(:remove_entry).with(any_args).and_call_original

    subject.instance_variable_set(:@tmp_dir, tmpdir)
  end

  after(:all) do
    FileUtils.remove_entry(tmpdir) if File.directory?(tmpdir)
  end

  describe '#extract' do
    before do
      FileUtils.copy_file(filepath, File.join(tmpdir, 'labels.ndjson.gz'))

      allow_next_instance_of(BulkImports::FileDownloadService) do |service|
        allow(service).to receive(:execute)
      end
    end

    it 'returns ExtractedData' do
      extracted_data = subject.extract(context)
      label = extracted_data.data.first.first

      expect(extracted_data).to be_instance_of(BulkImports::Pipeline::ExtractedData)
      expect(label['title']).to include('Label')
      expect(label['description']).to include('Label')
      expect(label['type']).to eq('GroupLabel')
    end
  end

  describe '#remove_tmp_dir' do
    it 'removes tmp dir' do
      expect(FileUtils).to receive(:remove_entry).with(tmpdir).once

      subject.remove_tmp_dir
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe BulkImports::Groups::Graphql::GetLabelsQuery do
  it 'has a valid query' do
    tracker = create(:bulk_import_tracker)
    context = BulkImports::Pipeline::Context.new(tracker)

    query = GraphQL::Query.new(
      GitlabSchema,
      described_class.to_s,
      variables: described_class.variables(context)
    )
    result = GitlabSchema.static_validator.validate(query)

    expect(result[:errors]).to be_empty
  end

  describe '#data_path' do
    it 'returns data path' do
      expected = %w[data group labels nodes]

      expect(described_class.data_path).to eq(expected)
    end
  end

  describe '#page_info_path' do
    it 'returns pagination information path' do
      expected = %w[data group labels page_info]

      expect(described_class.page_info_path).to eq(expected)
    end
  end
end
@@ -5,98 +5,87 @@ require 'spec_helper'
 RSpec.describe BulkImports::Groups::Pipelines::LabelsPipeline do
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
-  let_it_be(:timestamp) { Time.new(2020, 01, 01).utc }
+  let_it_be(:bulk_import) { create(:bulk_import, user: user) }
+  let_it_be(:filepath) { 'spec/fixtures/bulk_imports/labels.ndjson.gz' }

   let_it_be(:entity) do
     create(
       :bulk_import_entity,
+      group: group,
+      bulk_import: bulk_import,
       source_full_path: 'source/full/path',
       destination_name: 'My Destination Group',
-      destination_namespace: group.full_path,
-      group: group
+      destination_namespace: group.full_path
     )
   end

   let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
   let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }

+  let(:tmpdir) { Dir.mktmpdir }
+
+  before do
+    FileUtils.copy_file(filepath, File.join(tmpdir, 'labels.ndjson.gz'))
+    group.add_owner(user)
+  end
+
   subject { described_class.new(context) }

   describe '#run' do
-    it 'imports a group labels' do
-      first_page = extracted_data(title: 'label1', has_next_page: true)
-      last_page = extracted_data(title: 'label2')
-
-      allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
-        allow(extractor)
-          .to receive(:extract)
-          .and_return(first_page, last_page)
+    it 'imports group labels into destination group and removes tmpdir' do
+      allow(Dir).to receive(:mktmpdir).and_return(tmpdir)
+      allow_next_instance_of(BulkImports::FileDownloadService) do |service|
+        allow(service).to receive(:execute)
       end

-      expect { subject.run }.to change(Label, :count).by(2)
+      expect { subject.run }.to change(::GroupLabel, :count).by(1)

-      label = group.labels.order(:created_at).last
+      label = group.labels.first

-      expect(label.title).to eq('label2')
-      expect(label.description).to eq('desc')
-      expect(label.color).to eq('#428BCA')
-      expect(label.created_at).to eq(timestamp)
-      expect(label.updated_at).to eq(timestamp)
+      expect(label.title).to eq('Label 1')
+      expect(label.description).to eq('Label 1')
+      expect(label.color).to eq('#6699cc')
+      expect(File.directory?(tmpdir)).to eq(false)
     end
   end

   describe '#load' do
-    it 'creates the label' do
-      data = label_data('label')
+    context 'when label is not persisted' do
+      it 'saves the label' do
+        label = build(:group_label, group: group)

-      expect { subject.load(context, data) }.to change(Label, :count).by(1)
+        expect(label).to receive(:save!)

-      label = group.labels.first
+        subject.load(context, label)
+      end
+    end

-      data.each do |key, value|
-        expect(label[key]).to eq(value)
+    context 'when label is persisted' do
+      it 'does not save label' do
+        label = create(:group_label, group: group)
+
+        expect(label).not_to receive(:save!)
+
+        subject.load(context, label)
+      end
+    end
+
+    context 'when label is missing' do
+      it 'returns' do
+        expect(subject.load(context, nil)).to be_nil
       end
     end
   end

   describe 'pipeline parts' do
-    it { expect(described_class).to include_module(BulkImports::Pipeline) }
+    it { expect(described_class).to include_module(BulkImports::NdjsonPipeline) }
     it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }

-    it 'has extractors' do
+    it 'has extractor' do
       expect(described_class.get_extractor)
         .to eq(
-          klass: BulkImports::Common::Extractors::GraphqlExtractor,
-          options: {
-            query: BulkImports::Groups::Graphql::GetLabelsQuery
-          }
+          klass: BulkImports::Common::Extractors::NdjsonExtractor,
+          options: { relation: described_class::RELATION }
         )
     end
-
-    it 'has transformers' do
-      expect(described_class.transformers)
-        .to contain_exactly(
-          { klass: BulkImports::Common::Transformers::ProhibitedAttributesTransformer, options: nil }
-        )
-    end
   end
-
-  def label_data(title)
-    {
-      'title' => title,
-      'description' => 'desc',
-      'color' => '#428BCA',
-      'created_at' => timestamp.to_s,
-      'updated_at' => timestamp.to_s
-    }
-  end
-
-  def extracted_data(title:, has_next_page: false)
-    page_info = {
-      'has_next_page' => has_next_page,
-      'next_page' => has_next_page ? 'cursor' : nil
-    }
-
-    BulkImports::Pipeline::ExtractedData.new(data: [label_data(title)], page_info: page_info)
-  end
 end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe BulkImports::NdjsonPipeline do
  let_it_be(:group) { create(:group) }
  let_it_be(:project) { create(:project) }

  let_it_be(:klass) do
    Class.new do
      include BulkImports::NdjsonPipeline

      attr_reader :portable

      def initialize(portable)
        @portable = portable
      end
    end
  end

  subject { klass.new(group) }

  it 'marks pipeline as ndjson' do
    expect(klass.ndjson_pipeline?).to eq(true)
  end

  describe '#deep_transform_relation!' do
    it 'transforms relation hash' do
      transformed = subject.deep_transform_relation!({}, 'test', {}) do |key, hash|
        hash.merge(relation_key: key)
      end

      expect(transformed[:relation_key]).to eq('test')
    end

    context 'when subrelations is an array' do
      it 'transforms each element of the array' do
        relation_hash = {
          'key' => 'value',
          'labels' => [
            { 'title' => 'label 1' },
            { 'title' => 'label 2' },
            { 'title' => 'label 3' }
          ]
        }
        relation_definition = { 'labels' => {} }

        transformed = subject.deep_transform_relation!(relation_hash, 'test', relation_definition) do |key, hash|
          hash.merge(relation_key: key)
        end

        transformed['labels'].each do |label|
          expect(label[:relation_key]).to eq('group_labels')
        end
      end
    end

    context 'when subrelation is a hash' do
      it 'transforms subrelation hash' do
        relation_hash = {
          'key' => 'value',
          'label' => { 'title' => 'label' }
        }
        relation_definition = { 'label' => {} }

        transformed = subject.deep_transform_relation!(relation_hash, 'test', relation_definition) do |key, hash|
          hash.merge(relation_key: key)
        end

        expect(transformed['label'][:relation_key]).to eq('group_label')
      end
    end

    context 'when subrelation is nil' do
      it 'removes subrelation' do
        relation_hash = {
          'key' => 'value',
          'label' => { 'title' => 'label' }
        }
        relation_definition = { 'label' => {} }

        transformed = subject.deep_transform_relation!(relation_hash, 'test', relation_definition) do |key, hash|
          if key == 'group_label'
            nil
          else
            hash
          end
        end

        expect(transformed['label']).to be_nil
      end
    end
  end

  describe '#relation_class' do
    context 'when relation name is pluralized' do
      it 'returns constantized class' do
        expect(subject.relation_class('MergeRequest::Metrics')).to eq(MergeRequest::Metrics)
      end
    end

    context 'when relation name is singularized' do
      it 'returns constantized class' do
        expect(subject.relation_class('Badge')).to eq(Badge)
      end
    end
  end

  describe '#relation_key_override' do
    context 'when portable is group' do
      it 'returns group relation name override' do
        expect(subject.relation_key_override('labels')).to eq('group_labels')
      end
    end

    context 'when portable is project' do
      subject { klass.new(project) }

      it 'returns group relation name override' do
        expect(subject.relation_key_override('labels')).to eq('project_labels')
      end
    end
  end
end
@@ -6,6 +6,9 @@ RSpec.describe BulkImports::Pipeline::Context do
   let_it_be(:user) { create(:user) }
   let_it_be(:group) { create(:group) }
   let_it_be(:bulk_import) { create(:bulk_import, user: user) }
+  let_it_be(:project) { create(:project) }
+  let_it_be(:project_entity) { create(:bulk_import_entity, :project_entity, project: project) }
+  let_it_be(:project_tracker) { create(:bulk_import_tracker, entity: project_entity) }

   let_it_be(:entity) do
     create(
@@ -51,4 +54,24 @@ RSpec.describe BulkImports::Pipeline::Context do
   describe '#extra' do
     it { expect(subject.extra).to eq(extra: :data) }
   end

+  describe '#portable' do
+    it { expect(subject.portable).to eq(group) }
+
+    context 'when portable is project' do
+      subject { described_class.new(project_tracker) }
+
+      it { expect(subject.portable).to eq(project) }
+    end
+  end
+
+  describe '#import_export_config' do
+    it { expect(subject.import_export_config).to be_instance_of(BulkImports::FileTransfer::GroupConfig) }
+
+    context 'when portable is project' do
+      subject { described_class.new(project_tracker) }
+
+      it { expect(subject.import_export_config).to be_instance_of(BulkImports::FileTransfer::ProjectConfig) }
+    end
+  end
 end
@@ -63,6 +63,7 @@ RSpec.describe BulkImports::Pipeline do
         BulkImports::MyPipeline.transformer(klass, options)
         BulkImports::MyPipeline.loader(klass, options)
         BulkImports::MyPipeline.abort_on_failure!
+        BulkImports::MyPipeline.ndjson_pipeline!

         expect(BulkImports::MyPipeline.get_extractor).to eq({ klass: klass, options: options })
@@ -74,6 +75,7 @@ RSpec.describe BulkImports::Pipeline do
         expect(BulkImports::MyPipeline.get_loader).to eq({ klass: klass, options: options })
         expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true)
+        expect(BulkImports::MyPipeline.ndjson_pipeline?).to eq(true)
       end
     end
   end
...
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe BulkImports::ExportStatus do
  let_it_be(:relation) { 'labels' }
  let_it_be(:import) { create(:bulk_import) }
  let_it_be(:config) { create(:bulk_import_configuration, bulk_import: import) }
  let_it_be(:entity) { create(:bulk_import_entity, bulk_import: import, source_full_path: 'foo') }
  let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }

  let(:response_double) do
    double(parsed_response: [{ 'relation' => 'labels', 'status' => status, 'error' => 'error!' }])
  end

  subject { described_class.new(tracker, relation) }

  before do
    allow_next_instance_of(BulkImports::Clients::Http) do |client|
      allow(client).to receive(:get).and_return(response_double)
    end
  end

  describe '#started?' do
    context 'when export status is started' do
      let(:status) { BulkImports::Export::STARTED }

      it 'returns true' do
        expect(subject.started?).to eq(true)
      end
    end

    context 'when export status is not started' do
      let(:status) { BulkImports::Export::FAILED }

      it 'returns false' do
        expect(subject.started?).to eq(false)
      end
    end
  end

  describe '#failed' do
    context 'when export status is failed' do
      let(:status) { BulkImports::Export::FAILED }

      it 'returns true' do
        expect(subject.failed?).to eq(true)
      end
    end

    context 'when export status is not failed' do
      let(:status) { BulkImports::Export::STARTED }

      it 'returns false' do
        expect(subject.failed?).to eq(false)
      end
    end
  end

  describe '#error' do
    let(:status) { BulkImports::Export::FAILED }

    it 'returns error message' do
      expect(subject.error).to eq('error!')
    end

    context 'when something goes wrong during export status fetch' do
      it 'returns exception class as error' do
        allow_next_instance_of(BulkImports::Clients::Http) do |client|
          allow(client).to receive(:get).and_raise(StandardError, 'Error!')
        end

        expect(subject.error).to eq('Error!')
      end
    end
  end
end
@@ -12,8 +12,8 @@ RSpec.describe BulkImports::FileTransfer::GroupConfig do
   subject { described_class.new(exportable) }

-  describe '#exportable_tree' do
-    it 'returns exportable tree' do
+  describe '#portable_tree' do
+    it 'returns portable tree' do
       expect_next_instance_of(::Gitlab::ImportExport::AttributesFinder) do |finder|
         expect(finder).to receive(:find_root).with(:group).and_call_original
       end
@@ -30,9 +30,21 @@ RSpec.describe BulkImports::FileTransfer::GroupConfig do
     end
   end

-  describe '#exportable_relations' do
+  describe '#portable_relations' do
     it 'returns a list of top level exportable relations' do
       expect(subject.portable_relations).to include('milestones', 'badges', 'boards', 'labels')
     end
   end

+  describe '#top_relation_tree' do
+    it 'returns relation tree of a top level relation' do
+      expect(subject.top_relation_tree('labels')).to eq('priorities' => {})
+    end
+  end
+
+  describe '#relation_excluded_keys' do
+    it 'returns excluded keys for relation' do
+      expect(subject.relation_excluded_keys('group')).to include('owner_id')
+    end
+  end
 end
@@ -12,8 +12,8 @@ RSpec.describe BulkImports::FileTransfer::ProjectConfig do
   subject { described_class.new(exportable) }

-  describe '#exportable_tree' do
-    it 'returns exportable tree' do
+  describe 'portable_tree' do
+    it 'returns portable tree' do
       expect_next_instance_of(::Gitlab::ImportExport::AttributesFinder) do |finder|
         expect(finder).to receive(:find_root).with(:project).and_call_original
       end
@@ -30,9 +30,21 @@ RSpec.describe BulkImports::FileTransfer::ProjectConfig do
     end
   end

-  describe '#exportable_relations' do
+  describe '#portable_relations' do
     it 'returns a list of top level exportable relations' do
       expect(subject.portable_relations).to include('issues', 'labels', 'milestones', 'merge_requests')
     end
   end

+  describe '#top_relation_tree' do
+    it 'returns relation tree of a top level relation' do
+      expect(subject.top_relation_tree('labels')).to eq('priorities' => {})
+    end
+  end
+
+  describe '#relation_excluded_keys' do
+    it 'returns excluded keys for relation' do
+      expect(subject.relation_excluded_keys('project')).to include('creator_id')
    end
+  end
 end
@@ -32,11 +32,21 @@ RSpec.describe BulkImports::FileDownloadService do
     end
   end

-  it 'downloads file' do
-    subject.execute
+  shared_examples 'downloads file' do
+    it 'downloads file' do
+      subject.execute

-    expect(File.exist?(filepath)).to eq(true)
-    expect(File.read(filepath)).to include('chunk')
+      expect(File.exist?(filepath)).to eq(true)
+      expect(File.read(filepath)).to include('chunk')
+    end
   end

+  include_examples 'downloads file'
+
+  context 'when content-type is application/gzip' do
+    let_it_be(:content_type) { 'application/gzip' }
+
+    include_examples 'downloads file'
+  end
+
   context 'when url is not valid' do
...
@@ -8,10 +8,16 @@ RSpec.describe BulkImports::PipelineWorker do
       def initialize(_); end

       def run; end
+
+      def self.ndjson_pipeline?
+        false
+      end
     end
   end

-  let_it_be(:entity) { create(:bulk_import_entity) }
+  let_it_be(:bulk_import) { create(:bulk_import) }
+  let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import) }
+  let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }

   before do
     stub_const('FakePipeline', pipeline_class)
@@ -27,6 +33,7 @@ RSpec.describe BulkImports::PipelineWorker do
       expect(BulkImports::Stage)
         .to receive(:pipeline_exists?)
         .with('FakePipeline')
+        .twice
         .and_return(true)

       expect_next_instance_of(Gitlab::Import::Logger) do |logger|
@@ -122,4 +129,114 @@ RSpec.describe BulkImports::PipelineWorker do
       expect(pipeline_tracker.jid).to eq('jid')
     end
   end
+
+  context 'when ndjson pipeline' do
+    let(:ndjson_pipeline) do
+      Class.new do
+        def initialize(_); end
+
+        def run; end
+
+        def self.ndjson_pipeline?
+          true
+        end
+      end
+    end
+
+    let(:pipeline_tracker) do
+      create(
+        :bulk_import_tracker,
+        entity: entity,
+        pipeline_name: 'NdjsonPipeline'
+      )
+    end
+
+    before do
+      stub_const('NdjsonPipeline', ndjson_pipeline)
+      stub_const('NdjsonPipeline::RELATION', 'test')
+
+      allow(BulkImports::Stage)
+        .to receive(:pipeline_exists?)
+        .with('NdjsonPipeline')
+        .and_return(true)
+    end
+
+    it 'runs the pipeline successfully' do
+      allow_next_instance_of(BulkImports::ExportStatus) do |status|
+        allow(status).to receive(:started?).and_return(false)
+        allow(status).to receive(:failed?).and_return(false)
+      end
+
+      subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+      expect(pipeline_tracker.reload.status_name).to eq(:finished)
+    end
+
+    context 'when export status is started' do
+      it 'reenqueues pipeline worker' do
+        allow_next_instance_of(BulkImports::ExportStatus) do |status|
+          allow(status).to receive(:started?).and_return(true)
+          allow(status).to receive(:failed?).and_return(false)
+        end
+
+        expect(described_class)
+          .to receive(:perform_in)
+          .with(
+            described_class::NDJSON_PIPELINE_PERFORM_DELAY,
+            pipeline_tracker.id,
+            pipeline_tracker.stage,
+            entity.id
+          )
+
+        subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+      end
+    end
+
+    context 'when job reaches timeout' do
+      it 'marks as failed and logs the error' do
+        old_created_at = entity.created_at
+        entity.update!(created_at: (BulkImports::Pipeline::NDJSON_EXPORT_TIMEOUT + 1.hour).ago)
+
+        expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+          expect(logger)
+            .to receive(:error)
+            .with(
+              worker: described_class.name,
+              pipeline_name: 'NdjsonPipeline',
+              entity_id: entity.id,
+              message: 'Pipeline timeout'
+            )
+        end
+
+        subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+        expect(pipeline_tracker.reload.status_name).to eq(:failed)
+
+        entity.update!(created_at: old_created_at)
+      end
+    end
+
+    context 'when export status is failed' do
+      it 'marks as failed and logs the error' do
+        allow_next_instance_of(BulkImports::ExportStatus) do |status|
+          allow(status).to receive(:failed?).and_return(true)
+          allow(status).to receive(:error).and_return('Error!')
+        end

+        expect_next_instance_of(Gitlab::Import::Logger) do |logger|
+          expect(logger)
+            .to receive(:error)
+            .with(
+              worker: described_class.name,
+              pipeline_name: 'NdjsonPipeline',
+              entity_id: entity.id,
+              message: 'Error!'
+            )
+        end
+
+        subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
+
+        expect(pipeline_tracker.reload.status_name).to eq(:failed)
+      end
+    end
+  end
 end