Commit ebffb6d0 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Don't access replicator's attributes directly.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2364 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 90adae0b
...@@ -66,7 +66,7 @@ Both part follow the same mechanism: ...@@ -66,7 +66,7 @@ Both part follow the same mechanism:
def checkConnectionIsReplicatorConnection(func): def checkConnectionIsReplicatorConnection(func):
def decorator(self, conn, *args, **kw): def decorator(self, conn, *args, **kw):
if self.app.replicator.current_connection is conn: if self.app.replicator.isCurrentConnection(conn):
result = func(self, conn, *args, **kw) result = func(self, conn, *args, **kw)
else: else:
# Should probably raise & close connection... # Should probably raise & close connection...
...@@ -152,30 +152,24 @@ class ReplicationHandler(EventHandler): ...@@ -152,30 +152,24 @@ class ReplicationHandler(EventHandler):
del data del data
def _doAskCheckSerialRange(self, min_oid, min_tid, length=RANGE_LENGTH): def _doAskCheckSerialRange(self, min_oid, min_tid, length=RANGE_LENGTH):
replicator = self.app.replicator partition = self.replicator.getCurrentRID()
partition = replicator.current_partition.getRID()
replicator.checkSerialRange(min_oid, min_tid, length, partition) replicator.checkSerialRange(min_oid, min_tid, length, partition)
return Packets.AskCheckSerialRange(min_oid, min_tid, length, partition) return Packets.AskCheckSerialRange(min_oid, min_tid, length, partition)
def _doAskCheckTIDRange(self, min_tid, length=RANGE_LENGTH): def _doAskCheckTIDRange(self, min_tid, length=RANGE_LENGTH):
replicator = self.app.replicator partition = self.replicator.getCurrentRID()
partition = replicator.current_partition.getRID()
replicator.checkTIDRange(min_tid, length, partition) replicator.checkTIDRange(min_tid, length, partition)
return Packets.AskCheckTIDRange(min_tid, length, partition) return Packets.AskCheckTIDRange(min_tid, length, partition)
def _doAskTIDsFrom(self, min_tid, length): def _doAskTIDsFrom(self, min_tid, length):
replicator = self.app.replicator partition_id = self.replicator.getCurrentRID()
partition = replicator.current_partition max_tid = self.replicator.getCurrentCriticalTID()
partition_id = partition.getRID()
max_tid = partition.getCriticalTID()
replicator.getTIDsFrom(min_tid, max_tid, length, partition_id) replicator.getTIDsFrom(min_tid, max_tid, length, partition_id)
return Packets.AskTIDsFrom(min_tid, max_tid, length, partition_id) return Packets.AskTIDsFrom(min_tid, max_tid, length, partition_id)
def _doAskObjectHistoryFrom(self, min_oid, min_serial, length): def _doAskObjectHistoryFrom(self, min_oid, min_serial, length):
replicator = self.app.replicator partition_id = self.replicator.getCurrentRID()
partition = replicator.current_partition max_serial = self.replicator.getCurrentCriticalTID()
partition_id = partition.getRID()
max_serial = partition.getCriticalTID()
replicator.getObjectHistoryFrom(min_oid, min_serial, max_serial, replicator.getObjectHistoryFrom(min_oid, min_serial, max_serial,
length, partition_id) length, partition_id)
return Packets.AskObjectHistoryFrom(min_oid, min_serial, max_serial, return Packets.AskObjectHistoryFrom(min_oid, min_serial, max_serial,
...@@ -206,7 +200,7 @@ class ReplicationHandler(EventHandler): ...@@ -206,7 +200,7 @@ class ReplicationHandler(EventHandler):
count + 1)) count + 1))
if p is None: if p is None:
if count == length and \ if count == length and \
max_tid < replicator.current_partition.getCriticalTID(): max_tid < self.replicator.getCurrentCriticalTID():
# Go on with next chunk # Go on with next chunk
p = self._doAskCheckTIDRange(add64(max_tid, 1)) p = self._doAskCheckTIDRange(add64(max_tid, 1))
else: else:
...@@ -247,7 +241,7 @@ class ReplicationHandler(EventHandler): ...@@ -247,7 +241,7 @@ class ReplicationHandler(EventHandler):
else: else:
# Nothing remains, so the replication for this partition is # Nothing remains, so the replication for this partition is
# finished. # finished.
replicator.replication_done = True replicator.setReplicationDone()
if p is not None: if p is not None:
conn.ask(p) conn.ask(p)
...@@ -123,13 +123,11 @@ class Replicator(object): ...@@ -123,13 +123,11 @@ class Replicator(object):
# critical_tid_dict # critical_tid_dict
# outdated partitions for which a critical tid was asked to primary # outdated partitions for which a critical tid was asked to primary
# master, but not answered so far # master, but not answered so far
# XXX: could probably be made a list/set rather than a dict
# partition_dict # partition_dict
# outdated partitions (with or without a critical tid - if without, it # outdated partitions (with or without a critical tid - if without, it
# was asked to primary master) # was asked to primary master)
# current_partition # current_partition
# partition being currently synchronised # partition being currently synchronised
# XXX: accessed (r) directly by ReplicationHandler
# current_connection # current_connection
# connection to a storage node we are replicating from # connection to a storage node we are replicating from
# XXX: accessed (r) directory by ReplicationHandler # XXX: accessed (r) directory by ReplicationHandler
...@@ -142,7 +140,6 @@ class Replicator(object): ...@@ -142,7 +140,6 @@ class Replicator(object):
# False if we know there is something to replicate. # False if we know there is something to replicate.
# True when current_partition is replicated, or we don't know yet if # True when current_partition is replicated, or we don't know yet if
# there is something to replicate # there is something to replicate
# XXX: accessed (w) directly by ReplicationHandler
new_partition_dict = None new_partition_dict = None
critical_tid_dict = None critical_tid_dict = None
...@@ -192,6 +189,21 @@ class Replicator(object): ...@@ -192,6 +189,21 @@ class Replicator(object):
"""Return whether there is any pending partition.""" """Return whether there is any pending partition."""
return len(self.partition_dict) or len(self.new_partition_dict) return len(self.partition_dict) or len(self.new_partition_dict)
def getCurrentRID(self):
assert self.current_partition is not None
return self.current_partition.getRID()
def getCurrentCriticalTID(self):
assert self.current_partition is not None
return self.current_partition.getCriticalTID()
def setReplicationDone(self):
""" Callback from ReplicationHandler """
self.replication_done = True
def isCurrentConnection(self, conn):
return self.current_connection is conn
def setCriticalTID(self, uuid, tid): def setCriticalTID(self, uuid, tid):
"""This is a callback from MasterOperationHandler.""" """This is a callback from MasterOperationHandler."""
try: try:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment