Commit 1333a40a authored by Rémy Coutable's avatar Rémy Coutable

Merge branch 'sidekiq-cluster-improvements' into 'master'

Sidekiq cluster improvements

Closes #1438

See merge request !987
parents 54e7f53a 003d04f4
#!/usr/bin/env ruby #!/usr/bin/env ruby
require 'optparse' require 'optparse'
require 'thread'
require_relative '../lib/gitlab/sidekiq_cluster' require_relative '../lib/gitlab/sidekiq_cluster'
require_relative '../lib/gitlab/sidekiq_cluster/cli' require_relative '../lib/gitlab/sidekiq_cluster/cli'
......
require 'open3'
module Gitlab module Gitlab
module SidekiqCluster module SidekiqCluster
# The signals that should terminate both the master and workers. # The signals that should terminate both the master and workers.
...@@ -41,15 +39,15 @@ module Gitlab ...@@ -41,15 +39,15 @@ module Gitlab
false false
end end
def self.signal_threads(threads, signal) def self.signal_processes(pids, signal)
threads.each { |thread| signal(thread.pid, signal) } pids.each { |pid| signal(pid, signal) }
end end
def self.parse_queues(array) def self.parse_queues(array)
array.map { |chunk| chunk.split(',') } array.map { |chunk| chunk.split(',') }
end end
# Starts Sidekiq workers for the pairs of threads. # Starts Sidekiq workers for the pairs of processes.
# #
# Example: # Example:
# #
...@@ -61,29 +59,49 @@ module Gitlab ...@@ -61,29 +59,49 @@ module Gitlab
# queues - An Array containing Arrays. Each sub Array should specify the # queues - An Array containing Arrays. Each sub Array should specify the
# queues to use for a single process. # queues to use for a single process.
# #
# Returns an Array containing the threads monitoring each process. # directory - The directory of the Rails application.
def self.start(queues, env) #
queues.map { |pair| start_sidekiq(pair, env) } # Returns an Array containing the PIDs of the started processes.
def self.start(queues, env, directory = Dir.pwd)
queues.map { |pair| start_sidekiq(pair, env, directory) }
end end
# Starts a Sidekiq process that processes _only_ the given queues. # Starts a Sidekiq process that processes _only_ the given queues.
def self.start_sidekiq(queues, env) #
# Returns the PID of the started process.
def self.start_sidekiq(queues, env, directory = Dir.pwd)
switches = queues.map { |q| "-q #{q},1" } switches = queues.map { |q| "-q #{q},1" }
Open3.popen3({ 'ENABLE_SIDEKIQ_CLUSTER' => '1' }, pid = Process.spawn(
{ 'ENABLE_SIDEKIQ_CLUSTER' => '1' },
'bundle', 'bundle',
'exec', 'exec',
'sidekiq', 'sidekiq',
"-c #{queues.length + 1}", "-c #{queues.length + 1}",
"-e#{env}", "-e#{env}",
"-gqueues: #{queues.join(', ')}", "-gqueues: #{queues.join(', ')}",
*switches).last "-r#{directory}",
*switches,
err: $stderr,
out: $stdout
)
wait_async(pid)
pid
end
# Waits for the given process to complete using a separate thread.
def self.wait_async(pid)
Thread.new do
Process.wait(pid) rescue Errno::ECHILD
end
end end
# Returns true if all the processes/threads are alive. # Returns true if all the processes are alive.
def self.all_alive?(threads) def self.all_alive?(pids)
threads.each do |thread| pids.each do |pid|
return false unless signal(thread.pid, 0) return false unless signal(pid, 0)
end end
true true
......
require 'optparse' require 'optparse'
require 'logger' require 'logger'
require 'time'
module Gitlab module Gitlab
module SidekiqCluster module SidekiqCluster
...@@ -11,8 +12,14 @@ module Gitlab ...@@ -11,8 +12,14 @@ module Gitlab
@pid = nil @pid = nil
@interval = 5 @interval = 5
@alive = true @alive = true
@threads = [] @processes = []
@logger = Logger.new(log_output) @logger = Logger.new(log_output)
@rails_path = Dir.pwd
# Use a log format similar to Sidekiq to make parsing/grepping easier.
@logger.formatter = proc do |level, date, program, message|
"#{date.utc.iso8601(3)} #{Process.pid} TID-#{Thread.current.object_id.to_s(36)} #{level}: #{message}\n"
end
end end
def run(argv = ARGV) def run(argv = ARGV)
...@@ -27,7 +34,7 @@ module Gitlab ...@@ -27,7 +34,7 @@ module Gitlab
@logger.info("Starting cluster with #{queues.length} processes") @logger.info("Starting cluster with #{queues.length} processes")
@threads = SidekiqCluster.start(queues, @environment) @processes = SidekiqCluster.start(queues, @environment, @rails_path)
write_pid write_pid
trap_signals trap_signals
...@@ -41,11 +48,11 @@ module Gitlab ...@@ -41,11 +48,11 @@ module Gitlab
def trap_signals def trap_signals
SidekiqCluster.trap_terminate do |signal| SidekiqCluster.trap_terminate do |signal|
@alive = false @alive = false
SidekiqCluster.signal_threads(@threads, signal) SidekiqCluster.signal_processes(@processes, signal)
end end
SidekiqCluster.trap_forward do |signal| SidekiqCluster.trap_forward do |signal|
SidekiqCluster.signal_threads(@threads, signal) SidekiqCluster.signal_processes(@processes, signal)
end end
end end
...@@ -53,12 +60,12 @@ module Gitlab ...@@ -53,12 +60,12 @@ module Gitlab
while @alive while @alive
sleep(@interval) sleep(@interval)
unless SidekiqCluster.all_alive?(@threads) unless SidekiqCluster.all_alive?(@processes)
# If a child process died we'll just terminate the whole cluster. It's up to # If a child process died we'll just terminate the whole cluster. It's up to
# runit and such to then restart the cluster. # runit and such to then restart the cluster.
@logger.info('A worker terminated, shutting down the cluster') @logger.info('A worker terminated, shutting down the cluster')
SidekiqCluster.signal_threads(@threads, :TERM) SidekiqCluster.signal_processes(@processes, :TERM)
break break
end end
end end
...@@ -82,6 +89,10 @@ module Gitlab ...@@ -82,6 +89,10 @@ module Gitlab
@pid = pid @pid = pid
end end
opt.on('-r', '--require PATH', 'Location of the Rails application') do |path|
@rails_path = path
end
opt.on('-i', '--interval INT', 'The number of seconds to wait between worker checks') do |int| opt.on('-i', '--interval INT', 'The number of seconds to wait between worker checks') do |int|
@interval = int.to_i @interval = int.to_i
end end
......
...@@ -51,13 +51,13 @@ describe Gitlab::SidekiqCluster::CLI do ...@@ -51,13 +51,13 @@ describe Gitlab::SidekiqCluster::CLI do
end end
describe '#start_loop' do describe '#start_loop' do
it 'runs until one of the threads has been terminated' do it 'runs until one of the processes has been terminated' do
allow(cli).to receive(:sleep).with(a_kind_of(Numeric)) allow(cli).to receive(:sleep).with(a_kind_of(Numeric))
expect(Gitlab::SidekiqCluster).to receive(:all_alive?). expect(Gitlab::SidekiqCluster).to receive(:all_alive?).
with(an_instance_of(Array)).and_return(false) with(an_instance_of(Array)).and_return(false)
expect(Gitlab::SidekiqCluster).to receive(:signal_threads). expect(Gitlab::SidekiqCluster).to receive(:signal_processes).
with(an_instance_of(Array), :TERM) with(an_instance_of(Array), :TERM)
cli.start_loop cli.start_loop
......
...@@ -40,13 +40,11 @@ describe Gitlab::SidekiqCluster do ...@@ -40,13 +40,11 @@ describe Gitlab::SidekiqCluster do
end end
end end
describe '.signal_threads' do describe '.signal_processes' do
it 'sends a signal to every thread' do it 'sends a signal to every thread' do
thread = double(:thread, pid: 1) expect(described_class).to receive(:signal).with(1, :INT)
expect(described_class).to receive(:signal).with(thread.pid, :INT) described_class.signal_processes([1], :INT)
described_class.signal_threads([thread], :INT)
end end
end end
...@@ -60,41 +58,48 @@ describe Gitlab::SidekiqCluster do ...@@ -60,41 +58,48 @@ describe Gitlab::SidekiqCluster do
describe '.start' do describe '.start' do
it 'starts Sidekiq with the given queues and environment' do it 'starts Sidekiq with the given queues and environment' do
expect(described_class).to receive(:start_sidekiq). expect(described_class).to receive(:start_sidekiq).
ordered.with(%w(foo), :production) ordered.with(%w(foo), :production, 'foo/bar')
expect(described_class).to receive(:start_sidekiq). expect(described_class).to receive(:start_sidekiq).
ordered.with(%w(bar baz), :production) ordered.with(%w(bar baz), :production, 'foo/bar')
described_class.start([%w(foo), %w(bar baz)], :production) described_class.start([%w(foo), %w(bar baz)], :production, 'foo/bar')
end end
end end
describe '.start_sidekiq' do describe '.start_sidekiq' do
it 'starts a Sidekiq process' do it 'starts a Sidekiq process' do
thread = double(:thread, pid: 1) allow(Process).to receive(:spawn).and_return(1)
expect(described_class).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo), :production)).to eq(1)
end
end
allow(Open3).to receive(:popen3). describe '.wait_async' do
and_return([double(:stdout), double(:stderr), double(:stdin), thread]) it 'waits for a process in a separate thread' do
thread = described_class.wait_async(Process.spawn('true'))
expect(described_class.start_sidekiq(%w(foo), :production)).to eq(thread) # Upon success Process.wait just returns the PID.
expect(thread.value).to be_a_kind_of(Numeric)
end end
end end
describe '.all_alive?' do describe '.all_alive?' do
it 'returns true if all threads are alive' do it 'returns true if all processes are alive' do
threads = [double(:thread, pid: 1)] processes = [1]
allow(described_class).to receive(:signal).with(1, 0).and_return(true) allow(described_class).to receive(:signal).with(1, 0).and_return(true)
expect(described_class.all_alive?(threads)).to eq(true) expect(described_class.all_alive?(processes)).to eq(true)
end end
it 'returns false when a thread was not alive' do it 'returns false when a thread was not alive' do
threads = [double(:thread, pid: 1)] processes = [1]
allow(described_class).to receive(:signal).with(1, 0).and_return(false) allow(described_class).to receive(:signal).with(1, 0).and_return(false)
expect(described_class.all_alive?(threads)).to eq(false) expect(described_class.all_alive?(processes)).to eq(false)
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment