Commit b621a98f authored by Julien Muchembled's avatar Julien Muchembled

bench: new option to mesure ZEO perfs in matrix test

parent 114c7ab6
......@@ -116,36 +116,31 @@ class PortAllocator(object):
__del__ = release
class NEOProcess(object):
class Process(object):
_coverage_fd = None
_coverage_prefix = os.path.join(getTempDirectory(), 'coverage-')
_coverage_index = 0
pid = 0
def __init__(self, command, uuid, arg_dict):
try:
__import__('neo.scripts.' + command, level=0)
except ImportError:
raise NotFound, '%s not found' % (command)
def __init__(self, command, arg_dict={}):
self.command = command
self.arg_dict = arg_dict
self.with_uuid = True
self.setUUID(uuid)
def start(self, with_uuid=True):
# Prevent starting when already forked and wait wasn't called.
if self.pid != 0:
raise AlreadyRunning, 'Already running with PID %r' % (self.pid, )
command = self.command
def _args(self):
args = []
self.with_uuid = with_uuid
for arg, param in self.arg_dict.iteritems():
args.append('--' + arg)
if param is not None:
args.append(str(param))
if with_uuid:
args += '--uuid', str(self.uuid)
return args
def start(self):
# Prevent starting when already forked and wait wasn't called.
if self.pid != 0:
raise AlreadyRunning('Already running with PID %r' % self.pid)
command = self.command
args = self._args()
global coverage
if coverage:
cls = self.__class__
......@@ -179,7 +174,7 @@ class NEOProcess(object):
os.close(self._coverage_fd)
os.write(w, '\0')
sys.argv = [command] + args
getattr(neo.scripts, command).main()
self.run()
status = 0
except SystemExit, e:
status = e.code
......@@ -203,6 +198,9 @@ class NEOProcess(object):
logging.info('pid %u: %s %s',
self.pid, command, ' '.join(map(repr, args)))
def run(self):
raise NotImplementedError
def child_coverage(self):
r = self._coverage_fd
if r is not None:
......@@ -249,11 +247,32 @@ class NEOProcess(object):
self.kill()
self.wait()
def getPID(self):
return self.pid
def isAlive(self):
try:
return psutil.Process(self.pid).status() != psutil.STATUS_ZOMBIE
except psutil.NoSuchProcess:
return False
class NEOProcess(Process):
def __init__(self, command, uuid, arg_dict):
try:
__import__('neo.scripts.' + command, level=0)
except ImportError:
raise NotFound(command + ' not found')
super(NEOProcess, self).__init__(command, arg_dict)
self.setUUID(uuid)
def _args(self):
args = super(NEOProcess, self)._args()
if self.uuid:
args += '--uuid', str(self.uuid)
return args
def run(self):
getattr(neo.scripts, self.command).main()
def getUUID(self):
assert self.with_uuid, 'UUID disabled on this process'
return self.uuid
def setUUID(self, uuid):
......@@ -262,12 +281,6 @@ class NEOProcess(object):
"""
self.uuid = uuid
def isAlive(self):
try:
return psutil.Process(self.pid).status() != psutil.STATUS_ZOMBIE
except psutil.NoSuchProcess:
return False
class NEOCluster(object):
SSL = None
......
from __future__ import print_function
import os
import signal
import tempfile
import ZEO.runzeo
from ZEO.ClientStorage import ClientStorage as _ClientStorage
from . import buildUrlFromString, ADDRESS_TYPE, IP_VERSION_FORMAT_DICT
from .functional import AlreadyStopped, PortAllocator, Process
class ZEOProcess(Process):
def __init__(self, **kw):
super(ZEOProcess, self).__init__('runzeo', kw)
def run(self):
from ZEO.runzeo import ZEOServer
del ZEOServer.handle_sigusr2
getattr(ZEO, self.command).main()
class ClientStorage(_ClientStorage):
@property
def restore(self):
raise AttributeError('IStorageRestoreable disabled')
class ZEOCluster(object):
def start(self):
self.zodb_storage_list = []
local_ip = IP_VERSION_FORMAT_DICT[ADDRESS_TYPE]
port_allocator = PortAllocator()
port = port_allocator.allocate(ADDRESS_TYPE, local_ip)
self.address = buildUrlFromString(local_ip), port
temp_dir = tempfile.mkdtemp(prefix='neo_')
print('Using temp directory', temp_dir)
self.zeo = ZEOProcess(address='%s:%s' % self.address,
filename=os.path.join(temp_dir, 'Data.fs'))
port_allocator.release()
self.zeo.start()
def stop(self):
storage_list = self.zodb_storage_list
zeo = self.zeo
del self.zeo, self.zodb_storage_list
try:
for storage in storage_list:
storage.close()
zeo.kill(signal.SIGUSR2)
except AlreadyStopped:
pass
else:
zeo.child_coverage()
zeo.kill(signal.SIGKILL)
zeo.wait()
def getZODBStorage(self):
storage = ClientStorage(self.address)
self.zodb_storage_list.append(storage)
return storage
def setupDB(self):
pass
#!/usr/bin/env python
from __future__ import print_function
import sys
import os
import math
......@@ -17,6 +17,7 @@ class MatrixImportBenchmark(BenchmarkRunner):
def add_options(self, parser):
parser.add_option('-d', '--datafs')
parser.add_option('-z', '--zeo', action="store_true")
parser.add_option('', '--min-storages', type='int', default=1)
parser.add_option('', '--max-storages', type='int', default=2)
parser.add_option('', '--min-replicas', type='int', default=0)
......@@ -33,6 +34,7 @@ class MatrixImportBenchmark(BenchmarkRunner):
min_r = options.min_replicas,
max_r = options.max_replicas,
threaded = options.threaded,
zeo = options.zeo,
)
def start(self):
......@@ -47,22 +49,28 @@ class MatrixImportBenchmark(BenchmarkRunner):
if storages[-1] < max_s:
storages.append(max_s)
replicas = range(min_r, max_r + 1)
result_list = [self.runMatrix(storages, replicas)
for x in xrange(self._config.repeat)]
results = {}
for s in storages:
results[s] = z = {}
for r in replicas:
if r < s:
x = [x[s][r] for x in result_list if x[s][r] is not None]
if x:
z[r] = min(x)
else:
z[r] = None
def merge_min(a, b):
for k, vb in b.iteritems():
try:
va = a[k]
except KeyError:
pass
else:
if type(va) is dict:
merge_min(va, vb)
continue
if vb is None or None is not va <= vb:
continue
a[k] = vb
for x in xrange(self._config.repeat):
merge_min(results, self.runMatrix(storages, replicas))
return self.buildReport(storages, replicas, results)
def runMatrix(self, storages, replicas):
stats = {}
if self._config.zeo:
stats['zeo'] = self.runImport()
for s in storages:
stats[s] = z = {}
for r in replicas:
......@@ -70,7 +78,7 @@ class MatrixImportBenchmark(BenchmarkRunner):
z[r] = self.runImport(1, s, r, 12*s//(1+r))
return stats
def runImport(self, masters, storages, replicas, partitions):
def runImport(self, *neo_args):
datafs = self._config.datafs
if datafs:
dfs_storage = FileStorage(file_name=self._config.datafs)
......@@ -79,28 +87,36 @@ class MatrixImportBenchmark(BenchmarkRunner):
import random, neo.tests.stat_zodb
dfs_storage = getattr(neo.tests.stat_zodb, datafs)(
random.Random(0)).as_storage(5000)
print "Import of %s with m=%s, s=%s, r=%s, p=%s" % (
datafs, masters, storages, replicas, partitions)
if self._config.threaded:
from neo.tests.threaded import NEOCluster
info = "Import of " + datafs
if neo_args:
masters, storages, replicas, partitions = neo_args
info += " with m=%s, s=%s, r=%s, p=%s" % (
masters, storages, replicas, partitions)
if self._config.threaded:
from neo.tests.threaded import NEOCluster
else:
from neo.tests.functional import NEOCluster
zodb = NEOCluster(
db_list=['%s_matrix_%u' % (DB_PREFIX, i) for i in xrange(storages)],
clear_databases=True,
master_count=masters,
partitions=partitions,
replicas=replicas,
)
else:
from neo.tests.functional import NEOCluster
neo = NEOCluster(
db_list=['%s_matrix_%u' % (DB_PREFIX, i) for i in xrange(storages)],
clear_databases=True,
master_count=masters,
partitions=partitions,
replicas=replicas,
)
from neo.tests.zeo_cluster import ZEOCluster
info += " with ZEO"
zodb = ZEOCluster()
print(info)
try:
neo.start()
zodb.start()
try:
neo_storage = neo.getZODBStorage()
if not self._config.threaded:
assert len(neo.getStorageList()) == storages
neo.expectOudatedCells(number=0)
storage = zodb.getZODBStorage()
if neo_args and not self._config.threaded:
assert len(zodb.getStorageList()) == storages
zodb.expectOudatedCells(number=0)
start = time()
neo_storage.copyTransactionsFrom(dfs_storage)
storage.copyTransactionsFrom(dfs_storage)
end = time()
size = dfs_storage.getSize()
if self._size is None:
......@@ -108,15 +124,14 @@ class MatrixImportBenchmark(BenchmarkRunner):
else:
assert self._size == size
finally:
neo.stop()
zodb.stop()
# Clear DB if no error happened.
neo.setupDB()
zodb.setupDB()
return end - start
except:
traceback.print_exc()
self.error_log += "Import with m=%s, s=%s, r=%s, p=%s:" % (
masters, storages, replicas, partitions)
self.error_log += "\n%s\n" % ''.join(traceback.format_exc())
self.error_log += "%s:\n%s\n" % (
info, ''.join(traceback.format_exc()))
def buildReport(self, storages, replicas, results):
# draw an array with results
......@@ -130,6 +145,14 @@ class MatrixImportBenchmark(BenchmarkRunner):
report += sep
failures = 0
speedlist = []
if self._config.zeo:
result = results['zeo']
if result is None:
result = 'FAIL'
failures += 1
else:
result = '%.1f kB/s' % (dfs_size / (result * 1e3))
self.add_status('ZEO', result)
for s in storages:
values = []
assert s in results
......@@ -151,7 +174,7 @@ class MatrixImportBenchmark(BenchmarkRunner):
if failures:
info = '%d failures' % (failures, )
else:
info = '%.1f KB/s' % (sum(speedlist) / len(speedlist))
info = '%.1f kB/s' % (sum(speedlist) / len(speedlist))
return info, report
def main(args=None):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment