Commit dc2f0691 authored by Hanno Schlichting's avatar Hanno Schlichting

Factored out the `Products.ZSQLMethod` into its own distribution. The...

Factored out the `Products.ZSQLMethod` into its own distribution. The distribution also includes the `Shared.DC.ZRDB` code.
parent f4e591f7
...@@ -19,6 +19,7 @@ parts = ...@@ -19,6 +19,7 @@ parts =
sources-dir = develop sources-dir = develop
auto-checkout = auto-checkout =
DateTime DateTime
Products.ZSQLMethods
[test] [test]
...@@ -51,6 +52,7 @@ eggs = ...@@ -51,6 +52,7 @@ eggs =
MultiMapping MultiMapping
Persistence Persistence
Products.ZCTextIndex Products.ZCTextIndex
Products.ZSQLMethods
Record Record
RestrictedPython RestrictedPython
initgroups initgroups
......
...@@ -22,6 +22,9 @@ Bugs Fixed ...@@ -22,6 +22,9 @@ Bugs Fixed
Restructuring Restructuring
+++++++++++++ +++++++++++++
- Factored out the `Products.ZSQLMethod` into its own distribution. The
distribution also includes the `Shared.DC.ZRDB` code.
- Made both `Shared` and `Shared.DC` namespace packages. - Made both `Shared` and `Shared.DC` namespace packages.
- Removed fallback code for old Python versions from - Removed fallback code for old Python versions from
......
...@@ -48,6 +48,7 @@ setup(name='Zope2', ...@@ -48,6 +48,7 @@ setup(name='Zope2',
'MultiMapping', 'MultiMapping',
'Persistence', 'Persistence',
'Products.ZCTextIndex', 'Products.ZCTextIndex',
'Products.ZSQLMethods',
'Record', 'Record',
'RestrictedPython', 'RestrictedPython',
'ZConfig', 'ZConfig',
......
...@@ -10,6 +10,7 @@ MultiMapping = svn svn://svn.zope.org/repos/main/MultiMapping/trunk ...@@ -10,6 +10,7 @@ MultiMapping = svn svn://svn.zope.org/repos/main/MultiMapping/trunk
nt_svcutils = svn svn://svn.zope.org/repos/main/nt_svcutils/trunk nt_svcutils = svn svn://svn.zope.org/repos/main/nt_svcutils/trunk
Persistence = svn svn://svn.zope.org/repos/main/Persistence/trunk Persistence = svn svn://svn.zope.org/repos/main/Persistence/trunk
Products.ZCTextIndex = svn svn://svn.zope.org/repos/main/Products.ZCTextIndex/trunk Products.ZCTextIndex = svn svn://svn.zope.org/repos/main/Products.ZCTextIndex/trunk
Products.ZSQLMethods = svn svn://svn.zope.org/repos/main/Products.ZSQLMethods/trunk
Record = svn svn://svn.zope.org/repos/main/Record/trunk Record = svn svn://svn.zope.org/repos/main/Record/trunk
tempstorage = svn svn://svn.zope.org/repos/main/tempstorage/trunk tempstorage = svn svn://svn.zope.org/repos/main/tempstorage/trunk
zExceptions = svn svn://svn.zope.org/repos/main/zExceptions/trunk zExceptions = svn svn://svn.zope.org/repos/main/zExceptions/trunk
......
Aqueduct SQL Methods Changes
AqueductSQLMethods 1.2.1
Bugs Fixed
- The new Aqueduct.Results module was ommitted.
AqueductSQLMethods 1.2
Bugs Fixed
- Source changes were lost when resizing the editing screen.
- When testing, on DAs that use new DA protocol,
non-select SQL is reported as such.
Features Added
- Support for new Aqueduct DA protocol, providing much faster
retrievals.
- Edit screen size preferences are now distinct from Document
size preferences.
AqueductSQLMethods 1.1.5
Features Added
- Added permission settings to work with recent Principia versions.
Aqueduct SQL Methods 1.1.4
Bugs Fixed
- Generated reports failed in cases where column names contained
odd characters, as would be the case with::
select salary*1.2 from payrole
Aqueduct SQL Methods 1.1.3
Bugs Fixed
- Changed permission settings to be in line with principia 1.2
Aqueduct SQL Methods 1.1.2
Bugs Fixed
- If strings contained backslashes but not backslash-t or
backslash-n, then the backslashes were getting doubled.
Aqueduct SQL Methods 1.1.1
Bugs Fixed
- The SQL string quoting done by the sqlvar and sqltest tags
failed on non-ANSI-SQL databases like mySQL. Extra hooks
were added to allow database adapters to provide custom quoting
logic.
Aqueduct SQL Methods 1.1.0
Features
- New DTML tags are available in SQLMethods that greatly
improve the ease, safety, and flexibility of database methods.
- SQL Templates are now rendered in the context of the containing
folder, rather than the method itself.
- Input arguments are shown in input forms in the order that
they were defined, rather than in alphabetical order.
Aqueduct SQL Methods 1.0.5
Features Fixed
- Database methods are slightly better about binding themselves
to the correct object when acquired.
Aqueduct SQL Methods 1.0.4
Features Fixed
- RDBMS transaction boundaries now coincide with Principia
tranasaction boundaries. For example, if a Document
runs 3 database methods and the third method raises an error,
then the other methods are aborted.
Aqueduct SQL Methods 1.0.3
Bugs Fixed
- Add permissions were not editable.
- Methods on database records could not use acquired attributes.
AqueductSQLMethods 1.0.1
Bug Fixes
- Error reporting was broken in test mode.
Features
- Include generated SQL source in test output.
ZSQLMethods
The ZSQLMethods product provides support for SQL Method objects
which can be used in conjunction with any database adapter to
use relational database data from within the Zope environment.
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__='''SQL Methods
$Id$'''
__version__='$Revision: 1.21 $'[11:-2]
from AccessControl.class_init import InitializeClass
from AccessControl.Permissions import change_database_methods
from AccessControl.SecurityInfo import ClassSecurityInfo
from App.special_dtml import DTMLFile
from Shared.DC.ZRDB.DA import DA
def SQLConnectionIDs(self):
"""Find SQL database connections in the current folder and above
This function return a list of ids.
"""
ids={}
have_id=ids.has_key
StringType=type('')
while self is not None:
if hasattr(self, 'objectValues'):
for o in self.objectValues():
if (hasattr(o,'_isAnSQLConnection') and o._isAnSQLConnection
and hasattr(o,'id')):
id=o.id
if type(id) is not StringType: id=id()
if not have_id(id):
if hasattr(o,'title_and_id'): o=o.title_and_id()
else: o=id
ids[id]=id
if hasattr(self, 'aq_parent'): self=self.aq_parent
else: self=None
ids=map(lambda item: (item[1], item[0]), ids.items())
ids.sort()
return ids
manage_addZSQLMethodForm=DTMLFile('dtml/add', globals())
def manage_addZSQLMethod(self, id, title,
connection_id, arguments, template,
REQUEST=None, submit=None):
"""Add an SQL Method
The 'connection_id' argument is the id of a database connection
that resides in the current folder or in a folder above the
current folder. The database should understand SQL.
The 'arguments' argument is a string containing an arguments
specification, as would be given in the SQL method cration form.
The 'template' argument is a string containing the source for the
SQL Template.
"""
# Note - type checking is handled by _setObject and constructor.
self._setObject(id, SQL(id, title, connection_id, arguments, template))
if REQUEST is not None:
try: u=self.DestinationURL()
except: u=REQUEST['URL1']
if submit==" Add and Edit ":
u="%s/%s/manage_main" % (u,id)
elif submit==" Add and Test ":
u="%s/%s/manage_testForm" % (u,id)
else:
u=u+'/manage_main'
REQUEST.RESPONSE.redirect(u)
return ''
class SQL(DA):
"""SQL Database methods
SQL Database methods are used to access external SQL databases.
They support three important abstractions:
- Method
SQL Methods behave like methods of the folders they are
accessed in. In particular, they can be used from other
methods, like Documents, ExternalMethods, and even other SQL
Methods.
- Searchability
Database methods support the Searchable Object Interface.
Search interface wizards can be used to build user
interfaces to them. They can be used in joins and
unions. They provide meta-data about their input parameters
and result data.
For more information, see the searchable-object interface
specification.
- Containment
Database methods support URL traversal to access and invoke
methods on individual record objects. For example, suppose you
had an 'employees' database method that took a single argument
'employee_id'. Suppose that employees had a 'service_record'
method (defined in a record class or acquired from a
folder). The 'service_record' method could be accessed with a
URL like::
employees/employee_id/1234/service_record
"""
meta_type='Z SQL Method'
security = ClassSecurityInfo()
security.declareProtected(change_database_methods, 'manage')
security.declareProtected(change_database_methods, 'manage_main')
manage=manage_main=DTMLFile('dtml/edit', globals())
manage_main._setName('manage_main')
InitializeClass(SQL)
# install SQL.py
# install __init__.py
# install add.dtml
# install edit.dtml
# install CHANGES.txt
# package Aqueduct
# package AqueductDA
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""SQL Method Product.
$Id$
"""
import Shared.DC.ZRDB.Search, Shared.DC.ZRDB.Aqueduct, SQL
import Shared.DC.ZRDB.RDB
import Shared.DC.ZRDB.sqlvar, Shared.DC.ZRDB.sqlgroup, Shared.DC.ZRDB.sqltest
def initialize(context):
context.registerClass(
SQL.SQL,
permission='Add Database Methods',
constructors=(SQL.manage_addZSQLMethodForm, SQL.manage_addZSQLMethod),
icon='sqlmethod.gif',
# XXX: can this permission be removed?
permissions=('Open/Close Database Connections',),
legacy=(SQL.SQLConnectionIDs,)
)
context.registerClass(
meta_type='Z Search Interface',
permission='Add Documents, Images, and Files',
constructors=(Shared.DC.ZRDB.Search.addForm,
Shared.DC.ZRDB.Search.manage_addZSearch),
legacy=(Shared.DC.ZRDB.Search.ZQueryIds,)
)
context.registerHelp()
context.registerHelpTitle('Zope Help')
__module_aliases__=(
('Products.AqueductSQLMethods','Products.ZSQLMethods'),
('Aqueduct', Shared.DC.ZRDB),
('AqueductDA', Shared.DC.ZRDB),
('Products.AqueductSQLMethods.SQL', SQL),
('Aqueduct.Aqueduct', Shared.DC.ZRDB.Aqueduct),
('AqueductDA.DA', Shared.DC.ZRDB.DA),
('Aqueduct.RDB', Shared.DC.ZRDB.RDB),
('AqueductDA.sqlvar', Shared.DC.ZRDB.sqlvar),
('AqueductDA.sqltest', Shared.DC.ZRDB.sqltest),
('AqueductDA.sqlgroup', Shared.DC.ZRDB.sqlgroup),
)
<dtml-var manage_page_header>
<dtml-var "manage_form_title(this(), _,
form_title='Add SQL Method',
help_product='ZSQLMethods',
help_topic='Z-SQL-Method_Add.stx'
)">
<dtml-if SQLConnectionIDs>
<p class="form-help">
A SQL Method allows you to access a SQL database. For more information see
the <a href="http://www.zope.org/Documentation/Guides/ZSQL">Z SQL Methods
User's Guide</a>.
</p>
<p class="form-help">
In the form below <EM>connection id</EM> is the name of the SQL Database
Connection to use. <EM>Arguments</EM> is a list of variables which the
SQL Method accepts. <EM>Query template</EM> is a template of the SQL
statement which the SQL Method will execute.
</p>
<form action="manage_addZSQLMethod" method="post">
<table cellspacing="0" cellpadding="2" border="0">
<tr>
<td align="left" valign="top">
<div class="form-label">
Id
</div>
</td>
<td align="left" valign="top">
<input type="text" name="id" size="40" value="" />
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-optional">
Title
</div>
</td>
<td align="left" valign="top">
<input type="text" name="title" size="40" />
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Connection Id
</div>
</td>
<td align="left" valign="top">
<div class="form-element">
<select name="connection_id">
<dtml-in SQLConnectionIDs>
<option value="&dtml-sequence-item;">
&dtml-sequence-key;</option>
</dtml-in>
</select>
</div>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-optional">
Arguments
</div>
</td>
<td align="left" valign="top">
<textarea name="arguments" cols="40" rows="4"></textarea>
</td>
</tr>
<tr>
<td align="left" valign="top" colspan="2">
<span class="form-label">Query Template</span>
<br />
<div style="width: 100%;">
<textarea style="width: 100%;" name="template:text" rows="9" cols="60"
wrap="off">select * from data</textarea></div>
</td>
</tr>
<tr>
<td align="left" valign="top" colspan="2">
<div class="form-element">
<input class="form-element" type="submit" name="submit"
value=" Add " />
<input class="form-element" type="submit" name="submit"
value=" Add and Edit " />
<input class="form-element" type="submit" name="submit"
value=" Add and Test " />
</div>
</td>
</tr>
</table>
</form>
<dtml-else>
<p class="form-text">
There are no SQL database connections. You need to add a Zope
SQL database connection before you can create a Zope SQL Method.
</p>
</dtml-if>
<dtml-var manage_page_footer>
<dtml-var manage_page_header>
<dtml-var manage_tabs>
<dtml-if SQLConnectionIDs>
<form action="manage_edit" method="POST">
<table cellpadding="2" cellspacing="0" width="100%" border="0">
<tr>
<td align="left" valign="top">
<div class="form-optional">
Title
</div>
</td>
<td align="left" valign="top">
<input type="text" name="title" size="40" value="<dtml-if
title>&dtml-title;</dtml-if>">
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Connection Id
</div>
</td>
<td align="left" valign="top">
<div class="form-element">
<select name="connection_id">
<dtml-in SQLConnectionIDs>
<option value="&dtml-sequence-item;"<dtml-if
expr="connection_id==_vars['sequence-item']">
selected</dtml-if>>
&dtml-sequence-key;</option>
</dtml-in>
</select>
<dtml-if connectionIsValid>
<dtml-if connected><dtml-else>
<p style="{color:red;}">
<strong>Warning:</strong>
The database connection used by this method is closed.
</p>
</dtml-if>
<dtml-else>
<p style="{color:red;}">
<strong>Warning:</strong>
The selected database connection (&dtml-connection_id;)
cannot be found!
</p>
</dtml-if>
</div>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-optional">
Arguments
</div>
</td>
<td align="left" valign="top">
<textarea name="arguments" cols="40" rows="4">&dtml-arguments_src;</textarea>
</td>
</tr>
<tr>
<td align="left" valign="top" colspan="2">
<dtml-let cols="REQUEST.get('dtpref_cols', '100%')"
rows="REQUEST.get('dtpref_rows', '20')">
<dtml-if expr="cols[-1]=='%'">
<textarea name="template:text" wrap="off" style="width: &dtml-cols;;"
<dtml-else>
<textarea name="template:text" wrap="off" cols="&dtml-cols;"
</dtml-if>
rows="&dtml-rows;">&dtml-src;</textarea>
</dtml-let>
</td>
</tr>
<tr>
<td align="left" valign="top" colspan="2">
<div class="form-element">
<dtml-if wl_isLocked>
<em>Locked by WebDAV</em>
<dtml-else>
<input class="form-element" type="submit" name="SUBMIT"
value="Save Changes">
<input class="form-element" type="submit" name="SUBMIT"
value="Change and Test">
</dtml-if wl_isLocked>
<br />
<input class="form-element" type="submit" name="SUBMIT" value="Taller">
<input class="form-element" type="submit" name="SUBMIT" value="Shorter">
<input class="form-element" type="submit" name="SUBMIT" value="Wider">
<input class="form-element" type="submit" name="SUBMIT" value="Narrower">
</div>
</td>
</tr>
</table>
</form>
<dtml-else>
<p class="form-help">
There are no SQL database connections. You need to add a Zope SQL
database connection before you can edit a Zope SQL Method.
</p>
</dtml-if>
<dtml-var manage_page_footer>
Z SQL Method Object: Query relational databases with SQL
Description
Z SQL Methods allows you to access data in SQL databases.
Z SQL Methods define and call SQL statements on databases which
are defined by database adapter objects. To create a new Z SQL
Method you will need to setup a database adapter first. Every Z
SQL Method is directly linked to a database connection through a
database adapter.
**Note:** You must associate a Z SQL method with a database
adapter. If Zope cannot find a database adapter from your
location, you will not be able to create a Z SQL Method.
Z SQL Method - Add: Create new Z SQL Method
Description
This view allows you to create a new Z SQL Method.
Controls
'ID' -- Specifies the id of the sql method.
'Title' -- Specifies the title of the sql method.
'Connection id' -- Select list that specifies the database
connection on which the sql method operates.
**Note:** If you change the ID of your DB Connection object, all
the Z SQL Methods using this connection object will *NOT* be
updated. You have to update all Methods one by one.
'Arguments' -- Allows you to specify a list of arguments that the sql
method takes. The arguments should be separated by spaces or
a newline in the field. Furthermore, you can specify the
type of the argument as well as a default
value.
**Example:** 'title:string="No title!"', where 'title' is the
attribute, 'string' is the type of the attribute, and '"No
title!"' is the default value.
'Query template' --
Text area that specifies the SQL query that will execute when
this method is called. The query template can contain DTML
markup to tailor the SQL statement to the arguments. There are
three additional DTML tags called 'SQLVAR', 'SQLTEST', and
'SQLGROUP'. Please refer to their references get more details on
how to use them in your SQL statement.
**Note:** the SQL statement need not be a 'SELECT' query, it may
be any valid SQL statement including an INSERT or UPDATE.
**Important:** You can have several SQL statements in one ZSQL
Method.
Z SQL Method - Advanced: Manage advanced settings
Description
Manage the advanced settings such as caching and plugable
brains of a SQL method.
Controls
'ID' -- Indicates the id of the sql method.
'Title' -- Allows you to specify the title of the sql method.
'Connection Hook' -- Specifies the id of a method to be called when the database connection is made. This method will return a string containing the id of the database connection to use. If left blank there is no effect, the connection id specified will be used.
'Maximum number of rows retrieved' -- Specify an upper limit
for the number of rows a sql method can retrieve. Setting this
option correctly (based on your specific environment) can
increase the execution time of the method.
'Maximum number of results in the cache' -- Set the size of
the sql method cache.
'Maximum time (seconds) to cache results' -- The time in
seconds that results are cached. Setting to zero disables
caching.
Rows returned from this method can be turned into 'Result
objects' that are instances of a class you can specify below.
'Class name' -- Allows you to specify the name of the Python class.
'Class file' -- Allows you to specify the name of the Python
module that the class is defined in. The class file should
reside in the Zope 'Extensions' directory.
Z SQL Method - Edit: Change attributes of a SQL method
Description
This view allows you to edit the attributes of a SQL method.
Controls
'ID' -- Specifies the id of the sql method.
'Title' -- Specifies the title of the sql method.
'Connection id' -- Select list that specifies the database
connection on which the sql method operates.
**Note:** If you change the ID of your DB Connection object, all
the Z SQL Methods using this connection object will *NOT* be
updated. You have to update all Methods one by one.
'Arguments' -- Allows you to specify a list of arguments that the sql
method takes. The arguments should be separated by spaces or
a newline in the field. Furthermore, you can specify the
type of the argument as well as a default
value.
**Example:** 'title:string="No title!"', where 'title' is the
attribute, 'string' is the type of the attribute, and '"No
title!"' is the default value.
'Query template' --
Text area that specifies the SQL query that will execute when
this method is called. The query template can contain DTML
markup to tailor the SQL statement to the arguments. There are
three additional DTML tags called 'SQLVAR', 'SQLTEST', and
'SQLGROUP'. Please refer to their references get more details on
how to use them in your SQL statement.
**Note:** the SQL statement need not be a 'SELECT' query, it may
be any valid SQL statement including an INSERT or UPDATE.
**Important:** You can have several SQL statements in one ZSQL
Method.
Z SQL Method - Test: Test the validity of your SQL statement
Description
Test a SQL method.
Controls
'*[arument names]*' -- There will be one text field for each
argument specified for the SQL method. The text field will
contain the default value. **Important:** You must enter a
value matching the specified argument type, otherwise a "Type
Error" is returned.
'Submit Query' -- Causes the query to be executed and the
results are returned as an HTML table. If the query was had a
syntax error or the accessed table did not exist, a
database-specific error message is returned in form of an
exception.
If you have a SQL statement that does not return any data,
Zope will tell you that this SQL statement was not a
query and will not display any results.
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
def manage_addZSQLMethod(id, title,
connection_id, arguments, template):
"""
Add an SQL Method to an ObjectManager.
The 'connection_id' argument is the id of a database connection
that resides in the current folder or in a folder above the
current folder. The database should understand SQL.
The 'arguments' argument is a string containing an arguments
specification, as would be given in the SQL method cration form.
The 'template' argument is a string containing the source for the
SQL Template.
"""
class ZSQLMethod:
"""
ZSQLMethods abstract SQL code in Zope.
SQL Methods behave like methods of the folders they are
accessed in. In particular, they can be used from other
methods, like Documents, ExternalMethods, and even other SQL
Methods.
Database methods support the Searchable Object Interface.
Search interface wizards can be used to build user
interfaces to them. They can be used in joins and
unions. They provide meta-data about their input parameters
and result data.
For more information, see the searchable-object interface
specification.
Database methods support URL traversal to access and invoke
methods on individual record objects. For example, suppose you
had an 'employees' database method that took a single argument
'employee_id'. Suppose that employees had a 'service_record'
method (defined in a record class or acquired from a
folder). The 'service_record' method could be accessed with a
URL like::
employees/employee_id/1234/service_record
Search results are returned as Record objects. The schema of
a Record objects matches the schema of the table queried in
the search.
"""
__constructor__=manage_addZSQLMethod
def __call__(REQUEST=None, **kw):
"""
Call the ZSQLMethod.
The arguments to the method should be passed via keyword
arguments, or in a single mapping object. If no arguments are
given, and if the method was invoked through the Web, then the
method will try to acquire and use the Web REQUEST object as
the argument mapping.
The returned value is a sequence of record objects.
"""
def manage_edit(title,connection_id,arguments,template):
"""
Change database method properties.
The 'connection_id' argument is the id of a database
connection that resides in the current folder or in a folder
above the current folder. The database should understand SQL.
The 'arguments' argument is a string containing an arguments
specification, as would be given in the SQL method creation
form.
The 'template' argument is a string containing the source for
the SQL Template.
"""
import unittest
class SQLMethodTests(unittest.TestCase):
def _getTargetClass(self):
from Products.ZSQLMethods.SQL import SQL
return SQL
def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)
def test_class_conforms_to_IWriteLock(self):
from zope.interface.verify import verifyClass
from webdav.interfaces import IWriteLock
verifyClass(IWriteLock, self._getTargetClass())
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(SQLMethodTests),
))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__='''Shared classes and functions
$Id$'''
__version__='$Revision: 1.58 $'[11:-2]
import binascii
from cStringIO import StringIO
import os
import re
import string
from Acquisition import Implicit
from App.Common import package_home
from DateTime.DateTime import DateTime
from DocumentTemplate import File
from DocumentTemplate import HTML
from OFS.role import RoleManager
from OFS.SimpleItem import Item
from Persistence import Persistent
from zExceptions import Redirect
dtml_dir=os.path.join(package_home(globals()), 'dtml')
InvalidParameter='Invalid Parameter'
class BaseQuery(Persistent, Item, Implicit, RoleManager):
def query_year(self):
return self.query_date.year()
def query_month(self):
return self.query_date.month()
def query_day(self):
return self.query_date.day()
query_date = DateTime()
manage_options=()
def quoted_input(self):
return quotedHTML(self.input_src)
def quoted_report(self):
return quotedHTML(self.report_src)
MissingArgumentError = 'Bad Request'
def _convert(self):
self._arg=parse(self.arguments_src)
def _argdata(self, REQUEST):
r={}
try: args=self._arg
except:
self._convert()
args=self._arg
id=self.id
missing=[]
for name in args.keys():
idname="%s/%s" % (id, name)
try:
r[name]=REQUEST[idname]
except:
try: r[name]=REQUEST[name]
except:
arg=args[name]
try: r[name]=arg['default']
except:
try:
if not arg['optional']: missing.append(name)
except: missing.append(name)
# Note: the code above tries to check if an argument of the
# ZSQL method above has the "optional" flag set (in case the
# argument is omitted from the ZSQL function call). But there
# is neither corresponding code inside the parse() function to
# check for the "optional" parameter nor any documentation.
# So we omit the check for the optional parameter. There will
# be probably no code break but there will be hopefully more code
# to work as supposed to work.
# if missing:
# raise self.MissingArgumentError, \
# "The following arguments were omitted " \
# " from the ZSQL method call: %s" % str(missing)
#
return r
_col=None
_arg={}
class Searchable(BaseQuery):
def _searchable_arguments(self):
try: return self._arg
except:
self._convert()
return self._arg
def _searchable_result_columns(self): return self._col
def manage_testForm(self, REQUEST):
"""Provide testing interface"""
input_src=default_input_form(self.title_or_id(),
self._searchable_arguments(),
'manage_test')
return HTML(input_src)(self, REQUEST)
def manage_test(self, REQUEST):
'Perform an actual query'
result=self(REQUEST)
report=HTML(custom_default_report(self.id, result))
return apply(report,(self,REQUEST),{self.id:result})
def index_html(self, URL1):
" "
raise Redirect, ("%s/manage_testForm" % URL1)
class Composite:
def _getquery(self,id):
o=self
i=0
while 1:
__traceback_info__=o
q=getattr(o,id)
try:
if hasattr(q,'_searchable_arguments'):
try: q=q.__of__(self.aq_parent)
except: pass
return q
except: pass
if i > 100: raise AttributeError, id
i=i+1
o=o.aq_parent
def myQueryIds(self):
return map(
lambda k, queries=self.queries:
{'id': k, 'selected': k in queries},
self.ZQueryIds())
def default_input_form(id,arguments,action='query',
tabs=''):
if arguments:
items=arguments.items()
return (
"%s\n%s%s" % (
'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN" "http://www.w3.org/TR/REC-html40/loose.dtd">\n'
'<html lang="en"><head><title>%s Input Data</title></head>\n'
'<body bgcolor="#FFFFFF" link="#000099" vlink="#555555">\n%s\n'
'<form action="&dtml-URL2;/&dtml-id;/%s" '
'method="get">\n'
'<h2>%s Input Data</h2>\n'
'Enter query parameters:<br>'
'<table>\n'
% (id, tabs, action,id),
string.joinfields(
map(
lambda a:
('<tr> <th>%s</th>\n'
' <td><input name="%s"\n'
' size="30" value="%s">'
' </td></tr>'
% (nicify(a[0]),
(
a[1].has_key('type') and
("%s:%s" % (a[0],a[1]['type'])) or
a[0]
),
a[1].has_key('default') and a[1]['default'] or ''
))
, items
),
'\n'),
'\n<tr><td colspan=2 align=center>\n'
'<input type="SUBMIT" name="SUBMIT" value="Submit Query">\n'
'<dtml-if HTTP_REFERER>\n'
' <input type="SUBMIT" name="SUBMIT" value="Cancel">\n'
' <INPUT NAME="CANCEL_ACTION" TYPE="HIDDEN"\n'
' VALUE="&dtml-HTTP_REFERER;">\n'
'</dtml-if>\n'
'</td></tr>\n</table>\n</form>\n</body>\n</html>\n'
)
)
else:
return (
'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN" "http://www.w3.org/TR/REC-html40/loose.dtd">\n'
'<html lang="en"><head><title>%s Input Data</title></head>\n'
'<body bgcolor="#FFFFFF" link="#000099" vlink="#555555">\n%s\n'
'<form action="&dtml-URL2;/&dtml-id;/%s" '
'method="get">\n'
'<h2>%s Input Data</h2>\n'
'This query requires no input.<p>\n'
'<input type="SUBMIT" name="SUBMIT" value="Submit Query">\n'
'<dtml-if HTTP_REFERER>\n'
' <input type="SUBMIT" name="SUBMIT" value="Cancel">\n'
' <INPUT NAME="CANCEL_ACTION" TYPE="HIDDEN"\n'
' VALUE="&dtml-HTTP_REFERER;">\n'
'</dtml-if>\n'
'</td></tr>\n</table>\n</form>\n</body>\n</html>\n'
% (id, tabs, action, id)
)
custom_default_report_src=File(
os.path.join(dtml_dir,'customDefaultReport.dtml'))
custom_default_zpt_report_src=File(
os.path.join(dtml_dir,'customDefaultZPTReport.dtml'))
def custom_default_report(id, result, action='', no_table=0,
goofy=re.compile('\W').search
):
columns=result._searchable_result_columns()
__traceback_info__=columns
heading=('<tr>\n%s </tr>' %
string.joinfields(
map(lambda c:
' <th>%s</th>\n' % nicify(c['name']),
columns),
''
)
)
if no_table: tr, _tr, td, _td, delim = '<p>', '</p>', '', '', ',\n'
else: tr, _tr, td, _td, delim = '<tr>', '</tr>', '<td>', '</td>', '\n'
row=[]
for c in columns:
n=c['name']
if goofy(n) is not None:
n='expr="_[\'%s]"' % (`'"'+n`[2:])
row.append(' %s<dtml-var %s%s>%s'
% (td,n,c['type']!='s' and ' null=""' or '',_td))
row=(' %s\n%s\n %s' % (tr,string.joinfields(row,delim), _tr))
return custom_default_report_src(
id=id,heading=heading,row=row,action=action,no_table=no_table)
def custom_default_zpt_report(id, result, action='', no_table=0,
goofy=re.compile('\W').search
):
columns=result._searchable_result_columns()
__traceback_info__=columns
heading=('<tr>\n%s </tr>' %
string.joinfields(
map(lambda c:
' <th>%s</th>\n' % nicify(c['name']),
columns),
''
)
)
if no_table: tr, _tr, td, _td, delim = '<p>', '</p>', '', '', ',\n'
else: tr, _tr, td, _td, delim = '<tr>', '</tr>', '<td>', '</td>', '\n'
row=[]
for c in columns:
n=c['name']
# ugh! what the hell is goofy?
# if goofy(n) is not None:
# n='expr="_[\'%s]"' % (`'"'+n`[2:])
row.append(' %s<span tal:replace="result/%s">%s goes here</span>%s'
% (td,n,n,_td))
row=(' %s\n%s\n %s' % (tr,string.joinfields(row,delim), _tr))
return custom_default_zpt_report_src(
id=id, heading=heading, row=row, action=action, no_table=no_table)
def detypify(arg):
l=string.find(arg,':')
if l > 0: arg=arg[:l]
return arg
def decode(input,output):
while 1:
line = input.readline()
if not line: break
s = binascii.a2b_base64(line[:-1])
output.write(s)
def decodestring(s):
f = StringIO(s)
g = StringIO()
decode(f, g)
return g.getvalue()
class Args:
def __init__(self, data, keys):
self._data=data
self._keys=keys
def items(self):
return map(lambda k, d=self._data: (k,d[k]), self._keys)
def values(self):
return map(lambda k, d=self._data: d[k], self._keys)
def keys(self): return list(self._keys)
def has_key(self, key): return self._data.has_key(key)
def __getitem__(self, key): return self._data[key]
def __setitem__(self, key, v): self._data[key]=v
def __delitem__(self, key): del self._data[key]
def __len__(self): return len(self._data)
def parse(text,
result=None,
keys=None,
unparmre=re.compile(
r'([\000- ]*([^\000- ="]+))'),
parmre=re.compile(
r'([\000- ]*([^\000- ="]+)=([^\000- ="]+))'),
qparmre=re.compile(
r'([\000- ]*([^\000- ="]+)="([^"]*)")'),
):
if result is None:
result = {}
keys = []
__traceback_info__=text
mo = parmre.match(text)
if mo:
name = mo.group(2)
value = {'default':mo.group(3)}
l = len(mo.group(1))
else:
mo = qparmre.match(text)
if mo:
name = mo.group(2)
value = {'default':mo.group(3)}
l = len(mo.group(1))
else:
mo = unparmre.match(text)
if mo:
name = mo.group(2)
value = {}
l = len(mo.group(1))
else:
if not text or not text.strip(): return Args(result,keys)
raise InvalidParameter, text
lt=name.find(':')
if lt > 0:
value['type']=name[lt+1:]
name=name[:lt]
result[name]=value
keys.append(name)
return parse(text[l:],result,keys)
def quotedHTML(text,
character_entities=(
('&', '&amp;'),
("<", '&lt;' ),
(">", '&gt;' ),
('"', '&quot;'))): #"
for re,name in character_entities:
text=string.replace(text,re,name)
return text
def nicify(name):
name=string.replace(string.strip(name), '_',' ')
return string.upper(name[:1])+name[1:]
def decapitate(html, RESPONSE=None,
header_re=re.compile(
r'(('
r'[^\000- <>:]+:[^\n]*\n'
r'|'
r'[ \011]+[^\000- ][^\n]*\n'
r')+)[ \t]*\n([\000-\377]+)'
), # please kill me now
space_re=re.compile(r'([ \t]+)'),
name_re=re.compile(r'([^\000- <>:]+):([^\n]*)'),
):
mo = header_re.match(html)
if mo is None: return html
headers, html = mo.group(1,3)
headers=string.split(headers,'\n')
i=1
while i < len(headers):
if not headers[i]:
del headers[i]
else:
mo = space_re.match(headers[i])
if mo:
headers[i-1]="%s %s" % (headers[i-1],
headers[i][len(mo.group(1)):])
del headers[i]
else:
i=i+1
for i in range(len(headers)):
mo = name_re.match(headers[i])
if mo:
k,v = mo.group(1,2)
v=string.strip(v)
else:
raise ValueError, 'Invalid Header (%d): %s ' % (i,headers[i])
RESPONSE.setHeader(k,v)
return html
def delimited_output(results,REQUEST,RESPONSE):
delim=REQUEST['output-delimiter']
try: output_type=REQUEST['output-type']
except: output_type='text/plain'
RESPONSE.setHeader('content-type', output_type)
join=string.join
return "%s\n%s\n" % (
join(results.names(),delim),
join(map(lambda row, delim=delim, join=join:
join(map(str,row),delim),
results),
'\n')
)
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__='''Generic Database Connection Support
$Id$'''
__version__='$Revision: 1.39 $'[11:-2]
from cgi import escape
from cStringIO import StringIO
from logging import getLogger
import string
import sys
from AccessControl.class_init import InitializeClass
from AccessControl.Permissions import view_management_screens
from AccessControl.Permissions import change_database_connections
from AccessControl.Permissions import test_database_connections
from AccessControl.Permissions import open_close_database_connection
from AccessControl.SecurityInfo import ClassSecurityInfo
from Acquisition import Implicit
from App.Dialogs import MessageDialog
from App.special_dtml import DTMLFile
from DateTime.DateTime import DateTime
from DocumentTemplate import HTML
from OFS.role import RoleManager
from OFS.SimpleItem import Item
from Persistence import Persistent
from zExceptions import BadRequest
from Aqueduct import custom_default_report
import RDB
from Results import Results
LOG = getLogger('ZRDB.Connection')
class Connection(Persistent,
RoleManager,
Item,
Implicit,
):
security = ClassSecurityInfo()
# Specify definitions for tabs:
manage_options=(
(
{'label':'Status', 'action':'manage_main'},
{'label':'Properties', 'action':'manage_properties'},
{'label':'Test', 'action':'manage_testForm'},
)
+ RoleManager.manage_options
+ Item.manage_options
)
_v_connected=''
connection_string=''
def __init__(self, id, title, connection_string, check=None):
self.id=str(id)
self.edit(title, connection_string, check)
def __setstate__(self, state):
Persistent.__setstate__(self, state)
if self.connection_string:
try: self.connect(self.connection_string)
except:
LOG.error('Error connecting to relational database.',
exc_info=True)
def title_and_id(self):
s=Connection.inheritedAttribute('title_and_id')(self)
if hasattr(self, '_v_connected') and self._v_connected:
s="%s, which is connected" % s
else:
s="%s, which is <font color=red> not connected</font>" % s
return s
def title_or_id(self):
s=Connection.inheritedAttribute('title_or_id')(self)
if hasattr(self, '_v_connected') and self._v_connected:
s="%s (connected)" % s
else:
s="%s (<font color=red> not connected</font>)" % s
return s
def connected(self): return self._v_connected
def edit(self, title, connection_string, check=1):
self.title=title
self.connection_string=connection_string
if check: self.connect(connection_string)
manage_properties=DTMLFile('dtml/connectionEdit', globals())
security.declareProtected(change_database_connections, 'manage_edit')
def manage_edit(self, title, connection_string, check=None, REQUEST=None):
"""Change connection
"""
self.edit(title, connection_string, check)
if REQUEST is not None:
return MessageDialog(
title='Edited',
message='<strong>%s</strong> has been edited.' % escape(self.id),
action ='./manage_main',
)
security.declareProtected(test_database_connections, 'manage_testForm')
manage_testForm=DTMLFile('dtml/connectionTestForm', globals())
security.declareProtected(test_database_connections, 'manage_test')
def manage_test(self, query, REQUEST=None):
"Executes the SQL in parameter 'query' and returns results"
dbc=self() #get our connection
res=dbc.query(query)
if type(res) is type(''):
f=StringIO()
f.write(res)
f.seek(0)
result=RDB.File(f)
else:
result=Results(res)
if REQUEST is None:
return result #return unadulterated result objects
if result._searchable_result_columns():
r=custom_default_report(self.id, result)
else:
r='This statement returned no results.'
report = HTML(
'<html><body bgcolor="#ffffff" link="#000099" vlink="#555555">\n'
'<dtml-var name="manage_tabs">\n<hr>\n%s\n\n'
'<hr><h4>SQL Used:</strong><br>\n<pre>\n%s\n</pre>\n<hr>\n'
'</body></html>'
% (r, query))
report = apply(report,(self,REQUEST),{self.id:result})
return report
security.declareProtected(view_management_screens, 'manage_main')
manage_main=DTMLFile('dtml/connectionStatus', globals())
security.declareProtected(open_close_database_connection,
'manage_close_connection')
def manage_close_connection(self, REQUEST=None):
" "
try:
if hasattr(self,'_v_database_connection'):
self._v_database_connection.close()
except:
LOG.error('Error closing relational database connection.',
exc_info=True)
self._v_connected=''
if REQUEST is not None:
return self.manage_main(self, REQUEST)
security.declareProtected(open_close_database_connection,
'manage_open_connection')
def manage_open_connection(self, REQUEST=None):
" "
self.connect(self.connection_string)
return self.manage_main(self, REQUEST)
def __call__(self, v=None):
try: return self._v_database_connection
except AttributeError:
s=self.connection_string
if s:
self.connect(s)
return self._v_database_connection
raise BadRequest,(
'''The database connection is not connected''')
def connect(self,s):
self.manage_close_connection()
DB=self.factory()
try:
try:
self._v_database_connection=DB(s)
except:
t, v, tb = sys.exc_info()
raise BadRequest, (
'<strong>Error connecting to DB.</strong><br>\n'
'<!--\n%s\n%s\n-->\n'
% (t,v)), tb
finally: tb=None
self._v_connected=DateTime()
return self
def sql_quote__(self, v):
if string.find(v,"\'") >= 0:
v = string.join(string.split(v,"\'"),"''")
return "'%s'" % v
InitializeClass(Connection)
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__='''Generic Database adapter'''
__version__='$Revision: 1.116 $'[11:-2]
from cStringIO import StringIO
import re
import string
import sys
from time import time
from AccessControl.class_init import InitializeClass
from AccessControl.Permissions import change_database_methods
from AccessControl.Permissions import use_database_methods
from AccessControl.Permissions import view_management_screens
from AccessControl.SecurityInfo import ClassSecurityInfo
from AccessControl.SecurityManagement import getSecurityManager
from Acquisition import Implicit
from App.Extensions import getBrain
from App.special_dtml import DTMLFile
from DocumentTemplate import HTML
from DocumentTemplate.html_quote import html_quote
from DocumentTemplate.security import RestrictedDTML
from DateTime.DateTime import DateTime
from ExtensionClass import Base
from BTrees.OOBTree import OOBucket as Bucket
from OFS.role import RoleManager
from OFS.SimpleItem import Item
from Persistence import Persistent
from webdav.Resource import Resource
from webdav.Lockable import ResourceLockedError
from zExceptions import BadRequest
from Aqueduct import BaseQuery
from Aqueduct import custom_default_report
from Aqueduct import default_input_form
from Aqueduct import parse
from RDB import File
from Results import Results
from sqlgroup import SQLGroup
from sqltest import SQLTest
from sqlvar import SQLVar
class DatabaseError(BadRequest):
" base class for external relational data base connection problems "
pass
class nvSQL(HTML):
# Non-validating SQL Template for use by SQLFiles.
commands={}
for k, v in HTML.commands.items():
commands[k]=v
commands['sqlvar'] = SQLVar
commands['sqltest'] = SQLTest
commands['sqlgroup' ] = SQLGroup
_proxy_roles=()
class SQL(RestrictedDTML, Base, nvSQL):
# Validating SQL template for Zope SQL Methods.
pass
class DA(BaseQuery,
Implicit,
Persistent,
RoleManager,
Item,
Resource
):
'Database Adapter'
security = ClassSecurityInfo()
security.declareObjectProtected(use_database_methods)
security.setPermissionDefault(use_database_methods,
('Anonymous', 'Manager'))
_col=None
max_rows_=1000
cache_time_=0
max_cache_=100
class_name_=class_file_=''
allow_simple_one_argument_traversal=None
template_class=SQL
connection_hook=None
manage_options=(
(
{'label':'Edit', 'action':'manage_main',
'help':('ZSQLMethods','Z-SQL-Method_Edit.stx')},
{'label':'Test', 'action':'manage_testForm',
'help':('ZSQLMethods','Z-SQL-Method_Test.stx')},
{'label':'Advanced', 'action':'manage_advancedForm',
'help':('ZSQLMethods','Z-SQL-Method_Advanced.stx')},
)
+ RoleManager.manage_options
+ Item.manage_options
)
def __init__(self, id, title, connection_id, arguments, template):
self.id=str(id)
self.manage_edit(title, connection_id, arguments, template)
security.declareProtected(view_management_screens, 'manage_advancedForm')
manage_advancedForm=DTMLFile('dtml/advanced', globals())
security.declarePublic('test_url')
def test_url_(self):
'Method for testing server connection information'
return 'PING'
_size_changes={
'Bigger': (5,5),
'Smaller': (-5,-5),
'Narrower': (0,-5),
'Wider': (0,5),
'Taller': (5,0),
'Shorter': (-5,0),
}
def _er(self,title,connection_id,arguments,template,
SUBMIT,dtpref_cols,dtpref_rows,REQUEST):
dr,dc = self._size_changes[SUBMIT]
rows = str(max(1, int(dtpref_rows) + dr))
cols = str(dtpref_cols)
if cols.endswith('%'):
cols = str(min(100, max(25, int(cols[:-1]) + dc))) + '%'
else:
cols = str(max(35, int(cols) + dc))
e = (DateTime("GMT") + 365).rfc822()
setCookie = REQUEST["RESPONSE"].setCookie
setCookie("dtpref_rows", rows, path='/', expires=e)
setCookie("dtpref_cols", cols, path='/', expires=e)
REQUEST.other.update({"dtpref_cols":cols, "dtpref_rows":rows})
return self.manage_main(self, REQUEST, title=title,
arguments_src=arguments,
connection_id=connection_id, src=template)
security.declareProtected(change_database_methods, 'manage_edit')
def manage_edit(self,title,connection_id,arguments,template,
SUBMIT='Change', dtpref_cols='100%', dtpref_rows='20',
REQUEST=None):
"""Change database method properties
The 'connection_id' argument is the id of a database connection
that resides in the current folder or in a folder above the
current folder. The database should understand SQL.
The 'arguments' argument is a string containing an arguments
specification, as would be given in the SQL method cration form.
The 'template' argument is a string containing the source for the
SQL Template.
"""
if self._size_changes.has_key(SUBMIT):
return self._er(title,connection_id,arguments,template,
SUBMIT,dtpref_cols,dtpref_rows,REQUEST)
if self.wl_isLocked():
raise ResourceLockedError, 'SQL Method is locked via WebDAV'
self.title=str(title)
self.connection_id=str(connection_id)
arguments=str(arguments)
self.arguments_src=arguments
self._arg=parse(arguments)
template=str(template)
self.src=template
self.template=t=self.template_class(template)
t.cook()
self._v_cache={}, Bucket()
if REQUEST:
if SUBMIT=='Change and Test':
return self.manage_testForm(REQUEST)
message='ZSQL Method content changed'
return self.manage_main(self, REQUEST, manage_tabs_message=message)
return ''
security.declareProtected(change_database_methods, 'manage_advanced')
def manage_advanced(self, max_rows, max_cache, cache_time,
class_name, class_file, direct=None,
REQUEST=None, connection_hook=None):
"""Change advanced properties
The arguments are:
max_rows -- The maximum number of rows to be returned from a query.
max_cache -- The maximum number of results to cache
cache_time -- The maximum amound of time to use a cached result.
class_name -- The name of a class that provides additional
attributes for result record objects. This class will be a
base class of the result record class.
class_file -- The name of the file containing the class
definition.
The class file normally resides in the 'Extensions'
directory, however, the file name may have a prefix of
'product.', indicating that it should be found in a product
directory.
For example, if the class file is: 'ACMEWidgets.foo', then an
attempt will first be made to use the file
'lib/python/Products/ACMEWidgets/Extensions/foo.py'. If this
failes, then the file 'Extensions/ACMEWidgets.foo.py' will be
used.
"""
# paranoid type checking
if type(max_rows) is not type(1):
max_rows=string.atoi(max_rows)
if type(max_cache) is not type(1):
max_cache=string.atoi(max_cache)
if type(cache_time) is not type(1):
cache_time=string.atoi(cache_time)
class_name=str(class_name)
class_file=str(class_file)
self.max_rows_ = max_rows
self.max_cache_, self.cache_time_ = max_cache, cache_time
self._v_cache={}, Bucket()
self.class_name_, self.class_file_ = class_name, class_file
self._v_brain=getBrain(self.class_file_, self.class_name_, 1)
self.allow_simple_one_argument_traversal=direct
self.connection_hook = connection_hook
if REQUEST is not None:
m="ZSQL Method advanced settings have been set"
return self.manage_advancedForm(self,REQUEST,manage_tabs_message=m)
security.declareProtected(view_management_screens, 'PrincipiaSearchSource')
def PrincipiaSearchSource(self):
"""Return content for use by the Find machinery."""
return '%s\n%s' % (self.arguments_src, self.src)
# WebDAV / FTP support
default_content_type = 'text/plain'
security.declareProtected(view_management_screens, 'document_src')
def document_src(self, REQUEST=None, RESPONSE=None):
"""Return unprocessed document source."""
if RESPONSE is not None:
RESPONSE.setHeader('Content-Type', 'text/plain')
return '<params>%s</params>\n%s' % (self.arguments_src, self.src)
def manage_FTPget(self):
"""Get source for FTP download"""
self.REQUEST.RESPONSE.setHeader('Content-Type', 'text/plain')
return '<params>%s</params>\n%s' % (self.arguments_src, self.src)
def get_size(self): return len(self.document_src())
security.declareProtected(change_database_methods, 'PUT')
def PUT(self, REQUEST, RESPONSE):
"""Handle put requests"""
self.dav__init(REQUEST, RESPONSE)
self.dav__simpleifhandler(REQUEST, RESPONSE, refresh=1)
body = REQUEST.get('BODY', '')
m = re.match('\s*<params>(.*)</params>\s*\n', body, re.I | re.S)
if m:
self.arguments_src = m.group(1)
self._arg=parse(self.arguments_src)
body = body[m.end():]
template = body
self.src = template
self.template=t=self.template_class(template)
t.cook()
self._v_cache={}, Bucket()
RESPONSE.setStatus(204)
return RESPONSE
security.declareProtected(change_database_methods, 'manage_testForm')
def manage_testForm(self, REQUEST):
" "
input_src=default_input_form(self.title_or_id(),
self._arg, 'manage_test',
'<dtml-var manage_tabs>')
return HTML(input_src)(self, REQUEST, HTTP_REFERER='')
security.declareProtected(change_database_methods, 'manage_test')
def manage_test(self, REQUEST):
"""Test an SQL method."""
# Try to render the query template first so that the rendered
# source will be available for the error message in case some
# error occurs...
try: src=self(REQUEST, src__=1)
except: src="Could not render the query template!"
result=()
t=v=tb=None
try:
try:
src, result=self(REQUEST, test__=1)
if string.find(src,'\0'):
src=string.join(string.split(src,'\0'),'\n'+'-'*60+'\n')
if result._searchable_result_columns():
r=custom_default_report(self.id, result)
else:
r='This statement returned no results.'
except:
t, v, tb = sys.exc_info()
r='<strong>Error, <em>%s</em>:</strong> %s' % (t, v)
report = HTML(
'<html>\n'
'<BODY BGCOLOR="#FFFFFF" LINK="#000099" VLINK="#555555">\n'
'<dtml-var manage_tabs>\n<hr>\n%s\n\n'
'<hr><strong>SQL used:</strong><br>\n<pre>\n%s\n</pre>\n<hr>\n'
'</body></html>'
% (r,html_quote(src)))
report=apply(report,(self,REQUEST),{self.id:result})
if tb is not None:
self.raise_standardErrorMessage(
None, REQUEST, t, v, tb, None, report)
return report
finally: tb=None
security.declareProtected(view_management_screens, 'index_html')
def index_html(self, REQUEST):
""" """
REQUEST.RESPONSE.redirect("%s/manage_testForm" % REQUEST['URL1'])
def _searchable_arguments(self): return self._arg
def _searchable_result_columns(self): return self._col
def _cached_result(self, DB__, query, max_rows, conn_id):
# Try to fetch a result from the cache.
# Compute and cache the result otherwise.
# Also maintains the cache and ensures stale entries
# are never returned and that the cache never gets too large.
# NB: Correct cache behavior is predicated on Bucket.keys()
# returning a sequence ordered from smalled number
# (ie: the oldest cache entry) to largest number
# (ie: the newest cache entry). Please be careful if you
# change the class instantied below!
# get hold of a cache
caches = getattr(self,'_v_cache',None)
if caches is None:
caches = self._v_cache = {}, Bucket()
cache, tcache = caches
# the key for caching
cache_key = query,max_rows,conn_id
# the maximum number of result sets to cache
max_cache=self.max_cache_
# the current time
now=time()
# the oldest time which is not stale
t=now-self.cache_time_
# if the cache is too big, we purge entries from it
if len(cache) >= max_cache:
keys=tcache.keys()
# We also hoover out any stale entries, as we're
# already doing cache minimisation.
# 'keys' is ordered, so we purge the oldest results
# until the cache is small enough and there are no
# stale entries in it
while keys and (len(keys) >= max_cache or keys[0] < t):
key=keys[0]
q=tcache[key]
del tcache[key]
del cache[q]
del keys[0]
# okay, now see if we have a cached result
if cache.has_key(cache_key):
k, r = cache[cache_key]
# the result may still be stale, as we only hoover out
# stale results above if the cache gets too large.
if k > t:
# yay! a cached result returned!
return r
else:
# delete stale cache entries
del cache[cache_key]
del tcache[k]
# call the pure query
result=DB__.query(query,max_rows)
# When a ZSQL method is handled by one ZPublisher thread twice in
# less time than it takes for time.time() to return a different
# value, the SQL generated is different, then this code will leak
# an entry in 'cache' for each time the ZSQL method generates
# different SQL until time.time() returns a different value.
#
# On Linux, you would need an extremely fast machine under extremely
# high load, making this extremely unlikely. On Windows, this is a
# little more likely, but still unlikely to be a problem.
#
# If it does become a problem, the values of the tcache mapping
# need to be turned into sets of cache keys rather than a single
# cache key.
tcache[now]=cache_key
cache[cache_key]= now, result
return result
security.declareProtected(use_database_methods, '__call__')
def __call__(self, REQUEST=None, __ick__=None, src__=0, test__=0, **kw):
"""Call the database method
The arguments to the method should be passed via keyword
arguments, or in a single mapping object. If no arguments are
given, and if the method was invoked through the Web, then the
method will try to acquire and use the Web REQUEST object as
the argument mapping.
The returned value is a sequence of record objects.
"""
__traceback_supplement__ = (SQLMethodTracebackSupplement, self)
if REQUEST is None:
if kw:
REQUEST=kw
else:
if hasattr(self, 'REQUEST'):
REQUEST=self.REQUEST
else:
REQUEST={}
# connection hook
c = self.connection_id
# for backwards compatability
hk = self.connection_hook
# go get the connection hook and call it
if hk:
c = getattr(self, hk)()
try:
dbc=getattr(self, c)
except AttributeError:
raise AttributeError, (
"The database connection <em>%s</em> cannot be found." % (
c))
try:
DB__=dbc()
except: raise DatabaseError, (
'%s is not connected to a database' % self.id)
if hasattr(self, 'aq_parent'):
p=self.aq_parent
else:
p=None
argdata=self._argdata(REQUEST)
argdata['sql_delimiter']='\0'
argdata['sql_quote__']=dbc.sql_quote__
security=getSecurityManager()
security.addContext(self)
try:
try:
query=apply(self.template, (p,), argdata)
except TypeError, msg:
msg = str(msg)
if string.find(msg,'client') >= 0:
raise NameError("'client' may not be used as an " +
"argument name in this context")
else: raise
finally:
security.removeContext(self)
if src__:
return query
if self.cache_time_ > 0 and self.max_cache_ > 0:
result=self._cached_result(DB__, query, self.max_rows_, c)
else:
result=DB__.query(query, self.max_rows_)
if hasattr(self, '_v_brain'):
brain=self._v_brain
else:
brain=self._v_brain=getBrain(self.class_file_, self.class_name_)
if type(result) is type(''):
f=StringIO()
f.write(result)
f.seek(0)
result = File(f,brain,p, None)
else:
result = Results(result, brain, p, None)
columns = result._searchable_result_columns()
if test__ and columns != self._col:
self._col=columns
# If run in test mode, return both the query and results so
# that the template doesn't have to be rendered twice!
if test__:
return query, result
return result
def da_has_single_argument(self): return len(self._arg)==1
def __getitem__(self, key):
args=self._arg
if self.allow_simple_one_argument_traversal and len(args)==1:
results=self({args.keys()[0]: key})
if results:
if len(results) > 1: raise KeyError, key
else: raise KeyError, key
r=results[0]
# if hasattr(self, 'aq_parent'): r=r.__of__(self.aq_parent)
return r
self._arg[key] # raise KeyError if not an arg
return Traverse(self,{},key)
def connectionIsValid(self):
return (hasattr(self, self.connection_id) and
hasattr(getattr(self, self.connection_id), 'connected'))
def connected(self):
return getattr(getattr(self, self.connection_id), 'connected')()
InitializeClass(DA)
ListType=type([])
class Traverse(Base):
"""Helper class for 'traversing' searches during URL traversal
"""
_da=None
def __init__(self, da, args, name=None):
self._r=None
self._da=da
self._args=args
self._name=name
def __bobo_traverse__(self, REQUEST, key):
name=self._name
da=self.__dict__['_da']
args=self._args
if name:
if args.has_key(name):
v=args[name]
if type(v) is not ListType: v=[v]
v.append(key)
key=v
args[name]=key
if len(args) < len(da._arg):
return self.__class__(da, args)
key=self # "consume" key
elif da._arg.has_key(key): return self.__class__(da, args, key)
results=da(args)
if results:
if len(results) > 1:
try: return results[string.atoi(key)].__of__(da)
except: raise KeyError, key
else: raise KeyError, key
r=results[0]
# if hasattr(da, 'aq_parent'): r=r.__of__(da.aq_parent)
self._r=r
if key is self: return r
if hasattr(r,'__bobo_traverse__'):
try: return r.__bobo_traverse__(REQUEST, key)
except: pass
try: return getattr(r,key)
except AttributeError, v:
if str(v) != key: raise AttributeError, v
return r[key]
def __getattr__(self, name):
r=self.__dict__['_r']
if hasattr(r, name): return getattr(r,name)
return getattr(self.__dict__['_da'], name)
class SQLMethodTracebackSupplement:
def __init__(self, sql):
self.object = sql
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__='''Class for reading RDB files
$Id$'''
__version__='$Revision: 1.33 $'[11:-2]
from string import split, strip, lower, upper, atof, atoi, atol, find, join,find
import DateTime,re
from Missing import MV
from array import array
from Record import Record
from Acquisition import Implicit
import ExtensionClass
def parse_text(s):
if find(s,'\\') < 0 and (find(s,'\\t') < 0 and find(s,'\\n') < 0): return s
r=[]
for x in split(s,'\\\\'):
x=join(split(x,'\\n'),'\n')
r.append(join(split(x,'\\t'),'\t'))
return join(r,'\\')
Parsers={'n': atof,
'i': atoi,
'l': atol,
'd': DateTime.DateTime,
't': parse_text,
}
class SQLAlias(ExtensionClass.Base):
def __init__(self, name): self._n=name
def __of__(self, parent): return getattr(parent, self._n)
class NoBrains: pass
class DatabaseResults:
"""Class for reading RDB files
"""
_index=None
# We need to allow access to not-explicitly-protected
# individual record objects contained in the result.
__allow_access_to_unprotected_subobjects__=1
def __init__(self,file,brains=NoBrains, parent=None, zbrains=None):
self._file=file
readline=file.readline
line=readline()
self._parent=parent
if zbrains is None: zbrains=NoBrains
while line and line.find('#') != -1 : line=readline()
line=line[:-1]
if line and line[-1:] in '\r\n': line=line[:-1]
self._names=names=split(line,'\t')
if not names: raise ValueError, 'No column names'
aliases=[]
self._schema=schema={}
i=0
for name in names:
name=strip(name)
if not name:
raise ValueError, 'Empty column name, %s' % name
if schema.has_key(name):
raise ValueError, 'Duplicate column name, %s' % name
schema[name]=i
n=lower(name)
if n != name: aliases.append((n, SQLAlias(name)))
n=upper(name)
if n != name: aliases.append((n, SQLAlias(name)))
i=i+1
self._nv=nv=len(names)
line=readline()
line=line[:-1]
if line[-1:] in '\r\n': line=line[:-1]
self._defs=defs=split(line,'\t')
if not defs: raise ValueError, 'No column definitions'
if len(defs) != nv:
raise ValueError, (
"""The number of column names and the number of column
definitions are different.""")
i=0
self._parsers=parsers=[]
defre=re.compile(r'([0-9]*)([a-zA-Z])?')
self._data_dictionary=dd={}
self.__items__=items=[]
for _def in defs:
_def=strip(_def)
if not _def:
raise ValueError, ('Empty column definition for %s' % names[i])
mo = defre.match(_def)
if mo is None:
raise ValueError, (
'Invalid column definition for, %s, for %s'
% _def, names[i])
type = mo.group(2).lower()
width = mo.group(1)
if width: width=atoi(width)
else: width=8
try: parser=Parsers[type]
except: parser=str
name=names[i]
d={'name': name, 'type': type, 'width': width, 'parser': parser}
items.append(d)
dd[name]=d
parsers.append((i,parser))
i=i+1
# Create a record class to hold the records.
names=tuple(names)
class r(Record, Implicit, brains, zbrains):
'Result record class'
r.__record_schema__=schema
for k in filter(lambda k: k[:2]=='__', Record.__dict__.keys()):
setattr(r,k,getattr(Record,k))
# Add SQL Aliases
for k, v in aliases:
if not hasattr(r,k):
setattr(r, k, v)
if hasattr(brains, '__init__'):
binit=brains.__init__
if hasattr(binit,'im_func'): binit=binit.im_func
def __init__(self, data, parent, binit=binit):
Record.__init__(self,data)
binit(self.__of__(parent))
setattr(r, '__init__', __init__)
self._class=r
# OK, we've read meta data, now get line indexes
p=file.tell()
save=self._lines=array('i')
save=save.append
l=readline()
while l:
save(p)
p=p+len(l)
l=readline()
def _searchable_result_columns(self): return self.__items__
def names(self): return self._names
def data_dictionary(self): return self._data_dictionary
def __len__(self): return len(self._lines)
def __getitem__(self,index):
if index==self._index: return self._row
file=self._file
file.seek(self._lines[index])
line=file.readline()
line=line[:-1]
if line and line[-1:] in '\r\n': line=line[:-1]
fields=split(line,'\t')
l=len(fields)
nv=self._nv
if l != nv:
if l < nv:
fields=fields+['']*(nv-l)
else:
raise ValueError, (
"""The number of items in record %s is invalid
<pre>%s\n%s\n%s\n%s</pre>
"""
% (index, ('='*40), line, ('='*40), fields))
for i, parser in self._parsers:
try: v=parser(fields[i])
except:
if fields[i]:
raise ValueError, (
"""Invalid value, %s, for %s in record %s"""
% (fields[i], self._names[i], index))
else: v=MV
fields[i]=v
parent=self._parent
fields=self._class(fields, parent)
self._index=index
self._row=fields
if parent is None: return fields
return fields.__of__(parent)
File=DatabaseResults
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import ExtensionClass
from Acquisition import Implicit
from Record import Record
class SQLAlias(ExtensionClass.Base):
def __init__(self, name): self._n=name
def __of__(self, parent): return getattr(parent, self._n)
class NoBrains: pass
class Results:
"""Class for providing a nice interface to DBI result data
"""
_index=None
# We need to allow access to not-explicitly-protected
# individual record objects contained in the result.
__allow_access_to_unprotected_subobjects__=1
def __init__(self,(items,data),brains=NoBrains, parent=None,
zbrains=None):
self._data=data
self.__items__=items
self._parent=parent
self._names=names=[]
self._schema=schema={}
self._data_dictionary=dd={}
aliases=[]
if zbrains is None: zbrains=NoBrains
i=0
for item in items:
name=item['name']
name=name.strip()
if not name:
raise ValueError, 'Empty column name, %s' % name
if schema.has_key(name):
raise ValueError, 'Duplicate column name, %s' % name
schema[name]=i
n=name.lower()
if n != name: aliases.append((n, SQLAlias(name)))
n=name.upper()
if n != name: aliases.append((n, SQLAlias(name)))
dd[name]=item
names.append(name)
i=i+1
self._nv=nv=len(names)
# Create a record class to hold the records.
names=tuple(names)
class r(Record, Implicit, brains, zbrains):
'Result record class'
r.__record_schema__=schema
for k in Record.__dict__.keys():
if k[:2]=='__':
setattr(r,k,getattr(Record,k))
# Add SQL Aliases
for k, v in aliases:
if not hasattr(r, k):
setattr(r, k, v)
if hasattr(brains, '__init__'):
binit=brains.__init__
if hasattr(binit,'im_func'): binit=binit.im_func
def __init__(self, data, parent, binit=binit):
Record.__init__(self,data)
if parent is not None: self=self.__of__(parent)
binit(self)
setattr(r, '__init__', __init__)
self._class=r
# OK, we've read meta data, now get line indexes
def _searchable_result_columns(self): return self.__items__
def names(self): return self._names
def data_dictionary(self): return self._data_dictionary
def __len__(self): return len(self._data)
def __getitem__(self,index):
if index==self._index: return self._row
parent=self._parent
fields=self._class(self._data[index], parent)
if parent is not None: fields=fields.__of__(parent)
self._index=index
self._row=fields
return fields
def tuples(self):
return map(tuple, self)
def dictionaries(self):
r=[]
a=r.append
names=self.names()
for row in self:
d={}
for n in names: d[n]=row[n]
a(d)
return r
def asRDB(self): # Waaaaa
r=[]
append=r.append
strings=[]
nstrings=[]
items=self.__items__
indexes=range(len(items))
for i in indexes:
item=items[i]
t=item['type'].lower()
if t=='s' or t=='t':
t=='t'
strings.append(i)
else: nstrings.append(i)
if item.has_key('width'): append('%s%s' % (item['width'], t))
else: r.append(t)
r=['\t'.join(self._names), '\t'.join(r)]
append=r.append
row=['']*len(items)
tostr=str
for d in self._data:
for i in strings:
v=tostr(d[i])
if v:
if v.find('\\') > 0: v='\\\\'.join(v.split('\\'))
if v.find('\t') > 0: v='\\t'.join(v.split('\t'))
if v.find('\n') > 0: v='\\n'.join(v.split('\n'))
row[i]=v
for i in nstrings:
row[i]=tostr(d[i])
append('\t'.join(row))
append('')
return '\n'.join(r)
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__='''Search Interface Wizard
$Id$'''
__version__='$Revision: 1.22 $'[11:-2]
from string import join
from cgi import escape
from App.special_dtml import DTMLFile
from Aqueduct import custom_default_report
from Aqueduct import custom_default_zpt_report
from Aqueduct import nicify
from Aqueduct import Args
from AccessControl import getSecurityManager
addForm=DTMLFile('dtml/searchAdd', globals())
def manage_addZSearch(self, report_id, report_title, report_style,
input_id, input_title, object_type, queries=[],
REQUEST=None):
'add a report'
if not queries: raise ValueError, (
'No <em>searchable objects</em> were selected')
if not report_id: raise ValueError, (
'No <em>report id</em> were specified')
if input_title and not input_id: raise ValueError, (
'No <em>input id</em> were specified')
qs=map(lambda q, self=self: _getquery(self, q), queries)
arguments={}
keys=[]
checkPermission=getSecurityManager().checkPermission
for q in qs:
url=q.absolute_url()
if input_id:
for name, arg in q._searchable_arguments().items():
if len(qs) > 1: key="%s/%s" % (id,name)
else: key=name
arguments[key]=arg
keys.append(key)
if q._searchable_result_columns() is None:
raise ValueError,(
"""The input searchable object, <em>%s</em>,
has not been tested. Until it has been tested,
it\'s output schema is unknown, and a report
cannot be generated. Before creating a report
from this query, you must try out the query. To
try out the query, <a href="%s">click here</a>.
""" % (escape(q.title_and_id()), escape(url, 1)))
if object_type == 'dtml_methods':
if not checkPermission('Add DTML Methods', self):
raise Unauthorized, (
'You are not authorized to add DTML Methods.'
)
if input_id:
arguments=Args(arguments, keys)
self.manage_addDocument(
input_id,input_title,
default_input_form(arguments, report_id))
self.manage_addDocument(
report_id,report_title,
('<html><head><title><dtml-var title_or_id></title>'
'</head><body bgcolor="#FFFFFF">\n%s\n'
'</body></html>' %
join(map(lambda q, report_style=report_style:
custom_default_report(q.id, q, no_table=report_style), qs),
'\n<hr>\n')))
if REQUEST: return self.manage_main(self,REQUEST)
elif object_type == 'page_templates':
if not checkPermission('Add Page Templates', self):
raise Unauthorized, (
'You are not authorized to add Page Templates.'
)
if input_id:
arguments = Args(arguments, keys)
self.manage_addProduct['PageTemplates'].manage_addPageTemplate(
input_id, input_title,
default_input_zpt_form(arguments, report_id))
self.manage_addProduct['PageTemplates'].manage_addPageTemplate(
report_id,report_title,
('<html><body>\n%s\n'
'</body></html>' %
join(map(lambda q, report_style=report_style:
custom_default_zpt_report(q.id, q, no_table=report_style), qs),
'\n<hr>\n')))
if REQUEST: return self.manage_main(self,REQUEST)
def ZQueryIds(self):
# Note that report server configurations will expend on this
t=[]
ids={}
old=ids.has_key
o=self
n=0
while 1:
# Look for queries
try: map=o.objectMap()
except AttributeError: map=()
for i in map:
try:
id=i['id']
if (not old(id) and
hasattr(getattr(o,id),'_searchable_arguments')
):
t.append(i['id'])
ids[id]=1
except: pass
# Now extend search to parent
try: o=o.aq_parent
except: return t
if n > 100: return t # Seat belt
n=n+1
def _getquery(self,id):
o=self
i=0
while 1:
__traceback_info__=o
q=getattr(o,id)
try:
if hasattr(q,'_searchable_arguments'):
try: q=q.__of__(self.aq_parent)
except: pass
return q
except: pass
if i > 100: raise AttributeError, id
i=i+1
o=o.aq_parent
def default_input_form(arguments,action='query',
tabs=''):
if arguments:
items=arguments.items()
return (
"%s\n%s%s" % (
'<html><head><title><dtml-var title_or_id></title>'
'</head><body bgcolor="#FFFFFF">\n%s\n'
'<form action="%s" method="get">\n'
'<h2><dtml-var document_title></h2>\n'
'Enter query parameters:<br>'
'<table>\n'
% (tabs,action),
join(
map(
lambda a:
('<tr><th>%s</th>\n'
' <td><input name="%s"\n'
' size="30" value="%s">'
'</td></tr>'
% (nicify(a[0]),
(
a[1].has_key('type') and
("%s:%s" % (a[0],a[1]['type'])) or
a[0]
),
a[1].has_key('default') and a[1]['default'] or ''
))
, items
),
'\n'),
'\n<tr><td colspan=2 align=center>\n'
'<input type="SUBMIT" name="SUBMIT" value="Submit Query">\n'
'</td></tr>\n</table>\n</form>\n'
'</body></html>\n'
)
)
else:
return (
'<html><head><title><dtml-var title_or_id></title>'
'</head><body bgcolor="#FFFFFF">\n%s\n'
'<form action="%s" method="get">\n'
'<h2><dtml-var document_title></h2>\n'
'This query requires no input.<p>\n'
'<input type="SUBMIT" name="SUBMIT" value="Submit Query">\n'
'</form>\n'
'</body></html>\n'
% (tabs, action)
)
def default_input_zpt_form(arguments,action='query',
tabs=''):
if arguments:
items=arguments.items()
return (
"%s\n%s%s" % (
'<html><body>\n%s\n'
'<form action="%s" method="get">\n'
'<h2 tal:content="template/title_or_id">Title</h2>\n'
'Enter query parameters:<br>'
'<table>\n'
% (tabs,action),
join(
map(
lambda a:
('<tr><th>%s</th>\n'
' <td><input name="%s"\n'
' size="30" value="%s">'
'</td></tr>'
% (nicify(a[0]),
(
a[1].has_key('type') and
("%s:%s" % (a[0],a[1]['type'])) or
a[0]
),
a[1].has_key('default') and a[1]['default'] or ''
))
, items
),
'\n'),
'\n<tr><td colspan=2 align=center>\n'
'<input type="SUBMIT" name="SUBMIT" value="Submit Query">\n'
'</td></tr>\n</table>\n</form>\n'
'</body></html>\n'
)
)
else:
return (
'<html><body>\n%s\n'
'<form action="%s" method="get">\n'
'<h2 tal:content="template/title_or_id">Title</h2>\n'
'<p>This query requires no input.</p>\n'
'<input type="SUBMIT" name="SUBMIT" value="Submit Query">\n'
'</form>\n'
'</body></html>\n'
% (tabs, action)
)
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import thread
import TM
from TM import Surrogate
import transaction
thunk_lock = thread.allocate_lock()
class THUNKED_TM(TM.TM):
"""A big heavy hammer for handling non-thread safe DAs
"""
def _register(self):
if not self._registered:
thunk_lock.acquire()
try:
transaction.get().register(Surrogate(self))
self._begin()
except:
thunk_lock.release()
raise
else:
self._registered=1
def tpc_finish(self, *ignored):
if self._registered:
try:
self._finish()
finally:
thunk_lock.release()
self._registered=0
def abort(self, *ignored):
if self._registered:
try:
self._abort()
finally:
thunk_lock.release()
self._registered=0
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Provide support for linking an external transaction manager with Zope's
"""
import transaction
class TM:
"""Mix-in class that provides transaction management support
A sub class should call self._register() whenever it performs any
transaction-dependent operations (e.g. sql statements).
The sub class will need to override _finish, to finalize work,
_abort, to roll-back work, and perhaps _begin, if any work is
needed at the start of a transaction.
A subclass that uses locking during transaction commit must
define a sortKey() method.
"""
_registered=None
def _begin(self): pass
def _register(self):
if not self._registered:
try:
transaction.get().register(Surrogate(self))
self._begin()
self._registered = 1
self._finalize = 0
except: pass
def tpc_begin(self, *ignored): pass
commit=tpc_begin
def _finish(self):
self.db.commit()
def _abort(self):
self.db.rollback()
def tpc_vote(self, *ignored):
self._finalize = 1
def tpc_finish(self, *ignored):
if self._finalize:
try: self._finish()
finally: self._registered=0
def abort(self, *ignored):
try: self._abort()
finally: self._registered=0
tpc_abort = abort
# Most DA's talking to RDBMS systems do not care about commit order, so
# return the constant 1
_sort_key = 1
def sortKey(self, *ignored):
""" The sortKey method is used by the transaction subsystem to have a
known commit order for lock acquisition.
"""
return self._sort_key
def setSortKey(self, sort_key):
self._sort_key = sort_key
class Surrogate:
def __init__(self, db):
self._p_jar=db
self.__inform_commit__=db.tpc_finish
self.__inform_abort__=db.tpc_abort
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__='''short description
$Id$'''
# Copyright
#
# Copyright 1997 Digital Creations, Inc, 910 Princess Anne
# Street, Suite 300, Fredericksburg, Virginia 22401 U.S.A. All
# rights reserved.
#
__version__='$Revision: 1.11 $'[11:-2]
import string, sys
from string import strip, split, find, join
import transaction
failures=0
calls=0
nonselect_desc=[
('Query', 'STRING', 62, 62, 0, 0, 1),
('Status', 'STRING', 12, 12, 0, 0, 1),
('Calls', 'STRING', 12, 12, 0, 0, 1),
]
class QueryError(Exception):
pass
class DB:
_p_oid=_p_changed=_registered=None
defs={'STRING':'s', 'NUMBER':'n', 'DATE':'d'}
def Database_Connection(self, string):
# Create a dbi-compatible database connection
raise NotImplemetedError, (
'attempt to create a database connection for an abstract dbi')
Database_Error='Should be overriden by subclass'
def __init__(self,connection):
self.connection=connection
db=self.db=self.Database_Connection(connection)
self.cursor=db.cursor()
def str(self,v, StringType=type('')):
if v is None: return ''
r=str(v)
if r[-1:]=='L' and type(v) is not StringType: r=r[:-1]
return r
def __inform_commit__(self, *ignored):
self._registered=None
self.db.commit()
def __inform_abort__(self, *ignored):
self._registered=None
self.db.rollback()
def register(self):
if self._registered: return
transaction.get().register(self)
self._registered=1
def query(self,query_string, max_rows=9999999):
global failures, calls
calls=calls+1
try:
c=self.cursor
self.register()
queries=filter(None, map(strip,split(query_string, '\0')))
if not queries: raise QueryError, 'empty query'
if len(queries) > 1:
result=[]
for qs in queries:
r=c.execute(qs)
if r is None: raise QueryError, (
'select in multiple sql-statement query'
)
result.append((qs, str(`r`), calls))
desc=nonselect_desc
else:
query_string=queries[0]
r=c.execute(query_string)
if r is None:
result=c.fetchmany(max_rows)
desc=c.description
else:
result=((query_string, str(`r`), calls),)
desc=nonselect_desc
failures=0
c.close()
except self.Database_Error, mess:
c.close()
self.db.rollback()
failures=failures+1
if ((find(mess,": invalid") < 0 and
find(mess,"PARSE") < 0) or
# DBI IS stupid
find(mess,
"Error while trying to retrieve text for error") > 0
or
# If we have a large number of consecutive failures,
# our connection is probably dead.
failures > 100
):
# Hm. maybe the db is hosed. Let's try once to restart it.
failures=0
c.close()
self.db.close()
db=self.db=self.Database_Connection(self.connection)
self.cursor=db.cursor()
c=self.cursor
c.execute(query_string)
result=c.fetchall()
desc=c.description
else:
raise sys.exc_info()
if result:
result=join(
map(
lambda row, self=self:
join(map(self.str,row),'\t'),
result),
'\n')+'\n'
else:
result=''
return (
"%s\n%s\n%s" % (
join(map(lambda d: d[0],desc), '\t'),
join(
map(
lambda d, defs=self.defs: "%d%s" % (d[2],defs[d[1]]),
desc),
'\t'),
result,
)
)
<dtml-var manage_page_header>
<dtml-var manage_tabs>
<form action="manage_advanced" method="post">
<table>
<table cellspacing="0" cellpadding="2" border="0">
<tr>
<td align="left" valign="top">
<div class="form-optional">
Connection Hook
</div>
</td>
<td align="LEFT" valign="TOP">
<input type="TEXT" name="connection_hook" size="40"
value="<dtml-var connection_hook null="" missing="" html_quote>">
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Maximum rows to retrieve
</div>
</td>
<td align="left" valign="top">
<input name="max_rows:int" size="10" value="&dtml-max_rows_;">
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Maximum results to cache
</div>
</td>
<td align="left" valign="top">
<input name="max_cache:int" size="10" value="&dtml-max_cache_;">
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Maximum time (sec) to cache
</div>
</td>
<td align="left" valign="top">
<input name="cache_time:int" size="10" value="&dtml-cache_time_;">
</td>
</tr>
<dtml-if da_has_single_argument>
<tr>
<td align="left" valign="top">
<div class="form-label">
Allow "simple" direct traversal
</div>
</td>
<td align="left" valign="top">
<input name="direct" type="checkbox" <dtml-if
allow_simple_one_argument_traversal>checked</dtml-if> />
</td>
</tr>
</dtml-if>
<tr>
<td align="left" valign="top" colspan="2">
<div class="form-text">
<br />
You may specify a <strong>class</strong> for the data records. This
class must be defined in a file that resides in the <code>Extensions</code>
directory of this Zope installation.
</div>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Class Name
</div>
</td>
<td align="left" valign="top">
<input name="class_name" size="30" value="&dtml-class_name_;">
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Class File
</div>
</td>
<td align="left" valign="top">
<input name="class_file" size="30" value="&dtml-class_file_;">
</td>
</tr>
<tr>
<td align="left" valign="top">
</td>
<td align="left" valign="top">
<div class="form-element">
<input class="form-element" type="submit" name="submit"
value="Save Changes">
</div>
</td>
</tr>
</table>
</form>
<dtml-var manage_page_footer>
<dtml-var manage_page_header>
<dtml-var "manage_form_title(this(), _,
form_title='Add %s Connection' % database_type
)">
<form action="manage_addZ&dtml-database_type;Connection"
method="POST">
<table cellspacing="2">
<tr>
<td align="left" valign="top">
<div class="form-label">
Id
</div>
</td>
<td align="LEFT" valign="TOP">
<input type="TEXT" name="id" size="40"
value="&dtml-default_id;">
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-optional">
Title
</div>
</td>
<td align="LEFT" valign="TOP">
<input type="TEXT" name="title" size="40"
value="&dtml-default_title;">
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Connection String
</div>
</td>
<td align="LEFT" valign="TOP">
<input type="TEXT" name="connection_string" size="40">
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Connect Immediately?
</div>
</td>
<td align="LEFT" valign="TOP">
<input name="check" type="CHECKBOX" value="YES" CHECKED>
</td>
</tr>
<tr>
<td align="left" valign="top">
</td>
<td align="left" valign="top">
<div class="form-element">
<br />
<input class="form-element" type="submit" name="submit" value="Add">
</div>
</td>
</tr>
</table>
</form>
<dtml-var manage_page_footer>
<dtml-var manage_page_header>
<dtml-var manage_tabs>
<form action="manage_edit" method="POST">
<table cellspacing="2">
<tr>
<td align="left" valign="top">
<div class="form-label">
Id
</div>
</td>
<td align="LEFT" valign="TOP">&dtml-id;</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-optional">
Title
</div>
</td>
<td align="LEFT" valign="TOP">
<input type="TEXT" name="title" size="40"
value="&dtml-title;">
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Connection String
</div>
</td>
<td align="LEFT" valign="TOP">
<input type="TEXT" name="connection_string" size="40"
value="&dtml-connection_string;">
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Connect Immediately
</div>
</td>
<td align="LEFT" valign="TOP">
<input name="check" type="CHECKBOX" value="YES" CHECKED>
</td>
</tr>
<tr>
<td align="left" valign="top">
</td>
<td align="left" valign="top">
<div class="form-element">
<br />
<input class="form-element" type="submit" name="submit"
value="Save Changes">
</div>
</td>
</tr>
</table>
</form>
<dtml-var manage_page_footer>
<dtml-var manage_page_header>
<dtml-var manage_tabs>
<p class="form-text">
The database connection is <dtml-if
connected>open<dtml-else>closed</dtml-if>.
</p>
<p>
<div class="form-element">
<dtml-if connected>
<form action="manage_close_connection" method="get">
<input class="form-element" type="submit" name="submit"
value="Close Connection">
</form>
<dtml-else>
<form action="manage_open_connection" method="get">
<input class="form-element" type="submit" name="submit"
value="Open Connection">
</form>
</dtml-if>
</div>
</p>
<dtml-var manage_page_footer>
<dtml-var manage_page_header>
<dtml-var manage_tabs>
<dtml-if connected>
<p class="form-help">
You can write a test query and run it with this database connection
using the form below. Click <em>submit query</em> to run the query.
</p>
<form action="manage_test" method="post">
<table cellspacing="0" cellpadding="2" border="0">
<tr>
<td align="left" valign="top">
<div style="width: 100%;">
<textarea style="width: 100%;" rows="10" cols="60" name="query:text"
wrap="off" accesskey="e" tabindex="1"></textarea>
</div>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-element">
<input class="form-element" type="submit" name="submit"
value="Submit Query" tabindex="2">
</div>
</td>
</tr>
</table>
</form>
<dtml-else>
<p class="form-help">
This database connection is currently closed. You must click on the
<em>status</em> tab and open the connection before you can test it.
</p>
</dtml-if>
<dtml-var manage_page_footer>
<dtml-in %(id)s size=20 start=query_start>
<dtml-if sequence-start>
<dtml-if previous-sequence>
<a href="&dtml-URL;?query_start=&dtml-previous-sequence-start-number;&query=<dtml-var query url_quote missing=''>">
(Previous <dtml-var previous-sequence-size> results)
</a>
</dtml-if previous-sequence>
%(else no_table)[<table border>
%(heading)s
%(else)]
</dtml-if sequence-start>
%(row)s
<dtml-if sequence-end>
%(else no_table)[</table>%(else)]
<dtml-if next-sequence>
<a href="&dtml-URL;?query_start=&dtml-next-sequence-start-number;&query=<dtml-var query url_quote missing=''>">
(Next <dtml-var next-sequence-size> results)
</a>
</dtml-if next-sequence>
</dtml-if sequence-end>
<dtml-else>
There was no data matching this &dtml-title_or_id; query.
</dtml-in>
<html>
<body tal:define="results here/%(id)s;
start request/start|python:0;
batch python:modules['ZTUtils'].Batch(results,
size=20,
start=start);
previous python:batch.previous;
next python:batch.next">
<p>
<a tal:condition="previous"
tal:attributes="href string:${request/URL0}?start:int=${previous/first}"
href="previous_url">previous <span tal:replace="previous/length">20</span> results</a>
<a tal:condition="next"
tal:attributes="href string:${request/URL0}?start:int=${next/first}"
href="next_url">next <span tal:replace="next/length">20</span> results</a>
</p>
%(else no_table)[<table border>
%(heading)s
%(else)]
<tal:x repeat="result batch" >
%(row)s
</tal:x>
%(else no_table)[</table>%(else)]
<p>
<a tal:condition="previous"
tal:attributes="href string:${request/URL0}?start:int=${previous/first}"
href="previous_url">previous <span tal:replace="previous/length">20</span> results</a>
<a tal:condition="next"
tal:attributes="href string:${request/URL0}?start:int=${next/first}"
href="next_url">next <span tal:replace="next/length">20</span> results</a>
</p>
</body>
</html>
<dtml-var manage_page_header>
<dtml-var "manage_form_title(this(), _,
form_title='Add Search Interface',
help_product='OFSP',
help_topic='ZSearch-Interface_Add.stx'
)">
<form action="manage_addZSearch" method="POST">
<p class="form-help">
A Search Interface allows you to search Zope databases.
The Search Interface will create a
search-input form and a report for displaying the search results.
</p>
<p class="form-help">
In the form below, <em>seachable objects</em> are the objects
(usually SQL Methods) to be searched. <em>report id</em> and
<em>search input id</em> are the ids of the
report and search form objects that will be created.
<em>report style</em> indicates the type of report to generate.
</p>
<table>
<tr>
<td align="left" valign="top">
<div class="form-label">
Select one<br>or more<br>searchable <br>objects
</div>
</td>
<td align="left" valign="top">
<div class="form-element">
<select name="queries:list" size="4" multiple>
<dtml-in ZQueryIds>
<option>&dtml-sequence-item;</option>
</dtml-in>
</select>
</div>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Report Id
</div>
</td>
<td align="left" valign="top">
<input type="text" name="report_id" size="40" value="" />
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-optional">
Report Title
</div>
</td>
<td align="left" valign="top">
<input type="text" name="report_title" size="40" value="" />
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Report Style
</div>
</td>
<td align="left" valign="top">
<div class="form-element">
<select name='report_style:int'>
<option value="0">Tabular</option>
<option value="1">Records</option>
</select>
</div>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-optional">
Search Input Id
</div>
</td>
<td align="left" valign="top">
<input type="text" name="input_id" size="40" value="" />
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-optional">
Search Input Title
</div>
</td>
<td align="left" valign="top">
<input type="text" name="input_title" size="40" value="" />
</td>
</tr
<tr>
<td align="left" valign="top">
</td>
<td align="left" valign="top">
<div class="form-element">
<input class="form-element" type="radio" name="object_type"
value="dtml_methods">Generate DTML Methods
</div>
</td>
</tr>
<tr>
<td align="left" valign="top">
</td>
<td align="left" valign="top">
<div class="form-element">
<input class="form-element" type="radio" name="object_type"
value="page_templates">Generate Page Templates
</div>
</td>
</tr>
<tr>
<td align="left" valign="top">
</td>
<td align="left" valign="top">
<div class="form-element">
<input class="form-element" type="submit" name="submit"
value="Add">
</div>
</td>
</tr>
</table>
</form>
<dtml-var manage_page_footer>
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Inserting optional tests with 'sqlgroup'
It is sometimes useful to make inputs to an SQL statement
optinal. Doing so can be difficult, because not only must the
test be inserted conditionally, but SQL boolean operators may or
may not need to be inserted depending on whether other, possibly
optional, comparisons have been done. The 'sqlgroup' tag
automates the conditional insertion of boolean operators.
The 'sqlgroup' tag is a block tag. It can
have any number of 'and' and 'or' continuation tags.
The 'sqlgroup' tag has an optional attribure, 'required' to
specify groups that must include at least one test. This is
useful when you want to make sure that a query is qualified, but
want to be very flexible about how it is qualified.
Suppose we want to find people with a given first or nick name,
city or minimum and maximum age. Suppose we want all inputs to be
optional, but want to require *some* input. We can
use DTML source like the following::
<dtml-sqlgroup required>
<dtml-sqlgroup>
<dtml-sqltest name column=nick_name type=nb multiple optional>
<dtml-or>
<dtml-sqltest name column=first_name type=nb multiple optional>
</dtml-sqlgroup>
<dtml-and>
<dtml-sqltest home_town type=nb optional>
<dtml-and>
<dtml-if minimum_age>
age >= <dtml-sqlvar minimum_age type=int>
</dtml-if>
<dtml-and>
<dtml-if maximum_age>
age <= <dtml-sqlvar maximum_age type=int>
</dtml-if>
</dtml-sqlgroup>
This example illustrates how groups can be nested to control
boolean evaluation order. It also illustrates that the grouping
facility can also be used with other DTML tags like 'if' tags.
The 'sqlgroup' tag checks to see if text to be inserted contains
other than whitespace characters. If it does, then it is inserted
with the appropriate boolean operator, as indicated by use of an
'and' or 'or' tag, otherwise, no text is inserted.
$Id$
"""
############################################################################
# Copyright
#
# Copyright 1996 Digital Creations, L.C., 910 Princess Anne
# Street, Suite 300, Fredericksburg, Virginia 22401 U.S.A. All
# rights reserved.
#
############################################################################
__rcs_id__='$Id$'
__version__='$Revision: 1.10 $'[11:-2]
from DocumentTemplate.DT_Util import parse_params
str = __builtins__['str']
from string import strip, join
import sys
_TNAME_MAPPING = {'comma': ','}
class SQLGroup:
blockContinuations = 'and', 'or', 'comma'
name = 'sqlgroup'
required = None
where = None
set = None
noparens = None
def __init__(self, blocks):
self.blocks = blocks
tname, args, section = blocks[0]
self.__name__ = "%s %s" % (tname, args)
args = parse_params(args, required=1, where=1, set=1, noparens=1)
if args.has_key(''):
args[args['']] = 1
if args.has_key('required'):
self.required = args['required']
if args.has_key('where'):
self.where = args['where']
if args.has_key('set'):
self.set = args['set']
if args.has_key('noparens'):
self.noparens = args['noparens']
def render(self,md):
r = []
for tname, args, section in self.blocks:
__traceback_info__ = tname
s = strip(section(None, md))
if s:
if r:
r.append(_TNAME_MAPPING.get(tname, tname))
if self.noparens:
r.append(s)
else:
r.append("%s\n" % s)
if r:
if len(r) > 1:
if self.noparens:
r = "%s\n" % join(r,' ')
else:
r = "(%s)\n" % join(r,' ')
else:
r = r[0]
if self.set:
r = "set\n"+r
if self.where:
r = "where\n"+r
return r
if self.required:
raise ValueError, 'Not enough input was provided!<p>'
return ''
__call__ = render
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
'''Inserting optional tests with 'sqlgroup'
It is sometimes useful to make inputs to an SQL statement
optinal. Doing so can be difficult, because not only must the
test be inserted conditionally, but SQL boolean operators may or
may not need to be inserted depending on whether other, possibly
optional, comparisons have been done. The 'sqlgroup' tag
automates the conditional insertion of boolean operators.
The 'sqlgroup' tag is a block tag that has no attributes. It can
have any number of 'and' and 'or' continuation tags.
Suppose we want to find all people with a given first or nick name
and optionally constrain the search by city and minimum and
maximum age. Suppose we want all inputs to be optional. We can
use DTML source like the following::
<dtml-sqlgroup>
<dtml-sqlgroup>
<dtml-sqltest name column=nick_name type=nb multiple optional>
<dtml-or>
<dtml-sqltest name column=first_name type=nb multiple optional>
</dtml-sqlgroup>
<dtml-and>
<dtml-sqltest home_town type=nb optional>
<dtml-and>
<dtml-if minimum_age>
age >= <dtml-sqlvar minimum_age type=int>
</dtml-if>
<dtml-and>
<dtml-if maximum_age>
age <= <dtml-sqlvar maximum_age type=int>
</dtml-if>
</dtml-sqlgroup>
This example illustrates how groups can be nested to control
boolean evaluation order. It also illustrates that the grouping
facility can also be used with other DTML tags like 'if' tags.
The 'sqlgroup' tag checks to see if text to be inserted contains
other than whitespace characters. If it does, then it is inserted
with the appropriate boolean operator, as indicated by use of an
'and' or 'or' tag, otherwise, no text is inserted.
'''
__rcs_id__='$Id$'
############################################################################
# Copyright
#
# Copyright 1996 Digital Creations, L.C., 910 Princess Anne
# Street, Suite 300, Fredericksburg, Virginia 22401 U.S.A. All
# rights reserved.
#
############################################################################
__version__='$Revision: 1.5 $'[11:-2]
import sys
from DocumentTemplate.DT_Util import ParseError, parse_params, name_param
str=__builtins__['str']
from string import find, split, join, atoi, atof
from types import ListType, TupleType, StringType
class SQLTest:
name='sqltest'
optional=multiple=None
def __init__(self, args):
args = parse_params(args, name='', expr='', type=None, column=None,
multiple=1, optional=1, op=None)
name,expr = name_param(args,'sqlvar',1)
if expr is None:
expr=name
else: expr=expr.eval
self.__name__, self.expr = name, expr
self.args=args
if not args.has_key('type'):
raise ParseError, ('the type attribute is required', 'sqltest')
self.type=t=args['type']
if not valid_type(t):
raise ParseError, ('invalid type, %s' % t, 'sqltest')
if args.has_key('optional'): self.optional=args['optional']
if args.has_key('multiple'): self.multiple=args['multiple']
if args.has_key('column'):
self.column=args['column']
elif self.__name__ is None:
err = ' the column attribute is required if an expression is used'
raise ParseError, (err, 'sqltest')
else:
self.column=self.__name__
# Deal with optional operator specification
op = '=' # Default
if args.has_key('op'):
op = args['op']
# Try to get it from the chart, otherwise use the one provided
op = comparison_operators.get(op, op)
self.op = op
def render(self, md):
name=self.__name__
t=self.type
args=self.args
try:
expr=self.expr
if type(expr) is type(''):
v=md[expr]
else:
v=expr(md)
except KeyError:
if args.has_key('optional') and args['optional']:
return ''
raise ValueError, 'Missing input variable, <em>%s</em>' % name
if type(v) in (ListType, TupleType):
if len(v) > 1 and not self.multiple:
raise ValueError, (
'multiple values are not allowed for <em>%s</em>'
% name)
else: v=[v]
vs=[]
for v in v:
if not v and type(v) is StringType and t != 'string': continue
if t=='int':
try:
if type(v) is StringType:
if v[-1:]=='L':
v=v[:-1]
atoi(v)
else: v=str(int(v))
except ValueError:
raise ValueError, (
'Invalid integer value for <em>%s</em>' % name)
elif t=='float':
if not v and type(v) is StringType: continue
try:
if type(v) is StringType: atof(v)
else: v=str(float(v))
except ValueError:
raise ValueError, (
'Invalid floating-point value for <em>%s</em>' % name)
else:
if not isinstance(v, (str, unicode)):
v = str(v)
v=md.getitem('sql_quote__',0)(v)
#if find(v,"\'") >= 0: v=join(split(v,"\'"),"''")
#v="'%s'" % v
vs.append(v)
if not vs and t=='nb':
if args.has_key('optional') and args['optional']:
return ''
else:
err = 'Invalid empty string value for <em>%s</em>' % name
raise ValueError, err
if not vs:
if self.optional: return ''
raise ValueError, (
'No input was provided for <em>%s</em>' % name)
if len(vs) > 1:
vs=join(map(str,vs),', ')
if self.op == '<>':
## Do the equivalent of 'not-equal' for a list,
## "a not in (b,c)"
return "%s not in (%s)" % (self.column, vs)
else:
## "a in (b,c)"
return "%s in (%s)" % (self.column, vs)
return "%s %s %s" % (self.column, self.op, vs[0])
__call__=render
valid_type={'int':1, 'float':1, 'string':1, 'nb': 1}.has_key
comparison_operators = { 'eq': '=', 'ne': '<>',
'lt': '<', 'le': '<=', 'lte': '<=',
'gt': '>', 'ge': '>=', 'gte': '>=' }
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
'''Inserting values with the 'sqlvar' tag
The 'sqlvar' tag is used to type-safely insert values into SQL
text. The 'sqlvar' tag is similar to the 'var' tag, except that
it replaces text formatting parameters with SQL type information.
The sqlvar tag has the following attributes:
name -- The name of the variable to insert. As with other
DTML tags, the 'name=' prefix may be, and usually is,
ommitted.
type -- The data type of the value to be inserted. This
attribute is required and may be one of 'string',
'int', 'float', or 'nb'. The 'nb' data type indicates a
string that must have a length that is greater than 0.
optional -- A flag indicating that a value is optional. If a
value is optional and is not provided (or is blank
when a non-blank value is expected), then the string
'null' is inserted.
For example, given the tag::
<dtml-sqlvar x type=nb optional>
if the value of 'x' is::
Let\'s do it
then the text inserted is:
'Let''s do it'
however, if x is ommitted or an empty string, then the value
inserted is 'null'.
'''
__rcs_id__='$Id$'
############################################################################
# Copyright
#
# Copyright 1996 Digital Creations, L.C., 910 Princess Anne
# Street, Suite 300, Fredericksburg, Virginia 22401 U.S.A. All
# rights reserved.
#
############################################################################
__version__='$Revision: 1.15 $'[11:-2]
from DocumentTemplate.DT_Util import ParseError, parse_params, name_param
from string import find, split, join, atoi, atof
StringType = str
str = __builtins__['str']
class SQLVar:
name='sqlvar'
def __init__(self, args):
args = parse_params(args, name='', expr='', type=None, optional=1)
name,expr=name_param(args,'sqlvar',1)
if expr is None: expr=name
else: expr=expr.eval
self.__name__, self.expr = name, expr
self.args=args
if not args.has_key('type'):
raise ParseError('the type attribute is required', 'dtvar')
t=args['type']
if not valid_type(t):
raise ParseError('invalid type, %s' % t, 'dtvar')
def render(self, md):
name=self.__name__
args=self.args
t=args['type']
try:
expr=self.expr
if type(expr) is type(''): v=md[expr]
else: v=expr(md)
except:
if args.has_key('optional') and args['optional']:
return 'null'
if type(expr) is not type(''):
raise
raise ValueError, 'Missing input variable, <em>%s</em>' % name
if v is None:
return 'null'
if t=='int':
try:
if type(v) is StringType:
if v[-1:]=='L':
v=v[:-1]
atoi(v)
else: v=str(int(v))
except:
if not v and args.has_key('optional') and args['optional']:
return 'null'
raise ValueError, (
'Invalid integer value for <em>%s</em>' % name)
elif t=='float':
try:
if type(v) is StringType:
if v[-1:]=='L':
v=v[:-1]
atof(v)
else: v=str(float(v))
except:
if not v and args.has_key('optional') and args['optional']:
return 'null'
raise ValueError, (
'Invalid floating-point value for <em>%s</em>' % name)
else:
if not isinstance(v, (str, unicode)):
v=str(v)
if not v and t=='nb':
if args.has_key('optional') and args['optional']:
return 'null'
else:
raise ValueError, (
'Invalid empty string value for <em>%s</em>' % name)
v=md.getitem('sql_quote__',0)(v)
#if find(v,"\'") >= 0: v=join(split(v,"\'"),"''")
#v="'%s'" % v
return v
__call__=render
valid_type={'int':1, 'float':1, 'string':1, 'nb': 1}.has_key
from unittest import TestCase, TestSuite, makeSuite, main
from cStringIO import StringIO
from ExtensionClass import Base
from Shared.DC.ZRDB.Results import Results
from Shared.DC.ZRDB import RDB
class Brain:
def __init__(self, *args): pass
Parent = Base()
class TestResults(TestCase):
def test_results(self):
r = Results(([{'name':'foo', 'type':'integer'},
{'name':'bar', 'type':'integer'}],
((1, 2), (3, 4))),
brains=Brain,
parent=Parent)
self.assertEquals(len(r), 2)
row = r[0]
self.assertEquals(row[0], 1)
self.assertEquals(row[1], 2)
self.assertEquals(row.foo, 1)
self.assertEquals(row.bar, 2)
self.assertEquals(row.FOO, 1)
self.assertEquals(row.BAR, 2)
row = r[1]
self.assertEquals(row[0], 3)
self.assertEquals(row[1], 4)
self.assertEquals(row.foo, 3)
self.assertEquals(row.bar, 4)
self.assertEquals(row.FOO, 3)
self.assertEquals(row.BAR, 4)
self.failUnless(isinstance(row, Brain))
def test_rdb_file(self):
infile = StringIO("""\
foo\tbar
2i\t2i
1\t2
3\t4\
""")
r = RDB.File(infile,
brains=Brain,
parent=Parent)
self.assertEquals(len(r), 2)
row = r[0]
self.assertEquals(row[0], 1)
self.assertEquals(row[1], 2)
self.assertEquals(row.foo, 1)
self.assertEquals(row.bar, 2)
self.assertEquals(row.FOO, 1)
self.assertEquals(row.BAR, 2)
row = r[1]
self.assertEquals(row[0], 3)
self.assertEquals(row[1], 4)
self.assertEquals(row.foo, 3)
self.assertEquals(row.bar, 4)
self.assertEquals(row.FOO, 3)
self.assertEquals(row.BAR, 4)
self.failUnless(isinstance(row, Brain))
def test_suite():
return TestSuite((makeSuite(TestResults),))
if __name__ == '__main__':
main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2010 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from unittest import TestCase, TestSuite, makeSuite
from Shared.DC.ZRDB.TM import TM
class TestTM(TestCase):
def test_sortKey(self):
tm = TM()
# the default Transaction Manager should have .sortKey() of 1 for
# backward compatibility
self.assertEquals(tm.sortKey(), 1)
# but the sortKey() should be adjustable
tm.setSortKey(())
self.assertEquals(tm.sortKey(), ())
def test_suite():
return TestSuite((makeSuite(TestTM),))
from pprint import pprint
from time import time
from unittest import TestCase,TestSuite,makeSuite
class DummyDB:
conn_num = 1
result = None
def query(self,query,max_rows):
if self.result:
return self.result
return 'result for ' + query
def hook_method(self):
conn_to_use = 'conn'+str(self.conn_num)
self.conn_num += 1
return conn_to_use
class DummyTime:
def __init__(self,t):
self.t = float(t)
def __call__(self):
return self.t
class TestCaching(TestCase):
echo = False
def setUp(self):
from Shared.DC.ZRDB import DA
self.DA = DA
self.da = DA.DA('da','title','conn_id','arg1 arg2','some sql')
# set the da's caching parameters
self.da.cache_time_ = 10
self.da.max_cache_ = 2
def _do_query(self,query,t):
try:
self.DA.time = DummyTime(t)
result = self.da._cached_result(DummyDB(),query,1,'conn_id')
finally:
self.DA.time = time
self.assertEqual(result,'result for '+query)
def _check_mapping(self,expected,actual):
missing = []
extra = []
different = []
for key,value in expected.items():
try:
ai = actual[key]
except KeyError:
missing.append(key)
else:
if ai!=value:
different.append('%r: %r != %r' % (key,value,ai))
for key in actual.keys():
try:
expected[key]
except KeyError:
extra.append(key)
result = []
if different:
different.sort()
result.append("Mismatching, key: (expected != actual):")
for r in different:
result.append(r)
if missing:
missing.sort()
result.append("The following keys were missing from actual:")
for r in missing:
result.append(repr(r))
if extra:
extra.sort()
result.append("The following extra keys were found in actual:")
for r in extra:
result.append(repr(r))
return result
def _check_cache(self,cache,tcache):
if self.echo:
print "cache:"
pprint(self.da._v_cache[0])
print "tcache:"
pprint(self.da._v_cache[1])
print
result = []
r = self._check_mapping(cache,self.da._v_cache[0])
if r:
result.append("cache didn't match expected:")
result.extend(r)
r = self._check_mapping(tcache,self.da._v_cache[1])
if r:
result.append("tcache didn't match expected:")
result.extend(r)
if result:
self.fail('\n\n'+'\n'.join(result))
def test_bad_aquisition(self):
# checks that something called _v_cache isn't acquired from anywhere
from ExtensionClass import Base
class Dummy(Base):
_v_cache = 'muhahaha'
obj = Dummy()
self.da = self.da.__of__(obj)
del self.da._v_cache
self._do_query('query',1)
def test_same_query_different_seconds(self):
# this tests a sequence of requests for the same
# query, but where the item returned is always in the cache
self._check_cache({},{})
for t in range(1,6):
self._do_query('query',t)
self._check_cache(
{('query',1,'conn_id'): (1,'result for query')},
{1: ('query',1,'conn_id')}
)
def test_same_query_same_second(self):
# this tests a sequence set of requests for the same
# query, but where the item returned is always in the cache
# and where the queries all occur in the same second
self._check_cache({},{})
for t in range(11,16,1):
t = float(t)/10
self._do_query('query',t)
self._check_cache(
{('query',1,'conn_id'): (1.1,'result for query')},
{1.1: ('query',1,'conn_id')}
)
def test_different_queries_different_second(self):
# This tests different queries being fired into the cache
# in sufficient volume to excercise the purging code
self._check_cache({},{})
# one
self._do_query('query1',1.1)
self._check_cache(
{('query1',1,'conn_id'): (1.1,'result for query1')},
{1.1: ('query1',1,'conn_id')}
)
# two
self._do_query( 'query2',3.2)
self._check_cache(
{('query1',1,'conn_id'): (1.1,'result for query1'),
('query2',1,'conn_id'): (3.2,'result for query2'),},
{1.1: ('query1',1,'conn_id'),
3.2: ('query2',1,'conn_id'),}
)
# three - now we drop our first cache entry
self._do_query('query3',4.3)
self._check_cache(
{('query2',1,'conn_id'): (3.2,'result for query2'),
('query3',1,'conn_id'): (4.3,'result for query3'),},
{3.2: ('query2',1,'conn_id'),
4.3: ('query3',1,'conn_id'),}
)
# four - now we drop our second cache entry
self._do_query('query4',8.4)
self._check_cache(
{('query3',1,'conn_id'): (4.3,'result for query3'),
('query4',1,'conn_id'): (8.4,'result for query4'),},
{4.3: ('query3',1,'conn_id'),
8.4: ('query4',1,'conn_id'),}
)
def test_different_queries_same_second(self):
# This tests different queries being fired into the cache
# in the same second in sufficient quantities to exercise
# the purging code
self._check_cache({},{})
# one
self._do_query('query1',1.0)
self._check_cache(
{('query1',1,'conn_id'): (1.0,'result for query1')},
{1.0: ('query1',1,'conn_id')}
)
# two
self._do_query( 'query2',1.1)
self._check_cache(
{('query1',1,'conn_id'): (1.0,'result for query1'),
('query2',1,'conn_id'): (1.1,'result for query2'),},
{1.0: ('query1',1,'conn_id'),
1.1: ('query2',1,'conn_id'),}
)
# three - now we drop our first cache entry
self._do_query('query3',1.2)
self._check_cache(
{('query2',1,'conn_id'): (1.1,'result for query2'),
('query3',1,'conn_id'): (1.2,'result for query3'),},
{1.1: ('query2',1,'conn_id'),
1.2: ('query3',1,'conn_id'),}
)
# four - now we drop another cache entry
self._do_query('query4',1.3)
self._check_cache(
{('query3',1,'conn_id'): (1.2,'result for query3'),
('query4',1,'conn_id'): (1.3,'result for query4'),},
{1.2: ('query3',1,'conn_id'),
1.3: ('query4',1,'conn_id'),}
)
def test_time_tcache_expires(self):
# This tests that once the cache purging code is triggered,
# it will actively hoover out all expired cache entries
# the first query gets cached
self._do_query('query1',1)
self._check_cache(
{('query1',1,'conn_id'): (1,'result for query1')},
{1: ('query1',1,'conn_id')}
)
# the 2nd gets cached, the cache is still smaller than max_cache
self._do_query('query2',2)
self._check_cache(
{('query1',1,'conn_id'): (1,'result for query1'),
('query2',1,'conn_id'): (2,'result for query2')},
{1: ('query1',1,'conn_id'),
2:('query2',1,'conn_id')}
)
# the 3rd trips the max_cache trigger, so both our old queries get
# dumped because they are past their expiration time
self._do_query('query',23)
self._check_cache(
{('query',1,'conn_id'): (23,'result for query')},
{23:('query',1,'conn_id')}
)
def test_time_refreshed_cache(self):
# This tests that when a cached query is expired when it comes
# to check for a cached entry for that query, the stale entry is
# removed and replaced with a fresh entry.
# the first query gets cached
self._do_query('query1',1)
self._check_cache(
{('query1',1,'conn_id'): (1,'result for query1')},
{1: ('query1',1,'conn_id')}
)
# do the same query much later, so new one gets cached
self._do_query('query1',12)
self._check_cache(
{('query1',1,'conn_id'): (12,'result for query1')},
{12: ('query1',1,'conn_id')}
)
class DummyDA:
def __call__(self):
conn = DummyDB()
conn.result = ((),())
return conn
sql_quote__ = "I don't know what this is."
class Hook:
conn_num = 1
def __call__(self):
conn_to_use = 'conn'+str(self.conn_num)
self.conn_num += 1
return conn_to_use
class TestCacheKeys(TestCase):
# These tests check that the keys used for caching are unique
# in the right ways.
def _cached_result(self,DB__,query,row_count,conn_id):
self.cache_key = query,row_count,conn_id
# we return something that can be safely turned into an empty Result
return ((),())
def setUp(self):
from Shared.DC.ZRDB.DA import DA
self.da = DA('da','title','conn_id','arg1 arg2','some sql')
self.da._cached_result = self._cached_result
self.da.conn_id = DummyDA()
# These need to be set so DA.__call__ tries for a cached result
self.da.cache_time_ = 1
self.da.max_cache_ = 1
def test_default(self):
self.da()
self.assertEqual(self.cache_key,('some sql',1000,'conn_id'))
def test_different_max_rows(self):
self.da.max_rows_ = 123
self.da()
self.assertEqual(self.cache_key,('some sql',123,'conn_id'))
def test_connection_hook(self):
self.da.connection_hook = 'hook_method'
self.da.hook_method = Hook()
self.da.conn1 = DummyDA()
self.da()
self.assertEqual(self.cache_key,('some sql',1000,'conn1'))
self.da.conn2 = DummyDA()
self.da()
self.assertEqual(self.cache_key,('some sql',1000,'conn2'))
class TestFullChain(TestCase):
# This exercises both DA.__call__ and DA._cached_result.
def setUp(self):
from Shared.DC.ZRDB.DA import DA
self.da = DA('da','title','conn_id','arg1 arg2','some sql')
self.da.conn_id = DummyDA()
def test_args_match(self):
# This checks is that DA._cached_result's call signature
# matches that expected by DA.__call__
# These need to be set so DA.__call__ tries for a cached result
self.da.cache_time_ = 1
self.da.max_cache_ = 1
# the actual test, will throw exceptions if things aren't right
self.da()
def test_cached_result_not_called_for_no_caching(self):
# blow up the _cached_result method on our
# test instance
self.da._cached_result = None
# check we never get there with the default "no cachine"
self.da()
# turn caching on
self.da.cache_time_ = 1
self.da.max_cache_ = 1
# check that we get an exception
self.assertRaises(TypeError,self.da)
def test_suite():
suite = TestSuite()
suite.addTest(makeSuite(TestCaching))
suite.addTest(makeSuite(TestCacheKeys))
suite.addTest(makeSuite(TestFullChain))
return suite
""" Unit tests for Products.RHGDelivery.simpleresults
$Id: test_results.py,v 1.2 2005/09/07 21:25:47 tseaver Exp $
"""
import unittest
from ExtensionClass import Base
from Acquisition import aq_parent
class Brain:
def __init__(self, *args): pass
Parent = Base()
class TestResults(unittest.TestCase):
# test fixtures
columns = [ {'name' : 'string', 'type' : 't', 'width':1},
{'name':'int', 'type': 'i'} ]
data = [['string1', 1], ['string2', 2]]
def _getTargetClass(self):
from Shared.DC.ZRDB.Results import Results
return Results
def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)
def test_searchable_result_columns(self):
ob = self._makeOne((self.columns, self.data))
self.assertEqual(ob._searchable_result_columns(), self.columns)
def test_names(self):
ob = self._makeOne((self.columns, self.data))
self.assertEqual(ob.names(), ['string', 'int'])
def test_data_dictionary(self):
ob = self._makeOne((self.columns, self.data))
self.assertEqual(
ob.data_dictionary(),
{ 'string':{'name' : 'string', 'type' : 't', 'width':1},
'int':{'name':'int', 'type': 'i'} }
)
def test_len(self):
ob = self._makeOne((self.columns, self.data))
self.assertEqual(len(ob), 2)
def test_getitem(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
self.assertEquals(row[0], 'string1')
self.assertEquals(row[1], 1)
row = ob[1]
self.assertEquals(row[0], 'string2')
self.assertEquals(row[1], 2)
def test_getattr_and_aliases(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
self.assertEqual(row.string, 'string1')
self.assertEqual(row.int, 1)
self.assertEqual(row.STRING, 'string1')
self.assertEqual(row.INT, 1)
row = ob[1]
self.assertEqual(row.string, 'string2')
self.assertEqual(row.int, 2)
self.assertEqual(row.STRING, 'string2')
self.assertEqual(row.INT, 2)
def test_suppliedbrain(self):
ob = self._makeOne((self.columns, self.data), brains=Brain)
row = ob[0]
self.failUnless(isinstance(row, Brain))
def test_suppliedparent(self):
ob = self._makeOne((self.columns, self.data), parent=Parent)
row = ob[0]
self.failUnless(aq_parent(row) is Parent)
def test_tuples(self):
ob = self._makeOne((self.columns, self.data))
tuples = ob.tuples()
self.assertEqual( tuples, [('string1', 1), ('string2', 2)] )
def test_dictionaries(self):
ob = self._makeOne((self.columns, self.data))
dicts = ob.dictionaries()
self.assertEqual( dicts, [{'string':'string1', 'int':1},
{'string':'string2', 'int':2}] )
def test_asRDB(self):
ob = self._makeOne((self.columns, self.data))
asrdb = ob.asRDB()
columns = ['string\tint', '1t\ti', 'string1\t1', 'string2\t2\n']
self.assertEqual(asrdb, '\n'.join(columns))
def _set_noschema(self, row):
row.cantdoit = 1
def test_recordschema(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
self.assertEqual(row.__record_schema__, {'string':0, 'int':1})
self.assertRaises(AttributeError, self._set_noschema, row)
def test_record_as_read_mapping(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
self.assertEqual('%(string)s %(int)s' % row, 'string1 1')
row = ob[1]
self.assertEqual('%(string)s %(int)s' % row, 'string2 2')
def test_record_as_write_mapping(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
row['int'] = 5
self.assertEqual('%(string)s %(int)s' % row, 'string1 5')
def test_record_as_write_mapping2(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
row.int = 5
self.assertEqual('%(string)s %(int)s' % row, 'string1 5')
def test_record_as_sequence(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
self.assertEqual(row[0], 'string1')
self.assertEqual(row[1], 1)
self.assertEqual(list(row), ['string1', 1])
row = ob[1]
self.assertEqual(row[0], 'string2')
self.assertEqual(row[1], 2)
self.assertEqual(list(row), ['string2', 2])
def test_record_of(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
wrapped = row.__of__(Parent)
self.assertEqual(wrapped.aq_self, row)
self.assertEqual(wrapped.aq_parent, Parent)
def test_record_hash(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
self.assert_(isinstance(hash(row), int))
def test_record_len(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
self.assertEqual(len(row), 2)
def _add(self, row1, row2):
return row1 + row2
def test_record_add(self):
ob = self._makeOne((self.columns, self.data))
row1 = ob[0]
row2 = ob[1]
self.assertRaises(TypeError, self._add, row1, row2)
def _slice(self, row):
return row[1:]
def test_record_slice(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
self.assertRaises(TypeError, self._slice, row)
def _mul(self, row):
return row * 3
def test_record_mul(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
self.assertRaises(TypeError, self._mul, row)
def _del(self, row):
del row[0]
def test_record_delitem(self):
ob = self._makeOne((self.columns, self.data))
row = ob[0]
self.assertRaises(TypeError, self._del, row)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestResults))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2005 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import unittest
from UserDict import UserDict
def _sql_quote(v):
return '"%s"' % v
class SQLGroupTests(unittest.TestCase):
def _getTargetClass(self):
from Shared.DC.ZRDB.sqlgroup import SQLGroup
return SQLGroup
def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)
def test_ctor_empty_args(self):
group = self._makeOne([('sqlgroup', '', None)])
self.assertEqual(group.__name__, 'sqlgroup ')
self.failIf(group.required)
self.failIf(group.where)
self.failIf(group.set)
self.failIf(group.noparens)
def test_ctor_required(self):
group = self._makeOne([('sqlgroup', 'required', None)])
self.assertEqual(group.__name__, 'sqlgroup required')
self.failUnless(group.required)
self.failIf(group.where)
self.failIf(group.set)
self.failIf(group.noparens)
def test_ctor_where(self):
group = self._makeOne([('sqlgroup', 'where', None)])
self.assertEqual(group.__name__, 'sqlgroup where')
self.failIf(group.required)
self.failUnless(group.where)
self.failIf(group.set)
self.failIf(group.noparens)
def test_ctor_noparens(self):
group = self._makeOne([('sqlgroup', 'noparens', None)])
self.assertEqual(group.__name__, 'sqlgroup noparens')
self.failIf(group.required)
self.failIf(group.where)
self.failIf(group.set)
self.failUnless(group.noparens)
def test_ctor_set(self):
group = self._makeOne([('sqlgroup', 'set', None)])
self.assertEqual(group.__name__, 'sqlgroup set')
self.failIf(group.required)
self.failIf(group.where)
self.failUnless(group.set)
self.failIf(group.noparens)
def test_render_empty_optional(self):
group = self._makeOne([('sqlgroup', '', lambda x, y:'')])
md = {}
self.assertEqual(group.render(md), '')
def test_render_empty_optional_where(self):
group = self._makeOne([('sqlgroup', 'where', lambda x, y:'')])
md = {}
self.assertEqual(group.render(md), '')
def test_render_empty_optional_set(self):
group = self._makeOne([('sqlgroup', 'set', lambda x, y:'')])
md = {}
self.assertEqual(group.render(md), '')
def test_render_empty_required_raises_ValueError(self):
group = self._makeOne([('sqlgroup', 'required', lambda x, y:'')])
md = {}
self.assertRaises(ValueError, group.render, md)
def test_render_one_block(self):
group = self._makeOne([('sqlgroup', '', lambda x, y:'abc'),
])
md = {}
rendered = group.render(md)
rendered = ''.join(rendered.split('\n'))
self.assertEqual(rendered, 'abc')
def test_render_one_block_where(self):
group = self._makeOne([('sqlgroup', 'where', lambda x, y:'abc'),
])
md = {}
rendered = group.render(md)
self.assertEqual(rendered, 'where\nabc\n')
def test_render_one_block_set(self):
group = self._makeOne([('sqlgroup', 'set', lambda x, y:'abc'),
])
md = {}
rendered = group.render(md)
self.assertEqual(rendered, 'set\nabc\n')
def test_render_multiple_blocks_with_tname(self):
group = self._makeOne([('sqlgroup', '', lambda x, y:'abc'),
('baz', '', lambda x, y: 'def'),
('qux', '', lambda x, y: 'ghi'),
])
md = {}
rendered = group.render(md)
rendered = ''.join(rendered.split('\n'))
self.assertEqual(rendered, '(abc baz def qux ghi)')
def test_render_multiple_blocks_with_tname_noparens(self):
group = self._makeOne([('sqlgroup', 'noparens', lambda x, y:'abc'),
('baz', '', lambda x, y: 'def'),
('qux', '', lambda x, y: 'ghi'),
])
md = {}
rendered = group.render(md)
rendered = ''.join(rendered.split('\n'))
self.assertEqual(rendered, 'abc baz def qux ghi')
def test_render_multiple_blocks_with_tname_and_where(self):
group = self._makeOne([('sqlgroup', 'where', lambda x, y:'abc'),
('baz', '', lambda x, y: 'def'),
('qux', '', lambda x, y: 'ghi'),
])
md = {}
rendered = group.render(md)
rendered = ''.join(rendered.split('\n'))
self.assertEqual(rendered, 'where(abc baz def qux ghi)')
def test_parsed_rendered_complex_where(self):
# something of a functional test, as we use nvSQL to get parsed.
from Shared.DC.ZRDB.DA import nvSQL
template = nvSQL(WHERE_EXAMPLE)
mapping = {}
mapping['name'] = 'Goofy'
mapping['home_town'] = 'Orlando'
mapping['sql_quote__'] = _sql_quote
rendered = template(None, mapping)
self.assertEqual(rendered,
'select * from actors\n'
'where\n'
'((nick_name = "Goofy"\n'
' or first_name = "Goofy"\n)\n'
' and home_town = "Orlando"\n)\n'
)
def test_parsed_rendered_complex_set(self):
# something of a functional test, as we use nvSQL to get parsed.
from Shared.DC.ZRDB.DA import nvSQL
template = nvSQL(UPDATE_EXAMPLE)
mapping = {}
mapping['nick_name'] = 'Goofy'
mapping['home_town'] = 'Orlando'
mapping['sql_quote__'] = _sql_quote
rendered = template(None, mapping)
self.assertEqual(rendered,
'update actors\n'
'set\nnick_name = "Goofy" , home_town = "Orlando"\n'
)
WHERE_EXAMPLE = """\
select * from actors
<dtml-sqlgroup where required>
<dtml-sqlgroup>
<dtml-sqltest name column=nick_name type=nb multiple optional>
<dtml-or>
<dtml-sqltest name column=first_name type=nb multiple optional>
</dtml-sqlgroup>
<dtml-and>
<dtml-sqltest home_town type=nb optional>
<dtml-and>
<dtml-if minimum_age>
age >= <dtml-sqlvar minimum_age type=int>
</dtml-if>
<dtml-and>
<dtml-if maximum_age>
age <= <dtml-sqlvar maximum_age type=int>
</dtml-if>
</dtml-sqlgroup>
"""
UPDATE_EXAMPLE = """\
update actors
<dtml-sqlgroup set noparens>
<dtml-sqltest nick_name type=nb optional>
<dtml-comma>
<dtml-sqltest home_town type=nb optional>
</dtml-sqlgroup>
"""
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(SQLGroupTests))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2005 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import unittest
from UserDict import UserDict
def _sql_quote(v):
return '"%s"' % v
class FauxMultiDict(UserDict):
def getitem(self, key, call):
if key == 'sql_quote__':
return _sql_quote
v = self[key]
if v is not None:
if call and callable(v):
v = v()
return v
class SQLVarTests(unittest.TestCase):
def _getTargetClass(self):
from Shared.DC.ZRDB.sqlvar import SQLVar
return SQLVar
def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)
def test_constructor_no_type(self):
from DocumentTemplate.DT_Util import ParseError
self.assertRaises(ParseError, self._makeOne, 'foo')
def test_constructor_invalid_type(self):
from DocumentTemplate.DT_Util import ParseError
self.assertRaises(ParseError, self._makeOne, 'foo type="nonesuch"')
def test_constructor_valid_type(self):
from DocumentTemplate.DT_Util import ParseError
v = self._makeOne('foo type="string"')
self.assertEqual(v.__name__, 'foo')
self.assertEqual(v.expr, 'foo')
self.assertEqual(v.args['type'], 'string')
def test_render_name_returns_value(self):
v = self._makeOne('foo type="string"')
self.assertEqual(v.render(FauxMultiDict(foo='FOO')), '"FOO"')
def test_render_name_missing_required_raises_ValueError(self):
v = self._makeOne('foo type="string"')
self.assertRaises(ValueError, v.render, FauxMultiDict())
def test_render_name_missing_optional_returns_null(self):
v = self._makeOne('foo type="string" optional')
self.assertEqual(v.render(FauxMultiDict()), 'null')
def test_render_expr_returns_value(self):
v = self._makeOne('expr="foo" type="string"')
self.assertEqual(v.render(FauxMultiDict(foo='FOO')), '"FOO"')
def test_render_expr_missing_required_raises_NameError(self):
v = self._makeOne('expr="foo" type="string"')
self.assertRaises(NameError, v.render, FauxMultiDict())
def test_render_expr_missing_optional_returns_null(self):
v = self._makeOne('expr="foo" type="string" optional')
self.assertEqual(v.render(FauxMultiDict()), 'null')
def test_render_int_returns_int_without_quoting(self):
v = self._makeOne('expr="foo" type="int"')
self.assertEqual(v.render(FauxMultiDict(foo=42)), '42')
def test_render_int_with_long_returns_value_without_L(self):
v = self._makeOne('expr="foo" type="int"')
self.assertEqual(v.render(FauxMultiDict(foo='42L')), '42')
def test_render_int_required_invalid_raises_ValueError(self):
v = self._makeOne('expr="foo" type="int"')
self.assertRaises(ValueError, v.render, FauxMultiDict(foo=''))
def test_render_int_optional_invalid_returns_null(self):
v = self._makeOne('expr="foo" type="int" optional')
self.assertEqual(v.render(FauxMultiDict(foo='')), 'null')
def test_render_float_returns_float_without_quoting(self):
v = self._makeOne('expr="foo" type="float"')
self.assertEqual(v.render(FauxMultiDict(foo=3.1415)), '3.1415')
def test_render_float_with_long_returns_value_without_L(self):
v = self._makeOne('expr="foo" type="float"')
self.assertEqual(v.render(FauxMultiDict(foo='42L')), '42')
def test_render_float_required_invalid_raises_ValueError(self):
v = self._makeOne('expr="foo" type="float"')
self.assertRaises(ValueError, v.render, FauxMultiDict(foo=''))
def test_render_float_optional_invalid_returns_null(self):
v = self._makeOne('expr="foo" type="float" optional')
self.assertEqual(v.render(FauxMultiDict(foo='')), 'null')
def test_render_nb_required_with_blank_raises_ValueError(self):
v = self._makeOne('expr="foo" type="nb"')
self.assertRaises(ValueError, v.render, FauxMultiDict(foo=''))
def test_render_nb_optional_with_blank_returns_null(self):
v = self._makeOne('expr="foo" type="nb" optional')
self.assertEqual(v.render(FauxMultiDict(foo='')), 'null')
def test_render_name_with_none_returns_null(self):
# Collector #556, patch from Dieter Maurer
v = self._makeOne('foo type="string"')
self.assertEqual(v.render(FauxMultiDict(foo=None)), 'null')
def test_render_expr_with_none_returns_null(self):
# Collector #556, patch from Dieter Maurer
v = self._makeOne('expr="foo" type="string"')
self.assertEqual(v.render(FauxMultiDict(foo=None)), 'null')
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(SQLVarTests))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
...@@ -16,6 +16,7 @@ MultiMapping = 2.13.0 ...@@ -16,6 +16,7 @@ MultiMapping = 2.13.0
nt-svcutils = 2.13.0 nt-svcutils = 2.13.0
Persistence = 2.13.2 Persistence = 2.13.2
Products.ZCTextIndex = 2.13.0 Products.ZCTextIndex = 2.13.0
Products.ZSQLMethods =
Record = 2.13.0 Record = 2.13.0
tempstorage = 2.11.3 tempstorage = 2.11.3
zExceptions = 2.13.0 zExceptions = 2.13.0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment