new commands for staging and reset

parent 963e3e0d
@@ -22,6 +22,9 @@ GREEN='\033[0;32m'
ORANGE='\033[0;33m'
NC='\033[0m'
DEFAULT_CHUNK_SIZE="50"
STAGE_ADD="add"
STAGE_REMOVE="remove"
STAGE_RESET="reset"
function helpReadme {
echo -e "[INFO] For help, please run '${GREEN}ebulk --help${NC}'"
@@ -42,9 +45,9 @@ function checkParameters {
helpReadme >&2; return 1
fi
if [ "$STORAGE" = "" ] ; then
if [ ! -d "$DATASET_DIR" ]; then
echo
mkdir "$DATASET_DIR" 2>/dev/null
if [ ! $? -eq 0 ]; then
echo
echo -e "${ORANGE}[ERROR] Dataset path not found."
@@ -55,14 +58,19 @@ function checkParameters {
fi
EBULK_DATASET_FILE="$DATASET_DIR/.ebulk_dataset"
if [[ $DATASET_DIR != $REFERENCE ]]; then
if [ "$REFERENCE" = "." ] ; then
REFERENCE=$(basename "$DATASET_DIR")
fi
DATA_SET=$REFERENCE
echo $REFERENCE > "$EBULK_DATASET_FILE" 2>/dev/null
else
if [ -f "$EBULK_DATASET_FILE" ]; then
DATA_SET=$(cat "$DATASET_DIR/.ebulk_dataset" 2>/dev/null)
else
DATA_SET=$(basename "$DATASET_DIR")
if [ "$DATA_SET" != "." ] ; then
SAVE_DATASET_NAME="TRUE"
fi
fi
fi
else
@@ -70,14 +78,19 @@ function checkParameters {
fi
re='^[A-Za-z][_A-Za-z.0-9-]*$'
if ! [[ $DATA_SET =~ $re ]] ; then
if [ "$DATA_SET" = "." ] && [[ -z "$STORAGE" ]] ; then
echo
echo -e "${ORANGE}[ERROR] You are not in a dataset directory ${GREEN}'$DATA_SET'${ORANGE}.${NC}"
echo
else
echo
echo -e "${ORANGE}[ERROR] Error in argument: invalid dataset name ${GREEN}'$DATA_SET'${ORANGE}.${NC}"
echo -e "${ORANGE}[ERROR] Dataset name must start with a letter, and only alphanumerics, dots ( . ), underscores ( _ ) and hyphens ( - ) are allowed.${NC}"
echo
fi
helpReadme >&2; return 1
elif [ ! -z "$SAVE_DATASET_NAME" ]; then
echo $DATA_SET > "$EBULK_DATASET_FILE" 2>/dev/null
fi
if [ ! -z "$CHUNK" ]; then
re='^[0-9]+$'
@@ -143,6 +156,7 @@ function updateConfigFile {
DOWN_URL=\"$DOWN_URL\"
ING_URL=\"$ING_URL\"
STORAGE=\"$STORAGE\"
STATUS=\"$STATUS\"
S3_BUCKET=\"$S3_BUCKET\"
S3_PREFIX=\"$S3_PREFIX\"
@@ -174,32 +188,37 @@ function runProcess {
return 1
fi
echo -e "[INFO] Dataset: ${GREEN}$DATA_SET${NC}"
if [ -z "$STATUS" ]; then
if [ ! -z "$CHUNK" ]; then
if [ "$CHUNK" -eq "0" ]; then
echo "[INFO] Default chunk size: $DEFAULT_CHUNK_SIZE Mb."
else
echo "[INFO] Chunk size set in $CHUNK Mb."
fi
fi
fi
if [ -z "$STATUS" ]; then
if ! askCredentials; then
return 1
fi
fi
echo
echo "[INFO] Supplier: $USER"
updateConfigFile
echo "[INFO] Starting operation..."
if [ ! -d $LOG_DIR ]; then
mkdir $LOG_DIR 2>/dev/null
fi
$embulk run -L $TOOL_PATH/embulk-wendelin-dataset-tool $FILE $DIFF 2> "$LOG_DIR/error.log" || {
if [ -z "$STATUS" ]; then
echo
echo -e "${ORANGE}[ERROR] Embulk tool stopped its execution.${NC}"
if [ "$STORAGE" != \"\" ] ; then
echo "[INFO] There was an error running Embulk tool, probably while connecting to storage, loading embulk gems or config files."
echo "[INFO] Please make sure your inputs are correct and configuration files are not corrupted."
fi
echo "[INFO] Please check the logs in '$LOG_DIR' directory for more details."
echo
fi
}
}
@@ -383,17 +402,37 @@ function askS3parameters {
fi
}
function stage {
EBULK_DATASET_FILE="./.ebulk_dataset"
if [ ! -f "$EBULK_DATASET_FILE" ]; then
echo
echo -e "${ORANGE}[ERROR] You are not in a dataset directory."
echo -e "[INFO] $OP operation can only be run within a root dataset directory.${NC}"
echo
helpReadme >&2; exit
fi
if [[ $PATH_TO_ELEMENT = "" ]]; then
echo
echo -e "${ORANGE}[ERROR] Nothing specified, nothing to $OP."
echo -e "[INFO] Please specify a valid path.${NC}"
echo
helpReadme >&2; exit
fi
STAGE_FILE="./.staged"
}
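The new staging workflow is easiest to follow with a short usage sketch. The dataset name and file paths below are invented for the example, but the commands and the "operation;path" records appended to ./.staged come from the script itself:

cd my_dataset                     # hypothetical dataset directory containing .ebulk_dataset
ebulk add ./raw/sample1.csv       # appends "add;./raw/sample1.csv" to ./.staged (the path must exist)
ebulk remove ./raw/obsolete.csv   # appends "remove;./raw/obsolete.csv" to ./.staged
ebulk reset ./raw/sample1.csv     # appends "reset;./raw/sample1.csv", effectively unstaging that file
ebulk status                      # lists staged and untracked changes without asking for credentials
ebulk push                        # ingests the staged changes (or all untracked changes if nothing is staged)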
function welcome {
echo
echo " #########################################################################"
echo " ############## WELCOME TO EBULK INGESTION-DOWNLOAD TOOL #################"
echo " ########### This tool relies on Embulk software and Java 8 ##############"
echo " ######## Do not forget to check the README before use this tool #########"
echo " ############## In case of any problem, please contact us ###############"
echo " ####################### roqueporchetto@gmail.com ########################"
echo " ###################### Happy ingestion-download ! #######################"
echo " #########################################################################"
echo
}
if [ ! -d $EBULK_DATA_PATH ]; then
mkdir $EBULK_DATA_PATH 2>/dev/null
@@ -411,31 +450,38 @@ ADVANCED=false
while [ "$1" != "" ]; do
case $1 in
-d | --directory ) shift
DATASET_DIR=$1
;;
-s | --storage ) shift
STORAGE=$1
;;
-cs | --custom-storage ) STORAGE="custom-storage"
CUSTOM=true
;;
-a | --advanced ) ADVANCED=true
;;
-c | --chunk ) shift
CHUNK=$1
;;
-h | --help ) cat $TOOL_PATH/help.md
exit
;;
-e | --examples ) cat $TOOL_PATH/example.md
exit
;;
-r | --readme ) less $TOOL_PATH/README.md
exit
;;
status | push | pull ) OPERATION=$1
;;
add | remove | reset ) OPERATION=$1
shift
PATH_TO_ELEMENT=$1
REFERENCE="."
;;
*) if [[ $REFERENCE != $1 ]]; then
echo
echo -e "${ORANGE}[ERROR] Invalid parameter '$1'.${NC}" echo -e "${ORANGE}[ERROR] Invalid parameter '$1'.${NC}"
echo echo
helpReadme >&2; exit helpReadme >&2; exit
...@@ -444,14 +490,15 @@ while [ "$1" != "" ]; do ...@@ -444,14 +490,15 @@ while [ "$1" != "" ]; do
shift shift
done done
for ELEMENT in '' '-d' '--directory' '-s' '--storage' '-cs' '--custom-storage' '-a' '--advanced' '-c' '--chunk'; do
if [ "$ELEMENT" = "$REFERENCE" ]; then
REFERENCE="."
fi
done
if [[ $OPERATION = "" ]]; then if [[ $OPERATION = "" ]]; then
echo -e "${ORANGE}[ERROR] Please specify a valid operation.${NC}"
echo echo
helpReadme >&2; exit echo -e "${ORANGE}[ERROR] Please specify a valid operation.${NC}"
fi
if [[ $REFERENCE = "" ]]; then
echo -e "${ORANGE}[ERROR] Dataset not specified."
echo -e "[INFO] Please specify a valid dataset.${NC}"
echo echo
helpReadme >&2; exit helpReadme >&2; exit
fi fi
...@@ -463,7 +510,41 @@ if [[ $CHUNK = "" ]]; then ...@@ -463,7 +510,41 @@ if [[ $CHUNK = "" ]]; then
fi fi
case $OPERATION in case $OPERATION in
add)
OP=$STAGE_ADD
stage
ELEMENT="./$PATH_TO_ELEMENT"
if [ -d "$ELEMENT" ] || [ -f "$ELEMENT" ]; then
echo "$OP;$PATH_TO_ELEMENT" >> $STAGE_FILE
else
echo
echo -e "${ORANGE}[ERROR] '$PATH_TO_ELEMENT' did not match any files or directories."
echo -e "[INFO] Please specify a valid path.${NC}"
echo
helpReadme >&2; exit
fi
;;
remove)
OP=$STAGE_REMOVE
stage
echo "$OP;$PATH_TO_ELEMENT" >> $STAGE_FILE
;;
reset)
OP=$STAGE_RESET
stage
echo "$OP;$PATH_TO_ELEMENT" >> $STAGE_FILE
;;
status)
welcome
STATUS=$OPERATION
FILE=$ING_FILE
TEMPLATE_FILE=$ING_TEMPLATE_FILE
echo "### DATASET STATUS ###"
echo
runProcess
;;
pull)
welcome
FILE=$DOWN_FILE
TEMPLATE_FILE=$DOWN_TEMPLATE_FILE
if [ "$STORAGE" != "" ] ; then
@@ -480,6 +561,7 @@ case $OPERATION in
runProcess
;;
push)
welcome
MESSAGE="storage: $STORAGE"
if [ "$CUSTOM" = true ] ; then
FILE=$CUSTOM_ING_FILE
......
require_relative 'filelogger' require_relative 'filelogger'
require 'digest/md5' require 'digest/md5'
require 'fileutils'
# class that handles dataset tasks report # class that handles dataset tasks report
class DatasetUtils class DatasetUtils
DATASET_REPORT_FILE = ".dataset-task-report" DATASET_REPORT_FILE = ".dataset-task-report"
DATASET_TEMP_REPORT_FILE = ".temp-dataset-task-report"
DATASET_COMPLETED_FILE = ".dataset-completed" DATASET_COMPLETED_FILE = ".dataset-completed"
RESUME_OPERATION_FILE = ".resume-operation" RESUME_OPERATION_FILE = ".resume-operation"
INITIAL_INGESTION_FILE = ".initial-ingestion" INITIAL_INGESTION_FILE = ".initial-ingestion"
STAGED_FILE = ".staged"
RUN_DONE = "done" RUN_DONE = "done"
RUN_ERROR = "error" RUN_ERROR = "error"
RUN_ABORTED = "aborted" RUN_ABORTED = "aborted"
DELETE = "DELETE" DELETE = "DELETE"
INGESTION = "ingestion" INGESTION = "ingestion"
ADD = "add"
REMOVE = "remove"
STATUS_NEW = "new"
STATUS_MODIFIED = "modified"
STATUS_DELETED = "deleted"
STAGE_ADD="add"
STAGE_REMOVE="remove"
STAGE_RESET="reset"
OUTPUT_NEW = "new: "
OUTPUT_ADD = "add: "
OVERWRITE = "overwrite: "
OUTPUT_MODIFIED = "modified: "
OUTPUT_DELETED = "deleted: "
MEGA = 1000000 MEGA = 1000000
EOF = "EOF"
CHUNK_SIZE = 50000000 #50mb
NONE_EXT = "none"
REFERENCE_SEPARATOR = "/"
RECORD_SEPARATOR = ";"
def initialize(data_set_directory) def initialize(data_set_directory)
@data_set_directory = data_set_directory @data_set_directory = data_set_directory
@logger = LogManager.instance() @logger = LogManager.instance()
@task_report_file = @data_set_directory + DATASET_REPORT_FILE @task_report_file = @data_set_directory + DATASET_REPORT_FILE
@temp_report_file = @data_set_directory + DATASET_TEMP_REPORT_FILE
@completed_file = @data_set_directory + DATASET_COMPLETED_FILE @completed_file = @data_set_directory + DATASET_COMPLETED_FILE
@resume_operation_file = @data_set_directory + RESUME_OPERATION_FILE @resume_operation_file = @data_set_directory + RESUME_OPERATION_FILE
@initial_ingestion_file = @data_set_directory + INITIAL_INGESTION_FILE @initial_ingestion_file = @data_set_directory + INITIAL_INGESTION_FILE
@staged_file = @data_set_directory + STAGED_FILE
end
def getLocalPaths(paths)
return paths.map {|path|
next [] unless Dir.exist?(path)
Dir[(path + '/**/*').gsub! '//', '/']
}.flatten.select{ |file| File.file?(file) }
end end
def getLocalFiles(remove=nil) def getLocalFiles(remove=nil)
local_files = {} local_files = {}
begin begin
File.readlines(@task_report_file).each do |line| File.readlines(@task_report_file).each do |line|
record = line.split(";") record = line.split(RECORD_SEPARATOR)
if record[1].chomp == RUN_DONE if record[1].chomp == RUN_DONE
if (remove.nil?) || (remove != record[0]) if (remove.nil?) || (remove != record[0])
local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp, "status" => record[1].chomp, "modification_date" => record[4].chomp } local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp, "status" => record[1].chomp, "modification_date" => record[4].chomp }
...@@ -46,14 +77,15 @@ class DatasetUtils ...@@ -46,14 +77,15 @@ class DatasetUtils
def saveReport(local_files) def saveReport(local_files)
begin begin
File.delete(@task_report_file) if File.exist?(@task_report_file) File.delete(@temp_report_file) if File.exist?(@temp_report_file)
if local_files.empty? if local_files.empty?
File.open(@task_report_file, 'w') {} File.open(@temp_report_file, 'w') {}
else else
local_files.each do |key, array| local_files.each do |key, array|
File.open(@task_report_file, 'ab') { |file| file.puts(key+";"+array["status"]+";"+array["size"].to_s+";"+array["hash"]+";"+array["modification_date"]) } File.open(@temp_report_file, 'ab') { |file| file.puts(key+RECORD_SEPARATOR+array["status"]+RECORD_SEPARATOR+array["size"].to_s+RECORD_SEPARATOR+array["hash"]+RECORD_SEPARATOR+array["modification_date"]) }
end end
end end
FileUtils.cp_r(@temp_report_file, @task_report_file, :remove_destination => true)
rescue Exception => e rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'saveReport':" + e.to_s) @logger.error("An error occurred in DatasetUtils method 'saveReport':" + e.to_s)
@logger.error(e.backtrace) @logger.error(e.backtrace)
...@@ -70,7 +102,7 @@ class DatasetUtils ...@@ -70,7 +102,7 @@ class DatasetUtils
if File.exist?(@resume_operation_file) if File.exist?(@resume_operation_file)
File.delete(@resume_operation_file) File.delete(@resume_operation_file)
end end
File.open(@resume_operation_file, 'w') { |file| file.puts(operation+";"+reference) } File.open(@resume_operation_file, 'w') { |file| file.puts(operation+RECORD_SEPARATOR+reference) }
end end
def reportUpToDate(data_stream_dict) def reportUpToDate(data_stream_dict)
...@@ -95,7 +127,7 @@ class DatasetUtils ...@@ -95,7 +127,7 @@ class DatasetUtils
elsif changes.length == 1 elsif changes.length == 1
# check if the unique detected change corresponds to an interrupted ingestion
if File.exist?(@resume_operation_file) if File.exist?(@resume_operation_file)
operation=File.open(@resume_operation_file).read.chomp.split(";") operation=File.open(@resume_operation_file).read.chomp.split(RECORD_SEPARATOR)
if operation[0] == INGESTION if operation[0] == INGESTION
if operation[1] == changes[0]["reference"] if operation[1] == changes[0]["reference"]
File.delete(@resume_operation_file) File.delete(@resume_operation_file)
...@@ -112,6 +144,68 @@ class DatasetUtils ...@@ -112,6 +144,68 @@ class DatasetUtils
end end
end end
def showChanges(changes, status)
changes.each do |change|
if status != ""
status_output = status
elsif change["status"] == STATUS_NEW
status_output = OUTPUT_NEW
elsif change["status"] == STATUS_MODIFIED
status_output = OUTPUT_MODIFIED
elsif change["status"] == STATUS_DELETED
status_output = OUTPUT_DELETED
else
status_output = "no-status"
end
path = status != OVERWRITE ? change["path"] : change
@logger.info(" #{status_output}#{path}", print=TRUE)
end
end
def showChangesList(changes, message, print_short, status="")
if not changes.empty?
if message and message != ""
@logger.info(message, print=TRUE)
end
if print_short and changes.length > 200
limit = changes.length > 300 ? 100 : changes.length/3
showChanges(changes[0, limit], status)
puts "....."
showChanges(changes[changes.length-limit, changes.length-1], status)
else
showChanges(changes, status)
end
end
end
def showTaskReport(task_reports)
return if task_reports.empty?
@logger.info("Reports:", print=TRUE)
if task_reports.length > 15
@logger.info(task_reports[0, 5], print=TRUE)
puts "....."
@logger.info(task_reports[task_reports.length-5, task_reports.length-1], print=TRUE)
@logger.info("Full task report:")
@logger.info(task_reports)
else
@logger.info(task_reports, print=TRUE)
end
end
def showTaskErrors(failed_tasks)
puts
@logger.error("The following files could not be processed. Please check the details in the log file: " + @logger.getLogPath(), print=TRUE)
if failed_tasks.length > 15
@logger.error(failed_tasks[0, 5], print=TRUE)
puts "....."
@logger.error(failed_tasks[failed_tasks.length-5, failed_tasks.length-1], print=TRUE)
else
@logger.error(failed_tasks, print=TRUE)
end
@logger.info("You can retry the operation for those files.", print=TRUE)
puts
end
def deleteCompletedFile() def deleteCompletedFile()
File.delete(@completed_file) if File.exist?(@completed_file) File.delete(@completed_file) if File.exist?(@completed_file)
end end
...@@ -144,19 +238,51 @@ class DatasetUtils ...@@ -144,19 +238,51 @@ class DatasetUtils
return File.exist?(@initial_ingestion_file) return File.exist?(@initial_ingestion_file)
end end
def stagedFileExist()
File.exist?(@staged_file) or File.zero?(@staged_file)
end
def appendSlashTo(element)
return element.end_with?("/") ? element : element + "/"
end
def referenceToPath(reference, data_set_directory, dataset)
data_set = appendSlashTo(dataset)
file_path = data_set_directory + reference.reverse.sub(REFERENCE_SEPARATOR.reverse, ".".reverse).reverse.sub(data_set, "")
file_path = file_path[0...-5] if file_path.end_with?(".#{NONE_EXT}")
return file_path
end
def getPathInfo(path, dataset)
extension = File.extname path
if path.start_with?(@data_set_directory)
filename = path.sub(@data_set_directory, "")
filename = filename.reverse.sub(extension.reverse, "").reverse
end
extension.gsub! '.', ''
extension = extension == "" ? NONE_EXT : extension
reference = path.sub(@data_set_directory, appendSlashTo(dataset))
if extension != NONE_EXT
old_pattern = filename + '.' + extension
new_pattern = filename + REFERENCE_SEPARATOR + extension
reference = reference.reverse.sub(old_pattern.reverse, new_pattern.reverse).reverse
else
reference += REFERENCE_SEPARATOR + extension
end
return filename, extension, reference
end
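For clarity, a minimal sketch of the mapping that getPathInfo and referenceToPath implement (the directory and file names are invented for the example):

# assuming @data_set_directory = "/data/my_dataset/" and dataset = "my_dataset"
# getPathInfo:   /data/my_dataset/logs/day1.csv  ->  reference "my_dataset/logs/day1/csv"
# getPathInfo:   /data/my_dataset/README         ->  reference "my_dataset/README/none"   (files without an extension get NONE_EXT)
# referenceToPath reverses the mapping: the last REFERENCE_SEPARATOR becomes "." again and a trailing ".none" is stripped.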
def addToReport(reference, status, size, hash, data_set) def addToReport(reference, status, size, hash, data_set)
local_files = {} local_files = {}
begin begin
data_set = data_set.end_with?("/") ? data_set : data_set + "/" file_path = referenceToPath(reference, @data_set_directory, data_set)
file_path = @data_set_directory + reference.reverse.sub("/".reverse, ".".reverse).reverse.sub(data_set, "")
file_path = file_path[0...-5] if file_path.end_with?(".none")
modification_date = File.exist?(file_path) ? File.mtime(file_path).strftime("%Y-%m-%d-%H-%M-%S") : "not-modification-date" modification_date = File.exist?(file_path) ? File.mtime(file_path).strftime("%Y-%m-%d-%H-%M-%S") : "not-modification-date"
if not reportFileExist() if not reportFileExist()
File.open(@task_report_file, 'w') {} File.open(@task_report_file, 'w') {}
end end
new_file = TRUE new_file = TRUE
File.readlines(@task_report_file).each do |line| File.readlines(@task_report_file).each do |line|
record = line.split(";") record = line.split(RECORD_SEPARATOR)
if reference.to_s == record[0].to_s if reference.to_s == record[0].to_s
local_files[reference] = {"size" => size, "hash" => hash, "status" => status, "modification_date" => modification_date } local_files[reference] = {"size" => size, "hash" => hash, "status" => status, "modification_date" => modification_date }
new_file = FALSE new_file = FALSE
...@@ -196,48 +322,144 @@ class DatasetUtils ...@@ -196,48 +322,144 @@ class DatasetUtils
end end
end end
def getLocalConflicts(remote_streams, data_set)
paths = [appendSlashTo(@data_set_directory)]
local_files = getLocalPaths(paths)
ignore, local_changes = getLocalChanges(local_files, data_set, staged=FALSE)
remote_changes = remote_streams.map { |remote|
remote = referenceToPath(remote["reference"], @data_set_directory, data_set)
}
return local_changes.select{ |conflict| remote_changes.include? conflict["path"] }.map{ |conflict| conflict["path"] }
end
def getRemoteConflicts(data_streams, tasks, dataset)
conflicts = []
files = tasks.map{ |task| task["path"] }
data_streams.each do |data_stream|
file_path = referenceToPath(data_stream["reference"], @data_set_directory, dataset)
if files.include? file_path
conflicts.push(file_path.sub(@data_set_directory, "./"))
end
end
return conflicts
end
def deleteStagedFile()
File.delete(@staged_file) if File.exist?(@staged_file)
end
def getStagedRecords()
# build a dictionary because the staged file can contain more than one operation per file
staged_dict = {}
if stagedFileExist()
File.readlines(@staged_file).each_with_index do |line, index|
record = line.split(RECORD_SEPARATOR)
path = @data_set_directory + record[1].chomp
operation = record[0].chomp
staged_dict[appendSlashTo(path)] = {"index" => index, "operation" => operation}
end
end
return staged_dict
end
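A short sketch of the records this method builds (paths invented for the example); since a path may appear several times in .staged, isStaged() below resolves it with the entry that has the highest index:

# ./.staged written by the ebulk wrapper, one "operation;path" record per line:
#   add;./raw/sample1.csv
#   remove;./docs/
#   reset;./raw/sample1.csv
# resulting staged_dict (keys get a trailing slash via appendSlashTo):
#   "<dataset dir>/raw/sample1.csv/" => {"index" => 2, "operation" => "reset"}
#   "<dataset dir>/docs/"            => {"index" => 1, "operation" => "remove"}
# The later "reset" overrides the earlier "add", so sample1.csv ends up unstaged,
# while ./docs/ (and any file under it) stays staged for removal.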
def updateStagedDeletions(untracked_deletions)
untracked_deletions.each do |path|
path = path.sub(@data_set_directory, "")
File.open(@staged_file, 'ab') { |file| file.puts(STAGE_RESET+RECORD_SEPARATOR+path) }
end
end
def isStaged(path, staged_dict, status)
return FALSE if staged_dict.nil? || staged_dict.empty?
staged_status = {"index" => -1, "status" => ""}
original_file_path = path
while appendSlashTo(path) != @data_set_directory
key = appendSlashTo(path) if staged_dict.key?(appendSlashTo(path))
if ! key.nil?
if staged_dict[key]["index"] > staged_status["index"]
# ignore files marked as added that do not exist in the filesystem
if not (staged_dict[key]["operation"] == STAGE_ADD and not File.exist?(original_file_path))
staged_status = {"index" => staged_dict[key]["index"], "status" => staged_dict[key]["operation"]}
end
end
end
path = appendSlashTo(path).split("/")[0...-1].join("/")
end
return FALSE if staged_status["index"] == -1
return staged_status["status"] == status
end
def getLocalChanges(files, data_set, staged, partial_ingestion=FALSE)
staged_changes, untracked_changes = [], []
staged_dict = getStagedRecords() if staged
begin
if reportFileExist()
File.readlines(@task_report_file).each do |line|
record = line.split(RECORD_SEPARATOR)
if record[1].chomp == RUN_DONE
file_path = referenceToPath(record[0], @data_set_directory, data_set)
if files.include? file_path
modification_date = File.mtime(file_path).strftime("%Y-%m-%d-%H-%M-%S")
if staged && isStaged(file_path, staged_dict, STAGE_REMOVE)
staged_changes.push({"path" => file_path, "size" => "", "hash" => DELETE, "status" => STATUS_DELETED })
elsif modification_date != record[4].chomp
size = File.size(file_path).to_s
hash = getHash(file_path).to_s
change = {"path" => file_path, "size" => size, "hash" => hash, "status" => STATUS_MODIFIED }
if size == record[2].to_s
if hash != record[3].chomp
staged && isStaged(file_path, staged_dict, STAGE_ADD) ? staged_changes.push(change) : untracked_changes.push(change)
end
else
staged && isStaged(file_path, staged_dict, STAGE_ADD) ? staged_changes.push(change) : untracked_changes.push(change)
end
end
files.delete(file_path)
else
if not partial_ingestion
change = {"path" => file_path, "size" => "", "hash" => DELETE, "status" => STATUS_DELETED }
staged && isStaged(file_path, staged_dict, STAGE_REMOVE) ? staged_changes.push(change) : untracked_changes.push(change)
end
end
end
end
end
untracked_deletions = []
files.each do |path|
change = {"path" => path, "size" => "", "hash" => "", "status" => STATUS_NEW }
if staged
# if an untracked new file was staged as deletion, delete it
if isStaged(path, staged_dict, STAGE_REMOVE)
if File.exist?(path)
# check scenario where new files were created inside a directory staged as deleted
if File.mtime(path) > File.mtime(@staged_file)
untracked_changes.push(change)
else
File.delete(path)
@logger.info("Local file staged for removal was deleted: " + path, print=TRUE)
end
end
untracked_deletions.push(path)
else
isStaged(path, staged_dict, STAGE_ADD) ? staged_changes.push(change) : untracked_changes.push(change)
end
else
untracked_changes.push(change)
end
end
updateStagedDeletions(untracked_deletions)
rescue Exception => e
@logger.error("An error occurred in DatasetUtils method 'getLocalChanges':" + e.to_s)
@logger.error(e.backtrace)
end
return staged_changes, untracked_changes
end
def dirEmpty(dir)
Dir[(dir + '/**/*').gsub! '//', '/'].each do |path|
return FALSE if File.file?(path)
end
return TRUE
end end
def getRemoteChangedDataStreams(data_streams) def getRemoteChangedDataStreams(data_streams)
...@@ -247,7 +469,7 @@ class DatasetUtils ...@@ -247,7 +469,7 @@ class DatasetUtils
local_files = {} local_files = {}
remote_files = [] remote_files = []
File.readlines(@task_report_file).each do |line| File.readlines(@task_report_file).each do |line|
record = line.split(";") record = line.split(RECORD_SEPARATOR)
if record[1].chomp == RUN_DONE if record[1].chomp == RUN_DONE
local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp, } local_files[record[0]] = {"size" => record[2].chomp, "hash" => record[3].chomp, }
end end
......
...@@ -10,13 +10,6 @@ module Embulk ...@@ -10,13 +10,6 @@ module Embulk
Plugin.register_input("fif", self) Plugin.register_input("fif", self)
NEW = "New"
MODIFIED = "Modified"
DELETED = "Deleted"
EOF = "EOF"
CHUNK_SIZE = 50000000 #50mb
MEGA = 1000000
SCHEMA = [ SCHEMA = [
{"name"=>"supplier", "type"=>"string"}, {"name"=>"supplier", "type"=>"string"},
{"name"=>"data_set", "type"=>"string"}, {"name"=>"data_set", "type"=>"string"},
...@@ -28,19 +21,56 @@ module Embulk ...@@ -28,19 +21,56 @@ module Embulk
{"name"=>"hash", "type"=>"string"} {"name"=>"hash", "type"=>"string"}
] ]
def self.status(task, push=FALSE)
partial_ingestion = @dataset_utils.initialIngestionFileExist()
staged_changes, untracked_changes = @dataset_utils.getLocalChanges(task['paths'], task['data_set'], staged=TRUE, partial_ingestion=partial_ingestion)
staged = (not staged_changes.empty?)
task['paths'] = staged ? staged_changes : untracked_changes
if task['paths'].empty?
puts
@logger.info("No changes in dataset directory for dataset '#{@data_set}'.", print=TRUE)
puts
@logger.abortExecution(error=FALSE)
end
status = partial_ingestion ? DatasetUtils::OUTPUT_ADD : ""
changes = partial_ingestion ? "file" : "change"
n_changes = push ? task['paths'].length : staged_changes.length + untracked_changes.length
@logger.info("#{n_changes} #{changes}(s) detected: ", print=TRUE)
puts
print_short = staged ? staged_changes.length + untracked_changes.length > 500 : task['paths'].length > 500
message = staged ? "#{changes}(s) added for ingestion:" : "Files:"
@dataset_utils.showChangesList(task['paths'], message, print_short, status)
puts
if staged and not push and not untracked_changes.empty?
@dataset_utils.showChangesList(untracked_changes, "Untracked changes:", print_short, status)
puts
end
if not untracked_changes.empty? and not staged
@logger.info("Remember that you can add specific files/changes for ingestion by running 'ebulk add/remove <PATH>'", print=TRUE)
puts
end
if push and partial_ingestion and not task['data_streams'].empty?
self.warnOverwrite(task)
end
end
def self.warnOverwrite(task)
@logger.info("Checking possible conflicts...", print=TRUE)
conflicts = @dataset_utils.getRemoteConflicts(task['data_streams'], task['paths'], task['data_set'])
if not conflicts.empty?
puts
@logger.warn("Some of your local files already exist in the remote dataset.", print=TRUE)
@logger.warn("You may want to cancel this partial ingestion and download the full dataset to make local changes.", print=TRUE)
puts
@logger.warn("Current ingestion WILL OVERWRITE the following files in the remote dataset:", print=TRUE)
puts "** press key **"
option = gets
print_short = conflicts.length > 500
@dataset_utils.showChangesList(conflicts, "", print_short, status=DatasetUtils::OVERWRITE)
else
@logger.info("All local files are new.", print=TRUE)
end
puts
end
def self.transaction(config, &control) def self.transaction(config, &control)
...@@ -51,25 +81,48 @@ module Embulk ...@@ -51,25 +81,48 @@ module Embulk
task = { 'paths' => [] } task = { 'paths' => [] }
task['supplier'] = config.param('supplier', :string) task['supplier'] = config.param('supplier', :string)
task['data_set'] = config.param('data_set', :string) task['data_set'] = config.param('data_set', :string)
task['chunk_size'] = config.param('chunk_size', :float, default: 0) * MEGA task['chunk_size'] = config.param('chunk_size', :float, default: 0) * DatasetUtils::MEGA
if task['chunk_size'] == 0 if task['chunk_size'] == 0
task['chunk_size'] = CHUNK_SIZE task['chunk_size'] = DatasetUtils::CHUNK_SIZE
end end
@data_set = task['data_set'] @data_set = task['data_set']
@dataset_utils = DatasetUtils.new("")
paths = config.param('path_prefix', :array) paths = config.param('path_prefix', :array)
paths[0] = paths[0].end_with?("/") ? paths[0] : paths[0] + "/" paths[0] = @dataset_utils.appendSlashTo(paths[0])
@data_set_directory = paths[0] task['data_set_directory'] = paths[0]
task['inputs'] = paths @data_set_directory = task['data_set_directory']
@logger.info("Getting local files for ingestion...", print=TRUE) @dataset_utils = DatasetUtils.new(@data_set_directory)
task['paths'] = paths.map {|path| @status = config.param('status', :string, default: FALSE)
next [] unless Dir.exist?(path) @status = @status == "" ? FALSE : @status
Dir[(path + '/**/*').gsub! '//', '/'] if @status
}.flatten.select{ |file| File.file?(file) } if not @dataset_utils.initialIngestionFileExist()
if not @dataset_utils.reportFileExist()
puts
@logger.error("The dataset directory does not contain a valid dataset.", print=TRUE)
@logger.abortExecution()
elsif not @dataset_utils.completedFileExist()
puts
@logger.error("There is an interrumped download operation in dataset directory. Please resume the download first.", print=TRUE)
@logger.abortExecution()
end
end
end
@logger.info("Checking local files...", print=TRUE)
task['paths'] = @dataset_utils.getLocalPaths(paths)
if @status
self.status(task)
@logger.abortExecution(error=FALSE)
end
@wendelin = WendelinClient.new(config.param('erp5_url', :string), config.param('user', :string), config.param('password', :string)) @wendelin = WendelinClient.new(config.param('erp5_url', :string), config.param('user', :string), config.param('password', :string))
@logger.info("Checking remote dataset...", print=TRUE) @logger.info("Checking remote dataset...", print=TRUE)
data_stream_dict = @wendelin.getDataStreams(task['data_set']) data_stream_dict = @wendelin.getDataStreams(task['data_set'])
@dataset_utils = DatasetUtils.new(@data_set_directory) if data_stream_dict["status_code"] != 0
@logger.error(data_stream_dict["error_message"], print=TRUE)
@logger.abortExecution()
end
task['data_streams'] = data_stream_dict["result"]
if not @dataset_utils.reportFileExist() if not @dataset_utils.reportFileExist()
@dataset_utils.createInitialIngestionFile() @dataset_utils.createInitialIngestionFile()
else else
...@@ -83,39 +136,18 @@ module Embulk ...@@ -83,39 +136,18 @@ module Embulk
end end
end end
end end
if data_stream_dict["status_code"] != 0
@logger.error(data_stream_dict["error_message"], print=TRUE)
@logger.abortExecution()
end
@logger.info("Supplier: #{task['supplier']}") @logger.info("Supplier: #{task['supplier']}")
@logger.info("Dataset name: #{task['data_set']}") @logger.info("Dataset name: #{task['data_set']}")
@logger.info("Chunk size set in #{task['chunk_size']/MEGA}MB") @logger.info("Chunk size set in #{task['chunk_size']/DatasetUtils::MEGA}MB")
if task['data_set'] == '$DATA_SET'
@logger.error("There was an error setting the configuration file", print=TRUE)
@logger.info("Please try manual ingestion or update manually the ingestion configuration file.", print=TRUE)
@logger.abortExecution()
end
if task['paths'].empty? and not @dataset_utils.reportFileExist() if task['paths'].empty? and not @dataset_utils.reportFileExist()
@logger.error("The dataset directory '#{task['inputs'][0]}' is empty.", print=TRUE) @logger.error("The dataset directory '#{@data_set_directory}' is empty.", print=TRUE)
@logger.error("Could not find any valid file.", print=TRUE) @logger.error("Could not find any valid file.", print=TRUE)
@logger.error("Please make sure your dataset directory contains files for ingestion.", print=TRUE) @logger.error("Please make sure your dataset directory contains files for ingestion.", print=TRUE)
@logger.abortExecution() @logger.abortExecution()
end end
task['paths'], new_files, modified_files, deleted_files = @dataset_utils.getLocalChanges(task['paths'], task['data_set']) self.status(task, push=TRUE)
if task['paths'].empty?
puts
@logger.info("No changes in '#{@data_set_directory}'. Everything up-to-date.", print=TRUE)
@logger.abortExecution(error=FALSE)
end
changes = @dataset_utils.reportFileExist() ? "change" : "new file"
@logger.info("#{task['paths'].length} #{changes}(s) detected for ingestion: ", print=TRUE)
print_short = task['paths'].length > 500
self.showChangesList(new_files, NEW, print_short)
self.showChangesList(modified_files, MODIFIED, print_short)
self.showChangesList(deleted_files, DELETED, print_short)
puts
@logger.info("Continue with ingestion? (y/n)", print=TRUE) @logger.info("Continue with ingestion? (y/n)", print=TRUE)
option = gets option = gets
option = option.chomp option = option.chomp
...@@ -153,38 +185,20 @@ module Embulk ...@@ -153,38 +185,20 @@ module Embulk
def self.resume(task, columns, count, &control) def self.resume(task, columns, count, &control)
@logger = LogManager.instance() @logger = LogManager.instance()
task_reports = yield(task, columns, count) task_reports = yield(task, columns, count)
@logger.info("Reports:", print=TRUE) @dataset_utils.showTaskReport(task_reports)
if task_reports.length > 15
@logger.info(task_reports[0, 5], print=TRUE)
@logger.info(".....", print=TRUE)
@logger.info(task_reports[task_reports.length-5, task_reports.length-1], print=TRUE)
@logger.info("Full task report:")
@logger.info(task_reports)
else
@logger.info(task_reports, print=TRUE)
end
next_config_diff = task_reports.map{|hash| hash["done"]}.flatten.compact next_config_diff = task_reports.map{|hash| hash["done"]}.flatten.compact
changes = @dataset_utils.initialIngestionFileExist() ? "new file" : "change" element_output = @dataset_utils.initialIngestionFileExist() ? "new file" : "change"
@logger.info("#{next_config_diff.length} #{changes}(s) ingested.", print=TRUE) @logger.info("#{next_config_diff.length} #{element_output}(s) ingested.", print=TRUE)
if(next_config_diff.length == count) if(next_config_diff.length == count)
@logger.info("Dataset successfully ingested.", print=TRUE) @logger.info("Dataset successfully ingested.", print=TRUE)
@wendelin.increaseDatasetVersion(@data_set) @wendelin.increaseDatasetVersion(@data_set)
@dataset_utils.deleteStagedFile()
else else
next_config_diff = task_reports.map{|hash| hash["error"]}.flatten.compact failed_tasks = task_reports.map{|hash| hash["error"]}.flatten.compact
puts @dataset_utils.showTaskErrors(failed_tasks)
@logger.error("The following files could not be ingested. Please check the details in the log file: " + @logger.getLogPath(), print=TRUE)
if next_config_diff.length > 15
@logger.error(next_config_diff[0, 5], print=TRUE)
@logger.error(".....", print=TRUE)
@logger.error(next_config_diff[next_config_diff.length-5, next_config_diff.length-1], print=TRUE)
else
@logger.error(next_config_diff, print=TRUE)
end
@logger.info("You can retry the ingestion for those pending files.", print=TRUE)
puts
end end
next_config_diff = {} next_config_diff = {}
return {"done" => next_config_diff} return {DatasetUtils::RUN_DONE => next_config_diff}
end end
def initialize(task, schema, index, page_builder) def initialize(task, schema, index, page_builder)
...@@ -192,8 +206,7 @@ module Embulk ...@@ -192,8 +206,7 @@ module Embulk
@supplier = task['supplier'] @supplier = task['supplier']
@dataset = task['data_set'] @dataset = task['data_set']
@chunk_size = task['chunk_size'] @chunk_size = task['chunk_size']
@input_dirs = task['inputs'] @data_set_directory = task['data_set_directory']
@data_set_directory = task['inputs'][0]
@logger = LogManager.instance() @logger = LogManager.instance()
@dataset_utils = DatasetUtils.new(@data_set_directory) @dataset_utils = DatasetUtils.new(@data_set_directory)
end end
...@@ -210,29 +223,14 @@ module Embulk ...@@ -210,29 +223,14 @@ module Embulk
size = File.size(path) size = File.size(path)
hash = @dataset_utils.getHash(path) hash = @dataset_utils.getHash(path)
end end
extension = File.extname path filename, extension, reference = @dataset_utils.getPathInfo(path, @dataset)
if path.start_with?(@input_dirs[0]) @dataset_utils.saveCurrentOperation(DatasetUtils::INGESTION, reference)
filename = path.sub(@input_dirs[0], "")
filename = filename.reverse.sub(extension.reverse, "").reverse
end
extension.gsub! '.', ''
extension = extension == "" ? "none" : extension
dataset_file = file_dict["path"].sub(@data_set_directory, @dataset.end_with?("/") ? @dataset : @dataset + "/")
if extension != "none"
old_pattern = filename + '.' + extension
new_pattern = filename + '/' + extension
dataset_file = dataset_file.reverse.sub(old_pattern.reverse, new_pattern.reverse).reverse
else
dataset_file += "/" + extension
end
@dataset_utils.saveCurrentOperation(DatasetUtils::INGESTION, dataset_file)
each_chunk(path, filename, extension, size, hash, schema[1..-1].map{|elm| elm.name}, @chunk_size, delete) do |entry| each_chunk(path, filename, extension, size, hash, schema[1..-1].map{|elm| elm.name}, @chunk_size, delete) do |entry|
@page_builder.add(entry) @page_builder.add(entry)
end end
@page_builder.finish @page_builder.finish
rescue java.lang.OutOfMemoryError rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(file_dict["path"]) @logger.logOutOfMemoryError(path)
return_value = DatasetUtils::RUN_ABORTED return_value = DatasetUtils::RUN_ABORTED
rescue Exception => e rescue Exception => e
@logger.error("An error occurred during file ingestion: " + e.to_s, print=TRUE) @logger.error("An error occurred during file ingestion: " + e.to_s, print=TRUE)
...@@ -242,28 +240,26 @@ module Embulk ...@@ -242,28 +240,26 @@ module Embulk
else else
return_value = DatasetUtils::RUN_DONE return_value = DatasetUtils::RUN_DONE
end end
# update reports if operation successfully ended
if return_value == DatasetUtils::RUN_DONE if return_value == DatasetUtils::RUN_DONE
if delete if delete
if @dataset_utils.reportFileExist() if @dataset_utils.reportFileExist()
@dataset_utils.deleteFromReport(dataset_file, return_value) @dataset_utils.deleteFromReport(reference, return_value)
end end
else else
if @dataset_utils.reportFileExist() if @dataset_utils.reportFileExist()
@dataset_utils.addToReport(dataset_file, return_value, size, hash, task['data_set']) @dataset_utils.addToReport(reference, return_value, size, hash, task['data_set'])
end end
end end
end end
@dataset_utils.removeCurrentOperation() @dataset_utils.removeCurrentOperation()
return {return_value => file_dict["path"]} return {return_value => path}
end end
private private
def each_chunk(path, filename, extension, size, hash, fields, chunk_size=CHUNK_SIZE, delete=FALSE) def each_chunk(path, filename, extension, size, hash, fields, chunk_size=DatasetUtils::CHUNK_SIZE, delete=FALSE)
if delete if delete
File.delete(path) if File.exist?(path)
values = [@supplier, @dataset, filename, extension, "", DatasetUtils::DELETE, "", ""] values = [@supplier, @dataset, filename, extension, "", DatasetUtils::DELETE, "", ""]
yield(values) yield(values)
else else
...@@ -283,7 +279,7 @@ module Embulk ...@@ -283,7 +279,7 @@ module Embulk
data += file_object.read(chunk_size) data += file_object.read(chunk_size)
next_byte = file_object.read(1) next_byte = file_object.read(1)
if not next_byte if not next_byte
eof = EOF eof = DatasetUtils::EOF
if first # this means that the whole file will be ingested at once (not split) if first # this means that the whole file will be ingested at once (not split)
eof = "" eof = ""
end end
......
...@@ -7,9 +7,6 @@ module Embulk ...@@ -7,9 +7,6 @@ module Embulk
class Wendelininput < InputPlugin class Wendelininput < InputPlugin
CHUNK_SIZE = 50000000 #50mb
MEGA = 1000000
UPDATE = "U" UPDATE = "U"
RESUME = "R" RESUME = "R"
DOWNLOAD = "D" DOWNLOAD = "D"
...@@ -17,34 +14,17 @@ module Embulk ...@@ -17,34 +14,17 @@ module Embulk
Plugin.register_input("wendelin", self) Plugin.register_input("wendelin", self)
def self.warnConflicts(remote_streams, data_set, action) def self.warnConflicts(remote_streams, data_set)
if not remote_streams.empty? if not remote_streams.empty?
paths = [@data_set_directory.end_with?("/") ? @data_set_directory : @data_set_directory + "/"] conflicts = @dataset_utils.getLocalConflicts(remote_streams, data_set)
local_files = paths.map {|path|
next [] unless Dir.exist?(path)
Dir[(path + '/**/*').gsub! '//', '/']
}.flatten.select{ |file| File.file?(file) }
local_changes, a, b, c = @dataset_utils.getLocalChanges(local_files, data_set)
data_set = @data_set.end_with?("/") ? @data_set : @data_set + "/"
remote_changes = remote_streams.map { |remote|
remote = @data_set_directory + remote["reference"].reverse.sub("/".reverse, ".".reverse).reverse.sub(data_set, "")
remote.end_with?(".none") ? remote[0...-5] : remote
}
conflicts = local_changes.select{ |conflict| remote_changes.include? conflict["path"] }.map{ |conflict| conflict["path"] }
# check scenario where the last version file exists but not in report
# (due download interrumped right after save the file but before add it to report)
if action == RESUME and conflicts.length == 1 and File.exist?(conflicts[0])
@logger.warn("The file #{conflicts[0]} was detected as false positive conflict and it was not informed to user.")
conflicts = []
end
if not conflicts.empty? if not conflicts.empty?
@logger.warn("CONFLICT: there are conflicts with some of your local changes.", print=TRUE) @logger.warn("CONFLICT: there are conflicts with some of your local changes.", print=TRUE)
puts "** press key **" puts "** press key **"
option = gets option = gets
@logger.warn("Conflicted files:", print=TRUE) @logger.warn("Conflicted changes:", print=TRUE)
@logger.warn(conflicts, print=TRUE) @logger.warn(conflicts, print=TRUE)
puts puts
@logger.warn("Your local conflicted files will be overwritten by download.", print=TRUE) @logger.warn("Your local conflicted changes will be overwritten by current download.", print=TRUE)
@logger.warn("Do you want to continue? (y/n)", print=TRUE) @logger.warn("Do you want to continue? (y/n)", print=TRUE)
option = gets option = gets
option = option.chomp option = option.chomp
...@@ -52,58 +32,49 @@ module Embulk ...@@ -52,58 +32,49 @@ module Embulk
@logger.info("Download cancelled by user.", print=TRUE) @logger.info("Download cancelled by user.", print=TRUE)
@logger.abortExecution(error=FALSE) @logger.abortExecution(error=FALSE)
end end
@dataset_utils.deleteStagedFile()
end end
end end
end end
def self.askUserForAction(task, action) def self.askUserForAction(task, action)
if action == RESUME if action == RESUME
action_message = "#{RESUME}: Resume. Continues download from last file." action_message = "#{RESUME}: Resume. Continues download from last file."
else else
action = UPDATE action = UPDATE
action_message = "#{UPDATE}: Update. Checks for changes in dataset." action_message = "#{UPDATE}: Update. Checks for changes in dataset."
end
valid_option = FALSE
while not valid_option
@logger.info("Please select an option [#{action}, #{DOWNLOAD}, #{ABORT}]", print=TRUE)
@logger.info(action_message, print=TRUE)
@logger.info("#{DOWNLOAD}: Download. Downloads the dataset from scratch.", print=TRUE)
@logger.info("#{ABORT}: Abort operation.", print=TRUE)
option = gets
option = option.chomp
if not [action, DOWNLOAD, ABORT].include? option
@logger.info("Invalid option", print=TRUE)
else
valid_option = TRUE
end
end
case option
when action
@logger.info("Checking remote changes and posible local conflicts...", print=TRUE) if action != RESUME
task['data_streams'] = @dataset_utils.getRemoteChangedDataStreams(task['data_streams'])
self.warnConflicts(task['data_streams'], task['data_set']) if action != RESUME
@dataset_utils.deleteCompletedFile()
if task['data_streams'].empty?
@logger.info("Your downloaded dataset is already up to date.", print=TRUE)
end end
valid_option = FALSE when DOWNLOAD
while not valid_option @logger.info("Checking remote files and possible local conflicts...", print=TRUE)
@logger.info("Please select an option [#{action}, #{DOWNLOAD}, #{ABORT}]", print=TRUE) self.warnConflicts(task['data_streams'], task['data_set'])
@logger.info(action_message, print=TRUE) @dataset_utils.deleteCompletedFile()
@logger.info("#{DOWNLOAD}: Download. Downloads the dataset from scratch.", print=TRUE) @dataset_utils.createReportFile()
@logger.info("#{ABORT}: Abort operation.", print=TRUE) when ABORT
option = gets @logger.abortExecution()
option = option.chomp end
if not [action, DOWNLOAD, ABORT].include? option
@logger.info("Invalid option", print=TRUE)
else
valid_option = TRUE
end
end
case option
when action
task['data_streams'] = @dataset_utils.getRemoteChangedDataStreams(task['data_streams'])
self.warnConflicts(task['data_streams'], task['data_set'], action)
@dataset_utils.deleteCompletedFile()
if task['data_streams'].empty?
@logger.info("No new files in dataset.", print=TRUE)
@logger.info("Your downloaded dataset is already up to date.", print=TRUE)
end
when DOWNLOAD
ebulk_file = @data_set_directory + "/.ebulk_dataset"
ebulk_file_content = ""
if File.file?(ebulk_file)
ebulk_file_content = File.read(ebulk_file)
end
FileUtils.rm_rf(@data_set_directory)
unless File.directory?(@data_set_directory)
FileUtils.mkdir_p(@data_set_directory)
end
if ebulk_file_content != ""
File.open(ebulk_file, 'w') { |file| file.write(ebulk_file_content) }
end
@dataset_utils.deleteCompletedFile()
@dataset_utils.createReportFile()
when ABORT
@logger.abortExecution()
end
end end
def self.transaction(config, &control) def self.transaction(config, &control)
...@@ -114,18 +85,12 @@ module Embulk ...@@ -114,18 +85,12 @@ module Embulk
@erp5_url = config.param('erp5_url', :string) @erp5_url = config.param('erp5_url', :string)
@data_set = config.param('data_set', :string) @data_set = config.param('data_set', :string)
@logger.info("Dataset name: #{@data_set}") @logger.info("Dataset name: #{@data_set}")
if @data_set == '$DATA_SET'
@logger.error("There was an error setting the configuration file", print=TRUE)
@logger.info("Please try manual download or update manually the download configuration file.", print=TRUE)
@logger.abortExecution()
end
@user = config.param("user", :string, defualt: nil) @user = config.param("user", :string, defualt: nil)
@logger.info("User: #{@user}") @logger.info("User: #{@user}")
@password = config.param("password", :string, default: nil) @password = config.param("password", :string, default: nil)
@chunk_size = config.param('chunk_size', :float, default: 0) * MEGA @chunk_size = config.param('chunk_size', :float, default: 0) * DatasetUtils::MEGA
@output_path = config.param("output_path", :string, :default => nil) @output_path = config.param("output_path", :string, :default => nil)
if File.directory?(@output_path) if not File.directory?(@output_path)
else
@logger.error("Output directory not found.", print=TRUE) @logger.error("Output directory not found.", print=TRUE)
@logger.abortExecution() @logger.abortExecution()
end end
...@@ -141,31 +106,21 @@ module Embulk ...@@ -141,31 +106,21 @@ module Embulk
'tool_dir' => @tool_dir 'tool_dir' => @tool_dir
} }
if task['chunk_size'] == 0 if task['chunk_size'] == 0
task['chunk_size'] = CHUNK_SIZE task['chunk_size'] = DatasetUtils::CHUNK_SIZE
end end
@logger.info("Chunk size set in #{task['chunk_size']/MEGA}MB") @logger.info("Chunk size set in #{task['chunk_size']/DatasetUtils::MEGA}MB")
@data_set_directory = @output_path.end_with?("/") ? @output_path : @output_path + "/" @dataset_utils = DatasetUtils.new("")
task['data_set_directory'] = @data_set_directory task['data_set_directory'] = @dataset_utils.appendSlashTo(@output_path)
@data_set_directory = task['data_set_directory']
@dataset_utils = DatasetUtils.new(@data_set_directory) @dataset_utils = DatasetUtils.new(@data_set_directory)
@logger.info("Getting remote file list from dataset '#{@data_set}'...", print=TRUE) @logger.info("Getting remote file list from dataset '#{@data_set}'...", print=TRUE)
data_stream_list = @wendelin.getDataStreams(@data_set) data_stream_list = @wendelin.getDataStreams(@data_set)
n_retry = 0
while data_stream_list["status_code"] == 2 and n_retry < 6
sleep 10
data_stream_list = @wendelin.getDataStreams(@data_set)
n_retry += 1
end
if data_stream_list["status_code"] == 0 if data_stream_list["status_code"] == 0
if data_stream_list["result"].empty? if data_stream_list["result"].empty?
@logger.error("No valid data found for data set " + @data_set, print=TRUE) @logger.error("No valid data found for data set " + @data_set, print=TRUE)
@logger.abortExecution(error=FALSE) @logger.abortExecution(error=FALSE)
end end
task['data_streams'] = data_stream_list["result"] task['data_streams'] = data_stream_list["result"]
elsif data_stream_list["status_code"] == 2
@logger.error("Dataset '#{@data_set}' has files recently ingested waiting for processing.", print=TRUE)
@logger.error("Please retry in some minutes.", print=TRUE)
@logger.abortExecution(error=FALSE)
else else
@logger.error(data_stream_list["error_message"], print=TRUE) @logger.error(data_stream_list["error_message"], print=TRUE)
@logger.abortExecution() @logger.abortExecution()
...@@ -189,13 +144,9 @@ module Embulk ...@@ -189,13 +144,9 @@ module Embulk
self.askUserForAction(task, action=UPDATE) self.askUserForAction(task, action=UPDATE)
end end
else else
dir_entries = Dir.entries(@data_set_directory).length if not @dataset_utils.dirEmpty(@data_set_directory)
if File.file?(@data_set_directory+"/.ebulk_dataset")
dir_entries -= 1
end
if dir_entries > 2
puts puts
@logger.info("Dataset download directory is not empty! It will be overwritten: " + @data_set_directory, print=TRUE) @logger.info("Dataset download directory is not empty! Its files could be overwritten: " + @data_set_directory, print=TRUE)
@logger.info("Continue with download? (y/n)", print=TRUE) @logger.info("Continue with download? (y/n)", print=TRUE)
option = gets option = gets
option = option.chomp option = option.chomp
...@@ -203,6 +154,8 @@ module Embulk ...@@ -203,6 +154,8 @@ module Embulk
@logger.info("Download cancelled by user.", print=TRUE) @logger.info("Download cancelled by user.", print=TRUE)
@logger.abortExecution(error=FALSE) @logger.abortExecution(error=FALSE)
end end
@logger.info("Checking remote files and posible local conflicts...", print=TRUE)
self.warnConflicts(task['data_streams'], task['data_set'])
end end
@dataset_utils.createReportFile() @dataset_utils.createReportFile()
end end
...@@ -225,18 +178,7 @@ module Embulk ...@@ -225,18 +178,7 @@ module Embulk
def self.resume(task, columns, count, &control) def self.resume(task, columns, count, &control)
@logger = LogManager.instance() @logger = LogManager.instance()
task_reports = yield(task, columns, count) task_reports = yield(task, columns, count)
if task_reports.any? @dataset_utils.showTaskReport(task_reports)
@logger.info("Reports:", print=TRUE)
if task_reports.length > 15
@logger.info(task_reports[0, 5], print=TRUE)
@logger.info(".....", print=TRUE)
@logger.info(task_reports[task_reports.length-5, task_reports.length-1], print=TRUE)
else
@logger.info(task_reports, print=TRUE)
end
@logger.info("Full task report:")
@logger.info(task_reports)
end
next_config_diff = task_reports.map{|hash| hash[DatasetUtils::RUN_DONE]}.flatten.compact next_config_diff = task_reports.map{|hash| hash[DatasetUtils::RUN_DONE]}.flatten.compact
if(next_config_diff.length == count) if(next_config_diff.length == count)
if(count > 0) if(count > 0)
...@@ -245,10 +187,15 @@ module Embulk ...@@ -245,10 +187,15 @@ module Embulk
@logger.info("Dataset files are in dataset directory: " + @data_set_directory, print=TRUE) @logger.info("Dataset files are in dataset directory: " + @data_set_directory, print=TRUE)
end end
@dataset_utils.createCompletedFile() @dataset_utils.createCompletedFile()
if count > 10 else
next_config_diff = {} if(count > 0)
puts
@logger.error("Some remote files could not be downloaded. Please check the details in the log file: " + @logger.getLogPath(), print=TRUE)
@logger.info("Please retry the operation for download those files.", print=TRUE)
puts
end end
end end
next_config_diff = {}
return {DatasetUtils::RUN_DONE => next_config_diff} return {DatasetUtils::RUN_DONE => next_config_diff}
end end
...@@ -296,7 +243,6 @@ module Embulk ...@@ -296,7 +243,6 @@ module Embulk
else else
return_value = DatasetUtils::RUN_DONE return_value = DatasetUtils::RUN_DONE
end end
# update reports if operation successfully ended
if return_value == DatasetUtils::RUN_DONE if return_value == DatasetUtils::RUN_DONE
if hash.to_s == DatasetUtils::DELETE if hash.to_s == DatasetUtils::DELETE
@dataset_utils.deleteFromReport(ref, return_value) @dataset_utils.deleteFromReport(ref, return_value)
......
...@@ -35,12 +35,9 @@ module Embulk ...@@ -35,12 +35,9 @@ module Embulk
page.each do |record| page.each do |record|
reference = record[0] reference = record[0]
data_chunk = Base64.decode64(record[1]) data_chunk = Base64.decode64(record[1])
data_set_directory = @output_path.end_with?("/") ? @output_path : @output_path + "/" @dataset_utils = DatasetUtils.new("")
ref = reference.reverse.sub("/".reverse, ".".reverse).reverse.sub(record[2]+"/", "") data_set_directory = @dataset_utils.appendSlashTo(@output_path)
if ref.end_with?(".none") file_path = @dataset_utils.referenceToPath(reference, data_set_directory, record[2])
ref = ref[0...-5]
end
file_path = data_set_directory + ref
write_mode = 'ab' write_mode = 'ab'
if record[3] == DatasetUtils::DELETE if record[3] == DatasetUtils::DELETE
File.delete(file_path) if File.exist?(file_path) File.delete(file_path) if File.exist?(file_path)
...@@ -48,7 +45,7 @@ module Embulk ...@@ -48,7 +45,7 @@ module Embulk
if record[3] == TRUE.to_s if record[3] == TRUE.to_s
write_mode = 'w' write_mode = 'w'
end end
dirname = File.dirname(data_set_directory + ref) dirname = File.dirname(file_path)
unless File.directory?(dirname) unless File.directory?(dirname)
FileUtils.mkdir_p(dirname) FileUtils.mkdir_p(dirname)
end end
......
...@@ -46,11 +46,12 @@ module Embulk ...@@ -46,11 +46,12 @@ module Embulk
hash = record[7] hash = record[7]
begin begin
if eof == DatasetUtils::DELETE if eof == DatasetUtils::DELETE
reference = [dataset, filename, extension].join("/") reference = [dataset, filename, extension].join(DatasetUtils::REFERENCE_SEPARATOR)
@wendelin.delete(reference) @wendelin.delete(reference)
else else
reference = [supplier, dataset, filename, extension, eof, size, hash].join("/") reference = [supplier, dataset, filename, extension, eof, size, hash].join(DatasetUtils::REFERENCE_SEPARATOR)
if not @wendelin.ingest(reference, data_chunk) split = eof != ""
if not @wendelin.ingest(reference, data_chunk, split)
raise "could not ingest" raise "could not ingest"
end end
end end
......
require_relative '../filelogger' require_relative '../filelogger'
require_relative '../dataset_utils'
class Index class Index
include Singleton include Singleton
...@@ -19,21 +20,20 @@ module Embulk ...@@ -19,21 +20,20 @@ module Embulk
class BinaryParserPlugin < ParserPlugin class BinaryParserPlugin < ParserPlugin
Plugin.register_parser("binary", self) Plugin.register_parser("binary", self)
CHUNK_SIZE = 50
MEGA = 1000000
EOF = "EOF"
def self.transaction(config, &control) def self.transaction(config, &control)
tool_dir = config.param('tool_dir', :string, default: ".") tool_dir = config.param('tool_dir', :string, default: ".")
@logger = LogManager.instance() @logger = LogManager.instance()
@logger.setFilename(tool_dir, "parser") @logger.setFilename(tool_dir, "parser")
task = { task = {
chunk_size: config.param('chunk_size', :float, default: CHUNK_SIZE) * MEGA, chunk_size: config.param('chunk_size', :float, default: 0) * DatasetUtils::MEGA,
supplier: config.param("supplier", :string, default: "parser"), supplier: config.param("supplier", :string, default: "parser"),
data_set: config.param("data_set", :string), data_set: config.param("data_set", :string),
input_plugin: config.param("storage", :string, default: "parser"), input_plugin: config.param("storage", :string, default: "parser"),
date: Time.now.strftime("%Y-%m-%d_%H-%M-%S") date: Time.now.strftime("%Y-%m-%d_%H-%M-%S")
} }
if task['chunk_size'] == 0
task['chunk_size'] = DatasetUtils::CHUNK_SIZE
end
columns = [ columns = [
Column.new(0, "supplier", :string), Column.new(0, "supplier", :string),
Column.new(1, "data_set", :string), Column.new(1, "data_set", :string),
...@@ -71,7 +71,7 @@ module Embulk ...@@ -71,7 +71,7 @@ module Embulk
end end
private private
def each_chunk(file, filename, chunk_size=CHUNK_SIZE) def each_chunk(file, filename, chunk_size=DatasetUtils::CHUNK_SIZE)
extension = @index.to_s.rjust(3, "0") extension = @index.to_s.rjust(3, "0")
npart = 0 npart = 0
next_byte = file.read(1) next_byte = file.read(1)
...@@ -89,7 +89,7 @@ module Embulk ...@@ -89,7 +89,7 @@ module Embulk
data += file.read(chunk_size) data += file.read(chunk_size)
next_byte = file.read(1) next_byte = file.read(1)
if not next_byte if not next_byte
eof = EOF eof = DatasetUtils::EOF
if first if first
# this means that the whole file will be ingested at once (not split) # this means that the whole file will be ingested at once (not split)
eof = "" eof = ""
......
...@@ -23,6 +23,9 @@ class WendelinClient ...@@ -23,6 +23,9 @@ class WendelinClient
rescue Exception => e rescue Exception => e
@logger.error("An error occurred while checking if reference exists: " + e.to_s) @logger.error("An error occurred while checking if reference exists: " + e.to_s)
@logger.error(e.backtrace) @logger.error(e.backtrace)
if e.to_s.include? "Unauthorized" or e.to_s.include? "401"
raise e
end
return FALSE return FALSE
else else
return res.to_s == 'TRUE' return res.to_s == 'TRUE'
...@@ -53,27 +56,27 @@ class WendelinClient ...@@ -53,27 +56,27 @@ class WendelinClient
end end
end end
def ingest(reference, data_chunk, split)
@logger.info("Ingestion reference: #{reference}", print=TRUE)
if split and Time.new - @last_ingestion < 3
# avoid sending split ingestions too close together
sleep 3
end
if exists(reference)
@logger.info("There is another ingestion already done for the pair dataset-filename. Reference "\
+ reference, print=TRUE)
@logger.info("Rename your file or download the full dataset to make local changes.", print=TRUE)
return FALSE
end
if reference.include? "#" or reference.include? "+"
raise "invalid chars in file name. Please rename it."
end
begin
uri = URI("#{@erp5_url}/ingest?reference=#{reference}")
rescue Exception => e
@logger.error("An error occurred while generating url: " + e.to_s)
@logger.error(e.backtrace)
raise "invalid chars in file name. Please rename it."
end
response = handleRequest(uri, reference, data_chunk)
if response == FALSE
...@@ -138,7 +141,7 @@ class WendelinClient ...@@ -138,7 +141,7 @@ class WendelinClient
res = Net::HTTP.start(uri.hostname, uri.port, res = Net::HTTP.start(uri.hostname, uri.port,
:use_ssl => (uri.scheme == 'https'), :use_ssl => (uri.scheme == 'https'),
:verify_mode => OpenSSL::SSL::VERIFY_NONE, :verify_mode => OpenSSL::SSL::VERIFY_NONE,
:ssl_timeout => 20, :open_timeout => 20, :read_timeout => 20, :ssl_timeout => 300, :open_timeout => 300, :read_timeout => 300,
) do |http| ) do |http|
http.request(req) http.request(req)
end end
......