Commit f84a1095 authored by Kirill Smelkov's avatar Kirill Smelkov

Merge branch 'master' into x/go

* master:
  client: fix item eviction from cache, which could break loading from storage
  Bump protocol version for new read-only mode in BACKUPING state
  backup: Teach cluster in BACKUPING state to also serve regular ZODB clients in read-only mode
  tests/threaded: Add handy shortcuts to NEOCluster to concisely check cluster properties in tests
parents fe823e9c 4ef05b9e
......@@ -102,6 +102,12 @@ class ClientCache(object):
if item is head:
break
def _remove_from_oid_dict(self, item):
item_list = self._oid_dict[item.oid]
item_list.remove(item)
if not item_list:
del self._oid_dict[item.oid]
def _add(self, item):
level = item.level
try:
......@@ -126,10 +132,7 @@ class ClientCache(object):
self._history_size += 1
if self._max_history_size < self._history_size:
self._remove(head)
item_list = self._oid_dict[head.oid]
item_list.remove(head)
if not item_list:
del self._oid_dict[head.oid]
self._remove_from_oid_dict(head)
def _remove(self, item):
level = item.level
......@@ -165,7 +168,7 @@ class ClientCache(object):
if head.level or head.counter:
self._add(head)
else:
self._oid_dict[head.oid].remove(head)
self._remove_from_oid_dict(head)
break
def _load(self, oid, before_tid=None):
......@@ -247,7 +250,7 @@ class ClientCache(object):
head.level = 0
self._add(head)
else:
self._oid_dict[head.oid].remove(head)
self._remove_from_oid_dict(head)
if self._size <= max_size:
return
head = next
......
......@@ -15,9 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import handler
from ZODB.POSException import StorageError
from ZODB.POSException import StorageError, ReadOnlyError
class AnswerBaseHandler(handler.AnswerBaseHandler): # XXX
def protocolError(self, conn, message):
raise StorageError("protocol error: %s" % message)
def readOnlyAccess(self, conn, message):
raise ReadOnlyError(message)
......@@ -20,7 +20,7 @@ import traceback
from cStringIO import StringIO
from struct import Struct
PROTOCOL_VERSION = 6
PROTOCOL_VERSION = 7
# Size restrictions.
MIN_PACKET_SIZE = 10
......@@ -75,6 +75,7 @@ def ErrorCodes():
REPLICATION_ERROR
CHECKING_ERROR
BACKEND_NOT_IMPLEMENTED
READ_ONLY_ACCESS
@Enum
def ClusterStates():
......
......@@ -103,6 +103,7 @@ class Application(BaseApplication):
self)
self.secondary_master_handler = secondary.SecondaryMasterHandler(self)
self.client_service_handler = client.ClientServiceHandler(self)
self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(self)
self.storage_service_handler = storage.StorageServiceHandler(self)
registerLiveDebugger(on_log=self.log)
......@@ -418,7 +419,6 @@ class Application(BaseApplication):
return
# select the storage handler
client_handler = self.client_service_handler
if state in (ClusterStates.RUNNING, ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP, ClusterStates.STOPPING_BACKUP):
storage_handler = self.storage_service_handler
......@@ -435,10 +435,13 @@ class Application(BaseApplication):
conn = node.getConnection()
conn.notify(notification_packet)
if node.isClient():
if state != ClusterStates.RUNNING:
if state == ClusterStates.RUNNING:
handler = self.client_service_handler
elif state == ClusterStates.BACKINGUP:
handler = self.client_ro_service_handler
else:
conn.abort()
continue
handler = client_handler
elif node.isStorage() and storage_handler:
handler = storage_handler
else:
......
......@@ -310,6 +310,8 @@ class BackupApplication(object):
logging.debug("partition %u: updating backup_tid of %r to %s",
offset, cell, dump(tid))
cell.backup_tid = tid
# TODO provide invalidation feedback about new txns to read-only clients connected to backup cluster
# NOTE ^^^ not only here but also hooked to in-progress feedback from fetchObjects (storage)
# Forget tids we won't need anymore.
cell_list = app.pt.getCellList(offset, readable=True)
del tid_list[:bisect(tid_list, min(x.backup_tid for x in cell_list))]
......
......@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID
from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID, Errors
from . import MasterHandler
class ClientServiceHandler(MasterHandler):
......@@ -118,3 +118,27 @@ class ClientServiceHandler(MasterHandler):
# BUG: The replicator may wait this transaction to be finished.
self.app.tm.abort(tid, conn.getUUID())
# like ClientServiceHandler but read-only & only for tid <= backup_tid
class ClientReadOnlyServiceHandler(ClientServiceHandler):
def _readOnly(self, conn, *args, **kw):
conn.answer(Errors.ReadOnlyAccess(
'read-only access because cluster is in backuping mode'))
askBeginTransaction = _readOnly
askNewOIDs = _readOnly
askFinishTransaction = _readOnly
askFinalTID = _readOnly
askPack = _readOnly
abortTransaction = _readOnly
# XXX LastIDs is not used by client at all, and it requires work to determine
# last_oid up to backup_tid, so just make it non-functional for client.
askLastIDs = _readOnly
# like in MasterHandler but returns backup_tid instead of last_tid
def askLastTransaction(self, conn):
assert self.app.backup_tid is not None # we are in BACKUPING mode
backup_tid = self.app.pt.getBackupTid()
conn.answer(Packets.AnswerLastTransaction(backup_tid))
......@@ -41,9 +41,12 @@ class IdentificationHandler(MasterHandler):
state = NodeStates.RUNNING
if node_type == NodeTypes.CLIENT:
if app.cluster_state != ClusterStates.RUNNING:
raise NotReadyError
if app.cluster_state == ClusterStates.RUNNING:
handler = app.client_service_handler
elif app.cluster_state == ClusterStates.BACKINGUP:
handler = app.client_ro_service_handler
else:
raise NotReadyError
human_readable_node_type = ' client '
elif node_type == NodeTypes.STORAGE:
if app.cluster_state == ClusterStates.STOPPING_BACKUP:
......
......@@ -16,7 +16,7 @@
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.util import dump, makeChecksum
from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, LockState, Errors, ProtocolError, \
ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, DelayedError, NotRegisteredError
......@@ -130,7 +130,7 @@ class ClientOperationHandler(EventHandler):
conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
min_tid, max_tid, length, partition)))
def askTIDs(self, conn, first, last, partition):
def _askTIDs(self, first, last, partition):
# This method is complicated, because I must return TIDs only
# about usable partitions assigned to me.
if first >= last:
......@@ -142,8 +142,10 @@ class ClientOperationHandler(EventHandler):
else:
partition_list = [partition]
tid_list = app.dm.getTIDList(first, last - first, partition_list)
conn.answer(Packets.AnswerTIDs(tid_list))
return app.dm.getTIDList(first, last - first, partition_list)
def askTIDs(self, conn, *args):
conn.answer(Packets.AnswerTIDs(self._askTIDs(*args)))
def askFinalTID(self, conn, ttid):
conn.answer(Packets.AnswerFinalTID(self.app.tm.getFinalTID(ttid)))
......@@ -222,3 +224,67 @@ class ClientOperationHandler(EventHandler):
logging.info('CheckCurrentSerial delay: %.02fs', duration)
conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial))
# like ClientOperationHandler but read-only & only for tid <= backup_tid
class ClientReadOnlyOperationHandler(ClientOperationHandler):
def _readOnly(self, conn, *args, **kw):
conn.answer(Errors.ReadOnlyAccess(
'read-only access because cluster is in backuping mode'))
abortTransaction = _readOnly
askStoreTransaction = _readOnly
askVoteTransaction = _readOnly
askStoreObject = _readOnly
askFinalTID = _readOnly
# takes write lock & is only used when going to commit
askCheckCurrentSerial = _readOnly
# below operations: like in ClientOperationHandler but cut tid <= backup_tid
def askTransactionInformation(self, conn, tid):
backup_tid = self.app.dm.getBackupTID()
if tid > backup_tid:
conn.answer(Errors.TidNotFound(
'tids > %s are not fully fetched yet' % dump(backup_tid)))
return
super(ClientReadOnlyOperationHandler, self).askTransactionInformation(
conn, tid)
def askObject(self, conn, oid, serial, tid):
backup_tid = self.app.dm.getBackupTID()
if serial:
if serial > backup_tid:
# obj lookup will find nothing, but return properly either
# OidDoesNotExist or OidNotFound
serial = ZERO_TID
elif tid:
tid = min(tid, add64(backup_tid, 1))
# limit "latest obj" query to tid <= backup_tid
else:
tid = add64(backup_tid, 1)
super(ClientReadOnlyOperationHandler, self).askObject(
conn, oid, serial, tid)
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
backup_tid = self.app.dm.getBackupTID()
max_tid = min(max_tid, backup_tid)
# NOTE we don't need to adjust min_tid: if min_tid > max_tid
# db.getReplicationTIDList will return empty [], which is correct
super(ClientReadOnlyOperationHandler, self).askTIDsFrom(
conn, min_tid, max_tid, length, partition)
def askTIDs(self, conn, first, last, partition):
backup_tid = self.app.dm.getBackupTID()
tid_list = self._askTIDs(first, last, partition)
tid_list = filter(lambda tid: tid <= backup_tid, tid_list)
conn.answer(Packets.AnswerTIDs(tid_list))
# FIXME askObjectUndoSerial to limit tid <= backup_tid
# (askObjectUndoSerial is used in undo() but itself is read-only query)
# FIXME askObjectHistory to limit tid <= backup_tid
# TODO dm.getObjectHistory has to be first fixed for this
#def askObjectHistory(self, conn, oid, first, last):
......@@ -19,7 +19,7 @@ from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, NodeTypes, NotReadyError, Packets
from neo.lib.protocol import ProtocolError, BrokenNodeDisallowedError
from .storage import StorageOperationHandler
from .client import ClientOperationHandler
from .client import ClientOperationHandler, ClientReadOnlyOperationHandler
class IdentificationHandler(EventHandler):
""" Handler used for incoming connections during operation state """
......@@ -48,6 +48,9 @@ class IdentificationHandler(EventHandler):
raise BrokenNodeDisallowedError
# choose the handler according to the node type
if node_type == NodeTypes.CLIENT:
if app.dm.getBackupTID():
handler = ClientReadOnlyOperationHandler
else:
handler = ClientOperationHandler
if node is None:
node = app.nm.createClient(uuid=uuid)
......
......@@ -105,6 +105,7 @@ class StorageOperationHandler(EventHandler):
self.app.dm.commit()
assert not pack_tid, "TODO"
if next_tid:
# TODO also provide feedback to master about current replication state (tid)
self.app.replicator.fetchObjects(next_tid, next_oid)
else:
self.app.replicator.finish()
......
......@@ -668,10 +668,20 @@ class NEOCluster(object):
return admin
###
# More handy shortcuts for tests
@property
def backup_tid(self):
return self.neoctl.getRecovery()[1]
@property
def last_tid(self):
return self.primary_master.getLastTransaction()
@property
def primary_master(self):
master, = [master for master in self.master_list if master.primary]
return master
###
def reset(self, clear_database=False):
for node_type in 'master', 'storage', 'admin':
......
......@@ -17,10 +17,12 @@
import random
import time
import transaction
from ZODB.POSException import ReadOnlyError, POSKeyError
import unittest
from collections import defaultdict
from functools import wraps
from neo.lib import logging
from neo.client.exception import NEOStorageError
from neo.storage.checker import CHECK_COUNT
from neo.storage.replicator import Replicator
from neo.lib.connector import SocketConnector
......@@ -142,7 +144,7 @@ class ReplicationTests(NEOThreadedTest):
self.tic()
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.master.getLastTransaction()))
max_tid=backup.last_tid))
finally:
backup.stop()
backup.reset()
......@@ -160,7 +162,7 @@ class ReplicationTests(NEOThreadedTest):
self.tic()
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.master.getLastTransaction()))
max_tid=backup.last_tid))
finally:
backup.stop()
finally:
......@@ -461,5 +463,99 @@ class ReplicationTests(NEOThreadedTest):
checker.CHECK_COUNT = CHECK_COUNT
cluster.stop()
@backup_test()
def testBackupReadOnlyAccess(self, backup):
"""Check backup cluster can be used in read-only mode by ZODB clients"""
B = backup
U = B.upstream
Z = U.getZODBStorage()
#Zb = B.getZODBStorage() # XXX see below about invalidations
oid_list = []
tid_list = []
# S -> Sb link stops working during [cutoff, recover) test iterations
cutoff = 4
recover = 7
def delayReplication(conn, packet):
return isinstance(packet, Packets.AnswerFetchTransactions)
with ConnectionFilter() as f:
for i in xrange(10):
if i == cutoff:
f.add(delayReplication)
if i == recover:
# .remove() removes the filter and retransmits all packets
# that were queued once first filtered packed was detected
# on a connection.
f.remove(delayReplication)
# commit new data to U
txn = transaction.Transaction()
txn.note('test transaction %i' % i)
Z.tpc_begin(txn)
oid = Z.new_oid()
Z.store(oid, None, '%s-%i' % (oid, i), '', txn)
Z.tpc_vote(txn)
tid = Z.tpc_finish(txn)
oid_list.append(oid)
tid_list.append(tid)
# make sure data propagated to B (depending on cutoff)
self.tic()
if cutoff <= i < recover:
self.assertLess(B.backup_tid, U.last_tid)
else:
self.assertEqual(B.backup_tid, U.last_tid)
self.assertEqual(B.last_tid, U.last_tid)
self.assertEqual(1, self.checkBackup(B, max_tid=B.backup_tid))
# read data from B and verify it is what it should be
# XXX we open new ZODB storage every time because invalidations
# are not yet implemented in read-only mode.
Zb = B.getZODBStorage()
for j, oid in enumerate(oid_list):
if cutoff <= i < recover and j >= cutoff:
self.assertRaises(POSKeyError, Zb.load, oid, '')
else:
data, serial = Zb.load(oid, '')
self.assertEqual(data, '%s-%s' % (oid, j))
self.assertEqual(serial, tid_list[j])
# verify how transaction log & friends behave under potentially
# not-yet-fully fetched backup state (transactions committed at
# [cutoff, recover) should not be there; otherwise transactions
# should be fully there)
Zb = B.getZODBStorage()
Btxn_list = list(Zb.iterator())
self.assertEqual(len(Btxn_list), cutoff if cutoff <= i < recover
else i+1)
for j, txn in enumerate(Btxn_list):
self.assertEqual(txn.tid, tid_list[j])
self.assertEqual(txn.description, 'test transaction %i' % j)
obj, = txn
self.assertEqual(obj.oid, oid_list[j])
self.assertEqual(obj.data, '%s-%s' % (obj.oid, j))
# TODO test askObjectHistory once it is fixed
# try to commit something to backup storage and make sure it is
# really read-only
Zb._cache._max_size = 0 # make store() do work in sync way
txn = transaction.Transaction()
self.assertRaises(ReadOnlyError, Zb.tpc_begin, txn)
self.assertRaises(ReadOnlyError, Zb.new_oid)
self.assertRaises(ReadOnlyError, Zb.store, oid_list[-1],
tid_list[-1], 'somedata', '', txn)
# tpc_vote first checks whether there were store replies -
# thus not ReadOnlyError
self.assertRaises(NEOStorageError, Zb.tpc_vote, txn)
# close storage because client app is otherwise shared in
# threaded tests and we need to refresh last_tid on next run
# (XXX see above about invalidations not working)
Zb.close()
if __name__ == "__main__":
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment