Commit 235fa4d4 authored by Kassio Borges's avatar Kassio Borges

BulkImports: Add pipeline step to the failures log

For better understanding of BulkImport Pipeline errors, now the
pipeline step (extractor, transformer, loader) will also be logged in
the `bulk_imports_failures` table.

Related to: https://gitlab.com/gitlab-org/gitlab/-/issues/299528
parent 0cd70234
---
title: 'BulkImports: Add pipeline step to the failures log'
merge_request: 52345
author:
type: changed
# frozen_string_literal: true
class AddPipelineStepToBulkImportsFailures < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
unless column_exists?(:bulk_import_failures, :pipeline_step, :text)
with_lock_retries do
add_column :bulk_import_failures, :pipeline_step, :text
end
end
add_text_limit :bulk_import_failures, :pipeline_step, 255
end
def down
with_lock_retries do
remove_column :bulk_import_failures, :pipeline_step
end
end
end
5f326f101ff06993e9160b0486d24d615abd6d5027b375e422f776181ad8a193
\ No newline at end of file
......@@ -10074,8 +10074,10 @@ CREATE TABLE bulk_import_failures (
exception_class text NOT NULL,
exception_message text NOT NULL,
correlation_id_value text,
pipeline_step text,
CONSTRAINT check_053d65c7a4 CHECK ((char_length(pipeline_class) <= 255)),
CONSTRAINT check_6eca8f972e CHECK ((char_length(exception_message) <= 255)),
CONSTRAINT check_721a422375 CHECK ((char_length(pipeline_step) <= 255)),
CONSTRAINT check_c7dba8398e CHECK ((char_length(exception_class) <= 255)),
CONSTRAINT check_e787285882 CHECK ((char_length(correlation_id_value) <= 255))
);
......
......@@ -31,16 +31,16 @@ module BulkImports
private # rubocop:disable Lint/UselessAccessModifier
def run_pipeline_step(type, class_name, context)
def run_pipeline_step(step, class_name, context)
raise MarkedAsFailedError if marked_as_failed?(context)
info(context, type => class_name)
info(context, step => class_name)
yield
rescue MarkedAsFailedError
log_skip(context, type => class_name)
log_skip(context, step => class_name)
rescue => e
log_import_failure(e, context)
log_import_failure(e, step, context)
mark_as_failed(context) if abort_on_failure?
end
......@@ -72,10 +72,11 @@ module BulkImports
info(context, log)
end
def log_import_failure(exception, context)
def log_import_failure(exception, step, context)
attributes = {
bulk_import_entity_id: context.entity.id,
pipeline_class: pipeline,
pipeline_step: step,
exception_class: exception.class.to_s,
exception_message: exception.message.truncate(255),
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
......
......@@ -103,6 +103,7 @@ RSpec.describe BulkImports::Pipeline::Runner do
expect(failure).to be_present
expect(failure.pipeline_class).to eq('BulkImports::MyPipeline')
expect(failure.pipeline_step).to eq('extractor')
expect(failure.exception_class).to eq('StandardError')
expect(failure.exception_message).to eq('Error!')
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment