Commit e220af44 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch 'qmnguyen0711/compress-background-job-payloads' into 'master'

Compress oversized Sidekiq job payload before dispatching into Redis

See merge request gitlab-org/gitlab!61667
parents 242610b4 86a8ce61
---
title: Compress oversized Sidekiq job payload before dispatching into Redis
merge_request: 61667
author:
type: added
......@@ -14,6 +14,9 @@ module Gitlab
job = job.except('error_backtrace', 'error_class', 'error_message')
job['class'] = job.delete('wrapped') if job['wrapped'].present?
job['job_size_bytes'] = Sidekiq.dump_json(job['args']).bytesize
job['args'] = ['[COMPRESSED]'] if ::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.compressed?(job)
# Add process id params
job['pid'] = ::Process.pid
......
......@@ -55,8 +55,6 @@ module Gitlab
scheduling_latency_s = ::Gitlab::InstrumentationHelper.queue_duration_for_job(payload)
payload['scheduling_latency_s'] = scheduling_latency_s if scheduling_latency_s
payload['job_size_bytes'] = Sidekiq.dump_json(job).bytesize
payload
end
......
......@@ -9,6 +9,8 @@ module Gitlab
# eg: `config.server_middleware(&Gitlab::SidekiqMiddleware.server_configurator)`
def self.server_configurator(metrics: true, arguments_logger: true, memory_killer: true)
lambda do |chain|
# Size limiter should be placed at the top
chain.add ::Gitlab::SidekiqMiddleware::SizeLimiter::Server
chain.add ::Gitlab::SidekiqMiddleware::Monitor
chain.add ::Gitlab::SidekiqMiddleware::ServerMetrics if metrics
chain.add ::Gitlab::SidekiqMiddleware::ArgumentsLogger if arguments_logger
......
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module SizeLimiter
class Compressor
PayloadDecompressionConflictError = Class.new(StandardError)
PayloadDecompressionError = Class.new(StandardError)
# Level 5 is a good trade-off between space and time
# https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1054#note_568129605
COMPRESS_LEVEL = 5
ORIGINAL_SIZE_KEY = 'original_job_size_bytes'
COMPRESSED_KEY = 'compressed'
def self.compressed?(job)
job&.has_key?(COMPRESSED_KEY)
end
def self.compress(job, job_args)
compressed_args = Base64.strict_encode64(Zlib::Deflate.deflate(job_args, COMPRESS_LEVEL))
job[COMPRESSED_KEY] = true
job[ORIGINAL_SIZE_KEY] = job_args.bytesize
job['args'] = [compressed_args]
compressed_args
end
def self.decompress(job)
return unless compressed?(job)
validate_args!(job)
job.except!(ORIGINAL_SIZE_KEY, COMPRESSED_KEY)
job['args'] = Sidekiq.load_json(Zlib::Inflate.inflate(Base64.strict_decode64(job['args'].first)))
rescue Zlib::Error
raise PayloadDecompressionError, 'Fail to decompress Sidekiq job payload'
end
def self.validate_args!(job)
if job['args'] && job['args'].length != 1
exception = PayloadDecompressionConflictError.new('Sidekiq argument list should include 1 argument.\
This means that there is another a middleware interfering with the job payload.\
That conflicts with the payload compressor')
::Gitlab::ErrorTracking.track_and_raise_exception(exception)
end
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module SizeLimiter
class Server
def call(worker, job, queue)
# This middleware should always decompress jobs regardless of the
# limiter mode or size limit. Otherwise, this could leave compressed
# payloads in queues that are then not able to be processed.
::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.decompress(job)
yield
end
end
end
end
end
......@@ -3,76 +3,103 @@
module Gitlab
module SidekiqMiddleware
module SizeLimiter
# Validate a Sidekiq job payload limit based on current configuration.
# Handle a Sidekiq job payload limit based on current configuration.
# This validator pulls the configuration from the environment variables:
#
# - GITLAB_SIDEKIQ_SIZE_LIMITER_MODE: the current mode of the size
# limiter. This must be either `track` or `raise`.
#
# limiter. This must be either `track` or `compress`.
# - GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES: the
# threshold before the input job payload is compressed.
# - GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES: the size limit in bytes.
#
# If the size of job payload after serialization exceeds the limit, an
# error is tracked raised adhering to the mode.
# In track mode, if a job payload limit exceeds the size limit, an
# event is sent to Sentry and the job is scheduled like normal.
#
# In compress mode, if a job payload limit exceeds the threshold, it is
# then compressed. If the compressed payload still exceeds the limit, the
# job is discarded, and a ExceedLimitError exception is raised.
class Validator
def self.validate!(worker_class, job)
new(worker_class, job).validate!
end
DEFAULT_SIZE_LIMIT = 0
DEFAULT_COMPRESION_THRESHOLD_BYTES = 100_000 # 100kb
MODES = [
TRACK_MODE = 'track',
RAISE_MODE = 'raise'
COMPRESS_MODE = 'compress'
].freeze
attr_reader :mode, :size_limit
attr_reader :mode, :size_limit, :compression_threshold
def initialize(
worker_class, job,
mode: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_MODE'],
compression_threshold: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_COMPRESSION_THRESHOLD_BYTES'],
size_limit: ENV['GITLAB_SIDEKIQ_SIZE_LIMITER_LIMIT_BYTES']
)
@worker_class = worker_class
@job = job
set_mode(mode)
set_compression_threshold(compression_threshold)
set_size_limit(size_limit)
end
def validate!
return unless @size_limit > 0
return if allow_big_payload?
job_args = compress_if_necessary(::Sidekiq.dump_json(@job['args']))
return if job_args.bytesize <= @size_limit
exception = exceed_limit_error(job_args)
if compress_mode?
raise exception
else
track(exception)
end
end
private
def set_mode(mode)
@mode = (mode || TRACK_MODE).to_s.strip
unless MODES.include?(@mode)
::Sidekiq.logger.warn "Invalid Sidekiq size limiter mode: #{@mode}. Fallback to #{TRACK_MODE} mode."
@mode = TRACK_MODE
end
end
def set_compression_threshold(compression_threshold)
@compression_threshold = (compression_threshold || DEFAULT_COMPRESION_THRESHOLD_BYTES).to_i
if @compression_threshold <= 0
::Sidekiq.logger.warn "Invalid Sidekiq size limiter compression threshold: #{@compression_threshold}"
@compression_threshold = DEFAULT_COMPRESION_THRESHOLD_BYTES
end
end
def set_size_limit(size_limit)
@size_limit = (size_limit || DEFAULT_SIZE_LIMIT).to_i
if @size_limit < 0
::Sidekiq.logger.warn "Invalid Sidekiq size limiter limit: #{@size_limit}"
end
end
def validate!
return unless @size_limit > 0
return if allow_big_payload?
return if job_size <= @size_limit
exception = ExceedLimitError.new(@worker_class, job_size, @size_limit)
def exceed_limit_error(job_args)
ExceedLimitError.new(@worker_class, job_args.bytesize, @size_limit).tap do |exception|
# This should belong to Gitlab::ErrorTracking. We'll remove this
# after this epic is done:
# https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/396
exception.set_backtrace(backtrace)
if raise_mode?
raise exception
else
track(exception)
end
end
private
def compress_if_necessary(job_args)
return job_args unless compress_mode?
return job_args if job_args.bytesize < @compression_threshold
def job_size
# This maynot be the optimal solution, but can be acceptable solution
# for now. Internally, Sidekiq calls Sidekiq.dump_json everywhere.
# There is no clean way to intefere to prevent double serialization.
@job_size ||= ::Sidekiq.dump_json(@job).bytesize
::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.compress(@job, job_args)
end
def allow_big_payload?
......@@ -80,8 +107,8 @@ module Gitlab
worker_class.respond_to?(:big_payload?) && worker_class.big_payload?
end
def raise_mode?
@mode == RAISE_MODE
def compress_mode?
@mode == COMPRESS_MODE
end
def track(exception)
......
......@@ -303,6 +303,39 @@ RSpec.describe Gitlab::SidekiqLogging::StructuredLogger do
expect { subject.call(job.dup, 'test_queue') {} }.not_to raise_error
end
end
context 'when the job payload is compressed' do
let(:compressed_args) { "eJyLVspIzcnJV4oFAA88AxE=" }
let(:expected_start_payload) do
start_payload.merge(
'args' => ['[COMPRESSED]'],
'job_size_bytes' => Sidekiq.dump_json([compressed_args]).bytesize,
'compressed' => true
)
end
let(:expected_end_payload) do
end_payload.merge(
'args' => ['[COMPRESSED]'],
'job_size_bytes' => Sidekiq.dump_json([compressed_args]).bytesize,
'compressed' => true
)
end
it 'logs it in the done log' do
Timecop.freeze(timestamp) do
expect(logger).to receive(:info).with(expected_start_payload).ordered
expect(logger).to receive(:info).with(expected_end_payload).ordered
job['args'] = [compressed_args]
job['compressed'] = true
call_subject(job, 'test_queue') do
::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor.decompress(job)
end
end
end
end
end
describe '#add_time_keys!' do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::SizeLimiter::Compressor do
using RSpec::Parameterized::TableSyntax
let(:base_payload) do
{
"class" => "ARandomWorker",
"queue" => "a_worker",
"retry" => true,
"jid" => "d774900367dc8b2962b2479c",
"created_at" => 1234567890,
"enqueued_at" => 1234567890
}
end
describe '.compressed?' do
where(:job, :result) do
{} | false
base_payload.merge("args" => [123, 'hello', ['world']]) | false
base_payload.merge("args" => ['eJzLSM3JyQcABiwCFQ=='], 'compressed' => true) | true
end
with_them do
it 'returns whether the job payload is compressed' do
expect(described_class.compressed?(job)).to eql(result)
end
end
end
describe '.compress' do
where(:args) do
[
nil,
[],
['hello'],
[
{
"job_class" => "SomeWorker",
"job_id" => "b4a577edbccf1d805744efa9",
"provider_job_id" => nil,
"queue_name" => "default",
"arguments" => ["some", ["argument"]],
"executions" => 0,
"locale" => "en",
"attempt_number" => 1
},
nil,
'hello',
12345678901234567890,
['nice']
],
[
'2021-05-13_09:59:37.57483 rails-background-jobs : {"severity":"ERROR","time":"2021-05-13T09:59:37.574Z"',
'bonne journée - ขอให้มีความสุขในวันนี้ - một ngày mới tốt lành - 좋은 하루 되세요 - ごきげんよう',
'🤝 - 🦊'
]
]
end
with_them do
let(:payload) { base_payload.merge("args" => args) }
it 'injects compressed data' do
serialized_args = Sidekiq.dump_json(args)
described_class.compress(payload, serialized_args)
expect(payload['args'].length).to be(1)
expect(payload['args'].first).to be_a(String)
expect(payload['compressed']).to be(true)
expect(payload['original_job_size_bytes']).to eql(serialized_args.bytesize)
expect do
Sidekiq.dump_json(payload)
end.not_to raise_error
end
it 'can decompress the payload' do
original_payload = payload.deep_dup
described_class.compress(payload, Sidekiq.dump_json(args))
described_class.decompress(payload)
expect(payload).to eql(original_payload)
end
end
end
describe '.decompress' do
context 'job payload is not compressed' do
let(:payload) { base_payload.merge("args" => ['hello']) }
it 'preserves the payload after decompression' do
original_payload = payload.deep_dup
described_class.decompress(payload)
expect(payload).to eql(original_payload)
end
end
context 'job payload is compressed with a default level' do
let(:payload) do
base_payload.merge(
'args' => ['eF6LVspIzcnJV9JRKs8vyklRigUAMq0FqQ=='],
'compressed' => true
)
end
it 'decompresses and clean up the job payload' do
described_class.decompress(payload)
expect(payload['args']).to eql(%w[hello world])
expect(payload).not_to have_key('compressed')
end
end
context 'job payload is compressed with a different level' do
let(:payload) do
base_payload.merge(
'args' => [Base64.strict_encode64(Zlib::Deflate.deflate(Sidekiq.dump_json(%w[hello world]), 9))],
'compressed' => true
)
end
it 'decompresses and clean up the job payload' do
described_class.decompress(payload)
expect(payload['args']).to eql(%w[hello world])
expect(payload).not_to have_key('compressed')
end
end
context 'job payload argument list is malformed' do
let(:payload) do
base_payload.merge(
'args' => ['eNqLVspIzcnJV9JRKs8vyklRigUAMq0FqQ==', 'something else'],
'compressed' => true
)
end
it 'tracks the conflicting exception' do
expect(::Gitlab::ErrorTracking).to receive(:track_and_raise_exception).with(
be_a(::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor::PayloadDecompressionConflictError)
)
described_class.decompress(payload)
expect(payload['args']).to eql(%w[hello world])
expect(payload).not_to have_key('compressed')
end
end
context 'job payload is not a valid base64 string' do
let(:payload) do
base_payload.merge(
'args' => ['hello123'],
'compressed' => true
)
end
it 'raises an exception' do
expect do
described_class.decompress(payload)
end.to raise_error(::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor::PayloadDecompressionError)
end
end
context 'job payload compression does not contain a valid Gzip header' do
let(:payload) do
base_payload.merge(
'args' => ['aGVsbG8='],
'compressed' => true
)
end
it 'raises an exception' do
expect do
described_class.decompress(payload)
end.to raise_error(::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor::PayloadDecompressionError)
end
end
context 'job payload compression does not contain a valid Gzip body' do
let(:payload) do
base_payload.merge(
'args' => ["eNqLVspIzcnJVw=="],
'compressed' => true
)
end
it 'raises an exception' do
expect do
described_class.decompress(payload)
end.to raise_error(::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor::PayloadDecompressionError)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
# rubocop: disable RSpec/MultipleMemoizedHelpers
RSpec.describe Gitlab::SidekiqMiddleware::SizeLimiter::Server, :clean_gitlab_redis_queues do
subject(:middleware) { described_class.new }
let(:worker) { Class.new }
let(:job) do
{
"class" => "ARandomWorker",
"queue" => "a_worker",
"args" => %w[Hello World],
"created_at" => 1234567890,
"enqueued_at" => 1234567890
}
end
before do
allow(::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor).to receive(:compress)
end
it 'yields block' do
expect { |b| subject.call(worker, job, :test, &b) }.to yield_control.once
end
it 'calls the Compressor' do
expect(::Gitlab::SidekiqMiddleware::SizeLimiter::Compressor).to receive(:decompress).with(job)
subject.call(worker, job, :test) {}
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment