// Copyright (C) 2018-2019  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

package zodb
// application-level database handle.

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"

	"lab.nexedi.com/kirr/go123/xerr"
	"lab.nexedi.com/kirr/neo/go/transaction"
)

// DB represents a handle to a database at the application level and contains
// a pool of connections. DB.Open opens a database connection. The connection
// will be automatically put back into the DB pool for future reuse after the
// corresponding transaction is complete. DB thus provides the service of
// maintaining a live object cache and reusing live objects from transaction
// to transaction.
//
// Note that it is possible to have several DB handles to the same database.
// This might be useful if the application accesses distinctly different sets
// of objects in different transactions and knows beforehand which set it will
// be working with next time. Then, to avoid huge cache misses, it makes sense
// to keep a DB handle opened for every such access pattern.
//
// DB is safe to access from multiple goroutines simultaneously.
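//
// A minimal usage sketch (error handling omitted; Connection.Get and
// transaction.New/Commit are assumed APIs of the accompanying persistency
// and transaction layers):
//
//	db := zodb.NewDB(stor)
//	defer db.Close()
//
//	txn, ctx := transaction.New(context.Background())
//	conn, _ := db.Open(ctx, &zodb.ConnOptions{}) // open connection viewing current head
//	obj, _ := conn.Get(ctx, oid)                 // load a live object by oid
//	// ... use obj under txn ...
//	txn.Commit(ctx)                              // conn is put back into db pool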
type DB struct {
	stor   IStorage
	watchq chan Event // we watch .stor via this channel

	down     chan struct{} // closed when DB is no longer operational
	downOnce sync.Once     // shutdown may be caused by either Close or an IO error in watcher
	downErr  error         // reason for shutdown

	mu sync.Mutex

	// pool of unused connections.
	//
	// On open(at) the live cache is reused by finding a conn with nearby
	// .at and invalidating its live objects based on δtail info.
	//
	// not all connections here have δtail coverage.
	pool []*Connection // order by ↑= .at

	// δtail of database changes.
	//
	// Used for live cache invalidations on open with at close to current
	// storage head. δtail coverage is maintained based on the following:
	//
	// 1) if open(at) is _far_away_ from head - it is _unlikely_ for
	//    opened connection to be later propagated towards head.
	//
	// 2) if open(at) is _close_      to head - it is _possible_ for
	//    opened connection to be later propagated towards head.
	//
	// For "1" we don't need δtail coverage; for "2" the probability that
	// it would make sense to advance the connection decreases the longer
	// the connection stays opened. Thus the following 2 factors affect
	// whether it makes sense to keep δtail coverage for a connection:
	//
	//         |at - δhead(when_open)|	ΔTnext		 - avg. time between transactions
	// heady = ──────────────────────	at     		 - connection opened for this state
	//                ΔTnext		δhead(when_open) - δtail.Head when connection was opened
	//					Twork(conn)	 - time the connection is used
	//         Twork(conn)
	// lwork = ───────────
	//           ΔTnext
	//
	// if heady >> 1 - it is case "1" and δtail coverage is not needed.
	// if heady  ~ 1 - it is case "2" and δtail coverage might be needed depending on lwork.
	// if lwork >> 1 - the number of objects that will need to be invalidated
	//		   when updating conn to current head grows to ~ 100% of
	//		   connection's live cache. It thus does not make
	//		   sense to keep δtail past some reasonable time.
	//
	// A good system would monitor both ΔTnext, and lwork for connections
	// with small heady, and adjust δtail cut time as e.g.
	//
	//	timelen(δtail) = 3·lwork·ΔTnext
	//
	//
	// FIXME for now we just fix
	//
	//	Tδkeep = 10min
	//	δkeep  = 10
	//
	// and keep δtail coverage for Tδkeep time, while ensuring that we keep
	// at least δkeep entries so as not to lose the cache on very seldom commits:
	//
	//	timelen(δtail) = Tδkeep
	//	len(δtail)     ≥ δkeep
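	//
	// Illustrative numbers for the heuristic above (not measurements):
	// with ΔTnext ≈ 1s a connection opened for a state ~1 hour behind
	// head has heady ≈ 3600 ≫ 1 (case "1", δtail coverage not needed),
	// while one opened ~2s behind head has heady ≈ 2 (case "2"). If the
	// latter connection then stays in use for ~30s, lwork ≈ 30 and the
	// adaptive rule above would give timelen(δtail) = 3·30·1s ≈ 90s,
	// i.e. δtail entries much older than that bring it little value.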
	δtail  *ΔTail        // [](rev↑, []oid)
	tδkeep time.Duration
	δkeep  int

	// waiters for δtail.Head to become ≥ their at.
	hwait map[hwaiter]struct{} // set{(at, ready)}

	// XXX δtail/hwait -> Storage or -> Cache?
	// (so it is not duplicated many times for many DB case)
}


// NewDB creates a new database handle.
//
// The created database handle must be closed when no longer needed.
func NewDB(stor IStorage) *DB {
	// XXX db options?
	db := &DB{
		stor:   stor,
		watchq: make(chan Event),
		down:   make(chan struct{}),
		hwait:  make(map[hwaiter]struct{}),

		tδkeep: 10*time.Minute, // see δtail discussion
		δkeep:  10,             // ---- // ----
	}

	at0 := stor.AddWatch(db.watchq)
	db.δtail = NewΔTail(at0) // init to (at0, at0]
	go db.watcher()

	return db
}

// shutdown marks db as no longer operational due to reason.
//
// It serves both explicit Close and shutdown triggered by an error received
// by the watcher. Only the first shutdown call has an effect.
func (db *DB) shutdown(reason error) {
	db.downOnce.Do(func() {
		db.downErr = reason
		close(db.down)

		db.stor.DelWatch(db.watchq)
	})
}

// Close closes database handle.
//
// After Close, DB.Open calls will return an error. However it is ok to
// continue using connections opened prior to Close.
func (db *DB) Close() error {
	db.shutdown(fmt.Errorf("db is closed"))
	return nil
}


// ConnOptions describes options to DB.Open .
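//
// For example (a sketch; at is a placeholder revision):
//
//	db.Open(ctx, &ConnOptions{})                     // view of the current head, synced with storage
//	db.Open(ctx, &ConnOptions{At: at})               // view of the database @at
//	db.Open(ctx, &ConnOptions{At: at, NoPool: true}) // view @at; connection is not returned to the pool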
type ConnOptions struct {
	At     Tid  // if !0, open Connection bound to the `at` view of the database, not the latest one.
	NoSync bool // don't sync with storage to get its last tid.

	// don't put connection back into DB pool after transaction ends.
	//
	// This is a low-level option that allows one to inspect/use the
	// connection's LiveCache after the transaction finishes, and to
	// manually resync the connection onto another database view. See
	// Connection.Resync for details.
	NoPool bool
}

// String represents connection options in human-readable form.
//
// For example:
//
//	(@head, sync)
func (opt *ConnOptions) String() string {
	s := "(@"
	if opt.At != 0 {
		s += opt.At.String()
	} else {
		s += "head"
	}

	s += ", "
	if opt.NoSync {
		s += "!"
	}
	s += "sync"

	s += ", "
	if opt.NoPool {
		s += "!"
	}
	s += "pool"

	s += ")"
	return s
}

// watcher receives events about new committed transactions and updates δtail.
//
// It also notifies δtail waiters.
//
// The watcher stops when it sees either the storage being closed or an error.
// The DB is shut down on exit.
func (db *DB) watcher() (err error) {
	defer func() {
		//fmt.Printf("db: watcher: exit: %s\n", err)
		xerr.Contextf(&err, "db: watcher")
		db.shutdown(err)
	}()

	var event Event
	var ok bool

	for {
		select {
		case <-db.down:
			// db is already shut down with a concrete reason
			return fmt.Errorf("db is down")

		case event, ok = <-db.watchq:
			if !ok {
				return fmt.Errorf("storage closed")
			}
		}

		//fmt.Printf("db: watcher <- %v\n", event)

		var δ *EventCommit
		switch event := event.(type) {
		default:
			panic(fmt.Sprintf("unexpected event: %T", event))

		case *EventError:
			return fmt.Errorf("error: %s", event.Err)

		case *EventCommit:
			δ = event
		}

		var readyv []chan struct{} // waiters that become ready

		db.mu.Lock()

		db.δtail.Append(δ.Tid, δ.Changev)
		for w := range db.hwait {
			if w.at <= δ.Tid {
				readyv = append(readyv, w.ready)
				delete(db.hwait, w)
			}
		}

		// forget older δtail entries
		tcut := db.δtail.Head().Time().Add(-db.tδkeep)
		δcut := TidFromTime(tcut) // cut by Tδkeep rule
		//fmt.Printf("db: watcher: δtail: =  (%s, %s]\n", db.δtail.Tail(), db.δtail.Head())
		//fmt.Printf("db: watcher: forget <=  %s\n", δcut)

		// take db.δkeep into account, so that we preserve len(δtail) ≥ δkeep
		δtail := db.δtail.Data()
		rcut := δcut // cut by δkeep rule
		if l := len(δtail); l > db.δkeep {
			rcut = δtail[l-db.δkeep-1].Rev
			rcut -= 1 // ForgetPast forgets by ≤
		} else {
			rcut = 0 // keep everything
		}

		if rcut < δcut {
			//fmt.Printf("db: watcher: forget %s -> %s (seldom commits)\n", δcut, rcut)
			δcut = rcut
		}

		db.δtail.ForgetPast(δcut)
		//fmt.Printf("db: watcher: δtail: -> (%s, %s]\n", db.δtail.Tail(), db.δtail.Head())

		db.mu.Unlock()

		// wakeup waiters outside of db.mu
		for _, ready := range readyv {
			//fmt.Printf("db: watcher: wakeup %v\n", ready)
			close(ready)
		}
	}
}

// hwaiter represents someone waiting for δtail.Head to become ≥ at.
type hwaiter struct {
	at    Tid
	ready chan struct{}
}

// headWait waits till db.Head becomes ≥ at.
//
// It returns an error if either db is down or ctx is canceled.
//
// Must be called with db.mu released.
func (db *DB) headWait(ctx context.Context, at Tid) (err error) {
	defer xerr.Contextf(&err, "wait head ≥ %s", at)

	// precheck if db is already down -> error even if at is under coverage
	if ready(db.down) {
		return db.downErr
	}

	db.mu.Lock()

	// we already have the coverage
	if at <= db.δtail.Head() {
		db.mu.Unlock()
		return nil
	}

	// we have some δtail coverage, but at is ahead of that.
	// wait till δtail.head is up to date covering ≥ at.
	δready := make(chan struct{})
	db.hwait[hwaiter{at, δready}] = struct{}{}
	db.mu.Unlock()

	select {
	case <-δready:
		// ok - δtail.head went over at
		return nil

	case <-ctx.Done():
		return ctx.Err()

	case <-db.down:
		return db.downErr
	}
}


// Open opens a new connection to the database.
//
// By default the connection is opened to the current latest database state;
// opt.At can be specified to open a connection bound to a particular view of
// the database.
//
// Open must be called under transaction.
// Opened connection must be used only under the same transaction and only
// until that transaction is complete(*).
//
// (*) unless NoPool option is used.
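//
// For example (a sketch; transaction.New/Commit are assumed APIs of the
// transaction package imported by this file; error handling omitted):
//
//	txn, ctx := transaction.New(context.Background())
//	conn, _ := db.Open(ctx, &ConnOptions{At: at}) // connection viewing the database @at
//	// ... use conn's objects under txn only ...
//	txn.Commit(ctx) // afterwards conn must not be used (unless NoPool was set)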
func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err error) {
	defer func() {
		if err == nil {
			return
		}

		err = &OpError{
			URL:  db.stor.URL(),
			Op:   "open db",
			Args: opt,
			Err:  err,
		}
	}()

	// don't bother to sync to storage if db is down
	if ready(db.down) {
		return nil, db.downErr
	}

	// find out db state we should open at
	at := opt.At
	if at == 0 {
		if opt.NoSync {
			db.mu.Lock()
			at = db.δtail.Head()
			db.mu.Unlock()
		} else {
			// sync storage for lastTid
			var err error

			// XXX stor.LastTid returns the last_tid the storage
			// itself has seen, not the current last_tid on the server.
			// -> add stor.Sync() ?
			at, err = db.stor.LastTid(ctx)
			if err != nil {
				return nil, err
			}
		}
	}

	// wait for db.Head ≥ at
	err = db.headWait(ctx, at)
	if err != nil {
		return nil, err
	}

	// open(at)
	conn := db.open(at, opt.NoPool)
	conn.resync(ctx, at)
	return conn, nil
}

// open is the internal worker for Open.
//
// It returns either a new connection, or a connection from the pool.
// The returned connection does not generally have .at=at, and has to go through .resync().
//
// Must be called with at ≤ db.Head .
// Must be called with db.mu released.
func (db *DB) open(at Tid, noPool bool) *Connection {
	db.mu.Lock()
	defer db.mu.Unlock()

	δtail := db.δtail

	//fmt.Printf("db.open @%s nopool=%v\t; δtail (%s, %s]\n", at, noPool, δtail.Tail(), δtail.Head())

	// at should be ≤ head (caller waited for it before invoking us)
	if head := δtail.Head(); at > head {
		panic(fmt.Sprintf("open: at (%s) > head (%s)", at, head))
	}

	// NoPool connection - create one anew
	if noPool {
		conn := newConnection(db, at)
		conn.noPool = true
		return conn
	}

	// check if we already have an exact match
	conn := db.get(at, at)
	if conn != nil {
		return conn
	}

	// no exact match - let's try to find nearest

	// too far in the past, and we know there is no exact match
	// -> new historic connection.
	if at <= δtail.Tail() {
		return newConnection(db, at)
	}

	// at ∈ (δtail, δhead]	; try to get nearby idle connection or make a new one
	//
	// note: we are ok to get a conn with .at = δtail.Tail inclusive, because
	// we need only later transactions to invalidate the conn cache, and data
	// about later transactions is present in δtail.
	conn = db.get(δtail.Tail(), at)
	if conn == nil {
		conn = newConnection(db, at)
	}
	return conn
}

// Resync resyncs the connection onto a different database view and transaction.
//
// Connection's objects pinned in live cache are guaranteed to stay in live
// cache, even if maybe in ghost state (e.g. if they have to be invalidated due
// to database changes).
//
// Resync can be used many times.
//
// Resync must be used only under the following conditions:
//
//	- the connection was initially opened with NoPool flag;
//	- the previous transaction, under which the connection was
//	  opened/resynced, must be already complete;
//	- contrary to DB.Open, at cannot be 0.
//
// Note: the new at can be either higher or lower than the connection's previous at.
//
// Note: if the new at is already covered by DB.Head, Resync is a non-blocking
// operation. However, if at > current DB.Head, Resync, similarly to DB.Open,
// will block waiting for DB.Head to become ≥ at.
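//
// A usage sketch (error handling omitted; transaction.New/Commit are assumed
// APIs of the transaction package; at1/at2 are placeholder revisions):
//
//	txn1, ctx1 := transaction.New(context.Background())
//	conn, _ := db.Open(ctx1, &ConnOptions{At: at1, NoPool: true})
//	// ... use conn under txn1 ...
//	txn1.Commit(ctx1) // conn is not put back into the pool
//
//	txn2, ctx2 := transaction.New(context.Background())
//	conn.Resync(ctx2, at2) // reuse conn's live cache for the @at2 view
//	// ... use conn under txn2 ...
//	txn2.Commit(ctx2)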
func (conn *Connection) Resync(ctx context.Context, at Tid) (err error) {
	if !conn.noPool {
		panic("Conn.Resync: connection was opened without NoPool flag")
	}
	if at == 0 {
		panic("Conn.Resync: resync to at=0 (auto-mode is valid only for DB.Open)")
	}

	defer xerr.Contextf(&err, "resync @%s -> @%s", conn.at, at)

	// wait for db.Head ≥ at
	err = conn.db.headWait(ctx, at)
	if err != nil {
		return err
	}

	conn.resync(ctx, at)
	return nil
}

// resync serves Connection.Resync and DB.Open .
//
// Must be called with at ≤ conn.db.Head .
// Must be called with conn.db.mu released.
func (conn *Connection) resync(ctx context.Context, at Tid) {
	txn := transaction.Current(ctx)
	conn.resync1(at)

	// upon exit, with all locks released, register conn to txn.
	conn.at = at
	conn.txn = txn
	txn.RegisterSync((*connTxnSync)(conn))
}

// resync1 serves resync.
//
// It computes δ(conn.at, at) and invalidates objects ∈ δ in the conn cache.
func (conn *Connection) resync1(at Tid) {
	if conn.txn != nil {
		panic("Conn.resync: previous transaction is not yet complete")
	}

	db := conn.db
	db.mu.Lock()

	// at should be ≤ head (caller waited for it before invoking us)
	if head := db.δtail.Head(); at > head {
		db.mu.Unlock()
		panic(fmt.Sprintf("resync: at (%s) > head (%s)", at, head))
	}


	// conn.at == at - nothing to do (even if out of δtail coverage)
	if conn.at == at {
		db.mu.Unlock()
		return
	}

	// conn.at != at - have to invalidate objects in live cache.
	δtail := db.δtail
	δobj  := make(map[Oid]struct{}) // set(oid) - what to invalidate
	δall  := false                  // if we have to invalidate all objects

	// both conn.at and at are covered by δtail - we can invalidate selectively
	if (δtail.Tail() < conn.at && conn.at <= δtail.Head()) &&
	   (δtail.Tail() <      at &&      at <= δtail.Head()) {
		var δv []ΔRevEntry
		if conn.at <= at {
			δv = δtail.SliceByRev(conn.at, at)
		} else {
			// at < conn.at
			δv = δtail.SliceByRev(at-1, conn.at-1)
		}

		for _, δ := range δv {
			for _, oid := range δ.Changev {
				δobj[oid] = struct{}{}
			}
		}

	// either conn.at or at is outside δtail coverage - invalidate all
	// objects, but keep the objects present in the live cache.
	} else {
		δall = true
	}

	// unlock db before locking cache and txn
	db.mu.Unlock()

	conn.cache.Lock()
	defer conn.cache.Unlock()

	if δall {
		// XXX keep synced with LiveCache details
		// XXX -> conn.cache.forEach?
		for _, wobj := range conn.cache.objtab {
			obj, _ := wobj.Get().(IPersistent)
			if obj != nil {
				obj.PInvalidate()
			}
		}
	} else {
		for oid := range δobj {
			obj := conn.cache.Get(oid)
			if obj != nil {
				obj.PInvalidate()
			}
		}
	}

	// all done
	return
}

// get returns a connection from the db pool that is closest to at, with conn.at ∈ [atMin, at].
//
// If there is no such connection in the pool - nil is returned.
// Must be called with db.mu locked.
//
// Note: atMin is inclusive, because even if we get a conn with .at = δtail.Tail,
// we can still use δtail data to invalidate the conn cache with followup transactions.
func (db *DB) get(atMin, at Tid) *Connection {
	l := len(db.pool)

	// find pool index corresponding to at:
	// [i-1].at ≤ at < [i].at
	i := sort.Search(l, func(i int) bool {
		return at < db.pool[i].at
	})

	//fmt.Printf("pool:\n")
	//for i := 0; i < l; i++ {
	//	fmt.Printf("\t[%d]:  .at = %s\n", i, db.pool[i].at)
	//}
	//fmt.Printf("get  [%s, %s] -> %d\n", atMin, at, i)

	// search through a window of X previous connections and find the one
	// with minimal distance to get to state @at that fits into the requested range.
	//
	// XXX search not only previous, but future too? (we can get back to
	// the past by invalidating what was later changed) (but it will likely
	// hurt by destroying the cache of a more recent connection).
	const X = 10 // XXX search window size: hardcoded
	jδmin := -1
	for j := i - X; j < i; j++ {
		if j < 0 {
			continue
		}
		if db.pool[j].at < atMin {
			continue
		}

		// TODO search for max N(live) - N(live, that will need to be invalidated)
		jδmin = j // XXX stub (using rightmost j)
	}

	// nothing found
	if jδmin < 0 {
		return nil
	}

	// found - reuse the connection
	conn := db.pool[jδmin]
	copy(db.pool[jδmin:], db.pool[jδmin+1:])
	db.pool[l-1] = nil
	db.pool = db.pool[:l-1]

	if conn.db != db {
		panic("DB.get: foreign connection in the pool")
	}
	if conn.txn != nil {
		panic("DB.get: live connection in the pool")
	}

	return conn
}

// put puts a connection back into the db pool.
func (db *DB) put(conn *Connection) {
	if conn.db != db {
		panic("DB.put: conn.db != db")
	}

	db.mu.Lock()
	defer db.mu.Unlock()

	// XXX check if len(pool) > X, and drop conn if yes

	// [i-1].at ≤ at < [i].at
	i := sort.Search(len(db.pool), func(i int) bool {
		return conn.at < db.pool[i].at
	})

	//db.pool = append(db.pool[:i], conn, db.pool[i:]...)
	db.pool = append(db.pool, nil)
	copy(db.pool[i+1:], db.pool[i:])
	db.pool[i] = conn

	// TODO GC too idle connections here
}

// ---- txn sync ----

type connTxnSync Connection // hide from public API

func (csync *connTxnSync) BeforeCompletion(txn transaction.Transaction) {
	conn := (*Connection)(csync)
	conn.checkTxn(txn, "BeforeCompletion")
	// nothing
}

// AfterCompletion puts conn back into the db pool after the transaction is complete.
func (csync *connTxnSync) AfterCompletion(txn transaction.Transaction) {
	conn := (*Connection)(csync)
	conn.checkTxn(txn, "AfterCompletion")

	// mark the connection as no longer being live
	conn.txn = nil

	if !conn.noPool {
		conn.db.put(conn)
	}
}