Commit 11ffe1eb authored by Kirill Smelkov

.

parent f6b7df1f
@@ -395,250 +395,6 @@ def unmap(mmap):
     f.mmaps.remove(mmap)
 """
-# WatchLink represents /head/watch link opened on wcfs.
-#
-# .sendReq()/.recvReq() provide raw IO in terms of wcfs invalidation protocol messages.
-# .close() closes the link.
-#
-# It is safe to use WatchLink from multiple threads simultaneously.
-class WatchLink(object):
-    def __init__(wlink, wc):
-        wlink._wc = wc
-
-        # head/watch handle.
-        #
-        # python/stdio locks the file object on read/write, however we need both
-        # read and write to be working simultaneously.
-        # -> use 2 separate file objects for rx and tx.
-        #
-        # fdopen takes ownership of the file descriptor and closes it when the
-        # file object is closed -> dup the fd so that each file object has its own fd.
-        wh  = os.open(wc._path("head/watch"), os.O_RDWR)
-        wh2 = os.dup(wh)
-        wlink._wrx = os.fdopen(wh,  'rb')
-        wlink._wtx = os.fdopen(wh2, 'wb')
-
-        # XXX vvv -> test only?
-        wlink.rx_eof = chan(dtype='C.structZ')  # becomes ready when wcfs closes its tx side
-        wlink.fatalv = []                       # fatal messages received from wcfs
-
-        # inv.protocol message IO
-        wlink._acceptq  = chan()        # (stream, msg)  server-originated messages go here
-        wlink._rxmu     = sync.Mutex()
-        wlink._rxtab    = {}            # stream -> rxq  server replies go via here
-        wlink._accepted = set()         # streams we accepted but have not replied to yet
-
-        wlink._req_next = 1             # stream ID for next client-originated request  XXX -> atomic
-        wlink._txmu     = sync.Mutex()  # serializes writes
-        wlink._txclosed = False
-
-        serveCtx, wlink._serveCancel = context.with_cancel(context.background())
-        wlink._serveWG = sync.WorkGroup(serveCtx)
-        wlink._serveWG.go(wlink._serveRX)
-
-        # this tWatchLink currently watches the following files at a particular state.
-        # XXX test only: back -> tWatchLink ?
-        wlink._watching = {}            # {} foid -> tWatch
-
-    def _closeTX(wlink):
-        # XXX -> sync.Once
-        if wlink._txclosed:
-            return
-        # ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
-        # _serveRX on client (= on us). The connection can be already closed by
-        # wcfs - so ignore errors when sending bye.
-        try:
-            wlink._send(1, b'bye')
-        except IOError:
-            pass
-        wlink._wtx.close()
-        wlink._txclosed = True
-
-    # close closes the link.
-    def close(wlink):
-        wlink._closeTX()
-        wlink._serveCancel()
-        # XXX we can get stuck here if wcfs does not behave as we want.
-        # XXX in particular if there is a silly - e.g. syntax or type - error
-        # in test code, we currently get stuck here.
-        #
-        # XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour?
-        # XXX -> we now have `kill -QUIT` to wcfs.go on test timeout - remove ^^^ comments?
-        try:
-            wlink._serveWG.wait()
-        except Exception as e:
-            # canceled is expected and ok
-            if e != context.canceled:
-                reraise(e, None, e.__traceback__)
-        wlink._wrx.close()
-
-        # disable all established watches
-        # XXX test only -> tWatchLink
-        for w in wlink._watching.values():
-            w.at     = z64
-            w.pinned = {}
-        wlink._watching = {}
-
-    # _serveRX receives messages from ._wrx and dispatches them according to streamID.
-    @func
-    def _serveRX(wlink, ctx):
-        # when finishing - wake up everyone waiting for rx
-        def _():
-            wlink._acceptq.close()
-            with wlink._rxmu:
-                rxtab = wlink._rxtab
-                wlink._rxtab = None     # don't allow new rxtab registers
-            for rxq in rxtab.values():
-                rxq.close()
-        defer(_)
-
-        while 1:
-            # NOTE: .close() makes sure ._wrx.read*() will wake up
-            l = wlink._wrx.readline()
-            print('C: watch : rx: %r' % l)
-            if len(l) == 0:     # peer closed its tx
-                wlink.rx_eof.close()
-                break
-
-            # <stream> ... \n
-            stream, msg = l.split(' ', 1)
-            stream = int(stream)
-            msg = msg.rstrip('\n')
-
-            if stream == 0:     # control/fatal message from wcfs
-                # XXX print -> receive somewhere?  XXX -> recvCtl ?
-                print('C: watch : rx fatal: %r' % msg)
-                wlink.fatalv.append(msg)
-                continue
-
-            reply = bool(stream % 2)
-            if reply:
-                with wlink._rxmu:
-                    assert stream in wlink._rxtab   # XXX !test assert - recheck
-                    rxq = wlink._rxtab.pop(stream)
-                _, _rx = select(
-                    ctx.done().recv,    # 0
-                    (rxq.send, msg),    # 1
-                )
-                if _ == 0:
-                    raise ctx.err()
-            else:
-                with wlink._rxmu:
-                    assert stream not in wlink._accepted    # XXX !test assert - recheck
-                    wlink._accepted.add(stream)
-                _, _rx = select(
-                    ctx.done().recv,                        # 0
-                    (wlink._acceptq.send, (stream, msg)),   # 1
-                )
-                if _ == 0:
-                    raise ctx.err()
-
-    # _send sends a raw message via the specified stream.
-    #
-    # multiple _send calls can run in parallel - _send serializes writes.
-    # XXX +ctx?
-    def _send(wlink, stream, msg):
-        assert '\n' not in msg
-        pkt = b"%d %s\n" % (stream, msg)
-        wlink._write(pkt)
-
-    def _write(wlink, pkt):
-        with wlink._txmu:
-            #print('C: watch : tx: %r' % pkt)
-            wlink._wtx.write(pkt)
-            wlink._wtx.flush()
-
-    # sendReq sends a client -> server request and returns the server reply.
-    def sendReq(wlink, ctx, req):   # -> reply | None when EOF
-        rxq = wlink._sendReq(ctx, req)
-        _, _rx = select(
-            ctx.done().recv,    # 0
-            rxq.recv,           # 1
-        )
-        if _ == 0:
-            raise ctx.err()
-        return _rx
-
-    def _sendReq(wlink, ctx, req):  # -> rxq
-        with wlink._txmu:   # XXX -> atomic (currently uses arbitrary lock)
-            stream = wlink._req_next
-            wlink._req_next = (wlink._req_next + 2) & ((1<<64)-1)
-
-        rxq = chan()    # -> XXX cap=1 so that we don't need to drain if _send fails
-        with wlink._rxmu:
-            assert stream not in wlink._rxtab   # XXX !test assert - recheck
-            wlink._rxtab[stream] = rxq
-        wlink._send(stream, req)
-        return rxq
-
-    # recvReq receives a client <- server request.
-    def recvReq(wlink, ctx):    # -> SrvReq | None when EOF
-        _, _rx = select(
-            ctx.done().recv,        # 0
-            wlink._acceptq.recv,    # 1
-        )
-        if _ == 0:
-            raise ctx.err()
-
-        rx = _rx
-        if rx is None:
-            return rx
-
-        stream, msg = rx
-        return SrvReq(wlink, stream, msg)
-
-# SrvReq represents 1 server-initiated wcfs request received over /head/watch link.
-# XXX struct place -> ^^^ (nearby WatchLink) ?
-class SrvReq(object):
-    def __init__(req, wlink, stream, msg):
-        req.wlink  = wlink
-        req.stream = stream
-        req.msg    = msg
-
-    def reply(req, answer):
-        #print('C: reply %s <- %r ...' % (req, answer))
-        wlink = req.wlink
-        with wlink._rxmu:
-            assert req.stream in wlink._accepted
-
-        wlink._send(req.stream, answer)
-
-        with wlink._rxmu:
-            assert req.stream in wlink._accepted
-            wlink._accepted.remove(req.stream)
-        # XXX also track as answered? (and don't accept with the same ID ?)
-
-    def _parse(req):    # -> (foid, blk, at|None)
-        # pin <foid> #<blk> @(<at>|head)
-        m = re.match(b"pin (?P<foid>[0-9a-f]{16}) #(?P<blk>[0-9]+) @(?P<at>[^ ]+)$", req.msg)
-        if m is None:
-            raise RuntimeError("message is not a valid pin request: %s" % qq(req.msg))
-        foid = fromhex(m.group('foid'))
-        blk  = int(m.group('blk'))
-        at   = m.group('at')
-        if at == "head":
-            at = None
-        else:
-            at = fromhex(at)
-        return foid, blk, at
-
-    @property
-    def foid(req):  return req._parse()[0]
-    @property
-    def blk(req):   return req._parse()[1]
-    @property
-    def at(req):    return req._parse()[2]
 """
 # ---- WCFS raw file access ----
...
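The removed code above fixes the wire format of the watch link: every packet is one line "<stream> <payload>\n"; stream 0 carries control/fatal messages from wcfs, odd streams carry replies to client-originated requests, and even streams carry server-originated requests such as pin. A minimal self-contained sketch of that framing and of the pin request format parsed by SrvReq._parse (decode_pkt and parse_pin are hypothetical helper names, not wcfs API):

    import re
    from binascii import unhexlify

    def decode_pkt(l):  # b"<stream> <payload>\n" -> (stream, payload)
        stream, msg = l.split(b' ', 1)
        return int(stream), msg.rstrip(b'\n')

    def parse_pin(msg): # b"pin <foid> #<blk> @(<at>|head)" -> (foid, blk, at|None)
        m = re.match(rb"pin (?P<foid>[0-9a-f]{16}) #(?P<blk>[0-9]+) @(?P<at>[^ ]+)$", msg)
        if m is None:
            raise RuntimeError("not a valid pin request: %r" % msg)
        at = m.group('at')
        return (unhexlify(m.group('foid')), int(m.group('blk')),
                None if at == b"head" else unhexlify(at))

    stream, msg = decode_pkt(b"2 pin 0000000000000001 #7 @0000000000000004\n")
    assert stream == 2                      # even stream -> server-originated request
    assert parse_pin(msg) == (b'\x00'*7 + b'\x01', 7, b'\x00'*7 + b'\x04')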
@@ -72,10 +72,11 @@ pair<WatchLink, error> WCFS::_openwatch() {
 error _WatchLink::closeWrite() {
     _WatchLink &wlink = *this;
 
     wlink._txclose1.do_([&]() {
-        // ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
+        // ask wcfs to close its tx & rx sides; wcfs.close(tx) wakes up
         // _serveRX on client (= on us). The connection can be already closed
         // by wcfs - so ignore errors when sending bye.
         (void)wlink._send(1, "bye");    // XXX stream ok?
+        // XXX vvv should be ~ shutdown(TX, wlink._f), however shutdown does
+        // not work for non-socket file descriptors. And even if we dup link
+        // fd, and close only one used for TX, peer's RX will still be blocked
@@ -89,6 +90,7 @@ error _WatchLink::closeWrite() {
 // close closes the link.
 error _WatchLink::close() {
     _WatchLink& wlink = *this;
+    // XXX err ctx?
 
     error err = wlink.closeWrite();
     wlink._serveCancel();
@@ -103,11 +105,7 @@ error _WatchLink::close() {
     if (err2 == context::canceled)
         err2 = nil;
 
-    //printf("close -> err =%s\n", (err != nil ? err->Error().c_str() : "nil"));
-    //printf("close -> err =%s\n", v(err));
     error err3 = wlink._f->close();
-    //printf("close -> err2=%s\n", (err != nil ? err->Error().c_str() : "nil"));
 
     if (err == nil)
         err = err2;
     if (err == nil)
@@ -179,7 +177,7 @@ error _WatchLink::_serveRX(context::Context ctx) {  // XXX error -> where ?
         wlink._rxmu.unlock();
         if (!ok) {
             // wcfs sent reply on unexpected stream
-            // XXX log + dowmn.
+            // XXX log + down.
             printf("wcfs sent reply on unexpected stream\n");
             continue;
         }
@@ -309,6 +307,8 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons
 }
 
 // replyReq sends reply to client <- server request received by recvReq.
+//
+// XXX document EOF.
 error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string& answer) {
     _WatchLink *wlink = this;
     // XXX err ctx?
...
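Both the removed Python _closeTX and closeWrite above rely on the same shutdown convention: the watch link is a FUSE file, not a socket, so there is no shutdown(SHUT_WR) to half-close it; instead the client sends "bye" on stream 1 and wcfs closes both of its ends, which wakes the client's reader with EOF. A runnable toy of that handshake, with plain pipes standing in for the link (illustrative only, not wcfs API):

    import os, threading

    c2s_r, c2s_w = os.pipe()    # client -> server half of the link
    s2c_r, s2c_w = os.pipe()    # server -> client half of the link

    def server():               # stands in for wcfs's rx loop on the watch link
        rx = os.fdopen(c2s_r, 'rb')
        while 1:
            l = rx.readline()
            if not l or l.rstrip(b'\n') == b"1 bye":
                break
        os.close(s2c_w)         # peer closes its tx -> client's reader sees EOF
        rx.close()              # ... and its rx

    t = threading.Thread(target=server); t.start()
    tx = os.fdopen(c2s_w, 'wb')
    tx.write(b"1 bye\n"); tx.flush()    # ask the peer to close both of its sides
    tx.close()                          # close our tx half
    rx = os.fdopen(s2c_r, 'rb')
    assert rx.readline() == b''         # EOF: peer closed its tx side
    rx.close(); t.join()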
@@ -66,8 +66,7 @@ static_assert(sizeof(rxPkt) == 256);    // NOTE 128 is too low for long error me
 // .close() closes the link.
 //
 // It is safe to use WatchLink from multiple threads simultaneously.
-class _WatchLink;
-typedef refptr<_WatchLink> WatchLink;
+typedef refptr<class _WatchLink> WatchLink;
 class _WatchLink : public object {
     WCFS *_wc;
     os::File _f;    // head/watch file handle
@@ -85,8 +84,8 @@ class _WatchLink : public object {
     sync::Mutex     _txmu;      // serializes writes
     sync::Once      _txclose1;
+    sync::WorkGroup _serveWG;   // _serveRX is running under _serveWG
     func<void()>    _serveCancel;
-    sync::WorkGroup _serveWG;
 
     // XXX for tests
 public:
...
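The _req_next/_rxtab scheme shown in the removed Python code (and mirrored by the _WatchLink members above) multiplexes many requests over the single link: each client request takes the next odd stream ID and registers a reply queue, and the reader routes each incoming reply to the queue registered for its stream. A sketch with plain threading primitives (class _Mux and its method names are hypothetical):

    import threading, queue

    class _Mux:
        def __init__(mux):
            mux._mu       = threading.Lock()
            mux._req_next = 1           # odd IDs are client-originated
            mux._rxtab    = {}          # stream -> reply queue

        def new_request(mux):           # -> (stream, rxq)
            with mux._mu:
                stream = mux._req_next
                mux._req_next = (mux._req_next + 2) & ((1 << 64) - 1)
                rxq = queue.Queue(maxsize=1)
                mux._rxtab[stream] = rxq
                return stream, rxq

        def dispatch_reply(mux, stream, msg):   # called by the rx reader
            with mux._mu:
                rxq = mux._rxtab.pop(stream)    # one reply per stream
            rxq.put(msg)

    mux = _Mux()
    stream, rxq = mux.new_request()
    mux.dispatch_reply(stream, b"ack")  # normally done by the rx reader thread
    assert stream == 1 and rxq.get() == b"ack"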