# sidekiq_monitor.rb
# frozen_string_literal: true

module Gitlab
  # Daemon that listens on a Redis pub/sub channel for job-cancellation
  # requests and interrupts the matching in-flight job thread by raising
  # CancelledError into it.
  #
  # Jobs register themselves by running inside #within_job; cancellation is
  # requested from anywhere via SidekiqMonitor.cancel_job(jid).
  class SidekiqMonitor < Daemon
    include ::Gitlab::Utils::StrongMemoize

    # Redis pub/sub channel on which cancel requests are published.
    NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'
    # Lifetime of the per-job cancel marker in Redis. While the marker exists,
    # #within_job refuses to start the job.
    CANCEL_DEADLINE = 24.hours.seconds
    # Pause before re-subscribing after a subscription ends or fails.
    RECONNECT_TIME = 3.seconds

    # Derived from Exception (not StandardError) on purpose: it is a
    # low-level signal that ordinary `rescue => e` handlers in application
    # code must not swallow.
    CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException

    attr_reader :jobs_thread
    attr_reader :jobs_mutex

    def initialize
      super

      # jid => Thread currently running that job; mutated only under @jobs_mutex.
      @jobs_thread = {}
      @jobs_mutex = Mutex.new
    end

    # Runs the given block as job +jid+, registering the current thread so a
    # cancel notification received by this monitor can interrupt it. The
    # registration is always removed when the block exits.
    #
    # @param jid job id used as the registration / cancel-marker key
    # @param queue queue name (only used for logging here)
    # @raise [CancelledError] if a cancel marker already exists for +jid+, or
    #   if the job is cancelled while the block is running
    # @return the block's return value
    def within_job(jid, queue)
      jobs_mutex.synchronize do
        jobs_thread[jid] = Thread.current
      end

      if cancelled?(jid)
        Sidekiq.logger.warn(
          class: self.class.to_s,
          action: 'run',
          queue: queue,
          jid: jid,
          canceled: true
        )

        raise CancelledError
      end

      yield
    ensure
      jobs_mutex.synchronize do
        jobs_thread.delete(jid)
      end
    end

    # Requests cancellation of job +jid+.
    #
    # Writes a cancel marker (expiring after CANCEL_DEADLINE) so the job is
    # refused if it starts later. It also publishes a notification, so a
    # monitor currently running the job can interrupt it.
    def self.cancel_job(jid)
      payload = {
        action: 'cancel',
        jid: jid
      }.to_json

      ::Gitlab::Redis::SharedState.with do |redis|
        redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
        redis.publish(NOTIFICATION_CHANNEL, payload)
      end
    end

    # Redis key of the cancel marker for +jid+. This is a public class method;
    # a bare `private` does not affect `def self.` methods.
    def self.cancel_job_key(jid)
      "sidekiq:cancel:#{jid}"
    end

    private

    # Main loop of the daemon thread (presumably invoked by the Daemon base
    # class). It keeps re-subscribing to the notification channel while the
    # daemon is enabled, and logs when it stops.
    def start_working
      Sidekiq.logger.info(
        class: self.class.to_s,
        action: 'start',
        message: 'Starting Monitor Daemon'
      )

      while enabled?
        process_messages
        sleep(RECONNECT_TIME)
      end
    ensure
      Sidekiq.logger.warn(
        class: self.class.to_s,
        action: 'stop',
        message: 'Stopping Monitor Daemon'
      )
    end

    # Raises Interrupt into the daemon thread (`thread` is assumed to be
    # provided by Daemon). Interrupt is not a StandardError, so
    # #process_messages re-raises it and #start_working unwinds.
    def stop_working
      thread.raise(Interrupt) if thread.alive?
    end

    # Blocks on the notification channel, dispatching each message.
    #
    # Returns normally when the subscription ends, or after logging a
    # StandardError, so the caller can reconnect. Any other Exception
    # (e.g. Interrupt) is logged and re-raised.
    def process_messages
      ::Gitlab::Redis::SharedState.with do |redis|
        redis.subscribe(NOTIFICATION_CHANNEL) do |on|
          on.message do |_channel, message|
            process_message(message)
          end
        end
      end
    rescue Exception => e # rubocop:disable Lint/RescueException
      Sidekiq.logger.warn(
        class: self.class.to_s,
        action: 'exception',
        message: e.message
      )

      # we re-raise system exceptions
      raise e unless e.is_a?(StandardError)
    end

    # Handles one raw pub/sub payload. It ignores payloads that are not a JSON
    # object and any action other than 'cancel'.
    def process_message(message)
      Sidekiq.logger.info(
        class: self.class.to_s,
        channel: NOTIFICATION_CHANNEL,
        message: 'Received payload on channel',
        payload: message
      )

      message = safe_parse(message)
      return unless message

      case message['action']
      when 'cancel'
        process_job_cancel(message['jid'])
      else
        # unknown message
      end
    end

    # Parses +message+ as JSON and returns the resulting Hash.
    #
    # Returns nil for malformed JSON and for valid JSON that is not an object
    # (e.g. `1`, `[1]`). Indexing those with a String key would raise
    # TypeError and tear down the subscription.
    def safe_parse(message)
      parsed = JSON.parse(message)
      parsed if parsed.is_a?(Hash)
    rescue JSON::ParserError
      nil
    end

    # Interrupts the thread running +jid+ in this process, if there is one.
    # The locked lookup and the raise run on a separate thread, so the
    # subscription thread itself never blocks on jobs_mutex.
    def process_job_cancel(jid)
      return unless jid

      # cheap lock-free pre-check: don't spawn a thread for jobs that are not
      # running in this process
      return unless find_thread_unsafe(jid)

      Thread.new do
        # Look the thread up again under the lock. The handle then belongs to
        # a job that has not yet unregistered itself in #within_job.
        find_thread_with_lock(jid) do |job_thread|
          Sidekiq.logger.warn(
            class: self.class.to_s,
            action: 'cancel',
            message: 'Canceling thread with CancelledError',
            jid: jid,
            thread_id: job_thread.object_id
          )

          job_thread.raise(CancelledError)
        end
      end
    end

    # Unsynchronized lookup of the thread registered for +jid+. Use it only as
    # a hint, because the entry may be removed concurrently.
    def find_thread_unsafe(jid)
      jobs_thread[jid]
    end

    # Thread-safe lookup: it yields the thread registered for +jid+ while
    # holding jobs_mutex. #within_job cannot unregister that thread until the
    # block returns. It yields nothing if the job is not (or no longer)
    # registered.
    #
    # @return the thread found under the lock, or nil
    def find_thread_with_lock(jid)
      # don't try to lock if we cannot find the thread
      return unless find_thread_unsafe(jid)

      jobs_mutex.synchronize do
        find_thread_unsafe(jid).tap do |job_thread|
          yield(job_thread) if job_thread
        end
      end
    end

    # Whether a cancel marker currently exists in Redis for +jid+.
    #
    # Older redis-rb versions return a boolean from `exists`, while newer ones
    # return an Integer count. 0 is truthy in Ruby, so an Integer result must
    # be compared explicitly. Otherwise every job would look cancelled.
    def cancelled?(jid)
      ::Gitlab::Redis::SharedState.with do |redis|
        found = redis.exists(self.class.cancel_job_key(jid))
        found.is_a?(Integer) ? found.positive? : found
      end
    end
  end
end