Commit 728d2e39 authored by Marco Mariani's avatar Marco Mariani

refactor: removed templating over python code, added logging

parent c462fa96
No related merge requests found
......@@ -26,57 +26,51 @@
##############################################################################
from slapos.recipe.librecipe import GenericSlapRecipe
import sys
import os
class Recipe(GenericSlapRecipe):
""" This class provides the installation of the resilience
script on the partition.
"""
""" This class provides the installation of the resilience
script on the partition.
"""
def _install(self):
path_list = []
self_id = int(self.parameter_dict['number'])
ip = self.parameter_dict['ip-list'].split(' ')
print 'Creating bully script with ips : %s\n' % ip
slap_connection = self.buildout['slap-connection']
def _install(self):
path_list = []
confpath = os.path.join(self.options['script'], 'bully.conf')
ip = self.parameter_dict['ip-list'].split(' ')
print 'Creating bully configuration with ips : %s\n' % ip
conf = self.createFile(confpath,
self.substituteTemplate(
self.getTemplateFilename('bully.conf.in'),
{
'self_id': int(self.parameter_dict['number']),
'ip_list': ip
}
))
path_list.append(conf)
slap_connection = self.buildout['slap-connection']
# XXX use the bin directory, do not run automatically yet
wrapper = self.createPythonScript(
name=os.path.join(self.options['bin'], self.parameter_dict['wrapper']),
absolute_function='slapos.recipe.addresiliency.bully.run',
arguments= {
'confpath': confpath,
'server_url': slap_connection['server-url'],
'key_file': slap_connection.get('key-file'),
'cert_file': slap_connection.get('cert-file'),
'computer_id': slap_connection['computer-id'],
'partition_id': slap_connection['partition-id'],
'software': slap_connection['software-release-url'],
'namebase': self.parameter_dict['namebase'],
})
path_list.append(wrapper)
return path_list
path_conf = os.path.join(self.options['script'], 'conf.in')
path_bully = os.path.join(self.options['script'], self.parameter_dict['script'])
path_bully_new = os.path.join(self.options['script'], 'new.py')
# XXX use the bin directory, do not run automatically yet
path_run = os.path.join(self.options['bin'], self.parameter_dict['wrapper'])
print 'paths: %s\n%s\n' % (path_run, path_bully)
bully_conf = dict(self_id=self_id,
ip_list=ip,
executable=sys.executable,
syspath=sys.path,
server_url=slap_connection['server-url'],
key_file=slap_connection.get('key-file'),
cert_file=slap_connection.get('cert-file'),
computer_id=slap_connection['computer-id'],
partition_id=slap_connection['partition-id'],
software=slap_connection['software-release-url'],
namebase=self.parameter_dict['namebase'],
confpath=path_conf)
try:
conf = self.createFile(path_conf,
self.substituteTemplate(
self.getTemplateFilename('conf.in.in'),
bully_conf))
path_list.append(conf)
script = self.createExecutable(path_bully,
self.substituteTemplate(
self.getTemplateFilename('bully.py.in'),
bully_conf))
path_list.append(script)
wrapper = self.createPythonScript(
path_run,
'slapos.recipe.librecipe.execute.execute',
[path_bully])
path_list.append(wrapper)
except IOError:
pass
return path_list
# -*- coding: utf-8 -*-
import logging
import socket
import thread
import time
from slapos import slap as slapmodule
import slapos
BASE_PORT = 50000
SLEEPING_MINS = 2 # XXX was 30, increase after testing
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
class Renamer(object):
def __init__(self, server_url, key_file, cert_file, computer_guid,
partition_id, software_release, namebase):
self.server_url = server_url
self.key_file = key_file
self.cert_file = cert_file
self.computer_guid = computer_guid
self.partition_id = partition_id
self.software_release = software_release
self.namebase = namebase
def _failover(self):
slap = slapmodule.slap()
slap.initializeConnection(self.server_url, self.key_file, self.cert_file)
# partition that will take over
cp = slap.registerComputerPartition(computer_guid=self.computer_guid,
partition_id=self.partition_id)
cp_old_name = self.namebase + '0'
# partition to be deactivated
broken = cp.request(software_release=self.software_release,
software_type='frozen',
partition_reference=cp_old_name)
broken_new_name = 'broken-{}'.format(time.strftime("%d-%b_%H:%M:%S", time.gmtime()))
# XXX how to print the old name
log.debug("Renaming {}: {}".format(broken.getId(), broken_new_name))
broken.rename(broken_new_name)
broken.stopped()
log.debug("Renaming {}: {}".format(cp.getId(), cp_old_name))
cp.rename(cp_old_name)
def failover(self):
try:
self._failover()
log.info('Renaming done')
except slapos.slap.slap.ServerError:
log.info('Internal server error')
## Leader is always number 0
class ResilientInstance(object):
def __init__(self, comm, renamer, confpath):
self.comm = comm
self.id = 0
self.state = 'normal'
self.halter = 0
self.inElection = False
self.alive = True
self.lastPing = time.clock()
self.mainCanal = self.comm.canal(['ping', 'halt', 'victory'])
self.renamer = renamer
self.okCanal = self.comm.canal(['ok'])
self.confpath = confpath
self.loadConnectionInfo()
def loadConnectionInfo(self):
file = open(self.confpath, 'r')
params = file.read().split('\n')
file.close()
self.nbComp = len([x.strip("' ") for x in params[0].strip('[],').split(',')])
new_id = int(params[1])
if self.id != new_id:
self.halter = new_id
self.id = new_id
## Needs to be changed to use the master
def aliveManagement(self):
while self.alive:
log.info('XXX sleeping for %d minutes' % SLEEPING_MINS)
time.sleep(SLEEPING_MINS*60)
if self.id == 0:
continue
self.comm.send('ping', 0)
message, sender = self.okCanal.get()
if message:
continue
self.election()
def listen(self):
while self.alive:
self.comm.recv()
def main(self):
while self.alive:
message, sender = self.mainCanal.get()
if message == 'ping':
self.comm.send('ok', sender)
elif message == 'halt':
self.state = 'waitingConfirm'
self.halter = sender
self.comm.send('ok', sender)
elif message == 'victory':
if int(sender) == int(self.halter) and self.state == 'waitingConfirm':
log.info('{} thinks {} is the leader'.format(self.id, sender))
self.comm.send('ok', sender)
self.state = 'normal'
def election(self):
self.inElection = True
self.loadConnectionInfo()
#Check if I'm the highest instance alive
for higher in range(self.id + 1, self.nbComp):
self.comm.send('ping', higher)
message, sender = self.okCanal.get()
if message:
log.info('{} is alive ({})'.format(higher, self.id))
self.inElection = False
return False
continue
if not self.alive:
return False
#I should be the new coordinator, halt those below me
log.info('Should be ME : {}'.format(self.id))
self.state = 'election'
self.halter = self.id
ups = []
for lower in range(self.id):
self.comm.send('halt', lower)
message, sender = self.okCanal.get()
if message:
ups.append(lower)
#Broadcast Victory
self.state = 'reorganization'
for up in ups:
self.comm.send('victory', up)
message, sender = self.okCanal.get()
if message:
continue
log.info('Something is wrong... let\'s start over')
return self.election()
self.state = 'normal'
self.active = True
log.info('{} Is THE LEADER'.format(self.id))
self.renamer.failover()
self.inElection = False
return True
class FilteredCanal(object):
def __init__(self, accept, timeout):
self.accept = accept
self.list = []
self.lock = thread.allocate_lock()
self.timeout = timeout
def append(self, message, sender):
if message in self.accept:
self.lock.acquire()
self.list.append([message, sender])
self.lock.release()
def get(self):
start = time.clock()
while (time.clock() - start < self.timeout):
self.lock.acquire()
if self.list:
self.lock.release()
return self.list.pop(0)
self.lock.release()
return [None, None]
class Wrapper(object):
def __init__(self, confpath, timeout=20):
self.canals = []
self.ips = []
self.id = 0
self.timeout = timeout
self.confpath = confpath
self.getConnectionInfo()
self.socket = None
def getConnectionInfo(self):
file = open(self.confpath, 'r')
params = file.read().split('\n')
file.close()
self.ips = [x.strip("' ") for x in params[0].strip('[],').split(',')]
self.id = int(params[1])
def start(self):
self.getConnectionInfo()
self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.socket.bind((self.ips[self.id], BASE_PORT + self.id))
self.socket.listen(5)
def send(self, message, number):
self.getConnectionInfo()
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.connect((self.ips[number], BASE_PORT + number))
s.send(message + (' {}\n'.format(self.id)))
except (socket.error, socket.herror, socket.gaierror, socket.timeout):
pass
finally:
s.close()
def canal(self, accept):
created = FilteredCanal(accept, self.timeout)
self.canals.append(created)
return created
def recv(self):
client, _ = self.socket.accept()
client_message = client.recv(1024)
if client_message:
message, sender = client_message.split()
for canal in self.canals:
canal.append(message, int(sender))
def run(args):
confpath = args.pop('confpath')
renamer = Renamer(server_url = args.pop('server_url'),
key_file = args.pop('key_file'),
cert_file = args.pop('cert_file'),
computer_guid = args.pop('computer_id'),
partition_id = args.pop('partition_id'),
software_release = args.pop('software'),
namebase = args.pop('namebase'))
if args:
raise ValueError('Unknown arguments: %s' % ', '.join(args))
wrapper = Wrapper(confpath=confpath, timeout=20)
computer = ResilientInstance(wrapper, renamer=renamer, confpath=confpath)
# idle waiting for connection infos
while computer.nbComp < 2 :
computer.loadConnectionInfo()
time.sleep(30)
log.info('Starting')
computer.comm.start()
thread.start_new_thread(computer.listen, ())
thread.start_new_thread(computer.main, ())
thread.start_new_thread(computer.aliveManagement, ())
while True:
# XXX tight loop
continue
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment