Commit 6c9abffc authored by Antoine Catton's avatar Antoine Catton

Commit WIP on wordpress watchdog.

parent f0711a83
...@@ -67,6 +67,7 @@ setup(name=name, ...@@ -67,6 +67,7 @@ setup(name=name,
'logfollower = slapos.logfollower:main', 'logfollower = slapos.logfollower:main',
'runafter = slapos.runafter:main', 'runafter = slapos.runafter:main',
'wp-agent = slapos.wordpress.ia.http:main', 'wp-agent = slapos.wordpress.ia.http:main',
'wp-watchdog = slapos.wordpress.ia.watchdog:main',
'wp-logfollower = slapos.wordpress.ia.logfollower:main', 'wp-logfollower = slapos.wordpress.ia.logfollower:main',
] ]
}, },
......
...@@ -25,154 +25,9 @@ ...@@ -25,154 +25,9 @@
# #
############################################################################## ##############################################################################
import argparse import argparse
from collections import deque
import time import time
import urlparse
import urllib
import httplib
import json
import uuid
class Connector(httplib.HTTPConnection):
def __init__(self, url):
self._url = urlparse.urlparse(url)
hostname = self._url.hostname
if ':' in hostname: # IPv6
hostname = '[%s]' % hostname
httplib.HTTPConnection.__init__(self, hostname,
self._url.port)
self._path_base = self._url.path
self._peers = {}
self._master_url = None
def request(self, method, url, body=None, headers={}):
url = urlparse.urljoin(self._url.path, url)
return httplib.HTTPConnection.request(self, method, url, body, headers)
def GET(self, url, params={}, headers={}):
self.request('GET', urlparse.urljoin(url, urllib.urlencode(params)))
return self.getresponse()
def POST(self, url, data, headers={}):
if isinstance(data, dict):
_headers = headers.copy()
_headers['Content-Type'] = 'application/x-www-form-urlencoded'
_data = urllib.urlencode(data)
self.request('POST', url, body=_data, headers=_headers)
elif isinstance(data, basestring):
self.request('POST', url, body=data, headers=headers)
else:
raise ValueError("data is neither a dict nor a string.")
return self.getresponse()
def geturl(self):
return self._url.geturl()
def __eq__(self, other):
if not isinstance(other, Connector):
raise ValueError, "Connector expected"
return self._url.geturl() == other._url.geturl()
class Server(Connector):
def gather_peers(self):
self._peers = {}
seen = deque()
to_see = deque([Connector(self._url.geturl())])
while to_see:
try:
connector = to_see.popleft()
if connector not in seen:
seen.append(connector)
id_ = connector.GET('id').read()
self.set_peer_id(id_, connector)
to_see.extend([Connector(url)
for url in json.loads(self.GET('getPeers').read())])
# Swallow everything except SIGTERM
except SystemExit:
raise
except:
pass
@staticmethod
def _convert_uuid(id_):
if isinstance(id_, basestring):
id_ = uuid.UUID(id_)
if not isinstance(id_, uuid.UUID):
raise ValueError("id should be an uuid")
return id_
def set_peer_id(self, id_, value):
id_ = Server._convert_uuid(id_)
self._peers[id_] = value
def get_peer_id(self, id_):
id_ = Server._convert_uuid(id_)
if id_ not in self._peers:
self.gather_peers()
return self._peers[id_]
def list_peers(self):
return self._peers.values()
def do_master_election(self):
self.GET('refreshMesh')
id_ = Server._convert_uuid(self.GET('master/vote').read())
self.gather_peers()
try:
peer_list = self.list_peers()
failed_list = []
for peer in peer_list:
response = peer.POST('master/suggest', {'master': id_.urn})
if response.status == httplib.NO_CONTENT:
pass
else:
failed_list.append(peer)
except SystemExit:
raise
except:
pass
return self.get_peer_id(id_).geturl()
def get_master(self):
if self._master_url is None:
get_master = self.GET('master/get')
if get_master.status == httplib.NOT_FOUND:
self._master_url = self.do_master_election()
elif get_master.status == httplib.NO_CONTENT:
self._master_url = self._url.geturl()
else:
try:
self._master_url = get_master.read()
except IndexError:
self._master_url = self.do_master_election()
else:
try:
Server._conver_uuid(Connector(self._master_url).GET('id').read())
except SystemExit:
raise
except:
self._master_url = None
return self.get_master()
return self._master_url
def watch(self):
print repr(self.get_master())
from .connection import Server
def main(): def main():
...@@ -180,12 +35,18 @@ def main(): ...@@ -180,12 +35,18 @@ def main():
parser.add_argument('url', metavar='URL', nargs=1, help="Url of one node.") parser.add_argument('url', metavar='URL', nargs=1, help="Url of one node.")
parser.add_argument('--frequency', '-f', type=int, help="Frequency of watchdog.", parser.add_argument('--frequency', '-f', type=int, help="Frequency of watchdog.",
default=60) default=60)
parser.add_argument('--master-url', '-m', help='ViFiB Master Url')
parser.add_argument('--cert-file', '-c', required=True)
parser.add_argument('--key-file', '-k', required=True)
parser.add_argument('--software-release-url', '-s', required=True)
args = parser.parse_args() args = parser.parse_args()
server = Server(args.url[0]) server = Server(args.url[0], args.software_release_url,
args.master_url, args.key_file, args.cert_file)
while True: while True:
time.sleep(args.frequency) time.sleep(args.frequency)
server.watch() server.get_master()
server.think(1)
......
##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from collections import deque
import urlparse
import urllib
import httplib
import json
import uuid
import feedparser
import datetime
import math
import time
from slapos.slap import slap
def get_status(feed_content):
feed = feedparser.parse(feed_content)
# Just for test purpose
# here should be the algorithm saying if we
# should rename or not the partition.
return False
def get_timestamp(minutes_ago):
result = datetime.datetime.now()
result -= datetime.timedelta(minutes=minutes_ago)
return int(math.floor(time.mktime(result.timetuple())))
class Connector(httplib.HTTPConnection):
def __init__(self, url):
self._url = urlparse.urlparse(url)
httplib.HTTPConnection.__init__(self, self._url.hostname,
self._url.port)
self._path_base = self._url.path
self._peers = {}
self._master_url = None
self._computer_id = self.GET('info/computerId').read().strip()
self._partition_id = self.GET('info/partitionId').read().strip()
def request(self, method, url, body=None, headers={}):
url = urlparse.urljoin(self._url.path, url)
return httplib.HTTPConnection.request(self, method, url, body, headers)
def GET(self, url, params={}, headers={}):
self.connect()
self.request('GET', urlparse.urljoin(url, urllib.urlencode(params)))
return self.getresponse()
def POST(self, url, data, headers={}):
if isinstance(data, dict):
_headers = headers.copy()
_headers['Content-Type'] = 'application/x-www-form-urlencoded'
_data = urllib.urlencode(data)
self.request('POST', url, body=_data, headers=_headers)
elif isinstance(data, basestring):
self.request('POST', url, body=data, headers=headers)
else:
raise ValueError("data is neither a dict nor a string.")
return self.getresponse()
def geturl(self):
return self._url.geturl()
def __eq__(self, other):
if not isinstance(other, Connector):
return NotImplemented
return self._url.geturl() == other._url.geturl()
class Server(Connector):
def __init__(self, url, software_release,
master_url, key_file=None, cert_file=None):
Connector.__init__(self, url)
self._slapos_master_url = master_url
self._key_file = key_file
self._cert_file = cert_file
self._software_release = software_release
def gather_peers(self):
self._peers = {}
seen = deque()
to_see = deque([Connector(self._url.geturl())])
while to_see:
try:
connector = to_see.popleft()
if connector not in seen:
seen.append(connector)
id_ = connector.GET('id').read()
self.set_peer_id(id_, connector)
to_see.extend([Connector(url)
for url in json.loads(connector.GET('getPeers').read())])
except:
pass
# For debug purpose only.
# Need to be remove in the future in order to
# swallow everything
raise
@staticmethod
def _convert_uuid(id_):
if isinstance(id_, basestring):
id_ = uuid.UUID(str(id_))
if not isinstance(id_, uuid.UUID):
raise ValueError("id should be an uuid")
return id_
def set_peer_id(self, id_, value):
id_ = Server._convert_uuid(id_)
type_ = None
try:
type_ = value.GET('info/type').read()
except:
pass
# For debug purpose only.
# Need to be remove in the future in order to
# swallow everything
raise
else:
to_set_none = deque()
# Look for peer having same type (type should be unique)
for peer_id, peer_data in self._peers.iteritems():
peer_type = peer_data[0]
if peer_type == type_:
to_set_none.append(peer_id)
# Set type None for those peers
for peer_id in to_set_none:
self._peers[peer_id] = (None, self._peers[peer_id][1],)
self._peers[id_] = (type_, value,)
def get_peer_id(self, id_):
id_ = Server._convert_uuid(id_)
if id_ not in self._peers:
self.gather_peers()
return self._peers[id_][1]
def list_peers(self):
return (p[1] for p in self._peers.values())
def do_master_election(self):
self.GET('refreshMesh').read()
id_ = Server._convert_uuid(self.GET('master/vote').read())
self.gather_peers()
try:
peer_list = self.list_peers()
failed_list = []
for peer in peer_list:
response = peer.POST('master/suggest', {'master': id_.urn})
if response.status == httplib.NO_CONTENT:
pass
else:
failed_list.append(peer)
except:
pass
# For debug purpose only.
# Need to be remove in the future in order to
# swallow everything
raise
return self.get_peer_id(id_).geturl()
def get_master(self):
if self._master_url is None:
get_master = self.GET('master/get')
master_url = get_master.read()
if get_master.status == httplib.NOT_FOUND:
self._master_url = self.do_master_election()
elif get_master.status == httplib.NO_CONTENT:
self._master_url = self._url.geturl()
else:
try:
self._master_url = master_url
except IndexError:
self._master_url = self.do_master_election()
else:
try:
Server._conver_uuid(Connector(self._master_url).GET('id').read())
except:
self._master_url = None
return self.get_master()
return self._master_url
def think(self, threshold):
"""Main function of watchdog behavior.
"""
self.gather_peers()
for peer in self.list_peers():
try:
log_list_request = peer.GET('logList')
if log_list_request.status == httplib.NOT_FOUND:
log_list_request.read()
continue
log_list = json.loads(log_list_request.read())
for item in log_list:
status = get_status(
peer.GET('status/%s?min-date=%d' % (peer, get_timestamp(threshold), )
).read()
)
if not status:
if item == 'mariadb':
self.switch_to_mariadb_backup()
else:
pass
except:
pass
# For debug purpose only.
# Need to be remove in the future in order to
# swallow everything
raise
def get_type(self, type_):
for peer_type, peer in self._peers.itervalues():
if peer_type == type_:
return peer
raise KeyError("%s not found" % type_)
def switch_to_mariadb_backup(self):
mariadb_backup = self.get_type('mariadb-backup')
mariadb = self.get_type('mariadb')
self.down(mariadb)
self.rename(mariadb_backup, 'MariaDB')
self.bang()
def down(self, connector):
new_name = 'down_%s' % uuid.uuid4().hex
self.rename(connector, new_name)
self.slaprequest(partition_reference=new_name,
software_type='down')
def rename(self, connector, new_name):
partition = self.get_partition(connector)
partition.rename(new_name)
def get_partition(self, connector):
return self._register_cp(connector._computer_id,
connector._partition_id)
def slaprequest(self, *args, **kwargs):
partition = self._register_cp(self._computer_id,
self._partition_id)
return partition.request(self._software_release,
*args, **kwargs)
def bang(self, *args, **kwargs):
partition = self._register_cp(self._computer_id,
self._partition_id)
return partition.bang(self._software_release,
*args, **kwargs)
def _register_cp(self, computer_id, partition_id):
connection = slap()
connection.initializeConnection(
self._slapos_master_url,
self._key_file,
self._cert_file,
)
return connection.registerComputerPartition(computer_id,
partition_id)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment