Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
S
slapos.core
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Léo-Paul Géneau
slapos.core
Commits
c1158ead
Commit
c1158ead
authored
Mar 24, 2022
by
Rafael Monnerat
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
slapos_cloud: Add tests for SlapOSCacheMixin
parent
12efac17
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
313 additions
and
4 deletions
+313
-4
master/bt5/slapos_cloud/MixinTemplateItem/portal_components/mixin.erp5.SlapOSCacheMixin.py
...lateItem/portal_components/mixin.erp5.SlapOSCacheMixin.py
+3
-3
master/bt5/slapos_cloud/TestTemplateItem/portal_components/test.erp5.testSlapOSCloud.py
...mplateItem/portal_components/test.erp5.testSlapOSCloud.py
+310
-1
No files found.
master/bt5/slapos_cloud/MixinTemplateItem/portal_components/mixin.erp5.SlapOSCacheMixin.py
View file @
c1158ead
...
...
@@ -98,13 +98,13 @@ class SlapOSCacheMixin:
return
data_dict
def
setAccessStatus
(
self
,
text
,
state
=
""
,
reindex
=
0
):
self
.
_setAccessStatus
(
"%s %s"
%
(
ACCESS
,
text
),
state
,
reindex
)
return
self
.
_setAccessStatus
(
"%s %s"
%
(
ACCESS
,
text
),
state
,
reindex
)
def
setErrorStatus
(
self
,
text
,
state
=
""
,
reindex
=
0
):
self
.
_setAccessStatus
(
"%s %s"
%
(
ERROR
,
text
),
state
,
reindex
)
return
self
.
_setAccessStatus
(
"%s %s"
%
(
ERROR
,
text
),
state
,
reindex
)
def
setBuildingStatus
(
self
,
text
,
state
=
""
,
reindex
=
0
):
self
.
_setAccessStatus
(
"%s %s"
%
(
BUILDING
,
text
),
state
,
reindex
)
return
self
.
_setAccessStatus
(
"%s %s"
%
(
BUILDING
,
text
),
state
,
reindex
)
def
_setAccessStatus
(
self
,
text
,
state
=
""
,
reindex
=
0
):
user_reference
=
self
.
getPortalObject
().
portal_membership
.
getAuthenticatedMember
()
\
...
...
master/bt5/slapos_cloud/TestTemplateItem/portal_components/test.erp5.testSlapOSCloud.py
View file @
c1158ead
...
...
@@ -28,6 +28,314 @@
from
erp5.component.test.SlapOSTestCaseMixin
import
SlapOSTestCaseMixin
from
DateTime
import
DateTime
from
App.Common
import
rfc1123_date
import
json
class
TestSlapOSCloudSlapOSCacheMixin
(
SlapOSTestCaseMixin
):
def
afterSetUp
(
self
):
SlapOSTestCaseMixin
.
afterSetUp
(
self
)
self
.
pinDateTime
(
DateTime
())
self
.
_makeComputeNode
()
self
.
_makeComplexComputeNode
(
with_slave
=
True
)
self
.
tic
()
def
beforeTearDown
(
self
):
self
.
unpinDateTime
()
self
.
_cleaupREQUEST
()
def
test_LastData
(
self
):
value
=
"XXX"
self
.
assertEqual
(
None
,
self
.
compute_node
.
getLastData
())
self
.
compute_node
.
setLastData
(
value
)
self
.
assertEqual
(
value
,
self
.
compute_node
.
getLastData
())
key
=
"OI"
value_key
=
"ABC"
self
.
assertEqual
(
None
,
self
.
compute_node
.
getLastData
(
key
))
self
.
compute_node
.
setLastData
(
value_key
,
key
=
key
)
self
.
assertEqual
(
value_key
,
self
.
compute_node
.
getLastData
(
key
))
def
test_getAccessStatus_no_data
(
self
):
since
=
rfc1123_date
(
DateTime
())
created_at
=
since
def
getBaseExpectedDict
(
doc
):
return
{
"user"
:
"SlapOS Master"
,
'created_at'
:
'%s'
%
created_at
,
'since'
:
'%s'
%
since
,
'state'
:
""
,
"text"
:
"#error no data found for %s"
%
doc
.
getReference
(),
"no_data"
:
1
}
# Check Compute Node
self
.
assertEqual
(
self
.
compute_node
.
_getCachedAccessInfo
(),
None
)
self
.
assertEqual
(
self
.
compute_node
.
getAccessStatus
(),
getBaseExpectedDict
(
self
.
compute_node
))
# Check Software Installation
installation
=
self
.
start_requested_software_installation
self
.
assertEqual
(
installation
.
_getCachedAccessInfo
(),
None
)
self
.
assertEqual
(
installation
.
getAccessStatus
(),
getBaseExpectedDict
(
installation
))
partition
=
self
.
compute_node
.
partition1
self
.
assertEqual
(
partition
.
_getCachedAccessInfo
(),
None
)
self
.
assertEqual
(
partition
.
getAccessStatus
(),
getBaseExpectedDict
(
partition
))
instance
=
self
.
start_requested_software_instance
self
.
assertEqual
(
instance
.
_getCachedAccessInfo
(),
None
)
self
.
assertEqual
(
instance
.
getAccessStatus
(),
getBaseExpectedDict
(
instance
))
def
test_setAccessStatus
(
self
):
since
=
rfc1123_date
(
DateTime
())
created_at
=
since
def
getExpectedCacheDict
(
doc
):
return
json
.
dumps
({
"user"
:
"ERP5TypeTestCase"
,
'created_at'
:
'%s'
%
created_at
,
'since'
:
'%s'
%
since
,
'state'
:
""
,
"text"
:
"#access TEST123 %s"
%
doc
.
getUid
()
})
def
getBaseExpectedDict
(
doc
):
return
{
"user"
:
"ERP5TypeTestCase"
,
'created_at'
:
'%s'
%
created_at
,
u'since'
:
u'%s'
%
since
,
u'state'
:
u""
,
u"text"
:
u"#access TEST123 %s"
%
doc
.
getUid
(),
'no_data_since_15_minutes'
:
0
,
'no_data_since_5_minutes'
:
0
}
# Check Compute Node
self
.
assertEqual
(
True
,
self
.
compute_node
.
setAccessStatus
(
"TEST123 %s"
%
self
.
compute_node
.
getUid
()))
self
.
assertEqual
(
self
.
compute_node
.
_getCachedAccessInfo
(),
getExpectedCacheDict
(
self
.
compute_node
))
self
.
assertEqual
(
self
.
compute_node
.
getAccessStatus
(),
getBaseExpectedDict
(
self
.
compute_node
))
self
.
assertEqual
(
False
,
self
.
compute_node
.
setAccessStatus
(
"TEST123 %s"
%
self
.
compute_node
.
getUid
()))
# Check Software Installation
installation
=
self
.
start_requested_software_installation
self
.
assertEqual
(
True
,
installation
.
setAccessStatus
(
"TEST123 %s"
%
installation
.
getUid
()))
self
.
assertEqual
(
installation
.
_getCachedAccessInfo
(),
getExpectedCacheDict
(
installation
))
self
.
assertEqual
(
installation
.
getAccessStatus
(),
getBaseExpectedDict
(
installation
))
self
.
assertEqual
(
False
,
installation
.
setAccessStatus
(
"TEST123 %s"
%
installation
.
getUid
()))
# Compute Partition
partition
=
self
.
compute_node
.
partition1
self
.
assertEqual
(
True
,
partition
.
setAccessStatus
(
"TEST123 %s"
%
partition
.
getUid
()))
self
.
assertEqual
(
partition
.
_getCachedAccessInfo
(),
getExpectedCacheDict
(
partition
))
self
.
assertEqual
(
partition
.
getAccessStatus
(),
getBaseExpectedDict
(
partition
))
self
.
assertEqual
(
False
,
partition
.
setAccessStatus
(
"TEST123 %s"
%
partition
.
getUid
()))
# Software Instance
instance
=
self
.
start_requested_software_instance
# This is already called from elsewhere, so it actually changed
self
.
assertEqual
(
True
,
instance
.
setAccessStatus
(
"TEST123 %s"
%
instance
.
getUid
()))
self
.
assertEqual
(
instance
.
_getCachedAccessInfo
(),
getExpectedCacheDict
(
instance
))
self
.
assertEqual
(
instance
.
getAccessStatus
(),
getBaseExpectedDict
(
instance
))
self
.
assertEqual
(
False
,
instance
.
setAccessStatus
(
"TEST123 %s"
%
instance
.
getUid
()))
def
test_setAccessStatus_reindex
(
self
):
since
=
rfc1123_date
(
DateTime
())
created_at
=
since
def
getExpectedCacheDict
(
doc
):
return
json
.
dumps
({
"user"
:
"ERP5TypeTestCase"
,
'created_at'
:
'%s'
%
created_at
,
'since'
:
'%s'
%
since
,
'state'
:
""
,
"text"
:
"#access TEST123 %s"
%
doc
.
getUid
()
})
def
getBaseExpectedDict
(
doc
):
return
{
"user"
:
"ERP5TypeTestCase"
,
'created_at'
:
'%s'
%
created_at
,
u'since'
:
u'%s'
%
since
,
u'state'
:
u""
,
u"text"
:
u"#access TEST123 %s"
%
doc
.
getUid
(),
'no_data_since_15_minutes'
:
0
,
'no_data_since_5_minutes'
:
0
}
self
.
tic
()
# Software Instance
instance
=
self
.
start_requested_software_instance
indexation_timestamp
=
self
.
portal
.
portal_catalog
(
uid
=
instance
.
getUid
(),
select_dict
=
{
"indexation_timestamp"
:
None
}
)[
0
].
indexation_timestamp
# This is already called from elsewhere, so it actually changed
self
.
assertEqual
(
True
,
instance
.
setAccessStatus
(
"TEST123 %s"
%
instance
.
getUid
()))
self
.
assertEqual
(
instance
.
_getCachedAccessInfo
(),
getExpectedCacheDict
(
instance
))
self
.
assertEqual
(
instance
.
getAccessStatus
(),
getBaseExpectedDict
(
instance
))
self
.
tic
()
new_indexation_timestamp
=
self
.
portal
.
portal_catalog
(
uid
=
instance
.
getUid
(),
select_dict
=
{
"indexation_timestamp"
:
None
}
)[
0
].
indexation_timestamp
self
.
assertEqual
(
new_indexation_timestamp
,
indexation_timestamp
)
self
.
assertEqual
(
False
,
instance
.
setAccessStatus
(
"TEST123 %s"
%
instance
.
getUid
()))
self
.
tic
()
new_indexation_timestamp
=
self
.
portal
.
portal_catalog
(
uid
=
instance
.
getUid
(),
select_dict
=
{
"indexation_timestamp"
:
None
}
)[
0
].
indexation_timestamp
self
.
assertEqual
(
new_indexation_timestamp
,
indexation_timestamp
)
self
.
assertEqual
(
False
,
instance
.
setAccessStatus
(
"TEST123 %s"
%
instance
.
getUid
(),
reindex
=
1
))
self
.
tic
()
new_indexation_timestamp
=
self
.
portal
.
portal_catalog
(
uid
=
instance
.
getUid
(),
select_dict
=
{
"indexation_timestamp"
:
None
}
)[
0
].
indexation_timestamp
self
.
assertEqual
(
new_indexation_timestamp
,
indexation_timestamp
)
self
.
assertEqual
(
True
,
instance
.
setErrorStatus
(
"TEST123 %s"
%
instance
.
getUid
(),
reindex
=
1
))
self
.
tic
()
new_indexation_timestamp
=
self
.
portal
.
portal_catalog
(
uid
=
instance
.
getUid
(),
select_dict
=
{
"indexation_timestamp"
:
None
}
)[
0
].
indexation_timestamp
self
.
assertNotEqual
(
new_indexation_timestamp
,
indexation_timestamp
)
def
test_setErrorStatus
(
self
):
since
=
rfc1123_date
(
DateTime
())
created_at
=
since
def
getExpectedCacheDict
(
doc
):
return
json
.
dumps
({
"user"
:
"ERP5TypeTestCase"
,
'created_at'
:
'%s'
%
created_at
,
'since'
:
'%s'
%
since
,
'state'
:
""
,
"text"
:
"#error TEST123 %s"
%
doc
.
getUid
()
})
def
getBaseExpectedDict
(
doc
):
return
{
"user"
:
"ERP5TypeTestCase"
,
'created_at'
:
'%s'
%
created_at
,
u'since'
:
u'%s'
%
since
,
u'state'
:
u""
,
u"text"
:
u"#error TEST123 %s"
%
doc
.
getUid
(),
'no_data_since_15_minutes'
:
0
,
'no_data_since_5_minutes'
:
0
}
# Check Compute Node
self
.
assertEqual
(
True
,
self
.
compute_node
.
setErrorStatus
(
"TEST123 %s"
%
self
.
compute_node
.
getUid
()))
self
.
assertEqual
(
self
.
compute_node
.
_getCachedAccessInfo
(),
getExpectedCacheDict
(
self
.
compute_node
))
self
.
assertEqual
(
self
.
compute_node
.
getAccessStatus
(),
getBaseExpectedDict
(
self
.
compute_node
))
self
.
assertEqual
(
False
,
self
.
compute_node
.
setErrorStatus
(
"TEST123 %s"
%
self
.
compute_node
.
getUid
()))
# Check Software Installation
installation
=
self
.
start_requested_software_installation
self
.
assertEqual
(
True
,
installation
.
setErrorStatus
(
"TEST123 %s"
%
installation
.
getUid
()))
self
.
assertEqual
(
installation
.
_getCachedAccessInfo
(),
getExpectedCacheDict
(
installation
))
self
.
assertEqual
(
installation
.
getAccessStatus
(),
getBaseExpectedDict
(
installation
))
self
.
assertEqual
(
False
,
installation
.
setErrorStatus
(
"TEST123 %s"
%
installation
.
getUid
()))
# Compute Partition
partition
=
self
.
compute_node
.
partition1
self
.
assertEqual
(
True
,
partition
.
setErrorStatus
(
"TEST123 %s"
%
partition
.
getUid
()))
self
.
assertEqual
(
partition
.
_getCachedAccessInfo
(),
getExpectedCacheDict
(
partition
))
self
.
assertEqual
(
partition
.
getAccessStatus
(),
getBaseExpectedDict
(
partition
))
self
.
assertEqual
(
False
,
partition
.
setErrorStatus
(
"TEST123 %s"
%
partition
.
getUid
()))
# Software Instance
instance
=
self
.
start_requested_software_instance
self
.
assertEqual
(
True
,
instance
.
setErrorStatus
(
"TEST123 %s"
%
instance
.
getUid
()))
self
.
assertEqual
(
instance
.
_getCachedAccessInfo
(),
getExpectedCacheDict
(
instance
))
self
.
assertEqual
(
instance
.
getAccessStatus
(),
getBaseExpectedDict
(
instance
))
self
.
assertEqual
(
False
,
instance
.
setErrorStatus
(
"TEST123 %s"
%
instance
.
getUid
()))
def
test_setBuildingStatus
(
self
):
since
=
rfc1123_date
(
DateTime
())
created_at
=
since
def
getExpectedCacheDict
(
doc
):
return
json
.
dumps
({
"user"
:
"ERP5TypeTestCase"
,
'created_at'
:
'%s'
%
created_at
,
'since'
:
'%s'
%
since
,
'state'
:
""
,
"text"
:
"#building TEST123 %s"
%
doc
.
getUid
()
})
def
getBaseExpectedDict
(
doc
):
return
{
"user"
:
"ERP5TypeTestCase"
,
'created_at'
:
'%s'
%
created_at
,
u'since'
:
u'%s'
%
since
,
u'state'
:
u""
,
u"text"
:
u"#building TEST123 %s"
%
doc
.
getUid
(),
'no_data_since_15_minutes'
:
0
,
'no_data_since_5_minutes'
:
0
}
# Check Software Installation
installation
=
self
.
start_requested_software_installation
self
.
assertEqual
(
True
,
installation
.
setBuildingStatus
(
"TEST123 %s"
%
installation
.
getUid
()))
self
.
assertEqual
(
installation
.
_getCachedAccessInfo
(),
getExpectedCacheDict
(
installation
))
self
.
assertEqual
(
installation
.
getAccessStatus
(),
getBaseExpectedDict
(
installation
))
self
.
assertEqual
(
False
,
installation
.
setBuildingStatus
(
"TEST123 %s"
%
installation
.
getUid
()))
class
TestSlapOSCloudSoftwareInstance
(
SlapOSTestCaseMixin
):
...
...
@@ -149,3 +457,4 @@ class TestSlapOSCloudSoftwareInstance(
self
.
assertEqual
([(
u''
,
u'ip_address_1'
)],
self
.
start_requested_software_instance
.
_getInstanceTreeIpList
())
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment