Commit 3e76c012 authored by Ivan Tyagov's avatar Ivan Tyagov

Add a generic implementation of a script able to iterate effectively over a...

Add a generic implementation of a script able to iterate effectively over a Data Stream and do transformation on data itself.
parent fe554209
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2015 Nexedi SA and Contributors. All Rights Reserved.
# Ivan Tyagov <ivan@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet
from Products.ERP5.Document.BigFile import BigFile
class DataStream(BigFile):
"""
Represents a very big infinite file with a streaming API.
Usually used to store raw data.
"""
meta_type = 'ERP5 Data Stream'
portal_type = 'Data Stream'
add_permission = Permissions.AddPortalContent
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
# Declarative properties
property_sheets = ( PropertySheet.CategoryCore
, PropertySheet.SortIndex
)
def readChunkList(self, start_offset, end_offset):
"""
Read chunks of data from a Data Stream and return them.
"""
chunk_list = []
data = self._baseGetData()
for chunk in data.iterate(start_offset, end_offset - start_offset):
chunk_list.append(chunk)
return chunk_list
\ No newline at end of file
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Document Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>DataStream</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>document.erp5.DataStream</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Document Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
</pickle>
<pickle>
<tuple>
<none/>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</tuple>
</pickle>
</record>
</ZopeData>
...@@ -5,6 +5,9 @@ ...@@ -5,6 +5,9 @@
<portal_type id="Data Product"> <portal_type id="Data Product">
<item>DefaultImage</item> <item>DefaultImage</item>
</portal_type> </portal_type>
<portal_type id="Data Stream">
<item>DataStream</item>
</portal_type>
<portal_type id="Ingestion Policy"> <portal_type id="Ingestion Policy">
<item>IngestionPolicy</item> <item>IngestionPolicy</item>
</portal_type> </portal_type>
......
...@@ -51,7 +51,7 @@ Usually used to store raw data.</string> </value> ...@@ -51,7 +51,7 @@ Usually used to store raw data.</string> </value>
</item> </item>
<item> <item>
<key> <string>type_class</string> </key> <key> <string>type_class</string> </key>
<value> <string>BigFile</string> </value> <value> <string>DataStream</string> </value>
</item> </item>
<item> <item>
<key> <string>type_interface</string> </key> <key> <string>type_interface</string> </key>
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Property Sheet" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_count</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>_mt_index</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
<item>
<key> <string>_tree</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>DataStream</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Property Sheet</string> </value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="Length" module="BTrees.Length"/>
</pickle>
<pickle> <int>0</int> </pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="OOBTree" module="BTrees.OOBTree"/>
</pickle>
<pickle>
<none/>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="OOBTree" module="BTrees.OOBTree"/>
</pickle>
<pickle>
<none/>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Standard Property" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_local_properties</string> </key>
<value>
<tuple>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>mode</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
</tuple>
</value>
</item>
<item>
<key> <string>categories</string> </key>
<value>
<tuple>
<string>elementary_type/int</string>
</tuple>
</value>
</item>
<item>
<key> <string>description</string> </key>
<value> <string>Current offset in stream.\n
Usually used when processed by activities.</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>offset_property</string> </value>
</item>
<item>
<key> <string>mode</string> </key>
<value> <string>w</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Standard Property</string> </value>
</item>
<item>
<key> <string>property_default</string> </key>
<value> <string>python: 0</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_body</string> </key>
<value> <string encoding="cdata"><![CDATA[
"""\n
Get a chunks of data from a Data Stream, convert it to numpy array\n
and return proper start and end for next record.\n
\n
This script assumes stream has following format.\n
{dict1}{dict2}\n
{dict3}\n
\n
And it\'s possible that last chunk in its last line is incomplete dictionary \n
thus correction needed.\n
\n
"""\n
import json\n
\n
chunk_text = \'\'.join(chunk_list)\n
#context.log(\'%s %s %s\' %(start, end, len(chunk_text)))\n
\n
# remove last line as it might be uncomplete and correct start and end offsets\n
line_list = chunk_text.split(\'\\n\')\n
last_line = line_list[-1]\n
line_list.pop(-1)\n
\n
for line in line_list:\n
# must have proper format\n
assert line.endswith(\'}\')\n
assert line.startswith(\'{\')\n
\n
# fix \' -> "\n
line = line.replace("\'", \'"\')\n
\n
if line.count(\'{\') > 1:\n
# multiple concatenated dictionaries in one line, bad format ignore for now\n
pass \n
else:\n
d = json.loads(line)\n
# xxx: save this value as a numpy array\n
\n
# start and enf offsets may not match existing record structure in stream\n
# thus corrections in start and end offsets is needed thus we\n
# return transformed values which is just last line length\n
start -= len(last_line)\n
end -= len(last_line)\n
\n
return start, end\n
]]></string> </value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>chunk_list, start, end</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>DataStream_convertoNumpyArray</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_body</string> </key>
<value> <string encoding="cdata"><![CDATA[
"""\n
Simply a wrapper to real method.\n
"""\n
data_stream = context.restrictedTraverse(data_stream_relative_url)\n
data_stream_chunk_list = data_stream.readChunkList(start, end)\n
\n
# do call transformation script\n
if transform_script_id is not None:\n
transform_script = getattr(data_stream, transform_script_id, None)\n
if transform_script is not None:\n
start, end = transform_script(data_stream_chunk_list, start, end)\n
\n
# store current position offset in Data Stream\n
data_stream.setOffset(end)\n
\n
# start another read in another activity\n
start += chunk_length\n
end += chunk_length\n
total_stream_length = data_stream.getSize()\n
\n
if end > total_stream_length:\n
# no read beyond end of stream\n
end = total_stream_length\n
\n
if start < total_stream_length:\n
# some bytes left ...\n
data_stream.activate().DataStream_readChunkListAndTransform( \\\n
data_stream.getRelativeUrl(), \\\n
start, \\\n
end, \\\n
chunk_length, \\\n
transform_script_id)\n
]]></string> </value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>data_stream_relative_url, start, end, chunk_length, transform_script_id=None</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>DataStream_readChunkListAndTransform</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_body</string> </key>
<value> <string>""" \n
Read entire stream using activities and pass stream\'s data to handler script\n
who can transform it.\n
Parameters:\n
* transform_script_id - the script which will transform data\n
* chunk_length - the length of a chunk\n
"""\n
data_length = context.getSize()\n
\n
start = 0\n
end = chunk_length\n
context.activate().DataStream_readChunkListAndTransform( \\\n
context.getRelativeUrl(), \\\n
start, \\\n
end, \\\n
chunk_length, \\\n
transform_script_id)\n
\n
return data_length\n
</string> </value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>chunk_length=1048576, transform_script_id=None</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>DataStream_transform</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
document.erp5.IngestionPolicyTool document.erp5.IngestionPolicyTool
document.erp5.IngestionPolicy document.erp5.IngestionPolicy
document.erp5.DataArray document.erp5.DataArray
document.erp5.DataStream
\ No newline at end of file
Data Array | DataArray Data Array | DataArray
Data Product | DefaultImage Data Product | DefaultImage
Data Stream | DataStream
Ingestion Policy | IngestionPolicy Ingestion Policy | IngestionPolicy
\ No newline at end of file
IngestionPolicy IngestionPolicy
DataArray DataArray
DataStream
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment