Commit 4db74aeb authored by Sebastien Robin's avatar Sebastien Robin

- finished unit test for synchronization :

  - adding workflows
  - update, delete and add local roles
- updated the code in order to make this tests working fine


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@392 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent d80a5e97
......@@ -125,7 +125,7 @@ class ERP5Conduit(XMLSyncUtilsMixin):
# In the case where this new node is a object to add
LOG('addNode',0,'object.id: %s' % object.getId())
LOG('addNode',0,'xml.nodeName: %s' % xml.nodeName)
LOG('addNode',0,'isSubObjectAdd: %i' % self.getSubObjectDepth(xml))
LOG('addNode',0,'getSubObjectDepth: %i' % self.getSubObjectDepth(xml))
LOG('addNode',0,'isHistoryAdd: %i' % self.isHistoryAdd(xml))
if xml.nodeName in self.XUPDATE_INSERT_OR_ADD and self.getSubObjectDepth(xml)==0:
if self.isHistoryAdd(xml)!=-1: # bad hack XXX to be removed
......@@ -209,21 +209,24 @@ class ERP5Conduit(XMLSyncUtilsMixin):
#for action in self.getWorkflowActionFromXml(xml):
status = self.getStatusFromXml(xml)
LOG('addNode, status:',0,status)
wf_conflict_list = self.isWorkflowActionAddable(object=object,
add_action = self.isWorkflowActionAddable(object=object,
status=status,wf_tool=wf_tool,
xml=xml)
LOG('addNode, workflow_history wf_conflict_list:',0,wf_conflict_list)
if wf_conflict_list==[] or force and not simulate:
wf_id=wf_id,xml=xml)
#LOG('addNode, workflow_history wf_conflict_list:',0,wf_conflict_list)
LOG('addNode, workflow_history add_action:',0,add_action)
if add_action and not simulate:
LOG('addNode, setting status:',0,'ok')
wf_tool.setStatusOf(wf_id,object,status)
else:
conflict_list += wf_conflict_list
#else:
# conflict_list += wf_conflict_list
elif xml.nodeName in self.local_role_list and not simulate:
# We want to add a local role
roles = self.convertXmlValue(xml.childNodes[0].data,data_type='tokens')
user = self.getAttribute(xml,'id')
roles = list(roles) # Needed for CPS, or we have a CPS error
user = roles[0]
roles = roles[1:]
LOG('local_role: ',0,'user: %s roles: %s' % (repr(user),repr(roles)))
#user = roles[0]
#roles = roles[1:]
object.manage_setLocalRoles(user,roles)
else:
conflict_list += self.updateNode(xml=xml,object=object, force=force,
......@@ -236,9 +239,10 @@ class ERP5Conduit(XMLSyncUtilsMixin):
"""
A node is deleted
"""
# In the case where this new node is a object to delete
# In the case where we have to delete an object
LOG('ERP5Conduit',0,'deleteNode')
LOG('ERP5Conduit',0,'deleteNode, object.id: %s' % object.getId())
LOG('ERP5Conduit',0,'deleteNode, object path: %s' % repr(object.getPhysicalPath()))
conflict_list = []
if xml is not None:
xml = self.convertToXml(xml)
......@@ -248,6 +252,7 @@ class ERP5Conduit(XMLSyncUtilsMixin):
object_id = self.getAttribute(xml,'id')
elif self.getSubObjectDepth(xml)==1:
object_id = self.getSubObjectId(xml)
#LOG('ERP5Conduit',0,'deleteNode, SubObjectDepth: %i' % self.getSubObjectDepth(xml))
elif self.getSubObjectDepth(xml)==2:
# we have to call delete node on a subsubobject
sub_object_id = self.getSubObjectId(xml)
......@@ -256,14 +261,24 @@ class ERP5Conduit(XMLSyncUtilsMixin):
sub_xml = self.getSubObjectXupdate(xml)
conflict_list += self.deleteNode(xml=sub_xml,object=sub_object,
force=force, simulate=simulate, **kw)
except KeyError:
except (KeyError, AttributeError):
LOG('ERP5Conduit',0,'deleteNode, Unable to delete SubObject: %s' % str(sub_object_id))
pass
else: # We do have an object_id
if object_id is not None: # We do have an object_id
try:
object._delObject(object_id)
except (AttributeError, KeyError):
LOG('ERP5Conduit',0,'deleteNode, Unable to delete: %s' % str(object_id))
pass
# In the case where we have to delete an user role
# If we are still there, this means the delete is for this node
elif xml.nodeName in self.XUPDATE_DEL:
xml = self.getElementFromXupdate(xml)
if xml.nodeName in self.local_role_list and not simulate:
# We want to del a local role
user = self.getAttribute(xml,'id')
LOG('local_role: ',0,'user: %s' % repr(user))
object.manage_delLocalRoles([user])
return conflict_list
security.declareProtected(Permissions.ModifyPortalContent, 'updateNode')
......@@ -382,12 +397,14 @@ class ERP5Conduit(XMLSyncUtilsMixin):
conflict_list += self.addNode(xml=subnode,object=object,force=force,
simulate=simulate,**kw)
elif keyword == self.local_role_tag and not simulate:
# This is the case where we have to call addNode
# This is the case where we have to update Roles
LOG('updateNode',0,'we will add a local role')
roles = self.convertXmlValue(data,data_type='tokens')
user = roles[0]
roles = roles[1:]
object.manage_setLocalRoles(user,roles)
#user = self.getSubObjectId(xml)
#roles = self.convertXmlValue(data,data_type='tokens')
#object.manage_setLocalRoles(user,roles)
xml = self.getElementFromXupdate(xml)
conflict_list += self.addNode(xml=xml,object=object,force=force,
simulate=simulate,**kw)
elif self.isSubObjectModification(xml):
# We should find the object corresponding to
# this update, so we have to look in the previous_xml
......@@ -537,7 +554,7 @@ class ERP5Conduit(XMLSyncUtilsMixin):
"""
Return a string wich is the selection for the subobject
ex: for "/object[@id='161']/object[@id='default_address']/street_address"
if returns "/object[@id='default_address']/street_address"
it returns "/object[@id='default_address']/street_address"
"""
if re.search(self.object_exp,select) is not None:
s = '/'
......@@ -594,17 +611,6 @@ class ERP5Conduit(XMLSyncUtilsMixin):
return subnode
return None
# def getObjectId(self, xml):
# """
# Retrieve the id
# # XXX Deprecated, we should use instead, getParameter(xml,'id') XXX
# """
# for attribute in self.getAttributeNodeList(xml):
# if attribute.nodeName == 'id':
# data = attribute.childNodes[0].data
# return self.convertXmlValue(data)
# return None
security.declareProtected(Permissions.AccessContentsInformation,'getAttribute')
def getAttribute(self, xml, param):
"""
......@@ -801,6 +807,36 @@ class ERP5Conduit(XMLSyncUtilsMixin):
result += xml_string[maxi:xml_string.find('</xupdate:element>')]
result += '</' + xml.attributes[0].nodeValue + '>'
return self.convertToXml(result)
if xml.nodeName in (self.XUPDATE_UPDATE+self.XUPDATE_DEL):
result = u'<'
for subnode in self.getAttributeNodeList(xml):
if subnode.nodeName == 'select':
attribute = subnode.nodeValue
s = '[@id='
s_place = attribute.find(s)
select_id = None
if (s_place > 0):
select_id = attribute[s_place+len(s):]
select_id = select_id[:select_id.find("'",1)+1]
else:
s_place = len(attribute)
property = attribute[:s_place]
if property.find('/')==0:
property = property[1:]
result += property
if select_id is not None:
result += ' id=%s' % select_id
result += '>'
# Then dumps the xml and remove what we does'nt want
xml_string = StringIO()
PrettyPrint(xml,xml_string)
xml_string = xml_string.getvalue()
xml_string = unicode(xml_string,encoding='utf-8')
maxi = xml_string.find('>')+1
result += xml_string[maxi:xml_string.find('</%s>' % xml.nodeName)]
result += '</' + property + '>'
LOG('getElementFromXupdate, result:',0,repr(result))
return self.convertToXml(result)
return xml
security.declareProtected(Permissions.AccessContentsInformation,'getWorkflowActionFromXml')
......@@ -833,6 +869,8 @@ class ERP5Conduit(XMLSyncUtilsMixin):
data = data.replace('\n','')
if type(data) is type(u"a"):
data = data.encode(self.getEncoding())
if data=='None':
return None
# We can now convert string in tuple, dict, binary...
if data_type in self.list_type_list:
if type(data) is type('a'):
......@@ -891,26 +929,47 @@ class ERP5Conduit(XMLSyncUtilsMixin):
return conflict_list
def isWorkflowActionAddable(self, object=None,status=None,wf_tool=None,xml=None):
def isWorkflowActionAddable(self, object=None,status=None,wf_tool=None,
wf_id=None,xml=None):
"""
Some checking in order to check if we should add the workfow or not
"""
conflict_list = []
if status.has_key('action'):
action_name = status['action']
authorized = 0
authorized_actions = wf_tool.getActionsFor(object)
LOG('isWorkflowActionAddable, status:',0,status)
LOG('isWorkflowActionAddable, authorized_actions:',0,authorized_actions)
for action in authorized_actions:
if action['id']==action_name:
authorized = 1
if not authorized:
string_io = StringIO()
PrettyPrint(xml,stream=string_io)
conflict = Conflict(object_path=object.getPhysicalPath(),
keyword=self.history_tag)
conflict.setXupdate(string_io.getvalue())
conflict.setRemoteValue(status)
conflict_list += [conflict]
return conflict_list
We should not returns a conflict list as we wanted before, we should
instead go to a conflict state.
"""
# We first test if the status in not already inside the workflow_history
wf_history = object.workflow_history
if wf_history.has_key(wf_id):
action_list = wf_history[wf_id]
else: action_list = []
addable = 1
for action in action_list:
this_one = 1
for key in action.keys():
if status[key] != action[key]:
this_one = 0
break
if this_one:
addable = 0
break
return addable
# This doesn't works fine because all workflows variables
# are not set the same way
# if status.has_key('action'):
# action_name = status['action']
# authorized = 0
# authorized_actions = wf_tool.getActionsFor(object)
# LOG('isWorkflowActionAddable, status:',0,status)
# LOG('isWorkflowActionAddable, authorized_actions:',0,authorized_actions)
# for action in authorized_actions:
# if action['id']==action_name:
# authorized = 1
# if not authorized:
# string_io = StringIO()
# PrettyPrint(xml,stream=string_io)
# conflict = Conflict(object_path=object.getPhysicalPath(),
# keyword=self.history_tag)
# conflict.setXupdate(string_io.getvalue())
# conflict.setRemoteValue(status)
# conflict_list += [conflict]
# return conflict_list
......@@ -187,9 +187,9 @@ class Signature(SyncCode):
"""
# Constructor
def __init__(self,gid=None, status=None, xml_string=None):
def __init__(self,gid=None, id=None, status=None, xml_string=None):
self.setGid(gid)
self.setId(None)
self.setId(id)
self.status = status
self.setXML(xml_string)
self.partial_xml = None
......@@ -595,14 +595,14 @@ class Subscription(SyncCode, Implicit):
"""
self.xml_mapping = xml_mapping
def setGidGenerator(self, method_id):
def setGidGenerator(self, method):
"""
This set the method name wich allows to find a gid
from any object
"""
if method_id in (None,''):
method_id = 'getId'
self.gid_generator = method_id
if method in (None,''):
method = 'getId'
self.gid_generator = method
def getGidGenerator(self):
"""
......@@ -611,6 +611,22 @@ class Subscription(SyncCode, Implicit):
"""
return self.gid_generator
def getGidFromObject(self, object):
"""
"""
o_base = aq_base(object)
o_gid = None
LOG('getGidFromObject',0,'gidgenerator : %s' % repr(self.getGidGenerator()))
gid_gen = self.getGidGenerator()
if callable(gid_gen):
o_gid=gid_gen(object)
elif hasattr(o_base, gid_gen):
LOG('getGidFromObject',0,'there is the gid generator')
generator = getattr(object, self.getGidGenerator())
o_gid = generator()
LOG('getGidFromObject',0,'o_gid: %s' % repr(o_gid))
return o_gid
def getObjectFromGid(self, gid):
"""
This tries to get the object with the given gid
......@@ -619,15 +635,9 @@ class Subscription(SyncCode, Implicit):
signature = self.getSignature(gid)
# First look if we do already have the mapping between
# the id and the gid
# query_list = []
# query = self.getQuery()
# if query is type('a'):
# query_method = getattr(object,self.getQuery(),None)
# query_list = query()
# if callable(query):
# query_list = query(self)
object_list = self.getObjectList()
destination = self.getDestination()
LOG('getObjectFromGid',0,'gid: %s' % repr(gid))
if signature is not None:
o_id = signature.getId()
o = None
......@@ -639,16 +649,9 @@ class Subscription(SyncCode, Implicit):
return o
for o in object_list:
LOG('getObjectFromGid',0,'working on : %s' % repr(o))
o_base = aq_base(o)
LOG('getObjectFromGid',0,'gidgenerator : %s' % repr(self.getGidGenerator()))
if hasattr(o_base, self.getGidGenerator()):
LOG('getObjectFromGid',0,'there is the gid generator')
generator = getattr(o, self.getGidGenerator())
o_gid = generator()
LOG('getObjectFromGid',0,'o_gid: %s' % repr(o_gid))
LOG('getObjectFromGid',0,'gid: %s' % repr(gid))
if o_gid == gid:
return o
o_gid = self.getGidFromObject(o)
if o_gid == gid:
return o
LOG('getObjectFromGid',0,'returning None')
return None
......@@ -675,20 +678,27 @@ class Subscription(SyncCode, Implicit):
"""
This tries to generate a new Id
"""
if self.getIdGenerator() is not None:
LOG('generateNewId, object: ',0,object.getPhysicalPath())
id_generator = self.getIdGenerator()
LOG('generateNewId, id_generator: ',0,id_generator)
if id_generator is not None:
o_base = aq_base(object)
if hasattr(aq_base, self.getIdGenerator()):
generator = getattr(o, self.getIdGenerator())
new_id = None
if callable(id_generator):
new_id = id_generator(object)
elif hasattr(o_base, id_generator):
generator = getattr(object, id_generator)
new_id = generator()
return new_id
LOG('generateNewId, new_id: ',0,new_id)
return new_id
return None
def setIdGenerator(self, method_id):
def setIdGenerator(self, method):
"""
This set the method name wich allows to generate
a new id
"""
self.id_generator = method_id
self.id_generator = method
def getIdGenerator(self):
"""
......
......@@ -198,14 +198,8 @@ class XMLSyncUtilsMixin(SyncCode):
file2 = open('/tmp/sync_old_object','w')
file2.write(old_xml)
file2.close()
#xupdate = commands.getoutput('xmldiff -xg /tmp/sync_old_object /tmp/sync_new_object')
xupdate = commands.getoutput('erp5diff /tmp/sync_old_object /tmp/sync_new_object')
xupdate = xupdate[xupdate.find('<xupdate:modifications'):]
# XXX To be removed, this is only needed for xmldiff with does bad things
#while xupdate.find('xupdate:move')>0:
# LOG('getXupdateObject',0,'Removing the move section')
# xupdate = xupdate[:xupdate.find('<xupdate:move')] + \
# xupdate[xupdate.find('</xupdate:move>\n')+16:]
return xupdate
def getXMLObject(self, object=None, xml_mapping=None):
......@@ -318,6 +312,17 @@ class XMLSyncUtilsMixin(SyncCode):
return int(subnode.childNodes[0].data)
return None
def getStatusCommand(self, xml):
"""
Return the value of the command inside the xml_stream
"""
# Get informations from the body
if xml.nodeName=='Status':
for subnode in xml.childNodes:
if subnode.nodeType == subnode.ELEMENT_NODE and subnode.nodeName == 'Cmd':
return subnode.childNodes[0].data
return None
def getAlertCode(self, xml_stream):
"""
Return the value of the alert code inside the full syncml message
......@@ -428,20 +433,20 @@ class XMLSyncUtilsMixin(SyncCode):
return subnode
return next_status
def getActionObjectId(self, action):
"""
XXX Deprecated
Return the id of the object described by the action
"""
for subnode in action.childNodes:
if subnode.nodeType == subnode.ELEMENT_NODE and subnode.nodeName == 'Item':
for subnode2 in subnode.childNodes:
if subnode2.nodeType == subnode2.ELEMENT_NODE and subnode2.nodeName == 'Data':
for subnode3 in subnode2.childNodes:
if subnode3.nodeType == subnode3.ELEMENT_NODE and subnode3.nodeName == 'object':
for subnode4 in subnode3.childNodes:
if subnode4.nodeType == subnode4.ELEMENT_NODE and subnode4.nodeName == 'id':
return str(subnode4.childNodes[0].data)
# def getActionObjectId(self, action):
# """
# XXX Deprecated
# Return the id of the object described by the action
# """
# for subnode in action.childNodes:
# if subnode.nodeType == subnode.ELEMENT_NODE and subnode.nodeName == 'Item':
# for subnode2 in subnode.childNodes:
# if subnode2.nodeType == subnode2.ELEMENT_NODE and subnode2.nodeName == 'Data':
# for subnode3 in subnode2.childNodes:
# if subnode3.nodeType == subnode3.ELEMENT_NODE and subnode3.nodeName == 'object':
# for subnode4 in subnode3.childNodes:
# if subnode4.nodeType == subnode4.ELEMENT_NODE and subnode4.nodeName == 'id':
# return str(subnode4.childNodes[0].data)
#return subnode4.childNodes[0].data
def getDataSubNode(self, action):
......@@ -562,20 +567,14 @@ class XMLSyncUtilsMixin(SyncCode):
"""
local_gid_list = []
syncml_data = ''
# store_xupdate = 0
# if object is None:
# object_list = domain.getObjectList()
# else:
# store_xupdate = 1
# object_list = [object]
for object in domain.getObjectList():
status = self.SENT
gid_generator = getattr(object,domain.getGidGenerator(),None)
object_gid = None
if gid_generator is not None:
object_gid = gid_generator()
local_gid_list += [object_gid]
#gid_generator = getattr(object,domain.getGidGenerator(),None)
object_gid = domain.getGidFromObject(object)
local_gid_list += [object_gid]
#if gid_generator is not None:
# object_gid = gid_generator()
force = 0
if syncml_data.count('\n') < self.MAX_LINES and (object.id.find('.')!=0): # If not we have to cut
xml_object = self.getXMLObject(object=object,xml_mapping=domain.xml_mapping)
......@@ -597,7 +596,7 @@ class XMLSyncUtilsMixin(SyncCode):
#LOG('PubSyncModif',0,'Current object.getPath: %s' % object.getPath())
LOG('getSyncMLData',0,'no signature for gid: %s' % object_gid)
xml_string = xml_object
signature = Signature(gid=object_gid)
signature = Signature(gid=object_gid,id=object.getId())
signature.setTempXML(xml_object)
if xml_string.count('\n') > self.MAX_LINES:
more_data=1
......@@ -710,7 +709,6 @@ class XMLSyncUtilsMixin(SyncCode):
# were not able to create
syncml_data += self.deleteXMLObject(xml_object=signature.getXML() or '',
object_gid=object_gid,cmd_id=cmd_id)
subscriber.delSignature(object_gid)
return (syncml_data,xml_confirmation,cmd_id)
......@@ -750,7 +748,6 @@ class XMLSyncUtilsMixin(SyncCode):
data_subnode = self.getDataSubNode(next_action)
if next_action.nodeName == 'Add':
# Then store the xml of this new subobject
#object = domain.getObjectFromGid(object=destination_path,gid=object_gid)
if object is None:
object_id = domain.generateNewId(object=destination_path)
conflict_list += conduit.addNode(xml=data_subnode, object=destination_path,
......@@ -763,7 +760,6 @@ class XMLSyncUtilsMixin(SyncCode):
xml_object = ''
if mapping is not None:
xml_object = mapping()
#xml_object = object.asXML()
signature.setStatus(self.SYNCHRONIZED)
signature.setId(object.getId())
signature.setXML(xml_object)
......@@ -785,7 +781,6 @@ class XMLSyncUtilsMixin(SyncCode):
xml_object = ''
if mapping is not None:
xml_object = mapping()
#xml_object = object.asXML()
signature.setTempXML(xml_object)
if conflict_list != []:
status_code = self.CONFLICT
......@@ -809,16 +804,14 @@ class XMLSyncUtilsMixin(SyncCode):
data_subnode_string = string_io.getvalue()
LOG('applyActionList, subscriber_xupdate:',0,data_subnode_string)
signature.setSubscriberXupdate(data_subnode_string)
# xml_string = self.getXupdateObject(object=object,
# xml_mapping=domain.xml_mapping,
# old_xml=signature.getXML())
# signature.setPublisherXupdate(xml_string) XXX is it needed ??
elif next_action.nodeName == 'Delete':
object_id = object.id
object_id = signature.getId()
conduit.deleteNode(xml=self.getDataSubNode(next_action), object=destination_path,
object_id=self.getActionId(next_action))
object_id=object_id)
subscriber.delSignature(object_gid)
xml_confirmation += self.SyncMLConfirmation(cmd_id,
object_gid,status_code,'Delete')
else: # We want to retrieve more data
signature.setStatus(self.PARTIAL)
#LOG('SyncModif',0,'setPartialXML: %s' % str(previous_partial))
......@@ -851,23 +844,28 @@ class XMLSyncUtilsMixin(SyncCode):
while next_status != None:
object_gid = self.getStatusTarget(next_status)
status_code = self.getStatusCode(next_status)
status_cmd = self.getStatusCommand(next_status)
signature = subscriber.getSignature(object_gid)
LOG('SyncModif',0,'next_status: %s' % str(status_code))
if status_code == self.CHUNK_OK:
destination_waiting_more_data = 1
signature.setStatus(self.PARTIAL)
elif status_code == self.CONFLICT:
signature.setStatus(self.CONFLICT)
elif status_code == self.CONFLICT_MERGE:
# We will have to apply the update, and we should not care about conflicts
# so we have to force the update
signature.setStatus(self.NOT_SYNCHRONIZED)
signature.setForce(1)
elif status_code == self.CONFLICT_CLIENT_WIN:
# The server was agree to apply our updates, nothing to do
signature.setStatus(self.SYNCHRONIZED)
elif status_code == self.SUCCESS:
signature.setStatus(self.SYNCHRONIZED)
if status_cmd in ('Add','Replace'):
if status_code == self.CHUNK_OK:
destination_waiting_more_data = 1
signature.setStatus(self.PARTIAL)
elif status_code == self.CONFLICT:
signature.setStatus(self.CONFLICT)
elif status_code == self.CONFLICT_MERGE:
# We will have to apply the update, and we should not care about conflicts
# so we have to force the update
signature.setStatus(self.NOT_SYNCHRONIZED)
signature.setForce(1)
elif status_code == self.CONFLICT_CLIENT_WIN:
# The server was agree to apply our updates, nothing to do
signature.setStatus(self.SYNCHRONIZED)
elif status_code == self.SUCCESS:
signature.setStatus(self.SYNCHRONIZED)
elif status_cmd == 'Delete':
if status_code == self.SUCCESS:
subscriber.delSignature(object_gid)
next_status = self.getNextSyncBodyStatus(remote_xml, next_status)
return (destination_waiting_more_data, has_status_list)
......
......@@ -53,15 +53,24 @@ class TestERP5SyncML(ERP5TypeTestCase):
# Different variables used for this test
run_all_test = 1
workflow_id = 'edit_workflow'
first_name1 = 'Sebastien'
last_name1 = 'Robin'
description1 = 'description1 $sdfr_sdfsdf_oisfsopf'
lang1 = 'fr'
format2 = 'html'
format3 = 'xml'
format4 = 'txt'
first_name2 = 'Jean-Paul'
last_name2 = 'Smets'
description2 = 'description2@ $*< <<< >>>></title>&oekd'
lang2 = 'en'
first_name3 = 'Yoshinori'
last_name3 = 'Okuji'
description3 = 'description3 sdf__sdf_df___&&]]]'
description4 = 'description4 sdflkmooo^^^^]]]]]{{{{{{{'
lang3 = 'jp'
lang4 = 'ca'
xml_mapping = 'asXML'
id1 = '170'
id2 = '171'
......@@ -153,8 +162,8 @@ class TestERP5SyncML(ERP5TypeTestCase):
def login(self, quiet=0, run=run_all_test):
uf = self.getPortal().acl_users
uf._doAddUser('ERP5TypeTestCase', '', ['Manager'], [])
user = uf.getUserById('ERP5TypeTestCase').__of__(uf)
uf._doAddUser('seb', '', ['Manager'], [])
user = uf.getUserById('seb').__of__(uf)
newSecurityManager(None, user)
def populatePersonServer(self, quiet=0, run=run_all_test):
......@@ -178,9 +187,24 @@ class TestERP5SyncML(ERP5TypeTestCase):
return nb_person
def setupPublicationAndSubscription(self, quiet=0, run=run_all_test):
self.testAddPublication(quiet=quiet,run=run)
self.testAddSubscription1(quiet=quiet,run=run)
self.testAddSubscription2(quiet=quiet,run=run)
self.testAddPublication(quiet=1,run=1)
self.testAddSubscription1(quiet=1,run=1)
self.testAddSubscription2(quiet=1,run=1)
def setupPublicationAndSubscriptionAndGid(self, quiet=0, run=run_all_test):
self.setupPublicationAndSubscription(quiet=1,run=1)
def getGid(object):
return object.getTitle()
portal_sync = self.getSynchronizationTool()
sub1 = portal_sync.getSubscription(self.sub_id1)
sub2 = portal_sync.getSubscription(self.sub_id2)
pub = portal_sync.getPublication(self.pub_id)
pub.setGidGenerator(getGid)
sub1.setGidGenerator(getGid)
sub2.setGidGenerator(getGid)
pub.setIdGenerator('generateNewId')
sub1.setIdGenerator('generateNewId')
sub2.setIdGenerator('generateNewId')
def testGetSynchronizationList(self, quiet=0, run=run_all_test):
# This test the getSynchronizationList, ie,
......@@ -196,9 +220,11 @@ class TestERP5SyncML(ERP5TypeTestCase):
self.failUnless(len(synchronization_list)==self.nb_synchronization)
def testGetObjectList(self, quiet=0, run=run_all_test):
# This test the default getObjectList, ie, when the
# query is 'objectValues', and this also test if we enter
# a new method for the query
"""
This test the default getObjectList, ie, when the
query is 'objectValues', and this also test if we enter
a new method for the query
"""
if not run: return
if not quiet:
ZopeTestCase._print('\nTest getObjectList ')
......@@ -224,8 +250,10 @@ class TestERP5SyncML(ERP5TypeTestCase):
self.failUnless(len(object_list)==1)
def testExportImport(self, quiet=0, run=run_all_test):
# We will try to export a person with asXML
# And then try to add it to another folder with a conduit
"""
We will try to export a person with asXML
And then try to add it to another folder with a conduit
"""
if not run: return
if not quiet:
ZopeTestCase._print('\nTest Export and Import ')
......@@ -242,10 +270,17 @@ class TestERP5SyncML(ERP5TypeTestCase):
new_object = person_client1._getOb(self.id1)
self.failUnless(new_object.getLastName()==self.last_name1)
self.failUnless(new_object.getFirstName()==self.first_name1)
# XXX We should also looks at the workflow history
self.failUnless(len(new_object.workflow_history[self.workflow_id])==2)
s_local_role = person_server.get_local_roles()
c_local_role = person_client1.get_local_roles()
self.assertEqual(s_local_role,c_local_role)
def synchronize(self, id, run=run_all_test):
# This just define how we synchronize, we have
# to define it here because it is specific to the unit testing
"""
This just define how we synchronize, we have
to define it here because it is specific to the unit testing
"""
portal_sync = self.getSynchronizationTool()
portal_sync.email = None
subscription = portal_sync.getSubscription(id)
......@@ -489,6 +524,7 @@ class TestERP5SyncML(ERP5TypeTestCase):
- id1
- id1
- id2
- id1
- id2
"""
if not run: return
......@@ -501,12 +537,18 @@ class TestERP5SyncML(ERP5TypeTestCase):
kw = {'first_name':self.first_name1,'last_name':self.last_name1,
'description':self.description1}
sub_person1.edit(**kw)
sub_sub_person = sub_person1.newContent(id=self.id2,portal_type='Person')
sub_sub_person1 = sub_person1.newContent(id=self.id1,portal_type='Person')
kw = {'first_name':self.first_name1,'last_name':self.last_name1,
'description':self.description1}
sub_sub_person1.edit(**kw)
sub_sub_person2 = sub_person1.newContent(id=self.id2,portal_type='Person')
kw = {'first_name':self.first_name2,'last_name':self.last_name2,
'description':self.description2}
sub_sub_person.edit(**kw)
sub_sub_person2.edit(**kw)
# remove ('','portal...','person_server')
len_path = len(sub_sub_person.getPhysicalPath()) - 3
len_path = len(sub_sub_person1.getPhysicalPath()) - 3
self.failUnless(len_path==3)
len_path = len(sub_sub_person2.getPhysicalPath()) - 3
self.failUnless(len_path==3)
def testAddSubObject(self, quiet=0, run=run_all_test):
......@@ -522,23 +564,32 @@ class TestERP5SyncML(ERP5TypeTestCase):
LOG('Testing... ',0,'testAddSubObject')
self.populatePersonServerWithSubObject(quiet=1,run=1)
self.synchronize(self.sub_id1)
self.synchronize(self.sub_id2)
self.checkSynchronizationStateIsSynchronized()
person_client1 = self.getPersonClient1()
person1_c = person_client1._getOb(self.id1)
sub_person1_c = person1_c._getOb(self.id1)
sub_sub_person = sub_person1_c._getOb(self.id2)
sub_sub_person1 = sub_person1_c._getOb(self.id1)
sub_sub_person2 = sub_person1_c._getOb(self.id2)
# remove ('','portal...','person_server')
len_path = len(sub_sub_person.getPhysicalPath()) - 3
len_path = len(sub_sub_person1.getPhysicalPath()) - 3
self.failUnless(len_path==3)
self.failUnless(sub_sub_person.getDescription()==self.description2)
self.failUnless(sub_sub_person.getFirstName()==self.first_name2)
self.failUnless(sub_sub_person.getLastName()==self.last_name2)
def testUpdateSubObject(self, quiet=0, run=1):
len_path = len(sub_sub_person2.getPhysicalPath()) - 3
self.failUnless(len_path==3)
self.failUnless(sub_sub_person1.getDescription()==self.description1)
self.failUnless(sub_sub_person1.getFirstName()==self.first_name1)
self.failUnless(sub_sub_person1.getLastName()==self.last_name1)
self.failUnless(sub_sub_person2.getDescription()==self.description2)
self.failUnless(sub_sub_person2.getFirstName()==self.first_name2)
self.failUnless(sub_sub_person2.getLastName()==self.last_name2)
def testUpdateSubObject(self, quiet=0, run=run_all_test):
"""
In this test, we start with a tree of object already
In this test, we start with a tree of object already
synchronized, then we update a subobject, and we will see
if it is updated correctly
if it is updated correctly.
To make this test a bit more harder, we will update on both
the client and the server by the same time
"""
if not run: return
self.testAddSubObject(quiet=1,run=1)
......@@ -562,6 +613,326 @@ class TestERP5SyncML(ERP5TypeTestCase):
self.failUnless(sub_sub_person_s.getDescription()==self.description3)
self.failUnless(sub_sub_person_s.getFirstName()==self.first_name3)
def testDeleteObject(self, quiet=0, run=run_all_test):
"""
We will do a first synchronization, then delete an object on both
sides, and we will see if nothing is left on the server and also
on the two clients
"""
if not run: return
self.testFirstSynchronization(quiet=1,run=1)
if not quiet:
ZopeTestCase._print('\nTest Delete Object ')
LOG('Testing... ',0,'testDeleteObject')
person_server = self.getPersonServer()
person_server.manage_delObjects(self.id1)
person_client1 = self.getPersonClient1()
person_client1.manage_delObjects(self.id2)
self.synchronize(self.sub_id1)
self.synchronize(self.sub_id2)
self.checkSynchronizationStateIsSynchronized()
portal_sync = self.getSynchronizationTool()
publication = portal_sync.getPublication(self.pub_id)
subscription1 = portal_sync.getSubscription(self.sub_id1)
subscription2 = portal_sync.getSubscription(self.sub_id2)
self.failUnless(len(publication.getObjectList())==0)
self.failUnless(len(subscription1.getObjectList())==0)
self.failUnless(len(subscription2.getObjectList())==0)
def testDeleteSubObject(self, quiet=0, run=run_all_test):
"""
We will do a first synchronization, then delete a sub-object on both
sides, and we will see if nothing is left on the server and also
on the two clients
- before : after :
- id1 - id1
- id1 - id1
- id2 - id2
- id1
- id2
"""
if not run: return
self.testAddSubObject(quiet=1,run=1)
if not quiet:
ZopeTestCase._print('\nTest Delete Sub Object ')
LOG('Testing... ',0,'testDeleteSubObject')
person_server = self.getPersonServer()
sub_object_s = person_server._getOb(self.id1)._getOb(self.id1)
sub_object_s.manage_delObjects(self.id1)
person_client1 = self.getPersonClient1()
sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
sub_object_c1.manage_delObjects(self.id2)
person_client2 = self.getPersonClient2()
sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
self.synchronize(self.sub_id1)
self.synchronize(self.sub_id2)
self.checkSynchronizationStateIsSynchronized()
len_s = len(sub_object_s.objectValues())
len_c1 = len(sub_object_c1.objectValues())
len_c2 = len(sub_object_c2.objectValues())
self.failUnless(len_s==len_c1==len_c2==0)
def testGetConflictListOnSubObject(self, quiet=0, run=run_all_test):
"""
We will change several attributes on a sub object on both the server
and a client, then we will see if we have correctly the conflict list
"""
if not run: return
self.testAddSubObject(quiet=1,run=1)
if not quiet:
ZopeTestCase._print('\nTest Get Conflict List On Sub Object ')
LOG('Testing... ',0,'testGetConflictListOnSubObject')
person_server = self.getPersonServer()
object_s = person_server._getOb(self.id1)
sub_object_s = object_s._getOb(self.id1)
person_client1 = self.getPersonClient1()
sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
person_client2 = self.getPersonClient2()
sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
# Change values so that we will get conflicts
kw = {'language':self.lang2,'description':self.description2}
sub_object_s.edit(**kw)
kw = {'language':self.lang3,'description':self.description3}
sub_object_c1.edit(**kw)
self.synchronize(self.sub_id1)
portal_sync = self.getSynchronizationTool()
conflict_list = portal_sync.getConflictList()
self.failUnless(len(conflict_list)==2)
conflict_list = portal_sync.getConflictList(sub_object_c1)
self.failUnless(len(conflict_list)==0)
conflict_list = portal_sync.getConflictList(object_s)
self.failUnless(len(conflict_list)==0)
conflict_list = portal_sync.getConflictList(sub_object_s)
self.failUnless(len(conflict_list)==2)
def testApplyPublisherDocumentOnSubObject(self, quiet=0, run=run_all_test):
"""
there's several conflict on a sub object, we will see if we can
correctly have the publisher version of this document
"""
if not run: return
self.testGetConflictListOnSubObject(quiet=1,run=1)
if not quiet:
ZopeTestCase._print('\nTest Apply Publisher Document On Sub Object ')
LOG('Testing... ',0,'testApplyPublisherDocumentOnSubObject')
portal_sync = self.getSynchronizationTool()
conflict_list = portal_sync.getConflictList()
conflict = conflict_list[0]
conflict.applyPublisherDocument()
person_server = self.getPersonServer()
sub_object_s = person_server._getOb(self.id1)._getOb(self.id1)
person_client1 = self.getPersonClient1()
sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
person_client2 = self.getPersonClient2()
sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
self.synchronize(self.sub_id1)
self.synchronize(self.sub_id2)
self.checkSynchronizationStateIsSynchronized()
self.failUnless(sub_object_s.getDescription()==self.description2)
self.failUnless(sub_object_s.getLanguage()==self.lang2)
self.failUnless(sub_object_c1.getDescription()==self.description2)
self.failUnless(sub_object_c1.getLanguage()==self.lang2)
self.failUnless(sub_object_c2.getDescription()==self.description2)
self.failUnless(sub_object_c2.getLanguage()==self.lang2)
def testApplySubscriberDocumentOnSubObject(self, quiet=0, run=run_all_test):
"""
there's several conflict on a sub object, we will see if we can
correctly have the subscriber version of this document
"""
if not run: return
self.testGetConflictListOnSubObject(quiet=1,run=1)
if not quiet:
ZopeTestCase._print('\nTest Apply Subscriber Document On Sub Object ')
LOG('Testing... ',0,'testApplySubscriberDocumentOnSubObject')
portal_sync = self.getSynchronizationTool()
conflict_list = portal_sync.getConflictList()
conflict = conflict_list[0]
conflict.applySubscriberDocument()
person_server = self.getPersonServer()
sub_object_s = person_server._getOb(self.id1)._getOb(self.id1)
person_client1 = self.getPersonClient1()
sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
person_client2 = self.getPersonClient2()
sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
self.synchronize(self.sub_id1)
self.synchronize(self.sub_id2)
self.checkSynchronizationStateIsSynchronized()
self.failUnless(sub_object_s.getDescription()==self.description3)
self.failUnless(sub_object_s.getLanguage()==self.lang3)
self.failUnless(sub_object_c1.getDescription()==self.description3)
self.failUnless(sub_object_c1.getLanguage()==self.lang3)
self.failUnless(sub_object_c2.getDescription()==self.description3)
self.failUnless(sub_object_c2.getLanguage()==self.lang3)
def testSynchronizeWithStrangeGid(self, quiet=0, run=run_all_test):
"""
By default, the synchronization process use the id in order to
recognize objects (because by default, getGid==getId. Here, we will see
if it also works with a somewhat strange getGid
"""
if not run: return
if not quiet:
ZopeTestCase._print('\nTest Synchronize With Strange Gid ')
LOG('Testing... ',0,'testSynchronizeWithStrangeGid')
self.login()
self.setupPublicationAndSubscriptionAndGid(quiet=1,run=1)
nb_person = self.populatePersonServer(quiet=1,run=1)
# This will test adding object
self.synchronize(self.sub_id1)
self.checkSynchronizationStateIsSynchronized()
portal_sync = self.getSynchronizationTool()
subscription1 = portal_sync.getSubscription(self.sub_id1)
self.failUnless(len(subscription1.getObjectList())==nb_person)
publication = portal_sync.getPublication(self.pub_id)
self.failUnless(len(publication.getObjectList())==nb_person)
gid = self.first_name1 + ' ' + self.last_name1 # ie the title 'Sebastien Robin'
person_c1 = subscription1.getObjectFromGid(gid)
id_c1 = person_c1.getId()
self.failUnless(id_c1 in ('1','2')) # id given by the default generateNewId
person_s = publication.getObjectFromGid(gid)
id_s = person_s.getId()
self.failUnless(id_s==self.id1)
# This will test updating object
person_s.setDescription(self.description3)
self.synchronize(self.sub_id1)
self.checkSynchronizationStateIsSynchronized()
self.failUnless(person_s.getDescription()==self.description3)
self.failUnless(person_c1.getDescription()==self.description3)
# This will test deleting object
person_server = self.getPersonServer()
person_client1 = self.getPersonClient1()
person_server.manage_delObjects(self.id2)
self.synchronize(self.sub_id1)
self.checkSynchronizationStateIsSynchronized()
self.failUnless(len(subscription1.getObjectList())==(nb_person-1))
self.failUnless(len(publication.getObjectList())==(nb_person-1))
person_s = publication.getObjectFromGid(gid)
person_c1 = subscription1.getObjectFromGid(gid)
self.failUnless(person_s.getDescription()==self.description3)
self.failUnless(person_c1.getDescription()==self.description3)
def testMultiNodeConflict(self, quiet=0, run=run_all_test):
"""
We will create conflicts with 3 differents nodes, and we will
solve it by taking one full version of documents.
"""
if not run: return
self.testFirstSynchronization(quiet=1,run=1)
if not quiet:
ZopeTestCase._print('\nTest Multi Node Conflict ')
LOG('Testing... ',0,'testMultiNodeConflict')
portal_sync = self.getSynchronizationTool()
person_server = self.getPersonServer()
person1_s = person_server._getOb(self.id1)
kw = {'language':self.lang2,'description':self.description2,
'format':self.format2}
person1_s.edit(**kw)
person_client1 = self.getPersonClient1()
person1_c1 = person_client1._getOb(self.id1)
kw = {'language':self.lang3,'description':self.description3,
'format':self.format3}
person1_c1.edit(**kw)
person_client2 = self.getPersonClient2()
person1_c2 = person_client2._getOb(self.id1)
kw = {'language':self.lang4,'description':self.description4,
'format':self.format4}
person1_c2.edit(**kw)
self.synchronize(self.sub_id1)
self.synchronize(self.sub_id2)
conflict_list = portal_sync.getConflictList()
self.failUnless(len(conflict_list)==6)
# we will take :
# description on person_server
# language on person_client1
# format on person_client2
for conflict in conflict_list:
subscriber = conflict.getSubscriber()
property = conflict.getPropertyId()
resolve = 0
if property == 'language':
if subscriber.getSubscriptionUrl()==self.subscription_url1:
resolve = 1
conflict.applySubscriberValue()
if property == 'format':
if subscriber.getSubscriptionUrl()==self.subscription_url2:
resolve = 1
conflict.applySubscriberValue()
if not resolve:
conflict.applyPublisherValue()
self.synchronize(self.sub_id1)
self.synchronize(self.sub_id2)
self.checkSynchronizationStateIsSynchronized()
self.failUnless(person1_s.getDescription()==self.description2)
self.failUnless(person1_c1.getDescription()==self.description2)
self.failUnless(person1_c2.getDescription()==self.description2)
self.failUnless(person1_s.getLanguage()==self.lang3)
self.failUnless(person1_c1.getLanguage()==self.lang3)
self.failUnless(person1_c2.getLanguage()==self.lang3)
self.failUnless(person1_s.getFormat()==self.format4)
self.failUnless(person1_c1.getFormat()==self.format4)
self.failUnless(person1_c2.getFormat()==self.format4)
def testSynchronizeWorkflowHistory(self, quiet=0, run=run_all_test):
"""
We will do a synchronization, then we will edit two times
the object on the server, then two times the object on the
client, and see if the global history as 4 more actions.
"""
if not run: return
self.testFirstSynchronization(quiet=1,run=1)
if not quiet:
ZopeTestCase._print('\nTest Synchronize WorkflowHistory ')
LOG('Testing... ',0,'testSynchronizeWorkflowHistory')
person_server = self.getPersonServer()
person1_s = person_server._getOb(self.id1)
person_client1 = self.getPersonClient1()
person1_c = person_client1._getOb(self.id1)
kw1 = {'description':self.description1}
kw2 = {'description':self.description2}
len_wf = len(person1_s.workflow_history[self.workflow_id])
person1_s.edit(**kw2)
person1_c.edit(**kw2)
person1_s.edit(**kw1)
person1_c.edit(**kw1)
self.synchronize(self.sub_id1)
self.checkSynchronizationStateIsSynchronized()
self.failUnless(len(person1_s.workflow_history[self.workflow_id])==len_wf+4)
self.failUnless(len(person1_c.workflow_history[self.workflow_id])==len_wf+4)
def testUpdateLocalRole(self, quiet=0, run=run_all_test):
"""
We will do a first synchronization, then modify, add and delete
an user role and see if it is correctly synchronized
"""
if not run: return
self.testFirstSynchronization(quiet=1,run=1)
if not quiet:
ZopeTestCase._print('\nTest Update Local Role ')
LOG('Testing... ',0,'testUpdateLocalRole')
# First, Create a new user
uf = self.getPortal().acl_users
uf._doAddUser('jp', '', ['Manager'], [])
user = uf.getUserById('jp').__of__(uf)
# then update create and delete roles
person_server = self.getPersonServer()
person1_s = person_server._getOb(self.id1)
person2_s = person_server._getOb(self.id2)
person_client1 = self.getPersonClient1()
person1_c = person_client1._getOb(self.id1)
person2_c = person_client1._getOb(self.id2)
person1_s.manage_setLocalRoles('seb',['Manager','Owner'])
person2_s.manage_setLocalRoles('jp',['Manager','Owner'])
person2_s.manage_delLocalRoles(['seb'])
self.synchronize(self.sub_id1)
self.synchronize(self.sub_id2)
role_1_s = person1_s.get_local_roles()
role_2_s = person2_s.get_local_roles()
role_1_c = person1_c.get_local_roles()
role_2_c = person2_c.get_local_roles()
self.assertEqual(role_1_s,role_1_c)
self.assertEqual(role_2_s,role_2_c)
if __name__ == '__main__':
framework()
else:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment