manager.py 18.1 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18 19
from neo.lib import util
from neo.lib.exception import DatabaseFailure
20

21 22 23
class CreationUndone(Exception):
    pass

24 25 26
class DatabaseManager(object):
    """This class only describes an interface for database managers."""

27
    def __init__(self):
28 29 30 31 32
        """
            Initialize the object.
        """
        self._under_transaction = False

33 34 35
    def isUnderTransaction(self):
        return self._under_transaction

36 37 38 39 40
    def begin(self):
        """
            Begin a transaction
        """
        if self._under_transaction:
41
            raise DatabaseFailure('A transaction has already begun')
42 43 44 45 46 47 48 49
        self._begin()
        self._under_transaction = True

    def commit(self):
        """
            Commit the current transaction
        """
        if not self._under_transaction:
50
            raise DatabaseFailure('The transaction has not begun')
51 52 53 54 55 56 57 58 59
        self._commit()
        self._under_transaction = False

    def rollback(self):
        """
            Rollback the current transaction
        """
        self._rollback()
        self._under_transaction = False
60 61 62 63

    def setup(self, reset = 0):
        """Set up a database. If reset is true, existing data must be
        discarded."""
64
        raise NotImplementedError
65 66 67 68

    def _begin(self):
        raise NotImplementedError

69 70 71 72 73 74
    def _commit(self):
        raise NotImplementedError

    def _rollback(self):
        raise NotImplementedError

75 76 77
    def _getPartition(self, oid_or_tid):
        return oid_or_tid % self.getNumPartitions()

78 79 80 81
    def getConfiguration(self, key):
        """
            Return a configuration value, returns None if not found or not set
        """
82
        raise NotImplementedError
83

84 85 86 87
    def setConfiguration(self, key, value):
        """
            Set a configuration value
        """
88 89 90 91 92 93 94 95 96 97 98 99
        if self._under_transaction:
            self._setConfiguration(key, value)
        else:
            self.begin()
            try:
                self._setConfiguration(key, value)
            except:
                self.rollback()
                raise
            self.commit()

    def _setConfiguration(self, key, value):
100
        raise NotImplementedError
101

102 103 104 105 106 107 108 109 110 111 112 113
    def getUUID(self):
        """
            Load an UUID from a database.
        """
        return util.bin(self.getConfiguration('uuid'))

    def setUUID(self, uuid):
        """
            Store an UUID into a database.
        """
        self.setConfiguration('uuid', util.dump(uuid))

114
    def getNumPartitions(self):
115 116 117 118 119 120
        """
            Load the number of partitions from a database.
        """
        n = self.getConfiguration('partitions')
        if n is not None:
            return int(n)
121 122

    def setNumPartitions(self, num_partitions):
123 124 125 126
        """
            Store the number of partitions into a database.
        """
        self.setConfiguration('partitions', num_partitions)
127

128
    def getNumReplicas(self):
129 130 131 132 133 134 135 136 137 138 139 140
        """
            Load the number of replicas from a database.
        """
        n = self.getConfiguration('replicas')
        if n is not None:
            return int(n)

    def setNumReplicas(self, num_replicas):
        """
            Store the number of replicas into a database.
        """
        self.setConfiguration('replicas', num_replicas)
141

142
    def getName(self):
143 144 145 146
        """
            Load a name from a database.
        """
        return self.getConfiguration('name')
147 148

    def setName(self, name):
149 150 151 152
        """
            Store a name into a database.
        """
        self.setConfiguration('name', name)
153 154

    def getPTID(self):
155 156 157
        """
            Load a Partition Table ID from a database.
        """
158
        return long(self.getConfiguration('ptid'))
159 160

    def setPTID(self, ptid):
161 162 163
        """
            Store a Partition Table ID into a database.
        """
164 165 166 167
        if ptid is not None:
            assert isinstance(ptid, (int, long)), ptid
            ptid = str(ptid)
        self.setConfiguration('ptid', ptid)
168 169 170 171 172 173 174 175 176 177 178 179

    def getLastOID(self):
        """
            Returns the last OID used
        """
        return util.bin(self.getConfiguration('loid'))

    def setLastOID(self, loid):
        """
            Set the last OID used
        """
        self.setConfiguration('loid', util.dump(loid))
180 181 182 183 184

    def getPartitionTable(self):
        """Return a whole partition table as a tuple of rows. Each row
        is again a tuple of an offset (row ID), an UUID of a storage
        node, and a cell state."""
185
        raise NotImplementedError
186 187 188 189 190

    def getLastTID(self, all = True):
        """Return the last TID in a database. If all is true,
        unfinished transactions must be taken account into. If there
        is no TID in the database, return None."""
191
        raise NotImplementedError
192 193 194

    def getUnfinishedTIDList(self):
        """Return a list of unfinished transaction's IDs."""
195
        raise NotImplementedError
196 197 198 199 200

    def objectPresent(self, oid, tid, all = True):
        """Return true iff an object specified by a given pair of an
        object ID and a transaction ID is present in a database.
        Otherwise, return false. If all is true, the object must be
201
        searched from unfinished transactions as well."""
202
        raise NotImplementedError
203

204 205 206 207 208 209 210 211 212 213
    def _getObject(self, oid, tid=None, before_tid=None):
        """
        oid (int)
            Identifier of object to retrieve.
        tid (int, None)
            Exact serial to retrieve.
        before_tid (packed, None)
            Serial to retrieve is the highest existing one strictly below this
            value.
        """
214
        raise NotImplementedError
215

216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
    def getObject(self, oid, tid=None, before_tid=None, resolve_data=True):
        """
        oid (packed)
            Identifier of object to retrieve.
        tid (packed, None)
            Exact serial to retrieve.
        before_tid (packed, None)
            Serial to retrieve is the highest existing one strictly below this
            value.
        resolve_data (bool, True)
            If actual object data is desired, or raw record content.
            This is different in case retrieved line undoes a transaction.

        Return value:
            None: Given oid doesn't exist in database.
            False: No record found, but another one exists for given oid.
            6-tuple: Record content.
                - record serial (packed)
                - serial or next record modifying object (packed, None)
                - compression (boolean-ish, None)
                - checksum (integer, None)
                - data (binary string, None)
                - data_serial (packed, None)
        """
        # TODO: resolve_data must be unit-tested
        u64 = util.u64
        p64 = util.p64
        oid = u64(oid)
        if tid is not None:
            tid = u64(tid)
        if before_tid is not None:
            before_tid = u64(before_tid)
        result = self._getObject(oid, tid, before_tid)
        if result is None:
            # See if object exists at all
            result = self._getObject(oid)
            if result is not None:
                # Object exists
                result = False
        else:
            serial, next_serial, compression, checksum, data, data_serial = \
                result
258 259
            assert before_tid is None or next_serial is None or \
                   before_tid <= next_serial
260 261 262 263 264
            if data is None and resolve_data:
                try:
                    _, compression, checksum, data = self._getObjectData(oid,
                        data_serial, serial)
                except CreationUndone:
265
                    pass
266 267 268 269 270 271 272 273 274 275
                data_serial = None
            if serial is not None:
                serial = p64(serial)
            if next_serial is not None:
                next_serial = p64(next_serial)
            if data_serial is not None:
                data_serial = p64(data_serial)
            result = serial, next_serial, compression, checksum, data, data_serial
        return result

276 277 278 279 280
    def changePartitionTable(self, ptid, cell_list):
        """Change a part of a partition table. The list of cells is
        a tuple of tuples, each of which consists of an offset (row ID),
        an UUID of a storage node, and a cell state. The Partition
        Table ID must be stored as well."""
281
        raise NotImplementedError
282 283 284 285 286

    def setPartitionTable(self, ptid, cell_list):
        """Set a whole partition table. The semantics is the same as
        changePartitionTable, except that existing data must be
        thrown away."""
287
        raise NotImplementedError
288

289 290
    def dropPartitions(self, num_partitions, offset_list):
        """ Drop any data of non-assigned partitions for a given UUID """
291 292
        raise NotImplementedError('this method must be overriden')

293 294
    def dropUnfinishedData(self):
        """Drop any unfinished data from a database."""
295
        raise NotImplementedError
296

297 298 299 300 301 302
    def storeTransaction(self, tid, object_list, transaction, temporary = True):
        """Store a transaction temporarily, if temporary is true. Note
        that this transaction is not finished yet. The list of objects
        contains tuples, each of which consists of an object ID,
        a compression specification, a checksum and object data.
        The transaction is either None or a tuple of the list of OIDs,
303 304
        user information, a description, extension information and transaction
        pack state (True for packed)."""
305
        raise NotImplementedError
306

307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
    def _getDataTID(self, oid, tid=None, before_tid=None):
        """
        Return a 2-tuple:
        tid (int)
            tid corresponding to received parameters
        serial
            tid at which actual object data is located

        If 'tid is None', requested object and transaction could
        not be found.
        If 'serial is None', requested object exist but has no data (its creation
        has been undone).
        If 'tid == serial', it means that requested transaction
        contains object data.
        Otherwise, it's an undo transaction which did not involve conflict
        resolution.
        """
        raise NotImplementedError

Vincent Pelletier's avatar
Vincent Pelletier committed
326
    def findUndoTID(self, oid, tid, ltid, undone_tid, transaction_object):
327 328 329 330 331
        """
        oid
            Object OID
        tid
            Transation doing the undo
Vincent Pelletier's avatar
Vincent Pelletier committed
332 333 334
        ltid
            Upper (exclued) bound of transactions visible to transaction doing
            the undo.
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
        undone_tid
            Transaction to undo
        transaction_object
            Object data from memory, if it was modified by running
            transaction.
            None if is was not modified by running transaction.

        Returns a 3-tuple:
        current_tid (p64)
            TID of most recent version of the object client's transaction can
            see. This is used later to detect current conflicts (eg, another
            client modifying the same object in parallel)
        data_tid (int)
            TID containing (without indirection) the data prior to undone
            transaction.
            None if object doesn't exist prior to transaction being undone
              (its creation is being undone).
        is_current (bool)
            False if object was modified by later transaction (ie, data_tid is
            not current), True otherwise.
355
        """
356 357 358 359
        u64 = util.u64
        p64 = util.p64
        oid = u64(oid)
        tid = u64(tid)
360 361
        if ltid:
            ltid = u64(ltid)
362 363 364
        undone_tid = u64(undone_tid)
        _getDataTID = self._getDataTID
        if transaction_object is not None:
365 366
            _, _, _, _, tvalue_serial = transaction_object
            current_tid = current_data_tid = u64(tvalue_serial)
367
        else:
Vincent Pelletier's avatar
Vincent Pelletier committed
368
            current_tid, current_data_tid = _getDataTID(oid, before_tid=ltid)
369 370 371 372 373 374 375 376 377 378 379 380
        if current_tid is None:
            return (None, None, False)
        found_undone_tid, undone_data_tid = _getDataTID(oid, tid=undone_tid)
        assert found_undone_tid is not None, (oid, undone_tid)
        is_current = undone_data_tid in (current_data_tid, tid)
        # Load object data as it was before given transaction.
        # It can be None, in which case it means we are undoing object
        # creation.
        _, data_tid = _getDataTID(oid, before_tid=undone_tid)
        if data_tid is not None:
            data_tid = p64(data_tid)
        return p64(current_tid), data_tid, is_current
381

Grégory Wisniewski's avatar
Grégory Wisniewski committed
382
    def finishTransaction(self, tid):
383 384
        """Finish a transaction specified by a given ID, by moving
        temporarily data to a finished area."""
385
        raise NotImplementedError
386

387
    def deleteTransaction(self, tid, oid_list=()):
388 389
        """Delete a transaction and its content specified by a given ID and
        an oid list"""
390
        raise NotImplementedError
391

392
    def deleteTransactionsAbove(self, num_partitions, partition, tid, max_tid):
393
        """Delete all transactions above given TID (inclued) in given
394 395
        partition, but never above max_tid (in case transactions are committed
        during replication)."""
396 397
        raise NotImplementedError

398 399 400 401 402
    def deleteObject(self, oid, serial=None):
        """Delete given object. If serial is given, only delete that serial for
        given oid."""
        raise NotImplementedError

403 404
    def deleteObjectsAbove(self, num_partitions, partition, oid, serial,
            max_tid):
405
        """Delete all objects above given OID and serial (inclued) in given
406 407
        partition, but never above max_tid (in case objects are stored during
        replication)"""
408 409
        raise NotImplementedError

410 411 412 413 414 415
    def getTransaction(self, tid, all = False):
        """Return a tuple of the list of OIDs, user information,
        a description, and extension information, for a given transaction
        ID. If there is no such transaction ID in a database, return None.
        If all is true, the transaction must be searched from a temporary
        area as well."""
416
        raise NotImplementedError
417

418
    def getObjectHistory(self, oid, offset = 0, length = 1):
419
        """Return a list of serials and sizes for a given object ID.
420 421 422
        The length specifies the maximum size of such a list. Result starts
        with latest serial, and the list must be sorted in descending order.
        If there is no such object ID in a database, return None."""
423
        raise NotImplementedError
424

425 426
    def getObjectHistoryFrom(self, oid, min_serial, max_serial, length,
            num_partitions, partition):
427
        """Return a dict of length serials grouped by oid at (or above)
428 429
        min_oid and min_serial and below max_serial, for given partition,
        sorted in ascending order."""
430 431
        raise NotImplementedError

432
    def getTIDList(self, offset, length, num_partitions, partition_list):
433 434 435 436 437
        """Return a list of TIDs in ascending order from an offset,
        at most the specified length. The list of partitions are passed
        to filter out non-applicable TIDs."""
        raise NotImplementedError

438
    def getReplicationTIDList(self, min_tid, max_tid, length, num_partitions,
439
        partition):
440
        """Return a list of TIDs in ascending order from an initial tid value,
441 442
        at most the specified length up to max_tid. The partition number is
        passed to filter out non-applicable TIDs."""
443
        raise NotImplementedError
444

445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
    def pack(self, tid, updateObjectDataForPack):
        """Prune all non-current object revisions at given tid.
        updateObjectDataForPack is a function called for each deleted object
        and revision with:
        - OID
        - packed TID
        - new value_serial
            If object data was moved to an after-pack-tid revision, this
            parameter contains the TID of that revision, allowing to backlink
            to it.
        - getObjectData function
            To call if value_serial is None and an object needs to be updated.
            Takes no parameter, returns a 3-tuple: compression, checksum,
            value
        """
460
        raise NotImplementedError
461

462
    def checkTIDRange(self, min_tid, max_tid, length, num_partitions, partition):
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480
        """
        Generate a diggest from transaction list.
        min_tid (packed)
            TID at which verification starts.
        length (int)
            Maximum number of records to include in result.
        num_partitions, partition (int, int)
            Specifies concerned partition.

        Returns a 3-tuple:
            - number of records actually found
            - a XOR computed from record's TID
              0 if no record found
            - biggest TID found (ie, TID of last record read)
              ZERO_TID if not record found
        """
        raise NotImplementedError

481 482
    def checkSerialRange(self, min_oid, min_serial, max_tid, length,
            num_partitions, partition):
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
        """
        Generate a diggest from object list.
        min_oid (packed)
            OID at which verification starts.
        min_serial (packed)
            Serial of min_oid object at which search should start.
        length
            Maximum number of records to include in result.
        num_partitions, partition (int, int)
            Specifies concerned partition.

        Returns a 5-tuple:
            - number of records actually found
            - a XOR computed from record's OID
              0 if no record found
            - biggest OID found (ie, OID of last record read)
              ZERO_OID if no record found
            - a XOR computed from record's serial
              0 if no record found
            - biggest serial found for biggest OID found (ie, serial of last
              record read)
              ZERO_TID if no record found
        """
        raise NotImplementedError