// Copyright (C) 2018-2019  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// Program wcfs provides filesystem server with file data backed by wendelin.core arrays.
//
// Intro
//
// Each wendelin.core array (ZBigArray) is actually a linear file (ZBigFile)
// and array metadata like dtype, shape and strides associated with it. This
// program exposes as files only ZBigFile data and leaves rest of
// array-specific handling to clients. Every ZBigFile is exposed as one separate
// file that represents whole ZBigFile's data.
//
// For a client, the primary way to access a bigfile should be to mmap
// head/bigfile/<bigfileX>, which always represents the latest bigfile data.
// Clients that want to get isolation guarantee should subscribe for
// invalidations and re-mmap invalidated regions to file with pinned bigfile revision for
// the duration of their transaction. See "Invalidation protocol" for details.
//
// In the usual situation when bigfiles are big, and there are O(1)/δt updates,
// there should be no need for any cache besides shared kernel cache of latest
// bigfile data.
//
//
// Filesystem organization
//
// Top-level structure of provided filesystem is as follows:
//
//	head/			; latest database view
//		...
//	@<rev1>/		; database view as of revision <rev1>
//		...
//	@<rev2>/
//		...
//	...
//
// where head/ represents latest data as stored in upstream ZODB, and
// @<revX>/ represents data as of database revision <revX>.
//
// head/ has the following structure:
//
//	head/
//		at			; data inside head/ is as of this ZODB transaction
//		watch			; channel for bigfile invalidations
//		bigfile/		; bigfiles' data
//			<oid(ZBigFile1)>
//			<oid(ZBigFile2)>
//			...
//
// where /bigfile/<bigfileX> represents latest bigfile data as stored in
// upstream ZODB. As there can be some lag receiving updates from the database,
// /at describes precisely ZODB state for which bigfile data is currently
// exposed. Whenever bigfile data is changed in upstream ZODB, information
// about the changes is first propagated to /watch, and only after that
// /bigfile/<bigfileX> is updated. See "Invalidation protocol" for details.
//
// @<revX>/ has the following structure:
//
//	@<revX>/
//		at
//		bigfile/		; bigfiles' data as of revision <revX>
//			<oid(ZBigFile1)>
//			<oid(ZBigFile2)>
//			...
//
// where /bigfile/<bigfileX> represents bigfile data as of revision <revX>.
//
// Unless accessed, {head,@<revX>}/bigfile/<bigfileX> are not automatically visible in
// the wcfs filesystem. Similarly, @<revX>/ becomes visible only after access.
//
//
// Invalidation protocol
//
// In order to support isolation, wcfs implements an invalidation protocol that
// must be cooperatively followed by both wcfs and the client.
//
// First, the client mmaps the latest bigfile, but does not access it:
//
//	mmap(head/bigfile/<bigfileX>)
//
// Then client opens head/watch and tells wcfs through it for which ZODB state
// it wants to get bigfile's view.
//
//	C: 1 watch <bigfileX> @<at>
//
// The server then, after potentially sending initial pin messages (see below),
// reports either success or failure:
//
//	S: 1 ok
//	S: 1 error ...		; if <at> is too far back from head/at
//
// The server sends the "ok" reply only after head/at is ≥ requested <at>, and
// only after all initial pin messages are fully acknowledged by the client.
// The client can start to use mmapped data after it gets "ok".
// The server sends the "error" reply if requested <at> is too far back from
// head/at.
// XXX other errors are possible (e.g. "no such file", or error handling pin).
// XXX error handling pin -> then client is killed?
// XXX if not - specify that watch state is lost after error.
//
// Upon a watch request, either initially or after sending "ok", the server
// notifies the client about file blocks that the client needs to pin in order
// to observe the file's data as of <at> revision.
//
// The filesystem server itself receives information about changed data from
// ZODB server through regular ZODB invalidation channel (as it is ZODB client
// itself). Then, separately for each changed file block, before actually
// updating head/bigfile/<bigfileX> content, it notifies through opened
// head/watch links to clients, that had requested it (separately to each
// client), about the changes:
//
//	S: <2·k> pin <bigfileX> #<blk> @<rev_max>
//	XXX @head means unpin.
//	XXX or use `unpin <bigfileX> #<blk>` ?
//
// and waits until all clients confirm that changed file block can be updated
// in global OS cache.
//
// The client in turn should now re-mmap the block requested to be pinned to
// bigfile@<rev_max>:
//
//	# mmapped at address corresponding to #blk
//	mmap(@<rev_max>/bigfile/<bigfileX>, #blk, MAP_FIXED)
//
// and must send ack back to the server when it is done:
//
//	C: <2·k> ack
//
// The server sends pin notifications only for file blocks, that are known to
// be potentially changed after client's <at>, and <rev_max> describes the
// upper bound for the block revision as of <at> database view:
//
//	<rev_max> ≤ <at>	; block stays unchanged in (<rev_max>, <at>] range
//
// The server maintains short history tail of file changes to be able to
// support openings with <at> being slightly in the past compared to current
// head/at. The server might reject a watch request if <at> is too far away in
// the past from head/at. The client is advised to restart its transaction with
// a more up-to-date database view if it gets a watch setup error.
//
// A later request from the client for the same <bigfileX> but with different
// <at>, overrides previous watch request for that file. A client can use "-"
// instead of "@<at>" to stop watching a file.
//
// A single client can send several watch requests through single head/watch
// open, as well as it can use several head/watch opens simultaneously.
// The server sends pin notifications for all files requested to be watched via
// every head/watch open.
//
// Note: a client could use a single watch to manage its several views for the same
// file but with different <at>. This could be achieved via watching with
// @<at_min>, and then deciding internally which views need to be adjusted and
// which views need not. Wcfs does not oblige clients to do so though, and a
// client is free to use as many head/watch openings as it needs to.
//
// When clients are done with @<revX>/bigfile/<bigfileX> (i.e. client's
// transaction ends and array is unmapped), the server sees that the number of
// opened files for @<revX>/bigfile/<bigfileX> drops to zero, and automatically
// destroys @<revX>/bigfile/<bigfileX> after a reasonable timeout.
//
//
// Protection against slow or faulty clients
//
// If a client, on purpose or due to a bug or being stopped, is slow to respond
// with ack to file invalidation notification, it creates a problem because the
// server will become blocked waiting for pin acknowledgments, and thus all
// other clients, that try to work with the same file, will get stuck.
//
// The problem could be avoided if wcfs resided inside the OS kernel and this
// way could manipulate clients' address space directly (then the
// invalidation protocol would not be needed). It is also possible to imagine
// a mechanism where wcfs would synchronously change clients' address space via
// injecting trusted code and running it on client side via ptrace to adjust
// file mappings.
//
// However ptrace does not work when client thread is blocked under pagefault,
// and that is exactly what wcfs would need to do to process invalidations
// lazily, because eager invalidation processing results in prohibitively slow
// file opens. See internal wcfs overview for details about why ptrace
// cannot be used and why lazy invalidation processing is required.
//
// Lacking OS primitives to change address space of another process and not
// being able to work around this with ptrace in userspace, wcfs takes the
// approach of killing a slow client after a timeout (30 seconds by default).
//
//
// Writes
//
// As each bigfile is represented by 1 synthetic file, there can be several
// write schemes:
//
// 1. mmap(MAP_PRIVATE) + writeout by client
//
// In this scheme bigfile data is mmapped in MAP_PRIVATE mode, so that local
// user changes are not automatically propagated back to the file. When there
// is a need to commit, client investigates via some OS mechanism, e.g.
// /proc/self/pagemap or something similar, which pages of this mapping it
// modified. Knowing this it knows which data it dirtied and so can write this
// data back to ZODB itself, without filesystem server providing write support.
//
// 2. mmap(MAP_SHARED, PROT_READ) + write-tracking & writeout by client
//
// In this scheme bigfile data is mmapped in MAP_SHARED mode with read-only pages
// protection. Then whenever write fault occurs, client allocates RAM from
// shmfs, copies faulted page to it, and then mmaps RAM page with RW protection
// in place of original bigfile page. Writeout implementation should be similar
// to "1", only here client already knows the pages it dirtied, and this way
// there is no need to consult /proc/self/pagemap.
//
// The advantage of this scheme over mmap(MAP_PRIVATE) is that in case
// there are several in-process mappings of the same bigfile with overlapping
// in-file ranges, changes in one mapping will be visible in another mapping.
// In contrast, whenever a MAP_PRIVATE mapping is modified, the kernel COWs
// faulted page into a page completely private to this mapping, so that other
// MAP_PRIVATE mappings of this file, including ones created from the same
// process, do not see changes made to the first mapping.
//
// Since wendelin.core needs to provide coherency in between different slices
// of the same array, this is the mode wendelin.core actually uses.
//
// 3. write to wcfs
//
// TODO we later could implement "write-directly" mode where clients would write
// data directly into the file.
package main
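
// A minimal sketch of how a client might decode a pin notification from the
// "Invalidation protocol" described in the package documentation above, and
// build the matching ack. The names pinMsg and parsePin are hypothetical, and
// the wire encoding used here (whitespace-separated fields, decimal stream ID
// and block number, <bigfileX> and <rev_max> kept as opaque text) is an
// assumption made only for illustration:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// pinMsg is a hypothetical in-memory form of
// "<2·k> pin <bigfileX> #<blk> @<rev_max>".
type pinMsg struct {
	stream uint64 // stream ID; the ack must carry the same ID
	file   string // <bigfileX>, kept as opaque text
	blk    int64  // <blk>
	rev    string // <rev_max> without the leading "@", kept as opaque text
}

// parsePin decodes one pin notification line (assumed encoding).
func parsePin(line string) (*pinMsg, error) {
	f := strings.Fields(line)
	if len(f) != 5 || f[1] != "pin" {
		return nil, fmt.Errorf("not a pin message: %q", line)
	}
	if !strings.HasPrefix(f[3], "#") || !strings.HasPrefix(f[4], "@") {
		return nil, fmt.Errorf("malformed pin arguments: %q", line)
	}
	stream, err := strconv.ParseUint(f[0], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid stream ID: %v", err)
	}
	blk, err := strconv.ParseInt(f[3][1:], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid block number: %v", err)
	}
	return &pinMsg{stream: stream, file: f[2], blk: blk, rev: f[4][1:]}, nil
}

// ack returns the reply the client sends after it has re-mmapped the block.
func (m *pinMsg) ack() string {
	return fmt.Sprintf("%d ack", m.stream)
}

func main() {
	// The file and revision values below are made up for the example.
	m, err := parsePin("2 pin 0000000000000001 #5 @0285cbac258bf266")
	if err != nil {
		panic(err)
	}
	fmt.Printf("remmap block %d of %s to @%s\n", m.blk, m.file, m.rev)
	fmt.Println(m.ack())
}
```

// In a real client the re-mmap with MAP_FIXED would happen between parsing the
// message and sending the ack, since the server waits for that ack before it
// updates the block in the OS cache.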

// Wcfs organization
//
// Wcfs is a ZODB client that translates ZODB objects into OS files, as
// non-wcfs wendelin.core would do for a ZBigFile. Contrary to non-wcfs wendelin.core,
// it keeps bigfile data in shared OS cache efficiently. It is organized as follows:
//
// 1) 1 ZODB connection for "latest data" for whole filesystem (zhead).
// 2) head/bigfile/* of all bigfiles represent state as of zhead.At .
// 3) for head/bigfile/* the following invariant is maintained:
//
//	#blk ∈ OS file cache    =>    ZBlk(#blk) + all BTree/Bucket that lead to it  ∈ zhead cache(%)
//	                              (ZBlk* in ghost state)
//
//	                        =>    all BTree/Bucket that lead to blk are tracked (XXX)
//
//    The invariant helps on invalidation: if we see a changed oid, and
//    zhead.cache.lookup(oid) = ø -> we know we don't have to invalidate OS
//    cache for any part of any file (even if oid relates to a file block - that
//    block is not cached and will trigger ZODB load on file read).
//
//    XXX explain why tracked
//
//    Currently we maintain this invariant by simply never evicting ZBlk/LOBTree/LOBucket
//    objects from ZODB Connection cache. In the future we may want to try to
//    synchronize to kernel freeing its pagecache pages.
//
// 4) when we receive an invalidation message from ZODB - we process it and
//    propagate invalidations to OS file cache of head/bigfile/*:
//
//	invalidation message: (tid↑, []oid)
//
//    4.1) zhead.cache.lookup(oid)
//    4.2) ø: nothing to do - see invariant ^^^.
//    4.3) obj found:
//
//	- ZBlk*		-> [] of file/[]#blk
//	- BTree/Bucket	-> δ(BTree)  -> file/[]#blk
//
//	in the end after processing all []oid from invalidation message we have
//
//	  [] of file/[]#blk
//
//	that describes which parts of which file(s) need to be invalidated.
//
//	FIXME no - we can build it but not in full - since we consider only zobj in live cache.
//	FIXME and even if we consider all δ'ed zobj, building complete set of
//	      file.δtail requires to first do complete scan of file.blktab
//	      which is prohibitively expensive. XXX -> no, we'll do the scan
//
//    4.4) for all file/blk to invalidate we do:
//
//	- try to retrieve head/bigfile/file[blk] from OS file cache(*);
//	- if retrieved successfully -> store retrieved data back into OS file
//	  cache for @<rev>/bigfile/file[blk], where
//
//	    # see below about file.δtail
//	    # XXX -> file.δtail.LastBlkRev(#blk, zhead.at)
//	    rev = max(file.δtail.by(#blk)) || min(rev ∈ file.δtail) || zhead.at
//
//	- invalidate head/bigfile/file[blk] in OS file cache.
//
//	This preserves previous data in OS file cache in case it will be needed
//	by not-yet-uptodate clients, and makes sure file read of head/bigfile/file[blk]
//	won't be served from OS file cache and instead will trigger a FUSE read
//	request to wcfs.
//
//    4.5) no invalidation messages are sent to wcfs clients at this point(+).
//
//    4.6) processing ZODB invalidations and serving file reads (see 7) are
//      organized to be mutually exclusive.
//
// 5) after OS file cache was invalidated, we resync zhead to new database
//    view corresponding to tid.
//
// 6) for every file, δtail (invalidation info about head/data) is maintained:
//
//	- tailv: [](rev↑, []#blk)
//	- by:    {} #blk -> []rev↑ in tail
//
//    δtail.tailv describes invalidations to file we learned from ZODB invalidation.
//    δtail.by    allows quick lookup of information by #blk.
//
//    min(rev) in δtail is min(@at) at which head/bigfile/file is currently watched (see below).
//
//    XXX δtail can miss ...
//
//    to support initial openings with @at being slightly in the past, we also
//    make sure that min(rev) is enough to cover last 10 minutes of history
//    from head/at.
//
// 7) when we receive a FUSE read(#blk) request to a head/bigfile/file, we process it as follows:
//
//   7.1) load blkdata for head/bigfile/file[blk] @zhead.at .
//
//	while loading this also gives upper bound estimate of when the block
//	was last changed:
//
//	  rev(blk) ≤ max(_.serial for _ in (ZBlk(#blk), all BTree/Bucket that lead to ZBlk))
//
//	it is not exact because BTree/Bucket can change (e.g. rebalance)
//	but still point to the same k->ZBlk.
//
//	we also use file.δtail to find either exact blk revision:
//
//	  rev(blk) = max(file.δtail.by(#blk) -> []rev↑)
//
//	or another upper bound if #blk ∉ δtail:
//
//	  rev(blk) ≤ min(rev ∈ δtail)		; #blk ∉ δtail
//
//
//	below rev'(blk) is min(of the estimates found):
//
//	  rev(blk) ≤ rev'(blk)		rev'(blk) = min(^^^)
//
//
//	XXX we delay recomputing δFtail.LastBlkRev(file, #blk, head) because
//	using just cheap revmax estimate can frequently result in all watches
//	being skipped.
//
//   7.2) for all registered client@at watchers of head/bigfile/file:
//
//	- rev'(blk) ≤ at: -> do nothing
//	- rev'(blk) > at:
//	  - if blk ∈ watcher.pinned -> do nothing
//	  - rev = max(δtail.by(#blk) : _ ≤ at)	|| min(rev ∈ δtail : rev ≤ at)	|| at
//	  - watcher.pin(file, #blk, @rev)
//	  - watcher.pinned += blk
//
//	where
//
//	  watcher.pin(file, #blk, @rev)
//
//	sends pin message according to "Invalidation protocol", and is assumed
//	to cause
//
//	  remmap(file, #blk, @rev/bigfile/file)
//
//	on client.
//
//	( one could imagine adjusting mappings synchronously via running
//	  wcfs-trusted code via ptrace that wcfs injects into clients, but ptrace
//	  won't work when client thread is blocked under pagefault or syscall(^) )
//
//	in order to support watching for each head/bigfile/file
//
//	  [] of watch{client@at↑, pinned}
//
//	is maintained.
//
//   7.3) blkdata is returned to kernel.
//
//   Thus a client that wants latest data on pagefault will get latest data,
//   and a client that wants @rev data will get @rev data, even if it was this
//   "old" client that triggered the pagefault(~).
//
// XXX 8) serving read from @<rev>/data + zconn(s) for historical state
// XXX 9) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout
//
//
// (*) see notes.txt -> "Notes on OS pagecache control"
// (+) see notes.txt -> "Invalidations to wcfs clients are delayed until block access"
// (~) see notes.txt -> "Changing mmapping while under pagefault is possible"
// (^) see notes.txt -> "Client cannot be ptraced while under pagefault"
// (%) no need to keep track of ZData - ZBlk1 is always marked as changed on blk data change.
//
// XXX For every ZODB connection a dedicated read-only transaction is maintained.
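
// A minimal sketch of the δtail bookkeeping from item 6 and of the revision
// choice from item 7.2 above:
//
//	rev = max(δtail.by(#blk) : _ ≤ at) || min(rev ∈ δtail : rev ≤ at) || at
//
// The names Tid, deltaTail, record and pinRev are hypothetical and the model is
// deliberately simplified (revisions must be recorded in ascending order, and
// the tail is never trimmed); it is not the actual wcfs data structure:

```go
package main

import (
	"fmt"
	"sort"
)

// Tid is a hypothetical stand-in for zodb.Tid.
type Tid uint64

// deltaTail is a simplified model of file.δtail.
type deltaTail struct {
	tailv []Tid           // revisions covered by the tail, ascending
	by    map[int64][]Tid // #blk -> ascending revisions at which it changed
}

// record notes that blocks blkv were changed at revision rev.
// rev must be greater than all previously recorded revisions.
func (dt *deltaTail) record(rev Tid, blkv ...int64) {
	dt.tailv = append(dt.tailv, rev)
	for _, blk := range blkv {
		dt.by[blk] = append(dt.by[blk], rev)
	}
}

// pinRev returns the revision a watcher @at should pin blk to.
func (dt *deltaTail) pinRev(blk int64, at Tid) Tid {
	revv := dt.by[blk]
	// i is the index of the first recorded change of blk that is > at.
	i := sort.Search(len(revv), func(i int) bool { return revv[i] > at })
	if i > 0 {
		return revv[i-1] // max(by(#blk) : _ ≤ at)
	}
	if len(dt.tailv) > 0 && dt.tailv[0] <= at {
		return dt.tailv[0] // min(rev ∈ δtail : rev ≤ at)
	}
	return at
}

func main() {
	dt := &deltaTail{by: map[int64][]Tid{}}
	dt.record(10, 1, 2)
	dt.record(20, 2)
	dt.record(30, 3)

	fmt.Println(dt.pinRev(2, 25)) // 20: block 2 last changed at 20 ≤ 25
	fmt.Println(dt.pinRev(3, 25)) // 10: block 3 unchanged up to 25, tail starts at 10
	fmt.Println(dt.pinRev(3, 5))  // 5: watcher is older than the whole tail
}
```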

// XXX notation
//
// δZ    - change in ZODB space
// δB    - change in BTree*s* space
// δT    - change in BTree(1) space
// δF    - change in File*s* space
// δfile - change in File(1) space

// XXX describe locking
//
// head.zheadMu		write by handleδZ; read by read
//			-> rlockZHead() + lockZHead() ?
// ...

// XXX locking: test with -race (many bugs are reported)
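
// A minimal sketch of how the "write by handleδZ; read by read" rule for
// zheadMu could be expressed with sync.RWMutex, so that processing ZODB
// invalidations and serving file reads are mutually exclusive while reads may
// still run concurrently with each other. The type head and the methods
// handleDeltaZ and read below are hypothetical stand-ins, not the actual wcfs
// code:

```go
package main

import (
	"fmt"
	"sync"
)

// head models only the part of Head that is protected by zheadMu.
type head struct {
	zheadMu sync.RWMutex
	at      uint64 // database state that head/ currently represents
}

// handleDeltaZ models invalidation processing: it takes the write lock, so no
// read can observe head while it is being moved to the new state.
func (h *head) handleDeltaZ(tid uint64) {
	h.zheadMu.Lock()
	defer h.zheadMu.Unlock()
	h.at = tid
}

// read models serving a file read: it takes the read lock, so many reads can
// proceed in parallel, but none overlaps with handleDeltaZ.
func (h *head) read() uint64 {
	h.zheadMu.RLock()
	defer h.zheadMu.RUnlock()
	return h.at
}

func main() {
	h := &head{at: 1}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = h.read() // sees either 1 or 2, never a torn state
		}()
	}
	h.handleDeltaZ(2)
	wg.Wait()

	fmt.Println(h.read()) // 2
}
```

// Running such code with `go run -race` is one way to check that every access
// to the protected state really goes through the lock.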

import (
	"bufio"
	"context"
	"flag"
	"fmt"
	"io"
	stdlog "log"
	"os"
//	"runtime"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
//	"time"

	log "github.com/golang/glog"
	"golang.org/x/sync/errgroup"

	"lab.nexedi.com/kirr/go123/xcontext"
	"lab.nexedi.com/kirr/go123/xerr"

	"lab.nexedi.com/kirr/neo/go/transaction"
	"lab.nexedi.com/kirr/neo/go/zodb"
	"lab.nexedi.com/kirr/neo/go/zodb/btree"
	_ "lab.nexedi.com/kirr/neo/go/zodb/wks"

	"github.com/hanwen/go-fuse/fuse"
	"github.com/hanwen/go-fuse/fuse/nodefs"
	"github.com/pkg/errors"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
456 457
)

// Root represents root of wcfs filesystem.
type Root struct {
	fsNode

	// ZODB storage we work with
	zstor zodb.IStorage

	// ZODB DB handle for zstor.
	// keeps cache of connections for @<rev>/ accesses.
	// only one connection is used for each @<rev>.
	zdb *zodb.DB

	// directory + ZODB connection for head/
	// (zhead is Resync'ed and is kept outside zdb pool)
	head *Head

	// directories + ZODB connections for @<rev>/
	revMu  sync.Mutex
	revTab map[zodb.Tid]*Head
}

// /(head|<rev>)/			- served by Head.
type Head struct {
	fsNode

	rev   zodb.Tid    // 0 for head/, !0 for @<rev>/
	bfdir *BigFileDir // bigfile/
	// at    - served by .readAt
	// watch - implicitly linked to by fs

	// ZODB connection for everything under this head

	// zheadMu protects zconn.At & live _objects_ associated with it.
	// while it is rlocked zconn is guaranteed to stay viewing database at
	// particular view.
	//
	// zwatcher write-locks this and knows no one is using ZODB objects and
	// no one mutates OS file cache while zwatcher is running.
	//
	// it is also kept rlocked by OS cache uploaders (see BigFile.uploadBlk)
	// with additional locking protocol to avoid deadlocks (see below for
	// pauseOSCacheUpload + ...).
	zheadMu sync.RWMutex
	zconn   *ZConn       // for head/ zwatcher resyncs head.zconn; others only read zconn objects.

	// zwatcher signals to uploadBlk to pause/continue uploads to OS cache to avoid deadlocks.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
	pauseOSCacheUpload    bool
	continueOSCacheUpload chan struct{}
	// uploadBlk signals to zwatcher how many OS cache uploads are currently inflight.
	inflightOSCacheUploads int32

	// XXX move zconn's current transaction to Head here?

	// head/watch opens
	// XXX protected by ... zheadMu ?
	wlinkTab map[*WatchLink]struct{}

	// waiters for zconn.At to become ≥ their at.
	hwaitMu sync.Mutex           // zheadMu.W  |  zheadMu.R + hwaitMu
	hwait   map[hwaiter]struct{} // set{(at, ready)}
}

// /(head|<rev>)/bigfile/		- served by BigFileDir.
type BigFileDir struct {
	fsNode
	head *Head // parent head/ or @<rev>/

	// {} oid -> <bigfileX>
	fileMu  sync.Mutex	// XXX doc zheadMu.W  |  ...	?
	fileTab map[zodb.Oid]*BigFile

	// δ tail of tracked BTree nodes of all BigFiles + -> which file
	// (used only for head/, not revX/)
	δFmu   sync.RWMutex	// XXX doc zheadMu.W  |  ...	?
	δFtail *ΔFtail
}

// /(head|<rev>)/bigfile/<bigfileX>	- served by BigFile.
type BigFile struct {
	fsNode

	// this BigFile is under .head/bigfile/; it views ZODB via .head.zconn
	// parent's BigFileDir.head is the same.
	head	*Head

	// ZBigFile top-level object
	zfile	*ZBigFile

	// things read/computed from .zfile; constant during lifetime of current transaction.
	blksize int64    // zfile.blksize
	size    int64    // zfile.Size()
	rev     zodb.Tid // last revision that modified zfile data

//	// tail change history of this file.
//	//
//	// XXX computationally expensive to start - see "Invalidations to wcfs
//	// clients are delayed ..." in notes.txt
//	δtail *ΔTailI64 // [](rev↑, []#blk)

	// inflight loadings of ZBigFile from ZODB.
	// successful load results are kept here until blkdata is put into OS pagecache.
	//
	// Being a staging area for data to enter OS cache, loading has to be
	// consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache.
	loadMu  sync.Mutex	// XXX doc zheadMu.W  |  ...	?
	loading map[int64]*blkLoadState // #blk -> {... blkdata}

	// watches attached to this file.
	//
	// both watches in already "established" state (i.e. initial watch
	// request was completed and answered with "ok"), and watches in
	// progress of being established. XXX text
	//
	// XXX locking -> watchMu?
	watches map[*Watch]struct{}
}

// blkLoadState represents a ZBlk load state/result.
//
// when !ready the loading is in progress.
// when ready the loading has been completed.
type blkLoadState struct {
	ready chan struct{}

	blkdata  []byte
	err      error
}

// /head/watch				- served by WatchNode.
type WatchNode struct {
	fsNode

	head   *Head // parent head/
	idNext int32 // ID for next opened WatchLink
}

// /head/watch handle			- served by WatchLink.
type WatchLink struct {
	sk   *FileSock // IO channel to client
	id   int32     // ID of this /head/watch handle (for debug log)
	head *Head

	// established watches.
	// XXX in-progress - where?	-> (XXX no - see vvv) nowhere; here only established watches are added
	// XXX -> in-progress here - so that access to new blocks after δFtail
	//        was queried also send pins.
	//
	// XXX locking?
	byfile map[zodb.Oid]*Watch // {} foid -> Watch

	// IO
	reqNext uint64 // stream ID for next wcfs-originated request; 0 is reserved for control messages
	txMu    sync.Mutex
	rxMu    sync.Mutex
	rxTab   map[/*stream*/uint64]chan string // client replies go via here
}

// Watch represents watching for changes to 1 BigFile over particular watch link.
type Watch struct {
	link *WatchLink // link to client
	file *BigFile	// XXX needed?

	// XXX locking

	at     zodb.Tid	// requested to be watched @at

	// {} blk -> rev
	pinned map[int64]zodb.Tid // blocks that are already pinned to be ≤ at
}

// -------- 3) Cache invariant --------

// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
// LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive
// because it is essentially the index where to find ZBigFile data.
//
// For the data itself - we put it to kernel pagecache and always deactivate
// from ZODB right after that.
//
// See "3) for */head/data the following invariant is maintained..."
type zodbCacheControl struct {}

func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolicy {
	switch obj.(type) {
	// ZBlk* should be in cache but without data
	case *ZBlk0:
		return zodb.PCachePinObject | zodb.PCacheDropState
	case *ZBlk1:
		return zodb.PCachePinObject | zodb.PCacheDropState

	// ZBigFile btree index should be in cache with data
	case *btree.LOBTree:
		return zodb.PCachePinObject | zodb.PCacheKeepState
	case *btree.LOBucket:
		return zodb.PCachePinObject | zodb.PCacheKeepState

	// don't let ZData pollute the cache
	case *ZData:
		return zodb.PCacheDropObject | zodb.PCacheDropState

	// for performance reasons we also keep ZBigFile in cache.
	//
	// ZBigFile is top-level object that is used on every block load, and
	// it would be a waste to evict ZBigFile from cache.
	case *ZBigFile:
		return zodb.PCachePinObject | zodb.PCacheKeepState
	}

	return 0
}
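
// The classification above can be tried in isolation. What follows is only a
// minimal sketch of the same idea (a type switch that returns OR-ed policy
// flags), not the zodb API: policy, its flag names, indexNode and payload are
// hypothetical stand-ins for zodb.PCachePolicy, the PCache* flags,
// LOBTree/LOBucket and ZData.

```go
package main

import "fmt"

// policy is a hypothetical bit set similar in spirit to zodb.PCachePolicy.
type policy int

const (
	pinObject  policy = 1 << iota // keep the object itself in the cache
	dropObject                    // let the object be evicted
	keepState                     // keep the loaded state
	dropState                     // discard the loaded state
)

// indexNode and payload are hypothetical object kinds: the first plays the
// role of an index node, the second the role of bulk data.
type indexNode struct{}
type payload struct{}

// classify returns the cache policy for obj; 0 means "no special treatment".
func classify(obj interface{}) policy {
	switch obj.(type) {
	case *indexNode:
		return pinObject | keepState
	case *payload:
		return dropObject | dropState
	}
	return 0
}

func main() {
	p := classify(&indexNode{})
	fmt.Println(p&pinObject != 0, p&keepState != 0) // prints "true true"
	fmt.Println(classify("unrelated") == 0)         // prints "true"
}
```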

// -------- zhead lock/wait --------

// XXX needed?
// TODO head.zheadMu -> special mutex with Lock(ctx) so that Lock wait could be canceled
func (head *Head) zheadRLock()   { head.zheadMu.RLock() }
func (head *Head) zheadRUnlock() { head.zheadMu.RUnlock() }
func (head *Head) zheadLock()    { head.zheadMu.Lock() }
func (head *Head) zheadUnlock()  { head.zheadMu.Unlock() }

// -------- 4) ZODB invalidation -> OS cache --------

func traceZWatch(format string, argv ...interface{}) {
	if !log.V(1) {	// XXX -> 2?
		return
	}

	log.Infof("zwatcher: " + format, argv...)	// XXX InfoDepthf
}

// zwatcher watches for ZODB changes.
//
// see "4) when we receive an invalidation message from ZODB ..."
func (root *Root) zwatcher(ctx context.Context) (err error) {
	defer xerr.Contextf(&err, "zwatch")	// XXX more in context?
	// XXX unmount on error? -> always EIO?

	traceZWatch(">>>")

	zwatchq := make(chan zodb.Event)
	at0 := root.zstor.AddWatch(zwatchq)	// XXX -> to main thread to avoid race
	defer root.zstor.DelWatch(zwatchq)
	_ = at0 // XXX XXX

	var zevent zodb.Event
	var ok bool

	for {
		traceZWatch("select ...")
		select {
		case <-ctx.Done():
			traceZWatch("cancel")
			return ctx.Err()

		case zevent, ok = <-zwatchq:
			if !ok {
				traceZWatch("zwatchq closed")
				return nil // closed	XXX ok?
			}

		}

		traceZWatch("zevent: %s", zevent)

		switch zevent := zevent.(type) {
		default:
			return fmt.Errorf("unexpected event: %T", zevent)

		case *zodb.EventError:
			return zevent.Err

		case *zodb.EventCommit:
			root.handleδZ(zevent)
		}
	}
}

// handleδZ handles 1 change event from ZODB notification.
func (root *Root) handleδZ(δZ *zodb.EventCommit) {
	head := root.head

	// while we are invalidating OS cache, make sure that nothing that
	// even reads /head/bigfile/* is running (see 4.6).
	//
	// also make sure that cache uploaders we spawned (uploadBlk) are all
	// paused, or else they could overwrite OS cache with stale data.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for
	// details on how to do this without deadlocks.
	continueOSCacheUpload := make(chan struct{})
retry:
	for {
		head.zheadMu.Lock()
		head.pauseOSCacheUpload = true
		head.continueOSCacheUpload = continueOSCacheUpload

		// read the counter atomically: uploadBlk may update it concurrently
		if atomic.LoadInt32(&head.inflightOSCacheUploads) != 0 {
			head.zheadMu.Unlock()
			continue retry
		}

		break
	}

	defer func() {
		head.pauseOSCacheUpload = false
		head.continueOSCacheUpload = nil
		head.zheadMu.Unlock()
		close(continueOSCacheUpload)
	}()

	// head.zheadMu locked and all cache uploaders are paused

	zhead := head.zconn
	bfdir := head.bfdir

/*
	// δZ = (tid↑, []oid)
	for _, oid := range δZ.Changev {
		// XXX zhead.Cache() lock/unlock
		obj := zhead.Cache().Get(oid)
		if obj == nil {
			//fmt.Printf("%s: not in cache\n", oid)
			continue // nothing to do - see invariant
		}

		//fmt.Printf("%s:     in cache (%s)\n", oid, typeOf(obj))

		switch obj := obj.(type) {
		default:
			continue // object not related to any bigfile

		// XXX kill Tree/Bucket here (-> ΔFtail)
		case *btree.LOBTree:
			btreeChangev = append(btreeChangev, obj.POid())

		case *btree.LOBucket:
			btreeChangev = append(btreeChangev, obj.POid())

		case zBlk:	// ZBlk*
			// blkBoundTo locking: no other bindZFile are running,
			// since we write-locked head.zheadMu and bindZFile is
			// run when loading objects - thus when head.zheadMu is
			// read-locked.
			//
			// bfdir locking: similarly not needed, since we are
			// exclusively holding head lock.
			for zfile, objBlk := range obj.blkBoundTo() {
				file, ok := bfdir.fileTab[zfile.POid()]
				if !ok {
					// even though zfile is in ZODB cache, the
					// filesystem already forgot about this file.
					continue
				}

				finv, ok := toinvalidate[file]
				if !ok {
					finv = &fileInvalidate{blkmap: SetI64{}}
					toinvalidate[file] = finv
				}
				finv.blkmap.Update(objBlk)
			}

		case *ZBigFile:
			// XXX check that .blksize and .blktab (it is only
			// persistent reference) do not change.

			// XXX shutdown fs with ^^^ message.
			panic("ZBigFile changed")
		}

		// make sure obj won't be garbage-collected until we finish handling it.
		runtime.KeepAlive(obj)
	}
*/

	// invalidate kernel cache for data in changed files
	// XXX no indexMu lock needed because head is Locked
	δF := bfdir.δFtail.Update(δZ, zhead)

	if false {
	fmt.Printf("\n\nS: zδhandle: δF (#%d):\n", len(δF.ByFile))
	for file, δfile := range δF.ByFile {
		blkv := δfile.Blocks.Elements()
		sort.Slice(blkv, func(i, j int) bool {
			return blkv[i] < blkv[j]
		})
		size := " "
		if δfile.Size {
			size = "S"
		}
		fmt.Printf("S: \t- %s\t%s %v\n", file.zfile.POid(), size, blkv)
	}
	fmt.Printf("\n\n")
	}

	wg, ctx := errgroup.WithContext(context.TODO())	// XXX ctx = ?
	for file, δfile := range δF.ByFile {
//		// XXX needed?
//		// XXX even though δBtail is complete, not all ZBlk are present here
//		file.δtail.Append(δF.Rev, δfile.Blocks.Elements())

		file := file
		for blk := range δfile.Blocks {
			blk := blk
			wg.Go(func() error {
				return file.invalidateBlk(ctx, blk)
			})
		}
	}
	err := wg.Wait()
	if err != nil {
		panic(err)	// XXX
	}

	// invalidate kernel cache for attributes
	// we need to do it only if we see topology (i.e. btree) change
	//
	// do it after completing data invalidations.
	wg, ctx = errgroup.WithContext(context.TODO())	// XXX ctx = ?
	for file, δfile := range δF.ByFile {
		if !δfile.Size {
			continue
		}
		file := file
		wg.Go(func() error {
			return file.invalidateAttr()	// XXX pass ctx?
		})
	}
	err = wg.Wait()
	if err != nil {
		panic(err)	// XXX
	}

	// resync .zhead to δZ.tid
	// XXX -> Head.Resync() ?

	// 1. abort old and resync to new txn/at
	transaction.Current(zhead.txnCtx).Abort()
	_, ctx = transaction.New(context.Background()) // XXX bg ok?
	err = zhead.Resync(ctx, δZ.Tid)
	if err != nil {
		panic(err)	// XXX
	}
	zhead.txnCtx = ctx

	// 2. restat invalidated ZBigFile
	// XXX -> parallel
	// XXX locking
	for file := range δF.ByFile {
		size, sizePath, err := file.zfile.Size(ctx)
		if err != nil {
			panic(err)	// XXX
		}

		file.size = size
		bfdir.δFtail.Track(file, -1, sizePath, nil)

		file.rev = zhead.At()
	}

	// notify .wcfs/zhead
	for sk := range gdebug.zheadSockTab {
		_, err := fmt.Fprintf(sk, "%s\n", δZ.Tid)
		if err != nil {
			log.Error(err)	// XXX errctx, -> warning?
			sk.Close()
			delete(gdebug.zheadSockTab, sk)
		}
	}

	// XXX δFtail.ForgetPast(...)
	// XXX for f in δF: f.δtail.ForgetPast(...)

	// notify zhead.At waiters
	for w := range head.hwait {
		if w.at <= δZ.Tid {
			delete(head.hwait, w)
			close(w.ready)
		}
	}
}

// hwaiter represents someone waiting for zhead to become ≥ at.
type hwaiter struct {
	at    zodb.Tid
	ready chan struct{}
}

// zheadWait waits till head.zconn.At becomes ≥ at.
//
// It returns error either if db is down or ctx is canceled.	XXX db -> wcfs?
func (head *Head) zheadWait(ctx context.Context, at zodb.Tid) (err error) {
	defer xerr.Contextf(&err, "wait zhead ≥ %s", at)

	if head.rev != 0 {
		panic("must be called only for head/, not @revX/")
	}

	// check if zhead is already ≥ at
	head.zheadMu.RLock()
	if head.zconn.At() >= at {
		head.zheadMu.RUnlock()
		return
	}

	// no - we have to wait for it
	ready := make(chan struct{})
	head.hwaitMu.Lock()
	head.hwait[hwaiter{at, ready}] = struct{}{}
	head.hwaitMu.Unlock()

	head.zheadMu.RUnlock()

	select {
	case <-ctx.Done():
		return ctx.Err()

	case <-ready:
		return nil // ok - zhead.At went ≥ at
	}
}

// invalidateBlk invalidates 1 file block in kernel cache.
//
// see "4.4) for all file/blk to invalidate we do"
//
// called with f.head.zheadMu wlocked.
func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
	defer xerr.Contextf(&err, "%s: invalidate blk #%d:", f.path(), blk)

	fsconn := gfsconn
	blksize := f.blksize
	off := blk*blksize

	var blkdata []byte = nil

	// first try to retrieve f.loading[blk];
	// make sure f.loading[blk] is invalidated.
	//
	// we are running with zheadMu wlocked - no need to lock f.loadMu
	loading, ok := f.loading[blk]
	if ok {
		if loading.err == nil {
			blkdata = loading.blkdata
		}
		delete(f.loading, blk)
	}

	// try to retrieve cache of current head/data[blk], if we got nothing from f.loading
	if blkdata == nil {
		blkdata = make([]byte, blksize)
		n, st := fsconn.FileRetrieveCache(f.Inode(), off, blkdata)
		if st != fuse.OK {
			// XXX warn
		}
		blkdata = blkdata[:n]
	}

	// if less than blksize was cached - probably the kernel had to evict
	// some data from its cache already. In such case we don't try to
	// preserve the rest and drop what was read, to avoid keeping the
	// system overloaded.
	//
	// if we have the data - preserve it under @revX/bigfile/file[blk].
	if int64(len(blkdata)) == blksize {
		func() {
			// store retrieved data back to OS cache for file @<rev>/file[blk]
			blkrev, _ := f.LastBlkRev(ctx, blk, f.head.zconn.At())
			frev, frelease, err := groot.mkrevfile(blkrev, f.zfile.POid())
			if err != nil {
				log.Errorf("BUG: %s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err)
				// frev and frelease are not usable on error - skip the cache store
				return
			}
			defer frelease()

			st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata)
			if st != fuse.OK {
				log.Errorf("BUG: %s: invalidate blk #%d: %s: store cache: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, frev.path(), st)
			}
		}()
	}

	// invalidate file/head/data[blk] in OS file cache.
	st := fsconn.FileNotify(f.Inode(), off, blksize)
	if st != fuse.OK {
		return syscall.Errno(st)
	}

	return nil
}

// invalidateAttr invalidates file attributes in kernel cache.
//
// Complements invalidateBlk and is used to invalidate file size.
func (f *BigFile) invalidateAttr() (err error) {
	defer xerr.Contextf(&err, "%s: invalidate attr", f.path())
	fsconn := gfsconn
	st := fsconn.FileNotify(f.Inode(), -1, -1) // metadata only
	if st != fuse.OK {
		return syscall.Errno(st)
	}
	return nil
}


// mkrevfile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel.
//
// We need node ID to be known to the kernel, when we need to store data into
// file's kernel cache - if the kernel doesn't have the node ID for the file in
// question, FileNotifyStoreCache will just fail.
//
// For the kernel to know the inode, mkrevfile issues a regular filesystem lookup
// request which goes to kernel and should go back to wcfs. It is thus not safe
// to use mkrevfile from under FUSE request handler as doing so might deadlock.
//
// Caller must call release when inode ID is no longer required to be present.
func (root *Root) mkrevfile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, release func(), err error) {
	fsconn := gfsconn

	frevpath := fmt.Sprintf("@%s/bigfile/%s", rev, fid) // relative to fs root for now
	defer xerr.Contextf(&err, "/: mkrevfile %s", frevpath)

	// first check, without going through the kernel, whether the inode may already be known
	xfrev := fsconn.LookupNode(root.Inode(), frevpath)
	if xfrev != nil {
		// FIXME checking for "node{0}" is fragile, but currently no other way
		// XXX the node could be still forgotten since we are not holding open on it
		// XXX -> always os.open unconditionally? or it is ok since it is just a cache?
		if xfrev.String() != "node{0}" {
			return xfrev.Node().(*BigFile), func(){}, nil
		}
	}

	// we have to ping the kernel
	frevospath := gmntpt + "/" + frevpath // now starting from OS /
	f, err := os.Open(frevospath)
	if err != nil {
		return nil, nil, err
	}

	xfrev = fsconn.LookupNode(root.Inode(), frevpath)
	// must be !nil as open succeeded	XXX better recheck
	return xfrev.Node().(*BigFile), func() { f.Close() }, nil
}

// -------- 7) FUSE read(#blk) --------

// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
	f.head.zheadMu.RLock()
	defer f.head.zheadMu.RUnlock()

	// cap read request to file size
	end := off + int64(len(dest))		// XXX overflow?
	if end > f.size {
		end = f.size
	}
	if end <= off {
		// XXX off >= size -> EINVAL? (but when size=0 kernel issues e.g. [0 +4K) read)
		return fuse.ReadResultData(nil), fuse.OK
	}

	// widen read request to be aligned with blksize granularity
	// (we can load only whole ZBlk* blocks)
	aoff := off - (off % f.blksize)
	aend := end
	if re := end % f.blksize; re != 0 {
		aend += f.blksize - re
	}
	// XXX use original dest if it can fit the data
	dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file

	// XXX better ctx = transaction.PutIntoContext(ctx, txn)
	ctx, cancel := xcontext.Merge(fctx, f.head.zconn.txnCtx)
	defer cancel()

	// read/load all block(s) in parallel
	wg, ctx := errgroup.WithContext(ctx)
	for blkoff := aoff; blkoff < aend; blkoff += f.blksize {
		blkoff := blkoff
		blk := blkoff / f.blksize
		wg.Go(func() error {
			δ := blkoff-aoff // blk position in dest
			//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, f.blksize)
			return f.readBlk(ctx, blk, dest[δ:δ+f.blksize])
		})
	}

	err := wg.Wait()
	if err != nil {
		return nil, err2LogStatus(err)
	}

	return fuse.ReadResultData(dest[off-aoff:end-aoff]), fuse.OK
}

// readBlk serves Read to read 1 ZBlk #blk into destination buffer.
//
// see "7) when we receive a FUSE read(#blk) request ..." in overview.
//
// len(dest) == blksize.
// called with head.zheadMu rlocked.
func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err error) {
	defer xerr.Contextf(&err, "%s: readblk #%d", f.path(), blk)

	// check if someone else is already loading this block
	f.loadMu.Lock()
	loading, already := f.loading[blk]
	if !already {
		loading = &blkLoadState{
			ready:   make(chan struct{}),
		}
		f.loading[blk] = loading
	}
	f.loadMu.Unlock()

	// if it is already loading - just wait for it
	if already {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-loading.ready:
			if loading.err == nil {
				copy(dest, loading.blkdata)	// XXX copy
			}
			return loading.err
		}
	}

	// no one was loading - we became responsible to load this block
	blkdata, treepath, zblk, blkrevMax, err := f.zfile.LoadBlk(ctx, blk)
	loading.blkdata = blkdata
	loading.err = err

	// data loaded with error - cleanup .loading
	if loading.err != nil {
		close(loading.ready)
		f.loadMu.Lock()
		delete(f.loading, blk)
		f.loadMu.Unlock()
		return err
	}

	// we have the data - it can be used after watchers are updated
	// XXX should we use ctx here? (see updateWatchers comments)
	f.updateWatchers(ctx, blk, treepath, zblk, blkrevMax)

	// data can be used now
	close(loading.ready)
	copy(dest, blkdata)	// XXX copy

	// store to kernel pagecache whole block that we've just loaded from database.
	// This way, even if the user currently requested to read only small portion from it,
	// it will prevent next e.g. consecutive user read request to again hit
	// the DB, and instead will be served by kernel from its pagecache.
	//
	// We cannot do this directly from reading goroutine - while reading
	// kernel FUSE is holding corresponding page in pagecache locked, and if
	// we would try to update that same page in pagecache it would result
	// in deadlock inside kernel.
	//
	// .loading cleanup is done once we are finished with putting the data into OS pagecache.
	// If we do it earlier - a simultaneous read covered by the same block could result
	// in missing both kernel pagecache (if not yet updated) and empty .loading[blk],
	// and thus would trigger DB access again.
	//
	// XXX if direct-io: don't touch pagecache
	go f.uploadBlk(blk, loading)

	return nil
}

// updateWatchers complements readBlk: it updates watchers of the file after a
// block was loaded from ZODB and before block data is returned to kernel.
//
// See "7.2) for all registered client@at watchers ..."
//
// Called with f.head.zheadMu rlocked.
//
// XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure?
func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, zblk zBlk, blkrevMax zodb.Tid) {
	// only head/ is being watched for
	if f.head.rev != 0 {
		return
	}

	fmt.Printf("S: read #%d -> update watchers (#%d)\n", blk, len(f.watches))

	// update δFtail index
	bfdir := f.head.bfdir
	bfdir.δFmu.Lock()		// XXX locking correct? XXX -> better push down?
	bfdir.δFtail.Track(f, blk, treepath, zblk)	// XXX pass in zblk.rev here?
	bfdir.δFmu.Unlock()

/* XXX kill
	// associate zblk with file, if data was not hole
	if zblk != nil {
		zblk.bindFile(f, blk)
	}
*/

	// make sure that file[blk] on client side stays as of @w.at state.

	// try to use blkrevMax only as the first cheap criterion to skip updating watchers.
	// This is likely to be the case, since most watchers should be usually close to head.
	// If using blkrevMax only turns out to be not sufficient, we'll
	// consult δFtail, which might involve recomputing it.
	blkrev := blkrevMax
	blkrevRough := true

	wg, ctx := errgroup.WithContext(ctx)
	for w := range f.watches {
		w := w

		fmt.Printf("S: read -> update watchers: w @%s\n", w.at)

		// XXX locking

		// the block is already covered by @w.at database view
		if blkrev <= w.at {
			continue
		}

		// if blkrev is rough estimation and that upper bound is > w.at
		// we have to recompute ~exact file[blk] revision @head.
		if blkrevRough {
			blkrev, _ = f.LastBlkRev(ctx, blk, f.head.zconn.At())
			blkrevRough = false

			if blkrev <= w.at {
				continue
			}
		}

		// the block is newer - find out its revision as of @w.at and pin to that.
		//
		// We don't pin to w.at since if we would do so for several clients,
		// and most of them would be on different w.at - cache of the file will
		// be lost. Via pinning to particular block revision, we make sure the
		// revision to pin is the same on all clients, and so file cache is shared.
		pinrev, _ := w.file.LastBlkRev(ctx, blk, w.at)	// XXX move into go?

		wg.Go(func() error {
			// XXX close watcher on any error
			return w.pin(ctx, blk, pinrev)
		})
	}
	err := wg.Wait()
	if err != nil {
		panic(err)	// XXX
	}
}

// uploadBlk complements readBlk: it uploads loaded blkdata into OS cache.
func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
	head := f.head

	// rlock zheadMu and make sure zwatcher is not asking us to pause.
	// if it does - wait for a safer time not to deadlock.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
retry:
	for {
		head.zheadMu.RLock()
		// help zwatcher if it asks us to pause uploadings, so it can
		// take zheadMu wlocked without deadlocks.
		if head.pauseOSCacheUpload {
			ready := head.continueOSCacheUpload
			head.zheadMu.RUnlock()
			<-ready
			continue retry
		}

		break
	}

	// zwatcher is not currently trying to pause OS cache uploads.

	// check if this block was already invalidated by zwatcher.
	// if so don't upload the block into OS cache.
	f.loadMu.Lock()
	loading_ := f.loading[blk]
	f.loadMu.Unlock()
	if loading != loading_ {
		head.zheadMu.RUnlock()
		return
	}

	oid := f.zfile.POid()

	// signal to zwatcher not to run while we are performing the upload.
	// upload with zheadMu released, so that zwatcher can lock it even if only to
	// check inflightOSCacheUploads status.
	atomic.AddInt32(&head.inflightOSCacheUploads, +1)
	head.zheadMu.RUnlock()

	st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata)

	f.loadMu.Lock()
	bug := (loading != f.loading[blk])
	if !bug {
		delete(f.loading, blk)
	}
	f.loadMu.Unlock()

	// signal to zwatcher that we are done and it can continue.
	atomic.AddInt32(&head.inflightOSCacheUploads, -1)

	if bug {
		panicf("BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache", oid, blk)
	}

	if st == fuse.OK {
		return
	}

	// pagecache update failed, but it must not (we verified on startup that
	// pagecache control is supported by kernel). We can correctly live on
	// with the error, but data access will be likely very slow. Tell user
	// about the problem.
	log.Errorf("BUG: bigfile %s: blk %d: -> pagecache: %s  (ignoring, but reading from bigfile will be very slow)", oid, blk, st)
}

// -------- invalidation protocol notification/serving --------
//
// (see "7.2) for all registered client@at watchers ...")

// pin makes sure that file[blk] on client side is the same as of @rev state.
//
// rev = zodb.TidMax means @head; otherwise rev must be ≤ w.at and there must
// be no rev_next changing file[blk]: rev < rev_next ≤ w.at.
//
// XXX error - when? or close watch on any error?
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
	defer xerr.Contextf(&err, "wlink%d: f<%s>", w.link.id, w.file.zfile.POid())
	return w._pin(ctx, blk, rev)
}

func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
	foid := w.file.zfile.POid()
	revstr := rev.String()
	if rev == zodb.TidMax {
		revstr = "head"
	}
	defer xerr.Contextf(&err, "pin #%d @%s", blk, revstr)

	// XXX locking?
	// XXX simultaneous calls?

	if !(rev == zodb.TidMax || rev <= w.at) {
		panicf("f<%s>: wlink%d: pin #%d @%s: watch.at (%s) < rev",
			foid, w.link.id, blk, rev, w.at)
	}

	if w.pinned[blk] == rev {
		if rev == zodb.TidMax {
			panicf("f<%s>: wlink%d: pinned[#%d] = @head", foid, w.link.id, blk)
		}
		// already pinned
		// (e.g. os cache for block was evicted and read called the second time)
		return nil
	}

	ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr))
	if err != nil {
		return err
	}

	if ack != "ack" {
		return fmt.Errorf("expect %q; got %q", "ack", ack)
	}

	if rev == zodb.TidMax {
		delete(w.pinned, blk)
	} else {
		w.pinned[blk] = rev
	}

	return nil
}

// setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request.
//
// XXX sends "pin" notifications; final "ok" must be sent by caller.
//
// XXX called synchronously - only 1 setupWatch call at a time?
func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
	defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
	head := wlink.head
	bfdir := head.bfdir

	// XXX locking
	// XXX head.zheadMu.RLock() + defer unlock (see vvv for unpin vs pin and locked head)

	// XXX if watch was already established - we need to update it
	w := wlink.byfile[foid]
	if w == nil {
		// watch was not previously established - set it up anew
		// XXX locking
		f := bfdir.fileTab[foid]
		if f == nil {
			// by "invalidation protocol" watch is set up after data file was opened
			return fmt.Errorf("file not yet known to wcfs or is not a ZBigFile")
		}

		w = &Watch{
			link:   wlink,
			file:   f,
			at:     at,
			pinned: make(map[int64]zodb.Tid),
		}
	}

	// at="-" (InvalidTid) means "remove the watch"
	// XXX locking
	if at == zodb.InvalidTid {
		delete(wlink.byfile, foid)
		delete(w.file.watches, w)
		return nil
	}


	// check at >= w.at
	// XXX we might want to allow going back in history if we need it.
	if !(at >= w.at) {
		return fmt.Errorf("going back in history is forbidden")
	}

	// XXX locking
	err = head.zheadWait(ctx, at)
	if err != nil {
		return err
	}

	// XXX locking
	headAt := head.zconn.At()

	if at < bfdir.δFtail.Tail() {
		return fmt.Errorf("too far away back from head/at (@%s); δt = %s",
			headAt, headAt.Time().Sub(at.Time().Time))
	}

	f := w.file

	// register w to f here early, so that READs going in parallel to us
	// preparing and processing initial pins, also send pins to w for read
	// blocks. If we don't, we can fail to send a pin to w for a freshly read
	// block which could have revision > w.at:	XXX test
	//
	//                1   3    2   4
	//	-----.----x---o----x---x------]----------
	//           ↑                        ↑
	//          w.at                     head
	//
	// Here blocks #1, #2 and #4 were previously accessed, are thus tracked
	// by δFtail and are changed after w.at - they will be returned by vvv
	// δFtail query and pin-sent to w. Block #3 was not yet accessed but
	// was also changed after w.at. head/file[#3] might be accessed
	// concurrently with watch setup, and f.readBlk checks f.watches:
	// if w ∉ f.watches at that moment, w will not receive the
	// pin for #3.
	//
	// NOTE for `unpin blk` to -> @head we can be sure there won't be
	// simultaneous `pin blk` request, because:		XXX recheck
	//
	// - unpin means blk was previously pinned,
	// - blk was pinned means it is tracked by δFtail,
	// - if blk is tracked and δFtail says there is no δblk ∈ (at, head],
	//   there is indeed no blk change in that region,
	// - which means that δblk with rev > w.at might be only > head,
	// - but such δblk are processed with zhead wlocked and we keep zhead
	//   rlocked during pin setup.		XXX rlock zhead during setupWatch
	//
	//          δ                      δ
	//      ----x----.------------]----x----
	//               ↑            ↑
	//              w.at         head
	//
	// XXX locking
	// XXX register only if watch was created anew, not updated?
	f.watches[w] = struct{}{}
	wlink.byfile[foid] = w

	// XXX defer -> unregister watch if error?

	// pin all tracked file blocks that were changed in (at, head] range.

	// XXX locking

	toPin  := map[int64]zodb.Tid{} // blk -> @rev

	for _, δfile := range bfdir.δFtail.SliceByFileRev(f, at, headAt) {
		for blk := range δfile.Blocks {
			_, already := toPin[blk]
			if already {
				continue
			}

			toPin[blk], _ = f.LastBlkRev(ctx, blk, at)
		}
	}

	// if a block was previously pinned, but ∉ δ(at, head] -> unpin it to head.
	for blk, pinPrev := range w.pinned {	// XXX locking
		pinNew, pinning := toPin[blk]
		if !pinning {
			toPin[blk] = zodb.TidMax // @head
		}

		// don't bother to spawn .pin goroutines if pin revision is the same
		if pinPrev == pinNew {
			delete(toPin, blk)
		}
	}

	// XXX locking
	// XXX do at the same time when registering ^^^, so that simultaneous READs see correct w.at
	w.at = at

	wg, ctx := errgroup.WithContext(ctx)
	for blk, rev := range toPin {
		blk := blk
		rev := rev
		wg.Go(func() error {
			return w._pin(ctx, blk, rev)
		})
	}
	err = wg.Wait()
	if err != nil {
		return err
	}

	// XXX or register w to f & wlink here?
	// NOTE registering f.watches[w] and wlink.byfile[foid] = w must come together.

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1595
	return nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1596 1597
}

// Open serves /head/watch opens.
func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
	// XXX check flags?
	head := wnode.head

	wlink := &WatchLink{
		sk:     NewFileSock(),
		id:     atomic.AddInt32(&wnode.idNext, +1),
		head:   head,
		byfile: make(map[zodb.Oid]*Watch),
		rxTab:  make(map[uint64]chan string),
	}

	// XXX locking
	// XXX del wlinkTab[w] on w.sk.File.Release
	head.wlinkTab[wlink] = struct{}{}

	go wlink.serve()
	return wlink.sk.File(), fuse.OK
}

// serve serves client-initiated watch requests and routes client replies to
// wcfs-initiated pin requests.
func (wlink *WatchLink) serve() {
	err := wlink._serve()
	// XXX log error if !close
	if err != nil {
		log.Error(err)
	}
	// XXX locking
	delete(wlink.head.wlinkTab, wlink)
	// XXX deactivate all watches from wlink.byfile[*]
}

// _serve is the worker for serve; it returns the error, if any, that
// terminated serving the link.
func (wlink *WatchLink) _serve() (err error) {
	defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
	r := bufio.NewReader(wlink.sk)

	ctx0 := context.TODO()	// XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout)

	ctx, cancel := context.WithCancel(ctx0)
	wg, ctx := errgroup.WithContext(ctx)

	defer func() {
		// cancel all handlers on both error and ok return.
		// ( ok return is e.g. when we received "bye", so if client
		//   sends "bye" and some pin handlers are in progress - they
		//   anyway don't need to wait for client replies anymore )
		cancel()

		err2 := wg.Wait()
		if err == nil {
			err = err2
		}

		// unregister all watches created on this wlink
		// XXX locking
		for _, w := range wlink.byfile {
			delete(w.file.watches, w)
		}
		wlink.byfile = nil

		// report the error to the peer if it was a logical error on client side,
		// then close .sk.tx to wake up rx on client side
		if err != nil {
			_ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err))
		}

		// close .sk.tx : this wakes up rx on client side.
		err2 = wlink.sk.CloseWrite()
		if err == nil {
			err = err2
		}
	}()

	// close .sk.rx on error/wcfs stopping or return: this wakes up read(sk).
	retq := make(chan struct{})
	defer close(retq)
	wg.Go(func() error {
		// monitor is always canceled - either at parent ctx cancel, or
		// upon return from serve (see "cancel all handlers ..." ^^^).
		// If it was return - report returned error to wg.Wait, not "canceled".
		<-ctx.Done()
		e := ctx.Err()
		select {
		default:
		case <-retq:
			e = err // returned error
		}

		e2 := wlink.sk.CloseRead()
		if e == nil {
			e = e2
		}
		return e
	})

	// allow for only 1 watch request at a time
	// TODO allow simultaneous watch requests for different files
	var handlingWatch int32

	for {
		l, err := r.ReadString('\n')	// XXX limit accepted line len to prevent DOS
		if err != nil {
			// r.Read is woken up by sk.CloseRead when serve decides to exit
			if err == io.ErrClosedPipe {
				err = nil
			}
			return err
		}

		fmt.Printf("S: wlink %d: rx: %q\n", wlink.id, l)

		stream, msg, err := parseWatchFrame(l)
		if err != nil {
			return err
		}

		// reply from client to wcfs
		reply := (stream % 2 == 0)
		if reply {
			wlink.rxMu.Lock()
			rxq := wlink.rxTab[stream]
			delete(wlink.rxTab, stream)
			wlink.rxMu.Unlock()

			if rxq == nil {
				return fmt.Errorf("%d: reply on unexpected stream", stream)
			}
			rxq <- msg
			continue
		}

		// client-initiated request

		// bye		TODO document in "Invalidation protocol"
		if msg == "bye" {
			return nil // deferred sk.Close will wake-up rx on client side
		}

		// watch ...
		if atomic.LoadInt32(&handlingWatch) != 0 {
			return fmt.Errorf("%d: another watch request is already in progress", stream)
		}
		atomic.StoreInt32(&handlingWatch, 1)
		wg.Go(func() error {
			defer atomic.StoreInt32(&handlingWatch, 0)
			return wlink.handleWatch(ctx, stream, msg)
		})
	}
}

// handleWatch handles watch request from client.
//
// returned error comes without full error prefix.
func (wlink *WatchLink) handleWatch(ctx context.Context, stream uint64, msg string) (err error) {
	defer xerr.Contextf(&err, "%d", stream)

	err = wlink._handleWatch(ctx, msg)
	reply := "ok"
	if err != nil {
		// logical error is reported back to client, but watch link remains live
		reply = fmt.Sprintf("error %s", err)
		err = nil
	}

	err = wlink.send(ctx, stream, reply)
	return err
}

// _handleWatch parses a watch request and sets up the corresponding watch.
func (wlink *WatchLink) _handleWatch(ctx context.Context, msg string) error {
	foid, at, err := parseWatch(msg)
	if err != nil {
		return err
	}

	err = wlink.setupWatch(ctx, foid, at)
	return err
}

// sendReq sends wcfs-originated request to client and returns client response.
func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
	// XXX err ctx
	var stream uint64
	for stream == 0 {
		stream = atomic.AddUint64(&wlink.reqNext, +2)
	}

	rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client)
	wlink.rxMu.Lock()
	wlink.rxTab[stream] = rxq	// XXX assert .stream is not there?
	wlink.rxMu.Unlock()

	err = wlink.send(ctx, stream, req)
	if err != nil {
		return "", err
	}

	select {
	case <-ctx.Done():
		// XXX del rxTab[stream] ?
		return "", ctx.Err()

	case reply = <-rxq:
		return reply, nil
	}
}

// send sends a message to client over specified stream ID.
//
// Multiple send calls can run simultaneously; send serializes writes.
func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) error {
	// XXX err ctx
	// XXX assert '\n' not in msg

	wlink.txMu.Lock()
	defer wlink.txMu.Unlock()

	// XXX timeout write on ctx cancel
	pkt := []byte(fmt.Sprintf("%d %s\n", stream, msg))
	fmt.Printf("S: wlink %d: tx: %q\n", wlink.id, pkt)
	_, err := wlink.sk.Write(pkt)
	if err != nil {
		return err
	}

	return nil
}


// ---- Lookup ----

// /(head|<rev>)/bigfile/ -> Lookup receives client request to create /(head|<rev>)/bigfile/<bigfileX>.
func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
	f, err := bfdir.lookup(out, name, fctx)
	var inode *nodefs.Inode
	if f != nil {
		inode = f.Inode()
	}
	return inode, err2LogStatus(err)

}

func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) (f *BigFile, err error) {
	defer xerr.Contextf(&err, "%s: lookup %q", bfdir.path(), name)

	oid, err := zodb.ParseOid(name)
	if err != nil {
		return nil, eINVALf("not oid")
	}

	bfdir.head.zheadMu.RLock()
	defer bfdir.head.zheadMu.RUnlock()

	defer func() {
		if f != nil {
			f.getattr(out)
		}
	}()

	// check to see if file(oid) is already there
	bfdir.fileMu.Lock()
	f, already := bfdir.fileTab[oid]
	bfdir.fileMu.Unlock()

	if already {
		return f, nil
	}

	// not there - without bfdir lock proceed to open BigFile from ZODB
	f, err = bfdir.head.bigopen(fctx, oid)
	if err != nil {
		return nil, err
	}

	// relock bfdir and either register f or, if the file was maybe
	// simultaneously created while we were not holding bfdir.fileMu, return that.
	bfdir.fileMu.Lock()
	f2, already := bfdir.fileTab[oid]
	if already {
		bfdir.fileMu.Unlock()
		f.Close()
		return f2, nil
	}

	bfdir.fileTab[oid] = f
	bfdir.fileMu.Unlock()

	// mkfile takes filesystem treeLock - do it outside bfdir.fileMu
	mkfile(bfdir, name, f)

	return f, nil
}

// / -> Lookup receives client request to create @<rev>/.
func (root *Root) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
	revd, err := root.lookup(name, fctx)
	var inode *nodefs.Inode
	if revd != nil {
		inode = revd.Inode()
		_ = revd.GetAttr(out, nil, fctx) // always ok
	}
	return inode, err2LogStatus(err)
}

func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
	defer xerr.Contextf(&err, "/: lookup %q", name)

	var rev zodb.Tid
	ok := false

	if strings.HasPrefix(name, "@") {
		rev, err = zodb.ParseTid(name[1:])
		ok = (err == nil)
	}
	if !ok {
		return nil, eINVALf("not @rev")
	}

	// check to see if dir(rev) is already there
	root.revMu.Lock()
	revDir, already := root.revTab[rev]
	root.revMu.Unlock()

	if already {
		return revDir, nil
	}

	// not there - without revMu lock proceed to open @rev view of ZODB
//	zconnRev, err := root.zopenAt(fctx, rev)
	zconnRev, err := zopen(fctx, root.zdb, &zodb.ConnOptions{At: rev})
	if err != nil {
		return nil, err
	}

	// relock root and either register new revX/ directory or, if the
	// directory was maybe simultaneously created while we were not holding
	// revMu, return that.
	root.revMu.Lock()
	revDir, already = root.revTab[rev]
	if already {
		root.revMu.Unlock()
//		zconnRev.Release()
		transaction.Current(zconnRev.txnCtx).Abort()
		return revDir, nil
	}

	// XXX -> newHead()
	revDir = &Head{
		fsNode: newFSNode(&fsOptions{Sticky: false}),	// XXX + Head.OnForget() -> del root.revTab[]
		rev:    rev,
		zconn:  zconnRev,
	}

	bfdir := &BigFileDir{
		fsNode:  newFSNode(&fsOptions{Sticky: false}),	// XXX + BigFileDir.OnForget()
		head:    revDir,
		fileTab: make(map[zodb.Oid]*BigFile),
		δFtail:  nil, // δFtail not needed/used for @revX/
	}
	revDir.bfdir = bfdir

	root.revTab[rev] = revDir
	root.revMu.Unlock()

	// mkdir takes filesystem treeLock - do it outside revMu.
	mkdir(root, name, revDir)
	mkdir(revDir, "bigfile", bfdir)
	// XXX + "at"

	return revDir, nil
}


// bigopen opens BigFile corresponding to oid on head.zconn.
//
// A ZBigFile corresponding to oid is activated and statted.
//
// head.zconn must be locked.
func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err error) {
	zconn := head.zconn
	defer xerr.Contextf(&err, "bigopen %s @%s", oid, zconn.At())

	// XXX better ctx = transaction.PutIntoContext(ctx, txn)
	ctx, cancel := xcontext.Merge(ctx, zconn.txnCtx)
	defer cancel()

	xzfile, err := zconn.Get(ctx, oid)
	if err != nil {
		switch errors.Cause(err).(type) {
		case *zodb.NoObjectError:
			return nil, eINVAL(err)
		case *zodb.NoDataError:
			return nil, eINVAL(err) // XXX what to do if it was existing and got deleted?
		default:
			return nil, err
		}
	}

	zfile, ok := xzfile.(*ZBigFile)
	if !ok {
		return nil, eINVALf("%s is not a ZBigFile", typeOf(xzfile))
	}

	// extract blksize, size and initial approximation for file revision
	err = zfile.PActivate(ctx)
	if err != nil {
		return nil, err
	}
	blksize := zfile.blksize
	// XXX it should be revision of both ZBigFile and its data. But we
	// cannot get data revision without expensive scan of all ZBigFile's objects.
	// -> approximate mtime initially with ZBigFile object mtime.
	//
	// XXX for @rev/... we can know initial mtime more exactly?
	rev     := zfile.PSerial()
	zfile.PDeactivate()

	size, sizePath, err := zfile.Size(ctx)
	if err != nil {
		return nil, err
	}

	f := &BigFile{
		fsNode:  newFSNode(&fsOptions{Sticky: false}),	// XXX + BigFile.OnForget -> del .head.bfdir.fileTab[]
		head:    head,
		zfile:   zfile,
		blksize: blksize,
		size:    size,
		rev:     rev,
		loading: make(map[int64]*blkLoadState),
	}

	// only head/ needs δFtail, f.δtail and watches.
	if head.rev == 0 {
		head.bfdir.δFmu.Lock()	// XXX locking ok?
		head.bfdir.δFtail.Track(f, -1, sizePath, nil)
		head.bfdir.δFmu.Unlock()

		// FIXME: scan zfile.blktab - so that we can detect all btree changes
		// see  "XXX building δFtail lazily ..." in notes.txt

		f.watches = make(map[*Watch]struct{})
	}

	return f, nil
}

// Close releases all resources of BigFile.
func (f *BigFile) Close() error {
	// XXX locking?
	f.zfile = nil

//	f.zconn.Release()
//	f.zconn = nil
	f.head = nil

	return nil
}

// ---- misc ----

// /(head|<rev>)/at -> readAt serves read.
func (h *Head) readAt() []byte {
	h.zheadMu.RLock()
	defer h.zheadMu.RUnlock()

	return []byte(h.zconn.At().String())
}

// /(head|<rev>)/ -> Getattr serves stat.
func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
	at := head.rev
	if at == 0 {
		head.zheadMu.RLock()
		at = head.zconn.At()
		head.zheadMu.RUnlock()
	}
	t := at.Time().Time

	out.Mode = fuse.S_IFDIR | 0555
	out.SetTimes(/*atime=*/nil, /*mtime=*/&t, /*ctime=*/&t)
	return fuse.OK
}

// /(head|<rev>)/bigfile/<bigfileX> -> Getattr serves stat.
func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
	f.head.zheadMu.RLock()
	defer f.head.zheadMu.RUnlock()

	f.getattr(out)
	return fuse.OK
}

// getattr fills out with attributes of the file.
//
// it is called with f.head.zheadMu locked (see GetAttr and BigFileDir.lookup).
func (f *BigFile) getattr(out *fuse.Attr) {
	out.Mode = fuse.S_IFREG | 0444
	out.Size = uint64(f.size)
	// .Blocks
	// .Blksize

	mtime := f.rev.Time().Time
	out.SetTimes(/*atime=*/nil, /*mtime=*/&mtime, /*ctime=*/&mtime)
}




// FIXME groot/gfsconn is tmp workaround for lack of way to retrieve FileSystemConnector from nodefs.Inode
// TODO:
//	- Inode += .Mount() -> nodefs.Mount
//	- Mount:
//		.Root()		-> root Inode of the fs
//		.Connector()	-> FileSystemConnector through which fs is mounted
var groot   *Root
var gfsconn *nodefs.FileSystemConnector

// root of the filesystem is mounted here.
//
// we need to talk to kernel and look up @<rev>/bigfile/<fid> before uploading
// data to kernel cache there. Referencing root of the filesystem via path is
// vulnerable to bugs wrt e.g. `mount --move` and/or mounting something else
// over wcfs. However keeping an opened root fd would prevent wcfs from being
// unmounted, so we still have to reference the root via path.
var gmntpt string

// debugging
var gdebug = struct {
	// .wcfs/zhead opens
	// protected by groot.head.zheadMu
	zheadSockTab map[*FileSock]struct{}
}{}

func init() {
	gdebug.zheadSockTab = make(map[*FileSock]struct{})
}

// _wcfs_Zhead serves .wcfs/zhead opens.
type _wcfs_Zhead struct {
	fsNode
}

func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
	// XXX check flags?
	sk := NewFileSock()
	sk.CloseRead()

	groot.head.zheadMu.Lock()
	defer groot.head.zheadMu.Unlock()

	// XXX del zheadSockTab[sk] on sk.File.Release (= client drops opened handle)
	gdebug.zheadSockTab[sk] = struct{}{}
	return sk.File(), fuse.OK
}

// TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2154
func main() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2155
	stdlog.SetPrefix("wcfs: ")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2156
	//log.CopyStandardLogTo("WARNING") // XXX -> "DEBUG" if -d ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2157
	defer log.Flush()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2158

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2159
	debug := flag.Bool("d", false, "debug")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2160
	autoexit := flag.Bool("autoexit", false, "automatically stop service when there is no client activity")
	// XXX option to prevent starting if wcfs was already started ?

	flag.Parse()
	if len(flag.Args()) != 2 {
		log.Fatalf("Usage: %s [OPTIONS] zurl mntpt", os.Args[0])
	}
	zurl := flag.Args()[0]
	mntpt := flag.Args()[1]

	// debug -> precise t, no dates	(XXX -> always precise t?)
	if *debug {
		stdlog.SetFlags(stdlog.Lmicroseconds)
	}

	// open zodb storage/db/connection
	ctx := context.Background()	// XXX + timeout?
	zstor, err := zodb.Open(ctx, zurl, &zodb.OpenOptions{
		ReadOnly: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer zstor.Close()

	zdb := zodb.NewDB(zstor)
	defer zdb.Close()	// XXX err
	zhead, err := zopen(ctx, zdb, &zodb.ConnOptions{
		// we need zhead.cache to be maintained across several transactions.
		// see "3) for head/bigfile/* the following invariant is maintained ..."
		NoPool: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	zhead.Cache().Lock()
	zhead.Cache().SetControl(&zodbCacheControl{})
	zhead.Cache().Unlock()

	// mount root + head/
	// XXX -> newHead()
	head := &Head{
		fsNode:   newFSNode(fSticky),
		rev:      0,
		zconn:    zhead,
		wlinkTab: make(map[*WatchLink]struct{}),
		hwait:    make(map[hwaiter]struct{}),
	}

	wnode := &WatchNode{
		fsNode: newFSNode(fSticky),
		head:   head,
	}

	bfdir := &BigFileDir{
		fsNode:   newFSNode(fSticky),
		head:     head,
		fileTab:  make(map[zodb.Oid]*BigFile),
		δFtail:   NewΔFtail(zhead.At()),
	}
	head.bfdir = bfdir

	root := &Root{
		fsNode: newFSNode(fSticky),
		zstor:  zstor,
		zdb:    zdb,
		head:   head,
		revTab: make(map[zodb.Tid]*Head),
	}

	opts := &fuse.MountOptions{
		FsName: zurl,
		Name:   "wcfs",

		// We retrieve kernel cache in ZBlk.blksize chunks, which are 2MB in size.
		// XXX currently go-fuse caps MaxWrite to 128KB.
		// TODO -> teach go-fuse to handle Init.MaxPages (Linux 4.20+).
		MaxWrite:      2*1024*1024,

		// XXX tune MaxReadAhead? MaxBackground?

		// OS cache that we populate with bigfile data is precious;
		// we explicitly propagate ZODB invalidations into file invalidations.
		ExplicitDataCacheControl: true,

		DisableXAttrs: true,        // we don't use xattrs
		Debug:         *debug,
	}

	fssrv, fsconn, err := mount(mntpt, root, opts)
	if err != nil {
		log.Fatal(err)
	}
	groot   = root		// FIXME temp workaround (see ^^^)
	gfsconn = fsconn	// FIXME ----//----
	gmntpt  = mntpt

	// we require proper pagecache control (added to Linux 2.6.36 in 2010)
	kinit := fssrv.KernelSettings()
	kfuse := fmt.Sprintf("kernel FUSE (API %d.%d)", kinit.Major, kinit.Minor)
	supports := kinit.SupportsNotify
	if !(supports(fuse.NOTIFY_STORE_CACHE) && supports(fuse.NOTIFY_RETRIEVE_CACHE)) {
		log.Fatalf("%s does not support pagecache control", kfuse)
	}
	// make a bold warning if kernel does not support explicit cache invalidation
	// (patch sent upstream; see notes.txt -> "Notes on OS pagecache control")
	if kinit.Flags & fuse.CAP_EXPLICIT_INVAL_DATA == 0 {
		w1 := fmt.Sprintf("%s does not support explicit data cache invalidation", kfuse)
		w2 := "-> performance will be AWFUL."
		w3 := "-> you need kernel which includes git.kernel.org/linus/ad2ba64dd489 ."
		log.Error(w1); log.Error(w2); log.Error(w3)
		fmt.Fprintf(os.Stderr, "W: wcfs: %s\nW: wcfs: %s\nW: wcfs: %s\n", w1, w2, w3)
	}

	// add entries to /
	mkdir(root, "head", head)
	mkdir(head, "bigfile", bfdir)
	mkfile(head, "at", NewSmallFile(head.readAt))   // TODO mtime(at) = tidtime(at)
	mkfile(head, "watch", wnode)

	// for debugging/testing
	_wcfs := newFSNode(fSticky)
	mkdir(root, ".wcfs", &_wcfs)
	mkfile(&_wcfs, "zurl", NewStaticFile([]byte(zurl)))

	// .wcfs/zhead - special file channel that sends zhead.at.
	//
	// If a user opens it, it will start receiving the tids that zhead.at
	// goes through, starting from the time when .wcfs/zhead was opened.
	// There can be multiple openers. Once opened, the file must be read,
	// as wcfs blocks waiting for data to be read when processing
	// invalidations.
	mkfile(&_wcfs, "zhead", &_wcfs_Zhead{
		fsNode: newFSNode(fSticky),
	})

	// XXX place = ok?
	// XXX ctx = ok?
	// XXX wait for zwatcher shutdown.
	go root.zwatcher(ctx)

	// TODO handle autoexit
	// (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl
	//  opened, so when all inodes have been forgotten - we know all wcfs.py clients exited)
	_ = autoexit

	// serve client requests.
	//
	// use `go serve` + `waitMount` not just `serve` - because waitMount
	// takes care of disabling the OS calling poll on us.
	// ( if we don't disable polling - fs serving can get stuck - see
	//   https://github.com/hanwen/go-fuse/commit/4f10e248eb for details )
	done := make(chan struct{})
	go func () {
		fssrv.Serve()
		close(done)
	}()
	err = fssrv.WaitMount()
	if err != nil {
		log.Fatal(err) // XXX err ctx?
	}
	<-done
}