Commit 7ff93264 authored by Yorick Peterse's avatar Yorick Peterse

Merge branch 'sidekiq-cluster' into 'master'

Ability to start dedicated Sidekiq processes

This adds the ability to start Sidekiq processes for specific queues. See https://gitlab.com/gitlab-org/gitlab-ee/issues/1282 for more information.

See merge request !922
parents d7ab8850 b588a0c4
#!/usr/bin/env ruby
require 'optparse'
require 'thread'
require_relative '../lib/gitlab/sidekiq_cluster'
require_relative '../lib/gitlab/sidekiq_cluster/cli'
Thread.abort_on_exception = true
cli = Gitlab::SidekiqCluster::CLI.new
begin
cli.run
rescue Gitlab::SidekiqCluster::CLI::CommandError => error
abort error.message
end
if ENV['ENABLE_SIDEKIQ_CLUSTER']
# Throttling should be disabled for dedicated workers.
module Gitlab
class SidekiqThrottler
def self.execute!
end
end
end
Thread.new do
Thread.current.abort_on_exception = true
parent = Process.ppid
loop do
sleep(5)
# In cluster mode it's possible that the master process is SIGKILL'd. In
# this case the parent PID changes and we need to terminate ourselves.
if Process.ppid != parent
Process.kill(:TERM, Process.pid)
break
end
end
end
end
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
- [Sidekiq MemoryKiller](operations/sidekiq_memory_killer.md) - [Sidekiq MemoryKiller](operations/sidekiq_memory_killer.md)
- [Sidekiq Job throttling](operations/sidekiq_job_throttling.md) - [Sidekiq Job throttling](operations/sidekiq_job_throttling.md)
- [Extra Sidekiq Processes](operations/extra_sidekiq_processes.md)
- [Cleaning up Redis sessions](operations/cleaning_up_redis_sessions.md) - [Cleaning up Redis sessions](operations/cleaning_up_redis_sessions.md)
- [Understanding Unicorn and unicorn-worker-killer](operations/unicorn.md) - [Understanding Unicorn and unicorn-worker-killer](operations/unicorn.md)
- [Moving repositories to a new location](operations/moving_repositories.md) - [Moving repositories to a new location](operations/moving_repositories.md)
# Extra Sidekiq Processes
GitLab Enterprise Edition allows one to start an extra set of Sidekiq processes
besides the default one. These processes can be used to consume a dedicated set
of queues. This can be used to ensure certain queues always have dedicated
workers, no matter the amount of jobs that need to be processed.
## Starting Extra Processes
Starting extra Sidekiq processes can be done using the command
`bin/sidekiq-cluster`. This command takes arguments using the following syntax:
```bash
sidekiq-cluster [QUEUE,QUEUE,...] [QUEUE, ...]
```
Each separate argument denotes a group of queues that have to be processed by a
Sidekiq process. Multiple queues can be processed by the same process by
separating them with a comma instead of a space.
For example, say you want to start 2 extra processes: one to process the
"process_commit" queue, and one to process the "post_receive" queue. This can be
done as follows:
```bash
sidekiq-cluster process_commit post_receive
```
If you instead want to start one process processing both queues you'd use the
following syntax:
```bash
sidekiq-cluster process_commit,post_receive
```
If you want to have one Sidekiq process process the "process_commit" and
"post_receive" queues, and one process to process the "gitlab_shell" queue,
you'd use the following:
```bash
sidekiq-cluster process_commit,post_receive gitlab_shell
```
## Concurrency
Each process started using `sidekiq-cluster` starts with a number of threads
that equals the number of queues, plus one spare thread. For example, a process
that processes "process_commit" and "post_receive" will use 3 threads in total.
## Monitoring
The `sidekiq-cluster` command will not terminate once it has started the desired
amount of Sidekiq processes. Instead the process will continue running and
forward any signals to the child processes. This makes it easy to stop all
Sidekiq processes as you simply send a signal to the `sidekiq-cluster` process,
instead of having to send it to the individual processes.
If the `sidekiq-cluster` process crashes or is SIGKILL'd the child processes
will terminate themselves after a few seconds. This ensures you don't end up
with zombie Sidekiq processes.
All of this makes monitoring the processes fairly easy. Simply hook up
`sidekiq-cluster` to your supervisor of choice (e.g. runit) and you're good to
go.
If a child process died the `sidekiq-cluster` command will signal all remaining
process to terminate, then terminate itself. This removes the need for
`sidekiq-cluster` to re-implement complex process monitoring/restarting code.
Instead you should make sure your supervisor restarts the `sidekiq-cluster`
process whenever necessary.
## PID Files
The `sidekiq-cluster` command can store its PID in a file. By default no PID
file is written, but this can be changed by passing the `--pidfile` option to
`sidekiq-cluster`. For example:
```bash
sidekiq-cluster --pidfile /var/run/gitlab/sidekiq_cluster.pid process_commit
```
Keep in mind that the PID file will contain the PID of the `sidekiq-cluster`
command, and not the PID(s) of the started Sidekiq processes.
## Environment
The Rails environment can be set by passing the `--environment` flag to the
`sidekiq-cluster` command, or by setting `RAILS_ENV` to a non-empty value. The
default value is "development".
require 'open3'
module Gitlab
module SidekiqCluster
# The signals that should terminate both the master and workers.
TERMINATE_SIGNALS = %i(INT TERM)
# The signals that should simply be forwarded to the workers.
FORWARD_SIGNALS = %i(TTIN USR1 USR2 HUP)
# Traps the given signals and yields the block whenever these signals are
# received.
#
# The block is passed the name of the signal.
#
# Example:
#
# trap_signals(%i(HUP TERM)) do |signal|
# ...
# end
def self.trap_signals(signals)
signals.each do |signal|
trap(signal) do
yield signal
end
end
end
def self.trap_terminate(&block)
trap_signals(TERMINATE_SIGNALS, &block)
end
def self.trap_forward(&block)
trap_signals(FORWARD_SIGNALS, &block)
end
def self.signal(pid, signal)
Process.kill(signal, pid)
true
rescue Errno::ESRCH
false
end
def self.signal_threads(threads, signal)
threads.each { |thread| signal(thread.pid, signal) }
end
def self.parse_queues(array)
array.map { |chunk| chunk.split(',') }
end
# Starts Sidekiq workers for the pairs of threads.
#
# Example:
#
# start([ ['foo'], ['bar', 'baz'] ], :production)
#
# This would start two Sidekiq processes: one processing "foo", and one
# processing "bar" and "baz".
#
# queues - An Array containing Arrays. Each sub Array should specify the
# queues to use for a single process.
#
# Returns an Array containing the threads monitoring each process.
def self.start(queues, env)
queues.map { |pair| start_sidekiq(pair, env) }
end
# Starts a Sidekiq process that processes _only_ the given queues.
def self.start_sidekiq(queues, env)
switches = queues.map { |q| "-q #{q},1" }
Open3.popen3({ 'ENABLE_SIDEKIQ_CLUSTER' => '1' },
'bundle',
'exec',
'sidekiq',
"-c #{queues.length + 1}",
"-e#{env}",
"-gqueues: #{queues.join(', ')}",
*switches).last
end
# Returns true if all the processes/threads are alive.
def self.all_alive?(threads)
threads.each do |thread|
return false unless signal(thread.pid, 0)
end
true
end
def self.write_pid(path)
File.open(path, 'w') do |handle|
handle.write(Process.pid.to_s)
end
end
end
end
require 'optparse'
require 'logger'
module Gitlab
module SidekiqCluster
class CLI
class CommandError < StandardError; end
def initialize(log_output = STDERR)
@environment = ENV['RAILS_ENV'] || 'development'
@pid = nil
@interval = 5
@alive = true
@threads = []
@logger = Logger.new(log_output)
end
def run(argv = ARGV)
if argv.empty?
raise CommandError,
'You must specify at least one queue to start a worker for'
end
option_parser.parse!(argv)
queues = SidekiqCluster.parse_queues(argv)
@logger.info("Starting cluster with #{queues.length} processes")
@threads = SidekiqCluster.start(queues, @environment)
write_pid
trap_signals
start_loop
end
def write_pid
SidekiqCluster.write_pid(@pid) if @pid
end
def trap_signals
SidekiqCluster.trap_terminate do |signal|
@alive = false
SidekiqCluster.signal_threads(@threads, signal)
end
SidekiqCluster.trap_forward do |signal|
SidekiqCluster.signal_threads(@threads, signal)
end
end
def start_loop
while @alive
sleep(@interval)
unless SidekiqCluster.all_alive?(@threads)
# If a child process died we'll just terminate the whole cluster. It's up to
# runit and such to then restart the cluster.
@logger.info('A worker terminated, shutting down the cluster')
SidekiqCluster.signal_threads(@threads, :TERM)
break
end
end
end
def option_parser
OptionParser.new do |opt|
opt.banner = "#{File.basename(__FILE__)} [QUEUE,QUEUE] [QUEUE] ... [OPTIONS]"
opt.separator "\nOptions:\n"
opt.on('-h', '--help', 'Shows this help message') do
abort opt.to_s
end
opt.on('-e', '--environment ENV', 'The application environment') do |env|
@environment = env
end
opt.on('-P', '--pidfile PATH', 'Path to the PID file') do |pid|
@pid = pid
end
opt.on('-i', '--interval INT', 'The number of seconds to wait between worker checks') do |int|
@interval = int.to_i
end
end
end
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqCluster::CLI do
let(:cli) { described_class.new('/dev/null') }
describe '#run' do
context 'without any arguments' do
it 'raises CommandError' do
expect { cli.run([]) }.to raise_error(described_class::CommandError)
end
end
context 'with arguments' do
it 'starts the Sidekiq workers' do
expect(Gitlab::SidekiqCluster).to receive(:start).and_return([])
expect(cli).to receive(:write_pid)
expect(cli).to receive(:trap_signals)
expect(cli).to receive(:start_loop)
cli.run(%w(foo))
end
end
end
describe '#write_pid' do
context 'when a PID is specified' do
it 'writes the PID to a file' do
expect(Gitlab::SidekiqCluster).to receive(:write_pid).with('/dev/null')
cli.option_parser.parse!(%w(-P /dev/null))
cli.write_pid
end
end
context 'when no PID is specified' do
it 'does not write a PID' do
expect(Gitlab::SidekiqCluster).not_to receive(:write_pid)
cli.write_pid
end
end
end
describe '#trap_signals' do
it 'traps the termination and forwarding signals' do
expect(Gitlab::SidekiqCluster).to receive(:trap_terminate)
expect(Gitlab::SidekiqCluster).to receive(:trap_forward)
cli.trap_signals
end
end
describe '#start_loop' do
it 'runs until one of the threads has been terminated' do
allow(cli).to receive(:sleep).with(a_kind_of(Numeric))
expect(Gitlab::SidekiqCluster).to receive(:all_alive?).
with(an_instance_of(Array)).and_return(false)
expect(Gitlab::SidekiqCluster).to receive(:signal_threads).
with(an_instance_of(Array), :TERM)
cli.start_loop
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqCluster do
describe '.trap_signals' do
it 'traps the given signals' do
expect(described_class).to receive(:trap).ordered.with(:INT)
expect(described_class).to receive(:trap).ordered.with(:HUP)
described_class.trap_signals(%i(INT HUP))
end
end
describe '.trap_terminate' do
it 'traps the termination signals' do
expect(described_class).to receive(:trap_signals).
with(described_class::TERMINATE_SIGNALS)
described_class.trap_terminate { }
end
end
describe '.trap_forward' do
it 'traps the signals to forward' do
expect(described_class).to receive(:trap_signals).
with(described_class::FORWARD_SIGNALS)
described_class.trap_forward { }
end
end
describe '.signal' do
it 'sends a signal to the given process' do
allow(Process).to receive(:kill).with(:INT, 4)
expect(described_class.signal(4, :INT)).to eq(true)
end
it 'returns false when the process does not exist' do
allow(Process).to receive(:kill).with(:INT, 4).and_raise(Errno::ESRCH)
expect(described_class.signal(4, :INT)).to eq(false)
end
end
describe '.signal_threads' do
it 'sends a signal to every thread' do
thread = double(:thread, pid: 1)
expect(described_class).to receive(:signal).with(thread.pid, :INT)
described_class.signal_threads([thread], :INT)
end
end
describe '.parse_queues' do
it 'returns an Array containing the parsed queues' do
expect(described_class.parse_queues(%w(foo bar,baz))).
to eq([%w(foo), %w(bar baz)])
end
end
describe '.start' do
it 'starts Sidekiq with the given queues and environment' do
expect(described_class).to receive(:start_sidekiq).
ordered.with(%w(foo), :production)
expect(described_class).to receive(:start_sidekiq).
ordered.with(%w(bar baz), :production)
described_class.start([%w(foo), %w(bar baz)], :production)
end
end
describe '.start_sidekiq' do
it 'starts a Sidekiq process' do
thread = double(:thread, pid: 1)
allow(Open3).to receive(:popen3).
and_return([double(:stdout), double(:stderr), double(:stdin), thread])
expect(described_class.start_sidekiq(%w(foo), :production)).to eq(thread)
end
end
describe '.all_alive?' do
it 'returns true if all threads are alive' do
threads = [double(:thread, pid: 1)]
allow(described_class).to receive(:signal).with(1, 0).and_return(true)
expect(described_class.all_alive?(threads)).to eq(true)
end
it 'returns false when a thread was not alive' do
threads = [double(:thread, pid: 1)]
allow(described_class).to receive(:signal).with(1, 0).and_return(false)
expect(described_class.all_alive?(threads)).to eq(false)
end
end
describe '.write_pid' do
it 'writes the PID of the current process to the given file' do
handle = double(:handle)
allow(File).to receive(:open).with('/dev/null', 'w').and_yield(handle)
expect(handle).to receive(:write).with(Process.pid.to_s)
described_class.write_pid('/dev/null')
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment