Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
K
klaus_wendelin
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Eteri
klaus_wendelin
Commits
eed4fd46
Commit
eed4fd46
authored
May 04, 2020
by
Ivan Tyagov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add tests for publish / invalidate states.
parent
fd728553
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
46 additions
and
16 deletions
+46
-16
bt5/erp5_wendelin_data_lake_ingestion/TestTemplateItem/portal_components/test.erp5.testDataLakeIngestion.py
...Item/portal_components/test.erp5.testDataLakeIngestion.py
+46
-16
No files found.
bt5/erp5_wendelin_data_lake_ingestion/TestTemplateItem/portal_components/test.erp5.testDataLakeIngestion.py
View file @
eed4fd46
...
@@ -31,14 +31,19 @@ class TestDataIngestion(SecurityTestCase):
...
@@ -31,14 +31,19 @@ class TestDataIngestion(SecurityTestCase):
self
.
assertEqual
(
self
.
INVALID
,
self
.
portal
.
getIngestionReferenceDictionary
()[
"invalid_suffix"
])
self
.
assertEqual
(
self
.
INVALID
,
self
.
portal
.
getIngestionReferenceDictionary
()[
"invalid_suffix"
])
self
.
assertEqual
(
self
.
EOF
,
self
.
REFERENCE_SEPARATOR
+
self
.
portal
.
getIngestionReferenceDictionary
()[
"split_end_suffix"
])
self
.
assertEqual
(
self
.
EOF
,
self
.
REFERENCE_SEPARATOR
+
self
.
portal
.
getIngestionReferenceDictionary
()[
"split_end_suffix"
])
self
.
assertEqual
(
self
.
PART_1
,
self
.
REFERENCE_SEPARATOR
+
self
.
portal
.
getIngestionReferenceDictionary
()[
"split_first_suffix"
])
self
.
assertEqual
(
self
.
PART_1
,
self
.
REFERENCE_SEPARATOR
+
self
.
portal
.
getIngestionReferenceDictionary
()[
"split_first_suffix"
])
# XXX: create default users
def
getRandomReference
(
self
):
def
getRandomReference
(
self
):
random_string
=
''
.
join
([
random
.
choice
(
string
.
ascii_letters
+
string
.
digits
)
for
_
in
xrange
(
10
)])
random_string
=
''
.
join
([
random
.
choice
(
string
.
ascii_letters
+
string
.
digits
)
for
_
in
xrange
(
10
)])
return
'UNIT-TEST-'
+
random_string
return
'UNIT-TEST-'
+
random_string
def
getIngestionReference
(
self
,
reference
,
extension
):
def
getIngestionReference
(
self
,
reference
,
extension
,
randomize_ingestion_reference
=
False
):
if
not
randomize_ingestion_reference
:
# return hard coded which results in one Data Set and multiple Data Streams (in context of test)
return
self
.
REF_PREFIX
+
reference
+
extension
return
self
.
REF_PREFIX
+
reference
+
extension
else
:
# create random one
random_string
=
self
.
getRandomReference
()
return
"%s/%s/%s/csv//fake-size/fake-hash"
%
(
random_string
,
random_string
,
random_string
)
def
sanitizeReference
(
self
,
reference
):
def
sanitizeReference
(
self
,
reference
):
ingestion_reference
=
self
.
REFERENCE_SEPARATOR
.
join
(
reference
.
split
(
self
.
REFERENCE_SEPARATOR
)[
1
:])
ingestion_reference
=
self
.
REFERENCE_SEPARATOR
.
join
(
reference
.
split
(
self
.
REFERENCE_SEPARATOR
)[
1
:])
...
@@ -76,20 +81,20 @@ class TestDataIngestion(SecurityTestCase):
...
@@ -76,20 +81,20 @@ class TestDataIngestion(SecurityTestCase):
request
.
set
(
'data_chunk'
,
encoded_data_chunk
)
request
.
set
(
'data_chunk'
,
encoded_data_chunk
)
ingestion_policy
.
ingest
()
ingestion_policy
.
ingest
()
self
.
tic
()
self
.
tic
()
return
def
ingest
(
self
,
data_chunk
,
reference
,
extension
,
eof
):
def
ingest
(
self
,
data_chunk
,
reference
,
extension
,
eof
,
randomize_ingestion_reference
=
False
):
ingestion_reference
=
self
.
getIngestionReference
(
reference
,
extension
)
ingestion_reference
=
self
.
getIngestionReference
(
reference
,
extension
,
randomize_ingestion_reference
)
self
.
portal
.
log
(
ingestion_reference
)
# use default ebulk policy
# use default ebulk policy
ingestion_policy
=
self
.
portal
.
portal_ingestion_policies
.
wendelin_embulk
ingestion_policy
=
self
.
portal
.
portal_ingestion_policies
.
wendelin_embulk
self
.
ingestRequest
(
ingestion_reference
,
eof
,
data_chunk
,
ingestion_policy
)
self
.
ingestRequest
(
ingestion_reference
,
eof
,
data_chunk
,
ingestion_policy
)
ingestion_id
,
ingestion_reference
=
self
.
sanitizeReference
(
ingestion_reference
)
_
,
ingestion_reference
=
self
.
sanitizeReference
(
ingestion_reference
)
return
ingestion_reference
return
ingestion_reference
def
stepIngest
(
self
,
extension
,
delimiter
):
def
stepIngest
(
self
,
extension
,
delimiter
,
randomize_ingestion_reference
=
False
):
file_name
=
"file_name.csv"
file_name
=
"file_name.csv"
reference
=
self
.
getRandomReference
()
reference
=
self
.
getRandomReference
()
array
=
[[
random
.
random
()
for
i
in
range
(
self
.
CHUNK_SIZE_CSV
+
10
)]
for
j
in
range
(
self
.
CHUNK_SIZE_CSV
+
10
)]
array
=
[[
random
.
random
()
for
i
in
range
(
self
.
CHUNK_SIZE_CSV
+
10
)]
for
j
in
range
(
self
.
CHUNK_SIZE_CSV
+
10
)]
...
@@ -105,7 +110,7 @@ class TestDataIngestion(SecurityTestCase):
...
@@ -105,7 +110,7 @@ class TestDataIngestion(SecurityTestCase):
else
:
else
:
break
break
ingestion_reference
=
self
.
ingest
(
data_chunk
,
reference
,
extension
,
self
.
SINGLE_INGESTION_END
)
ingestion_reference
=
self
.
ingest
(
data_chunk
,
reference
,
extension
,
self
.
SINGLE_INGESTION_END
,
randomize_ingestion_reference
=
randomize_ingestion_reference
)
if
os
.
path
.
exists
(
file_name
):
if
os
.
path
.
exists
(
file_name
):
os
.
remove
(
file_name
)
os
.
remove
(
file_name
)
...
@@ -116,16 +121,18 @@ class TestDataIngestion(SecurityTestCase):
...
@@ -116,16 +121,18 @@ class TestDataIngestion(SecurityTestCase):
data_ingestion_line
=
[
x
for
x
in
data_ingestion
.
objectValues
()
\
data_ingestion_line
=
[
x
for
x
in
data_ingestion
.
objectValues
()
\
if
x
.
getReference
()
==
'out_stream'
][
0
]
if
x
.
getReference
()
==
'out_stream'
][
0
]
data_set
=
data_ingestion_line
.
getAggregateValue
(
portal_type
=
'Data Set'
)
data_stream
=
data_ingestion_line
.
getAggregateValue
(
portal_type
=
'Data Stream'
)
data_stream
=
data_ingestion_line
.
getAggregateValue
(
portal_type
=
'Data Stream'
)
self
.
assertNotEqual
(
None
,
data_stream
)
self
.
assertNotEqual
(
None
,
data_stream
)
data_stream_data
=
data_stream
.
getData
()
data_stream_data
=
data_stream
.
getData
()
self
.
assertEqual
(
data_chunk
,
data_stream_data
)
self
.
assertEqual
(
data_chunk
,
data_stream_data
)
# check Data Stream and Data Set
# check Data Stream and Data Set
are validated
self
.
assertEqual
(
'validated'
,
data_stream
.
getValidationState
())
self
.
assertEqual
(
'validated'
,
data_stream
.
getValidationState
())
return
data_set
,
[
data_stream
]
def
test_01_DefaultEbulkIngestion
(
self
):
def
test_01_DefaultEbulkIngestion
(
self
):
"""
"""
Test default ingestion with ebulk too.
Test default ingestion with ebulk too.
...
@@ -183,3 +190,26 @@ class TestDataIngestion(SecurityTestCase):
...
@@ -183,3 +190,26 @@ class TestDataIngestion(SecurityTestCase):
getattr
(
self
.
portal
.
portal_ingestion_policies
,
"wendelin_embulk"
,
None
))
getattr
(
self
.
portal
.
portal_ingestion_policies
,
"wendelin_embulk"
,
None
))
self
.
assertNotEqual
(
None
,
self
.
assertNotEqual
(
None
,
getattr
(
self
.
portal
.
data_supply_module
,
"embulk"
,
None
))
getattr
(
self
.
portal
.
data_supply_module
,
"embulk"
,
None
))
def
test_04_DefaultModelSecurityModel
(
self
):
"""
Test default security model : 'All can download, only contributors can upload.'
"""
data_set
,
data_stream_list
=
self
.
stepIngest
(
self
.
CSV
,
","
,
randomize_ingestion_reference
=
True
)
self
.
tic
()
# publish data set and have all Data Streams publsihed automatically
data_set
.
publish
()
self
.
tic
()
self
.
assertEqual
(
'published'
,
data_set
.
getValidationState
())
self
.
assertSameSet
([
'published'
for
x
in
data_stream_list
],
[
x
.
getValidationState
()
for
x
in
data_stream_list
])
# invalidate Data Set should invalidate related Data Streams
data_set
.
invalidate
()
self
.
tic
()
self
.
assertEqual
(
'invalidated'
,
data_set
.
getValidationState
())
self
.
assertSameSet
([
'invalidated'
for
x
in
data_stream_list
],
[
x
.
getValidationState
()
for
x
in
data_stream_list
])
# XXX: new test which simulates download / upload of Data Set and increase DS version
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment