Commit 29be9c1a authored by Sean McGivern's avatar Sean McGivern

Merge branch 'dm-application-worker' into 'master'

Add ApplicationWorker and make every worker include it

See merge request gitlab-org/gitlab-ce!15632
parents a39d6d89 1e6ca3c4
......@@ -2,6 +2,7 @@ require 'carrierwave/orm/activerecord'
class Group < Namespace
include Gitlab::ConfigHelper
include AfterCommitQueue
include AccessRequestable
include Avatarable
include Referable
......
......@@ -2,6 +2,7 @@ require 'digest/md5'
class Key < ActiveRecord::Base
include Gitlab::CurrentSettings
include AfterCommitQueue
include Sortable
belongs_to :user
......
class Member < ActiveRecord::Base
include AfterCommitQueue
include Sortable
include Importable
include Expirable
......
......@@ -211,7 +211,7 @@ class Service < ActiveRecord::Base
def async_execute(data)
return unless supported_events.include?(data[:object_kind])
Sidekiq::Client.enqueue(ProjectServiceWorker, id, data)
ProjectServiceWorker.perform_async(id, data)
end
def issue_tracker?
......
......@@ -7,6 +7,7 @@ class User < ActiveRecord::Base
include Gitlab::ConfigHelper
include Gitlab::CurrentSettings
include Gitlab::SQL::Pattern
include AfterCommitQueue
include Avatarable
include Referable
include Sortable
......@@ -903,6 +904,7 @@ class User < ActiveRecord::Base
def post_destroy_hook
log_info("User \"#{name}\" (#{email}) was removed")
system_hook_service.execute_hooks_for(self, :destroy)
end
......
class SystemHooksService
def execute_hooks_for(model, event)
execute_hooks(build_event_data(model, event))
data = build_event_data(model, event)
model.run_after_commit_or_now do
SystemHooksService.new.execute_hooks(data)
end
end
def execute_hooks(data, hooks_scope = :all)
......
......@@ -63,7 +63,7 @@ class WebHookService
end
def async_execute
Sidekiq::Client.enqueue(WebHookWorker, hook.id, data, hook_name)
WebHookWorker.perform_async(hook.id, data, hook_name)
end
private
......
class AdminEmailWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
class AuthorizedProjectsWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
# Schedules multiple jobs and waits for them to be completed.
def self.bulk_perform_and_wait(args_list)
......@@ -17,11 +16,6 @@ class AuthorizedProjectsWorker
waiter.wait
end
# Schedules multiple jobs to run in sidekiq without waiting for completion
def self.bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
end
# Performs multiple jobs directly. Failed jobs will be put into sidekiq so
# they can benefit from retries
def self.bulk_perform_inline(args_list)
......
class BackgroundMigrationWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
# Enqueues a number of jobs in bulk.
#
# The `jobs` argument should be an Array of Arrays, each sub-array must be in
# the form:
#
# [migration-class, [arg1, arg2, ...]]
def self.perform_bulk(jobs)
Sidekiq::Client.push_bulk('class' => self,
'queue' => sidekiq_options['queue'],
'args' => jobs)
end
# Schedules multiple jobs in bulk, with a delay.
#
def self.perform_bulk_in(delay, jobs)
now = Time.now.to_i
schedule = now + delay.to_i
if schedule <= now
raise ArgumentError, 'The schedule time must be in the future!'
end
Sidekiq::Client.push_bulk('class' => self,
'queue' => sidekiq_options['queue'],
'args' => jobs,
'at' => schedule)
end
include ApplicationWorker
# Performs the background migration.
#
......
class BuildCoverageWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
def perform(build_id)
......
class BuildFinishedWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
......
class BuildHooksWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :hooks
......
class BuildQueueWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
......
class BuildSuccessWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
......
class BuildTraceSectionsWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
def perform(build_id)
......
class ClusterInstallAppWorker
include Sidekiq::Worker
include ApplicationWorker
include ClusterQueue
include ClusterApplications
......
class ClusterProvisionWorker
include Sidekiq::Worker
include ApplicationWorker
include ClusterQueue
def perform(cluster_id)
......
class ClusterWaitForAppInstallationWorker
include Sidekiq::Worker
include ApplicationWorker
include ClusterQueue
include ClusterApplications
......
Sidekiq::Worker.extend ActiveSupport::Concern
module ApplicationWorker
extend ActiveSupport::Concern
include Sidekiq::Worker
included do
sidekiq_options queue: base_queue_name
end
module ClassMethods
def base_queue_name
name
.sub(/\AGitlab::/, '')
.sub(/Worker\z/, '')
.underscore
.tr('/', '_')
end
def queue
get_sidekiq_options['queue'].to_s
end
def bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
end
def bulk_perform_in(delay, args_list)
now = Time.now.to_i
schedule = now + delay.to_i
if schedule <= now
raise ArgumentError, 'The schedule time must be in the future!'
end
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
end
end
end
# Concern that sets the queue of a Sidekiq worker based on the worker's class
# name/namespace.
module DedicatedSidekiqQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: name.sub(/Worker\z/, '').underscore.tr('/', '_')
end
end
......@@ -8,7 +8,7 @@ module Gitlab
extend ActiveSupport::Concern
included do
include Sidekiq::Worker
include ApplicationWorker
include GithubImport::Queue
include ReschedulingMethods
include NotifyUponDeath
......
class CreateGpgSignatureWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(commit_sha, project_id)
project = Project.find_by(id: project_id)
......
class CreatePipelineWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :creation
......
class DeleteMergedBranchesWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(project_id, user_id)
begin
......
class DeleteUserWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(current_user_id, delete_user_id, options = {})
delete_user = User.find(delete_user_id)
......
class EmailReceiverWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(raw)
return unless Gitlab::IncomingEmail.enabled?
......
class EmailsOnPushWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
attr_reader :email, :skip_premailer
......
class ExpireBuildArtifactsWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......@@ -8,6 +8,6 @@ class ExpireBuildArtifactsWorker
build_ids = Ci::Build.with_expired_artifacts.pluck(:id)
build_ids = build_ids.map { |build_id| [build_id] }
Sidekiq::Client.push_bulk('class' => ExpireBuildInstanceArtifactsWorker, 'args' => build_ids )
ExpireBuildInstanceArtifactsWorker.bulk_perform_async(build_ids)
end
end
class ExpireBuildInstanceArtifactsWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(build_id)
build = Ci::Build
......
class ExpireJobCacheWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :cache
......
class ExpirePipelineCacheWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :cache
......
class GitGarbageCollectWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
include Gitlab::CurrentSettings
sidekiq_options retry: false
......
......@@ -7,7 +7,7 @@ module Gitlab
# been completed this worker will advance the import process to the next
# stage.
class AdvanceStageWorker
include Sidekiq::Worker
include ApplicationWorker
sidekiq_options queue: 'github_importer_advance_stage', dead: false
......
......@@ -3,7 +3,7 @@
module Gitlab
module GithubImport
class RefreshImportJidWorker
include Sidekiq::Worker
include ApplicationWorker
include GithubImport::Queue
# The interval to schedule new instances of this job at.
......
......@@ -4,7 +4,7 @@ module Gitlab
module GithubImport
module Stage
class FinishImportWorker
include Sidekiq::Worker
include ApplicationWorker
include GithubImport::Queue
include StageMethods
......
......@@ -4,7 +4,7 @@ module Gitlab
module GithubImport
module Stage
class ImportBaseDataWorker
include Sidekiq::Worker
include ApplicationWorker
include GithubImport::Queue
include StageMethods
......
......@@ -4,7 +4,7 @@ module Gitlab
module GithubImport
module Stage
class ImportIssuesAndDiffNotesWorker
include Sidekiq::Worker
include ApplicationWorker
include GithubImport::Queue
include StageMethods
......
......@@ -4,7 +4,7 @@ module Gitlab
module GithubImport
module Stage
class ImportNotesWorker
include Sidekiq::Worker
include ApplicationWorker
include GithubImport::Queue
include StageMethods
......
......@@ -4,7 +4,7 @@ module Gitlab
module GithubImport
module Stage
class ImportPullRequestsWorker
include Sidekiq::Worker
include ApplicationWorker
include GithubImport::Queue
include StageMethods
......
......@@ -4,7 +4,7 @@ module Gitlab
module GithubImport
module Stage
class ImportRepositoryWorker
include Sidekiq::Worker
include ApplicationWorker
include GithubImport::Queue
include StageMethods
......
class GitlabShellWorker
include Sidekiq::Worker
include ApplicationWorker
include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
def perform(action, *arg)
gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
......
class GitlabUsagePingWorker
LEASE_TIMEOUT = 86400
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
class GroupDestroyWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
include ExceptionBacktrace
def perform(group_id, user_id)
......
class ImportExportProjectCleanupWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
class InvalidGpgSignatureUpdateWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(gpg_key_id)
gpg_key = GpgKey.find_by(id: gpg_key_id)
......
......@@ -2,8 +2,7 @@ require 'json'
require 'socket'
class IrkerWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(project_id, chans, colors, push_data, settings)
project = Project.find(project_id)
......
class MergeWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(merge_request_id, current_user_id, params)
params = params.with_indifferent_access
......
......@@ -5,14 +5,9 @@
# The worker will reject doing anything for projects that *do* have a
# namespace. For those use ProjectDestroyWorker instead.
class NamespacelessProjectDestroyWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
include ExceptionBacktrace
def self.bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
end
def perform(project_id)
begin
project = Project.unscoped.find(project_id)
......
class NewIssueWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
include NewIssuable
def perform(issue_id, user_id)
......
class NewMergeRequestWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
include NewIssuable
def perform(merge_request_id, user_id)
......
class NewNoteWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
# Keep extra parameter to preserve backwards compatibility with
# old `NewNoteWorker` jobs (can remove later)
......
class PagesWorker
include Sidekiq::Worker
include ApplicationWorker
sidekiq_options queue: :pages, retry: false
......
class PipelineHooksWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :hooks
......
class PipelineMetricsWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
def perform(pipeline_id)
......
class PipelineNotificationWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
def perform(pipeline_id, recipients = nil)
......
class PipelineProcessWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
......
class PipelineScheduleWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
class PipelineSuccessWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
......
class PipelineUpdateWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
......
class PostReceive
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(gl_repository, identifier, changes)
project, is_wiki = Gitlab::GlRepository.parse(gl_repository)
......
......@@ -5,8 +5,7 @@
# Consider using an extra worker if you need to add any extra (and potentially
# slow) processing of commits.
class ProcessCommitWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
# project_id - The ID of the project this commit belongs to.
# user_id - The ID of the user that pushed the commit.
......
# Worker for updating any project specific caches.
class ProjectCacheWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
LEASE_TIMEOUT = 15.minutes.to_i
......
class ProjectDestroyWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
include ExceptionBacktrace
def perform(project_id, user_id, params)
......
class ProjectExportWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
include ExceptionBacktrace
sidekiq_options retry: 3
......
class ProjectMigrateHashedStorageWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
LEASE_TIMEOUT = 30.seconds.to_i
......
class ProjectServiceWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
sidekiq_options dead: false
......
# Worker for updating any project specific caches.
class PropagateServiceTemplateWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
LEASE_TIMEOUT = 4.hours.to_i
......
class PruneOldEventsWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
class ReactiveCachingWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(class_name, id, *args)
klass = begin
......
class RemoveExpiredGroupLinksWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
class RemoveExpiredMembersWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
class RemoveOldWebHookLogsWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
WEB_HOOK_LOG_LIFETIME = 2.days
......
class RemoveUnreferencedLfsObjectsWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
class RepositoryArchiveCacheWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
module RepositoryCheck
class BatchWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
RUN_TIME = 3600
......
module RepositoryCheck
class ClearWorker
include Sidekiq::Worker
include ApplicationWorker
include RepositoryCheckQueue
def perform
......
module RepositoryCheck
class SingleRepositoryWorker
include Sidekiq::Worker
include ApplicationWorker
include RepositoryCheckQueue
def perform(project_id)
......
class RepositoryForkWorker
ForkError = Class.new(StandardError)
include Sidekiq::Worker
include ApplicationWorker
include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
include ProjectStartImport
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
......
class RepositoryImportWorker
ImportError = Class.new(StandardError)
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
include ExceptionBacktrace
include ProjectStartImport
......
class RequestsProfilesWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
class ScheduleUpdateUserActivityWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform(batch_size = 500)
......
class StageUpdateWorker
include Sidekiq::Worker
include ApplicationWorker
include PipelineQueue
enqueue_in group: :processing
......
class StorageMigratorWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
BATCH_SIZE = 100
......
class StuckCiJobsWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
EXCLUSIVE_LEASE_KEY = 'stuck_ci_builds_worker_lease'.freeze
......
class StuckImportJobsWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
IMPORT_JOBS_EXPIRATION = 15.hours.to_i
......
class StuckMergeJobsWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
class SystemHookPushWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(push_data, hook_id)
SystemHooksService.new.execute_hooks(push_data, hook_id)
......
class TrendingProjectsWorker
include Sidekiq::Worker
include ApplicationWorker
include CronjobQueue
def perform
......
class UpdateMergeRequestsWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
LOG_TIME_THRESHOLD = 90 # seconds
......
class UpdateUserActivityWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(pairs)
pairs = cast_data(pairs)
......
class UploadChecksumWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
def perform(upload_id)
upload = Upload.find(upload_id)
......
class WaitForClusterCreationWorker
include Sidekiq::Worker
include ApplicationWorker
include ClusterQueue
def perform(cluster_id)
......
class WebHookWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
include ApplicationWorker
sidekiq_options retry: 4, dead: false
......
......@@ -13,13 +13,9 @@ module Sidekiq
module ClassMethods
module NoSchedulingFromTransactions
NESTING = ::Rails.env.test? ? 1 : 0
%i(perform_async perform_at perform_in).each do |name|
define_method(name) do |*args|
return super(*args) if Sidekiq::Worker.skip_transaction_check
return super(*args) unless ActiveRecord::Base.connection.open_transactions > NESTING
if !Sidekiq::Worker.skip_transaction_check && AfterCommitQueue.inside_transaction?
raise <<-MSG.strip_heredoc
`#{self}.#{name}` cannot be called inside a transaction as this can lead to
race conditions when the worker runs before the transaction is committed and
......@@ -28,6 +24,9 @@ module Sidekiq
Use an `after_commit` hook, or include `AfterCommitQueue` and use a `run_after_commit` block instead.
MSG
end
super(*args)
end
end
end
......
......@@ -64,13 +64,13 @@ end
# The Sidekiq client API always adds the queue to the Sidekiq queue
# list, but mail_room and gitlab-shell do not. This is only necessary
# for monitoring.
config = YAML.load_file(Rails.root.join('config', 'sidekiq_queues.yml').to_s)
begin
queues = Gitlab::SidekiqConfig.worker_queues
Sidekiq.redis do |conn|
conn.pipelined do
config[:queues].each do |queue|
conn.sadd('queues', queue[0])
queues.each do |queue|
conn.sadd('queues', queue)
end
end
end
......
......@@ -25,14 +25,14 @@ class ScheduleEventMigrations < ActiveRecord::Migration
# We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range.
BackgroundMigrationWorker.perform_bulk(jobs)
BackgroundMigrationWorker.bulk_perform_async(jobs)
jobs.clear
end
jobs << ['MigrateEventsToPushEventPayloads', [min, max]]
end
BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty?
BackgroundMigrationWorker.bulk_perform_async(jobs) unless jobs.empty?
end
def down
......
......@@ -19,7 +19,7 @@ class ScheduleCreateGpgKeySubkeysFromGpgKeys < ActiveRecord::Migration
[MIGRATION, [id]]
end
BackgroundMigrationWorker.perform_bulk(jobs)
BackgroundMigrationWorker.bulk_perform_async(jobs)
end
end
......
......@@ -68,10 +68,10 @@ BackgroundMigrationWorker.perform_async('BackgroundMigrationClassName', [arg1, a
```
Usually it's better to enqueue jobs in bulk, for this you can use
`BackgroundMigrationWorker.perform_bulk`:
`BackgroundMigrationWorker.bulk_perform_async`:
```ruby
BackgroundMigrationWorker.perform_bulk(
BackgroundMigrationWorker.bulk_perform_async(
[['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]]]
)
......@@ -85,13 +85,13 @@ updates. Removals in turn can be handled by simply defining foreign keys with
cascading deletes.
If you would like to schedule jobs in bulk with a delay, you can use
`BackgroundMigrationWorker.perform_bulk_in`:
`BackgroundMigrationWorker.bulk_perform_in`:
```ruby
jobs = [['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]]]
BackgroundMigrationWorker.perform_bulk_in(5.minutes, jobs)
BackgroundMigrationWorker.bulk_perform_in(5.minutes, jobs)
```
## Cleaning Up
......@@ -201,7 +201,7 @@ class ScheduleExtractServicesUrl < ActiveRecord::Migration
['ExtractServicesUrl', [id]]
end
BackgroundMigrationWorker.perform_bulk(jobs)
BackgroundMigrationWorker.bulk_perform_async(jobs)
end
end
......
......@@ -3,6 +3,12 @@
This document outlines various guidelines that should be followed when adding or
modifying Sidekiq workers.
## ApplicationWorker
All workers should include `ApplicationWorker` instead of `Sidekiq::Worker`,
which adds some convenience methods and automatically sets the queue based on
the worker's name.
## Default Queue
Use of the "default" queue is not allowed. Every worker should use a queue that
......@@ -13,19 +19,10 @@ A list of all available queues can be found in `config/sidekiq_queues.yml`.
## Dedicated Queues
Most workers should use their own queue. To ease this process a worker can
include the `DedicatedSidekiqQueue` concern as follows:
```ruby
class ProcessSomethingWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
end
```
This will set the queue name based on the class' name, minus the `Worker`
suffix. In the above example this would lead to the queue being
`process_something`.
Most workers should use their own queue, which is automatically set based on the
worker class name. For a worker named `ProcessSomethingWorker`, the queue name
would be `process_something`. If you're not sure what a worker's queue name is,
you can find it using `SomeWorker.queue`.
In some cases multiple workers do use the same queue. For example, the various
workers for updating CI pipelines all use the `pipeline` queue. Adding workers
......
......@@ -6,12 +6,34 @@ module AfterCommitQueue
after_rollback :_clear_after_commit_queue
end
def run_after_commit(method = nil, &block)
_after_commit_queue << proc { self.send(method) } if method # rubocop:disable GitlabSecurity/PublicSend
def run_after_commit(&block)
_after_commit_queue << block if block
true
end
def run_after_commit_or_now(&block)
if AfterCommitQueue.inside_transaction?
run_after_commit(&block)
else
instance_eval(&block)
end
true
end
def self.open_transactions_baseline
if ::Rails.env.test?
return DatabaseCleaner.connections.count { |conn| conn.strategy.is_a?(DatabaseCleaner::ActiveRecord::Transaction) }
end
0
end
def self.inside_transaction?
ActiveRecord::Base.connection.open_transactions > open_transactions_baseline
end
protected
def _run_after_commit_queue
......
......@@ -703,14 +703,14 @@ into similar problems in the future (e.g. when new tables are created).
# We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range.
BackgroundMigrationWorker.perform_bulk(jobs)
BackgroundMigrationWorker.bulk_perform_async(jobs)
jobs.clear
end
jobs << [job_class_name, [start_id, end_id]]
end
BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty?
BackgroundMigrationWorker.bulk_perform_async(jobs) unless jobs.empty?
end
# Queues background migration jobs for an entire table, batched by ID range.
......
require 'yaml'
module Gitlab
module SidekiqConfig
def self.redis_queues
@redis_queues ||= Sidekiq::Queue.all.map(&:name)
end
# This method is called by `bin/sidekiq-cluster` in EE, which runs outside
# of bundler/Rails context, so we cannot use any gem or Rails methods.
def self.config_queues(rails_path = Rails.root.to_s)
@config_queues ||= begin
config = YAML.load_file(File.join(rails_path, 'config', 'sidekiq_queues.yml'))
config[:queues].map(&:first)
end
end
def self.cron_workers
@cron_workers ||= Settings.cron_jobs.map { |job_name, options| options['job_class'].constantize }
end
def self.workers
@workers ||= find_workers(Rails.root.join('app', 'workers'))
end
def self.default_queues
[ActionMailer::DeliveryJob.queue_name, 'default']
end
def self.worker_queues
@worker_queues ||= (workers.map(&:queue) + default_queues).uniq
end
def self.find_workers(root)
concerns = root.join('concerns').to_s
workers = Dir[root.join('**', '*.rb')]
.reject { |path| path.start_with?(concerns) }
workers.map! do |path|
ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '')
ns.camelize.constantize
end
# Skip concerns
workers.select { |w| w < Sidekiq::Worker }
end
end
end
......@@ -942,8 +942,8 @@ describe Gitlab::Database::MigrationHelpers do
end
it 'queues jobs in groups of buffer size 1' do
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]]])
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id3, id3]]])
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]]])
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end
......@@ -960,7 +960,7 @@ describe Gitlab::Database::MigrationHelpers do
end
it 'queues jobs in bulk all at once (big buffer size)' do
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]],
expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]],
['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
......
require 'rails_helper'
describe Gitlab::SidekiqConfig do
describe '.workers' do
it 'includes all workers' do
workers = described_class.workers
expect(workers).to include(PostReceive)
expect(workers).to include(MergeWorker)
end
end
describe '.worker_queues' do
it 'includes all queues' do
queues = described_class.worker_queues
expect(queues).to include('post_receive')
expect(queues).to include('merge')
expect(queues).to include('cronjob')
expect(queues).to include('mailers')
expect(queues).to include('default')
end
end
end
......@@ -146,7 +146,7 @@ describe WebHookService do
let(:system_hook) { create(:system_hook) }
it 'enqueue WebHookWorker' do
expect(Sidekiq::Client).to receive(:enqueue).with(WebHookWorker, project_hook.id, data, 'push_hooks')
expect(WebHookWorker).to receive(:perform_async).with(project_hook.id, data, 'push_hooks')
described_class.new(project_hook, data, 'push_hooks').async_execute
end
......
......@@ -65,7 +65,6 @@ describe AuthorizedProjectsWorker do
args_list = build_args_list(project.owner.id)
push_bulk_args = {
'class' => described_class,
'queue' => described_class.sidekiq_options['queue'],
'args' => args_list
}
......
......@@ -10,35 +10,4 @@ describe BackgroundMigrationWorker, :sidekiq do
described_class.new.perform('Foo', [10, 20])
end
end
describe '.perform_bulk' do
it 'enqueues background migrations in bulk' do
Sidekiq::Testing.fake! do
described_class.perform_bulk([['Foo', [1]], ['Foo', [2]]])
expect(described_class.jobs.count).to eq 2
expect(described_class.jobs).to all(include('enqueued_at'))
end
end
end
describe '.perform_bulk_in' do
context 'when delay is valid' do
it 'correctly schedules background migrations' do
Sidekiq::Testing.fake! do
described_class.perform_bulk_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
expect(described_class.jobs.count).to eq 2
expect(described_class.jobs).to all(include('at'))
end
end
end
context 'when delay is invalid' do
it 'raises an ArgumentError exception' do
expect { described_class.perform_bulk_in(-60, [['Foo']]) }
.to raise_error(ArgumentError)
end
end
end
end
require 'spec_helper'
describe ApplicationWorker do
let(:worker) do
Class.new do
def self.name
'Gitlab::Foo::Bar::DummyWorker'
end
include ApplicationWorker
end
end
describe 'Sidekiq options' do
it 'sets the queue name based on the class name' do
expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy')
end
end
describe '.queue' do
it 'returns the queue name' do
worker.sidekiq_options queue: :some_queue
expect(worker.queue).to eq('some_queue')
end
end
describe '.bulk_perform_async' do
it 'enqueues jobs in bulk' do
Sidekiq::Testing.fake! do
worker.bulk_perform_async([['Foo', [1]], ['Foo', [2]]])
expect(worker.jobs.count).to eq 2
expect(worker.jobs).to all(include('enqueued_at'))
end
end
end
describe '.bulk_perform_in' do
context 'when delay is valid' do
it 'correctly schedules jobs' do
Sidekiq::Testing.fake! do
worker.bulk_perform_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
expect(worker.jobs.count).to eq 2
expect(worker.jobs).to all(include('at'))
end
end
end
context 'when delay is invalid' do
it 'raises an ArgumentError exception' do
expect { worker.bulk_perform_in(-60, [['Foo']]) }
.to raise_error(ArgumentError)
end
end
end
end
......@@ -3,7 +3,11 @@ require 'spec_helper'
describe ClusterQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
def self.name
'DummyWorker'
end
include ApplicationWorker
include ClusterQueue
end
end
......
......@@ -3,7 +3,11 @@ require 'spec_helper'
describe CronjobQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
def self.name
'DummyWorker'
end
include ApplicationWorker
include CronjobQueue
end
end
......
require 'spec_helper'
describe DedicatedSidekiqQueue do
let(:worker) do
Class.new do
def self.name
'Foo::Bar::DummyWorker'
end
include Sidekiq::Worker
include DedicatedSidekiqQueue
end
end
describe 'queue names' do
it 'sets the queue name based on the class name' do
expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy')
end
end
end
......@@ -3,6 +3,10 @@ require 'spec_helper'
describe Gitlab::GithubImport::ObjectImporter do
let(:worker) do
Class.new do
def self.name
'DummyWorker'
end
include(Gitlab::GithubImport::ObjectImporter)
def counter_name
......
......@@ -3,7 +3,11 @@ require 'spec_helper'
describe Gitlab::GithubImport::Queue do
it 'sets the Sidekiq options for the worker' do
worker = Class.new do
include Sidekiq::Worker
def self.name
'DummyWorker'
end
include ApplicationWorker
include Gitlab::GithubImport::Queue
end
......
......@@ -3,7 +3,11 @@ require 'spec_helper'
describe PipelineQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
def self.name
'DummyWorker'
end
include ApplicationWorker
include PipelineQueue
end
end
......
......@@ -3,7 +3,11 @@ require 'spec_helper'
describe RepositoryCheckQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
def self.name
'DummyWorker'
end
include ApplicationWorker
include RepositoryCheckQueue
end
end
......
require 'spec_helper'
describe 'Every Sidekiq worker' do
let(:workers) do
root = Rails.root.join('app', 'workers')
concerns = root.join('concerns').to_s
workers = Dir[root.join('**', '*.rb')]
.reject { |path| path.start_with?(concerns) }
workers.map do |path|
ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '')
ns.camelize.constantize
end
it 'includes ApplicationWorker' do
expect(Gitlab::SidekiqConfig.workers).to all(include(ApplicationWorker))
end
it 'does not use the default queue' do
workers.each do |worker|
expect(worker.sidekiq_options['queue'].to_s).not_to eq('default')
end
expect(Gitlab::SidekiqConfig.workers.map(&:queue)).not_to include('default')
end
it 'uses the cronjob queue when the worker runs as a cronjob' do
cron_workers = Settings.cron_jobs
.map { |job_name, options| options['job_class'].constantize }
.to_set
workers.each do |worker|
next unless cron_workers.include?(worker)
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob')
end
expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(eq('cronjob'))
end
it 'defines the queue in the Sidekiq configuration file' do
config = YAML.load_file(Rails.root.join('config', 'sidekiq_queues.yml').to_s)
queue_names = config[:queues].map { |(queue, _)| queue }.to_set
config_queue_names = Gitlab::SidekiqConfig.config_queues.to_set
workers.each do |worker|
expect(queue_names).to include(worker.sidekiq_options['queue'].to_s)
end
expect(Gitlab::SidekiqConfig.worker_queues).to all(be_in(config_queue_names))
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment