Commit bf38f3f6 authored by Roque

Chunk size only customizable in ingestion operations

- download chunk size relies on remote file chunk size
- clearer log messages
parent 22f89bb4
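
A minimal sketch of the behaviour this commit moves to: for downloads, the chunk size is no longer taken from configuration but from the size of the first remote chunk, and each read asks for slightly more than that size (the "+ 10" visible in the diff), so a server-side chunk comes back in a single read. The names below (download_chunk_size, fetch_chunk, READ_MARGIN) are illustrative only and are not part of the plugin.

# Illustrative Ruby sketch, not plugin code.
READ_MARGIN = 10  # extra bytes requested per read, mirroring "chunk_size + 10" in the diff

def download_chunk_size(data_stream_chunk_list)
  # The remote file's own chunking drives the download, not a user setting.
  data_stream_chunk_list[0]["size"]
end

def fetch_chunk(chunk, read_size)
  # Stand-in for a chunked read: take up to read_size bytes of the chunk in one go.
  chunk["content"][0, read_size]
end

# Example: a remote file that the server split into two chunks.
data_stream_chunk_list = [
  { "id" => "part-0", "size" => 8, "content" => "abcdefgh" },
  { "id" => "part-1", "size" => 8, "content" => "ijklmnop" },
]

chunk_size = download_chunk_size(data_stream_chunk_list)
data_stream_chunk_list.each do |chunk|
  data = fetch_chunk(chunk, chunk_size + READ_MARGIN)
  puts "#{chunk['id']}: #{data}"
end

Keeping chunk_size configurable only on the ingestion side while deriving the download size from the server presumably keeps both ends agreeing on chunk boundaries without extra configuration.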
@@ -215,7 +215,7 @@ module Embulk
       super
       @supplier = task['supplier']
       @dataset = task['data_set']
-      @chunk_size = DatasetUtils::CHUNK_SIZE
+      @chunk_size = task['chunk_size']
       @data_set_directory = task['data_set_directory']
       @logger = LogManager.instance()
       @dataset_utils = DatasetUtils.new(@data_set_directory)
@@ -94,7 +94,6 @@ module Embulk
       @erp5_url = config.param('erp5_url', :string)
       @data_set = config.param('data_set', :string)
       @logger.info("Dataset name: #{@data_set}")
-      @chunk_size = config.param('chunk_size', :float, default: 0) * DatasetUtils::MEGA
       @output_path = config.param("output_path", :string, :default => nil)
       if not File.directory?(@output_path)
         @logger.error("Output directory not found.", print=TRUE)
@@ -103,14 +102,14 @@ module Embulk
       task = {
         'erp5_url' => @erp5_url,
         'data_set' => @data_set,
-        'chunk_size' => @chunk_size,
+        'chunk_size' => DatasetUtils::CHUNK_SIZE + 10,
         'output_path' => @output_path,
         'tool_dir' => @tool_dir
       }
       if task['chunk_size'] == 0
         task['chunk_size'] = DatasetUtils::CHUNK_SIZE
       end
-      @logger.info("Chunk size set in #{task['chunk_size']/DatasetUtils::MEGA}MB")
+      @logger.info("Download chunk size relies on server file chunks.")
       @dataset_utils = DatasetUtils.new("")
       task['data_set_directory'] = @dataset_utils.appendSlashTo(@output_path)
       @data_set_directory = task['data_set_directory']
@@ -242,7 +241,6 @@ module Embulk
       def initialize(task, schema, index, page_builder)
         super
         @data_set = task['data_set']
-        @chunk_size = DatasetUtils::CHUNK_SIZE
         @data_set_directory = task['data_set_directory']
         @wendelin = WendelinClient.new(task['erp5_url'], task['user'], task['password'])
         @logger = LogManager.instance()
@@ -266,18 +264,26 @@ module Embulk
             entry = [reference, new_reference, @data_set, TRUE, renamed]
             page_builder.add(entry)
           else
+            chunk_size = data_stream_chunk_list[0]["size"] #first chunk size
             @logger.info("Discarding local change on '#{remote_file["path"]}'", print=TRUE) if task['discard_changes']
             @logger.info("Getting content from remote #{reference}", print=TRUE)
-            @logger.info("Downloading...", print=TRUE)
-            resume_split = @dataset_utils.splitOperationFileExist(reference) ? @dataset_utils.getLastSplitOperation(DatasetUtils::DOWNLOAD, reference, large_hash, @chunk_size) : 0
+            resume_split = @dataset_utils.splitOperationFileExist(reference) ? @dataset_utils.getLastSplitOperation(DatasetUtils::DOWNLOAD, reference, large_hash, chunk_size) : 0
             n_chunk = resume_split == 0 ? 0 : resume_split+1
             split = n_chunk > 0
-            @logger.info("Resuming interrupted split download...", print=TRUE) if split
+            if split
+              @logger.info("Resuming interrupted split download...", print=TRUE)
+            else
+              if data_stream_chunk_list.length > 1
+                @logger.info("Downloading large file split in chunks...", print=TRUE)
+              else
+                @logger.info("Downloading...", print=TRUE)
+              end
+            end
             data_stream_chunk_list.each_with_index do |data_stream_chunk, index|
               #skip datastreams/chunks already downloaded
               if n_chunk == index
                 content = ""
-                @wendelin.eachDataStreamContentChunk(data_stream_chunk["id"], @chunk_size, 0) do |chunk|
+                @wendelin.eachDataStreamContentChunk(data_stream_chunk["id"], chunk_size + 10, 0, data_stream_chunk_list.length > 1) do |chunk|
                   content = chunk.nil? || chunk.empty? ? "" : Base64.encode64(chunk)
                 end
                 begin_of_file = n_chunk == 0
@@ -285,10 +291,11 @@ module Embulk
                 @dataset_utils.createSplitOperationControlFile(reference) if split
                 entry = [reference, content, @data_set, begin_of_file, renamed]
                 page_builder.add(entry)
-                @dataset_utils.saveSplitOperation(DatasetUtils::DOWNLOAD, reference, n_chunk, large_hash, @chunk_size) if split
+                @dataset_utils.saveSplitOperation(DatasetUtils::DOWNLOAD, reference, n_chunk, large_hash, chunk_size) if split
                 n_chunk += 1
               end
             end
+            @logger.info("Done", print=TRUE) if data_stream_chunk_list.length > 1
           end
           page_builder.finish
           @dataset_utils.deleteSplitOperationFile(reference) if split
@@ -60,7 +60,7 @@ module Embulk
             File.open(file_path, write_mode) { |file| file.write(data_chunk) }
           end
         end
       rescue Exception => e
         @logger.error("An error occurred while procesing file.", print=TRUE)
         @logger.error(e.backtrace)
         raise e
@@ -139,7 +139,7 @@ class WendelinClient
     return {"success"=>TRUE, "message"=>"success"}
   end
 
-  def eachDataStreamContentChunk(id, chunk_size, n_chunk=0)
+  def eachDataStreamContentChunk(id, chunk_size, n_chunk=0, split_operation=FALSE)
     n_part = n_chunk
     done = FALSE
     first = TRUE
@@ -158,7 +158,11 @@ class WendelinClient
         if first
           yield chunk
         end
-        @logger.info("Done", print=TRUE)
+        if split_operation
+          @logger.info("File chunk downloaded", print=TRUE)
+        else
+          @logger.info("Done", print=TRUE)
+        end
         done = TRUE
       else
         first = FALSE
@@ -235,6 +239,8 @@ class WendelinClient
       return {"success"=>TRUE, "message"=>res.body}
     else
       @logger.error("HTTP FAIL - code: #{res.code}", print=TRUE)
+      @logger.error("During request to " + uri.hostname.to_s, print=TRUE)
+      @logger.error(uri.to_s)
       if res.code == '500' or res.code == '502' or res.code == '503'
         @logger.error(HTTP_MESSAGE_5XX, print=TRUE)
       elsif res.code == '401'