Commit dd6bfa1d authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: refactoring in data ingestion reference

parent 89478a9c
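
Below, for orientation, is a minimal standalone sketch (not part of the commit) of how the ingestion reference is derived from the incoming reference string before and after this change; the sample values come from the script comments in the diff, and the helper names are illustrative only.

# Sketch only: the reference handling this commit moves away from and towards.
def old_ingestion_reference(reference):
  # before: only the trailing EOF marker is dropped
  # 'telecom.MNE.filename.fif.EOF' -> 'telecom_MNE_filename_fif'
  return '_'.join(reference.split('.')[:-1])

def new_ingestion_reference(reference):
  # after: the leading supplier token and the trailing EOF marker are dropped
  # 'supplier.dataset.filename.fif.EOF' -> 'dataset_filename_fif'
  return '_'.join(reference.split('.')[1:-1])

assert old_ingestion_reference('telecom.MNE.filename.fif.EOF') == 'telecom_MNE_filename_fif'
assert new_ingestion_reference('supplier.dataset.filename.fif.EOF') == 'dataset_filename_fif'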
@@ -6,136 +6,137 @@ today_string = now.strftime('%Y%m%d')
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
# removing eof from reference
reference = '_'.join(reference.split('.')[:-1])
log("Reference: " + reference)
context.logEntry("Data Ingestion reference: %s" % reference)
data_ingestion_reference = reference
data_ingestion_id = '%s_%s' %(data_ingestion_reference, today_string)
eof = movement_dict.get('eof', 'EOF')
resource_reference = movement_dict.get('resource_reference', None)
specialise_reference = movement_dict.get('specialise_reference', None)
dataset_reference = movement_dict.get('aggregate_data_set_reference', None)
# search for applicable data ingestion
data_ingestion = portal_catalog.getResultValue(
  portal_type = 'Data Ingestion',
  simulation_state = 'planned',
  reference = data_ingestion_reference)
if data_ingestion is None:
  document = portal.data_ingestion_module.get(data_ingestion_id)
  if (document is not None):
    if document.getSimulationState() == 'planned':
      data_ingestion = document
    else:
try:
  # remove supplier and eof from reference
  reference = '_'.join(reference.split('.')[1:-1]) #'_'.join(reference.split('.')[:-1])
  log("Reference: " + reference)
  context.logEntry("Data Ingestion reference: %s" % reference)
  data_ingestion_reference = reference
  eof = movement_dict.get('eof', 'EOF')
  resource_reference = movement_dict.get('resource_reference', None)
  specialise_reference = movement_dict.get('specialise_reference', None)
  dataset_reference = movement_dict.get('aggregate_data_set_reference', None)
  data_ingestion_id = '%s_%s_%s' %(specialise_reference, data_ingestion_reference, today_string)
  context.logEntry("Data Ingestion ID: %s" % data_ingestion_id)
  # search for applicable data ingestion
  data_ingestion = portal_catalog.getResultValue(
    portal_type = 'Data Ingestion',
    reference = data_ingestion_reference)
  if data_ingestion is not None:
    if data_ingestion.getSimulationState() != 'planned':
      context.logEntry("An older ingestion for reference %s was already done." % data_ingestion_reference)
      raise ValueError("An older ingestion for reference %s was already done." % data_ingestion_reference)
if data_ingestion is None:
  specialise_value_list = [x.getObject() for x in portal_catalog.searchResults(
    portal_type = 'Data Supply',
    reference = 'embulk',
    validation_state = 'validated')]
  # create a new data ingestion
  log("Creating new Data Ingestion. ID: %s" % data_ingestion_id)
  context.logEntry("Data Ingestion created. ID: %s" % data_ingestion_id)
  data_ingestion = portal.data_ingestion_module.newContent(
    id = data_ingestion_id,
    portal_type = "Data Ingestion",
    title = movement_dict.get('filename', data_ingestion_reference),
    reference = data_ingestion_reference,
    start_date = now,
    specialise_value_list = specialise_value_list)
  property_list = ["title",
    "source",
    "source_section",
    "source_project",
    "destination",
    "destination_section",
    "destination_project",
    "specialise"]
  composed = data_ingestion.asComposedDocument()
  data_ingestion.edit(**{p: composed.getProperty(p) for p in property_list})
  # create ingestion lines from specialise lines and assign input line
  # and operation line
  input_line = None
  operation_line = None
  for supply_line in composed.objectValues(portal_type = 'Data Supply Line'):
    current_line = data_ingestion.newContent(
      portal_type = "Data Ingestion Line",
      title = supply_line.getTitle(),
      aggregate = supply_line.getAggregateList(),
      int_index = supply_line.getIntIndex(),
      quantity = supply_line.getQuantity(),
      reference = supply_line.getReference(),
      resource = supply_line.getResource(),
      )
    if current_line.getResourceReference() == resource_reference:
      input_line = current_line
    elif current_line.getResourceValue().getPortalType() == "Data Operation":
      operation_line = current_line
    else:
      # we set quantity=0 for the empty line
      current_line.setQuantity(0)
  # copy device and configuration from operation line to input line
  input_line.setAggregateSet(
    input_line.getAggregateList() + operation_line.getAggregateList())
  data_stream = portal.data_stream_module.newContent(
    portal_type = "Data Stream",
    reference = data_ingestion_reference)
  data_stream.validate()
  log("Creating new Data Stream. ID: %s" % data_stream.getId())
  context.logEntry("Data Stream created. ID: %s" % data_stream.getId())
  input_line.setDefaultAggregateValue(data_stream)
  if dataset_reference is not None:
    data_set = portal_catalog.getResultValue(
      portal_type = "Data Set",
      reference = dataset_reference)
    if data_set is None:
      log("Creating new Data Set")
      context.logEntry("Data Set created.")
      data_set = portal.data_set_module.newContent(
        portal_type = "Data Set",
        title = "Data set " + dataset_reference,
        reference = dataset_reference,
        description = "Default description.",
        version = "001"
  if data_ingestion is None:
    specialise_value_list = [x.getObject() for x in portal_catalog.searchResults(
      portal_type = 'Data Supply',
      reference = 'embulk',
      validation_state = 'validated')]
    # create a new data ingestion
    log("Creating new Data Ingestion. ID: %s" % data_ingestion_id)
    context.logEntry("Data Ingestion created. ID: %s" % data_ingestion_id)
    data_ingestion = portal.data_ingestion_module.newContent(
      id = data_ingestion_id,
      portal_type = "Data Ingestion",
      title = movement_dict.get('filename', data_ingestion_reference),
      reference = data_ingestion_reference,
      start_date = now,
      specialise_value_list = specialise_value_list)
    property_list = ["title",
      "source",
      "source_section",
      "source_project",
      "destination",
      "destination_section",
      "destination_project",
      "specialise"]
    composed = data_ingestion.asComposedDocument()
    data_ingestion.edit(**{p: composed.getProperty(p) for p in property_list})
    # create ingestion lines from specialise lines and assign input line
    # and operation line
    input_line = None
    operation_line = None
    for supply_line in composed.objectValues(portal_type = 'Data Supply Line'):
      current_line = data_ingestion.newContent(
        portal_type = "Data Ingestion Line",
        title = supply_line.getTitle(),
        aggregate = supply_line.getAggregateList(),
        int_index = supply_line.getIntIndex(),
        quantity = supply_line.getQuantity(),
        reference = supply_line.getReference(),
        resource = supply_line.getResource(),
        )
      data_set.validate()
    else:
      log("Data Set found for dataset reference: " + data_set.getReference())
    input_line.setDefaultAggregateValue(data_set)
  data_ingestion.plan()
  log("Data Ingestion planned.")
  context.logEntry("Data Ingestion planned.")
else:
  log("Planned Data Ingestion found for reference: " + str(data_ingestion))
  context.logEntry("Planned Data Ingestion found for reference: " + str(data_ingestion))
  # find ingestion line for current resource
  for line in data_ingestion.objectValues(portal_type="Data Ingestion Line"):
    if line.getResourceReference() == resource_reference:
      input_line = line
    elif line.getResourceValue().getPortalType() == "Data Operation":
      operation_line = line
data_operation = operation_line.getResourceValue()
data_stream = input_line.getAggregateDataStreamValue()
if eof == "EOF":
  data_ingestion.start()
  log("Data Ingestion started.")
  context.logEntry("Data Ingestion started.")
return data_operation, {'data_stream': data_stream}
      if current_line.getResourceReference() == resource_reference:
        input_line = current_line
      elif current_line.getResourceValue().getPortalType() == "Data Operation":
        operation_line = current_line
      else:
        # we set quantity=0 for the empty line
        current_line.setQuantity(0)
    # copy device and configuration from operation line to input line
    input_line.setAggregateSet(
      input_line.getAggregateList() + operation_line.getAggregateList())
    data_stream = portal.data_stream_module.newContent(
      portal_type = "Data Stream",
      reference = data_ingestion_reference)
    data_stream.validate()
    log("Creating new Data Stream. ID: %s" % data_stream.getId())
    context.logEntry("Data Stream created. ID: %s" % data_stream.getId())
    input_line.setDefaultAggregateValue(data_stream)
    if dataset_reference is not None:
      data_set = portal_catalog.getResultValue(
        portal_type = "Data Set",
        reference = dataset_reference)
      if data_set is None:
        log("Creating new Data Set")
        context.logEntry("Data Set created.")
        data_set = portal.data_set_module.newContent(
          portal_type = "Data Set",
          title = "Data set " + dataset_reference,
          reference = dataset_reference,
          description = "Default description.",
          version = "001"
          )
        data_set.validate()
      else:
        log("Data Set found for dataset reference: " + data_set.getReference())
      input_line.setDefaultAggregateValue(data_set)
    data_ingestion.plan()
    log("Data Ingestion planned.")
    context.logEntry("Data Ingestion planned.")
  else:
    log("Planned Data Ingestion found for reference: " + str(data_ingestion))
    context.logEntry("Planned Data Ingestion found for reference: " + str(data_ingestion))
    # find ingestion line for current resource
    for line in data_ingestion.objectValues(portal_type="Data Ingestion Line"):
      if line.getResourceReference() == resource_reference:
        input_line = line
      elif line.getResourceValue().getPortalType() == "Data Operation":
        operation_line = line
  data_operation = operation_line.getResourceValue()
  data_stream = input_line.getAggregateDataStreamValue()
  if eof == "EOF":
    data_ingestion.start()
    log("Data Ingestion started.")
    context.logEntry("Data Ingestion started.")
  return data_operation, {'data_stream': data_stream}
except Exception as e:
  context.logEntry("[ERROR] At script IngestionPolicy_getIngestionOperationAndParameterDict \n[ERROR] " + str(e))
  raise e
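# --- second script changed by this commit: ingestionReferenceExists (named in its error log below) ---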
from DateTime import DateTime
from Products.ERP5Type.Log import log
# reference parameter example: telecom.MNE.filename.fif.EOF
# ingestion reference example: telecom_MNE_filename_fif
# ingestion id example: telecom_MNE_filename_fif_20170615
# reference parameter example: supplier.dataset.filename.fif.EOF
# ingestion reference example: dataset_filename_fif
FALSE = "FALSE"
TRUE = "TRUE"
today_string = DateTime().strftime('%Y%m%d')
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
data_ingestion_reference = '_'.join(reference.split('_')[:-1]) #'_'.join(reference.split('.')[:-1])
data_ingestion_id = '%s_%s' %(data_ingestion_reference, today_string)
# first search for applicable data ingestion
data_ingestion = portal_catalog.getResultValue(
  portal_type = 'Data Ingestion',
  simulation_state = 'planned',
  reference = data_ingestion_reference)
if data_ingestion is None:
  document = portal.data_ingestion_module.get(data_ingestion_id)
  if (document is not None):
    if document.getSimulationState() == 'planned':
      return FALSE
    else:
      return TRUE
  else:
try:
  context.logEntry("Attempt to do a new ingestion with reference: %s" % reference)
  # remove supplier and eof from reference
  data_ingestion_reference = '_'.join(reference.split('_')[1:-1])
  data_ingestion = portal_catalog.getResultValue(
    portal_type = 'Data Ingestion',
    reference = data_ingestion_reference)
  if data_ingestion is None:
    return FALSE
  else:
    if data_ingestion.getSimulationState() == 'planned':
      return FALSE
    context.logEntry("[ERROR] Data Ingestion reference %s already exists" % data_ingestion_reference)
    return TRUE
except Exception as e:
  context.logEntry("[ERROR] At script ingestionReferenceExists \n[ERROR] " + str(e))
  raise e