Commit 32d23f21 authored by Jeremy Hylton's avatar Jeremy Hylton

Modify to user forker

Add -t n flag to specify number of concurrent threads
Add -U flag to specify a Unix domain socket
parent 995d858a
...@@ -106,6 +106,10 @@ Options: ...@@ -106,6 +106,10 @@ Options:
-M Output means only -M Output means only
-C Run with a persistent client cache -C Run with a persistent client cache
-U Run ZEO using a Unix domain socket
-t n Number of concurrent threads to run.
""" """
import asyncore import asyncore
...@@ -115,6 +119,8 @@ import sys, os, getopt, string, time ...@@ -115,6 +119,8 @@ import sys, os, getopt, string, time
import ZODB, ZODB.FileStorage import ZODB, ZODB.FileStorage
import Persistence import Persistence
import ZEO.ClientStorage, ZEO.StorageServer import ZEO.ClientStorage, ZEO.StorageServer
from ZEO.tests import forker
from ZODB.POSException import ConflictError
class P(Persistence.Persistent): class P(Persistence.Persistent):
pass pass
...@@ -141,8 +147,49 @@ class ZEOExit(asyncore.file_dispatcher): ...@@ -141,8 +147,49 @@ class ZEOExit(asyncore.file_dispatcher):
os.unlink(fs_name + ".lock") os.unlink(fs_name + ".lock")
os.unlink(fs_name + ".tmp") os.unlink(fs_name + ".tmp")
def work(db, results, nrep, compress, data, detailed, minimize, threadno=None):
for j in range(nrep):
for r in 1, 10, 100, 1000:
t = time.time()
jar = db.open()
while 1:
try:
get_transaction().begin()
rt = jar.root()
key = 's%s' % r
if rt.has_key(key):
p = rt[key]
else:
rt[key] = p =P()
for i in range(r):
v = getattr(p, str(i), P())
if compress is not None:
v.d = compress(data)
else:
v.d = data
setattr(p, str(i), v)
get_transaction().commit()
except ConflictError:
pass
else:
break
jar.close()
t = time.time() - t
if detailed:
if threadno is None:
print "%s\t%s\t%.4f" % (j, r, t)
else:
print "%s\t%s\t%.4f\t%d" % (j, r, t, threadno)
results[r] = results[r] + t
rt=d=p=v=None # release all references
if minimize:
time.sleep(3)
jar.cacheMinimize(3)
def main(args): def main(args):
opts, args = getopt.getopt(args, 'zd:n:Ds:LM') opts, args = getopt.getopt(args, 'zd:n:Ds:LMt:U')
s = None s = None
compress = None compress = None
data=sys.argv[0] data=sys.argv[0]
...@@ -150,6 +197,8 @@ def main(args): ...@@ -150,6 +197,8 @@ def main(args):
minimize=0 minimize=0
detailed=1 detailed=1
cache = None cache = None
domain = 'AF_INET'
threads = 1
for o, v in opts: for o, v in opts:
if o=='-n': nrep = int(v) if o=='-n': nrep = int(v)
elif o=='-d': data = v elif o=='-d': data = v
...@@ -168,79 +217,50 @@ def main(args): ...@@ -168,79 +217,50 @@ def main(args):
debug = 1 debug = 1
elif o == '-C': elif o == '-C':
cache = 'speed' cache = 'speed'
elif o == '-U':
domain = 'AF_UNIX'
elif o == '-t':
threads = int(v)
zeo_pipe = None zeo_pipe = None
if s: if s:
s = __import__(s, globals(), globals(), ('__doc__',)) s = __import__(s, globals(), globals(), ('__doc__',))
s = s.Storage s = s.Storage
server = None
else: else:
rd, wr = os.pipe()
pid = os.fork()
if pid:
# in the child, run the storage server
os.close(wr)
import asyncore
ZEOExit(rd)
fs = ZODB.FileStorage.FileStorage(fs_name, create=1) fs = ZODB.FileStorage.FileStorage(fs_name, create=1)
serv = ZEO.StorageServer.StorageServer(('', 1975), {'1':fs}) s, server, pid = forker.start_zeo(fs, domain=domain)
asyncore.loop()
else:
os.close(rd)
zeo_pipe = wr
s = ZEO.ClientStorage.ClientStorage(('', 1975), debug=0,
client=cache)
if hasattr(s, 'is_connected'):
while not s.is_connected():
time.sleep(0.1)
else:
time.sleep(1.0)
data=open(data).read() data=open(data).read()
db=ZODB.DB(s, db=ZODB.DB(s,
# disable cache deactivation # disable cache deactivation
cache_size=4000, cache_size=4000,
cache_deactivate_after=6000,) cache_deactivate_after=6000,)
db.open().root()
print "Beginning work..."
results={1:0, 10:0, 100:0, 1000:0} results={1:0, 10:0, 100:0, 1000:0}
for j in range(nrep): if threads > 1:
for r in 1, 10, 100, 1000: import threading
t = time.time() l = [threading.Thread(target=work,
args=(db, results, nrep, compress, data,
detailed, minimize, i))
for i in range(threads)]
for t in l:
t.start()
for t in l:
t.join()
jar = db.open()
get_transaction().begin()
rt = jar.root()
key = 's%s' % r
if rt.has_key(key):
p = rt[key]
else: else:
rt[key] = p =P() work(db, results, nrep, compress, data, detailed, minimize)
for i in range(r):
v = getattr(p, str(i), P())
if compress is not None:
v.d = compress(data)
else:
v.d = data
setattr(p, str(i), v)
get_transaction().commit()
jar.close()
t = time.time() - t
if detailed:
print "%s\t%s\t%.4f" % (j, r, t)
results[r] = results[r] + t
rt=d=p=v=None # release all references
if minimize:
time.sleep(3)
jar.cacheMinimize(3)
if zeo_pipe: if server is not None:
os.write(zeo_pipe, "done") server.close()
os.waitpid(pid, 0)
if detailed: if detailed:
print '-'*24 print '-'*24
for r in 1, 10, 100, 1000: for r in 1, 10, 100, 1000:
t=results[r]/nrep t=results[r]/(nrep * threads)
print "mean:\t%s\t%.4f\t%.4f (s/o)" % (r, t, t/r) print "mean:\t%s\t%.4f\t%.4f (s/o)" % (r, t, t/r)
##def compress(s): ##def compress(s):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment