Commit 9f1e4eef authored by Julien Muchembled's avatar Julien Muchembled

Use argparse instead of optparse

Besides the use of another module for option parsing, the main change is that
there's no more Config class that mixes configuration for different components.
Application classes now takes a simple 'dict' with parsed values.

The changes in 'neoctl' are somewhat ugly, because command-line options are not
defined on the command-line class, but this component is likely to disappear
in the future.

It remains possible to pass options via a configuration file. The code is a bit
complex but isolated in neo.lib.config

For SSL, the code may be simpler if we change for a single --ssl option that
takes 3 paths. Not done to not break compatibility. Hence, the hack with
an extra OptionList class in neo.lib.app

A new functional test tests the 'neomigrate' script, instead of just the
internal API to migrate data.
parent 56d0b764
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.app import BaseApplication from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.connection import ListeningConnection from neo.lib.connection import ListeningConnection
from neo.lib.exception import PrimaryFailure from neo.lib.exception import PrimaryFailure
from .handler import AdminEventHandler, MasterEventHandler, \ from .handler import AdminEventHandler, MasterEventHandler, \
...@@ -25,24 +25,35 @@ from neo.lib.pt import PartitionTable ...@@ -25,24 +25,35 @@ from neo.lib.pt import PartitionTable
from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets
from neo.lib.debug import register as registerLiveDebugger from neo.lib.debug import register as registerLiveDebugger
@buildOptionParser
class Application(BaseApplication): class Application(BaseApplication):
"""The storage node application.""" """The storage node application."""
@classmethod
def _buildOptionParser(cls):
_ = cls.option_parser
_.description = "NEO Admin node"
cls.addCommonServerOptions('admin', '127.0.0.1:9999')
_ = _.group('admin')
_.int('u', 'uuid',
help="specify an UUID to use for this process (testing purpose)")
def __init__(self, config): def __init__(self, config):
super(Application, self).__init__( super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList()) config.get('ssl'), config.get('dynamic_master_list'))
for address in config.getMasters(): for address in config['masters']:
self.nm.createMaster(address=address) self.nm.createMaster(address=address)
self.name = config.getCluster() self.name = config['cluster']
self.server = config.getBind() self.server = config['bind']
logging.debug('IP address is %s, port is %d', *self.server) logging.debug('IP address is %s, port is %d', *self.server)
# The partition table is initialized after getting the number of # The partition table is initialized after getting the number of
# partitions. # partitions.
self.pt = None self.pt = None
self.uuid = config.getUUID() self.uuid = config.get('uuid')
self.request_handler = MasterRequestEventHandler(self) self.request_handler = MasterRequestEventHandler(self)
self.master_event_handler = MasterEventHandler(self) self.master_event_handler = MasterEventHandler(self)
self.cluster_state = None self.cluster_state = None
......
...@@ -14,16 +14,57 @@ ...@@ -14,16 +14,57 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import logging from . import logging, util
from .config import OptionList
from .event import EventManager from .event import EventManager
from .node import NodeManager from .node import NodeManager
def buildOptionParser(cls):
parser = cls.option_parser = cls.OptionList()
_ = parser.path
_('l', 'logfile',
help="log debugging information to specified SQLite DB")
_('ca', help="(SSL) certificate authority in PEM format")
_('cert', help="(SSL) certificate in PEM format")
_('key', help="(SSL) private key in PEM format")
cls._buildOptionParser()
return cls
class BaseApplication(object): class BaseApplication(object):
class OptionList(OptionList):
def parse(self, argv=None):
config = OptionList.parse(self, argv)
ssl = (
config.pop('ca', None),
config.pop('cert', None),
config.pop('key', None),
)
if any(ssl):
config['ssl'] = ssl
return config
server = None server = None
ssl = None ssl = None
@classmethod
def addCommonServerOptions(cls, section, bind, masters='127.0.0.1:10000'):
_ = cls.option_parser.group('server node')
_.path('f', 'file', help='specify a configuration file')
_('s', 'section', default=section,
help='specify a configuration section')
_('c', 'cluster', required=True, help='the cluster name')
_('m', 'masters', default=masters, parse=util.parseMasterList,
help='master node list')
_('b', 'bind', default=bind,
parse=lambda x: util.parseNodeAddress(x, 0),
help='the local address to bind to')
_.path('D', 'dynamic-master-list',
help='path of the file containing dynamic master node list')
def __init__(self, ssl=None, dynamic_master_list=None): def __init__(self, ssl=None, dynamic_master_list=None):
if ssl: if ssl:
if not all(ssl): if not all(ssl):
......
...@@ -14,139 +14,187 @@ ...@@ -14,139 +14,187 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os import argparse, os, sys
from optparse import OptionParser from functools import wraps
from ConfigParser import SafeConfigParser, NoOptionError from ConfigParser import SafeConfigParser
from . import util
from .util import parseNodeAddress
class _Required(object):
def getOptionParser(): def __init__(self, *args):
parser = OptionParser() self._option_list, self._name = args
parser.add_option('-l', '--logfile',
help='log debugging information to specified SQLite DB') def __nonzero__(self):
parser.add_option('--ca', help='certificate authority in PEM format') with_required = self._option_list._with_required
parser.add_option('--cert', help='certificate in PEM format') return with_required is not None and self._name not in with_required
parser.add_option('--key', help='private key in PEM format')
return parser class _Option(object):
def getServerOptionParser(): def __init__(self, *args, **kw):
parser = getOptionParser() if len(args) > 1:
parser.add_option('-f', '--file', help='specify a configuration file') self.short, self.name = args
parser.add_option('-s', '--section', help='specify a configuration section')
parser.add_option('-c', '--cluster', help='the cluster name')
parser.add_option('-m', '--masters', help='master node list')
parser.add_option('-b', '--bind', help='the local address to bind to')
parser.add_option('-D', '--dynamic-master-list',
help='path of the file containing dynamic master node list')
return parser
class ConfigurationManager(object):
"""
Configuration manager that load options from a configuration file and
command line arguments
"""
def __init__(self, defaults, options, section):
self.argument_list = options = {k: v
for k, v in options.__dict__.iteritems()
if v is not None}
self.defaults = defaults
config_file = options.pop('file', None)
if config_file:
self.parser = SafeConfigParser(defaults)
self.parser.read(config_file)
else:
self.parser = None
self.section = options.pop('section', section)
def __get(self, key, optional=False):
value = self.argument_list.get(key)
if value is None:
if self.parser is None:
value = self.defaults.get(key)
else: else:
self.name, = args
self.__dict__.update(kw)
def _asArgparse(self, parser, option_list):
kw = self._argument_kw()
args = ['--' + self.name]
try: try:
value = self.parser.get(self.section, key) args.insert(0, '-' + self.short)
except NoOptionError: except AttributeError:
pass pass
if value is None and not optional: kw['help'] = self.help
raise RuntimeError("Option '%s' is undefined'" % (key, )) action = parser.add_argument(*args, **kw)
if action.required:
assert not hasattr(self, 'default')
action.required = _Required(option_list, self.name)
def fromConfigFile(self, cfg, section):
return self(cfg.get(section, self.name.replace('-', '_')))
@staticmethod
def parse(value):
return value
class BoolOption(_Option):
def _argument_kw(self):
return {'action': 'store_true'}
def __call__(self, value):
return value return value
def __getPath(self, *args, **kw): def fromConfigFile(self, cfg, section):
path = self.__get(*args, **kw) return cfg.getboolean(section, self.name)
if path:
return os.path.expanduser(path)
def getLogfile(self): class Option(_Option):
return self.__getPath('logfile', True)
def getSSL(self): @property
r = [self.__getPath(key, True) for key in ('ca', 'cert', 'key')] def __name__(self):
if any(r): return self.type.__name__
return r
def getMasters(self): def _argument_kw(self):
""" Get the master node list except itself """ kw = {'type': self}
return util.parseMasterList(self.__get('masters')) for x in 'default', 'metavar', 'required', 'choices':
try:
kw[x] = getattr(self, x)
except AttributeError:
pass
return kw
@staticmethod
def type(value):
if value:
return value
raise argparse.ArgumentTypeError('value is empty')
def getBind(self): def __call__(self, value):
""" Get the address to bind to """ return self.type(value)
bind = self.__get('bind')
return parseNodeAddress(bind, 0)
def getDisableDropPartitions(self): class OptionGroup(object):
return self.__get('disable_drop_partitions', True)
def getDatabase(self): def __init__(self, description=None):
return self.__get('database') self.description = description
self._options = []
def getEngine(self): def _asArgparse(self, parser, option_list):
return self.__get('engine', True) g = parser.add_argument_group(self.description)
for option in self._options:
option._asArgparse(g, option_list)
def set_defaults(self, **kw):
option_dict = self.getOptionDict()
for k, v in kw.iteritems():
option_dict[k].default = v
def getOptionDict(self):
option_dict = {}
for option in self._options:
if isinstance(option, OptionGroup):
option_dict.update(option.getOptionDict())
else:
option_dict[option.name.replace('-', '_')] = option
return option_dict
def getWait(self): def __call__(self, *args, **kw):
# XXX: see also DatabaseManager.__init__ self._options.append(Option(*args, **kw))
return self.__get('wait')
def getDynamicMasterList(self): def __option_type(t):
return self.__getPath('dynamic_master_list', optional=True) return wraps(t)(lambda self, *args, **kw: self(type=t, *args, **kw))
def getAdapter(self): float = __option_type(float)
return self.__get('adapter') int = __option_type(int)
path = __option_type(os.path.expanduser)
def getCluster(self): def bool(self, *args, **kw):
cluster = self.__get('cluster') self._options.append(BoolOption(*args, **kw))
assert cluster != '', "Cluster name must be non-empty"
return cluster
def getReplicas(self): class Argument(Option):
return int(self.__get('replicas'))
def getPartitions(self): def __init__(self, name, **kw):
return int(self.__get('partitions')) super(Argument, self).__init__(name, **kw)
def getReset(self): def _asArgparse(self, parser, option_list):
# only from command line kw = {'help': self.help, 'type': self}
return self.argument_list.get('reset', False) for x in 'default', 'metavar', 'nargs', 'choices':
try:
kw[x] = getattr(self, x)
except AttributeError:
pass
parser.add_argument(self.name, **kw)
def getUUID(self): class OptionList(OptionGroup):
# only from command line
uuid = self.argument_list.get('uuid', None)
if uuid:
return int(uuid)
def getUpstreamCluster(self): _with_required = None
return self.__get('upstream_cluster', True)
def getUpstreamMasters(self): def argument(self, *args, **kw):
return util.parseMasterList(self.__get('upstream_masters')) self._options.append(Argument(*args, **kw))
def getAutostart(self): def group(self, description):
n = self.__get('autostart', True) group = OptionGroup(description)
if n: self._options.append(group)
return int(n) return group
def getDedup(self): def parse(self, argv=None):
return self.__get('dedup', True) parser = argparse.ArgumentParser(description=self.description,
argument_default=argparse.SUPPRESS,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
for option in self._options:
option._asArgparse(parser, self)
_format_help = parser.format_help
def format_help():
self._with_required = ()
try:
return _format_help()
finally:
del self._with_required
parser.format_help = format_help
if argv is None:
argv = sys.argv[1:]
args = parser.parse_args(argv)
option_dict = self.getOptionDict()
try:
config_file = args.file
except AttributeError:
d = ()
else:
cfg = SafeConfigParser()
cfg.read(config_file)
section = args.section
d = {}
for name in cfg.options(section):
try:
option = option_dict[name]
except KeyError:
continue
d[name] = option.fromConfigFile(cfg, section)
parser.set_defaults(**d)
self._with_required = d
try:
args = parser.parse_args(argv)
finally:
del self._with_required
return {name: option.parse(getattr(args, name))
for name, option in option_dict.iteritems()
if hasattr(args, name)}
...@@ -18,8 +18,8 @@ import sys ...@@ -18,8 +18,8 @@ import sys
from collections import defaultdict from collections import defaultdict
from time import time from time import time
from neo.lib import logging from neo.lib import logging, util
from neo.lib.app import BaseApplication from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.debug import register as registerLiveDebugger from neo.lib.debug import register as registerLiveDebugger
from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
...@@ -47,6 +47,7 @@ from .transactions import TransactionManager ...@@ -47,6 +47,7 @@ from .transactions import TransactionManager
from .verification import VerificationManager from .verification import VerificationManager
@buildOptionParser
class Application(BaseApplication): class Application(BaseApplication):
"""The master node application.""" """The master node application."""
packing = None packing = None
...@@ -72,26 +73,47 @@ class Application(BaseApplication): ...@@ -72,26 +73,47 @@ class Application(BaseApplication):
if self.primary and self.cluster_state == ClusterStates.RECOVERING: if self.primary and self.cluster_state == ClusterStates.RECOVERING:
return self.primary return self.primary
@classmethod
def _buildOptionParser(cls):
_ = cls.option_parser
_.description = "NEO Master node"
cls.addCommonServerOptions('master', '127.0.0.1:10000', '')
_ = _.group('master')
_.int('r', 'replicas', default=0, help="replicas number")
_.int('p', 'partitions', default=100, help="partitions number")
_.int('A', 'autostart',
help="minimum number of pending storage nodes to automatically"
" start new cluster (to avoid unwanted recreation of the"
" cluster, this should be the total number of storage nodes)")
_('C', 'upstream-cluster',
help='the name of cluster to backup')
_('M', 'upstream-masters', parse=util.parseMasterList,
help='list of master nodes in the cluster to backup')
_.int('u', 'uuid',
help="specify an UUID to use for this process (testing purpose)")
def __init__(self, config): def __init__(self, config):
super(Application, self).__init__( super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList()) config.get('ssl'), config.get('dynamic_master_list'))
self.tm = TransactionManager(self.onTransactionCommitted) self.tm = TransactionManager(self.onTransactionCommitted)
self.name = config.getCluster() self.name = config['cluster']
self.server = config.getBind() self.server = config['bind']
self.autostart = config.getAutostart() self.autostart = config.get('autostart')
self.storage_ready_dict = {} self.storage_ready_dict = {}
self.storage_starting_set = set() self.storage_starting_set = set()
for master_address in config.getMasters(): for master_address in config['masters']:
self.nm.createMaster(address=master_address) self.nm.createMaster(address=master_address)
self._node = self.nm.createMaster(address=self.server, self._node = self.nm.createMaster(address=self.server,
uuid=config.getUUID()) uuid=config.get('uuid'))
logging.debug('IP address is %s, port is %d', *self.server) logging.debug('IP address is %s, port is %d', *self.server)
# Partition table # Partition table
replicas, partitions = config.getReplicas(), config.getPartitions() replicas = config['replicas']
partitions = config['partitions']
if replicas < 0: if replicas < 0:
raise RuntimeError, 'replicas must be a positive integer' raise RuntimeError, 'replicas must be a positive integer'
if partitions <= 0: if partitions <= 0:
...@@ -107,13 +129,13 @@ class Application(BaseApplication): ...@@ -107,13 +129,13 @@ class Application(BaseApplication):
self._current_manager = None self._current_manager = None
# backup # backup
upstream_cluster = config.getUpstreamCluster() upstream_cluster = config.get('upstream_cluster')
if upstream_cluster: if upstream_cluster:
if upstream_cluster == self.name: if upstream_cluster == self.name:
raise ValueError("upstream cluster name must be" raise ValueError("upstream cluster name must be"
" different from cluster name") " different from cluster name")
self.backup_app = BackupApplication(self, upstream_cluster, self.backup_app = BackupApplication(self, upstream_cluster,
config.getUpstreamMasters()) config['upstream_masters'])
self.administration_handler = administration.AdministrationHandler( self.administration_handler = administration.AdministrationHandler(
self) self)
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from .neoctl import NeoCTL, NotReadyException from .neoctl import NeoCTL, NotReadyException
from neo.lib.util import p64, u64, tidFromTime, timeStringFromTID from neo.lib.util import p64, u64, tidFromTime, timeStringFromTID
from neo.lib.protocol import uuid_str, formatNodeList, \ from neo.lib.protocol import uuid_str, formatNodeList, \
...@@ -266,18 +267,18 @@ class Application(object): ...@@ -266,18 +267,18 @@ class Application(object):
# state (RUNNING, DOWN...) and modify the partition if asked # state (RUNNING, DOWN...) and modify the partition if asked
# set cluster name [shutdown|operational] : either shutdown the # set cluster name [shutdown|operational] : either shutdown the
# cluster or mark it as operational # cluster or mark it as operational
if not args:
return self.usage()
current_action = action_dict current_action = action_dict
level = 0 level = 0
while current_action is not None and \ try:
level < len(args) and \ while level < len(args) and \
isinstance(current_action, dict): isinstance(current_action, dict):
current_action = current_action.get(args[level]) current_action = current_action[args[level]]
level += 1 level += 1
action = None except KeyError:
if isinstance(current_action, basestring): sys.exit('invalid command: ' + ' '.join(args))
action = getattr(self.neoctl, current_action, None) action = getattr(self.neoctl, current_action)
if action is None:
return self.usage('unknown command')
try: try:
return action(args[level:]) return action(args[level:])
except NotReadyException, message: except NotReadyException, message:
...@@ -312,8 +313,8 @@ class Application(object): ...@@ -312,8 +313,8 @@ class Application(object):
for x in docstring_line_list]) for x in docstring_line_list])
return '\n'.join(result) return '\n'.join(result)
def usage(self, message): def usage(self):
output_list = (message, 'Available commands:', self._usage(action_dict), output_list = ('Available commands:', self._usage(action_dict),
"TID arguments can be either integers or timestamps as floats," "TID arguments can be either integers or timestamps as floats,"
" e.g. '257684787499560686', '0x3937af2eeeeeeee' or '1325421296.'" " e.g. '257684787499560686', '0x3937af2eeeeeeee' or '1325421296.'"
" for 2012-01-01 12:34:56 UTC") " for 2012-01-01 12:34:56 UTC")
......
...@@ -14,7 +14,9 @@ ...@@ -14,7 +14,9 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.app import BaseApplication import argparse
from neo.lib import util
from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.connection import ClientConnection, ConnectionClosed from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
from .handler import CommandEventHandler from .handler import CommandEventHandler
...@@ -22,11 +24,24 @@ from .handler import CommandEventHandler ...@@ -22,11 +24,24 @@ from .handler import CommandEventHandler
class NotReadyException(Exception): class NotReadyException(Exception):
pass pass
@buildOptionParser
class NeoCTL(BaseApplication): class NeoCTL(BaseApplication):
connection = None connection = None
connected = False connected = False
@classmethod
def _buildOptionParser(cls):
# XXX: Use argparse sub-commands.
parser = cls.option_parser
parser.description = "NEO Control node"
parser('a', 'address', default='127.0.0.1:9999',
parse=lambda x: util.parseNodeAddress(x, 9999),
help="address of an admin node")
parser.argument('cmd', nargs=argparse.REMAINDER,
help="command to execute; if not supplied,"
" the list of available commands is displayed")
def __init__(self, address, **kw): def __init__(self, address, **kw):
super(NeoCTL, self).__init__(**kw) super(NeoCTL, self).__init__(**kw)
self.server = self.nm.createAdmin(address=address) self.server = self.nm.createAdmin(address=address)
......
...@@ -18,27 +18,15 @@ ...@@ -18,27 +18,15 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.config import getServerOptionParser, ConfigurationManager
parser = getServerOptionParser()
parser.add_option('-u', '--uuid', help='specify an UUID to use for this ' \
'process')
defaults = dict(
bind = '127.0.0.1:9999',
masters = '127.0.0.1:10000',
)
def main(args=None): def main(args=None):
# build configuration dict from command line options from neo.admin.app import Application
(options, args) = parser.parse_args(args=args) config = Application.option_parser.parse(args)
config = ConfigurationManager(defaults, options, 'admin')
# setup custom logging # setup custom logging
logging.setup(config.getLogfile()) logging.setup(config.get('logfile'))
# and then, load and run the application # and then, load and run the application
from neo.admin.app import Application
app = Application(config) app = Application(config)
app.run() app.run()
...@@ -18,30 +18,22 @@ ...@@ -18,30 +18,22 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.config import getOptionParser
from neo.lib.util import parseNodeAddress
parser = getOptionParser()
parser.add_option('-a', '--address', help = 'specify the address (ip:port) ' \
'of an admin node', default = '127.0.0.1:9999')
def main(args=None): def main(args=None):
(options, args) = parser.parse_args(args=args) from neo.neoctl.neoctl import NeoCTL
if options.address is not None: config = NeoCTL.option_parser.parse(args)
address = parseNodeAddress(options.address, 9999)
else:
address = ('127.0.0.1', 9999)
if options.logfile: logfile = config.get('logfile')
if logfile:
# Contrary to daemons, we log everything to disk automatically # Contrary to daemons, we log everything to disk automatically
# because a user using -l option here: # because a user using -l option here:
# - is certainly debugging an issue and wants everything, # - is certainly debugging an issue and wants everything,
# - would not have to time to send SIGRTMIN before neoctl exits. # - would not have to time to send SIGRTMIN before neoctl exits.
logging.backlog(None) logging.backlog(None)
logging.setup(options.logfile) logging.setup(logfile)
from neo.neoctl.app import Application
ssl = options.ca, options.cert, options.key from neo.neoctl.app import Application
r = Application(address, ssl=ssl if any(ssl) else None).execute(args) app = Application(config['address'], ssl=config.get('ssl'))
r = app.execute(config['cmd'])
if r is not None: if r is not None:
print r print r
...@@ -18,38 +18,14 @@ ...@@ -18,38 +18,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.config import getServerOptionParser, ConfigurationManager
parser = getServerOptionParser()
parser.add_option('-u', '--uuid', help='the node UUID (testing purpose)')
parser.add_option('-r', '--replicas', help = 'replicas number')
parser.add_option('-p', '--partitions', help = 'partitions number')
parser.add_option('-A', '--autostart',
help='minimum number of pending storage nodes to automatically start'
' new cluster (to avoid unwanted recreation of the cluster,'
' this should be the total number of storage nodes)')
parser.add_option('-C', '--upstream-cluster',
help='the name of cluster to backup')
parser.add_option('-M', '--upstream-masters',
help='list of master nodes in cluster to backup')
defaults = dict(
bind = '127.0.0.1:10000',
masters = '',
replicas = 0,
partitions = 100,
)
def main(args=None): def main(args=None):
# build configuration dict from command line options from neo.master.app import Application
(options, args) = parser.parse_args(args=args) config = Application.option_parser.parse(args)
config = ConfigurationManager(defaults, options, 'master')
# setup custom logging # setup custom logging
logging.setup(config.getLogfile()) logging.setup(config.get('logfile'))
# and then, load and run the application # and then, load and run the application
from neo.master.app import Application
app = Application(config) app = Application(config)
app.run() app.run()
...@@ -17,51 +17,62 @@ ...@@ -17,51 +17,62 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.config import getOptionParser from __future__ import print_function
import time import time
import os import os
from neo.lib.app import buildOptionParser
# register options import_warning = (
parser = getOptionParser() "WARNING: This is not the recommended way to import data to NEO:"
parser.add_option('-s', '--source', help='the source database') " you should use the Importer backend instead.\n"
parser.add_option('-d', '--destination', help='the destination database') "NEO also does not implement IStorageRestoreable interface, which"
parser.add_option('-c', '--cluster', help='the NEO cluster name') " means that undo information is not preserved when using this tool:"
" conflict resolution could happen when undoing an old transaction."
)
def main(args=None): @buildOptionParser
# parse options class NEOMigrate(object):
(options, args) = parser.parse_args(args=args)
source = options.source or None from neo.lib.config import OptionList
destination = options.destination or None
cluster = options.cluster or None @classmethod
def _buildOptionParser(cls):
parser = cls.option_parser
parser.description = "NEO <-> FileStorage conversion tool"
parser('c', 'cluster', required=True, help='the NEO cluster name')
parser.bool('q', 'quiet', help='print nothing to standard output')
parser.argument('source', help='the source database')
parser.argument('destination', help='the destination database')
# check options def __init__(self, config):
if source is None or destination is None: self.name = config.pop('cluster')
raise RuntimeError('Source and destination databases must be supplied') self.source = config.pop('source')
if cluster is None: self.destination = config.pop('destination')
raise RuntimeError('The NEO cluster name must be supplied') self.quiet = config.pop('quiet', False)
# open storages
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from neo.client.Storage import Storage as NEOStorage from neo.client.Storage import Storage as NEOStorage
if os.path.exists(source): if os.path.exists(self.source):
print("WARNING: This is not the recommended way to import data to NEO:" if not self.quiet:
" you should use the Importer backend instead.\n" print(import_warning)
"NEO also does not implement IStorageRestoreable interface," self.src = FileStorage(file_name=self.source, read_only=True)
" which means that undo information is not preserved when using" self.dst = NEOStorage(master_nodes=self.destination, name=self.name,
" this tool: conflict resolution could happen when undoing an" **config)
" old transaction.")
src = FileStorage(file_name=source, read_only=True)
dst = NEOStorage(master_nodes=destination, name=cluster,
logfile=options.logfile)
else: else:
src = NEOStorage(master_nodes=source, name=cluster, self.src = NEOStorage(master_nodes=self.source, name=self.name,
logfile=options.logfile, read_only=True) read_only=True, **config)
dst = FileStorage(file_name=destination) self.dst = FileStorage(file_name=self.destination)
# do the job def run(self):
print "Migrating from %s to %s" % (source, destination) if not self.quiet:
print("Migrating from %s to %s" % (self.source, self.destination))
start = time.time() start = time.time()
dst.copyTransactionsFrom(src) self.dst.copyTransactionsFrom(self.src)
if not self.quiet:
elapsed = time.time() - start elapsed = time.time() - start
print "Migration done in %3.5f" % (elapsed, ) print("Migration done in %3.5f" % elapsed)
def main(args=None):
config = NEOMigrate.option_parser.parse(args)
NEOMigrate(config).run()
...@@ -18,49 +18,15 @@ ...@@ -18,49 +18,15 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.config import getServerOptionParser, ConfigurationManager
parser = getServerOptionParser()
parser.add_option('-u', '--uuid', help='specify an UUID to use for this ' \
'process. Previously assigned UUID takes precedence (ie ' \
'you should always use --reset with this switch)')
parser.add_option('-a', '--adapter', help = 'database adapter to use')
parser.add_option('-d', '--database', help = 'database connections string')
parser.add_option('-e', '--engine', help = 'database engine')
parser.add_option('-w', '--wait', help='seconds to wait for backend to be '
'available, before erroring-out (-1 = infinite)', type='float', default=0)
parser.add_option('--dedup', action='store_true',
help = 'enable deduplication of data when setting'
' up a new storage node (for RocksDB, check'
' https://github.com/facebook/mysql-5.6/issues/702)')
parser.add_option('--disable-drop-partitions', action='store_true',
help = 'do not delete data of discarded cells, which is'
' useful for big databases because the current'
' implementation is inefficient (this option should'
' disappear in the future)')
parser.add_option('--reset', action='store_true',
help='remove an existing database if any, and exit')
defaults = dict(
bind = '127.0.0.1',
masters = '127.0.0.1:10000',
adapter = 'MySQL',
)
def main(args=None): def main(args=None):
# TODO: Forbid using "reset" along with any unneeded argument. from neo.storage.app import Application
# "reset" is too dangerous to let user a chance of accidentally config = Application.option_parser.parse(args)
# letting it slip through in a long option list.
# We should drop support configuration files to make such check useful.
(options, args) = parser.parse_args(args=args)
config = ConfigurationManager(defaults, options, 'storage')
# setup custom logging # setup custom logging
logging.setup(config.getLogfile()) logging.setup(config.get('logfile'))
# and then, load and run the application # and then, load and run the application
from neo.storage.app import Application
app = Application(config) app = Application(config)
if not config.getReset(): if not config.get('reset'):
app.run() app.run()
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import traceback import traceback
import unittest import unittest
import time import time
...@@ -274,41 +275,43 @@ class NeoTestRunner(unittest.TextTestResult): ...@@ -274,41 +275,43 @@ class NeoTestRunner(unittest.TextTestResult):
class TestRunner(BenchmarkRunner): class TestRunner(BenchmarkRunner):
def add_options(self, parser): def add_options(self, parser):
parser.add_option('-c', '--coverage', action='store_true', x = parser.add_mutually_exclusive_group().add_argument
x('-c', '--coverage', action='store_true',
help='Enable coverage') help='Enable coverage')
parser.add_option('-C', '--cov-unit', action='store_true', x('-C', '--cov-unit', action='store_true',
help='Same as -c but output 1 file per test,' help='Same as -c but output 1 file per test,'
' in the temporary test directory') ' in the temporary test directory')
parser.add_option('-L', '--log', action='store_true', _ = parser.add_argument
_('-L', '--log', action='store_true',
help='Force all logs to be emitted immediately and keep' help='Force all logs to be emitted immediately and keep'
' packet body in logs of successful threaded tests') ' packet body in logs of successful threaded tests')
parser.add_option('-l', '--loop', type='int', default=1, _('-l', '--loop', type=int, default=1,
help='Repeat tests several times') help='Repeat tests several times')
parser.add_option('-f', '--functional', action='store_true', _('-f', '--functional', action='store_true',
help='Functional tests') help='Functional tests')
parser.add_option('-s', '--stop-on-error', action='store_false', x = parser.add_mutually_exclusive_group().add_argument
dest='stop_on_success', x('-s', '--stop-on-error', action='store_false',
dest='stop_on_success', default=None,
help='Continue as long as tests pass successfully.' help='Continue as long as tests pass successfully.'
' It is usually combined with --loop, to check that tests' ' It is usually combined with --loop, to check that tests'
' do not fail randomly.') ' do not fail randomly.')
parser.add_option('-S', '--stop-on-success', action='store_true', x('-S', '--stop-on-success', action='store_true', default=None,
help='Opposite of --stop-on-error: stop as soon as a test' help='Opposite of --stop-on-error: stop as soon as a test'
' passes. Details about errors are not printed at exit.') ' passes. Details about errors are not printed at exit.')
parser.add_option('-r', '--readable-tid', action='store_true', _('-r', '--readable-tid', action='store_true',
help='Change master behaviour to generate readable TIDs for easier' help='Change master behaviour to generate readable TIDs for easier'
' debugging (rather than from current time).') ' debugging (rather than from current time).')
parser.add_option('-u', '--unit', action='store_true', _('-u', '--unit', action='store_true',
help='Unit & threaded tests') help='Unit & threaded tests')
parser.add_option('-z', '--zodb', action='store_true', _('-z', '--zodb', action='store_true',
help='ZODB test suite running on a NEO') help='ZODB test suite running on a NEO')
parser.add_option('-v', '--verbose', action='store_true', _('-v', '--verbose', action='store_true',
help='Verbose output') help='Verbose output')
parser.usage += " [[!] module [test...]]" _('only', nargs=argparse.REMAINDER, metavar='[[!] module [test...]]',
parser.format_epilog = lambda _: """ help="Filter by given module/test. These arguments are shell"
Positional: " patterns. This implies -ufz if none of this option is"
Filter by given module/test. These arguments are shell patterns. " passed.")
This implies -ufz if none of this option is passed. parser.epilog = """
Environment Variables: Environment Variables:
NEO_TESTS_ADAPTER Default is SQLite for threaded clusters, NEO_TESTS_ADAPTER Default is SQLite for threaded clusters,
MySQL otherwise. MySQL otherwise.
...@@ -329,25 +332,23 @@ Environment Variables: ...@@ -329,25 +332,23 @@ Environment Variables:
NEO_TEST_ZODB_STORAGES default: 1 NEO_TEST_ZODB_STORAGES default: 1
""" % neo_tests__dict__ """ % neo_tests__dict__
def load_options(self, options, args): def load_options(self, args):
if options.coverage and options.cov_unit: if not (args.unit or args.functional or args.zodb):
sys.exit('-c conflicts with -C') if not args.only:
if not (options.unit or options.functional or options.zodb):
if not args:
sys.exit('Nothing to run, please give one of -f, -u, -z') sys.exit('Nothing to run, please give one of -f, -u, -z')
options.unit = options.functional = options.zodb = True args.unit = args.functional = args.zodb = True
return dict( return dict(
log = options.log, log = args.log,
loop = options.loop, loop = args.loop,
unit = options.unit, unit = args.unit,
functional = options.functional, functional = args.functional,
zodb = options.zodb, zodb = args.zodb,
verbosity = 2 if options.verbose else 1, verbosity = 2 if args.verbose else 1,
coverage = options.coverage, coverage = args.coverage,
cov_unit = options.cov_unit, cov_unit = args.cov_unit,
only = args, only = args.only,
stop_on_success = options.stop_on_success, stop_on_success = args.stop_on_success,
readable_tid = options.readable_tid, readable_tid = args.readable_tid,
) )
def start(self): def start(self):
......
...@@ -15,9 +15,8 @@ ...@@ -15,9 +15,8 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import inspect, random import argparse, inspect, random
from logging import getLogger, INFO from logging import getLogger, INFO
from optparse import OptionParser
from neo.lib import logging from neo.lib import logging
from neo.tests import functional from neo.tests import functional
logging.backlog() logging.backlog()
...@@ -26,9 +25,9 @@ del logging.default_root_handler.handle ...@@ -26,9 +25,9 @@ del logging.default_root_handler.handle
def main(): def main():
args, _, _, defaults = inspect.getargspec(functional.NEOCluster.__init__) args, _, _, defaults = inspect.getargspec(functional.NEOCluster.__init__)
option_list = zip(args[-len(defaults):], defaults) option_list = zip(args[-len(defaults):], defaults)
parser = OptionParser(usage="%prog [options] [db...]", parser = argparse.ArgumentParser(
description="Quickly setup a simple NEO cluster for testing purpose.") description="Quickly setup a simple NEO cluster for testing purpose.")
parser.add_option('--seed', help="settings like node ports/uuids and" parser.add_argument('--seed', help="settings like node ports/uuids and"
" cluster name are random: pass any string to initialize the RNG") " cluster name are random: pass any string to initialize the RNG")
defaults = {} defaults = {}
for option, default in sorted(option_list): for option, default in sorted(option_list):
...@@ -39,14 +38,15 @@ def main(): ...@@ -39,14 +38,15 @@ def main():
elif default is not None: elif default is not None:
defaults[option] = default defaults[option] = default
if isinstance(default, int): if isinstance(default, int):
kw['type'] = "int" kw['type'] = int
parser.add_option('--' + option, **kw) parser.add_argument('--' + option, **kw)
parser.set_defaults(**defaults) parser.set_defaults(**defaults)
options, args = parser.parse_args() parser.add_argument('db', nargs='+')
if options.seed: args = parser.parse_args()
functional.random = random.Random(options.seed) if args.seed:
functional.random = random.Random(args.seed)
getLogger().setLevel(INFO) getLogger().setLevel(INFO)
cluster = functional.NEOCluster(args, **{x: getattr(options, x) cluster = functional.NEOCluster(args.db, **{x: getattr(args, x)
for x, _ in option_list}) for x, _ in option_list})
try: try:
cluster.run() cluster.run()
......
...@@ -18,7 +18,7 @@ import sys ...@@ -18,7 +18,7 @@ import sys
from collections import deque from collections import deque
from neo.lib import logging from neo.lib import logging
from neo.lib.app import BaseApplication from neo.lib.app import BaseApplication, buildOptionParser
from neo.lib.protocol import uuid_str, \ from neo.lib.protocol import uuid_str, \
CellStates, ClusterStates, NodeTypes, Packets CellStates, ClusterStates, NodeTypes, Packets
from neo.lib.connection import ListeningConnection from neo.lib.connection import ListeningConnection
...@@ -27,35 +27,80 @@ from neo.lib.pt import PartitionTable ...@@ -27,35 +27,80 @@ from neo.lib.pt import PartitionTable
from neo.lib.util import dump from neo.lib.util import dump
from neo.lib.bootstrap import BootstrapManager from neo.lib.bootstrap import BootstrapManager
from .checker import Checker from .checker import Checker
from .database import buildDatabaseManager from .database import buildDatabaseManager, DATABASE_MANAGER_DICT
from .handlers import identification, initialization, master from .handlers import identification, initialization, master
from .replicator import Replicator from .replicator import Replicator
from .transactions import TransactionManager from .transactions import TransactionManager
from neo.lib.debug import register as registerLiveDebugger from neo.lib.debug import register as registerLiveDebugger
option_defaults = {
'adapter': 'MySQL',
'wait': 0,
}
assert option_defaults['adapter'] in DATABASE_MANAGER_DICT
@buildOptionParser
class Application(BaseApplication): class Application(BaseApplication):
"""The storage node application.""" """The storage node application."""
checker = replicator = tm = None checker = replicator = tm = None
@classmethod
def _buildOptionParser(cls):
parser = cls.option_parser
parser.description = "NEO Storage node"
cls.addCommonServerOptions('storage', '127.0.0.1')
_ = parser.group('storage')
_('a', 'adapter', choices=sorted(DATABASE_MANAGER_DICT),
help="database adapter to use")
_('d', 'database', required=True,
help="database connections string")
_.float('w', 'wait',
help="seconds to wait for backend to be available,"
" before erroring-out (-1 = infinite)")
_.bool('disable-drop-partitions',
help="do not delete data of discarded cells, which is useful for"
" big databases because the current implementation is"
" inefficient (this option should disappear in the future)")
_ = parser.group('database creation')
_.int('u', 'uuid',
help="specify an UUID to use for this process. Previously"
" assigned UUID takes precedence (i.e. you should"
" always use reset with this switch)")
_('e', 'engine', help="database engine (MySQL only)")
_.bool('dedup',
help="enable deduplication of data"
" when setting up a new storage node")
# TODO: Forbid using "reset" along with any unneeded argument.
# "reset" is too dangerous to let user a chance of accidentally
# letting it slip through in a long option list.
# It should even be forbidden in configuration files.
_.bool('reset',
help="remove an existing database if any, and exit")
parser.set_defaults(**option_defaults)
def __init__(self, config): def __init__(self, config):
super(Application, self).__init__( super(Application, self).__init__(
config.getSSL(), config.getDynamicMasterList()) config.get('ssl'), config.get('dynamic_master_list'))
# set the cluster name # set the cluster name
self.name = config.getCluster() self.name = config['cluster']
self.dm = buildDatabaseManager(config.getAdapter(), self.dm = buildDatabaseManager(config['adapter'],
(config.getDatabase(), config.getEngine(), config.getWait()), (config['database'], config.get('engine'), config['wait']),
) )
self.disable_drop_partitions = config.getDisableDropPartitions() self.disable_drop_partitions = config.get('disable_drop_partitions',
False)
# load master nodes # load master nodes
for master_address in config.getMasters(): for master_address in config['masters']:
self.nm.createMaster(address=master_address) self.nm.createMaster(address=master_address)
# set the bind address # set the bind address
self.server = config.getBind() self.server = config['bind']
logging.debug('IP address is %s, port is %d', *self.server) logging.debug('IP address is %s, port is %d', *self.server)
# The partition table is initialized after getting the number of # The partition table is initialized after getting the number of
...@@ -69,13 +114,14 @@ class Application(BaseApplication): ...@@ -69,13 +114,14 @@ class Application(BaseApplication):
# operation related data # operation related data
self.operational = False self.operational = False
self.dm.setup(reset=config.getReset(), dedup=config.getDedup()) self.dm.setup(reset=config.get('reset', False),
dedup=config.get('dedup', False))
self.loadConfiguration() self.loadConfiguration()
self.devpath = self.dm.getTopologyPath() self.devpath = self.dm.getTopologyPath()
# force node uuid from command line argument, for testing purpose only # force node uuid from command line argument, for testing purpose only
if config.getUUID() is not None: if 'uuid' in config:
self.uuid = config.getUUID() self.uuid = config['uuid']
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
......
...@@ -31,6 +31,7 @@ except ImportError: ...@@ -31,6 +31,7 @@ except ImportError:
_protocol = 1 _protocol = 1
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from ..app import option_defaults
from . import buildDatabaseManager, DatabaseFailure from . import buildDatabaseManager, DatabaseFailure
from .manager import DatabaseManager from .manager import DatabaseManager
from neo.lib import compress, logging, patch, util from neo.lib import compress, logging, patch, util
...@@ -359,8 +360,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -359,8 +360,7 @@ class ImporterDatabaseManager(DatabaseManager):
config = SafeConfigParser() config = SafeConfigParser()
config.read(os.path.expanduser(database)) config.read(os.path.expanduser(database))
sections = config.sections() sections = config.sections()
# XXX: defaults copy & pasted from elsewhere - refactoring needed main = self._conf = option_defaults.copy()
main = self._conf = {'adapter': 'MySQL', 'wait': 0}
main.update(config.items(sections.pop(0))) main.update(config.items(sections.pop(0)))
self.zodb = [(x, dict(config.items(x))) for x in sections] self.zodb = [(x, dict(config.items(x))) for x in sections]
x = main.get('compress', 'true') x = main.get('compress', 'true')
......
...@@ -255,14 +255,14 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -255,14 +255,14 @@ class NeoUnitTestBase(NeoTestBase):
assert master_number >= 1 and master_number <= 10 assert master_number >= 1 and master_number <= 10
masters = ([(self.local_ip, 10010 + i) masters = ([(self.local_ip, 10010 + i)
for i in xrange(master_number)]) for i in xrange(master_number)])
return Mock({ return {
'getCluster': cluster, 'cluster': cluster,
'getBind': masters[0], 'bind': masters[0],
'getMasters': masters, 'masters': masters,
'getReplicas': replicas, 'replicas': replicas,
'getPartitions': partitions, 'partitions': partitions,
'getUUID': uuid, 'uuid': uuid,
}) }
def getStorageConfiguration(self, cluster='main', master_number=2, def getStorageConfiguration(self, cluster='main', master_number=2,
index=0, prefix=DB_PREFIX, uuid=None): index=0, prefix=DB_PREFIX, uuid=None):
...@@ -277,15 +277,15 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -277,15 +277,15 @@ class NeoUnitTestBase(NeoTestBase):
db = os.path.join(getTempDirectory(), 'test_neo%s.sqlite' % index) db = os.path.join(getTempDirectory(), 'test_neo%s.sqlite' % index)
else: else:
assert False, adapter assert False, adapter
return Mock({ return {
'getCluster': cluster, 'cluster': cluster,
'getBind': (masters[0], 10020 + index), 'bind': (masters[0], 10020 + index),
'getMasters': masters, 'masters': masters,
'getDatabase': db, 'database': db,
'getUUID': uuid, 'uuid': uuid,
'getReset': False, 'adapter': adapter,
'getAdapter': adapter, 'wait': 0,
}) }
def getNewUUID(self, node_type): def getNewUUID(self, node_type):
""" """
......
from __future__ import print_function from __future__ import print_function
import argparse
import sys import sys
import smtplib import smtplib
import optparse
import platform import platform
import datetime import datetime
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
...@@ -22,28 +22,28 @@ class BenchmarkRunner(object): ...@@ -22,28 +22,28 @@ class BenchmarkRunner(object):
def __init__(self): def __init__(self):
self._successful = True self._successful = True
self._status = [] self._status = []
parser = optparse.OptionParser() parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter)
# register common options # register common options
parser.add_option('', '--title') _ = parser.add_argument
parser.add_option('', '--mail-to', action='append') _('--title')
parser.add_option('', '--mail-from') _('--mail-to', action='append')
parser.add_option('', '--mail-server') _('--mail-from')
parser.add_option('', '--repeat', type='int', default=1) _('--mail-server')
self.add_options(parser) self.add_options(parser)
# check common arguments # check common arguments
options, args = parser.parse_args() args = parser.parse_args()
if bool(options.mail_to) ^ bool(options.mail_from): if bool(args.mail_to) ^ bool(args.mail_from):
sys.exit('Need a sender and recipients to mail report') sys.exit('Need a sender and recipients to mail report')
mail_server = options.mail_server or MAIL_SERVER mail_server = args.mail_server or MAIL_SERVER
# check specifics arguments # check specifics arguments
self._config = AttributeDict() self._config = AttributeDict()
self._config.update(self.load_options(options, args)) self._config.update(self.load_options(args))
self._config.update( self._config.update(
title = options.title or self.__class__.__name__, title = args.title or self.__class__.__name__,
mail_from = options.mail_from, mail_from = args.mail_from,
mail_to = options.mail_to, mail_to = args.mail_to,
mail_server = mail_server.split(':'), mail_server = mail_server.split(':'),
repeat = options.repeat,
) )
def add_status(self, key, value): def add_status(self, key, value):
...@@ -104,7 +104,7 @@ class BenchmarkRunner(object): ...@@ -104,7 +104,7 @@ class BenchmarkRunner(object):
""" Append options to command line parser """ """ Append options to command line parser """
raise NotImplementedError raise NotImplementedError
def load_options(self, options, args): def load_options(self, args):
""" Check options and return a configuration dict """ """ Check options and return a configuration dict """
raise NotImplementedError raise NotImplementedError
......
...@@ -122,9 +122,10 @@ class Process(object): ...@@ -122,9 +122,10 @@ class Process(object):
_coverage_index = 0 _coverage_index = 0
pid = 0 pid = 0
def __init__(self, command, arg_dict={}): def __init__(self, command, *args, **kw):
self.command = command self.command = command
self.arg_dict = arg_dict self.args = args
self.arg_dict = kw
def _args(self): def _args(self):
args = [] args = []
...@@ -132,6 +133,7 @@ class Process(object): ...@@ -132,6 +133,7 @@ class Process(object):
args.append('--' + arg) args.append('--' + arg)
if param is not None: if param is not None:
args.append(str(param)) args.append(str(param))
args += self.args
return args return args
def start(self): def start(self):
...@@ -239,8 +241,8 @@ class Process(object): ...@@ -239,8 +241,8 @@ class Process(object):
self.pid = 0 self.pid = 0
self.child_coverage() self.child_coverage()
if result: if result:
raise NodeProcessError('%r %r exited with status %r' % ( raise NodeProcessError('%r %r %r exited with status %r' % (
self.command, self.arg_dict, result)) self.command, self.args, self.arg_dict, result))
return result return result
def stop(self): def stop(self):
...@@ -255,18 +257,18 @@ class Process(object): ...@@ -255,18 +257,18 @@ class Process(object):
class NEOProcess(Process): class NEOProcess(Process):
def __init__(self, command, uuid, arg_dict): def __init__(self, command, *args, **kw):
try: try:
__import__('neo.scripts.' + command, level=0) __import__('neo.scripts.' + command, level=0)
except ImportError: except ImportError:
raise NotFound(command + ' not found') raise NotFound(command + ' not found')
super(NEOProcess, self).__init__(command, arg_dict) self.setUUID(kw.pop('uuid', None))
self.setUUID(uuid) super(NEOProcess, self).__init__(command, *args, **kw)
def _args(self): def _args(self):
args = super(NEOProcess, self)._args() args = super(NEOProcess, self)._args()
if self.uuid: if self.uuid:
args += '--uuid', str(self.uuid) args[:0] = '--uuid', str(self.uuid)
return args return args
def run(self): def run(self):
...@@ -360,7 +362,7 @@ class NEOCluster(object): ...@@ -360,7 +362,7 @@ class NEOCluster(object):
if self.SSL: if self.SSL:
kw['ca'], kw['cert'], kw['key'] = self.SSL kw['ca'], kw['cert'], kw['key'] = self.SSL
self.process_dict.setdefault(node_type, []).append( self.process_dict.setdefault(node_type, []).append(
NEOProcess(command_dict[node_type], uuid, kw)) NEOProcess(command_dict[node_type], uuid=uuid, **kw))
def setupDB(self, clear_databases=True): def setupDB(self, clear_databases=True):
if self.adapter == 'MySQL': if self.adapter == 'MySQL':
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import os import os
import unittest import unittest
import transaction import transaction
...@@ -25,7 +26,7 @@ from neo.lib.util import makeChecksum, u64 ...@@ -25,7 +26,7 @@ from neo.lib.util import makeChecksum, u64
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from ZODB.tests.StorageTestBase import zodb_pickle from ZODB.tests.StorageTestBase import zodb_pickle
from persistent import Persistent from persistent import Persistent
from . import NEOCluster, NEOFunctionalTest from . import NEOCluster, NEOFunctionalTest, NEOProcess
TREE_SIZE = 6 TREE_SIZE = 6
...@@ -120,13 +121,13 @@ class ClientTests(NEOFunctionalTest): ...@@ -120,13 +121,13 @@ class ClientTests(NEOFunctionalTest):
self.__checkTree(tree.right, depth) self.__checkTree(tree.right, depth)
self.__checkTree(tree.left, depth) self.__checkTree(tree.left, depth)
def __getDataFS(self, reset=False): def __getDataFS(self):
name = os.path.join(self.getTempDirectory(), 'data.fs') name = os.path.join(self.getTempDirectory(), 'data.fs')
if reset and os.path.exists(name): if os.path.exists(name):
os.remove(name) os.remove(name)
return FileStorage(file_name=name) return FileStorage(file_name=name)
def __populate(self, db, tree_size=TREE_SIZE): def __populate(self, db, tree_size=TREE_SIZE, with_undo=True):
if isinstance(db.storage, FileStorage): if isinstance(db.storage, FileStorage):
from base64 import b64encode as undo_tid from base64 import b64encode as undo_tid
else: else:
...@@ -146,6 +147,7 @@ class ClientTests(NEOFunctionalTest): ...@@ -146,6 +147,7 @@ class ClientTests(NEOFunctionalTest):
t2 = db.lastTransaction() t2 = db.lastTransaction()
ob.left = left ob.left = left
transaction.commit() transaction.commit()
if with_undo:
undo() undo()
t4 = db.lastTransaction() t4 = db.lastTransaction()
undo(t2) undo(t2)
...@@ -174,10 +176,37 @@ class ClientTests(NEOFunctionalTest): ...@@ -174,10 +176,37 @@ class ClientTests(NEOFunctionalTest):
(neo_db, neo_conn) = self.neo.getZODBConnection() (neo_db, neo_conn) = self.neo.getZODBConnection()
self.__checkTree(neo_conn.root()['trees']) self.__checkTree(neo_conn.root()['trees'])
def __dump(self, storage): def testMigrationTool(self):
return {u64(t.tid): [(u64(o.oid), o.data_txn and u64(o.data_txn), dfs_storage = self.__getDataFS()
dfs_db = ZODB.DB(dfs_storage)
self.__populate(dfs_db, with_undo=False)
dump = self.__dump(dfs_storage)
fs_path = dfs_storage.__name__
dfs_db.close()
neo = self.neo
neo.start()
kw = {'cluster': neo.cluster_name, 'quiet': None}
master_nodes = neo.master_nodes.replace('/', ' ')
if neo.SSL:
kw['ca'], kw['cert'], kw['key'] = neo.SSL
p = NEOProcess('neomigrate', fs_path, master_nodes, **kw)
p.start()
p.wait()
os.remove(fs_path)
p = NEOProcess('neomigrate', master_nodes, fs_path, **kw)
p.start()
p.wait()
self.assertEqual(dump, self.__dump(FileStorage(fs_path)))
def __dump(self, storage, sorted=sorted):
return {u64(t.tid): sorted((u64(o.oid), o.data_txn and u64(o.data_txn),
None if o.data is None else makeChecksum(o.data)) None if o.data is None else makeChecksum(o.data))
for o in t] for o in t)
for t in storage.iterator()} for t in storage.iterator()}
def testExport(self): def testExport(self):
...@@ -186,10 +215,10 @@ class ClientTests(NEOFunctionalTest): ...@@ -186,10 +215,10 @@ class ClientTests(NEOFunctionalTest):
self.neo.start() self.neo.start()
(neo_db, neo_conn) = self.neo.getZODBConnection() (neo_db, neo_conn) = self.neo.getZODBConnection()
self.__populate(neo_db) self.__populate(neo_db)
dump = self.__dump(neo_db.storage) dump = self.__dump(neo_db.storage, list)
# copy neo to data fs # copy neo to data fs
dfs_storage = self.__getDataFS(reset=True) dfs_storage = self.__getDataFS()
neo_storage = self.neo.getZODBStorage() neo_storage = self.neo.getZODBStorage()
dfs_storage.copyTransactionsFrom(neo_storage) dfs_storage.copyTransactionsFrom(neo_storage)
...@@ -209,7 +238,7 @@ class ClientTests(NEOFunctionalTest): ...@@ -209,7 +238,7 @@ class ClientTests(NEOFunctionalTest):
self.neo.start() self.neo.start()
neo_db, neo_conn = self.neo.getZODBConnection() neo_db, neo_conn = self.neo.getZODBConnection()
self.__checkTree(neo_conn.root()['trees']) self.__checkTree(neo_conn.root()['trees'])
self.assertEqual(dump, self.__dump(neo_db.storage)) self.assertEqual(dump, self.__dump(neo_db.storage, list))
def testIPv6Client(self): def testIPv6Client(self):
""" Test the connectivity of an IPv6 connection for neo client """ """ Test the connectivity of an IPv6 connection for neo client """
......
...@@ -270,10 +270,6 @@ class TestSerialized(Serialized): ...@@ -270,10 +270,6 @@ class TestSerialized(Serialized):
class Node(object): class Node(object):
@staticmethod
def convertInitArgs(**kw):
return {'get' + k.capitalize(): v for k, v in kw.iteritems()}
def getConnectionList(self, *peers): def getConnectionList(self, *peers):
addr = lambda c: c and (c.addr if c.is_server else c.getAddress()) addr = lambda c: c and (c.addr if c.is_server else c.getAddress())
addr_set = {addr(c.connector) for peer in peers addr_set = {addr(c.connector) for peer in peers
...@@ -337,18 +333,17 @@ class ServerNode(Node): ...@@ -337,18 +333,17 @@ class ServerNode(Node):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.daemon = True self.daemon = True
self.node_name = '%s_%u' % (self.node_type, port) self.node_name = '%s_%u' % (self.node_type, port)
kw.update(getCluster=name, getBind=address, kw.update(cluster=name, bind=address,
getMasters=master_nodes and parseMasterList(master_nodes)) masters=master_nodes and parseMasterList(master_nodes))
super(ServerNode, self).__init__(Mock(kw)) super(ServerNode, self).__init__(kw)
def getVirtualAddress(self): def getVirtualAddress(self):
return self._init_args['address'] return self._init_args['address']
def resetNode(self, **kw): def resetNode(self, **kw):
assert not self.is_alive() assert not self.is_alive()
kw = self.convertInitArgs(**kw)
init_args = self._init_args init_args = self._init_args
init_args['getReset'] = False init_args['reset'] = False
assert set(kw).issubset(init_args), (kw, init_args) assert set(kw).issubset(init_args), (kw, init_args)
init_args.update(kw) init_args.update(kw)
self.close() self.close()
...@@ -403,7 +398,7 @@ class StorageApplication(ServerNode, neo.storage.app.Application): ...@@ -403,7 +398,7 @@ class StorageApplication(ServerNode, neo.storage.app.Application):
self.master_conn.close() self.master_conn.close()
def getAdapter(self): def getAdapter(self):
return self._init_args['getAdapter'] return self._init_args['adapter']
def getDataLockInfo(self): def getDataLockInfo(self):
dm = self.dm dm = self.dm
...@@ -663,15 +658,15 @@ class NEOCluster(object): ...@@ -663,15 +658,15 @@ class NEOCluster(object):
master_list = [MasterApplication.newAddress() master_list = [MasterApplication.newAddress()
for _ in xrange(master_count)] for _ in xrange(master_count)]
self.master_nodes = ' '.join('%s:%s' % x for x in master_list) self.master_nodes = ' '.join('%s:%s' % x for x in master_list)
kw = Node.convertInitArgs(replicas=replicas, adapter=adapter, kw = dict(replicas=replicas, adapter=adapter,
partitions=partitions, reset=clear_databases, dedup=dedup) partitions=partitions, reset=clear_databases, dedup=dedup)
kw['cluster'] = weak_self = weakref.proxy(self) kw['cluster'] = weak_self = weakref.proxy(self)
kw['getSSL'] = self.SSL kw['ssl'] = self.SSL
if upstream is not None: if upstream is not None:
self.upstream = weakref.proxy(upstream) self.upstream = weakref.proxy(upstream)
kw.update(getUpstreamCluster=upstream.name, kw.update(upstream_cluster=upstream.name,
getUpstreamMasters=parseMasterList(upstream.master_nodes)) upstream_masters=parseMasterList(upstream.master_nodes))
self.master_list = [MasterApplication(getAutostart=autostart, self.master_list = [MasterApplication(autostart=autostart,
address=x, **kw) address=x, **kw)
for x in master_list] for x in master_list]
if db_list is None: if db_list is None:
...@@ -693,8 +688,9 @@ class NEOCluster(object): ...@@ -693,8 +688,9 @@ class NEOCluster(object):
db = os.path.join(getTempDirectory(), '%s.conf') db = os.path.join(getTempDirectory(), '%s.conf')
with open(db % tuple(db_list), "w") as f: with open(db % tuple(db_list), "w") as f:
cfg.write(f) cfg.write(f)
kw["getAdapter"] = "Importer" kw["adapter"] = "Importer"
self.storage_list = [StorageApplication(getDatabase=db % x, **kw) kw['wait'] = 0
self.storage_list = [StorageApplication(database=db % x, **kw)
for x in db_list] for x in db_list]
self.admin_list = [AdminApplication(**kw)] self.admin_list = [AdminApplication(**kw)]
...@@ -969,7 +965,7 @@ class NEOThreadedTest(NeoTestBase): ...@@ -969,7 +965,7 @@ class NEOThreadedTest(NeoTestBase):
@contextmanager @contextmanager
def getLoopbackConnection(self): def getLoopbackConnection(self):
app = MasterApplication(address=BIND, app = MasterApplication(address=BIND,
getSSL=NEOCluster.SSL, getReplicas=0, getPartitions=1) ssl=NEOCluster.SSL, replicas=0, partitions=1)
try: try:
handler = EventHandler(app) handler = EventHandler(app)
app.listening_conn = ListeningConnection(app, handler, app.server) app.listening_conn = ListeningConnection(app, handler, app.server)
......
...@@ -16,25 +16,28 @@ class MatrixImportBenchmark(BenchmarkRunner): ...@@ -16,25 +16,28 @@ class MatrixImportBenchmark(BenchmarkRunner):
_size = None _size = None
def add_options(self, parser): def add_options(self, parser):
parser.add_option('-d', '--datafs') _ = parser.add_argument
parser.add_option('-z', '--zeo', action="store_true") _('-d', '--datafs')
parser.add_option('', '--min-storages', type='int', default=1) _('-z', '--zeo', action="store_true")
parser.add_option('', '--max-storages', type='int', default=2) _('--min-storages', type=int, default=1)
parser.add_option('', '--min-replicas', type='int', default=0) _('--max-storages', type=int, default=2)
parser.add_option('', '--max-replicas', type='int', default=1) _('--min-replicas', type=int, default=0)
parser.add_option('', '--threaded', action="store_true") _('--max-replicas', type=int, default=1)
_('--repeat', type=int, default=1)
_('--threaded', action="store_true")
def load_options(self, options, args): def load_options(self, args):
if options.datafs and not os.path.exists(options.datafs): if args.datafs and not os.path.exists(args.datafs):
sys.exit('Missing or wrong data.fs argument') sys.exit('Missing or wrong data.fs argument')
return dict( return dict(
datafs = options.datafs, datafs = args.datafs,
min_s = options.min_storages, min_s = args.min_storages,
max_s = options.max_storages, max_s = args.max_storages,
min_r = options.min_replicas, min_r = args.min_replicas,
max_r = options.max_replicas, max_r = args.max_replicas,
threaded = options.threaded, repeat = args.repeat,
zeo = options.zeo, threaded = args.threaded,
zeo = args.zeo,
) )
def start(self): def start(self):
......
...@@ -16,21 +16,22 @@ class ImportBenchmark(BenchmarkRunner): ...@@ -16,21 +16,22 @@ class ImportBenchmark(BenchmarkRunner):
""" Test import of a datafs """ """ Test import of a datafs """
def add_options(self, parser): def add_options(self, parser):
parser.add_option('-d', '--datafs') _ = parser.add_argument
parser.add_option('-m', '--masters') _('-d', '--datafs')
parser.add_option('-s', '--storages') _('-m', '--masters', type=int, default=1)
parser.add_option('-p', '--partitions') _('-s', '--storages', type=int, default=1)
parser.add_option('-r', '--replicas') _('-p', '--partitions', type=int, default=10)
_('-r', '--replicas', type=int, default=0)
def load_options(self, options, args):
if options.datafs and not os.path.exists(options.datafs): def load_options(self, args):
if args.datafs and not os.path.exists(args.datafs):
sys.exit('Missing or wrong data.fs argument') sys.exit('Missing or wrong data.fs argument')
return dict( return dict(
datafs = options.datafs, datafs = args.datafs,
masters = int(options.masters or 1), masters = args.masters,
storages = int(options.storages or 1), storages = args.storages,
partitions = int(options.partitions or 10), partitions = args.partitions,
replicas = int(options.replicas or 0), replicas = args.replicas,
) )
def start(self): def start(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment