Commit 51679f61 authored by Robert Speicher's avatar Robert Speicher

Merge branch '7126-batch-operations' into 'master'

Geo: Admin > Geo > Projects support for batch operations

Closes #8120

See merge request gitlab-org/gitlab-ee!7806
parents 673231db 760de419
......@@ -161,6 +161,9 @@
- cronjob:update_all_mirrors
- cronjob:pseudonymizer
- geo:geo_batch_project_registry
- geo:geo_batch_project_registry_scheduler
- geo:geo_scheduler_scheduler
- geo:geo_scheduler_primary_scheduler
- geo:geo_scheduler_secondary_scheduler
......
......@@ -55,6 +55,18 @@ class Admin::Geo::ProjectsController < Admin::ApplicationController
redirect_back_or_admin_geo_projects(notice: s_('Geo|%{name} is scheduled for forced re-download') % { name: @registry.project.full_name })
end
def recheck_all
Geo::Batch::ProjectRegistrySchedulerWorker.perform_async(:recheck_repositories)
redirect_back_or_admin_geo_projects(notice: s_('Geo|All projects are being scheduled for re-check'))
end
def resync_all
Geo::Batch::ProjectRegistrySchedulerWorker.perform_async(:resync_repositories)
redirect_back_or_admin_geo_projects(notice: s_('Geo|All projects are being scheduled for re-sync'))
end
private
def check_license
......
......@@ -88,6 +88,42 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
where(project: Geo::Fdw::Project.search(query))
end
def self.flag_repositories_for_resync!
update_all(
resync_repository: true,
repository_verification_checksum_sha: nil,
repository_checksum_mismatch: false,
last_repository_verification_failure: nil,
repository_verification_retry_count: nil,
resync_repository_was_scheduled_at: Time.now,
repository_retry_count: nil,
repository_retry_at: nil
)
end
def self.flag_repositories_for_recheck!
update_all(
repository_verification_checksum_sha: nil,
last_repository_verification_failure: nil,
repository_checksum_mismatch: false
)
end
# Retrieve the range of IDs in a relation
#
# @return [Array] with minimum ID and max ID
def self.range
pluck('MIN(id)', 'MAX(id)').first
end
# Search for IDs in the range
#
# @param [Integer] start initial ID
# @param [Integer] finish final ID
def self.with_range(start, finish)
where(id: start..finish)
end
# Must be run before fetching the repository to avoid a race condition
#
# @param [String] type must be one of the values in TYPES
......
......@@ -24,6 +24,15 @@
= s_('Geo|Never')
.nav-controls
= render(partial: 'shared/projects/search_form', autofocus: true)
.dropdown
%a.btn.dropdown-toggle{ href: '#', data: { toggle: 'dropdown' }, 'aria-haspopup' => 'true', 'aria-expanded' => 'false' }
= icon('gears')
= s_('Geo|Batch operations')
= icon("caret-down")
%ul.dropdown-menu.dropdown-menu-right
%li.dropdown-header= s_('Geo|Batch operations')
%li= link_to s_('Geo|Resync all projects'), resync_all_admin_geo_projects_path, method: :post
%li= link_to s_('Geo|Recheck all projects'), recheck_all_admin_geo_projects_path, method: :post
- case params[:sync_status]
- when 'never'
......
# frozen_string_literal: true
module Geo
module Batch
# Responsible for scheduling multiple jobs to mark Project Registries as requiring syncing or verification.
#
# This class includes an Exclusive Lease guard and only one can be executed at the same time
# If multiple jobs are scheduled, only one will run and the others will drop forever.
class ProjectRegistrySchedulerWorker
include ApplicationWorker
include GeoQueue
include ExclusiveLeaseGuard
include ::Gitlab::Geo::LogHelpers
BATCH_SIZE = 10000
LEASE_TIMEOUT = 2.minutes # TTL for X amount of loops to happen until it is renewed
RENEW_AFTER_LOOPS = 20 # renew lease at every 20 loops has finished
OPERATIONS = [:resync_repositories, :recheck_repositories].freeze
DELAY_INTERVAL = 10.seconds.to_i # base delay for scheduling batch execution
def perform(operation)
return fail_invalid_operation!(operation) unless OPERATIONS.include?(operation.to_sym)
try_obtain_lease do
perform_in_batches_with_range(operation.to_sym)
end
end
private
def perform_in_batches_with_range(operation)
Geo::ProjectRegistry.each_batch(of: BATCH_SIZE) do |batch, index|
delay = index * DELAY_INTERVAL
::Geo::Batch::ProjectRegistryWorker.perform_in(delay, operation, batch.range)
renew_lease! if index % RENEW_AFTER_LOOPS == 0 # we renew after X amount of loops to not add much delay here
end
end
def lease_timeout
LEASE_TIMEOUT
end
def fail_invalid_operation!(operation)
raise ArgumentError, "Invalid operation: '#{operation.inspect}' informed. Must be one of the following: #{OPERATIONS.map { |valid_op| "'#{valid_op}'" }.join(', ')}"
end
end
end
end
# frozen_string_literal: true
module Geo
module Batch
# Responsible for scheduling multiple jobs to mark Project Registries as requiring syncing or verification.
#
# This class includes an Exclusive Lease guard and only one can be executed at the same time
# If multiple jobs are scheduled, only one will run and the others will drop forever.
class ProjectRegistryWorker
include ApplicationWorker
include GeoQueue
include ::Gitlab::Geo::LogHelpers
BATCH_SIZE = 250
OPERATIONS = [:resync_repositories, :recheck_repositories].freeze
def perform(operation, range)
case operation.to_sym
when :resync_repositories
resync_repositories(range)
when :recheck_repositories
recheck_repositories(range)
else
fail_invalid_operation!(operation)
end
end
private
def resync_repositories(range)
Geo::ProjectRegistry.with_range(range[0], range[1]).each_batch(of: BATCH_SIZE) do |batch|
batch.flag_repositories_for_resync!
end
end
def recheck_repositories(range)
Geo::ProjectRegistry.with_range(range[0], range[1]).each_batch(of: BATCH_SIZE) do |batch|
batch.flag_repositories_for_recheck!
end
end
def fail_invalid_operation!(operation)
raise ArgumentError, "Invalid operation: '#{operation.inspect}' informed. Must be one of the following: #{OPERATIONS.map { |valid_op| "'#{valid_op}'" }.join(', ')}"
end
end
end
end
---
title: 'Geo: Admin > Geo > Projects support for batch operations'
merge_request: 7806
author:
type: added
......@@ -34,6 +34,11 @@ namespace :admin do
post :resync
post :force_redownload
end
collection do
post :recheck_all
post :resync_all
end
end
end
......
......@@ -10,7 +10,7 @@ module EE
}.freeze
WHITELISTED_GEO_ROUTES_TRACKING_DB = {
'admin/geo/projects' => %w{destroy resync recheck force_redownload}
'admin/geo/projects' => %w{destroy resync recheck force_redownload resync_all recheck_all}
}.freeze
private
......
......@@ -27,7 +27,7 @@ describe Admin::Geo::ProjectsController, :geo do
render_views
before do
allow(Gitlab::Geo).to receive(:license_allows?).and_return(true)
stub_licensed_features(geo: true)
end
context 'without sync_status specified' do
......@@ -87,7 +87,7 @@ describe Admin::Geo::ProjectsController, :geo do
context 'with a valid license' do
before do
allow(Gitlab::Geo).to receive(:license_allows?).and_return(true)
stub_licensed_features(geo: true)
end
context 'with an orphaned registry' do
......@@ -117,7 +117,7 @@ describe Admin::Geo::ProjectsController, :geo do
context 'with a valid license' do
before do
allow(Gitlab::Geo).to receive(:license_allows?).and_return(true)
stub_licensed_features(geo: true)
end
it 'flags registry for recheck' do
......@@ -135,7 +135,7 @@ describe Admin::Geo::ProjectsController, :geo do
context 'with a valid license' do
before do
allow(Gitlab::Geo).to receive(:license_allows?).and_return(true)
stub_licensed_features(geo: true)
end
it 'flags registry for resync' do
......@@ -146,6 +146,56 @@ describe Admin::Geo::ProjectsController, :geo do
end
end
describe '#recheck_all' do
subject { post :recheck_all }
it_behaves_like 'license required'
context 'with a valid license' do
before do
stub_licensed_features(geo: true)
end
it 'schedules a batch job' do
Sidekiq::Testing.fake! do
expect { subject }.to change(Geo::Batch::ProjectRegistrySchedulerWorker.jobs, :size).by(1)
expect(Geo::Batch::ProjectRegistrySchedulerWorker.jobs.last['args']).to include('recheck_repositories')
end
end
it 'redirects back and display confirmation' do
Sidekiq::Testing.inline! do
expect(subject).to redirect_to(admin_geo_projects_path)
expect(flash[:notice]).to include('All projects are being scheduled for re-check')
end
end
end
end
describe '#resync_all' do
subject { post :resync_all }
it_behaves_like 'license required'
context 'with a valid license' do
before do
stub_licensed_features(geo: true)
end
it 'schedules a batch job' do
Sidekiq::Testing.fake! do
expect { subject }.to change(Geo::Batch::ProjectRegistrySchedulerWorker.jobs, :size).by(1)
expect(Geo::Batch::ProjectRegistrySchedulerWorker.jobs.last['args']).to include('resync_repositories')
end
end
it 'redirects back and display confirmation' do
expect(subject).to redirect_to(admin_geo_projects_path)
expect(flash[:notice]).to include('All projects are being scheduled for re-sync')
end
end
end
describe '#force_redownload' do
subject { post :force_redownload, id: synced_registry }
......@@ -153,7 +203,7 @@ describe Admin::Geo::ProjectsController, :geo do
context 'with a valid license' do
before do
allow(Gitlab::Geo).to receive(:license_allows?).and_return(true)
stub_licensed_features(geo: true)
end
it 'flags registry for re-download' do
......
......@@ -41,46 +41,30 @@ describe Gitlab::Middleware::ReadOnly do
allow(Gitlab::Database).to receive(:read_only?) { true }
end
context 'whitelisted requests' do
it 'expects a PATCH request to geo_nodes update URL to be allowed' do
shared_examples 'whitelisted request' do |request_type, request_url|
it "expects a #{request_type.upcase} request to #{request_url} to be allowed" do
expect(Rails.application.routes).to receive(:recognize_path).and_call_original
response = request.patch('/admin/geo/nodes/1')
response = request.send(request_type, request_url)
expect(response).not_to be_redirect
expect(subject).not_to disallow_request
end
end
it 'expects a DELETE request to geo projects delete URL to be allowed' do
expect(Rails.application.routes).to receive(:recognize_path).and_call_original
response = request.delete('/admin/geo/projects/1')
expect(response).not_to be_redirect
expect(subject).not_to disallow_request
end
context 'whitelisted requests' do
it_behaves_like 'whitelisted request', :patch, '/admin/geo/nodes/1'
it 'expects a POST request to geo projects resync URL to be allowed' do
expect(Rails.application.routes).to receive(:recognize_path).and_call_original
response = request.post('/admin/geo/projects/1/resync')
it_behaves_like 'whitelisted request', :delete, '/admin/geo/projects/1'
expect(response).not_to be_redirect
expect(subject).not_to disallow_request
end
it_behaves_like 'whitelisted request', :post, '/admin/geo/projects/1/resync'
it 'expects a POST request to geo projects recheck URL to be allowed' do
expect(Rails.application.routes).to receive(:recognize_path).and_call_original
response = request.post('/admin/geo/projects/1/recheck')
it_behaves_like 'whitelisted request', :post, '/admin/geo/projects/1/recheck'
expect(response).not_to be_redirect
expect(subject).not_to disallow_request
end
it_behaves_like 'whitelisted request', :post, '/admin/geo/projects/recheck_all'
it 'expects a POST request to geo projects force redownload URL to be allowed' do
expect(Rails.application.routes).to receive(:recognize_path).and_call_original
response = request.post('/admin/geo/projects/1/force_redownload')
it_behaves_like 'whitelisted request', :post, '/admin/geo/projects/resync_all'
expect(response).not_to be_redirect
expect(subject).not_to disallow_request
end
it_behaves_like 'whitelisted request', :post, '/admin/geo/projects/1/force_redownload'
end
end
end
......@@ -175,6 +175,39 @@ describe Geo::ProjectRegistry do
end
end
describe '.flag_repositories_for_recheck!' do
it 'modified record to a recheck state' do
registry = create(:geo_project_registry, :repository_verified)
described_class.flag_repositories_for_recheck!
expect(registry.reload).to have_attributes(
repository_verification_checksum_sha: nil,
last_repository_verification_failure: nil,
repository_checksum_mismatch: false
)
end
end
describe '.flag_repositories_for_resync!' do
it 'modified record to a resync state' do
registry = create(:geo_project_registry, :synced)
described_class.flag_repositories_for_resync!
expect(registry.reload).to have_attributes(
resync_repository: true,
repository_verification_checksum_sha: nil,
last_repository_verification_failure: nil,
repository_checksum_mismatch: false,
repository_verification_retry_count: nil,
repository_retry_count: nil,
repository_retry_at: nil
)
end
end
describe '#repository_sync_due?' do
where(:last_synced_at, :resync, :retry_at, :expected) do
now = Time.now
......
# frozen_string_literal: true
require 'rails_helper'
RSpec.describe Geo::Batch::ProjectRegistrySchedulerWorker do
include ExclusiveLeaseHelpers
include ::EE::GeoHelpers
set(:secondary) { create(:geo_node) }
let(:lease_key) { subject.lease_key }
let(:lease_timeout) { 2.minutes }
before do
stub_current_geo_node(secondary)
stub_exclusive_lease(renew: true)
end
describe '#perform' do
context 'when operation is :recheck_repositories' do
let!(:registry) { create(:geo_project_registry, :repository_verified) }
it 'schedules batches of repositories for recheck' do
Sidekiq::Testing.fake! do
expect { subject.perform(:recheck_repositories) }.to change(Geo::Batch::ProjectRegistryWorker.jobs, :size).by(1)
expect(Geo::Batch::ProjectRegistryWorker.jobs.last['args']).to include('recheck_repositories')
end
end
it 'does nothing if exclusive lease is already acquired' do
stub_exclusive_lease_taken(lease_key, timeout: lease_timeout)
Sidekiq::Testing.fake! do
expect { subject.perform(:recheck_repositories) }.not_to change(Geo::Batch::ProjectRegistryWorker.jobs, :size)
end
end
end
context 'when operation is :resync_repositories' do
let!(:registry) { create(:geo_project_registry, :synced) }
it 'schedules batches of repositories for resync' do
Sidekiq::Testing.fake! do
expect { subject.perform(:resync_repositories) }.to change(Geo::Batch::ProjectRegistryWorker.jobs, :size).by(1)
expect(Geo::Batch::ProjectRegistryWorker.jobs.last['args']).to include('resync_repositories')
end
end
it 'does nothing if exclusive lease is already acquired' do
stub_exclusive_lease_taken(lease_key, timeout: lease_timeout)
Sidekiq::Testing.fake! do
expect { subject.perform(:resync_repositories) }.not_to change(Geo::Batch::ProjectRegistryWorker.jobs, :size)
end
end
end
context 'when informed operation is unknown/invalid' do
it 'fails with ArgumentError' do
expect { subject.perform(:unknown_operation) }.to raise_error(ArgumentError)
end
end
end
end
# frozen_string_literal: true
require 'rails_helper'
RSpec.describe Geo::Batch::ProjectRegistryWorker do
include ::EE::GeoHelpers
set(:secondary) { create(:geo_node) }
before do
stub_current_geo_node(secondary)
end
describe '#perform' do
let(:range) { [0, registry.id] }
context 'when operation is :recheck_repositories' do
let!(:registry) { create(:geo_project_registry, :repository_verified) }
it 'flags repositories for recheck' do
Sidekiq::Testing.inline! do
subject.perform(:recheck_repositories, range)
end
expect(registry.reload.repository_verification_pending?).to be_truthy
end
end
context 'when operation is :resync_repositories' do
let!(:registry) { create(:geo_project_registry, :synced) }
it 'flags repositories for resync' do
Sidekiq::Testing.inline! do
subject.perform(:resync_repositories, range)
end
expect(registry.reload.resync_repository?).to be_truthy
end
end
context 'when informed operation is unknown/invalid' do
let(:range) { [1, 10] }
it 'fails with ArgumentError' do
expect { subject.perform(:unknown_operation, range) }.to raise_error(ArgumentError)
end
end
end
end
......@@ -3721,6 +3721,15 @@ msgstr ""
msgid "Geo|All projects"
msgstr ""
msgid "Geo|All projects are being scheduled for re-check"
msgstr ""
msgid "Geo|All projects are being scheduled for re-sync"
msgstr ""
msgid "Geo|Batch operations"
msgstr ""
msgid "Geo|Could not remove tracking entry for an existing project."
msgstr ""
......@@ -3778,6 +3787,9 @@ msgstr ""
msgid "Geo|Recheck"
msgstr ""
msgid "Geo|Recheck all projects"
msgstr ""
msgid "Geo|Redownload"
msgstr ""
......@@ -3790,6 +3802,9 @@ msgstr ""
msgid "Geo|Resync"
msgstr ""
msgid "Geo|Resync all projects"
msgstr ""
msgid "Geo|Retry count"
msgstr ""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment