Commit 8e24a20c authored by Michael Kozono's avatar Michael Kozono

Merge branch '329589-geo-secondaries-may-be-orphaning-upload-files' into 'master'

Geo: Secondaries may be orphaning Upload files

See merge request gitlab-org/gitlab!69763
parents b574ddd8 d4e54efc
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
class Upload < ApplicationRecord class Upload < ApplicationRecord
include Checksummable include Checksummable
# Upper limit for foreground checksum processing # Upper limit for foreground checksum processing
CHECKSUM_THRESHOLD = 100.megabytes CHECKSUM_THRESHOLD = 100.megabytes
...@@ -51,9 +52,9 @@ class Upload < ApplicationRecord ...@@ -51,9 +52,9 @@ class Upload < ApplicationRecord
## ##
# FastDestroyAll concerns # FastDestroyAll concerns
def finalize_fast_destroy(keys) def finalize_fast_destroy(items_to_remove)
keys.each do |store_class, paths| items_to_remove.each do |store_class, keys|
store_class.new.delete_keys_async(paths) store_class.new.delete_keys_async(keys)
end end
end end
end end
...@@ -65,6 +66,10 @@ class Upload < ApplicationRecord ...@@ -65,6 +66,10 @@ class Upload < ApplicationRecord
uploader_class.absolute_path(self) uploader_class.absolute_path(self)
end end
def relative_path
uploader_class.relative_path(self)
end
def calculate_checksum! def calculate_checksum!
self.checksum = nil self.checksum = nil
return unless needs_checksum? return unless needs_checksum?
......
...@@ -55,3 +55,5 @@ module Uploads ...@@ -55,3 +55,5 @@ module Uploads
end end
end end
end end
Uploads::Local.prepend_mod
...@@ -32,7 +32,14 @@ class FileUploader < GitlabUploader ...@@ -32,7 +32,14 @@ class FileUploader < GitlabUploader
def self.absolute_path(upload) def self.absolute_path(upload)
File.join( File.join(
absolute_base_dir(upload.model), root,
relative_path(upload)
)
end
def self.relative_path(upload)
File.join(
base_dir(upload.model),
upload.path # already contain the dynamic_segment, see #upload_path upload.path # already contain the dynamic_segment, see #upload_path
) )
end end
......
...@@ -98,16 +98,20 @@ module Geo ...@@ -98,16 +98,20 @@ module Geo
carrierwave_uploader.file.exists? carrierwave_uploader.file.exists?
end end
def deleted_params
{
model_record_id: model_record.id,
uploader_class: carrierwave_uploader.class.to_s,
blob_path: carrierwave_uploader.relative_path
}
end
private private
def download def download
::Geo::BlobDownloadService.new(replicator: self).execute ::Geo::BlobDownloadService.new(replicator: self).execute
end end
def deleted_params
{ model_record_id: model_record.id, uploader_class: carrierwave_uploader.class.to_s, blob_path: carrierwave_uploader.relative_path }
end
# Return whether it's capable of generating a checksum of itself # Return whether it's capable of generating a checksum of itself
# #
# @return [Boolean] whether it can generate a checksum # @return [Boolean] whether it can generate a checksum
......
# frozen_string_literal: true
module EE
module Uploads
module Local
extend ::Gitlab::Utils::Override
override :keys
def keys(relation)
return super unless ::Geo::EventStore.can_create_event?
relation.includes(:model).find_each.map do |record|
record.replicator.deleted_params.merge(absolute_path: record.absolute_path)
end
end
override :delete_keys_async
def delete_keys_async(keys_to_delete)
return super unless ::Geo::EventStore.can_create_event?
keys_to_delete.each_slice(::Uploads::Base::BATCH_SIZE) do |batch|
::DeleteStoredFilesWorker.perform_async(self.class, batch.pluck(:absolute_path))
::Geo::UploadReplicator.bulk_create_delete_events_async(batch)
end
end
end
end
end
...@@ -71,6 +71,7 @@ module Geo ...@@ -71,6 +71,7 @@ module Geo
class_name: 'Geo::Event', class_name: 'Geo::Event',
foreign_key: :geo_event_id, foreign_key: :geo_event_id,
inverse_of: :geo_event_log inverse_of: :geo_event_log
def self.latest_event def self.latest_event
order(id: :desc).first order(id: :desc).first
end end
......
...@@ -9,6 +9,31 @@ module Geo ...@@ -9,6 +9,31 @@ module Geo
::Upload ::Upload
end end
def self.bulk_create_delete_events_async(deleted_uploads)
return if deleted_uploads.empty?
deleted_upload_details = []
events = deleted_uploads.map do |upload|
deleted_upload_details << [upload[:model_record_id], upload[:blob_path]]
{
replicable_name: 'upload',
event_name: 'deleted',
payload: {
model_record_id: upload[:model_record_id],
blob_path: upload[:blob_path],
uploader_class: upload[:uploader_class]
},
created_at: Time.current
}
end
log_info('Delete bulk of uploads: ', uploads: deleted_upload_details)
::Geo::BatchEventCreateWorker.perform_async(events)
end
def carrierwave_uploader def carrierwave_uploader
model_record.retrieve_uploader model_record.retrieve_uploader
end end
......
...@@ -462,6 +462,15 @@ ...@@ -462,6 +462,15 @@
:weight: 2 :weight: 2
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: geo:geo_batch_event_create
:worker_name: Geo::BatchEventCreateWorker
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: geo:geo_batch_project_registry - :name: geo:geo_batch_project_registry
:worker_name: Geo::Batch::ProjectRegistryWorker :worker_name: Geo::Batch::ProjectRegistryWorker
:feature_category: :geo_replication :feature_category: :geo_replication
......
# frozen_string_literal: true
module Geo
class BatchEventCreateWorker
include ApplicationWorker
data_consistency :always
include GeoQueue
include ::Gitlab::Geo::LogHelpers
idempotent!
def perform(events)
log_info('Executing Geo::BatchEventCreateWorker', events_count: events.size)
::Gitlab::Geo::Replicator.bulk_create_events(events)
end
end
end
...@@ -193,6 +193,20 @@ module Gitlab ...@@ -193,6 +193,20 @@ module Gitlab
false false
end end
def self.bulk_create_events(events)
::Geo::EventLog.transaction do
results = ::Geo::Event.insert_all!(events)
break if results.rows.empty?
ids = results.map { |result| { geo_event_id: result['id'], created_at: Time.current } }
::Geo::EventLog.insert_all!(ids)
end
rescue ActiveRecord::RecordInvalid, NoMethodError => e
log_error('Geo::EventLog could not be created in bulk', e)
end
# @param [ActiveRecord::Base] model_record # @param [ActiveRecord::Base] model_record
# @param [Integer] model_record_id # @param [Integer] model_record_id
def initialize(model_record: nil, model_record_id: nil) def initialize(model_record: nil, model_record_id: nil)
...@@ -222,7 +236,6 @@ module Gitlab ...@@ -222,7 +236,6 @@ module Gitlab
raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name) raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name)
create_event_with( create_event_with(
class_name: ::Geo::Event,
replicable_name: self.class.replicable_name, replicable_name: self.class.replicable_name,
event_name: event_name, event_name: event_name,
payload: event_data payload: event_data
...@@ -310,16 +323,15 @@ module Gitlab ...@@ -310,16 +323,15 @@ module Gitlab
# Store an event on the database # Store an event on the database
# #
# @example Create an event # @example Create an event
# create_event_with(class_name: Geo::CacheInvalidationEvent, key: key) # create_event_with(key: key)
# #
# @param [Class] class_name a ActiveRecord class that's used to store an event for Geo
# @param [Hash] **params context information that will be stored in the event table # @param [Hash] **params context information that will be stored in the event table
# @return [ApplicationRecord] event instance that was just persisted # @return [ApplicationRecord] event instance that was just persisted
def create_event_with(class_name:, **params) def create_event_with(**params)
return unless Gitlab::Geo.primary? return unless Gitlab::Geo.primary?
return unless Gitlab::Geo.secondary_nodes.any? return unless Gitlab::Geo.secondary_nodes.any?
event = class_name.create!(**params) event = ::Geo::Event.create!(**params)
# Only works with the new geo_events at the moment because we need to # Only works with the new geo_events at the moment because we need to
# know which foreign key to use # know which foreign key to use
...@@ -327,7 +339,7 @@ module Gitlab ...@@ -327,7 +339,7 @@ module Gitlab
event event
rescue ActiveRecord::RecordInvalid, NoMethodError => e rescue ActiveRecord::RecordInvalid, NoMethodError => e
log_error("#{class_name} could not be created", e, params) log_error("::Geo::Event could not be created", e, params)
end end
def current_node def current_node
......
...@@ -133,6 +133,27 @@ RSpec.describe Gitlab::Geo::Replicator do ...@@ -133,6 +133,27 @@ RSpec.describe Gitlab::Geo::Replicator do
end end
end end
describe '.bulk_create_events' do
let(:event) do
{
replicable_name: 'upload',
event_name: 'created',
payload: {
data: "some payload"
},
created_at: Time.current
}
end
let(:events) { [event] }
it 'creates events' do
expect { Gitlab::Geo::Replicator.bulk_create_events(events) }.to change { ::Geo::EventLog.count }.from(0).to(1)
expect(::Geo::EventLog.last.event).to be_present
end
end
describe '#initialize' do describe '#initialize' do
subject(:replicator) { Geo::DummyReplicator.new(**args) } subject(:replicator) { Geo::DummyReplicator.new(**args) }
......
...@@ -87,7 +87,7 @@ RSpec.describe Upload do ...@@ -87,7 +87,7 @@ RSpec.describe Upload do
end end
describe '#destroy' do describe '#destroy' do
subject { create(:upload, checksum: '8710d2c16809c79fee211a9693b64038a8aae99561bc86ce98a9b46b45677fe4') } subject { create(:upload, :namespace_upload, checksum: '8710d2c16809c79fee211a9693b64038a8aae99561bc86ce98a9b46b45677fe4') }
context 'when running in a Geo primary node' do context 'when running in a Geo primary node' do
let_it_be(:primary) { create(:geo_node, :primary) } let_it_be(:primary) { create(:geo_node, :primary) }
...@@ -98,6 +98,18 @@ RSpec.describe Upload do ...@@ -98,6 +98,18 @@ RSpec.describe Upload do
expect { subject.destroy }.to change(Geo::UploadDeletedEvent, :count).by(1) expect { subject.destroy }.to change(Geo::UploadDeletedEvent, :count).by(1)
end end
it 'logs an event to the Geo event log when bulk removal is used', :sidekiq_inline do
stub_current_geo_node(primary)
expect { subject.model.destroy }.to change(Geo::Event.where(replicable_name: :upload, event_name: :deleted), :count).by(1)
payload = Geo::Event.where(replicable_name: :upload, event_name: :deleted).last.payload
expect(payload['model_record_id']).to eq(subject.id)
expect(payload['blob_path']).to eq(subject.relative_path)
expect(payload['uploader_class']).to eq('NamespaceFileUploader')
end
end end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Uploads::Local, :geo do
include ::EE::GeoHelpers
let(:data_store) { described_class.new }
before do
stub_uploads_object_storage(FileUploader)
end
context 'on a primary when secondary nodes exist' do
let(:project) { create(:project) }
let(:relation) { project.uploads }
before do
allow(::Geo::EventStore).to receive(:can_create_event?).and_return(true)
end
describe '#keys' do
let(:upload) { create(:upload, uploader: FileUploader, model: project) }
let!(:uploads) { [upload] }
it 'returns keys' do
keys = data_store.keys(relation)
expected_hash = {
absolute_path: upload.absolute_path,
blob_path: upload.retrieve_uploader.relative_path,
model_record_id: upload.id,
uploader_class: "FileUploader"
}
expect(keys.size).to eq 1
expect(keys.first).to include(expected_hash)
end
end
describe '#delete_keys_async' do
it 'performs calls to DeleteStoredFilesWorker and Geo::UploadReplicator.bulk_create_delete_events_async' do
keys_to_delete = [{
absolute_path: 'absolute_path',
blob_path: 'relative_path',
model_record_id: 1,
uploader_class: "FileUploader"
}]
expect(::DeleteStoredFilesWorker).to receive(:perform_async).with(Uploads::Local, ['absolute_path'])
expect(::Geo::UploadReplicator).to receive(:bulk_create_delete_events_async).with(keys_to_delete)
data_store.delete_keys_async(keys_to_delete)
end
end
end
end
...@@ -6,4 +6,30 @@ RSpec.describe Geo::UploadReplicator do ...@@ -6,4 +6,30 @@ RSpec.describe Geo::UploadReplicator do
let(:model_record) { create(:upload, :with_file) } let(:model_record) { create(:upload, :with_file) }
include_examples 'a blob replicator' include_examples 'a blob replicator'
describe '.bulk_create_delete_events_async' do
let(:deleted_upload) do
{
model_record_id: 1,
blob_path: 'path',
uploader_class: 'UploaderClass'
}
end
let(:deleted_uploads) { [deleted_upload] }
it 'calls Geo::BatchEventCreateWorker and passes events array', :sidekiq_inline do
expect { described_class.bulk_create_delete_events_async(deleted_uploads) }.to change { ::Geo::Event.count }.from(0).to(1)
created_event = ::Geo::Event.last
expect(created_event.replicable_name).to eq 'upload'
expect(created_event.event_name).to eq 'deleted'
expect(created_event.created_at).to be_present
expect(created_event.payload).to eq(deleted_upload.stringify_keys)
end
it 'returns nil when empty array is passed' do
expect(described_class.bulk_create_delete_events_async([])).to be_nil
end
end
end end
...@@ -51,7 +51,7 @@ module EE ...@@ -51,7 +51,7 @@ module EE
def stub_dummy_replicator_class def stub_dummy_replicator_class
stub_const('Geo::DummyReplicator', Class.new(::Gitlab::Geo::Replicator)) stub_const('Geo::DummyReplicator', Class.new(::Gitlab::Geo::Replicator))
Geo::DummyReplicator.class_eval do ::Geo::DummyReplicator.class_eval do
event :test event :test
event :another_test event :another_test
......
# frozen_string_literal: true
require "spec_helper"
RSpec.describe Geo::BatchEventCreateWorker, :geo do
describe "#perform" do
it "calls Gitlab::Geo::Replicator.bulk_create_events" do
events = []
expect(::Gitlab::Geo::Replicator).to receive(:bulk_create_events).with(events)
described_class.new.perform(events)
end
end
end
...@@ -82,6 +82,18 @@ RSpec.describe Upload do ...@@ -82,6 +82,18 @@ RSpec.describe Upload do
end end
end end
describe '#relative_path' do
it "delegates to the uploader's relative_path method" do
uploader = spy('FakeUploader')
upload = described_class.new(path: '/tmp/secret/file.jpg', store: ObjectStorage::Store::LOCAL)
expect(upload).to receive(:uploader_class).and_return(uploader)
upload.relative_path
expect(uploader).to have_received(:relative_path).with(upload)
end
end
describe '#calculate_checksum!' do describe '#calculate_checksum!' do
let(:upload) do let(:upload) do
described_class.new(path: __FILE__, described_class.new(path: __FILE__,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment