Commit e6e510d2 authored by Michael Kozono's avatar Michael Kozono

Refactor FileDownloadDispatchWorker

Use JobFinders to get batches of job arguments.
parent 1d4c67dc
......@@ -92,6 +92,20 @@ module Geo
relation.limit(batch_size)
end
def find_retryable_failed_attachments_registries(batch_size:, except_file_ids: [])
find_failed_attachments_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_retryable_synced_missing_on_primary_attachments_registries(batch_size:, except_file_ids: [])
find_synced_missing_on_primary_attachments_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_failed_attachments_registries
Geo::FileRegistry.attachments.failed
end
......
......@@ -76,6 +76,20 @@ module Geo
job_artifacts.with_files_stored_locally
end
def find_retryable_failed_job_artifacts_registries(batch_size:, except_artifact_ids: [])
find_failed_job_artifacts_registries
.retry_due
.where.not(artifact_id: except_artifact_ids)
.limit(batch_size)
end
def find_retryable_synced_missing_on_primary_job_artifacts_registries(batch_size:, except_artifact_ids: [])
find_synced_missing_on_primary_job_artifacts_registries
.retry_due
.where.not(artifact_id: except_artifact_ids)
.limit(batch_size)
end
def find_synced_job_artifacts_registries
Geo::JobArtifactRegistry.synced
end
......
......@@ -76,6 +76,20 @@ module Geo
lfs_objects.with_files_stored_locally
end
def find_retryable_failed_lfs_objects_registries(batch_size:, except_file_ids: [])
find_failed_lfs_objects_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_retryable_synced_missing_on_primary_lfs_objects_registries(batch_size:, except_file_ids: [])
find_synced_missing_on_primary_lfs_objects_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_failed_lfs_objects_registries
Geo::FileRegistry.lfs_objects.failed
end
......
......@@ -14,119 +14,49 @@ module Geo
{ id: object_db_id, type: object_type, job_id: job_id } if job_id
end
def attachments_finder
@attachments_finder ||= AttachmentRegistryFinder.new(current_node: current_node)
end
def lfs_objects_finder
@lfs_objects_finder ||= LfsObjectRegistryFinder.new(current_node: current_node)
end
def job_artifacts_finder
@job_artifacts_finder ||= JobArtifactRegistryFinder.new(current_node: current_node)
end
# Pools for new resources to be transferred
#
# @return [Array] resources to be transferred
def load_pending_resources
resources = find_unsynced_objects(batch_size: db_retrieve_batch_size)
resources = find_unsynced_jobs(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.count
if remaining_capacity.zero?
resources
else
resources + find_low_priority_objects(batch_size: remaining_capacity)
end
end
def find_unsynced_objects(batch_size:)
take_batch(find_unsynced_lfs_objects_ids(batch_size: batch_size),
find_unsynced_attachments_ids(batch_size: batch_size),
find_unsynced_job_artifacts_ids(batch_size: batch_size),
batch_size: batch_size)
resources + find_low_priority_jobs(batch_size: remaining_capacity)
end
def find_low_priority_objects(batch_size:)
take_batch(find_failed_attachments_ids(batch_size: batch_size),
find_failed_lfs_objects_ids(batch_size: batch_size),
find_failed_artifacts_ids(batch_size: batch_size),
find_synced_missing_on_primary_attachments_ids(batch_size: batch_size),
find_synced_missing_on_primary_lfs_objects_ids(batch_size: batch_size),
find_synced_missing_on_primary_job_artifacts_ids(batch_size: batch_size),
batch_size: batch_size)
end
def find_unsynced_lfs_objects_ids(batch_size:)
lfs_objects_finder.find_unsynced_lfs_objects(
batch_size: batch_size,
except_file_ids: scheduled_file_ids(:lfs))
.pluck(:id)
.map { |id| ['lfs', id] }
end
def find_unsynced_attachments_ids(batch_size:)
attachments_finder.find_unsynced_attachments(
batch_size: batch_size,
except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.pluck(:id, :uploader)
.map { |id, uploader| [uploader.sub(/Uploader\z/, '').underscore, id] }
end
def find_unsynced_job_artifacts_ids(batch_size:)
job_artifacts_finder.find_unsynced_job_artifacts(
batch_size: batch_size,
except_artifact_ids: scheduled_file_ids(:job_artifact))
.pluck(:id)
.map { |id| ['job_artifact', id] }
# @return [Array] job arguments of unsynced resources
def find_unsynced_jobs(batch_size:)
find_jobs(sync_statuses: [:unsynced], batch_size: batch_size)
end
def find_failed_attachments_ids(batch_size:)
attachments_finder.find_failed_attachments_registries
.retry_due
.where.not(file_id: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.limit(batch_size)
.pluck(:file_type, :file_id)
# @return [Array] job arguments of low priority resources
def find_low_priority_jobs(batch_size:)
find_jobs(sync_statuses: [:failed, :synced_missing_on_primary], batch_size: batch_size)
end
def find_failed_lfs_objects_ids(batch_size:)
lfs_objects_finder.find_failed_lfs_objects_registries
.retry_due
.where.not(file_id: scheduled_file_ids(:lfs))
.limit(batch_size)
.pluck(:file_id).map { |id| ['lfs', id] }
end
def find_failed_artifacts_ids(batch_size:)
job_artifacts_finder.find_failed_job_artifacts_registries
.retry_due
.where.not(artifact_id: scheduled_file_ids(:job_artifact))
.limit(batch_size)
.pluck(:artifact_id).map { |id| ['job_artifact', id] }
# Get a batch of resources taking equal parts from each resource.
#
# @return [Array] job arguments of a batch of resources
def find_jobs(sync_statuses:, batch_size:)
jobs = job_finders.reduce([]) do |jobs, job_finder|
sync_statuses.reduce(jobs) do |jobs, sync_status|
jobs << job_finder.find_jobs(sync_status: sync_status, batch_size: batch_size)
end
def find_synced_missing_on_primary_attachments_ids(batch_size:)
attachments_finder.find_synced_missing_on_primary_attachments_registries
.retry_due
.where.not(file_id: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.limit(batch_size)
.pluck(:file_type, :file_id)
end
def find_synced_missing_on_primary_lfs_objects_ids(batch_size:)
lfs_objects_finder.find_synced_missing_on_primary_lfs_objects_registries
.retry_due
.where.not(file_id: scheduled_file_ids(:lfs))
.limit(batch_size)
.pluck(:file_id).map { |id| ['lfs', id] }
take_batch(*jobs, batch_size: batch_size)
end
def find_synced_missing_on_primary_job_artifacts_ids(batch_size:)
job_artifacts_finder.find_synced_missing_on_primary_job_artifacts_registries
.retry_due
.where.not(artifact_id: scheduled_file_ids(:job_artifact))
.limit(batch_size)
.pluck(:artifact_id).map { |id| ['job_artifact', id] }
def job_finders
@job_finders ||= [
Geo::FileDownloadDispatchWorker::AttachmentJobFinder.new(scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES)),
Geo::FileDownloadDispatchWorker::LfsObjectJobFinder.new(scheduled_file_ids(:lfs)),
Geo::FileDownloadDispatchWorker::JobArtifactJobFinder.new(scheduled_file_ids(:job_artifact))
]
end
def scheduled_file_ids(file_types)
......
module Geo
class FileDownloadDispatchWorker
class AttachmentJobFinder < JobFinder
def resource_type
:attachment
end
def except_resource_ids_key
:except_file_ids
end
# Why do we need a different `file_type` for each Uploader? Why not just use 'upload'?
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_attachments(batch_size: batch_size, except_file_ids: scheduled_file_ids)
.pluck(:id, :uploader)
.map { |id, uploader| [uploader.sub(/Uploader\z/, '').underscore, id] }
end
def find_failed_jobs(batch_size:)
find_failed_registries(batch_size: batch_size).pluck(:file_type, :file_id)
end
def find_synced_missing_on_primary_jobs(batch_size:)
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:file_type, :file_id)
end
end
end
end
module Geo
class FileDownloadDispatchWorker
class JobArtifactJobFinder < JobFinder
def resource_type
:job_artifact
end
def resource_id_prefix
:artifact
end
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_artifact_ids: scheduled_file_ids)
.pluck(:id)
.map { |id| ['job_artifact', id] }
end
def find_failed_jobs(batch_size:)
find_failed_registries(batch_size: batch_size).pluck(:artifact_id).map { |id| ['job_artifact', id] }
end
def find_synced_missing_on_primary_jobs(batch_size:)
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:artifact_id).map { |id| ['job_artifact', id] }
end
end
end
end
module Geo
class FileDownloadDispatchWorker
class JobFinder
include Gitlab::Utils::StrongMemoize
attr_reader :registry_finder, :scheduled_file_ids
def initialize(scheduled_file_ids)
current_node = Gitlab::Geo.current_node
@registry_finder = registry_finder_class.new(current_node: current_node)
@scheduled_file_ids = scheduled_file_ids
end
def registry_finder_class
"Geo::#{resource_type.to_s.classify}RegistryFinder".constantize
end
def except_resource_ids_key
:"except_#{resource_id_prefix}_ids"
end
def find_jobs(sync_status:, batch_size:)
self.public_send(:"find_#{sync_status}_jobs", batch_size: batch_size) # rubocop:disable GitlabSecurity/PublicSend
end
def find_failed_registries(batch_size:)
registry_finder.public_send(:"find_retryable_failed_#{resource_type}s_registries", batch_size: batch_size, except_resource_ids_key => scheduled_file_ids) # rubocop:disable GitlabSecurity/PublicSend
end
def find_synced_missing_on_primary_registries(batch_size:)
registry_finder.public_send(:"find_retryable_synced_missing_on_primary_#{resource_type}s_registries", batch_size: batch_size, except_resource_ids_key => scheduled_file_ids) # rubocop:disable GitlabSecurity/PublicSend
end
end
end
end
module Geo
class FileDownloadDispatchWorker
class LfsObjectJobFinder < JobFinder
def resource_type
:lfs_object
end
def except_resource_ids_key
:except_file_ids
end
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids)
.pluck(:id)
.map { |id| ['lfs', id] }
end
def find_failed_jobs(batch_size:)
find_failed_registries(batch_size: batch_size).pluck(:file_id).map { |id| ['lfs', id] }
end
def find_synced_missing_on_primary_jobs(batch_size:)
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:file_id).map { |id| ['lfs', id] }
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment