Commit a7e227e1 authored by Robert Speicher's avatar Robert Speicher

Merge branch 'geo/support-file-replication' into 'master'

Geo - Add support for remaining file replication

See merge request !1562
parents 5e9d6f30 657170b2
......@@ -12,6 +12,7 @@ class GeoNodeStatus {
this.$repositoriesSynced = $('.js-repositories-synced', this.$status);
this.$repositoriesFailed = $('.js-repositories-failed', this.$status);
this.$lfsObjectsSynced = $('.js-lfs-objects-synced', this.$status);
this.$attachmentsSynced = $('.js-attachments-synced', this.$status);
this.$health = $('.js-health', this.$status);
this.endpoint = this.$el.data('status-url');
......@@ -31,6 +32,7 @@ class GeoNodeStatus {
this.$repositoriesSynced.html(`${status.repositories_synced_count}/${status.repositories_count} (${status.repositories_synced_in_percentage})`);
this.$repositoriesFailed.html(status.repositories_failed_count);
this.$lfsObjectsSynced.html(`${status.lfs_objects_synced_count}/${status.lfs_objects_count} (${status.lfs_objects_synced_in_percentage})`);
this.$attachmentsSynced.html(`${status.attachments_synced_count}/${status.attachments_count} (${status.attachments_synced_in_percentage})`);
this.$health.html(status.health);
this.$status.show();
......
......@@ -60,6 +60,31 @@ class GeoNodeStatus
sync_percentage(lfs_objects_count, lfs_objects_synced_count)
end
def attachments_count
@attachments_count ||= Upload.count
end
def attachments_count=(value)
@attachments_count = value.to_i
end
def attachments_synced_count
@attachments_synced_count ||= begin
upload_ids = Upload.pluck(:id)
synced_ids = Geo::FileRegistry.where(file_type: [:attachment, :avatar, :file]).pluck(:file_id)
(synced_ids & upload_ids).length
end
end
def attachments_synced_count=(value)
@attachments_synced_count = value.to_i
end
def attachments_synced_in_percentage
sync_percentage(attachments_count, attachments_synced_count)
end
private
def sync_percentage(total, synced)
......
......@@ -27,6 +27,12 @@ class Upload < ActiveRecord::Base
)
end
def self.hexdigest(absolute_path)
return unless File.exist?(absolute_path)
Digest::SHA256.file(absolute_path).hexdigest
end
def absolute_path
return path unless relative_path?
......@@ -36,7 +42,7 @@ class Upload < ActiveRecord::Base
def calculate_checksum
return unless exist?
self.checksum = Digest::SHA256.file(absolute_path).hexdigest
self.checksum = self.class.hexdigest(absolute_path)
end
def exist?
......
......@@ -8,6 +8,12 @@ class GeoNodeStatusEntity < Grape::Entity
node.healthy? ? 'No Health Problems Detected' : node.health
end
expose :attachments_count
expose :attachments_synced_count
expose :attachments_synced_in_percentage do |node|
number_to_percentage(node.attachments_synced_in_percentage, precision: 2)
end
expose :lfs_objects_count
expose :lfs_objects_synced_count
expose :lfs_objects_synced_in_percentage do |node|
......
module Geo
class FileDownloadService
attr_reader :object_type, :object_db_id
class FileDownloadService < FileService
LEASE_TIMEOUT = 8.hours.freeze
def initialize(object_type, object_db_id)
@object_type = object_type
@object_db_id = object_db_id
end
def execute
try_obtain_lease do |lease|
bytes_downloaded = downloader.execute
......@@ -20,13 +13,11 @@ module Geo
private
def downloader
begin
klass = Gitlab::Geo.const_get("#{object_type.capitalize}Downloader")
klass.new(object_db_id)
rescue NameError
log("unknown file type: #{object_type}")
raise
end
klass = "Gitlab::Geo::#{service_klass_name}Downloader".constantize
klass.new(object_type, object_db_id)
rescue NameError
log("Unknown file type: #{object_type}")
raise
end
def try_obtain_lease
......@@ -41,14 +32,12 @@ module Geo
end
end
def log(message)
Rails.logger.info "#{self.class.name}: #{message}"
end
def update_registry(bytes_downloaded)
transfer = Geo::FileRegistry.find_or_initialize_by(
file_type: object_type,
file_id: object_db_id)
file_id: object_db_id
)
transfer.bytes = bytes_downloaded
transfer.save
end
......
module Geo
class FileService
attr_reader :object_type, :object_db_id
DEFAULT_OBJECT_TYPES = %w[attachment avatar file].freeze
DEFAULT_SERVICE_TYPE = 'file'.freeze
def initialize(object_type, object_db_id)
@object_type = object_type.to_s
@object_db_id = object_db_id
end
def execute
raise NotImplementedError
end
private
def service_klass_name
klass_name =
if DEFAULT_OBJECT_TYPES.include?(object_type)
DEFAULT_SERVICE_TYPE
else
object_type
end
klass_name.camelize
end
def log(message)
Rails.logger.info "#{self.class.name}: #{message}"
end
end
end
module Geo
class FileUploadService
class FileUploadService < FileService
IAT_LEEWAY = 60.seconds.to_i
attr_reader :params, :auth_header
attr_reader :auth_header
def initialize(params, auth_header)
@params = params
super(params[:type], params[:id])
@auth_header = auth_header
end
def execute
# Returns { code: :ok, file: CarrierWave File object } upon success
data = ::Gitlab::Geo::JwtRequestDecoder.new(auth_header).decode
return unless data.present?
response =
case params[:type]
when 'lfs'
handle_lfs_geo_request(params[:id], data)
else
{}
end
response
uploader_klass.new(object_db_id, data).execute
end
def handle_lfs_geo_request(id, message)
status = { code: :not_found, message: 'LFS object not found' }
lfs_object = LfsObject.find(id)
return status unless lfs_object.present?
if message[:sha256] != lfs_object.oid
return status
end
unless lfs_object.file.present? && lfs_object.file.exists?
status[:message] = "LFS object does not have a file"
return status
end
private
status[:code] = :ok
status[:message] = "Success"
status[:file] = lfs_object.file
status
def uploader_klass
"Gitlab::Geo::#{service_klass_name}Uploader".constantize
rescue NameError
log("Unknown file type: #{object_type}")
raise
end
end
end
......@@ -10,6 +10,8 @@ module Geo
repositories_failed_count
lfs_objects_count
lfs_objects_synced_count
attachments_count
attachments_synced_count
).freeze
def call(geo_node)
......
......@@ -37,6 +37,10 @@
%span.help-block
LFS objects synced:
%span.js-lfs-objects-synced
%p
%span.help-block
Attachments synced:
%span.js-attachments-synced
%p
%span.help-block.js-health
......
......@@ -9,8 +9,8 @@ class GeoFileDownloadDispatchWorker
MAX_CONCURRENT_DOWNLOADS = 10.freeze
def initialize
@pending_lfs_downloads = []
@scheduled_lfs_jobs = []
@pending_downloads = []
@scheduled_jobs = []
end
# The scheduling works as the following:
......@@ -34,6 +34,7 @@ class GeoFileDownloadDispatchWorker
update_jobs_in_progress
load_pending_downloads if reload_queue?
# If we are still under the limit after refreshing our DB, we can end
# after scheduling the remaining transfers.
last_batch = reload_queue?
......@@ -41,7 +42,7 @@ class GeoFileDownloadDispatchWorker
break if over_time?
break unless downloads_remain?
schedule_lfs_downloads
schedule_downloads
break if last_batch
......@@ -53,7 +54,7 @@ class GeoFileDownloadDispatchWorker
private
def reload_queue?
@pending_lfs_downloads.size < MAX_CONCURRENT_DOWNLOADS
@pending_downloads.size < MAX_CONCURRENT_DOWNLOADS
end
def over_time?
......@@ -61,32 +62,62 @@ class GeoFileDownloadDispatchWorker
end
def load_pending_downloads
@pending_lfs_downloads = find_lfs_object_ids(DB_RETRIEVE_BATCH)
lfs_object_ids = find_lfs_object_ids(DB_RETRIEVE_BATCH)
objects_ids = find_object_ids(DB_RETRIEVE_BATCH)
@pending_downloads = interleave(lfs_object_ids, objects_ids)
end
def interleave(first, second)
if first.length >= second.length
first.zip(second)
else
second.zip(first).map(&:reverse)
end.flatten(1).compact.take(DB_RETRIEVE_BATCH)
end
def downloads_remain?
@pending_lfs_downloads.size
@pending_downloads.size
end
def schedule_lfs_downloads
num_to_schedule = [MAX_CONCURRENT_DOWNLOADS - job_ids.size, @pending_lfs_downloads.size].min
def schedule_downloads
num_to_schedule = [MAX_CONCURRENT_DOWNLOADS - job_ids.size, @pending_downloads.size].min
return unless downloads_remain?
num_to_schedule.times do
lfs_id = @pending_lfs_downloads.shift
job_id = GeoFileDownloadWorker.perform_async(:lfs, lfs_id)
object_db_id, object_type = @pending_downloads.shift
job_id = GeoFileDownloadWorker.perform_async(object_type, object_db_id)
if job_id
@scheduled_lfs_jobs << { job_id: job_id, id: lfs_id }
@scheduled_jobs << { id: object_db_id, type: object_type, job_id: job_id }
end
end
end
def find_object_ids(limit)
downloaded_ids = find_downloaded_ids([:attachment, :avatar, :file])
Upload.where.not(id: downloaded_ids)
.order(created_at: :desc)
.limit(limit)
.pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').downcase] }
end
def find_lfs_object_ids(limit)
downloaded_ids = Geo::FileRegistry.where(file_type: 'lfs').pluck(:file_id)
downloaded_ids = (downloaded_ids + scheduled_lfs_ids).uniq
LfsObject.where.not(id: downloaded_ids).order(created_at: :desc).limit(limit).pluck(:id)
downloaded_ids = find_downloaded_ids([:lfs])
LfsObject.where.not(id: downloaded_ids)
.order(created_at: :desc)
.limit(limit)
.pluck(:id)
.map { |id| [id, :lfs] }
end
def find_downloaded_ids(file_types)
downloaded_ids = Geo::FileRegistry.where(file_type: file_types).pluck(:file_id)
(downloaded_ids + scheduled_ids(file_types)).uniq
end
def update_jobs_in_progress
......@@ -95,15 +126,15 @@ class GeoFileDownloadDispatchWorker
# SidekiqStatus returns an array of booleans: true if the job has completed, false otherwise.
# For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
# Next, filter out the jobs that have completed.
@scheduled_lfs_jobs = @scheduled_lfs_jobs.zip(status).map { |(job, completed)| job if completed }.compact
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, completed)| job if completed }.compact
end
def job_ids
@scheduled_lfs_jobs.map { |data| data[:job_id] }
@scheduled_jobs.map { |data| data[:job_id] }
end
def scheduled_lfs_ids
@scheduled_lfs_jobs.map { |data| data[:id] }
def scheduled_ids(types)
@scheduled_jobs.select { |data| types.include?(data[:type]) }.map { |data| data[:id] }
end
def try_obtain_lease
......
---
title: 'Geo: Add support to sync avatars and attachments'
merge_request: 1562
author:
......@@ -775,12 +775,16 @@ module API
end
class GeoNodeStatus < Grape::Entity
expose :id
expose :health
expose :healthy?, as: :healthy
expose :repositories_count
expose :repositories_synced_count
expose :repositories_failed_count
expose :lfs_objects_count
expose :lfs_objects_synced_count
expose :attachments_count
expose :attachments_synced_count
end
class PersonalAccessToken < Grape::Entity
......
......@@ -37,7 +37,7 @@ module API
require_node_to_be_secondary!
require_node_to_have_tracking_db!
present GeoNodeStatus.new, with: Entities::GeoNodeStatus
present GeoNodeStatus.new(id: Gitlab::Geo.current_node.id), with: Entities::GeoNodeStatus
end
# Enqueue a batch of IDs of wiki's projects to have their
......
module Gitlab
module Geo
class FileDownloader
attr_reader :object_db_id
attr_reader :object_type, :object_db_id
def initialize(object_db_id)
def initialize(object_type, object_db_id)
@object_type = object_type
@object_db_id = object_db_id
end
......@@ -12,7 +13,11 @@ module Gitlab
# Subclasses should return the number of bytes downloaded,
# or nil or -1 if a failure occurred.
def execute
raise NotImplementedError
upload = Upload.find_by(id: object_db_id)
return unless upload.present?
transfer = ::Gitlab::Geo::FileTransfer.new(object_type.to_sym, upload)
transfer.download_from_primary
end
end
end
......
module Gitlab
module Geo
class FileTransfer < Transfer
def initialize(file_type, upload)
@file_type = file_type
@file_id = upload.id
@filename = upload.path
@request_data = build_request_data(upload)
end
private
def build_request_data(upload)
{
id: upload.model_id,
type: upload.model_type,
checksum: upload.checksum
}
end
end
end
end
module Gitlab
module Geo
class FileUploader
attr_reader :object_db_id, :message
def initialize(object_db_id, message)
@object_db_id = object_db_id
@message = message
end
def execute
recorded_file = Upload.find_by(id: object_db_id)
return error unless recorded_file&.exist?
return error unless valid?(recorded_file)
success(CarrierWave::SanitizedFile.new(recorded_file.absolute_path))
end
private
def valid?(recorded_file)
matches_requested_model?(recorded_file) &&
matches_checksum?(recorded_file)
end
def matches_requested_model?(recorded_file)
message[:id] == recorded_file.model_id &&
message[:type] == recorded_file.model_type
end
def matches_checksum?(recorded_file)
message[:checksum] == Upload.hexdigest(recorded_file.absolute_path)
end
def success(file)
{ code: :ok, message: 'Success', file: file }
end
def error(message = 'File not found')
{ code: :not_found, message: message }
end
end
end
end
......@@ -2,8 +2,7 @@ module Gitlab
module Geo
class LfsDownloader < FileDownloader
def execute
lfs_object = LfsObject.find_by_id(object_db_id)
lfs_object = LfsObject.find_by(id: object_db_id)
return unless lfs_object.present?
transfer = ::Gitlab::Geo::LfsTransfer.new(lfs_object)
......
......@@ -11,7 +11,7 @@ module Gitlab
private
def lfs_request_data(lfs_object)
{ sha256: lfs_object.oid }
{ checksum: lfs_object.oid }
end
end
end
......
module Gitlab
module Geo
class LfsUploader < FileUploader
def execute
lfs_object = LfsObject.find_by(id: object_db_id)
return error unless lfs_object.present?
return error if message[:checksum] != lfs_object.oid
unless lfs_object.file.present? && lfs_object.file.exists?
return error('LFS object does not have a file')
end
success(lfs_object.file)
end
end
end
end
......@@ -213,6 +213,8 @@ describe Admin::GeoNodesController do
GeoNodeStatus.new(
id: 1,
health: nil,
attachments_count: 329,
attachments_synced_count: 141,
lfs_objects_count: 256,
lfs_objects_synced_count: 123,
repositories_count: 10,
......
......@@ -4,18 +4,21 @@
"id",
"healthy",
"health",
"attachments_count",
"attachments_synced_count",
"lfs_objects_count",
"lfs_objects_synced_count",
"lfs_objects_synced_in_percentage",
"repositories_count",
"repositories_failed_count",
"repositories_synced_count",
"repositories_synced_in_percentage"
"repositories_synced_count"
],
"properties" : {
"id": { "type": "integer" },
"healthy": { "type": "boolean" },
"health": { "type": "string" },
"attachments_count": { "type": "integer" },
"attachments_synced_count": { "type": "integer" },
"attachments_synced_in_percentage": { "type": "string" },
"lfs_objects_count": { "type": "integer" },
"lfs_objects_synced_count": { "type": "integer" },
"lfs_objects_synced_in_percentage": { "type": "string" },
......
......@@ -4,7 +4,7 @@ describe Gitlab::Geo::LfsDownloader do
let(:lfs_object) { create(:lfs_object) }
subject do
described_class.new(lfs_object.id)
described_class.new(:lfs, lfs_object.id)
end
context '#download_from_primary' do
......@@ -16,7 +16,7 @@ describe Gitlab::Geo::LfsDownloader do
end
it 'with unknown LFS object' do
expect(described_class.new(10000)).not_to receive(:download_from_primary)
expect(described_class.new(:lfs, 10000)).not_to receive(:download_from_primary)
expect(subject.execute).to be_nil
end
......
......@@ -31,6 +31,52 @@ describe GeoNodeStatus, model: true do
end
end
describe '#attachments_synced_count' do
it 'does not count synced files that were replaced' do
user = create(:user, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png'))
subject = described_class.new
expect(subject.attachments_count).to eq(1)
expect(subject.attachments_synced_count).to eq(0)
upload = Upload.find_by(model: user, uploader: 'AvatarUploader')
Geo::FileRegistry.create(file_type: :avatar, file_id: upload.id)
subject = described_class.new
expect(subject.attachments_count).to eq(1)
expect(subject.attachments_synced_count).to eq(1)
user.update(avatar: fixture_file_upload(Rails.root + 'spec/fixtures/rails_sample.jpg', 'image/jpg'))
subject = described_class.new
expect(subject.attachments_count).to eq(1)
expect(subject.attachments_synced_count).to eq(0)
upload = Upload.find_by(model: user, uploader: 'AvatarUploader')
Geo::FileRegistry.create(file_type: :avatar, file_id: upload.id)
subject = described_class.new
expect(subject.attachments_count).to eq(1)
expect(subject.attachments_synced_count).to eq(1)
end
end
describe '#attachments_synced_in_percentage' do
it 'returns 0 when no objects are available' do
subject.attachments_count = 0
subject.attachments_synced_count = 0
expect(subject.attachments_synced_in_percentage).to eq(0)
end
it 'returns the right percentage' do
subject.attachments_count = 4
subject.attachments_synced_count = 1
expect(subject.attachments_synced_in_percentage).to be_within(0.0001).of(25)
end
end
describe '#lfs_objects_synced_in_percentage' do
it 'returns 0 when no objects are available' do
subject.lfs_objects_count = 0
......@@ -65,6 +111,8 @@ describe GeoNodeStatus, model: true do
context 'when no values are available' do
it 'returns 0 for each attribute' do
subject.attachments_count = nil
subject.attachments_synced_count = nil
subject.lfs_objects_count = nil
subject.lfs_objects_synced_count = nil
subject.repositories_count = nil
......@@ -78,6 +126,9 @@ describe GeoNodeStatus, model: true do
expect(subject.lfs_objects_count).to be_zero
expect(subject.lfs_objects_synced_count).to be_zero
expect(subject.lfs_objects_synced_in_percentage).to be_zero
expect(subject.attachments_count).to be_zero
expect(subject.attachments_synced_count).to be_zero
expect(subject.attachments_synced_in_percentage).to be_zero
end
end
end
......@@ -90,6 +90,18 @@ describe Upload, type: :model do
end
end
describe '.hexdigest' do
it 'calculates the SHA256 sum' do
expected = Digest::SHA256.file(__FILE__).hexdigest
expect(described_class.hexdigest(__FILE__)).to eq expected
end
it 'returns nil for a non-existant file' do
expect(described_class.hexdigest("#{__FILE__}-nope")).to be_nil
end
end
describe '#absolute_path' do
it 'returns the path directly when already absolute' do
path = '/path/to/namespace/project/secret/file.jpg'
......
......@@ -17,12 +17,14 @@ describe API::Geo, api: true do
describe 'POST /geo/receive_events authentication' do
it 'denies access if token is not present' do
post api('/geo/receive_events')
expect(response.status).to eq 401
expect(response).to have_http_status(401)
end
it 'denies access if token is invalid' do
post api('/geo/receive_events'), nil, { 'X-Gitlab-Token' => 'nothing' }
expect(response.status).to eq 401
expect(response).to have_http_status(401)
end
end
......@@ -73,12 +75,14 @@ describe API::Geo, api: true do
it 'enqueues on disk key creation if admin and correct params' do
post api('/geo/receive_events'), key_create_payload, geo_token_header
expect(response.status).to eq 201
expect(response).to have_http_status(201)
end
it 'enqueues on disk key removal if admin and correct params' do
post api('/geo/receive_events'), key_destroy_payload, geo_token_header
expect(response.status).to eq 201
expect(response).to have_http_status(201)
end
end
......@@ -98,7 +102,8 @@ describe API::Geo, api: true do
it 'starts refresh process if admin and correct params' do
post api('/geo/receive_events'), push_payload, geo_token_header
expect(response.status).to eq 201
expect(response).to have_http_status(201)
end
end
......@@ -117,7 +122,117 @@ describe API::Geo, api: true do
it 'starts refresh process if admin and correct params' do
post api('/geo/receive_events'), tag_push_payload, geo_token_header
expect(response.status).to eq 201
expect(response).to have_http_status(201)
end
end
describe 'GET /geo/transfers/attachment/1' do
let!(:secondary_node) { create(:geo_node) }
let(:note) { create(:note, :with_attachment) }
let(:upload) { Upload.find_by(model: note, uploader: 'AttachmentUploader') }
let(:transfer) { Gitlab::Geo::FileTransfer.new(:attachment, upload) }
let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers }
before do
allow_any_instance_of(Gitlab::Geo::TransferRequest).to receive(:requesting_node).and_return(secondary_node)
end
it 'responds with 401 with invalid auth header' do
get api("/geo/transfers/attachment/#{upload.id}"), nil, Authorization: 'Test'
expect(response).to have_http_status(401)
end
context 'attachment file exists' do
it 'responds with 200 with X-Sendfile' do
get api("/geo/transfers/attachment/#{upload.id}"), nil, req_header
expect(response).to have_http_status(200)
expect(response.headers['Content-Type']).to eq('application/octet-stream')
expect(response.headers['X-Sendfile']).to eq(note.attachment.path)
end
end
context 'attachment does not exist' do
it 'responds with 404' do
get api("/geo/transfers/attachment/100000"), nil, req_header
expect(response).to have_http_status(404)
end
end
end
describe 'GET /geo/transfers/avatar/1' do
let!(:secondary_node) { create(:geo_node) }
let(:user) { create(:user, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: user, uploader: 'AvatarUploader') }
let(:transfer) { Gitlab::Geo::FileTransfer.new(:avatar, upload) }
let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers }
before do
allow_any_instance_of(Gitlab::Geo::TransferRequest).to receive(:requesting_node).and_return(secondary_node)
end
it 'responds with 401 with invalid auth header' do
get api("/geo/transfers/avatar/#{upload.id}"), nil, Authorization: 'Test'
expect(response).to have_http_status(401)
end
context 'avatar file exists' do
it 'responds with 200 with X-Sendfile' do
get api("/geo/transfers/avatar/#{upload.id}"), nil, req_header
expect(response).to have_http_status(200)
expect(response.headers['Content-Type']).to eq('application/octet-stream')
expect(response.headers['X-Sendfile']).to eq(user.avatar.path)
end
end
context 'avatar does not exist' do
it 'responds with 404' do
get api("/geo/transfers/avatar/100000"), nil, req_header
expect(response).to have_http_status(404)
end
end
end
describe 'GET /geo/transfers/file/1' do
let!(:secondary_node) { create(:geo_node) }
let(:project) { create(:empty_project) }
let(:upload) { Upload.find_by(model: project, uploader: 'FileUploader') }
let(:transfer) { Gitlab::Geo::FileTransfer.new(:file, upload) }
let(:req_header) { Gitlab::Geo::TransferRequest.new(transfer.request_data).headers }
before do
allow_any_instance_of(Gitlab::Geo::TransferRequest).to receive(:requesting_node).and_return(secondary_node)
FileUploader.new(project).store!(fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png'))
end
it 'responds with 401 with invalid auth header' do
get api("/geo/transfers/file/#{upload.id}"), nil, Authorization: 'Test'
expect(response).to have_http_status(401)
end
context 'file file exists' do
it 'responds with 200 with X-Sendfile' do
get api("/geo/transfers/file/#{upload.id}"), nil, req_header
expect(response).to have_http_status(200)
expect(response.headers['Content-Type']).to eq('application/octet-stream')
expect(response.headers['X-Sendfile']).to end_with('dk.png')
end
end
context 'file does not exist' do
it 'responds with 404' do
get api("/geo/transfers/file/100000"), nil, req_header
expect(response).to have_http_status(404)
end
end
end
......@@ -136,14 +251,14 @@ describe API::Geo, api: true do
it 'responds with 401 with invalid auth header' do
get api("/geo/transfers/lfs/#{lfs_object.id}"), nil, Authorization: 'Test'
expect(response.status).to eq 401
expect(response).to have_http_status(401)
end
context 'LFS file exists' do
it 'responds with 200 with X-Sendfile' do
get api("/geo/transfers/lfs/#{lfs_object.id}"), nil, req_header
expect(response.status).to eq 200
expect(response).to have_http_status(200)
expect(response.headers['Content-Type']).to eq('application/octet-stream')
expect(response.headers['X-Sendfile']).to eq(lfs_object.file.path)
end
......@@ -153,7 +268,7 @@ describe API::Geo, api: true do
it 'responds with 404' do
get api("/geo/transfers/lfs/100000"), nil, req_header
expect(response.status).to eq 404
expect(response).to have_http_status(404)
end
end
end
......@@ -165,7 +280,7 @@ describe API::Geo, api: true do
it 'responds with 401 with invalid auth header' do
get api('/geo/status'), nil, Authorization: 'Test'
expect(response.status).to eq 401
expect(response).to have_http_status(401)
end
context 'when requesting secondary node with valid auth header' do
......@@ -177,8 +292,8 @@ describe API::Geo, api: true do
it 'responds with 200' do
get api('/geo/status'), nil, request.headers
expect(response.status).to eq 200
expect(response.headers['Content-Type']).to eq('application/json')
expect(response).to have_http_status(200)
expect(response).to match_response_schema('geo_node_status')
end
it 'responds with a 404 when the tracking database is disabled' do
......
......@@ -5,6 +5,8 @@ describe GeoNodeStatusEntity do
GeoNodeStatus.new(
id: 1,
health: nil,
attachments_count: 329,
attachments_synced_count: 141,
lfs_objects_count: 256,
lfs_objects_synced_count: 123,
repositories_count: 10,
......@@ -25,6 +27,9 @@ describe GeoNodeStatusEntity do
it { is_expected.to have_key(:id) }
it { is_expected.to have_key(:healthy) }
it { is_expected.to have_key(:health) }
it { is_expected.to have_key(:attachments_count) }
it { is_expected.to have_key(:attachments_synced_count) }
it { is_expected.to have_key(:attachments_synced_in_percentage) }
it { is_expected.to have_key(:lfs_objects_count) }
it { is_expected.to have_key(:lfs_objects_synced_count) }
it { is_expected.to have_key(:lfs_objects_synced_in_percentage) }
......@@ -73,6 +78,12 @@ describe GeoNodeStatusEntity do
end
end
describe '#attachments_synced_in_percentage' do
it 'formats as percentage' do
expect(subject[:attachments_synced_in_percentage]).to eq '42.86%'
end
end
describe '#lfs_objects_synced_in_percentage' do
it 'formats as percentage' do
expect(subject[:lfs_objects_synced_in_percentage]).to eq '48.05%'
......
require 'spec_helper'
describe Geo::FileDownloadService, services: true do
let(:lfs_object) { create(:lfs_object) }
let!(:primary) { create(:geo_node, :primary) }
let(:secondary) { create(:geo_node) }
subject { Geo::FileDownloadService.new(:lfs, lfs_object.id) }
before do
create(:geo_node, :primary)
allow(described_class).to receive(:current_node) { secondary }
end
describe '#execute' do
it 'downloads an LFS object' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::LfsTransfer)
.to receive(:download_from_primary).and_return(100)
context 'user avatar' do
let(:user) { create(:user, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: user, uploader: 'AvatarUploader') }
subject { described_class.new(:avatar, upload.id) }
it 'downloads a user avatar' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::FileTransfer)
.to receive(:download_from_primary).and_return(100)
expect{ subject.execute }.to change { Geo::FileRegistry.count }.by(1)
end
end
context 'group avatar' do
let(:group) { create(:group, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: group, uploader: 'AvatarUploader') }
subject { described_class.new(:avatar, upload.id) }
it 'downloads a group avatar' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::FileTransfer)
.to receive(:download_from_primary).and_return(100)
expect{ subject.execute }.to change { Geo::FileRegistry.count }.by(1)
end
end
context 'project avatar' do
let(:project) { create(:empty_project, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: project, uploader: 'AvatarUploader') }
subject { described_class.new(:avatar, upload.id) }
it 'downloads a project avatar' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::FileTransfer)
.to receive(:download_from_primary).and_return(100)
expect{ subject.execute }.to change { Geo::FileRegistry.count }.by(1)
end
end
context 'with an attachment' do
let(:note) { create(:note, :with_attachment) }
let(:upload) { Upload.find_by(model: note, uploader: 'AttachmentUploader') }
subject { described_class.new(:attachment, upload.id) }
it 'downloads the attachment' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::FileTransfer)
.to receive(:download_from_primary).and_return(100)
expect{ subject.execute }.to change { Geo::FileRegistry.count }.by(1)
end
end
context 'with file upload' do
let(:project) { create(:empty_project) }
let(:upload) { Upload.find_by(model: project, uploader: 'FileUploader') }
subject { described_class.new(:file, upload.id) }
before do
FileUploader.new(project).store!(fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png'))
end
it 'downloads the file' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::FileTransfer)
.to receive(:download_from_primary).and_return(100)
expect{ subject.execute }.to change { Geo::FileRegistry.count }.by(1)
end
end
context 'LFS object' do
let(:lfs_object) { create(:lfs_object) }
subject { described_class.new(:lfs, lfs_object.id) }
it 'downloads an LFS object' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::LfsTransfer)
.to receive(:download_from_primary).and_return(100)
expect{ subject.execute }.to change { Geo::FileRegistry.count }.by(1)
expect{ subject.execute }.to change { Geo::FileRegistry.count }.by(1)
end
end
it 'bad object type' do
expect{ described_class.new(:bad, lfs_object.id).execute }.to raise_error(NameError)
context 'bad object type' do
it 'raises an error' do
expect{ described_class.new(:bad, 1).execute }.to raise_error(NameError)
end
end
end
end
require 'spec_helper'
describe Geo::FileUploadService, services: true do
let(:lfs_object) { create(:lfs_object, :with_file) }
let(:params) { { id: lfs_object.id, type: 'lfs' } }
let(:lfs_transfer) { Gitlab::Geo::LfsTransfer.new(lfs_object) }
let(:transfer_request) { Gitlab::Geo::TransferRequest.new(lfs_transfer.request_data) }
let(:req_header) { transfer_request.headers['Authorization'] }
before do
create(:geo_node, :current)
end
let!(:node) { create(:geo_node, :current) }
describe '#execute' do
it 'sends LFS file' do
service = described_class.new(params, req_header)
context 'user avatar' do
let(:user) { create(:user, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: user, uploader: 'AvatarUploader') }
let(:params) { { id: upload.id, type: 'avatar' } }
let(:file_transfer) { Gitlab::Geo::FileTransfer.new(:avatar, upload) }
let(:transfer_request) { Gitlab::Geo::TransferRequest.new(file_transfer.request_data) }
let(:req_header) { transfer_request.headers['Authorization'] }
it 'sends avatar file' do
service = described_class.new(params, req_header)
response = service.execute
response = service.execute
expect(response[:code]).to eq(:ok)
expect(response[:file].path).to eq(user.avatar.path)
end
expect(response[:code]).to eq(:ok)
expect(response[:file].file.path).to eq(lfs_object.file.path)
it 'returns nil if no authorization' do
service = described_class.new(params, nil)
expect(service.execute).to be_nil
end
end
it 'returns nil if no authorization' do
service = described_class.new(params, nil)
context 'group avatar' do
let(:group) { create(:group, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: group, uploader: 'AvatarUploader') }
let(:params) { { id: upload.id, type: 'avatar' } }
let(:file_transfer) { Gitlab::Geo::FileTransfer.new(:avatar, upload) }
let(:transfer_request) { Gitlab::Geo::TransferRequest.new(file_transfer.request_data) }
let(:req_header) { transfer_request.headers['Authorization'] }
it 'sends avatar file' do
service = described_class.new(params, req_header)
response = service.execute
expect(response[:code]).to eq(:ok)
expect(response[:file].path).to eq(group.avatar.path)
end
it 'returns nil if no authorization' do
service = described_class.new(params, nil)
expect(service.execute).to be_nil
end
end
context 'project avatar' do
let(:project) { create(:empty_project, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png')) }
let(:upload) { Upload.find_by(model: project, uploader: 'AvatarUploader') }
let(:params) { { id: upload.id, type: 'avatar' } }
let(:file_transfer) { Gitlab::Geo::FileTransfer.new(:avatar, upload) }
let(:transfer_request) { Gitlab::Geo::TransferRequest.new(file_transfer.request_data) }
let(:req_header) { transfer_request.headers['Authorization'] }
it 'sends avatar file' do
service = described_class.new(params, req_header)
response = service.execute
expect(response[:code]).to eq(:ok)
expect(response[:file].path).to eq(project.avatar.path)
end
it 'returns nil if no authorization' do
service = described_class.new(params, nil)
expect(service.execute).to be_nil
end
end
context 'attachment' do
let(:note) { create(:note, :with_attachment) }
let(:upload) { Upload.find_by(model: note, uploader: 'AttachmentUploader') }
let(:params) { { id: upload.id, type: 'attachment' } }
let(:file_transfer) { Gitlab::Geo::FileTransfer.new(:attachment, upload) }
let(:transfer_request) { Gitlab::Geo::TransferRequest.new(file_transfer.request_data) }
let(:req_header) { transfer_request.headers['Authorization'] }
it 'sends attachment file' do
service = described_class.new(params, req_header)
response = service.execute
expect(response[:code]).to eq(:ok)
expect(response[:file].path).to eq(note.attachment.path)
end
it 'returns nil if no authorization' do
service = described_class.new(params, nil)
expect(service.execute).to be_nil
end
end
context 'file upload' do
let(:project) { create(:empty_project) }
let(:upload) { Upload.find_by(model: project, uploader: 'FileUploader') }
let(:params) { { id: upload.id, type: 'file' } }
let(:file_transfer) { Gitlab::Geo::FileTransfer.new(:file, upload) }
let(:transfer_request) { Gitlab::Geo::TransferRequest.new(file_transfer.request_data) }
let(:req_header) { transfer_request.headers['Authorization'] }
let(:file) { fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png') }
before do
FileUploader.new(project).store!(file)
end
it 'sends the file' do
service = described_class.new(params, req_header)
response = service.execute
expect(response[:code]).to eq(:ok)
expect(response[:file].path).to end_with('dk.png')
end
it 'returns nil if no authorization' do
service = described_class.new(params, nil)
expect(service.execute).to be_nil
end
end
context 'LFS Object' do
let(:lfs_object) { create(:lfs_object, :with_file) }
let(:params) { { id: lfs_object.id, type: 'lfs' } }
let(:lfs_transfer) { Gitlab::Geo::LfsTransfer.new(lfs_object) }
let(:transfer_request) { Gitlab::Geo::TransferRequest.new(lfs_transfer.request_data) }
let(:req_header) { transfer_request.headers['Authorization'] }
it 'sends LFS file' do
service = described_class.new(params, req_header)
response = service.execute
expect(response[:code]).to eq(:ok)
expect(response[:file].path).to eq(lfs_object.file.path)
end
it 'returns nil if no authorization' do
service = described_class.new(params, nil)
expect(service.execute).to be_nil
expect(service.execute).to be_nil
end
end
end
end
......@@ -45,21 +45,25 @@ describe GeoFileDownloadDispatchWorker do
# Test the case where we have:
#
# 1. A total of 6 files in the queue, and we can load a maxmimum of 5 and send 2 at a time.
# 1. A total of 8 files in the queue, and we can load a maximimum of 5 and send 2 at a time.
# 2. We send 2, wait for 1 to finish, and then send again.
it 'attempts to load a new batch without pending downloads' do
stub_const('GeoFileDownloadDispatchWorker::DB_RETRIEVE_BATCH', 5)
stub_const('GeoFileDownloadDispatchWorker::MAX_CONCURRENT_DOWNLOADS', 2)
create_list(:lfs_object, 6, :with_file)
avatar = fixture_file_upload(Rails.root.join('spec/fixtures/dk.png'))
create_list(:lfs_object, 2, :with_file)
create_list(:user, 2, avatar: avatar)
create_list(:note, 2, :with_attachment)
create(:appearance, logo: avatar, header_logo: avatar)
allow_any_instance_of(described_class).to receive(:over_time?).and_return(false)
expect(GeoFileDownloadWorker).to receive(:perform_async).exactly(6).times.and_call_original
# For 6 downloads, we expect three database reloads:
expect(GeoFileDownloadWorker).to receive(:perform_async).exactly(8).times.and_call_original
# For 8 downloads, we expect three database reloads:
# 1. Load the first batch of 5.
# 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the remaining 2.
# 3. Since the second reload filled the pipe with 2, we need to do a final reload to ensure
# 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the remaining 4.
# 3. Since the second reload filled the pipe with 4, we need to do a final reload to ensure
# zero are left.
expect(subject).to receive(:load_pending_downloads).exactly(3).times.and_call_original
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment