// Copyright (C) 2018-2019  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

package main
// ZBlk* + ZBigFile loading


// module: "wendelin.bigfile.file_zodb"
//
// ZBigFile
//	.blksize	xint
//	.blktab		LOBtree{}  blk -> ZBlk*(blkdata)
//
// ZBlk0 (aliased as ZBlk)
//	str with trailing '\0' removed.
//
// ZBlk1
//	.chunktab	IOBtree{}  offset -> ZData(chunk)
//
// ZData
//	str (chunk)

import (
	"context"
	"fmt"
	"log"
	"reflect"
	"sort"
	"sync"
	"syscall"

	"golang.org/x/sync/errgroup"

	"lab.nexedi.com/kirr/go123/mem"
	"lab.nexedi.com/kirr/go123/xerr"
	"lab.nexedi.com/kirr/neo/go/zodb"
	"lab.nexedi.com/kirr/neo/go/zodb/btree"
	pickle "github.com/kisielk/og-rek"

	"./internal/pycompat"
)

// zBlk is the interface that every ZBlk* block implements internally.
type zBlk interface {
	// loadBlkData loads from database and returns data block stored by this ZBlk.
	//
	// If returned data size is less than the block size of containing ZBigFile,
	// the block trailing is assumed to be trailing \0.
	loadBlkData(ctx context.Context) ([]byte, error)

	// bindZFile associates ZBlk as being used by zfile to store block #blk.
	//
	// A ZBlk may be bound to several blocks inside one file, and to
	// several files.
	//
	// The information is preserved even when ZBlk comes to ghost
	// state, but is lost if ZBlk is garbage collected.
	//
	// it is safe to call multiple bindZFile simultaneously.
	// it is not safe to call bindZFile and boundTo simultaneously.
	//
	// XXX link to overview.
	bindZFile(zfile *ZBigFile, blk int64)

	// XXX unbindZFile
	// XXX zfile -> bind map for it

	// blkBoundTo returns ZBlk association with zfile(s)/#blk(s).
	//
	// The association returned is that was previously set by bindZFile.
	//
	// blkBoundTo must not be called simultaneously wrt bindZFile.
	blkBoundTo() map[*ZBigFile]SetI64
}

// module of Wendelin ZODB py objects
const zwendelin = "wendelin.bigfile.file_zodb"

// ---- zBlkBase ----

// zBlkBase provides common functionality to implement ZBlk* -> zfile, #blk binding.
//
// The data stored by zBlkBase is transient - it is _not_ included into
// persistent state.
type zBlkBase struct {
	bindMu  sync.Mutex           // used only for binding to support multiple loaders
	inzfile map[*ZBigFile]SetI64 // {} zfile -> set(#blk)
}

// bindZFile implements zBlk.
func (zb *zBlkBase) bindZFile(zfile *ZBigFile, blk int64) {
	zb.bindMu.Lock()
	defer zb.bindMu.Unlock()

	blkmap, ok := zb.inzfile[zfile]
	if !ok {
		blkmap = make(SetI64, 1)
		if zb.inzfile == nil {
			zb.inzfile = make(map[*ZBigFile]SetI64)
		}
		zb.inzfile[zfile] = blkmap
	}
	blkmap.Add(blk)
}

// blkBoundTo implementss zBlk.
func (zb *zBlkBase) blkBoundTo() map[*ZBigFile]SetI64 {
	return zb.inzfile
}

// ---- ZBlk0 ----

// ZBlk0 mimics ZBlk0 from python.
type ZBlk0 struct {
	zBlkBase
	zodb.Persistent

	// XXX py source uses bytes(buf) but on python2 it still results in str
	blkdata string
}

type zBlk0State ZBlk0 // hide state methods from public API

func (zb *zBlk0State) DropState() {
	zb.blkdata = ""
}

func (zb *zBlk0State) PySetState(pystate interface{}) error {
	blkdata, ok := pystate.(string)
	if !ok {
		return fmt.Errorf("expect str; got %s", typeOf(pystate))
	}

	log.Printf("ZBlk0.PySetState #%d", len(blkdata))
	zb.blkdata = blkdata
	return nil
}

func (zb *ZBlk0) loadBlkData(ctx context.Context) ([]byte, error) {
	// XXX err ctx

	err := zb.PActivate(ctx)
	if err != nil {
		return nil, err
	}
	defer zb.PDeactivate()

	return mem.Bytes(zb.blkdata), nil
}

// ---- ZBlk1 ---

// ZData mimics ZData from python.
type ZData struct {
	zBlkBase
	zodb.Persistent

	// XXX py source uses bytes(buf) but on python2 it still results in str
	data string
}

type zDataState ZData // hide state methods from public API

func (zd *zDataState) DropState() {
	zd.data = ""
}

func (zd *zDataState) PySetState(pystate interface{}) error {
	//log.Printf("ZData.PySetState")
	data, ok := pystate.(string)
	if !ok {
		return fmt.Errorf("expect str; got %s", typeOf(pystate))
	}

	zd.data = data
	return nil
}

// ZBlk1 mimics ZBlk1 from python.
type ZBlk1 struct {
	zodb.Persistent

	chunktab *btree.IOBTree // {} offset -> ZData(chunk)
}

type zBlk1State ZBlk1 // hide state methods from public API

func (zb *zBlk1State) DropState() {
	zb.chunktab = nil
}

func (zb *zBlk1State) PySetState(pystate interface{}) error {
	log.Printf("ZBlk1.PySetState")
	chunktab, ok := pystate.(*btree.IOBTree)
	if !ok {
		return fmt.Errorf("expect IOBTree; got %s", typeOf(pystate))
	}

	zb.chunktab = chunktab
	return nil
}

func (zb *ZBlk1) loadBlkData(ctx context.Context) ([]byte, error) {
	// XXX errctx

	err := zb.PActivate(ctx)
	if err != nil {
		return nil, err
	}
	defer zb.PDeactivate()

	// get to all ZData objects; activate them and build
	//
	//	{} offset -> ZData
	//
	// with all ZData being live.
	var mu sync.Mutex
	chunktab := make(map[int32]*ZData)

	// on return deactivate all ZData objects loaded in chunktab
	defer func() {
		for _, zd := range chunktab {
			zd.PDeactivate()
		}
	}()


	wg, ctx := errgroup.WithContext(ctx)

	// loadZData loads 1 ZData object into chunktab and leaves it activated.
	loadZData := func(offset int32, zd *ZData) error {
		err := zd.PActivate(ctx)
		if err != nil {
			return err
		}
		// no PDeactivate, zd remains live

		//fmt.Printf("@%d -> zdata #%s (%d)\n", offset, zd.POid(), len(zd.data))
		mu.Lock()
		defer mu.Unlock()

		chunktab[offset] = zd
		return nil
	}

	// loadBucket loads all ZData objects from leaf BTree bucket.
	loadBucket := func(b *btree.IOBucket) error {
		err := b.PActivate(ctx)
		if err != nil {
			return err
		}
		defer b.PDeactivate()

		// go through all bucket key/v -> chunktab

		// XXX off	 < 0		!ok
		// XXX off + len > blksize	!ok

		//fmt.Printf("\nbucket: %v\n\n", b.Entryv())

		for _, e := range b.Entryv() {
			zd, ok := e.Value().(*ZData)
			if !ok {
				return fmt.Errorf("!ZData (%s)", typeOf(e.Value()))
			}

			offset := e.Key()
			wg.Go(func() error {
				return loadZData(offset, zd)
			})
		}

		return nil
	}

	// loadBTree spawns loading of all BTree children.
	var loadBTree func(t *btree.IOBTree) error
	loadBTree = func(t *btree.IOBTree) error {
		err := t.PActivate(ctx)
		if err != nil {
			return err
		}
		defer t.PDeactivate()

		//fmt.Printf("\nbtree: %v\n\n", t.Entryv())

		for _, e := range t.Entryv() {
			switch child := e.Child().(type) {
			case *btree.IOBTree:
				wg.Go(func() error {
					return loadBTree(child)
				})

			case *btree.IOBucket:
				wg.Go(func() error {
					return loadBucket(child)
				})

			default:
				panic(fmt.Sprintf("IOBTree has %s child", typeOf(child)))
			}
		}

		return nil
	}

	wg.Go(func() error {
		return loadBTree(zb.chunktab)
	})

	err = wg.Wait()
	if err != nil {
		return nil, err // XXX err ctx
	}

	// empty .chunktab -> ø
	if len(chunktab) == 0 {
		return nil, nil
	}

	// glue all chunks from chunktab
	offv := make([]int32, 0, len(chunktab)) // ↑
	for off := range(chunktab) {
		offv = append(offv, off)
	}
	sort.Slice(offv, func(i, j int) bool {
		return offv[i] < offv[j]
	})

	//fmt.Printf("#chunktab: %d\n", len(chunktab))
	//fmt.Printf("offv: %v\n", offv)


        // find out whole blk len via inspecting tail chunk
	tailStart := offv[len(offv)-1]
	tailChunk := chunktab[tailStart]
	blklen    := tailStart + int32(len(tailChunk.data))	// XXX overflow?

        // whole buffer initialized as 0 + tail_chunk
	blkdata := make([]byte, blklen)
	copy(blkdata[tailStart:], tailChunk.data)

        // go through all chunks besides tail and extract them
	stop := int32(0)
	for _, start := range offv[:len(offv)-1] {
		chunk := chunktab[start]
		if !(start >= stop) {	// verify chunks don't overlap
			return nil, fmt.Errorf("!(start >= stop)")	// XXX
		}
		if !(start + int32(len(chunk.data)) <= int32(len(blkdata))) {	// XXX overflow?
			return nil, fmt.Errorf("blkdata overrun")	// XXX
		}
		stop = start + int32(len(chunk.data))	// XXX overflow?
		copy(blkdata[start:], chunk.data)
	}

	return blkdata, nil
}


// ----------------------------------------

// ZBigFile mimics ZBigFile from python.
type ZBigFile struct {
	zodb.Persistent

	blksize	int64
	blktab  *btree.LOBTree	// {}  blk -> ZBlk*(blkdata)
}

type zBigFileState ZBigFile // hide state methods from public API


// DropState implements zodb.Stateful.
func (bf *zBigFileState) DropState() {
	bf.blksize = 0
	bf.blktab  = nil
}

// PySetState implements zodb.PyStateful.
func (bf *zBigFileState) PySetState(pystate interface{}) (err error) {
	// ZBigFile
	//	.blksize	xint
	//	.blktab		LOBtree{}  blk -> ZBlk*(blkdata)
	//
	// state: (.blksize, .blktab)

	t, ok := pystate.(pickle.Tuple)
	if !ok {
		return fmt.Errorf("expect [2](); got %s", typeOf(pystate))
	}
	if len(t) != 2 {
		return fmt.Errorf("expect [2](); got [%d]()", len(t))
	}

	blksize, ok := pycompat.Int64(t[0])
	if !ok {
		return fmt.Errorf("blksize: expect integer; got %s", typeOf(t[0]))
	}
	if blksize <= 0 {
		return fmt.Errorf("blksize: must be > 0; got %d", blksize)
	}

	blktab, ok := t[1].(*btree.LOBTree)
	if !ok {
		return fmt.Errorf("blktab: expect LOBTree; got %s", typeOf(t[1]))
	}

	bf.blksize = blksize
	bf.blktab  = blktab
	return nil
}

// LoadBlk loads data for file block #blk.
//
// XXX better load into user-provided buf?
func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, err error) {
	defer xerr.Contextf(&err, "bigfile %s: loadblk %d", bf.POid(), blk)

	err = bf.PActivate(ctx)
	if err != nil {
		return nil, err
	}
	defer bf.PDeactivate()

	xzblk, ok, err := bf.blktab.Get(ctx, blk)
	if err != nil {
		return nil, err
	}
	if !ok {
		return make([]byte, bf.blksize), nil
	}

	zblk, ok := xzblk.(zBlk)
	if !ok {
		return nil, fmt.Errorf("expect ZBlk*; got %s", typeOf(xzblk))
	}

	blkdata, err := zblk.loadBlkData(ctx)
	if err != nil {
		return nil, err
	}

	l := int64(len(blkdata))
	if l > bf.blksize {
		return nil, fmt.Errorf("invalid blk: size = %d (> blksize = %d)", l, bf.blksize)
	}

	// append trailing \0 to data to reach .blksize
	if l < bf.blksize {
		d := make([]byte, bf.blksize)
		copy(d, blkdata)
		blkdata = d
	}

	zblk.bindZFile(bf, blk)

	//log.Printf("ZBigFile.loadblk(%d) -> %dB", blk, len(blkdata))
	return blkdata, nil
}

// Size returns whole file size.
func (bf *ZBigFile) Size(ctx context.Context) (_ int64, err error) {
	defer xerr.Contextf(&err, "bigfile %s: size", bf.POid())

	err = bf.PActivate(ctx)
	if err != nil {
		return 0, err
	}
	defer bf.PDeactivate()

	tailblk, ok, err := bf.blktab.MaxKey(ctx)
	if err != nil {
		return 0, err
	}
	if !ok {
		return 0, nil
	}

	size := (tailblk + 1) * bf.blksize
	if size / bf.blksize != tailblk + 1 {
		return 0, syscall.EFBIG // overflow
	}

	return size, nil
}


// ----------------------------------------

func init() {
	t := reflect.TypeOf
	zodb.RegisterClass(zwendelin + ".ZBlk0",    t(ZBlk0{}),    t(zBlk0State{}))
	zodb.RegisterClass(zwendelin + ".ZBlk1",    t(ZBlk1{}),    t(zBlk1State{}))
	zodb.RegisterClass(zwendelin + ".ZData",    t(ZData{}),    t(zDataState{}))
	zodb.RegisterClass(zwendelin + ".ZBigFile", t(ZBigFile{}), t(zBigFileState{}))

	// backward compatibility
	zodb.RegisterClassAlias(zwendelin + ".ZBlk", zwendelin + ".ZBlk0")
}