Commit 70a4e8dc authored by Yorick Peterse's avatar Yorick Peterse

Redirect sidekiq output to sidekiq-cluster

This redirects STDERR and STDOUT of the various Sidekiq processes to the
sidekiq-cluster process. The logging output of sidekiq-cluster itself
has been changed so it matches Sidekiq's output more closely, making
parsing/grepping easier.

Fixes #1438
parent 54e7f53a
#!/usr/bin/env ruby
require 'optparse'
require 'thread'
require_relative '../lib/gitlab/sidekiq_cluster'
require_relative '../lib/gitlab/sidekiq_cluster/cli'
......
require 'open3'
module Gitlab
module SidekiqCluster
# The signals that should terminate both the master and workers.
......@@ -41,15 +39,15 @@ module Gitlab
false
end
def self.signal_threads(threads, signal)
threads.each { |thread| signal(thread.pid, signal) }
def self.signal_processes(pids, signal)
pids.each { |pid| signal(pid, signal) }
end
def self.parse_queues(array)
array.map { |chunk| chunk.split(',') }
end
# Starts Sidekiq workers for the pairs of threads.
# Starts Sidekiq workers for the pairs of processes.
#
# Example:
#
......@@ -61,29 +59,47 @@ module Gitlab
# queues - An Array containing Arrays. Each sub Array should specify the
# queues to use for a single process.
#
# Returns an Array containing the threads monitoring each process.
# Returns an Array containing the PIDs of the started processes.
def self.start(queues, env)
queues.map { |pair| start_sidekiq(pair, env) }
end
# Starts a Sidekiq process that processes _only_ the given queues.
#
# Returns the PID of the started process.
def self.start_sidekiq(queues, env)
switches = queues.map { |q| "-q #{q},1" }
Open3.popen3({ 'ENABLE_SIDEKIQ_CLUSTER' => '1' },
pid = Process.spawn(
{ 'ENABLE_SIDEKIQ_CLUSTER' => '1' },
'bundle',
'exec',
'sidekiq',
"-c #{queues.length + 1}",
"-e#{env}",
"-gqueues: #{queues.join(', ')}",
*switches).last
"-r#{directory}",
*switches,
err: $stderr,
out: $stdout
)
wait_async(pid)
pid
end
# Waits for the given process to complete using a separate thread.
def self.wait_async(pid)
Thread.new do
Process.wait(pid) rescue Errno::ECHILD
end
end
# Returns true if all the processes/threads are alive.
def self.all_alive?(threads)
threads.each do |thread|
return false unless signal(thread.pid, 0)
# Returns true if all the processes are alive.
def self.all_alive?(pids)
pids.each do |pid|
return false unless signal(pid, 0)
end
true
......
require 'optparse'
require 'logger'
require 'time'
module Gitlab
module SidekiqCluster
......@@ -11,8 +12,13 @@ module Gitlab
@pid = nil
@interval = 5
@alive = true
@threads = []
@processes = []
@logger = Logger.new(log_output)
# Use a log format similar to Sidekiq to make parsing/grepping easier.
@logger.formatter = proc do |level, date, program, message|
"#{date.utc.iso8601(3)} #{Process.pid} TID-#{Thread.current.object_id.to_s(36)} #{level}: #{message}\n"
end
end
def run(argv = ARGV)
......@@ -27,7 +33,7 @@ module Gitlab
@logger.info("Starting cluster with #{queues.length} processes")
@threads = SidekiqCluster.start(queues, @environment)
@processes = SidekiqCluster.start(queues, @environment)
write_pid
trap_signals
......@@ -41,11 +47,11 @@ module Gitlab
def trap_signals
SidekiqCluster.trap_terminate do |signal|
@alive = false
SidekiqCluster.signal_threads(@threads, signal)
SidekiqCluster.signal_processes(@processes, signal)
end
SidekiqCluster.trap_forward do |signal|
SidekiqCluster.signal_threads(@threads, signal)
SidekiqCluster.signal_processes(@processes, signal)
end
end
......@@ -53,12 +59,12 @@ module Gitlab
while @alive
sleep(@interval)
unless SidekiqCluster.all_alive?(@threads)
unless SidekiqCluster.all_alive?(@processes)
# If a child process died we'll just terminate the whole cluster. It's up to
# runit and such to then restart the cluster.
@logger.info('A worker terminated, shutting down the cluster')
SidekiqCluster.signal_threads(@threads, :TERM)
SidekiqCluster.signal_processes(@processes, :TERM)
break
end
end
......
......@@ -51,13 +51,13 @@ describe Gitlab::SidekiqCluster::CLI do
end
describe '#start_loop' do
it 'runs until one of the threads has been terminated' do
it 'runs until one of the processes has been terminated' do
allow(cli).to receive(:sleep).with(a_kind_of(Numeric))
expect(Gitlab::SidekiqCluster).to receive(:all_alive?).
with(an_instance_of(Array)).and_return(false)
expect(Gitlab::SidekiqCluster).to receive(:signal_threads).
expect(Gitlab::SidekiqCluster).to receive(:signal_processes).
with(an_instance_of(Array), :TERM)
cli.start_loop
......
......@@ -40,13 +40,11 @@ describe Gitlab::SidekiqCluster do
end
end
describe '.signal_threads' do
describe '.signal_processes' do
it 'sends a signal to every thread' do
thread = double(:thread, pid: 1)
expect(described_class).to receive(:signal).with(1, :INT)
expect(described_class).to receive(:signal).with(thread.pid, :INT)
described_class.signal_threads([thread], :INT)
described_class.signal_processes([1], :INT)
end
end
......@@ -71,30 +69,37 @@ describe Gitlab::SidekiqCluster do
describe '.start_sidekiq' do
it 'starts a Sidekiq process' do
thread = double(:thread, pid: 1)
allow(Process).to receive(:spawn).and_return(1)
expect(described_class).to receive(:wait_async).with(1)
expect(described_class.start_sidekiq(%w(foo), :production)).to eq(1)
end
end
allow(Open3).to receive(:popen3).
and_return([double(:stdout), double(:stderr), double(:stdin), thread])
describe '.wait_async' do
it 'waits for a process in a separate thread' do
thread = described_class.wait_async(Process.spawn('true'))
expect(described_class.start_sidekiq(%w(foo), :production)).to eq(thread)
# Upon success Process.wait just returns the PID.
expect(thread.value).to be_a_kind_of(Numeric)
end
end
describe '.all_alive?' do
it 'returns true if all threads are alive' do
threads = [double(:thread, pid: 1)]
it 'returns true if all processes are alive' do
processes = [1]
allow(described_class).to receive(:signal).with(1, 0).and_return(true)
expect(described_class.all_alive?(threads)).to eq(true)
expect(described_class.all_alive?(processes)).to eq(true)
end
it 'returns false when a thread was not alive' do
threads = [double(:thread, pid: 1)]
processes = [1]
allow(described_class).to receive(:signal).with(1, 0).and_return(false)
expect(described_class.all_alive?(threads)).to eq(false)
expect(described_class.all_alive?(processes)).to eq(false)
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment