Commit 5e1f40d6 authored by Martín Ferrari's avatar Martín Ferrari

Reorganization of the client/server code; some documentation

parent 380ceed4
...@@ -19,8 +19,8 @@ def set_cleanup_hooks(on_exit = False, on_signals = []): ...@@ -19,8 +19,8 @@ def set_cleanup_hooks(on_exit = False, on_signals = []):
class Node(object): class Node(object):
def __init__(self): def __init__(self):
self.slave_pid, self.slave_fd = spawn_slave() self._slave = netns.protocol.Slave()
self.valid = True self._valid = True
@property @property
def pid(self): def pid(self):
return self.slave_pid return self.slave_pid
...@@ -57,33 +57,4 @@ class Process(object): ...@@ -57,33 +57,4 @@ class Process(object):
self.pid = os.getpid() self.pid = os.getpid()
self.valid = True self.valid = True
import os, socket, sys, traceback, unshare
def spawn_slave():
(s0, s1) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0)
#ppid = os.getpid()
pid = os.fork()
if pid:
helo = s0.recv(4096).rstrip().split(None, 1)
if int(helo[0]) / 100 != 2:
raise RuntimeError("Failed to start slave node: %s" % helo[1])
s1.close()
return (pid, s0)
srv = netns.protocol.Server(s1.fileno())
try:
s0.close()
#unshare.unshare(unshare.CLONE_NEWNET)
except BaseException, e:
srv.abort(str(e))
# Try block just in case...
try:
srv.run()
except:
traceback.print_exc(file = sys.stderr)
os._exit(1)
else:
os._exit(0)
# NOTREACHED
#!/usr/bin/env python #!/usr/bin/env python
# vim:ts=4:sw=4:et:ai:sts=4 # vim:ts=4:sw=4:et:ai:sts=4
# FIXME:
# Not only missing docs; this would be nicer if merged the spawn_slave
# functionality also. need to investigate...
try: try:
from yaml import CLoader as Loader from yaml import CLoader as Loader
from yaml import CDumper as Dumper from yaml import CDumper as Dumper
except ImportError: except ImportError:
from yaml import Loader, Dumper from yaml import Loader, Dumper
import base64, os, passfd, re, signal, socket, sys, unshare, yaml
import base64, os, passfd, re, sys, yaml
import netns.subprocess import netns.subprocess
# ============================================================================
# Server-side protocol implementation
#
# Protocol definition # Protocol definition
# -------------------
# #
# First key: command # First key: command
# Second key: sub-command or None # Second key: sub-command or None
...@@ -86,13 +84,6 @@ class Server(object): ...@@ -86,13 +84,6 @@ class Server(object):
else: else:
self.f = os.fdopen(fd, "r+", 1) self.f = os.fdopen(fd, "r+", 1)
def abort(self, str):
# FIXME: this should be aware of the state of the server
# FIXME: cleanup
self.reply(500, str)
sys.stderr.write("Slave node aborting: %s\n" %str);
os._exit(1)
def reply(self, code, text): def reply(self, code, text):
"Send back a reply to the client; handle multiline messages" "Send back a reply to the client; handle multiline messages"
if not hasattr(text, '__iter__'): if not hasattr(text, '__iter__'):
...@@ -337,18 +328,71 @@ class Server(object): ...@@ -337,18 +328,71 @@ class Server(object):
# def do_ROUT_ADD(self, cmdname, prefix, prefixlen, nexthop, ifnr): # def do_ROUT_ADD(self, cmdname, prefix, prefixlen, nexthop, ifnr):
# def do_ROUT_DEL(self, cmdname, prefix, prefixlen, nexthop, ifnr): # def do_ROUT_DEL(self, cmdname, prefix, prefixlen, nexthop, ifnr):
# ============================================================================
#
# Client-side protocol implementation, and slave process creation
#
# Handle the creation of the child; parent gets (fd, pid), child never returns
def _start_child():
# Create socket pair to communicate
(s0, s1) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0)
# Spawn a child that will run in a loop
pid = os.fork()
if pid:
s1.close()
return (s0, pid)
# ================ try:
s0.close()
srv = Server(s1)
#unshare.unshare(unshare.CLONE_NEWNET)
except BaseException, e:
s = "Slave node aborting: %s\n" % str(e)
try:
# try to pass the error to parent, if possible
s0.write("500 " + e)
except:
pass
sys.stderr.write(e)
os._exit(1)
class Client(object): # Try block just in case...
def __init__(self, fd): try:
srv.run()
except:
os._exit(1)
else:
os._exit(0)
# NOTREACHED
class Slave(object):
"""Class to create and manage slave processes."""
def __init__(self, fd = None, pid = None):
"""When called without arguments, it will fork, create a new network
namespace and enter a loop to serve requests from the master. The
parent process will return an object which is used to control the slave
thru RPC-like calls.
If fd and pid are specified, the slave process is not created; fd is
used as a control socket and pid is assumed to be the pid of the slave
process."""
if fd and pid:
# If fd is passed do not fork or anything
if hasattr(fd, "readline"): if hasattr(fd, "readline"):
self.f = fd pass # fd ok
else: else:
if hasattr(fd, "makefile"): if hasattr(fd, "makefile"):
self.f = fd.makefile(fd, "r+", 1) # line buffered fd = fd.makefile("r+", 1) # line buffered
else: else:
self.f = os.fdopen(fd, "r+", 1) fd = os.fdopen(fd, "r+", 1)
else:
f, pid = _start_child()
fd = f.makefile("r+", 1) # line buffered
self._pid = pid
self._fd = fd
# Wait for slave to send banner
self._read_and_check_reply()
def _send_cmd(self, *args): def _send_cmd(self, *args):
s = " ".join(map(str, args)) + "\n" s = " ".join(map(str, args)) + "\n"
...@@ -373,7 +417,9 @@ class Client(object): ...@@ -373,7 +417,9 @@ class Client(object):
return (int(status), text) return (int(status), text)
def _read_and_check_reply(self, expected = 2): def _read_and_check_reply(self, expected = 2):
"Reads a response and raises an exception if the code is not 2xx." """Reads a response and raises an exception if the first digit of the
code is not the expected value. If expected is not specified, it
defaults to 2."""
code, text = self._read_reply() code, text = self._read_reply()
if code / 100 != expected: if code / 100 != expected:
raise "Error on command: %d %s" % (code, text) raise "Error on command: %d %s" % (code, text)
...@@ -385,6 +431,7 @@ class Client(object): ...@@ -385,6 +431,7 @@ class Client(object):
self._read_and_check_reply() self._read_and_check_reply()
def _send_fd(self, type, fd): def _send_fd(self, type, fd):
"Pass a file descriptor"
self._send_cmd("PROC", type) self._send_cmd("PROC", type)
self._read_and_check_reply(3) self._read_and_check_reply(3)
passfd.sendfd(self.f.fileno(), fd, "PROC " + type) passfd.sendfd(self.f.fileno(), fd, "PROC " + type)
...@@ -392,7 +439,9 @@ class Client(object): ...@@ -392,7 +439,9 @@ class Client(object):
def popen(self, uid, gid, file, argv = None, cwd = None, env = None, def popen(self, uid, gid, file, argv = None, cwd = None, env = None,
stdin = None, stdout = None, stderr = None): stdin = None, stdout = None, stderr = None):
"Start a subprocess in the slave." """Start a subprocess in the slave; the interface resembles
subprocess.Popen, but with less functionality. In particular
stdin/stdout/stderr can only be None or a open file descriptor."""
params = ["PROC", "CRTE", uid, gid, base64.b64encode(file)] params = ["PROC", "CRTE", uid, gid, base64.b64encode(file)]
if argv: if argv:
...@@ -426,6 +475,8 @@ class Client(object): ...@@ -426,6 +475,8 @@ class Client(object):
return pid return pid
def poll(self, pid): def poll(self, pid):
"""Equivalent to Popen.poll(), checks if the process has finished.
Returns the exitcode if finished, None otherwise."""
self._send_cmd("PROC", "POLL", pid) self._send_cmd("PROC", "POLL", pid)
code, text = self._read_reply() code, text = self._read_reply()
if code / 100 == 2: if code / 100 == 2:
...@@ -437,15 +488,20 @@ class Client(object): ...@@ -437,15 +488,20 @@ class Client(object):
raise "Error on command: %d %s" % (code, text) raise "Error on command: %d %s" % (code, text)
def wait(self, pid): def wait(self, pid):
"""Equivalent to Popen.wait(). Waits for the process to finish and
returns the exitcode."""
self._send_cmd("PROC", "WAIT", pid) self._send_cmd("PROC", "WAIT", pid)
text = self._read_and_check_reply() text = self._read_and_check_reply()
exitcode = text.split()[0] exitcode = text.split()[0]
return exitcode return exitcode
def kill(self, pid, signal = None): def kill(self, pid, sig = signal.SIGTERM):
if signal: """Equivalent to Popen.send_signal(). Sends a signal to the child
self._send_cmd("PROC", "KILL", pid, signal) process; signal defaults to SIGTERM."""
if sig:
self._send_cmd("PROC", "KILL", pid, sig)
else: else:
self._send_cmd("PROC", "KILL", pid) self._send_cmd("PROC", "KILL", pid)
text = self._read_and_check_reply() text = self._read_and_check_reply()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment