Commit f4b48da3 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch '10586-geo-replication-refactor' into 'master'

Geo: Refactor file replication

See merge request gitlab-org/gitlab-ee!16382
parents 2a445271 b2608d2a
...@@ -49,7 +49,7 @@ module Geo ...@@ -49,7 +49,7 @@ module Geo
join_statement = join_statement =
arel_table arel_table
.join(file_registry_table, Arel::Nodes::OuterJoin) .join(file_registry_table, Arel::Nodes::OuterJoin)
.on(arel_table[:id].eq(file_registry_table[:file_id]).and(file_registry_table[:file_type].in(Geo::FileService::DEFAULT_OBJECT_TYPES))) .on(arel_table[:id].eq(file_registry_table[:file_id]).and(file_registry_table[:file_type].in(Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES)))
joins(join_statement.join_sources) joins(join_statement.join_sources)
end end
......
...@@ -3,19 +3,19 @@ ...@@ -3,19 +3,19 @@
class Geo::FileRegistry < Geo::BaseRegistry class Geo::FileRegistry < Geo::BaseRegistry
include Geo::Syncable include Geo::Syncable
scope :attachments, -> { where(file_type: Geo::FileService::DEFAULT_OBJECT_TYPES) } scope :attachments, -> { where(file_type: Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES) }
scope :failed, -> { where(success: false).where.not(retry_count: nil) } scope :failed, -> { where(success: false).where.not(retry_count: nil) }
scope :fresh, -> { order(created_at: :desc) } scope :fresh, -> { order(created_at: :desc) }
scope :lfs_objects, -> { where(file_type: :lfs) } scope :lfs_objects, -> { where(file_type: :lfs) }
scope :never, -> { where(success: false, retry_count: nil) } scope :never, -> { where(success: false, retry_count: nil) }
scope :uploads, -> { where(file_type: Geo::FileService::UPLOAD_OBJECT_TYPE) } scope :uploads, -> { where(file_type: Gitlab::Geo::Replication::UPLOAD_OBJECT_TYPE) }
self.inheritance_column = 'file_type' self.inheritance_column = 'file_type'
def self.find_sti_class(file_type) def self.find_sti_class(file_type)
if file_type == 'lfs' if file_type == 'lfs'
Geo::LfsObjectRegistry Geo::LfsObjectRegistry
elsif Geo::FileService::DEFAULT_OBJECT_TYPES.include?(file_type.to_sym) elsif Gitlab::Geo::Replication.object_type_from_user_uploads?(file_type)
Geo::UploadRegistry Geo::UploadRegistry
end end
end end
......
...@@ -10,7 +10,7 @@ class Geo::UploadRegistry < Geo::FileRegistry ...@@ -10,7 +10,7 @@ class Geo::UploadRegistry < Geo::FileRegistry
def self.type_condition(table = arel_table) def self.type_condition(table = arel_table)
sti_column = arel_attribute(inheritance_column, table) sti_column = arel_attribute(inheritance_column, table)
sti_names = Geo::FileService::DEFAULT_OBJECT_TYPES sti_names = Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES
sti_column.in(sti_names) sti_column.in(sti_names)
end end
......
# frozen_string_literal: true # frozen_string_literal: true
module Geo module Geo
class FileService # Base class for services that handles any type of blob replication
#
# GitLab handles this types of blobs:
# * user uploads: anything the user can upload on the Web UI (ex: issue attachments, avatars, etc)
# * job artifacts: anything generated by the CI (ex: logs, artifacts, etc)
# * lfs blobs: anything stored in LFS
class BaseFileService
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
include ::Gitlab::Geo::LogHelpers include ::Gitlab::Geo::LogHelpers
attr_reader :object_type, :object_db_id attr_reader :object_type, :object_db_id
DEFAULT_OBJECT_TYPES = %i[attachment avatar file import_export namespace_file personal_file favicon].freeze
DEFAULT_SERVICE_TYPE = :file
UPLOAD_OBJECT_TYPE = :file
def initialize(object_type, object_db_id) def initialize(object_type, object_db_id)
@object_type = object_type.to_sym @object_type = object_type.to_sym
@object_db_id = object_db_id @object_db_id = object_db_id
...@@ -22,23 +24,24 @@ module Geo ...@@ -22,23 +24,24 @@ module Geo
private private
def upload? def fail_unimplemented_klass!(type:)
DEFAULT_OBJECT_TYPES.include?(object_type) error_message = "Cannot find a handler for Gitlab::Geo #{type} for object_type = '#{object_type}'"
log_error(error_message)
raise NotImplementedError, error_message
end end
def job_artifact? def user_upload?
object_type == :job_artifact Gitlab::Geo::Replication.object_type_from_user_uploads?(object_type)
end end
def service_klass_name def job_artifact?
klass_name = object_type == :job_artifact
if upload?
DEFAULT_SERVICE_TYPE
else
object_type
end end
klass_name.to_s.camelize def lfs?
object_type == :lfs
end end
def base_log_data(message) def base_log_data(message)
......
...@@ -5,7 +5,7 @@ module Geo ...@@ -5,7 +5,7 @@ module Geo
# * Finding the appropriate Downloader class for a FileRegistry record # * Finding the appropriate Downloader class for a FileRegistry record
# * Executing the Downloader # * Executing the Downloader
# * Marking the FileRegistry record as synced or needing retry # * Marking the FileRegistry record as synced or needing retry
class FileDownloadService < FileService class FileDownloadService < BaseFileService
LEASE_TIMEOUT = 8.hours.freeze LEASE_TIMEOUT = 8.hours.freeze
include Delay include Delay
...@@ -26,14 +26,18 @@ module Geo ...@@ -26,14 +26,18 @@ module Geo
end end
end end
def downloader
downloader_klass.new(object_type, object_db_id)
end
private private
def downloader def downloader_klass
klass = "Gitlab::Geo::#{service_klass_name}Downloader".constantize return Gitlab::Geo::Replication::FileDownloader if user_upload?
klass.new(object_type, object_db_id) return Gitlab::Geo::Replication::JobArtifactDownloader if job_artifact?
rescue NameError => e return Gitlab::Geo::Replication::LfsDownloader if lfs?
log_error('Unknown file type', e)
raise fail_unimplemented_klass!(type: 'Downloader')
end end
def log_file_download(mark_as_synced, download_result, start_time) def log_file_download(mark_as_synced, download_result, start_time)
......
# frozen_string_literal: true # frozen_string_literal: true
module Geo module Geo
class FileRegistryRemovalService < FileService class FileRegistryRemovalService < BaseFileService
include ::Gitlab::Utils::StrongMemoize include ::Gitlab::Utils::StrongMemoize
LEASE_TIMEOUT = 8.hours.freeze LEASE_TIMEOUT = 8.hours.freeze
...@@ -62,7 +62,7 @@ module Geo ...@@ -62,7 +62,7 @@ module Geo
next file_uploader.file.path if file_uploader.object_store == ObjectStorage::Store::LOCAL next file_uploader.file.path if file_uploader.object_store == ObjectStorage::Store::LOCAL
# For remote storage more juggling is needed to actually get the full path on disk # For remote storage more juggling is needed to actually get the full path on disk
if upload? if user_upload?
upload = file_uploader.upload upload = file_uploader.upload
file_uploader.class.absolute_path(upload) file_uploader.class.absolute_path(upload)
else else
...@@ -78,7 +78,7 @@ module Geo ...@@ -78,7 +78,7 @@ module Geo
LfsObject.find(object_db_id).file LfsObject.find(object_db_id).file
when :job_artifact when :job_artifact
Ci::JobArtifact.find(object_db_id).file Ci::JobArtifact.find(object_db_id).file
when *Geo::FileService::DEFAULT_OBJECT_TYPES when *Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES
Upload.find(object_db_id).build_uploader Upload.find(object_db_id).build_uploader
else else
raise NameError, "Unrecognized type: #{object_type}" raise NameError, "Unrecognized type: #{object_type}"
......
...@@ -4,7 +4,7 @@ module Geo ...@@ -4,7 +4,7 @@ module Geo
# This class is responsible for: # This class is responsible for:
# * Handling file requests from the secondary over the API # * Handling file requests from the secondary over the API
# * Returning the necessary response data to send the file back # * Returning the necessary response data to send the file back
class FileUploadService < FileService class FileUploadService < BaseFileService
attr_reader :auth_header attr_reader :auth_header
include ::Gitlab::Utils::StrongMemoize include ::Gitlab::Utils::StrongMemoize
...@@ -17,7 +17,11 @@ module Geo ...@@ -17,7 +17,11 @@ module Geo
def execute def execute
return unless decoded_authorization.present? && jwt_scope_valid? return unless decoded_authorization.present? && jwt_scope_valid?
uploader_klass.new(object_db_id, decoded_authorization).execute retriever.execute
end
def retriever
retriever_klass.new(object_db_id, decoded_authorization)
end end
private private
...@@ -32,11 +36,12 @@ module Geo ...@@ -32,11 +36,12 @@ module Geo
end end
end end
def uploader_klass def retriever_klass
"Gitlab::Geo::#{service_klass_name}Uploader".constantize return Gitlab::Geo::Replication::FileRetriever if user_upload?
rescue NameError => e return Gitlab::Geo::Replication::JobArtifactRetriever if job_artifact?
log_error('Unknown file type', e) return Gitlab::Geo::Replication::LfsRetriever if lfs?
raise
fail_unimplemented_klass!(type: 'Retriever')
end end
end end
end end
...@@ -66,7 +66,7 @@ module Geo ...@@ -66,7 +66,7 @@ module Geo
def job_finders def job_finders
@job_finders ||= [ @job_finders ||= [
Geo::FileDownloadDispatchWorker::AttachmentJobFinder.new(scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES)), Geo::FileDownloadDispatchWorker::AttachmentJobFinder.new(scheduled_file_ids(Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES)),
Geo::FileDownloadDispatchWorker::LfsObjectJobFinder.new(scheduled_file_ids(:lfs)), Geo::FileDownloadDispatchWorker::LfsObjectJobFinder.new(scheduled_file_ids(:lfs)),
Geo::FileDownloadDispatchWorker::JobArtifactJobFinder.new(scheduled_file_ids(:job_artifact)) Geo::FileDownloadDispatchWorker::JobArtifactJobFinder.new(scheduled_file_ids(:job_artifact))
] ]
......
...@@ -58,7 +58,7 @@ module Geo ...@@ -58,7 +58,7 @@ module Geo
def find_migrated_local_attachments_ids(batch_size:) def find_migrated_local_attachments_ids(batch_size:)
return [] unless attachments_object_store_enabled? return [] unless attachments_object_store_enabled?
attachments_finder.find_migrated_local(batch_size: batch_size, except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES)) attachments_finder.find_migrated_local(batch_size: batch_size, except_file_ids: scheduled_file_ids(Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES))
.pluck(:uploader, :id) .pluck(:uploader, :id)
.map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] } .map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] }
end end
......
# frozen_string_literal: true
module Gitlab
module Geo
# This class is responsible for:
# * Finding an Upload record
# * Requesting and downloading the Upload's file from the primary
# * Returning a detailed Result
#
# TODO: Rearrange things so this class not inherited by JobArtifactDownloader and LfsDownloader
# Maybe rename it so it doesn't seem generic. It only works with Upload records.
class FileDownloader
attr_reader :object_type, :object_db_id
def initialize(object_type, object_db_id)
@object_type = object_type
@object_db_id = object_db_id
end
# Executes the actual file download
#
# Subclasses should return the number of bytes downloaded,
# or nil or -1 if a failure occurred.
# rubocop: disable CodeReuse/ActiveRecord
def execute
upload = Upload.find_by(id: object_db_id)
return fail_before_transfer unless upload.present?
return missing_on_primary if upload.model.nil?
transfer = ::Gitlab::Geo::FileTransfer.new(object_type.to_sym, upload)
Result.from_transfer_result(transfer.download_from_primary)
end
# rubocop: enable CodeReuse/ActiveRecord
class Result
attr_reader :success, :bytes_downloaded, :primary_missing_file, :failed_before_transfer
def self.from_transfer_result(transfer_result)
Result.new(success: transfer_result.success,
primary_missing_file: transfer_result.primary_missing_file,
bytes_downloaded: transfer_result.bytes_downloaded)
end
def initialize(success:, bytes_downloaded:, primary_missing_file: false, failed_before_transfer: false)
@success = success
@bytes_downloaded = bytes_downloaded
@primary_missing_file = primary_missing_file
@failed_before_transfer = failed_before_transfer
end
end
private
def fail_before_transfer
Result.new(success: false, bytes_downloaded: 0, failed_before_transfer: true)
end
def missing_on_primary
Result.new(success: true, bytes_downloaded: 0, primary_missing_file: true)
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
# This class is responsible for:
# * Requesting an Upload file from the primary
# * Saving it in the right place on successful download
# * Returning a detailed Result object
class FileTransfer < Transfer
def initialize(file_type, upload)
super(
file_type,
upload.id,
upload.absolute_path,
upload.checksum,
build_request_data(file_type, upload)
)
rescue ObjectStorage::RemoteStoreError
::Gitlab::Geo::Logger.warn "Cannot transfer a remote object."
end
private
def build_request_data(file_type, upload)
{
id: upload.model_id,
type: upload.model_type,
checksum: upload.checksum,
file_type: file_type,
file_id: upload.id
}
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
# This class is responsible for:
# * Finding an Upload record
# * Returning the necessary response data to send the file back
#
# TODO: Rearrange things so this class not inherited by JobArtifactUploader and LfsUploader
# Maybe rename it so it doesn't seem generic. It only works with Upload records.
class FileUploader
include LogHelpers
FILE_NOT_FOUND_GEO_CODE = 'FILE_NOT_FOUND'.freeze
attr_reader :object_db_id, :message
def initialize(object_db_id, message)
@object_db_id = object_db_id
@message = message
end
# rubocop: disable CodeReuse/ActiveRecord
def execute
recorded_file = Upload.find_by(id: object_db_id)
return error('Upload not found') unless recorded_file
return file_not_found(recorded_file) unless recorded_file.exist?
return error('Upload not found') unless valid?(recorded_file)
success(CarrierWave::SanitizedFile.new(recorded_file.absolute_path))
end
# rubocop: enable CodeReuse/ActiveRecord
private
def valid?(recorded_file)
matches_requested_model?(recorded_file) &&
matches_checksum?(recorded_file)
end
def matches_requested_model?(recorded_file)
message[:id] == recorded_file.model_id &&
message[:type] == recorded_file.model_type
end
def matches_checksum?(recorded_file)
message[:checksum] == Upload.hexdigest(recorded_file.absolute_path)
end
def success(file)
{ code: :ok, message: 'Success', file: file }
end
def error(message)
{ code: :not_found, message: message }
end
# A 404 implies the client made a mistake requesting that resource.
# In this case, we know that the resource should exist, so it is a 500 server error.
# We send a special "geo_code" so the secondary can mark the file as synced.
def file_not_found(resource)
{
code: :not_found,
geo_code: FILE_NOT_FOUND_GEO_CODE,
message: "#{resource.class.name} ##{resource.id} file not found"
}
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
# This class is responsible for:
# * Finding a ::Ci::JobArtifact record
# * Requesting and downloading the JobArtifact's file from the primary
# * Returning a detailed Result
#
# TODO: Rearrange things so this class does not inherit FileDownloader
class JobArtifactDownloader < FileDownloader
# rubocop: disable CodeReuse/ActiveRecord
def execute
job_artifact = ::Ci::JobArtifact.find_by(id: object_db_id)
return fail_before_transfer unless job_artifact.present?
transfer = ::Gitlab::Geo::JobArtifactTransfer.new(job_artifact)
Result.from_transfer_result(transfer.download_from_primary)
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
# This class is responsible for:
# * Requesting an ::Ci::JobArtifact file from the primary
# * Saving it in the right place on successful download
# * Returning a detailed Result object
class JobArtifactTransfer < Transfer
def initialize(job_artifact)
super(
:job_artifact,
job_artifact.id,
job_artifact.file.path,
job_artifact.file_sha256,
job_artifact_request_data(job_artifact)
)
end
private
def job_artifact_request_data(job_artifact)
{
id: job_artifact.id,
file_type: :job_artifact,
file_id: job_artifact.id
}
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
# This class is responsible for:
# * Finding an ::Ci::JobArtifact record
# * Returning the necessary response data to send the file back
#
# TODO: Rearrange things so this class does not inherit from FileUploader
class JobArtifactUploader < ::Gitlab::Geo::FileUploader
# rubocop: disable CodeReuse/ActiveRecord
def execute
job_artifact = ::Ci::JobArtifact.find_by(id: object_db_id)
unless job_artifact.present?
return error('Job artifact not found')
end
unless job_artifact.file.present? && job_artifact.file.exists?
log_error("Could not upload job artifact because it does not have a file", id: job_artifact.id)
return file_not_found(job_artifact)
end
success(job_artifact.file)
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
# This class is responsible for:
# * Finding a LfsObject record
# * Requesting and downloading the LfsObject's file from the primary
# * Returning a detailed Result
#
# TODO: Rearrange things so this class does not inherit FileDownloader
class LfsDownloader < FileDownloader
# rubocop: disable CodeReuse/ActiveRecord
def execute
lfs_object = LfsObject.find_by(id: object_db_id)
return fail_before_transfer unless lfs_object.present?
transfer = ::Gitlab::Geo::LfsTransfer.new(lfs_object)
Result.from_transfer_result(transfer.download_from_primary)
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
# This class is responsible for:
# * Requesting an LfsObject file from the primary
# * Saving it in the right place on successful download
# * Returning a detailed Result object
class LfsTransfer < Transfer
def initialize(lfs_object)
super(
:lfs,
lfs_object.id,
lfs_object.file.path,
lfs_object.oid,
lfs_request_data(lfs_object)
)
end
private
def lfs_request_data(lfs_object)
{
checksum: lfs_object.oid,
file_type: :lfs,
file_id: lfs_object.id
}
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
# This class is responsible for:
# * Finding an LfsObject record
# * Returning the necessary response data to send the file back
#
# TODO: Rearrange things so this class does not inherit from FileUploader
class LfsUploader < FileUploader
# rubocop: disable CodeReuse/ActiveRecord
def execute
lfs_object = LfsObject.find_by(id: object_db_id)
return error('LFS object not found') unless lfs_object
return error('LFS object not found') if message[:checksum] != lfs_object.oid
unless lfs_object.file.present? && lfs_object.file.exists?
log_error("Could not upload LFS object because it does not have a file", id: lfs_object.id)
return file_not_found(lfs_object)
end
success(lfs_object.file)
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
USER_UPLOADS_OBJECT_TYPES = %i[attachment avatar file import_export namespace_file personal_file favicon].freeze
UPLOAD_OBJECT_TYPE = :file
FILE_NOT_FOUND_GEO_CODE = 'FILE_NOT_FOUND'.freeze
def self.object_type_from_user_uploads?(object_type)
USER_UPLOADS_OBJECT_TYPES.include?(object_type.to_sym)
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
class BaseDownloader
attr_reader :object_type, :object_db_id
def initialize(object_type, object_db_id)
@object_type = object_type
@object_db_id = object_db_id
end
class Result
attr_reader :success, :bytes_downloaded, :primary_missing_file, :failed_before_transfer
def self.from_transfer_result(transfer_result)
Result.new(success: transfer_result.success,
primary_missing_file: transfer_result.primary_missing_file,
bytes_downloaded: transfer_result.bytes_downloaded)
end
def initialize(success:, bytes_downloaded:, primary_missing_file: false, failed_before_transfer: false)
@success = success
@bytes_downloaded = bytes_downloaded
@primary_missing_file = primary_missing_file
@failed_before_transfer = failed_before_transfer
end
end
private
def fail_before_transfer
Result.new(success: false, bytes_downloaded: 0, failed_before_transfer: true)
end
def missing_on_primary
Result.new(success: true, bytes_downloaded: 0, primary_missing_file: true)
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
class BaseRetriever
include LogHelpers
include Gitlab::Utils::StrongMemoize
attr_reader :object_db_id, :message
def initialize(object_db_id, message)
@object_db_id = object_db_id
@message = message
end
private
def success(file)
{ code: :ok, message: 'Success', file: file }
end
def error(message)
{ code: :not_found, message: message }
end
# A 404 implies the client made a mistake requesting that resource.
# In this case, we know that the resource should exist, so it is a 500 server error.
# We send a special "geo_code" so the secondary can mark the file as synced.
def file_not_found(resource)
{
code: :not_found,
geo_code: Replication::FILE_NOT_FOUND_GEO_CODE,
message: "#{resource.class.name} ##{resource.id} file not found"
}
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
class BaseTransfer
include LogHelpers
attr_reader :file_type, :file_id, :filename, :expected_checksum, :request_data
TEMP_PREFIX = 'tmp_'.freeze
def initialize(file_type, file_id, filename, expected_checksum, request_data)
@file_type = file_type
@file_id = file_id
@filename = filename
@expected_checksum = expected_checksum
@request_data = request_data
end
# Returns Result object with success boolean and number of bytes downloaded.
def download_from_primary
return failure unless Gitlab::Geo.secondary?
return failure if File.directory?(filename)
primary = Gitlab::Geo.primary_node
return failure unless primary
url = primary.geo_transfers_url(file_type, file_id.to_s)
req_headers = TransferRequest.new(request_data).headers
return failure unless ensure_path_exists
download_file(url, req_headers)
end
class Result
attr_reader :success, :bytes_downloaded, :primary_missing_file
def initialize(success:, bytes_downloaded:, primary_missing_file: false)
@success = success
@bytes_downloaded = bytes_downloaded
@primary_missing_file = primary_missing_file
end
end
private
def failure(bytes_downloaded: 0, primary_missing_file: false)
Result.new(success: false, bytes_downloaded: bytes_downloaded, primary_missing_file: primary_missing_file)
end
def ensure_path_exists
path = Pathname.new(filename)
dir = path.dirname
return true if File.directory?(dir)
begin
FileUtils.mkdir_p(dir)
rescue => e
log_error("unable to create directory #{dir}: #{e}")
return false
end
true
end
# Use Gitlab::HTTP for now but switch to curb if performance becomes
# an issue
def download_file(url, req_headers)
file_size = -1
temp_file = open_temp_file(filename)
return failure unless temp_file
begin
response = Gitlab::HTTP.get(url, allow_local_requests: true, headers: req_headers, stream_body: true) do |fragment|
temp_file.write(fragment)
end
temp_file.flush
unless response.success?
log_error("Unsuccessful download", filename: filename, response_code: response.code, response_msg: response.try(:msg), url: url)
return failure(primary_missing_file: primary_missing_file?(response, temp_file))
end
if File.directory?(filename)
log_error("Destination file is a directory", filename: filename)
return failure
end
file_size = File.stat(temp_file.path).size
if checksum_mismatch?(temp_file.path)
log_error("Downloaded file checksum mismatch", expected_checksum: expected_checksum, actual_checksum: @actual_checksum, file_size_bytes: file_size)
return failure(bytes_downloaded: file_size)
end
FileUtils.mv(temp_file.path, filename)
log_info("Successful downloaded", filename: filename, file_size_bytes: file_size)
rescue StandardError, Gitlab::HTTP::Error => e
log_error("Error downloading file", error: e, filename: filename, url: url)
ensure
temp_file.close
temp_file.unlink
end
Result.new(success: file_size > -1, bytes_downloaded: [file_size, 0].max)
end
def primary_missing_file?(response, temp_file)
body = File.read(temp_file.path) if File.exist?(temp_file.path)
if response.code == 404 && body.present?
begin
json_response = JSON.parse(body)
return code_file_not_found?(json_response['geo_code'])
rescue JSON::ParserError
end
end
false
end
def code_file_not_found?(geo_code)
geo_code == Gitlab::Geo::Replication::FILE_NOT_FOUND_GEO_CODE
end
def default_permissions
0666 - File.umask
end
def open_temp_file(target_filename)
# Make sure the file is in the same directory to prevent moves across filesystems
pathname = Pathname.new(target_filename)
temp = Tempfile.new(TEMP_PREFIX, pathname.dirname.to_s)
temp.chmod(default_permissions)
temp.binmode
temp
rescue StandardError => e
log_error("Error creating temporary file", error: e)
nil
end
# @param [String] file_path disk location to compare checksum mismatch
def checksum_mismatch?(file_path)
# Skip checksum check if primary didn't generate one because, for
# example, large attachments are checksummed asynchronously, and most
# types of artifacts are not checksummed at all at the moment.
return false if expected_checksum.blank?
return false unless Feature.enabled?(:geo_file_transfer_validation, default_enabled: true)
expected_checksum != actual_checksum(file_path)
end
def actual_checksum(file_path)
@actual_checksum = Digest::SHA256.file(file_path).hexdigest
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
# This class is responsible for:
# * Finding an Upload record
# * Requesting and downloading the Upload's file from the primary
# * Returning a detailed Result
#
class FileDownloader < BaseDownloader
# Executes the actual file download
#
# Subclasses should return the number of bytes downloaded,
# or nil or -1 if a failure occurred.
def execute
upload = find_resource
return fail_before_transfer unless upload.present?
return missing_on_primary if upload.model.nil?
transfer = ::Gitlab::Geo::Replication::FileTransfer.new(object_type.to_sym, upload)
Result.from_transfer_result(transfer.download_from_primary)
end
private
def find_resource
Upload.find_by_id(object_db_id)
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
# This class is responsible for:
# * Finding an Upload record
# * Returning the necessary response data to send the file back
#
class FileRetriever < BaseRetriever
def execute
return error('Upload not found') unless recorded_file
return file_not_found(recorded_file) unless recorded_file.exist?
return error('Upload not found') unless valid?
success(CarrierWave::SanitizedFile.new(recorded_file.absolute_path))
end
private
def recorded_file
strong_memoize(:recorded_file) do
Upload.find_by_id(object_db_id)
end
end
def valid?
matches_requested_model? && matches_checksum?
end
def matches_requested_model?
message[:id] == recorded_file.model_id &&
message[:type] == recorded_file.model_type
end
def matches_checksum?
# Remove this when we implement checksums for files on the Object Storage
return true unless recorded_file.local?
message[:checksum] == Upload.hexdigest(recorded_file.absolute_path)
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
# This class is responsible for:
# * Requesting an Upload file from the primary
# * Saving it in the right place on successful download
# * Returning a detailed Result object
class FileTransfer < BaseTransfer
def initialize(file_type, upload)
super(
file_type,
upload.id,
upload.absolute_path,
upload.checksum,
build_request_data(file_type, upload)
)
rescue ObjectStorage::RemoteStoreError
::Gitlab::Geo::Logger.warn "Cannot transfer a remote object."
end
private
def build_request_data(file_type, upload)
{
id: upload.model_id,
type: upload.model_type,
checksum: upload.checksum,
file_type: file_type,
file_id: upload.id
}
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
# This class is responsible for:
# * Finding a ::Ci::JobArtifact record
# * Requesting and downloading the JobArtifact's file from the primary
# * Returning a detailed Result
#
class JobArtifactDownloader < BaseDownloader
def execute
job_artifact = find_resource
return fail_before_transfer unless job_artifact.present?
transfer = ::Gitlab::Geo::Replication::JobArtifactTransfer.new(job_artifact)
Result.from_transfer_result(transfer.download_from_primary)
end
private
def find_resource
::Ci::JobArtifact.find_by_id(object_db_id)
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
# This class is responsible for:
# * Finding an ::Ci::JobArtifact record
# * Returning the necessary response data to send the file back
#
class JobArtifactRetriever < BaseRetriever
def execute
unless job_artifact.present?
return error('Job artifact not found')
end
unless job_artifact.file.present? && job_artifact.file.exists?
log_error("Could not upload job artifact because it does not have a file", id: job_artifact.id)
return file_not_found(job_artifact)
end
success(job_artifact.file)
end
private
def job_artifact
strong_memoize(:job_artifact) do
::Ci::JobArtifact.find_by_id(object_db_id)
end
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
# This class is responsible for:
# * Requesting an ::Ci::JobArtifact file from the primary
# * Saving it in the right place on successful download
# * Returning a detailed Result object
class JobArtifactTransfer < BaseTransfer
def initialize(job_artifact)
super(
:job_artifact,
job_artifact.id,
job_artifact.file.path,
job_artifact.file_sha256,
job_artifact_request_data(job_artifact)
)
end
private
def job_artifact_request_data(job_artifact)
{
id: job_artifact.id,
file_type: :job_artifact,
file_id: job_artifact.id
}
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
# This class is responsible for:
# * Finding a LfsObject record
# * Requesting and downloading the LfsObject's file from the primary
# * Returning a detailed Result
#
class LfsDownloader < BaseDownloader
def execute
lfs_object = find_resource
return fail_before_transfer unless lfs_object.present?
transfer = ::Gitlab::Geo::Replication::LfsTransfer.new(lfs_object)
Result.from_transfer_result(transfer.download_from_primary)
end
private
def find_resource
LfsObject.find_by_id(object_db_id)
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
# This class is responsible for:
# * Finding an LfsObject record
# * Returning the necessary response data to send the file back
#
class LfsRetriever < BaseRetriever
def execute
return error('LFS object not found') unless lfs_object
return error('LFS object not found') unless matches_checksum?
unless lfs_object.file.present? && lfs_object.file.exists?
log_error("Could not upload LFS object because it does not have a file", id: lfs_object.id)
return file_not_found(lfs_object)
end
success(lfs_object.file)
end
private
def lfs_object
strong_memoize(:lfs_object) do
LfsObject.find_by_id(object_db_id)
end
end
def matches_checksum?
message[:checksum] == lfs_object.oid
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
# This class is responsible for:
# * Requesting an LfsObject file from the primary
# * Saving it in the right place on successful download
# * Returning a detailed Result object
class LfsTransfer < BaseTransfer
def initialize(lfs_object)
super(
:lfs,
lfs_object.id,
lfs_object.file.path,
lfs_object.oid,
lfs_request_data(lfs_object)
)
end
private
def lfs_request_data(lfs_object)
{
checksum: lfs_object.oid,
file_type: :lfs,
file_id: lfs_object.id
}
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
class Transfer
include LogHelpers
attr_reader :file_type, :file_id, :filename, :expected_checksum, :request_data
TEMP_PREFIX = 'tmp_'.freeze
def initialize(file_type, file_id, filename, expected_checksum, request_data)
@file_type = file_type
@file_id = file_id
@filename = filename
@expected_checksum = expected_checksum
@request_data = request_data
end
# Returns Result object with success boolean and number of bytes downloaded.
def download_from_primary
return failure unless Gitlab::Geo.secondary?
return failure if File.directory?(filename)
primary = Gitlab::Geo.primary_node
return failure unless primary
url = primary.geo_transfers_url(file_type, file_id.to_s)
req_headers = TransferRequest.new(request_data).headers
return failure unless ensure_path_exists
download_file(url, req_headers)
end
class Result
attr_reader :success, :bytes_downloaded, :primary_missing_file
def initialize(success:, bytes_downloaded:, primary_missing_file: false)
@success = success
@bytes_downloaded = bytes_downloaded
@primary_missing_file = primary_missing_file
end
end
private
def failure(bytes_downloaded: 0, primary_missing_file: false)
Result.new(success: false, bytes_downloaded: bytes_downloaded, primary_missing_file: primary_missing_file)
end
def ensure_path_exists
path = Pathname.new(filename)
dir = path.dirname
return true if File.directory?(dir)
begin
FileUtils.mkdir_p(dir)
rescue => e
log_error("unable to create directory #{dir}: #{e}")
return false
end
true
end
# Use Gitlab::HTTP for now but switch to curb if performance becomes
# an issue
def download_file(url, req_headers)
file_size = -1
temp_file = open_temp_file(filename)
return failure unless temp_file
begin
response = Gitlab::HTTP.get(url, allow_local_requests: true, headers: req_headers, stream_body: true) do |fragment|
temp_file.write(fragment)
end
temp_file.flush
unless response.success?
log_error("Unsuccessful download", filename: filename, response_code: response.code, response_msg: response.try(:msg), url: url)
return failure(primary_missing_file: primary_missing_file?(response, temp_file))
end
if File.directory?(filename)
log_error("Destination file is a directory", filename: filename)
return failure
end
file_size = File.stat(temp_file.path).size
if checksum_mismatch?(temp_file.path)
log_error("Downloaded file checksum mismatch", expected_checksum: expected_checksum, actual_checksum: @actual_checksum, file_size_bytes: file_size)
return failure(bytes_downloaded: file_size)
end
FileUtils.mv(temp_file.path, filename)
log_info("Successful downloaded", filename: filename, file_size_bytes: file_size)
rescue StandardError, Gitlab::HTTP::Error => e
log_error("Error downloading file", error: e, filename: filename, url: url)
ensure
temp_file.close
temp_file.unlink
end
Result.new(success: file_size > -1, bytes_downloaded: [file_size, 0].max)
end
def primary_missing_file?(response, temp_file)
body = File.read(temp_file.path) if File.exist?(temp_file.path)
if response.code == 404 && body.present?
begin
json_response = JSON.parse(body)
return json_response['geo_code'] == Gitlab::Geo::FileUploader::FILE_NOT_FOUND_GEO_CODE
rescue JSON::ParserError
end
end
false
end
def default_permissions
0666 - File.umask
end
def open_temp_file(target_filename)
# Make sure the file is in the same directory to prevent moves across filesystems
pathname = Pathname.new(target_filename)
temp = Tempfile.new(TEMP_PREFIX, pathname.dirname.to_s)
temp.chmod(default_permissions)
temp.binmode
temp
rescue StandardError => e
log_error("Error creating temporary file", error: e)
nil
end
def checksum_mismatch?(file_path)
# Skip checksum check if primary didn't generate one because, for
# example, large attachments are checksummed asynchronously, and most
# types of artifacts are not checksummed at all at the moment.
return false if expected_checksum.blank?
return false unless Feature.enabled?(:geo_file_transfer_validation, default_enabled: true)
expected_checksum != actual_checksum(file_path)
end
def actual_checksum(file_path)
@actual_checksum = Digest::SHA256.file(file_path).hexdigest
end
end
end
end
...@@ -2,11 +2,10 @@ ...@@ -2,11 +2,10 @@
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::FileDownloader, :geo do describe Gitlab::Geo::Replication::FileDownloader, :geo do
include EE::GeoHelpers include EE::GeoHelpers
set(:primary_node) { create(:geo_node, :primary) } set(:primary_node) { create(:geo_node, :primary) }
set(:secondary_node) { create(:geo_node) }
subject { downloader.execute } subject { downloader.execute }
...@@ -25,6 +24,9 @@ describe Gitlab::Geo::FileDownloader, :geo do ...@@ -25,6 +24,9 @@ describe Gitlab::Geo::FileDownloader, :geo do
end end
context 'when in a secondary geo node' do context 'when in a secondary geo node' do
context 'with local storage only' do
let(:secondary_node) { create(:geo_node, :local_storage_only) }
before do before do
stub_current_geo_node(secondary_node) stub_current_geo_node(secondary_node)
...@@ -36,6 +38,7 @@ describe Gitlab::Geo::FileDownloader, :geo do ...@@ -36,6 +38,7 @@ describe Gitlab::Geo::FileDownloader, :geo do
expect(subject.primary_missing_file).to be_falsey expect(subject.primary_missing_file).to be_falsey
end end
end end
end
def stub_geo_file_transfer(file_type, upload) def stub_geo_file_transfer(file_type, upload)
url = primary_node.geo_transfers_url(file_type, upload.id.to_s) url = primary_node.geo_transfers_url(file_type, upload.id.to_s)
...@@ -43,7 +46,7 @@ describe Gitlab::Geo::FileDownloader, :geo do ...@@ -43,7 +46,7 @@ describe Gitlab::Geo::FileDownloader, :geo do
stub_request(:get, url).to_return(status: 200, body: upload.build_uploader.file.read, headers: {}) stub_request(:get, url).to_return(status: 200, body: upload.build_uploader.file.read, headers: {})
end end
def stub_geo_file_transfer_object_storage def stub_geo_file_transfer_object_storage(file_type, upload)
url = primary_node.geo_transfers_url(file_type, upload.id.to_s) url = primary_node.geo_transfers_url(file_type, upload.id.to_s)
stub_request(:get, url).to_return(status: 307, body: upload.build_uploader.url, headers: {}) stub_request(:get, url).to_return(status: 307, body: upload.build_uploader.url, headers: {})
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::FileUploader, :geo do describe Gitlab::Geo::Replication::FileRetriever, :geo do
shared_examples_for 'returns necessary params for sending a file from an API endpoint' do shared_examples_for 'returns necessary params for sending a file from an API endpoint' do
subject { @subject ||= uploader.execute } subject { @subject ||= retriever.execute }
context 'when the upload exists' do context 'when the upload exists' do
let(:uploader) { described_class.new(upload.id, message) } let(:retriever) { described_class.new(upload.id, message) }
before do before do
expect(Upload).to receive(:find_by).with(id: upload.id).and_return(upload) expect(Upload).to receive(:find_by).with(id: upload.id).and_return(upload)
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::FileTransfer do describe Gitlab::Geo::Replication::FileTransfer do
include ::EE::GeoHelpers include ::EE::GeoHelpers
set(:primary_node) { create(:geo_node, :primary) } set(:primary_node) { create(:geo_node, :primary) }
...@@ -65,7 +65,8 @@ describe Gitlab::Geo::FileTransfer do ...@@ -65,7 +65,8 @@ describe Gitlab::Geo::FileTransfer do
it 'returns a failed result indicating primary_missing_file' do it 'returns a failed result indicating primary_missing_file' do
expect(FileUtils).not_to receive(:mv).with(anything, upload.absolute_path).and_call_original expect(FileUtils).not_to receive(:mv).with(anything, upload.absolute_path).and_call_original
response = double(:response, success?: false, code: 404, msg: "No such file") response = double(:response, success?: false, code: 404, msg: "No such file")
expect(File).to receive(:read).and_return("{\"geo_code\":\"#{Gitlab::Geo::FileUploader::FILE_NOT_FOUND_GEO_CODE}\"}")
expect(File).to receive(:read).and_return("{\"geo_code\":\"#{Gitlab::Geo::Replication::FILE_NOT_FOUND_GEO_CODE}\"}")
expect(Gitlab::HTTP).to receive(:get).and_return(response) expect(Gitlab::HTTP).to receive(:get).and_return(response)
result = subject.download_from_primary result = subject.download_from_primary
...@@ -78,6 +79,7 @@ describe Gitlab::Geo::FileTransfer do ...@@ -78,6 +79,7 @@ describe Gitlab::Geo::FileTransfer do
it 'returns a failed result' do it 'returns a failed result' do
expect(FileUtils).not_to receive(:mv).with(anything, upload.absolute_path).and_call_original expect(FileUtils).not_to receive(:mv).with(anything, upload.absolute_path).and_call_original
response = double(:response, success?: false, code: 404, msg: 'No such file') response = double(:response, success?: false, code: 404, msg: 'No such file')
expect(Gitlab::HTTP).to receive(:get).and_return(response) expect(Gitlab::HTTP).to receive(:get).and_return(response)
result = subject.download_from_primary result = subject.download_from_primary
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::JobArtifactDownloader, :geo do describe Gitlab::Geo::Replication::JobArtifactDownloader, :geo do
let(:job_artifact) { create(:ci_job_artifact) } let(:job_artifact) { create(:ci_job_artifact) }
context '#execute' do context '#execute' do
context 'with job artifact' do context 'with job artifact' do
it 'returns a FileDownloader::Result object' do it 'returns a FileDownloader::Result object' do
downloader = described_class.new(:job_artifact, job_artifact.id) downloader = described_class.new(:job_artifact, job_artifact.id)
result = Gitlab::Geo::Transfer::Result.new(success: true, bytes_downloaded: 1) result = Gitlab::Geo::Replication::BaseTransfer::Result.new(success: true, bytes_downloaded: 1)
allow_any_instance_of(Gitlab::Geo::JobArtifactTransfer) allow_any_instance_of(Gitlab::Geo::Replication::JobArtifactTransfer)
.to receive(:download_from_primary).and_return(result) .to receive(:download_from_primary).and_return(result)
expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result) expect(downloader.execute).to be_a(Gitlab::Geo::Replication::FileDownloader::Result)
end end
end end
...@@ -20,7 +20,7 @@ describe Gitlab::Geo::JobArtifactDownloader, :geo do ...@@ -20,7 +20,7 @@ describe Gitlab::Geo::JobArtifactDownloader, :geo do
let(:downloader) { described_class.new(:job_artifact, 10000) } let(:downloader) { described_class.new(:job_artifact, 10000) }
it 'returns a FileDownloader::Result object' do it 'returns a FileDownloader::Result object' do
expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result) expect(downloader.execute).to be_a(Gitlab::Geo::Replication::FileDownloader::Result)
end end
it 'returns a result indicating a failure before a transfer was attempted' do it 'returns a result indicating a failure before a transfer was attempted' do
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::JobArtifactUploader, :geo do describe Gitlab::Geo::Replication::JobArtifactRetriever, :geo do
context '#execute' do context '#execute' do
let(:uploader) { described_class.new(job_artifact.id, {}) } let(:retriever) { described_class.new(job_artifact.id, {}) }
subject { uploader.execute } subject { retriever.execute }
context 'when the job artifact exists' do context 'when the job artifact exists' do
before do before do
...@@ -34,7 +34,7 @@ describe Gitlab::Geo::JobArtifactUploader, :geo do ...@@ -34,7 +34,7 @@ describe Gitlab::Geo::JobArtifactUploader, :geo do
end end
it 'logs the missing file' do it 'logs the missing file' do
expect(uploader).to receive(:log_error).with("Could not upload job artifact because it does not have a file", id: job_artifact.id) expect(retriever).to receive(:log_error).with("Could not upload job artifact because it does not have a file", id: job_artifact.id)
subject subject
end end
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::JobArtifactTransfer, :geo do describe Gitlab::Geo::Replication::JobArtifactTransfer, :geo do
include ::EE::GeoHelpers include ::EE::GeoHelpers
set(:primary_node) { create(:geo_node, :primary) } set(:primary_node) { create(:geo_node, :primary) }
...@@ -71,7 +71,7 @@ describe Gitlab::Geo::JobArtifactTransfer, :geo do ...@@ -71,7 +71,7 @@ describe Gitlab::Geo::JobArtifactTransfer, :geo do
it 'returns a failed result indicating primary_missing_file' do it 'returns a failed result indicating primary_missing_file' do
expect(FileUtils).not_to receive(:mv).with(anything, job_artifact.file.path).and_call_original expect(FileUtils).not_to receive(:mv).with(anything, job_artifact.file.path).and_call_original
response = double(:response, success?: false, code: 404, msg: "No such file") response = double(:response, success?: false, code: 404, msg: "No such file")
expect(File).to receive(:read).and_return("{\"geo_code\":\"#{Gitlab::Geo::FileUploader::FILE_NOT_FOUND_GEO_CODE}\"}") expect(File).to receive(:read).and_return("{\"geo_code\":\"#{Gitlab::Geo::Replication::FILE_NOT_FOUND_GEO_CODE}\"}")
expect(Gitlab::HTTP).to receive(:get).and_return(response) expect(Gitlab::HTTP).to receive(:get).and_return(response)
result = subject.download_from_primary result = subject.download_from_primary
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::LfsDownloader, :geo do describe Gitlab::Geo::Replication::LfsDownloader, :geo do
let(:lfs_object) { create(:lfs_object) } let(:lfs_object) { create(:lfs_object) }
context '#execute' do context '#execute' do
context 'with LFS object' do context 'with LFS object' do
it 'returns a FileDownloader::Result object' do it 'returns a FileDownloader::Result object' do
downloader = described_class.new(:lfs, lfs_object.id) downloader = described_class.new(:lfs, lfs_object.id)
result = Gitlab::Geo::Transfer::Result.new(success: true, bytes_downloaded: 1) result = Gitlab::Geo::Replication::BaseTransfer::Result.new(success: true, bytes_downloaded: 1)
allow_any_instance_of(Gitlab::Geo::LfsTransfer) allow_any_instance_of(Gitlab::Geo::Replication::LfsTransfer)
.to receive(:download_from_primary).and_return(result) .to receive(:download_from_primary).and_return(result)
expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result) expect(downloader.execute).to be_a(Gitlab::Geo::Replication::FileDownloader::Result)
end end
end end
...@@ -20,7 +20,7 @@ describe Gitlab::Geo::LfsDownloader, :geo do ...@@ -20,7 +20,7 @@ describe Gitlab::Geo::LfsDownloader, :geo do
let(:downloader) { described_class.new(:lfs, 10000) } let(:downloader) { described_class.new(:lfs, 10000) }
it 'returns a FileDownloader::Result object' do it 'returns a FileDownloader::Result object' do
expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result) expect(downloader.execute).to be_a(Gitlab::Geo::Replication::FileDownloader::Result)
end end
it 'returns a result indicating a failure before a transfer was attempted' do it 'returns a result indicating a failure before a transfer was attempted' do
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::LfsUploader, :geo do describe Gitlab::Geo::Replication::LfsRetriever, :geo do
context '#execute' do context '#execute' do
subject { uploader.execute } subject { retriever.execute }
context 'when the LFS object exists' do context 'when the LFS object exists' do
let(:uploader) { described_class.new(lfs_object.id, message) } let(:retriever) { described_class.new(lfs_object.id, message) }
before do before do
expect(LfsObject).to receive(:find_by).with(id: lfs_object.id).and_return(lfs_object) expect(LfsObject).to receive(:find_by).with(id: lfs_object.id).and_return(lfs_object)
...@@ -39,7 +39,7 @@ describe Gitlab::Geo::LfsUploader, :geo do ...@@ -39,7 +39,7 @@ describe Gitlab::Geo::LfsUploader, :geo do
end end
it 'logs the missing file' do it 'logs the missing file' do
expect(uploader).to receive(:log_error).with("Could not upload LFS object because it does not have a file", id: lfs_object.id) expect(retriever).to receive(:log_error).with("Could not upload LFS object because it does not have a file", id: lfs_object.id)
subject subject
end end
...@@ -47,7 +47,7 @@ describe Gitlab::Geo::LfsUploader, :geo do ...@@ -47,7 +47,7 @@ describe Gitlab::Geo::LfsUploader, :geo do
end end
context 'when the LFS object does not exist' do context 'when the LFS object does not exist' do
let(:uploader) { described_class.new(10000, {}) } let(:retriever) { described_class.new(10000, {}) }
it 'returns an error hash' do it 'returns an error hash' do
expect(subject).to eq(code: :not_found, message: 'LFS object not found') expect(subject).to eq(code: :not_found, message: 'LFS object not found')
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::LfsTransfer do describe Gitlab::Geo::Replication::LfsTransfer do
include ::EE::GeoHelpers include ::EE::GeoHelpers
set(:primary_node) { create(:geo_node, :primary) } set(:primary_node) { create(:geo_node, :primary) }
...@@ -49,7 +49,7 @@ describe Gitlab::Geo::LfsTransfer do ...@@ -49,7 +49,7 @@ describe Gitlab::Geo::LfsTransfer do
it 'returns a failed result indicating primary_missing_file' do it 'returns a failed result indicating primary_missing_file' do
expect(FileUtils).not_to receive(:mv).with(anything, lfs_object.file.path).and_call_original expect(FileUtils).not_to receive(:mv).with(anything, lfs_object.file.path).and_call_original
response = double(:response, success?: false, code: 404, msg: "No such file") response = double(:response, success?: false, code: 404, msg: "No such file")
expect(File).to receive(:read).and_return("{\"geo_code\":\"#{Gitlab::Geo::FileUploader::FILE_NOT_FOUND_GEO_CODE}\"}") expect(File).to receive(:read).and_return("{\"geo_code\":\"#{Gitlab::Geo::Replication::FILE_NOT_FOUND_GEO_CODE}\"}")
expect(Gitlab::HTTP).to receive(:get).and_return(response) expect(Gitlab::HTTP).to receive(:get).and_return(response)
result = subject.download_from_primary result = subject.download_from_primary
......
...@@ -41,7 +41,7 @@ describe API::Geo do ...@@ -41,7 +41,7 @@ describe API::Geo do
describe 'allowed IPs' do describe 'allowed IPs' do
let(:note) { create(:note, :with_attachment) } let(:note) { create(:note, :with_attachment) }
let(:upload) { Upload.find_by(model: note, uploader: 'AttachmentUploader') } let(:upload) { Upload.find_by(model: note, uploader: 'AttachmentUploader') }
let(:transfer) { Gitlab::Geo::FileTransfer.new(:attachment, upload) } let(:transfer) { Gitlab::Geo::Replication::FileTransfer.new(:attachment, upload) }
let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers } let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers }
it 'responds with 401 when IP is not allowed' do it 'responds with 401 when IP is not allowed' do
...@@ -64,7 +64,7 @@ describe API::Geo do ...@@ -64,7 +64,7 @@ describe API::Geo do
describe 'GET /geo/transfers/attachment/1' do describe 'GET /geo/transfers/attachment/1' do
let(:note) { create(:note, :with_attachment) } let(:note) { create(:note, :with_attachment) }
let(:upload) { Upload.find_by(model: note, uploader: 'AttachmentUploader') } let(:upload) { Upload.find_by(model: note, uploader: 'AttachmentUploader') }
let(:transfer) { Gitlab::Geo::FileTransfer.new(:attachment, upload) } let(:transfer) { Gitlab::Geo::Replication::FileTransfer.new(:attachment, upload) }
let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers } let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers }
before do before do
...@@ -103,7 +103,7 @@ describe API::Geo do ...@@ -103,7 +103,7 @@ describe API::Geo do
describe 'GET /geo/transfers/avatar/1' do describe 'GET /geo/transfers/avatar/1' do
let(:user) { create(:user, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png')) } let(:user) { create(:user, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: user, uploader: 'AvatarUploader') } let(:upload) { Upload.find_by(model: user, uploader: 'AvatarUploader') }
let(:transfer) { Gitlab::Geo::FileTransfer.new(:avatar, upload) } let(:transfer) { Gitlab::Geo::Replication::FileTransfer.new(:avatar, upload) }
let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers } let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers }
before do before do
...@@ -142,7 +142,7 @@ describe API::Geo do ...@@ -142,7 +142,7 @@ describe API::Geo do
describe 'GET /geo/transfers/file/1' do describe 'GET /geo/transfers/file/1' do
let(:project) { create(:project) } let(:project) { create(:project) }
let(:upload) { Upload.find_by(model: project, uploader: 'FileUploader') } let(:upload) { Upload.find_by(model: project, uploader: 'FileUploader') }
let(:transfer) { Gitlab::Geo::FileTransfer.new(:file, upload) } let(:transfer) { Gitlab::Geo::Replication::FileTransfer.new(:file, upload) }
let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers } let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers }
before do before do
...@@ -178,7 +178,7 @@ describe API::Geo do ...@@ -178,7 +178,7 @@ describe API::Geo do
get api("/geo/transfers/file/#{upload.id}"), headers: req_header get api("/geo/transfers/file/#{upload.id}"), headers: req_header
expect(response).to have_gitlab_http_status(404) expect(response).to have_gitlab_http_status(404)
expect(json_response['geo_code']).to eq(Gitlab::Geo::FileUploader::FILE_NOT_FOUND_GEO_CODE) expect(json_response['geo_code']).to eq(Gitlab::Geo::Replication::FILE_NOT_FOUND_GEO_CODE)
end end
end end
end end
...@@ -194,7 +194,7 @@ describe API::Geo do ...@@ -194,7 +194,7 @@ describe API::Geo do
describe 'GET /geo/transfers/lfs/1' do describe 'GET /geo/transfers/lfs/1' do
let(:lfs_object) { create(:lfs_object, :with_file) } let(:lfs_object) { create(:lfs_object, :with_file) }
let(:transfer) { Gitlab::Geo::LfsTransfer.new(lfs_object) } let(:transfer) { Gitlab::Geo::Replication::LfsTransfer.new(lfs_object) }
let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers } let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers }
before do before do
...@@ -229,7 +229,7 @@ describe API::Geo do ...@@ -229,7 +229,7 @@ describe API::Geo do
get api("/geo/transfers/lfs/#{lfs_object.id}"), headers: req_header get api("/geo/transfers/lfs/#{lfs_object.id}"), headers: req_header
expect(response).to have_gitlab_http_status(404) expect(response).to have_gitlab_http_status(404)
expect(json_response['geo_code']).to eq(Gitlab::Geo::FileUploader::FILE_NOT_FOUND_GEO_CODE) expect(json_response['geo_code']).to eq(Gitlab::Geo::Replication::FILE_NOT_FOUND_GEO_CODE)
end end
end end
end end
......
# frozen_string_literal: true
require 'spec_helper'
describe Geo::BaseFileService do
subject { described_class.new('file', 8) }
describe '#execute' do
it 'requires a subclass overrides it' do
expect { subject.execute }.to raise_error(NotImplementedError)
end
end
end
...@@ -11,6 +11,28 @@ describe Geo::FileDownloadService do ...@@ -11,6 +11,28 @@ describe Geo::FileDownloadService do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
end end
describe '#downloader' do
Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES.each do |object_type|
it "returns a FileDownloader given object_type is #{object_type}" do
subject = described_class.new(object_type, 1)
expect(subject.downloader).to be_a(Gitlab::Geo::Replication::FileDownloader)
end
end
it "returns a LfsDownloader given object_type is lfs" do
subject = described_class.new('lfs', 1)
expect(subject.downloader).to be_a(Gitlab::Geo::Replication::LfsDownloader)
end
it "returns a JobArtifactDownloader given object_type is job_artifact" do
subject = described_class.new('job_artifact', 1)
expect(subject.downloader).to be_a(Gitlab::Geo::Replication::JobArtifactDownloader)
end
end
context 'retry time' do context 'retry time' do
before do before do
stub_transfer_result(bytes_downloaded: 0, success: false) stub_transfer_result(bytes_downloaded: 0, success: false)
...@@ -413,7 +435,7 @@ describe Geo::FileDownloadService do ...@@ -413,7 +435,7 @@ describe Geo::FileDownloadService do
context 'bad object type' do context 'bad object type' do
it 'raises an error' do it 'raises an error' do
expect { described_class.new(:bad, 1).execute }.to raise_error(NameError) expect { described_class.new(:bad, 1).execute }.to raise_error(NotImplementedError)
end end
end end
end end
...@@ -423,7 +445,7 @@ describe Geo::FileDownloadService do ...@@ -423,7 +445,7 @@ describe Geo::FileDownloadService do
bytes_downloaded: bytes_downloaded, bytes_downloaded: bytes_downloaded,
success: success, success: success,
primary_missing_file: primary_missing_file) primary_missing_file: primary_missing_file)
instance = double("(instance of Gitlab::Geo::Transfer)", download_from_primary: result) instance = double("(instance of Gitlab::Geo::Replication::Transfer)", download_from_primary: result)
allow(Gitlab::Geo::Transfer).to receive(:new).and_return(instance) allow(Gitlab::Geo::Replication::BaseTransfer).to receive(:new).and_return(instance)
end end
end end
...@@ -12,6 +12,28 @@ describe Geo::FileUploadService do ...@@ -12,6 +12,28 @@ describe Geo::FileUploadService do
stub_current_geo_node(node) stub_current_geo_node(node)
end end
describe '#retriever' do
Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES.each do |file_type|
it "returns a FileRetriever given type is #{file_type}" do
subject = described_class.new({ type: file_type, id: 1 }, 'request-data')
expect(subject.retriever).to be_a(Gitlab::Geo::Replication::FileRetriever)
end
end
it "returns a LfsRetriever given object_type is lfs" do
subject = described_class.new({ type: 'lfs', id: 1 }, 'request-data')
expect(subject.retriever).to be_a(Gitlab::Geo::Replication::LfsRetriever)
end
it "returns a JobArtifactRetriever given object_type is job_artifact" do
subject = described_class.new({ type: 'job_artifact', id: 1 }, 'request-data')
expect(subject.retriever).to be_a(Gitlab::Geo::Replication::JobArtifactRetriever)
end
end
shared_examples 'no authorization header' do shared_examples 'no authorization header' do
it 'returns nil' do it 'returns nil' do
service = described_class.new(params, nil) service = described_class.new(params, nil)
...@@ -37,7 +59,7 @@ describe Geo::FileUploadService do ...@@ -37,7 +59,7 @@ describe Geo::FileUploadService do
let(:user) { create(:user, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png')) } let(:user) { create(:user, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: user, uploader: 'AvatarUploader') } let(:upload) { Upload.find_by(model: user, uploader: 'AvatarUploader') }
let(:params) { { id: upload.id, type: 'avatar' } } let(:params) { { id: upload.id, type: 'avatar' } }
let(:request_data) { Gitlab::Geo::FileTransfer.new(:avatar, upload).request_data } let(:request_data) { Gitlab::Geo::Replication::FileTransfer.new(:avatar, upload).request_data }
it 'sends avatar file' do it 'sends avatar file' do
service = described_class.new(params, req_header) service = described_class.new(params, req_header)
...@@ -56,7 +78,7 @@ describe Geo::FileUploadService do ...@@ -56,7 +78,7 @@ describe Geo::FileUploadService do
let(:group) { create(:group, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png')) } let(:group) { create(:group, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: group, uploader: 'AvatarUploader') } let(:upload) { Upload.find_by(model: group, uploader: 'AvatarUploader') }
let(:params) { { id: upload.id, type: 'avatar' } } let(:params) { { id: upload.id, type: 'avatar' } }
let(:request_data) { Gitlab::Geo::FileTransfer.new(:avatar, upload).request_data } let(:request_data) { Gitlab::Geo::Replication::FileTransfer.new(:avatar, upload).request_data }
it 'sends avatar file' do it 'sends avatar file' do
service = described_class.new(params, req_header) service = described_class.new(params, req_header)
...@@ -75,7 +97,7 @@ describe Geo::FileUploadService do ...@@ -75,7 +97,7 @@ describe Geo::FileUploadService do
let(:project) { create(:project, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png')) } let(:project) { create(:project, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: project, uploader: 'AvatarUploader') } let(:upload) { Upload.find_by(model: project, uploader: 'AvatarUploader') }
let(:params) { { id: upload.id, type: 'avatar' } } let(:params) { { id: upload.id, type: 'avatar' } }
let(:request_data) { Gitlab::Geo::FileTransfer.new(:avatar, upload).request_data } let(:request_data) { Gitlab::Geo::Replication::FileTransfer.new(:avatar, upload).request_data }
it 'sends avatar file' do it 'sends avatar file' do
service = described_class.new(params, req_header) service = described_class.new(params, req_header)
...@@ -94,7 +116,7 @@ describe Geo::FileUploadService do ...@@ -94,7 +116,7 @@ describe Geo::FileUploadService do
let(:note) { create(:note, :with_attachment) } let(:note) { create(:note, :with_attachment) }
let(:upload) { Upload.find_by(model: note, uploader: 'AttachmentUploader') } let(:upload) { Upload.find_by(model: note, uploader: 'AttachmentUploader') }
let(:params) { { id: upload.id, type: 'attachment' } } let(:params) { { id: upload.id, type: 'attachment' } }
let(:request_data) { Gitlab::Geo::FileTransfer.new(:attachment, upload).request_data } let(:request_data) { Gitlab::Geo::Replication::FileTransfer.new(:attachment, upload).request_data }
it 'sends attachment file' do it 'sends attachment file' do
service = described_class.new(params, req_header) service = described_class.new(params, req_header)
...@@ -113,7 +135,7 @@ describe Geo::FileUploadService do ...@@ -113,7 +135,7 @@ describe Geo::FileUploadService do
let(:project) { create(:project) } let(:project) { create(:project) }
let(:upload) { Upload.find_by(model: project, uploader: 'FileUploader') } let(:upload) { Upload.find_by(model: project, uploader: 'FileUploader') }
let(:params) { { id: upload.id, type: 'file' } } let(:params) { { id: upload.id, type: 'file' } }
let(:request_data) { Gitlab::Geo::FileTransfer.new(:file, upload).request_data } let(:request_data) { Gitlab::Geo::Replication::FileTransfer.new(:file, upload).request_data }
let(:file) { fixture_file_upload('spec/fixtures/dk.png', 'image/png') } let(:file) { fixture_file_upload('spec/fixtures/dk.png', 'image/png') }
before do before do
...@@ -137,7 +159,7 @@ describe Geo::FileUploadService do ...@@ -137,7 +159,7 @@ describe Geo::FileUploadService do
let(:group) { create(:group) } let(:group) { create(:group) }
let(:upload) { Upload.find_by(model: group, uploader: 'NamespaceFileUploader') } let(:upload) { Upload.find_by(model: group, uploader: 'NamespaceFileUploader') }
let(:params) { { id: upload.id, type: 'file' } } let(:params) { { id: upload.id, type: 'file' } }
let(:request_data) { Gitlab::Geo::FileTransfer.new(:file, upload).request_data } let(:request_data) { Gitlab::Geo::Replication::FileTransfer.new(:file, upload).request_data }
let(:file) { fixture_file_upload('spec/fixtures/dk.png', 'image/png') } let(:file) { fixture_file_upload('spec/fixtures/dk.png', 'image/png') }
before do before do
...@@ -160,7 +182,7 @@ describe Geo::FileUploadService do ...@@ -160,7 +182,7 @@ describe Geo::FileUploadService do
context 'LFS Object' do context 'LFS Object' do
let(:lfs_object) { create(:lfs_object, :with_file) } let(:lfs_object) { create(:lfs_object, :with_file) }
let(:params) { { id: lfs_object.id, type: 'lfs' } } let(:params) { { id: lfs_object.id, type: 'lfs' } }
let(:request_data) { Gitlab::Geo::LfsTransfer.new(lfs_object).request_data } let(:request_data) { Gitlab::Geo::Replication::LfsTransfer.new(lfs_object).request_data }
it 'sends LFS file' do it 'sends LFS file' do
service = described_class.new(params, req_header) service = described_class.new(params, req_header)
...@@ -178,7 +200,7 @@ describe Geo::FileUploadService do ...@@ -178,7 +200,7 @@ describe Geo::FileUploadService do
context 'job artifact' do context 'job artifact' do
let(:job_artifact) { create(:ci_job_artifact, :with_file) } let(:job_artifact) { create(:ci_job_artifact, :with_file) }
let(:params) { { id: job_artifact.id, type: 'job_artifact' } } let(:params) { { id: job_artifact.id, type: 'job_artifact' } }
let(:request_data) { Gitlab::Geo::JobArtifactTransfer.new(job_artifact).request_data } let(:request_data) { Gitlab::Geo::Replication::JobArtifactTransfer.new(job_artifact).request_data }
it 'sends job artifact file' do it 'sends job artifact file' do
service = described_class.new(params, req_header) service = described_class.new(params, req_header)
...@@ -197,7 +219,7 @@ describe Geo::FileUploadService do ...@@ -197,7 +219,7 @@ describe Geo::FileUploadService do
let(:project) { create(:project) } let(:project) { create(:project) }
let(:upload) { Upload.find_by(model: project, uploader: 'ImportExportUploader') } let(:upload) { Upload.find_by(model: project, uploader: 'ImportExportUploader') }
let(:params) { { id: upload.id, type: 'import_export' } } let(:params) { { id: upload.id, type: 'import_export' } }
let(:request_data) { Gitlab::Geo::FileTransfer.new(:import_export, upload).request_data } let(:request_data) { Gitlab::Geo::Replication::FileTransfer.new(:import_export, upload).request_data }
let(:file) { fixture_file_upload('spec/fixtures/project_export.tar.gz') } let(:file) { fixture_file_upload('spec/fixtures/project_export.tar.gz') }
before do before do
......
...@@ -312,7 +312,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do ...@@ -312,7 +312,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5) stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
secondary.update!(files_max_capacity: 2) secondary.update!(files_max_capacity: 2)
result_object = double(:result, success: true, bytes_downloaded: 100, primary_missing_file: false) result_object = double(:result, success: true, bytes_downloaded: 100, primary_missing_file: false)
allow_any_instance_of(::Gitlab::Geo::Transfer).to receive(:download_from_primary).and_return(result_object) allow_any_instance_of(::Gitlab::Geo::Replication::BaseTransfer).to receive(:download_from_primary).and_return(result_object)
avatar = fixture_file_upload('spec/fixtures/dk.png') avatar = fixture_file_upload('spec/fixtures/dk.png')
create_list(:lfs_object, 2, :with_file) create_list(:lfs_object, 2, :with_file)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment