// Copyright (C) 2018-2019  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// Program wcfs provides a filesystem server with file data backed by wendelin.core arrays.
//
// Intro
//
// Each wendelin.core array (ZBigArray) is actually a linear file (ZBigFile)
// and array metadata like dtype, shape and strides associated with it. This
// program exposes as files only ZBigFile data and leaves the rest of
// array-specific handling to clients. Every ZBigFile is exposed as one separate
// file that represents the whole ZBigFile's data.
//
// For a client, the primary way to access a bigfile should be to mmap
// head/bigfile/<bigfileX>, which always represents the latest bigfile data.
// Clients that want an isolation guarantee should subscribe to invalidations
// and, for the duration of their transaction, re-mmap invalidated regions to
// the file with the pinned bigfile revision. See "Invalidation protocol" for
// details.
//
// In the usual situation when bigfiles are big, and there are O(1)/δt updates,
// there should be no need for any cache besides the shared kernel cache of the
// latest bigfile data.
//
//
// Filesystem organization
//
// Top-level structure of provided filesystem is as follows:
//
//	head/			; latest database view
//		...
//	@<rev1>/		; database view as of revision <rev1>
//		...
//	@<rev2>/
//		...
//	...
//
// where head/ represents the latest data as stored in upstream ZODB, and
// @<revX>/ represents data as of database revision <revX>.
//
// head/ has the following structure:
//
//	head/
//		at			; data inside head/ is as of this ZODB transaction
//		watch			; channel for bigfile invalidations
//		bigfile/		; bigfiles' data
//			<oid(ZBigFile1)>
//			<oid(ZBigFile2)>
//			...
//
// where /bigfile/<bigfileX> represents the latest bigfile data as stored in
// upstream ZODB. As there can be some lag receiving updates from the database,
// /at describes precisely the ZODB state for which bigfile data is currently
// exposed. Whenever bigfile data is changed in upstream ZODB, information
// about the changes is first propagated to /watch, and only after that
// /bigfile/<bigfileX> is updated. See "Invalidation protocol" for details.
//
// @<revX>/ has the following structure:
//
//	@<revX>/
//		at
//		bigfile/		; bigfiles' data as of revision <revX>
//			<oid(ZBigFile1)>
//			<oid(ZBigFile2)>
//			...
//
// where /bigfile/<bigfileX> represents bigfile data as of revision <revX>.
//
// Unless accessed, {head,@<revX>}/bigfile/<bigfileX> are not automatically
// visible in the wcfs filesystem. Similarly, @<revX>/ becomes visible only
// after access.
//
//
// Invalidation protocol
//
// In order to support isolation, wcfs implements an invalidation protocol that
// must be cooperatively followed by both wcfs and the client.
//
// First, the client mmaps the latest bigfile, but does not access it:
//
//	mmap(head/bigfile/<bigfileX>)
//
// Then the client opens head/watch and tells wcfs through it for which ZODB
// state it wants to get the bigfile's view:
//
//	C: 1 watch <bigfileX> @<at>
//
// The server then, after potentially sending initial pin messages (see below),
// reports either success or failure:
//
//	S: 1 ok
//	S: 1 error ...		; if <at> is too far back from head/at
//
// The server sends the "ok" reply only after head/at is ≥ the requested <at>,
// and only after all initial pin messages are fully acknowledged by the client.
// The client can start to use mmapped data after it gets "ok".
// The server sends the "error" reply if the requested <at> is too far back
// from head/at.
// XXX other errors are possible (e.g. "no such file", or error handling pin).
// XXX error handling pin -> then client is killed?
// XXX if not - specify that watch state is lost after error.
//
// Upon a watch request, either initially or after sending "ok", the server
// notifies the client about file blocks that the client needs to pin in order
// to observe the file's data as of revision <at>.
//
// The filesystem server itself receives information about changed data from
// the ZODB server through the regular ZODB invalidation channel (as it is a
// ZODB client itself). Then, separately for each changed file block, before
// actually updating head/bigfile/<bigfileX> content, it notifies each client
// that requested it, through that client's opened head/watch link, about the
// changes:
//
//	S: <2·k> pin <bigfileX> #<blk> @<rev_max>
//	XXX @head means unpin.
//	XXX or use `unpin <bigfileX> #<blk>` ?
//
// and waits until all clients confirm that the changed file block can be
// updated in the global OS cache.
//
// The client in turn should now re-mmap the block requested to be pinned to
// bigfile@<rev_max>:
//
//	# mmapped at address corresponding to #blk
//	mmap(@<rev_max>/bigfile/<bigfileX>, #blk, MAP_FIXED)
//
// and must send an ack back to the server when it is done:
//
//	C: <2·k> ack
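//
// For illustration only, a client could handle one such pin message roughly
// as sketched below. handlePin is a hypothetical helper (not part of wcfs),
// the remap callback stands in for the MAP_FIXED re-mmap shown above, and the
// code assumes the "fmt", "strconv" and "strings" packages:
//
//	// handlePin parses "<2·k> pin <bigfileX> #<blk> @<rev_max>", re-mmaps the
//	// block via remap, and only then returns the "<2·k> ack" reply line.
//	func handlePin(line string, remap func(file string, blk int64, rev string) error) (string, error) {
//		f := strings.Fields(line)
//		if len(f) != 5 || f[1] != "pin" ||
//			!strings.HasPrefix(f[3], "#") || !strings.HasPrefix(f[4], "@") {
//			return "", fmt.Errorf("malformed pin message %q", line)
//		}
//		stream, err := strconv.ParseUint(f[0], 10, 64)
//		if err != nil || stream%2 != 0 { // server-originated IDs are even (<2·k>)
//			return "", fmt.Errorf("bad stream ID in %q", line)
//		}
//		blk, err := strconv.ParseInt(f[3][1:], 10, 64)
//		if err != nil || blk < 0 {
//			return "", fmt.Errorf("bad block number in %q", line)
//		}
//		if err := remap(f[2], blk, f[4][1:]); err != nil {
//			return "", err // no ack: the block is not re-mmapped yet
//		}
//		return fmt.Sprintf("%d ack", stream), nil
//	}
//
// With placeholder values, handlePin("2 pin X #3 @R", remap) calls
// remap("X", 3, "R") and, if that succeeds, returns "2 ack".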
//
// The server sends pin notifications only for file blocks that are known to
// be potentially changed after the client's <at>, and <rev_max> describes the
// upper bound for the block revision as of the <at> database view:
//
//	<rev_max> ≤ <at>	; block stays unchanged in (<rev_max>, <at>] range
//
// The server maintains a short history tail of file changes to be able to
// support openings with <at> being slightly in the past compared to the current
// head/at. The server might reject a watch request if <at> is too far in the
// past from head/at. The client is advised to restart its transaction with a
// more up-to-date database view if it gets a watch setup error.
//
// A later request from the client for the same <bigfileX> but with a different
// <at> overrides the previous watch request for that file. A client can use "-"
// instead of "@<at>" to stop watching a file.
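//
// For illustration only, here is a sketch of parsing such watch requests on
// the server side. parseWatch is a hypothetical helper (not the wcfs parser);
// it keeps <at> as text and assumes the "fmt", "strconv" and "strings"
// packages:
//
//	// parseWatch parses "<stream> watch <bigfileX> (@<at>|-)".
//	// It returns at == "" for "-", i.e. for a request to stop watching.
//	func parseWatch(line string) (stream uint64, file, at string, err error) {
//		f := strings.Fields(line)
//		if len(f) != 4 || f[1] != "watch" {
//			return 0, "", "", fmt.Errorf("malformed watch request %q", line)
//		}
//		stream, err = strconv.ParseUint(f[0], 10, 64)
//		if err != nil {
//			return 0, "", "", fmt.Errorf("bad stream ID in %q", line)
//		}
//		switch {
//		case f[3] == "-":
//			at = "" // stop watching <bigfileX>
//		case len(f[3]) > 1 && f[3][0] == '@':
//			at = f[3][1:]
//		default:
//			return 0, "", "", fmt.Errorf("bad @<at> in %q", line)
//		}
//		return stream, f[2], at, nil
//	}
//
// With placeholder values X and R, "1 watch X @R" parses to (1, "X", "R") and
// "1 watch X -" parses to (1, "X", ""), both with a nil error.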
//
// A single client can send several watch requests through a single head/watch
// open, and it can also use several head/watch opens simultaneously.
// The server sends pin notifications for all files requested to be watched via
// every head/watch open.
//
// Note: a client could use a single watch to manage several of its views of the
// same file but with different <at>. This could be achieved by watching with
// @<at_min>, and then deciding internally which views need to be adjusted and
// which views need not. Wcfs does not oblige clients to do so though, and a
// client is free to use as many head/watch openings as it needs to.
//
// When clients are done with @<revX>/bigfile/<bigfileX> (i.e. the client's
// transaction ends and the array is unmapped), the server sees that the number
// of opened files to @<revX>/bigfile/<bigfileX> drops to zero, and
// automatically destroys @<revX>/bigfile/<bigfileX> after a reasonable timeout.
//
//
// Protection against slow or faulty clients
//
// If a client, on purpose or due to a bug or being stopped, is slow to respond
// with an ack to a file invalidation notification, it creates a problem because
// the server will become blocked waiting for pin acknowledgments, and thus all
// other clients that try to work with the same file will get stuck.
//
// The problem could be avoided if wcfs resided inside the OS kernel and this
// way could manipulate clients' address space directly (then the invalidation
// protocol would not be needed). It is also possible to imagine a mechanism
// where wcfs would synchronously change clients' address space by injecting
// trusted code and running it on the client side via ptrace to adjust file
// mappings.
//
// However ptrace does not work when a client thread is blocked under pagefault,
// and that is exactly what wcfs would need to do to process invalidations
// lazily, because eager invalidation processing results in prohibitively slow
// file opens. See the internal wcfs overview for details about why ptrace
// cannot be used and why lazy invalidation processing is required.
//
// Lacking OS primitives to change the address space of another process, and
// not being able to work around this with ptrace in userspace, wcfs takes the
// approach of killing a slow client after a timeout, 30 seconds by default.
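//
// A minimal sketch of such a timeout, assuming the ack for a pin arrives on a
// channel and that the caller supplies the kill action. waitAck, ackc and kill
// are hypothetical names, not wcfs API, and the code assumes the "time"
// package:
//
//	// waitAck waits for an ack up to timeout. On timeout it kills the client
//	// so that it cannot keep other clients of the same file blocked.
//	func waitAck(ackc <-chan struct{}, timeout time.Duration, kill func()) bool {
//		t := time.NewTimer(timeout)
//		defer t.Stop()
//		select {
//		case <-ackc:
//			return true
//		case <-t.C:
//			kill()
//			return false
//		}
//	}
//
// With the default described above, this would be called as
// waitAck(ackc, 30*time.Second, kill).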
//
//
// Writes
//
// As each bigfile is represented by 1 synthetic file, there can be several
// write schemes:
//
// 1. mmap(MAP_PRIVATE) + writeout by client
//
// In this scheme bigfile data is mmapped in MAP_PRIVATE mode, so that local
// user changes are not automatically propagated back to the file. When there
// is a need to commit, client investigates via some OS mechanism, e.g.
// /proc/self/pagemap or something similar, which pages of this mapping it
// modified. Knowing this it knows which data it dirtied and so can write this
// data back to ZODB itself, without filesystem server providing write support.
//
// 2. mmap(MAP_SHARED, PROT_READ) + write-tracking & writeout by client
//
// In this scheme bigfile data is mmapped in MAP_SHARED mode with read-only pages
// protection. Then whenever write fault occurs, client allocates RAM from
// shmfs, copies faulted page to it, and then mmaps RAM page with RW protection
// in place of original bigfile page. Writeout implementation should be similar
// to "1", only here client already knows the pages it dirtied, and this way
// there is no need to consult /proc/self/pagemap.
//
// The advantage of this scheme over mmap(MAP_PRIVATE) is that in case
// there are several in-process mappings of the same bigfile with overlapping
// in-file ranges, changes in one mapping will be visible in another mapping.
// In contrast, whenever a MAP_PRIVATE mapping is modified, the kernel COWs
// faulted page into a page completely private to this mapping, so that other
// MAP_PRIVATE mappings of this file, including ones created from the same
// process, do not see changes made to the first mapping.
//
// Since wendelin.core needs to provide coherency between different slices
// of the same array, this is the mode wendelin.core actually uses.
//
// 3. write to wcfs
//
// TODO we could later implement a "write-directly" mode where clients would
// write data directly into the file.
package main

// Wcfs organization
//
// Wcfs is a ZODB client that translates ZODB objects into OS files as
// non-wcfs wendelin.core would do for a ZBigFile. In contrast to non-wcfs
// wendelin.core, it keeps bigfile data efficiently in the shared OS cache. It
// is organized as follows:
//
// 1) 1 ZODB connection for "latest data" for the whole filesystem (zhead).
// 2) head/bigfile/* of all bigfiles represent state as of zhead.At .
// 3) for head/bigfile/* the following invariant is maintained:
//
//	#blk ∈ OS file cache    =>    ZBlk(#blk) + all BTree/Bucket that lead to it  ∈ zhead cache(%)
//	                              (ZBlk* in ghost state)
//
//	                        =>    all BTree/Bucket that lead to blk are tracked (XXX)
//
//    The invariant helps on invalidation: if we see a changed oid, and
//    zhead.cache.lookup(oid) = ø -> we know we don't have to invalidate OS
//    cache for any part of any file (even if oid relates to a file block - that
//    block is not cached and will trigger a ZODB load on file read).
//
//    XXX explain why tracked
//
//    Currently we maintain this invariant by simply never evicting ZBlk/LOBTree/LOBucket
//    objects from the ZODB Connection cache. In the future we may want to try to
//    synchronize with the kernel freeing its pagecache pages.
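//
//    For illustration, the invariant lets step 4 below skip every oid that is
//    absent from the live cache. A sketch, where Oid stands in for zodb.Oid and
//    the inCache callback for "zhead.cache.lookup(oid) != ø" (both hypothetical
//    here):
//
//	type Oid uint64
//
//	// cachedOids returns the oids of an invalidation message that are in the
//	// live cache; by the invariant, the rest need no OS cache invalidation.
//	func cachedOids(oidv []Oid, inCache func(Oid) bool) []Oid {
//		var res []Oid
//		for _, oid := range oidv {
//			if inCache(oid) {
//				res = append(res, oid)
//			}
//		}
//		return res
//	}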
//
// 4) when we receive an invalidation message from ZODB - we process it and
//    propagate invalidations to the OS file cache of head/bigfile/*:
//
//	invalidation message: (tid↑, []oid)
//
//    4.1) zhead.cache.lookup(oid)			XXX -> δFtail
//    4.2) ø: nothing to do - see invariant ^^^.
//    4.3) obj found:
//
//	- ZBlk*		-> [] of file/[]#blk
//	- BTree/Bucket	-> δ(BTree)  -> file/[]#blk
//
//	in the end, after processing all []oid from the invalidation message, we have
//
//	  [] of file/[]#blk
//
//	that describes which parts of which file(s) need to be invalidated.
//
//	FIXME no - we can build it but not in full - since we consider only zobj in live cache.
//	FIXME and even if we consider all δ'ed zobj, building complete set of
//	      file.δtail requires to first do complete scan of file.blktab
//	      which is prohibitively expensive. XXX -> no, we'll do the scan
//
//    4.4) for all file/blk to invalidate we do:
//
//	- try to retrieve head/bigfile/file[blk] from OS file cache(*);
//	- if retrieved successfully -> store retrieved data back into OS file
//	  cache for @<rev>/bigfile/file[blk], where
//
//	    # see below about file.δtail
//	    # XXX -> file.BlkRevAt(#blk, zhead.at)
//	    rev = max(file.δtail.by(#blk)) || min(rev ∈ file.δtail) || zhead.at
//
//	- invalidate head/bigfile/file[blk] in OS file cache.
//
//	This preserves previous data in the OS file cache in case it will be needed
//	by not-yet-up-to-date clients, and makes sure a file read of head/bigfile/file[blk]
//	won't be served from the OS file cache and instead will trigger a FUSE read
//	request to wcfs.
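//
//	A sketch of step 4.4 for one block. pageCache and its methods are
//	hypothetical stand-ins for the OS page cache control referenced by (*),
//	not a real API; rev is assumed to be already computed by the formula
//	above, and the code assumes the "fmt" package:
//
//		type pageCache interface {
//			Retrieve(path string, blk int64) (data []byte, ok bool)
//			Store(path string, blk int64, data []byte) error
//			Invalidate(path string, blk int64) error
//		}
//
//		func invalidateBlk(c pageCache, file string, blk int64, rev string) error {
//			headPath := "head/bigfile/" + file
//			if data, ok := c.Retrieve(headPath, blk); ok {
//				// keep old data for not-yet-up-to-date clients
//				revPath := fmt.Sprintf("@%s/bigfile/%s", rev, file)
//				if err := c.Store(revPath, blk, data); err != nil {
//					return err
//				}
//			}
//			// the next read of head/bigfile/file[blk] now goes to wcfs via FUSE
//			return c.Invalidate(headPath, blk)
//		}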
//
//    4.5) no invalidation messages are sent to wcfs clients at this point(+).
//
//    4.6) processing ZODB invalidations and serving file reads (see 7) are
//      organized to be mutually exclusive.
//
// 5) after the OS file cache is invalidated, we resync zhead to the new
//    database view corresponding to tid.
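//
//    A minimal sketch of the mutual exclusion in 4.6 together with step 5,
//    assuming a sync.RWMutex used as in the head.zheadMu notes below (WLock by
//    handleδZ, RLock by read). The head type, its fields and both methods here
//    are hypothetical, and the code assumes the "sync" package:
//
//	type head struct {
//		zheadMu sync.RWMutex
//		at      uint64 // stands for zhead.At
//	}
//
//	// handleδZ invalidates the OS cache (step 4) and resyncs zhead (step 5)
//	// while no FUSE read can run.
//	func (h *head) handleδZ(tid uint64, invalidateOSCache func()) {
//		h.zheadMu.Lock()
//		defer h.zheadMu.Unlock()
//		invalidateOSCache()
//		h.at = tid
//	}
//
//	// read serves a FUSE read (step 7); many reads may run concurrently.
//	func (h *head) read(load func(at uint64) []byte) []byte {
//		h.zheadMu.RLock()
//		defer h.zheadMu.RUnlock()
//		return load(h.at)
//	}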
//
// 6) for every file, δtail invalidation info about head/data is maintained:	XXX -> δFtail
//
//	- tailv: [](rev↑, []#blk)
//	- by:    {} #blk -> []rev↑ in tail
//
//    δtail.tailv describes invalidations to the file we learned from ZODB invalidations.
//    δtail.by    allows quick lookup of information by #blk.
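//
//    A sketch of such a per-file δtail, including the lookup used in 4.4,
//    rev = max(δtail.by(#blk)) || min(rev ∈ δtail) || head. The type and
//    method names are hypothetical, and Tid stands in for zodb.Tid:
//
//	type Tid uint64
//
//	type δentry struct {
//		rev  Tid
//		blkv []int64 // blocks changed by transaction rev
//	}
//
//	type δtail struct {
//		tailv []δentry        // rev↑
//		by    map[int64][]Tid // #blk -> []rev↑
//	}
//
//	// add records that transaction rev changed blkv;
//	// rev must be greater than every rev already in the tail.
//	func (δt *δtail) add(rev Tid, blkv []int64) {
//		δt.tailv = append(δt.tailv, δentry{rev, blkv})
//		if δt.by == nil {
//			δt.by = make(map[int64][]Tid)
//		}
//		for _, blk := range blkv {
//			δt.by[blk] = append(δt.by[blk], rev)
//		}
//	}
//
//	// revAt returns max(by[blk]) || min(rev ∈ tail) || head.
//	func (δt *δtail) revAt(blk int64, head Tid) Tid {
//		if revv := δt.by[blk]; len(revv) > 0 {
//			return revv[len(revv)-1] // by[blk] is rev↑: the last is the max
//		}
//		if len(δt.tailv) > 0 {
//			return δt.tailv[0].rev // tailv is rev↑: the first is the min
//		}
//		return head
//	}
//
//    For example, after add(10, []int64{1}) and add(20, []int64{2}),
//    revAt(1, 30) is 10, revAt(2, 30) is 20, and revAt(3, 30) is 10 (the min
//    of the tail); on an empty δtail revAt returns head.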
//
//    min(rev) in δtail is min(@at) at which head/bigfile/file is currently watched (see below).
//
//    XXX δtail can miss ...
//
//    to support initial openings with @at being slightly in the past, we also
//    make sure that min(rev) is enough to cover the last 10 minutes of history
//    from head/at.
//
// 7) when we receive a FUSE read(#blk) request to a head/bigfile/file, we process it as follows:
//
//   7.1) load blkdata for head/bigfile/file[blk] @zhead.at .
//
//	while loading, this also gives an upper-bound estimate of when the block
//	was last changed:
//
//	  rev(blk) ≤ max(_.serial for _ in (ZBlk(#blk), all BTree/Bucket that lead to ZBlk))
//
//	it is not exact because BTree/Bucket can change (e.g. rebalance)
//	but still point to the same k->ZBlk.
//
//	we also use file.δtail to find either the exact blk revision:	XXX δFtail
//
//	  rev(blk) = max(file.δtail.by(#blk) -> []rev↑)
//
//	or another upper bound if #blk ∉ δtail:
//
//	  rev(blk) ≤ min(rev ∈ δtail)		; #blk ∉ δtail
//
//	below, rev'(blk) is the min of the estimates found:
//
//	  rev(blk) ≤ rev'(blk)		rev'(blk) = min(^^^)
//
//	XXX we delay recomputing δFtail.LastBlkRev(file, #blk, head) because
//	using just the cheap revmax estimate can frequently result in all watches
//	being skipped.
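//
//	A sketch of computing rev'(blk) from the estimates above. revEstimate is
//	a hypothetical helper and Tid stands in for zodb.Tid. serials holds the
//	.serial of ZBlk(#blk) and of all BTree/Bucket leading to it, and byBlk
//	and tailRevs hold δtail.by(#blk) and all revs in δtail, both rev↑:
//
//		type Tid uint64
//
//		func revEstimate(serials, byBlk, tailRevs []Tid) Tid {
//			var est Tid // max(_.serial); assumes serials is not empty
//			for _, s := range serials {
//				if s > est {
//					est = s
//				}
//			}
//			var δrev Tid
//			switch {
//			case len(byBlk) > 0:
//				δrev = byBlk[len(byBlk)-1] // exact: max(δtail.by(#blk))
//			case len(tailRevs) > 0:
//				δrev = tailRevs[0] // upper bound: min(rev ∈ δtail)
//			default:
//				return est // empty δtail: only the serial bound is known
//			}
//			if δrev < est {
//				return δrev
//			}
//			return est
//		}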
//
//   7.2) for all registered client@at watches of head/bigfile/file:
//
//	- rev'(blk) ≤ at: -> do nothing
//	- rev'(blk) > at:
//	  - if blk ∈ watch.pinned -> do nothing
//	  - rev = max(δtail.by(#blk) : _ ≤ at)	|| min(rev ∈ δtail : rev ≤ at)	|| at
//	  - watch.pin(file, #blk, @rev)
//	  - watch.pinned += blk
//
//	where
//
//	  watch.pin(file, #blk, @rev)
//
//	sends a pin message according to "Invalidation protocol", and is assumed
//	to cause
//
//	  remmap(file, #blk, @rev/bigfile/file)
//
//	on the client.
//
//	( one could imagine adjusting mappings synchronously by running
//	  wcfs-trusted code that wcfs injects into clients via ptrace, but ptrace
//	  won't work when a client thread is blocked under pagefault or syscall(^) )
//
//	in order to support watching, for each head/bigfile/file
//
//	  [] of watch{client@at↑, pinned}
//
//	is maintained.
//
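//	A sketch of the per-watch decision in 7.2. The watch type, its fields and
//	both functions are hypothetical, Tid stands in for zodb.Tid, and the pin
//	callback stands for watch.pin(file, #blk, @rev). byBlk is δtail.by(#blk)
//	and tailRevs are all revs in δtail, both rev↑:
//
//		type Tid uint64
//
//		type watch struct {
//			at     Tid
//			pinned map[int64]bool
//		}
//
//		// pinRev returns max(δtail.by(#blk) : _ ≤ at) || min(rev ∈ δtail : rev ≤ at) || at.
//		func pinRev(byBlk, tailRevs []Tid, at Tid) Tid {
//			for i := len(byBlk) - 1; i >= 0; i-- {
//				if byBlk[i] <= at {
//					return byBlk[i]
//				}
//			}
//			if len(tailRevs) > 0 && tailRevs[0] <= at {
//				return tailRevs[0]
//			}
//			return at
//		}
//
//		func maybePin(w *watch, blk int64, revEst Tid, byBlk, tailRevs []Tid, pin func(blk int64, rev Tid)) {
//			if revEst <= w.at || w.pinned[blk] {
//				return // rev'(blk) ≤ at, or blk is already pinned
//			}
//			pin(blk, pinRev(byBlk, tailRevs, w.at))
//			if w.pinned == nil {
//				w.pinned = make(map[int64]bool)
//			}
//			w.pinned[blk] = true
//		}
//
//	For example, with w.at = 15, byBlk = tailRevs = [10 20] and revEst = 20,
//	maybePin calls pin(blk, 10) and marks blk as pinned.
//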
//   7.3) blkdata is returned to the kernel.
//
//   Thus a client that wants the latest data on pagefault will get the latest
//   data, and a client that wants @rev data will get @rev data, even if it was
//   this "old" client that triggered the pagefault(~).
//
// XXX 8) serving read from @<rev>/data + zconn(s) for historical state
// XXX 9) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout
//
//
// (*) see notes.txt -> "Notes on OS pagecache control"
// (+) see notes.txt -> "Invalidations to wcfs clients are delayed until block access"
// (~) see notes.txt -> "Changing mmapping while under pagefault is possible"
// (^) see notes.txt -> "Client cannot be ptraced while under pagefault"
// (%) no need to keep track of ZData - ZBlk1 is always marked as changed on blk data change.
//
// XXX For every ZODB connection a dedicated read-only transaction is maintained.

// XXX notation
//
// δZ    - change in ZODB space
// δB    - change in BTree*s* space
// δT    - change in BTree(1) space
// δF    - change in File*s* space
// δfile - change in File(1) space

// XXX locking
//
// head.zheadMu		WLock by handleδZ
//			RLock by read
// ...
//
// Head:  zheadMu.W  |  zheadMu.R + BigFileDir.fileMu
// Watch: atMu.W     |  atMu.R + pinnedMu
// zheadMu > Watch.atMu
//
// WatchLink.byfileMu	> Watch.atMu
// BigFile.watchMu      > Watch.atMu

import (
	"bufio"
	"context"
	"flag"
	"fmt"
	"io"
	stdlog "log"
	"os"
	"runtime"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
//	"time"

	log "github.com/golang/glog"
	"golang.org/x/sync/errgroup"

	"lab.nexedi.com/kirr/go123/xcontext"
	"lab.nexedi.com/kirr/go123/xerr"

	"lab.nexedi.com/kirr/neo/go/transaction"
	"lab.nexedi.com/kirr/neo/go/zodb"
	"lab.nexedi.com/kirr/neo/go/zodb/btree"
	_ "lab.nexedi.com/kirr/neo/go/zodb/wks"

	"github.com/hanwen/go-fuse/fuse"
	"github.com/hanwen/go-fuse/fuse/nodefs"
	"github.com/pkg/errors"
)
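
// exampleTwoLevelLocked is a hypothetical sketch, not used by wcfs, of the
// lock-protocol notation in the lock-order notes above, for example
// "zheadMu.W  |  zheadMu.R + BigFileDir.fileMu". A field annotated this way
// may be accessed either by a holder of the outer write lock alone, or by a
// holder of the outer read lock that also takes the inner mutex. Here outerMu
// stands in for zheadMu and innerMu for fileMu. The map and its contents are
// made up for illustration.
type exampleTwoLevelLocked struct {
	outerMu sync.RWMutex
	innerMu sync.Mutex
	tab     map[int]string // outerMu.W  |  outerMu.R + innerMu
}

func newExampleTwoLevelLocked() *exampleTwoLevelLocked {
	return &exampleTwoLevelLocked{tab: make(map[int]string)}
}

// getOrAdd holds only outerMu.R, so it must also take innerMu to access tab.
// Several getOrAdd calls may run concurrently; innerMu serializes them.
func (x *exampleTwoLevelLocked) getOrAdd(k int, v string) string {
	x.outerMu.RLock()
	defer x.outerMu.RUnlock()
	x.innerMu.Lock()
	defer x.innerMu.Unlock()

	if cur, ok := x.tab[k]; ok {
		return cur
	}
	x.tab[k] = v
	return v
}

// reset holds outerMu.W, which excludes every outerMu.R holder, so it may
// access tab without innerMu. It returns how many entries were dropped.
func (x *exampleTwoLevelLocked) reset() int {
	x.outerMu.Lock()
	defer x.outerMu.Unlock()

	n := len(x.tab)
	x.tab = make(map[int]string)
	return n
}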

// Root represents root of wcfs filesystem.
type Root struct {
	fsNode

	// ZODB storage we work with
	zstor zodb.IStorage

	// ZODB DB handle for zstor.
	// keeps cache of connections for @<rev>/ accesses.
	// only one connection is used for each @<rev>.
	zdb *zodb.DB

	// directory + ZODB connection for head/
	// (zhead is Resync'ed and is kept outside zdb pool)
	head *Head

	// directories + ZODB connections for @<rev>/
	revMu  sync.Mutex
	revTab map[zodb.Tid]*Head
}
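
// exampleRevCache is a hypothetical sketch, not wcfs code, of the revMu/revTab
// pattern in Root. It is a mutex-guarded map that keeps exactly one entry per
// revision and creates that entry lazily on first access, so concurrent
// lookups of the same @<rev> share one value. uint64 stands in for zodb.Tid
// and exampleRevEntry for *Head. How wcfs actually builds an @<rev>/ Head is
// not shown in this section.
type exampleRevCache struct {
	mu      sync.Mutex
	tab     map[uint64]*exampleRevEntry
	created int // number of entries constructed, for illustration only
}

type exampleRevEntry struct {
	rev uint64
}

// get returns the entry for rev, creating it if this is the first request.
func (c *exampleRevCache) get(rev uint64) *exampleRevEntry {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.tab == nil {
		c.tab = make(map[uint64]*exampleRevEntry)
	}
	e, ok := c.tab[rev]
	if !ok {
		e = &exampleRevEntry{rev: rev}
		c.tab[rev] = e
		c.created++
	}
	return e
}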

// /(head|<rev>)/			- served by Head.
type Head struct {
	fsNode

	rev   zodb.Tid    // 0 for head/, !0 for @<rev>/
	bfdir *BigFileDir // bigfile/
	// at    - served by .readAt
	// watch - implicitly linked to by fs

	// ZODB connection for everything under this head

	// zheadMu protects zconn.At & live _objects_ associated with it.
	// while it is rlocked, zconn is guaranteed to keep viewing the database
	// at the same revision.
	//
	// zwatcher write-locks it, and so knows that no one is using ZODB objects
	// and no one mutates the OS file cache while zwatcher is running.
	//
	// it is also kept rlocked by OS cache uploaders (see BigFile.uploadBlk)
	// with an additional locking protocol to avoid deadlocks (see below for
	// pauseOSCacheUpload + ...).
	zheadMu sync.RWMutex
	zconn   *ZConn       // for head/ zwatcher resyncs head.zconn; others only read zconn objects.

	// zwatcher signals to uploadBlk to pause/continue uploads to OS cache to avoid deadlocks.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
	pauseOSCacheUpload    bool
	continueOSCacheUpload chan struct{}
	// uploadBlk reports to zwatcher how many OS cache uploads are currently in flight.
	inflightOSCacheUploads int32

	// head/watch opens
	wlinkMu  sync.Mutex
	wlinkTab map[*WatchLink]struct{}

	// waiters for zhead.At to become ≥ their at.
	hwaitMu sync.Mutex           // zheadMu.W  |  zheadMu.R + hwaitMu
	hwait   map[hwaiter]struct{} // set{(at, ready)}
}

// /(head|<rev>)/bigfile/		- served by BigFileDir.
type BigFileDir struct {
	fsNode
	head *Head // parent head/ or @<rev>/

	// {} oid -> <bigfileX>
	fileMu  sync.Mutex		// zheadMu.W  |  zheadMu.R + fileMu
	fileTab map[zodb.Oid]*BigFile

	// δ tail of tracked BTree nodes of all BigFiles, plus which file each tracked node belongs to
	// (used only for head/, not revX/)
	δFmu   sync.RWMutex		// zheadMu.W  |  zheadMu.R + δFmu.X
	δFtail *ΔFtail
}

// /(head|<rev>)/bigfile/<bigfileX>	- served by BigFile.
type BigFile struct {
	fsNode

	// this BigFile is under .head/bigfile/; it views ZODB via .head.zconn
	// parent's BigFileDir.head is the same.
	head	*Head

	// ZBigFile top-level object
	zfile	*ZBigFile

	// things read/computed from .zfile; constant during lifetime of current transaction.
	// i.e. changed only under zheadMu.W
	blksize int64    // zfile.blksize
	size    int64    // zfile.Size()
	rev     zodb.Tid // last revision that modified zfile data
			 // XXX we can't know rev fully as some later blocks could be learnt only
			 //     while populating δFtail lazily
			 // XXX or then it is not "constant during lifetime of current txn"

//	// tail change history of this file.
//	//
//	// XXX computationally expensive to start - see "Invalidations to wcfs
//	// clients are delayed ..." in notes.txt
//	δtail *ΔTailI64 // [](rev↑, []#blk)

	// inflight loadings of ZBigFile from ZODB.
	// successful load results are kept here until blkdata is put into OS pagecache.
	//
	// Being a staging area for data to enter OS cache, loading has to be
	// consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache.
	loadMu  sync.Mutex              // zheadMu.W  |  zheadMu.R + loadMu
	loading map[int64]*blkLoadState // #blk -> {... blkdata}

	// watches attached to this file.
	//
	// both watches in already "established" state (i.e. initial watch
	// request was completed and answered with "ok"), and watches in
	// progress of being established are kept here.
	watchMu  sync.RWMutex
	watchTab map[*Watch]struct{}
}

// blkLoadState represents a ZBlk load state/result.
//
// when !ready the loading is in progress.
// when ready  the loading has been completed.
type blkLoadState struct {
	ready chan struct{}

	blkdata  []byte
	err      error
}
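
// exampleLoader is a hypothetical sketch, not wcfs code, of how a
// blkLoadState-like entry lets concurrent readers of the same block share a
// single load. The first caller registers an entry and performs the load.
// Later callers wait on entry.ready and then reuse blkdata/err, which is safe
// because closing ready happens-before every receive that observes the close.
// The real BigFile code also runs under zheadMu.R and keeps entries until the
// data reaches the OS cache; both aspects are omitted here, and the load
// function is an assumed stand-in for fetching a ZBlk from ZODB.
type exampleLoadState struct {
	ready   chan struct{}
	blkdata []byte
	err     error
}

type exampleLoader struct {
	mu      sync.Mutex
	loading map[int64]*exampleLoadState
	load    func(blk int64) ([]byte, error) // hypothetical block fetcher
	nload   int32                           // number of real loads performed
}

// readBlk returns data for blk, loading it at most once per entry.
func (l *exampleLoader) readBlk(blk int64) ([]byte, error) {
	l.mu.Lock()
	st, ok := l.loading[blk]
	if !ok {
		st = &exampleLoadState{ready: make(chan struct{})}
		l.loading[blk] = st
	}
	l.mu.Unlock()

	if ok {
		<-st.ready // someone else is loading, or has loaded, this block
		return st.blkdata, st.err
	}

	atomic.AddInt32(&l.nload, 1)
	st.blkdata, st.err = l.load(blk)
	close(st.ready) // publish the result to all waiters
	return st.blkdata, st.err
}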

// /head/watch				- served by WatchNode.
type WatchNode struct {
	fsNode

	head   *Head // parent head/
	idNext int32 // ID for next opened WatchLink
}

// /head/watch handle			- served by WatchLink.
type WatchLink struct {
	sk   *FileSock // IO channel to client
	id   int32     // ID of this /head/watch handle (for debug log)
	head *Head

	// watches associated with this watch link.
	//
	// both already established, and watches being initialized in-progress are registered here.
	// (see setupWatch)
	// XXX byfile -> wlink-global watchMu ?
	byfileMu sync.Mutex          // zheadMu.W  |  zheadMu.R + byfileMu	(XXX recheck)
	byfile   map[zodb.Oid]*Watch // {} foid -> Watch

	// IO
	reqNext uint64 // stream ID for next wcfs-originated request; 0 is reserved for control messages
	txMu    sync.Mutex
	rxMu    sync.Mutex
	rxTab   map[/*stream*/uint64]chan string // client replies go via here
}
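
// exampleRxDemux is a hypothetical sketch, not wcfs code, of the role of the
// reqNext/rxMu/rxTab fields. Every wcfs-originated request gets a fresh
// stream ID, and 0 stays reserved for control messages. A reply channel is
// registered under that ID, and the receive side routes each client reply to
// the channel of its stream. The sequential numbering starting at 1 is an
// assumption of this sketch. The actual stream numbering and wire format of
// the watch protocol are not modeled here.
type exampleRxDemux struct {
	reqNext uint64 // first field, so 64-bit atomics are aligned on 32-bit platforms
	rxMu    sync.Mutex
	rxTab   map[uint64]chan string
}

// newRequest allocates a stream ID and registers its reply channel.
func (d *exampleRxDemux) newRequest() (uint64, chan string) {
	stream := atomic.AddUint64(&d.reqNext, 1) // yields 1, 2, 3, ...; never 0
	replyq := make(chan string, 1)
	d.rxMu.Lock()
	d.rxTab[stream] = replyq
	d.rxMu.Unlock()
	return stream, replyq
}

// deliver routes reply to the waiter of stream. It reports false for an
// unknown or already answered stream.
func (d *exampleRxDemux) deliver(stream uint64, reply string) bool {
	d.rxMu.Lock()
	replyq, ok := d.rxTab[stream]
	delete(d.rxTab, stream)
	d.rxMu.Unlock()
	if !ok {
		return false
	}
	replyq <- reply // buffered for the single expected reply, so this does not block
	return true
}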

// Watch represents watching for changes to 1 BigFile over particular watch link.
type Watch struct {
	link *WatchLink // link to client
	file *BigFile	// XXX needed?

	// atMu, similarly to zheadMu, protects watch.at and pins associated with Watch.
	// atMu.R guarantees that watch.at is not changing, but multiple
	//        simultaneous pins could be running (used e.g. by readPinWatchers).
	// atMu.W guarantees that only one user has watch.at write access and
	//        that no pins are running (used by setupWatch).
	atMu sync.RWMutex
	at   zodb.Tid               // requested to be watched @at

	pinnedMu sync.Mutex             // atMu.W  |  atMu.R + pinnedMu
	pinned   map[int64]*blkPinState // {} blk -> {... rev}  blocks that are already pinned to be ≤ at
}

// blkPinState represents state/result of pinning one block.
//
// when !ready the pinning is in progress.
// when ready  the pinning has been completed.
type blkPinState struct {
	rev    zodb.Tid // revision to which the block is being or has been pinned

	ready  chan struct{}
	err    error
}

// -------- 3) Cache invariant --------

// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
// LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive
// because together they form the index used to locate ZBigFile data.
//
// For the data itself, we put it into the kernel pagecache and always
// deactivate it in ZODB right after that.
//
// See "3) for */head/data the following invariant is maintained..."
type zodbCacheControl struct {}

func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolicy {
	switch obj.(type) {
	// ZBlk* should be in cache but without data
	case *ZBlk0:
		return zodb.PCachePinObject | zodb.PCacheDropState
	case *ZBlk1:
		return zodb.PCachePinObject | zodb.PCacheDropState

	// ZBigFile btree index should be in cache with data
	case *btree.LOBTree:
		return zodb.PCachePinObject | zodb.PCacheKeepState
	case *btree.LOBucket:
		return zodb.PCachePinObject | zodb.PCacheKeepState

	// don't let ZData pollute the cache
	case *ZData:
		return zodb.PCacheDropObject | zodb.PCacheDropState

	// for performance reasons we also keep ZBigFile in cache.
	//
	// ZBigFile is the top-level object that is used on every block load, and
	// it would be a waste to evict ZBigFile from cache.
	case *ZBigFile:
		return zodb.PCachePinObject | zodb.PCacheKeepState
	}

	return 0
}
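
// examplePolicy is a hypothetical stand-in for zodb.PCachePolicy. It does not
// reproduce zodb's actual flag values. It illustrates how PCacheClassify
// combines independent bit flags with "|": one pair of flags decides whether
// the object stays in the live cache, the other whether its loaded state is
// kept. exampleClassify mirrors the decision table above, using type names
// instead of real ZODB types.
type examplePolicy int

const (
	examplePinObject  examplePolicy = 1 << iota // keep object in live cache
	exampleDropObject                           // evict object from live cache
	exampleKeepState                            // keep the object's loaded state
	exampleDropState                            // discard the object's loaded state
)

func exampleClassify(typ string) examplePolicy {
	switch typ {
	case "ZBlk0", "ZBlk1":
		return examplePinObject | exampleDropState // in cache, but without data
	case "LOBTree", "LOBucket", "ZBigFile":
		return examplePinObject | exampleKeepState // index and top-level object stay loaded
	case "ZData":
		return exampleDropObject | exampleDropState // never pollute the cache
	}
	return 0 // default policy
}

// has reports whether flag f is set in p.
func (p examplePolicy) has(f examplePolicy) bool { return p&f != 0 }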

/*
// -------- zhead lock/wait --------

// XXX needed?
// TODO head.zheadMu -> special mutex with Lock(ctx) so that Lock wait could be canceled
func (head *Head) zheadRLock()   { head.zheadMu.RLock() }
func (head *Head) zheadRUnlock() { head.zheadMu.RUnlock() }
func (head *Head) zheadLock()    { head.zheadMu.Lock() }
func (head *Head) zheadUnlock()  { head.zheadMu.Unlock() }
*/

// -------- 4) ZODB invalidation -> OS cache --------

func traceZWatch(format string, argv ...interface{}) {
	if !log.V(1) {	// XXX -> 2?
		return
	}

	log.InfoDepth(1, fmt.Sprintf("zwatcher: " + format, argv...))
}

// zwatcher watches for ZODB changes.
//
// see "4) when we receive an invalidation message from ZODB ..."
func (root *Root) zwatcher(ctx context.Context, zwatchq chan zodb.Event) (err error) {
	defer xerr.Contextf(&err, "zwatch %s", root.zstor.URL())
	traceZWatch(">>>")

	var zevent zodb.Event
	var ok bool

	for {
		traceZWatch("select ...")
		select {
		case <-ctx.Done():
			traceZWatch("cancel")
			return ctx.Err()

		case zevent, ok = <-zwatchq:
			if !ok {
				traceZWatch("zwatchq closed")
				return nil // closed	XXX ok?
			}

		}

		traceZWatch("zevent: %s", zevent)

		switch zevent := zevent.(type) {
		default:
			return fmt.Errorf("unexpected event: %T", zevent)

		case *zodb.EventError:
			return zevent.Err

		case *zodb.EventCommit:
			err = root.handleδZ(zevent)
			if err != nil {
				return err
			}
		}
	}
}
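
// exampleEventLoop is a hypothetical, simplified sketch of the shape of the
// zwatcher loop. It waits on either ctx cancellation or the next event, stops
// cleanly when the event channel is closed, and dispatches on the event's
// dynamic type. exampleEventCommit and exampleEventError are stand-ins for
// zodb.EventCommit and zodb.EventError. The handle callback plays the role of
// root.handleδZ.
type exampleEventCommit struct {
	tid uint64
}

type exampleEventError struct {
	err error
}

func exampleEventLoop(ctx context.Context, q <-chan interface{}, handle func(tid uint64) error) error {
	for {
		var ev interface{}
		var ok bool
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, ok = <-q:
			if !ok {
				return nil // q was closed by the producer
			}
		}

		switch ev := ev.(type) {
		default:
			return fmt.Errorf("unexpected event: %T", ev)
		case *exampleEventError:
			return ev.err
		case *exampleEventCommit:
			if err := handle(ev.tid); err != nil {
				return err
			}
		}
	}
}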

// handleδZ handles 1 change event from ZODB notification.
func (root *Root) handleδZ(δZ *zodb.EventCommit) (err error) {
	defer xerr.Contextf(&err, "handleδZ @%s", δZ.Tid)

	head := root.head

	// while we are invalidating the OS cache, make sure that nothing that
	// even reads /head/bigfile/* is running (see 4.6).
	//
	// also make sure that cache uploaders we spawned (uploadBlk) are all
	// paused, or else they could overwrite OS cache with stale data.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for
	// details on how to do this without deadlocks.
	continueOSCacheUpload := make(chan struct{})
retry:
	for {
		head.zheadMu.Lock()
		head.pauseOSCacheUpload = true
		head.continueOSCacheUpload = continueOSCacheUpload

		// NOTE need atomic load, since inflightOSCacheUploads
		// decrement is done not under zheadMu.
		if atomic.LoadInt32(&head.inflightOSCacheUploads) != 0 {
			head.zheadMu.Unlock()
			continue retry
		}

		break
	}

	defer func() {
		head.pauseOSCacheUpload = false
		head.continueOSCacheUpload = nil
		head.zheadMu.Unlock()
		close(continueOSCacheUpload)
	}()

	// head.zheadMu wlocked and all cache uploaders are paused

	zhead := head.zconn
	bfdir := head.bfdir

	// invalidate kernel cache for data in changed files
	// NOTE no δFmu lock needed because zhead is WLocked
	δF := bfdir.δFtail.Update(δZ, zhead)

	if false {
		fmt.Printf("\n\nS: handleδZ: δF (#%d):\n", len(δF.ByFile))
		for file, δfile := range δF.ByFile {
			blkv := δfile.Blocks.Elements()
			sort.Slice(blkv, func(i, j int) bool {
				return blkv[i] < blkv[j]
			})
			size := " "
			if δfile.Size {
				size = "S"
			}
			fmt.Printf("S: \t- %s\t%s %v\n", file.zfile.POid(), size, blkv)
		}
		fmt.Printf("\n\n")
	}

	wg, ctx := errgroup.WithContext(context.TODO())	// XXX ctx = ?
	for file, δfile := range δF.ByFile {
//		// XXX needed?
//		// XXX even though δBtail is complete, not all ZBlk are present here
//		file.δtail.Append(δF.Rev, δfile.Blocks.Elements())

		file := file
		for blk := range δfile.Blocks {
			blk := blk
			wg.Go(func() error {
				return file.invalidateBlk(ctx, blk)
			})
		}
	}
	err = wg.Wait()
	if err != nil {
		return err
	}

	// invalidate kernel cache for attributes
	// we need to do it only if we see topology (i.e. btree) change
	//
	// do it after completing data invalidations.
	wg, ctx = errgroup.WithContext(context.TODO())	// XXX ctx = ?
	for file, δfile := range δF.ByFile {
		if !δfile.Size {
			continue
		}
		file := file
		wg.Go(func() error {
			return file.invalidateAttr() // NOTE does not accept ctx
		})
	}
	err = wg.Wait()
	if err != nil {
		return err
	}

	// resync .zhead to δZ.tid
	// XXX -> Head.Resync() ?

	// 1. abort old and resync to new txn/at
	transaction.Current(zhead.txnCtx).Abort()
	_, ctx = transaction.New(context.Background()) // XXX bg ok?
	err = zhead.Resync(ctx, δZ.Tid)
	if err != nil {
		return err
	}
	zhead.txnCtx = ctx

	// 2. restat invalidated ZBigFile
	// NOTE no lock needed since .blksize and .size are constant during lifetime of one txn.
	// XXX -> parallel
	for file := range δF.ByFile {
		size, sizePath, err := file.zfile.Size(ctx)
		if err != nil {
			return err
		}

		file.size = size
		bfdir.δFtail.Track(file, -1, sizePath, nil)

		// XXX we can miss a change to file if δblk is not yet tracked
		//     -> need to update file.rev at read time -> locking=XXX
		file.rev = zhead.At()
	}

	// notify .wcfs/zhead
	for sk := range gdebug.zheadSockTab {
		_, err := fmt.Fprintf(sk, "%s\n", δZ.Tid)
		if err != nil {
			log.Errorf("%s", err)	// XXX errctx + file, handle, reader pid
			sk.Close()
			delete(gdebug.zheadSockTab, sk)
		}
	}

	// XXX δFtail.ForgetPast(...)
	// XXX for f in δF: f.δtail.ForgetPast(...)

	// notify zhead.At waiters
	for w := range head.hwait {
		if w.at <= δZ.Tid {
			delete(head.hwait, w)
			close(w.ready)
		}
	}

	return nil
}
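
// examplePauseGate is a hypothetical, simplified sketch of the pause handshake
// that handleδZ performs with the OS cache uploaders. The uploader side
// (uploadBlk) is not part of this section, so its shape below is an
// assumption. The writer takes the write lock, raises the pause flag and
// publishes a fresh "continue" channel. It proceeds only once the atomic
// in-flight counter is zero. Otherwise it drops the lock and retries, so that
// in-flight uploads can finish. When done, it clears the flag and closes the
// channel to release paused uploaders. The runtime.Gosched call in the retry
// loop is an addition of this sketch; handleδZ simply retries.
type examplePauseGate struct {
	mu        sync.RWMutex
	pause     bool
	continueq chan struct{}
	inflight  int32 // decremented without mu held, so always accessed atomically
}

// lockPaused returns with mu write-locked and no uploads in flight. The
// returned func lifts the pause, unlocks mu and wakes paused uploaders.
func (g *examplePauseGate) lockPaused() (unlock func()) {
	continueq := make(chan struct{})
	for {
		g.mu.Lock()
		g.pause = true
		g.continueq = continueq
		if atomic.LoadInt32(&g.inflight) == 0 {
			break
		}
		g.mu.Unlock()
		runtime.Gosched() // let in-flight uploads make progress
	}
	return func() {
		g.pause = false
		g.continueq = nil
		g.mu.Unlock()
		close(continueq)
	}
}

// upload is a hypothetical uploader. It registers itself as in flight under
// mu.R, so no new uploads can start while the writer holds mu.W. If a pause
// is requested, it waits for the continue channel and re-checks.
func (g *examplePauseGate) upload(work func()) {
	for {
		g.mu.RLock()
		if !g.pause {
			atomic.AddInt32(&g.inflight, 1)
			g.mu.RUnlock()
			break
		}
		continueq := g.continueq
		g.mu.RUnlock()
		<-continueq
	}
	defer atomic.AddInt32(&g.inflight, -1)
	work() // runs without mu held, as a kernel cache store could block here
}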

// hwaiter represents someone waiting for zhead to become ≥ at.
type hwaiter struct {
	at    zodb.Tid
	ready chan struct{}
}

// zheadWait waits till head.zconn.At becomes ≥ at.
//
// It returns error either if wcfs is down or ctx is canceled.
func (head *Head) zheadWait(ctx context.Context, at zodb.Tid) (err error) {
	defer xerr.Contextf(&err, "wait zhead ≥ %s", at)

	if head.rev != 0 {
		panic("must be called only for head/, not @revX/")
	}

	// XXX check wcfs.down

	// check if zhead is already ≥ at
	head.zheadMu.RLock()
	if head.zconn.At() >= at {
		head.zheadMu.RUnlock()
		return nil
	}

	// no - we have to wait for it
	ready := make(chan struct{})
	head.hwaitMu.Lock()
	head.hwait[hwaiter{at, ready}] = struct{}{}
	head.hwaitMu.Unlock()

	head.zheadMu.RUnlock()

	select {
	case <-ctx.Done():
		return ctx.Err()

	case <-ready:
		return nil // ok - zhead.At went ≥ at
	}
}
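
// exampleAtWaiters is a hypothetical sketch of the hwait mechanism behind
// zheadWait and the "notify zhead.At waiters" step of handleδZ. A waiter
// registers (at, ready) and blocks on ready or ctx. The advancing side closes
// ready for every waiter whose at ≤ the new head. Unlike wcfs, which uses
// zheadMu together with hwaitMu, a single mutex guards both the head position
// and the waiter set here. As in zheadWait, a waiter whose ctx is canceled
// stays in the set until the head passes its at.
type exampleWaiter struct {
	at    uint64
	ready chan struct{}
}

type exampleAtWaiters struct {
	mu    sync.Mutex
	head  uint64
	waitq map[exampleWaiter]struct{}
}

// wait blocks until head ≥ at or ctx is canceled.
func (w *exampleAtWaiters) wait(ctx context.Context, at uint64) error {
	w.mu.Lock()
	if w.head >= at {
		w.mu.Unlock()
		return nil
	}
	ready := make(chan struct{})
	if w.waitq == nil {
		w.waitq = make(map[exampleWaiter]struct{})
	}
	w.waitq[exampleWaiter{at, ready}] = struct{}{}
	w.mu.Unlock()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-ready:
		return nil
	}
}

// advance moves head to tid and wakes every waiter whose at ≤ tid.
func (w *exampleAtWaiters) advance(tid uint64) {
	w.mu.Lock()
	defer w.mu.Unlock()

	w.head = tid
	for wt := range w.waitq {
		if wt.at <= tid {
			delete(w.waitq, wt) // deleting during range is allowed in Go
			close(wt.ready)
		}
	}
}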

// invalidateBlk invalidates 1 file block in kernel cache.
//
// see "4.4) for all file/blk to invalidate we do"
// called with zheadMu wlocked.
func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
	defer xerr.Contextf(&err, "%s: invalidate blk #%d", f.path(), blk)

	fsconn := gfsconn
	blksize := f.blksize
	off := blk*blksize

	var blkdata []byte // nil until data is found

	// first try to retrieve f.loading[blk];
	// make sure f.loading[blk] is invalidated.
	//
	// we are running with zheadMu wlocked - no need to lock f.loadMu
	loading, ok := f.loading[blk]
	if ok {
		if loading.err == nil {
			blkdata = loading.blkdata
		}
		delete(f.loading, blk)
	}

	// TODO skip retrieve/store if len(f.watchTab) == 0
	// try to retrieve cache of current head/data[blk], if we got nothing from f.loading
	if blkdata == nil {
		blkdata = make([]byte, blksize)
		n, st := fsconn.FileRetrieveCache(f.Inode(), off, blkdata)
		if st != fuse.OK {
			// XXX warn
		}
		blkdata = blkdata[:n]
	}

	// if less than blksize was cached, the kernel has probably already had
	// to evict some data from its cache. In such a case we don't try to
	// preserve the rest; we drop what was read, to avoid further
	// overloading the system.
	//
	// if we have the data - preserve it under @revX/bigfile/file[blk].
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
992
	if int64(len(blkdata)) == blksize {
		func() {
			// store retrieved data back to OS cache for file @<rev>/file[blk]
			blkrev, _ := f.LastBlkRev(ctx, blk, f.head.zconn.At())
			frev, funlock, err := groot.lockRevFile(blkrev, f.zfile.POid())
			if err != nil {
				log.Errorf("BUG: %s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err)
				return // frev and funlock are nil on error
			}
			defer funlock()

			st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata)
			if st != fuse.OK {
				log.Errorf("BUG: %s: invalidate blk #%d: %s: store cache: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, frev.path(), st)
			}
		}()
	}

	// invalidate file/head/data[blk] in OS file cache.
	st := fsconn.FileNotify(f.Inode(), off, blksize)
	if st != fuse.OK {
		return syscall.Errno(st)
	}

	return nil
}
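/*
The decision invalidateBlk makes about old block data can be modelled with
plain maps. This is a minimal sketch, not wcfs code: blkCache, headCache and
revCache are hypothetical stand-ins for the kernel pagecache of
head/bigfile/<file> and @<rev>/bigfile/<file>, and the FUSE notifications are
reduced to map operations. Only a fully cached block is copied to the @rev
cache; a partial one is dropped, and head's copy is always invalidated.

```go
package main

// blkCache is a hypothetical stand-in for a per-file kernel pagecache:
// block number -> cached bytes (shorter than blksize if partly evicted).
type blkCache map[int64][]byte

// invalidateBlk models BigFile.invalidateBlk on in-memory caches.
// It reports whether the old block data was preserved in revCache.
func invalidateBlk(headCache, revCache blkCache, blk, blksize int64) (preserved bool) {
	blkdata := headCache[blk]
	// a short block suggests the kernel was already evicting data:
	// do not try to preserve it
	if int64(len(blkdata)) == blksize {
		revCache[blk] = blkdata
		preserved = true
	}
	// head/data[blk] is invalidated unconditionally
	delete(headCache, blk)
	return preserved
}
```
*/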

// invalidateAttr invalidates file attributes in kernel cache.
//
// complements invalidateBlk and is used to invalidate file size.
// called with zheadMu wlocked.
func (f *BigFile) invalidateAttr() (err error) {
	defer xerr.Contextf(&err, "%s: invalidate attr", f.path())
	fsconn := gfsconn
	st := fsconn.FileNotify(f.Inode(), -1, -1) // metadata only
	if st != fuse.OK {
		return syscall.Errno(st)
	}
	return nil
}

// lockRevFile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel
// and won't change until unlock.
//
// We need the node ID to be known to the kernel when we need to store data into
// the file's kernel cache - if the kernel doesn't have the node ID for the file in
// question, FileNotifyStoreCache will just fail.
//
// For the kernel to know the inode, lockRevFile issues a regular filesystem lookup
// request which goes to the kernel and should come back to wcfs. It is thus not safe
// to use lockRevFile from under a FUSE request handler, as doing so might deadlock.
//
// Caller must call unlock when inode ID is no longer required to be present.
// It is safe to simultaneously call multiple lockRevFile with the same arguments.
func (root *Root) lockRevFile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, unlock func(), err error) {
	fsconn := gfsconn

	frevpath := fmt.Sprintf("@%s/bigfile/%s", rev, fid) // relative to fs root for now
	defer xerr.Contextf(&err, "/: lockRevFile %s", frevpath)

	// FIXME checking for "node{0}" is fragile:
	// XXX the node could still be forgotten since we are not holding it open
	// XXX -> always os.open unconditionally for now
	//        or is it ok since it is just a cache?
	//	  -> no, not ok: if inode ID is forgotten, the same ID could be
	//	     reallocated to another file and then we'll corrupt in-kernel
	//	     cache by wrongly storing data of one file into cache of
	//	     another file.
	//	     -> to avoid this we need to always lock the inode ID with real open.
	// XXX (also disabled for now due to race-detector)
/*
	// first check without going through kernel, whether the inode may be known already
	xfrev := fsconn.LookupNode(root.Inode(), frevpath)
	if xfrev != nil {
		if xfrev.String() != "node{0}" {
			return xfrev.Node().(*BigFile), func(){}, nil
		}
	}
*/

	// we have to ping the kernel
	frevospath := gmntpt + "/" + frevpath // now starting from OS /
	f, err := os.Open(frevospath)
	if err != nil {
		return nil, nil, err
	}

	xfrev := fsconn.LookupNode(root.Inode(), frevpath)
	// must be !nil as open succeeded
	return xfrev.Node().(*BigFile), func() { f.Close() }, nil
}

// -------- 7) FUSE read(#blk) --------

// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
	f.head.zheadMu.RLock()
	defer f.head.zheadMu.RUnlock()

	// cap read request to file size
	end := off + int64(len(dest))		// XXX overflow?
	if end > f.size {
		end = f.size
	}
	if end <= off {
		// the kernel issues e.g. [0 +4K) read for f.size=0 and expects to get (0, ok)
		// POSIX also says to return 0 if off >= f.size
		return fuse.ReadResultData(nil), fuse.OK
	}

	// widen read request to be aligned with blksize granularity
	// (we can load only whole ZBlk* blocks)
	aoff := off - (off % f.blksize)
	aend := end
	if re := end % f.blksize; re != 0 {
		aend += f.blksize - re
	}
	// XXX use original dest if it can fit the data
	dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file

	// XXX better ctx = transaction.PutIntoContext(ctx, txn)
	ctx, cancel := xcontext.Merge(fctx, f.head.zconn.txnCtx)
	defer cancel()

	// read/load all block(s) in parallel
	wg, ctx := errgroup.WithContext(ctx)
	for blkoff := aoff; blkoff < aend; blkoff += f.blksize {
		blkoff := blkoff
		blk := blkoff / f.blksize
		wg.Go(func() error {
			δ := blkoff-aoff // blk position in dest
			//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, f.blksize)
			return f.readBlk(ctx, blk, dest[δ:δ+f.blksize])
		})
	}

	err := wg.Wait()
	if err != nil {
		return nil, err2LogStatus(err)
	}

	return fuse.ReadResultData(dest[off-aoff:end-aoff]), fuse.OK
}
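/*
The capping and widening arithmetic in Read can be checked in isolation.
alignRead below is a hypothetical helper that repeats the same computation:
cap [off, off+n) to the file size, then widen it outward to whole blocks.

```go
package main

// alignRead returns the capped read range [off, end) and the block-aligned
// range [aoff, aend) that must be loaded to serve it. ok is false when the
// read starts at or past EOF, in which case Read returns 0 bytes.
func alignRead(off, n, size, blksize int64) (end, aoff, aend int64, ok bool) {
	end = off + n
	if end > size {
		end = size
	}
	if end <= off {
		return 0, 0, 0, false
	}
	aoff = off - off%blksize
	aend = end
	if re := end % blksize; re != 0 {
		aend += blksize - re
	}
	return end, aoff, aend, true
}
```

For off=5000, n=100 and blksize=4096 this loads only block #1, [4096, 8192),
and Read then returns dest[904:1004].
*/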

// readBlk serves Read to read 1 ZBlk #blk into destination buffer.
//
// see "7) when we receive a FUSE read(#blk) request ..." in overview.
//
// len(dest) == blksize.
// called with head.zheadMu rlocked.
func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err error) {
	defer xerr.Contextf(&err, "%s: readblk #%d", f.path(), blk)

	// check if someone else is already loading this block
	f.loadMu.Lock()
	loading, already := f.loading[blk]
	if !already {
		loading = &blkLoadState{
			ready:   make(chan struct{}),
		}
		f.loading[blk] = loading
	}
	f.loadMu.Unlock()

	// if it is already loading - just wait for it
	if already {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-loading.ready:
			if loading.err == nil {
				copy(dest, loading.blkdata)	// XXX copy
			}
			return loading.err
		}
	}

	// no one was loading - we became responsible to load this block
	blkdata, treepath, zblk, blkrevMax, err := f.zfile.LoadBlk(ctx, blk)
	loading.blkdata = blkdata
	loading.err = err

	// data loaded with error - cleanup .loading
	if loading.err != nil {
		close(loading.ready)
		f.loadMu.Lock()
		delete(f.loading, blk)
		f.loadMu.Unlock()
		return err
	}

	// we have the data - it can be used after watchers are updated
	// XXX should we use ctx here? (see readPinWatchers comments)
	f.readPinWatchers(ctx, blk, treepath, zblk, blkrevMax)

	// data can be used now
	close(loading.ready)
	copy(dest, blkdata)	// XXX copy

	// store to kernel pagecache the whole block that we've just loaded from database.
	// This way, even if the user currently requested to read only a small portion of it,
	// the next e.g. consecutive user read request won't hit the DB again
	// and will instead be served by the kernel from its pagecache.
	//
	// We cannot do this directly from the reading goroutine - while reading,
	// kernel FUSE is holding the corresponding page in pagecache locked, and if
	// we tried to update that same page in pagecache it would result
	// in a deadlock inside the kernel.
	//
	// .loading cleanup is done once we are finished with putting the data into OS pagecache.
	// If we did it earlier - a simultaneous read covered by the same block could
	// miss both the kernel pagecache (if not yet updated) and the (already empty) .loading[blk],
	// and thus would trigger DB access again.
	//
	// XXX if direct-io: don't touch pagecache
	// XXX upload only parts not covered by current read (not to e.g. wait for page lock)
	// XXX skip upload completely if read is wide enough to cover whole blksize
	go f.uploadBlk(blk, loading)

	return nil
}

// uploadBlk complements readBlk: it uploads loaded blkdata into OS cache.
func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
	head := f.head

	// rlock zheadMu and make sure zwatcher is not asking us to pause.
	// if it does - wait for a safer time not to deadlock.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
retry:
	for {
		head.zheadMu.RLock()
		// help zwatcher if it asks us to pause uploads, so it can
		// take zheadMu wlocked without deadlocks.
		if head.pauseOSCacheUpload {
			ready := head.continueOSCacheUpload
			head.zheadMu.RUnlock()
			<-ready
			continue retry
		}

		break
	}

	// zheadMu rlocked.
	// zwatcher is not currently trying to pause OS cache uploads.

	// check if this block was already invalidated by zwatcher.
	// if so don't upload the block into OS cache.
	f.loadMu.Lock()
	loading_ := f.loading[blk]
	f.loadMu.Unlock()
	if loading != loading_ {
		head.zheadMu.RUnlock()
		return
	}

	oid := f.zfile.POid()

	// signal to zwatcher not to run while we are performing the upload.
	// upload with released zheadMu so that zwatcher can lock it even if only to
	// check inflightOSCacheUploads status.
	atomic.AddInt32(&head.inflightOSCacheUploads, +1)
	head.zheadMu.RUnlock()

	st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata)

	f.loadMu.Lock()
	bug := (loading != f.loading[blk])
	if !bug {
		delete(f.loading, blk)
	}
	f.loadMu.Unlock()

	// signal to zwatcher that we are done and it can continue.
	atomic.AddInt32(&head.inflightOSCacheUploads, -1)

	if bug {
		panicf("BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache", oid, blk)
	}

	if st == fuse.OK {
		return
	}

	// pagecache update failed, but it must not have (we verified on startup
	// that pagecache control is supported by kernel). We can correctly live
	// on with the error, but data access will likely be very slow. Tell user
	// about the problem.
	log.Errorf("BUG: bigfile %s: blk %d: -> pagecache: %s  (ignoring, but reading from bigfile will be very slow)", oid, blk, st)
}

// -------- invalidation protocol notification/serving --------
//
// (see "7.2) for all registered client@at watchers ...")

// pin makes sure that file[blk] on client side is the same as of @rev state.
//
// rev = zodb.TidMax means @head; otherwise rev must be ≤ w.at and there must
// be no rev_next changing file[blk]: rev < rev_next ≤ w.at.
//
// must be called with atMu rlocked.
//
// XXX error - when? or close watch on any error?
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
	defer xerr.Contextf(&err, "wlink%d: f<%s>", w.link.id, w.file.zfile.POid())
	return w._pin(ctx, blk, rev)
}

func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
	foid := w.file.zfile.POid()
	revstr := rev.String()
	if rev == zodb.TidMax {
		revstr = "head"
	}
	defer xerr.Contextf(&err, "pin #%d @%s", blk, revstr)

	if !(rev == zodb.TidMax || rev <= w.at) {
		panicf("f<%s>: wlink%d: pin #%d @%s: watch.at (%s) < rev",
			foid, w.link.id, blk, rev, w.at)
	}

	w.pinnedMu.Lock()

	// check/wait for previous/simultaneous pin.
	// (pin could be called simultaneously e.g. by setupWatch and readPinWatchers)
	for {
		blkpin := w.pinned[blk]
		if blkpin == nil {
			break
		}

		w.pinnedMu.Unlock()
		<-blkpin.ready	// XXX + ctx ? (or just keep ready ?)

		if blkpin.rev == rev {
			// already pinned
			// (e.g. os cache for block was evicted and read called the second time)
			return blkpin.err
		}

		// relock the watch and check that w.pinned[blk] is the same. Retry if it is not.
		// ( w.pinned[blk] could have changed while w.pinnedMu was not held e.g. by	XXX recheck
		//   simultaneous setupWatch if we were called by readPinWatchers )
		w.pinnedMu.Lock()
		if blkpin == w.pinned[blk] {
			if blkpin.rev == zodb.TidMax {
				w.pinnedMu.Unlock()
				panicf("f<%s>: wlink%d: pinned[#%d] = @head", foid, w.link.id, blk)
			}
			break
		}
	}

	// w.pinnedMu locked & previous pin is either nil or completed and its .rev != rev
	// -> setup new pin state
	blkpin := &blkPinState{rev: rev, ready: make(chan struct{})}
	w.pinned[blk] = blkpin

	// perform IO without w.pinnedMu
	w.pinnedMu.Unlock()
	ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr))
	w.pinnedMu.Lock()

	// check IO reply & verify/signal blkpin is ready
	defer func() {
		if rev == zodb.TidMax {
			delete(w.pinned, blk)
		}

		w.pinnedMu.Unlock()
		close(blkpin.ready)
	}()

	if err != nil {
		blkpin.err = err
		return err
	}

	if ack != "ack" {
		blkpin.err = fmt.Errorf("expect %q; got %q", "ack", ack)
		return blkpin.err
	}

	if blkpin != w.pinned[blk] {
		blkpin.err = fmt.Errorf("BUG: pinned[#%d] mutated while doing IO", blk)
		panicf("f<%s>: wlink%d: %s", foid, w.link.id, blkpin.err)
	}

	return nil
}

// readPinWatchers complements readBlk: it sends `pin blk` for watchers of the file
// after a block was loaded from ZODB and before block data is returned to kernel.
//
// See "7.2) for all registered client@at watchers ..."
//
// Called with f.head.zheadMu rlocked.
//
// XXX do we really need to use/propagate caller context here? ideally updating
// watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure? -> probably no
func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btree.LONode, zblk zBlk, blkrevMax zodb.Tid) {
	// only head/ is being watched for
	if f.head.rev != 0 {
		return
	}

	fmt.Printf("S: read #%d -> pin watchers (#%d)\n", blk, len(f.watchTab))

	// update δFtail index
	bfdir := f.head.bfdir
	bfdir.δFmu.Lock()		// XXX locking correct? XXX -> better push down?
	bfdir.δFtail.Track(f, blk, treepath, zblk)	// XXX pass in zblk.rev here?
	bfdir.δFmu.Unlock()

	// make sure that file[blk] on the clients' side stays as of @w.at state.

	// try to use blkrevMax only as the first cheap criterion to skip updating watchers.
	// This is likely to be the case, since most watchers should usually be close to head.
	// If using blkrevMax only turns out to be not sufficient, we'll
	// consult δFtail, which might involve recomputing it.
	blkrev := blkrevMax
	blkrevRough := true

	wg, ctx := errgroup.WithContext(ctx)

	f.watchMu.RLock()
	for w := range f.watchTab {
		w := w

		// make sure w.at stays unchanged while we prepare and pin the block
		w.atMu.RLock()

		fmt.Printf("S: read -> pin watchers: w @%s\n", w.at)

		// the block is already covered by @w.at database view
		if blkrev <= w.at {
			w.atMu.RUnlock()
			continue
		}

		// if blkrev is rough estimation and that upper bound is > w.at
		// we have to recompute ~exact file[blk] revision @head.
		if blkrevRough {
			// unlock atMu while we are (re-)calculating blkrev
			// we'll relock atMu again and recheck blkrev vs w.at after.
			w.atMu.RUnlock()

			blkrev, _ = f.LastBlkRev(ctx, blk, f.head.zconn.At())
			blkrevRough = false

			w.atMu.RLock()
			if blkrev <= w.at {
				w.atMu.RUnlock()
				continue
			}
		}

		// the block is newer - find out its revision as of @w.at and pin to that.
		//
		// We don't pin to w.at since, if we did so for several clients that are
		// mostly on different w.at, the cache of the file would be lost. By pinning
		// to the particular block revision, we make sure the revision to pin is
		// the same on all clients, and so the file cache is shared.
		pinrev, _ := w.file.LastBlkRev(ctx, blk, w.at)	// XXX move into go?

		wg.Go(func() error {
			defer w.atMu.RUnlock()
			// XXX close watcher on any error
			return w.pin(ctx, blk, pinrev)
		})
	}
	f.watchMu.RUnlock()

	err := wg.Wait()
	if err != nil {
		panic(err)	// XXX
	}
}
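/*
readPinWatchers skips watchers whose @at already covers the block and pins the
rest to the block's last revision as of their @at, not to @at itself, so that
clients on different @at can share one cached copy. The hypothetical helpers
below reproduce that decision over a sorted list of revisions at which the
block changed; in wcfs that information comes from LastBlkRev / δFtail, and
the cheap blkrevMax pre-check is omitted here.

```go
package main

import "sort"

// lastRevAt returns the last revision in history (sorted ascending) that is
// ≤ at, or 0 if the block did not change at or before at (a hypothetical
// convention for this sketch).
func lastRevAt(history []uint64, at uint64) uint64 {
	// index of the first revision > at
	i := sort.Search(len(history), func(i int) bool { return history[i] > at })
	if i == 0 {
		return 0
	}
	return history[i-1]
}

// pinRev decides what to send to a watcher at watchAt after the block was
// read: needPin is false if the watcher's view already covers the block's
// latest revision.
func pinRev(history []uint64, watchAt uint64) (rev uint64, needPin bool) {
	if len(history) == 0 || history[len(history)-1] <= watchAt {
		return 0, false
	}
	return lastRevAt(history, watchAt), true
}
```

With history {10, 20, 35}, watchers at @22 and @30 are both pinned to @20
rather than to their own @at, which is what lets them share the cache.
*/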

// setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request.
//
// XXX sends "pin" notifications; final "ok" must be sent by caller.
//
// XXX called synchronously - only 1 setupWatch call at a time?
func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
	defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
	head := wlink.head
	bfdir := head.bfdir

	// wait for zhead.At ≥ at
	if at != zodb.InvalidTid {
		err = head.zheadWait(ctx, at)
		if err != nil {
			return err
		}
	}

	// make sure zhead.At stays unchanged while we are preparing the watch
	// (see vvv e.g. about unpin to @head for why it is needed)
	head.zheadMu.RLock()
	defer head.zheadMu.RUnlock()
	headAt := head.zconn.At()

	// XXX δFtail locking? (or ForgetPast is called only with zheadMu.W ?)
	if at != zodb.InvalidTid && at < bfdir.δFtail.Tail() {
		return fmt.Errorf("too far away back from head/at (@%s); δt = %s",
			headAt, headAt.Time().Sub(at.Time().Time))
	}

	// if watch was already established - we need to update it
	w := wlink.byfile[foid]	// XXX locking
	if w == nil {
		// watch was not previously established - set it up anew
		f := bfdir.fileTab[foid]	// XXX locking
		if f == nil {
			// by "invalidation protocol" watch is setup after data file was opened
			return fmt.Errorf("file not yet known to wcfs or is not a ZBigFile")
		}

		w = &Watch{
			link:   wlink,
			file:   f,
			at:     at,
			pinned: make(map[int64]*blkPinState),
		}
	}

	f := w.file
	f.watchMu.Lock()

	// at="-" (InvalidTid) means "remove the watch"
	if at == zodb.InvalidTid {
		delete(wlink.byfile, foid)	// XXX locking
		delete(f.watchTab, w)
		f.watchMu.Unlock()
		return nil
	}

	// request exclusive access to the watch to change .at and compute pins.
	// The lock will be downgraded from W to R after pins computation is done.
	// Pins will be executed with atMu.R only - with the idea not to block
	// other clients that read-access the file simultaneously to setupWatch.
	w.atMu.Lock()

	// check at >= w.at
	// XXX we might want to allow going back in history if we need it.
	if !(at >= w.at) {
		w.atMu.Unlock()
		f.watchMu.Unlock()
		return fmt.Errorf("going back in history is forbidden")
	}

	// register w to f early, so that READs going in parallel to us
	// preparing and processing initial pins also send pins to w for read
	// blocks. If we don't, we could fail to send a pin to w for a freshly read
	// block which could have revision > w.at:	XXX test
	//
	//                1   3    2   4
	//	-----.----x---o----x---x------]----------
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1555 1556
	//           ↑                        ↑
	//          w.at                     head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1557
	//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1558 1559 1560 1561 1562
	// Here blocks #1, #2 and #4 were previously accessed, are thus tracked
	// by δFtail and are changed after w.at - they will be returned by vvv
	// δFtail query and pin-sent to w. Block #3 was not yet accessed but
	// was also changed after w.at . As head/file[#3] might be accessed
	// simultaneously to watch setup, and f.readBlk will be checking
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1563
	// f.watchTab; if w ∉ f.watchTab at that moment, w will miss to receive
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1564
	// pin for #3.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1565
	//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1566
	// NOTE for `unpin blk` to -> @head we can be sure there won't be
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1567
	// simultaneous `pin blk` request, because:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1568 1569 1570 1571 1572 1573 1574
	//
	// - unpin means blk was previously pinned,
	// - blk was pinned means it is tracked by δFtail,
	// - if blk is tracked and δFtail says there is no δblk ∈ (at, head],
	//   there is indeed no blk change in that region,
	// - which means that δblk with rev > w.at might be only > head,
	// - but such δblk are processed with zhead wlocked and we keep zhead
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1575
	//   rlocked during pin setup.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1576 1577 1578 1579 1580 1581
	//
	//          δ                      δ
	//      ----x----.------------]----x----
	//               ↑            ↑
	//              w.at         head
	//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1582 1583 1584
	// - also: there won't be simultaneous READs that would need to be
	//   unpinned, because we update w.at to requested at early.
	//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1585
	// XXX register only if watch was created anew, not updated?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1586
	w.at = at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1587
	// NOTE registering f.watchTab[w] and wlink.byfile[foid] = w must come together.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1588
	f.watchTab[w] = struct{}{}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1589
	wlink.byfile[foid] = w		// XXX locking
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1590
	f.watchMu.Unlock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1591

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1592
	// XXX defer -> unregister watch if error?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1593

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1594
	// pin all tracked file blocks that were changed in (at, head] range.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1595
	toPin := map[int64]zodb.Tid{} // blk -> @rev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1596

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1597
	for _, δfile := range bfdir.δFtail.SliceByFileRev(f, at, headAt) {	// XXX locking δFtail
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1598
		for blk := range δfile.Blocks {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1599 1600 1601 1602
			_, already := toPin[blk]
			if already {
				continue
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1603

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1604 1605 1606 1607 1608
			toPin[blk], _ = f.LastBlkRev(ctx, blk, at)
		}
	}

	// if a block was previously pinned, but ∉ δ(at, head] -> unpin it to head.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1609
	for blk, pinPrev := range w.pinned {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1610 1611
		// only 1 setupWatch can be run simultaneously for one file
		// XXX assert pinPrev.rev != zodb.TidMax
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1612

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1613 1614 1615 1616 1617
		pinNew, pinning := toPin[blk]
		if !pinning {
			toPin[blk] = zodb.TidMax // @head
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1618 1619 1620 1621 1622 1623
		// TODO don't bother to spawn .pin goroutines if pin revision is the same ?
		// if pinNew == pinPrev.rev && ready(pinPrev.ready) && pinPrev.err == nil {
		// 	delete(toPin, blk)
		// }
		_ = pinPrev
		_ = pinNew
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1624
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1625

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1626
	// downgrade atMu.W -> atMu.R to let other clients to access the file.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1627 1628 1629 1630 1631 1632 1633
	// XXX there is no primitive to do Wlock->Rlock atomically, but we are
	// ok with that since we prepared eveyrhing to handle simultaneous pins
	// from other reads.
	w.atMu.Unlock()
	w.atMu.RLock()
	defer w.atMu.RUnlock()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1634 1635 1636 1637 1638
	wg, ctx := errgroup.WithContext(ctx)
	for blk, rev := range toPin {
		blk := blk
		rev := rev
		wg.Go(func() error {
1639
			return w._pin(ctx, blk, rev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1640 1641 1642 1643
		})
	}
	err = wg.Wait()
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1644
		return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1645
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1646

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1647
	return nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1648 1649
}
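
// The pin-set computation in setupWatch can be shown in isolation. This is a
// minimal sketch under simplified assumptions, not wcfs code: Tid stands in
// for zodb.Tid, tidHead for zodb.TidMax ("@head", i.e. unpin), the [][]int64
// argument for the blocks returned per δ by the δFtail query, and the
// lastBlkRev callback for f.LastBlkRev.

```go
package main

// Tid is a simplified stand-in for zodb.Tid.
type Tid uint64

// tidHead is a sentinel standing in for zodb.TidMax: "pin to @head", i.e. unpin.
const tidHead Tid = 1<<63 - 1

// computePins returns blk -> rev to send to a watcher that moves to `at`.
//
// changed holds, per transaction in (at, head], the blocks it modified.
// pinned is the watcher's current pin set; lastBlkRev(blk, at) returns the
// revision of blk as of `at`.
func computePins(changed [][]int64, pinned map[int64]Tid, at Tid,
	lastBlkRev func(blk int64, at Tid) Tid) map[int64]Tid {

	toPin := map[int64]Tid{}
	for _, blocks := range changed {
		for _, blk := range blocks {
			if _, already := toPin[blk]; already {
				continue // blk was modified by several transactions
			}
			toPin[blk] = lastBlkRev(blk, at)
		}
	}

	// previously pinned, but not modified in (at, head] -> unpin to @head
	for blk := range pinned {
		if _, pinning := toPin[blk]; !pinning {
			toPin[blk] = tidHead
		}
	}
	return toPin
}
```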

// Open serves /head/watch opens.
func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
	// XXX check flags?
	head := wnode.head

	wlink := &WatchLink{
		sk:     NewFileSock(),
		id:     atomic.AddInt32(&wnode.idNext, +1),
		head:   head,
		byfile: make(map[zodb.Oid]*Watch),
		rxTab:  make(map[uint64]chan string),
	}

	head.wlinkMu.Lock()
	// XXX del wlinkTab[w] on w.sk.File.Release
	head.wlinkTab[wlink] = struct{}{}
	head.wlinkMu.Unlock()

	go wlink.serve()
	return wlink.sk.File(), fuse.OK
}

// serve serves client-initiated watch requests and routes client replies to
// wcfs-initiated pin requests.
func (wlink *WatchLink) serve() {
	err := wlink._serve()
	// XXX log error if !close
	if err != nil {
		log.Error(err)
	}

	head := wlink.head
	head.wlinkMu.Lock()
	delete(head.wlinkTab, wlink)
	head.wlinkMu.Unlock()
}
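
// serve routes each received line by its stream ID. A minimal sketch of that
// framing, assuming the "<stream> <msg>\n" format that send writes;
// parseFrame and isReply are hypothetical names, not the actual
// parseWatchFrame.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseFrame splits one received line "<stream> <msg>\n" into its parts.
func parseFrame(line string) (stream uint64, msg string, err error) {
	line = strings.TrimSuffix(line, "\n")
	sp := strings.IndexByte(line, ' ')
	if sp < 0 {
		return 0, "", fmt.Errorf("%q: invalid frame: no stream ID", line)
	}
	stream, err = strconv.ParseUint(line[:sp], 10, 64)
	if err != nil {
		return 0, "", fmt.Errorf("%q: invalid frame: %s", line, err)
	}
	return stream, line[sp+1:], nil
}

// isReply reports whether a frame is the client's reply to a wcfs-initiated
// request. wcfs allocates even stream IDs for its own requests (sendReq adds
// 2 per request and skips 0), so frames on odd IDs are treated as
// client-initiated requests.
func isReply(stream uint64) bool { return stream%2 == 0 }
```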

func (wlink *WatchLink) _serve() (err error) {
	defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
	r := bufio.NewReader(wlink.sk)

	ctx0 := context.TODO()	// XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout)

	ctx, cancel := context.WithCancel(ctx0)
	wg, ctx := errgroup.WithContext(ctx)

	defer func() {
		// cancel all handlers on both error and ok return.
		// ( ok return is e.g. when we received "bye", so if client
		//   sends "bye" and some pin handlers are in progress - they
		//   anyway don't need to wait for client replies anymore )
		cancel()

		err2 := wg.Wait()
		if err == nil {
			err = err2
		}

		// unregister all watches created on this wlink
		for _, w := range wlink.byfile {	// XXX locking
			w.file.watchMu.Lock()
			delete(w.file.watchTab, w)
			w.file.watchMu.Unlock()
		}
		wlink.byfile = nil

		// if it was a logical error on the client side, report it to the peer
		if err != nil {
			_ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err))
		}

		// close .sk.tx : this wakes up rx on client side.
		err2 = wlink.sk.CloseWrite()
		if err == nil {
			err = err2
		}
	}()

	// close .sk.rx on error/wcfs stopping or return: this wakes up read(sk).
	retq := make(chan struct{})
	defer close(retq)
	wg.Go(func() error {
		// monitor is always canceled - either at parent ctx cancel, or
		// upon return from serve (see "cancel all handlers ..." ^^^).
		// If it was return - report returned error to wg.Wait, not "canceled".
		<-ctx.Done()
		e := ctx.Err()
		select {
		default:
		case <-retq:
			e = err // returned error
		}

		e2 := wlink.sk.CloseRead()
		if e == nil {
			e = e2
		}
		return e
	})

	// allow for only 1 watch request at a time
	// TODO allow simultaneous watch requests for different files
	var handlingWatch int32

	for {
		l, err := r.ReadString('\n')	// XXX limit accepted line len to prevent DOS
		if err != nil {
			// r.Read is woken up by sk.CloseRead when serve decides to exit
			if err == io.ErrClosedPipe {
				err = nil
			}
			return err
		}

		fmt.Printf("S: wlink %d: rx: %q\n", wlink.id, l)

		stream, msg, err := parseWatchFrame(l)
		if err != nil {
			return err
		}

		// reply from client to wcfs
		reply := (stream % 2 == 0)
		if reply {
			wlink.rxMu.Lock()
			rxq := wlink.rxTab[stream]
			delete(wlink.rxTab, stream)
			wlink.rxMu.Unlock()

			if rxq == nil {
				return fmt.Errorf("%d: reply on unexpected stream", stream)
			}
			rxq <- msg
			continue
		}

		// client-initiated request

		// bye		TODO document in "Invalidation protocol"
		if msg == "bye" {
			return nil // deferred sk.CloseWrite will wake up rx on client side
		}

		// watch ...
		if atomic.LoadInt32(&handlingWatch) != 0 {
			return fmt.Errorf("%d: another watch request is already in progress", stream)
		}
		atomic.StoreInt32(&handlingWatch, 1)
		wg.Go(func() error {
			defer atomic.StoreInt32(&handlingWatch, 0)
			return wlink.handleWatch(ctx, stream, msg)
		})
	}
}

// handleWatch handles a watch request from the client.
//
// The returned error comes without the full error prefix.
func (wlink *WatchLink) handleWatch(ctx context.Context, stream uint64, msg string) (err error) {
	defer xerr.Contextf(&err, "%d", stream)

	err = wlink._handleWatch(ctx, msg)
	reply := "ok"
	if err != nil {
		// logical error is reported back to client, but watch link remains live
		reply = fmt.Sprintf("error %s", err)
		err = nil
	}

	err = wlink.send(ctx, stream, reply)
	return err
}
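
// _serve admits one watch request at a time through the handlingWatch flag.
// A sketch of the same gate with atomic.CompareAndSwapInt32, which performs
// the check and the set as a single step; watchGate is a hypothetical name.
// In _serve the separate Load/Store is fine because only the reader
// goroutine sets the flag; CAS keeps the gate correct even without that
// assumption.

```go
package main

import "sync/atomic"

// watchGate lets at most one watch request be in progress.
type watchGate struct{ busy int32 }

// tryEnter marks the gate busy and reports true, or reports false if a
// request is already in progress.
func (g *watchGate) tryEnter() bool {
	return atomic.CompareAndSwapInt32(&g.busy, 0, 1)
}

// leave marks the current request as finished.
func (g *watchGate) leave() {
	atomic.StoreInt32(&g.busy, 0)
}
```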

func (wlink *WatchLink) _handleWatch(ctx context.Context, msg string) error {
	foid, at, err := parseWatch(msg)
	if err != nil {
		return err
	}

	err = wlink.setupWatch(ctx, foid, at)
	return err
}
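
// _handleWatch delegates parsing to parseWatch, which is not shown here. A
// sketch of such a parser, assuming request lines of the form
// "watch <oid> @<at>" or "watch <oid> -" with oid and at written as 16 hex
// digits; parseWatchMsg is a hypothetical name and invalidTid is a stand-in
// sentinel for zodb.InvalidTid, which setupWatch treats as "remove the watch".

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// invalidTid is a sentinel standing in for zodb.InvalidTid.
const invalidTid = ^uint64(0)

// parseHex16 parses exactly 16 hex digits (the assumed oid/tid text form).
func parseHex16(s string) (uint64, error) {
	if len(s) != 16 {
		return 0, fmt.Errorf("%q: want 16 hex digits", s)
	}
	return strconv.ParseUint(s, 16, 64)
}

// parseWatchMsg parses "watch <oid> @<at>" or "watch <oid> -".
func parseWatchMsg(msg string) (oid, at uint64, err error) {
	f := strings.Fields(msg)
	if len(f) != 3 || f[0] != "watch" {
		return 0, 0, fmt.Errorf("%q: not a watch request", msg)
	}
	oid, err = parseHex16(f[1])
	if err != nil {
		return 0, 0, err
	}
	switch {
	case f[2] == "-":
		at = invalidTid // remove the watch
	case strings.HasPrefix(f[2], "@"):
		at, err = parseHex16(f[2][1:])
		if err != nil {
			return 0, 0, err
		}
	default:
		return 0, 0, fmt.Errorf("%q: invalid at", f[2])
	}
	return oid, at, nil
}
```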

// sendReq sends a wcfs-originated request to the client and returns the client's response.
func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
	// XXX err ctx
	var stream uint64
	for stream == 0 {
		stream = atomic.AddUint64(&wlink.reqNext, +2)
	}

	rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client)
	wlink.rxMu.Lock()
	wlink.rxTab[stream] = rxq	// XXX assert .stream is not there?
	wlink.rxMu.Unlock()

	err = wlink.send(ctx, stream, req)
	if err != nil {
		return "", err
	}

	select {
	case <-ctx.Done():
		// XXX del rxTab[stream] ?
		return "", ctx.Err()

	case reply = <-rxq:
		return reply, nil
	}
}

// send sends a message to client over specified stream ID.
//
// send may be called from several goroutines simultaneously; it serializes writes.
func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) error {
	// XXX err ctx
	// XXX assert '\n' not in msg

	wlink.txMu.Lock()
	defer wlink.txMu.Unlock()

	// XXX timeout write on ctx cancel
	pkt := []byte(fmt.Sprintf("%d %s\n", stream, msg))
	fmt.Printf("S: wlink %d: tx: %q\n", wlink.id, pkt)
	_, err := wlink.sk.Write(pkt)
	if err != nil {
		return err
	}

	return nil
}


// ---- Lookup ----

// /(head|<rev>)/bigfile/ -> Lookup receives client request to create /(head|<rev>)/bigfile/<bigfileX>.
func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
	f, err := bfdir.lookup(out, name, fctx)
	var inode *nodefs.Inode
	if f != nil {
		inode = f.Inode()
	}
	return inode, err2LogStatus(err)
}

func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) (f *BigFile, err error) {
	defer xerr.Contextf(&err, "%s: lookup %q", bfdir.path(), name)

	oid, err := zodb.ParseOid(name)
	if err != nil {
		return nil, eINVALf("not oid")
	}

	bfdir.head.zheadMu.RLock()
	defer bfdir.head.zheadMu.RUnlock()

	defer func() {
		if f != nil {
			f.getattr(out)
		}
	}()

	// check to see if file(oid) is already there
	bfdir.fileMu.Lock()
	f, already := bfdir.fileTab[oid]
	bfdir.fileMu.Unlock()

	if already {
		return f, nil
	}

	// not there - without bfdir lock proceed to open BigFile from ZODB
	f, err = bfdir.head.bigopen(fctx, oid)
	if err != nil {
		return nil, err
	}

	// relock bfdir and either register f or, if the file was maybe
	// simultaneously created while we were not holding bfdir.fileMu, return that.
	bfdir.fileMu.Lock()
	f2, already := bfdir.fileTab[oid]
	if already {
		bfdir.fileMu.Unlock()
		f.Close()
		return f2, nil
	}

	bfdir.fileTab[oid] = f
	bfdir.fileMu.Unlock()

	// mkfile takes filesystem treeLock - do it outside bfdir.fileMu
	mkfile(bfdir, name, f)

	return f, nil
}

// / -> Lookup receives client request to create @<rev>/.
func (root *Root) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
	revd, err := root.lookup(name, fctx)
	var inode *nodefs.Inode
	if revd != nil {
		inode = revd.Inode()
		_ = revd.GetAttr(out, nil, fctx) // always ok
	}
	return inode, err2LogStatus(err)
}

func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
	defer xerr.Contextf(&err, "/: lookup %q", name)

	var rev zodb.Tid
	ok := false

	if strings.HasPrefix(name, "@") {
		rev, err = zodb.ParseTid(name[1:])
		ok = (err == nil)
	}
	if !ok {
		return nil, eINVALf("not @rev")
	}

	// check to see if dir(rev) is already there
	root.revMu.Lock()
	revDir, already := root.revTab[rev]
	root.revMu.Unlock()

	if already {
		return revDir, nil
	}

	// not there - without revMu lock proceed to open @rev view of ZODB
//	zconnRev, err := root.zopenAt(fctx, rev)
	zconnRev, err := zopen(fctx, root.zdb, &zodb.ConnOptions{At: rev})
	if err != nil {
		return nil, err
	}

	// relock root and either register new revX/ directory or, if the
	// directory was maybe simultaneously created while we were not holding
	// revMu, return that.
	root.revMu.Lock()
	revDir, already = root.revTab[rev]
	if already {
		root.revMu.Unlock()
//		zconnRev.Release()
		transaction.Current(zconnRev.txnCtx).Abort()
		return revDir, nil
	}

	revDir = &Head{
		fsNode: newFSNode(&fsOptions{Sticky: false}),	// XXX + Head.OnForget() -> del root.revTab[]
		rev:    rev,
		zconn:  zconnRev,
	}

	bfdir := &BigFileDir{
		fsNode:  newFSNode(&fsOptions{Sticky: false}),	// XXX + BigFileDir.OnForget()
		head:    revDir,
		fileTab: make(map[zodb.Oid]*BigFile),
		δFtail:  nil, // δFtail not needed/used for @revX/
	}
	revDir.bfdir = bfdir

	root.revTab[rev] = revDir
	root.revMu.Unlock()

	// mkdir takes filesystem treeLock - do it outside revMu.
	mkdir(root, name, revDir)
	mkdir(revDir, "bigfile", bfdir)
	// XXX + "at"

	return revDir, nil
}


// bigopen opens BigFile corresponding to oid on head.zconn.
//
// A ZBigFile corresponding to oid is activated and statted.
//
// head.zconn must be locked.
func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err error) {
	zconn := head.zconn
	defer xerr.Contextf(&err, "bigopen %s @%s", oid, zconn.At())

	// XXX better ctx = transaction.PutIntoContext(ctx, txn)
	ctx, cancel := xcontext.Merge(ctx, zconn.txnCtx)
	defer cancel()

	xzfile, err := zconn.Get(ctx, oid)
	if err != nil {
		switch errors.Cause(err).(type) {
		case *zodb.NoObjectError:
			return nil, eINVAL(err)
		case *zodb.NoDataError:
			return nil, eINVAL(err) // XXX what to do if it was existing and got deleted?
		default:
			return nil, err
		}
	}

	zfile, ok := xzfile.(*ZBigFile)
	if !ok {
		return nil, eINVALf("%s is not a ZBigFile", typeOf(xzfile))
	}

	// extract blksize, size and initial approximation for file revision
	err = zfile.PActivate(ctx)
	if err != nil {
		return nil, err
	}
	blksize := zfile.blksize
	// XXX it should be revision of both ZBigFile and its data. But we
	// cannot get data revision without expensive scan of all ZBigFile's objects.
	// -> approximate mtime initially with ZBigFile object mtime.
	//
	// XXX for @rev/... we can know initial mtime more exactly?
	rev := zfile.PSerial()
	zfile.PDeactivate()

	size, sizePath, err := zfile.Size(ctx)
	if err != nil {
		return nil, err
	}

	f := &BigFile{
		fsNode:  newFSNode(&fsOptions{Sticky: false}),	// XXX + BigFile.OnForget -> del .head.bfdir.fileTab[]
		head:    head,
		zfile:   zfile,
		blksize: blksize,
		size:    size,
		rev:     rev,
		loading: make(map[int64]*blkLoadState),
	}

	// only head/ needs δFtail, f.δtail and watches.
	if head.rev == 0 {
		head.bfdir.δFmu.Lock()	// XXX locking ok?
		head.bfdir.δFtail.Track(f, -1, sizePath, nil)
		head.bfdir.δFmu.Unlock()

		// FIXME: scan zfile.blktab - so that we can detect all btree changes
		// see "XXX building δFtail lazily ..." in notes.txt

		f.watchTab = make(map[*Watch]struct{})
	}

	return f, nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2101
// Close release all resources of BigFile.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2102
func (f *BigFile) Close() error {
2103
	// XXX locking?
	f.zfile = nil

//	f.zconn.Release()
//	f.zconn = nil
	f.head = nil

	return nil
}
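// Close above still carries "XXX locking?". One possible direction, sketched
// here under the assumption that Close can race with other users of f.zfile
// and may be called more than once, is to serialize it with a mutex and make
// it idempotent. bigFileSketch and its fields are hypothetical stand-ins, not
// wcfs types; real code would more likely reuse head.zheadMu.

```go
package main

import (
	"fmt"
	"sync"
)

// bigFileSketch is a hypothetical stand-in for BigFile that models only the
// references Close drops, plus a mutex that serializes Close.
type bigFileSketch struct {
	mu     sync.Mutex
	zfile  *struct{} // placeholder for the ZBigFile reference
	head   *struct{} // placeholder for the *Head back-reference
	closed bool
}

// Close drops the references. It is safe to call concurrently and more than
// once; only the first call has an effect.
func (f *bigFileSketch) Close() error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.closed {
		return nil
	}
	f.closed = true
	f.zfile = nil
	f.head = nil
	return nil
}

func main() {
	f := &bigFileSketch{zfile: &struct{}{}, head: &struct{}{}}

	// Several concurrent Close calls must not race or double-release.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = f.Close()
		}()
	}
	wg.Wait()

	fmt.Println(f.closed, f.zfile == nil, f.head == nil) // true true true
}
```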

// ---- misc ---

// /(head|<rev>)/at -> readAt serves read.
func (h *Head) readAt() []byte {
	h.zheadMu.RLock()
	defer h.zheadMu.RUnlock()

	return []byte(h.zconn.At().String())
}

// /(head|<rev>)/ -> GetAttr serves stat.
func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
	at := head.rev
	if at == 0 {
		head.zheadMu.RLock()
		at = head.zconn.At()
		head.zheadMu.RUnlock()
	}
	t := at.Time().Time

	out.Mode = fuse.S_IFDIR | 0555
	out.SetTimes(/*atime=*/nil, /*mtime=*/&t, /*ctime=*/&t)
	return fuse.OK
}

// /(head|<rev>)/bigfile/<bigfileX> -> GetAttr serves stat.
func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
	f.head.zheadMu.RLock()
	defer f.head.zheadMu.RUnlock()

	f.getattr(out)
	return fuse.OK
}

func (f *BigFile) getattr(out *fuse.Attr) {
	out.Mode = fuse.S_IFREG | 0444
	out.Size = uint64(f.size)
	// .Blocks
	// .Blksize

	mtime := f.rev.Time().Time
	out.SetTimes(/*atime=*/nil, /*mtime=*/&mtime, /*ctime=*/&mtime)
}


// FIXME groot/gfsconn is tmp workaround for lack of way to retrieve FileSystemConnector from nodefs.Inode
// TODO:
//	- Inode += .Mount() -> nodefs.Mount
//	- Mount:
//		.Root()		-> root Inode of the fs
//		.Connector()	-> FileSystemConnector through which fs is mounted
var groot   *Root
var gfsconn *nodefs.FileSystemConnector

// gmntpt is the path where the root of the filesystem is mounted.
//
// We need to talk to the kernel and look up @<rev>/bigfile/<fid> before
// uploading data to the kernel cache there. Referencing the root of the
// filesystem via path is vulnerable to bugs wrt e.g. `mount --move` and/or
// mounting something else over wcfs. However, keeping the root fd open would
// prevent wcfs from being unmounted, so we still have to reference the root via path.
var gmntpt string

// debugging
var gdebug = struct {
	// .wcfs/zhead opens
	// protected by groot.head.zheadMu
	zheadSockTab map[*FileSock]struct{}
}{}

func init() {
	gdebug.zheadSockTab = make(map[*FileSock]struct{})
}

// _wcfs_Zhead serves .wcfs/zhead opens.
type _wcfs_Zhead struct {
	fsNode
}

func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
	// XXX check flags?
	sk := NewFileSock()
	sk.CloseRead()

	groot.head.zheadMu.Lock()
	defer groot.head.zheadMu.Unlock()

	// XXX del zheadSockTab[sk] on sk.File.Release (= client drops opened handle)
	gdebug.zheadSockTab[sk] = struct{}{}
	return sk.File(), fuse.OK
}
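// The XXX above notes that gdebug.zheadSockTab entries are never removed, and
// _main documents that wcfs blocks until every .wcfs/zhead opener reads each
// new zhead.at. A minimal model of both, using plain channels instead of
// FileSock, could look like this; zheadHub, sub and their methods are
// hypothetical. Unsubscribe is what a Release handler would call, and closing
// done lets a Broadcast that is blocked on a departing opener move on.

```go
package main

import (
	"fmt"
	"sync"
)

// sub is one opener of .wcfs/zhead in this model.
type sub struct {
	C    chan string   // receives each new zhead.at
	done chan struct{} // closed by Unsubscribe
}

// zheadHub is a hypothetical model of gdebug.zheadSockTab.
type zheadHub struct {
	mu   sync.Mutex
	subs map[*sub]struct{}
}

func newZheadHub() *zheadHub {
	return &zheadHub{subs: make(map[*sub]struct{})}
}

// Subscribe registers a new opener (what Open does).
func (h *zheadHub) Subscribe() *sub {
	s := &sub{C: make(chan string), done: make(chan struct{})}
	h.mu.Lock()
	h.subs[s] = struct{}{}
	h.mu.Unlock()
	return s
}

// Unsubscribe removes an opener (what a Release handler would do).
// Calling it twice is harmless.
func (h *zheadHub) Unsubscribe(s *sub) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.subs[s]; ok {
		delete(h.subs, s)
		close(s.done)
	}
}

// Broadcast sends at to every current opener. C is unbuffered, so each send
// blocks until that opener reads it, or until it unsubscribes.
func (h *zheadHub) Broadcast(at string) {
	h.mu.Lock()
	subs := make([]*sub, 0, len(h.subs))
	for s := range h.subs {
		subs = append(subs, s)
	}
	h.mu.Unlock() // don't hold mu while blocked, so Unsubscribe can proceed

	for _, s := range subs {
		select {
		case s.C <- at:
		case <-s.done:
		}
	}
}

func main() {
	h := newZheadHub()
	a, b := h.Subscribe(), h.Subscribe()

	var wg sync.WaitGroup
	got := make([]string, 2)
	for i, s := range []*sub{a, b} {
		wg.Add(1)
		go func(i int, s *sub) {
			defer wg.Done()
			got[i] = <-s.C
		}(i, s)
	}
	h.Broadcast("0000000000000001") // returns only after both openers read it
	wg.Wait()
	fmt.Println(got) // [0000000000000001 0000000000000001]

	h.Unsubscribe(b)
	h.Unsubscribe(b)         // no-op
	fmt.Println(len(h.subs)) // 1
}
```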

// TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?)

func main() {
	stdlog.SetPrefix("wcfs: ")
	//log.CopyStandardLogTo("WARNING") // XXX -> "DEBUG" if -d ?
	defer log.Flush()

	err := _main()
	if err != nil {
		log.Fatal(err)
	}
}

func _main() (err error) {
	debug := flag.Bool("d", false, "debug")
	autoexit := flag.Bool("autoexit", false, "automatically stop service when there is no client activity")
	// XXX option to prevent starting if wcfs was already started/mounted on mntpt ?
	// XXX do the check unconditionally?

	flag.Parse()
	if len(flag.Args()) != 2 {
		return fmt.Errorf("Usage: %s [OPTIONS] zurl mntpt", os.Args[0])
	}
	zurl := flag.Args()[0]
	mntpt := flag.Args()[1]

	xclose := func(c io.Closer) {
		err = xerr.First(err, c.Close())
	}

	// debug -> precise t, no dates	(XXX -> always precise t?)
	if *debug {
		stdlog.SetFlags(stdlog.Lmicroseconds)
	}

	log.Infof("start %q %q", mntpt, zurl)
	gover := "(built with " + runtime.Version()
	if raceBuild {
		gover += " -race"
	}
	gover += ")"
	log.Info(gover)

	// open zodb storage/watch/db/connection
	ctx := context.Background()	// XXX + timeout?
	zstor, err := zodb.Open(ctx, zurl, &zodb.OpenOptions{
		ReadOnly: true,
	})
	if err != nil {
		return err
	}
	defer xclose(zstor)

	zwatchq := make(chan zodb.Event)
	at0 := zstor.AddWatch(zwatchq)
	defer zstor.DelWatch(zwatchq)

	zdb := zodb.NewDB(zstor)
	defer xclose(zdb)
	zhead, err := zopen(ctx, zdb, &zodb.ConnOptions{
		At: at0,

		// we need zhead.cache to be maintained across several transactions.
		// see "3) for head/bigfile/* the following invariant is maintained ..."
		NoPool: true,
	})
	if err != nil {
		return err
	}
	zhead.Cache().Lock()
	zhead.Cache().SetControl(&zodbCacheControl{})
	zhead.Cache().Unlock()

	// mount root + head/
	head := &Head{
		fsNode:   newFSNode(fSticky),
		rev:      0,
		zconn:    zhead,
		wlinkTab: make(map[*WatchLink]struct{}),
		hwait:    make(map[hwaiter]struct{}),
	}

	wnode := &WatchNode{
		fsNode: newFSNode(fSticky),
		head:   head,
	}

	bfdir := &BigFileDir{
		fsNode:   newFSNode(fSticky),
		head:     head,
		fileTab:  make(map[zodb.Oid]*BigFile),
		δFtail:   NewΔFtail(zhead.At()),
	}
	head.bfdir = bfdir

	root := &Root{
		fsNode: newFSNode(fSticky),
		zstor:  zstor,
		zdb:    zdb,
		head:   head,
		revTab: make(map[zodb.Tid]*Head),
	}

	opts := &fuse.MountOptions{
		FsName: zurl,
		Name:   "wcfs",

		// We retrieve kernel cache in ZBlk.blksize chunks, which are 2MB in size.
		// XXX currently go-fuse caps MaxWrite to 128KB.
		// TODO -> teach go-fuse to handle Init.MaxPages (Linux 4.20+).
		MaxWrite:      2*1024*1024,

		// XXX tune MaxReadAhead? MaxBackground?

		// OS cache that we populate with bigfile data is precious;
		// we explicitly propagate ZODB invalidations into file invalidations.
		ExplicitDataCacheControl: true,

		DisableXAttrs: true,        // we don't use xattrs
		Debug:         *debug,
	}

	fssrv, fsconn, err := mount(mntpt, root, opts)
	if err != nil {
		return err
	}
	groot   = root		// FIXME temp workaround (see ^^^)
	gfsconn = fsconn	// FIXME ----//----
	gmntpt  = mntpt

	// we require proper pagecache control (added to Linux 2.6.36 in 2010)
	kinit := fssrv.KernelSettings()
	kfuse := fmt.Sprintf("kernel FUSE (API %d.%d)", kinit.Major, kinit.Minor)
	supports := kinit.SupportsNotify
	if !(supports(fuse.NOTIFY_STORE_CACHE) && supports(fuse.NOTIFY_RETRIEVE_CACHE)) {
		return fmt.Errorf("%s does not support pagecache control", kfuse)
	}
	// print a prominent warning if the kernel does not support explicit cache invalidation
	// (patch sent upstream; see notes.txt -> "Notes on OS pagecache control")
	if kinit.Flags & fuse.CAP_EXPLICIT_INVAL_DATA == 0 {
		w1 := fmt.Sprintf("%s does not support explicit data cache invalidation", kfuse)
		w2 := "-> performance will be AWFUL."
		w3 := "-> you need a kernel that includes git.kernel.org/linus/ad2ba64dd489 ."
		log.Error(w1); log.Error(w2); log.Error(w3)
		fmt.Fprintf(os.Stderr, "W: wcfs: %s\nW: wcfs: %s\nW: wcfs: %s\n", w1, w2, w3)
	}

	// add entries to /
	mkdir(root, "head", head)
	mkdir(head, "bigfile", bfdir)
	mkfile(head, "at", NewSmallFile(head.readAt))   // TODO mtime(at) = tidtime(at)
	mkfile(head, "watch", wnode)

	// for debugging/testing
	_wcfs := newFSNode(fSticky)
	mkdir(root, ".wcfs", &_wcfs)
	mkfile(&_wcfs, "zurl", NewStaticFile([]byte(zurl)))

	// .wcfs/zhead - special file channel that sends zhead.at.
	//
	// A user who opens it starts to receive the tids that zhead.at goes
	// through, starting from the time when .wcfs/zhead was opened.
	// There can be multiple openers. Once opened, the file must be read,
	// as wcfs blocks waiting for data to be read when processing
	// invalidations.
	mkfile(&_wcfs, "zhead", &_wcfs_Zhead{
		fsNode: newFSNode(fSticky),
	})

	// TODO handle autoexit
	// (exit when the kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl
	//  opened, so when all inodes have been forgotten - we know all wcfs.py clients exited)
	_ = autoexit

	defer xerr.Contextf(&err, "serve %s %s", mntpt, zurl)

	// spawn filesystem server.
	//
	// use `go serve` + `waitMount` not just `serve` - because waitMount
	// takes care to prevent the OS from calling poll on us.
	// ( if we don't disable polling - fs serving can get stuck - see
	//   https://github.com/hanwen/go-fuse/commit/4f10e248eb for details )
	serveCtx, serveCancel := context.WithCancel(context.Background())
	go func() {
		defer serveCancel()
		fssrv.Serve()
	}()
	err = fssrv.WaitMount()
	if err != nil {
		return err
	}

	// filesystem server is serving requests.
	// run zwatcher and wait for it to complete.
	// zwatcher completes either normally - due to filesystem unmount, or fails.
	// if zwatcher fails - switch filesystem to return EIO instead of stale data.
	err = root.zwatcher(serveCtx, zwatchq)
	if err != nil && errors.Cause(err) != context.Canceled {
		log.Error(err)
		log.Errorf("zwatcher failed -> switching filesystem to EIO mode")
		// XXX switch fs to EIO mode
	}

	// wait for unmount
	<-serveCtx.Done()
	return nil	// XXX serveErr | zwatchErr ?
}