Commit 448e0446 authored by Michael Kozono

Merge branch '219960-geo-secondary-tries-to-download-a-file-indefinitely-when-sync-object-storage-is-enabled' into 'master'

Geo - Does not sync files on Object Storage when syncing object storage is enabled, but Object Storage is disabled for the data type

See merge request gitlab-org/gitlab!33561
parents 902280f3 ab659a0b
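
In short, before attempting any transfer the downloaders now check three preconditions: the record exists, and — when the file lives on Object Storage rather than local storage — the secondary both allows Object Storage replication (sync_object_storage) and has Object Storage configured for that data type. A condensed, self-contained sketch of that decision logic (the real implementation is in the BaseDownloader diff below; Result here is a stand-in struct, not the MR's class):

# Stand-in Result with the same attributes the MR adds (including `reason`).
Result = Struct.new(:success, :bytes_downloaded, :reason, :failed_before_transfer, keyword_init: true)

# Sketch of the precondition logic; nil means the transfer may proceed.
def check_preconditions(resource_present:, local_store:, sync_object_storage:, object_store_enabled:)
  skip = ->(reason) { Result.new(success: false, bytes_downloaded: 0, reason: reason, failed_before_transfer: true) }

  return skip.call("record could not be found") unless resource_present

  unless local_store
    # The secondary must opt in to replicating Object Storage content...
    return skip.call("secondary is not allowed to replicate content on Object Storage") unless sync_object_storage
    # ...and must have Object Storage configured for this data type.
    return skip.call("secondary is not configured to store this data type on Object Storage") unless object_store_enabled
  end

  nil
end

# Example: the file is remote, but Object Storage sync is disabled on the secondary.
puts check_preconditions(resource_present: true, local_store: false,
                         sync_object_storage: false, object_store_enabled: true).reason
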
@@ -22,6 +22,7 @@ module Geo
      mark_as_synced = download_result.success || download_result.primary_missing_file

      log_file_download(mark_as_synced, download_result, start_time)
      update_registry(download_result.bytes_downloaded,
                      mark_as_synced: mark_as_synced,
                      missing_on_primary: download_result.primary_missing_file)
@@ -49,8 +50,9 @@ module Geo
        bytes_downloaded: download_result.bytes_downloaded,
        failed_before_transfer: download_result.failed_before_transfer,
        primary_missing_file: download_result.primary_missing_file,
+       reason: download_result.reason,
        download_time_s: (Time.current - start_time).to_f.round(3)
-     }
+     }.compact

      log_info("File download", metadata)
    end
...
---
title: Geo - Does not sync files on Object Storage when syncing object storage is
enabled, but the Object Storage is disabled for the data type
merge_request: 33561
author:
type: fixed
@@ -4,15 +4,12 @@ module Gitlab
  module Geo
    module Replication
      class BaseDownloader
-       attr_reader :object_type, :object_db_id
+       include ::Gitlab::Utils::StrongMemoize

-       def initialize(object_type, object_db_id)
-         @object_type = object_type
-         @object_db_id = object_db_id
-       end
+       attr_reader :object_type, :object_db_id

        class Result
-         attr_reader :success, :bytes_downloaded, :primary_missing_file, :failed_before_transfer
+         attr_reader :success, :bytes_downloaded, :primary_missing_file, :failed_before_transfer, :reason

          def self.from_transfer_result(transfer_result)
            Result.new(success: transfer_result.success,
@@ -20,21 +17,78 @@ module Gitlab
                       bytes_downloaded: transfer_result.bytes_downloaded)
          end

-         def initialize(success:, bytes_downloaded:, primary_missing_file: false, failed_before_transfer: false)
+         def initialize(success:, bytes_downloaded:, reason: nil, primary_missing_file: false, failed_before_transfer: false)
            @success = success
            @bytes_downloaded = bytes_downloaded
            @primary_missing_file = primary_missing_file
            @failed_before_transfer = failed_before_transfer
+           @reason = reason
          end
        end

+       def initialize(object_type, object_db_id)
+         @object_type = object_type
+         @object_db_id = object_db_id
+       end
+
+       def execute
+         check_result = check_preconditions
+         return check_result if check_result
+
+         result = if local_store?
+                    transfer.download_from_primary
+                  else
+                    transfer.stream_from_primary_to_object_storage
+                  end
+
+         Result.from_transfer_result(result)
+       end
+
        private

-       def fail_before_transfer
-         Result.new(success: false, bytes_downloaded: 0, failed_before_transfer: true)
+       def check_preconditions
+         unless resource.present?
+           return skip_transfer_error(reason: "Skipping transfer as the #{object_type.to_s.humanize(capitalize: false)} (ID = #{object_db_id}) could not be found")
+         end
+
+         unless local_store?
+           unless sync_object_storage_enabled?
+             return skip_transfer_error(reason: "Skipping transfer as this secondary node is not allowed to replicate content on Object Storage")
+           end
+
+           unless object_store_enabled?
+             return skip_transfer_error(reason: "Skipping transfer as this secondary node is not configured to store #{object_type.to_s.humanize(capitalize: false)} on Object Storage")
+           end
+         end
+
+         nil
+       end
+
+       def local_store?
+         resource.local_store?
+       end
+
+       def resource
+         raise NotImplementedError, "#{self.class} does not implement #{__method__}"
+       end
+
+       def transfer
+         raise NotImplementedError, "#{self.class} does not implement #{__method__}"
+       end
+
+       def object_store_enabled?
+         raise NotImplementedError, "#{self.class} does not implement #{__method__}"
+       end
+
+       def sync_object_storage_enabled?
+         Gitlab::Geo.current_node.sync_object_storage
+       end
+
+       def skip_transfer_error(reason: nil)
+         Result.new(success: false, bytes_downloaded: 0, reason: reason, failed_before_transfer: true)
        end

-       def missing_on_primary
+       def missing_on_primary_error
          Result.new(success: true, bytes_downloaded: 0, primary_missing_file: true)
        end
      end
...
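
For orientation: with BaseDownloader now owning execute and check_preconditions, the concrete downloaders further below only supply the resource, transfer, and object_store_enabled? hooks (memoized via StrongMemoize). A hypothetical subclass for a made-up "widget" data type — illustrative only, not part of this MR, and relying on the classes shown in the diff — would be wired up roughly like this:

# Hypothetical subclass, shown only to illustrate the template methods
# BaseDownloader expects. Widget, WidgetTransfer and WidgetUploader are
# invented names; the real subclasses follow in the diff below.
module Gitlab
  module Geo
    module Replication
      class WidgetDownloader < BaseDownloader
        private

        def resource
          strong_memoize(:resource) { ::Widget.find_by_id(object_db_id) }
        end

        def transfer
          strong_memoize(:transfer) { ::Gitlab::Geo::Replication::WidgetTransfer.new(resource) }
        end

        def object_store_enabled?
          ::WidgetUploader.object_store_enabled?
        end
      end
    end
  end
end
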
@@ -87,11 +87,27 @@ module Gitlab
          unless ensure_destination_path_exists
            return failure_result(reason: 'Skipping transfer as we cannot create the destination directory')
          end
+       else
+         unless sync_object_storage_enabled?
+           return failure_result(reason: 'Skipping transfer as this secondary node is not allowed to replicate content on Object Storage')
+         end
+
+         unless object_store_enabled?
+           return failure_result(reason: "Skipping transfer as this secondary node is not configured to store #{replicator.replicable_name} on Object Storage")
+         end
        end

        nil
      end

+     def sync_object_storage_enabled?
+       Gitlab::Geo.current_node.sync_object_storage
+     end
+
+     def object_store_enabled?
+       carrierwave_uploader.class.object_store_enabled?
+     end
+
      def absolute_path
        carrierwave_uploader.path
      end
...
@@ -9,30 +9,28 @@ module Gitlab
      # * Returning a detailed Result
      #
      class FileDownloader < BaseDownloader
-       # Executes the actual file download
-       #
-       # Subclasses should return the number of bytes downloaded,
-       # or nil or -1 if a failure occurred.
-       def execute
-         upload = find_resource
-         return fail_before_transfer unless upload.present?
-         return missing_on_primary if upload.model.nil?
-
-         transfer = ::Gitlab::Geo::Replication::FileTransfer.new(object_type.to_sym, upload)
-
-         result = if upload.local?
-                    transfer.download_from_primary
-                  else
-                    transfer.stream_from_primary_to_object_storage
-                  end
-
-         Result.from_transfer_result(result)
-       end
-
        private

-       def find_resource
-         Upload.find_by_id(object_db_id)
+       def check_preconditions
+         return missing_on_primary_error if resource && resource.model.nil?
+
+         super
+       end
+
+       def local_store?
+         resource.local?
+       end
+
+       def resource
+         strong_memoize(:resource) { ::Upload.find_by_id(object_db_id) }
+       end
+
+       def transfer
+         strong_memoize(:transfer) { ::Gitlab::Geo::Replication::FileTransfer.new(object_type.to_sym, resource) }
+       end
+
+       def object_store_enabled?
+         ::FileUploader.object_store_enabled?
        end
      end
    end
...
@@ -9,25 +9,18 @@ module Gitlab
      # * Returning a detailed Result
      #
      class JobArtifactDownloader < BaseDownloader
-       def execute
-         job_artifact = find_resource
-         return fail_before_transfer unless job_artifact.present?
-
-         transfer = ::Gitlab::Geo::Replication::JobArtifactTransfer.new(job_artifact)
-
-         result = if job_artifact.local_store?
-                    transfer.download_from_primary
-                  else
-                    transfer.stream_from_primary_to_object_storage
-                  end
-
-         Result.from_transfer_result(result)
-       end
-
        private

-       def find_resource
-         ::Ci::JobArtifact.find_by_id(object_db_id)
+       def resource
+         strong_memoize(:resource) { ::Ci::JobArtifact.find_by_id(object_db_id) }
+       end
+
+       def transfer
+         strong_memoize(:transfer) { ::Gitlab::Geo::Replication::JobArtifactTransfer.new(resource) }
+       end
+
+       def object_store_enabled?
+         ::JobArtifactUploader.object_store_enabled?
        end
      end
    end
...
@@ -9,25 +9,18 @@ module Gitlab
      # * Returning a detailed Result
      #
      class LfsDownloader < BaseDownloader
-       def execute
-         lfs_object = find_resource
-         return fail_before_transfer unless lfs_object.present?
-
-         transfer = ::Gitlab::Geo::Replication::LfsTransfer.new(lfs_object)
-
-         result = if lfs_object.local_store?
-                    transfer.download_from_primary
-                  else
-                    transfer.stream_from_primary_to_object_storage
-                  end
-
-         Result.from_transfer_result(result)
-       end
-
        private

-       def find_resource
-         LfsObject.find_by_id(object_db_id)
+       def resource
+         strong_memoize(:resource) { ::LfsObject.find_by_id(object_db_id) }
+       end
+
+       def transfer
+         strong_memoize(:transfer) { ::Gitlab::Geo::Replication::LfsTransfer.new(resource) }
+       end
+
+       def object_store_enabled?
+         ::LfsObjectUploader.object_store_enabled?
        end
      end
    end
...
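
As a rough manual check of the new failure path (a sketch, not part of the MR): on a Geo secondary's Rails console, with sync_object_storage disabled on the secondary node and an LFS object that is stored on Object Storage, the downloader should now return a failure Result carrying a reason instead of retrying the download indefinitely. The ID below is a placeholder:

# Rails console on a Geo secondary (illustrative sketch; 123 stands in for the
# ID of an LFS object stored on Object Storage).
downloader = Gitlab::Geo::Replication::LfsDownloader.new(:lfs, 123)
result = downloader.execute

result.success                # => false
result.failed_before_transfer # => true
result.reason                 # => "Skipping transfer as this secondary node is not allowed to replicate content on Object Storage"
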
@@ -52,6 +52,40 @@ RSpec.describe Gitlab::Geo::Replication::BlobDownloader do
        xit 'ensures the file destination directory exists' # Not worth testing here as-is. Extract the functionality first.
      end

+     context 'when the file is on Object Storage' do
+       before do
+         stub_package_file_object_storage(enabled: true, direct_upload: true)
+       end
+
+       let!(:model_record) { create(:package_file, :npm, :object_storage) }
+
+       subject { described_class.new(replicator: model_record.replicator) }
+
+       context 'with object storage sync disabled' do
+         before do
+           secondary.update_column(:sync_object_storage, false)
+         end
+
+         it 'returns failure' do
+           result = subject.execute
+
+           expect(result.success).to be_falsey
+         end
+       end
+
+       context 'with object storage disabled' do
+         before do
+           stub_package_file_object_storage(enabled: false)
+         end
+
+         it 'returns failure' do
+           result = subject.execute
+
+           expect(result.success).to be_falsey
+         end
+       end
+     end
    end

    context 'when an error occurs while getting a Tempfile' do
...
@@ -5,37 +5,116 @@ require 'spec_helper'
RSpec.describe Gitlab::Geo::Replication::FileDownloader, :geo do
  include EE::GeoHelpers

- let_it_be(:primary_node) { create(:geo_node, :primary) }
-
- subject { downloader.execute }
-
- let(:upload) { create(:upload, :issuable_upload, :with_file) }
- let(:downloader) { described_class.new(:file, upload.id) }
-
- context 'when in primary geo node' do
-   before do
-     stub_current_geo_node(primary_node)
-   end
-
-   it 'fails to download the file' do
-     expect(subject.success).to be_falsey
-     expect(subject.primary_missing_file).to be_falsey
-   end
- end
-
- context 'when in a secondary geo node' do
-   context 'with local storage only' do
-     let(:secondary_node) { create(:geo_node, :local_storage_only) }
-
-     before do
-       stub_current_geo_node(secondary_node)
-       stub_geo_file_transfer(:file, upload)
-     end
-
-     it 'downloads the file' do
-       expect(subject.success).to be_truthy
-       expect(subject.primary_missing_file).to be_falsey
-     end
-   end
- end
+ describe '#execute' do
+   let_it_be(:primary_node) { create(:geo_node, :primary) }
+   let_it_be(:secondary, reload: true) { create(:geo_node) }
+
+   before do
+     stub_current_geo_node(secondary)
+   end
+
+   context 'with upload' do
+     context 'on local storage' do
+       let(:upload) { create(:upload, :with_file) }
+
+       subject(:downloader) { described_class.new(:avatar, upload.id) }
+
+       it 'downloads the file from the primary' do
+         stub_geo_file_transfer(:avatar, upload)
+
+         expect_next_instance_of(Gitlab::Geo::Replication::FileTransfer) do |instance|
+           expect(instance).to receive(:download_from_primary).and_call_original
+         end
+
+         expect(downloader.execute).to have_attributes(success: true)
+       end
+     end
+
+     context 'on object storage' do
+       before do
+         stub_uploads_object_storage(AvatarUploader, direct_upload: true)
+       end
+
+       let!(:upload) { create(:upload, :object_storage) }
+
+       subject(:downloader) { described_class.new(:avatar, upload.id) }
+
+       it 'streams the upload file from the primary to object storage' do
+         stub_geo_file_transfer_object_storage(:avatar, upload)
+
+         expect_next_instance_of(Gitlab::Geo::Replication::FileTransfer) do |instance|
+           expect(instance).to receive(:stream_from_primary_to_object_storage).and_call_original
+         end
+
+         expect(downloader.execute).to have_attributes(success: true)
+       end
+
+       context 'with object storage sync disabled' do
+         before do
+           secondary.update_column(:sync_object_storage, false)
+         end
+
+         it 'returns a result indicating a failure before a transfer was attempted' do
+           result = downloader.execute
+
+           expect(result).to have_attributes(
+             success: false,
+             failed_before_transfer: true,
+             reason: 'Skipping transfer as this secondary node is not allowed to replicate content on Object Storage'
+           )
+         end
+       end
+
+       context 'with object storage disabled' do
+         before do
+           stub_uploads_object_storage(AvatarUploader, enabled: false)
+         end
+
+         it 'returns a result indicating a failure before a transfer was attempted' do
+           result = downloader.execute
+
+           expect(result).to have_attributes(
+             success: false,
+             failed_before_transfer: true,
+             reason: 'Skipping transfer as this secondary node is not configured to store avatar on Object Storage'
+           )
+         end
+       end
+     end
+   end
+
+   context 'with unknown object ID' do
+     let(:unknown_id) { Upload.maximum(:id).to_i + 1 }
+
+     subject(:downloader) { described_class.new(:avatar, unknown_id) }
+
+     it 'returns a result indicating a failure before a transfer was attempted' do
+       result = downloader.execute
+
+       expect(result).to have_attributes(
+         success: false,
+         failed_before_transfer: true,
+         reason: "Skipping transfer as the avatar (ID = #{unknown_id}) could not be found"
+       )
+     end
+   end
+
+   context 'when the upload parent object does not exist' do
+     let(:upload) { create(:upload) }
+
+     subject(:downloader) { described_class.new(:avatar, upload.id) }
+
+     before do
+       upload.update_columns(model_id: nil, model_type: nil)
+     end
+
+     it 'returns a result indicating a failure before a transfer was attempted' do
+       result = downloader.execute
+
+       expect(result).to have_attributes(
+         success: true,
+         primary_missing_file: true # FIXME: https://gitlab.com/gitlab-org/gitlab/-/issues/220855
+       )
+     end
+   end
+ end
@@ -48,7 +127,10 @@ RSpec.describe Gitlab::Geo::Replication::FileDownloader, :geo do
  def stub_geo_file_transfer_object_storage(file_type, upload)
    url = primary_node.geo_transfers_url(file_type, upload.id.to_s)
+   redirection = upload.retrieve_uploader.url
+   file = fixture_file_upload('spec/fixtures/dk.png')

-   stub_request(:get, url).to_return(status: 307, body: upload.retrieve_uploader.url, headers: {})
+   stub_request(:get, url).to_return(status: 307, headers: { location: redirection })
+   stub_request(:get, redirection).to_return(status: 200, body: file.read, headers: {})
  end
end
@@ -3,31 +3,98 @@
require 'spec_helper'

RSpec.describe Gitlab::Geo::Replication::JobArtifactDownloader, :geo do
- let(:job_artifact) { create(:ci_job_artifact) }
+ include ::EE::GeoHelpers

  describe '#execute' do
+   let_it_be(:secondary, reload: true) { create(:geo_node) }
+
+   before do
+     stub_current_geo_node(secondary)
+   end
+
    context 'with job artifact' do
-     it 'returns a FileDownloader::Result object' do
-       downloader = described_class.new(:job_artifact, job_artifact.id)
-       result = Gitlab::Geo::Replication::BaseTransfer::Result.new(success: true, bytes_downloaded: 1)
-
-       allow_next_instance_of(Gitlab::Geo::Replication::JobArtifactTransfer) do |instance|
-         allow(instance).to receive(:download_from_primary).and_return(result)
-       end
-
-       expect(downloader.execute).to be_a(Gitlab::Geo::Replication::FileDownloader::Result)
-     end
-   end
-
-   context 'with unknown job artifact' do
-     let(:downloader) { described_class.new(:job_artifact, 10000) }
-
-     it 'returns a FileDownloader::Result object' do
-       expect(downloader.execute).to be_a(Gitlab::Geo::Replication::FileDownloader::Result)
-     end
-
-     it 'returns a result indicating a failure before a transfer was attempted' do
-       expect(downloader.execute.failed_before_transfer).to be_truthy
-     end
-   end
+     context 'on local storage' do
+       let(:job_artifact) { create(:ci_job_artifact) }
+
+       subject(:downloader) { described_class.new(:job_artifact, job_artifact.id) }
+
+       it 'downloads the job artifact from the primary' do
+         result = Gitlab::Geo::Replication::BaseTransfer::Result.new(success: true, bytes_downloaded: 1)
+
+         expect_next_instance_of(Gitlab::Geo::Replication::JobArtifactTransfer) do |instance|
+           expect(instance).to receive(:download_from_primary).and_return(result)
+         end
+
+         expect(downloader.execute).to have_attributes(success: true, bytes_downloaded: 1)
+       end
+     end
+
+     context 'on object storage' do
+       before do
+         stub_artifacts_object_storage
+       end
+
+       let!(:job_artifact) { create(:ci_job_artifact, :remote_store) }
+
+       subject(:downloader) { described_class.new(:job_artifact, job_artifact.id) }
+
+       it 'streams the job artifact file from the primary to object storage' do
+         result = Gitlab::Geo::Replication::BaseTransfer::Result.new(success: true, bytes_downloaded: 1)
+
+         expect_next_instance_of(Gitlab::Geo::Replication::JobArtifactTransfer) do |instance|
+           expect(instance).to receive(:stream_from_primary_to_object_storage).and_return(result)
+         end
+
+         expect(downloader.execute).to have_attributes(success: true, bytes_downloaded: 1)
+       end
+
+       context 'with object storage sync disabled' do
+         before do
+           secondary.update_column(:sync_object_storage, false)
+         end
+
+         it 'returns a result indicating a failure before a transfer was attempted' do
+           result = downloader.execute
+
+           expect(result).to have_attributes(
+             success: false,
+             failed_before_transfer: true,
+             reason: 'Skipping transfer as this secondary node is not allowed to replicate content on Object Storage'
+           )
+         end
+       end
+
+       context 'with object storage disabled' do
+         before do
+           stub_artifacts_object_storage(enabled: false)
+         end
+
+         it 'returns a result indicating a failure before a transfer was attempted' do
+           result = downloader.execute
+
+           expect(result).to have_attributes(
+             success: false,
+             failed_before_transfer: true,
+             reason: 'Skipping transfer as this secondary node is not configured to store job artifact on Object Storage'
+           )
+         end
+       end
+     end
+   end
+
+   context 'with unknown object ID' do
+     let(:unknown_id) { Ci::JobArtifact.maximum(:id).to_i + 1 }
+
+     subject(:downloader) { described_class.new(:job_artifact, unknown_id) }
+
+     it 'returns a result indicating a failure before a transfer was attempted' do
+       result = downloader.execute
+
+       expect(result).to have_attributes(
+         success: false,
+         failed_before_transfer: true,
+         reason: "Skipping transfer as the job artifact (ID = #{unknown_id}) could not be found"
+       )
+     end
+   end
  end
...
@@ -3,31 +3,98 @@
require 'spec_helper'

RSpec.describe Gitlab::Geo::Replication::LfsDownloader, :geo do
- let(:lfs_object) { create(:lfs_object) }
+ include ::EE::GeoHelpers

  describe '#execute' do
+   let_it_be(:secondary, reload: true) { create(:geo_node) }
+
+   before do
+     stub_current_geo_node(secondary)
+   end
+
    context 'with LFS object' do
-     it 'returns a FileDownloader::Result object' do
-       downloader = described_class.new(:lfs, lfs_object.id)
-       result = Gitlab::Geo::Replication::BaseTransfer::Result.new(success: true, bytes_downloaded: 1)
-
-       allow_next_instance_of(Gitlab::Geo::Replication::LfsTransfer) do |instance|
-         allow(instance).to receive(:download_from_primary).and_return(result)
-       end
-
-       expect(downloader.execute).to be_a(Gitlab::Geo::Replication::FileDownloader::Result)
-     end
-   end
-
-   context 'with unknown job artifact' do
-     let(:downloader) { described_class.new(:lfs, 10000) }
-
-     it 'returns a FileDownloader::Result object' do
-       expect(downloader.execute).to be_a(Gitlab::Geo::Replication::FileDownloader::Result)
-     end
-
-     it 'returns a result indicating a failure before a transfer was attempted' do
-       expect(downloader.execute.failed_before_transfer).to be_truthy
-     end
-   end
+     context 'on local storage' do
+       let(:lfs_object) { create(:lfs_object) }
+
+       subject(:downloader) { described_class.new(:lfs, lfs_object.id) }
+
+       it 'downloads the LFS file from the primary' do
+         result = Gitlab::Geo::Replication::BaseTransfer::Result.new(success: true, bytes_downloaded: 1)
+
+         expect_next_instance_of(Gitlab::Geo::Replication::LfsTransfer) do |instance|
+           expect(instance).to receive(:download_from_primary).and_return(result)
+         end
+
+         expect(downloader.execute).to have_attributes(success: true, bytes_downloaded: 1)
+       end
+     end
+
+     context 'on object storage' do
+       before do
+         stub_lfs_object_storage
+       end
+
+       let!(:lfs_object) { create(:lfs_object, :object_storage) }
+
+       subject(:downloader) { described_class.new(:lfs, lfs_object.id) }
+
+       it 'streams the LFS file from the primary to object storage' do
+         result = Gitlab::Geo::Replication::BaseTransfer::Result.new(success: true, bytes_downloaded: 1)
+
+         expect_next_instance_of(Gitlab::Geo::Replication::LfsTransfer) do |instance|
+           expect(instance).to receive(:stream_from_primary_to_object_storage).and_return(result)
+         end
+
+         expect(downloader.execute).to have_attributes(success: true, bytes_downloaded: 1)
+       end
+
+       context 'with object storage sync disabled' do
+         before do
+           secondary.update_column(:sync_object_storage, false)
+         end
+
+         it 'returns a result indicating a failure before a transfer was attempted' do
+           result = downloader.execute
+
+           expect(result).to have_attributes(
+             success: false,
+             failed_before_transfer: true,
+             reason: 'Skipping transfer as this secondary node is not allowed to replicate content on Object Storage'
+           )
+         end
+       end
+
+       context 'with object storage disabled' do
+         before do
+           stub_lfs_object_storage(enabled: false)
+         end
+
+         it 'returns a result indicating a failure before a transfer was attempted' do
+           result = downloader.execute
+
+           expect(result).to have_attributes(
+             success: false,
+             failed_before_transfer: true,
+             reason: 'Skipping transfer as this secondary node is not configured to store lfs on Object Storage'
+           )
+         end
+       end
+     end
+   end
+
+   context 'with unknown object ID' do
+     let(:unknown_id) { LfsObject.maximum(:id).to_i + 1 }
+
+     subject(:downloader) { described_class.new(:lfs, unknown_id) }
+
+     it 'returns a result indicating a failure before a transfer was attempted' do
+       result = downloader.execute
+
+       expect(result).to have_attributes(
+         success: false,
+         failed_before_transfer: true,
+         reason: "Skipping transfer as the lfs (ID = #{unknown_id}) could not be found"
+       )
+     end
+   end
  end
...
@@ -120,10 +120,14 @@ RSpec.describe Geo::FileDownloadService do
    context 'for a new file' do
      context 'when the downloader fails before attempting a transfer' do
        it 'logs that the download failed before attempting a transfer' do
-         result = double(:result, success: false, bytes_downloaded: 0, primary_missing_file: false, failed_before_transfer: true)
+         result = double(:result, success: false, bytes_downloaded: 0, primary_missing_file: false, failed_before_transfer: true, reason: 'Something went wrong')
          downloader = double(:downloader, execute: result)
-         expect(download_service).to receive(:downloader).and_return(downloader)
-         expect(Gitlab::Geo::Logger).to receive(:info).with(hash_including(:message, :download_time_s, download_success: false, bytes_downloaded: 0, failed_before_transfer: true)).and_call_original
+         allow(download_service).to receive(:downloader).and_return(downloader)
+
+         expect(Gitlab::Geo::Logger)
+           .to receive(:info)
+           .with(hash_including(:message, :download_time_s, download_success: false, reason: 'Something went wrong', bytes_downloaded: 0, failed_before_transfer: true))
+           .and_call_original

          execute!
        end
...