Commit d04a47c4 authored by Stan Hu's avatar Stan Hu

Merge branch 'geo/lfs-download-scheduler-clean' into 'master'

Add a GitLab Geo download scheduler for LFS files

See merge request !1382
parents 56b755ff 9f449d21
......@@ -350,6 +350,7 @@ Style/MutableConstant:
Exclude:
- 'db/migrate/**/*'
- 'db/post_migrate/**/*'
- 'db/geo/migrate/**/*'
# Favor unless over if for negative conditions (or control flow or).
Style/NegatedIf:
......
......@@ -20,7 +20,7 @@ module EE
def toggle_node_button(node)
btn_class, title, data =
if node.enabled?
['warning', 'Disable node', { confirm: 'Disabling a node stops the repositories backfilling process. Are you sure?' }]
['warning', 'Disable node', { confirm: 'Disabling a node stops the sync process. Are you sure?' }]
else
['success', 'Enable node']
end
......
# Tracking record for a file synced to this Geo node. Rows are written by
# Geo::FileDownloadService#update_registry and carry file_type, file_id,
# bytes and sha256 (see the CreateFileRegistry migration for the schema).
class Geo::FileRegistry < Geo::BaseRegistry
end
......@@ -28,9 +28,7 @@ class GeoNodeStatus
end
def repositories_synced_in_percentage
return 0 if repositories_count.zero?
(repositories_synced_count.to_f / repositories_count.to_f) * 100.0
sync_percentage(repositories_count, repositories_synced_count)
end
def repositories_failed_count
......@@ -40,4 +38,32 @@ class GeoNodeStatus
def repositories_failed_count=(value)
@repositories_failed_count = value.to_i
end
def lfs_objects_total
@lfs_objects_total ||= LfsObject.count
end
def lfs_objects_total=(value)
@lfs_objects_total = value.to_i
end
def lfs_objects_synced
@lfs_objects_synced ||= Geo::FileRegistry.where(file_type: :lfs).count
end
def lfs_objects_synced=(value)
@lfs_objects_synced = value.to_i
end
def lfs_objects_synced_in_percentage
sync_percentage(lfs_objects_total, lfs_objects_synced)
end
private
def sync_percentage(total, synced)
return 0 if total.zero?
(synced.to_f / total.to_f) * 100.0
end
end
......@@ -3,6 +3,7 @@ class GeoNodePresenter < Gitlab::View::Presenter::Delegated
delegate :healthy?, :health, :repositories_count, :repositories_synced_count,
:repositories_synced_in_percentage, :repositories_failed_count,
:lfs_objects_total, :lfs_objects_synced, :lfs_objects_synced_in_percentage,
to: :status
private
......
module Geo
  # Downloads a single file (currently only LFS objects) from the primary
  # Geo node and records the completed transfer in Geo::FileRegistry.
  #
  # An exclusive lease prevents two Sidekiq workers from downloading the
  # same file concurrently.
  class FileDownloadService
    attr_reader :object_type, :object_db_id

    LEASE_TIMEOUT = 8.hours.freeze

    # object_type  - Symbol identifying the kind of file (e.g. :lfs).
    # object_db_id - Primary key of the record the file belongs to.
    def initialize(object_type, object_db_id)
      @object_type = object_type
      @object_db_id = object_db_id
    end

    # Performs the download under an exclusive lease. Unknown file types
    # are logged and ignored.
    def execute
      # NOTE: try_obtain_lease yields no arguments, so the block takes none
      # (the original declared an unused `|lease|` parameter).
      try_obtain_lease do
        case object_type
        when :lfs
          download_lfs_object
        else
          log("unknown file type: #{object_type}")
        end
      end
    end

    private

    # Yields only when the exclusive lease could be obtained; always cancels
    # the lease afterwards so a failed download does not block retries for
    # the full LEASE_TIMEOUT.
    def try_obtain_lease
      uuid = Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT).try_obtain

      return unless uuid.present?

      begin
        yield
      ensure
        Gitlab::ExclusiveLease.cancel(lease_key, uuid)
      end
    end

    # Returns true on success, false when the transfer failed, and nil when
    # the LfsObject no longer exists.
    def download_lfs_object
      lfs_object = LfsObject.find_by_id(object_db_id)

      return unless lfs_object.present?

      transfer = ::Gitlab::Geo::LfsTransfer.new(lfs_object)
      bytes_downloaded = transfer.download_from_primary
      # A nil/false or negative byte count signals a failed transfer.
      success = bytes_downloaded && bytes_downloaded >= 0
      update_registry(bytes_downloaded) if success

      success
    end

    def log(message)
      Rails.logger.info "#{self.class.name}: #{message}"
    end

    # Upserts the registry row marking this file as synced on this node.
    # (Renamed the local from `transfer` — it is a registry record, not a
    # transfer object.)
    def update_registry(bytes_downloaded)
      registry = Geo::FileRegistry.find_or_initialize_by(
        file_type: object_type,
        file_id: object_db_id)
      registry.bytes = bytes_downloaded
      registry.save
    end

    def lease_key
      "file_download_service:#{object_type}:#{object_db_id}"
    end
  end
end
......@@ -3,7 +3,7 @@ module Geo
include Gitlab::CurrentSettings
include HTTParty
KEYS = %w(health repositories_count repositories_synced_count repositories_failed_count).freeze
KEYS = %w(health repositories_count repositories_synced_count repositories_failed_count lfs_objects_total lfs_objects_synced).freeze
# HTTParty timeout
default_timeout current_application_settings.geo_status_timeout
......
......@@ -30,6 +30,9 @@
%p
%span.help-block
Repositories failed: #{node.repositories_failed_count}
%p
%span.help-block
LFS objects synced: #{node.lfs_objects_synced}/#{node.lfs_objects_total} (#{number_to_percentage(node.lfs_objects_synced_in_percentage, precision: 2)})
%p
%span.help-block= node.healthy? ? 'No Health Problems Detected' : node.health
......
......@@ -16,7 +16,7 @@ class GeoBackfillWorker
project_ids.each do |project_id|
begin
break if over_time?(start_time)
break unless node_enabled?
break unless Gitlab::Geo.current_node_enabled?
project = Project.find(project_id)
next if synced?(project)
......@@ -79,11 +79,4 @@ class GeoBackfillWorker
def lease_timeout
Geo::RepositoryBackfillService::LEASE_TIMEOUT
end
def node_enabled?
# No caching of the enabled! If we cache it and an admin disables
# this node, an active GeoBackfillWorker would keep going for up
# to max run time after the node was disabled.
Gitlab::Geo.current_node.reload.enabled?
end
end
# Cron worker (secondary Geo nodes only) that schedules file downloads from
# the primary node, keeping at most MAX_CONCURRENT_DOWNLOADS in flight.
class GeoFileDownloadDispatchWorker
  include Sidekiq::Worker
  include CronjobQueue

  LEASE_KEY = 'geo_file_download_dispatch_worker'.freeze
  LEASE_TIMEOUT = 8.hours.freeze
  RUN_TIME = 60.minutes.to_i
  # Integers are immutable; the original `.freeze` calls were no-ops.
  DB_RETRIEVE_BATCH = 1000
  MAX_CONCURRENT_DOWNLOADS = 10

  def initialize
    @pending_lfs_downloads = []
    @scheduled_lfs_jobs = []
  end

  # The scheduling works as the following:
  #
  # 1. Load a batch of IDs that we need to download from the primary (DB_RETRIEVE_BATCH) into a pending list.
  # 2. Schedule them so that at most MAX_CONCURRENT_DOWNLOADS are running at once.
  # 3. When a slot frees, schedule another download.
  # 4. When we have drained the pending list, load another batch into memory, and schedule the remaining
  #    files, excluding ones in progress.
  # 5. Quit when we have scheduled all downloads or exceeded an hour.
  def perform
    return unless Gitlab::Geo.secondary?

    @start_time = Time.now

    # Prevent multiple Sidekiq workers from attempting to schedule downloads
    try_obtain_lease do
      loop do
        break unless node_enabled?

        update_jobs_in_progress
        load_pending_downloads if reload_queue?

        # If we are still under the limit after refreshing our DB, we can end
        # after scheduling the remaining transfers.
        last_batch = reload_queue?

        break if over_time?
        break unless downloads_remain?

        schedule_lfs_downloads

        break if last_batch

        sleep(1)
      end
    end
  end

  private

  def reload_queue?
    @pending_lfs_downloads.size < MAX_CONCURRENT_DOWNLOADS
  end

  def over_time?
    Time.now - @start_time >= RUN_TIME
  end

  def load_pending_downloads
    @pending_lfs_downloads = find_lfs_object_ids(DB_RETRIEVE_BATCH)
  end

  # True while there are IDs left to schedule. Must return a boolean: the
  # original returned Array#size, and in Ruby the Integer 0 is truthy, so
  # `break unless downloads_remain?` could never fire.
  def downloads_remain?
    @pending_lfs_downloads.any?
  end

  def schedule_lfs_downloads
    return unless downloads_remain?

    # Fill only the free slots (MAX_CONCURRENT_DOWNLOADS minus in-flight jobs).
    num_to_schedule = [MAX_CONCURRENT_DOWNLOADS - job_ids.size, @pending_lfs_downloads.size].min

    num_to_schedule.times do
      lfs_id = @pending_lfs_downloads.shift
      job_id = GeoFileDownloadWorker.perform_async(:lfs, lfs_id)

      if job_id
        @scheduled_lfs_jobs << { job_id: job_id, id: lfs_id }
      end
    end
  end

  # Newest LFS objects that have neither been registered as downloaded nor
  # are currently scheduled, capped at `limit`.
  def find_lfs_object_ids(limit)
    downloaded_ids = Geo::FileRegistry.where(file_type: 'lfs').pluck(:file_id)
    downloaded_ids = (downloaded_ids + scheduled_lfs_ids).uniq
    LfsObject.where.not(id: downloaded_ids).order(created_at: :desc).limit(limit).pluck(:id)
  end

  def update_jobs_in_progress
    status = Gitlab::SidekiqStatus.job_status(job_ids)

    # SidekiqStatus returns an array of booleans: true while the job is still
    # scheduled/running, false once it has completed. Pair each scheduled job
    # with its status via `zip` and keep only the ones still in flight.
    # (The original comment claimed true meant "completed", which contradicts
    # the code and the inline-Sidekiq spec.)
    @scheduled_lfs_jobs = @scheduled_lfs_jobs.zip(status).map { |job, running| job if running }.compact
  end

  def job_ids
    @scheduled_lfs_jobs.map { |data| data[:job_id] }
  end

  def scheduled_lfs_ids
    @scheduled_lfs_jobs.map { |data| data[:id] }
  end

  def try_obtain_lease
    uuid = Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT).try_obtain

    return unless uuid

    yield

    release_lease(uuid)
  end

  def release_lease(uuid)
    Gitlab::ExclusiveLease.cancel(LEASE_KEY, uuid)
  end

  # Caches the node-enabled flag for a minute to avoid polling the DB on
  # every loop iteration. The original condition was inverted: it cleared
  # the cache on every call during the first minute (constant DB polling)
  # and then never again (a disabled node would go unnoticed until the
  # worker hit its run-time limit).
  def node_enabled?
    if @last_enabled_check.nil? || (Time.now - @last_enabled_check > 1.minute)
      @last_enabled_check = Time.now
      @current_node_enabled = nil
    end

    # Explicit nil check so a cached `false` is not re-queried every call
    # (the original `||=` defeated the cache for disabled nodes).
    if @current_node_enabled.nil?
      @current_node_enabled = Gitlab::Geo.current_node_enabled?
    end

    @current_node_enabled
  end
end
# Sidekiq worker that performs a single Geo file download by delegating to
# Geo::FileDownloadService.
class GeoFileDownloadWorker
  include Sidekiq::Worker
  include GeoQueue

  # object_type - file kind (arrives as a String from Sidekiq; coerced to a
  #               Symbol for the service, e.g. :lfs).
  # object_id   - database id of the record the file belongs to.
  def perform(object_type, object_id)
    service = Geo::FileDownloadService.new(object_type.to_sym, object_id)
    service.execute
  end
end
......@@ -218,6 +218,11 @@ production: &base
geo_backfill_worker:
cron: "*/5 * * * *"
# GitLab Geo file download worker
# NOTE: This will only take effect if Geo is enabled
geo_download_dispatch_worker:
cron: "*/10 * * * *"
registry:
# enabled: true
# host: registry.example.com
......
......@@ -392,6 +392,9 @@ Settings.cron_jobs['geo_bulk_notify_worker']['job_class'] ||= 'GeoBulkNotifyWork
Settings.cron_jobs['geo_backfill_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_backfill_worker']['cron'] ||= '*/5 * * * *'
Settings.cron_jobs['geo_backfill_worker']['job_class'] ||= 'GeoBackfillWorker'
Settings.cron_jobs['geo_download_dispatch_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_download_dispatch_worker']['cron'] ||= '5 * * * *'
Settings.cron_jobs['geo_download_dispatch_worker']['job_class'] ||= 'GeoFileDownloadDispatchWorker'
Settings.cron_jobs['gitlab_usage_ping_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['gitlab_usage_ping_worker']['cron'] ||= Settings.send(:cron_random_weekly_time)
Settings.cron_jobs['gitlab_usage_ping_worker']['job_class'] = 'GitlabUsagePingWorker'
......
......@@ -10,6 +10,6 @@
# end
#
ActiveSupport::Inflector.inflections do |inflect|
inflect.uncountable %w(award_emoji project_statistics project_registry)
inflect.uncountable %w(award_emoji project_statistics project_registry file_registry)
inflect.acronym 'EE'
end
......@@ -42,6 +42,9 @@ Sidekiq.configure_server do |config|
# GitLab Geo: enable backfill job only on secondary nodes
Gitlab::Geo.backfill_job.disable! unless Gitlab::Geo.secondary?
# GitLab Geo: enable file download job only on secondary nodes
Gitlab::Geo.file_download_job.disable! unless Gitlab::Geo.secondary?
Gitlab::SidekiqThrottler.execute!
config = ActiveRecord::Base.configurations[Rails.env] ||
......
# Creates the file_registry tracking table that Geo secondaries use to
# record which files (keyed by file_type + file_id) have been downloaded.
class CreateFileRegistry < ActiveRecord::Migration
  def change
    create_table :file_registry do |t|
      t.string   :file_type, null: false
      t.integer  :file_id, null: false
      t.integer  :bytes
      t.string   :sha256
      t.datetime :created_at, null: false
    end

    add_index :file_registry, :file_type
    # One registry row per (type, id) pair.
    add_index :file_registry, [:file_type, :file_id], unique: true
  end
end
......@@ -12,10 +12,20 @@
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20170302005747) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
create_table "file_registry", force: :cascade do |t|
t.string "file_type", null: false
t.integer "file_id", null: false
t.integer "bytes"
t.string "sha256"
t.datetime "created_at", null: false
end
add_index "file_registry", ["file_type", "file_id"], name: "index_file_registry_on_file_type_and_file_id", unique: true, using: :btree
add_index "file_registry", ["file_type"], name: "index_file_registry_on_file_type", using: :btree
create_table "project_registry", force: :cascade do |t|
t.integer "project_id", null: false
t.datetime "last_repository_synced_at"
......@@ -24,5 +34,4 @@ ActiveRecord::Schema.define(version: 20170302005747) do
end
add_index "project_registry", ["project_id"], name: "index_project_registry_on_project_id", using: :btree
end
......@@ -774,6 +774,8 @@ module API
expose :repositories_count
expose :repositories_synced_count
expose :repositories_failed_count
expose :lfs_objects_total
expose :lfs_objects_synced
end
end
end
......@@ -22,6 +22,13 @@ module Gitlab
self.cache_value(:geo_node_enabled) { GeoNode.exists? }
end
def self.current_node_enabled?
# No caching of the enabled! If we cache it and an admin disables
# this node, an active GeoBackfillWorker would keep going for up
# to max run time after the node was disabled.
Gitlab::Geo.current_node.reload.enabled?
end
def self.license_allows?
::License.current && ::License.current.add_on?('GitLab_Geo')
end
......@@ -54,6 +61,10 @@ module Gitlab
Sidekiq::Cron::Job.find('geo_backfill_worker')
end
def self.file_download_job
Sidekiq::Cron::Job.find('geo_download_dispatch_worker')
end
def self.oauth_authentication
return false unless Gitlab::Geo.secondary?
......
......@@ -47,7 +47,7 @@ module Gitlab
def hmac_secret(access_key)
@hmac_secret ||= begin
geo_node = GeoNode.find_by(access_key: access_key)
geo_node = GeoNode.find_by(access_key: access_key, enabled: true)
geo_node&.secret_access_key
end
end
......
......@@ -12,6 +12,13 @@ describe Gitlab::Geo::JwtRequestDecoder do
expect(subject.decode).to eq(data)
end
it 'fails to decode when node is disabled' do
primary_node.enabled = false
primary_node.save
expect(subject.decode).to be_nil
end
it 'fails to decode with wrong key' do
data = request.headers['Authorization']
......
require 'spec_helper'
# GeoNodeStatus is a top-level class (the spec's own subject constructed it
# as `GeoNodeStatus.new`); describing it as Geo::GeoNodeStatus relied on
# deprecated top-level constant lookup through the Geo module.
describe GeoNodeStatus, models: true do
  subject { described_class.new }
  describe '#lfs_objects_synced_in_percentage' do
    it 'returns 0 when no objects are available' do
      subject.lfs_objects_total = 0
      subject.lfs_objects_synced = 0
      expect(subject.lfs_objects_synced_in_percentage).to eq(0)
    end
    it 'returns the right percentage' do
      subject.lfs_objects_total = 4
      subject.lfs_objects_synced = 1
      expect(subject.lfs_objects_synced_in_percentage).to be_within(0.0001).of(25)
    end
  end
end
......@@ -1972,6 +1972,16 @@ describe Repository, models: true do
end
end
describe '#after_sync' do
it 'expires repository cache' do
expect(repository).to receive(:expire_all_method_caches)
expect(repository).to receive(:expire_branch_cache)
expect(repository).to receive(:expire_content_cache)
repository.after_sync
end
end
def create_remote_branch(remote_name, branch_name, target)
rugged = repository.rugged
rugged.references.create("refs/remotes/#{remote_name}/#{branch_name}", target.id)
......
require 'spec_helper'
describe Geo::FileDownloadService, services: true do
  let(:lfs_object) { create(:lfs_object) }
  let(:secondary) { create(:geo_node) }
  subject { Geo::FileDownloadService.new(:lfs, lfs_object.id) }
  before do
    create(:geo_node, :primary)
    # NOTE(review): this stubs :current_node on the service class itself, but
    # FileDownloadService defines no current_node method — presumably
    # Gitlab::Geo.current_node was intended. Verify this stub has any effect.
    allow(described_class).to receive(:current_node) { secondary }
  end
  describe '#execute' do
    it 'downloads an LFS object' do
      # Bypass the exclusive lease so execute's block always runs.
      allow_any_instance_of(Gitlab::ExclusiveLease)
        .to receive(:try_obtain).and_return(true)
      # A non-negative byte count from the transfer counts as success.
      allow_any_instance_of(Gitlab::Geo::LfsTransfer)
        .to receive(:download_from_primary).and_return(100)
      # A successful download must create exactly one Geo::FileRegistry row.
      expect{ subject.execute }.to change { Geo::FileRegistry.count }.by(1)
    end
  end
end
require 'spec_helper'
describe GeoFileDownloadDispatchWorker do
  before do
    @primary = create(:geo_node, :primary, host: 'primary-geo-node')
    @secondary = create(:geo_node, :current)
    # Force the secondary? check so perform does not bail out immediately.
    allow(Gitlab::Geo).to receive(:secondary?).and_return(true)
    # Bypass the dispatch lease so the scheduling loop always runs.
    allow_any_instance_of(Gitlab::ExclusiveLease)
      .to receive(:try_obtain).and_return(true)
    WebMock.stub_request(:get, /primary-geo-node/).to_return(status: 200, body: "", headers: {})
  end
  subject { described_class.new }
  describe '#perform' do
    it 'does not schedule anything when node is disabled' do
      expect(GeoFileDownloadWorker).not_to receive(:perform_async)
      @secondary.enabled = false
      @secondary.save
      subject.perform
    end
    it 'executes GeoFileDownloadWorker for each LFS object' do
      create_list(:lfs_object, 2, :with_file)
      # Stub the time limit out so the loop only ends when work is drained.
      allow_any_instance_of(described_class).to receive(:over_time?).and_return(false)
      expect(GeoFileDownloadWorker).to receive(:perform_async).twice.and_call_original
      subject.perform
    end
    # Test the case where we have:
    #
    # 1. A total of 6 files in the queue, and we can load a maximum of 5 and send 2 at a time.
    # 2. We send 2, wait for 1 to finish, and then send again.
    it 'attempts to load a new batch without pending downloads' do
      stub_const('GeoFileDownloadDispatchWorker::DB_RETRIEVE_BATCH', 5)
      stub_const('GeoFileDownloadDispatchWorker::MAX_CONCURRENT_DOWNLOADS', 2)
      create_list(:lfs_object, 6, :with_file)
      allow_any_instance_of(described_class).to receive(:over_time?).and_return(false)
      expect(GeoFileDownloadWorker).to receive(:perform_async).exactly(6).times.and_call_original
      # For 6 downloads, we expect three database reloads:
      # 1. Load the first batch of 5.
      # 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the remaining 2.
      # 3. Since the second reload filled the pipe with 2, we need to do a final reload to ensure
      #    zero are left.
      expect(subject).to receive(:load_pending_downloads).exactly(3).times.and_call_original
      # Inline mode runs each GeoFileDownloadWorker synchronously, so jobs
      # complete before the next update_jobs_in_progress call.
      Sidekiq::Testing.inline! do
        subject.perform
      end
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment