Commit 293fad56 authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch '299297-parallelise-pages-migration-task' into 'master'

Parallelise pages migration task

See merge request gitlab-org/gitlab!52081
parents c72c0acc 7c097759
...@@ -11,4 +11,5 @@ class ProjectPagesMetadatum < ApplicationRecord ...@@ -11,4 +11,5 @@ class ProjectPagesMetadatum < ApplicationRecord
scope :deployed, -> { where(deployed: true) } scope :deployed, -> { where(deployed: true) }
scope :only_on_legacy_storage, -> { deployed.where(pages_deployment: nil) } scope :only_on_legacy_storage, -> { deployed.where(pages_deployment: nil) }
scope :with_project_route_and_deployment, -> { preload(project: [:namespace, :route, pages_metadatum: :pages_deployment]) }
end end
# frozen_string_literal: true
module Pages
class MigrateFromLegacyStorageService
def initialize(logger, migration_threads, batch_size)
@logger = logger
@migration_threads = migration_threads
@batch_size = batch_size
@migrated = 0
@errored = 0
@counters_lock = Mutex.new
end
def execute
@queue = SizedQueue.new(1)
threads = start_migration_threads
ProjectPagesMetadatum.only_on_legacy_storage.each_batch(of: @batch_size) do |batch|
@queue.push(batch)
end
@queue.close
@logger.info("Waiting for threads to finish...")
threads.each(&:join)
{ migrated: @migrated, errored: @errored }
end
def start_migration_threads
Array.new(@migration_threads) do
Thread.new do
while batch = @queue.pop
process_batch(batch)
end
end
end
end
def process_batch(batch)
batch.with_project_route_and_deployment.each do |metadatum|
project = metadatum.project
migrate_project(project)
end
@logger.info("#{@migrated} projects are migrated successfully, #{@errored} projects failed to be migrated")
end
def migrate_project(project)
result = nil
time = Benchmark.realtime do
result = ::Pages::MigrateLegacyStorageToDeploymentService.new(project).execute
end
if result[:status] == :success
@logger.info("project_id: #{project.id} #{project.pages_path} has been migrated in #{time} seconds")
@counters_lock.synchronize { @migrated += 1 }
else
@logger.error("project_id: #{project.id} #{project.pages_path} failed to be migrated in #{time} seconds: #{result[:message]}")
@counters_lock.synchronize { @errored += 1 }
end
rescue => e
@counters_lock.synchronize { @errored += 1 }
@logger.error("#{e.message} project_id: #{project&.id}")
Gitlab::ErrorTracking.track_exception(e, project_id: project&.id)
end
end
end
...@@ -6,37 +6,20 @@ namespace :gitlab do ...@@ -6,37 +6,20 @@ namespace :gitlab do
task migrate_legacy_storage: :gitlab_environment do task migrate_legacy_storage: :gitlab_environment do
logger = Logger.new(STDOUT) logger = Logger.new(STDOUT)
logger.info('Starting to migrate legacy pages storage to zip deployments') logger.info('Starting to migrate legacy pages storage to zip deployments')
projects_migrated = 0
projects_errored = 0
ProjectPagesMetadatum.only_on_legacy_storage.each_batch(of: 10) do |batch| result = ::Pages::MigrateFromLegacyStorageService.new(logger, migration_threads, batch_size).execute
batch.preload(project: [:namespace, :route, pages_metadatum: :pages_deployment]).each do |metadatum|
project = metadatum.project
result = nil logger.info("A total of #{result[:migrated] + result[:errored]} projects were processed.")
time = Benchmark.realtime do logger.info("- The #{result[:migrated]} projects migrated successfully")
result = ::Pages::MigrateLegacyStorageToDeploymentService.new(project).execute logger.info("- The #{result[:errored]} projects failed to be migrated")
end end
if result[:status] == :success def migration_threads
logger.info("project_id: #{project.id} #{project.pages_path} has been migrated in #{time} seconds") ENV.fetch('PAGES_MIGRATION_THREADS', '3').to_i
projects_migrated += 1
else
logger.error("project_id: #{project.id} #{project.pages_path} failed to be migrated in #{time} seconds: #{result[:message]}")
projects_errored += 1
end
rescue => e
projects_errored += 1
logger.error("#{e.message} project_id: #{project&.id}")
Gitlab::ErrorTracking.track_exception(e, project_id: project&.id)
end
logger.info("#{projects_migrated} projects are migrated successfully, #{projects_errored} projects failed to be migrated")
end end
logger.info("A total of #{projects_migrated + projects_errored} projects were processed.") def batch_size
logger.info("- The #{projects_migrated} projects migrated successfully") ENV.fetch('PAGES_MIGRATION_BATCH_SIZE', '10').to_i
logger.info("- The #{projects_errored} projects failed to be migrated")
end end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Pages::MigrateFromLegacyStorageService do
let(:service) { described_class.new(Rails.logger, 3, 10) }
it 'does not try to migrate pages if pages are not deployed' do
expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new)
expect(service.execute).to eq(migrated: 0, errored: 0)
end
it 'uses multiple threads' do
projects = create_list(:project, 20)
projects.each do |project|
project.mark_pages_as_deployed
FileUtils.mkdir_p File.join(project.pages_path, "public")
File.open(File.join(project.pages_path, "public/index.html"), "w") do |f|
f.write("Hello!")
end
end
service = described_class.new(Rails.logger, 3, 2)
threads = Concurrent::Set.new
expect(service).to receive(:migrate_project).exactly(20).times.and_wrap_original do |m, *args|
threads.add(Thread.current)
# sleep to be 100% certain that once thread can't consume all the queue
# it works without it, but I want to avoid making this test flaky
sleep(0.01)
m.call(*args)
end
expect(service.execute).to eq(migrated: 20, errored: 0)
expect(threads.length).to eq(3)
end
context 'when pages are marked as deployed' do
let(:project) { create(:project) }
before do
project.mark_pages_as_deployed
end
context 'when pages directory does not exist' do
it 'tries to migrate the project, but does not crash' do
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project) do |service|
expect(service).to receive(:execute).and_call_original
end
expect(service.execute).to eq(migrated: 0, errored: 1)
end
end
context 'when pages directory exists on disk' do
before do
FileUtils.mkdir_p File.join(project.pages_path, "public")
File.open(File.join(project.pages_path, "public/index.html"), "w") do |f|
f.write("Hello!")
end
end
it 'migrates pages projects without deployments' do
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project) do |service|
expect(service).to receive(:execute).and_call_original
end
expect do
expect(service.execute).to eq(migrated: 1, errored: 0)
end.to change { project.pages_metadatum.reload.pages_deployment }.from(nil)
end
context 'when deployed already exists for the project' do
before do
deployment = create(:pages_deployment, project: project)
project.set_first_pages_deployment!(deployment)
end
it 'does not try to migrate project' do
expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new)
expect(service.execute).to eq(migrated: 0, errored: 0)
end
end
end
end
end
...@@ -9,59 +9,31 @@ RSpec.describe 'gitlab:pages:migrate_legacy_storagerake task' do ...@@ -9,59 +9,31 @@ RSpec.describe 'gitlab:pages:migrate_legacy_storagerake task' do
subject { run_rake_task('gitlab:pages:migrate_legacy_storage') } subject { run_rake_task('gitlab:pages:migrate_legacy_storage') }
let(:project) { create(:project) } it 'calls migration service' do
expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything, 3, 10) do |service|
it 'does not try to migrate pages if pages are not deployed' do
expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new)
subject
end
context 'when pages are marked as deployed' do
before do
project.mark_pages_as_deployed
end
context 'when pages directory does not exist' do
it 'tries to migrate the project, but does not crash' do
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project) do |service|
expect(service).to receive(:execute).and_call_original expect(service).to receive(:execute).and_call_original
end end
subject subject
end end
end
context 'when pages directory exists on disk' do it 'uses PAGES_MIGRATION_THREADS environment variable' do
before do stub_env('PAGES_MIGRATION_THREADS', '5')
FileUtils.mkdir_p File.join(project.pages_path, "public")
File.open(File.join(project.pages_path, "public/index.html"), "w") do |f|
f.write("Hello!")
end
end
it 'migrates pages projects without deployments' do expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything, 5, 10) do |service|
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project) do |service|
expect(service).to receive(:execute).and_call_original expect(service).to receive(:execute).and_call_original
end end
expect do
subject subject
end.to change { project.pages_metadatum.reload.pages_deployment }.from(nil)
end end
context 'when deployed already exists for the project' do it 'uses PAGES_MIGRATION_BATCH_SIZE environment variable' do
before do stub_env('PAGES_MIGRATION_BATCH_SIZE', '100')
deployment = create(:pages_deployment, project: project)
project.set_first_pages_deployment!(deployment)
end
it 'does not try to migrate project' do expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything, 3, 100) do |service|
expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new) expect(service).to receive(:execute).and_call_original
end
subject subject
end end
end
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment