FileStorage.py 80 KB
Newer Older
1
##############################################################################
matt@zope.com's avatar
matt@zope.com committed
2
#
3 4
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
5
# 
matt@zope.com's avatar
matt@zope.com committed
6 7 8 9 10 11
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
12 13
# 
##############################################################################
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
# 
#  File-based ZODB storage
# 
# Files are arranged as follows.
# 
#   - The first 4 bytes are a file identifier.
#   
#   - The rest of the file consists of a sequence of transaction
#     "records".
# 
# A transaction record consists of:
# 
#   - 8-byte transaction id, which is also a time stamp.
#   
#   - 8-byte transaction record length, not counting the trailing
#     8-byte redundant length (i.e. total record length - 8).
#   
#   - 1-byte status code
#   
#   - 2-byte length of user name
#   
#   - 2-byte length of description 
#   
36
#   - 2-byte length of extension attributes 
37 38 39 40
#   
#   -   user name
#   
#   -   description
41 42
#
#   -   extension attributes
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
# 
#   * A sequence of data records
#   
#   - 8-byte redundant transaction length (the same value as the
#     length field above, i.e. total record length - 8)
# 
# A data record consists of
# 
#   - 8-byte oid.
# 
#   - 8-byte serial, which is a time stamp that matches the
#     transaction timestamp.
# 
#   - 8-byte previous-record file-position.
# 
#   - 8-byte beginning of transaction record file position.
# 
#   - 2-byte version length
# 
#   - 8-byte data length
# 
#   ? 8-byte position of non-version data
#     (if version length > 0)
# 
#   ? 8-byte position of previous record in this version
#     (if version length > 0)
# 
#   ?   version string 
#     (if version length > 0)
# 
#   ?   data
#     (data length > 0)
# 
#   ? 8-byte position of data record containing data
#     (data length == 0)
# 
# Note that the lengths and positions are all big-endian.
# Also, the object ids and time stamps are big-endian, so comparing
# them as strings is meaningful.
# 
# Version handling
# 
#   There isn't a separate store for versions.  Each record has a
#   version field, indicating what version it is in.  The records in a
#   version form a linked list.  Each record that has a non-empty
#   version string has a pointer to the previous record in the version.
#   Version back pointers are retained *even* when versions are
#   committed or aborted or when transactions are undone.
# 
#   There is a notion of "current" version records, which are the
#   records in a version that are the current records for their
#   respective objects.  When a version is committed, the current records
#   are committed to the destination version.  When a version is
#   aborted, the current records are aborted.
# 
#   When committing or aborting, we search backward through the linked
#   list until we find a record for an object that does not have a
#   current record in the version.  If we find a record for which the
#   non-version pointer is the same as the previous pointer, then we
#   forget that the corresponding object had a current record in the
#   version. This strategy allows us to avoid searching backward through
#   previously committed or aborted version records.
# 
#   Of course, we ignore records in undone transactions when committing
#   or aborting.
#
# Backpointers
#
#   When we commit or abort a version, we don't copy (or delete)
#   any data.  Instead, we write records with back pointers.
#
#   A version record *never* has a back pointer to a non-version
#   record, because we never abort to a version.  A non-version record
#   may have a back pointer to a version record or to a non-version
#   record.
#
118
__version__='$Revision: 1.93 $'[11:-2]
Jim Fulton's avatar
Jim Fulton committed
119

120
import struct, time, os, string, base64, sys
Jim Fulton's avatar
Jim Fulton committed
121
from struct import pack, unpack
122
import POSException
123
from POSException import UndoError, POSKeyError
Jim Fulton's avatar
Jim Fulton committed
124
from TimeStamp import TimeStamp
125
from lock_file import lock_file
126
from utils import t32, p64, U64, cp
127
from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC, register_subsystem
128
# Register the 'ZODB FS' subsystem name that this module's LOG() calls use.
register_subsystem('ZODB FS')
129
import BaseStorage
130
from cPickle import Pickler, Unpickler, loads
131
import ConflictResolution
Jim Fulton's avatar
Jim Fulton committed
132

133 134 135 136 137 138
try:
    from fsIndex import fsIndex
except ImportError:
    # Fall back to a plain dictionary when the fsIndex module is missing.
    def fsIndex():
        return {}

try:
    from posix import fsync
except ImportError:
    # The posix module, or fsync within it, is not available here.
    fsync = None

Jeremy Hylton's avatar
Jeremy Hylton committed
142
from types import StringType
143

Jim Fulton's avatar
Jim Fulton committed
144
# Eight zero bytes: the zero value for the 8-byte oid, serial, position
# and length fields used throughout the file format.
z64 = '\0' * 8

# constants to support various header sizes
# Transaction header: tid(8) + length(8) + status(1) + lengths of the
# user name, description and extension data (2 each).
TRANS_HDR_LEN = 8 + 8 + 1 + 2 + 2 + 2
# Data record header: oid, serial, prev, tloc (8 each) + vlen(2) + plen(8).
DATA_HDR_LEN = 8 * 4 + 2 + 8
# Version records add the non-version data and previous-version-record
# positions (8 each).
DATA_VERSION_HDR_LEN = DATA_HDR_LEN + 8 + 8
Jim Fulton's avatar
Jim Fulton committed
149

150 151
def warn(message, *data):
    """Log a WARNING under 'ZODB FS', prefixed with packed_version.

    message is %-formatted with data only when data is given (as the
    standard logging module does), so a pre-formatted message that
    happens to contain '%' is logged as-is instead of raising TypeError.
    """
    if data:
        message = message % data
    LOG('ZODB FS', WARNING, "%s  warn: %s\n" % (packed_version, message))
Jim Fulton's avatar
Jim Fulton committed
152

153 154
def error(message, *data):
    """Log an ERROR under 'ZODB FS', prefixed with packed_version.

    message is %-formatted with data only when data is given, so a
    pre-formatted message containing '%' is logged as-is.
    """
    if data:
        message = message % data
    LOG('ZODB FS', ERROR, "%s ERROR: %s\n" % (packed_version, message))
Jim Fulton's avatar
Jim Fulton committed
155

156 157 158
def nearPanic(message, *data):
    """Log at PANIC level under 'ZODB FS' without raising.

    message is %-formatted with data only when data is given, so a
    pre-formatted message containing '%' is logged as-is.
    """
    if data:
        message = message % data
    LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version, message))

159
def panic(message, *data):
    """Log at PANIC level, then raise CorruptedTransactionError(message).

    message is %-formatted with data only when data is given, so a
    pre-formatted message containing '%' does not turn the intended
    CorruptedTransactionError into a TypeError.
    """
    if data:
        message = message % data
    LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version, message))
    raise CorruptedTransactionError(message)
Jim Fulton's avatar
Jim Fulton committed
163

164
class FileStorageError(POSException.StorageError):
    """Base class for the errors defined by this module."""
Jim Fulton's avatar
Jim Fulton committed
165

166
class FileStorageFormatError(FileStorageError):
    """Invalid file format

    The given file is not in a format this storage can read.
    """

class CorruptedFileStorageError(FileStorageError,
                                POSException.StorageSystemError):
    """Corrupted file storage

    Base class for errors signalling inconsistent data-file contents.
    """

class CorruptedTransactionError(CorruptedFileStorageError):
    """A transaction record is corrupted (this is what panic() raises)."""
class CorruptedDataError(CorruptedFileStorageError):
    """A data record is corrupted, e.g. it holds an unexpected oid."""

180
class FileStorageQuotaError(FileStorageError,
                            POSException.StorageSystemError):
    """File storage quota exceeded

    Raised by FileStorage.store() when the data written so far would
    take the storage past its quota.
    """

185
# File identifier written as the first 4 bytes of a newly created data
# file; also used as the prefix of this module's log messages.
packed_version = 'FS21'
Jim Fulton's avatar
Jim Fulton committed
186

187 188
class FileStorage(BaseStorage.BaseStorage,
                  ConflictResolution.ConflictResolvingStorage):
189
    # NOTE(review): not read anywhere in the visible part of this class;
    # presumably the time stamp of the last pack, with z64 meaning "never
    # packed" -- confirm.
    _packt=z64
Jim Fulton's avatar
Jim Fulton committed
190

191 192
    def __init__(self, file_name, create=0, read_only=0, stop=None,
                 quota=None):
        """Open (or create) the storage kept in file_name.

        file_name -- path of the data file.  Unless read_only, the files
                     file_name + '.lock' and file_name + '.tmp' are also
                     opened (the lock file stays open and locked).
        create -- if true, any existing data file is removed and a new
                  one started.  Forced on when file_name does not exist.
        read_only -- open the data file for reading only; no lock file or
                     temporary file is used.
        stop -- passed to read_index() (defaults to '\\377'*8); only
                allowed with read_only ("time-travel").
        quota -- if not None, store() raises FileStorageQuotaError once
                 the storage would grow past this many bytes.

        Raises ValueError when a read-only storage would have to be
        created, or when stop is given without read_only.
        """
        if not os.path.exists(file_name):
            create = 1

        if read_only:
            self._is_read_only = 1
            if create:
                raise ValueError("can't create a read-only file")
        elif stop is not None:
            raise ValueError("time-travel is only supported in read-only mode")

        if stop is None:
            stop = '\377' * 8

        # Lock the database and set up the temp file.
        if not read_only:
            try:
                f = open(file_name + '.lock', 'r+')
            except IOError:
                # No lock file yet (or it can't be reopened): create one.
                f = open(file_name + '.lock', 'w+')
            lock_file(f)
            try:
                # Best effort: record our pid.  Truncate afterwards so a
                # longer pid left by an earlier process doesn't leave
                # trailing digits behind.
                f.write(str(os.getpid()))
                f.flush()
                f.truncate()
            except (IOError, OSError):
                pass
            self._lock_file = f # so it stays open

            self._tfile = open(file_name + '.tmp', 'w+b')
        else:
            self._tfile = None

        self._file_name = file_name

        BaseStorage.BaseStorage.__init__(self, file_name)

        index, vindex, tindex, tvindex = self._newIndexes()
        self._initIndex(index, vindex, tindex, tvindex)

        # Now open the file
        if create:
            if os.path.exists(file_name):
                os.remove(file_name)
            self._file = open(file_name, 'w+b')
            self._file.write(packed_version)
        else:
            if read_only:
                mode = 'rb'
            else:
                mode = 'r+b'
            self._file = open(file_name, mode)

        # Prefer the saved index; its position, max oid and last tid are
        # handed to read_index() (presumably so it only scans what the
        # saved index doesn't cover -- read_index is not visible here).
        r = self._restore_index()
        if r is not None:
            index, vindex, start, maxoid, ltid = r
            self._initIndex(index, vindex, tindex, tvindex)
            self._pos, self._oid, tid = read_index(
                self._file, file_name, index, vindex, tindex, stop,
                ltid=ltid, start=start, maxoid=maxoid,
                read_only=read_only,
                )
        else:
            self._pos, self._oid, tid = read_index(
                self._file, file_name, index, vindex, tindex, stop,
                read_only=read_only,
                )
        self._ltid = tid

        # Start the time stamp from the last transaction id in the file,
        # unless that id is more than a month ahead of the clock.
        self._ts = tid = TimeStamp(tid)
        t = time.time()
        t = TimeStamp(*(time.gmtime(t)[:5] + (t % 60,)))
        if tid > t:
            warn("%s Database records in the future", file_name)
            if tid.timeTime() - t.timeTime() > 86400*30:
                # a month in the future? This is bogus, use current time
                self._ts = t

        self._quota = quota
Jim Fulton's avatar
Jim Fulton committed
268

269 270 271 272 273 274 275 276
    def _initIndex(self, index, vindex, tindex, tvindex):
        """Install the index mappings on the storage.

        index maps oid -> position of the current data record and vindex
        maps version name -> position of that version's latest record;
        tindex and tvindex hold the same kind of entries for the
        transaction in progress.  The bound get methods of index and
        vindex are kept as _index_get and _vindex_get.
        """
        self._index, self._vindex = index, vindex
        self._tindex, self._tvindex = tindex, tvindex
        self._index_get = index.get
        self._vindex_get = vindex.get

277 278
    def __len__(self):
        """Return the number of oids in the index."""
        index = self._index
        return len(index)
Jim Fulton's avatar
Jim Fulton committed
279

280 281 282
    def _newIndexes(self):
        """Return new, empty (index, vindex, tindex, tvindex) mappings.

        Subclasses may override this to use something other than dicts.
        """
        index, vindex, tindex, tvindex = {}, {}, {}, {}
        return index, vindex, tindex, tvindex
283 284
        
    def abortVersion(self, src, transaction):
        """Abort version src: commitVersion() to '' with abort set."""
        no_version = ''
        return self.commitVersion(src, no_version, transaction, abort=1)
Jim Fulton's avatar
Jim Fulton committed
286

287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303
    def _save_index(self):
        """Write the database index to a file to support quick startup.

        The index, vindex, current file position and oid are pickled into
        a temporary file, which then replaces <name>.index.  If writing
        the pickle fails, the temporary file is closed and removed and the
        error re-raised.  Failure to replace the old index file is ignored
        (there is then simply no saved index for _restore_index() to use).
        """
        index_name = self.__name__ + '.index'
        tmp_name = index_name + '.index_tmp'

        info = {'index': self._index, 'pos': self._pos,
                'oid': self._oid, 'vindex': self._vindex}

        f = open(tmp_name, 'wb')
        try:
            try:
                p = Pickler(f, 1)
                p.dump(info)
                f.flush()
            finally:
                f.close()
        except:
            # Don't leave a half-written temporary index lying around.
            try:
                os.remove(tmp_name)
            except OSError:
                pass
            raise

        try:
            # Remove first: os.rename won't replace an existing file on
            # every platform.
            try:
                os.remove(index_name)
            except OSError:
                pass
            os.rename(tmp_name, index_name)
        except OSError:
            pass

311 312 313
    def _clear_index(self):
        """Remove the saved <name>.index file if it exists (errors ignored)."""
        index_name = self.__name__ + '.index'
        if not os.path.exists(index_name):
            return
        try:
            os.remove(index_name)
        except OSError:
            pass
318

319 320 321 322 323 324 325 326
    def _sane(self, index, pos):
        """Sanity check saved index data by reading the last non-undone trans.

        Starting at pos (the end position saved with the index), walk
        backward over transaction records, skipping undone and empty
        ones, until a transaction with data records is found.  Each of
        its data records must fit inside the transaction, point back at
        it, and be the position index holds for its oid.  Return the id
        of the transaction that ends at pos if every check passes;
        otherwise (including pos < 100 or a file shorter than pos)
        return 0.
        """
        if pos < 100:
            return 0
        f = self._file
        f.seek(0, 2)
        if f.tell() < pos:
            return 0

        last_tid = None
        while 1:
            # The 8 bytes just before pos repeat the length of the
            # transaction that ends there.
            f.seek(pos - 8)
            trailer = f.read(8)
            tlen = U64(trailer)
            pos = pos - tlen - 8
            if pos < 4:
                # would run into the 4-byte file identifier
                return 0
            f.seek(pos)
            tid, stl, status, ul, dl, el = unpack(
                ">8s8scHHH", f.read(TRANS_HDR_LEN))
            if not last_tid:
                last_tid = tid
            if stl != trailer:
                return 0            # inconsistent lengths
            if status == 'u':
                continue            # undone transaction, search back
            if status not in ' p':
                return 0
            hdr_len = TRANS_HDR_LEN + ul + dl + el
            if tlen < hdr_len:
                return 0
            tend = pos + tlen
            rpos = pos + hdr_len
            if rpos == tend:
                continue            # empty transaction, search back

            # Check this transaction's data records against the index.
            while rpos < tend:
                f.seek(rpos)
                oid, serial, sprev, stloc, vlen, splen = unpack(
                    ">8s8s8s8sH8s", f.read(DATA_HDR_LEN))
                tloc = U64(stloc)
                plen = U64(splen)
                # A zero data length means an 8-byte back pointer follows
                # instead of data; version records carry 16 more bytes
                # plus the version name.
                reclen = DATA_HDR_LEN + (plen or 8)
                if vlen:
                    reclen = reclen + 16 + vlen
                if rpos + reclen > tend or tloc != pos:
                    return 0
                if index.get(oid, 0) != rpos:
                    return 0
                rpos = rpos + reclen
            return last_tid

    def _restore_index(self):
        """Load the database index from a file to support quick startup.

        Returns (index, vindex, pos, oid, tid) read from <name>.index when
        the file can be opened and unpickled, has all four expected
        entries, and passes _sane(); otherwise returns None.  The file
        handle is always closed.
        """
        index_name = self.__name__ + '.index'

        try:
            f = open(index_name, 'rb')
        except (IOError, OSError):
            return None

        try:
            try:
                info = Unpickler(f).load()
            except:
                # Any unpickling problem just means the index is unusable.
                exc, err = sys.exc_info()[:2]
                # Pass exc/err as arguments rather than pre-formatting, so
                # a '%' in the exception text can't break warn().
                warn("Failed to load database index: %s: %s", exc, err)
                return None
        finally:
            f.close()

        index = info.get('index')
        pos = info.get('pos')
        oid = info.get('oid')
        vindex = info.get('vindex')
        if index is None or pos is None or oid is None or vindex is None:
            return None
        pos = long(pos)

        tid = self._sane(index, pos)
        if not tid:
            return None

        return index, vindex, pos, oid, tid

Jim Fulton's avatar
Jim Fulton committed
404 405
    def close(self):
        """Close the data, lock and temporary files, then save the index.

        Saving the index is best effort: a failure is logged with warn()
        and otherwise ignored.
        """
        self._file.close()
        if hasattr(self, '_lock_file'):
            self._lock_file.close()
        if self._tfile:
            self._tfile.close()
        try:
            self._save_index()
        except:
            # close() must not fail because of this, but record why.
            exc, err = sys.exc_info()[:2]
            warn("Failed to save database index: %s: %s", exc, err)
Jim Fulton's avatar
Jim Fulton committed
415
        
416
    def commitVersion(self, src, dest, transaction, abort=None):
        """Commit version src to dest, or abort it when abort is true.

        No data is copied; _commitVersion() writes back-pointer records.
        Arguments are validated here, then _commitVersion() runs with the
        storage lock held and its result is returned.
        """
        if self._is_read_only:
            raise POSException.ReadOnlyError()

        names_ok = (src and isinstance(src, StringType)
                    and isinstance(dest, StringType))
        if not names_ok:
            raise POSException.VersionCommitError('Invalid source version')
        if src == dest:
            raise POSException.VersionCommitError(
                "Can't commit to same version: %s" % repr(src))
        if abort and dest:
            raise POSException.VersionCommitError(
                "Internal error, can't abort to a version")
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)

        self._lock_acquire()
        try:
            return self._commitVersion(src, dest, transaction, abort)
        finally:
            self._lock_release()
Jim Fulton's avatar
Jim Fulton committed
440

Jeremy Hylton's avatar
Jeremy Hylton committed
441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
    def _commitVersion(self, src, dest, transaction, abort=None):
        """Write back-pointer records for version src's current records.

        Call only after checking arguments and acquiring the lock.  Walks
        src's version chain backward from its latest record.  For every
        record that is still its object's current record, a new record is
        appended to the temporary file.  The new record's data is an 8-byte
        back pointer: to the non-version data when aborting, otherwise to
        the src record.  When dest is non-empty, the new records are linked
        into dest's version chain.  Returns the list of affected oids.
        """
        dfile = self._file
        tfile = self._tfile
        vpos = self._vindex_get(src, 0)
        vpos_p = p64(vpos)
        # Bytes 24:42 of every record written here: transaction position,
        # length of the destination version name, data length.  We never
        # write data, so the data length is always zero.
        middle = struct.pack(">8sH8s", p64(self._pos), len(dest), z64)

        if dest:
            dest_prev = p64(self._vindex_get(dest, 0))
            # 42-byte header + pnv + pv + version name + back pointer
            reclen = 66 + len(dest)
        else:
            dest_prev = ''
            # 42-byte header + back pointer
            reclen = 50

        here = self._pos + (tfile.tell() + self._thl)
        oids = []
        seen = {}
        last_tloc = None
        last_status = ' '

        while vpos:
            dfile.seek(vpos)
            header = dfile.read(DATA_VERSION_HDR_LEN)
            # header: oid, serial, prev, tloc, vlen, plen, pnv, pv
            oid = header[:8]
            pnv = header[-16:-8]
            if self._index.get(oid) != vpos:
                # A non-current record.  If its object has no current
                # record in this version, we have reached records left by
                # an earlier commit/abort -- unless this record's
                # transaction was undone, in which case keep going.
                if not seen.has_key(oid):
                    tloc = header[24:32]
                    if last_tloc != tloc:
                        # Status byte is at offset 16 of the trans header.
                        last_tloc = tloc
                        dfile.seek(U64(tloc) + 16)
                        last_status = dfile.read(1)
                    if last_status != 'u':
                        break
            else:
                # Current record: add a new record that supersedes it.
                self._tindex[oid] = here
                oids.append(oid)
                # NOTE(review): header[:16] copies the old record's serial
                # rather than the new transaction's; the format notes say
                # a record's serial matches its transaction -- confirm
                # this is intended.
                tfile.write(header[:16] + vpos_p + middle)
                if dest:
                    self._tvindex[dest] = here
                    tfile.write(pnv + dest_prev + dest)
                    dest_prev = p64(here)
                # data back pointer (abort: non-version data; else src data)
                tfile.write(abort and pnv or vpos_p)
                here += reclen
                seen[oid] = 1

            vpos_p = header[-8:]
            vpos = U64(vpos_p)
        return oids
Jim Fulton's avatar
Jim Fulton committed
504 505

    def getSize(self):
        """Return self._pos, where the next transaction record will start."""
        size = self._pos
        return size
506 507 508

    def _loada(self, oid, _index, file):
        """Read any version and return the version.

        Returns (data, version, nv) for oid's current record in file.
        version is '' for a non-version record.  For a version record, nv
        is true when its non-version data pointer is non-zero.  Raises
        POSKeyError for an unknown oid and CorruptedDataError when the
        record found belongs to another oid (the same check _load makes).
        """
        try:
            pos = _index[oid]
        except KeyError:
            raise POSKeyError(oid)
        file.seek(pos)
        read = file.read
        h = read(DATA_HDR_LEN)
        doid, serial, prev, tloc, vlen, plen = unpack(">8s8s8s8sH8s", h)
        if doid != oid:
            raise CorruptedDataError(h)
        if vlen:
            nv = read(8) != z64
            file.seek(8, 1) # Skip previous version record pointer
            version = read(vlen)
        else:
            version = ''
            nv = 0

        if plen != z64:
            return read(U64(plen)), version, nv
        # No inline data: follow the back pointer.
        return _loadBack(file, oid, read(8))[0], version, nv
527

528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
    def getSerial(self, oid):
        """Return the serial of oid's current record.

        Raises KeyError if oid is not in the index, and CorruptedDataError
        if the record at the indexed position belongs to another oid (this
        previously referenced an undefined name and raised NameError).
        """
        self._lock_acquire()
        try:
            pos = self._index[oid]
            self._file.seek(pos)
            # oid, serial, prev, tloc, vlen -- enough to find the version
            h = self._file.read(34)
            if h[:8] != oid:
                raise CorruptedDataError(h)
            vlen = unpack(">H", h[-2:])[0]
            if vlen:
                # If there is a version, find out its name and let
                # _load() do all the work.  This is less efficient
                # than possible, because _load() will load the pickle
                # data.  Being more efficient is too complicated.
                # NOTE(review): _load() given the record's own version
                # appears to return this same header serial (h[8:16]);
                # confirm before simplifying.
                self._file.seek(24, 1) # skip plen, pnv, and pv
                version = self._file.read(vlen)
                pickledata, serial = self._load(oid, version,
                                                self._index, self._file)
                return serial
            return h[8:16]
        finally:
            self._lock_release()
        

553
    def _load(self, oid, version, _index, file):
        """Return (data, serial) for oid's current record in file.

        _index maps oids to record positions in file.  If the current
        record is in a version and `version` is empty or names a different
        version, the result of _loadBack() on the record's non-version
        data pointer is returned instead (presumably the non-version
        (data, serial); _loadBack is not visible here).  Raises
        POSKeyError for an unknown oid and CorruptedDataError when the
        record's oid does not match.
        """
        try:
            pos = _index[oid]
        except KeyError:
            raise POSKeyError(oid)
        file.seek(pos)
        read = file.read
        header = read(DATA_HDR_LEN)
        doid, serial, prev, tloc, vlen, plen = unpack(">8s8s8s8sH8s", header)
        if doid != oid:
            raise CorruptedDataError(header)

        if vlen:
            nv_pos = read(8)  # location of the non-version data
            if not version or len(version) != vlen:
                return _loadBack(file, oid, nv_pos)
            # Skip the previous-version-record link, then compare names.
            if read(8) and version != read(vlen):
                return _loadBack(file, oid, nv_pos)

        # Either this was not a version record, or the requested version
        # matched and we have already read past the version data.
        if plen != z64:
            return read(U64(plen)), serial
        # No inline data: follow the back pointer.  Report the current
        # serial, since that is the one that will get checked on store.
        return _loadBack(file, oid, read(8))[0], serial
Jim Fulton's avatar
Jim Fulton committed
578 579

    def load(self, oid, version, _stuff=None):
        """Return _load() of oid/version on the main file, under the lock."""
        self._lock_acquire()
        try:
            result = self._load(oid, version, self._index, self._file)
        finally:
            self._lock_release()
        return result
585 586

    def loadSerial(self, oid, serial):
        """Return the data of the revision of oid whose serial is serial.

        Follows the chain of previous-record pointers back from the current
        record.  Raises POSKeyError(oid) for an unknown oid,
        POSKeyError(serial) when no revision has that serial, and
        CorruptedDataError for a record belonging to another oid.
        """
        self._lock_acquire()
        try:
            f = self._file
            try:
                pos = self._index[oid]
            except KeyError:
                raise POSKeyError(oid)

            while 1:
                f.seek(pos)
                header = f.read(DATA_HDR_LEN)
                doid, dserial, prev, tloc, vlen, plen = unpack(
                    ">8s8s8s8sH8s", header)
                if doid != oid:
                    raise CorruptedDataError(header)
                if dserial == serial:
                    break  # found it
                # Keep looking for serial in older revisions.
                pos = U64(prev)
                if not pos:
                    raise POSKeyError(serial)

            if vlen:
                f.read(8)     # location of non-version data (unused)
                f.read(8)     # previous record in this version
                f.read(vlen)  # version name

            if plen != z64:
                return f.read(U64(plen))

            # We got a backpointer, probably from a commit.
            return _loadBack(f, oid, f.read(8))[0]
        finally:
            self._lock_release()
Jim Fulton's avatar
Jim Fulton committed
619 620
                    
    def modifiedInVersion(self, oid):
        """Return the version of oid's current record, or '' if none.

        Raises POSKeyError for an unknown oid and CorruptedDataError (with
        the record position) if the record belongs to another oid.
        """
        self._lock_acquire()
        try:
            try:
                pos = self._index[oid]
            except KeyError:
                raise POSKeyError(oid)
            f = self._file
            f.seek(pos)
            doid, serial, prev, tloc, vlen = unpack(">8s8s8s8sH", f.read(34))
            if doid != oid:
                raise CorruptedDataError(pos)
            if not vlen:
                return ''
            f.seek(24, 1)  # skip plen, pnv, and pv
            return f.read(vlen)
        finally:
            self._lock_release()
Jim Fulton's avatar
Jim Fulton committed
638

Jim Fulton's avatar
Jim Fulton committed
639
    def store(self, oid, serial, data, version, transaction):
        """Write a new revision of oid to the current transaction.

        serial is the serial the caller's change is based on.  If it
        differs from the current record's serial, conflict resolution is
        tried and ConflictError raised if it fails.  If the current record
        is in a version, only that same version may be stored
        (VersionLockError otherwise).  The record goes to the temporary
        file; FileStorageQuotaError is raised if the storage would exceed
        its quota.  ReadOnlyError and StorageTransactionError guard misuse.

        Returns the new serial, or ConflictResolution.ResolvedSerial when
        the data came from conflict resolution.
        """
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)

        self._lock_acquire()
        try:
            old = self._index_get(oid, 0)
            pnv = None
            if not old:
                oserial = serial
            else:
                dfile = self._file
                dfile.seek(old)
                header = dfile.read(DATA_HDR_LEN)
                doid, oserial, sprev, stloc, vlen, splen = unpack(
                    ">8s8s8s8sH8s", header)
                if doid != oid:
                    raise CorruptedDataError(header)
                if vlen:
                    pnv = dfile.read(8)   # non-version data pointer
                    dfile.read(8)         # skip past version link
                    locked_version = dfile.read(vlen)
                    if version != locked_version:
                        raise POSException.VersionLockError(
                            repr(oid), locked_version)
                if serial != oserial:
                    data = self.tryToResolveConflict(oid, oserial, serial,
                                                     data)
                    if not data:
                        raise POSException.ConflictError(
                            oid=oid, serials=(oserial, serial))

            tfile = self._tfile
            pos = self._pos
            here = pos + (tfile.tell() + self._thl)
            self._tindex[oid] = here
            newserial = self._serial
            tfile.write(pack(">8s8s8s8sH8s",
                             oid, newserial, p64(old), p64(pos),
                             len(version), p64(len(data))))
            if version:
                # Non-version data pointer: inherited from the old version
                # record, else the old (non-version) record itself.
                tfile.write(pnv or p64(old))
                # Link to the last record for this version, preferring one
                # written earlier in this transaction.
                tvindex = self._tvindex
                pv = tvindex.get(version, 0) or self._vindex_get(version, 0)
                tfile.write(p64(pv))
                tvindex[version] = here
                tfile.write(version)

            tfile.write(data)

            # Check quota
            quota = self._quota
            if quota is not None and pos + (tfile.tell() + self._thl) > quota:
                raise FileStorageQuotaError(
                    'The storage quota has been exceeded.')

            return (serial == oserial and newserial
                    or ConflictResolution.ResolvedSerial)
        finally:
            self._lock_release()
706

707 708 709 710 711 712 713 714 715 716 717 718
    def restore(self, oid, serial, data, version, transaction):
        """Write a record for oid with a known-good serial and data.

        A lot like store() but without all the consistency checks.  This
        should only be used when we /know/ the data is good, hence the
        method name.  While the signature looks like store() there are
        some differences:

        - serial is the serial number of /this/ revision, not of the
          previous revision.  It is used instead of self._serial, which
          is ignored.

        - Nothing is returned.

        - data can be None, which indicates a George Bailey object
          (i.e. one whose creation has been transactionally undone).

        Raises ReadOnlyError, StorageTransactionError, or
        CorruptedDataError if the previous record belongs to another oid.
        """
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)

        self._lock_acquire()
        try:
            # Position of the previous revision's non-version data, when
            # that revision is itself in a version.
            pnv = None
            # We need some information about the previous revision: where
            # its non-version data is (if it is in a version).
            old = self._index_get(oid, 0)
            if old:
                self._file.seek(old)
                # Read the previous revision record
                h = self._file.read(DATA_HDR_LEN)
                doid, oserial, sprev, stloc, vlen, splen = unpack(
                    ">8s8s8s8sH8s", h)
                if doid != oid:
                    raise CorruptedDataError(h)
                if vlen:
                    # Inherit its non-version data pointer, as store() does.
                    pnv = self._file.read(8)
            # Calculate the file position in the temporary file
            here = self._pos + self._tfile.tell() + self._thl
            # And update the temp file index
            self._tindex[oid] = here
            # Write the recovery data record
            if data is None:
                dlen = 0
            else:
                dlen = len(data)
            self._tfile.write(pack('>8s8s8s8sH8s',
                                   oid, serial, p64(old), p64(self._pos),
                                   len(version), p64(dlen)))
            # We need to write some version information if this revision
            # is happening in a version.
            if version:
                # Non-version data pointer: inherited from a previous
                # version record, else the previous (non-version) record.
                if pnv:
                    self._tfile.write(pnv)
                else:
                    self._tfile.write(p64(old))
                # Link to the last record for this version, preferring one
                # written earlier in this transaction.
                pv = self._tvindex.get(version, 0)
                if not pv:
                    pv = self._vindex_get(version, 0)
                self._tfile.write(p64(pv))
                self._tvindex[version] = here
                self._tfile.write(version)
            # And finally, write the data
            if data is None:
                # Write a zero backpointer, which is indication used to
                # represent an un-creation transaction.
                self._tfile.write(z64)
            else:
                self._tfile.write(data)
        finally:
            self._lock_release()

783 784 785 786 787
    def supportsUndo(self):
        """Report (with a true value) that undo() is implemented."""
        supported = 1
        return supported
    
    def supportsVersions(self):
        """Report (with a true value) that versions are supported."""
        supported = 1
        return supported
Jim Fulton's avatar
Jim Fulton committed
788

789
    def _clear_temp(self):
790
        self._tindex.clear()
791
        self._tvindex.clear()
792 793
        if self._tfile is not None:
            self._tfile.seek(0)
794 795

    def _begin(self, tid, u, d, e):
        """Prepare to write a transaction with user u, description d and
        extension data e (tid is not used here).

        Resets the pending end position and records the transaction header
        length: the fixed header plus the three variable-length fields.

        Raises FileStorageError if any of u, d or e is longer than 65535
        bytes, because tpc_vote() stores each of their lengths in a 2-byte
        ("H") field.
        """
        self._nextpos = 0
        header_len = TRANS_HDR_LEN + len(u) + len(d) + len(e)
        self._thl = header_len
        if header_len <= 65535:
            # If the whole header fits, no single field can be too long.
            return
        # We have to check lengths here because struct.pack
        # doesn't raise an exception on overflow!
        for field, complaint in ((u, 'user name too long'),
                                 (d, 'description too long'),
                                 (e, 'too much extension data')):
            if len(field) > 65535:
                raise FileStorageError(complaint)

809 810 811 812

    def tpc_vote(self, transaction):
        """Append the pending transaction record to the data file.

        Writes, at self._pos: the transaction header with status 'c'
        (checkpoint), the user name, description and extension data, the
        data records buffered in the temporary file, and finally the
        redundant transaction length.  The status byte is left as 'c';
        _finish() overwrites it later.  If any write fails the data file
        is truncated back to self._pos and the error is re-raised.

        Does nothing if transaction is not the current transaction or if
        no data records were buffered.  On success self._nextpos is set to
        the position just past the new record.
        """
        self._lock_acquire()
        try:
            if transaction is not self._transaction:
                return
            data_len = self._tfile.tell()
            if not data_len:
                return # No data in this trans
            self._tfile.seek(0)
            user, desc, ext = self._ude

            out = self._file
            out.seek(self._pos)
            trans_len = self._thl + data_len
            packed_len = p64(trans_len)

            try:
                # Status 'c' marks a checkpoint: if it is never cleared,
                # everything from here on is suspect.
                out.write(pack(">8s8scHHH", self._serial, packed_len, 'c',
                               len(user), len(desc), len(ext)))
                for field in (user, desc, ext):
                    if field:
                        out.write(field)
                cp(self._tfile, out, data_len)
                # Trailing redundant copy of the transaction length.
                out.write(packed_len)
                out.flush()
            except:
                # Writing failed (maybe the disk is full): cut the file back
                # so no partial record is left at the end, then re-raise.
                out.truncate(self._pos)
                raise
            self._nextpos = self._pos + (trans_len + 8)
        finally:
            self._lock_release()
855
 
856
    def _finish(self, tid, u, d, e):
857 858 859
        nextpos=self._nextpos
        if nextpos:
            file=self._file
860

861 862 863 864
            # Clear the checkpoint flag
            file.seek(self._pos+16)
            file.write(self._tstatus)        
            file.flush()
865

866
            if fsync is not None: fsync(file.fileno())
867

868
            self._pos=nextpos
869

870
            self._index.update(self._tindex)
871
            self._vindex.update(self._tvindex)
872
        self._ltid = tid
Jim Fulton's avatar
Jim Fulton committed
873

874
    def _abort(self):
875 876 877
        if self._nextpos:
            self._file.truncate(self._pos)
            self._nextpos=0
878

Jim Fulton's avatar
Jim Fulton committed
879
    def undo(self, transaction_id):
        """Undo a transaction in place (the older, non-transactional undo).

        transaction_id is base64-encoded: an 8-byte transaction id followed
        by the encoded file position of the transaction record.  Every data
        record of that transaction must still be the current record for its
        object.  If so, the transaction's status byte is set to 'u', the
        index is pointed back at each object's previous record, and the
        affected oids are returned.  Returns None if the transaction is
        already marked undone.

        Raises ReadOnlyError for a read-only storage and UndoError when undo
        is disabled (pack running or transaction older than the pack time),
        the id does not match a transaction record, or the transaction
        cannot be undone.
        """
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        self._lock_acquire()
        try:
            self._clear_index()
            raw_id = base64.decodestring(transaction_id + '\n')
            tid = raw_id[:8]
            tpos = U64(raw_id[8:])
            packt = self._packt
            if packt is None or packt > tid:
                raise UndoError(
                    'Undo is currently disabled for database maintenance.<p>')

            f = self._file
            f.seek(tpos)
            header = f.read(TRANS_HDR_LEN)
            if len(header) != TRANS_HDR_LEN or header[:8] != tid:
                raise UndoError('Invalid undo transaction id')
            status = header[16]
            if status == 'u':
                return  # already undone
            if status != ' ':
                raise UndoError
            tend = tpos + U64(header[8:16])
            ulen, dlen, elen = struct.unpack(">HHH",
                                             header[17:TRANS_HDR_LEN])

            # Walk the data records, remembering each object's previous
            # record; every record must still be current.
            pos = tpos + (TRANS_HDR_LEN + ulen + dlen + elen)
            previous = {}
            while pos < tend:
                f.seek(pos)
                oid, serial, sprev, stloc, vlen, splen = struct.unpack(
                    ">8s8s8s8sH8s", f.read(DATA_HDR_LEN))
                # A record with no pickle carries an 8-byte backpointer.
                record_len = DATA_HDR_LEN + (U64(splen) or 8)
                if vlen:
                    # Two 8-byte version pointers plus the version name.
                    record_len = record_len + (16 + vlen)
                if self._index_get(oid, 0) != pos:
                    raise UndoError
                pos = pos + record_len
                if pos > tend:
                    raise UndoError
                previous[oid] = U64(sprev)

            # Mark the transaction undone, then repoint the index.
            f.seek(tpos + 16)
            f.write('u')
            f.flush()
            self._index.update(previous)
            return previous.keys()
        finally:
            self._lock_release()
Jim Fulton's avatar
Jim Fulton committed
928

929 930
    def supportsTransactionalUndo(self):
        """Report (with a true value) that transactionalUndo() is implemented."""
        supported = 1
        return supported
931

932 933
    def _undoDataInfo(self, oid, pos, tpos):
        """Return (serial, data pointer, data, version) for oid's record.

        If tpos is non-zero, the record is read from the temporary file at
        the offset that corresponds to the absolute position tpos, and the
        temporary file's position is restored afterwards (only when it was
        non-zero).  Otherwise the record at pos in the data file is read.

        If the record holds a pickle, data is that pickle and the pointer
        is the record's own offset.  If it holds a backpointer, data is ''
        and the pointer is the backpointer.  NOTE(review): in the tpos case
        the record's own offset is temp-file-relative, while callers compare
        it with absolute positions; confirm that this is intended.

        Raises UndoError if the record does not belong to oid.
        """
        if tpos:
            f = self._tfile
            pos = tpos - self._pos - self._thl
            restore_to = f.tell()
        else:
            f = self._file
            restore_to = 0

        read = f.read
        f.seek(pos)
        roid, serial, sprev, stloc, vlen, splen = unpack(
            ">8s8s8s8sH8s", read(DATA_HDR_LEN))
        if roid != oid:
            raise UndoError('Invalid undo transaction id')

        version = ''
        if vlen:
            read(16) # skip nv pointer and version previous pointer
            version = read(vlen)

        plen = U64(splen)
        if plen:
            data = read(plen)
        else:
            data = ''
            pos = U64(read(8))

        if restore_to:
            f.seek(restore_to) # Restore temp file to end
        return serial, pos, data, version
        
964 965 966
    def _getVersion(self, oid, pos):
        """Return (version, non-version data pointer) for the record at pos.

        Both are '' for a record that is not in a version; otherwise the
        pointer is the raw 8-byte string that follows the data header.
        oid is not used.
        """
        f = self._file
        f.seek(pos)
        vlen = unpack(">8s8s8s8sH8s", f.read(DATA_HDR_LEN))[4]
        if not vlen:
            return '', ''
        # 16 bytes: non-version data pointer, then previous-version pointer.
        pointers = f.read(16)
        return f.read(vlen), pointers[:8]
        
    def _getSerial(self, oid, pos):
        self._file.seek(pos+8)
        return self._file.read(8)

    def _transactionalUndoRecord(self, oid, pos, serial, pre, version):
        """Get the indo information for a data record

        Return a 5-tuple consisting of a pickle, data pointer,
        version, packed non-version data pointer, and current
        position.  If the pickle is true, then the data pointer must
        be 0, but the pickle can be empty *and* the pointer 0.
        """
        
        copy=1 # Can we just copy a data pointer
989
        tpos=self._tindex.get(oid, 0)        
990
        ipos=self._index.get(oid, 0)
991 992
        tipos=tpos or ipos
        if tipos != pos:
993 994
            # Eek, a later transaction modified the data, but,
            # maybe it is pointing at the same data we are.
995
            cserial, cdataptr, cdata, cver = self._undoDataInfo(
996
                oid, ipos, tpos)
997 998
            # Versions of undone record and current record *must* match!
            if cver != version:
999
                raise UndoError('Current and undone versions differ')
1000

1001
            if cdataptr != pos:
1002
                # We aren't sure if we are talking about the same data
1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021
                try:
                    if (
                        # The current record wrote a new pickle
                        cdataptr == tipos
                        or
                        # Backpointers are different
                        _loadBackPOS(self._file, oid, p64(pos)) !=
                        _loadBackPOS(self._file, oid, p64(cdataptr))
                        ):
                        if pre and not tpos:
                            copy=0 # we'll try to do conflict resolution
                        else:
                            # We bail if:
                            # - We don't have a previous record, which should
                            #   be impossible.
                            raise UndoError
                except KeyError:
                    # LoadBack gave us a key error. Bail.
                    raise UndoError
1022 1023 1024 1025 1026 1027

        version, snv = self._getVersion(oid, pre)
        if copy:
            # we can just copy our previous-record pointer forward
            return '', pre, version, snv, ipos

1028 1029 1030 1031 1032 1033 1034
        try:
            # returns data, serial tuple
            bdata = _loadBack(self._file, oid, p64(pre))[0]
        except KeyError:
            # couldn't find oid; what's the real explanation for this?
            raise UndoError("_loadBack() failed for %s" % repr(oid))
        data=self.tryToResolveConflict(oid, cserial, serial, bdata, cdata)  
1035 1036 1037 1038

        if data:
            return data, 0, version, snv, ipos

1039
        raise UndoError('Some data were modified by a later transaction')
1040

1041
    # undoLog() returns a list of description dicts, each including an 'id' entry.
1042 1043 1044
    # The id is opaque to the client, but contains the transaction id.
    # The transactionalUndo() implementation does a simple linear
    # search through the file (from the end) to find the transaction.
1045

1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056
    def undoLog(self, first=0, last=-20, filter=None):
        """Return descriptions of undoable transactions, newest first.

        Transaction records are scanned backwards from the end of the file.
        Only transactions with status ' ' are described.  The scan stops at
        a packed ('p') transaction or one older than the last pack time.

        Each description is a dict with 'id' (opaque, base64-encoded
        transaction id), 'time', 'user_name' and 'description' keys,
        updated with the transaction's extension data when that can be
        unpickled.  Descriptions accepted by filter (all of them when filter
        is None) are numbered from 0; those numbered first <= i < last are
        returned.  A negative last means "at most -last transactions".

        Raises UndoError while a pack is in progress.
        """
        if last < 0:
            # -last is the maximum number of transactions to return, so we
            # need last - first == -last.  (The old "+ 1" here returned one
            # description too many.)
            last = first - last
        self._lock_acquire()
        try:
            if self._packt is None:
                raise UndoError(
                    'Undo is currently disabled for database maintenance.<p>')
            f = self._file
            pos = self._pos
            result = []
            i = 0
            # BAW: Why 39 please?  This makes no sense (see also below).
            while i < last and pos > 39:
                # The 8 bytes before pos hold the previous transaction's
                # redundant length; use it to step back to its header.
                f.seek(pos - 8)
                pos = pos - U64(f.read(8)) - 8
                f.seek(pos)
                h = f.read(TRANS_HDR_LEN)
                tid, tl, status, ul, dl, el = struct.unpack(">8s8scHHH", h)
                if tid < self._packt or status == 'p':
                    break
                if status != ' ':
                    continue
                user = description = ''
                if ul:
                    user = f.read(ul)
                if dl:
                    description = f.read(dl)
                ext = {}
                if el:
                    try:
                        # Previously this called an undefined name (read),
                        # so extension data was silently never reported.
                        ext = loads(f.read(el))
                    except Exception:
                        # Unreadable extension data is not fatal; it is
                        # just left out of the description.
                        ext = {}
                info = {'id': base64.encodestring(tid).rstrip(),
                        'time': TimeStamp(tid).timeTime(),
                        'user_name': user,
                        'description': description}
                info.update(ext)
                if filter is None or filter(info):
                    if i >= first:
                        result.append(info)
                    i += 1
            return result
        finally:
            self._lock_release()

1092 1093 1094
    def transactionalUndo(self, transaction_id, transaction):
        """Undo a transaction, given by transaction_id.

        Do so by writing new data that reverses the action taken by
        the transaction.

        Usually, we can get by with just copying a data pointer, by
        writing a file position rather than a pickle. Sometimes, we
        may do conflict resolution, in which case we actually copy
        new data that results from resolution.

        Must be called within the storage's current transaction.  Returns
        whatever _txn_undo() returns (the affected oids).  Raises
        ReadOnlyError for a read-only storage and StorageTransactionError
        when transaction is not the current one.
        """
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)

        self._lock_acquire()
        try:
            undone_oids = self._txn_undo(transaction_id)
        finally:
            self._lock_release()
        return undone_oids

1115 1116
    def _txn_undo(self, transaction_id):
        """Undo the transaction whose undoLog() id is transaction_id.

        Decodes the id into an 8-byte transaction id, finds its record
        with _txn_find(), writes the undo records with _txn_undo_write(),
        merges the resulting positions into the temporary index and returns
        the affected oids.

        Raises UndoError if the id does not decode to 8 bytes.
        """
        tid = base64.decodestring(transaction_id + '\n')
        if len(tid) != 8:
            # Don't use assert for this: it vanishes under python -O, and
            # an AssertionError is not what undo callers are prepared for.
            raise UndoError("Invalid transaction id")
        tpos = self._txn_find(tid)
        tindex = self._txn_undo_write(tpos, tid)
        self._tindex.update(tindex)
        return tindex.keys()

1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138
    def _txn_find(self, tid):
        """Return the file position of the transaction record for tid.

        Steps backwards from the end of the file using each transaction's
        trailing redundant length.  Gives up, raising UndoError, on reaching
        a packed ('p') transaction, one older than the pack time, or the
        start of the file.
        """
        f = self._file
        pos = self._pos
        # XXX Why 39?  Only because undoLog() uses it as a boundary.
        while pos > 39:
            f.seek(pos - 8)
            pos = pos - U64(f.read(8)) - 8
            f.seek(pos)
            header = f.read(TRANS_HDR_LEN)
            candidate = header[:8]
            if candidate == tid:
                return pos
            # header[16] is the status byte (the c in 8s8sc).
            if header[16] == 'p' or candidate < self._packt:
                break
        raise UndoError("Invalid transaction id")
1139 1140

    def _txn_undo_write(self, tpos, tid):
1141
        # a helper function to write the data records for transactional undo
1142 1143 1144 1145

        ostloc = p64(self._pos)
        here = self._pos + (self._tfile.tell() + self._thl)
        # Let's move the file pointer back to the start of the txn record.
1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183
        self._file.seek(tpos)
        h = self._file.read(TRANS_HDR_LEN)
        if h[16] == 'u':
            return
        if h[16] != ' ':
            raise UndoError('non-undoable transaction')
        tl = U64(h[8:16])
        ul, dl, el = struct.unpack(">HHH", h[17:TRANS_HDR_LEN])
        tend = tpos + tl
        pos = tpos + (TRANS_HDR_LEN + ul + dl + el)
        tindex = {}
        failures = {} # keep track of failures, cause we may succeed later
        failed = failures.has_key
        # Read the data records for this transaction
        while pos < tend:
            self._file.seek(pos)
            h = self._file.read(DATA_HDR_LEN)
            oid, serial, sprev, stloc, vlen, splen = \
                 struct.unpack(">8s8s8s8sH8s", h)
            if failed(oid):
                del failures[oid] # second chance! 
            plen = U64(splen)
            prev = U64(sprev)
            if vlen:
                dlen = DATA_VERSION_HDR_LEN + vlen + (plen or 8)
                self._file.seek(16, 1)
                version = self._file.read(vlen)
            else:
                dlen = DATA_HDR_LEN + (plen or 8)
                version = ''

            try:
                p, prev, v, snv, ipos = self._transactionalUndoRecord(
                    oid, pos, serial, prev, version)
            except UndoError, v:
                # Don't fail right away. We may be redeemed later!
                failures[oid] = v
            else:
1184
                plen = len(p)                
1185 1186 1187 1188 1189 1190 1191 1192
                self._tfile.write(pack(">8s8s8s8sH8s",
                                       oid, self._serial, p64(ipos),
                                       ostloc, len(v), p64(plen)))
                if v:
                    vprev=self._tvindex.get(v, 0) or self._vindex.get(v, 0)
                    self._tfile.write(snv + p64(vprev) + v)
                    self._tvindex[v] = here
                    odlen = DATA_VERSION_HDR_LEN + len(v)+(plen or 8)
1193
                else:
1194
                    odlen = DATA_HDR_LEN+(plen or 8)
1195

1196 1197
                if p:
                    self._tfile.write(p)
1198
                else:
1199 1200 1201
                    self._tfile.write(p64(prev))
                tindex[oid] = here
                here += odlen
1202

1203 1204 1205
            pos=pos+dlen
            if pos > tend:
                raise UndoError, 'non-undoable transaction'
1206

1207 1208
        if failures:
            raise UndoError(failures)
1209

1210 1211
        return tindex
        
1212

Jim Fulton's avatar
Jim Fulton committed
1213
    def versionEmpty(self, version):
        """Return 0 if version still holds a current record, else 1.

        Walks the version's records newest first, following each record's
        previous-version pointer.  Returns 0 as soon as a record is the
        current one (per the index) for its object.  Returns 1 at the first
        non-current record whose transaction is not marked undone ('u'), or
        when the chain ends.

        Raises VersionError for an empty version name.
        """
        if not version:
            # The interface is silent on this case. I think that this should
            # be an error, but Barry thinks this should return 1 if we have
            # any non-version data. This would be excruciatingly painful to
            # test, so I must be right. ;)
            raise POSException.VersionError(
                'The version must be an non-empty string')
        self._lock_acquire()
        try:
            index = self._index
            f = self._file
            srcpos = self._vindex_get(version, 0)
            last_tloc = tstatus = None
            while srcpos:
                f.seek(srcpos)
                oid = f.read(8)
                if index[oid] == srcpos:
                    return 0
                # serial, prev(oid), tloc, vlen, plen, pnv, pv
                rest = f.read(50)
                tloc = rest[16:24]
                if tloc != last_tloc:
                    # First record seen from this transaction: fetch the
                    # transaction's status byte.
                    last_tloc = tloc
                    f.seek(U64(tloc) + 16)
                    tstatus = f.read(1)
                if tstatus != 'u':
                    return 1
                # Follow the previous-version-record pointer.
                srcpos = U64(rest[-8:])
            return 1
        finally:
            self._lock_release()
Jim Fulton's avatar
Jim Fulton committed
1249

1250
    def versions(self, max=None):
        """Return the names of non-empty versions, at most max of them.

        Empty versions (per versionEmpty()) are skipped.  The old code
        sliced the key list to max entries *before* skipping empty
        versions, so it could return fewer than max names (even none)
        while more non-empty versions existed.
        """
        result = []
        if max is not None and max <= 0:
            # Nothing requested; the old slicing also gave [] for max == 0.
            return result
        for version in self._vindex.keys():
            if self.versionEmpty(version):
                continue
            result.append(version)
            if max is not None and len(result) >= max:
                break
        return result

1262
    def history(self, oid, version=None, size=1, filter=None):
        """Return up to size revision descriptions for oid, newest first.

        Revisions are followed through each record's previous-record
        pointer, starting at the current record in the index.  If version is
        given, records in other versions are skipped; once a non-version
        record is reached, no further version filtering is done.  Each
        description is the transaction's unpickled extension data (or {})
        plus 'time', 'user_name', 'description', 'serial', 'version' and
        'size' (the record's pickle length).  Descriptions rejected by
        filter are dropped and do not count toward size.
        """
        self._lock_acquire()
        try:
            f = self._file
            read = f.read
            results = []
            pos = self._index[oid]
            wantver = version
            while len(results) < size:
                f.seek(pos)
                doid, serial, sprev, tloc, vlen, plen = unpack(
                    ">8s8s8s8sH8s", read(DATA_HDR_LEN))
                prev = U64(sprev)

                if vlen:
                    read(8)       # non-version data pointer (not needed)
                    f.seek(8, 1)  # Skip previous version record pointer
                    version = read(vlen)
                    if wantver is not None and version != wantver:
                        if not prev:
                            return results
                        pos = prev
                        continue
                else:
                    version = ''
                    wantver = None

                f.seek(U64(tloc))
                tid, stl, status, ul, dl, el = unpack(
                    ">8s8scHHH", read(TRANS_HDR_LEN))
                user_name = read(ul)
                description = read(dl)
                if el:
                    d = loads(read(el))
                else:
                    d = {}

                d['time'] = TimeStamp(serial).timeTime()
                d['user_name'] = user_name
                d['description'] = description
                d['serial'] = serial
                d['version'] = version
                d['size'] = U64(plen)

                if filter is None or filter(d):
                    results.append(d)

                if not prev:
                    return results
                pos = prev
            return results
        finally:
            self._lock_release()
1314

1315
    def _redundant_pack(self, file, pos):
        """Return true if packing up to pos would be pointless.

        Locates the transaction that ends just before pos through its
        trailing redundant length and reads its status byte.  Returns false
        if that byte is ' ' or 'u' (or cannot be read), true otherwise,
        e.g. for a transaction an earlier pack already marked 'p'.
        """
        assert pos > 8, pos
        file.seek(pos - 8)
        trans_len = U64(file.read(8))
        # The record starts at pos - 8 - trans_len; the status is 16 bytes in.
        file.seek(pos - trans_len + 8)
        status = file.read(1)
        return status not in ' u'
1321

1322 1323 1324 1325 1326 1327 1328 1329 1330 1331
    def pack(self, t, referencesf):
        """Copy data from the current database file to a packed file
    
        Non-current records from transactions with time-stamp strings less
        than packtss are ommitted. As are all undone records.
    
        Also, data back pointers that point before packtss are resolved and
        the associated data are copied, since the old records are not copied.
        """

1332 1333
        if self._is_read_only:
            raise POSException.ReadOnlyError()
1334 1335 1336 1337 1338 1339
        # Ugh, this seems long
        
        packing=1 # are we in the packing phase (or the copy phase)
        locked=0
        _lock_acquire=self._lock_acquire
        _lock_release=self._lock_release
Jim Fulton's avatar
Jim Fulton committed
1340 1341
        _commit_lock_acquire=self._commit_lock_acquire
        _commit_lock_release=self._commit_lock_release
1342 1343
        index, vindex, tindex, tvindex = self._newIndexes()
        name=self.__name__
Jim Fulton's avatar
Jim Fulton committed
1344
        file=open(name, 'rb')
1345
        stop=`apply(TimeStamp, time.gmtime(t)[:5]+(t%60,))`
1346
        if stop==z64: raise FileStorageError, 'Invalid pack time'
1347

1348 1349
        # Record pack time so we don't undo while packing
        _lock_acquire()
1350
        try:
1351
            if self._packt != z64:
1352
                # Already packing.
1353
                raise FileStorageError, 'Already packing'
1354
            self._packt=stop
1355
        finally:
1356
            _lock_release()
1357 1358 1359 1360 1361 1362

        try:
            ##################################################################
            # Step 1, get index as of pack time that
            # includes only referenced objects.

1363
            packpos, maxoid, ltid = read_index(
Jim Fulton's avatar
Jim Fulton committed
1364 1365 1366
                file, name, index, vindex, tindex, stop,
                read_only=1,
                )
1367

1368 1369
            if packpos == 4:
                return
1370 1371
            if self._redundant_pack(file, packpos):
                raise FileStorageError, (
1372 1373
                    'The database has already been packed to a later time\n'
                    'or no changes have been made since the last pack')
1374 1375 1376
    
            rootl=[z64]
            pop=rootl.pop
1377
            pindex=fsIndex()
1378 1379 1380 1381 1382 1383 1384 1385
            referenced=pindex.has_key
            _load=self._load
            _loada=self._loada
            v=None
            while rootl:
                oid=pop()
                if referenced(oid): continue
                try:
1386
                    p, v, nv = _loada(oid, index, file)
1387
                    referencesf(p, rootl)
1388
                    if nv:
1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406
                        p, serial = _load(oid, '', index, file)
                        referencesf(p, rootl)
    
                    pindex[oid]=index[oid]
                except:
                    pindex[oid]=0
                    error('Bad reference to %s', `(oid,v)`)
    
            spackpos=p64(packpos)
    
            ##################################################################
            # Step 2, copy data and compute new index based on new positions.
            index, vindex, tindex, tvindex = self._newIndexes()
    
            ofile=open(name+'.pack', 'w+b')
    
            # Index for non-version data.  This is a temporary structure
            # to reduce I/O during packing
1407
            nvindex=fsIndex()
1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420
    
            # Cache a bunch of methods
            seek=file.seek
            read=file.read
            oseek=ofile.seek
            write=ofile.write
    
            index_get=index.get
            vindex_get=vindex.get
            pindex_get=pindex.get
    
            # Initialize, 
            pv=z64
1421 1422
            offset=0L  # the amount of space freed by packing
            pos=opos=4L
1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450
            oseek(0)
            write(packed_version)

            # Copy the data in two stages.  In the packing stage,
            # we skip records that are non-current or that are for
            # unreferenced objects. We also skip undone transactions.
            #
            # After the packing stage, we copy everything but undone
            # transactions, however, we have to update various back pointers.
            # We have to have the storage lock in the second phase to keep
            # data from being changed while we're copying.
            pnv=None
            while 1:

                # Check for end of packed records
                if packing and pos >= packpos:
                    # OK, we're done with the old stuff, now we have
                    # to get the lock so we can copy the new stuff!
                    offset=pos-opos
                    if offset <= 0:
                        # we didn't free any space, there's no point in
                        # continuing
                        ofile.close()
                        file.close()
                        os.remove(name+'.pack')
                        return
                    
                    packing=0
Jim Fulton's avatar
Jim Fulton committed
1451
                    _commit_lock_acquire()
1452 1453 1454 1455 1456 1457
                    _lock_acquire()
                    locked=1
                    self._packt=None # Prevent undo until we're done

                # Read the transaction record
                seek(pos)
Jeremy Hylton's avatar
Jeremy Hylton committed
1458 1459
                h=read(TRANS_HDR_LEN)
                if len(h) < TRANS_HDR_LEN: break
1460
                tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
1461 1462 1463
                if status=='c':
                    # Oops. we found a checkpoint flag.
                    break
1464
                tl=U64(stl)
1465 1466 1467 1468
                tpos=pos
                tend=tpos+tl

                if status=='u':
1469 1470 1471 1472 1473 1474 1475
                    if not packing:
                        # We rely below on a constant offset for unpacked
                        # records. This assumption holds only if we copy
                        # undone unpacked data. This is lame, but necessary
                        # for now to squash a bug.
                        write(h)
                        tl=tl+8
Jeremy Hylton's avatar
Jeremy Hylton committed
1476
                        write(read(tl-TRANS_HDR_LEN))
1477 1478
                        opos=opos+tl
                        
1479 1480 1481 1482 1483 1484 1485
                    # Undone transaction, skip it
                    pos=tend+8
                    continue

                otpos=opos # start pos of output trans

                # write out the transaction record
1486 1487
                status=packing and 'p' or ' '
                write(h[:16]+status+h[17:])
1488 1489 1490 1491 1492
                thl=ul+dl+el
                h=read(thl)
                if len(h) != thl:
                    raise 'Pack Error', opos
                write(h)
Jeremy Hylton's avatar
Jeremy Hylton committed
1493
                thl=TRANS_HDR_LEN+thl
1494 1495 1496 1497 1498 1499 1500
                pos=tpos+thl
                opos=otpos+thl

                while pos < tend:
                    # Read the data records for this transaction

                    seek(pos)
Jeremy Hylton's avatar
Jeremy Hylton committed
1501
                    h=read(DATA_HDR_LEN)
1502 1503
                    oid,serial,sprev,stloc,vlen,splen = unpack(
                        ">8s8s8s8sH8s", h)
1504
                    plen=U64(splen)
Jeremy Hylton's avatar
Jeremy Hylton committed
1505
                    dlen=DATA_HDR_LEN+(plen or 8)
1506 1507

                    if vlen:
1508
                        dlen=dlen+(16+vlen)
1509
                        if packing and pindex_get(oid, 0) != pos:
1510 1511 1512 1513 1514
                            # This is not the most current record, or
                            # the oid is no longer referenced so skip it.
                            pos=pos+dlen
                            continue

1515
                        pnv=U64(read(8))
1516
                        # skip position of previous version record
1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535
                        seek(8,1)
                        version=read(vlen)
                        pv=p64(vindex_get(version, 0))
                        vindex[version]=opos
                    else:
                        if packing:
                            ppos=pindex_get(oid, 0)
                            if ppos != pos:
                                
                                if not ppos:
                                    # This object is no longer referenced
                                    # so skip it.
                                    pos=pos+dlen
                                    continue
                                
                                # This is not the most current record
                                # But maybe it's the most current committed
                                # record.
                                seek(ppos)
Jeremy Hylton's avatar
Jeremy Hylton committed
1536
                                ph=read(DATA_HDR_LEN)
1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551
                                pdoid,ps,pp,pt,pvlen,pplen = unpack(
                                    ">8s8s8s8sH8s", ph)
                                if not pvlen:
                                    # The most current record is committed, so
                                    # we can toss this one
                                    pos=pos+dlen
                                    continue
                                pnv=read(8)
                                pnv=_loadBackPOS(file, oid, pnv)
                                if pnv > pos:
                                    # The current non version data is later,
                                    # so this isn't the current record
                                    pos=pos+dlen
                                    continue

1552 1553 1554 1555 1556 1557
                                # Ok, we've gotten this far, so we have
                                # the current record and we're ready to
                                # read the pickle, but we're in the wrong
                                # place, after wandering around to figure
                                # out is we were current. Seek back
                                # to pickle data:
Jeremy Hylton's avatar
Jeremy Hylton committed
1558
                                seek(pos+DATA_HDR_LEN)
1559

1560 1561
                            nvindex[oid]=opos

1562
                    tindex[oid]=opos
1563 1564 1565
                    
                    opos=opos+dlen
                    pos=pos+dlen
1566

1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577
                    if plen:
                        p=read(plen)
                    else:
                        p=read(8)
                        if packing:
                            # When packing we resolve back pointers!
                            p, serial = _loadBack(file, oid, p)
                            plen=len(p)
                            opos=opos+plen-8
                            splen=p64(plen)
                        else:
1578
                            p=U64(p)
1579 1580 1581 1582 1583 1584 1585
                            if p < packpos:
                                # We have a backpointer to a
                                # non-packed record. We have to be
                                # careful.  If we were pointing to a
                                # current record, then we should still
                                # point at one, otherwise, we should
                                # point at the last non-version record.
1586
                                ppos=pindex_get(oid, 0)
1587 1588 1589 1590 1591 1592 1593
                                if ppos:
                                    if ppos==p:
                                        # we were pointing to the
                                        # current record
                                        p=index[oid]
                                    else:
                                        p=nvindex[oid]
1594
                                else:
1595 1596 1597 1598 1599
                                    # Oops, this object was modified
                                    # in a version in which it was deleted.
                                    # Hee hee. It doesn't matter what we
                                    # use cause it's not reachable any more.
                                    p=0
1600 1601 1602 1603 1604 1605
                            else:
                                # This points back to a non-packed record.
                                # Just adjust for the offset
                                p=p-offset
                            p=p64(p)
                            
1606
                    sprev=p64(index_get(oid, 0))
1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633
                    write(pack(">8s8s8s8sH8s",
                               oid,serial,sprev,p64(otpos),vlen,splen))
                    if vlen:
                        if not pnv:
                            write(z64)
                        else:
                            if pnv < packpos:
                                # we need to point to the packed
                                # non-version rec
                                pnv=nvindex[oid]
                            else:
                                # we just need to adjust the pointer
                                # with the offset
                                pnv=pnv-offset
                                
                            write(p64(pnv))
                        write(pv)
                        write(version)

                    write(p)

                # skip the (intentionally redundant) transaction length
                pos=pos+8

                if locked:
                    # temporarily release the lock to give other threads
                    # a chance to do some work!
Jim Fulton's avatar
Jim Fulton committed
1634
                    _commit_lock_release()
1635 1636 1637
                    _lock_release()
                    locked=0

1638 1639
                index.update(tindex) # Record the position
                tindex.clear()
1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666

                # Now, maybe we need to hack or delete the transaction
                otl=opos-otpos
                if otl != tl:
                    # Oops, what came out is not what came in!

                    # Check for empty:
                    if otl==thl:
                        # Empty, slide back over the header:
                        opos=otpos
                        oseek(opos)
                    else:
                        # Not empty, but we need to adjust transaction length
                        # and update the status
                        oseek(otpos+8)
                        otl=p64(otl)
                        write(otl+status)
                        oseek(opos)
                        write(otl)
                        opos=opos+8

                else:
                    write(p64(otl))
                    opos=opos+8


                if not packing:
Jim Fulton's avatar
Jim Fulton committed
1667 1668 1669
                    # We are in the copying phase. We need to get the lock
                    # again to avoid someone writing data while we read it.
                    _commit_lock_acquire()
1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683
                    _lock_acquire()
                    locked=1


            # OK, we've copied everything. Now we need to wrap things
            # up.

            # Hack the files around.
            name=self.__name__

            ofile.flush()
            ofile.close()
            file.close()
            self._file.close()
1684 1685 1686 1687
            try:
                if os.path.exists(name+'.old'):
                    os.remove(name+'.old')
                os.rename(name, name+'.old')
1688 1689 1690 1691 1692 1693 1694 1695 1696 1697
            except:
                # Waaa
                self._file=open(name,'r+b')
                raise

            # OK, we're beyond the point of no return
            os.rename(name+'.pack', name)
            self._file=open(name,'r+b')
            self._initIndex(index, vindex, tindex, tvindex)
            self._pos=opos
1698
            self._save_index()
1699 1700 1701

        finally:

Jim Fulton's avatar
Jim Fulton committed
1702 1703 1704
            if locked:
                _commit_lock_release()
                _lock_release()
1705 1706 1707 1708

            _lock_acquire()
            self._packt=z64
            _lock_release()
Jim Fulton's avatar
Jim Fulton committed
1709

1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735
    def iterator(self, start=None, stop=None):
        """Return a FileIterator over the transactions in this storage.

        start and stop are optional transaction ids; they are passed
        unchanged to FileIterator, which skips transactions before start
        and stops after stop.
        """
        return FileIterator(self._file_name, start=start, stop=stop)

    def lastTransaction(self):
        """Return the id of the last committed transaction.

        This just reports the cached self._ltid; the data file is not read.
        """
        ltid = self._ltid
        return ltid

    def lastSerial(self, oid):
        """Return the serial number last committed for object oid.

        Returns None when oid has no entry in the index, which can only
        happen for a brand-new object.  Raises CorruptedDataError if the
        record at the indexed position is short or belongs to another oid.
        """
        try:
            pos = self._index[oid]
        except KeyError:
            return None

        f = self._file
        f.seek(pos)
        # A data record header starts with the 8-byte oid followed by the
        # 8-byte serial number.
        header = f.read(16)
        if len(header) < 16:
            raise CorruptedDataError(header)
        if header[:8] == oid:
            return header[8:]
        # Wrong oid: attach the rest of the header to the error for
        # diagnosis.
        header = header + f.read(26)
        raise CorruptedDataError(header)


def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
    """Copy transactions forward in the data file

    This might be done as part of a recovery effort
    """

    # Cache a bunch of methods
    seek=file.seek
    read=file.read
    write=file.write

    index_get=index.get
    vindex_get=vindex.get

    # Initialize, 
    pv=z64
    p1=opos
    p2=pos
    offset=p2-p1
    packpos=opos

    # Copy the data in two stages.  In the packing stage,
    # we skip records that are non-current or that are for
    # unreferenced objects. We also skip undone transactions.
    #
    # After the packing stage, we copy everything but undone
    # transactions, however, we have to update various back pointers.
    # We have to have the storage lock in the second phase to keep
    # data from being changed while we're copying.
    pnv=None
    while 1:

        # Read the transaction record
        seek(pos)
Jeremy Hylton's avatar
Jeremy Hylton committed
1771 1772
        h=read(TRANS_HDR_LEN)
        if len(h) < TRANS_HDR_LEN: break
Jim Fulton's avatar
Jim Fulton committed
1773 1774
        tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
        if status=='c': break # Oops. we found a checkpoint flag.            
1775
        tl=U64(stl)
Jim Fulton's avatar
Jim Fulton committed
1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789
        tpos=pos
        tend=tpos+tl

        otpos=opos # start pos of output trans

        thl=ul+dl+el
        h2=read(thl)
        if len(h2) != thl: raise 'Pack Error', opos

        # write out the transaction record
        seek(opos)
        write(h)
        write(h2)

Jeremy Hylton's avatar
Jeremy Hylton committed
1790
        thl=TRANS_HDR_LEN+thl
Jim Fulton's avatar
Jim Fulton committed
1791 1792 1793 1794 1795 1796
        pos=tpos+thl
        opos=otpos+thl

        while pos < tend:
            # Read the data records for this transaction
            seek(pos)
Jeremy Hylton's avatar
Jeremy Hylton committed
1797
            h=read(DATA_HDR_LEN)
Jim Fulton's avatar
Jim Fulton committed
1798
            oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
1799
            plen=U64(splen)
Jeremy Hylton's avatar
Jeremy Hylton committed
1800
            dlen=DATA_HDR_LEN+(plen or 8)
Jim Fulton's avatar
Jim Fulton committed
1801 1802

            if vlen:
1803 1804
                dlen=dlen+(16+vlen)
                pnv=U64(read(8))
Jim Fulton's avatar
Jim Fulton committed
1805 1806 1807 1808 1809 1810
                # skip position of previous version record
                seek(8,1)
                version=read(vlen)
                pv=p64(vindex_get(version, 0))
                if status != 'u': vindex[version]=opos

1811
            tindex[oid]=opos
Jim Fulton's avatar
Jim Fulton committed
1812 1813 1814 1815

            if plen: p=read(plen)
            else:
                p=read(8)
1816
                p=U64(p)
Jim Fulton's avatar
Jim Fulton committed
1817 1818 1819 1820
                if p >= p2: p=p-offset
                elif p >= p1:
                    # Ick, we're in trouble. Let's bail
                    # to the index and hope for the best
1821
                    p=index_get(oid, 0)
Jim Fulton's avatar
Jim Fulton committed
1822 1823 1824 1825
                p=p64(p)

            # WRITE
            seek(opos)
1826
            sprev=p64(index_get(oid, 0))
Jim Fulton's avatar
Jim Fulton committed
1827 1828 1829 1830 1831 1832 1833
            write(pack(">8s8s8s8sH8s",
                       oid,serial,sprev,p64(otpos),vlen,splen))
            if vlen:
                if not pnv: write(z64)
                else:
                    if pnv >= p2: pnv=pnv-offset
                    elif pnv >= p1:
1834
                        pnv=index_get(oid, 0)
Jim Fulton's avatar
Jim Fulton committed
1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847
                        
                    write(p64(pnv))
                write(pv)
                write(version)

            write(p)
            
            opos=opos+dlen
            pos=pos+dlen

        # skip the (intentionally redundant) transaction length
        pos=pos+8

1848 1849
        if status != 'u':
            index.update(tindex) # Record the position
Jim Fulton's avatar
Jim Fulton committed
1850

1851
        tindex.clear()
Jim Fulton's avatar
Jim Fulton committed
1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864

        write(stl)
        opos=opos+8

    return opos

def search_back(file, pos):
    """Find the earliest intact transaction after a damaged region.

    Starting at the end of the file, walk backward through the chain of
    redundant transaction lengths (the 8 bytes stored at the end of each
    transaction record).  The walk stops at position pos, at a zero
    length, or when a length would lead to a position before pos.  Such a
    length must come from the damaged region and cannot be trusted.

    Returns (p, s).  s is the file size.  p is the start of the earliest
    transaction reached, and it never moves below pos.  p == s means no
    intact transaction was found after pos.
    """
    seek = file.seek
    read = file.read
    seek(0, 2)
    s = p = file.tell()
    while p > pos:
        seek(p - 8)
        tlen = U64(read(8))
        if tlen <= 0:
            break
        prev = p - tlen - 8
        if prev < pos:
            # Following this length would jump into (or before) the
            # damaged region, possibly to a negative offset; p is the
            # earliest transaction start we can trust.
            break
        p = prev

    return p, s

def recover(file_name):
    """Attempt to repair a damaged FileStorage data file in place.

    The file is scanned with read_index(..., recover=1).  If a damaged
    transaction is found, search_back() locates the first intact
    transaction after it.  shift_transactions_forward() then moves the
    following transactions back over the damaged region, and the file is
    truncated to its new end.  If no intact transaction follows the
    damage, the file is simply truncated at the damaged transaction.

    A one-line summary is printed to stdout.  The file is always closed.
    """
    file = open(file_name, 'r+b')
    try:
        index = {}
        vindex = {}
        tindex = {}

        pos, oid, tid = read_index(
            file, file_name, index, vindex, tindex, recover=1)
        if oid is not None:
            # read_index returns an oid of None only when it stopped on
            # damage in recover mode.
            print("Nothing to recover")
            return

        opos = pos
        pos, sz = search_back(file, pos)
        if pos < sz:
            npos = shift_transactions_forward(
                index, vindex, tindex, file, pos, opos,
                )
        else:
            # Nothing intact follows the damage: drop everything from the
            # damaged transaction on.  (npos was previously left unbound
            # here, crashing the recovery.)
            npos = opos

        file.truncate(npos)

        print("Recovered file, lost %s, ended up with %s bytes" % (
            pos - opos, npos))
    finally:
        file.close()


def read_index(file, name, index, vindex, tindex, stop='\377'*8,
Jim Fulton's avatar
Jim Fulton committed
1898
               ltid=z64, start=4L, maxoid=z64, recover=0, read_only=0):
Jeremy Hylton's avatar
Jeremy Hylton committed
1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919
    """Scan the entire file storage and recreate the index.

    Returns file position, max oid, and last transaction id.  It also
    stores index information in the three dictionary arguments.

    Arguments:
    file -- a file object (the Data.fs)
    name -- the name of the file (presumably file.name)
    index -- dictionary, oid -> data record
    vindex -- dictionary, oid -> data record for version data
    tindex -- dictionary, oid -> data record
       XXX tindex is cleared before return, so it will be empty

    There are several default arguments that affect the scan or the
    return values.  XXX should document them.

    The file position returned is the position just after the last
    valid transaction record.  The oid returned is the maximum object
    id in the data.  The transaction id is the tid of the last
    transaction. 
    """
Jim Fulton's avatar
Jim Fulton committed
1920
    
1921 1922 1923
    read = file.read
    seek = file.seek
    seek(0, 2)
Jim Fulton's avatar
Jim Fulton committed
1924
    file_size=file.tell()
1925

Jim Fulton's avatar
Jim Fulton committed
1926
    if file_size:
1927 1928 1929 1930
        if file_size < start: raise FileStorageFormatError, file.name
        seek(0)
        if read(4) != packed_version: raise FileStorageFormatError, name
    else:
Jim Fulton's avatar
Jim Fulton committed
1931
        if not read_only: file.write(packed_version)
1932
        return 4L, maxoid, ltid
1933 1934 1935

    index_get=index.get
    vndexpos=vindex.get
Jim Fulton's avatar
Jim Fulton committed
1936

1937 1938
    pos=start
    seek(start)
Jim Fulton's avatar
Jim Fulton committed
1939
    unpack=struct.unpack
Jim Fulton's avatar
Jim Fulton committed
1940
    tid='\0'*7+'\1'
Jim Fulton's avatar
Jim Fulton committed
1941 1942 1943

    while 1:
        # Read the transaction record
Jeremy Hylton's avatar
Jeremy Hylton committed
1944
        h=read(TRANS_HDR_LEN)
Jim Fulton's avatar
Jim Fulton committed
1945
        if not h: break
Jeremy Hylton's avatar
Jeremy Hylton committed
1946
        if len(h) != TRANS_HDR_LEN:
Jim Fulton's avatar
Jim Fulton committed
1947 1948 1949 1950
            if not read_only:
                warn('%s truncated at %s', name, pos)
                seek(pos)
                file.truncate()
Jim Fulton's avatar
Jim Fulton committed
1951 1952
            break

1953
        tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
Jim Fulton's avatar
Jim Fulton committed
1954 1955 1956
        if el < 0: el=t32-el

        if tid <= ltid:
1957
            warn("%s time-stamp reduction at %s", name, pos)
Jim Fulton's avatar
Jim Fulton committed
1958 1959
        ltid=tid

1960
        tl=U64(stl)
Jim Fulton's avatar
Jim Fulton committed
1961

1962
        if pos+(tl+8) > file_size or status=='c':
1963 1964
            # Hm, the data were truncated or the checkpoint flag wasn't
            # cleared.  They may also be corrupted,
Jim Fulton's avatar
Jim Fulton committed
1965
            # in which case, we don't want to totally lose the data.
Jim Fulton's avatar
Jim Fulton committed
1966 1967 1968 1969
            if not read_only:
                warn("%s truncated, possibly due to damaged records at %s",
                     name, pos)
                _truncate(file, name, pos)
Jim Fulton's avatar
Jim Fulton committed
1970 1971 1972
            break

        if status not in ' up':
1973
            warn('%s has invalid status, %s, at %s', name, status, pos)
Jim Fulton's avatar
Jim Fulton committed
1974

Jeremy Hylton's avatar
Jeremy Hylton committed
1975
        if tl < (TRANS_HDR_LEN+ul+dl+el):
1976 1977 1978 1979 1980 1981
            # We're in trouble. Find out if this is bad data in the
            # middle of the file, or just a turd that Win 9x dropped
            # at the end when the system crashed.
            # Skip to the end and read what should be the transaction length
            # of the last transaction.
            seek(-8, 2)
1982
            rtl=U64(read(8))
1983 1984
            # Now check to see if the redundant transaction length is
            # reasonable:
Jeremy Hylton's avatar
Jeremy Hylton committed
1985
            if file_size - rtl < pos or rtl < TRANS_HDR_LEN:
1986
                nearPanic('%s has invalid transaction header at %s', name, pos)
Jim Fulton's avatar
Jim Fulton committed
1987 1988 1989 1990 1991 1992
                if not read_only:
                    warn("It appears that there is invalid data at the end of "
                         "the file, possibly due to a system crash.  %s "
                         "truncated to recover from bad data at end."
                         % name)
                    _truncate(file, name, pos)
1993 1994
                break
            else:
1995
                if recover: return pos, None, None
1996
                panic('%s has invalid transaction header at %s', name, pos)
Jim Fulton's avatar
Jim Fulton committed
1997

Jim Fulton's avatar
Jim Fulton committed
1998
        if tid >= stop: break
Jim Fulton's avatar
Jim Fulton committed
1999

Jim Fulton's avatar
Jim Fulton committed
2000 2001 2002 2003 2004
        tpos=pos
        tend=tpos+tl
        
        if status=='u':
            # Undone transaction, skip it
2005
            seek(tend)
Jim Fulton's avatar
Jim Fulton committed
2006 2007
            h=read(8)
            if h != stl:
Jim Fulton's avatar
Jim Fulton committed
2008
                if recover: return tpos, None, None
2009
                panic('%s has inconsistent transaction length at %s',
Jim Fulton's avatar
Jim Fulton committed
2010
                      name, pos)
2011
            pos=tend+8
Jim Fulton's avatar
Jim Fulton committed
2012 2013
            continue

Jeremy Hylton's avatar
Jeremy Hylton committed
2014
        pos=tpos+(TRANS_HDR_LEN+ul+dl+el)
Jim Fulton's avatar
Jim Fulton committed
2015 2016 2017 2018
        while pos < tend:
            # Read the data records for this transaction

            seek(pos)
Jeremy Hylton's avatar
Jeremy Hylton committed
2019
            h=read(DATA_HDR_LEN)
Jim Fulton's avatar
Jim Fulton committed
2020
            oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
2021 2022 2023
            prev=U64(sprev)
            tloc=U64(stloc)
            plen=U64(splen)
Jim Fulton's avatar
Jim Fulton committed
2024
            
Jeremy Hylton's avatar
Jeremy Hylton committed
2025
            dlen=DATA_HDR_LEN+(plen or 8)
2026
            tindex[oid]=pos
Jim Fulton's avatar
Jim Fulton committed
2027 2028
            
            if vlen:
2029
                dlen=dlen+(16+vlen)
Jim Fulton's avatar
Jim Fulton committed
2030
                seek(8,1)
2031
                pv=U64(read(8))
Jim Fulton's avatar
Jim Fulton committed
2032
                version=read(vlen)
2033 2034
                # Jim says: "It's just not worth the bother."
                #if vndexpos(version, 0) != pv:
2035
                #    panic("%s incorrect previous version pointer at %s",
2036
                #          name, pos)
Jim Fulton's avatar
Jim Fulton committed
2037
                vindex[version]=pos
Jim Fulton's avatar
Jim Fulton committed
2038 2039

            if pos+dlen > tend or tloc != tpos:
Jim Fulton's avatar
Jim Fulton committed
2040
                if recover: return tpos, None, None
2041
                panic("%s data record exceeds transaction record at %s",
Jim Fulton's avatar
Jim Fulton committed
2042
                      name, pos)
Jim Fulton's avatar
Jim Fulton committed
2043
                
Jeremy Hylton's avatar
Jeremy Hylton committed
2044
            if index_get(oid, 0) != prev:
2045
                if prev:
Jim Fulton's avatar
Jim Fulton committed
2046
                    if recover: return tpos, None, None
Jim Fulton's avatar
Jim Fulton committed
2047
                    error("%s incorrect previous pointer at %s", name, pos)
2048 2049
                else:
                    warn("%s incorrect previous pointer at %s", name, pos)
Jim Fulton's avatar
Jim Fulton committed
2050

Jim Fulton's avatar
Jim Fulton committed
2051 2052
            pos=pos+dlen

Jim Fulton's avatar
Jim Fulton committed
2053
        if pos != tend:
Jim Fulton's avatar
Jim Fulton committed
2054
            if recover: return tpos, None, None
2055
            panic("%s data records don't add up at %s",name,tpos)
Jim Fulton's avatar
Jim Fulton committed
2056 2057 2058

        # Read the (intentionally redundant) transaction length
        seek(pos)
Jim Fulton's avatar
Jim Fulton committed
2059 2060
        h=read(8)
        if h != stl:
Jim Fulton's avatar
Jim Fulton committed
2061
            if recover: return tpos, None, None
2062
            panic("%s redundant transaction length check failed at %s",
Jim Fulton's avatar
Jim Fulton committed
2063 2064
                  name, pos)
        pos=pos+8
Jeremy Hylton's avatar
Jeremy Hylton committed
2065

2066 2067 2068 2069 2070
        if tindex: # avoid the pathological empty transaction case
            _maxoid = max(tindex.keys()) # in 2.2, just max(tindex)
            maxoid = max(_maxoid, maxoid)
            index.update(tindex)
            tindex.clear()
Jim Fulton's avatar
Jim Fulton committed
2071

2072
    return pos, maxoid, ltid
Jim Fulton's avatar
Jim Fulton committed
2073 2074 2075


def _loadBack(file, oid, back):
    """Follow a chain of back pointers to the record holding a pickle.

    back is an 8-byte encoded file position.  Returns (pickle, serial)
    from the first record in the chain whose data length is non-zero.
    Raises POSKeyError(oid) if the chain reaches a zero back pointer.
    """
    while 1:
        pos = U64(back)
        if not pos:
            raise POSKeyError(oid)
        file.seek(pos)
        header = file.read(DATA_HDR_LEN)
        _oid, serial, _prev, _tloc, vlen, plen = unpack(">8s8s8s8sH8s",
                                                        header)
        if vlen:
            # Skip the two 8-byte version pointers and the version name.
            file.seek(vlen + 16, 1)
        if plen == z64:
            # Zero data length: the next 8 bytes are another back pointer.
            back = file.read(8)
        else:
            return file.read(U64(plen)), serial


def _loadBackPOS(file, oid, back):
    """Return the position of the record that actually holds the data.

    back is an 8-byte encoded back pointer.  The chain is followed
    through records with a zero data length, and the position of the
    first record with a non-zero data length is returned.  Raises
    POSKeyError(oid) if the chain reaches a zero back pointer.
    """
    seek = file.seek
    read = file.read

    pos = U64(back)
    while pos:
        seek(pos)
        header = read(DATA_HDR_LEN)
        _oid, _serial, _prev, _tloc, vlen, plen = unpack(">8s8s8s8sH8s",
                                                         header)
        if vlen:
            # Skip the two 8-byte version pointers and the version name.
            seek(vlen + 16, 1)
        if plen != z64:
            return pos
        pos = U64(read(8))  # another back pointer
    raise POSKeyError(oid)


def _truncate(file, name, pos):
    """Truncate file at pos, first saving the discarded tail.

    The bytes from pos to the end of the file are copied into a new file
    named '<name>.tr<i>', using the first i for which no such file
    exists, so the damaged data can be inspected later.  If saving the
    tail fails with an I/O error, StorageSystemError is raised and the
    file is left untruncated.
    """
    seek = file.seek
    seek(0, 2)
    file_size = file.tell()
    try:
        i = 0
        while os.path.exists('%s.tr%s' % (name, i)):
            i = i + 1
        oname = '%s.tr%s' % (name, i)
        warn("Writing truncated data from %s to %s", name, oname)
        o = open(oname, 'wb')
        try:
            seek(pos)
            cp(file, o, file_size - pos)
        finally:
            # Close the save file even if the copy fails.
            o.close()
    except (IOError, OSError):
        # Only I/O failures are reported as a storage error.  Other
        # exceptions (e.g. KeyboardInterrupt) propagate unchanged.
        error("couldn't write truncated data for %s", name)
        raise POSException.StorageSystemError(
            "Couldn't save truncated data")

    seek(pos)
    file.truncate()


class Iterator:
    """A general, simple iterator built on the old for-loop protocol.

    A for-loop calls __getitem__ with 0, 1, 2, ... until IndexError is
    raised.  Subclasses supply next(index), which produces the item for
    index.  __getitem__ calls next() once for every index between the
    last one reached and the one requested, so items are produced
    strictly in order.  Requesting an index at or below the last one
    reached returns the most recently produced item again.
    """
    __index = -1      # highest index produced so far
    __current = None  # the item produced for __index

    def __getitem__(self, i):
        reached = self.__index
        while reached < i:
            reached = reached + 1
            self.__current = self.next(reached)

        self.__index = reached
        return self.__current


class FileIterator(Iterator):
    """Iterate over the transactions in a FileStorage file.
    """
Jeremy Hylton's avatar
Jeremy Hylton committed
2154
    _ltid = z64
2155
    _file = None
2156
    
2157
    def __init__(self, file, start=None, stop=None):
2158
        if isinstance(file, StringType):
2159 2160 2161 2162
            file = open(file, 'rb')
        self._file = file
        if file.read(4) != packed_version:
            raise FileStorageFormatError, name
2163
        file.seek(0,2)
2164 2165 2166 2167 2168 2169 2170 2171
        self._file_size = file.tell()
        self._pos = 4L
        assert start is None or isinstance(start, StringType)
        assert stop is None or isinstance(stop, StringType)
        if start:
            self._skip_to_start(start)
        self._stop = stop

2172 2173 2174 2175 2176 2177
    def close(self):
        file = self._file
        if file is not None:
            self._file = None
            file.close()

2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202
    def _skip_to_start(self, start):
        # Scan through the transaction records doing almost no sanity
        # checks. 
        while 1:
            self._file.seek(self._pos)
            h = self._file.read(16)
            if len(h) < 16:
                return
            tid, stl = unpack(">8s8s", h)
            if tid >= start:
                return
            tl = U64(stl)
            try:
                self._pos += tl + 8
            except OverflowError:
                self._pos = long(self._pos) + tl + 8
            if __debug__:
                # Sanity check
                self._file.seek(self._pos - 8, 0)
                rtl = self._file.read(8)
                if rtl != stl:
                    pos = self._file.tell() - 8
                    panic("%s has inconsistent transaction length at %s "
                          "(%s != %s)",
                          self._file.name, pos, U64(rtl), U64(stl))
2203 2204

    def next(self, index=0):
2205 2206 2207 2208
        if self._file is None:
            # A closed iterator.  XXX: Is IOError the best we can do?  For
            # now, mimic a read on a closed file.
            raise IOError, 'iterator is closed'
2209 2210 2211 2212 2213
        file=self._file
        seek=file.seek
        read=file.read
        pos=self._pos

2214
        LOG("ZODB FS", BLATHER, "next(%d)" % index)
2215 2216 2217
        while 1:
            # Read the transaction record
            seek(pos)
Jeremy Hylton's avatar
Jeremy Hylton committed
2218 2219
            h=read(TRANS_HDR_LEN)
            if len(h) < TRANS_HDR_LEN: break
2220 2221 2222 2223 2224

            tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
            if el < 0: el=t32-el

            if tid <= self._ltid:
2225
                warn("%s time-stamp reduction at %s", self._file.name, pos)
2226 2227
            self._ltid=tid

2228
            tl=U64(stl)
2229

2230
            if pos+(tl+8) > self._file_size or status=='c':
2231 2232 2233 2234
                # Hm, the data were truncated or the checkpoint flag wasn't
                # cleared.  They may also be corrupted,
                # in which case, we don't want to totally lose the data.
                warn("%s truncated, possibly due to damaged records at %s",
2235
                     self._file.name, pos)
2236 2237 2238
                break

            if status not in ' up':
2239 2240
                warn('%s has invalid status, %s, at %s', self._file.name,
                     status, pos)
2241

Jeremy Hylton's avatar
Jeremy Hylton committed
2242
            if tl < (TRANS_HDR_LEN+ul+dl+el):
2243 2244 2245 2246 2247
                # We're in trouble. Find out if this is bad data in
                # the middle of the file, or just a turd that Win 9x
                # dropped at the end when the system crashed.  Skip to
                # the end and read what should be the transaction
                # length of the last transaction.
2248
                seek(-8, 2)
2249
                rtl=U64(read(8))
2250 2251
                # Now check to see if the redundant transaction length is
                # reasonable:
Jeremy Hylton's avatar
Jeremy Hylton committed
2252
                if self._file_size - rtl < pos or rtl < TRANS_HDR_LEN:
2253
                    nearPanic('%s has invalid transaction header at %s',
2254
                              self._file.name, pos)
2255 2256 2257
                    warn("It appears that there is invalid data at the end of "
                         "the file, possibly due to a system crash.  %s "
                         "truncated to recover from bad data at end."
2258
                         % self._file.name)
2259 2260
                    break
                else:
2261 2262
                    warn('%s has invalid transaction header at %s',
                         self._file.name, pos)
2263 2264
                    break

2265 2266 2267 2268 2269 2270 2271
            if self._stop is not None:
                LOG("ZODB FS", BLATHER,
                    ("tid %x > stop %x ? %d" %
                     (U64(tid), U64(self._stop), tid > self._stop)))
            if self._stop is not None and tid > self._stop:
                raise IndexError, index

2272 2273 2274 2275 2276 2277 2278 2279 2280
            tpos=pos
            tend=tpos+tl

            if status=='u':
                # Undone transaction, skip it
                seek(tend)
                h=read(8)
                if h != stl:
                    panic('%s has inconsistent transaction length at %s',
2281
                          self._file.name, pos)
2282 2283 2284
                pos=tend+8
                continue

Jeremy Hylton's avatar
Jeremy Hylton committed
2285
            pos=tpos+(TRANS_HDR_LEN+ul+dl+el)
2286 2287 2288
            user=read(ul)
            description=read(dl)
            if el:
2289 2290 2291 2292
                try: e=loads(read(el))
                except: e={}
            else: e={}

2293
            result=RecordIterator(
2294
                tid, status, user, description, e,
2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306
                pos, (tend, file, seek, read,
                      tpos,
                      )
                )

            pos=tend

            # Read the (intentionally redundant) transaction length
            seek(pos)
            h=read(8)
            if h != stl:
                warn("%s redundant transaction length check failed at %s",
2307
                     self._file.name, pos)
2308 2309 2310 2311 2312 2313 2314
                break
            self._pos=pos+8

            return result

        raise IndexError, index
    
2315
class RecordIterator(Iterator, BaseStorage.TransactionRecord):
    """Iterate over the data records of a single transaction.

    An instance carries the metadata of one transaction record (tid,
    status, user, description and extension dict).  It yields that
    transaction's data records as Record objects, one per call to next().
    """

    def __init__(self, tid, status, user, desc, ext, pos, stuff):
        self.tid = tid
        self.status = status
        self.user = user
        self.description = desc
        self._extension = ext
        # File offset of the next data record to read.
        self._pos = pos
        # Tuple (tend, file, seek, read, tpos):
        #   tend       -- offset where this transaction's data records end
        #   file       -- the storage file
        #   seek, read -- the storage file's bound methods
        #   tpos       -- offset of the owning transaction header
        self._stuff = stuff

    def next(self, index=0):
        """Return the next data record of this transaction as a Record.

        The Record's data is the stored pickle.  When the record holds an
        8-byte backpointer instead of a pickle, one of two things happens:

          - if the backpointer is z64, data is None (the transaction
            undid the object's creation);
          - otherwise the pickle is fetched through _loadBack().

        Raises IndexError(index) in two cases: once all data records have
        been consumed, or after warning about a data record that does not
        lie inside its transaction.
        """
        pos = self._pos
        tend, file, seek, read, tpos = self._stuff
        # The loop body always returns or breaks, so it runs at most once.
        while pos < tend:
            seek(pos)
            h = read(DATA_HDR_LEN)
            oid, serial, sprev, stloc, vlen, splen = unpack(">8s8s8s8sH8s", h)
            tloc = U64(stloc)
            plen = U64(splen)

            # A zero data length means an 8-byte backpointer is stored in
            # place of the pickle.
            dlen = DATA_HDR_LEN + (plen or 8)
            if vlen:
                # Versioned records carry two extra 8-byte fields ahead of
                # the version string.  Neither is needed here, so skip them.
                dlen = dlen + (16 + vlen)
                seek(16, 1)
                version = read(vlen)
            else:
                version = ''

            if pos + dlen > tend or tloc != tpos:
                warn("%s data record exceeds transaction record at %s",
                     getattr(file, 'name', ''), pos)
                break

            # Advance before reading the payload, so the next call starts
            # at the following record.
            self._pos = pos + dlen

            if plen:
                p = read(plen)
            else:
                p = read(8)
                if p == z64:
                    # If the backpointer is 0 (encoded as z64), then
                    # this transaction undoes the object creation.  It
                    # either aborts the version that created the
                    # object or undid the transaction that created it.
                    # Return None instead of a pickle to indicate
                    # this.
                    p = None
                else:
                    # Follow the backpointer to the stored pickle.
                    p = _loadBack(file, oid, p)[0]

            return Record(oid, serial, version, p)

        raise IndexError(index)

2377
class Record(BaseStorage.DataRecord):
    """A single data record, as produced by RecordIterator.next().

    Attributes, set from the four positional constructor arguments in
    this order:

      oid     -- object id of the record
      serial  -- serial stored in the record's header
      version -- version name ('' when the record has no version)
      data    -- the object pickle; RecordIterator passes None when the
                 record undoes the object's creation
    """

    def __init__(self, *args):
        # Unpack first, so that a wrong argument count raises before any
        # attribute is set.
        oid, serial, version, data = args
        self.oid = oid
        self.serial = serial
        self.version = version
        self.data = data