Commit 477cf068 authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

erp5_wendelin_drone: fix split data ingestion

As Fluentd tail plugin can send data from a single file several packets,
this needs to be handled properly on ingestion side.
parent 077f0e28
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from zExceptions import BadRequest
# Function to remove non-ASCII characters from a string, because I can not be bothered to make to_records with utf8 right now # Function to remove non-ASCII characters from a string, because I can not be bothered to make to_records with utf8 right now
def remove_non_ascii(text): def remove_non_ascii(text):
...@@ -19,7 +20,7 @@ if end is None: ...@@ -19,7 +20,7 @@ if end is None:
if len(keys) == 0: if len(keys) == 0:
context.log("No Keys found") context.log("No Keys found")
return return
dtype= {'timestamp (ms)': '<f8', dtype= {'timestamp (ms)': '<f8',
...@@ -43,7 +44,7 @@ for key in [x for x in keys if x not in end]: ...@@ -43,7 +44,7 @@ for key in [x for x in keys if x not in end]:
if df.shape[0] == 0: if df.shape[0] == 0:
return return
# Remove non-ASCII characters from DataFrame values # Remove non-ASCII characters from DataFrame values
df = df.applymap(remove_non_ascii) df = df.applymap(remove_non_ascii)
# Remove non-ASCII characters from column names (headers) # Remove non-ASCII characters from column names (headers)
...@@ -54,7 +55,7 @@ for key in [x for x in keys if x not in end]: ...@@ -54,7 +55,7 @@ for key in [x for x in keys if x not in end]:
ndarray = df.to_records(index = False) #column_dtypes does not work here for some reasone, even if it is an actuall parameter ndarray = df.to_records(index = False) #column_dtypes does not work here for some reasone, even if it is an actuall parameter
zbigarray = out_data_array.getArray() zbigarray = out_data_array.getArray()
if zbigarray is None: if zbigarray is None:
zbigarray = out_data_array.initArray(shape=(0,), dtype=ndarray.dtype.fields) zbigarray = out_data_array.initArray(shape=(0,), dtype=ndarray.dtype.fields)
start_array = zbigarray.shape[0] start_array = zbigarray.shape[0]
...@@ -68,9 +69,9 @@ for key in [x for x in keys if x not in end]: ...@@ -68,9 +69,9 @@ for key in [x for x in keys if x not in end]:
data_array_line.edit(reference=key, data_array_line.edit(reference=key,
index_expression="%s:%s" %(start_array, zbigarray.shape[0]) index_expression="%s:%s" %(start_array, zbigarray.shape[0])
) )
except: except BadRequest:
context.log("Can not create Data Array Line") context.log("Data Array Line already exists")
except: except ValueError:
context.log("File "+str(key)+ " is not well formated") context.log("File "+str(key)+ " is not well formated")
new_end = new_end + str(key) new_end = new_end + str(key)
......
...@@ -2,17 +2,22 @@ ...@@ -2,17 +2,22 @@
# The "filepath" contains the filepath of the log file. Because we only care about the file name, we will need to extract it, which will be used as the bucket key for the file. # The "filepath" contains the filepath of the log file. Because we only care about the file name, we will need to extract it, which will be used as the bucket key for the file.
# If the same "filepath" is used multiple times in different messages, the bucket data is overwritten. # If the same "filepath" is used multiple times in different messages, the bucket data is overwritten.
data_bucket_stream = bucket_stream["Data Bucket Stream"]
try: try:
l=[(c[1]["filepath"].split("/")[-1], c[1]["message"]) for c in context.unpackLazy(data_chunk, use_list=False)] l=[(c[1]["filepath"].split("/")[-1], c[1]["message"]) for c in context.unpackLazy(data_chunk, use_list=False)]
names_message_dir = {} names_message_dir = {}
for file_name, message in l: for file_name, message in l:
try: try:
names_message_dir[file_name].append(message) names_message_dir[file_name].append(message)
except: except KeyError:
names_message_dir[file_name] = [message] names_message_dir[file_name] = [message]
for tag in names_message_dir: for tag in names_message_dir:
bucket_stream["Data Bucket Stream"].insertBucket(tag,"\n".join(names_message_dir[tag])) if tag in data_bucket_stream.getKeyList():
except: bucket = data_bucket_stream.getBucketByKey(tag)
if bucket.split("\n")[0] != names_message_dir[tag][0]:
names_message_dir[tag].insert(0, bucket)
data_bucket_stream.insertBucket(tag,"\n".join(names_message_dir[tag]))
except KeyError:
context.log("The send file is missing the filepath attribute") context.log("The send file is missing the filepath attribute")
pass pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment