Commit d4944062 authored by Kirill Smelkov's avatar Kirill Smelkov Committed by Julien Muchembled

backup: Teach cluster in BACKUPING state to also serve regular ZODB clients in read-only mode

A backup cluster for tids <= backup_tid has all data to provide regular
read-only ZODB service. Having regular ZODB access to the data can be
handy e.g. for externally verifying data for consistency between
main and backup clusters. Peeking around without disturbing main
cluster might be also useful sometimes.

In this patch:

- master & storage nodes are taught:

    * to instantiate read-only or regular client service handler depending on cluster state:
      RUNNING   -> regular
      BACKINGUP -> read-only

    * in read-only client handler:
      + to reject write-related operations
      + to provide read operations but adjust semantic as last_tid in the database
        would be = backup_tid

- new READ_ONLY_ACCESS protocol error code is introduced so that client can
  raise POSException.ReadOnlyError upon receiving it.

I have not implemented back-channel for invalidations in read-only mode (yet ?).
This way once a client connects to cluster in backup state, it won't see
new data fetched by backup cluster from upstream after client connected.

The reasons invalidations are not implemented is that for now (imho)
there is no off-hand ready infrastructure to get updates from
replicating node on transaction-by-transaction basis (it currently only
notifies when whole batch is done). For consistency verification (main
reason for this patch) we also don't need invalidations to work, as in
that task we always connect afresh to backup. So I simply only put
relevant TODOs about invalidations for now.

The patch is not very polished but should work.

/reviewed-on !4
parent ab552d87
Pipeline #3547 skipped
...@@ -15,9 +15,12 @@ ...@@ -15,9 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import handler from neo.lib import handler
from ZODB.POSException import StorageError from ZODB.POSException import StorageError, ReadOnlyError
class AnswerBaseHandler(handler.AnswerBaseHandler): # XXX class AnswerBaseHandler(handler.AnswerBaseHandler): # XXX
def protocolError(self, conn, message): def protocolError(self, conn, message):
raise StorageError("protocol error: %s" % message) raise StorageError("protocol error: %s" % message)
def readOnlyAccess(self, conn, message):
raise ReadOnlyError(message)
...@@ -75,6 +75,7 @@ def ErrorCodes(): ...@@ -75,6 +75,7 @@ def ErrorCodes():
REPLICATION_ERROR REPLICATION_ERROR
CHECKING_ERROR CHECKING_ERROR
BACKEND_NOT_IMPLEMENTED BACKEND_NOT_IMPLEMENTED
READ_ONLY_ACCESS
@Enum @Enum
def ClusterStates(): def ClusterStates():
......
...@@ -103,6 +103,7 @@ class Application(BaseApplication): ...@@ -103,6 +103,7 @@ class Application(BaseApplication):
self) self)
self.secondary_master_handler = secondary.SecondaryMasterHandler(self) self.secondary_master_handler = secondary.SecondaryMasterHandler(self)
self.client_service_handler = client.ClientServiceHandler(self) self.client_service_handler = client.ClientServiceHandler(self)
self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(self)
self.storage_service_handler = storage.StorageServiceHandler(self) self.storage_service_handler = storage.StorageServiceHandler(self)
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
...@@ -418,7 +419,6 @@ class Application(BaseApplication): ...@@ -418,7 +419,6 @@ class Application(BaseApplication):
return return
# select the storage handler # select the storage handler
client_handler = self.client_service_handler
if state in (ClusterStates.RUNNING, ClusterStates.STARTING_BACKUP, if state in (ClusterStates.RUNNING, ClusterStates.STARTING_BACKUP,
ClusterStates.BACKINGUP, ClusterStates.STOPPING_BACKUP): ClusterStates.BACKINGUP, ClusterStates.STOPPING_BACKUP):
storage_handler = self.storage_service_handler storage_handler = self.storage_service_handler
...@@ -435,10 +435,13 @@ class Application(BaseApplication): ...@@ -435,10 +435,13 @@ class Application(BaseApplication):
conn = node.getConnection() conn = node.getConnection()
conn.notify(notification_packet) conn.notify(notification_packet)
if node.isClient(): if node.isClient():
if state != ClusterStates.RUNNING: if state == ClusterStates.RUNNING:
handler = self.client_service_handler
elif state == ClusterStates.BACKINGUP:
handler = self.client_ro_service_handler
else:
conn.abort() conn.abort()
continue continue
handler = client_handler
elif node.isStorage() and storage_handler: elif node.isStorage() and storage_handler:
handler = storage_handler handler = storage_handler
else: else:
......
...@@ -310,6 +310,8 @@ class BackupApplication(object): ...@@ -310,6 +310,8 @@ class BackupApplication(object):
logging.debug("partition %u: updating backup_tid of %r to %s", logging.debug("partition %u: updating backup_tid of %r to %s",
offset, cell, dump(tid)) offset, cell, dump(tid))
cell.backup_tid = tid cell.backup_tid = tid
# TODO provide invalidation feedback about new txns to read-only clients connected to backup cluster
# NOTE ^^^ not only here but also hooked to in-progress feedback from fetchObjects (storage)
# Forget tids we won't need anymore. # Forget tids we won't need anymore.
cell_list = app.pt.getCellList(offset, readable=True) cell_list = app.pt.getCellList(offset, readable=True)
del tid_list[:bisect(tid_list, min(x.backup_tid for x in cell_list))] del tid_list[:bisect(tid_list, min(x.backup_tid for x in cell_list))]
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID, Errors
from . import MasterHandler from . import MasterHandler
class ClientServiceHandler(MasterHandler): class ClientServiceHandler(MasterHandler):
...@@ -118,3 +118,27 @@ class ClientServiceHandler(MasterHandler): ...@@ -118,3 +118,27 @@ class ClientServiceHandler(MasterHandler):
# BUG: The replicator may wait this transaction to be finished. # BUG: The replicator may wait this transaction to be finished.
self.app.tm.abort(tid, conn.getUUID()) self.app.tm.abort(tid, conn.getUUID())
# like ClientServiceHandler but read-only & only for tid <= backup_tid
class ClientReadOnlyServiceHandler(ClientServiceHandler):
def _readOnly(self, conn, *args, **kw):
conn.answer(Errors.ReadOnlyAccess(
'read-only access because cluster is in backuping mode'))
askBeginTransaction = _readOnly
askNewOIDs = _readOnly
askFinishTransaction = _readOnly
askFinalTID = _readOnly
askPack = _readOnly
abortTransaction = _readOnly
# XXX LastIDs is not used by client at all, and it requires work to determine
# last_oid up to backup_tid, so just make it non-functional for client.
askLastIDs = _readOnly
# like in MasterHandler but returns backup_tid instead of last_tid
def askLastTransaction(self, conn):
assert self.app.backup_tid is not None # we are in BACKUPING mode
backup_tid = self.app.pt.getBackupTid()
conn.answer(Packets.AnswerLastTransaction(backup_tid))
...@@ -41,9 +41,12 @@ class IdentificationHandler(MasterHandler): ...@@ -41,9 +41,12 @@ class IdentificationHandler(MasterHandler):
state = NodeStates.RUNNING state = NodeStates.RUNNING
if node_type == NodeTypes.CLIENT: if node_type == NodeTypes.CLIENT:
if app.cluster_state != ClusterStates.RUNNING: if app.cluster_state == ClusterStates.RUNNING:
raise NotReadyError
handler = app.client_service_handler handler = app.client_service_handler
elif app.cluster_state == ClusterStates.BACKINGUP:
handler = app.client_ro_service_handler
else:
raise NotReadyError
human_readable_node_type = ' client ' human_readable_node_type = ' client '
elif node_type == NodeTypes.STORAGE: elif node_type == NodeTypes.STORAGE:
if app.cluster_state == ClusterStates.STOPPING_BACKUP: if app.cluster_state == ClusterStates.STOPPING_BACKUP:
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
from neo.lib import logging from neo.lib import logging
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.util import dump, makeChecksum from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, LockState, Errors, ProtocolError, \ from neo.lib.protocol import Packets, LockState, Errors, ProtocolError, \
ZERO_HASH, INVALID_PARTITION ZERO_HASH, INVALID_PARTITION
from ..transactions import ConflictError, DelayedError, NotRegisteredError from ..transactions import ConflictError, DelayedError, NotRegisteredError
...@@ -130,7 +130,7 @@ class ClientOperationHandler(EventHandler): ...@@ -130,7 +130,7 @@ class ClientOperationHandler(EventHandler):
conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList( conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
min_tid, max_tid, length, partition))) min_tid, max_tid, length, partition)))
def askTIDs(self, conn, first, last, partition): def _askTIDs(self, first, last, partition):
# This method is complicated, because I must return TIDs only # This method is complicated, because I must return TIDs only
# about usable partitions assigned to me. # about usable partitions assigned to me.
if first >= last: if first >= last:
...@@ -142,8 +142,10 @@ class ClientOperationHandler(EventHandler): ...@@ -142,8 +142,10 @@ class ClientOperationHandler(EventHandler):
else: else:
partition_list = [partition] partition_list = [partition]
tid_list = app.dm.getTIDList(first, last - first, partition_list) return app.dm.getTIDList(first, last - first, partition_list)
conn.answer(Packets.AnswerTIDs(tid_list))
def askTIDs(self, conn, *args):
conn.answer(Packets.AnswerTIDs(self._askTIDs(*args)))
def askFinalTID(self, conn, ttid): def askFinalTID(self, conn, ttid):
conn.answer(Packets.AnswerFinalTID(self.app.tm.getFinalTID(ttid))) conn.answer(Packets.AnswerFinalTID(self.app.tm.getFinalTID(ttid)))
...@@ -222,3 +224,67 @@ class ClientOperationHandler(EventHandler): ...@@ -222,3 +224,67 @@ class ClientOperationHandler(EventHandler):
logging.info('CheckCurrentSerial delay: %.02fs', duration) logging.info('CheckCurrentSerial delay: %.02fs', duration)
conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial)) conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial))
# like ClientOperationHandler but read-only & only for tid <= backup_tid
class ClientReadOnlyOperationHandler(ClientOperationHandler):
def _readOnly(self, conn, *args, **kw):
conn.answer(Errors.ReadOnlyAccess(
'read-only access because cluster is in backuping mode'))
abortTransaction = _readOnly
askStoreTransaction = _readOnly
askVoteTransaction = _readOnly
askStoreObject = _readOnly
askFinalTID = _readOnly
# takes write lock & is only used when going to commit
askCheckCurrentSerial = _readOnly
# below operations: like in ClientOperationHandler but cut tid <= backup_tid
def askTransactionInformation(self, conn, tid):
backup_tid = self.app.dm.getBackupTID()
if tid > backup_tid:
conn.answer(Errors.TidNotFound(
'tids > %s are not fully fetched yet' % dump(backup_tid)))
return
super(ClientReadOnlyOperationHandler, self).askTransactionInformation(
conn, tid)
def askObject(self, conn, oid, serial, tid):
backup_tid = self.app.dm.getBackupTID()
if serial:
if serial > backup_tid:
# obj lookup will find nothing, but return properly either
# OidDoesNotExist or OidNotFound
serial = ZERO_TID
elif tid:
tid = min(tid, add64(backup_tid, 1))
# limit "latest obj" query to tid <= backup_tid
else:
tid = add64(backup_tid, 1)
super(ClientReadOnlyOperationHandler, self).askObject(
conn, oid, serial, tid)
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
backup_tid = self.app.dm.getBackupTID()
max_tid = min(max_tid, backup_tid)
# NOTE we don't need to adjust min_tid: if min_tid > max_tid
# db.getReplicationTIDList will return empty [], which is correct
super(ClientReadOnlyOperationHandler, self).askTIDsFrom(
conn, min_tid, max_tid, length, partition)
def askTIDs(self, conn, first, last, partition):
backup_tid = self.app.dm.getBackupTID()
tid_list = self._askTIDs(first, last, partition)
tid_list = filter(lambda tid: tid <= backup_tid, tid_list)
conn.answer(Packets.AnswerTIDs(tid_list))
# FIXME askObjectUndoSerial to limit tid <= backup_tid
# (askObjectUndoSerial is used in undo() but itself is read-only query)
# FIXME askObjectHistory to limit tid <= backup_tid
# TODO dm.getObjectHistory has to be first fixed for this
#def askObjectHistory(self, conn, oid, first, last):
...@@ -19,7 +19,7 @@ from neo.lib.handler import EventHandler ...@@ -19,7 +19,7 @@ from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, NodeTypes, NotReadyError, Packets from neo.lib.protocol import uuid_str, NodeTypes, NotReadyError, Packets
from neo.lib.protocol import ProtocolError, BrokenNodeDisallowedError from neo.lib.protocol import ProtocolError, BrokenNodeDisallowedError
from .storage import StorageOperationHandler from .storage import StorageOperationHandler
from .client import ClientOperationHandler from .client import ClientOperationHandler, ClientReadOnlyOperationHandler
class IdentificationHandler(EventHandler): class IdentificationHandler(EventHandler):
""" Handler used for incoming connections during operation state """ """ Handler used for incoming connections during operation state """
...@@ -48,6 +48,9 @@ class IdentificationHandler(EventHandler): ...@@ -48,6 +48,9 @@ class IdentificationHandler(EventHandler):
raise BrokenNodeDisallowedError raise BrokenNodeDisallowedError
# choose the handler according to the node type # choose the handler according to the node type
if node_type == NodeTypes.CLIENT: if node_type == NodeTypes.CLIENT:
if app.dm.getBackupTID():
handler = ClientReadOnlyOperationHandler
else:
handler = ClientOperationHandler handler = ClientOperationHandler
if node is None: if node is None:
node = app.nm.createClient(uuid=uuid) node = app.nm.createClient(uuid=uuid)
......
...@@ -105,6 +105,7 @@ class StorageOperationHandler(EventHandler): ...@@ -105,6 +105,7 @@ class StorageOperationHandler(EventHandler):
self.app.dm.commit() self.app.dm.commit()
assert not pack_tid, "TODO" assert not pack_tid, "TODO"
if next_tid: if next_tid:
# TODO also provide feedback to master about current replication state (tid)
self.app.replicator.fetchObjects(next_tid, next_oid) self.app.replicator.fetchObjects(next_tid, next_oid)
else: else:
self.app.replicator.finish() self.app.replicator.finish()
......
...@@ -17,10 +17,12 @@ ...@@ -17,10 +17,12 @@
import random import random
import time import time
import transaction import transaction
from ZODB.POSException import ReadOnlyError, POSKeyError
import unittest import unittest
from collections import defaultdict from collections import defaultdict
from functools import wraps from functools import wraps
from neo.lib import logging from neo.lib import logging
from neo.client.exception import NEOStorageError
from neo.storage.checker import CHECK_COUNT from neo.storage.checker import CHECK_COUNT
from neo.storage.replicator import Replicator from neo.storage.replicator import Replicator
from neo.lib.connector import SocketConnector from neo.lib.connector import SocketConnector
...@@ -461,5 +463,99 @@ class ReplicationTests(NEOThreadedTest): ...@@ -461,5 +463,99 @@ class ReplicationTests(NEOThreadedTest):
checker.CHECK_COUNT = CHECK_COUNT checker.CHECK_COUNT = CHECK_COUNT
cluster.stop() cluster.stop()
@backup_test()
def testBackupReadOnlyAccess(self, backup):
"""Check backup cluster can be used in read-only mode by ZODB clients"""
B = backup
U = B.upstream
Z = U.getZODBStorage()
#Zb = B.getZODBStorage() # XXX see below about invalidations
oid_list = []
tid_list = []
# S -> Sb link stops working during [cutoff, recover) test iterations
cutoff = 4
recover = 7
def delayReplication(conn, packet):
return isinstance(packet, Packets.AnswerFetchTransactions)
with ConnectionFilter() as f:
for i in xrange(10):
if i == cutoff:
f.add(delayReplication)
if i == recover:
# .remove() removes the filter and retransmits all packets
# that were queued once first filtered packed was detected
# on a connection.
f.remove(delayReplication)
# commit new data to U
txn = transaction.Transaction()
txn.note('test transaction %i' % i)
Z.tpc_begin(txn)
oid = Z.new_oid()
Z.store(oid, None, '%s-%i' % (oid, i), '', txn)
Z.tpc_vote(txn)
tid = Z.tpc_finish(txn)
oid_list.append(oid)
tid_list.append(tid)
# make sure data propagated to B (depending on cutoff)
self.tic()
if cutoff <= i < recover:
self.assertLess(B.backup_tid, U.last_tid)
else:
self.assertEqual(B.backup_tid, U.last_tid)
self.assertEqual(B.last_tid, U.last_tid)
self.assertEqual(1, self.checkBackup(B, max_tid=B.backup_tid))
# read data from B and verify it is what it should be
# XXX we open new ZODB storage every time because invalidations
# are not yet implemented in read-only mode.
Zb = B.getZODBStorage()
for j, oid in enumerate(oid_list):
if cutoff <= i < recover and j >= cutoff:
self.assertRaises(POSKeyError, Zb.load, oid, '')
else:
data, serial = Zb.load(oid, '')
self.assertEqual(data, '%s-%s' % (oid, j))
self.assertEqual(serial, tid_list[j])
# verify how transaction log & friends behave under potentially
# not-yet-fully fetched backup state (transactions committed at
# [cutoff, recover) should not be there; otherwise transactions
# should be fully there)
Zb = B.getZODBStorage()
Btxn_list = list(Zb.iterator())
self.assertEqual(len(Btxn_list), cutoff if cutoff <= i < recover
else i+1)
for j, txn in enumerate(Btxn_list):
self.assertEqual(txn.tid, tid_list[j])
self.assertEqual(txn.description, 'test transaction %i' % j)
obj, = txn
self.assertEqual(obj.oid, oid_list[j])
self.assertEqual(obj.data, '%s-%s' % (obj.oid, j))
# TODO test askObjectHistory once it is fixed
# try to commit something to backup storage and make sure it is
# really read-only
Zb._cache._max_size = 0 # make store() do work in sync way
txn = transaction.Transaction()
self.assertRaises(ReadOnlyError, Zb.tpc_begin, txn)
self.assertRaises(ReadOnlyError, Zb.new_oid)
self.assertRaises(ReadOnlyError, Zb.store, oid_list[-1],
tid_list[-1], 'somedata', '', txn)
# tpc_vote first checks whether there were store replies -
# thus not ReadOnlyError
self.assertRaises(NEOStorageError, Zb.tpc_vote, txn)
# close storage because client app is otherwise shared in
# threaded tests and we need to refresh last_tid on next run
# (XXX see above about invalidations not working)
Zb.close()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment