Commit cd6c7376 authored by Francisco Javier López's avatar Francisco Javier López Committed by Igor Drozdov

Create SnippetRepositoryStorageWorker

This worker will run in the background and will call the
Snippets::UpdateRepositoryStorageService in order to
update the repository storage.
parent f905c33a
# frozen_string_literal: true # frozen_string_literal: true
class SnippetRepository < ApplicationRecord class SnippetRepository < ApplicationRecord
include EachBatch
include Shardable include Shardable
DEFAULT_EMPTY_FILE_NAME = 'snippetfile' DEFAULT_EMPTY_FILE_NAME = 'snippetfile'
......
# frozen_string_literal: true
module ScheduleBulkRepositoryShardMovesMethods
extend ActiveSupport::Concern
include BaseServiceUtility
class_methods do
def enqueue(source_storage_name, destination_storage_name = nil)
schedule_bulk_worker_klass.perform_async(source_storage_name, destination_storage_name)
end
def schedule_bulk_worker_klass
raise NotImplementedError
end
end
def execute(source_storage_name, destination_storage_name = nil)
shard = Shard.find_by_name!(source_storage_name)
repository_klass.for_shard(shard).each_batch(column: container_column) do |relation|
container_klass.id_in(relation.select(container_column)).each do |container|
container.with_lock do
next if container.repository_storage != source_storage_name
storage_move = container.repository_storage_moves.build(
source_storage_name: source_storage_name,
destination_storage_name: destination_storage_name
)
unless storage_move.schedule
log_info("Container #{container.full_path} (#{container.id}) was skipped: #{storage_move.errors.full_messages.to_sentence}")
end
end
end
end
success
end
private
def repository_klass
raise NotImplementedError
end
def container_klass
raise NotImplementedError
end
def container_column
raise NotImplementedError
end
end
...@@ -3,33 +3,29 @@ ...@@ -3,33 +3,29 @@
module Projects module Projects
# Tries to schedule a move for every project with repositories on the source shard # Tries to schedule a move for every project with repositories on the source shard
class ScheduleBulkRepositoryShardMovesService class ScheduleBulkRepositoryShardMovesService
include BaseServiceUtility include ScheduleBulkRepositoryShardMovesMethods
extend ::Gitlab::Utils::Override
def execute(source_storage_name, destination_storage_name = nil) private
shard = Shard.find_by_name!(source_storage_name)
ProjectRepository.for_shard(shard).each_batch(column: :project_id) do |relation| override :repository_klass
Project.id_in(relation.select(:project_id)).each do |project| def repository_klass
project.with_lock do ProjectRepository
next if project.repository_storage != source_storage_name
storage_move = project.repository_storage_moves.build(
source_storage_name: source_storage_name,
destination_storage_name: destination_storage_name
)
unless storage_move.schedule
log_info("Project #{project.full_path} (#{project.id}) was skipped: #{storage_move.errors.full_messages.to_sentence}")
end
end
end end
override :container_klass
def container_klass
Project
end end
success override :container_column
def container_column
:project_id
end end
def self.enqueue(source_storage_name, destination_storage_name = nil) override :schedule_bulk_worker_klass
::ProjectScheduleBulkRepositoryShardMovesWorker.perform_async(source_storage_name, destination_storage_name) def self.schedule_bulk_worker_klass
::ProjectScheduleBulkRepositoryShardMovesWorker
end end
end end
end end
# frozen_string_literal: true
module Snippets
# Tries to schedule a move for every snippet with repositories on the source shard
class ScheduleBulkRepositoryShardMovesService
include ScheduleBulkRepositoryShardMovesMethods
extend ::Gitlab::Utils::Override
private
override :repository_klass
def repository_klass
SnippetRepository
end
override :container_klass
def container_klass
Snippet
end
override :container_column
def container_column
:snippet_id
end
override :schedule_bulk_worker_klass
def self.schedule_bulk_worker_klass
::SnippetScheduleBulkRepositoryShardMovesWorker
end
end
end
...@@ -2103,6 +2103,14 @@ ...@@ -2103,6 +2103,14 @@
:weight: 1 :weight: 1
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: snippet_schedule_bulk_repository_shard_moves
:feature_category: :gitaly
:has_external_dependencies:
:urgency: :throttled
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: snippet_update_repository_storage - :name: snippet_update_repository_storage
:feature_category: :gitaly :feature_category: :gitaly
:has_external_dependencies: :has_external_dependencies:
......
# frozen_string_literal: true
class SnippetScheduleBulkRepositoryShardMovesWorker
include ApplicationWorker
idempotent!
feature_category :gitaly
urgency :throttled
def perform(source_storage_name, destination_storage_name = nil)
Snippets::ScheduleBulkRepositoryShardMovesService.new.execute(source_storage_name, destination_storage_name)
end
end
...@@ -320,6 +320,8 @@ ...@@ -320,6 +320,8 @@
- 1 - 1
- - set_user_status_based_on_user_cap_setting - - set_user_status_based_on_user_cap_setting
- 1 - 1
- - snippet_schedule_bulk_repository_shard_moves
- 1
- - snippet_update_repository_storage - - snippet_update_repository_storage
- 1 - 1
- - status_page_publish - - status_page_publish
......
...@@ -3,45 +3,10 @@ ...@@ -3,45 +3,10 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Projects::ScheduleBulkRepositoryShardMovesService do RSpec.describe Projects::ScheduleBulkRepositoryShardMovesService do
before do it_behaves_like 'moves repository shard in bulk' do
stub_storage_settings('test_second_storage' => { 'path' => 'tmp/tests/extra_storage' }) let_it_be_with_reload(:container) { create(:project, :repository).tap { |project| project.track_project_repository } }
end
let!(:project) { create(:project, :repository).tap { |project| project.track_project_repository } }
let(:source_storage_name) { 'default' }
let(:destination_storage_name) { 'test_second_storage' }
describe '#execute' do
it 'schedules project repository storage moves' do
expect { subject.execute(source_storage_name, destination_storage_name) }
.to change(ProjectRepositoryStorageMove, :count).by(1)
storage_move = project.repository_storage_moves.last!
expect(storage_move).to have_attributes(
source_storage_name: source_storage_name,
destination_storage_name: destination_storage_name,
state_name: :scheduled
)
end
context 'read-only repository' do
let!(:project) { create(:project, :repository, :read_only).tap { |project| project.track_project_repository } }
it 'does not get scheduled' do let(:move_service_klass) { ProjectRepositoryStorageMove }
expect(subject).to receive(:log_info) let(:bulk_worker_klass) { ::ProjectScheduleBulkRepositoryShardMovesWorker }
.with("Project #{project.full_path} (#{project.id}) was skipped: Project is read only")
expect { subject.execute(source_storage_name, destination_storage_name) }
.to change(ProjectRepositoryStorageMove, :count).by(0)
end
end
end
describe '.enqueue' do
it 'defers to the worker' do
expect(::ProjectScheduleBulkRepositoryShardMovesWorker).to receive(:perform_async).with(source_storage_name, destination_storage_name)
described_class.enqueue(source_storage_name, destination_storage_name)
end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Snippets::ScheduleBulkRepositoryShardMovesService do
it_behaves_like 'moves repository shard in bulk' do
let_it_be_with_reload(:container) { create(:snippet, :repository) }
let(:move_service_klass) { SnippetRepositoryStorageMove }
let(:bulk_worker_klass) { ::SnippetScheduleBulkRepositoryShardMovesWorker }
end
end
# frozen_string_literal: true
RSpec.shared_examples 'moves repository shard in bulk' do
let(:source_storage_name) { 'default' }
let(:destination_storage_name) { 'test_second_storage' }
before do
stub_storage_settings(destination_storage_name => { 'path' => 'tmp/tests/extra_storage' })
end
describe '#execute' do
it 'schedules container repository storage moves' do
expect { subject.execute(source_storage_name, destination_storage_name) }
.to change(move_service_klass, :count).by(1)
storage_move = container.repository_storage_moves.last!
expect(storage_move).to have_attributes(
source_storage_name: source_storage_name,
destination_storage_name: destination_storage_name,
state_name: :scheduled
)
end
context 'read-only repository' do
it 'does not get scheduled' do
container.set_repository_read_only!
expect(subject).to receive(:log_info)
.with(/Container #{container.full_path} \(#{container.id}\) was skipped: #{container.class} is read only/)
expect { subject.execute(source_storage_name, destination_storage_name) }
.to change(move_service_klass, :count).by(0)
end
end
end
describe '.enqueue' do
it 'defers to the worker' do
expect(bulk_worker_klass).to receive(:perform_async).with(source_storage_name, destination_storage_name)
described_class.enqueue(source_storage_name, destination_storage_name)
end
end
end
# frozen_string_literal: true
RSpec.shared_examples 'schedules bulk repository shard moves' do
let(:source_storage_name) { 'default' }
let(:destination_storage_name) { 'test_second_storage' }
describe "#perform" do
before do
stub_storage_settings(destination_storage_name => { 'path' => 'tmp/tests/extra_storage' })
allow(worker_klass).to receive(:perform_async)
end
include_examples 'an idempotent worker' do
let(:job_args) { [source_storage_name, destination_storage_name] }
it 'schedules container repository storage moves' do
expect { subject }.to change(move_service_klass, :count).by(1)
storage_move = container.repository_storage_moves.last!
expect(storage_move).to have_attributes(
source_storage_name: source_storage_name,
destination_storage_name: destination_storage_name,
state_name: :scheduled
)
end
end
end
end
...@@ -3,31 +3,10 @@ ...@@ -3,31 +3,10 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe ProjectScheduleBulkRepositoryShardMovesWorker do RSpec.describe ProjectScheduleBulkRepositoryShardMovesWorker do
describe "#perform" do it_behaves_like 'schedules bulk repository shard moves' do
before do let_it_be_with_reload(:container) { create(:project, :repository).tap { |project| project.track_project_repository } }
stub_storage_settings('test_second_storage' => { 'path' => 'tmp/tests/extra_storage' })
allow(ProjectUpdateRepositoryStorageWorker).to receive(:perform_async) let(:move_service_klass) { ProjectRepositoryStorageMove }
end let(:worker_klass) { ProjectUpdateRepositoryStorageWorker }
let!(:project) { create(:project, :repository).tap { |project| project.track_project_repository } }
let(:source_storage_name) { 'default' }
let(:destination_storage_name) { 'test_second_storage' }
include_examples 'an idempotent worker' do
let(:job_args) { [source_storage_name, destination_storage_name] }
it 'schedules project repository storage moves' do
expect { subject }.to change(ProjectRepositoryStorageMove, :count).by(1)
storage_move = project.repository_storage_moves.last!
expect(storage_move).to have_attributes(
source_storage_name: source_storage_name,
destination_storage_name: destination_storage_name,
state_name: :scheduled
)
end
end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe SnippetScheduleBulkRepositoryShardMovesWorker do
it_behaves_like 'schedules bulk repository shard moves' do
let_it_be_with_reload(:container) { create(:snippet, :repository).tap { |snippet| snippet.create_repository } }
let(:move_service_klass) { SnippetRepositoryStorageMove }
let(:worker_klass) { SnippetUpdateRepositoryStorageWorker }
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment