package fuse

import (
	"fmt"
	"io"
	"log"
	"os"
	"strings"
	"sync"
	"time"
	"unsafe"

	"github.com/hanwen/go-fuse/raw"
	"github.com/hanwen/go-fuse/splice"
)

const (
	// The kernel caps writes at 128k.
	MAX_KERNEL_WRITE = 128 * 1024
)

// MountState contains the logic for reading from the FUSE device and
// translating it to RawFileSystem interface calls.
type MountState struct {
	// Empty if unmounted.
	mountPoint string
	fileSystem RawFileSystem

	// I/O with kernel and daemon.
	mountFile *os.File

	// Dump debug info onto stdout.
	Debug bool

	// For efficient reads and writes.
	buffers *BufferPoolImpl

	latencies *LatencyMap

	opts           *MountOptions
	kernelSettings raw.InitIn

	reqMu               sync.Mutex
	reqPool             []*request
	readPool            [][]byte
	reqReaders          int
	outstandingReadBufs int

	canSplice bool
	loops     sync.WaitGroup
}

func (ms *MountState) KernelSettings() raw.InitIn {
	return ms.kernelSettings
}

func (ms *MountState) MountPoint() string {
	return ms.mountPoint
}

// Mount filesystem on mountPoint.
func (ms *MountState) Mount(mountPoint string, opts *MountOptions) error {
	if opts == nil {
		opts = &MountOptions{
			MaxBackground: _DEFAULT_BACKGROUND_TASKS,
		}
	}
	o := *opts
	if o.MaxWrite < 0 {
		o.MaxWrite = 0
	}
	if o.MaxWrite == 0 {
		o.MaxWrite = 1 << 16
	}
	if o.MaxWrite > MAX_KERNEL_WRITE {
		o.MaxWrite = MAX_KERNEL_WRITE
	}
	opts = &o
	ms.opts = &o

	optStrs := opts.Options
	if opts.AllowOther {
		optStrs = append(optStrs, "allow_other")
	}

	file, mp, err := mount(mountPoint, strings.Join(optStrs, ","))
	if err != nil {
		return err
	}
	initParams := RawFsInit{
		InodeNotify: func(n *raw.NotifyInvalInodeOut) Status {
			return ms.writeInodeNotify(n)
		},
		EntryNotify: func(parent uint64, n string) Status {
			return ms.writeEntryNotify(parent, n)
		},
	}
	ms.fileSystem.Init(&initParams)
	ms.mountPoint = mp
	ms.mountFile = file
	return nil
}

func (ms *MountState) SetRecordStatistics(record bool) {
	if record {
		ms.latencies = NewLatencyMap()
	} else {
		ms.latencies = nil
	}
}

func (ms *MountState) Unmount() (err error) {
	if ms.mountPoint == "" {
		return nil
	}
	delay := time.Duration(0)
	for try := 0; try < 5; try++ {
		err = unmount(ms.mountPoint)
		if err == nil {
			break
		}

		// Sleep for a bit. This is not pretty, but there is
		// no way we can be certain that the kernel thinks all
		// open files have already been closed.
		delay = 2*delay + 5*time.Millisecond
		time.Sleep(delay)
	}
	// Wait for event loops to exit.
	ms.loops.Wait()
	ms.mountPoint = ""
	return err
}

func NewMountState(fs RawFileSystem) *MountState {
	ms := new(MountState)
	ms.mountPoint = ""
	ms.fileSystem = fs
	ms.buffers = NewBufferPool()
	return ms
}

func (ms *MountState) Latencies() map[string]float64 {
	if ms.latencies == nil {
		return nil
	}
	return ms.latencies.Latencies(1e-3)
}

func (ms *MountState) OperationCounts() map[string]int {
	if ms.latencies == nil {
		return nil
	}
	return ms.latencies.Counts()
}

func (ms *MountState) BufferPoolStats() string {
	s := ms.buffers.String()

	var r int
	ms.reqMu.Lock()
	r = len(ms.readPool) + ms.reqReaders
	ms.reqMu.Unlock()

	s += fmt.Sprintf(" read buffers: %d (sz %d )",
		r, ms.opts.MaxWrite/PAGESIZE+1)
	return s
}

// What is a good number?  Maybe the number of CPUs?
const _MAX_READERS = 2

// Returns a new request, or error. In case exitIdle is given, returns
// nil, OK if we have too many readers already.
func (ms *MountState) readRequest(exitIdle bool) (req *request, code Status) {
	var dest []byte

	ms.reqMu.Lock()
	if ms.reqReaders > _MAX_READERS {
		ms.reqMu.Unlock()
		return nil, OK
	}
	l := len(ms.reqPool)
	if l > 0 {
		req = ms.reqPool[l-1]
		ms.reqPool = ms.reqPool[:l-1]
	} else {
		req = new(request)
	}
	l = len(ms.readPool)
	if l > 0 {
		dest = ms.readPool[l-1]
		ms.readPool = ms.readPool[:l-1]
	} else {
		dest = make([]byte, ms.opts.MaxWrite+PAGESIZE)
	}
	ms.outstandingReadBufs++
	ms.reqReaders++
	ms.reqMu.Unlock()

	n, err := ms.mountFile.Read(dest)
	if err != nil {
		code = ToStatus(err)
		ms.reqMu.Lock()
		ms.reqPool = append(ms.reqPool, req)
		ms.reqReaders--
		ms.reqMu.Unlock()
		return nil, code
	}

	gobbled := req.setInput(dest[:n])

	ms.reqMu.Lock()
	if !gobbled {
		ms.outstandingReadBufs--
		ms.readPool = append(ms.readPool, dest)
		dest = nil
	}
	ms.reqReaders--
	if ms.reqReaders <= 0 {
		ms.loops.Add(1)
		go ms.loop(true)
	}
	ms.reqMu.Unlock()

	return req, OK
}

func (ms *MountState) returnRequest(req *request) {
	ms.recordStats(req)

	if req.bufferPoolOutputBuf != nil {
		ms.buffers.FreeBuffer(req.bufferPoolOutputBuf)
		req.bufferPoolOutputBuf = nil
	}

	req.clear()
	ms.reqMu.Lock()
	if req.bufferPoolOutputBuf != nil {
		ms.readPool = append(ms.readPool, req.bufferPoolInputBuf)
		ms.outstandingReadBufs--
		req.bufferPoolInputBuf = nil
	}
	ms.reqPool = append(ms.reqPool, req)
	ms.reqMu.Unlock()
}

func (ms *MountState) recordStats(req *request) {
	if ms.latencies != nil {
		endNs := time.Now().UnixNano()
		dt := endNs - req.startNs

		opname := operationName(req.inHeader.Opcode)
		ms.latencies.AddMany(
			[]LatencyArg{
				{opname, "", dt},
				{opname + "-write", "", endNs - req.preWriteNs}})
	}
}

// Loop initiates the FUSE loop. Normally, callers should run Loop()
// and wait for it to exit, but tests will want to run this in a
// goroutine.
//
// Each filesystem operation executes in a separate goroutine.
func (ms *MountState) Loop() {
	ms.loops.Add(1)
	ms.loop(false)
	ms.loops.Wait()
	ms.mountFile.Close()
}

func (ms *MountState) loop(exitIdle bool) {
	defer ms.loops.Done()
exit:
	for {
		req, errNo := ms.readRequest(exitIdle)
		switch errNo {
		case OK:
			if req == nil {
				break exit
			}
		case ENOENT:
			continue
		case ENODEV:
			// unmount
			break exit
		default: // some other error?
			log.Printf("Failed to read from fuse conn: %v", errNo)
			break exit
		}

		if ms.latencies != nil {
			req.startNs = time.Now().UnixNano()
		}
		ms.handleRequest(req)
	}
}

func (ms *MountState) handleRequest(req *request) {
	req.parse()
	if req.handler == nil {
		req.status = ENOSYS
	}

	if req.status.Ok() && ms.Debug {
		log.Println(req.InputDebug())
	}

	if req.status.Ok() && req.handler.Func == nil {
		log.Printf("Unimplemented opcode %v", operationName(req.inHeader.Opcode))
		req.status = ENOSYS
	}

	if req.status.Ok() {
		req.handler.Func(ms, req)
	}

	errNo := ms.write(req)
	if errNo != 0 {
		log.Printf("writer: Write/Writev failed, err: %v. opcode: %v",
			errNo, operationName(req.inHeader.Opcode))
	}
	ms.returnRequest(req)
}

func (ms *MountState) AllocOut(req *request, size uint32) []byte {
	if cap(req.bufferPoolOutputBuf) >= int(size) {
		req.bufferPoolOutputBuf = req.bufferPoolOutputBuf[:size]
		return req.bufferPoolOutputBuf
	}
	if req.bufferPoolOutputBuf != nil {
		ms.buffers.FreeBuffer(req.bufferPoolOutputBuf)
	}
	req.bufferPoolOutputBuf = ms.buffers.AllocBuffer(size)
	return req.bufferPoolOutputBuf
}

func (ms *MountState) write(req *request) Status {
	// Forget does not wait for reply.
	if req.inHeader.Opcode == _OP_FORGET || req.inHeader.Opcode == _OP_BATCH_FORGET {
		return OK
	}

	header := req.serializeHeader(req.flatData.Size())
	if ms.Debug {
		log.Println(req.OutputDebug())
	}

	if ms.latencies != nil {
		req.preWriteNs = time.Now().UnixNano()
	}

	if header == nil {
		return OK
	}

	if req.flatData.Size() == 0 {
		_, err := ms.mountFile.Write(header)
		return ToStatus(err)
	}

	if req.flatData.FdSize > 0 {
		if err := ms.TrySplice(header, req, req.flatData.Fd, req.flatData.FdSize, req.flatData.FdOff); err == nil {
			return OK
		} else {
			log.Println("Splice error", err)
			buf := ms.AllocOut(req, uint32(req.flatData.FdSize))
			req.flatData.Read(buf)
			header = req.serializeHeader(req.flatData.Size())
		}
	}

	_, err := Writev(int(ms.mountFile.Fd()), [][]byte{header, req.flatData.Data})
	return ToStatus(err)
}

func (ms *MountState) TrySplice(header []byte, req *request,
	fd uintptr, size int, off int64) error {
	finalSplice, err := splice.Get()
	if err != nil {
		return err
	}
	defer splice.Done(finalSplice)

	total := len(header) + size
	if !finalSplice.Grow(total) {
		return fmt.Errorf("splice.Grow failed.")
	}

	_, err = finalSplice.Write(header)
	if err != nil {
		return err
	}

	var n int
	if off < 0 {
		n, err = finalSplice.LoadFrom(fd, size)
	} else {
		n, err = finalSplice.LoadFromAt(fd, size, off)
	}
	if err == io.EOF || (err == nil && n < size && n > 0) {
		discard := make([]byte, len(header))
		_, err = finalSplice.Read(discard)
		if err != nil {
			return err
		}

		// TODO - fix debug output.
		header = req.serializeHeader(n)
		return ms.TrySplice(header, req, fd, n, -1)
	}

	if err != nil {
		// TODO - extract the data from splice.
		return err
	}

	if n != size {
		return fmt.Errorf("splice: wrote %d, want %d", n, req.flatData.FdSize)
	}

	_, err = finalSplice.WriteTo(ms.mountFile.Fd(), total)
	if err != nil {
		return fmt.Errorf("splice write: %v", err)
	}
	return nil
}

func (ms *MountState) writeInodeNotify(entry *raw.NotifyInvalInodeOut) Status {
	req := request{
		inHeader: &raw.InHeader{
			Opcode: _OP_NOTIFY_INODE,
		},
		handler: operationHandlers[_OP_NOTIFY_INODE],
		status:  raw.NOTIFY_INVAL_INODE,
	}
	req.outData = unsafe.Pointer(entry)
	result := ms.write(&req)

	if ms.Debug {
		log.Println("Response: INODE_NOTIFY", result)
	}
	return result
}

func (ms *MountState) writeEntryNotify(parent uint64, name string) Status {
	req := request{
		inHeader: &raw.InHeader{
			Opcode: _OP_NOTIFY_ENTRY,
		},
		handler: operationHandlers[_OP_NOTIFY_ENTRY],
		status:  raw.NOTIFY_INVAL_ENTRY,
	}
	entry := &raw.NotifyInvalEntryOut{
		Parent:  parent,
		NameLen: uint32(len(name)),
	}

	// Many versions of FUSE generate stacktraces if the
	// terminating null byte is missing.
	nameBytes := []byte(name + "\000")
	req.outData = unsafe.Pointer(entry)
	req.flatData.Data = nameBytes
	result := ms.write(&req)

	if ms.Debug {
		log.Printf("Response: ENTRY_NOTIFY: %v", result)
	}
	return result
}