diff --git a/product/ERP5SyncML/Subscription.py b/product/ERP5SyncML/Subscription.py index 264cbe7f3c9a50c7543d660e6f3cd53939f1a9cd..b2d5bb02d6a1477e7360cbd14f39975b8b5b42ed 100755 --- a/product/ERP5SyncML/Subscription.py +++ b/product/ERP5SyncML/Subscription.py @@ -397,11 +397,11 @@ class Signature(SyncCode): Set the partial string we will have to deliver in the future """ - LOG('Subscriber.setPartialXML before',0,'partial_xml: %s' % str(self.partial_xml)) + #LOG('Subscriber.setPartialXML before',0,'partial_xml: %s' % str(self.partial_xml)) if type(xml) is type(u'a'): xml = xml.encode('utf-8') self.partial_xml = xml - LOG('Subscriber.setPartialXML after',0,'partial_xml: %s' % str(self.partial_xml)) + #LOG('Subscriber.setPartialXML after',0,'partial_xml: %s' % str(self.partial_xml)) def getPartialXML(self): """ @@ -557,6 +557,32 @@ class Subscription(SyncCode, Implicit): LOG('Subscription',0,'getSynchronizationType: %s' % code) return code + def checkCorrectRemoteSessionId(self, session_id): + """ + We will see if the last session id was the same + wich means that the same message was sent again + + return 1 if the session id was not seen, 0 if already seen + """ + last_session_id = getattr(self,'last_session_id',None) + if last_session_id == session_id: + return 0 + self.last_session_id = session_id + return 1 + + + def getLastSentMessage(self): + """ + This is the getter for the last message we have sent + """ + return getattr(self,'last_sent_message','') + + def setLastSentMessage(self,xml): + """ + This is the setter for the last message we have sent + """ + self.last_sent_message = xml + def getLocalId(self, rid, path=None): """ Returns the local id from a know remote id diff --git a/product/ERP5SyncML/SubscriptionSynchronization.py b/product/ERP5SyncML/SubscriptionSynchronization.py index eef0cc988b7bb12d7ad188cabab34135e56722c2..d07ee62e7fb1d69de5d5079caaaddf127beebe0d 100755 --- a/product/ERP5SyncML/SubscriptionSynchronization.py +++ b/product/ERP5SyncML/SubscriptionSynchronization.py @@ -67,8 +67,6 @@ class SubscriptionSynchronization(XMLSyncUtils): xml += ' <Put>\n' xml += ' <CmdID>%s</CmdID>\n' % cmd_id ; cmd_id += 1 - # TODO add the DTD and the value devinf11 - # TODO here should be the client specification xml += ' </Put>\n' xml += ' </SyncBody>\n' diff --git a/product/ERP5SyncML/SynchronizationTool.py b/product/ERP5SyncML/SynchronizationTool.py index 7eddaec0126dd22b45803e381d6e4ed04bed33f6..d3d6fa18a28175425f747ad91166a705daff70c2 100755 --- a/product/ERP5SyncML/SynchronizationTool.py +++ b/product/ERP5SyncML/SynchronizationTool.py @@ -46,6 +46,7 @@ from AccessControl.SecurityManagement import newSecurityManager from AccessControl.SecurityManagement import noSecurityManager from AccessControl.User import UnrestrictedUser from Acquisition import aq_base +from xml.parsers.expat import ExpatError # parseString error import urllib import urllib2 import socket @@ -716,7 +717,8 @@ class SynchronizationTool( UniqueObject, SimpleItem, uf = self.acl_users user = UnrestrictedUser('syncml','syncml',['Manager','Member'],'') newSecurityManager(None, user) - self.activate(activity='RAMQueue').readResponse(sync_id=sync_id,text=result) + #self.activate(activity='RAMQueue').readResponse(sync_id=sync_id,text=result) + self.readResponse(sync_id=sync_id,text=result) security.declarePublic('sync') def sync(self): diff --git a/product/ERP5SyncML/XMLSyncUtils.py b/product/ERP5SyncML/XMLSyncUtils.py index 9715904c35b0f2e125a24f9413124e7eb6732621..e220d3b10cd169f7c915d46fabee632f59d5c529 100755 --- a/product/ERP5SyncML/XMLSyncUtils.py +++ b/product/ERP5SyncML/XMLSyncUtils.py @@ -225,6 +225,20 @@ class XMLSyncUtilsMixin(SyncCode, ActiveObject): xml += xml_method() return xml + def getSessionId(self, xml): + """ + We will retrieve the session id of the message + """ + session_id = 0 + for subnode in self.getElementNodeList(xml): + if subnode.nodeName == 'SyncML': + for subnode1 in self.getElementNodeList(subnode): + if subnode1.nodeName == 'SyncHdr': + for subnode2 in self.getElementNodeList(subnode1): + if subnode2.nodeName == 'SessionID': + session_id = int(subnode2.childNodes[0].data) + return session_id + def getAlertLastAnchor(self, xml_stream): """ Return the value of the last anchor, in the @@ -905,6 +919,23 @@ class XMLSyncUtils(XMLSyncUtilsMixin): subscription_url = str(subnode.childNodes[0].data) subscriber = domain.getSubscriber(subscription_url) + # We have to check if this message was not already, this can be dangerous + # to update two times the same object + session_id = self.getSessionId(remote_xml) + correct_session = subscriber.checkCorrectRemoteSessionId(session_id) + if not correct_session: # We need to send again the message + last_xml = subscriber.getLastSentMessage() + if last_xml != '': + has_response = 1 + if domain.domain_type == self.PUB: # We always reply + self.sendResponse(from_url=domain.publication_url, to_url=subscriber.subscription_url, + sync_id=domain.id, xml=last_xml,domain=domain) + elif domain.domain_type == self.SUB: + self.sendResponse(from_url=domain.subscription_url, to_url=domain.publication_url, + sync_id=domain.id, xml=last_xml,domain=domain) + return {'has_response':has_response,'xml':last_xml} + subscriber.setLastSentMessage('') + # First apply the list of status codes (destination_waiting_more_data,has_status_list) = self.applyStatusList( subscriber=subscriber, @@ -979,6 +1010,7 @@ class XMLSyncUtils(XMLSyncUtilsMixin): xml += ' </SyncBody>\n' xml += '</SyncML>\n' if domain.domain_type == self.PUB: # We always reply + subscriber.setLastSentMessage(xml) self.sendResponse(from_url=domain.publication_url, to_url=subscriber.subscription_url, sync_id=domain.id, xml=xml,domain=domain) has_response = 1 @@ -986,6 +1018,7 @@ class XMLSyncUtils(XMLSyncUtilsMixin): if self.checkAlert(remote_xml) or \ (xml_confirmation,syncml_data)!=('','') or \ has_status_list: + subscriber.setLastSentMessage(xml) self.sendResponse(from_url=domain.subscription_url, to_url=domain.publication_url, sync_id=domain.id, xml=xml,domain=domain) has_response = 1 diff --git a/product/ERP5SyncML/tests/testERP5SyncML.py b/product/ERP5SyncML/tests/testERP5SyncML.py index 40b636b8f424939a4bfec239657db72eaa6ace38..efd48a90af31bb46fb696f544c6ea03774a45c33 100755 --- a/product/ERP5SyncML/tests/testERP5SyncML.py +++ b/product/ERP5SyncML/tests/testERP5SyncML.py @@ -325,6 +325,43 @@ class TestERP5SyncML(ERP5TypeTestCase): nb_message += 1 + result['has_response'] return nb_message + def synchronizeWithBrokenMessage(self, id, run=run_all_test): + """ + This just define how we synchronize, we have + to define it here because it is specific to the unit testing + """ + portal_sync = self.getSynchronizationTool() + #portal_sync.email = None # XXX To be removed + subscription = portal_sync.getSubscription(id) + publication = None + for publication in portal_sync.getPublicationList(): + if publication.getPublicationUrl()==subscription.getSubscriptionUrl(): + publication = publication + self.failUnless(publication is not None) + # reset files, because we do sync by files + file = open('/tmp/sync_client1','w') + file.write('') + file.close() + file = open('/tmp/sync_client2','w') + file.write('') + file.close() + file = open('/tmp/sync','w') + file.write('') + file.close() + nb_message = 1 + result = portal_sync.SubSync(subscription.getId()) + while result['has_response']==1: + # We do thing three times, so that we will test + # if we manage well duplicate messages + portal_sync.PubSync(publication.getId()) + portal_sync.PubSync(publication.getId()) + portal_sync.PubSync(publication.getId()) + result = portal_sync.SubSync(subscription.getId()) + result = portal_sync.SubSync(subscription.getId()) + result = portal_sync.SubSync(subscription.getId()) + nb_message += 1 + result['has_response'] + return nb_message + def testFirstSynchronization(self, quiet=0, run=run_all_test): # We will try to populate the folder person_client1 # with the data form person_server @@ -1080,6 +1117,42 @@ class TestERP5SyncML(ERP5TypeTestCase): self.assertEquals(sub_sub_person2.getLastName(),self.last_name2) SyncCode.MAX_LINES = previous_max_lines + def testBrokenMessage(self, quiet=0, run=run_all_test): + """ + With http synchronization, when a message is not well + received, then we send message again, we want to + be sure that is such case we don't do stupid things + + If we want to make this test more intersting, it is + better to split messages + """ + if not run: return + if not quiet: + ZopeTestCase._print('\nTest Broken Message ') + LOG('Testing... ',0,'testBrokenMessage') + previous_max_lines = SyncCode.MAX_LINES + SyncCode.MAX_LINES = 10 + self.setupPublicationAndSubscription(quiet=1,run=1) + nb_person = self.populatePersonServer(quiet=1,run=1) + # Synchronize the first client + nb_message1 = self.synchronizeWithBrokenMessage(self.sub_id1) + #self.failUnless(nb_message1==self.nb_message_first_synchronization) + portal_sync = self.getSynchronizationTool() + subscription1 = portal_sync.getSubscription(self.sub_id1) + self.failUnless(len(subscription1.getObjectList())==nb_person) + person_server = self.getPersonServer() # We also check we don't + # modify initial ob + person1_s = person_server._getOb(self.id1) + self.failUnless(person1_s.getId()==self.id1) + self.failUnless(person1_s.getFirstName()==self.first_name1) + self.failUnless(person1_s.getLastName()==self.last_name1) + person_client1 = self.getPersonClient1() + person1_c = person_client1._getOb(self.id1) + self.failUnless(person1_c.getId()==self.id1) + self.failUnless(person1_c.getFirstName()==self.first_name1) + self.failUnless(person1_c.getLastName()==self.last_name1) + SyncCode.MAX_LINES = previous_max_lines + # We may add a test in order to check if the slow_sync mode works fine, ie # if we do have both object on the client and server side, we must make sure # that the server first sends is own data