filestorage.go 21.9 KB
Newer Older
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1 2 3 4 5 6 7 8 9 10 11 12
// Copyright (C) 2017  Nexedi SA and Contributors.
//                     Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 2, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
13
// XXX partly based on code from ZODB ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
14
// TODO link to format in zodb/py
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
15 16 17 18 19 20 21 22 23 24 25 26 27

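// Package fs1 implements FileStorage v1 - a ZODB storage which keeps data in an
// append-only file organized as transactional log.  XXX text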
package fs1

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"

	"../../zodb"
)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
28 29
// FileStorage is a ZODB storage which stores data in simple append-only file
// organized as transactional log.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
30
type FileStorage struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
31 32
	file	*os.File
	index	*fsIndex	// oid -> data record position in transaction which last changed oid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
33
	topPos	int64		// position pointing just past last committed transaction
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
34
				// (= size(.file) when no commit is in progress)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
35 36 37

	// min/max tids committed
	tidMin, tidMax	zodb.Tid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
38 39
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
40
// IStorage	XXX move ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
41 42
var _ zodb.IStorage = (*FileStorage)(nil)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
43
// TxnHeader represents header of a transaction record
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
44
type TxnHeader struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
45
	Pos	int64	// position of transaction start
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
46
	LenPrev	int64	// whole previous transaction record length
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
47 48
			// (-1 if there is no previous txn record) XXX see rules in Load
	Len	int64	// whole transaction record length	   XXX see rules in Load
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
49

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
50
	// transaction metadata itself
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
51 52
	zodb.TxnInfo

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
53
	// underlying memory for header loading and for user/desc/extension strings
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
54
	workMem	[]byte
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
55 56 57 58
}

// DataHeader represents header of a data record
type DataHeader struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
59
	Pos		int64	// position of data record start
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
60 61
	Oid             zodb.Oid
	Tid             zodb.Tid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
62
	// XXX -> .PosPrevRev  .PosTxn  .LenData
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
63
	PrevRevPos	int64	// position of this oid's previous-revision data record	XXX naming
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
64
	TxnPos          int64	// position of transaction record this data record belongs to
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
65
	//_		uint16	// 2-bytes with zero values. (Was version length.)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
66
	DataLen		int64	// length of following data. if 0 -> following = 8 bytes backpointer
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
67 68 69
				// if backpointer == 0 -> oid deleted
	//Data            []byte
	//DataRecPos      uint64  // if Data == nil -> byte position of data record containing data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
70 71

	// XXX include word0 ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
72 73
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
74
const (
	// Magic is the signature every FileStorage file starts with.
	Magic = "FS21"

	// on-disk sizes

	// TxnHeaderFixSize is the size of the fixed part of a transaction
	// header, i.e. without user/desc/ext strings:
	// tid(8) + len(8) + status(1) + len(user)(2) + len(desc)(2) + len(ext)(2).
	TxnHeaderFixSize = 8 + 8 + 1 + 2 + 2 + 2

	// txnXHeaderFixSize is TxnHeaderFixSize together with the 8-byte
	// trailing length (LenPrev) of the previous record.
	txnXHeaderFixSize = 8 + TxnHeaderFixSize

	// DataHeaderSize is the size of a data record header:
	// oid(8) + tid(8) + prevRevPos(8) + txnPos(8) + verlen(2) + dataLen(8).
	DataHeaderSize = 8 + 8 + 8 + 8 + 2 + 8

	// txn/data positions that are < vvv are for sure invalid

	// txnValidFrom is the first position a transaction record can start at
	// (right after the magic).
	txnValidFrom = int64(len(Magic))

	// dataValidFrom is the first position a data record can start at
	// (after magic and one fixed-size transaction header).
	dataValidFrom = txnValidFrom + TxnHeaderFixSize
)

// ErrTxnRecord is returned on transaction record read / decode errors
type ErrTxnRecord struct {
	Pos  int64  // position of transaction record
	Subj string // about what .Err is
	Err  error  // actual error
}

// Error renders the error as "transaction record @<pos>: <subj>: <err>".
func (e *ErrTxnRecord) Error() string {
	pos, subj, cause := e.Pos, e.Subj, e.Err
	return fmt.Sprintf("transaction record @%v: %v: %v", pos, subj, cause)
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
97

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
98
// err creates ErrTxnRecord for transaction located at txnh.Pos
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
99
func (txnh *TxnHeader) err(subj string, err error) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
100 101 102
	return &ErrTxnRecord{txnh.Pos, subj, err}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
103

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
104
// ErrDataRecord is returned on data record read / decode errors
type ErrDataRecord struct {
	Pos  int64  // position of data record
	Subj string // about what .Err is
	Err  error  // actual error
}

// Error renders the error as "data record @<pos>: <subj>: <err>".
func (e *ErrDataRecord) Error() string {
	pos, subj, cause := e.Pos, e.Subj, e.Err
	return fmt.Sprintf("data record @%v: %v: %v", pos, subj, cause)
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
115 116
// err creates ErrDataRecord for data record located at dh.Pos
// XXX add link to containing txn? (check whether we can do it on data access) ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
117
func (dh *DataHeader) err(subj string, err error) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
118 119 120
	return &ErrDataRecord{dh.Pos, subj, err}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
121 122 123 124 125 126 127

// xerr is implemented by things that can wrap an error with their own context.
//
// TxnHeader and DataHeader implement it, so that shared helpers can create
// errors which carry the position of the record they were raised for.
type xerr interface {
	// err returns err annotated with subj (what err is about) and with
	// the context of the implementer.
	err(subj string, err error) error
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
128
// errf is syntactic shortcut for err and fmt.Errorf
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
129 130
func errf(e xerr, subj, format string, a ...interface{}) error {
	return e.err(subj, fmt.Errorf(format, a...))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
131 132
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
133
// decodeErr is syntactic shortcut for errf("decode", ...)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
134
// TODO in many places "decode" -> "selfcheck"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
135 136
func decodeErr(e xerr, format string, a ...interface{}) error {
	return errf(e, "decode", format, a...)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
137
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
138

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
139
// bug panics with errf("bug", ...)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
140 141
func bug(e xerr, format string, a ...interface{}) {
	panic(errf(e, "bug", format, a...))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
142
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
143

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
144

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
145 146 147 148 149 150 151 152
// noEOF returns err, but changes io.EOF -> io.ErrUnexpectedEOF.
//
// Every other value, including nil, is passed through untouched.
func noEOF(err error) error {
	switch err {
	case io.EOF:
		return io.ErrUnexpectedEOF
	default:
		return err
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
153

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
154 155 156
// TxnLoadFlags are flags for TxnHeader.Load
type TxnLoadFlags int

// NOTE every flag is given its type explicitly: a constant declared with
// `= value` and no type is untyped, not TxnLoadFlags.
const (
	LoadAll       TxnLoadFlags = 0x00 // load whole transaction header
	LoadNoStrings TxnLoadFlags = 0x01 // do not load user/desc/ext strings
)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
161
// Load reads and decodes transaction record header
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
162
// pos: points to transaction start
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
163
// no prerequisite requirements are made to previous txnh state
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
164
// TODO describe what happens at EOF and when .LenPrev is still valid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
165 166 167
//
// rules for Len/LenPrev returns:
// Len ==  0			transaction header could not be read
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
168
// Len == -1			EOF forward
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
169 170
// Len >= TxnHeaderFixSize	transaction was read normally
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
171
// LenPrev == 0			prev record length could not be read
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
172 173
// LenPrev == -1		EOF backward
// LenPrev >= TxnHeaderFixSize	LenPrev was read/checked normally
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
174
func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLoadFlags) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
175 176 177
	if cap(txnh.workMem) < txnXHeaderFixSize {
		txnh.workMem = make([]byte, txnXHeaderFixSize)	// XXX or 0, ... ?
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
178
	work := txnh.workMem[:txnXHeaderFixSize]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
179

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
180
	// XXX recheck rules about error exit
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
181
	txnh.Pos = pos
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
182 183
	txnh.Len = 0		// read error
	txnh.LenPrev = 0	// read error
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
184 185

	if pos < txnValidFrom {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
186
		bug(txnh, "Load() on invalid position")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
187 188 189 190
	}

	var n int
	var err error
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
191

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
192
	if pos - 8 >= txnValidFrom {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
193
		// read together with previous's txn record redundant length
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
194
		n, err = r.ReadAt(work, pos - 8)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
195 196
		n -= 8	// relative to pos
		if n >= 0 {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
197 198
			lenPrev := 8 + int64(binary.BigEndian.Uint64(work[8-8:]))
			if lenPrev < TxnHeaderFixSize {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
199
				return decodeErr(txnh, "invalid prev record length: %v", lenPrev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
200 201 202
			}
			posPrev := txnh.Pos - lenPrev
			if posPrev < txnValidFrom {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
203
				return decodeErr(txnh, "prev record length goes beyond valid area: %v", lenPrev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
204 205
			}
			if posPrev < txnValidFrom + TxnHeaderFixSize && posPrev != txnValidFrom {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
206
				return decodeErr(txnh, "prev record does not land exactly at valid area start: %v", posPrev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
207
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
208
			txnh.LenPrev = lenPrev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
209
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
210
	} else {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
211
		// read only current txn without previous record length
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
212
		n, err = r.ReadAt(work[8:], pos)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
213
		txnh.LenPrev = -1	// EOF backward
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
214
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
215 216


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
217
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
218
		if err == io.EOF && n == 0 {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
219 220
			txnh.Len = -1	// EOF forward
			return err	// end of stream
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
221 222
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
223
		// EOF after txn header is not good - because at least
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
224
		// redundant length should be also there
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
225
		return txnh.err("read", noEOF(err))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
226 227
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
228
	txnh.Tid = zodb.Tid(binary.BigEndian.Uint64(work[8+0:]))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
229
	if !txnh.Tid.Valid() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
230
		return decodeErr(txnh, "invalid tid: %v", txnh.Tid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
231
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
232

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
233 234
	tlen := 8 + int64(binary.BigEndian.Uint64(work[8+8:]))
	if tlen < TxnHeaderFixSize {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
235
		return decodeErr(txnh, "invalid txn record length: %v", tlen)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
236
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
237
	// XXX also check tlen to not go beyond file size ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
238 239 240 241
	txnh.Len = tlen

	txnh.Status = zodb.TxnStatus(work[8+16])
	if !txnh.Status.Valid() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
242
		return decodeErr(txnh, "invalid status: %v", txnh.Status)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
243 244
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
245

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
246 247 248
        luser := binary.BigEndian.Uint16(work[8+17:])
	ldesc := binary.BigEndian.Uint16(work[8+19:])
	lext  := binary.BigEndian.Uint16(work[8+21:])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
249

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
250
	// NOTE we encode whole strings length into len(.workMem)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
251
	lstr := int(luser) + int(ldesc) + int(lext)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
252 253
	if cap(txnh.workMem) < lstr {
		txnh.workMem = make([]byte, lstr)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
254
	} else {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
255
		txnh.workMem = txnh.workMem[:lstr]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
256 257
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
258 259
	// NOTE we encode each x string length into cap(x)
	//      and set len(x) = 0 to indicate x is not loaded yet
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
260 261 262 263 264 265
	//println("workmem len:", len(txnh.workMem), "cap:", cap(txnh.workMem))
	//println("luser:", luser)
	//println("ldesc:", ldesc)
	//println("lext: ", lext)
	xdesc := luser + ldesc
	xext  := xdesc + lext
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
266
	txnh.User	 = txnh.workMem[0:0:luser]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
267 268
	txnh.Description = txnh.workMem[luser:luser:xdesc]
	txnh.Extension	 = txnh.workMem[xdesc:xdesc:xext]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
269

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
270 271
	if flags & LoadNoStrings == 0 {
		err = txnh.loadStrings(r)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
272
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
273

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
274
	return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
275 276
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
277
// loadStrings makes sure strings that are part of transaction header are loaded
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
278
func (txnh *TxnHeader) loadStrings(r io.ReaderAt /* *os.File */) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
279 280
	// XXX make it no-op if strings are already loaded?

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
281
	// we rely on Load leaving len(workMem) = sum of all strings length ...
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
282
	_, err := r.ReadAt(txnh.workMem, txnh.Pos + TxnHeaderFixSize)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
283
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
284
		return txnh.err("read strings", noEOF(err))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
285 286 287 288 289
	}

	// ... and presetting x to point to appropriate places in .workMem .
	// so set len(x) = cap(x) to indicate strings are now loaded.
	txnh.User = txnh.User[:cap(txnh.User)]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
290 291
	txnh.Description = txnh.Description[:cap(txnh.Description)]
	txnh.Extension = txnh.Extension[:cap(txnh.Extension)]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
292 293

	return nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
294 295
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
296
// LoadPrev reads and decodes previous transaction record header
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
297
// prerequisite: txnh .Pos and .LenPrev are initialized:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
298 299
//   - by successful call to Load() initially			XXX but EOF also works
//   - by subsequent successful calls to LoadPrev / LoadNext	XXX recheck
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
300
func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
301
	lenPrev := txnh.LenPrev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
302 303
	switch lenPrev {	// XXX recheck states for: 1) LenPrev load error  2) EOF
	case 0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
304
		bug(txnh, "LoadPrev() when .LenPrev == error")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
305
	case -1:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
306
		return io.EOF
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
307 308
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
309
	// here we know: Load already checked txnh.Pos - lenPrev to be valid position
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
310 311 312 313 314 315
	err := txnh.Load(r, txnh.Pos - lenPrev, flags)
	if err != nil {
		return err
	}

	if txnh.Len != lenPrev {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
316
		return decodeErr(txnh, "head/tail lengths mismatch: %v, %v", txnh.Len, lenPrev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
317 318 319
	}

	return nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
320
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
321

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
322
// LoadNext reads and decodes next transaction record header
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
323 324 325
// prerequisite: txnh .Pos and .Len should be already initialized by:
//   - previous successful call to Load() initially		XXX ^^^
//   - TODO
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
326
func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
327
	lenCur := txnh.Len
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
328
	posCur := txnh.Pos
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
329 330
	switch lenCur {
	case 0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
331
		bug(txnh, "LoadNext() when .Len == error")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
332 333 334
	case -1:
		return io.EOF
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
335

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
336
	err := txnh.Load(r, txnh.Pos + lenCur, flags)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
337 338 339 340

	// before checking loading error for next txn, let's first check redundant length
	// NOTE also: err could be EOF
	if txnh.LenPrev != 0 && txnh.LenPrev != lenCur {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
341 342
		t := &TxnHeader{Pos: posCur} // txn for which we discovered problem
		return decodeErr(t, "head/tail lengths mismatch: %v, %v", lenCur, txnh.LenPrev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
343
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
344 345

	return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
346 347
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
348

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
349 350 351 352 353 354 355 356 357 358 359
// Len returns whole data record length: header plus what follows it.
//
// A record with .DataLen == 0 is followed by 8 bytes
// (back-pointer | oid removal) instead of data.
func (dh *DataHeader) Len() int64 {
	if dh.DataLen != 0 {
		return DataHeaderSize + dh.DataLen
	}

	// XXX -> .DataLen() ?
	return DataHeaderSize + 8
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
360

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
361 362 363
// load reads and decodes data record header
// pos: points to data header start
// no prerequisite requirements are made to previous dh state
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
364
func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[DataHeaderSize]byte) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
365
	dh.Pos = pos
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
366
	// XXX .Len = 0		= read error ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
367

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
368
	if pos < dataValidFrom {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
369
		bug(dh, "Load() on invalid position")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
370
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
371

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
372
	_, err := r.ReadAt(tmpBuf[:], pos)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
373
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
374
		return dh.err("read", noEOF(err))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
375 376 377
	}

	// XXX also check oid.Valid() ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
378 379
	dh.Oid = zodb.Oid(binary.BigEndian.Uint64(tmpBuf[0:]))	// XXX -> zodb.Oid.Decode() ?
	dh.Tid = zodb.Tid(binary.BigEndian.Uint64(tmpBuf[8:]))	// XXX -> zodb.Tid.Decode() ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
380
	if !dh.Tid.Valid() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
381
		return decodeErr(dh, "invalid tid: %v", dh.Tid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
382 383 384
	}

	dh.PrevRevPos = int64(binary.BigEndian.Uint64(tmpBuf[16:]))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
385
	dh.TxnPos = int64(binary.BigEndian.Uint64(tmpBuf[24:]))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
386
	if dh.TxnPos < txnValidFrom {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
387
		return decodeErr(dh, "invalid txn position: %v", dh.TxnPos)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
388 389 390
	}

	if dh.TxnPos + TxnHeaderFixSize > pos {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
391
		return decodeErr(dh, "txn position not decreasing: %v", dh.TxnPos)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
392
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
393 394 395 396 397 398 399
	if dh.PrevRevPos != 0 {	// zero means there is no previous revision
		if dh.PrevRevPos < dataValidFrom {
			return decodeErr(dh, "invalid prev revision position: %v", dh.PrevRevPos)
		}
		if dh.PrevRevPos + DataHeaderSize > dh.TxnPos - 8 {
			return decodeErr(dh, "prev revision position (%v) overlaps with txn (%v)", dh.PrevRevPos, dh.TxnPos)
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
400 401
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
402
	verlen := binary.BigEndian.Uint16(tmpBuf[32:])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
403
	if verlen != 0 {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
404
		return decodeErr(dh, "non-zero version: #%v", verlen)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
405 406
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
407
	dh.DataLen = int64(binary.BigEndian.Uint64(tmpBuf[34:]))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
408 409
	if dh.DataLen < 0 {
		// XXX also check DataLen < max ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
410
		return decodeErr(dh, "invalid data len: %v", dh.DataLen)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
411
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
412

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
413 414 415
	return nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
416 417 418 419
// XXX do we need Load when load() is there?
func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
	var tmpBuf [DataHeaderSize]byte
	return dh.load(r, pos, &tmpBuf)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
420 421
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
422
// LoadPrevRev reads and decodes previous revision data record header
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
423 424 425 426 427 428 429 430
// prerequisite: dh .Oid .Tid .PrevRevPos are initialized:
//   - TODO describe how
// when there is no previous revision: io.EOF is returned
func (dh *DataHeader) LoadPrevRev(r io.ReaderAt /* *os.File */) error {
	if dh.PrevRevPos == 0 {	// XXX -> -1 ?
		return io.EOF	// no more previous revisions
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
431 432 433 434 435 436 437 438 439 440 441 442 443 444 445
	posCur := dh.Pos

	err := dh.loadPrevRev(r)
	if err != nil {
		// data record @...: loading prev rev: data record @...: ...
		err = &ErrDataRecord{posCur, "loading prev rev", err}
	}
	return err
}

func (dh *DataHeader) loadPrevRev(r io.ReaderAt /* *os.File */) error {
	oid := dh.Oid
	tid := dh.Tid

	err := dh.Load(r, dh.PrevRevPos)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
446 447 448 449 450
	if err != nil {
		return err
	}

	if dh.Oid != oid {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
451 452
		// XXX vvv valid only if ErrDataRecord prints oid
		return decodeErr(dh, "oid mismatch")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
453 454 455
	}

	if dh.Tid >= tid {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
456 457
		// XXX vvv valid only if ErrDataRecord prints tid
		return decodeErr(dh, "tid mismatch")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
458 459 460 461 462
	}

	return nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
463 464
// LoadBack reads and decodes data header for revision linked via back-pointer
// prerequisite: dh XXX     .DataLen == 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
465
func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
466 467 468 469 470
	if dh.DataLen != 0 {
		bug(dh, "LoadBack() on non-backpointer data header")
	}

	var xxx [8]byte	// XXX escapes ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
471
	_, err := r.ReadAt(xxx[:], dh.Pos + DataHeaderSize)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
472 473 474 475
	if err != nil {
		return dh.err("read data", noEOF(err))
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
476
	backPos := int64(binary.BigEndian.Uint64(xxx[:]))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
477 478 479 480 481 482 483 484
	if backPos < dataValidFrom {
		return decodeErr(dh, "invalid backpointer: %v", backPos)
	}
	if backPos + DataHeaderSize > dh.TxnPos - 8 {
		return decodeErr(dh, "backpointer (%v) overlaps with txn (%v)", backPos, dh.TxnPos)
	}
	// TODO backPos can be also == 0 - (means deleted rev)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
	posCur := dh.Pos
	tid := dh.Tid

	// TODO compare this with loadPrevRev() way
	err = func() error {
		err := dh.Load(r, backPos)
		if err != nil {
			return err
		}

		// XXX also dh.Oid == oid ?
		//     but in general back pointer might point to record with different oid
		if dh.Tid >= tid {
			return decodeErr(dh, "tid not decreasing")
		}

		return err
	}()

	if err != nil {
		err = &ErrDataRecord{posCur, "loading back rev", err}
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
508
	return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
509
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
510

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544
// LoadNext reads and decodes data header for next data record in the same transaction.
//
// prerequisite: dh .Pos .DataLen are initialized.
// when there is no more data records: io.EOF is returned (as is).
// Any other error is wrapped with the context of txnh and subject "iterating".
//
// XXX NOTE(self): iteration starts with {Pos: txnh.Pos, DataLen: -DataHeaderSize}
func (dh *DataHeader) LoadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) error {
	switch err := dh.loadNext(r, txnh); err {
	case nil, io.EOF:
		return err
	default:
		return txnh.err("iterating", err)
	}
}

func (dh *DataHeader) loadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) error {
	// position of txn tail - right after last data record byte
	txnTailPos := txnh.Pos + txnh.Len - 8

	// NOTE we know nextPos does not overlap txnTailPos - it was checked by
	// previous LoadNext()
	nextPos := dh.Pos + dh.Len()
	if nextPos == txnTailPos {
		return io.EOF
	}

	if nextPos + DataHeaderSize > txnTailPos {
		return &ErrDataRecord{nextPos, "decode", fmt.Errorf("data record header overlaps txn boundary")}	// XXX
	}

	err := dh.Load(r, nextPos)
	if err != nil {
		return err
	}

	if dh.Tid != txnh.Tid {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
545
		return decodeErr(dh, "data.tid != txn.Tid")	// XXX
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
546 547
	}
	if dh.TxnPos != txnh.Pos {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
548
		return decodeErr(dh, "data.txnPos != txn.Pos")	// XXX
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
549 550 551 552 553 554 555 556
	}
	if dh.Pos + dh.Len() > txnTailPos {
		return decodeErr(dh, "data record overlaps txn boundary")	// XXX
	}

	return nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
557

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
558
func Open(path string) (*FileStorage, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
559 560 561 562
	f, err := os.Open(path)	// XXX opens in O_RDONLY
	if err != nil {
		return nil, err	// XXX err more context ?
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
563

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
564
	// check file magic
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
565
	var xxx [len(Magic)]byte
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
566 567 568 569
	_, err = f.ReadAt(xxx[:], 0)
	if err != nil {
		return nil, err	// XXX err more context
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
570
	if string(xxx[:]) != Magic {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
571 572
		return nil, fmt.Errorf("%s: invalid magic %q", path, xxx)	// XXX err?
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
573

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
574 575 576 577 578 579 580
	// TODO recreate index if missing / not sane
	// TODO verify index sane / topPos matches
	topPos, index, err := LoadIndexFile(path + ".index")
	if err != nil {
		panic(err)	// XXX err
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
581 582
	// read tidMin/tidMax
	// FIXME support empty file case
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
583
	var txnhMin, txnhMax TxnHeader
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
584
	err = txnhMin.Load(f, txnValidFrom, LoadAll)	// XXX txnValidFrom here -> ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
585 586 587
	if err != nil {
		return nil, err	// XXX +context
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
588
	err = txnhMax.Load(f, topPos, LoadAll)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
589 590 591 592 593
	// expect EOF but .LenPrev must be good
	if err != io.EOF {
		if err == nil {
			err = fmt.Errorf("no EOF after topPos")	// XXX err context
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
594 595
		return nil, err	// XXX +context
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
596 597 598
	if txnhMax.LenPrev <= 0 {
		panic("could not read LenPrev @topPos")	// XXX err
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
599

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
600
	err = txnhMax.LoadPrev(f, LoadAll)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
601
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
602
		panic(err)	// XXX
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
603 604
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
605 606

	return &FileStorage{
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
607
			file: f,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
608 609
			index: index,
			topPos: topPos,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
610 611
			tidMin: txnhMin.Tid,
			tidMax: txnhMax.Tid,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
612
		}, nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
613 614
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
615 616

func (fs *FileStorage) LastTid() zodb.Tid {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
617 618 619
	// XXX check we have transactions at all
	// XXX what to return then?
	return fs.tidMax
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
620 621
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
622 623 624
// ErrXidLoad is returned when there is an error while loading xid
type ErrXidLoad struct {
	Xid	zodb.Xid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
625 626 627
	Err	error
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
628 629
func (e *ErrXidLoad) Error() string {
	return fmt.Sprintf("loading %v: %v", e.Xid, e.Err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
630 631
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
632
func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
633
	// lookup in index position of oid data record within latest transaction who changed this oid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
634
	dataPos, ok := fs.index.Get(xid.Oid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
635
	if !ok {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
636
		// XXX drop oid from ErrOidMissing ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
637
		return nil, zodb.Tid(0), &ErrXidLoad{xid, zodb.ErrOidMissing{Oid: xid.Oid}}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
638 639
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
640
	dh := DataHeader{Oid: xid.Oid, Tid: zodb.TidMax, PrevRevPos: dataPos}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
641 642 643 644
	tidBefore := xid.XTid.Tid
	if !xid.XTid.TidBefore {
		tidBefore++	// XXX recheck this is ok wrt overflow
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
645

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
646
	// search backwards for when we first have data record with tid satisfying xid.XTid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
647 648
	for dh.Tid >= tidBefore {
		err = dh.LoadPrevRev(fs.file)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
649
		if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
650 651 652 653
			if err == io.EOF {
				// no such oid revision
				err = &zodb.ErrXidMissing{Xid: xid}
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
654
			return nil, zodb.Tid(0), &ErrXidLoad{xid, err}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
655 656 657
		}
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
658 659 660 661 662 663
	// found dh.Tid < tidBefore; check it really satisfies xid.XTid
	if !xid.XTid.TidBefore && dh.Tid != xid.XTid.Tid {
		// XXX unify with ^^^
		return nil, zodb.Tid(0), &ErrXidLoad{xid, &zodb.ErrXidMissing{Xid: xid}}
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
664 665 666 667
	// even if we will scan back via backpointers, the tid returned should
	// be of first-found transaction
	tid = dh.Tid

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
668 669
	// scan via backpointers
	for dh.DataLen == 0 {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
670
		err = dh.LoadBack(fs.file)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
671
		if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
672
			panic(err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
673
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
674 675
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
676
	// now read actual data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
677
	data = make([]byte, dh.DataLen)	// TODO -> slab ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
678
	_, err = fs.file.ReadAt(data, dh.Pos + DataHeaderSize)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
679
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
680
		return nil, zodb.Tid(0), &ErrXidLoad{xid, noEOF(err)}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
681 682
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
683
	return data, tid, nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
684 685 686 687
}

func (fs *FileStorage) Close() error {
	// TODO dump index
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
688
	err := fs.file.Close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
689 690 691
	if err != nil {
		return err
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
692
	fs.file = nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
693 694 695 696 697 698 699 700
	return nil
}

// StorageName returns the fixed name identifying this storage type.
func (fs *FileStorage) StorageName() string {
	return "FileStorage v1"
}


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
701 702
// txnIter is iterator over transactions
type txnIter struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
703
	fs *FileStorage
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
704

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
705
	Txnh	TxnHeader	// current transaction information
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
706 707 708
	TidStop	zodb.Tid	// iterate up to tid <= tidStop | tid >= tidStop depending on .dir

	Dir	int		// iterate forward (> 0) / backward (< 0) / EOF reached (== 0)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
709 710
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
711 712 713 714 715
func (fi *txnIter) NextTxn(flags TxnLoadFlags) error {
	if fi.Dir == 0 {
		return io.EOF
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
716
	// XXX from what we start? how to yield 1st elem?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
717 718 719 720 721 722 723 724 725
	var err error

	if fi.Dir > 0 {
		err = fi.Txnh.LoadNext(fi.fs.file, flags)
	} else {
		// XXX we are ok to get EOF, provided that LenPrev was read ok?
		err = fi.Txnh.LoadPrev(fi.fs.file, flags)
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
726 727 728 729 730
	if err != nil {
		return err
	}

	// how to make sure last good txnh is preserved?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
731 732 733
	if (fi.Dir > 0 && fi.Txnh.Tid > fi.TidStop) ||
	   (fi.Dir < 0 && fi.Txnh.Tid < fi.TidStop) {
		fi.Dir = 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
734 735 736 737 738 739 740
		return io.EOF
	}

	return nil
}


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
741 742
type Iterator struct {
	txnIter txnIter
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
743 744
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
745 746
func (fsi *Iterator) NextTxn(txnInfo *zodb.TxnInfo) (dataIter zodb.IStorageRecordIterator, err error) {
	err = fsi.txnIter.NextTxn(LoadAll)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
747
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
748
		return nil, err	// XXX recheck
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
749 750
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
751
	*txnInfo = fsi.txnIter.Txnh.TxnInfo
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
752 753 754

	// TODO set dataIter

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
755
	return nil /*dataIter*/, nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
756 757 758
}

func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
759 760 761
	if tidMin < fs.tidMin {
		tidMin = fs.tidMin
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
762 763 764 765
	if tidMax > fs.tidMax {
		tidMax = fs.tidMax
	}
	if tidMin > tidMax {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
766 767 768
		// -> XXX empty
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
769 770
	// scan either from file start or end, depending which way it is likely closer, to tidMin
	iter := txnIter{fs: fs}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
771

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
772 773 774 775 776 777 778
	if (tidMin - fs.tidMin) < (fs.tidMax - tidMin) {
		// XXX recheck how we enter loop
		iter.Dir = +1
		iter.Txnh.Pos = txnValidFrom	// XXX -> txnStartFrom ?
		// XXX .LenPrev = 0
		// XXX .Len = ?
		iter.TidStop = tidMin
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
779
	} else {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
780 781 782 783 784 785
		// XXX recheck how we enter loop
		iter.Dir = -1
		iter.Txnh.Pos = fs.topPos
		// XXX .LenPrev = ?
		// XXX .Len = 0
		iter.TidStop = tidMin
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
786 787
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
788
	var err error
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
789
	for {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
790 791 792 793 794 795
		err = iter.NextTxn(LoadNoStrings)
		// XXX err
		if err == io.EOF {
			err = nil
			break
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
796 797
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
798 799 800
	if err != nil {
		panic(err)	// XXX
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
801

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
802 803 804 805 806 807 808 809 810 811 812 813
	fmt.Printf("tidRange: %v..%v -> found %v @%v", tidMin, tidMax, iter.Txnh.Tid, iter.Txnh.Pos)
	return nil

//	// prepare to start iterating from found transaction
//	// XXX loadStrings() on first step ?
//	iter.Txnh.Tid
//
//	// txnh should have .Tid <= tidMin but next txn's .Tid is > tidMin
//	posStart := iter.txnPos
//	if t
//
//	return &FileStorageIterator{-1, tidMin, tidMax}	// XXX -1 ok ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
814
}