Commit 14897249 authored by Ivan Tyagov's avatar Ivan Tyagov

Add needed Data Operation used in testWendelin and remove entire...

See merge request nexedi/wendelin!59
parents 1dbcd53e ebe7cd72
...@@ -169,7 +169,9 @@ ...@@ -169,7 +169,9 @@
</item> </item>
<item> <item>
<key> <string>description</string> </key> <key> <string>description</string> </key>
<value> <string>This is the standard data operation used for ingestion. It just appends everything to a data stream.</string> </value> <value> <string>This data operation can be used to ingest data with fluentd. It assumes data comes in msgpack format.\n
It just appends everything to a data stream without any conversion whatsoever.\n
</string> </value>
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>id</string> </key>
...@@ -579,6 +581,96 @@ ...@@ -579,6 +581,96 @@
</value> </value>
</item> </item>
</dictionary> </dictionary>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>edit</string> </value>
</item>
<item>
<key> <string>actor</string> </key>
<value> <string>zope</string> </value>
</item>
<item>
<key> <string>comment</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>error_message</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>serial</string> </key>
<value> <string>985.34625.23629.44987</string> </value>
</item>
<item>
<key> <string>state</string> </key>
<value> <string>current</string> </value>
</item>
<item>
<key> <string>time</string> </key>
<value>
<object>
<klass> <reference id="3.1"/> </klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>1595851194.16</float>
<string>UTC</string>
</tuple>
</state>
</object>
</value>
</item>
</dictionary>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>edit</string> </value>
</item>
<item>
<key> <string>actor</string> </key>
<value> <string>zope</string> </value>
</item>
<item>
<key> <string>comment</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>error_message</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>serial</string> </key>
<value> <string>985.34639.59167.4556</string> </value>
</item>
<item>
<key> <string>state</string> </key>
<value> <string>current</string> </value>
</item>
<item>
<key> <string>time</string> </key>
<value>
<object>
<klass> <reference id="3.1"/> </klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>1595930396.08</float>
<string>UTC</string>
</tuple>
</state>
</object>
</value>
</item>
</dictionary>
</list> </list>
</value> </value>
</item> </item>
......
<?xml version="1.0"?>
<!--
  ZopeData export of a single "Data Operation" object (class erp5.portal_type)
  with id "wendelin_ingest_data_conversion" and title "Ingest Fluentd Data".

  Contents, in order:
    * four role tuples for the Access contents information, Add portal content,
      Modify portal content and View permissions;
    * _local_properties: declarations (id + type) of locally defined properties;
    * the property values themselves (references, description, categories,
      script_id, title, use_list, version).

  The stored script_id is "DataIngestionLine_writeFluentdIngestionToDataStream".

  NOTE(review): this file looks machine-generated; presumably it should be
  regenerated from the object rather than edited by hand. TODO confirm.
-->
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Data Operation" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_Access_contents_information_Permission</string> </key>
<value>
<tuple>
<string>Assignee</string>
<string>Assignor</string>
<string>Associate</string>
<string>Auditor</string>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>_Add_portal_content_Permission</string> </key>
<value>
<tuple>
<string>Assignee</string>
<string>Assignor</string>
<string>Associate</string>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>_Modify_portal_content_Permission</string> </key>
<value>
<tuple>
<string>Assignee</string>
<string>Assignor</string>
<string>Associate</string>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>_View_Permission</string> </key>
<value>
<tuple>
<string>Assignee</string>
<string>Assignor</string>
<string>Associate</string>
<string>Auditor</string>
<string>Manager</string>
</tuple>
</value>
</item>
<!--
  Local property declarations. NOTE(review): "data_operation_script_id" is
  declared here, but the value stored further down uses the key "script_id";
  verify that both names are intended.
-->
<item>
<key> <string>_local_properties</string> </key>
<value>
<tuple>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>reference</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>version</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>data_operation_script_id</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>use_list</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>lines</string> </value>
</item>
</dictionary>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>quantity_unit_list</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>lines</string> </value>
</item>
</dictionary>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>aggregated_portal_type_list</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>lines</string> </value>
</item>
</dictionary>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>base_contribution_list</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>lines</string> </value>
</item>
</dictionary>
</tuple>
</value>
</item>
<!--
  NOTE(review): "aggregated_portal_type" (Data Acquisition Unit) and
  "aggregated_portal_type_list" (Data Stream) hold different values; confirm
  which one consumers of this object actually read.
-->
<item>
<key> <string>aggregated_portal_type</string> </key>
<value>
<tuple>
<string>Data Acquisition Unit</string>
</tuple>
</value>
</item>
<item>
<key> <string>aggregated_portal_type_list</string> </key>
<value>
<tuple>
<string>Data Stream</string>
</tuple>
</value>
</item>
<item>
<key> <string>base_contribution_list</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>categories</string> </key>
<value>
<tuple>
<string>quantity_unit/unit/piece</string>
</tuple>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>ingest-fluent-data</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value> <string>This data operation can be used to ingest data with fluentd. It assumes data comes in msgpack format.\n
It will first unpack the msgpack, then remove the timestamp, and convert the data part (usually a dictionary)\n
to string and append it to Data Stream.\n
\n
Note that what is saved to Data Stream might be different from what fluentd was reading initially,\n
depending on fluentd plugin configuration. For example fluentd might convert json to msgpack then\n
what is saved in Data Stream might be a string representation of a python dictionary and not json.</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>wendelin_ingest_data_conversion</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Data Operation</string> </value>
</item>
<item>
<key> <string>quantity_unit_list</string> </key>
<value>
<tuple>
<string>information/byte</string>
</tuple>
</value>
</item>
<!--
  NOTE(review): "reference" is FOURIER-MAX while "default_reference" above is
  ingest-fluent-data. FOURIER-MAX does not match the title or description of
  this object and may be a leftover from a copied object. TODO confirm.
-->
<item>
<key> <string>reference</string> </key>
<value> <string>FOURIER-MAX</string> </value>
</item>
<item>
<key> <string>script_id</string> </key>
<value> <string>DataIngestionLine_writeFluentdIngestionToDataStream</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string>Ingest Fluentd Data</string> </value>
</item>
<!--
  NOTE(review): use_list is big_data/analysis although the description above
  presents this as an ingestion operation; verify the intended use category.
-->
<item>
<key> <string>use_list</string> </key>
<value>
<tuple>
<string>big_data/analysis</string>
</tuple>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>001</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
""" """
This script is used during fluentd ingestion. This script is used during fluentd ingestion.
It will write data sent from fluentd by unpacking it first and then appending It assumes data comes in msgpack encoded in the following format: msgpack(timestamp, data).
as a string to respective "Data Stream". It will first unpack the msgpack, then remove the first item of the tuple (timestamp) and
append str(data) to "Data Stream".
Note that what is saved to Data Stream might be different from what fluentd was reading
initially, depending on fluentd plugin configuration. For example fluentd might convert
json to msgpack, then what is saved in Data Stream might be str(python_dict) and not json.
""" """
out_stream["Data Stream"].appendData(''.join([str(c[1]) for c in context.unpack(data_chunk)])) out_stream["Data Stream"].appendData(''.join([str(c[1]) for c in context.unpack(data_chunk)]))
""" """
This script is used during fluentd ingestion. This script is a general ingestion script which can be used with fluentd or with other http based ingestion tools.
It will append data sent from fluentd to Wendelin 'as it is' to respective "Data Stream". It will append data sent to Wendelin 'as it is' to respective "Data Stream".
By default data will be encoded in MsgPack format.
Note that by default fluentd data is encoded in msgpack format, this script will not unpack it.
""" """
out_stream["Data Stream"].appendData(data_chunk) out_stream["Data Stream"].appendData(data_chunk)
...@@ -13,11 +13,10 @@ ingestion_policy = context.newContent( \ ...@@ -13,11 +13,10 @@ ingestion_policy = context.newContent( \
script_id = 'IngestionPolicy_parseSimpleFluentdTag') script_id = 'IngestionPolicy_parseSimpleFluentdTag')
ingestion_policy.validate() ingestion_policy.validate()
use_category = context.restrictedTraverse("portal_categories/use/big_data/ingestion") use_category = context.restrictedTraverse("portal_categories/use/big_data/ingestion")
quantity_category = context.restrictedTraverse("portal_categories/quantity_unit/unit/piece")
# XXX: hard-coded dependency to object from erp5_wendelin_data, remove! # use by default a Data Operation which will convert data sent from fluentd
data_operation = context.restrictedTraverse("data_operation_module/wendelin_1") data_operation = context.restrictedTraverse("data_operation_module/wendelin_ingest_data_conversion")
# create Data Product # create Data Product
data_product = context.data_product_module.newContent( data_product = context.data_product_module.newContent(
......
data_operation_module/wendelin_ingest_data data_operation_module/wendelin_ingest_data
data_operation_module/wendelin_ingest_data_conversion
data_product_module/default_http_json data_product_module/default_http_json
data_product_module/default_http_json/** data_product_module/default_http_json/**
data_supply_module/default_http_json data_supply_module/default_http_json
......
erp5_full_text_mroonga_catalog erp5_full_text_mroonga_catalog
erp5_wendelin_data
erp5_wendelin_examples erp5_wendelin_examples
\ No newline at end of file
...@@ -28,7 +28,6 @@ bt5_installation_list = ('erp5_full_text_mroonga_catalog', ...@@ -28,7 +28,6 @@ bt5_installation_list = ('erp5_full_text_mroonga_catalog',
'erp5_web_renderjs_ui', 'erp5_web_renderjs_ui',
'erp5_wendelin', 'erp5_wendelin',
'erp5_wendelin_examples', 'erp5_wendelin_examples',
'erp5_wendelin_data',
'erp5_wendelin_development', 'erp5_wendelin_development',
'erp5_notebook' 'erp5_notebook'
) )
......
[OBSOLETE - please do not install, kept for historical reasons only!]
Sample Wendelin data model. Sample Wendelin data model.
\ No newline at end of file
erp5_wendelin_data_lake_ingestion
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment