Commit afadb20f authored by Sean McGivern's avatar Sean McGivern

Add ApplicationWorker.with_status for checking job status

Add `ApplicationWorker.with_status`, which sets the `status_expiration`
field on a Sidekiq job. In a future change, this will be required to
check the job's status with Gitlab::SidekiqStatus.

If the worker class defines its own `status_expiration`, then we use
that; otherwise we fall back to
`Gitlab::SidekiqStatus::DEFAULT_EXPIRATION`.

This commit also changes several cases where we assign the result of a
Sidekiq worker's `perform_async` method to a `job_id` or `jid`
variable. There are some cases where we don't need that, of course: if
we don't actually use the job's ID to check its status then
`with_status` is unnecessary.

For instance, many classes in Gitlab::Geo::LogCursor::Events log a job
ID but don't persist it elsewhere to check the status; the pipeline
schedules API uses the job ID to check if a worker was scheduled or not.
parent 7b03b44b
......@@ -98,7 +98,7 @@ class Admin::ApplicationSettingsController < Admin::ApplicationController
# Specs are in spec/requests/self_monitoring_project_spec.rb
def create_self_monitoring_project
job_id = SelfMonitoringProjectCreateWorker.perform_async # rubocop:disable CodeReuse/Worker
job_id = SelfMonitoringProjectCreateWorker.with_status.perform_async # rubocop:disable CodeReuse/Worker
render status: :accepted, json: {
job_id: job_id,
......@@ -137,7 +137,7 @@ class Admin::ApplicationSettingsController < Admin::ApplicationController
# Specs are in spec/requests/self_monitoring_project_spec.rb
def delete_self_monitoring_project
job_id = SelfMonitoringProjectDeleteWorker.perform_async # rubocop:disable CodeReuse/Worker
job_id = SelfMonitoringProjectDeleteWorker.with_status.perform_async # rubocop:disable CodeReuse/Worker
render status: :accepted, json: {
job_id: job_id,
......
......@@ -662,7 +662,7 @@ class MergeRequest < ApplicationRecord
# updates `merge_jid` with the MergeWorker#jid.
# This helps tracking enqueued and ongoing merge jobs.
def merge_async(user_id, params)
jid = MergeWorker.perform_async(id, user_id, params.to_h)
jid = MergeWorker.with_status.perform_async(id, user_id, params.to_h)
update_column(:merge_jid, jid)
# merge_ongoing? depends on merge_jid
......@@ -681,7 +681,7 @@ class MergeRequest < ApplicationRecord
# attribute is set *and* that the sidekiq job is still running. So a JID
# for a completed RebaseWorker is equivalent to a nil JID.
jid = Sidekiq::Worker.skipping_transaction_check do
RebaseWorker.perform_async(id, user_id, skip_ci)
RebaseWorker.with_status.perform_async(id, user_id, skip_ci)
end
update_column(:rebase_jid, jid)
......
......@@ -14,7 +14,7 @@ module Groups
def async_execute
group_import_state = GroupImportState.safe_find_or_create_by!(group: group, user: current_user)
jid = GroupImportWorker.perform_async(current_user.id, group.id)
jid = GroupImportWorker.with_status.perform_async(current_user.id, group.id)
if jid.present?
group_import_state.update!(jid: jid)
......
......@@ -55,6 +55,12 @@ module ApplicationWorker
subclass.after_set_class_attribute { subclass.set_queue }
end
# Returns a Sidekiq job setter whose enqueued jobs carry a
# `status_expiration` option, so their status can later be checked via
# Gitlab::SidekiqStatus. A `status_expiration` declared in the worker's
# own `sidekiq_options` takes precedence; otherwise the default
# expiration is used.
def with_status
  expiration = sidekiq_options_hash['status_expiration'] || Gitlab::SidekiqStatus::DEFAULT_EXPIRATION

  set(status_expiration: expiration)
end
def generated_queue_name
Gitlab::SidekiqConfig::WorkerRouter.queue_name_from_worker_name(self)
end
......
......@@ -47,7 +47,7 @@ module LimitedCapacity
# would be occupied by a job that will be performed in the distant future.
# We let the cron worker enqueue new jobs, this could be seen as our retry and
# back off mechanism because the job might fail again if executed immediately.
sidekiq_options retry: 0
sidekiq_options retry: 0, status_expiration: Gitlab::SidekiqStatus::DEFAULT_EXPIRATION
deduplicate :none
end
......
......@@ -23,7 +23,7 @@ module Geo
end
def schedule_job(repository_id)
job_id = Geo::ContainerRepositorySyncWorker.perform_async(repository_id)
job_id = Geo::ContainerRepositorySyncWorker.with_status.perform_async(repository_id)
{ id: repository_id, job_id: job_id } if job_id
end
......
......@@ -5,7 +5,7 @@ module Geo
private
def schedule_job(project_id)
job_id = Geo::DesignRepositorySyncWorker.perform_async(project_id)
job_id = Geo::DesignRepositorySyncWorker.with_status.perform_async(project_id)
{ project_id: project_id, job_id: job_id } if job_id
end
......
......@@ -26,7 +26,7 @@ module Geo
end
def schedule_job(object_type, object_db_id)
job_id = FileDownloadWorker.perform_async(object_type.to_s, object_db_id)
job_id = FileDownloadWorker.with_status.perform_async(object_type.to_s, object_db_id)
{ id: object_db_id, type: object_type, job_id: job_id } if job_id
end
......
......@@ -23,7 +23,7 @@ module Geo
end
def schedule_job(replicable_name, model_record_id)
job_id = ::Geo::EventWorker.perform_async(replicable_name, :created, model_record_id: model_record_id)
job_id = ::Geo::EventWorker.with_status.perform_async(replicable_name, :created, model_record_id: model_record_id)
{ model_record_id: model_record_id, replicable_name: replicable_name, job_id: job_id } if job_id
end
......
......@@ -50,7 +50,7 @@ module Geo
def schedule_job(project_id)
registry = Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
job_id = Geo::ProjectSyncWorker.perform_async(
job_id = Geo::ProjectSyncWorker.with_status.perform_async(
project_id,
sync_repository: registry.repository_sync_due?(Time.current),
sync_wiki: registry.wiki_sync_due?(Time.current)
......
......@@ -36,7 +36,7 @@ module Geo
end
def schedule_job(project_id)
job_id = Geo::RepositoryVerification::Primary::SingleWorker.perform_async(project_id)
job_id = Geo::RepositoryVerification::Primary::SingleWorker.with_status.perform_async(project_id)
{ id: project_id, job_id: job_id } if job_id
end
......
......@@ -61,7 +61,7 @@ module Geo
# rubocop:disable CodeReuse/ActiveRecord
def schedule_job(project_id)
registry_id = Geo::ProjectRegistry.where(project_id: project_id).pick(:id)
job_id = Geo::RepositoryVerification::Secondary::SingleWorker.perform_async(registry_id)
job_id = Geo::RepositoryVerification::Secondary::SingleWorker.with_status.perform_async(registry_id)
{ project_id: project_id, job_id: job_id } if job_id
end
......
......@@ -323,6 +323,7 @@ RSpec.describe Projects::MergeRequestsController do
end
def expect_rebase_worker_for(user)
allow(RebaseWorker).to receive(:with_status).and_return(RebaseWorker)
expect(RebaseWorker).to receive(:perform_async).with(merge_request.id, user.id, false)
end
......
......@@ -22,6 +22,7 @@ RSpec.describe ProjectImportState, type: :model do
it 'calls RepositoryImportWorker and inserts in front of the mirror scheduler queue', :sidekiq_might_not_need_inline do
allow_any_instance_of(EE::Project).to receive(:repository_exists?).and_return(false, true)
expect_any_instance_of(EE::ProjectImportState).to receive(:force_import_job!)
expect(RepositoryImportWorker).to receive(:perform_async).with(import_state.project_id).and_call_original
......
......@@ -17,6 +17,8 @@ RSpec.describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :use_sql_query_
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:over_time?).and_return(false)
end
allow(Geo::ContainerRepositorySyncWorker).to receive(:with_status).and_return(Geo::ContainerRepositorySyncWorker)
end
it 'does not schedule anything when tracking database is not configured' do
......
......@@ -22,6 +22,8 @@ RSpec.describe Geo::DesignRepositoryShardSyncWorker, :geo, :clean_gitlab_redis_c
create(:design, project: project_1)
create(:design, project: project_2)
allow(Geo::DesignRepositorySyncWorker).to receive(:with_status).and_return(Geo::DesignRepositorySyncWorker)
end
it 'does not perform Geo::DesignRepositorySyncWorker when shard becomes unhealthy' do
......
......@@ -19,6 +19,8 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
WebMock.stub_request(:get, /primary-geo-node/).to_return(status: 200, body: "", headers: {})
stub_feature_flags(geo_upload_replication: false)
allow(Geo::FileDownloadWorker).to receive(:with_status).and_return(Geo::FileDownloadWorker)
end
it 'does not schedule anything when tracking database is not configured' do
......
......@@ -16,6 +16,8 @@ RSpec.describe Geo::RegistrySyncWorker, :geo, :use_sql_query_cache_for_tracking_
allow_next_instance_of(described_class) do |instance|
allow(instance).to receive(:over_time?).and_return(false)
end
allow(::Geo::EventWorker).to receive(:with_status).and_return(::Geo::EventWorker)
end
it 'does not schedule anything when tracking database is not configured' do
......
......@@ -19,6 +19,8 @@ RSpec.describe Geo::RepositoryShardSyncWorker, :geo, :clean_gitlab_redis_cache,
stub_exclusive_lease(renew: true)
Gitlab::ShardHealthCache.update([shard_name])
allow(Geo::ProjectSyncWorker).to receive(:with_status).and_return(Geo::ProjectSyncWorker)
end
it 'does not perform Geo::ProjectSyncWorker when shard becomes unhealthy' do
......
......@@ -12,6 +12,8 @@ RSpec.describe Geo::RepositoryVerification::Primary::ShardWorker, :clean_gitlab_
before do
stub_current_geo_node(primary)
allow(primary_singleworker).to receive(:with_status).and_return(primary_singleworker)
end
describe '#perform' do
......
......@@ -19,6 +19,8 @@ RSpec.describe Geo::RepositoryVerification::Secondary::ShardWorker, :geo, :reque
stub_exclusive_lease(renew: true)
Gitlab::ShardHealthCache.update([shard_name])
allow(verification_worker).to receive(:with_status).and_return(verification_worker)
end
context 'shard worker scheduler' do
......
......@@ -7,12 +7,16 @@ module Gitlab
# To check if a job has been completed, simply pass the job ID to the
# `completed?` method:
#
# job_id = SomeWorker.perform_async(...)
# job_id = SomeWorker.with_status.perform_async(...)
#
# if Gitlab::SidekiqStatus.completed?(job_id)
# ...
# end
#
# If you do not use `with_status`, and the worker class does not declare
# `status_expiration` in its `sidekiq_options`, then this status will not be
# stored.
#
# For each job ID registered a separate key is stored in Redis, making lookups
# much faster than using Sidekiq's built-in job finding/status API. These keys
# expire after a certain period of time to prevent storing too many keys in
......
......@@ -10,7 +10,7 @@ RSpec.describe Projects::MergeRequestsController do
let_it_be_with_reload(:project_public_with_private_builds) { create(:project, :repository, :public, :builds_private) }
let(:user) { project.owner }
let(:merge_request) { create(:merge_request_with_diffs, target_project: project, source_project: merge_request_source_project, allow_maintainer_to_push: false) }
let(:merge_request) { create(:merge_request_with_diffs, target_project: project, source_project: merge_request_source_project, allow_collaboration: false) }
let(:merge_request_source_project) { project }
before do
......@@ -507,6 +507,7 @@ RSpec.describe Projects::MergeRequestsController do
end
it 'starts the merge immediately with permitted params' do
allow(MergeWorker).to receive(:with_status).and_return(MergeWorker)
expect(MergeWorker).to receive(:perform_async).with(merge_request.id, anything, { 'sha' => merge_request.diff_head_sha })
merge_with_sha
......@@ -2078,6 +2079,10 @@ RSpec.describe Projects::MergeRequestsController do
post :rebase, params: { namespace_id: project.namespace, project_id: project, id: merge_request }
end
before do
allow(RebaseWorker).to receive(:with_status).and_return(RebaseWorker)
end
def expect_rebase_worker_for(user)
expect(RebaseWorker).to receive(:perform_async).with(merge_request.id, user.id, false)
end
......
......@@ -2920,6 +2920,8 @@ RSpec.describe MergeRequest, factory_default: :keep do
params = {}
merge_jid = 'hash-123'
allow(MergeWorker).to receive(:with_status).and_return(MergeWorker)
expect(merge_request).to receive(:expire_etag_cache)
expect(MergeWorker).to receive(:perform_async).with(merge_request.id, user_id, params) do
merge_jid
......@@ -2938,6 +2940,10 @@ RSpec.describe MergeRequest, factory_default: :keep do
subject(:execute) { merge_request.rebase_async(user_id) }
before do
allow(RebaseWorker).to receive(:with_status).and_return(RebaseWorker)
end
it 'atomically enqueues a RebaseWorker job and updates rebase_jid' do
expect(RebaseWorker)
.to receive(:perform_async)
......
......@@ -3278,6 +3278,8 @@ RSpec.describe API::MergeRequests do
context 'when skip_ci parameter is set' do
it 'enqueues a rebase of the merge request with skip_ci flag set' do
allow(RebaseWorker).to receive(:with_status).and_return(RebaseWorker)
expect(RebaseWorker).to receive(:perform_async).with(merge_request.id, user.id, true).and_call_original
Sidekiq::Testing.fake! do
......
......@@ -60,6 +60,7 @@ RSpec.describe Import::GitlabGroupsController do
end
it 'imports the group data', :sidekiq_inline do
allow(GroupImportWorker).to receive(:with_status).and_return(GroupImportWorker)
allow(GroupImportWorker).to receive(:perform_async).and_call_original
import_request
......@@ -67,7 +68,6 @@ RSpec.describe Import::GitlabGroupsController do
group = Group.find_by(name: 'test-group-import')
expect(GroupImportWorker).to have_received(:perform_async).with(user.id, group.id)
expect(group.description).to eq 'A voluptate non sequi temporibus quam at.'
expect(group.visibility_level).to eq Gitlab::VisibilityLevel::PRIVATE
end
......
......@@ -24,6 +24,10 @@ RSpec.describe AutoMerge::MergeWhenPipelineSucceedsService do
project.add_maintainer(user)
end
before do
allow(MergeWorker).to receive(:with_status).and_return(MergeWorker)
end
describe "#available_for?" do
subject { service.available_for?(mr_merge_if_green_enabled) }
......
......@@ -7,6 +7,10 @@ RSpec.describe Groups::ImportExport::ImportService do
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
before do
allow(GroupImportWorker).to receive(:with_status).and_return(GroupImportWorker)
end
context 'when the job can be successfully scheduled' do
subject(:import_service) { described_class.new(group: group, user: user) }
......@@ -20,6 +24,8 @@ RSpec.describe Groups::ImportExport::ImportService do
end
it 'enqueues an import job' do
allow(GroupImportWorker).to receive(:with_status).and_return(GroupImportWorker)
expect(GroupImportWorker).to receive(:perform_async).with(user.id, group.id)
import_service.async_execute
......
......@@ -39,6 +39,10 @@ end
# let(:status_api) { status_create_self_monitoring_project_admin_application_settings_path }
# subject { post create_self_monitoring_project_admin_application_settings_path }
RSpec.shared_examples 'triggers async worker, returns sidekiq job_id with response accepted' do
before do
allow(worker_class).to receive(:with_status).and_return(worker_class)
end
it 'returns sidekiq job_id of expected length' do
subject
......
......@@ -17,7 +17,7 @@ end
RSpec.shared_examples 'returns in_progress based on Sidekiq::Status' do
it 'returns true when job is enqueued' do
jid = described_class.perform_async
jid = described_class.with_status.perform_async
expect(described_class.in_progress?(jid)).to eq(true)
end
......
......@@ -598,4 +598,48 @@ RSpec.describe ApplicationWorker do
end
end
end
# Specs for ApplicationWorker.with_status: jobs enqueued through it must
# carry a 'status_expiration' option, honouring a worker-declared value.
describe '.with_status' do
# Use Sidekiq's fake testing mode so jobs accumulate in Sidekiq::Queues
# instead of executing, letting us inspect the enqueued job payloads.
around do |example|
Sidekiq::Testing.fake!(&example)
end
context 'when the worker does have status_expiration set' do
# Anonymous subclass of the shared `worker` fixture that declares its
# own expiration via sidekiq_options.
let(:status_expiration_worker) do
Class.new(worker) do
sidekiq_options status_expiration: 3
end
end
it 'uses status_expiration from the worker' do
status_expiration_worker.with_status.perform_async
# The class-level value (3) wins over the default expiration.
expect(Sidekiq::Queues[status_expiration_worker.queue].first).to include('status_expiration' => 3)
expect(Sidekiq::Queues[status_expiration_worker.queue].length).to eq(1)
end
it 'uses status_expiration from the worker without with_status' do
status_expiration_worker.perform_async
# sidekiq_options applies to every enqueue, even without with_status.
expect(Sidekiq::Queues[status_expiration_worker.queue].first).to include('status_expiration' => 3)
expect(Sidekiq::Queues[status_expiration_worker.queue].length).to eq(1)
end
end
context 'when the worker does not have status_expiration set' do
it 'uses the default status_expiration' do
worker.with_status.perform_async
# with_status falls back to Gitlab::SidekiqStatus::DEFAULT_EXPIRATION.
expect(Sidekiq::Queues[worker.queue].first).to include('status_expiration' => Gitlab::SidekiqStatus::DEFAULT_EXPIRATION)
expect(Sidekiq::Queues[worker.queue].length).to eq(1)
end
it 'does not set status_expiration without with_status' do
worker.perform_async
# Plain perform_async must not add the option implicitly.
expect(Sidekiq::Queues[worker.queue].first).not_to include('status_expiration')
expect(Sidekiq::Queues[worker.queue].length).to eq(1)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment