Commit 40280490 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre Committed by Nick Thomas

Geo: Ensure that LFS object deletions are communicated to the secondary

parent 37526005
......@@ -26,6 +26,10 @@ module Geo
class_name: 'Geo::HashedStorageMigratedEvent',
foreign_key: :hashed_storage_migrated_event_id
belongs_to :lfs_object_deleted_event,
class_name: 'Geo::LfsObjectDeletedEvent',
foreign_key: :lfs_object_deleted_event_id
def self.latest_event
order(id: :desc).first
end
......@@ -36,7 +40,8 @@ module Geo
repository_deleted_event ||
repository_renamed_event ||
repositories_changed_event ||
hashed_storage_migrated_event
hashed_storage_migrated_event ||
lfs_object_deleted_event
end
def project_id
......
module Geo
class LfsObjectDeletedEvent < ActiveRecord::Base
include Geo::Model
belongs_to :lfs_object
validates :lfs_object, :oid, :file_path, presence: true
end
end
class LfsObject < ActiveRecord::Base
prepend EE::LfsObject
has_many :lfs_objects_projects, dependent: :destroy # rubocop:disable Cop/ActiveRecordDependent
has_many :projects, through: :lfs_objects_projects
......
......@@ -34,9 +34,12 @@ module Geo
return unless Gitlab::Geo.primary?
return unless Gitlab::Geo.secondary_nodes.any? # no need to create an event if no one is listening
Geo::EventLog.create!("#{self.class.event_type}" => build_event)
event = build_event
event.validate!
Geo::EventLog.create!("#{self.class.event_type}" => event)
rescue ActiveRecord::RecordInvalid, NoMethodError => e
log_error("#{self.event_type.to_s.humanize} could not be created", e)
log_error("#{self.class.event_type.to_s.humanize} could not be created", e)
end
private
......
module Geo
class LfsObjectDeletedEventStore < EventStore
self.event_type = :lfs_object_deleted_event
attr_reader :lfs_object
def initialize(lfs_object)
@lfs_object = lfs_object
end
def create
return unless lfs_object.local_store?
super
end
private
def build_event
Geo::LfsObjectDeletedEvent.new(
lfs_object: lfs_object,
oid: lfs_object.oid,
file_path: relative_file_path
)
end
def local_store_path
Pathname.new(LfsObjectUploader.local_store_path)
end
def relative_file_path
return unless lfs_object.file.present?
Pathname.new(lfs_object.file.path).relative_path_from(local_store_path)
end
# This is called by ProjectLogHelpers to build json log with context info
#
# @see ::Gitlab::Geo::ProjectLogHelpers
def base_log_data(message)
{
class: self.class.name,
lfs_object_id: lfs_object.id,
file_path: lfs_object.file.path,
message: message
}
end
end
end
---
title: Geo - Ensure that LFS object deletions are communicated to the secondary
merge_request:
author:
type: fixed
class CreateGeoLfsObjectDeletedEvents < ActiveRecord::Migration
DOWNTIME = false
def change
create_table :geo_lfs_object_deleted_events, id: :bigserial do |t|
# If a LFS object is deleted, we need to retain this entry
t.references :lfs_object, index: true, foreign_key: false, null: false
t.string :oid, null: false
t.string :file_path, null: false
end
add_column :geo_event_log, :lfs_object_deleted_event_id, :integer, limit: 8
end
end
class AddGeoLfsObjectDeletedEventsForeignKey < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
add_concurrent_foreign_key :geo_event_log, :geo_lfs_object_deleted_events,
column: :lfs_object_deleted_event_id, on_delete: :cascade
end
def down
remove_foreign_key :geo_event_log, column: :lfs_object_deleted_event_id
end
end
......@@ -11,7 +11,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20171114162227) do
ActiveRecord::Schema.define(version: 20171120145444) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
......@@ -879,6 +879,7 @@ ActiveRecord::Schema.define(version: 20171114162227) do
t.integer "repositories_changed_event_id", limit: 8
t.integer "repository_created_event_id", limit: 8
t.integer "hashed_storage_migrated_event_id", limit: 8
t.integer "lfs_object_deleted_event_id", limit: 8
end
add_index "geo_event_log", ["repositories_changed_event_id"], name: "index_geo_event_log_on_repositories_changed_event_id", using: :btree
......@@ -901,6 +902,14 @@ ActiveRecord::Schema.define(version: 20171114162227) do
add_index "geo_hashed_storage_migrated_events", ["project_id"], name: "index_geo_hashed_storage_migrated_events_on_project_id", using: :btree
create_table "geo_lfs_object_deleted_events", id: :bigserial, force: :cascade do |t|
t.integer "lfs_object_id", null: false
t.string "oid", null: false
t.string "file_path", null: false
end
add_index "geo_lfs_object_deleted_events", ["lfs_object_id"], name: "index_geo_lfs_object_deleted_events_on_lfs_object_id", using: :btree
create_table "geo_node_namespace_links", force: :cascade do |t|
t.integer "geo_node_id", null: false
t.integer "namespace_id", null: false
......@@ -2433,6 +2442,7 @@ ActiveRecord::Schema.define(version: 20171114162227) do
add_foreign_key "gcp_clusters", "services", on_delete: :nullify
add_foreign_key "gcp_clusters", "users", on_delete: :nullify
add_foreign_key "geo_event_log", "geo_hashed_storage_migrated_events", column: "hashed_storage_migrated_event_id", name: "fk_27548c6db3", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_lfs_object_deleted_events", column: "lfs_object_deleted_event_id", name: "fk_d5af95fcd9", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_repositories_changed_events", column: "repositories_changed_event_id", name: "fk_4a99ebfd60", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_repository_created_events", column: "repository_created_event_id", name: "fk_9b9afb1916", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_repository_deleted_events", column: "repository_deleted_event_id", name: "fk_c4b1c1f66e", on_delete: :cascade
......
module EE
# LFS Object EE mixin
#
# This module is intended to encapsulate EE-specific model logic
# and be prepended in the `LfsObject` model
module LfsObject
extend ActiveSupport::Concern
prepended do
after_destroy :log_geo_event
end
def local_store?
[nil, LfsObjectUploader::LOCAL_STORE].include?(self.file_store)
end
private
def log_geo_event
::Geo::LfsObjectDeletedEventStore.new(self).create
end
end
end
......@@ -41,18 +41,14 @@ module Gitlab
batch.each do |event_log|
next unless can_replay?(event_log)
if event_log.repository_updated_event
handle_repository_updated(event_log)
elsif event_log.repository_created_event
handle_repository_created(event_log)
elsif event_log.repository_deleted_event
handle_repository_deleted(event_log)
elsif event_log.repositories_changed_event
handle_repositories_changed(event_log.repositories_changed_event)
elsif event_log.repository_renamed_event
handle_repository_renamed(event_log)
elsif event_log.hashed_storage_migrated_event
handle_hashed_storage_migrated(event_log)
begin
event = event_log.event
handler = "handle_#{event.class.name.demodulize.underscore}"
__send__(handler, event, event_log.created_at) # rubocop:disable GitlabSecurity/PublicSend
rescue NoMethodError => e
logger.error(e.message)
raise e
end
end
end
......@@ -85,12 +81,11 @@ module Gitlab
Gitlab::Geo.current_node&.projects_include?(event_log.project_id)
end
def handle_repository_created(event_log)
event = event_log.repository_created_event
def handle_repository_created_event(event, created_at)
registry = find_or_initialize_registry(event.project_id, resync_repository: true, resync_wiki: event.wiki_path.present?)
logger.event_info(
event_log.created_at,
created_at,
message: 'Repository created',
project_id: event.project_id,
repo_path: event.repo_path,
......@@ -103,12 +98,11 @@ module Gitlab
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
end
def handle_repository_updated(event_log)
event = event_log.repository_updated_event
def handle_repository_updated_event(event, created_at)
registry = find_or_initialize_registry(event.project_id, "resync_#{event.source}" => true)
logger.event_info(
event_log.created_at,
created_at,
message: 'Repository update',
project_id: event.project_id,
source: event.source,
......@@ -120,15 +114,13 @@ module Gitlab
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
end
def handle_repository_deleted(event_log)
event = event_log.repository_deleted_event
def handle_repository_deleted_event(event, created_at)
job_id = ::Geo::RepositoryDestroyService
.new(event.project_id, event.deleted_project_name, event.deleted_path, event.repository_storage_name)
.async_execute
logger.event_info(
event_log.created_at,
created_at,
message: 'Deleted project',
project_id: event.project_id,
repository_storage_name: event.repository_storage_name,
......@@ -139,7 +131,7 @@ module Gitlab
::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all
end
def handle_repositories_changed(event)
def handle_repositories_changed_event(event, created_at)
return unless Gitlab::Geo.current_node.id == event.geo_node_id
job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, event.geo_node_id)
......@@ -151,8 +143,7 @@ module Gitlab
end
end
def handle_repository_renamed(event_log)
event = event_log.repository_renamed_event
def handle_repository_renamed_event(event, created_at)
return unless event.project_id
old_path = event.old_path_with_namespace
......@@ -163,7 +154,7 @@ module Gitlab
.async_execute
logger.event_info(
event_log.created_at,
created_at,
message: 'Renaming project',
project_id: event.project_id,
old_path: old_path,
......@@ -171,8 +162,7 @@ module Gitlab
job_id: job_id)
end
def handle_hashed_storage_migrated(event_log)
event = event_log.hashed_storage_migrated_event
def handle_hashed_storage_migrated_event(event, created_at)
return unless event.project_id
job_id = ::Geo::HashedStorageMigrationService.new(
......@@ -183,7 +173,7 @@ module Gitlab
).async_execute
logger.event_info(
event_log.created_at,
created_at,
message: 'Migrating project to hashed storage',
project_id: event.project_id,
old_storage_version: event.old_storage_version,
......@@ -193,6 +183,22 @@ module Gitlab
job_id: job_id)
end
def handle_lfs_object_deleted_event(event, created_at)
file_path = File.join(LfsObjectUploader.local_store_path, event.file_path)
job_id = ::Geo::FileRemovalWorker.perform_async(file_path)
logger.event_info(
created_at,
message: 'Deleted LFS object',
oid: event.oid,
file_id: event.lfs_object_id,
file_path: file_path,
job_id: job_id)
::Geo::FileRegistry.lfs_objects.where(file_id: event.lfs_object_id).delete_all
end
def find_or_initialize_registry(project_id, attrs)
registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
registry.assign_attributes(attrs)
......
require 'spec_helper'
describe LfsObject do
describe '#local_store?' do
it 'returns true when file_store is nil' do
subject.file_store = nil
expect(subject.local_store?).to eq true
end
it 'returns true when file_store is equal to LfsObjectUploader::LOCAL_STORE' do
subject.file_store = LfsObjectUploader::LOCAL_STORE
expect(subject.local_store?).to eq true
end
it 'returns false whe file_store is equal to LfsObjectUploader::REMOTE_STORE' do
subject.file_store = LfsObjectUploader::REMOTE_STORE
expect(subject.local_store?).to eq false
end
end
describe '#destroy' do
subject { create(:lfs_object, :with_file) }
context 'when running in a Geo primary node' do
set(:primary) { create(:geo_node, :primary) }
set(:secondary) { create(:geo_node) }
it 'logs an event to the Geo event log' do
expect { subject.destroy }.to change(Geo::LfsObjectDeletedEvent, :count).by(1)
end
end
end
end
require 'spec_helper'
describe RemoveUnreferencedLfsObjectsWorker do
describe '#perform' do
context 'when running in a Geo primary node' do
set(:primary) { create(:geo_node, :primary) }
set(:secondary) { create(:geo_node) }
it 'logs an event to the Geo event log for every unreferenced LFS objects' do
unreferenced_lfs_object_1 = create(:lfs_object, :with_file)
unreferenced_lfs_object_2 = create(:lfs_object, :with_file)
referenced_lfs_object = create(:lfs_object)
create(:lfs_objects_project, lfs_object: referenced_lfs_object)
expect { subject.perform }.to change(Geo::LfsObjectDeletedEvent, :count).by(2)
expect(Geo::LfsObjectDeletedEvent.where(lfs_object: unreferenced_lfs_object_1.id)).to exist
expect(Geo::LfsObjectDeletedEvent.where(lfs_object: unreferenced_lfs_object_2.id)).to exist
expect(Geo::LfsObjectDeletedEvent.where(lfs_object: referenced_lfs_object.id)).not_to exist
end
end
end
end
......@@ -19,6 +19,10 @@ FactoryGirl.define do
trait :hashed_storage_migration_event do
hashed_storage_migrated_event factory: :geo_hashed_storage_migrated_event
end
trait :lfs_object_deleted_event do
lfs_object_deleted_event factory: :geo_lfs_object_deleted_event
end
end
factory :geo_repository_created_event, class: Geo::RepositoryCreatedEvent do
......@@ -76,4 +80,13 @@ FactoryGirl.define do
new_wiki_disk_path { project.wiki.path_with_namespace + '_new' }
new_storage_version { Project::LATEST_STORAGE_VERSION }
end
factory :geo_lfs_object_deleted_event, class: Geo::LfsObjectDeletedEvent do
lfs_object { create(:lfs_object, :with_file) }
after(:build, :stub) do |event, _|
event.oid = event.lfs_object.oid
event.file_path = event.lfs_object.file.path
end
end
end
......@@ -252,5 +252,32 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
daemon.run_once!
end
end
context 'when replaying a LFS object deleted event' do
let(:event_log) { create(:geo_event_log, :lfs_object_deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:lfs_object_deleted_event) { event_log.lfs_object_deleted_event }
let(:lfs_object) { lfs_object_deleted_event.lfs_object }
it 'does not create a tracking database entry' do
expect { daemon.run_once! }.not_to change(Geo::FileRegistry, :count)
end
it 'schedules a Geo::FileRemovalWorker' do
file_path = File.join(LfsObjectUploader.local_store_path,
lfs_object_deleted_event.file_path)
expect(::Geo::FileRemovalWorker).to receive(:perform_async)
.with(file_path)
daemon.run_once!
end
it 'removes the tracking database entry if exist' do
create(:geo_file_registry, :lfs, file_id: lfs_object.id)
expect { daemon.run_once! }.to change(Geo::FileRegistry.lfs_objects, :count).by(-1)
end
end
end
end
......@@ -56,6 +56,13 @@ RSpec.describe Geo::EventLog, type: :model do
expect(subject.event).to eq hashed_storage_migrated_event
end
it 'returns lfs_object_deleted_event when set' do
lfs_object_deleted_event = build(:geo_lfs_object_deleted_event)
subject.lfs_object_deleted_event = lfs_object_deleted_event
expect(subject.event).to eq lfs_object_deleted_event
end
end
describe '#project_id' do
......
require 'spec_helper'
RSpec.describe Geo::LfsObjectDeletedEvent, type: :model do
describe 'relationships' do
it { is_expected.to belong_to(:lfs_object) }
end
describe 'validations' do
it { is_expected.to validate_presence_of(:lfs_object) }
it { is_expected.to validate_presence_of(:oid) }
it { is_expected.to validate_presence_of(:file_path) }
end
end
require 'spec_helper'
describe Geo::LfsObjectDeletedEventStore do
set(:secondary_node) { create(:geo_node) }
let(:lfs_object) { create(:lfs_object, :with_file, oid: 'b68143e6463773b1b6c6fd009a76c32aeec041faff32ba2ed42fd7f708a00004') }
subject(:event_store) { described_class.new(lfs_object) }
describe '#create' do
it 'does not create an event when not running on a primary node' do
allow(Gitlab::Geo).to receive(:primary?) { false }
expect { event_store.create }.not_to change(Geo::LfsObjectDeletedEvent, :count)
end
context 'when running on a primary node' do
before do
allow(Gitlab::Geo).to receive(:primary?) { true }
end
it 'does not create an event when LFS object is not on a local store' do
allow(lfs_object).to receive(:local_store?).and_return(false)
expect { event_store.create }.not_to change(Geo::LfsObjectDeletedEvent, :count)
end
it 'does not create an event when there are no secondary nodes' do
allow(Gitlab::Geo).to receive(:secondary_nodes) { [] }
expect { event_store.create }.not_to change(Geo::LfsObjectDeletedEvent, :count)
end
it 'creates a LFS object deleted event' do
expect { event_store.create }.to change(Geo::LfsObjectDeletedEvent, :count).by(1)
end
it 'tracks LFS object attributes' do
event_store.create
event = Geo::LfsObjectDeletedEvent.last
expect(event).to have_attributes(
lfs_object_id: lfs_object.id,
oid: lfs_object.oid,
file_path: 'b6/81/43e6463773b1b6c6fd009a76c32aeec041faff32ba2ed42fd7f708a00004'
)
end
it 'logs an error message when event creation fail' do
invalid_lfs_object = create(:lfs_object)
event_store = described_class.new(invalid_lfs_object)
expected_message = {
class: "Geo::LfsObjectDeletedEventStore",
lfs_object_id: invalid_lfs_object.id,
file_path: nil,
message: "Lfs object deleted event could not be created",
error: "Validation failed: File path can't be blank"
}
expect(Gitlab::Geo::Logger).to receive(:error)
.with(expected_message).and_call_original
event_store.create
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment