Commit 4986ff04 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre Committed by Nick Thomas

Resolve "Geo: restarting sidekiq doesn't cause BaseSchedulerWorker leases to be returned"

parent 43ec8c81
#
# Concern that helps with getting an exclusive lease for running a worker
# Concern that helps with getting an exclusive lease for running a block
# of code.
#
# `#try_obtain_lease` takes a block which will be run if it was able to obtain the lease.
# Implement `#lease_timeout` to configure the timeout for the exclusive lease.
# Optionally override `#lease_key` to set the lease key, it defaults to the class name with underscores.
# `#try_obtain_lease` takes a block which will be run if it was able to
# obtain the lease. Implement `#lease_timeout` to configure the timeout
# for the exclusive lease. Optionally override `#lease_key` to set the
# lease key, it defaults to the class name with underscores.
#
module ExclusiveLeaseGuard
extend ActiveSupport::Concern
# override in subclass
def lease_timeout
raise NotImplementedError
end
def lease_key
@lease_key ||= self.class.name.underscore
end
def log_error(message, extra_args = {})
logger.error(messages)
end
def try_obtain_lease
lease = exclusive_lease.try_obtain
unless lease
log_error('Cannot obtain an exclusive lease. There must be another worker already in execution.')
log_error('Cannot obtain an exclusive lease. There must be another instance already in execution.')
return
end
......@@ -40,6 +29,15 @@ module ExclusiveLeaseGuard
@lease ||= Gitlab::ExclusiveLease.new(lease_key, timeout: lease_timeout)
end
def lease_key
@lease_key ||= self.class.name.underscore
end
def lease_timeout
raise NotImplementedError,
"#{self.class.name} does not implement #{__method__}"
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(lease_key, uuid)
end
......@@ -47,4 +45,8 @@ module ExclusiveLeaseGuard
def renew_lease!
exclusive_lease.renew
end
def log_error(message, extra_args = {})
logger.error(messages)
end
end
......@@ -4,6 +4,7 @@ module Geo
EmptyCloneUrlPrefixError = Class.new(StandardError)
class BaseSyncService
include ExclusiveLeaseGuard
include ::Gitlab::Geo::ProjectLogHelpers
class << self
......@@ -31,6 +32,10 @@ module Geo
@lease_key ||= "#{LEASE_KEY_PREFIX}:#{type}:#{project.id}"
end
def lease_timeout
LEASE_TIMEOUT
end
def primary_ssh_path_prefix
@primary_ssh_path_prefix ||= Gitlab::Geo.primary_node.clone_url_prefix.tap do |prefix|
raise EmptyCloneUrlPrefixError, 'Missing clone_url_prefix in the primary node' unless prefix.present?
......@@ -89,24 +94,6 @@ module Geo
@registry ||= Geo::ProjectRegistry.find_or_initialize_by(project_id: project.id)
end
def try_obtain_lease
log_info("Trying to obtain lease to sync #{type}")
repository_lease = Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT).try_obtain
unless repository_lease
log_info("Could not obtain lease to sync #{type}")
return
end
yield
# We should release the lease for a repository, only if we have obtained
# it. If something went wrong when syncing the repository, we should wait
# for the lease timeout to try again.
log_info("Releasing leases to sync #{type}")
Gitlab::ExclusiveLease.cancel(lease_key, repository_lease)
end
def update_registry(started_at: nil, finished_at: nil)
return unless started_at || finished_at
......
......@@ -15,5 +15,10 @@ module Geo
def lease_timeout
LEASE_TIMEOUT
end
def log_error(message, extra_args = {})
args = { class: self.class.name, message: message }.merge(extra_args)
Gitlab::Geo::Logger.error(args)
end
end
end
---
title: Geo - Ensures that leases were returned.
merge_request: 3241
author:
type: fixed
......@@ -12,8 +12,8 @@ module Gitlab
geo_oauth_application
).freeze
COMMON_JOBS = %i(metrics_update_job).freeze
SECONDARY_JOBS = %i(repository_sync_job file_download_job).freeze
COMMON_CRON_JOBS = %i(geo_metrics_update_worker).freeze
SECONDARY_CRON_JOBS = %i(geo_repository_sync_worker geo_file_download_dispatch_worker).freeze
FDW_SCHEMA = 'gitlab_secondary'.freeze
......@@ -88,30 +88,18 @@ module Gitlab
FDW_SCHEMA + ".#{table_name}"
end
def self.repository_sync_job
Sidekiq::Cron::Job.find('geo_repository_sync_worker')
end
def self.file_download_job
Sidekiq::Cron::Job.find('geo_file_download_dispatch_worker')
end
def self.metrics_update_job
Sidekiq::Cron::Job.find('geo_metrics_update_worker')
end
def self.configure_primary_jobs!
self.enable_all_cron_jobs!
SECONDARY_JOBS.each { |job| self.__send__(job).try(:disable!) } # rubocop:disable GitlabSecurity/PublicSend
SECONDARY_CRON_JOBS.each { |job_name| Sidekiq::Cron::Job.find(job_name).try(:disable!) }
end
def self.configure_secondary_jobs!
self.disable_all_cron_jobs!
(COMMON_JOBS + SECONDARY_JOBS).each { |job| self.__send__(job).try(:enable!) } # rubocop:disable GitlabSecurity/PublicSend
(COMMON_CRON_JOBS + SECONDARY_CRON_JOBS).each { |job_name| Sidekiq::Cron::Job.find(job_name).try(:enable!) }
end
def self.disable_all_geo_jobs!
(COMMON_JOBS + SECONDARY_JOBS).each { |job| self.__send__(job).try(:disable!) } # rubocop:disable GitlabSecurity/PublicSend
(COMMON_CRON_JOBS + SECONDARY_CRON_JOBS).each { |job_name| Sidekiq::Cron::Job.find(job_name).try(:disable!) }
end
def self.disable_all_cron_jobs!
......
......@@ -209,9 +209,9 @@ describe Gitlab::Geo, :geo do
it 'activates cron jobs for primary' do
described_class.configure_cron_jobs!
expect(described_class.repository_sync_job).not_to be_enabled
expect(described_class.file_download_job).not_to be_enabled
expect(described_class.metrics_update_job).to be_enabled
expect(Sidekiq::Cron::Job.find('geo_repository_sync_worker')).not_to be_enabled
expect(Sidekiq::Cron::Job.find('geo_file_download_dispatch_worker')).not_to be_enabled
expect(Sidekiq::Cron::Job.find('geo_metrics_update_worker')).to be_enabled
expect(Sidekiq::Cron::Job.find('ldap_test')).to be_enabled
end
......@@ -221,9 +221,9 @@ describe Gitlab::Geo, :geo do
described_class.configure_cron_jobs!
expect(Sidekiq::Cron::Job.find('ldap_test')).not_to be_enabled
expect(described_class.repository_sync_job).to be_enabled
expect(described_class.file_download_job).to be_enabled
expect(described_class.metrics_update_job).to be_enabled
expect(Sidekiq::Cron::Job.find('geo_metrics_update_worker')).to be_enabled
expect(Sidekiq::Cron::Job.find('geo_repository_sync_worker')).to be_enabled
expect(Sidekiq::Cron::Job.find('geo_file_download_dispatch_worker')).to be_enabled
end
it 'deactivates all jobs when Geo is not active' do
......@@ -231,9 +231,9 @@ describe Gitlab::Geo, :geo do
described_class.configure_cron_jobs!
expect(described_class.repository_sync_job).not_to be_enabled
expect(described_class.file_download_job).not_to be_enabled
expect(described_class.metrics_update_job).not_to be_enabled
expect(Sidekiq::Cron::Job.find('geo_repository_sync_worker')).not_to be_enabled
expect(Sidekiq::Cron::Job.find('geo_file_download_dispatch_worker')).not_to be_enabled
expect(Sidekiq::Cron::Job.find('geo_metrics_update_worker')).not_to be_enabled
expect(Sidekiq::Cron::Job.find('ldap_test')).to be_enabled
end
......
......@@ -2,6 +2,7 @@ require 'spec_helper'
describe Geo::BaseSyncService do
let(:project) { build('project')}
subject { described_class.new(project) }
it_behaves_like 'geo base sync execution'
......
......@@ -45,7 +45,16 @@ RSpec.describe Geo::RepositorySyncService do
subject.execute
end
it 'releases lease' do
it 'returns the lease when succeed' do
expect(Gitlab::ExclusiveLease).to receive(:cancel).once.with(
subject.__send__(:lease_key), anything).and_call_original
subject.execute
end
it 'returns the lease when sync fail' do
allow(repository).to receive(:fetch_geo_mirror).with(url_to_repo) { raise Gitlab::Shell::Error }
expect(Gitlab::ExclusiveLease).to receive(:cancel).once.with(
subject.__send__(:lease_key), anything).and_call_original
......
......@@ -32,7 +32,7 @@ describe Geo::PruneEventLogWorker, :geo do
it 'logs error when it cannot obtain lease' do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { nil }
expect(worker).to receive(:log_error).with('Cannot obtain an exclusive lease. There must be another worker already in execution.')
expect(worker).to receive(:log_error).with('Cannot obtain an exclusive lease. There must be another instance already in execution.')
worker.perform
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment