Commit d07dabd1 authored by Gabriel Mazetto

Saving replicable items to Object Storage

This allows Geo to transfer files from primary to the secondary, with
both sides using Object Storage.

We only initiate a transfer of files in Object Storage if the Geo node
is configured to do so.
parent d7df806c
......@@ -15,7 +15,7 @@ class Upload < ApplicationRecord
scope :with_files_stored_remotely, -> { where(store: ObjectStorage::Store::REMOTE) }
before_save :calculate_checksum!, if: :foreground_checksummable?
after_commit :schedule_checksum, if: :checksummable?
after_commit :schedule_checksum, if: :needs_checksum?
# as the FileUploader is not mounted, the default CarrierWave ActiveRecord
# hooks are not executed and the file will not be deleted
......@@ -53,7 +53,7 @@ class Upload < ApplicationRecord
def calculate_checksum!
self.checksum = nil
return unless checksummable?
return unless needs_checksum?
self.checksum = Digest::SHA256.file(absolute_path).hexdigest
end
......@@ -65,8 +65,15 @@ class Upload < ApplicationRecord
end
end
# This checks for existence of the upload on storage
#
# @return [Boolean] whether upload exists on storage
def exist?
exist = File.exist?(absolute_path)
exist = if local?
File.exist?(absolute_path)
else
build_uploader.exists?
end
# Help sysadmins find missing upload files
if persisted? && !exist
......@@ -91,18 +98,24 @@ class Upload < ApplicationRecord
store == ObjectStorage::Store::LOCAL
end
# Tells whether a checksum still has to be generated for this upload.
#
# A checksum is needed only when all of these hold: no checksum is
# stored yet, the upload is kept on local storage, and the file
# exists on storage.
#
# @return [Boolean] whether generating a checksum is needed
def needs_checksum?
  # An already recorded checksum never needs to be regenerated here
  return false unless checksum.nil?

  local? && exist?
end
private
def delete_file!
build_uploader.remove!
end
def checksummable?
checksum.nil? && local? && exist?
end
def foreground_checksummable?
checksummable? && size <= CHECKSUM_THRESHOLD
needs_checksum? && size <= CHECKSUM_THRESHOLD
end
def schedule_checksum
......
......@@ -35,7 +35,7 @@ module API
if response[:code] == :ok
file = response[:file]
present_disk_file!(file.path, file.filename)
present_carrierwave_file!(file)
else
error! response, response.delete(:code)
end
......
......@@ -6,14 +6,15 @@ module Gitlab
class BaseTransfer
include LogHelpers
attr_reader :file_type, :file_id, :filename, :expected_checksum, :request_data
attr_reader :file_type, :file_id, :filename, :uploader, :expected_checksum, :request_data
TEMP_PREFIX = 'tmp_'.freeze
def initialize(file_type:, file_id:, expected_checksum:, filename:, request_data:)
def initialize(file_type:, file_id:, request_data:, expected_checksum: nil, filename: nil, uploader: nil)
@file_type = file_type
@file_id = file_id
@filename = filename
@uploader = uploader
@expected_checksum = expected_checksum
@request_data = request_data
end
......@@ -34,7 +35,7 @@ module Gitlab
return false
end
if File.directory?(filename)
if filename && File.directory?(filename)
log_error 'Skipping transfer as destination exist and is a directory', filename: filename
return false
......@@ -63,6 +64,14 @@ module Gitlab
download_file(resource_url, req_headers)
end
# Transfers the file from the primary into Object Storage.
#
# When `can_transfer?` is false nothing is requested and the skipped
# result is returned. Otherwise the request headers are built from
# `request_data` and the transfer is delegated to
# `transfer_file_to_object_storage`.
#
# @return the skipped result, or whatever `transfer_file_to_object_storage` returns
def stream_from_primary_to_object_storage
  if can_transfer?
    # Headers are built before the URL is resolved, same as the call below expects
    headers = TransferRequest.new(request_data).headers

    transfer_file_to_object_storage(resource_url, headers)
  else
    skipped_result
  end
end
class Result
attr_reader :success, :bytes_downloaded, :primary_missing_file, :skipped
......@@ -142,10 +151,10 @@ module Gitlab
return failure_result(bytes_downloaded: file_size)
end
# Move transfered file to the target location
# Move transferred file to the target location
FileUtils.mv(temp_file.path, filename)
log_info("Successful downloaded", filename: filename, file_size_bytes: file_size)
log_info("Successfully downloaded", filename: filename, file_size_bytes: file_size)
rescue StandardError, ::HTTP::Error => e
log_error("Error downloading file", error: e, filename: filename, url: url)
......@@ -158,6 +167,55 @@ module Gitlab
Result.new(success: file_size > -1, bytes_downloaded: [file_size, 0].max)
end
# Downloads the file at +url+ into a temporary file on disk and then
# stores that file through the uploader's storage.
#
# The temporary file is always closed and removed before returning,
# whether the transfer succeeded or not.
#
# @param url [String] location to fetch the file from
# @param req_headers [Hash] headers sent with the GET request
# @return a Result carrying the number of bytes downloaded on success;
#   the value of `failure_result` when the response is unsuccessful or
#   any error is raised
def transfer_file_to_object_storage(url, req_headers)
  file_size = -1
  temp_file = nil

  begin
    # Create a temporary file for Object Storage transfers.
    # Done inside the begin block so a failure here is logged and reported
    # as a failed transfer instead of escaping as an exception.
    temp_file = Tempfile.new("#{TEMP_PREFIX}-#{file_type}-#{file_id}")
    temp_file.chmod(default_permissions)
    temp_file.binmode

    # Make the request
    response = ::HTTP.follow.get(url, headers: req_headers)

    # Check for failures
    unless response.status.success?
      log_error("Unsuccessful download", file_type: file_type, file_id: file_id,
                status_code: response.status.code, reason: response.status.reason, url: url)

      return failure_result(primary_missing_file: primary_missing_file?(response))
    end

    # Stream to temporary file on disk
    response.body.each do |chunk|
      temp_file.write(chunk)
    end

    # Tempfile#size flushes buffered writes, so the data is on disk before upload
    file_size = temp_file.size

    # Upload file to Object Storage.
    # `send` bypasses method visibility; presumably `storage` is not public
    # on the uploader - TODO confirm.
    uploader.send(:storage).store! CarrierWave::SanitizedFile.new(temp_file)

    log_info("Successfully downloaded", filename: filename, file_size_bytes: file_size)
  rescue Errno::EEXIST => e
    # Must come before the generic rescue, otherwise it can never match
    log_error("Destination file is a directory", error: e, filename: filename)

    return failure_result
  rescue => e
    log_error("Error downloading file", error: e, filename: filename, url: url)

    return failure_result
  ensure
    # temp_file is nil when creating the temporary file itself failed
    temp_file&.close
    temp_file&.unlink
  end

  Result.new(success: file_size > -1, bytes_downloaded: [file_size, 0].max)
end
def primary_missing_file?(response)
if response.status.not_found?
begin
......@@ -186,7 +244,7 @@ module Gitlab
temp.chmod(default_permissions)
temp.binmode
temp
rescue StandardError => e
rescue => e
log_error("Error creating temporary file", error: e, filename: target_filename)
nil
end
......
......@@ -19,7 +19,14 @@ module Gitlab
return missing_on_primary if upload.model.nil?
transfer = ::Gitlab::Geo::Replication::FileTransfer.new(object_type.to_sym, upload)
Result.from_transfer_result(transfer.download_from_primary)
result = if upload.local?
transfer.download_from_primary
else
transfer.stream_from_primary_to_object_storage
end
Result.from_transfer_result(result)
end
private
......
......@@ -13,7 +13,7 @@ module Gitlab
return file_not_found(recorded_file) unless recorded_file.exist?
return error('Upload not found') unless valid?
success(CarrierWave::SanitizedFile.new(recorded_file.absolute_path))
success(recorded_file.build_uploader)
end
private
......
......@@ -9,13 +9,24 @@ module Gitlab
# * Returning a detailed Result object
class FileTransfer < BaseTransfer
def initialize(file_type, upload)
super(
file_type: file_type,
file_id: upload.id,
filename: upload.absolute_path,
expected_checksum: upload.checksum,
request_data: build_request_data(file_type, upload)
)
if upload.local?
super(
file_type: file_type,
file_id: upload.id,
filename: upload.absolute_path,
uploader: upload.build_uploader,
expected_checksum: upload.checksum,
request_data: build_request_data(file_type, upload)
)
else
super(
file_type: file_type,
file_id: upload.id,
uploader: upload.build_uploader,
request_data: build_request_data(file_type, upload)
)
end
rescue ObjectStorage::RemoteStoreError
::Gitlab::Geo::Logger.warn "Error trying to transfer a remote object as a local object."
end
......
......@@ -18,23 +18,38 @@ describe Gitlab::Geo::Replication::BaseTransfer do
stub_current_geo_node(secondary_node)
end
it 'returns false when not a secondary node' do
expect(Gitlab::Geo).to receive(:secondary?) { false }
context 'when not a primary node' do
it 'returns false when not a secondary node' do
expect(Gitlab::Geo).to receive(:secondary?) { false }
expect(subject.can_transfer?).to be_falsey
expect(subject.can_transfer?).to be_falsey
end
end
it 'returns false when no primary node exists' do
expect(Gitlab::Geo).to receive(:primary_node) { nil }
context 'when no primary node exists' do
it 'returns false' do
expect(Gitlab::Geo).to receive(:primary_node) { nil }
expect(subject.can_transfer?).to be_falsey
expect(subject.can_transfer?).to be_falsey
end
end
it 'returns false when destination filename is a directory' do
subject = described_class.new(file_type: :avatar, file_id: 1, filename: Dir::Tmpname.tmpdir,
expected_checksum: nil, request_data: nil)
context 'when destination filename is a directory' do
it 'returns false' do
subject = described_class.new(file_type: :avatar, file_id: 1, filename: Dir::Tmpname.tmpdir,
expected_checksum: nil, request_data: nil)
expect(subject.can_transfer?).to be_falsey
expect(subject.can_transfer?).to be_falsey
end
end
# A transfer built without a filename skips the directory check
context 'when no filename is informed' do
  it 'returns true' do
    transfer = described_class.new(
      file_type: :avatar,
      file_id: 1,
      expected_checksum: nil,
      request_data: nil
    )

    expect(transfer.can_transfer?).to be_truthy
  end
end
it 'returns true when is a secondary, a primary exists and filename doesnt point to an existing directory' do
......
......@@ -3,7 +3,7 @@
require 'spec_helper'
describe Upload do
describe 'assocations' do
describe 'associations' do
it { is_expected.to belong_to(:model) }
end
......@@ -107,6 +107,30 @@ describe Upload do
end
end
# Covers Upload#needs_checksum? for local and remote (object storage) uploads
describe '#needs_checksum?' do
  context 'with local storage' do
    it 'returns true when no checksum exists' do
      upload = create(:upload, :with_file, checksum: nil)

      expect(upload.needs_checksum?).to be_truthy
    end

    it 'returns false when checksum is already present' do
      upload = create(:upload, :with_file, checksum: 'something')

      expect(upload.needs_checksum?).to be_falsey
    end
  end

  context 'with remote storage' do
    subject { build(:upload, :object_storage) }

    it 'returns false' do
      expect(subject.needs_checksum?).to be_falsey
    end
  end
end
describe '#exist?' do
it 'returns true when the file exists' do
upload = described_class.new(path: __FILE__, store: ObjectStorage::Store::LOCAL)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment