#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from random import choice

import neo.lib
from neo.lib.connection import ClientConnection
from neo.lib.protocol import NodeTypes, NodeStates, Packets, ZERO_TID
from neo.lib.util import dump
from neo.storage.handlers import replication

class Partition(object):
    """Replication state of a single outdated partition."""

    def __init__(self, offset, tid):
        # Index of this partition in the partition table.
        self.offset = offset
        # Critical TID for this partition; when unknown, fall back to the
        # lowest possible TID so every transaction gets replicated.
        self.tid = ZERO_TID if tid is None else tid

    def getOffset(self):
        """Return the partition's offset in the partition table."""
        return self.offset

    def getCriticalTID(self):
        """Return the critical TID recorded for this partition."""
        return self.tid

    def safe(self, min_pending_tid):
        """Tell whether replication of this partition may start.

        It is safe when a critical TID is known and no transaction pending
        on the master is older than (or equal to) it.
        """
        critical_tid = self.tid
        if critical_tid is None:
            return False
        return min_pending_tid is None or critical_tid < min_pending_tid
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
class Task(object):
    """
    A Task is a callable to execute at another time, with given parameters.
    Execution result is kept and can be retrieved later.
    """

    _func = None
    _args = None
    _kw = None
    _result = None
    _processed = False

    def __init__(self, func, args=(), kw=None):
        # None is used as a sentinel for kw to avoid a shared mutable
        # default argument.
        self._func = func
        self._args = args
        if kw is None:
            kw = {}
        self._kw = kw

    def process(self):
        """Execute the stored callable once, keeping its result.

        Raises ValueError when invoked a second time.
        """
        if self._processed:
            raise ValueError('You cannot process a single Task twice')
        self._processed = True
        self._result = self._func(*self._args, **self._kw)

    def getResult(self):
        """Return the value produced by process().

        Raises ValueError if process() has not been called yet.
        """
        # Should we instead execute immediately rather than raising ?
        if not self._processed:
            raise ValueError('You cannot get a result until task is executed')
        return self._result

    def __repr__(self):
        fmt = '<%s at %x %r(*%r, **%r)%%s>' % (self.__class__.__name__,
            id(self), self._func, self._args, self._kw)
        if self._processed:
            extra = ' => %r' % (self._result, )
        else:
            extra = ''
        return fmt % (extra, )

86 87
class Replicator(object):
    """This class handles replications of objects and transactions.

    Assumptions:

        - Client nodes recognize partition changes reasonably quickly.

        - When an out of date partition is added, next transaction ID
          is given after the change is notified and serialized.

    Procedures:

        - Get the last TID right after a partition is added. This TID
          is called a "critical TID", because this and TIDs before this
          may not be present in this storage node yet. After a critical
          TID, all transactions must exist in this storage node.

        - Check if a primary master node still has pending transactions
          before and at a critical TID. If so, I must wait for them to be
          committed or aborted.

        - In order to copy data, first get the list of TIDs. This is done
          part by part, because the list can be very huge. When getting
          a part of the list, I verify if they are in my database, and
          ask data only for non-existing TIDs. This is performed until
          the check reaches a critical TID.

        - Next, get the list of OIDs. And, for each OID, ask the history,
          namely, a list of serials. This is also done part by part, and
          I ask only non-existing data. """

    # new_partition_set
    #   outdated partitions for which no critical tid was asked to primary
    #   master yet
    # critical_tid_list
    #   outdated partitions for which a critical tid was asked to primary
    #   master, but not answered so far
    # partition_dict
    #   outdated partitions (with or without a critical tid - if without, it
    #   was asked to primary master)
    # current_partition
    #   partition being currently synchronised
    # current_connection
    #   connection to a storage node we are replicating from
    # waiting_for_unfinished_tids
    #   unfinished_tid_list has been asked to primary master node, but it
    #   didn't answer yet.
    # unfinished_tid_list
    #   The list of unfinished TIDs known by master node.
    # replication_done
    #   False if we know there is something to replicate.
    #   True when current_partition is replicated, or we don't know yet if
    #   there is something to replicate

    current_partition = None
    current_connection = None
    waiting_for_unfinished_tids = False
    unfinished_tid_list = None
    replication_done = True

    def __init__(self, app):
        self.app = app
        self.new_partition_set = set()
        self.critical_tid_list = []
        self.partition_dict = {}
        self.task_list = []
        self.task_dict = {}

    def masterLost(self):
        """
        When connection to primary master is lost, stop waiting for unfinished
        transactions.
        """
        self.critical_tid_list = []
        self.waiting_for_unfinished_tids = False

    def storageLost(self):
        """
        Restart replicating.
        """
        self.reset()

    def populate(self):
        """
        Populate partitions to replicate. Must be called when partition
        table is the one accepted by primary master.
        Implies a reset.
        """
        partition_list = self.app.pt.getOutdatedOffsetListFor(self.app.uuid)
        self.new_partition_set = set(partition_list)
        self.partition_dict = {}
        self.reset()

    def reset(self):
        """Reset attributes to restart replicating."""
        self.task_list = []
        self.task_dict = {}
        self.current_partition = None
        self.current_connection = None
        self.unfinished_tid_list = None
        self.replication_done = True

    def pending(self):
        """Return whether there is any pending partition."""
        # Partitions may be pending in any of the three containers, depending
        # on how far the critical-TID handshake with the master has gone.
        return bool(self.partition_dict or self.new_partition_set
            or self.critical_tid_list)

    def getCurrentOffset(self):
        """Return the offset of the partition being synchronised."""
        assert self.current_partition is not None
        return self.current_partition.getOffset()

    def getCurrentCriticalTID(self):
        """Return the critical TID of the partition being synchronised."""
        assert self.current_partition is not None
        return self.current_partition.getCriticalTID()

    def setReplicationDone(self):
        """ Callback from ReplicationHandler """
        self.replication_done = True

    def isCurrentConnection(self, conn):
        """Tell whether conn is the connection currently replicated from."""
        return self.current_connection is conn

    def setCriticalTID(self, tid):
        """This is a callback from MasterOperationHandler."""
        neo.lib.logging.debug('setting critical TID %s to %s', dump(tid),
            ', '.join([str(p) for p in self.critical_tid_list]))
        # Every partition that was waiting for this answer becomes a
        # fully-known outdated partition.
        for offset in self.critical_tid_list:
            self.partition_dict[offset] = Partition(offset, tid)
        self.critical_tid_list = []

    def _askCriticalTID(self):
        # Ask the primary master for the last TID; its answer will come back
        # through setCriticalTID for all partitions moved below.
        self.app.master_conn.ask(Packets.AskLastIDs())
        self.critical_tid_list.extend(self.new_partition_set)
        self.new_partition_set.clear()

    def setUnfinishedTIDList(self, tid_list):
        """This is a callback from MasterOperationHandler."""
        neo.lib.logging.debug('setting unfinished TIDs %s',
                      ','.join([dump(tid) for tid in tid_list]))
        self.waiting_for_unfinished_tids = False
        self.unfinished_tid_list = tid_list

    def _askUnfinishedTIDs(self):
        conn = self.app.master_conn
        conn.ask(Packets.AskUnfinishedTransactions())
        self.waiting_for_unfinished_tids = True

    def _startReplication(self):
        # Choose a storage node for the source.
        app = self.app
        cell_list = app.pt.getCellList(self.current_partition.getOffset(),
                                       readable=True)
        node_list = [cell.getNode() for cell in cell_list
                        if cell.getNodeState() == NodeStates.RUNNING]
        try:
            node = choice(node_list)
        except IndexError:
            # No running readable cell: cluster is not operational for this
            # partition, give up for now.
            neo.lib.logging.error('not operational', exc_info=1)
            self.current_partition = None
            return

        addr = node.getAddress()
        if addr is None:
            neo.lib.logging.error("no address known for the selected node %s",
                    dump(node.getUUID()))
            return
        # Drop any previous connection that points at a different node.
        if self.current_connection is not None:
            if self.current_connection.getAddress() != addr:
                self.current_connection.close()
                self.current_connection = None

        if self.current_connection is None:
            handler = replication.ReplicationHandler(app)
            self.current_connection = ClientConnection(app.em, handler,
                   addr=addr, connector=app.connector_handler())
            p = Packets.RequestIdentification(NodeTypes.STORAGE,
                    app.uuid, app.server, app.name)
            self.current_connection.ask(p)
        else:
            # Reuse the established connection for the next partition.
            self.current_connection.getHandler().startReplication(
                self.current_connection)
        self.replication_done = False

    def _finishReplication(self):
        # TODO: remove try..except: pass
        try:
            # Notify to a primary master node that my cell is now up-to-date.
            conn = self.app.master_conn
            offset = self.current_partition.getOffset()
            self.partition_dict.pop(offset)
            conn.notify(Packets.NotifyReplicationDone(offset))
        except KeyError:
            pass
        self.current_partition = None
        # Close the source connection once nothing remains to replicate.
        if not self.pending():
            self.current_connection.close()
            self.current_connection = None

    def act(self):
        # If the new partition list is not empty, I must ask a critical
        # TID to a primary master node.
        if self.new_partition_set:
            self._askCriticalTID()

        if self.current_partition is not None:
            # Don't end replication until we have received all expected
            # answers, as we might have asked object data just before the last
            # AnswerCheckSerialRange.
            if self.replication_done and \
                    not self.current_connection.isPending():
                # finish a replication
                neo.lib.logging.info('replication is done for %s',
                        self.current_partition.getOffset())
                self._finishReplication()
            return

        if self.waiting_for_unfinished_tids:
            # Still waiting.
            neo.lib.logging.debug('waiting for unfinished tids')
            return

        if self.unfinished_tid_list is None:
            # Ask pending transactions.
            neo.lib.logging.debug('asking unfinished tids')
            self._askUnfinishedTIDs()
            return

        # Try to select something.
        if self.unfinished_tid_list:
            min_unfinished_tid = min(self.unfinished_tid_list)
        else:
            min_unfinished_tid = None
        for partition in self.partition_dict.values():
            if partition.safe(min_unfinished_tid):
                self.current_partition = partition
                break
        else:
            # Not yet: re-ask the unfinished TID list on the next call.
            self.unfinished_tid_list = None
            neo.lib.logging.debug('not ready yet')
            return

        self._startReplication()

    def removePartition(self, offset):
        """This is a callback from MasterOperationHandler."""
        self.partition_dict.pop(offset, None)
        self.new_partition_set.discard(offset)

    def addPartition(self, offset):
        """This is a callback from MasterOperationHandler."""
        if offset not in self.partition_dict:
            self.new_partition_set.add(offset)

    def _addTask(self, key, func, args=(), kw=None):
        # Queue a deferred call; key must be unique until its result is
        # fetched through _getCheckResult.
        task = Task(func, args, kw)
        task_dict = self.task_dict
        if key in task_dict:
            raise ValueError('Task with key %r already exists (%r), cannot '
                'add %r' % (key, task_dict[key], task))
        task_dict[key] = task
        self.task_list.append(task)

    def processDelayedTasks(self):
        """Execute all queued tasks, in queueing order."""
        task_list = self.task_list
        if task_list:
            for task in task_list:
                task.process()
            self.task_list = []

    def checkTIDRange(self, min_tid, max_tid, length, partition):
        app = self.app
        self._addTask(('TID', min_tid, length), app.dm.checkTIDRange,
            (min_tid, max_tid, length, app.pt.getPartitions(), partition))

    def checkSerialRange(self, min_oid, min_serial, max_tid, length,
            partition):
        app = self.app
        self._addTask(('Serial', min_oid, min_serial, length),
            app.dm.checkSerialRange, (min_oid, min_serial, max_tid, length,
            app.pt.getPartitions(), partition))

    def getTIDsFrom(self, min_tid, max_tid, length, partition):
        app = self.app
        self._addTask('TIDsFrom',
            app.dm.getReplicationTIDList, (min_tid, max_tid, length,
            app.pt.getPartitions(), partition))

    def getObjectHistoryFrom(self, min_oid, min_serial, max_serial, length,
            partition):
        app = self.app
        self._addTask('ObjectHistoryFrom',
            app.dm.getObjectHistoryFrom, (min_oid, min_serial, max_serial,
            length, app.pt.getPartitions(), partition))

    def _getCheckResult(self, key):
        # Pop so the key can be reused by a later task with the same key.
        return self.task_dict.pop(key).getResult()

    def getTIDCheckResult(self, min_tid, length):
        return self._getCheckResult(('TID', min_tid, length))

    def getSerialCheckResult(self, min_oid, min_serial, length):
        return self._getCheckResult(('Serial', min_oid, min_serial, length))

    def getTIDsFromResult(self):
        return self._getCheckResult('TIDsFrom')

    def getObjectHistoryFromResult(self):
        return self._getCheckResult('ObjectHistoryFrom')