Commit 420f6b54 authored by Grzegorz Bizon's avatar Grzegorz Bizon

Merge branch 'fix/gb/stage-id-reference-background-migration' into 'master'

Add build stage_id reference background migration

Closes #34151

See merge request !12513
parents 43b9141c e36daa0f
...@@ -2,18 +2,34 @@ class BackgroundMigrationWorker ...@@ -2,18 +2,34 @@ class BackgroundMigrationWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue include DedicatedSidekiqQueue
# Schedules a number of jobs in bulk # Enqueues a number of jobs in bulk.
# #
# The `jobs` argument should be an Array of Arrays, each sub-array must be in # The `jobs` argument should be an Array of Arrays, each sub-array must be in
# the form: # the form:
# #
# [migration-class, [arg1, arg2, ...]] # [migration-class, [arg1, arg2, ...]]
def self.perform_bulk(*jobs) def self.perform_bulk(jobs)
Sidekiq::Client.push_bulk('class' => self, Sidekiq::Client.push_bulk('class' => self,
'queue' => sidekiq_options['queue'], 'queue' => sidekiq_options['queue'],
'args' => jobs) 'args' => jobs)
end end
# Schedules multiple jobs in bulk, with a delay.
#
def self.perform_bulk_in(delay, jobs)
now = Time.now.to_i
schedule = now + delay.to_i
if schedule <= now
raise ArgumentError, 'The schedule time must be in the future!'
end
Sidekiq::Client.push_bulk('class' => self,
'queue' => sidekiq_options['queue'],
'args' => jobs,
'at' => schedule)
end
# Performs the background migration. # Performs the background migration.
# #
# See Gitlab::BackgroundMigration.perform for more information. # See Gitlab::BackgroundMigration.perform for more information.
......
class MigrateStageIdReferenceInBackground < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
BATCH_SIZE = 10000
RANGE_SIZE = 1000
MIGRATION = 'MigrateBuildStageIdReference'.freeze
disable_ddl_transaction!
class Build < ActiveRecord::Base
self.table_name = 'ci_builds'
include ::EachBatch
end
##
# It will take around 3 days to process 20M ci_builds.
#
def up
Build.where(stage_id: nil).each_batch(of: BATCH_SIZE) do |relation, index|
relation.each_batch(of: RANGE_SIZE) do |relation|
range = relation.pluck('MIN(id)', 'MAX(id)').first
BackgroundMigrationWorker
.perform_in(index * 2.minutes, MIGRATION, range)
end
end
end
def down
# noop
end
end
...@@ -50,14 +50,13 @@ your migration: ...@@ -50,14 +50,13 @@ your migration:
BackgroundMigrationWorker.perform_async('BackgroundMigrationClassName', [arg1, arg2, ...]) BackgroundMigrationWorker.perform_async('BackgroundMigrationClassName', [arg1, arg2, ...])
``` ```
Usually it's better to schedule jobs in bulk, for this you can use Usually it's better to enqueue jobs in bulk, for this you can use
`BackgroundMigrationWorker.perform_bulk`: `BackgroundMigrationWorker.perform_bulk`:
```ruby ```ruby
BackgroundMigrationWorker.perform_bulk( BackgroundMigrationWorker.perform_bulk(
['BackgroundMigrationClassName', [1]], [['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]], ['BackgroundMigrationClassName', [2]]]
...
) )
``` ```
...@@ -68,6 +67,16 @@ consuming migrations it's best to schedule a background job using an ...@@ -68,6 +67,16 @@ consuming migrations it's best to schedule a background job using an
updates. Removals in turn can be handled by simply defining foreign keys with updates. Removals in turn can be handled by simply defining foreign keys with
cascading deletes. cascading deletes.
If you would like to schedule jobs in bulk with a delay, you can use
`BackgroundMigrationWorker.perform_bulk_in`:
```ruby
jobs = [['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]]]
BackgroundMigrationWorker.perform_bulk_in(5.minutes, jobs)
```
## Cleaning Up ## Cleaning Up
Because background migrations can take a long time you can't immediately clean Because background migrations can take a long time you can't immediately clean
......
module Gitlab
module BackgroundMigration
class MigrateBuildStageIdReference
def perform(start_id, stop_id)
sql = <<-SQL.strip_heredoc
UPDATE ci_builds
SET stage_id =
(SELECT id FROM ci_stages
WHERE ci_stages.pipeline_id = ci_builds.commit_id
AND ci_stages.name = ci_builds.stage)
WHERE ci_builds.id BETWEEN #{start_id.to_i} AND #{stop_id.to_i}
AND ci_builds.stage_id IS NULL
SQL
ActiveRecord::Base.connection.execute(sql)
end
end
end
end
require 'spec_helper'
require Rails.root.join('db', 'post_migrate', '20170628080858_migrate_stage_id_reference_in_background')
describe MigrateStageIdReferenceInBackground, :migration, :sidekiq do
matcher :be_scheduled_migration do |delay, *expected|
match do |migration|
BackgroundMigrationWorker.jobs.any? do |job|
job['args'] == [migration, expected] &&
job['at'].to_i == (delay.to_i + Time.now.to_i)
end
end
failure_message do |migration|
"Migration `#{migration}` with args `#{expected.inspect}` not scheduled!"
end
end
let(:jobs) { table(:ci_builds) }
let(:stages) { table(:ci_stages) }
let(:pipelines) { table(:ci_pipelines) }
let(:projects) { table(:projects) }
before do
stub_const("#{described_class.name}::BATCH_SIZE", 3)
stub_const("#{described_class.name}::RANGE_SIZE", 2)
projects.create!(id: 123, name: 'gitlab1', path: 'gitlab1')
projects.create!(id: 345, name: 'gitlab2', path: 'gitlab2')
pipelines.create!(id: 1, project_id: 123, ref: 'master', sha: 'adf43c3a')
pipelines.create!(id: 2, project_id: 345, ref: 'feature', sha: 'cdf43c3c')
jobs.create!(id: 1, commit_id: 1, project_id: 123, stage_idx: 2, stage: 'build')
jobs.create!(id: 2, commit_id: 1, project_id: 123, stage_idx: 2, stage: 'build')
jobs.create!(id: 3, commit_id: 1, project_id: 123, stage_idx: 1, stage: 'test')
jobs.create!(id: 4, commit_id: 1, project_id: 123, stage_idx: 3, stage: 'deploy')
jobs.create!(id: 5, commit_id: 2, project_id: 345, stage_idx: 1, stage: 'test')
stages.create(id: 101, pipeline_id: 1, project_id: 123, name: 'test')
stages.create(id: 102, pipeline_id: 1, project_id: 123, name: 'build')
stages.create(id: 103, pipeline_id: 1, project_id: 123, name: 'deploy')
jobs.create!(id: 6, commit_id: 2, project_id: 345, stage_id: 101, stage_idx: 1, stage: 'test')
end
it 'correctly schedules background migrations' do
Sidekiq::Testing.fake! do
Timecop.freeze do
migrate!
expect(described_class::MIGRATION).to be_scheduled_migration(2.minutes, 1, 2)
expect(described_class::MIGRATION).to be_scheduled_migration(2.minutes, 3, 3)
expect(described_class::MIGRATION).to be_scheduled_migration(4.minutes, 4, 5)
expect(BackgroundMigrationWorker.jobs.size).to eq 3
end
end
end
it 'schedules background migrations' do
Sidekiq::Testing.inline! do
expect(jobs.where(stage_id: nil).count).to eq 5
migrate!
expect(jobs.where(stage_id: nil).count).to eq 1
end
end
end
...@@ -3,3 +3,9 @@ require 'sidekiq/testing/inline' ...@@ -3,3 +3,9 @@ require 'sidekiq/testing/inline'
Sidekiq::Testing.server_middleware do |chain| Sidekiq::Testing.server_middleware do |chain|
chain.add Gitlab::SidekiqStatus::ServerMiddleware chain.add Gitlab::SidekiqStatus::ServerMiddleware
end end
RSpec.configure do |config|
config.after(:each, :sidekiq) do
Sidekiq::Worker.clear_all
end
end
require 'spec_helper' require 'spec_helper'
describe BackgroundMigrationWorker do describe BackgroundMigrationWorker, :sidekiq do
describe '.perform' do describe '.perform' do
it 'performs a background migration' do it 'performs a background migration' do
expect(Gitlab::BackgroundMigration) expect(Gitlab::BackgroundMigration)
...@@ -10,4 +10,35 @@ describe BackgroundMigrationWorker do ...@@ -10,4 +10,35 @@ describe BackgroundMigrationWorker do
described_class.new.perform('Foo', [10, 20]) described_class.new.perform('Foo', [10, 20])
end end
end end
describe '.perform_bulk' do
it 'enqueues background migrations in bulk' do
Sidekiq::Testing.fake! do
described_class.perform_bulk([['Foo', [1]], ['Foo', [2]]])
expect(described_class.jobs.count).to eq 2
expect(described_class.jobs).to all(include('enqueued_at'))
end
end
end
describe '.perform_bulk_in' do
context 'when delay is valid' do
it 'correctly schedules background migrations' do
Sidekiq::Testing.fake! do
described_class.perform_bulk_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
expect(described_class.jobs.count).to eq 2
expect(described_class.jobs).to all(include('at'))
end
end
end
context 'when delay is invalid' do
it 'raises an ArgumentError exception' do
expect { described_class.perform_bulk_in(-60, [['Foo']]) }
.to raise_error(ArgumentError)
end
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment