Commit b5d1c4ce authored by Grzegorz Bizon's avatar Grzegorz Bizon Committed by Kamil Trzciński

Append Redis live trace data to avoid reading it first

This change introduces an incremental append mechanism for writing a
live trace. Instead of reading data from Redis, appening in Ruby, it
attemps to append data directly into Redis, what should improve
performance and reduce the a mount of data transferred between Redis and
GitLab.
parent 32a1dc5e
...@@ -75,18 +75,16 @@ module Ci ...@@ -75,18 +75,16 @@ module Ci
def append(new_data, offset) def append(new_data, offset)
raise ArgumentError, 'New data is missing' unless new_data raise ArgumentError, 'New data is missing' unless new_data
raise ArgumentError, 'Offset is out of range' if offset > size || offset < 0 raise ArgumentError, 'Offset is out of range' if offset < 0 || offset > size
raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize) raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize)
in_lock(*lock_params) do # Write operation is atomic in_lock(*lock_params) { unsafe_append_data!(new_data, offset) }
unsafe_set_data!(data.byteslice(0, offset) + new_data)
end
schedule_to_persist if full? schedule_to_persist if full?
end end
def size def size
data&.bytesize.to_i @size ||= current_store.size(self) || data&.bytesize
end end
def start_offset def start_offset
...@@ -118,7 +116,7 @@ module Ci ...@@ -118,7 +116,7 @@ module Ci
raise FailedToPersistDataError, 'Data is not fulfilled in a bucket' raise FailedToPersistDataError, 'Data is not fulfilled in a bucket'
end end
old_store_class = self.class.get_store_class(data_store) old_store_class = current_store
self.raw_data = nil self.raw_data = nil
self.data_store = new_store self.data_store = new_store
...@@ -128,16 +126,33 @@ module Ci ...@@ -128,16 +126,33 @@ module Ci
end end
def get_data def get_data
self.class.get_store_class(data_store).data(self)&.force_encoding(Encoding::BINARY) # Redis/Database return UTF-8 string as default current_store.data(self)&.force_encoding(Encoding::BINARY) # Redis/Database return UTF-8 string as default
rescue Excon::Error::NotFound
# If the data store is :fog and the file does not exist in the object storage, this method returns nil.
end end
def unsafe_set_data!(value) def unsafe_set_data!(value)
raise ArgumentError, 'New data size exceeds chunk size' if value.bytesize > CHUNK_SIZE raise ArgumentError, 'New data size exceeds chunk size' if value.bytesize > CHUNK_SIZE
self.class.get_store_class(data_store).set_data(self, value) current_store.set_data(self, value)
@data = value @data = value
@size = value.bytesize
save! if changed?
end
def unsafe_append_data!(value, offset)
new_size = value.bytesize + offset
if new_size > CHUNK_SIZE
raise ArgumentError, 'New data size exceeds chunk size'
end
current_store.append_data(self, value, offset).then do |stored|
raise ArgumentError, 'Trace appended incorrectly' if stored != new_size
end
@data = nil
@size = new_size
save! if changed? save! if changed?
end end
...@@ -156,6 +171,10 @@ module Ci ...@@ -156,6 +171,10 @@ module Ci
size == CHUNK_SIZE size == CHUNK_SIZE
end end
def current_store
self.class.get_store_class(data_store)
end
def lock_params def lock_params
["trace_write:#{build_id}:chunks:#{chunk_index}", ["trace_write:#{build_id}:chunks:#{chunk_index}",
{ ttl: WRITE_LOCK_TTL, { ttl: WRITE_LOCK_TTL,
......
...@@ -19,8 +19,22 @@ module Ci ...@@ -19,8 +19,22 @@ module Ci
model.raw_data model.raw_data
end end
def set_data(model, data) def set_data(model, new_data)
model.raw_data = data model.raw_data = new_data
end
def append_data(model, new_data, offset)
if offset > 0
truncated_data = data(model).to_s.byteslice(0, offset)
new_data = truncated_data + new_data
end
model.raw_data = new_data
model.raw_data.to_s.bytesize
end
def size(model)
data(model).to_s.bytesize
end end
def delete_data(model) def delete_data(model)
......
...@@ -9,10 +9,29 @@ module Ci ...@@ -9,10 +9,29 @@ module Ci
def data(model) def data(model)
connection.get_object(bucket_name, key(model))[:body] connection.get_object(bucket_name, key(model))[:body]
rescue Excon::Error::NotFound
# If the object does not exist in the object storage, this method returns nil.
end end
def set_data(model, data) def set_data(model, new_data)
connection.put_object(bucket_name, key(model), data) connection.put_object(bucket_name, key(model), new_data)
end
def append_data(model, new_data, offset)
if offset > 0
truncated_data = data(model).to_s.byteslice(0, offset)
new_data = truncated_data + new_data
end
set_data(model, new_data)
new_data.bytesize
end
def size(model)
connection.head_object(bucket_name, key(model))
.get_header('Content-Length')
rescue Excon::Error::NotFound
0
end end
def delete_data(model) def delete_data(model)
......
...@@ -4,6 +4,32 @@ module Ci ...@@ -4,6 +4,32 @@ module Ci
module BuildTraceChunks module BuildTraceChunks
class Redis class Redis
CHUNK_REDIS_TTL = 1.week CHUNK_REDIS_TTL = 1.week
LUA_APPEND_CHUNK = <<~EOS.freeze
local key, new_data, offset = KEYS[1], ARGV[1], ARGV[2]
local length = new_data:len()
local expire = #{CHUNK_REDIS_TTL.seconds}
local current_size = redis.call("strlen", key)
offset = tonumber(offset)
if offset == 0 then
-- overwrite everything
redis.call("set", key, new_data, "ex", expire)
return redis.call("strlen", key)
elseif offset > current_size then
-- offset range violation
return -1
elseif offset + length >= current_size then
-- efficiently append or overwrite and append
redis.call("expire", key, expire)
return redis.call("setrange", key, offset, new_data)
else
-- append and truncate
local current_data = redis.call("get", key)
new_data = current_data:sub(1, offset) .. new_data
redis.call("set", key, new_data, "ex", expire)
return redis.call("strlen", key)
end
EOS
def available? def available?
true true
...@@ -21,6 +47,18 @@ module Ci ...@@ -21,6 +47,18 @@ module Ci
end end
end end
def append_data(model, new_data, offset)
Gitlab::Redis::SharedState.with do |redis|
redis.eval(LUA_APPEND_CHUNK, keys: [key(model)], argv: [new_data, offset])
end
end
def size(model)
Gitlab::Redis::SharedState.with do |redis|
redis.strlen(key(model))
end
end
def delete_data(model) def delete_data(model)
delete_keys([[model.build_id, model.chunk_index]]) delete_keys([[model.build_id, model.chunk_index]])
end end
......
...@@ -101,7 +101,7 @@ RSpec.describe Gitlab::Ci::Trace::Stream, :clean_gitlab_redis_cache do ...@@ -101,7 +101,7 @@ RSpec.describe Gitlab::Ci::Trace::Stream, :clean_gitlab_redis_cache do
describe '#append' do describe '#append' do
shared_examples_for 'appends' do shared_examples_for 'appends' do
it "truncates and append content" do it "truncates and appends content" do
stream.append(+"89", 4) stream.append(+"89", 4)
stream.seek(0) stream.seek(0)
......
...@@ -262,6 +262,12 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -262,6 +262,12 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(build_trace_chunk.data).to be_empty expect(build_trace_chunk.data).to be_empty
end end
it 'does not read data when appending' do
expect(build_trace_chunk).not_to receive(:data)
build_trace_chunk.append(new_data, offset)
end
it_behaves_like 'Appending correctly' it_behaves_like 'Appending correctly'
it_behaves_like 'Scheduling sidekiq worker to flush data to persist store' it_behaves_like 'Scheduling sidekiq worker to flush data to persist store'
end end
...@@ -486,7 +492,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -486,7 +492,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(build_trace_chunk.redis?).to be_truthy expect(build_trace_chunk.redis?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to eq(data) expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to eq(data)
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil
expect { Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk) }.to raise_error(Excon::Error::NotFound) expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to be_nil
subject subject
...@@ -508,7 +514,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -508,7 +514,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(build_trace_chunk.redis?).to be_truthy expect(build_trace_chunk.redis?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to eq(data) expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to eq(data)
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil
expect { Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk) }.to raise_error(Excon::Error::NotFound) expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to be_nil
end end
end end
end end
...@@ -535,7 +541,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -535,7 +541,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(build_trace_chunk.database?).to be_truthy expect(build_trace_chunk.database?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to eq(data) expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to eq(data)
expect { Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk) }.to raise_error(Excon::Error::NotFound) expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to be_nil
subject subject
...@@ -557,7 +563,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -557,7 +563,7 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(build_trace_chunk.database?).to be_truthy expect(build_trace_chunk.database?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to eq(data) expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to eq(data)
expect { Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk) }.to raise_error(Excon::Error::NotFound) expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to be_nil
end end
end end
end end
......
...@@ -89,6 +89,24 @@ RSpec.describe Ci::BuildTraceChunks::Database do ...@@ -89,6 +89,24 @@ RSpec.describe Ci::BuildTraceChunks::Database do
end end
end end
describe '#size' do
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :database_with_data, initial_data: 'üabcdef') }
it 'returns data bytesize correctly' do
expect(data_store.size(model)).to eq 8
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :database_without_data) }
it 'returns zero' do
expect(data_store.size(model)).to be_zero
end
end
end
describe '#keys' do describe '#keys' do
subject { data_store.keys(relation) } subject { data_store.keys(relation) }
......
...@@ -40,7 +40,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do ...@@ -40,7 +40,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do
let(:model) { create(:ci_build_trace_chunk, :fog_without_data) } let(:model) { create(:ci_build_trace_chunk, :fog_without_data) }
it 'returns nil' do it 'returns nil' do
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound) expect(data_store.data(model)).to be_nil
end end
end end
end end
...@@ -66,7 +66,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do ...@@ -66,7 +66,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do
let(:model) { create(:ci_build_trace_chunk, :fog_without_data) } let(:model) { create(:ci_build_trace_chunk, :fog_without_data) }
it 'sets new data' do it 'sets new data' do
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound) expect(data_store.data(model)).to be_nil
subject subject
...@@ -86,7 +86,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do ...@@ -86,7 +86,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do
subject subject
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound) expect(data_store.data(model)).to be_nil
end end
end end
...@@ -94,11 +94,29 @@ RSpec.describe Ci::BuildTraceChunks::Fog do ...@@ -94,11 +94,29 @@ RSpec.describe Ci::BuildTraceChunks::Fog do
let(:model) { create(:ci_build_trace_chunk, :fog_without_data) } let(:model) { create(:ci_build_trace_chunk, :fog_without_data) }
it 'does nothing' do it 'does nothing' do
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound) expect(data_store.data(model)).to be_nil
subject subject
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound) expect(data_store.data(model)).to be_nil
end
end
end
describe '#size' do
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :fog_with_data, initial_data: 'üabcd') }
it 'returns data bytesize correctly' do
expect(data_store.size(model)).to eq 6
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :fog_without_data) }
it 'returns zero' do
expect(data_store.size(model)).to be_zero
end end
end end
end end
......
...@@ -61,6 +61,86 @@ RSpec.describe Ci::BuildTraceChunks::Redis, :clean_gitlab_redis_shared_state do ...@@ -61,6 +61,86 @@ RSpec.describe Ci::BuildTraceChunks::Redis, :clean_gitlab_redis_shared_state do
end end
end end
describe '#append_data' do
context 'when valid offset is used with existing data' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: 'abcd') }
it 'appends data' do
expect(data_store.data(model)).to eq('abcd')
length = data_store.append_data(model, '12345', 4)
expect(length).to eq 9
expect(data_store.data(model)).to eq('abcd12345')
end
end
context 'when data does not exist yet' do
let(:model) { create(:ci_build_trace_chunk, :redis_without_data) }
it 'sets new data' do
expect(data_store.data(model)).to be_nil
length = data_store.append_data(model, 'abc', 0)
expect(length).to eq 3
expect(data_store.data(model)).to eq('abc')
end
end
context 'when data needs to be truncated' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: '12345678') }
it 'appends data and truncates stored value' do
expect(data_store.data(model)).to eq('12345678')
length = data_store.append_data(model, 'ab', 4)
expect(length).to eq 6
expect(data_store.data(model)).to eq('1234ab')
end
end
context 'when invalid offset is provided' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: 'abc') }
it 'raises an exception' do
length = data_store.append_data(model, '12345', 4)
expect(length).to be_negative
end
end
context 'when trace contains multi-byte UTF8 characters' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: 'aüc') }
it 'appends data' do
length = data_store.append_data(model, '1234', 4)
data_store.data(model).then do |new_data|
expect(new_data.bytesize).to eq 8
expect(new_data).to eq 'aüc1234'
end
expect(length).to eq 8
end
end
context 'when trace contains non-UTF8 characters' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: "a\255c") }
it 'appends data' do
length = data_store.append_data(model, '1234', 3)
data_store.data(model).then do |new_data|
expect(new_data.bytesize).to eq 7
end
expect(length).to eq 7
end
end
end
describe '#delete_data' do describe '#delete_data' do
subject { data_store.delete_data(model) } subject { data_store.delete_data(model) }
...@@ -89,6 +169,24 @@ RSpec.describe Ci::BuildTraceChunks::Redis, :clean_gitlab_redis_shared_state do ...@@ -89,6 +169,24 @@ RSpec.describe Ci::BuildTraceChunks::Redis, :clean_gitlab_redis_shared_state do
end end
end end
describe '#size' do
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: 'üabcd') }
it 'returns data bytesize correctly' do
expect(data_store.size(model)).to eq 6
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :redis_without_data) }
it 'returns zero' do
expect(data_store.size(model)).to be_zero
end
end
end
describe '#keys' do describe '#keys' do
subject { data_store.keys(relation) } subject { data_store.keys(relation) }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment