Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
slapos
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Dmitry Blinov
slapos
Commits
0fb30372
Commit
0fb30372
authored
6 years ago
by
Jérome Perrin
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
software/seleniumrunner: add software release test
parent
f4d803f8
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
752 additions
and
0 deletions
+752
-0
software/seleniumrunner/test/README.md
software/seleniumrunner/test/README.md
+1
-0
software/seleniumrunner/test/setup.py
software/seleniumrunner/test/setup.py
+55
-0
software/seleniumrunner/test/test.py
software/seleniumrunner/test/test.py
+375
-0
software/seleniumrunner/test/utils.py
software/seleniumrunner/test/utils.py
+321
-0
No files found.
software/seleniumrunner/test/README.md
0 → 100644
View file @
0fb30372
Tests for SeleniumRunner software release
This diff is collapsed.
Click to expand it.
software/seleniumrunner/test/setup.py
0 → 100644
View file @
0fb30372
##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from
setuptools
import
setup
,
find_packages
version
=
'0.0.1.dev0'
name
=
'slapos.test.seleniumrunner'
long_description
=
open
(
"README.md"
).
read
()
setup
(
name
=
name
,
version
=
version
,
description
=
"Test for SlapOS' Seleniumrunner"
,
long_description
=
long_description
,
long_description_content_type
=
'text/markdown'
,
maintainer
=
"Nexedi"
,
maintainer_email
=
"info@nexedi.com"
,
url
=
"https://lab.nexedi.com/nexedi/slapos"
,
packages
=
find_packages
(),
install_requires
=
[
'slapos.core'
,
'supervisor'
,
'slapos.libnetworkcache'
,
'erp5.util'
,
'selenium'
,
'psutil'
,
'image'
,
'requests'
,
'paramiko'
,
],
zip_safe
=
True
,
test_suite
=
'test'
,
)
This diff is collapsed.
Click to expand it.
software/seleniumrunner/test/test.py
0 → 100644
View file @
0fb30372
##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import
cgi
import
json
import
multiprocessing
import
os
import
tempfile
import
unittest
import
urlparse
from
BaseHTTPServer
import
BaseHTTPRequestHandler
,
HTTPServer
from
io
import
BytesIO
import
paramiko
import
requests
from
PIL
import
Image
from
selenium
import
webdriver
from
selenium.webdriver.common.by
import
By
from
selenium.webdriver.common.desired_capabilities
import
DesiredCapabilities
from
selenium.webdriver.support
import
expected_conditions
as
EC
from
selenium.webdriver.support.ui
import
WebDriverWait
from
.utils
import
SlapOSInstanceTestCase
,
findFreeTCPPort
debug_mode
=
os
.
environ
.
get
(
'DEBUG'
)
# for development: debugging logs and install Ctrl+C handler
if
debug_mode
:
import
logging
logging
.
basicConfig
(
level
=
logging
.
DEBUG
)
unittest
.
installHandler
()
class
WebServerMixin
(
object
):
"""Mixin class which provides a simple web server reachable at self.server_url
"""
def
setUp
(
self
):
"""Start a minimal web server.
"""
class
TestHandler
(
BaseHTTPRequestHandler
):
"""Request handler for our test server.
The implemented server is:
- submit q and you'll get a page with q as title
- upload a file and the file content will be displayed in div.uploadedfile
"""
def
log_message
(
self
,
*
args
,
**
kw
):
if
debug_mode
:
BaseHTTPRequestHandler
.
log_message
(
self
,
*
args
,
**
kw
)
def
do_GET
(
self
):
self
.
send_response
(
200
)
self
.
end_headers
()
self
.
wfile
.
write
(
'''
<html>
<title>Test page</title>
<body>
<form action="/" method="POST" enctype="multipart/form-data">
<input name="q" type="text"></input>
<input name="f" type="file" ></input>
<input type="submit" value="I'm feeling lucky"></input>
</form>
</body>
</html>'''
)
def
do_POST
(
self
):
form
=
cgi
.
FieldStorage
(
fp
=
self
.
rfile
,
headers
=
self
.
headers
,
environ
=
{
'REQUEST_METHOD'
:
'POST'
,
'CONTENT_TYPE'
:
self
.
headers
[
'Content-Type'
],})
self
.
send_response
(
200
)
self
.
end_headers
()
file_data
=
'no file'
if
form
.
has_key
(
'f'
):
file_data
=
form
[
'f'
].
file
.
read
()
self
.
wfile
.
write
(
'''
<html>
<title>%s</title>
<div>%s</div>
</html>
'''
%
(
form
[
'q'
].
value
,
file_data
))
super
(
WebServerMixin
,
self
).
setUp
()
ip
=
os
.
environ
.
get
(
'LOCAL_IPV4'
,
'127.0.1.1'
)
port
=
findFreeTCPPort
(
ip
)
server
=
HTTPServer
((
ip
,
port
),
TestHandler
)
self
.
server_process
=
multiprocessing
.
Process
(
target
=
server
.
serve_forever
)
self
.
server_process
.
start
()
self
.
server_url
=
'http://%s:%s/'
%
(
ip
,
port
)
def
tearDown
(
self
):
self
.
server_process
.
terminate
()
self
.
server_process
.
join
()
super
(
WebServerMixin
,
self
).
tearDown
()
class
BrowserCompatibilityMixin
(
WebServerMixin
):
"""Mixin class to run validation tests on a specific browser
"""
desired_capabilities
=
NotImplemented
user_agent
=
NotImplemented
def
setUp
(
self
):
super
(
BrowserCompatibilityMixin
,
self
).
setUp
()
self
.
driver
=
webdriver
.
Remote
(
command_executor
=
self
.
computer_partition
.
getConnectionParameterDict
()[
'backend-url'
],
desired_capabilities
=
self
.
desired_capabilities
)
def
tearDown
(
self
):
self
.
driver
.
quit
()
super
(
BrowserCompatibilityMixin
,
self
).
tearDown
()
def
test_user_agent
(
self
):
self
.
assertIn
(
self
.
user_agent
,
self
.
driver
.
execute_script
(
'return navigator.userAgent'
))
def
test_simple_submit_scenario
(
self
):
self
.
driver
.
get
(
self
.
server_url
)
input_element
=
WebDriverWait
(
self
.
driver
,
3
).
until
(
EC
.
visibility_of_element_located
((
By
.
NAME
,
'q'
)))
input_element
.
send_keys
(
self
.
id
())
input_element
.
submit
()
WebDriverWait
(
self
.
driver
,
3
).
until
(
EC
.
title_is
(
self
.
id
()))
def
test_upload_file
(
self
):
f
=
tempfile
.
NamedTemporaryFile
(
delete
=
False
)
f
.
write
(
self
.
id
())
f
.
close
()
self
.
addCleanup
(
lambda
:
os
.
remove
(
f
.
name
))
self
.
driver
.
get
(
self
.
server_url
)
self
.
driver
.
find_element_by_xpath
(
'//input[@name="f"]'
).
send_keys
(
f
.
name
)
self
.
driver
.
find_element_by_xpath
(
'//input[@type="submit"]'
).
click
()
self
.
assertEqual
(
self
.
id
(),
self
.
driver
.
find_element_by_xpath
(
'//div'
).
text
)
def
test_screenshot
(
self
):
self
.
driver
.
get
(
self
.
server_url
)
screenshot
=
Image
.
open
(
BytesIO
(
self
.
driver
.
get_screenshot_as_png
()))
# just check it's not a white screen
self
.
assertGreater
(
len
(
screenshot
.
getcolors
(
maxcolors
=
512
)),
2
)
def
test_window_and_screen_size
(
self
):
size
=
json
.
loads
(
self
.
driver
.
execute_script
(
'''
return JSON.stringify({
'screen.width': window.screen.width,
'screen.height': window.screen.height,
'screen.pixelDepth': window.screen.pixelDepth,
'innerWidth': window.innerWidth,
'innerHeight': window.innerHeight
})'''
))
# Xvfb is configured like this
self
.
assertEqual
(
1024
,
size
[
'screen.width'
])
self
.
assertEqual
(
768
,
size
[
'screen.height'
])
self
.
assertEqual
(
24
,
size
[
'screen.pixelDepth'
])
# window size must not be 0 (wrong firefox integration report this)
self
.
assertGreater
(
size
[
'innerWidth'
],
0
)
self
.
assertGreater
(
size
[
'innerHeight'
],
0
)
def
test_resize_window
(
self
):
self
.
driver
.
set_window_size
(
800
,
900
)
size
=
json
.
loads
(
self
.
driver
.
execute_script
(
'''
return JSON.stringify({
'outerWidth': window.outerWidth,
'outerHeight': window.outerHeight
})'''
))
self
.
assertEqual
(
800
,
size
[
'outerWidth'
])
self
.
assertEqual
(
900
,
size
[
'outerHeight'
])
def
test_multiple_clients
(
self
):
parameter_dict
=
self
.
computer_partition
.
getConnectionParameterDict
()
webdriver_url
=
parameter_dict
[
'backend-url'
]
queue
=
multiprocessing
.
Queue
()
def
_test
(
q
,
server_url
):
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
self
.
desired_capabilities
)
try
:
driver
.
get
(
server_url
)
q
.
put
(
driver
.
title
==
'Test page'
)
finally
:
driver
.
quit
()
nb_workers
=
10
workers
=
[]
for
i
in
range
(
nb_workers
):
worker
=
multiprocessing
.
Process
(
target
=
_test
,
args
=
(
queue
,
self
.
server_url
))
worker
.
start
()
workers
.
append
(
worker
)
del
worker
# pylint
_
=
[
worker
.
join
(
timeout
=
30
)
for
worker
in
workers
]
# terminate workers if they are still alive after 30 seconds
_
=
[
worker
.
terminate
()
for
worker
in
workers
if
worker
.
is_alive
()]
_
=
[
worker
.
join
()
for
worker
in
workers
]
del
_
# pylint
self
.
assertEqual
(
[
True
]
*
nb_workers
,
[
queue
.
get
()
for
_
in
range
(
nb_workers
)])
class
SeleniumServerTestCase
(
SlapOSInstanceTestCase
):
"""Test the remote driver on a minimal web server.
"""
@
classmethod
def
getSoftwareURLList
(
cls
):
return
(
os
.
path
.
abspath
(
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'..'
,
'software.cfg'
)),
)
class
TestBrowserSelection
(
WebServerMixin
,
SeleniumServerTestCase
):
"""Test browser can be selected by `desiredCapabilities``
"""
def
test_chrome
(
self
):
parameter_dict
=
self
.
computer_partition
.
getConnectionParameterDict
()
webdriver_url
=
parameter_dict
[
'backend-url'
]
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
DesiredCapabilities
.
CHROME
)
driver
.
get
(
self
.
server_url
)
self
.
assertEqual
(
'Test page'
,
driver
.
title
)
self
.
assertIn
(
'Chrome'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
self
.
assertNotIn
(
'Firefox'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
driver
.
quit
()
def
test_firefox
(
self
):
parameter_dict
=
self
.
computer_partition
.
getConnectionParameterDict
()
webdriver_url
=
parameter_dict
[
'backend-url'
]
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
DesiredCapabilities
.
FIREFOX
)
driver
.
get
(
self
.
server_url
)
self
.
assertEqual
(
'Test page'
,
driver
.
title
)
self
.
assertIn
(
'Firefox'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
driver
.
quit
()
def
test_firefox_desired_version
(
self
):
parameter_dict
=
self
.
computer_partition
.
getConnectionParameterDict
()
webdriver_url
=
parameter_dict
[
'backend-url'
]
desired_capabilities
=
DesiredCapabilities
.
FIREFOX
.
copy
()
desired_capabilities
[
'version'
]
=
'60.0.2esr'
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
desired_capabilities
)
self
.
assertIn
(
'Gecko/20100101 Firefox/60.0'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
driver
.
quit
()
desired_capabilities
[
'version'
]
=
'52.9.0esr'
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
desired_capabilities
)
self
.
assertIn
(
'Gecko/20100101 Firefox/52.0'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
driver
.
quit
()
class
TestFrontend
(
WebServerMixin
,
SeleniumServerTestCase
):
"""Test hub's https frontend.
"""
def
test_admin
(
self
):
parameter_dict
=
self
.
computer_partition
.
getConnectionParameterDict
()
admin_url
=
parameter_dict
[
'admin-url'
]
parsed
=
urlparse
.
urlparse
(
admin_url
)
self
.
assertEqual
(
'admin'
,
parsed
.
username
)
self
.
assertTrue
(
parsed
.
password
)
self
.
assertIn
(
'Grid Console'
,
requests
.
get
(
admin_url
,
verify
=
False
).
text
)
def
test_browser_use_hub
(
self
):
parameter_dict
=
self
.
computer_partition
.
getConnectionParameterDict
()
webdriver_url
=
parameter_dict
[
'url'
]
parsed
=
urlparse
.
urlparse
(
webdriver_url
)
self
.
assertEqual
(
'selenium'
,
parsed
.
username
)
self
.
assertTrue
(
parsed
.
password
)
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
DesiredCapabilities
.
CHROME
)
driver
.
get
(
self
.
server_url
)
self
.
assertEqual
(
'Test page'
,
driver
.
title
)
driver
.
quit
()
class
TestSSHServer
(
SeleniumServerTestCase
):
@
classmethod
def
getInstanceParameterDict
(
cls
):
cls
.
ssh_key
=
paramiko
.
RSAKey
.
generate
(
1024
)
return
{
'ssh-authorized-key'
:
'ssh-rsa {}'
.
format
(
cls
.
ssh_key
.
get_base64
())}
def
test_connect
(
self
):
parameter_dict
=
self
.
computer_partition
.
getConnectionParameterDict
()
ssh_url
=
parameter_dict
[
'ssh-url'
]
parsed
=
urlparse
.
urlparse
(
ssh_url
)
self
.
assertEqual
(
'ssh'
,
parsed
.
scheme
)
client
=
paramiko
.
SSHClient
()
client
.
set_missing_host_key_policy
(
paramiko
.
client
.
WarningPolicy
)
client
.
connect
(
username
=
urlparse
.
urlparse
(
ssh_url
).
username
,
hostname
=
urlparse
.
urlparse
(
ssh_url
).
hostname
,
port
=
urlparse
.
urlparse
(
ssh_url
).
port
,
pkey
=
self
.
ssh_key
,
)
channel
=
client
.
invoke_shell
()
channel
.
settimeout
(
30
)
# openssh prints a warning 'Attempt to write login records by non-root user (aborting)'
# so we received more than the lenght of the asserted message.
self
.
assertIn
(
"Welcome to SlapOS Selenium Server."
,
channel
.
recv
(
100
))
class
TestFirefox52
(
BrowserCompatibilityMixin
,
SeleniumServerTestCase
):
desired_capabilities
=
dict
(
DesiredCapabilities
.
FIREFOX
,
version
=
'52.9.0esr'
)
user_agent
=
'Gecko/20100101 Firefox/52.0'
# resizing window is not supported on firefox 52 geckodriver
test_resize_window
=
unittest
.
expectedFailure
(
BrowserCompatibilityMixin
.
test_resize_window
)
class
TestFirefox60
(
BrowserCompatibilityMixin
,
SeleniumServerTestCase
):
desired_capabilities
=
dict
(
DesiredCapabilities
.
FIREFOX
,
version
=
'60.0.2esr'
)
user_agent
=
'Gecko/20100101 Firefox/60.0'
class
TestChrome69
(
BrowserCompatibilityMixin
,
SeleniumServerTestCase
):
desired_capabilities
=
dict
(
DesiredCapabilities
.
CHROME
,
version
=
'69.0.3497.0'
)
user_agent
=
'Chrome/69.0.3497.0'
This diff is collapsed.
Click to expand it.
software/seleniumrunner/test/utils.py
0 → 100644
View file @
0fb30372
##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import
unittest
import
os
import
socket
from
contextlib
import
closing
import
logging
import
StringIO
import
xmlrpclib
import
supervisor.xmlrpc
from
erp5.util.testnode.SlapOSControler
import
SlapOSControler
from
erp5.util.testnode.ProcessManager
import
ProcessManager
# Utility functions
def
findFreeTCPPort
(
ip
=
''
):
"""Find a free TCP port to listen to.
"""
family
=
socket
.
AF_INET6
if
':'
in
ip
else
socket
.
AF_INET
with
closing
(
socket
.
socket
(
family
,
socket
.
SOCK_STREAM
))
as
s
:
s
.
bind
((
ip
,
0
))
return
s
.
getsockname
()[
1
]
# TODO:
# - allow requesting multiple instances ?
class
SlapOSInstanceTestCase
(
unittest
.
TestCase
):
"""Install one slapos instance.
This test case install software(s) and request one instance during `setUpClass`
and destroy the instance during `tearDownClass`.
Software Release URL, Instance Software Type and Instance Parameters can be defined
on the class.
All tests from the test class will run with the same instance.
The following class attributes are available:
* `computer_partition`: the `slapos.core.XXX` computer partition instance.
* `computer_partition_root_path`: the path of the instance root directory,
"""
# Methods to be defined by subclasses.
@
classmethod
def
getSoftwareURLList
(
cls
):
"""Return URL of software releases to install.
To be defined by subclasses.
"""
raise
NotImplementedError
()
@
classmethod
def
getInstanceParameterDict
(
cls
):
"""Return instance parameters
To be defined by subclasses if they need to request instance with specific
parameters.
"""
return
{}
@
classmethod
def
getInstanceSoftwareType
(
cls
):
"""Return software type for instance, default "default"
To be defined by subclasses if they need to request instance with specific
software type.
"""
return
"default"
# Utility methods.
def
getSupervisorRPCServer
(
self
):
"""Returns a XML-RPC connection to the supervisor used by slapos node
Refer to http://supervisord.org/api.html for details of available methods.
"""
# xmlrpc over unix socket https://stackoverflow.com/a/11746051/7294664
return
xmlrpclib
.
ServerProxy
(
'http://slapos-supervisor'
,
transport
=
supervisor
.
xmlrpc
.
SupervisorTransport
(
None
,
None
,
# XXX hardcoded socket path
serverurl
=
"unix://{working_directory}/inst/supervisord.socket"
.
format
(
**
self
.
config
)))
# Unittest methods
@
classmethod
def
setUpClass
(
cls
):
"""Setup the class, build software and request an instance.
If you have to override this method, do not forget to call this method on
parent class.
"""
try
:
cls
.
setUpWorkingDirectory
()
cls
.
setUpConfig
()
cls
.
setUpSlapOSController
()
cls
.
runSoftwareRelease
()
# XXX instead of "runSoftwareRelease", it would be better to be closer to slapos usage:
# cls.supplySoftwares()
# cls.installSoftwares()
cls
.
runComputerPartition
()
# XXX instead of "runComputerPartition", it would be better to be closer to slapos usage:
# cls.requestInstances()
# cls.createInstances()
# cls.requestInstances()
except
Exception
:
cls
.
stopSlapOSProcesses
()
cls
.
setUp
=
lambda
self
:
self
.
fail
(
'Setup Class failed.'
)
raise
@
classmethod
def
tearDownClass
(
cls
):
"""Tear down class, stop the processes and destroy instance.
"""
cls
.
stopSlapOSProcesses
()
# Implementation
@
classmethod
def
stopSlapOSProcesses
(
cls
):
if
hasattr
(
cls
,
'_process_manager'
):
cls
.
_process_manager
.
killPreviousRun
()
@
classmethod
def
setUpWorkingDirectory
(
cls
):
"""Initialise the directories"""
cls
.
working_directory
=
os
.
environ
.
get
(
'SLAPOS_TEST_WORKING_DIR'
,
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'.slapos'
))
# To prevent error: Cannot open an HTTP server: socket.error reported
# AF_UNIX path too long This `working_directory` should not be too deep.
# Socket path is 108 char max on linux
# https://github.com/torvalds/linux/blob/3848ec5/net/unix/af_unix.c#L234-L238
# Supervisord socket name contains the pid number, which is why we add
# .xxxxxxx in this check.
if
len
(
cls
.
working_directory
+
'/inst/supervisord.socket.xxxxxxx'
)
>
108
:
raise
RuntimeError
(
'working directory ( {} ) is too deep, try setting '
'SLAPOS_TEST_WORKING_DIR'
.
format
(
cls
.
working_directory
))
if
not
os
.
path
.
exists
(
cls
.
working_directory
):
os
.
mkdir
(
cls
.
working_directory
)
@
classmethod
def
setUpConfig
(
cls
):
"""Create slapos configuration"""
cls
.
config
=
{
"working_directory"
:
cls
.
working_directory
,
"slapos_directory"
:
cls
.
working_directory
,
"log_directory"
:
cls
.
working_directory
,
"computer_id"
:
'slapos.test'
,
# XXX
'proxy_database'
:
os
.
path
.
join
(
cls
.
working_directory
,
'proxy.db'
),
'partition_reference'
:
cls
.
__name__
,
# "proper" slapos command must be in $PATH
'slapos_binary'
:
'slapos'
,
}
# Some tests are expecting that local IP is not set to 127.0.0.1
ipv4_address
=
os
.
environ
.
get
(
'LOCAL_IPV4'
,
'127.0.1.1'
)
ipv6_address
=
os
.
environ
[
'GLOBAL_IPV6'
]
cls
.
config
[
'proxy_host'
]
=
cls
.
config
[
'ipv4_address'
]
=
ipv4_address
cls
.
config
[
'ipv6_address'
]
=
ipv6_address
cls
.
config
[
'proxy_port'
]
=
findFreeTCPPort
(
ipv4_address
)
cls
.
config
[
'master_url'
]
=
'http://{proxy_host}:{proxy_port}'
.
format
(
**
cls
.
config
)
@
classmethod
def
setUpSlapOSController
(
cls
):
"""Create the a "slapos controller" and supply softwares from `getSoftwareURLList`.
This is equivalent to:
slapos proxy start
for sr in getSoftwareURLList; do
slapos supply $SR $COMP
done
"""
cls
.
_process_manager
=
ProcessManager
()
# XXX this code is copied from testnode code
cls
.
slapos_controler
=
SlapOSControler
(
cls
.
working_directory
,
cls
.
config
)
slapproxy_log
=
os
.
path
.
join
(
cls
.
config
[
'log_directory'
],
'slapproxy.log'
)
logger
=
logging
.
getLogger
(
__name__
)
logger
.
debug
(
'Configured slapproxy log to %r'
,
slapproxy_log
)
cls
.
software_url_list
=
cls
.
getSoftwareURLList
()
cls
.
slapos_controler
.
initializeSlapOSControler
(
slapproxy_log
=
slapproxy_log
,
process_manager
=
cls
.
_process_manager
,
reset_software
=
False
,
software_path_list
=
cls
.
software_url_list
)
# XXX we should check *earlier* if that pidfile exist and if supervisord
# process still running, because if developer started supervisord (or bugs?)
# then another supervisord will start and starting services a second time
# will fail.
cls
.
_process_manager
.
supervisord_pid_file
=
os
.
path
.
join
(
cls
.
slapos_controler
.
instance_root
,
'var'
,
'run'
,
'supervisord.pid'
)
@
classmethod
def
runSoftwareRelease
(
cls
):
"""Run all the software releases that were supplied before.
This is the equivalent of `slapos node software`.
The tests will be marked file if software building fail.
"""
logger
=
logging
.
getLogger
()
logger
.
level
=
logging
.
DEBUG
stream
=
StringIO
.
StringIO
()
stream_handler
=
logging
.
StreamHandler
(
stream
)
logger
.
addHandler
(
stream_handler
)
try
:
cls
.
software_status_dict
=
cls
.
slapos_controler
.
runSoftwareRelease
(
cls
.
config
,
environment
=
os
.
environ
)
stream
.
seek
(
0
)
stream
.
flush
()
message
=
''
.
join
(
stream
.
readlines
()[
-
100
:])
assert
cls
.
software_status_dict
[
'status_code'
]
==
0
,
message
finally
:
logger
.
removeHandler
(
stream_handler
)
del
stream
@
classmethod
def
runComputerPartition
(
cls
):
"""Instanciate the software.
This is the equivalent of doing:
slapos request --type=getInstanceSoftwareType --parameters=getInstanceParameterDict
slapos node instance
and return the slapos request instance parameters.
This can be called by tests to simulate re-request with different parameters.
"""
logger
=
logging
.
getLogger
()
logger
.
level
=
logging
.
DEBUG
stream
=
StringIO
.
StringIO
()
stream_handler
=
logging
.
StreamHandler
(
stream
)
logger
.
addHandler
(
stream_handler
)
if
cls
.
getInstanceSoftwareType
()
!=
'default'
:
raise
NotImplementedError
instance_parameter_dict
=
cls
.
getInstanceParameterDict
()
try
:
cls
.
instance_status_dict
=
cls
.
slapos_controler
.
runComputerPartition
(
cls
.
config
,
cluster_configuration
=
instance_parameter_dict
,
environment
=
os
.
environ
)
stream
.
seek
(
0
)
stream
.
flush
()
message
=
''
.
join
(
stream
.
readlines
()[
-
100
:])
assert
cls
.
instance_status_dict
[
'status_code'
]
==
0
,
message
finally
:
logger
.
removeHandler
(
stream_handler
)
del
stream
# FIXME: similar to test node, only one (root) partition is really
# supported for now.
computer_partition_list
=
[]
for
i
in
range
(
len
(
cls
.
software_url_list
)):
computer_partition_list
.
append
(
cls
.
slapos_controler
.
slap
.
registerOpenOrder
().
request
(
cls
.
software_url_list
[
i
],
# This is how testnode's SlapOSControler name created partitions
partition_reference
=
'testing partition {i}'
.
format
(
i
=
i
,
**
cls
.
config
),
partition_parameter_kw
=
instance_parameter_dict
))
# expose some class attributes so that tests can use them:
# the ComputerPartition instances, to getInstanceParameterDict
cls
.
computer_partition
=
computer_partition_list
[
0
]
# the path of the instance on the filesystem, for low level inspection
cls
.
computer_partition_root_path
=
os
.
path
.
join
(
cls
.
config
[
'working_directory'
],
'inst'
,
cls
.
computer_partition
.
getId
())
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment