Commit 7c097759 authored by Vladimir Shushlin's avatar Vladimir Shushlin

Parallelize pages migration task

The original task would take too much time on big installations,
e.g. on staging or gitlab.com

Since it became a little too complex, it got extracted to a service
object
parent 32ecdb08
...@@ -11,4 +11,5 @@ class ProjectPagesMetadatum < ApplicationRecord ...@@ -11,4 +11,5 @@ class ProjectPagesMetadatum < ApplicationRecord
scope :deployed, -> { where(deployed: true) } scope :deployed, -> { where(deployed: true) }
scope :only_on_legacy_storage, -> { deployed.where(pages_deployment: nil) } scope :only_on_legacy_storage, -> { deployed.where(pages_deployment: nil) }
scope :with_project_route_and_deployment, -> { preload(project: [:namespace, :route, pages_metadatum: :pages_deployment]) }
end end
# frozen_string_literal: true
module Pages
class MigrateFromLegacyStorageService
def initialize(logger, migration_threads, batch_size)
@logger = logger
@migration_threads = migration_threads
@batch_size = batch_size
@migrated = 0
@errored = 0
@counters_lock = Mutex.new
end
def execute
@queue = SizedQueue.new(1)
threads = start_migration_threads
ProjectPagesMetadatum.only_on_legacy_storage.each_batch(of: @batch_size) do |batch|
@queue.push(batch)
end
@queue.close
@logger.info("Waiting for threads to finish...")
threads.each(&:join)
{ migrated: @migrated, errored: @errored }
end
def start_migration_threads
Array.new(@migration_threads) do
Thread.new do
while batch = @queue.pop
process_batch(batch)
end
end
end
end
def process_batch(batch)
batch.with_project_route_and_deployment.each do |metadatum|
project = metadatum.project
migrate_project(project)
end
@logger.info("#{@migrated} projects are migrated successfully, #{@errored} projects failed to be migrated")
end
def migrate_project(project)
result = nil
time = Benchmark.realtime do
result = ::Pages::MigrateLegacyStorageToDeploymentService.new(project).execute
end
if result[:status] == :success
@logger.info("project_id: #{project.id} #{project.pages_path} has been migrated in #{time} seconds")
@counters_lock.synchronize { @migrated += 1 }
else
@logger.error("project_id: #{project.id} #{project.pages_path} failed to be migrated in #{time} seconds: #{result[:message]}")
@counters_lock.synchronize { @errored += 1 }
end
rescue => e
@counters_lock.synchronize { @errored += 1 }
@logger.error("#{e.message} project_id: #{project&.id}")
Gitlab::ErrorTracking.track_exception(e, project_id: project&.id)
end
end
end
...@@ -6,37 +6,20 @@ namespace :gitlab do ...@@ -6,37 +6,20 @@ namespace :gitlab do
task migrate_legacy_storage: :gitlab_environment do task migrate_legacy_storage: :gitlab_environment do
logger = Logger.new(STDOUT) logger = Logger.new(STDOUT)
logger.info('Starting to migrate legacy pages storage to zip deployments') logger.info('Starting to migrate legacy pages storage to zip deployments')
projects_migrated = 0
projects_errored = 0
ProjectPagesMetadatum.only_on_legacy_storage.each_batch(of: 10) do |batch| result = ::Pages::MigrateFromLegacyStorageService.new(logger, migration_threads, batch_size).execute
batch.preload(project: [:namespace, :route, pages_metadatum: :pages_deployment]).each do |metadatum|
project = metadatum.project
result = nil logger.info("A total of #{result[:migrated] + result[:errored]} projects were processed.")
time = Benchmark.realtime do logger.info("- The #{result[:migrated]} projects migrated successfully")
result = ::Pages::MigrateLegacyStorageToDeploymentService.new(project).execute logger.info("- The #{result[:errored]} projects failed to be migrated")
end end
if result[:status] == :success def migration_threads
logger.info("project_id: #{project.id} #{project.pages_path} has been migrated in #{time} seconds") ENV.fetch('PAGES_MIGRATION_THREADS', '3').to_i
projects_migrated += 1
else
logger.error("project_id: #{project.id} #{project.pages_path} failed to be migrated in #{time} seconds: #{result[:message]}")
projects_errored += 1
end
rescue => e
projects_errored += 1
logger.error("#{e.message} project_id: #{project&.id}")
Gitlab::ErrorTracking.track_exception(e, project_id: project&.id)
end
logger.info("#{projects_migrated} projects are migrated successfully, #{projects_errored} projects failed to be migrated")
end end
logger.info("A total of #{projects_migrated + projects_errored} projects were processed.") def batch_size
logger.info("- The #{projects_migrated} projects migrated successfully") ENV.fetch('PAGES_MIGRATION_BATCH_SIZE', '10').to_i
logger.info("- The #{projects_errored} projects failed to be migrated")
end end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Pages::MigrateFromLegacyStorageService do
let(:service) { described_class.new(Rails.logger, 3, 10) }
it 'does not try to migrate pages if pages are not deployed' do
expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new)
expect(service.execute).to eq(migrated: 0, errored: 0)
end
it 'uses multiple threads' do
projects = create_list(:project, 20)
projects.each do |project|
project.mark_pages_as_deployed
FileUtils.mkdir_p File.join(project.pages_path, "public")
File.open(File.join(project.pages_path, "public/index.html"), "w") do |f|
f.write("Hello!")
end
end
service = described_class.new(Rails.logger, 3, 2)
threads = Concurrent::Set.new
expect(service).to receive(:migrate_project).exactly(20).times.and_wrap_original do |m, *args|
threads.add(Thread.current)
# sleep to be 100% certain that once thread can't consume all the queue
# it works without it, but I want to avoid making this test flaky
sleep(0.01)
m.call(*args)
end
expect(service.execute).to eq(migrated: 20, errored: 0)
expect(threads.length).to eq(3)
end
context 'when pages are marked as deployed' do
let(:project) { create(:project) }
before do
project.mark_pages_as_deployed
end
context 'when pages directory does not exist' do
it 'tries to migrate the project, but does not crash' do
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project) do |service|
expect(service).to receive(:execute).and_call_original
end
expect(service.execute).to eq(migrated: 0, errored: 1)
end
end
context 'when pages directory exists on disk' do
before do
FileUtils.mkdir_p File.join(project.pages_path, "public")
File.open(File.join(project.pages_path, "public/index.html"), "w") do |f|
f.write("Hello!")
end
end
it 'migrates pages projects without deployments' do
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project) do |service|
expect(service).to receive(:execute).and_call_original
end
expect do
expect(service.execute).to eq(migrated: 1, errored: 0)
end.to change { project.pages_metadatum.reload.pages_deployment }.from(nil)
end
context 'when deployed already exists for the project' do
before do
deployment = create(:pages_deployment, project: project)
project.set_first_pages_deployment!(deployment)
end
it 'does not try to migrate project' do
expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new)
expect(service.execute).to eq(migrated: 0, errored: 0)
end
end
end
end
end
...@@ -9,59 +9,31 @@ RSpec.describe 'gitlab:pages:migrate_legacy_storagerake task' do ...@@ -9,59 +9,31 @@ RSpec.describe 'gitlab:pages:migrate_legacy_storagerake task' do
subject { run_rake_task('gitlab:pages:migrate_legacy_storage') } subject { run_rake_task('gitlab:pages:migrate_legacy_storage') }
let(:project) { create(:project) } it 'calls migration service' do
expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything, 3, 10) do |service|
it 'does not try to migrate pages if pages are not deployed' do
expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new)
subject
end
context 'when pages are marked as deployed' do
before do
project.mark_pages_as_deployed
end
context 'when pages directory does not exist' do
it 'tries to migrate the project, but does not crash' do
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project) do |service|
expect(service).to receive(:execute).and_call_original expect(service).to receive(:execute).and_call_original
end end
subject subject
end end
end
context 'when pages directory exists on disk' do it 'uses PAGES_MIGRATION_THREADS environment variable' do
before do stub_env('PAGES_MIGRATION_THREADS', '5')
FileUtils.mkdir_p File.join(project.pages_path, "public")
File.open(File.join(project.pages_path, "public/index.html"), "w") do |f|
f.write("Hello!")
end
end
it 'migrates pages projects without deployments' do expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything, 5, 10) do |service|
expect_next_instance_of(::Pages::MigrateLegacyStorageToDeploymentService, project) do |service|
expect(service).to receive(:execute).and_call_original expect(service).to receive(:execute).and_call_original
end end
expect do
subject subject
end.to change { project.pages_metadatum.reload.pages_deployment }.from(nil)
end end
context 'when deployed already exists for the project' do it 'uses PAGES_MIGRATION_BATCH_SIZE environment variable' do
before do stub_env('PAGES_MIGRATION_BATCH_SIZE', '100')
deployment = create(:pages_deployment, project: project)
project.set_first_pages_deployment!(deployment)
end
it 'does not try to migrate project' do expect_next_instance_of(::Pages::MigrateFromLegacyStorageService, anything, 3, 100) do |service|
expect(::Pages::MigrateLegacyStorageToDeploymentService).not_to receive(:new) expect(service).to receive(:execute).and_call_original
end
subject subject
end end
end
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment