Commit 88f50882 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3cd2dd08
...@@ -162,7 +162,7 @@ error _WatchLink::_serveRX(context::Context ctx) { ...@@ -162,7 +162,7 @@ error _WatchLink::_serveRX(context::Context ctx) {
if (pkt.stream == 0) { // control/fatal message from wcfs if (pkt.stream == 0) { // control/fatal message from wcfs
log::Errorf("C: watch : rx fatal: %s\n", v(l)); log::Errorf("C: watch : rx fatal: %s\n", v(l));
wlink.fatalv.push_back(pkt.to_string()); wlink.fatalv.push_back(pkt.to_string());
continue; // XXX -> shutdown ? return ErrLinkDown; // XXX correct error
} }
bool reply = (pkt.stream % 2 != 0); bool reply = (pkt.stream % 2 != 0);
...@@ -227,8 +227,15 @@ error _WatchLink::recvReq(context::Context ctx, PinReq *prx) { ...@@ -227,8 +227,15 @@ error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
if (_ == 0) if (_ == 0)
return E(ctx->err()); return E(ctx->err());
if (!ok) if (!ok) {
wlink._rxmu.lock();
bool down = wlink._rxdown;
wlink._rxmu.unlock();
if (down)
return E(ErrLinkDown);
return io::EOF_; // NOTE EOF goes without E return io::EOF_; // NOTE EOF goes without E
}
return E(_parsePinReq(prx, &pkt)); return E(_parsePinReq(prx, &pkt));
} }
......
...@@ -49,7 +49,7 @@ struct rxPkt { ...@@ -49,7 +49,7 @@ struct rxPkt {
StreamID stream; StreamID stream;
// raw data received/to-be-sent. // raw data received/to-be-sent.
// XXX not e.g. string as chan<T> currently does not support types with // XXX not e.g. string, as chan<T> currently does not support types with
// non-trivial copy. Note: we anyway need to limit rx line length to // non-trivial copy. Note: we anyway need to limit rx line length to
// avoid DoS, but just for DoS the limit would be higher. // avoid DoS, but just for DoS the limit would be higher.
uint16_t datalen; uint16_t datalen;
...@@ -63,7 +63,7 @@ static_assert(sizeof(rxPkt) == 256, "rxPkt miscompiled"); // NOTE 128 is too l ...@@ -63,7 +63,7 @@ static_assert(sizeof(rxPkt) == 256, "rxPkt miscompiled"); // NOTE 128 is too l
// WatchLink represents /head/watch link opened on wcfs. // WatchLink represents /head/watch link opened on wcfs.
// //
// It is created by WCFS::_openwatch(). // It is created by WCFS._openwatch().
// //
// .sendReq()/.recvReq() provides raw IO in terms of wcfs isolation protocol messages. // .sendReq()/.recvReq() provides raw IO in terms of wcfs isolation protocol messages.
// .close() closes the link. // .close() closes the link.
...@@ -73,12 +73,12 @@ typedef refptr<class _WatchLink> WatchLink; ...@@ -73,12 +73,12 @@ typedef refptr<class _WatchLink> WatchLink;
class _WatchLink : public object { class _WatchLink : public object {
WCFS *_wc; WCFS *_wc;
os::File _f; // head/watch file handle os::File _f; // head/watch file handle
string _rxbuf; // buffer for read data from _f string _rxbuf; // buffer for data already read from _f
// iso.protocol message IO // iso.protocol message IO
chan<rxPkt> _acceptq; // server originated messages go here chan<rxPkt> _acceptq; // server originated messages go here
sync::Mutex _rxmu; sync::Mutex _rxmu;
bool _rxdown; bool _rxdown; // y when the link is no-longer operational
dict<StreamID, chan<rxPkt>> dict<StreamID, chan<rxPkt>>
_rxtab; // {} stream -> rxq server replies go via here _rxtab; // {} stream -> rxq server replies go via here
set<StreamID> _accepted; // streams we accepted but did not replied yet set<StreamID> _accepted; // streams we accepted but did not replied yet
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment