Commit 1d157542 authored by Sebastien Robin's avatar Sebastien Robin

we now check if we have not already received a syncml message


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@933 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 709fb7cf
...@@ -397,11 +397,11 @@ class Signature(SyncCode): ...@@ -397,11 +397,11 @@ class Signature(SyncCode):
Set the partial string we will have to Set the partial string we will have to
deliver in the future deliver in the future
""" """
LOG('Subscriber.setPartialXML before',0,'partial_xml: %s' % str(self.partial_xml)) #LOG('Subscriber.setPartialXML before',0,'partial_xml: %s' % str(self.partial_xml))
if type(xml) is type(u'a'): if type(xml) is type(u'a'):
xml = xml.encode('utf-8') xml = xml.encode('utf-8')
self.partial_xml = xml self.partial_xml = xml
LOG('Subscriber.setPartialXML after',0,'partial_xml: %s' % str(self.partial_xml)) #LOG('Subscriber.setPartialXML after',0,'partial_xml: %s' % str(self.partial_xml))
def getPartialXML(self): def getPartialXML(self):
""" """
...@@ -557,6 +557,32 @@ class Subscription(SyncCode, Implicit): ...@@ -557,6 +557,32 @@ class Subscription(SyncCode, Implicit):
LOG('Subscription',0,'getSynchronizationType: %s' % code) LOG('Subscription',0,'getSynchronizationType: %s' % code)
return code return code
def checkCorrectRemoteSessionId(self, session_id):
"""
We will see if the last session id was the same
wich means that the same message was sent again
return 1 if the session id was not seen, 0 if already seen
"""
last_session_id = getattr(self,'last_session_id',None)
if last_session_id == session_id:
return 0
self.last_session_id = session_id
return 1
def getLastSentMessage(self):
"""
This is the getter for the last message we have sent
"""
return getattr(self,'last_sent_message','')
def setLastSentMessage(self,xml):
"""
This is the setter for the last message we have sent
"""
self.last_sent_message = xml
def getLocalId(self, rid, path=None): def getLocalId(self, rid, path=None):
""" """
Returns the local id from a know remote id Returns the local id from a know remote id
......
...@@ -67,8 +67,6 @@ class SubscriptionSynchronization(XMLSyncUtils): ...@@ -67,8 +67,6 @@ class SubscriptionSynchronization(XMLSyncUtils):
xml += ' <Put>\n' xml += ' <Put>\n'
xml += ' <CmdID>%s</CmdID>\n' % cmd_id ; cmd_id += 1 xml += ' <CmdID>%s</CmdID>\n' % cmd_id ; cmd_id += 1
# TODO add the DTD and the value devinf11
# TODO here should be the client specification
xml += ' </Put>\n' xml += ' </Put>\n'
xml += ' </SyncBody>\n' xml += ' </SyncBody>\n'
......
...@@ -46,6 +46,7 @@ from AccessControl.SecurityManagement import newSecurityManager ...@@ -46,6 +46,7 @@ from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.User import UnrestrictedUser from AccessControl.User import UnrestrictedUser
from Acquisition import aq_base from Acquisition import aq_base
from xml.parsers.expat import ExpatError # parseString error
import urllib import urllib
import urllib2 import urllib2
import socket import socket
...@@ -716,7 +717,8 @@ class SynchronizationTool( UniqueObject, SimpleItem, ...@@ -716,7 +717,8 @@ class SynchronizationTool( UniqueObject, SimpleItem,
uf = self.acl_users uf = self.acl_users
user = UnrestrictedUser('syncml','syncml',['Manager','Member'],'') user = UnrestrictedUser('syncml','syncml',['Manager','Member'],'')
newSecurityManager(None, user) newSecurityManager(None, user)
self.activate(activity='RAMQueue').readResponse(sync_id=sync_id,text=result) #self.activate(activity='RAMQueue').readResponse(sync_id=sync_id,text=result)
self.readResponse(sync_id=sync_id,text=result)
security.declarePublic('sync') security.declarePublic('sync')
def sync(self): def sync(self):
......
...@@ -225,6 +225,20 @@ class XMLSyncUtilsMixin(SyncCode, ActiveObject): ...@@ -225,6 +225,20 @@ class XMLSyncUtilsMixin(SyncCode, ActiveObject):
xml += xml_method() xml += xml_method()
return xml return xml
def getSessionId(self, xml):
"""
We will retrieve the session id of the message
"""
session_id = 0
for subnode in self.getElementNodeList(xml):
if subnode.nodeName == 'SyncML':
for subnode1 in self.getElementNodeList(subnode):
if subnode1.nodeName == 'SyncHdr':
for subnode2 in self.getElementNodeList(subnode1):
if subnode2.nodeName == 'SessionID':
session_id = int(subnode2.childNodes[0].data)
return session_id
def getAlertLastAnchor(self, xml_stream): def getAlertLastAnchor(self, xml_stream):
""" """
Return the value of the last anchor, in the Return the value of the last anchor, in the
...@@ -905,6 +919,23 @@ class XMLSyncUtils(XMLSyncUtilsMixin): ...@@ -905,6 +919,23 @@ class XMLSyncUtils(XMLSyncUtilsMixin):
subscription_url = str(subnode.childNodes[0].data) subscription_url = str(subnode.childNodes[0].data)
subscriber = domain.getSubscriber(subscription_url) subscriber = domain.getSubscriber(subscription_url)
# We have to check if this message was not already, this can be dangerous
# to update two times the same object
session_id = self.getSessionId(remote_xml)
correct_session = subscriber.checkCorrectRemoteSessionId(session_id)
if not correct_session: # We need to send again the message
last_xml = subscriber.getLastSentMessage()
if last_xml != '':
has_response = 1
if domain.domain_type == self.PUB: # We always reply
self.sendResponse(from_url=domain.publication_url, to_url=subscriber.subscription_url,
sync_id=domain.id, xml=last_xml,domain=domain)
elif domain.domain_type == self.SUB:
self.sendResponse(from_url=domain.subscription_url, to_url=domain.publication_url,
sync_id=domain.id, xml=last_xml,domain=domain)
return {'has_response':has_response,'xml':last_xml}
subscriber.setLastSentMessage('')
# First apply the list of status codes # First apply the list of status codes
(destination_waiting_more_data,has_status_list) = self.applyStatusList( (destination_waiting_more_data,has_status_list) = self.applyStatusList(
subscriber=subscriber, subscriber=subscriber,
...@@ -979,6 +1010,7 @@ class XMLSyncUtils(XMLSyncUtilsMixin): ...@@ -979,6 +1010,7 @@ class XMLSyncUtils(XMLSyncUtilsMixin):
xml += ' </SyncBody>\n' xml += ' </SyncBody>\n'
xml += '</SyncML>\n' xml += '</SyncML>\n'
if domain.domain_type == self.PUB: # We always reply if domain.domain_type == self.PUB: # We always reply
subscriber.setLastSentMessage(xml)
self.sendResponse(from_url=domain.publication_url, to_url=subscriber.subscription_url, self.sendResponse(from_url=domain.publication_url, to_url=subscriber.subscription_url,
sync_id=domain.id, xml=xml,domain=domain) sync_id=domain.id, xml=xml,domain=domain)
has_response = 1 has_response = 1
...@@ -986,6 +1018,7 @@ class XMLSyncUtils(XMLSyncUtilsMixin): ...@@ -986,6 +1018,7 @@ class XMLSyncUtils(XMLSyncUtilsMixin):
if self.checkAlert(remote_xml) or \ if self.checkAlert(remote_xml) or \
(xml_confirmation,syncml_data)!=('','') or \ (xml_confirmation,syncml_data)!=('','') or \
has_status_list: has_status_list:
subscriber.setLastSentMessage(xml)
self.sendResponse(from_url=domain.subscription_url, to_url=domain.publication_url, self.sendResponse(from_url=domain.subscription_url, to_url=domain.publication_url,
sync_id=domain.id, xml=xml,domain=domain) sync_id=domain.id, xml=xml,domain=domain)
has_response = 1 has_response = 1
......
...@@ -325,6 +325,43 @@ class TestERP5SyncML(ERP5TypeTestCase): ...@@ -325,6 +325,43 @@ class TestERP5SyncML(ERP5TypeTestCase):
nb_message += 1 + result['has_response'] nb_message += 1 + result['has_response']
return nb_message return nb_message
def synchronizeWithBrokenMessage(self, id, run=run_all_test):
"""
This just define how we synchronize, we have
to define it here because it is specific to the unit testing
"""
portal_sync = self.getSynchronizationTool()
#portal_sync.email = None # XXX To be removed
subscription = portal_sync.getSubscription(id)
publication = None
for publication in portal_sync.getPublicationList():
if publication.getPublicationUrl()==subscription.getSubscriptionUrl():
publication = publication
self.failUnless(publication is not None)
# reset files, because we do sync by files
file = open('/tmp/sync_client1','w')
file.write('')
file.close()
file = open('/tmp/sync_client2','w')
file.write('')
file.close()
file = open('/tmp/sync','w')
file.write('')
file.close()
nb_message = 1
result = portal_sync.SubSync(subscription.getId())
while result['has_response']==1:
# We do thing three times, so that we will test
# if we manage well duplicate messages
portal_sync.PubSync(publication.getId())
portal_sync.PubSync(publication.getId())
portal_sync.PubSync(publication.getId())
result = portal_sync.SubSync(subscription.getId())
result = portal_sync.SubSync(subscription.getId())
result = portal_sync.SubSync(subscription.getId())
nb_message += 1 + result['has_response']
return nb_message
def testFirstSynchronization(self, quiet=0, run=run_all_test): def testFirstSynchronization(self, quiet=0, run=run_all_test):
# We will try to populate the folder person_client1 # We will try to populate the folder person_client1
# with the data form person_server # with the data form person_server
...@@ -1080,6 +1117,42 @@ class TestERP5SyncML(ERP5TypeTestCase): ...@@ -1080,6 +1117,42 @@ class TestERP5SyncML(ERP5TypeTestCase):
self.assertEquals(sub_sub_person2.getLastName(),self.last_name2) self.assertEquals(sub_sub_person2.getLastName(),self.last_name2)
SyncCode.MAX_LINES = previous_max_lines SyncCode.MAX_LINES = previous_max_lines
def testBrokenMessage(self, quiet=0, run=run_all_test):
"""
With http synchronization, when a message is not well
received, then we send message again, we want to
be sure that is such case we don't do stupid things
If we want to make this test more intersting, it is
better to split messages
"""
if not run: return
if not quiet:
ZopeTestCase._print('\nTest Broken Message ')
LOG('Testing... ',0,'testBrokenMessage')
previous_max_lines = SyncCode.MAX_LINES
SyncCode.MAX_LINES = 10
self.setupPublicationAndSubscription(quiet=1,run=1)
nb_person = self.populatePersonServer(quiet=1,run=1)
# Synchronize the first client
nb_message1 = self.synchronizeWithBrokenMessage(self.sub_id1)
#self.failUnless(nb_message1==self.nb_message_first_synchronization)
portal_sync = self.getSynchronizationTool()
subscription1 = portal_sync.getSubscription(self.sub_id1)
self.failUnless(len(subscription1.getObjectList())==nb_person)
person_server = self.getPersonServer() # We also check we don't
# modify initial ob
person1_s = person_server._getOb(self.id1)
self.failUnless(person1_s.getId()==self.id1)
self.failUnless(person1_s.getFirstName()==self.first_name1)
self.failUnless(person1_s.getLastName()==self.last_name1)
person_client1 = self.getPersonClient1()
person1_c = person_client1._getOb(self.id1)
self.failUnless(person1_c.getId()==self.id1)
self.failUnless(person1_c.getFirstName()==self.first_name1)
self.failUnless(person1_c.getLastName()==self.last_name1)
SyncCode.MAX_LINES = previous_max_lines
# We may add a test in order to check if the slow_sync mode works fine, ie # We may add a test in order to check if the slow_sync mode works fine, ie
# if we do have both object on the client and server side, we must make sure # if we do have both object on the client and server side, we must make sure
# that the server first sends is own data # that the server first sends is own data
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment