Commit 0aed998d authored by Douwe Maan's avatar Douwe Maan Committed by Yorick Peterse

Merge branch 'separate-sidekiq-queues' into 'master'

Use separate queues for all Sidekiq workers

## What does this MR do?

This MR updates all workers so that they (mostly) use their own Sidekiq queues. This in turn allows us to monitor queues more accurately and in the future impose queue specific throttles, limits, etc.

This is a critical part we need in 8.13, despite it being so close to release day.

See https://gitlab.com/gitlab-org/gitlab-ce/issues/23370 for more information.

## Does this MR meet the acceptance criteria?

- [x] [CHANGELOG](https://gitlab.com/gitlab-org/gitlab-ce/blob/master/CHANGELOG.md) entry added
- [x] [Documentation created/updated](https://gitlab.com/gitlab-org/gitlab-ce/blob/master/doc/development/doc_styleguide.md)
- [x] Conform by the [merge request performance guides](http://docs.gitlab.com/ce/development/merge_request_performance_guidelines.html)
- [x] Conform by the [style guides](https://gitlab.com/gitlab-org/gitlab-ce/blob/master/CONTRIBUTING.md#style-guides)
- [x] Branch has no merge conflicts with `master` (if it does - rebase it please)
- [x] [Squashed related commits together](https://git-scm.com/book/en/Git-Tools-Rewriting-History#Squashing-Commits)

## What are the relevant issue numbers?

https://gitlab.com/gitlab-org/gitlab-ce/issues/23370

See merge request !7006
parent f75d8a31
...@@ -7,6 +7,7 @@ Please view this file on the master branch, on stable branches it's out of date. ...@@ -7,6 +7,7 @@ Please view this file on the master branch, on stable branches it's out of date.
## 8.13.0 (2016-10-22) ## 8.13.0 (2016-10-22)
- Fix save button on project pipeline settings page. (!6955) - Fix save button on project pipeline settings page. (!6955)
- All Sidekiq workers now use their own queue
- Avoid race condition when asynchronously removing expired artifacts. (!6881) - Avoid race condition when asynchronously removing expired artifacts. (!6881)
- Improve Merge When Build Succeeds triggers and execute on pipeline success. (!6675) - Improve Merge When Build Succeeds triggers and execute on pipeline success. (!6675)
- Respond with 404 Not Found for non-existent tags (Linus Thiel) - Respond with 404 Not Found for non-existent tags (Linus Thiel)
......
class AdminEmailWorker class AdminEmailWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue
sidekiq_options retry: false # this job auto-repeats via sidekiq-cron
def perform def perform
repository_check_failed_count = Project.where(last_repository_check_failed: true).count repository_check_failed_count = Project.where(last_repository_check_failed: true).count
......
class BuildCoverageWorker class BuildCoverageWorker
include Sidekiq::Worker include Sidekiq::Worker
sidekiq_options queue: :default include BuildQueue
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id) Ci::Build.find_by(id: build_id)
......
class BuildEmailWorker class BuildEmailWorker
include Sidekiq::Worker include Sidekiq::Worker
include BuildQueue
def perform(build_id, recipients, push_data) def perform(build_id, recipients, push_data)
recipients.each do |recipient| recipients.each do |recipient|
......
class BuildFinishedWorker class BuildFinishedWorker
include Sidekiq::Worker include Sidekiq::Worker
include BuildQueue
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build| Ci::Build.find_by(id: build_id).try do |build|
......
class BuildHooksWorker class BuildHooksWorker
include Sidekiq::Worker include Sidekiq::Worker
sidekiq_options queue: :default include BuildQueue
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id) Ci::Build.find_by(id: build_id)
......
class BuildSuccessWorker class BuildSuccessWorker
include Sidekiq::Worker include Sidekiq::Worker
sidekiq_options queue: :default include BuildQueue
def perform(build_id) def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build| Ci::Build.find_by(id: build_id).try do |build|
......
# This worker clears all cache fields in the database, working in batches. # This worker clears all cache fields in the database, working in batches.
class ClearDatabaseCacheWorker class ClearDatabaseCacheWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
BATCH_SIZE = 1000 BATCH_SIZE = 1000
......
# Concern for setting Sidekiq settings for the various CI build workers.
module BuildQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :build
end
end
# Concern that sets various Sidekiq settings for workers executed using a
# cronjob.
module CronjobQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :cronjob, retry: false
end
end
# Concern that sets the queue of a Sidekiq worker based on the worker's class
# name/namespace.
module DedicatedSidekiqQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: name.sub(/Worker\z/, '').underscore.tr('/', '_')
end
end
# Concern for setting Sidekiq settings for the various CI pipeline workers.
module PipelineQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :pipeline
end
end
# Concern for setting Sidekiq settings for the various repository check workers.
module RepositoryCheckQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :repository_check, retry: false
end
end
class DeleteUserWorker class DeleteUserWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
def perform(current_user_id, delete_user_id, options = {}) def perform(current_user_id, delete_user_id, options = {})
delete_user = User.find(delete_user_id) delete_user = User.find(delete_user_id)
......
class EmailReceiverWorker class EmailReceiverWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :incoming_email
def perform(raw) def perform(raw)
return unless Gitlab::IncomingEmail.enabled? return unless Gitlab::IncomingEmail.enabled?
......
class EmailsOnPushWorker class EmailsOnPushWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :mailers
attr_reader :email, :skip_premailer attr_reader :email, :skip_premailer
def perform(project_id, recipients, push_data, options = {}) def perform(project_id, recipients, push_data, options = {})
......
class ExpireBuildArtifactsWorker class ExpireBuildArtifactsWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue
def perform def perform
Rails.logger.info 'Scheduling removal of build artifacts' Rails.logger.info 'Scheduling removal of build artifacts'
......
class ExpireBuildInstanceArtifactsWorker class ExpireBuildInstanceArtifactsWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
def perform(build_id) def perform(build_id)
build = Ci::Build build = Ci::Build
......
class GitGarbageCollectWorker class GitGarbageCollectWorker
include Sidekiq::Worker include Sidekiq::Worker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
sidekiq_options queue: :gitlab_shell, retry: false sidekiq_options retry: false
def perform(project_id) def perform(project_id)
project = Project.find(project_id) project = Project.find(project_id)
......
class GitlabShellWorker class GitlabShellWorker
include Sidekiq::Worker include Sidekiq::Worker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
sidekiq_options queue: :gitlab_shell
def perform(action, *arg) def perform(action, *arg)
gitlab_shell.send(action, *arg) gitlab_shell.send(action, *arg)
......
class GroupDestroyWorker class GroupDestroyWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :default
def perform(group_id, user_id) def perform(group_id, user_id)
begin begin
......
class ImportExportProjectCleanupWorker class ImportExportProjectCleanupWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue
sidekiq_options queue: :default
def perform def perform
ImportExportCleanUpService.new.execute ImportExportCleanUpService.new.execute
......
...@@ -3,6 +3,7 @@ require 'socket' ...@@ -3,6 +3,7 @@ require 'socket'
class IrkerWorker class IrkerWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
def perform(project_id, chans, colors, push_data, settings) def perform(project_id, chans, colors, push_data, settings)
project = Project.find(project_id) project = Project.find(project_id)
......
class MergeWorker class MergeWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :default
def perform(merge_request_id, current_user_id, params) def perform(merge_request_id, current_user_id, params)
params = params.with_indifferent_access params = params.with_indifferent_access
......
class NewNoteWorker class NewNoteWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :default
def perform(note_id, note_params) def perform(note_id, note_params)
note = Note.find(note_id) note = Note.find(note_id)
......
class PipelineHooksWorker class PipelineHooksWorker
include Sidekiq::Worker include Sidekiq::Worker
sidekiq_options queue: :default include PipelineQueue
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id) Ci::Pipeline.find_by(id: pipeline_id)
......
class PipelineMetricsWorker class PipelineMetricsWorker
include Sidekiq::Worker include Sidekiq::Worker
include PipelineQueue
sidekiq_options queue: :default
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
......
class PipelineProcessWorker class PipelineProcessWorker
include Sidekiq::Worker include Sidekiq::Worker
include PipelineQueue
sidekiq_options queue: :default
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id) Ci::Pipeline.find_by(id: pipeline_id)
......
class PipelineSuccessWorker class PipelineSuccessWorker
include Sidekiq::Worker include Sidekiq::Worker
sidekiq_options queue: :default include PipelineQueue
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
......
class PipelineUpdateWorker class PipelineUpdateWorker
include Sidekiq::Worker include Sidekiq::Worker
include PipelineQueue
sidekiq_options queue: :default
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id) Ci::Pipeline.find_by(id: pipeline_id)
......
class PostReceive class PostReceive
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :post_receive
def perform(repo_path, identifier, changes) def perform(repo_path, identifier, changes)
if path = Gitlab.config.repositories.storages.find { |p| repo_path.start_with?(p[1].to_s) } if path = Gitlab.config.repositories.storages.find { |p| repo_path.start_with?(p[1].to_s) }
......
...@@ -5,8 +5,7 @@ ...@@ -5,8 +5,7 @@
# storage engine as much. # storage engine as much.
class ProjectCacheWorker class ProjectCacheWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :default
LEASE_TIMEOUT = 15.minutes.to_i LEASE_TIMEOUT = 15.minutes.to_i
......
class ProjectDestroyWorker class ProjectDestroyWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :default
def perform(project_id, user_id, params) def perform(project_id, user_id, params)
begin begin
......
class ProjectExportWorker class ProjectExportWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :gitlab_shell, retry: 3 sidekiq_options retry: 3
def perform(current_user_id, project_id) def perform(current_user_id, project_id)
current_user = User.find(current_user_id) current_user = User.find(current_user_id)
......
class ProjectServiceWorker class ProjectServiceWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :project_web_hook
def perform(hook_id, data) def perform(hook_id, data)
data = data.with_indifferent_access data = data.with_indifferent_access
......
class ProjectWebHookWorker class ProjectWebHookWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :project_web_hook
def perform(hook_id, data, hook_name) def perform(hook_id, data, hook_name)
data = data.with_indifferent_access data = data.with_indifferent_access
......
class PruneOldEventsWorker class PruneOldEventsWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue
def perform def perform
# Contribution calendar shows maximum 12 months of events. # Contribution calendar shows maximum 12 months of events.
......
class RemoveExpiredGroupLinksWorker class RemoveExpiredGroupLinksWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue
def perform def perform
ProjectGroupLink.expired.destroy_all ProjectGroupLink.expired.destroy_all
......
class RemoveExpiredMembersWorker class RemoveExpiredMembersWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue
def perform def perform
Member.expired.find_each do |member| Member.expired.find_each do |member|
......
class RepositoryArchiveCacheWorker class RepositoryArchiveCacheWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue
sidekiq_options queue: :default
def perform def perform
RepositoryArchiveCleanUpService.new.execute RepositoryArchiveCleanUpService.new.execute
......
module RepositoryCheck module RepositoryCheck
class BatchWorker class BatchWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue
RUN_TIME = 3600 RUN_TIME = 3600
sidekiq_options retry: false
def perform def perform
start = Time.now start = Time.now
# This loop will break after a little more than one hour ('a little # This loop will break after a little more than one hour ('a little
# more' because `git fsck` may take a few minutes), or if it runs out of # more' because `git fsck` may take a few minutes), or if it runs out of
# projects to check. By default sidekiq-cron will start a new # projects to check. By default sidekiq-cron will start a new
...@@ -17,15 +16,15 @@ module RepositoryCheck ...@@ -17,15 +16,15 @@ module RepositoryCheck
project_ids.each do |project_id| project_ids.each do |project_id|
break if Time.now - start >= RUN_TIME break if Time.now - start >= RUN_TIME
break unless current_settings.repository_checks_enabled break unless current_settings.repository_checks_enabled
next unless try_obtain_lease(project_id) next unless try_obtain_lease(project_id)
SingleRepositoryWorker.new.perform(project_id) SingleRepositoryWorker.new.perform(project_id)
end end
end end
private private
# Project.find_each does not support WHERE clauses and # Project.find_each does not support WHERE clauses and
# Project.find_in_batches does not support ordering. So we just build an # Project.find_in_batches does not support ordering. So we just build an
# array of ID's. This is OK because we do it only once an hour, because # array of ID's. This is OK because we do it only once an hour, because
...@@ -39,7 +38,7 @@ module RepositoryCheck ...@@ -39,7 +38,7 @@ module RepositoryCheck
reorder('last_repository_check_at ASC').limit(limit).pluck(:id) reorder('last_repository_check_at ASC').limit(limit).pluck(:id)
never_checked_projects + old_check_projects never_checked_projects + old_check_projects
end end
def try_obtain_lease(id) def try_obtain_lease(id)
# Use a 24-hour timeout because on servers/projects where 'git fsck' is # Use a 24-hour timeout because on servers/projects where 'git fsck' is
# super slow we definitely do not want to run it twice in parallel. # super slow we definitely do not want to run it twice in parallel.
...@@ -48,7 +47,7 @@ module RepositoryCheck ...@@ -48,7 +47,7 @@ module RepositoryCheck
timeout: 24.hours timeout: 24.hours
).try_obtain ).try_obtain
end end
def current_settings def current_settings
# No caching of the settings! If we cache them and an admin disables # No caching of the settings! If we cache them and an admin disables
# this feature, an active RepositoryCheckWorker would keep going for up # this feature, an active RepositoryCheckWorker would keep going for up
......
module RepositoryCheck module RepositoryCheck
class ClearWorker class ClearWorker
include Sidekiq::Worker include Sidekiq::Worker
include RepositoryCheckQueue
sidekiq_options retry: false
def perform def perform
# Do small batched updates because these updates will be slow and locking # Do small batched updates because these updates will be slow and locking
......
module RepositoryCheck module RepositoryCheck
class SingleRepositoryWorker class SingleRepositoryWorker
include Sidekiq::Worker include Sidekiq::Worker
include RepositoryCheckQueue
sidekiq_options retry: false
def perform(project_id) def perform(project_id)
project = Project.find(project_id) project = Project.find(project_id)
......
class RepositoryForkWorker class RepositoryForkWorker
include Sidekiq::Worker include Sidekiq::Worker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
sidekiq_options queue: :gitlab_shell
def perform(project_id, forked_from_repository_storage_path, source_path, target_path) def perform(project_id, forked_from_repository_storage_path, source_path, target_path)
Gitlab::Metrics.add_event(:fork_repository, Gitlab::Metrics.add_event(:fork_repository,
......
class RepositoryImportWorker class RepositoryImportWorker
include Sidekiq::Worker include Sidekiq::Worker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
sidekiq_options queue: :gitlab_shell
attr_accessor :project, :current_user attr_accessor :project, :current_user
......
class RequestsProfilesWorker class RequestsProfilesWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue
sidekiq_options queue: :default
def perform def perform
Gitlab::RequestProfiler.remove_all_profiles Gitlab::RequestProfiler.remove_all_profiles
......
class StuckCiBuildsWorker class StuckCiBuildsWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue
BUILD_STUCK_TIMEOUT = 1.day BUILD_STUCK_TIMEOUT = 1.day
......
class SystemHookWorker class SystemHookWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :system_hook
def perform(hook_id, data, hook_name) def perform(hook_id, data, hook_name)
SystemHook.find(hook_id).execute(data, hook_name) SystemHook.find(hook_id).execute(data, hook_name)
......
class TrendingProjectsWorker class TrendingProjectsWorker
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue
sidekiq_options queue: :trending_projects
def perform def perform
Rails.logger.info('Refreshing trending projects') Rails.logger.info('Refreshing trending projects')
......
class UpdateMergeRequestsWorker class UpdateMergeRequestsWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue
def perform(project_id, user_id, oldrev, newrev, ref) def perform(project_id, user_id, oldrev, newrev, ref)
project = Project.find_by(id: project_id) project = Project.find_by(id: project_id)
......
...@@ -4,6 +4,7 @@ cd $(dirname $0)/.. ...@@ -4,6 +4,7 @@ cd $(dirname $0)/..
app_root=$(pwd) app_root=$(pwd)
sidekiq_pidfile="$app_root/tmp/pids/sidekiq.pid" sidekiq_pidfile="$app_root/tmp/pids/sidekiq.pid"
sidekiq_logfile="$app_root/log/sidekiq.log" sidekiq_logfile="$app_root/log/sidekiq.log"
sidekiq_config="$app_root/config/sidekiq_queues.yml"
gitlab_user=$(ls -l config.ru | awk '{print $3}') gitlab_user=$(ls -l config.ru | awk '{print $3}')
warn() warn()
...@@ -37,7 +38,7 @@ start_no_deamonize() ...@@ -37,7 +38,7 @@ start_no_deamonize()
start_sidekiq() start_sidekiq()
{ {
exec bundle exec sidekiq -q post_receive -q mailers -q archive_repo -q system_hook -q project_web_hook -q gitlab_shell -q incoming_email -q runner -q common -q default -e $RAILS_ENV -P $sidekiq_pidfile "$@" exec bundle exec sidekiq -C "${sidekiq_config}" -e $RAILS_ENV -P $sidekiq_pidfile "$@"
} }
load_ok() load_ok()
......
...@@ -24,7 +24,8 @@ module Gitlab ...@@ -24,7 +24,8 @@ module Gitlab
#{config.root}/app/models/ci #{config.root}/app/models/ci
#{config.root}/app/models/hooks #{config.root}/app/models/hooks
#{config.root}/app/models/members #{config.root}/app/models/members
#{config.root}/app/models/project_services)) #{config.root}/app/models/project_services
#{config.root}/app/workers/concerns))
config.generators.templates.push("#{config.root}/generator_templates") config.generators.templates.push("#{config.root}/generator_templates")
......
# This configuration file should be exclusively used to set queue settings for
# Sidekiq. Any other setting should be specified using the Sidekiq CLI or the
# Sidekiq Ruby API (see config/initializers/sidekiq.rb).
---
# All the queues to process and their weights. Every queue _must_ have a weight
# defined.
#
# The available weights are as follows
#
# 1: low priority
# 2: medium priority
# 3: high priority
# 5: _super_ high priority, this should only be used for _very_ important queues
#
# As per http://stackoverflow.com/a/21241357/290102 the formula for calculating
# the likelihood of a job being popped off a queue (given all queues have work
# to perform) is:
#
# chance = (queue weight / total weight of all queues) * 100
:queues:
- [post_receive, 5]
- [merge, 5]
- [update_merge_requests, 3]
- [new_note, 2]
- [build, 2]
- [pipeline, 2]
- [gitlab_shell, 2]
- [email_receiver, 2]
- [emails_on_push, 2]
- [repository_fork, 1]
- [repository_import, 1]
- [project_service, 1]
- [clear_database_cache, 1]
- [delete_user, 1]
- [expire_build_instance_artifacts, 1]
- [group_destroy, 1]
- [irker, 1]
- [project_cache, 1]
- [project_destroy, 1]
- [project_export, 1]
- [project_web_hook, 1]
- [repository_check, 1]
- [system_hook, 1]
- [git_garbage_collect, 1]
- [cronjob, 1]
- [default, 1]
require 'json'
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class MigrateSidekiqQueuesFromDefault < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = true
DOWNTIME_REASON = <<-EOF
Moving Sidekiq jobs from queues requires Sidekiq to be stopped. Not stopping
Sidekiq will result in the loss of jobs that are scheduled after this
migration completes.
EOF
disable_ddl_transaction!
# Jobs for which the queue names have been changed (e.g. multiple workers
# using the same non-default queue).
#
# The keys are the old queue names, the values the jobs to move and their new
# queue names.
RENAMED_QUEUES = {
gitlab_shell: {
'GitGarbageCollectorWorker' => :git_garbage_collector,
'ProjectExportWorker' => :project_export,
'RepositoryForkWorker' => :repository_fork,
'RepositoryImportWorker' => :repository_import
},
project_web_hook: {
'ProjectServiceWorker' => :project_service
},
incoming_email: {
'EmailReceiverWorker' => :email_receiver
},
mailers: {
'EmailsOnPushWorker' => :emails_on_push
},
default: {
'AdminEmailWorker' => :cronjob,
'BuildCoverageWorker' => :build,
'BuildEmailWorker' => :build,
'BuildFinishedWorker' => :build,
'BuildHooksWorker' => :build,
'BuildSuccessWorker' => :build,
'ClearDatabaseCacheWorker' => :clear_database_cache,
'DeleteUserWorker' => :delete_user,
'ExpireBuildArtifactsWorker' => :cronjob,
'ExpireBuildInstanceArtifactsWorker' => :expire_build_instance_artifacts,
'GroupDestroyWorker' => :group_destroy,
'ImportExportProjectCleanupWorker' => :cronjob,
'IrkerWorker' => :irker,
'MergeWorker' => :merge,
'NewNoteWorker' => :new_note,
'PipelineHooksWorker' => :pipeline,
'PipelineMetricsWorker' => :pipeline,
'PipelineProcessWorker' => :pipeline,
'PipelineSuccessWorker' => :pipeline,
'PipelineUpdateWorker' => :pipeline,
'ProjectCacheWorker' => :project_cache,
'ProjectDestroyWorker' => :project_destroy,
'PruneOldEventsWorker' => :cronjob,
'RemoveExpiredGroupLinksWorker' => :cronjob,
'RemoveExpiredMembersWorker' => :cronjob,
'RepositoryArchiveCacheWorker' => :cronjob,
'RepositoryCheck::BatchWorker' => :cronjob,
'RepositoryCheck::ClearWorker' => :repository_check,
'RepositoryCheck::SingleRepositoryWorker' => :repository_check,
'RequestsProfilesWorker' => :cronjob,
'StuckCiBuildsWorker' => :cronjob,
'UpdateMergeRequestsWorker' => :update_merge_requests
}
}
def up
Sidekiq.redis do |redis|
RENAMED_QUEUES.each do |queue, jobs|
migrate_from_queue(redis, queue, jobs)
end
end
end
def down
Sidekiq.redis do |redis|
RENAMED_QUEUES.each do |dest_queue, jobs|
jobs.each do |worker, from_queue|
migrate_from_queue(redis, from_queue, worker => dest_queue)
end
end
end
end
def migrate_from_queue(redis, queue, job_mapping)
while job = redis.lpop("queue:#{queue}")
payload = JSON.load(job)
new_queue = job_mapping[payload['class']]
# If we have no target queue to migrate to we're probably dealing with
# some ancient job for which the worker no longer exists. In that case
# there's no sane option we can take, other than just dropping the job.
next unless new_queue
payload['queue'] = new_queue
redis.lpush("queue:#{new_queue}", JSON.dump(payload))
end
end
end
...@@ -14,7 +14,8 @@ ...@@ -14,7 +14,8 @@
- [Testing standards and style guidelines](testing.md) - [Testing standards and style guidelines](testing.md)
- [UI guide](ui_guide.md) for building GitLab with existing CSS styles and elements - [UI guide](ui_guide.md) for building GitLab with existing CSS styles and elements
- [Frontend guidelines](frontend.md) - [Frontend guidelines](frontend.md)
- [SQL guidelines](sql.md) for SQL guidelines - [SQL guidelines](sql.md) for working with SQL queries
- [Sidekiq guidelines](sidekiq_style_guide.md) for working with Sidekiq workers
## Process ## Process
......
# Sidekiq Style Guide
This document outlines various guidelines that should be followed when adding or
modifying Sidekiq workers.
## Default Queue
Use of the "default" queue is not allowed. Every worker should use a queue that
matches the worker's purpose the closest. For example, workers that are to be
executed periodically should use the "cronjob" queue.
A list of all available queues can be found in `config/sidekiq_queues.yml`.
## Dedicated Queues
Most workers should use their own queue. To ease this process a worker can
include the `DedicatedSidekiqQueue` concern as follows:
```ruby
class ProcessSomethingWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
end
```
This will set the queue name based on the class' name, minus the `Worker`
suffix. In the above example this would lead to the queue being
`process_something`.
In some cases multiple workers do use the same queue. For example, the various
workers for updating CI pipelines all use the `pipeline` queue. Adding workers
to existing queues should be done with care, as adding more workers can lead to
slow jobs blocking work (even for different jobs) on the shared queue.
## Tests
Each Sidekiq worker must be tested using RSpec, just like any other class. These
tests should be placed in `spec/workers`.
require 'spec_helper'
describe BuildQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
include BuildQueue
end
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('build')
end
end
require 'spec_helper'
describe CronjobQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
include CronjobQueue
end
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob')
end
it 'disables retrying of failed jobs' do
expect(worker.sidekiq_options['retry']).to eq(false)
end
end
require 'spec_helper'
describe DedicatedSidekiqQueue do
let(:worker) do
Class.new do
def self.name
'Foo::Bar::DummyWorker'
end
include Sidekiq::Worker
include DedicatedSidekiqQueue
end
end
describe 'queue names' do
it 'sets the queue name based on the class name' do
expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy')
end
end
end
require 'spec_helper'
describe PipelineQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
include PipelineQueue
end
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('pipeline')
end
end
require 'spec_helper'
describe RepositoryCheckQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
include RepositoryCheckQueue
end
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check')
end
it 'disables retrying of failed jobs' do
expect(worker.sidekiq_options['retry']).to eq(false)
end
end
require 'spec_helper'
describe 'Every Sidekiq worker' do
let(:workers) do
root = Rails.root.join('app', 'workers')
concerns = root.join('concerns').to_s
workers = Dir[root.join('**', '*.rb')].
reject { |path| path.start_with?(concerns) }
workers.map do |path|
ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '')
ns.camelize.constantize
end
end
it 'does not use the default queue' do
workers.each do |worker|
expect(worker.sidekiq_options['queue'].to_s).not_to eq('default')
end
end
it 'uses the cronjob queue when the worker runs as a cronjob' do
cron_workers = Settings.cron_jobs.
map { |job_name, options| options['job_class'].constantize }.
to_set
workers.each do |worker|
next unless cron_workers.include?(worker)
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob')
end
end
it 'defines the queue in the Sidekiq configuration file' do
config = YAML.load_file(Rails.root.join('config', 'sidekiq_queues.yml').to_s)
queue_names = config[:queues].map { |(queue, _)| queue }.to_set
workers.each do |worker|
expect(queue_names).to include(worker.sidekiq_options['queue'].to_s)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment