diff --git a/product/ERP5Catalog/tests/testERP5Catalog.py b/product/ERP5Catalog/tests/testERP5Catalog.py index b660b21b9466819c6e086cb348ea46dfa7ed3d13..8a4946d9846398d50b72991fe75a6c726237ab94 100755 --- a/product/ERP5Catalog/tests/testERP5Catalog.py +++ b/product/ERP5Catalog/tests/testERP5Catalog.py @@ -26,14 +26,6 @@ # ############################################################################## - - -# -# Skeleton ZopeTestCase -# - -from random import randint - import os, sys if __name__ == '__main__': execfile(os.path.join(sys.path[0], 'framework.py')) @@ -44,13 +36,8 @@ os.environ['EVENT_LOG_SEVERITY'] = '-300' from Testing import ZopeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase -from AccessControl.SecurityManagement import newSecurityManager, noSecurityManager -from Acquisition import aq_base, aq_inner +from AccessControl.SecurityManagement import newSecurityManager from zLOG import LOG -from Products.ERP5Type.DateUtils import addToDate -import time -import os -from Products.ERP5Type import product_path from DateTime import DateTime try: @@ -496,7 +483,7 @@ class TestERP5Catalog(ERP5TypeTestCase): self.failIf(uid in uid_dict) uid_dict[uid] = None - def test_17_CreationDate_ModificationDate(self, quiet=0, run=1):#run_all_test): + def test_17_CreationDate_ModificationDate(self, quiet=0, run=run_all_test): if not run: return if not quiet: message = 'getCreationDate, getModificationDate' @@ -534,3 +521,106 @@ class TestERP5Catalog(ERP5TypeTestCase): result[0]['modification_date'].ISO()) self.assertEquals(now, result[0]['modification_date'].ISO()) + def test_18_buildSQLQuery(self, quiet=0, run=1) :#run_all_test): + """Tests that buildSQLQuery works with another query_table than 'catalog'""" + if not run: return + if not quiet: + message = 'buildSQLQuery with query_table' + ZopeTestCase._print('\n%s ' % message) + LOG('Testing... ',0,message) + portal = self.getPortal() + portal_catalog = self.getCatalogTool() + # clear catalog + portal_catalog.manage_catalogClear() + get_transaction().commit() + + # create some content to use destination_section_title as related key + # FIXME: create the related key here ? + module = portal.getDefaultModule('Organisation') + source_organisation = module.newContent( portal_type='Organisation', + title = 'source_organisation') + destination_organisation = module.newContent( portal_type='Organisation', + title = 'destination_organisation') + source_organisation.setDestinationSectionValue(destination_organisation) + source_organisation.recursiveReindexObject() + destination_organisation.recursiveReindexObject() + self.tic() + + # buildSQLQuery can use arbitrary table name. + query_table = "node" + sql_squeleton = """ + SELECT %(query_table)s.uid, + %(query_table)s.id + FROM catalog as %(query_table)s + <dtml-in prefix="table" expr="from_table_list"> + , <dtml-var table_item> AS <dtml-var table_key> + </dtml-in> + <dtml-if where_expression> + WHERE + <dtml-var where_expression> + </dtml-if> + <dtml-if order_by_expression> + ORDER BY <dtml-var order_by_expression> + </dtml-if> + """ % {'query_table' : query_table} + + portal_skins_custom = portal.portal_skins.custom + portal_skins_custom.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'testMethod', + title = '', + connection_id = 'erp5_sql_connection', + arguments = "\n".join([ 'from_table_list', + 'where_expression', + 'order_by_expression' ]), + template = sql_squeleton) + testMethod = portal_skins_custom['testMethod'] + + default_parametrs = {} + default_parametrs['portal_type'] = 'Organisation' + default_parametrs['from_table_list'] = {} + default_parametrs['where_expression'] = "" + default_parametrs['order_by_expression'] = None + + # check that we retrieve our 2 organisations by default. + kw = default_parametrs.copy() + kw.update( portal_catalog.buildSQLQuery( + query_table = query_table, + **kw) ) + self.assertEquals(len(testMethod(**kw)), 2) + + # check we can make a simple filter on title. + kw = default_parametrs.copy() + kw.update( portal_catalog.buildSQLQuery( + query_table = query_table, + title = 'source_organisation', + **kw) ) + self.assertEquals( len(testMethod(**kw)), 1, + testMethod(src__=1, **kw) ) + self.assertEquals( testMethod(**kw)[0]['uid'], + source_organisation.getUid(), + testMethod(src__=1, **kw) ) + + # check sort + kw = default_parametrs.copy() + kw.update(portal_catalog.buildSQLQuery( + query_table = query_table, + sort_on = [('id', 'ascending')], + **kw)) + brains = testMethod(**kw) + self.assertEquals( len(brains), 2, + testMethod(src__=1, **kw)) + self.failIf( brains[0]['id'] > brains[1]['id'], + testMethod(src__=1, **kw) ) + + # check related keys works + kw = default_parametrs.copy() + kw.update(portal_catalog.buildSQLQuery( + query_table = query_table, + destination_section_title = 'organisation_destination'), + **kw) + brains = testMethod(**kw) + self.assertEquals( len(brains), 1, testMethod(src__=1, **kw) ) + self.assertEquals( brains[0]['uid'], + source_organisation.getUid(), + testMethod(src__=1, **kw) ) +