Commit fe468465 authored by Ivan Tyagov's avatar Ivan Tyagov

ebulk new release (1.1.7)

See merge request !4
parents 4b9ac6c0 7cfc2018
......@@ -6,7 +6,7 @@ DOWN_URL="$DEFAULT_DATA_LAKE_URL"
ING_POLICY="portal_ingestion_policies/default_ebulk"
ING_URL="$DEFAULT_DATA_LAKE_URL$ING_POLICY"
EBULK_VERSION="1.1.6"
EBULK_VERSION="1.1.7"
EMBULK_VERSION="0.9.7"
EBULK_DATA_PATH=~/.ebulk
EBULK_DATASET_FILE_NAME="/.ebulk_dataset"
......@@ -37,6 +37,7 @@ GREEN='\033[0;32m'
ORANGE='\033[0;33m'
NC='\033[0m'
DEFAULT_CHUNK_SIZE="50"
DEFAULT_BATCH_SIZE="1000"
STAGE_ADD="add"
STAGE_REMOVE="remove"
STAGE_RESET="reset"
......@@ -92,18 +93,20 @@ function checkParameters {
fi
fi
EBULK_DATASET_FILE="$DATASET_DIR$EBULK_DATASET_FILE_NAME"
if ! [[ $REFERENCE =~ $re ]] ; then
if [ "$REFERENCE" = "." ] && [[ -z "$STORAGE" ]] ; then
echo
echo -e "${ORANGE}[ERROR] You are not in a dataset directory ${GREEN}'$REFERENCE'${ORANGE}.${NC}"
echo
else
echo
echo -e "${ORANGE}[ERROR] Error in argument: invalid dataset name ${GREEN}'$REFERENCE'${ORANGE}.${NC}"
echo -e "${ORANGE}[ERROR] Only alphanumerics, dots ( . ), underscores ( _ ) and hyphens ( - ) are allowed.${NC}"
echo
if [ "$ALWAYS_YES" = "true" ] ; then
if ! [[ $REFERENCE =~ $re ]] ; then
if [ "$REFERENCE" = "." ] && [[ -z "$STORAGE" ]] ; then
echo
echo -e "${ORANGE}[ERROR] You are not in a dataset directory ${GREEN}'$REFERENCE'${ORANGE}.${NC}"
echo
else
echo
echo -e "${ORANGE}[ERROR] Error in argument: invalid dataset name ${GREEN}'$REFERENCE'${ORANGE}.${NC}"
echo -e "${ORANGE}[ERROR] Only alphanumerics, dots ( . ), underscores ( _ ) and hyphens ( - ) are allowed.${NC}"
echo
fi
helpReadme >&2; return 1
fi
helpReadme >&2; return 1
fi
if [[ $DATASET_DIR != $REFERENCE ]]; then
if [ "$REFERENCE" = "." ] ; then
......@@ -162,6 +165,15 @@ function checkParameters {
helpReadme >&2; return 1
fi
fi
if [ ! -z "$BATCH_SIZE" ]; then
re='^[0-9]+$'
if ! [[ $BATCH_SIZE =~ $re ]] ; then
echo
echo -e "${ORANGE}[ERROR] Error in argument: batch size must be an integer.${NC}"
echo
helpReadme >&2; return 1
fi
fi
}
function configure {
......@@ -253,6 +265,21 @@ function defaultDataLakeUrl {
echo "[INFO] Data-lake url set to default '$DEFAULT_DATA_LAKE_URL'"
}
function displayInfo {
echo "[INFO] Current ebulk data-lake url: $DOWN_URL"
if [ -f "$CREDENTIALS_FILE" ]; then
STORED_CRED=$(cat "$CREDENTIALS_FILE" 2>/dev/null)
if [[ "$STORED_CRED" != "" ]]; then
IFS=';'
read -ra CRED_ARRAY <<< "$STORED_CRED"
echo "[INFO] Credentials stored for user: ${CRED_ARRAY[0]}"
IFS=' '
exit
fi
fi
echo "[INFO] No stored credentials."
}
function updateConfigFile {
if [ "$STORAGE" != "" ] ; then
echo
......@@ -289,6 +316,7 @@ function updateConfigFile {
DATA_SET=\"$DATA_SET\"
USER=\"$USER\"
CHUNK=\"$CHUNK\"
BATCH_SIZE=\"$BATCH_SIZE\"
DATASET_DIR=\"$DATASET_DIR\"
DOWN_URL=\"$DOWN_URL\"
ING_URL=\"$ING_URL\"
......@@ -333,6 +361,11 @@ function runProcess {
else
echo "[INFO] Chunk size set in $CHUNK MB."
fi
if [ "$BATCH_SIZE" -eq "0" ]; then
echo "[INFO] Default batch size: $DEFAULT_BATCH_SIZE MB."
else
echo "[INFO] Batch size set in $BATCH_SIZE files."
fi
fi
if [ "$DATASET_DESCRIPTION" != "" ] ; then
echo
......@@ -637,12 +670,18 @@ while [ "$1" != "" ]; do
-c | --chunk ) shift
CHUNK=$1
;;
-bs | --batch-size ) shift
BATCH_SIZE=$1
;;
-h | --help ) cat $TOOL_PATH/help.md
exit
;;
-v | --version ) echo "ebulk $EBULK_VERSION"
exit
;;
-i | --info ) displayInfo
exit
;;
-e | --examples ) cat $TOOL_PATH/example.md
exit
;;
......@@ -678,7 +717,7 @@ while [ "$1" != "" ]; do
shift
done
for ELEMENT in '' '-d' '--directory' '-s' '--storage' '-cs' '--custom-storage' '-a' '--advanced' '-c' '--chunk' '-dc' '--discard-changes' '-dd' '--set-description'; do
for ELEMENT in '' '-d' '--directory' '-s' '--storage' '-cs' '--custom-storage' '-a' '--advanced' '-c' '--chunk' '-bs' '--batch-size' '-y' '--yes' '-dc' '--discard-changes' '-dd' '--set-description'; do
if [ "$ELEMENT" = "$REFERENCE" ]; then
REFERENCE="."
fi
......@@ -696,6 +735,9 @@ fi
if [[ $CHUNK = "" ]]; then
CHUNK=$DEFAULT_CHUNK_SIZE
fi
if [[ $BATCH_SIZE = "" ]]; then
BATCH_SIZE=$DEFAULT_BATCH_SIZE
fi
case $OPERATION in
add)
......
......@@ -6,6 +6,7 @@ in:
erp5_url: $DOWN_URL
data_set: $DATA_SET
chunk_size: $CHUNK
batch_size: $BATCH_SIZE
output_path: $DATASET_DIR
tool_dir: $TOOL_DIR
always_yes: $ALWAYS_YES
......
......@@ -125,7 +125,12 @@ module Embulk
@logger.abortExecution(error=FALSE)
end
@logger.info("Checking remote dataset...", print=TRUE)
data_stream_dict = @wendelin.getDataStreams(task['data_set'])
data_stream_count = @wendelin.getDataStreamCount(@data_set)
if data_stream_count["status_code"] != 0
@logger.error(data_stream_count["error_message"], print=TRUE)
@logger.abortExecution()
end
data_stream_dict = @wendelin.getDataStreamList(@data_set, data_stream_count["result"], 1000)
if data_stream_dict["status_code"] != 0
@logger.error(data_stream_dict["error_message"], print=TRUE)
@logger.abortExecution()
......
......@@ -101,6 +101,7 @@ module Embulk
@logger.setFilename(@tool_dir, "download")
@erp5_url = config.param('erp5_url', :string)
@data_set = config.param('data_set', :string)
@batch_size= config.param('batch_size', :integer)
@logger.info("Dataset name: #{@data_set}")
@output_path = config.param("output_path", :string, :default => nil)
if not File.directory?(@output_path)
......@@ -137,12 +138,18 @@ module Embulk
task['user'], task['password'] = @dataset_utils.getCredentials(@tool_dir)
@wendelin = WendelinClient.new(@erp5_url, task['user'], task['password'])
@logger.info("Getting remote file list from dataset '#{@data_set}'...", print=TRUE)
data_stream_list = @wendelin.getDataStreams(@data_set)
if data_stream_list["status_code"] == 0
if data_stream_list["result"].empty?
data_stream_count = @wendelin.getDataStreamCount(@data_set)
if data_stream_count["status_code"] == 0
if data_stream_count["result"] == 0
@logger.error("No valid data found for data set " + @data_set, print=TRUE)
@logger.abortExecution(error=FALSE)
end
else
@logger.error(data_stream_count["error_message"], print=TRUE)
@logger.abortExecution()
end
data_stream_list = @wendelin.getDataStreamList(@data_set, data_stream_count["result"], @batch_size)
if data_stream_list["status_code"] == 0
task['data_streams'] = data_stream_list["result"]
else
@logger.error(data_stream_list["error_message"], print=TRUE)
......
......@@ -184,6 +184,46 @@ class WendelinClient
end
end
def getDataStreamList(data_set_reference, data_stream_count, batch_size)
#keep backward compatibility
if data_stream_count = -1
return getDataStreams(data_set_reference)
end
data_stream_list = []
total_batches = data_stream_count/batch_size.ceil()
if total_batches > 4
puts
@logger.info("Due to its size, the remote file list will be fetch in batches. This could take some minutes.", print=TRUE)
end
nbatch = 0
while nbatch <= total_batches
batch_dict = getDataStreamBatch(data_set_reference, batch_size*nbatch, batch_size)
if batch_dict["status_code"] != 0
return batch_dict
end
data_stream_list += batch_dict["result"]
nbatch += 1
end
return_dict = Hash.new
return_dict["status_code"] = 0
return_dict["result"] = data_stream_list
return return_dict
end
def getDataStreamBatch(data_set_reference, offset, batch_size)
uri = URI(URI.escape("#{@erp5_url}ERP5Site_getDataStreamList?data_set_reference=#{data_set_reference}&offset=#{offset}&batch_size=#{batch_size}"))
response = handleRequest(uri)
if response["success"] == FALSE
@logger.abortExecution()
end
str = response["message"]
if not str.nil?
str.gsub!(/(\,)(\S)/, "\\1 \\2")
return YAML::load(str)
end
return {'status_code': 0, 'result': []}
end
def getDataStreams(data_set_reference)
uri = URI(URI.escape("#{@erp5_url}ERP5Site_getDataStreamList?data_set_reference=#{data_set_reference}"))
response = handleRequest(uri)
......@@ -198,6 +238,26 @@ class WendelinClient
return {'status_code': 0, 'result': []}
end
def getDataStreamCount(data_set_reference)
uri = URI(URI.escape("#{@erp5_url}ERP5Site_getDataStreamCount?data_set_reference=#{data_set_reference}"))
response = handleRequest(uri)
if response["success"] == FALSE
#keep backward compatibiliy if script doesn't exist in wendelin instance
if response["message"] == 404
@logger.info("Continue with backward compatibility...", print=TRUE)
return {"status_code"=>0, "result"=>-1}
else
@logger.abortExecution()
end
end
str = response["message"]
if not str.nil?
str.gsub!(/(\,)(\S)/, "\\1 \\2")
return YAML::load(str)
end
return {"status_code"=>0, "result"=>0}
end
private
def handleRequest(uri, reference=nil, data_chunk=nil)
req = Net::HTTP::Post.new(uri)
......@@ -249,6 +309,8 @@ class WendelinClient
elsif res.code == '400'
@logger.error(HTTP_MESSAGE_400, print=TRUE)
@logger.abortExecution()
elsif res.code == '404'
return {"success"=>FALSE, "message"=>404}
else
@logger.error(HTTP_MESSAGE_OTHER, print=TRUE)
end
......
......@@ -17,6 +17,7 @@ commands:
-r, --readme Opens README file
-e, --examples Shows some tool usage examples
-v, --version Ebulk tool version
-i, --info Displays information about datalake url, credentials, etc.
store-credentials Stores user and password for automatic authentication
set-data-lake-url Sets the data lake url where to ingest/download
default-data-lake-url Sets the data lake url to default
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment