// Copyright (C) 2018-2020  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// wcfs_watchlink provides WatchLink class that implements message exchange
// over /head/watch on wcfs.

#ifndef _NXD_WCFS_WATCHLINK_H_
#define _NXD_WCFS_WATCHLINK_H_

#include <golang/libgolang.h>
#include <golang/context.h>
#include <golang/cxx.h>
#include <golang/sync.h>
using namespace golang;
using cxx::dict;
using cxx::set;

#include "wcfs.h"
#include "wcfs_misc.h"

// wcfs::
namespace wcfs {

struct PinReq;


// StreamID stands for ID of a stream multiplexed over WatchLink.
typedef uint64_t StreamID;

// rxPkt internally represents data of one message received over WatchLink.
struct rxPkt {
    // stream over which the data was received
    StreamID stream;

    // raw data received/to-be-sent.
    // XXX not e.g. string, as chan<T> currently does not support types with
    //     non-trivial copy. Note: we anyway need to limit rx line length to
    //     avoid DoS, but just for DoS the limit would be higher.
    uint16_t datalen;
    char     data[256 - sizeof(StreamID) - sizeof(uint16_t)];

    error  from_string(const string& rx);
    string to_string() const;
};
static_assert(sizeof(rxPkt) == 256, "rxPkt miscompiled"); // NOTE 128 is too low for long error message


// WatchLink represents /head/watch link opened on wcfs.
//
// It is created by WCFS._openwatch().
//
// .sendReq()/.recvReq() provides raw IO in terms of wcfs isolation protocol messages.
// .close() closes the link.
//
// It is safe to use WatchLink from multiple threads simultaneously.
typedef refptr<class _WatchLink> WatchLink;
class _WatchLink : public os::_IAfterFork, object {
    WCFS            *_wc;
    os::File        _f;      // head/watch file handle
    string          _rxbuf;  // buffer for data already read from _f

    // iso.protocol message IO
    chan<rxPkt>     _acceptq;   // server originated messages go here
    sync::Mutex     _rxmu;
    bool            _down;      // y when the link is no-longer operational
    bool            _rxeof;     // y if EOF was received from server
    dict<StreamID, chan<rxPkt>>
                    _rxtab;     // {} stream -> rxq    server replies go via here
    set<StreamID>   _accepted;  // streams we accepted but did not replied yet

    StreamID        _req_next;  // stream ID for next client-originated request TODO -> atomic
    sync::Mutex     _txmu;      // serializes writes
    sync::Once      _txclose1;

    sync::WorkGroup _serveWG;   // _serveRX is running under _serveWG
    func<void()>    _serveCancel;

    // XXX for tests
public:
    vector<string>  fatalv; // ad-hoc, racy. TODO rework to send messages to control channel
    chan<structZ>   rx_eof; // becomes ready when wcfs closes its tx side

    // don't new - create only via WCFS._openwatch()
private:
    _WatchLink();
    ~_WatchLink();
    friend pair<WatchLink, error> WCFS::_openwatch();
public:
    void incref();
    void decref();

public:
    error close();
    error closeWrite();
    pair<string, error> sendReq(context::Context ctx, const string &req);
    error recvReq(context::Context ctx, PinReq *rx_into);
    error replyReq(context::Context ctx, const PinReq *req, const string& reply);

    string String() const;
    int    fd() const;

private:
    error _serveRX(context::Context ctx);
    tuple<string, error> _readline();
    error _send(StreamID stream, const string &msg);
    error _write(const string &pkt);
    StreamID _nextReqID();
    tuple<chan<rxPkt>, error> _sendReq(context::Context ctx, StreamID stream, const string &req);

    void afterFork();

    friend error _twlinkwrite(WatchLink wlink, const string &pkt);
};

// PinReq represents 1 server-initiated wcfs pin request received over /head/watch link.
struct PinReq {
    StreamID    stream; // request was received with this stream ID
    zodb::Oid   foid;   // request is about this file
    int64_t     blk;    // ----//---- about this block
    zodb::Tid   at;     // pin to this at;  TidHead means unpin to head

    string      msg;    // XXX raw message for tests (TODO kill)
};


// for testing
error _twlinkwrite(WatchLink wlink, const string &pkt);


}   // wcfs::

#endif