Commit c081a86f authored by Julien Muchembled's avatar Julien Muchembled

wip

parent f9c988fa
@@ -543,7 +543,7 @@ class MySQLDatabaseManager(DatabaseManager):
     def _changePartitionTable(self, cell_list, reset=False):
         q = self.query
-        delete = set(q("SELECT `partition`, nid FROM pt"))) if reset else set()
+        delete = set(q("SELECT `partition`, nid FROM pt")) if reset else set()
         for offset, nid, tid, pack in cell_list:
             key = offset, nid
             if tid is None:
@@ -681,6 +681,7 @@ class MySQLDatabaseManager(DatabaseManager):
                     " WHERE `partition`=%s AND oid=%s AND tid=%s"
                     % (p, oid, util.u64(data_tid))):
                 return r
+            assert p not in self._readable_set
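+            # presumably the lookup by data_tid may only fail here while this
+            # cell is still being replicated, i.e. not yet readable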
         if not checksum:
             return # delete
         e = self.escape
@@ -957,7 +958,6 @@ class MySQLDatabaseManager(DatabaseManager):
                 % (offset, oid, max_serial, max_serial,
                    offset, oid, max_serial)):
             value_dict[value_serial].append((i, serial))
-        data_id_set = set()
         sql = " FROM obj WHERE `partition`=%s AND oid=%s AND tid<%s" \
             % (offset, oid, max_serial)
         for serial, data_id in q("SELECT tid, data_id" + sql):
@@ -491,6 +491,7 @@ class SQLiteDatabaseManager(DatabaseManager):
                     " WHERE partition=? AND oid=? AND tid=?",
                     (p, oid, util.u64(data_tid))):
                 return r
+            assert p not in self._readable_set
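+            # same invariant as in the MySQL backend: a failed dedup lookup is
+            # only expected for a cell that is not readable yet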
         if not checksum:
             return # delete
         H = buffer(checksum)
@@ -646,7 +647,8 @@ class SQLiteDatabaseManager(DatabaseManager):
                 str(user), str(desc), str(ext),
                 bool(packed), util.p64(ttid),
                 None if pack_partial is None else (
-                    approved, bool(pack_partial),
+                    None if approved is None else bool(approved),
+                    bool(pack_partial),
                     None if pack_oids is None else splitOIDField(tid, pack_oids),
                     util.p64(pack_tid)))
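+                # the tuple above is the pack metadata (approved, partial,
+                # oids, tid): approved is presumably kept as None, rather than
+                # coerced to False, while the pack order is not yet decided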
@@ -737,7 +739,6 @@ class SQLiteDatabaseManager(DatabaseManager):
                  (offset, oid, max_serial, max_serial,
                   offset, oid, max_serial)):
             value_dict[value_serial].append((i, serial))
-        data_id_set = set()
         sql = " FROM obj WHERE partition=? AND oid=? AND tid<?"
         args = offset, oid, max_serial
         for serial, data_id in q("SELECT tid, data_id" + sql, args):
@@ -436,6 +436,7 @@ class Replicator(object):
         app.tm.replicated(offset, tid)
         logging.debug("partition %u replicated up to %s from %r",
                       offset, dump(tid), self.current_node)
-        app.maybePack() # only useful in backup mode
+        if app.pt.getCell(offset, app.uuid).isUpToDate():
+            app.maybePack() # only useful in backup mode
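+            # presumably gated so that a cell that is still OUT_OF_DATE does
+            # not trigger packing before replication fully catches up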
         self.getCurrentConnection().setReconnectionNoDelay()
         self._nextPartition()
@@ -486,10 +487,13 @@ class Replicator(object):
         self._nextPartition()
 
     def filterPackable(self, tid, parts):
         backup = self.app.dm.getBackupTID()
         for offset in parts:
             if backup:
                 p = self.partition_dict[offset]
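+                # `None is not x <= tid` is a chained comparison, equivalent
+                # to `x is not None and x <= tid`: the cell still has history
+                # to replicate up to tid, so packing it now would be premature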
-                if not (None is not p.next_trans <= tid or
-                        None is not p.next_obj <= tid):
+                if (None is not p.next_trans <= tid or
+                        None is not p.next_obj <= tid):
                     continue
             yield offset
 
     def keepPendingSignedPackOrders(self, *args):
@@ -3,7 +3,7 @@ AbortTransaction(p64,[int])
 AcceptIdentification(NodeTypes,?int,?int)
 AddObject(p64,p64,int,bin,bin,?p64)
 AddPendingNodes([int])
-AddTransaction(p64,bin,bin,bin,bool,p64,[p64],?)
+AddTransaction(p64,bin,bin,bin,bool,p64,[p64],?(?bool,bool,?[p64],p64))
 AnswerBeginTransaction(p64)
 AnswerCheckCurrentSerial(?p64)
 AnswerCheckSerialRange(int,bin,p64,bin,p64)
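+# the ?(?bool,bool,?[p64],p64) tail added to AddTransaction above appears to
+# carry the pack metadata built in the SQLite backend: (approved, partial,
+# oids, tid), with approved and oids optional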
@@ -54,7 +54,9 @@ class StorageDBTests(NeoUnitTestBase):
         uuid = self.getStorageUUID()
         db.setUUID(uuid)
         self.assertEqual(uuid, db.getUUID())
-        db.changePartitionTable(Mock(), 1, 0,
+        app = Mock()
+        app.last_pack_id = None
+        db.changePartitionTable(app, 1, 0,
             [(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)],
             reset=True)
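+        # a bare Mock() no longer suffices: changePartitionTable presumably
+        # reads app.last_pack_id, so the test provides an explicit value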
         self.assertEqual(num_partitions, 1 + db._getMaxPartition())
@@ -211,8 +211,8 @@ class ImporterTests(NEOThreadedTest):
         # does not import data too fast and we test read/write access
         # by the client during the import.
         dm = cluster.storage.dm
-        def doOperation(app):
-            del dm.doOperation
+        def startJobs(app):
+            del dm.startJobs
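+            # the patch deletes itself so that the real method runs only when
+            # the test later calls dm.startJobs(cluster.storage) explicitly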
             q = app.task_queue[1]
             try:
                 while True:
@@ -223,7 +223,7 @@ class ImporterTests(NEOThreadedTest):
                     q.pop()
             assert not any(app.task_queue)
             app.task_priority = -1
-        dm.doOperation = doOperation
+        dm.startJobs = startJobs
         cluster.start()
         t, c = cluster.getTransaction()
         r = c.root()['tree']
@@ -240,7 +240,7 @@ class ImporterTests(NEOThreadedTest):
         h = random_tree.hashTree(r)
         h(30)
         logging.info("start migration")
-        dm.doOperation(cluster.storage)
+        dm.startJobs(cluster.storage)
         # Adjust if needed. Must remain > 0.
         beforeCheck(h, 22)
         # New writes after the switch to NEO.
@@ -408,7 +408,7 @@ class ImporterTests(NEOThreadedTest):
         storage = cluster.storage
         dm = storage.dm
         with storage.patchDeferred(dm._finished):
-            with storage.patchDeferred(dm.doOperation):
+            with storage.patchDeferred(dm.startJobs):
                 cluster.start()
                 s = cluster.getZODBStorage()
                 check() # before import
@@ -15,18 +15,50 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 import unittest
-from collections import deque
+from collections import defaultdict, deque
 from contextlib import contextmanager, nested
 from time import time
 from persistent import Persistent
+from neo.lib.protocol import Packets
 from .. import consume, Patch
-from . import NEOThreadedTest, with_cluster
+from . import ConnectionFilter, NEOThreadedTest, with_cluster
 
 class PCounter(Persistent):
     value = 0
 
 class TestPack(NEOThreadedTest):
 
+    @contextmanager
+    def assertPackOperationCount(self, cluster_or_storages, *counts):
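+        # patches dm._pack on every storage to record, per partition offset,
+        # the pack tids it processes (asserting they arrive in order); on
+        # exit, checks the number of distinct pack operations per storage
+        # against the expected *counts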
+        def patch(storage):
+            packs = {}
+            def _pack(orig, *args):
+                offset = args[1]
+                tid = args[3]
+                try:
+                    tids = packs[offset]
+                except KeyError:
+                    packs[offset] = [tid]
+                else:
+                    self.assertLessEqual(tids[-1], tid)
+                    tids.append(tid)
+                return orig(*args)
+            return Patch(storage.dm, _pack=_pack), packs
+        patches, packs = zip(*map(patch,
+            getattr(cluster_or_storages, 'storage_list', cluster_or_storages)))
+        with nested(*patches):
+            yield
+        self.assertSequenceEqual(counts,
+            tuple(sum(len(set(x)) for x in x.itervalues()) for x in packs))
+
+    def countAskPackOrders(self, connection_filter):
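+        # counts AskPackOrders packets per sending node's uuid, to observe
+        # how often each node polls for pack orders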
+        counts = defaultdict(int)
+        @connection_filter.add
+        def _(conn, packet):
+            if isinstance(packet, Packets.AskPackOrders):
+                counts[self.getConnectionApp(conn).uuid] += 1
+        return counts
+
     def populate(self, cluster):
         t, c = cluster.getTransaction()
         r = c.root()
@@ -40,23 +72,62 @@ class TestPack(NEOThreadedTest):
             yield cluster.client.last_tid
         c.close()
-    @with_cluster(partitions=2, replicas=1, storage_count=3)
-    def test(self, cluster):
-        s = cluster.storage_list[0]
+    @with_cluster(partitions=3, replicas=1, storage_count=3)
+    def testOutdatedNodeIsBack(self, cluster):
+        s0 = cluster.storage_list[0]
         populate = self.populate(cluster)
         consume(populate, 3)
-        with s.delayTasks(1):
-            cluster.client.pack(time())
-            consume(populate, 2)
-            cluster.client.pack(time())
-            s.stop()
-            cluster.join((s,))
+        with self.assertPackOperationCount(cluster, 0, 4, 4), \
+             ConnectionFilter() as f:
+            counts = self.countAskPackOrders(f)
+            with s0.delayTasks(1):
+                cluster.client.pack(time())
+                consume(populate, 2)
+                cluster.client.pack(time())
+                s0.stop()
+                cluster.join((s0,))
             self.tic()
-        s.resetNode()
+        # The first storage node stops any pack-related work after the first
+        # response to AskPackOrders. Other storage nodes process a pack order
+        # for all cells before asking the master for the next pack order.
+        self.assertEqual(counts, {s.uuid: 1 if s is s0 else 2
+                                  for s in cluster.storage_list})
+        s0.resetNode()
+        with self.assertPackOperationCount(cluster, 4, 0, 0), \
+             ConnectionFilter() as f:
+            counts = self.countAskPackOrders(f)
             deque(populate, 0)
-        s.start()
+            s0.start()
             self.tic()
+        # The master queries 2 storage nodes for old pack orders and remembers
+        # those that s0 has not completed. s0 processes all orders for the
+        # first replicated cell and asks them again when the second is
+        # up-to-date.
+        self.assertEqual(counts, {s0.uuid: 4, cluster.master.uuid: 2})
         self.checkReplicas(cluster)
+
+    @with_cluster(replicas=1)
+    def testValueSerial(self, cluster):
+        t, c = cluster.getTransaction()
+        ob = c.root()[''] = PCounter()
+        t.commit()
+        s0 = cluster.storage_list[0]
+        s0.stop()
+        cluster.join((s0,))
+        ob.value += 1
+        t.commit()
+        ob.value += 1
+        t.commit()
+        s0.resetNode()
+        with ConnectionFilter() as f:
+            f.delayAskFetchTransactions()
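+            # with AskFetchTransactions delayed, s0 presumably stays outdated
+            # while the undo and the pack below are processed by its peer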
+            s0.start()
+            c.db().undo(ob._p_serial, t.get())
+            t.commit()
+            cluster.client.pack(time())
+            self.tic()
+        self.tic()
+        self.checkReplicas(cluster)
 
 if __name__ == "__main__":
     unittest.main()