Commit 0fc95175 authored by Julien Muchembled's avatar Julien Muchembled

Bump protocol version

parents fd95a217 4c3b6c4d
...@@ -15,11 +15,10 @@ ...@@ -15,11 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import NotReadyError, PrimaryFailure, ProtocolError
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import uuid_str, \ from neo.lib.protocol import uuid_str, NodeTypes, Packets
NodeTypes, NotReadyError, Packets, ProtocolError
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
from neo.lib.exception import PrimaryFailure
NOT_CONNECTED_MESSAGE = 'Not connected to a primary master.' NOT_CONNECTED_MESSAGE = 'Not connected to a primary master.'
......
...@@ -20,6 +20,7 @@ from zope.interface import implementer ...@@ -20,6 +20,7 @@ from zope.interface import implementer
import ZODB.interfaces import ZODB.interfaces
from neo.lib import logging from neo.lib import logging
from neo.lib.util import tidFromTime
from .app import Application from .app import Application
from .exception import NEOStorageNotFoundError, NEOStorageDoesNotExistError from .exception import NEOStorageNotFoundError, NEOStorageDoesNotExistError
...@@ -235,7 +236,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -235,7 +236,7 @@ class Storage(BaseStorage.BaseStorage,
logging.warning('Garbage Collection is not available in NEO,' logging.warning('Garbage Collection is not available in NEO,'
' please use an external tool. Packing without GC.') ' please use an external tool. Packing without GC.')
try: try:
self.app.pack(t) self.app.pack(tidFromTime(t))
except Exception: except Exception:
logging.exception('pack_time=%r', t) logging.exception('pack_time=%r', t)
raise raise
......
...@@ -28,20 +28,25 @@ def patch(): ...@@ -28,20 +28,25 @@ def patch():
# successful commit (which ends with a response from the master) already # successful commit (which ends with a response from the master) already
# acts as a "network barrier". # acts as a "network barrier".
# BBB: What this monkey-patch does has been merged in ZODB5. # BBB: What this monkey-patch does has been merged in ZODB5.
if not hasattr(Connection, '_flush_invalidations'): if hasattr(Connection, '_flush_invalidations'):
return
assert H(Connection.afterCompletion) in ( assert H(Connection.afterCompletion) in (
'cd3a080b80fd957190ff3bb867149448', # Python 2.7 'cd3a080b80fd957190ff3bb867149448', # Python 2.7
'b1d9685c13967d4b6d74c7ef86f68f17', # PyPy 2.7 'b1d9685c13967d4b6d74c7ef86f68f17', # PyPy 2.7
) )
def afterCompletion(self, *ignored): def afterCompletion(self, *ignored):
self._readCurrent.clear() self._readCurrent.clear()
# PATCH: do not call sync() # PATCH: do not call sync()
self._flush_invalidations() self._flush_invalidations()
Connection.afterCompletion = afterCompletion Connection.afterCompletion = afterCompletion
global TransactionMetaData
try:
from ZODB.Connection import TransactionMetaData
except ImportError: # BBB: ZODB < 5
from ZODB.BaseStorage import TransactionRecord
TransactionMetaData = lambda user='', description='', extension=None: \
TransactionRecord(None, None, user, description, extension)
patch() patch()
from . import app # set up signal handlers early enough to do it in the main thread from . import app # set up signal handlers early enough to do it in the main thread
...@@ -25,7 +25,6 @@ except ImportError: ...@@ -25,7 +25,6 @@ except ImportError:
from cPickle import dumps, loads from cPickle import dumps, loads
_protocol = 1 _protocol = 1
from ZODB.POSException import UndoError, ConflictError, ReadConflictError from ZODB.POSException import UndoError, ConflictError, ReadConflictError
from persistent.TimeStamp import TimeStamp
from neo.lib import logging from neo.lib import logging
from neo.lib.compress import decompress_list, getCompress from neo.lib.compress import decompress_list, getCompress
...@@ -35,6 +34,7 @@ from neo.lib.util import makeChecksum, dump ...@@ -35,6 +34,7 @@ from neo.lib.util import makeChecksum, dump
from neo.lib.locking import Empty, Lock from neo.lib.locking import Empty, Lock
from neo.lib.connection import MTClientConnection, ConnectionClosed from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.exception import NodeNotReady from neo.lib.exception import NodeNotReady
from . import TransactionMetaData
from .exception import (NEOStorageError, NEOStorageCreationUndoneError, from .exception import (NEOStorageError, NEOStorageCreationUndoneError,
NEOStorageReadRetry, NEOStorageNotFoundError, NEOPrimaryMasterLost) NEOStorageReadRetry, NEOStorageNotFoundError, NEOPrimaryMasterLost)
from .handlers import storage, master from .handlers import storage, master
...@@ -49,6 +49,8 @@ CHECKED_SERIAL = object() ...@@ -49,6 +49,8 @@ CHECKED_SERIAL = object()
# failed in the past. # failed in the past.
MAX_FAILURE_AGE = 600 MAX_FAILURE_AGE = 600
TXN_PACK_DESC = 'IStorage.pack'
try: try:
from Signals.Signals import SignalHandler from Signals.Signals import SignalHandler
except ImportError: except ImportError:
...@@ -64,6 +66,8 @@ class Application(ThreadedApplication): ...@@ -64,6 +66,8 @@ class Application(ThreadedApplication):
# the transaction is really committed, no matter for how long the master # the transaction is really committed, no matter for how long the master
# is unreachable. # is unreachable.
max_reconnection_to_master = float('inf') max_reconnection_to_master = float('inf')
# For tests only. See end of pack() method.
wait_for_pack = False
def __init__(self, master_nodes, name, compress=True, cache_size=None, def __init__(self, master_nodes, name, compress=True, cache_size=None,
**kw): **kw):
...@@ -499,7 +503,6 @@ class Application(ThreadedApplication): ...@@ -499,7 +503,6 @@ class Application(ThreadedApplication):
compression = 0 compression = 0
checksum = ZERO_HASH checksum = ZERO_HASH
else: else:
assert data_serial is None
size, compression, compressed_data = self.compress(data) size, compression, compressed_data = self.compress(data)
checksum = makeChecksum(compressed_data) checksum = makeChecksum(compressed_data)
txn_context.data_size += size txn_context.data_size += size
...@@ -529,7 +532,7 @@ class Application(ThreadedApplication): ...@@ -529,7 +532,7 @@ class Application(ThreadedApplication):
if data is CHECKED_SERIAL: if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid, raise ReadConflictError(oid=oid,
serials=(serial, old_serial)) serials=(serial, old_serial))
# TODO: data can be None if a conflict happens during undo # data can be None if a conflict happens when undoing creation
if data: if data:
txn_context.data_size -= len(data) txn_context.data_size -= len(data)
if self.last_tid < serial: if self.last_tid < serial:
...@@ -591,7 +594,8 @@ class Application(ThreadedApplication): ...@@ -591,7 +594,8 @@ class Application(ThreadedApplication):
# user and description are cast to str in case they're unicode. # user and description are cast to str in case they're unicode.
# BBB: This is not required anymore with recent ZODB. # BBB: This is not required anymore with recent ZODB.
packet = Packets.AskStoreTransaction(ttid, str(transaction.user), packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), ext, list(txn_context.cache_dict)) str(transaction.description), ext, list(txn_context.cache_dict),
txn_context.pack)
queue = txn_context.queue queue = txn_context.queue
conn_dict = txn_context.conn_dict conn_dict = txn_context.conn_dict
# Ask in parallel all involved storage nodes to commit object metadata. # Ask in parallel all involved storage nodes to commit object metadata.
...@@ -706,7 +710,7 @@ class Application(ThreadedApplication): ...@@ -706,7 +710,7 @@ class Application(ThreadedApplication):
del cache_dict[oid] del cache_dict[oid]
ttid = txn_context.ttid ttid = txn_context.ttid
p = Packets.AskFinishTransaction(ttid, list(cache_dict), p = Packets.AskFinishTransaction(ttid, list(cache_dict),
checked_list) checked_list, txn_context.pack)
try: try:
tid = self._askPrimary(p, cache_dict=cache_dict, callback=f) tid = self._askPrimary(p, cache_dict=cache_dict, callback=f)
assert tid assert tid
...@@ -760,7 +764,7 @@ class Application(ThreadedApplication): ...@@ -760,7 +764,7 @@ class Application(ThreadedApplication):
'partition_oid_dict': partition_oid_dict, 'partition_oid_dict': partition_oid_dict,
'undo_object_tid_dict': undo_object_tid_dict, 'undo_object_tid_dict': undo_object_tid_dict,
} }
while partition_oid_dict: while 1:
for partition, oid_list in partition_oid_dict.iteritems(): for partition, oid_list in partition_oid_dict.iteritems():
cell_list = [cell cell_list = [cell
for cell in getCellList(partition, readable=True) for cell in getCellList(partition, readable=True)
...@@ -769,11 +773,17 @@ class Application(ThreadedApplication): ...@@ -769,11 +773,17 @@ class Application(ThreadedApplication):
# only between the client and the storage, the latter would # only between the client and the storage, the latter would
# still be readable until we commit. # still be readable until we commit.
if txn_context.conn_dict.get(cell.getUUID(), 0) is not None] if txn_context.conn_dict.get(cell.getUUID(), 0) is not None]
storage_conn = getConnForNode( conn = getConnForNode(
min(cell_list, key=getCellSortKey).getNode()) min(cell_list, key=getCellSortKey).getNode())
storage_conn.ask(Packets.AskObjectUndoSerial(ttid, try:
conn.ask(Packets.AskObjectUndoSerial(ttid,
snapshot_tid, undone_tid, oid_list), snapshot_tid, undone_tid, oid_list),
partition=partition, **kw) partition=partition, **kw)
except AttributeError:
if conn is not None:
raise
except ConnectionClosed:
pass
# Wait for all AnswerObjectUndoSerial. We might get # Wait for all AnswerObjectUndoSerial. We might get
# OidNotFoundError, meaning that objects in transaction's oid_list # OidNotFoundError, meaning that objects in transaction's oid_list
...@@ -785,10 +795,37 @@ class Application(ThreadedApplication): ...@@ -785,10 +795,37 @@ class Application(ThreadedApplication):
self.dispatcher.forget_queue(queue) self.dispatcher.forget_queue(queue)
raise UndoError('non-undoable transaction') raise UndoError('non-undoable transaction')
if not partition_oid_dict:
break
# Do not retry too quickly, for example
# when there's an incoming PT update.
self.sync()
# Send undo data to all storage nodes. # Send undo data to all storage nodes.
for oid, (current_serial, undo_serial, is_current) in \ for oid, (current_serial, undo_serial, is_current) in \
undo_object_tid_dict.iteritems(): undo_object_tid_dict.iteritems():
if is_current: if is_current:
if undo_serial:
# The data are used:
# - by outdated cells that don't have them
# - if there's a conflict to resolve
# Otherwise, they're ignored.
# IDEA: So as an optimization, if all cells we're going to
# write are readable, we could move the following
# load to _handleConflicts and simply pass None here.
# But evaluating such condition without race
# condition is not easy:
# 1. The transaction context must have established
# with all nodes that will be involved (e.g.
# doable while processing partition_oid_dict).
# 2. The partition table must be up-to-date by
# pinging the master (i.e. self.sync()).
# 3. At last, the PT can be looked up here.
try:
data = self.load(oid, undo_serial)[0]
except NEOStorageCreationUndoneError:
data = None
else:
data = None data = None
else: else:
# Serial being undone is not the latest version for this # Serial being undone is not the latest version for this
...@@ -945,12 +982,16 @@ class Application(ThreadedApplication): ...@@ -945,12 +982,16 @@ class Application(ThreadedApplication):
def sync(self): def sync(self):
self._askPrimary(Packets.Ping()) self._askPrimary(Packets.Ping())
def pack(self, t): def pack(self, tid, _oids=None): # TODO: API for partial pack
tid = TimeStamp(*time.gmtime(t)[:5] + (t % 60, )).raw() transaction = TransactionMetaData(description=TXN_PACK_DESC)
if tid == ZERO_TID: self.tpc_begin(None, transaction)
raise NEOStorageError('Invalid pack time') self._txn_container.get(transaction).pack = _oids and sorted(_oids), tid
self._askPrimary(Packets.AskPack(tid)) tid = self.tpc_finish(transaction)
# XXX: this is only needed to make ZODB unit tests pass. if not self.wait_for_pack:
return
# Waiting for pack to be finished is only needed
# to make ZODB unit tests pass.
self._askPrimary(Packets.WaitForPack(tid))
# It should not be otherwise required (clients should be free to load # It should not be otherwise required (clients should be free to load
# old data as long as it is available in cache, event if it was pruned # old data as long as it is available in cache, event if it was pruned
# by a pack), so don't bother invalidating on other clients. # by a pack), so don't bother invalidating on other clients.
......
...@@ -37,6 +37,13 @@ class NEOStorageCreationUndoneError(NEOStorageDoesNotExistError): ...@@ -37,6 +37,13 @@ class NEOStorageCreationUndoneError(NEOStorageDoesNotExistError):
some object existed at some point, but its creation was undone. some object existed at some point, but its creation was undone.
""" """
class NEOUndoPackError(NEOStorageNotFoundError):
"""Race condition between undo & pack
While undoing a transaction, an oid record disappeared.
This can happen if the storage node is packing.
"""
# TODO: Inherit from transaction.interfaces.TransientError # TODO: Inherit from transaction.interfaces.TransientError
# (not recognized yet by ERP5 as a transient error). # (not recognized yet by ERP5 as a transient error).
class NEOPrimaryMasterLost(POSException.ReadConflictError): class NEOPrimaryMasterLost(POSException.ReadConflictError):
......
...@@ -174,3 +174,6 @@ class PrimaryAnswersHandler(AnswerBaseHandler): ...@@ -174,3 +174,6 @@ class PrimaryAnswersHandler(AnswerBaseHandler):
def answerFinalTID(self, conn, tid): def answerFinalTID(self, conn, tid):
self.app.setHandlerData(tid) self.app.setHandlerData(tid)
def waitedForPack(self, conn):
pass
...@@ -25,8 +25,10 @@ from neo.lib.exception import NodeNotReady ...@@ -25,8 +25,10 @@ from neo.lib.exception import NodeNotReady
from neo.lib.handler import MTEventHandler from neo.lib.handler import MTEventHandler
from . import AnswerBaseHandler from . import AnswerBaseHandler
from ..transactions import Transaction from ..transactions import Transaction
from ..exception import NEOStorageError, NEOStorageNotFoundError from ..exception import (
from ..exception import NEOStorageReadRetry, NEOStorageDoesNotExistError NEOStorageError, NEOStorageNotFoundError, NEOUndoPackError,
NEOStorageReadRetry, NEOStorageDoesNotExistError,
)
@apply @apply
class _DeadlockPacket(object): class _DeadlockPacket(object):
...@@ -194,6 +196,9 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -194,6 +196,9 @@ class StorageAnswersHandler(AnswerBaseHandler):
# This can happen when requiring txn informations # This can happen when requiring txn informations
raise NEOStorageNotFoundError(message) raise NEOStorageNotFoundError(message)
def undoPackError(self, conn, message):
raise NEOUndoPackError(message)
def nonReadableCell(self, conn, message): def nonReadableCell(self, conn, message):
logging.info('non readable cell') logging.info('non readable cell')
raise NEOStorageReadRetry(True) raise NEOStorageReadRetry(True)
......
...@@ -31,6 +31,7 @@ class Transaction(object): ...@@ -31,6 +31,7 @@ class Transaction(object):
voted = False voted = False
ttid = None # XXX: useless, except for testBackupReadOnlyAccess ttid = None # XXX: useless, except for testBackupReadOnlyAccess
lockless_dict = None # {partition: {uuid}} lockless_dict = None # {partition: {uuid}}
pack = None
def __init__(self, txn): def __init__(self, txn):
self.queue = SimpleQueue() self.queue = SimpleQueue()
......
...@@ -21,9 +21,9 @@ from msgpack.exceptions import OutOfData, UnpackValueError ...@@ -21,9 +21,9 @@ from msgpack.exceptions import OutOfData, UnpackValueError
from . import attributeTracker, logging from . import attributeTracker, logging
from .connector import ConnectorException, ConnectorDelayedConnection from .connector import ConnectorException, ConnectorDelayedConnection
from .exception import PacketMalformedError
from .locking import RLock from .locking import RLock
from .protocol import uuid_str, Errors, PacketMalformedError, Packets, \ from .protocol import uuid_str, Errors, Packets, Unpacker
Unpacker
try: try:
msgpack.Unpacker().read_bytes(1) msgpack.Unpacker().read_bytes(1)
...@@ -600,11 +600,40 @@ class Connection(BaseConnection): ...@@ -600,11 +600,40 @@ class Connection(BaseConnection):
packet.setId(self.peer_id) packet.setId(self.peer_id)
self._addPacket(packet) self._addPacket(packet)
def delayedAnswer(self, packet):
return DelayedAnswer(self, packet)
def _connected(self): def _connected(self):
self.connecting = False self.connecting = False
self.getHandler().connectionCompleted(self) self.getHandler().connectionCompleted(self)
class DelayedAnswer(object):
def __init__(self, conn, packet):
assert packet.isResponse() and not packet.isError(), packet
self.conn = conn
self.packet = packet
self.msg_id = conn.peer_id
def __call__(self, *args):
# Same behaviour as Connection.answer for closed connections.
# Not more tolerant, because connections are expected to be properly
# cleaned up when they're closed (__eq__/__hash__ help to identify
# instances that are related to the connection being closed).
try:
self.conn.send(self.packet(*args), self.msg_id)
except ConnectionClosed:
if self.packet.ignoreOnClosedConnection():
raise
def __hash__(self):
return hash(self.conn)
def __eq__(self, other):
return self is other or self.conn is other
class ClientConnection(Connection): class ClientConnection(Connection):
"""A connection from this node to a remote node.""" """A connection from this node to a remote node."""
......
...@@ -14,8 +14,9 @@ ...@@ -14,8 +14,9 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os import fcntl, os
from collections import deque from collections import deque
from signal import set_wakeup_fd
from time import time from time import time
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from errno import EAGAIN, EEXIST, EINTR, ENOENT from errno import EAGAIN, EEXIST, EINTR, ENOENT
...@@ -31,6 +32,15 @@ def dictionary_changed_size_during_iteration(): ...@@ -31,6 +32,15 @@ def dictionary_changed_size_during_iteration():
return str(e) return str(e)
raise AssertionError raise AssertionError
def nonblock(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# We use set_wakeup_fd to handle the case of a signal that happens between
# Python checks for signals and epoll_wait is called. Otherwise, the signal
# would not be processed as long as epoll_wait sleeps.
# If a process has several instances of EpollEventManager like in threaded
# tests, it does not matter which one is woke up by signals.
class EpollEventManager(object): class EpollEventManager(object):
"""This class manages connections and events based on epoll(5).""" """This class manages connections and events based on epoll(5)."""
...@@ -44,9 +54,17 @@ class EpollEventManager(object): ...@@ -44,9 +54,17 @@ class EpollEventManager(object):
self.epoll = epoll() self.epoll = epoll()
self._pending_processing = deque() self._pending_processing = deque()
self._trigger_list = [] self._trigger_list = []
self._trigger_fd, w = os.pipe() r, w = os.pipe()
os.close(w) self._wakeup_rfd = r
self._wakeup_wfd = w
nonblock(r)
nonblock(w)
fd = set_wakeup_fd(w)
assert fd == -1, fd
self.epoll.register(r, EPOLLIN)
self._trigger_lock = Lock() self._trigger_lock = Lock()
self.lock = l = Lock()
l.acquire()
close_list = [] close_list = []
self._closeAppend = close_list.append self._closeAppend = close_list.append
l = Lock() l = Lock()
...@@ -61,9 +79,12 @@ class EpollEventManager(object): ...@@ -61,9 +79,12 @@ class EpollEventManager(object):
self._closeRelease = release self._closeRelease = release
def close(self): def close(self):
os.close(self._trigger_fd) set_wakeup_fd(-1)
os.close(self._wakeup_wfd)
os.close(self._wakeup_rfd)
for c in self.connection_dict.values(): for c in self.connection_dict.values():
c.close() c.close()
self.epoll.close()
del self.__dict__ del self.__dict__
def getConnectionList(self): def getConnectionList(self):
...@@ -188,6 +209,15 @@ class EpollEventManager(object): ...@@ -188,6 +209,15 @@ class EpollEventManager(object):
# granularity of 1ms and Python 2.7 rounds the timeout towards zero. # granularity of 1ms and Python 2.7 rounds the timeout towards zero.
# See also https://bugs.python.org/issue20452 (fixed in Python 3). # See also https://bugs.python.org/issue20452 (fixed in Python 3).
blocking = .001 + max(0, timeout - time()) if timeout else -1 blocking = .001 + max(0, timeout - time()) if timeout else -1
def poll(blocking):
l = self.lock
l.release()
try:
return self.epoll.poll(blocking)
finally:
l.acquire()
else:
poll = self.epoll.poll
# From this point, and until we have processed all fds returned by # From this point, and until we have processed all fds returned by
# epoll, we must prevent any fd from being closed, because they could # epoll, we must prevent any fd from being closed, because they could
# be reallocated by new connection, either by this thread or by another. # be reallocated by new connection, either by this thread or by another.
...@@ -195,7 +225,7 @@ class EpollEventManager(object): ...@@ -195,7 +225,7 @@ class EpollEventManager(object):
# 'finally' clause. # 'finally' clause.
self._closeAcquire() self._closeAcquire()
try: try:
event_list = self.epoll.poll(blocking) event_list = poll(blocking)
except IOError, exc: except IOError, exc:
if exc.errno in (0, EAGAIN): if exc.errno in (0, EAGAIN):
logging.info('epoll.poll triggered undocumented error %r', logging.info('epoll.poll triggered undocumented error %r',
...@@ -213,6 +243,15 @@ class EpollEventManager(object): ...@@ -213,6 +243,15 @@ class EpollEventManager(object):
try: try:
conn = self.connection_dict[fd] conn = self.connection_dict[fd]
except KeyError: except KeyError:
if fd == self._wakeup_rfd:
os.read(fd, 8)
with self._trigger_lock:
action_list = self._trigger_list
try:
while action_list:
action_list.pop(0)()
finally:
del action_list[:]
continue continue
if conn.readable(): if conn.readable():
pending_processing.append(conn) pending_processing.append(conn)
...@@ -230,15 +269,6 @@ class EpollEventManager(object): ...@@ -230,15 +269,6 @@ class EpollEventManager(object):
try: try:
conn = self.connection_dict[fd] conn = self.connection_dict[fd]
except KeyError: except KeyError:
if fd == self._trigger_fd:
with self._trigger_lock:
self.epoll.unregister(fd)
action_list = self._trigger_list
try:
while action_list:
action_list.pop(0)()
finally:
del action_list[:]
continue continue
if conn.readable(): if conn.readable():
pending_processing.append(conn) pending_processing.append(conn)
...@@ -262,10 +292,10 @@ class EpollEventManager(object): ...@@ -262,10 +292,10 @@ class EpollEventManager(object):
with self._trigger_lock: with self._trigger_lock:
self._trigger_list += actions self._trigger_list += actions
try: try:
self.epoll.register(self._trigger_fd) os.write(self._wakeup_wfd, '\0')
except IOError, e: except OSError, e:
# Ignore if 'wakeup' is called several times in a row. # Ignore if wakeup fd is triggered many times in a row.
if e.errno != EEXIST: if e.errno != EAGAIN:
raise raise
def addReader(self, conn): def addReader(self, conn):
......
...@@ -29,3 +29,33 @@ class StoppedOperation(NeoException): ...@@ -29,3 +29,33 @@ class StoppedOperation(NeoException):
class NodeNotReady(NeoException): class NodeNotReady(NeoException):
pass pass
class ProtocolError(NeoException):
""" Base class for protocol errors, close the connection """
class PacketMalformedError(ProtocolError):
pass
class UnexpectedPacketError(ProtocolError):
pass
class NotReadyError(ProtocolError):
pass
class BackendNotImplemented(NeoException):
""" Method not implemented by backend storage """
class NonReadableCell(NeoException):
"""Read-access to a cell that is actually non-readable
This happens in case of race condition at processing partition table
updates: client's PT is older or newer than storage's. The latter case is
possible because the master must validate any end of replication, which
means that the storage node can't anticipate the PT update (concurrently,
there may be a first tweaks that moves the replicated cell to another node,
and a second one that moves it back).
On such event, the client must retry, preferably another cell.
"""
class UndoPackError(NeoException):
pass
...@@ -19,10 +19,9 @@ from collections import deque ...@@ -19,10 +19,9 @@ from collections import deque
from operator import itemgetter from operator import itemgetter
from . import logging from . import logging
from .connection import ConnectionClosed from .connection import ConnectionClosed
from .exception import PrimaryElected from .exception import (BackendNotImplemented, NonReadableCell, NotReadyError,
from .protocol import (NodeStates, NodeTypes, Packets, uuid_str, PacketMalformedError, PrimaryElected, ProtocolError, UnexpectedPacketError)
Errors, BackendNotImplemented, NonReadableCell, NotReadyError, from .protocol import NodeStates, NodeTypes, Packets, uuid_str, Errors
PacketMalformedError, ProtocolError, UnexpectedPacketError)
from .util import cached_property from .util import cached_property
......
...@@ -18,9 +18,9 @@ import errno, json, os ...@@ -18,9 +18,9 @@ import errno, json, os
from time import time from time import time
from . import attributeTracker, logging from . import attributeTracker, logging
from .exception import NotReadyError, ProtocolError
from .handler import DelayEvent, EventQueue from .handler import DelayEvent, EventQueue
from .protocol import formatNodeList, uuid_str, \ from .protocol import formatNodeList, uuid_str, NodeTypes, NodeStates
NodeTypes, NodeStates, NotReadyError, ProtocolError
class Node(object): class Node(object):
......
...@@ -26,7 +26,7 @@ except ImportError: ...@@ -26,7 +26,7 @@ except ImportError:
# The protocol version must be increased whenever upgrading a node may require # The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. # to upgrade other nodes.
PROTOCOL_VERSION = 2 PROTOCOL_VERSION = 3
# By encoding the handshake packet with msgpack, the whole NEO stream can be # By encoding the handshake packet with msgpack, the whole NEO stream can be
# decoded with msgpack. The first byte is 0x92, which is different from TLS # decoded with msgpack. The first byte is 0x92, which is different from TLS
# Handshake (0x16). # Handshake (0x16).
...@@ -173,6 +173,7 @@ def ErrorCodes(): ...@@ -173,6 +173,7 @@ def ErrorCodes():
NON_READABLE_CELL NON_READABLE_CELL
READ_ONLY_ACCESS READ_ONLY_ACCESS
INCOMPLETE_TRANSACTION INCOMPLETE_TRANSACTION
UNDO_PACK_ERROR
@Enum @Enum
def NodeStates(): def NodeStates():
...@@ -233,34 +234,6 @@ uuid_str = (lambda ns: lambda uuid: ...@@ -233,34 +234,6 @@ uuid_str = (lambda ns: lambda uuid:
ns[uuid >> 24] + str(uuid & 0xffffff) if uuid else str(uuid) ns[uuid >> 24] + str(uuid & 0xffffff) if uuid else str(uuid)
)({v: str(k)[0] for k, v in UUID_NAMESPACES.iteritems()}) )({v: str(k)[0] for k, v in UUID_NAMESPACES.iteritems()})
class ProtocolError(Exception):
""" Base class for protocol errors, close the connection """
class PacketMalformedError(ProtocolError):
"""Close the connection"""
class UnexpectedPacketError(ProtocolError):
"""Close the connection"""
class NotReadyError(ProtocolError):
""" Just close the connection """
class BackendNotImplemented(Exception):
""" Method not implemented by backend storage """
class NonReadableCell(Exception):
"""Read-access to a cell that is actually non-readable
This happens in case of race condition at processing partition table
updates: client's PT is older or newer than storage's. The latter case is
possible because the master must validate any end of replication, which
means that the storage node can't anticipate the PT update (concurrently,
there may be a first tweaks that moves the replicated cell to another node,
and a second one that moves it back).
On such event, the client must retry, preferably another cell.
"""
class Packet(object): class Packet(object):
""" """
...@@ -301,21 +274,24 @@ class Packet(object): ...@@ -301,21 +274,24 @@ class Packet(object):
assert isinstance(other, Packet) assert isinstance(other, Packet)
return self._code == other._code return self._code == other._code
def isError(self): @classmethod
return self._code == RESPONSE_MASK def isError(cls):
return cls._code == RESPONSE_MASK
def isResponse(self): @classmethod
return self._code & RESPONSE_MASK def isResponse(cls):
return cls._code & RESPONSE_MASK
def getAnswerClass(self): def getAnswerClass(self):
return self._answer return self._answer
def ignoreOnClosedConnection(self): @classmethod
def ignoreOnClosedConnection(cls):
""" """
Tells if this packet must be ignored when its connection is closed Tells if this packet must be ignored when its connection is closed
when it is handled. when it is handled.
""" """
return self._ignore_when_closed return cls._ignore_when_closed
class PacketRegistryFactory(dict): class PacketRegistryFactory(dict):
...@@ -697,11 +673,37 @@ class Packets(dict): ...@@ -697,11 +673,37 @@ class Packets(dict):
:nodes: C -> S :nodes: C -> S
""") """)
AskPack, AnswerPack = request(""" WaitForPack, WaitedForPack = request("""
Request a pack at given TID. Wait until pack given by tid is completed.
:nodes: C -> M -> S :nodes: C -> M
""", ignore_when_closed=False) """)
AskPackOrders, AnswerPackOrders = request("""
Request list of pack orders excluding oldest completed ones.
:nodes: M -> S; C, S -> M
""")
NotifyPackSigned = notify("""
Send ids of pack orders to be processed. Also used to fix replicas
that may have lost them.
When a pack order is auto-approved, the master also notifies storage
that store it, even though they're already notified via
AskLockInformation. In addition to make the implementation simpler,
storage nodes don't have to detect this case and it's slightly faster
when there's no pack.
:nodes: M -> S, backup
""")
NotifyPackCompleted = notify("""
Notify the master node that partitions have been successfully
packed up to the given ids.
:nodes: S -> M
""")
CheckReplicas = request(""" CheckReplicas = request("""
Ask the cluster to search for mismatches between replicas, metadata Ask the cluster to search for mismatches between replicas, metadata
......
...@@ -30,9 +30,11 @@ class PartitionTableException(Exception): ...@@ -30,9 +30,11 @@ class PartitionTableException(Exception):
class Cell(object): class Cell(object):
"""This class represents a cell in a partition table.""" """This class represents a cell in a partition table."""
state = CellStates.DISCARDED
def __init__(self, node, state = CellStates.UP_TO_DATE): def __init__(self, node, state = CellStates.UP_TO_DATE):
self.node = node self.node = node
self.state = state self.setState(state)
def __repr__(self): def __repr__(self):
return "<Cell(uuid=%s, address=%s, state=%s)>" % ( return "<Cell(uuid=%s, address=%s, state=%s)>" % (
......
...@@ -101,6 +101,9 @@ def datetimeFromTID(tid): ...@@ -101,6 +101,9 @@ def datetimeFromTID(tid):
seconds, lower = divmod(lower * 60, TID_LOW_OVERFLOW) seconds, lower = divmod(lower * 60, TID_LOW_OVERFLOW)
return datetime(*(higher + (seconds, int(lower * MICRO_FROM_UINT32)))) return datetime(*(higher + (seconds, int(lower * MICRO_FROM_UINT32))))
def timeFromTID(tid, _epoch=datetime.utcfromtimestamp(0)):
return (datetimeFromTID(tid) - _epoch).total_seconds()
def addTID(ptid, offset): def addTID(ptid, offset):
""" """
Offset given packed TID. Offset given packed TID.
......
...@@ -42,6 +42,7 @@ def monotonic_time(): ...@@ -42,6 +42,7 @@ def monotonic_time():
from .backup_app import BackupApplication from .backup_app import BackupApplication
from .handlers import identification, administration, client, master, storage from .handlers import identification, administration, client, master, storage
from .pack import PackManager
from .pt import PartitionTable from .pt import PartitionTable
from .recovery import RecoveryManager from .recovery import RecoveryManager
from .transactions import TransactionManager from .transactions import TransactionManager
...@@ -51,7 +52,6 @@ from .verification import VerificationManager ...@@ -51,7 +52,6 @@ from .verification import VerificationManager
@buildOptionParser @buildOptionParser
class Application(BaseApplication): class Application(BaseApplication):
"""The master node application.""" """The master node application."""
packing = None
storage_readiness = 0 storage_readiness = 0
# Latest completely committed TID # Latest completely committed TID
last_transaction = ZERO_TID last_transaction = ZERO_TID
...@@ -101,6 +101,7 @@ class Application(BaseApplication): ...@@ -101,6 +101,7 @@ class Application(BaseApplication):
super(Application, self).__init__( super(Application, self).__init__(
config.get('ssl'), config.get('dynamic_master_list')) config.get('ssl'), config.get('dynamic_master_list'))
self.tm = TransactionManager(self.onTransactionCommitted) self.tm = TransactionManager(self.onTransactionCommitted)
self.pm = PackManager()
self.name = config['cluster'] self.name = config['cluster']
self.server = config['bind'] self.server = config['bind']
...@@ -317,6 +318,8 @@ class Application(BaseApplication): ...@@ -317,6 +318,8 @@ class Application(BaseApplication):
truncate = Packets.Truncate(*e.args) if e.args else None truncate = Packets.Truncate(*e.args) if e.args else None
# Automatic restart except if we truncate or retry to. # Automatic restart except if we truncate or retry to.
self._startup_allowed = not (self.truncate_tid or truncate) self._startup_allowed = not (self.truncate_tid or truncate)
finally:
self.pm.reset()
self.storage_readiness = 0 self.storage_readiness = 0
self.storage_ready_dict.clear() self.storage_ready_dict.clear()
self.storage_starting_set.clear() self.storage_starting_set.clear()
...@@ -560,7 +563,8 @@ class Application(BaseApplication): ...@@ -560,7 +563,8 @@ class Application(BaseApplication):
tid = txn.getTID() tid = txn.getTID()
transaction_node = txn.getNode() transaction_node = txn.getNode()
invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList()) invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
for client_node in self.nm.getClientList(only_identified=True): client_list = self.nm.getClientList(only_identified=True)
for client_node in client_list:
if client_node is transaction_node: if client_node is transaction_node:
client_node.send(Packets.AnswerTransactionFinished(ttid, tid), client_node.send(Packets.AnswerTransactionFinished(ttid, tid),
msg_id=txn.getMessageId()) msg_id=txn.getMessageId())
...@@ -570,9 +574,26 @@ class Application(BaseApplication): ...@@ -570,9 +574,26 @@ class Application(BaseApplication):
# Unlock Information to relevant storage nodes. # Unlock Information to relevant storage nodes.
notify_unlock = Packets.NotifyUnlockInformation(ttid) notify_unlock = Packets.NotifyUnlockInformation(ttid)
getByUUID = self.nm.getByUUID getByUUID = self.nm.getByUUID
for storage_uuid in txn.getUUIDList(): txn_storage_list = txn.getUUIDList()
for storage_uuid in txn_storage_list:
getByUUID(storage_uuid).send(notify_unlock) getByUUID(storage_uuid).send(notify_unlock)
# Notify storage nodes about new pack order if any.
pack = self.pm.packs.get(tid)
if pack is not None is not pack.approved:
# We could exclude those that store transaction metadata, because
# they can deduce it upon NotifyUnlockInformation: quite simple but
# for the moment, let's optimize the case where there's no pack.
# We're only there in case of automatic approval.
assert pack.approved
pack = Packets.NotifyPackSigned((tid,), ())
for uuid in self.getStorageReadySet():
getByUUID(uuid).send(pack)
# Notify backup clusters.
for node in client_list:
if node.extra.get('backup'):
node.send(pack)
# Notify storage that have replications blocked by this transaction, # Notify storage that have replications blocked by this transaction,
# and clients that try to recover from a failure during tpc_finish. # and clients that try to recover from a failure during tpc_finish.
notify_finished = Packets.NotifyTransactionFinished(ttid, tid) notify_finished = Packets.NotifyTransactionFinished(ttid, tid)
...@@ -612,6 +633,9 @@ class Application(BaseApplication): ...@@ -612,6 +633,9 @@ class Application(BaseApplication):
assert uuid not in self.storage_ready_dict, self.storage_ready_dict assert uuid not in self.storage_ready_dict, self.storage_ready_dict
self.storage_readiness = self.storage_ready_dict[uuid] = \ self.storage_readiness = self.storage_ready_dict[uuid] = \
self.storage_readiness + 1 self.storage_readiness + 1
pack = self.pm.getApprovedRejected()
if any(pack):
self.nm.getByUUID(uuid).send(Packets.NotifyPackSigned(*pack))
self.tm.executeQueuedEvents() self.tm.executeQueuedEvents()
def isStorageReady(self, uuid): def isStorageReady(self, uuid):
...@@ -629,3 +653,12 @@ class Application(BaseApplication): ...@@ -629,3 +653,12 @@ class Application(BaseApplication):
getByUUID = self.nm.getByUUID getByUUID = self.nm.getByUUID
for uuid in uuid_set: for uuid in uuid_set:
getByUUID(uuid).send(p) getByUUID(uuid).send(p)
def updateCompletedPackId(self):
try:
pack_id = min(node.completed_pack_id
for node in self.pt.getNodeSet(True)
if hasattr(node, "completed_pack_id"))
except ValueError:
return
self.pm.notifyCompleted(pack_id)
...@@ -75,6 +75,7 @@ class BackupApplication(object): ...@@ -75,6 +75,7 @@ class BackupApplication(object):
self.nm.createMasters(master_addresses) self.nm.createMasters(master_addresses)
em = property(lambda self: self.app.em) em = property(lambda self: self.app.em)
pm = property(lambda self: self.app.pm)
ssl = property(lambda self: self.app.ssl) ssl = property(lambda self: self.app.ssl)
def close(self): def close(self):
...@@ -117,8 +118,19 @@ class BackupApplication(object): ...@@ -117,8 +118,19 @@ class BackupApplication(object):
app.changeClusterState(ClusterStates.BACKINGUP) app.changeClusterState(ClusterStates.BACKINGUP)
del bootstrap, node del bootstrap, node
self.ignore_invalidations = True self.ignore_invalidations = True
self.ignore_pack_notifications = True
conn.setHandler(BackupHandler(self)) conn.setHandler(BackupHandler(self))
conn.ask(Packets.AskLastTransaction()) conn.ask(Packets.AskLastTransaction())
assert app.backup_tid == pt.getBackupTid()
min_tid = add64(app.backup_tid, 1)
p = app.pm.packs
for tid in sorted(p):
if min_tid <= tid:
break
if p[tid].approved is None:
min_tid = tid
break
conn.ask(Packets.AskPackOrders(min_tid), min_tid=min_tid)
# debug variable to log how big 'tid_list' can be. # debug variable to log how big 'tid_list' can be.
self.debug_tid_count = 0 self.debug_tid_count = 0
while True: while True:
...@@ -375,3 +387,12 @@ class BackupApplication(object): ...@@ -375,3 +387,12 @@ class BackupApplication(object):
if node_list: if node_list:
min(node_list, key=lambda node: node.getUUID()).send( min(node_list, key=lambda node: node.getUUID()).send(
Packets.NotifyUpstreamAdmin(addr)) Packets.NotifyUpstreamAdmin(addr))
def broadcastApprovedRejected(self, min_tid):
app = self.app
p = app.pm.getApprovedRejected(min_tid)
if any(p):
getByUUID = app.nm.getByUUID
p = Packets.NotifyPackSigned(*p)
for uuid in app.getStorageReadySet():
getByUUID(uuid).send(p)
...@@ -15,10 +15,11 @@ ...@@ -15,10 +15,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..app import monotonic_time from ..app import monotonic_time
from ..pack import RequestOld
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import StoppedOperation from neo.lib.exception import StoppedOperation
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import Packets from neo.lib.protocol import Packets, ZERO_TID
class MasterHandler(EventHandler): class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers.""" """This class implements a generic part of the event handlers."""
...@@ -40,12 +41,21 @@ class MasterHandler(EventHandler): ...@@ -40,12 +41,21 @@ class MasterHandler(EventHandler):
def askLastIDs(self, conn): def askLastIDs(self, conn):
tm = self.app.tm tm = self.app.tm
conn.answer(Packets.AnswerLastIDs(tm.getLastOID(), tm.getLastTID())) conn.answer(Packets.AnswerLastIDs(tm.getLastTID(), tm.getLastOID()))
def askLastTransaction(self, conn): def askLastTransaction(self, conn):
conn.answer(Packets.AnswerLastTransaction( conn.answer(Packets.AnswerLastTransaction(
self.app.getLastTransaction())) self.app.getLastTransaction()))
def _askPackOrders(self, conn, pack_id, only_first_approved):
app = self.app
if pack_id is not None is not app.pm.max_completed >= pack_id:
RequestOld(app, pack_id, only_first_approved,
conn.delayedAnswer(Packets.AnswerPackOrders))
else:
conn.answer(Packets.AnswerPackOrders(
app.pm.dump(pack_id or ZERO_TID, only_first_approved)))
def _notifyNodeInformation(self, conn): def _notifyNodeInformation(self, conn):
app = self.app app = self.app
node = app.nm.getByUUID(conn.getUUID()) node = app.nm.getByUUID(conn.getUUID())
......
...@@ -25,7 +25,7 @@ from neo.lib.handler import AnswerDenied ...@@ -25,7 +25,7 @@ from neo.lib.handler import AnswerDenied
from neo.lib.pt import PartitionTableException from neo.lib.pt import PartitionTableException
from neo.lib.protocol import ClusterStates, Errors, \ from neo.lib.protocol import ClusterStates, Errors, \
NodeStates, NodeTypes, Packets, uuid_str NodeStates, NodeTypes, Packets, uuid_str
from neo.lib.util import dump from neo.lib.util import add64, dump
CLUSTER_STATE_WORKFLOW = { CLUSTER_STATE_WORKFLOW = {
# destination: sources # destination: sources
...@@ -234,6 +234,15 @@ class AdministrationHandler(MasterHandler): ...@@ -234,6 +234,15 @@ class AdministrationHandler(MasterHandler):
@check_state(ClusterStates.RUNNING) @check_state(ClusterStates.RUNNING)
def truncate(self, conn, tid): def truncate(self, conn, tid):
app = self.app
if app.getLastTransaction() <= tid:
raise AnswerDenied("Truncating after last transaction does nothing")
if app.pm.getApprovedRejected(add64(tid, 1))[0]:
# TODO: The protocol must be extended to support safe cases
# (e.g. no started pack whose id is after truncation tid).
# The user may also accept having a truncated DB with missing
# records (i.e. have an option to force that).
raise AnswerDenied("Can not truncate before an approved pack")
conn.answer(Errors.Ack('')) conn.answer(Errors.Ack(''))
raise StoppedOperation(tid) raise StoppedOperation(tid)
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
from neo.lib.exception import PrimaryFailure from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import NodeTypes, NodeStates, Packets, ZERO_TID from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
class BackupHandler(EventHandler): class BackupHandler(EventHandler):
...@@ -72,3 +72,45 @@ class BackupHandler(EventHandler): ...@@ -72,3 +72,45 @@ class BackupHandler(EventHandler):
partition_set.add(getPartition(tid)) partition_set.add(getPartition(tid))
prev_tid = app.app.getLastTransaction() prev_tid = app.app.getLastTransaction()
app.invalidatePartitions(tid, prev_tid, partition_set) app.invalidatePartitions(tid, prev_tid, partition_set)
# The following 2 methods:
# - keep the PackManager up-to-date;
# - replicate the status of pack orders when they're known after the
# storage nodes have fetched related transactions.
def notifyPackSigned(self, conn, approved, rejected):
backup_app = self.app
if backup_app.ignore_pack_notifications:
return
app = backup_app.app
packs = app.pm.packs
ask_tid = min_tid = None
for approved, tid in (True, approved), (False, rejected):
for tid in tid:
try:
packs[tid].approved = approved
except KeyError:
if not ask_tid or tid < ask_tid:
ask_tid = tid
else:
if not min_tid or tid < min_tid:
min_tid = tid
if ask_tid:
if min_tid is None:
min_tid = ask_tid
else:
assert min_tid < ask_tid, (min_tid, ask_tid)
conn.ask(Packets.AskPackOrders(ask_tid), min_tid=min_tid)
elif min_tid:
backup_app.broadcastApprovedRejected(min_tid)
def answerPackOrders(self, conn, pack_list, min_tid):
backup_app = self.app
app = backup_app.app
add = app.pm.add
for pack_order in pack_list:
add(*pack_order)
backup_app.broadcastApprovedRejected(min_tid)
backup_app.ignore_pack_notifications = False
###
...@@ -15,7 +15,8 @@ ...@@ -15,7 +15,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.handler import DelayEvent from neo.lib.handler import DelayEvent
from neo.lib.protocol import Packets, ProtocolError, MAX_TID, Errors from neo.lib.exception import ProtocolError
from neo.lib.protocol import Packets, MAX_TID, Errors
from ..app import monotonic_time from ..app import monotonic_time
from . import MasterHandler from . import MasterHandler
...@@ -31,6 +32,7 @@ class ClientServiceHandler(MasterHandler): ...@@ -31,6 +32,7 @@ class ClientServiceHandler(MasterHandler):
app = self.app app = self.app
node = app.nm.getByUUID(conn.getUUID()) node = app.nm.getByUUID(conn.getUUID())
assert node is not None, conn assert node is not None, conn
app.pm.clientLost(conn)
for x in app.tm.clientLost(node): for x in app.tm.clientLost(node):
app.notifyTransactionAborted(*x) app.notifyTransactionAborted(*x)
node.setUnknown() node.setUnknown()
...@@ -62,7 +64,7 @@ class ClientServiceHandler(MasterHandler): ...@@ -62,7 +64,7 @@ class ClientServiceHandler(MasterHandler):
conn.answer((Errors.Ack if app.tm.vote(app, *args) else conn.answer((Errors.Ack if app.tm.vote(app, *args) else
Errors.IncompleteTransaction)()) Errors.IncompleteTransaction)())
def askFinishTransaction(self, conn, ttid, oid_list, checked_list): def askFinishTransaction(self, conn, ttid, oid_list, checked_list, pack):
app = self.app app = self.app
tid, node_list = app.tm.prepare( tid, node_list = app.tm.prepare(
app, app,
...@@ -72,7 +74,8 @@ class ClientServiceHandler(MasterHandler): ...@@ -72,7 +74,8 @@ class ClientServiceHandler(MasterHandler):
conn.getPeerId(), conn.getPeerId(),
) )
if tid: if tid:
p = Packets.AskLockInformation(ttid, tid) p = Packets.AskLockInformation(ttid, tid,
app.pm.new(tid, *pack) if pack else False)
for node in node_list: for node in node_list:
node.ask(p) node.ask(p)
else: else:
...@@ -99,18 +102,6 @@ class ClientServiceHandler(MasterHandler): ...@@ -99,18 +102,6 @@ class ClientServiceHandler(MasterHandler):
tid = MAX_TID tid = MAX_TID
conn.answer(Packets.AnswerFinalTID(tid)) conn.answer(Packets.AnswerFinalTID(tid))
def askPack(self, conn, tid):
app = self.app
if app.packing is None:
storage_list = app.nm.getStorageList(only_identified=True)
app.packing = (conn, conn.getPeerId(),
{x.getUUID() for x in storage_list})
p = Packets.AskPack(tid)
for storage in storage_list:
storage.getConnection().ask(p)
else:
conn.answer(Packets.AnswerPack(False))
def abortTransaction(self, conn, tid, uuid_list): def abortTransaction(self, conn, tid, uuid_list):
# Consider a failure when the connection between the storage and the # Consider a failure when the connection between the storage and the
# client breaks while the answer to the first write is sent back. # client breaks while the answer to the first write is sent back.
...@@ -125,6 +116,16 @@ class ClientServiceHandler(MasterHandler): ...@@ -125,6 +116,16 @@ class ClientServiceHandler(MasterHandler):
involved.update(uuid_list) involved.update(uuid_list)
app.notifyTransactionAborted(tid, involved) app.notifyTransactionAborted(tid, involved)
def askPackOrders(self, conn, pack_id):
return self._askPackOrders(conn, pack_id, False)
def waitForPack(self, conn, tid):
try:
pack = self.app.pm.packs[tid]
except KeyError:
conn.answer(Packets.WaitedForPack())
else:
pack.waitForPack(conn.delayedAnswer(Packets.WaitedForPack))
# like ClientServiceHandler but read-only & only for tid <= backup_tid # like ClientServiceHandler but read-only & only for tid <= backup_tid
class ClientReadOnlyServiceHandler(ClientServiceHandler): class ClientReadOnlyServiceHandler(ClientServiceHandler):
......
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import PrimaryElected from neo.lib.exception import NotReadyError, PrimaryElected, ProtocolError
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \ from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \
NodeTypes, NotReadyError, Packets, ProtocolError, uuid_str NodeTypes, Packets, uuid_str
from ..app import monotonic_time from ..app import monotonic_time
class IdentificationHandler(EventHandler): class IdentificationHandler(EventHandler):
......
...@@ -15,13 +15,14 @@ ...@@ -15,13 +15,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import (CellStates, ClusterStates, Packets, ProtocolError, from neo.lib.exception import ProtocolError, StoppedOperation
uuid_str) from neo.lib.protocol import CellStates, ClusterStates, Packets, uuid_str
from neo.lib.exception import StoppedOperation
from neo.lib.pt import PartitionTableException from neo.lib.pt import PartitionTableException
from neo.lib.util import dump from neo.lib.util import dump
from . import BaseServiceHandler from . import BaseServiceHandler
EXPERIMENTAL_CORRUPTED_STATE = False
class StorageServiceHandler(BaseServiceHandler): class StorageServiceHandler(BaseServiceHandler):
""" Handler dedicated to storages during service state """ """ Handler dedicated to storages during service state """
...@@ -44,14 +45,14 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -44,14 +45,14 @@ class StorageServiceHandler(BaseServiceHandler):
super(StorageServiceHandler, self).connectionLost(conn, new_state) super(StorageServiceHandler, self).connectionLost(conn, new_state)
app.setStorageNotReady(uuid) app.setStorageNotReady(uuid)
app.tm.storageLost(uuid) app.tm.storageLost(uuid)
app.pm.connectionLost(conn)
app.updateCompletedPackId()
if (app.getClusterState() == ClusterStates.BACKINGUP if (app.getClusterState() == ClusterStates.BACKINGUP
# Also check if we're exiting, because backup_app is not usable # Also check if we're exiting, because backup_app is not usable
# in this case. Maybe cluster state should be set to something # in this case. Maybe cluster state should be set to something
# else, like STOPPING, during cleanup (__del__/close). # else, like STOPPING, during cleanup (__del__/close).
and app.listening_conn): and app.listening_conn):
app.backup_app.nodeLost(node) app.backup_app.nodeLost(node)
if app.packing is not None:
self.answerPack(conn, False)
def askUnfinishedTransactions(self, conn, offset_list): def askUnfinishedTransactions(self, conn, offset_list):
app = self.app app = self.app
...@@ -77,6 +78,10 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -77,6 +78,10 @@ class StorageServiceHandler(BaseServiceHandler):
app.tm.lock(ttid, conn.getUUID()) app.tm.lock(ttid, conn.getUUID())
def notifyPartitionCorrupted(self, conn, partition, cell_list): def notifyPartitionCorrupted(self, conn, partition, cell_list):
if not EXPERIMENTAL_CORRUPTED_STATE:
logging.error("Partition %s corrupted in: %s",
partition, ', '.join(map(uuid_str, cell_list)))
return
change_list = [] change_list = []
for cell in self.app.pt.getCellList(partition): for cell in self.app.pt.getCellList(partition):
if cell.getUUID() in cell_list: if cell.getUUID() in cell_list:
...@@ -109,13 +114,13 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -109,13 +114,13 @@ class StorageServiceHandler(BaseServiceHandler):
uuid_str(uuid), offset, dump(tid)) uuid_str(uuid), offset, dump(tid))
self.app.broadcastPartitionChanges(cell_list) self.app.broadcastPartitionChanges(cell_list)
def answerPack(self, conn, status): def notifyPackCompleted(self, conn, pack_id):
app = self.app app = self.app
if app.packing is not None: app.nm.getByUUID(conn.getUUID()).completed_pack_id = pack_id
client, msg_id, uid_set = app.packing app.updateCompletedPackId()
uid_set.remove(conn.getUUID())
if not uid_set: def askPackOrders(self, conn, pack_id):
app.packing = None return self._askPackOrders(conn, pack_id, True)
if not client.isClosed():
client.send(Packets.AnswerPack(True), msg_id)
def answerPackOrders(self, conn, pack_list, process):
process(pack_list)
#
# Copyright (C) 2021 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# IDEA: Keep minimal information to avoid useless memory usage, e.g. with
# arbitrary data large like a list of OIDs. Only {tid: id} is important:
# everything could be queried from storage nodes when needed. Note
# however that extra information allows the master to automatically drop
# redundant pack orders: keeping partial/time may be an acceptable cost.
from collections import defaultdict
from functools import partial
from operator import attrgetter
from weakref import proxy
from neo.lib.protocol import Packets, ZERO_TID
from neo.lib.util import add64
class Pack(object):
def __init__(self, tid, approved, partial, oids, time):
self.tid = tid
self.approved = approved
self.partial = partial
self.oids = oids
self.time = time
self._waiting = []
@property
def waitForPack(self):
return self._waiting.append
def completed(self):
for callback in self._waiting:
callback()
del self._waiting
def connectionLost(self, conn):
try:
self._waiting.remove(conn)
except ValueError:
pass
class RequestOld(object):
caller = None
def __init__(self, app, pack_id, only_first_approved, caller):
self.app = proxy(app)
self.caller = caller
self.pack_id = pack_id
self.only_first_approved = only_first_approved
self.offsets = set(xrange(app.pt.getPartitions()))
self.packs = []
# In case that the PT changes, we may ask a node again before it
# replies to previous requests, so we can't simply use its id as key.
self.querying = set()
app.pm.old.append(self)
self._ask()
def connectionLost(self, conn):
if self.caller != conn:
nid = conn.getUUID()
x = [x for x in self.querying if x[0] == nid]
if x:
self.querying.difference_update(x)
self._ask()
return True
self.__dict__.clear()
def _ask(self):
getCellList = self.app.pt.getCellList
readable = defaultdict(list)
for offset in self.offsets:
for cell in getCellList(offset, True):
readable[cell.getUUID()].append(offset)
offsets = self.offsets.copy()
for x in self.querying:
offsets.difference_update(x[1])
p = Packets.AskPackOrders(self.pack_id)
while offsets:
node = getCellList(offsets.pop(), True)[0].getNode()
nid = node.getUUID()
x = tuple(readable.pop(nid))
offsets.difference_update(x)
x = nid, x
self.querying.add(x)
node.ask(p, process=partial(self._answer, x))
def _answer(self, nid_offsets, pack_list):
caller = self.caller
if caller:
self.querying.remove(nid_offsets)
self.offsets.difference_update(nid_offsets[1])
self.packs += pack_list
if self.offsets:
self._ask()
else:
del self.caller
app = self.app
pm = app.pm
tid = self.pack_id
pm.max_completed = add64(tid, -1)
for pack_order in self.packs:
pm.add(*pack_order)
caller(pm.dump(tid, self.only_first_approved))
app.updateCompletedPackId()
class PackManager(object):
autosign = True
def __init__(self):
self.max_completed = None
self.packs = {}
self.old = []
reset = __init__
def add(self, tid, *args):
p = self.packs.get(tid)
if p is None:
self.packs[tid] = Pack(tid, *args)
if None is not self.max_completed > tid:
self.max_completed = add64(tid, -1)
elif p.approved is None:
p.approved = args[0]
@apply
def dump():
by_tid = attrgetter('tid')
def dump(self, pack_id, only_first_approved):
if only_first_approved:
try:
p = min((p for p in self.packs.itervalues()
if p.approved and p.tid >= pack_id),
key=by_tid),
except ValueError:
p = ()
else:
p = sorted(
(p for p in self.packs.itervalues() if p.tid >= pack_id),
key=by_tid)
return [(p.tid, p.approved, p.partial, p.oids, p.time) for p in p]
return dump
def new(self, tid, oids, time):
autosign = self.autosign and None not in (
p.approved for p in self.packs.itervalues())
self.packs[tid] = Pack(tid, autosign or None, bool(oids), oids, time)
return autosign
def getApprovedRejected(self, min_tid=ZERO_TID):
r = [], []
tid = self.max_completed
if tid and min_tid <= tid:
r[0].append(tid)
for tid, p in self.packs.iteritems():
if min_tid <= tid:
approved = p.approved
if approved is not None:
r[0 if approved else 1].append(tid)
return r
def notifyCompleted(self, pack_id):
for tid in list(self.packs):
if tid <= pack_id:
self.packs.pop(tid).completed()
if self.max_completed is None or self.max_completed < tid:
self.max_completed = tid
def clientLost(self, conn):
for p in self.packs.itervalues():
p.connectionLost(conn)
self.connectionLost(conn)
def connectionLost(self, conn):
self.old = [old for old in self.old if old.connectionLost(conn)]
...@@ -16,7 +16,8 @@ ...@@ -16,7 +16,8 @@
from neo.lib import logging from neo.lib import logging
from neo.lib.connection import ClientConnection from neo.lib.connection import ClientConnection
from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates from neo.lib.exception import ProtocolError
from neo.lib.protocol import Packets, ClusterStates, NodeStates
from .app import monotonic_time from .app import monotonic_time
from .handlers import MasterHandler from .handlers import MasterHandler
......
...@@ -18,8 +18,9 @@ from collections import deque ...@@ -18,8 +18,9 @@ from collections import deque
from time import time from time import time
from struct import pack, unpack from struct import pack, unpack
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import ProtocolError
from neo.lib.handler import DelayEvent, EventQueue from neo.lib.handler import DelayEvent, EventQueue
from neo.lib.protocol import ProtocolError, uuid_str, ZERO_OID, ZERO_TID from neo.lib.protocol import uuid_str, ZERO_OID, ZERO_TID
from neo.lib.util import dump, u64, addTID, tidFromTime from neo.lib.util import dump, u64, addTID, tidFromTime
class Transaction(object): class Transaction(object):
......
...@@ -16,7 +16,8 @@ ...@@ -16,7 +16,8 @@
from collections import defaultdict from collections import defaultdict
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import ClusterStates, Packets, NodeStates from neo.lib.protocol import ClusterStates, Packets, NodeStates, ZERO_TID
from neo.lib.util import add64
from .handlers import BaseServiceHandler from .handlers import BaseServiceHandler
...@@ -70,6 +71,15 @@ class VerificationManager(BaseServiceHandler): ...@@ -70,6 +71,15 @@ class VerificationManager(BaseServiceHandler):
app.setLastTransaction(app.tm.getLastTID()) app.setLastTransaction(app.tm.getLastTID())
# Just to not return meaningless information in AnswerRecovery. # Just to not return meaningless information in AnswerRecovery.
app.truncate_tid = None app.truncate_tid = None
# Set up pack manager.
node_set = app.pt.getNodeSet(readable=True)
try:
pack_id = add64(min(node.completed_pack_id
for node in node_set
if hasattr(node, "completed_pack_id")), 1)
except ValueError:
pack_id = ZERO_TID
self._askStorageNodesAndWait(Packets.AskPackOrders(pack_id), node_set)
def verifyData(self): def verifyData(self):
app = self.app app = self.app
...@@ -126,11 +136,20 @@ class VerificationManager(BaseServiceHandler): ...@@ -126,11 +136,20 @@ class VerificationManager(BaseServiceHandler):
for node in getIdentifiedList(pool_set=uuid_set): for node in getIdentifiedList(pool_set=uuid_set):
node.send(packet) node.send(packet)
def answerLastIDs(self, conn, loid, ltid): def notifyPackCompleted(self, conn, pack_id):
self.app.nm.getByUUID(conn.getUUID()).completed_pack_id = pack_id
def answerLastIDs(self, conn, ltid, loid):
self._uuid_set.remove(conn.getUUID()) self._uuid_set.remove(conn.getUUID())
tm = self.app.tm tm = self.app.tm
tm.setLastOID(loid)
tm.setLastTID(ltid) tm.setLastTID(ltid)
tm.setLastOID(loid)
def answerPackOrders(self, conn, pack_list):
self._uuid_set.remove(conn.getUUID())
add = self.app.pm.add
for pack_order in pack_list:
add(*pack_order)
def answerLockedTransactions(self, conn, tid_dict): def answerLockedTransactions(self, conn, tid_dict):
uuid = conn.getUUID() uuid = conn.getUUID()
......
...@@ -103,7 +103,7 @@ class TerminalNeoCTL(object): ...@@ -103,7 +103,7 @@ class TerminalNeoCTL(object):
r = "backup_tid = 0x%x (%s)" % (u64(backup_tid), r = "backup_tid = 0x%x (%s)" % (u64(backup_tid),
datetimeFromTID(backup_tid)) datetimeFromTID(backup_tid))
else: else:
loid, ltid = self.neoctl.getLastIds() ltid, loid = self.neoctl.getLastIds()
r = "last_oid = 0x%x" % (u64(loid)) r = "last_oid = 0x%x" % (u64(loid))
return r + "\nlast_tid = 0x%x (%s)\nlast_ptid = %s" % \ return r + "\nlast_tid = 0x%x (%s)\nlast_ptid = %s" % \
(u64(ltid), datetimeFromTID(ltid), ptid) (u64(ltid), datetimeFromTID(ltid), ptid)
...@@ -276,11 +276,17 @@ class TerminalNeoCTL(object): ...@@ -276,11 +276,17 @@ class TerminalNeoCTL(object):
def checkReplicas(self, params): def checkReplicas(self, params):
""" """
Test whether partitions have corrupted metadata Test whether partitions have corrupted metadata by comparing replicas
Any corrupted cell is put in CORRUPTED state, possibly make the Any corrupted cell is put in CORRUPTED state, possibly make the
cluster non operational. cluster non operational.
EXPERIMENTAL - This operation is not aware that differences happen
during pack operations and you could easily break
your database. Since there's anyway no mechanism to
repair cells, the primary master only logs possible
corruption rather than mark cells as CORRUPTED.
Parameters: [partition]:[reference] ... [min_tid [max_tid]] Parameters: [partition]:[reference] ... [min_tid [max_tid]]
reference: node id of a storage with known good data reference: node id of a storage with known good data
If not given, and if the cluster is in backup mode, an upstream If not given, and if the cluster is in backup mode, an upstream
......
...@@ -162,13 +162,14 @@ class Log(object): ...@@ -162,13 +162,14 @@ class Log(object):
self._protocol_date = date self._protocol_date = date
g = {} g = {}
exec bz2.decompress(text) in g exec bz2.decompress(text) in g
for x in 'uuid_str', 'Packets', 'PacketMalformedError': for x in 'uuid_str', 'Packets':
setattr(self, x, g[x]) setattr(self, x, g[x])
x = {} x = {}
try: try:
Unpacker = g['Unpacker'] Unpacker = g['Unpacker']
except KeyError: except KeyError:
unpackb = None unpackb = None
self.PacketMalformedError = g['PacketMalformedError']
else: else:
from msgpack import ExtraData, UnpackException from msgpack import ExtraData, UnpackException
def unpackb(data): def unpackb(data):
......
...@@ -48,16 +48,13 @@ UNIT_TEST_MODULES = [ ...@@ -48,16 +48,13 @@ UNIT_TEST_MODULES = [
'neo.tests.testUtil', 'neo.tests.testUtil',
'neo.tests.testPT', 'neo.tests.testPT',
# master application # master application
'neo.tests.master.testClientHandler',
'neo.tests.master.testMasterApp', 'neo.tests.master.testMasterApp',
'neo.tests.master.testMasterPT', 'neo.tests.master.testMasterPT',
'neo.tests.master.testStorageHandler',
'neo.tests.master.testTransactions', 'neo.tests.master.testTransactions',
# storage application # storage application
'neo.tests.storage.testClientHandler', 'neo.tests.storage.testClientHandler',
'neo.tests.storage.testMasterHandler', 'neo.tests.storage.testMasterHandler',
'neo.tests.storage.testStorage' + os.getenv('NEO_TESTS_ADAPTER', 'SQLite'), 'neo.tests.storage.testStorage' + os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
'neo.tests.storage.testTransactions',
# client application # client application
'neo.tests.client.testClientApp', 'neo.tests.client.testClientApp',
'neo.tests.client.testMasterHandler', 'neo.tests.client.testMasterHandler',
...@@ -66,6 +63,7 @@ UNIT_TEST_MODULES = [ ...@@ -66,6 +63,7 @@ UNIT_TEST_MODULES = [
'neo.tests.threaded.test', 'neo.tests.threaded.test',
'neo.tests.threaded.testConfig', 'neo.tests.threaded.testConfig',
'neo.tests.threaded.testImporter', 'neo.tests.threaded.testImporter',
'neo.tests.threaded.testPack',
'neo.tests.threaded.testReplication', 'neo.tests.threaded.testReplication',
'neo.tests.threaded.testSSL', 'neo.tests.threaded.testSSL',
] ]
......
...@@ -19,11 +19,12 @@ from collections import deque ...@@ -19,11 +19,12 @@ from collections import deque
from neo.lib import logging from neo.lib import logging
from neo.lib.app import BaseApplication, buildOptionParser from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, Packets from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, Packets, \
ZERO_TID
from neo.lib.connection import ListeningConnection from neo.lib.connection import ListeningConnection
from neo.lib.exception import StoppedOperation, PrimaryFailure from neo.lib.exception import StoppedOperation, PrimaryFailure
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
from neo.lib.util import dump from neo.lib.util import add64, dump
from neo.lib.bootstrap import BootstrapManager from neo.lib.bootstrap import BootstrapManager
from .checker import Checker from .checker import Checker
from .database import buildDatabaseManager, DATABASE_MANAGERS from .database import buildDatabaseManager, DATABASE_MANAGERS
...@@ -59,6 +60,8 @@ class Application(BaseApplication): ...@@ -59,6 +60,8 @@ class Application(BaseApplication):
_.float('w', 'wait', _.float('w', 'wait',
help="seconds to wait for backend to be available," help="seconds to wait for backend to be available,"
" before erroring-out (-1 = infinite)") " before erroring-out (-1 = infinite)")
_.bool('disable-pack',
help="do not process any pack order")
_.bool('disable-drop-partitions', _.bool('disable-drop-partitions',
help="do not delete data of discarded cells, which is useful for" help="do not delete data of discarded cells, which is useful for"
" big databases because the current implementation is" " big databases because the current implementation is"
...@@ -98,6 +101,7 @@ class Application(BaseApplication): ...@@ -98,6 +101,7 @@ class Application(BaseApplication):
) )
self.disable_drop_partitions = config.get('disable_drop_partitions', self.disable_drop_partitions = config.get('disable_drop_partitions',
False) False)
self.disable_pack = config.get('disable_pack', False)
self.nm.createMasters(config['masters']) self.nm.createMasters(config['masters'])
# set the bind address # set the bind address
...@@ -132,6 +136,7 @@ class Application(BaseApplication): ...@@ -132,6 +136,7 @@ class Application(BaseApplication):
logging.node(self.name, self.uuid) logging.node(self.name, self.uuid)
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
self.dm.lock.release()
def close(self): def close(self):
self.listening_conn = None self.listening_conn = None
...@@ -190,6 +195,7 @@ class Application(BaseApplication): ...@@ -190,6 +195,7 @@ class Application(BaseApplication):
def run(self): def run(self):
try: try:
with self.dm.lock:
self._run() self._run()
except Exception: except Exception:
logging.exception('Pre-mortem data:') logging.exception('Pre-mortem data:')
...@@ -216,6 +222,7 @@ class Application(BaseApplication): ...@@ -216,6 +222,7 @@ class Application(BaseApplication):
if self.master_node is None: if self.master_node is None:
# look for the primary master # look for the primary master
self.connectToPrimary() self.connectToPrimary()
self.completed_pack_id = self.last_pack_id = ZERO_TID
self.checker = Checker(self) self.checker = Checker(self)
self.replicator = Replicator(self) self.replicator = Replicator(self)
self.tm = TransactionManager(self) self.tm = TransactionManager(self)
...@@ -281,16 +288,23 @@ class Application(BaseApplication): ...@@ -281,16 +288,23 @@ class Application(BaseApplication):
self.task_queue = task_queue = deque() self.task_queue = task_queue = deque()
try: try:
self.dm.doOperation(self) with self.dm.operational(self):
with self.dm.lock:
self.maybePack()
while True:
if task_queue and isIdle():
with self.dm.lock:
while True: while True:
while task_queue:
try: try:
while isIdle():
next(task_queue[-1]) or task_queue.rotate() next(task_queue[-1]) or task_queue.rotate()
_poll(0)
break
except StopIteration: except StopIteration:
task_queue.pop() task_queue.pop()
if not task_queue:
break
else:
_poll(0)
if not isIdle():
break
poll() poll()
finally: finally:
del self.task_queue del self.task_queue
...@@ -320,3 +334,50 @@ class Application(BaseApplication): ...@@ -320,3 +334,50 @@ class Application(BaseApplication):
self.dm.erase() self.dm.erase()
logging.info("Application has been asked to shut down") logging.info("Application has been asked to shut down")
sys.exit() sys.exit()
def notifyPackCompleted(self):
if self.disable_pack:
pack_id = self.last_pack_id
else:
packed = self.dm.getPackedIDs()
if not packed:
return
pack_id = min(packed.itervalues())
if self.completed_pack_id != pack_id:
self.completed_pack_id = pack_id
self.master_conn.send(Packets.NotifyPackCompleted(pack_id))
def maybePack(self, info=None, min_id=None):
ready = self.dm.isReadyToStartPack()
if ready:
packed_dict = self.dm.getPackedIDs(True)
if packed_dict:
packed = min(packed_dict.itervalues())
if packed < self.last_pack_id:
if packed == ready[1]:
# Last completed pack for this storage node hasn't
# changed since the last call to dm.pack() so simply
# resume. No info needed.
pack_id = ready[0]
assert not info, (ready, info, min_id)
elif packed == min_id:
# New pack order to process and we've just received
# all needed information to start right now.
pack_id = info[0]
else:
# Time to process the next approved pack after 'packed'.
# We don't even know its id. Ask the master more
# information.
self.master_conn.ask(
Packets.AskPackOrders(add64(packed, 1)),
pack_id=packed)
return
self.dm.pack(self, info, packed,
self.replicator.filterPackable(pack_id,
(k for k, v in packed_dict.iteritems()
if v == packed)))
else:
# All approved pack orders are processed.
self.dm.pack(self, None, None, ()) # for cleanup
else:
assert not self.pt.getReadableOffsetList(self.uuid)
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
LOG_QUERIES = False LOG_QUERIES = False
def useMySQLdb(): def useMySQLdb():
...@@ -65,5 +67,25 @@ DATABASE_MANAGERS = tuple(sorted( ...@@ -65,5 +67,25 @@ DATABASE_MANAGERS = tuple(sorted(
def buildDatabaseManager(name, args=(), kw={}): def buildDatabaseManager(name, args=(), kw={}):
return getAdapterKlass(name)(*args, **kw) return getAdapterKlass(name)(*args, **kw)
class DatabaseFailure(Exception): class DatabaseFailure(Exception):
transient_failure = False
if __debug__:
def getFailingDatabaseManager(self):
pass pass
def logTransientFailure(self):
raise NotImplementedError
def checkTransientFailure(self, dm):
if dm.LOCK or not self.transient_failure:
raise
assert dm is self.getFailingDatabaseManager()
dm.close()
self.logTransientFailure()
# Avoid reconnecting too often.
# Since this is used when wrapping an arbitrary long process and
# not just a single query, we can't limit the number of retries.
time.sleep(5)
...@@ -18,6 +18,7 @@ import os ...@@ -18,6 +18,7 @@ import os
import pickle, sys, time import pickle, sys, time
from bisect import bisect, insort from bisect import bisect, insort
from collections import deque from collections import deque
from contextlib import contextmanager
from cStringIO import StringIO from cStringIO import StringIO
from ConfigParser import SafeConfigParser from ConfigParser import SafeConfigParser
from ZConfig import loadConfigFile from ZConfig import loadConfigFile
...@@ -31,8 +32,9 @@ from ..app import option_defaults ...@@ -31,8 +32,9 @@ from ..app import option_defaults
from . import buildDatabaseManager, DatabaseFailure from . import buildDatabaseManager, DatabaseFailure
from .manager import DatabaseManager, Fallback from .manager import DatabaseManager, Fallback
from neo.lib import compress, logging, patch, util from neo.lib import compress, logging, patch, util
from neo.lib.exception import BackendNotImplemented
from neo.lib.interfaces import implements from neo.lib.interfaces import implements
from neo.lib.protocol import BackendNotImplemented, MAX_TID from neo.lib.protocol import MAX_TID
patch.speedupFileStorageTxnLookup() patch.speedupFileStorageTxnLookup()
...@@ -369,13 +371,14 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -369,13 +371,14 @@ class ImporterDatabaseManager(DatabaseManager):
"""Proxy that transparently imports data from a ZODB storage """Proxy that transparently imports data from a ZODB storage
""" """
_writeback = None _writeback = None
_last_commit = 0
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
super(ImporterDatabaseManager, self).__init__(*args, **kw) super(ImporterDatabaseManager, self).__init__(
implements(self, """_getNextTID checkSerialRange checkTIDRange background_worker_class=lambda: None,
deleteObject deleteTransaction dropPartitions _getLastTID *args, **kw)
getReplicationObjectList _getTIDList nonempty""".split()) implements(self, """_getNextTID checkSerialRange checkTIDRange _pack
deleteObject deleteTransaction _dropPartition _getLastTID nonempty
getReplicationObjectList _getTIDList _setPartitionPacked""".split())
_getPartition = property(lambda self: self.db._getPartition) _getPartition = property(lambda self: self.db._getPartition)
_getReadablePartition = property(lambda self: self.db._getReadablePartition) _getReadablePartition = property(lambda self: self.db._getReadablePartition)
...@@ -408,7 +411,9 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -408,7 +411,9 @@ class ImporterDatabaseManager(DatabaseManager):
updateCellTID getUnfinishedTIDDict dropUnfinishedData updateCellTID getUnfinishedTIDDict dropUnfinishedData
abortTransaction storeTransaction lockTransaction abortTransaction storeTransaction lockTransaction
loadData storeData getOrphanList _pruneData deferCommit loadData storeData getOrphanList _pruneData deferCommit
_getDevPath dropPartitionsTemporary _getDevPath dropPartitionsTemporary lock
getPackedIDs _getPartitionPacked
_getPackOrders storePackOrder signPackOrders
""".split(): """.split():
setattr(self, x, getattr(db, x)) setattr(self, x, getattr(db, x))
if self._writeback: if self._writeback:
...@@ -416,7 +421,6 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -416,7 +421,6 @@ class ImporterDatabaseManager(DatabaseManager):
db_commit = db.commit db_commit = db.commit
def commit(): def commit():
db_commit() db_commit()
self._last_commit = time.time()
if self._writeback: if self._writeback:
self._writeback.committed() self._writeback.committed()
self.commit = db.commit = commit self.commit = db.commit = commit
...@@ -476,9 +480,11 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -476,9 +480,11 @@ class ImporterDatabaseManager(DatabaseManager):
else: else:
self._import = self._import() self._import = self._import()
def doOperation(self, app): @contextmanager
def operational(self, app):
if self._import: if self._import:
app.newTask(self._import) app.newTask(self._import)
yield
def _import(self): def _import(self):
p64 = util.p64 p64 = util.p64
...@@ -505,9 +511,9 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -505,9 +511,9 @@ class ImporterDatabaseManager(DatabaseManager):
break break
if len(txn) == 3: if len(txn) == 3:
oid, data_id, data_tid = txn oid, data_id, data_tid = txn
if data_id is not None: checksum, data, compression = data_id or (None, None, 0)
checksum, data, compression = data_id data_id = self.holdData(
data_id = self.holdData(checksum, oid, data, compression) checksum, oid, data, compression, data_tid)
data_id_list.append(data_id) data_id_list.append(data_id)
object_list.append((oid, data_id, data_tid)) object_list.append((oid, data_id, data_tid))
# Give the main loop the opportunity to process requests # Give the main loop the opportunity to process requests
...@@ -518,7 +524,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -518,7 +524,7 @@ class ImporterDatabaseManager(DatabaseManager):
# solved when resuming the migration. # solved when resuming the migration.
# XXX: The leak was solved by the deduplication, # XXX: The leak was solved by the deduplication,
# but it was disabled by default. # but it was disabled by default.
else: else: # len(txn) == 5
tid = txn[-1] tid = txn[-1]
self.storeTransaction(tid, object_list, self.storeTransaction(tid, object_list,
((x[0] for x in object_list),) + txn, ((x[0] for x in object_list),) + txn,
...@@ -541,7 +547,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -541,7 +547,7 @@ class ImporterDatabaseManager(DatabaseManager):
" your configuration to use the native backend and restart.") " your configuration to use the native backend and restart.")
self._import = None self._import = None
for x in """getObject getReplicationTIDList getReplicationObjectList for x in """getObject getReplicationTIDList getReplicationObjectList
_fetchObject _getDataTID getLastObjectTID _fetchObject _getObjectHistoryForUndo getLastObjectTID
""".split(): """.split():
setattr(self, x, getattr(self.db, x)) setattr(self, x, getattr(self.db, x))
for zodb in self.zodb: for zodb in self.zodb:
...@@ -728,13 +734,15 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -728,13 +734,15 @@ class ImporterDatabaseManager(DatabaseManager):
raise AssertionError raise AssertionError
getLastObjectTID = Fallback.getLastObjectTID.__func__ getLastObjectTID = Fallback.getLastObjectTID.__func__
_getDataTID = Fallback._getDataTID.__func__
def getObjectHistory(self, *args, **kw): def _getObjectHistoryForUndo(self, *args, **kw):
raise BackendNotImplemented(self.getObjectHistory) raise BackendNotImplemented(self._getObjectHistoryForUndo)
def getObjectHistoryWithLength(self, *args, **kw):
raise BackendNotImplemented(self.getObjectHistoryWithLength)
def pack(self, *args, **kw): def isReadyToStartPack(self):
raise BackendNotImplemented(self.pack) pass # disable pack
class WriteBack(object): class WriteBack(object):
...@@ -843,7 +851,7 @@ class WriteBack(object): ...@@ -843,7 +851,7 @@ class WriteBack(object):
class TransactionRecord(BaseStorage.TransactionRecord): class TransactionRecord(BaseStorage.TransactionRecord):
def __init__(self, db, tid): def __init__(self, db, tid):
self._oid_list, user, desc, ext, _, _ = db.getTransaction(tid) self._oid_list, user, desc, ext, _, _, _ = db.getTransaction(tid)
super(TransactionRecord, self).__init__(tid, ' ', user, desc, super(TransactionRecord, self).__init__(tid, ' ', user, desc,
loads(ext) if ext else {}) loads(ext) if ext else {})
self._db = db self._db = db
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -14,12 +14,19 @@ ...@@ -14,12 +14,19 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import weakref from functools import partial
from neo.lib import logging from neo.lib import logging
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.exception import PrimaryFailure, StoppedOperation from neo.lib.exception import PrimaryFailure, ProtocolError, StoppedOperation
from neo.lib.protocol import (uuid_str, from neo.lib.protocol import uuid_str, NodeStates, NodeTypes, Packets
NodeStates, NodeTypes, Packets, ProtocolError)
class EventHandler(EventHandler):
def packetReceived(self, *args):
with self.app.dm.lock:
self.dispatch(*args)
class BaseHandler(EventHandler): class BaseHandler(EventHandler):
...@@ -31,6 +38,7 @@ class BaseHandler(EventHandler): ...@@ -31,6 +38,7 @@ class BaseHandler(EventHandler):
def abortTransaction(self, conn, ttid, _): def abortTransaction(self, conn, ttid, _):
self.notifyTransactionFinished(conn, ttid, None) self.notifyTransactionFinished(conn, ttid, None)
class BaseMasterHandler(BaseHandler): class BaseMasterHandler(BaseHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
...@@ -65,21 +73,53 @@ class BaseMasterHandler(BaseHandler): ...@@ -65,21 +73,53 @@ class BaseMasterHandler(BaseHandler):
# See comment in ClientOperationHandler.connectionClosed # See comment in ClientOperationHandler.connectionClosed
self.app.tm.abortFor(uuid, even_if_voted=True) self.app.tm.abortFor(uuid, even_if_voted=True)
def notifyPackSigned(self, conn, approved, rejected):
app = self.app
if not app.disable_pack:
app.replicator.keepPendingSignedPackOrders(
*app.dm.signPackOrders(approved, rejected))
if approved:
pack_id = max(approved)
if app.last_pack_id < pack_id:
app.last_pack_id = pack_id
if app.operational:
if app.disable_pack:
app.notifyPackCompleted()
else:
app.maybePack()
def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list): def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
the information is only about changes from the previous.""" the information is only about changes from the previous."""
app = self.app app = self.app
if ptid != 1 + app.pt.getID(): if ptid != 1 + app.pt.getID():
raise ProtocolError('wrong partition table id') raise ProtocolError('wrong partition table id')
if app.operational:
getOutdatedOffsetList = partial(
app.pt.getOutdatedOffsetListFor, app.uuid)
were_outdated = set(getOutdatedOffsetList())
app.pt.update(ptid, num_replicas, cell_list, app.nm) app.pt.update(ptid, num_replicas, cell_list, app.nm)
app.dm.changePartitionTable(ptid, num_replicas, cell_list) app.dm.changePartitionTable(app, ptid, num_replicas, cell_list)
if app.operational: if app.operational:
app.replicator.notifyPartitionChanges(cell_list) app.replicator.notifyPartitionChanges(cell_list)
# The U -> !U case is already handled by dm.changePartitionTable.
# XXX: What about CORRUPTED cells?
were_outdated.difference_update(getOutdatedOffsetList())
if were_outdated: # O -> !O
# After a cell is discarded,
# the smallest pt.pack may be greater.
app.notifyPackCompleted()
# And we may start processing the next pack order.
app.maybePack()
app.dm.commit() app.dm.commit()
def askFinalTID(self, conn, ttid): def askFinalTID(self, conn, ttid):
conn.answer(Packets.AnswerFinalTID(self.app.dm.getFinalTID(ttid))) conn.answer(Packets.AnswerFinalTID(self.app.dm.getFinalTID(ttid)))
def askPackOrders(self, conn, min_completed_id):
conn.answer(Packets.AnswerPackOrders(
self.app.dm.getPackOrders(min_completed_id)))
def notifyRepair(self, conn, *args): def notifyRepair(self, conn, *args):
app = self.app app = self.app
app.dm.repair(weakref.ref(app), *args) app.dm.repair(app, *args)
...@@ -15,9 +15,10 @@ ...@@ -15,9 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import NonReadableCell, ProtocolError, UndoPackError
from neo.lib.handler import DelayEvent from neo.lib.handler import DelayEvent
from neo.lib.util import dump, makeChecksum, add64 from neo.lib.util import dump, makeChecksum, add64
from neo.lib.protocol import Packets, Errors, NonReadableCell, ProtocolError, \ from neo.lib.protocol import Packets, Errors, \
ZERO_HASH, ZERO_TID, INVALID_PARTITION ZERO_HASH, ZERO_TID, INVALID_PARTITION
from ..transactions import ConflictError, NotRegisteredError from ..transactions import ConflictError, NotRegisteredError
from . import BaseHandler from . import BaseHandler
...@@ -45,6 +46,7 @@ class ClientOperationHandler(BaseHandler): ...@@ -45,6 +46,7 @@ class ClientOperationHandler(BaseHandler):
# not releasing write-locks now would lead to a deadlock. # not releasing write-locks now would lead to a deadlock.
# - A client node may be disconnected from the master, whereas # - A client node may be disconnected from the master, whereas
# there are still voted (and not locked) transactions to abort. # there are still voted (and not locked) transactions to abort.
with app.dm.lock:
app.tm.abortFor(conn.getUUID()) app.tm.abortFor(conn.getUUID())
def askTransactionInformation(self, conn, tid): def askTransactionInformation(self, conn, tid):
...@@ -53,7 +55,7 @@ class ClientOperationHandler(BaseHandler): ...@@ -53,7 +55,7 @@ class ClientOperationHandler(BaseHandler):
p = Errors.TidNotFound('%s does not exist' % dump(tid)) p = Errors.TidNotFound('%s does not exist' % dump(tid))
else: else:
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3],
bool(t[4]), t[0]) t[4], t[0])
conn.answer(p) conn.answer(p)
def getEventQueue(self): def getEventQueue(self):
...@@ -105,6 +107,10 @@ class ClientOperationHandler(BaseHandler): ...@@ -105,6 +107,10 @@ class ClientOperationHandler(BaseHandler):
dump(oid), dump(serial), dump(ttid), dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid))) dump(self.app.tm.getLockingTID(oid)))
locked = ZERO_TID locked = ZERO_TID
except UndoPackError:
conn.answer(Errors.UndoPackError(
'Could not undo for oid %s' % dump(oid)))
return
else: else:
if request_time and SLOW_STORE is not None: if request_time and SLOW_STORE is not None:
duration = time.time() - request_time duration = time.time() - request_time
...@@ -121,7 +127,6 @@ class ClientOperationHandler(BaseHandler): ...@@ -121,7 +127,6 @@ class ClientOperationHandler(BaseHandler):
if data or checksum != ZERO_HASH: if data or checksum != ZERO_HASH:
# TODO: return an appropriate error packet # TODO: return an appropriate error packet
assert makeChecksum(data) == checksum assert makeChecksum(data) == checksum
assert data_serial is None
else: else:
checksum = data = None checksum = data = None
try: try:
...@@ -199,7 +204,8 @@ class ClientOperationHandler(BaseHandler): ...@@ -199,7 +204,8 @@ class ClientOperationHandler(BaseHandler):
app = self.app app = self.app
if app.tm.loadLocked(oid): if app.tm.loadLocked(oid):
raise DelayEvent raise DelayEvent
history_list = app.dm.getObjectHistory(oid, first, last - first) history_list = app.dm.getObjectHistoryWithLength(
oid, first, last - first)
if history_list is None: if history_list is None:
p = Errors.OidNotFound(dump(oid)) p = Errors.OidNotFound(dump(oid))
else: else:
...@@ -300,5 +306,5 @@ class ClientReadOnlyOperationHandler(ClientOperationHandler): ...@@ -300,5 +306,5 @@ class ClientReadOnlyOperationHandler(ClientOperationHandler):
# (askObjectUndoSerial is used in undo() but itself is read-only query) # (askObjectUndoSerial is used in undo() but itself is read-only query)
# FIXME askObjectHistory to limit tid <= backup_tid # FIXME askObjectHistory to limit tid <= backup_tid
# TODO dm.getObjectHistory has to be first fixed for this # TODO dm.getObjectHistoryWithLength has to be first fixed for this
#def askObjectHistory(self, conn, oid, first, last): #def askObjectHistory(self, conn, oid, first, last):
...@@ -15,9 +15,9 @@ ...@@ -15,9 +15,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.handler import EventHandler from neo.lib.exception import NotReadyError, ProtocolError
from neo.lib.protocol import NodeTypes, NotReadyError, Packets from neo.lib.protocol import NodeTypes, Packets
from neo.lib.protocol import ProtocolError from . import EventHandler
from .storage import StorageOperationHandler from .storage import StorageOperationHandler
from .client import ClientOperationHandler, ClientReadOnlyOperationHandler from .client import ClientOperationHandler, ClientReadOnlyOperationHandler
......
...@@ -15,8 +15,8 @@ ...@@ -15,8 +15,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import BaseMasterHandler from . import BaseMasterHandler
from neo.lib import logging from neo.lib.exception import ProtocolError
from neo.lib.protocol import Packets, ProtocolError, ZERO_TID from neo.lib.protocol import Packets
class InitializationHandler(BaseMasterHandler): class InitializationHandler(BaseMasterHandler):
...@@ -26,25 +26,11 @@ class InitializationHandler(BaseMasterHandler): ...@@ -26,25 +26,11 @@ class InitializationHandler(BaseMasterHandler):
pt.load(ptid, num_replicas, row_list, app.nm) pt.load(ptid, num_replicas, row_list, app.nm)
if not pt.filled(): if not pt.filled():
raise ProtocolError('Partial partition table received') raise ProtocolError('Partial partition table received')
# Install the partition table into the database for persistence. cell_list = [(offset, cell.getUUID(), cell.getState())
cell_list = [] for offset in xrange(pt.getPartitions())
unassigned = range(pt.getPartitions()) for cell in pt.getCellList(offset)]
for offset in reversed(unassigned):
for cell in pt.getCellList(offset):
cell_list.append((offset, cell.getUUID(), cell.getState()))
if cell.getUUID() == app.uuid:
unassigned.remove(offset)
# delete objects database
dm = app.dm dm = app.dm
if unassigned: dm.changePartitionTable(app, ptid, num_replicas, cell_list, reset=True)
if app.disable_drop_partitions:
logging.info('partitions %r are discarded but actual deletion'
' of data is disabled', unassigned)
else:
logging.debug('drop data for partitions %r', unassigned)
dm.dropPartitions(unassigned)
dm.changePartitionTable(ptid, num_replicas, cell_list, reset=True)
dm.commit() dm.commit()
def truncate(self, conn, tid): def truncate(self, conn, tid):
...@@ -61,10 +47,15 @@ class InitializationHandler(BaseMasterHandler): ...@@ -61,10 +47,15 @@ class InitializationHandler(BaseMasterHandler):
app.dm.getTruncateTID())) app.dm.getTruncateTID()))
def askLastIDs(self, conn): def askLastIDs(self, conn):
dm = self.app.dm app = self.app
dm = app.dm
dm.truncate() dm.truncate()
ltid, loid = dm.getLastIDs() if not app.disable_pack:
conn.answer(Packets.AnswerLastIDs(loid, ltid)) packed = dm.getPackedIDs()
if packed:
self.app.completed_pack_id = pack_id = min(packed.itervalues())
conn.send(Packets.NotifyPackCompleted(pack_id))
conn.answer(Packets.AnswerLastIDs(*dm.getLastIDs()))
def askPartitionTable(self, conn): def askPartitionTable(self, conn):
pt = self.app.pt pt = self.app.pt
...@@ -77,8 +68,8 @@ class InitializationHandler(BaseMasterHandler): ...@@ -77,8 +68,8 @@ class InitializationHandler(BaseMasterHandler):
def validateTransaction(self, conn, ttid, tid): def validateTransaction(self, conn, ttid, tid):
dm = self.app.dm dm = self.app.dm
dm.lockTransaction(tid, ttid) dm.lockTransaction(tid, ttid, True)
dm.unlockTransaction(tid, ttid, True, True) dm.unlockTransaction(tid, ttid, True, True, True)
dm.commit() dm.commit()
def startOperation(self, conn, backup): def startOperation(self, conn, backup):
......
...@@ -28,19 +28,16 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -28,19 +28,16 @@ class MasterOperationHandler(BaseMasterHandler):
assert self.app.operational and backup assert self.app.operational and backup
self.app.replicator.startOperation(backup) self.app.replicator.startOperation(backup)
def askLockInformation(self, conn, ttid, tid): def askLockInformation(self, conn, ttid, tid, pack):
self.app.tm.lock(ttid, tid) self.app.tm.lock(ttid, tid, pack)
conn.answer(Packets.AnswerInformationLocked(ttid)) conn.answer(Packets.AnswerInformationLocked(ttid))
def notifyUnlockInformation(self, conn, ttid): def notifyUnlockInformation(self, conn, ttid):
self.app.tm.unlock(ttid) self.app.tm.unlock(ttid)
def askPack(self, conn, tid): def answerPackOrders(self, conn, pack_list, pack_id):
app = self.app if pack_list:
logging.info('Pack started, up to %s...', dump(tid)) self.app.maybePack(pack_list[0], pack_id)
app.dm.pack(tid, app.tm.updateObjectDataForPack)
logging.info('Pack finished.')
conn.answer(Packets.AnswerPack(True))
def answerUnfinishedTransactions(self, conn, *args, **kw): def answerUnfinishedTransactions(self, conn, *args, **kw):
self.app.replicator.setUnfinishedTIDList(*args, **kw) self.app.replicator.setUnfinishedTIDList(*args, **kw)
......
...@@ -17,8 +17,10 @@ ...@@ -17,8 +17,10 @@
import weakref import weakref
from functools import wraps from functools import wraps
from neo.lib.connection import ConnectionClosed from neo.lib.connection import ConnectionClosed
from neo.lib.handler import DelayEvent, EventHandler from neo.lib.exception import ProtocolError
from neo.lib.protocol import Errors, Packets, ProtocolError, ZERO_HASH from neo.lib.handler import DelayEvent
from neo.lib.protocol import Errors, Packets, ZERO_HASH
from . import EventHandler
def checkConnectionIsReplicatorConnection(func): def checkConnectionIsReplicatorConnection(func):
def wrapper(self, conn, *args, **kw): def wrapper(self, conn, *args, **kw):
...@@ -46,6 +48,7 @@ class StorageOperationHandler(EventHandler): ...@@ -46,6 +48,7 @@ class StorageOperationHandler(EventHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
app = self.app app = self.app
if app.operational and conn.isClient(): if app.operational and conn.isClient():
with app.dm.lock:
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid: if uuid:
node = app.nm.getByUUID(uuid) node = app.nm.getByUUID(uuid)
...@@ -68,33 +71,36 @@ class StorageOperationHandler(EventHandler): ...@@ -68,33 +71,36 @@ class StorageOperationHandler(EventHandler):
self.app.checker.connected(node) self.app.checker.connected(node)
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerFetchTransactions(self, conn, pack_tid, next_tid, tid_list): def answerFetchTransactions(self, conn, next_tid, tid_list, completed_pack):
app = self.app
if tid_list: if tid_list:
deleteTransaction = self.app.dm.deleteTransaction deleteTransaction = app.dm.deleteTransaction
for tid in tid_list: for tid in tid_list:
deleteTransaction(tid) deleteTransaction(tid)
assert not pack_tid, "TODO" if completed_pack is not None:
app.dm.updateCompletedPackByReplication(
app.replicator.current_partition, completed_pack)
if next_tid: if next_tid:
self.app.replicator.fetchTransactions(next_tid) app.replicator.fetchTransactions(next_tid)
else: else:
self.app.replicator.fetchObjects() app.replicator.fetchObjects()
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def addTransaction(self, conn, tid, user, desc, ext, packed, ttid, def addTransaction(self, conn, tid, user, desc, ext, packed, ttid,
oid_list): oid_list, pack):
# Directly store the transaction. # Directly store the transaction.
self.app.dm.storeTransaction(tid, (), self.app.dm.storeTransaction(tid, (),
(oid_list, user, desc, ext, packed, ttid), False) (oid_list, user, desc, ext, packed, ttid), False)
if pack:
self.app.dm.storePackOrder(tid, *pack)
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerFetchObjects(self, conn, pack_tid, next_tid, def answerFetchObjects(self, conn, next_tid, next_oid, object_dict):
next_oid, object_dict):
if object_dict: if object_dict:
deleteObject = self.app.dm.deleteObject deleteObject = self.app.dm.deleteObject
for serial, oid_list in object_dict.iteritems(): for serial, oid_list in object_dict.iteritems():
for oid in oid_list: for oid in oid_list:
deleteObject(oid, serial) deleteObject(oid, serial)
assert not pack_tid, "TODO"
if next_tid: if next_tid:
# TODO also provide feedback to master about current replication state (tid) # TODO also provide feedback to master about current replication state (tid)
self.app.replicator.fetchObjects(next_tid, next_oid) self.app.replicator.fetchObjects(next_tid, next_oid)
...@@ -106,13 +112,10 @@ class StorageOperationHandler(EventHandler): ...@@ -106,13 +112,10 @@ class StorageOperationHandler(EventHandler):
def addObject(self, conn, oid, serial, compression, def addObject(self, conn, oid, serial, compression,
checksum, data, data_serial): checksum, data, data_serial):
dm = self.app.dm dm = self.app.dm
if data or checksum != ZERO_HASH: if not data and checksum == ZERO_HASH:
data_id = dm.storeData(checksum, oid, data, compression) checksum = data = None
else: data_id = dm.storeData(checksum, oid, data, compression, data_serial)
data_id = None dm.storeTransaction(serial, ((oid, data_id, data_serial),), None, False)
# Directly store the transaction.
obj = oid, data_id, data_serial
dm.storeTransaction(serial, (obj,), None, False)
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def replicationError(self, conn, message): def replicationError(self, conn, message):
...@@ -178,7 +181,7 @@ class StorageOperationHandler(EventHandler): ...@@ -178,7 +181,7 @@ class StorageOperationHandler(EventHandler):
@checkFeedingConnection(check=False) @checkFeedingConnection(check=False)
def askFetchTransactions(self, conn, partition, length, min_tid, max_tid, def askFetchTransactions(self, conn, partition, length, min_tid, max_tid,
tid_list): tid_list, ask_pack_info):
app = self.app app = self.app
if app.tm.isLockedTid(max_tid): if app.tm.isLockedTid(max_tid):
# Wow, backup cluster is fast. Requested transactions are still in # Wow, backup cluster is fast. Requested transactions are still in
...@@ -192,12 +195,12 @@ class StorageOperationHandler(EventHandler): ...@@ -192,12 +195,12 @@ class StorageOperationHandler(EventHandler):
conn = weakref.proxy(conn) conn = weakref.proxy(conn)
peer_tid_set = set(tid_list) peer_tid_set = set(tid_list)
dm = app.dm dm = app.dm
completed_pack = dm.getPackedIDs()[partition] if ask_pack_info else None
tid_list = dm.getReplicationTIDList(min_tid, max_tid, length + 1, tid_list = dm.getReplicationTIDList(min_tid, max_tid, length + 1,
partition) partition)
next_tid = tid_list.pop() if length < len(tid_list) else None next_tid = tid_list.pop() if length < len(tid_list) else None
def push(): def push():
try: try:
pack_tid = None # TODO
for tid in tid_list: for tid in tid_list:
if tid in peer_tid_set: if tid in peer_tid_set:
peer_tid_set.remove(tid) peer_tid_set.remove(tid)
...@@ -208,11 +211,11 @@ class StorageOperationHandler(EventHandler): ...@@ -208,11 +211,11 @@ class StorageOperationHandler(EventHandler):
"partition %u dropped" "partition %u dropped"
% partition), msg_id) % partition), msg_id)
return return
oid_list, user, desc, ext, packed, ttid = t oid_list, user, desc, ext, packed, ttid, pack = t
# Sending such packet does not mark the connection # Sending such packet does not mark the connection
# for writing if there's too little data in the buffer. # for writing if there's too little data in the buffer.
conn.send(Packets.AddTransaction(tid, user, conn.send(Packets.AddTransaction(tid, user,
desc, ext, bool(packed), ttid, oid_list), msg_id) desc, ext, packed, ttid, oid_list, pack), msg_id)
# To avoid delaying several connections simultaneously, # To avoid delaying several connections simultaneously,
# and also prevent the backend from scanning different # and also prevent the backend from scanning different
# parts of the DB at the same time, we ask the # parts of the DB at the same time, we ask the
...@@ -221,7 +224,7 @@ class StorageOperationHandler(EventHandler): ...@@ -221,7 +224,7 @@ class StorageOperationHandler(EventHandler):
# is flushing another one for a concurrent connection. # is flushing another one for a concurrent connection.
yield conn.buffering yield conn.buffering
conn.send(Packets.AnswerFetchTransactions( conn.send(Packets.AnswerFetchTransactions(
pack_tid, next_tid, peer_tid_set), msg_id) next_tid, peer_tid_set, completed_pack), msg_id)
yield yield
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
...@@ -244,7 +247,6 @@ class StorageOperationHandler(EventHandler): ...@@ -244,7 +247,6 @@ class StorageOperationHandler(EventHandler):
next_tid = next_oid = None next_tid = next_oid = None
def push(): def push():
try: try:
pack_tid = None # TODO
for serial, oid in object_list: for serial, oid in object_list:
oid_set = object_dict.get(serial) oid_set = object_dict.get(serial)
if oid_set: if oid_set:
...@@ -267,7 +269,7 @@ class StorageOperationHandler(EventHandler): ...@@ -267,7 +269,7 @@ class StorageOperationHandler(EventHandler):
conn.send(Packets.AddObject(oid, *object), msg_id) conn.send(Packets.AddObject(oid, *object), msg_id)
yield conn.buffering yield conn.buffering
conn.send(Packets.AnswerFetchObjects( conn.send(Packets.AnswerFetchObjects(
pack_tid, next_tid, next_oid, object_dict), msg_id) next_tid, next_oid, object_dict), msg_id)
yield yield
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
......
...@@ -93,7 +93,7 @@ from neo.lib import logging ...@@ -93,7 +93,7 @@ from neo.lib import logging
from neo.lib.protocol import CellStates, NodeTypes, NodeStates, \ from neo.lib.protocol import CellStates, NodeTypes, NodeStates, \
Packets, INVALID_TID, ZERO_TID, ZERO_OID Packets, INVALID_TID, ZERO_TID, ZERO_OID
from neo.lib.connection import ClientConnection, ConnectionClosed from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.util import add64, dump, p64 from neo.lib.util import add64, dump, p64, u64
from .handlers.storage import StorageOperationHandler from .handlers.storage import StorageOperationHandler
FETCH_COUNT = 1000 FETCH_COUNT = 1000
...@@ -101,7 +101,10 @@ FETCH_COUNT = 1000 ...@@ -101,7 +101,10 @@ FETCH_COUNT = 1000
class Partition(object): class Partition(object):
__slots__ = 'next_trans', 'next_obj', 'max_ttid' __slots__ = 'next_trans', 'next_obj', 'max_ttid', 'pack'
def __init__(self):
self.pack = [], [] # approved, rejected
def __repr__(self): def __repr__(self):
return '<%s(%s) at 0x%x>' % (self.__class__.__name__, return '<%s(%s) at 0x%x>' % (self.__class__.__name__,
...@@ -365,11 +368,13 @@ class Replicator(object): ...@@ -365,11 +368,13 @@ class Replicator(object):
assert self.current_node.getConnection().isClient(), self.current_node assert self.current_node.getConnection().isClient(), self.current_node
offset = self.current_partition offset = self.current_partition
p = self.partition_dict[offset] p = self.partition_dict[offset]
dm = self.app.dm
if min_tid: if min_tid:
# More than one chunk ? This could be a full replication so avoid # More than one chunk ? This could be a full replication so avoid
# restarting from the beginning by committing now. # restarting from the beginning by committing now.
self.app.dm.commit() dm.commit()
p.next_trans = min_tid p.next_trans = min_tid
ask_pack_info = False
else: else:
try: try:
addr, name = self.source_dict[offset] addr, name = self.source_dict[offset]
...@@ -383,11 +388,13 @@ class Replicator(object): ...@@ -383,11 +388,13 @@ class Replicator(object):
logging.debug("starting replication of <partition=%u" logging.debug("starting replication of <partition=%u"
" min_tid=%s max_tid=%s> from %r", offset, dump(min_tid), " min_tid=%s max_tid=%s> from %r", offset, dump(min_tid),
dump(self.replicate_tid), self.current_node) dump(self.replicate_tid), self.current_node)
ask_pack_info = True
dm.checkNotProcessing(self.app, offset, min_tid)
max_tid = self.replicate_tid max_tid = self.replicate_tid
tid_list = self.app.dm.getReplicationTIDList(min_tid, max_tid, tid_list = dm.getReplicationTIDList(min_tid, max_tid,
FETCH_COUNT, offset) FETCH_COUNT, offset)
self._conn_msg_id = self.current_node.ask(Packets.AskFetchTransactions( self._conn_msg_id = self.current_node.ask(Packets.AskFetchTransactions(
offset, FETCH_COUNT, min_tid, max_tid, tid_list)) offset, FETCH_COUNT, min_tid, max_tid, tid_list, ask_pack_info))
def fetchObjects(self, min_tid=None, min_oid=ZERO_OID): def fetchObjects(self, min_tid=None, min_oid=ZERO_OID):
offset = self.current_partition offset = self.current_partition
...@@ -398,10 +405,12 @@ class Replicator(object): ...@@ -398,10 +405,12 @@ class Replicator(object):
p.next_obj = min_tid p.next_obj = min_tid
self.updateBackupTID() self.updateBackupTID()
dm.updateCellTID(offset, add64(min_tid, -1)) dm.updateCellTID(offset, add64(min_tid, -1))
dm.commit() # like in fetchTransactions
else: else:
min_tid = p.next_obj min_tid = p.next_obj
p.next_trans = add64(max_tid, 1) p.next_trans = add64(max_tid, 1)
if any(p.pack): # only useful in backup mode
p.pack = self.app.dm.signPackOrders(*p.pack, auto_commit=False)
dm.commit()
object_dict = {} object_dict = {}
for serial, oid in dm.getReplicationObjectList(min_tid, for serial, oid in dm.getReplicationObjectList(min_tid,
max_tid, FETCH_COUNT, offset, min_oid): max_tid, FETCH_COUNT, offset, min_oid):
...@@ -429,6 +438,8 @@ class Replicator(object): ...@@ -429,6 +438,8 @@ class Replicator(object):
app.tm.replicated(offset, tid) app.tm.replicated(offset, tid)
logging.debug("partition %u replicated up to %s from %r", logging.debug("partition %u replicated up to %s from %r",
offset, dump(tid), self.current_node) offset, dump(tid), self.current_node)
if app.pt.getCell(offset, app.uuid).isUpToDate():
app.maybePack() # only useful in backup mode
self.getCurrentConnection().setReconnectionNoDelay() self.getCurrentConnection().setReconnectionNoDelay()
self._nextPartition() self._nextPartition()
...@@ -476,3 +487,22 @@ class Replicator(object): ...@@ -476,3 +487,22 @@ class Replicator(object):
' up to %s', offset, addr, dump(tid)) ' up to %s', offset, addr, dump(tid))
# Make UP_TO_DATE cells really UP_TO_DATE # Make UP_TO_DATE cells really UP_TO_DATE
self._nextPartition() self._nextPartition()
def filterPackable(self, tid, parts):
backup = self.app.dm.getBackupTID()
for offset in parts:
if backup:
p = self.partition_dict[offset]
if (None is not p.next_trans <= tid or
None is not p.next_obj <= tid):
continue
yield offset
def keepPendingSignedPackOrders(self, *args):
np = self.app.pt.getPartitions()
for i, x in enumerate(args):
for x in x:
try:
self.partition_dict[u64(x) % np].pack[i].append(x)
except KeyError:
pass
...@@ -16,10 +16,10 @@ ...@@ -16,10 +16,10 @@
from time import time from time import time
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import NonReadableCell, ProtocolError
from neo.lib.handler import DelayEvent, EventQueue from neo.lib.handler import DelayEvent, EventQueue
from neo.lib.util import cached_property, dump from neo.lib.util import cached_property, dump
from neo.lib.protocol import Packets, ProtocolError, NonReadableCell, \ from neo.lib.protocol import Packets, uuid_str, MAX_TID, ZERO_TID
uuid_str, MAX_TID, ZERO_TID
class ConflictError(Exception): class ConflictError(Exception):
""" """
...@@ -42,6 +42,7 @@ class Transaction(object): ...@@ -42,6 +42,7 @@ class Transaction(object):
Container for a pending transaction Container for a pending transaction
""" """
_delayed = {} _delayed = {}
pack = False
tid = None tid = None
voted = 0 voted = 0
...@@ -231,17 +232,22 @@ class TransactionManager(EventQueue): ...@@ -231,17 +232,22 @@ class TransactionManager(EventQueue):
raise ProtocolError("unknown ttid %s" % dump(ttid)) raise ProtocolError("unknown ttid %s" % dump(ttid))
object_list = transaction.store_dict.itervalues() object_list = transaction.store_dict.itervalues()
if txn_info: if txn_info:
user, desc, ext, oid_list = txn_info user, desc, ext, oid_list, pack = txn_info
txn_info = oid_list, user, desc, ext, False, ttid txn_info = oid_list, user, desc, ext, False, ttid
transaction.voted = 2 transaction.voted = 2
else: else:
pack = None
transaction.voted = 1 transaction.voted = 1
# store metadata to temporary table # store metadata to temporary table
dm = self._app.dm dm = self._app.dm
dm.storeTransaction(ttid, object_list, txn_info) dm.storeTransaction(ttid, object_list, txn_info)
if pack:
transaction.pack = True
oid_list, pack_tid = pack
dm.storePackOrder(ttid, None, bool(oid_list), oid_list, pack_tid)
dm.commit() dm.commit()
def lock(self, ttid, tid): def lock(self, ttid, tid, pack):
""" """
Lock a transaction Lock a transaction
""" """
...@@ -256,7 +262,7 @@ class TransactionManager(EventQueue): ...@@ -256,7 +262,7 @@ class TransactionManager(EventQueue):
self._load_lock_dict.update( self._load_lock_dict.update(
dict.fromkeys(transaction.store_dict, ttid)) dict.fromkeys(transaction.store_dict, ttid))
if transaction.voted == 2: if transaction.voted == 2:
self._app.dm.lockTransaction(tid, ttid) self._app.dm.lockTransaction(tid, ttid, pack)
else: else:
assert transaction.voted assert transaction.voted
...@@ -273,7 +279,8 @@ class TransactionManager(EventQueue): ...@@ -273,7 +279,8 @@ class TransactionManager(EventQueue):
dm = self._app.dm dm = self._app.dm
dm.unlockTransaction(tid, ttid, dm.unlockTransaction(tid, ttid,
transaction.voted == 2, transaction.voted == 2,
transaction.store_dict) transaction.store_dict,
transaction.pack)
self._app.em.setTimeout(time() + 1, dm.deferCommit()) self._app.em.setTimeout(time() + 1, dm.deferCommit())
self.abort(ttid, even_if_locked=True) self.abort(ttid, even_if_locked=True)
...@@ -425,11 +432,8 @@ class TransactionManager(EventQueue): ...@@ -425,11 +432,8 @@ class TransactionManager(EventQueue):
self._unstore(transaction, oid) self._unstore(transaction, oid)
transaction.serial_dict[oid] = serial transaction.serial_dict[oid] = serial
# store object # store object
if data is None: transaction.store(oid, self._app.dm.holdData(
data_id = None checksum, oid, data, compression, value_serial), value_serial)
else:
data_id = self._app.dm.holdData(checksum, oid, data, compression)
transaction.store(oid, data_id, value_serial)
if not locked: if not locked:
return ZERO_TID return ZERO_TID
...@@ -567,14 +571,3 @@ class TransactionManager(EventQueue): ...@@ -567,14 +571,3 @@ class TransactionManager(EventQueue):
logging.info(' %s by %s', dump(oid), dump(ttid)) logging.info(' %s by %s', dump(oid), dump(ttid))
self.logQueuedEvents() self.logQueuedEvents()
self.read_queue.logQueuedEvents() self.read_queue.logQueuedEvents()
def updateObjectDataForPack(self, oid, orig_serial, new_serial, data_id):
lock_tid = self.getLockingTID(oid)
if lock_tid is not None:
transaction = self._transaction_dict[lock_tid]
if transaction.store_dict[oid][2] == orig_serial:
if new_serial:
data_id = None
else:
self._app.dm.holdData(data_id)
transaction.store(oid, data_id, new_serial)
...@@ -20,10 +20,12 @@ import functools ...@@ -20,10 +20,12 @@ import functools
import gc import gc
import os import os
import random import random
import signal
import socket import socket
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
import thread
import unittest import unittest
import weakref import weakref
import transaction import transaction
...@@ -37,10 +39,12 @@ except ImportError: ...@@ -37,10 +39,12 @@ except ImportError:
from cPickle import Unpickler from cPickle import Unpickler
from functools import wraps from functools import wraps
from inspect import isclass from inspect import isclass
from itertools import islice
from .mock import Mock from .mock import Mock
from neo.lib import debug, logging, protocol from neo.lib import debug, event, logging
from neo.lib.protocol import NodeTypes, Packets, UUID_NAMESPACES from neo.lib.protocol import NodeTypes, Packet, Packets, UUID_NAMESPACES
from neo.lib.util import cached_property from neo.lib.util import cached_property
from neo.storage.database.manager import DatabaseManager
from time import time, sleep from time import time, sleep
from struct import pack, unpack from struct import pack, unpack
from unittest.case import _ExpectedFailure, _UnexpectedSuccess from unittest.case import _ExpectedFailure, _UnexpectedSuccess
...@@ -76,6 +80,8 @@ DB_INSTALL = os.getenv('NEO_DB_INSTALL', 'mysql_install_db') ...@@ -76,6 +80,8 @@ DB_INSTALL = os.getenv('NEO_DB_INSTALL', 'mysql_install_db')
DB_MYSQLD = os.getenv('NEO_DB_MYSQLD', '/usr/sbin/mysqld') DB_MYSQLD = os.getenv('NEO_DB_MYSQLD', '/usr/sbin/mysqld')
DB_MYCNF = os.getenv('NEO_DB_MYCNF') DB_MYCNF = os.getenv('NEO_DB_MYCNF')
DatabaseManager.TEST_IDENT = thread.get_ident()
adapter = os.getenv('NEO_TESTS_ADAPTER') adapter = os.getenv('NEO_TESTS_ADAPTER')
if adapter: if adapter:
from neo.storage.database import getAdapterKlass from neo.storage.database import getAdapterKlass
...@@ -96,6 +102,12 @@ logging.default_root_handler.handle = lambda record: None ...@@ -96,6 +102,12 @@ logging.default_root_handler.handle = lambda record: None
debug.register() debug.register()
# XXX: Not so important and complicated to make it work in the test process
# because there may be several EpollEventManager and threads.
# We only need it in child processes so that functional tests can stop.
event.set_wakeup_fd = lambda fd, pid=os.getpid(): (
-1 if pid == os.getpid() else signal.set_wakeup_fd(fd))
def mockDefaultValue(name, function): def mockDefaultValue(name, function):
def method(self, *args, **kw): def method(self, *args, **kw):
if name in self.mockReturnValues: if name in self.mockReturnValues:
...@@ -432,10 +444,6 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -432,10 +444,6 @@ class NeoUnitTestBase(NeoTestBase):
conn.connecting = False conn.connecting = False
return conn return conn
def checkProtocolErrorRaised(self, method, *args, **kwargs):
""" Check if the ProtocolError exception was raised """
self.assertRaises(protocol.ProtocolError, method, *args, **kwargs)
def checkAborted(self, conn): def checkAborted(self, conn):
""" Ensure the connection was aborted """ """ Ensure the connection was aborted """
self.assertEqual(len(conn.mockGetNamedCalls('abort')), 1) self.assertEqual(len(conn.mockGetNamedCalls('abort')), 1)
...@@ -461,7 +469,7 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -461,7 +469,7 @@ class NeoUnitTestBase(NeoTestBase):
calls = conn.mockGetNamedCalls("answer") calls = conn.mockGetNamedCalls("answer")
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
packet = calls.pop().getParam(0) packet = calls.pop().getParam(0)
self.assertTrue(isinstance(packet, protocol.Packet)) self.assertTrue(isinstance(packet, Packet))
self.assertEqual(type(packet), Packets.Error) self.assertEqual(type(packet), Packets.Error)
return packet return packet
...@@ -470,7 +478,7 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -470,7 +478,7 @@ class NeoUnitTestBase(NeoTestBase):
calls = conn.mockGetNamedCalls('ask') calls = conn.mockGetNamedCalls('ask')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
packet = calls.pop().getParam(0) packet = calls.pop().getParam(0)
self.assertTrue(isinstance(packet, protocol.Packet)) self.assertTrue(isinstance(packet, Packet))
self.assertEqual(type(packet), packet_type) self.assertEqual(type(packet), packet_type)
return packet return packet
...@@ -479,7 +487,7 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -479,7 +487,7 @@ class NeoUnitTestBase(NeoTestBase):
calls = conn.mockGetNamedCalls('answer') calls = conn.mockGetNamedCalls('answer')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
packet = calls.pop().getParam(0) packet = calls.pop().getParam(0)
self.assertTrue(isinstance(packet, protocol.Packet)) self.assertTrue(isinstance(packet, Packet))
self.assertEqual(type(packet), packet_type) self.assertEqual(type(packet), packet_type)
return packet return packet
...@@ -487,7 +495,7 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -487,7 +495,7 @@ class NeoUnitTestBase(NeoTestBase):
""" Check if a notify-packet with the right type is sent """ """ Check if a notify-packet with the right type is sent """
calls = conn.mockGetNamedCalls('send') calls = conn.mockGetNamedCalls('send')
packet = calls.pop(packet_number).getParam(0) packet = calls.pop(packet_number).getParam(0)
self.assertTrue(isinstance(packet, protocol.Packet)) self.assertTrue(isinstance(packet, Packet))
self.assertEqual(type(packet), packet_type) self.assertEqual(type(packet), packet_type)
return packet return packet
...@@ -626,6 +634,9 @@ class Patch(object): ...@@ -626,6 +634,9 @@ class Patch(object):
def __exit__(self, t, v, tb): def __exit__(self, t, v, tb):
self.__del__() self.__del__()
def consume(iterator, n):
"""Advance the iterator n-steps ahead and returns the last consumed item"""
return next(islice(iterator, n-1, n))
def unpickle_state(data): def unpickle_state(data):
unpickler = Unpickler(StringIO(data)) unpickler = Unpickler(StringIO(data))
......
...@@ -201,14 +201,6 @@ class Process(object): ...@@ -201,14 +201,6 @@ class Process(object):
logging._max_size, logging._max_packet, logging._max_size, logging._max_packet,
command), command),
*args) *args)
# XXX: Sometimes, the handler is not called immediately.
# The process is stuck at an unknown place and the test
# never ends. strace unlocks:
# strace: Process 5520 attached
# close(25) = 0
# getpid() = 5520
# kill(5520, SIGSTOP) = 0
# ...
signal.signal(signal.SIGUSR2, save_coverage) signal.signal(signal.SIGUSR2, save_coverage)
os.close(self._coverage_fd) os.close(self._coverage_fd)
os.write(w, '\0') os.write(w, '\0')
......
#
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from ..mock import Mock
from .. import NeoUnitTestBase
from neo.lib.util import p64
from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.master.app import Application
from neo.master.handlers.client import ClientServiceHandler
class MasterClientHandlerTests(NeoUnitTestBase):
def setUp(self):
NeoUnitTestBase.setUp(self)
# create an application object
config = self.getMasterConfiguration(master_number=1, replicas=1)
self.app = Application(config)
self.app.em.close()
self.app.em = Mock()
self.app.loid = '\0' * 8
self.app.tm.setLastTID('\0' * 8)
self.service = ClientServiceHandler(self.app)
# define some variable to simulate client and storage node
self.client_port = 11022
self.storage_port = 10021
self.client_address = ('127.0.0.1', self.client_port)
self.storage_address = ('127.0.0.1', self.storage_port)
self.storage_uuid = self.getStorageUUID()
# register the storage
self.app.nm.createStorage(
uuid=self.storage_uuid,
address=self.storage_address,
)
def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1",
port=10021):
"""Do first step of identification to MN """
# register the master itself
uuid = self.getNewUUID(node_type)
self.app.nm.createFromNodeType(
node_type,
address=(ip, port),
uuid=uuid,
state=NodeStates.RUNNING,
)
return uuid
def test_askPack(self):
self.assertEqual(self.app.packing, None)
self.app.nm.createClient()
tid = self.getNextTID()
peer_id = 42
conn = self.getFakeConnection(peer_id=peer_id)
storage_uuid = self.storage_uuid
storage_conn = self.getFakeConnection(storage_uuid,
self.storage_address, is_server=True)
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
self.service.askPack(conn, tid)
self.checkNoPacketSent(conn)
ptid = self.checkAskPacket(storage_conn, Packets.AskPack)._args[0]
self.assertEqual(ptid, tid)
self.assertTrue(self.app.packing[0] is conn)
self.assertEqual(self.app.packing[1], peer_id)
self.assertEqual(self.app.packing[2], {storage_uuid})
# Asking again to pack will cause an immediate error
storage_uuid = self.identifyToMasterNode(port=10022)
storage_conn = self.getFakeConnection(storage_uuid,
self.storage_address, is_server=True)
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
self.service.askPack(conn, tid)
self.checkNoPacketSent(storage_conn)
status = self.checkAnswerPacket(conn, Packets.AnswerPack)._args[0]
self.assertFalse(status)
if __name__ == '__main__':
unittest.main()
#
# Copyright (C) 2009-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from ..mock import Mock
from .. import NeoUnitTestBase
from neo.lib.protocol import NodeTypes, Packets
from neo.master.app import Application
from neo.master.handlers.storage import StorageServiceHandler
class MasterStorageHandlerTests(NeoUnitTestBase):
def setUp(self):
NeoUnitTestBase.setUp(self)
# create an application object
config = self.getMasterConfiguration(master_number=1, replicas=1)
self.app = Application(config)
self.app.em.close()
self.app.em = Mock()
self.service = StorageServiceHandler(self.app)
def _allocatePort(self):
self.port = getattr(self, 'port', 1000) + 1
return self.port
def _getStorage(self):
return self.identifyToMasterNode(node_type=NodeTypes.STORAGE,
ip='127.0.0.1', port=self._allocatePort())
def identifyToMasterNode(self, node_type=NodeTypes.STORAGE, ip="127.0.0.1",
port=10021):
"""Do first step of identification to MN
"""
nm = self.app.nm
uuid = self.getNewUUID(node_type)
node = nm.createFromNodeType(node_type, address=(ip, port),
uuid=uuid)
conn = self.getFakeConnection(node.getUUID(), node.getAddress(), True)
node.setConnection(conn)
return (node, conn)
def test_answerPack(self):
# Note: incoming status has no meaning here, so it's left to False.
node1, conn1 = self._getStorage()
node2, conn2 = self._getStorage()
self.app.packing = None
# Does nothing
self.service.answerPack(None, False)
client_conn = Mock({
'getPeerId': 512,
})
client_peer_id = 42
self.app.packing = (client_conn, client_peer_id,
{conn1.getUUID(), conn2.getUUID()})
self.service.answerPack(conn1, False)
self.checkNoPacketSent(client_conn)
self.assertEqual(self.app.packing[2], {conn2.getUUID()})
self.service.answerPack(conn2, False)
packet = self.checkNotifyPacket(client_conn, Packets.AnswerPack)
# TODO: verify packet peer id
self.assertTrue(packet._args[0])
self.assertEqual(self.app.packing, None)
if __name__ == '__main__':
unittest.main()
...@@ -3,14 +3,14 @@ AbortTransaction(p64,[int]) ...@@ -3,14 +3,14 @@ AbortTransaction(p64,[int])
AcceptIdentification(NodeTypes,?int,?int) AcceptIdentification(NodeTypes,?int,?int)
AddObject(p64,p64,int,bin,bin,?p64) AddObject(p64,p64,int,bin,bin,?p64)
AddPendingNodes([int]) AddPendingNodes([int])
AddTransaction(p64,bin,bin,bin,bool,p64,[p64]) AddTransaction(p64,bin,bin,bin,bool,p64,[p64],?(?bool,bool,?[p64],p64))
AnswerBeginTransaction(p64) AnswerBeginTransaction(p64)
AnswerCheckCurrentSerial(?p64) AnswerCheckCurrentSerial(?p64)
AnswerCheckSerialRange(int,bin,p64,bin,p64) AnswerCheckSerialRange(int,bin,p64,bin,p64)
AnswerCheckTIDRange(int,bin,p64) AnswerCheckTIDRange(int,bin,p64)
AnswerClusterState(?ClusterStates) AnswerClusterState(?ClusterStates)
AnswerFetchObjects(?,?p64,?p64,{:}) AnswerFetchObjects(?p64,?p64,{:})
AnswerFetchTransactions(?,?p64,[]) AnswerFetchTransactions(?p64,[],?p64)
AnswerFinalTID(p64) AnswerFinalTID(p64)
AnswerInformationLocked(p64) AnswerInformationLocked(p64)
AnswerLastIDs(?p64,?p64) AnswerLastIDs(?p64,?p64)
...@@ -22,7 +22,7 @@ AnswerNodeList([(NodeTypes,?(bin,int),?int,NodeStates,?float)]) ...@@ -22,7 +22,7 @@ AnswerNodeList([(NodeTypes,?(bin,int),?int,NodeStates,?float)])
AnswerObject(p64,p64,?p64,?int,bin,bin,?p64) AnswerObject(p64,p64,?p64,?int,bin,bin,?p64)
AnswerObjectHistory(p64,[(p64,int)]) AnswerObjectHistory(p64,[(p64,int)])
AnswerObjectUndoSerial({p64:(p64,?p64,bool)}) AnswerObjectUndoSerial({p64:(p64,?p64,bool)})
AnswerPack(bool) AnswerPackOrders([(p64,?bool,bool,?[p64],p64)])
AnswerPartitionList(int,int,[[(int,CellStates)]]) AnswerPartitionList(int,int,[[(int,CellStates)]])
AnswerPartitionTable(int,int,[[(int,CellStates)]]) AnswerPartitionTable(int,int,[[(int,CellStates)]])
AnswerPrimary(int) AnswerPrimary(int)
...@@ -43,12 +43,12 @@ AskCheckSerialRange(int,int,p64,p64,p64) ...@@ -43,12 +43,12 @@ AskCheckSerialRange(int,int,p64,p64,p64)
AskCheckTIDRange(int,int,p64,p64) AskCheckTIDRange(int,int,p64,p64)
AskClusterState() AskClusterState()
AskFetchObjects(int,int,p64,p64,p64,{p64:[p64]}) AskFetchObjects(int,int,p64,p64,p64,{p64:[p64]})
AskFetchTransactions(int,int,p64,p64,[p64]) AskFetchTransactions(int,int,p64,p64,[p64],bool)
AskFinalTID(p64) AskFinalTID(p64)
AskFinishTransaction(p64,[p64],[p64]) AskFinishTransaction(p64,[p64],[p64],?(?[p64],p64))
AskLastIDs() AskLastIDs()
AskLastTransaction() AskLastTransaction()
AskLockInformation(p64,p64) AskLockInformation(p64,p64,bool)
AskLockedTransactions() AskLockedTransactions()
AskMonitorInformation() AskMonitorInformation()
AskNewOIDs(int) AskNewOIDs(int)
...@@ -56,14 +56,14 @@ AskNodeList(NodeTypes) ...@@ -56,14 +56,14 @@ AskNodeList(NodeTypes)
AskObject(p64,?p64,?p64) AskObject(p64,?p64,?p64)
AskObjectHistory(p64,int,int) AskObjectHistory(p64,int,int)
AskObjectUndoSerial(p64,p64,p64,[p64]) AskObjectUndoSerial(p64,p64,p64,[p64])
AskPack(p64) AskPackOrders(p64)
AskPartitionList(int,int,?) AskPartitionList(int,int,?)
AskPartitionTable() AskPartitionTable()
AskPrimary() AskPrimary()
AskRecovery() AskRecovery()
AskRelockObject(p64,p64) AskRelockObject(p64,p64)
AskStoreObject(p64,p64,int,bin,bin,?p64,?p64) AskStoreObject(p64,p64,int,bin,bin,?p64,?p64)
AskStoreTransaction(p64,bin,bin,bin,[p64]) AskStoreTransaction(p64,bin,bin,bin,[p64],?(?[p64],p64))
AskTIDs(int,int,int) AskTIDs(int,int,int)
AskTIDsFrom(p64,p64,int,int) AskTIDsFrom(p64,p64,int,int)
AskTransactionInformation(p64) AskTransactionInformation(p64)
...@@ -79,6 +79,8 @@ NotifyClusterInformation(ClusterStates) ...@@ -79,6 +79,8 @@ NotifyClusterInformation(ClusterStates)
NotifyDeadlock(p64,p64) NotifyDeadlock(p64,p64)
NotifyMonitorInformation({bin:any}) NotifyMonitorInformation({bin:any})
NotifyNodeInformation(float,[(NodeTypes,?(bin,int),?int,NodeStates,?float)]) NotifyNodeInformation(float,[(NodeTypes,?(bin,int),?int,NodeStates,?float)])
NotifyPackCompleted(p64)
NotifyPackSigned([p64],[p64])
NotifyPartitionChanges(int,int,[(int,int,CellStates)]) NotifyPartitionChanges(int,int,[(int,int,CellStates)])
NotifyPartitionCorrupted(int,[int]) NotifyPartitionCorrupted(int,[int])
NotifyReady() NotifyReady()
...@@ -101,3 +103,5 @@ StopOperation() ...@@ -101,3 +103,5 @@ StopOperation()
Truncate(p64) Truncate(p64)
TweakPartitionTable(bool,[int]) TweakPartitionTable(bool,[int])
ValidateTransaction(p64,p64) ValidateTransaction(p64,p64)
WaitForPack(p64)
WaitedForPack()
...@@ -19,8 +19,9 @@ from ..mock import Mock, ReturnValues ...@@ -19,8 +19,9 @@ from ..mock import Mock, ReturnValues
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.storage.app import Application from neo.storage.app import Application
from neo.storage.handlers.client import ClientOperationHandler from neo.storage.handlers.client import ClientOperationHandler
from neo.lib.util import p64 from neo.lib.exception import ProtocolError
from neo.lib.protocol import INVALID_TID, Packets from neo.lib.protocol import INVALID_TID, Packets
from neo.lib.util import p64
class StorageClientHandlerTests(NeoUnitTestBase): class StorageClientHandlerTests(NeoUnitTestBase):
...@@ -65,7 +66,8 @@ class StorageClientHandlerTests(NeoUnitTestBase): ...@@ -65,7 +66,8 @@ class StorageClientHandlerTests(NeoUnitTestBase):
app.pt = Mock() app.pt = Mock()
self.fakeDM() self.fakeDM()
conn = self._getConnection() conn = self._getConnection()
self.checkProtocolErrorRaised(self.operation.askTIDs, conn, 1, 1, None) self.assertRaises(ProtocolError, self.operation.askTIDs,
conn, 1, 1, None)
self.assertEqual(len(app.pt.mockGetNamedCalls('getCellList')), 0) self.assertEqual(len(app.pt.mockGetNamedCalls('getCellList')), 0)
self.assertEqual(len(app.dm.mockGetNamedCalls('getTIDList')), 0) self.assertEqual(len(app.dm.mockGetNamedCalls('getTIDList')), 0)
...@@ -84,8 +86,8 @@ class StorageClientHandlerTests(NeoUnitTestBase): ...@@ -84,8 +86,8 @@ class StorageClientHandlerTests(NeoUnitTestBase):
# invalid offsets => error # invalid offsets => error
dm = self.fakeDM() dm = self.fakeDM()
conn = self._getConnection() conn = self._getConnection()
self.checkProtocolErrorRaised(self.operation.askObjectHistory, conn, self.assertRaises(ProtocolError, self.operation.askObjectHistory,
1, 1, None) conn, 1, 1, None)
self.assertEqual(len(dm.mockGetNamedCalls('getObjectHistory')), 0) self.assertEqual(len(dm.mockGetNamedCalls('getObjectHistory')), 0)
def test_askObjectUndoSerial(self): def test_askObjectUndoSerial(self):
......
...@@ -19,8 +19,9 @@ from ..mock import Mock ...@@ -19,8 +19,9 @@ from ..mock import Mock
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.storage.app import Application from neo.storage.app import Application
from neo.storage.handlers.master import MasterOperationHandler from neo.storage.handlers.master import MasterOperationHandler
from neo.lib.exception import ProtocolError
from neo.lib.protocol import CellStates
from neo.lib.pt import PartitionTable from neo.lib.pt import PartitionTable
from neo.lib.protocol import CellStates, ProtocolError
class StorageMasterHandlerTests(NeoUnitTestBase): class StorageMasterHandlerTests(NeoUnitTestBase):
...@@ -91,7 +92,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase): ...@@ -91,7 +92,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
# dm call # dm call
calls = self.app.dm.mockGetNamedCalls('changePartitionTable') calls = self.app.dm.mockGetNamedCalls('changePartitionTable')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(ptid, 1, cells) calls[0].checkArgs(app, ptid, 1, cells)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -14,12 +14,15 @@ ...@@ -14,12 +14,15 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import string, unittest
from binascii import a2b_hex from binascii import a2b_hex
from contextlib import closing, contextmanager from contextlib import closing, contextmanager
import unittest from copy import copy
from neo.lib.util import add64, p64, u64 from neo.lib.util import add64, p64, u64, makeChecksum
from neo.lib.protocol import CellStates, ZERO_HASH, ZERO_OID, ZERO_TID, MAX_TID from neo.lib.protocol import CellStates, ZERO_HASH, ZERO_OID, ZERO_TID, MAX_TID
from neo.storage.database.manager import MVCCDatabaseManager
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from ..mock import Mock
class StorageDBTests(NeoUnitTestBase): class StorageDBTests(NeoUnitTestBase):
...@@ -49,7 +52,9 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -49,7 +52,9 @@ class StorageDBTests(NeoUnitTestBase):
uuid = self.getStorageUUID() uuid = self.getStorageUUID()
db.setUUID(uuid) db.setUUID(uuid)
self.assertEqual(uuid, db.getUUID()) self.assertEqual(uuid, db.getUUID())
db.changePartitionTable(1, 0, app = Mock()
app.last_pack_id = ZERO_TID
db.changePartitionTable(app, 1, 0,
[(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)], [(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)],
reset=True) reset=True)
self.assertEqual(num_partitions, 1 + db._getMaxPartition()) self.assertEqual(num_partitions, 1 + db._getMaxPartition())
...@@ -67,10 +72,10 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -67,10 +72,10 @@ class StorageDBTests(NeoUnitTestBase):
def commitTransaction(self, tid, objs, txn, commit=True): def commitTransaction(self, tid, objs, txn, commit=True):
ttid = txn[-1] ttid = txn[-1]
self.db.storeTransaction(ttid, objs, txn) self.db.storeTransaction(ttid, objs, txn)
self.db.lockTransaction(tid, ttid) self.db.lockTransaction(tid, ttid, None)
yield yield
if commit: if commit:
self.db.unlockTransaction(tid, ttid, True, objs) self.db.unlockTransaction(tid, ttid, True, objs, False)
self.db.commit() self.db.commit()
elif commit is not None: elif commit is not None:
self.db.abortTransaction(ttid) self.db.abortTransaction(ttid)
...@@ -96,7 +101,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -96,7 +101,7 @@ class StorageDBTests(NeoUnitTestBase):
self._last_ttid = ttid = add64(self._last_ttid, 1) self._last_ttid = ttid = add64(self._last_ttid, 1)
transaction = oid_list, 'user', 'desc', 'ext', False, ttid transaction = oid_list, 'user', 'desc', 'ext', False, ttid
H = "0" * 20 H = "0" * 20
object_list = [(oid, self.db.holdData(H, oid, '', 1), None) object_list = [(oid, self.db.holdData(H, oid, '', 1, None), None)
for oid in oid_list] for oid in oid_list]
return (transaction, object_list) return (transaction, object_list)
...@@ -189,25 +194,25 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -189,25 +194,25 @@ class StorageDBTests(NeoUnitTestBase):
with self.commitTransaction(tid1, objs1, txn1), \ with self.commitTransaction(tid1, objs1, txn1), \
self.commitTransaction(tid2, objs2, txn2): self.commitTransaction(tid2, objs2, txn2):
self.assertEqual(self.db.getTransaction(tid1, True), self.assertEqual(self.db.getTransaction(tid1, True),
([oid1], 'user', 'desc', 'ext', False, p64(1))) ([oid1], 'user', 'desc', 'ext', False, p64(1), None))
self.assertEqual(self.db.getTransaction(tid2, True), self.assertEqual(self.db.getTransaction(tid2, True),
([oid2], 'user', 'desc', 'ext', False, p64(2))) ([oid2], 'user', 'desc', 'ext', False, p64(2), None))
self.assertEqual(self.db.getTransaction(tid1, False), None) self.assertEqual(self.db.getTransaction(tid1, False), None)
self.assertEqual(self.db.getTransaction(tid2, False), None) self.assertEqual(self.db.getTransaction(tid2, False), None)
result = self.db.getTransaction(tid1, True) self.assertEqual(self.db.getTransaction(tid1, True),
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1))) ([oid1], 'user', 'desc', 'ext', False, p64(1), None))
result = self.db.getTransaction(tid2, True) self.assertEqual(self.db.getTransaction(tid2, True),
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False, p64(2))) ([oid2], 'user', 'desc', 'ext', False, p64(2), None))
result = self.db.getTransaction(tid1, False) self.assertEqual(self.db.getTransaction(tid1, False),
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1))) ([oid1], 'user', 'desc', 'ext', False, p64(1), None))
result = self.db.getTransaction(tid2, False) self.assertEqual(self.db.getTransaction(tid2, False),
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False, p64(2))) ([oid2], 'user', 'desc', 'ext', False, p64(2), None))
def test_deleteTransaction(self): def test_deleteTransaction(self):
txn, objs = self.getTransaction([]) txn, objs = self.getTransaction([])
tid = txn[-1] tid = txn[-1]
self.db.storeTransaction(tid, objs, txn, False) self.db.storeTransaction(tid, objs, txn, False)
self.assertEqual(self.db.getTransaction(tid), txn) self.assertEqual(self.db.getTransaction(tid), txn + (None,))
self.db.deleteTransaction(tid) self.db.deleteTransaction(tid)
self.assertEqual(self.db.getTransaction(tid), None) self.assertEqual(self.db.getTransaction(tid), None)
...@@ -265,13 +270,13 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -265,13 +270,13 @@ class StorageDBTests(NeoUnitTestBase):
with self.commitTransaction(tid1, objs1, txn1), \ with self.commitTransaction(tid1, objs1, txn1), \
self.commitTransaction(tid2, objs2, txn2, None): self.commitTransaction(tid2, objs2, txn2, None):
pass pass
result = self.db.getTransaction(tid1, True) self.assertEqual(self.db.getTransaction(tid1, True),
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1))) ([oid1], 'user', 'desc', 'ext', False, p64(1), None))
result = self.db.getTransaction(tid2, True) self.assertEqual(self.db.getTransaction(tid2, True),
self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False, p64(2))) ([oid2], 'user', 'desc', 'ext', False, p64(2), None))
# get from non-temporary only # get from non-temporary only
result = self.db.getTransaction(tid1, False) self.assertEqual(self.db.getTransaction(tid1, False),
self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False, p64(1))) ([oid1], 'user', 'desc', 'ext', False, p64(1), None))
self.assertEqual(self.db.getTransaction(tid2, False), None) self.assertEqual(self.db.getTransaction(tid2, False), None)
def test_getObjectHistory(self): def test_getObjectHistory(self):
...@@ -282,17 +287,17 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -282,17 +287,17 @@ class StorageDBTests(NeoUnitTestBase):
txn3, objs3 = self.getTransaction([oid]) txn3, objs3 = self.getTransaction([oid])
# one revision # one revision
self.db.storeTransaction(tid1, objs1, txn1, False) self.db.storeTransaction(tid1, objs1, txn1, False)
result = self.db.getObjectHistory(oid, 0, 3) result = self.db.getObjectHistoryWithLength(oid, 0, 3)
self.assertEqual(result, [(tid1, 0)]) self.assertEqual(result, [(tid1, 0)])
result = self.db.getObjectHistory(oid, 1, 1) result = self.db.getObjectHistoryWithLength(oid, 1, 1)
self.assertEqual(result, None) self.assertEqual(result, None)
# two revisions # two revisions
self.db.storeTransaction(tid2, objs2, txn2, False) self.db.storeTransaction(tid2, objs2, txn2, False)
result = self.db.getObjectHistory(oid, 0, 3) result = self.db.getObjectHistoryWithLength(oid, 0, 3)
self.assertEqual(result, [(tid2, 0), (tid1, 0)]) self.assertEqual(result, [(tid2, 0), (tid1, 0)])
result = self.db.getObjectHistory(oid, 1, 3) result = self.db.getObjectHistoryWithLength(oid, 1, 3)
self.assertEqual(result, [(tid1, 0)]) self.assertEqual(result, [(tid1, 0)])
result = self.db.getObjectHistory(oid, 2, 3) result = self.db.getObjectHistoryWithLength(oid, 2, 3)
self.assertEqual(result, None) self.assertEqual(result, None)
def _storeTransactions(self, count): def _storeTransactions(self, count):
...@@ -383,8 +388,8 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -383,8 +388,8 @@ class StorageDBTests(NeoUnitTestBase):
tid4 = self.getNextTID() tid4 = self.getNextTID()
tid5 = self.getNextTID() tid5 = self.getNextTID()
oid1 = p64(1) oid1 = p64(1)
foo = db.holdData("3" * 20, oid1, 'foo', 0) foo = db.holdData("3" * 20, oid1, 'foo', 0, None)
bar = db.holdData("4" * 20, oid1, 'bar', 0) bar = db.holdData("4" * 20, oid1, 'bar', 0, None)
db.releaseData((foo, bar)) db.releaseData((foo, bar))
db.storeTransaction( db.storeTransaction(
tid1, ( tid1, (
...@@ -439,5 +444,31 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -439,5 +444,31 @@ class StorageDBTests(NeoUnitTestBase):
db.findUndoTID(oid1, tid4, tid1, None), db.findUndoTID(oid1, tid4, tid1, None),
(tid3, None, True)) (tid3, None, True))
def testDeferredPruning(self):
self.setupDB(1, True)
db = self.db
if isinstance(db, MVCCDatabaseManager):
self.assertFalse(db.nonempty('todel'))
self.assertEqual([
db.storeData(makeChecksum(x), ZERO_OID, x, 0, None)
for x in string.digits
], range(0, 10))
db2 = copy(db)
for x in (3, 9, 4), (4, 7, 6):
self.assertIsNone(db2._pruneData(x))
db.commit()
db2.commit()
for expected in (3, 4, 6), (7, 9):
self.assertTrue(db.nonempty('todel'))
x = db._dataIdsToPrune(3)
self.assertEqual(tuple(x), expected)
self.assertEqual(db._pruneData(x), len(expected))
self.assertFalse(db._dataIdsToPrune(3))
self.assertFalse(db2.nonempty('todel'))
self.assertEqual(db._pruneData(range(10)), 5)
self.assertFalse(db.nonempty('todel'))
else:
self.assertIsNone(db.nonempty('todel'))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
This diff is collapsed.
This diff is collapsed.
...@@ -200,7 +200,7 @@ class StressApplication(AdminApplication): ...@@ -200,7 +200,7 @@ class StressApplication(AdminApplication):
if conn: if conn:
conn.ask(Packets.AskLastIDs()) conn.ask(Packets.AskLastIDs())
def answerLastIDs(self, loid, ltid): def answerLastIDs(self, ltid, loid):
self.loid = loid self.loid = loid
self.ltid = ltid self.ltid = ltid
self.em.setTimeout(int(time.time() + 1), self.askLastIDs) self.em.setTimeout(int(time.time() + 1), self.askLastIDs)
......
...@@ -17,9 +17,9 @@ ...@@ -17,9 +17,9 @@
import unittest import unittest
from .mock import Mock from .mock import Mock
from . import NeoUnitTestBase from . import NeoUnitTestBase
from neo.lib.handler import EventHandler from neo.lib.exception import PacketMalformedError, UnexpectedPacketError, \
from neo.lib.protocol import PacketMalformedError, UnexpectedPacketError, \
NotReadyError, ProtocolError NotReadyError, ProtocolError
from neo.lib.handler import EventHandler
class HandlerTests(NeoUnitTestBase): class HandlerTests(NeoUnitTestBase):
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment