Commit 446cc613 authored by Carlos Ramos Carreño's avatar Carlos Ramos Carreño

Allow importing from Python 3.

Do the necessary changes so that the main module can be imported inside
a Python 3 project without errors.
Note that the project will still not work in Python 3, but a project
that imports NEO unconditionally and does not use it in Python 3, such
as wendelin.core, won't have problems.

The changes here are mostly:
  - Replacing old-style excepts (`except Exception, e:`) with the new
  syntax (`except Exception as e:`), compatible with both Python 2 and
  3.
  - Removing calls to `@apply` and replacing them by an explicit call
  to the class/method.
  - Using `six` for some modules that were renamed (e.g. `thread` is
  now `_thread`), as well as some renamed methods `iteritems` is now
  `items`.
  - Replacing `raise` with a tuple of two elements with the appropriate
  call to the exception constructor.
  - Using `six` for achieving a `reraise` with an existing traceback,
  as the old system (raising a tuple of three elements) cannot be used
  in Python 3.
  - I also changed the `requires_extra` for `tests` to include all other
  extras as intended, because the way it was being done did not work
  for me.

The changes are intended to be as surgical and non-controversial as
possible, in order to facilitate review and acceptance.
I can try to achieve full compatibility with Python 3 in future MRs, if
that is desired.
I ran the unittests locally in Python 2 and they seemed to work.
parent 3ddb6663
......@@ -353,7 +353,7 @@ class Application(ThreadedApplication):
return askStorage(conn, packet)
except ConnectionClosed:
pass
except NEOStorageReadRetry, e:
except NEOStorageReadRetry as e:
if e.args[0]:
continue
failed += 1
......
......@@ -36,7 +36,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
def notPrimaryMaster(self, *args):
try:
super(PrimaryNotificationsHandler, self).notPrimaryMaster(*args)
except PrimaryElected, e:
except PrimaryElected as e:
self.app.primary_master_node, = e.args
def answerLastTransaction(self, conn, ltid):
......
......@@ -30,13 +30,14 @@ from ..exception import (
NEOStorageReadRetry, NEOStorageDoesNotExistError,
)
@apply
class _DeadlockPacket(object):
handler_method_name = 'notifyDeadlock'
_args = ()
getId = int
DEADLOCK_PACKET = _DeadlockPacket()
class StorageEventHandler(MTEventHandler):
def _acceptIdentification(*args):
......@@ -45,7 +46,7 @@ class StorageEventHandler(MTEventHandler):
def notifyDeadlock(self, conn, ttid, oid):
for txn_context in self.app.txn_contexts():
if txn_context.ttid == ttid:
txn_context.queue.put((conn, _DeadlockPacket, {'oid': oid}))
txn_context.queue.put((conn, DEADLOCK_PACKET, {'oid': oid}))
break
class StorageBootstrapHandler(AnswerBaseHandler):
......
......@@ -16,7 +16,7 @@
import argparse, os, sys
from functools import wraps
from ConfigParser import SafeConfigParser
from six.moves.configparser import SafeConfigParser
class _DefaultList(list):
"""
......
......@@ -36,11 +36,12 @@ except OutOfData: # fallback implementation
msgpack.Unpacker.read_bytes = read_bytes
del read_bytes
@apply
class dummy_read_buffer(msgpack.Unpacker):
class DummyReadBuffer(msgpack.Unpacker):
def feed(self, _):
pass
dummy_read_buffer = DummyReadBuffer()
class ConnectionClosed(Exception):
pass
......@@ -497,7 +498,7 @@ class Connection(BaseConnection):
self._queue.append(packet)
except ConnectorException:
self._closure()
except PacketMalformedError, e:
except PacketMalformedError as e:
logging.error('malformed packet from %r: %s', self, e)
self._closure()
return empty_queue and not not self._queue
......@@ -651,7 +652,7 @@ class ClientConnection(Connection):
def _connect(self):
try:
connected = self.connector.makeClientConnection()
except ConnectorDelayedConnection, c:
except ConnectorDelayedConnection as c:
connect_limit, = c.args
self.getTimeout = lambda: connect_limit
self.onTimeout = self._delayedConnect
......@@ -749,7 +750,7 @@ class MTClientConnection(ClientConnection):
super(MTClientConnection, self).__init__(*args, **kwargs)
# Alias without lock (cheaper than super())
_ask = ClientConnection.ask.__func__
_ask = ClientConnection.ask
def ask(self, packet, queue=None, **kw):
with self.lock:
......
......@@ -20,6 +20,7 @@ import errno
from time import time
from . import logging
from .protocol import HANDSHAKE_PACKET
import six
# Global connector registry.
# Fill by calling registerConnectorHandler.
......@@ -114,7 +115,7 @@ class SocketConnector(object):
self.is_server = self.is_closed = False
try:
self._connect(addr)
except socket.error, e:
except socket.error as e:
if e.errno == errno.EINPROGRESS:
return False
self._error('connect', e)
......@@ -127,7 +128,7 @@ class SocketConnector(object):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._bind(self.addr)
self.socket.listen(self.SOMAXCONN)
except socket.error, e:
except socket.error as e:
self.is_closed = True
self.socket.close()
self._error('listen', e)
......@@ -161,13 +162,13 @@ class SocketConnector(object):
s, addr = self.socket.accept()
s = self.__class__(addr, s)
return s, s.addr
except socket.error, e:
except socket.error as e:
self._error('accept', e)
def receive(self, read_buf):
try:
data = self.socket.recv(65536)
except socket.error, e:
except socket.error as e:
self._error('recv', e)
if data:
read_buf.feed(data)
......@@ -184,7 +185,7 @@ class SocketConnector(object):
if msg:
try:
n = self.socket.send(msg)
except socket.error, e:
except socket.error as e:
self._error('send', e)
# Do nothing special if n == 0:
# - it never happens for simple sockets;
......@@ -210,7 +211,7 @@ class SocketConnector(object):
pass
try:
self.socket.shutdown(socket.SHUT_RDWR)
except socket.error, e:
except socket.error as e:
if e.errno != errno.ENOTCONN:
raise
return self.socket.close
......@@ -257,9 +258,9 @@ registerConnectorHandler(SocketConnectorIPv6)
def overlay_connector_class(cls):
name = cls.__name__[1:]
alias = name + 'ConnectorClass'
for base in connector_registry.itervalues():
for base in six.itervalues(connector_registry):
setattr(base, alias, type(name + base.__name__,
cls.__bases__ + (base,), cls.__dict__))
(base,) + cls.__bases__, dict(cls.__dict__)))
return cls
@overlay_connector_class
......@@ -283,7 +284,7 @@ class _SSL:
read_buf.feed(data)
except ssl.SSLWantReadError:
pass
except socket.error, e:
except socket.error as e:
self._error('recv', e)
@overlay_connector_class
......@@ -320,7 +321,7 @@ class _SSLHandshake(_SSL):
return read_buf is None
except ssl.SSLWantWriteError:
return read_buf is not None
except socket.error, e:
except socket.error as e:
# OpenSSL 1.1 may raise socket.error(0)
# where previous versions raised SSLEOFError.
self._error('send' if read_buf is None else 'recv',
......@@ -346,7 +347,7 @@ class _SSLHandshake(_SSL):
def receive(self, read_buf):
try:
content_type = self.socket._sock.recv(1, socket.MSG_PEEK)
except socket.error, e:
except socket.error as e:
self._error('recv', e)
if content_type == '\26': # handshake
return self.send(read_buf)
......
......@@ -125,8 +125,8 @@ class PdbSocket(object):
try:
self._socket.recv(0)
return True
except socket.error, (err, _):
if err != errno.EAGAIN:
except socket.error as e:
if e.errno != errno.EAGAIN:
raise
self._socket.setblocking(1)
return False
......@@ -19,7 +19,6 @@ from .locking import Lock, Empty
EMPTY = {}
NOBODY = []
@apply
class _ConnectionClosed(object):
handler_method_name = 'connectionClosed'
......@@ -29,6 +28,8 @@ class _ConnectionClosed(object):
def __eq__(self, other):
return True
CONNECTION_CLOSED = _ConnectionClosed()
def giant_lock(func):
def wrapped(self, *args, **kw):
self.lock_acquire()
......@@ -97,7 +98,7 @@ class Dispatcher:
continue
queue_id = id(queue)
if queue_id not in notified_set:
queue.put((conn, _ConnectionClosed, EMPTY))
queue.put((conn, CONNECTION_CLOSED, EMPTY))
notified_set.add(queue_id)
_decrefQueue(queue)
finally:
......
......@@ -24,8 +24,7 @@ from errno import EAGAIN, EEXIST, EINTR, ENOENT
from . import logging
from .locking import Lock
@apply
def dictionary_changed_size_during_iteration():
def get_dictionary_changed_size_during_iteration_msg():
d = {}; i = iter(d); d[0] = 0
try:
next(i)
......@@ -33,6 +32,8 @@ def dictionary_changed_size_during_iteration():
return str(e)
raise AssertionError
dictionary_changed_size_during_iteration = get_dictionary_changed_size_during_iteration_msg()
def nonblock(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
......@@ -107,7 +108,7 @@ class EpollEventManager(object):
try:
return [x for x in self.connection_dict.itervalues()
if not x.isAborted()]
except RuntimeError, e:
except RuntimeError as e:
if str(e) != dictionary_changed_size_during_iteration:
raise
logging.info("%r", e)
......@@ -161,7 +162,7 @@ class EpollEventManager(object):
self.epoll.unregister(fd)
except KeyError:
pass
except IOError, e:
except IOError as e:
if e.errno != ENOENT:
raise
else:
......@@ -214,7 +215,7 @@ class EpollEventManager(object):
timeout = t
timeout_object = conn
break
except RuntimeError, e:
except RuntimeError as e:
if str(e) != dictionary_changed_size_during_iteration:
raise
logging.info("%r", e)
......@@ -239,7 +240,7 @@ class EpollEventManager(object):
self._closeAcquire()
try:
event_list = poll(blocking)
except IOError, exc:
except IOError as exc:
if exc.errno in (0, EAGAIN):
logging.info('epoll.poll triggered undocumented error %r',
exc.errno)
......@@ -306,7 +307,7 @@ class EpollEventManager(object):
self._trigger_list += actions
try:
os.write(self._wakeup_wfd, '\0')
except OSError, e:
except OSError as e:
# Ignore if wakeup fd is triggered many times in a row.
if e.errno != EAGAIN:
raise
......
......@@ -23,7 +23,7 @@ from .exception import (BackendNotImplemented, NonReadableCell, NotReadyError,
PacketMalformedError, PrimaryElected, ProtocolError, UnexpectedPacketError)
from .protocol import NodeStates, NodeTypes, Packets, uuid_str, Errors
from .util import cached_property
from six import reraise
class AnswerDenied(Exception):
"""Helper exception to stop packet processing and answer a Denied error"""
......@@ -72,32 +72,32 @@ class EventHandler(object):
raise UnexpectedPacketError('no handler found')
args = packet._args
method(conn, *args, **kw)
except DelayEvent, e:
except DelayEvent as e:
assert not kw, kw
self.getEventQueue().queueEvent(method, conn, args, *e.args)
except UnexpectedPacketError, e:
except UnexpectedPacketError as e:
if not conn.isClosed():
self.__unexpectedPacket(conn, packet, *e.args)
except NotReadyError, message:
except NotReadyError as message:
if not conn.isClosed():
if not message.args:
message = 'Retry Later'
message = str(message)
conn.answer(Errors.NotReady(message))
conn.abort()
except ProtocolError, message:
except ProtocolError as message:
if not conn.isClosed():
message = str(message)
conn.answer(Errors.ProtocolError(message))
conn.abort()
except BackendNotImplemented, message:
except BackendNotImplemented as message:
m = message[0]
conn.answer(Errors.BackendNotImplemented(
"%s.%s does not implement %s"
% (m.im_class.__module__, m.im_class.__name__, m.__name__)))
except NonReadableCell, e:
except NonReadableCell as e:
conn.answer(Errors.NonReadableCell())
except AnswerDenied, e:
except AnswerDenied as e:
conn.answer(Errors.Denied(str(e)))
except AssertionError:
e = sys.exc_info()
......@@ -106,7 +106,7 @@ class EventHandler(object):
conn.close()
except Exception:
logging.exception("")
raise e[0], e[1], e[2]
reraise(e[0], e[1], e[2])
finally:
del e
......
......@@ -4,7 +4,8 @@ import threading
import traceback
from collections import deque
from time import time
from Queue import Empty
from six.moves.queue import Empty
from six import reraise
"""
Verbose locking classes.
......@@ -108,7 +109,7 @@ class VerboseLockBase(object):
except self._error_class:
t, v, tb = sys.exc_info()
if str(v) == self._release_error:
raise t, "%s %s (%s)" % (v, self, me), tb
reraise(t, "%s %s (%s)" % (v, self, me), tb)
raise
def __exit__(self, t, v, tb):
......
......@@ -210,7 +210,7 @@ class MasterDB(object):
try:
with open(path) as db:
self._set = set(map(tuple, json.load(db)))
except IOError, e:
except IOError as e:
if e.errno != errno.ENOENT:
raise
self._set = set()
......@@ -504,8 +504,7 @@ class NodeManager(EventQueue):
map(Node.asTuple, self._node_set), ' * ')))
self.logQueuedEvents()
@apply
def NODE_TYPE_MAPPING():
def create_node_type_mapping():
def setmethod(cls, attr, value):
assert not hasattr(cls, attr), (cls, attr)
setattr(cls, attr, value)
......@@ -539,3 +538,5 @@ def NODE_TYPE_MAPPING():
setfullmethod(NodeManager, 'get%sList' % name, getList(node_type))
return node_type_dict
NODE_TYPE_MAPPING = create_node_type_mapping()
\ No newline at end of file
......@@ -17,6 +17,8 @@
import threading
from functools import partial
from msgpack import packb
import six
from six.moves import range
# For msgpack & Py2/ZODB5.
try:
......@@ -43,8 +45,7 @@ RESPONSE_MASK = 0x8000
# that we could compare the stream position (Unpacker.tell); it's not worth it.
UNPACK_BUFFER_SIZE = 0x4000000
@apply
def Unpacker():
def create_unpacker():
global registerExtType, packb
from msgpack import ExtType, unpackb, Packer, Unpacker
ext_type_dict = []
......@@ -75,10 +76,15 @@ def Unpacker():
return partial(Unpacker, use_list=False, max_buffer_size=UNPACK_BUFFER_SIZE,
ext_hook=lambda code, data: ext_type_dict[code](data))
Unpacker = create_unpacker()
class Enum(tuple):
class Item(int):
__slots__ = '_name', '_enum', '_pack'
# int is variable-length in Python 3 (like long in Python 2).
# Thus, it does not support __slots__.
# They are left here as a reference of the available attributes.
# __slots__ = '_name', '_enum', '_pack'
def __str__(self):
return self._name
def __repr__(self):
......@@ -89,9 +95,19 @@ class Enum(tuple):
return other is self
return other == int(self)
def __hash__(self):
"""
Hash of the enumeration item.
This needs to be defined explicitly in Python 3 if we override the
`__eq__` method.
We use the same hash as the integer equivalent of this item.
"""
return hash(int(self))
def __new__(cls, func):
names = func.func_code.co_names
self = tuple.__new__(cls, map(cls.Item, xrange(len(names))))
names = six.get_function_code(func).co_names
self = tuple.__new__(cls, map(cls.Item, range(len(names))))
self._name = func.__name__
pack = registerExtType(int, self.__getitem__)
for item, name in zip(self, names):
......@@ -232,7 +248,7 @@ UUID_NAMESPACES = {
}
uuid_str = (lambda ns: lambda uuid:
ns[uuid >> 24] + str(uuid & 0xffffff) if uuid else str(uuid)
)({v: str(k)[0] for k, v in UUID_NAMESPACES.iteritems()})
)({v: str(k)[0] for k, v in six.iteritems(UUID_NAMESPACES)})
class Packet(object):
......@@ -878,7 +894,7 @@ def formatNodeList(node_list, prefix='', _sort_key=itemgetter(2)):
for node_type, addr, uuid, state, id_timestamp
in sorted(node_list, key=_sort_key)]
t = ''.join('%%-%us | ' % max(len(x[i]) for x in node_list)
for i in xrange(len(node_list[0]) - 1))
for i in range(len(node_list[0]) - 1))
return map((prefix + t + '%s').__mod__, node_list)
return ()
......
......@@ -14,7 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import thread, threading, weakref
from six.moves import _thread
import threading, weakref
from . import debug, logging
from .app import BaseApplication
from .dispatcher import Dispatcher
......@@ -67,7 +68,7 @@ class ThreadedApplication(BaseApplication):
conn.close()
# Stop polling thread
logging.debug('Stopping %s', self.poll_thread)
self.em.wakeup(thread.exit)
self.em.wakeup(_thread.exit)
else:
super(ThreadedApplication, self).close()
......@@ -114,14 +115,16 @@ class ThreadedApplication(BaseApplication):
# connection
node = self.nm.getByAddress(conn.getAddress())
if node is None:
raise ValueError, 'Expecting an answer from a node ' \
raise ValueError(
'Expecting an answer from a node '
'which type is not known... Is this right ?'
)
if node.isStorage():
handler = self.storage_handler
elif node.isMaster():
handler = self.primary_handler
else:
raise ValueError, 'Unknown node type: %r' % (node.__class__, )
raise ValueError('Unknown node type: %r' % (node.__class__, ))
with conn.lock:
handler.dispatch(conn, packet, kw)
......
......@@ -19,7 +19,7 @@ import os, socket
from binascii import a2b_hex, b2a_hex
from datetime import timedelta, datetime
from hashlib import sha1
from Queue import deque
from collections import deque
from struct import pack, unpack, Struct
from time import gmtime
......
......@@ -43,7 +43,7 @@ from . import ClientApplication, ConnectionFilter, LockLock, NEOCluster, \
from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client import exception
from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.handlers.storage import _DeadlockPacket
from neo.client.handlers.storage import DEADLOCK_PACKET
from neo.client.transactions import Transaction
from neo.master.handlers.client import ClientServiceHandler
from neo.master.pt import PartitionTable
......@@ -2237,7 +2237,7 @@ class Test(NEOThreadedTest):
except IndexError:
pass
def _handlePacket(orig, *args):
if args[2] is _DeadlockPacket:
if args[2] is DEADLOCK_PACKET:
return sched(orig, *args)
return orig(*args)
with RandomConflictDict, \
......
......@@ -3,6 +3,7 @@
from setuptools import setup, find_packages
import os
import itertools
classifiers = """\
Framework :: ZODB
......@@ -58,9 +59,13 @@ extras_require = {
'storage-pymysql': ['PyMySQL'],
'storage-importer': zodb_require + ['setproctitle'],
}
all_extra_deps = list(itertools.chain(extras_require.values()))
extras_require['tests'] = ['coverage', 'zope.testing', 'psutil>=2',
'mock', # ZODB test dependency
'neoppod[%s]' % ', '.join(extras_require)]
]
extras_require['tests'] += all_extra_deps
extras_require['stress'] = ['NetfilterQueue', 'gevent', 'neoppod[tests]',
'cython-zstd', # recommended (log rotation)
]
......@@ -115,6 +120,7 @@ setup(
install_requires = [
'msgpack>=0.5.6,<1',
'python-dateutil', # neolog --from
'six',
],
extras_require = extras_require,
package_data = {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment