#!%(executable)s import logging import os import socket import sys import thread import time sys.path[:] = %(syspath)s from slapos import slap as slapmodule import slapos BASE_PORT = 50000 SLEEPING_MINS = 2 log = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) class Renamer(object): def __init__(self, server_url, key_file, cert_file, computer_guid, partition_id, software_release, namebase): self.server_url = server_url self.key_file = key_file self.cert_file = cert_file self.computer_guid = computer_guid self.partition_id = partition_id self.software_release = software_release self.namebase = namebase def _failover(self): slap = slapmodule.slap() slap.initializeConnection(self.server_url, self.key_file, self.cert_file) computer_partition = slap.registerComputerPartition(computer_guid=self.computer_guid, partition_id=self.partition_id) broken = computer_partition.request(software_release=self.software_release, software_type='frozen', partition_reference=self.namebase+'0') broken.rename('broken-{}'.format(time.strftime("%%d-%%b_%%H:%%M:%%S", time.gmtime()))) broken.stopped() computer_partition.rename(self.namebase+'0') def failover(self): try: log.info('renaming done') except slapos.slap.slap.ServerError: log.info('Internal server error') ## Leader is always number 0 class ResilientInstance(object): def __init__(self, comm, renamer, confpath): self.comm = comm self.id = 0 self.state = 'normal' self.halter = 0 self.inElection = False self.alive = True self.lastPing = time.clock() self.mainCanal = self.comm.canal(['ping', 'halt', 'victory']) self.renamer = renamer self.okCanal = self.comm.canal(['ok']) self.confpath = confpath self.loadConnectionInfos() def loadConnectionInfos(self): file = open(self.confpath, 'r') params = file.read().split('\n') file.close() self.nbComp = len([x.strip("' ") for x in params[0].strip('[],').split(',')]) new_id = int(params[1]) if self.id != new_id: self.halter = new_id self.id = new_id ## Needs to be changed to use the master def aliveManagement(self): while self.alive: log.info('XXX sleeping for %%d minutes' %% SLEEPING_MINS) time.sleep(SLEEPING_MINS*60) if self.id == 0: continue self.comm.send('ping', 0) message, sender = self.okCanal.get() if message: continue self.election() def listen(self): while self.alive: self.comm.recv() def main(self): while self.alive: message, sender = self.mainCanal.get() if message == 'ping': self.comm.send('ok', sender) elif message == 'halt': self.state = 'waitingConfirm' self.halter = sender self.comm.send('ok', sender) elif message == 'victory': if int(sender) == int(self.halter) and self.state == 'waitingConfirm': log.info('{} thinks {} is the leader'.format(self.id, sender)) self.comm.send('ok', sender) self.state = 'normal' def election(self): self.inElection = True self.loadConnectionInfos() #Check if I'm the highest instance alive for higher in range(self.id + 1, self.nbComp): self.comm.send('ping', higher) message, sender = self.okCanal.get() if message: log.info('{} is alive ({})'.format(higher, self.id)) self.inElection = False return False continue if not self.alive: return False #I should be the new coordinator, halt those below me log.info('Should be ME : {}'.format(self.id)) self.state = 'election' self.halter = self.id ups = [] for lower in range(self.id): self.comm.send('halt', lower) message, sender = self.okCanal.get() if message: ups.append(lower) #Broadcast Victory self.state = 'reorganization' for up in ups: self.comm.send('victory', up) message, sender = self.okCanal.get() if message: continue log.info('Something is wrong... let\'s start over') return self.election() self.state = 'normal' self.active = True log.info('{} Is THE LEADER'.format(self.id)) self.renamer.failover() self.inElection = False return True class FilteredCanal(object): def __init__(self, accept, timeout): self.accept = accept self.list = [] self.lock = thread.allocate_lock() self.timeout = timeout def append(self, message, sender): if message in self.accept: self.lock.acquire() self.list.append([message, sender]) self.lock.release() def get(self): start = time.clock() while (time.clock() - start < self.timeout): self.lock.acquire() if self.list: self.lock.release() return self.list.pop(0) self.lock.release() return [None, None] class Wrapper(object): def __init__(self, confpath, timeout=20): self.canals = [] self.ips = [] self.id = 0 self.timeout = timeout self.confpath = confpath self.getConnectionInfos() self.socket = None def getConnectionInfos(self): file = open(self.confpath, 'r') params = file.read().split('\n') file.close() self.ips = [x.strip("' ") for x in params[0].strip('[],').split(',')] self.id = int(params[1]) def start(self): self.getConnectionInfos() self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) self.socket.bind((self.ips[self.id], BASE_PORT + self.id)) self.socket.listen(5) def send(self, message, number): self.getConnectionInfos() try: s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) s.connect((self.ips[number], BASE_PORT + number)) s.send(message + (' {}\n'.format(self.id))) except (socket.error, socket.herror, socket.gaierror, socket.timeout): pass finally: s.close() def canal(self, accept): created = FilteredCanal(accept, self.timeout) self.canals.append(created) return created def recv(self): client, _ = self.socket.accept() client_message = client.recv(1024) if client_message: message, sender = client_message.split() for canal in self.canals: canal.append(message, int(sender)) def main(): renamer = Renamer(server_url = '%(server_url)s', key_file = '%(key_file)s', cert_file = '%(cert_file)s', computer_guid = '%(computer_id)s', partition_id = '%(partition_id)s', software_release = '%(software)s', namebase = '%(namebase)s') confpath = '%(confpath)s' wrapper = Wrapper(confpath=confpath, timeout=20) computer = ResilientInstance(wrapper, renamer=renamer, confpath=confpath) #idle waiting for connection infos while computer.nbComp < 2 : computer.loadConnectionInfos() time.sleep(30) log.info('Starting') computer.comm.start() thread.start_new_thread(computer.listen, ()) thread.start_new_thread(computer.main, ()) thread.start_new_thread(computer.aliveManagement, ()) while True: # XXX tight loop continue if __name__ == '__main__': main()