Commit d628b6b1 authored by Nick Thomas's avatar Nick Thomas

Merge branch 'add-data-consistency-delay' into 'master'

Delay workers that are utilizing load balancing capabilities [RUN ALL RSPEC] [RUN AS-IF-FOSS]

See merge request gitlab-org/gitlab!61501
parents 120fdd93 f4bae9bc
...@@ -13,6 +13,7 @@ module ApplicationWorker ...@@ -13,6 +13,7 @@ module ApplicationWorker
include Gitlab::SidekiqVersioning::Worker include Gitlab::SidekiqVersioning::Worker
LOGGING_EXTRA_KEY = 'extra' LOGGING_EXTRA_KEY = 'extra'
DEFAULT_DELAY_INTERVAL = 1
included do included do
set_queue set_queue
...@@ -51,6 +52,16 @@ module ApplicationWorker ...@@ -51,6 +52,16 @@ module ApplicationWorker
subclass.after_set_class_attribute { subclass.set_queue } subclass.after_set_class_attribute { subclass.set_queue }
end end
def perform_async(*args)
# Worker execution for workers with data_consistency set to :delayed or :sticky
# will be delayed to give replication enough time to complete
if utilizes_load_balancing_capabilities? && data_consistency_delayed_execution_feature_flag_enabled?
perform_in(delay_interval, *args)
else
super
end
end
def set_queue def set_queue
queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self) queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self)
sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue
...@@ -111,5 +122,15 @@ module ApplicationWorker ...@@ -111,5 +122,15 @@ module ApplicationWorker
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule) Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
end end
end end
protected
def data_consistency_delayed_execution_feature_flag_enabled?
Feature.enabled?(:data_consistency_delayed_execution, default_enabled: :yaml)
end
def delay_interval
DEFAULT_DELAY_INTERVAL.seconds
end
end end
end end
...@@ -71,6 +71,20 @@ module WorkerAttributes ...@@ -71,6 +71,20 @@ module WorkerAttributes
class_attributes[:urgency] || :low class_attributes[:urgency] || :low
end end
# Allows configuring worker's data_consistency.
#
# Worker can utilize Sidekiq readonly database replicas capabilities by setting data_consistency attribute.
# Workers with data_consistency set to :delayed or :sticky, calling #perform_async
# will be delayed in order to give replication process enough time to complete.
#
# - *data_consistency* values:
# - 'always' - The job is required to use the primary database (default).
# - 'sticky' - The uses a replica as long as possible. It switches to primary either on write or long replication lag.
# - 'delayed' - The job would switch to primary only on write. It would use replica always.
# If there's a long replication lag the job will be delayed, and only if the replica is not up to date on the next retry,
# it will switch to the primary.
# - *feature_flag* - allows you to toggle a job's `data_consistency, which permits you to safely toggle load balancing capabilities for a specific job.
# If disabled, job will default to `:always`, which means that the job will always use the primary.
def data_consistency(data_consistency, feature_flag: nil) def data_consistency(data_consistency, feature_flag: nil)
raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency) raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency)
raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency] raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency]
...@@ -85,11 +99,16 @@ module WorkerAttributes ...@@ -85,11 +99,16 @@ module WorkerAttributes
# Since the deduplication should always take into account the latest binary replication pointer into account, # Since the deduplication should always take into account the latest binary replication pointer into account,
# not the first one, the deduplication will not work with sticky or delayed. # not the first one, the deduplication will not work with sticky or delayed.
# Follow up issue to improve this: https://gitlab.com/gitlab-org/gitlab/-/issues/325291 # Follow up issue to improve this: https://gitlab.com/gitlab-org/gitlab/-/issues/325291
if idempotent? && get_data_consistency != :always if idempotent? && utilizes_load_balancing_capabilities?
raise ArgumentError, "Class can't be marked as idempotent if data_consistency is not set to :always" raise ArgumentError, "Class can't be marked as idempotent if data_consistency is not set to :always"
end end
end end
# If data_consistency is not set to :always, worker will try to utilize load balancing capabilities and use the replica
def utilizes_load_balancing_capabilities?
get_data_consistency != :always
end
def get_data_consistency def get_data_consistency
class_attributes[:data_consistency] || :always class_attributes[:data_consistency] || :always
end end
......
---
name: data_consistency_delayed_execution
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/61501
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/331110
milestone: '14.0'
type: development
group: group::memory
default_enabled: false
...@@ -24,7 +24,7 @@ module Gitlab ...@@ -24,7 +24,7 @@ module Gitlab
job['worker_data_consistency'] = worker_class.get_data_consistency job['worker_data_consistency'] = worker_class.get_data_consistency
return if worker_class.get_data_consistency == :always return unless worker_class.utilizes_load_balancing_capabilities?
if Session.current.use_primary? if Session.current.use_primary?
job['database_write_location'] = load_balancer.primary_write_location job['database_write_location'] = load_balancer.primary_write_location
......
...@@ -25,7 +25,7 @@ module Gitlab ...@@ -25,7 +25,7 @@ module Gitlab
def requires_primary?(worker_class, job) def requires_primary?(worker_class, job)
return true unless worker_class.include?(::ApplicationWorker) return true unless worker_class.include?(::ApplicationWorker)
return true if worker_class.get_data_consistency == :always return true unless worker_class.utilizes_load_balancing_capabilities?
return true unless worker_class.get_data_consistency_feature_flag_enabled? return true unless worker_class.get_data_consistency_feature_flag_enabled?
location = job['database_write_location'] || job['database_replica_location'] location = job['database_write_location'] || job['database_replica_location']
......
...@@ -29,8 +29,10 @@ RSpec.describe BuildHooksWorker do ...@@ -29,8 +29,10 @@ RSpec.describe BuildHooksWorker do
stub_feature_flags(delayed_perform_for_build_hooks_worker: false) stub_feature_flags(delayed_perform_for_build_hooks_worker: false)
end end
it 'does not call perform_in' do it 'delays scheduling a job by calling perform_in with default delay' do
expect(described_class).not_to receive(:perform_in) expect(described_class).to receive(:perform_in).with(ApplicationWorker::DEFAULT_DELAY_INTERVAL.second, 123)
described_class.perform_async(123)
end end
end end
......
...@@ -176,6 +176,58 @@ RSpec.describe ApplicationWorker do ...@@ -176,6 +176,58 @@ RSpec.describe ApplicationWorker do
end end
end end
describe '.perform_async' do
shared_examples_for 'worker utilizes load balancing capabilities' do |data_consistency|
before do
worker.data_consistency(data_consistency)
end
context 'when data_consistency_delayed_execution feature flag is disabled' do
before do
stub_feature_flags(data_consistency_delayed_execution: false)
end
it 'data_consistency_delayed_execution_feature_flag_enabled? should return false' do
expect(worker).to receive(:data_consistency_delayed_execution_feature_flag_enabled?).and_return(false)
worker.perform_async
end
it 'does not call perform_in' do
expect(worker).not_to receive(:perform_in)
worker.perform_async
end
end
it 'call perform_in' do
expect(worker).to receive(:perform_in).with(described_class::DEFAULT_DELAY_INTERVAL.seconds, 123)
worker.perform_async(123)
end
end
context 'when workers data consistency is :sticky' do
it_behaves_like 'worker utilizes load balancing capabilities', :sticky
end
context 'when workers data consistency is :delayed' do
it_behaves_like 'worker utilizes load balancing capabilities', :delayed
end
context 'when workers data consistency is :always' do
before do
worker.data_consistency(:always)
end
it 'does not call perform_in' do
expect(worker).not_to receive(:perform_in)
worker.perform_async
end
end
end
describe '.bulk_perform_async' do describe '.bulk_perform_async' do
it 'enqueues jobs in bulk' do it 'enqueues jobs in bulk' do
Sidekiq::Testing.fake! do Sidekiq::Testing.fake! do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment