Commit b84ab70a authored by Stan Hu

Merge branch 'mk-add-geo-support-for-artifacts' into 'master'

Add Geo support for CI job artifacts

Closes #2388

See merge request gitlab-org/gitlab-ee!3935
parents 2f566858 8163d54c
module Ci
class JobArtifact < ActiveRecord::Base
prepend EE::Ci::JobArtifact
include AfterCommitQueue
extend Gitlab::Ci::Model
......
---
title: Add Geo support for CI job artifacts
merge_request: 3935
author:
type: added
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class CreateGeoJobArtifactDeletedEvent < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def change
create_table :geo_job_artifact_deleted_events, id: :bigserial do |t|
t.references :job_artifact, references: :ci_job_artifacts, index: true, foreign_key: false, null: false
t.string :file_path, null: false
end
add_column :geo_event_log, :job_artifact_deleted_event_id, :integer, limit: 8
end
end
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class AddGeoJobArtifactDeletedEventsForeignKey < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
add_concurrent_foreign_key :geo_event_log, :geo_job_artifact_deleted_events,
column: :job_artifact_deleted_event_id, on_delete: :cascade
end
def down
remove_foreign_key :geo_event_log, column: :job_artifact_deleted_event_id
end
end
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class AddJobArtifactCountsToGeoNodeStatuses < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def change
add_column :geo_node_statuses, :job_artifacts_count, :integer
add_column :geo_node_statuses, :job_artifacts_synced_count, :integer
add_column :geo_node_statuses, :job_artifacts_failed_count, :integer
end
end
......@@ -11,7 +11,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20171230123729) do
ActiveRecord::Schema.define(version: 20180105233807) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
......@@ -912,6 +912,7 @@ ActiveRecord::Schema.define(version: 20171230123729) do
t.integer "hashed_storage_migrated_event_id", limit: 8
t.integer "lfs_object_deleted_event_id", limit: 8
t.integer "hashed_storage_attachments_event_id", limit: 8
t.integer "job_artifact_deleted_event_id", limit: 8
end
add_index "geo_event_log", ["repositories_changed_event_id"], name: "index_geo_event_log_on_repositories_changed_event_id", using: :btree
......@@ -942,6 +943,13 @@ ActiveRecord::Schema.define(version: 20171230123729) do
add_index "geo_hashed_storage_migrated_events", ["project_id"], name: "index_geo_hashed_storage_migrated_events_on_project_id", using: :btree
create_table "geo_job_artifact_deleted_events", id: :bigserial, force: :cascade do |t|
t.integer "job_artifact_id", null: false
t.string "file_path", null: false
end
add_index "geo_job_artifact_deleted_events", ["job_artifact_id"], name: "index_geo_job_artifact_deleted_events_on_job_artifact_id", using: :btree
create_table "geo_lfs_object_deleted_events", id: :bigserial, force: :cascade do |t|
t.integer "lfs_object_id", null: false
t.string "oid", null: false
......@@ -986,6 +994,9 @@ ActiveRecord::Schema.define(version: 20171230123729) do
t.integer "wikis_count"
t.integer "wikis_synced_count"
t.integer "wikis_failed_count"
t.integer "job_artifacts_count"
t.integer "job_artifacts_synced_count"
t.integer "job_artifacts_failed_count"
end
add_index "geo_node_statuses", ["geo_node_id"], name: "index_geo_node_statuses_on_geo_node_id", unique: true, using: :btree
......@@ -2489,6 +2500,7 @@ ActiveRecord::Schema.define(version: 20171230123729) do
add_foreign_key "gcp_clusters", "services", on_delete: :nullify
add_foreign_key "gcp_clusters", "users", on_delete: :nullify
add_foreign_key "geo_event_log", "geo_hashed_storage_migrated_events", column: "hashed_storage_migrated_event_id", name: "fk_27548c6db3", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_job_artifact_deleted_events", column: "job_artifact_deleted_event_id", name: "fk_176d3fbb5d", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_lfs_object_deleted_events", column: "lfs_object_deleted_event_id", name: "fk_d5af95fcd9", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_repositories_changed_events", column: "repositories_changed_event_id", name: "fk_4a99ebfd60", on_delete: :cascade
add_foreign_key "geo_event_log", "geo_repository_created_events", column: "repository_created_event_id", name: "fk_9b9afb1916", on_delete: :cascade
......
......@@ -94,6 +94,10 @@ Example response:
"lfs_objects_synced_count": 0,
"lfs_objects_failed_count": 0,
"lfs_objects_synced_in_percentage": "0.00%",
"job_artifacts_count": 2,
"job_artifacts_synced_count": 1,
"job_artifacts_failed_count": 1,
"job_artifacts_synced_in_percentage": "50.00%",
"repositories_count": 41,
"repositories_failed_count": 1,
"repositories_synced_count": 40,
......@@ -141,6 +145,10 @@ Example response:
"lfs_objects_synced_count": 0,
"lfs_objects_failed_count": 0,
"lfs_objects_synced_in_percentage": "0.00%",
"job_artifacts_count": 2,
"job_artifacts_synced_count": 1,
"job_artifacts_failed_count": 1,
"job_artifacts_synced_in_percentage": "50.00%",
"repositories_count": 41,
"repositories_failed_count": 1,
"repositories_synced_count": 40,
......
......@@ -52,12 +52,17 @@
itemValueType: VALUE_TYPE.GRAPH,
},
{
itemTitle: s__('GeoNodes|LFS objects:'),
itemTitle: s__('GeoNodes|Local LFS objects:'),
itemValue: this.nodeDetails.lfs,
itemValueType: VALUE_TYPE.GRAPH,
},
{
itemTitle: s__('GeoNodes|Attachments:'),
itemTitle: s__('GeoNodes|Local job artifacts:'),
itemValue: this.nodeDetails.jobArtifacts,
itemValueType: VALUE_TYPE.GRAPH,
},
{
itemTitle: s__('GeoNodes|Local Attachments:'),
itemValue: this.nodeDetails.attachments,
itemValueType: VALUE_TYPE.GRAPH,
},
......
......@@ -60,8 +60,13 @@ export default class GeoNodesStore {
},
lfs: {
totalCount: rawNodeDetails.lfs_objects_count,
successCount: rawNodeDetails.lfs_objects_failed_count,
failureCount: rawNodeDetails.lfs_objects_synced_count,
successCount: rawNodeDetails.lfs_objects_synced_count,
failureCount: rawNodeDetails.lfs_objects_failed_count,
},
jobArtifacts: {
totalCount: rawNodeDetails.job_artifacts_count,
successCount: rawNodeDetails.job_artifacts_synced_count,
failureCount: rawNodeDetails.job_artifacts_failed_count,
},
attachments: {
totalCount: rawNodeDetails.attachments_count,
......
module Geo
class JobArtifactRegistryFinder < FileRegistryFinder
def count_job_artifacts
job_artifacts.count
end
def count_synced_job_artifacts
relation =
if selective_sync?
legacy_find_synced_job_artifacts
else
find_synced_job_artifacts_registries
end
relation.count
end
def count_failed_job_artifacts
relation =
if selective_sync?
legacy_find_failed_job_artifacts
else
find_failed_job_artifacts_registries
end
relation.count
end
# Finds a limited number of unreplicated job artifacts.
#
# You can pass a list via `except_registry_ids:` to exclude items that have
# already been scheduled but are not yet persisted to the database.
#
# TODO: An alternative here is to use some sort of window function with a cursor,
# instead of simply limiting the query and passing a list of items we don't want
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_registry_ids ids that will be ignored from the query
def find_unsynced_job_artifacts(batch_size:, except_registry_ids: [])
relation =
if use_legacy_queries?
legacy_find_unsynced_job_artifacts(except_registry_ids: except_registry_ids)
else
fdw_find_unsynced_job_artifacts(except_registry_ids: except_registry_ids)
end
relation.limit(batch_size)
end
def job_artifacts
relation =
if selective_sync?
Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
else
Ci::JobArtifact.all
end
relation.with_files_stored_locally
end
private
def find_synced_job_artifacts_registries
Geo::FileRegistry.job_artifacts.synced
end
def find_failed_job_artifacts_registries
Geo::FileRegistry.job_artifacts.failed
end
#
# FDW accessors
#
def fdw_find_unsynced_job_artifacts(except_registry_ids:)
fdw_table = Geo::Fdw::Ci::JobArtifact.table_name
Geo::Fdw::Ci::JobArtifact.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type = 'job_artifact'")
.merge(Geo::Fdw::Ci::JobArtifact.with_files_stored_locally)
.where(file_registry: { id: nil })
.where.not(id: except_registry_ids)
end
#
# Legacy accessors (non FDW)
#
def legacy_find_synced_job_artifacts
legacy_inner_join_registry_ids(
job_artifacts,
find_synced_job_artifacts_registries.pluck(:file_id),
Ci::JobArtifact
)
end
def legacy_find_failed_job_artifacts
legacy_inner_join_registry_ids(
job_artifacts,
find_failed_job_artifacts_registries.pluck(:file_id),
Ci::JobArtifact
)
end
def legacy_find_unsynced_job_artifacts(except_registry_ids:)
registry_ids = legacy_pluck_registry_ids(file_types: :job_artifact, except_registry_ids: except_registry_ids)
legacy_left_outer_join_registry_ids(
job_artifacts,
registry_ids,
Ci::JobArtifact
)
end
end
end
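As a rough usage sketch (not part of the diff), the finder above is what `GeoNodeStatus` and the download dispatch worker call into; `Gitlab::Geo.current_node`, the batch size, and the id list below are illustrative assumptions:
# Illustrative sketch only; mirrors how the finder is consumed later in this MR.
finder = Geo::JobArtifactRegistryFinder.new(current_node: Gitlab::Geo.current_node)
finder.count_job_artifacts        # all locally stored artifacts in scope
finder.count_synced_job_artifacts # artifacts with a successful Geo::FileRegistry row
finder.count_failed_job_artifacts # artifacts whose last sync attempt failed
# Next batch of artifacts with no tracking entry yet, skipping ids already queued.
finder.find_unsynced_job_artifacts(batch_size: 1000, except_registry_ids: [1, 2, 3])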
module EE
# CI::JobArtifact EE mixin
#
# This module is intended to encapsulate EE-specific model logic
# and be prepended in the `Ci::JobArtifact` model
module Ci::JobArtifact
extend ActiveSupport::Concern
prepended do
after_destroy :log_geo_event
scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::LOCAL_STORE]) }
end
def local_store?
[nil, JobArtifactUploader::LOCAL_STORE].include?(self.file_store)
end
private
def log_geo_event
::Geo::JobArtifactDeletedEventStore.new(self).create
end
end
end
......@@ -30,6 +30,10 @@ module Geo
class_name: 'Geo::LfsObjectDeletedEvent',
foreign_key: :lfs_object_deleted_event_id
belongs_to :job_artifact_deleted_event,
class_name: 'Geo::JobArtifactDeletedEvent',
foreign_key: :job_artifact_deleted_event_id
belongs_to :hashed_storage_attachments_event,
class_name: 'Geo::HashedStorageAttachmentsEvent',
foreign_key: :hashed_storage_attachments_event_id
......@@ -46,6 +50,7 @@ module Geo
repositories_changed_event ||
hashed_storage_migrated_event ||
lfs_object_deleted_event ||
job_artifact_deleted_event ||
hashed_storage_attachments_event
end
......
module Geo
module Fdw
module Ci
class JobArtifact < ::Geo::BaseFdw
self.table_name = Gitlab::Geo.fdw_table('ci_job_artifacts')
scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::LOCAL_STORE]) }
end
end
end
end
......@@ -3,5 +3,6 @@ class Geo::FileRegistry < Geo::BaseRegistry
scope :synced, -> { where(success: true) }
scope :retry_due, -> { where('retry_at is NULL OR retry_at < ?', Time.now) }
scope :lfs_objects, -> { where(file_type: :lfs) }
scope :job_artifacts, -> { where(file_type: :job_artifact) }
scope :attachments, -> { where(file_type: Geo::FileService::DEFAULT_OBJECT_TYPES) }
end
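A quick sketch of how the new `job_artifacts` scope composes with the existing registry scopes; these are the combinations used by the registry finder and the log cursor in this MR, and `artifact_id` is a placeholder:
# Illustrative sketch only.
Geo::FileRegistry.job_artifacts.synced                      # successfully replicated artifacts
Geo::FileRegistry.job_artifacts.failed                      # artifacts whose last sync failed
Geo::FileRegistry.job_artifacts.where(file_id: artifact_id) # tracking rows for one artifact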
module Geo
class JobArtifactDeletedEvent < ActiveRecord::Base
include Geo::Model
belongs_to :job_artifact, class_name: 'Ci::JobArtifact'
validates :job_artifact, :file_path, presence: true
end
end
......@@ -15,12 +15,15 @@ class GeoNodeStatus < ActiveRecord::Base
wikis_count: 'Total number of wikis available on primary',
wikis_synced_count: 'Number of wikis synced on secondary',
wikis_failed_count: 'Number of wikis failed to sync on secondary',
lfs_objects_count: 'Total number of LFS objects available on primary',
lfs_objects_synced_count: 'Number of LFS objects synced on secondary',
lfs_objects_failed_count: 'Number of LFS objects failed to sync on secondary',
attachments_count: 'Total number of file attachments available on primary',
attachments_synced_count: 'Number of attachments synced on secondary',
attachments_failed_count: 'Number of attachments failed to sync on secondary',
lfs_objects_count: 'Total number of local LFS objects available on primary',
lfs_objects_synced_count: 'Number of local LFS objects synced on secondary',
lfs_objects_failed_count: 'Number of local LFS objects failed to sync on secondary',
job_artifacts_count: 'Total number of local job artifacts available on primary',
job_artifacts_synced_count: 'Number of local job artifacts synced on secondary',
job_artifacts_failed_count: 'Number of local job artifacts failed to sync on secondary',
attachments_count: 'Total number of local file attachments available on primary',
attachments_synced_count: 'Number of local file attachments synced on secondary',
attachments_failed_count: 'Number of local file attachments failed to sync on secondary',
replication_slots_count: 'Total number of replication slots on the primary',
replication_slots_used_count: 'Number of replication slots in use on the primary',
replication_slots_max_retained_wal_bytes: 'Maximum number of bytes retained in the WAL on the primary',
......@@ -73,16 +76,26 @@ class GeoNodeStatus < ActiveRecord::Base
self.repositories_count = projects_finder.count_repositories
self.wikis_count = projects_finder.count_wikis
self.lfs_objects_count = lfs_objects_finder.count_lfs_objects
self.job_artifacts_count = job_artifacts_finder.count_job_artifacts
self.attachments_count = attachments_finder.count_attachments
self.last_successful_status_check_at = Time.now
self.storage_shards = StorageShard.all
load_primary_data
load_secondary_data
self
end
def load_primary_data
if Gitlab::Geo.primary?
self.replication_slots_count = geo_node.replication_slots_count
self.replication_slots_used_count = geo_node.replication_slots_used_count
self.replication_slots_max_retained_wal_bytes = geo_node.replication_slots_max_retained_wal_bytes
end
end
def load_secondary_data
if Gitlab::Geo.secondary?
self.db_replication_lag_seconds = Gitlab::Geo::HealthCheck.db_replication_lag_seconds
self.cursor_last_event_id = Geo::EventLogState.last_processed&.event_id
......@@ -93,11 +106,11 @@ class GeoNodeStatus < ActiveRecord::Base
self.wikis_failed_count = projects_finder.count_failed_wikis
self.lfs_objects_synced_count = lfs_objects_finder.count_synced_lfs_objects
self.lfs_objects_failed_count = lfs_objects_finder.count_failed_lfs_objects
self.job_artifacts_synced_count = job_artifacts_finder.count_synced_job_artifacts
self.job_artifacts_failed_count = job_artifacts_finder.count_failed_job_artifacts
self.attachments_synced_count = attachments_finder.count_synced_attachments
self.attachments_failed_count = attachments_finder.count_failed_attachments
end
self
end
alias_attribute :health, :status_message
......@@ -146,6 +159,10 @@ class GeoNodeStatus < ActiveRecord::Base
calc_percentage(lfs_objects_count, lfs_objects_synced_count)
end
def job_artifacts_synced_in_percentage
calc_percentage(job_artifacts_count, job_artifacts_synced_count)
end
def attachments_synced_in_percentage
calc_percentage(attachments_count, attachments_synced_count)
end
......@@ -196,6 +213,10 @@ class GeoNodeStatus < ActiveRecord::Base
@lfs_objects_finder ||= Geo::LfsObjectRegistryFinder.new(current_node: geo_node)
end
def job_artifacts_finder
@job_artifacts_finder ||= Geo::JobArtifactRegistryFinder.new(current_node: geo_node)
end
def projects_finder
@projects_finder ||= Geo::ProjectRegistryFinder.new(current_node: geo_node)
end
......
......@@ -26,6 +26,13 @@ class GeoNodeStatusEntity < Grape::Entity
number_to_percentage(node.lfs_objects_synced_in_percentage, precision: 2)
end
expose :job_artifacts_count
expose :job_artifacts_synced_count
expose :job_artifacts_failed_count
expose :job_artifacts_synced_in_percentage do |node|
number_to_percentage(node.job_artifacts_synced_in_percentage, precision: 2)
end
expose :repositories_count
expose :repositories_failed_count
expose :repositories_synced_count
......
module Geo
class JobArtifactDeletedEventStore < EventStore
self.event_type = :job_artifact_deleted_event
attr_reader :job_artifact
def initialize(job_artifact)
@job_artifact = job_artifact
end
def create
return unless job_artifact.local_store?
super
end
private
def build_event
Geo::JobArtifactDeletedEvent.new(
job_artifact: job_artifact,
file_path: relative_file_path
)
end
def local_store_path
Pathname.new(JobArtifactUploader.local_store_path)
end
def relative_file_path
return unless job_artifact.file.present?
Pathname.new(job_artifact.file.path).relative_path_from(local_store_path)
end
# This is called by ProjectLogHelpers to build json log with context info
#
# @see ::Gitlab::Geo::ProjectLogHelpers
def base_log_data(message)
{
class: self.class.name,
job_artifact_id: job_artifact.id,
file_path: job_artifact.file.path,
message: message
}
end
end
end
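Putting the pieces together, a hedged sketch of the delete-propagation path this MR adds; `job_artifact` below stands for any locally stored Ci::JobArtifact on the primary:
# Illustrative sketch only.
# Primary: destroying a locally stored artifact fires the after_destroy callback
# added in EE::Ci::JobArtifact, which is equivalent to:
Geo::JobArtifactDeletedEventStore.new(job_artifact).create
# Secondary: the log cursor daemon later replays the event via
# handle_job_artifact_deleted_event, deleting the local file and the matching
# Geo::FileRegistry.job_artifacts row.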
......@@ -99,12 +99,46 @@ module Geo
(Time.now.utc - start_time) >= run_time
end
def interleave(first, second)
if first.length >= second.length
first.zip(second)
else
second.zip(first).map(&:reverse)
end.flatten(1).uniq.compact.take(db_retrieve_batch_size)
def take_batch(*arrays)
interleave(*arrays).uniq.compact.take(db_retrieve_batch_size)
end
# Combines the elements of multiple, arbitrary-length arrays into a single array.
#
# Each array is spread evenly over the resultant array.
# The order of the original arrays is preserved within the resultant array.
# In the case of ties between elements, the element from the first array goes first.
# From https://stackoverflow.com/questions/15628936/ruby-equally-distribute-elements-and-interleave-merge-multiple-arrays/15639147#15639147
#
# For examples, see the specs in file_download_dispatch_worker_spec.rb
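#
# Illustrative output, matching the expectations in that spec file:
#
#   interleave(%w[1 2 3], %w[A B C]) # => ["1", "A", "2", "B", "3", "C"]
#   interleave(%w[1 2], %w[A B C])   # => ["A", "1", "B", "2", "C"]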
def interleave(*arrays)
elements = []
coefficients = []
arrays.each_with_index do |e, index|
elements += e
coefficients += interleave_coefficients(e, index)
end
combined = elements.zip(coefficients)
combined.sort_by { |zipped| zipped[1] }.map { |zipped| zipped[0] }
end
# Assigns a position to each element in order to spread out arrays evenly.
#
# `array_index` is used to resolve ties between arrays of equal length.
#
# Examples:
#
# irb(main):006:0> interleave_coefficients(['a', 'b'], 0)
# => [0.2499998750000625, 0.7499996250001875]
# irb(main):027:0> interleave_coefficients(['a', 'b', 'c'], 0)
# => [0.16666661111112963, 0.4999998333333889, 0.8333330555556481]
# irb(main):007:0> interleave_coefficients(['a', 'b', 'c'], 1)
# => [0.16699994433335189, 0.5003331665556111, 0.8336663887778704]
def interleave_coefficients(array, array_index)
(1..array.size).map do |i|
(i - 0.5 + array_index / 1000.0) / (array.size + 1e-6)
end
end
def update_jobs_in_progress
......
......@@ -26,6 +26,10 @@ module Geo
@lfs_objects_finder ||= LfsObjectRegistryFinder.new(current_node: current_node)
end
def job_artifacts_finder
@job_artifacts_finder ||= JobArtifactRegistryFinder.new(current_node: current_node)
end
# Polls for new resources to be transferred
#
# @return [Array] resources to be transferred
......@@ -43,8 +47,9 @@ module Geo
def find_unsynced_objects(batch_size:)
lfs_object_ids = find_unsynced_lfs_objects_ids(batch_size: batch_size)
attachment_ids = find_unsynced_attachments_ids(batch_size: batch_size)
job_artifact_ids = find_unsynced_job_artifacts_ids(batch_size: batch_size)
interleave(lfs_object_ids, attachment_ids)
take_batch(lfs_object_ids, attachment_ids, job_artifact_ids)
end
def find_unsynced_lfs_objects_ids(batch_size:)
......@@ -59,6 +64,12 @@ module Geo
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
end
def find_unsynced_job_artifacts_ids(batch_size:)
job_artifacts_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_registry_ids: scheduled_file_ids(:job_artifact))
.pluck(:id)
.map { |id| [id, :job_artifact] }
end
def find_failed_upload_object_ids(batch_size:)
file_registry_finder.find_failed_file_registries(batch_size: batch_size)
.pluck(:file_id, :file_type)
......
module Gitlab
module Geo
class JobArtifactDownloader < FileDownloader
def execute
job_artifact = ::Ci::JobArtifact.find_by(id: object_db_id)
return unless job_artifact.present?
transfer = ::Gitlab::Geo::JobArtifactTransfer.new(job_artifact)
transfer.download_from_primary
end
end
end
end
module Gitlab
module Geo
class JobArtifactTransfer < Transfer
def initialize(job_artifact)
@file_type = :job_artifact
@file_id = job_artifact.id
@filename = job_artifact.file.path
@request_data = job_artifact_request_data(job_artifact)
end
private
def job_artifact_request_data(job_artifact)
{ id: @file_id }
end
end
end
end
module Gitlab
module Geo
class JobArtifactUploader < FileUploader
def execute
job_artifact = ::Ci::JobArtifact.find_by(id: object_db_id)
unless job_artifact.present?
return error('Job artifact not found')
end
unless job_artifact.file.present? && job_artifact.file.exists?
return error('Job artifact does not have a file')
end
success(job_artifact.file)
end
end
end
end
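For orientation, a hedged sketch of how the downloader, transfer, and uploader classes above fit together; `artifact_id` and `auth_header` are placeholder names, and the service entry points are the ones exercised in the specs further down:
# Illustrative sketch only.
# Secondary: the file download dispatch worker eventually runs
Geo::FileDownloadService.new(:job_artifact, artifact_id).execute
#   -> Gitlab::Geo::JobArtifactDownloader#execute
#   -> Gitlab::Geo::JobArtifactTransfer#download_from_primary
# Primary: the authenticated transfer request is served by
Geo::FileUploadService.new({ id: artifact_id, type: 'job_artifact' }, auth_header).execute
#   -> Gitlab::Geo::JobArtifactUploader#execute, returning the artifact file or an error hash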
......@@ -216,12 +216,39 @@ module Gitlab
::Geo::FileRegistry.lfs_objects.where(file_id: event.lfs_object_id).delete_all
end
def handle_job_artifact_deleted_event(event, created_at)
file_registry_job_artifacts = ::Geo::FileRegistry.job_artifacts.where(file_id: event.job_artifact_id)
return unless file_registry_job_artifacts.any? # avoid race condition
file_path = File.join(JobArtifactUploader.local_store_path, event.file_path)
if File.file?(file_path)
deleted = delete_file(file_path) # delete synchronously to ensure consistency
return unless deleted # do not delete file from registry if deletion failed
end
logger.event_info(
created_at,
message: 'Deleted job artifact',
file_id: event.job_artifact_id,
file_path: file_path)
file_registry_job_artifacts.delete_all
end
def find_or_initialize_registry(project_id, attrs)
registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
registry.assign_attributes(attrs)
registry
end
def delete_file(path)
File.delete(path)
rescue => ex
logger.error("Failed to remove file", exception: ex.class.name, details: ex.message, filename: path)
false
end
# Sleeps for the expired TTL that remains on the lease plus some random seconds.
#
# This allows multiple GeoLogCursors to randomly process a batch of events,
......
require 'spec_helper'
describe Geo::JobArtifactRegistryFinder, :geo do
include ::EE::GeoHelpers
let(:secondary) { create(:geo_node) }
let(:synced_group) { create(:group) }
let(:synced_project) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) }
let!(:job_artifact_1) { create(:ci_job_artifact, id: 1, project: synced_project) }
let!(:job_artifact_2) { create(:ci_job_artifact, id: 2, project: unsynced_project) }
let!(:job_artifact_3) { create(:ci_job_artifact, id: 3, project: synced_project) }
let!(:job_artifact_4) { create(:ci_job_artifact, id: 4, project: unsynced_project) }
subject { described_class.new(current_node: secondary) }
before do
stub_current_geo_node(secondary)
end
describe '#count_synced_job_artifacts' do
it 'delegates to #find_synced_job_artifacts_registries' do
expect(subject).to receive(:find_synced_job_artifacts_registries).and_call_original
subject.count_synced_job_artifacts
end
it 'counts job artifacts that have been synced' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id)
expect(subject.count_synced_job_artifacts).to eq 2
end
context 'with selective sync' do
before do
secondary.update_attribute(:namespaces, [synced_group])
end
it 'delegates to #legacy_find_synced_job_artifacts' do
expect(subject).to receive(:legacy_find_synced_job_artifacts).and_call_original
subject.count_synced_job_artifacts
end
it 'counts job artifacts that have been synced' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id)
expect(subject.count_synced_job_artifacts).to eq 1
end
end
end
describe '#count_failed_job_artifacts' do
it 'delegates to #find_failed_job_artifacts_registries' do
expect(subject).to receive(:find_failed_job_artifacts_registries).and_call_original
subject.count_failed_job_artifacts
end
it 'counts job artifacts whose sync has failed' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)
expect(subject.count_failed_job_artifacts).to eq 2
end
context 'with selective sync' do
before do
secondary.update_attribute(:namespaces, [synced_group])
end
it 'delegates to #legacy_find_failed_job_artifacts' do
expect(subject).to receive(:legacy_find_failed_job_artifacts).and_call_original
subject.count_failed_job_artifacts
end
it 'counts job artifacts whose sync has failed' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id)
expect(subject.count_failed_job_artifacts).to eq 1
end
it 'does not count job artifacts of unsynced projects' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id, success: false)
expect(subject.count_failed_job_artifacts).to eq 0
end
end
end
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
context 'FDW', :delete do
before do
skip('FDW is not configured') if Gitlab::Database.postgresql? && !Gitlab::Geo.fdw?
end
describe '#find_unsynced_job_artifacts' do
it 'delegates to #fdw_find_unsynced_job_artifacts' do
expect(subject).to receive(:fdw_find_unsynced_job_artifacts).and_call_original
subject.find_unsynced_job_artifacts(batch_size: 10)
end
it 'returns job artifacts without an entry on the tracking database' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)
job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10)
expect(job_artifacts.map(&:id)).to match_array([job_artifact_2.id, job_artifact_4.id])
end
it 'excludes job artifacts in the except_registry_ids option' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)
job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10, except_registry_ids: [job_artifact_2.id])
expect(job_artifacts.map(&:id)).to match_array([job_artifact_4.id])
end
end
end
context 'Legacy' do
before do
allow(Gitlab::Geo).to receive(:fdw?).and_return(false)
end
describe '#find_unsynced_job_artifacts' do
it 'delegates to #legacy_find_unsynced_job_artifacts' do
expect(subject).to receive(:legacy_find_unsynced_job_artifacts).and_call_original
subject.find_unsynced_job_artifacts(batch_size: 10)
end
it 'returns job artifacts without an entry on the tracking database' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)
job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10)
expect(job_artifacts).to match_array([job_artifact_2, job_artifact_4])
end
it 'excludes job artifacts in the except_registry_ids option' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)
job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10, except_registry_ids: [job_artifact_2.id])
expect(job_artifacts).to match_array([job_artifact_4])
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::JobArtifactDownloader, :geo do
let(:job_artifact) { create(:ci_job_artifact) }
subject do
described_class.new(:job_artifact, job_artifact.id)
end
context '#execute' do
it 'with a job artifact' do
allow_any_instance_of(Gitlab::Geo::JobArtifactTransfer)
.to receive(:download_from_primary).and_return(100)
expect(subject.execute).to eq(100)
end
it 'with an unknown job artifact' do
downloader = described_class.new(:job_artifact, 10_000)
expect(downloader.execute).to be_nil
end
end
end
require 'spec_helper'
describe Gitlab::Geo::JobArtifactTransfer, :geo do
set(:job_artifact) { create(:ci_job_artifact, :archive) }
context '#initialize' do
it 'sets file_type to :job_artifact' do
expect(described_class.new(job_artifact).file_type).to eq(:job_artifact)
end
it 'sets file_id to the job artifact ID' do
expect(described_class.new(job_artifact).file_id).to eq(job_artifact.id)
end
it 'sets filename to the job artifact file path' do
expect(described_class.new(job_artifact).filename).to eq(job_artifact.file.path)
expect(job_artifact.file.path).to be_present
end
it 'sets request_data to the job artifact ID' do
expect(described_class.new(job_artifact).request_data).to eq(id: job_artifact.id)
end
end
end
require 'spec_helper'
describe Gitlab::Geo::JobArtifactUploader, :geo do
context '#execute' do
subject { described_class.new(job_artifact.id, {}).execute }
context 'when the job artifact exists' do
before do
expect(::Ci::JobArtifact).to receive(:find_by).with(id: job_artifact.id).and_return(job_artifact)
end
context 'when the job artifact is an archive file_type and has a file' do
let(:job_artifact) { create(:ci_job_artifact, :archive) }
it 'returns the file in a success hash' do
expect(subject).to eq(code: :ok, message: 'Success', file: job_artifact.file)
end
end
context 'when the job artifact is an metadata file_type and has a file' do
let(:job_artifact) { create(:ci_job_artifact, :metadata) }
it 'returns the file in a success hash' do
expect(subject).to eq(code: :ok, message: 'Success', file: job_artifact.file)
end
end
context 'when the job artifact does not have a file' do
let(:job_artifact) { create(:ci_job_artifact) }
it 'returns an error hash' do
expect(subject).to eq(code: :not_found, message: "Job artifact does not have a file")
end
end
end
context 'when the job artifact does not exist' do
let(:job_artifact) { double(id: 10000) }
it 'returns an error hash' do
expect(subject).to eq(code: :not_found, message: "Job artifact not found")
end
end
end
end
......@@ -300,5 +300,110 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
expect { daemon.run_once! }.to change(Geo::FileRegistry.lfs_objects, :count).by(-1)
end
end
context 'when replaying a job artifact event' do
let(:event_log) { create(:geo_event_log, :job_artifact_deleted_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:job_artifact_deleted_event) { event_log.job_artifact_deleted_event }
let(:job_artifact) { job_artifact_deleted_event.job_artifact }
context 'with a tracking database entry' do
before do
create(:geo_file_registry, :job_artifact, file_id: job_artifact.id)
end
context 'with a file' do
context 'when the delete succeeds' do
it 'removes the tracking database entry' do
expect { daemon.run_once! }.to change(Geo::FileRegistry.job_artifacts, :count).by(-1)
end
it 'deletes the file' do
expect { daemon.run_once! }.to change { File.exist?(job_artifact.file.path) }.from(true).to(false)
end
end
context 'when the delete fails' do
before do
expect(daemon).to receive(:delete_file).and_return(false)
end
it 'does not remove the tracking database entry' do
expect { daemon.run_once! }.not_to change(Geo::FileRegistry.job_artifacts, :count)
end
end
end
context 'without a file' do
before do
FileUtils.rm(job_artifact.file.path)
end
it 'removes the tracking database entry' do
expect { daemon.run_once! }.to change(Geo::FileRegistry.job_artifacts, :count).by(-1)
end
end
end
context 'without a tracking database entry' do
it 'does not create a tracking database entry' do
expect { daemon.run_once! }.not_to change(Geo::FileRegistry, :count)
end
it 'does not delete the file (yet, due to possible race condition)' do
expect { daemon.run_once! }.not_to change { File.exist?(job_artifact.file.path) }.from(true)
end
end
end
end
describe '#delete_file' do
context 'when the file exists' do
let!(:file) { fixture_file_upload(Rails.root + "spec/fixtures/dk.png", "image/png") }
context 'when the delete does not raise an exception' do
it 'returns true' do
expect(daemon.send(:delete_file, file.path)).to be_truthy
end
it 'does not log an error' do
expect(daemon).not_to receive(:logger)
daemon.send(:delete_file, file.path)
end
end
context 'when the delete raises an exception' do
before do
expect(File).to receive(:delete).and_raise('something went wrong')
end
it 'returns false' do
expect(daemon.send(:delete_file, file.path)).to be_falsey
end
it 'logs an error' do
logger = double('logger')
expect(daemon).to receive(:logger).and_return(logger)
expect(logger).to receive(:error).with('Failed to remove file', exception: 'RuntimeError', details: 'something went wrong', filename: file.path)
daemon.send(:delete_file, file.path)
end
end
end
context 'when the file does not exist' do
it 'returns false' do
expect(daemon.send(:delete_file, '/does/not/exist')).to be_falsey
end
it 'logs an error' do
logger = double('logger')
expect(daemon).to receive(:logger).and_return(logger)
expect(logger).to receive(:error).with('Failed to remove file', exception: 'Errno::ENOENT', details: 'No such file or directory @ unlink_internal - /does/not/exist', filename: '/does/not/exist')
daemon.send(:delete_file, '/does/not/exist')
end
end
end
end
require 'spec_helper'
describe EE::Ci::JobArtifact do
describe '#destroy' do
set(:primary) { create(:geo_node, :primary) }
set(:secondary) { create(:geo_node) }
it 'creates a JobArtifactDeletedEvent' do
job_artifact = create(:ci_job_artifact, :archive)
expect do
job_artifact.destroy
end.to change { Geo::JobArtifactDeletedEvent.count }.by(1)
end
end
end
......@@ -8,6 +8,9 @@ RSpec.describe Geo::EventLog, type: :model do
it { is_expected.to belong_to(:repository_renamed_event).class_name('Geo::RepositoryRenamedEvent').with_foreign_key('repository_renamed_event_id') }
it { is_expected.to belong_to(:repository_updated_event).class_name('Geo::RepositoryUpdatedEvent').with_foreign_key('repository_updated_event_id') }
it { is_expected.to belong_to(:hashed_storage_migrated_event).class_name('Geo::HashedStorageMigratedEvent').with_foreign_key('hashed_storage_migrated_event_id') }
it { is_expected.to belong_to(:hashed_storage_attachments_event).class_name('Geo::HashedStorageAttachmentsEvent').with_foreign_key('hashed_storage_attachments_event_id') }
it { is_expected.to belong_to(:lfs_object_deleted_event).class_name('Geo::LfsObjectDeletedEvent').with_foreign_key('lfs_object_deleted_event_id') }
it { is_expected.to belong_to(:job_artifact_deleted_event).class_name('Geo::JobArtifactDeletedEvent').with_foreign_key('job_artifact_deleted_event_id') }
end
describe '#event' do
......@@ -57,12 +60,26 @@ RSpec.describe Geo::EventLog, type: :model do
expect(subject.event).to eq hashed_storage_migrated_event
end
it 'returns hashed_storage_attachments_event when set' do
hashed_storage_attachments_event = build(:geo_hashed_storage_attachments_event)
subject.hashed_storage_attachments_event = hashed_storage_attachments_event
expect(subject.event).to eq hashed_storage_attachments_event
end
it 'returns lfs_object_deleted_event when set' do
lfs_object_deleted_event = build(:geo_lfs_object_deleted_event)
subject.lfs_object_deleted_event = lfs_object_deleted_event
expect(subject.event).to eq lfs_object_deleted_event
end
it 'returns job_artifact_deleted_event when set' do
job_artifact_deleted_event = build(:geo_job_artifact_deleted_event)
subject.job_artifact_deleted_event = job_artifact_deleted_event
expect(subject.event).to eq job_artifact_deleted_event
end
end
describe '#project_id' do
......
require 'spec_helper'
RSpec.describe Geo::JobArtifactDeletedEvent, type: :model do
describe 'relationships' do
it { is_expected.to belong_to(:job_artifact) }
end
describe 'validations' do
it { is_expected.to validate_presence_of(:job_artifact) }
end
end
......@@ -198,6 +198,61 @@ describe GeoNodeStatus, :geo do
end
end
describe '#job_artifacts_synced_count' do
it 'counts synced job artifacts' do
# These should be ignored
create(:geo_file_registry, success: false)
create(:geo_file_registry, :avatar, success: false)
create(:geo_file_registry, file_type: :attachment, success: true)
create(:geo_file_registry, :lfs, :with_file, success: true)
create(:geo_file_registry, :job_artifact, :with_file, success: false)
create(:geo_file_registry, :job_artifact, :with_file, success: true)
expect(subject.job_artifacts_synced_count).to eq(1)
end
end
describe '#job_artifacts_failed_count' do
it 'counts failed job artifacts' do
# These should be ignored
create(:geo_file_registry, success: false)
create(:geo_file_registry, :avatar, success: false)
create(:geo_file_registry, file_type: :attachment, success: false)
create(:geo_file_registry, :job_artifact, :with_file, success: true)
create(:geo_file_registry, :job_artifact, :with_file, success: false)
expect(subject.job_artifacts_failed_count).to eq(1)
end
end
describe '#job_artifacts_synced_in_percentage' do
context 'when artifacts are available' do
before do
[project_1, project_2, project_3, project_4].each_with_index do |project, index|
build = create(:ci_build, project: project)
job_artifact = create(:ci_job_artifact, job: build)
create(:geo_file_registry, :job_artifact, success: index.even?, file_id: job_artifact.id)
end
end
it 'returns the right percentage with no group restrictions' do
expect(subject.job_artifacts_synced_in_percentage).to be_within(0.0001).of(50)
end
it 'returns the right percentage with group restrictions' do
secondary.update_attribute(:namespaces, [group])
expect(subject.job_artifacts_synced_in_percentage).to be_within(0.0001).of(50)
end
end
it 'returns 0 when no artifacts are available' do
expect(subject.job_artifacts_synced_in_percentage).to eq(0)
end
end
describe '#repositories_failed_count' do
before do
create(:geo_project_registry, :sync_failed, project: project_1)
......
......@@ -22,6 +22,10 @@ describe GeoNodeStatusEntity, :postgresql do
it { is_expected.to have_key(:lfs_objects_failed_count) }
it { is_expected.to have_key(:lfs_objects_synced_count) }
it { is_expected.to have_key(:lfs_objects_synced_in_percentage) }
it { is_expected.to have_key(:job_artifacts_count) }
it { is_expected.to have_key(:job_artifacts_failed_count) }
it { is_expected.to have_key(:job_artifacts_synced_count) }
it { is_expected.to have_key(:job_artifacts_synced_in_percentage) }
it { is_expected.to have_key(:repositories_count) }
it { is_expected.to have_key(:repositories_failed_count) }
it { is_expected.to have_key(:repositories_synced_count)}
......@@ -99,6 +103,16 @@ describe GeoNodeStatusEntity, :postgresql do
end
end
describe '#job_artifacts_synced_in_percentage' do
it 'formats as percentage' do
geo_node_status.assign_attributes(job_artifacts_count: 256,
job_artifacts_failed_count: 12,
job_artifacts_synced_count: 123)
expect(subject[:job_artifacts_synced_in_percentage]).to eq '48.05%'
end
end
describe '#repositories_synced_in_percentage' do
it 'formats as percentage' do
geo_node_status.assign_attributes(repositories_count: 10,
......
......@@ -187,6 +187,32 @@ describe Geo::FileDownloadService do
end
end
context 'job artifacts' do
let(:job_artifact) { create(:ci_job_artifact) }
subject { described_class.new(:job_artifact, job_artifact.id) }
it 'downloads a job artifact' do
stub_transfer(Gitlab::Geo::JobArtifactTransfer, 100)
expect { subject.execute }.to change { Geo::FileRegistry.synced.count }.by(1)
end
it 'registers when the download fails' do
stub_transfer(Gitlab::Geo::JobArtifactTransfer, -1)
expect { subject.execute }.to change { Geo::FileRegistry.failed.count }.by(1)
end
it 'logs a message' do
stub_transfer(Gitlab::Geo::JobArtifactTransfer, 100)
expect(Gitlab::Geo::Logger).to receive(:info).with(hash_including(:message, :download_time_s, success: true, bytes_downloaded: 100)).and_call_original
subject.execute
end
end
context 'bad object type' do
it 'raises an error' do
expect { described_class.new(:bad, 1).execute }.to raise_error(NameError)
......
......@@ -180,5 +180,28 @@ describe Geo::FileUploadService do
expect(service.execute).to be_nil
end
end
context 'job artifact' do
let(:job_artifact) { create(:ci_job_artifact, :with_file) }
let(:params) { { id: job_artifact.id, type: 'job_artifact' } }
let(:job_artifact_transfer) { Gitlab::Geo::JobArtifactTransfer.new(job_artifact) }
let(:transfer_request) { Gitlab::Geo::TransferRequest.new(job_artifact_transfer.request_data) }
let(:req_header) { transfer_request.headers['Authorization'] }
it 'sends job artifact file' do
service = described_class.new(params, req_header)
response = service.execute
expect(response[:code]).to eq(:ok)
expect(response[:file].path).to eq(job_artifact.file.path)
end
it 'returns nil if no authorization' do
service = described_class.new(params, nil)
expect(service.execute).to be_nil
end
end
end
end
require 'spec_helper'
describe Geo::JobArtifactDeletedEventStore do
set(:secondary_node) { create(:geo_node) }
let(:job_artifact) { create(:ci_job_artifact, :archive) }
subject(:event_store) { described_class.new(job_artifact) }
describe '#create' do
it 'does not create an event when not running on a primary node' do
allow(Gitlab::Geo).to receive(:primary?) { false }
expect { event_store.create }.not_to change(Geo::JobArtifactDeletedEvent, :count)
end
context 'when running on a primary node' do
before do
allow(Gitlab::Geo).to receive(:primary?) { true }
end
it 'does not create an event when the job artifact is not on a local store' do
allow(job_artifact).to receive(:local_store?).and_return(false)
expect { event_store.create }.not_to change(Geo::JobArtifactDeletedEvent, :count)
end
it 'does not create an event when there are no secondary nodes' do
allow(Gitlab::Geo).to receive(:secondary_nodes) { [] }
expect { event_store.create }.not_to change(Geo::JobArtifactDeletedEvent, :count)
end
it 'creates a job artifact deleted event' do
expect { event_store.create }.to change(Geo::JobArtifactDeletedEvent, :count).by(1)
end
it 'tracks job artifact attributes' do
event_store.create
event = Geo::JobArtifactDeletedEvent.last
expect(event.job_artifact_id).to eq(job_artifact.id)
expect(event.file_path).to match(%r{\A\h+/\h+/\h+/[\d_]+/\d+/\d+/ci_build_artifacts.zip\z})
end
it 'logs an error message when event creation fails' do
invalid_job_artifact = create(:ci_job_artifact)
event_store = described_class.new(invalid_job_artifact)
expected_message = {
class: "Geo::JobArtifactDeletedEventStore",
job_artifact_id: invalid_job_artifact.id,
file_path: nil,
message: "Job artifact deleted event could not be created",
error: "Validation failed: File path can't be blank"
}
expect(Gitlab::Geo::Logger).to receive(:error)
.with(expected_message).and_call_original
event_store.create
end
end
end
end
......@@ -25,6 +25,9 @@ describe Geo::MetricsUpdateService, :geo do
lfs_objects_count: 100,
lfs_objects_synced_count: 50,
lfs_objects_failed_count: 12,
job_artifacts_count: 100,
job_artifacts_synced_count: 50,
job_artifacts_failed_count: 12,
attachments_count: 30,
attachments_synced_count: 30,
attachments_failed_count: 25,
......@@ -110,6 +113,9 @@ describe Geo::MetricsUpdateService, :geo do
expect(metric_value(:geo_lfs_objects)).to eq(100)
expect(metric_value(:geo_lfs_objects_synced)).to eq(50)
expect(metric_value(:geo_lfs_objects_failed)).to eq(12)
expect(metric_value(:geo_job_artifacts)).to eq(100)
expect(metric_value(:geo_job_artifacts_synced)).to eq(50)
expect(metric_value(:geo_job_artifacts_failed)).to eq(12)
expect(metric_value(:geo_attachments)).to eq(30)
expect(metric_value(:geo_attachments_synced)).to eq(30)
expect(metric_value(:geo_attachments_failed)).to eq(25)
......
......@@ -60,6 +60,9 @@ describe Geo::NodeStatusFetchService, :geo do
lfs_objects_count: 100,
lfs_objects_synced_count: 50,
lfs_objects_failed_count: 12,
job_artifacts_count: 100,
job_artifacts_synced_count: 50,
job_artifacts_failed_count: 12,
attachments_count: 30,
attachments_synced_count: 30,
attachments_failed_count: 25,
......@@ -136,6 +139,9 @@ describe Geo::NodeStatusFetchService, :geo do
expect(status.lfs_objects_count).to eq(db_status.lfs_objects_count)
expect(status.lfs_objects_failed_count).to eq(db_status.lfs_objects_failed_count)
expect(status.lfs_objects_synced_count).to eq(db_status.lfs_objects_synced_count)
expect(status.job_artifacts_count).to eq(db_status.job_artifacts_count)
expect(status.job_artifacts_failed_count).to eq(db_status.job_artifacts_failed_count)
expect(status.job_artifacts_synced_count).to eq(db_status.job_artifacts_synced_count)
expect(status.repositories_count).to eq(db_status.repositories_count)
expect(status.repositories_synced_count).to eq(db_status.repositories_synced_count)
expect(status.repositories_failed_count).to eq(db_status.repositories_failed_count)
......
......@@ -63,6 +63,48 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end
end
context 'with job artifacts' do
it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do
artifact = create(:ci_job_artifact)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with(:job_artifact, artifact.id).once.and_return(spy)
subject.perform
end
it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do
artifact = create(:ci_job_artifact)
Geo::FileRegistry.create!(file_type: :job_artifact, file_id: artifact.id, bytes: 0, success: false)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', artifact.id).once.and_return(spy)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do
artifact = create(:ci_job_artifact)
Geo::FileRegistry.create!(file_type: :job_artifact, file_id: artifact.id, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do
artifact = create(:ci_job_artifact)
Geo::FileRegistry.create!(file_type: :job_artifact, file_id: artifact.id, bytes: 0, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform
end
end
# Test the case where we have:
#
# 1. A total of 10 files in the queue, and we can load a maximum of 5 and send 2 at a time.
......@@ -76,7 +118,8 @@ describe Geo::FileDownloadDispatchWorker, :geo do
create_list(:lfs_object, 2, :with_file)
create_list(:user, 2, avatar: avatar)
create_list(:note, 2, :with_attachment)
create_list(:upload, 2, :personal_snippet)
create_list(:upload, 1, :personal_snippet)
create_list(:ci_job_artifact, 1)
create(:appearance, logo: avatar, header_logo: avatar)
expect(Geo::FileDownloadWorker).to receive(:perform_async).exactly(10).times.and_call_original
......@@ -132,8 +175,8 @@ describe Geo::FileDownloadDispatchWorker, :geo do
context 'when node has namespace restrictions' do
let(:synced_group) { create(:group) }
let!(:project_in_synced_group) { create(:project, group: synced_group) }
let!(:unsynced_project) { create(:project) }
let(:project_in_synced_group) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) }
before do
allow(ProjectCacheWorker).to receive(:perform_async).and_return(true)
......@@ -142,11 +185,21 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end
it 'does not perform Geo::FileDownloadWorker for an LFS object that does not belong to the selected namespaces to replicate' do
lfs_objec_in_synced_group = create(:lfs_objects_project, project: project_in_synced_group)
lfs_object_in_synced_group = create(:lfs_objects_project, project: project_in_synced_group)
create(:lfs_objects_project, project: unsynced_project)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with(:lfs, lfs_objec_in_synced_group.lfs_object_id).once.and_return(spy)
.with(:lfs, lfs_object_in_synced_group.lfs_object_id).once.and_return(spy)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for a job artifact that does not belong to the selected namespaces to replicate' do
create(:ci_job_artifact, project: unsynced_project)
job_artifact_in_synced_group = create(:ci_job_artifact, project: project_in_synced_group)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with(:job_artifact, job_artifact_in_synced_group.id).once.and_return(spy)
subject.perform
end
......@@ -183,4 +236,56 @@ describe Geo::FileDownloadDispatchWorker, :geo do
it_behaves_like '#perform', false
end
describe '#take_batch' do
it 'returns a batch of jobs' do
a = [[2, :lfs], [3, :lfs]]
b = []
c = [[3, :job_artifact], [8, :job_artifact], [9, :job_artifact]]
expect(subject).to receive(:db_retrieve_batch_size).and_return(4)
expect(subject.send(:take_batch, a, b, c)).to eq([
[3, :job_artifact],
[2, :lfs],
[8, :job_artifact],
[3, :lfs]
])
end
end
describe '#interleave' do
# Notice ties are resolved by taking the "first" tied element
it 'interleaves 2 arrays' do
a = %w{1 2 3}
b = %w{A B C}
expect(subject.send(:interleave, a, b)).to eq(%w{1 A 2 B 3 C})
end
# Notice there are no ties in this call
it 'interleaves 2 arrays with a longer second array' do
a = %w{1 2}
b = %w{A B C}
expect(subject.send(:interleave, a, b)).to eq(%w{A 1 B 2 C})
end
it 'interleaves 2 arrays with a longer first array' do
a = %w{1 2 3}
b = %w{A B}
expect(subject.send(:interleave, a, b)).to eq(%w{1 A 2 B 3})
end
it 'interleaves 3 arrays' do
a = %w{1 2 3}
b = %w{A B C}
c = %w{i ii iii}
expect(subject.send(:interleave, a, b, c)).to eq(%w{1 A i 2 B ii 3 C iii})
end
it 'interleaves 3 arrays of unequal length' do
a = %w{1 2}
b = %w{A}
c = %w{i ii iii iiii}
expect(subject.send(:interleave, a, b, c)).to eq(%w{i 1 ii A iii 2 iiii})
end
end
end
require 'spec_helper'
describe Geo::FileDownloadWorker, :geo do
describe '#perform' do
it 'instantiates and executes FileDownloadService, and converts object_type to a symbol' do
service = double(:service)
expect(service).to receive(:execute)
expect(Geo::FileDownloadService).to receive(:new).with(:job_artifact, 1).and_return(service)
described_class.new.perform('job_artifact', 1)
end
end
end
......@@ -27,6 +27,10 @@ FactoryBot.define do
trait :lfs_object_deleted_event do
lfs_object_deleted_event factory: :geo_lfs_object_deleted_event
end
trait :job_artifact_deleted_event do
job_artifact_deleted_event factory: :geo_job_artifact_deleted_event
end
end
factory :geo_repository_created_event, class: Geo::RepositoryCreatedEvent do
......@@ -96,8 +100,22 @@ FactoryBot.define do
lfs_object { create(:lfs_object, :with_file) }
after(:build, :stub) do |event, _|
local_store_path = Pathname.new(LfsObjectUploader.local_store_path)
relative_path = Pathname.new(event.lfs_object.file.path).relative_path_from(local_store_path)
event.oid = event.lfs_object.oid
event.file_path = event.lfs_object.file.path
event.file_path = relative_path
end
end
factory :geo_job_artifact_deleted_event, class: Geo::JobArtifactDeletedEvent do
job_artifact { create(:ci_job_artifact, :archive) }
after(:build, :stub) do |event, _|
local_store_path = Pathname.new(JobArtifactUploader.local_store_path)
relative_path = Pathname.new(event.job_artifact.file.path).relative_path_from(local_store_path)
event.file_path = relative_path
end
end
end
......@@ -12,11 +12,17 @@ FactoryBot.define do
file_type :lfs
end
trait :job_artifact do
file_type :job_artifact
end
trait :with_file do
after(:build, :stub) do |registry, _|
file =
if registry.file_type.to_sym == :lfs
create(:lfs_object)
elsif registry.file_type.to_sym == :job_artifact
create(:ci_job_artifact)
else
create(:upload)
end
......
......@@ -12,6 +12,9 @@ FactoryBot.define do
lfs_objects_count 256
lfs_objects_failed_count 12
lfs_objects_synced_count 123
job_artifacts_count 580
job_artifacts_failed_count 3
job_artifacts_synced_count 577
repositories_count 10
repositories_synced_count 5
repositories_failed_count 0
......
......@@ -12,6 +12,9 @@
"lfs_objects_count",
"lfs_objects_failed_count",
"lfs_objects_synced_count",
"job_artifacts_count",
"job_artifacts_failed_count",
"job_artifacts_synced_count",
"db_replication_lag_seconds",
"repositories_count",
"repositories_failed_count",
......@@ -46,6 +49,10 @@
"lfs_objects_failed_count": { "type": "integer" },
"lfs_objects_synced_count": { "type": "integer" },
"lfs_objects_synced_in_percentage": { "type": "string" },
"job_artifacts_count": { "type": "integer" },
"job_artifacts_failed_count": { "type": "integer" },
"job_artifacts_synced_count": { "type": "integer" },
"job_artifacts_synced_in_percentage": { "type": "string" },
"repositories_count": { "type": "integer" },
"repositories_failed_count": { "type": "integer" },
"repositories_synced_count": { "type": "integer" },
......
......@@ -43,6 +43,10 @@ export const rawMockNodeDetails = {
lfs_objects_synced_count: 0,
lfs_objects_failed_count: 0,
lfs_objects_synced_in_percentage: '0.00%',
job_artifacts_count: 0,
job_artifacts_synced_count: 0,
job_artifacts_failed_count: 0,
job_artifacts_synced_in_percentage: '0.00%',
repositories_count: 12,
repositories_failed_count: 0,
repositories_synced_count: 12,
......@@ -129,6 +133,11 @@ export const mockNodeDetails = {
successCount: 0,
failureCount: 0,
},
job_artifacts: {
totalCount: 0,
successCount: 0,
failureCount: 0,
},
attachments: {
totalCount: 0,
successCount: 0,
......