##############################################################################
#
# Copyright (c) 2007-2009 Nexedi SA and Contributors. All Rights Reserved.
#                    Kazuhiko <kazuhiko@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

import unittest
from threading import Thread
from thread import get_ident

import transaction
import ZODB

from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from AccessControl.SecurityManagement import newSecurityManager
from Products.ERP5Form.Selection import Selection
from Products.ERP5Form.Tool.SelectionTool import SelectionTool
from Products.ERP5Type.tests.backportUnittest import skip

class TestSelectionTool(ERP5TypeTestCase):

  def getTitle(self):
    return "SelectionTool"

  def getBusinessTemplateList(self):
    return tuple()

  def afterSetUp(self):
    uf = self.getPortal().acl_users
    uf._doAddUser('manager', '', ['Manager', 'Assignor'], [])
    user = uf.getUserById('manager').__of__(uf)
    newSecurityManager(None, user)
    self.portal_selections = self.getPortal().portal_selections
    name = 'test_selection'
    self.portal_selections.setSelectionFor(name, Selection(name))
    self.portal_selections.setSelectionParamsFor('test_selection', {'key':'value'})

  def testGetSelectionContainer(self):
    self.assertEquals(['test_selection'],
                      self.portal_selections.getSelectionNameList())
    self.assertEquals(['test_selection'],
                      self.portal_selections.getSelectionNames())
    self.assert_(self.portal_selections._getContainer() is not None)
    self.assert_(getattr(self.portal_selections, 'selection_data', None)
                 is not None)
    self.assert_(getattr(self.portal_selections, '_v_selection_container', None)
                 is not None)

  def testGetSelectionFor(self):
    selection = self.portal_selections.getSelectionFor('test_selection')
    self.assert_(isinstance(selection, Selection))
    self.assertEquals('test_selection', selection.name)

  def testGetSelectionParamsFor(self):
    self.assertEquals({'key':'value'},
                      self.portal_selections.getSelectionParamsFor('test_selection'))

  def testGetSelectionParamsDictInterface(self):
    self.assertEquals('value',
                      self.portal_selections['test_selection']['key'])
    # the main use case is to have a dict interface in TALES expressions:
    from Products.PageTemplates.Expressions import getEngine
    evaluate_tales = getEngine().getContext(dict(context=self.portal)).evaluate
    self.assertEquals('value',
            evaluate_tales('context/portal_selections/test_selection/key'))
    self.assertEquals('default', evaluate_tales(
      'context/portal_selections/test_selection/not_found | string:default'))


  @skip('Test to be written')
  def testCallSelectionFor(self):
    self.assertEquals(None,
                      self.portal_selections.callSelectionFor('not_found_selection'))
    raise NotImplementedError('more tests needed')

  def testCheckedUids(self):
    self.assertEquals([],
                      self.portal_selections.getSelectionCheckedUidsFor('test_selection'))
    self.portal_selections.setSelectionCheckedUidsFor('test_selection',
                                                      ['foo'])
    self.assertEquals(['foo'],
                      self.portal_selections.getSelectionCheckedUidsFor('test_selection'))
    self.portal_selections.updateSelectionCheckedUidList('test_selection',
                                                         ['foo'], ['bar'])
    self.assertEquals(['bar'],
                      self.portal_selections.getSelectionCheckedUidsFor('test_selection'))
    self.portal_selections.checkAll('test_selection',
                                    ['foo', 'baz'])
    self.assertEquals(sorted(['foo', 'bar', 'baz']),
                      sorted(self.portal_selections.getSelectionCheckedUidsFor('test_selection')))
    self.portal_selections.uncheckAll('test_selection',
                                    ['foo', 'bar'])
    self.assertEquals(['baz'],
                      self.portal_selections.getSelectionCheckedUidsFor('test_selection'))

  def testGetSelectionListUrlFor(self):
    self.assertEquals('',
                      self.portal_selections.getSelectionListUrlFor('test_selection'))

  def testInvertMode(self):
    self.portal_selections.setSelectionInvertModeFor('test_selection', 1)
    self.assertEquals(1,
                      self.portal_selections.getSelectionInvertModeFor('test_selection'))
    self.assertEquals([],
                      self.portal_selections.getSelectionInvertModeUidListFor('test_selection'))

  def testSetSelectionToAll(self):
    self.portal_selections.checkAll('test_selection',
                                    ['foo', 'bar'])
    self.portal_selections.setSelectionToAll('test_selection')
    self.assertEquals(0,
                      self.portal_selections.getSelectionInvertModeFor('test_selection'))
    self.assertEquals({},
                      self.portal_selections.getSelectionParamsFor('test_selection'))
    self.assertEquals([],
                      self.portal_selections.getSelectionCheckedUidsFor('test_selection'))

  def testSortOrder(self):
    self.portal_selections.setSelectionSortOrder('test_selection',
                                                 [('title', 'ascending')])
    self.assertEquals([('title', 'ascending')],
                      self.portal_selections.getSelectionSortOrder('test_selection'))
    self.portal_selections.setSelectionQuickSortOrder('test_selection',
                                                      'title')
    self.assertEquals([('title', 'descending')],
                      self.portal_selections.getSelectionSortOrder('test_selection'))
    self.portal_selections.setSelectionQuickSortOrder('test_selection',
                                                      'date')
    self.assertEquals([('date', 'ascending')],
                      self.portal_selections.getSelectionSortOrder('test_selection'))

  def testColumns(self):
    self.assertEquals([],
                      self.portal_selections.getSelectionColumns('test_selection'))
    self.assertEquals([('default_key', 'default_val')],
                      self.portal_selections.getSelectionColumns('test_selection', [('default_key', 'default_val')]))
    self.portal_selections.setSelectionColumns('test_selection',
                                                 [('key', 'val')])
    self.assertEquals([('key', 'val')],
                      self.portal_selections.getSelectionColumns('test_selection'))
    self.assertEquals([('key', 'val')],
                      self.portal_selections.getSelectionColumns('test_selection', [('default_key', 'default_val')]))

  def testStats(self):
    self.assertEquals([' ', ' ', ' ', ' ', ' ', ' '],
                      self.portal_selections.getSelectionStats('test_selection'))
    self.portal_selections.setSelectionStats('test_selection',
                                                 [])
    self.assertEquals([],
                      self.portal_selections.getSelectionStats('test_selection'))

  @skip('Test to be written')
  def testView(self):
    raise NotImplementedError('test should be added')

  @skip('Test to be written')
  def testPage(self):
    raise NotImplementedError('test should be added')

  def testDomainSelection(self):
    self.assertEquals('',
                      self.portal_selections.buildSQLJoinExpressionFromDomainSelection({}))
    self.assertEquals('',
                      self.portal_selections.buildSQLExpressionFromDomainSelection({}))
    from Products.ERP5Form.Selection import DomainSelection
    self.assertEquals('',
                      self.portal_selections.buildSQLJoinExpressionFromDomainSelection(DomainSelection({}).__of__(self.portal_selections)))
    category_tool = self.getCategoryTool()
    base = category_tool.newContent(portal_type = 'Base Category',
                                   id='test_base_cat')
    base_uid = base.getUid()
    self.assertEquals('category AS test_base_cat_category',
                      self.portal_selections.buildSQLJoinExpressionFromDomainSelection({'test_base_cat': ('portal_categories', 'test_base_cat')}))
    self.assertEquals('( catalog.uid = test_base_cat_category.uid AND (test_base_cat_category.category_uid = %d AND test_base_cat_category.base_category_uid = %d) )' % (base_uid, base_uid),
                      self.portal_selections.buildSQLExpressionFromDomainSelection({'test_base_cat': ('portal_categories', 'test_base_cat')}))
    test = base.newContent(portal_type = 'Category', id = 'test_cat')
    test_uid = test.getUid()
    self.assertEquals('category AS test_base_cat_category',
                      self.portal_selections.buildSQLJoinExpressionFromDomainSelection({'test_base_cat': ('portal_categories', 'test_base_cat/test_cat')}))
    self.assertEquals('( catalog.uid = test_base_cat_category.uid AND (test_base_cat_category.category_uid = %d AND test_base_cat_category.base_category_uid = %d) )' % (test_uid, base_uid),
                      self.portal_selections.buildSQLExpressionFromDomainSelection({'test_base_cat': ('portal_categories', 'test_base_cat/test_cat')}))
    self.assertEquals('( catalog.uid = test_base_cat_category.uid AND (test_base_cat_category.category_uid = %d AND test_base_cat_category.base_category_uid = %d AND test_base_cat_category.category_strict_membership = 1) )' % (test_uid, base_uid),
                      self.portal_selections.buildSQLExpressionFromDomainSelection({'test_base_cat': ('portal_categories', 'test_base_cat/test_cat')}, strict_membership = 1))

  def testDict(self):
    self.assertEquals({},
                      self.portal_selections.getSelectionDomainDictFor('test_selection'))
    self.assertEquals({},
                      self.portal_selections.getSelectionReportDictFor('test_selection'))

  def testIndex(self):
    self.assertEquals(None,
                      self.portal_selections.getSelectionIndexFor('test_selection'))

  def testDeleteSelection(self):
    selection = self.portal_selections.getSelectionFor('test_selection')
    self.assert_(isinstance(selection, Selection))
    self.portal_selections.manage_deleteSelection('test_selection')
    selection = self.portal_selections.getSelectionFor('test_selection')
    self.assertEqual(selection, None)

  def testDeleteSelectionForUser(self):
    # XXX: There is side effect, that manager, running user, is the same use
    #      and there is no way (for now) to get selections per user...
    selection = self.portal_selections.getSelectionFor('test_selection')
    self.assert_(isinstance(selection, Selection))
    self.portal_selections.manage_deleteSelectionForUser('test_selection',
        'manager')
    selection = self.portal_selections.getSelectionFor('test_selection')
    self.assertEqual(selection, None)

  def testDeleteGlobalSelection(self):
    selection = self.portal_selections.getSelectionFor('test_selection')
    self.assert_(isinstance(selection, Selection))
    self.portal_selections.manage_deleteGlobalSelection('test_selection')
    selection = self.portal_selections.getSelectionFor('test_selection')
    self.assertEqual(selection, None)

class TestSelectionPersistence(unittest.TestCase):
  """SelectionTool tests that needs a "real" FileStorage to make sure selection
  are really persistent and supports conflict resolution.
  """
  def setUp(self):
    # patch selection tool class so that we don't need a portal_membership to
    # find the current user name
    SelectionTool._getUserId_saved = SelectionTool._getUserId
    SelectionTool._getUserId = lambda self: 'user'
    SelectionTool._isAnonymous = lambda self: 0

    self.db = ZODB.DB(ZODB.DemoStorage.DemoStorage())
    self.cnx = self.db.open()
    self.portal_selections = \
      self.cnx.root().portal_selections = SelectionTool()
    name = 'test_selection'
    self.portal_selections.setSelectionFor(name, Selection(name))
    transaction.commit()

  def tearDown(self):
    # revert the patch from setUp
    SelectionTool._getUserId = SelectionTool._getUserId_saved
    self.cnx.close()
    self.db.close()

  def _runWithAnotherConnection(self, thread_func):
    """runs `thread_func` with another ZODB connection

    thread_func must be a callable accepting the connection object as only
    argument.
    """
    t = Thread(target=thread_func, args=(self.db.open(),))
    t.start()
    t.join(60)
    self.assertFalse(t.isAlive())

  def testSelectionParamConflictResolution(self):
    # same user edits the same selection with two different parameters
    self.portal_selections.setSelectionParamsFor(
                       'test_selection', dict(a="b"))
    def thread_func(cnx):
      try:
        portal_selections = cnx.root().portal_selections
        portal_selections.setSelectionParamsFor(
                              'test_selection', dict(a="c"))
        transaction.commit()
      finally:
        cnx.close()
    self._runWithAnotherConnection(thread_func)

    # This would raise a ConflictError without conflict resolution code
    transaction.commit()
    params = self.portal_selections.getSelectionParamsFor('test_selection')
    self.assertTrue(params.get('a'))

  def testSelectionNameConflictResolution(self):
    # same user edits two different selections
    self.portal_selections.setSelectionParamsFor(
                       'test_selection2', dict(a="b"))
    def thread_func(cnx):
      try:
        portal_selections = cnx.root().portal_selections
        portal_selections.setSelectionParamsFor(
                       'test_selection1', dict(a="b"))
        transaction.commit()
      finally:
        cnx.close()
    self._runWithAnotherConnection(thread_func)

    # This would raise a ConflictError without conflict resolution code
    transaction.commit()
    params = self.portal_selections.getSelectionParamsFor('test_selection1')
    self.assertEquals(params.get('a'), 'b')
    params = self.portal_selections.getSelectionParamsFor('test_selection2')
    self.assertEquals(params.get('a'), 'b')

  def testDifferentUsernameConflictResolution(self):
    # different users edits selections
    SelectionTool._getUserId = lambda self: 'user-%s' % get_ident()
    # Note that in current implementation, the first time we initialized a
    # selection for a user the mapping user -> selections is modified, which
    # will generate a conflict if we have two new users at the same time.
    # This test just checks that once we have initialized a user it doesn't
    # generate conflicts when another users also modifies it owns selection.
    # So we make sure that selection container is initialized for this user
    self.portal_selections.setSelectionParamsFor(
                       'test_selection', dict(initialized="1"))
    transaction.commit()

    self.portal_selections.setSelectionParamsFor(
                       'test_selection', dict(a="b"))
    def thread_func(cnx):
      try:
        portal_selections = cnx.root().portal_selections
        portal_selections.setSelectionParamsFor(
                       'test_selection', dict(a="b"))
        transaction.commit()
      finally:
        cnx.close()
    self._runWithAnotherConnection(thread_func)

    transaction.commit()
    # this check is quite low level.
    # we know that setUp stored one selection, and each of our 2 threads stored
    # one selection.
    self.assertEquals(3, len(self.portal_selections.selection_data.keys()))

  def testPersistentSelections(self):
    # test that selection parameters are persistent
    self.portal_selections.setSelectionParamsFor(
                 'test_selection', dict(key="saved_value"))
    transaction.commit()
    self.cnx.close()

    self.cnx = self.db.open()
    portal_selections = self.cnx.root().portal_selections
    self.assertEquals('saved_value',
        portal_selections.getSelectionParamsFor('test_selection').get('key'))

class TestSelectionToolMemcachedStorage(TestSelectionTool):

  def getTitle(self):
    return "SelectionTool with Memcached Storage"

  def afterSetUp(self):
    # create a Memcached Plugin
    memcached_tool = self.getPortal().portal_memcached
    if getattr(memcached_tool, 'default_memcached_plugin', None) is None:
      memcached_tool.newContent(id='default_memcached_plugin',
                                portal_type='Memcached Plugin',
                                int_index=0,
                                url_string='127.0.0.1:11211')
    self.portal.portal_selections.setStorage('portal_memcached/default_memcached_plugin')
    TestSelectionTool.afterSetUp(self)

  def testGetSelectionContainer(self):
    self.assertEquals([],
                      self.portal_selections.getSelectionNameList())
    self.assertEquals([],
                      self.portal_selections.getSelectionNames())
    self.assert_(self.portal_selections._getContainer() is not None)
    self.assert_(getattr(self.portal_selections, '_v_selection_container', None)
                 is not None)

  @skip('To be decided if implementation is required')
  def testDeleteGlobalSelection(self):
    pass

  def testChangeSelectionToolContainer(self):
    """
    After changing SelectionTool container, the new one should be used
    straightaway, and more specifically volatile variables should have
    been reset
    """
    from Products.ERP5Form.Tool.SelectionTool import (MemcachedContainer,
                                                      PersistentMappingContainer)

    # testGetSelectionFor() already checked if the Selection can be retrieved
    # from the container set in afterSetUp(), so no need to check again here
    self.portal_selections.setStorage('selection_data')
    transaction.commit()

    self.assertEqual(getattr(self.portal_selections, '_v_selection_container',
                             None), None)

    self.assertEqual(self.portal_selections.getSelectionFor('test_selection'),
                     None)

    self.assertTrue(isinstance(getattr(self.portal_selections,
                                       '_v_selection_container', None),
                                PersistentMappingContainer))


    self.portal_selections.setStorage('portal_memcached/default_memcached_plugin')
    transaction.commit()

    self.assertEqual(getattr(self.portal_selections, '_v_selection_container',
                             None), None)

    self.assertNotEqual(self.portal_selections.getSelectionFor('test_selection'),
                        None)

    self.assertTrue(isinstance(getattr(self.portal_selections,
                                       '_v_selection_container', None),
                               MemcachedContainer))

  def testChangeMemcached(self):
    """
    After Memcached has been changed, the new setting should be used and more
    specifically container volative variables should have been reset
    """
    self.assertNotEqual(self.portal_selections.getSelectionFor('test_selection'),
                        None)

    memcached_plugin = self.portal.portal_memcached.default_memcached_plugin
    url_string_before = memcached_plugin.getUrlString()

    memcached_plugin.setUrlString('127.0.0.1:4242')
    transaction.commit()

    try:
      self.assertEqual(getattr(self.portal_selections, '_v_selection_container',
                               None), None)

      self.assertEqual(self.portal_selections.getSelectionFor('test_selection'),
                       None)

      self.assertNotEqual(getattr(self.portal_selections, '_v_selection_container',
                                  None), None)
    finally:
      memcached_plugin.setUrlString(url_string_before)
      transaction.commit()

def test_suite():
  suite = unittest.TestSuite()
  suite.addTest(unittest.makeSuite(TestSelectionTool))
  suite.addTest(unittest.makeSuite(TestSelectionToolMemcachedStorage))
  suite.addTest(unittest.makeSuite(TestSelectionPersistence))
  return suite