Commit d5994552 authored by Nick Thomas's avatar Nick Thomas

Merge branch 'zj-pool-repository-creation' into 'master'

Allow public forks to be deduplicated

See merge request gitlab-org/gitlab-ce!23508
parents 88c0984d 896c0bdb
# frozen_string_literal: true
# The PoolRepository model is the database equivalent of an ObjectPool for Gitaly
# That is; PoolRepository is the record in the database, ObjectPool is the
# repository on disk
class PoolRepository < ActiveRecord::Base
include Shardable
include AfterCommitQueue
has_one :source_project, class_name: 'Project'
validates :source_project, presence: true
has_many :member_projects, class_name: 'Project'
after_create :correct_disk_path
state_machine :state, initial: :none do
state :scheduled
state :ready
state :failed
event :schedule do
transition none: :scheduled
end
event :mark_ready do
transition [:scheduled, :failed] => :ready
end
event :mark_failed do
transition all => :failed
end
state all - [:ready] do
def joinable?
false
end
end
state :ready do
def joinable?
true
end
end
after_transition none: :scheduled do |pool, _|
pool.run_after_commit do
::ObjectPool::CreateWorker.perform_async(pool.id)
end
end
after_transition scheduled: :ready do |pool, _|
pool.run_after_commit do
::ObjectPool::ScheduleJoinWorker.perform_async(pool.id)
end
end
end
def create_object_pool
object_pool.create
end
# The members of the pool should have fetched the missing objects to their own
# objects directory. If the caller fails to do so, data loss might occur
def delete_object_pool
object_pool.delete
end
def link_repository(repository)
object_pool.link(repository.raw)
end
# This RPC can cause data loss, as not all objects are present the local repository
# No execution path yet, will be added through:
# https://gitlab.com/gitlab-org/gitaly/issues/1415
def delete_repository_alternate(repository)
object_pool.unlink_repository(repository.raw)
end
def object_pool
@object_pool ||= Gitlab::Git::ObjectPool.new(
shard.name,
disk_path + '.git',
source_project.repository.raw)
end
private
def correct_disk_path
......
......@@ -1585,6 +1585,7 @@ class Project < ActiveRecord::Base
import_state.remove_jid
update_project_counter_caches
after_create_default_branch
join_pool_repository
refresh_markdown_cache!
end
......@@ -1981,8 +1982,48 @@ class Project < ActiveRecord::Base
Gitlab::CurrentSettings.max_attachment_size.megabytes.to_i
end
def object_pool_params
return {} unless !forked? && git_objects_poolable?
{
repository_storage: repository_storage,
pool_repository: pool_repository || create_new_pool_repository
}
end
# Git objects are only poolable when the project is or has:
# - Hashed storage -> The object pool will have a remote to its members, using relative paths.
# If the repository path changes we would have to update the remote.
# - Public -> User will be able to fetch Git objects that might not exist
# in their own repository.
# - Repository -> Else the disk path will be empty, and there's nothing to pool
def git_objects_poolable?
hashed_storage?(:repository) &&
public? &&
repository_exists? &&
Gitlab::CurrentSettings.hashed_storage_enabled &&
Feature.enabled?(:object_pools, self)
end
private
def create_new_pool_repository
pool = begin
create_or_find_pool_repository!(shard: Shard.by_name(repository_storage), source_project: self)
rescue ActiveRecord::RecordNotUnique
retry
end
pool.schedule
pool
end
def join_pool_repository
return unless pool_repository
ObjectPool::JoinWorker.perform_async(pool_repository.id, self.id)
end
def use_hashed_storage
if self.new_record? && Gitlab::CurrentSettings.hashed_storage_enabled
self.storage_version = LATEST_STORAGE_VERSION
......
......@@ -54,6 +54,8 @@ module Projects
new_params[:avatar] = @project.avatar
end
new_params.merge!(@project.object_pool_params)
new_project = CreateService.new(current_user, new_params).execute
return new_project unless new_project.persisted?
......
......@@ -85,6 +85,10 @@
- todos_destroyer:todos_destroyer_project_private
- todos_destroyer:todos_destroyer_private_features
- object_pool:object_pool_create
- object_pool:object_pool_schedule_join
- object_pool:object_pool_join
- default
- mailers # ActionMailer::DeliveryJob.queue_name
......
# frozen_string_literal: true
##
# Concern for setting Sidekiq settings for the various ObjectPool queues
#
module ObjectPoolQueue
extend ActiveSupport::Concern
included do
queue_namespace :object_pool
end
end
......@@ -28,6 +28,8 @@ class GitGarbageCollectWorker
# Refresh the branch cache in case garbage collection caused a ref lookup to fail
flush_ref_caches(project) if task == :gc
project.repository.expire_statistics_caches
# In case pack files are deleted, release libgit2 cache and open file
# descriptors ASAP instead of waiting for Ruby garbage collection
project.cleanup
......
# frozen_string_literal: true
module ObjectPool
class CreateWorker
include ApplicationWorker
include ObjectPoolQueue
include ExclusiveLeaseGuard
attr_reader :pool
def perform(pool_id)
@pool = PoolRepository.find_by_id(pool_id)
return unless pool
try_obtain_lease do
perform_pool_creation
end
end
private
def perform_pool_creation
return unless pool.failed? || pool.scheduled?
# If this is a retry and the previous execution failed, deletion will
# bring the pool back to a pristine state
pool.delete_object_pool if pool.failed?
pool.create_object_pool
pool.mark_ready
rescue => e
pool.mark_failed
raise e
end
def lease_key
"object_pool:create:#{pool.id}"
end
def lease_timeout
1.hour
end
end
end
# frozen_string_literal: true
module ObjectPool
class JoinWorker
include ApplicationWorker
include ObjectPoolQueue
def perform(pool_id, project_id)
pool = PoolRepository.find_by_id(pool_id)
return unless pool&.joinable?
project = Project.find_by_id(project_id)
return unless project
pool.link_repository(project.repository)
Projects::HousekeepingService.new(project).execute
end
end
end
# frozen_string_literal: true
module ObjectPool
class ScheduleJoinWorker
include ApplicationWorker
include ObjectPoolQueue
def perform(pool_id)
pool = PoolRepository.find_by_id(pool_id)
return unless pool&.joinable?
pool.member_projects.find_each do |project|
next if project.forked? && !project.import_finished?
ObjectPool::JoinWorker.perform_async(pool.id, project.id)
end
end
end
end
---
title: Allow public forks to be deduplicated
merge_request: 23508
author:
type: added
......@@ -81,5 +81,6 @@
- [delete_diff_files, 1]
- [detect_repository_languages, 1]
- [auto_devops, 2]
- [object_pool, 1]
- [repository_cleanup, 1]
- [delete_stored_files, 1]
# frozen_string_literal: true
class AddStateToPoolRepository < ActiveRecord::Migration[5.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
# Given the table is empty, and the non concurrent methods are chosen so
# the transactions don't have to be disabled
# rubocop: disable Migration/AddConcurrentForeignKey, Migration/AddIndex
def change
add_column(:pool_repositories, :state, :string, null: true)
add_column :pool_repositories, :source_project_id, :integer
add_index :pool_repositories, :source_project_id, unique: true
add_foreign_key :pool_repositories, :projects, column: :source_project_id, on_delete: :nullify
end
# rubocop: enable Migration/AddConcurrentForeignKey, Migration/AddIndex
end
......@@ -1512,8 +1512,11 @@ ActiveRecord::Schema.define(version: 20181203002526) do
create_table "pool_repositories", id: :bigserial, force: :cascade do |t|
t.integer "shard_id", null: false
t.string "disk_path"
t.string "state"
t.integer "source_project_id"
t.index ["disk_path"], name: "index_pool_repositories_on_disk_path", unique: true, using: :btree
t.index ["shard_id"], name: "index_pool_repositories_on_shard_id", using: :btree
t.index ["source_project_id"], name: "index_pool_repositories_on_source_project_id", unique: true, using: :btree
end
create_table "programming_languages", force: :cascade do |t|
......@@ -2393,6 +2396,7 @@ ActiveRecord::Schema.define(version: 20181203002526) do
add_foreign_key "oauth_openid_requests", "oauth_access_grants", column: "access_grant_id", name: "fk_oauth_openid_requests_oauth_access_grants_access_grant_id"
add_foreign_key "pages_domains", "projects", name: "fk_ea2f6dfc6f", on_delete: :cascade
add_foreign_key "personal_access_tokens", "users"
add_foreign_key "pool_repositories", "projects", column: "source_project_id", on_delete: :nullify
add_foreign_key "pool_repositories", "shards", on_delete: :restrict
add_foreign_key "project_authorizations", "projects", on_delete: :cascade
add_foreign_key "project_authorizations", "users", on_delete: :cascade
......
......@@ -94,6 +94,23 @@ need to be performed on these nodes as well. Database changes will propagate wit
You must make sure the migration event was already processed or otherwise it may migrate
the files back to Hashed state again.
#### Hashed object pools
For deduplication of public forks and their parent repository, objects are pooled
in an object pool. These object pools are a third repository where shared objects
are stored.
```ruby
# object pool paths
"@pools/#{hash[0..1]}/#{hash[2..3]}/#{hash}.git"
```
The object pool feature is behind the `object_pools` feature flag, and can be
enabled for individual projects by executing
`Feature.enable(:object_pools, Project.find(<id>))`. Note that the project has to
be on hashed storage, should not be a fork itself, and hashed storage should be
enabled for all new projects.
##### Attachments
To rollback single Attachment migration, rename `aa/bb/abcdef1234567890...` folder back to `namespace/project`.
......
# frozen_string_literal: true
module Gitlab
module Git
class ObjectPool
# GL_REPOSITORY has to be passed for Gitlab::Git::Repositories, but not
# used for ObjectPools.
GL_REPOSITORY = ""
delegate :exists?, :size, to: :repository
delegate :delete, to: :object_pool_service
attr_reader :storage, :relative_path, :source_repository
def initialize(storage, relative_path, source_repository)
@storage = storage
@relative_path = relative_path
@source_repository = source_repository
end
def create
object_pool_service.create(source_repository)
end
def link(to_link_repo)
remote_name = to_link_repo.object_pool_remote_name
repository.set_config(
"remote.#{remote_name}.url" => relative_path_to(to_link_repo.relative_path),
"remote.#{remote_name}.tagOpt" => "--no-tags",
"remote.#{remote_name}.fetch" => "+refs/*:refs/remotes/#{remote_name}/*"
)
object_pool_service.link_repository(to_link_repo)
end
def gitaly_object_pool
Gitaly::ObjectPool.new(repository: to_gitaly_repository)
end
def to_gitaly_repository
Gitlab::GitalyClient::Util.repository(storage, relative_path, GL_REPOSITORY)
end
# Allows for reusing other RPCs by 'tricking' Gitaly to think its a repository
def repository
@repository ||= Gitlab::Git::Repository.new(storage, relative_path, GL_REPOSITORY)
end
private
def object_pool_service
@object_pool_service ||= Gitlab::GitalyClient::ObjectPoolService.new(self)
end
def relative_path_to(pool_member_path)
pool_path = Pathname.new("#{relative_path}#{File::SEPARATOR}")
Pathname.new(pool_member_path).relative_path_from(pool_path).to_s
end
end
end
end
......@@ -69,6 +69,13 @@ module Gitlab
attr_reader :storage, :gl_repository, :relative_path
# This remote name has to be stable for all types of repositories that
# can join an object pool. If it's structure ever changes, a migration
# has to be performed on the object pools to update the remote names.
# Else the pool can't be updated anymore and is left in an inconsistent
# state.
alias_method :object_pool_remote_name, :gl_repository
# This initializer method is only used on the client side (gitlab-ce).
# Gitaly-ruby uses a different initializer.
def initialize(storage, relative_path, gl_repository)
......
# frozen_string_literal: true
module Gitlab
module GitalyClient
class ObjectPoolService
attr_reader :object_pool, :storage
def initialize(object_pool)
@object_pool = object_pool.gitaly_object_pool
@storage = object_pool.storage
end
def create(repository)
request = Gitaly::CreateObjectPoolRequest.new(
object_pool: object_pool,
origin: repository.gitaly_repository)
GitalyClient.call(storage, :object_pool_service, :create_object_pool, request)
end
def delete
request = Gitaly::DeleteObjectPoolRequest.new(object_pool: object_pool)
GitalyClient.call(storage, :object_pool_service, :delete_object_pool, request)
end
def link_repository(repository)
request = Gitaly::LinkRepositoryToObjectPoolRequest.new(
object_pool: object_pool,
repository: repository.gitaly_repository
)
GitalyClient.call(storage, :object_pool_service, :link_repository_to_object_pool,
request, timeout: GitalyClient.fast_timeout)
end
def unlink_repository(repository)
request = Gitaly::UnlinkRepositoryFromObjectPoolRequest.new(repository: repository.gitaly_repository)
GitalyClient.call(storage, :object_pool_service, :unlink_repository_from_object_pool,
request, timeout: GitalyClient.fast_timeout)
end
end
end
end
......@@ -279,7 +279,7 @@ describe ProjectsController do
expected_query = /#{public_project.fork_network.find_forks_in(other_user.namespace).to_sql}/
expect { get(:show, namespace_id: public_project.namespace, id: public_project) }
.not_to exceed_query_limit(1).for_query(expected_query)
.not_to exceed_query_limit(2).for_query(expected_query)
end
end
end
......
FactoryBot.define do
factory :pool_repository do
shard
shard { Shard.by_name("default") }
state :none
before(:create) do |pool|
pool.source_project = create(:project, :repository)
end
trait :scheduled do
state :scheduled
end
trait :failed do
state :failed
end
trait :ready do
state :ready
after(:create) do |pool|
pool.create_object_pool
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Git::ObjectPool do
let(:pool_repository) { create(:pool_repository) }
let(:source_repository) { pool_repository.source_project.repository }
subject { pool_repository.object_pool }
describe '#storage' do
it "equals the pool repository's shard name" do
expect(subject.storage).not_to be_nil
expect(subject.storage).to eq(pool_repository.shard_name)
end
end
describe '#create' do
before do
subject.create
end
context "when the pool doesn't exist yet" do
it 'creates the pool' do
expect(subject.exists?).to be(true)
end
end
context 'when the pool already exists' do
it 'raises an FailedPrecondition' do
expect do
subject.create
end.to raise_error(GRPC::FailedPrecondition)
end
end
end
describe '#exists?' do
context "when the object pool doesn't exist" do
it 'returns false' do
expect(subject.exists?).to be(false)
end
end
context 'when the object pool exists' do
let(:pool) { create(:pool_repository, :ready) }
subject { pool.object_pool }
it 'returns true' do
expect(subject.exists?).to be(true)
end
end
end
describe '#link' do
let!(:pool_repository) { create(:pool_repository, :ready) }
context 'when no remotes are set' do
it 'sets a remote' do
subject.link(source_repository)
repo = Gitlab::GitalyClient::StorageSettings.allow_disk_access do
Rugged::Repository.new(subject.repository.path)
end
expect(repo.remotes.count).to be(1)
expect(repo.remotes.first.name).to eq(source_repository.object_pool_remote_name)
end
end
context 'when the remote is already set' do
before do
subject.link(source_repository)
end
it "doesn't raise an error" do
subject.link(source_repository)
repo = Gitlab::GitalyClient::StorageSettings.allow_disk_access do
Rugged::Repository.new(subject.repository.path)
end
expect(repo.remotes.count).to be(1)
expect(repo.remotes.first.name).to eq(source_repository.object_pool_remote_name)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::GitalyClient::ObjectPoolService do
let(:pool_repository) { create(:pool_repository) }
let(:project) { create(:project, :repository) }
let(:raw_repository) { project.repository.raw }
let(:object_pool) { pool_repository.object_pool }
subject { described_class.new(object_pool) }
before do
subject.create(raw_repository)
end
describe '#create' do
it 'exists on disk' do
expect(object_pool.repository.exists?).to be(true)
end
context 'when the pool already exists' do
it 'returns an error' do
expect do
subject.create(raw_repository)
end.to raise_error(GRPC::FailedPrecondition)
end
end
end
describe '#delete' do
it 'removes the repository from disk' do
subject.delete
expect(object_pool.repository.exists?).to be(false)
end
context 'when called twice' do
it "doesn't raise an error" do
subject.delete
expect { object_pool.delete }.not_to raise_error
end
end
end
end
......@@ -5,6 +5,7 @@ require 'spec_helper'
describe PoolRepository do
describe 'associations' do
it { is_expected.to belong_to(:shard) }
it { is_expected.to have_one(:source_project) }
it { is_expected.to have_many(:member_projects) }
end
......@@ -12,15 +13,14 @@ describe PoolRepository do
let!(:pool_repository) { create(:pool_repository) }
it { is_expected.to validate_presence_of(:shard) }
it { is_expected.to validate_presence_of(:source_project) }
end
describe '#disk_path' do
it 'sets the hashed disk_path' do
pool = create(:pool_repository)
elements = File.split(pool.disk_path)
expect(elements).to all( match(/\d{2,}/) )
expect(pool.disk_path).to match(%r{\A@pools/\h{2}/\h{2}/\h{64}})
end
end
end
......@@ -4092,6 +4092,44 @@ describe Project do
end
end
describe '#git_objects_poolable?' do
subject { project }
context 'when the feature flag is turned off' do
before do
stub_feature_flags(object_pools: false)
end
let(:project) { create(:project, :repository, :public) }
it { is_expected.not_to be_git_objects_poolable }
end
context 'when the feature flag is enabled' do
context 'when not using hashed storage' do
let(:project) { create(:project, :legacy_storage, :public, :repository) }
it { is_expected.not_to be_git_objects_poolable }
end
context 'when the project is not public' do
let(:project) { create(:project, :private) }
it { is_expected.not_to be_git_objects_poolable }
end
context 'when objects are poolable' do
let(:project) { create(:project, :repository, :public) }
before do
stub_application_setting(hashed_storage_enabled: true)
end
it { is_expected.to be_git_objects_poolable }
end
end
end
def rugged_config
rugged_repo(project.repository).config
end
......
......@@ -2,7 +2,8 @@ require 'spec_helper'
describe Projects::ForkService do
include ProjectForksHelper
let(:gitlab_shell) { Gitlab::Shell.new }
include Gitlab::ShellAdapter
context 'when forking a new project' do
describe 'fork by user' do
before do
......@@ -235,6 +236,33 @@ describe Projects::ForkService do
end
end
context 'when forking with object pools' do
let(:fork_from_project) { create(:project, :public) }
let(:forker) { create(:user) }
before do
stub_feature_flags(object_pools: true)
end
context 'when no pool exists' do
it 'creates a new object pool' do
forked_project = fork_project(fork_from_project, forker)
expect(forked_project.pool_repository).to eq(fork_from_project.pool_repository)
end
end
context 'when a pool already exists' do
let!(:pool_repository) { create(:pool_repository, source_project: fork_from_project) }
it 'joins the object pool' do
forked_project = fork_project(fork_from_project, forker)
expect(forked_project.pool_repository).to eq(fork_from_project.pool_repository)
end
end
end
context 'when linking fork to an existing project' do
let(:fork_from_project) { create(:project, :public) }
let(:fork_to_project) { create(:project, :public) }
......
# frozen_string_literal: true
require 'spec_helper'
describe ObjectPool::CreateWorker do
let(:pool) { create(:pool_repository, :scheduled) }
subject { described_class.new }
describe '#perform' do
context 'when the pool creation is successful' do
it 'marks the pool as ready' do
subject.perform(pool.id)
expect(pool.reload).to be_ready
end
end
context 'when a the pool already exists' do
before do
pool.create_object_pool
end
it 'cleans up the pool' do
expect do
subject.perform(pool.id)
end.to raise_error(GRPC::FailedPrecondition)
expect(pool.reload.failed?).to be(true)
end
end
context 'when the server raises an unknown error' do
before do
allow_any_instance_of(PoolRepository).to receive(:create_object_pool).and_raise(GRPC::Internal)
end
it 'marks the pool as failed' do
expect do
subject.perform(pool.id)
end.to raise_error(GRPC::Internal)
expect(pool.reload.failed?).to be(true)
end
end
context 'when the pool creation failed before' do
let(:pool) { create(:pool_repository, :failed) }
it 'deletes the pool first' do
expect_any_instance_of(PoolRepository).to receive(:delete_object_pool)
subject.perform(pool.id)
expect(pool.reload).to be_ready
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe ObjectPool::JoinWorker do
let(:pool) { create(:pool_repository, :ready) }
let(:project) { pool.source_project }
let(:repository) { project.repository }
subject { described_class.new }
describe '#perform' do
context "when the pool is not joinable" do
let(:pool) { create(:pool_repository, :scheduled) }
it "doesn't raise an error" do
expect do
subject.perform(pool.id, project.id)
end.not_to raise_error
end
end
context 'when the pool has been joined before' do
before do
pool.link_repository(repository)
end
it 'succeeds in joining' do
expect do
subject.perform(pool.id, project.id)
end.not_to raise_error
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment