Commit d307b9cb authored by Kazuhiko Shiozaki's avatar Kazuhiko Shiozaki

ERP5Catalog: implment getDocumentValueList() in CatalogTool.

parent 6e58cc8f
"""
This script is part of ERP5 Base
The default implementation searches for
documents which are in the user language if any
with same reference.
"""
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
valid_portal_type_list = ('Notification Message',)
# Find the applicable language
if language in (None, ''):
language = portal.Localizer.get_selected_language()
# Find the default language
default_language = portal.Localizer.get_default_language() or 'en'
if validation_state is None:
validation_state = ('validated',)
# Search the catalog for all documents matching the reference
# this will only return documents which are accessible by the user
notification_message_list = portal_catalog(reference=reference,
portal_type=valid_portal_type_list,
validation_state=validation_state,
language=language,
sort_on=[('version', 'descending')],
group_by=('reference',),
**kw)
if len(notification_message_list) == 0 and language != default_language:
# Search again with English as a fallback.
notification_message_list = portal_catalog(reference=reference,
portal_type=valid_portal_type_list,
validation_state=validation_state,
language=default_language,
sort_on=[('version', 'descending')],
group_by=('reference',),
**kw)
if len(notification_message_list) == 0:
# Search again without the language
notification_message_list = portal_catalog(reference=reference,
portal_type=valid_portal_type_list,
validation_state=validation_state,
sort_on=[('version', 'descending')],
group_by=('reference',),
**kw)
if len(notification_message_list) == 0:
# Default returns None
notification_message = None
else:
# Try to get the first page on the list
notification_message = notification_message_list[0]
notification_message = notification_message.getObject()
# return the Notification Message
return notification_message
notification_message_list = portal.portal_catalog.getDocumentValueList(
portal_type='Notification Message',
validation_state=validation_state or 'validated',
reference=reference,
language=language,
all_languages=True,
)
if notification_message_list:
return notification_message_list[0].getObject()
# This script returns a list of document values
return context.WebSection_getDocumentValueListBase(**kw)
# First find the Web Section or Web Site we belong to
search_context = context.getWebSectionValue()
if all_versions is None:
all_versions = search_context.getLayoutProperty('layout_all_versions', default=False)
if all_languages is None:
all_languages = search_context.getLayoutProperty('layout_all_languages', default=False)
return context.getPortalObject().portal_catalog.getDocumentValueList(
search_context=search_context,
all_versions=all_versions,
all_languages=all_languages,
**kw)
......@@ -50,7 +50,7 @@
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>**kw</string> </value>
<value> <string>all_versions=None, all_languages=None, **kw</string> </value>
</item>
<item>
<key> <string>id</string> </key>
......
"""
This script is part of ERP5 Web
ERP5 Web is a business template of ERP5 which provides a way
to create web sites which can display selected
ERP5 contents through multiple custom web layouts.
This script returns a list of document values (ie. objects or brains)
which are considered as part of this section. It can be
a list of web pages (usual case), a list of products
......@@ -40,7 +34,7 @@
SUGGESTIONS:
- Prevent showing duplicate references
- Add documents associated to this section through 'aggregate'.
- Display only the latest version and the appropriate language.
......@@ -49,44 +43,43 @@ from Products.ZSQLCatalog.SQLCatalog import SimpleQuery, ComplexQuery
from zExceptions import Unauthorized
try:
portal = container.getPortalObject()
kw = portal.portal_catalog.getSQLCatalog().getCannonicalArgumentDict(kw)
# First find the Web Section or Web Site we belong to
current_section = context.getWebSectionValue()
if all_versions is None:
all_versions = context.getLayoutProperty('layout_all_versions', default=False)
if all_languages is None:
all_languages = context.getLayoutProperty('layout_all_languages', default=False)
portal = context.getPortalObject()
kw = context.getCannonicalArgumentDict(kw)
if search_context is None:
search_context = portal
# Build the list of parameters
if not language:
language = portal.Localizer.get_selected_language()
if validation_state is None:
if 'portal_type' not in kw:
kw['portal_type'] = portal.getPortalDocumentTypeList()
if 'validation_state' not in kw:
# XXX hardcoded validation state list.
# Use predicate or layout property instead
validation_state = ('released', 'released_alive', 'published',
'published_alive', 'shared', 'shared_alive',
'public', 'validated')
kw['validation_state'] = validation_state
kw['validation_state'] = ('released', 'released_alive', 'published',
'published_alive', 'shared', 'shared_alive',
'public', 'validated')
if 'order_by_list' not in kw:
# XXX Do not sort by default, as it increases query time
kw['order_by_list'] = [('int_index', 'DESC'), ('reference', 'DESC')]
if effective_date is None:
if 'effective_date' not in kw:
if now is None:
now = DateTime()
effective_date = ComplexQuery(
kw['effective_date'] = ComplexQuery(
SimpleQuery(effective_date=None),
SimpleQuery(effective_date=now, comparison_operator='<='),
logical_operator='or',
)
kw['effective_date'] = effective_date
if not all_versions:
if all_versions:
if not all_languages:
kw['language'] = language
return search_context.searchResults(src__=src__, **kw)
else:
group_by_list = set(kw.get('group_by_list', []))
if all_languages:
kw['group_by_list'] = list(group_by_list.union(('reference', 'language')))
......@@ -99,14 +92,11 @@ try:
kw.setdefault('select_dict', {}).update(
(x.replace('.', '_') + '__ext__', x)
for x in extra_column_set if not x.endswith('__score__'))
return current_section.WebSection_zGetDocumentValueList(language=language,
all_languages=all_languages,
src__=src__,
kw=kw)
else:
if not all_languages:
kw['language'] = language
return current_section.searchResults(src__=src__, **kw)
return context.SQLCatalog_zGetDocumentValueList(search_context=search_context,
language=language,
all_languages=all_languages,
src__=src__,
kw=kw)
except Unauthorized:
return []
......@@ -50,11 +50,11 @@
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>language=None, validation_state=None, all_languages=None, all_versions=None, effective_date=None, now=None, src__=0, **kw</string> </value>
<value> <string>search_context=None, language=None, all_languages=None, all_versions=None, now=None, src__=0, **kw</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>WebSection_getDocumentValueListBase</string> </value>
<value> <string>SQLCatalog_getDocumentValueList</string> </value>
</item>
</dictionary>
</pickle>
......
<dtml-let query="buildSQLQuery(query=portal_catalog.getSecurityQuery(**kw), **kw)"
<dtml-let query="getattr(search_context, 'buildSQLQuery', portal_catalog.buildSQLQuery)(query=portal_catalog.getSecurityQuery(**kw), **kw)"
selection_domain="kw.get('selection_domain', None)"
selection_report="kw.get('selection_report', None)"
optimizer_switch_key_list="portal_catalog.getSQLCatalog().getOptimizerSwitchKeyList()">
optimizer_switch_key_list="getOptimizerSwitchKeyList()">
<dtml-comment>
Currently, there is no other choice to implement this method as an SQL catalog until SQLCatalog
......@@ -31,7 +31,7 @@
CONCAT(CASE my_versioning.language
WHEN <dtml-sqlvar language type="string"> THEN '4'
WHEN '' THEN '3'
WHEN 'en' THEN '2'
WHEN <dtml-sqlvar expr="Localizer.get_default_language() or 'en'" type="string"> THEN '2'
ELSE '1' END,
my_versioning.version) AS priority
<dtml-if "query['select_expression']">,<dtml-var "query['select_expression']"></dtml-if>
......
......@@ -22,7 +22,8 @@
</item>
<item>
<key> <string>arguments_src</string> </key>
<value> <string>language\r\n
<value> <string>search_context\r\n
language\r\n
all_languages\r\n
kw</string> </value>
</item>
......@@ -50,7 +51,7 @@ kw</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>WebSection_zGetDocumentValueList</string> </value>
<value> <string>SQLCatalog_zGetDocumentValueList</string> </value>
</item>
<item>
<key> <string>max_cache_</string> </key>
......
erp5_mysql_innodb/SQLCatalog_catalogTransformation
erp5_mysql_innodb/SQLCatalog_catalogTransformationList
erp5_mysql_innodb/SQLCatalog_getDocumentValueList
erp5_mysql_innodb/SQLCatalog_zGetDocumentValueList
erp5_mysql_innodb/SQLCatalog_makeFullTextQuery
erp5_mysql_innodb/SQLCatalog_makeQuickSearchQuery
erp5_mysql_innodb/SQLCatalog_makeSearchTextQuery
......
......@@ -360,7 +360,7 @@ Hé Hé Hé!""", page.asText().strip())
websection.getAggregateReferenceList())
# even though we create many pages we should get only one
# this is the most recent one since all share the same reference
self.assertEqual(1, len(websection.WebSection_getDocumentValueList()))
self.assertEqual(1, len(websection.getDocumentValueList()))
# use already created few pages in different languages with same reference
# and check that we always get the right one based on selected
......@@ -483,7 +483,7 @@ Hé Hé Hé!""", page.asText().strip())
base_url = base_list[0]
self.assertEqual(base_url, "%s/%s/" % (website.absolute_url(), web_page_en.getReference()))
def test_07_WebSection_getDocumentValueList(self):
def test_07_getDocumentValueList(self):
""" Check getting getDocumentValueList from Web Section.
"""
portal = self.getPortal()
......@@ -530,7 +530,7 @@ Hé Hé Hé!""", page.asText().strip())
sequence_count = 0
for sequence in [sequence_one, sequence_two, sequence_three]:
sequence_count += 1
message = '\ntest_07_WebSection_getDocumentValueList (Sequence %s)' \
message = '\ntest_07_getDocumentValueList (Sequence %s)' \
% (sequence_count)
ZopeTestCase._print(message)
......@@ -603,7 +603,7 @@ Hé Hé Hé!""", page.asText().strip())
[w.getLanguage() for w in ja_document_value_list])
# Tests for all_languages parameter (language parameter is simply ignored)
en_document_value_list = websection.WebSection_getDocumentValueListBase(all_languages=1)
en_document_value_list = websection.getDocumentValueList(all_languages=1)
self.assertEqual(13, len(en_document_value_list))
self.assertEqual(4, len([w.getLanguage() for w in en_document_value_list \
if w.getLanguage() == 'en']))
......@@ -612,7 +612,7 @@ Hé Hé Hé!""", page.asText().strip())
self.assertEqual(4, len([w.getLanguage() for w in en_document_value_list \
if w.getLanguage() == 'ja']))
pt_document_value_list = websection.WebSection_getDocumentValueListBase(all_languages=1,
pt_document_value_list = websection.getDocumentValueList(all_languages=1,
language='pt')
self.assertEqual(13, len(pt_document_value_list))
self.assertEqual(4, len([w.getLanguage() for w in pt_document_value_list \
......@@ -623,16 +623,16 @@ Hé Hé Hé!""", page.asText().strip())
if w.getLanguage() == 'ja']))
# Tests for all_languages and all_versions
en_document_value_list = websection.WebSection_getDocumentValueListBase(all_languages=1,
all_versions=1)
en_document_value_list = websection.getDocumentValueList(all_languages=1,
all_versions=1)
pt_document_value_list = websection.WebSection_getDocumentValueListBase(all_languages=1,
all_versions=1,
language='pt')
pt_document_value_list = websection.getDocumentValueList(all_languages=1,
all_versions=1,
language='pt')
ja_document_value_list = websection.WebSection_getDocumentValueListBase(all_languages=1,
all_versions=1,
language='ja')
ja_document_value_list = websection.getDocumentValueList(all_languages=1,
all_versions=1,
language='ja')
for document_value_list in [en_document_value_list, pt_document_value_list,
ja_document_value_list]:
......@@ -664,22 +664,22 @@ Hé Hé Hé!""", page.asText().strip())
self.assertEqual(['A', 'A', 'A', 'A', 'B', 'B', 'B', 'C', 'C', 'C', 'D', 'E', 'F'],
[w.getReference() for w in \
websection.WebSection_getDocumentValueListBase(all_languages=1,
websection.getDocumentValueList(all_languages=1,
sort_on=[('reference', 'ASC')])])
self.assertEqual(['01', '02', '03', '04', '05', '06', '07', '08', '09', '11', '12', '13', '16'],
[w.getTitle() for w in \
websection.WebSection_getDocumentValueListBase(all_languages=1,
websection.getDocumentValueList(all_languages=1,
sort_on=[('title', 'ASC')])])
self.assertEqual(['F', 'E', 'D', 'C', 'C', 'C', 'B', 'B', 'B', 'A', 'A', 'A', 'A'],
[w.getReference() for w in \
websection.WebSection_getDocumentValueListBase(all_languages=1,
websection.getDocumentValueList(all_languages=1,
sort_on=[('reference', 'DESC')])])
self.assertEqual(['16', '13', '12', '11', '09', '08', '07', '06', '05', '04', '03', '02', '01'],
[w.getTitle() for w in \
websection.WebSection_getDocumentValueListBase(all_languages=1,
websection.getDocumentValueList(all_languages=1,
sort_on=[('title', 'DESC')])])
self.web_page_module.manage_delObjects(list(self.web_page_module.objectIds()))
......@@ -1568,7 +1568,7 @@ Hé Hé Hé!""", page.asText().strip())
document = section.WebSection_getDocumentValue(reference, now=date)
self.assertNotEqual(document, None)
self.assertEqual(document.getPath(), expected_document.getPath())
document_list = section.WebSection_getDocumentValueList(now=date)
document_list = section.getDocumentValueList(now=date)
self.assertEqual(len(document_list), 1)
self.assertEqual(document_list[0].getPath(), expected_document.getPath())
# document1 is visible & listed before date2
......@@ -1701,8 +1701,8 @@ class TestERP5WebWithSimpleSecurity(ERP5TypeTestCase):
self.assertSameSet((),
section.get_local_roles_for_userid(person_user_id))
def test_03_WebSection_getDocumentValueListSecurity(self):
""" Test WebSection_getDocumentValueList behaviour and security"""
def test_03_getDocumentValueListSecurity(self):
""" Test getDocumentValueList behaviour and security"""
self.loginByUserName('admin')
site = self.portal.web_site_module.newContent(portal_type='Web Site',
id='site')
......@@ -1753,28 +1753,28 @@ class TestERP5WebWithSimpleSecurity(ERP5TypeTestCase):
self.tic()
self.portal.Localizer.changeLanguage('en')
self.assertEqual(0, len(section.WebSection_getDocumentValueList()))
self.assertEqual(0, len(section.getDocumentValueList()))
self.loginByUserName('erp5user')
page_en_0.publish()
self.tic()
self.portal.Localizer.changeLanguage('en')
self.assertEqual(1, len(section.WebSection_getDocumentValueList()))
self.assertEqual(1, len(section.getDocumentValueList()))
self.assertEqual(page_en_0.getUid(),
section.WebSection_getDocumentValueList()[0].getUid())
section.getDocumentValueList()[0].getUid())
self.portal.Localizer.changeLanguage('jp')
self.assertEqual(0, len(section.WebSection_getDocumentValueList()))
self.assertEqual(0, len(section.getDocumentValueList()))
# By Anonymous
self.logout()
self.portal.Localizer.changeLanguage('en')
self.assertEqual(1, len(section.WebSection_getDocumentValueList()))
self.assertEqual(1, len(section.getDocumentValueList()))
self.assertEqual(page_en_0.getUid(),
section.WebSection_getDocumentValueList()[0].getUid())
section.getDocumentValueList()[0].getUid())
self.portal.Localizer.changeLanguage('jp')
self.assertEqual(0, len(section.WebSection_getDocumentValueList()))
self.assertEqual(0, len(section.getDocumentValueList()))
# Second Object
self.loginByUserName('erp5user')
......@@ -1782,18 +1782,18 @@ class TestERP5WebWithSimpleSecurity(ERP5TypeTestCase):
self.tic()
self.portal.Localizer.changeLanguage('en')
self.assertEqual(1, len(section.WebSection_getDocumentValueList()))
self.assertEqual(1, len(section.getDocumentValueList()))
self.assertEqual(page_en_1.getUid(),
section.WebSection_getDocumentValueList()[0].getUid())
section.getDocumentValueList()[0].getUid())
self.portal.Localizer.changeLanguage('jp')
self.assertEqual(0, len(section.WebSection_getDocumentValueList()))
self.assertEqual(0, len(section.getDocumentValueList()))
# By Anonymous
self.logout()
self.portal.Localizer.changeLanguage('en')
self.assertEqual(1, len(section.WebSection_getDocumentValueList()))
self.assertEqual(1, len(section.getDocumentValueList()))
self.assertEqual(page_en_1.getUid(),
section.WebSection_getDocumentValueList()[0].getUid())
section.getDocumentValueList()[0].getUid())
# Trird Object
self.loginByUserName('erp5user')
......@@ -1801,16 +1801,16 @@ class TestERP5WebWithSimpleSecurity(ERP5TypeTestCase):
self.tic()
self.portal.Localizer.changeLanguage('en')
self.assertEqual(2, len(section.WebSection_getDocumentValueList()))
self.assertEqual(2, len(section.getDocumentValueList()))
self.portal.Localizer.changeLanguage('jp')
self.assertEqual(0, len(section.WebSection_getDocumentValueList()))
self.assertEqual(0, len(section.getDocumentValueList()))
# By Anonymous
self.logout()
self.portal.Localizer.changeLanguage('en')
self.assertEqual(2, len(section.WebSection_getDocumentValueList()))
self.assertEqual(2, len(section.getDocumentValueList()))
self.portal.Localizer.changeLanguage('jp')
self.assertEqual(0, len(section.WebSection_getDocumentValueList()))
self.assertEqual(0, len(section.getDocumentValueList()))
# First Japanese Object
self.loginByUserName('erp5user')
......@@ -1818,18 +1818,18 @@ class TestERP5WebWithSimpleSecurity(ERP5TypeTestCase):
self.tic()
self.portal.Localizer.changeLanguage('en')
self.assertEqual(2, len(section.WebSection_getDocumentValueList()))
self.assertEqual(2, len(section.getDocumentValueList()))
self.portal.Localizer.changeLanguage('jp')
self.assertEqual(1, len(section.WebSection_getDocumentValueList()))
self.assertEqual(1, len(section.getDocumentValueList()))
# By Anonymous
self.logout()
self.portal.Localizer.changeLanguage('en')
self.assertEqual(2, len(section.WebSection_getDocumentValueList()))
self.assertEqual(2, len(section.getDocumentValueList()))
self.portal.Localizer.changeLanguage('jp')
self.assertEqual(1, len(section.WebSection_getDocumentValueList()))
self.assertEqual(1, len(section.getDocumentValueList()))
self.assertEqual(page_jp_0.getUid(),
section.WebSection_getDocumentValueList()[0].getUid())
section.getDocumentValueList()[0].getUid())
def test_04_ExpireUserAction(self):
""" Test the expire user action"""
......@@ -2057,10 +2057,10 @@ class TestERP5WebWithSimpleSecurity(ERP5TypeTestCase):
web_page_no_follow_up.publish()
self.tic()
self.assertEqual(1, len(website.WebSection_getDocumentValueList()))
self.assertEqual(1, len(website.getDocumentValueList()))
self.logout()
self.assertEqual(1, len(website.WebSection_getDocumentValueList()))
self.assertEqual(1, len(website.getDocumentValueList()))
def test_WebSiteModuleDefaultSecurity(self):
"""
......
......@@ -1243,5 +1243,21 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
db.query(r)
return src
# XXX which permission ?
  • @kazuhiko @vpelletier , How about making that method public ? If I understand correctly, just like searchResults, this method applies security to only consider documents user can view.

    ( this causes a The following 1 methods have a docstring but have no security assertions. test failure )

  • Public should be fine yes.

    Also, I do not understand the point of this change: from the extra code in Catalog Tool, it seems the purpose is to make this mechanism local to a specific SQL Catalog (maybe for connection choice needs). But it fails to expose an sql_catalog_id parameter which it should pass to getSQLCatalog, which reduces the benefits we can get from it. And because ** magic is used, it prevents easily extending the API.

    Why is the commit message totally empty of any interesting and relevant description of the change ?

  • OK, I'll make it public explicitly. Also, I will modify the code to support sql_catalog_id.

    For the change itself, I already asked Vincent to review this change long ago, but indeed it lacks the detailed explanation... It is to provide so-called 'Document search API' without Web Section.

  • I defined API explicitly : bb6f6696e78c7fd681bf9b1cc46360f1bd0ded2c

  • provide so-called 'Document search API' without Web Section.

    Makes sense indeed. This would have been perfect in the commit message. Now I can accurately fix my upcomming change which conflicted.

  • I'm confused: bb6f6696e78c7fd681bf9b1cc46360f1bd0ded2c is not in master, and apparently not in any branch at the moment. Maybe it is pending testing ?

  • I'm adding more commits on this topic on erp5-component branch and am waiting for the test result.

Please register or sign in to reply
# XXX API parameters should be explicitly defined in interface
# instead of **kw
def getDocumentValueList(self, **kw):
"""
Return the list of documents which belong to the
current section. The API is designed to
support additional parameters so that it is possible
to group documents by reference, version, language, etc.
or to implement filtering of documents.
This method must be implemented through a
catalog method script :
SQLCatalog_getDocumentValueList
"""
return self.getSQLCatalog().SQLCatalog_getDocumentValueList(**kw)
InitializeClass(CatalogTool)
  • This breaks 1 test:

    ======================================================================
    FAIL: test_method_protection (testSecurity.TestSecurity)
    ----------------------------------------------------------------------
    Traceback (most recent call last):
      File "Products/ERP5/tests/testSecurity.py", line 110, in test_method_protection
        self.fail(message)
    AssertionError:
    The following 1 methods have a docstring but have no security assertions.
            Products/ERP5Catalog/CatalogTool.py:1249 getDocumentValueList
  • mentioned in commit 65dde4a1

    Toggle commit list
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment