Add support on GitLab Geo download scheduler for recorded files

parent f6249945
...@@ -9,8 +9,8 @@ class GeoFileDownloadDispatchWorker ...@@ -9,8 +9,8 @@ class GeoFileDownloadDispatchWorker
MAX_CONCURRENT_DOWNLOADS = 10.freeze MAX_CONCURRENT_DOWNLOADS = 10.freeze
def initialize def initialize
@pending_lfs_downloads = [] @pending_downloads = []
@scheduled_lfs_jobs = [] @scheduled_jobs = []
end end
# The scheduling works as the following: # The scheduling works as the following:
...@@ -34,6 +34,7 @@ class GeoFileDownloadDispatchWorker ...@@ -34,6 +34,7 @@ class GeoFileDownloadDispatchWorker
update_jobs_in_progress update_jobs_in_progress
load_pending_downloads if reload_queue? load_pending_downloads if reload_queue?
# If we are still under the limit after refreshing our DB, we can end # If we are still under the limit after refreshing our DB, we can end
# after scheduling the remaining transfers. # after scheduling the remaining transfers.
last_batch = reload_queue? last_batch = reload_queue?
...@@ -41,7 +42,7 @@ class GeoFileDownloadDispatchWorker ...@@ -41,7 +42,7 @@ class GeoFileDownloadDispatchWorker
break if over_time? break if over_time?
break unless downloads_remain? break unless downloads_remain?
schedule_lfs_downloads schedule_downloads
break if last_batch break if last_batch
...@@ -53,7 +54,7 @@ class GeoFileDownloadDispatchWorker ...@@ -53,7 +54,7 @@ class GeoFileDownloadDispatchWorker
private private
def reload_queue? def reload_queue?
@pending_lfs_downloads.size < MAX_CONCURRENT_DOWNLOADS @pending_downloads.size < MAX_CONCURRENT_DOWNLOADS
end end
def over_time? def over_time?
...@@ -61,32 +62,54 @@ class GeoFileDownloadDispatchWorker ...@@ -61,32 +62,54 @@ class GeoFileDownloadDispatchWorker
end end
def load_pending_downloads def load_pending_downloads
@pending_lfs_downloads = find_lfs_object_ids(DB_RETRIEVE_BATCH) lfs_object_ids = find_lfs_object_ids(DB_RETRIEVE_BATCH)
objects_ids = find_object_ids(DB_RETRIEVE_BATCH - lfs_object_ids.size)
@pending_downloads = lfs_object_ids + objects_ids
end end
def downloads_remain? def downloads_remain?
@pending_lfs_downloads.size @pending_downloads.size
end end
def schedule_lfs_downloads def schedule_downloads
num_to_schedule = [MAX_CONCURRENT_DOWNLOADS - job_ids.size, @pending_lfs_downloads.size].min num_to_schedule = [MAX_CONCURRENT_DOWNLOADS - job_ids.size, @pending_downloads.size].min
return unless downloads_remain? return unless downloads_remain?
num_to_schedule.times do num_to_schedule.times do
lfs_id = @pending_lfs_downloads.shift object_id, object_type = @pending_downloads.shift
job_id = GeoFileDownloadWorker.perform_async(:lfs, lfs_id) job_id = GeoFileDownloadWorker.perform_async(object_type, object_id)
if job_id if job_id
@scheduled_lfs_jobs << { job_id: job_id, id: lfs_id } @scheduled_jobs << { id: object_id, type: object_type, job_id: job_id }
end end
end end
end end
def find_object_ids(limit)
downloaded_ids = find_downloaded_ids([:attachment, :avatar, :file])
Upload.where.not(id: downloaded_ids)
.order(created_at: :desc)
.limit(limit)
.pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.gsub('Uploader', '').downcase] }
end
def find_lfs_object_ids(limit) def find_lfs_object_ids(limit)
downloaded_ids = Geo::FileRegistry.where(file_type: 'lfs').pluck(:file_id) downloaded_ids = find_downloaded_ids([:lfs])
downloaded_ids = (downloaded_ids + scheduled_lfs_ids).uniq
LfsObject.where.not(id: downloaded_ids).order(created_at: :desc).limit(limit).pluck(:id) LfsObject.where.not(id: downloaded_ids)
.order(created_at: :desc)
.limit(limit)
.pluck(:id)
.map { |id| [id, :lfs] }
end
def find_downloaded_ids(file_types)
downloaded_lfs_ids = Geo::FileRegistry.where(file_type: file_types).pluck(:file_id)
downloaded_lfs_ids = (downloaded_lfs_ids + scheduled_ids(file_types)).uniq
end end
def update_jobs_in_progress def update_jobs_in_progress
...@@ -95,15 +118,15 @@ class GeoFileDownloadDispatchWorker ...@@ -95,15 +118,15 @@ class GeoFileDownloadDispatchWorker
# SidekiqStatus returns an array of booleans: true if the job has completed, false otherwise. # SidekiqStatus returns an array of booleans: true if the job has completed, false otherwise.
# For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ] # For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
# Next, filter out the jobs that have completed. # Next, filter out the jobs that have completed.
@scheduled_lfs_jobs = @scheduled_lfs_jobs.zip(status).map { |(job, completed)| job if completed }.compact @scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, completed)| job if completed }.compact
end end
def job_ids def job_ids
@scheduled_lfs_jobs.map { |data| data[:job_id] } @scheduled_jobs.map { |data| data[:job_id] }
end end
def scheduled_lfs_ids def scheduled_ids(types)
@scheduled_lfs_jobs.map { |data| data[:id] } @scheduled_jobs.select { |data| types.include?(data[:type]) }.map { |data| data[:id] }
end end
def try_obtain_lease def try_obtain_lease
......
...@@ -45,21 +45,25 @@ describe GeoFileDownloadDispatchWorker do ...@@ -45,21 +45,25 @@ describe GeoFileDownloadDispatchWorker do
# Test the case where we have: # Test the case where we have:
# #
# 1. A total of 6 files in the queue, and we can load a maxmimum of 5 and send 2 at a time. # 1. A total of 8 files in the queue, and we can load a maximimum of 5 and send 2 at a time.
# 2. We send 2, wait for 1 to finish, and then send again. # 2. We send 2, wait for 1 to finish, and then send again.
it 'attempts to load a new batch without pending downloads' do it 'attempts to load a new batch without pending downloads' do
stub_const('GeoFileDownloadDispatchWorker::DB_RETRIEVE_BATCH', 5) stub_const('GeoFileDownloadDispatchWorker::DB_RETRIEVE_BATCH', 5)
stub_const('GeoFileDownloadDispatchWorker::MAX_CONCURRENT_DOWNLOADS', 2) stub_const('GeoFileDownloadDispatchWorker::MAX_CONCURRENT_DOWNLOADS', 2)
create_list(:lfs_object, 6, :with_file) avatar = fixture_file_upload(Rails.root.join('spec/fixtures/dk.png'))
create_list(:lfs_object, 2, :with_file)
create_list(:user, 2, avatar: avatar)
create_list(:note, 2, :with_attachment)
create(:appearance, logo: avatar, header_logo: avatar)
allow_any_instance_of(described_class).to receive(:over_time?).and_return(false) allow_any_instance_of(described_class).to receive(:over_time?).and_return(false)
expect(GeoFileDownloadWorker).to receive(:perform_async).exactly(6).times.and_call_original expect(GeoFileDownloadWorker).to receive(:perform_async).exactly(8).times.and_call_original
# For 6 downloads, we expect three database reloads: # For 8 downloads, we expect three database reloads:
# 1. Load the first batch of 5. # 1. Load the first batch of 5.
# 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the remaining 2. # 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the remaining 4.
# 3. Since the second reload filled the pipe with 2, we need to do a final reload to ensure # 3. Since the second reload filled the pipe with 4, we need to do a final reload to ensure
# zero are left. # zero are left.
expect(subject).to receive(:load_pending_downloads).exactly(3).times.and_call_original expect(subject).to receive(:load_pending_downloads).exactly(3).times.and_call_original
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment