// Copyright (C) 2018-2020  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// wcfs_watchlink provides WatchLink class that implements message exchange
// over /head/watch on wcfs.

#ifndef _NXD_WCFS_WATCHLINK_H_
#define _NXD_WCFS_WATCHLINK_H_

#include <golang/libgolang.h>
#include <golang/context.h>
#include <golang/cxx.h>
#include <golang/sync.h>
// NOTE(review): header-scope using-directive/declarations leak into every
// translation unit that includes this file. They are kept because the
// declarations below use unqualified names (error, chan, refptr, func,
// dict, set, ...) that presumably come from golang:: / golang::cxx:: —
// confirm before narrowing.
using namespace golang;
using cxx::dict;
using cxx::set;

#include "wcfs.h"
#include "wcfs_misc.h"

// wcfs::
namespace wcfs {

// PinReq is defined further below; it is forward-declared here because
// _WatchLink refers to it by pointer in recvReq()/replyReq().
struct PinReq;


// StreamID identifies one stream among those multiplexed over a WatchLink.
using StreamID = uint64_t;

// rxPkt internally represents data of one message received over WatchLink.
struct rxPkt {
    // stream over which the data was received
    StreamID stream;

    // raw data received/to-be-sent.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
52
    // XXX not e.g. string, as chan<T> currently does not support types with
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
53 54 55
    //     non-trivial copy. Note: we anyway need to limit rx line length to
    //     avoid DoS, but just for DoS the limit would be higher.
    uint16_t datalen;
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
56
    char     data[256 - sizeof(StreamID) - sizeof(uint16_t)];
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
57 58 59 60

    error  from_string(const string& rx);
    string to_string() const;
};
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
61
static_assert(sizeof(rxPkt) == 256, "rxPkt miscompiled"); // NOTE 128 is too low for long error message
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
62 63 64 65


// WatchLink represents /head/watch link opened on wcfs.
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
66
// It is created by WCFS._openwatch().
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
67
//
68
// .sendReq()/.recvReq() provides raw IO in terms of wcfs isolation protocol messages.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
69 70 71
// .close() closes the link.
//
// It is safe to use WatchLink from multiple threads simultaneously.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
72
typedef refptr<class _WatchLink> WatchLink;
73
class _WatchLink : public os::_IAfterFork, object {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
74 75
    WCFS            *_wc;
    os::File        _f;      // head/watch file handle
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
76
    string          _rxbuf;  // buffer for data already read from _f
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
77

78
    // iso.protocol message IO
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
79
    chan<rxPkt>     _acceptq;   // server originated messages go here
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
80
    sync::Mutex     _rxmu;
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
81
    bool            _down;      // y when the link is no-longer operational
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
82
    bool            _rxeof;     // y if EOF was received from server
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
83
    dict<StreamID, chan<rxPkt>>
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
84
                    _rxtab;     // {} stream -> rxq    server replies go via here
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
85 86
    set<StreamID>   _accepted;  // streams we accepted but did not replied yet

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
87
    StreamID        _req_next;  // stream ID for next client-originated request TODO -> atomic
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
88 89 90
    sync::Mutex     _txmu;      // serializes writes
    sync::Once      _txclose1;

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
91
    sync::WorkGroup _serveWG;   // _serveRX is running under _serveWG
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
92
    func<void()>    _serveCancel;
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
93

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
94
    // XXX for tests
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
95
public:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
96
    vector<string>  fatalv; // ad-hoc, racy. TODO rework to send messages to control channel
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
97
    chan<structZ>   rx_eof; // becomes ready when wcfs closes its tx side
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
98

99 100 101 102 103 104
    // don't new - create only via WCFS._openwatch()
private:
    _WatchLink();
    ~_WatchLink();
    friend pair<WatchLink, error> WCFS::_openwatch();
public:
105
    void incref();
106 107
    void decref();

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
108 109
public:
    error close();
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
110
    error closeWrite();
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
111
    pair<string, error> sendReq(context::Context ctx, const string &req);
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
112 113
    error recvReq(context::Context ctx, PinReq *rx_into);
    error replyReq(context::Context ctx, const PinReq *req, const string& reply);
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
114

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
115
    string String() const;
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
116
    int    fd() const;
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
117

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
118
private:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
119
    error _serveRX(context::Context ctx);
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
120 121 122
    tuple<string, error> _readline();
    error _send(StreamID stream, const string &msg);
    error _write(const string &pkt);
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
123
    StreamID _nextReqID();
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
124
    tuple<chan<rxPkt>, error> _sendReq(context::Context ctx, StreamID stream, const string &req);
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
125

126 127
    void afterFork();

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
128
    friend error _twlinkwrite(WatchLink wlink, const string &pkt);
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
129 130 131 132 133 134 135 136
};

// PinReq represents 1 server-initiated wcfs pin request received over /head/watch link.
struct PinReq {
    StreamID    stream; // request was received with this stream ID
    zodb::Oid   foid;   // request is about this file
    int64_t     blk;    // ----//---- about this block
    zodb::Tid   at;     // pin to this at;  TidHead means unpin to head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
137 138

    string      msg;    // XXX raw message for tests (TODO kill)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
139 140 141
};


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
142 143 144
// for testing
error _twlinkwrite(WatchLink wlink, const string &pkt);

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
145 146 147

}   // wcfs::

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
148
#endif