Commit e134ebfe authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch 'backstage/gb/live-traces-redis-append' into 'master'

Improve performance of writing live trace chunks

Closes #34781

See merge request gitlab-org/gitlab!37075
parents 38640295 b5d1c4ce
...@@ -75,18 +75,16 @@ module Ci ...@@ -75,18 +75,16 @@ module Ci
def append(new_data, offset) def append(new_data, offset)
raise ArgumentError, 'New data is missing' unless new_data raise ArgumentError, 'New data is missing' unless new_data
raise ArgumentError, 'Offset is out of range' if offset > size || offset < 0 raise ArgumentError, 'Offset is out of range' if offset < 0 || offset > size
raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize) raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize)
in_lock(*lock_params) do # Write operation is atomic in_lock(*lock_params) { unsafe_append_data!(new_data, offset) }
unsafe_set_data!(data.byteslice(0, offset) + new_data)
end
schedule_to_persist if full? schedule_to_persist if full?
end end
def size def size
data&.bytesize.to_i @size ||= current_store.size(self) || data&.bytesize
end end
def start_offset def start_offset
...@@ -118,7 +116,7 @@ module Ci ...@@ -118,7 +116,7 @@ module Ci
raise FailedToPersistDataError, 'Data is not fulfilled in a bucket' raise FailedToPersistDataError, 'Data is not fulfilled in a bucket'
end end
old_store_class = self.class.get_store_class(data_store) old_store_class = current_store
self.raw_data = nil self.raw_data = nil
self.data_store = new_store self.data_store = new_store
...@@ -128,16 +126,33 @@ module Ci ...@@ -128,16 +126,33 @@ module Ci
end end
def get_data def get_data
self.class.get_store_class(data_store).data(self)&.force_encoding(Encoding::BINARY) # Redis/Database return UTF-8 string as default current_store.data(self)&.force_encoding(Encoding::BINARY) # Redis/Database return UTF-8 string as default
rescue Excon::Error::NotFound
# If the data store is :fog and the file does not exist in the object storage, this method returns nil.
end end
def unsafe_set_data!(value) def unsafe_set_data!(value)
raise ArgumentError, 'New data size exceeds chunk size' if value.bytesize > CHUNK_SIZE raise ArgumentError, 'New data size exceeds chunk size' if value.bytesize > CHUNK_SIZE
self.class.get_store_class(data_store).set_data(self, value) current_store.set_data(self, value)
@data = value @data = value
@size = value.bytesize
save! if changed?
end
def unsafe_append_data!(value, offset)
new_size = value.bytesize + offset
if new_size > CHUNK_SIZE
raise ArgumentError, 'New data size exceeds chunk size'
end
current_store.append_data(self, value, offset).then do |stored|
raise ArgumentError, 'Trace appended incorrectly' if stored != new_size
end
@data = nil
@size = new_size
save! if changed? save! if changed?
end end
...@@ -156,6 +171,10 @@ module Ci ...@@ -156,6 +171,10 @@ module Ci
size == CHUNK_SIZE size == CHUNK_SIZE
end end
def current_store
self.class.get_store_class(data_store)
end
def lock_params def lock_params
["trace_write:#{build_id}:chunks:#{chunk_index}", ["trace_write:#{build_id}:chunks:#{chunk_index}",
{ ttl: WRITE_LOCK_TTL, { ttl: WRITE_LOCK_TTL,
......
...@@ -19,8 +19,22 @@ module Ci ...@@ -19,8 +19,22 @@ module Ci
model.raw_data model.raw_data
end end
def set_data(model, data) def set_data(model, new_data)
model.raw_data = data model.raw_data = new_data
end
def append_data(model, new_data, offset)
if offset > 0
truncated_data = data(model).to_s.byteslice(0, offset)
new_data = truncated_data + new_data
end
model.raw_data = new_data
model.raw_data.to_s.bytesize
end
def size(model)
data(model).to_s.bytesize
end end
def delete_data(model) def delete_data(model)
......
...@@ -9,10 +9,29 @@ module Ci ...@@ -9,10 +9,29 @@ module Ci
def data(model) def data(model)
connection.get_object(bucket_name, key(model))[:body] connection.get_object(bucket_name, key(model))[:body]
rescue Excon::Error::NotFound
# If the object does not exist in the object storage, this method returns nil.
end end
def set_data(model, data) def set_data(model, new_data)
connection.put_object(bucket_name, key(model), data) connection.put_object(bucket_name, key(model), new_data)
end
def append_data(model, new_data, offset)
if offset > 0
truncated_data = data(model).to_s.byteslice(0, offset)
new_data = truncated_data + new_data
end
set_data(model, new_data)
new_data.bytesize
end
def size(model)
connection.head_object(bucket_name, key(model))
.get_header('Content-Length')
rescue Excon::Error::NotFound
0
end end
def delete_data(model) def delete_data(model)
......
...@@ -4,6 +4,32 @@ module Ci ...@@ -4,6 +4,32 @@ module Ci
module BuildTraceChunks module BuildTraceChunks
class Redis class Redis
CHUNK_REDIS_TTL = 1.week CHUNK_REDIS_TTL = 1.week
LUA_APPEND_CHUNK = <<~EOS.freeze
local key, new_data, offset = KEYS[1], ARGV[1], ARGV[2]
local length = new_data:len()
local expire = #{CHUNK_REDIS_TTL.seconds}
local current_size = redis.call("strlen", key)
offset = tonumber(offset)
if offset == 0 then
-- overwrite everything
redis.call("set", key, new_data, "ex", expire)
return redis.call("strlen", key)
elseif offset > current_size then
-- offset range violation
return -1
elseif offset + length >= current_size then
-- efficiently append or overwrite and append
redis.call("expire", key, expire)
return redis.call("setrange", key, offset, new_data)
else
-- append and truncate
local current_data = redis.call("get", key)
new_data = current_data:sub(1, offset) .. new_data
redis.call("set", key, new_data, "ex", expire)
return redis.call("strlen", key)
end
EOS
def available? def available?
true true
...@@ -21,6 +47,18 @@ module Ci ...@@ -21,6 +47,18 @@ module Ci
end end
end end
def append_data(model, new_data, offset)
Gitlab::Redis::SharedState.with do |redis|
redis.eval(LUA_APPEND_CHUNK, keys: [key(model)], argv: [new_data, offset])
end
end
def size(model)
Gitlab::Redis::SharedState.with do |redis|
redis.strlen(key(model))
end
end
def delete_data(model) def delete_data(model)
delete_keys([[model.build_id, model.chunk_index]]) delete_keys([[model.build_id, model.chunk_index]])
end end
......
...@@ -101,7 +101,7 @@ RSpec.describe Gitlab::Ci::Trace::Stream, :clean_gitlab_redis_cache do ...@@ -101,7 +101,7 @@ RSpec.describe Gitlab::Ci::Trace::Stream, :clean_gitlab_redis_cache do
describe '#append' do describe '#append' do
shared_examples_for 'appends' do shared_examples_for 'appends' do
it "truncates and append content" do it "truncates and appends content" do
stream.append(+"89", 4) stream.append(+"89", 4)
stream.seek(0) stream.seek(0)
......
...@@ -262,6 +262,12 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -262,6 +262,12 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(build_trace_chunk.data).to be_empty expect(build_trace_chunk.data).to be_empty
end end
it 'does not read data when appending' do
expect(build_trace_chunk).not_to receive(:data)
build_trace_chunk.append(new_data, offset)
end
it_behaves_like 'Appending correctly' it_behaves_like 'Appending correctly'
it_behaves_like 'Scheduling sidekiq worker to flush data to persist store' it_behaves_like 'Scheduling sidekiq worker to flush data to persist store'
end end
...@@ -486,7 +492,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -486,7 +492,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(build_trace_chunk.redis?).to be_truthy expect(build_trace_chunk.redis?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to eq(data) expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to eq(data)
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil
expect { Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk) }.to raise_error(Excon::Error::NotFound) expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to be_nil
subject subject
...@@ -508,7 +514,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -508,7 +514,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(build_trace_chunk.redis?).to be_truthy expect(build_trace_chunk.redis?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to eq(data) expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to eq(data)
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil
expect { Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk) }.to raise_error(Excon::Error::NotFound) expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to be_nil
end end
end end
end end
...@@ -535,7 +541,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -535,7 +541,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(build_trace_chunk.database?).to be_truthy expect(build_trace_chunk.database?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to eq(data) expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to eq(data)
expect { Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk) }.to raise_error(Excon::Error::NotFound) expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to be_nil
subject subject
...@@ -557,7 +563,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -557,7 +563,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(build_trace_chunk.database?).to be_truthy expect(build_trace_chunk.database?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to eq(data) expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to eq(data)
expect { Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk) }.to raise_error(Excon::Error::NotFound) expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to be_nil
end end
end end
end end
......
...@@ -89,6 +89,24 @@ RSpec.describe Ci::BuildTraceChunks::Database do ...@@ -89,6 +89,24 @@ RSpec.describe Ci::BuildTraceChunks::Database do
end end
end end
describe '#size' do
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :database_with_data, initial_data: 'üabcdef') }
it 'returns data bytesize correctly' do
expect(data_store.size(model)).to eq 8
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :database_without_data) }
it 'returns zero' do
expect(data_store.size(model)).to be_zero
end
end
end
describe '#keys' do describe '#keys' do
subject { data_store.keys(relation) } subject { data_store.keys(relation) }
......
...@@ -40,7 +40,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do ...@@ -40,7 +40,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do
let(:model) { create(:ci_build_trace_chunk, :fog_without_data) } let(:model) { create(:ci_build_trace_chunk, :fog_without_data) }
it 'returns nil' do it 'returns nil' do
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound) expect(data_store.data(model)).to be_nil
end end
end end
end end
...@@ -66,7 +66,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do ...@@ -66,7 +66,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do
let(:model) { create(:ci_build_trace_chunk, :fog_without_data) } let(:model) { create(:ci_build_trace_chunk, :fog_without_data) }
it 'sets new data' do it 'sets new data' do
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound) expect(data_store.data(model)).to be_nil
subject subject
...@@ -86,7 +86,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do ...@@ -86,7 +86,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do
subject subject
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound) expect(data_store.data(model)).to be_nil
end end
end end
...@@ -94,11 +94,29 @@ RSpec.describe Ci::BuildTraceChunks::Fog do ...@@ -94,11 +94,29 @@ RSpec.describe Ci::BuildTraceChunks::Fog do
let(:model) { create(:ci_build_trace_chunk, :fog_without_data) } let(:model) { create(:ci_build_trace_chunk, :fog_without_data) }
it 'does nothing' do it 'does nothing' do
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound) expect(data_store.data(model)).to be_nil
subject subject
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound) expect(data_store.data(model)).to be_nil
end
end
end
describe '#size' do
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :fog_with_data, initial_data: 'üabcd') }
it 'returns data bytesize correctly' do
expect(data_store.size(model)).to eq 6
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :fog_without_data) }
it 'returns zero' do
expect(data_store.size(model)).to be_zero
end end
end end
end end
......
...@@ -61,6 +61,86 @@ RSpec.describe Ci::BuildTraceChunks::Redis, :clean_gitlab_redis_shared_state do ...@@ -61,6 +61,86 @@ RSpec.describe Ci::BuildTraceChunks::Redis, :clean_gitlab_redis_shared_state do
end end
end end
describe '#append_data' do
context 'when valid offset is used with existing data' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: 'abcd') }
it 'appends data' do
expect(data_store.data(model)).to eq('abcd')
length = data_store.append_data(model, '12345', 4)
expect(length).to eq 9
expect(data_store.data(model)).to eq('abcd12345')
end
end
context 'when data does not exist yet' do
let(:model) { create(:ci_build_trace_chunk, :redis_without_data) }
it 'sets new data' do
expect(data_store.data(model)).to be_nil
length = data_store.append_data(model, 'abc', 0)
expect(length).to eq 3
expect(data_store.data(model)).to eq('abc')
end
end
context 'when data needs to be truncated' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: '12345678') }
it 'appends data and truncates stored value' do
expect(data_store.data(model)).to eq('12345678')
length = data_store.append_data(model, 'ab', 4)
expect(length).to eq 6
expect(data_store.data(model)).to eq('1234ab')
end
end
context 'when invalid offset is provided' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: 'abc') }
it 'raises an exception' do
length = data_store.append_data(model, '12345', 4)
expect(length).to be_negative
end
end
context 'when trace contains multi-byte UTF8 characters' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: 'aüc') }
it 'appends data' do
length = data_store.append_data(model, '1234', 4)
data_store.data(model).then do |new_data|
expect(new_data.bytesize).to eq 8
expect(new_data).to eq 'aüc1234'
end
expect(length).to eq 8
end
end
context 'when trace contains non-UTF8 characters' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: "a\255c") }
it 'appends data' do
length = data_store.append_data(model, '1234', 3)
data_store.data(model).then do |new_data|
expect(new_data.bytesize).to eq 7
end
expect(length).to eq 7
end
end
end
describe '#delete_data' do describe '#delete_data' do
subject { data_store.delete_data(model) } subject { data_store.delete_data(model) }
...@@ -89,6 +169,24 @@ RSpec.describe Ci::BuildTraceChunks::Redis, :clean_gitlab_redis_shared_state do ...@@ -89,6 +169,24 @@ RSpec.describe Ci::BuildTraceChunks::Redis, :clean_gitlab_redis_shared_state do
end end
end end
describe '#size' do
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: 'üabcd') }
it 'returns data bytesize correctly' do
expect(data_store.size(model)).to eq 6
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :redis_without_data) }
it 'returns zero' do
expect(data_store.size(model)).to be_zero
end
end
end
describe '#keys' do describe '#keys' do
subject { data_store.keys(relation) } subject { data_store.keys(relation) }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment