Commit 57f961a7 authored by Alexandru Croitor's avatar Alexandru Croitor

Reschedule issue rebalance jobs in case those got stock

Issue rebalance jobs can be quite long running, so in order to
prevent thos jobs being stuck or teminating we can reschedule
them every 15 mins
parent 89d1ab2d
......@@ -354,6 +354,15 @@
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:issues_reschedule_stuck_issue_rebalances
:worker_name: Issues::RescheduleStuckIssueRebalancesWorker
:feature_category: :issue_tracking
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:jira_import_stuck_jira_import_jobs
:worker_name: Gitlab::JiraImport::StuckJiraImportJobsWorker
:feature_category: :importers
......
......@@ -19,6 +19,7 @@ class IssueRebalancingWorker
# we need to have exactly one of the project_id and root_namespace_id params be non-nil
raise ArgumentError, "Expected only one of the params project_id: #{project_id} and root_namespace_id: #{root_namespace_id}" if project_id && root_namespace_id
return if project_id.nil? && root_namespace_id.nil?
return if ::Gitlab::Issues::Rebalancing::State.rebalance_recently_finished?(project_id, root_namespace_id)
# pull the projects collection to be rebalanced either the project if namespace is not a group(i.e. user namesapce)
# or the root namespace, this also makes the worker backward compatible with previous version where a project_id was
......
# frozen_string_literal: true
module Issues
class RescheduleStuckIssueRebalancesWorker
include ApplicationWorker
include CronjobQueue
data_consistency :always
idempotent!
urgency :low
feature_category :issue_tracking
deduplicate :until_executed, including_scheduled: true
def perform
namespace_ids, project_ids = fetch_rebalancing_groups_and_projects
return if namespace_ids.blank? && project_ids.blank?
namespaces = Namespace.id_in(namespace_ids)
projects = Project.id_in(project_ids)
IssueRebalancingWorker.bulk_perform_async_with_contexts(
namespaces,
arguments_proc: -> (namespace) { [nil, nil, namespace.id] },
context_proc: -> (namespace) { { namespace: namespace } }
)
IssueRebalancingWorker.bulk_perform_async_with_contexts(
projects,
arguments_proc: -> (project) { [nil, project.id, nil] },
context_proc: -> (project) { { namespace: project } }
)
end
private
def fetch_rebalancing_groups_and_projects
namespace_ids = []
project_ids = []
all_rebalancing_containers = Gitlab::Redis::SharedState.with do |redis|
redis.smembers(Gitlab::Issues::Rebalancing::State::CONCURRENT_RUNNING_REBALANCES_KEY)
end
all_rebalancing_containers.each do |string|
container_type, container_id = string.split('/', 2).map(&:to_i)
if container_type == Gitlab::Issues::Rebalancing::State::NAMESPACE
namespace_ids << container_id
elsif container_type == Gitlab::Issues::Rebalancing::State::PROJECT
project_ids << container_id
end
end
[namespace_ids, project_ids]
end
end
end
......@@ -589,6 +589,9 @@ Settings.cron_jobs['ci_delete_unit_tests_worker']['job_class'] = 'Ci::DeleteUnit
Settings.cron_jobs['batched_background_migrations_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['batched_background_migrations_worker']['cron'] ||= '* * * * *'
Settings.cron_jobs['batched_background_migrations_worker']['job_class'] = 'Database::BatchedBackgroundMigrationWorker'
Settings.cron_jobs['issues_reschedule_stuck_issue_rebalances'] ||= Settingslogic.new({})
Settings.cron_jobs['issues_reschedule_stuck_issue_rebalances']['cron'] ||= '* 0/15 * * *'
Settings.cron_jobs['issues_reschedule_stuck_issue_rebalances']['job_class'] = 'Issues::RescheduleStuckIssueRebalancesWorker'
Gitlab.ee do
Settings.cron_jobs['analytics_devops_adoption_create_all_snapshots_worker'] ||= Settingslogic.new({})
......
......@@ -4,6 +4,10 @@ module Gitlab
module Issues
module Rebalancing
class State
REDIS_KEY_PREFIX = "gitlab:issues-position-rebalances"
CONCURRENT_RUNNING_REBALANCES_KEY = "#{REDIS_KEY_PREFIX}:running_rebalances"
RECENTLY_FINISHED_REBALANCE_PREFIX = "#{REDIS_KEY_PREFIX}:recently_finished"
REDIS_EXPIRY_TIME = 10.days
MAX_NUMBER_OF_CONCURRENT_REBALANCES = 5
NAMESPACE = 1
......@@ -21,25 +25,25 @@ module Gitlab
redis.multi do |multi|
# we trigger re-balance for namespaces(groups) or specific user project
value = "#{rebalanced_container_type}/#{rebalanced_container_id}"
multi.sadd(concurrent_running_rebalances_key, value)
multi.expire(concurrent_running_rebalances_key, REDIS_EXPIRY_TIME)
multi.sadd(CONCURRENT_RUNNING_REBALANCES_KEY, value)
multi.expire(CONCURRENT_RUNNING_REBALANCES_KEY, REDIS_EXPIRY_TIME)
end
end
end
def concurrent_running_rebalances_count
with_redis { |redis| redis.scard(concurrent_running_rebalances_key).to_i }
with_redis { |redis| redis.scard(CONCURRENT_RUNNING_REBALANCES_KEY).to_i }
end
def rebalance_in_progress?
all_rebalanced_containers = with_redis { |redis| redis.smembers(concurrent_running_rebalances_key) }
all_rebalancing_containers = with_redis { |redis| redis.smembers(CONCURRENT_RUNNING_REBALANCES_KEY) }
is_running = case rebalanced_container_type
when NAMESPACE
namespace_ids = all_rebalanced_containers.map {|string| string.split("#{NAMESPACE}/").second.to_i }.compact
namespace_ids = all_rebalancing_containers.map {|string| string.split("#{NAMESPACE}/").second.to_i }.compact
namespace_ids.include?(root_namespace.id)
when PROJECT
project_ids = all_rebalanced_containers.map {|string| string.split("#{PROJECT}/").second.to_i }.compact
project_ids = all_rebalancing_containers.map {|string| string.split("#{PROJECT}/").second.to_i }.compact
project_ids.include?(projects.take.id) # rubocop:disable CodeReuse/ActiveRecord
else
false
......@@ -101,22 +105,32 @@ module Gitlab
multi.expire(issue_ids_key, REDIS_EXPIRY_TIME)
multi.expire(current_index_key, REDIS_EXPIRY_TIME)
multi.expire(current_project_key, REDIS_EXPIRY_TIME)
multi.expire(concurrent_running_rebalances_key, REDIS_EXPIRY_TIME)
multi.expire(CONCURRENT_RUNNING_REBALANCES_KEY, REDIS_EXPIRY_TIME)
end
end
end
def cleanup_cache
value = "#{rebalanced_container_type}/#{rebalanced_container_id}"
with_redis do |redis|
redis.multi do |multi|
multi.del(issue_ids_key)
multi.del(current_index_key)
multi.del(current_project_key)
multi.srem(concurrent_running_rebalances_key, "#{rebalanced_container_type}/#{rebalanced_container_id}")
multi.srem(CONCURRENT_RUNNING_REBALANCES_KEY, value)
multi.set(self.class.recently_finished_key(rebalanced_container_type, rebalanced_container_id), true, ex: 1.hour)
end
end
end
def self.rebalance_recently_finished?(project_id, namespace_id)
container_id = project_id || namespace_id
container_type = project_id.present? ? PROJECT : NAMESPACE
Gitlab::Redis::SharedState.with { |redis| redis.get(recently_finished_key(container_type, container_id)) }
end
private
attr_accessor :root_namespace, :projects, :rebalanced_container_type, :rebalanced_container_id
......@@ -125,12 +139,8 @@ module Gitlab
concurrent_running_rebalances_count <= MAX_NUMBER_OF_CONCURRENT_REBALANCES
end
def redis_key_prefix
"gitlab:issues-position-rebalances"
end
def issue_ids_key
"#{redis_key_prefix}:#{root_namespace.id}"
"#{REDIS_KEY_PREFIX}:#{root_namespace.id}"
end
def current_index_key
......@@ -141,8 +151,8 @@ module Gitlab
"#{issue_ids_key}:current_project_id"
end
def concurrent_running_rebalances_key
"#{redis_key_prefix}:running_rebalances"
def self.recently_finished_key(container_type, container_id)
"#{RECENTLY_FINISHED_REBALANCE_PREFIX}:#{container_type}:#{container_id}"
end
def with_redis(&blk)
......
......@@ -94,7 +94,7 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st
context 'when tracking new rebalance' do
it 'returns as expired for non existent key' do
::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:concurrent_running_rebalances_key))).to be < 0
expect(redis.ttl(Gitlab::Issues::Rebalancing::State::CONCURRENT_RUNNING_REBALANCES_KEY)).to be < 0
end
end
......@@ -102,7 +102,7 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st
rebalance_caching.track_new_running_rebalance
::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:concurrent_running_rebalances_key))).to be_between(0, described_class::REDIS_EXPIRY_TIME.ago.to_i)
expect(redis.ttl(Gitlab::Issues::Rebalancing::State::CONCURRENT_RUNNING_REBALANCES_KEY)).to be_between(0, described_class::REDIS_EXPIRY_TIME.ago.to_i)
end
end
end
......@@ -169,7 +169,7 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st
rebalance_caching.cleanup_cache
expect(check_existing_keys).to eq(0)
expect(check_existing_keys).to eq(1)
end
end
end
......@@ -212,11 +212,14 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st
def check_existing_keys
index = 0
# spec only, we do not actually scan keys in the code
recently_finished_keys_count = Gitlab::Redis::SharedState.with { |redis| redis.scan(0, match: "#{described_class::RECENTLY_FINISHED_REBALANCE_PREFIX}:*") }.last.count
index += 1 if rebalance_caching.get_current_index > 0
index += 1 if rebalance_caching.get_current_project_id.present?
index += 1 if rebalance_caching.get_cached_issue_ids(0, 100).present?
index += 1 if rebalance_caching.rebalance_in_progress?
index += 1 if recently_finished_keys_count > 0
index
end
......
......@@ -2,7 +2,7 @@
require 'spec_helper'
RSpec.describe IssueRebalancingWorker do
RSpec.describe IssueRebalancingWorker, :clean_gitlab_redis_shared_state do
describe '#perform' do
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, group: group) }
......@@ -35,6 +35,20 @@ RSpec.describe IssueRebalancingWorker do
described_class.new.perform # all arguments are nil
end
it 'does not schedule a new rebalance if it finished under 1h ago' do
container_type = arguments.second.present? ? ::Gitlab::Issues::Rebalancing::State::PROJECT : ::Gitlab::Issues::Rebalancing::State::NAMESPACE
container_id = arguments.second || arguments.third
Gitlab::Redis::SharedState.with do |redis|
redis.set(::Gitlab::Issues::Rebalancing::State.send(:recently_finished_key, container_type, container_id), true)
end
expect(Issues::RelativePositionRebalancingService).not_to receive(:new)
expect(Gitlab::ErrorTracking).not_to receive(:log_exception)
described_class.new.perform(*arguments)
end
end
shared_examples 'safely handles non-existent ids' do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Issues::RescheduleStuckIssueRebalancesWorker, :clean_gitlab_redis_shared_state do
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, group: group) }
subject(:worker) { described_class.new }
describe '#perform' do
it 'does not schedule a rebalance' do
expect(IssueRebalancingWorker).not_to receive(:perform_async)
worker.perform
end
it 'schedules a rebalance in case there are any rebalances started' do
Gitlab::Redis::SharedState.with do |redis|
redis.sadd(::Gitlab::Issues::Rebalancing::State::CONCURRENT_RUNNING_REBALANCES_KEY, "#{::Gitlab::Issues::Rebalancing::State::NAMESPACE}/#{group.id}")
redis.sadd(::Gitlab::Issues::Rebalancing::State::CONCURRENT_RUNNING_REBALANCES_KEY, "#{::Gitlab::Issues::Rebalancing::State::PROJECT}/#{project.id}")
end
expect(IssueRebalancingWorker).to receive(:bulk_perform_async).with([[nil, nil, group.id]]).once
expect(IssueRebalancingWorker).to receive(:bulk_perform_async).with([[nil, project.id, nil]]).once
worker.perform
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment