Commit 547b77af authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 36625c28
...@@ -34,7 +34,8 @@ from persistent.timestamp import TimeStamp ...@@ -34,7 +34,8 @@ from persistent.timestamp import TimeStamp
import os, os.path, subprocess, threading import os, os.path, subprocess, threading
from errno import EINVAL from errno import EINVAL
from golang import go, chan, func, defer from golang import go, chan, func, defer, select
from golang import context
from zodbtools.util import ashex as h, fromhex from zodbtools.util import ashex as h, fromhex
from pytest import raises from pytest import raises
from .internal import mm from .internal import mm
...@@ -465,6 +466,7 @@ class tWatch: ...@@ -465,6 +466,7 @@ class tWatch:
# _send sends raw message via specified stream. # _send sends raw message via specified stream.
# #
# multiple _send can be called in parallel - _send serializes writes. # multiple _send can be called in parallel - _send serializes writes.
# XXX +ctx
def _send(t, stream, msg): def _send(t, stream, msg):
assert '\n' not in msg assert '\n' not in msg
with t._txmu: with t._txmu:
...@@ -476,7 +478,7 @@ class tWatch: ...@@ -476,7 +478,7 @@ class tWatch:
# sendReq sends client -> server request and returns server reply. # sendReq sends client -> server request and returns server reply.
# #
# only 1 sendReq must be used at a time. # XXX relax? # only 1 sendReq must be used at a time. # XXX relax?
def sendReq(t, req): def sendReq(t, ctx, req):
stream = 1 stream = 1
rxq = chan() rxq = chan()
...@@ -485,13 +487,27 @@ class tWatch: ...@@ -485,13 +487,27 @@ class tWatch:
t._rxtab[stream] = rxq t._rxtab[stream] = rxq
t._send(stream, req) t._send(stream, req)
return rxq.recv()
_, _rx = select(
ctx.done().recv, # 0
rxq.recv, # 1
)
if _ == 0:
raise ctx.err()
return _rx
# recvReq receives client <- server request. # recvReq receives client <- server request.
# #
# multiple recvReq could be used at a time. # multiple recvReq could be used at a time.
def recvReq(t): # -> tSrvReq | None when EOF def recvReq(t, ctx): # -> tSrvReq | None when EOF
rx = t._acceptq.recv() _, _rx = select(
ctx.done().recv, # 0
t._acceptq.recv, # 1
)
if _ == 0:
raise ctx.err()
rx = _rx
if rx is None: if rx is None:
return rx return rx
...@@ -506,7 +522,7 @@ class tWatch: ...@@ -506,7 +522,7 @@ class tWatch:
# #
# XXX cancel waiting upon receiving "ok" from wcfs (-> error that missed pins were not received) # XXX cancel waiting upon receiving "ok" from wcfs (-> error that missed pins were not received)
# XXX abort on timeout? # XXX abort on timeout?
def expectPin(t, expectv): def expectPin(t, ctx, expectv):
expected = set() # of expected pin messages expected = set() # of expected pin messages
for zf, blk, at in expectv: for zf, blk, at in expectv:
msg = b"pin %s #%d @%s" % (h(zf._p_oid), blk, h(at)) msg = b"pin %s #%d @%s" % (h(zf._p_oid), blk, h(at))
...@@ -515,7 +531,7 @@ class tWatch: ...@@ -515,7 +531,7 @@ class tWatch:
reqv = [] # of received requests reqv = [] # of received requests
while len(expected) > 0: while len(expected) > 0:
req = t.recvReq() req = t.recvReq(ctx)
assert req is not None # channel not closed assert req is not None # channel not closed
assert req.msg in expected assert req.msg in expected
expected.remove(req.msg) expected.remove(req.msg)
...@@ -633,20 +649,23 @@ def test_wcfs(): ...@@ -633,20 +649,23 @@ def test_wcfs():
print('\n\n inv. protocol \n\n') print('\n\n inv. protocol \n\n')
w = t.openwatch() w = t.openwatch()
ctx = context.background() # XXX stub
done = chan() done = chan()
@func @func
def _(): def _():
defer(done.close) defer(done.close)
pinv = w.expectPin([(zf, 2, at1), (zf, 3, at0)]) pinv = w.expectPin(ctx, [(zf, 2, at1), (zf, 3, at0)])
#pinv = w.expectPin({zf: [(2, at1), (3, at0)]}) XXX <- this way better? (sugar) #pinv = w.expectPin(ctx, {zf: [(2, at1), (3, at0)]}) XXX <- this way better? (sugar)
for p in pinv: for p in pinv:
p.reply(b"ack") p.reply(b"ack")
go(_) go(_)
assert w.sendReq(b"watch %s @%s" % (h(zf._p_oid), h(at1))) == "ok" assert w.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1))) == "ok"
done.recv() done.recv()
print('\nCCC\n') print('\nCCC\n')
"""
# checkSetupWatch verifies setting up new watch for zf@at. # checkSetupWatch verifies setting up new watch for zf@at.
def checkSetupWatch(zf, at): def checkSetupWatch(zf, at):
# all changes to zf # all changes to zf
...@@ -659,6 +678,7 @@ def test_wcfs(): ...@@ -659,6 +678,7 @@ def test_wcfs():
w = t.openwatch() w = t.openwatch()
for i in range(len(t.dFtail)): for i in range(len(t.dFtail)):
"""
return return
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment