Commit 75f5f2ba authored by Ivan Tyagov's avatar Ivan Tyagov

Pass data stream start end offset only. Adjust test.

parent 503c9033
...@@ -50,7 +50,9 @@ ...@@ -50,7 +50,9 @@
</item> </item>
<item> <item>
<key> <string>_body</string> </key> <key> <string>_body</string> </key>
<value> <string>"""\n <value> <string encoding="cdata"><![CDATA[
"""\n
Read tail of a Data Stream and aplly needed transformations.\n Read tail of a Data Stream and aplly needed transformations.\n
This script is called every time we appendData to a Stream\n This script is called every time we appendData to a Stream\n
using data_stream_interaction_workflow.\n using data_stream_interaction_workflow.\n
...@@ -58,15 +60,15 @@ ...@@ -58,15 +60,15 @@
The idea is to provide close to real time data transformations.\n The idea is to provide close to real time data transformations.\n
As transformation is quite specific we leave this script empty so developers\n As transformation is quite specific we leave this script empty so developers\n
can hook in and add needed transformations.\n can hook in and add needed transformations.\n
\n
Interesting here is to use OffsetIndex property sheet which allows to place\n
marker till where a Data Stream has been processed.\n
"""\n """\n
</string> </value> assert start_offset < end_offset\n
]]></string> </value>
</item> </item>
<item> <item>
<key> <string>_params</string> </key> <key> <string>_params</string> </key>
<value> <string>*argument_list</string> </value> <value> <string>start_offset, end_offset</string> </value>
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>id</string> </key>
......
...@@ -150,7 +150,7 @@ class Test(ERP5TypeTestCase): ...@@ -150,7 +150,7 @@ class Test(ERP5TypeTestCase):
# override DataStream_transformTail to actually do transformation on appenData # override DataStream_transformTail to actually do transformation on appenData
start = data_stream.getSize() start = data_stream.getSize()
script_id = 'DataStream_transformTail' script_id = 'DataStream_transformTail'
script_content_list = ["*argument_list", """ script_content_list = ["start_offset, end_offset", """
# created by testWendelin.test_01_1_IngestionTail # created by testWendelin.test_01_1_IngestionTail
start = %s start = %s
end = %s end = %s
......
...@@ -57,11 +57,11 @@ data_stream = state_change[\'object\']\n ...@@ -57,11 +57,11 @@ data_stream = state_change[\'object\']\n
argument_list = state_change[\'kwargs\'][\'workflow_method_args\']\n argument_list = state_change[\'kwargs\'][\'workflow_method_args\']\n
\n \n
# call you own script to handle newly appended data which \n # call you own script to handle newly appended data which \n
# is not processed yet\n # is not processed yet, pass data stream start end offset only\n
# Note: this will serialize argument_list to activity\'s mysql table\n end_offset = data_stream.getSize()\n
# in case of big packets this can be slow, in =case of small appends it can be \n packet_size = len(argument_list[0])\n
# acceptable.\n start_offset = end_offset - packet_size\n
data_stream.activate().DataStream_transformTail(*argument_list)\n data_stream.activate().DataStream_transformTail(start_offset, end_offset)\n
</string> </value> </string> </value>
</item> </item>
<item> <item>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment