Commit 5babea68 authored by Sean McGivern's avatar Sean McGivern

Merge branch 'dm-dedicated-sidekiq-queues-ee' into 'master'

Use a dedicated queue for each Sidekiq worker

See merge request gitlab-org/gitlab-ee!3692
parents 634e7b77 2f71d9b0
---
- cronjob:admin_email
- cronjob:expire_build_artifacts
- cronjob:gitlab_usage_ping
- cronjob:import_export_project_cleanup
- cronjob:pipeline_schedule
- cronjob:prune_old_events
- cronjob:remove_expired_group_links
- cronjob:remove_expired_members
- cronjob:remove_old_web_hook_logs
- cronjob:remove_unreferenced_lfs_objects
- cronjob:repository_archive_cache
- cronjob:repository_check_batch
- cronjob:requests_profiles
- cronjob:schedule_update_user_activity
- cronjob:stuck_ci_jobs
- cronjob:stuck_import_jobs
- cronjob:stuck_merge_jobs
- cronjob:trending_projects
- gcp_cluster:cluster_install_app
- gcp_cluster:cluster_provision
- gcp_cluster:cluster_wait_for_app_installation
- gcp_cluster:wait_for_cluster_creation
- github_import_advance_stage
- github_importer:github_import_import_diff_note
- github_importer:github_import_import_issue
- github_importer:github_import_import_note
- github_importer:github_import_import_pull_request
- github_importer:github_import_refresh_import_jid
- github_importer:github_import_stage_finish_import
- github_importer:github_import_stage_import_base_data
- github_importer:github_import_stage_import_issues_and_diff_notes
- github_importer:github_import_stage_import_notes
- github_importer:github_import_stage_import_pull_requests
- github_importer:github_import_stage_import_repository
- pipeline_cache:expire_job_cache
- pipeline_cache:expire_pipeline_cache
- pipeline_creation:create_pipeline
- pipeline_default:build_coverage
- pipeline_default:build_trace_sections
- pipeline_default:pipeline_metrics
- pipeline_default:pipeline_notification
- pipeline_default:update_head_pipeline_for_merge_request
- pipeline_hooks:build_hooks
- pipeline_hooks:pipeline_hooks
- pipeline_processing:build_finished
- pipeline_processing:build_queue
- pipeline_processing:build_success
- pipeline_processing:pipeline_process
- pipeline_processing:pipeline_success
- pipeline_processing:pipeline_update
- pipeline_processing:stage_update
- repository_check:repository_check_clear
- repository_check:repository_check_single_repository
- default
- mailers # ActionMailer::DeliveryJob.queue_name
- authorized_projects
- background_migration
- create_gpg_signature
- delete_merged_branches
- delete_user
- email_receiver
- emails_on_push
- expire_build_instance_artifacts
- git_garbage_collect
- gitlab_shell
- group_destroy
- invalid_gpg_signature_update
- irker
- merge
- namespaceless_project_destroy
- new_issue
- new_merge_request
- new_note
- pages
- post_receive
- process_commit
- project_cache
- project_destroy
- project_export
- project_migrate_hashed_storage
- project_service
- propagate_service_template
- reactive_caching
- repository_fork
- repository_import
- storage_migrator
- system_hook_push
- update_merge_requests
- update_user_activity
- upload_checksum
- web_hook
# EE-specific queues
- cronjob:clear_shared_runners_minutes
- cronjob:geo_file_download_dispatch
- cronjob:geo_metrics_update
- cronjob:geo_prune_event_log
- cronjob:geo_repository_sync
- cronjob:historical_data
- cronjob:ldap_all_groups_sync
- cronjob:ldap_sync
- cronjob:update_all_mirrors
- geo:geo_file_removal
- geo:geo_hashed_storage_attachments_migration
- geo:geo_hashed_storage_migration
- geo:geo_rename_repository
- geo:geo_repositories_clean_up
- geo:geo_repository_destroy
- admin_emails
- elastic_batch_project_indexer
- elastic_commit_indexer
- elastic_indexer
- export_csv
- geo_base_scheduler
- geo_file_download
- geo_project_sync
- geo_repository_shard_sync
- ldap_group_sync
- object_storage_upload
- project_update_repository_storage
- rebase
- repository_update_mirror
- repository_update_remote_mirror
...@@ -2,7 +2,7 @@ class BuildFinishedWorker ...@@ -2,7 +2,7 @@ class BuildFinishedWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build| Ci::Build.find_by(id: build_id).try do |build|
......
...@@ -2,7 +2,7 @@ class BuildHooksWorker ...@@ -2,7 +2,7 @@ class BuildHooksWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :hooks queue_namespace :pipeline_hooks
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id) Ci::Build.find_by(id: build_id)
......
...@@ -2,7 +2,7 @@ class BuildQueueWorker ...@@ -2,7 +2,7 @@ class BuildQueueWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build| Ci::Build.find_by(id: build_id).try do |build|
......
...@@ -2,7 +2,7 @@ class BuildSuccessWorker ...@@ -2,7 +2,7 @@ class BuildSuccessWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build| Ci::Build.find_by(id: build_id).try do |build|
......
...@@ -3,13 +3,23 @@ Sidekiq::Worker.extend ActiveSupport::Concern ...@@ -3,13 +3,23 @@ Sidekiq::Worker.extend ActiveSupport::Concern
module ApplicationWorker module ApplicationWorker
extend ActiveSupport::Concern extend ActiveSupport::Concern
include Sidekiq::Worker include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker
included do included do
sidekiq_options queue: base_queue_name set_queue
end end
module ClassMethods module ClassMethods
def inherited(subclass)
subclass.set_queue
end
def set_queue
queue_name = [queue_namespace, base_queue_name].compact.join(':')
sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue
end
def base_queue_name def base_queue_name
name name
.sub(/\AGitlab::/, '') .sub(/\AGitlab::/, '')
...@@ -18,6 +28,16 @@ module ApplicationWorker ...@@ -18,6 +28,16 @@ module ApplicationWorker
.tr('/', '_') .tr('/', '_')
end end
def queue_namespace(new_namespace = nil)
if new_namespace
sidekiq_options queue_namespace: new_namespace
set_queue
else
get_sidekiq_options['queue_namespace']&.to_s
end
end
def queue def queue
get_sidekiq_options['queue'].to_s get_sidekiq_options['queue'].to_s
end end
......
...@@ -5,6 +5,6 @@ module ClusterQueue ...@@ -5,6 +5,6 @@ module ClusterQueue
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
sidekiq_options queue: :gcp_cluster queue_namespace :gcp_cluster
end end
end end
...@@ -4,6 +4,7 @@ module CronjobQueue ...@@ -4,6 +4,7 @@ module CronjobQueue
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
sidekiq_options queue: :cronjob, retry: false queue_namespace :cronjob
sidekiq_options retry: false
end end
end end
...@@ -3,6 +3,6 @@ module GeoQueue ...@@ -3,6 +3,6 @@ module GeoQueue
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
sidekiq_options queue: :geo queue_namespace :geo
end end
end end
...@@ -4,12 +4,14 @@ module Gitlab ...@@ -4,12 +4,14 @@ module Gitlab
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
queue_namespace :github_importer
# If a job produces an error it may block a stage from advancing # If a job produces an error it may block a stage from advancing
# forever. To prevent this from happening we prevent jobs from going to # forever. To prevent this from happening we prevent jobs from going to
# the dead queue. This does mean some resources may not be imported, but # the dead queue. This does mean some resources may not be imported, but
# this is better than a project being stuck in the "import" state # this is better than a project being stuck in the "import" state
# forever. # forever.
sidekiq_options queue: 'github_importer', dead: false, retry: 5 sidekiq_options dead: false, retry: 5
end end
end end
end end
......
...@@ -5,14 +5,6 @@ module PipelineQueue ...@@ -5,14 +5,6 @@ module PipelineQueue
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
sidekiq_options queue: 'pipeline_default' queue_namespace :pipeline_default
end
class_methods do
def enqueue_in(group:)
raise ArgumentError, 'Unspecified queue group!' if group.empty?
sidekiq_options queue: "pipeline_#{group}"
end
end end
end end
...@@ -3,6 +3,8 @@ module RepositoryCheckQueue ...@@ -3,6 +3,8 @@ module RepositoryCheckQueue
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
sidekiq_options queue: :repository_check, retry: false queue_namespace :repository_check
sidekiq_options retry: false
end end
end end
...@@ -2,7 +2,7 @@ class CreatePipelineWorker ...@@ -2,7 +2,7 @@ class CreatePipelineWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :creation queue_namespace :pipeline_creation
def perform(project_id, user_id, ref, source, params = {}) def perform(project_id, user_id, ref, source, params = {})
project = Project.find(project_id) project = Project.find(project_id)
......
...@@ -2,7 +2,7 @@ class ExpireJobCacheWorker ...@@ -2,7 +2,7 @@ class ExpireJobCacheWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :cache queue_namespace :pipeline_cache
def perform(job_id) def perform(job_id)
job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id) job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
......
...@@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker ...@@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :cache queue_namespace :pipeline_cache
def perform(pipeline_id) def perform(pipeline_id)
pipeline = Ci::Pipeline.find_by(id: pipeline_id) pipeline = Ci::Pipeline.find_by(id: pipeline_id)
......
module Geo module Geo
class BaseSchedulerWorker class BaseSchedulerWorker
include ApplicationWorker include ApplicationWorker
include CronjobQueue
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
DB_RETRIEVE_BATCH_SIZE = 1000 DB_RETRIEVE_BATCH_SIZE = 1000
......
module Geo module Geo
class FileDownloadDispatchWorker < Geo::BaseSchedulerWorker class FileDownloadDispatchWorker < Geo::BaseSchedulerWorker
include CronjobQueue
private private
def max_capacity def max_capacity
......
module Geo module Geo
class RepositoryShardSyncWorker < Geo::BaseSchedulerWorker class RepositoryShardSyncWorker < Geo::BaseSchedulerWorker
# We may have many long-running threads, so split them out sidekiq_options retry: false
# into their own queue to make it possible for other jobs to run.
sidekiq_options queue: :geo_repository_shard_sync, retry: false
attr_accessor :shard_name attr_accessor :shard_name
......
...@@ -9,7 +9,7 @@ module Gitlab ...@@ -9,7 +9,7 @@ module Gitlab
class AdvanceStageWorker class AdvanceStageWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options queue: 'github_importer_advance_stage', dead: false sidekiq_options dead: false
INTERVAL = 30.seconds.to_i INTERVAL = 30.seconds.to_i
......
class PagesWorker class PagesWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options queue: :pages, retry: false sidekiq_options retry: false
def perform(action, *arg) def perform(action, *arg)
send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
......
...@@ -2,7 +2,7 @@ class PipelineHooksWorker ...@@ -2,7 +2,7 @@ class PipelineHooksWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :hooks queue_namespace :pipeline_hooks
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id) Ci::Pipeline.find_by(id: pipeline_id)
......
...@@ -2,7 +2,7 @@ class PipelineProcessWorker ...@@ -2,7 +2,7 @@ class PipelineProcessWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id) Ci::Pipeline.find_by(id: pipeline_id)
......
...@@ -2,7 +2,7 @@ class PipelineSuccessWorker ...@@ -2,7 +2,7 @@ class PipelineSuccessWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
......
...@@ -2,7 +2,7 @@ class PipelineUpdateWorker ...@@ -2,7 +2,7 @@ class PipelineUpdateWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id) Ci::Pipeline.find_by(id: pipeline_id)
......
...@@ -2,7 +2,7 @@ class StageUpdateWorker ...@@ -2,7 +2,7 @@ class StageUpdateWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing queue_namespace :pipeline_processing
def perform(stage_id) def perform(stage_id)
Ci::Stage.find_by(id: stage_id).try do |stage| Ci::Stage.find_by(id: stage_id).try do |stage|
......
class UpdateHeadPipelineForMergeRequestWorker class UpdateHeadPipelineForMergeRequestWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue
sidekiq_options queue: 'pipeline_default'
def perform(merge_request_id) def perform(merge_request_id)
merge_request = MergeRequest.find(merge_request_id) merge_request = MergeRequest.find(merge_request_id)
......
...@@ -46,6 +46,8 @@ Sidekiq.configure_server do |config| ...@@ -46,6 +46,8 @@ Sidekiq.configure_server do |config|
Gitlab::SidekiqThrottler.execute! Gitlab::SidekiqThrottler.execute!
Gitlab::SidekiqVersioning.install!
config = Gitlab::Database.config || config = Gitlab::Database.config ||
Rails.application.config.database_configuration[Rails.env] Rails.application.config.database_configuration[Rails.env]
config['pool'] = Sidekiq.options[:concurrency] config['pool'] = Sidekiq.options[:concurrency]
...@@ -72,19 +74,3 @@ Sidekiq.configure_client do |config| ...@@ -72,19 +74,3 @@ Sidekiq.configure_client do |config|
chain.add Gitlab::SidekiqStatus::ClientMiddleware chain.add Gitlab::SidekiqStatus::ClientMiddleware
end end
end end
# The Sidekiq client API always adds the queue to the Sidekiq queue
# list, but mail_room and gitlab-shell do not. This is only necessary
# for monitoring.
begin
queues = Gitlab::SidekiqConfig.worker_queues
Sidekiq.redis do |conn|
conn.pipelined do
queues.each do |queue|
conn.sadd('queues', queue)
end
end
end
rescue Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
end
...@@ -25,8 +25,6 @@ ...@@ -25,8 +25,6 @@
- [new_note, 2] - [new_note, 2]
- [new_issue, 2] - [new_issue, 2]
- [new_merge_request, 2] - [new_merge_request, 2]
- [build, 2]
- [pipeline, 2]
- [pipeline_processing, 5] - [pipeline_processing, 5]
- [pipeline_creation, 4] - [pipeline_creation, 4]
- [pipeline_default, 3] - [pipeline_default, 3]
...@@ -38,11 +36,12 @@ ...@@ -38,11 +36,12 @@
- [mailers, 2] - [mailers, 2]
- [invalid_gpg_signature_update, 2] - [invalid_gpg_signature_update, 2]
- [create_gpg_signature, 2] - [create_gpg_signature, 2]
- [rebase, 2]
- [upload_checksum, 1] - [upload_checksum, 1]
- [repository_fork, 1] - [repository_fork, 1]
- [repository_import, 1] - [repository_import, 1]
- [github_importer, 1] - [github_importer, 1]
- [github_importer_advance_stage, 1] - [github_import_advance_stage, 1]
- [project_service, 1] - [project_service, 1]
- [delete_user, 1] - [delete_user, 1]
- [delete_merged_branches, 1] - [delete_merged_branches, 1]
...@@ -68,13 +67,15 @@ ...@@ -68,13 +67,15 @@
- [gcp_cluster, 1] - [gcp_cluster, 1]
- [project_migrate_hashed_storage, 1] - [project_migrate_hashed_storage, 1]
- [storage_migrator, 1] - [storage_migrator, 1]
# EE specific queues
# EE-specific queues
- [ldap_group_sync, 2] - [ldap_group_sync, 2]
- [geo, 1] - [geo, 1]
- [repository_update_mirror, 1] - [repository_update_mirror, 1]
- [repository_update_remote_mirror, 1] - [repository_update_remote_mirror, 1]
- [project_update_repository_storage, 1] - [project_update_repository_storage, 1]
- [admin_emails, 1] - [admin_emails, 1]
- [geo_base_scheduler, 1] # Parent class of geo_repository_shard_sync and cronjob:geo_file_download_dispatch
- [geo_project_sync, 1] - [geo_project_sync, 1]
- [geo_file_download, 1] - [geo_file_download, 1]
- [geo_repository_shard_sync, 1] - [geo_repository_shard_sync, 1]
......
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class MigrateGithubImporterAdvanceStageSidekiqQueue < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def up
sidekiq_queue_migrate 'github_importer_advance_stage', to: 'github_import_advance_stage'
end
def down
sidekiq_queue_migrate 'github_import_advance_stage', to: 'github_importer_advance_stage'
end
end
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
# #
# It's strongly recommended that you check this file into your version control system. # It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20171206221519) do ActiveRecord::Schema.define(version: 20171213160445) do
# These are extensions that must be enabled in order to support this database # These are extensions that must be enabled in order to support this database
enable_extension "plpgsql" enable_extension "plpgsql"
......
...@@ -18,6 +18,12 @@ Each separate argument denotes a group of queues that have to be processed by a ...@@ -18,6 +18,12 @@ Each separate argument denotes a group of queues that have to be processed by a
Sidekiq process. Multiple queues can be processed by the same process by Sidekiq process. Multiple queues can be processed by the same process by
separating them with a comma instead of a space. separating them with a comma instead of a space.
Instead of a queue, a queue namespace can also be provided, to have the process
automatically listen on all queues in that namespace without needing to
explicitly list all the queue names. For more information about queue namespaces,
see the relevant section in the
[Sidekiq style guide](../../development/sidekiq_style_guide.md#queue-namespaces).
For example, say you want to start 2 extra processes: one to process the For example, say you want to start 2 extra processes: one to process the
"process_commit" queue, and one to process the "post_receive" queue. This can be "process_commit" queue, and one to process the "post_receive" queue. This can be
done as follows: done as follows:
......
...@@ -9,25 +9,54 @@ All workers should include `ApplicationWorker` instead of `Sidekiq::Worker`, ...@@ -9,25 +9,54 @@ All workers should include `ApplicationWorker` instead of `Sidekiq::Worker`,
which adds some convenience methods and automatically sets the queue based on which adds some convenience methods and automatically sets the queue based on
the worker's name. the worker's name.
## Default Queue ## Dedicated Queues
Use of the "default" queue is not allowed. Every worker should use a queue that All workers should use their own queue, which is automatically set based on the
matches the worker's purpose the closest. For example, workers that are to be worker class name. For a worker named `ProcessSomethingWorker`, the queue name
executed periodically should use the "cronjob" queue. would be `process_something`. If you're not sure what queue a worker uses,
you can find it using `SomeWorker.queue`. There is almost never a reason to
manually override the queue name using `sidekiq_options queue: :some_queue`.
A list of all available queues can be found in `config/sidekiq_queues.yml`. ## Queue Namespaces
## Dedicated Queues While different workers cannot share a queue, they can share a queue namespace.
Most workers should use their own queue, which is automatically set based on the Defining a queue namespace for a worker makes it possible to start a Sidekiq
worker class name. For a worker named `ProcessSomethingWorker`, the queue name process that automatically handles jobs for all workers in that namespace,
would be `process_something`. If you're not sure what a worker's queue name is, without needing to explicitly list all their queue names. If, for example, all
you can find it using `SomeWorker.queue`. workers that are managed by sidekiq-cron use the `cronjob` queue namespace, we
can spin up a Sidekiq process specifically for these kinds of scheduled jobs.
If a new worker using the `cronjob` namespace is added later on, the Sidekiq
process will automatically pick up jobs for that worker too (after having been
restarted), without the need to change any configuration.
A queue namespace can be set using the `queue_namespace` DSL class method:
```ruby
class SomeScheduledTaskWorker
include ApplicationWorker
queue_namespace :cronjob
# ...
end
```
Behind the scenes, this will set `SomeScheduledTaskWorker.queue` to
`cronjob:some_scheduled_task`. Commonly used namespaces will have their own
concern module that can easily be included into the worker class, and that may
set other Sidekiq options besides the queue namespace. `CronjobQueue`, for
example, sets the namespace, but also disables retries.
`bundle exec sidekiq` is namespace-aware, and will automatically listen on all
queues in a namespace (technically: all queues prefixed with the namespace name)
when a namespace is provided instead of a simple queue name in the `--queue`
(`-q`) option, or in the `:queues:` section in `config/sidekiq_queues.yml`.
In some cases multiple workers do use the same queue. For example, the various Note that adding a worker to an existing namespace should be done with care, as
workers for updating CI pipelines all use the `pipeline` queue. Adding workers the extra jobs will take resources away from jobs from workers that were already
to existing queues should be done with care, as adding more workers can lead to there, if the resources available to the Sidekiq process handling the namespace
slow jobs blocking work (even for different jobs) on the shared queue. are not adjusted appropriately.
## Tests ## Tests
...@@ -36,7 +65,7 @@ tests should be placed in `spec/workers`. ...@@ -36,7 +65,7 @@ tests should be placed in `spec/workers`.
## Removing or renaming queues ## Removing or renaming queues
Try to avoid renaming or removing queues in minor and patch releases. Try to avoid renaming or removing workers and their queues in minor and patch releases.
During online update instance can have pending jobs and removing the queue can During online update instance can have pending jobs and removing the queue can
lead to those jobs being stuck forever. If you can't write migration for those lead to those jobs being stuck forever. If you can't write migration for those
Sidekiq jobs, please consider doing rename or remove queue in major release only. Sidekiq jobs, please consider doing rename or remove queue in major release only.
class RebaseWorker class RebaseWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options queue: :merge
def perform(merge_request_id, current_user_id) def perform(merge_request_id, current_user_id)
current_user = User.find(current_user_id) current_user = User.find(current_user_id)
merge_request = MergeRequest.find(merge_request_id) merge_request = MergeRequest.find(merge_request_id)
......
...@@ -33,8 +33,13 @@ module Gitlab ...@@ -33,8 +33,13 @@ module Gitlab
queue_groups = SidekiqCluster.parse_queues(argv) queue_groups = SidekiqCluster.parse_queues(argv)
all_queues = SidekiqConfig.worker_queues(@rails_path)
queue_groups.map! do |queues|
SidekiqConfig.expand_queues(queues, all_queues)
end
if @negate_queues if @negate_queues
all_queues = SidekiqConfig.config_queues(@rails_path)
queue_groups.map! { |queues| all_queues - queues } queue_groups.map! { |queues| all_queues - queues }
end end
......
require 'yaml' require 'yaml'
require 'set'
module Gitlab module Gitlab
module SidekiqConfig module SidekiqConfig
def self.redis_queues # This method is called by `bin/sidekiq-cluster` in EE, which runs outside
@redis_queues ||= Sidekiq::Queue.all.map(&:name) # of bundler/Rails context, so we cannot use any gem or Rails methods.
def self.worker_queues(rails_path = Rails.root.to_s)
@worker_queues ||= {}
@worker_queues[rails_path] ||= YAML.load_file(File.join(rails_path, 'app/workers/all_queues.yml'))
end end
# This method is called by `bin/sidekiq-cluster` in EE, which runs outside # This method is called by `bin/sidekiq-cluster` in EE, which runs outside
# of bundler/Rails context, so we cannot use any gem or Rails methods. # of bundler/Rails context, so we cannot use any gem or Rails methods.
def self.config_queues(rails_path = Rails.root.to_s) def self.expand_queues(queues, all_queues = self.worker_queues)
return [] if queues.empty?
queues_set = all_queues.to_set
queues.flat_map do |queue|
[queue, *queues_set.grep(/\A#{queue}:/)]
end
end
def self.redis_queues
# Not memoized, because this can change during the life of the application
Sidekiq::Queue.all.map(&:name)
end
def self.config_queues
@config_queues ||= begin @config_queues ||= begin
config = YAML.load_file(File.join(rails_path, 'config', 'sidekiq_queues.yml')) config = YAML.load_file(Rails.root.join('config/sidekiq_queues.yml'))
config[:queues].map(&:first) config[:queues].map(&:first)
end end
end end
...@@ -25,14 +44,6 @@ module Gitlab ...@@ -25,14 +44,6 @@ module Gitlab
find_workers(Rails.root.join('ee', 'app', 'workers')) find_workers(Rails.root.join('ee', 'app', 'workers'))
end end
def self.default_queues
[ActionMailer::DeliveryJob.queue_name, 'default']
end
def self.worker_queues
@worker_queues ||= (workers.map(&:queue) + default_queues).uniq
end
def self.find_workers(root) def self.find_workers(root)
concerns = root.join('concerns').to_s concerns = root.join('concerns').to_s
...@@ -45,7 +56,7 @@ module Gitlab ...@@ -45,7 +56,7 @@ module Gitlab
ns.camelize.constantize ns.camelize.constantize
end end
# Skip concerns # Skip things that aren't workers
workers.select { |w| w < Sidekiq::Worker } workers.select { |w| w < Sidekiq::Worker }
end end
end end
......
module Gitlab
module SidekiqVersioning
def self.install!
Sidekiq::Manager.prepend SidekiqVersioning::Manager
# The Sidekiq client API always adds the queue to the Sidekiq queue
# list, but mail_room and gitlab-shell do not. This is only necessary
# for monitoring.
begin
queues = SidekiqConfig.worker_queues
if queues.any?
Sidekiq.redis do |conn|
conn.pipelined do
queues.each do |queue|
conn.sadd('queues', queue)
end
end
end
end
rescue ::Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
end
end
end
end
module Gitlab
module SidekiqVersioning
module Manager
def initialize(options = {})
options[:strict] = false
options[:queues] = SidekiqConfig.expand_queues(options[:queues])
Sidekiq.logger.info "Listening on queues #{options[:queues].uniq.sort}"
super
end
end
end
end
require_relative '../spec_helpers'
module RuboCop
module Cop
# Cop that makes sure workers include `ApplicationWorker`, not `Sidekiq::Worker`.
class IncludeSidekiqWorker < RuboCop::Cop::Cop
include SpecHelpers
MSG = 'Include `ApplicationWorker`, not `Sidekiq::Worker`.'.freeze
def_node_matcher :includes_sidekiq_worker?, <<~PATTERN
(send nil :include (const (const nil :Sidekiq) :Worker))
PATTERN
def on_send(node)
return if in_spec?(node)
return unless includes_sidekiq_worker?(node)
add_offense(node.arguments.first, :expression)
end
def autocorrect(node)
lambda do |corrector|
corrector.replace(node.source_range, 'ApplicationWorker')
end
end
end
end
end
require_relative '../spec_helpers'
module RuboCop
module Cop
# Cop that prevents manually setting a queue in Sidekiq workers.
class SidekiqOptionsQueue < RuboCop::Cop::Cop
include SpecHelpers
MSG = 'Do not manually set a queue; `ApplicationWorker` sets one automatically.'.freeze
def_node_matcher :sidekiq_options?, <<~PATTERN
(send nil :sidekiq_options $...)
PATTERN
def on_send(node)
return if in_spec?(node)
return unless sidekiq_options?(node)
node.arguments.first.each_node(:pair) do |pair|
key_name = pair.key.children[0]
add_offense(pair, :expression) if key_name == :queue
end
end
end
end
end
...@@ -3,10 +3,12 @@ require_relative 'cop/active_record_serialize' ...@@ -3,10 +3,12 @@ require_relative 'cop/active_record_serialize'
require_relative 'cop/custom_error_class' require_relative 'cop/custom_error_class'
require_relative 'cop/gem_fetcher' require_relative 'cop/gem_fetcher'
require_relative 'cop/in_batches' require_relative 'cop/in_batches'
require_relative 'cop/include_sidekiq_worker'
require_relative 'cop/line_break_after_guard_clauses' require_relative 'cop/line_break_after_guard_clauses'
require_relative 'cop/polymorphic_associations' require_relative 'cop/polymorphic_associations'
require_relative 'cop/project_path_helper' require_relative 'cop/project_path_helper'
require_relative 'cop/redirect_with_status' require_relative 'cop/redirect_with_status'
require_relative 'cop/sidekiq_options_queue'
require_relative 'cop/migration/add_column' require_relative 'cop/migration/add_column'
require_relative 'cop/migration/add_concurrent_foreign_key' require_relative 'cop/migration/add_concurrent_foreign_key'
require_relative 'cop/migration/add_concurrent_index' require_relative 'cop/migration/add_concurrent_index'
......
...@@ -11,28 +11,41 @@ describe Gitlab::SidekiqCluster::CLI do ...@@ -11,28 +11,41 @@ describe Gitlab::SidekiqCluster::CLI do
end end
context 'with arguments' do context 'with arguments' do
it 'starts the Sidekiq workers' do before do
expect(Gitlab::SidekiqCluster).to receive(:start).and_return([])
expect(cli).to receive(:write_pid) expect(cli).to receive(:write_pid)
expect(cli).to receive(:trap_signals) expect(cli).to receive(:trap_signals)
expect(cli).to receive(:start_loop) expect(cli).to receive(:start_loop)
end
it 'starts the Sidekiq workers' do
expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['foo']], 'test', Dir.pwd, dryrun: false)
.and_return([])
cli.run(%w(foo)) cli.run(%w(foo))
end end
context 'with --negate flag' do context 'with --negate flag' do
it 'starts Sidekiq workers for all queues on sidekiq_queues.yml except the ones on argv' do it 'starts Sidekiq workers for all queues in all_queues.yml except the ones in argv' do
expect(Gitlab::SidekiqConfig).to receive(:config_queues).and_return(['baz']) expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['baz'])
expect(Gitlab::SidekiqCluster).to receive(:start) expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['baz']], 'test', Dir.pwd, dryrun: false) .with([['baz']], 'test', Dir.pwd, dryrun: false)
.and_return([]) .and_return([])
expect(cli).to receive(:write_pid)
expect(cli).to receive(:trap_signals)
expect(cli).to receive(:start_loop)
cli.run(%w(foo -n)) cli.run(%w(foo -n))
end end
end end
context 'queue namespace expansion' do
it 'starts Sidekiq workers for all queues in all_queues.yml with a namespace in argv' do
expect(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return(['cronjob:foo', 'cronjob:bar'])
expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['cronjob', 'cronjob:foo', 'cronjob:bar']], 'test', Dir.pwd, dryrun: false)
.and_return([])
cli.run(%w(cronjob))
end
end
end end
end end
......
...@@ -23,7 +23,7 @@ describe Gitlab::SidekiqConfig do ...@@ -23,7 +23,7 @@ describe Gitlab::SidekiqConfig do
expect(queues).to include('post_receive') expect(queues).to include('post_receive')
expect(queues).to include('merge') expect(queues).to include('merge')
expect(queues).to include('cronjob') expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('mailers') expect(queues).to include('mailers')
expect(queues).to include('default') expect(queues).to include('default')
end end
...@@ -35,4 +35,25 @@ describe Gitlab::SidekiqConfig do ...@@ -35,4 +35,25 @@ describe Gitlab::SidekiqConfig do
expect(queues).to include('ldap_group_sync') expect(queues).to include('ldap_group_sync')
end end
end end
describe '.expand_queues' do
it 'expands queue namespaces to concrete queue names' do
queues = described_class.expand_queues(%w[cronjob])
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:stuck_merge_jobs')
end
it 'lets concrete queue names pass through' do
queues = described_class.expand_queues(%w[post_receive])
expect(queues).to include('post_receive')
end
it 'lets unknown queues pass through' do
queues = described_class.expand_queues(%w[unknown])
expect(queues).to include('unknown')
end
end
end end
require 'spec_helper'
describe Gitlab::SidekiqVersioning::Manager do
before do
Sidekiq::Manager.prepend described_class
end
describe '#initialize' do
it 'listens on all expanded queues' do
manager = Sidekiq::Manager.new(queues: %w[post_receive repository_fork cronjob unknown])
queues = manager.options[:queues]
expect(queues).to include('post_receive')
expect(queues).to include('repository_fork')
expect(queues).to include('cronjob')
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:stuck_merge_jobs')
expect(queues).to include('unknown')
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqVersioning, :sidekiq, :redis do
let(:foo_worker) do
Class.new do
def self.name
'FooWorker'
end
include ApplicationWorker
end
end
let(:bar_worker) do
Class.new do
def self.name
'BarWorker'
end
include ApplicationWorker
end
end
before do
allow(Gitlab::SidekiqConfig).to receive(:workers).and_return([foo_worker, bar_worker])
allow(Gitlab::SidekiqConfig).to receive(:worker_queues).and_return([foo_worker.queue, bar_worker.queue])
end
describe '.install!' do
it 'prepends SidekiqVersioning::Manager into Sidekiq::Manager' do
described_class.install!
expect(Sidekiq::Manager).to include(Gitlab::SidekiqVersioning::Manager)
end
it 'registers all versionless and versioned queues with Redis' do
described_class.install!
queues = Sidekiq::Queue.all.map(&:name)
expect(queues).to include('foo')
expect(queues).to include('bar')
end
end
end
require 'spec_helper'
require 'rubocop'
require 'rubocop/rspec/support'
require_relative '../../../rubocop/cop/include_sidekiq_worker'
describe RuboCop::Cop::IncludeSidekiqWorker do
include CopHelper
subject(:cop) { described_class.new }
context 'when `Sidekiq::Worker` is included' do
let(:source) { 'include Sidekiq::Worker' }
let(:correct_source) { 'include ApplicationWorker' }
it 'registers an offense ' do
inspect_source(cop, source)
aggregate_failures do
expect(cop.offenses.size).to eq(1)
expect(cop.offenses.map(&:line)).to eq([1])
expect(cop.highlights).to eq(['Sidekiq::Worker'])
end
end
it 'autocorrects to the right version' do
autocorrected = autocorrect_source(cop, source)
expect(autocorrected).to eq(correct_source)
end
end
end
require 'spec_helper'
require 'rubocop'
require 'rubocop/rspec/support'
require_relative '../../../rubocop/cop/sidekiq_options_queue'
describe RuboCop::Cop::SidekiqOptionsQueue do
include CopHelper
subject(:cop) { described_class.new }
it 'registers an offense when `sidekiq_options` is used with the `queue` option' do
inspect_source(cop, 'sidekiq_options queue: "some_queue"')
aggregate_failures do
expect(cop.offenses.size).to eq(1)
expect(cop.offenses.map(&:line)).to eq([1])
expect(cop.highlights).to eq(['queue: "some_queue"'])
end
end
it 'does not register an offense when `sidekiq_options` is used with another option' do
inspect_source(cop, 'sidekiq_options retry: false')
expect(cop.offenses).to be_empty
end
end
...@@ -17,6 +17,14 @@ describe ApplicationWorker do ...@@ -17,6 +17,14 @@ describe ApplicationWorker do
end end
end end
describe '.queue_namespace' do
it 'sets the queue name based on the class name' do
worker.queue_namespace :some_namespace
expect(worker.queue).to eq('some_namespace:foo_bar_dummy')
end
end
describe '.queue' do describe '.queue' do
it 'returns the queue name' do it 'returns the queue name' do
worker.sidekiq_options queue: :some_queue worker.sidekiq_options queue: :some_queue
......
...@@ -14,6 +14,6 @@ describe ClusterQueue do ...@@ -14,6 +14,6 @@ describe ClusterQueue do
it 'sets a default pipelines queue automatically' do it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue']) expect(worker.sidekiq_options['queue'])
.to eq :gcp_cluster .to eq 'gcp_cluster:dummy'
end end
end end
...@@ -13,7 +13,7 @@ describe CronjobQueue do ...@@ -13,7 +13,7 @@ describe CronjobQueue do
end end
it 'sets the queue name of a worker' do it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob') expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob:dummy')
end end
it 'disables retrying of failed jobs' do it 'disables retrying of failed jobs' do
......
...@@ -13,6 +13,6 @@ describe GeoQueue do ...@@ -13,6 +13,6 @@ describe GeoQueue do
end end
it 'sets the queue name of a worker' do it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('geo') expect(worker.sidekiq_options['queue'].to_s).to eq('geo:dummy')
end end
end end
...@@ -11,6 +11,6 @@ describe Gitlab::GithubImport::Queue do ...@@ -11,6 +11,6 @@ describe Gitlab::GithubImport::Queue do
include Gitlab::GithubImport::Queue include Gitlab::GithubImport::Queue
end end
expect(worker.sidekiq_options['queue']).to eq('github_importer') expect(worker.sidekiq_options['queue']).to eq('github_importer:dummy')
end end
end end
...@@ -14,15 +14,6 @@ describe PipelineQueue do ...@@ -14,15 +14,6 @@ describe PipelineQueue do
it 'sets a default pipelines queue automatically' do it 'sets a default pipelines queue automatically' do
expect(worker.sidekiq_options['queue']) expect(worker.sidekiq_options['queue'])
.to eq 'pipeline_default' .to eq 'pipeline_default:dummy'
end
describe '.enqueue_in' do
it 'sets a custom sidekiq queue with prefix and group' do
worker.enqueue_in(group: :processing)
expect(worker.sidekiq_options['queue'])
.to eq 'pipeline_processing'
end
end end
end end
...@@ -13,7 +13,7 @@ describe RepositoryCheckQueue do ...@@ -13,7 +13,7 @@ describe RepositoryCheckQueue do
end end
it 'sets the queue name of a worker' do it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check') expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check:dummy')
end end
it 'disables retrying of failed jobs' do it 'disables retrying of failed jobs' do
......
require 'spec_helper' require 'spec_helper'
describe 'Every Sidekiq worker' do describe 'Every Sidekiq worker' do
it 'includes ApplicationWorker' do
expect(Gitlab::SidekiqConfig.workers).to all(include(ApplicationWorker))
end
it 'does not use the default queue' do it 'does not use the default queue' do
expect(Gitlab::SidekiqConfig.workers.map(&:queue)).not_to include('default') expect(Gitlab::SidekiqConfig.workers.map(&:queue)).not_to include('default')
end end
it 'uses the cronjob queue when the worker runs as a cronjob' do it 'uses the cronjob queue when the worker runs as a cronjob' do
expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(eq('cronjob')) expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(start_with('cronjob:'))
end end
it 'defines the queue in the Sidekiq configuration file' do it 'has its queue in app/workers/all_queues.yml', :aggregate_failures do
config_queue_names = Gitlab::SidekiqConfig.config_queues.to_set file_worker_queues = Gitlab::SidekiqConfig.worker_queues.to_set
worker_queues = Gitlab::SidekiqConfig.workers.map(&:queue).to_set
worker_queues << ActionMailer::DeliveryJob.queue_name
worker_queues << 'default'
expect(Gitlab::SidekiqConfig.worker_queues).to all(be_in(config_queue_names)) missing_from_file = worker_queues - file_worker_queues
expect(missing_from_file).to be_empty, "expected #{missing_from_file.to_a.inspect} to be in app/workers/all_queues.yml"
unncessarily_in_file = file_worker_queues - worker_queues
expect(unncessarily_in_file).to be_empty, "expected #{unncessarily_in_file.to_a.inspect} not to be in app/workers/all_queues.yml"
end
it 'has its queue or namespace in config/sidekiq_queues.yml', :aggregate_failures do
config_queues = Gitlab::SidekiqConfig.config_queues.to_set
Gitlab::SidekiqConfig.workers.each do |worker|
queue = worker.queue
queue_namespace = queue.split(':').first
expect(config_queues).to include(queue).or(include(queue_namespace))
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment