Commit 0b15570e authored by Douwe Maan's avatar Douwe Maan

Add ApplicationWorker and make every worker include it

parent 4ca4b0ff
class AdminEmailWorker class AdminEmailWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class AuthorizedProjectsWorker class AuthorizedProjectsWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# Schedules multiple jobs and waits for them to be completed. # Schedules multiple jobs and waits for them to be completed.
def self.bulk_perform_and_wait(args_list) def self.bulk_perform_and_wait(args_list)
......
class BackgroundMigrationWorker class BackgroundMigrationWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# Enqueues a number of jobs in bulk. # Enqueues a number of jobs in bulk.
# #
......
class BuildCoverageWorker class BuildCoverageWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(build_id) def perform(build_id)
......
class BuildFinishedWorker class BuildFinishedWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class BuildHooksWorker class BuildHooksWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :hooks enqueue_in group: :hooks
......
class BuildQueueWorker class BuildQueueWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class BuildSuccessWorker class BuildSuccessWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class BuildTraceSectionsWorker class BuildTraceSectionsWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(build_id) def perform(build_id)
......
class ClusterInstallAppWorker class ClusterInstallAppWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
include ClusterApplications include ClusterApplications
......
class ClusterProvisionWorker class ClusterProvisionWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
def perform(cluster_id) def perform(cluster_id)
......
class ClusterWaitForAppInstallationWorker class ClusterWaitForAppInstallationWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
include ClusterApplications include ClusterApplications
......
Sidekiq::Worker.extend ActiveSupport::Concern
module ApplicationWorker
extend ActiveSupport::Concern
include Sidekiq::Worker
included do
sidekiq_options queue: base_queue_name
end
module ClassMethods
def base_queue_name
name
.sub(/\AGitlab::/, '')
.sub(/Worker\z/, '')
.underscore
.tr('/', '_')
end
def queue
get_sidekiq_options['queue'].to_s
end
end
end
# Concern that sets the queue of a Sidekiq worker based on the worker's class
# name/namespace.
module DedicatedSidekiqQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: name.sub(/Worker\z/, '').underscore.tr('/', '_')
end
end
...@@ -8,7 +8,7 @@ module Gitlab ...@@ -8,7 +8,7 @@ module Gitlab
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include ReschedulingMethods include ReschedulingMethods
include NotifyUponDeath include NotifyUponDeath
......
class CreateGpgSignatureWorker class CreateGpgSignatureWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(commit_sha, project_id) def perform(commit_sha, project_id)
project = Project.find_by(id: project_id) project = Project.find_by(id: project_id)
......
class CreatePipelineWorker class CreatePipelineWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :creation enqueue_in group: :creation
......
class DeleteMergedBranchesWorker class DeleteMergedBranchesWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(project_id, user_id) def perform(project_id, user_id)
begin begin
......
class DeleteUserWorker class DeleteUserWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(current_user_id, delete_user_id, options = {}) def perform(current_user_id, delete_user_id, options = {})
delete_user = User.find(delete_user_id) delete_user = User.find(delete_user_id)
......
class EmailReceiverWorker class EmailReceiverWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(raw) def perform(raw)
return unless Gitlab::IncomingEmail.enabled? return unless Gitlab::IncomingEmail.enabled?
......
class EmailsOnPushWorker class EmailsOnPushWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
attr_reader :email, :skip_premailer attr_reader :email, :skip_premailer
......
class ExpireBuildArtifactsWorker class ExpireBuildArtifactsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ExpireBuildInstanceArtifactsWorker class ExpireBuildInstanceArtifactsWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(build_id) def perform(build_id)
build = Ci::Build build = Ci::Build
......
class ExpireJobCacheWorker class ExpireJobCacheWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :cache enqueue_in group: :cache
......
class ExpirePipelineCacheWorker class ExpirePipelineCacheWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :cache enqueue_in group: :cache
......
class GitGarbageCollectWorker class GitGarbageCollectWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
sidekiq_options retry: false sidekiq_options retry: false
......
...@@ -7,7 +7,7 @@ module Gitlab ...@@ -7,7 +7,7 @@ module Gitlab
# been completed this worker will advance the import process to the next # been completed this worker will advance the import process to the next
# stage. # stage.
class AdvanceStageWorker class AdvanceStageWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: 'github_importer_advance_stage', dead: false sidekiq_options queue: 'github_importer_advance_stage', dead: false
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
module Gitlab module Gitlab
module GithubImport module GithubImport
class RefreshImportJidWorker class RefreshImportJidWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
# The interval to schedule new instances of this job at. # The interval to schedule new instances of this job at.
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class FinishImportWorker class FinishImportWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportBaseDataWorker class ImportBaseDataWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportIssuesAndDiffNotesWorker class ImportIssuesAndDiffNotesWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportNotesWorker class ImportNotesWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportPullRequestsWorker class ImportPullRequestsWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportRepositoryWorker class ImportRepositoryWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
class GitlabShellWorker class GitlabShellWorker
include Sidekiq::Worker include ApplicationWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
def perform(action, *arg) def perform(action, *arg)
gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
......
class GitlabUsagePingWorker class GitlabUsagePingWorker
LEASE_TIMEOUT = 86400 LEASE_TIMEOUT = 86400
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class GroupDestroyWorker class GroupDestroyWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
def perform(group_id, user_id) def perform(group_id, user_id)
......
class ImportExportProjectCleanupWorker class ImportExportProjectCleanupWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class InvalidGpgSignatureUpdateWorker class InvalidGpgSignatureUpdateWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(gpg_key_id) def perform(gpg_key_id)
gpg_key = GpgKey.find_by(id: gpg_key_id) gpg_key = GpgKey.find_by(id: gpg_key_id)
......
...@@ -2,8 +2,7 @@ require 'json' ...@@ -2,8 +2,7 @@ require 'json'
require 'socket' require 'socket'
class IrkerWorker class IrkerWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(project_id, chans, colors, push_data, settings) def perform(project_id, chans, colors, push_data, settings)
project = Project.find(project_id) project = Project.find(project_id)
......
class MergeWorker class MergeWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(merge_request_id, current_user_id, params) def perform(merge_request_id, current_user_id, params)
params = params.with_indifferent_access params = params.with_indifferent_access
......
...@@ -5,8 +5,7 @@ ...@@ -5,8 +5,7 @@
# The worker will reject doing anything for projects that *do* have a # The worker will reject doing anything for projects that *do* have a
# namespace. For those use ProjectDestroyWorker instead. # namespace. For those use ProjectDestroyWorker instead.
class NamespacelessProjectDestroyWorker class NamespacelessProjectDestroyWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
def self.bulk_perform_async(args_list) def self.bulk_perform_async(args_list)
......
class NewIssueWorker class NewIssueWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include NewIssuable include NewIssuable
def perform(issue_id, user_id) def perform(issue_id, user_id)
......
class NewMergeRequestWorker class NewMergeRequestWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include NewIssuable include NewIssuable
def perform(merge_request_id, user_id) def perform(merge_request_id, user_id)
......
class NewNoteWorker class NewNoteWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# Keep extra parameter to preserve backwards compatibility with # Keep extra parameter to preserve backwards compatibility with
# old `NewNoteWorker` jobs (can remove later) # old `NewNoteWorker` jobs (can remove later)
......
class PagesWorker class PagesWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: :pages, retry: false sidekiq_options queue: :pages, retry: false
......
class PipelineHooksWorker class PipelineHooksWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :hooks enqueue_in group: :hooks
......
class PipelineMetricsWorker class PipelineMetricsWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(pipeline_id) def perform(pipeline_id)
......
class PipelineNotificationWorker class PipelineNotificationWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(pipeline_id, recipients = nil) def perform(pipeline_id, recipients = nil)
......
class PipelineProcessWorker class PipelineProcessWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class PipelineScheduleWorker class PipelineScheduleWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class PipelineSuccessWorker class PipelineSuccessWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class PipelineUpdateWorker class PipelineUpdateWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class PostReceive class PostReceive
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(gl_repository, identifier, changes) def perform(gl_repository, identifier, changes)
project, is_wiki = Gitlab::GlRepository.parse(gl_repository) project, is_wiki = Gitlab::GlRepository.parse(gl_repository)
......
...@@ -5,8 +5,7 @@ ...@@ -5,8 +5,7 @@
# Consider using an extra worker if you need to add any extra (and potentially # Consider using an extra worker if you need to add any extra (and potentially
# slow) processing of commits. # slow) processing of commits.
class ProcessCommitWorker class ProcessCommitWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# project_id - The ID of the project this commit belongs to. # project_id - The ID of the project this commit belongs to.
# user_id - The ID of the user that pushed the commit. # user_id - The ID of the user that pushed the commit.
......
# Worker for updating any project specific caches. # Worker for updating any project specific caches.
class ProjectCacheWorker class ProjectCacheWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
LEASE_TIMEOUT = 15.minutes.to_i LEASE_TIMEOUT = 15.minutes.to_i
......
class ProjectDestroyWorker class ProjectDestroyWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
def perform(project_id, user_id, params) def perform(project_id, user_id, params)
......
class ProjectExportWorker class ProjectExportWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
sidekiq_options retry: 3 sidekiq_options retry: 3
......
class ProjectMigrateHashedStorageWorker class ProjectMigrateHashedStorageWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
LEASE_TIMEOUT = 30.seconds.to_i LEASE_TIMEOUT = 30.seconds.to_i
......
class ProjectServiceWorker class ProjectServiceWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
sidekiq_options dead: false sidekiq_options dead: false
......
# Worker for updating any project specific caches. # Worker for updating any project specific caches.
class PropagateServiceTemplateWorker class PropagateServiceTemplateWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
LEASE_TIMEOUT = 4.hours.to_i LEASE_TIMEOUT = 4.hours.to_i
......
class PruneOldEventsWorker class PruneOldEventsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ReactiveCachingWorker class ReactiveCachingWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(class_name, id, *args) def perform(class_name, id, *args)
klass = begin klass = begin
......
class RemoveExpiredGroupLinksWorker class RemoveExpiredGroupLinksWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class RemoveExpiredMembersWorker class RemoveExpiredMembersWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class RemoveOldWebHookLogsWorker class RemoveOldWebHookLogsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
WEB_HOOK_LOG_LIFETIME = 2.days WEB_HOOK_LOG_LIFETIME = 2.days
......
class RemoveUnreferencedLfsObjectsWorker class RemoveUnreferencedLfsObjectsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class RepositoryArchiveCacheWorker class RepositoryArchiveCacheWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
module RepositoryCheck module RepositoryCheck
class BatchWorker class BatchWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
RUN_TIME = 3600 RUN_TIME = 3600
......
module RepositoryCheck module RepositoryCheck
class ClearWorker class ClearWorker
include Sidekiq::Worker include ApplicationWorker
include RepositoryCheckQueue include RepositoryCheckQueue
def perform def perform
......
module RepositoryCheck module RepositoryCheck
class SingleRepositoryWorker class SingleRepositoryWorker
include Sidekiq::Worker include ApplicationWorker
include RepositoryCheckQueue include RepositoryCheckQueue
def perform(project_id) def perform(project_id)
......
class RepositoryForkWorker class RepositoryForkWorker
ForkError = Class.new(StandardError) ForkError = Class.new(StandardError)
include Sidekiq::Worker include ApplicationWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
include ProjectStartImport include ProjectStartImport
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
......
class RepositoryImportWorker class RepositoryImportWorker
ImportError = Class.new(StandardError) ImportError = Class.new(StandardError)
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
include ProjectStartImport include ProjectStartImport
......
class RequestsProfilesWorker class RequestsProfilesWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ScheduleUpdateUserActivityWorker class ScheduleUpdateUserActivityWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform(batch_size = 500) def perform(batch_size = 500)
......
class StageUpdateWorker class StageUpdateWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class StorageMigratorWorker class StorageMigratorWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
BATCH_SIZE = 100 BATCH_SIZE = 100
......
class StuckCiJobsWorker class StuckCiJobsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
EXCLUSIVE_LEASE_KEY = 'stuck_ci_builds_worker_lease'.freeze EXCLUSIVE_LEASE_KEY = 'stuck_ci_builds_worker_lease'.freeze
......
class StuckImportJobsWorker class StuckImportJobsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
IMPORT_JOBS_EXPIRATION = 15.hours.to_i IMPORT_JOBS_EXPIRATION = 15.hours.to_i
......
class StuckMergeJobsWorker class StuckMergeJobsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class SystemHookPushWorker class SystemHookPushWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(push_data, hook_id) def perform(push_data, hook_id)
SystemHooksService.new.execute_hooks(push_data, hook_id) SystemHooksService.new.execute_hooks(push_data, hook_id)
......
class TrendingProjectsWorker class TrendingProjectsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class UpdateMergeRequestsWorker class UpdateMergeRequestsWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
LOG_TIME_THRESHOLD = 90 # seconds LOG_TIME_THRESHOLD = 90 # seconds
......
class UpdateUserActivityWorker class UpdateUserActivityWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(pairs) def perform(pairs)
pairs = cast_data(pairs) pairs = cast_data(pairs)
......
class UploadChecksumWorker class UploadChecksumWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(upload_id) def perform(upload_id)
upload = Upload.find(upload_id) upload = Upload.find(upload_id)
......
class WaitForClusterCreationWorker class WaitForClusterCreationWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
def perform(cluster_id) def perform(cluster_id)
......
class WebHookWorker class WebHookWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
sidekiq_options retry: 4, dead: false sidekiq_options retry: 4, dead: false
......
...@@ -64,13 +64,13 @@ end ...@@ -64,13 +64,13 @@ end
# The Sidekiq client API always adds the queue to the Sidekiq queue # The Sidekiq client API always adds the queue to the Sidekiq queue
# list, but mail_room and gitlab-shell do not. This is only necessary # list, but mail_room and gitlab-shell do not. This is only necessary
# for monitoring. # for monitoring.
config = YAML.load_file(Rails.root.join('config', 'sidekiq_queues.yml').to_s)
begin begin
queues = Gitlab::SidekiqConfig.worker_queues
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
conn.pipelined do conn.pipelined do
config[:queues].each do |queue| queues.each do |queue|
conn.sadd('queues', queue[0]) conn.sadd('queues', queue)
end end
end end
end end
......
...@@ -18,8 +18,7 @@ include the `DedicatedSidekiqQueue` concern as follows: ...@@ -18,8 +18,7 @@ include the `DedicatedSidekiqQueue` concern as follows:
```ruby ```ruby
class ProcessSomethingWorker class ProcessSomethingWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
end end
``` ```
......
require 'yaml'
module Gitlab
module SidekiqConfig
def self.redis_queues
@redis_queues ||= Sidekiq::Queue.all.map(&:name)
end
# This method is called by `bin/sidekiq-cluster` in EE, which runs outside
# of bundler/Rails context, so we cannot use any gem or Rails methods.
def self.config_queues(rails_path = Rails.root.to_s)
@config_queues ||= begin
config = YAML.load_file(File.join(rails_path, 'config', 'sidekiq_queues.yml'))
config[:queues].map(&:first)
end
end
def self.cron_workers
@cron_workers ||= Settings.cron_jobs.map { |job_name, options| options['job_class'].constantize }
end
def self.workers
@workers ||= find_workers(Rails.root.join('app', 'workers'))
end
def self.default_queues
[ActionMailer::DeliveryJob.queue_name, 'default']
end
def self.worker_queues
@worker_queues ||= (workers.map(&:queue) + default_queues).uniq
end
def self.find_workers(root)
concerns = root.join('concerns').to_s
workers = Dir[root.join('**', '*.rb')]
.reject { |path| path.start_with?(concerns) }
workers.map! do |path|
ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '')
ns.camelize.constantize
end
# Skip concerns
workers.select { |w| w < Sidekiq::Worker }
end
end
end
require 'rails_helper'
describe Gitlab::SidekiqConfig do
describe '.workers' do
it 'includes all workers' do
workers = described_class.workers
expect(workers).to include(PostReceive)
expect(workers).to include(MergeWorker)
end
end
describe '.worker_queues' do
it 'includes all queues' do
queues = described_class.worker_queues
expect(queues).to include('post_receive')
expect(queues).to include('merge')
expect(queues).to include('cronjob')
expect(queues).to include('mailers')
expect(queues).to include('default')
end
end
end
require 'spec_helper' require 'spec_helper'
describe DedicatedSidekiqQueue do describe ApplicationWorker do
let(:worker) do let(:worker) do
Class.new do Class.new do
def self.name def self.name
'Foo::Bar::DummyWorker' 'Gitlab::Foo::Bar::DummyWorker'
end end
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
end end
end end
describe 'queue names' do describe 'Sidekiq options' do
it 'sets the queue name based on the class name' do it 'sets the queue name based on the class name' do
expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy') expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy')
end end
end end
describe '.queue' do
it 'returns the queue name' do
worker.sidekiq_options queue: :some_queue
expect(worker.queue).to eq('some_queue')
end
end
end end
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe ClusterQueue do describe ClusterQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include ClusterQueue include ClusterQueue
end end
end end
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe CronjobQueue do describe CronjobQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include CronjobQueue include CronjobQueue
end end
end end
......
...@@ -3,6 +3,10 @@ require 'spec_helper' ...@@ -3,6 +3,10 @@ require 'spec_helper'
describe Gitlab::GithubImport::ObjectImporter do describe Gitlab::GithubImport::ObjectImporter do
let(:worker) do let(:worker) do
Class.new do Class.new do
def self.name
'DummyWorker'
end
include(Gitlab::GithubImport::ObjectImporter) include(Gitlab::GithubImport::ObjectImporter)
def counter_name def counter_name
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe Gitlab::GithubImport::Queue do describe Gitlab::GithubImport::Queue do
it 'sets the Sidekiq options for the worker' do it 'sets the Sidekiq options for the worker' do
worker = Class.new do worker = Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include Gitlab::GithubImport::Queue include Gitlab::GithubImport::Queue
end end
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe PipelineQueue do describe PipelineQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include PipelineQueue include PipelineQueue
end end
end end
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe RepositoryCheckQueue do describe RepositoryCheckQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include RepositoryCheckQueue include RepositoryCheckQueue
end end
end end
......
require 'spec_helper' require 'spec_helper'
describe 'Every Sidekiq worker' do describe 'Every Sidekiq worker' do
let(:workers) do it 'includes ApplicationWorker' do
root = Rails.root.join('app', 'workers') expect(Gitlab::SidekiqConfig.workers).to all(include(ApplicationWorker))
concerns = root.join('concerns').to_s
workers = Dir[root.join('**', '*.rb')]
.reject { |path| path.start_with?(concerns) }
workers.map do |path|
ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '')
ns.camelize.constantize
end
end end
it 'does not use the default queue' do it 'does not use the default queue' do
workers.each do |worker| expect(Gitlab::SidekiqConfig.workers.map(&:queue)).not_to include('default')
expect(worker.sidekiq_options['queue'].to_s).not_to eq('default')
end
end end
it 'uses the cronjob queue when the worker runs as a cronjob' do it 'uses the cronjob queue when the worker runs as a cronjob' do
cron_workers = Settings.cron_jobs expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(eq('cronjob'))
.map { |job_name, options| options['job_class'].constantize }
.to_set
workers.each do |worker|
next unless cron_workers.include?(worker)
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob')
end
end end
it 'defines the queue in the Sidekiq configuration file' do it 'defines the queue in the Sidekiq configuration file' do
config = YAML.load_file(Rails.root.join('config', 'sidekiq_queues.yml').to_s) config_queue_names = Gitlab::SidekiqConfig.config_queues.to_set
queue_names = config[:queues].map { |(queue, _)| queue }.to_set
workers.each do |worker| expect(Gitlab::SidekiqConfig.worker_queues).to all(be_in(config_queue_names))
expect(queue_names).to include(worker.sidekiq_options['queue'].to_s)
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment