Commit 6e368017 authored by Michael Kozono's avatar Michael Kozono

Secondaries request job artifacts from the primary

parent 402c3d13
module Ci
class JobArtifact < ActiveRecord::Base
prepend EE::Ci::JobArtifact
include AfterCommitQueue
extend Gitlab::Ci::Model
......
module Geo
  # Finds Ci::JobArtifact records and their Geo sync state on a secondary
  # node, switching between FDW and legacy (two-database) query strategies.
  class JobArtifactRegistryFinder < FileRegistryFinder
    # @return [Integer] total locally-stored job artifacts in scope for this node
    def count_job_artifacts
      job_artifacts.count
    end

    # @return [Integer] job artifacts that have been successfully synced
    def count_synced_job_artifacts
      relation =
        if selective_sync?
          legacy_find_synced_job_artifacts
        else
          find_synced_job_artifacts_registries
        end

      relation.count
    end

    # @return [Integer] job artifacts whose last sync attempt failed
    def count_failed_job_artifacts
      relation =
        if selective_sync?
          legacy_find_failed_job_artifacts
        else
          find_failed_job_artifacts_registries
        end

      relation.count
    end

    # Find limited amount of non replicated job artifacts.
    #
    # You can pass a list with `except_registry_ids:` so you can exclude items you
    # already scheduled but haven't finished and persisted to the database yet
    #
    # TODO: Alternative here is to use some sort of window function with a cursor instead
    # of simply limiting the query and passing a list of items we don't want
    #
    # @param [Integer] batch_size used to limit the results returned
    # @param [Array<Integer>] except_registry_ids ids that will be ignored from the query
    def find_unsynced_job_artifacts(batch_size:, except_registry_ids: [])
      relation =
        if use_legacy_queries?
          legacy_find_unsynced_job_artifacts(except_registry_ids: except_registry_ids)
        else
          fdw_find_unsynced_job_artifacts(except_registry_ids: except_registry_ids)
        end

      relation.limit(batch_size)
    end

    # Locally-stored job artifacts, restricted to the node's selected
    # projects when selective sync is enabled.
    #
    # @return [ActiveRecord::Relation<Ci::JobArtifact>]
    def job_artifacts
      relation =
        if selective_sync?
          Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
        else
          Ci::JobArtifact.all
        end

      relation.with_files_stored_locally
    end

    private

    def find_synced_job_artifacts_registries
      Geo::FileRegistry.job_artifacts.synced
    end

    def find_failed_job_artifacts_registries
      Geo::FileRegistry.job_artifacts.failed
    end

    #
    # FDW accessors
    #

    # Job artifacts with no tracking-database entry, resolved in a single
    # cross-database query through the foreign data wrapper.
    def fdw_find_unsynced_job_artifacts(except_registry_ids:)
      fdw_table = Geo::Fdw::Ci::JobArtifact.table_name

      Geo::Fdw::Ci::JobArtifact.joins("LEFT OUTER JOIN file_registry
                                         ON file_registry.file_id = #{fdw_table}.id
                                        AND file_registry.file_type = 'job_artifact'")
        .merge(Geo::Fdw::Ci::JobArtifact.with_files_stored_locally)
        .where(file_registry: { id: nil })
        .where.not(id: except_registry_ids)
    end

    #
    # Legacy accessors (non FDW)
    #

    def legacy_find_synced_job_artifacts
      legacy_inner_join_registry_ids(
        job_artifacts,
        find_synced_job_artifacts_registries.pluck(:file_id),
        Ci::JobArtifact
      )
    end

    def legacy_find_failed_job_artifacts
      legacy_inner_join_registry_ids(
        job_artifacts,
        find_failed_job_artifacts_registries.pluck(:file_id),
        Ci::JobArtifact
      )
    end

    def legacy_find_unsynced_job_artifacts(except_registry_ids:)
      registry_ids = legacy_pluck_registry_ids(file_types: :job_artifact, except_registry_ids: except_registry_ids)

      legacy_left_outer_join_registry_ids(
        job_artifacts,
        registry_ids,
        Ci::JobArtifact
      )
    end
  end
end
module EE
  # CI::JobArtifact EE mixin
  #
  # This module is intended to encapsulate EE-specific model logic
  # and be prepended in the `Ci::JobArtifact` model
  module Ci::JobArtifact
    extend ActiveSupport::Concern

    prepended do
      # Artifacts whose file lives on local disk. A nil file_store predates
      # the column and is treated as local storage.
      scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::LOCAL_STORE]) }
    end
  end
end
module Geo
  module Fdw
    module Ci
      # Read-only model backed by the foreign-data-wrapper copy of the
      # primary's ci_job_artifacts table, used for cross-database Geo queries.
      class JobArtifact < ::Geo::BaseFdw
        self.table_name = Gitlab::Geo.fdw_table('ci_job_artifacts')

        # Mirrors EE::Ci::JobArtifact.with_files_stored_locally: nil file_store
        # predates the column and is treated as local storage.
        scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::LOCAL_STORE]) }
      end
    end
  end
end
......@@ -3,5 +3,6 @@ class Geo::FileRegistry < Geo::BaseRegistry
scope :synced, -> { where(success: true) }
scope :retry_due, -> { where('retry_at is NULL OR retry_at < ?', Time.now) }
scope :lfs_objects, -> { where(file_type: :lfs) }
scope :job_artifacts, -> { where(file_type: :job_artifact) }
scope :attachments, -> { where(file_type: Geo::FileService::DEFAULT_OBJECT_TYPES) }
end
......@@ -99,12 +99,46 @@ module Geo
(Time.now.utc - start_time) >= run_time
end
def interleave(first, second)
if first.length >= second.length
first.zip(second)
else
second.zip(first).map(&:reverse)
end.flatten(1).uniq.compact.take(db_retrieve_batch_size)
# Merges the given arrays of pending jobs fairly (see #interleave) and caps
# the result at the database retrieve batch size, dropping duplicates and nils.
def take_batch(*arrays)
  merged = interleave(*arrays)
  merged.uniq.compact.take(db_retrieve_batch_size)
end
# Combines the elements of multiple, arbitrary-length arrays into a single array.
#
# Each input array is spread evenly over the result, the relative order of a
# given array's elements is preserved, and when two elements tie for a slot,
# the element from the earlier array wins.
# Algorithm from https://stackoverflow.com/questions/15628936/ruby-equally-distribute-elements-and-interleave-merge-multiple-arrays/15639147#15639147
#
# For examples, see the specs in file_download_dispatch_worker_spec.rb
def interleave(*arrays)
  weighted = arrays.each_with_index.flat_map do |array, index|
    array.zip(interleave_coefficients(array, index))
  end

  weighted.sort_by { |(_element, weight)| weight }.map { |(element, _weight)| element }
end

# Assigns a fractional position to each element so arrays spread out evenly.
#
# `array_index` nudges each coefficient up slightly (by index / 1000) so that
# ties between equal-length arrays resolve in favour of the earlier array.
#
# Examples:
#
#   interleave_coefficients(['a', 'b'], 0)
#   # => [0.2499998750000625, 0.7499996250001875]
#   interleave_coefficients(['a', 'b', 'c'], 0)
#   # => [0.16666661111112963, 0.4999998333333889, 0.8333330555556481]
#   interleave_coefficients(['a', 'b', 'c'], 1)
#   # => [0.16699994433335189, 0.5003331665556111, 0.8336663887778704]
def interleave_coefficients(array, array_index)
  tie_breaker = array_index / 1000.0

  (1..array.size).map do |position|
    (position - 0.5 + tie_breaker) / (array.size + 1e-6)
  end
end
def update_jobs_in_progress
......
......@@ -26,6 +26,10 @@ module Geo
@lfs_objects_finder ||= LfsObjectRegistryFinder.new(current_node: current_node)
end
# Memoized registry finder for job artifact sync state on this node.
def job_artifacts_finder
  @job_artifacts_finder ||= JobArtifactRegistryFinder.new(current_node: current_node)
end
# Pools for new resources to be transferred
#
# @return [Array] resources to be transferred
......@@ -43,8 +47,9 @@ module Geo
# Collects unsynced ids for every replicated file type and merges them into
# a single fairly-interleaved batch via #take_batch.
#
# @param [Integer] batch_size maximum ids fetched per resource type
# @return [Array] [id, type] pairs to be transferred
def find_unsynced_objects(batch_size:)
  lfs_object_ids = find_unsynced_lfs_objects_ids(batch_size: batch_size)
  attachment_ids = find_unsynced_attachments_ids(batch_size: batch_size)
  job_artifact_ids = find_unsynced_job_artifacts_ids(batch_size: batch_size)

  take_batch(lfs_object_ids, attachment_ids, job_artifact_ids)
end
def find_unsynced_lfs_objects_ids(batch_size:)
......@@ -59,6 +64,12 @@ module Geo
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
end
# [id, :job_artifact] pairs for artifacts not yet replicated, skipping ids
# already scheduled for download but not yet persisted to the registry.
def find_unsynced_job_artifacts_ids(batch_size:)
  scheduled_ids = scheduled_file_ids(:job_artifact)
  unsynced = job_artifacts_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_registry_ids: scheduled_ids)

  unsynced.pluck(:id).map { |id| [id, :job_artifact] }
end
def find_failed_upload_object_ids(batch_size:)
file_registry_finder.find_failed_file_registries(batch_size: batch_size)
.pluck(:file_id, :file_type)
......
module Gitlab
  module Geo
    # Downloads a single CI job artifact file from the primary Geo node.
    class JobArtifactDownloader < FileDownloader
      # Looks up the artifact by object_db_id and transfers its file from
      # the primary. Returns nil when the record no longer exists.
      def execute
        artifact = ::Ci::JobArtifact.find_by(id: object_db_id)
        return if artifact.nil?

        ::Gitlab::Geo::JobArtifactTransfer.new(artifact).download_from_primary
      end
    end
  end
end
module Gitlab
  module Geo
    # Transfer subclass describing how to request one CI job artifact file
    # from the primary node.
    class JobArtifactTransfer < Transfer
      # @param job_artifact [Ci::JobArtifact] the artifact whose file is fetched
      def initialize(job_artifact)
        @file_type = :job_artifact
        @file_id = job_artifact.id
        @filename = job_artifact.file.path
        @request_data = job_artifact_request_data(job_artifact)
      end

      private

      # Payload sent with the request; only the artifact ID is needed.
      def job_artifact_request_data(_job_artifact)
        { id: @file_id }
      end
    end
  end
end
require 'spec_helper'

describe Geo::JobArtifactRegistryFinder, :geo do
  include ::EE::GeoHelpers

  let(:secondary) { create(:geo_node) }
  let(:synced_group) { create(:group) }
  let(:synced_project) { create(:project, group: synced_group) }
  let(:unsynced_project) { create(:project) }

  # Fixed IDs keep the except_registry_ids expectations below deterministic.
  let!(:job_artifact_1) { create(:ci_job_artifact, id: 1, project: synced_project) }
  let!(:job_artifact_2) { create(:ci_job_artifact, id: 2, project: unsynced_project) }
  let!(:job_artifact_3) { create(:ci_job_artifact, id: 3, project: synced_project) }
  let!(:job_artifact_4) { create(:ci_job_artifact, id: 4, project: unsynced_project) }

  subject { described_class.new(current_node: secondary) }

  before do
    stub_current_geo_node(secondary)
  end

  describe '#count_synced_job_artifacts' do
    it 'delegates to #find_synced_job_artifacts_registries' do
      expect(subject).to receive(:find_synced_job_artifacts_registries).and_call_original

      subject.count_synced_job_artifacts
    end

    it 'counts job artifacts that has been synced' do
      create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false)
      create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id)
      create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id)

      expect(subject.count_synced_job_artifacts).to eq 2
    end

    context 'with selective sync' do
      before do
        secondary.update_attribute(:namespaces, [synced_group])
      end

      it 'delegates to #legacy_find_synced_job_artifacts' do
        expect(subject).to receive(:legacy_find_synced_job_artifacts).and_call_original

        subject.count_synced_job_artifacts
      end

      it 'counts job artifacts that has been synced' do
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false)
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id)
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id)

        expect(subject.count_synced_job_artifacts).to eq 1
      end
    end
  end

  describe '#count_failed_job_artifacts' do
    it 'delegates to #find_failed_job_artifacts_registries' do
      expect(subject).to receive(:find_failed_job_artifacts_registries).and_call_original

      subject.count_failed_job_artifacts
    end

    it 'counts job artifacts that sync has failed' do
      create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false)
      create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id)
      create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)

      expect(subject.count_failed_job_artifacts).to eq 2
    end

    context 'with selective sync' do
      before do
        secondary.update_attribute(:namespaces, [synced_group])
      end

      it 'delegates to #legacy_find_failed_job_artifacts' do
        expect(subject).to receive(:legacy_find_failed_job_artifacts).and_call_original

        subject.count_failed_job_artifacts
      end

      it 'counts job artifacts that sync has failed' do
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false)
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id)

        expect(subject.count_failed_job_artifacts).to eq 1
      end

      it 'does not count job artifacts of unsynced projects' do
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id, success: false)

        expect(subject.count_failed_job_artifacts).to eq 0
      end
    end
  end

  # Disable transactions via :delete method because a foreign table
  # can't see changes inside a transaction of a different connection.
  context 'FDW', :delete do
    before do
      skip('FDW is not configured') if Gitlab::Database.postgresql? && !Gitlab::Geo.fdw?
    end

    describe '#find_unsynced_job_artifacts' do
      it 'delegates to #fdw_find_unsynced_job_artifacts' do
        expect(subject).to receive(:fdw_find_unsynced_job_artifacts).and_call_original

        subject.find_unsynced_job_artifacts(batch_size: 10)
      end

      it 'returns job artifacts without an entry on the tracking database' do
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true)
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)

        job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10)

        expect(job_artifacts.map(&:id)).to match_array([job_artifact_2.id, job_artifact_4.id])
      end

      it 'excludes job artifacts in the except_registry_ids option' do
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true)
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)

        job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10, except_registry_ids: [job_artifact_2.id])

        expect(job_artifacts.map(&:id)).to match_array([job_artifact_4.id])
      end
    end
  end

  context 'Legacy' do
    before do
      allow(Gitlab::Geo).to receive(:fdw?).and_return(false)
    end

    describe '#find_unsynced_job_artifacts' do
      it 'delegates to #legacy_find_unsynced_job_artifacts' do
        expect(subject).to receive(:legacy_find_unsynced_job_artifacts).and_call_original

        subject.find_unsynced_job_artifacts(batch_size: 10)
      end

      it 'returns job artifacts without an entry on the tracking database' do
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true)
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)

        job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10)

        expect(job_artifacts).to match_array([job_artifact_2, job_artifact_4])
      end

      it 'excludes job artifacts in the except_registry_ids option' do
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true)
        create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)

        job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10, except_registry_ids: [job_artifact_2.id])

        expect(job_artifacts).to match_array([job_artifact_4])
      end
    end
  end
end
require 'spec_helper'

describe Gitlab::Geo::JobArtifactDownloader, :geo do
  let(:job_artifact) { create(:ci_job_artifact) }

  subject do
    described_class.new(:job_artifact, job_artifact.id)
  end

  context '#execute' do
    it 'with a job artifact' do
      allow_any_instance_of(Gitlab::Geo::JobArtifactTransfer)
        .to receive(:download_from_primary).and_return(100)

      expect(subject.execute).to eq(100)
    end

    it 'with an unknown job artifact' do
      # No artifact exists with this id, so execute must return nil without
      # attempting a transfer.
      downloader = described_class.new(:job_artifact, 10000)

      expect(downloader.execute).to be_nil
    end
  end
end
require 'spec_helper'

describe Gitlab::Geo::JobArtifactTransfer, :geo do
  set(:job_artifact) { create(:ci_job_artifact, :archive) }

  context '#initialize' do
    it 'sets file_type to :job_artifact' do
      expect(described_class.new(job_artifact).file_type).to eq(:job_artifact)
    end

    it 'sets file_id to the job artifact ID' do
      expect(described_class.new(job_artifact).file_id).to eq(job_artifact.id)
    end

    it 'sets filename to the job artifact file path' do
      expect(described_class.new(job_artifact).filename).to eq(job_artifact.file.path)
      expect(job_artifact.file.path).to be_present
    end

    it 'sets request_data with the job artifact ID' do
      expect(described_class.new(job_artifact).request_data).to eq(id: job_artifact.id)
    end
  end
end
......@@ -187,6 +187,32 @@ describe Geo::FileDownloadService do
end
end
context 'job artifacts' do
let(:job_artifact) { create(:ci_job_artifact) }
subject { described_class.new(:job_artifact, job_artifact.id) }
it 'downloads a job artifact' do
stub_transfer(Gitlab::Geo::JobArtifactTransfer, 100)
expect { subject.execute }.to change { Geo::FileRegistry.synced.count }.by(1)
end
it 'registers when the download fails' do
stub_transfer(Gitlab::Geo::JobArtifactTransfer, -1)
expect { subject.execute }.to change { Geo::FileRegistry.failed.count }.by(1)
end
it 'logs a message' do
stub_transfer(Gitlab::Geo::JobArtifactTransfer, 100)
expect(Gitlab::Geo::Logger).to receive(:info).with(hash_including(:message, :download_time_s, success: true, bytes_downloaded: 100)).and_call_original
subject.execute
end
end
context 'bad object type' do
it 'raises an error' do
expect { described_class.new(:bad, 1).execute }.to raise_error(NameError)
......
......@@ -63,6 +63,48 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end
end
context 'with job artifacts' do
it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do
artifact = create(:ci_job_artifact)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with(:job_artifact, artifact.id).once.and_return(spy)
subject.perform
end
it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do
artifact = create(:ci_job_artifact)
Geo::FileRegistry.create!(file_type: :job_artifact, file_id: artifact.id, bytes: 0, success: false)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', artifact.id).once.and_return(spy)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do
artifact = create(:ci_job_artifact)
Geo::FileRegistry.create!(file_type: :job_artifact, file_id: artifact.id, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do
artifact = create(:ci_job_artifact)
Geo::FileRegistry.create!(file_type: :job_artifact, file_id: artifact.id, bytes: 0, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform
end
end
# Test the case where we have:
#
# 1. A total of 10 files in the queue, and we can load a maximimum of 5 and send 2 at a time.
......@@ -76,7 +118,8 @@ describe Geo::FileDownloadDispatchWorker, :geo do
create_list(:lfs_object, 2, :with_file)
create_list(:user, 2, avatar: avatar)
create_list(:note, 2, :with_attachment)
create_list(:upload, 2, :personal_snippet)
create_list(:upload, 1, :personal_snippet)
create_list(:ci_job_artifact, 1)
create(:appearance, logo: avatar, header_logo: avatar)
expect(Geo::FileDownloadWorker).to receive(:perform_async).exactly(10).times.and_call_original
......@@ -132,8 +175,8 @@ describe Geo::FileDownloadDispatchWorker, :geo do
context 'when node has namespace restrictions' do
let(:synced_group) { create(:group) }
let!(:project_in_synced_group) { create(:project, group: synced_group) }
let!(:unsynced_project) { create(:project) }
let(:project_in_synced_group) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) }
before do
allow(ProjectCacheWorker).to receive(:perform_async).and_return(true)
......@@ -142,11 +185,21 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end
it 'does not perform Geo::FileDownloadWorker for LFS object that does not belong to selected namespaces to replicate' do
lfs_objec_in_synced_group = create(:lfs_objects_project, project: project_in_synced_group)
lfs_object_in_synced_group = create(:lfs_objects_project, project: project_in_synced_group)
create(:lfs_objects_project, project: unsynced_project)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with(:lfs, lfs_objec_in_synced_group.lfs_object_id).once.and_return(spy)
.with(:lfs, lfs_object_in_synced_group.lfs_object_id).once.and_return(spy)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for job artifact that does not belong to selected namespaces to replicate' do
create(:ci_job_artifact, project: unsynced_project)
job_artifact_in_synced_group = create(:ci_job_artifact, project: project_in_synced_group)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with(:job_artifact, job_artifact_in_synced_group.id).once.and_return(spy)
subject.perform
end
......@@ -183,4 +236,56 @@ describe Geo::FileDownloadDispatchWorker, :geo do
it_behaves_like '#perform', false
end
describe '#take_batch' do
it 'returns a batch of jobs' do
a = [[2, :lfs], [3, :lfs]]
b = []
c = [[3, :job_artifact], [8, :job_artifact], [9, :job_artifact]]
expect(subject).to receive(:db_retrieve_batch_size).and_return(4)
expect(subject.send(:take_batch, a, b, c)).to eq([
[3, :job_artifact],
[2, :lfs],
[8, :job_artifact],
[3, :lfs]
])
end
end
describe '#interleave' do
# Notice ties are resolved by taking the "first" tied element
it 'interleaves 2 arrays' do
a = %w{1 2 3}
b = %w{A B C}
expect(subject.send(:interleave, a, b)).to eq(%w{1 A 2 B 3 C})
end
# Notice there are no ties in this call
it 'interleaves 2 arrays with a longer second array' do
a = %w{1 2}
b = %w{A B C}
expect(subject.send(:interleave, a, b)).to eq(%w{A 1 B 2 C})
end
it 'interleaves 2 arrays with a longer first array' do
a = %w{1 2 3}
b = %w{A B}
expect(subject.send(:interleave, a, b)).to eq(%w{1 A 2 B 3})
end
it 'interleaves 3 arrays' do
a = %w{1 2 3}
b = %w{A B C}
c = %w{i ii iii}
expect(subject.send(:interleave, a, b, c)).to eq(%w{1 A i 2 B ii 3 C iii})
end
it 'interleaves 3 arrays of unequal length' do
a = %w{1 2}
b = %w{A}
c = %w{i ii iii iiii}
expect(subject.send(:interleave, a, b, c)).to eq(%w{i 1 ii A iii 2 iiii})
end
end
end
require 'spec_helper'

describe Geo::FileDownloadWorker, :geo do
  describe '#perform' do
    it 'instantiates and executes FileDownloadService, and converts object_type to a symbol' do
      service = double(:service)

      expect(service).to receive(:execute)
      # Worker arguments arrive as strings (they round-trip through the job
      # queue), hence 'job_artifact' in but :job_artifact passed to the service.
      expect(Geo::FileDownloadService).to receive(:new).with(:job_artifact, 1).and_return(service)

      described_class.new.perform('job_artifact', 1)
    end
  end
end
......@@ -12,6 +12,10 @@ FactoryBot.define do
file_type :lfs
end
trait :job_artifact do
file_type :job_artifact
end
trait :with_file do
after(:build, :stub) do |registry, _|
file =
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment