Commit 0f36ef03 authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: process raw data external method

- the external method now works in small chunks: reading the Data Stream content, writing the raw data file, reading the raw data arrays, and saving the raw array
- framework to dispatch processing by file extension, so handlers for more extensions can be added
parent 17ff606c
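The second bullet is the structural change: processing is now dispatched through a table of per-extension handler functions, so supporting a new format only means adding a function and a dict entry. A minimal standalone sketch of that pattern (handler names mirror the diff below; the print calls are stand-ins for real processing):

def processFifData(file_name, data_array, data_descriptor):
  print("would parse %s as FIF raw data" % file_name)

def processCsvData(file_name, data_array, data_descriptor):
  print("csv processing not implemented yet")

options = {"fif": processFifData,
           "csv": processCsvData,
          }

def process(file_name, extension, data_array=None, data_descriptor=None):
  try:
    options[extension](file_name, data_array, data_descriptor)
  except KeyError:
    print("Processing for %s data not implemented yet." % extension)

process("temporal_file_20171215-101500", "fif")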
@@ -3,21 +3,47 @@ import mne
 import json
 from DateTime import DateTime
 from mne.report import Report
-#from Products.ERP5Type.Log import log
+from Products.ERP5Type.Log import log
+
+CHUNK_SIZE = 200000
+
+def saveRawFile(data_stream, file_name):
+  # stream the Data Stream content to disk in CHUNK_SIZE slices,
+  # so the whole stream never has to be held in memory at once
+  log("Writing raw data file...")
+  data_stream_chunk = None
+  n_chunk = 0
+  chunk_size = CHUNK_SIZE
+  while True:
+    start_offset = n_chunk*chunk_size
+    end_offset = n_chunk*chunk_size+chunk_size
+    try:
+      data_stream_chunk = ''.join(data_stream.readChunkList(start_offset, end_offset))
+    except:
+      raise StandardError("Empty Data Stream")
+    if data_stream_chunk == "": break
+    with open(file_name, 'ab') as fif_file:
+      fif_file.write(data_stream_chunk)
+    n_chunk += 1
+  log("Done.")
+
 def getHTMLReport(file_name, raw):
   try:
     pattern = file_name + "_raw.fif"
     report_file = file_name + 'report.html'
     # MNE only recognizes raw files that follow its "*_raw.fif" naming
     # convention, so the file is temporarily renamed for the report
     os.rename(file_name, pattern)
     report = Report(verbose=True)
     report.parse_folder(data_path="./", pattern=[pattern])
     report.save(report_file, overwrite=True, open_browser=False)
     with open(report_file, 'r') as report:
       content = report.read()
     os.remove(report_file)
     return content
   except Exception as e:
-    return str(e)
+    log("Error while getting HTML Report: " + str(e))
+    return ""
+  finally:
+    # restore the original file name and drop the temporary report
+    if os.path.exists(pattern):
+      os.rename(pattern, file_name)
+    if os.path.exists(report_file):
+      os.remove(report_file)
 
 def getJSONMetadata(raw_info):
   info = { 'filename': 'FILENAME', #raw_info['filename'],
@@ -27,38 +53,85 @@ def getJSONMetadata(raw_info):
   }
   return json.dumps(info)
 
-def getRawData(file_name, data_stream_content):
-  with open(file_name, 'wb') as fif_file:
-    fif_file.write(data_stream_content)
+def getRawData(file_name):
   raw = None
   try:
-    raw = mne.io.read_raw_fif(file_name, preload=True)
+    raw = mne.io.read_raw_fif(file_name, preload=False, verbose=None)
   except:
     pass
   if raw is None:
     try:
-      raw = mne.io.read_raw_edf(file_name, preload=True)
+      raw = mne.io.read_raw_edf(file_name, preload=False, verbose=None)
     except:
       pass
   if raw is None: raise StandardError("the file does not contain raw data.")
   return raw
 
-def processRawData(data_stream, data_array, data_descriptor):
-  file_name = "temporal_file_%s" % DateTime().strftime('%Y%m%d-%H%M%S')
+def processFifData(file_name, data_array, data_descriptor):
+  raw = getRawData(file_name)
   try:
-    data_stream_content = data_stream.getData()
-    raw_file = file_name + "_raw.fif"
-    raw = getRawData(raw_file, data_stream_content)
-    if raw is None: raise StandardError("the file does not contain raw data.")
     json_metadata = getJSONMetadata(raw.info)
+    html_report = getHTMLReport(file_name, raw)
     data_descriptor.setTextContent(json_metadata)
-    #log("Data Descriptor metadata: " + str(data_descriptor.getTextContent()))
-    data_array.setArray(raw._data)
-    #log("Data Array array: " + str(data_array.getArray()))
-    os.remove(raw_file)
-    return "Raw data processed."
-  except Exception, e:
-    if os.path.exists(raw_file):
-      os.remove(raw_file)
+    data_descriptor.setTextContent(html_report)
+    log("Data Descriptor content saved")
+  except Exception as e:
+    log("Error handling Data Descriptor content: " + str(e))
+
+  log("Saving raw data in Data Array...")
+  # store the channels one pick at a time instead of dumping raw._data,
+  # so the whole recording never has to sit in memory at once
+  picks = mne.pick_types(raw.info)
+  if len(picks) == 0: raise StandardError("The raw data does not contain any element")
+  data, times = raw[picks[:1]] # get data from first pick to get shape
+  dtype = data.dtype
+  data_array.initArray(data.shape, dtype)
+  zarray = data_array.getArray()
+  zarray[0] = data[0]
+  data_array.setArray(zarray)
+  for pick in xrange(1, len(picks)):
+    data, times = raw[pick]
+    zarray = data_array.getArray()
+    zarray.append(data)
+    data_array.setArray(zarray)
+
+def processTxtData(file_name, data_array, data_descriptor):
+  log("Processing for txt data not implemented yet.")
+
+def processNoneData(file_name, data_array, data_descriptor):
+  log("Processing for non-extension data not implemented yet.")
+
+def processCsvData(file_name, data_array, data_descriptor):
+  log("Processing for csv data not implemented yet.")
+
+def processRawData(data_stream, data_array, data_descriptor, reference_extension):
+  import time
+  start = time.time()
+  file_name = "temporal_file_%s" % DateTime().strftime('%Y%m%d-%H%M%S')
+  try:
+    saveRawFile(data_stream, file_name)
+  except Exception as e:
+    if os.path.exists(file_name):
+      os.remove(file_name)
+    return "Error while processing raw data - saving file: " + str(e)
+  # dispatch to the handler matching the reference extension
+  options = {"fif" : processFifData,
+             "txt" : processTxtData,
+             "csv" : processCsvData,
+             "none" : processNoneData,
+            }
+  try:
+    log("Processing data...")
+    options[reference_extension](file_name, data_array, data_descriptor)
+  except KeyError:
+    return "Processing for %s data not implemented yet." % reference_extension
+  except Exception as e:
+    return "Error while processing raw data: " + str(e)
+  finally:
+    # the temporary file is removed whether processing succeeded or not
+    if os.path.exists(file_name):
+      os.remove(file_name)
+  elapsed = time.time() - start
+  log("Done. Time elapsed: " + str(elapsed))
+  return "Raw data processed in %s seconds." % str(elapsed)
\ No newline at end of file
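A side note on the Data Array fill in processFifData: the array is seeded with the first pick, then grown one pick at a time. A plain-numpy sketch of that pattern (assumption: the ZBigArray returned by data_array.getArray() grows along its first axis via append(), which np.vstack stands in for here):

import numpy as np

n_picks, n_times = 4, 1000
raw_like = np.random.rand(n_picks, n_times)  # stand-in for raw[pick] data

zarray = np.empty((1, n_times), dtype=raw_like.dtype)  # like initArray(data.shape, dtype)
zarray[0] = raw_like[0]  # first pick seeds shape and dtype
for pick in range(1, n_picks):
  zarray = np.vstack([zarray, raw_like[pick:pick + 1]])  # stand-in for zarray.append(data)

assert zarray.shape == (n_picks, n_times)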
@@ -52,8 +52,9 @@
     <key> <string>text_content_warning_message</string> </key>
     <value>
       <tuple>
-        <string>W: 36, 2: No exception type(s) specified (bare-except)</string>
-        <string>W: 41, 4: No exception type(s) specified (bare-except)</string>
+        <string>W: 60, 2: No exception type(s) specified (bare-except)</string>
+        <string>W: 65, 4: No exception type(s) specified (bare-except)</string>
+        <string>W: 83, 8: Unused variable \'times\' (unused-variable)</string>
       </tuple>
     </value>
   </item>
 from Products.ERP5Type.Log import log
-log("Processing raw data from Data Stream " + str(input_stream_data.getReference()) + " to Data Array " + str(output_array.getReference()))
-context.logEntry("Processing raw data from Data Stream to Data Array for Data Ingestion %s" % str(output_array.getReference()))
-result = str(context.processRawData(input_stream_data, output_array, output_descriptor))
+log("Processing raw data from Data Stream " + str(input_stream_data.getReference()))
+context.logEntry("Processing raw data from Data Stream %s" % str(input_stream_data.getReference()))
+reference_extension = input_stream_data.getReference().split("/")[-1]
+result = str(context.processRawData(input_stream_data, output_array, output_descriptor, reference_extension))
 log(result)
 context.logEntry("Result: %s" % result)
 log("Metadata stored in Data Descriptor " + str(output_descriptor))