Commit 1a44bfe4 authored by Xavier Thompson

slapgrid: Handle connection loss to master

parent 0f2564f8
...@@ -53,6 +53,8 @@ if sys.version_info < (2, 6): ...@@ -53,6 +53,8 @@ if sys.version_info < (2, 6):
warnings.warn('Used python version (%s) is old and has problems with' warnings.warn('Used python version (%s) is old and has problems with'
' IPv6 connections' % sys.version.split('\n')[0]) ' IPv6 connections' % sys.version.split('\n')[0])
from requests.exceptions import RequestException
from lxml import etree from lxml import etree
from slapos import manager as slapmanager from slapos import manager as slapmanager
...@@ -69,7 +71,8 @@ from slapos.grid.SlapObject import Software, Partition ...@@ -69,7 +71,8 @@ from slapos.grid.SlapObject import Software, Partition
from slapos.grid.svcbackend import (launchSupervisord, from slapos.grid.svcbackend import (launchSupervisord,
createSupervisordConfiguration, createSupervisordConfiguration,
_getSupervisordConfigurationDirectory, _getSupervisordConfigurationDirectory,
_getSupervisordSocketPath) _getSupervisordSocketPath,
getSupervisorRPC)
from slapos.grid.utils import (md5digest, from slapos.grid.utils import (md5digest,
dropPrivileges, dropPrivileges,
SlapPopen, SlapPopen,
...@@ -90,6 +93,7 @@ COMPUTER_PARTITION_STOPPED_STATE = 'stopped' ...@@ -90,6 +93,7 @@ COMPUTER_PARTITION_STOPPED_STATE = 'stopped'
SLAPGRID_SUCCESS = 0 SLAPGRID_SUCCESS = 0
SLAPGRID_FAIL = 1 SLAPGRID_FAIL = 1
SLAPGRID_PROMISE_FAIL = 2 SLAPGRID_PROMISE_FAIL = 2
SLAPGRID_OFFLINE_SUCCESS = 3
PROMISE_TIMEOUT = 20 PROMISE_TIMEOUT = 20
COMPUTER_PARTITION_TIMESTAMP_FILENAME = '.timestamp' COMPUTER_PARTITION_TIMESTAMP_FILENAME = '.timestamp'
...@@ -1423,6 +1427,12 @@ stderr_logfile_backups=1 ...@@ -1423,6 +1427,12 @@ stderr_logfile_backups=1
return filtered_computer_partition_list return filtered_computer_partition_list
def processComputerPartitionList(self):
    """
    Process the Computer Partitions, falling back to offline processing.

    Returns the status of processComputerPartitionListOnline(). If that call
    raises RequestException, the status of
    processComputerPartitionListOffline() is returned instead.
    """
    try:
        status = self.processComputerPartitionListOnline()
    except RequestException:
        # A failed HTTP request is treated as connection loss to the master.
        status = self.processComputerPartitionListOffline()
    return status
def processComputerPartitionListOnline(self):
""" """
Will start supervisord and process each Computer Partition. Will start supervisord and process each Computer Partition.
""" """
...@@ -1449,6 +1459,10 @@ stderr_logfile_backups=1 ...@@ -1449,6 +1459,10 @@ stderr_logfile_backups=1
# Process the partition itself # Process the partition itself
self.processComputerPartition(computer_partition) self.processComputerPartition(computer_partition)
# Handle connection loss at the next level
except RequestException:
raise
# Send log before exiting # Send log before exiting
except (SystemExit, KeyboardInterrupt): except (SystemExit, KeyboardInterrupt):
computer_partition.error(traceback.format_exc(), logger=self.logger) computer_partition.error(traceback.format_exc(), logger=self.logger)
...@@ -1505,6 +1519,20 @@ stderr_logfile_backups=1 ...@@ -1505,6 +1519,20 @@ stderr_logfile_backups=1
return SLAPGRID_PROMISE_FAIL return SLAPGRID_PROMISE_FAIL
return SLAPGRID_SUCCESS return SLAPGRID_SUCCESS
def processComputerPartitionListOffline(self):
    """
    Ask supervisord to start all of its processes, without using the master.

    Returns SLAPGRID_OFFLINE_SUCCESS when the supervisord call completed, or
    SLAPGRID_FAIL when anything raised along the way (the error is logged).
    """
    self.logger.info('Processing computer partitions offline...')
    try:
        socket_path = _getSupervisordSocketPath(self.instance_root, self.logger)
        with getSupervisorRPC(socket_path) as rpc:
            # NOTE(review): False is presumably supervisor's `wait` flag, i.e.
            # do not block until the processes are up -- confirm against the
            # supervisor XML-RPC API.
            rpc.startAllProcesses(False)
    except Exception:
        self.logger.exception('Error in offline mode while starting partitions:')
        status = SLAPGRID_FAIL
    else:
        status = SLAPGRID_OFFLINE_SUCCESS
    return status
def processPromiseList(self): def processPromiseList(self):
""" """
Will check and process promises for each Computer Partition. Will check and process promises for each Computer Partition.
......
...@@ -388,7 +388,8 @@ class ComputerForTest(object): ...@@ -388,7 +388,8 @@ class ComputerForTest(object):
software_root, software_root,
instance_root, instance_root,
instance_amount=1, instance_amount=1,
software_amount=1): software_amount=1,
status_code=200):
""" """
Will set up instances, software and sequence Will set up instances, software and sequence
""" """
...@@ -397,6 +398,7 @@ class ComputerForTest(object): ...@@ -397,6 +398,7 @@ class ComputerForTest(object):
self.software_amount = software_amount self.software_amount = software_amount
self.software_root = software_root self.software_root = software_root
self.instance_root = instance_root self.instance_root = instance_root
self.status_code = status_code
self.ip_address_list = [ self.ip_address_list = [
('interface1', '10.0.8.3'), ('interface1', '10.0.8.3'),
('interface2', '10.0.8.4'), ('interface2', '10.0.8.4'),
...@@ -425,18 +427,18 @@ class ComputerForTest(object): ...@@ -425,18 +427,18 @@ class ComputerForTest(object):
and 'computer_id' in qs): and 'computer_id' in qs):
slap_computer = self.getComputer(qs['computer_id'][0]) slap_computer = self.getComputer(qs['computer_id'][0])
return { return {
'status_code': 200, 'status_code': self.status_code,
'content': dumps(slap_computer) 'content': dumps(slap_computer)
} }
elif url.path == '/getHostingSubscriptionIpList': elif url.path == '/getHostingSubscriptionIpList':
ip_address_list = self.ip_address_list ip_address_list = self.ip_address_list
return { return {
'status_code': 200, 'status_code': self.status_code,
'content': dumps(ip_address_list) 'content': dumps(ip_address_list)
} }
elif url.path == '/getComputerPartitionCertificate': elif url.path == '/getComputerPartitionCertificate':
return { return {
'status_code': 200, 'status_code': self.status_code,
'content': dumps({'certificate': 'SLAPOS_cert', 'key': 'SLAPOS_key'}) 'content': dumps({'certificate': 'SLAPOS_cert', 'key': 'SLAPOS_key'})
} }
if req.method == 'POST' and 'computer_partition_id' in qs: if req.method == 'POST' and 'computer_partition_id' in qs:
...@@ -445,17 +447,17 @@ class ComputerForTest(object): ...@@ -445,17 +447,17 @@ class ComputerForTest(object):
instance.header_list.append(req.headers) instance.header_list.append(req.headers)
if url.path == '/startedComputerPartition': if url.path == '/startedComputerPartition':
instance.state = 'started' instance.state = 'started'
return {'status_code': 200} return {'status_code': self.status_code}
if url.path == '/stoppedComputerPartition': if url.path == '/stoppedComputerPartition':
instance.state = 'stopped' instance.state = 'stopped'
return {'status_code': 200} return {'status_code': self.status_code}
if url.path == '/destroyedComputerPartition': if url.path == '/destroyedComputerPartition':
instance.state = 'destroyed' instance.state = 'destroyed'
return {'status_code': 200} return {'status_code': self.status_code}
if url.path == '/softwareInstanceBang': if url.path == '/softwareInstanceBang':
return {'status_code': 200} return {'status_code': self.status_code}
if url.path == "/updateComputerPartitionRelatedInstanceList": if url.path == "/updateComputerPartitionRelatedInstanceList":
return {'status_code': 200} return {'status_code': self.status_code}
if url.path == '/softwareInstanceError': if url.path == '/softwareInstanceError':
instance.error_log = '\n'.join( instance.error_log = '\n'.join(
[ [
...@@ -465,18 +467,18 @@ class ComputerForTest(object): ...@@ -465,18 +467,18 @@ class ComputerForTest(object):
] ]
) )
instance.error = True instance.error = True
return {'status_code': 200} return {'status_code': self.status_code}
elif req.method == 'POST' and 'url' in qs: elif req.method == 'POST' and 'url' in qs:
# XXX hardcoded to first software release! # XXX hardcoded to first software release!
software = self.software_list[0] software = self.software_list[0]
software.sequence.append(url.path) software.sequence.append(url.path)
if url.path == '/availableSoftwareRelease': if url.path == '/availableSoftwareRelease':
return {'status_code': 200} return {'status_code': self.status_code}
if url.path == '/buildingSoftwareRelease': if url.path == '/buildingSoftwareRelease':
return {'status_code': 200} return {'status_code': self.status_code}
if url.path == '/destroyedSoftwareRelease': if url.path == '/destroyedSoftwareRelease':
return {'status_code': 200} return {'status_code': self.status_code}
if url.path == '/softwareReleaseError': if url.path == '/softwareReleaseError':
software.error_log = '\n'.join( software.error_log = '\n'.join(
[ [
...@@ -486,7 +488,7 @@ class ComputerForTest(object): ...@@ -486,7 +488,7 @@ class ComputerForTest(object):
] ]
) )
software.error = True software.error = True
return {'status_code': 200} return {'status_code': self.status_code}
else: else:
return {'status_code': 500} return {'status_code': 500}
...@@ -1021,6 +1023,69 @@ exit 1 ...@@ -1021,6 +1023,69 @@ exit 1
'/stoppedComputerPartition']) '/stoppedComputerPartition'])
self.assertEqual('stopped', instance.state) self.assertEqual('stopped', instance.state)
def test_one_partition_started_no_master(self):
    """
    Processing partitions while the mocked master answers 503 must end with
    SLAPGRID_OFFLINE_SUCCESS and leave the partition untouched.
    """
    test_computer = self.getTestComputerClass()(
        self.software_root, self.instance_root, status_code=503)
    with httmock.HTTMock(test_computer.request_handler):
        instance = test_computer.instance_list[0]
        instance.requested_state = 'started'
        instance.software.setBuildout()

        exit_status = self.grid.processComputerPartitionList()
        self.assertEqual(exit_status, slapgrid.SLAPGRID_OFFLINE_SUCCESS)
        self.assertInstanceDirectoryListEqual(['0'])

        # buildout hasn't run
        partition_content = os.listdir(instance.partition_path)
        six.assertCountEqual(self, partition_content, [])
        software_content = os.listdir(self.software_root)
        six.assertCountEqual(self, software_content, [instance.software.software_hash])

        # Only the initial request reached the mocked master, and no state
        # was reported back for the partition.
        self.assertEqual(test_computer.sequence, ['/getFullComputerInformation'])
        self.assertEqual(instance.state, None)
def test_one_partition_started_after_master_connection_loss(self):
    """
    A partition started while the mocked master answers normally must have
    its service run again once the master starts answering 503.
    """
    test_computer = self.getTestComputerClass()(self.software_root, self.instance_root)
    instance = test_computer.instance_list[0]
    instance.requested_state = 'started'
    instance.software.setBuildout()

    # Install a service script that prints 'Working' and touches a marker file.
    service_dir = os.path.join(instance.partition_path, 'etc', 'run')
    os.makedirs(service_dir)
    with open(os.path.join(service_dir, 'runner'), 'w') as script:
        script.write("#!/bin/sh\necho 'Working'\ntouch 'runner_worked'")
        os.fchmod(script.fileno(), 0o755)

    marker_path = os.path.join(instance.partition_path, 'runner_worked')

    def assertRunnerWorked():
        # Poll for the marker: up to 50 checks, 0.1s apart, then one final
        # assertion so that a timeout is reported as a test failure.
        attempts_left = 50
        while attempts_left and not os.path.exists(marker_path):
            time.sleep(0.1)
            attempts_left -= 1
        if not attempts_left:
            self.assertTrue(os.path.exists(marker_path))

    # First pass: the master is reachable.
    with httmock.HTTMock(test_computer.request_handler):
        online_status = self.grid.processComputerPartitionList()
        self.assertEqual(online_status, slapgrid.SLAPGRID_SUCCESS)
        self.assertInstanceDirectoryListEqual(['0'])
        assertRunnerWorked()
        six.assertCountEqual(
            self,
            os.listdir(instance.partition_path),
            [
                '.slapgrid',
                '.0_runner.log',
                'buildout.cfg',
                'etc',
                'runner_worked',
                'software_release',
                'worked',
                '.slapos-retention-lock-delay',
            ])
        runner_log_path = os.path.join(instance.partition_path, '.0_runner.log')
        with open(runner_log_path) as log_file:
            log_content = log_file.read()
        self.assertEqual(log_content, 'Working\n')
        self.assertEqual(instance.state, 'started')

    # Second pass: every mocked answer now carries 503 (connection loss).
    test_computer.status_code = 503
    os.unlink(marker_path)
    with httmock.HTTMock(test_computer.request_handler):
        offline_status = self.grid.processComputerPartitionList()
        self.assertEqual(offline_status, slapgrid.SLAPGRID_OFFLINE_SUCCESS)
        self.assertInstanceDirectoryListEqual(['0'])
        assertRunnerWorked()
        with open(runner_log_path) as log_file:
            log_content = log_file.read()
        self.assertEqual(log_content, 'Working\n' * 2)

    expected_sequence = [
        '/getFullComputerInformation',
        '/getComputerPartitionCertificate',
        '/startedComputerPartition',
        '/getComputerPartitionCertificate'  # /getFullComputerInformation is cached
    ]
    self.assertEqual(test_computer.sequence, expected_sequence)
class TestSlapgridCPWithMasterWatchdog(MasterMixin, unittest.TestCase): class TestSlapgridCPWithMasterWatchdog(MasterMixin, unittest.TestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment