Commit f64052d8 authored by Kassio Borges's avatar Kassio Borges

BulkImports: Add more information to the logs

parent 24c65e64
...@@ -8,7 +8,7 @@ module BulkImports ...@@ -8,7 +8,7 @@ module BulkImports
MarkedAsFailedError = Class.new(StandardError) MarkedAsFailedError = Class.new(StandardError)
def run def run
raise MarkedAsFailedError if marked_as_failed? raise MarkedAsFailedError if context.entity.failed?
info(message: 'Pipeline started') info(message: 'Pipeline started')
...@@ -40,7 +40,7 @@ module BulkImports ...@@ -40,7 +40,7 @@ module BulkImports
private # rubocop:disable Lint/UselessAccessModifier private # rubocop:disable Lint/UselessAccessModifier
def run_pipeline_step(step, class_name = nil) def run_pipeline_step(step, class_name = nil)
raise MarkedAsFailedError if marked_as_failed? raise MarkedAsFailedError if context.entity.failed?
info(pipeline_step: step, step_class: class_name) info(pipeline_step: step, step_class: class_name)
...@@ -62,24 +62,13 @@ module BulkImports ...@@ -62,24 +62,13 @@ module BulkImports
end end
def mark_as_failed def mark_as_failed
warn(message: 'Pipeline failed', pipeline_class: pipeline) warn(message: 'Pipeline failed')
context.entity.fail_op! context.entity.fail_op!
end end
def marked_as_failed?
return true if context.entity.failed?
false
end
def log_skip(extra = {}) def log_skip(extra = {})
log = { info({ message: 'Skipping due to failed pipeline status' }.merge(extra))
message: 'Skipping due to failed pipeline status',
pipeline_class: pipeline
}.merge(extra)
info(log)
end end
def log_import_failure(exception, step) def log_import_failure(exception, step)
...@@ -92,25 +81,39 @@ module BulkImports ...@@ -92,25 +81,39 @@ module BulkImports
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
} }
error(
pipeline_step: step,
exception_class: exception.class.to_s,
exception_message: exception.message
)
BulkImports::Failure.create(attributes) BulkImports::Failure.create(attributes)
end end
def info(extra = {})
logger.info(log_params(extra))
end
def warn(extra = {}) def warn(extra = {})
logger.warn(log_params(extra)) logger.warn(log_params(extra))
end end
def info(extra = {}) def error(extra = {})
logger.info(log_params(extra)) logger.error(log_params(extra))
end end
def log_params(extra) def log_params(extra)
defaults = { defaults = {
bulk_import_id: context.bulk_import.id,
bulk_import_entity_id: context.entity.id, bulk_import_entity_id: context.entity.id,
bulk_import_entity_type: context.entity.source_type, bulk_import_entity_type: context.entity.source_type,
pipeline_class: pipeline pipeline_class: pipeline,
context_extra: context.extra
} }
defaults.merge(extra).compact defaults
.merge(extra)
.reject { |_key, value| value.blank? }
end end
def logger def logger
......
...@@ -27,29 +27,31 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -27,29 +27,31 @@ RSpec.describe BulkImports::Pipeline::Runner do
end end
end end
describe 'pipeline runner' do before do
before do stub_const('BulkImports::Extractor', extractor)
stub_const('BulkImports::Extractor', extractor) stub_const('BulkImports::Transformer', transformer)
stub_const('BulkImports::Transformer', transformer) stub_const('BulkImports::Loader', loader)
stub_const('BulkImports::Loader', loader)
pipeline = Class.new do
include BulkImports::Pipeline
extractor BulkImports::Extractor pipeline = Class.new do
transformer BulkImports::Transformer include BulkImports::Pipeline
loader BulkImports::Loader
def after_run(_); end extractor BulkImports::Extractor
end transformer BulkImports::Transformer
loader BulkImports::Loader
stub_const('BulkImports::MyPipeline', pipeline) def after_run(_); end
end end
context 'when entity is not marked as failed' do stub_const('BulkImports::MyPipeline', pipeline)
let(:entity) { create(:bulk_import_entity) } end
let(:context) { BulkImports::Pipeline::Context.new(entity) }
let_it_be_with_refind(:entity) { create(:bulk_import_entity) }
let(:context) { BulkImports::Pipeline::Context.new(entity, extra: :data) }
subject { BulkImports::MyPipeline.new(context) }
describe 'pipeline runner' do
context 'when entity is not marked as failed' do
it 'runs pipeline extractor, transformer, loader' do it 'runs pipeline extractor, transformer, loader' do
extracted_data = BulkImports::Pipeline::ExtractedData.new(data: { foo: :bar }) extracted_data = BulkImports::Pipeline::ExtractedData.new(data: { foo: :bar })
...@@ -76,58 +78,61 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -76,58 +78,61 @@ RSpec.describe BulkImports::Pipeline::Runner do
expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:info) expect(logger).to receive(:info)
.with( .with(
bulk_import_entity_id: entity.id, log_params(
bulk_import_entity_type: 'group_entity', context,
message: 'Pipeline started', message: 'Pipeline started',
pipeline_class: 'BulkImports::MyPipeline' pipeline_class: 'BulkImports::MyPipeline'
)
) )
expect(logger).to receive(:info) expect(logger).to receive(:info)
.with( .with(
bulk_import_entity_id: entity.id, log_params(
bulk_import_entity_type: 'group_entity', context,
pipeline_class: 'BulkImports::MyPipeline', pipeline_class: 'BulkImports::MyPipeline',
pipeline_step: :extractor, pipeline_step: :extractor,
step_class: 'BulkImports::Extractor' step_class: 'BulkImports::Extractor'
)
) )
expect(logger).to receive(:info) expect(logger).to receive(:info)
.with( .with(
bulk_import_entity_id: entity.id, log_params(
bulk_import_entity_type: 'group_entity', context,
pipeline_class: 'BulkImports::MyPipeline', pipeline_class: 'BulkImports::MyPipeline',
pipeline_step: :transformer, pipeline_step: :transformer,
step_class: 'BulkImports::Transformer' step_class: 'BulkImports::Transformer'
)
) )
expect(logger).to receive(:info) expect(logger).to receive(:info)
.with( .with(
bulk_import_entity_id: entity.id, log_params(
bulk_import_entity_type: 'group_entity', context,
pipeline_class: 'BulkImports::MyPipeline', pipeline_class: 'BulkImports::MyPipeline',
pipeline_step: :loader, pipeline_step: :loader,
step_class: 'BulkImports::Loader' step_class: 'BulkImports::Loader'
)
) )
expect(logger).to receive(:info) expect(logger).to receive(:info)
.with( .with(
bulk_import_entity_id: entity.id, log_params(
bulk_import_entity_type: 'group_entity', context,
pipeline_class: 'BulkImports::MyPipeline', pipeline_class: 'BulkImports::MyPipeline',
pipeline_step: :after_run pipeline_step: :after_run
)
) )
expect(logger).to receive(:info) expect(logger).to receive(:info)
.with( .with(
bulk_import_entity_id: entity.id, log_params(
bulk_import_entity_type: 'group_entity', context,
message: 'Pipeline finished', message: 'Pipeline finished',
pipeline_class: 'BulkImports::MyPipeline' pipeline_class: 'BulkImports::MyPipeline'
)
) )
end end
BulkImports::MyPipeline.new(context).run subject.run
end end
context 'when exception is raised' do context 'when exception is raised' do
let(:entity) { create(:bulk_import_entity, :created) }
let(:context) { BulkImports::Pipeline::Context.new(entity) }
before do before do
allow_next_instance_of(BulkImports::Extractor) do |extractor| allow_next_instance_of(BulkImports::Extractor) do |extractor|
allow(extractor).to receive(:extract).with(context).and_raise(StandardError, 'Error!') allow(extractor).to receive(:extract).with(context).and_raise(StandardError, 'Error!')
...@@ -135,7 +140,21 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -135,7 +140,21 @@ RSpec.describe BulkImports::Pipeline::Runner do
end end
it 'logs import failure' do it 'logs import failure' do
BulkImports::MyPipeline.new(context).run expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:error)
.with(
log_params(
context,
pipeline_step: :extractor,
pipeline_class: 'BulkImports::MyPipeline',
exception_class: 'StandardError',
exception_message: 'Error!'
)
)
end
expect { subject.run }
.to change(entity.failures, :count).by(1)
failure = entity.failures.first failure = entity.failures.first
...@@ -152,29 +171,29 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -152,29 +171,29 @@ RSpec.describe BulkImports::Pipeline::Runner do
end end
it 'marks entity as failed' do it 'marks entity as failed' do
BulkImports::MyPipeline.new(context).run expect { subject.run }
.to change(entity, :status_name).to(:failed)
expect(entity.failed?).to eq(true)
end end
it 'logs warn message' do it 'logs warn message' do
expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:warn) expect(logger).to receive(:warn)
.with( .with(
message: 'Pipeline failed', log_params(
pipeline_class: 'BulkImports::MyPipeline', context,
bulk_import_entity_id: entity.id, message: 'Pipeline failed',
bulk_import_entity_type: entity.source_type pipeline_class: 'BulkImports::MyPipeline'
)
) )
end end
BulkImports::MyPipeline.new(context).run subject.run
end end
end end
context 'when pipeline is not marked to abort on failure' do context 'when pipeline is not marked to abort on failure' do
it 'marks entity as failed' do it 'does not mark entity as failed' do
BulkImports::MyPipeline.new(context).run subject.run
expect(entity.failed?).to eq(false) expect(entity.failed?).to eq(false)
end end
...@@ -183,24 +202,31 @@ RSpec.describe BulkImports::Pipeline::Runner do ...@@ -183,24 +202,31 @@ RSpec.describe BulkImports::Pipeline::Runner do
end end
context 'when entity is marked as failed' do context 'when entity is marked as failed' do
let(:entity) { create(:bulk_import_entity) }
let(:context) { BulkImports::Pipeline::Context.new(entity) }
it 'logs and returns without execution' do it 'logs and returns without execution' do
allow(entity).to receive(:failed?).and_return(true) entity.fail_op!
expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:info) expect(logger).to receive(:info)
.with( .with(
message: 'Skipping due to failed pipeline status', log_params(
pipeline_class: 'BulkImports::MyPipeline', context,
bulk_import_entity_id: entity.id, message: 'Skipping due to failed pipeline status',
bulk_import_entity_type: 'group_entity' pipeline_class: 'BulkImports::MyPipeline'
)
) )
end end
BulkImports::MyPipeline.new(context).run subject.run
end end
end end
end end
def log_params(context, extra = {})
{
bulk_import_id: context.bulk_import.id,
bulk_import_entity_id: context.entity.id,
bulk_import_entity_type: context.entity.source_type,
context_extra: context.extra
}.merge(extra)
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment