Commit dc7a129c authored by Julien Muchembled's avatar Julien Muchembled

Make sure transactions are committed in full when using internal replication

parent cf6e48ea
...@@ -38,7 +38,7 @@ from .exception import NEOStorageNotFoundError ...@@ -38,7 +38,7 @@ from .exception import NEOStorageNotFoundError
from .handlers import storage, master from .handlers import storage, master
from neo.lib.threaded_app import ThreadedApplication from neo.lib.threaded_app import ThreadedApplication
from .cache import ClientCache from .cache import ClientCache
from .pool import ConnectionPool from .pool import ConnectionPool, InvolvedNodeDict
from neo.lib.util import p64, u64, parseMasterList from neo.lib.util import p64, u64, parseMasterList
CHECKED_SERIAL = object() CHECKED_SERIAL = object()
...@@ -80,16 +80,12 @@ class TransactionContainer(dict): ...@@ -80,16 +80,12 @@ class TransactionContainer(dict):
# data stored: this will go to the cache on tpc_finish # data stored: this will go to the cache on tpc_finish
'cache_dict': {}, 'cache_dict': {},
'cache_size': 0, 'cache_size': 0,
# track successful stores/checks
'object_stored_counter_dict': {}, # {oid: {serial: {storage_id}}}
# conflicts to resolve # conflicts to resolve
'conflict_dict': {}, # {oid: (base_serial, serial)} 'conflict_dict': {}, # {oid: (base_serial, serial)}
# resolved conflicts # resolved conflicts
'resolved_dict': {}, # {oid: serial} 'resolved_dict': {}, # {oid: serial}
# nodes with at least 1 store (object or transaction) # status: 0 -> check only, 1 -> store, 2 -> failed
'involved_nodes': set(), # {node} 'involved_nodes': InvolvedNodeDict(), # {node_id: status}
# nodes with at least 1 check
'checked_nodes': set(), # {node}
} }
return context return context
...@@ -369,7 +365,7 @@ class Application(ThreadedApplication): ...@@ -369,7 +365,7 @@ class Application(ThreadedApplication):
def _loadFromStorage(self, oid, at_tid, before_tid): def _loadFromStorage(self, oid, at_tid, before_tid):
packet = Packets.AskObject(oid, at_tid, before_tid) packet = Packets.AskObject(oid, at_tid, before_tid)
for node, conn in self.cp.iterateForObject(oid, readable=True): for conn in self.cp.iterateForObject(oid):
try: try:
tid, next_tid, compression, checksum, data, data_tid \ tid, next_tid, compression, checksum, data, data_tid \
= self._askStorage(conn, packet) = self._askStorage(conn, packet)
...@@ -440,21 +436,11 @@ class Application(ThreadedApplication): ...@@ -440,21 +436,11 @@ class Application(ThreadedApplication):
txn_context['data_size'] += size txn_context['data_size'] += size
# Store object in tmp cache # Store object in tmp cache
txn_context['data_dict'][oid] = data txn_context['data_dict'][oid] = data
# Store data on each node
txn_context['object_stored_counter_dict'][oid] = {}
queue = txn_context['queue'] queue = txn_context['queue']
involved_nodes = txn_context['involved_nodes']
add_involved_nodes = involved_nodes.add
packet = Packets.AskStoreObject(oid, serial, compression, packet = Packets.AskStoreObject(oid, serial, compression,
checksum, compressed_data, data_serial, ttid) checksum, compressed_data, data_serial, ttid)
for node, conn in self.cp.iterateForObject(oid): for ask in self.cp.iterateForWrite(oid, txn_context['involved_nodes']):
try: ask(packet, queue=queue, oid=oid, serial=serial)
conn.ask(packet, queue=queue, oid=oid, serial=serial)
add_involved_nodes(node)
except ConnectionClosed:
continue
if not involved_nodes:
raise NEOStorageError("Store failed")
while txn_context['data_size'] >= self._cache._max_size: while txn_context['data_size'] >= self._cache._max_size:
self._waitAnyTransactionMessage(txn_context) self._waitAnyTransactionMessage(txn_context)
...@@ -550,13 +536,8 @@ class Application(ThreadedApplication): ...@@ -550,13 +536,8 @@ class Application(ThreadedApplication):
_waitAnyTransactionMessage(txn_context) _waitAnyTransactionMessage(txn_context)
if conflict_dict: if conflict_dict:
_handleConflicts(txn_context, tryToResolveConflict) _handleConflicts(txn_context, tryToResolveConflict)
if txn_context['data_dict']:
# Check for never-stored objects, and update result for all others raise NEOStorageError('could not store/check all oids')
for oid, store_dict in \
txn_context['object_stored_counter_dict'].iteritems():
if not store_dict:
logging.error('tpc_store failed')
raise NEOStorageError('tpc_store failed')
if OLD_ZODB: if OLD_ZODB:
return [(oid, ResolvedSerial) return [(oid, ResolvedSerial)
for oid in txn_context['resolved_dict']] for oid in txn_context['resolved_dict']]
...@@ -566,36 +547,38 @@ class Application(ThreadedApplication): ...@@ -566,36 +547,38 @@ class Application(ThreadedApplication):
"""Store current transaction.""" """Store current transaction."""
txn_context = self._txn_container.get(transaction) txn_context = self._txn_container.get(transaction)
result = self.waitStoreResponses(txn_context, tryToResolveConflict) result = self.waitStoreResponses(txn_context, tryToResolveConflict)
ttid = txn_context['ttid'] ttid = txn_context['ttid']
# Store data on each node
assert not txn_context['data_dict'], txn_context
packet = Packets.AskStoreTransaction(ttid, str(transaction.user), packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), dumps(transaction._extension), str(transaction.description), dumps(transaction._extension),
txn_context['cache_dict']) txn_context['cache_dict'])
queue = txn_context['queue'] queue = txn_context['queue']
trans_nodes = []
for node, conn in self.cp.iterateForObject(ttid):
logging.debug("voting transaction %s on %s", dump(ttid),
dump(conn.getUUID()))
try:
conn.ask(packet, queue=queue)
except ConnectionClosed:
continue
trans_nodes.append(node)
# check at least one storage node accepted
if trans_nodes:
involved_nodes = txn_context['involved_nodes'] involved_nodes = txn_context['involved_nodes']
# Ask in parallel all involved storage nodes to commit object metadata.
# Nodes that store the transaction metadata get a special packet.
trans_nodes = [ask(packet, queue=queue)
for ask in self.cp.iterateForWrite(ttid, involved_nodes)]
packet = Packets.AskVoteTransaction(ttid) packet = Packets.AskVoteTransaction(ttid)
for node in involved_nodes.difference(trans_nodes): for uuid, status in involved_nodes.iteritems():
if status == 1 and uuid not in trans_nodes:
node = self.nm.getByUUID(uuid)
if node is not None:
conn = self.cp.getConnForNode(node) conn = self.cp.getConnForNode(node)
if conn is not None: if conn is not None:
try: involved_nodes.ask(conn)(packet, queue=queue)
conn.ask(packet, queue=queue) continue
involved_nodes[uuid] = 2
self.waitResponses(queue)
# If there are failed nodes, ask the master whether they can be
# disconnected while keeping the cluster operational. If possible,
# this will happen during tpc_finish.
failed = [node.getUUID()
for node in self.nm.getStorageList()
if node.isRunning() and involved_nodes.get(node.getUUID()) == 2]
if failed:
try:
self._askPrimary(Packets.FailedVote(ttid, failed))
except ConnectionClosed: except ConnectionClosed:
pass pass
involved_nodes.update(trans_nodes)
self.waitResponses(queue)
txn_context['voted'] = None txn_context['voted'] = None
# We must not go further if connection to master was lost since # We must not go further if connection to master was lost since
# tpc_begin, to lower the probability of failing during tpc_finish. # tpc_begin, to lower the probability of failing during tpc_finish.
...@@ -606,8 +589,6 @@ class Application(ThreadedApplication): ...@@ -606,8 +589,6 @@ class Application(ThreadedApplication):
if 'error' in txn_context: if 'error' in txn_context:
raise NEOStorageError(txn_context['error']) raise NEOStorageError(txn_context['error'])
return result return result
logging.error('tpc_vote failed')
raise NEOStorageError('tpc_vote failed')
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
"""Abort current transaction.""" """Abort current transaction."""
...@@ -616,11 +597,12 @@ class Application(ThreadedApplication): ...@@ -616,11 +597,12 @@ class Application(ThreadedApplication):
return return
p = Packets.AbortTransaction(txn_context['ttid']) p = Packets.AbortTransaction(txn_context['ttid'])
# cancel transaction on all those nodes # cancel transaction on all those nodes
nodes = map(self.cp.getConnForNode, conns = [self.master_conn]
txn_context['involved_nodes'] | for uuid in txn_context['involved_nodes']:
txn_context['checked_nodes']) node = self.nm.getByUUID(uuid)
nodes.append(self.master_conn) if node is not None:
for conn in nodes: conns.append(self.cp.getConnForNode(node))
for conn in conns:
if conn is not None: if conn is not None:
try: try:
conn.notify(p) conn.notify(p)
...@@ -685,8 +667,7 @@ class Application(ThreadedApplication): ...@@ -685,8 +667,7 @@ class Application(ThreadedApplication):
pass pass
if tid == MAX_TID: if tid == MAX_TID:
while 1: while 1:
for _, conn in self.cp.iterateForObject( for conn in self.cp.iterateForObject(ttid):
ttid, readable=True):
try: try:
return self._askStorage(conn, p) return self._askStorage(conn, p)
except ConnectionClosed: except ConnectionClosed:
...@@ -777,7 +758,7 @@ class Application(ThreadedApplication): ...@@ -777,7 +758,7 @@ class Application(ThreadedApplication):
def _getTransactionInformation(self, tid): def _getTransactionInformation(self, tid):
packet = Packets.AskTransactionInformation(tid) packet = Packets.AskTransactionInformation(tid)
for node, conn in self.cp.iterateForObject(tid, readable=True): for conn in self.cp.iterateForObject(tid):
try: try:
txn_info, txn_ext = self._askStorage(conn, packet) txn_info, txn_ext = self._askStorage(conn, packet)
except ConnectionClosed: except ConnectionClosed:
...@@ -838,7 +819,7 @@ class Application(ThreadedApplication): ...@@ -838,7 +819,7 @@ class Application(ThreadedApplication):
# request a tid list for each partition # request a tid list for each partition
for offset in xrange(self.pt.getPartitions()): for offset in xrange(self.pt.getPartitions()):
p = Packets.AskTIDsFrom(start, stop, limit, offset) p = Packets.AskTIDsFrom(start, stop, limit, offset)
for node, conn in self.cp.iterateForObject(offset, readable=True): for conn in self.cp.iterateForObject(offset):
try: try:
r = self._askStorage(conn, p) r = self._askStorage(conn, p)
break break
...@@ -864,7 +845,7 @@ class Application(ThreadedApplication): ...@@ -864,7 +845,7 @@ class Application(ThreadedApplication):
def history(self, oid, size=1, filter=None): def history(self, oid, size=1, filter=None):
# Get history informations for object first # Get history informations for object first
packet = Packets.AskObjectHistory(oid, 0, size) packet = Packets.AskObjectHistory(oid, 0, size)
for node, conn in self.cp.iterateForObject(oid, readable=True): for conn in self.cp.iterateForObject(oid):
try: try:
history_list = self._askStorage(conn, packet) history_list = self._askStorage(conn, packet)
except ConnectionClosed: except ConnectionClosed:
...@@ -939,20 +920,13 @@ class Application(ThreadedApplication): ...@@ -939,20 +920,13 @@ class Application(ThreadedApplication):
ttid = txn_context['ttid'] ttid = txn_context['ttid']
# Placeholders # Placeholders
queue = txn_context['queue'] queue = txn_context['queue']
txn_context['object_stored_counter_dict'][oid] = {}
# ZODB.Connection performs calls 'checkCurrentSerialInTransaction' # ZODB.Connection performs calls 'checkCurrentSerialInTransaction'
# after stores, and skips oids that have been successfully stored. # after stores, and skips oids that have been successfully stored.
assert oid not in txn_context['cache_dict'], (oid, txn_context) assert oid not in txn_context['cache_dict'], (oid, txn_context)
txn_context['data_dict'].setdefault(oid, CHECKED_SERIAL) txn_context['data_dict'].setdefault(oid, CHECKED_SERIAL)
checked_nodes = txn_context['checked_nodes'] involved_nodes = txn_context['involved_nodes']
packet = Packets.AskCheckCurrentSerial(ttid, serial, oid) packet = Packets.AskCheckCurrentSerial(ttid, serial, oid)
for node, conn in self.cp.iterateForObject(oid): for ask in self.cp.iterateForWrite(oid, involved_nodes, 0):
try: ask(packet, queue=queue, oid=oid, serial=serial)
conn.ask(packet, queue=queue, oid=oid, serial=serial)
except ConnectionClosed:
continue
checked_nodes.add(node)
if not checked_nodes:
raise NEOStorageError("checkCurrent failed")
self._waitAnyTransactionMessage(txn_context, False) self._waitAnyTransactionMessage(txn_context, False)
...@@ -204,6 +204,10 @@ class PrimaryAnswersHandler(AnswerBaseHandler): ...@@ -204,6 +204,10 @@ class PrimaryAnswersHandler(AnswerBaseHandler):
oid_list.reverse() oid_list.reverse()
self.app.new_oid_list = oid_list self.app.new_oid_list = oid_list
def incompleteTransaction(self, conn, message):
raise NEOStorageError("storage nodes for which vote failed can not be"
" disconnected without making the cluster non-operational")
def answerTransactionFinished(self, conn, _, tid): def answerTransactionFinished(self, conn, _, tid):
self.app.setHandlerData(tid) self.app.setHandlerData(tid)
......
...@@ -22,6 +22,7 @@ from neo.lib.util import dump ...@@ -22,6 +22,7 @@ from neo.lib.util import dump
from neo.lib.exception import NodeNotReady from neo.lib.exception import NodeNotReady
from neo.lib.handler import MTEventHandler from neo.lib.handler import MTEventHandler
from . import AnswerBaseHandler from . import AnswerBaseHandler
from ..pool import InvolvedNodeDict
from ..exception import NEOStorageError, NEOStorageNotFoundError from ..exception import NEOStorageError, NEOStorageNotFoundError
from ..exception import NEOStorageDoesNotExistError from ..exception import NEOStorageDoesNotExistError
...@@ -31,7 +32,7 @@ class StorageEventHandler(MTEventHandler): ...@@ -31,7 +32,7 @@ class StorageEventHandler(MTEventHandler):
node = self.app.nm.getByAddress(conn.getAddress()) node = self.app.nm.getByAddress(conn.getAddress())
assert node is not None assert node is not None
self.app.cp.removeConnection(node) self.app.cp.removeConnection(node)
self.app.dispatcher.unregister(conn) super(StorageEventHandler, self).connectionLost(conn, new_state)
def connectionFailed(self, conn): def connectionFailed(self, conn):
# Connection to a storage node failed # Connection to a storage node failed
...@@ -63,11 +64,10 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -63,11 +64,10 @@ class StorageAnswersHandler(AnswerBaseHandler):
def answerStoreObject(self, conn, conflict, oid, serial): def answerStoreObject(self, conn, conflict, oid, serial):
if not conflict: if not conflict:
# Ignore if not locked on storage side. # Ignore if not locked on storage side. We only had to receive
# this answer, so that this storage is not marked as failed.
return return
txn_context = self.app.getHandlerData() txn_context = self.app.getHandlerData()
object_stored_counter_dict = txn_context[
'object_stored_counter_dict'][oid]
if conflict != serial: if conflict != serial:
# Conflicts can not be resolved now because 'conn' is locked. # Conflicts can not be resolved now because 'conn' is locked.
# We must postpone the resolution (by queuing the conflict in # We must postpone the resolution (by queuing the conflict in
...@@ -84,21 +84,12 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -84,21 +84,12 @@ class StorageAnswersHandler(AnswerBaseHandler):
# resolution. # resolution.
if conflict <= txn_context['resolved_dict'].get(oid, ''): if conflict <= txn_context['resolved_dict'].get(oid, ''):
return return
if conflict in object_stored_counter_dict:
raise NEOStorageError('Storages %s accepted object %s'
' for serial %s but %s reports a conflict for it.' % (
map(dump, object_stored_counter_dict[conflict]),
dump(oid), dump(conflict), dump(conn.getUUID())))
txn_context['conflict_dict'][oid] = serial, conflict txn_context['conflict_dict'][oid] = serial, conflict
else: else:
uuid_set = object_stored_counter_dict.get(serial)
if uuid_set is None: # store to first storage node
object_stored_counter_dict[serial] = uuid_set = set()
try: try:
data = txn_context['data_dict'].pop(oid) data = txn_context['data_dict'].pop(oid)
except KeyError: # multiple undo except KeyError: # replica, or multiple undo
assert txn_context['cache_dict'][oid] is None, oid return
else:
if type(data) is str: if type(data) is str:
size = len(data) size = len(data)
txn_context['data_size'] -= size txn_context['data_size'] -= size
...@@ -111,9 +102,6 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -111,9 +102,6 @@ class StorageAnswersHandler(AnswerBaseHandler):
# prevents memory errors for big transactions. # prevents memory errors for big transactions.
data = None data = None
txn_context['cache_dict'][oid] = data txn_context['cache_dict'][oid] = data
else: # replica
assert oid not in txn_context['data_dict'], oid
uuid_set.add(conn.getUUID())
answerCheckCurrentSerial = answerStoreObject answerCheckCurrentSerial = answerStoreObject
...@@ -122,6 +110,15 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -122,6 +110,15 @@ class StorageAnswersHandler(AnswerBaseHandler):
answerVoteTransaction = answerStoreTransaction answerVoteTransaction = answerStoreTransaction
def connectionClosed(self, conn):
txn_context = self.app.getHandlerData()
# XXX: A 'Transaction' class would be cleaner.
if type(txn_context) is dict:
involved_nodes = txn_context.get('involved_nodes')
if type(involved_nodes) is InvolvedNodeDict:
involved_nodes[conn.getUUID()] = 2
super(StorageAnswersHandler, self).connectionClosed(conn)
def answerTIDsFrom(self, conn, tid_list): def answerTIDsFrom(self, conn, tid_list):
logging.debug('Get %u TIDs from %r', len(tid_list), conn) logging.debug('Get %u TIDs from %r', len(tid_list), conn)
self.app.setHandlerData(tid_list) self.app.setHandlerData(tid_list)
......
...@@ -28,7 +28,7 @@ from .exception import NEOPrimaryMasterLost, NEOStorageError ...@@ -28,7 +28,7 @@ from .exception import NEOPrimaryMasterLost, NEOStorageError
# failed in the past. # failed in the past.
MAX_FAILURE_AGE = 600 MAX_FAILURE_AGE = 600
# Cell list sort keys # Cell list sort keys, only for read access
# We are connected to storage node hosting cell, high priority # We are connected to storage node hosting cell, high priority
CELL_CONNECTED = -1 CELL_CONNECTED = -1
# normal priority # normal priority
...@@ -36,6 +36,24 @@ CELL_GOOD = 0 ...@@ -36,6 +36,24 @@ CELL_GOOD = 0
# Storage node hosting cell failed recently, low priority # Storage node hosting cell failed recently, low priority
CELL_FAILED = 1 CELL_FAILED = 1
class InvolvedNodeDict(dict):
# Keys are node ids instead of Node objects because a node may disappear
# from the cluster. In any case, we always have to check if the id is
# still known by the NodeManager.
def ask(self, conn):
def ask(*args, **kw):
try:
conn.ask(*args, **kw)
except ConnectionClosed:
self[conn.getUUID()] = 2
else:
self.fail = 0
return conn.getUUID()
return ask
class ConnectionPool(object): class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes.""" """This class manages a pool of connections to storage nodes."""
...@@ -86,12 +104,12 @@ class ConnectionPool(object): ...@@ -86,12 +104,12 @@ class ConnectionPool(object):
def getConnForCell(self, cell): def getConnForCell(self, cell):
return self.getConnForNode(cell.getNode()) return self.getConnForNode(cell.getNode())
def iterateForObject(self, object_id, readable=False): def iterateForObject(self, object_id):
""" Iterate over nodes managing an object """ """ Iterate over nodes managing an object """
pt = self.app.pt pt = self.app.pt
if type(object_id) is str: if type(object_id) is str:
object_id = pt.getPartition(object_id) object_id = pt.getPartition(object_id)
cell_list = pt.getCellList(object_id, readable) cell_list = pt.getCellList(object_id, True)
if not cell_list: if not cell_list:
raise NEOStorageError('no storage available') raise NEOStorageError('no storage available')
getConnForNode = self.getConnForNode getConnForNode = self.getConnForNode
...@@ -106,7 +124,7 @@ class ConnectionPool(object): ...@@ -106,7 +124,7 @@ class ConnectionPool(object):
node = cell.getNode() node = cell.getNode()
conn = getConnForNode(node) conn = getConnForNode(node)
if conn is not None: if conn is not None:
yield node, conn yield conn
# Re-check if node is running, as our knowledge of its # Re-check if node is running, as our knowledge of its
# state can have changed during connection attempt. # state can have changed during connection attempt.
elif node.isRunning(): elif node.isRunning():
...@@ -117,6 +135,26 @@ class ConnectionPool(object): ...@@ -117,6 +135,26 @@ class ConnectionPool(object):
if self.app.master_conn is None: if self.app.master_conn is None:
raise NEOPrimaryMasterLost raise NEOPrimaryMasterLost
def iterateForWrite(self, object_id, involved, store=1):
pt = self.app.pt
involved.fail = 1
for cell in pt.getCellList(pt.getPartition(object_id)):
node = cell.getNode()
uuid = node.getUUID()
status = involved.setdefault(uuid, store)
if status < store:
involved[uuid] = store
elif status > 1:
continue
conn = self.getConnForNode(node)
if conn is None:
involved[uuid] = 2
else:
yield involved.ask(conn)
if involved.fail:
raise NEOStorageError(
'no storage available for write to partition %s' % object_id)
def getConnForNode(self, node): def getConnForNode(self, node):
"""Return a locked connection object to a given node """Return a locked connection object to a given node
If no connection exists, create a new one""" If no connection exists, create a new one"""
......
...@@ -75,6 +75,7 @@ def ErrorCodes(): ...@@ -75,6 +75,7 @@ def ErrorCodes():
CHECKING_ERROR CHECKING_ERROR
BACKEND_NOT_IMPLEMENTED BACKEND_NOT_IMPLEMENTED
READ_ONLY_ACCESS READ_ONLY_ACCESS
INCOMPLETE_TRANSACTION
@Enum @Enum
def ClusterStates(): def ClusterStates():
...@@ -862,6 +863,18 @@ class BeginTransaction(Packet): ...@@ -862,6 +863,18 @@ class BeginTransaction(Packet):
PTID('tid'), PTID('tid'),
) )
class FailedVote(Packet):
"""
Report storage nodes for which vote failed. C -> M
True is returned if it's still possible to finish the transaction.
"""
_fmt = PStruct('failed_vote',
PTID('tid'),
PFUUIDList,
)
_answer = Error
class FinishTransaction(Packet): class FinishTransaction(Packet):
""" """
Finish a transaction. C -> PM. Finish a transaction. C -> PM.
...@@ -1634,6 +1647,8 @@ class Packets(dict): ...@@ -1634,6 +1647,8 @@ class Packets(dict):
ValidateTransaction) ValidateTransaction)
AskBeginTransaction, AnswerBeginTransaction = register( AskBeginTransaction, AnswerBeginTransaction = register(
BeginTransaction) BeginTransaction)
FailedVote = register(
FailedVote)
AskFinishTransaction, AnswerTransactionFinished = register( AskFinishTransaction, AnswerTransactionFinished = register(
FinishTransaction, ignore_when_closed=False) FinishTransaction, ignore_when_closed=False)
AskLockInformation, AnswerInformationLocked = register( AskLockInformation, AnswerInformationLocked = register(
......
...@@ -258,15 +258,16 @@ class PartitionTable(object): ...@@ -258,15 +258,16 @@ class PartitionTable(object):
partition on the line (here, line length is 11 to keep the docstring partition on the line (here, line length is 11 to keep the docstring
width under 80 column). width under 80 column).
""" """
node_list = sorted(self.count_dict)
result = ['pt: node %u: %s, %s' % (i, uuid_str(node.getUUID()), result = ['pt: node %u: %s, %s' % (i, uuid_str(node.getUUID()),
protocol.node_state_prefix_dict[node.getState()]) protocol.node_state_prefix_dict[node.getState()])
for i, node in enumerate(sorted(self.count_dict))] for i, node in enumerate(node_list)]
append = result.append append = result.append
line = [] line = []
max_line_len = 20 # XXX: hardcoded number of partitions per line max_line_len = 20 # XXX: hardcoded number of partitions per line
prefix = 0 prefix = 0
prefix_len = int(math.ceil(math.log10(self.np))) prefix_len = int(math.ceil(math.log10(self.np)))
for offset, row in enumerate(self.formatRows()): for offset, row in enumerate(self._formatRows(node_list)):
if len(line) == max_line_len: if len(line) == max_line_len:
append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line))) append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
line = [] line = []
...@@ -276,8 +277,7 @@ class PartitionTable(object): ...@@ -276,8 +277,7 @@ class PartitionTable(object):
append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line))) append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
return result return result
def formatRows(self): def _formatRows(self, node_list):
node_list = sorted(self.count_dict)
cell_state_dict = protocol.cell_state_prefix_dict cell_state_dict = protocol.cell_state_prefix_dict
for row in self.partition_list: for row in self.partition_list:
if row is None: if row is None:
...@@ -287,12 +287,14 @@ class PartitionTable(object): ...@@ -287,12 +287,14 @@ class PartitionTable(object):
for x in row} for x in row}
yield ''.join(cell_dict.get(x, '.') for x in node_list) yield ''.join(cell_dict.get(x, '.') for x in node_list)
def operational(self): def operational(self, exclude_list=()):
if not self.filled(): if not self.filled():
return False return False
for row in self.partition_list: for row in self.partition_list:
for cell in row: for cell in row:
if cell.isReadable() and cell.getNode().isRunning(): if cell.isReadable():
node = cell.getNode()
if node.isRunning() and node.getUUID() not in exclude_list:
break break
else: else:
return False return False
......
...@@ -51,6 +51,7 @@ from .verification import VerificationManager ...@@ -51,6 +51,7 @@ from .verification import VerificationManager
class Application(BaseApplication): class Application(BaseApplication):
"""The master node application.""" """The master node application."""
packing = None packing = None
storage_readiness = 0
# Latest completely committed TID # Latest completely committed TID
last_transaction = ZERO_TID last_transaction = ZERO_TID
backup_tid = None backup_tid = None
...@@ -66,7 +67,7 @@ class Application(BaseApplication): ...@@ -66,7 +67,7 @@ class Application(BaseApplication):
self.server = config.getBind() self.server = config.getBind()
self.autostart = config.getAutostart() self.autostart = config.getAutostart()
self.storage_readiness = set() self.storage_ready_dict = {}
for master_address in config.getMasters(): for master_address in config.getMasters():
self.nm.createMaster(address=master_address) self.nm.createMaster(address=master_address)
...@@ -574,11 +575,16 @@ class Application(BaseApplication): ...@@ -574,11 +575,16 @@ class Application(BaseApplication):
self.last_transaction = tid self.last_transaction = tid
def setStorageNotReady(self, uuid): def setStorageNotReady(self, uuid):
self.storage_readiness.discard(uuid) self.storage_ready_dict.pop(uuid, None)
def setStorageReady(self, uuid): def setStorageReady(self, uuid):
self.storage_readiness.add(uuid) if uuid not in self.storage_ready_dict:
self.storage_readiness = self.storage_ready_dict[uuid] = \
self.storage_readiness + 1
def isStorageReady(self, uuid): def isStorageReady(self, uuid):
return uuid in self.storage_readiness return uuid in self.storage_ready_dict
def getStorageReadySet(self, readiness):
return {k for k, v in self.storage_ready_dict.iteritems()
if v <= readiness}
...@@ -45,46 +45,35 @@ class ClientServiceHandler(MasterHandler): ...@@ -45,46 +45,35 @@ class ClientServiceHandler(MasterHandler):
""" """
app = self.app app = self.app
node = app.nm.getByUUID(conn.getUUID()) node = app.nm.getByUUID(conn.getUUID())
conn.answer(Packets.AnswerBeginTransaction(app.tm.begin(node, tid))) tid = app.tm.begin(node, app.storage_readiness, tid)
conn.answer(Packets.AnswerBeginTransaction(tid))
def askNewOIDs(self, conn, num_oids): def askNewOIDs(self, conn, num_oids):
conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids))) conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids)))
def failedVote(self, conn, *args):
app = self.app
ok = app.tm.vote(app, *args)
if ok is None:
app.tm.queueEvent(self.failedVote, conn, args)
else:
conn.answer((Errors.Ack if ok else Errors.IncompleteTransaction)())
def askFinishTransaction(self, conn, ttid, oid_list, checked_list): def askFinishTransaction(self, conn, ttid, oid_list, checked_list):
app = self.app app = self.app
pt = app.pt tid, node_list = app.tm.prepare(
app,
# Collect partitions related to this transaction.
getPartition = pt.getPartition
partition_set = set(map(getPartition, oid_list))
partition_set.update(map(getPartition, checked_list))
partition_set.add(getPartition(ttid))
# Collect the UUIDs of nodes related to this transaction.
uuid_list = filter(app.isStorageReady, {cell.getUUID()
for part in partition_set
for cell in pt.getCellList(part)
if cell.getNodeState() != NodeStates.HIDDEN})
if not uuid_list:
raise ProtocolError('No storage node ready for transaction')
identified_node_list = app.nm.getIdentifiedList(pool_set=set(uuid_list))
# Request locking data.
# build a new set as we may not send the message to all nodes as some
# might be not reachable at that time
p = Packets.AskLockInformation(
ttid, ttid,
app.tm.prepare(
ttid,
pt.getPartitions(),
oid_list, oid_list,
{x.getUUID() for x in identified_node_list}, checked_list,
conn.getPeerId(), conn.getPeerId(),
),
) )
for node in identified_node_list: if tid:
p = Packets.AskLockInformation(ttid, tid)
for node in node_list:
node.ask(p, timeout=60) node.ask(p, timeout=60)
else:
conn.answer(Errors.IncompleteTransaction())
def askFinalTID(self, conn, ttid): def askFinalTID(self, conn, ttid):
tm = self.app.tm tm = self.app.tm
......
...@@ -26,18 +26,18 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -26,18 +26,18 @@ class StorageServiceHandler(BaseServiceHandler):
def connectionCompleted(self, conn, new): def connectionCompleted(self, conn, new):
app = self.app app = self.app
uuid = conn.getUUID()
app.setStorageNotReady(uuid)
if new: if new:
super(StorageServiceHandler, self).connectionCompleted(conn, new) super(StorageServiceHandler, self).connectionCompleted(conn, new)
if app.nm.getByUUID(uuid).isRunning(): # node may be PENDING if app.nm.getByUUID(conn.getUUID()).isRunning(): # node may be PENDING
conn.notify(Packets.StartOperation(app.backup_tid)) conn.notify(Packets.StartOperation(app.backup_tid))
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
app = self.app app = self.app
node = app.nm.getByUUID(conn.getUUID()) uuid = conn.getUUID()
node = app.nm.getByUUID(uuid)
super(StorageServiceHandler, self).connectionLost(conn, new_state) super(StorageServiceHandler, self).connectionLost(conn, new_state)
app.tm.storageLost(conn.getUUID()) app.setStorageNotReady(uuid)
app.tm.storageLost(uuid)
if (app.getClusterState() == ClusterStates.BACKINGUP if (app.getClusterState() == ClusterStates.BACKINGUP
# Also check if we're exiting, because backup_app is not usable # Also check if we're exiting, because backup_app is not usable
# in this case. Maybe cluster state should be set to something # in this case. Maybe cluster state should be set to something
......
...@@ -18,6 +18,7 @@ from collections import deque ...@@ -18,6 +18,7 @@ from collections import deque
from time import time from time import time
from struct import pack, unpack from struct import pack, unpack
from neo.lib import logging from neo.lib import logging
from neo.lib.handler import EventQueue
from neo.lib.protocol import ProtocolError, uuid_str, ZERO_OID, ZERO_TID from neo.lib.protocol import ProtocolError, uuid_str, ZERO_OID, ZERO_TID
from neo.lib.util import dump, u64, addTID, tidFromTime from neo.lib.util import dump, u64, addTID, tidFromTime
...@@ -31,16 +32,18 @@ class Transaction(object): ...@@ -31,16 +32,18 @@ class Transaction(object):
_tid = None _tid = None
_msg_id = None _msg_id = None
_oid_list = None _oid_list = None
_failed = frozenset()
_prepared = False _prepared = False
# uuid dict hold flag to known who has locked the transaction # uuid dict hold flag to known who has locked the transaction
_uuid_set = None _uuid_set = None
_lock_wait_uuid_set = None _lock_wait_uuid_set = None
def __init__(self, node, ttid): def __init__(self, node, storage_readiness, ttid):
""" """
Prepare the transaction, set OIDs and UUIDs related to it Prepare the transaction, set OIDs and UUIDs related to it
""" """
self._node = node self._node = node
self._storage_readiness = storage_readiness
self._ttid = ttid self._ttid = ttid
self._birth = time() self._birth = time()
# store storage uuids that must be notified at commit # store storage uuids that must be notified at commit
...@@ -113,13 +116,13 @@ class Transaction(object): ...@@ -113,13 +116,13 @@ class Transaction(object):
""" """
return list(self._notification_set) return list(self._notification_set)
def prepare(self, tid, oid_list, uuid_list, msg_id): def prepare(self, tid, oid_list, uuid_set, msg_id):
self._tid = tid self._tid = tid
self._oid_list = oid_list self._oid_list = oid_list
self._msg_id = msg_id self._msg_id = msg_id
self._uuid_set = set(uuid_list) self._uuid_set = uuid_set
self._lock_wait_uuid_set = set(uuid_list) self._lock_wait_uuid_set = uuid_set.copy()
self._prepared = True self._prepared = True
def storageLost(self, uuid): def storageLost(self, uuid):
...@@ -163,7 +166,7 @@ class Transaction(object): ...@@ -163,7 +166,7 @@ class Transaction(object):
return not self._lock_wait_uuid_set return not self._lock_wait_uuid_set
class TransactionManager(object): class TransactionManager(EventQueue):
""" """
Manage current transactions Manage current transactions
""" """
...@@ -173,6 +176,7 @@ class TransactionManager(object): ...@@ -173,6 +176,7 @@ class TransactionManager(object):
self.reset() self.reset()
def reset(self): def reset(self):
EventQueue.__init__(self)
# ttid -> transaction # ttid -> transaction
self._ttid_dict = {} self._ttid_dict = {}
self._last_oid = ZERO_OID self._last_oid = ZERO_OID
...@@ -195,6 +199,7 @@ class TransactionManager(object): ...@@ -195,6 +199,7 @@ class TransactionManager(object):
except ValueError: except ValueError:
pass pass
del self._ttid_dict[ttid] del self._ttid_dict[ttid]
self.executeQueuedEvents()
def __contains__(self, ttid): def __contains__(self, ttid):
""" """
...@@ -285,7 +290,7 @@ class TransactionManager(object): ...@@ -285,7 +290,7 @@ class TransactionManager(object):
txn.registerForNotification(uuid) txn.registerForNotification(uuid)
return self._ttid_dict.keys() return self._ttid_dict.keys()
def begin(self, node, tid=None): def begin(self, node, storage_readiness, tid=None):
""" """
Generate a new TID Generate a new TID
""" """
...@@ -297,28 +302,91 @@ class TransactionManager(object): ...@@ -297,28 +302,91 @@ class TransactionManager(object):
# last TID. # last TID.
self._queue.append(tid) self._queue.append(tid)
self.setLastTID(tid) self.setLastTID(tid)
txn = self._ttid_dict[tid] = Transaction(node, tid) txn = self._ttid_dict[tid] = Transaction(node, storage_readiness, tid)
logging.debug('Begin %s', txn) logging.debug('Begin %s', txn)
return tid return tid
def prepare(self, ttid, divisor, oid_list, uuid_list, msg_id): def vote(self, app, ttid, uuid_list):
"""
Check that the transaction can be voted
when the client reports failed nodes.
"""
txn = self[ttid]
# The client does not know which nodes are not expected to have
# transactions in full. Let's filter out them.
failed = app.getStorageReadySet(txn._storage_readiness)
failed.intersection_update(uuid_list)
if failed:
operational = app.pt.operational
if not operational(failed):
# No way to commit this transaction because there are
# non-replicated storage nodes with failed stores.
return False
failed = failed.copy()
for t in self._ttid_dict.itervalues():
failed |= t._failed
if not operational(failed):
# Other transactions were voted and unless they're aborted,
# we won't be able to finish this one, because that would make
# the cluster non-operational. Let's tell the caller to retry
# later.
return
# Allow the client to finish the transaction,
# even if it will disconnect storage nodes.
txn._failed = failed
return True
def prepare(self, app, ttid, oid_list, checked_list, msg_id):
""" """
Prepare a transaction to be finished Prepare a transaction to be finished
""" """
txn = self[ttid] txn = self[ttid]
pt = app.pt
failed = txn._failed
if failed and not pt.operational(failed):
return None, None
ready = app.getStorageReadySet(txn._storage_readiness)
getPartition = pt.getPartition
partition_set = set(map(getPartition, oid_list))
partition_set.update(map(getPartition, checked_list))
partition_set.add(getPartition(ttid))
node_list = []
uuid_set = set()
for partition in partition_set:
for cell in pt.getCellList(partition):
node = cell.getNode()
if node.isIdentified():
uuid = node.getUUID()
if uuid in uuid_set:
continue
if uuid in failed:
# This will commit a new PT with outdated cells before
# locking the transaction, which is important during
# the verification phase.
node.getConnection().close()
elif uuid in ready:
uuid_set.add(uuid)
node_list.append(node)
# A node that was not ready at the beginning of the transaction
# can't have readable cells. And if we're still operational without
# the 'failed' nodes, then there must still be 1 node in 'ready'
# that is UP.
assert node_list, (ready, failed)
# maybe not the fastest but _queue should be often small # maybe not the fastest but _queue should be often small
if ttid in self._queue: if ttid in self._queue:
tid = ttid tid = ttid
else: else:
tid = self._nextTID(ttid, divisor) tid = self._nextTID(ttid, pt.getPartitions())
self._queue.append(ttid) self._queue.append(ttid)
logging.debug('Finish TXN %s for %s (was %s)', logging.debug('Finish TXN %s for %s (was %s)',
dump(tid), txn.getNode(), dump(ttid)) dump(tid), txn.getNode(), dump(ttid))
txn.prepare(tid, oid_list, uuid_list, msg_id) txn.prepare(tid, oid_list, uuid_set, msg_id)
# check if greater and foreign OID was stored # check if greater and foreign OID was stored
if oid_list: if oid_list:
self.setLastOID(max(oid_list)) self.setLastOID(max(oid_list))
return tid return tid, node_list
def abort(self, ttid, uuid): def abort(self, ttid, uuid):
""" """
...@@ -350,7 +418,7 @@ class TransactionManager(object): ...@@ -350,7 +418,7 @@ class TransactionManager(object):
for ttid, txn in self._ttid_dict.iteritems(): for ttid, txn in self._ttid_dict.iteritems():
if txn.storageLost(uuid) and self._queue[0] == ttid: if txn.storageLost(uuid) and self._queue[0] == ttid:
unlock = True unlock = True
# do not break: we must call forget() on all transactions # do not break: we must call storageLost() on all transactions
if unlock: if unlock:
self._unlockPending() self._unlockPending()
...@@ -370,6 +438,7 @@ class TransactionManager(object): ...@@ -370,6 +438,7 @@ class TransactionManager(object):
break break
del queue[0], self._ttid_dict[ttid] del queue[0], self._ttid_dict[ttid]
self._on_commit(txn) self._on_commit(txn)
self.executeQueuedEvents()
def clientLost(self, node): def clientLost(self, node):
for txn in self._ttid_dict.values(): for txn in self._ttid_dict.values():
...@@ -380,4 +449,4 @@ class TransactionManager(object): ...@@ -380,4 +449,4 @@ class TransactionManager(object):
logging.info('Transactions:') logging.info('Transactions:')
for txn in self._ttid_dict.itervalues(): for txn in self._ttid_dict.itervalues():
logging.info(' %r', txn) logging.info(' %r', txn)
self.logQueuedEvents()
...@@ -244,8 +244,8 @@ class Application(BaseApplication): ...@@ -244,8 +244,8 @@ class Application(BaseApplication):
while not self.operational: while not self.operational:
_poll() _poll()
self.ready = True self.ready = True
self.replicator.populate()
self.master_conn.notify(Packets.NotifyReady()) self.master_conn.notify(Packets.NotifyReady())
self.replicator.populate()
def doOperation(self): def doOperation(self):
"""Handle everything, including replications and transactions.""" """Handle everything, including replications and transactions."""
......
...@@ -29,7 +29,7 @@ partitions. ...@@ -29,7 +29,7 @@ partitions.
2 parts, done sequentially: 2 parts, done sequentially:
- Transaction (metadata) replication - Transaction (metadata) replication
- Object (data) replication - Object (metadata+data) replication
Both parts follow the same mechanism: Both parts follow the same mechanism:
- The range of data to replicate is split into chunks of FETCH_COUNT items - The range of data to replicate is split into chunks of FETCH_COUNT items
...@@ -37,15 +37,52 @@ Both parts follow the same mechanism: ...@@ -37,15 +37,52 @@ Both parts follow the same mechanism:
- For every chunk, the requesting node sends to seeding node the list of items - For every chunk, the requesting node sends to seeding node the list of items
it already has. it already has.
- Before answering, the seeding node sends 1 packet for every missing item. - Before answering, the seeding node sends 1 packet for every missing item.
For items that are already on the replicating node, there is no check that
values matches.
- The seeding node finally answers with the list of items to delete (usually - The seeding node finally answers with the list of items to delete (usually
empty). empty).
Replication is partial, starting from the greatest stored tid in the partition: Internal replication, which is similar to RAID1 (and as opposed to asynchronous
- For transactions, this tid is excluded from replication. replication to a backup cluster) requires extra care with respect to
- For objects, this tid is included unless the storage already knows it has transactions. The transition of a cell from OUT_OF_DATE to UP_TO_DATE is done
all oids for it. is several steps.
There is no check that item values on both nodes matches. A replicating node can not depend on other nodes to fetch the data
recently/being committed because that can not be done atomically: it could miss
writes between the processing of its request by a source node and the reception
of the answer.
Therefore, outdated cells are writable: a storage node asks the master for
transactions being committed and then it is expected to fully receive from the
client any transaction that is started after this answer.
Which has in turn other consequences:
- The client must not fail to write to a storage node after the above request
to the master: for this, the storage must have announced it is ready, and it
must delay identification of unknown clients (those for which it hasn't
received yet a notification from the master).
- Writes must be accepted blindly (i.e. without taking a write-lock) when a
storage node lacks the data to check for conflicts. This is possible because
1 up-to-date cell (for each partition) is enough to do these checks.
- Because the client can not reliably know if a storage node is expected to
receive a transaction in full, all writes must succeed.
- Even if the replication is finished, we have to wait that we don't have any
lockless writes left before announcing to the master that we're up-to-date.
To sum up:
1. ask unfinished transactions -> (last_transaction, ttid_list)
2. replicate to last_transaction
3. wait for all ttid_list to be finished -> new last_transaction
4. replicate to last_transaction
5. no lockless write anymore, except to (oid, ttid) that were already
stored/checked without taking a lock
6. wait for all transactions with lockless writes to be finished
7. announce we're up-to-date
For any failed write, the client marks the storage node as failed and stops
writing to it for the transaction. Unless there's no failed write, vote ends
with an extra request to the master: the transaction will only succeed if the
failed nodes can be disconnected, forcing them to replicate the missing data.
TODO: Packing and replication currently fail when they happen at the same time. TODO: Packing and replication currently fail when they happen at the same time.
""" """
...@@ -85,11 +122,6 @@ class Replicator(object): ...@@ -85,11 +122,6 @@ class Replicator(object):
if node is not None and node.isConnected(True): if node is not None and node.isConnected(True):
return node.getConnection() return node.getConnection()
# XXX: We can't replicate unfinished transactions but do we need such
# complex code ? Backup mechanism does not rely on this: instead
# the upstream storage delays the answer. Maybe we can do the same
# for internal replication.
def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list): def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list):
"""This is a callback from MasterOperationHandler.""" """This is a callback from MasterOperationHandler."""
assert self.ttid_set.issubset(ttid_list), (self.ttid_set, ttid_list) assert self.ttid_set.issubset(ttid_list), (self.ttid_set, ttid_list)
......
...@@ -115,7 +115,7 @@ class ClientApplicationTests(NeoUnitTestBase): ...@@ -115,7 +115,7 @@ class ClientApplicationTests(NeoUnitTestBase):
# connection to SN close # connection to SN close
self.assertFalse(oid in cache._oid_dict) self.assertFalse(oid in cache._oid_dict)
conn = Mock({'getAddress': ('', 0)}) conn = Mock({'getAddress': ('', 0)})
app.cp = Mock({'iterateForObject': [(Mock(), conn)]}) app.cp = Mock({'iterateForObject': (conn,)})
def fakeReceived(packet): def fakeReceived(packet):
packet.setId(0) packet.setId(0)
conn.fakeReceived = iter((packet,)).next conn.fakeReceived = iter((packet,)).next
......
...@@ -39,8 +39,6 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -39,8 +39,6 @@ class MasterClientHandlerTests(NeoUnitTestBase):
# define some variable to simulate client and storage node # define some variable to simulate client and storage node
self.client_port = 11022 self.client_port = 11022
self.storage_port = 10021 self.storage_port = 10021
self.master_port = 10010
self.master_address = ('127.0.0.1', self.master_port)
self.client_address = ('127.0.0.1', self.client_port) self.client_address = ('127.0.0.1', self.client_port)
self.storage_address = ('127.0.0.1', self.storage_port) self.storage_address = ('127.0.0.1', self.storage_port)
self.storage_uuid = self.getStorageUUID() self.storage_uuid = self.getStorageUUID()
...@@ -63,105 +61,6 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -63,105 +61,6 @@ class MasterClientHandlerTests(NeoUnitTestBase):
) )
return uuid return uuid
def checkAnswerBeginTransaction(self, conn):
return self.checkAnswerPacket(conn, Packets.AnswerBeginTransaction)
# Tests
def test_07_askBeginTransaction(self):
tid1 = self.getNextTID()
tid2 = self.getNextTID()
service = self.service
tm_org = self.app.tm
self.app.tm = tm = Mock({
'begin': '\x00\x00\x00\x00\x00\x00\x00\x01',
})
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
client_node = self.app.nm.getByUUID(client_uuid)
conn = self.getFakeConnection(client_uuid, self.client_address)
service.askBeginTransaction(conn, None)
calls = tm.mockGetNamedCalls('begin')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(client_node, None)
self.checkAnswerBeginTransaction(conn)
# Client asks for a TID
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.tm = tm_org
service.askBeginTransaction(conn, tid1)
calls = tm.mockGetNamedCalls('begin')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(client_node, None)
packet = self.checkAnswerBeginTransaction(conn)
self.assertEqual(packet.decode(), (tid1, ))
def test_08_askNewOIDs(self):
service = self.service
oid1, oid2 = p64(1), p64(2)
self.app.tm.setLastOID(oid1)
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
for node in self.app.nm.getStorageList():
conn = self.getFakeConnection(node.getUUID(), node.getAddress())
node.setConnection(conn)
service.askNewOIDs(conn, 1)
self.assertTrue(self.app.tm.getLastOID() > oid1)
def test_09_askFinishTransaction(self):
service = self.service
# do the right job
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
storage_uuid = self.storage_uuid
storage_conn = self.getFakeConnection(storage_uuid,
self.storage_address, is_server=True)
storage2_uuid = self.identifyToMasterNode(port=10022)
storage2_conn = self.getFakeConnection(storage2_uuid,
(self.storage_address[0], self.storage_address[1] + 1),
is_server=True)
self.app.setStorageReady(storage2_uuid)
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.pt = Mock({
'getPartition': 0,
'getCellList': [
Mock({'getUUID': storage_uuid}),
Mock({'getUUID': storage2_uuid}),
],
'getPartitions': 2,
})
ttid = self.getNextTID()
service.askBeginTransaction(conn, ttid)
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
# No packet sent if storage node is not ready
self.assertFalse(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, ttid, (), ())
self.checkNoPacketSent(storage_conn)
# ...but AskLockInformation is sent if it is ready
self.app.setStorageReady(storage_uuid)
self.assertTrue(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, ttid, (), ())
self.checkAskPacket(storage_conn, Packets.AskLockInformation)
self.assertEqual(len(self.app.tm.registerForNotification(storage_uuid)), 1)
txn = self.app.tm[ttid]
pending_ttid = list(self.app.tm.registerForNotification(storage_uuid))[0]
self.assertEqual(ttid, pending_ttid)
self.assertEqual(len(txn.getOIDList()), 0)
self.assertEqual(len(txn.getUUIDList()), 1)
def test_connectionClosed(self):
# give a client uuid which have unfinished transactions
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT,
port = self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.listening_conn = object() # mark as running
lptid = self.app.pt.getID()
self.assertEqual(self.app.nm.getByUUID(client_uuid).getState(),
NodeStates.RUNNING)
self.service.connectionClosed(conn)
# node must be have been remove, and no more transaction must remains
self.assertEqual(self.app.nm.getByUUID(client_uuid), None)
self.assertEqual(lptid, self.app.pt.getID())
def test_askPack(self): def test_askPack(self):
self.assertEqual(self.app.packing, None) self.assertEqual(self.app.packing, None)
self.app.nm.createClient() self.app.nm.createClient()
......
...@@ -19,9 +19,9 @@ from ..mock import Mock ...@@ -19,9 +19,9 @@ from ..mock import Mock
from neo.lib import protocol from neo.lib import protocol
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.lib.protocol import NodeTypes, NodeStates, Packets from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.master.app import Application
from neo.master.handlers.election import ClientElectionHandler, \ from neo.master.handlers.election import ClientElectionHandler, \
ServerElectionHandler ServerElectionHandler
from neo.master.app import Application
from neo.lib.exception import ElectionFailure from neo.lib.exception import ElectionFailure
from neo.lib.connection import ClientConnection from neo.lib.connection import ClientConnection
......
...@@ -24,66 +24,11 @@ from neo.master.transactions import TransactionManager ...@@ -24,66 +24,11 @@ from neo.master.transactions import TransactionManager
class testTransactionManager(NeoUnitTestBase): class testTransactionManager(NeoUnitTestBase):
def makeOID(self, i):
return pack('!Q', i)
def makeNode(self, node_type): def makeNode(self, node_type):
uuid = self.getNewUUID(node_type) uuid = self.getNewUUID(node_type)
node = Mock({'getUUID': uuid, '__hash__': uuid, '__repr__': 'FakeNode'}) node = Mock({'getUUID': uuid, '__hash__': uuid, '__repr__': 'FakeNode'})
return uuid, node return uuid, node
def test_storageLost(self):
client1 = Mock({'__hash__': 1})
client2 = Mock({'__hash__': 2})
client3 = Mock({'__hash__': 3})
storage_1_uuid = self.getStorageUUID()
storage_2_uuid = self.getStorageUUID()
oid_list = [self.makeOID(1), ]
tm = TransactionManager(None)
# Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock
msg_id_1 = 1
ttid1 = tm.begin(client1)
tid1 = tm.prepare(ttid1, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_1)
tm.lock(ttid1, storage_2_uuid)
t1 = tm[ttid1]
self.assertFalse(t1.locked())
# Storage 1 dies:
# t1 is over
self.assertTrue(t1.storageLost(storage_1_uuid))
self.assertEqual(t1.getUUIDList(), [storage_2_uuid])
del tm[ttid1]
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
ttid2 = tm.begin(client2)
tid2 = tm.prepare(ttid2, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_2)
t2 = tm[ttid2]
self.assertFalse(t2.locked())
# Storage 1 dies:
# t2 still waits for storage 2
self.assertFalse(t2.storageLost(storage_1_uuid))
self.assertEqual(t2.getUUIDList(), [storage_2_uuid])
self.assertTrue(t2.lock(storage_2_uuid))
del tm[ttid2]
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
ttid3 = tm.begin(client3)
tid3 = tm.prepare(ttid3, 1, oid_list, [storage_2_uuid, ],
msg_id_3)
t3 = tm[ttid3]
self.assertFalse(t3.locked())
# Storage 1 dies:
# t3 doesn't care
self.assertFalse(t3.storageLost(storage_1_uuid))
self.assertEqual(t3.getUUIDList(), [storage_2_uuid])
self.assertTrue(t3.lock(storage_2_uuid))
del tm[ttid3]
def testTIDUtils(self): def testTIDUtils(self):
""" """
Tests packTID/unpackTID/addTID. Tests packTID/unpackTID/addTID.
...@@ -110,53 +55,14 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -110,53 +55,14 @@ class testTransactionManager(NeoUnitTestBase):
unpackTID(addTID(packTID((2010, 11, 30, 23, 59), 2**32 - 1), 1)), unpackTID(addTID(packTID((2010, 11, 30, 23, 59), 2**32 - 1), 1)),
((2010, 12, 1, 0, 0), 0)) ((2010, 12, 1, 0, 0), 0))
def testTransactionLock(self):
"""
Transaction lock is present to ensure invalidation TIDs are sent in
strictly increasing order.
Note: this implementation might change later, for more parallelism.
"""
client_uuid, client = self.makeNode(NodeTypes.CLIENT)
tm = TransactionManager(None)
# With a requested TID, lock spans from begin to remove
ttid1 = self.getNextTID()
ttid2 = self.getNextTID()
tid1 = tm.begin(client, ttid1)
self.assertEqual(tid1, ttid1)
del tm[ttid1]
# Without a requested TID, lock spans from prepare to remove only
ttid3 = tm.begin(client)
ttid4 = tm.begin(client) # Doesn't raise
node = Mock({'getUUID': client_uuid, '__hash__': 0})
tid4 = tm.prepare(ttid4, 1, [], [], 0)
del tm[ttid4]
tm.prepare(ttid3, 1, [], [], 0)
def testClientDisconectsAfterBegin(self): def testClientDisconectsAfterBegin(self):
client_uuid1, node1 = self.makeNode(NodeTypes.CLIENT) client_uuid1, node1 = self.makeNode(NodeTypes.CLIENT)
tm = TransactionManager(None) tm = TransactionManager(None)
tid1 = self.getNextTID() tid1 = self.getNextTID()
tid2 = self.getNextTID() tid2 = self.getNextTID()
tm.begin(node1, tid1) tm.begin(node1, 0, tid1)
tm.clientLost(node1) tm.clientLost(node1)
self.assertTrue(tid1 not in tm) self.assertTrue(tid1 not in tm)
def testUnlockPending(self):
callback = Mock()
uuid1, node1 = self.makeNode(NodeTypes.CLIENT)
uuid2, node2 = self.makeNode(NodeTypes.CLIENT)
storage_uuid = self.getStorageUUID()
tm = TransactionManager(callback)
ttid1 = tm.begin(node1)
ttid2 = tm.begin(node2)
tid1 = tm.prepare(ttid1, 1, [], [storage_uuid], 0)
tid2 = tm.prepare(ttid2, 1, [], [storage_uuid], 0)
tm.lock(ttid2, storage_uuid)
# txn 2 is still blocked by txn 1
self.assertEqual(len(callback.getNamedCalls('__call__')), 0)
tm.lock(ttid1, storage_uuid)
# both transactions are unlocked when txn 1 is fully locked
self.assertEqual(len(callback.getNamedCalls('__call__')), 2)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
...@@ -983,7 +983,10 @@ class NEOThreadedTest(NeoTestBase): ...@@ -983,7 +983,10 @@ class NEOThreadedTest(NeoTestBase):
self.assertRaises(ConnectionClosed, txn.commit) self.assertRaises(ConnectionClosed, txn.commit)
def assertPartitionTable(self, cluster, stats): def assertPartitionTable(self, cluster, stats):
self.assertEqual(stats, '|'.join(cluster.admin.pt.formatRows())) pt = cluster.admin.pt
index = [x.uuid for x in cluster.storage_list].index
self.assertEqual(stats, '|'.join(pt._formatRows(sorted(
pt.count_dict, key=lambda x: index(x.getUUID())))))
def predictable_random(seed=None): def predictable_random(seed=None):
......
...@@ -1419,6 +1419,49 @@ class Test(NEOThreadedTest): ...@@ -1419,6 +1419,49 @@ class Test(NEOThreadedTest):
self.tic() self.tic()
t1.commit() t1.commit()
@with_cluster(replicas=1)
def testReplicaDisconnectionDuringCommit(self, cluster):
"""
S0 C S1
<------- c1+=1 -->
<------- c2+=2 --> C-S1 closed
<------- c3+=3
U U
finish O
U
down
loads <--
"""
count = [0]
def ask(orig, self, packet, **kw):
if (isinstance(packet, Packets.AskStoreObject)
and self.getUUID() == s1.uuid):
count[0] += 1
if count[0] == 2:
self.close()
return orig(self, packet, **kw)
s0, s1 = cluster.storage_list
t, c = cluster.getTransaction()
r = c.root()
for x in xrange(3):
r[x] = PCounter()
t.commit()
for x in xrange(3):
r[x].value += x
with ConnectionFilter() as f, Patch(MTClientConnection, ask=ask):
f.delayAskFetchTransactions()
t.commit()
self.assertEqual(count[0], 2)
self.assertPartitionTable(cluster, 'UO')
self.tic()
s0.stop()
cluster.join((s0,))
cluster.client._cache.clear()
value_list = []
for x in xrange(3):
r[x]._p_deactivate()
value_list.append(r[x].value)
self.assertEqual(value_list, range(3))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment