Commit 2fac77b0 authored by Kamil Trzciński's avatar Kamil Trzciński Committed by Shinya Maeda

Simpler chunking :)

parent de5194cd
...@@ -25,6 +25,8 @@ module Ci ...@@ -25,6 +25,8 @@ module Ci
has_one :job_artifacts_metadata, -> { where(file_type: Ci::JobArtifact.file_types[:metadata]) }, class_name: 'Ci::JobArtifact', inverse_of: :job, foreign_key: :job_id has_one :job_artifacts_metadata, -> { where(file_type: Ci::JobArtifact.file_types[:metadata]) }, class_name: 'Ci::JobArtifact', inverse_of: :job, foreign_key: :job_id
has_one :job_artifacts_trace, -> { where(file_type: Ci::JobArtifact.file_types[:trace]) }, class_name: 'Ci::JobArtifact', inverse_of: :job, foreign_key: :job_id has_one :job_artifacts_trace, -> { where(file_type: Ci::JobArtifact.file_types[:trace]) }, class_name: 'Ci::JobArtifact', inverse_of: :job, foreign_key: :job_id
has_many :chunks, class_name: 'Ci::JobTraceChunk', foreign_key: :job_id, dependent: :destroy # rubocop:disable Cop/ActiveRecordDependent
has_one :metadata, class_name: 'Ci::BuildMetadata' has_one :metadata, class_name: 'Ci::BuildMetadata'
delegate :timeout, to: :metadata, prefix: true, allow_nil: true delegate :timeout, to: :metadata, prefix: true, allow_nil: true
......
...@@ -3,5 +3,112 @@ module Ci ...@@ -3,5 +3,112 @@ module Ci
extend Gitlab::Ci::Model extend Gitlab::Ci::Model
belongs_to :job, class_name: "Ci::Build", foreign_key: :job_id belongs_to :job, class_name: "Ci::Build", foreign_key: :job_id
after_destroy :redis_delete_data, if: :redis?
default_value_for :data_store, :redis
CHUNK_SIZE = 8
CHUNK_REDIS_TTL = 1.month
enum data_store: {
redis: 1,
db: 2,
}
def data
case
when redis?
redis_data
when db?
raw_data
else
raise 'Unsupported data store'
end
end
def set_data(value)
raise 'too much data' if value.length > CHUNK_SIZE
case
when redis?
redis_set_data(value)
when db?
self.raw_data = value
else
raise 'Unsupported data store'
end
save if changed?
schedule_to_db if fullfilled?
end
def truncate(offset = 0)
self.append("", offset)
end
def append(new_data, offset)
current_data = self.data || ""
raise 'Outside of if data' if offset > current_data.bytesize
self.set_data(current_data.byteslice(0, offset) + new_data)
end
def size
data&.bytesize.to_i
end
def start_offset
chunk_index * CHUNK_SIZE
end
def end_offset
start_offset + size
end
def range
(start_offset...end_offset)
end
def use_database!
return if db?
self.update!(raw_data: data, data_store: :db)
redis_delete_data
end
private
def schedule_to_db
return if db?
self.use_database!
end
def fullfilled?
size == CHUNK_SIZE
end
def redis_data
Gitlab::Redis::SharedState.with do |redis|
redis.get(redis_key)
end
end
def redis_set_data(data)
Gitlab::Redis::SharedState.with do |redis|
redis.set(redis_key, data, ex: CHUNK_REDIS_TTL)
end
end
def redis_delete_data
Gitlab::Redis::SharedState.with do |redis|
redis.del(redis_key)
end
end
def redis_key
"gitlab:ci:trace:#{job_id}:chunks:#{chunk_index}"
end
end end
end end
...@@ -7,7 +7,8 @@ class CreateCiJobTraceChunks < ActiveRecord::Migration ...@@ -7,7 +7,8 @@ class CreateCiJobTraceChunks < ActiveRecord::Migration
create_table :ci_job_trace_chunks do |t| create_table :ci_job_trace_chunks do |t|
t.integer :job_id, null: false t.integer :job_id, null: false
t.integer :chunk_index, null: false t.integer :chunk_index, null: false
t.text :data t.integer :data_store, null: false
t.text :raw_data
t.foreign_key :ci_builds, column: :job_id, on_delete: :cascade t.foreign_key :ci_builds, column: :job_id, on_delete: :cascade
t.index [:chunk_index, :job_id], unique: true t.index [:chunk_index, :job_id], unique: true
......
...@@ -373,7 +373,8 @@ ActiveRecord::Schema.define(version: 20180327101207) do ...@@ -373,7 +373,8 @@ ActiveRecord::Schema.define(version: 20180327101207) do
create_table "ci_job_trace_chunks", force: :cascade do |t| create_table "ci_job_trace_chunks", force: :cascade do |t|
t.integer "job_id", null: false t.integer "job_id", null: false
t.integer "chunk_index", null: false t.integer "chunk_index", null: false
t.text "data" t.integer "data_store", null: false
t.text "raw_data"
end end
add_index "ci_job_trace_chunks", ["chunk_index", "job_id"], name: "index_ci_job_trace_chunks_on_chunk_index_and_job_id", unique: true, using: :btree add_index "ci_job_trace_chunks", ["chunk_index", "job_id"], name: "index_ci_job_trace_chunks_on_chunk_index_and_job_id", unique: true, using: :btree
......
...@@ -54,15 +54,15 @@ module Gitlab ...@@ -54,15 +54,15 @@ module Gitlab
end end
def exist? def exist?
trace_artifact&.exists? || ChunkedFile::LiveTrace.exist?(job.id) || current_path.present? || old_trace.present? trace_artifact&.exists? || job.chunks.any? || current_path.present? || old_trace.present?
end end
def read def read
stream = Gitlab::Ci::Trace::Stream.new do stream = Gitlab::Ci::Trace::Stream.new do
if trace_artifact if trace_artifact
trace_artifact.open trace_artifact.open
elsif ChunkedFile::LiveTrace.exist?(job.id) elsif job.chunks.any?
ChunkedFile::LiveTrace.new(job.id, nil, "rb") Gitlab::Ci::Trace::ChunkedIO.new(job)
elsif current_path elsif current_path
File.open(current_path, "rb") File.open(current_path, "rb")
elsif old_trace elsif old_trace
...@@ -77,12 +77,10 @@ module Gitlab ...@@ -77,12 +77,10 @@ module Gitlab
def write def write
stream = Gitlab::Ci::Trace::Stream.new do stream = Gitlab::Ci::Trace::Stream.new do
if Feature.enabled?('ci_enable_live_trace')
if current_path if current_path
current_path current_path
else elsif Feature.enabled?('ci_enable_live_trace')
ChunkedFile::LiveTrace.new(job.id, nil, "a+b") Gitlab::Ci::Trace::ChunkedIO.new(job)
end
else else
File.open(ensure_path, "a+b") File.open(ensure_path, "a+b")
end end
...@@ -102,6 +100,7 @@ module Gitlab ...@@ -102,6 +100,7 @@ module Gitlab
FileUtils.rm(trace_path, force: true) FileUtils.rm(trace_path, force: true)
end end
job.chunks.destroy_all
job.erase_old_trace! job.erase_old_trace!
end end
...@@ -109,13 +108,10 @@ module Gitlab ...@@ -109,13 +108,10 @@ module Gitlab
raise ArchiveError, 'Already archived' if trace_artifact raise ArchiveError, 'Already archived' if trace_artifact
raise ArchiveError, 'Job is not finished yet' unless job.complete? raise ArchiveError, 'Job is not finished yet' unless job.complete?
if ChunkedFile::LiveTrace.exist?(job.id) if job.chunks.any?
ChunkedFile::LiveTrace.new(job.id, nil, 'a+b') do |live_trace_stream| Gitlab::Ci::Trace::ChunkedIO.new(job) do |stream|
StringIO.new(live_trace_stream.read, 'rb').tap do |stream|
archive_stream!(stream) archive_stream!(stream)
end stream.destroy!
live_trace_stream.delete
end end
elsif current_path elsif current_path
File.open(current_path) do |stream| File.open(current_path) do |stream|
......
##
# This class is compatible with IO class (https://ruby-doc.org/core-2.3.1/IO.html)
# source: https://gitlab.com/snippets/1685610
module Gitlab
module Ci
class Trace
class ChunkedIO
CHUNK_SIZE = ::Ci::JobTraceChunk::CHUNK_SIZE
FailedToGetChunkError = Class.new(StandardError)
attr_reader :job
attr_reader :tell, :size
attr_reader :chunk, :chunk_range
alias_method :pos, :tell
def initialize(job)
@job = job
@chunks_cache = []
@tell = 0
@size = job_chunks.last.try(&:end_offset).to_i
end
def close
# no-op
end
def binmode
# no-op
end
def binmode?
true
end
def path
nil
end
def url
nil
end
def seek(pos, where = IO::SEEK_SET)
new_pos =
case where
when IO::SEEK_END
size + pos
when IO::SEEK_SET
pos
when IO::SEEK_CUR
tell + pos
else
-1
end
raise 'new position is outside of file' if new_pos < 0 || new_pos > size
@tell = new_pos
end
def eof?
tell == size
end
def each_line
until eof?
line = readline
break if line.nil?
yield(line)
end
end
def read(length = nil)
out = ""
until eof? || (length && out.length >= length)
data = chunk_slice_from_offset
break if data.empty?
out << data
@tell += data.bytesize
end
out = out[0, length] if length && out.length > length
out
end
def readline
out = ""
until eof?
data = chunk_slice_from_offset
new_line = data.index("\n")
if !new_line.nil?
out << data[0..new_line]
@tell += new_line + 1
break
else
out << data
@tell += data.bytesize
end
end
out
end
def write(data)
start_pos = @tell
while @tell < start_pos + data.bytesize
# get slice from current offset till the end where it falls into chunk
chunk_bytes = CHUNK_SIZE - chunk_offset
chunk_data = data.byteslice(@tell - start_pos, chunk_bytes)
# append data to chunk, overwriting from that point
ensure_chunk.append(chunk_data, chunk_offset)
# move offsets within buffer
@tell += chunk_bytes
@size = [@size, @tell].max
end
end
def truncate(offset)
raise 'Outside of file' if offset > size
@tell = offset
@size = offset
invalidate_chunk_cache
# remove all next chunks
job_chunks.where('chunk_index > ?', chunk_index).destroy_all
# truncate current chunk
current_chunk.truncate(chunk_offset) if chunk_offset != 0
end
def flush
# no-op
end
def present?
true
end
def destroy!
job_chunks.destroy_all
invalidate_chunk_cache
end
private
##
# The below methods are not implemented in IO class
#
def in_range?
@chunk_range&.include?(tell)
end
def chunk_slice_from_offset
unless in_range?
current_chunk.tap do |chunk|
raise FailedToGetChunkError unless chunk
@chunk = chunk.data.force_encoding(Encoding::BINARY)
@chunk_range = chunk.range
end
end
@chunk.byteslice(chunk_offset, CHUNK_SIZE)
end
def chunk_offset
tell % CHUNK_SIZE
end
def chunk_index
tell / CHUNK_SIZE
end
def chunk_start
chunk_index * CHUNK_SIZE
end
def chunk_end
[chunk_start + CHUNK_SIZE, size].min
end
def invalidate_chunk_cache
@chunks_cache = []
end
def current_chunk
@chunks_cache[chunk_index] ||= job_chunks.find_by(chunk_index: chunk_index)
end
def build_chunk
@chunks_cache[chunk_index] = Ci::JobTraceChunk.new(job: job, chunk_index: chunk_index)
end
def ensure_chunk
current_chunk || build_chunk
end
def job_chunks
Ci::JobTraceChunk.where(job: job)
end
end
end
end
end
...@@ -161,7 +161,7 @@ module Gitlab ...@@ -161,7 +161,7 @@ module Gitlab
@chunk_range ||= (chunk_start...(chunk_start + @chunk.length)) @chunk_range ||= (chunk_start...(chunk_start + @chunk.length))
end end
@chunk[chunk_offset..BUFFER_SIZE] @chunk.byteslice(chunk_offset, BUFFER_SIZE)
end end
def request def request
......
...@@ -40,8 +40,9 @@ module Gitlab ...@@ -40,8 +40,9 @@ module Gitlab
end end
def set(data) def set(data)
truncate(0) stream.seek(0, IO::SEEK_SET)
stream.write(data) stream.write(data)
stream.truncate(data.bytesize)
stream.flush() stream.flush()
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment