Commit 717f5497 authored by Stan Hu's avatar Stan Hu

Merge branch '346398-handle-duplication-of-resources-on-migration' into 'master'

Resolve "Handle Duplication of Resources on Migration"

See merge request gitlab-org/gitlab!75475
parents daee4737 87006d9c
...@@ -29,7 +29,7 @@ class BulkImports::Tracker < ApplicationRecord ...@@ -29,7 +29,7 @@ class BulkImports::Tracker < ApplicationRecord
def self.stage_running?(entity_id, stage) def self.stage_running?(entity_id, stage)
where(stage: stage, bulk_import_entity_id: entity_id) where(stage: stage, bulk_import_entity_id: entity_id)
.with_status(:created, :started) .with_status(:created, :enqueued, :started)
.exists? .exists?
end end
...@@ -45,15 +45,24 @@ class BulkImports::Tracker < ApplicationRecord ...@@ -45,15 +45,24 @@ class BulkImports::Tracker < ApplicationRecord
state :created, value: 0 state :created, value: 0
state :started, value: 1 state :started, value: 1
state :finished, value: 2 state :finished, value: 2
state :enqueued, value: 3
state :failed, value: -1 state :failed, value: -1
state :skipped, value: -2 state :skipped, value: -2
event :start do event :start do
transition created: :started transition enqueued: :started
# To avoid errors when re-starting a pipeline in case of network errors # To avoid errors when re-starting a pipeline in case of network errors
transition started: :started transition started: :started
end end
event :retry do
transition started: :enqueued
end
event :enqueue do
transition created: :enqueued
end
event :finish do event :finish do
transition started: :finished transition started: :finished
transition failed: :failed transition failed: :failed
......
...@@ -1931,7 +1931,7 @@ ...@@ -1931,7 +1931,7 @@
:urgency: :low :urgency: :low
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: :idempotent: true
:tags: [] :tags: []
- :name: bulk_imports_export_request - :name: bulk_imports_export_request
:worker_name: BulkImports::ExportRequestWorker :worker_name: BulkImports::ExportRequestWorker
......
...@@ -12,6 +12,9 @@ module BulkImports ...@@ -12,6 +12,9 @@ module BulkImports
worker_has_external_dependencies! worker_has_external_dependencies!
idempotent!
deduplicate :until_executed, including_scheduled: true
def perform(entity_id, current_stage = nil) def perform(entity_id, current_stage = nil)
return if stage_running?(entity_id, current_stage) return if stage_running?(entity_id, current_stage)
...@@ -48,7 +51,7 @@ module BulkImports ...@@ -48,7 +51,7 @@ module BulkImports
end end
def next_pipeline_trackers_for(entity_id) def next_pipeline_trackers_for(entity_id)
BulkImports::Tracker.next_pipeline_trackers_for(entity_id) BulkImports::Tracker.next_pipeline_trackers_for(entity_id).update(status_event: 'enqueue')
end end
def logger def logger
......
...@@ -16,7 +16,7 @@ module BulkImports ...@@ -16,7 +16,7 @@ module BulkImports
def perform(pipeline_tracker_id, stage, entity_id) def perform(pipeline_tracker_id, stage, entity_id)
pipeline_tracker = ::BulkImports::Tracker pipeline_tracker = ::BulkImports::Tracker
.with_status(:created, :started) .with_status(:enqueued)
.find_by_id(pipeline_tracker_id) .find_by_id(pipeline_tracker_id)
if pipeline_tracker.present? if pipeline_tracker.present?
...@@ -68,6 +68,8 @@ module BulkImports ...@@ -68,6 +68,8 @@ module BulkImports
message: "Retrying error: #{e.message}" message: "Retrying error: #{e.message}"
) )
pipeline_tracker.update!(status_event: 'retry', jid: jid)
reenqueue(pipeline_tracker, delay: e.retry_delay) reenqueue(pipeline_tracker, delay: e.retry_delay)
else else
fail_tracker(pipeline_tracker, e) fail_tracker(pipeline_tracker, e)
......
...@@ -14,96 +14,118 @@ RSpec.describe BulkImports::EntityWorker do ...@@ -14,96 +14,118 @@ RSpec.describe BulkImports::EntityWorker do
) )
end end
it 'enqueues the first stage pipelines work' do let(:job_args) { entity.id }
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info)
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil
)
end
expect(BulkImports::PipelineWorker) it 'updates pipeline trackers to enqueued state when selected' do
.to receive(:perform_async) worker = BulkImports::EntityWorker.new
.with(
pipeline_tracker.id,
pipeline_tracker.stage,
entity.id
)
subject.perform(entity.id) next_tracker = worker.send(:next_pipeline_trackers_for, entity.id).first
end
it 'do not enqueue a new pipeline job if the current stage still running' do next_tracker.reload
expect(BulkImports::PipelineWorker)
.not_to receive(:perform_async)
subject.perform(entity.id, 0) expect(next_tracker.enqueued?).to be_truthy
end
it 'enqueues the next stage pipelines when the current stage is finished' do
next_stage_pipeline_tracker = create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'Stage1::Pipeline',
stage: 1
)
pipeline_tracker.fail_op! expect(worker.send(:next_pipeline_trackers_for, entity.id))
.not_to include(next_tracker)
end
expect_next_instance_of(Gitlab::Import::Logger) do |logger| include_examples 'an idempotent worker' do
expect(logger) it 'enqueues the first stage pipelines work' do
.to receive(:info) expect_next_instance_of(Gitlab::Import::Logger) do |logger|
# the worker runs twice but only executes once
expect(logger)
.to receive(:info).twice
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil
)
end
expect(BulkImports::PipelineWorker)
.to receive(:perform_async)
.with( .with(
worker: described_class.name, pipeline_tracker.id,
entity_id: entity.id, pipeline_tracker.stage,
current_stage: 0 entity.id
) )
subject
end end
expect(BulkImports::PipelineWorker) it 'logs and tracks the raised exceptions' do
.to receive(:perform_async) exception = StandardError.new('Error!')
.with(
next_stage_pipeline_tracker.id, expect(BulkImports::PipelineWorker)
next_stage_pipeline_tracker.stage, .to receive(:perform_async)
entity.id .and_raise(exception)
)
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info).twice
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil
)
expect(logger)
.to receive(:error)
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil,
error_message: 'Error!'
)
end
expect(Gitlab::ErrorTracking)
.to receive(:track_exception)
.with(exception, entity_id: entity.id)
subject
end
subject.perform(entity.id, 0) context 'in first stage' do
end let(:job_args) { [entity.id, 0] }
it 'logs and tracks the raised exceptions' do it 'do not enqueue a new pipeline job if the current stage still running' do
exception = StandardError.new('Error!') expect(BulkImports::PipelineWorker)
.not_to receive(:perform_async)
expect(BulkImports::PipelineWorker) subject
.to receive(:perform_async) end
.and_raise(exception)
expect_next_instance_of(Gitlab::Import::Logger) do |logger| it 'enqueues the next stage pipelines when the current stage is finished' do
expect(logger) next_stage_pipeline_tracker = create(
.to receive(:info) :bulk_import_tracker,
.with( entity: entity,
worker: described_class.name, pipeline_name: 'Stage1::Pipeline',
entity_id: entity.id, stage: 1
current_stage: nil
) )
expect(logger) pipeline_tracker.fail_op!
.to receive(:error)
.with( expect_next_instance_of(Gitlab::Import::Logger) do |logger|
worker: described_class.name, expect(logger)
entity_id: entity.id, .to receive(:info).twice
current_stage: nil, .with(
error_message: 'Error!' worker: described_class.name,
) entity_id: entity.id,
current_stage: 0
)
end
expect(BulkImports::PipelineWorker)
.to receive(:perform_async)
.with(
next_stage_pipeline_tracker.id,
next_stage_pipeline_tracker.stage,
entity.id
)
subject
end
end end
expect(Gitlab::ErrorTracking)
.to receive(:track_exception)
.with(exception, entity_id: entity.id)
subject.perform(entity.id)
end end
end end
...@@ -60,18 +60,8 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -60,18 +60,8 @@ RSpec.describe BulkImports::PipelineWorker do
create( create(
:bulk_import_tracker, :bulk_import_tracker,
entity: entity, entity: entity,
pipeline_name: 'FakePipeline' pipeline_name: 'FakePipeline',
) status_event: 'enqueue'
end
end
it_behaves_like 'successfully runs the pipeline' do
let(:pipeline_tracker) do
create(
:bulk_import_tracker,
:started,
entity: entity,
pipeline_name: 'FakePipeline'
) )
end end
end end
...@@ -109,7 +99,8 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -109,7 +99,8 @@ RSpec.describe BulkImports::PipelineWorker do
pipeline_tracker = create( pipeline_tracker = create(
:bulk_import_tracker, :bulk_import_tracker,
entity: entity, entity: entity,
pipeline_name: 'InexistentPipeline' pipeline_name: 'InexistentPipeline',
status_event: 'enqueue'
) )
expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect_next_instance_of(Gitlab::Import::Logger) do |logger|
...@@ -150,7 +141,8 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -150,7 +141,8 @@ RSpec.describe BulkImports::PipelineWorker do
pipeline_tracker = create( pipeline_tracker = create(
:bulk_import_tracker, :bulk_import_tracker,
entity: entity, entity: entity,
pipeline_name: 'FakePipeline' pipeline_name: 'FakePipeline',
status_event: 'enqueue'
) )
exception = BulkImports::NetworkError.new( exception = BulkImports::NetworkError.new(
...@@ -163,7 +155,21 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -163,7 +155,21 @@ RSpec.describe BulkImports::PipelineWorker do
.and_raise(exception) .and_raise(exception)
end end
expect(subject).to receive(:jid).and_return('jid') expect(subject).to receive(:jid).and_return('jid').twice
expect_any_instance_of(BulkImports::Tracker) do |tracker|
expect(tracker).to receive(:retry).and_call_original
end
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info)
.with(
worker: described_class.name,
pipeline_name: 'FakePipeline',
entity_id: entity.id
)
end
expect(described_class) expect(described_class)
.to receive(:perform_in) .to receive(:perform_in)
...@@ -175,6 +181,10 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -175,6 +181,10 @@ RSpec.describe BulkImports::PipelineWorker do
) )
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
pipeline_tracker.reload
expect(pipeline_tracker.enqueued?).to be_truthy
end end
end end
end end
...@@ -200,7 +210,8 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -200,7 +210,8 @@ RSpec.describe BulkImports::PipelineWorker do
create( create(
:bulk_import_tracker, :bulk_import_tracker,
entity: entity, entity: entity,
pipeline_name: 'NdjsonPipeline' pipeline_name: 'NdjsonPipeline',
status_event: 'enqueue'
) )
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment