Commit d61a34c0 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parents f5cf9484 42fd89bc
...@@ -6,5 +6,5 @@ ...@@ -6,5 +6,5 @@
/build/ /build/
/dist/ /dist/
/htmlcov/ /htmlcov/
/mock.py /neo/tests/mock.py
/neoppod.egg-info/ /neoppod.egg-info/
Change History Change History
============== ==============
1.7.1 (2017-01-18)
------------------
- Replication:
- Fixed possibly wrong knowledge of cells' backup_tid when resuming backup.
In such case, 'neoctl print ids' gave false impression that the backup
cluster was up-to-date. This also resulted in an inconsistent database
when leaving backup mode before that the issue resolved by itself.
- Storage nodes now select the partition which is furthest behind. Previous
criterion was such that in case of high upstream activity, the backup could
even be stuck looping on a subset of partitions.
- Fixed replication of unfinished imported transactions.
- Fixed abort before vote, to free the storage space used by the transaction.
A new 'prune_orphan' neoctl command was added to delete unreferenced raw data
in the database.
- Removed short storage option -R to reset the db.
Help is reworded to clarify that --reset exits once done.
- The application receiving buffer size has been increased.
This speeds up transfer of big packets.
- The master raised AttributeError at exit during recovery.
- At startup, the importer storage backend connected twice to the destination
database.
1.7.0 (2016-12-19) 1.7.0 (2016-12-19)
------------------ ------------------
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -71,6 +71,7 @@ class AdminEventHandler(EventHandler): ...@@ -71,6 +71,7 @@ class AdminEventHandler(EventHandler):
setNodeState = forward_ask(Packets.SetNodeState) setNodeState = forward_ask(Packets.SetNodeState)
checkReplicas = forward_ask(Packets.CheckReplicas) checkReplicas = forward_ask(Packets.CheckReplicas)
truncate = forward_ask(Packets.Truncate) truncate = forward_ask(Packets.Truncate)
repair = forward_ask(Packets.Repair)
class MasterEventHandler(EventHandler): class MasterEventHandler(EventHandler):
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -191,11 +191,6 @@ class Storage(BaseStorage.BaseStorage, ...@@ -191,11 +191,6 @@ class Storage(BaseStorage.BaseStorage,
# seems used only by FileStorage # seems used only by FileStorage
raise NotImplementedError raise NotImplementedError
def cleanup(self):
# Used in unit tests to remove local database files.
# We have no such thing, so make this method a no-op.
pass
def close(self): def close(self):
# WARNING: This does not handle the case where an app is shared by # WARNING: This does not handle the case where an app is shared by
# several Storage instances, but this is something that only # several Storage instances, but this is something that only
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -544,6 +544,8 @@ class Application(ThreadedApplication): ...@@ -544,6 +544,8 @@ class Application(ThreadedApplication):
# A later serial has already been resolved, skip. # A later serial has already been resolved, skip.
resolved_serial_set.update(conflict_serial_set) resolved_serial_set.update(conflict_serial_set)
continue continue
if self.last_tid < conflict_serial:
self.sync() # possible late invalidation (very rare)
try: try:
new_data = tryToResolveConflict(oid, conflict_serial, new_data = tryToResolveConflict(oid, conflict_serial,
serial, data) serial, data)
......
# #
# Copyright (C) 2011-2016 Nexedi SA # Copyright (C) 2011-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2015-2016 Nexedi SA # Copyright (C) 2015-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -145,7 +145,7 @@ class SocketConnector(object): ...@@ -145,7 +145,7 @@ class SocketConnector(object):
def receive(self, read_buf): def receive(self, read_buf):
try: try:
data = self.socket.recv(4096) data = self.socket.recv(65536)
except socket.error, e: except socket.error, e:
self._error('recv', e) self._error('recv', e)
if data: if data:
...@@ -155,6 +155,7 @@ class SocketConnector(object): ...@@ -155,6 +155,7 @@ class SocketConnector(object):
raise ConnectorException raise ConnectorException
def send(self): def send(self):
# XXX: unefficient for big packets
msg = ''.join(self.queued) msg = ''.join(self.queued)
if msg: if msg:
try: try:
......
# #
# Copyright (C) 2010-2016 Nexedi SA # Copyright (C) 2010-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os, thread import os
from time import time from time import time
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from errno import EAGAIN, EEXIST, EINTR, ENOENT from errno import EAGAIN, EEXIST, EINTR, ENOENT
...@@ -35,7 +35,6 @@ class EpollEventManager(object): ...@@ -35,7 +35,6 @@ class EpollEventManager(object):
"""This class manages connections and events based on epoll(5).""" """This class manages connections and events based on epoll(5)."""
_timeout = None _timeout = None
_trigger_exit = False
def __init__(self): def __init__(self):
self.connection_dict = {} self.connection_dict = {}
...@@ -43,6 +42,7 @@ class EpollEventManager(object): ...@@ -43,6 +42,7 @@ class EpollEventManager(object):
self.writer_set = set() self.writer_set = set()
self.epoll = epoll() self.epoll = epoll()
self._pending_processing = [] self._pending_processing = []
self._trigger_list = []
self._trigger_fd, w = os.pipe() self._trigger_fd, w = os.pipe()
os.close(w) os.close(w)
self._trigger_lock = Lock() self._trigger_lock = Lock()
...@@ -231,9 +231,12 @@ class EpollEventManager(object): ...@@ -231,9 +231,12 @@ class EpollEventManager(object):
if fd == self._trigger_fd: if fd == self._trigger_fd:
with self._trigger_lock: with self._trigger_lock:
self.epoll.unregister(fd) self.epoll.unregister(fd)
if self._trigger_exit: action_list = self._trigger_list
del self._trigger_exit try:
thread.exit() while action_list:
action_list.pop(0)()
finally:
del action_list[:]
continue continue
if conn.readable(): if conn.readable():
self._addPendingConnection(conn) self._addPendingConnection(conn)
...@@ -253,9 +256,9 @@ class EpollEventManager(object): ...@@ -253,9 +256,9 @@ class EpollEventManager(object):
def setTimeout(self, *args): def setTimeout(self, *args):
self._timeout, self._on_timeout = args self._timeout, self._on_timeout = args
def wakeup(self, exit=False): def wakeup(self, *actions):
with self._trigger_lock: with self._trigger_lock:
self._trigger_exit |= exit self._trigger_list += actions
try: try:
self.epoll.register(self._trigger_fd) self.epoll.register(self._trigger_fd)
except IOError, e: except IOError, e:
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2015-2016 Nexedi SA # Copyright (C) 2015-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2015-2016 Nexedi SA # Copyright (C) 2015-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -20,7 +20,7 @@ import traceback ...@@ -20,7 +20,7 @@ import traceback
from cStringIO import StringIO from cStringIO import StringIO
from struct import Struct from struct import Struct
PROTOCOL_VERSION = 8 PROTOCOL_VERSION = 9
# Size restrictions. # Size restrictions.
MIN_PACKET_SIZE = 10 MIN_PACKET_SIZE = 10
...@@ -237,14 +237,10 @@ class Packet(object): ...@@ -237,14 +237,10 @@ class Packet(object):
_id = None _id = None
poll_thread = False poll_thread = False
def __init__(self, *args, **kw): def __init__(self, *args):
assert self._code is not None, "Packet class not registered" assert self._code is not None, "Packet class not registered"
if args or kw: if args:
args = list(args)
buf = StringIO() buf = StringIO()
# load named arguments
for item in self._fmt._items[len(args):]:
args.append(kw.get(item._name))
self._fmt.encode(buf.write, args) self._fmt.encode(buf.write, args)
self._body = buf.getvalue() self._body = buf.getvalue()
else: else:
...@@ -1176,6 +1172,25 @@ class SetClusterState(Packet): ...@@ -1176,6 +1172,25 @@ class SetClusterState(Packet):
_answer = Error _answer = Error
class Repair(Packet):
"""
Ask storage nodes to repair their databases. ctl -> A -> M
"""
_flags = map(PBoolean, ('dry_run',
# 'prune_orphan' (commented because it's the only option for the moment)
))
_fmt = PStruct('repair',
PFUUIDList,
*_flags)
_answer = Error
class RepairOne(Packet):
"""
See Repair. M -> S
"""
_fmt = PStruct('repair', *Repair._flags)
class ClusterInformation(Packet): class ClusterInformation(Packet):
""" """
Notify information about the cluster Notify information about the cluster
...@@ -1685,6 +1700,10 @@ class Packets(dict): ...@@ -1685,6 +1700,10 @@ class Packets(dict):
TweakPartitionTable, ignore_when_closed=False) TweakPartitionTable, ignore_when_closed=False)
SetClusterState = register( SetClusterState = register(
SetClusterState, ignore_when_closed=False) SetClusterState, ignore_when_closed=False)
Repair = register(
Repair)
NotifyRepair = register(
RepairOne)
NotifyClusterInformation = register( NotifyClusterInformation = register(
ClusterInformation) ClusterInformation)
AskClusterState, AnswerClusterState = register( AskClusterState, AnswerClusterState = register(
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -258,30 +258,34 @@ class PartitionTable(object): ...@@ -258,30 +258,34 @@ class PartitionTable(object):
partition on the line (here, line length is 11 to keep the docstring partition on the line (here, line length is 11 to keep the docstring
width under 80 column). width under 80 column).
""" """
node_list = sorted(self.count_dict)
result = ['pt: node %u: %s, %s' % (i, uuid_str(node.getUUID()), result = ['pt: node %u: %s, %s' % (i, uuid_str(node.getUUID()),
protocol.node_state_prefix_dict[node.getState()]) protocol.node_state_prefix_dict[node.getState()])
for i, node in enumerate(node_list)] for i, node in enumerate(sorted(self.count_dict))]
append = result.append append = result.append
line = [] line = []
max_line_len = 20 # XXX: hardcoded number of partitions per line max_line_len = 20 # XXX: hardcoded number of partitions per line
cell_state_dict = protocol.cell_state_prefix_dict
prefix = 0 prefix = 0
prefix_len = int(math.ceil(math.log10(self.np))) prefix_len = int(math.ceil(math.log10(self.np)))
for offset, row in enumerate(self.partition_list): for offset, row in enumerate(self.formatRows()):
if len(line) == max_line_len: if len(line) == max_line_len:
append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line))) append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
line = [] line = []
prefix = offset prefix = offset
line.append(row)
if line:
append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
return result
def formatRows(self):
node_list = sorted(self.count_dict)
cell_state_dict = protocol.cell_state_prefix_dict
for row in self.partition_list:
if row is None: if row is None:
line.append('X' * len(node_list)) yield 'X' * len(node_list)
else: else:
cell_dict = {x.getNode(): cell_state_dict[x.getState()] cell_dict = {x.getNode(): cell_state_dict[x.getState()]
for x in row} for x in row}
line.append(''.join(cell_dict.get(x, '.') for x in node_list)) yield ''.join(cell_dict.get(x, '.') for x in node_list)
if line:
append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
return result
def operational(self): def operational(self):
if not self.filled(): if not self.filled():
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading, weakref import thread, threading, weakref
from . import logging from . import logging
from .app import BaseApplication from .app import BaseApplication
from .connection import ConnectionClosed from .connection import ConnectionClosed
...@@ -69,7 +69,7 @@ class ThreadedApplication(BaseApplication): ...@@ -69,7 +69,7 @@ class ThreadedApplication(BaseApplication):
conn.close() conn.close()
# Stop polling thread # Stop polling thread
logging.debug('Stopping %s', self.poll_thread) logging.debug('Stopping %s', self.poll_thread)
self.em.wakeup(True) self.em.wakeup(thread.exit)
else: else:
super(ThreadedApplication, self).close() super(ThreadedApplication, self).close()
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
# Copyright (C) 2012-2016 Nexedi SA # Copyright (C) 2012-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -64,6 +64,9 @@ class AdministrationHandler(MasterHandler): ...@@ -64,6 +64,9 @@ class AdministrationHandler(MasterHandler):
for node in storage_list: for node in storage_list:
assert node.isPending(), node assert node.isPending(), node
if node.getConnection().isPending(): if node.getConnection().isPending():
# XXX: It's wrong to use ProtocolError here. We must reply
# less aggressively because the admin has no way to
# know that there's still pending activity.
raise ProtocolError('Cannot exit recovery now: node %r is ' raise ProtocolError('Cannot exit recovery now: node %r is '
'entering cluster' % (node, )) 'entering cluster' % (node, ))
app._startup_allowed = True app._startup_allowed = True
...@@ -147,6 +150,19 @@ class AdministrationHandler(MasterHandler): ...@@ -147,6 +150,19 @@ class AdministrationHandler(MasterHandler):
logging.warning('No node added') logging.warning('No node added')
conn.answer(Errors.Ack('No node added')) conn.answer(Errors.Ack('No node added'))
def repair(self, conn, uuid_list, *args):
getByUUID = self.app.nm.getByUUID
node_list = []
for uuid in uuid_list:
node = getByUUID(uuid)
if node is None or not (node.isStorage() and node.isIdentified()):
raise ProtocolError("invalid storage node %s" % uuid_str(uuid))
node_list.append(node)
repair = Packets.NotifyRepair(*args)
for node in node_list:
node.notify(repair)
conn.answer(Errors.Ack(''))
def tweakPartitionTable(self, conn, uuid_list): def tweakPartitionTable(self, conn, uuid_list):
app = self.app app = self.app
state = app.getClusterState() state = app.getClusterState()
......
# #
# Copyright (C) 2012-2016 Nexedi SA # Copyright (C) 2012-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -53,7 +53,10 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -53,7 +53,10 @@ class StorageServiceHandler(BaseServiceHandler):
last_tid = app.pt.getBackupTid(min) last_tid = app.pt.getBackupTid(min)
pending_list = () pending_list = ()
else: else:
last_tid = app.tm.getLastTID() # This can't be app.tm.getLastTID() for imported transactions,
# because outdated cells must at least wait that they're locked
# at source side. For normal transactions, it would not matter.
last_tid = app.getLastTransaction()
pending_list = app.tm.registerForNotification(conn.getUUID()) pending_list = app.tm.registerForNotification(conn.getUUID())
p = Packets.AnswerUnfinishedTransactions(last_tid, pending_list) p = Packets.AnswerUnfinishedTransactions(last_tid, pending_list)
conn.answer(p) conn.answer(p)
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -137,7 +137,7 @@ class RecoveryManager(MasterHandler): ...@@ -137,7 +137,7 @@ class RecoveryManager(MasterHandler):
logging.warning("Waiting for %r to come back." logging.warning("Waiting for %r to come back."
" No other node has version %s of the partition table.", " No other node has version %s of the partition table.",
node, self.target_ptid) node, self.target_ptid)
if node.getState() == new_state: if node is None or node.getState() == new_state:
return return
node.setState(new_state) node.setState(new_state)
# broadcast to all so that admin nodes gets informed # broadcast to all so that admin nodes gets informed
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -36,6 +36,7 @@ action_dict = { ...@@ -36,6 +36,7 @@ action_dict = {
'tweak': 'tweakPartitionTable', 'tweak': 'tweakPartitionTable',
'drop': 'dropNode', 'drop': 'dropNode',
'kill': 'killNode', 'kill': 'killNode',
'prune_orphan': 'pruneOrphan',
'truncate': 'truncate', 'truncate': 'truncate',
} }
...@@ -146,20 +147,20 @@ class TerminalNeoCTL(object): ...@@ -146,20 +147,20 @@ class TerminalNeoCTL(object):
assert len(params) == 0 assert len(params) == 0
return self.neoctl.startCluster() return self.neoctl.startCluster()
def _getStorageList(self, params):
if len(params) == 1 and params[0] == 'all':
node_list = self.neoctl.getNodeList(NodeTypes.STORAGE)
return [node[2] for node in node_list]
return map(self.asNode, params)
def enableStorageList(self, params): def enableStorageList(self, params):
""" """
Enable cluster to make use of pending storages. Enable cluster to make use of pending storages.
Parameters: all Parameters: node [node [...]]
node [node [...]] node: if "all", add all pending storage nodes,
node: if "all", add all pending storage nodes.
otherwise, the list of storage nodes to enable. otherwise, the list of storage nodes to enable.
""" """
if len(params) == 1 and params[0] == 'all': return self.neoctl.enableStorageList(self._getStorageList(params))
node_list = self.neoctl.getNodeList(NodeTypes.STORAGE)
uuid_list = [node[2] for node in node_list]
else:
uuid_list = map(self.asNode, params)
return self.neoctl.enableStorageList(uuid_list)
def tweakPartitionTable(self, params): def tweakPartitionTable(self, params):
""" """
...@@ -189,6 +190,20 @@ class TerminalNeoCTL(object): ...@@ -189,6 +190,20 @@ class TerminalNeoCTL(object):
""" """
return uuid_str(self.neoctl.getPrimary()) return uuid_str(self.neoctl.getPrimary())
def pruneOrphan(self, params):
"""
Fix database by deleting unreferenced raw data
This can take a long time.
Parameters: dry_run node [node [...]]
dry_run: 0 or 1
node: if "all", ask all connected storage nodes to repair,
otherwise, only the given list of storage nodes.
"""
dry_run = "01".index(params.pop(0))
return self.neoctl.repair(self._getStorageList(params), dry_run)
def truncate(self, params): def truncate(self, params):
""" """
Truncate the database at the given tid. Truncate the database at the given tid.
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -172,6 +172,12 @@ class NeoCTL(BaseApplication): ...@@ -172,6 +172,12 @@ class NeoCTL(BaseApplication):
raise RuntimeError(response) raise RuntimeError(response)
return response[1] return response[1]
def repair(self, *args):
response = self.__ask(Packets.Repair(*args))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def truncate(self, tid): def truncate(self, tid):
response = self.__ask(Packets.Truncate(tid)) response = self.__ask(Packets.Truncate(tid))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK: if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# #
# neoadmin - run an administrator node of NEO # neoadmin - run an administrator node of NEO
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# #
# neoadmin - run an administrator node of NEO # neoadmin - run an administrator node of NEO
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# #
# neolog - read a NEO log # neolog - read a NEO log
# #
# Copyright (C) 2012-2016 Nexedi SA # Copyright (C) 2012-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# #
# neomaster - run a master node of NEO # neomaster - run a master node of NEO
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# #
# neomaster - run a master node of NEO # neomaster - run a master node of NEO
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# #
# neostorage - run a storage node of NEO # neostorage - run a storage node of NEO
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -24,14 +24,14 @@ from neo.lib.config import getServerOptionParser, ConfigurationManager ...@@ -24,14 +24,14 @@ from neo.lib.config import getServerOptionParser, ConfigurationManager
parser = getServerOptionParser() parser = getServerOptionParser()
parser.add_option('-u', '--uuid', help='specify an UUID to use for this ' \ parser.add_option('-u', '--uuid', help='specify an UUID to use for this ' \
'process. Previously assigned UUID takes precedence (ie ' \ 'process. Previously assigned UUID takes precedence (ie ' \
'you should always use -R with this switch)') 'you should always use --reset with this switch)')
parser.add_option('-R', '--reset', action = 'store_true',
help = 'remove an existing database if any')
parser.add_option('-a', '--adapter', help = 'database adapter to use') parser.add_option('-a', '--adapter', help = 'database adapter to use')
parser.add_option('-d', '--database', help = 'database connections string') parser.add_option('-d', '--database', help = 'database connections string')
parser.add_option('-e', '--engine', help = 'database engine') parser.add_option('-e', '--engine', help = 'database engine')
parser.add_option('-w', '--wait', help='seconds to wait for backend to be ' parser.add_option('-w', '--wait', help='seconds to wait for backend to be '
'available, before erroring-out (-1 = infinite)', type='float', default=0) 'available, before erroring-out (-1 = infinite)', type='float', default=0)
parser.add_option('--reset', action='store_true',
help='remove an existing database if any, and exit')
defaults = dict( defaults = dict(
bind = '127.0.0.1', bind = '127.0.0.1',
......
#!/usr/bin/env python #!/usr/bin/env python
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -23,6 +23,7 @@ import os ...@@ -23,6 +23,7 @@ import os
import re import re
from collections import Counter, defaultdict from collections import Counter, defaultdict
from cStringIO import StringIO from cStringIO import StringIO
from fnmatch import fnmatchcase
from unittest.runner import _WritelnDecorator from unittest.runner import _WritelnDecorator
if filter(re.compile(r'--coverage$|-\w*c').match, sys.argv[1:]): if filter(re.compile(r'--coverage$|-\w*c').match, sys.argv[1:]):
...@@ -32,7 +33,8 @@ if filter(re.compile(r'--coverage$|-\w*c').match, sys.argv[1:]): ...@@ -32,7 +33,8 @@ if filter(re.compile(r'--coverage$|-\w*c').match, sys.argv[1:]):
coverage.neotestrunner = [] coverage.neotestrunner = []
coverage.start() coverage.start()
from neo.tests import getTempDirectory, __dict__ as neo_tests__dict__ from neo.tests import getTempDirectory, NeoTestBase, Patch, \
__dict__ as neo_tests__dict__
from neo.tests.benchmark import BenchmarkRunner from neo.tests.benchmark import BenchmarkRunner
# list of test modules # list of test modules
...@@ -64,7 +66,6 @@ UNIT_TEST_MODULES = [ ...@@ -64,7 +66,6 @@ UNIT_TEST_MODULES = [
# client application # client application
'neo.tests.client.testClientApp', 'neo.tests.client.testClientApp',
'neo.tests.client.testMasterHandler', 'neo.tests.client.testMasterHandler',
'neo.tests.client.testStorageHandler',
'neo.tests.client.testConnectionPool', 'neo.tests.client.testConnectionPool',
# light functional tests # light functional tests
'neo.tests.threaded.test', 'neo.tests.threaded.test',
...@@ -114,17 +115,33 @@ class NeoTestRunner(unittest.TextTestResult): ...@@ -114,17 +115,33 @@ class NeoTestRunner(unittest.TextTestResult):
def wasSuccessful(self): def wasSuccessful(self):
return not (self.failures or self.errors or self.unexpectedSuccesses) return not (self.failures or self.errors or self.unexpectedSuccesses)
def run(self, name, modules): def run(self, name, modules, only):
print '\n', name
suite = unittest.TestSuite() suite = unittest.TestSuite()
loader = unittest.defaultTestLoader loader = unittest.TestLoader()
if only:
exclude = only[0] == '!'
test_only = only[exclude + 1:]
only = only[exclude]
if test_only:
def getTestCaseNames(testCaseClass):
tests = loader.__class__.getTestCaseNames(
loader, testCaseClass)
x = testCaseClass.__name__ + '.'
return [t for t in tests
if exclude != any(fnmatchcase(x + t, o)
for o in test_only)]
loader.getTestCaseNames = getTestCaseNames
if not only:
only = '*'
else:
print '\n', name
for test_module in modules: for test_module in modules:
# load prefix if supplied # load prefix if supplied
if isinstance(test_module, tuple): if isinstance(test_module, tuple):
test_module, prefix = test_module test_module, loader.testMethodPrefix = test_module
loader.testMethodPrefix = prefix if only and not (exclude and test_only or
else: exclude != fnmatchcase(test_module, only)):
loader.testMethodPrefix = 'test' continue
try: try:
test_module = __import__(test_module, globals(), locals(), ['*']) test_module = __import__(test_module, globals(), locals(), ['*'])
except ImportError, err: except ImportError, err:
...@@ -135,7 +152,11 @@ class NeoTestRunner(unittest.TextTestResult): ...@@ -135,7 +152,11 @@ class NeoTestRunner(unittest.TextTestResult):
# NOTE it is also possible to run individual tests via `python -m unittest ...` # NOTE it is also possible to run individual tests via `python -m unittest ...`
if 1 or test_module.__name__ == 'neo.tests.functional.testStorage': if 1 or test_module.__name__ == 'neo.tests.functional.testStorage':
suite.addTests(loader.loadTestsFromModule(test_module)) suite.addTests(loader.loadTestsFromModule(test_module))
suite.run(self) try:
suite.run(self)
finally:
# Workaround weird behaviour of Python.
self._previousTestClass = None
def startTest(self, test): def startTest(self, test):
super(NeoTestRunner, self).startTest(test) super(NeoTestRunner, self).startTest(test)
...@@ -203,7 +224,8 @@ class NeoTestRunner(unittest.TextTestResult): ...@@ -203,7 +224,8 @@ class NeoTestRunner(unittest.TextTestResult):
for test in self.unexpectedSuccesses: for test in self.unexpectedSuccesses:
body.write("UNEXPECTED SUCCESS: %s\n" % self.getDescription(test)) body.write("UNEXPECTED SUCCESS: %s\n" % self.getDescription(test))
self.stream = _WritelnDecorator(body) self.stream = _WritelnDecorator(body)
self.printErrors() self.printErrorList('ERROR', self.errors)
self.printErrorList('FAIL', self.failures)
return subject, body.getvalue() return subject, body.getvalue()
class TestRunner(BenchmarkRunner): class TestRunner(BenchmarkRunner):
...@@ -211,6 +233,11 @@ class TestRunner(BenchmarkRunner): ...@@ -211,6 +233,11 @@ class TestRunner(BenchmarkRunner):
def add_options(self, parser): def add_options(self, parser):
parser.add_option('-c', '--coverage', action='store_true', parser.add_option('-c', '--coverage', action='store_true',
help='Enable coverage') help='Enable coverage')
parser.add_option('-C', '--cov-unit', action='store_true',
help='Same as -c but output 1 file per test,'
' in the temporary test directory')
parser.add_option('-l', '--loop', type='int', default=1,
help='Repeat tests several times')
parser.add_option('-f', '--functional', action='store_true', parser.add_option('-f', '--functional', action='store_true',
help='Functional tests') help='Functional tests')
parser.add_option('-u', '--unit', action='store_true', parser.add_option('-u', '--unit', action='store_true',
...@@ -219,7 +246,12 @@ class TestRunner(BenchmarkRunner): ...@@ -219,7 +246,12 @@ class TestRunner(BenchmarkRunner):
help='ZODB test suite running on a NEO') help='ZODB test suite running on a NEO')
parser.add_option('-v', '--verbose', action='store_true', parser.add_option('-v', '--verbose', action='store_true',
help='Verbose output') help='Verbose output')
parser.usage += " [[!] module [test...]]"
parser.format_epilog = lambda _: """ parser.format_epilog = lambda _: """
Positional:
Filter by given module/test. These arguments are shell patterns.
This implies -ufz if none of this option is passed.
Environment Variables: Environment Variables:
NEO_TESTS_ADAPTER Default is SQLite for threaded clusters, NEO_TESTS_ADAPTER Default is SQLite for threaded clusters,
MySQL otherwise. MySQL otherwise.
...@@ -241,27 +273,51 @@ Environment Variables: ...@@ -241,27 +273,51 @@ Environment Variables:
""" % neo_tests__dict__ """ % neo_tests__dict__
def load_options(self, options, args): def load_options(self, options, args):
if not (options.unit or options.functional or options.zodb or args): if options.coverage and options.cov_unit:
sys.exit('Nothing to run, please give one of -f, -u, -z') sys.exit('-c conflicts with -C')
if not (options.unit or options.functional or options.zodb):
if not args:
sys.exit('Nothing to run, please give one of -f, -u, -z')
options.unit = options.functional = options.zodb = True
return dict( return dict(
loop = options.loop,
unit = options.unit, unit = options.unit,
functional = options.functional, functional = options.functional,
zodb = options.zodb, zodb = options.zodb,
verbosity = 2 if options.verbose else 1, verbosity = 2 if options.verbose else 1,
coverage = options.coverage, coverage = options.coverage,
cov_unit = options.cov_unit,
only = args,
) )
def start(self): def start(self):
config = self._config config = self._config
only = config.only
# run requested tests # run requested tests
runner = NeoTestRunner(config.title or 'Neo', config.verbosity) runner = NeoTestRunner(config.title or 'Neo', config.verbosity)
if config.cov_unit:
from coverage import Coverage
cov_dir = runner.temp_directory + '/coverage'
os.mkdir(cov_dir)
@Patch(NeoTestBase)
def setUp(orig, self):
orig(self)
self.__coverage = Coverage('%s/%s' % (cov_dir, self.id()))
self.__coverage.start()
@Patch(NeoTestBase)
def _tearDown(orig, self, success):
self.__coverage.stop()
self.__coverage.save()
del self.__coverage
orig(self, success)
try: try:
if config.unit: for _ in xrange(config.loop):
runner.run('Unit tests', UNIT_TEST_MODULES) if config.unit:
if config.functional: runner.run('Unit tests', UNIT_TEST_MODULES, only)
runner.run('Functional tests', FUNC_TEST_MODULES) if config.functional:
if config.zodb: runner.run('Functional tests', FUNC_TEST_MODULES, only)
runner.run('ZODB tests', ZODB_TEST_MODULES) if config.zodb:
runner.run('ZODB tests', ZODB_TEST_MODULES, only)
except KeyboardInterrupt: except KeyboardInterrupt:
config['mail_to'] = None config['mail_to'] = None
traceback.print_exc() traceback.print_exc()
...@@ -270,7 +326,13 @@ Environment Variables: ...@@ -270,7 +326,13 @@ Environment Variables:
if coverage.neotestrunner: if coverage.neotestrunner:
coverage.combine(coverage.neotestrunner) coverage.combine(coverage.neotestrunner)
coverage.save() coverage.save()
if runner.dots:
print
# build report # build report
if only and not config.mail_to:
runner._buildSummary = lambda *args: (
runner.__class__._buildSummary(runner, *args)[0], '')
self.build_report = str
self._successful = runner.wasSuccessful() self._successful = runner.wasSuccessful()
return runner.buildReport(self.add_status) return runner.buildReport(self.add_status)
......
#!/usr/bin/env python #!/usr/bin/env python
# #
# Copyright (C) 2011-2016 Nexedi SA # Copyright (C) 2011-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2012-2016 Nexedi SA # Copyright (C) 2012-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2014-2016 Nexedi SA # Copyright (C) 2014-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -281,7 +281,6 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -281,7 +281,6 @@ class ImporterDatabaseManager(DatabaseManager):
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
super(ImporterDatabaseManager, self).__init__(*args, **kw) super(ImporterDatabaseManager, self).__init__(*args, **kw)
self.db._connect()
implements(self, """_getNextTID checkSerialRange checkTIDRange implements(self, """_getNextTID checkSerialRange checkTIDRange
deleteObject deleteTransaction dropPartitions getLastTID deleteObject deleteTransaction dropPartitions getLastTID
getReplicationObjectList getTIDList nonempty""".split()) getReplicationObjectList getTIDList nonempty""".split())
...@@ -305,10 +304,13 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -305,10 +304,13 @@ class ImporterDatabaseManager(DatabaseManager):
getPartitionTable changePartitionTable getPartitionTable changePartitionTable
getUnfinishedTIDDict dropUnfinishedData abortTransaction getUnfinishedTIDDict dropUnfinishedData abortTransaction
storeTransaction lockTransaction unlockTransaction storeTransaction lockTransaction unlockTransaction
storeData _pruneData deferCommit storeData getOrphanList _pruneData deferCommit
""".split(): """.split():
setattr(self, x, getattr(self.db, x)) setattr(self, x, getattr(self.db, x))
def _connect(self):
pass
def commit(self): def commit(self):
self.db.commit() self.db.commit()
self._last_commit = time.time() self._last_commit = time.time()
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -14,7 +14,9 @@ ...@@ -14,7 +14,9 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading
from collections import defaultdict from collections import defaultdict
from contextlib import contextmanager
from functools import wraps from functools import wraps
from neo.lib import logging, util from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure from neo.lib.exception import DatabaseFailure
...@@ -54,6 +56,9 @@ class DatabaseManager(object): ...@@ -54,6 +56,9 @@ class DatabaseManager(object):
ENGINES = () ENGINES = ()
_deferred = 0
_duplicating = _repairing = None
def __init__(self, database, engine=None, wait=0): def __init__(self, database, engine=None, wait=0):
""" """
Initialize the object. Initialize the object.
...@@ -64,22 +69,42 @@ class DatabaseManager(object): ...@@ -64,22 +69,42 @@ class DatabaseManager(object):
% (engine, self.ENGINES)) % (engine, self.ENGINES))
self._engine = engine self._engine = engine
self._wait = wait self._wait = wait
self._deferred = 0
self._parse(database) self._parse(database)
self._connect()
def __getattr__(self, attr): def __getattr__(self, attr):
if attr == "_getPartition": if attr == "_getPartition":
np = self.getNumPartitions() np = self.getNumPartitions()
value = lambda x: x % np value = lambda x: x % np
else: elif self._duplicating is None:
return self.__getattribute__(attr) return self.__getattribute__(attr)
else:
value = getattr(self._duplicating, attr)
setattr(self, attr, value) setattr(self, attr, value)
return value return value
@contextmanager
def _duplicate(self):
cls = self.__class__
db = cls.__new__(cls)
db._duplicating = self
try:
db._connect()
finally:
del db._duplicating
try:
yield db
finally:
db.close()
@abstract @abstract
def _parse(self, database): def _parse(self, database):
"""Called during instantiation, to process database parameter.""" """Called during instantiation, to process database parameter."""
@abstract
def _connect(self):
"""Connect to the database"""
def setup(self, reset=0): def setup(self, reset=0):
"""Set up a database, discarding existing data first if reset is True """Set up a database, discarding existing data first if reset is True
""" """
...@@ -415,6 +440,15 @@ class DatabaseManager(object): ...@@ -415,6 +440,15 @@ class DatabaseManager(object):
is always the case at tpc_vote. is always the case at tpc_vote.
""" """
@abstract
def getOrphanList(self):
"""Return the list of data id that is not referenced by the obj table
This is a repair method, and it's usually expensive.
There was a bug that did not free data of transactions that were
aborted before vote. This method is used to reclaim the wasted space.
"""
@abstract @abstract
def _pruneData(self, data_id_list): def _pruneData(self, data_id_list):
"""To be overridden by the backend to delete any unreferenced data """To be overridden by the backend to delete any unreferenced data
...@@ -423,6 +457,8 @@ class DatabaseManager(object): ...@@ -423,6 +457,8 @@ class DatabaseManager(object):
- not in self._uncommitted_data - not in self._uncommitted_data
- and not referenced by a fully-committed object (storage should have - and not referenced by a fully-committed object (storage should have
an index or a refcount of all data ids of all objects) an index or a refcount of all data ids of all objects)
The returned value is the number of deleted rows from the data table.
""" """
@abstract @abstract
...@@ -588,6 +624,37 @@ class DatabaseManager(object): ...@@ -588,6 +624,37 @@ class DatabaseManager(object):
self._setTruncateTID(None) self._setTruncateTID(None)
self.commit() self.commit()
def repair(self, weak_app, dry_run):
t = self._repairing
if t and t.is_alive():
logging.error('already repairing')
return
def repair():
l = threading.Lock()
l.acquire()
def finalize():
try:
if data_id_list and not dry_run:
self.commit()
logging.info("repair: deleted %s orphan records",
self._pruneData(data_id_list))
self.commit()
finally:
l.release()
try:
with self._duplicate() as db:
data_id_list = db.getOrphanList()
logging.info("repair: found %s records that may be orphan",
len(data_id_list))
weak_app().em.wakeup(finalize)
l.acquire()
finally:
del self._repairing
logging.info("repair: done")
t = self._repairing = threading.Thread(target=repair)
t.daemon = 1
t.start()
@abstract @abstract
def getTransaction(self, tid, all = False): def getTransaction(self, tid, all = False):
"""Return a tuple of the list of OIDs, user information, """Return a tuple of the list of OIDs, user information,
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -56,12 +56,6 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -56,12 +56,6 @@ class MySQLDatabaseManager(DatabaseManager):
_max_allowed_packet = 32769 * 1024 _max_allowed_packet = 32769 * 1024
def __init__(self, *args, **kw):
super(MySQLDatabaseManager, self).__init__(*args, **kw)
self.conn = None
self._config = {}
self._connect()
def _parse(self, database): def _parse(self, database):
""" Get the database credentials (username, password, database) """ """ Get the database credentials (username, password, database) """
# expected pattern : [user[:password]@]database[(~|.|/)unix_socket] # expected pattern : [user[:password]@]database[(~|.|/)unix_socket]
...@@ -93,6 +87,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -93,6 +87,7 @@ class MySQLDatabaseManager(DatabaseManager):
logging.exception('Connection to MySQL failed, retrying.') logging.exception('Connection to MySQL failed, retrying.')
time.sleep(1) time.sleep(1)
self._active = 0 self._active = 0
self._config = {}
conn = self.conn conn = self.conn
conn.autocommit(False) conn.autocommit(False)
conn.query("SET SESSION group_concat_max_len = %u" % (2**32-1)) conn.query("SET SESSION group_concat_max_len = %u" % (2**32-1))
...@@ -475,6 +470,11 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -475,6 +470,11 @@ class MySQLDatabaseManager(DatabaseManager):
_structLL = struct.Struct(">LL") _structLL = struct.Struct(">LL")
_unpackLL = _structLL.unpack _unpackLL = _structLL.unpack
def getOrphanList(self):
return [x for x, in self.query(
"SELECT id FROM data LEFT JOIN obj ON (id=data_id)"
" WHERE data_id IS NULL")]
def _pruneData(self, data_id_list): def _pruneData(self, data_id_list):
data_id_list = set(data_id_list).difference(self._uncommitted_data) data_id_list = set(data_id_list).difference(self._uncommitted_data)
if data_id_list: if data_id_list:
...@@ -495,6 +495,8 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -495,6 +495,8 @@ class MySQLDatabaseManager(DatabaseManager):
if bigid_list: if bigid_list:
q("DELETE FROM bigdata WHERE id IN (%s)" q("DELETE FROM bigdata WHERE id IN (%s)"
% ",".join(map(str, bigid_list))) % ",".join(map(str, bigid_list)))
return len(id_list)
return 0
def _bigData(self, value): def _bigData(self, value):
bigdata_id, length = self._unpackLL(value) bigdata_id, length = self._unpackLL(value)
...@@ -582,11 +584,8 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -582,11 +584,8 @@ class MySQLDatabaseManager(DatabaseManager):
def abortTransaction(self, ttid): def abortTransaction(self, ttid):
ttid = util.u64(ttid) ttid = util.u64(ttid)
q = self.query q = self.query
sql = " FROM tobj WHERE tid=%s" % ttid q("DELETE FROM tobj WHERE tid=%s" % ttid)
data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
q("DELETE" + sql)
q("DELETE FROM ttrans WHERE ttid=%s" % ttid) q("DELETE FROM ttrans WHERE ttid=%s" % ttid)
self.releaseData(data_id_list, True)
def deleteTransaction(self, tid): def deleteTransaction(self, tid):
tid = util.u64(tid) tid = util.u64(tid)
......
# #
# Copyright (C) 2012-2016 Nexedi SA # Copyright (C) 2012-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -69,11 +69,6 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -69,11 +69,6 @@ class SQLiteDatabaseManager(DatabaseManager):
VERSION = 1 VERSION = 1
def __init__(self, *args, **kw):
super(SQLiteDatabaseManager, self).__init__(*args, **kw)
self._config = {}
self._connect()
def _parse(self, database): def _parse(self, database):
self.db = os.path.expanduser(database) self.db = os.path.expanduser(database)
...@@ -83,6 +78,7 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -83,6 +78,7 @@ class SQLiteDatabaseManager(DatabaseManager):
def _connect(self): def _connect(self):
logging.info('connecting to SQLite database %r', self.db) logging.info('connecting to SQLite database %r', self.db)
self.conn = sqlite3.connect(self.db, check_same_thread=False) self.conn = sqlite3.connect(self.db, check_same_thread=False)
self._config = {}
def _commit(self): def _commit(self):
retry_if_locked(self.conn.commit) retry_if_locked(self.conn.commit)
...@@ -376,6 +372,11 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -376,6 +372,11 @@ class SQLiteDatabaseManager(DatabaseManager):
packed, buffer(''.join(oid_list)), packed, buffer(''.join(oid_list)),
buffer(user), buffer(desc), buffer(ext), u64(ttid))) buffer(user), buffer(desc), buffer(ext), u64(ttid)))
def getOrphanList(self):
return [x for x, in self.query(
"SELECT id FROM data LEFT JOIN obj ON (id=data_id)"
" WHERE data_id IS NULL")]
def _pruneData(self, data_id_list): def _pruneData(self, data_id_list):
data_id_list = set(data_id_list).difference(self._uncommitted_data) data_id_list = set(data_id_list).difference(self._uncommitted_data)
if data_id_list: if data_id_list:
...@@ -385,6 +386,8 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -385,6 +386,8 @@ class SQLiteDatabaseManager(DatabaseManager):
% ",".join(map(str, data_id_list)))) % ",".join(map(str, data_id_list))))
q("DELETE FROM data WHERE id IN (%s)" q("DELETE FROM data WHERE id IN (%s)"
% ",".join(map(str, data_id_list))) % ",".join(map(str, data_id_list)))
return len(data_id_list)
return 0
def storeData(self, checksum, data, compression, def storeData(self, checksum, data, compression,
_dup=unique_constraint_message("data", "hash", "compression")): _dup=unique_constraint_message("data", "hash", "compression")):
...@@ -439,11 +442,8 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -439,11 +442,8 @@ class SQLiteDatabaseManager(DatabaseManager):
def abortTransaction(self, ttid): def abortTransaction(self, ttid):
args = util.u64(ttid), args = util.u64(ttid),
q = self.query q = self.query
sql = " FROM tobj WHERE tid=?" q("DELETE FROM tobj WHERE tid=?", args)
data_id_list = [x for x, in q("SELECT data_id" + sql, args) if x]
q("DELETE" + sql, args)
q("DELETE FROM ttrans WHERE ttid=?", args) q("DELETE FROM ttrans WHERE ttid=?", args)
self.releaseData(data_id_list, True)
def deleteTransaction(self, tid): def deleteTransaction(self, tid):
tid = util.u64(tid) tid = util.u64(tid)
......
# #
# Copyright (C) 2010-2016 Nexedi SA # Copyright (C) 2010-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import weakref
from neo.lib import logging from neo.lib import logging
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.exception import PrimaryFailure, StoppedOperation from neo.lib.exception import PrimaryFailure, StoppedOperation
...@@ -59,3 +60,7 @@ class BaseMasterHandler(EventHandler): ...@@ -59,3 +60,7 @@ class BaseMasterHandler(EventHandler):
def askFinalTID(self, conn, ttid): def askFinalTID(self, conn, ttid):
conn.answer(Packets.AnswerFinalTID(self.app.dm.getFinalTID(ttid))) conn.answer(Packets.AnswerFinalTID(self.app.dm.getFinalTID(ttid)))
def notifyRepair(self, conn, *args):
app = self.app
app.dm.repair(weakref.ref(app), *args)
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -77,7 +77,7 @@ class ClientOperationHandler(EventHandler): ...@@ -77,7 +77,7 @@ class ClientOperationHandler(EventHandler):
checksum, data, data_serial, unlock) checksum, data, data_serial, unlock)
except ConflictError, err: except ConflictError, err:
# resolvable or not # resolvable or not
conn.answer(Packets.AnswerStoreObject(1, oid, err.getTID())) conn.answer(Packets.AnswerStoreObject(1, oid, err.tid))
except DelayedError: except DelayedError:
# locked by a previous transaction, retry later # locked by a previous transaction, retry later
# If we are unlocking, we want queueEvent to raise # If we are unlocking, we want queueEvent to raise
...@@ -194,8 +194,7 @@ class ClientOperationHandler(EventHandler): ...@@ -194,8 +194,7 @@ class ClientOperationHandler(EventHandler):
self.app.tm.checkCurrentSerial(ttid, serial, oid) self.app.tm.checkCurrentSerial(ttid, serial, oid)
except ConflictError, err: except ConflictError, err:
# resolvable or not # resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(1, oid, conn.answer(Packets.AnswerCheckCurrentSerial(1, oid, err.tid))
err.getTID()))
except DelayedError: except DelayedError:
# locked by a previous transaction, retry later # locked by a previous transaction, retry later
try: try:
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -184,6 +184,10 @@ class StorageOperationHandler(EventHandler): ...@@ -184,6 +184,10 @@ class StorageOperationHandler(EventHandler):
if app.tm.isLockedTid(max_tid): if app.tm.isLockedTid(max_tid):
# Wow, backup cluster is fast. Requested transactions are still in # Wow, backup cluster is fast. Requested transactions are still in
# ttrans/ttobj so wait a little. # ttrans/ttobj so wait a little.
# This can also happen for internal replication, when
# NotifyTransactionFinished(M->S) + AskFetchTransactions(S->S)
# is faster than
# NotifyUnlockInformation(M->S)
app.queueEvent(self.askFetchTransactions, conn, app.queueEvent(self.askFetchTransactions, conn,
(partition, length, min_tid, max_tid, tid_list)) (partition, length, min_tid, max_tid, tid_list))
return return
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -92,6 +92,7 @@ class Replicator(object): ...@@ -92,6 +92,7 @@ class Replicator(object):
def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list): def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list):
"""This is a callback from MasterOperationHandler.""" """This is a callback from MasterOperationHandler."""
assert self.ttid_set.issubset(ttid_list), (self.ttid_set, ttid_list)
if ttid_list: if ttid_list:
self.ttid_set.update(ttid_list) self.ttid_set.update(ttid_list)
max_ttid = max(ttid_list) max_ttid = max(ttid_list)
......
This diff is collapsed.
This diff is collapsed.
from __future__ import print_function
import sys import sys
import smtplib import smtplib
import optparse import optparse
...@@ -34,13 +34,13 @@ class BenchmarkRunner(object): ...@@ -34,13 +34,13 @@ class BenchmarkRunner(object):
parser.add_option('', '--repeat', type='int', default=1) parser.add_option('', '--repeat', type='int', default=1)
self.add_options(parser) self.add_options(parser)
# check common arguments # check common arguments
options, self._args = parser.parse_args() options, args = parser.parse_args()
if bool(options.mail_to) ^ bool(options.mail_from): if bool(options.mail_to) ^ bool(options.mail_from):
sys.exit('Need a sender and recipients to mail report') sys.exit('Need a sender and recipients to mail report')
mail_server = options.mail_server or MAIL_SERVER mail_server = options.mail_server or MAIL_SERVER
# check specifics arguments # check specifics arguments
self._config = AttributeDict() self._config = AttributeDict()
self._config.update(self.load_options(options, self._args)) self._config.update(self.load_options(options, args))
self._config.update( self._config.update(
title = options.title or self.__class__.__name__, title = options.title or self.__class__.__name__,
mail_from = options.mail_from, mail_from = options.mail_from,
...@@ -87,7 +87,7 @@ class BenchmarkRunner(object): ...@@ -87,7 +87,7 @@ class BenchmarkRunner(object):
try: try:
s.sendmail(self._config.mail_from, recipient, mail) s.sendmail(self._config.mail_from, recipient, mail)
except smtplib.SMTPRecipientsRefused: except smtplib.SMTPRecipientsRefused:
print "Mail for %s fails" % recipient print("Mail for %s fails" % recipient)
s.close() s.close()
def run(self): def run(self):
...@@ -95,9 +95,10 @@ class BenchmarkRunner(object): ...@@ -95,9 +95,10 @@ class BenchmarkRunner(object):
report = self.build_report(report) report = self.build_report(report)
if self._config.mail_to: if self._config.mail_to:
self.send_report(subject, report) self.send_report(subject, report)
print subject print(subject)
print if report:
print report print()
print(report, end='')
def was_successful(self): def was_successful(self):
return self._successful return self._successful
......
This diff is collapsed.
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -15,12 +15,13 @@ ...@@ -15,12 +15,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import time, unittest import time, unittest
from mock import Mock from ..mock import Mock
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.client.app import ConnectionPool from neo.client.app import ConnectionPool
from neo.client.exception import NEOStorageError from neo.client.exception import NEOStorageError
from neo.client import pool from neo.client import pool
from neo.lib.util import p64
class ConnectionPoolTests(NeoUnitTestBase): class ConnectionPoolTests(NeoUnitTestBase):
...@@ -54,7 +55,7 @@ class ConnectionPoolTests(NeoUnitTestBase): ...@@ -54,7 +55,7 @@ class ConnectionPoolTests(NeoUnitTestBase):
def test_iterateForObject_noStorageAvailable(self): def test_iterateForObject_noStorageAvailable(self):
# no node available # no node available
oid = self.getOID(1) oid = p64(1)
app = Mock() app = Mock()
app.pt = Mock({'getCellList': []}) app.pt = Mock({'getCellList': []})
pool = ConnectionPool(app) pool = ConnectionPool(app)
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest import unittest
from mock import Mock from ..mock import Mock
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.client.handlers.master import PrimaryAnswersHandler from neo.client.handlers.master import PrimaryAnswersHandler
from neo.client.exception import NEOStorageError from neo.client.exception import NEOStorageError
......
#
# Copyright (C) 2009-2016 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from mock import Mock
from .. import NeoUnitTestBase
from neo.client.handlers.storage import StorageAnswersHandler
from neo.client.exception import NEOStorageError, NEOStorageNotFoundError
class StorageAnswerHandlerTests(NeoUnitTestBase):
def setUp(self):
super(StorageAnswerHandlerTests, self).setUp()
self.app = Mock()
self.handler = StorageAnswersHandler(self.app)
def _getAnswerStoreObjectHandler(self, object_stored_counter_dict,
conflict_serial_dict, resolved_conflict_serial_dict):
app = Mock({
'getHandlerData': {
'object_stored_counter_dict': object_stored_counter_dict,
'conflict_serial_dict': conflict_serial_dict,
'resolved_conflict_serial_dict': resolved_conflict_serial_dict,
}
})
return StorageAnswersHandler(app)
def test_answerStoreObject_1(self):
conn = self.getFakeConnection()
oid = self.getOID(0)
tid = self.getNextTID()
# conflict
object_stored_counter_dict = {oid: {}}
conflict_serial_dict = {}
resolved_conflict_serial_dict = {}
self._getAnswerStoreObjectHandler(object_stored_counter_dict,
conflict_serial_dict, resolved_conflict_serial_dict,
).answerStoreObject(conn, 1, oid, tid)
self.assertEqual(conflict_serial_dict[oid], {tid})
self.assertEqual(object_stored_counter_dict[oid], {})
self.assertFalse(oid in resolved_conflict_serial_dict)
# object was already accepted by another storage, raise
handler = self._getAnswerStoreObjectHandler({oid: {tid: {1}}}, {}, {})
self.assertRaises(NEOStorageError, handler.answerStoreObject,
conn, 1, oid, tid)
def test_answerStoreObject_2(self):
conn = self.getFakeConnection()
oid = self.getOID(0)
tid = self.getNextTID()
tid_2 = self.getNextTID()
# resolution-pending conflict
object_stored_counter_dict = {oid: {}}
conflict_serial_dict = {oid: {tid}}
resolved_conflict_serial_dict = {}
self._getAnswerStoreObjectHandler(object_stored_counter_dict,
conflict_serial_dict, resolved_conflict_serial_dict,
).answerStoreObject(conn, 1, oid, tid)
self.assertEqual(conflict_serial_dict[oid], {tid})
self.assertFalse(oid in resolved_conflict_serial_dict)
self.assertEqual(object_stored_counter_dict[oid], {})
# object was already accepted by another storage, raise
handler = self._getAnswerStoreObjectHandler({oid: {tid: {1}}},
{oid: {tid}}, {})
self.assertRaises(NEOStorageError, handler.answerStoreObject,
conn, 1, oid, tid)
# detected conflict is different, don't raise
self._getAnswerStoreObjectHandler({oid: {}}, {oid: {tid}}, {},
).answerStoreObject(conn, 1, oid, tid_2)
def test_answerStoreObject_3(self):
conn = self.getFakeConnection()
oid = self.getOID(0)
tid = self.getNextTID()
tid_2 = self.getNextTID()
# already-resolved conflict
# This case happens if a storage is answering a store action for which
# any other storage already answered (with same conflict) and any other
# storage accepted the resolved object.
object_stored_counter_dict = {oid: {tid_2: 1}}
conflict_serial_dict = {}
resolved_conflict_serial_dict = {oid: {tid}}
self._getAnswerStoreObjectHandler(object_stored_counter_dict,
conflict_serial_dict, resolved_conflict_serial_dict,
).answerStoreObject(conn, 1, oid, tid)
self.assertFalse(oid in conflict_serial_dict)
self.assertEqual(resolved_conflict_serial_dict[oid], {tid})
self.assertEqual(object_stored_counter_dict[oid], {tid_2: 1})
# detected conflict is different, don't raise
self._getAnswerStoreObjectHandler({oid: {tid: 1}}, {},
{oid: {tid}}).answerStoreObject(conn, 1, oid, tid_2)
def test_tidNotFound(self):
conn = self.getFakeConnection()
self.assertRaises(NEOStorageNotFoundError, self.handler.tidNotFound,
conn, 'message')
if __name__ == '__main__':
unittest.main()
# #
# Copyright (C) 2011-2016 Nexedi SA # Copyright (C) 2011-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2014-2016 Nexedi SA # Copyright (C) 2014-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -424,6 +424,18 @@ class NEOCluster(object): ...@@ -424,6 +424,18 @@ class NEOCluster(object):
if not pdb.wait(test, MAX_START_TIME): if not pdb.wait(test, MAX_START_TIME):
raise AssertionError('Timeout when starting cluster') raise AssertionError('Timeout when starting cluster')
def startCluster(self):
# Even if the storage nodes are in the expected state, there may still
# be activity between them and the master, preventing the cluster to
# start.
def start(last_try):
try:
self.neoctl.startCluster()
except (NotReadyException, RuntimeError), e:
return False, e
return True, None
self.expectCondition(start)
def stop(self, clients=True): def stop(self, clients=True):
# Suspend all processes to kill before actually killing them, so that # Suspend all processes to kill before actually killing them, so that
# nodes don't log errors because they get disconnected from other nodes: # nodes don't log errors because they get disconnected from other nodes:
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -267,8 +267,8 @@ class ClientTests(NEOFunctionalTest): ...@@ -267,8 +267,8 @@ class ClientTests(NEOFunctionalTest):
db2, conn2 = self.neo.getZODBConnection() db2, conn2 = self.neo.getZODBConnection()
st1, st2 = conn1._storage, conn2._storage st1, st2 = conn1._storage, conn2._storage
t1, t2 = transaction.Transaction(), transaction.Transaction() t1, t2 = transaction.Transaction(), transaction.Transaction()
t1.user = t2.user = 'user' t1.user = t2.user = u'user'
t1.description = t2.description = 'desc' t1.description = t2.description = u'desc'
oid = st1.new_oid() oid = st1.new_oid()
rev = '\0' * 8 rev = '\0' * 8
data = zodb_pickle(PObject()) data = zodb_pickle(PObject())
...@@ -311,8 +311,8 @@ class ClientTests(NEOFunctionalTest): ...@@ -311,8 +311,8 @@ class ClientTests(NEOFunctionalTest):
db2, conn2 = self.neo.getZODBConnection() db2, conn2 = self.neo.getZODBConnection()
st1, st2 = conn1._storage, conn2._storage st1, st2 = conn1._storage, conn2._storage
t1, t2 = transaction.Transaction(), transaction.Transaction() t1, t2 = transaction.Transaction(), transaction.Transaction()
t1.user = t2.user = 'user' t1.user = t2.user = u'user'
t1.description = t2.description = 'desc' t1.description = t2.description = u'desc'
oid = st1.new_oid() oid = st1.new_oid()
rev = '\0' * 8 rev = '\0' * 8
data = zodb_pickle(PObject()) data = zodb_pickle(PObject())
...@@ -330,8 +330,8 @@ class ClientTests(NEOFunctionalTest): ...@@ -330,8 +330,8 @@ class ClientTests(NEOFunctionalTest):
db3, conn3 = self.neo.getZODBConnection() db3, conn3 = self.neo.getZODBConnection()
st3 = conn3._storage st3 = conn3._storage
t3 = transaction.Transaction() t3 = transaction.Transaction()
t3.user = 'user' t3.user = u'user'
t3.description = 'desc' t3.description = u'desc'
st3.tpc_begin(t3) st3.tpc_begin(t3)
# retrieve the last revision # retrieve the last revision
data, serial = st3.load(oid) data, serial = st3.load(oid)
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -31,7 +31,6 @@ class ClusterTests(NEOFunctionalTest): ...@@ -31,7 +31,6 @@ class ClusterTests(NEOFunctionalTest):
def testClusterStartup(self): def testClusterStartup(self):
neo = self.neo = NEOCluster(['test_neo1', 'test_neo2'], replicas=1, neo = self.neo = NEOCluster(['test_neo1', 'test_neo2'], replicas=1,
temp_dir=self.getTempDirectory()) temp_dir=self.getTempDirectory())
neoctl = neo.neoctl
neo.run() neo.run()
# Runing a new cluster doesn't exit Recovery state. # Runing a new cluster doesn't exit Recovery state.
s1, s2 = neo.getStorageProcessList() s1, s2 = neo.getStorageProcessList()
...@@ -40,7 +39,7 @@ class ClusterTests(NEOFunctionalTest): ...@@ -40,7 +39,7 @@ class ClusterTests(NEOFunctionalTest):
neo.expectClusterRecovering() neo.expectClusterRecovering()
# When allowing cluster to exit Recovery, it reaches Running state and # When allowing cluster to exit Recovery, it reaches Running state and
# all present storage nodes reach running state. # all present storage nodes reach running state.
neoctl.startCluster() neo.startCluster()
neo.expectRunning(s1) neo.expectRunning(s1)
neo.expectRunning(s2) neo.expectRunning(s2)
neo.expectClusterRunning() neo.expectClusterRunning()
...@@ -64,7 +63,7 @@ class ClusterTests(NEOFunctionalTest): ...@@ -64,7 +63,7 @@ class ClusterTests(NEOFunctionalTest):
neo.expectPending(s1) neo.expectPending(s1)
neo.expectUnknown(s2) neo.expectUnknown(s2)
neo.expectClusterRecovering() neo.expectClusterRecovering()
neoctl.startCluster() neo.startCluster()
neo.expectRunning(s1) neo.expectRunning(s1)
neo.expectUnknown(s2) neo.expectUnknown(s2)
neo.expectClusterRunning() neo.expectClusterRunning()
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -14,14 +14,12 @@ ...@@ -14,14 +14,12 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import unittest import unittest
import transaction import transaction
from persistent import Persistent from persistent import Persistent
from . import NEOCluster, NEOFunctionalTest from . import NEOCluster, NEOFunctionalTest
from neo.lib.protocol import ClusterStates, NodeStates from neo.lib.protocol import ClusterStates, NodeStates
from ZODB.tests.StorageTestBase import zodb_pickle
class PObject(Persistent): class PObject(Persistent):
...@@ -421,47 +419,5 @@ class StorageTests(NEOFunctionalTest): ...@@ -421,47 +419,5 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectClusterRunning() self.neo.expectClusterRunning()
self.neo.expectOudatedCells(number=0) self.neo.expectOudatedCells(number=0)
def testReplicationBlockedByUnfinished(self):
# start a cluster with 1 of 2 storages and a replica
(started, stopped) = self.__setup(storage_number=2, replicas=1,
pending_number=1, partitions=10)
self.neo.expectRunning(started[0])
self.neo.expectStorageNotKnown(stopped[0])
self.neo.expectOudatedCells(number=0)
self.neo.expectClusterRunning()
self.__populate()
self.neo.expectOudatedCells(number=0)
# start a transaction that will block the end of the replication
db, conn = self.neo.getZODBConnection()
st = conn._storage
t = transaction.Transaction()
t.user = 'user'
t.description = 'desc'
oid = st.new_oid()
rev = '\0' * 8
data = zodb_pickle(PObject(42))
st.tpc_begin(t)
st.store(oid, rev, data, '', t)
# start the outdated storage
stopped[0].start()
self.neo.expectPending(stopped[0])
self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
self.neo.neoctl.tweakPartitionTable()
self.neo.expectRunning(stopped[0])
self.neo.expectClusterRunning()
self.neo.expectAssignedCells(started[0], 10)
self.neo.expectAssignedCells(stopped[0], 10)
# wait a bit, replication must not happen. This hack is required
# because we cannot gather informations directly from the storages
time.sleep(10)
self.neo.expectOudatedCells(number=10)
# finish the transaction, the replication must happen and finish
st.tpc_vote(t)
st.tpc_finish(t)
self.neo.expectOudatedCells(number=0, timeout=10)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -15,8 +15,9 @@ ...@@ -15,8 +15,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest import unittest
from mock import Mock from ..mock import Mock
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.lib.util import p64
from neo.lib.protocol import NodeTypes, NodeStates, Packets from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.master.handlers.client import ClientServiceHandler from neo.master.handlers.client import ClientServiceHandler
from neo.master.app import Application from neo.master.app import Application
...@@ -62,6 +63,9 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -62,6 +63,9 @@ class MasterClientHandlerTests(NeoUnitTestBase):
) )
return uuid return uuid
def checkAnswerBeginTransaction(self, conn):
return self.checkAnswerPacket(conn, Packets.AnswerBeginTransaction)
# Tests # Tests
def test_07_askBeginTransaction(self): def test_07_askBeginTransaction(self):
tid1 = self.getNextTID() tid1 = self.getNextTID()
...@@ -87,12 +91,12 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -87,12 +91,12 @@ class MasterClientHandlerTests(NeoUnitTestBase):
calls = tm.mockGetNamedCalls('begin') calls = tm.mockGetNamedCalls('begin')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(client_node, None) calls[0].checkArgs(client_node, None)
args = self.checkAnswerBeginTransaction(conn, decode=True) packet = self.checkAnswerBeginTransaction(conn)
self.assertEqual(args, (tid1, )) self.assertEqual(packet.decode(), (tid1, ))
def test_08_askNewOIDs(self): def test_08_askNewOIDs(self):
service = self.service service = self.service
oid1, oid2 = self.getOID(1), self.getOID(2) oid1, oid2 = p64(1), p64(2)
self.app.tm.setLastOID(oid1) self.app.tm.setLastOID(oid1)
# client call it # client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port) client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
...@@ -136,7 +140,7 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -136,7 +140,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
self.app.setStorageReady(storage_uuid) self.app.setStorageReady(storage_uuid)
self.assertTrue(self.app.isStorageReady(storage_uuid)) self.assertTrue(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, ttid, (), ()) service.askFinishTransaction(conn, ttid, (), ())
self.checkAskLockInformation(storage_conn) self.checkAskPacket(storage_conn, Packets.AskLockInformation)
self.assertEqual(len(self.app.tm.registerForNotification(storage_uuid)), 1) self.assertEqual(len(self.app.tm.registerForNotification(storage_uuid)), 1)
txn = self.app.tm[ttid] txn = self.app.tm[ttid]
pending_ttid = list(self.app.tm.registerForNotification(storage_uuid))[0] pending_ttid = list(self.app.tm.registerForNotification(storage_uuid))[0]
...@@ -170,8 +174,7 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -170,8 +174,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn) self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
self.service.askPack(conn, tid) self.service.askPack(conn, tid)
self.checkNoPacketSent(conn) self.checkNoPacketSent(conn)
ptid = self.checkAskPacket(storage_conn, Packets.AskPack, ptid = self.checkAskPacket(storage_conn, Packets.AskPack).decode()[0]
decode=True)[0]
self.assertEqual(ptid, tid) self.assertEqual(ptid, tid)
self.assertTrue(self.app.packing[0] is conn) self.assertTrue(self.app.packing[0] is conn)
self.assertEqual(self.app.packing[1], peer_id) self.assertEqual(self.app.packing[1], peer_id)
...@@ -183,8 +186,7 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -183,8 +186,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn) self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
self.service.askPack(conn, tid) self.service.askPack(conn, tid)
self.checkNoPacketSent(storage_conn) self.checkNoPacketSent(storage_conn)
status = self.checkAnswerPacket(conn, Packets.AnswerPack, status = self.checkAnswerPacket(conn, Packets.AnswerPack).decode()[0]
decode=True)[0]
self.assertFalse(status) self.assertFalse(status)
if __name__ == '__main__': if __name__ == '__main__':
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest import unittest
from mock import Mock from ..mock import Mock
from neo.lib import protocol from neo.lib import protocol
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.lib.protocol import NodeTypes, NodeStates from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.master.handlers.election import ClientElectionHandler, \ from neo.master.handlers.election import ClientElectionHandler, \
ServerElectionHandler ServerElectionHandler
from neo.master.app import Application from neo.master.app import Application
...@@ -48,6 +48,9 @@ class MasterClientElectionTestBase(NeoUnitTestBase): ...@@ -48,6 +48,9 @@ class MasterClientElectionTestBase(NeoUnitTestBase):
node.setConnection(conn) node.setConnection(conn)
return (node, conn) return (node, conn)
def checkAcceptIdentification(self, conn):
return self.checkAnswerPacket(conn, Packets.AcceptIdentification)
class MasterClientElectionTests(MasterClientElectionTestBase): class MasterClientElectionTests(MasterClientElectionTestBase):
def setUp(self): def setUp(self):
...@@ -91,7 +94,7 @@ class MasterClientElectionTests(MasterClientElectionTestBase): ...@@ -91,7 +94,7 @@ class MasterClientElectionTests(MasterClientElectionTestBase):
self.election.connectionCompleted(conn) self.election.connectionCompleted(conn)
self._checkUnconnected(node) self._checkUnconnected(node)
self.assertTrue(node.isUnknown()) self.assertTrue(node.isUnknown())
self.checkRequestIdentification(conn) self.checkAskPacket(conn, Packets.RequestIdentification)
def _setNegociating(self, node): def _setNegociating(self, node):
self._checkUnconnected(node) self._checkUnconnected(node)
...@@ -252,9 +255,8 @@ class MasterServerElectionTests(MasterClientElectionTestBase): ...@@ -252,9 +255,8 @@ class MasterServerElectionTests(MasterClientElectionTestBase):
self.election.requestIdentification(conn, self.election.requestIdentification(conn,
NodeTypes.MASTER, *args) NodeTypes.MASTER, *args)
self.checkUUIDSet(conn, node.getUUID()) self.checkUUIDSet(conn, node.getUUID())
args = self.checkAcceptIdentification(conn, decode=True)
(node_type, uuid, partitions, replicas, new_uuid, primary_uuid, (node_type, uuid, partitions, replicas, new_uuid, primary_uuid,
master_list) = args master_list) = self.checkAcceptIdentification(conn).decode()
self.assertEqual(node.getUUID(), new_uuid) self.assertEqual(node.getUUID(), new_uuid)
self.assertNotEqual(node.getUUID(), uuid) self.assertNotEqual(node.getUUID(), uuid)
...@@ -290,7 +292,7 @@ class MasterServerElectionTests(MasterClientElectionTestBase): ...@@ -290,7 +292,7 @@ class MasterServerElectionTests(MasterClientElectionTestBase):
None, None,
) )
node_type, uuid, partitions, replicas, _peer_uuid, primary, \ node_type, uuid, partitions, replicas, _peer_uuid, primary, \
master_list = self.checkAcceptIdentification(conn, decode=True) master_list = self.checkAcceptIdentification(conn).decode()
self.assertEqual(node_type, NodeTypes.MASTER) self.assertEqual(node_type, NodeTypes.MASTER)
self.assertEqual(uuid, self.app.uuid) self.assertEqual(uuid, self.app.uuid)
self.assertEqual(partitions, self.app.pt.getPartitions()) self.assertEqual(partitions, self.app.pt.getPartitions())
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
import unittest import unittest
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.lib.protocol import Packets
from neo.master.app import Application from neo.master.app import Application
class MasterAppTests(NeoUnitTestBase): class MasterAppTests(NeoUnitTestBase):
...@@ -31,6 +32,9 @@ class MasterAppTests(NeoUnitTestBase): ...@@ -31,6 +32,9 @@ class MasterAppTests(NeoUnitTestBase):
self.app.close() self.app.close()
NeoUnitTestBase._tearDown(self, success) NeoUnitTestBase._tearDown(self, success)
def checkNotifyNodeInformation(self, conn):
return self.checkNotifyPacket(conn, Packets.NotifyNodeInformation)
def test_06_broadcastNodeInformation(self): def test_06_broadcastNodeInformation(self):
# defined some nodes to which data will be send # defined some nodes to which data will be send
master_uuid = self.getMasterUUID() master_uuid = self.getMasterUUID()
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest import unittest
from mock import Mock from ..mock import Mock
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.lib.protocol import NodeTypes, Packets from neo.lib.protocol import NodeTypes, Packets
from neo.master.handlers.storage import StorageServiceHandler from neo.master.handlers.storage import StorageServiceHandler
...@@ -71,10 +71,9 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -71,10 +71,9 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
self.checkNoPacketSent(client_conn) self.checkNoPacketSent(client_conn)
self.assertEqual(self.app.packing[2], {conn2.getUUID()}) self.assertEqual(self.app.packing[2], {conn2.getUUID()})
self.service.answerPack(conn2, False) self.service.answerPack(conn2, False)
status = self.checkAnswerPacket(client_conn, Packets.AnswerPack, packet = self.checkAnswerPacket(client_conn, Packets.AnswerPack)
decode=True)[0]
# TODO: verify packet peer id # TODO: verify packet peer id
self.assertTrue(status) self.assertTrue(packet.decode()[0])
self.assertEqual(self.app.packing, None) self.assertEqual(self.app.packing, None)
if __name__ == '__main__': if __name__ == '__main__':
......
# #
# Copyright (C) 2006-2016 Nexedi SA # Copyright (C) 2006-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest import unittest
from mock import Mock from ..mock import Mock
from struct import pack from struct import pack
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.lib.protocol import NodeTypes from neo.lib.protocol import NodeTypes
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -15,12 +15,12 @@ ...@@ -15,12 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest import unittest
from mock import Mock, ReturnValues from ..mock import Mock, ReturnValues
from collections import deque
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.storage.app import Application from neo.storage.app import Application
from neo.storage.handlers.client import ClientOperationHandler from neo.storage.handlers.client import ClientOperationHandler
from neo.lib.protocol import INVALID_TID, INVALID_OID, Packets, LockState from neo.lib.util import p64
from neo.lib.protocol import INVALID_TID, Packets, LockState
class StorageClientHandlerTests(NeoUnitTestBase): class StorageClientHandlerTests(NeoUnitTestBase):
...@@ -30,11 +30,6 @@ class StorageClientHandlerTests(NeoUnitTestBase): ...@@ -30,11 +30,6 @@ class StorageClientHandlerTests(NeoUnitTestBase):
# create an application object # create an application object
config = self.getStorageConfiguration(master_number=1) config = self.getStorageConfiguration(master_number=1)
self.app = Application(config) self.app = Application(config)
self.app.transaction_dict = {}
self.app.store_lock_dict = {}
self.app.load_lock_dict = {}
self.app.event_queue = deque()
self.app.event_queue_dict = {}
self.app.tm = Mock({'__contains__': True}) self.app.tm = Mock({'__contains__': True})
# handler # handler
self.operation = ClientOperationHandler(self.app) self.operation = ClientOperationHandler(self.app)
...@@ -59,19 +54,6 @@ class StorageClientHandlerTests(NeoUnitTestBase): ...@@ -59,19 +54,6 @@ class StorageClientHandlerTests(NeoUnitTestBase):
self.operation.askTransactionInformation(conn, INVALID_TID) self.operation.askTransactionInformation(conn, INVALID_TID)
self.checkErrorPacket(conn) self.checkErrorPacket(conn)
def test_24_askObject1(self):
# delayed response
conn = self._getConnection()
self.app.dm = Mock()
self.app.tm = Mock({'loadLocked': True})
self.app.load_lock_dict[INVALID_OID] = object()
self.assertEqual(len(self.app.event_queue), 0)
self.operation.askObject(conn, oid=INVALID_OID,
serial=INVALID_TID, tid=INVALID_TID)
self.assertEqual(len(self.app.event_queue), 1)
self.checkNoPacketSent(conn)
self.assertEqual(len(self.app.dm.mockGetNamedCalls('getObject')), 0)
def test_25_askTIDs1(self): def test_25_askTIDs1(self):
# invalid offsets => error # invalid offsets => error
app = self.app app = self.app
...@@ -91,7 +73,7 @@ class StorageClientHandlerTests(NeoUnitTestBase): ...@@ -91,7 +73,7 @@ class StorageClientHandlerTests(NeoUnitTestBase):
calls = self.app.dm.mockGetNamedCalls('getTIDList') calls = self.app.dm.mockGetNamedCalls('getTIDList')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(1, 1, [1, ]) calls[0].checkArgs(1, 1, [1, ])
self.checkAnswerTids(conn) self.checkAnswerPacket(conn, Packets.AnswerTIDs)
def test_26_askObjectHistory1(self): def test_26_askObjectHistory1(self):
# invalid offsets => error # invalid offsets => error
...@@ -108,8 +90,7 @@ class StorageClientHandlerTests(NeoUnitTestBase): ...@@ -108,8 +90,7 @@ class StorageClientHandlerTests(NeoUnitTestBase):
ltid = self.getNextTID() ltid = self.getNextTID()
undone_tid = self.getNextTID() undone_tid = self.getNextTID()
# Keep 2 entries here, so we check findUndoTID is called only once. # Keep 2 entries here, so we check findUndoTID is called only once.
oid_list = [self.getOID(1), self.getOID(2)] oid_list = map(p64, (1, 2))
obj2_data = [] # Marker
self.app.tm = Mock({ self.app.tm = Mock({
'getObjectFromTransaction': None, 'getObjectFromTransaction': None,
}) })
...@@ -134,7 +115,7 @@ class StorageClientHandlerTests(NeoUnitTestBase): ...@@ -134,7 +115,7 @@ class StorageClientHandlerTests(NeoUnitTestBase):
conn = self._getConnection() conn = self._getConnection()
self.operation.askHasLock(conn, tid_1, oid) self.operation.askHasLock(conn, tid_1, oid)
p_oid, p_status = self.checkAnswerPacket(conn, p_oid, p_status = self.checkAnswerPacket(conn,
Packets.AnswerHasLock, decode=True) Packets.AnswerHasLock).decode()
self.assertEqual(oid, p_oid) self.assertEqual(oid, p_oid)
self.assertEqual(status, p_status) self.assertEqual(status, p_status)
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest import unittest
from mock import Mock from ..mock import Mock
from collections import deque from collections import deque
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.storage.app import Application from neo.storage.app import Application
......
# #
# Copyright (C) 2009-2016 Nexedi SA # Copyright (C) 2009-2017 Nexedi SA
# #
# This program is free software; you can redistribute it and/or # This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License # modify it under the terms of the GNU General Public License
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest import unittest
from mock import Mock from ..mock import Mock
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.storage.app import Application from neo.storage.app import Application
from neo.lib.protocol import CellStates from neo.lib.protocol import CellStates
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment