Commit 6d74ba22 authored by Roque's avatar Roque

CMFActivity: move getCurrentNode and getServerAddress methods outside ActivityTool class

- getCurrentNode method can be directly imported from CMFActivity
- corresponding unittests

/reviewed-on !647
parent 03831389
...@@ -94,9 +94,10 @@ class TestERP5(ERP5TypeTestCase): ...@@ -94,9 +94,10 @@ class TestERP5(ERP5TypeTestCase):
self.login() self.login()
def getOtherZopeNode(self): def getOtherZopeNode(self):
from Products.CMFActivity.ActivityTool import getCurrentNode
activity_tool = self.portal.portal_activities activity_tool = self.portal.portal_activities
node_list = list(activity_tool.getProcessingNodeList()) node_list = list(activity_tool.getProcessingNodeList())
node_list.remove(activity_tool.getCurrentNode()) node_list.remove(getCurrentNode())
assert node_list, "this unit test must be run with at least 2 Zope nodes" assert node_list, "this unit test must be run with at least 2 Zope nodes"
return node_list[0] return node_list[0]
......
...@@ -126,6 +126,46 @@ def activity_timing_method(method, args, kw): ...@@ -126,6 +126,46 @@ def activity_timing_method(method, args, kw):
end = time() end = time()
activity_timing_logger.info('%.02fs: %r(*%r, **%r)' % (end - begin, method, args, kw)) activity_timing_logger.info('%.02fs: %r(*%r, **%r)' % (end - begin, method, args, kw))
def getServerAddress():
"""
Return current server address
"""
global _server_address
if _server_address is None:
ip = port = ''
from asyncore import socket_map
for k, v in socket_map.items():
if hasattr(v, 'addr'):
# see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
type = str(getattr(v, '__class__', 'unknown'))
if type == 'ZServer.HTTPServer.zhttp_server':
ip, port = v.addr
break
if ip == '0.0.0.0':
ip = socket.gethostbyname(socket.gethostname())
_server_address = '%s:%s' %(ip, port)
return _server_address
def getCurrentNode():
""" Return current node identifier """
global currentNode
if currentNode is None:
currentNode = getattr(
getConfiguration(),
'product_config',
{},
).get('cmfactivity', {}).get('node-id')
if currentNode is None:
warnings.warn('Node name auto-generation is deprecated, please add a'
'\n'
'<product-config CMFActivity>\n'
' node-id = ...\n'
'</product-config>\n'
'section in your zope.conf, replacing "..." with a cluster-unique '
'node identifier.', DeprecationWarning)
currentNode = getServerAddress()
return currentNode
# Here go ActivityBuffer instances # Here go ActivityBuffer instances
# Structure: # Structure:
# global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer # global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
...@@ -359,7 +399,7 @@ Method: %s ...@@ -359,7 +399,7 @@ Method: %s
Arguments: %r Arguments: %r
Named Parameters: %r Named Parameters: %r
""" % (email_from_name, activity_tool.email_from_address, user_email, message, """ % (email_from_name, activity_tool.email_from_address, user_email, message,
path, self.method_id, activity_tool.getCurrentNode(), fail_count, path, self.method_id, getCurrentNode(), fail_count,
self.user_name, self.line.uid, path, self.method_id, self.args, self.kw) self.user_name, self.line.uid, path, self.method_id, self.args, self.kw)
if self.traceback: if self.traceback:
mail_text += '\nException:\n' + self.traceback mail_text += '\nException:\n' + self.traceback
...@@ -819,42 +859,18 @@ class ActivityTool (Folder, UniqueObject): ...@@ -819,42 +859,18 @@ class ActivityTool (Folder, UniqueObject):
""" """
Backward-compatibility code only. Backward-compatibility code only.
""" """
global _server_address LOG('ActivityTool', WARNING,
if _server_address is None: '"getServerAddress" class method is deprecated, use "getServerAddress" module-level function instead.')
ip = port = '' return getServerAddress()
from asyncore import socket_map
for k, v in socket_map.items():
if hasattr(v, 'addr'):
# see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
type = str(getattr(v, '__class__', 'unknown'))
if type == 'ZServer.HTTPServer.zhttp_server':
ip, port = v.addr
break
if ip == '0.0.0.0':
ip = socket.gethostbyname(socket.gethostname())
_server_address = '%s:%s' %(ip, port)
return _server_address
security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNode') security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNode')
def getCurrentNode(self): def getCurrentNode(self):
""" Return current node identifier """ """
global currentNode Backward-compatibility code only.
if currentNode is None: """
currentNode = getattr( LOG('ActivityTool', WARNING,
getConfiguration(), '"getCurrentNode" class method is deprecated, use "getCurrentNode" module-level function instead.')
'product_config', return getCurrentNode()
{},
).get('cmfactivity', {}).get('node-id')
if currentNode is None:
warnings.warn('Node name auto-generation is deprecated, please add a'
'\n'
'<product-config CMFActivity>\n'
' node-id = ...\n'
'</product-config>\n'
'section in your zope.conf, replacing "..." with a cluster-unique '
'node identifier.', DeprecationWarning)
currentNode = self.getServerAddress()
return currentNode
security.declareProtected(CMFCorePermissions.ManagePortal, 'getDistributingNode') security.declareProtected(CMFCorePermissions.ManagePortal, 'getDistributingNode')
def getDistributingNode(self): def getDistributingNode(self):
...@@ -884,7 +900,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -884,7 +900,7 @@ class ActivityTool (Folder, UniqueObject):
if node_dict: if node_dict:
# BBB: check if our node was known by address (processing and/or # BBB: check if our node was known by address (processing and/or
# distribution), and migrate it. # distribution), and migrate it.
server_address = self.getServerAddress() server_address = getServerAddress()
role = node_dict.pop(server_address, ROLE_IDLE) role = node_dict.pop(server_address, ROLE_IDLE)
if self.distributingNode == server_address: if self.distributingNode == server_address:
self.distributingNode = node self.distributingNode = node
...@@ -1022,7 +1038,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1022,7 +1038,7 @@ class ActivityTool (Folder, UniqueObject):
user = self.portal_catalog.getWrappedOwner() user = self.portal_catalog.getWrappedOwner()
newSecurityManager(self.REQUEST, user) newSecurityManager(self.REQUEST, user)
currentNode = self.getCurrentNode() currentNode = getCurrentNode()
self.registerNode(currentNode) self.registerNode(currentNode)
processing_node_list = self.getNodeList(role=ROLE_PROCESSING) processing_node_list = self.getNodeList(role=ROLE_PROCESSING)
......
...@@ -53,6 +53,10 @@ import random ...@@ -53,6 +53,10 @@ import random
import threading import threading
import weakref import weakref
import transaction import transaction
from Products.CMFActivity.ActivityTool import getCurrentNode, getServerAddress
from App.config import getConfiguration
from asyncore import socket_map
import socket
class CommitFailed(Exception): class CommitFailed(Exception):
pass pass
...@@ -2949,6 +2953,33 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2949,6 +2953,33 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
skin.manage_delObjects([script_id]) skin.manage_delObjects([script_id])
self.tic() self.tic()
def testGetCurrentNode(self):
current_node = getattr(getConfiguration(),'product_config',{},).get('cmfactivity', {}).get('node-id')
if not current_node:
current_node = getServerAddress()
node = getCurrentNode()
self.assertEqual(node, current_node)
portal = self.getPortal()
activity_node = portal.portal_activities.getCurrentNode()
self.assertEqual(activity_node, current_node)
def testGetServerAddress(self):
ip = port = ''
for k, v in socket_map.items():
if hasattr(v, 'addr'):
type = str(getattr(v, '__class__', 'unknown'))
if type == 'ZServer.HTTPServer.zhttp_server':
ip, port = v.addr
break
if ip == '0.0.0.0':
ip = socket.gethostbyname(socket.gethostname())
server_address = '%s:%s' %(ip, port)
address = getServerAddress()
self.assertEqual(address, server_address)
portal = self.getPortal()
activity_address = portal.portal_activities.getServerAddress()
self.assertEqual(activity_address, server_address)
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity)) suite.addTest(unittest.makeSuite(TestCMFActivity))
......
...@@ -93,10 +93,11 @@ class TestInvalidationBug(ERP5TypeTestCase): ...@@ -93,10 +93,11 @@ class TestInvalidationBug(ERP5TypeTestCase):
def testLateInvalidationFromZEO(self): def testLateInvalidationFromZEO(self):
### Check unit test is run properly ### Check unit test is run properly
from ZEO.ClientStorage import ClientStorage from ZEO.ClientStorage import ClientStorage
from Products.CMFActivity.ActivityTool import getCurrentNode
storage = self.portal._p_jar._storage storage = self.portal._p_jar._storage
activity_tool = self.portal.portal_activities activity_tool = self.portal.portal_activities
node_list = list(activity_tool.getProcessingNodeList()) node_list = list(activity_tool.getProcessingNodeList())
node_list.remove(activity_tool.getCurrentNode()) node_list.remove(getCurrentNode())
assert node_list and isinstance(storage, ClientStorage), \ assert node_list and isinstance(storage, ClientStorage), \
"this unit test must be run with at least 2 ZEO clients" "this unit test must be run with at least 2 ZEO clients"
......
...@@ -233,8 +233,7 @@ class FolderMixIn(ExtensionClass.Base): ...@@ -233,8 +233,7 @@ class FolderMixIn(ExtensionClass.Base):
of objects inside a module using activities of objects inside a module using activities
We also append random id We also append random id
""" """
activity_tool = self.getPortalObject().portal_activities new_id = "%s-%s" %(getCurrentNode().replace("-", "_"),
new_id = "%s-%s" %(activity_tool.getCurrentNode().replace("-", "_"),
self._generateRandomId()) self._generateRandomId())
try: try:
checkValidId(self, new_id) checkValidId(self, new_id)
...@@ -250,7 +249,7 @@ class FolderMixIn(ExtensionClass.Base): ...@@ -250,7 +249,7 @@ class FolderMixIn(ExtensionClass.Base):
""" """
activity_tool = self.getPortalObject().portal_activities activity_tool = self.getPortalObject().portal_activities
node_list = list(activity_tool.getNodeList()) node_list = list(activity_tool.getNodeList())
current_node = activity_tool.getCurrentNode() current_node = getCurrentNode()
try: try:
node_number = node_list.index(current_node) + 1 node_number = node_list.index(current_node) + 1
except ValueError: except ValueError:
...@@ -266,7 +265,7 @@ class FolderMixIn(ExtensionClass.Base): ...@@ -266,7 +265,7 @@ class FolderMixIn(ExtensionClass.Base):
""" """
activity_tool = self.getPortalObject().portal_activities activity_tool = self.getPortalObject().portal_activities
node_list = list(activity_tool.getNodeList()) node_list = list(activity_tool.getNodeList())
current_node = activity_tool.getCurrentNode() current_node = getCurrentNode()
try: try:
node_number = node_list.index(current_node) + 1 node_number = node_list.index(current_node) + 1
except ValueError: except ValueError:
...@@ -1679,6 +1678,8 @@ class Folder(CopyContainer, CMFBTreeFolder, CMFHBTreeFolder, Base, FolderMixIn): ...@@ -1679,6 +1678,8 @@ class Folder(CopyContainer, CMFBTreeFolder, CMFHBTreeFolder, Base, FolderMixIn):
pass pass
return '%s/%s' % (url, icon) return '%s/%s' % (url, icon)
from Products.CMFActivity.ActivityTool import getCurrentNode
# We browse all used class from btree and hbtree and set not implemented # We browse all used class from btree and hbtree and set not implemented
# class if one method defined on a class is not defined on other, thus if # class if one method defined on a class is not defined on other, thus if
# new method appears in one class if will raise in the other one # new method appears in one class if will raise in the other one
......
...@@ -10,6 +10,7 @@ from zLOG import LOG, ERROR ...@@ -10,6 +10,7 @@ from zLOG import LOG, ERROR
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.ERP5Type.tests.utils import addUserToDeveloperRole from Products.ERP5Type.tests.utils import addUserToDeveloperRole
from Products.ERP5Type.tests.utils import createZServer from Products.ERP5Type.tests.utils import createZServer
from Products.CMFActivity.ActivityTool import getCurrentNode
class DictPersistentWrapper(IterableUserDict, object): class DictPersistentWrapper(IterableUserDict, object):
...@@ -86,7 +87,7 @@ def patchActivityTool(): ...@@ -86,7 +87,7 @@ def patchActivityTool():
def tic(self, processing_node=1, force=0): def tic(self, processing_node=1, force=0):
processing_node_list = self.getProcessingNodeList() processing_node_list = self.getProcessingNodeList()
if len(processing_node_list) > 1 and \ if len(processing_node_list) > 1 and \
self.getCurrentNode() == self.getDistributingNode(): getCurrentNode() == self.getDistributingNode():
# Sleep between each distribute. # Sleep between each distribute.
time.sleep(0.3) time.sleep(0.3)
transaction.commit() transaction.commit()
...@@ -166,7 +167,7 @@ class ProcessingNodeTestCase(ZopeTestCase.TestCase): ...@@ -166,7 +167,7 @@ class ProcessingNodeTestCase(ZopeTestCase.TestCase):
except AttributeError: except AttributeError:
from Products.CMFActivity.ActivityTool import ActivityTool from Products.CMFActivity.ActivityTool import ActivityTool
activity_tool = ActivityTool().__of__(self.app) activity_tool = ActivityTool().__of__(self.app)
currentNode = activity_tool.getCurrentNode() currentNode = getCurrentNode()
if distributing: if distributing:
activity_tool.manage_setDistributingNode(currentNode) activity_tool.manage_setDistributingNode(currentNode)
elif currentNode == activity_tool.getDistributingNode(): elif currentNode == activity_tool.getDistributingNode():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment