Commit 06498ea8 authored by Douwe Maan's avatar Douwe Maan

Use a dedicated queue for each worker

parent fadc4ce6
---
- cronjob:admin_email
- cronjob:expire_build_artifacts
- cronjob:gitlab_usage_ping
- cronjob:import_export_project_cleanup
- cronjob:pipeline_schedule
- cronjob:prune_old_events
- cronjob:remove_expired_group_links
- cronjob:remove_expired_members
- cronjob:remove_old_web_hook_logs
- cronjob:remove_unreferenced_lfs_objects
- cronjob:repository_archive_cache
- cronjob:repository_check_batch
- cronjob:requests_profiles
- cronjob:schedule_update_user_activity
- cronjob:stuck_ci_jobs
- cronjob:stuck_import_jobs
- cronjob:stuck_merge_jobs
- cronjob:trending_projects
- gcp_cluster:cluster_install_app
- gcp_cluster:cluster_provision
- gcp_cluster:cluster_wait_for_app_installation
- gcp_cluster:wait_for_cluster_creation
- github_import_advance_stage
- github_importer:github_import_import_diff_note
- github_importer:github_import_import_issue
- github_importer:github_import_import_note
- github_importer:github_import_import_pull_request
- github_importer:github_import_refresh_import_jid
- github_importer:github_import_stage_finish_import
- github_importer:github_import_stage_import_base_data
- github_importer:github_import_stage_import_issues_and_diff_notes
- github_importer:github_import_stage_import_notes
- github_importer:github_import_stage_import_pull_requests
- github_importer:github_import_stage_import_repository
- pipeline_cache:expire_job_cache
- pipeline_cache:expire_pipeline_cache
- pipeline_creation:create_pipeline
- pipeline_default:build_coverage
- pipeline_default:build_trace_sections
- pipeline_default:pipeline_metrics
- pipeline_default:pipeline_notification
- pipeline_default:update_head_pipeline_for_merge_request
- pipeline_hooks:build_hooks
- pipeline_hooks:pipeline_hooks
- pipeline_processing:build_finished
- pipeline_processing:build_queue
- pipeline_processing:build_success
- pipeline_processing:pipeline_process
- pipeline_processing:pipeline_success
- pipeline_processing:pipeline_update
- pipeline_processing:stage_update
- repository_check:repository_check_clear
- repository_check:repository_check_single_repository
- default
- mailers # ActionMailer::DeliveryJob.queue_name
- authorized_projects
- background_migration
- create_gpg_signature
- delete_merged_branches
- delete_user
- email_receiver
- emails_on_push
- expire_build_instance_artifacts
- git_garbage_collect
- gitlab_shell
- group_destroy
- invalid_gpg_signature_update
- irker
- merge
- namespaceless_project_destroy
- new_issue
- new_merge_request
- new_note
- pages
- post_receive
- process_commit
- project_cache
- project_destroy
- project_export
- project_migrate_hashed_storage
- project_service
- propagate_service_template
- reactive_caching
- repository_fork
- repository_import
- storage_migrator
- system_hook_push
- update_merge_requests
- update_user_activity
- upload_checksum
- web_hook
# EE-specific queues
- cronjob:clear_shared_runners_minutes
- cronjob:geo_file_download_dispatch
- cronjob:geo_metrics_update
- cronjob:geo_prune_event_log
- cronjob:geo_repository_sync
- cronjob:historical_data
- cronjob:ldap_all_groups_sync
- cronjob:ldap_sync
- cronjob:update_all_mirrors
- geo:geo_file_removal
- geo:geo_hashed_storage_attachments_migration
- geo:geo_hashed_storage_migration
- geo:geo_rename_repository
- geo:geo_repositories_clean_up
- geo:geo_repository_destroy
- admin_emails
- elastic_batch_project_indexer
- elastic_commit_indexer
- elastic_indexer
- export_csv
- geo_base_scheduler
- geo_file_download
- geo_project_sync
- geo_repository_shard_sync
- ldap_group_sync
- object_storage_upload
- project_update_repository_storage
- rebase
- repository_update_mirror
- repository_update_remote_mirror
...@@ -2,7 +2,7 @@ class BuildFinishedWorker ...@@ -2,7 +2,7 @@ class BuildFinishedWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build| Ci::Build.find_by(id: build_id).try do |build|
......
...@@ -2,7 +2,7 @@ class BuildHooksWorker ...@@ -2,7 +2,7 @@ class BuildHooksWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :hooks queue_namespace :pipeline_hooks
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id) Ci::Build.find_by(id: build_id)
......
...@@ -2,7 +2,7 @@ class BuildQueueWorker ...@@ -2,7 +2,7 @@ class BuildQueueWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build| Ci::Build.find_by(id: build_id).try do |build|
......
...@@ -2,7 +2,7 @@ class BuildSuccessWorker ...@@ -2,7 +2,7 @@ class BuildSuccessWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build| Ci::Build.find_by(id: build_id).try do |build|
......
...@@ -6,10 +6,20 @@ module ApplicationWorker ...@@ -6,10 +6,20 @@ module ApplicationWorker
include Sidekiq::Worker include Sidekiq::Worker
included do included do
sidekiq_options queue: base_queue_name set_queue
end end
module ClassMethods module ClassMethods
def inherited(subclass)
subclass.set_queue
end
def set_queue
queue_name = [queue_namespace, base_queue_name].compact.join(':')
sidekiq_options queue: queue_name
end
def base_queue_name def base_queue_name
name name
.sub(/\AGitlab::/, '') .sub(/\AGitlab::/, '')
...@@ -18,6 +28,16 @@ module ApplicationWorker ...@@ -18,6 +28,16 @@ module ApplicationWorker
.tr('/', '_') .tr('/', '_')
end end
def queue_namespace(new_namespace = nil)
if new_namespace
sidekiq_options queue_namespace: new_namespace
set_queue
else
get_sidekiq_options['queue_namespace']&.to_s
end
end
def queue def queue
get_sidekiq_options['queue'].to_s get_sidekiq_options['queue'].to_s
end end
......
...@@ -5,6 +5,6 @@ module ClusterQueue ...@@ -5,6 +5,6 @@ module ClusterQueue
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
sidekiq_options queue: :gcp_cluster queue_namespace :gcp_cluster
end end
end end
...@@ -4,6 +4,7 @@ module CronjobQueue ...@@ -4,6 +4,7 @@ module CronjobQueue
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
sidekiq_options queue: :cronjob, retry: false queue_namespace :cronjob
sidekiq_options retry: false
end end
end end
...@@ -3,6 +3,6 @@ module GeoQueue ...@@ -3,6 +3,6 @@ module GeoQueue
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
sidekiq_options queue: :geo queue_namespace :geo
end end
end end
...@@ -4,12 +4,14 @@ module Gitlab ...@@ -4,12 +4,14 @@ module Gitlab
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
queue_namespace :github_importer
# If a job produces an error it may block a stage from advancing # If a job produces an error it may block a stage from advancing
# forever. To prevent this from happening we prevent jobs from going to # forever. To prevent this from happening we prevent jobs from going to
# the dead queue. This does mean some resources may not be imported, but # the dead queue. This does mean some resources may not be imported, but
# this is better than a project being stuck in the "import" state # this is better than a project being stuck in the "import" state
# forever. # forever.
sidekiq_options queue: 'github_importer', dead: false, retry: 5 sidekiq_options dead: false, retry: 5
end end
end end
end end
......
...@@ -5,14 +5,6 @@ module PipelineQueue ...@@ -5,14 +5,6 @@ module PipelineQueue
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
sidekiq_options queue: 'pipeline_default' queue_namespace :pipeline_default
end
class_methods do
def enqueue_in(group:)
raise ArgumentError, 'Unspecified queue group!' if group.empty?
sidekiq_options queue: "pipeline_#{group}"
end
end end
end end
...@@ -3,6 +3,8 @@ module RepositoryCheckQueue ...@@ -3,6 +3,8 @@ module RepositoryCheckQueue
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
sidekiq_options queue: :repository_check, retry: false queue_namespace :repository_check
sidekiq_options retry: false
end end
end end
...@@ -2,7 +2,7 @@ class CreatePipelineWorker ...@@ -2,7 +2,7 @@ class CreatePipelineWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :creation queue_namespace :pipeline_creation
def perform(project_id, user_id, ref, source, params = {}) def perform(project_id, user_id, ref, source, params = {})
project = Project.find(project_id) project = Project.find(project_id)
......
...@@ -2,7 +2,7 @@ class ExpireJobCacheWorker ...@@ -2,7 +2,7 @@ class ExpireJobCacheWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :cache queue_namespace :pipeline_cache
def perform(job_id) def perform(job_id)
job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id) job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
......
...@@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker ...@@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :cache queue_namespace :pipeline_cache
def perform(pipeline_id) def perform(pipeline_id)
pipeline = Ci::Pipeline.find_by(id: pipeline_id) pipeline = Ci::Pipeline.find_by(id: pipeline_id)
......
module Geo module Geo
class BaseSchedulerWorker class BaseSchedulerWorker
include ApplicationWorker include ApplicationWorker
include CronjobQueue
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
DB_RETRIEVE_BATCH_SIZE = 1000 DB_RETRIEVE_BATCH_SIZE = 1000
......
module Geo module Geo
class FileDownloadDispatchWorker < Geo::BaseSchedulerWorker class FileDownloadDispatchWorker < Geo::BaseSchedulerWorker
include CronjobQueue
private private
def max_capacity def max_capacity
......
module Geo module Geo
class RepositoryShardSyncWorker < Geo::BaseSchedulerWorker class RepositoryShardSyncWorker < Geo::BaseSchedulerWorker
# We may have many long-running threads, so split them out sidekiq_options retry: false
# into their own queue to make it possible for other jobs to run.
sidekiq_options queue: :geo_repository_shard_sync, retry: false
attr_accessor :shard_name attr_accessor :shard_name
......
...@@ -9,7 +9,7 @@ module Gitlab ...@@ -9,7 +9,7 @@ module Gitlab
class AdvanceStageWorker class AdvanceStageWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options queue: 'github_importer_advance_stage', dead: false sidekiq_options dead: false
INTERVAL = 30.seconds.to_i INTERVAL = 30.seconds.to_i
......
class PagesWorker class PagesWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options queue: :pages, retry: false sidekiq_options retry: false
def perform(action, *arg) def perform(action, *arg)
send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
......
...@@ -2,7 +2,7 @@ class PipelineHooksWorker ...@@ -2,7 +2,7 @@ class PipelineHooksWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :hooks queue_namespace :pipeline_hooks
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id) Ci::Pipeline.find_by(id: pipeline_id)
......
...@@ -2,7 +2,7 @@ class PipelineProcessWorker ...@@ -2,7 +2,7 @@ class PipelineProcessWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id) Ci::Pipeline.find_by(id: pipeline_id)
......
...@@ -2,7 +2,7 @@ class PipelineSuccessWorker ...@@ -2,7 +2,7 @@ class PipelineSuccessWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
......
...@@ -2,7 +2,7 @@ class PipelineUpdateWorker ...@@ -2,7 +2,7 @@ class PipelineUpdateWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id) Ci::Pipeline.find_by(id: pipeline_id)
......
...@@ -2,7 +2,7 @@ class StageUpdateWorker ...@@ -2,7 +2,7 @@ class StageUpdateWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(stage_id) def perform(stage_id)
Ci::Stage.find_by(id: stage_id).try do |stage| Ci::Stage.find_by(id: stage_id).try do |stage|
......
class UpdateHeadPipelineForMergeRequestWorker class UpdateHeadPipelineForMergeRequestWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue
sidekiq_options queue: 'pipeline_default'
def perform(merge_request_id) def perform(merge_request_id)
merge_request = MergeRequest.find(merge_request_id) merge_request = MergeRequest.find(merge_request_id)
......
...@@ -46,6 +46,8 @@ Sidekiq.configure_server do |config| ...@@ -46,6 +46,8 @@ Sidekiq.configure_server do |config|
Gitlab::SidekiqThrottler.execute! Gitlab::SidekiqThrottler.execute!
Gitlab::SidekiqVersioning.install!
config = Gitlab::Database.config || config = Gitlab::Database.config ||
Rails.application.config.database_configuration[Rails.env] Rails.application.config.database_configuration[Rails.env]
config['pool'] = Sidekiq.options[:concurrency] config['pool'] = Sidekiq.options[:concurrency]
...@@ -72,19 +74,3 @@ Sidekiq.configure_client do |config| ...@@ -72,19 +74,3 @@ Sidekiq.configure_client do |config|
chain.add Gitlab::SidekiqStatus::ClientMiddleware chain.add Gitlab::SidekiqStatus::ClientMiddleware
end end
end end
# The Sidekiq client API always adds the queue to the Sidekiq queue
# list, but mail_room and gitlab-shell do not. This is only necessary
# for monitoring.
begin
queues = Gitlab::SidekiqConfig.worker_queues
Sidekiq.redis do |conn|
conn.pipelined do
queues.each do |queue|
conn.sadd('queues', queue)
end
end
end
rescue Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
end
...@@ -25,8 +25,8 @@ ...@@ -25,8 +25,8 @@
- [new_note, 2] - [new_note, 2]
- [new_issue, 2] - [new_issue, 2]
- [new_merge_request, 2] - [new_merge_request, 2]
- [build, 2] - [build, 2] # Replaced by pipeline
- [pipeline, 2] - [pipeline, 2] # Replaced by pipeline_*
- [pipeline_processing, 5] - [pipeline_processing, 5]
- [pipeline_creation, 4] - [pipeline_creation, 4]
- [pipeline_default, 3] - [pipeline_default, 3]
...@@ -38,11 +38,13 @@ ...@@ -38,11 +38,13 @@
- [mailers, 2] - [mailers, 2]
- [invalid_gpg_signature_update, 2] - [invalid_gpg_signature_update, 2]
- [create_gpg_signature, 2] - [create_gpg_signature, 2]
- [rebase, 2]
- [upload_checksum, 1] - [upload_checksum, 1]
- [repository_fork, 1] - [repository_fork, 1]
- [repository_import, 1] - [repository_import, 1]
- [github_importer, 1] - [github_importer, 1]
- [github_importer_advance_stage, 1] - [github_importer_advance_stage, 1] # Replaced by github_import_advance_stage
- [github_import_advance_stage, 1]
- [project_service, 1] - [project_service, 1]
- [delete_user, 1] - [delete_user, 1]
- [delete_merged_branches, 1] - [delete_merged_branches, 1]
...@@ -68,13 +70,15 @@ ...@@ -68,13 +70,15 @@
- [gcp_cluster, 1] - [gcp_cluster, 1]
- [project_migrate_hashed_storage, 1] - [project_migrate_hashed_storage, 1]
- [storage_migrator, 1] - [storage_migrator, 1]
# EE specific queues
# EE-specific queues
- [ldap_group_sync, 2] - [ldap_group_sync, 2]
- [geo, 1] - [geo, 1]
- [repository_update_mirror, 1] - [repository_update_mirror, 1]
- [repository_update_remote_mirror, 1] - [repository_update_remote_mirror, 1]
- [project_update_repository_storage, 1] - [project_update_repository_storage, 1]
- [admin_emails, 1] - [admin_emails, 1]
- [geo_base_scheduler, 1] # Parent class of geo_repository_shard_sync and cronjob:geo_file_download_dispatch
- [geo_project_sync, 1] - [geo_project_sync, 1]
- [geo_file_download, 1] - [geo_file_download, 1]
- [geo_repository_shard_sync, 1] - [geo_repository_shard_sync, 1]
......
class RebaseWorker class RebaseWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options queue: :merge
def perform(merge_request_id, current_user_id) def perform(merge_request_id, current_user_id)
current_user = User.find(current_user_id) current_user = User.find(current_user_id)
merge_request = MergeRequest.find(merge_request_id) merge_request = MergeRequest.find(merge_request_id)
......
...@@ -33,8 +33,13 @@ module Gitlab ...@@ -33,8 +33,13 @@ module Gitlab
queue_groups = SidekiqCluster.parse_queues(argv) queue_groups = SidekiqCluster.parse_queues(argv)
all_queues = SidekiqConfig.worker_queues(@rails_path)
queue_groups.map! do |queues|
SidekiqConfig.expand_queues(queues, all_queues)
end
if @negate_queues if @negate_queues
all_queues = SidekiqConfig.config_queues(@rails_path)
queue_groups.map! { |queues| all_queues - queues } queue_groups.map! { |queues| all_queues - queues }
end end
......
require 'yaml' require 'yaml'
require 'set'
module Gitlab module Gitlab
module SidekiqConfig module SidekiqConfig
def self.redis_queues # This method is called by `bin/sidekiq-cluster` in EE, which runs outside
@redis_queues ||= Sidekiq::Queue.all.map(&:name) # of bundler/Rails context, so we cannot use any gem or Rails methods.
def self.worker_queues(rails_path = Rails.root.to_s)
@worker_queues ||= {}
@worker_queues[rails_path] ||= YAML.load_file(File.join(rails_path, 'app/workers/all_queues.yml'))
end end
# This method is called by `bin/sidekiq-cluster` in EE, which runs outside # This method is called by `bin/sidekiq-cluster` in EE, which runs outside
# of bundler/Rails context, so we cannot use any gem or Rails methods. # of bundler/Rails context, so we cannot use any gem or Rails methods.
def self.config_queues(rails_path = Rails.root.to_s) def self.expand_queues(queues, all_queues = self.worker_queues)
return [] if queues.empty?
queues_set = all_queues.to_set
queues.flat_map do |queue|
[queue, *queues_set.grep(/\A#{queue}:/)]
end
end
def self.redis_queues
# Not memoized, because this can change during the life of the application
Sidekiq::Queue.all.map(&:name)
end
def self.config_queues
@config_queues ||= begin @config_queues ||= begin
config = YAML.load_file(File.join(rails_path, 'config', 'sidekiq_queues.yml')) config = YAML.load_file(Rails.root.join('config/sidekiq_queues.yml'))
config[:queues].map(&:first) config[:queues].map(&:first)
end end
end end
...@@ -25,14 +44,6 @@ module Gitlab ...@@ -25,14 +44,6 @@ module Gitlab
find_workers(Rails.root.join('ee', 'app', 'workers')) find_workers(Rails.root.join('ee', 'app', 'workers'))
end end
def self.default_queues
[ActionMailer::DeliveryJob.queue_name, 'default']
end
def self.worker_queues
@worker_queues ||= (workers.map(&:queue) + default_queues).uniq
end
def self.find_workers(root) def self.find_workers(root)
concerns = root.join('concerns').to_s concerns = root.join('concerns').to_s
...@@ -45,7 +56,7 @@ module Gitlab ...@@ -45,7 +56,7 @@ module Gitlab
ns.camelize.constantize ns.camelize.constantize
end end
# Skip concerns # Skip things that aren't workers
workers.select { |w| w < Sidekiq::Worker } workers.select { |w| w < Sidekiq::Worker }
end end
end end
......
module Gitlab
module SidekiqVersioning
def self.install!
Sidekiq::Manager.prepend SidekiqVersioning::Manager
# The Sidekiq client API always adds the queue to the Sidekiq queue
# list, but mail_room and gitlab-shell do not. This is only necessary
# for monitoring.
begin
queues = SidekiqConfig.worker_queues
if queues.any?
Sidekiq.redis do |conn|
conn.pipelined do
queues.each do |queue|
conn.sadd('queues', queue)
end
end
end
end
rescue ::Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
end
end
end
end
module Gitlab
module SidekiqVersioning
module Manager
def initialize(options = {})
options[:strict] = false
options[:queues] = SidekiqConfig.expand_queues(options[:queues])
Sidekiq.logger.info "Listening on queues #{options[:queues].uniq.sort}"
super
end
end
end
end
...@@ -11,28 +11,41 @@ describe Gitlab::SidekiqCluster::CLI do ...@@ -11,28 +11,41 @@ describe Gitlab::SidekiqCluster::CLI do
end end
context 'with arguments' do context 'with arguments' do
it 'starts the Sidekiq workers' do before do
expect(Gitlab::SidekiqCluster).to receive(:start).and_return([])
expect(cli).to receive(:write_pid) expect(cli).to receive(:write_pid)
expect(cli).to receive(:trap_signals) expect(cli).to receive(:trap_signals)
expect(cli).to receive(:start_loop) expect(cli).to receive(:start_loop)
end
it 'starts the Sidekiq workers' do
expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['foo']], 'test', Dir.pwd, dryrun: false)
.and_return([])
cli.run(%w(foo)) cli.run(%w(foo))
end end
context 'with --negate flag' do context 'with --negate flag' do
it 'starts Sidekiq workers for all queues on sidekiq_queues.yml except the ones on argv' do it 'starts Sidekiq workers for all queues in all_queues.yml except the ones in argv' do
expect(Gitlab::SidekiqConfig).to receive(:config_queues).and_return(['baz']) expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['baz'])
expect(Gitlab::SidekiqCluster).to receive(:start) expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['baz']], 'test', Dir.pwd, dryrun: false) .with([['baz']], 'test', Dir.pwd, dryrun: false)
.and_return([]) .and_return([])
expect(cli).to receive(:write_pid)
expect(cli).to receive(:trap_signals)
expect(cli).to receive(:start_loop)
cli.run(%w(foo -n)) cli.run(%w(foo -n))
end end
end end
context 'queue namespace expansion' do
it 'starts Sidekiq workers for all queues in all_queues.yml with a namespace in argv' do
expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['cronjob:foo', 'cronjob:bar'])
expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['cronjob', 'cronjob:foo', 'cronjob:bar']], 'test', Dir.pwd, dryrun: false)
.and_return([])
cli.run(%w(cronjob))
end
end
end end
end end
......
...@@ -23,7 +23,7 @@ describe Gitlab::SidekiqConfig do ...@@ -23,7 +23,7 @@ describe Gitlab::SidekiqConfig do
expect(queues).to include('post_receive') expect(queues).to include('post_receive')
expect(queues).to include('merge') expect(queues).to include('merge')
expect(queues).to include('cronjob') expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('mailers') expect(queues).to include('mailers')
expect(queues).to include('default') expect(queues).to include('default')
end end
...@@ -35,4 +35,25 @@ describe Gitlab::SidekiqConfig do ...@@ -35,4 +35,25 @@ describe Gitlab::SidekiqConfig do
expect(queues).to include('ldap_group_sync') expect(queues).to include('ldap_group_sync')
end end
end end
describe '.expand_queues' do
it 'expands queue namespaces to concrete queue names' do
queues = described_class.expand_queues(%w[cronjob])
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:stuck_merge_jobs')
end
it 'lets concrete queue names pass through' do
queues = described_class.expand_queues(%w[post_receive])
expect(queues).to include('post_receive')
end
it 'lets unknown queues pass through' do
queues = described_class.expand_queues(%w[unknown])
expect(queues).to include('unknown')
end
end
end end
require 'spec_helper'
describe Gitlab::SidekiqVersioning::Manager do
before do
Sidekiq::Manager.prepend described_class
end
describe '#initialize' do
it 'listens on all expanded queues' do
manager = Sidekiq::Manager.new(queues: %w[post_receive repository_fork cronjob unknown])
queues = manager.options[:queues]
expect(queues).to include('post_receive')
expect(queues).to include('repository_fork')
expect(queues).to include('cronjob')
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:stuck_merge_jobs')
expect(queues).to include('unknown')
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqVersioning, :sidekiq, :redis do
let(:foo_worker) do
Class.new do
def self.name
'FooWorker'
end
include ApplicationWorker
end
end
let(:bar_worker) do
Class.new do
def self.name
'BarWorker'
end
include ApplicationWorker
end
end
before do
allow(Gitlab::SidekiqConfig).to receive(:workers).and_return([foo_worker, bar_worker])
allow(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return([foo_worker.queue, bar_worker.queue])
end
describe '.install!' do
it 'prepends SidekiqVersioning::Manager into Sidekiq::Manager' do
described_class.install!
expect(Sidekiq::Manager).to include(Gitlab::SidekiqVersioning::Manager)
end
it 'registers all versionless and versioned queues with Redis' do
described_class.install!
queues = Sidekiq::Queue.all.map(&:name)
expect(queues).to include('foo')
expect(queues).to include('bar')
end
end
end
...@@ -17,6 +17,14 @@ describe ApplicationWorker do ...@@ -17,6 +17,14 @@ describe ApplicationWorker do
end end
end end
describe '.queue_namespace' do
it 'sets the queue name based on the class name' do
worker.queue_namespace :some_namespace
expect(worker.queue).to eq('some_namespace:foo_bar_dummy')
end
end
describe '.queue' do describe '.queue' do
it 'returns the queue name' do it 'returns the queue name' do
worker.sidekiq_options queue: :some_queue worker.sidekiq_options queue: :some_queue
......
...@@ -14,6 +14,6 @@ describe ClusterQueue do ...@@ -14,6 +14,6 @@ describe ClusterQueue do
it 'sets a default pipelines queue automatically' do it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue']) expect(worker.sidekiq_options['queue'])
.to eq :gcp_cluster .to eq 'gcp_cluster:dummy'
end end
end end
...@@ -13,7 +13,7 @@ describe CronjobQueue do ...@@ -13,7 +13,7 @@ describe CronjobQueue do
end end
it 'sets the queue name of a worker' do it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob') expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob:dummy')
end end
it 'disables retrying of failed jobs' do it 'disables retrying of failed jobs' do
......
...@@ -13,6 +13,6 @@ describe GeoQueue do ...@@ -13,6 +13,6 @@ describe GeoQueue do
end end
it 'sets the queue name of a worker' do it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('geo') expect(worker.sidekiq_options['queue'].to_s).to eq('geo:dummy')
end end
end end
...@@ -11,6 +11,6 @@ describe Gitlab::GithubImport::Queue do ...@@ -11,6 +11,6 @@ describe Gitlab::GithubImport::Queue do
include Gitlab::GithubImport::Queue include Gitlab::GithubImport::Queue
end end
expect(worker.sidekiq_options['queue']).to eq('github_importer') expect(worker.sidekiq_options['queue']).to eq('github_importer:dummy')
end end
end end
...@@ -14,15 +14,6 @@ describe PipelineQueue do ...@@ -14,15 +14,6 @@ describe PipelineQueue do
it 'sets a default pipelines queue automatically' do it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue']) expect(worker.sidekiq_options['queue'])
.to eq 'pipeline_default' .to eq 'pipeline_default:dummy'
end
describe '.enqueue_in' do
it 'sets a custom sidekiq queue with prefix and group' do
worker.enqueue_in(group: :processing)
expect(worker.sidekiq_options['queue'])
.to eq 'pipeline_processing'
end
end end
end end
...@@ -13,7 +13,7 @@ describe RepositoryCheckQueue do ...@@ -13,7 +13,7 @@ describe RepositoryCheckQueue do
end end
it 'sets the queue name of a worker' do it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check') expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check:dummy')
end end
it 'disables retrying of failed jobs' do it 'disables retrying of failed jobs' do
......
...@@ -10,12 +10,31 @@ describe 'Every Sidekiq worker' do ...@@ -10,12 +10,31 @@ describe 'Every Sidekiq worker' do
end end
it 'uses the cronjob queue when the worker runs as a cronjob' do it 'uses the cronjob queue when the worker runs as a cronjob' do
expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(eq('cronjob')) expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(start_with('cronjob:'))
end end
it 'defines the queue in the Sidekiq configuration file' do it 'has its queue in app/workers/all_queues.yml', :aggregate_failures do
config_queue_names = Gitlab::SidekiqConfig.config_queues.to_set file_worker_queues = Gitlab::SidekiqConfig.worker_queues.to_set
expect(Gitlab::SidekiqConfig.worker_queues).to all(be_in(config_queue_names)) worker_queues = Gitlab::SidekiqConfig.workers.map(&:queue).to_set
worker_queues << ActionMailer::DeliveryJob.queue_name
worker_queues << 'default'
missing_from_file = worker_queues - file_worker_queues
expect(missing_from_file).to be_empty, "expected #{missing_from_file.to_a.inspect} to be in app/workers/all_queues.yml"
unncessarily_in_file = file_worker_queues - worker_queues
expect(unncessarily_in_file).to be_empty, "expected #{unncessarily_in_file.to_a.inspect} not to be in app/workers/all_queues.yml"
end
it 'has its queue or namespace in config/sidekiq_queues.yml', :aggregate_failures do
config_queues = Gitlab::SidekiqConfig.config_queues.to_set
Gitlab::SidekiqConfig.workers.each do |worker|
queue = worker.queue
queue_namespace = queue.split(':').first
expect(config_queues).to include(queue).or(include(queue_namespace))
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment