Commit b12ca332 authored by Antoine Catton's avatar Antoine Catton

Implementing simple version of distributed Yingjie's election algorithm.

parent 5f009e97
...@@ -33,6 +33,7 @@ setup(name=name, ...@@ -33,6 +33,7 @@ setup(name=name,
'psutil', # needed for playing with processes in portable way 'psutil', # needed for playing with processes in portable way
'setuptools', # namespaces 'setuptools', # namespaces
'slapos.core', # as it provides library for slap 'slapos.core', # as it provides library for slap
'SQLAlchemy', # needed by wordpress.ia
'xml_marshaller', # needed to dump information 'xml_marshaller', # needed to dump information
], ],
extras_require = { extras_require = {
......
##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import argparse
from datetime import datetime
import httplib
import json
import os
import urllib2
import urlparse
import uuid
from collections import deque
import atomize
from flask import Flask
from flask import abort
from flask import request
from flask import make_response
from sqlalchemy import desc
from sqlalchemy import asc
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from slapos.wordpress.ia.data import Base, Component, Entry, Config, Peer
app = Flask(__name__)
app.config.from_envvar('CONFIG')
DATABASE_PATH = app.config['DATABASE_PATH']
if not os.path.exists(DATABASE_PATH):
open(DATABASE_PATH, 'w').close() # Just a touch
engine = create_engine('sqlite:///%s' % os.path.abspath(DATABASE_PATH))
Base.metadata.create_all(engine)
Session = sessionmaker()
Session.configure(bind=engine)
N_FEED_ENTRY_DEFAULT = 50
N_FEED_ENTRY_MAX = 500
def get_session_id(session):
id_config = session.query(Config).filter(Config.id == 'id').first()
if id_config is None:
app_id = uuid.uuid4().urn
session.add(Config(id='id', value=app_id))
session.commit()
else:
app_id = id_config.value
return app_id
def get_master(session):
app_id = get_session_id(session)
candidate_list = session.query(Peer).filter(Peer.id > app_id)\
.order_by(asc(Peer.id))
for candidate in candidate_list:
try:
candidate_id = uuid.UUID(candidate.id)
id_url = urlparse.urljoin(candidate.url, 'id')
read_id = uuid.UUID(urllib2.urlopen(id_url).read())
if candidate_id == read_id:
return candidate
except SystemExit:
raise
except:
pass
return None # Master is me
@app.route('/getPeers')
def get_peers():
result = make_response(json.dumps(app.config.get('PEERS', [])))
result.headers['Content-Type'] = 'application/json'
return result
@app.route('/refreshMesh')
def refresh_mesh():
global app
session = Session()
session.query(Peer).delete()
seen = deque([get_session_id(session)])
to_see = deque(app.config.get('PEERS', []))
while to_see:
url = to_see.pop()
try:
id_url = urlparse.urljoin(url, 'id')
id_ = uuid.UUID(urllib2.urlopen(id_url).read())
if id_ not in seen:
peer_entry = Peer(id=id_.urn, url=url)
session.add(peer_entry)
peers_list_url = urlparse.urljoin(url, 'getPeers')
peers_url = json.loads(urllib2.urlopen(peers_list_url))
to_see.extend(peers_url)
except SystemExit:
pass
except:
pass
session.commit()
return '', httplib.NO_CONTENT
@app.route("/master/vote")
def vote_for_master():
global app
session = Session()
master = get_master(session)
if master is not None:
id_ = uuid.UUID(master.id).urn
else:
id_ = get_session_id(session)
response = make_response(id_)
response.headers['Content-Type'] = 'text/plain'
return response
@app.route("/master/get")
def get_master_url():
global app
session = Session()
master_id = session.query(Config).filter(Config.id == 'master').first()
if master_id is None:
abort(httplib.NOT_FOUND)
master = session.query(Peer).filter(Peer.id == master_id.value).first()
if master is None:
return '', httplib.NO_CONTENT
else:
response = make_response(master.url)
response.headers['Content-Type'] = 'text/plain'
return response
@app.route("/master/suggest", methods=['POST'])
def sugget_master():
global app
session = Session()
try:
suggested_master = uuid.UUID(request.form['master'])
except ValueError:
abort(httplib.BAD_REQUEST)
master = get_master(session)
if master is None:
master = uuid.UUID(get_session_id(session))
else:
master = uuid.UUID(master.id)
if master != suggested_master:
abort(httplib.CONFLICT)
else:
master_query = session.query(Config).filter(Config.id == 'master')
if master_query.count() > 1:
master_query.delete()
session.add(Config(id='master', value=master.urn))
session.commit()
return '', httplib.NO_CONTENT
@app.route("/master")
def selected_master():
global app
session = Session()
master = session.query(Config).filter(Config.id == 'master').first()
if master is None:
return '', httplib.NO_CONTENT
response = make_response(master)
response.headers['Content-Type'] = 'plain/text'
return response
@app.route("/log/<title>", methods=['GET'])
def get_log_entries(title):
global app
session = Session()
component = session.query(Component).filter(Component.title == title)\
.first()
if component is None:
abort(httplib.NOT_FOUND)
min_date = request.args.get('min-date')
max_date = request.args.get('max-date')
try:
if min_date is not None:
min_date = datetime.fromtimestamp(float(min_date))
if max_date is not None:
max_date = datetime.fromtimestamp(float(max_date))
if min_date > max_date:
raise ValueError
number = int(request.args.get('n', N_FEED_ENTRY_DEFAULT))
except ValueError:
abort(httplib.BAD_REQUEST)
if number > N_FEED_ENTRY_MAX:
abort(httplib.REQUEST_ENTITY_TOO_LARGE)
pattern = request.args.get('pattern', None)
entries = session.query(Entry).filter(Entry.component_id == component.id)
if min_date is not None:
entries = entries.filter(Entry.datetime > min_date)
if max_date is not None:
entries = entries.filter(Entry.datetime < max_date)
if pattern is not None:
entries = entries.filter(Entry.content.like(pattern))
entries.order_by(desc(Entry.datetime))
entries = entries.limit(number)
feed_entries = []
feed_updated = datetime.fromtimestamp(0)
for entry in entries:
kwargs = {}
if entry.content is not None:
kwargs['content'] = atomize.Content(entry.content)
feed_entries.append(atomize.Entry(
title=entry.title,
guid=urlparse.urljoin(request.url, '/%d/%d' % (entry.component.id, entry.id)),
updated=entry.datetime,
author=entry.component.title,
**kwargs
))
feed_updated = max(feed_updated, entry.datetime)
feed = atomize.Feed(
title=component.title,
updated=feed_updated,
guid=request.url,
self_link=request.url,
entries=feed_entries,
author=component.title,
)
return feed.feed_string()
@app.route('/id')
def get_id():
session = Session()
app_id = get_session_id(session)
response = make_response(app_id)
response.headers['Content-Type'] = 'text/plain'
return response
def run():
global app
parser = argparse.ArgumentParser(description="Wordpress Log Server")
parser.add_argument('-d', '--debug', action='store_const',
const=True, default=False,
help="enable debug mode for development.")
parser.add_argument('--host', metavar='HOSTNAME', default='0.0.0.0',
required=False, help="specify hostname to bind on.",
type=str)
parser.add_argument('--port', metavar='PORT', type=int, default=8080,
help="specify port to bind on.")
args = parser.parse_args()
app.run(host=args.host, port=args.port, debug=args.debug)
if __name__ == '__main__':
run()
##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
__all__ = ['Base', 'Component', 'Entry']
Base = declarative_base()
class Component(Base):
__tablename__ = 'components'
id = Column(Integer, primary_key=True)
title = Column(String, nullable=False)
entries = relationship('Entry', backref='component')
class Entry(Base):
__tablename__ = 'entries'
id = Column(Integer, primary_key=True)
component_id = Column(Integer, ForeignKey('components.id',
onupdate="cascade",
ondelete="cascade"),
nullable=False)
datetime = Column(DateTime, nullable=False)
title = Column(String, nullable=False)
content = Column(Text, nullable=True)
class Config(Base):
__tablename__ = 'configuration_values'
id = Column(String, nullable=False, unique=True, primary_key=True)
value = Column(Text, nullable=False)
class Peer(Base):
__tablename__ = 'peer_tree'
id = Column(String, nullable=False, unique=True, primary_key=True)
url = Column(String, nullable=False)
##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import argparse
from collections import deque
import time
import urlparse
import urllib
import httplib
import json
import uuid
class Connector(httplib.HTTPConnection):
def __init__(self, url):
self._url = urlparse.urlparse(url)
hostname = self._url.hostname
if ':' in hostname: # IPv6
hostname = '[%s]' % hostname
httplib.HTTPConnection.__init__(self, hostname,
self._url.port)
self._path_base = self._url.path
self._peers = {}
self._master_url = None
def request(self, method, url, body=None, headers={}):
url = urlparse.urljoin(self._url.path, url)
return httplib.HTTPConnection.request(self, method, url, body, headers)
def GET(self, url, params={}, headers={}):
self.request('GET', urlparse.urljoin(url, urllib.urlencode(params)))
return self.getresponse()
def POST(self, url, data, headers={}):
if isinstance(data, dict):
_headers = headers.copy()
_headers['Content-Type'] = 'application/x-www-form-urlencoded'
_data = urllib.urlencode(data)
self.request('POST', url, body=_data, headers=_headers)
elif isinstance(data, basestring):
self.request('POST', url, body=data, headers=headers)
else:
raise ValueError("data is neither a dict nor a string.")
return self.getresponse()
def geturl(self):
return self._url.geturl()
def __eq__(self, other):
if not isinstance(other, Connector):
raise ValueError, "Connector expected"
return self._url.geturl() == other._url.geturl()
class Server(Connector):
def gather_peers(self):
self._peers = {}
seen = deque()
to_see = deque([Connector(self._url.geturl())])
while to_see:
try:
connector = to_see.popleft()
if connector not in seen:
seen.append(connector)
id_ = connector.GET('id').read()
self.set_peer_id(id_, connector)
to_see.extend([Connector(url)
for url in json.loads(self.GET('getPeers').read())])
# Swallow everything except SIGTERM
except SystemExit:
raise
except:
pass
@staticmethod
def _convert_uuid(id_):
if isinstance(id_, basestring):
id_ = uuid.UUID(id_)
if not isinstance(id_, uuid.UUID):
raise ValueError("id should be an uuid")
return id_
def set_peer_id(self, id_, value):
id_ = Server._convert_uuid(id_)
self._peers[id_] = value
def get_peer_id(self, id_):
id_ = Server._convert_uuid(id_)
if id_ not in self._peers:
self.gather_peers()
return self._peers[id_]
def list_peers(self):
return self._peers.values()
def do_master_election(self):
self.GET('refreshMesh')
id_ = Server._convert_uuid(self.GET('master/vote').read())
self.gather_peers()
try:
peer_list = self.list_peers()
failed_list = []
for peer in peer_list:
response = peer.POST('master/suggest', {'master': id_.urn})
if response.status == httplib.NO_CONTENT:
pass
else:
failed_list.append(peer)
except SystemExit:
raise
except:
pass
return self.get_peer_id(id_).geturl()
def get_master(self):
if self._master_url is None:
get_master = self.GET('master/get')
if get_master.status == httplib.NOT_FOUND:
self._master_url = self.do_master_election()
elif get_master.status == httplib.NO_CONTENT:
self._master_url = self._url.geturl()
else:
try:
self._master_url = get_master.read()
except IndexError:
self._master_url = self.do_master_election()
else:
try:
Server._conver_uuid(Connector(self._master_url).GET('id').read())
except SystemExit:
raise
except:
self._master_url = None
return self.get_master()
return self._master_url
def watch(self):
print repr(self.get_master())
def main():
parser = argparse.ArgumentParser(description="Wordpress Watch Dog")
parser.add_argument('url', metavar='URL', nargs=1, help="Url of one node.")
parser.add_argument('--frequency', '-f', type=int, help="Frequency of watchdog.",
default=60)
args = parser.parse_args()
server = Server(args.url[0])
while True:
time.sleep(args.frequency)
server.watch()
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment