Commit c2cbfc5c authored by Kamil Trzciński's avatar Kamil Trzciński

Rework `Sidekiq::JobsThreads` into `Monitor`

This makes:
- very shallow `Middleware::Monitor` to only request tracking
  of sidekiq jobs,
- `SidekiqStatus::Monitor` to be responsible to maintain persistent
  connection to receive messages,
- `SidekiqStatus::Monitor` to always use structured logging
  and instance variables
parent 75e2302d
......@@ -33,7 +33,7 @@ Sidekiq.configure_server do |config|
config.redis = queues_config_hash
config.server_middleware do |chain|
chain.add Gitlab::SidekiqMiddleware::JobsThreads unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS']
chain.add Gitlab::SidekiqMiddleware::Monitor
chain.add Gitlab::SidekiqMiddleware::Metrics if Settings.monitoring.sidekiq_exporter
chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] && !enable_json_logs
chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS']
......@@ -59,7 +59,9 @@ Sidekiq.configure_server do |config|
# Sidekiq (e.g. in an initializer).
ActiveRecord::Base.clear_all_connections!
Gitlab::SidekiqStatus::Monitor.instance.start unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS']
if ENV.fetch("SIDEKIQ_MONITOR_WORKER", 0).to_i.nonzero?
Gitlab::SidekiqMonitor.instance.start
end
end
if enable_reliable_fetch?
......
......@@ -169,3 +169,121 @@ The PostgreSQL wiki has details on the query you can run to see blocking
queries. The query is different based on PostgreSQL version. See
[Lock Monitoring](https://wiki.postgresql.org/wiki/Lock_Monitoring) for
the query details.
## Managing Sidekiq queues
It is possible to use [Sidekiq API](https://github.com/mperham/sidekiq/wiki/API)
to perform a number of troubleshoting on Sidekiq.
These are the administrative commands and it should only be used if currently
admin interface is not suitable due to scale of installation.
All this commands should be run using `gitlab-rails console`.
### View the queue size
```ruby
Sidekiq::Queue.new("pipeline_processing:build_queue").size
```
### Enumerate all enqueued jobs
```ruby
queue = Sidekiq::Queue.new("chaos:chaos_sleep")
queue.each do |job|
# job.klass # => 'MyWorker'
# job.args # => [1, 2, 3]
# job.jid # => jid
# job.queue # => chaos:chaos_sleep
# job["retry"] # => 3
# job.item # => {
# "class"=>"Chaos::SleepWorker",
# "args"=>[1000],
# "retry"=>3,
# "queue"=>"chaos:chaos_sleep",
# "backtrace"=>true,
# "queue_namespace"=>"chaos",
# "jid"=>"39bc482b823cceaf07213523",
# "created_at"=>1566317076.266069,
# "correlation_id"=>"c323b832-a857-4858-b695-672de6f0e1af",
# "enqueued_at"=>1566317076.26761},
# }
# job.delete if job.jid == 'abcdef1234567890'
end
```
### Enumerate currently running jobs
```ruby
workers = Sidekiq::Workers.new
workers.each do |process_id, thread_id, work|
# process_id is a unique identifier per Sidekiq process
# thread_id is a unique identifier per thread
# work is a Hash which looks like:
# {"queue"=>"chaos:chaos_sleep",
# "payload"=>
# { "class"=>"Chaos::SleepWorker",
# "args"=>[1000],
# "retry"=>3,
# "queue"=>"chaos:chaos_sleep",
# "backtrace"=>true,
# "queue_namespace"=>"chaos",
# "jid"=>"b2a31e3eac7b1a99ff235869",
# "created_at"=>1566316974.9215662,
# "correlation_id"=>"e484fb26-7576-45f9-bf21-b99389e1c53c",
# "enqueued_at"=>1566316974.9229589},
# "run_at"=>1566316974}],
end
```
### Remove sidekiq jobs for given parameters (destructive)
```ruby
# for jobs like this:
# RepositoryImportWorker.new.perform_async(100)
id_list = [100]
queue = Sidekiq::Queue.new('repository_import')
queue.each do |job|
job.delete if id_list.include?(job.args[0])
end
```
### Remove specific job ID (destructive)
```ruby
queue = Sidekiq::Queue.new('repository_import')
queue.each do |job|
job.delete if job.jid == 'my-job-id'
end
```
## Canceling running jobs (destructive)
> Introduced in GitLab 12.3.
This is highly risky operation and use it as last resort.
Doing that might result in data corruption, as the job
is interrupted mid-execution and it is not guaranteed
that proper rollback of transactions is implemented.
```ruby
Gitlab::SidekiqMonitor.cancel_job('job-id')
```
> This requires the Sidekiq to be run with `SIDEKIQ_MONITOR_WORKER=1`
> environment variable.
To perform of the interrupt we use `Thread.raise` which
has number of drawbacks, as mentioned in [Why Ruby’s Timeout is dangerous (and Thread.raise is terrifying)](https://jvns.ca/blog/2015/11/27/why-rubys-timeout-is-dangerous-and-thread-dot-raise-is-terrifying/):
> This is where the implications get interesting, and terrifying. This means that an exception can get raised:
>
> * during a network request (ok, as long as the surrounding code is prepared to catch Timeout::Error)
> * during the cleanup for the network request
> * during a rescue block
> * while creating an object to save to the database afterwards
> * in any of your code, regardless of whether it could have possibly raised an exception before
>
> Nobody writes code to defend against an exception being raised on literally any line. That’s not even possible. So Thread.raise is basically like a sneak attack on your code that could result in almost anything. It would probably be okay if it were pure-functional code that did not modify any state. But this is Ruby, so that’s unlikely :)
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
class JobsThreads
@@jobs = {} # rubocop:disable Style/ClassVars
MUTEX = Mutex.new
def call(worker, job, queue)
jid = job['jid']
MUTEX.synchronize do
@@jobs[jid] = Thread.current
end
return if self.class.cancelled?(jid)
yield
ensure
MUTEX.synchronize do
@@jobs.delete(jid)
end
end
def self.interrupt(jid)
MUTEX.synchronize do
thread = @@jobs[jid]
break unless thread
thread.raise(Interrupt)
thread
end
end
def self.cancelled?(jid)
Sidekiq.redis {|c| c.exists("cancelled-#{jid}") }
end
def self.mark_job_as_cancelled(jid)
Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 86400, 1) }
"Marked job as cancelled(if Sidekiq retry within 24 hours, the job will be skipped as `processed`). Jid: #{jid}"
end
def self.jobs
@@jobs
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
class Monitor
def call(worker, job, queue)
Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do
yield
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
class SidekiqMonitor < Daemon
include ::Gitlab::Utils::StrongMemoize
NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
CANCEL_DEADLINE = 24.hours.seconds
# We use exception derived from `Exception`
# to consider this as an very low-level exception
# that should not be caught by application
CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
attr_reader :jobs_thread
attr_reader :jobs_mutex
def initialize
super
@jobs_thread = {}
@jobs_mutex = Mutex.new
end
def within_job(jid, queue)
jobs_mutex.synchronize do
jobs_thread[jid] = Thread.current
end
if cancelled?(jid)
Sidekiq.logger.warn(
class: self.class,
action: 'run',
queue: queue,
jid: jid,
canceled: true)
raise CancelledError
end
yield
ensure
jobs_mutex.synchronize do
jobs_thread.delete(jid)
end
end
def start_working
Sidekiq.logger.info(
class: self.class,
action: 'start',
message: 'Starting Monitor Daemon')
::Gitlab::Redis::SharedState.with do |redis|
redis.subscribe(NOTIFICATION_CHANNEL) do |on|
on.message do |channel, message|
process_message(message)
end
end
end
Sidekiq.logger.warn(
class: self.class,
action: 'stop',
message: 'Stopping Monitor Daemon')
rescue Exception => e # rubocop:disable Lint/RescueException
Sidekiq.logger.warn(
class: self.class,
action: 'exception',
message: e.message)
raise e
end
def self.cancel_job(jid)
payload = {
action: 'cancel',
jid: jid
}.to_json
::Gitlab::Redis::SharedState.with do |redis|
redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
redis.publish(NOTIFICATION_CHANNEL, payload)
end
end
private
def process_message(message)
Sidekiq.logger.info(
class: self.class,
channel: NOTIFICATION_CHANNEL,
message: 'Received payload on channel',
payload: message)
message = safe_parse(message)
return unless message
case message['action']
when 'cancel'
process_job_cancel(message['jid'])
else
# unknown message
end
end
def safe_parse(message)
JSON.parse(message)
rescue JSON::ParserError
end
def process_job_cancel(jid)
return unless jid
# since this might take time, process cancel in a new thread
Thread.new do
find_thread(jid) do |thread|
next unless thread
Sidekiq.logger.warn(
class: self.class,
action: 'cancel',
message: 'Canceling thread with CancelledError',
jid: jid,
thread_id: thread.object_id)
thread&.raise(CancelledError)
end
end
end
# This method needs to be thread-safe
# This is why it passes thread in block,
# to ensure that we do process this thread
def find_thread(jid)
return unless jid
jobs_mutex.synchronize do
thread = jobs_thread[jid]
yield(thread)
thread
end
end
def cancelled?(jid)
::Gitlab::Redis::SharedState.with do |redis|
redis.exists(self.class.cancel_job_key(jid))
end
end
def self.cancel_job_key(jid)
"sidekiq:cancel:#{jid}"
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqStatus
class Monitor < Daemon
include ::Gitlab::Utils::StrongMemoize
NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
def start_working
Sidekiq.logger.info "Watching sidekiq monitor"
::Gitlab::Redis::SharedState.with do |redis|
redis.subscribe(NOTIFICATION_CHANNEL) do |on|
on.message do |channel, message|
Sidekiq.logger.info "Received #{message} on #{channel}..."
execute_job_cancel(message)
end
end
end
end
def self.cancel_job(jid)
Gitlab::Redis::SharedState.with do |redis|
redis.publish(NOTIFICATION_CHANNEL, jid)
"Notification sent. Job should be cancelled soon. Check log to confirm. Jid: #{jid}"
end
end
private
def execute_job_cancel(jid)
Gitlab::SidekiqMiddleware::JobsThreads.mark_job_as_cancelled(jid)
thread = Gitlab::SidekiqMiddleware::JobsThreads
.interrupt(jid)
if thread
Sidekiq.logger.info "Interrupted thread: #{thread} for #{jid}."
else
Sidekiq.logger.info "Did not find thread for #{jid}."
end
end
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqMiddleware::JobsThreads do
subject { described_class.new }
let(:worker) { double(:worker, class: Chaos::SleepWorker) }
let(:jid) { '581f90fbd2f24deabcbde2f9' }
let(:job) { { 'jid' => jid } }
let(:jid_thread) { '684f90fbd2f24deabcbde2f9' }
let(:job_thread) { { 'jid' => jid_thread } }
let(:queue) { 'test_queue' }
let(:mark_job_as_cancelled) { Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 2, 1) } }
def run_job
subject.call(worker, job, queue) do
sleep 2
"mock return from yield"
end
end
def run_job_thread
Thread.new do
subject.call(worker, job_thread, queue) do
sleep 3
"mock return from yield"
end
end
end
describe '.call' do
context 'by default' do
it 'return from yield' do
expect(run_job).to eq("mock return from yield")
end
end
context 'when job is marked as cancelled' do
before do
mark_job_as_cancelled
end
it 'return directly' do
expect(run_job).to be_nil
end
end
end
describe '.self.interrupt' do
before do
run_job_thread
sleep 1
end
it 'interrupt the job with correct jid' do
expect(described_class.jobs[jid_thread]).to receive(:raise).with(Interrupt)
expect(described_class.interrupt jid_thread).to eq(described_class.jobs[jid_thread])
end
it 'do nothing with wrong jid' do
expect(described_class.jobs[jid_thread]).not_to receive(:raise)
expect(described_class.interrupt 'wrong_jid').to be_nil
end
end
describe '.self.cancelled?' do
it 'return true when job is marked as cancelled' do
mark_job_as_cancelled
expect(described_class.cancelled? jid).to be true
end
it 'return false when job is not marked as cancelled' do
expect(described_class.cancelled? 'non-exists-jid').to be false
end
end
describe '.self.mark_job_as_cancelled' do
it 'set Redis key' do
described_class.mark_job_as_cancelled('jid_123')
expect(described_class.cancelled? 'jid_123').to be true
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqMiddleware::Monitor do
let(:monitor) { described_class.new }
describe '#call' do
let(:worker) { double }
let(:job) { { 'jid' => 'job-id' } }
let(:queue) { 'my-queue' }
it 'calls SidekiqMonitor' do
expect(Gitlab::SidekiqMonitor.instance).to receive(:within_job)
.with('job-id', 'my-queue')
.and_call_original
expect { |blk| monitor.call(worker, job, queue, &blk) }.to yield_control
end
it 'passthroughs the return value' do
result = monitor.call(worker, job, queue) do
'value'
end
expect(result).to eq('value')
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::SidekiqMonitor do
let(:monitor) { described_class.new }
describe '#within_job' do
it 'tracks thread' do
blk = proc do
expect(monitor.jobs_thread['jid']).not_to be_nil
"OK"
end
expect(monitor.within_job('jid', 'queue', &blk)).to eq("OK")
end
context 'when job is canceled' do
let(:jid) { SecureRandom.hex }
before do
described_class.cancel_job(jid)
end
it 'does not execute a block' do
expect do |blk|
monitor.within_job(jid, 'queue', &blk)
rescue described_class::CancelledError
end.not_to yield_control
end
it 'raises exception' do
expect { monitor.within_job(jid, 'queue') }.to raise_error(described_class::CancelledError)
end
end
end
describe '#start_working' do
subject { monitor.start_working }
context 'when structured logging is used' do
before do
allow_any_instance_of(::Redis).to receive(:subscribe)
end
it 'logs start message' do
expect(Sidekiq.logger).to receive(:info)
.with(
class: described_class,
action: 'start',
message: 'Starting Monitor Daemon')
subject
end
it 'logs stop message' do
expect(Sidekiq.logger).to receive(:warn)
.with(
class: described_class,
action: 'stop',
message: 'Stopping Monitor Daemon')
subject
end
it 'logs exception message' do
expect(Sidekiq.logger).to receive(:warn)
.with(
class: described_class,
action: 'exception',
message: 'My Exception')
expect(::Gitlab::Redis::SharedState).to receive(:with)
.and_raise(Exception, 'My Exception')
expect { subject }.to raise_error(Exception, 'My Exception')
end
end
context 'when message is published' do
let(:subscribed) { double }
before do
expect_any_instance_of(::Redis).to receive(:subscribe)
.and_yield(subscribed)
expect(subscribed).to receive(:message)
.and_yield(
described_class::NOTIFICATION_CHANNEL,
payload
)
expect(Sidekiq.logger).to receive(:info)
.with(
class: described_class,
action: 'start',
message: 'Starting Monitor Daemon')
expect(Sidekiq.logger).to receive(:info)
.with(
class: described_class,
channel: described_class::NOTIFICATION_CHANNEL,
message: 'Received payload on channel',
payload: payload
)
end
context 'and message is valid' do
let(:payload) { '{"action":"cancel","jid":"my-jid"}' }
it 'processes cancel' do
expect(monitor).to receive(:process_job_cancel).with('my-jid')
subject
end
end
context 'and message is not valid json' do
let(:payload) { '{"action"}' }
it 'skips processing' do
expect(monitor).not_to receive(:process_job_cancel)
subject
end
end
end
end
describe '#process_job_cancel' do
subject { monitor.send(:process_job_cancel, jid) }
context 'when jid is missing' do
let(:jid) { nil }
it 'does not run thread' do
expect(subject).to be_nil
end
end
context 'when jid is provided' do
let(:jid) { 'my-jid' }
context 'when jid is not found' do
it 'does not log cancellation message' do
expect(Sidekiq.logger).not_to receive(:warn)
expect(subject).to be_a(Thread)
subject.join
end
end
context 'when jid is found' do
let(:thread) { Thread.new { sleep 1000 } }
before do
monitor.jobs_thread[jid] = thread
end
it 'does log cancellation message' do
expect(Sidekiq.logger).to receive(:warn)
.with(
class: described_class,
action: 'cancel',
message: 'Canceling thread with CancelledError',
jid: 'my-jid',
thread_id: thread.object_id)
expect(subject).to be_a(Thread)
subject.join
end
it 'does cancel the thread' do
expect(subject).to be_a(Thread)
subject.join
expect(thread).not_to be_alive
expect { thread.value }.to raise_error(described_class::CancelledError)
end
end
end
end
describe '.cancel_job' do
subject { described_class.cancel_job('my-jid') }
it 'sets a redis key' do
expect_any_instance_of(::Redis).to receive(:setex)
.with('sidekiq:cancel:my-jid', anything, 1)
subject
end
it 'notifies all workers' do
payload = '{"action":"cancel","jid":"my-jid"}'
expect_any_instance_of(::Redis).to receive(:publish)
.with('sidekiq:cancel:notifications', payload)
subject
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment