Add worker to perform repository clean up

This change introduces the Geo::RepositoryCleanupWorker that is
scheduled for each project when the selective sync rule changes.

It makes sure that the project does not meet the selective sync
rule before deleting it.
parent 982c1125
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
- geo:geo_project_sync - geo:geo_project_sync
- geo:geo_rename_repository - geo:geo_rename_repository
- geo:geo_repositories_clean_up - geo:geo_repositories_clean_up
- geo:geo_repository_cleanup
- geo:geo_repository_destroy - geo:geo_repository_destroy
- geo:geo_repository_shard_sync - geo:geo_repository_shard_sync
- geo:geo_repository_verification_primary_shard - geo:geo_repository_verification_primary_shard
......
...@@ -32,7 +32,7 @@ module Geo ...@@ -32,7 +32,7 @@ module Geo
private private
def clean_up_repositories(project) def clean_up_repositories(project)
job_id = ::GeoRepositoryDestroyWorker.perform_async(project.id, project.name, project.disk_path, project.repository.storage) job_id = ::Geo::RepositoryCleanupWorker.perform_async(project.id, project.name, project.disk_path, project.repository.storage)
if job_id if job_id
log_info('Repository clean up scheduled', project_id: project.id, shard: project.repository.storage, disk_path: project.disk_path, job_id: job_id) log_info('Repository clean up scheduled', project_id: project.id, shard: project.repository.storage, disk_path: project.disk_path, job_id: job_id)
......
# frozen_string_literal: true

module Geo
  # Removes a project's repository from a Geo secondary node once the
  # project falls outside the node's selective sync scope. Scheduled by
  # the repositories clean up worker when selective sync rules change.
  class RepositoryCleanupWorker
    include ApplicationWorker
    include GeoQueue
    include ::Gitlab::Geo::LogHelpers
    include ::Gitlab::Utils::StrongMemoize

    # Destroys the repository identified by the given attributes, but only
    # on a secondary node and only when the project is no longer included
    # in this node's selective sync rules.
    def perform(project_id, name, disk_path, storage_name)
      return unless current_node.secondary?

      unless can_clean_up?(project_id)
        log_info('Skipping repositories clean up', project_id: project_id, shard: storage_name, disk_path: disk_path)
        return
      end

      Geo::RepositoryDestroyService.new(project_id, name, disk_path, storage_name).execute
      log_info('Repositories cleaned up', project_id: project_id, shard: storage_name, disk_path: disk_path)
    end

    private

    # A project may be cleaned up only when it is excluded from this
    # node's selective sync scope.
    def can_clean_up?(project_id)
      !current_node.projects_include?(project_id)
    end

    # Memoized lookup of the Geo node this process is running on.
    def current_node
      strong_memoize(:current_node) { Gitlab::Geo.current_node }
    end
  end
end
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
class GeoRepositoryDestroyWorker class GeoRepositoryDestroyWorker
include ApplicationWorker include ApplicationWorker
include GeoQueue include GeoQueue
include Gitlab::ShellAdapter
def perform(id, name, disk_path, storage_name) def perform(id, name, disk_path, storage_name)
Geo::RepositoryDestroyService.new(id, name, disk_path, storage_name).execute Geo::RepositoryDestroyService.new(id, name, disk_path, storage_name).execute
......
---
title: Geo - Make sure project does not meet selective sync rule before deleting it
merge_request: 9345
author:
type: fixed
require 'spec_helper' require 'spec_helper'
describe Geo::RepositoriesCleanUpWorker do describe Geo::RepositoriesCleanUpWorker do
describe '#perform' do include ::EE::GeoHelpers
include ExclusiveLeaseHelpers include ExclusiveLeaseHelpers
let(:geo_node) { create(:geo_node) } describe '#perform' do
set(:secondary) { create(:geo_node) }
before do before do
stub_current_geo_node(secondary)
stub_exclusive_lease stub_exclusive_lease
end end
context 'when node has selective sync enabled' do context 'when node has selective sync enabled' do
let(:synced_group) { create(:group) } let(:synced_group) { create(:group) }
let(:geo_node) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) } let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
context 'legacy storage' do context 'legacy storage' do
it 'performs GeoRepositoryDestroyWorker for each project that does not belong to selected namespaces to replicate' do it 'performs Geo::RepositoryCleanupWorker for each project that does not belong to selected namespaces to replicate' do
project_in_synced_group = create(:project, :legacy_storage, group: synced_group) project_in_synced_group = create(:project, :legacy_storage, group: synced_group)
unsynced_project = create(:project, :repository, :legacy_storage) unsynced_project = create(:project, :repository, :legacy_storage)
disk_path = "#{unsynced_project.namespace.full_path}/#{unsynced_project.path}" disk_path = "#{unsynced_project.namespace.full_path}/#{unsynced_project.path}"
expect(GeoRepositoryDestroyWorker).to receive(:perform_async) expect(Geo::RepositoryCleanupWorker).to receive(:perform_async)
.with(unsynced_project.id, unsynced_project.name, disk_path, unsynced_project.repository.storage) .with(unsynced_project.id, unsynced_project.name, disk_path, unsynced_project.repository.storage)
.once.and_return(1) .once.and_return(1)
expect(GeoRepositoryDestroyWorker).not_to receive(:perform_async) expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
.with(project_in_synced_group.id, project_in_synced_group.name, project_in_synced_group.disk_path, project_in_synced_group.repository.storage) .with(project_in_synced_group.id, project_in_synced_group.name, project_in_synced_group.disk_path, project_in_synced_group.repository.storage)
subject.perform(geo_node.id) subject.perform(secondary.id)
end end
end end
...@@ -36,41 +38,41 @@ describe Geo::RepositoriesCleanUpWorker do ...@@ -36,41 +38,41 @@ describe Geo::RepositoriesCleanUpWorker do
stub_application_setting(hashed_storage_enabled: true) stub_application_setting(hashed_storage_enabled: true)
end end
it 'performs GeoRepositoryDestroyWorker for each project that does not belong to selected namespaces to replicate' do it 'performs Geo::RepositoryCleanupWorker for each project that does not belong to selected namespaces to replicate' do
project_in_synced_group = create(:project, group: synced_group) project_in_synced_group = create(:project, group: synced_group)
unsynced_project = create(:project, :repository) unsynced_project = create(:project, :repository)
hash = Digest::SHA2.hexdigest(unsynced_project.id.to_s) hash = Digest::SHA2.hexdigest(unsynced_project.id.to_s)
disk_path = "@hashed/#{hash[0..1]}/#{hash[2..3]}/#{hash}" disk_path = "@hashed/#{hash[0..1]}/#{hash[2..3]}/#{hash}"
expect(GeoRepositoryDestroyWorker).to receive(:perform_async) expect(Geo::RepositoryCleanupWorker).to receive(:perform_async)
.with(unsynced_project.id, unsynced_project.name, disk_path, unsynced_project.repository.storage) .with(unsynced_project.id, unsynced_project.name, disk_path, unsynced_project.repository.storage)
.once.and_return(1) .once.and_return(1)
expect(GeoRepositoryDestroyWorker).not_to receive(:perform_async) expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
.with(project_in_synced_group.id, project_in_synced_group.name, project_in_synced_group.disk_path, project_in_synced_group.repository.storage) .with(project_in_synced_group.id, project_in_synced_group.name, project_in_synced_group.disk_path, project_in_synced_group.repository.storage)
subject.perform(geo_node.id) subject.perform(secondary.id)
end end
end end
context 'when the project repository does not exist on disk' do context 'when the project repository does not exist on disk' do
let(:project) { create(:project) } let(:project) { create(:project) }
it 'performs GeoRepositoryDestroyWorker' do it 'performs Geo::RepositoryCleanupWorker' do
expect(GeoRepositoryDestroyWorker).to receive(:perform_async) expect(Geo::RepositoryCleanupWorker).to receive(:perform_async)
.with(project.id, anything, anything, anything) .with(project.id, anything, anything, anything)
.once .once
.and_return(1) .and_return(1)
subject.perform(geo_node.id) subject.perform(secondary.id)
end end
it 'does not leave orphaned entries in the project_registry table' do it 'does not leave orphaned entries in the project_registry table' do
create(:geo_project_registry, :sync_failed, project: project) create(:geo_project_registry, :sync_failed, project: project)
Sidekiq::Testing.inline! do Sidekiq::Testing.inline! do
subject.perform(geo_node.id) subject.perform(secondary.id)
end end
expect(Geo::ProjectRegistry.where(project_id: project)).to be_empty expect(Geo::ProjectRegistry.where(project_id: project)).to be_empty
...@@ -78,18 +80,18 @@ describe Geo::RepositoriesCleanUpWorker do ...@@ -78,18 +80,18 @@ describe Geo::RepositoriesCleanUpWorker do
end end
end end
it 'does not perform GeoRepositoryDestroyWorker when node does not selective sync enabled' do it 'does not perform Geo::RepositoryCleanupWorker when does not node have namespace restrictions' do
expect(GeoRepositoryDestroyWorker).not_to receive(:perform_async) expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
subject.perform(geo_node.id) subject.perform(secondary.id)
end end
it 'does not perform GeoRepositoryDestroyWorker when cannnot obtain a lease' do it 'does not perform Geo::RepositoryCleanupWorker when cannnot obtain a lease' do
stub_exclusive_lease_taken stub_exclusive_lease_taken
expect(GeoRepositoryDestroyWorker).not_to receive(:perform_async) expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
subject.perform(geo_node.id) subject.perform(secondary.id)
end end
it 'does not raise an error when node could not be found' do it 'does not raise an error when node could not be found' do
......
# frozen_string_literal: true

require 'spec_helper'

# Specs for the worker that removes repositories from a secondary node
# when a project no longer matches the node's selective sync rules.
describe Geo::RepositoryCleanupWorker do
  include ::EE::GeoHelpers

  describe '#perform' do
    set(:primary) { create(:geo_node, :primary) }
    set(:secondary) { create(:geo_node) }
    set(:project) { create(:project) }

    before do
      # The worker only acts on secondary nodes, so run as the secondary.
      stub_current_geo_node(secondary)
    end

    it 'skips repository clean up if the current node is a primary' do
      stub_current_geo_node(primary)

      expect_any_instance_of(Geo::RepositoryDestroyService).not_to receive(:execute)

      described_class.new.perform(project.id, project.name, project.path, 'default')
    end

    context 'when node does not have selective sync restriction' do
      # Without selective sync every project is in scope, so nothing
      # should ever be removed.
      it 'does not delegate project removal' do
        expect_any_instance_of(Geo::RepositoryDestroyService).not_to receive(:execute)

        described_class.new.perform(project.id, project.name, project.path, 'default')
      end
    end

    context 'when node has selective sync restriction' do
      let(:group_1) { create(:group) }
      let(:group_2) { create(:group) }

      before do
        # Restrict the secondary to replicate only group_1's namespace.
        secondary.update!(selective_sync_type: 'namespaces', namespaces: [group_1])
      end

      it 'does not delegate project removal for projects that belong to selected namespaces to replicate' do
        project = create(:project, group: group_1)

        expect_any_instance_of(Geo::RepositoryDestroyService).not_to receive(:execute)

        described_class.new.perform(project.id, project.name, project.path, 'default')
      end

      it 'delegates project removal for projects that do not belong to selected namespaces to replicate' do
        project = create(:project, group: group_2)

        expect_any_instance_of(Geo::RepositoryDestroyService).to receive(:execute)

        described_class.new.perform(project.id, project.name, project.path, 'default')
      end
    end
  end
end
# frozen_string_literal: true
require 'spec_helper' require 'spec_helper'
describe GeoRepositoryDestroyWorker do describe GeoRepositoryDestroyWorker do
let(:project) { create(:project) } describe '#perform' do
it 'delegates project removal to Geo::RepositoryDestroyService' do
project = create(:project)
it 'delegates project removal to Geo::RepositoryDestroyService' do expect_any_instance_of(Geo::RepositoryDestroyService).to receive(:execute)
expect_any_instance_of(Geo::RepositoryDestroyService).to receive(:execute)
described_class.new.perform(project.id, project.name, project.path, 'default') described_class.new.perform(project.id, project.name, project.path, 'default')
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment