Commit ccb10ebd authored by Roque Porchetto

New log for ingestion process and minor refactoring.

parent 6aedf14f
from Products.ERP5Type.Log import log
import base64
log("Data_chunk size: " + str(len(data_chunk)))
log("Data_chunk size: %s" % str(len(data_chunk)))
decoded = base64.b64decode(data_chunk)
log("Decoded data_chunk size: " + str(len(decoded)))
log("Decoded data_chunk size: %s" % str(len(decoded)))
log("FROM SCRIPT %s: appending to data stream: %s." % (script.getId(), data_stream))
log("Appending to data stream: %s." % data_stream)
data_stream.appendData(decoded)
log("FROM SCRIPT %s: ingested data successfully appended." % (script.getId()))
log("Ingested data successfully appended.")
context.logEntry("Datachunk (size %s) appended to Data Stream." % str(len(decoded)))
from Products.ERP5Type.Log import log
log("Processing raw data from Data Stream " + str(input_stream_data.getReference()) + " to Data Array " + str(output_array.getReference()))
context.logEntry("Processing raw data from Data Stream to Data Array for Data Ingestion %s" % str(output_array.getReference()))
result = str(context.processRawData(input_stream_data, output_array, output_descriptor))
log(result)
context.logEntry("Result: %s" % result)
log("Metadata stored in Data Descriptor " + str(output_descriptor))
from Products.ERP5Type.Log import log
context.logEntry("[NEW INGESTION]")
context.logEntry("Reference: %s" % reference)
record = reference.rsplit('.')
filename = record[2] if (len(record) >= 5) else reference
extension = record[3] if (len(record) >= 5) else "fif"
@@ -15,6 +18,7 @@ dict = { 'filename': filename,
'resource_reference': 'fif'
}
log("From %s: returning dictionary: %s." % (script.getId(), dict))
log("Returning dictionary: %s." % dict)
context.logEntry("Parameter dictionary: %s" % dict)
return dict
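
The five-token reference layout, with the filename at index 2 and the extension at index 3, is inferred from the indices used above rather than documented; a standalone sketch of the same parsing, using a hypothetical reference value:

def parse_reference(reference):
    # e.g. "supplier.dataset.sample.fif.0001" (layout inferred, not documented);
    # rsplit('.') with no maxsplit behaves the same as split('.')
    record = reference.split('.')
    filename = record[2] if len(record) >= 5 else reference
    extension = record[3] if len(record) >= 5 else "fif"
    return {'filename': filename,
            'extension': extension,
            'resource_reference': 'fif'}

print(parse_reference("supplier.dataset.sample.fif.0001"))
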
@@ -146,7 +146,7 @@
</item>
<item>
<key> <string>serial</string> </key>
<value> <string>958.2575.23994.8004</string> </value>
<value> <string>960.42903.37994.56780</string> </value>
</item>
<item>
<key> <string>state</string> </key>
@@ -164,7 +164,7 @@
</tuple>
<state>
<tuple>
<float>1489660169.81</float>
<float>1500296433.88</float>
<string>UTC</string>
</tuple>
</state>
......
from Products.ERP5Type.Log import log
log('Handle Analysis Alarm Fired!')
portal = context.getPortalObject()
portal.ERP5Site_stopIngestionList()
portal.ERP5Site_createDataAnalysisList()
......
@@ -32,6 +32,7 @@ for line_data_ingestion in portal_catalog(**query_dict):
destination = data_ingestion.getDestination(),
destination_section = data_ingestion.getDestinationSection(),
destination_project = data_ingestion.getDestinationProject())
context.logEntry("Data Analyisis created for Data Ingestion %s (ID: %s)" % (str(data_ingestion.getReference()), data_analysis.getId()))
# create input and output lines
log("creating input and output lines")
@@ -63,6 +64,7 @@ for line_data_ingestion in portal_catalog(**query_dict):
aggregate_set.update(related_line.getAggregateSet())
related_line.getParentValue().deliver()
log("DATA INGESTION DELIVERED")
context.logEntry("Data Ingestion '%s' delivered." % data_ingestion.getId())
else:
# it is an output line
# create new item based on item_type: data array, stream, descriptor, etc.
@@ -76,6 +78,7 @@ for line_data_ingestion in portal_catalog(**query_dict):
if "Data Descriptor" not in item_type:
item.validate()
log("Creating " + str(item_type))
context.logEntry(str(item_type) + " created.")
aggregate_set = set()
aggregate_set.add(item)
......
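
Stripped of the portal API, the output-line rule above is: create one item per configured type and validate everything except Data Descriptors. A self-contained sketch with a stand-in Item class (the type list is illustrative):

class Item(object):
    def __init__(self, item_type):
        self.item_type = item_type
        self.validated = False
    def validate(self):
        self.validated = True

def create_output_items(item_type_list):
    aggregate_set = set()
    for item_type in item_type_list:
        item = Item(item_type)
        # Data Descriptors are the only items left unvalidated at creation
        if "Data Descriptor" not in item_type:
            item.validate()
        aggregate_set.add(item)
    return aggregate_set

for item in create_output_items(["Data Array", "Data Stream", "Data Descriptor"]):
    print("%s validated=%s" % (item.item_type, item.validated))
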
@@ -15,7 +15,8 @@ kw_dict = {"query": start_date_query,
parent_uid_list = [x.getUid() for x in portal_catalog(**kw_dict)]
if len(parent_uid_list) != 0:
log("Stoping %s ingestions..." %(str(len(parent_uid_list))))
log("Stopping %s ingestions..." %(str(len(parent_uid_list))))
context.logEntry("[Called by alarm] Stopping %s ingestions" %(str(len(parent_uid_list))))
kw_dict = {"portal_type": "Data Ingestion Line",
"resource_portal_type": "Data Product",
"parent_uid": parent_uid_list}
@@ -24,44 +25,6 @@ if len(parent_uid_list) != 0:
for data_ingestion_line in portal_catalog(**kw_dict):
data_ingestion = data_ingestion_line.getParentValue()
# make sure that each movement has aggregate batch
# this relies on reference. It cannot be done on
# ingestion because of the possibility of parallel ingestion
#
# should probably be done by a consistency constraint;
# it can probably be done in a generic way using a required item type
# on the resource.
"""
if data_ingestion_line.DataIngestionLine_needRequiredItem():
raise ValueError("DataIngestionLine_needRequiredItem: " + "TRUE")
batch = data_ingestion_line.getAggregateDataIngestionBatchValue()
if data_ingestion_line.DataIngestionLine_hasMissingRequiredItem():
raise ValueError("DataIngestionLine_hasMissingRequiredItem: " + "TRUE")
# make sure that each movement has aggregate batch
# this relies on reference. It cannot be done on
# ingestion because of the possibility of parallel ingestion
# TODO: make it generic, use Constraint
if batch is None:
raise ValueError("batch is None: " + "TRUE")
reference_tuple = data_ingestion.getReference().split('.')
data_ingestion_batch_reference = '.'.join(reference_tuple[:-1])
batch = portal_catalog.getResultValue(
portal_type = "Data Ingestion Batch",
reference = data_ingestion_batch_reference)
if batch is not None:
data_ingestion_line.setDefaultAggregateValue(batch)
else:
# we need to wait for an existing batch before we can stop the data ingestion
continue
# -> only stop when stop date is older than current date
# -> set stop date using information from the data operation script
if len(batch.getAggregateRelatedList()) < 2:
# we need to wait until there are 2 batches before we can stop it
# TODO: this should be implemented in transformation, not here
continue
"""
resource = data_ingestion_line.getResourceValue()
log("data_ingestion: " + str(data_ingestion.getReference()))
log("resource: " + str(resource.getReference()))
@@ -69,5 +32,6 @@ if len(parent_uid_list) != 0:
data_ingestion.setStopDate(DateTime())
data_ingestion.stop()
log("DATA INGESTION STOPPED")
context.logEntry("Data Ingestion '%s' stopped." % data_ingestion.getId())
else:
raise ValueError("resource.getUseList must be 'big_data/ingestion/batch_ingestion'")
@@ -9,6 +9,7 @@ portal_catalog = portal.portal_catalog
# removing eof from reference
reference = '_'.join(reference.split('.')[:-1])
log("Reference: " + reference)
context.logEntry("Data Ingestion reference: %s" % reference)
data_ingestion_reference = reference
data_ingestion_id = '%s_%s' %(data_ingestion_reference, today_string)
@@ -17,21 +18,19 @@ resource_reference = movement_dict.get('resource_reference', None)
specialise_reference = movement_dict.get('specialise_reference', None)
dataset_reference = movement_dict.get('aggregate_data_set_reference', None)
# first search for applicable data ingestion
# search for applicable data ingestion
data_ingestion = portal_catalog.getResultValue(
portal_type = 'Data Ingestion',
simulation_state = 'planned',
reference = data_ingestion_reference)
log("Planned Data Ingestion found for reference: " + str(data_ingestion))
if data_ingestion is None:
document = portal.data_ingestion_module.get(data_ingestion_id)
log("Data Ingestion by id-date found: " + str(document))
if (document is not None):
if document.getSimulationState() == 'planned':
data_ingestion = document
else:
context.logEntry("An older ingestion for reference %s was already done." % data_ingestion_reference)
raise ValueError("An older ingestion for reference %s was already done." % data_ingestion_reference)
if data_ingestion is None:
@@ -40,10 +39,9 @@ if data_ingestion is None:
reference = 'embulk',
validation_state = 'validated')]
log("Data Suppliers list: " + str(specialise_value_list))
# create a new data ingestion
log("Creating a new Data Ingestion")
log("Creating new Data Ingestion. ID: %s" % data_ingestion_id)
context.logEntry("Data Ingestion created. ID: %s" % data_ingestion_id)
data_ingestion = portal.data_ingestion_module.newContent(
id = data_ingestion_id,
portal_type = "Data Ingestion",
@@ -87,28 +85,25 @@ if data_ingestion is None:
current_line.setQuantity(0)
# copy device and configuration from operation line to input line
#log("copy device and configuration from operation line to input line")
input_line.setAggregateSet(
input_line.getAggregateList() + operation_line.getAggregateList())
#log("input_line.getAggregateList(): " + str(input_line.getAggregateList()))
#log("operation_line.getAggregateList(): " + str(operation_line.getAggregateList()))
data_stream = portal.data_stream_module.newContent(
portal_type = "Data Stream",
reference = data_ingestion_reference)
data_stream.validate()
log("Creating a new Data Stream")
log("Creating new Data Stream. ID: %s" % data_stream.getId())
context.logEntry("Data Stream created. ID: %s" % data_stream.getId())
input_line.setDefaultAggregateValue(data_stream)
data_ingestion.plan()
log("DATA INGESTION PLANNED.")
if dataset_reference is not None:
data_set = portal_catalog.getResultValue(
portal_type = "Data Set",
reference = dataset_reference)
if data_set is None:
log("Creating a new Data Set")
log("Creating new Data Set")
context.logEntry("Data Set created.")
data_set = portal.data_set_module.newContent(
portal_type = "Data Set",
title = "Data set " + dataset_reference,
@@ -121,7 +116,13 @@ if data_ingestion is None:
log("Data Set found for dataset reference: " + data_set.getReference())
input_line.setDefaultAggregateValue(data_set)
data_ingestion.plan()
log("Data Ingestion planned.")
context.logEntry("Data Ingestion planned.")
else:
log("Planned Data Ingestion found for reference: " + str(data_ingestion))
context.logEntry("Planned Data Ingestion found for reference: " + str(data_ingestion))
# find ingestion line for current resource
for line in data_ingestion.objectValues(portal_type="Data Ingestion Line"):
if line.getResourceReference() == resource_reference:
@@ -134,6 +135,7 @@ data_stream = input_line.getAggregateDataStreamValue()
if eof == "EOF":
data_ingestion.start()
log("DATA INGESTION STARTED")
log("Data Ingestion started.")
context.logEntry("Data Ingestion started.")
return data_operation, {'data_stream': data_stream}
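
The lookup order implemented above is: a planned Data Ingestion found by reference, then a date-suffixed id created earlier the same day, else a newly created document. Schematically, with get_planned, get_by_id and create_new standing in for the catalog and module calls:

def find_or_create_ingestion(reference, today_string,
                             get_planned, get_by_id, create_new):
    # 1. a Data Ingestion already planned under this reference
    data_ingestion = get_planned(reference)
    if data_ingestion is None:
        # 2. an ingestion created earlier under a date-suffixed id
        document = get_by_id("%s_%s" % (reference, today_string))
        if document is not None:
            if document["state"] != "planned":
                raise ValueError("An older ingestion for reference %s was already done." % reference)
            data_ingestion = document
    if data_ingestion is None:
        # 3. fall back to creating (and later planning) a fresh Data Ingestion
        data_ingestion = create_new(reference)
    return data_ingestion

print(find_or_create_ingestion("ref", "20170717",
                               lambda r: None,
                               lambda i: None,
                               lambda r: {"state": "planned"}))
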
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
#data_set = portal_catalog.getResultValue(
# portal_type = "Data Set",
# reference = data_set_reference)
#if data_set is None:
# return []
################################ workaround to force security access ################################
check = context.restrictedTraverse("data_set_module/")
@@ -34,7 +28,6 @@ data_stream_list = []
for line in portal_catalog(**query_dict):
if belongsToDatasets(line):
#if line.getAggregateDeviceConfiguration() == "data_set_module/" + str(data_set.getId()):
for item in line.getAggregateSet():
if "data_stream_module/" in item:
data_stream = context.restrictedTraverse(item)
......
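
The collection loop keeps, for every catalog line accepted by belongsToDatasets, each aggregated item whose path lives under data_stream_module; as plain Python over stand-in dicts:

def collect_data_streams(lines, belongs_to_datasets):
    data_stream_list = []
    for line in lines:
        if belongs_to_datasets(line):
            for item in line["aggregate_set"]:
                # aggregate references look like "data_stream_module/<id>"
                if "data_stream_module/" in item:
                    data_stream_list.append(item)
    return data_stream_list

lines = [{"aggregate_set": ["data_stream_module/ds-1", "data_set_module/set-1"]}]
print(collect_data_streams(lines, lambda line: True))
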
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>logEntry</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>ingestion_log</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>logEntry</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
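
The record above wires the logEntry External Method to a logEntry function in an ingestion_log Python module. That module is not part of this diff; a minimal implementation consistent with how logEntry is called from the scripts (a single message string) might look like this, with the file path and timestamp format as assumptions:

# ingestion_log.py -- hypothetical backing module for the External Method
import datetime

LOG_PATH = "/var/log/ingestion.log"  # assumed location

def logEntry(message):
    # timestamp each entry so ingestion steps can be ordered when debugging
    stamp = datetime.datetime.now().isoformat()
    with open(LOG_PATH, "a") as f:
        f.write("%s %s\n" % (stamp, message))
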