Commit 1aa9dcfd authored by Julien Muchembled's avatar Julien Muchembled

New conflict-free list type to store results on active processes

parent 5c7d50d5
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Folder" module="OFS.Folder"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_objects</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test_conflict_resolution</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_body</string> </key>
<value> <string>context.postResult(result)\n
</string> </value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>result</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>testActiveProcess_postResult</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
############################################################################## ##############################################################################
# Copyright (c) 2010 Nexedi SA and Contributors. All Rights Reserved. # Copyright (c) 2010-2011 Nexedi SA and Contributors. All Rights Reserved.
# Julien Muchembled <jm@nexedi.com> # Julien Muchembled <jm@nexedi.com>
# #
# WARNING: This program as such is intended to be used by professional # WARNING: This program as such is intended to be used by professional
...@@ -29,15 +29,66 @@ ...@@ -29,15 +29,66 @@
import unittest import unittest
import urllib import urllib
import transaction import transaction
import ZODB
from ZODB.DemoStorage import DemoStorage
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
class TestConflictResolution(ERP5TypeTestCase):
class TestType(unittest.TestCase):
def setUp(self):
self.db = ZODB.DB(DemoStorage())
self.tm1 = transaction.TransactionManager()
self.conn1 = self.db.open(transaction_manager=self.tm1)
self.tm2 = transaction.TransactionManager()
self.conn2 = self.db.open(transaction_manager=self.tm2)
def tearDown(self):
self.db.close()
del self.tm1, self.conn1, self.tm2, self.conn2, self.db
def testConflictFreeLog(self):
from Products.ERP5Type.ConflictFree import ConflictFreeLog
for t in (1, 404, 4), (500, 407, 3), (1000, 407, 2), (1500, 808, 1):
self.conn1.root()['x'] = x1 = ConflictFreeLog(bucket_size=t[0])
self.tm1.commit()
self.tm2.begin()
x2 = self.conn2.root()['x']
x1.append(-1)
x2.extend(xrange(200))
self.tm1.commit()
self.tm2.commit()
self.tm1.begin()
x1 += 401, 402
x2.extend(xrange(200, 400))
self.tm2.commit()
x2.append(400)
self.tm2.commit()
self.tm1.commit()
self.tm2.begin()
expected = range(-1, 403)
self.assertEqual(expected, list(x1))
self.assertEqual(expected, list(x2))
self.assertEqual(expected[::-1], list(x1.reversed()))
self.assertEqual(len(expected), len(x1))
self.assertEqual(len(expected), len(x2))
x1 += x2
self.assertEqual(t[1], len(x1._log))
bucket_count = 1
x = x2._next
while x not in (x2, None):
x = x._next
bucket_count += 1
self.assertEqual(t[2], bucket_count)
class TestERP5(ERP5TypeTestCase):
def getTitle(self): def getTitle(self):
return "Conflict Resolution" return "Conflict Resolution: ERP5"
def getBusinessTemplateList(self): def getBusinessTemplateList(self):
return ('erp5_base',) return 'erp5_base', 'test_conflict_resolution'
def afterSetUp(self): def afterSetUp(self):
other_node = self.getOtherZEOClientNode() other_node = self.getOtherZEOClientNode()
...@@ -67,7 +118,25 @@ class TestConflictResolution(ERP5TypeTestCase): ...@@ -67,7 +118,25 @@ class TestConflictResolution(ERP5TypeTestCase):
transaction.commit() # max(1, 2) + 1 transaction.commit() # max(1, 2) + 1
self.assertEqual(3, portal.getCacheCookie(cookie_name)) self.assertEqual(3, portal.getCacheCookie(cookie_name))
def testActiveProcess(self):
active_process = self.portal.portal_activities.newActiveProcess()
transaction.commit()
remote = self.other_node
for id in active_process.getRelativeUrl().split('/'):
remote = getattr(remote, id)
for x in xrange(100):
active_process.postResult(x)
remote.testActiveProcess_postResult(100)
try:
transaction.commit()
except:
transaction.abort() # make failure more readable in case of regression
raise
self.assertEqual(sorted(active_process.getResultList()), range(101))
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestConflictResolution)) suite.addTest(unittest.makeSuite(TestType))
suite.addTest(unittest.makeSuite(TestERP5))
return suite return suite
Copyright (c) 2011 Nexedi SA
\ No newline at end of file
erp5_base
\ No newline at end of file
GPL
\ No newline at end of file
1
\ No newline at end of file
test_conflict_resolution
\ No newline at end of file
testConflictResolution
\ No newline at end of file
test_conflict_resolution
\ No newline at end of file
...@@ -31,7 +31,7 @@ from AccessControl import ClassSecurityInfo ...@@ -31,7 +31,7 @@ from AccessControl import ClassSecurityInfo
from Products.CMFCore import permissions as CMFCorePermissions from Products.CMFCore import permissions as CMFCorePermissions
from Products.ERP5Type.Base import Base from Products.ERP5Type.Base import Base
from Products.ERP5Type import PropertySheet from Products.ERP5Type import PropertySheet
from BTrees.IOBTree import IOBTree from Products.ERP5Type.ConflictFree import ConflictFreeLog
from BTrees.Length import Length from BTrees.Length import Length
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, \ from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, \
VALIDATE_ERROR_STATE VALIDATE_ERROR_STATE
...@@ -83,37 +83,43 @@ class ActiveProcess(Base): ...@@ -83,37 +83,43 @@ class ActiveProcess(Base):
# Declarative constructors # Declarative constructors
constructors = (manage_addActiveProcessForm, addActiveProcess) constructors = (manage_addActiveProcessForm, addActiveProcess)
def _generateRandomId(self): def __init__(self, *args, **kw):
""" Base.__init__(self, *args, **kw)
Generate a random int depending on the size of the result list self.result_list = ConflictFreeLog()
"""
random_id = randint(1, 10000 * (self.result_len.value + 1))
return random_id
security.declareProtected(CMFCorePermissions.ManagePortal, 'postResult') security.declareProtected(CMFCorePermissions.ManagePortal, 'postResult')
def postResult(self, result): def postResult(self, result):
if getattr(self, 'result_list', None) is None: try:
self.result_list = IOBTree() result_list = self.result_list
self.result_len = Length() except AttributeError:
random_id = self._generateRandomId() # BBB: self was created before implementation of __init__
_marker = [] self.result_list = result_list = ConflictFreeLog()
else:
if type(result_list) is not ConflictFreeLog: # BBB: result_list is IOBTree
# use a random id in order to store result in a way with # use a random id in order to store result in a way with
# fewer conflict errors # fewer conflict errors
while self.result_list.get(random_id, _marker) is not _marker: random_id = randrange(0, 10000 * (self.result_len.value + 1))
random_id = self._generateRandomId() while result_list.has_key(random_id):
self.result_list[random_id] = result random_id += 1
result_list[random_id] = result
self.result_len.change(1) self.result_len.change(1)
return
result_list.append(result)
security.declareProtected(CMFCorePermissions.ManagePortal, 'getResultList') security.declareProtected(CMFCorePermissions.ManagePortal, 'getResultList')
def getResultList(self, **kw): def getResultList(self, **kw):
""" """
Returns the list of results Returns the list of results
""" """
if getattr(self, 'result_list', None) is None:
self.result_list = IOBTree()
self.result_len = Length()
# Improve this to include sort order XXX # Improve this to include sort order XXX
return self.result_list.values() try:
result_list = self.result_list
except AttributeError:
# BBB: self was created before implementation of __init__
return []
if type(result_list) is not ConflictFreeLog: # BBB: result_list is IOBTree
return result_list.values()
return list(result_list)
security.declareProtected(CMFCorePermissions.ManagePortal, 'activateResult') security.declareProtected(CMFCorePermissions.ManagePortal, 'activateResult')
def activateResult(self, result): def activateResult(self, result):
......
...@@ -260,13 +260,7 @@ class Message(BaseMessage): ...@@ -260,13 +260,7 @@ class Message(BaseMessage):
result = ActiveResult(result=result) result = ActiveResult(result=result)
# XXX Allow other method_id in future # XXX Allow other method_id in future
result.edit(object_path=object, method_id=self.method_id) result.edit(object_path=object, method_id=self.method_id)
kw = self.activity_kw active_process.postResult(result)
kw = dict((k, kw[k]) for k in ('priority', 'tag') if k in kw)
# Save result in a separate activity to reduce
# probability and cost of conflict error.
active_process.activate(activity='SQLQueue',
group_method_id=None, # dummy group method
**kw).activateResult(result)
def __call__(self, activity_tool): def __call__(self, activity_tool):
try: try:
......
from persistent import Persistent
class ConflictFreeLog(Persistent):
"""Scalable conflict-free append-only double-linked list
"""
_prev = _next = None
_tail_count = 0
_bucket_size = 1000
def __init__(self, items=(), bucket_size=None):
self._log = list(items)
if bucket_size:
assert bucket_size > 0
self._bucket_size = bucket_size
def __len__(self):
return self._tail_count + len(self._log)
if not hasattr(Persistent, '_p_estimated_size'): # BBB: Zope 2.8
_p_estimated_size = property(lambda self: len(self._log) * 64)
def _maybe_rotate(self):
if self._p_estimated_size < self._bucket_size:
self._p_changed = 1
else:
tail = self.__class__()
tail._log = self._log
prev = self._prev
if prev is None:
prev = self
else:
assert not self._next._tail_count
tail._tail_count = self._tail_count
tail._prev = prev
prev._next = tail
self._prev = tail
tail._next = self
self._tail_count += len(self._log)
self._log = []
def append(self, item):
if not self._p_changed:
self._maybe_rotate()
self._log.append(item)
def extend(self, items):
if not self._p_changed:
self._maybe_rotate()
self._log.extend(items)
def __iadd__(self, other):
self.extend(other)
return self
def __iter__(self):
bucket = self._next
if bucket is None:
bucket = self
while 1:
for item in bucket._log:
yield item
if bucket is self:
break
bucket = bucket._next
def reversed(self):
bucket = self
while 1:
for item in bucket._log[::-1]:
yield item
bucket = bucket._prev
if bucket in (None, self):
break
def _p_resolveConflict(self, old_state, saved_state, new_state):
if old_state.get('_tail_count', 0) == new_state.get('_tail_count', 0):
i = len(old_state['_log'])
else:
i = 0
saved_state['_log'].extend(new_state['_log'][i:])
return saved_state
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment