Commit fb309f64 authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

erp5_wendelin_drone: fix split data ingestion

As Fluentd tail plugin can send data from a single file several packets,
this needs to be handled properly on ingestion side.
parent c1e0773b
import pandas as pd
import numpy as np
from zExceptions import BadRequest
# Function to remove non-ASCII characters from a string, because I can not be bothered to make to_records with utf8 right now
def remove_non_ascii(text):
......@@ -19,7 +20,7 @@ if end is None:
if len(keys) == 0:
context.log("No Keys found")
return
return
dtype= {'timestamp (ms)': '<f8',
......@@ -43,7 +44,7 @@ for key in [x for x in keys if x not in end]:
if df.shape[0] == 0:
return
# Remove non-ASCII characters from DataFrame values
df = df.applymap(remove_non_ascii)
# Remove non-ASCII characters from column names (headers)
......@@ -54,7 +55,7 @@ for key in [x for x in keys if x not in end]:
ndarray = df.to_records(index = False) #column_dtypes does not work here for some reasone, even if it is an actuall parameter
zbigarray = out_data_array.getArray()
if zbigarray is None:
zbigarray = out_data_array.initArray(shape=(0,), dtype=ndarray.dtype.fields)
start_array = zbigarray.shape[0]
......@@ -68,9 +69,9 @@ for key in [x for x in keys if x not in end]:
data_array_line.edit(reference=key,
index_expression="%s:%s" %(start_array, zbigarray.shape[0])
)
except:
context.log("Can not create Data Array Line")
except:
except BadRequest:
context.log("Data Array Line already exists")
except ValueError:
context.log("File "+str(key)+ " is not well formated")
new_end = new_end + str(key)
......
......@@ -2,17 +2,22 @@
# The "filepath" contains the filepath of the log file. Because we only care about the file name, we will need to extract it, which will be used as the bucket key for the file.
# If the same "filepath" is used multiple times in different messages, the bucket data is overwritten.
data_bucket_stream = bucket_stream["Data Bucket Stream"]
try:
l=[(c[1]["filepath"].split("/")[-1], c[1]["message"]) for c in context.unpackLazy(data_chunk, use_list=False)]
names_message_dir = {}
for file_name, message in l:
try:
names_message_dir[file_name].append(message)
except:
except KeyError:
names_message_dir[file_name] = [message]
for tag in names_message_dir:
bucket_stream["Data Bucket Stream"].insertBucket(tag,"\n".join(names_message_dir[tag]))
except:
if tag in data_bucket_stream.getKeyList():
bucket = data_bucket_stream.getBucketByKey(tag)
if bucket.split("\n")[0] != names_message_dir[tag][0]:
names_message_dir[tag].insert(0, bucket)
data_bucket_stream.insertBucket(tag,"\n".join(names_message_dir[tag]))
except KeyError:
context.log("The send file is missing the filepath attribute")
pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment