wcfs.go 59.9 KB
// Copyright (C) 2018-2019  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// Program wcfs provides filesystem server with file data backed by wendelin.core arrays.
//
// Intro
//
// Each wendelin.core array (ZBigArray) is actually a linear file (ZBigFile)
// and array metadata like dtype, shape and strides associated with it. This
// program exposes as files only ZBigFile data and leaves rest of
// array-specific handling to clients. Every ZBigFile is exposed as one separate
// file that represents whole ZBigFile's data.
//
// For a client, the primary way to access a bigfile should be to mmap
// head/bigfile/<bigfileX> which represents always latest bigfile data.
// Clients that need an isolation guarantee should subscribe to invalidations
// and re-mmap invalidated regions to a file with pinned bigfile revision for
// the duration of their transaction. See "Invalidation protocol" for details.
//
// In the usual situation when bigfiles are big, and there are O(1)/δt updates,
// there should be no need for any cache besides shared kernel cache of latest
// bigfile data.
//
//
// Filesystem organization
//
// Top-level structure of provided filesystem is as follows:
//
//	head/			; latest database view
//		...
//	@<rev1>/		; database view as of revision <revX>
//		...
//	@<rev2>/
//		...
//	...
//
// where head/ represents latest data as stored in upstream ZODB, and
// @<revX>/ represents data as of database revision <revX>.
//
// head/ has the following structure:
//
//	head/
//		at			; data inside head/ is as of this ZODB transaction
//		watch			; channel for bigfile invalidations
//		bigfile/		; bigfiles' data
//			<oid(ZBigFile1)>
//			<oid(ZBigFile2)>
//			...
//
// where /bigfile/<bigfileX> represents latest bigfile data as stored in
// upstream ZODB. As there can be some lag receiving updates from the database,
// /at describes precisely ZODB state for which bigfile data is currently
// exposed. Whenever bigfile data is changed in upstream ZODB, information
// about the changes is first propagated to /watch, and only after that
// /bigfile/<bigfileX> is updated. See "Invalidation protocol" for details.
//
// @<revX>/ has the following structure:
//
//	@<revX>/
//		at
//		bigfile/		; bigfiles' data as of revision <revX>
//			<oid(ZBigFile1)>
//			<oid(ZBigFile2)>
//			...
//
// where /bigfile/<bigfileX> represents bigfile data as of revision <revX>.
//
// Unless accessed, {head,@<revX>}/bigfile/<bigfileX> are not automatically
// visible in wcfs filesystem. Similarly, @<revX>/ becomes visible only after
// access.
//
//
// Invalidation protocol
//
// In order to support isolation, wcfs implements invalidation protocol that
// must be cooperatively followed by both wcfs and client.
//
// First, client mmaps latest bigfile, but does not access it
//
//	mmap(head/bigfile/<bigfileX>)
//
// Then client opens head/watch and tells wcfs through it for which ZODB state
// it wants to get bigfile's view.
//
//	C: 1 watch <bigfileX> @<at>
//
// The server then, after potentially sending initial pin messages (see below),
// reports either success or failure:
//
//	S: 1 ok
//	S: 1 error ...		; if <at> is too far away back from head/at
//
// The server sends "ok" reply only after head/at is ≥ requested <at>, and
// only after all initial pin messages are fully acknowledged by the client.
// The client can start to use mmapped data after it gets "ok".
// The server sends "error" reply if requested <at> is too far away back from
// head/at.
//
// Upon watch request, either initially or after sending "ok", the server
// notifies the client about the file blocks that the client needs to pin in
// order to observe file's data as of <at> revision:
//
// The filesystem server itself receives information about changed data from
// ZODB server through regular ZODB invalidation channel (as it is ZODB client
// itself). Then, separately for each changed file block, before actually
// updating head/bigfile/<bigfileX> content, it notifies through head/watch to
// clients, that had requested it (separately to each client), about the
// changes:
//
//	S: 2 pin <bigfileX> #<blk> @<rev_max>	XXX 2-> 2*k (multiple pins in parallel)
//
// and waits until all clients confirm that changed file block can be updated
// in global OS cache.
//
// The client in turn should now re-mmap the block requested to be pinned to bigfile@<rev_max>
//
//	# mmapped at address corresponding to #blk
//	mmap(@<rev_max>/bigfile/<bigfileX>, #blk, MAP_FIXED)
//
// and must send ack back to the server when it is done:
//
//	C: 2 ack
//
// The server sends pin notifications only for file blocks, that are known to
// be potentially changed after client's <at>, and <rev_max> describes the
// upper bound for the block revision as of <at> database view:
//
//	<rev_max> ≤ <at>
//
// The server maintains short history tail of file changes to be able to
// support openings with <at> being slightly in the past compared to current
// head/at. The server might reject a watch request if <at> is too far away in
// the past from head/at. The client is advised to restart its transaction with
// a more up-to-date database view if it gets a watch setup error.
//
// A later request from the client for the same <bigfileX> but with different
// <at>, overrides previous watch request for that file. A client can use "-"
// instead of "@<at>" to stop watching a file.
//
// A single client can send several watch requests through single head/watch
// open, as well as it can use several head/watch opens simultaneously.
// The server sends pin notifications for all files requested to be watched via
// every head/watch open.
//
// Note: a client could use a single watch to manage its several views for the same
// file but with different <at>. This could be achieved via watching with
// @<at_min>, and then deciding internally which views need to be adjusted and
// which views need not. Wcfs does not oblige clients to do so though, and a
// client is free to use as many head/watch openings as it needs to.
//
// When clients are done with @<revX>/bigfile/<bigfileX> (i.e. client's
// transaction ends and array is unmapped), the server sees number of opened
// files to @<revX>/bigfile/<bigfileX> drops to zero, and automatically
// destroys @<revX>/bigfile/<bigfileX> after reasonable timeout.
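//
// The client side of the pin/ack exchange described in this section can be
// sketched as follows. This is a minimal illustration and not wcfs code: it
// assumes that the "S:"/"C:" prefixes only mark direction and that each
// message travels as one whitespace-separated line. pinMsg, parsePin and
// ackFor are hypothetical names, and <bigfileX> and <rev_max> are kept as
// opaque text because their exact encoding is not described here.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// pinMsg is a hypothetical client-side form of
//
//	<stream> pin <bigfileX> #<blk> @<rev_max>
type pinMsg struct {
	Stream uint64 // stream id; the ack must carry the same id
	File   string // <bigfileX>, kept as opaque text
	Blk    int64  // block number taken from "#<blk>"
	Rev    string // <rev_max>, kept as opaque text
}

// parsePin parses one pin line. It rejects anything that is not a pin message.
func parsePin(line string) (pinMsg, error) {
	f := strings.Fields(line)
	if len(f) != 5 || f[1] != "pin" {
		return pinMsg{}, fmt.Errorf("not a pin message: %q", line)
	}
	stream, err := strconv.ParseUint(f[0], 10, 64)
	if err != nil {
		return pinMsg{}, fmt.Errorf("bad stream id %q: %v", f[0], err)
	}
	if !strings.HasPrefix(f[3], "#") || !strings.HasPrefix(f[4], "@") {
		return pinMsg{}, fmt.Errorf("bad block or revision in %q", line)
	}
	blk, err := strconv.ParseInt(f[3][1:], 10, 64)
	if err != nil {
		return pinMsg{}, fmt.Errorf("bad block number %q: %v", f[3], err)
	}
	return pinMsg{Stream: stream, File: f[2], Blk: blk, Rev: f[4][1:]}, nil
}

// ackFor builds the reply the client sends once the block has been re-mmapped.
func ackFor(m pinMsg) string {
	return fmt.Sprintf("%d ack", m.Stream)
}

func main() {
	m, err := parsePin("2 pin f1 #5 @r7")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// A real client would re-mmap block m.Blk from @<m.Rev>/bigfile/<m.File>
	// with MAP_FIXED here, and only then send the ack.
	fmt.Println(ackFor(m))
}
```

// The ack is deliberately built from the parsed stream id, so that several
// pins in flight on one head/watch open cannot be confused with each other.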
//
//
// Protection against slow or faulty clients
//
// If a client, on purpose or due to a bug or being stopped, is slow to respond
// with ack to file invalidation notification, it creates a problem because the
// server will become blocked waiting for pin acknowledgments, and thus all
// other clients, that try to work with the same file, will get stuck.
//
// The problem could be avoided if wcfs resided inside the OS kernel and this
// way were able to manipulate clients' address space directly (then
// invalidation protocol won't be needed). It is also possible to imagine
// mechanism, where wcfs would synchronously change clients' address space via
// injecting trusted code and running it on client side via ptrace to adjust
// file mappings.
//
// However ptrace does not work when client thread is blocked under pagefault,
// and that is exactly what wcfs would need to do to process invalidations
// lazily, because eager invalidation processing results in prohibitively slow
// file opens. See internal wcfs overview for details about why ptrace
// cannot be used and why lazy invalidation processing is required.
//
// Lacking OS primitives to change address space of another process and not
// being able to work around it with ptrace in userspace, wcfs takes the approach
// of killing a slow client after a 30-second timeout by default.
//
//
// Writes
//
// As each bigfile is represented by 1 synthetic file, there can be several
// write schemes:
//
// 1. mmap(MAP_PRIVATE) + writeout by client
//
// In this scheme bigfile data is mmapped in MAP_PRIVATE mode, so that local
// user changes are not automatically propagated back to the file. When there
// is a need to commit, client investigates via some OS mechanism, e.g.
// /proc/self/pagemap or something similar, which pages of this mapping it
// modified. Knowing this it knows which data it dirtied and so can write this
// data back to ZODB itself, without filesystem server providing write support.
//
// 2. mmap(MAP_SHARED, PROT_READ) + write-tracking & writeout by client
//
// In this scheme bigfile data is mmapped in MAP_SHARED mode with read-only pages
// protection. Then whenever write fault occurs, client allocates RAM from
// shmfs, copies faulted page to it, and then mmaps RAM page with RW protection
// in place of original bigfile page. Writeout implementation should be similar
// to "1", only here client already knows the pages it dirtied, and this way
// there is no need to consult /proc/self/pagemap.
//
// The advantage of this scheme over mmap(MAP_PRIVATE) is that in case
// there are several in-process mappings of the same bigfile with overlapping
// in-file ranges, changes in one mapping will be visible in another mapping.
// In contrast, whenever a MAP_PRIVATE mapping is modified, the kernel COWs
// faulted page into a page completely private to this mapping, so that other
// MAP_PRIVATE mappings of this file, including ones created from the same
// process, do not see changes made to the first mapping.
//
// Since wendelin.core needs to provide coherency in between different slices
// of the same array, this is the mode wendelin.core actually uses.
//
// 3. write to wcfs
//
// TODO we later could implement "write-directly" mode where clients would write
// data directly into the file.
package main

// Wcfs organization
//
// Wcfs is a ZODB client that translates ZODB objects into OS files as would
// non-wcfs wendelin.core do for a ZBigFile. Contrary to non-wcfs wendelin.core,
// it keeps bigfile data in shared OS cache efficiently. It is organized as follows:
//
// 1) 1 ZODB connection for "latest data" for whole filesystem (zhead).
// 2) head/bigfile/* of all bigfiles represent state as of zhead.At .
// 3) for head/bigfile/* the following invariant is maintained:
//
//	#blk ∈ OS file cache    =>    ZBlk(#blk) + all BTree/Bucket that lead to it  ∈ zhead cache(%)
//	                              (ZBlk* in ghost state)
//
//	                        =>    all BTree/Bucket that lead to blk are tracked (XXX)
//
//    The invariant helps on invalidation: if we see a changed oid, and
//    zhead.cache.lookup(oid) = ø -> we know we don't have to invalidate OS
//    cache for any part of any file (even if oid relates to a file block - that
//    block is not cached and will trigger ZODB load on file read).
//
//    XXX explain why tracked
//
//    Currently we maintain this invariant by simply never evicting ZBlk/LOBTree/LOBucket
//    objects from ZODB Connection cache. In the future we may want to try to
//    synchronize to kernel freeing its pagecache pages.
//
// 4) when we receive an invalidation message from ZODB - we process it and
//    propagate invalidations to OS file cache of head/bigfile/*:
//
//	invalidation message: (tid↑, []oid)
//
//    4.1) zhead.cache.lookup(oid)
//    4.2) ø: nothing to do - see invariant ^^^.
//    4.3) obj found:
//
//	- ZBlk*		-> [] of file/[]#blk
//	- BTree/Bucket	-> δ(BTree)  -> file/[]#blk
//
//	in the end after processing all []oid from invalidation message we have
//
//	  [] of file/[]#blk
//
//	that describes which parts of which file(s) need to be invalidated.
//
//	FIXME no - we can build it but not in full - since we consider only zobj in live cache.
//	FIXME and even if we consider all δ'ed zobj, building complete set of
//	      file.δtail requires to first do complete scan of file.blktab
//	      which is prohibitively expensive.
//
//    4.4) for all file/blk to invalidate we do:
//
//	- try to retrieve head/bigfile/file[blk] from OS file cache(*);
//	- if retrieved successfully -> store retrieved data back into OS file
//	  cache for @<rev>/bigfile/file[blk], where
//
//	    # see below about file.δtail
//	    # XXX -> file.δtail.LastBlkRev(#blk, zhead.at)
//	    rev = max(file.δtail.by(#blk)) || min(rev ∈ file.δtail) || zhead.at
//
//	- invalidate head/bigfile/file[blk] in OS file cache.
//
//	This preserves previous data in OS file cache in case it will be needed
//	by not-yet-uptodate clients, and makes sure file read of head/bigfile/file[blk]
//	won't be served from OS file cache and instead will trigger a FUSE read
//	request to wcfs.
//
//    4.5) no invalidation messages are sent to wcfs clients at this point(+).
//
//    4.6) processing ZODB invalidations and serving file reads (see 7) are
//      organized to be mutually exclusive.
//
//	(TODO head.zconnMu -> special mutex with Lock(ctx) so that Lock could be canceled)
//
// 5) after OS file cache was invalidated, we resync zhead to new database
//    view corresponding to tid.
//
// 6) for every file δtail invalidation info about head/data is maintained:
//
//	- tailv: [](rev↑, []#blk)
//	- by:    {} #blk -> []rev↑ in tail
//
//    δtail.tailv describes invalidations to file we learned from ZODB invalidation.
//    δtail.by    allows quick lookup of this information by #blk.
//
//    min(rev) in δtail is min(@at) at which head/bigfile/file is currently watched (see below).
//
//    XXX δtail can miss ...
//
//    to support initial openings with @at being slightly in the past, we also
//    make sure that min(rev) is enough to cover last 10 minutes of history
//    from head/at.
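//
//    The tailv/by pair from 6) and the revision choice from 4.4) can be
//    sketched as below. This is only an illustration under stated
//    assumptions: revisions are modelled as plain integers (tid), and
//    deltaTail, Append and lastBlkRev are hypothetical names, not the
//    types wcfs itself uses.

```go
package main

import "fmt"

// tid stands in for a ZODB transaction id; a plain integer is an assumption.
type tid uint64

// deltaRev is one tailv entry: (rev, []#blk).
type deltaRev struct {
	Rev  tid
	Blkv []int64
}

// deltaTail is a hypothetical in-memory form of file.δtail.
type deltaTail struct {
	tailv []deltaRev      // ordered by increasing Rev
	by    map[int64][]tid // #blk -> revisions in tail, increasing
}

func newDeltaTail() *deltaTail {
	return &deltaTail{by: map[int64][]tid{}}
}

// Append records that blocks blkv changed at revision rev.
// The caller must pass revisions in increasing order.
func (t *deltaTail) Append(rev tid, blkv []int64) {
	t.tailv = append(t.tailv, deltaRev{Rev: rev, Blkv: blkv})
	for _, blk := range blkv {
		t.by[blk] = append(t.by[blk], rev)
	}
}

// lastBlkRev follows the rule from 4.4):
//
//	rev = max(δtail.by(#blk)) || min(rev ∈ δtail) || headAt
func (t *deltaTail) lastBlkRev(blk int64, headAt tid) tid {
	if revs := t.by[blk]; len(revs) > 0 {
		return revs[len(revs)-1] // block is covered by the tail
	}
	if len(t.tailv) > 0 {
		return t.tailv[0].Rev // block is older than everything in the tail
	}
	return headAt // the tail is empty
}

func main() {
	t := newDeltaTail()
	t.Append(10, []int64{1, 2})
	t.Append(20, []int64{2})
	for _, blk := range []int64{1, 2, 7} {
		fmt.Printf("blk %d -> rev %d\n", blk, t.lastBlkRev(blk, 30))
	}
}
```

//    Keeping by alongside tailv trades some memory for a lookup that does
//    not need to scan the whole tail on every invalidated block.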
//
// 7) when we receive a FUSE read(#blk) request to a head/bigfile/file, we process it as follows:
//
//   7.1) load blkdata for head/bigfile/file[blk] @zhead.at .
//
//	while loading this also gives upper bound estimate of when the block
//	was last changed:	XXX kill upper bound -> populate and use δFtail
//
//	  rev(blk) ≤ max(_.serial for _ in (ZBlk(#blk), all BTree/Bucket that lead to ZBlk))
//
//	it is not exact because BTree/Bucket can change (e.g. rebalance)
//	but still point to the same k->ZBlk.
//
//	we also use file.δtail to find either exact blk revision:	XXX just use δFtail
//
//	  rev(blk) = max(file.δtail.by(#blk) -> []rev↑)
//
//	or another upper bound if #blk ∉ δtail:
//
//	  rev(blk) ≤ min(rev ∈ δtail)		; #blk ∉ δtail
//
//
//	below rev'(blk) is min(of the estimates found):
//
//	  rev(blk) ≤ rev'(blk)		rev'(blk) = min(^^^)
//
//
//   7.2) for all registered client@at watchers of head/bigfile/file:
//
//	- rev'(blk) ≤ at: -> do nothing
//	- rev'(blk) > at:
//	  - if blk ∈ watcher.pinned -> do nothing
//	  - rev = max(δtail.by(#blk) : _ ≤ at)	|| min(rev ∈ δtail : rev ≤ at)	|| at
//	  - watcher.pin(file, #blk, @rev)
//	  - watcher.pinned += blk
//
//	where
//
//	  watcher.pin(file, #blk, @rev)
//
//	sends pin message according to "Invalidation protocol", and is assumed
//	to cause
//
//	  remmap(file, #blk, @rev/bigfile/file)
//
//	on client.
//
//	( one could imagine adjusting mappings synchronously via running
//	  wcfs-trusted code via ptrace that wcfs injects into clients, but ptrace
//	  won't work when client thread is blocked under pagefault or syscall(^) )
//
//	in order to support watching for each head/bigfile/file
//
//	  [] of watch{client@at↑, pinned}
//
//	is maintained.
//
//   7.3) blkdata is returned to kernel.
//
//   Thus a client that wants latest data on pagefault will get latest data,
//   and a client that wants @rev data will get @rev data, even if it was this
//   "old" client that triggered the pagefault(~).
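//
//   Step 7.2) can be sketched as the loop below. It is an illustration
//   only: revisions are modelled as plain integers (tid), the pin revision
//   rule is read literally from 7.2), and watch, pin, pinRev and
//   pinsOnRead are hypothetical names. Instead of sending messages, the
//   sketch returns the pins that would be sent.

```go
package main

import "fmt"

// tid stands in for a ZODB transaction id; a plain integer is an assumption.
type tid uint64

// watch is a hypothetical per-client record: watch{client@at, pinned}.
type watch struct {
	at     tid
	pinned map[int64]bool // set of #blk already pinned for this client
}

// pin describes one pin message that would be sent to a watcher.
type pin struct {
	watcher int
	blk     int64
	rev     tid
}

// pinRev reads the rule from 7.2) literally:
//
//	rev = max(δtail.by(#blk) : _ ≤ at) || min(rev ∈ δtail : rev ≤ at) || at
//
// blkRevs are the tail revisions of this block and tailRevs are all tail
// revisions; both must be in increasing order.
func pinRev(blkRevs, tailRevs []tid, at tid) tid {
	for i := len(blkRevs) - 1; i >= 0; i-- {
		if blkRevs[i] <= at {
			return blkRevs[i]
		}
	}
	if len(tailRevs) > 0 && tailRevs[0] <= at {
		return tailRevs[0]
	}
	return at
}

// pinsOnRead walks all watchers of a file for one block read.
// revEst is rev'(blk), the estimate of when the block was last changed.
func pinsOnRead(watchers []*watch, blk int64, revEst tid, blkRevs, tailRevs []tid) []pin {
	var pins []pin
	for i, w := range watchers {
		if revEst <= w.at {
			continue // client view already includes this block revision
		}
		if w.pinned[blk] {
			continue // client was already told to pin this block
		}
		r := pinRev(blkRevs, tailRevs, w.at)
		w.pinned[blk] = true
		pins = append(pins, pin{watcher: i, blk: blk, rev: r})
	}
	return pins
}

func main() {
	watchers := []*watch{
		{at: 30, pinned: map[int64]bool{}},
		{at: 15, pinned: map[int64]bool{}},
	}
	for _, p := range pinsOnRead(watchers, 3, 20, []tid{10, 20}, []tid{8, 10, 20}) {
		fmt.Printf("watcher %d: pin #%d @%d\n", p.watcher, p.blk, p.rev)
	}
}
```

//   Recording the block in pinned before returning is what keeps a second
//   read of the same block from producing a duplicate pin for that client.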
//
// XXX 8) serving read from @<rev>/data + zconn(s) for historical state
// XXX 9) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout
//
//
// (*) see notes.txt -> "Notes on OS pagecache control"
// (+) see notes.txt -> "Invalidations to wcfs clients are delayed until block access"
// (~) see notes.txt -> "Changing mmapping while under pagefault is possible"
// (^) see notes.txt -> "Client cannot be ptraced while under pagefault"
// (%) no need to keep track of ZData - ZBlk1 is always marked as changed on blk data change.
//
// XXX For every ZODB connection a dedicated read-only transaction is maintained.

// XXX notation
//
// δZ    - change in ZODB space
// δB    - change in BTree*s* space
// δF    - change in File*s* space
// δfile - change in File(1) space

import (
	"bufio"
	"context"
	"flag"
	"fmt"
	stdlog "log"
	"os"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"

	log "github.com/golang/glog"
	"golang.org/x/sync/errgroup"

	"lab.nexedi.com/kirr/go123/xcontext"
	"lab.nexedi.com/kirr/go123/xerr"

	"lab.nexedi.com/kirr/neo/go/transaction"
	"lab.nexedi.com/kirr/neo/go/zodb"
	"lab.nexedi.com/kirr/neo/go/zodb/btree"
	_ "lab.nexedi.com/kirr/neo/go/zodb/wks"

	"github.com/hanwen/go-fuse/fuse"
	"github.com/hanwen/go-fuse/fuse/nodefs"
	"github.com/pkg/errors"
)

// Root represents root of wcfs filesystem.
type Root struct {
	fsNode

	// ZODB storage we work with
	zstor zodb.IStorage

	// ZODB DB handle for zstor.
	// keeps cache of connections for @<rev>/ accesses.
	// only one connection is used for each @<rev>.
	zdb *zodb.DB

	// directory + ZODB connection for head/
	// (zhead is Resync'ed and is kept outside zdb pool)
	head *Head

	// directories + ZODB connections for @<rev>/
	revMu  sync.Mutex
	revTab map[zodb.Tid]*Head
}

// /(head|<rev>)/			- served by Head.
type Head struct {
	fsNode

	rev   zodb.Tid    // 0 for head/, !0 for @<rev>/
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
465 466 467
	bfdir *BigFileDir // bigfile/
	// at    - served by .readAt
	// watch - implicitly linked to by fs
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
468 469

	// ZODB connection for everything under this head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
470 471 472 473 474 475 476 477 478 479 480 481

	// protects access to zconn & live _objects_ associated with it.
	// while it is rlocked zconn is guaranteed to stay viewing database at
	// particular view.
	//
	// zwatcher write-locks this and knows noone is using ZODB objects and
	// noone mutates OS file cache while zwatcher is running.
	//
	// it is also kept rlocked by OS cache uploaders (see BigFile.uploadBlk)
	// with additional locking protocol to avoid deadlocks (see below for
	// pauseOSCacheUpload + ...).
	zconnMu sync.RWMutex
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
482
	zconn   *ZConn       // for head/ zwatcher resyncs head.zconn; others only read zconn objects.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
483

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
484 485 486 487 488 489 490
	// zwatcher signals to uploadBlk to pause/continue uploads to OS cache to avoid deadlocks.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
	pauseOSCacheUpload    bool
	continueOSCacheUpload chan struct{}
	// uploadBlk signals to zwatcher that there are so many inflight OS cache uploads currently.
	inflightOSCacheUploads int32

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
491
	// XXX move zconn's current transaction to Head here?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
492

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
493 494
	// head/watch opens
	// XXX protected by ... head.zconnMu ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
495 496
	// XXX -> watchTab?
	wlinkTab map[*WatchLink]struct{}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
497 498
}

// /(head|<rev>)/bigfile/		- served by BigFileDir.
type BigFileDir struct {
	fsNode
	head *Head // parent head/ or @<rev>/

	// {} oid -> <bigfileX>
	fileMu  sync.Mutex
	fileTab map[zodb.Oid]*BigFile

	// δ of tracked BTree nodes of all BigFiles + -> which file
	// (used only for head/, not revX/)
	δFmu   sync.RWMutex
	δFtail *ΔFTail
}

// /(head|<rev>)/bigfile/<bigfileX>	- served by BigFile.
type BigFile struct {
	fsNode

	// this BigFile is under .head/bigfile/; it views ZODB via .head.zconn
	// parent's BigFileDir.head is the same.
	head	*Head

	// ZBigFile top-level object
	zfile	*ZBigFile

	// things read/computed from .zfile; constant during lifetime of current transaction.
	blksize int64    // zfile.blksize
	size    int64    // zfile.Size()
	rev     zodb.Tid // last revision that modified zfile data

//	// tail change history of this file.
//	δtail *ΔTailI64 // [](rev↑, []#blk)	XXX kill

	// inflight loadings of ZBigFile from ZODB.
	// successful load results are kept here until blkdata is put into OS pagecache.
	//
	// Being a staging area for data to enter OS cache, loading has to be
	// consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache.
	loadMu  sync.Mutex
	loading map[int64]*blkLoadState // #blk -> {... blkdata}

	// watches attached to this file
	// XXX already in "established" state (i.e. initial watch request was answered with "ok")
	// XXX locking -> watchMu?
	watches map[*Watch]struct{}
}

// blkLoadState represents a ZBlk load state/result.
//
// when !ready the loading is in progress.
// when ready the loading has been completed.
type blkLoadState struct {
	ready chan struct{}

	blkdata  []byte
	err      error
}
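
// Illustration (not part of wcfs): a minimal, standalone sketch of how a
// "ready" channel like the one in blkLoadState can let concurrent readers of
// the same block share a single load. The names loadState, loader and fetch
// are hypothetical; the real load path lives elsewhere in this file and may
// differ, for example in when entries are removed from the loading map.

```go
package main

import (
	"fmt"
	"sync"
)

// loadState mirrors the ready/blkdata/err shape of blkLoadState (hypothetical copy).
type loadState struct {
	ready   chan struct{} // closed once loading has completed
	blkdata []byte
	err     error
}

// loader deduplicates concurrent loads of the same block number.
type loader struct {
	mu      sync.Mutex
	loading map[int64]*loadState
	nload   int // how many times fetch was actually invoked
	fetch   func(blk int64) ([]byte, error)
}

// load returns data for blk; only the first caller for a block runs fetch,
// later callers wait on ready and reuse the stored result.
func (l *loader) load(blk int64) ([]byte, error) {
	l.mu.Lock()
	st, already := l.loading[blk]
	if !already {
		st = &loadState{ready: make(chan struct{})}
		l.loading[blk] = st
		l.nload++
	}
	l.mu.Unlock()

	if already {
		<-st.ready // loading is (or was) in progress: wait for completion
		return st.blkdata, st.err
	}

	// we are the first loader: fill in the result, then signal readiness
	st.blkdata, st.err = l.fetch(blk)
	close(st.ready)
	return st.blkdata, st.err
}

func main() {
	l := &loader{
		loading: map[int64]*loadState{},
		fetch:   func(blk int64) ([]byte, error) { return []byte{byte(blk)}, nil },
	}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.load(7)
		}()
	}
	wg.Wait()
	fmt.Println("fetch calls:", l.nload)
}
```

// In this sketch results stay in the map forever; wcfs instead keeps them only
// until blkdata has been put into the OS pagecache.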

// /head/watch				- served by WatchNode.
type WatchNode struct {
	fsNode

	head   *Head // parent head/
	idNext int32 // ID for next opened WatchLink
}

// /head/watch handle			- served by WatchLink.
type WatchLink struct {
	sk   *FileSock // IO channel to client
	id   int32     // ID of this /head/watch handle (for debug log)
	head *Head

	// established watches.
	// XXX in-progress - where?	-> nowhere; here only established watches are added
	// XXX locking?
	fileTab map[zodb.Oid]*Watch // {} foid -> Watch

	// IO
	txMu  sync.Mutex
	rxMu  sync.Mutex
	rxTab map[uint64]chan string // client replies go via here
}

// Watch represents watching for changes to one BigFile over a particular watch link.
type Watch struct {
	link *WatchLink // link to client
	file *BigFile	// XXX needed?

	// XXX locking

	at     zodb.Tid	// requested to be watched @at
	pinned SetI64   // blocks that are already pinned to be ≤ at
}

// -------- 3) Cache invariant --------

// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
// LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive
// because it is essentially the index where to find ZBigFile data.
//
// For the data itself - we put it to kernel pagecache and always deactivate
// from ZODB right after that.
//
// See "3) for */head/data the following invariant is maintained..."
type zodbCacheControl struct {}

func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolicy {
	switch obj.(type) {
	// ZBlk* should be in cache but without data
	case *ZBlk0:
		return zodb.PCachePinObject | zodb.PCacheDropState
	case *ZBlk1:
		return zodb.PCachePinObject | zodb.PCacheDropState

	// ZBigFile btree index should be in cache with data
	case *btree.LOBTree:
		return zodb.PCachePinObject | zodb.PCacheKeepState
	case *btree.LOBucket:
		return zodb.PCachePinObject | zodb.PCacheKeepState

	// don't let ZData pollute the cache
	case *ZData:
		return zodb.PCacheDropObject | zodb.PCacheDropState

	// for performance reasons we also keep ZBigFile in cache.
	//
	// ZBigFile is top-level object that is used on every block load, and
	// it would be a waste to evict ZBigFile from cache.
	case *ZBigFile:
		return zodb.PCachePinObject | zodb.PCacheKeepState
	}

	return 0
}
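
// Illustration (not part of wcfs): a standalone sketch of the classification
// idea used by PCacheClassify above, i.e. a type switch that returns a bitmask
// combining an "object" decision with a "state" decision. The policy type, its
// flag values and the index/blk/rawData types are hypothetical stand-ins; the
// actual zodb.PCachePolicy constants and their numeric values are not shown in
// this file and are not assumed here.

```go
package main

import "fmt"

// policy is a hypothetical stand-in for zodb.PCachePolicy.
type policy int

const (
	pinObject  policy = 1 << iota // keep the object itself in the live cache
	dropObject                    // let the object be evicted
	keepState                     // keep the object's loaded state
	dropState                     // discard state once it is no longer used
)

type index struct{}   // stands in for LOBTree/LOBucket (the block index)
type blk struct{}     // stands in for ZBlk*
type rawData struct{} // stands in for ZData

// classify maps an object's dynamic type to a cache policy bitmask.
func classify(obj interface{}) policy {
	switch obj.(type) {
	case *index:
		return pinObject | keepState // index: always resident, with data
	case *blk:
		return pinObject | dropState // block object: resident, without data
	case *rawData:
		return dropObject | dropState // bulk data: never pollute the cache
	}
	return 0 // no special treatment
}

func main() {
	for _, obj := range []interface{}{&index{}, &blk{}, &rawData{}, 42} {
		p := classify(obj)
		fmt.Printf("%T: pin=%v keepState=%v\n", obj, p&pinObject != 0, p&keepState != 0)
	}
}
```

// The design point is that the bulk data lives in the kernel pagecache, so the
// ZODB live cache only needs to keep the index that says where the data is.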

// -------- 4) ZODB invalidation -> OS cache --------

func traceZWatch(format string, argv ...interface{}) {
	if !log.V(1) {	// XXX -> 2?
		return
	}

	log.Infof("zwatcher: " + format, argv...)	// XXX InfoDepthf
}

// zwatcher watches for ZODB changes.
//
// see "4) when we receive an invalidation message from ZODB ..."
func (root *Root) zwatcher(ctx context.Context) (err error) {
	defer xerr.Contextf(&err, "zwatch")	// XXX more in context?
	// XXX unmount on error? -> always EIO?

	traceZWatch(">>>")

	zwatchq := make(chan zodb.Event)
	at0 := root.zstor.AddWatch(zwatchq)	// XXX -> to main thread to avoid race
	defer root.zstor.DelWatch(zwatchq)
	_ = at0 // XXX XXX

	var zevent zodb.Event
	var ok bool

	for {
		traceZWatch("select ...")
		select {
		case <-ctx.Done():
			traceZWatch("cancel")
			return ctx.Err()

		case zevent, ok = <-zwatchq:
			if !ok {
				traceZWatch("zwatchq closed")
				return nil // closed	XXX ok?
			}

		}

		traceZWatch("zevent: %s", zevent)

		switch zevent := zevent.(type) {
		default:
			return fmt.Errorf("unexpected event: %T", zevent)

		case *zodb.EventError:
			return zevent.Err

		case *zodb.EventCommit:
			root.handleδZ(zevent)
		}
	}
}

// handleδZ handles 1 change event from ZODB notification.
func (root *Root) handleδZ(δZ *zodb.EventCommit) {
	head := root.head

	// while we are invalidating OS cache, make sure that nothing that
	// even reads /head/bigfile/* is running (see 4.6).
	//
	// also make sure that cache uploaders we spawned (uploadBlk) are all
	// paused, or else they could overwrite OS cache with stale data.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for
	// details on how to do this without deadlocks.
	continueOSCacheUpload := make(chan struct{})
retry:
	for {
		head.zconnMu.Lock()
		head.pauseOSCacheUpload = true
		head.continueOSCacheUpload = continueOSCacheUpload

		if head.inflightOSCacheUploads != 0 {
			head.zconnMu.Unlock()
			continue retry
		}

		break
	}

	defer func() {
		head.pauseOSCacheUpload = false
		head.continueOSCacheUpload = nil
		head.zconnMu.Unlock()
		close(continueOSCacheUpload)
	}()

	// head.zconnMu locked and no cache uploaders are running

	zhead := head.zconn
	bfdir := head.bfdir

	// fileInvalidate describes invalidations for one file
	type fileInvalidate struct {
		blkmap SetI64 // changed blocks
		size   bool   // whether to invalidate file size
	}
	toinvalidate := map[*BigFile]*fileInvalidate{} // {} file -> set(#blk), sizeChanged
	btreeChangev := []zodb.Oid{}                   // oids changing BTree|Bucket

	//fmt.Printf("\n\n\n")

	// δZ = (tid↑, []oid)
	for _, oid := range δZ.Changev {
		// XXX zhead.Cache() lock/unlock
		obj := zhead.Cache().Get(oid)
		if obj == nil {
			//fmt.Printf("%s: not in cache\n", oid)
			continue // nothing to do - see invariant
		}

		//fmt.Printf("%s:     in cache (%s)\n", oid, typeOf(obj))

		switch obj := obj.(type) {
		default:
			continue // object not related to any bigfile

		// XXX kill Tree/Bucket here (-> ΔFtail)
		case *btree.LOBTree:
			btreeChangev = append(btreeChangev, obj.POid())

		case *btree.LOBucket:
			btreeChangev = append(btreeChangev, obj.POid())

		case zBlk:	// ZBlk*
			// blkBoundTo locking: no other bindZFile are running,
			// since we write-locked head.zconnMu and bindZFile is
			// run when loading objects - thus when head.zconnMu is
			// read-locked.
			//
			// bfdir locking: similarly not needed, since we are
			// exclusively holding head lock.
			for zfile, objBlk := range obj.blkBoundTo() {
				file, ok := bfdir.fileTab[zfile.POid()]
				if !ok {
					// even though zfile is in ZODB cache, the
					// filesystem already forgot about this file.
					continue
				}

				finv, ok := toinvalidate[file]
				if !ok {
					finv = &fileInvalidate{blkmap: SetI64{}}
					toinvalidate[file] = finv
				}
				finv.blkmap.Update(objBlk)
			}

		case *ZBigFile:
			// XXX check that .blksize and .blktab (it is only
			// persistent reference) do not change.

			// XXX shutdown fs with ^^^ message.
			panic("ZBigFile changed")
		}

		// make sure obj won't be garbage-collected until we finish handling it.
		runtime.KeepAlive(obj)
	}

	// find out which files need to be invalidated due to index change
	// XXX no indexMu lock needed because head is Locked
	// XXX stub -> TODO full δbtree | update indexLooked itself
	//fmt.Printf("\nbtreeChangev: %v\n", btreeChangev)
	δf := bfdir.δFtail.Update(δZ)
	//fmt.Printf("xfiles: %v\n", xfiles)
	for file, δfile := range δf.Change {
		// XXX use δfile blocks
		_ = δfile
		finv, ok := toinvalidate[file]
		if !ok {
			finv = &fileInvalidate{} // XXX init blkmap?
			toinvalidate[file] = finv
		}
		finv.size = true
	}

	//fmt.Printf("\n\nzδhandle: toinvalidate (#%d):\n", len(toinvalidate))
	//for file := range toinvalidate {
	//	fmt.Printf("\t- %s\n", file.zfile.POid())
	//}

	wg, ctx := errgroup.WithContext(context.TODO())	// XXX ctx = ?
	for file, finv := range toinvalidate {
		file := file
		for blk := range finv.blkmap {
			blk := blk
			wg.Go(func() error {
				return file.invalidateBlk(ctx, blk)
			})
		}
	}
	err := wg.Wait()
	if err != nil {
		panic(err)	// XXX
	}

	// invalidate kernel cache for attributes
	// we need to do it only if we see topology (i.e. btree) change
	//
	// do it after completing data invalidations.
	wg, ctx = errgroup.WithContext(context.TODO())	// XXX ctx = ?
	for file, finv := range toinvalidate {
		if !finv.size {
			continue
		}
		file := file // per-iteration copy: the goroutine below must not see later loop values
		wg.Go(func() error {
			return file.invalidateAttr()	// XXX pass ctx?
		})
	}
	err = wg.Wait()
	if err != nil {
		panic(err)	// XXX
	}

	// resync .zhead to δZ.tid
	// XXX -> Head.Resync() ?

	// 1. abort old and resync to new txn/at
	transaction.Current(zhead.txnCtx).Abort()
	_, ctx = transaction.New(context.Background()) // XXX bg ok?
	err = zhead.Resync(ctx, δZ.Tid)
	if err != nil {
		panic(err)	// XXX
	}
	zhead.txnCtx = ctx

	// 2. restat invalidated ZBigFile
	// XXX -> parallel
	// XXX locking
	for file := range toinvalidate {
		size, treePath, err := file.zfile.Size(ctx)
		if err != nil {
			panic(err)	// XXX
		}

		file.size = size
		bfdir.δFtail.Track(file, treePath)

		file.rev = zhead.At()
	}

	// notify .wcfs/zhead
	for sk := range gdebug.zheadSockTab {
		_, err := fmt.Fprintf(sk, "%s\n", δZ.Tid)
		if err != nil {
			log.Error(err)	// XXX errctx, -> warning?
			sk.Close()
			delete(gdebug.zheadSockTab, sk)
		}
	}

	// XXX δFtail.ForgetPast(...)
}

// invalidateBlk invalidates 1 file block in kernel cache.
//
// see "4.4) for all file/blk to invalidate we do"
//
// called with f.head.zconnMu wlocked.
func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
	defer xerr.Contextf(&err, "%s: invalidate blk #%d", f.path(), blk)

	fsconn := gfsconn
	blksize := f.blksize
	off := blk*blksize

	var blkdata []byte = nil

	// first try to retrieve f.loading[blk];
	// make sure f.loading[blk] is invalidated.
	//
	// we are running with zconnMu wlocked - no need to lock f.loadMu
	loading, ok := f.loading[blk]
	if ok {
		if loading.err == nil {
			blkdata = loading.blkdata
		}
		delete(f.loading, blk)
	}

	// try to retrieve cache of current head/data[blk], if we got nothing from f.loading
	if blkdata == nil {
		blkdata = make([]byte, blksize)
		n, st := fsconn.FileRetrieveCache(f.Inode(), off, blkdata)
		if st != fuse.OK {
			// XXX warn
		}
		blkdata = blkdata[:n]
	}

	// if less than blksize was cached - probably the kernel had to evict
	// some data from its cache already. In such case we don't try to
	// preserve the rest and drop what was read, to avoid keeping the
	// system overloaded.
	//
	// if we have the data - preserve it under @revX/bigfile/file[blk].
	if int64(len(blkdata)) == blksize {
		func() {
			// store retrieved data back to OS cache for file @<rev>/file[blk]
			blkrev, _ := f.LastBlkRev(blk, f.head.zconn.At())
			frev, frelease, err := groot.mkrevfile(blkrev, f.zfile.POid())
			if err != nil {
				log.Errorf("BUG: %s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err)
				return // frev and frelease are nil on error
			}
			defer frelease()

			st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata)
			if st != fuse.OK {
				log.Errorf("BUG: %s: invalidate blk #%d: %s: store cache: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, frev.path(), st)
			}
		}()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
950
	}

	// invalidate file/head/data[blk] in OS file cache.
	st := fsconn.FileNotify(f.Inode(), off, blksize)
	if st != fuse.OK {
		return syscall.Errno(st)
	}

	return nil
}

// invalidateAttr invalidates file attributes in kernel cache.
//
// Complements invalidateBlk and is used to invalidate file size.
func (f *BigFile) invalidateAttr() (err error) {
	defer xerr.Contextf(&err, "%s: invalidate attr", f.path())
	fsconn := gfsconn
	st := fsconn.FileNotify(f.Inode(), -1, -1) // metadata only
	if st != fuse.OK {
		return syscall.Errno(st)
	}
	return nil
}


// mkrevfile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel.
//
// We need the node ID to be known to the kernel when we need to store data into
// the file's kernel cache - if the kernel doesn't have the node ID for the file in
// question, FileNotifyStoreCache will just fail.
//
// For the kernel to know the inode, mkrevfile issues a regular filesystem lookup
// request which goes to the kernel and should come back to wcfs. It is thus not safe
// to use mkrevfile from under a FUSE request handler as doing so might deadlock.
//
// Caller must call release when inode ID is no longer required to be present.
func (root *Root) mkrevfile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, release func(), err error) {
	fsconn := gfsconn

	frevpath := fmt.Sprintf("@%s/bigfile/%s", rev, fid) // relative to fs root for now
	defer xerr.Contextf(&err, "/: mkrevfile %s", frevpath)

	// first check without going through kernel, whether the inode may be known already
	xfrev := fsconn.LookupNode(root.Inode(), frevpath)
	if xfrev != nil {
		// FIXME checking for "node{0}" is fragile, but currently no other way
		// XXX the node could be still forgotten since we are not holding open on it
		// XXX -> always os.open unconditionally? or it is ok since it is just a cache?
		if xfrev.String() != "node{0}" {
			return xfrev.Node().(*BigFile), func(){}, nil
		}
	}

	// we have to ping the kernel
	frevospath := gmntpt + "/" + frevpath // now starting from OS /
	f, err := os.Open(frevospath)
	if err != nil {
		return nil, nil, err
	}

	xfrev = fsconn.LookupNode(root.Inode(), frevpath)
	// must be !nil as open succeeded	XXX better recheck
	return xfrev.Node().(*BigFile), func() { f.Close() }, nil
}

// -------- 7) FUSE read(#blk) --------

// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
	f.head.zconnMu.RLock()
	defer f.head.zconnMu.RUnlock()

	// cap read request to file size
	end := off + int64(len(dest))		// XXX overflow?
	if end > f.size {
		end = f.size
	}
	if end <= off {
		// XXX off >= size -> EINVAL? (but when size=0 kernel issues e.g. [0 +4K) read)
		return fuse.ReadResultData(nil), fuse.OK
	}

	// widen read request to be aligned with blksize granularity
	// (we can load only whole ZBlk* blocks)
	aoff := off - (off % f.blksize)
	aend := end
	if re := end % f.blksize; re != 0 {
		aend += f.blksize - re
	}
	// XXX use original dest if it can fit the data
	dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file

	// XXX better ctx = transaction.PutIntoContext(ctx, txn)
	ctx, cancel := xcontext.Merge(fctx, f.head.zconn.txnCtx)
	defer cancel()

	// read/load all block(s) in parallel
	wg, ctx := errgroup.WithContext(ctx)
	for blkoff := aoff; blkoff < aend; blkoff += f.blksize {
		blkoff := blkoff
		blk := blkoff / f.blksize
		wg.Go(func() error {
			δ := blkoff-aoff // blk position in dest
			//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, f.blksize)
			return f.readBlk(ctx, blk, dest[δ:δ+f.blksize])
		})
	}

	err := wg.Wait()
	if err != nil {
		return nil, err2LogStatus(err)
	}

	return fuse.ReadResultData(dest[off-aoff:end-aoff]), fuse.OK
}
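
// Illustration (not part of wcfs): the range widening done by Read above can be
// tried in isolation. The sketch below is a standalone program; the helper name
// alignToBlocks and the sample numbers are assumptions made only for this example.

```go
package main

import "fmt"

// alignToBlocks widens the byte range [off, end) to block boundaries.
// It mirrors the aoff/aend arithmetic in BigFile.Read. The name and the
// signature are hypothetical; blksize must be > 0 and 0 <= off <= end.
func alignToBlocks(off, end, blksize int64) (aoff, aend int64) {
	aoff = off - (off % blksize) // round start down to a block boundary
	aend = end
	if re := end % blksize; re != 0 {
		aend += blksize - re // round end up to a block boundary
	}
	return aoff, aend
}

func main() {
	const blksize = 4096
	off, end := int64(5000), int64(9000)
	aoff, aend := alignToBlocks(off, end, blksize)
	fmt.Printf("request [%d, %d) -> aligned [%d, %d)\n", off, end, aoff, aend)
	// the requested bytes sit at [off-aoff, end-aoff) inside the aligned buffer
	fmt.Printf("slice of aligned buffer: [%d:%d]\n", off-aoff, end-aoff)
}
```

// The aligned buffer always covers whole blocks, which is why Read returns
// dest[off-aoff:end-aoff] rather than the buffer itself.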

// readBlk serves Read to read 1 ZBlk #blk into destination buffer.
//
// see "7) when we receive a FUSE read(#blk) request ..." in overview.
//
// len(dest) == blksize.
// called with head.zconnMu rlocked.
func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err error) {
	defer xerr.Contextf(&err, "%s: readblk #%d", f.path(), blk)

	// check if someone else is already loading this block
	f.loadMu.Lock()
	loading, already := f.loading[blk]
	if !already {
		loading = &blkLoadState{
			ready:   make(chan struct{}),
		}
		f.loading[blk] = loading
	}
	f.loadMu.Unlock()

	// if it is already loading - just wait for it
	if already {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-loading.ready:
			if loading.err == nil {
				copy(dest, loading.blkdata)	// XXX copy
			}
			return loading.err
		}
	}

	// no one was loading - we become responsible for loading this block
	blkdata, treepath, zblkrev, err := f.zfile.LoadBlk(ctx, blk)
	loading.blkdata = blkdata
	loading.err = err

	// data loaded with error - cleanup .loading
	if loading.err != nil {
		close(loading.ready)
		f.loadMu.Lock()
		delete(f.loading, blk)
		f.loadMu.Unlock()
		return err
	}

	// we have the data - it can be used after watchers are updated
	// XXX should we use ctx here? (see updateWatcher comments)
	f.updateWatchers(ctx, blk, treepath, zblkrev)

	// data can be used now
	close(loading.ready)
	copy(dest, blkdata)	// XXX copy

	// Store the whole block we have just loaded from the database into the
	// kernel pagecache. This way, even if the user requested only a small
	// portion of the block, a subsequent (e.g. consecutive) read will be
	// served by the kernel from its pagecache instead of hitting the DB again.
	//
	// We cannot do this directly from the reading goroutine: while a read is
	// in progress, kernel FUSE keeps the corresponding page locked in the
	// pagecache, and updating that same page from here would deadlock inside
	// the kernel.
	//
	// .loading cleanup is done only once the data has been put into the OS
	// pagecache. If we did it earlier, a simultaneous read covered by the same
	// block could miss both the kernel pagecache (not yet updated) and
	// .loading[blk] (already empty), and would thus trigger DB access again.
	//
	// XXX if direct-io: don't touch pagecache
	go f.uploadBlk(blk, loading)

	return nil
}

// updateWatchers complements readBlk: it updates watchers of the file after a
// block was loaded from ZODB and before block data is returned to kernel.
//
// See "7.2) for all registered client@at watchers ..."
//
// Called with f.head.zconnMu rlocked.
//
// XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure?
//func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, pathRevMax zodb.Tid) {
func (f *BigFile) updateWatchers(ctx context.Context, blk int64, treepath []btree.LONode, zblkrev zodb.Tid) {
	// only head/ is being watched for
	if f.head.rev != 0 {
		return
	}

	// update δbtree index
	bfdir := f.head.bfdir
	bfdir.δFmu.Lock()		// XXX locking correct?
	bfdir.δFtail.Track(f, treepath)	// XXX pass in zblk.oid / zblk.rev here?
	bfdir.δFmu.Unlock()

	blkrev, _ := f.LastBlkRev(blk, f.head.zconn.At())
	// XXX ^^^ merge in zblkrev

	wg, ctx := errgroup.WithContext(ctx)
	for w := range f.watches {
		w := w
		wg.Go(func() error {
			// XXX close watcher on any error
			return w.pinIfNewer(ctx, blk, blkrev)
		})
	}
	err := wg.Wait()
	if err != nil {
		panic(err)	// XXX
	}
}

// uploadBlk complements readBlk: it uploads loaded blkdata into OS cache.
func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
	head := f.head

	// rlock zconnMu and make sure zwatcher is not asking us to pause.
	// if it does - wait for a safer time not to deadlock.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
retry:
	for {
		head.zconnMu.RLock()
		// help zwatcher if it asks us to pause uploadings, so it can
		// take zconnMu wlocked without deadlocks.
		if head.pauseOSCacheUpload {
			ready := head.continueOSCacheUpload
			head.zconnMu.RUnlock()
			<-ready
			continue retry
		}

		break
	}

	// zwatcher is not currently trying to pause OS cache uploads.

	// check if this block was already invalidated by zwatcher.
	// if so don't upload the block into OS cache.
	f.loadMu.Lock()
	loading_ := f.loading[blk]
	f.loadMu.Unlock()
	if loading != loading_ {
		head.zconnMu.RUnlock()
		return
	}

	oid := f.zfile.POid()

	// signal to zwatcher not to run while we are performing the upload.
	// upload with released zconnMu so that zwatcher can lock it even if to
	// check inflightOSCacheUploads status.
	atomic.AddInt32(&head.inflightOSCacheUploads, +1)
	head.zconnMu.RUnlock()

	st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata)

	f.loadMu.Lock()
	bug := (loading != f.loading[blk])
	if !bug {
		delete(f.loading, blk)
	}
	f.loadMu.Unlock()

	// signal to zwatcher that we are done and it can continue.
	atomic.AddInt32(&head.inflightOSCacheUploads, -1)

	if bug {
		panicf("BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache", oid, blk)
	}

	if st == fuse.OK {
		return
	}

	// pagecache update failed, but it must not (we verified on startup that
	// pagecache control is supported by kernel). We can correctly live on
	// with the error, but data access will be likely very slow. Tell user
	// about the problem.
	log.Errorf("BUG: bigfile %s: blk %d: -> pagecache: %s  (ignoring, but reading from bigfile will be very slow)", oid, blk, st)
}

// -------- invalidation protocol notification/serving --------

// pin makes sure that file[blk] on client side is the same as of @rev state.
//
// rev must be ≤ w.at
//
// XXX what is passed here is rev(blk, @head) - we need to consider rev(blk, @w.at)
//
// XXX describe more.
// XXX explain that if rev ≤ .at there is no rev_next: rev < rev_next ≤ at.
// XXX error - when? or close watch on any error?
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
	foid := w.file.zfile.POid()
	defer xerr.Contextf(&err, "f<%s>: watch%d: pin #%d @%s", foid, w.link.id, blk, rev)

	// XXX locking?
	// XXX simultaneous calls?

	if rev > w.at {
		panicf("f<%s>: watch%d: pin #%d @%s: watch.at (%s) < rev",
			foid, w.link.id, blk, rev, w.at)
	}

	if w.pinned.Has(blk) {
		// XXX pinned has to be invalidated when w.at^
		return // already pinned
	}

	ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, rev))
	if err != nil {
		return err
	}

	if ack != "ack" {
		return fmt.Errorf("expect %q; got %q", "ack", ack)
	}

	w.pinned.Add(blk)
	return nil
}

// pinIfNewer makes sure that file[blk] on client side stays as of @w.at state.
//
// rev is blk revision as of head. If rev > w.at the block is pinned on client side.
func (w *Watch) pinIfNewer(ctx context.Context, blk int64, rev zodb.Tid) error {
	// XXX locking

	// the block is already covered by @w.at database view
	if rev <= w.at {
		return nil
	}

	// the block is newer - find out its revision as of @w.at and pin to that.
	//
	// We don't pin to w.at since if we would do so for several clients,
	// and most of them would be on different w.at - cache of the file will
	// be lost. Via pinning to particular block revision, we make sure the
	// revision to pin is the same on all clients, and so file cache is shared.

	rev, _ = w.file.LastBlkRev(blk, w.at)
	return w.pin(ctx, blk, rev)
}
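
// Illustration (not part of wcfs): why pinIfNewer pins to the block's own
// revision rather than to w.at. In the sketch below plain uint64 values stand in
// for zodb.Tid, a sorted slice stands in for the history that LastBlkRev
// consults, and the names lastRevAt and pinRev are hypothetical. Returning
// revision 0 when the block has no revision ≤ at is also an assumption of the
// sketch.

```go
package main

import (
	"fmt"
	"sort"
)

// lastRevAt returns the latest revision in revs (sorted ascending) that is
// ≤ at. ok is false when the block has no revision ≤ at.
func lastRevAt(revs []uint64, at uint64) (rev uint64, ok bool) {
	i := sort.Search(len(revs), func(i int) bool { return revs[i] > at })
	if i == 0 {
		return 0, false
	}
	return revs[i-1], true
}

// pinRev decides whether a watcher with database view @at must pin the block,
// and to which revision. revs lists revisions that changed the block; the last
// element plays the role of the block revision as of head.
func pinRev(revs []uint64, at uint64) (rev uint64, needPin bool) {
	if len(revs) == 0 || revs[len(revs)-1] <= at {
		return 0, false // block is already covered by the @at view
	}
	rev, _ = lastRevAt(revs, at)
	return rev, true
}

func main() {
	revs := []uint64{10, 20, 30} // block changed at these revisions; head is 30
	for _, at := range []uint64{22, 25, 30} {
		rev, needPin := pinRev(revs, at)
		fmt.Printf("watcher @%d: pin=%v rev=%d\n", at, needPin, rev)
	}
}
```

// Watchers at @22 and @25 both end up pinned to revision 20, so the data for
// that revision can be shared between them instead of being kept per watcher.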

// setupWatch sets up a Watch when client sends `watch <file> @<at>` request.
//
// XXX sends "pin" notifications; final "ok" must be sent by caller.
//
// XXX called synchronously - only 1 setupWatch call at a time?
func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
	defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
	// XXX locking

	// XXX at = zodb.InvalidTid - remove watch

	// XXX if watch was already established - we need to update it
	w := wlink.fileTab[foid]
	if w != nil {
		panic("TODO")	// XXX update the watch
	}

	// watch was not previously established - set it up anew
	// XXX locking
	bfdir := wlink.head.bfdir
	f := bfdir.fileTab[foid]
	if f == nil {
		// by "invalidation protocol" watch is setup after data file was opened
		return fmt.Errorf("file not yet known or is not a ZBigFile")
	}

	// XXX wait wlink.head.zconn.At() ≥ at
	// XXX <~> f.δtail.Head() ≥ at (?)

	if at < bfdir.δFtail.Tail() {
		// XXX err += head.at?
		return fmt.Errorf("at is too far away back from head/at")
	}

	toPin := map[int64]zodb.Tid{} // blk -> @rev

	// XXX locking ok?
	bfdir.δFmu.RLock()
	defer bfdir.δFmu.RUnlock()

/* XXX reenable

	// FIXME (!!!) since f.δtail does not have all changes to f, here we
	// can be missing some pins we should be sending. (see wcfs_test.py for details)
	for _, δ := range δFtail.SliceByRev(f, at, f.δtail.Head()) {
		for _, blk := range δ.Changev {
			_, already := toPin[blk]
			if already {
				continue
			}

			// FIXME (!!!) again f.δtail can miss some entries
			toPin[blk], _ = f.LastBlkRev(blk, at)
		}
	}
*/

	wg, ctx := errgroup.WithContext(ctx)
	for blk, rev := range toPin {
		blk := blk
		rev := rev
		wg.Go(func() error {
			return w.pin(ctx, blk, rev)
		})
	}
	err = wg.Wait()
	if err != nil {
		panic(err)	// XXX
	}

	// XXX register w to f (here ?)

	return nil
}

// Open serves /head/watch opens.
func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
	// XXX check flags?
	head := wnode.head

	wlink := &WatchLink{
		sk:      NewFileSock(),
		id:      atomic.AddInt32(&wnode.idNext, +1),
		head:    head,
		fileTab: make(map[zodb.Oid]*Watch),
	}

	// XXX locking
	// XXX del wlinkTab[w] on w.sk.File.Release
	head.wlinkTab[wlink] = struct{}{}

	go wlink.serveRX()
	return wlink.sk.File(), fuse.OK
}

// serveRX serves client initiated watch requests and routes client replies to
// wcfs initiated requests.
func (wlink *WatchLink) serveRX() {
	err := wlink._serveRX()
	_ = err
	// XXX log error if !close
	// XXX close if was not closed?
	// XXX locking
	delete(wlink.head.wlinkTab, wlink)
}

func (wlink *WatchLink) _serveRX() (err error) {
	defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
	r := bufio.NewReader(wlink.sk)

	// XXX write to peer if it was logical error on client side
	// XXX on which stream? -1?

	for {
		l, err := r.ReadString('\n')	// XXX limit accepted line len to prevent DOS
		if err != nil {
			return err
		}

		fmt.Printf("S: watch: rx: %q\n", l)

		stream, msg, err := parseWatchFrame(l)
		if err != nil {
			return fmt.Errorf("%s", err)
		}

		// reply from client to wcfs
		reply := (stream % 2 == 0)
		if reply {
			wlink.rxMu.Lock()
			rxq := wlink.rxTab[stream]
			delete(wlink.rxTab, stream)
			wlink.rxMu.Unlock()

			if rxq == nil {
				return fmt.Errorf("%d: reply on unexpected stream", stream)
			}
			rxq <- msg
			continue
		}

		// client-initiated watch request
		foid, at, err := parseWatch(msg)
		if err != nil {
			return fmt.Errorf("%d: %s", stream, err)
		}

		ctx := context.TODO()	// XXX ctx = ?

		fmt.Printf("S: watch: AAA\n")

		err = wlink.setupWatch(ctx, foid, at)
		if err != nil {
			fmt.Printf("S: watch: QQQ: %s\n", err)
			return fmt.Errorf("%d: %s", stream, err)
		}

		fmt.Printf("S: watch: BBB\n")

		err = wlink.send(ctx, stream, "ok")
		if err != nil {
			panic(err)	// XXX
		}

		fmt.Printf("S: watch: CCC\n")
	}
}

// sendReq sends wcfs-originated request to client and returns client response.
func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
	// XXX err ctx
	stream := uint64(2) // FIXME allocate stream anew as several in-flight sendReq are possible

	rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client)
	wlink.rxMu.Lock()
	wlink.rxTab[stream] = rxq	// XXX assert .stream is not there?
	wlink.rxMu.Unlock()

	err = wlink.send(ctx, stream, req)
	if err != nil {
		return "", err
	}

	select {
	case <-ctx.Done():
		return "", ctx.Err()

	case reply = <-rxq:
		return reply, nil
	}
}

// send sends a message to client over specified stream ID.
//
// Multiple send can be called simultaneously; send serializes writes.
func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) error {
	// XXX err ctx
	// XXX assert '\n' not in msg

	wlink.txMu.Lock()
	defer wlink.txMu.Unlock()

	// XXX timeout write on ctx cancel
	pkt := []byte(fmt.Sprintf("%d %s\n", stream, msg))
	_, err := wlink.sk.Write(pkt)
	if err != nil {
		return err
	}

	return nil
}
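
// Illustration (not part of wcfs): the wire format written by send is one line
// per message, "<stream> <msg>\n", and _serveRX treats even stream IDs as
// replies to wcfs-originated requests. The standalone sketch below round-trips
// such a frame. parseWatchFrame itself is not shown in this part of the file, so
// parseFrame here is a hypothetical stand-in that may differ from it;
// formatFrame and isReply are likewise names invented for the example.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatFrame renders a message the same way WatchLink.send does.
func formatFrame(stream uint64, msg string) string {
	return fmt.Sprintf("%d %s\n", stream, msg)
}

// parseFrame splits "<stream> <msg>\n" back into its parts.
func parseFrame(line string) (stream uint64, msg string, err error) {
	line = strings.TrimSuffix(line, "\n")
	sp := strings.IndexByte(line, ' ')
	if sp == -1 {
		return 0, "", fmt.Errorf("invalid frame %q: no stream/message separator", line)
	}
	stream, err = strconv.ParseUint(line[:sp], 10, 64)
	if err != nil {
		return 0, "", fmt.Errorf("invalid frame %q: bad stream ID", line)
	}
	return stream, line[sp+1:], nil
}

// isReply reports whether a frame received from the client is a reply to a
// wcfs-originated request (even stream ID) rather than a new client request.
func isReply(stream uint64) bool {
	return stream%2 == 0
}

func main() {
	frame := formatFrame(2, "ack")
	stream, msg, err := parseFrame(frame)
	fmt.Printf("frame=%q stream=%d msg=%q err=%v reply=%v\n",
		frame, stream, msg, err, isReply(stream))
}
```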


// ---- Lookup ----

// /(head|<rev>)/bigfile/ -> Lookup receives client request to create /(head|<rev>)/bigfile/<bigfileX>.
func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
	f, err := bfdir.lookup(out, name, fctx)
	var inode *nodefs.Inode
	if f != nil {
		inode = f.Inode()
	}
	return inode, err2LogStatus(err)
}

func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) (f *BigFile, err error) {
	defer xerr.Contextf(&err, "%s: lookup %q", bfdir.path(), name)

	oid, err := zodb.ParseOid(name)
	if err != nil {
		return nil, eINVALf("not oid")
	}

	bfdir.head.zconnMu.RLock()
	defer bfdir.head.zconnMu.RUnlock()

	defer func() {
		if f != nil {
			f.getattr(out)
		}
	}()

	// check to see if dir(oid) is already there
	bfdir.fileMu.Lock()
	f, already := bfdir.fileTab[oid]
	bfdir.fileMu.Unlock()

	if already {
		return f, nil
	}

	// not there - without bfdir lock proceed to open BigFile from ZODB
	f, err = bfdir.head.bigopen(fctx, oid)
	if err != nil {
		return nil, err
	}

	// relock bfdir and either register f or, if the file was maybe
	// simultaneously created while we were not holding bfdir.fileMu, return that.
	bfdir.fileMu.Lock()
	f2, already := bfdir.fileTab[oid]
	if already {
		bfdir.fileMu.Unlock()
		f.Close()
		return f2, nil
	}

	bfdir.fileTab[oid] = f
	bfdir.fileMu.Unlock()

	// mkfile takes filesystem treeLock - do it outside bfdir.fileMu
	mkfile(bfdir, name, f)

	return f, nil
}

// / -> Lookup receives client request to create @<rev>/.
func (root *Root) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
	revd, err := root.lookup(name, fctx)
	var inode *nodefs.Inode
	if revd != nil {
		inode = revd.Inode()
		_ = revd.GetAttr(out, nil, fctx) // always ok
	}
	return inode, err2LogStatus(err)
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1603
func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1604
	defer xerr.Contextf(&err, "/: lookup %q", name)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1605

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1606
	var rev zodb.Tid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1607 1608 1609
	ok := false

	if strings.HasPrefix(name, "@") {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1610
		rev, err = zodb.ParseTid(name[1:])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1611 1612 1613
		ok = (err == nil)
	}
	if !ok {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1614
		return nil, eINVALf("not @rev")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1615 1616
	}

	// check to see if dir(rev) is already there
	root.revMu.Lock()
	revDir, already := root.revTab[rev]
	root.revMu.Unlock()

	if already {
		return revDir, nil
	}

	// not there - without revMu lock proceed to open @rev view of ZODB
//	zconnRev, err := root.zopenAt(fctx, rev)
	zconnRev, err := zopen(fctx, root.zdb, &zodb.ConnOptions{At: rev})
	if err != nil {
		return nil, err
	}

	// relock root and either register the new revX/ directory or, if the
	// directory was created concurrently while we were not holding revMu,
	// return that one.
	root.revMu.Lock()
	revDir, already = root.revTab[rev]
	if already {
		root.revMu.Unlock()
//		zconnRev.Release()
		transaction.Current(zconnRev.txnCtx).Abort()
		return revDir, nil
	}

	// XXX -> newHead()
	revDir = &Head{
		fsNode: newFSNode(&fsOptions{Sticky: false}),	// XXX + Head.OnForget() -> del root.revTab[]
		rev:    rev,
		zconn:  zconnRev,
	}

	bfdir := &BigFileDir{
		fsNode:  newFSNode(&fsOptions{Sticky: false}),	// XXX + BigFileDir.OnForget()
		head:    revDir,
		fileTab: make(map[zodb.Oid]*BigFile),
		δFtail:  nil, // δFtail not needed/used for @revX/
	}
	revDir.bfdir = bfdir

	root.revTab[rev] = revDir
	root.revMu.Unlock()

	// mkdir takes filesystem treeLock - do it outside revMu.
	mkdir(root, name, revDir)
	mkdir(revDir, "bigfile", bfdir)
	// XXX + "at"

	return revDir, nil
}


// bigopen opens BigFile corresponding to oid on head.zconn.
//
// A ZBigFile corresponding to oid is activated and statted.
//
// head.zconn must be locked.
func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err error) {
	zconn := head.zconn
	defer xerr.Contextf(&err, "bigopen %s @%s", oid, zconn.At())

	// XXX better ctx = transaction.PutIntoContext(ctx, txn)
	ctx, cancel := xcontext.Merge(ctx, zconn.txnCtx)
	defer cancel()

	xzfile, err := zconn.Get(ctx, oid)
	if err != nil {
		switch errors.Cause(err).(type) {
		case *zodb.NoObjectError:
			return nil, eINVAL(err)
		case *zodb.NoDataError:
			return nil, eINVAL(err) // XXX what to do if it was existing and got deleted?
		default:
			return nil, err
		}
	}

	zfile, ok := xzfile.(*ZBigFile)
	if !ok {
		return nil, eINVALf("%s is not a ZBigFile", typeOf(xzfile))
	}

	// extract blksize, size and initial approximation for file revision
	err = zfile.PActivate(ctx)
	if err != nil {
		return nil, err
	}
	blksize := zfile.blksize
	// XXX it should be revision of both ZBigFile and its data. But we
	// cannot get data revision without expensive scan of all ZBigFile's objects.
	// -> approximate mtime initially with ZBigFile object mtime.
	//
	// XXX for @rev/... we can know initial mtime more exactly?
	rev     := zfile.PSerial()
	zfile.PDeactivate()

	size, treePath, err := zfile.Size(ctx)
	if err != nil {
		return nil, err
	}

//	zconn.Incref()
	f := &BigFile{
		fsNode:  newFSNode(&fsOptions{Sticky: false}),	// XXX + BigFile.OnForget -> del .head.bfdir.fileTab[]
		head:    head,
		zfile:   zfile,
		blksize: blksize,
		size:    size,
		rev:     rev,

//		// XXX this is needed only for head/
//		δtail:   NewΔTailI64(zconn.At()),
		loading: make(map[int64]*blkLoadState),
	}

	// only head/ needs δFtail.
	if head.rev == 0 {
		head.bfdir.δFmu.Lock()	// XXX locking ok?
		head.bfdir.δFtail.Track(f, treePath)
		head.bfdir.δFmu.Unlock()
	}

	return f, nil
}

// Close releases all resources of BigFile.
func (f *BigFile) Close() error {
	// XXX locking?
	f.zfile = nil

//	f.zconn.Release()
//	f.zconn = nil
	f.head = nil

	return nil
}

// ---- misc ---

// /(head|<rev>)/at -> readAt serves read.
func (h *Head) readAt() []byte {
	h.zconnMu.RLock()
	defer h.zconnMu.RUnlock()

	return []byte(h.zconn.At().String())
}

// /(head|<rev>)/ -> GetAttr serves stat.
func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
	at := head.rev
	if at == 0 {
		head.zconnMu.RLock()
		at = head.zconn.At()
		head.zconnMu.RUnlock()
	}
	t := at.Time().Time

	out.Mode = fuse.S_IFDIR | 0555
	out.SetTimes(/*atime=*/nil, /*mtime=*/&t, /*ctime=*/&t)
	return fuse.OK
}

// /(head|<rev>)/bigfile/<bigfileX> -> GetAttr serves stat.
func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
	f.head.zconnMu.RLock()
	defer f.head.zconnMu.RUnlock()

	f.getattr(out)
	return fuse.OK
}

func (f *BigFile) getattr(out *fuse.Attr) {
	out.Mode = fuse.S_IFREG | 0444
	out.Size = uint64(f.size)
	// .Blocks
	// .Blksize

	mtime := f.rev.Time().Time
	out.SetTimes(/*atime=*/nil, /*mtime=*/&mtime, /*ctime=*/&mtime)
}




// FIXME groot/gfsconn is tmp workaround for lack of way to retrieve FileSystemConnector from nodefs.Inode
// TODO:
//	- Inode += .Mount() -> nodefs.Mount
//	- Mount:
//		.Root()		-> root Inode of the fs
//		.Connector()	-> FileSystemConnector through which fs is mounted
var groot   *Root
var gfsconn *nodefs.FileSystemConnector

// root of the filesystem is mounted here.
//
// we need to talk to kernel and lookup @<rev>/bigfile/<fid> before uploading
// data to kernel cache there. Referencing root of the filesystem via path is
// vulnerable to bugs wrt e.g. `mount --move` and/or mounting something else
// over wcfs. However keeping an opened root fd would prevent wcfs from being
// unmounted, so we still have to reference the root via path.
var gmntpt string

// debugging
var gdebug = struct {
	// .wcfs/zhead opens
	// protected by groot.head.zconnMu
	zheadSockTab map[*FileSock]struct{}
}{}

func init() {
	gdebug.zheadSockTab = make(map[*FileSock]struct{})
}

// _wcfs_Zhead serves .wcfs/zhead opens.
type _wcfs_Zhead struct {
	fsNode
}

func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
	// XXX check flags?
	sk := NewFileSock()
	sk.CloseRead()

	groot.head.zconnMu.Lock()
	defer groot.head.zconnMu.Unlock()

	// XXX del zheadSockTab[sk] on sk.File.Release (= client drops opened handle)
	gdebug.zheadSockTab[sk] = struct{}{}
	return sk.File(), fuse.OK
}

func main() {
	stdlog.SetPrefix("wcfs: ")
	//log.CopyStandardLogTo("WARNING") // XXX -> "DEBUG" if -d ?
	defer log.Flush()

	debug := flag.Bool("d", false, "debug")
	autoexit := flag.Bool("autoexit", false, "automatically stop service when there is no client activity")
	// XXX option to prevent starting if wcfs was already started ?

	flag.Parse()
	if len(flag.Args()) != 2 {
		log.Fatalf("Usage: %s [OPTIONS] zurl mntpt", os.Args[0])
	}
	zurl := flag.Args()[0]
	mntpt := flag.Args()[1]

	// debug -> precise t, no dates	(XXX -> always precise t?)
	if *debug {
		stdlog.SetFlags(stdlog.Lmicroseconds)
	}

	// open zodb storage/db/connection
	ctx := context.Background()	// XXX + timeout?
	zstor, err := zodb.Open(ctx, zurl, &zodb.OpenOptions{
		ReadOnly: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer zstor.Close()

	zdb := zodb.NewDB(zstor)
	defer zdb.Close()	// XXX err
	zhead, err := zopen(ctx, zdb, &zodb.ConnOptions{
		// we need zhead.cache to be maintained across several transactions.
		// see "3) for head/bigfile/* the following invariant is maintained ..."
		NoPool: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	zhead.Cache().Lock()
	zhead.Cache().SetControl(&zodbCacheControl{})
	zhead.Cache().Unlock()

	// mount root + head/
	// XXX -> newHead()
	head := &Head{
		fsNode:   newFSNode(fSticky),
		rev:      0,
		zconn:    zhead,
		wlinkTab: make(map[*WatchLink]struct{}),
	}

	wnode := &WatchNode{
		fsNode: newFSNode(fSticky),
		head:   head,
	}

	bfdir := &BigFileDir{
		fsNode:   newFSNode(fSticky),
		head:     head,
		fileTab:  make(map[zodb.Oid]*BigFile),
		δFtail:   NewΔFTail(zhead.At()),
	}
	head.bfdir = bfdir

	root := &Root{
		fsNode: newFSNode(fSticky),
		zstor:  zstor,
		zdb:    zdb,
		head:   head,
		revTab: make(map[zodb.Tid]*Head),
	}

	opts := &fuse.MountOptions{
		FsName: zurl,
		Name:   "wcfs",

		// We retrieve kernel cache in ZBlk.blksize chunks, which are 2MB in size.
		// XXX currently go-fuse caps MaxWrite to 128KB.
		// TODO -> teach go-fuse to handle Init.MaxPages (Linux 4.20+).
		MaxWrite:      2*1024*1024,

		// XXX tune MaxReadAhead? MaxBackground?

		// OS cache that we populate with bigfile data is precious;
		// we precisely propagate ZODB invalidations into file invalidations.
		PreciseDataCacheControl: true,

		DisableXAttrs: true,        // we don't use
		Debug:         *debug,
	}

	fssrv, fsconn, err := mount(mntpt, root, opts)
	if err != nil {
		log.Fatal(err)
	}
	groot   = root		// FIXME temp workaround (see ^^^)
	gfsconn = fsconn	// FIXME ----//----
	gmntpt  = mntpt

	// we require proper pagecache control (added to Linux 2.6.36 in 2010)
	kinit := fssrv.KernelSettings()
	kfuse := fmt.Sprintf("kernel FUSE (API %d.%d)", kinit.Major, kinit.Minor)
	supports := kinit.SupportsNotify
	if !(supports(fuse.NOTIFY_STORE_CACHE) && supports(fuse.NOTIFY_RETRIEVE_CACHE)) {
		log.Fatalf("%s does not support pagecache control", kfuse)
	}
	// make a bold warning if kernel does not support precise cache invalidation
	// (patch sent upstream; see notes.txt -> "Notes on OS pagecache control")
	if kinit.Flags & fuse.CAP_PRECISE_INVAL_DATA == 0 {
		w1 := fmt.Sprintf("%s does not support precise data cache invalidation", kfuse)
		w2 := "-> performance will be AWFUL."
		log.Error(w1); log.Error(w2)
		fmt.Fprintf(os.Stderr, "W: wcfs: %s\nW: wcfs: %s\n", w1, w2)
	}

	// add entries to /
	mkdir(root, "head", head)
	mkdir(head, "bigfile", bfdir)
	mkfile(head, "at", NewSmallFile(head.readAt))   // TODO mtime(at) = tidtime(at)
	mkfile(head, "watch", wnode)

	// for debugging/testing
	_wcfs := newFSNode(fSticky)
	mkdir(root, ".wcfs", &_wcfs)
	mkfile(&_wcfs, "zurl", NewStaticFile([]byte(zurl)))

	// .wcfs/zhead - special file channel that sends zhead.at.
	//
	// If a user opens it, it will start to receive the tids through which
	// zhead.at passes, starting from the time when .wcfs/zhead was opened.
	// There can be multiple openers. Once opened, the file must be read,
	// as wcfs blocks waiting for data to be read when processing
	// invalidations.
	mkfile(&_wcfs, "zhead", &_wcfs_Zhead{
		fsNode: newFSNode(fSticky),
	})

	// XXX place = ok?
	// XXX ctx = ok?
	// XXX wait for zwatcher shutdown.
	go root.zwatcher(ctx)

	// TODO handle autoexit
	// (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl
	//  opened, so when all inodes have been forgotten - we know all wcfs.py clients exited)
	_ = autoexit

	// serve client requests.
	//
	// use `go serve` + `waitMount` not just `serve` - because waitMount
	// takes care of disabling OS polling on us.
	// ( if we don't disable polling - fs serving can get stuck - see
	//   https://github.com/hanwen/go-fuse/commit/4f10e248eb for details )
	done := make(chan struct{})
	go func () {
		fssrv.Serve()
		close(done)
	}()
	err = fssrv.WaitMount()
	if err != nil {
		log.Fatal(err) // XXX err ctx?
	}
	<-done
}
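
/*
Illustration only (not part of wcfs): main above serves in a background
goroutine, waits for the mount to become ready, and then blocks until serving
finishes. A minimal standalone sketch of that start/wait-ready/wait-done
sequence follows; server, WaitReady, Stop and run are hypothetical names
introduced for this example and are not the go-fuse API.

```go
package main

import (
	"fmt"
	"sync"
)

// server is a hypothetical stand-in for a filesystem server.
type server struct {
	ready chan struct{} // closed once the serve loop is running
	stop  chan struct{} // closed to ask the serve loop to exit
	once  sync.Once
}

func newServer() *server {
	return &server{ready: make(chan struct{}), stop: make(chan struct{})}
}

// Serve runs until Stop is called.
func (s *server) Serve() {
	close(s.ready)
	<-s.stop
}

// WaitReady blocks until Serve has started.
func (s *server) WaitReady() { <-s.ready }

// Stop asks Serve to return; it is safe to call more than once.
func (s *server) Stop() { s.once.Do(func() { close(s.stop) }) }

// run starts srv in the background, waits for it to become ready,
// invokes onReady, and then waits for serving to finish.
func run(srv *server, onReady func()) {
	done := make(chan struct{})
	go func() {
		srv.Serve()
		close(done)
	}()
	srv.WaitReady()
	onReady()
	<-done
}

func main() {
	srv := newServer()
	run(srv, func() {
		fmt.Println("serving")
		srv.Stop()
	})
	fmt.Println("done")
}
```

Closing a channel from the serving goroutine gives the caller a simple way to
block until the serve loop has returned, without polling.
*/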