Commit cb193cd6 authored by Kamil Trzciński's avatar Kamil Trzciński

Improve resillency of monitor

- Retry connection when it fails
- Properly shutdown daemon
- Stop monitor if the Exception is raised
- Properly guard exception handling
parent 3683d2d2
......@@ -46,7 +46,10 @@ module Gitlab
if thread
thread.wakeup if thread.alive?
thread.join unless Thread.current == thread
begin
thread.join unless Thread.current == thread
rescue Exception # rubocop:disable Lint/RescueException
end
@thread = nil
end
end
......
......@@ -6,6 +6,7 @@ module Gitlab
NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
CANCEL_DEADLINE = 24.hours.seconds
RECONNECT_TIME = 3.seconds
# We use exception derived from `Exception`
# to consider this as an very low-level exception
......@@ -33,7 +34,8 @@ module Gitlab
action: 'run',
queue: queue,
jid: jid,
canceled: true)
canceled: true
)
raise CancelledError
end
......@@ -44,12 +46,45 @@ module Gitlab
end
end
def self.cancel_job(jid)
payload = {
action: 'cancel',
jid: jid
}.to_json
::Gitlab::Redis::SharedState.with do |redis|
redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
redis.publish(NOTIFICATION_CHANNEL, payload)
end
end
private
def start_working
Sidekiq.logger.info(
class: self.class,
action: 'start',
message: 'Starting Monitor Daemon')
message: 'Starting Monitor Daemon'
)
while enabled?
process_messages
sleep(RECONNECT_TIME)
end
ensure
Sidekiq.logger.warn(
class: self.class,
action: 'stop',
message: 'Stopping Monitor Daemon'
)
end
def stop_working
thread.raise(Interrupt) if thread.alive?
end
def process_messages
::Gitlab::Redis::SharedState.with do |redis|
redis.subscribe(NOTIFICATION_CHANNEL) do |on|
on.message do |channel, message|
......@@ -57,39 +92,24 @@ module Gitlab
end
end
end
Sidekiq.logger.warn(
class: self.class,
action: 'stop',
message: 'Stopping Monitor Daemon')
rescue Exception => e # rubocop:disable Lint/RescueException
Sidekiq.logger.warn(
class: self.class,
action: 'exception',
message: e.message)
raise e
end
def self.cancel_job(jid)
payload = {
action: 'cancel',
jid: jid
}.to_json
message: e.message
)
::Gitlab::Redis::SharedState.with do |redis|
redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
redis.publish(NOTIFICATION_CHANNEL, payload)
end
# we re-raise system exceptions
raise e unless e.is_a?(StandardError)
end
private
def process_message(message)
Sidekiq.logger.info(
class: self.class,
channel: NOTIFICATION_CHANNEL,
message: 'Received payload on channel',
payload: message)
payload: message
)
message = safe_parse(message)
return unless message
......@@ -115,14 +135,16 @@ module Gitlab
Thread.new do
# try to find a thread, but with guaranteed
# handle that this thread corresponds to actually running job
# that handle for thread corresponds to actually
# running job
find_thread_with_lock(jid) do |thread|
Sidekiq.logger.warn(
class: self.class,
action: 'cancel',
message: 'Canceling thread with CancelledError',
jid: jid,
thread_id: thread.object_id)
thread_id: thread.object_id
)
thread&.raise(CancelledError)
end
......
......@@ -34,12 +34,12 @@ describe Gitlab::Daemon do
end
end
describe 'when Daemon is enabled' do
context 'when Daemon is enabled' do
before do
allow(subject).to receive(:enabled?).and_return(true)
end
describe 'when Daemon is stopped' do
context 'when Daemon is stopped' do
describe '#start' do
it 'starts the Daemon' do
expect { subject.start.join }.to change { subject.thread? }.from(false).to(true)
......@@ -57,14 +57,14 @@ describe Gitlab::Daemon do
end
end
describe 'when Daemon is running' do
context 'when Daemon is running' do
before do
subject.start.join
subject.start
end
describe '#start' do
it "doesn't start running Daemon" do
expect { subject.start.join }.not_to change { subject.thread? }
expect { subject.start.join }.not_to change { subject.thread }
expect(subject).to have_received(:start_working).once
end
......@@ -76,11 +76,29 @@ describe Gitlab::Daemon do
expect(subject).to have_received(:stop_working)
end
context 'when stop_working raises exception' do
before do
allow(subject).to receive(:start_working) do
sleep(1000)
end
end
it 'shutdowns Daemon' do
expect(subject).to receive(:stop_working) do
subject.thread.raise(Interrupt)
end
expect(subject.thread).to be_alive
expect { subject.stop }.not_to raise_error
expect(subject.thread).to be_nil
end
end
end
end
end
describe 'when Daemon is disabled' do
context 'when Daemon is disabled' do
before do
allow(subject).to receive(:enabled?).and_return(false)
end
......
......@@ -31,19 +31,26 @@ describe Gitlab::SidekiqMonitor do
end
it 'raises exception' do
expect { monitor.within_job(jid, 'queue') }.to raise_error(described_class::CancelledError)
expect { monitor.within_job(jid, 'queue') }.to raise_error(
described_class::CancelledError)
end
end
end
describe '#start_working' do
subject { monitor.start_working }
subject { monitor.send(:start_working) }
context 'when structured logging is used' do
before do
allow_any_instance_of(::Redis).to receive(:subscribe)
end
before do
# we want to run at most once cycle
# we toggle `enabled?` flag after the first call
stub_const('Gitlab::SidekiqMonitor::RECONNECT_TIME', 0)
allow(monitor).to receive(:enabled?).and_return(true, false)
allow(Sidekiq.logger).to receive(:info)
allow(Sidekiq.logger).to receive(:warn)
end
context 'when structured logging is used' do
it 'logs start message' do
expect(Sidekiq.logger).to receive(:info)
.with(
......@@ -51,6 +58,8 @@ describe Gitlab::SidekiqMonitor do
action: 'start',
message: 'Starting Monitor Daemon')
expect(::Gitlab::Redis::SharedState).to receive(:with)
subject
end
......@@ -61,10 +70,25 @@ describe Gitlab::SidekiqMonitor do
action: 'stop',
message: 'Stopping Monitor Daemon')
expect(::Gitlab::Redis::SharedState).to receive(:with)
subject
end
it 'logs exception message' do
it 'logs StandardError message' do
expect(Sidekiq.logger).to receive(:warn)
.with(
class: described_class,
action: 'exception',
message: 'My Exception')
expect(::Gitlab::Redis::SharedState).to receive(:with)
.and_raise(StandardError, 'My Exception')
expect { subject }.not_to raise_error
end
it 'logs and raises Exception message' do
expect(Sidekiq.logger).to receive(:warn)
.with(
class: described_class,
......@@ -78,6 +102,20 @@ describe Gitlab::SidekiqMonitor do
end
end
context 'when StandardError is raised' do
it 'does retry connection' do
expect(::Gitlab::Redis::SharedState).to receive(:with)
.and_raise(StandardError, 'My Exception')
expect(::Gitlab::Redis::SharedState).to receive(:with)
# we expect to run `process_messages` twice
expect(monitor).to receive(:enabled?).and_return(true, true, false)
subject
end
end
context 'when message is published' do
let(:subscribed) { double }
......@@ -128,6 +166,19 @@ describe Gitlab::SidekiqMonitor do
end
end
describe '#stop' do
let!(:monitor_thread) { monitor.start }
it 'does stop the thread' do
expect(monitor_thread).to be_alive
expect { monitor.stop }.not_to raise_error
expect(monitor_thread).not_to be_alive
expect { monitor_thread.value }.to raise_error(Interrupt)
end
end
describe '#process_job_cancel' do
subject { monitor.send(:process_job_cancel, jid) }
......@@ -156,6 +207,11 @@ describe Gitlab::SidekiqMonitor do
monitor.jobs_thread[jid] = thread
end
after do
thread.kill
rescue
end
it 'does log cancellation message' do
expect(Sidekiq.logger).to receive(:warn)
.with(
......@@ -175,8 +231,9 @@ describe Gitlab::SidekiqMonitor do
subject.join
expect(thread).not_to be_alive
expect { thread.value }.to raise_error(described_class::CancelledError)
# we wait for the thread to be cancelled
# by `process_job_cancel`
expect { thread.join(5) }.to raise_error(described_class::CancelledError)
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment