Commit 0078ea44 authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch 'sidekiq-monitor-namespace-change' into 'master'

Sidekiq monitor namespace change

See merge request gitlab-org/gitlab-ce!32878
parents 7e8453fe eeeaebe6
......@@ -60,7 +60,7 @@ Sidekiq.configure_server do |config|
# Sidekiq (e.g. in an initializer).
ActiveRecord::Base.clear_all_connections!
Gitlab::SidekiqMonitor.instance.start if enable_sidekiq_monitor
Gitlab::SidekiqDaemon::Monitor.instance.start if enable_sidekiq_monitor
end
if enable_reliable_fetch?
......
......@@ -270,7 +270,7 @@ is interrupted mid-execution and it is not guaranteed
that proper rollback of transactions is implemented.
```ruby
Gitlab::SidekiqMonitor.cancel_job('job-id')
Gitlab::SidekiqDaemon::Monitor.cancel_job('job-id')
```
> This requires the Sidekiq to be run with `SIDEKIQ_MONITOR_WORKER=1`
......
# frozen_string_literal: true
module Gitlab
module SidekiqDaemon
class Monitor < Daemon
include ::Gitlab::Utils::StrongMemoize
NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'
CANCEL_DEADLINE = 24.hours.seconds
RECONNECT_TIME = 3.seconds
# We use exception derived from `Exception`
# to consider this as an very low-level exception
# that should not be caught by application
CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
attr_reader :jobs_thread
attr_reader :jobs_mutex
def initialize
super
@jobs_thread = {}
@jobs_mutex = Mutex.new
end
def within_job(jid, queue)
jobs_mutex.synchronize do
jobs_thread[jid] = Thread.current
end
if cancelled?(jid)
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'run',
queue: queue,
jid: jid,
canceled: true
)
raise CancelledError
end
yield
ensure
jobs_mutex.synchronize do
jobs_thread.delete(jid)
end
end
def self.cancel_job(jid)
payload = {
action: 'cancel',
jid: jid
}.to_json
::Gitlab::Redis::SharedState.with do |redis|
redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
redis.publish(NOTIFICATION_CHANNEL, payload)
end
end
private
def start_working
Sidekiq.logger.info(
class: self.class.to_s,
action: 'start',
message: 'Starting Monitor Daemon'
)
while enabled?
process_messages
sleep(RECONNECT_TIME)
end
ensure
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'stop',
message: 'Stopping Monitor Daemon'
)
end
def stop_working
thread.raise(Interrupt) if thread.alive?
end
def process_messages
::Gitlab::Redis::SharedState.with do |redis|
redis.subscribe(NOTIFICATION_CHANNEL) do |on|
on.message do |channel, message|
process_message(message)
end
end
end
rescue Exception => e # rubocop:disable Lint/RescueException
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'exception',
message: e.message
)
# we re-raise system exceptions
raise e unless e.is_a?(StandardError)
end
def process_message(message)
Sidekiq.logger.info(
class: self.class.to_s,
channel: NOTIFICATION_CHANNEL,
message: 'Received payload on channel',
payload: message
)
message = safe_parse(message)
return unless message
case message['action']
when 'cancel'
process_job_cancel(message['jid'])
else
# unknown message
end
end
def safe_parse(message)
JSON.parse(message)
rescue JSON::ParserError
end
def process_job_cancel(jid)
return unless jid
# try to find thread without lock
return unless find_thread_unsafe(jid)
Thread.new do
# try to find a thread, but with guaranteed
# that handle for thread corresponds to actually
# running job
find_thread_with_lock(jid) do |thread|
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'cancel',
message: 'Canceling thread with CancelledError',
jid: jid,
thread_id: thread.object_id
)
thread&.raise(CancelledError)
end
end
end
# This method needs to be thread-safe
# This is why it passes thread in block,
# to ensure that we do process this thread
def find_thread_unsafe(jid)
jobs_thread[jid]
end
def find_thread_with_lock(jid)
# don't try to lock if we cannot find the thread
return unless find_thread_unsafe(jid)
jobs_mutex.synchronize do
find_thread_unsafe(jid).tap do |thread|
yield(thread) if thread
end
end
end
def cancelled?(jid)
::Gitlab::Redis::SharedState.with do |redis|
redis.exists(self.class.cancel_job_key(jid))
end
end
def self.cancel_job_key(jid)
"sidekiq:cancel:#{jid}"
end
end
end
end
......@@ -4,10 +4,10 @@ module Gitlab
module SidekiqMiddleware
class Monitor
def call(worker, job, queue)
Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do
Gitlab::SidekiqDaemon::Monitor.instance.within_job(job['jid'], queue) do
yield
end
rescue Gitlab::SidekiqMonitor::CancelledError
rescue Gitlab::SidekiqDaemon::Monitor::CancelledError
# push job to DeadSet
payload = ::Sidekiq.dump_json(job)
::Sidekiq::DeadSet.new.kill(payload, notify_failure: false)
......
# frozen_string_literal: true
module Gitlab
class SidekiqMonitor < Daemon
include ::Gitlab::Utils::StrongMemoize
NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'
CANCEL_DEADLINE = 24.hours.seconds
RECONNECT_TIME = 3.seconds
# We use exception derived from `Exception`
# to consider this as an very low-level exception
# that should not be caught by application
CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
attr_reader :jobs_thread
attr_reader :jobs_mutex
def initialize
super
@jobs_thread = {}
@jobs_mutex = Mutex.new
end
def within_job(jid, queue)
jobs_mutex.synchronize do
jobs_thread[jid] = Thread.current
end
if cancelled?(jid)
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'run',
queue: queue,
jid: jid,
canceled: true
)
raise CancelledError
end
yield
ensure
jobs_mutex.synchronize do
jobs_thread.delete(jid)
end
end
def self.cancel_job(jid)
payload = {
action: 'cancel',
jid: jid
}.to_json
::Gitlab::Redis::SharedState.with do |redis|
redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
redis.publish(NOTIFICATION_CHANNEL, payload)
end
end
private
def start_working
Sidekiq.logger.info(
class: self.class.to_s,
action: 'start',
message: 'Starting Monitor Daemon'
)
while enabled?
process_messages
sleep(RECONNECT_TIME)
end
ensure
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'stop',
message: 'Stopping Monitor Daemon'
)
end
def stop_working
thread.raise(Interrupt) if thread.alive?
end
def process_messages
::Gitlab::Redis::SharedState.with do |redis|
redis.subscribe(NOTIFICATION_CHANNEL) do |on|
on.message do |channel, message|
process_message(message)
end
end
end
rescue Exception => e # rubocop:disable Lint/RescueException
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'exception',
message: e.message
)
# we re-raise system exceptions
raise e unless e.is_a?(StandardError)
end
def process_message(message)
Sidekiq.logger.info(
class: self.class.to_s,
channel: NOTIFICATION_CHANNEL,
message: 'Received payload on channel',
payload: message
)
message = safe_parse(message)
return unless message
case message['action']
when 'cancel'
process_job_cancel(message['jid'])
else
# unknown message
end
end
def safe_parse(message)
JSON.parse(message)
rescue JSON::ParserError
end
def process_job_cancel(jid)
return unless jid
# try to find thread without lock
return unless find_thread_unsafe(jid)
Thread.new do
# try to find a thread, but with guaranteed
# that handle for thread corresponds to actually
# running job
find_thread_with_lock(jid) do |thread|
Sidekiq.logger.warn(
class: self.class.to_s,
action: 'cancel',
message: 'Canceling thread with CancelledError',
jid: jid,
thread_id: thread.object_id
)
thread&.raise(CancelledError)
end
end
end
# This method needs to be thread-safe
# This is why it passes thread in block,
# to ensure that we do process this thread
def find_thread_unsafe(jid)
jobs_thread[jid]
end
def find_thread_with_lock(jid)
# don't try to lock if we cannot find the thread
return unless find_thread_unsafe(jid)
jobs_mutex.synchronize do
find_thread_unsafe(jid).tap do |thread|
yield(thread) if thread
end
end
end
def cancelled?(jid)
::Gitlab::Redis::SharedState.with do |redis|
redis.exists(self.class.cancel_job_key(jid))
end
end
def self.cancel_job_key(jid)
"sidekiq:cancel:#{jid}"
end
end
end
......@@ -2,7 +2,7 @@
require 'spec_helper'
describe Gitlab::SidekiqMonitor do
describe Gitlab::SidekiqDaemon::Monitor do
let(:monitor) { described_class.new }
describe '#within_job' do
......@@ -43,7 +43,7 @@ describe Gitlab::SidekiqMonitor do
before do
# we want to run at most once cycle
# we toggle `enabled?` flag after the first call
stub_const('Gitlab::SidekiqMonitor::RECONNECT_TIME', 0)
stub_const('Gitlab::SidekiqDaemon::Monitor::RECONNECT_TIME', 0)
allow(monitor).to receive(:enabled?).and_return(true, false)
allow(Sidekiq.logger).to receive(:info)
......
......@@ -10,8 +10,8 @@ describe Gitlab::SidekiqMiddleware::Monitor do
let(:job) { { 'jid' => 'job-id' } }
let(:queue) { 'my-queue' }
it 'calls SidekiqMonitor' do
expect(Gitlab::SidekiqMonitor.instance).to receive(:within_job)
it 'calls Gitlab::SidekiqDaemon::Monitor' do
expect(Gitlab::SidekiqDaemon::Monitor.instance).to receive(:within_job)
.with('job-id', 'my-queue')
.and_call_original
......@@ -29,7 +29,7 @@ describe Gitlab::SidekiqMiddleware::Monitor do
context 'when cancel happens' do
subject do
monitor.call(worker, job, queue) do
raise Gitlab::SidekiqMonitor::CancelledError
raise Gitlab::SidekiqDaemon::Monitor::CancelledError
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment