Commit c9b2f062 authored by Ophélie Gagnard

slapos_metadata_transform_agent: Cleaned for merge into non-dev branch.

parent b65312da
@@ -12,32 +12,18 @@ def get_end_and_json_list(start, in_data_stream, chunk_size = 4 * 1024 * 1024):
   # assume max path size is 4096 and add 4096 for the rest
   max_remaining_for_eol = 8192
   end = min(start + chunk_size + max_remaining_for_eol, in_data_stream.getSize())
-  #unpacked, get_end = in_data_stream.readMsgpackChunkList(start, end) # LEGACY
-  unpacked = in_data_stream.readChunkList(start, end) # CURRENT
-  unpacked_string = "".join(unpacked) # CURRENT
+  unpacked = in_data_stream.readChunkList(start, end)
+  unpacked_string = "".join(unpacked)
   # extending the current chunk until the next end of line,
   # so json remains valid
   if end < in_data_stream.getSize():
     new_end_index = chunk_size
-    while unpacked_string[new_end_index] != '\n': # CURRENT
-    #while unpacked[new_end_index] != ord('\n'): # LEGACY
+    while unpacked_string[new_end_index] != '\n':
       new_end_index += 1
     end = start + new_end_index + 1
-  """ not useful anymore? # LEGACY / TOCLEAN
-  unpacked = unpacked[:end]
-  try: # efficient but does not prevent errors
-    raw_data_string = ''.join(map(chr, unpacked))
-  except: # not efficient but does prevent value errors and type errors
-    cleaned_unpacked = []
-    for i in unpacked:
-      if (type(i) == type(1) # only ints
-          and 0 <= i and i < 256): # i in range(256)
-        cleaned_unpacked.append(i)
-    raw_data_string = ''.join(map(chr, cleaned_unpacked))
-  #"""
-  raw_data_string = unpacked_string[:end] # CURRENT
+  raw_data_string = unpacked_string[:end]
   end_scan_regexp = re.compile('.*?\[fluentbit_end\]\n', re.DOTALL)
   scan_end = end_scan_regexp.match(raw_data_string)
   if not scan_end:
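For readers following the chunking logic above: the function reads one chunk plus up to `max_remaining_for_eol` extra bytes, then pushes `end` forward to the next newline so no JSON record is cut in half. A standalone sketch of that boundary adjustment, with a plain string standing in for the data stream (the real code goes through `in_data_stream.readChunkList`/`getSize`, and its scan index is relative to the chunk rather than absolute):

```python
def extend_to_newline(data, start, chunk_size, max_remaining_for_eol=8192):
    # Read window: one chunk plus some slack in which to find a newline.
    end = min(start + chunk_size + max_remaining_for_eol, len(data))
    if end < len(data):
        # Walk forward from the nominal chunk boundary until '\n',
        # so data[start:end] contains only whole lines.
        new_end_index = chunk_size
        while data[start + new_end_index] != '\n':
            new_end_index += 1
        end = start + new_end_index + 1
    return end

# Three newline-terminated JSON records; a 4-byte chunk would cut the first.
# max_remaining_for_eol=0 forces the extension branch in this tiny example.
data = '{"a": 1}\n{"b": 2}\n{"c": 3}\n'
end = extend_to_newline(data, start=0, chunk_size=4, max_remaining_for_eol=0)
assert data[:end] == '{"a": 1}\n'
```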
@@ -47,11 +33,7 @@ def get_end_and_json_list(start, in_data_stream, chunk_size = 4 * 1024 * 1024):
     end = start + len(scan_end.group()) + 1
     raw_data_string = raw_data_string[:len(scan_end.group())]
-  context.log("DEBUG 020: len(raw_data_string) =", len(raw_data_string))
-  context.log("DEBUG 020: type(raw_data_string) =", type(raw_data_string))
-  context.log("DEBUG 020: type(raw_data_string[0]) =", type(raw_data_string[0]))
   line_list = raw_data_string.splitlines()
-  context.log("DEBUG 020: len(line_list) =", len(line_list))
   timestamp_json_regexp = re.compile(r'.*?:(.*?)\[(.*)\]')
   json_string_list = [timestamp_json_regexp.match(line).group(2)
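The two regexes kept by this hunk can be sanity-checked in isolation. A minimal sketch, with the line layout inferred from the patterns (a tag, a colon, a timestamp, then the JSON payload in square brackets); the concrete tag and field values are made up:

```python
import re

end_scan_regexp = re.compile(r'.*?\[fluentbit_end\]\n', re.DOTALL)
timestamp_json_regexp = re.compile(r'.*?:(.*?)\[(.*)\]')

# Hypothetical fluentbit-style line: <tag>:<timestamp> [<json>]
line = 'metadata.scan:1642690808.92 [{"path": "/bin/ls", "mode": "0755"}]'
m = timestamp_json_regexp.match(line)
assert m.group(1).strip() == '1642690808.92'                # timestamp part
assert m.group(2) == '{"path": "/bin/ls", "mode": "0755"}'  # JSON payload

# The end-of-scan marker bounds the useful data within the chunk.
chunk = line + '\nmetadata.scan:1642690809.00 [fluentbit_end]\ntruncated tail'
scan_end = end_scan_regexp.match(chunk)
assert scan_end is not None
assert scan_end.group().endswith('[fluentbit_end]\n')
```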
@@ -79,16 +61,6 @@ def get_triplet_list(json_string_list, is_end_of_scan):
   fail_counter = 0
   for json_string in json_string_list:
     tmp_data_list.append(json.loads(json_string))
-    """ CURRENT
-    try:
-      tmp_data_list.append(json.loads(json_string))
-    except:
-      context.log('FAILING json_string:', json_string) # DEBUG
-      fail_counter += 1
-      pass
-  if fail_counter > 0:
-    context.log('FAILED json_string:', fail_counter) # DEBUG
-  #"""
   data_list = []
   for data in tmp_data_list:
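This hunk changes the decoding semantics: the removed (already commented-out) try/except used to log and skip malformed records, while the kept line lets a bad record raise. A small sketch of the strict behaviour, with made-up records:

```python
import json

json_string_list = ['{"k": 1}', '{"k": 2}']
tmp_data_list = []
for json_string in json_string_list:
    # Post-cleanup behaviour: a malformed record raises ValueError here
    # instead of being counted and skipped.
    tmp_data_list.append(json.loads(json_string))
assert [d["k"] for d in tmp_data_list] == [1, 2]
```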
@@ -152,17 +124,10 @@ end = in_data_stream.getSize()
 if start >= end:
   return
 end, json_string_list, is_end_of_scan = get_end_and_json_list(start, in_data_stream)
-context.log("DEBUG 020: len(json_string_list) =", len(json_string_list))
 triplet_list = get_triplet_list(json_string_list, is_end_of_scan)
-context.log("DEBUG 020: len(triplet_list) =", len(triplet_list))
 uid_list = get_uid_list(triplet_list, in_data_stream)
-context.log("DEBUG 020: len(uid_list) =", len(uid_list))
 uid_ndarray = create_ndarray(uid_list)
-context.log("len(uid_ndarray) =", len(uid_ndarray))
-context.log("start =", start)
-context.log("end =", end)
 if start == 0:
   zbigarray = None
 else:
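At the top level the script processes one chunk per invocation: parse the window into JSON strings, reduce them to triplets and then to uids, and append the uids to a persistent array that only exists after the first chunk (`start == 0` resets it). A rough sketch of that accumulation step, using a plain numpy array where the real code uses a ZBigArray (an assumption for illustration; ZBigArray grows in place rather than being reallocated):

```python
import numpy as np

def append_uid_chunk(zbigarray, uid_ndarray, start):
    # Mirrors the branch above: on the first chunk there is no array
    # yet; later chunks extend the existing one.
    if start == 0:
        zbigarray = None
    if zbigarray is None:
        return np.array(uid_ndarray)
    return np.concatenate([zbigarray, uid_ndarray])

arr = None
start = 0
for chunk in ([11, 12], [13], [14, 15]):
    arr = append_uid_chunk(arr, np.array(chunk), start)
    start += len(chunk)  # stand-in for the real byte offset bookkeeping
assert arr.tolist() == [11, 12, 13, 14, 15]
```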
@@ -149,7 +149,7 @@
         </item>
         <item>
             <key> <string>serial</string> </key>
-            <value> <string>997.41827.9954.54391</string> </value>
+            <value> <string>997.41932.7971.15496</string> </value>
         </item>
         <item>
             <key> <string>state</string> </key>
@@ -167,7 +167,7 @@
       </tuple>
       <state>
         <tuple>
-          <float>1642690808.92</float>
+          <float>1642697119.85</float>
           <string>UTC</string>
         </tuple>
       </state>