Commit 7a543084 authored by Michael Kozono

Mark files/artifacts synced if missing on primary

parent b575378a
module Geo module Geo
# This class is responsible for:
# * Finding the appropriate Downloader class for a FileRegistry record
# * Executing the Downloader
# * Marking the FileRegistry record as synced or needing retry
class FileDownloadService < FileService class FileDownloadService < FileService
LEASE_TIMEOUT = 8.hours.freeze LEASE_TIMEOUT = 8.hours.freeze
...@@ -8,13 +12,15 @@ module Geo ...@@ -8,13 +12,15 @@ module Geo
def execute def execute
try_obtain_lease do try_obtain_lease do
start_time = Time.now start_time = Time.now
bytes_downloaded = downloader.execute
success = (bytes_downloaded.present? && bytes_downloaded >= 0) download_result = downloader.execute
log_info("File download",
success: success, mark_as_synced = download_result.success || download_result.primary_missing_file
bytes_downloaded: bytes_downloaded,
download_time_s: (Time.now - start_time).to_f.round(3)) log_file_download(mark_as_synced, download_result, start_time)
update_registry(bytes_downloaded, success: success) update_registry(download_result.bytes_downloaded,
mark_as_synced: mark_as_synced,
missing_on_primary: download_result.primary_missing_file)
end end
end end
...@@ -28,8 +34,21 @@ module Geo ...@@ -28,8 +34,21 @@ module Geo
raise raise
end end
def update_registry(bytes_downloaded, success:) def log_file_download(mark_as_synced, download_result, start_time)
transfer = metadata = {
mark_as_synced: mark_as_synced,
download_success: download_result.success,
bytes_downloaded: download_result.bytes_downloaded,
failed_before_transfer: download_result.failed_before_transfer,
primary_missing_file: download_result.primary_missing_file,
download_time_s: (Time.now - start_time).to_f.round(3)
}
log_info("File download", metadata)
end
def update_registry(bytes_downloaded, mark_as_synced:, missing_on_primary: false)
registry =
if object_type.to_sym == :job_artifact if object_type.to_sym == :job_artifact
Geo::JobArtifactRegistry.find_or_initialize_by(artifact_id: object_db_id) Geo::JobArtifactRegistry.find_or_initialize_by(artifact_id: object_db_id)
else else
...@@ -39,16 +58,17 @@ module Geo ...@@ -39,16 +58,17 @@ module Geo
) )
end end
transfer.bytes = bytes_downloaded registry.bytes = bytes_downloaded
transfer.success = success registry.success = mark_as_synced
registry.missing_on_primary = missing_on_primary
unless success unless mark_as_synced
# We don't limit the amount of retries # We don't limit the amount of retries
transfer.retry_count = (transfer.retry_count || 0) + 1 registry.retry_count = (registry.retry_count || 0) + 1
transfer.retry_at = Time.now + delay(transfer.retry_count).seconds registry.retry_at = Time.now + delay(registry.retry_count).seconds
end end
transfer.save registry.save
end end
def lease_key def lease_key
......
# Schema migration for the `file_registry` table.
#
# Up:   adds a boolean `missing_on_primary` column that defaults to false and
#       does not allow NULL.
# Down: removes that column again.
class AddMissingOnPrimaryToFileRegistry < ActiveRecord::Migration
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false

  # NOTE(review): presumably needed because add_column_with_default must not
  # run inside a DDL transaction -- confirm against MigrationHelpers.
  disable_ddl_transaction!

  def up
    add_column_with_default(
      :file_registry,
      :missing_on_primary,
      :boolean,
      default: false,
      allow_null: false
    )
  end

  def down
    remove_column(:file_registry, :missing_on_primary)
  end
end
# Schema migration for the `job_artifact_registry` table.
#
# Up:   adds a boolean `missing_on_primary` column that defaults to false and
#       does not allow NULL.
# Down: removes that column again.
class AddMissingOnPrimaryToJobArtifactRegistry < ActiveRecord::Migration
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false

  # NOTE(review): presumably needed because add_column_with_default must not
  # run inside a DDL transaction -- confirm against MigrationHelpers.
  disable_ddl_transaction!

  def up
    add_column_with_default(
      :job_artifact_registry,
      :missing_on_primary,
      :boolean,
      default: false,
      allow_null: false
    )
  end

  def down
    remove_column(:job_artifact_registry, :missing_on_primary)
  end
end
...@@ -28,6 +28,7 @@ ActiveRecord::Schema.define(version: 20180405074130) do ...@@ -28,6 +28,7 @@ ActiveRecord::Schema.define(version: 20180405074130) do
t.boolean "success", default: false, null: false t.boolean "success", default: false, null: false
t.integer "retry_count" t.integer "retry_count"
t.datetime "retry_at" t.datetime "retry_at"
t.boolean "missing_on_primary", default: false, null: false
end end
add_index "file_registry", ["file_type", "file_id"], name: "index_file_registry_on_file_type_and_file_id", unique: true, using: :btree add_index "file_registry", ["file_type", "file_id"], name: "index_file_registry_on_file_type_and_file_id", unique: true, using: :btree
...@@ -43,6 +44,7 @@ ActiveRecord::Schema.define(version: 20180405074130) do ...@@ -43,6 +44,7 @@ ActiveRecord::Schema.define(version: 20180405074130) do
t.integer "retry_count" t.integer "retry_count"
t.boolean "success" t.boolean "success"
t.string "sha256" t.string "sha256"
t.boolean "missing_on_primary", default: false, null: false
end end
add_index "job_artifact_registry", ["retry_at"], name: "index_job_artifact_registry_on_retry_at", using: :btree add_index "job_artifact_registry", ["retry_at"], name: "index_job_artifact_registry_on_retry_at", using: :btree
......
module Gitlab module Gitlab
module Geo module Geo
# This class is responsible for:
# * Finding an Upload record
# * Requesting and downloading the Upload's file from the primary
# * Returning a detailed Result
#
# TODO: Rearrange things so this class is not inherited by JobArtifactDownloader and LfsDownloader
# Maybe rename it so it doesn't seem generic. It only works with Upload records.
class FileDownloader class FileDownloader
attr_reader :object_type, :object_db_id attr_reader :object_type, :object_db_id
...@@ -14,10 +21,33 @@ module Gitlab ...@@ -14,10 +21,33 @@ module Gitlab
# or nil or -1 if a failure occurred. # or nil or -1 if a failure occurred.
def execute def execute
upload = Upload.find_by(id: object_db_id) upload = Upload.find_by(id: object_db_id)
return unless upload.present? return fail_before_transfer unless upload.present?
transfer = ::Gitlab::Geo::FileTransfer.new(object_type.to_sym, upload) transfer = ::Gitlab::Geo::FileTransfer.new(object_type.to_sym, upload)
transfer.download_from_primary Result.from_transfer_result(transfer.download_from_primary)
end
# Read-only record of a download attempt's outcome.
#
# Exposes four readers: success, bytes_downloaded, primary_missing_file and
# failed_before_transfer. The last two default to false when not supplied.
class Result
  attr_reader :success,
              :bytes_downloaded,
              :primary_missing_file,
              :failed_before_transfer

  # Builds a Result from any object responding to #success,
  # #primary_missing_file and #bytes_downloaded. failed_before_transfer is
  # not passed, so it keeps its default of false.
  def self.from_transfer_result(transfer_result)
    attributes = {
      success: transfer_result.success,
      primary_missing_file: transfer_result.primary_missing_file,
      bytes_downloaded: transfer_result.bytes_downloaded
    }

    Result.new(**attributes)
  end

  # success and bytes_downloaded are required keywords; the two flags are
  # optional and default to false.
  def initialize(success:, bytes_downloaded:, primary_missing_file: false, failed_before_transfer: false)
    @success                = success
    @bytes_downloaded       = bytes_downloaded
    @primary_missing_file   = primary_missing_file
    @failed_before_transfer = failed_before_transfer
  end
end
private
def fail_before_transfer
Result.new(success: false, bytes_downloaded: 0, failed_before_transfer: true)
end end
end end
end end
......
module Gitlab module Gitlab
module Geo module Geo
# This class is responsible for:
# * Finding a ::Ci::JobArtifact record
# * Requesting and downloading the JobArtifact's file from the primary
# * Returning a detailed Result
#
# TODO: Rearrange things so this class does not inherit FileDownloader
class JobArtifactDownloader < FileDownloader class JobArtifactDownloader < FileDownloader
def execute def execute
job_artifact = ::Ci::JobArtifact.find_by(id: object_db_id) job_artifact = ::Ci::JobArtifact.find_by(id: object_db_id)
return unless job_artifact.present? return fail_before_transfer unless job_artifact.present?
transfer = ::Gitlab::Geo::JobArtifactTransfer.new(job_artifact) transfer = ::Gitlab::Geo::JobArtifactTransfer.new(job_artifact)
transfer.download_from_primary Result.from_transfer_result(transfer.download_from_primary)
end end
end end
end end
......
module Gitlab module Gitlab
module Geo module Geo
# This class is responsible for:
# * Finding a LfsObject record
# * Requesting and downloading the LfsObject's file from the primary
# * Returning a detailed Result
#
# TODO: Rearrange things so this class does not inherit FileDownloader
class LfsDownloader < FileDownloader class LfsDownloader < FileDownloader
def execute def execute
lfs_object = LfsObject.find_by(id: object_db_id) lfs_object = LfsObject.find_by(id: object_db_id)
return unless lfs_object.present? return fail_before_transfer unless lfs_object.present?
transfer = ::Gitlab::Geo::LfsTransfer.new(lfs_object) transfer = ::Gitlab::Geo::LfsTransfer.new(lfs_object)
transfer.download_from_primary Result.from_transfer_result(transfer.download_from_primary)
end end
end end
end end
......
...@@ -3,22 +3,29 @@ require 'spec_helper' ...@@ -3,22 +3,29 @@ require 'spec_helper'
describe Gitlab::Geo::JobArtifactDownloader, :geo do describe Gitlab::Geo::JobArtifactDownloader, :geo do
let(:job_artifact) { create(:ci_job_artifact) } let(:job_artifact) { create(:ci_job_artifact) }
subject do context '#execute' do
described_class.new(:job_artifact, job_artifact.id) context 'with job artifact' do
end it 'returns a FileDownloader::Result object' do
downloader = described_class.new(:job_artifact, job_artifact.id)
result = Gitlab::Geo::Transfer::Result.new(success: true, bytes_downloaded: 1)
context '#download_from_primary' do
it 'with a job artifact' do
allow_any_instance_of(Gitlab::Geo::JobArtifactTransfer) allow_any_instance_of(Gitlab::Geo::JobArtifactTransfer)
.to receive(:download_from_primary).and_return(100) .to receive(:download_from_primary).and_return(result)
expect(subject.execute).to eq(100) expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result)
end
end end
it 'with an unknown job artifact' do context 'with unknown job artifact' do
expect(described_class.new(:job_artifact, 10000)).not_to receive(:download_from_primary) let(:downloader) { described_class.new(:job_artifact, 10000) }
expect(subject.execute).to be_nil it 'returns a FileDownloader::Result object' do
expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result)
end
it 'returns a result indicating a failure before a transfer was attempted' do
expect(downloader.execute.failed_before_transfer).to be_truthy
end
end end
end end
end end
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::LfsDownloader do describe Gitlab::Geo::LfsDownloader, :geo do
let(:lfs_object) { create(:lfs_object) } let(:lfs_object) { create(:lfs_object) }
subject do context '#execute' do
described_class.new(:lfs, lfs_object.id) context 'with LFS object' do
end it 'returns a FileDownloader::Result object' do
downloader = described_class.new(:lfs, lfs_object.id)
result = Gitlab::Geo::Transfer::Result.new(success: true, bytes_downloaded: 1)
context '#download_from_primary' do
it 'with LFS object' do
allow_any_instance_of(Gitlab::Geo::LfsTransfer) allow_any_instance_of(Gitlab::Geo::LfsTransfer)
.to receive(:download_from_primary).and_return(100) .to receive(:download_from_primary).and_return(result)
expect(subject.execute).to eq(100) expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result)
end
end end
it 'with unknown LFS object' do context 'with unknown job artifact' do
expect(described_class.new(:lfs, 10000)).not_to receive(:download_from_primary) let(:downloader) { described_class.new(:lfs, 10000) }
expect(subject.execute).to be_nil it 'returns a FileDownloader::Result object' do
expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result)
end
it 'returns a result indicating a failure before a transfer was attempted' do
expect(downloader.execute.failed_before_transfer).to be_truthy
end
end end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment