Commit c321971f authored by Julien Muchembled's avatar Julien Muchembled

tests: simplify preallocation of ports

Binding a port actually does not reserve it. And on the other side, a bound
socket can't be bound again. So it could bind a port twice, warn about this,
and then raise EINVAL when trying to bind again.

Apart from this, the global lock didn't even prevent conflict with another
NEO test run when tests restart nodes. So better keep it simple.
parent f822d7d0
...@@ -20,7 +20,6 @@ import mmap ...@@ -20,7 +20,6 @@ import mmap
import os import os
import psutil import psutil
import signal import signal
import socket
import sys import sys
import tempfile import tempfile
from cPickle import dumps, loads from cPickle import dumps, loads
...@@ -29,46 +28,6 @@ from time import time, sleep ...@@ -29,46 +28,6 @@ from time import time, sleep
from neo.lib import debug from neo.lib import debug
class SocketLock(object):
"""Basic system-wide lock"""
_socket = None
def __init__(self, address, family=socket.AF_UNIX, type=socket.SOCK_DGRAM):
if family == socket.AF_UNIX:
address = '\0' + address
self.address = address
self.socket_args = family, type
def locked(self):
return self._socket is not None
def acquire(self, blocking=1):
assert self._socket is None
s = socket.socket(*self.socket_args)
try:
while True:
try:
s.bind(self.address)
except socket.error, e:
if e[0] != errno.EADDRINUSE:
raise
if not blocking:
return False
sleep(1)
else:
self._socket = s
return True
finally:
if self._socket is None:
s.close()
def release(self):
s = self._socket
del self._socket
s.close()
class ClusterDict(dict): class ClusterDict(dict):
"""Simple storage (dict), shared with forked processes""" """Simple storage (dict), shared with forked processes"""
......
...@@ -22,7 +22,6 @@ import ZODB ...@@ -22,7 +22,6 @@ import ZODB
import socket import socket
import signal import signal
import random import random
import weakref
import MySQLdb import MySQLdb
import sqlite3 import sqlite3
import unittest import unittest
...@@ -38,9 +37,8 @@ from neo.lib import logging ...@@ -38,9 +37,8 @@ from neo.lib import logging
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates, \ from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates, \
UUID_NAMESPACES UUID_NAMESPACES
from neo.lib.util import dump from neo.lib.util import dump
from .. import DB_USER, setupMySQLdb, NeoTestBase, buildUrlFromString, \ from .. import cluster, DB_USER, setupMySQLdb, NeoTestBase, buildUrlFromString, \
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, getTempDirectory ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, getTempDirectory
from ..cluster import SocketLock
from neo.client.Storage import Storage from neo.client.Storage import Storage
from neo.storage.database import buildDatabaseManager from neo.storage.database import buildDatabaseManager
...@@ -67,55 +65,49 @@ class NotFound(Exception): ...@@ -67,55 +65,49 @@ class NotFound(Exception):
class PortAllocator(object): class PortAllocator(object):
lock = SocketLock('neo.PortAllocator')
allocator_set = weakref.WeakSet()
def __init__(self): def __init__(self):
self.socket_list = [] self.socket_list = []
# WKRD: this is a workaround for a weird bug allowing the same port to self.tried_port_set = set()
# be bound more than once, causing later failure in tests, when
# different processes try to bind to the same port.
self.sock_port_set = set()
def allocate(self, address_type, local_ip): def allocate(self, address_type, local_ip):
min_port = n = 16384
max_port = min_port + n
tried_port_set = self.tried_port_set
while True:
s = socket.socket(address_type, socket.SOCK_STREAM) s = socket.socket(address_type, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if not self.lock.locked(): # Find an unreserved port.
self.lock.acquire()
self.allocator_set.add(self)
self.socket_list.append(s)
sock_port_set = self.sock_port_set
while True: while True:
# Do not let the system choose the port to avoid conflicts # Do not let the system choose the port to avoid conflicts
# with other software. IOW, use a range different than: # with other software. IOW, use a range different than:
# - /proc/sys/net/ipv4/ip_local_port_range on Linux # - /proc/sys/net/ipv4/ip_local_port_range on Linux
# - what IANA recommends (49152 to 65535) # - what IANA recommends (49152 to 65535)
port = random.randint(16384, 32767) port = random.randrange(min_port, max_port)
if port not in tried_port_set:
tried_port_set.add(port)
try: try:
s.bind((local_ip, port)) s.bind((local_ip, port))
break
except socket.error, e: except socket.error, e:
if e.errno != errno.EADDRINUSE: if e.errno != errno.EADDRINUSE:
raise raise
else: elif len(tried_port_set) >= n:
if port not in sock_port_set: raise RuntimeError("No free port")
sock_port_set.add(port) # Reserve port.
try:
s.listen(1)
self.socket_list.append(s)
return port return port
logging.warning('Same port allocated twice: %s in %s', except socket.error, e:
port, sock_port_set) if e.errno != errno.EADDRINUSE:
raise
def release(self): def release(self):
for s in self.socket_list: for s in self.socket_list:
s.close() s.close()
self.__init__() self.__init__()
def reset(self): __del__ = release
if self.lock.locked():
self.allocator_set.discard(self)
if not self.allocator_set:
self.lock.release()
self.release()
__del__ = reset
class NEOProcess(object): class NEOProcess(object):
...@@ -150,9 +142,6 @@ class NEOProcess(object): ...@@ -150,9 +142,6 @@ class NEOProcess(object):
try: try:
# release SQLite debug log # release SQLite debug log
logging.setup() logging.setup()
# release system-wide lock
for allocator in PortAllocator.allocator_set.copy():
allocator.reset()
sys.argv = [command] + args sys.argv = [command] + args
getattr(neo.scripts, command).main() getattr(neo.scripts, command).main()
status = 0 status = 0
...@@ -350,7 +339,6 @@ class NEOCluster(object): ...@@ -350,7 +339,6 @@ class NEOCluster(object):
return True return True
if not pdb.wait(test, MAX_START_TIME): if not pdb.wait(test, MAX_START_TIME):
raise AssertionError('Timeout when starting cluster') raise AssertionError('Timeout when starting cluster')
self.port_allocator.reset()
def start(self, except_storages=()): def start(self, except_storages=()):
""" Do a complete start of a cluster """ """ Do a complete start of a cluster """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment