mountstate.go 11.3 KB
Newer Older
1 2 3
package fuse

import (
4
	"fmt"
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
5
	"io"
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
6
	"log"
7
	"os"
8
	"strings"
9
	"sync"
10
	"time"
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
11
	"unsafe"
12 13

	"github.com/hanwen/go-fuse/raw"
14
	"github.com/hanwen/go-fuse/splice"
15 16 17
)

const (
	// MAX_KERNEL_WRITE is the kernel's cap on the payload of a single
	// write request (128 KiB). Mount clamps MountOptions.MaxWrite to it.
	MAX_KERNEL_WRITE = 128 << 10
)

// MountState contains the logic for reading from the FUSE device and
// translating it to RawFileSystem interface calls.
type MountState struct {
	// Empty if unmounted.
	mountPoint string
	fileSystem RawFileSystem

	// I/O with kernel and daemon.
	mountFile *os.File

	// Dump debug info onto stdout.
	Debug bool

35
	latencies *LatencyMap
36

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
37
	opts           *MountOptions
38
	kernelSettings raw.InitIn
39

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
40 41 42 43
	reqMu               sync.Mutex
	reqPool             []*request
	readPool            [][]byte
	reqReaders          int
44
	outstandingReadBufs int
45

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
46 47
	canSplice bool
	loops     sync.WaitGroup
48 49
}

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
// ThreadSanitizerSync synchronizes the caller with the goroutines serving
// kernel requests, so that state a filesystem set while serving a request
// can be inspected afterwards without the race detector complaining.
//
//   fs := SomeFSObj{ReadCalled: false}
//   ms := NewMountState(fs)
//   ms.Mount("/mnt", nil)
//   ..
//   ioutil.ReadFile("/mnt/file")
//
//   mountstate.ThreadSanitizerSync()
//   if fs.ReadCalled { ...  // no race condition here.
func (ms *MountState) ThreadSanitizerSync() {
	// The request loops take reqMu in readRequest and returnRequest.
	// Acquiring and releasing it here is what establishes the ordering.
	ms.reqMu.Lock()
	defer ms.reqMu.Unlock()
}

68 69
func (ms *MountState) KernelSettings() raw.InitIn {
	return ms.kernelSettings
70 71
}

72 73
func (ms *MountState) MountPoint() string {
	return ms.mountPoint
74 75
}

76 77
// _MAX_NAME_LEN caps how many bytes of the filesystem's String() Mount
// uses for the default mount subtype.
const _MAX_NAME_LEN = 20

78
// Mount filesystem on mountPoint.
79
func (ms *MountState) Mount(mountPoint string, opts *MountOptions) error {
80 81 82 83 84
	if opts == nil {
		opts = &MountOptions{
			MaxBackground: _DEFAULT_BACKGROUND_TASKS,
		}
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
85
	o := *opts
86 87 88
	if o.Buffers == nil {
		o.Buffers = defaultBufferPool
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
89 90 91 92 93 94 95 96 97 98
	if o.MaxWrite < 0 {
		o.MaxWrite = 0
	}
	if o.MaxWrite == 0 {
		o.MaxWrite = 1 << 16
	}
	if o.MaxWrite > MAX_KERNEL_WRITE {
		o.MaxWrite = MAX_KERNEL_WRITE
	}
	opts = &o
99
	ms.opts = &o
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
100

101
	optStrs := opts.Options
102
	if opts.AllowOther {
103
		optStrs = append(optStrs, "allow_other")
104
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
105

106 107 108 109 110 111 112 113 114
	name := opts.Name
	if name == "" {
		name = ms.fileSystem.String()
		l := len(name)
		if l > _MAX_NAME_LEN {
			l = _MAX_NAME_LEN
		}
		name = strings.Replace(name[:l], ",", ";", -1)
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
115
	optStrs = append(optStrs, "subtype="+name)
116

117
	file, mp, err := mount(mountPoint, strings.Join(optStrs, ","))
118 119 120
	if err != nil {
		return err
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
121
	initParams := RawFsInit{
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
122
		InodeNotify: func(n *raw.NotifyInvalInodeOut) Status {
123
			return ms.writeInodeNotify(n)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
124
		},
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
125
		EntryNotify: func(parent uint64, n string) Status {
126
			return ms.writeEntryNotify(parent, n)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
127
		},
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
128 129 130
		DeleteNotify: func(parent uint64, child uint64, n string) Status {
			return ms.writeDeleteNotify(parent, child, n)
		},
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
131
	}
132 133 134
	ms.fileSystem.Init(&initParams)
	ms.mountPoint = mp
	ms.mountFile = file
135 136 137
	return nil
}

138
func (ms *MountState) SetRecordStatistics(record bool) {
139
	if record {
140
		ms.latencies = NewLatencyMap()
141
	} else {
142
		ms.latencies = nil
143 144 145
	}
}

146 147
func (ms *MountState) Unmount() (err error) {
	if ms.mountPoint == "" {
148 149
		return nil
	}
150
	delay := time.Duration(0)
151
	for try := 0; try < 5; try++ {
152
		err = unmount(ms.mountPoint)
153 154 155
		if err == nil {
			break
		}
156 157
		
		fmt.Fprintf(os.Stderr, "umount failed; retrying\n")
158 159 160
		// Sleep for a bit. This is not pretty, but there is
		// no way we can be certain that the kernel thinks all
		// open files have already been closed.
161
		delay = 2*delay + 5*time.Millisecond
162 163
		time.Sleep(delay)
	}
164 165
	// Wait for event loops to exit.
	ms.loops.Wait()
166
	ms.mountPoint = ""
167
	return err
168 169 170
}

func NewMountState(fs RawFileSystem) *MountState {
171 172 173 174
	ms := new(MountState)
	ms.mountPoint = ""
	ms.fileSystem = fs
	return ms
175 176
}

177 178
func (ms *MountState) Latencies() *LatencyMap {
	return ms.latencies
179 180
}

181
func (ms *MountState) BufferPoolStats() string {
182
	s := ms.opts.Buffers.String()
183

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
184
	var r int
185 186 187 188
	ms.reqMu.Lock()
	r = len(ms.readPool) + ms.reqReaders
	ms.reqMu.Unlock()

189
	s += fmt.Sprintf(" read buffers: %d (sz %d )",
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
190
		r, ms.opts.MaxWrite/PAGESIZE+1)
191
	return s
192 193
}

194 195 196 197 198 199
// _MAX_READERS bounds the goroutines reading from the FUSE device:
// readRequest turns a caller away once more than this many are reading.
// What is a good number?  Maybe the number of CPUs?
const _MAX_READERS = 2

// Returns a new request, or error. In case exitIdle is given, returns
// nil, OK if we have too many readers already.
func (ms *MountState) readRequest(exitIdle bool) (req *request, code Status) {
200 201 202
	var dest []byte

	ms.reqMu.Lock()
203 204 205 206
	if ms.reqReaders > _MAX_READERS {
		ms.reqMu.Unlock()
		return nil, OK
	}
207 208 209 210 211 212 213 214 215 216 217 218
	l := len(ms.reqPool)
	if l > 0 {
		req = ms.reqPool[l-1]
		ms.reqPool = ms.reqPool[:l-1]
	} else {
		req = new(request)
	}
	l = len(ms.readPool)
	if l > 0 {
		dest = ms.readPool[l-1]
		ms.readPool = ms.readPool[:l-1]
	} else {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
219
		dest = make([]byte, ms.opts.MaxWrite+PAGESIZE)
220
	}
221
	ms.outstandingReadBufs++
222 223 224 225 226 227 228 229 230 231 232 233 234
	ms.reqReaders++
	ms.reqMu.Unlock()

	n, err := ms.mountFile.Read(dest)
	if err != nil {
		code = ToStatus(err)
		ms.reqMu.Lock()
		ms.reqPool = append(ms.reqPool, req)
		ms.reqReaders--
		ms.reqMu.Unlock()
		return nil, code
	}

235 236 237
	if ms.latencies != nil {
		req.startNs = time.Now().UnixNano()
	}
238 239 240 241
	gobbled := req.setInput(dest[:n])

	ms.reqMu.Lock()
	if !gobbled {
242
		ms.outstandingReadBufs--
243 244 245 246
		ms.readPool = append(ms.readPool, dest)
		dest = nil
	}
	ms.reqReaders--
247 248 249 250
	if ms.reqReaders <= 0 {
		ms.loops.Add(1)
		go ms.loop(true)
	}
251
	ms.reqMu.Unlock()
252

253 254 255 256 257
	return req, OK
}

func (ms *MountState) returnRequest(req *request) {
	ms.recordStats(req)
258

259
	if req.bufferPoolOutputBuf != nil {
260
		ms.opts.Buffers.FreeBuffer(req.bufferPoolOutputBuf)
261 262 263 264 265 266 267
		req.bufferPoolOutputBuf = nil
	}

	req.clear()
	ms.reqMu.Lock()
	if req.bufferPoolOutputBuf != nil {
		ms.readPool = append(ms.readPool, req.bufferPoolInputBuf)
268
		ms.outstandingReadBufs--
269
		req.bufferPoolInputBuf = nil
270
	}
271 272
	ms.reqPool = append(ms.reqPool, req)
	ms.reqMu.Unlock()
273 274
}

275 276
func (ms *MountState) recordStats(req *request) {
	if ms.latencies != nil {
277
		dt := time.Now().UnixNano() - req.startNs
278
		opname := operationName(req.inHeader.Opcode)
279
		ms.latencies.Add(opname, dt)
280 281 282 283 284 285 286
	}
}

// Loop initiates the FUSE loop. Normally, callers should run Loop()
// and wait for it to exit, but tests will want to run this in a
// goroutine.
//
287
// Each filesystem operation executes in a separate goroutine.
288
func (ms *MountState) Loop() {
289 290 291
	ms.loops.Add(1)
	ms.loop(false)
	ms.loops.Wait()
292
	ms.mountFile.Close()
293 294
}

295 296
func (ms *MountState) loop(exitIdle bool) {
	defer ms.loops.Done()
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
297
exit:
298
	for {
299
		req, errNo := ms.readRequest(exitIdle)
300 301
		switch errNo {
		case OK:
302 303 304
			if req == nil {
				break exit
			}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
305
		case ENOENT:
306 307 308 309
			continue
		case ENODEV:
			// unmount
			break exit
310
		default: // some other error?
311
			log.Printf("Failed to read from fuse conn: %v", errNo)
312
			break exit
313
		}
314

315
		ms.handleRequest(req)
316
	}
317
}
318

319
func (ms *MountState) handleRequest(req *request) {
320 321 322 323 324
	req.parse()
	if req.handler == nil {
		req.status = ENOSYS
	}

325
	if req.status.Ok() && ms.Debug {
326 327 328 329
		log.Println(req.InputDebug())
	}

	if req.status.Ok() && req.handler.Func == nil {
330
		log.Printf("Unimplemented opcode %v", operationName(req.inHeader.Opcode))
331 332 333 334
		req.status = ENOSYS
	}

	if req.status.Ok() {
335
		req.handler.Func(ms, req)
336 337
	}

338
	errNo := ms.write(req)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
339
	if errNo != 0 {
340 341
		log.Printf("writer: Write/Writev failed, err: %v. opcode: %v",
			errNo, operationName(req.inHeader.Opcode))
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
342
	}
343
	ms.returnRequest(req)
344 345
}

346
func (ms *MountState) AllocOut(req *request, size uint32) []byte {
347 348
	if cap(req.bufferPoolOutputBuf) >= int(size) {
		req.bufferPoolOutputBuf = req.bufferPoolOutputBuf[:size]
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
349
		return req.bufferPoolOutputBuf
350
	}
351
	if req.bufferPoolOutputBuf != nil {
352
		ms.opts.Buffers.FreeBuffer(req.bufferPoolOutputBuf)
353
	}
354
	req.bufferPoolOutputBuf = ms.opts.Buffers.AllocBuffer(size)
355 356 357
	return req.bufferPoolOutputBuf
}

358
func (ms *MountState) write(req *request) Status {
359
	// Forget does not wait for reply.
360
	if req.inHeader.Opcode == _OP_FORGET || req.inHeader.Opcode == _OP_BATCH_FORGET {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
361
		return OK
362 363
	}

364
	header := req.serializeHeader(req.flatDataSize())
365
	if ms.Debug {
366 367 368
		log.Println(req.OutputDebug())
	}

369
	if header == nil {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
370
		return OK
371
	}
372

373
	if req.flatDataSize() == 0 {
374 375 376 377
		_, err := ms.mountFile.Write(header)
		return ToStatus(err)
	}

378 379
	if req.fdData != nil {
		if err := ms.TrySplice(header, req, req.fdData); err == nil {
380 381
			return OK
		} else {
382
			log.Println("TrySplice:", err)
383 384
			sz := req.flatDataSize()
			buf := ms.AllocOut(req, uint32(sz))
385
			req.flatData, req.status = req.fdData.Bytes(buf)
386
			header = req.serializeHeader(len(req.flatData))
387
		}
388 389
	}

390
	_, err := Writev(int(ms.mountFile.Fd()), [][]byte{header, req.flatData})
391
	return ToStatus(err)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
392 393
}

394 395
func (ms *MountState) TrySplice(header []byte, req *request, fdData *ReadResultFd) error {
	pair, err := splice.Get()
396 397 398
	if err != nil {
		return err
	}
399
	defer splice.Done(pair)
400

401 402
	total := len(header) + fdData.Size()
	if !pair.Grow(total) {
403 404
		return fmt.Errorf("splice.Grow failed.")
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
405

406
	_, err = pair.Write(header)
407 408 409 410
	if err != nil {
		return err
	}

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
411
	var n int
412 413
	if fdData.Off < 0 {
		n, err = pair.LoadFrom(fdData.Fd, fdData.Size())
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
414
	} else {
415
		n, err = pair.LoadFromAt(fdData.Fd, fdData.Size(), fdData.Off)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
416
	}
417
	if err == io.EOF || (err == nil && n < fdData.Size()) {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
418
		discard := make([]byte, len(header))
419
		_, err = pair.Read(discard)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
420 421 422 423
		if err != nil {
			return err
		}

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
424
		header = req.serializeHeader(n)
425 426

		newFd := ReadResultFd{
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
427
			Fd:  pair.ReadFd(),
428
			Off: -1,
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
429
			Sz:  n,
430 431
		}
		return ms.TrySplice(header, req, &newFd)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
432 433
	}

434 435 436 437
	if err != nil {
		// TODO - extract the data from splice.
		return err
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
438

439 440
	if n != fdData.Size() {
		return fmt.Errorf("wrote %d, want %d", n, fdData.Size())
441 442
	}

443
	_, err = pair.WriteTo(ms.mountFile.Fd(), total)
444
	if err != nil {
445
		return err
446 447
	}
	return nil
448 449
}

450
func (ms *MountState) writeInodeNotify(entry *raw.NotifyInvalInodeOut) Status {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
451
	req := request{
452 453
		inHeader: &raw.InHeader{
			Opcode: _OP_NOTIFY_INODE,
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
454 455
		},
		handler: operationHandlers[_OP_NOTIFY_INODE],
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
456
		status:  raw.NOTIFY_INVAL_INODE,
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
457 458
	}
	req.outData = unsafe.Pointer(entry)
459
	result := ms.write(&req)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
460

461
	if ms.Debug {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
462
		log.Println("Response: INODE_NOTIFY", result)
463
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
464
	return result
465
}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
466

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
467 468 469 470
func (ms *MountState) writeDeleteNotify(parent uint64, child uint64, name string) Status {
	if ms.kernelSettings.Minor < 18 {
		return ms.writeEntryNotify(parent, name)
	}
471

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
472 473 474 475 476 477 478 479 480
	req := request{
		inHeader: &raw.InHeader{
			Opcode: _OP_NOTIFY_DELETE,
		},
		handler: operationHandlers[_OP_NOTIFY_DELETE],
		status:  raw.NOTIFY_INVAL_DELETE,
	}
	entry := &raw.NotifyInvalDeleteOut{
		Parent:  parent,
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
481
		Child:   child,
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
		NameLen: uint32(len(name)),
	}

	// Many versions of FUSE generate stacktraces if the
	// terminating null byte is missing.
	nameBytes := make([]byte, len(name)+1)
	copy(nameBytes, name)
	nameBytes[len(nameBytes)-1] = '\000'
	req.outData = unsafe.Pointer(entry)
	req.flatData = nameBytes
	result := ms.write(&req)

	if ms.Debug {
		log.Printf("Response: DELETE_NOTIFY: %v", result)
	}
	return result
}

500
func (ms *MountState) writeEntryNotify(parent uint64, name string) Status {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
501
	req := request{
502 503
		inHeader: &raw.InHeader{
			Opcode: _OP_NOTIFY_ENTRY,
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
504 505
		},
		handler: operationHandlers[_OP_NOTIFY_ENTRY],
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
506
		status:  raw.NOTIFY_INVAL_ENTRY,
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
507
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
508
	entry := &raw.NotifyInvalEntryOut{
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
509
		Parent:  parent,
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
510 511 512 513 514
		NameLen: uint32(len(name)),
	}

	// Many versions of FUSE generate stacktraces if the
	// terminating null byte is missing.
515 516 517
	nameBytes := make([]byte, len(name)+1)
	copy(nameBytes, name)
	nameBytes[len(nameBytes)-1] = '\000'
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
518
	req.outData = unsafe.Pointer(entry)
519
	req.flatData = nameBytes
520
	result := ms.write(&req)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
521

522
	if ms.Debug {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
523
		log.Printf("Response: ENTRY_NOTIFY: %v", result)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
524 525 526
	}
	return result
}
527 528

var defaultBufferPool BufferPool
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
529

530 531 532
func init() {
	defaultBufferPool = NewBufferPool()
}