Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
Zope
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
Zope
Commits
ad215c3a
Commit
ad215c3a
authored
Jul 31, 2010
by
Hanno Schlichting
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Remove utterly outdated regression tests
parent
6a406941
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
0 additions
and
2609 deletions
+0
-2609
src/Products/ZCatalog/regressiontests/keywords.py
src/Products/ZCatalog/regressiontests/keywords.py
+0
-46
src/Products/ZCatalog/regressiontests/loadmail.py
src/Products/ZCatalog/regressiontests/loadmail.py
+0
-755
src/Products/ZCatalog/regressiontests/regressionCatalog.py
src/Products/ZCatalog/regressiontests/regressionCatalog.py
+0
-710
src/Products/ZCatalog/regressiontests/regressionCatalogTiming.py
...ducts/ZCatalog/regressiontests/regressionCatalogTiming.py
+0
-246
src/Products/ZCatalog/regressiontests/regressionUnicode.py
src/Products/ZCatalog/regressiontests/regressionUnicode.py
+0
-122
src/Products/ZCatalog/regressiontests/unittest_patched.py
src/Products/ZCatalog/regressiontests/unittest_patched.py
+0
-730
No files found.
src/Products/ZCatalog/regressiontests/keywords.py
deleted
100644 → 0
View file @
6a406941
import
rfc822
,
mailbox
,
cPickle
,
string
class
Keywords
:
""" stupid class to read a list of rfc822 messages and extract
all words from the subject header. We use this class for testing
purposes only
"""
def
__init__
(
self
):
self
.
kw
=
[]
def
build
(
self
,
mbox
,
limit
):
mb
=
mailbox
.
UnixMailbox
(
open
(
mbox
))
msg
=
mb
.
next
()
while
msg
and
len
(
self
.
kw
)
<
limit
:
sub
=
msg
.
dict
.
get
(
"subject"
,
""
).
split
(
' '
)
for
f
in
sub
:
ok
=
1
for
c
in
f
:
if
not
c
in
string
.
letters
:
ok
=
0
if
ok
==
1
and
not
f
in
self
.
kw
:
self
.
kw
.
append
(
f
)
msg
=
mb
.
next
()
P
=
cPickle
.
Pickler
(
open
(
'data/keywords'
,
'w'
))
P
.
dump
(
self
.
kw
)
def
reload
(
self
):
P
=
cPickle
.
Unpickler
(
open
(
'data/keywords'
,
'r'
))
self
.
kw
=
P
.
load
()
def
keywords
(
self
):
return
self
.
kw
if
__name__
==
"__main__"
:
k
=
Keywords
()
k
.
build
(
"/home/andreas/zope.mbox"
,
1000
)
src/Products/ZCatalog/regressiontests/loadmail.py
deleted
100644 → 0
View file @
6a406941
"""Test script for exercising various catalog features under load
Usage:
cd lib/python
python Products/ZCatalog/regressiontests/loadmail.py command args
where each command has its own command-line arguments that it expects.
Note that all of the commands operate on the Zope database,
typically var/Data.fs.
Note that this script uses the proc file system to get memory size.
Many of the commands output performance statisics on lines that look like::
11.3585170507 0.06 2217781L 7212
where the numbers are:
- clock time in seconds
- cpu time used by the main thread, in seconds,
- Database size growth over the test
- Memory growth during the test (if the proc file system is available).
Commands:
base mbox [max]
Build a base database by:
- Deleting ../../var/Data.fs
- Starting Zope
- Adding a top-level folder names 'mail'
- Reading up to max messages from the Unix mailbox file, mbox
and adding them as documents to the mail folder.
index [threshold]
Index all of the DTML documents in the database, committing
sub-transactions after each threshold objects. threshold defaults to
1000.
If the threshold is less than the number of messages, then the
size of the temporary sub-transaction commit file is output.
inc mbox start end [threads [wait]]
Incrementally index messages start to end in unix mailbox mbox.
If the threads argument is supplied, then it specifies the
number of threads to use. For example, with:
python Products/ZCatalog/tests/loadmail.py inc mbox 0 200 2
One thread indexes messages 0 to 99 and another thread indexes messages
100 to 199.
If wait is specified, then after each document is indexed, the
thread sleeps a random number of seconds between 0 and 2*wait.
The default wait is 0.25 seconds.
For each thread, a line that looks like::
3.41 (0, 1)
is output, containing:
- The cpu time
- The number of ZODB transaction conflicts detected when reading
- The number of ZODB transaction conflicts detected when committing
edit edits deletes inserts threads wait
Incrementally edit edits messages from mail. For each message,
do a random number of word deletes between 0 and deletes * 2
and do a random number of inserts between 0 and inserts * 2.
For each thread, a line that looks like::
3.41 (0, 1)
as described above.
catdel
Delete the entire catalog in one transaction. This is a fun one for
storages that do reference counting garbage collection.
pdebug command args
Run one of the other commands in the Python debugger.
sample suite of tests::
cd lib/python
python Products/ZCatalog/regressiontests/loadmail.py base ~/zope.mbox 1000
python Products/ZCatalog/regressiontests/loadmail.py index 100
python Products/ZCatalog/regressiontests/loadmail.py
\
inc ~/python-dev.mbox 0 10 2
python Products/ZCatalog/regressiontests/loadmail.py edit 10 10 10 2
python Products/ZCatalog/regressiontests/loadmail.py catdel
"""
import
getopt
import
mailbox
,
time
,
sys
,
os
,
string
sys
.
path
.
insert
(
0
,
'.'
)
import
random
from
string
import
strip
,
find
,
split
,
lower
,
atoi
from
urllib
import
quote
import
transaction
VERBOSE
=
0
def
do
(
db
,
f
,
args
,
returnf
=
None
):
"""Do something and measure it's impact"""
t
=
c
=
size
=
mem
=
r
=
None
try
:
size
=
db
.
getSize
()
mem
=
VmSize
()
t
=
time
.
time
()
c
=
time
.
clock
()
r
=
apply
(
f
,
args
)
t
=
time
.
time
()
-
t
c
=
time
.
clock
()
-
c
size
=
db
.
getSize
()
-
size
mem
=
VmSize
()
-
mem
finally
:
if
returnf
is
not
None
:
returnf
(
t
,
c
,
size
,
mem
,
r
)
else
:
return
t
,
c
,
size
,
mem
,
r
def
loadmessage
(
dest
,
message
,
i
,
body
=
None
,
headers
=
None
):
if
body
is
None
:
body
=
message
.
fp
.
read
()
if
headers
is
None
:
headers
=
message
.
headers
dest
.
manage_addDTMLDocument
(
str
(
i
),
file
=
body
)
doc
=
dest
[
str
(
i
)]
for
h
in
headers
:
h
=
strip
(
h
)
l
=
find
(
h
,
':'
)
if
l
<=
0
:
continue
name
=
lower
(
h
[:
l
])
if
name
==
'subject'
:
name
=
'title'
v
=
strip
(
h
[
l
+
1
:])
type
=
'string'
if
0
and
name
==
'date'
:
type
=
'date'
elif
0
:
try
:
atoi
(
v
)
except
:
pass
else
:
type
=
int
if
name
==
'title'
:
doc
.
manage_changeProperties
(
title
=
h
)
else
:
try
:
doc
.
manage_addProperty
(
name
,
v
,
type
)
except
:
pass
def
loadmail
(
dest
,
name
,
mbox
,
max
=-
1
):
try
:
import
Products.BTreeFolder.BTreeFolder
except
:
dest
.
manage_addFolder
(
name
)
else
:
Products
.
BTreeFolder
.
BTreeFolder
.
manage_addBTreeFolder
(
dest
,
name
)
dest
=
getattr
(
dest
,
name
)
f
=
open
(
mbox
)
mb
=
mailbox
.
UnixMailbox
(
f
)
i
=
0
message
=
mb
.
next
()
while
message
:
if
max
>=
0
and
i
>
max
:
break
if
i
%
100
==
0
and
VERBOSE
:
fmt
=
"
\
t
%s
\
t
%s
\
t
\
r
"
if
os
.
environ
.
get
(
'TERM'
)
in
(
'dumb'
,
'emacs'
):
fmt
=
"
\
t
%s
\
t
%s
\
t
\
n
"
sys
.
stdout
.
write
(
fmt
%
(
i
,
f
.
tell
()))
sys
.
stdout
.
flush
()
if
i
and
(
i
%
5000
==
0
):
transaction
.
commit
()
dest
.
_p_jar
.
_cache
.
minimize
()
loadmessage
(
dest
,
message
,
i
)
i
=
i
+
1
message
=
mb
.
next
()
dest
.
number_of_messages
=
i
print
transaction
.
commit
()
def
loadinc
(
name
,
mb
,
f
,
max
=
99999999
,
wait
=
1
):
from
ZODB.POSException
import
ConflictError
from
time
import
sleep
from
random
import
uniform
import
Zope2
,
sys
rconflicts
=
wconflicts
=
0
i
=
0
message
=
mb
.
next
()
body
=
message
.
fp
.
read
()
headers
=
list
(
message
.
headers
)
while
i
<
max
:
# sys.stderr.write("%s " % i)
# sys.stdout.flush()
if
wait
:
sleep
(
uniform
(
0
,
wait
))
jar
=
Zope2
.
DB
.
open
()
app
=
jar
.
root
()[
'Application'
]
mdest
=
getattr
(
app
,
name
)
if
i
%
100
==
0
and
VERBOSE
:
fmt
=
"
\
t
%s
\
t
%s
\
t
\
r
"
if
os
.
environ
.
get
(
'TERM'
)
in
(
'dumb'
,
'emacs'
):
fmt
=
"
\
t
%s
\
t
%s
\
t
\
n
"
sys
.
stdout
.
write
(
fmt
%
(
i
,
f
.
tell
()))
sys
.
stdout
.
flush
()
did
=
str
(
i
)
try
:
loadmessage
(
mdest
,
message
,
did
,
body
,
headers
)
doc
=
mdest
[
did
]
app
.
cat
.
catalog_object
(
doc
)
except
ConflictError
,
v
:
# print v.args
rconflicts
=
rconflicts
+
1
transaction
.
abort
()
else
:
try
:
transaction
.
commit
()
i
=
i
+
1
message
=
mb
.
next
()
body
=
message
.
fp
.
read
()
headers
=
list
(
message
.
headers
)
except
ConflictError
:
wconflicts
=
wconflicts
+
1
transaction
.
abort
()
doc
=
app
=
mdest
=
0
jar
.
close
()
if
VERBOSE
:
sys
.
stdout
.
write
(
"
\
t
%s
\
t
%s
\
t
\
n
"
%
(
i
,
f
.
tell
()))
sys
.
stdout
.
flush
()
return
rconflicts
,
wconflicts
def
base
():
try
:
os
.
unlink
(
'../../var/Data.fs'
)
except
:
pass
import
Zope2
app
=
Zope2
.
app
()
if
len
(
sys
.
argv
)
>
3
:
max
=
atoi
(
sys
.
argv
[
3
])
else
:
max
=
-
1
print
do
(
Zope2
.
DB
,
loadmail
,
(
app
,
'mail'
,
sys
.
argv
[
2
],
max
))
Zope2
.
DB
.
close
()
class
RE
:
def
redirect
(
*
args
,
**
kw
):
pass
def
indexf
(
app
):
r
=
RE
()
r
.
PARENTS
=
[
0
,
app
.
mail
]
app
.
cat
.
manage_catalogFoundItems
(
r
,
r
,
''
,
''
,[
'DTML Document'
])
transaction
.
commit
()
def
index
():
os
.
environ
[
'STUPID_LOG_FILE'
]
=
''
os
.
environ
[
'STUPID_LOG_SEVERITY'
]
=
'-111'
import
Zope2
,
Products
.
ZCatalog
.
ZCatalog
import
AccessControl.SecurityManagement
,
AccessControl
.
SpecialUsers
app
=
Zope2
.
app
()
Products
.
ZCatalog
.
ZCatalog
.
manage_addZCatalog
(
app
,
'cat'
,
''
)
try
:
app
.
cat
.
threshold
=
atoi
(
sys
.
argv
[
2
])
except
IndexError
:
app
.
cat
.
threashold
=
1000
from
Products.ZCTextIndex.ZCTextIndex
\
import
PLexicon
from
Products.ZCTextIndex.Lexicon
\
import
Splitter
,
CaseNormalizer
app
.
cat
.
_setObject
(
'lex'
,
PLexicon
(
'lex'
,
''
,
Splitter
(),
CaseNormalizer
())
)
class
extra
:
doc_attr
=
'PrincipiaSearchSource'
lexicon_id
=
'lex'
index_type
=
'Okapi BM25 Rank'
app
.
cat
.
addIndex
(
'PrincipiaSearchSource'
,
'ZCTextIndex'
,
extra
)
transaction
.
commit
()
system
=
AccessControl
.
SpecialUsers
.
system
AccessControl
.
SecurityManagement
.
newSecurityManager
(
None
,
system
)
r
=
RE
()
r
.
PARENTS
=
[
app
.
cat
,
app
]
print
do
(
Zope2
.
DB
,
indexf
,
(
app
,))
#hist(sys.argv[2])
Zope2
.
DB
.
close
()
def
initmaili
(
n
):
import
Zope2
app
=
Zope2
.
app
()
try
:
import
Products.BTreeFolder.BTreeFolder
except
:
app
.
manage_addFolder
(
n
)
else
:
Products
.
BTreeFolder
.
BTreeFolder
.
manage_addBTreeFolder
(
app
,
n
)
transaction
.
commit
()
app
.
_p_jar
.
close
()
def
hist
(
n
):
import
Zope2
app
=
Zope2
.
app
()
import
cPickle
pickler
=
cPickle
.
Pickler
(
open
(
"h%s.hist"
%
n
,
'w'
))
h
=
app
.
cat
.
_catalog
.
indexes
[
'PrincipiaSearchSource'
].
histogram
()
pickler
.
dump
(
list
(
h
.
items
()))
#h=app.cat._catalog.uids.keys()
#pickler.dump(list(h))
def
inc
():
import
Zope2
,
thread
min
,
max
=
atoi
(
sys
.
argv
[
3
]),
atoi
(
sys
.
argv
[
4
])
count
=
max
-
min
try
:
threads
=
atoi
(
sys
.
argv
[
5
])
except
:
threads
=
1
wait
=
0
else
:
try
:
wait
=
atof
(
sys
.
argv
[
6
])
except
:
wait
=
0.25
wait
=
wait
*
2
count
=
count
/
threads
max
=
min
+
count
omin
=
min
db
=
Zope2
.
DB
size
=
db
.
getSize
()
mem
=
VmSize
()
t
=
time
.
time
()
c
=
time
.
clock
()
mbox
=
sys
.
argv
[
2
]
argss
=
[]
for
i
in
range
(
threads
):
amin
=
min
+
i
*
count
dest
=
'maili%s'
%
amin
initmaili
(
dest
)
f
=
open
(
mbox
)
mb
=
mailbox
.
UnixMailbox
(
f
)
j
=
0
while
j
<
amin
:
mb
.
next
()
j
=
j
+
1
lock
=
thread
.
allocate_lock
()
lock
.
acquire
()
def
returnf
(
t
,
c
,
size
,
mem
,
r
,
lock
=
lock
):
print
c
,
r
lock
.
release
()
argss
.
append
((
lock
,
(
dest
,
mb
,
f
,
count
,
wait
),
returnf
))
for
lock
,
args
,
returnf
in
argss
:
thread
.
start_new_thread
(
do
,
(
Zope2
.
DB
,
loadinc
,
args
,
returnf
))
for
lock
,
args
,
returnf
in
argss
:
lock
.
acquire
()
t
=
time
.
time
()
-
t
c
=
time
.
clock
()
-
c
size
=
db
.
getSize
()
-
size
mem
=
VmSize
()
-
mem
print
t
,
c
,
size
,
mem
#hist("%s-%s-%s" % (omin, count, threads))
Zope2
.
DB
.
close
()
def
catdel
():
import
Zope2
app
=
Zope2
.
app
()
db
=
Zope2
.
DB
t
=
time
.
time
()
c
=
time
.
clock
()
size
=
db
.
getSize
()
mem
=
VmSize
()
del
app
.
cat
transaction
.
commit
()
t
=
time
.
time
()
-
t
c
=
time
.
clock
()
-
c
size
=
db
.
getSize
()
-
size
mem
=
VmSize
()
-
mem
print
t
,
c
,
size
,
mem
words
=
[
'banishment'
,
'indirectly'
,
'imprecise'
,
'peeks'
,
'opportunely'
,
'bribe'
,
'sufficiently'
,
'Occidentalized'
,
'elapsing'
,
'fermenting'
,
'listen'
,
'orphanage'
,
'younger'
,
'draperies'
,
'Ida'
,
'cuttlefish'
,
'mastermind'
,
'Michaels'
,
'populations'
,
'lent'
,
'cater'
,
'attentional'
,
'hastiness'
,
'dragnet'
,
'mangling'
,
'scabbards'
,
'princely'
,
'star'
,
'repeat'
,
'deviation'
,
'agers'
,
'fix'
,
'digital'
,
'ambitious'
,
'transit'
,
'jeeps'
,
'lighted'
,
'Prussianizations'
,
'Kickapoo'
,
'virtual'
,
'Andrew'
,
'generally'
,
'boatsman'
,
'amounts'
,
'promulgation'
,
'Malay'
,
'savaging'
,
'courtesan'
,
'nursed'
,
'hungered'
,
'shiningly'
,
'ship'
,
'presides'
,
'Parke'
,
'moderns'
,
'Jonas'
,
'unenlightening'
,
'dearth'
,
'deer'
,
'domesticates'
,
'recognize'
,
'gong'
,
'penetrating'
,
'dependents'
,
'unusually'
,
'complications'
,
'Dennis'
,
'imbalances'
,
'nightgown'
,
'attached'
,
'testaments'
,
'congresswoman'
,
'circuits'
,
'bumpers'
,
'braver'
,
'Boreas'
,
'hauled'
,
'Howe'
,
'seethed'
,
'cult'
,
'numismatic'
,
'vitality'
,
'differences'
,
'collapsed'
,
'Sandburg'
,
'inches'
,
'head'
,
'rhythmic'
,
'opponent'
,
'blanketer'
,
'attorneys'
,
'hen'
,
'spies'
,
'indispensably'
,
'clinical'
,
'redirection'
,
'submit'
,
'catalysts'
,
'councilwoman'
,
'kills'
,
'topologies'
,
'noxious'
,
'exactions'
,
'dashers'
,
'balanced'
,
'slider'
,
'cancerous'
,
'bathtubs'
,
'legged'
,
'respectably'
,
'crochets'
,
'absenteeism'
,
'arcsine'
,
'facility'
,
'cleaners'
,
'bobwhite'
,
'Hawkins'
,
'stockade'
,
'provisional'
,
'tenants'
,
'forearms'
,
'Knowlton'
,
'commit'
,
'scornful'
,
'pediatrician'
,
'greets'
,
'clenches'
,
'trowels'
,
'accepts'
,
'Carboloy'
,
'Glenn'
,
'Leigh'
,
'enroll'
,
'Madison'
,
'Macon'
,
'oiling'
,
'entertainingly'
,
'super'
,
'propositional'
,
'pliers'
,
'beneficiary'
,
'hospitable'
,
'emigration'
,
'sift'
,
'sensor'
,
'reserved'
,
'colonization'
,
'shrilled'
,
'momentously'
,
'stevedore'
,
'Shanghaiing'
,
'schoolmasters'
,
'shaken'
,
'biology'
,
'inclination'
,
'immoderate'
,
'stem'
,
'allegory'
,
'economical'
,
'daytime'
,
'Newell'
,
'Moscow'
,
'archeology'
,
'ported'
,
'scandals'
,
'Blackfoot'
,
'leery'
,
'kilobit'
,
'empire'
,
'obliviousness'
,
'productions'
,
'sacrificed'
,
'ideals'
,
'enrolling'
,
'certainties'
,
'Capsicum'
,
'Brookdale'
,
'Markism'
,
'unkind'
,
'dyers'
,
'legislates'
,
'grotesquely'
,
'megawords'
,
'arbitrary'
,
'laughing'
,
'wildcats'
,
'thrower'
,
'sex'
,
'devils'
,
'Wehr'
,
'ablates'
,
'consume'
,
'gossips'
,
'doorways'
,
'Shari'
,
'advanced'
,
'enumerable'
,
'existentially'
,
'stunt'
,
'auctioneers'
,
'scheduler'
,
'blanching'
,
'petulance'
,
'perceptibly'
,
'vapors'
,
'progressed'
,
'rains'
,
'intercom'
,
'emergency'
,
'increased'
,
'fluctuating'
,
'Krishna'
,
'silken'
,
'reformed'
,
'transformation'
,
'easter'
,
'fares'
,
'comprehensible'
,
'trespasses'
,
'hallmark'
,
'tormenter'
,
'breastworks'
,
'brassiere'
,
'bladders'
,
'civet'
,
'death'
,
'transformer'
,
'tolerably'
,
'bugle'
,
'clergy'
,
'mantels'
,
'satin'
,
'Boswellizes'
,
'Bloomington'
,
'notifier'
,
'Filippo'
,
'circling'
,
'unassigned'
,
'dumbness'
,
'sentries'
,
'representativeness'
,
'souped'
,
'Klux'
,
'Kingstown'
,
'gerund'
,
'Russell'
,
'splices'
,
'bellow'
,
'bandies'
,
'beefers'
,
'cameramen'
,
'appalled'
,
'Ionian'
,
'butterball'
,
'Portland'
,
'pleaded'
,
'admiringly'
,
'pricks'
,
'hearty'
,
'corer'
,
'deliverable'
,
'accountably'
,
'mentors'
,
'accorded'
,
'acknowledgement'
,
'Lawrenceville'
,
'morphology'
,
'eucalyptus'
,
'Rena'
,
'enchanting'
,
'tighter'
,
'scholars'
,
'graduations'
,
'edges'
,
'Latinization'
,
'proficiency'
,
'monolithic'
,
'parenthesizing'
,
'defy'
,
'shames'
,
'enjoyment'
,
'Purdue'
,
'disagrees'
,
'barefoot'
,
'maims'
,
'flabbergast'
,
'dishonorable'
,
'interpolation'
,
'fanatics'
,
'dickens'
,
'abysses'
,
'adverse'
,
'components'
,
'bowl'
,
'belong'
,
'Pipestone'
,
'trainees'
,
'paw'
,
'pigtail'
,
'feed'
,
'whore'
,
'conditioner'
,
'Volstead'
,
'voices'
,
'strain'
,
'inhabits'
,
'Edwin'
,
'discourses'
,
'deigns'
,
'cruiser'
,
'biconvex'
,
'biking'
,
'depreciation'
,
'Harrison'
,
'Persian'
,
'stunning'
,
'agar'
,
'rope'
,
'wagoner'
,
'elections'
,
'reticulately'
,
'Cruz'
,
'pulpits'
,
'wilt'
,
'peels'
,
'plants'
,
'administerings'
,
'deepen'
,
'rubs'
,
'hence'
,
'dissension'
,
'implored'
,
'bereavement'
,
'abyss'
,
'Pennsylvania'
,
'benevolent'
,
'corresponding'
,
'Poseidon'
,
'inactive'
,
'butchers'
,
'Mach'
,
'woke'
,
'loading'
,
'utilizing'
,
'Hoosier'
,
'undo'
,
'Semitization'
,
'trigger'
,
'Mouthe'
,
'mark'
,
'disgracefully'
,
'copier'
,
'futility'
,
'gondola'
,
'algebraic'
,
'lecturers'
,
'sponged'
,
'instigators'
,
'looted'
,
'ether'
,
'trust'
,
'feeblest'
,
'sequencer'
,
'disjointness'
,
'congresses'
,
'Vicksburg'
,
'incompatibilities'
,
'commend'
,
'Luxembourg'
,
'reticulation'
,
'instructively'
,
'reconstructs'
,
'bricks'
,
'attache'
,
'Englishman'
,
'provocation'
,
'roughen'
,
'cynic'
,
'plugged'
,
'scrawls'
,
'antipode'
,
'injected'
,
'Daedalus'
,
'Burnsides'
,
'asker'
,
'confronter'
,
'merriment'
,
'disdain'
,
'thicket'
,
'stinker'
,
'great'
,
'tiers'
,
'oust'
,
'antipodes'
,
'Macintosh'
,
'tented'
,
'packages'
,
'Mediterraneanize'
,
'hurts'
,
'orthodontist'
,
'seeder'
,
'readying'
,
'babying'
,
'Florida'
,
'Sri'
,
'buckets'
,
'complementary'
,
'cartographer'
,
'chateaus'
,
'shaves'
,
'thinkable'
,
'Tehran'
,
'Gordian'
,
'Angles'
,
'arguable'
,
'bureau'
,
'smallest'
,
'fans'
,
'navigated'
,
'dipole'
,
'bootleg'
,
'distinctive'
,
'minimization'
,
'absorbed'
,
'surmised'
,
'Malawi'
,
'absorbent'
,
'close'
,
'conciseness'
,
'hopefully'
,
'declares'
,
'descent'
,
'trick'
,
'portend'
,
'unable'
,
'mildly'
,
'Morse'
,
'reference'
,
'scours'
,
'Caribbean'
,
'battlers'
,
'astringency'
,
'likelier'
,
'Byronizes'
,
'econometric'
,
'grad'
,
'steak'
,
'Austrian'
,
'ban'
,
'voting'
,
'Darlington'
,
'bison'
,
'Cetus'
,
'proclaim'
,
'Gilbertson'
,
'evictions'
,
'submittal'
,
'bearings'
,
'Gothicizer'
,
'settings'
,
'McMahon'
,
'densities'
,
'determinants'
,
'period'
,
'DeKastere'
,
'swindle'
,
'promptness'
,
'enablers'
,
'wordy'
,
'during'
,
'tables'
,
'responder'
,
'baffle'
,
'phosgene'
,
'muttering'
,
'limiters'
,
'custodian'
,
'prevented'
,
'Stouffer'
,
'waltz'
,
'Videotex'
,
'brainstorms'
,
'alcoholism'
,
'jab'
,
'shouldering'
,
'screening'
,
'explicitly'
,
'earner'
,
'commandment'
,
'French'
,
'scrutinizing'
,
'Gemma'
,
'capacitive'
,
'sheriff'
,
'herbivore'
,
'Betsey'
,
'Formosa'
,
'scorcher'
,
'font'
,
'damming'
,
'soldiers'
,
'flack'
,
'Marks'
,
'unlinking'
,
'serenely'
,
'rotating'
,
'converge'
,
'celebrities'
,
'unassailable'
,
'bawling'
,
'wording'
,
'silencing'
,
'scotch'
,
'coincided'
,
'masochists'
,
'graphs'
,
'pernicious'
,
'disease'
,
'depreciates'
,
'later'
,
'torus'
,
'interject'
,
'mutated'
,
'causer'
,
'messy'
,
'Bechtel'
,
'redundantly'
,
'profoundest'
,
'autopsy'
,
'philosophic'
,
'iterate'
,
'Poisson'
,
'horridly'
,
'silversmith'
,
'millennium'
,
'plunder'
,
'salmon'
,
'missioner'
,
'advances'
,
'provers'
,
'earthliness'
,
'manor'
,
'resurrectors'
,
'Dahl'
,
'canto'
,
'gangrene'
,
'gabler'
,
'ashore'
,
'frictionless'
,
'expansionism'
,
'emphasis'
,
'preservations'
,
'Duane'
,
'descend'
,
'isolated'
,
'firmware'
,
'dynamites'
,
'scrawled'
,
'cavemen'
,
'ponder'
,
'prosperity'
,
'squaw'
,
'vulnerable'
,
'opthalmic'
,
'Simms'
,
'unite'
,
'totallers'
,
'Waring'
,
'enforced'
,
'bridge'
,
'collecting'
,
'sublime'
,
'Moore'
,
'gobble'
,
'criticizes'
,
'daydreams'
,
'sedate'
,
'apples'
,
'Concordia'
,
'subsequence'
,
'distill'
,
'Allan'
,
'seizure'
,
'Isadore'
,
'Lancashire'
,
'spacings'
,
'corresponded'
,
'hobble'
,
'Boonton'
,
'genuineness'
,
'artifact'
,
'gratuities'
,
'interviewee'
,
'Vladimir'
,
'mailable'
,
'Bini'
,
'Kowalewski'
,
'interprets'
,
'bereave'
,
'evacuated'
,
'friend'
,
'tourists'
,
'crunched'
,
'soothsayer'
,
'fleetly'
,
'Romanizations'
,
'Medicaid'
,
'persevering'
,
'flimsy'
,
'doomsday'
,
'trillion'
,
'carcasses'
,
'guess'
,
'seersucker'
,
'ripping'
,
'affliction'
,
'wildest'
,
'spokes'
,
'sheaths'
,
'procreate'
,
'rusticates'
,
'Schapiro'
,
'thereafter'
,
'mistakenly'
,
'shelf'
,
'ruination'
,
'bushel'
,
'assuredly'
,
'corrupting'
,
'federation'
,
'portmanteau'
,
'wading'
,
'incendiary'
,
'thing'
,
'wanderers'
,
'messages'
,
'Paso'
,
'reexamined'
,
'freeings'
,
'denture'
,
'potting'
,
'disturber'
,
'laborer'
,
'comrade'
,
'intercommunicating'
,
'Pelham'
,
'reproach'
,
'Fenton'
,
'Alva'
,
'oasis'
,
'attending'
,
'cockpit'
,
'scout'
,
'Jude'
,
'gagging'
,
'jailed'
,
'crustaceans'
,
'dirt'
,
'exquisitely'
,
'Internet'
,
'blocker'
,
'smock'
,
'Troutman'
,
'neighboring'
,
'surprise'
,
'midscale'
,
'impart'
,
'badgering'
,
'fountain'
,
'Essen'
,
'societies'
,
'redresses'
,
'afterwards'
,
'puckering'
,
'silks'
,
'Blakey'
,
'sequel'
,
'greet'
,
'basements'
,
'Aubrey'
,
'helmsman'
,
'album'
,
'wheelers'
,
'easternmost'
,
'flock'
,
'ambassadors'
,
'astatine'
,
'supplant'
,
'gird'
,
'clockwork'
,
'foxes'
,
'rerouting'
,
'divisional'
,
'bends'
,
'spacer'
,
'physiologically'
,
'exquisite'
,
'concerts'
,
'unbridled'
,
'crossing'
,
'rock'
,
'leatherneck'
,
'Fortescue'
,
'reloading'
,
'Laramie'
,
'Tim'
,
'forlorn'
,
'revert'
,
'scarcer'
,
'spigot'
,
'equality'
,
'paranormal'
,
'aggrieves'
,
'pegs'
,
'committeewomen'
,
'documented'
,
'interrupt'
,
'emerald'
,
'Battelle'
,
'reconverted'
,
'anticipated'
,
'prejudices'
,
'drowsiness'
,
'trivialities'
,
'food'
,
'blackberries'
,
'Cyclades'
,
'tourist'
,
'branching'
,
'nugget'
,
'Asilomar'
,
'repairmen'
,
'Cowan'
,
'receptacles'
,
'nobler'
,
'Nebraskan'
,
'territorial'
,
'chickadee'
,
'bedbug'
,
'darted'
,
'vigilance'
,
'Octavia'
,
'summands'
,
'policemen'
,
'twirls'
,
'style'
,
'outlawing'
,
'specifiable'
,
'pang'
,
'Orpheus'
,
'epigram'
,
'Babel'
,
'butyrate'
,
'wishing'
,
'fiendish'
,
'accentuate'
,
'much'
,
'pulsed'
,
'adorned'
,
'arbiters'
,
'counted'
,
'Afrikaner'
,
'parameterizes'
,
'agenda'
,
'Americanism'
,
'referenda'
,
'derived'
,
'liquidity'
,
'trembling'
,
'lordly'
,
'Agway'
,
'Dillon'
,
'propellers'
,
'statement'
,
'stickiest'
,
'thankfully'
,
'autograph'
,
'parallel'
,
'impulse'
,
'Hamey'
,
'stylistic'
,
'disproved'
,
'inquirer'
,
'hoisting'
,
'residues'
,
'variant'
,
'colonials'
,
'dequeued'
,
'especial'
,
'Samoa'
,
'Polaris'
,
'dismisses'
,
'surpasses'
,
'prognosis'
,
'urinates'
,
'leaguers'
,
'ostriches'
,
'calculative'
,
'digested'
,
'divided'
,
'reconfigurer'
,
'Lakewood'
,
'illegalities'
,
'redundancy'
,
'approachability'
,
'masterly'
,
'cookery'
,
'crystallized'
,
'Dunham'
,
'exclaims'
,
'mainline'
,
'Australianizes'
,
'nationhood'
,
'pusher'
,
'ushers'
,
'paranoia'
,
'workstations'
,
'radiance'
,
'impedes'
,
'Minotaur'
,
'cataloging'
,
'bites'
,
'fashioning'
,
'Alsop'
,
'servants'
,
'Onondaga'
,
'paragraph'
,
'leadings'
,
'clients'
,
'Latrobe'
,
'Cornwallis'
,
'excitingly'
,
'calorimetric'
,
'savior'
,
'tandem'
,
'antibiotics'
,
'excuse'
,
'brushy'
,
'selfish'
,
'naive'
,
'becomes'
,
'towers'
,
'popularizes'
,
'engender'
,
'introducing'
,
'possession'
,
'slaughtered'
,
'marginally'
,
'Packards'
,
'parabola'
,
'utopia'
,
'automata'
,
'deterrent'
,
'chocolates'
,
'objectives'
,
'clannish'
,
'aspirin'
,
'ferociousness'
,
'primarily'
,
'armpit'
,
'handfuls'
,
'dangle'
,
'Manila'
,
'enlivened'
,
'decrease'
,
'phylum'
,
'hardy'
,
'objectively'
,
'baskets'
,
'chaired'
,
'Sepoy'
,
'deputy'
,
'blizzard'
,
'shootings'
,
'breathtaking'
,
'sticking'
,
'initials'
,
'epitomized'
,
'Forrest'
,
'cellular'
,
'amatory'
,
'radioed'
,
'horrified'
,
'Neva'
,
'simultaneous'
,
'delimiter'
,
'expulsion'
,
'Himmler'
,
'contradiction'
,
'Remus'
,
'Franklinizations'
,
'luggage'
,
'moisture'
,
'Jews'
,
'comptroller'
,
'brevity'
,
'contradictions'
,
'Ohio'
,
'active'
,
'babysit'
,
'China'
,
'youngest'
,
'superstition'
,
'clawing'
,
'raccoons'
,
'chose'
,
'shoreline'
,
'helmets'
,
'Jeffersonian'
,
'papered'
,
'kindergarten'
,
'reply'
,
'succinct'
,
'split'
,
'wriggle'
,
'suitcases'
,
'nonce'
,
'grinders'
,
'anthem'
,
'showcase'
,
'maimed'
,
'blue'
,
'obeys'
,
'unreported'
,
'perusing'
,
'recalculate'
,
'rancher'
,
'demonic'
,
'Lilliputianize'
,
'approximation'
,
'repents'
,
'yellowness'
,
'irritates'
,
'Ferber'
,
'flashlights'
,
'booty'
,
'Neanderthal'
,
'someday'
,
'foregoes'
,
'lingering'
,
'cloudiness'
,
'guy'
,
'consumer'
,
'Berkowitz'
,
'relics'
,
'interpolating'
,
'reappearing'
,
'advisements'
,
'Nolan'
,
'turrets'
,
'skeletal'
,
'skills'
,
'mammas'
,
'Winsett'
,
'wheelings'
,
'stiffen'
,
'monkeys'
,
'plainness'
,
'braziers'
,
'Leary'
,
'advisee'
,
'jack'
,
'verb'
,
'reinterpret'
,
'geometrical'
,
'trolleys'
,
'arboreal'
,
'overpowered'
,
'Cuzco'
,
'poetical'
,
'admirations'
,
'Hobbes'
,
'phonemes'
,
'Newsweek'
,
'agitator'
,
'finally'
,
'prophets'
,
'environment'
,
'easterners'
,
'precomputed'
,
'faults'
,
'rankly'
,
'swallowing'
,
'crawl'
,
'trolley'
,
'spreading'
,
'resourceful'
,
'go'
,
'demandingly'
,
'broader'
,
'spiders'
,
'Marsha'
,
'debris'
,
'operates'
,
'Dundee'
,
'alleles'
,
'crunchier'
,
'quizzical'
,
'hanging'
,
'Fisk'
]
from
ZODB.utils
import
u64
def
incedit
(
edits
,
wait
,
ndel
=
20
,
nins
=
20
):
import
Zope2
,
random
,
string
,
time
from
ZODB.POSException
import
ConflictError
rconflicts
=
wconflicts
=
0
did
=
str
(
edits
.
pop
())
while
edits
:
if
wait
:
time
.
sleep
(
random
.
uniform
(
0
,
wait
))
jar
=
Zope2
.
DB
.
open
()
app
=
jar
.
root
()[
'Application'
]
doc
=
getattr
(
app
.
mail
,
did
)
text
=
string
.
split
(
doc
.
raw
)
n
=
random
.
randint
(
0
,
ndel
*
2
)
for
j
in
range
(
n
):
if
len
(
text
)
<
2
:
break
j
=
random
.
randint
(
0
,
len
(
text
)
-
1
)
#del text[j]
n
=
random
.
randint
(
0
,
nins
*
2
)
for
j
in
range
(
n
):
word
=
random
.
choice
(
words
)
text
.
append
(
word
)
doc
.
raw
=
string
.
join
(
text
)
try
:
app
.
cat
.
catalog_object
(
doc
)
except
ConflictError
,
v
:
#print v.args
rconflicts
=
rconflicts
+
1
transaction
.
abort
()
else
:
try
:
transaction
.
commit
()
did
=
str
(
edits
.
pop
())
except
ConflictError
:
wconflicts
=
wconflicts
+
1
transaction
.
abort
()
doc
=
app
=
0
jar
.
close
()
return
rconflicts
,
wconflicts
def
edit
():
import
Zope2
,
thread
nedit
,
ndel
,
nins
=
atoi
(
sys
.
argv
[
2
]),
atoi
(
sys
.
argv
[
3
]),
atoi
(
sys
.
argv
[
4
])
try
:
threads
=
atoi
(
sys
.
argv
[
5
])
except
:
threads
=
1
wait
=
0
else
:
try
:
wait
=
atof
(
sys
.
argv
[
6
])
except
:
wait
=
0.25
wait
=
wait
*
2
if
threads
==
1
:
start_new_thread
=
apply
else
:
start_new_thread
=
thread
.
start_new_thread
db
=
Zope2
.
DB
app
=
Zope2
.
app
()
number_of_messages
=
app
.
mail
.
number_of_messages
app
.
_p_jar
.
close
()
size
=
db
.
getSize
()
mem
=
VmSize
()
t
=
time
.
time
()
c
=
time
.
clock
()
alledits
=
{}
argss
=
[]
for
i
in
range
(
threads
):
lock
=
thread
.
allocate_lock
()
if
threads
>
1
:
lock
.
acquire
()
def
returnf
(
t
,
c
,
size
,
mem
,
r
,
lock
=
lock
):
print
c
,
r
lock
.
release
()
else
:
def
returnf
(
t
,
c
,
size
,
mem
,
r
,
lock
=
lock
):
print
c
,
r
edits
=
[
0
]
while
len
(
edits
)
<=
nedit
:
edit
=
random
.
randint
(
0
,
number_of_messages
)
if
not
alledits
.
has_key
(
edit
):
alledits
[
edit
]
=
1
edits
.
append
(
edit
)
#print edits
argss
.
append
((
lock
,
(
edits
,
wait
,
ndel
,
nins
),
returnf
))
for
lock
,
args
,
returnf
in
argss
:
start_new_thread
(
do
,
(
Zope2
.
DB
,
incedit
,
args
,
returnf
))
for
lock
,
args
,
returnf
in
argss
:
lock
.
acquire
()
t
=
time
.
time
()
-
t
c
=
time
.
clock
()
-
c
size
=
db
.
getSize
()
-
size
mem
=
VmSize
()
-
mem
print
t
,
c
,
size
,
mem
#hist("e%s" % (threads))
Zope2
.
DB
.
close
()
def
VmSize
():
try
:
f
=
open
(
'/proc/%s/status'
%
os
.
getpid
())
except
:
return
0
else
:
l
=
filter
(
lambda
l
:
l
[:
7
]
==
'VmSize:'
,
f
.
readlines
())
if
l
:
l
=
string
.
split
(
string
.
strip
(
l
[
0
][
7
:]))[
0
]
return
string
.
atoi
(
l
)
return
0
def
pdebug
():
import
pdb
del
sys
.
argv
[
1
]
pdb
.
run
(
'globals()[sys.argv[1]]()'
)
def
usage
(
code
,
msg
=
''
):
print
>>
sys
.
stderr
,
__doc__
if
msg
:
print
>>
sys
.
stderr
,
msg
sys
.
exit
(
code
)
if
__name__
==
'__main__'
:
try
:
opts
,
args
=
getopt
.
getopt
(
sys
.
argv
[
1
:],
'hv'
,
[
'help'
,
'verbose'
])
except
getopt
.
error
,
msg
:
usage
(
1
,
msg
)
for
opt
,
arg
in
opts
:
if
opt
in
(
'-h'
,
'--help'
):
usage
(
0
)
elif
opt
in
(
'-v'
,
'--verbose'
):
VERBOSE
=
1
try
:
f
=
globals
()[
sys
.
argv
[
1
]]
except
:
print
__doc__
sys
.
exit
(
1
)
else
:
f
()
src/Products/ZCatalog/regressiontests/regressionCatalog.py
deleted
100755 → 0
View file @
6a406941
#!/usr/bin/env python
# XXX: Products.PluginIndexes.TextIndex and Vocabulary no longer exist
# Regression test for ZCatalog
import
os
,
sys
sys
.
path
.
insert
(
0
,
'.'
)
try
:
import
Testing
except
ImportError
:
sys
.
path
[
0
]
=
"../../.."
import
Testing
os
.
environ
[
'STUPID_LOG_FILE'
]
=
"debug.log"
here
=
os
.
getcwd
()
import
Zope2
import
ZODB
,
ZODB
.
FileStorage
import
transaction
from
Products.ZCatalog
import
ZCatalog
#,Vocabulary
from
Products.ZCatalog.Catalog
import
CatalogError
import
Persistence
import
ExtensionClass
from
Testing
import
dispatcher
import
keywords
import
getopt
,
random
,
time
,
string
,
mailbox
,
rfc822
import
unittest_patched
as
unittest
# maximum number of files to read for the test suite
maxFiles
=
1000
# maximum number of threads for stress testa
numThreads
=
4
# number of iterations for searches
searchIterations
=
1000
# number of iterations for catalog/uncatalog operations
updateIterations
=
100
# input mailbox file
mbox
=
os
.
environ
.
get
(
"TESTCATALOG_MBOX"
,
"/usr/home/andreas/zope.mbox"
)
mbox2
=
os
.
environ
.
get
(
"TESTCATALOG_MBOX2"
,
"/usr/home/andreas/python.mbox"
)
dataDir
=
""
#
# Don't change anything below
#
class
testZODB
:
""" some wrapper stuff around ZODB """
def
__init__
(
self
,
file
=
"data/work/Data.fs"
,
open
=
1
):
self
.
db
=
ZODB
.
DB
(
ZODB
.
FileStorage
.
FileStorage
(
file
)
)
if
open
==
1
:
self
.
connection
=
self
.
db
.
open
()
self
.
root
=
self
.
connection
.
root
()
def
write
(
self
,
name
,
obj
):
self
.
root
[
name
]
=
obj
transaction
.
commit
()
def
read
(
self
,
name
):
return
self
.
root
[
name
]
def
__del__
(
self
):
self
.
db
.
close
()
class
testCatalog
(
Persistence
.
Persistent
,
unittest
.
TestCase
):
""" Wrapper around the catalog stuff """
def
__init__
(
self
,
mboxname
,
maxfiles
):
self
.
msg_ids
=
[]
self
.
num_files
=
0
self
.
keywords
=
[]
self
.
maxfiles
=
maxfiles
# self._vocabulary = Vocabulary.Vocabulary('Vocabulary',
# 'Vocabulary', globbing=1)
self
.
_catalog
=
ZCatalog
.
ZCatalog
(
"zcatalog"
)
self
.
_catalog
.
addIndex
(
'to'
,
'TextIndex'
)
self
.
_catalog
.
addIndex
(
'sender'
,
'TextIndex'
)
self
.
_catalog
.
addIndex
(
'subject'
,
'TextIndex'
)
self
.
_catalog
.
addIndex
(
'content'
,
'TextIndex'
)
self
.
_catalog
.
addIndex
(
'file_id'
,
'TextIndex'
)
self
.
_catalog
.
addColumn
(
'file_id'
)
self
.
_catalog
.
addIndex
(
'length'
,
'FieldIndex'
)
self
.
_catalog
.
addColumn
(
'length'
)
self
.
_catalog
.
addIndex
(
'date'
,
'FieldIndex'
)
self
.
_catalog
.
addIndex
(
'keywords'
,
"KeywordIndex"
)
self
.
build_catalog
(
mboxname
)
def
build_catalog
(
self
,
mboxname
):
mb
=
mailbox
.
UnixMailbox
(
open
(
mboxname
,
"r"
))
i
=
0
msg
=
mb
.
next
()
while
msg
and
self
.
num_files
<
self
.
maxfiles
:
try
:
self
.
catMessage
(
msg
)
self
.
msg_ids
.
append
(
msg
.
dict
[
"message-id"
])
except
:
msg
=
mb
.
next
()
continue
msg
=
mb
.
next
()
self
.
num_files
=
self
.
num_files
+
1
if
self
.
num_files
%
100
==
0
:
print
self
.
num_files
try
:
sub
=
string
.
split
(
msg
.
dict
.
get
(
"subject"
,
""
))
except
:
msg
=
mb
.
next
()
continue
for
s
in
sub
:
if
not
s
in
self
.
keywords
:
self
.
keywords
.
append
(
s
)
self
.
_catalog
.
aq_parent
=
None
def
catMessage
(
self
,
m
):
self
.
_catalog
.
catalogObject
(
testMessage
(
m
)
,
m
.
dict
[
"message-id"
]
)
def
uncatMessage
(
self
,
uid
):
self
.
_catalog
.
uncatalogObject
(
uid
)
class
testMessage
(
ExtensionClass
.
Base
):
def
__init__
(
self
,
msg
,
modify_doc
=
0
):
self
.
sender
=
msg
.
dict
.
get
(
"from"
,
""
)
self
.
subject
=
msg
.
dict
.
get
(
"subject"
,
""
)
self
.
to
=
msg
.
dict
.
get
(
"to"
,
""
)
self
.
content
=
str
(
msg
)
self
.
keywords
=
string
.
split
(
self
.
subject
,
" "
)
if
modify_doc
!=
0
:
self
.
keywords
=
map
(
self
.
reverse
,
self
.
keywords
)
self
.
file_id
=
msg
.
dict
.
get
(
"message-id"
,
""
)
self
.
length
=
len
(
str
(
msg
))
date
=
msg
.
dict
.
get
(
"date"
,
""
)
try
:
self
.
date
=
time
.
mktime
(
rfc822
.
parsedate
(
date
)[:
9
])
except
:
pass
def
reverse
(
self
,
s
):
l
=
list
(
s
)
l
.
reverse
()
return
string
.
join
(
l
,
""
)
def
__del__
(
self
):
pass
class
BuildEnv
(
dispatcher
.
Dispatcher
,
unittest
.
TestCase
):
""" build environment """
def
__init__
(
self
,
func
,
*
args
,
**
kw
):
unittest
.
TestCase
.
__init__
(
self
,
func
,
args
,
kw
)
dispatcher
.
Dispatcher
.
__init__
(
self
,
func
)
self
.
init_phase
=
0
self
.
setlog
(
open
(
"dispatcher.log"
,
"a"
)
)
self
.
logn
(
'treads=%d searchiterations=%d'
%
(
numThreads
,
searchIterations
))
self
.
logn
(
'updateiterations=%d maxfiles=%d'
%
(
updateIterations
,
maxFiles
))
#############################################################
# Build up ZODB
#############################################################
def
buildTestEnvironment
(
self
,
args
,
kw
):
self
.
init_phase
=
1
self
.
dispatcher
(
"funcTestEnvironment"
,(
"funcTestEnvironment"
,
1
,
args
,
kw
))
def
funcTestEnvironment
(
self
,
dataDir
,
maxFiles
):
env
=
self
.
th_setup
()
if
not
os
.
path
.
exists
(
dataDir
):
os
.
makedirs
(
dataDir
)
os
.
system
(
"rm -f %s/*"
%
dataDir
)
zodb
=
testZODB
(
"%s/Data_orig.fs"
%
dataDir
)
print
"parsing and reading mailbox file %s....please wait"
%
mbox
tc
=
testCatalog
(
mbox
,
maxFiles
)
print
"writing Catalog to ZODB"
zodb
.
write
(
"catalog"
,
tc
)
print
"Creating keywords file"
kw
=
keywords
.
Keywords
()
kw
.
build
(
mbox
,
1000
)
print
tc
.
num_files
,
"files read"
print
"Initalization complete"
self
.
th_teardown
(
env
)
class
testSearches
(
dispatcher
.
Dispatcher
,
unittest
.
TestCase
):
""" test searches """
def
__init__
(
self
,
func
,
*
args
,
**
kw
):
unittest
.
TestCase
.
__init__
(
self
,
func
,
args
,
kw
)
dispatcher
.
Dispatcher
.
__init__
(
self
,
func
)
self
.
init_phase
=
0
self
.
setlog
(
open
(
"dispatcher.log"
,
"a"
)
)
def
setUp
(
self
):
os
.
system
(
"rm -fr data/work"
)
if
not
os
.
path
.
exists
(
"data/work"
):
os
.
makedirs
(
"data/work"
)
assert
os
.
system
(
"cp %s/Data_orig.fs data/work/Data.fs"
%
dataDir
)
==
0
,
\
"Error while replicating original data"
self
.
zodb
=
testZODB
(
"data/work/Data.fs"
,
open
=
0
)
self
.
threads
=
{}
self
.
init_zodb_size
=
self
.
zodb_size
()
kw
=
keywords
.
Keywords
()
kw
.
reload
()
self
.
keywords
=
kw
.
keywords
()
self
.
logn
(
"-"
*
80
)
self
.
logn
(
'treads=%d searchiterations=%d'
%
(
numThreads
,
searchIterations
))
self
.
logn
(
'updateiterations=%d maxfiles=%d'
%
(
updateIterations
,
maxFiles
))
def
tearDown
(
self
):
self
.
log_zodb_size
(
"before"
,
self
.
init_zodb_size
)
self
.
log_zodb_size
(
"after "
,
self
.
zodb_size
())
del
self
.
zodb
self
.
zodb
=
self
.
catalog
=
None
def
log_zodb_size
(
self
,
s
,
n
):
self
.
logn
(
"Size of ZODB (data/work/Data.fs) %s test : %s"
%
(
s
,
n
)
)
def
zodb_size
(
self
):
return
self
.
size2size
(
os
.
stat
(
"data/work/Data.fs"
)[
6
])
def
size2size
(
self
,
n
):
import
math
if
n
<
1024.0
:
return
"%8.3lf Bytes"
%
n
if
n
<
1024.0
*
1024.0
:
return
"%8.3lf KB"
%
(
1.0
*
n
/
1024.0
)
if
n
<
1024.0
*
1024.0
*
1024.0
:
return
"%8.3lf MB"
%
(
1.0
*
n
/
1024.0
/
1024.0
)
#############################################################
# Fulltext test
#############################################################
def
testFulltextIndex
(
self
,
args
,
kw
):
""" benchmark FulltextIndex """
self
.
dispatcher
(
'funcFulltextIndex'
,
(
'funcFulltextIndex'
,
kw
[
"numThreads"
]
,
()
,
{}
)
)
def
funcFulltextIndex
(
self
,
*
args
):
""" benchmark FulltextIndex """
cat
,
msg_ids
=
self
.
get_catalog
()
env
=
self
.
th_setup
()
for
kw
in
self
.
keywords
:
res
=
cat
.
searchResults
(
{
"content"
:
kw
}
)
self
.
th_teardown
(
env
)
#############################################################
# Field index test
#############################################################
def
testFieldIndex
(
self
,
args
,
kw
):
""" benchmark field index"""
self
.
dispatcher
(
'funcFieldIndex'
,
(
'funcFieldIndex'
,
kw
[
"numThreads"
]
,
()
,
{}
)
)
def
funcFieldIndex
(
self
,
*
args
):
""" benchmark FieldIndex """
cat
,
msg_ids
=
self
.
get_catalog
()
env
=
self
.
th_setup
()
for
i
in
range
(
0
,
searchIterations
):
res
=
cat
.
searchResults
(
{
"length"
:
i
}
)
for
r
in
res
:
assert
i
==
r
.
length
,
"%s should have size %d but is %s"
%
\
(
r
.
file_id
,
i
,
r
.
length
)
self
.
th_teardown
(
env
)
#############################################################
# Keyword index test
#############################################################
def
testKeywordIndex
(
self
,
args
,
kw
):
""" benchmark Keyword index"""
self
.
dispatcher
(
'funcKeywordIndex'
,
(
'funcKeywordIndex'
,
kw
[
"numThreads"
]
,
()
,
{}
)
)
def
funcKeywordIndex
(
self
,
*
args
):
""" benchmark KeywordIndex """
cat
,
msg_ids
=
self
.
get_catalog
()
env
=
self
.
th_setup
()
for
kw
in
self
.
keywords
:
res
=
cat
.
searchResults
(
{
"subject"
:
kw
}
)
# assert len(res) != 0 , "Search result for keyword '%s' is empty" % kw
self
.
th_teardown
(
env
)
#############################################################
# Field range index test
#############################################################
def
testFieldRangeIndex
(
self
,
args
,
kw
):
""" benchmark field range index"""
self
.
dispatcher
(
'funcFieldRangeIndex'
,
(
'funcFieldRangeIndex'
,
kw
[
"numThreads"
]
,
()
,
{}
)
)
def
funcFieldRangeIndex
(
self
,
*
args
):
""" benchmark FieldRangeIndex """
cat
,
msg_ids
=
self
.
get_catalog
()
env
=
self
.
th_setup
()
rg
=
[]
for
i
in
range
(
searchIterations
):
m
=
random
.
randint
(
0
,
10000
)
n
=
m
+
200
rg
.
append
((
m
,
n
))
for
i
in
range
(
searchIterations
):
for
r
in
cat
.
searchResults
(
{
"length"
:
rg
[
i
],
"length_usage"
:
"range:min:max"
}
):
size
=
r
.
length
assert
rg
[
i
][
0
]
<=
size
and
size
<=
rg
[
i
][
1
]
,
\
"Filesize of %s is out of range (%d,%d) %d"
%
(
r
.
file_id
,
rg
[
i
][
0
],
rg
[
i
][
1
],
size
)
self
.
th_teardown
(
env
)
#############################################################
# Keyword + range index test
#############################################################
def
testKeywordRangeIndex
(
self
,
args
,
kw
):
""" benchmark Keyword range index"""
self
.
dispatcher
(
'funcKeywordRangeIndex'
,
(
'funcKeywordRangeIndex'
,
kw
[
"numThreads"
]
,
()
,
{}
)
)
def
funcKeywordRangeIndex
(
self
,
*
args
):
""" benchmark Keyword & IndexRange search """
cat
,
msg_ids
=
self
.
get_catalog
()
rg
=
[]
for
i
in
range
(
len
(
self
.
keywords
)):
m
=
random
.
randint
(
0
,
10000
)
n
=
m
+
200
rg
.
append
(
(
m
,
n
)
)
env
=
self
.
th_setup
()
results
=
[]
for
i
in
range
(
len
(
self
.
keywords
)):
results
.
append
(
cat
.
searchResults
(
{
"keywords"
:
self
.
keywords
[
i
],
"length"
:
rg
[
i
],
"length_usage"
:
"range:min:max"
}
)
)
self
.
th_teardown
(
env
)
#############################################################
# Test full reindexing
#############################################################
def
testUpdates
(
self
,
args
,
kw
):
""" benchmark concurrent catalog/uncatalog operations """
self
.
dispatcher
(
"testUpdates"
,
(
"funcUpdates"
,
kw
[
"numThreads"
]
,
args
,
kw
))
def
funcUpdates
(
self
,
*
args
,
**
kw
):
""" benchmark concurrent catalog/uncatalog operations """
uncat_conflicts
=
cat_conflicts
=
0
cat
,
msg_ids
=
self
.
get_catalog
()
msgs
=
self
.
setupUpdatesMethod
(
kw
[
"numUpdates"
])
keys
=
msgs
.
keys
()
rdgen
=
random
.
Random
()
env
=
self
.
th_setup
()
for
i
in
range
(
len
(
keys
)):
r
=
rdgen
.
randint
(
0
,
len
(
msgs
)
-
1
)
mid
=
keys
[
r
]
obj
=
msgs
[
mid
]
try
:
cat
.
uncatalog_object
(
mid
)
if
kw
.
get
(
"commit"
,
1
)
==
1
:
transaction
.
commit
()
time
.
sleep
(
0.1
)
except
ZODB
.
POSException
.
ConflictError
:
uncat_conflicts
=
uncat_conflicts
+
1
try
:
cat
.
catalog_object
(
obj
,
mid
)
if
kw
.
get
(
"commit"
,
1
)
==
1
:
transaction
.
commit
()
time
.
sleep
(
0.1
)
except
ZODB
.
POSException
.
ConflictError
:
cat_conflicts
=
cat_conflicts
+
1
try
:
transaction
.
commit
()
except
:
pass
self
.
th_teardown
(
env
,
cat_conflicts
=
cat_conflicts
,
uncat_conflicts
=
uncat_conflicts
)
def
setupUpdatesMethod
(
self
,
numUpdates
):
""" this method prepares a datastructure for the updates test.
we are reading the first n mails from the primary mailbox.
they are used for the update test
"""
i
=
0
dict
=
{}
mb
=
mailbox
.
UnixMailbox
(
open
(
mbox
,
"r"
))
msg
=
mb
.
next
()
while
msg
and
i
<
numUpdates
:
obj
=
testMessage
(
msg
)
mid
=
msg
.
dict
.
get
(
"message-id"
,
None
)
if
mid
:
dict
[
mid
]
=
obj
i
=
i
+
1
msg
=
mb
.
next
()
return
dict
#############################################################
# Test full reindexing
#############################################################
def
testReindexing
(
self
,
args
,
kw
):
""" test reindexing of existing data """
self
.
dispatcher
(
"testReindexing"
,
(
"funcReindexing"
,
kw
[
"numThreads"
]
,
(
mbox
,
1000
)
,
{}
))
def
testReindexingAndModify
(
self
,
args
,
kw
):
""" test reindexing of existing data but with modifications"""
self
.
dispatcher
(
"testReindexing"
,
(
"funcReindexing"
,
kw
[
"numThreads"
]
,
(
mbox
,
1000
,
1
)
,
{}
))
def
funcReindexing
(
self
,
mbox
,
numfiles
=
100
,
modify_doc
=
0
):
""" test reindexing of existing data """
cat_conflicts
=
0
cat
,
msg_ids
=
self
.
get_catalog
()
env
=
self
.
th_setup
()
mb
=
mailbox
.
UnixMailbox
(
open
(
mbox
,
"r"
))
i
=
0
msg
=
mb
.
next
()
while
msg
and
i
<
numfiles
:
obj
=
testMessage
(
msg
,
modify_doc
)
if
msg
.
dict
.
has_key
(
"message-id"
):
mid
=
msg
.
dict
[
"message-id"
]
else
:
msg
=
mb
.
next
()
continue
try
:
cat
.
catalogObject
(
obj
,
mid
)
transaction
.
commit
()
except
:
cat_conflicts
=
cat_conflicts
+
1
msg
=
mb
.
next
()
i
=
i
+
1
if
i
%
100
==
0
:
print
i
self
.
th_teardown
(
env
,
cat_conflicts
=
cat_conflicts
)
#############################################################
# Test full reindexing
#############################################################
def
testIncrementalIndexing
(
self
,
args
,
kw
):
""" testing incremental indexing """
self
.
dispatcher
(
"testIncrementalIndexing"
,
(
"funcReindexing"
,
kw
[
"numThreads"
],
(
mbox2
,
1000
)
,
{}))
def
get_catalog
(
self
):
""" return a catalog object """
# depended we are running in multithreaded mode we must take
# care how threads open the ZODB
connection
=
self
.
zodb
.
db
.
open
()
root
=
connection
.
root
()
cat
=
root
[
"catalog"
].
_catalog
msg_ids
=
root
[
'catalog'
].
msg_ids
return
cat
,
msg_ids
def
usage
(
program
):
print
"Usage: "
print
print
"initalize the test catalog: %s -i -f <maximum number files to use> "
%
program
print
"to run the basic tests: %s -b -f <maximum number files to use> "
%
program
print
"to run the advanced tests: %s -a -f <maximum number files to use> "
%
program
def
main
():
global
dataDir
,
maxFiles
opts
,
args
=
getopt
.
getopt
(
sys
.
argv
[
1
:],
"hiabf:xp"
,[
'help'
])
opts
.
sort
()
optsLst
=
map
(
lambda
x
:
x
[
0
],
opts
)
if
optsLst
==
[]:
usage
(
os
.
path
.
basename
(
sys
.
argv
[
0
]));
sys
.
exit
(
0
)
for
k
,
v
in
opts
:
if
k
in
[
'-h'
,
'--help'
]
:
usage
(
os
.
path
.
basename
(
sys
.
argv
[
0
]));
sys
.
exit
(
0
)
if
k
==
"-f"
:
maxFiles
=
string
.
atoi
(
v
)
dataDir
=
os
.
path
.
join
(
"data"
,
str
(
maxFiles
))
if
'-i'
in
optsLst
:
unittest
.
TextTestRunner
().
run
(
get_tests
(
'init'
))
if
'-b'
in
optsLst
:
unittest
.
TextTestRunner
().
run
(
get_tests
(
'bench1'
))
if
'-a'
in
optsLst
:
unittest
.
TextTestRunner
().
run
(
get_tests
(
'bench2'
))
if
'-x'
in
optsLst
:
unittest
.
TextTestRunner
().
run
(
get_tests
(
'exp'
))
if
'-p'
in
optsLst
:
unittest
.
TextTestRunner
().
run
(
test_suite
())
def
test_suite
():
return
get_tests
(
'basic'
)
def
get_tests
(
what
):
global
dataDir
,
maxFiles
if
what
==
'basic'
:
maxFiles
=
100
dataDir
=
'data/%d'
%
maxFiles
t_aj
=
(
BuildEnv
(
'buildTestEnvironment'
,
dataDir
,
maxFiles
),
testSearches
(
"testFulltextIndex"
,
numThreads
=
1
),
testSearches
(
"testFieldIndex"
,
numThreads
=
1
),
testSearches
(
"testFieldRangeIndex"
,
numThreads
=
1
),
testSearches
(
"testKeywordIndex"
,
numThreads
=
1
),
testSearches
(
"testKeywordRangeIndex"
,
numThreads
=
1
)
)
bench1_tests
=
(
testSearches
(
"testFulltextIndex"
,
numThreads
=
1
),
testSearches
(
"testFulltextIndex"
,
numThreads
=
4
),
testSearches
(
"testFieldIndex"
,
numThreads
=
1
),
testSearches
(
"testFieldIndex"
,
numThreads
=
4
),
testSearches
(
"testFieldRangeIndex"
,
numThreads
=
1
),
testSearches
(
"testFieldRangeIndex"
,
numThreads
=
4
),
testSearches
(
"testKeywordIndex"
,
numThreads
=
1
),
testSearches
(
"testKeywordIndex"
,
numThreads
=
4
),
testSearches
(
"testKeywordRangeIndex"
,
numThreads
=
1
),
testSearches
(
"testKeywordRangeIndex"
,
numThreads
=
4
)
)
bench2_tests
=
(
# testSearches("testReindexing",numThreads=1),
# testSearches("testIncrementalIndexing",numThreads=1),
testSearches
(
"testUpdates"
,
numThreads
=
2
,
numUpdates
=
200
),
# testSearches("testUpdates",numThreads=4,numUpdates=200)
)
exp_tests
=
(
# testRS("testRangeSearch"),
# testSearches("testReindexing",numThreads=1),
testSearches
(
"testReindexingAndModify"
,
numThreads
=
1
),
# testSearches("testUpdates",numThreads=10,numUpdates=100),
)
init_tests
=
(
BuildEnv
(
"buildTestEnvironment"
,
dataDir
,
maxFiles
)
,
)
ts
=
unittest
.
TestSuite
()
for
x
in
eval
(
'%s_tests'
%
what
):
ts
.
addTest
(
x
)
return
ts
return
def
pdebug
():
import
pdb
test_suite
()
def
debug
():
test_suite
().
debug
()
def
pdebug
():
import
pdb
pdb
.
run
(
'debug()'
)
if
__name__
==
'__main__'
:
main
()
src/Products/ZCatalog/regressiontests/regressionCatalogTiming.py
deleted
100644 → 0
View file @
6a406941
# XXX: Products.PluginIndexes.TextIndex no longer exists
import
os
,
sys
sys
.
path
.
insert
(
0
,
'.'
)
try
:
import
Testing
os
.
environ
[
'SOFTWARE_HOME'
]
=
os
.
environ
.
get
(
'SOFTWARE_HOME'
,
'.'
)
except
ImportError
:
sys
.
path
[
0
]
=
'../../..'
import
Testing
os
.
environ
[
'SOFTWARE_HOME'
]
=
'../../..'
os
.
environ
[
'INSTANCE_HOME'
]
=
os
.
environ
.
get
(
'INSTANCE_HOME'
,
os
.
path
.
join
(
os
.
environ
[
'SOFTWARE_HOME'
],
'..'
,
'..'
)
)
os
.
environ
[
'STUPID_LOG_FILE'
]
=
os
.
path
.
join
(
os
.
environ
[
'INSTANCE_HOME'
],
'var'
,
'debug.log'
)
here
=
os
.
getcwd
()
import
Zope2
import
mailbox
,
time
,
httplib
from
string
import
strip
,
find
,
split
,
lower
,
atoi
,
join
from
urllib
import
quote
from
Products.ZCatalog
import
ZCatalog
from
unittest
import
TestCase
,
TestSuite
,
JUnitTextTestRunner
,
\
VerboseTextTestRunner
,
makeSuite
import
transaction
from
Products.PluginIndexes.FieldIndex.FieldIndex
import
FieldIndex
#from Products.PluginIndexes.TextIndex.TextIndex import TextIndex
#from Products.PluginIndexes.TextIndex.Lexicon import Lexicon
from
Products.PluginIndexes.KeywordIndex.KeywordIndex
import
KeywordIndex
from
Testing.makerequest
import
makerequest
TextTestRunner
=
VerboseTextTestRunner
class
TestTimeIndex
(
TestCase
):
def
setUp
(
self
):
self
.
app
=
makerequest
(
Zope2
.
app
())
try
:
self
.
app
.
_delObject
(
'catalogtest'
)
except
AttributeError
:
pass
self
.
app
.
manage_addFolder
(
'catalogtest'
)
zcatalog
=
ZCatalog
.
ZCatalog
(
'catalog'
,
'a catalog'
)
self
.
app
.
catalogtest
.
_setObject
(
'catalog'
,
zcatalog
)
c
=
self
.
app
.
catalogtest
.
catalog
for
x
in
(
'title'
,
'to'
,
'from'
,
'date'
,
'raw'
):
try
:
c
.
manage_delIndex
([
x
])
except
:
pass
c
.
manage_addIndex
(
'title'
,
'TextIndex'
)
c
.
manage_addIndex
(
'to'
,
'TextIndex'
)
c
.
manage_addIndex
(
'from'
,
'TextIndex'
)
c
.
manage_addIndex
(
'date'
,
'FieldIndex'
)
c
.
manage_addIndex
(
'raw'
,
'TextIndex'
)
def
tearDown
(
self
):
try
:
self
.
app
.
_delObject
(
'catalogtest'
)
except
AttributeError
:
pass
try
:
self
.
app
.
_p_jar
.
_db
.
pack
()
self
.
app
.
_p_jar
.
close
()
except
AttributeError
:
pass
self
.
app
=
None
del
self
.
app
def
checkTimeBulkIndex
(
self
):
print
c
=
self
.
app
.
catalogtest
.
catalog
t
=
time
.
time
()
loadmail
(
self
.
app
.
catalogtest
,
'zopemail'
,
os
.
path
.
join
(
here
,
'zope.mbox'
),
500
)
transaction
.
commit
()
loadtime
=
time
.
time
()
-
t
out
(
"loading data took %s seconds.. "
%
loadtime
)
t
=
time
.
time
()
req
=
self
.
app
.
REQUEST
parents
=
[
self
.
app
.
catalogtest
.
catalog
,
self
.
app
.
catalogtest
,
self
.
app
]
req
[
'PARENTS'
]
=
parents
rsp
=
self
.
app
.
REQUEST
.
RESPONSE
url1
=
''
c
.
manage_catalogFoundItems
(
req
,
rsp
,
url1
,
url1
,
obj_metatypes
=
[
'DTML Document'
])
indextime
=
time
.
time
()
-
t
out
(
"bulk index took %s seconds.. "
%
indextime
)
out
(
"total time for load and index was %s seconds.. "
%
(
loadtime
+
indextime
))
def
checkTimeIncrementalIndexAndQuery
(
self
):
print
c
=
self
.
app
.
catalogtest
.
catalog
t
=
time
.
time
()
max
=
500
m
=
loadmail
(
self
.
app
.
catalogtest
,
'zopemail'
,
os
.
path
.
join
(
here
,
'zope.mbox'
),
max
,
c
)
transaction
.
commit
()
total
=
time
.
time
()
-
t
out
(
"total time for load and index was %s seconds.. "
%
total
)
t
=
time
.
time
()
rs
=
c
()
# empty query should return all
assert
len
(
rs
)
==
max
,
len
(
rs
)
dates
=
m
[
'date'
]
froms
=
m
[
'from'
]
tos
=
m
[
'to'
]
titles
=
m
[
'title'
]
assert
len
(
c
({
'date'
:
'foobarfoo'
}))
==
0
# should return no results
for
x
in
dates
:
assert
len
(
c
({
'date'
:
x
}))
==
1
# each date should be fieldindexed
assert
len
(
c
({
'from'
:
'a'
}))
==
0
# should be caught by splitter
assert
len
(
c
({
'raw'
:
'chris'
}))
!=
0
assert
len
(
c
({
'raw'
:
'gghdjkasjdsda'
}))
==
0
assert
c
({
'PrincipiaSearchSource'
:
'the*'
})
def
checkTimeSubcommit
(
self
):
print
for
x
in
(
None
,
100
,
500
,
1000
,
10000
):
out
(
"testing subcommit at theshhold of %s"
%
x
)
if
x
is
not
None
:
self
.
setUp
()
c
=
self
.
app
.
catalogtest
.
catalog
c
.
threshold
=
x
transaction
.
commit
()
t
=
time
.
time
()
loadmail
(
self
.
app
.
catalogtest
,
'zopemail'
,
os
.
path
.
join
(
here
,
'zope.mbox'
),
500
,
c
)
transaction
.
commit
()
total
=
time
.
time
()
-
t
out
(
"total time with subcommit thresh %s was %s seconds.. "
%
(
x
,
total
))
self
.
tearDown
()
# utility
def
loadmail
(
folder
,
name
,
mbox
,
max
=
None
,
catalog
=
None
):
"""
creates a folder inside object 'folder' named 'name', opens
filename 'mbox' and adds 'max' mail messages as DTML documents to
the ZODB inside the folder named 'name'. If 'catalog' (which
should be a ZCatalog object) is passed in, call catalog_object on it
with the document while we're iterating. If 'max' is not None,
only do 'max' messages, else do all messages in the mbox archive.
"""
m
=
{
'date'
:[],
'from'
:[],
'to'
:[],
'title'
:[]}
folder
.
manage_addFolder
(
name
)
folder
=
getattr
(
folder
,
name
)
mb
=
mailbox
.
UnixMailbox
(
open
(
mbox
))
i
=
0
every
=
100
message
=
mb
.
next
()
while
message
:
part
=
`i/every * 100`
try
:
dest
=
getattr
(
folder
,
part
)
except
AttributeError
:
folder
.
manage_addFolder
(
part
)
dest
=
getattr
(
folder
,
part
)
dest
.
manage_addDTMLDocument
(
str
(
i
),
file
=
message
.
fp
.
read
())
doc
=
getattr
(
dest
,
str
(
i
))
i
=
i
+
1
for
h
in
message
.
headers
:
h
=
strip
(
h
)
l
=
find
(
h
,
':'
)
if
l
<=
0
:
continue
name
=
lower
(
h
[:
l
])
if
name
==
'subject'
:
name
=
'title'
h
=
strip
(
h
[
l
+
1
:])
type
=
'string'
if
0
and
name
==
'date'
:
type
=
'date'
elif
0
:
try
:
atoi
(
h
)
except
:
pass
else
:
type
=
int
if
name
==
'title'
:
doc
.
manage_changeProperties
(
title
=
h
)
m
[
name
].
append
(
h
)
elif
name
in
(
'to'
,
'from'
,
'date'
):
try
:
doc
.
manage_addProperty
(
name
,
h
,
type
)
except
:
pass
m
[
name
].
append
(
h
)
if
catalog
:
path
=
join
(
doc
.
getPhysicalPath
(),
'/'
)
catalog
.
catalog_object
(
doc
,
path
)
if
max
is
not
None
:
if
i
>=
max
:
break
message
=
mb
.
next
()
return
m
def
out
(
s
):
print
" %s"
%
s
def
test_suite
():
s1
=
makeSuite
(
TestTimeIndex
,
'check'
)
testsuite
=
TestSuite
((
s1
,))
return
testsuite
def
main
():
mb
=
os
.
path
.
join
(
here
,
'zope.mbox'
)
if
not
os
.
path
.
isfile
(
mb
):
print
"do you want to get the zope.mbox file from lists.zope.org?"
print
"it's required for testing (98MB, ~ 30mins on fast conn)"
print
"it's also available at korak:/home/chrism/zope.mbox"
print
"-- type 'Y' or 'N'"
a
=
raw_input
()
if
lower
(
a
[:
1
])
==
'y'
:
server
=
'lists.zope.org:80'
method
=
'/pipermail/zope.mbox/zope.mbox'
h
=
httplib
.
HTTP
(
server
)
h
.
putrequest
(
'GET'
,
method
)
h
.
putheader
(
'User-Agent'
,
'silly'
)
h
.
putheader
(
'Accept'
,
'text/html'
)
h
.
putheader
(
'Accept'
,
'text/plain'
)
h
.
putheader
(
'Host'
,
server
)
h
.
endheaders
()
errcode
,
errmsg
,
headers
=
h
.
getreply
()
if
errcode
!=
200
:
f
=
h
.
getfile
()
data
=
f
.
read
()
print
data
class
HTTPRequestError
(
Exception
):
pass
raise
HTTPRequestError
,
"Error reading from host %s"
%
server
f
=
h
.
getfile
()
out
=
open
(
mb
,
'w'
)
print
"this is going to take a while..."
print
"downloading mbox from %s"
%
server
while
1
:
l
=
f
.
readline
()
if
not
l
:
break
out
.
write
(
l
)
alltests
=
test_suite
()
runner
=
TextTestRunner
()
runner
.
run
(
alltests
)
def
debug
():
test_suite
().
debug
()
if
__name__
==
'__main__'
:
if
len
(
sys
.
argv
)
>
1
:
globals
()[
sys
.
argv
[
1
]]()
else
:
main
()
src/Products/ZCatalog/regressiontests/regressionUnicode.py
deleted
100644 → 0
View file @
6a406941
# XXX: Products.PluginIndexes.TextIndex no longer exists
import
os
,
sys
import
unittest
import
Zope2
from
Products.ZCatalog.ZCatalog
import
ZCatalog
#from Products.PluginIndexes.TextIndex import Splitter
# This patch pretends the ZCatalog is using the Unicode Splitter
# but by default the ZCatalog/TextIndexes uses the standard
# non-unicode-aware ZopeSplitter
#Splitter.availableSplitters = [ ("UnicodeSplitter" , "Unicode-aware splitter")]
#Splitter.splitterNames = [ "UnicodeSplitter" ]
class
TO
:
def
__init__
(
self
,
txt
,
kw
=
''
):
self
.
text
=
txt
self
.
kw
=
kw
class
UnicodeTextIndexCatalogTest
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
cat
=
ZCatalog
(
"catalog"
)
self
.
cat
.
addIndex
(
'text'
,
"TextIndex"
)
self
.
cat
.
addColumn
(
'text'
)
self
.
cat
.
addIndex
(
'kw'
,
'KeywordIndex'
)
self
.
cat
.
addColumn
(
'kw'
)
t1
=
TO
(
'the quick brown fox jumps over the lazy dog'
,[
'quick'
,
'fox'
])
t2
=
TO
(
'i am the nice alien from the future'
,[
'alien'
,
'future'
])
t3
=
TO
(
'i am a brown fox dancing with a future alien'
,[
'zerstrt'
,
'knnten'
])
t4
=
TO
(
'i am a brown '
+
unicode
(
'fox'
)
+
' dancing with a future alien'
,[])
t5
=
TO
(
"""
Die USA und Grobritannien knnen nach der Zerstrung der
afghanischen Luftabwehr nun rund um die Uhr Angriffe fliegen. Das gab
Verteidigungsminister Donald Rumsfeld bekannt. Bei den dreitgigen Angriffen
seien auch bis auf einen alle Flugpltze der Taliban zerstrt worden. Rumsfeld
erklrte weiter, er knne die Berichte nicht besttigen, wonach bei den
amerikanischen Angriffen vier afghanische Mitarbeiter einer von den UN
finanzierten Hilfsorganisation gettet wurden. Diese knnten auch durch
Gegenfeuer der Taliban gettet worden sein.
"""
,[
unicode
(
'dreitgigen'
,
'latin1'
),
'zerstrt'
])
self
.
cat
.
catalog_object
(
t1
,
"o1"
)
self
.
cat
.
catalog_object
(
t2
,
"o2"
)
self
.
cat
.
catalog_object
(
t3
,
"o3"
)
self
.
cat
.
catalog_object
(
t4
,
"o4"
)
self
.
cat
.
catalog_object
(
t5
,
"o5"
)
self
.
tests
=
[(
'quick'
,(
'o1'
,)),
(
'fox'
,(
'o1'
,
'o3'
,
'o4'
)),
(
'afghanischen'
,
(
'o5'
,)),
(
'dreitgigen'
,(
'o5'
,))
]
self
.
kw_tests
=
[
(
'quick'
,(
'o1'
,)
),
(
'zerstrt'
,(
'o3'
,
'o5'
)),
(
'dreitgigen'
,(
'o5'
,))
]
def
_doTests
(
self
,
tests
,
field
,
test_unicode
=
0
):
for
q
,
objs
in
tests
:
if
test_unicode
:
res
=
self
.
cat
.
searchResults
({
field
:{
'query'
:
unicode
(
q
,
'latin1'
)}})
else
:
res
=
self
.
cat
.
searchResults
({
field
:{
'query'
:
q
}})
got
=
[
x
.
getURL
()
for
x
in
res
]
got
.
sort
()
expected
=
list
(
objs
)
expected
.
sort
()
assert
got
==
expected
,
\
"%s: got: %s, expected: %s"
%
(
q
,
got
,
expected
)
def
testAsciiQuery
(
self
):
""" ascii query textindex """
self
.
_doTests
(
self
.
tests
,
'text'
,
test_unicode
=
0
)
def
testUnicodeQuery
(
self
):
""" unicode query textindex """
self
.
_doTests
(
self
.
tests
,
'text'
,
test_unicode
=
1
)
# The Tests for KeywordIndexes are disabled at this time
# because of a strange behaviour of OOBTrees containing
# mixed strings and unicode strings
#
#
# def testAsciiKeywords(self):
# """ ascii query keyword index """
# self._doTests(self.kw_tests, 'kw', test_unicode=0)
#
#
# def testUnicodeKeywords(self):
# """ ascii query keyword index """
# self._doTests(self.kw_tests, 'kw', test_unicode=1)
def
test_suite
():
return
unittest
.
makeSuite
(
UnicodeTextIndexCatalogTest
)
def
main
():
unittest
.
TextTestRunner
().
run
(
test_suite
())
if
__name__
==
'__main__'
:
main
()
src/Products/ZCatalog/regressiontests/unittest_patched.py
deleted
100755 → 0
View file @
6a406941
#!/usr/bin/env python
"""
Python unit testing framework, based on Erich Gamma's JUnit and Kent Beck's
Smalltalk testing framework.
Further information is available in the bundled documentation, and from
http://pyunit.sourceforge.net/
This module contains the core framework classes that form the basis of
specific test cases and suites (TestCase, TestSuite etc.), and also a
text-based utility class for running the tests and reporting the results
(TextTestRunner).
Copyright (c) 1999, 2000, 2001 Zope Foundation and Contributors
This module is free software, and you may redistribute it and/or modify
it under the same terms as Python itself, so long as this copyright message
and disclaimer are retained in their original form.
IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
DAMAGE.
THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
"""
# This is patched version of unittest.py and allows to pass additional
# parameters to the TestCase constructor.
# This special version is only need to run the regression test
# in testCatalog.py#
#
# ajung
__author__
=
"Steve Purcell"
__email__
=
"stephen_purcell@yahoo.com"
__version__
=
"$Revision: 1.3 $"
[
11
:
-
2
]
import
time
import
sys
import
traceback
import
string
import
os
##############################################################################
# A platform-specific concession to help the code work for JPython users
##############################################################################
plat
=
string
.
lower
(
sys
.
platform
)
_isJPython
=
string
.
find
(
plat
,
'java'
)
>=
0
or
string
.
find
(
plat
,
'jdk'
)
>=
0
del
plat
##############################################################################
# Test framework core
##############################################################################
class
TestResult
:
"""Holder for test result information.
Test results are automatically managed by the TestCase and TestSuite
classes, and do not need to be explicitly manipulated by writers of tests.
Each instance holds the total number of tests run, and collections of
failures and errors that occurred among those test runs. The collections
contain tuples of (testcase, exceptioninfo), where exceptioninfo is a
tuple of values as returned by sys.exc_info().
"""
def
__init__
(
self
,
args
=
(),
kw
=
{}):
self
.
failures
=
[]
self
.
errors
=
[]
self
.
testsRun
=
0
self
.
shouldStop
=
0
self
.
__args
=
args
self
.
__kw
=
kw
def
startTest
(
self
,
test
):
"Called when the given test is about to be run"
self
.
testsRun
=
self
.
testsRun
+
1
def
stopTest
(
self
,
test
):
"Called when the given test has been run"
pass
def
addError
(
self
,
test
,
err
):
"Called when an error has occurred"
self
.
errors
.
append
((
test
,
err
))
def
addFailure
(
self
,
test
,
err
):
"Called when a failure has occurred"
self
.
failures
.
append
((
test
,
err
))
def
wasSuccessful
(
self
):
"Tells whether or not this result was a success"
return
len
(
self
.
failures
)
==
len
(
self
.
errors
)
==
0
def
stop
(
self
):
"Indicates that the tests should be aborted"
self
.
shouldStop
=
1
def
__repr__
(
self
):
return
"<%s run=%i errors=%i failures=%i>"
%
\
(
self
.
__class__
,
self
.
testsRun
,
len
(
self
.
errors
),
len
(
self
.
failures
))
class
TestCase
:
"""A class whose instances are single test cases.
Test authors should subclass TestCase for their own tests. Construction
and deconstruction of the test's environment ('fixture') can be
implemented by overriding the 'setUp' and 'tearDown' methods respectively.
By default, the test code itself should be placed in a method named
'runTest'.
If the fixture may be used for many test cases, create as
many test methods as are needed. When instantiating such a TestCase
subclass, specify in the constructor arguments the name of the test method
that the instance is to execute.
If it is necessary to override the __init__ method, the base class
__init__ method must always be called.
"""
def
__init__
(
self
,
methodName
=
'runTest'
,
*
args
,
**
kw
):
"""Create an instance of the class that will use the named test
method when executed. Raises a ValueError if the instance does
not have a method with the specified name.
"""
try
:
self
.
__testMethodName
=
methodName
testMethod
=
getattr
(
self
,
methodName
)
self
.
__testMethodDoc
=
testMethod
.
__doc__
except
AttributeError
:
raise
ValueError
,
"no such test method in %s: %s"
%
\
(
self
.
__class__
,
methodName
)
self
.
__args
=
args
self
.
__kw
=
kw
def
setUp
(
self
):
"Hook method for setting up the test fixture before exercising it."
pass
def
tearDown
(
self
):
"Hook method for deconstructing the test fixture after testing it."
pass
def
countTestCases
(
self
):
return
1
def
defaultTestResult
(
self
):
return
TestResult
(
self
.
__args
,
self
.
__kw
)
def
shortDescription
(
self
):
"""Returns a one-line description of the test, or None if no
description has been provided.
The default implementation of this method returns the first line of
the specified test method's docstring.
"""
doc
=
self
.
__testMethodDoc
return
doc
and
string
.
strip
(
string
.
split
(
doc
,
"
\
n
"
)[
0
])
or
None
def
id
(
self
):
return
"%s.%s"
%
(
self
.
__class__
,
self
.
__testMethodName
)
def
__str__
(
self
):
return
"%s (%s)"
%
(
self
.
__testMethodName
,
self
.
__class__
)
def
__repr__
(
self
):
return
"<%s testMethod=%s>"
%
\
(
self
.
__class__
,
self
.
__testMethodName
)
def
run
(
self
,
result
=
None
):
return
self
(
result
)
def
__call__
(
self
,
result
=
None
):
if
result
is
None
:
result
=
self
.
defaultTestResult
()
result
.
startTest
(
self
)
testMethod
=
getattr
(
self
,
self
.
__testMethodName
)
try
:
try
:
self
.
setUp
()
except
:
result
.
addError
(
self
,
self
.
__exc_info
())
return
try
:
apply
(
testMethod
,
self
.
__args
,
self
.
__kw
)
except
AssertionError
,
e
:
result
.
addFailure
(
self
,
self
.
__exc_info
())
except
:
result
.
addError
(
self
,
self
.
__exc_info
())
try
:
self
.
tearDown
()
except
:
result
.
addError
(
self
,
self
.
__exc_info
())
finally
:
result
.
stopTest
(
self
)
def
debug
(
self
):
"""Run the test without collecting errors in a TestResult"""
self
.
setUp
()
getattr
(
self
,
self
.
__testMethodName
)()
self
.
tearDown
()
def
assert_
(
self
,
expr
,
msg
=
None
):
"""Equivalent of built-in 'assert', but is not optimised out when
__debug__ is false.
"""
if
not
expr
:
raise
AssertionError
,
msg
failUnless
=
assert_
def
failIf
(
self
,
expr
,
msg
=
None
):
"Fail the test if the expression is true."
apply
(
self
.
assert_
,(
not
expr
,
msg
))
def
assertRaises
(
self
,
excClass
,
callableObj
,
*
args
,
**
kwargs
):
"""Assert that an exception of class excClass is thrown
by callableObj when invoked with arguments args and keyword
arguments kwargs. If a different type of exception is
thrown, it will not be caught, and the test case will be
deemed to have suffered an error, exactly as for an
unexpected exception.
"""
try
:
apply
(
callableObj
,
args
,
kwargs
)
except
excClass
:
return
else
:
if
hasattr
(
excClass
,
'__name__'
):
excName
=
excClass
.
__name__
else
:
excName
=
str
(
excClass
)
raise
AssertionError
,
excName
def
assertEqual
(
self
,
first
,
second
,
msg
=
None
):
"""Assert that the two objects are equal as determined by the '=='
operator.
"""
self
.
assert_
((
first
==
second
),
msg
or
'%s != %s'
%
(
first
,
second
))
def
fail
(
self
,
msg
=
None
):
"""Fail immediately, with the given message."""
raise
AssertionError
,
msg
def
__exc_info
(
self
):
"""Return a version of sys.exc_info() with the traceback frame
minimised; usually the top level of the traceback frame is not
needed.
"""
exctype
,
excvalue
,
tb
=
sys
.
exc_info
()
newtb
=
tb
.
tb_next
if
newtb
is
None
:
return
(
exctype
,
excvalue
,
tb
)
return
(
exctype
,
excvalue
,
newtb
)
class
TestSuite
:
"""A test suite is a composite test consisting of a number of TestCases.
For use, create an instance of TestSuite, then add test case instances.
When all tests have been added, the suite can be passed to a test
runner, such as TextTestRunner. It will run the individual test cases
in the order in which they were added, aggregating the results. When
subclassing, do not forget to call the base class constructor.
"""
def
__init__
(
self
,
tests
=
()):
self
.
_tests
=
[]
self
.
addTests
(
tests
)
def
__repr__
(
self
):
return
"<%s tests=%s>"
%
(
self
.
__class__
,
self
.
_tests
)
__str__
=
__repr__
def
countTestCases
(
self
):
cases
=
0
for
test
in
self
.
_tests
:
cases
=
cases
+
test
.
countTestCases
()
return
cases
def
addTest
(
self
,
test
):
self
.
_tests
.
append
(
test
)
def
addTests
(
self
,
tests
):
for
test
in
tests
:
self
.
addTest
(
test
)
def
run
(
self
,
result
):
return
self
(
result
)
def
__call__
(
self
,
result
):
for
test
in
self
.
_tests
:
if
result
.
shouldStop
:
break
test
(
result
)
return
result
def
debug
(
self
):
"""Run the tests without collecting errors in a TestResult"""
for
test
in
self
.
_tests
:
test
.
debug
()
class
FunctionTestCase
(
TestCase
):
"""A test case that wraps a test function.
This is useful for slipping pre-existing test functions into the
PyUnit framework. Optionally, set-up and tidy-up functions can be
supplied. As with TestCase, the tidy-up ('tearDown') function will
always be called if the set-up ('setUp') function ran successfully.
"""
def
__init__
(
self
,
testFunc
,
setUp
=
None
,
tearDown
=
None
,
description
=
None
):
TestCase
.
__init__
(
self
)
self
.
__setUpFunc
=
setUp
self
.
__tearDownFunc
=
tearDown
self
.
__testFunc
=
testFunc
self
.
__description
=
description
def
setUp
(
self
):
if
self
.
__setUpFunc
is
not
None
:
self
.
__setUpFunc
()
def
tearDown
(
self
):
if
self
.
__tearDownFunc
is
not
None
:
self
.
__tearDownFunc
()
def
runTest
(
self
):
self
.
__testFunc
()
def
id
(
self
):
return
self
.
__testFunc
.
__name__
def
__str__
(
self
):
return
"%s (%s)"
%
(
self
.
__class__
,
self
.
__testFunc
.
__name__
)
def
__repr__
(
self
):
return
"<%s testFunc=%s>"
%
(
self
.
__class__
,
self
.
__testFunc
)
def
shortDescription
(
self
):
if
self
.
__description
is
not
None
:
return
self
.
__description
doc
=
self
.
__testFunc
.
__doc__
return
doc
and
string
.
strip
(
string
.
split
(
doc
,
"
\
n
"
)[
0
])
or
None
##############################################################################
# Convenience functions
##############################################################################
def
getTestCaseNames
(
testCaseClass
,
prefix
,
sortUsing
=
cmp
):
"""Extracts all the names of functions in the given test case class
and its base classes that start with the given prefix. This is used
by makeSuite().
"""
testFnNames
=
filter
(
lambda
n
,
p
=
prefix
:
n
[:
len
(
p
)]
==
p
,
dir
(
testCaseClass
))
for
baseclass
in
testCaseClass
.
__bases__
:
testFnNames
=
testFnNames
+
\
getTestCaseNames
(
baseclass
,
prefix
,
sortUsing
=
None
)
if
sortUsing
:
testFnNames
.
sort
(
sortUsing
)
return
testFnNames
def
makeSuite
(
testCaseClass
,
prefix
=
'test'
,
sortUsing
=
cmp
,
suiteClass
=
TestSuite
):
"""Returns a TestSuite instance built from all of the test functions
in the given test case class whose names begin with the given
prefix. The cases are sorted by their function names
using the supplied comparison function, which defaults to 'cmp'.
"""
cases
=
map
(
testCaseClass
,
getTestCaseNames
(
testCaseClass
,
prefix
,
sortUsing
))
return
suiteClass
(
cases
)
def
findTestCases
(
module
,
prefix
=
'test'
,
sortUsing
=
cmp
,
suiteClass
=
TestSuite
):
import
types
tests
=
[]
for
name
in
dir
(
module
):
obj
=
getattr
(
module
,
name
)
if
type
(
obj
)
==
types
.
ClassType
and
issubclass
(
obj
,
TestCase
):
tests
.
append
(
makeSuite
(
obj
,
prefix
=
prefix
,
sortUsing
=
sortUsing
,
suiteClass
=
suiteClass
))
return
suiteClass
(
tests
)
def
createTestInstance
(
name
,
module
=
None
,
suiteClass
=
TestSuite
):
"""Finds tests by their name, optionally only within the given module.
Return the newly-constructed test, ready to run. If the name contains a ':'
then the portion of the name after the colon is used to find a specific
test case within the test case class named before the colon.
Examples:
findTest('examples.listtests.suite')
-- returns result of calling 'suite'
findTest('examples.listtests.ListTestCase:checkAppend')
-- returns result of calling ListTestCase('checkAppend')
findTest('examples.listtests.ListTestCase:check-')
-- returns result of calling makeSuite(ListTestCase, prefix="check")
"""
spec
=
string
.
split
(
name
,
':'
)
if
len
(
spec
)
>
2
:
raise
ValueError
,
"illegal test name: %s"
%
name
if
len
(
spec
)
==
1
:
testName
=
spec
[
0
]
caseName
=
None
else
:
testName
,
caseName
=
spec
parts
=
string
.
split
(
testName
,
'.'
)
if
module
is
None
:
if
len
(
parts
)
<
2
:
raise
ValueError
,
"incomplete test name: %s"
%
name
constructor
=
__import__
(
string
.
join
(
parts
[:
-
1
],
'.'
))
parts
=
parts
[
1
:]
else
:
constructor
=
module
for
part
in
parts
:
constructor
=
getattr
(
constructor
,
part
)
if
not
callable
(
constructor
):
raise
ValueError
,
"%s is not a callable object"
%
constructor
if
caseName
:
if
caseName
[
-
1
]
==
'-'
:
prefix
=
caseName
[:
-
1
]
if
not
prefix
:
raise
ValueError
,
"prefix too short: %s"
%
name
test
=
makeSuite
(
constructor
,
prefix
=
prefix
,
suiteClass
=
suiteClass
)
else
:
test
=
constructor
(
caseName
)
else
:
test
=
constructor
()
if
not
hasattr
(
test
,
"countTestCases"
):
raise
TypeError
,
\
"object %s found with spec %s is not a test"
%
(
test
,
name
)
return
test
##############################################################################
# Text UI
##############################################################################
class
_WritelnDecorator
:
"""Used to decorate file-like objects with a handy 'writeln' method"""
def
__init__
(
self
,
stream
):
self
.
stream
=
stream
if
_isJPython
:
import
java.lang.System
self
.
linesep
=
java
.
lang
.
System
.
getProperty
(
"line.separator"
)
else
:
self
.
linesep
=
os
.
linesep
def
__getattr__
(
self
,
attr
):
return
getattr
(
self
.
stream
,
attr
)
def
writeln
(
self
,
*
args
):
if
args
:
apply
(
self
.
write
,
args
)
self
.
write
(
self
.
linesep
)
class
_JUnitTextTestResult
(
TestResult
):
"""A test result class that can print formatted text results to a stream.
Used by JUnitTextTestRunner.
"""
def
__init__
(
self
,
stream
):
self
.
stream
=
stream
TestResult
.
__init__
(
self
)
def
addError
(
self
,
test
,
error
):
TestResult
.
addError
(
self
,
test
,
error
)
self
.
stream
.
write
(
'E'
)
self
.
stream
.
flush
()
if
error
[
0
]
is
KeyboardInterrupt
:
self
.
shouldStop
=
1
def
addFailure
(
self
,
test
,
error
):
TestResult
.
addFailure
(
self
,
test
,
error
)
self
.
stream
.
write
(
'F'
)
self
.
stream
.
flush
()
def
startTest
(
self
,
test
):
TestResult
.
startTest
(
self
,
test
)
self
.
stream
.
write
(
'.'
)
self
.
stream
.
flush
()
def
printNumberedErrors
(
self
,
errFlavour
,
errors
):
if
not
errors
:
return
if
len
(
errors
)
==
1
:
self
.
stream
.
writeln
(
"There was 1 %s:"
%
errFlavour
)
else
:
self
.
stream
.
writeln
(
"There were %i %ss:"
%
(
len
(
errors
),
errFlavour
))
i
=
1
for
test
,
error
in
errors
:
errString
=
string
.
join
(
apply
(
traceback
.
format_exception
,
error
),
""
)
self
.
stream
.
writeln
(
"%i) %s"
%
(
i
,
test
))
self
.
stream
.
writeln
(
errString
)
i
=
i
+
1
def
printErrors
(
self
):
self
.
printNumberedErrors
(
"error"
,
self
.
errors
)
def
printFailures
(
self
):
self
.
printNumberedErrors
(
"failure"
,
self
.
failures
)
def
printHeader
(
self
):
self
.
stream
.
writeln
()
if
self
.
wasSuccessful
():
self
.
stream
.
writeln
(
"OK (%i tests)"
%
self
.
testsRun
)
else
:
self
.
stream
.
writeln
(
"!!!FAILURES!!!"
)
self
.
stream
.
writeln
(
"Test Results"
)
self
.
stream
.
writeln
()
self
.
stream
.
writeln
(
"Run: %i ; Failures: %i ; Errors: %i"
%
(
self
.
testsRun
,
len
(
self
.
failures
),
len
(
self
.
errors
)))
def
printResult
(
self
):
self
.
printHeader
()
self
.
printErrors
()
self
.
printFailures
()
class
JUnitTextTestRunner
:
"""A test runner class that displays results in textual form.
The display format approximates that of JUnit's 'textui' test runner.
This test runner may be removed in a future version of PyUnit.
"""
def
__init__
(
self
,
stream
=
sys
.
stderr
):
self
.
stream
=
_WritelnDecorator
(
stream
)
def
run
(
self
,
test
):
"Run the given test case or test suite."
result
=
_JUnitTextTestResult
(
self
.
stream
)
startTime
=
time
.
time
()
test
(
result
)
stopTime
=
time
.
time
()
self
.
stream
.
writeln
()
self
.
stream
.
writeln
(
"Time: %.3fs"
%
float
(
stopTime
-
startTime
))
result
.
printResult
()
return
result
##############################################################################
# Verbose text UI
##############################################################################
class
_VerboseTextTestResult
(
TestResult
):
"""A test result class that can print formatted text results to a stream.
Used by VerboseTextTestRunner.
"""
def
__init__
(
self
,
stream
,
descriptions
):
TestResult
.
__init__
(
self
)
self
.
stream
=
stream
self
.
lastFailure
=
None
self
.
descriptions
=
descriptions
def
startTest
(
self
,
test
):
TestResult
.
startTest
(
self
,
test
)
if
self
.
descriptions
:
self
.
stream
.
write
(
test
.
shortDescription
()
or
str
(
test
))
else
:
self
.
stream
.
write
(
str
(
test
))
self
.
stream
.
write
(
" ... "
)
def
stopTest
(
self
,
test
):
TestResult
.
stopTest
(
self
,
test
)
if
self
.
lastFailure
is
not
test
:
self
.
stream
.
writeln
(
"ok"
)
def
addError
(
self
,
test
,
err
):
TestResult
.
addError
(
self
,
test
,
err
)
self
.
_printError
(
"ERROR"
,
test
,
err
)
self
.
lastFailure
=
test
if
err
[
0
]
is
KeyboardInterrupt
:
self
.
shouldStop
=
1
def
addFailure
(
self
,
test
,
err
):
TestResult
.
addFailure
(
self
,
test
,
err
)
self
.
_printError
(
"FAIL"
,
test
,
err
)
self
.
lastFailure
=
test
def
_printError
(
self
,
flavour
,
test
,
err
):
errLines
=
[]
separator1
=
"
\
t
"
+
'='
*
70
separator2
=
"
\
t
"
+
'-'
*
70
if
not
self
.
lastFailure
is
test
:
self
.
stream
.
writeln
()
self
.
stream
.
writeln
(
separator1
)
self
.
stream
.
writeln
(
"
\
t
%s"
%
flavour
)
self
.
stream
.
writeln
(
separator2
)
for
line
in
apply
(
traceback
.
format_exception
,
err
):
for
l
in
string
.
split
(
line
,
"
\
n
"
)[:
-
1
]:
self
.
stream
.
writeln
(
"
\
t
%s"
%
l
)
self
.
stream
.
writeln
(
separator1
)
class
VerboseTextTestRunner
:
"""A test runner class that displays results in textual form.
It prints out the names of tests as they are run, errors as they
occur, and a summary of the results at the end of the test run.
"""
def
__init__
(
self
,
stream
=
sys
.
stderr
,
descriptions
=
1
):
self
.
stream
=
_WritelnDecorator
(
stream
)
self
.
descriptions
=
descriptions
def
run
(
self
,
test
):
"Run the given test case or test suite."
result
=
_VerboseTextTestResult
(
self
.
stream
,
self
.
descriptions
)
startTime
=
time
.
time
()
test
(
result
)
stopTime
=
time
.
time
()
timeTaken
=
float
(
stopTime
-
startTime
)
self
.
stream
.
writeln
(
"-"
*
78
)
run
=
result
.
testsRun
self
.
stream
.
writeln
(
"Ran %d test%s in %.3fs"
%
(
run
,
run
>
1
and
"s"
or
""
,
timeTaken
))
self
.
stream
.
writeln
()
if
not
result
.
wasSuccessful
():
self
.
stream
.
write
(
"FAILED ("
)
failed
,
errored
=
map
(
len
,
(
result
.
failures
,
result
.
errors
))
if
failed
:
self
.
stream
.
write
(
"failures=%d"
%
failed
)
if
errored
:
if
failed
:
self
.
stream
.
write
(
", "
)
self
.
stream
.
write
(
"errors=%d"
%
errored
)
self
.
stream
.
writeln
(
")"
)
else
:
self
.
stream
.
writeln
(
"OK"
)
return
result
# Which flavour of TextTestRunner is the default?
TextTestRunner
=
VerboseTextTestRunner
##############################################################################
# Facilities for running tests from the command line
##############################################################################
class
TestProgram
:
"""A command-line program that runs a set of tests; this is primarily
for making test modules conveniently executable.
"""
USAGE
=
"""
\
Usage: %(progName)s [-h|--help] [test[:(casename|prefix-)]] [...]
Examples:
%(progName)s - run default set of tests
%(progName)s MyTestSuite - run suite 'MyTestSuite'
%(progName)s MyTestCase:checkSomething - run MyTestCase.checkSomething
%(progName)s MyTestCase:check- - run all 'check*' test methods
in MyTestCase
"""
def
__init__
(
self
,
module
=
'__main__'
,
defaultTest
=
None
,
argv
=
None
,
testRunner
=
None
,
suiteClass
=
TestSuite
):
if
type
(
module
)
==
type
(
''
):
self
.
module
=
__import__
(
module
)
for
part
in
string
.
split
(
module
,
'.'
)[
1
:]:
self
.
module
=
getattr
(
self
.
module
,
part
)
else
:
self
.
module
=
module
if
argv
is
None
:
argv
=
sys
.
argv
self
.
defaultTest
=
defaultTest
self
.
testRunner
=
testRunner
self
.
suiteClass
=
suiteClass
self
.
progName
=
os
.
path
.
basename
(
argv
[
0
])
self
.
parseArgs
(
argv
)
self
.
runTests
()
def
usageExit
(
self
,
msg
=
None
):
if
msg
:
print
msg
print
self
.
USAGE
%
self
.
__dict__
sys
.
exit
(
2
)
def
parseArgs
(
self
,
argv
):
import
getopt
try
:
options
,
args
=
getopt
.
getopt
(
argv
[
1
:],
'hH'
,
[
'help'
])
opts
=
{}
for
opt
,
value
in
options
:
if
opt
in
(
'-h'
,
'-H'
,
'--help'
):
self
.
usageExit
()
if
len
(
args
)
==
0
and
self
.
defaultTest
is
None
:
self
.
test
=
findTestCases
(
self
.
module
,
suiteClass
=
self
.
suiteClass
)
return
if
len
(
args
)
>
0
:
self
.
testNames
=
args
else
:
self
.
testNames
=
(
self
.
defaultTest
,)
self
.
createTests
()
except
getopt
.
error
,
msg
:
self
.
usageExit
(
msg
)
def
createTests
(
self
):
tests
=
[]
for
testName
in
self
.
testNames
:
tests
.
append
(
createTestInstance
(
testName
,
self
.
module
,
suiteClass
=
self
.
suiteClass
))
self
.
test
=
self
.
suiteClass
(
tests
)
def
runTests
(
self
):
if
self
.
testRunner
is
None
:
self
.
testRunner
=
TextTestRunner
()
result
=
self
.
testRunner
.
run
(
self
.
test
)
sys
.
exit
(
not
result
.
wasSuccessful
())
main
=
TestProgram
##############################################################################
# Executing this module from the command line
##############################################################################
if
__name__
==
"__main__"
:
main
(
module
=
None
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment