Commit 1a05c60e authored by Shinya Maeda's avatar Shinya Maeda

Add spec for chunked_io

parent 1108df18
...@@ -61,6 +61,10 @@ module Gitlab ...@@ -61,6 +61,10 @@ module Gitlab
data.length data.length
end end
def append!(data)
raise NotImplementedError
end
def truncate!(offset) def truncate!(offset)
raise NotImplementedError raise NotImplementedError
end end
......
...@@ -65,6 +65,10 @@ module Gitlab ...@@ -65,6 +65,10 @@ module Gitlab
raise NotImplementedError raise NotImplementedError
end end
def append!(data)
raise NotImplementedError
end
def truncate!(offset) def truncate!(offset)
raise NotImplementedError raise NotImplementedError
end end
......
...@@ -74,6 +74,13 @@ module Gitlab ...@@ -74,6 +74,13 @@ module Gitlab
end end
end end
def append!(data)
Gitlab::Redis::Cache.with do |redis|
redis.append(buffer_key, data)
data.length
end
end
def truncate!(offset) def truncate!(offset)
Gitlab::Redis::Cache.with do |redis| Gitlab::Redis::Cache.with do |redis|
return unless redis.exists(buffer_key) return unless redis.exists(buffer_key)
......
## ##
# This class is compatible with IO class (https://ruby-doc.org/core-2.3.1/IO.html) # This class is designed as it's compatible with IO class (https://ruby-doc.org/core-2.3.1/IO.html)
# source: https://gitlab.com/snippets/1685610
module Gitlab module Gitlab
module Ci module Ci
class Trace class Trace
module ChunkedFile module ChunkedFile
class ChunkedIO class ChunkedIO
class << self
def open(job_id, size, mode)
stream = self.new(job_id, size, mode)
yield stream
ensure
stream.close
end
end
WriteError = Class.new(StandardError) WriteError = Class.new(StandardError)
FailedToGetChunkError = Class.new(StandardError)
attr_reader :size attr_reader :size
attr_reader :tell attr_reader :tell
...@@ -23,7 +33,10 @@ module Gitlab ...@@ -23,7 +33,10 @@ module Gitlab
if /(w|a)/ =~ mode if /(w|a)/ =~ mode
@write_lock_uuid = Gitlab::ExclusiveLease.new(write_lock_key, timeout: 1.hour.to_i).try_obtain @write_lock_uuid = Gitlab::ExclusiveLease.new(write_lock_key, timeout: 1.hour.to_i).try_obtain
raise WriteError, 'Already opened by another process' unless write_lock_uuid raise WriteError, 'Already opened by another process' unless write_lock_uuid
seek(0, IO::SEEK_END) if /a/ =~ mode
end end
end end
...@@ -39,10 +52,6 @@ module Gitlab ...@@ -39,10 +52,6 @@ module Gitlab
true true
end end
def path
nil
end
def seek(pos, where = IO::SEEK_SET) def seek(pos, where = IO::SEEK_SET)
new_pos = new_pos =
case where case where
...@@ -111,46 +120,56 @@ module Gitlab ...@@ -111,46 +120,56 @@ module Gitlab
end end
def write(data, &block) def write(data, &block)
raise WriteError, 'Already opened by another process' unless write_lock_uuid raise WriteError, 'Could not write without lock' unless write_lock_uuid
raise WriteError, 'Could not write empty data' unless data.present?
while data.present? data = data.dup
empty_space = BUFFER_SIZE - chunk_offset
chunk_store.open(job_id, chunk_index, params_for_store) do |store| chunk_index_start = chunk_index
data_to_write = '' chunk_index_end = (tell + data.length) / BUFFER_SIZE
data_to_write += store.get if store.size > 0 prev_tell = tell
data_to_write += data.slice!(0..empty_space)
written_size = store.write!(data_to_write) (chunk_index_start..chunk_index_end).each do |c_index|
chunk_store.open(job_id, c_index, params_for_store) do |store|
writable_space = BUFFER_SIZE - chunk_offset
writing_size = [writable_space, data.length].min
raise WriteError, 'Written size mismatch' unless data_to_write.length == written_size if store.size > 0
written_size = store.append!(data.slice!(0...writing_size))
else
written_size = store.write!(data.slice!(0...writing_size))
end
block.call(store, chunk_index) if block_given? raise WriteError, 'Written size mismatch' unless writing_size == written_size
@tell += written_size @tell += written_size
@size += written_size @size = [tell, size].max
block.call(store, c_index) if block_given?
end end
end end
tell - prev_tell
end end
def truncate(offset) def truncate(offset, &block)
raise WriteError, 'Already opened by another process' unless write_lock_uuid raise WriteError, 'Could not write without lock' unless write_lock_uuid
raise WriteError, 'Offset is out of bound' if offset > size || offset < 0
removal_chunk_index_start = (offset / BUFFER_SIZE) chunk_index_start = (offset / BUFFER_SIZE)
removal_chunk_index_end = chunks_count - 1 chunk_index_end = chunks_count - 1
removal_chunk_offset = offset % BUFFER_SIZE
if removal_chunk_offset > 0 (chunk_index_start..chunk_index_end).reverse_each do |c_index|
chunk_store.open(job_id, removal_chunk_index_start, params_for_store) do |store| chunk_store.open(job_id, c_index, params_for_store) do |store|
store.truncate!(removal_chunk_offset) c_index_start = c_index * BUFFER_SIZE
end
removal_chunk_index_start += 1 if offset <= c_index_start
store.delete!
else
store.truncate!(offset - c_index_start) if store.size > 0
end end
(removal_chunk_index_start..removal_chunk_index_end).each do |removal_chunk_index| block.call(store, c_index) if block_given?
chunk_store.open(job_id, removal_chunk_index, params_for_store) do |store|
store.delete!
end end
end end
...@@ -165,15 +184,8 @@ module Gitlab ...@@ -165,15 +184,8 @@ module Gitlab
true true
end end
def delete_chunks!
truncate(0)
end
private private
##
# The below methods are not implemented in IO class
#
def in_range? def in_range?
@chunk_range&.include?(tell) @chunk_range&.include?(tell)
end end
...@@ -182,7 +194,10 @@ module Gitlab ...@@ -182,7 +194,10 @@ module Gitlab
unless in_range? unless in_range?
chunk_store.open(job_id, chunk_index, params_for_store) do |store| chunk_store.open(job_id, chunk_index, params_for_store) do |store|
@chunk = store.get @chunk = store.get
@chunk_range = (chunk_start...(chunk_start + @chunk.length))
raise FailedToGetChunkError unless chunk
@chunk_range = (chunk_start...(chunk_start + chunk.length))
end end
end end
...@@ -223,6 +238,10 @@ module Gitlab ...@@ -223,6 +238,10 @@ module Gitlab
def write_lock_key def write_lock_key
"live_trace:operation:write:#{job_id}" "live_trace:operation:write:#{job_id}"
end end
def chunk_store
raise NotImplementedError
end
end end
end end
end end
......
...@@ -7,7 +7,7 @@ module Gitlab ...@@ -7,7 +7,7 @@ module Gitlab
class << self class << self
def open(job_id, mode) def open(job_id, mode)
stream = self.class.new(job_id, mode) stream = self.new(job_id, mode)
yield stream yield stream
ensure ensure
......
...@@ -6,11 +6,11 @@ module Gitlab ...@@ -6,11 +6,11 @@ module Gitlab
BUFFER_SIZE = 128.kilobytes BUFFER_SIZE = 128.kilobytes
class << self class << self
def open(job_id, url, size, mode) def open(job_id, mode)
stream = self.class.new(job_id, mode) stream = self.new(job_id, mode)
yield stream yield stream
ensure
stream.close stream.close
end end
end end
......
This diff is collapsed.
module ChunkedIOHelpers
def fill_trace_to_chunks(data)
stream = Gitlab::Ci::Trace::ChunkedFile::ChunkedIO.new(job_id, data.length, 'wb')
stream.write(data)
stream.close
end
def sample_trace_raw
@sample_trace_raw ||= File.read(expand_fixture_path('trace/sample_trace'))
end
def sample_trace_size
sample_trace_raw.length
end
def stub_chunk_store_redis_get_failed
allow_any_instance_of(Gitlab::Ci::Trace::ChunkedFile::ChunkStore::Redis)
.to receive(:get).and_return(nil)
end
def set_smaller_buffer_size_than(file_size)
blocks = (file_size / 128)
new_size = (blocks / 2) * 128
stub_const("Gitlab::Ci::Trace::ChunkedFile::ChunkedIO::BUFFER_SIZE", new_size)
end
def set_larger_buffer_size_than(file_size)
blocks = (file_size / 128)
new_size = (blocks * 2) * 128
stub_const("Gitlab::Ci::Trace::ChunkedFile::ChunkedIO::BUFFER_SIZE", new_size)
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment