Commit 5b5c8c9d authored by nmilojevic1's avatar nmilojevic1

Delay worker execution for data consistency workers

- Fix specs for applicaiton worker
- Add description for data consistency
parent faee533c
...@@ -13,6 +13,7 @@ module ApplicationWorker ...@@ -13,6 +13,7 @@ module ApplicationWorker
include Gitlab::SidekiqVersioning::Worker include Gitlab::SidekiqVersioning::Worker
LOGGING_EXTRA_KEY = 'extra' LOGGING_EXTRA_KEY = 'extra'
DEFAULT_DELAY_INTERVAL = 1
included do included do
set_queue set_queue
...@@ -51,6 +52,16 @@ module ApplicationWorker ...@@ -51,6 +52,16 @@ module ApplicationWorker
subclass.after_set_class_attribute { subclass.set_queue } subclass.after_set_class_attribute { subclass.set_queue }
end end
def perform_async(*args)
# Worker execution for workers with data_consistency set to :delayed or :sticky
# will be delayed to give replication enough time to complete
if delayed_execution? && data_consistency_delayed_execution_feature_flag_enabled?
perform_in(delay_interval, *args)
else
super
end
end
def set_queue def set_queue
queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self) queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self)
sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue
...@@ -111,5 +122,21 @@ module ApplicationWorker ...@@ -111,5 +122,21 @@ module ApplicationWorker
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule) Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
end end
end end
protected
def data_consistency_delayed_execution_feature_flag_enabled?
Feature.enabled?(:data_consistency_delayed_execution, default_enabled: :yaml)
end
def delay_interval
DEFAULT_DELAY_INTERVAL.seconds
end
private
def delayed_execution?
get_data_consistency != :always
end
end end
end end
...@@ -71,6 +71,20 @@ module WorkerAttributes ...@@ -71,6 +71,20 @@ module WorkerAttributes
class_attributes[:urgency] || :low class_attributes[:urgency] || :low
end end
# Allows configuring worker's data_consistency.
#
# Worker can utilize Sidekiq readonly database replicas capabilities by setting data_consistency attribute.
# Workers with data_consistency set to :delayed or :sticky, calling #perform_async
# will be delayed in order to give replication process enough time to complete.
#
# - *data_consistency* values:
# - 'always' - The job is required to use the primary database (default).
# - 'sticky' - The uses a replica as long as possible. It switches to primary either on write or long replication lag.
# - 'delayed' - The job would switch to primary only on write. It would use replica always.
# If there's a long replication lag the job will be delayed, and only if the replica is not up to date on the next retry,
# it will switch to the primary.
# - *feature_flag* - allows you to toggle a job's `data_consistency, which permits you to safely toggle load balancing capabilities for a specific job.
# If disabled, job will default to `:always`, which means that the job will always use the primary.
def data_consistency(data_consistency, feature_flag: nil) def data_consistency(data_consistency, feature_flag: nil)
raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency) raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency)
raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency] raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency]
......
---
name: data_consistency_delayed_execution
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/61501
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/331110
milestone: '13.12'
type: development
group: group::memory
default_enabled: false
...@@ -29,8 +29,10 @@ RSpec.describe BuildHooksWorker do ...@@ -29,8 +29,10 @@ RSpec.describe BuildHooksWorker do
stub_feature_flags(delayed_perform_for_build_hooks_worker: false) stub_feature_flags(delayed_perform_for_build_hooks_worker: false)
end end
it 'does not call perform_in' do it 'delays scheduling a job by calling perform_in with default delay' do
expect(described_class).not_to receive(:perform_in) expect(described_class).to receive(:perform_in).with(ApplicationWorker::DEFAULT_DELAY_INTERVAL.second, 123)
described_class.perform_async(123)
end end
end end
......
...@@ -176,6 +176,62 @@ RSpec.describe ApplicationWorker do ...@@ -176,6 +176,62 @@ RSpec.describe ApplicationWorker do
end end
end end
describe '.perform_async' do
context 'when workers data consistency is not :always' do
before do
worker.data_consistency(:sticky)
end
context 'when data_consistency_delayed_execution feature flag is disabled' do
before do
stub_feature_flags(data_consistency_delayed_execution: false)
end
it 'data_consistency_delayed_execution_feature_flag_enabled? should return false' do
expect(worker).to receive(:data_consistency_delayed_execution_feature_flag_enabled?).and_return(false)
worker.perform_async
end
it 'does not call perform_in' do
expect(worker).not_to receive(:perform_in)
worker.perform_async
end
end
it 'delayed_execution? return true' do
expect(worker).to receive(:delayed_execution?).and_return(true)
worker.perform_async
end
it 'call perform_in' do
expect(worker).to receive(:perform_in).with(described_class::DEFAULT_DELAY_INTERVAL.seconds, 123)
worker.perform_async(123)
end
end
context 'when workers data consistency is :always' do
before do
worker.data_consistency(:always)
end
it 'does not call perform_in' do
expect(worker).not_to receive(:perform_in)
worker.perform_async
end
it 'delayed_execution? return false' do
expect(worker).to receive(:delayed_execution?).and_return(false)
worker.perform_async
end
end
end
describe '.bulk_perform_async' do describe '.bulk_perform_async' do
it 'enqueues jobs in bulk' do it 'enqueues jobs in bulk' do
Sidekiq::Testing.fake! do Sidekiq::Testing.fake! do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment