Commit f359f267 authored by Jérome Perrin's avatar Jérome Perrin

Use Distributed Cache for Session

Until now, portal_sessions was only reliably usable when using a family with only one zope node, because the session data was using RAM cache.
When used by authenticated users it was more or less usable, because of haproxy
sticky cookie that we set for authenticated users, but for non authenticated users
this was basically unusable.
This was especially a problem for CaptchaField, for which users are generally not
authenticated.

This changes portal_sessions to use a distributed cache, which brings several
differences:
 - sessions are now shared between all zopes of a cluster.
 - storing ERP5 temp documents is still possible, but modifying a temp document in
   session does not automatically save the changes in session, for next session read
   to be using the modified document it's required to save the document explicitly.
 - session respects transaction semantics, changes are only persisted in session when transaction commits successfully.
 - `portal_caches.clearAllCaches` API no longer clear all sessions.

See merge request !1451
parents d8a4b510 08918cef
......@@ -88,6 +88,9 @@ if not line_found:
method_id(category)
order_line.setPrice(context.getPrice(supply_path_type=["Sale Supply Line", "Sale Supply Cell"], context=order_line))
context.WebSection_updateShoppingCartTradeCondition(shopping_cart, None)
context.getPortalObject().portal_sessions[session_id].update(shopping_cart=shopping_cart)
if checkout:
website = context.getWebSiteValue()
if website is not None:
......
......@@ -7,7 +7,8 @@
- use Base_edit and a real ERP5 Form in order to benefit from
field reusability and property validation
"""
translateString = context.Base_translateString
portal = context.getPortalObject()
translateString = portal.Base_translateString
if field_my_buy_quantity is None:
field_my_buy_quantity = context.REQUEST.get("field_my_buy_quantity", None)
......@@ -44,7 +45,7 @@ if field_my_shipping_method not in ['', None]:
line = getattr(shopping_cart, 'shipping_method', None)
if line is not None:
shopping_cart.manage_delObjects(line.getId())
shipping = context.getPortalObject().restrictedTraverse(field_my_shipping_method)
shipping = portal.restrictedTraverse(field_my_shipping_method)
# create new shipping method order line
shopping_cart.newContent(
id='shipping_method',
......@@ -57,7 +58,9 @@ if field_my_comment is not None:
shopping_cart.setComment(field_my_comment)
context.WebSection_updateShoppingCartTradeCondition(shopping_cart, field_my_payment_mode, preserve=True)
portal.portal_sessions[container.REQUEST['session_id']].update(shopping_cart=shopping_cart)
if redirect:
# Hardcode redirection.
return context.Base_redirect("WebSection_viewShoppingCart", \
......
"""
Update Trade Condition with the appropriated Trade Condition.
"""
portal = context.getPortalObject()
if payment_mode is None and preserve:
current_trade_condition = shopping_cart.getSpecialiseValue()
if current_trade_condition is not None:
......@@ -16,7 +17,7 @@ if context.REQUEST.get("loyalty_reward", "") == "enable" and context.getSiteLoya
if payment_mode:
reference = '%s_%s' % (reference, payment_mode.lower())
sale_trade_condition = context.portal_catalog.getResultValue(
sale_trade_condition = portal.portal_catalog.getResultValue(
portal_type='Sale Trade Condition',
reference='%' + reference + '%',
validation_state='published',
......@@ -27,3 +28,6 @@ if sale_trade_condition:
shopping_cart.setSpecialiseValue(sale_trade_condition.getObject())
else:
shopping_cart.setSpecialise(context.WebSection_getDefaultTradeCondition())
portal.portal_sessions[container.REQUEST['session_id']].update(shopping_cart=shopping_cart)
......@@ -71,6 +71,7 @@ else:
shopping_cart = context.SaleOrder_getShoppingCart()
if shopping_cart is not None:
shopping_cart.manage_setLocalRoles(user_id, ['Owner'])
portal.portal_sessions[container.REQUEST['session_id']].update(shopping_cart=shopping_cart)
"""
response = context.REQUEST.RESPONSE
......
......@@ -43,12 +43,14 @@ for order_line in shopping_cart_items:
line_found=True
break
if line_found == False:
if not line_found:
## new Resource so add it to shopping cart
order_line = shopping_cart.newContent(portal_type='Sale Order Line')
order_line.setResource(resource.getRelativeUrl())
order_line.setQuantity(quantity)
context.getPortalObject().portal_sessions[request['session_id']].update(shopping_cart=shopping_cart)
if( context.getPortalType() == 'Product'):
context.Base_redirect('Resource_viewAsShop',
keep_items={'portal_status_message':context.Base_translateString("Added to cart.")})
......
"""
Delete a shopping cart item.
"""
translateString = context.Base_translateString
portal = context.getPortalObject()
translateString = portal.Base_translateString
shopping_cart = context.SaleOrder_getShoppingCart()
if field_my_order_line_id is not None:
......@@ -11,5 +11,6 @@ if field_my_order_line_id is not None:
else:
portal_status_message = "Please select an item."
portal.portal_sessions[container.REQUEST['session_id']].update(shopping_cart=shopping_cart)
context.Base_redirect(form_id, \
keep_items={'portal_status_message': translateString(portal_status_message, mapping={})})
......@@ -197,6 +197,8 @@ class TestCommerce(ERP5TypeTestCase):
self.clearModule(self.portal.currency_module)
self.clearModule(self.portal.sale_trade_condition_module)
self.portal.portal_caches.clearAllCache()
self.portal.portal_sessions.manage_delObjects([SESSION_ID])
self.commit()
def createDefaultOrganisation(self):
"""
......
......@@ -29,7 +29,6 @@
import unittest
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.Sequence import SequenceList
from erp5.component.tool.SessionTool import SESSION_CACHE_FACTORY
from string import letters as LETTERS
from random import choice
......@@ -42,10 +41,18 @@ primitives_kw = dict(attr_1 = ['list_item'], \
attr_5 = {'some_key': 'some_value'}, \
attr_6 = 'string', )
class TestSessionTool(ERP5TypeTestCase):
class SessionToolTestCase(ERP5TypeTestCase):
session_id = "123456789"
def afterSetUp(self):
super(SessionToolTestCase, self).afterSetUp()
self.setCachePlugin()
def setCachePlugin(self):
raise NotImplementedError()
def _changeCachePlugin(self, portal_type, storage_duration = 86400):
""" Change current cache plugin with new one. """
portal_caches = self.portal.portal_caches
......@@ -60,8 +67,7 @@ class TestSessionTool(ERP5TypeTestCase):
self.commit()
portal_caches.updateCache()
def stepTestSetGet(self, sequence=None,
sequence_list=None, **kw):
def test_set_get(self):
session = self.portal.portal_sessions[self.session_id]
session.clear()
session.update(primitives_kw)
......@@ -69,15 +75,14 @@ class TestSessionTool(ERP5TypeTestCase):
self.assertEqual(primitives_kw, session)
# API check
self.assert_(self.portal.portal_sessions[self.session_id] == \
self.assertEqual(self.portal.portal_sessions[self.session_id],
self.portal.portal_sessions.getSession(self.session_id))
session.clear()
session.edit(**primitives_kw)
session = self.portal.portal_sessions[self.session_id]
self.assertEqual(primitives_kw, session)
def stepTestAcquisitionRamSessionStorage(self, sequence=None, \
sequence_list=None, **kw):
def test_store_temp_object(self):
portal_sessions = self.portal.portal_sessions
session = portal_sessions.newContent(
self.session_id,
......@@ -86,13 +91,24 @@ class TestSessionTool(ERP5TypeTestCase):
## check temp (RAM based) attributes stored in session
for i in range (1, 3):
attr_name = 'attr_%s' %i
self.assert_(attr_name in session.keys())
self.assertIn(attr_name, session.keys())
attr = session[attr_name]
self.assert_(str(i), attr.getId())
self.assert_(0 == len(attr.objectIds()))
self.assertEqual(str(i), attr.getId())
self.assertEqual(0, len(attr.objectIds()))
def test_store_recursive_temp_object(self):
doc = self.portal.newContent(
temp_object=True, portal_type='Document', id='doc', title='Doc')
doc.newContent(
temp_object=True, portal_type='Document', id='sub_doc', title='Sub doc')
self.portal.portal_sessions.newContent(self.session_id, doc=doc)
self.commit()
doc = self.portal.portal_sessions[self.session_id]['doc']
self.assertEqual(doc.getTitle(), 'Doc')
self.assertEqual(doc.sub_doc.getTitle(), 'Sub doc')
self.assertEqual(len(doc.contentValues()), 1)
def stepModifySession(self, sequence=None, \
sequence_list=None, **kw):
def test_modify_session(self):
""" Modify session and check that modifications are updated in storage backend."""
portal_sessions = self.portal.portal_sessions
session = portal_sessions.newContent(self.session_id, \
......@@ -104,33 +120,33 @@ class TestSessionTool(ERP5TypeTestCase):
# get again session object again and check that session value is updated
# (this makes sense for memcached)
session = portal_sessions[self.session_id]
self.assert_(not 'attr_1' in session.keys())
self.assert_(not 'attr_2' in session.keys())
self.assertNotIn('attr_1', session.keys())
self.assertNotIn('attr_2', session.keys())
session.update(**{'key_1':'value_1',
'key_2':'value_2',})
session.update(**{'key_1': 'value_1',
'key_2': 'value_2',})
session = portal_sessions[self.session_id]
self.assert_('key_1' in session.keys())
self.assert_(session['key_1'] == 'value_1')
self.assert_('key_2' in session.keys())
self.assert_(session['key_2'] == 'value_2')
self.assertIn('key_1', session.keys())
self.assertEqual(session['key_1'], 'value_1')
self.assertIn('key_2', session.keys())
self.assertEqual(session['key_2'], 'value_2')
session.clear()
session = portal_sessions[self.session_id]
self.assert_(session == {})
self.assertEqual(session, {})
session['pop_key'] = 'pop_value'
session = portal_sessions[self.session_id]
self.assert_(session['pop_key'] == 'pop_value')
self.assertEqual(session['pop_key'], 'pop_value')
session.popitem()
session = portal_sessions[self.session_id]
self.assert_(session == {})
self.assertEqual(session, {})
session.setdefault('default', 'value')
session = portal_sessions[self.session_id]
self.assert_(session['default'] == 'value')
self.assertEqual(session['default'], 'value')
def stepDeleteClearSession(self, sequence=None, \
def test_clear_session(self, sequence=None, \
sequence_list=None, **kw):
""" Get session object and check keys stored in previous test. """
portal_sessions = self.portal.portal_sessions
......@@ -139,19 +155,18 @@ class TestSessionTool(ERP5TypeTestCase):
# delete it
portal_sessions.manage_delObjects(self.session_id)
session = portal_sessions[self.session_id]
self.assert_({} == session)
self.assertEqual(session, {})
# clear it
session = portal_sessions.newContent(
self.session_id, \
**primitives_kw)
session = portal_sessions[self.session_id]
self.assert_(primitives_kw == session)
self.assertEqual(session, primitives_kw)
session.clear()
session = portal_sessions[self.session_id]
self.assert_(session == {})
self.assertEqual(session, {})
def stepTestSessionDictInterface(self, sequence=None, \
sequence_list=None, **kw):
def test_session_dict_interface(self):
session = self.portal.portal_sessions[self.session_id]
session.clear()
# get / set
......@@ -182,9 +197,7 @@ class TestSessionTool(ERP5TypeTestCase):
self.assertEqual(('popitem', 'Bar'), session.popitem())
self.assertRaises(KeyError, session.popitem)
def stepTestSessionGetattr(self, sequence=None, \
sequence_list=None, **kw):
def test_session_getattr(self):
session = self.portal.portal_sessions[self.session_id]
session.clear()
session['foo'] = 'Bar'
......@@ -192,8 +205,7 @@ class TestSessionTool(ERP5TypeTestCase):
self.assertEqual('Default', getattr(session, 'bar', 'Default'))
self.assertRaises(AttributeError, getattr, session, 'bar')
def stepTestSessionBulkStorage(self, sequence=None, \
sequence_list=None, **kw):
def test_session_bulk_storage(self):
""" Test massive session sets which uses different cache plugin. """
kw = {}
session = self.portal.portal_sessions[self.session_id]
......@@ -216,8 +228,7 @@ class TestSessionTool(ERP5TypeTestCase):
session = self.portal.portal_sessions[self.session_id]
self.assertEqual(kw, session)
def stepTestSessionExpire(self, sequence=None, \
sequence_list=None, **kw):
def test_session_expire(self):
""" Test expire session which uses different cache plugin. """
interval = 3
portal_sessions = self.portal.portal_sessions
......@@ -227,10 +238,9 @@ class TestSessionTool(ERP5TypeTestCase):
time.sleep(interval+1)
session = self.portal.portal_sessions.getSession(self.session_id)
# session should be an emty dic as it expired
self.assert_(session == {})
self.assertEqual(session, {})
def stepTestCheckSessionAfterNewPerson(self, sequence=None, \
sequence_list=None, **kw):
def test_check_session_after_new_person(self):
""" Test if session still the same after create new person setting the
reference. """
session = self.portal.portal_sessions[self.session_id]
......@@ -252,44 +262,19 @@ class TestSessionTool(ERP5TypeTestCase):
self.assertEqual(session.get('key'), 'value')
self.abort()
def test_01_CheckSessionTool(self):
""" Checks session tool is present """
self.assertNotEqual(None, getattr(self.portal, 'portal_sessions', None))
def test_02_RamSession(self):
""" Test RamSession which uses local RAM based cache plugin. """
sequence_list = SequenceList()
sequence_string = 'stepTestSetGet \
stepTestAcquisitionRamSessionStorage \
stepModifySession \
stepDeleteClearSession \
stepTestSessionDictInterface \
stepTestSessionGetattr \
stepTestSessionBulkStorage \
stepTestSessionExpire \
stepTestCheckSessionAfterNewPerson \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def test_03_MemcachedDistributedSession(self):
""" Test DistributedSession which uses memcached based cache plugin. """
# create memcached plugin and test
class TestRAMCacheSessionTool(SessionToolTestCase):
def setCachePlugin(self):
self._changeCachePlugin('Ram Cache')
class TestDistributedCacheSessionTool(SessionToolTestCase):
def setCachePlugin(self):
self._changeCachePlugin('Distributed Ram Cache')
sequence_list = SequenceList()
sequence_string = 'stepTestSetGet \
stepModifySession \
stepDeleteClearSession \
stepTestSessionDictInterface \
stepTestSessionGetattr \
stepTestSessionBulkStorage \
stepTestSessionExpire \
stepTestCheckSessionAfterNewPerson \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestSessionTool))
suite.addTest(unittest.makeSuite(TestRAMCacheSessionTool))
suite.addTest(unittest.makeSuite(TestDistributedCacheSessionTool))
return suite
......@@ -64,6 +64,8 @@ class SupportRequestTestCase(ERP5TypeTestCase, object):
self.portal.person_module.manage_delObjects(
[self.user.getId()])
self.assertEqual(self.portal.ERP5Site_cleanupSupportRequestUITestDataSet(), 'Done.')
self.portal.portal_sessions.manage_delObjects(
['support_request_module/erp5_officejs_support_request_ui_test_support_reuqest_001.latest_comment'])
self.tic()
def createUserAndLogin(self):
......
......@@ -2,35 +2,21 @@
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Ram Cache" module="erp5.portal_type"/>
<global name="Distributed Ram Cache" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_count</string> </key>
<key> <string>categories</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
<tuple>
<string>specialise/portal_memcached/default_memcached_plugin</string>
</tuple>
</value>
</item>
<item>
<key> <string>_mt_index</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
<item>
<key> <string>_tree</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
<item>
<key> <string>cache_expire_check_interval</string> </key>
<value> <int>1800</int> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>1</string> </value>
<value> <string>2</string> </value>
</item>
<item>
<key> <string>int_index</string> </key>
......@@ -38,35 +24,13 @@
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Ram Cache</string> </value>
<value> <string>Distributed Ram Cache</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string>RAM Cache</string> </value>
<value> <string>Distributed Ram Cache</string> </value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="Length" module="BTrees.Length"/>
</pickle>
<pickle> <int>0</int> </pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="OOBTree" module="BTrees.OOBTree"/>
</pickle>
<pickle>
<none/>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="OOBTree" module="BTrees.OOBTree"/>
</pickle>
<pickle>
<none/>
</pickle>
</record>
</ZopeData>
......@@ -60,31 +60,20 @@ class Session(UserDict):
# used to set duration of session
session_duration = None
def _updatecontext(self, aq_context):
""" Update current aquisition context.
This makes only sense for local RAM Session."""
pass
def _updateSessionDuration(self, session_duration):
self.session_duration = int(session_duration)
def _updateSessionId(self, session_id):
self.session_id = session_id
def __str__(self):
return self.__repr__()
def edit(self, **kw):
""" Edit session object. """
for key, item in kw.items():
self.__setitem__(key, item)
class RamSession(Session):
""" Local RAM Session dictionary """
# a handle to current aquisition context
_aq_context = None
def __getstate__(self):
"""filter out acqusition wrappers when serializing.
"""
state = {
'session_duration': self.session_duration,
'data': {k: aq_base(v) for k, v in self.data.iteritems()}
}
if 'session_id' in self.__dict__:
state['session_id'] = self.session_id
return state
def _updatecontext(self, aq_context):
""" Update current aquisition context. """
self._aq_context = aq_context
......@@ -98,9 +87,29 @@ class RamSession(Session):
return value
raise KeyError(key)
def _updateSessionDuration(self, session_duration):
self.session_duration = int(session_duration)
def _updateSessionId(self, session_id):
self.session_id = session_id
def __str__(self):
return self.__repr__()
def edit(self, **kw):
""" Edit session object. """
for key, item in kw.items():
self.__setitem__(key, item)
def __setitem__(self, key, item):
# save value without its acquisition context
Session.__setitem__(self, key, aq_base(item))
UserDict.__setitem__(self, key, aq_base(item))
def update(self, dict=None, **kwargs): # pylint: disable=redefined-builtin
for k, v in (dict or kwargs).iteritems():
# make sure to use our __setitem__ which removes acquistion wrappers
self[k] = v
class DistributedSession(Session):
""" Distributed Session dictionary.
......@@ -111,6 +120,7 @@ class DistributedSession(Session):
def _updateStorage(self):
""" Update backend storage. """
assert self.session_id
storage_plugin.set(self.session_id, \
SESSION_SCOPE, \
value = self, \
......@@ -174,13 +184,28 @@ class SessionTool(BaseTool):
shopping_cart = session['shopping_cart']
Please note that:
- developer is responsible for handling an unique sessiond_id (using cookies for example).
- it's not recommended to store in portal_sessions ZODB persistent objects because in order
to store them in Local RAM portal_sessions tool will remove aquisition wrapper. At "get"
request they'll be returend wrapped.
- developer can store temporary RAM based objects like 'TempOrder' but ONLY
when using Local RAM type of sessions. In a distributed environment one can use only
pickable types ue to the nature of memcached server.
- developer is responsible for handling an unique sessiond_id (using cookies for example).
- it's not recommended to store in portal_sessions ZODB persistent objects because in order
to store them in Local RAM portal_sessions tool will remove aquisition wrapper. At "get"
request they'll be returend wrapped.
- developer can store temporary ERP5 documents like 'TempOrder', but keep
in mind that after making changes to temporary documents they need to be
saved again in portal_sessions, so:
>>> shopping_cart = context.newContent(portal_type='Order', temp_object=True, id='987654321')
>>> shopping_cart.newContent(portal_type='Order Line', quantity=1, resource=...)
>>> session['shopping_cart'] = shopping_cart
# modifying a temp document from session is valid
>>> shopping_cart.getMovementList()[0].setQuantity(3)
# but the session still reference the documents as it was when saved to session:
>>> session['shopping_cart'].getMovementList()[0].getQuantity()
1
# to make the change persist in the session, the temp document has to be saved again:
>>> session['shopping_cart'] = shopping_cart
>>> session['shopping_cart'].getMovementList()[0].getQuantity()
3
"""
id = 'portal_sessions'
......@@ -206,7 +231,7 @@ class SessionTool(BaseTool):
# init it in cache and use different Session types based on cache plugin type used as a storage
storage_plugin_type = storage_plugin_.__class__.__name__
if storage_plugin_type in ("RamCache",):
session = RamSession()
session = Session()
elif storage_plugin_type in ("DistributedRamCache",):
session = DistributedSession()
session._updateSessionId(session_id)
......
......@@ -383,9 +383,8 @@ def loadTempPortalTypeClass(portal_type_name):
"""
import erp5.portal_type
klass = getattr(erp5.portal_type, portal_type_name)
return type(portal_type_name, (TemporaryDocumentMixin, klass), {})
return type("Temporary " + portal_type_name,
(TemporaryDocumentMixin, klass), {})
last_sync = -1
_bootstrapped = set()
......
......@@ -35,6 +35,7 @@ from Acquisition import aq_base
from Products.ERP5Type.Accessor.Constant import PropertyGetter as \
PropertyConstantGetter
class TemporaryDocumentMixin(object):
"""
Setters and attributes that are attached to temporary documents.
......@@ -44,8 +45,10 @@ class TemporaryDocumentMixin(object):
__roles__ = None
def __getstate__(self):
# disallow persistent storage
raise PicklingError("Temporary objects can't be pickled")
if getattr(self, '_p_jar', None) is not None:
# disallow persistent storage
raise PicklingError("Temporary objects can't be pickled")
return super(TemporaryDocumentMixin, self).__getstate__()
def reindexObject(self, *args, **kw):
pass
......
......@@ -31,8 +31,10 @@ other Tests are in erp5_core_test:testERP5Type) which is deprecated in favor
of Portal Type as Classes and ZODB Components
"""
import pickle
import unittest
import warnings
from Acquisition import aq_base
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from AccessControl.ZopeGuards import guarded_import
from Products.ERP5Type.tests.utils import LogInterceptor
......@@ -244,6 +246,19 @@ class TestERP5Type(ERP5TypeTestCase, LogInterceptor):
self.assertTrue(o.isTempObject())
self.assertTrue(guarded_import("Products.ERP5Type.Document", fromlist=["newTempBase"]))
def test_TempObjectPersistent(self):
# Temp objects can not be stored in ZODB
temp_object = self.portal.person_module.newContent(portal_type='Person', temp_object=True)
self.assertTrue(temp_object.isTempObject())
# they can be pickled
self.assertTrue(pickle.dumps(aq_base(temp_object)))
# but they can not be saved in ZODB accidentally
self.portal.person_module.oops = temp_object
self.assertRaisesRegexp(Exception, "Temporary objects can't be pickled", self.commit)
self.abort()
def test_warnings_redirected_to_event_log(self):
self._catch_log_errors()
self.addCleanup(self._ignore_log_errors)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment