Commit 5e99c299 authored by Bob Van Landuyt's avatar Bob Van Landuyt

Merge branch '343366_cron_job_to_reschedule_issue_rebalance_jobs' into 'master'

Reschedule issue rebalance jobs in case those get stuck

See merge request gitlab-org/gitlab!72812
parents 7154f335 131553bc
...@@ -354,6 +354,15 @@ ...@@ -354,6 +354,15 @@
:weight: 1 :weight: 1
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: cronjob:issues_reschedule_stuck_issue_rebalances
:worker_name: Issues::RescheduleStuckIssueRebalancesWorker
:feature_category: :team_planning
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:jira_import_stuck_jira_import_jobs - :name: cronjob:jira_import_stuck_jira_import_jobs
:worker_name: Gitlab::JiraImport::StuckJiraImportJobsWorker :worker_name: Gitlab::JiraImport::StuckJiraImportJobsWorker
:feature_category: :importers :feature_category: :importers
......
...@@ -19,6 +19,7 @@ class IssueRebalancingWorker ...@@ -19,6 +19,7 @@ class IssueRebalancingWorker
# we need to have exactly one of the project_id and root_namespace_id params be non-nil # we need to have exactly one of the project_id and root_namespace_id params be non-nil
raise ArgumentError, "Expected only one of the params project_id: #{project_id} and root_namespace_id: #{root_namespace_id}" if project_id && root_namespace_id raise ArgumentError, "Expected only one of the params project_id: #{project_id} and root_namespace_id: #{root_namespace_id}" if project_id && root_namespace_id
return if project_id.nil? && root_namespace_id.nil? return if project_id.nil? && root_namespace_id.nil?
return if ::Gitlab::Issues::Rebalancing::State.rebalance_recently_finished?(project_id, root_namespace_id)
# pull the projects collection to be rebalanced either the project if namespace is not a group(i.e. user namesapce) # pull the projects collection to be rebalanced either the project if namespace is not a group(i.e. user namesapce)
# or the root namespace, this also makes the worker backward compatible with previous version where a project_id was # or the root namespace, this also makes the worker backward compatible with previous version where a project_id was
......
# frozen_string_literal: true
module Issues
class RescheduleStuckIssueRebalancesWorker
include ApplicationWorker
include CronjobQueue
data_consistency :sticky
idempotent!
urgency :low
feature_category :team_planning
deduplicate :until_executed, including_scheduled: true
def perform
namespace_ids, project_ids = ::Gitlab::Issues::Rebalancing::State.fetch_rebalancing_groups_and_projects
return if namespace_ids.blank? && project_ids.blank?
namespaces = Namespace.id_in(namespace_ids)
projects = Project.id_in(project_ids)
IssueRebalancingWorker.bulk_perform_async_with_contexts(
namespaces,
arguments_proc: -> (namespace) { [nil, nil, namespace.id] },
context_proc: -> (namespace) { { namespace: namespace } }
)
IssueRebalancingWorker.bulk_perform_async_with_contexts(
projects,
arguments_proc: -> (project) { [nil, project.id, nil] },
context_proc: -> (project) { { project: project } }
)
end
end
end
...@@ -588,6 +588,9 @@ Settings.cron_jobs['ci_delete_unit_tests_worker']['job_class'] = 'Ci::DeleteUnit ...@@ -588,6 +588,9 @@ Settings.cron_jobs['ci_delete_unit_tests_worker']['job_class'] = 'Ci::DeleteUnit
Settings.cron_jobs['batched_background_migrations_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['batched_background_migrations_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['batched_background_migrations_worker']['cron'] ||= '* * * * *' Settings.cron_jobs['batched_background_migrations_worker']['cron'] ||= '* * * * *'
Settings.cron_jobs['batched_background_migrations_worker']['job_class'] = 'Database::BatchedBackgroundMigrationWorker' Settings.cron_jobs['batched_background_migrations_worker']['job_class'] = 'Database::BatchedBackgroundMigrationWorker'
Settings.cron_jobs['issues_reschedule_stuck_issue_rebalances'] ||= Settingslogic.new({})
Settings.cron_jobs['issues_reschedule_stuck_issue_rebalances']['cron'] ||= '* 0/15 * * *'
Settings.cron_jobs['issues_reschedule_stuck_issue_rebalances']['job_class'] = 'Issues::RescheduleStuckIssueRebalancesWorker'
Gitlab.ee do Gitlab.ee do
Settings.cron_jobs['analytics_devops_adoption_create_all_snapshots_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['analytics_devops_adoption_create_all_snapshots_worker'] ||= Settingslogic.new({})
......
...@@ -4,6 +4,10 @@ module Gitlab ...@@ -4,6 +4,10 @@ module Gitlab
module Issues module Issues
module Rebalancing module Rebalancing
class State class State
REDIS_KEY_PREFIX = "gitlab:issues-position-rebalances"
CONCURRENT_RUNNING_REBALANCES_KEY = "#{REDIS_KEY_PREFIX}:running_rebalances"
RECENTLY_FINISHED_REBALANCE_PREFIX = "#{REDIS_KEY_PREFIX}:recently_finished"
REDIS_EXPIRY_TIME = 10.days REDIS_EXPIRY_TIME = 10.days
MAX_NUMBER_OF_CONCURRENT_REBALANCES = 5 MAX_NUMBER_OF_CONCURRENT_REBALANCES = 5
NAMESPACE = 1 NAMESPACE = 1
...@@ -21,25 +25,23 @@ module Gitlab ...@@ -21,25 +25,23 @@ module Gitlab
redis.multi do |multi| redis.multi do |multi|
# we trigger re-balance for namespaces(groups) or specific user project # we trigger re-balance for namespaces(groups) or specific user project
value = "#{rebalanced_container_type}/#{rebalanced_container_id}" value = "#{rebalanced_container_type}/#{rebalanced_container_id}"
multi.sadd(concurrent_running_rebalances_key, value) multi.sadd(CONCURRENT_RUNNING_REBALANCES_KEY, value)
multi.expire(concurrent_running_rebalances_key, REDIS_EXPIRY_TIME) multi.expire(CONCURRENT_RUNNING_REBALANCES_KEY, REDIS_EXPIRY_TIME)
end end
end end
end end
def concurrent_running_rebalances_count def concurrent_running_rebalances_count
with_redis { |redis| redis.scard(concurrent_running_rebalances_key).to_i } with_redis { |redis| redis.scard(CONCURRENT_RUNNING_REBALANCES_KEY).to_i }
end end
def rebalance_in_progress? def rebalance_in_progress?
all_rebalanced_containers = with_redis { |redis| redis.smembers(concurrent_running_rebalances_key) }
is_running = case rebalanced_container_type is_running = case rebalanced_container_type
when NAMESPACE when NAMESPACE
namespace_ids = all_rebalanced_containers.map {|string| string.split("#{NAMESPACE}/").second.to_i }.compact namespace_ids = self.class.current_rebalancing_containers.map {|string| string.split("#{NAMESPACE}/").second.to_i }.compact
namespace_ids.include?(root_namespace.id) namespace_ids.include?(root_namespace.id)
when PROJECT when PROJECT
project_ids = all_rebalanced_containers.map {|string| string.split("#{PROJECT}/").second.to_i }.compact project_ids = self.class.current_rebalancing_containers.map {|string| string.split("#{PROJECT}/").second.to_i }.compact
project_ids.include?(projects.take.id) # rubocop:disable CodeReuse/ActiveRecord project_ids.include?(projects.take.id) # rubocop:disable CodeReuse/ActiveRecord
else else
false false
...@@ -101,36 +103,63 @@ module Gitlab ...@@ -101,36 +103,63 @@ module Gitlab
multi.expire(issue_ids_key, REDIS_EXPIRY_TIME) multi.expire(issue_ids_key, REDIS_EXPIRY_TIME)
multi.expire(current_index_key, REDIS_EXPIRY_TIME) multi.expire(current_index_key, REDIS_EXPIRY_TIME)
multi.expire(current_project_key, REDIS_EXPIRY_TIME) multi.expire(current_project_key, REDIS_EXPIRY_TIME)
multi.expire(concurrent_running_rebalances_key, REDIS_EXPIRY_TIME) multi.expire(CONCURRENT_RUNNING_REBALANCES_KEY, REDIS_EXPIRY_TIME)
end end
end end
end end
def cleanup_cache def cleanup_cache
value = "#{rebalanced_container_type}/#{rebalanced_container_id}"
with_redis do |redis| with_redis do |redis|
redis.multi do |multi| redis.multi do |multi|
multi.del(issue_ids_key) multi.del(issue_ids_key)
multi.del(current_index_key) multi.del(current_index_key)
multi.del(current_project_key) multi.del(current_project_key)
multi.srem(concurrent_running_rebalances_key, "#{rebalanced_container_type}/#{rebalanced_container_id}") multi.srem(CONCURRENT_RUNNING_REBALANCES_KEY, value)
multi.set(self.class.recently_finished_key(rebalanced_container_type, rebalanced_container_id), true, ex: 1.hour)
end end
end end
end end
def self.rebalance_recently_finished?(project_id, namespace_id)
container_id = project_id || namespace_id
container_type = project_id.present? ? PROJECT : NAMESPACE
Gitlab::Redis::SharedState.with { |redis| redis.get(recently_finished_key(container_type, container_id)) }
end
def self.fetch_rebalancing_groups_and_projects
namespace_ids = []
project_ids = []
current_rebalancing_containers.each do |string|
container_type, container_id = string.split('/', 2).map(&:to_i)
if container_type == NAMESPACE
namespace_ids << container_id
elsif container_type == PROJECT
project_ids << container_id
end
end
[namespace_ids, project_ids]
end
private private
def self.current_rebalancing_containers
Gitlab::Redis::SharedState.with { |redis| redis.smembers(CONCURRENT_RUNNING_REBALANCES_KEY) }
end
attr_accessor :root_namespace, :projects, :rebalanced_container_type, :rebalanced_container_id attr_accessor :root_namespace, :projects, :rebalanced_container_type, :rebalanced_container_id
def too_many_rebalances_running? def too_many_rebalances_running?
concurrent_running_rebalances_count <= MAX_NUMBER_OF_CONCURRENT_REBALANCES concurrent_running_rebalances_count <= MAX_NUMBER_OF_CONCURRENT_REBALANCES
end end
def redis_key_prefix
"gitlab:issues-position-rebalances"
end
def issue_ids_key def issue_ids_key
"#{redis_key_prefix}:#{root_namespace.id}" "#{REDIS_KEY_PREFIX}:#{root_namespace.id}"
end end
def current_index_key def current_index_key
...@@ -141,8 +170,8 @@ module Gitlab ...@@ -141,8 +170,8 @@ module Gitlab
"#{issue_ids_key}:current_project_id" "#{issue_ids_key}:current_project_id"
end end
def concurrent_running_rebalances_key def self.recently_finished_key(container_type, container_id)
"#{redis_key_prefix}:running_rebalances" "#{RECENTLY_FINISHED_REBALANCE_PREFIX}:#{container_type}:#{container_id}"
end end
def with_redis(&blk) def with_redis(&blk)
......
...@@ -94,7 +94,7 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st ...@@ -94,7 +94,7 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st
context 'when tracking new rebalance' do context 'when tracking new rebalance' do
it 'returns as expired for non existent key' do it 'returns as expired for non existent key' do
::Gitlab::Redis::SharedState.with do |redis| ::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:concurrent_running_rebalances_key))).to be < 0 expect(redis.ttl(Gitlab::Issues::Rebalancing::State::CONCURRENT_RUNNING_REBALANCES_KEY)).to be < 0
end end
end end
...@@ -102,7 +102,7 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st ...@@ -102,7 +102,7 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st
rebalance_caching.track_new_running_rebalance rebalance_caching.track_new_running_rebalance
::Gitlab::Redis::SharedState.with do |redis| ::Gitlab::Redis::SharedState.with do |redis|
expect(redis.ttl(rebalance_caching.send(:concurrent_running_rebalances_key))).to be_between(0, described_class::REDIS_EXPIRY_TIME.ago.to_i) expect(redis.ttl(Gitlab::Issues::Rebalancing::State::CONCURRENT_RUNNING_REBALANCES_KEY)).to be_between(0, described_class::REDIS_EXPIRY_TIME.ago.to_i)
end end
end end
end end
...@@ -169,7 +169,7 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st ...@@ -169,7 +169,7 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st
rebalance_caching.cleanup_cache rebalance_caching.cleanup_cache
expect(check_existing_keys).to eq(0) expect(check_existing_keys).to eq(1)
end end
end end
end end
...@@ -183,6 +183,16 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st ...@@ -183,6 +183,16 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st
it { expect(rebalance_caching.send(:rebalanced_container_type)).to eq(described_class::NAMESPACE) } it { expect(rebalance_caching.send(:rebalanced_container_type)).to eq(described_class::NAMESPACE) }
it_behaves_like 'issues rebalance caching' it_behaves_like 'issues rebalance caching'
describe '.fetch_rebalancing_groups_and_projects' do
before do
rebalance_caching.track_new_running_rebalance
end
it 'caches recently finished rebalance key' do
expect(described_class.fetch_rebalancing_groups_and_projects).to eq([[group.id], []])
end
end
end end
context 'rebalancing issues in a project' do context 'rebalancing issues in a project' do
...@@ -193,6 +203,16 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st ...@@ -193,6 +203,16 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st
it { expect(rebalance_caching.send(:rebalanced_container_type)).to eq(described_class::PROJECT) } it { expect(rebalance_caching.send(:rebalanced_container_type)).to eq(described_class::PROJECT) }
it_behaves_like 'issues rebalance caching' it_behaves_like 'issues rebalance caching'
describe '.fetch_rebalancing_groups_and_projects' do
before do
rebalance_caching.track_new_running_rebalance
end
it 'caches recently finished rebalance key' do
expect(described_class.fetch_rebalancing_groups_and_projects).to eq([[], [project.id]])
end
end
end end
# count - how many issue ids to generate, issue ids will start at 1 # count - how many issue ids to generate, issue ids will start at 1
...@@ -212,11 +232,14 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st ...@@ -212,11 +232,14 @@ RSpec.describe Gitlab::Issues::Rebalancing::State, :clean_gitlab_redis_shared_st
def check_existing_keys def check_existing_keys
index = 0 index = 0
# spec only, we do not actually scan keys in the code
recently_finished_keys_count = Gitlab::Redis::SharedState.with { |redis| redis.scan(0, match: "#{described_class::RECENTLY_FINISHED_REBALANCE_PREFIX}:*") }.last.count
index += 1 if rebalance_caching.get_current_index > 0 index += 1 if rebalance_caching.get_current_index > 0
index += 1 if rebalance_caching.get_current_project_id.present? index += 1 if rebalance_caching.get_current_project_id.present?
index += 1 if rebalance_caching.get_cached_issue_ids(0, 100).present? index += 1 if rebalance_caching.get_cached_issue_ids(0, 100).present?
index += 1 if rebalance_caching.rebalance_in_progress? index += 1 if rebalance_caching.rebalance_in_progress?
index += 1 if recently_finished_keys_count > 0
index index
end end
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe IssueRebalancingWorker do RSpec.describe IssueRebalancingWorker, :clean_gitlab_redis_shared_state do
describe '#perform' do describe '#perform' do
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, group: group) } let_it_be(:project) { create(:project, group: group) }
...@@ -35,6 +35,20 @@ RSpec.describe IssueRebalancingWorker do ...@@ -35,6 +35,20 @@ RSpec.describe IssueRebalancingWorker do
described_class.new.perform # all arguments are nil described_class.new.perform # all arguments are nil
end end
it 'does not schedule a new rebalance if it finished under 1h ago' do
container_type = arguments.second.present? ? ::Gitlab::Issues::Rebalancing::State::PROJECT : ::Gitlab::Issues::Rebalancing::State::NAMESPACE
container_id = arguments.second || arguments.third
Gitlab::Redis::SharedState.with do |redis|
redis.set(::Gitlab::Issues::Rebalancing::State.send(:recently_finished_key, container_type, container_id), true)
end
expect(Issues::RelativePositionRebalancingService).not_to receive(:new)
expect(Gitlab::ErrorTracking).not_to receive(:log_exception)
described_class.new.perform(*arguments)
end
end end
shared_examples 'safely handles non-existent ids' do shared_examples 'safely handles non-existent ids' do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Issues::RescheduleStuckIssueRebalancesWorker, :clean_gitlab_redis_shared_state do
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, group: group) }
subject(:worker) { described_class.new }
describe '#perform' do
it 'does not schedule a rebalance' do
expect(IssueRebalancingWorker).not_to receive(:perform_async)
worker.perform
end
it 'schedules a rebalance in case there are any rebalances started' do
expect(::Gitlab::Issues::Rebalancing::State).to receive(:fetch_rebalancing_groups_and_projects).and_return([[group.id], [project.id]])
expect(IssueRebalancingWorker).to receive(:bulk_perform_async).with([[nil, nil, group.id]]).once
expect(IssueRebalancingWorker).to receive(:bulk_perform_async).with([[nil, project.id, nil]]).once
worker.perform
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment