Commit 02464df7 authored by Heinrich Lee Yu's avatar Heinrich Lee Yu Committed by Igor Drozdov

Add atomic sidekiq scheduler

This retrieves jobs from Sidekiq scheduled sets using a Lua script. This
allows multiple processes to process the queue efficiently.
parent 6523ff70
---
name: atomic_sidekiq_scheduler
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/72380
rollout_issue_url:
milestone: '14.5'
type: development
group: group::project management
default_enabled: false
......@@ -84,9 +84,12 @@ module Gitlab
if puma? && Puma.respond_to?(:cli_config)
threads += Puma.cli_config.options[:max_threads]
elsif sidekiq?
# An extra thread for the poller in Sidekiq Cron:
# 2 extra threads for the pollers in Sidekiq and Sidekiq Cron:
# https://github.com/ondrejbartas/sidekiq-cron#under-the-hood
threads += Sidekiq.options[:concurrency] + 1
#
# These threads execute Sidekiq client middleware when jobs
# are enqueued and those can access DB / Redis.
threads += Sidekiq.options[:concurrency] + 2
end
if action_cable?
......
# frozen_string_literal: true
# This is a copy of https://github.com/mperham/sidekiq/blob/32c55e31659a1e6bd42f98334cca5eef2863de8d/lib/sidekiq/scheduled.rb#L11-L34
#
# It effectively reverts
# https://github.com/mperham/sidekiq/commit/9b75467b33759888753191413eddbc15c37a219e
# because we observe that the extra ZREMs caused by this change can lead to high
# CPU usage on Redis at peak times:
# https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1179
#
module Gitlab
class SidekiqEnq
LUA_ZPOPBYSCORE = <<~EOS
local key, now = KEYS[1], ARGV[1]
local jobs = redis.call("zrangebyscore", key, "-inf", now, "limit", 0, 1)
if jobs[1] then
redis.call("zrem", key, jobs[1])
return jobs[1]
end
EOS
LUA_ZPOPBYSCORE_SHA = Digest::SHA1.hexdigest(LUA_ZPOPBYSCORE)
def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = Sidekiq::Scheduled::SETS)
if Feature.enabled?(:atomic_sidekiq_scheduler, default_enabled: :yaml)
atomic_find_jobs_and_enqueue(now, sorted_sets)
else
find_jobs_and_enqueue(now, sorted_sets)
end
end
private
# This is a copy of https://github.com/mperham/sidekiq/blob/32c55e31659a1e6bd42f98334cca5eef2863de8d/lib/sidekiq/scheduled.rb#L11-L34
#
# It effectively reverts
# https://github.com/mperham/sidekiq/commit/9b75467b33759888753191413eddbc15c37a219e
# because we observe that the extra ZREMs caused by this change can lead to high
# CPU usage on Redis at peak times:
# https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1179
#
def find_jobs_and_enqueue(now, sorted_sets)
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
Sidekiq.redis do |conn|
......@@ -24,8 +45,7 @@ module Gitlab
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while (job = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 1]).first)
while job = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 1]).first
# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
......@@ -47,5 +67,38 @@ module Gitlab
end
end
end
def atomic_find_jobs_and_enqueue(now, sorted_sets)
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
start_time = ::Gitlab::Metrics::System.monotonic_time
jobs = 0
Sidekiq.logger.info(message: 'Atomically enqueuing scheduled jobs', status: 'start', sorted_set: sorted_set)
while job = redis_eval_lua(conn, LUA_ZPOPBYSCORE, LUA_ZPOPBYSCORE_SHA, keys: [sorted_set], argv: [now])
jobs += 1
Sidekiq::Client.push(Sidekiq.load_json(job))
end
end_time = ::Gitlab::Metrics::System.monotonic_time
Sidekiq.logger.info(message: 'Atomically enqueuing scheduled jobs',
status: 'done',
sorted_set: sorted_set,
jobs_count: jobs,
duration_s: end_time - start_time)
end
end
end
def redis_eval_lua(conn, script, sha, keys: nil, argv: nil)
conn.evalsha(sha, keys: keys, argv: argv)
rescue ::Redis::CommandError => e
if e.message.start_with?('NOSCRIPT')
conn.eval(script, keys: keys, argv: argv)
else
raise
end
end
end
end
......@@ -108,7 +108,7 @@ RSpec.describe Gitlab::Runtime do
allow(sidekiq_type).to receive(:options).and_return(concurrency: 2)
end
it_behaves_like "valid runtime", :sidekiq, 4
it_behaves_like "valid runtime", :sidekiq, 5
end
context "console" do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqEnq, :clean_gitlab_redis_queues do
let(:retry_set) { Sidekiq::Scheduled::SETS.first }
let(:schedule_set) { Sidekiq::Scheduled::SETS.last }
around do |example|
freeze_time { example.run }
end
shared_examples 'finds jobs that are due and enqueues them' do
before do
Sidekiq.redis do |redis|
redis.zadd(retry_set, (Time.current - 1.day).to_f.to_s, '{"jid": 1}')
redis.zadd(retry_set, Time.current.to_f.to_s, '{"jid": 2}')
redis.zadd(retry_set, (Time.current + 1.day).to_f.to_s, '{"jid": 3}')
redis.zadd(schedule_set, (Time.current - 1.day).to_f.to_s, '{"jid": 4}')
redis.zadd(schedule_set, Time.current.to_f.to_s, '{"jid": 5}')
redis.zadd(schedule_set, (Time.current + 1.day).to_f.to_s, '{"jid": 6}')
end
end
it 'enqueues jobs that are due' do
expect(Sidekiq::Client).to receive(:push).with({ 'jid' => 1 })
expect(Sidekiq::Client).to receive(:push).with({ 'jid' => 2 })
expect(Sidekiq::Client).to receive(:push).with({ 'jid' => 4 })
expect(Sidekiq::Client).to receive(:push).with({ 'jid' => 5 })
Gitlab::SidekiqEnq.new.enqueue_jobs
Sidekiq.redis do |redis|
expect(redis.zscan_each(retry_set).map(&:first)).to contain_exactly('{"jid": 3}')
expect(redis.zscan_each(schedule_set).map(&:first)).to contain_exactly('{"jid": 6}')
end
end
end
context 'when atomic_sidekiq_scheduler is disabled' do
before do
stub_feature_flags(atomic_sidekiq_scheduler: false)
end
it_behaves_like 'finds jobs that are due and enqueues them'
context 'when ZRANGEBYSCORE returns a job that is already removed by another process' do
before do
Sidekiq.redis do |redis|
redis.zadd(schedule_set, Time.current.to_f.to_s, '{"jid": 1}')
allow(redis).to receive(:zrangebyscore).and_wrap_original do |m, *args, **kwargs|
m.call(*args, **kwargs).tap do |jobs|
redis.zrem(schedule_set, jobs.first) if args[0] == schedule_set && jobs.first
end
end
end
end
it 'calls ZREM but does not enqueue the job' do
Sidekiq.redis do |redis|
expect(redis).to receive(:zrem).with(schedule_set, '{"jid": 1}').twice.and_call_original
end
expect(Sidekiq::Client).not_to receive(:push)
Gitlab::SidekiqEnq.new.enqueue_jobs
end
end
end
context 'when atomic_sidekiq_scheduler is enabled' do
before do
stub_feature_flags(atomic_sidekiq_scheduler: true)
end
context 'when Lua script is not yet loaded' do
before do
Gitlab::Redis::Queues.with { |redis| redis.script(:flush) }
end
it_behaves_like 'finds jobs that are due and enqueues them'
end
context 'when Lua script is already loaded' do
before do
Gitlab::SidekiqEnq.new.enqueue_jobs
end
it_behaves_like 'finds jobs that are due and enqueues them'
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment