Commit e4eae80e authored by George Koltsov

Add BulkImports::Failure model to capture Bulk Import failures

- Create new db table `bulk_import_failures` to capture any
  import failures during Group Import execution
- Add pipeline DSL to abort pipeline on failure
- Adjust runner to abort execution if pipeline is marked as failed
parent 80aa006c
......@@ -30,6 +30,11 @@ class BulkImports::Entity < ApplicationRecord
class_name: 'BulkImports::Tracker',
foreign_key: :bulk_import_entity_id
has_many :failures,
class_name: 'BulkImports::Failure',
inverse_of: :entity,
foreign_key: :bulk_import_entity_id
validates :project, absence: true, if: :group
validates :group, absence: true, if: :project
validates :source_type, :source_full_path, :destination_name,
......@@ -52,6 +57,7 @@ class BulkImports::Entity < ApplicationRecord
event :finish do
transition started: :finished
transition failed: :failed
end
event :fail_op do
......
# frozen_string_literal: true
class BulkImports::Failure < ApplicationRecord
self.table_name = 'bulk_import_failures'
belongs_to :entity,
class_name: 'BulkImports::Entity',
foreign_key: :bulk_import_entity_id,
inverse_of: :failures,
optional: false
validates :entity, presence: true
end
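A rough usage sketch of the new model (attribute names taken from the migration below; illustrative only, not part of the commit, and assuming `entity` is a persisted BulkImports::Entity):

  failure = BulkImports::Failure.create!(
    bulk_import_entity_id: entity.id,
    pipeline_class: 'BulkImports::Groups::Pipelines::GroupPipeline',
    exception_class: 'StandardError',
    exception_message: 'boom',
    correlation_id_value: SecureRandom.uuid
  )

  entity.failures.include?(failure) # => true, via the has_many added to BulkImports::Entity above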
---
title: Add BulkImports::Failure to store import failures of the Group Migration (BulkImports) process
merge_request: 47526
author:
type: changed
# frozen_string_literal: true
class CreateBulkImportFailures < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
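# add_text_limit (below) adds and then validates a CHECK constraint, which
# cannot run inside a single DDL transaction; hence disable_ddl_transaction!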
def up
with_lock_retries do
unless table_exists?(:bulk_import_failures)
create_table :bulk_import_failures do |t|
t.references :bulk_import_entity,
null: false,
index: true,
foreign_key: { on_delete: :cascade }
t.datetime_with_timezone :created_at, null: false
t.text :pipeline_class, null: false
t.text :exception_class, null: false
t.text :exception_message, null: false
t.text :correlation_id_value, index: true
end
end
end
add_text_limit :bulk_import_failures, :pipeline_class, 255
add_text_limit :bulk_import_failures, :exception_class, 255
add_text_limit :bulk_import_failures, :exception_message, 255
add_text_limit :bulk_import_failures, :correlation_id_value, 255
end
def down
with_lock_retries do
drop_table :bulk_import_failures
end
end
end
2b30b1ba41a49ce4a81711e6fef1dbcdaf8b76f824aaf83702cd27833815e57b
\ No newline at end of file
......@@ -9927,6 +9927,29 @@ CREATE SEQUENCE bulk_import_entities_id_seq
ALTER SEQUENCE bulk_import_entities_id_seq OWNED BY bulk_import_entities.id;
CREATE TABLE bulk_import_failures (
id bigint NOT NULL,
bulk_import_entity_id bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
pipeline_class text NOT NULL,
exception_class text NOT NULL,
exception_message text NOT NULL,
correlation_id_value text,
CONSTRAINT check_053d65c7a4 CHECK ((char_length(pipeline_class) <= 255)),
CONSTRAINT check_6eca8f972e CHECK ((char_length(exception_message) <= 255)),
CONSTRAINT check_c7dba8398e CHECK ((char_length(exception_class) <= 255)),
CONSTRAINT check_e787285882 CHECK ((char_length(correlation_id_value) <= 255))
);
CREATE SEQUENCE bulk_import_failures_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE bulk_import_failures_id_seq OWNED BY bulk_import_failures.id;
CREATE TABLE bulk_import_trackers (
id bigint NOT NULL,
bulk_import_entity_id bigint NOT NULL,
......@@ -17789,6 +17812,8 @@ ALTER TABLE ONLY bulk_import_configurations ALTER COLUMN id SET DEFAULT nextval(
ALTER TABLE ONLY bulk_import_entities ALTER COLUMN id SET DEFAULT nextval('bulk_import_entities_id_seq'::regclass);
ALTER TABLE ONLY bulk_import_failures ALTER COLUMN id SET DEFAULT nextval('bulk_import_failures_id_seq'::regclass);
ALTER TABLE ONLY bulk_import_trackers ALTER COLUMN id SET DEFAULT nextval('bulk_import_trackers_id_seq'::regclass);
ALTER TABLE ONLY bulk_imports ALTER COLUMN id SET DEFAULT nextval('bulk_imports_id_seq'::regclass);
......@@ -18815,6 +18840,9 @@ ALTER TABLE ONLY bulk_import_configurations
ALTER TABLE ONLY bulk_import_entities
ADD CONSTRAINT bulk_import_entities_pkey PRIMARY KEY (id);
ALTER TABLE ONLY bulk_import_failures
ADD CONSTRAINT bulk_import_failures_pkey PRIMARY KEY (id);
ALTER TABLE ONLY bulk_import_trackers
ADD CONSTRAINT bulk_import_trackers_pkey PRIMARY KEY (id);
......@@ -20432,6 +20460,10 @@ CREATE INDEX index_bulk_import_entities_on_parent_id ON bulk_import_entities USI
CREATE INDEX index_bulk_import_entities_on_project_id ON bulk_import_entities USING btree (project_id);
CREATE INDEX index_bulk_import_failures_on_bulk_import_entity_id ON bulk_import_failures USING btree (bulk_import_entity_id);
CREATE INDEX index_bulk_import_failures_on_correlation_id_value ON bulk_import_failures USING btree (correlation_id_value);
CREATE INDEX index_bulk_imports_on_user_id ON bulk_imports USING btree (user_id);
CREATE UNIQUE INDEX index_chat_names_on_service_id_and_team_id_and_chat_id ON chat_names USING btree (service_id, team_id, chat_id);
......@@ -23606,6 +23638,9 @@ ALTER TABLE ONLY cluster_providers_aws
ALTER TABLE ONLY grafana_integrations
ADD CONSTRAINT fk_rails_18d0e2b564 FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE;
ALTER TABLE ONLY bulk_import_failures
ADD CONSTRAINT fk_rails_1964240b8c FOREIGN KEY (bulk_import_entity_id) REFERENCES bulk_import_entities(id) ON DELETE CASCADE;
ALTER TABLE ONLY group_wiki_repositories
ADD CONSTRAINT fk_rails_19755e374b FOREIGN KEY (shard_id) REFERENCES shards(id) ON DELETE RESTRICT;
......
......@@ -6,6 +6,8 @@ module BulkImports
class GroupPipeline
include Pipeline
abort_on_failure!
extractor Common::Extractors::GraphqlExtractor, query: Graphql::GetGroupQuery
transformer Common::Transformers::HashKeyDigger, key_path: %w[data group]
......
......@@ -26,13 +26,17 @@ module BulkImports
@after_run ||= self.class.after_run_callback
end
def pipeline_name
def pipeline
@pipeline ||= self.class.name
end
def instantiate(class_config)
class_config[:klass].new(class_config[:options])
end
def abort_on_failure?
self.class.abort_on_failure?
end
end
class_methods do
......@@ -68,6 +72,14 @@ module BulkImports
class_attributes[:after_run]
end
def abort_on_failure!
class_attributes[:abort_on_failure] = true
end
def abort_on_failure?
class_attributes[:abort_on_failure]
end
private
def add_attribute(sym, klass, options)
......
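Putting the DSL together: a minimal sketch of a pipeline opting into abort-on-failure semantics (the class name is hypothetical; behavior matches the specs below):

  class MyPipeline
    include BulkImports::Pipeline

    abort_on_failure!
  end

  MyPipeline.abort_on_failure?     # => true
  MyPipeline.new.abort_on_failure? # => true, delegates to the class attribute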
......@@ -5,35 +5,102 @@ module BulkImports
module Runner
extend ActiveSupport::Concern
MarkedAsFailedError = Class.new(StandardError)
def run(context)
info(context, message: "Pipeline started", pipeline: pipeline_name)
raise MarkedAsFailedError if marked_as_failed?(context)
info(context, message: 'Pipeline started', pipeline_class: pipeline)
extractors.each do |extractor|
extractor.extract(context).each do |entry|
info(context, extractor: extractor.class.name)
data = run_pipeline_step(:extractor, extractor.class.name, context) do
extractor.extract(context)
end
transformers.each do |transformer|
info(context, transformer: transformer.class.name)
entry = transformer.transform(context, entry)
end
if data && data.respond_to?(:each)
data.each do |entry|
transformers.each do |transformer|
entry = run_pipeline_step(:transformer, transformer.class.name, context) do
transformer.transform(context, entry)
end
end
loaders.each do |loader|
info(context, loader: loader.class.name)
loader.load(context, entry)
loaders.each do |loader|
run_pipeline_step(:loader, loader.class.name, context) do
loader.load(context, entry)
end
end
end
end
end
after_run.call(context) if after_run.present?
rescue MarkedAsFailedError
log_skip(context)
end
private # rubocop:disable Lint/UselessAccessModifier
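# Wraps each extractor/transformer/loader step: skips the step when the
# entity is already failed, records any exception as a BulkImports::Failure,
# and fails the entity when the pipeline is marked with abort_on_failure!.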
def run_pipeline_step(type, class_name, context)
raise MarkedAsFailedError if marked_as_failed?(context)
info(context, type => class_name)
yield
rescue MarkedAsFailedError
log_skip(context, type => class_name)
rescue => e
log_import_failure(e, context)
mark_as_failed(context) if abort_on_failure?
end
def mark_as_failed(context)
warn(context, message: 'Pipeline failed', pipeline_class: pipeline)
context.entity.fail_op!
end
def marked_as_failed?(context)
return true if context.entity.failed?
false
end
def log_skip(context, extra = {})
log = {
message: 'Skipping due to failed pipeline status',
pipeline_class: pipeline
}.merge(extra)
info(context, log)
end
def log_import_failure(exception, context)
attributes = {
bulk_import_entity_id: context.entity.id,
pipeline_class: pipeline,
exception_class: exception.class.to_s,
exception_message: exception.message.truncate(255),
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
}
BulkImports::Failure.create(attributes)
end
def warn(context, extra = {})
logger.warn(log_base_params(context).merge(extra))
end
def info(context, extra = {})
logger.info({
entity: context.entity.id,
entity_type: context.entity.source_type
}.merge(extra))
logger.info(log_base_params(context).merge(extra))
end
def log_base_params(context)
{
bulk_import_entity_id: context.entity.id,
bulk_import_entity_type: context.entity.source_type
}
end
def logger
......
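For orientation, the failure path now reads roughly as follows (a sketch assuming a pipeline marked with abort_on_failure! whose extractor raises, with `entity` a persisted BulkImports::Entity; not part of the commit):

  context = BulkImports::Pipeline::Context.new(entity: entity)
  BulkImports::Groups::Pipelines::GroupPipeline.new.run(context)

  entity.failures.last.exception_class # => 'StandardError', captured by log_import_failure
  entity.failed?                       # => true, set by mark_as_failed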
# frozen_string_literal: true
require 'securerandom'
FactoryBot.define do
factory :bulk_import_failure, class: 'BulkImports::Failure' do
association :entity, factory: :bulk_import_entity
pipeline_class { 'BulkImports::TestPipeline' }
exception_class { 'StandardError' }
exception_message { 'Standard Error Message' }
correlation_id_value { SecureRandom.uuid }
end
end
......@@ -23,7 +23,7 @@ RSpec.describe BulkImports::Importers::GroupImporter do
end
describe '#execute' do
it "starts the entity and run its pipelines" do
it 'starts the entity and runs its pipelines' do
expect(bulk_import_entity).to receive(:start).and_call_original
expect_to_run_pipeline BulkImports::Groups::Pipelines::GroupPipeline, context: context
expect_to_run_pipeline BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, context: context
......@@ -32,6 +32,18 @@ RSpec.describe BulkImports::Importers::GroupImporter do
expect(bulk_import_entity.reload).to be_finished
end
context 'when failed' do
let(:bulk_import_entity) { create(:bulk_import_entity, :failed, bulk_import: bulk_import) }
it 'does not transition entity to finished state' do
allow(bulk_import_entity).to receive(:start!)
subject.execute
expect(bulk_import_entity.reload).to be_failed
end
end
end
def expect_to_run_pipeline(klass, context:)
......
......@@ -3,26 +3,32 @@
require 'spec_helper'
RSpec.describe BulkImports::Pipeline::Runner do
describe 'pipeline runner' do
before do
extractor = Class.new do
def initialize(options = {}); end
let(:extractor) do
Class.new do
def initialize(options = {}); end
def extract(context); end
end
def extract(context); end
end
end
transformer = Class.new do
def initialize(options = {}); end
let(:transformer) do
Class.new do
def initialize(options = {}); end
def transform(context, entry); end
end
def transform(context); end
end
end
loader = Class.new do
def initialize(options = {}); end
let(:loader) do
Class.new do
def initialize(options = {}); end
def load(context, entry); end
end
def load(context); end
end
end
describe 'pipeline runner' do
before do
stub_const('BulkImports::Extractor', extractor)
stub_const('BulkImports::Transformer', transformer)
stub_const('BulkImports::Loader', loader)
......@@ -38,37 +44,126 @@ RSpec.describe BulkImports::Pipeline::Runner do
stub_const('BulkImports::MyPipeline', pipeline)
end
it 'runs pipeline extractor, transformer, loader' do
context = instance_double(
BulkImports::Pipeline::Context,
entity: instance_double(BulkImports::Entity, id: 1, source_type: 'group')
)
entries = [{ foo: :bar }]
expect_next_instance_of(BulkImports::Extractor) do |extractor|
expect(extractor).to receive(:extract).with(context).and_return(entries)
context 'when entity is not marked as failed' do
let(:context) do
instance_double(
BulkImports::Pipeline::Context,
entity: instance_double(BulkImports::Entity, id: 1, source_type: 'group', failed?: false)
)
end
expect_next_instance_of(BulkImports::Transformer) do |transformer|
expect(transformer).to receive(:transform).with(context, entries.first).and_return(entries.first)
it 'runs pipeline extractor, transformer, loader' do
entries = [{ foo: :bar }]
expect_next_instance_of(BulkImports::Extractor) do |extractor|
expect(extractor).to receive(:extract).with(context).and_return(entries)
end
expect_next_instance_of(BulkImports::Transformer) do |transformer|
expect(transformer).to receive(:transform).with(context, entries.first).and_return(entries.first)
end
expect_next_instance_of(BulkImports::Loader) do |loader|
expect(loader).to receive(:load).with(context, entries.first)
end
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:info)
.with(
message: 'Pipeline started',
pipeline_class: 'BulkImports::MyPipeline',
bulk_import_entity_id: 1,
bulk_import_entity_type: 'group'
)
expect(logger).to receive(:info)
.with(bulk_import_entity_id: 1, bulk_import_entity_type: 'group', extractor: 'BulkImports::Extractor')
expect(logger).to receive(:info)
.with(bulk_import_entity_id: 1, bulk_import_entity_type: 'group', transformer: 'BulkImports::Transformer')
expect(logger).to receive(:info)
.with(bulk_import_entity_id: 1, bulk_import_entity_type: 'group', loader: 'BulkImports::Loader')
end
BulkImports::MyPipeline.new.run(context)
end
expect_next_instance_of(BulkImports::Loader) do |loader|
expect(loader).to receive(:load).with(context, entries.first)
context 'when exception is raised' do
let(:entity) { create(:bulk_import_entity, :created) }
let(:context) { BulkImports::Pipeline::Context.new(entity: entity) }
before do
allow_next_instance_of(BulkImports::Extractor) do |extractor|
allow(extractor).to receive(:extract).with(context).and_raise(StandardError, 'Error!')
end
end
it 'logs import failure' do
BulkImports::MyPipeline.new.run(context)
failure = entity.failures.first
expect(failure).to be_present
expect(failure.pipeline_class).to eq('BulkImports::MyPipeline')
expect(failure.exception_class).to eq('StandardError')
expect(failure.exception_message).to eq('Error!')
end
context 'when pipeline is marked to abort on failure' do
before do
BulkImports::MyPipeline.abort_on_failure!
end
it 'marks entity as failed' do
BulkImports::MyPipeline.new.run(context)
expect(entity.failed?).to eq(true)
end
it 'logs warn message' do
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:warn)
.with(
message: 'Pipeline failed',
pipeline_class: 'BulkImports::MyPipeline',
bulk_import_entity_id: entity.id,
bulk_import_entity_type: entity.source_type
)
end
BulkImports::MyPipeline.new.run(context)
end
end
context 'when pipeline is not marked to abort on failure' do
it 'does not mark entity as failed' do
BulkImports::MyPipeline.new.run(context)
expect(entity.failed?).to eq(false)
end
end
end
end
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:info)
.with(message: "Pipeline started", pipeline: 'BulkImports::MyPipeline', entity: 1, entity_type: 'group')
expect(logger).to receive(:info)
.with(entity: 1, entity_type: 'group', extractor: 'BulkImports::Extractor')
expect(logger).to receive(:info)
.with(entity: 1, entity_type: 'group', transformer: 'BulkImports::Transformer')
expect(logger).to receive(:info)
.with(entity: 1, entity_type: 'group', loader: 'BulkImports::Loader')
context 'when entity is marked as failed' do
let(:context) do
instance_double(
BulkImports::Pipeline::Context,
entity: instance_double(BulkImports::Entity, id: 1, source_type: 'group', failed?: true)
)
end
BulkImports::MyPipeline.new.run(context)
it 'logs and returns without execution' do
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:info)
.with(
message: 'Skipping due to failed pipeline status',
pipeline_class: 'BulkImports::MyPipeline',
bulk_import_entity_id: 1,
bulk_import_entity_type: 'group'
)
end
BulkImports::MyPipeline.new.run(context)
end
end
end
end
......@@ -12,6 +12,8 @@ RSpec.describe BulkImports::Pipeline do
klass = Class.new do
include BulkImports::Pipeline
abort_on_failure!
extractor BulkImports::Extractor, { foo: :bar }
transformer BulkImports::Transformer, { foo: :bar }
loader BulkImports::Loader, { foo: :bar }
......@@ -25,6 +27,7 @@ RSpec.describe BulkImports::Pipeline do
expect(BulkImports::MyPipeline.extractors).to contain_exactly({ klass: BulkImports::Extractor, options: { foo: :bar } })
expect(BulkImports::MyPipeline.transformers).to contain_exactly({ klass: BulkImports::Transformer, options: { foo: :bar } })
expect(BulkImports::MyPipeline.loaders).to contain_exactly({ klass: BulkImports::Loader, options: { foo: :bar } })
expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true)
end
end
......@@ -36,6 +39,7 @@ RSpec.describe BulkImports::Pipeline do
BulkImports::MyPipeline.extractor(klass, options)
BulkImports::MyPipeline.transformer(klass, options)
BulkImports::MyPipeline.loader(klass, options)
BulkImports::MyPipeline.abort_on_failure!
expect(BulkImports::MyPipeline.extractors)
.to contain_exactly(
......@@ -51,6 +55,8 @@ RSpec.describe BulkImports::Pipeline do
.to contain_exactly(
{ klass: BulkImports::Loader, options: { foo: :bar } },
{ klass: klass, options: options })
expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true)
end
end
end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Failure, type: :model do
describe 'associations' do
it { is_expected.to belong_to(:entity).required }
end
describe 'validations' do
before do
create(:bulk_import_failure)
end
it { is_expected.to validate_presence_of(:entity) }
end
end