Commit ca7fa799 authored by Ophélie Gagnard

slapos_abyss: Add the transformation script.

The transformation script converts a Data Stream (raw data) into a Data Array (an integer array) whose values are unique identifiers, one per scanned file, determined by the path mapping.
parent a3a2f443
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ActionInformation" module="Products.CMFCore.ActionInformation"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>action</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>categories</string> </key>
<value>
<tuple>
<string>action_type/object_view</string>
</tuple>
</value>
</item>
<item>
<key> <string>category</string> </key>
<value> <string>object_view</string> </value>
</item>
<item>
<key> <string>condition</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>icon</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>exclude_file_view</string> </value>
</item>
<item>
<key> <string>permissions</string> </key>
<value>
<tuple>
<string>View</string>
</tuple>
</value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Action Information</string> </value>
</item>
<item>
<key> <string>priority</string> </key>
<value> <float>2.0</float> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string>Exclude File</string> </value>
</item>
<item>
<key> <string>visible</string> </key>
<value> <int>1</int> </value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="Expression" module="Products.CMFCore.Expression"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>text</string> </key>
<value> <string>string:${object_url}/DataProduct_viewExcludeFile</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
import numpy as np
import re
import json
import os.path
def get_end_and_json_list(start, in_data_stream, chunk_size=4 * 1024 * 1024):
  """
  Determine where the chunk to process ends, extending it to the next end of
  line so that it only contains complete lines, and detect the end of a scan
  (i.e. a complete scan of a filesystem).
  Return the end index, a list of strings assumed to be valid JSON strings
  (this assumes the scanning of the file system produces correct strings)
  and a boolean telling whether the end of the scan was reached.
  """
# assume max path size is 4096 and add 4096 for the rest
max_remaining_for_eol = 8192
end = min(start + chunk_size + max_remaining_for_eol, in_data_stream.getSize())
unpacked = in_data_stream.readChunkList(start, end)
unpacked_string = "".join(unpacked)
# extending the current chunk until the next end of line,
# so json remains valid
if end < in_data_stream.getSize():
new_end_index = chunk_size
while unpacked_string[new_end_index] != '\n':
new_end_index += 1
end = start + new_end_index + 1
  # slice relative to the chunk start (unpacked_string begins at offset `start`)
  raw_data_string = unpacked_string[:end - start]
  end_scan_regexp = re.compile(r'.*?\[fluentbit_end\]\n', re.DOTALL)
scan_end = end_scan_regexp.match(raw_data_string)
  is_end_of_scan = scan_end is not None
  if is_end_of_scan:
    end = start + len(scan_end.group()) + 1
    raw_data_string = raw_data_string[:len(scan_end.group())]
line_list = raw_data_string.splitlines()
timestamp_json_regexp = re.compile(r'.*?:(.*?)\[(.*)\]')
json_string_list = [timestamp_json_regexp.match(line).group(2)
for line in line_list
if (timestamp_json_regexp.match(line) and (len(timestamp_json_regexp.match(line).groups()) == 2))]
return end, json_string_list, is_end_of_scan
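# The timestamp_json_regexp above assumes lines of the shape
#   "<prefix>:<timestamp>[<json object>]"
# (a hypothetical format inferred from the regexp, not from a captured sample):
# group(1) is the timestamp region, kept for possible future use, and group(2)
# is the JSON payload later parsed by get_triplet_list().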
def get_triplet_list(json_string_list, is_end_of_scan):
"""
Parse unpacked and return a triplet list: (path, slice1, slice2).
path is the path of the processed file, slice1 and slice2 are two parts
of the md5 digest of the processed file. They are stored in "big endian"
format, i.e. slice1 is the "bigger part".
NOTE: timestamps are parsed in case they are needed for future improvement
but they are not used at the moment.
"""
if is_end_of_scan:
    # drop the trailing "fluentbit_end" marker line because it is not valid JSON
json_string_list = json_string_list[:-1]
  tmp_data_list = [json.loads(json_string) for json_string in json_string_list]
data_list = []
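  # keep only the entries whose path does not start with one of the excluded
  # prefixes returned by DataProduct_getExcludeFileList (prefix match via
  # os.path.commonprefix)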
for data in tmp_data_list:
in_list = False
if ('path' in data) and exclude_file_list:
for exclude_file in exclude_file_list:
if os.path.commonprefix([data['path'], exclude_file]) == exclude_file:
in_list = True
break
if not in_list:
data_list.append(data)
return [(data['path'],
int(data['hash']['md5'][0:8], 16),
int(data['hash']['md5'][8:16], 16))
for data in data_list
if 'path' in data and 'hash' in data and 'md5' in data['hash']]
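# Example with a hypothetical digest: for md5 = "d41d8cd98f00b204e9800998ecf8427e",
# slice1 = int("d41d8cd9", 16) and slice2 = int("8f00b204", 16); only the first
# 64 bits of the digest are used for the mapping.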
def get_uid_list(triplet_list, data_stream):
"""
Fill the mappings and get the list of UIDs.
The argument @data_stream is only used to access the mappings.
"""
uid_list = []
for triplet in triplet_list:
data_stream.add_path(triplet)
triplet_uid = data_stream.get_uid_from_path(triplet)
uid_list += [triplet_uid]
return uid_list
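# add_path() and get_uid_from_path() are expected to come from the
# PathMappingMixin declared on the Data Stream portal type in this commit, so
# the same Data Stream holds both the raw data and the path-to-UID mapping.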
def create_ndarray(uid_list):
"""
Takes a UIDs list and returns a UIDs ndarray.
This function exists so that the stages of the data processing are clear and
so that if further transformations on the data are needed, one can simply
add them here without reorganizing the code.
"""
uid_list.append(-1) # used as a delimiter between the scans
  return np.array(uid_list, dtype='int64')
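# For example, uid_list = [3, 7, 12] becomes array([3, 7, 12, -1]); the
# trailing -1 separates consecutive scans in the resulting Data Array.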
progress_indicator = in_stream["Progress Indicator"]
in_data_stream = in_stream["Data Stream"]
out_data_array = out_array["Data Array"]
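# Incremental conversion: the Data Array stays in the 'converting' state while
# chunks are appended, and is switched to 'converted' (convertFile) once the
# end of the stream or the end of a scan is reached.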
if out_data_array.getSimulationState() == 'converted':
return
if out_data_array.getSimulationState() != 'converting':
out_data_array.transformFile()
exclude_file_list = []
if not out_data_array.getCausality():
ingestion_line = in_data_stream.getAggregateRelatedValue(portal_type='Data Ingestion Line')
  resource_value = ingestion_line.getResourceValue()
  exclude_file_list = resource_value.DataProduct_getExcludeFileList()
out_data_array.edit(causality_value=resource_value)
in_data_stream.setPublicationSectionList(resource_value.getPublicationSectionList())
out_data_array.setPublicationSectionList(resource_value.getPublicationSectionList())
start = progress_indicator.getIntOffsetIndex()
end = in_data_stream.getSize()
if start >= end:
out_data_array.convertFile()
return
end, json_string_list, is_end_of_scan = get_end_and_json_list(start, in_data_stream)
triplet_list = get_triplet_list(json_string_list, is_end_of_scan)
uid_list = get_uid_list(triplet_list, in_data_stream)
uid_ndarray = create_ndarray(uid_list)
if start == 0:
zbigarray = None
else:
zbigarray = out_data_array.getArray()
if zbigarray is None:
zbigarray = out_data_array.initArray(shape=(0,), dtype='int64')
if len(uid_ndarray) > 0:
zbigarray.append(uid_ndarray)
if end > start:
progress_indicator.setIntOffsetIndex(end)
if is_end_of_scan:
out_data_array.convertFile()
return
# tell caller to create new activity after processing
# if we did not reach end of stream
if end < in_data_stream.getSize():
return 1
out_data_array.convertFile()
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Python Script" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_local_properties</string> </key>
<value>
<tuple>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>reference</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
</tuple>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>in_stream=None, out_array=None</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Metadata_convertDataStreamToArray</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Python Script</string> </value>
</item>
<item>
<key> <string>reference</string> </key>
<value> <string>DataAnalysisLine_convertEnvironmentDataStreamToArray</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>edit_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.Workflow"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_log</string> </key>
<value>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>edit</string> </value>
</item>
<item>
<key> <string>actor</string> </key>
<value> <string>zope</string> </value>
</item>
<item>
<key> <string>comment</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>error_message</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>serial</string> </key>
<value> <string>997.59130.8151.19302</string> </value>
</item>
<item>
<key> <string>state</string> </key>
<value> <string>current</string> </value>
</item>
<item>
<key> <string>time</string> </key>
<value>
<object>
<klass>
<global name="DateTime" module="DateTime.DateTime"/>
</klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>1643806809.71</float>
<string>UTC</string>
</tuple>
</state>
</object>
</value>
</item>
</dictionary>
</list>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Base Type" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>content_icon</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>description</string> </key>
<value> <string>Represents a very big infinite file with a streaming API.\n
Usually used to store raw data.</string> </value>
</item>
<item>
<key> <string>factory</string> </key>
<value> <string>addXMLObject</string> </value>
</item>
<item>
<key> <string>group_list</string> </key>
<value>
<tuple>
<string>item</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Data Stream</string> </value>
</item>
<item>
<key> <string>init_script</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>permission</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Base Type</string> </value>
</item>
<item>
<key> <string>type_class</string> </key>
<value> <string>DataStream</string> </value>
</item>
<item>
<key> <string>type_interface</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>type_mixin</string> </key>
<value>
<tuple>
<string>PathMappingMixin</string>
</tuple>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
return [
u'/dracut-state.sh',
u'/tmp/dhclient',
u'/tmp/net',
u'/etc/machine-id',
u'/etc/flb.conf'
]
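# NOTE: these hard-coded path prefixes are the default exclusions; the
# transformation script skips any scanned path starting with one of them, and
# DataProduct_getExcludeFileList falls back to this list when the Data Product
# defines no explicit exclude_file_list property.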
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>DataProduct_getCommentExcludeFileList</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
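# Return the exclude list configured on the Data Product, falling back to the
# hard-coded defaults from DataProduct_getCommentExcludeFileList.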
return context.getProperty('exclude_file_list', context.DataProduct_getCommentExcludeFileList())
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>**kw</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>DataProduct_getExcludeFileList</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ERP5 Form" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_objects</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>action</string> </key>
<value> <string>Base_edit</string> </value>
</item>
<item>
<key> <string>action_title</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>edit_order</string> </key>
<value>
<list/>
</value>
</item>
<item>
<key> <string>encoding</string> </key>
<value> <string>UTF-8</string> </value>
</item>
<item>
<key> <string>enctype</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>group_list</string> </key>
<value>
<list>
<string>left</string>
<string>right</string>
<string>center</string>
<string>bottom</string>
<string>hidden</string>
</list>
</value>
</item>
<item>
<key> <string>groups</string> </key>
<value>
<dictionary>
<item>
<key> <string>bottom</string> </key>
<value>
<list/>
</value>
</item>
<item>
<key> <string>center</string> </key>
<value>
<list/>
</value>
</item>
<item>
<key> <string>hidden</string> </key>
<value>
<list/>
</value>
</item>
<item>
<key> <string>left</string> </key>
<value>
<list>
<string>my_exclude_file_list</string>
</list>
</value>
</item>
<item>
<key> <string>right</string> </key>
<value>
<list/>
</value>
</item>
</dictionary>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>DataProduct_viewExcludeFile</string> </value>
</item>
<item>
<key> <string>method</string> </key>
<value> <string>POST</string> </value>
</item>
<item>
<key> <string>name</string> </key>
<value> <string>DataProduct_viewExcludeFile</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>ERP5 Form</string> </value>
</item>
<item>
<key> <string>pt</string> </key>
<value> <string>form_view</string> </value>
</item>
<item>
<key> <string>row_length</string> </key>
<value> <int>4</int> </value>
</item>
<item>
<key> <string>stored_encoding</string> </key>
<value> <string>UTF-8</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>unicode_mode</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>update_action</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>update_action_title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ProxyField" module="Products.ERP5Form.ProxyField"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>delegated_list</string> </key>
<value>
<list>
<string>default</string>
<string>title</string>
</list>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>my_exclude_file_list</string> </value>
</item>
<item>
<key> <string>message_values</string> </key>
<value>
<dictionary>
<item>
<key> <string>external_validator_failed</string> </key>
<value> <string>The input failed the external validator.</string> </value>
</item>
</dictionary>
</value>
</item>
<item>
<key> <string>overrides</string> </key>
<value>
<dictionary>
<item>
<key> <string>field_id</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>form_id</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>target</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</value>
</item>
<item>
<key> <string>tales</string> </key>
<value>
<dictionary>
<item>
<key> <string>default</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>field_id</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>form_id</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>target</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</value>
</item>
<item>
<key> <string>values</string> </key>
<value>
<dictionary>
<item>
<key> <string>default</string> </key>
<value>
<list/>
</value>
</item>
<item>
<key> <string>field_id</string> </key>
<value> <string>my_lines_field</string> </value>
</item>
<item>
<key> <string>form_id</string> </key>
<value> <string>Base_viewFieldLibrary</string> </value>
</item>
<item>
<key> <string>target</string> </key>
<value> <string>Click to edit the target</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string>Exclude File List</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="TALESMethod" module="Products.Formulator.TALESField"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_text</string> </key>
<value> <string>python: here.DataProduct_getExcludeFileList()</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
@@ -3,6 +3,7 @@ data_product_module/convert_data_stream_to_data_array
 organisation_module/meta_destination
 organisation_module/server_node
 portal_callables/MetadataIngestionPolicy_parseTag
+portal_callables/Metadata_convertDataStreamToArray
 portal_categories/publication_section/file_system_image
 portal_categories/publication_section/file_system_image/**
 portal_ingestion_policies/metadata_upload
 slapos_abyss
\ No newline at end of file